497 lines
21 KiB
Python
497 lines
21 KiB
Python
"""
|
|
Queue Manager für die Verwaltung von Druckjobs in Warteschlangen.
|
|
Überwacht offline Drucker und aktiviert Jobs automatisch.
|
|
"""
|
|
|
|
import threading
|
|
import time
|
|
import logging
|
|
import subprocess
|
|
import os
|
|
import requests
|
|
import signal
|
|
import atexit
|
|
from datetime import datetime, timedelta
|
|
from typing import List, Dict, Optional, Tuple
|
|
from contextlib import contextmanager
|
|
|
|
from models import get_db_session, Job, Printer, User, Notification
|
|
from utils.logging_config import get_logger
|
|
|
|
# Windows-spezifische Imports
|
|
if os.name == 'nt':
|
|
try:
|
|
from utils.windows_fixes import get_windows_thread_manager
|
|
except ImportError:
|
|
get_windows_thread_manager = None
|
|
else:
|
|
get_windows_thread_manager = None
|
|
|
|
# Logger für Queue-Manager
|
|
queue_logger = get_logger("queue_manager")
|
|
|
|
def check_printer_status(ip_address: str, timeout: int = 5) -> Tuple[str, bool]:
|
|
"""
|
|
Vereinfachte Drucker-Status-Prüfung für den Queue Manager.
|
|
|
|
Args:
|
|
ip_address: IP-Adresse der Drucker-Steckdose
|
|
timeout: Timeout in Sekunden (Standard: 5)
|
|
|
|
Returns:
|
|
Tuple[str, bool]: (Status, Aktiv) - Status ist "online" oder "offline", Aktiv ist True/False
|
|
"""
|
|
if not ip_address or ip_address.strip() == "":
|
|
return "offline", False
|
|
|
|
try:
|
|
# Ping-Test um Erreichbarkeit zu prüfen
|
|
if os.name == 'nt': # Windows
|
|
cmd = ['ping', '-n', '1', '-w', str(timeout * 1000), ip_address.strip()]
|
|
else: # Unix/Linux/macOS
|
|
cmd = ['ping', '-c', '1', '-W', str(timeout), ip_address.strip()]
|
|
|
|
result = subprocess.run(
|
|
cmd,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=timeout + 1,
|
|
encoding='utf-8',
|
|
errors='replace'
|
|
)
|
|
|
|
# Wenn Ping erfolgreich ist, als online betrachten
|
|
if result.returncode == 0:
|
|
queue_logger.debug(f"✅ Drucker {ip_address} ist erreichbar (Ping erfolgreich)")
|
|
return "online", True
|
|
else:
|
|
queue_logger.debug(f"❌ Drucker {ip_address} nicht erreichbar (Ping fehlgeschlagen)")
|
|
return "offline", False
|
|
|
|
except subprocess.TimeoutExpired:
|
|
queue_logger.warning(f"⏱️ Ping-Timeout für Drucker {ip_address} nach {timeout} Sekunden")
|
|
return "offline", False
|
|
except Exception as e:
|
|
queue_logger.error(f"❌ Fehler beim Status-Check für Drucker {ip_address}: {str(e)}")
|
|
return "offline", False
|
|
|
|
class PrinterQueueManager:
|
|
"""
|
|
Verwaltet die Warteschlangen für offline Drucker und überwacht deren Status.
|
|
Verbesserte Version mit ordnungsgemäßem Thread-Management für Windows.
|
|
"""
|
|
|
|
def __init__(self, register_signal_handlers: bool = True):
|
|
self.is_running = False
|
|
self.monitor_thread = None
|
|
self.shutdown_event = threading.Event() # Sauberes Shutdown-Signal
|
|
self.check_interval = 120 # 2 Minuten zwischen Status-Checks
|
|
self.last_status_cache = {} # Cache für letzten bekannten Status
|
|
self.notification_cooldown = {} # Verhindert Spam-Benachrichtigungen
|
|
self._lock = threading.Lock() # Thread-Sicherheit
|
|
self._signal_handlers_registered = False
|
|
|
|
# Signal-Handler nur registrieren wenn explizit gewünscht
|
|
# (Verhindert Interferenzen mit zentralem Shutdown-Manager)
|
|
if register_signal_handlers and os.name == 'nt':
|
|
self._register_signal_handlers()
|
|
|
|
def _register_signal_handlers(self):
|
|
"""Windows-spezifische Signal-Handler registrieren (nur wenn gewünscht)"""
|
|
if self._signal_handlers_registered:
|
|
return
|
|
|
|
try:
|
|
# Prüfe ob bereits zentrale Signal-Handler existieren
|
|
try:
|
|
from utils.shutdown_manager import is_shutdown_requested
|
|
if is_shutdown_requested is not None:
|
|
queue_logger.info("🔄 Zentrale Signal-Handler erkannt - deaktiviere lokale Handler")
|
|
return
|
|
except ImportError:
|
|
pass # Kein zentraler Manager verfügbar, verwende lokale Handler
|
|
|
|
signal.signal(signal.SIGINT, self._signal_handler)
|
|
signal.signal(signal.SIGTERM, self._signal_handler)
|
|
self._signal_handlers_registered = True
|
|
queue_logger.debug("✅ Lokale Signal-Handler für Queue Manager registriert")
|
|
|
|
except Exception as e:
|
|
queue_logger.warning(f"⚠️ Lokale Signal-Handler konnten nicht registriert werden: {e}")
|
|
|
|
def _signal_handler(self, signum, frame):
|
|
"""Signal-Handler für ordnungsgemäßes Shutdown (nur als Fallback)."""
|
|
queue_logger.warning(f"🛑 Signal {signum} empfangen - stoppe Queue Manager...")
|
|
self.stop()
|
|
|
|
def start(self):
|
|
"""Startet den Queue-Manager mit verbessertem Shutdown-Handling."""
|
|
with self._lock:
|
|
if self.is_running:
|
|
queue_logger.warning("Queue-Manager läuft bereits")
|
|
return self
|
|
|
|
queue_logger.info("🚀 Starte Printer Queue Manager...")
|
|
self.is_running = True
|
|
self.shutdown_event.clear()
|
|
|
|
# Monitor-Thread mit Daemon-Flag für automatische Beendigung
|
|
self.monitor_thread = threading.Thread(
|
|
target=self._monitor_loop,
|
|
name="PrinterQueueMonitor",
|
|
daemon=True # Automatische Beendigung bei Programm-Ende
|
|
)
|
|
self.monitor_thread.start()
|
|
|
|
queue_logger.info("✅ Printer Queue Manager gestartet")
|
|
return self
|
|
|
|
def stop(self):
|
|
"""Stoppt den Queue-Manager ordnungsgemäß mit verbessertem Timeout-Handling."""
|
|
with self._lock:
|
|
if not self.is_running:
|
|
queue_logger.debug("Queue-Manager ist bereits gestoppt")
|
|
return
|
|
|
|
queue_logger.info("🔄 Beende Queue Manager...")
|
|
self.is_running = False
|
|
self.shutdown_event.set()
|
|
|
|
if self.monitor_thread and self.monitor_thread.is_alive():
|
|
queue_logger.debug("⏳ Warte auf Thread-Beendigung...")
|
|
|
|
# Verbessertes Timeout-Handling
|
|
try:
|
|
self.monitor_thread.join(timeout=5.0) # Reduziertes Timeout
|
|
|
|
if self.monitor_thread.is_alive():
|
|
queue_logger.warning("⚠️ Thread konnte nicht in 5s beendet werden - setze als Daemon")
|
|
# Thread als Daemon markieren für automatische Beendigung
|
|
self.monitor_thread.daemon = True
|
|
else:
|
|
queue_logger.info("✅ Monitor-Thread erfolgreich beendet")
|
|
|
|
except Exception as e:
|
|
queue_logger.error(f"❌ Fehler beim Thread-Join: {e}")
|
|
|
|
self.monitor_thread = None
|
|
queue_logger.info("❌ Printer Queue Manager gestoppt")
|
|
|
|
def _monitor_loop(self):
|
|
"""Hauptschleife für die Überwachung der Drucker mit verbessertem Shutdown-Handling."""
|
|
queue_logger.info(f"🔄 Queue-Überwachung gestartet (Intervall: {self.check_interval} Sekunden)")
|
|
|
|
while self.is_running and not self.shutdown_event.is_set():
|
|
try:
|
|
# Prüfe auf zentrales Shutdown-Signal
|
|
try:
|
|
from utils.shutdown_manager import is_shutdown_requested
|
|
if is_shutdown_requested():
|
|
queue_logger.info("🛑 Zentrales Shutdown-Signal empfangen - beende Monitor-Loop")
|
|
break
|
|
except ImportError:
|
|
pass # Kein zentraler Manager verfügbar
|
|
|
|
self._check_waiting_jobs()
|
|
|
|
# Verwende Event.wait() statt time.sleep() für unterbrechbares Warten
|
|
if self.shutdown_event.wait(timeout=self.check_interval):
|
|
# Shutdown-Signal erhalten
|
|
queue_logger.info("🛑 Shutdown-Signal empfangen - beende Monitor-Loop")
|
|
break
|
|
|
|
except Exception as e:
|
|
queue_logger.error(f"❌ Fehler in Monitor-Schleife: {str(e)}")
|
|
# Kürzere Wartezeit bei Fehlern, aber auch unterbrechbar
|
|
if self.shutdown_event.wait(timeout=30):
|
|
break
|
|
|
|
queue_logger.info("🔚 Monitor-Loop beendet")
|
|
|
|
def _check_waiting_jobs(self):
|
|
"""Überprüft alle wartenden Jobs und aktiviert sie bei verfügbaren Druckern."""
|
|
if self.shutdown_event.is_set():
|
|
return
|
|
|
|
db_session = get_db_session()
|
|
|
|
try:
|
|
# Alle wartenden Jobs abrufen
|
|
waiting_jobs = db_session.query(Job).filter(
|
|
Job.status == "waiting_for_printer"
|
|
).all()
|
|
|
|
if not waiting_jobs:
|
|
return
|
|
|
|
queue_logger.info(f"🔍 Überprüfe {len(waiting_jobs)} wartende Jobs...")
|
|
|
|
activated_jobs = []
|
|
|
|
for job in waiting_jobs:
|
|
# Shutdown-Check zwischen Jobs
|
|
if self.shutdown_event.is_set():
|
|
break
|
|
|
|
# Drucker-Status prüfen
|
|
printer = db_session.get(Printer, job.printer_id)
|
|
if not printer:
|
|
continue
|
|
|
|
# Status-Check mit Cache-Optimierung
|
|
printer_key = f"printer_{printer.id}"
|
|
current_status = None
|
|
|
|
try:
|
|
if printer.plug_ip:
|
|
status, active = check_printer_status(printer.plug_ip, timeout=5)
|
|
current_status = "online" if (status == "online" and active) else "offline"
|
|
else:
|
|
current_status = "offline"
|
|
|
|
except Exception as e:
|
|
queue_logger.warning(f"⚠️ Status-Check für Drucker {printer.name} fehlgeschlagen: {str(e)}")
|
|
current_status = "offline"
|
|
|
|
# Prüfen, ob Drucker online geworden ist
|
|
last_status = self.last_status_cache.get(printer_key, "offline")
|
|
self.last_status_cache[printer_key] = current_status
|
|
|
|
if current_status == "online" and last_status == "offline":
|
|
# Drucker ist online geworden!
|
|
queue_logger.info(f"🟢 Drucker {printer.name} ist ONLINE geworden - aktiviere wartende Jobs")
|
|
|
|
# Job aktivieren
|
|
job.status = "scheduled"
|
|
printer.status = "available"
|
|
printer.active = True
|
|
printer.last_checked = datetime.now()
|
|
|
|
activated_jobs.append({
|
|
"job": job,
|
|
"printer": printer
|
|
})
|
|
|
|
elif current_status == "online":
|
|
# Drucker ist bereits online, Job kann aktiviert werden
|
|
job.status = "scheduled"
|
|
printer.status = "available"
|
|
printer.active = True
|
|
printer.last_checked = datetime.now()
|
|
|
|
activated_jobs.append({
|
|
"job": job,
|
|
"printer": printer
|
|
})
|
|
|
|
else:
|
|
# Drucker bleibt offline
|
|
printer.status = "offline"
|
|
printer.active = False
|
|
printer.last_checked = datetime.now()
|
|
|
|
# Speichere alle Änderungen
|
|
if activated_jobs:
|
|
db_session.commit()
|
|
queue_logger.info(f"✅ {len(activated_jobs)} Jobs erfolgreich aktiviert")
|
|
|
|
# Benachrichtigungen versenden (nur wenn nicht im Shutdown)
|
|
if not self.shutdown_event.is_set():
|
|
for item in activated_jobs:
|
|
self._send_job_activation_notification(item["job"], item["printer"])
|
|
else:
|
|
# Auch offline-Status speichern
|
|
db_session.commit()
|
|
|
|
except Exception as e:
|
|
db_session.rollback()
|
|
queue_logger.error(f"❌ Fehler beim Überprüfen wartender Jobs: {str(e)}")
|
|
finally:
|
|
db_session.close()
|
|
|
|
def _send_job_activation_notification(self, job: Job, printer: Printer):
|
|
"""Sendet eine Benachrichtigung, wenn ein Job aktiviert wird."""
|
|
if self.shutdown_event.is_set():
|
|
return
|
|
|
|
try:
|
|
# Cooldown prüfen (keine Spam-Benachrichtigungen)
|
|
cooldown_key = f"job_{job.id}_activated"
|
|
now = datetime.now()
|
|
|
|
if cooldown_key in self.notification_cooldown:
|
|
last_notification = self.notification_cooldown[cooldown_key]
|
|
if (now - last_notification).total_seconds() < 300: # 5 Minuten Cooldown
|
|
return
|
|
|
|
self.notification_cooldown[cooldown_key] = now
|
|
|
|
# Benachrichtigung erstellen
|
|
db_session = get_db_session()
|
|
|
|
try:
|
|
user = db_session.get(User, job.user_id)
|
|
if not user:
|
|
return
|
|
|
|
notification = Notification(
|
|
user_id=user.id,
|
|
type="job_activated",
|
|
payload={
|
|
"job_id": job.id,
|
|
"job_name": job.name,
|
|
"printer_id": printer.id,
|
|
"printer_name": printer.name,
|
|
"start_time": job.start_at.isoformat() if job.start_at else None,
|
|
"message": f"🎉 Gute Nachrichten! Drucker '{printer.name}' ist online. Ihr Job '{job.name}' wurde aktiviert und startet bald."
|
|
}
|
|
)
|
|
|
|
db_session.add(notification)
|
|
db_session.commit()
|
|
|
|
queue_logger.info(f"📧 Benachrichtigung für User {user.name} gesendet: Job {job.name} aktiviert")
|
|
|
|
except Exception as e:
|
|
db_session.rollback()
|
|
queue_logger.error(f"❌ Fehler beim Erstellen der Benachrichtigung: {str(e)}")
|
|
finally:
|
|
db_session.close()
|
|
|
|
except Exception as e:
|
|
queue_logger.error(f"❌ Fehler beim Senden der Aktivierungs-Benachrichtigung: {str(e)}")
|
|
|
|
def get_queue_status(self) -> Dict:
|
|
"""Gibt den aktuellen Status der Warteschlangen zurück."""
|
|
db_session = get_db_session()
|
|
|
|
try:
|
|
# Wartende Jobs zählen
|
|
waiting_jobs = db_session.query(Job).filter(
|
|
Job.status == "waiting_for_printer"
|
|
).count()
|
|
|
|
# Offline Drucker mit wartenden Jobs
|
|
offline_printers_with_queue = db_session.query(Printer).join(Job).filter(
|
|
Printer.status == "offline",
|
|
Job.status == "waiting_for_printer"
|
|
).distinct().count()
|
|
|
|
# Online Drucker
|
|
online_printers = db_session.query(Printer).filter(
|
|
Printer.status == "available"
|
|
).count()
|
|
|
|
total_printers = db_session.query(Printer).count()
|
|
|
|
return {
|
|
"waiting_jobs": waiting_jobs,
|
|
"offline_printers_with_queue": offline_printers_with_queue,
|
|
"online_printers": online_printers,
|
|
"total_printers": total_printers,
|
|
"queue_manager_running": self.is_running,
|
|
"last_check": datetime.now().isoformat(),
|
|
"check_interval_seconds": self.check_interval
|
|
}
|
|
|
|
except Exception as e:
|
|
queue_logger.error(f"❌ Fehler beim Abrufen des Queue-Status: {str(e)}")
|
|
return {
|
|
"error": str(e),
|
|
"queue_manager_running": self.is_running
|
|
}
|
|
finally:
|
|
db_session.close()
|
|
|
|
def is_healthy(self) -> bool:
|
|
"""Prüft, ob der Queue Manager ordnungsgemäß läuft."""
|
|
return (self.is_running and
|
|
self.monitor_thread is not None and
|
|
self.monitor_thread.is_alive() and
|
|
not self.shutdown_event.is_set())
|
|
|
|
# Globale Instanz des Queue-Managers
|
|
_queue_manager_instance = None
|
|
_queue_manager_lock = threading.Lock()
|
|
|
|
def get_queue_manager() -> PrinterQueueManager:
|
|
"""Gibt die globale Instanz des Queue-Managers zurück."""
|
|
global _queue_manager_instance
|
|
with _queue_manager_lock:
|
|
if _queue_manager_instance is None:
|
|
_queue_manager_instance = PrinterQueueManager()
|
|
return _queue_manager_instance
|
|
|
|
def start_queue_manager():
|
|
"""Startet den globalen Queue-Manager sicher und ohne Signal-Handler-Interferenzen."""
|
|
global _queue_manager_instance
|
|
with _queue_manager_lock:
|
|
if _queue_manager_instance is not None:
|
|
queue_logger.warning("Queue-Manager läuft bereits")
|
|
return _queue_manager_instance
|
|
|
|
try:
|
|
queue_logger.info("🚀 Initialisiere neuen Queue-Manager...")
|
|
|
|
# Prüfe ob zentraler Shutdown-Manager verfügbar ist
|
|
register_signals = True
|
|
try:
|
|
from utils.shutdown_manager import is_shutdown_requested
|
|
if is_shutdown_requested is not None:
|
|
queue_logger.info("🔄 Zentrale Shutdown-Verwaltung erkannt - deaktiviere lokale Signal-Handler")
|
|
register_signals = False
|
|
except ImportError:
|
|
queue_logger.debug("Kein zentraler Shutdown-Manager verfügbar - verwende lokale Signal-Handler")
|
|
|
|
# Erstelle Queue-Manager ohne Signal-Handler wenn zentraler Manager vorhanden
|
|
_queue_manager_instance = PrinterQueueManager(register_signal_handlers=register_signals)
|
|
_queue_manager_instance.start()
|
|
|
|
queue_logger.info("✅ Queue-Manager erfolgreich gestartet")
|
|
return _queue_manager_instance
|
|
|
|
except Exception as e:
|
|
queue_logger.error(f"❌ Fehler beim Starten des Queue-Managers: {str(e)}")
|
|
_queue_manager_instance = None
|
|
raise
|
|
|
|
def stop_queue_manager():
|
|
"""Stoppt den globalen Queue-Manager definitiv und sicher."""
|
|
global _queue_manager_instance
|
|
with _queue_manager_lock:
|
|
if _queue_manager_instance:
|
|
try:
|
|
queue_logger.info("🔄 Stoppe Queue-Manager...")
|
|
|
|
# Shutdown-Event setzen
|
|
_queue_manager_instance.shutdown_event.set()
|
|
|
|
# Monitor-Thread beenden
|
|
if (_queue_manager_instance.monitor_thread and
|
|
_queue_manager_instance.monitor_thread.is_alive()):
|
|
|
|
queue_logger.info("⏳ Warte auf Monitor-Thread...")
|
|
_queue_manager_instance.monitor_thread.join(timeout=5.0)
|
|
|
|
# Falls Thread nicht beendet wurde, forciere Beendigung
|
|
if _queue_manager_instance.monitor_thread.is_alive():
|
|
queue_logger.warning("⚠️ Monitor-Thread reagiert nicht - forciere Beendigung")
|
|
# Thread als Daemon markieren für automatische Beendigung
|
|
_queue_manager_instance.monitor_thread.daemon = True
|
|
|
|
# Status auf gestoppt setzen
|
|
_queue_manager_instance.is_running = False
|
|
|
|
# Explizit stop() aufrufen
|
|
_queue_manager_instance.stop()
|
|
|
|
queue_logger.info("✅ Queue-Manager erfolgreich gestoppt")
|
|
|
|
except Exception as e:
|
|
queue_logger.error(f"❌ Fehler beim Stoppen des Queue-Managers: {str(e)}")
|
|
finally:
|
|
# Instanz definitiv auf None setzen
|
|
_queue_manager_instance = None
|
|
|
|
# Automatisches Cleanup bei Prozess-Ende registrieren
|
|
atexit.register(stop_queue_manager) |