#!/usr/bin/env python3
"""
Intelligent watchdog manager for the MYP printer management system.

Extended monitoring implemented in Python for better error handling and
logging. Optimized for Debian/Linux systems running in kiosk mode.

Each monitoring cycle checks, in order: the HTTPS backend service, the TLS
certificate, the kiosk session (user processes, X server, display, browser),
system memory, and finally the size of the watchdog's own log file.
"""

import os
import sys
import time
import json
import logging
import shutil
import subprocess
import threading
import signal
from collections import deque
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Callable

import psutil
import requests
from urllib3.exceptions import InsecureRequestWarning

# Suppress TLS warnings: the backend on localhost uses a self-signed certificate.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# Log file written by the watchdog and trimmed by rotate_log_if_needed().
LOG_FILE = Path("/var/log/kiosk-watchdog-python.log")
# Number of trailing log lines kept when the log file is trimmed.
LOG_TAIL_LINES = 1000
# Timeout (seconds) for short status probes such as systemctl/openssl.
PROBE_TIMEOUT = 10


class WatchdogConfig:
    """Configuration store for the watchdog manager (JSON file + defaults)."""

    def __init__(self, app_dir: str = "/opt/myp",
                 config_file: Optional[str] = None):
        """Load the configuration.

        Args:
            app_dir: Application directory; also the base for the default
                configuration path ``<app_dir>/config/watchdog.json``.
            config_file: Optional explicit path to the JSON configuration
                file. When omitted, the default path below ``app_dir`` is used.
        """
        self.app_dir = Path(app_dir)
        if config_file:
            self.config_file = Path(config_file)
        else:
            self.config_file = self.app_dir / "config" / "watchdog.json"

        # Default configuration; values from the file override these.
        self.defaults = {
            "https_service": "myp-https",
            "kiosk_service": "myp-kiosk",
            "kiosk_user": "kiosk",
            "https_url": "https://localhost:443",
            "check_interval": 30,
            "https_timeout": 10,
            "restart_delay": 15,
            "max_memory_percent": 85,
            "cert_expire_days": 7,
            "log_rotation_size_mb": 10,
            "max_restart_attempts": 3,
            "restart_cooldown": 300,
            "enable_auto_cleanup": True,
            "enable_performance_monitoring": True
        }

        self.config = self.load_config()

    def load_config(self) -> Dict:
        """Return the defaults merged with the values from the config file.

        If the file does not exist it is created with the defaults. Any
        read/parse problem is logged and a copy of the defaults is returned.
        """
        try:
            if self.config_file.exists():
                with open(self.config_file, 'r', encoding='utf-8') as f:
                    config = json.load(f)
                if not isinstance(config, dict):
                    raise ValueError(
                        "Konfigurationsdatei muss ein JSON-Objekt enthalten"
                    )
                # File values take precedence over the defaults.
                return {**self.defaults, **config}
            self.save_config(self.defaults)
        except (OSError, ValueError) as e:
            # json.JSONDecodeError is a ValueError subclass.
            logging.error(f"Fehler beim Laden der Konfiguration: {e}")
        return self.defaults.copy()

    def save_config(self, config: Dict) -> None:
        """Write ``config`` to the config file as JSON; errors are only logged."""
        try:
            self.config_file.parent.mkdir(parents=True, exist_ok=True)
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(config, f, indent=2, ensure_ascii=False)
        except (OSError, TypeError, ValueError) as e:
            logging.error(f"Fehler beim Speichern der Konfiguration: {e}")

    def get(self, key: str, default=None):
        """Return the configuration value for ``key`` or ``default``."""
        return self.config.get(key, default)

    def set(self, key: str, value) -> None:
        """Set a configuration value and persist the whole configuration."""
        self.config[key] = value
        self.save_config(self.config)


