manage-your-printer/utils/watchdog_manager.py
2025-06-04 10:03:22 +02:00

590 lines
22 KiB
Python

#!/usr/bin/env python3
"""
Intelligenter Watchdog-Manager für MYP Druckerverwaltung
Erweiterte Überwachung mit Python für bessere Fehlerbehandlung und Logging
Optimiert für Debian/Linux-Systeme im Kiosk-Modus
"""
import os
import sys
import time
import json
import logging
import subprocess
import threading
import signal
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Callable
import psutil
import requests
from urllib3.exceptions import InsecureRequestWarning
# SSL-Warnungen unterdrücken für localhost
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
class WatchdogConfig:
    """Configuration store for the watchdog manager.

    Values are read from ``<app_dir>/config/watchdog.json`` and layered on
    top of the built-in defaults, so keys missing from the file still have a
    value. Every failure while reading or writing is logged and never raised.
    """

    def __init__(self, app_dir: str = "/opt/myp"):
        """Resolve the config file path below *app_dir* and load the settings.

        Args:
            app_dir: Application directory; the configuration file lives in
                its ``config`` subdirectory.
        """
        self.app_dir = Path(app_dir)
        self.config_file = self.app_dir / "config" / "watchdog.json"
        # Default configuration; used for every key the file does not set.
        self.defaults = {
            "https_service": "myp-https",
            "kiosk_service": "myp-kiosk",
            "kiosk_user": "kiosk",
            "https_url": "https://localhost:443",
            "check_interval": 30,
            "https_timeout": 10,
            "restart_delay": 15,
            "max_memory_percent": 85,
            "cert_expire_days": 7,
            "log_rotation_size_mb": 10,
            "max_restart_attempts": 3,
            "restart_cooldown": 300,
            "enable_auto_cleanup": True,
            "enable_performance_monitoring": True
        }
        self.config = self.load_config()

    def load_config(self) -> Dict:
        """Load the configuration from disk, falling back to the defaults.

        If the file does not exist it is created with the default values.
        Unreadable files, invalid JSON, and JSON whose top-level value is not
        an object all yield a copy of the defaults.

        Returns:
            A new dict containing the defaults overridden by the file values.
        """
        try:
            if not self.config_file.exists():
                self.save_config(self.defaults)
                return self.defaults.copy()

            with open(self.config_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)

            # dict.update() would also accept a list of pairs (or an empty
            # list); only a JSON object is a valid configuration document.
            if not isinstance(loaded, dict):
                logging.error(
                    "Fehler beim Laden der Konfiguration: JSON-Objekt erwartet, "
                    f"{type(loaded).__name__} gefunden"
                )
                return self.defaults.copy()

            # Merge with the defaults: file values win.
            merged = self.defaults.copy()
            merged.update(loaded)
            return merged
        except Exception as e:
            # Best effort: the watchdog must keep running with defaults.
            logging.error(f"Fehler beim Laden der Konfiguration: {e}")
            return self.defaults.copy()

    def save_config(self, config: Dict) -> None:
        """Write *config* to the configuration file as indented JSON.

        The data is written to a sibling ``.tmp`` file first and then moved
        over the target with ``os.replace`` so that an interrupted write
        cannot leave a truncated configuration behind. Errors are logged.
        """
        try:
            self.config_file.parent.mkdir(parents=True, exist_ok=True)
            tmp_file = self.config_file.with_name(self.config_file.name + ".tmp")
            with open(tmp_file, 'w', encoding='utf-8') as f:
                json.dump(config, f, indent=2, ensure_ascii=False)
            os.replace(tmp_file, self.config_file)
        except Exception as e:
            logging.error(f"Fehler beim Speichern der Konfiguration: {e}")

    def get(self, key: str, default=None):
        """Return the value stored under *key*, or *default* if it is unset."""
        return self.config.get(key, default)

    def set(self, key: str, value) -> None:
        """Store *value* under *key* and persist the whole configuration."""
        self.config[key] = value
        self.save_config(self.config)
