528 lines
18 KiB
Python
528 lines
18 KiB
Python
import threading
|
|
import time
|
|
import logging
|
|
from typing import Dict, Callable, Any, List, Optional, Union
|
|
from datetime import datetime, timedelta
|
|
|
|
from PyP100 import PyP110
|
|
from sqlalchemy.orm import joinedload
|
|
|
|
from utils.logging_config import get_logger
|
|
from models import Job, Printer, get_db_session
|
|
from config.settings import TAPO_USERNAME, TAPO_PASSWORD
|
|
|
|
# Lazy logger initialization
|
|
_logger = None
|
|
|
|
def get_scheduler_logger():
|
|
"""Lazy initialization of the scheduler logger."""
|
|
global _logger
|
|
if _logger is None:
|
|
_logger = get_logger("scheduler")
|
|
return _logger
|
|
|
|
class BackgroundTaskScheduler:
|
|
"""
|
|
Ein fortschrittlicher Hintergrund-Task-Scheduler, der registrierbare Worker-Funktionen unterstützt.
|
|
Tasks können als Platzhalter registriert und später konfiguriert werden.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self._tasks: Dict[str, Dict[str, Any]] = {}
|
|
self._thread: Optional[threading.Thread] = None
|
|
self._stop_event = threading.Event()
|
|
self._running = False
|
|
self._start_time: Optional[datetime] = None
|
|
self.logger = get_scheduler_logger()
|
|
|
|
def register_task(self,
|
|
task_id: str,
|
|
func: Callable,
|
|
interval: int = 60,
|
|
args: List = None,
|
|
kwargs: Dict = None,
|
|
enabled: bool = True) -> bool:
|
|
"""
|
|
Registriert eine neue Hintergrund-Task.
|
|
|
|
Args:
|
|
task_id: Eindeutige ID für die Task
|
|
func: Die auszuführende Funktion
|
|
interval: Intervall in Sekunden zwischen den Ausführungen
|
|
args: Positionsargumente für die Funktion
|
|
kwargs: Schlüsselwortargumente für die Funktion
|
|
enabled: Ob die Task aktiviert sein soll
|
|
|
|
Returns:
|
|
bool: True wenn erfolgreich, False wenn die ID bereits existiert
|
|
"""
|
|
if task_id in self._tasks:
|
|
self.logger.error(f"Task mit ID {task_id} existiert bereits")
|
|
return False
|
|
|
|
self._tasks[task_id] = {
|
|
"func": func,
|
|
"interval": interval,
|
|
"args": args or [],
|
|
"kwargs": kwargs or {},
|
|
"enabled": enabled,
|
|
"last_run": None,
|
|
"next_run": datetime.now() if enabled else None
|
|
}
|
|
|
|
self.logger.info(f"Task {task_id} registriert: Intervall {interval}s, Enabled: {enabled}")
|
|
return True
|
|
|
|
def update_task(self,
|
|
task_id: str,
|
|
interval: Optional[int] = None,
|
|
args: Optional[List] = None,
|
|
kwargs: Optional[Dict] = None,
|
|
enabled: Optional[bool] = None) -> bool:
|
|
"""
|
|
Aktualisiert die Konfiguration einer bestehenden Task.
|
|
|
|
Args:
|
|
task_id: ID der zu aktualisierenden Task
|
|
interval: Neues Intervall in Sekunden
|
|
args: Neue Positionsargumente
|
|
kwargs: Neue Schlüsselwortargumente
|
|
enabled: Neuer Aktivierungsstatus
|
|
|
|
Returns:
|
|
bool: True wenn erfolgreich, False wenn die ID nicht existiert
|
|
"""
|
|
if task_id not in self._tasks:
|
|
self.logger.error(f"Task mit ID {task_id} existiert nicht")
|
|
return False
|
|
|
|
task = self._tasks[task_id]
|
|
|
|
if interval is not None:
|
|
task["interval"] = interval
|
|
|
|
if args is not None:
|
|
task["args"] = args
|
|
|
|
if kwargs is not None:
|
|
task["kwargs"] = kwargs
|
|
|
|
if enabled is not None and enabled != task["enabled"]:
|
|
task["enabled"] = enabled
|
|
if enabled:
|
|
task["next_run"] = datetime.now()
|
|
else:
|
|
task["next_run"] = None
|
|
|
|
self.logger.info(f"Task {task_id} aktualisiert: Intervall {task['interval']}s, Enabled: {task['enabled']}")
|
|
return True
|
|
|
|
def remove_task(self, task_id: str) -> bool:
|
|
"""
|
|
Entfernt eine Task aus dem Scheduler.
|
|
|
|
Args:
|
|
task_id: ID der zu entfernenden Task
|
|
|
|
Returns:
|
|
bool: True wenn erfolgreich, False wenn die ID nicht existiert
|
|
"""
|
|
if task_id not in self._tasks:
|
|
self.logger.error(f"Task mit ID {task_id} existiert nicht")
|
|
return False
|
|
|
|
del self._tasks[task_id]
|
|
self.logger.info(f"Task {task_id} entfernt")
|
|
return True
|
|
|
|
def get_task_info(self, task_id: Optional[str] = None) -> Union[Dict, List[Dict]]:
|
|
"""
|
|
Gibt Informationen zu einer Task oder allen Tasks zurück.
|
|
|
|
Args:
|
|
task_id: ID der Task oder None für alle Tasks
|
|
|
|
Returns:
|
|
Dict oder List: Task-Informationen
|
|
"""
|
|
if task_id is not None:
|
|
if task_id not in self._tasks:
|
|
return {}
|
|
|
|
task = self._tasks[task_id]
|
|
return {
|
|
"id": task_id,
|
|
"interval": task["interval"],
|
|
"enabled": task["enabled"],
|
|
"last_run": task["last_run"].isoformat() if task["last_run"] else None,
|
|
"next_run": task["next_run"].isoformat() if task["next_run"] else None
|
|
}
|
|
|
|
return [
|
|
{
|
|
"id": tid,
|
|
"interval": task["interval"],
|
|
"enabled": task["enabled"],
|
|
"last_run": task["last_run"].isoformat() if task["last_run"] else None,
|
|
"next_run": task["next_run"].isoformat() if task["next_run"] else None
|
|
}
|
|
for tid, task in self._tasks.items()
|
|
]
|
|
|
|
def get_tasks(self) -> Dict[str, Dict[str, Any]]:
|
|
"""
|
|
Gibt alle Tasks mit ihren Konfigurationen zurück.
|
|
|
|
Returns:
|
|
Dict: Dictionary mit Task-IDs als Schlüssel und Task-Konfigurationen als Werte
|
|
"""
|
|
return {
|
|
task_id: {
|
|
"interval": task["interval"],
|
|
"enabled": task["enabled"],
|
|
"last_run": task["last_run"].isoformat() if task["last_run"] else None,
|
|
"next_run": task["next_run"].isoformat() if task["next_run"] else None
|
|
}
|
|
for task_id, task in self._tasks.items()
|
|
}
|
|
|
|
def get_uptime(self) -> Optional[str]:
|
|
"""
|
|
Gibt die Laufzeit des Schedulers seit dem Start zurück.
|
|
|
|
Returns:
|
|
str: Formatierte Laufzeit oder None, wenn der Scheduler nicht läuft
|
|
"""
|
|
if not self._running or not self._start_time:
|
|
return None
|
|
|
|
uptime = datetime.now() - self._start_time
|
|
days = uptime.days
|
|
hours, remainder = divmod(uptime.seconds, 3600)
|
|
minutes, seconds = divmod(remainder, 60)
|
|
|
|
if days > 0:
|
|
return f"{days} Tage, {hours} Stunden, {minutes} Minuten"
|
|
elif hours > 0:
|
|
return f"{hours} Stunden, {minutes} Minuten"
|
|
else:
|
|
return f"{minutes} Minuten, {seconds} Sekunden"
|
|
|
|
def start(self) -> bool:
|
|
"""
|
|
Startet den Scheduler.
|
|
|
|
Returns:
|
|
bool: True wenn erfolgreich gestartet, False wenn bereits läuft
|
|
"""
|
|
if self._running:
|
|
self.logger.warning("Scheduler läuft bereits")
|
|
return False
|
|
|
|
self._stop_event.clear()
|
|
self._thread = threading.Thread(target=self._run)
|
|
self._thread.daemon = True
|
|
self._thread.start()
|
|
self._running = True
|
|
self._start_time = datetime.now()
|
|
|
|
self.logger.info("Scheduler gestartet")
|
|
return True
|
|
|
|
def stop(self) -> bool:
|
|
"""
|
|
Stoppt den Scheduler.
|
|
|
|
Returns:
|
|
bool: True wenn erfolgreich gestoppt, False wenn nicht läuft
|
|
"""
|
|
if not self._running:
|
|
self.logger.warning("Scheduler läuft nicht")
|
|
return False
|
|
|
|
self._stop_event.set()
|
|
if self._thread:
|
|
self._thread.join(timeout=5.0)
|
|
|
|
self._running = False
|
|
self._start_time = None
|
|
self.logger.info("Scheduler gestoppt")
|
|
return True
|
|
|
|
def is_running(self) -> bool:
|
|
"""
|
|
Prüft, ob der Scheduler läuft.
|
|
|
|
Returns:
|
|
bool: True wenn der Scheduler läuft, sonst False
|
|
"""
|
|
return self._running
|
|
|
|
def _run(self) -> None:
|
|
"""Hauptloop des Schedulers."""
|
|
self.logger.info("Scheduler-Thread gestartet")
|
|
|
|
while not self._stop_event.is_set():
|
|
now = datetime.now()
|
|
|
|
for task_id, task in self._tasks.items():
|
|
if not task["enabled"] or not task["next_run"]:
|
|
continue
|
|
|
|
if now >= task["next_run"]:
|
|
try:
|
|
self.logger.debug(f"Führe Task {task_id} aus")
|
|
task["func"](*task["args"], **task["kwargs"])
|
|
task["last_run"] = now
|
|
task["next_run"] = now + timedelta(seconds=task["interval"])
|
|
self.logger.debug(f"Task {task_id} erfolgreich ausgeführt, nächste Ausführung: {task['next_run']}")
|
|
except Exception as e:
|
|
self.logger.error(f"Fehler bei Ausführung von Task {task_id}: {str(e)}")
|
|
# Trotzdem nächste Ausführung planen
|
|
task["next_run"] = now + timedelta(seconds=task["interval"])
|
|
|
|
# Schlafenszeit berechnen (1 Sekunde oder weniger)
|
|
time.sleep(1)
|
|
|
|
self.logger.info("Scheduler-Thread beendet")
|
|
|
|
def toggle_plug(self, ip: str, state: bool, username: str = None, password: str = None) -> bool:
|
|
"""
|
|
Schaltet eine TP-Link Tapo P100/P110-Steckdose ein oder aus.
|
|
|
|
Args:
|
|
ip: IP-Adresse der Steckdose
|
|
state: True = Ein, False = Aus
|
|
username: Benutzername für die Steckdose (optional)
|
|
password: Passwort für die Steckdose (optional)
|
|
|
|
Returns:
|
|
bool: True wenn erfolgreich geschaltet
|
|
"""
|
|
try:
|
|
# PyP100 importieren
|
|
try:
|
|
from PyP100 import PyP100
|
|
except ImportError:
|
|
self.logger.error("❌ PyP100-Modul nicht installiert - Steckdose kann nicht geschaltet werden")
|
|
return False
|
|
|
|
# Anmeldedaten aus Einstellungen verwenden, falls nicht angegeben
|
|
if not username or not password:
|
|
from config.settings import TAPO_USERNAME, TAPO_PASSWORD
|
|
username = TAPO_USERNAME
|
|
password = TAPO_PASSWORD
|
|
self.logger.debug(f"🔧 Verwende globale Tapo-Anmeldedaten für {ip}")
|
|
|
|
# P100-Verbindung herstellen (P100 statt P110 verwenden)
|
|
p100 = PyP100.P100(ip, username, password)
|
|
|
|
# Handshake und Login durchführen
|
|
p100.handshake()
|
|
p100.login()
|
|
|
|
# Steckdose schalten
|
|
if state:
|
|
p100.turnOn()
|
|
self.logger.info(f"✅ Tapo-Steckdose {ip} erfolgreich eingeschaltet")
|
|
else:
|
|
p100.turnOff()
|
|
self.logger.info(f"✅ Tapo-Steckdose {ip} erfolgreich ausgeschaltet")
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
action = "ein" if state else "aus"
|
|
self.logger.error(f"❌ Fehler beim {action}schalten der Tapo-Steckdose {ip}: {str(e)}")
|
|
return False
|
|
|
|
def toggle_printer_plug(self, printer_id: int, state: bool) -> bool:
|
|
"""
|
|
Schaltet die Steckdose eines Druckers ein oder aus.
|
|
|
|
Args:
|
|
printer_id: ID des Druckers
|
|
state: True für ein, False für aus
|
|
|
|
Returns:
|
|
bool: True wenn erfolgreich, False wenn fehlgeschlagen
|
|
"""
|
|
try:
|
|
# Drucker aus Datenbank holen
|
|
db_session = get_db_session()
|
|
printer = db_session.query(Printer).get(printer_id)
|
|
|
|
if not printer:
|
|
self.logger.error(f"❌ Drucker mit ID {printer_id} nicht gefunden")
|
|
db_session.close()
|
|
return False
|
|
|
|
# Konfiguration validieren
|
|
if not printer.plug_ip:
|
|
self.logger.error(f"❌ Unvollständige Steckdosen-Konfiguration für Drucker {printer.name}")
|
|
db_session.close()
|
|
return False
|
|
|
|
# Steckdose schalten
|
|
success = self.toggle_plug(
|
|
ip=printer.plug_ip,
|
|
state=state,
|
|
username=printer.plug_username,
|
|
password=printer.plug_password
|
|
)
|
|
|
|
if success:
|
|
# Status in Datenbank aktualisieren
|
|
printer.status = "online" if state else "offline"
|
|
printer.last_checked = datetime.now()
|
|
db_session.commit()
|
|
self.logger.info(f"✅ Status für Drucker {printer.name} aktualisiert: {'online' if state else 'offline'}")
|
|
|
|
db_session.close()
|
|
return success
|
|
|
|
except Exception as e:
|
|
action = "ein" if state else "aus"
|
|
self.logger.error(f"❌ Fehler beim {action}schalten der Steckdose für Drucker {printer_id}: {str(e)}")
|
|
try:
|
|
db_session.close()
|
|
except:
|
|
pass
|
|
return False
|
|
|
|
def _check_jobs(self) -> None:
|
|
"""
|
|
Überprüft und verwaltet Druckjobs:
|
|
- Startet anstehende Jobs
|
|
- Beendet abgelaufene Jobs
|
|
"""
|
|
db_session = get_db_session()
|
|
|
|
try:
|
|
now = datetime.now()
|
|
|
|
# 1. Anstehende Jobs starten
|
|
pending_jobs = db_session.query(Job).filter(
|
|
Job.status == "scheduled",
|
|
Job.start_at <= now
|
|
).all()
|
|
|
|
for job in pending_jobs:
|
|
self.logger.info(f"🚀 Starte Job {job.id}: {job.name}")
|
|
|
|
# Steckdose einschalten
|
|
if self.toggle_printer_plug(job.printer_id, True):
|
|
# Job als laufend markieren
|
|
job.status = "running"
|
|
db_session.commit()
|
|
self.logger.info(f"✅ Job {job.id} gestartet")
|
|
else:
|
|
self.logger.error(f"❌ Konnte Steckdose für Job {job.id} nicht einschalten")
|
|
|
|
# 2. Abgelaufene Jobs beenden
|
|
running_jobs = db_session.query(Job).filter(
|
|
Job.status == "running",
|
|
Job.end_at <= now
|
|
).all()
|
|
|
|
for job in running_jobs:
|
|
self.logger.info(f"🏁 Beende Job {job.id}: {job.name}")
|
|
|
|
# Steckdose ausschalten
|
|
if self.toggle_printer_plug(job.printer_id, False):
|
|
# Job als beendet markieren
|
|
job.status = "finished"
|
|
job.actual_end_time = now
|
|
db_session.commit()
|
|
self.logger.info(f"✅ Job {job.id} beendet")
|
|
else:
|
|
self.logger.error(f"❌ Konnte Steckdose für Job {job.id} nicht ausschalten")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"❌ Fehler bei Überprüfung der Jobs: {str(e)}")
|
|
try:
|
|
db_session.rollback()
|
|
except:
|
|
pass
|
|
|
|
finally:
|
|
db_session.close()
|
|
|
|
|
|
def test_tapo_connection(ip_address: str, username: str = None, password: str = None) -> dict:
|
|
"""
|
|
Testet die Verbindung zu einer TP-Link Tapo P110-Steckdose.
|
|
|
|
Args:
|
|
ip_address: IP-Adresse der Steckdose
|
|
username: Benutzername für die Steckdose (optional)
|
|
password: Passwort für die Steckdose (optional)
|
|
|
|
Returns:
|
|
dict: Ergebnis mit Status und Informationen
|
|
"""
|
|
logger = get_logger("tapo")
|
|
result = {
|
|
"success": False,
|
|
"message": "",
|
|
"device_info": None,
|
|
"error": None
|
|
}
|
|
|
|
try:
|
|
# Importiere PyP100 für Tapo-Unterstützung
|
|
try:
|
|
from PyP100 import PyP100
|
|
except ImportError:
|
|
result["message"] = "PyP100-Modul nicht verfügbar"
|
|
result["error"] = "ModuleNotFound"
|
|
logger.error("PyP100-Modul nicht verfügbar - kann Tapo-Steckdosen nicht testen")
|
|
return result
|
|
|
|
# Verwende globale Anmeldedaten falls nicht angegeben
|
|
if not username or not password:
|
|
from config.settings import TAPO_USERNAME, TAPO_PASSWORD
|
|
username = TAPO_USERNAME
|
|
password = TAPO_PASSWORD
|
|
logger.debug(f"Verwende globale Tapo-Anmeldedaten für {ip_address}")
|
|
|
|
# TP-Link Tapo P100 Verbindung herstellen
|
|
p100 = PyP100.P100(ip_address, username, password)
|
|
p100.handshake() # Authentifizierung
|
|
p100.login() # Login
|
|
|
|
# Geräteinformationen abrufen
|
|
device_info = p100.getDeviceInfo()
|
|
|
|
result["success"] = True
|
|
result["message"] = "Verbindung erfolgreich"
|
|
result["device_info"] = device_info
|
|
|
|
logger.info(f"Tapo-Verbindung zu {ip_address} erfolgreich: {device_info.get('nickname', 'Unbekannt')}")
|
|
|
|
except Exception as e:
|
|
result["success"] = False
|
|
result["message"] = f"Verbindungsfehler: {str(e)}"
|
|
result["error"] = str(e)
|
|
logger.error(f"Fehler bei Tapo-Test zu {ip_address}: {str(e)}")
|
|
|
|
return result
|
|
|
|
|
|
# Scheduler-Instanz erzeugen
|
|
scheduler = BackgroundTaskScheduler()
|
|
|
|
# Standardaufgaben registrieren
|
|
scheduler.register_task("check_jobs", scheduler._check_jobs, interval=60)
|
|
|
|
# Alias für Kompatibilität
|
|
JobScheduler = BackgroundTaskScheduler
|
|
|
|
def get_job_scheduler() -> BackgroundTaskScheduler:
|
|
"""
|
|
Gibt den globalen Job-Scheduler zurück.
|
|
|
|
Returns:
|
|
BackgroundTaskScheduler: Der globale Scheduler
|
|
"""
|
|
return scheduler |