class ServiceMonitor:
    """Monitors and restarts systemd services."""

    def __init__(self, config: WatchdogConfig):
        self.config = config
        # Per-service count of restarts since the last reset.
        self.restart_counts = {}
        # Per-service timestamp (datetime) of the last successful restart call.
        self.last_restart_times = {}

    @staticmethod
    def _systemctl_succeeds(verb: str, service_name: str) -> bool:
        """Return True if ``systemctl <verb> --quiet <service>`` exits with 0."""
        try:
            result = subprocess.run(
                ["systemctl", verb, "--quiet", service_name],
                capture_output=True,
                timeout=PROBE_TIMEOUT
            )
            return result.returncode == 0
        except Exception as e:
            # Best-effort probe: an unusable systemctl counts as "not OK".
            logging.debug(f"systemctl {verb} {service_name} fehlgeschlagen: {e}")
            return False

    def is_service_active(self, service_name: str) -> bool:
        """Return True if systemd reports the service as active."""
        return self._systemctl_succeeds("is-active", service_name)

    def is_service_enabled(self, service_name: str) -> bool:
        """Return True if systemd reports the service as enabled."""
        return self._systemctl_succeeds("is-enabled", service_name)

    def restart_service(self, service_name: str) -> bool:
        """Restart a service, honoring cooldown and the attempt limit.

        Returns:
            True if ``systemctl restart`` succeeded; False if the service is
            still in its cooldown window, the attempt limit is reached, or
            the restart command failed or timed out.

        Note:
            Only restarts for which ``systemctl restart`` returned 0 update
            the counter and the cooldown timestamp.
        """
        now = datetime.now()

        # Respect the cooldown since the last restart of this service.
        if service_name in self.last_restart_times:
            time_since_last = (now - self.last_restart_times[service_name]).total_seconds()
            if time_since_last < self.config.get("restart_cooldown", 300):
                logging.warning(f"Service {service_name} im Cooldown ({time_since_last:.0f}s)")
                return False

        # Respect the maximum number of restart attempts.
        restart_count = self.restart_counts.get(service_name, 0)
        max_attempts = self.config.get("max_restart_attempts", 3)

        if restart_count >= max_attempts:
            logging.error(f"Service {service_name} erreichte maximale Restart-Versuche ({max_attempts})")
            return False

        try:
            logging.info(f"Starte Service neu: {service_name} (Versuch {restart_count + 1}/{max_attempts})")

            result = subprocess.run(
                ["systemctl", "restart", service_name],
                capture_output=True,
                text=True,
                timeout=30
            )

            if result.returncode == 0:
                self.restart_counts[service_name] = restart_count + 1
                self.last_restart_times[service_name] = now
                # Give the service time to come up before the next check.
                time.sleep(self.config.get("restart_delay", 15))
                logging.info(f"Service {service_name} erfolgreich neugestartet")
                return True

            logging.error(f"Service-Neustart fehlgeschlagen: {result.stderr}")
            return False

        except subprocess.TimeoutExpired:
            logging.error(f"Service-Neustart Timeout: {service_name}")
            return False
        except Exception as e:
            logging.error(f"Service-Neustart Fehler: {e}")
            return False

    def reset_restart_counter(self, service_name: str) -> None:
        """Forget restart count and cooldown timestamp of a service."""
        self.restart_counts.pop(service_name, None)
        self.last_restart_times.pop(service_name, None)


class HTTPSMonitor:
    """Monitors the HTTPS backend and its TLS certificate."""

    def __init__(self, config: WatchdogConfig):
        self.config = config
        self.session = requests.Session()
        self.session.verify = False  # self-signed certificates

    def check_connectivity(self) -> bool:
        """Return True if the backend answers with a status code below 500."""
        try:
            url = self.config.get("https_url", "https://localhost:443")
            timeout = self.config.get("https_timeout", 10)

            response = self.session.get(
                url,
                timeout=timeout,
                allow_redirects=True
            )

            return response.status_code < 500

        except Exception as e:
            logging.debug(f"HTTPS-Konnektivitätsprüfung fehlgeschlagen: {e}")
            return False

    def check_ssl_certificate(self) -> bool:
        """Return True if the certificate exists and is not about to expire.

        Uses ``openssl x509 -checkend`` with ``cert_expire_days`` converted
        to seconds; a missing certificate file or any error yields False.
        """
        try:
            cert_file = self.config.app_dir / "certs" / "localhost" / "localhost.crt"

            if not cert_file.exists():
                return False

            expire_days = self.config.get("cert_expire_days", 7)
            expire_seconds = expire_days * 86400

            result = subprocess.run(
                [
                    "openssl", "x509",
                    "-in", str(cert_file),
                    "-noout", "-checkend", str(expire_seconds)
                ],
                capture_output=True,
                timeout=PROBE_TIMEOUT
            )

            return result.returncode == 0

        except Exception as e:
            logging.error(f"SSL-Zertifikat-Prüfung fehlgeschlagen: {e}")
            return False

    def regenerate_ssl_certificate(self) -> bool:
        """Regenerate the certificate via the application's ``utils.ssl_config``.

        Returns:
            The result of ``ensure_ssl_certificates(..., force_regenerate=True)``,
            or False if the helper cannot be imported or raises.
        """
        try:
            logging.info("Regeneriere SSL-Zertifikat...")

            # Make the application's packages importable; add the path only
            # once so repeated calls do not keep growing sys.path.
            app_path = str(self.config.app_dir)
            if app_path not in sys.path:
                sys.path.insert(0, app_path)

            from utils.ssl_config import ensure_ssl_certificates

            success = ensure_ssl_certificates(app_path, force_regenerate=True)

            if success:
                logging.info("SSL-Zertifikat erfolgreich regeneriert")
            else:
                logging.error("SSL-Zertifikat-Regenerierung fehlgeschlagen")

            return success

        except Exception as e:
            logging.error(f"SSL-Zertifikat-Regenerierung Fehler: {e}")
            return False


