Projektarbeit-MYP/backend/app/utils/job_scheduler.py

485 lines
16 KiB
Python

import threading
import time
import logging
from typing import Dict, Callable, Any, List, Optional, Union
from datetime import datetime, timedelta
from PyP100 import PyP110
from sqlalchemy.orm import joinedload
from utils.logging_config import get_logger
from models import Job, Printer, get_db_session
from config.settings import TAPO_USERNAME, TAPO_PASSWORD
# Lazy logger initialization
_logger = None
def get_scheduler_logger():
"""Lazy initialization of the scheduler logger."""
global _logger
if _logger is None:
_logger = get_logger("scheduler")
return _logger
class BackgroundTaskScheduler:
"""
Ein fortschrittlicher Hintergrund-Task-Scheduler, der registrierbare Worker-Funktionen unterstützt.
Tasks können als Platzhalter registriert und später konfiguriert werden.
"""
def __init__(self):
self._tasks: Dict[str, Dict[str, Any]] = {}
self._thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
self._running = False
self._start_time: Optional[datetime] = None
def register_task(self,
task_id: str,
func: Callable,
interval: int = 60,
args: List = None,
kwargs: Dict = None,
enabled: bool = True) -> bool:
"""
Registriert eine neue Hintergrund-Task.
Args:
task_id: Eindeutige ID für die Task
func: Die auszuführende Funktion
interval: Intervall in Sekunden zwischen den Ausführungen
args: Positionsargumente für die Funktion
kwargs: Schlüsselwortargumente für die Funktion
enabled: Ob die Task aktiviert sein soll
Returns:
bool: True wenn erfolgreich, False wenn die ID bereits existiert
"""
logger = get_scheduler_logger()
if task_id in self._tasks:
logger.error(f"Task mit ID {task_id} existiert bereits")
return False
self._tasks[task_id] = {
"func": func,
"interval": interval,
"args": args or [],
"kwargs": kwargs or {},
"enabled": enabled,
"last_run": None,
"next_run": datetime.now() if enabled else None
}
logger.info(f"Task {task_id} registriert: Intervall {interval}s, Enabled: {enabled}")
return True
def update_task(self,
task_id: str,
interval: Optional[int] = None,
args: Optional[List] = None,
kwargs: Optional[Dict] = None,
enabled: Optional[bool] = None) -> bool:
"""
Aktualisiert die Konfiguration einer bestehenden Task.
Args:
task_id: ID der zu aktualisierenden Task
interval: Neues Intervall in Sekunden
args: Neue Positionsargumente
kwargs: Neue Schlüsselwortargumente
enabled: Neuer Aktivierungsstatus
Returns:
bool: True wenn erfolgreich, False wenn die ID nicht existiert
"""
logger = get_scheduler_logger()
if task_id not in self._tasks:
logger.error(f"Task mit ID {task_id} existiert nicht")
return False
task = self._tasks[task_id]
if interval is not None:
task["interval"] = interval
if args is not None:
task["args"] = args
if kwargs is not None:
task["kwargs"] = kwargs
if enabled is not None and enabled != task["enabled"]:
task["enabled"] = enabled
if enabled:
task["next_run"] = datetime.now()
else:
task["next_run"] = None
logger.info(f"Task {task_id} aktualisiert: Intervall {task['interval']}s, Enabled: {task['enabled']}")
return True
def remove_task(self, task_id: str) -> bool:
"""
Entfernt eine Task aus dem Scheduler.
Args:
task_id: ID der zu entfernenden Task
Returns:
bool: True wenn erfolgreich, False wenn die ID nicht existiert
"""
logger = get_scheduler_logger()
if task_id not in self._tasks:
logger.error(f"Task mit ID {task_id} existiert nicht")
return False
del self._tasks[task_id]
logger.info(f"Task {task_id} entfernt")
return True
def get_task_info(self, task_id: Optional[str] = None) -> Union[Dict, List[Dict]]:
"""
Gibt Informationen zu einer Task oder allen Tasks zurück.
Args:
task_id: ID der Task oder None für alle Tasks
Returns:
Dict oder List: Task-Informationen
"""
if task_id is not None:
if task_id not in self._tasks:
return {}
task = self._tasks[task_id]
return {
"id": task_id,
"interval": task["interval"],
"enabled": task["enabled"],
"last_run": task["last_run"].isoformat() if task["last_run"] else None,
"next_run": task["next_run"].isoformat() if task["next_run"] else None
}
return [
{
"id": tid,
"interval": task["interval"],
"enabled": task["enabled"],
"last_run": task["last_run"].isoformat() if task["last_run"] else None,
"next_run": task["next_run"].isoformat() if task["next_run"] else None
}
for tid, task in self._tasks.items()
]
def get_tasks(self) -> Dict[str, Dict[str, Any]]:
"""
Gibt alle Tasks mit ihren Konfigurationen zurück.
Returns:
Dict: Dictionary mit Task-IDs als Schlüssel und Task-Konfigurationen als Werte
"""
return {
task_id: {
"interval": task["interval"],
"enabled": task["enabled"],
"last_run": task["last_run"].isoformat() if task["last_run"] else None,
"next_run": task["next_run"].isoformat() if task["next_run"] else None
}
for task_id, task in self._tasks.items()
}
def get_uptime(self) -> Optional[str]:
"""
Gibt die Laufzeit des Schedulers seit dem Start zurück.
Returns:
str: Formatierte Laufzeit oder None, wenn der Scheduler nicht läuft
"""
if not self._running or not self._start_time:
return None
uptime = datetime.now() - self._start_time
days = uptime.days
hours, remainder = divmod(uptime.seconds, 3600)
minutes, seconds = divmod(remainder, 60)
if days > 0:
return f"{days} Tage, {hours} Stunden, {minutes} Minuten"
elif hours > 0:
return f"{hours} Stunden, {minutes} Minuten"
else:
return f"{minutes} Minuten, {seconds} Sekunden"
def start(self) -> bool:
"""
Startet den Scheduler.
Returns:
bool: True wenn erfolgreich gestartet, False wenn bereits läuft
"""
logger = get_scheduler_logger()
if self._running:
logger.warning("Scheduler läuft bereits")
return False
self._stop_event.clear()
self._thread = threading.Thread(target=self._run)
self._thread.daemon = True
self._thread.start()
self._running = True
self._start_time = datetime.now()
logger.info("Scheduler gestartet")
return True
def stop(self) -> bool:
"""
Stoppt den Scheduler.
Returns:
bool: True wenn erfolgreich gestoppt, False wenn nicht läuft
"""
logger = get_scheduler_logger()
if not self._running:
logger.warning("Scheduler läuft nicht")
return False
self._stop_event.set()
if self._thread:
self._thread.join(timeout=5.0)
self._running = False
self._start_time = None
logger.info("Scheduler gestoppt")
return True
def is_running(self) -> bool:
"""
Prüft, ob der Scheduler läuft.
Returns:
bool: True wenn der Scheduler läuft, sonst False
"""
return self._running
def _run(self) -> None:
"""Hauptloop des Schedulers."""
logger = get_scheduler_logger()
logger.info("Scheduler-Thread gestartet")
while not self._stop_event.is_set():
now = datetime.now()
for task_id, task in self._tasks.items():
if not task["enabled"] or not task["next_run"]:
continue
if now >= task["next_run"]:
try:
logger.debug(f"Führe Task {task_id} aus")
task["func"](*task["args"], **task["kwargs"])
task["last_run"] = now
task["next_run"] = now + timedelta(seconds=task["interval"])
logger.debug(f"Task {task_id} erfolgreich ausgeführt, nächste Ausführung: {task['next_run']}")
except Exception as e:
logger.error(f"Fehler bei Ausführung von Task {task_id}: {str(e)}")
# Trotzdem nächste Ausführung planen
task["next_run"] = now + timedelta(seconds=task["interval"])
# Schlafenszeit berechnen (1 Sekunde oder weniger)
time.sleep(1)
logger.info("Scheduler-Thread beendet")
def toggle_plug(printer_id: int, state: bool) -> bool:
"""
Schaltet eine Tapo-Steckdose ein oder aus.
Args:
printer_id: ID des Druckers
state: True für ein, False für aus
Returns:
bool: True wenn erfolgreich, False wenn fehlgeschlagen
"""
logger = get_logger("printers")
db_session = get_db_session()
try:
printer = db_session.query(Printer).get(printer_id)
if not printer:
logger.error(f"Drucker mit ID {printer_id} nicht gefunden")
db_session.close()
return False
# Konfiguration validieren und Fallback verwenden
plug_ip = printer.plug_ip
plug_username = printer.plug_username or TAPO_USERNAME
plug_password = printer.plug_password or TAPO_PASSWORD
if not plug_ip:
logger.error(f"Keine Steckdosen-IP für Drucker {printer.name} (ID: {printer_id}) konfiguriert")
db_session.close()
return False
if not plug_username or not plug_password:
logger.error(f"Unvollständige Tapo-Konfiguration für Drucker {printer.name} (ID: {printer_id}) - verwende globale Anmeldedaten")
# TP-Link Tapo P110 Verbindung herstellen
logger.debug(f"Verbinde zu Tapo-Steckdose {plug_ip} für Drucker {printer.name}")
p110 = PyP110.P110(plug_ip, plug_username, plug_password)
p110.handshake() # Authentifizierung
p110.login() # Login
# Steckdose ein-/ausschalten
if state:
p110.turnOn()
logger.info(f"✅ Steckdose für Drucker {printer.name} (ID: {printer_id}) eingeschaltet")
else:
p110.turnOff()
logger.info(f"✅ Steckdose für Drucker {printer.name} (ID: {printer_id}) ausgeschaltet")
db_session.close()
return True
except Exception as e:
logger.error(f"❌ Fehler beim Schalten der Steckdose für Drucker {printer_id}: {str(e)}")
db_session.close()
return False
def test_tapo_connection(ip_address: str, username: str = None, password: str = None) -> dict:
"""
Testet die Verbindung zu einer Tapo-Steckdose und gibt detaillierte Informationen zurück.
Args:
ip_address: IP-Adresse der Steckdose
username: Benutzername (optional, verwendet globale Konfiguration als Fallback)
password: Passwort (optional, verwendet globale Konfiguration als Fallback)
Returns:
dict: Testergebnis mit Status und Informationen
"""
logger = get_logger("printers")
result = {
"success": False,
"error": None,
"device_info": None,
"status": "unknown"
}
# Fallback zu globalen Anmeldedaten
if not username or not password:
username = TAPO_USERNAME
password = TAPO_PASSWORD
logger.debug(f"🔧 Verwende globale Tapo-Anmeldedaten für {ip_address}")
try:
logger.debug(f"Teste Tapo-Verbindung zu {ip_address}")
p110 = PyP110.P110(ip_address, username, password)
p110.handshake() # Authentifizierung
p110.login() # Login
# Geräteinformationen abrufen
device_info = p110.getDeviceInfo()
result["device_info"] = device_info
result["status"] = "on" if device_info.get('device_on', False) else "off"
result["success"] = True
logger.debug(f"✅ Tapo-Verbindung zu {ip_address} erfolgreich")
except Exception as e:
result["error"] = str(e)
logger.warning(f"❌ Tapo-Verbindung zu {ip_address} fehlgeschlagen: {str(e)}")
return result
def check_jobs():
"""
Überprüft alle geplanten und laufenden Jobs und schaltet Steckdosen entsprechend.
Diese Funktion wird vom Scheduler regelmäßig aufgerufen und:
1. Prüft, ob geplante Jobs gestartet werden müssen
2. Prüft, ob laufende Jobs beendet werden müssen
3. Aktualisiert den Status der Jobs
"""
logger = get_logger("jobs")
db_session = get_db_session()
try:
now = datetime.now()
# Geplante Jobs abrufen (mit 5 Minuten Puffer für vergangene Jobs)
scheduled_jobs = db_session.query(Job).options(
joinedload(Job.printer)
).filter(
Job.status == "scheduled",
Job.start_at <= now
).all()
# Laufende Jobs abrufen (mit 5 Minuten Sicherheitspuffer)
running_jobs = db_session.query(Job).options(
joinedload(Job.printer)
).filter(
Job.status == "running",
Job.end_at <= now - timedelta(minutes=5) # 5 Minuten Sicherheitspuffer
).all()
# Geplante Jobs starten
for job in scheduled_jobs:
logger.info(f"Starte geplanten Job {job.id}: {job.name} für Drucker {job.printer.name}")
# Steckdose einschalten
if toggle_plug(job.printer_id, True):
# Job als laufend markieren
job.status = "running"
job.end_at = job.start_at + timedelta(minutes=job.duration_minutes)
db_session.commit()
logger.info(f"Job {job.id} gestartet: läuft bis {job.end_at}")
else:
logger.error(f"Fehler beim Starten von Job {job.id}: Steckdose konnte nicht eingeschaltet werden")
# Beendete Jobs stoppen
for job in running_jobs:
logger.info(f"Beende laufenden Job {job.id}: {job.name} für Drucker {job.printer.name}")
# Steckdose ausschalten
if toggle_plug(job.printer_id, False):
# Job als beendet markieren
job.status = "finished"
job.actual_end_time = now
db_session.commit()
logger.info(f"Job {job.id} beendet: tatsächliche Endzeit {job.actual_end_time}")
else:
logger.error(f"Fehler beim Beenden von Job {job.id}: Steckdose konnte nicht ausgeschaltet werden")
db_session.close()
except Exception as e:
logger.error(f"Fehler im Job-Scheduler: {str(e)}")
db_session.close()
# Globaler Scheduler
scheduler = BackgroundTaskScheduler()
# Job-Überprüfungs-Task registrieren (alle 60 Sekunden)
scheduler.register_task(
task_id="check_jobs",
func=check_jobs,
interval=60,
enabled=True
)
# Alias für Kompatibilität
JobScheduler = BackgroundTaskScheduler
def get_job_scheduler() -> BackgroundTaskScheduler:
"""
Gibt den globalen Job-Scheduler zurück.
Returns:
BackgroundTaskScheduler: Der globale Scheduler
"""
return scheduler