""" Tapo Status Manager - Verwaltung der 3 Steckdosen-Status Dieser Manager stellt sicher, dass: 1. Alle 6 Drucker/Steckdosen immer angezeigt werden 2. Die 3 Status korrekt verwaltet werden: an, aus, nicht erreichbar 3. Der Status persistent gespeichert wird 4. Die automatische Steuerung basierend auf Jobs funktioniert """ from typing import Dict, Tuple, Optional, List from datetime import datetime, timedelta import asyncio from concurrent.futures import ThreadPoolExecutor import threading from models import Printer, Job, PlugStatusLog, get_db_session from utils.logging_config import get_logger logger = get_logger("tapo_status_manager") class TapoStatusManager: """ Zentraler Manager für Tapo-Steckdosen-Status """ # Die 3 möglichen Status-Zustände STATUS_ON = "on" STATUS_OFF = "off" STATUS_UNREACHABLE = "unreachable" # Status-Mapping für UI STATUS_DISPLAY = { STATUS_ON: {"text": "An", "color": "green", "icon": "power"}, STATUS_OFF: {"text": "Aus", "color": "gray", "icon": "power-off"}, STATUS_UNREACHABLE: {"text": "Nicht erreichbar", "color": "red", "icon": "exclamation-triangle"} } def __init__(self): """Initialisiert den Status-Manager""" self._status_cache = {} self._cache_lock = threading.RLock() self._last_check = {} self.check_interval = 30 # Sekunden zwischen Status-Checks # Session-spezifischer Status-Cache für Benutzer-Sessions self._session_cache = {} self._session_cache_lock = threading.RLock() self._session_cache_ttl = 300 # 5 Minuten für Session-Cache # Thread-Pool für asynchrone Operationen self._executor = ThreadPoolExecutor(max_workers=6) logger.info("TapoStatusManager mit Session-Caching initialisiert") def get_printer_status(self, printer_id: int) -> Dict[str, any]: """ Gibt den aktuellen Status eines Druckers zurück Args: printer_id: ID des Druckers Returns: Dict mit Status-Informationen """ with self._cache_lock: # Aus Cache holen wenn vorhanden und aktuell if printer_id in self._status_cache: cache_data = self._status_cache[printer_id] if self._is_cache_valid(printer_id): return cache_data # Neuen Status abrufen return self._fetch_printer_status(printer_id) def get_all_printer_status(self) -> List[Dict[str, any]]: """ Gibt den Status aller Drucker zurück Returns: Liste mit Status-Informationen aller Drucker """ try: db_session = get_db_session() printers = db_session.query(Printer).all() status_list = [] # Status für jeden Drucker abrufen for printer in printers: status = self.get_printer_status(printer.id) status_list.append(status) db_session.close() return status_list except Exception as e: logger.error(f"Fehler beim Abrufen aller Drucker-Status: {str(e)}") return [] def _fetch_printer_status(self, printer_id: int) -> Dict[str, any]: """ Holt den aktuellen Status eines Druckers Args: printer_id: ID des Druckers Returns: Dict mit Status-Informationen """ try: db_session = get_db_session() printer = db_session.query(Printer).filter(Printer.id == printer_id).first() if not printer: logger.warning(f"Drucker {printer_id} nicht gefunden") return self._create_error_status(printer_id, "Drucker nicht gefunden") # Basis-Status erstellen status_info = { "id": printer.id, "name": printer.name, "model": printer.model, "location": printer.location, "ip_address": printer.ip_address, "has_plug": bool(printer.plug_ip), "plug_ip": printer.plug_ip, "active": printer.active, "last_checked": datetime.now() } # Wenn keine Steckdose konfiguriert if not printer.plug_ip: status_info.update({ "plug_status": "no_plug", "plug_reachable": False, "power_status": None, "can_control": False }) else: # Tapo-Status abrufen plug_status = self._check_tapo_status(printer) status_info.update(plug_status) # Aktuelle Jobs prüfen active_job = self._get_active_job(printer_id, db_session) if active_job: status_info["current_job"] = { "id": active_job.id, "name": active_job.name, "user": active_job.user.name, "start_at": active_job.start_at.isoformat(), "end_at": active_job.end_at.isoformat(), "status": active_job.status } else: status_info["current_job"] = None # Nächster geplanter Job next_job = self._get_next_job(printer_id, db_session) if next_job: status_info["next_job"] = { "id": next_job.id, "name": next_job.name, "user": next_job.user.name, "start_at": next_job.start_at.isoformat(), "starts_in_minutes": int((next_job.start_at - datetime.now()).total_seconds() / 60) } else: status_info["next_job"] = None # Status in Cache speichern with self._cache_lock: self._status_cache[printer_id] = status_info self._last_check[printer_id] = datetime.now() # Status in Datenbank loggen self._log_status(printer, status_info.get("plug_status", "unknown")) db_session.close() return status_info except Exception as e: logger.error(f"Fehler beim Abrufen des Status für Drucker {printer_id}: {str(e)}") return self._create_error_status(printer_id, str(e)) def _check_tapo_status(self, printer: Printer) -> Dict[str, any]: """ Prüft den Tapo-Steckdosen-Status Args: printer: Printer-Objekt Returns: Dict mit Tapo-Status """ try: # Tapo-Controller importieren from utils.hardware_integration import tapo_controller if not tapo_controller: return { "plug_status": self.STATUS_UNREACHABLE, "plug_reachable": False, "power_status": None, "can_control": False, "error": "Tapo-Controller nicht verfügbar" } # Status abrufen reachable, plug_status = tapo_controller.check_outlet_status( printer.plug_ip, printer_id=printer.id ) if reachable: # Erfolgreiche Verbindung return { "plug_status": self.STATUS_ON if plug_status == "on" else self.STATUS_OFF, "plug_reachable": True, "power_status": plug_status, "can_control": True } else: # Steckdose nicht erreichbar return { "plug_status": self.STATUS_UNREACHABLE, "plug_reachable": False, "power_status": None, "can_control": False, "error": "Steckdose nicht erreichbar" } except Exception as e: logger.error(f"Fehler beim Prüfen des Tapo-Status für {printer.name}: {str(e)}") return { "plug_status": self.STATUS_UNREACHABLE, "plug_reachable": False, "power_status": None, "can_control": False, "error": str(e) } def control_plug(self, printer_id: int, action: str) -> Tuple[bool, str]: """ Steuert eine Tapo-Steckdose Args: printer_id: ID des Druckers action: "on" oder "off" Returns: Tuple (Erfolg, Nachricht) """ try: db_session = get_db_session() printer = db_session.query(Printer).filter(Printer.id == printer_id).first() if not printer: return False, "Drucker nicht gefunden" if not printer.plug_ip: return False, "Keine Steckdose konfiguriert" # Tapo-Controller verwenden from utils.hardware_integration import tapo_controller if not tapo_controller: return False, "Tapo-Controller nicht verfügbar" # Aktion ausführen success = False if action == "on": success = tapo_controller.turn_on_outlet(printer.plug_ip, printer_id) elif action == "off": success = tapo_controller.turn_off_outlet(printer.plug_ip, printer_id) else: return False, f"Ungültige Aktion: {action}" if success: # Cache invalidieren with self._cache_lock: if printer_id in self._status_cache: del self._status_cache[printer_id] # Status loggen self._log_status(printer, action, source="manual") db_session.close() return True, f"Steckdose erfolgreich {action}" else: db_session.close() return False, "Steckdose konnte nicht gesteuert werden" except Exception as e: logger.error(f"Fehler beim Steuern der Steckdose für Drucker {printer_id}: {str(e)}") return False, str(e) def check_and_control_for_jobs(self): """ Prüft alle Jobs und steuert Steckdosen entsprechend Diese Methode sollte regelmäßig vom Scheduler aufgerufen werden """ try: db_session = get_db_session() now = datetime.now() # Jobs die starten sollten jobs_to_start = db_session.query(Job).filter( Job.status == "scheduled", Job.start_at <= now ).all() for job in jobs_to_start: logger.info(f"Starte Job {job.id} für Drucker {job.printer_id}") success, msg = self.control_plug(job.printer_id, "on") if success: job.status = "running" logger.info(f"Steckdose für Job {job.id} eingeschaltet") else: logger.error(f"Fehler beim Einschalten für Job {job.id}: {msg}") # Jobs die enden sollten jobs_to_end = db_session.query(Job).filter( Job.status == "running", Job.end_at <= now ).all() for job in jobs_to_end: logger.info(f"Beende Job {job.id} für Drucker {job.printer_id}") success, msg = self.control_plug(job.printer_id, "off") if success: job.status = "finished" job.actual_end_time = now logger.info(f"Steckdose für Job {job.id} ausgeschaltet") else: logger.error(f"Fehler beim Ausschalten für Job {job.id}: {msg}") db_session.commit() db_session.close() except Exception as e: logger.error(f"Fehler bei der automatischen Job-Steuerung: {str(e)}") def _get_active_job(self, printer_id: int, db_session) -> Optional[Job]: """Gibt den aktuell aktiven Job für einen Drucker zurück""" return db_session.query(Job).filter( Job.printer_id == printer_id, Job.status == "running" ).first() def _get_next_job(self, printer_id: int, db_session) -> Optional[Job]: """Gibt den nächsten geplanten Job für einen Drucker zurück""" return db_session.query(Job).filter( Job.printer_id == printer_id, Job.status == "scheduled", Job.start_at > datetime.now() ).order_by(Job.start_at).first() def _is_cache_valid(self, printer_id: int) -> bool: """Prüft ob der Cache noch gültig ist""" if printer_id not in self._last_check: return False age = (datetime.now() - self._last_check[printer_id]).total_seconds() return age < self.check_interval def _create_error_status(self, printer_id: int, error: str) -> Dict[str, any]: """Erstellt einen Fehler-Status""" return { "id": printer_id, "name": f"Drucker {printer_id}", "plug_status": self.STATUS_UNREACHABLE, "plug_reachable": False, "error": error, "last_checked": datetime.now() } def _log_status(self, printer: Printer, status: str, source: str = "system"): """Loggt einen Status in die Datenbank""" try: PlugStatusLog.log_status_change( printer_id=printer.id, status=status, source=source, ip_address=printer.plug_ip ) except Exception as e: logger.error(f"Fehler beim Loggen des Status: {str(e)}") def get_status_for_calendar(self, start_date: datetime, end_date: datetime) -> List[Dict]: """ Gibt Status-Informationen für die Kalender-Ansicht zurück Args: start_date: Start-Datum end_date: End-Datum Returns: Liste mit Status-Events für den Kalender """ try: db_session = get_db_session() # Jobs im Zeitraum abrufen jobs = db_session.query(Job).filter( Job.start_at <= end_date, Job.end_at >= start_date ).all() events = [] for job in jobs: # Drucker-Status für Job printer = job.printer status = self.get_printer_status(printer.id) event = { "id": f"job_{job.id}", "title": f"{printer.name}: {job.name}", "start": job.start_at.isoformat(), "end": job.end_at.isoformat(), "backgroundColor": self._get_status_color(job.status), "extendedProps": { "job_id": job.id, "printer_id": printer.id, "printer_name": printer.name, "printer_status": status.get("plug_status", "unknown"), "job_status": job.status, "user": job.user.name, "plug_reachable": status.get("plug_reachable", False) } } events.append(event) db_session.close() return events except Exception as e: logger.error(f"Fehler beim Abrufen der Kalender-Status: {str(e)}") return [] def get_session_status(self, session_id: str, printer_ids: List[int] = None) -> Dict[str, any]: """ Holt den gecachten Status für eine Session Args: session_id: Session-ID printer_ids: Optional - spezifische Drucker-IDs Returns: Dict mit Session-spezifischen Status-Daten """ try: with self._session_cache_lock: session_data = self._session_cache.get(session_id, {}) # Prüfe Cache-Gültigkeit cache_time = session_data.get('timestamp', datetime.min) if (datetime.now() - cache_time).total_seconds() > self._session_cache_ttl: # Cache abgelaufen self._session_cache.pop(session_id, None) return self._create_fresh_session_status(session_id, printer_ids) # Wenn spezifische Drucker angefragt, filtere diese if printer_ids: filtered_status = {} for printer_id in printer_ids: if str(printer_id) in session_data.get('printers', {}): filtered_status[str(printer_id)] = session_data['printers'][str(printer_id)] return { 'timestamp': session_data['timestamp'], 'session_id': session_id, 'printers': filtered_status, 'from_cache': True } return session_data except Exception as e: logger.error(f"Fehler beim Abrufen des Session-Status: {str(e)}") return self._create_fresh_session_status(session_id, printer_ids) def update_session_status(self, session_id: str, printer_id: int = None) -> bool: """ Aktualisiert den Session-Status-Cache Args: session_id: Session-ID printer_id: Optional - spezifischer Drucker Returns: bool: True wenn erfolgreich """ try: with self._session_cache_lock: if printer_id: # Einzelnen Drucker aktualisieren printer_status = self.get_printer_status(printer_id) if session_id not in self._session_cache: self._session_cache[session_id] = { 'timestamp': datetime.now(), 'session_id': session_id, 'printers': {} } self._session_cache[session_id]['printers'][str(printer_id)] = printer_status self._session_cache[session_id]['timestamp'] = datetime.now() else: # Alle Drucker aktualisieren self._session_cache[session_id] = self._create_fresh_session_status(session_id) logger.debug(f"Session-Status für {session_id} aktualisiert") return True except Exception as e: logger.error(f"Fehler beim Aktualisieren des Session-Status: {str(e)}") return False def clear_session_cache(self, session_id: str = None) -> bool: """ Löscht Session-Cache Args: session_id: Optional - spezifische Session, sonst alle Returns: bool: True wenn erfolgreich """ try: with self._session_cache_lock: if session_id: self._session_cache.pop(session_id, None) logger.debug(f"Session-Cache für {session_id} gelöscht") else: self._session_cache.clear() logger.debug("Kompletter Session-Cache gelöscht") return True except Exception as e: logger.error(f"Fehler beim Löschen des Session-Cache: {str(e)}") return False def _create_fresh_session_status(self, session_id: str, printer_ids: List[int] = None) -> Dict[str, any]: """ Erstellt frischen Session-Status Args: session_id: Session-ID printer_ids: Optional - spezifische Drucker-IDs Returns: Dict mit frischen Status-Daten """ try: db_session = get_db_session() # Alle oder spezifische Drucker laden if printer_ids: printers = db_session.query(Printer).filter(Printer.id.in_(printer_ids)).all() else: printers = db_session.query(Printer).all() session_data = { 'timestamp': datetime.now(), 'session_id': session_id, 'printers': {}, 'from_cache': False } # Status für jeden Drucker abrufen for printer in printers: printer_status = self.get_printer_status(printer.id) session_data['printers'][str(printer.id)] = printer_status # In Session-Cache speichern with self._session_cache_lock: self._session_cache[session_id] = session_data db_session.close() return session_data except Exception as e: logger.error(f"Fehler beim Erstellen frischen Session-Status: {str(e)}") return { 'timestamp': datetime.now(), 'session_id': session_id, 'printers': {}, 'error': str(e), 'from_cache': False } def get_session_cache_stats(self) -> Dict[str, any]: """ Gibt Session-Cache-Statistiken zurück Returns: Dict mit Cache-Statistiken """ try: with self._session_cache_lock: stats = { 'total_sessions': len(self._session_cache), 'cache_ttl_seconds': self._session_cache_ttl, 'cache_size_bytes': len(str(self._session_cache)), 'sessions': {} } for session_id, data in self._session_cache.items(): stats['sessions'][session_id] = { 'timestamp': data.get('timestamp', datetime.min).isoformat(), 'printer_count': len(data.get('printers', {})), 'age_seconds': (datetime.now() - data.get('timestamp', datetime.now())).total_seconds() } return stats except Exception as e: logger.error(f"Fehler beim Abrufen der Cache-Statistiken: {str(e)}") return {'error': str(e)} def cleanup_expired_session_cache(self) -> int: """ Bereinigt abgelaufene Session-Cache-Einträge Returns: int: Anzahl gelöschter Einträge """ try: expired_count = 0 current_time = datetime.now() with self._session_cache_lock: expired_sessions = [] for session_id, data in self._session_cache.items(): cache_time = data.get('timestamp', datetime.min) if (current_time - cache_time).total_seconds() > self._session_cache_ttl: expired_sessions.append(session_id) for session_id in expired_sessions: self._session_cache.pop(session_id, None) expired_count += 1 if expired_count > 0: logger.info(f"Session-Cache bereinigt: {expired_count} abgelaufene Einträge entfernt") return expired_count except Exception as e: logger.error(f"Fehler beim Bereinigen des Session-Cache: {str(e)}") return 0 def _get_status_color(self, status: str) -> str: """Gibt die Farbe für einen Status zurück""" colors = { "scheduled": "#3788d8", "running": "#28a745", "finished": "#6c757d", "aborted": "#dc3545" } return colors.get(status, "#6c757d") # Globale Instanz tapo_status_manager = TapoStatusManager() def get_tapo_status_manager() -> TapoStatusManager: """Gibt die globale TapoStatusManager-Instanz zurück""" return tapo_status_manager