class KioskMonitor:
    """Monitors the kiosk user session, X server, display and browser."""

    def __init__(self, config: WatchdogConfig):
        self.config = config
        self.kiosk_user = config.get("kiosk_user", "kiosk")

    @staticmethod
    def _is_chromium_kiosk_cmdline(cmdline: List[str]) -> bool:
        """Return True if the command line mentions both 'chromium' and 'kiosk'.

        The two keywords may live in different arguments, e.g.
        ``["chromium", "--kiosk", ...]``.
        """
        return (any('chromium' in arg for arg in cmdline)
                and any('kiosk' in arg for arg in cmdline))

    @staticmethod
    def _is_x_server_cmdline(cmdline: List[str]) -> bool:
        """Return True if the command line looks like an X server on display :0."""
        # Legacy match: a single argument containing both markers.
        if any('X' in arg and ':0' in arg for arg in cmdline):
            return True
        # Usual form: executable "X"/"Xorg" with the display as its own argument.
        executable = os.path.basename(cmdline[0])
        return executable in ("X", "Xorg") and any(
            arg == ":0" or arg.startswith(":0.") for arg in cmdline[1:]
        )

    def check_user_session(self) -> bool:
        """Return True if at least one process is owned by the kiosk user."""
        try:
            return any(
                proc.info['username'] == self.kiosk_user
                for proc in psutil.process_iter(['username'])
            )
        except Exception as e:
            logging.debug(f"Prüfung der Kiosk-Session fehlgeschlagen: {e}")
            return False

    def check_chromium_process(self) -> bool:
        """Return True if the kiosk user runs a Chromium kiosk process."""
        try:
            for proc in psutil.process_iter(['username', 'cmdline']):
                cmdline = proc.info['cmdline']
                if (proc.info['username'] == self.kiosk_user
                        and cmdline
                        and self._is_chromium_kiosk_cmdline(cmdline)):
                    return True
            return False
        except Exception as e:
            logging.debug(f"Prüfung des Chromium-Prozesses fehlgeschlagen: {e}")
            return False

    def check_x_server(self) -> bool:
        """Return True if an X server process for display :0 is running."""
        try:
            for proc in psutil.process_iter(['cmdline']):
                cmdline = proc.info['cmdline']
                if cmdline and self._is_x_server_cmdline(cmdline):
                    return True
            return False
        except Exception as e:
            logging.debug(f"Prüfung des X-Servers fehlgeschlagen: {e}")
            return False

    def check_display_availability(self) -> bool:
        """Return True if ``xdpyinfo`` can connect to display :0."""
        try:
            result = subprocess.run(
                ["xdpyinfo"],
                # Keep the inherited environment (PATH, XAUTHORITY, ...) and
                # only override the display.
                env={**os.environ, "DISPLAY": ":0"},
                capture_output=True,
                timeout=5
            )
            return result.returncode == 0
        except Exception as e:
            logging.debug(f"Display-Prüfung fehlgeschlagen: {e}")
            return False

    def restart_kiosk_session(self) -> bool:
        """Terminate all kiosk user processes and restart getty on tty1.

        Returns:
            True if all commands could be executed, False on any exception.
        """
        try:
            logging.info("Starte Kiosk-Session neu...")

            # Ask the kiosk processes to terminate gracefully. The signal
            # option goes first, which is the form documented for pkill.
            subprocess.run(["pkill", "-TERM", "-u", self.kiosk_user], timeout=10)
            time.sleep(5)

            # Force termination of whatever is still running.
            subprocess.run(["pkill", "-KILL", "-u", self.kiosk_user], timeout=5)
            time.sleep(2)

            # Restart getty so that autologin starts a fresh session.
            subprocess.run(["systemctl", "restart", "getty@tty1.service"], timeout=15)
            time.sleep(self.config.get("restart_delay", 15))

            logging.info("Kiosk-Session neugestartet")
            return True

        except Exception as e:
            logging.error(f"Kiosk-Session-Neustart fehlgeschlagen: {e}")
            return False


