""" TP-Link Tapo P110 Zentraler Controller für MYP Platform Sammelt alle operativen Tapo-Steckdosen-Funktionalitäten an einem Ort. """ import time import socket import signal import ipaddress from datetime import datetime from typing import Dict, Tuple, Optional, List, Any from concurrent.futures import ThreadPoolExecutor, as_completed from models import get_db_session, Printer, PlugStatusLog from utils.logging_config import get_logger from utils.settings import TAPO_USERNAME, TAPO_PASSWORD, DEFAULT_TAPO_IPS, TAPO_TIMEOUT, TAPO_RETRY_COUNT # TP-Link Tapo P110 Unterstützung prüfen try: from PyP100 import PyP100 TAPO_AVAILABLE = True except ImportError: TAPO_AVAILABLE = False # Logger initialisieren logger = get_logger("tapo_controller") class TapoController: """ Zentraler Controller für alle TP-Link Tapo P110 Operationen. """ def __init__(self): """Initialisiere den Tapo Controller.""" self.username = TAPO_USERNAME self.password = TAPO_PASSWORD self.timeout = TAPO_TIMEOUT self.retry_count = TAPO_RETRY_COUNT self.auto_discovered = False if not TAPO_AVAILABLE: logger.error("❌ PyP100-Modul nicht installiert - Tapo-Funktionalität eingeschränkt") else: logger.info("✅ Tapo Controller initialisiert") def toggle_plug(self, ip: str, state: bool, username: str = None, password: str = None) -> bool: """ Schaltet eine TP-Link Tapo P100/P110-Steckdose ein oder aus. Args: ip: IP-Adresse der Steckdose state: True = Ein, False = Aus username: Benutzername (optional, nutzt Standard wenn nicht angegeben) password: Passwort (optional, nutzt Standard wenn nicht angegeben) Returns: bool: True wenn erfolgreich geschaltet """ if not TAPO_AVAILABLE: logger.error("❌ PyP100-Modul nicht installiert - Steckdose kann nicht geschaltet werden") return False # IMMER globale Anmeldedaten verwenden username = self.username password = self.password logger.debug(f"🔧 Verwende globale Tapo-Anmeldedaten für {ip}") for attempt in range(self.retry_count): try: # P100-Verbindung herstellen p100 = PyP100.P100(ip, username, password) p100.handshake() p100.login() # Steckdose schalten if state: p100.turnOn() logger.info(f"✅ Tapo-Steckdose {ip} erfolgreich eingeschaltet") else: p100.turnOff() logger.info(f"✅ Tapo-Steckdose {ip} erfolgreich ausgeschaltet") return True except Exception as e: action = "ein" if state else "aus" logger.warning(f"⚠️ Versuch {attempt+1}/{self.retry_count} fehlgeschlagen beim {action}schalten von {ip}: {str(e)}") if attempt < self.retry_count - 1: time.sleep(1) # Kurze Pause vor erneutem Versuch else: logger.error(f"❌ Fehler beim {action}schalten der Tapo-Steckdose {ip}: {str(e)}") return False def turn_off(self, ip: str, username: str = None, password: str = None, printer_id: int = None) -> bool: """ Schaltet eine TP-Link Tapo P110-Steckdose aus. Args: ip: IP-Adresse der Steckdose username: Benutzername (optional) password: Passwort (optional) printer_id: ID des zugehörigen Druckers für Logging (optional) Returns: bool: True wenn erfolgreich ausgeschaltet """ if not TAPO_AVAILABLE: logger.error("⚠️ PyP100-Modul nicht verfügbar - kann Tapo-Steckdose nicht schalten") self._log_plug_status(printer_id, "disconnected", ip, error_message="PyP100-Modul nicht verfügbar") return False # IMMER globale Anmeldedaten verwenden username = self.username password = self.password start_time = time.time() try: # TP-Link Tapo P100 Verbindung herstellen p100 = PyP100.P100(ip, username, password) p100.handshake() p100.login() # Steckdose ausschalten p100.turnOff() response_time = int((time.time() - start_time) * 1000) # in Millisekunden logger.debug(f"✅ Tapo-Steckdose {ip} erfolgreich ausgeschaltet") # Logging: Erfolgreich ausgeschaltet self._log_plug_status(printer_id, "off", ip, response_time_ms=response_time) return True except Exception as e: response_time = int((time.time() - start_time) * 1000) logger.debug(f"⚠️ Fehler beim Ausschalten der Tapo-Steckdose {ip}: {str(e)}") # Logging: Fehlgeschlagener Versuch self._log_plug_status(printer_id, "disconnected", ip, response_time_ms=response_time, error_message=str(e)) return False def check_outlet_status(self, ip: str, username: str = None, password: str = None, printer_id: int = None) -> Tuple[bool, str]: """ Überprüft den Status einer TP-Link Tapo P110-Steckdose. Args: ip: IP-Adresse der Steckdose username: Benutzername (optional) password: Passwort (optional) printer_id: ID des zugehörigen Druckers für Logging (optional) Returns: Tuple[bool, str]: (Erreichbar, Status) - Status: "on", "off", "unknown" """ if not TAPO_AVAILABLE: logger.debug("⚠️ PyP100-Modul nicht verfügbar - kann Tapo-Steckdosen-Status nicht abfragen") self._log_plug_status(printer_id, "disconnected", ip, error_message="PyP100-Modul nicht verfügbar", notes="Status-Check fehlgeschlagen") return False, "unknown" # IMMER globale Anmeldedaten verwenden username = self.username password = self.password start_time = time.time() try: # TP-Link Tapo P100 Verbindung herstellen p100 = PyP100.P100(ip, username, password) p100.handshake() p100.login() # Geräteinformationen abrufen device_info = p100.getDeviceInfo() # Status auswerten device_on = device_info.get('device_on', False) status = "on" if device_on else "off" response_time = int((time.time() - start_time) * 1000) logger.debug(f"✅ Tapo-Steckdose {ip}: Status = {status}") # Erweiterte Informationen sammeln extra_info = self._collect_device_info(p100, device_info) # Logging: Erfolgreicher Status-Check self._log_plug_status(printer_id, status, ip, response_time_ms=response_time, power_consumption=extra_info.get('power_consumption'), voltage=extra_info.get('voltage'), current=extra_info.get('current'), firmware_version=extra_info.get('firmware_version'), notes="Automatischer Status-Check") return True, status except Exception as e: response_time = int((time.time() - start_time) * 1000) logger.debug(f"⚠️ Fehler bei Tapo-Steckdosen-Status-Check {ip}: {str(e)}") # Logging: Fehlgeschlagener Status-Check self._log_plug_status(printer_id, "disconnected", ip, response_time_ms=response_time, error_message=str(e), notes="Status-Check fehlgeschlagen") return False, "unknown" def test_connection(self, ip: str, username: str = None, password: str = None) -> dict: """ Testet die Verbindung zu einer TP-Link Tapo P110-Steckdose. Args: ip: IP-Adresse der Steckdose username: Benutzername (optional) password: Passwort (optional) Returns: dict: Ergebnis mit Status und Informationen """ result = { "success": False, "message": "", "device_info": None, "error": None } if not TAPO_AVAILABLE: result["message"] = "PyP100-Modul nicht verfügbar" result["error"] = "ModuleNotFound" logger.error("PyP100-Modul nicht verfügbar - kann Tapo-Steckdosen nicht testen") return result # Verwende globale Anmeldedaten falls nicht angegeben if not username or not password: username = self.username password = self.password logger.debug(f"Verwende globale Tapo-Anmeldedaten für {ip}") try: # TP-Link Tapo P100 Verbindung herstellen p100 = PyP100.P100(ip, username, password) p100.handshake() p100.login() # Geräteinformationen abrufen device_info = p100.getDeviceInfo() result["success"] = True result["message"] = "Verbindung erfolgreich" result["device_info"] = device_info logger.info(f"Tapo-Verbindung zu {ip} erfolgreich: {device_info.get('nickname', 'Unbekannt')}") except Exception as e: result["success"] = False result["message"] = f"Verbindungsfehler: {str(e)}" result["error"] = str(e) logger.error(f"Fehler bei Tapo-Test zu {ip}: {str(e)}") return result def ping_address(self, ip: str, timeout: int = 3) -> bool: """ Führt einen Konnektivitätstest zu einer IP-Adresse durch. Verwendet TCP-Verbindung statt Ping für bessere Kompatibilität. Args: ip: Zu testende IP-Adresse timeout: Timeout in Sekunden Returns: bool: True wenn Verbindung erfolgreich """ try: # IP-Adresse validieren ipaddress.ip_address(ip.strip()) # Standard-Ports für Tapo-Steckdosen testen test_ports = [9999, 80, 443] # Tapo-Standard, HTTP, HTTPS for port in test_ports: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) result = sock.connect_ex((ip.strip(), port)) sock.close() if result == 0: logger.debug(f"✅ Verbindung zu {ip}:{port} erfolgreich") return True logger.debug(f"❌ Keine Verbindung zu {ip} auf Standard-Ports möglich") return False except Exception as e: logger.debug(f"❌ Fehler beim Verbindungstest zu {ip}: {str(e)}") return False def auto_discover_outlets(self) -> Dict[str, bool]: """ Automatische Erkennung und Konfiguration von TP-Link Tapo P110-Steckdosen im Netzwerk. Returns: Dict[str, bool]: Ergebnis der Steckdosenerkennung mit IP als Schlüssel """ if self.auto_discovered: logger.info("🔍 Tapo-Steckdosen wurden bereits erkannt") return {} logger.info("🔍 Starte automatische Tapo-Steckdosenerkennung...") results = {} start_time = time.time() # Standard-IPs aus der Konfiguration testen logger.info(f"🔄 Teste {len(DEFAULT_TAPO_IPS)} Standard-IPs aus der Konfiguration") for i, ip in enumerate(DEFAULT_TAPO_IPS): try: logger.info(f"🔍 Teste IP {i+1}/{len(DEFAULT_TAPO_IPS)}: {ip}") # Schneller Ping-Test if self.ping_address(ip, timeout=2): logger.info(f"✅ Steckdose mit IP {ip} ist erreichbar") # Tapo-Verbindung testen test_result = self.test_connection(ip) if test_result["success"]: device_info = test_result["device_info"] nickname = device_info.get('nickname', f"Tapo P110 ({ip})") state = "on" if device_info.get('device_on', False) else "off" logger.info(f"✅ Tapo-Steckdose '{nickname}' ({ip}) gefunden - Status: {state}") results[ip] = True # Steckdose in Datenbank speichern/aktualisieren try: self._ensure_outlet_in_database(ip, nickname) except Exception as db_error: logger.warning(f"⚠️ Fehler beim Speichern in DB für {ip}: {str(db_error)}") else: logger.debug(f"❌ IP {ip} ist erreichbar, aber keine Tapo-Steckdose") results[ip] = False else: logger.debug(f"❌ IP {ip} nicht erreichbar") results[ip] = False except Exception as e: logger.warning(f"❌ Fehler bei Steckdosen-Erkennung für IP {ip}: {str(e)}") results[ip] = False continue # Erfolgsstatistik success_count = sum(1 for success in results.values() if success) elapsed_time = time.time() - start_time logger.info(f"✅ Steckdosen-Erkennung abgeschlossen: {success_count}/{len(results)} Steckdosen gefunden in {elapsed_time:.1f}s") self.auto_discovered = True return results def initialize_all_outlets(self) -> Dict[str, bool]: """ Schaltet alle gespeicherten Steckdosen aus (einheitlicher Startzustand). Returns: Dict[str, bool]: Ergebnis der Initialisierung pro Drucker """ logger.info("🚀 Starte Steckdosen-Initialisierung...") results = {} try: db_session = get_db_session() printers = db_session.query(Printer).filter(Printer.active == True).all() if not printers: logger.warning("⚠️ Keine aktiven Drucker zur Initialisierung gefunden") db_session.close() return results # Alle Steckdosen ausschalten for printer in printers: try: if printer.plug_ip: success = self.turn_off( printer.plug_ip, printer_id=printer.id ) results[printer.name] = success if success: logger.info(f"✅ {printer.name}: Steckdose ausgeschaltet") printer.status = "offline" printer.last_checked = datetime.now() else: logger.warning(f"❌ {printer.name}: Steckdose konnte nicht ausgeschaltet werden") else: logger.warning(f"⚠️ {printer.name}: Keine Steckdosen-IP konfiguriert") results[printer.name] = False except Exception as e: logger.error(f"❌ Fehler bei Initialisierung von {printer.name}: {str(e)}") results[printer.name] = False # Änderungen speichern db_session.commit() db_session.close() success_count = sum(1 for success in results.values() if success) total_count = len(results) logger.info(f"🎯 Steckdosen-Initialisierung abgeschlossen: {success_count}/{total_count} erfolgreich") except Exception as e: logger.error(f"❌ Kritischer Fehler bei Steckdosen-Initialisierung: {str(e)}") return results def get_all_outlet_status(self) -> Dict[str, Dict[str, Any]]: """ Holt den Status aller konfigurierten Tapo-Steckdosen. Returns: Dict[str, Dict]: Status aller Steckdosen mit IP als Schlüssel """ status_dict = {} try: db_session = get_db_session() printers = db_session.query(Printer).filter( Printer.active == True, Printer.plug_ip.isnot(None) ).all() if not printers: logger.info("ℹ️ Keine Drucker mit Tapo-Steckdosen konfiguriert") db_session.close() return status_dict logger.info(f"🔍 Prüfe Status von {len(printers)} Tapo-Steckdosen...") # Parallel-Status-Prüfung with ThreadPoolExecutor(max_workers=min(len(printers), 8)) as executor: future_to_printer = { executor.submit( self.check_outlet_status, printer.plug_ip, printer_id=printer.id ): printer for printer in printers } for future in as_completed(future_to_printer, timeout=15): printer = future_to_printer[future] try: reachable, status = future.result() status_dict[printer.plug_ip] = { "printer_name": printer.name, "printer_id": printer.id, "reachable": reachable, "status": status, "ip": printer.plug_ip, "last_checked": datetime.now().isoformat() } except Exception as e: logger.error(f"❌ Fehler bei Status-Check für {printer.name}: {str(e)}") status_dict[printer.plug_ip] = { "printer_name": printer.name, "printer_id": printer.id, "reachable": False, "status": "error", "ip": printer.plug_ip, "error": str(e), "last_checked": datetime.now().isoformat() } db_session.close() logger.info(f"✅ Status-Update abgeschlossen für {len(status_dict)} Steckdosen") except Exception as e: logger.error(f"❌ Kritischer Fehler beim Abrufen des Steckdosen-Status: {str(e)}") return status_dict def _collect_device_info(self, p100: PyP100.P100, device_info: dict) -> dict: """ Sammelt erweiterte Geräteinformationen von der Tapo-Steckdose. Args: p100: PyP100-Instanz device_info: Basis-Geräteinformationen Returns: dict: Erweiterte Informationen """ extra_info = {} try: # Firmware-Version extra_info['firmware_version'] = device_info.get('fw_ver', None) # Versuche Energiedaten zu holen (nur P110) try: energy_usage = p100.getEnergyUsage() if energy_usage: extra_info['power_consumption'] = energy_usage.get('current_power', None) extra_info['voltage'] = energy_usage.get('voltage', None) extra_info['current'] = energy_usage.get('current', None) except: pass # P100 unterstützt keine Energiedaten except Exception as e: logger.debug(f"Fehler beim Sammeln erweiterter Geräteinformationen: {str(e)}") return extra_info def _log_plug_status(self, printer_id: int, status: str, ip_address: str, **kwargs): """ Protokolliert Steckdosen-Status in der Datenbank. Args: printer_id: ID des Druckers status: Status der Steckdose ip_address: IP-Adresse der Steckdose **kwargs: Zusätzliche Parameter für das Logging """ if not printer_id: return try: PlugStatusLog.log_status_change( printer_id=printer_id, status=status, source="system", ip_address=ip_address, **kwargs ) except Exception as e: logger.warning(f"Fehler beim Loggen des Steckdosen-Status: {e}") def _ensure_outlet_in_database(self, ip_address: str, nickname: str = None) -> bool: """ Stellt sicher, dass eine erkannte Tapo-Steckdose in der Datenbank existiert. Args: ip_address: IP-Adresse der Steckdose nickname: Name der Steckdose (optional) Returns: bool: True wenn erfolgreich gespeichert/aktualisiert """ try: db_session = get_db_session() # Prüfen, ob Drucker mit dieser IP bereits existiert existing_printer = db_session.query(Printer).filter( Printer.plug_ip == ip_address ).first() if existing_printer: # Drucker aktualisieren if not existing_printer.plug_username or not existing_printer.plug_password: existing_printer.plug_username = self.username existing_printer.plug_password = self.password logger.info(f"✅ Drucker {existing_printer.name} mit Tapo-Anmeldedaten aktualisiert") if nickname and existing_printer.name != nickname and "Tapo P110" not in existing_printer.name: old_name = existing_printer.name existing_printer.name = nickname logger.info(f"✅ Drucker {old_name} umbenannt zu {nickname}") # Drucker als aktiv markieren if not existing_printer.active: existing_printer.active = True logger.info(f"✅ Drucker {existing_printer.name} als aktiv markiert") existing_printer.last_checked = datetime.now() db_session.commit() db_session.close() return True else: # Neuen Drucker erstellen printer_name = nickname or f"Tapo P110 ({ip_address})" mac_address = f"tapo:{ip_address.replace('.', '-')}" new_printer = Printer( name=printer_name, model="TP-Link Tapo P110", location="Automatisch erkannt", ip_address=ip_address, mac_address=mac_address, plug_ip=ip_address, plug_username=self.username, plug_password=self.password, status="offline", active=True, last_checked=datetime.now() ) db_session.add(new_printer) db_session.commit() logger.info(f"✅ Neuer Drucker '{printer_name}' mit Tapo-Steckdose {ip_address} erstellt") db_session.close() return True except Exception as e: logger.error(f"❌ Fehler beim Speichern der Tapo-Steckdose {ip_address}: {str(e)}") try: db_session.rollback() db_session.close() except: pass return False # Globale Instanz für einfachen Zugriff tapo_controller = TapoController() # Convenience-Funktionen für Rückwärtskompatibilität def toggle_plug(ip: str, state: bool, username: str = None, password: str = None) -> bool: """Schaltet eine Tapo-Steckdose ein/aus.""" return tapo_controller.toggle_plug(ip, state, username, password) def test_tapo_connection(ip: str, username: str = None, password: str = None) -> dict: """Testet die Verbindung zu einer Tapo-Steckdose.""" return tapo_controller.test_connection(ip, username, password) def check_outlet_status(ip: str, username: str = None, password: str = None, printer_id: int = None) -> Tuple[bool, str]: """Prüft den Status einer Tapo-Steckdose.""" return tapo_controller.check_outlet_status(ip, username, password, printer_id) def auto_discover_tapo_outlets() -> Dict[str, bool]: """Führt automatische Erkennung von Tapo-Steckdosen durch.""" return tapo_controller.auto_discover_outlets() def initialize_all_outlets() -> Dict[str, bool]: """Initialisiert alle Tapo-Steckdosen (schaltet sie aus).""" return tapo_controller.initialize_all_outlets()