""" Queue Manager für die Verwaltung von Druckjobs in Warteschlangen. Überwacht offline Drucker und aktiviert Jobs automatisch. """ import threading import time import logging import subprocess import os import requests import signal import atexit from datetime import datetime, timedelta from typing import List, Dict, Optional, Tuple from contextlib import contextmanager from models import get_db_session, Job, Printer, User, Notification from utils.logging_config import get_logger # Windows-spezifische Imports if os.name == 'nt': try: from utils.windows_fixes import get_windows_thread_manager except ImportError: get_windows_thread_manager = None else: get_windows_thread_manager = None # Logger für Queue-Manager queue_logger = get_logger("queue_manager") def check_printer_status(ip_address: str, timeout: int = 5) -> Tuple[str, bool]: """ Vereinfachte Drucker-Status-Prüfung für den Queue Manager. Args: ip_address: IP-Adresse der Drucker-Steckdose timeout: Timeout in Sekunden (Standard: 5) Returns: Tuple[str, bool]: (Status, Aktiv) - Status ist "online" oder "offline", Aktiv ist True/False """ if not ip_address or ip_address.strip() == "": return "offline", False try: # Ping-Test um Erreichbarkeit zu prüfen if os.name == 'nt': # Windows cmd = ['ping', '-n', '1', '-w', str(timeout * 1000), ip_address.strip()] else: # Unix/Linux/macOS cmd = ['ping', '-c', '1', '-W', str(timeout), ip_address.strip()] result = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout + 1 ) # Wenn Ping erfolgreich ist, als online betrachten if result.returncode == 0: queue_logger.debug(f"✅ Drucker {ip_address} ist erreichbar (Ping erfolgreich)") return "online", True else: queue_logger.debug(f"❌ Drucker {ip_address} nicht erreichbar (Ping fehlgeschlagen)") return "offline", False except subprocess.TimeoutExpired: queue_logger.warning(f"⏱️ Ping-Timeout für Drucker {ip_address} nach {timeout} Sekunden") return "offline", False except Exception as e: queue_logger.error(f"❌ Fehler beim Status-Check für Drucker {ip_address}: {str(e)}") return "offline", False class PrinterQueueManager: """ Verwaltet die Warteschlangen für offline Drucker und überwacht deren Status. Verbesserte Version mit ordnungsgemäßem Thread-Management für Windows. """ def __init__(self): self.is_running = False self.monitor_thread = None self.shutdown_event = threading.Event() # Sauberes Shutdown-Signal self.check_interval = 120 # 2 Minuten zwischen Status-Checks self.last_status_cache = {} # Cache für letzten bekannten Status self.notification_cooldown = {} # Verhindert Spam-Benachrichtigungen self._lock = threading.Lock() # Thread-Sicherheit # Windows-spezifische Signal-Handler registrieren if os.name == 'nt': signal.signal(signal.SIGINT, self._signal_handler) signal.signal(signal.SIGTERM, self._signal_handler) def _signal_handler(self, signum, frame): """Signal-Handler für ordnungsgemäßes Shutdown.""" queue_logger.warning(f"🛑 Signal {signum} empfangen - stoppe Queue Manager...") self.stop() def start(self): """Startet den Queue-Manager mit verbessertem Thread-Management.""" with self._lock: if not self.is_running: self.is_running = True self.shutdown_event.clear() self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=False) self.monitor_thread.name = "PrinterQueueMonitor" # Windows Thread-Manager verwenden falls verfügbar if os.name == 'nt' and get_windows_thread_manager: try: thread_manager = get_windows_thread_manager() thread_manager.register_thread(self.monitor_thread) thread_manager.register_cleanup_function(self.stop) queue_logger.debug("✅ Queue Manager bei Windows Thread-Manager registriert") except Exception as e: queue_logger.warning(f"⚠️ Windows Thread-Manager nicht verfügbar: {str(e)}") self.monitor_thread.start() queue_logger.info("✅ Printer Queue Manager erfolgreich gestartet") def stop(self): """Stoppt den Queue-Manager ordnungsgemäß.""" with self._lock: if self.is_running: queue_logger.info("🔄 Beende Queue Manager...") self.is_running = False self.shutdown_event.set() if self.monitor_thread and self.monitor_thread.is_alive(): queue_logger.debug("⏳ Warte auf Thread-Beendigung...") self.monitor_thread.join(timeout=10) if self.monitor_thread.is_alive(): queue_logger.warning("⚠️ Thread konnte nicht ordnungsgemäß beendet werden") else: queue_logger.info("✅ Monitor-Thread erfolgreich beendet") self.monitor_thread = None queue_logger.info("❌ Printer Queue Manager gestoppt") def _monitor_loop(self): """Hauptschleife für die Überwachung der Drucker mit verbessertem Shutdown-Handling.""" queue_logger.info(f"🔄 Queue-Überwachung gestartet (Intervall: {self.check_interval} Sekunden)") while self.is_running and not self.shutdown_event.is_set(): try: self._check_waiting_jobs() # Verwende Event.wait() statt time.sleep() für unterbrechbares Warten if self.shutdown_event.wait(timeout=self.check_interval): # Shutdown-Signal erhalten queue_logger.info("🛑 Shutdown-Signal empfangen - beende Monitor-Loop") break except Exception as e: queue_logger.error(f"❌ Fehler in Monitor-Schleife: {str(e)}") # Kürzere Wartezeit bei Fehlern, aber auch unterbrechbar if self.shutdown_event.wait(timeout=30): break queue_logger.info("🔚 Monitor-Loop beendet") def _check_waiting_jobs(self): """Überprüft alle wartenden Jobs und aktiviert sie bei verfügbaren Druckern.""" if self.shutdown_event.is_set(): return db_session = get_db_session() try: # Alle wartenden Jobs abrufen waiting_jobs = db_session.query(Job).filter( Job.status == "waiting_for_printer" ).all() if not waiting_jobs: return queue_logger.info(f"🔍 Überprüfe {len(waiting_jobs)} wartende Jobs...") activated_jobs = [] for job in waiting_jobs: # Shutdown-Check zwischen Jobs if self.shutdown_event.is_set(): break # Drucker-Status prüfen printer = db_session.get(Printer, job.printer_id) if not printer: continue # Status-Check mit Cache-Optimierung printer_key = f"printer_{printer.id}" current_status = None try: if printer.plug_ip: status, active = check_printer_status(printer.plug_ip, timeout=5) current_status = "online" if (status == "online" and active) else "offline" else: current_status = "offline" except Exception as e: queue_logger.warning(f"⚠️ Status-Check für Drucker {printer.name} fehlgeschlagen: {str(e)}") current_status = "offline" # Prüfen, ob Drucker online geworden ist last_status = self.last_status_cache.get(printer_key, "offline") self.last_status_cache[printer_key] = current_status if current_status == "online" and last_status == "offline": # Drucker ist online geworden! queue_logger.info(f"🟢 Drucker {printer.name} ist ONLINE geworden - aktiviere wartende Jobs") # Job aktivieren job.status = "scheduled" printer.status = "available" printer.active = True printer.last_checked = datetime.now() activated_jobs.append({ "job": job, "printer": printer }) elif current_status == "online": # Drucker ist bereits online, Job kann aktiviert werden job.status = "scheduled" printer.status = "available" printer.active = True printer.last_checked = datetime.now() activated_jobs.append({ "job": job, "printer": printer }) else: # Drucker bleibt offline printer.status = "offline" printer.active = False printer.last_checked = datetime.now() # Speichere alle Änderungen if activated_jobs: db_session.commit() queue_logger.info(f"✅ {len(activated_jobs)} Jobs erfolgreich aktiviert") # Benachrichtigungen versenden (nur wenn nicht im Shutdown) if not self.shutdown_event.is_set(): for item in activated_jobs: self._send_job_activation_notification(item["job"], item["printer"]) else: # Auch offline-Status speichern db_session.commit() except Exception as e: db_session.rollback() queue_logger.error(f"❌ Fehler beim Überprüfen wartender Jobs: {str(e)}") finally: db_session.close() def _send_job_activation_notification(self, job: Job, printer: Printer): """Sendet eine Benachrichtigung, wenn ein Job aktiviert wird.""" if self.shutdown_event.is_set(): return try: # Cooldown prüfen (keine Spam-Benachrichtigungen) cooldown_key = f"job_{job.id}_activated" now = datetime.now() if cooldown_key in self.notification_cooldown: last_notification = self.notification_cooldown[cooldown_key] if (now - last_notification).total_seconds() < 300: # 5 Minuten Cooldown return self.notification_cooldown[cooldown_key] = now # Benachrichtigung erstellen db_session = get_db_session() try: user = db_session.get(User, job.user_id) if not user: return notification = Notification( user_id=user.id, type="job_activated", payload={ "job_id": job.id, "job_name": job.name, "printer_id": printer.id, "printer_name": printer.name, "start_time": job.start_at.isoformat() if job.start_at else None, "message": f"🎉 Gute Nachrichten! Drucker '{printer.name}' ist online. Ihr Job '{job.name}' wurde aktiviert und startet bald." } ) db_session.add(notification) db_session.commit() queue_logger.info(f"📧 Benachrichtigung für User {user.name} gesendet: Job {job.name} aktiviert") except Exception as e: db_session.rollback() queue_logger.error(f"❌ Fehler beim Erstellen der Benachrichtigung: {str(e)}") finally: db_session.close() except Exception as e: queue_logger.error(f"❌ Fehler beim Senden der Aktivierungs-Benachrichtigung: {str(e)}") def get_queue_status(self) -> Dict: """Gibt den aktuellen Status der Warteschlangen zurück.""" db_session = get_db_session() try: # Wartende Jobs zählen waiting_jobs = db_session.query(Job).filter( Job.status == "waiting_for_printer" ).count() # Offline Drucker mit wartenden Jobs offline_printers_with_queue = db_session.query(Printer).join(Job).filter( Printer.status == "offline", Job.status == "waiting_for_printer" ).distinct().count() # Online Drucker online_printers = db_session.query(Printer).filter( Printer.status == "available" ).count() total_printers = db_session.query(Printer).count() return { "waiting_jobs": waiting_jobs, "offline_printers_with_queue": offline_printers_with_queue, "online_printers": online_printers, "total_printers": total_printers, "queue_manager_running": self.is_running, "last_check": datetime.now().isoformat(), "check_interval_seconds": self.check_interval } except Exception as e: queue_logger.error(f"❌ Fehler beim Abrufen des Queue-Status: {str(e)}") return { "error": str(e), "queue_manager_running": self.is_running } finally: db_session.close() def is_healthy(self) -> bool: """Prüft, ob der Queue Manager ordnungsgemäß läuft.""" return (self.is_running and self.monitor_thread is not None and self.monitor_thread.is_alive() and not self.shutdown_event.is_set()) # Globale Instanz des Queue-Managers _queue_manager_instance = None _queue_manager_lock = threading.Lock() def get_queue_manager() -> PrinterQueueManager: """Gibt die globale Instanz des Queue-Managers zurück.""" global _queue_manager_instance with _queue_manager_lock: if _queue_manager_instance is None: _queue_manager_instance = PrinterQueueManager() return _queue_manager_instance def start_queue_manager(): """Startet den globalen Queue-Manager.""" manager = get_queue_manager() manager.start() return manager def stop_queue_manager(): """Stoppt den globalen Queue-Manager.""" global _queue_manager_instance with _queue_manager_lock: if _queue_manager_instance: _queue_manager_instance.stop() _queue_manager_instance = None # Automatisches Cleanup bei Prozess-Ende registrieren atexit.register(stop_queue_manager)