""" Tapo Status Manager - Verwaltung der 3 Steckdosen-Status Dieser Manager stellt sicher, dass: 1. Alle 6 Drucker/Steckdosen immer angezeigt werden 2. Die 3 Status korrekt verwaltet werden: an, aus, nicht erreichbar 3. Der Status persistent gespeichert wird 4. Die automatische Steuerung basierend auf Jobs funktioniert """ from typing import Dict, Tuple, Optional, List from datetime import datetime, timedelta import asyncio from concurrent.futures import ThreadPoolExecutor import threading from models import Printer, Job, PlugStatusLog, get_db_session from utils.logging_config import get_logger logger = get_logger("tapo_status_manager") class TapoStatusManager: """ Zentraler Manager für Tapo-Steckdosen-Status """ # Die 3 möglichen Status-Zustände STATUS_ON = "on" STATUS_OFF = "off" STATUS_UNREACHABLE = "unreachable" # Status-Mapping für UI STATUS_DISPLAY = { STATUS_ON: {"text": "An", "color": "green", "icon": "power"}, STATUS_OFF: {"text": "Aus", "color": "gray", "icon": "power-off"}, STATUS_UNREACHABLE: {"text": "Nicht erreichbar", "color": "red", "icon": "exclamation-triangle"} } def __init__(self): """Initialisiert den Status-Manager""" self._status_cache = {} self._cache_lock = threading.RLock() self._last_check = {} self.check_interval = 30 # Sekunden zwischen Status-Checks # Thread-Pool für asynchrone Operationen self._executor = ThreadPoolExecutor(max_workers=6) logger.info("TapoStatusManager initialisiert") def get_printer_status(self, printer_id: int) -> Dict[str, any]: """ Gibt den aktuellen Status eines Druckers zurück Args: printer_id: ID des Druckers Returns: Dict mit Status-Informationen """ with self._cache_lock: # Aus Cache holen wenn vorhanden und aktuell if printer_id in self._status_cache: cache_data = self._status_cache[printer_id] if self._is_cache_valid(printer_id): return cache_data # Neuen Status abrufen return self._fetch_printer_status(printer_id) def get_all_printer_status(self) -> List[Dict[str, any]]: """ Gibt den Status aller Drucker zurück Returns: Liste mit Status-Informationen aller Drucker """ try: db_session = get_db_session() printers = db_session.query(Printer).all() status_list = [] # Status für jeden Drucker abrufen for printer in printers: status = self.get_printer_status(printer.id) status_list.append(status) db_session.close() return status_list except Exception as e: logger.error(f"Fehler beim Abrufen aller Drucker-Status: {str(e)}") return [] def _fetch_printer_status(self, printer_id: int) -> Dict[str, any]: """ Holt den aktuellen Status eines Druckers Args: printer_id: ID des Druckers Returns: Dict mit Status-Informationen """ try: db_session = get_db_session() printer = db_session.query(Printer).filter(Printer.id == printer_id).first() if not printer: logger.warning(f"Drucker {printer_id} nicht gefunden") return self._create_error_status(printer_id, "Drucker nicht gefunden") # Basis-Status erstellen status_info = { "id": printer.id, "name": printer.name, "model": printer.model, "location": printer.location, "ip_address": printer.ip_address, "has_plug": bool(printer.plug_ip), "plug_ip": printer.plug_ip, "active": printer.active, "last_checked": datetime.now() } # Wenn keine Steckdose konfiguriert if not printer.plug_ip: status_info.update({ "plug_status": "no_plug", "plug_reachable": False, "power_status": None, "can_control": False }) else: # Tapo-Status abrufen plug_status = self._check_tapo_status(printer) status_info.update(plug_status) # Aktuelle Jobs prüfen active_job = self._get_active_job(printer_id, db_session) if active_job: status_info["current_job"] = { "id": active_job.id, "name": active_job.name, "user": active_job.user.name, "start_at": active_job.start_at.isoformat(), "end_at": active_job.end_at.isoformat(), "status": active_job.status } else: status_info["current_job"] = None # Nächster geplanter Job next_job = self._get_next_job(printer_id, db_session) if next_job: status_info["next_job"] = { "id": next_job.id, "name": next_job.name, "user": next_job.user.name, "start_at": next_job.start_at.isoformat(), "starts_in_minutes": int((next_job.start_at - datetime.now()).total_seconds() / 60) } else: status_info["next_job"] = None # Status in Cache speichern with self._cache_lock: self._status_cache[printer_id] = status_info self._last_check[printer_id] = datetime.now() # Status in Datenbank loggen self._log_status(printer, status_info.get("plug_status", "unknown")) db_session.close() return status_info except Exception as e: logger.error(f"Fehler beim Abrufen des Status für Drucker {printer_id}: {str(e)}") return self._create_error_status(printer_id, str(e)) def _check_tapo_status(self, printer: Printer) -> Dict[str, any]: """ Prüft den Tapo-Steckdosen-Status Args: printer: Printer-Objekt Returns: Dict mit Tapo-Status """ try: # Tapo-Controller importieren from utils.hardware_integration import tapo_controller if not tapo_controller: return { "plug_status": self.STATUS_UNREACHABLE, "plug_reachable": False, "power_status": None, "can_control": False, "error": "Tapo-Controller nicht verfügbar" } # Status abrufen reachable, plug_status = tapo_controller.check_outlet_status( printer.plug_ip, printer_id=printer.id ) if reachable: # Erfolgreiche Verbindung return { "plug_status": self.STATUS_ON if plug_status == "on" else self.STATUS_OFF, "plug_reachable": True, "power_status": plug_status, "can_control": True } else: # Steckdose nicht erreichbar return { "plug_status": self.STATUS_UNREACHABLE, "plug_reachable": False, "power_status": None, "can_control": False, "error": "Steckdose nicht erreichbar" } except Exception as e: logger.error(f"Fehler beim Prüfen des Tapo-Status für {printer.name}: {str(e)}") return { "plug_status": self.STATUS_UNREACHABLE, "plug_reachable": False, "power_status": None, "can_control": False, "error": str(e) } def control_plug(self, printer_id: int, action: str) -> Tuple[bool, str]: """ Steuert eine Tapo-Steckdose Args: printer_id: ID des Druckers action: "on" oder "off" Returns: Tuple (Erfolg, Nachricht) """ try: db_session = get_db_session() printer = db_session.query(Printer).filter(Printer.id == printer_id).first() if not printer: return False, "Drucker nicht gefunden" if not printer.plug_ip: return False, "Keine Steckdose konfiguriert" # Tapo-Controller verwenden from utils.hardware_integration import tapo_controller if not tapo_controller: return False, "Tapo-Controller nicht verfügbar" # Aktion ausführen success = False if action == "on": success = tapo_controller.turn_on_outlet(printer.plug_ip, printer_id) elif action == "off": success = tapo_controller.turn_off_outlet(printer.plug_ip, printer_id) else: return False, f"Ungültige Aktion: {action}" if success: # Cache invalidieren with self._cache_lock: if printer_id in self._status_cache: del self._status_cache[printer_id] # Status loggen self._log_status(printer, action, source="manual") db_session.close() return True, f"Steckdose erfolgreich {action}" else: db_session.close() return False, "Steckdose konnte nicht gesteuert werden" except Exception as e: logger.error(f"Fehler beim Steuern der Steckdose für Drucker {printer_id}: {str(e)}") return False, str(e) def check_and_control_for_jobs(self): """ Prüft alle Jobs und steuert Steckdosen entsprechend Diese Methode sollte regelmäßig vom Scheduler aufgerufen werden """ try: db_session = get_db_session() now = datetime.now() # Jobs die starten sollten jobs_to_start = db_session.query(Job).filter( Job.status == "scheduled", Job.start_at <= now ).all() for job in jobs_to_start: logger.info(f"Starte Job {job.id} für Drucker {job.printer_id}") success, msg = self.control_plug(job.printer_id, "on") if success: job.status = "running" logger.info(f"Steckdose für Job {job.id} eingeschaltet") else: logger.error(f"Fehler beim Einschalten für Job {job.id}: {msg}") # Jobs die enden sollten jobs_to_end = db_session.query(Job).filter( Job.status == "running", Job.end_at <= now ).all() for job in jobs_to_end: logger.info(f"Beende Job {job.id} für Drucker {job.printer_id}") success, msg = self.control_plug(job.printer_id, "off") if success: job.status = "finished" job.actual_end_time = now logger.info(f"Steckdose für Job {job.id} ausgeschaltet") else: logger.error(f"Fehler beim Ausschalten für Job {job.id}: {msg}") db_session.commit() db_session.close() except Exception as e: logger.error(f"Fehler bei der automatischen Job-Steuerung: {str(e)}") def _get_active_job(self, printer_id: int, db_session) -> Optional[Job]: """Gibt den aktuell aktiven Job für einen Drucker zurück""" return db_session.query(Job).filter( Job.printer_id == printer_id, Job.status == "running" ).first() def _get_next_job(self, printer_id: int, db_session) -> Optional[Job]: """Gibt den nächsten geplanten Job für einen Drucker zurück""" return db_session.query(Job).filter( Job.printer_id == printer_id, Job.status == "scheduled", Job.start_at > datetime.now() ).order_by(Job.start_at).first() def _is_cache_valid(self, printer_id: int) -> bool: """Prüft ob der Cache noch gültig ist""" if printer_id not in self._last_check: return False age = (datetime.now() - self._last_check[printer_id]).total_seconds() return age < self.check_interval def _create_error_status(self, printer_id: int, error: str) -> Dict[str, any]: """Erstellt einen Fehler-Status""" return { "id": printer_id, "name": f"Drucker {printer_id}", "plug_status": self.STATUS_UNREACHABLE, "plug_reachable": False, "error": error, "last_checked": datetime.now() } def _log_status(self, printer: Printer, status: str, source: str = "system"): """Loggt einen Status in die Datenbank""" try: PlugStatusLog.log_status_change( printer_id=printer.id, status=status, source=source, ip_address=printer.plug_ip ) except Exception as e: logger.error(f"Fehler beim Loggen des Status: {str(e)}") def get_status_for_calendar(self, start_date: datetime, end_date: datetime) -> List[Dict]: """ Gibt Status-Informationen für die Kalender-Ansicht zurück Args: start_date: Start-Datum end_date: End-Datum Returns: Liste mit Status-Events für den Kalender """ try: db_session = get_db_session() # Jobs im Zeitraum abrufen jobs = db_session.query(Job).filter( Job.start_at <= end_date, Job.end_at >= start_date ).all() events = [] for job in jobs: # Drucker-Status für Job printer = job.printer status = self.get_printer_status(printer.id) event = { "id": f"job_{job.id}", "title": f"{printer.name}: {job.name}", "start": job.start_at.isoformat(), "end": job.end_at.isoformat(), "backgroundColor": self._get_status_color(job.status), "extendedProps": { "job_id": job.id, "printer_id": printer.id, "printer_name": printer.name, "printer_status": status.get("plug_status", "unknown"), "job_status": job.status, "user": job.user.name, "plug_reachable": status.get("plug_reachable", False) } } events.append(event) db_session.close() return events except Exception as e: logger.error(f"Fehler beim Abrufen der Kalender-Status: {str(e)}") return [] def _get_status_color(self, status: str) -> str: """Gibt die Farbe für einen Status zurück""" colors = { "scheduled": "#3788d8", "running": "#28a745", "finished": "#6c757d", "aborted": "#dc3545" } return colors.get(status, "#6c757d") # Globale Instanz tapo_status_manager = TapoStatusManager() def get_tapo_status_manager() -> TapoStatusManager: """Gibt die globale TapoStatusManager-Instanz zurück""" return tapo_status_manager