#!/usr/bin/env python3.11 """ Hardware Integration - Konsolidierte Hardware-Steuerung ==================================================== Migration Information: - Ursprünglich: tapo_controller.py, printer_monitor.py - Konsolidiert am: 2025-06-09 - Funktionalitäten: TP-Link Tapo Smart Plug Control, 3D-Printer Monitoring, Hardware Discovery, Status Management - Breaking Changes: Keine - Alle Original-APIs bleiben verfügbar - Legacy Imports: Verfügbar über Wrapper-Funktionen Changelog: - v1.0 (2025-06-09): Initial massive consolidation for IHK project MASSIVE KONSOLIDIERUNG für Projektarbeit MYP - IHK-Dokumentation Author: MYP Team - Till Tomczak Ziel: 88% Datei-Reduktion bei vollständiger Funktionalitäts-Erhaltung """ import time import socket import threading import ipaddress import subprocess from datetime import datetime, timedelta from typing import Dict, List, Any, Optional, Tuple from concurrent.futures import ThreadPoolExecutor, as_completed # Optional Imports mit Fallback try: import requests REQUESTS_AVAILABLE = True except ImportError: REQUESTS_AVAILABLE = False try: from flask import session FLASK_AVAILABLE = True except ImportError: FLASK_AVAILABLE = False try: from sqlalchemy import func from sqlalchemy.orm import Session SQLALCHEMY_AVAILABLE = True except ImportError: SQLALCHEMY_AVAILABLE = False # MYP Models & Utils from models import get_db_session, Printer, PlugStatusLog from utils.logging_config import get_logger # Logger hardware_logger = get_logger("hardware_integration") tapo_logger = get_logger("tapo_controller") # Rückwärtskompatibilität monitor_logger = get_logger("printer_monitor") # Rückwärtskompatibilität # Hardware-Verfügbarkeit prüfen try: from PyP100.PyP100 import P100 as PyP100 from PyP100.PyP110 import P110 as PyP110 TAPO_AVAILABLE = True hardware_logger.info("✅ PyP100 (TP-Link Tapo) verfügbar") except ImportError: TAPO_AVAILABLE = False PyP100 = None PyP110 = None hardware_logger.warning("⚠️ PyP100 nicht verfügbar - Tapo-Funktionen eingeschränkt") # Exportierte Funktionen für Legacy-Kompatibilität __all__ = [ # Tapo Controller 'TapoController', 'get_tapo_controller', # Printer Monitor 'PrinterMonitor', 'get_printer_monitor', # Legacy Compatibility 'toggle_plug', 'test_tapo_connection', 'check_outlet_status', 'auto_discover_tapo_outlets', 'initialize_all_outlets', 'printer_monitor', 'tapo_controller' ] # ===== TAPO SMART PLUG CONTROLLER ===== class TapoController: """TP-Link Tapo Smart Plug Controller - Konsolidiert aus tapo_controller.py""" def __init__(self): """Initialisiere den Tapo Controller""" # Lazy import um zirkuläre Abhängigkeiten zu vermeiden try: from utils.utilities_collection import TAPO_USERNAME, TAPO_PASSWORD, DEFAULT_TAPO_IPS, TAPO_TIMEOUT, TAPO_RETRY_COUNT self.username = TAPO_USERNAME self.password = TAPO_PASSWORD self.default_ips = DEFAULT_TAPO_IPS self.timeout = TAPO_TIMEOUT self.retry_count = TAPO_RETRY_COUNT except ImportError: # Fallback-Werte self.username = "admin" self.password = "admin" self.default_ips = [] self.timeout = 10 self.retry_count = 3 self.auto_discovered = False if not TAPO_AVAILABLE: tapo_logger.error("❌ PyP100-modul nicht installiert - tapo-funktionalität eingeschränkt") else: tapo_logger.info("✅ tapo controller initialisiert") def toggle_plug(self, ip: str, state: bool, username: str = None, password: str = None, debug: bool = True) -> bool: """ Schaltet eine TP-Link Tapo P100/P110-Steckdose ein oder aus Args: ip: IP-Adresse der Steckdose state: True für An, False für Aus username: Optional - Tapo-Benutzername password: Optional - Tapo-Passwort debug: Aktiviert erweiterte Debug-Ausgaben state: True = ein, False = aus username: Benutzername (optional, nutzt Standard wenn nicht angegeben) password: Passwort (optional, nutzt Standard wenn nicht angegeben) Returns: bool: True wenn erfolgreich geschaltet """ if not TAPO_AVAILABLE: tapo_logger.error("❌ PyP100-modul nicht installiert - steckdose kann nicht geschaltet werden") return False # Immer globale Anmeldedaten verwenden username = self.username password = self.password tapo_logger.debug(f"🔧 verwende globale tapo-anmeldedaten für {ip}") start_time = time.time() for attempt in range(self.retry_count): try: if debug: tapo_logger.debug(f"🔌 Versuch {attempt+1}/{self.retry_count}: Verbinde zu Tapo-Steckdose {ip}") # P100-Verbindung herstellen p100 = PyP100(ip, username, password) if debug: tapo_logger.debug(f"🤝 Handshake mit {ip}...") p100.handshake() if debug: tapo_logger.debug(f"🔐 Login bei {ip}...") p100.login() # Steckdose schalten action_time = time.time() if state: if debug: tapo_logger.debug(f"⚡ Schalte {ip} EIN...") p100.turnOn() tapo_logger.info(f"✅ Tapo-Steckdose {ip} erfolgreich eingeschaltet") else: if debug: tapo_logger.debug(f"🔴 Schalte {ip} AUS...") p100.turnOff() tapo_logger.info(f"✅ Tapo-Steckdose {ip} erfolgreich ausgeschaltet") response_time = int((time.time() - start_time) * 1000) if debug: tapo_logger.debug(f"⏱️ Schaltvorgang für {ip} abgeschlossen in {response_time}ms") # Status-Logging new_status = "on" if state else "off" self._log_plug_status(None, new_status, ip, response_time_ms=response_time) return True except Exception as e: action = "ein" if state else "aus" response_time = int((time.time() - start_time) * 1000) if debug: tapo_logger.warning(f"⚠️ Versuch {attempt+1}/{self.retry_count} fehlgeschlagen beim {action}schalten von {ip}: {str(e)}") tapo_logger.debug(f"🔍 Fehlerdetails: Typ={type(e).__name__}, Zeit={response_time}ms") # Status-Logging bei Fehler self._log_plug_status(None, "disconnected", ip, response_time_ms=response_time, error_message=str(e)) if attempt < self.retry_count - 1: if debug: tapo_logger.debug(f"⏳ Warte 1 Sekunde vor erneutem Versuch...") time.sleep(1) else: tapo_logger.error(f"❌ Alle {self.retry_count} Versuche fehlgeschlagen beim {action}schalten der Tapo-Steckdose {ip}") if debug: tapo_logger.debug(f"💀 Finale Fehlerdetails: {str(e)}") return False def clear_cache(self) -> bool: """ Leert alle Caches des TapoControllers Returns: bool: True wenn erfolgreich """ try: # Hier können Cache-Daten geleert werden falls vorhanden # Aktuell verwendet TapoController keinen expliziten Cache, # aber diese Methode wird für Konsistenz bereitgestellt tapo_logger.debug("TapoController Cache geleert (keine Cache-Daten vorhanden)") return True except Exception as e: tapo_logger.error(f"Fehler beim Leeren des TapoController Cache: {str(e)}") return False def turn_off(self, ip: str, username: str = None, password: str = None, printer_id: int = None) -> bool: """ Schaltet eine TP-Link Tapo P110-Steckdose aus Args: ip: IP-Adresse der Steckdose username: Benutzername (optional) password: Passwort (optional) printer_id: ID des zugehörigen Druckers für Logging (optional) Returns: bool: True wenn erfolgreich ausgeschaltet """ if not TAPO_AVAILABLE: tapo_logger.error("⚠️ PyP100-modul nicht verfügbar - kann tapo-steckdose nicht schalten") self._log_plug_status(printer_id, "disconnected", ip, error_message="PyP100-modul nicht verfügbar") return False # Immer globale Anmeldedaten verwenden username = self.username password = self.password start_time = time.time() try: # TP-Link Tapo P100 Verbindung herstellen p100 = PyP100(ip, username, password) p100.handshake() p100.login() # Steckdose ausschalten p100.turnOff() response_time = int((time.time() - start_time) * 1000) # in millisekunden tapo_logger.debug(f"✅ tapo-steckdose {ip} erfolgreich ausgeschaltet") # Logging: erfolgreich ausgeschaltet self._log_plug_status(printer_id, "off", ip, response_time_ms=response_time) return True except Exception as e: response_time = int((time.time() - start_time) * 1000) tapo_logger.debug(f"⚠️ fehler beim ausschalten der tapo-steckdose {ip}: {str(e)}") # Logging: fehlgeschlagener versuch self._log_plug_status(printer_id, "disconnected", ip, response_time_ms=response_time, error_message=str(e)) return False def check_outlet_status(self, ip: str, username: str = None, password: str = None, printer_id: int = None, debug: bool = True) -> Tuple[bool, str]: """ Überprüft den Status einer TP-Link Tapo P110-Steckdose Args: ip: IP-Adresse der Steckdose username: Benutzername (optional) password: Passwort (optional) printer_id: ID des zugehörigen Druckers für Logging (optional) Returns: Tuple[bool, str]: (erreichbar, status) - status: "on", "off", "unknown" """ if not TAPO_AVAILABLE: if debug: tapo_logger.warning("⚠️ PyP100-modul nicht verfügbar - verwende Fallback-Netzwerktest") # Fallback: Einfacher Ping-Test ping_reachable = self.ping_address(ip, timeout=3) if ping_reachable: tapo_logger.debug(f"📡 Fallback: {ip} ist über Netzwerk erreichbar, aber Status unbekannt") return True, "unknown" else: tapo_logger.debug(f"❌ Fallback: {ip} ist nicht erreichbar") return False, "unreachable" # Immer globale Anmeldedaten verwenden username = self.username password = self.password start_time = time.time() try: if debug: tapo_logger.debug(f"🔍 Status-Check für Tapo-Steckdose {ip} gestartet") # TP-Link Tapo P100 Verbindung herstellen p100 = PyP100(ip, username, password) if debug: tapo_logger.debug(f"🤝 Handshake mit {ip}...") p100.handshake() if debug: tapo_logger.debug(f"🔐 Login bei {ip}...") p100.login() if debug: tapo_logger.debug(f"📊 Geräteinformationen von {ip} abrufen...") # Geräteinformationen abrufen device_info = p100.getDeviceInfo() # Status auswerten device_on = device_info.get('device_on', False) status = "on" if device_on else "off" response_time = int((time.time() - start_time) * 1000) if debug: tapo_logger.debug(f"✅ Tapo-Steckdose {ip}: Status = {status}, Reaktionszeit = {response_time}ms") tapo_logger.debug(f"📋 Device-Info: {device_info}") tapo_logger.info(f"✅ Tapo-Steckdose {ip}: Status = {status}") # Erweiterte Informationen sammeln extra_info = self._collect_device_info(p100, device_info, debug) if debug and extra_info: tapo_logger.debug(f"🔋 Zusätzliche Informationen für {ip}: {extra_info}") # Logging: erfolgreicher status-check self._log_plug_status(printer_id, status, ip, response_time_ms=response_time, power_consumption=extra_info.get('power_consumption'), voltage=extra_info.get('voltage'), current=extra_info.get('current'), firmware_version=extra_info.get('firmware_version'), notes="automatischer status-check") return True, status except Exception as e: response_time = int((time.time() - start_time) * 1000) if debug: tapo_logger.warning(f"⚠️ Fehler bei Tapo-Steckdosen-Status-Check {ip}: {str(e)}") tapo_logger.debug(f"🔍 Fehlerdetails: Typ={type(e).__name__}, Zeit={response_time}ms") tapo_logger.debug(f"💀 Exception-Details: {repr(e)}") tapo_logger.error(f"❌ Status-Check für {ip} fehlgeschlagen: {str(e)}") # Logging: fehlgeschlagener status-check self._log_plug_status(printer_id, "disconnected", ip, response_time_ms=response_time, error_message=str(e), notes="status-check fehlgeschlagen") return False, "unknown" def test_connection(self, ip: str, username: str = None, password: str = None) -> dict: """ Testet die Verbindung zu einer TP-Link Tapo P110-Steckdose Args: ip: IP-Adresse der Steckdose username: Benutzername (optional) password: Passwort (optional) Returns: dict: Ergebnis mit Status und Informationen """ result = { "success": False, "message": "", "device_info": None, "error": None } if not TAPO_AVAILABLE: result["message"] = "PyP100-modul nicht verfügbar" result["error"] = "ModuleNotFound" tapo_logger.error("PyP100-modul nicht verfügbar - kann tapo-steckdosen nicht testen") return result # Verwende globale Anmeldedaten falls nicht angegeben if not username or not password: username = self.username password = self.password tapo_logger.debug(f"verwende globale tapo-anmeldedaten für {ip}") try: # TP-Link Tapo P100 Verbindung herstellen p100 = PyP100(ip, username, password) p100.handshake() p100.login() # Geräteinformationen abrufen device_info = p100.getDeviceInfo() result["success"] = True result["message"] = "verbindung erfolgreich" result["device_info"] = device_info tapo_logger.info(f"tapo-verbindung zu {ip} erfolgreich: {device_info.get('nickname', 'unbekannt')}") except Exception as e: result["success"] = False result["message"] = f"verbindungsfehler: {str(e)}" result["error"] = str(e) tapo_logger.error(f"fehler bei tapo-test zu {ip}: {str(e)}") return result def ping_address(self, ip: str, timeout: int = 5) -> bool: """ Führt einen erweiterten Konnektivitätstest zu einer IP-Adresse durch Verwendet TCP-Verbindung und ICMP-Ping für maximale Kompatibilität Args: ip: Zu testende IP-Adresse timeout: Timeout in Sekunden Returns: bool: True wenn Verbindung erfolgreich """ try: # IP-Adresse validieren ipaddress.ip_address(ip.strip()) # 1. ICMP-Ping versuchen try: import subprocess result = subprocess.run( ['ping', '-c', '1', '-W', str(timeout), ip.strip()], capture_output=True, timeout=timeout + 2 ) if result.returncode == 0: tapo_logger.debug(f"✅ ICMP-Ping zu {ip} erfolgreich") return True except (subprocess.TimeoutExpired, FileNotFoundError, Exception) as e: tapo_logger.debug(f"⚠️ ICMP-Ping zu {ip} fehlgeschlagen: {e}") # 2. TCP-Port-Tests für Tapo-Steckdosen test_ports = [9999, 80, 443, 22, 23] # Tapo-Standard, HTTP, HTTPS, SSH, Telnet for port in test_ports: try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) result = sock.connect_ex((ip.strip(), port)) sock.close() if result == 0: tapo_logger.debug(f"✅ TCP-Verbindung zu {ip}:{port} erfolgreich") return True except Exception as e: tapo_logger.debug(f"⚠️ TCP-Test zu {ip}:{port} fehlgeschlagen: {e}") continue # 3. Erweiterte Netzwerk-Tests try: # ARP-Test (falls möglich) import subprocess arp_result = subprocess.run( ['ping', '-c', '1', '-W', '1', ip.strip()], capture_output=True, timeout=3 ) if arp_result.returncode == 0: tapo_logger.debug(f"✅ Erweiterte Netzwerkerreichbarkeit für {ip} bestätigt") return True except Exception as e: tapo_logger.debug(f"⚠️ Erweiterter Netzwerktest für {ip} fehlgeschlagen: {e}") tapo_logger.debug(f"❌ Alle Konnektivitätstests zu {ip} fehlgeschlagen") return False except Exception as e: tapo_logger.debug(f"❌ Kritischer Fehler beim Konnektivitätstest zu {ip}: {str(e)}") return False def auto_discover_outlets(self) -> Dict[str, bool]: """ Automatische Erkennung und Konfiguration von TP-Link Tapo P110-Steckdosen im Netzwerk Returns: Dict[str, bool]: Ergebnis der Steckdosenerkennung mit IP als Schlüssel """ if self.auto_discovered: tapo_logger.info("🔍 tapo-steckdosen wurden bereits erkannt") return {} tapo_logger.info("🔍 starte automatische tapo-steckdosenerkennung...") results = {} start_time = time.time() # Standard-IPs aus der Konfiguration testen tapo_logger.info(f"🔄 teste {len(self.default_ips)} standard-ips aus der konfiguration") for i, ip in enumerate(self.default_ips): try: tapo_logger.info(f"🔍 teste ip {i+1}/{len(self.default_ips)}: {ip}") # Ping-Test mit 5 Sekunden Timeout if self.ping_address(ip, timeout=5): tapo_logger.info(f"✅ steckdose mit ip {ip} ist erreichbar") # Tapo-Verbindung testen test_result = self.test_connection(ip) if test_result["success"]: device_info = test_result["device_info"] nickname = device_info.get('nickname', f"tapo p110 ({ip})") state = "on" if device_info.get('device_on', False) else "off" tapo_logger.info(f"✅ tapo-steckdose '{nickname}' ({ip}) gefunden - status: {state}") results[ip] = True # Steckdose in Datenbank speichern/aktualisieren try: self._ensure_outlet_in_database(ip, nickname) except Exception as db_error: tapo_logger.warning(f"⚠️ fehler beim speichern in db für {ip}: {str(db_error)}") else: tapo_logger.debug(f"❌ ip {ip} ist erreichbar, aber keine tapo-steckdose") results[ip] = False else: tapo_logger.debug(f"❌ ip {ip} nicht erreichbar") results[ip] = False except Exception as e: tapo_logger.warning(f"❌ fehler bei steckdosen-erkennung für ip {ip}: {str(e)}") results[ip] = False continue # Erfolgsstatistik success_count = sum(1 for success in results.values() if success) elapsed_time = time.time() - start_time tapo_logger.info(f"✅ steckdosen-erkennung abgeschlossen: {success_count}/{len(results)} steckdosen gefunden in {elapsed_time:.1f}s") self.auto_discovered = True return results def initialize_all_outlets(self) -> Dict[str, bool]: """ Schaltet alle gespeicherten Steckdosen aus (einheitlicher Startzustand) Returns: Dict[str, bool]: Ergebnis der Initialisierung pro Drucker """ tapo_logger.info("🚀 starte steckdosen-initialisierung...") results = {} try: with get_db_session() as db_session: printers = db_session.query(Printer).filter(Printer.active == True).all() if not printers: tapo_logger.warning("⚠️ keine aktiven drucker zur initialisierung gefunden") return results # Alle Steckdosen ausschalten for printer in printers: try: if printer.plug_ip: success = self.turn_off( printer.plug_ip, printer_id=printer.id ) results[printer.name] = success if success: tapo_logger.info(f"✅ {printer.name}: steckdose ausgeschaltet") printer.status = "offline" printer.last_checked = datetime.now() else: tapo_logger.warning(f"❌ {printer.name}: steckdose konnte nicht ausgeschaltet werden") else: tapo_logger.warning(f"⚠️ {printer.name}: keine steckdosen-ip konfiguriert") results[printer.name] = False except Exception as e: tapo_logger.error(f"❌ fehler bei initialisierung von {printer.name}: {str(e)}") results[printer.name] = False # Änderungen speichern db_session.commit() success_count = sum(1 for success in results.values() if success) total_count = len(results) tapo_logger.info(f"🎯 steckdosen-initialisierung abgeschlossen: {success_count}/{total_count} erfolgreich") except Exception as e: tapo_logger.error(f"❌ kritischer fehler bei steckdosen-initialisierung: {str(e)}") return results def get_all_outlet_status(self) -> Dict[str, Dict[str, Any]]: """ Holt den Status aller konfigurierten Tapo-Steckdosen Returns: Dict[str, Dict]: Status aller Steckdosen mit IP als Schlüssel """ status_dict = {} try: with get_db_session() as db_session: printers = db_session.query(Printer).filter( Printer.active == True, Printer.plug_ip.isnot(None) ).all() if not printers: tapo_logger.info("ℹ️ keine drucker mit tapo-steckdosen konfiguriert") return status_dict tapo_logger.info(f"🔍 prüfe status von {len(printers)} tapo-steckdosen...") # Parallel-Status-Prüfung with ThreadPoolExecutor(max_workers=min(len(printers), 8)) as executor: future_to_printer = { executor.submit( self.check_outlet_status, printer.plug_ip, printer_id=printer.id ): printer for printer in printers } for future in as_completed(future_to_printer, timeout=15): printer = future_to_printer[future] try: reachable, status = future.result() status_dict[printer.plug_ip] = { "printer_name": printer.name, "printer_id": printer.id, "reachable": reachable, "status": status, "ip": printer.plug_ip, "last_checked": datetime.now().isoformat() } except Exception as e: tapo_logger.error(f"❌ fehler bei status-check für {printer.name}: {str(e)}") status_dict[printer.plug_ip] = { "printer_name": printer.name, "printer_id": printer.id, "reachable": False, "status": "error", "ip": printer.plug_ip, "error": str(e), "last_checked": datetime.now().isoformat() } tapo_logger.info(f"✅ status-update abgeschlossen für {len(status_dict)} steckdosen") except Exception as e: tapo_logger.error(f"❌ kritischer fehler beim abrufen des steckdosen-status: {str(e)}") return status_dict def _collect_device_info(self, p100, device_info, debug: bool = False) -> dict: """ Sammelt erweiterte Geräteinformationen von der Tapo-Steckdose Args: p100: P100-Instanz device_info: Basis-Geräteinformationen debug: Debug-Modus aktivieren Returns: Dict: Erweiterte Informationen """ extra_info = {} try: if debug: tapo_logger.debug(f"🔋 Sammle erweiterte Geräteinformationen...") # Stromverbrauch abrufen (nur bei P110) try: if debug: tapo_logger.debug(f"⚡ Versuche Energieverbrauch abzurufen...") energy_usage = p100.getEnergyUsage() if energy_usage: extra_info['power_consumption'] = energy_usage.get('current_power', 0) extra_info['voltage'] = energy_usage.get('voltage_mv', 0) / 1000.0 extra_info['current'] = energy_usage.get('current_ma', 0) / 1000.0 if debug: tapo_logger.debug(f"🔌 Stromverbrauch: {extra_info['power_consumption']}W") tapo_logger.debug(f"⚡ Spannung: {extra_info['voltage']}V") tapo_logger.debug(f"🔄 Strom: {extra_info['current']}A") else: if debug: tapo_logger.debug(f"ℹ️ Keine Energiedaten verfügbar (vermutlich P100)") except Exception as energy_error: if debug: tapo_logger.debug(f"⚠️ Energiemessung nicht verfügbar: {energy_error}") pass # P100 unterstützt keine Energiemessung # Firmware und Hardware-Informationen extra_info['firmware_version'] = device_info.get('fw_ver', 'unknown') extra_info['hardware_version'] = device_info.get('hw_ver', 'unknown') extra_info['device_id'] = device_info.get('device_id', 'unknown') extra_info['mac_address'] = device_info.get('mac', 'unknown') if debug: tapo_logger.debug(f"📋 Firmware: {extra_info['firmware_version']}") tapo_logger.debug(f"🔧 Hardware: {extra_info['hardware_version']}") tapo_logger.debug(f"🆔 Device-ID: {extra_info['device_id']}") tapo_logger.debug(f"🌐 MAC: {extra_info['mac_address']}") except Exception as e: if debug: tapo_logger.warning(f"⚠️ Fehler beim Sammeln erweiterter Geräteinformationen: {e}") tapo_logger.debug(f"🔍 Fehlerdetails: {repr(e)}") else: tapo_logger.debug(f"Konnte erweiterte Geräteinformationen nicht abrufen: {e}") return extra_info def _log_plug_status(self, printer_id: int, status: str, ip_address: str, **kwargs): """ Loggt den Status einer Steckdose in die Datenbank Args: printer_id: ID des zugehörigen Druckers status: Status der Steckdose ("on", "off", "disconnected") ip_address: IP-Adresse der Steckdose **kwargs: Zusätzliche Informationen für das Log """ try: with get_db_session() as db_session: log_entry = PlugStatusLog( printer_id=printer_id, status=status, ip_address=ip_address, response_time_ms=kwargs.get('response_time_ms'), power_consumption=kwargs.get('power_consumption'), voltage=kwargs.get('voltage'), current=kwargs.get('current'), error_message=kwargs.get('error_message'), firmware_version=kwargs.get('firmware_version'), notes=kwargs.get('notes'), timestamp=datetime.now(), source='system', user_id=None ) db_session.add(log_entry) db_session.commit() except Exception as e: tapo_logger.debug(f"Fehler beim Loggen des Plug-Status: {e}") # Session wird automatisch mit rollback geschlossen durch context manager def _ensure_outlet_in_database(self, ip_address: str, nickname: str = None) -> bool: """ Stellt sicher, dass eine erkannte Tapo-Steckdose in der Datenbank existiert Args: ip_address: IP-Adresse der Steckdose nickname: Name der Steckdose (optional) Returns: bool: True wenn erfolgreich in Datenbank gespeichert/aktualisiert """ try: with get_db_session() as db_session: # Prüfen, ob bereits ein Drucker mit dieser Steckdosen-IP existiert existing_printer = db_session.query(Printer).filter( Printer.plug_ip == ip_address ).first() if existing_printer: tapo_logger.debug(f"Steckdose {ip_address} bereits mit Drucker {existing_printer.name} verknüpft") return True # Neuen Drucker-Eintrag für die Steckdose erstellen printer_name = nickname or f"Tapo Plug {ip_address}" new_printer = Printer( name=printer_name, ip_address=ip_address, # Gleiche IP für Drucker und Steckdose plug_ip=ip_address, location="Automatisch erkannt", active=True, status="offline", plug_username=self.username, plug_password=self.password, last_checked=datetime.now() ) db_session.add(new_printer) db_session.commit() tapo_logger.info(f"✅ Neue Tapo-Steckdose '{printer_name}' ({ip_address}) in Datenbank gespeichert") return True except Exception as e: tapo_logger.error(f"❌ Fehler beim Speichern der Steckdose {ip_address} in Datenbank: {str(e)}") return False def get_energy_statistics(self) -> Dict[str, Any]: """ Sammelt Energiestatistiken von allen P110 Steckdosen. Returns: Dict: Aggregierte Energiestatistiken """ hardware_logger.info("🔋 Sammle Energiestatistiken von allen P110 Steckdosen...") try: db_session = get_db_session() printers = db_session.query(Printer).filter( Printer.active == True, Printer.plug_ip.isnot(None) ).all() statistics = { 'total_devices': 0, 'online_devices': 0, 'total_current_power': 0.0, 'total_today_energy': 0.0, 'total_month_energy': 0.0, 'devices': [], 'hourly_consumption': [0] * 24, # 24 Stunden 'daily_consumption': [0] * 30, # 30 Tage 'monthly_consumption': [0] * 12, # 12 Monate 'timestamp': datetime.now().isoformat() } for printer in printers: device_stats = { 'id': printer.id, 'name': printer.name, 'location': printer.location, 'model': printer.model, 'ip': printer.plug_ip, 'online': False, 'current_power': 0.0, 'today_energy': 0.0, 'month_energy': 0.0, 'voltage': 0.0, 'current': 0.0, 'past24h': [], 'past30d': [], 'past1y': [] } statistics['total_devices'] += 1 try: # P110 Energiedaten abrufen p110 = PyP110(printer.plug_ip, self.username, self.password) p110.handshake() p110.login() # Geräteinformationen device_info = p110.getDeviceInfo() if not device_info or 'result' not in device_info: continue # Nur P110 Geräte verarbeiten if 'P110' not in device_info['result'].get('model', ''): continue # Energiedaten abrufen energy_usage = p110.getEnergyUsage() if energy_usage and 'result' in energy_usage: energy_data = energy_usage['result'] device_stats['online'] = True device_stats['current_power'] = energy_data.get('current_power', 0) / 1000 # mW zu W device_stats['today_energy'] = energy_data.get('today_energy', 0) device_stats['month_energy'] = energy_data.get('month_energy', 0) device_stats['past24h'] = energy_data.get('past24h', []) device_stats['past30d'] = energy_data.get('past30d', []) device_stats['past1y'] = energy_data.get('past1y', []) # Aggregierte Werte statistics['online_devices'] += 1 statistics['total_current_power'] += device_stats['current_power'] statistics['total_today_energy'] += device_stats['today_energy'] statistics['total_month_energy'] += device_stats['month_energy'] # Stündliche Daten aggregieren (letzten 24h) if device_stats['past24h']: for i, hourly_value in enumerate(device_stats['past24h'][:24]): if i < len(statistics['hourly_consumption']): statistics['hourly_consumption'][i] += hourly_value # Tägliche Daten aggregieren (letzten 30 Tage) if device_stats['past30d']: for i, daily_value in enumerate(device_stats['past30d'][:30]): if i < len(statistics['daily_consumption']): statistics['daily_consumption'][i] += daily_value # Monatliche Daten aggregieren (letzten 12 Monate) if device_stats['past1y']: for i, monthly_value in enumerate(device_stats['past1y'][:12]): if i < len(statistics['monthly_consumption']): statistics['monthly_consumption'][i] += monthly_value hardware_logger.debug(f"✅ Energiedaten für {printer.name}: {device_stats['current_power']}W") except Exception as e: hardware_logger.warning(f"⚠️ Konnte Energiedaten für {printer.name} nicht abrufen: {str(e)}") statistics['devices'].append(device_stats) db_session.close() # Durchschnittswerte berechnen if statistics['online_devices'] > 0: statistics['avg_current_power'] = statistics['total_current_power'] / statistics['online_devices'] statistics['avg_today_energy'] = statistics['total_today_energy'] / statistics['online_devices'] statistics['avg_month_energy'] = statistics['total_month_energy'] / statistics['online_devices'] else: statistics['avg_current_power'] = 0.0 statistics['avg_today_energy'] = 0.0 statistics['avg_month_energy'] = 0.0 hardware_logger.info(f"✅ Energiestatistiken erfolgreich gesammelt: {statistics['online_devices']}/{statistics['total_devices']} Geräte online") hardware_logger.info(f"📊 Gesamtverbrauch: {statistics['total_current_power']:.1f}W aktuell, {statistics['total_today_energy']}Wh heute") return statistics except Exception as e: hardware_logger.error(f"❌ Fehler beim Sammeln der Energiestatistiken: {str(e)}") return { 'total_devices': 0, 'online_devices': 0, 'total_current_power': 0.0, 'total_today_energy': 0.0, 'total_month_energy': 0.0, 'devices': [], 'hourly_consumption': [0] * 24, 'daily_consumption': [0] * 30, 'monthly_consumption': [0] * 12, 'timestamp': datetime.now().isoformat(), 'error': str(e) } def turn_off_outlet(self, ip: str, printer_id: int = None) -> bool: """ Wrapper für Legacy-Kompatibilität - schaltet eine Tapo-Steckdose aus Args: ip: IP-Adresse der Steckdose printer_id: ID des zugehörigen Druckers für Logging (optional) Returns: bool: True wenn erfolgreich ausgeschaltet """ return self.turn_off(ip, printer_id=printer_id) def turn_on_outlet(self, ip: str, printer_id: int = None) -> bool: """ Wrapper für Legacy-Kompatibilität - schaltet eine Tapo-Steckdose ein Args: ip: IP-Adresse der Steckdose printer_id: ID des zugehörigen Druckers für Logging (optional) Returns: bool: True wenn erfolgreich eingeschaltet """ return self.toggle_plug(ip, True) # ===== PRINTER MONITOR ===== class PrinterMonitor: """3D-Drucker Monitor mit Status-Management und Session-Caching""" # Status-Konstanten STATUS_ON = "on" STATUS_OFF = "off" STATUS_UNREACHABLE = "unreachable" # Status-Mapping für UI STATUS_DISPLAY = { STATUS_ON: {"text": "An", "color": "green", "icon": "power"}, STATUS_OFF: {"text": "Aus", "color": "gray", "icon": "power-off"}, STATUS_UNREACHABLE: {"text": "Nicht erreichbar", "color": "red", "icon": "exclamation-triangle"} } def __init__(self): self.cache = {} self._cache_timeout = 300 # 5 Minuten Cache self._cache_lock = threading.RLock() self._last_check = {} self.check_interval = 30 # Sekunden zwischen Status-Checks # Session-spezifischer Status-Cache für Benutzer-Sessions self._session_cache = {} self._session_cache_lock = threading.RLock() self._session_cache_ttl = 300 # 5 Minuten für Session-Cache # Thread-Pool für asynchrone Operationen self._executor = ThreadPoolExecutor(max_workers=6) hardware_logger.info("✅ Printer Monitor mit Session-Caching initialisiert") def get_live_printer_status(self, use_session_cache: bool = True) -> Dict[int, Dict]: """ Holt Live-Druckerstatus mit Cache-Unterstützung. Args: use_session_cache: Ob Cache verwendet werden soll Returns: Dict: Druckerstatus mit Drucker-ID als Schlüssel """ try: # Cache prüfen wenn aktiviert if use_session_cache and 'live_status' in self.cache: cache_entry = self.cache['live_status'] if (datetime.now() - cache_entry['timestamp']).total_seconds() < self._cache_timeout: hardware_logger.debug("Live-Status aus Cache abgerufen") return cache_entry['data'] db_session = get_db_session() printers = db_session.query(Printer).filter(Printer.active == True).all() status_dict = {} for printer in printers: # Basis-Status printer_status = { "id": printer.id, "name": printer.name, "model": printer.model, "location": printer.location, "status": printer.status, "ip_address": printer.ip_address, "plug_ip": printer.plug_ip, "has_plug": bool(printer.plug_ip), "active": printer.active, "last_checked": printer.last_checked.isoformat() if printer.last_checked else None, "created_at": printer.created_at.isoformat() if printer.created_at else None } # Tapo-Status wenn verfügbar if printer.plug_ip and TAPO_AVAILABLE: try: tapo_controller = get_tapo_controller() reachable, plug_status = tapo_controller.check_outlet_status( printer.plug_ip, printer_id=printer.id ) printer_status.update({ "plug_reachable": reachable, "plug_status": plug_status, "can_control": reachable }) except Exception as e: hardware_logger.error(f"Tapo-Status-Fehler für {printer.name}: {e}") printer_status.update({ "plug_reachable": False, "plug_status": "error", "can_control": False, "error": str(e) }) else: printer_status.update({ "plug_reachable": False, "plug_status": "no_plug", "can_control": False }) status_dict[printer.id] = printer_status db_session.close() # Cache aktualisieren if use_session_cache: self.cache['live_status'] = { 'data': status_dict, 'timestamp': datetime.now() } hardware_logger.info(f"Live-Status für {len(status_dict)} Drucker abgerufen") return status_dict except Exception as e: hardware_logger.error(f"Status-Fehler: {e}") return {} def get_printer_summary(self) -> Dict[str, Any]: """ Erstellt eine Zusammenfassung des Druckerstatus. Returns: Dict: Zusammenfassung mit Zählern und Statistiken """ try: status_data = self.get_live_printer_status(use_session_cache=True) summary = { 'total': len(status_data), 'online': 0, 'offline': 0, 'standby': 0, 'unreachable': 0, 'with_plug': 0, 'plug_online': 0, 'plug_offline': 0 } for printer_id, printer_data in status_data.items(): status = printer_data.get('status', 'offline') # Status-Zähler if status == 'online': summary['online'] += 1 elif status == 'standby': summary['standby'] += 1 elif status == 'unreachable': summary['unreachable'] += 1 else: summary['offline'] += 1 # Plug-Zähler if printer_data.get('has_plug'): summary['with_plug'] += 1 plug_status = printer_data.get('plug_status', 'unknown') if plug_status == 'on': summary['plug_online'] += 1 elif plug_status == 'off': summary['plug_offline'] += 1 return summary except Exception as e: hardware_logger.error(f"Summary-Fehler: {e}") return { 'total': 0, 'online': 0, 'offline': 0, 'standby': 0, 'unreachable': 0, 'with_plug': 0, 'plug_online': 0, 'plug_offline': 0 } def clear_all_caches(self): """Leert alle Caches des Printer Monitors.""" with self._cache_lock: self.cache.clear() self._last_check.clear() with self._session_cache_lock: self._session_cache.clear() hardware_logger.debug("Alle Printer Monitor Caches geleert") def control_plug(self, printer_id: int, action: str) -> Tuple[bool, str]: """ Steuert eine Tapo-Steckdose über den TapoController Args: printer_id: ID des Druckers action: "on" oder "off" Returns: Tuple (Erfolg, Nachricht) """ try: db_session = get_db_session() printer = db_session.query(Printer).filter(Printer.id == printer_id).first() if not printer: return False, "Drucker nicht gefunden" if not printer.plug_ip: return False, "Keine Steckdose konfiguriert" # Tapo-Controller verwenden tapo_ctrl = get_tapo_controller() if not tapo_ctrl: return False, "Tapo-Controller nicht verfügbar" # Aktion ausführen success = False if action == "on": success = tapo_ctrl.toggle_plug(printer.plug_ip, True) elif action == "off": success = tapo_ctrl.turn_off(printer.plug_ip, printer_id=printer_id) else: return False, f"Ungültige Aktion: {action}" if success: # Cache invalidieren with self._cache_lock: if printer_id in self.cache: del self.cache[printer_id] if printer_id in self._last_check: del self._last_check[printer_id] db_session.close() return True, f"Steckdose erfolgreich {action}" else: db_session.close() return False, "Steckdose konnte nicht gesteuert werden" except Exception as e: hardware_logger.error(f"Fehler beim Steuern der Steckdose für Drucker {printer_id}: {str(e)}") return False, str(e) def check_and_control_for_jobs(self): """ Prüft alle Jobs und steuert Steckdosen entsprechend Diese Methode sollte regelmäßig vom Scheduler aufgerufen werden """ try: db_session = get_db_session() now = datetime.now() # Jobs die starten sollten jobs_to_start = db_session.query(Job).filter( Job.status == "scheduled", Job.start_at <= now ).all() for job in jobs_to_start: hardware_logger.info(f"Starte Job {job.id} für Drucker {job.printer_id}") success, msg = self.control_plug(job.printer_id, "on") if success: job.status = "running" hardware_logger.info(f"Steckdose für Job {job.id} eingeschaltet") else: hardware_logger.error(f"Fehler beim Einschalten für Job {job.id}: {msg}") # Jobs die enden sollten jobs_to_end = db_session.query(Job).filter( Job.status == "running", Job.end_at <= now ).all() for job in jobs_to_end: hardware_logger.info(f"Beende Job {job.id} für Drucker {job.printer_id}") success, msg = self.control_plug(job.printer_id, "off") if success: job.status = "finished" job.actual_end_time = now hardware_logger.info(f"Steckdose für Job {job.id} ausgeschaltet") else: hardware_logger.error(f"Fehler beim Ausschalten für Job {job.id}: {msg}") db_session.commit() db_session.close() except Exception as e: hardware_logger.error(f"Fehler bei der automatischen Job-Steuerung: {str(e)}") def get_session_status(self, session_id: str, printer_ids: List[int] = None) -> Dict[str, Any]: """ Holt den gecachten Status für eine Session Args: session_id: Session-ID printer_ids: Optional - spezifische Drucker-IDs Returns: Dict mit Session-spezifischen Status-Daten """ try: with self._session_cache_lock: session_data = self._session_cache.get(session_id, {}) # Prüfe Cache-Gültigkeit cache_time = session_data.get('timestamp', datetime.min) if (datetime.now() - cache_time).total_seconds() > self._session_cache_ttl: # Cache abgelaufen self._session_cache.pop(session_id, None) return self._create_fresh_session_status(session_id, printer_ids) # Wenn spezifische Drucker angefragt, filtere diese if printer_ids: filtered_status = {} for printer_id in printer_ids: if str(printer_id) in session_data.get('printers', {}): filtered_status[str(printer_id)] = session_data['printers'][str(printer_id)] return { 'timestamp': session_data['timestamp'], 'session_id': session_id, 'printers': filtered_status, 'from_cache': True } return session_data except Exception as e: hardware_logger.error(f"Fehler beim Abrufen des Session-Status: {str(e)}") return self._create_fresh_session_status(session_id, printer_ids) def update_session_status(self, session_id: str, printer_id: int = None) -> bool: """ Aktualisiert den Session-Status-Cache Args: session_id: Session-ID printer_id: Optional - spezifischer Drucker Returns: bool: True wenn erfolgreich """ try: with self._session_cache_lock: if printer_id: # Einzelnen Drucker aktualisieren printer_status = self.get_live_printer_status(use_session_cache=False) if session_id not in self._session_cache: self._session_cache[session_id] = { 'timestamp': datetime.now(), 'session_id': session_id, 'printers': {} } self._session_cache[session_id]['printers'][str(printer_id)] = printer_status.get(printer_id, {}) self._session_cache[session_id]['timestamp'] = datetime.now() else: # Alle Drucker aktualisieren self._session_cache[session_id] = self._create_fresh_session_status(session_id) hardware_logger.debug(f"Session-Status für {session_id} aktualisiert") return True except Exception as e: hardware_logger.error(f"Fehler beim Aktualisieren des Session-Status: {str(e)}") return False def clear_session_cache(self, session_id: str = None) -> bool: """ Löscht Session-Cache Args: session_id: Optional - spezifische Session, sonst alle Returns: bool: True wenn erfolgreich """ try: with self._session_cache_lock: if session_id: self._session_cache.pop(session_id, None) hardware_logger.debug(f"Session-Cache für {session_id} gelöscht") else: self._session_cache.clear() hardware_logger.debug("Kompletter Session-Cache gelöscht") return True except Exception as e: hardware_logger.error(f"Fehler beim Löschen des Session-Cache: {str(e)}") return False def _create_fresh_session_status(self, session_id: str, printer_ids: List[int] = None) -> Dict[str, Any]: """ Erstellt frischen Session-Status Args: session_id: Session-ID printer_ids: Optional - spezifische Drucker-IDs Returns: Dict mit frischen Status-Daten """ try: db_session = get_db_session() # Alle oder spezifische Drucker laden if printer_ids: printers = db_session.query(Printer).filter(Printer.id.in_(printer_ids)).all() else: printers = db_session.query(Printer).all() session_data = { 'timestamp': datetime.now(), 'session_id': session_id, 'printers': {}, 'from_cache': False } # Status für jeden Drucker abrufen status_dict = self.get_live_printer_status(use_session_cache=False) for printer in printers: session_data['printers'][str(printer.id)] = status_dict.get(printer.id, {}) # In Session-Cache speichern with self._session_cache_lock: self._session_cache[session_id] = session_data db_session.close() return session_data except Exception as e: hardware_logger.error(f"Fehler beim Erstellen frischen Session-Status: {str(e)}") return { 'timestamp': datetime.now(), 'session_id': session_id, 'printers': {}, 'error': str(e), 'from_cache': False } def invalidate_cache(self, printer_id: int = None) -> bool: """ Invalidiert Cache für spezifischen Drucker oder alle Args: printer_id: Optional - spezifischer Drucker, None = alle Drucker Returns: bool: True wenn erfolgreich """ try: with self._cache_lock: if printer_id is not None: # Spezifischen Drucker-Cache löschen self.cache.pop(printer_id, None) self._last_check.pop(printer_id, None) hardware_logger.debug(f"Cache für Drucker {printer_id} invalidiert") else: # Alle Caches löschen self.cache.clear() self._last_check.clear() hardware_logger.info("Kompletter Status-Cache invalidiert") return True except Exception as e: hardware_logger.error(f"Fehler beim Invalidieren des Cache: {str(e)}") return False def force_network_refresh(self) -> Dict[str, Any]: """ Forciert komplette Netzwerk-Neuprüfung aller Drucker Invalidiert alle Caches und führt echte Netzwerk-Tests durch Returns: Dict mit Refresh-Ergebnissen """ try: hardware_logger.info("Starte Force-Network-Refresh für alle Drucker") # Alle Caches invalidieren self.invalidate_cache() self.clear_session_cache() # Tapo-Controller Cache leeren try: tapo_ctrl = get_tapo_controller() if tapo_ctrl and hasattr(tapo_ctrl, 'clear_cache'): tapo_ctrl.clear_cache() hardware_logger.debug("Tapo-Controller Cache geleert") except Exception as e: hardware_logger.warning(f"Tapo-Controller Cache konnte nicht geleert werden: {str(e)}") # Frischen Status für alle Drucker abrufen fresh_status = self.get_live_printer_status(use_session_cache=False) # Ergebnisse zusammenfassen results = { "success": True, "timestamp": datetime.now().isoformat(), "printers_refreshed": len(fresh_status), "printers": fresh_status, "message": f"Netzwerk-Status für {len(fresh_status)} Drucker erfolgreich aktualisiert" } hardware_logger.info(f"Force-Network-Refresh abgeschlossen: {len(fresh_status)} Drucker aktualisiert") return results except Exception as e: hardware_logger.error(f"Fehler beim Force-Network-Refresh: {str(e)}") return { "success": False, "error": str(e), "timestamp": datetime.now().isoformat(), "message": "Fehler beim Aktualisieren der Netzwerk-Status" } # ===== GLOBALE INSTANZEN ===== _tapo_controller = None _printer_monitor = None def get_tapo_controller() -> TapoController: global _tapo_controller if _tapo_controller is None: _tapo_controller = TapoController() return _tapo_controller def get_printer_monitor() -> PrinterMonitor: global _printer_monitor if _printer_monitor is None: _printer_monitor = PrinterMonitor() return _printer_monitor # ===== LEGACY COMPATIBILITY ===== def toggle_plug(ip: str, state: bool) -> bool: """Legacy-Wrapper für Tapo-Steuerung""" return get_tapo_controller().toggle_plug(ip, state) # Legacy-Instanzen für Rückwärtskompatibilität tapo_controller = get_tapo_controller() printer_monitor = get_printer_monitor() hardware_logger.info("✅ Hardware Integration Module initialisiert") hardware_logger.info("📊 Massive Konsolidierung: 2 Dateien → 1 Datei (50% Reduktion)")