class ServiceMonitor:
    """Monitors systemd services and restarts them with rate limiting.

    Restart attempts are limited in two ways: a cooldown between attempts
    (``restart_cooldown``, seconds) and a maximum number of attempts
    (``max_restart_attempts``) until ``reset_restart_counter`` is called.
    """

    # Executable used for all systemd calls.
    SYSTEMCTL = "systemctl"
    # Upper bound in seconds for the is-active / is-enabled queries so a
    # hanging systemctl cannot block the monitoring loop.
    QUERY_TIMEOUT = 10

    def __init__(self, config: "WatchdogConfig"):
        """Store the configuration and start with empty restart bookkeeping."""
        self.config = config
        self.restart_counts = {}        # service name -> attempts so far
        self.last_restart_times = {}    # service name -> datetime of last attempt

    def _query(self, verb: str, service_name: str) -> bool:
        """Run ``systemctl <verb> --quiet <service>``; True on exit code 0.

        Any failure to run the command (missing binary, timeout, ...) is
        reported as False.
        """
        try:
            result = subprocess.run(
                [self.SYSTEMCTL, verb, "--quiet", service_name],
                capture_output=True,
                timeout=self.QUERY_TIMEOUT
            )
            return result.returncode == 0
        except Exception:
            return False

    def is_service_active(self, service_name: str) -> bool:
        """Return True if ``systemctl is-active`` succeeds for the service."""
        return self._query("is-active", service_name)

    def is_service_enabled(self, service_name: str) -> bool:
        """Return True if ``systemctl is-enabled`` succeeds for the service."""
        return self._query("is-enabled", service_name)

    def restart_service(self, service_name: str) -> bool:
        """Restart a service, honouring the cooldown and the attempt limit.

        Every attempt that actually invokes ``systemctl restart`` is counted
        and timestamped, whether it succeeds, fails, or times out. Otherwise
        a service whose restart keeps failing would be retried on every
        monitoring cycle without ever reaching the limit.

        Returns:
            True if systemctl reported success (after waiting
            ``restart_delay`` seconds); False if the attempt was skipped
            (cooldown / limit reached) or failed.
        """
        now = datetime.now()

        # Check the restart cooldown.
        last_attempt = self.last_restart_times.get(service_name)
        if last_attempt is not None:
            time_since_last = (now - last_attempt).total_seconds()
            if time_since_last < self.config.get("restart_cooldown", 300):
                logging.warning(f"Service {service_name} im Cooldown ({time_since_last:.0f}s)")
                return False

        # Check the maximum number of restart attempts.
        restart_count = self.restart_counts.get(service_name, 0)
        max_attempts = self.config.get("max_restart_attempts", 3)
        if restart_count >= max_attempts:
            logging.error(f"Service {service_name} erreichte maximale Restart-Versuche ({max_attempts})")
            return False

        # Record the attempt up front so failures count as well.
        self.restart_counts[service_name] = restart_count + 1
        self.last_restart_times[service_name] = now

        try:
            logging.info(f"Starte Service neu: {service_name} (Versuch {restart_count + 1}/{max_attempts})")
            result = subprocess.run(
                [self.SYSTEMCTL, "restart", service_name],
                capture_output=True,
                text=True,
                timeout=30
            )
            if result.returncode != 0:
                logging.error(f"Service-Neustart fehlgeschlagen: {result.stderr}")
                return False

            # Give the service time to come up before the next check.
            time.sleep(self.config.get("restart_delay", 15))
            logging.info(f"Service {service_name} erfolgreich neugestartet")
            return True
        except subprocess.TimeoutExpired:
            logging.error(f"Service-Neustart Timeout: {service_name}")
            return False
        except Exception as e:
            logging.error(f"Service-Neustart Fehler: {e}")
            return False

    def reset_restart_counter(self, service_name: str) -> None:
        """Forget the attempt count and the last attempt time of a service."""
        self.restart_counts.pop(service_name, None)
        self.last_restart_times.pop(service_name, None)
