- Introduced a lazy initialization method for the scheduler logger to optimize logging performance. - Updated logger usage in the BackgroundTaskScheduler class to utilize the new lazy logger method.
244 lines
8.0 KiB
Python
244 lines
8.0 KiB
Python
import threading
|
|
import time
|
|
import logging
|
|
from typing import Dict, Callable, Any, List, Optional, Union
|
|
from datetime import datetime, timedelta
|
|
|
|
from utils.logging_config import get_logger
|
|
|
|
# Lazy logger initialization
|
|
_logger = None
|
|
|
|
def get_scheduler_logger():
|
|
"""Lazy initialization of the scheduler logger."""
|
|
global _logger
|
|
if _logger is None:
|
|
_logger = get_logger("scheduler")
|
|
return _logger
|
|
|
|
class BackgroundTaskScheduler:
|
|
"""
|
|
Ein fortschrittlicher Hintergrund-Task-Scheduler, der registrierbare Worker-Funktionen unterstützt.
|
|
Tasks können als Platzhalter registriert und später konfiguriert werden.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self._tasks: Dict[str, Dict[str, Any]] = {}
|
|
self._thread: Optional[threading.Thread] = None
|
|
self._stop_event = threading.Event()
|
|
self._running = False
|
|
|
|
def register_task(self,
|
|
task_id: str,
|
|
func: Callable,
|
|
interval: int = 60,
|
|
args: List = None,
|
|
kwargs: Dict = None,
|
|
enabled: bool = True) -> bool:
|
|
"""
|
|
Registriert eine neue Hintergrund-Task.
|
|
|
|
Args:
|
|
task_id: Eindeutige ID für die Task
|
|
func: Die auszuführende Funktion
|
|
interval: Intervall in Sekunden zwischen den Ausführungen
|
|
args: Positionsargumente für die Funktion
|
|
kwargs: Schlüsselwortargumente für die Funktion
|
|
enabled: Ob die Task aktiviert sein soll
|
|
|
|
Returns:
|
|
bool: True wenn erfolgreich, False wenn die ID bereits existiert
|
|
"""
|
|
logger = get_scheduler_logger()
|
|
if task_id in self._tasks:
|
|
logger.error(f"Task mit ID {task_id} existiert bereits")
|
|
return False
|
|
|
|
self._tasks[task_id] = {
|
|
"func": func,
|
|
"interval": interval,
|
|
"args": args or [],
|
|
"kwargs": kwargs or {},
|
|
"enabled": enabled,
|
|
"last_run": None,
|
|
"next_run": datetime.now() if enabled else None
|
|
}
|
|
|
|
logger.info(f"Task {task_id} registriert: Intervall {interval}s, Enabled: {enabled}")
|
|
return True
|
|
|
|
def update_task(self,
|
|
task_id: str,
|
|
interval: Optional[int] = None,
|
|
args: Optional[List] = None,
|
|
kwargs: Optional[Dict] = None,
|
|
enabled: Optional[bool] = None) -> bool:
|
|
"""
|
|
Aktualisiert die Konfiguration einer bestehenden Task.
|
|
|
|
Args:
|
|
task_id: ID der zu aktualisierenden Task
|
|
interval: Neues Intervall in Sekunden
|
|
args: Neue Positionsargumente
|
|
kwargs: Neue Schlüsselwortargumente
|
|
enabled: Neuer Aktivierungsstatus
|
|
|
|
Returns:
|
|
bool: True wenn erfolgreich, False wenn die ID nicht existiert
|
|
"""
|
|
logger = get_scheduler_logger()
|
|
if task_id not in self._tasks:
|
|
logger.error(f"Task mit ID {task_id} existiert nicht")
|
|
return False
|
|
|
|
task = self._tasks[task_id]
|
|
|
|
if interval is not None:
|
|
task["interval"] = interval
|
|
|
|
if args is not None:
|
|
task["args"] = args
|
|
|
|
if kwargs is not None:
|
|
task["kwargs"] = kwargs
|
|
|
|
if enabled is not None and enabled != task["enabled"]:
|
|
task["enabled"] = enabled
|
|
if enabled:
|
|
task["next_run"] = datetime.now()
|
|
else:
|
|
task["next_run"] = None
|
|
|
|
logger.info(f"Task {task_id} aktualisiert: Intervall {task['interval']}s, Enabled: {task['enabled']}")
|
|
return True
|
|
|
|
def remove_task(self, task_id: str) -> bool:
|
|
"""
|
|
Entfernt eine Task aus dem Scheduler.
|
|
|
|
Args:
|
|
task_id: ID der zu entfernenden Task
|
|
|
|
Returns:
|
|
bool: True wenn erfolgreich, False wenn die ID nicht existiert
|
|
"""
|
|
logger = get_scheduler_logger()
|
|
if task_id not in self._tasks:
|
|
logger.error(f"Task mit ID {task_id} existiert nicht")
|
|
return False
|
|
|
|
del self._tasks[task_id]
|
|
logger.info(f"Task {task_id} entfernt")
|
|
return True
|
|
|
|
def get_task_info(self, task_id: Optional[str] = None) -> Union[Dict, List[Dict]]:
|
|
"""
|
|
Gibt Informationen zu einer Task oder allen Tasks zurück.
|
|
|
|
Args:
|
|
task_id: ID der Task oder None für alle Tasks
|
|
|
|
Returns:
|
|
Dict oder List: Task-Informationen
|
|
"""
|
|
if task_id is not None:
|
|
if task_id not in self._tasks:
|
|
return {}
|
|
|
|
task = self._tasks[task_id]
|
|
return {
|
|
"id": task_id,
|
|
"interval": task["interval"],
|
|
"enabled": task["enabled"],
|
|
"last_run": task["last_run"].isoformat() if task["last_run"] else None,
|
|
"next_run": task["next_run"].isoformat() if task["next_run"] else None
|
|
}
|
|
|
|
return [
|
|
{
|
|
"id": tid,
|
|
"interval": task["interval"],
|
|
"enabled": task["enabled"],
|
|
"last_run": task["last_run"].isoformat() if task["last_run"] else None,
|
|
"next_run": task["next_run"].isoformat() if task["next_run"] else None
|
|
}
|
|
for tid, task in self._tasks.items()
|
|
]
|
|
|
|
def start(self) -> bool:
|
|
"""
|
|
Startet den Scheduler.
|
|
|
|
Returns:
|
|
bool: True wenn erfolgreich gestartet, False wenn bereits läuft
|
|
"""
|
|
logger = get_scheduler_logger()
|
|
if self._running:
|
|
logger.warning("Scheduler läuft bereits")
|
|
return False
|
|
|
|
self._stop_event.clear()
|
|
self._thread = threading.Thread(target=self._run)
|
|
self._thread.daemon = True
|
|
self._thread.start()
|
|
self._running = True
|
|
|
|
logger.info("Scheduler gestartet")
|
|
return True
|
|
|
|
def stop(self) -> bool:
|
|
"""
|
|
Stoppt den Scheduler.
|
|
|
|
Returns:
|
|
bool: True wenn erfolgreich gestoppt, False wenn nicht läuft
|
|
"""
|
|
logger = get_scheduler_logger()
|
|
if not self._running:
|
|
logger.warning("Scheduler läuft nicht")
|
|
return False
|
|
|
|
self._stop_event.set()
|
|
if self._thread:
|
|
self._thread.join(timeout=5.0)
|
|
|
|
self._running = False
|
|
logger.info("Scheduler gestoppt")
|
|
return True
|
|
|
|
def is_running(self) -> bool:
|
|
"""
|
|
Prüft, ob der Scheduler läuft.
|
|
|
|
Returns:
|
|
bool: True wenn der Scheduler läuft, sonst False
|
|
"""
|
|
return self._running
|
|
|
|
def _run(self) -> None:
|
|
"""Interne Methode zum Ausführen des Scheduler-Loops."""
|
|
while not self._stop_event.is_set():
|
|
now = datetime.now()
|
|
|
|
for task_id, task in self._tasks.items():
|
|
if not task["enabled"] or not task["next_run"]:
|
|
continue
|
|
|
|
if now >= task["next_run"]:
|
|
logger = get_scheduler_logger()
|
|
logger.info(f"Ausführung von Task {task_id}")
|
|
|
|
try:
|
|
task["func"](*task["args"], **task["kwargs"])
|
|
logger.info(f"Task {task_id} erfolgreich ausgeführt")
|
|
except Exception as e:
|
|
logger.error(f"Fehler bei Ausführung von Task {task_id}: {str(e)}")
|
|
|
|
task["last_run"] = now
|
|
task["next_run"] = now + timedelta(seconds=task["interval"])
|
|
|
|
# 1 Sekunde warten und erneut prüfen
|
|
self._stop_event.wait(1)
|
|
|
|
# Singleton-Instanz
|
|
scheduler = BackgroundTaskScheduler() |