manage-your-printer/utils/job_scheduler.py
2025-06-04 10:03:22 +02:00

729 lines
28 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import threading
import time
import logging
from typing import Dict, Callable, Any, List, Optional, Union
from datetime import datetime, timedelta
from PyP100 import PyP110
from sqlalchemy.orm import joinedload
from utils.logging_config import get_logger
from models import Job, Printer, get_db_session
from config.settings import TAPO_USERNAME, TAPO_PASSWORD
# Lazy logger initialization
_logger = None
def get_scheduler_logger():
"""Lazy initialization of the scheduler logger."""
global _logger
if _logger is None:
_logger = get_logger("scheduler")
return _logger
class BackgroundTaskScheduler:
"""
Ein fortschrittlicher Hintergrund-Task-Scheduler, der registrierbare Worker-Funktionen unterstützt.
Tasks können als Platzhalter registriert und später konfiguriert werden.
"""
def __init__(self):
self._tasks: Dict[str, Dict[str, Any]] = {}
self._thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
self._running = False
self._start_time: Optional[datetime] = None
self.logger = get_scheduler_logger()
def register_task(self,
task_id: str,
func: Callable,
interval: int = 60,
args: List = None,
kwargs: Dict = None,
enabled: bool = True) -> bool:
"""
Registriert eine neue Hintergrund-Task.
Args:
task_id: Eindeutige ID für die Task
func: Die auszuführende Funktion
interval: Intervall in Sekunden zwischen den Ausführungen
args: Positionsargumente für die Funktion
kwargs: Schlüsselwortargumente für die Funktion
enabled: Ob die Task aktiviert sein soll
Returns:
bool: True wenn erfolgreich, False wenn die ID bereits existiert
"""
if task_id in self._tasks:
self.logger.error(f"Task mit ID {task_id} existiert bereits")
return False
self._tasks[task_id] = {
"func": func,
"interval": interval,
"args": args or [],
"kwargs": kwargs or {},
"enabled": enabled,
"last_run": None,
"next_run": datetime.now() if enabled else None
}
self.logger.info(f"Task {task_id} registriert: Intervall {interval}s, Enabled: {enabled}")
return True
def update_task(self,
task_id: str,
interval: Optional[int] = None,
args: Optional[List] = None,
kwargs: Optional[Dict] = None,
enabled: Optional[bool] = None) -> bool:
"""
Aktualisiert die Konfiguration einer bestehenden Task.
Args:
task_id: ID der zu aktualisierenden Task
interval: Neues Intervall in Sekunden
args: Neue Positionsargumente
kwargs: Neue Schlüsselwortargumente
enabled: Neuer Aktivierungsstatus
Returns:
bool: True wenn erfolgreich, False wenn die ID nicht existiert
"""
if task_id not in self._tasks:
self.logger.error(f"Task mit ID {task_id} existiert nicht")
return False
task = self._tasks[task_id]
if interval is not None:
task["interval"] = interval
if args is not None:
task["args"] = args
if kwargs is not None:
task["kwargs"] = kwargs
if enabled is not None and enabled != task["enabled"]:
task["enabled"] = enabled
if enabled:
task["next_run"] = datetime.now()
else:
task["next_run"] = None
self.logger.info(f"Task {task_id} aktualisiert: Intervall {task['interval']}s, Enabled: {task['enabled']}")
return True
def remove_task(self, task_id: str) -> bool:
"""
Entfernt eine Task aus dem Scheduler.
Args:
task_id: ID der zu entfernenden Task
Returns:
bool: True wenn erfolgreich, False wenn die ID nicht existiert
"""
if task_id not in self._tasks:
self.logger.error(f"Task mit ID {task_id} existiert nicht")
return False
del self._tasks[task_id]
self.logger.info(f"Task {task_id} entfernt")
return True
def get_task_info(self, task_id: Optional[str] = None) -> Union[Dict, List[Dict]]:
"""
Gibt Informationen zu einer Task oder allen Tasks zurück.
Args:
task_id: ID der Task oder None für alle Tasks
Returns:
Dict oder List: Task-Informationen
"""
if task_id is not None:
if task_id not in self._tasks:
return {}
task = self._tasks[task_id]
return {
"id": task_id,
"interval": task["interval"],
"enabled": task["enabled"],
"last_run": task["last_run"].isoformat() if task["last_run"] else None,
"next_run": task["next_run"].isoformat() if task["next_run"] else None
}
return [
{
"id": tid,
"interval": task["interval"],
"enabled": task["enabled"],
"last_run": task["last_run"].isoformat() if task["last_run"] else None,
"next_run": task["next_run"].isoformat() if task["next_run"] else None
}
for tid, task in self._tasks.items()
]
def get_tasks(self) -> Dict[str, Dict[str, Any]]:
"""
Gibt alle Tasks mit ihren Konfigurationen zurück.
Returns:
Dict: Dictionary mit Task-IDs als Schlüssel und Task-Konfigurationen als Werte
"""
return {
task_id: {
"interval": task["interval"],
"enabled": task["enabled"],
"last_run": task["last_run"].isoformat() if task["last_run"] else None,
"next_run": task["next_run"].isoformat() if task["next_run"] else None
}
for task_id, task in self._tasks.items()
}
def get_uptime(self) -> Optional[str]:
"""
Gibt die Laufzeit des Schedulers seit dem Start zurück.
Returns:
str: Formatierte Laufzeit oder None, wenn der Scheduler nicht läuft
"""
if not self._running or not self._start_time:
return None
uptime = datetime.now() - self._start_time
days = uptime.days
hours, remainder = divmod(uptime.seconds, 3600)
minutes, seconds = divmod(remainder, 60)
if days > 0:
return f"{days} Tage, {hours} Stunden, {minutes} Minuten"
elif hours > 0:
return f"{hours} Stunden, {minutes} Minuten"
else:
return f"{minutes} Minuten, {seconds} Sekunden"
def start(self) -> bool:
"""
Startet den Scheduler.
Returns:
bool: True wenn erfolgreich gestartet, False wenn bereits läuft
"""
if self._running:
self.logger.warning("Scheduler läuft bereits")
return False
self._stop_event.clear()
self._thread = threading.Thread(target=self._run)
self._thread.daemon = True
self._thread.start()
self._running = True
self._start_time = datetime.now()
self.logger.info("Scheduler gestartet")
return True
def stop(self) -> bool:
"""
Stoppt den Scheduler.
Returns:
bool: True wenn erfolgreich gestoppt, False wenn nicht läuft
"""
if not self._running:
self.logger.warning("Scheduler läuft nicht")
return False
self._stop_event.set()
if self._thread:
self._thread.join(timeout=5.0)
self._running = False
self._start_time = None
self.logger.info("Scheduler gestoppt")
return True
def is_running(self) -> bool:
"""
Prüft, ob der Scheduler läuft.
Returns:
bool: True wenn der Scheduler läuft, sonst False
"""
return self._running
def _run(self) -> None:
"""Hauptloop des Schedulers."""
self.logger.info("Scheduler-Thread gestartet")
while not self._stop_event.is_set():
now = datetime.now()
for task_id, task in self._tasks.items():
if not task["enabled"] or not task["next_run"]:
continue
if now >= task["next_run"]:
try:
self.logger.debug(f"Führe Task {task_id} aus")
task["func"](*task["args"], **task["kwargs"])
task["last_run"] = now
task["next_run"] = now + timedelta(seconds=task["interval"])
self.logger.debug(f"Task {task_id} erfolgreich ausgeführt, nächste Ausführung: {task['next_run']}")
except Exception as e:
self.logger.error(f"Fehler bei Ausführung von Task {task_id}: {str(e)}")
# Trotzdem nächste Ausführung planen
task["next_run"] = now + timedelta(seconds=task["interval"])
# Schlafenszeit berechnen (1 Sekunde oder weniger)
time.sleep(1)
self.logger.info("Scheduler-Thread beendet")
def toggle_plug(self, ip: str, state: bool, username: str = None, password: str = None) -> bool:
"""
Schaltet eine TP-Link Tapo P100/P110-Steckdose ein oder aus.
Args:
ip: IP-Adresse der Steckdose
state: True = Ein, False = Aus
username: Benutzername für die Steckdose (wird überschrieben mit globalen Credentials)
password: Passwort für die Steckdose (wird überschrieben mit globalen Credentials)
Returns:
bool: True wenn erfolgreich geschaltet
"""
try:
# PyP100 importieren
try:
from PyP100 import PyP100
except ImportError:
self.logger.error("❌ PyP100-Modul nicht installiert - Steckdose kann nicht geschaltet werden")
return False
# IMMER globale Anmeldedaten verwenden (da diese funktionieren)
from config.settings import TAPO_USERNAME, TAPO_PASSWORD
username = TAPO_USERNAME
password = TAPO_PASSWORD
self.logger.debug(f"🔧 Verwende globale Tapo-Anmeldedaten für {ip}")
# P100-Verbindung herstellen (P100 statt P110 verwenden)
p100 = PyP100.P100(ip, username, password)
# Handshake und Login durchführen
p100.handshake()
p100.login()
# Steckdose schalten
if state:
p100.turnOn()
self.logger.info(f"✅ Tapo-Steckdose {ip} erfolgreich eingeschaltet")
else:
p100.turnOff()
self.logger.info(f"✅ Tapo-Steckdose {ip} erfolgreich ausgeschaltet")
return True
except Exception as e:
action = "ein" if state else "aus"
self.logger.error(f"❌ Fehler beim {action}schalten der Tapo-Steckdose {ip}: {str(e)}")
return False
def toggle_printer_plug(self, printer_id: int, state: bool) -> bool:
"""
Schaltet die Steckdose eines Druckers ein oder aus mit korrektem Status-Mapping:
- Steckdose AUS = Drucker ONLINE (bereit zum Drucken)
- Steckdose AN = Drucker PRINTING (druckt gerade)
Args:
printer_id: ID des Druckers
state: True für ein, False für aus
Returns:
bool: True wenn erfolgreich, False wenn fehlgeschlagen
"""
try:
# Drucker aus Datenbank holen
db_session = get_db_session()
printer = db_session.get(Printer, printer_id)
if not printer:
self.logger.error(f"❌ Drucker mit ID {printer_id} nicht gefunden")
db_session.close()
return False
# Konfiguration validieren
if not printer.plug_ip:
self.logger.error(f"❌ Unvollständige Steckdosen-Konfiguration für Drucker {printer.name}")
db_session.close()
return False
# Steckdose schalten
success = self.toggle_plug(
ip=printer.plug_ip,
state=state,
username=printer.plug_username, # Wird überschrieben mit globalen Credentials
password=printer.plug_password # Wird überschrieben mit globalen Credentials
)
if success:
# Status in Datenbank aktualisieren entsprechend der neuen Logik
if state:
# Steckdose eingeschaltet = Drucker druckt
printer.status = "printing"
self.logger.info(f"🖨️ Drucker {printer.name}: Status auf 'printing' gesetzt (Steckdose eingeschaltet)")
else:
# Steckdose ausgeschaltet = Drucker bereit
printer.status = "online"
self.logger.info(f"✅ Drucker {printer.name}: Status auf 'online' gesetzt (Steckdose ausgeschaltet - bereit)")
printer.last_checked = datetime.now()
db_session.commit()
self.logger.info(f"✅ Status für Drucker {printer.name} erfolgreich aktualisiert")
db_session.close()
return success
except Exception as e:
action = "ein" if state else "aus"
self.logger.error(f"❌ Fehler beim {action}schalten der Steckdose für Drucker {printer_id}: {str(e)}")
try:
db_session.close()
except:
pass
return False
def _check_jobs(self) -> None:
"""
Überprüft und verwaltet Druckjobs mit intelligentem Power Management:
- Startet anstehende Jobs (geplante Jobs)
- Beendet abgelaufene Jobs (schaltet Steckdose aus)
- Schaltet Drucker automatisch aus bei Leerlauf
- Schaltet Drucker automatisch ein bei neuen Jobs
"""
db_session = get_db_session()
try:
now = datetime.now()
# 1. Anstehende Jobs starten (geplante Jobs)
pending_jobs = db_session.query(Job).filter(
Job.status == "scheduled",
Job.start_at <= now
).all()
for job in pending_jobs:
self.logger.info(f"🚀 Starte geplanten Job {job.id}: {job.name}")
# Steckdose einschalten
if self.toggle_printer_plug(job.printer_id, True):
# Job als laufend markieren
job.status = "running"
db_session.commit()
self.logger.info(f"✅ Job {job.id} gestartet - Drucker eingeschaltet")
else:
self.logger.error(f"❌ Konnte Steckdose für Job {job.id} nicht einschalten")
# 2. Sofort-Jobs starten (Jobs die bereits hätten starten sollen)
immediate_jobs = db_session.query(Job).filter(
Job.status == "waiting_for_printer",
Job.start_at <= now
).all()
for job in immediate_jobs:
self.logger.info(f"⚡ Starte Sofort-Job {job.id}: {job.name}")
# Steckdose einschalten
if self.toggle_printer_plug(job.printer_id, True):
# Job als laufend markieren
job.status = "running"
db_session.commit()
self.logger.info(f"✅ Sofort-Job {job.id} gestartet - Drucker automatisch eingeschaltet")
else:
self.logger.error(f"❌ Konnte Steckdose für Sofort-Job {job.id} nicht einschalten")
# 3. Abgelaufene Jobs beenden
running_jobs = db_session.query(Job).filter(
Job.status == "running",
Job.end_at <= now
).all()
for job in running_jobs:
self.logger.info(f"🏁 Beende Job {job.id}: {job.name}")
# Job als beendet markieren
job.status = "finished"
job.actual_end_time = now
db_session.commit()
self.logger.info(f"✅ Job {job.id} beendet")
# Prüfen ob weitere Jobs für diesen Drucker anstehen
pending_jobs_for_printer = db_session.query(Job).filter(
Job.printer_id == job.printer_id,
Job.status.in_(["scheduled", "running", "waiting_for_printer"])
).count()
if pending_jobs_for_printer == 0:
# Keine weiteren Jobs - Drucker ausschalten (Leerlauf-Management)
if self.toggle_printer_plug(job.printer_id, False):
self.logger.info(f"💤 Drucker {job.printer_id} automatisch ausgeschaltet - Leerlauf erkannt")
else:
self.logger.warning(f"⚠️ Konnte Drucker {job.printer_id} nicht ausschalten")
else:
self.logger.info(f"🔄 Drucker {job.printer_id} bleibt eingeschaltet - {pending_jobs_for_printer} weitere Jobs anstehend")
# 4. Intelligentes Leerlauf-Management für alle aktiven Drucker
active_printers = db_session.query(Printer).filter(
Printer.active == True,
Printer.plug_ip.isnot(None),
Printer.status == "online"
).all()
for printer in active_printers:
# Prüfen ob Jobs für diesen Drucker anstehen
active_jobs_count = db_session.query(Job).filter(
Job.printer_id == printer.id,
Job.status.in_(["scheduled", "running", "waiting_for_printer"])
).count()
if active_jobs_count == 0:
# Keine Jobs anstehend - prüfen ob Drucker schon längere Zeit im Leerlauf ist
if printer.last_checked:
idle_time = now - printer.last_checked
# Drucker ausschalten wenn länger als 5 Minuten im Leerlauf
if idle_time.total_seconds() > 300: # 5 Minuten
if self.toggle_printer_plug(printer.id, False):
self.logger.info(f"💤 Drucker {printer.name} nach {idle_time.total_seconds()//60:.0f} Min Leerlauf ausgeschaltet")
else:
self.logger.warning(f"⚠️ Konnte Drucker {printer.name} nach Leerlauf nicht ausschalten")
except Exception as e:
self.logger.error(f"❌ Fehler bei Überprüfung der Jobs: {str(e)}")
try:
db_session.rollback()
except:
pass
finally:
db_session.close()
def handle_immediate_job(self, job_id: int) -> bool:
"""
Behandelt einen Job sofort (für Sofort-Start bei Job-Erstellung).
Args:
job_id: ID des zu startenden Jobs
Returns:
bool: True wenn Job erfolgreich gestartet wurde
"""
db_session = get_db_session()
try:
now = datetime.now()
# Job aus Datenbank laden
job = db_session.get(Job, job_id)
if not job:
self.logger.error(f"❌ Job {job_id} nicht gefunden")
db_session.close()
return False
# Nur Jobs behandeln die sofort starten sollen
if job.start_at > now:
self.logger.info(f"⏰ Job {job_id} ist für später geplant ({job.start_at}) - kein Sofort-Start")
db_session.close()
return False
# Nur Jobs in passenden Status
if job.status not in ["scheduled", "waiting_for_printer"]:
self.logger.info(f" Job {job_id} hat Status '{job.status}' - kein Sofort-Start nötig")
db_session.close()
return False
self.logger.info(f"⚡ Starte Sofort-Job {job_id}: {job.name} für Drucker {job.printer_id}")
# Steckdose einschalten
if self.toggle_printer_plug(job.printer_id, True):
# Job als laufend markieren
job.status = "running"
db_session.commit()
db_session.close()
self.logger.info(f"✅ Sofort-Job {job_id} erfolgreich gestartet - Drucker automatisch eingeschaltet")
return True
else:
self.logger.error(f"❌ Konnte Steckdose für Sofort-Job {job_id} nicht einschalten")
db_session.close()
return False
except Exception as e:
self.logger.error(f"❌ Fehler beim Starten von Sofort-Job {job_id}: {str(e)}")
try:
db_session.rollback()
db_session.close()
except:
pass
return False
def check_and_manage_printer_power(self, printer_id: int) -> bool:
"""
Prüft und verwaltet die Stromversorgung eines spezifischen Druckers.
Args:
printer_id: ID des zu prüfenden Druckers
Returns:
bool: True wenn Power-Management erfolgreich
"""
db_session = get_db_session()
try:
now = datetime.now()
# Drucker laden
printer = db_session.get(Printer, printer_id)
if not printer or not printer.plug_ip:
db_session.close()
return False
# Aktive Jobs für diesen Drucker prüfen
active_jobs = db_session.query(Job).filter(
Job.printer_id == printer_id,
Job.status.in_(["scheduled", "running", "waiting_for_printer"])
).all()
current_jobs = [job for job in active_jobs if job.start_at <= now]
future_jobs = [job for job in active_jobs if job.start_at > now]
if current_jobs:
# Jobs laufen oder sollten laufen - Drucker einschalten
self.logger.info(f"🔋 Drucker {printer.name} benötigt Strom - {len(current_jobs)} aktive Jobs")
success = self.toggle_printer_plug(printer_id, True)
# Jobs von waiting_for_printer auf running umstellen
for job in current_jobs:
if job.status == "waiting_for_printer":
job.status = "running"
self.logger.info(f"🚀 Job {job.id} von 'waiting_for_printer' auf 'running' umgestellt")
db_session.commit()
db_session.close()
return success
elif future_jobs:
# Nur zukünftige Jobs - Drucker kann ausgeschaltet bleiben
next_job_time = min(job.start_at for job in future_jobs)
time_until_next = (next_job_time - now).total_seconds() / 60
self.logger.info(f"⏳ Drucker {printer.name} hat {len(future_jobs)} zukünftige Jobs, nächster in {time_until_next:.1f} Min")
# Drucker ausschalten wenn nächster Job erst in mehr als 10 Minuten
if time_until_next > 10:
success = self.toggle_printer_plug(printer_id, False)
db_session.close()
return success
else:
self.logger.info(f"🔄 Drucker {printer.name} bleibt eingeschaltet - nächster Job bald")
db_session.close()
return True
else:
# Keine Jobs - Drucker ausschalten (Leerlauf)
self.logger.info(f"💤 Drucker {printer.name} hat keine anstehenden Jobs - ausschalten")
success = self.toggle_printer_plug(printer_id, False)
db_session.close()
return success
except Exception as e:
self.logger.error(f"❌ Fehler beim Power-Management für Drucker {printer_id}: {str(e)}")
try:
db_session.close()
except:
pass
return False
def test_tapo_connection(ip_address: str, username: str = None, password: str = None) -> dict:
"""
Testet die Verbindung zu einer TP-Link Tapo P110-Steckdose.
Args:
ip_address: IP-Adresse der Steckdose
username: Benutzername für die Steckdose (optional)
password: Passwort für die Steckdose (optional)
Returns:
dict: Ergebnis mit Status und Informationen
"""
logger = get_logger("tapo")
result = {
"success": False,
"message": "",
"device_info": None,
"error": None
}
try:
# Importiere PyP100 für Tapo-Unterstützung
try:
from PyP100 import PyP100
except ImportError:
result["message"] = "PyP100-Modul nicht verfügbar"
result["error"] = "ModuleNotFound"
logger.error("PyP100-Modul nicht verfügbar - kann Tapo-Steckdosen nicht testen")
return result
# Verwende globale Anmeldedaten falls nicht angegeben
if not username or not password:
from config.settings import TAPO_USERNAME, TAPO_PASSWORD
username = TAPO_USERNAME
password = TAPO_PASSWORD
logger.debug(f"Verwende globale Tapo-Anmeldedaten für {ip_address}")
# TP-Link Tapo P100 Verbindung herstellen
p100 = PyP100.P100(ip_address, username, password)
p100.handshake() # Authentifizierung
p100.login() # Login
# Geräteinformationen abrufen
device_info = p100.getDeviceInfo()
result["success"] = True
result["message"] = "Verbindung erfolgreich"
result["device_info"] = device_info
logger.info(f"Tapo-Verbindung zu {ip_address} erfolgreich: {device_info.get('nickname', 'Unbekannt')}")
except Exception as e:
result["success"] = False
result["message"] = f"Verbindungsfehler: {str(e)}"
result["error"] = str(e)
logger.error(f"Fehler bei Tapo-Test zu {ip_address}: {str(e)}")
return result
# Scheduler-Instanz erzeugen
scheduler = BackgroundTaskScheduler()
# Standardaufgaben registrieren - reduziertes Intervall für bessere Reaktionszeit
scheduler.register_task("check_jobs", scheduler._check_jobs, interval=30)
# Alias für Kompatibilität
JobScheduler = BackgroundTaskScheduler
def get_job_scheduler() -> BackgroundTaskScheduler:
"""
Gibt den globalen Job-Scheduler zurück.
Returns:
BackgroundTaskScheduler: Der globale Scheduler
"""
return scheduler