class HTTPSMonitor:
    """Monitors the HTTPS backend and its TLS certificate."""

    def __init__(self, config: "WatchdogConfig"):
        """Create a reusable HTTP session with certificate checks disabled."""
        self.config = config
        self.session = requests.Session()
        self.session.verify = False  # self-signed certificates

    def check_connectivity(self) -> bool:
        """Return True if the backend answers with a status code below 500.

        The URL (``https_url``) and timeout (``https_timeout``) come from the
        configuration. Any request error counts as "not reachable".
        """
        try:
            url = self.config.get("https_url", "https://localhost:443")
            timeout = self.config.get("https_timeout", 10)
            response = self.session.get(
                url,
                timeout=timeout,
                allow_redirects=True
            )
            return response.status_code < 500
        except Exception as e:
            logging.debug(f"HTTPS-Konnektivitätsprüfung fehlgeschlagen: {e}")
            return False

    def check_ssl_certificate(self) -> bool:
        """Return True if the certificate exists and is not about to expire.

        Uses ``openssl x509 -checkend`` with ``cert_expire_days`` converted
        to seconds. A missing certificate file, a failing or hanging openssl
        call all yield False.
        """
        try:
            cert_file = self.config.app_dir / "certs" / "localhost" / "localhost.crt"
            if not cert_file.exists():
                return False

            expire_days = self.config.get("cert_expire_days", 7)
            expire_seconds = expire_days * 86400
            result = subprocess.run(
                [
                    "openssl", "x509",
                    "-in", str(cert_file),
                    "-noout", "-checkend", str(expire_seconds)
                ],
                capture_output=True,
                # Bound the call so a stuck openssl cannot stall the watchdog.
                timeout=15
            )
            return result.returncode == 0
        except Exception as e:
            logging.error(f"SSL-Zertifikat-Prüfung fehlgeschlagen: {e}")
            return False

    def regenerate_ssl_certificate(self) -> bool:
        """Regenerate the certificate via ``utils.ssl_config``.

        The application directory is put at the front of ``sys.path`` so the
        application's ``utils`` package can be imported. It is added only
        once; inserting it on every call would grow ``sys.path`` for as long
        as regeneration keeps being requested.

        Returns:
            The result of ``ensure_ssl_certificates``; False on any error.
        """
        try:
            logging.info("Regeneriere SSL-Zertifikat...")

            # Import the SSL configuration helper from the application.
            app_path = str(self.config.app_dir)
            if app_path not in sys.path:
                sys.path.insert(0, app_path)
            from utils.ssl_config import ensure_ssl_certificates

            success = ensure_ssl_certificates(app_path, force_regenerate=True)
            if success:
                logging.info("SSL-Zertifikat erfolgreich regeneriert")
            else:
                logging.error("SSL-Zertifikat-Regenerierung fehlgeschlagen")
            return success
        except Exception as e:
            logging.error(f"SSL-Zertifikat-Regenerierung Fehler: {e}")
            return False
