Files
Projektarbeit-MYP/backend/utils/hardware_integration.py

730 lines
29 KiB
Python

#!/usr/bin/env python3.11
"""
Hardware Integration - VOLLSTÄNDIGE Backend-Steuerung für Drucker/Steckdosen
============================================================================
NEUE PHILOSOPHIE - BACKEND DIKTIERT FRONTEND:
- Drucker werden AUSSCHLIESSLICH über ihre Tapo-Steckdosen gesteuert
- KEIN JavaScript für Hardware-Steuerung - nur Flask/Jinja
- Backend sammelt ALLE Daten und übergibt sie komplett an Templates
- Frontend ist PASSIV und zeigt nur an, was Backend vorgibt
Autor: Till Tomczak - Mercedes-Benz TBA Marienfelde
Datum: 2025-06-19 (Komplett-Neuschreibung für Backend-Kontrolle)
"""
import time
import socket
import threading
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
# Hardware-Bibliotheken
try:
from PyP100.PyP100 import P100 as PyP100
from PyP100.PyP110 import P110 as PyP110
TAPO_AVAILABLE = True
except ImportError:
PyP100 = None
PyP110 = None
TAPO_AVAILABLE = False
# MYP Models & Utils
from models import get_db_session, Printer, PlugStatusLog
from utils.logging_config import get_logger
import os
# Logger
hardware_logger = get_logger("hardware_integration")
# ===== DRUCKER-STEUERUNGS-KLASSE =====
class DruckerSteuerung:
"""
VOLLSTÄNDIGE Backend-Steuerung aller Drucker über Tapo-Steckdosen.
Diese Klasse übernimmt die KOMPLETTE Kontrolle:
- Status-Sammlung für alle Drucker
- Ein/Aus-Schaltung über Steckdosen
- Energiemonitoring
- Template-Daten-Vorbereitung
Das Frontend erhält fertige Daten und muss NICHTS selbst berechnen!
"""
def __init__(self):
"""Initialisiere die Drucker-Steuerung"""
# Tapo-Zugangsdaten für Mercedes-Benz
self.tapo_username = "till.tomczak@mercedes-benz.com"
self.tapo_password = "744563017196A"
self.timeout = 10
self.retry_count = 3
# Backend-State-Management
self.drucker_stati = {} # Aktueller Status aller Drucker
self.energie_daten = {} # Energie-Monitoring-Daten
self.letztes_update = {} # Letzte Aktualisierung pro Drucker
self.verbindung_cache = {} # Connection-Pool für Performance
hardware_logger.info("🎯 DruckerSteuerung initialisiert - BACKEND ÜBERNIMMT KONTROLLE")
if not TAPO_AVAILABLE:
hardware_logger.warning("⚠️ PyP100 nicht verfügbar - Simulation-Modus aktiv")
# ===== KERN-STEUERUNGS-FUNKTIONEN =====
def drucker_einschalten(self, drucker_id: int, grund: str = "Manuell") -> Dict[str, Any]:
"""
Schaltet einen Drucker über seine Tapo-Steckdose EIN.
Args:
drucker_id: ID des Druckers
grund: Grund für das Einschalten (für Logging)
Returns:
Dict mit Ergebnis und neuen Template-Daten
"""
hardware_logger.info(f"🟢 Drucker {drucker_id} wird eingeschaltet - Grund: {grund}")
try:
with get_db_session() as session:
drucker = session.query(Printer).filter(Printer.id == drucker_id).first()
if not drucker:
return {
'success': False,
'error': f'Drucker {drucker_id} nicht gefunden',
'template_data': {}
}
if not drucker.plug_ip:
return {
'success': False,
'error': f'Keine Steckdosen-IP für {drucker.name} konfiguriert',
'template_data': {}
}
# Steckdose einschalten
erfolg = self._steckdose_schalten(drucker.plug_ip, True)
if erfolg:
# Drucker-Status in DB aktualisieren
drucker.status = 'online'
drucker.last_checked = datetime.now()
session.commit()
# Status-Log erstellen
self._status_log_erstellen(drucker_id, 'turned_on', grund)
# Backend-State aktualisieren
self._drucker_state_aktualisieren(drucker)
hardware_logger.info(f"✅ Drucker {drucker.name} erfolgreich eingeschaltet")
return {
'success': True,
'message': f'Drucker {drucker.name} eingeschaltet',
'template_data': self._template_daten_sammeln()
}
else:
hardware_logger.error(f"❌ Drucker {drucker.name} konnte nicht eingeschaltet werden")
return {
'success': False,
'error': f'Steckdose {drucker.plug_ip} nicht erreichbar',
'template_data': self._template_daten_sammeln()
}
except Exception as e:
hardware_logger.error(f"❌ Fehler beim Einschalten von Drucker {drucker_id}: {e}")
return {
'success': False,
'error': f'Technischer Fehler: {str(e)}',
'template_data': {}
}
def drucker_ausschalten(self, drucker_id: int, grund: str = "Manuell") -> Dict[str, Any]:
"""
Schaltet einen Drucker über seine Tapo-Steckdose AUS.
Args:
drucker_id: ID des Druckers
grund: Grund für das Ausschalten (für Logging)
Returns:
Dict mit Ergebnis und neuen Template-Daten
"""
hardware_logger.info(f"🔴 Drucker {drucker_id} wird ausgeschaltet - Grund: {grund}")
try:
with get_db_session() as session:
drucker = session.query(Printer).filter(Printer.id == drucker_id).first()
if not drucker:
return {
'success': False,
'error': f'Drucker {drucker_id} nicht gefunden',
'template_data': {}
}
if not drucker.plug_ip:
return {
'success': False,
'error': f'Keine Steckdosen-IP für {drucker.name} konfiguriert',
'template_data': {}
}
# Steckdose ausschalten
erfolg = self._steckdose_schalten(drucker.plug_ip, False)
if erfolg:
# Drucker-Status in DB aktualisieren
drucker.status = 'offline'
drucker.last_checked = datetime.now()
session.commit()
# Status-Log erstellen
self._status_log_erstellen(drucker_id, 'turned_off', grund)
# Backend-State aktualisieren
self._drucker_state_aktualisieren(drucker)
hardware_logger.info(f"✅ Drucker {drucker.name} erfolgreich ausgeschaltet")
return {
'success': True,
'message': f'Drucker {drucker.name} ausgeschaltet',
'template_data': self._template_daten_sammeln()
}
else:
hardware_logger.error(f"❌ Drucker {drucker.name} konnte nicht ausgeschaltet werden")
return {
'success': False,
'error': f'Steckdose {drucker.plug_ip} nicht erreichbar',
'template_data': self._template_daten_sammeln()
}
except Exception as e:
hardware_logger.error(f"❌ Fehler beim Ausschalten von Drucker {drucker_id}: {e}")
return {
'success': False,
'error': f'Technischer Fehler: {str(e)}',
'template_data': {}
}
def drucker_toggle(self, drucker_id: int, grund: str = "Toggle") -> Dict[str, Any]:
"""
Wechselt den Status eines Druckers (Ein <-> Aus).
Args:
drucker_id: ID des Druckers
grund: Grund für den Wechsel
Returns:
Dict mit Ergebnis und neuen Template-Daten
"""
try:
with get_db_session() as session:
drucker = session.query(Printer).filter(Printer.id == drucker_id).first()
if not drucker:
return {
'success': False,
'error': f'Drucker {drucker_id} nicht gefunden',
'template_data': {}
}
# Aktueller Status bestimmen
if drucker.status == 'online':
return self.drucker_ausschalten(drucker_id, f"{grund} (war online)")
else:
return self.drucker_einschalten(drucker_id, f"{grund} (war offline)")
except Exception as e:
hardware_logger.error(f"❌ Fehler beim Toggle von Drucker {drucker_id}: {e}")
return {
'success': False,
'error': f'Technischer Fehler: {str(e)}',
'template_data': {}
}
# ===== DATEN-SAMMLUNG FÜR TEMPLATES =====
def template_daten_sammeln(self) -> Dict[str, Any]:
"""
Sammelt ALLE Daten für die Frontend-Templates.
Das Frontend muss NICHTS selbst berechnen!
Returns:
Dict mit allen Template-Daten für Jinja
"""
hardware_logger.debug("📊 Sammle Template-Daten für Frontend")
try:
with get_db_session() as session:
drucker_liste = session.query(Printer).order_by(Printer.name).all()
# Drucker-Daten mit Status sammeln
drucker_daten = []
gesamt_online = 0
gesamt_offline = 0
for drucker in drucker_liste:
# Status aktualisieren falls nötig
aktueller_status = self._drucker_status_pruefen(drucker)
drucker_info = {
'id': drucker.id,
'name': drucker.name,
'model': drucker.model or 'Unbekannt',
'location': drucker.location or 'TBA Marienfelde',
'status': aktueller_status,
'plug_ip': drucker.plug_ip,
'ip_address': drucker.ip_address,
'active': drucker.active,
'last_checked': drucker.last_checked,
'created_at': drucker.created_at,
# UI-Hilfsdaten
'status_class': 'success' if aktueller_status == 'online' else 'danger',
'status_text': 'Online' if aktueller_status == 'online' else 'Offline',
'status_icon': '🟢' if aktueller_status == 'online' else '🔴',
'kann_gesteuert_werden': bool(drucker.plug_ip),
'toggle_text': 'Ausschalten' if aktueller_status == 'online' else 'Einschalten',
'toggle_action': 'off' if aktueller_status == 'online' else 'on',
# Energie-Daten (Mock für Demo)
'current_power': 125.5 if aktueller_status == 'online' else 0.0,
'daily_consumption': 2.4 if aktueller_status == 'online' else 0.0,
'monthly_consumption': 45.8,
}
drucker_daten.append(drucker_info)
if aktueller_status == 'online':
gesamt_online += 1
else:
gesamt_offline += 1
# System-Statistiken
statistiken = {
'gesamt_drucker': len(drucker_liste),
'online_drucker': gesamt_online,
'offline_drucker': gesamt_offline,
'verfügbarkeits_rate': round((gesamt_online / len(drucker_liste) * 100) if drucker_liste else 0, 1),
'letztes_update': datetime.now(),
# Energie-Gesamtdaten (Mock)
'gesamt_verbrauch': round(sum(d['daily_consumption'] for d in drucker_daten), 2),
'aktuelle_leistung': round(sum(d['current_power'] for d in drucker_daten), 1),
'geschätzte_kosten': round(sum(d['daily_consumption'] for d in drucker_daten) * 0.30, 2)
}
return {
'drucker': drucker_daten,
'stats': statistiken,
'system_status': 'healthy' if gesamt_online > 0 else 'warning',
'timestamp': datetime.now().isoformat(),
'tapo_verfügbar': TAPO_AVAILABLE
}
except Exception as e:
hardware_logger.error(f"❌ Fehler beim Sammeln der Template-Daten: {e}")
return {
'drucker': [],
'stats': {
'gesamt_drucker': 0,
'online_drucker': 0,
'offline_drucker': 0,
'verfügbarkeits_rate': 0.0
},
'system_status': 'error',
'error': str(e),
'timestamp': datetime.now().isoformat(),
'tapo_verfügbar': TAPO_AVAILABLE
}
# ===== PRIVATE HILFSFUNKTIONEN =====
def _steckdose_schalten(self, ip: str, einschalten: bool) -> bool:
"""Schaltet eine Tapo-Steckdose ein oder aus"""
if not TAPO_AVAILABLE:
hardware_logger.warning(f"⚠️ Simulation: Steckdose {ip} würde {'eingeschaltet' if einschalten else 'ausgeschaltet'}")
return True # Simulation immer erfolgreich
# Zuerst Netzwerk-Erreichbarkeit prüfen
if not self._erweiterte_netzwerk_prüfung(ip):
hardware_logger.error(f"❌ Steckdose {ip} ist im Netzwerk nicht erreichbar")
return False
retry_count = 0
max_retries = 3
while retry_count < max_retries:
try:
action = "einschalten" if einschalten else "ausschalten"
hardware_logger.debug(f"🔌 Versuche Steckdose {ip} zu {action} (Versuch {retry_count + 1}/{max_retries})")
# P100-Verbindung herstellen mit Timeout
p100 = PyP100(ip, self.tapo_username, self.tapo_password)
p100.handshake()
p100.login()
# Schalten
if einschalten:
p100.turnOn()
else:
p100.turnOff()
hardware_logger.info(f"✅ Steckdose {ip} erfolgreich {action}")
return True
except Exception as e:
retry_count += 1
error_msg = str(e)
# Spezifische Fehlerbehandlung
if "Connection refused" in error_msg:
hardware_logger.error(f"❌ Verbindung zu {ip} verweigert - Steckdose antwortet nicht auf Port 80")
elif "timeout" in error_msg.lower():
hardware_logger.error(f"❌ Zeitüberschreitung bei Verbindung zu {ip}")
elif "handshake" in error_msg.lower():
hardware_logger.error(f"❌ Tapo-Handshake fehlgeschlagen für {ip} - Möglicherweise falsche Credentials")
elif "login" in error_msg.lower():
hardware_logger.error(f"❌ Tapo-Login fehlgeschlagen für {ip} - Benutzername/Passwort prüfen")
else:
hardware_logger.error(f"❌ Fehler beim Schalten der Steckdose {ip}: {e}")
if retry_count < max_retries:
hardware_logger.info(f"🔄 Warte 2 Sekunden vor erneutem Versuch...")
time.sleep(2)
hardware_logger.error(f"❌ Alle {max_retries} Versuche für Steckdose {ip} fehlgeschlagen")
return False
def _drucker_status_pruefen(self, drucker: Printer) -> str:
"""Prüft den aktuellen Status eines Druckers"""
if not drucker.plug_ip:
return 'unknown'
# Ping-Test zur Steckdose
if self._ping_test(drucker.plug_ip):
return 'online'
else:
return 'offline'
def _ping_test(self, ip: str, timeout: int = 3) -> bool:
"""Einfacher Ping-Test zu einer IP"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((ip, 80))
sock.close()
return result == 0
except:
return False
def _erweiterte_netzwerk_prüfung(self, ip: str) -> bool:
"""
Erweiterte Netzwerk-Prüfung mit mehreren Tests.
Args:
ip: IP-Adresse zum Prüfen
Returns:
bool: True wenn erreichbar
"""
hardware_logger.debug(f"🔍 Erweiterte Netzwerk-Prüfung für {ip}")
# Test 1: Port 80 (HTTP)
if self._ping_test(ip, timeout=2):
hardware_logger.debug(f"{ip} auf Port 80 erreichbar")
return True
# Test 2: Port 9999 (Tapo-spezifisch für manche Modelle)
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
result = sock.connect_ex((ip, 9999))
sock.close()
if result == 0:
hardware_logger.debug(f"{ip} auf Port 9999 erreichbar")
return True
except:
pass
# Test 3: ICMP Ping (falls verfügbar)
try:
import subprocess
# Windows und Linux kompatibel
param = '-n' if os.name == 'nt' else '-c'
command = ['ping', param, '1', '-w', '2000', ip]
result = subprocess.run(command, capture_output=True, text=True, timeout=3)
if result.returncode == 0:
hardware_logger.debug(f"{ip} via ICMP Ping erreichbar")
return True
except:
pass
hardware_logger.warning(f"⚠️ {ip} ist über keine Methode erreichbar")
return False
def _status_log_erstellen(self, drucker_id: int, action: str, grund: str):
"""Erstellt einen Eintrag im Status-Log"""
try:
with get_db_session() as session:
log_entry = PlugStatusLog(
printer_id=drucker_id,
timestamp=datetime.now(),
action=action,
reason=grund,
success=True
)
session.add(log_entry)
session.commit()
except Exception as e:
hardware_logger.warning(f"⚠️ Status-Log konnte nicht erstellt werden: {e}")
def _drucker_state_aktualisieren(self, drucker: Printer):
"""Aktualisiert den internen Backend-State"""
self.drucker_stati[drucker.id] = {
'name': drucker.name,
'status': drucker.status,
'last_update': datetime.now(),
'plug_ip': drucker.plug_ip
}
def _template_daten_sammeln(self) -> Dict[str, Any]:
"""Wrapper für template_daten_sammeln (Backward-Compatibility)"""
return self.template_daten_sammeln()
def check_outlet_status(self, ip: str, printer_id: Optional[int] = None) -> tuple:
"""
Prüft den Status einer Tapo-Steckdose.
Args:
ip: IP-Adresse der Steckdose
printer_id: Optional - ID des Druckers (für Legacy-Kompatibilität)
Returns:
tuple: (reachable: bool, status: str) - Legacy-Format für Kompatibilität
"""
hardware_logger.debug(f"🔍 Prüfe Steckdosen-Status: {ip}" + (f" (Drucker ID: {printer_id})" if printer_id else ""))
if not TAPO_AVAILABLE:
# Legacy-Format: (reachable, status)
return (True, 'online')
# Zuerst Netzwerk-Erreichbarkeit prüfen
if not self._erweiterte_netzwerk_prüfung(ip):
hardware_logger.warning(f"⚠️ Steckdose {ip} ist im Netzwerk nicht erreichbar")
return (False, 'unreachable')
retry_count = 0
max_retries = 2
while retry_count < max_retries:
try:
# Tapo P100/P110 Verbindung
p100 = PyP100(ip, self.tapo_username, self.tapo_password)
p100.handshake()
p100.login()
# Device Info abrufen
device_info = p100.getDeviceInfo()
if device_info and 'error_code' in device_info:
if device_info['error_code'] == 0:
device_on = device_info.get('result', {}).get('device_on', False)
hardware_logger.debug(f"✅ Steckdose {ip}: {'EIN' if device_on else 'AUS'}")
# Legacy-Format: (reachable, status)
return (True, 'on' if device_on else 'off')
else:
hardware_logger.warning(f"⚠️ Steckdose {ip} Error Code: {device_info['error_code']}")
return (False, 'error')
else:
hardware_logger.error(f"❌ Steckdose {ip}: Keine gültige Antwort")
return (False, 'unreachable')
except Exception as e:
retry_count += 1
error_msg = str(e)
if "Connection refused" in error_msg:
hardware_logger.error(f"❌ Verbindung zu {ip} verweigert")
elif "timeout" in error_msg.lower():
hardware_logger.error(f"❌ Zeitüberschreitung bei {ip}")
elif "handshake" in error_msg.lower():
hardware_logger.error(f"❌ Handshake-Fehler bei {ip}")
else:
hardware_logger.error(f"❌ Fehler beim Prüfen von Steckdose {ip}: {e}")
if retry_count < max_retries:
time.sleep(1)
return (False, 'unreachable')
def ping_address(self, ip: str, timeout: int = 5) -> bool:
"""
Prüft die Netzwerk-Erreichbarkeit einer IP-Adresse.
Args:
ip: IP-Adresse zum Testen
timeout: Timeout in Sekunden
Returns:
bool: True wenn erreichbar, False sonst
"""
hardware_logger.debug(f"📡 Teste Netzwerk-Erreichbarkeit: {ip}")
try:
# Socket-basierter Ping-Test auf Port 80 (HTTP)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((ip, 80))
sock.close()
is_reachable = (result == 0)
hardware_logger.debug(f"📡 {ip}: {'✅ erreichbar' if is_reachable else '❌ nicht erreichbar'}")
return is_reachable
except Exception as e:
hardware_logger.debug(f"❌ Ping-Test für {ip} fehlgeschlagen: {e}")
return False
def turn_off(self, ip: str, username: str = None, password: str = None, printer_id: int = None) -> bool:
"""
Schaltet eine Tapo-Steckdose aus.
Args:
ip: IP-Adresse der Steckdose
username: Benutzername (wird ignoriert, verwendet interne Credentials)
password: Passwort (wird ignoriert, verwendet interne Credentials)
printer_id: Optional - ID des Druckers für Logging
Returns:
bool: True wenn erfolgreich ausgeschaltet
"""
hardware_logger.debug(f"🔴 Schalte Steckdose aus: {ip}" + (f" (Drucker ID: {printer_id})" if printer_id else ""))
if not TAPO_AVAILABLE:
hardware_logger.info(f"🔄 SIMULATION: Steckdose {ip} ausgeschaltet")
return True
try:
# P100-Verbindung mit internen Credentials
p100 = PyP100(ip, self.tapo_username, self.tapo_password)
p100.handshake()
p100.login()
# Steckdose ausschalten
p100.turnOff()
hardware_logger.info(f"✅ Steckdose {ip} erfolgreich ausgeschaltet")
# Status-Log erstellen falls Drucker-ID verfügbar
if printer_id:
try:
self._status_log_erstellen(printer_id, 'turned_off', 'Startup-Initialisierung')
except:
pass # Fehler beim Logging nicht kritisch
return True
except Exception as e:
hardware_logger.error(f"❌ Fehler beim Ausschalten der Steckdose {ip}: {e}")
return False
def turn_on(self, ip: str, username: str = None, password: str = None, printer_id: int = None) -> bool:
"""
Schaltet eine Tapo-Steckdose ein.
Args:
ip: IP-Adresse der Steckdose
username: Benutzername (wird ignoriert, verwendet interne Credentials)
password: Passwort (wird ignoriert, verwendet interne Credentials)
printer_id: Optional - ID des Druckers für Logging
Returns:
bool: True wenn erfolgreich eingeschaltet
"""
hardware_logger.debug(f"🟢 Schalte Steckdose ein: {ip}" + (f" (Drucker ID: {printer_id})" if printer_id else ""))
if not TAPO_AVAILABLE:
hardware_logger.info(f"🔄 SIMULATION: Steckdose {ip} eingeschaltet")
return True
try:
# P100-Verbindung mit internen Credentials
p100 = PyP100(ip, self.tapo_username, self.tapo_password)
p100.handshake()
p100.login()
# Steckdose einschalten
p100.turnOn()
hardware_logger.info(f"✅ Steckdose {ip} erfolgreich eingeschaltet")
# Status-Log erstellen falls Drucker-ID verfügbar
if printer_id:
try:
self._status_log_erstellen(printer_id, 'turned_on', 'Manuell')
except:
pass # Fehler beim Logging nicht kritisch
return True
except Exception as e:
hardware_logger.error(f"❌ Fehler beim Einschalten der Steckdose {ip}: {e}")
return False
# ===== GLOBALE INSTANZ =====
# Singleton-Pattern für globale Drucker-Steuerung
_drucker_steuerung_instanz = None
def get_drucker_steuerung() -> DruckerSteuerung:
"""
Gibt die globale DruckerSteuerung-Instanz zurück (Singleton).
Returns:
DruckerSteuerung: Die globale Instanz
"""
global _drucker_steuerung_instanz
if _drucker_steuerung_instanz is None:
_drucker_steuerung_instanz = DruckerSteuerung()
return _drucker_steuerung_instanz
# ===== LEGACY-KOMPATIBILITÄT =====
# Backward-Compatibility für bestehenden Code
def get_tapo_controller():
"""Legacy-Funktion für Rückwärtskompatibilität"""
return get_drucker_steuerung()
def toggle_plug(ip: str, state: bool) -> bool:
"""Legacy-Funktion für direktes Steckdosen-Schalten"""
controller = get_drucker_steuerung()
return controller._steckdose_schalten(ip, state)
def get_printer_monitor():
"""Legacy-Funktion für Drucker-Monitoring"""
return get_drucker_steuerung()
# Export für andere Module
__all__ = [
'DruckerSteuerung',
'get_drucker_steuerung',
'get_tapo_controller', # Legacy
'toggle_plug', # Legacy
'get_printer_monitor' # Legacy
]
hardware_logger.info("🚀 Hardware Integration (Backend-Kontrolle) erfolgreich geladen")