""" Live-Drucker-Monitor für MYP Platform Überwacht Druckerstatus in Echtzeit mit Session-Caching und automatischer Steckdosen-Initialisierung. """ import time import threading import requests import subprocess import ipaddress from datetime import datetime, timedelta from typing import Dict, Tuple, List, Optional from flask import session from sqlalchemy import func from sqlalchemy.orm import Session import os from models import get_db_session, Printer from utils.logging_config import get_logger from config.settings import PRINTERS, TAPO_USERNAME, TAPO_PASSWORD, DEFAULT_TAPO_IPS, TAPO_AUTO_DISCOVERY # TP-Link Tapo P110 Unterstützung hinzufügen try: from PyP100 import PyP110 TAPO_AVAILABLE = True except ImportError: TAPO_AVAILABLE = False # Logger initialisieren monitor_logger = get_logger("printer_monitor") class PrinterMonitor: """ Live-Drucker-Monitor mit Session-Caching und automatischer Initialisierung. """ def __init__(self): self.session_cache = {} # Session-basierter Cache für schnelle Zugriffe self.db_cache = {} # Datenbank-Cache für persistente Daten self.cache_lock = threading.Lock() self.last_db_sync = datetime.now() self.monitoring_active = False self.monitor_thread = None self.startup_initialized = False self.auto_discovered_tapo = False # Cache-Konfiguration self.session_cache_ttl = 30 # 30 Sekunden für Session-Cache self.db_cache_ttl = 300 # 5 Minuten für DB-Cache monitor_logger.info("🖨️ Drucker-Monitor initialisiert") # Automatische Steckdosenerkennung starten, falls aktiviert if TAPO_AUTO_DISCOVERY: self.auto_discover_tapo_outlets() def initialize_all_outlets_on_startup(self) -> Dict[str, bool]: """ Schaltet beim Programmstart alle gespeicherten Steckdosen aus (gleicher Startzustand). Returns: Dict[str, bool]: Ergebnis der Initialisierung pro Drucker """ if self.startup_initialized: monitor_logger.info("🔄 Steckdosen bereits beim Start initialisiert") return {} monitor_logger.info("🚀 Starte Steckdosen-Initialisierung beim Programmstart...") results = {} try: db_session = get_db_session() printers = db_session.query(Printer).filter(Printer.active == True).all() if not printers: monitor_logger.warning("⚠️ Keine aktiven Drucker zur Initialisierung gefunden") db_session.close() self.startup_initialized = True return results # Alle Steckdosen ausschalten für einheitlichen Startzustand for printer in printers: try: if printer.plug_ip and printer.plug_username and printer.plug_password: success = self._turn_outlet_off( printer.plug_ip, printer.plug_username, printer.plug_password ) results[printer.name] = success if success: monitor_logger.info(f"✅ {printer.name}: Steckdose ausgeschaltet") # Status in Datenbank aktualisieren printer.status = "offline" printer.last_checked = datetime.now() else: monitor_logger.warning(f"❌ {printer.name}: Steckdose konnte nicht ausgeschaltet werden") else: monitor_logger.warning(f"⚠️ {printer.name}: Unvollständige Steckdosen-Konfiguration") results[printer.name] = False except Exception as e: monitor_logger.error(f"❌ Fehler bei Initialisierung von {printer.name}: {str(e)}") results[printer.name] = False # Änderungen speichern db_session.commit() db_session.close() success_count = sum(1 for success in results.values() if success) total_count = len(results) monitor_logger.info(f"🎯 Steckdosen-Initialisierung abgeschlossen: {success_count}/{total_count} erfolgreich") self.startup_initialized = True except Exception as e: monitor_logger.error(f"❌ Kritischer Fehler bei Steckdosen-Initialisierung: {str(e)}") results = {} return results def _turn_outlet_off(self, ip_address: str, username: str, password: str, timeout: int = 5) -> bool: """ Schaltet eine TP-Link Tapo P110-Steckdose aus. Args: ip_address: IP-Adresse der Steckdose username: Benutzername für die Steckdose password: Passwort für die Steckdose timeout: Timeout in Sekunden (wird ignoriert, da PyP100 eigenes Timeout hat) Returns: bool: True wenn erfolgreich ausgeschaltet """ if not TAPO_AVAILABLE: monitor_logger.error("⚠️ PyP100-Modul nicht verfügbar - kann Tapo-Steckdose nicht schalten") return False try: # TP-Link Tapo P110 Verbindung herstellen p110 = PyP110.P110(ip_address, username, password) p110.handshake() # Authentifizierung p110.login() # Login # Steckdose ausschalten p110.turnOff() monitor_logger.debug(f"✅ Tapo-Steckdose {ip_address} erfolgreich ausgeschaltet") return True except Exception as e: monitor_logger.debug(f"⚠️ Fehler beim Ausschalten der Tapo-Steckdose {ip_address}: {str(e)}") return False def get_live_printer_status(self, use_session_cache: bool = True) -> Dict[int, Dict]: """ Holt Live-Druckerstatus mit Session- und DB-Caching. Args: use_session_cache: Ob Session-Cache verwendet werden soll Returns: Dict[int, Dict]: Status-Dict mit Drucker-ID als Key """ current_time = datetime.now() # Session-Cache prüfen (nur wenn aktiviert) if use_session_cache and hasattr(session, 'get'): session_key = "printer_status_cache" session_timestamp_key = "printer_status_timestamp" cached_data = session.get(session_key) cached_timestamp = session.get(session_timestamp_key) if cached_data and cached_timestamp: cache_age = (current_time - datetime.fromisoformat(cached_timestamp)).total_seconds() if cache_age < self.session_cache_ttl: monitor_logger.debug("📋 Verwende Session-Cache für Druckerstatus") return cached_data # DB-Cache prüfen with self.cache_lock: if self.db_cache and (current_time - self.last_db_sync).total_seconds() < self.db_cache_ttl: monitor_logger.debug("🗃️ Verwende DB-Cache für Druckerstatus") # Session-Cache aktualisieren if use_session_cache and hasattr(session, '__setitem__'): session["printer_status_cache"] = self.db_cache session["printer_status_timestamp"] = current_time.isoformat() return self.db_cache # Live-Status von Druckern abrufen monitor_logger.info("🔄 Aktualisiere Live-Druckerstatus...") status_dict = self._fetch_live_printer_status() # Caches aktualisieren with self.cache_lock: self.db_cache = status_dict self.last_db_sync = current_time if use_session_cache and hasattr(session, '__setitem__'): session["printer_status_cache"] = status_dict session["printer_status_timestamp"] = current_time.isoformat() return status_dict def _fetch_live_printer_status(self) -> Dict[int, Dict]: """ Holt den aktuellen Status aller Drucker direkt von den Geräten. Returns: Dict[int, Dict]: Status-Dict mit umfassenden Informationen """ status_dict = {} try: db_session = get_db_session() printers = db_session.query(Printer).filter(Printer.active == True).all() # Wenn keine aktiven Drucker vorhanden sind, gebe leeres Dict zurück if not printers: monitor_logger.info("ℹ️ Keine aktiven Drucker gefunden") db_session.close() return status_dict monitor_logger.info(f"🔍 Prüfe Status von {len(printers)} aktiven Druckern...") # Parallel-Status-Prüfung mit ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor, as_completed # Sicherstellen, dass max_workers mindestens 1 ist max_workers = min(max(len(printers), 1), 8) with ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_printer = { executor.submit(self._check_single_printer_status, printer): printer for printer in printers } for future in as_completed(future_to_printer, timeout=15): printer = future_to_printer[future] try: status_info = future.result() status_dict[printer.id] = status_info # Status in Datenbank aktualisieren printer.status = status_info["status"] printer.last_checked = datetime.now() except Exception as e: monitor_logger.error(f"❌ Fehler bei Status-Check für Drucker {printer.name}: {str(e)}") status_dict[printer.id] = { "id": printer.id, "name": printer.name, "status": "offline", "active": False, "ip_address": printer.ip_address, "plug_ip": printer.plug_ip, "location": printer.location, "last_checked": datetime.now().isoformat(), "error": str(e) } # Änderungen in Datenbank speichern db_session.commit() db_session.close() monitor_logger.info(f"✅ Status-Update abgeschlossen für {len(status_dict)} Drucker") except Exception as e: monitor_logger.error(f"❌ Kritischer Fehler beim Abrufen des Live-Status: {str(e)}") return status_dict def _check_single_printer_status(self, printer: Printer, timeout: int = 7) -> Dict: """ Überprüft den Status eines einzelnen Druckers. Args: printer: Printer-Objekt aus der Datenbank timeout: Timeout in Sekunden Returns: Dict: Umfassende Status-Informationen """ status_info = { "id": printer.id, "name": printer.name, "status": "offline", "active": False, "ip_address": printer.ip_address, "plug_ip": printer.plug_ip, "location": printer.location, "last_checked": datetime.now().isoformat(), "ping_successful": False, "outlet_reachable": False, "outlet_state": "unknown" } try: # 1. Ping-Test für Grundkonnektivität if printer.plug_ip: ping_success = self._ping_address(printer.plug_ip, timeout=3) status_info["ping_successful"] = ping_success if ping_success: # 2. Smart Plug Status prüfen outlet_reachable, outlet_state = self._check_outlet_status( printer.plug_ip, printer.plug_username, printer.plug_password, timeout ) status_info["outlet_reachable"] = outlet_reachable status_info["outlet_state"] = outlet_state if outlet_reachable and outlet_state == "on": status_info["status"] = "online" status_info["active"] = True elif outlet_reachable and outlet_state == "off": status_info["status"] = "standby" status_info["active"] = False else: status_info["status"] = "offline" status_info["active"] = False else: status_info["status"] = "unreachable" status_info["active"] = False else: status_info["status"] = "unconfigured" status_info["active"] = False except Exception as e: monitor_logger.error(f"❌ Fehler bei Status-Check für {printer.name}: {str(e)}") status_info["error"] = str(e) return status_info def _ping_address(self, ip_address: str, timeout: int = 3) -> bool: """ Führt einen Ping-Test zu einer IP-Adresse durch. Args: ip_address: Zu testende IP-Adresse timeout: Timeout in Sekunden Returns: bool: True wenn Ping erfolgreich """ try: # IP-Adresse validieren ipaddress.ip_address(ip_address.strip()) # Platform-spezifische Ping-Befehle if os.name == 'nt': # Windows cmd = ['ping', '-n', '1', '-w', str(timeout * 1000), ip_address.strip()] else: # Unix/Linux/macOS cmd = ['ping', '-c', '1', '-W', str(timeout), ip_address.strip()] result = subprocess.run( cmd, capture_output=True, text=True, encoding='utf-8', errors='ignore', timeout=timeout + 1 ) return result.returncode == 0 except Exception: return False def _check_outlet_status(self, ip_address: str, username: str, password: str, timeout: int = 5) -> Tuple[bool, str]: """ Überprüft den Status einer TP-Link Tapo P110-Steckdose. Args: ip_address: IP-Adresse der Steckdose username: Benutzername für die Steckdose password: Passwort für die Steckdose timeout: Timeout in Sekunden (wird ignoriert, da PyP100 eigenes Timeout hat) Returns: Tuple[bool, str]: (Erreichbar, Status) - Status: "on", "off", "unknown" """ if not TAPO_AVAILABLE: monitor_logger.debug("⚠️ PyP100-Modul nicht verfügbar - kann Tapo-Steckdosen-Status nicht abfragen") return False, "unknown" # Fallback zu globalen Anmeldedaten wenn keine lokalen vorhanden if not username or not password: username = TAPO_USERNAME password = TAPO_PASSWORD monitor_logger.debug(f"🔧 Verwende globale Tapo-Anmeldedaten für {ip_address}") try: # TP-Link Tapo P110 Verbindung herstellen p110 = PyP110.P110(ip_address, username, password) p110.handshake() # Authentifizierung p110.login() # Login # Geräteinformationen abrufen device_info = p110.getDeviceInfo() # Status auswerten device_on = device_info.get('device_on', False) status = "on" if device_on else "off" monitor_logger.debug(f"✅ Tapo-Steckdose {ip_address}: Status = {status}") return True, status except Exception as e: monitor_logger.debug(f"⚠️ Fehler bei Tapo-Steckdosen-Status-Check {ip_address}: {str(e)}") return False, "unknown" def clear_all_caches(self): """Löscht alle Caches (Session und DB).""" with self.cache_lock: self.db_cache = {} self.last_db_sync = datetime.now() if hasattr(session, 'pop'): session.pop("printer_status_cache", None) session.pop("printer_status_timestamp", None) monitor_logger.info("🧹 Alle Drucker-Caches gelöscht") def get_printer_summary(self) -> Dict[str, int]: """ Gibt eine Zusammenfassung der Druckerstatus zurück. Returns: Dict[str, int]: Anzahl Drucker pro Status """ status_dict = self.get_live_printer_status() summary = { "total": len(status_dict), "online": 0, "offline": 0, "standby": 0, "unreachable": 0, "unconfigured": 0 } for printer_info in status_dict.values(): status = printer_info.get("status", "offline") if status in summary: summary[status] += 1 else: summary["offline"] += 1 return summary def auto_discover_tapo_outlets(self) -> Dict[str, bool]: """ Automatische Erkennung und Konfiguration von TP-Link Tapo P110-Steckdosen im Netzwerk. Returns: Dict[str, bool]: Ergebnis der Steckdosenerkennung mit IP als Schlüssel """ if self.auto_discovered_tapo: monitor_logger.info("🔍 Tapo-Steckdosen wurden bereits erkannt") return {} monitor_logger.info("🔍 Starte automatische Tapo-Steckdosenerkennung...") results = {} # 1. Zuerst die Standard-IPs aus der Konfiguration testen monitor_logger.info(f"🔄 Teste {len(DEFAULT_TAPO_IPS)} Standard-IPs aus der Konfiguration") for ip in DEFAULT_TAPO_IPS: try: # Ping-Test für Grundkonnektivität ping_success = self._ping_address(ip, timeout=2) if ping_success: monitor_logger.info(f"✅ Steckdose mit IP {ip} ist pingbar") # Tapo-Verbindung testen if TAPO_AVAILABLE: try: p110 = PyP110.P110(ip, TAPO_USERNAME, TAPO_PASSWORD) p110.handshake() p110.login() device_info = p110.getDeviceInfo() # Steckdose gefunden und verbunden nickname = device_info.get('nickname', f"Tapo P110 ({ip})") state = "on" if device_info.get('device_on', False) else "off" monitor_logger.info(f"✅ Tapo-Steckdose '{nickname}' ({ip}) gefunden - Status: {state}") results[ip] = True # Steckdose in Datenbank speichern/aktualisieren self._ensure_tapo_in_database(ip, nickname) except Exception as e: monitor_logger.debug(f"❌ IP {ip} ist pingbar, aber keine Tapo-Steckdose: {str(e)}") results[ip] = False else: monitor_logger.warning("⚠️ PyP100-Modul nicht verfügbar - kann Tapo-Verbindung nicht testen") results[ip] = False else: monitor_logger.debug(f"❌ IP {ip} nicht erreichbar") results[ip] = False except Exception as e: monitor_logger.error(f"❌ Fehler bei Steckdosen-Erkennung für IP {ip}: {str(e)}") results[ip] = False # Erfolgsstatistik berechnen success_count = sum(1 for success in results.values() if success) monitor_logger.info(f"✅ Steckdosen-Erkennung abgeschlossen: {success_count}/{len(results)} Steckdosen gefunden") # Markieren, dass automatische Erkennung durchgeführt wurde self.auto_discovered_tapo = True return results def _ensure_tapo_in_database(self, ip_address: str, nickname: str = None) -> bool: """ Stellt sicher, dass eine erkannte Tapo-Steckdose in der Datenbank existiert. Args: ip_address: IP-Adresse der Steckdose nickname: Name der Steckdose (optional) Returns: bool: True wenn erfolgreich in Datenbank gespeichert/aktualisiert """ try: db_session = get_db_session() # Prüfen, ob Drucker mit dieser IP bereits existiert existing_printer = db_session.query(Printer).filter(Printer.plug_ip == ip_address).first() if existing_printer: # Drucker aktualisieren, falls nötig if not existing_printer.plug_username or not existing_printer.plug_password: existing_printer.plug_username = TAPO_USERNAME existing_printer.plug_password = TAPO_PASSWORD monitor_logger.info(f"✅ Drucker {existing_printer.name} mit Tapo-Anmeldedaten aktualisiert") if nickname and existing_printer.name != nickname and "Tapo P110" not in existing_printer.name: old_name = existing_printer.name existing_printer.name = nickname monitor_logger.info(f"✅ Drucker {old_name} umbenannt zu {nickname}") # Status aktualisieren existing_printer.last_checked = datetime.now() db_session.commit() db_session.close() return True else: # Neuen Drucker erstellen, falls keiner existiert printer_name = nickname or f"Tapo P110 ({ip_address})" mac_address = f"tapo:{ip_address.replace('.', '-')}" # Pseudo-MAC-Adresse new_printer = Printer( name=printer_name, model="TP-Link Tapo P110", location="Automatisch erkannt", ip_address=ip_address, # Drucker-IP setzen wir gleich Steckdosen-IP mac_address=mac_address, plug_ip=ip_address, plug_username=TAPO_USERNAME, plug_password=TAPO_PASSWORD, status="offline", active=True, last_checked=datetime.now() ) db_session.add(new_printer) db_session.commit() monitor_logger.info(f"✅ Neuer Drucker '{printer_name}' mit Tapo-Steckdose {ip_address} erstellt") db_session.close() return True except Exception as e: monitor_logger.error(f"❌ Fehler beim Speichern der Tapo-Steckdose {ip_address}: {str(e)}") try: db_session.rollback() db_session.close() except: pass return False # Globale Instanz printer_monitor = PrinterMonitor()