class SystemMonitor:
    """Monitors system resources and frees them on demand."""

    def __init__(self, config: WatchdogConfig):
        self.config = config

    def get_memory_usage(self) -> float:
        """Return memory usage in percent (0.0 if it cannot be determined)."""
        try:
            return psutil.virtual_memory().percent
        except Exception:
            return 0.0

    def get_cpu_usage(self) -> float:
        """Return CPU usage in percent, sampled over one second (0.0 on error)."""
        try:
            return psutil.cpu_percent(interval=1)
        except Exception:
            return 0.0

    def get_disk_usage(self) -> float:
        """Return usage of the root filesystem in percent (0.0 on error)."""
        try:
            return psutil.disk_usage('/').percent
        except Exception:
            return 0.0

    @staticmethod
    def _clear_directory(directory: Path) -> None:
        """Delete everything inside ``directory`` but keep the directory itself."""
        for entry in directory.iterdir():
            try:
                if entry.is_dir() and not entry.is_symlink():
                    shutil.rmtree(entry, ignore_errors=True)
                else:
                    entry.unlink()
            except OSError as e:
                # Best effort: one undeletable entry must not stop the cleanup.
                logging.debug(f"Konnte {entry} nicht löschen: {e}")

    def cleanup_system_resources(self) -> None:
        """Clear browser caches, old temp files and the kernel page cache.

        Errors are logged; the method never raises.
        """
        try:
            memory_before = self.get_memory_usage()
            logging.info(f"Bereinige Systemressourcen (Speicher: {memory_before:.1f}%)")

            kiosk_user = self.config.get("kiosk_user", "kiosk")
            app_dir = self.config.app_dir

            # Clear the browser caches. This is done in Python because
            # ``subprocess.run([...], shell=True)`` passes only the first list
            # element to the shell as the command, so a list-form "rm -rf
            # dir/*" would run a bare ``rm`` and delete nothing.
            cache_dirs = [
                Path(f"/home/{kiosk_user}/.chromium-kiosk/Default/Cache"),
                Path(f"/home/{kiosk_user}/.cache")
            ]

            for cache_dir in cache_dirs:
                if cache_dir.is_dir():
                    self._clear_directory(cache_dir)

            # Remove temporary files not accessed for more than a day.
            temp_dirs = [
                "/tmp",
                str(app_dir / "uploads" / "temp")
            ]

            for temp_dir in temp_dirs:
                if os.path.exists(temp_dir):
                    subprocess.run([
                        "find", temp_dir, "-type", "f", "-atime", "+1", "-delete"
                    ], timeout=30)

            # Flush dirty pages, then drop the kernel caches.
            subprocess.run(["sync"])
            with open("/proc/sys/vm/drop_caches", "w") as f:
                f.write("3")

            memory_after = self.get_memory_usage()
            logging.info(f"Systemressourcen bereinigt (Speicher: {memory_after:.1f}%)")

        except Exception as e:
            logging.error(f"Systemressourcen-Bereinigung fehlgeschlagen: {e}")


