"feat: Enhanced queue management in backend/app/utils/queue_manager.py"
This commit is contained in:
parent
51601516f4
commit
4f7602b047
@ -1 +1,264 @@
|
||||
|
||||
"""
|
||||
Queue Manager für die Verwaltung von Druckjobs in Warteschlangen.
|
||||
Überwacht offline Drucker und aktiviert Jobs automatisch.
|
||||
"""
|
||||
|
||||
import threading
|
||||
import time
|
||||
import logging
|
||||
from datetime import datetime, timedelta
|
||||
from typing import List, Dict, Optional
|
||||
from contextlib import contextmanager
|
||||
|
||||
from models import get_db_session, Job, Printer, User, Notification
|
||||
from utils.logging_config import get_logger
|
||||
from utils.printer_status import check_printer_status
|
||||
|
||||
# Logger für Queue-Manager
|
||||
queue_logger = get_logger("queue_manager")
|
||||
|
||||
class PrinterQueueManager:
|
||||
"""
|
||||
Verwaltet die Warteschlangen für offline Drucker und überwacht deren Status.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.is_running = False
|
||||
self.monitor_thread = None
|
||||
self.check_interval = 120 # 2 Minuten zwischen Status-Checks
|
||||
self.last_status_cache = {} # Cache für letzten bekannten Status
|
||||
self.notification_cooldown = {} # Verhindert Spam-Benachrichtigungen
|
||||
|
||||
def start(self):
|
||||
"""Startet den Queue-Manager."""
|
||||
if not self.is_running:
|
||||
self.is_running = True
|
||||
self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
|
||||
self.monitor_thread.start()
|
||||
queue_logger.info("✅ Printer Queue Manager erfolgreich gestartet")
|
||||
|
||||
def stop(self):
|
||||
"""Stoppt den Queue-Manager."""
|
||||
self.is_running = False
|
||||
if self.monitor_thread and self.monitor_thread.is_alive():
|
||||
self.monitor_thread.join(timeout=5)
|
||||
queue_logger.info("❌ Printer Queue Manager gestoppt")
|
||||
|
||||
def _monitor_loop(self):
|
||||
"""Hauptschleife für die Überwachung der Drucker."""
|
||||
queue_logger.info(f"🔄 Queue-Überwachung gestartet (Intervall: {self.check_interval} Sekunden)")
|
||||
|
||||
while self.is_running:
|
||||
try:
|
||||
self._check_waiting_jobs()
|
||||
time.sleep(self.check_interval)
|
||||
except Exception as e:
|
||||
queue_logger.error(f"❌ Fehler in Monitor-Schleife: {str(e)}")
|
||||
time.sleep(30) # Kürzere Wartezeit bei Fehlern
|
||||
|
||||
def _check_waiting_jobs(self):
|
||||
"""Überprüft alle wartenden Jobs und aktiviert sie bei verfügbaren Druckern."""
|
||||
db_session = get_db_session()
|
||||
|
||||
try:
|
||||
# Alle wartenden Jobs abrufen
|
||||
waiting_jobs = db_session.query(Job).filter(
|
||||
Job.status == "waiting_for_printer"
|
||||
).all()
|
||||
|
||||
if not waiting_jobs:
|
||||
return
|
||||
|
||||
queue_logger.info(f"🔍 Überprüfe {len(waiting_jobs)} wartende Jobs...")
|
||||
|
||||
activated_jobs = []
|
||||
|
||||
for job in waiting_jobs:
|
||||
# Drucker-Status prüfen
|
||||
printer = db_session.query(Printer).get(job.printer_id)
|
||||
if not printer:
|
||||
continue
|
||||
|
||||
# Status-Check mit Cache-Optimierung
|
||||
printer_key = f"printer_{printer.id}"
|
||||
current_status = None
|
||||
|
||||
try:
|
||||
if printer.plug_ip:
|
||||
status, active = check_printer_status(printer.plug_ip, timeout=5)
|
||||
current_status = "online" if (status == "online" and active) else "offline"
|
||||
else:
|
||||
current_status = "offline"
|
||||
|
||||
except Exception as e:
|
||||
queue_logger.warning(f"⚠️ Status-Check für Drucker {printer.name} fehlgeschlagen: {str(e)}")
|
||||
current_status = "offline"
|
||||
|
||||
# Prüfen, ob Drucker online geworden ist
|
||||
last_status = self.last_status_cache.get(printer_key, "offline")
|
||||
self.last_status_cache[printer_key] = current_status
|
||||
|
||||
if current_status == "online" and last_status == "offline":
|
||||
# Drucker ist online geworden!
|
||||
queue_logger.info(f"🟢 Drucker {printer.name} ist ONLINE geworden - aktiviere wartende Jobs")
|
||||
|
||||
# Job aktivieren
|
||||
job.status = "scheduled"
|
||||
printer.status = "available"
|
||||
printer.active = True
|
||||
printer.last_checked = datetime.now()
|
||||
|
||||
activated_jobs.append({
|
||||
"job": job,
|
||||
"printer": printer
|
||||
})
|
||||
|
||||
elif current_status == "online":
|
||||
# Drucker ist bereits online, Job kann aktiviert werden
|
||||
job.status = "scheduled"
|
||||
printer.status = "available"
|
||||
printer.active = True
|
||||
printer.last_checked = datetime.now()
|
||||
|
||||
activated_jobs.append({
|
||||
"job": job,
|
||||
"printer": printer
|
||||
})
|
||||
|
||||
else:
|
||||
# Drucker bleibt offline
|
||||
printer.status = "offline"
|
||||
printer.active = False
|
||||
printer.last_checked = datetime.now()
|
||||
|
||||
# Speichere alle Änderungen
|
||||
if activated_jobs:
|
||||
db_session.commit()
|
||||
queue_logger.info(f"✅ {len(activated_jobs)} Jobs erfolgreich aktiviert")
|
||||
|
||||
# Benachrichtigungen versenden
|
||||
for item in activated_jobs:
|
||||
self._send_job_activation_notification(item["job"], item["printer"])
|
||||
else:
|
||||
# Auch offline-Status speichern
|
||||
db_session.commit()
|
||||
|
||||
except Exception as e:
|
||||
db_session.rollback()
|
||||
queue_logger.error(f"❌ Fehler beim Überprüfen wartender Jobs: {str(e)}")
|
||||
finally:
|
||||
db_session.close()
|
||||
|
||||
def _send_job_activation_notification(self, job: Job, printer: Printer):
|
||||
"""Sendet eine Benachrichtigung, wenn ein Job aktiviert wird."""
|
||||
try:
|
||||
# Cooldown prüfen (keine Spam-Benachrichtigungen)
|
||||
cooldown_key = f"job_{job.id}_activated"
|
||||
now = datetime.now()
|
||||
|
||||
if cooldown_key in self.notification_cooldown:
|
||||
last_notification = self.notification_cooldown[cooldown_key]
|
||||
if (now - last_notification).total_seconds() < 300: # 5 Minuten Cooldown
|
||||
return
|
||||
|
||||
self.notification_cooldown[cooldown_key] = now
|
||||
|
||||
# Benachrichtigung erstellen
|
||||
db_session = get_db_session()
|
||||
|
||||
try:
|
||||
user = db_session.query(User).get(job.user_id)
|
||||
if not user:
|
||||
return
|
||||
|
||||
notification = Notification(
|
||||
user_id=user.id,
|
||||
type="job_activated",
|
||||
payload={
|
||||
"job_id": job.id,
|
||||
"job_name": job.name,
|
||||
"printer_id": printer.id,
|
||||
"printer_name": printer.name,
|
||||
"start_time": job.start_at.isoformat() if job.start_at else None,
|
||||
"message": f"🎉 Gute Nachrichten! Drucker '{printer.name}' ist online. Ihr Job '{job.name}' wurde aktiviert und startet bald."
|
||||
}
|
||||
)
|
||||
|
||||
db_session.add(notification)
|
||||
db_session.commit()
|
||||
|
||||
queue_logger.info(f"📧 Benachrichtigung für User {user.name} gesendet: Job {job.name} aktiviert")
|
||||
|
||||
except Exception as e:
|
||||
db_session.rollback()
|
||||
queue_logger.error(f"❌ Fehler beim Erstellen der Benachrichtigung: {str(e)}")
|
||||
finally:
|
||||
db_session.close()
|
||||
|
||||
except Exception as e:
|
||||
queue_logger.error(f"❌ Fehler beim Senden der Aktivierungs-Benachrichtigung: {str(e)}")
|
||||
|
||||
def get_queue_status(self) -> Dict:
|
||||
"""Gibt den aktuellen Status der Warteschlangen zurück."""
|
||||
db_session = get_db_session()
|
||||
|
||||
try:
|
||||
# Wartende Jobs zählen
|
||||
waiting_jobs = db_session.query(Job).filter(
|
||||
Job.status == "waiting_for_printer"
|
||||
).count()
|
||||
|
||||
# Offline Drucker mit wartenden Jobs
|
||||
offline_printers_with_queue = db_session.query(Printer).join(Job).filter(
|
||||
Printer.status == "offline",
|
||||
Job.status == "waiting_for_printer"
|
||||
).distinct().count()
|
||||
|
||||
# Online Drucker
|
||||
online_printers = db_session.query(Printer).filter(
|
||||
Printer.status == "available"
|
||||
).count()
|
||||
|
||||
total_printers = db_session.query(Printer).count()
|
||||
|
||||
return {
|
||||
"waiting_jobs": waiting_jobs,
|
||||
"offline_printers_with_queue": offline_printers_with_queue,
|
||||
"online_printers": online_printers,
|
||||
"total_printers": total_printers,
|
||||
"queue_manager_running": self.is_running,
|
||||
"last_check": datetime.now().isoformat(),
|
||||
"check_interval_seconds": self.check_interval
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
queue_logger.error(f"❌ Fehler beim Abrufen des Queue-Status: {str(e)}")
|
||||
return {
|
||||
"error": str(e),
|
||||
"queue_manager_running": self.is_running
|
||||
}
|
||||
finally:
|
||||
db_session.close()
|
||||
|
||||
# Globale Instanz des Queue-Managers
|
||||
_queue_manager_instance = None
|
||||
|
||||
def get_queue_manager() -> PrinterQueueManager:
|
||||
"""Gibt die globale Instanz des Queue-Managers zurück."""
|
||||
global _queue_manager_instance
|
||||
if _queue_manager_instance is None:
|
||||
_queue_manager_instance = PrinterQueueManager()
|
||||
return _queue_manager_instance
|
||||
|
||||
def start_queue_manager():
|
||||
"""Startet den globalen Queue-Manager."""
|
||||
manager = get_queue_manager()
|
||||
manager.start()
|
||||
return manager
|
||||
|
||||
def stop_queue_manager():
|
||||
"""Stoppt den globalen Queue-Manager."""
|
||||
global _queue_manager_instance
|
||||
if _queue_manager_instance:
|
||||
_queue_manager_instance.stop()
|
||||
_queue_manager_instance = None
|
Loading…
x
Reference in New Issue
Block a user