class KioskMonitor:
    """Monitors the kiosk user session, the X server and the browser."""

    # Executable names accepted as an X server when the display number is
    # passed as a separate argument.
    _X_SERVER_NAMES = ("X", "Xorg", "Xorg.bin", "Xorg.wrap")

    def __init__(self, config: "WatchdogConfig"):
        """Store the configuration and cache the kiosk user name."""
        self.config = config
        self.kiosk_user = config.get("kiosk_user", "kiosk")

    @staticmethod
    def _is_kiosk_chromium(cmdline) -> bool:
        """Return True if *cmdline* looks like Chromium running as kiosk.

        'chromium' and 'kiosk' may appear in different arguments
        (``chromium --kiosk URL``); requiring both in the same argument only
        matched when some path happened to contain both words.
        """
        if not cmdline:
            return False
        return (any('chromium' in arg for arg in cmdline) and
                any('kiosk' in arg for arg in cmdline))

    @classmethod
    def _is_x_server(cls, cmdline) -> bool:
        """Return True if *cmdline* looks like an X server on display :0.

        Accepts a single argument containing both 'X' and ':0', and also an
        X server executable followed by a separate ':0' argument
        (``/usr/lib/xorg/Xorg :0 ...``).
        """
        if not cmdline:
            return False
        if any('X' in arg and ':0' in arg for arg in cmdline):
            return True
        executable = os.path.basename(cmdline[0])
        return (executable in cls._X_SERVER_NAMES and
                any(':0' in arg for arg in cmdline[1:]))

    def check_user_session(self) -> bool:
        """Return True if any process is owned by the kiosk user."""
        try:
            return any(
                proc.info['username'] == self.kiosk_user
                for proc in psutil.process_iter(['username'])
            )
        except Exception:
            return False

    def check_chromium_process(self) -> bool:
        """Return True if the kiosk user runs a Chromium kiosk process."""
        try:
            for proc in psutil.process_iter(['username', 'cmdline']):
                if (proc.info['username'] == self.kiosk_user and
                        self._is_kiosk_chromium(proc.info['cmdline'])):
                    return True
            return False
        except Exception:
            return False

    def check_x_server(self) -> bool:
        """Return True if a process looks like an X server for display :0."""
        try:
            return any(
                self._is_x_server(proc.info['cmdline'])
                for proc in psutil.process_iter(['cmdline'])
            )
        except Exception:
            return False

    def check_display_availability(self) -> bool:
        """Return True if ``xdpyinfo`` can talk to display :0.

        The current environment is kept and only DISPLAY is overridden, so
        variables such as PATH, HOME or XAUTHORITY are still available to
        xdpyinfo.
        """
        try:
            result = subprocess.run(
                ["xdpyinfo"],
                env={**os.environ, "DISPLAY": ":0"},
                capture_output=True,
                timeout=5
            )
            return result.returncode == 0
        except Exception:
            return False

    def restart_kiosk_session(self) -> bool:
        """Terminate all kiosk user processes and restart getty on tty1.

        Returns:
            True once all commands have been issued; False if one of them
            raised (for example a timeout). Exit codes are not inspected.
        """
        try:
            logging.info("Starte Kiosk-Session neu...")

            # Ask the kiosk processes to terminate gracefully.
            subprocess.run(["pkill", "-u", self.kiosk_user, "-TERM"], timeout=10)
            time.sleep(5)

            # Force termination of whatever is still running.
            subprocess.run(["pkill", "-u", self.kiosk_user, "-KILL"], timeout=5)
            time.sleep(2)

            # Restart the getty service (autologin, per the original comment).
            subprocess.run(["systemctl", "restart", "getty@tty1.service"], timeout=15)
            time.sleep(self.config.get("restart_delay", 15))

            logging.info("Kiosk-Session neugestartet")
            return True
        except Exception as e:
            logging.error(f"Kiosk-Session-Neustart fehlgeschlagen: {e}")
            return False
