Files
Projektarbeit-MYP/backend/app/utils/job_scheduler.py

528 lines
18 KiB
Python

import threading
import time
import logging
from typing import Dict, Callable, Any, List, Optional, Union
from datetime import datetime, timedelta
from PyP100 import PyP110
from sqlalchemy.orm import joinedload
from utils.logging_config import get_logger
from models import Job, Printer, get_db_session
from config.settings import TAPO_USERNAME, TAPO_PASSWORD
# Lazy logger initialization
_logger = None
def get_scheduler_logger():
"""Lazy initialization of the scheduler logger."""
global _logger
if _logger is None:
_logger = get_logger("scheduler")
return _logger
class BackgroundTaskScheduler:
"""
Ein fortschrittlicher Hintergrund-Task-Scheduler, der registrierbare Worker-Funktionen unterstützt.
Tasks können als Platzhalter registriert und später konfiguriert werden.
"""
def __init__(self):
self._tasks: Dict[str, Dict[str, Any]] = {}
self._thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
self._running = False
self._start_time: Optional[datetime] = None
self.logger = get_scheduler_logger()
def register_task(self,
task_id: str,
func: Callable,
interval: int = 60,
args: List = None,
kwargs: Dict = None,
enabled: bool = True) -> bool:
"""
Registriert eine neue Hintergrund-Task.
Args:
task_id: Eindeutige ID für die Task
func: Die auszuführende Funktion
interval: Intervall in Sekunden zwischen den Ausführungen
args: Positionsargumente für die Funktion
kwargs: Schlüsselwortargumente für die Funktion
enabled: Ob die Task aktiviert sein soll
Returns:
bool: True wenn erfolgreich, False wenn die ID bereits existiert
"""
if task_id in self._tasks:
self.logger.error(f"Task mit ID {task_id} existiert bereits")
return False
self._tasks[task_id] = {
"func": func,
"interval": interval,
"args": args or [],
"kwargs": kwargs or {},
"enabled": enabled,
"last_run": None,
"next_run": datetime.now() if enabled else None
}
self.logger.info(f"Task {task_id} registriert: Intervall {interval}s, Enabled: {enabled}")
return True
def update_task(self,
task_id: str,
interval: Optional[int] = None,
args: Optional[List] = None,
kwargs: Optional[Dict] = None,
enabled: Optional[bool] = None) -> bool:
"""
Aktualisiert die Konfiguration einer bestehenden Task.
Args:
task_id: ID der zu aktualisierenden Task
interval: Neues Intervall in Sekunden
args: Neue Positionsargumente
kwargs: Neue Schlüsselwortargumente
enabled: Neuer Aktivierungsstatus
Returns:
bool: True wenn erfolgreich, False wenn die ID nicht existiert
"""
if task_id not in self._tasks:
self.logger.error(f"Task mit ID {task_id} existiert nicht")
return False
task = self._tasks[task_id]
if interval is not None:
task["interval"] = interval
if args is not None:
task["args"] = args
if kwargs is not None:
task["kwargs"] = kwargs
if enabled is not None and enabled != task["enabled"]:
task["enabled"] = enabled
if enabled:
task["next_run"] = datetime.now()
else:
task["next_run"] = None
self.logger.info(f"Task {task_id} aktualisiert: Intervall {task['interval']}s, Enabled: {task['enabled']}")
return True
def remove_task(self, task_id: str) -> bool:
"""
Entfernt eine Task aus dem Scheduler.
Args:
task_id: ID der zu entfernenden Task
Returns:
bool: True wenn erfolgreich, False wenn die ID nicht existiert
"""
if task_id not in self._tasks:
self.logger.error(f"Task mit ID {task_id} existiert nicht")
return False
del self._tasks[task_id]
self.logger.info(f"Task {task_id} entfernt")
return True
def get_task_info(self, task_id: Optional[str] = None) -> Union[Dict, List[Dict]]:
"""
Gibt Informationen zu einer Task oder allen Tasks zurück.
Args:
task_id: ID der Task oder None für alle Tasks
Returns:
Dict oder List: Task-Informationen
"""
if task_id is not None:
if task_id not in self._tasks:
return {}
task = self._tasks[task_id]
return {
"id": task_id,
"interval": task["interval"],
"enabled": task["enabled"],
"last_run": task["last_run"].isoformat() if task["last_run"] else None,
"next_run": task["next_run"].isoformat() if task["next_run"] else None
}
return [
{
"id": tid,
"interval": task["interval"],
"enabled": task["enabled"],
"last_run": task["last_run"].isoformat() if task["last_run"] else None,
"next_run": task["next_run"].isoformat() if task["next_run"] else None
}
for tid, task in self._tasks.items()
]
def get_tasks(self) -> Dict[str, Dict[str, Any]]:
"""
Gibt alle Tasks mit ihren Konfigurationen zurück.
Returns:
Dict: Dictionary mit Task-IDs als Schlüssel und Task-Konfigurationen als Werte
"""
return {
task_id: {
"interval": task["interval"],
"enabled": task["enabled"],
"last_run": task["last_run"].isoformat() if task["last_run"] else None,
"next_run": task["next_run"].isoformat() if task["next_run"] else None
}
for task_id, task in self._tasks.items()
}
def get_uptime(self) -> Optional[str]:
"""
Gibt die Laufzeit des Schedulers seit dem Start zurück.
Returns:
str: Formatierte Laufzeit oder None, wenn der Scheduler nicht läuft
"""
if not self._running or not self._start_time:
return None
uptime = datetime.now() - self._start_time
days = uptime.days
hours, remainder = divmod(uptime.seconds, 3600)
minutes, seconds = divmod(remainder, 60)
if days > 0:
return f"{days} Tage, {hours} Stunden, {minutes} Minuten"
elif hours > 0:
return f"{hours} Stunden, {minutes} Minuten"
else:
return f"{minutes} Minuten, {seconds} Sekunden"
def start(self) -> bool:
"""
Startet den Scheduler.
Returns:
bool: True wenn erfolgreich gestartet, False wenn bereits läuft
"""
if self._running:
self.logger.warning("Scheduler läuft bereits")
return False
self._stop_event.clear()
self._thread = threading.Thread(target=self._run)
self._thread.daemon = True
self._thread.start()
self._running = True
self._start_time = datetime.now()
self.logger.info("Scheduler gestartet")
return True
def stop(self) -> bool:
"""
Stoppt den Scheduler.
Returns:
bool: True wenn erfolgreich gestoppt, False wenn nicht läuft
"""
if not self._running:
self.logger.warning("Scheduler läuft nicht")
return False
self._stop_event.set()
if self._thread:
self._thread.join(timeout=5.0)
self._running = False
self._start_time = None
self.logger.info("Scheduler gestoppt")
return True
def is_running(self) -> bool:
"""
Prüft, ob der Scheduler läuft.
Returns:
bool: True wenn der Scheduler läuft, sonst False
"""
return self._running
def _run(self) -> None:
"""Hauptloop des Schedulers."""
self.logger.info("Scheduler-Thread gestartet")
while not self._stop_event.is_set():
now = datetime.now()
for task_id, task in self._tasks.items():
if not task["enabled"] or not task["next_run"]:
continue
if now >= task["next_run"]:
try:
self.logger.debug(f"Führe Task {task_id} aus")
task["func"](*task["args"], **task["kwargs"])
task["last_run"] = now
task["next_run"] = now + timedelta(seconds=task["interval"])
self.logger.debug(f"Task {task_id} erfolgreich ausgeführt, nächste Ausführung: {task['next_run']}")
except Exception as e:
self.logger.error(f"Fehler bei Ausführung von Task {task_id}: {str(e)}")
# Trotzdem nächste Ausführung planen
task["next_run"] = now + timedelta(seconds=task["interval"])
# Schlafenszeit berechnen (1 Sekunde oder weniger)
time.sleep(1)
self.logger.info("Scheduler-Thread beendet")
def toggle_plug(self, ip: str, state: bool, username: str = None, password: str = None) -> bool:
"""
Schaltet eine TP-Link Tapo P100/P110-Steckdose ein oder aus.
Args:
ip: IP-Adresse der Steckdose
state: True = Ein, False = Aus
username: Benutzername für die Steckdose (optional)
password: Passwort für die Steckdose (optional)
Returns:
bool: True wenn erfolgreich geschaltet
"""
try:
# PyP100 importieren
try:
from PyP100 import PyP100
except ImportError:
self.logger.error("❌ PyP100-Modul nicht installiert - Steckdose kann nicht geschaltet werden")
return False
# Anmeldedaten aus Einstellungen verwenden, falls nicht angegeben
if not username or not password:
from config.settings import TAPO_USERNAME, TAPO_PASSWORD
username = TAPO_USERNAME
password = TAPO_PASSWORD
self.logger.debug(f"🔧 Verwende globale Tapo-Anmeldedaten für {ip}")
# P100-Verbindung herstellen (P100 statt P110 verwenden)
p100 = PyP100.P100(ip, username, password)
# Handshake und Login durchführen
p100.handshake()
p100.login()
# Steckdose schalten
if state:
p100.turnOn()
self.logger.info(f"✅ Tapo-Steckdose {ip} erfolgreich eingeschaltet")
else:
p100.turnOff()
self.logger.info(f"✅ Tapo-Steckdose {ip} erfolgreich ausgeschaltet")
return True
except Exception as e:
action = "ein" if state else "aus"
self.logger.error(f"❌ Fehler beim {action}schalten der Tapo-Steckdose {ip}: {str(e)}")
return False
def toggle_printer_plug(self, printer_id: int, state: bool) -> bool:
"""
Schaltet die Steckdose eines Druckers ein oder aus.
Args:
printer_id: ID des Druckers
state: True für ein, False für aus
Returns:
bool: True wenn erfolgreich, False wenn fehlgeschlagen
"""
try:
# Drucker aus Datenbank holen
db_session = get_db_session()
printer = db_session.query(Printer).get(printer_id)
if not printer:
self.logger.error(f"❌ Drucker mit ID {printer_id} nicht gefunden")
db_session.close()
return False
# Konfiguration validieren
if not printer.plug_ip:
self.logger.error(f"❌ Unvollständige Steckdosen-Konfiguration für Drucker {printer.name}")
db_session.close()
return False
# Steckdose schalten
success = self.toggle_plug(
ip=printer.plug_ip,
state=state,
username=printer.plug_username,
password=printer.plug_password
)
if success:
# Status in Datenbank aktualisieren
printer.status = "online" if state else "offline"
printer.last_checked = datetime.now()
db_session.commit()
self.logger.info(f"✅ Status für Drucker {printer.name} aktualisiert: {'online' if state else 'offline'}")
db_session.close()
return success
except Exception as e:
action = "ein" if state else "aus"
self.logger.error(f"❌ Fehler beim {action}schalten der Steckdose für Drucker {printer_id}: {str(e)}")
try:
db_session.close()
except:
pass
return False
def _check_jobs(self) -> None:
"""
Überprüft und verwaltet Druckjobs:
- Startet anstehende Jobs
- Beendet abgelaufene Jobs
"""
db_session = get_db_session()
try:
now = datetime.now()
# 1. Anstehende Jobs starten
pending_jobs = db_session.query(Job).filter(
Job.status == "scheduled",
Job.start_at <= now
).all()
for job in pending_jobs:
self.logger.info(f"🚀 Starte Job {job.id}: {job.name}")
# Steckdose einschalten
if self.toggle_printer_plug(job.printer_id, True):
# Job als laufend markieren
job.status = "running"
db_session.commit()
self.logger.info(f"✅ Job {job.id} gestartet")
else:
self.logger.error(f"❌ Konnte Steckdose für Job {job.id} nicht einschalten")
# 2. Abgelaufene Jobs beenden
running_jobs = db_session.query(Job).filter(
Job.status == "running",
Job.end_at <= now
).all()
for job in running_jobs:
self.logger.info(f"🏁 Beende Job {job.id}: {job.name}")
# Steckdose ausschalten
if self.toggle_printer_plug(job.printer_id, False):
# Job als beendet markieren
job.status = "finished"
job.actual_end_time = now
db_session.commit()
self.logger.info(f"✅ Job {job.id} beendet")
else:
self.logger.error(f"❌ Konnte Steckdose für Job {job.id} nicht ausschalten")
except Exception as e:
self.logger.error(f"❌ Fehler bei Überprüfung der Jobs: {str(e)}")
try:
db_session.rollback()
except:
pass
finally:
db_session.close()
def test_tapo_connection(ip_address: str, username: str = None, password: str = None) -> dict:
"""
Testet die Verbindung zu einer TP-Link Tapo P110-Steckdose.
Args:
ip_address: IP-Adresse der Steckdose
username: Benutzername für die Steckdose (optional)
password: Passwort für die Steckdose (optional)
Returns:
dict: Ergebnis mit Status und Informationen
"""
logger = get_logger("tapo")
result = {
"success": False,
"message": "",
"device_info": None,
"error": None
}
try:
# Importiere PyP100 für Tapo-Unterstützung
try:
from PyP100 import PyP100
except ImportError:
result["message"] = "PyP100-Modul nicht verfügbar"
result["error"] = "ModuleNotFound"
logger.error("PyP100-Modul nicht verfügbar - kann Tapo-Steckdosen nicht testen")
return result
# Verwende globale Anmeldedaten falls nicht angegeben
if not username or not password:
from config.settings import TAPO_USERNAME, TAPO_PASSWORD
username = TAPO_USERNAME
password = TAPO_PASSWORD
logger.debug(f"Verwende globale Tapo-Anmeldedaten für {ip_address}")
# TP-Link Tapo P100 Verbindung herstellen
p100 = PyP100.P100(ip_address, username, password)
p100.handshake() # Authentifizierung
p100.login() # Login
# Geräteinformationen abrufen
device_info = p100.getDeviceInfo()
result["success"] = True
result["message"] = "Verbindung erfolgreich"
result["device_info"] = device_info
logger.info(f"Tapo-Verbindung zu {ip_address} erfolgreich: {device_info.get('nickname', 'Unbekannt')}")
except Exception as e:
result["success"] = False
result["message"] = f"Verbindungsfehler: {str(e)}"
result["error"] = str(e)
logger.error(f"Fehler bei Tapo-Test zu {ip_address}: {str(e)}")
return result
# Scheduler-Instanz erzeugen
scheduler = BackgroundTaskScheduler()
# Standardaufgaben registrieren
scheduler.register_task("check_jobs", scheduler._check_jobs, interval=60)
# Alias für Kompatibilität
JobScheduler = BackgroundTaskScheduler
def get_job_scheduler() -> BackgroundTaskScheduler:
"""
Gibt den globalen Job-Scheduler zurück.
Returns:
BackgroundTaskScheduler: Der globale Scheduler
"""
return scheduler