#!/usr/bin/env python3 """ Zentraler Shutdown-Manager für robuste Anwendungsbeendigung Koordiniert alle Cleanup-Operationen mit Timeouts und Fehlerbehandlung """ import os import sys import signal import threading import time import atexit from typing import List, Callable, Dict, Any, Optional from datetime import datetime from contextlib import contextmanager import logging # Logging-Setup mit Fallback try: from utils.logging_config import get_logger shutdown_logger = get_logger("shutdown_manager") except ImportError: logging.basicConfig(level=logging.INFO) shutdown_logger = logging.getLogger("shutdown_manager") class ShutdownManager: """ Zentraler Manager für ordnungsgemäße Anwendungsbeendigung. Koordiniert alle Cleanup-Operationen mit Timeouts und Prioritäten. """ def __init__(self, timeout: int = 30): self.timeout = timeout self.is_shutting_down = False self.shutdown_started = None self.cleanup_functions: List[Dict[str, Any]] = [] self.components: Dict[str, Any] = {} self._lock = threading.Lock() self._signal_handlers_registered = False # Shutdown-Ereignis für Thread-Koordination self.shutdown_event = threading.Event() # Registriere grundlegende Signal-Handler self._register_signal_handlers() shutdown_logger.info("🔧 Shutdown-Manager initialisiert") def _register_signal_handlers(self): """Registriert plattformspezifische Signal-Handler""" if self._signal_handlers_registered: return try: if os.name == 'nt': # Windows signal.signal(signal.SIGINT, self._signal_handler) signal.signal(signal.SIGTERM, self._signal_handler) if hasattr(signal, 'SIGBREAK'): signal.signal(signal.SIGBREAK, self._signal_handler) else: # Unix/Linux signal.signal(signal.SIGINT, self._signal_handler) signal.signal(signal.SIGTERM, self._signal_handler) signal.signal(signal.SIGHUP, self._signal_handler) self._signal_handlers_registered = True shutdown_logger.debug("✅ Signal-Handler registriert") except Exception as e: shutdown_logger.warning(f"⚠️ Signal-Handler konnten nicht registriert werden: {e}") def _signal_handler(self, signum, frame): """Zentraler Signal-Handler für ordnungsgemäße Beendigung""" if not self.is_shutting_down: shutdown_logger.warning(f"🛑 Signal {signum} empfangen - starte koordiniertes Shutdown") self.shutdown() def register_component(self, name: str, component: Any, stop_method: str = "stop"): """ Registriert eine Komponente für ordnungsgemäße Beendigung Args: name: Name der Komponente component: Komponenten-Objekt stop_method: Name der Stopp-Methode (Standard: "stop") """ with self._lock: self.components[name] = { 'component': component, 'stop_method': stop_method, 'priority': 1 # Standard-Priorität } shutdown_logger.debug(f"📝 Komponente '{name}' registriert") def register_cleanup_function(self, func: Callable, name: str, priority: int = 1, timeout: int = 10, args: tuple = (), kwargs: dict = None): """ Registriert eine Cleanup-Funktion Args: func: Cleanup-Funktion name: Beschreibender Name priority: Priorität (1=hoch, 3=niedrig) timeout: Timeout in Sekunden args: Funktions-Argumente kwargs: Funktions-Keyword-Argumente """ if kwargs is None: kwargs = {} cleanup_item = { 'function': func, 'name': name, 'priority': priority, 'timeout': timeout, 'args': args, 'kwargs': kwargs } with self._lock: self.cleanup_functions.append(cleanup_item) # Nach Priorität sortieren (1 = höchste Priorität zuerst) self.cleanup_functions.sort(key=lambda x: x['priority']) shutdown_logger.debug(f"📝 Cleanup-Funktion '{name}' registriert (Priorität: {priority})") def register_queue_manager(self, queue_manager_module): """Registriert den Queue Manager für ordnungsgemäße Beendigung""" try: stop_func = getattr(queue_manager_module, 'stop_queue_manager', None) if stop_func: self.register_cleanup_function( func=stop_func, name="Queue Manager", priority=1, # Hohe Priorität timeout=15 ) shutdown_logger.debug("✅ Queue Manager registriert") else: shutdown_logger.warning("⚠️ Queue Manager stop_queue_manager Funktion nicht gefunden") except Exception as e: shutdown_logger.error(f"❌ Fehler beim Registrieren des Queue Managers: {e}") def register_scheduler(self, scheduler, scheduler_enabled: bool = True): """Registriert den Job-Scheduler für ordnungsgemäße Beendigung""" if not scheduler_enabled or not scheduler: shutdown_logger.debug("Scheduler nicht registriert (deaktiviert oder nicht vorhanden)") return def stop_scheduler(): try: if hasattr(scheduler, 'shutdown'): shutdown_logger.info("🔄 Beende Scheduler mit shutdown()...") scheduler.shutdown(wait=True) elif hasattr(scheduler, 'stop'): shutdown_logger.info("🔄 Beende Scheduler mit stop()...") scheduler.stop() else: shutdown_logger.warning("⚠️ Scheduler hat keine bekannte Stop-Methode") shutdown_logger.info("✅ Scheduler erfolgreich gestoppt") except Exception as e: shutdown_logger.error(f"❌ Fehler beim Stoppen des Schedulers: {e}") self.register_cleanup_function( func=stop_scheduler, name="Job Scheduler", priority=1, # Hohe Priorität timeout=10 ) def register_database_cleanup(self): """Registriert Datenbank-Cleanup mit robuster Fehlerbehandlung""" def safe_database_cleanup(): try: shutdown_logger.info("💾 Führe sicheres Datenbank-Cleanup durch...") # Versuche den neuen DatabaseCleanupManager zu verwenden try: from utils.database_cleanup import safe_database_cleanup result = safe_database_cleanup(force_mode_switch=False) # Kein riskantes Mode-Switching if result.get("success", False): shutdown_logger.info(f"✅ Datenbank-Cleanup erfolgreich: {', '.join(result.get('operations', []))}") else: shutdown_logger.warning(f"⚠️ Datenbank-Cleanup mit Problemen: {', '.join(result.get('errors', []))}") except ImportError: shutdown_logger.info("Fallback: Verwende einfaches WAL-Checkpoint...") # Einfacher Fallback nur mit WAL-Checkpoint try: from models import create_optimized_engine from sqlalchemy import text engine = create_optimized_engine() with engine.connect() as conn: result = conn.execute(text("PRAGMA wal_checkpoint(PASSIVE)")).fetchone() if result and result[1] > 0: shutdown_logger.info(f"WAL-Checkpoint: {result[1]} Seiten übertragen") conn.commit() engine.dispose() shutdown_logger.info("✅ Einfaches Datenbank-Cleanup abgeschlossen") except Exception as db_error: shutdown_logger.error(f"❌ Fehler beim einfachen Datenbank-Cleanup: {db_error}") except Exception as cleanup_error: shutdown_logger.error(f"❌ Fehler beim Datenbank-Cleanup: {cleanup_error}") self.register_cleanup_function( func=safe_database_cleanup, name="Datenbank Cleanup", priority=3, # Niedrige Priorität (am Ende) timeout=20 ) def register_windows_thread_manager(self): """Registriert Windows Thread Manager falls verfügbar""" try: if os.name == 'nt': from utils.windows_fixes import get_windows_thread_manager thread_manager = get_windows_thread_manager() if thread_manager: self.register_cleanup_function( func=thread_manager.shutdown_all, name="Windows Thread Manager", priority=2, timeout=15 ) shutdown_logger.debug("✅ Windows Thread Manager registriert") except Exception as e: shutdown_logger.debug(f"Windows Thread Manager nicht verfügbar: {e}") @contextmanager def cleanup_timeout(self, timeout: int, operation_name: str): """Context Manager für Timeout-überwachte Operationen""" start_time = time.time() success = False try: yield success = True except Exception as e: elapsed = time.time() - start_time shutdown_logger.error(f"❌ Fehler bei {operation_name} nach {elapsed:.1f}s: {e}") raise finally: elapsed = time.time() - start_time if success: shutdown_logger.debug(f"✅ {operation_name} abgeschlossen in {elapsed:.1f}s") elif elapsed >= timeout: shutdown_logger.warning(f"⏱️ {operation_name} Timeout nach {elapsed:.1f}s") def shutdown(self, exit_code: int = 0): """ Führt koordiniertes Shutdown aller registrierten Komponenten durch Args: exit_code: Exit-Code für sys.exit() """ if self.is_shutting_down: shutdown_logger.debug("Shutdown bereits in Bearbeitung") return with self._lock: if self.is_shutting_down: return self.is_shutting_down = True self.shutdown_started = datetime.now() shutdown_logger.info("🔄 Starte koordiniertes System-Shutdown...") self.shutdown_event.set() total_start = time.time() try: # 1. Komponenten stoppen (nach Priorität) self._shutdown_components() # 2. Cleanup-Funktionen ausführen (nach Priorität) self._execute_cleanup_functions() total_elapsed = time.time() - total_start shutdown_logger.info(f"✅ Koordiniertes Shutdown abgeschlossen in {total_elapsed:.1f}s") except Exception as e: shutdown_logger.error(f"❌ Fehler beim koordinierten Shutdown: {e}") finally: shutdown_logger.info("🏁 System wird beendet...") sys.exit(exit_code) def _shutdown_components(self): """Stoppt alle registrierten Komponenten""" if not self.components: return shutdown_logger.info(f"🔄 Stoppe {len(self.components)} registrierte Komponenten...") for name, config in self.components.items(): try: component = config['component'] stop_method = config['stop_method'] if hasattr(component, stop_method): with self.cleanup_timeout(10, f"Komponente {name}"): method = getattr(component, stop_method) method() shutdown_logger.debug(f"✅ Komponente '{name}' gestoppt") else: shutdown_logger.warning(f"⚠️ Komponente '{name}' hat keine '{stop_method}' Methode") except Exception as e: shutdown_logger.error(f"❌ Fehler beim Stoppen der Komponente '{name}': {e}") def _execute_cleanup_functions(self): """Führt alle registrierten Cleanup-Funktionen aus""" if not self.cleanup_functions: return shutdown_logger.info(f"🧹 Führe {len(self.cleanup_functions)} Cleanup-Funktionen aus...") for cleanup_item in self.cleanup_functions: try: func = cleanup_item['function'] name = cleanup_item['name'] timeout = cleanup_item['timeout'] args = cleanup_item['args'] kwargs = cleanup_item['kwargs'] shutdown_logger.debug(f"🔄 Führe Cleanup aus: {name}") with self.cleanup_timeout(timeout, f"Cleanup {name}"): func(*args, **kwargs) shutdown_logger.debug(f"✅ Cleanup '{name}' abgeschlossen") except Exception as e: shutdown_logger.error(f"❌ Fehler bei Cleanup '{cleanup_item['name']}': {e}") # Weiter mit nächstem Cleanup continue def is_shutdown_requested(self) -> bool: """Prüft ob Shutdown angefordert wurde""" return self.shutdown_event.is_set() def get_shutdown_time(self) -> Optional[datetime]: """Gibt den Zeitpunkt des Shutdown-Starts zurück""" return self.shutdown_started def force_shutdown(self, exit_code: int = 1): """Erzwingt sofortiges Shutdown ohne Cleanup""" shutdown_logger.warning("🚨 Erzwinge sofortiges Shutdown ohne Cleanup!") sys.exit(exit_code) # Globaler Shutdown-Manager _shutdown_manager: Optional[ShutdownManager] = None _manager_lock = threading.Lock() def get_shutdown_manager(timeout: int = 30) -> ShutdownManager: """ Singleton-Pattern für globalen Shutdown-Manager Args: timeout: Gesamttimeout für Shutdown-Operationen Returns: ShutdownManager: Globaler Shutdown-Manager """ global _shutdown_manager with _manager_lock: if _shutdown_manager is None: _shutdown_manager = ShutdownManager(timeout=timeout) # Registriere atexit-Handler als Fallback atexit.register(_shutdown_manager.shutdown) return _shutdown_manager def register_for_shutdown(component_or_function, name: str, component_stop_method: str = "stop", priority: int = 1, timeout: int = 10): """ Vereinfachte Registrierung für Shutdown Args: component_or_function: Komponente mit Stop-Methode oder Cleanup-Funktion name: Beschreibender Name component_stop_method: Name der Stop-Methode für Komponenten priority: Priorität (1=hoch, 3=niedrig) timeout: Timeout in Sekunden """ manager = get_shutdown_manager() if callable(component_or_function): # Es ist eine Funktion manager.register_cleanup_function( func=component_or_function, name=name, priority=priority, timeout=timeout ) else: # Es ist eine Komponente manager.register_component( name=name, component=component_or_function, stop_method=component_stop_method ) def shutdown_application(exit_code: int = 0): """ Startet koordiniertes Shutdown der Anwendung Args: exit_code: Exit-Code für sys.exit() """ manager = get_shutdown_manager() manager.shutdown(exit_code) def is_shutdown_requested() -> bool: """Prüft ob Shutdown angefordert wurde""" if _shutdown_manager: return _shutdown_manager.is_shutdown_requested() return False