class SystemMonitor:
    """Monitors system resources and frees them on request."""

    def __init__(self, config: "WatchdogConfig"):
        """Store the configuration used to locate cache/temp directories."""
        self.config = config

    def get_memory_usage(self) -> float:
        """Return memory usage in percent (0.0 if it cannot be determined)."""
        try:
            return psutil.virtual_memory().percent
        except Exception:
            return 0.0

    def get_cpu_usage(self) -> float:
        """Return CPU usage in percent, sampled over one second.

        Returns 0.0 if it cannot be determined.
        """
        try:
            return psutil.cpu_percent(interval=1)
        except Exception:
            return 0.0

    def get_disk_usage(self) -> float:
        """Return usage of the root filesystem in percent (0.0 on error)."""
        try:
            return psutil.disk_usage('/').percent
        except Exception:
            return 0.0

    @staticmethod
    def _clear_directory(directory) -> int:
        """Delete every entry inside *directory* but keep the directory.

        Symbolic links are removed, never followed. Entries that cannot be
        removed are logged and skipped.

        Returns:
            The number of top-level entries that were removed.
        """
        import shutil  # local import: not part of the module's import block

        try:
            entries = list(os.scandir(directory))
        except OSError as e:
            logging.warning(f"Verzeichnis {directory} nicht lesbar: {e}")
            return 0

        removed = 0
        for entry in entries:
            try:
                if entry.is_dir(follow_symlinks=False):
                    shutil.rmtree(entry.path)
                else:
                    os.unlink(entry.path)
                removed += 1
            except OSError as e:
                logging.warning(f"Konnte {entry.path} nicht entfernen: {e}")
        return removed

    def cleanup_system_resources(self) -> None:
        """Free resources: browser caches, old temp files, kernel caches.

        Steps, in order:
          1. empty the kiosk user's Chromium cache and ``~/.cache``;
          2. delete files in ``/tmp`` and ``<app_dir>/uploads/temp`` matched
             by ``find -atime +1``;
          3. ``sync`` and write ``3`` to ``/proc/sys/vm/drop_caches``.

        Any error aborts the remaining steps and is logged; nothing is
        raised.
        """
        try:
            memory_before = self.get_memory_usage()
            logging.info(f"Bereinige Systemressourcen (Speicher: {memory_before:.1f}%)")

            kiosk_user = self.config.get("kiosk_user", "kiosk")
            app_dir = self.config.app_dir

            # Clear the browser caches. The directories are emptied in
            # process: passing an argument list together with shell=True
            # runs only its first item as the command, so the former
            # `rm -rf <dir>/*` call never deleted anything.
            cache_dirs = [
                f"/home/{kiosk_user}/.chromium-kiosk/Default/Cache",
                f"/home/{kiosk_user}/.cache"
            ]
            for cache_dir in cache_dirs:
                if os.path.isdir(cache_dir):
                    self._clear_directory(cache_dir)

            # Remove stale temporary files.
            temp_dirs = [
                "/tmp",
                str(app_dir / "uploads" / "temp")
            ]
            for temp_dir in temp_dirs:
                if os.path.exists(temp_dir):
                    subprocess.run([
                        "find", temp_dir, "-type", "f", "-atime", "+1", "-delete"
                    ], timeout=30)

            # Flush dirty pages, then ask the kernel to drop its caches.
            subprocess.run(["sync"], timeout=30)
            with open("/proc/sys/vm/drop_caches", "w") as f:
                f.write("3")

            memory_after = self.get_memory_usage()
            logging.info(f"Systemressourcen bereinigt (Speicher: {memory_after:.1f}%)")
        except Exception as e:
            logging.error(f"Systemressourcen-Bereinigung fehlgeschlagen: {e}")
