import threading import time import logging from typing import Dict, Callable, Any, List, Optional, Union from datetime import datetime, timedelta from PyP100 import PyP110 from sqlalchemy.orm import joinedload from utils.logging_config import get_logger from models import Job, Printer, get_db_session from utils.settings import TAPO_USERNAME, TAPO_PASSWORD # Lazy logger initialization _logger = None def get_scheduler_logger(): """Lazy initialization of the scheduler logger.""" global _logger if _logger is None: _logger = get_logger("scheduler") return _logger class BackgroundTaskScheduler: """ Ein fortschrittlicher Hintergrund-Task-Scheduler, der registrierbare Worker-Funktionen unterstützt. Tasks können als Platzhalter registriert und später konfiguriert werden. """ def __init__(self): self._tasks: Dict[str, Dict[str, Any]] = {} self._thread: Optional[threading.Thread] = None self._stop_event = threading.Event() self._running = False self._start_time: Optional[datetime] = None self.logger = get_scheduler_logger() def register_task(self, task_id: str, func: Callable, interval: int = 60, args: List = None, kwargs: Dict = None, enabled: bool = True) -> bool: """ Registriert eine neue Hintergrund-Task. Args: task_id: Eindeutige ID für die Task func: Die auszuführende Funktion interval: Intervall in Sekunden zwischen den Ausführungen args: Positionsargumente für die Funktion kwargs: Schlüsselwortargumente für die Funktion enabled: Ob die Task aktiviert sein soll Returns: bool: True wenn erfolgreich, False wenn die ID bereits existiert """ if task_id in self._tasks: self.logger.error(f"Task mit ID {task_id} existiert bereits") return False self._tasks[task_id] = { "func": func, "interval": interval, "args": args or [], "kwargs": kwargs or {}, "enabled": enabled, "last_run": None, "next_run": datetime.now() if enabled else None } self.logger.info(f"Task {task_id} registriert: Intervall {interval}s, Enabled: {enabled}") return True def update_task(self, task_id: str, interval: Optional[int] = None, args: Optional[List] = None, kwargs: Optional[Dict] = None, enabled: Optional[bool] = None) -> bool: """ Aktualisiert die Konfiguration einer bestehenden Task. Args: task_id: ID der zu aktualisierenden Task interval: Neues Intervall in Sekunden args: Neue Positionsargumente kwargs: Neue Schlüsselwortargumente enabled: Neuer Aktivierungsstatus Returns: bool: True wenn erfolgreich, False wenn die ID nicht existiert """ if task_id not in self._tasks: self.logger.error(f"Task mit ID {task_id} existiert nicht") return False task = self._tasks[task_id] if interval is not None: task["interval"] = interval if args is not None: task["args"] = args if kwargs is not None: task["kwargs"] = kwargs if enabled is not None and enabled != task["enabled"]: task["enabled"] = enabled if enabled: task["next_run"] = datetime.now() else: task["next_run"] = None self.logger.info(f"Task {task_id} aktualisiert: Intervall {task['interval']}s, Enabled: {task['enabled']}") return True def remove_task(self, task_id: str) -> bool: """ Entfernt eine Task aus dem Scheduler. Args: task_id: ID der zu entfernenden Task Returns: bool: True wenn erfolgreich, False wenn die ID nicht existiert """ if task_id not in self._tasks: self.logger.error(f"Task mit ID {task_id} existiert nicht") return False del self._tasks[task_id] self.logger.info(f"Task {task_id} entfernt") return True def get_task_info(self, task_id: Optional[str] = None) -> Union[Dict, List[Dict]]: """ Gibt Informationen zu einer Task oder allen Tasks zurück. Args: task_id: ID der Task oder None für alle Tasks Returns: Dict oder List: Task-Informationen """ if task_id is not None: if task_id not in self._tasks: return {} task = self._tasks[task_id] return { "id": task_id, "interval": task["interval"], "enabled": task["enabled"], "last_run": task["last_run"].isoformat() if task["last_run"] else None, "next_run": task["next_run"].isoformat() if task["next_run"] else None } return [ { "id": tid, "interval": task["interval"], "enabled": task["enabled"], "last_run": task["last_run"].isoformat() if task["last_run"] else None, "next_run": task["next_run"].isoformat() if task["next_run"] else None } for tid, task in self._tasks.items() ] def get_tasks(self) -> Dict[str, Dict[str, Any]]: """ Gibt alle Tasks mit ihren Konfigurationen zurück. Returns: Dict: Dictionary mit Task-IDs als Schlüssel und Task-Konfigurationen als Werte """ return { task_id: { "interval": task["interval"], "enabled": task["enabled"], "last_run": task["last_run"].isoformat() if task["last_run"] else None, "next_run": task["next_run"].isoformat() if task["next_run"] else None } for task_id, task in self._tasks.items() } def get_uptime(self) -> Optional[str]: """ Gibt die Laufzeit des Schedulers seit dem Start zurück. Returns: str: Formatierte Laufzeit oder None, wenn der Scheduler nicht läuft """ if not self._running or not self._start_time: return None uptime = datetime.now() - self._start_time days = uptime.days hours, remainder = divmod(uptime.seconds, 3600) minutes, seconds = divmod(remainder, 60) if days > 0: return f"{days} Tage, {hours} Stunden, {minutes} Minuten" elif hours > 0: return f"{hours} Stunden, {minutes} Minuten" else: return f"{minutes} Minuten, {seconds} Sekunden" def start(self) -> bool: """ Startet den Scheduler. Returns: bool: True wenn erfolgreich gestartet, False wenn bereits läuft """ if self._running: self.logger.warning("Scheduler läuft bereits") return False self._stop_event.clear() self._thread = threading.Thread(target=self._run) self._thread.daemon = True self._thread.start() self._running = True self._start_time = datetime.now() self.logger.info("Scheduler gestartet") return True def stop(self) -> bool: """ Stoppt den Scheduler. Returns: bool: True wenn erfolgreich gestoppt, False wenn nicht läuft """ if not self._running: self.logger.warning("Scheduler läuft nicht") return False self._stop_event.set() if self._thread: self._thread.join(timeout=5.0) self._running = False self._start_time = None self.logger.info("Scheduler gestoppt") return True def is_running(self) -> bool: """ Prüft, ob der Scheduler läuft. Returns: bool: True wenn der Scheduler läuft, sonst False """ return self._running def _run(self) -> None: """Hauptloop des Schedulers.""" self.logger.info("Scheduler-Thread gestartet") while not self._stop_event.is_set(): now = datetime.now() for task_id, task in self._tasks.items(): if not task["enabled"] or not task["next_run"]: continue if now >= task["next_run"]: try: self.logger.debug(f"Führe Task {task_id} aus") task["func"](*task["args"], **task["kwargs"]) task["last_run"] = now task["next_run"] = now + timedelta(seconds=task["interval"]) self.logger.debug(f"Task {task_id} erfolgreich ausgeführt, nächste Ausführung: {task['next_run']}") except Exception as e: self.logger.error(f"Fehler bei Ausführung von Task {task_id}: {str(e)}") # Trotzdem nächste Ausführung planen task["next_run"] = now + timedelta(seconds=task["interval"]) # Schlafenszeit berechnen (1 Sekunde oder weniger) time.sleep(1) self.logger.info("Scheduler-Thread beendet") def toggle_plug(self, ip: str, state: bool, username: str = None, password: str = None) -> bool: """ Schaltet eine TP-Link Tapo P100/P110-Steckdose ein oder aus. Args: ip: IP-Adresse der Steckdose state: True = Ein, False = Aus username: Benutzername für die Steckdose (wird überschrieben mit globalen Credentials) password: Passwort für die Steckdose (wird überschrieben mit globalen Credentials) Returns: bool: True wenn erfolgreich geschaltet """ try: # PyP100 importieren try: from PyP100 import PyP100 except ImportError: self.logger.error("❌ PyP100-Modul nicht installiert - Steckdose kann nicht geschaltet werden") return False # IMMER globale Anmeldedaten verwenden (da diese funktionieren) from utils.settings import TAPO_USERNAME, TAPO_PASSWORD username = TAPO_USERNAME password = TAPO_PASSWORD self.logger.debug(f"🔧 Verwende globale Tapo-Anmeldedaten für {ip}") # P100-Verbindung herstellen (P100 statt P110 verwenden) p100 = PyP100.P100(ip, username, password) # Handshake und Login durchführen p100.handshake() p100.login() # Steckdose schalten if state: p100.turnOn() self.logger.info(f"✅ Tapo-Steckdose {ip} erfolgreich eingeschaltet") else: p100.turnOff() self.logger.info(f"✅ Tapo-Steckdose {ip} erfolgreich ausgeschaltet") return True except Exception as e: action = "ein" if state else "aus" self.logger.error(f"❌ Fehler beim {action}schalten der Tapo-Steckdose {ip}: {str(e)}") return False def toggle_printer_plug(self, printer_id: int, state: bool) -> bool: """ Schaltet die Steckdose eines Druckers ein oder aus mit korrektem Status-Mapping: - Steckdose AUS = Drucker ONLINE (bereit zum Drucken) - Steckdose AN = Drucker PRINTING (druckt gerade) Args: printer_id: ID des Druckers state: True für ein, False für aus Returns: bool: True wenn erfolgreich, False wenn fehlgeschlagen """ try: # Drucker aus Datenbank holen db_session = get_db_session() printer = db_session.get(Printer, printer_id) if not printer: self.logger.error(f"❌ Drucker mit ID {printer_id} nicht gefunden") db_session.close() return False # Konfiguration validieren if not printer.plug_ip: self.logger.error(f"❌ Unvollständige Steckdosen-Konfiguration für Drucker {printer.name}") db_session.close() return False # Steckdose schalten success = self.toggle_plug( ip=printer.plug_ip, state=state, username=printer.plug_username, # Wird überschrieben mit globalen Credentials password=printer.plug_password # Wird überschrieben mit globalen Credentials ) if success: # Status in Datenbank aktualisieren entsprechend der neuen Logik if state: # Steckdose eingeschaltet = Drucker druckt printer.status = "printing" self.logger.info(f"🖨️ Drucker {printer.name}: Status auf 'printing' gesetzt (Steckdose eingeschaltet)") else: # Steckdose ausgeschaltet = Drucker bereit printer.status = "online" self.logger.info(f"✅ Drucker {printer.name}: Status auf 'online' gesetzt (Steckdose ausgeschaltet - bereit)") printer.last_checked = datetime.now() db_session.commit() self.logger.info(f"✅ Status für Drucker {printer.name} erfolgreich aktualisiert") db_session.close() return success except Exception as e: action = "ein" if state else "aus" self.logger.error(f"❌ Fehler beim {action}schalten der Steckdose für Drucker {printer_id}: {str(e)}") try: db_session.close() except: pass return False def _check_jobs(self) -> None: """ Überprüft und verwaltet Druckjobs mit intelligentem Power Management: - Startet anstehende Jobs (geplante Jobs) - Beendet abgelaufene Jobs (schaltet Steckdose aus) - Schaltet Drucker automatisch aus bei Leerlauf - Schaltet Drucker automatisch ein bei neuen Jobs """ db_session = get_db_session() try: now = datetime.now() # 1. Anstehende Jobs starten (geplante Jobs) pending_jobs = db_session.query(Job).filter( Job.status == "scheduled", Job.start_at <= now ).all() for job in pending_jobs: self.logger.info(f"🚀 Starte geplanten Job {job.id}: {job.name}") # Steckdose einschalten if self.toggle_printer_plug(job.printer_id, True): # Job als laufend markieren job.status = "running" db_session.commit() self.logger.info(f"✅ Job {job.id} gestartet - Drucker eingeschaltet") else: self.logger.error(f"❌ Konnte Steckdose für Job {job.id} nicht einschalten") # 2. Sofort-Jobs starten (Jobs die bereits hätten starten sollen) immediate_jobs = db_session.query(Job).filter( Job.status == "waiting_for_printer", Job.start_at <= now ).all() for job in immediate_jobs: self.logger.info(f"⚡ Starte Sofort-Job {job.id}: {job.name}") # Steckdose einschalten if self.toggle_printer_plug(job.printer_id, True): # Job als laufend markieren job.status = "running" db_session.commit() self.logger.info(f"✅ Sofort-Job {job.id} gestartet - Drucker automatisch eingeschaltet") else: self.logger.error(f"❌ Konnte Steckdose für Sofort-Job {job.id} nicht einschalten") # 3. Abgelaufene Jobs beenden running_jobs = db_session.query(Job).filter( Job.status == "running", Job.end_at <= now ).all() for job in running_jobs: self.logger.info(f"🏁 Beende Job {job.id}: {job.name}") # Job als beendet markieren job.status = "finished" job.actual_end_time = now db_session.commit() self.logger.info(f"✅ Job {job.id} beendet") # Prüfen ob weitere Jobs für diesen Drucker anstehen pending_jobs_for_printer = db_session.query(Job).filter( Job.printer_id == job.printer_id, Job.status.in_(["scheduled", "running", "waiting_for_printer"]) ).count() if pending_jobs_for_printer == 0: # Keine weiteren Jobs - Drucker ausschalten (Leerlauf-Management) if self.toggle_printer_plug(job.printer_id, False): self.logger.info(f"💤 Drucker {job.printer_id} automatisch ausgeschaltet - Leerlauf erkannt") else: self.logger.warning(f"⚠️ Konnte Drucker {job.printer_id} nicht ausschalten") else: self.logger.info(f"🔄 Drucker {job.printer_id} bleibt eingeschaltet - {pending_jobs_for_printer} weitere Jobs anstehend") # 4. Intelligentes Leerlauf-Management für alle aktiven Drucker active_printers = db_session.query(Printer).filter( Printer.active == True, Printer.plug_ip.isnot(None), Printer.status == "online" ).all() for printer in active_printers: # Prüfen ob Jobs für diesen Drucker anstehen active_jobs_count = db_session.query(Job).filter( Job.printer_id == printer.id, Job.status.in_(["scheduled", "running", "waiting_for_printer"]) ).count() if active_jobs_count == 0: # Keine Jobs anstehend - prüfen ob Drucker schon längere Zeit im Leerlauf ist if printer.last_checked: idle_time = now - printer.last_checked # Drucker ausschalten wenn länger als 5 Minuten im Leerlauf if idle_time.total_seconds() > 300: # 5 Minuten if self.toggle_printer_plug(printer.id, False): self.logger.info(f"💤 Drucker {printer.name} nach {idle_time.total_seconds()//60:.0f} Min Leerlauf ausgeschaltet") else: self.logger.warning(f"⚠️ Konnte Drucker {printer.name} nach Leerlauf nicht ausschalten") except Exception as e: self.logger.error(f"❌ Fehler bei Überprüfung der Jobs: {str(e)}") try: db_session.rollback() except: pass finally: db_session.close() def handle_immediate_job(self, job_id: int) -> bool: """ Behandelt einen Job sofort (für Sofort-Start bei Job-Erstellung). Args: job_id: ID des zu startenden Jobs Returns: bool: True wenn Job erfolgreich gestartet wurde """ db_session = get_db_session() try: now = datetime.now() # Job aus Datenbank laden job = db_session.get(Job, job_id) if not job: self.logger.error(f"❌ Job {job_id} nicht gefunden") db_session.close() return False # Nur Jobs behandeln die sofort starten sollen if job.start_at > now: self.logger.info(f"⏰ Job {job_id} ist für später geplant ({job.start_at}) - kein Sofort-Start") db_session.close() return False # Nur Jobs in passenden Status if job.status not in ["scheduled", "waiting_for_printer"]: self.logger.info(f"ℹ️ Job {job_id} hat Status '{job.status}' - kein Sofort-Start nötig") db_session.close() return False self.logger.info(f"⚡ Starte Sofort-Job {job_id}: {job.name} für Drucker {job.printer_id}") # Steckdose einschalten if self.toggle_printer_plug(job.printer_id, True): # Job als laufend markieren job.status = "running" db_session.commit() db_session.close() self.logger.info(f"✅ Sofort-Job {job_id} erfolgreich gestartet - Drucker automatisch eingeschaltet") return True else: self.logger.error(f"❌ Konnte Steckdose für Sofort-Job {job_id} nicht einschalten") db_session.close() return False except Exception as e: self.logger.error(f"❌ Fehler beim Starten von Sofort-Job {job_id}: {str(e)}") try: db_session.rollback() db_session.close() except: pass return False def check_and_manage_printer_power(self, printer_id: int) -> bool: """ Prüft und verwaltet die Stromversorgung eines spezifischen Druckers. Args: printer_id: ID des zu prüfenden Druckers Returns: bool: True wenn Power-Management erfolgreich """ db_session = get_db_session() try: now = datetime.now() # Drucker laden printer = db_session.get(Printer, printer_id) if not printer or not printer.plug_ip: db_session.close() return False # Aktive Jobs für diesen Drucker prüfen active_jobs = db_session.query(Job).filter( Job.printer_id == printer_id, Job.status.in_(["scheduled", "running", "waiting_for_printer"]) ).all() current_jobs = [job for job in active_jobs if job.start_at <= now] future_jobs = [job for job in active_jobs if job.start_at > now] if current_jobs: # Jobs laufen oder sollten laufen - Drucker einschalten self.logger.info(f"🔋 Drucker {printer.name} benötigt Strom - {len(current_jobs)} aktive Jobs") success = self.toggle_printer_plug(printer_id, True) # Jobs von waiting_for_printer auf running umstellen for job in current_jobs: if job.status == "waiting_for_printer": job.status = "running" self.logger.info(f"🚀 Job {job.id} von 'waiting_for_printer' auf 'running' umgestellt") db_session.commit() db_session.close() return success elif future_jobs: # Nur zukünftige Jobs - Drucker kann ausgeschaltet bleiben next_job_time = min(job.start_at for job in future_jobs) time_until_next = (next_job_time - now).total_seconds() / 60 self.logger.info(f"⏳ Drucker {printer.name} hat {len(future_jobs)} zukünftige Jobs, nächster in {time_until_next:.1f} Min") # Drucker ausschalten wenn nächster Job erst in mehr als 10 Minuten if time_until_next > 10: success = self.toggle_printer_plug(printer_id, False) db_session.close() return success else: self.logger.info(f"🔄 Drucker {printer.name} bleibt eingeschaltet - nächster Job bald") db_session.close() return True else: # Keine Jobs - Drucker ausschalten (Leerlauf) self.logger.info(f"💤 Drucker {printer.name} hat keine anstehenden Jobs - ausschalten") success = self.toggle_printer_plug(printer_id, False) db_session.close() return success except Exception as e: self.logger.error(f"❌ Fehler beim Power-Management für Drucker {printer_id}: {str(e)}") try: db_session.close() except: pass return False def test_tapo_connection(ip_address: str, username: str = None, password: str = None) -> dict: """ Testet die Verbindung zu einer TP-Link Tapo P110-Steckdose. Args: ip_address: IP-Adresse der Steckdose username: Benutzername für die Steckdose (optional) password: Passwort für die Steckdose (optional) Returns: dict: Ergebnis mit Status und Informationen """ logger = get_logger("tapo") result = { "success": False, "message": "", "device_info": None, "error": None } try: # Importiere PyP100 für Tapo-Unterstützung try: from PyP100 import PyP100 except ImportError: result["message"] = "PyP100-Modul nicht verfügbar" result["error"] = "ModuleNotFound" logger.error("PyP100-Modul nicht verfügbar - kann Tapo-Steckdosen nicht testen") return result # Verwende globale Anmeldedaten falls nicht angegeben if not username or not password: from utils.settings import TAPO_USERNAME, TAPO_PASSWORD username = TAPO_USERNAME password = TAPO_PASSWORD logger.debug(f"Verwende globale Tapo-Anmeldedaten für {ip_address}") # TP-Link Tapo P100 Verbindung herstellen p100 = PyP100.P100(ip_address, username, password) p100.handshake() # Authentifizierung p100.login() # Login # Geräteinformationen abrufen device_info = p100.getDeviceInfo() result["success"] = True result["message"] = "Verbindung erfolgreich" result["device_info"] = device_info logger.info(f"Tapo-Verbindung zu {ip_address} erfolgreich: {device_info.get('nickname', 'Unbekannt')}") except Exception as e: result["success"] = False result["message"] = f"Verbindungsfehler: {str(e)}" result["error"] = str(e) logger.error(f"Fehler bei Tapo-Test zu {ip_address}: {str(e)}") return result # Scheduler-Instanz erzeugen scheduler = BackgroundTaskScheduler() # Standardaufgaben registrieren - reduziertes Intervall für bessere Reaktionszeit scheduler.register_task("check_jobs", scheduler._check_jobs, interval=30) # Alias für Kompatibilität JobScheduler = BackgroundTaskScheduler def get_job_scheduler() -> BackgroundTaskScheduler: """ Gibt den globalen Job-Scheduler zurück. Returns: BackgroundTaskScheduler: Der globale Scheduler """ return scheduler