class WatchdogManager:
    """Main class: wires the monitors together and runs the check loop."""

    def __init__(self, app_dir: str = "/opt/myp",
                 config_file: Optional[str] = None):
        """Create the manager.

        Args:
            app_dir: Application directory passed on to ``WatchdogConfig``.
            config_file: Optional explicit configuration file path.
        """
        # Configure logging first: WatchdogConfig may already log while
        # loading, and an early logging call would implicitly configure the
        # root logger, turning the later basicConfig() into a no-op.
        self.setup_logging()

        self.config = WatchdogConfig(app_dir, config_file)
        self.service_monitor = ServiceMonitor(self.config)
        self.https_monitor = HTTPSMonitor(self.config)
        self.kiosk_monitor = KioskMonitor(self.config)
        self.system_monitor = SystemMonitor(self.config)

        self.running = False
        self.setup_signal_handlers()

    def setup_logging(self) -> None:
        """Configure root logging to the log file and to the console.

        If the log file cannot be opened, logging falls back to the console
        only and a warning is emitted.
        """
        handlers: List[logging.Handler] = [logging.StreamHandler()]
        file_error = None

        try:
            LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
            handlers.insert(0, logging.FileHandler(LOG_FILE))
        except OSError as e:
            file_error = e

        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s [%(levelname)s] %(message)s',
            handlers=handlers
        )

        if file_error is not None:
            logging.warning(f"Log-Datei {LOG_FILE} nicht verwendbar: {file_error}")

    def setup_signal_handlers(self) -> None:
        """Install SIGTERM/SIGINT handlers that stop the main loop cleanly."""
        def signal_handler(signum, frame):
            logging.info(f"Signal {signum} empfangen - beende Watchdog...")
            self.running = False

        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGINT, signal_handler)

    def rotate_log_if_needed(self) -> None:
        """Trim the log file to its last lines once it exceeds the size limit.

        The file is rewritten in place rather than replaced: the logging
        FileHandler keeps the file open in append mode, so it continues to
        write to the same (now shortened) file.
        """
        try:
            size_mb = self.config.get("log_rotation_size_mb", 10)
            max_size = size_mb * 1024 * 1024

            if not LOG_FILE.exists() or LOG_FILE.stat().st_size <= max_size:
                return

            # Make sure buffered records are on disk before reading the tail.
            for handler in logging.getLogger().handlers:
                handler.flush()

            # Keep only the last LOG_TAIL_LINES lines (bounded memory).
            with open(LOG_FILE, "rb") as f:
                tail = deque(f, maxlen=LOG_TAIL_LINES)
            with open(LOG_FILE, "wb") as f:
                f.writelines(tail)

            logging.info(f"Log-Datei rotiert (>{size_mb}MB)")

        except (OSError, TypeError, ValueError) as e:
            logging.error(f"Log-Rotation fehlgeschlagen: {e}")

    def check_https_backend(self) -> None:
        """Restart the HTTPS service if it is inactive or unreachable."""
        service_name = self.config.get("https_service", "myp-https")

        if not self.service_monitor.is_service_active(service_name):
            logging.error("HTTPS-Service nicht aktiv")
            self.service_monitor.restart_service(service_name)
        elif not self.https_monitor.check_connectivity():
            logging.error("HTTPS Backend nicht erreichbar")
            self.service_monitor.restart_service(service_name)
        else:
            # Service is healthy: reset the restart counter.
            self.service_monitor.reset_restart_counter(service_name)

    def check_ssl_certificate(self) -> None:
        """Regenerate the certificate if it is missing or about to expire.

        After a successful regeneration the HTTPS service is restarted.
        """
        if self.https_monitor.check_ssl_certificate():
            return

        cert_file = self.config.app_dir / "certs" / "localhost" / "localhost.crt"

        if cert_file.exists():
            expire_days = self.config.get("cert_expire_days", 7)
            logging.warning(f"SSL-Zertifikat läuft in {expire_days} Tagen ab")
        else:
            logging.error("SSL-Zertifikat fehlt")

        if self.https_monitor.regenerate_ssl_certificate():
            service_name = self.config.get("https_service", "myp-https")
            self.service_monitor.restart_service(service_name)

    def check_kiosk_session(self) -> None:
        """Check the kiosk session and repair the first failing layer.

        Order of checks: user session, X server, display, Chromium. The first
        three trigger a full session restart; a missing browser is started
        via the kiosk service or, as a fallback, directly.
        """
        if not self.kiosk_monitor.check_user_session():
            logging.error("Kiosk-Benutzer-Session nicht aktiv")
            self.kiosk_monitor.restart_kiosk_session()
        elif not self.kiosk_monitor.check_x_server():
            logging.error("X-Server nicht verfügbar")
            self.kiosk_monitor.restart_kiosk_session()
        elif not self.kiosk_monitor.check_display_availability():
            logging.error("Display :0 nicht verfügbar")
            self.kiosk_monitor.restart_kiosk_session()
        elif not self.kiosk_monitor.check_chromium_process():
            logging.warning("Chromium-Kiosk-Prozess nicht gefunden")

            # Try to start the kiosk service.
            # NOTE(review): enablement is queried on the system manager but
            # the start uses `systemctl --user` - confirm this is intended.
            kiosk_service = self.config.get("kiosk_service", "myp-kiosk")
            if self.service_monitor.is_service_enabled(kiosk_service):
                subprocess.run(["systemctl", "--user", "start", kiosk_service])
            else:
                # Fallback: launch the browser directly as the kiosk user.
                https_url = self.config.get("https_url", "https://localhost:443")
                kiosk_user = self.config.get("kiosk_user", "kiosk")

                subprocess.Popen([
                    "sudo", "-u", kiosk_user,
                    "DISPLAY=:0",
                    "chromium",
                    "--kiosk",
                    "--no-sandbox",
                    "--ignore-certificate-errors",
                    https_url
                ], env={**os.environ, "DISPLAY": ":0"})

            time.sleep(self.config.get("restart_delay", 15))

    def check_system_resources(self) -> None:
        """Run a cleanup when memory usage exceeds the configured limit."""
        if not self.config.get("enable_performance_monitoring", True):
            return

        memory_usage = self.system_monitor.get_memory_usage()
        max_memory = self.config.get("max_memory_percent", 85)

        if memory_usage > max_memory:
            logging.warning(f"Hohe Speichernutzung: {memory_usage:.1f}%")

            if self.config.get("enable_auto_cleanup", True):
                self.system_monitor.cleanup_system_resources()

    def run_monitoring_cycle(self) -> None:
        """Run all checks once; any exception is logged, not raised."""
        try:
            self.check_https_backend()
            self.check_ssl_certificate()
            self.check_kiosk_session()
            self.check_system_resources()
            self.rotate_log_if_needed()

        except Exception as e:
            logging.error(f"Fehler im Überwachungszyklus: {e}")

    def _sleep_while_running(self, seconds: float) -> None:
        """Sleep up to ``seconds``, waking early once ``self.running`` is False.

        Sleeping in short slices lets a SIGTERM/SIGINT stop the watchdog
        within about a second instead of after a full check interval.
        """
        deadline = time.monotonic() + seconds
        while self.running:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(1.0, remaining))

    def run(self) -> None:
        """Run monitoring cycles until a stop signal is received."""
        self.running = True
        check_interval = self.config.get("check_interval", 30)

        logging.info(f"Kiosk-Watchdog gestartet (PID: {os.getpid()})")
        logging.info(f"Überwachungsintervall: {check_interval}s")

        while self.running:
            try:
                self.run_monitoring_cycle()
                self._sleep_while_running(check_interval)

            except KeyboardInterrupt:
                logging.info("Watchdog durch Benutzer beendet")
                break
            except Exception as e:
                logging.error(f"Unerwarteter Fehler: {e}")
                self._sleep_while_running(check_interval)

        logging.info("Kiosk-Watchdog beendet")


def main():
    """Parse the command line and run the watchdog."""
    import argparse

    parser = argparse.ArgumentParser(description="MYP Kiosk Watchdog Manager")
    parser.add_argument("--app-dir", default="/opt/myp", help="Anwendungsverzeichnis")
    parser.add_argument("--config", help="Konfigurationsdatei")
    parser.add_argument("--daemon", action="store_true", help="Als Daemon ausführen")

    args = parser.parse_args()

    try:
        watchdog = WatchdogManager(args.app_dir, args.config)

        if not args.daemon:
            # Interactive mode; daemon mode (for systemd) runs silently.
            print("Starte Watchdog... (Strg+C zum Beenden)")

        watchdog.run()

    except Exception as e:
        logging.error(f"Watchdog-Start fehlgeschlagen: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()