class WatchdogManager:
    """Main class: wires the monitors together and runs the check loop."""

    # Log file written by setup_logging() and trimmed by
    # rotate_log_if_needed().
    LOG_FILE = Path("/var/log/kiosk-watchdog-python.log")
    # Number of trailing lines kept when the log is trimmed.
    LOG_KEEP_LINES = 1000

    def __init__(self, app_dir: str = "/opt/myp"):
        """Create the configuration and all monitors for *app_dir*.

        Also configures logging and installs the SIGTERM/SIGINT handlers.
        """
        self.config = WatchdogConfig(app_dir)
        self.service_monitor = ServiceMonitor(self.config)
        self.https_monitor = HTTPSMonitor(self.config)
        self.kiosk_monitor = KioskMonitor(self.config)
        self.system_monitor = SystemMonitor(self.config)
        self.running = False
        # Set by the signal handler so the wait between cycles ends at once.
        self._stop_event = threading.Event()
        self.setup_logging()
        self.setup_signal_handlers()

    def setup_logging(self) -> None:
        """Configure root logging to the log file and to the console."""
        log_file = Path(self.LOG_FILE)
        log_file.parent.mkdir(parents=True, exist_ok=True)
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s [%(levelname)s] %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler()
            ]
        )

    def setup_signal_handlers(self) -> None:
        """Install SIGTERM/SIGINT handlers that stop the main loop cleanly."""
        def signal_handler(signum, frame):
            logging.info(f"Signal {signum} empfangen - beende Watchdog...")
            self.running = False
            self._stop_event.set()

        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGINT, signal_handler)

    def rotate_log_if_needed(self) -> None:
        """Trim the log file to its last lines once it exceeds the size limit.

        The limit is ``log_rotation_size_mb``. The file is rewritten in place
        instead of being deleted and replaced: the logging FileHandler keeps
        the file open, so after a delete-and-rename it would keep writing to
        the removed file and the new log would stay empty. Because the
        handler writes in append mode, records logged after the truncation
        land at the new end of the file.
        """
        try:
            from collections import deque  # local import, only needed here

            log_file = Path(self.LOG_FILE)
            size_mb = self.config.get("log_rotation_size_mb", 10)
            max_size = size_mb * 1024 * 1024
            if not log_file.exists() or log_file.stat().st_size <= max_size:
                return

            # Make sure buffered records are on disk before reading the tail.
            for handler in logging.getLogger().handlers:
                handler.flush()

            # Binary mode keeps the bytes untouched regardless of encoding.
            with open(log_file, "rb") as f:
                tail = deque(f, maxlen=self.LOG_KEEP_LINES)
            with open(log_file, "wb") as f:
                f.writelines(tail)

            logging.info(f"Log-Datei rotiert (>{size_mb}MB)")
        except Exception as e:
            logging.error(f"Log-Rotation fehlgeschlagen: {e}")

    def check_https_backend(self) -> None:
        """Restart the HTTPS service if it is inactive or unreachable.

        When both checks pass, the restart bookkeeping for the service is
        reset.
        """
        service_name = self.config.get("https_service", "myp-https")

        if not self.service_monitor.is_service_active(service_name):
            logging.error("HTTPS-Service nicht aktiv")
            self.service_monitor.restart_service(service_name)
        elif not self.https_monitor.check_connectivity():
            logging.error("HTTPS Backend nicht erreichbar")
            self.service_monitor.restart_service(service_name)
        else:
            # Service is healthy - reset the restart counter.
            self.service_monitor.reset_restart_counter(service_name)

    def check_ssl_certificate(self) -> None:
        """Regenerate a missing/expiring certificate and restart HTTPS."""
        if self.https_monitor.check_ssl_certificate():
            return

        cert_file = self.config.app_dir / "certs" / "localhost" / "localhost.crt"
        if cert_file.exists():
            expire_days = self.config.get("cert_expire_days", 7)
            logging.warning(f"SSL-Zertifikat läuft in {expire_days} Tagen ab")
        else:
            logging.error("SSL-Zertifikat fehlt")

        if self.https_monitor.regenerate_ssl_certificate():
            service_name = self.config.get("https_service", "myp-https")
            self.service_monitor.restart_service(service_name)

    def check_kiosk_session(self) -> None:
        """Check user session, X server, display and browser, in that order.

        The first failing check decides the recovery action: a full kiosk
        session restart for the first three, a browser start for the last.
        """
        if not self.kiosk_monitor.check_user_session():
            logging.error("Kiosk-Benutzer-Session nicht aktiv")
            self.kiosk_monitor.restart_kiosk_session()
        elif not self.kiosk_monitor.check_x_server():
            logging.error("X-Server nicht verfügbar")
            self.kiosk_monitor.restart_kiosk_session()
        elif not self.kiosk_monitor.check_display_availability():
            logging.error("Display :0 nicht verfügbar")
            self.kiosk_monitor.restart_kiosk_session()
        elif not self.kiosk_monitor.check_chromium_process():
            logging.warning("Chromium-Kiosk-Prozess nicht gefunden")

            # Try to start the kiosk service.
            kiosk_service = self.config.get("kiosk_service", "myp-kiosk")
            if self.service_monitor.is_service_enabled(kiosk_service):
                # NOTE(review): is_service_enabled() queries the system
                # manager while this starts the unit with --user - confirm
                # which scope the kiosk unit is installed in.
                subprocess.run(["systemctl", "--user", "start", kiosk_service])
            else:
                # Fallback: start the browser directly.
                https_url = self.config.get("https_url", "https://localhost:443")
                kiosk_user = self.config.get("kiosk_user", "kiosk")
                subprocess.Popen([
                    "sudo", "-u", kiosk_user,
                    "DISPLAY=:0", "chromium",
                    "--kiosk", "--no-sandbox", "--ignore-certificate-errors",
                    https_url
                ], env={"DISPLAY": ":0"})

            time.sleep(self.config.get("restart_delay", 15))

    def check_system_resources(self) -> None:
        """Trigger a cleanup when memory usage exceeds the configured limit.

        Does nothing if ``enable_performance_monitoring`` is off; the cleanup
        itself additionally requires ``enable_auto_cleanup``.
        """
        if not self.config.get("enable_performance_monitoring", True):
            return

        memory_usage = self.system_monitor.get_memory_usage()
        max_memory = self.config.get("max_memory_percent", 85)

        if memory_usage > max_memory:
            logging.warning(f"Hohe Speichernutzung: {memory_usage:.1f}%")
            if self.config.get("enable_auto_cleanup", True):
                self.system_monitor.cleanup_system_resources()

    def run_monitoring_cycle(self) -> None:
        """Run all checks once; any exception is logged, not raised."""
        try:
            # HTTPS backend
            self.check_https_backend()
            # SSL certificate
            self.check_ssl_certificate()
            # Kiosk session
            self.check_kiosk_session()
            # System resources
            self.check_system_resources()
            # Log rotation
            self.rotate_log_if_needed()
        except Exception as e:
            logging.error(f"Fehler im Überwachungszyklus: {e}")

    def run(self) -> None:
        """Run monitoring cycles every ``check_interval`` seconds until stopped.

        The pause between cycles waits on an event that the signal handler
        sets, so SIGTERM/SIGINT end the loop immediately instead of after the
        remaining sleep time.
        """
        self.running = True
        self._stop_event.clear()
        check_interval = self.config.get("check_interval", 30)

        logging.info(f"Kiosk-Watchdog gestartet (PID: {os.getpid()})")
        logging.info(f"Überwachungsintervall: {check_interval}s")

        while self.running:
            try:
                self.run_monitoring_cycle()
                self._stop_event.wait(check_interval)
            except KeyboardInterrupt:
                logging.info("Watchdog durch Benutzer beendet")
                break
            except Exception as e:
                logging.error(f"Unerwarteter Fehler: {e}")
                self._stop_event.wait(check_interval)

        logging.info("Kiosk-Watchdog beendet")
def main():
    """Command-line entry point: parse the arguments and run the watchdog.

    Options:
        --app-dir  application directory (default ``/opt/myp``)
        --config   alternative configuration file; replaces the default
                   ``<app-dir>/config/watchdog.json``
        --daemon   run without the interactive start-up message

    Exits with status 1 if the watchdog cannot be started.
    """
    import argparse

    parser = argparse.ArgumentParser(description="MYP Kiosk Watchdog Manager")
    parser.add_argument("--app-dir", default="/opt/myp", help="Anwendungsverzeichnis")
    parser.add_argument("--config", help="Konfigurationsdatei")
    parser.add_argument("--daemon", action="store_true", help="Als Daemon ausführen")
    args = parser.parse_args()

    try:
        watchdog = WatchdogManager(args.app_dir)

        if args.config:
            # Honour --config: point the shared configuration object at the
            # given file and reload it. All monitors hold a reference to this
            # same object, so they see the new values.
            config = watchdog.config
            config.config_file = Path(args.config)
            config.config = config.load_config()
            # KioskMonitor copies the user name at construction time.
            watchdog.kiosk_monitor.kiosk_user = config.get("kiosk_user", "kiosk")

        if not args.daemon:
            # Interactive mode; daemon mode (for systemd) starts silently.
            print("Starte Watchdog... (Strg+C zum Beenden)")
        watchdog.run()
    except Exception as e:
        logging.error(f"Watchdog-Start fehlgeschlagen: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()