🎉 Refactor and optimize database files, enhance error handling with new utility scripts 📚, and update documentation on fault tolerance and unattended operation. 🚀

This commit is contained in:
2025-06-02 14:57:58 +02:00
parent 7bea427bd6
commit 6ff407a895
29 changed files with 3148 additions and 450 deletions

View File

@ -0,0 +1,641 @@
#!/usr/bin/env python3
"""
Robustes Error-Recovery-System für wartungsfreien Produktionsbetrieb
Automatische Fehlererkennung, -behebung und -prävention
"""
import os
import sys
import time
import threading
import traceback
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Callable, Any
from dataclasses import dataclass, field
from enum import Enum
import logging
import json
import subprocess
import psutil
from contextlib import contextmanager
import signal
# Logging-Setup
try:
from utils.logging_config import get_logger
recovery_logger = get_logger("error_recovery")
except ImportError:
logging.basicConfig(level=logging.INFO)
recovery_logger = logging.getLogger("error_recovery")
class ErrorSeverity(Enum):
"""Schweregrade von Fehlern"""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class RecoveryAction(Enum):
"""Verfügbare Recovery-Aktionen"""
LOG_ONLY = "log_only"
RESTART_SERVICE = "restart_service"
RESTART_COMPONENT = "restart_component"
CLEAR_CACHE = "clear_cache"
RESET_DATABASE = "reset_database"
RESTART_SYSTEM = "restart_system"
EMERGENCY_STOP = "emergency_stop"
@dataclass
class ErrorPattern:
"""Definiert ein Fehlermuster und zugehörige Recovery-Aktionen"""
name: str
patterns: List[str] # Regex-Patterns für Fehlererkennung
severity: ErrorSeverity
actions: List[RecoveryAction]
max_occurrences: int = 3 # Maximale Anzahl vor Eskalation
time_window: int = 300 # Zeitfenster in Sekunden
escalation_actions: List[RecoveryAction] = field(default_factory=list)
description: str = ""
@dataclass
class ErrorOccurrence:
"""Einzelnes Auftreten eines Fehlers"""
timestamp: datetime
pattern_name: str
error_message: str
severity: ErrorSeverity
context: Dict[str, Any] = field(default_factory=dict)
recovery_attempted: List[RecoveryAction] = field(default_factory=list)
recovery_successful: bool = False
class ErrorRecoveryManager:
"""
Zentraler Manager für automatische Fehlererkennung und -behebung.
Überwacht kontinuierlich das System und führt automatische Recovery durch.
"""
def __init__(self):
self.is_active = False
self.error_patterns: Dict[str, ErrorPattern] = {}
self.error_history: List[ErrorOccurrence] = []
self.recovery_handlers: Dict[RecoveryAction, Callable] = {}
self.monitoring_thread: Optional[threading.Thread] = None
self.lock = threading.Lock()
# Konfiguration
self.config = {
"check_interval": 30, # Sekunden
"max_history_size": 1000,
"auto_recovery_enabled": True,
"critical_error_threshold": 5,
"system_restart_threshold": 10,
"log_file_paths": [
"logs/app/app.log",
"logs/errors/errors.log",
"logs/database/database.log"
]
}
# Initialisiere Standard-Fehlermuster
self._init_default_patterns()
# Initialisiere Recovery-Handler
self._init_recovery_handlers()
recovery_logger.info("🛡️ Error-Recovery-Manager initialisiert")
def _init_default_patterns(self):
"""Initialisiert Standard-Fehlermuster für häufige Probleme"""
patterns = [
# Datenbank-Fehler
ErrorPattern(
name="database_lock",
patterns=[
r"database is locked",
r"SQLite.*locked",
r"OperationalError.*locked"
],
severity=ErrorSeverity.HIGH,
actions=[RecoveryAction.RESET_DATABASE],
max_occurrences=3,
escalation_actions=[RecoveryAction.RESTART_SERVICE],
description="Datenbank-Sperrung"
),
# Memory-Fehler
ErrorPattern(
name="memory_exhausted",
patterns=[
r"MemoryError",
r"Out of memory",
r"Cannot allocate memory"
],
severity=ErrorSeverity.CRITICAL,
actions=[RecoveryAction.CLEAR_CACHE, RecoveryAction.RESTART_SERVICE],
max_occurrences=2,
escalation_actions=[RecoveryAction.RESTART_SYSTEM],
description="Speicher erschöpft"
),
# Network-Fehler
ErrorPattern(
name="connection_error",
patterns=[
r"ConnectionError",
r"Network is unreachable",
r"Connection refused"
],
severity=ErrorSeverity.MEDIUM,
actions=[RecoveryAction.RESTART_COMPONENT],
max_occurrences=5,
escalation_actions=[RecoveryAction.RESTART_SERVICE],
description="Netzwerk-Verbindungsfehler"
),
# Kiosk-Fehler
ErrorPattern(
name="kiosk_crash",
patterns=[
r"chromium.*crashed",
r"firefox.*crashed",
r"X11.*error",
r"Display.*not found"
],
severity=ErrorSeverity.HIGH,
actions=[RecoveryAction.RESTART_COMPONENT],
max_occurrences=3,
escalation_actions=[RecoveryAction.RESTART_SYSTEM],
description="Kiosk-Display Fehler"
),
# Service-Fehler
ErrorPattern(
name="service_failure",
patterns=[
r"systemctl.*failed",
r"Service.*not found",
r"Failed to start"
],
severity=ErrorSeverity.HIGH,
actions=[RecoveryAction.RESTART_SERVICE],
max_occurrences=3,
escalation_actions=[RecoveryAction.RESTART_SYSTEM],
description="System-Service Fehler"
),
# Disk-Fehler
ErrorPattern(
name="disk_full",
patterns=[
r"No space left on device",
r"Disk full",
r"OSError.*28"
],
severity=ErrorSeverity.CRITICAL,
actions=[RecoveryAction.CLEAR_CACHE],
max_occurrences=1,
escalation_actions=[RecoveryAction.EMERGENCY_STOP],
description="Festplatte voll"
),
# Flask-Fehler
ErrorPattern(
name="flask_error",
patterns=[
r"Internal Server Error",
r"500 Internal Server Error",
r"Application failed to start"
],
severity=ErrorSeverity.HIGH,
actions=[RecoveryAction.RESTART_SERVICE],
max_occurrences=3,
escalation_actions=[RecoveryAction.RESTART_SYSTEM],
description="Flask-Anwendungsfehler"
)
]
for pattern in patterns:
self.error_patterns[pattern.name] = pattern
def _init_recovery_handlers(self):
"""Initialisiert Handler für Recovery-Aktionen"""
self.recovery_handlers = {
RecoveryAction.LOG_ONLY: self._handle_log_only,
RecoveryAction.RESTART_SERVICE: self._handle_restart_service,
RecoveryAction.RESTART_COMPONENT: self._handle_restart_component,
RecoveryAction.CLEAR_CACHE: self._handle_clear_cache,
RecoveryAction.RESET_DATABASE: self._handle_reset_database,
RecoveryAction.RESTART_SYSTEM: self._handle_restart_system,
RecoveryAction.EMERGENCY_STOP: self._handle_emergency_stop
}
def start_monitoring(self):
"""Startet kontinuierliche Überwachung"""
if self.is_active:
recovery_logger.warning("Monitoring bereits aktiv")
return
self.is_active = True
self.monitoring_thread = threading.Thread(
target=self._monitor_loop,
daemon=True,
name="ErrorRecoveryMonitor"
)
self.monitoring_thread.start()
recovery_logger.info("🔍 Error-Monitoring gestartet")
def stop_monitoring(self):
"""Stoppt Überwachung"""
self.is_active = False
if self.monitoring_thread and self.monitoring_thread.is_alive():
self.monitoring_thread.join(timeout=5)
recovery_logger.info("🛑 Error-Monitoring gestoppt")
def _monitor_loop(self):
"""Hauptschleife für kontinuierliche Überwachung"""
while self.is_active:
try:
# Log-Dateien prüfen
self._check_log_files()
# System-Metriken prüfen
self._check_system_metrics()
# Service-Status prüfen
self._check_service_status()
# Alte Einträge bereinigen
self._cleanup_old_entries()
time.sleep(self.config["check_interval"])
except Exception as e:
recovery_logger.error(f"Fehler in Monitor-Loop: {e}")
time.sleep(5) # Kurze Pause bei Fehlern
def _check_log_files(self):
"""Prüft Log-Dateien auf Fehlermuster"""
for log_path in self.config["log_file_paths"]:
try:
if not os.path.exists(log_path):
continue
# Lese nur neue Zeilen (vereinfacht)
with open(log_path, 'r', encoding='utf-8') as f:
# Gehe zu den letzten 1000 Zeilen
lines = f.readlines()
recent_lines = lines[-1000:] if len(lines) > 1000 else lines
for line in recent_lines:
self._analyze_log_line(line, log_path)
except Exception as e:
recovery_logger.debug(f"Fehler beim Lesen von {log_path}: {e}")
def _analyze_log_line(self, line: str, source: str):
"""Analysiert einzelne Log-Zeile auf Fehlermuster"""
import re
for pattern_name, pattern in self.error_patterns.items():
for regex in pattern.patterns:
try:
if re.search(regex, line, re.IGNORECASE):
self._handle_error_detection(
pattern_name=pattern_name,
error_message=line.strip(),
context={"source": source, "pattern": regex}
)
break
except Exception as e:
recovery_logger.debug(f"Regex-Fehler für {regex}: {e}")
def _check_system_metrics(self):
"""Prüft System-Metriken auf kritische Werte"""
try:
# Memory-Check
memory = psutil.virtual_memory()
if memory.percent > 95:
self._handle_error_detection(
pattern_name="memory_exhausted",
error_message=f"Speicherverbrauch kritisch: {memory.percent:.1f}%",
context={"memory_percent": memory.percent}
)
# Disk-Check
disk = psutil.disk_usage('/')
if disk.percent > 98:
self._handle_error_detection(
pattern_name="disk_full",
error_message=f"Festplatte fast voll: {disk.percent:.1f}%",
context={"disk_percent": disk.percent}
)
# Load-Check
if hasattr(psutil, 'getloadavg'):
load_avg = psutil.getloadavg()[0]
if load_avg > 5.0: # Sehr hohe Last
self._handle_error_detection(
pattern_name="system_overload",
error_message=f"System-Last kritisch: {load_avg:.2f}",
context={"load_average": load_avg}
)
except Exception as e:
recovery_logger.debug(f"System-Metrics-Check fehlgeschlagen: {e}")
def _check_service_status(self):
"""Prüft Status wichtiger Services"""
services = ["myp-https.service", "myp-kiosk.service"]
for service in services:
try:
result = subprocess.run(
["sudo", "systemctl", "is-active", service],
capture_output=True, text=True, timeout=10
)
if result.returncode != 0:
self._handle_error_detection(
pattern_name="service_failure",
error_message=f"Service {service} nicht aktiv: {result.stdout.strip()}",
context={"service": service, "status": result.stdout.strip()}
)
except Exception as e:
recovery_logger.debug(f"Service-Check für {service} fehlgeschlagen: {e}")
def _handle_error_detection(self, pattern_name: str, error_message: str, context: Dict[str, Any] = None):
"""Behandelt erkannten Fehler und startet Recovery"""
with self.lock:
if pattern_name not in self.error_patterns:
recovery_logger.warning(f"Unbekanntes Fehlermuster: {pattern_name}")
return
pattern = self.error_patterns[pattern_name]
# Prüfe ob bereits kürzlich aufgetreten
recent_occurrences = self._count_recent_occurrences(pattern_name, pattern.time_window)
# Erstelle Error-Occurrence
occurrence = ErrorOccurrence(
timestamp=datetime.now(),
pattern_name=pattern_name,
error_message=error_message,
severity=pattern.severity,
context=context or {}
)
self.error_history.append(occurrence)
recovery_logger.warning(f"🚨 Fehler erkannt: {pattern_name} - {error_message}")
# Entscheide über Recovery-Aktionen
if recent_occurrences >= pattern.max_occurrences:
# Eskalation
actions = pattern.escalation_actions
recovery_logger.error(f"🔥 Eskalation für {pattern_name}: {recent_occurrences} Vorkommen in {pattern.time_window}s")
else:
# Normale Recovery
actions = pattern.actions
# Führe Recovery-Aktionen aus
if self.config["auto_recovery_enabled"]:
self._execute_recovery_actions(occurrence, actions)
def _count_recent_occurrences(self, pattern_name: str, time_window: int) -> int:
"""Zählt kürzliche Vorkommen eines Fehlermusters"""
cutoff_time = datetime.now() - timedelta(seconds=time_window)
return sum(1 for err in self.error_history
if err.pattern_name == pattern_name and err.timestamp > cutoff_time)
def _execute_recovery_actions(self, occurrence: ErrorOccurrence, actions: List[RecoveryAction]):
"""Führt Recovery-Aktionen aus"""
for action in actions:
try:
recovery_logger.info(f"🔧 Führe Recovery-Aktion aus: {action.value}")
handler = self.recovery_handlers.get(action)
if handler:
success = handler(occurrence)
occurrence.recovery_attempted.append(action)
if success:
occurrence.recovery_successful = True
recovery_logger.info(f"✅ Recovery erfolgreich: {action.value}")
break # Stoppe bei erfolgreicher Recovery
else:
recovery_logger.warning(f"❌ Recovery fehlgeschlagen: {action.value}")
else:
recovery_logger.error(f"Kein Handler für Recovery-Aktion: {action.value}")
except Exception as e:
recovery_logger.error(f"Fehler bei Recovery-Aktion {action.value}: {e}")
def _handle_log_only(self, occurrence: ErrorOccurrence) -> bool:
"""Handler: Nur Logging, keine weitere Aktion"""
recovery_logger.info(f"📝 Log-Only für: {occurrence.error_message}")
return True
def _handle_restart_service(self, occurrence: ErrorOccurrence) -> bool:
"""Handler: Service-Neustart"""
try:
from utils.system_control import get_system_control_manager, SystemOperation
manager = get_system_control_manager()
result = manager.schedule_operation(
SystemOperation.SERVICE_RESTART,
delay_seconds=5,
reason=f"Automatische Recovery für: {occurrence.pattern_name}"
)
return result.get("success", False)
except Exception as e:
recovery_logger.error(f"Service-Neustart fehlgeschlagen: {e}")
return False
def _handle_restart_component(self, occurrence: ErrorOccurrence) -> bool:
"""Handler: Komponenten-Neustart (z.B. Kiosk)"""
try:
from utils.system_control import get_system_control_manager, SystemOperation
manager = get_system_control_manager()
result = manager.schedule_operation(
SystemOperation.KIOSK_RESTART,
delay_seconds=5,
reason=f"Automatische Recovery für: {occurrence.pattern_name}"
)
return result.get("success", False)
except Exception as e:
recovery_logger.error(f"Komponenten-Neustart fehlgeschlagen: {e}")
return False
def _handle_clear_cache(self, occurrence: ErrorOccurrence) -> bool:
"""Handler: Cache leeren"""
try:
# App-Caches leeren
from app import clear_user_cache, clear_printer_status_cache
clear_user_cache()
clear_printer_status_cache()
# System-Cache leeren
if os.name != 'nt':
subprocess.run(["sudo", "sync"], timeout=10)
return True
except Exception as e:
recovery_logger.error(f"Cache-Clearing fehlgeschlagen: {e}")
return False
def _handle_reset_database(self, occurrence: ErrorOccurrence) -> bool:
"""Handler: Datenbank-Reset"""
try:
from utils.database_cleanup import safe_database_cleanup
result = safe_database_cleanup(force_mode_switch=True)
return result.get("success", False)
except Exception as e:
recovery_logger.error(f"Database-Reset fehlgeschlagen: {e}")
return False
def _handle_restart_system(self, occurrence: ErrorOccurrence) -> bool:
"""Handler: System-Neustart"""
try:
from utils.system_control import schedule_system_restart
result = schedule_system_restart(
delay_seconds=60,
reason=f"Automatische Recovery für kritischen Fehler: {occurrence.pattern_name}",
force=True
)
return result.get("success", False)
except Exception as e:
recovery_logger.error(f"System-Neustart fehlgeschlagen: {e}")
return False
def _handle_emergency_stop(self, occurrence: ErrorOccurrence) -> bool:
"""Handler: Notfall-Stopp"""
try:
recovery_logger.critical(f"🚨 NOTFALL-STOPP: {occurrence.error_message}")
# Führe sofortigen Shutdown durch
from utils.shutdown_manager import get_shutdown_manager
shutdown_manager = get_shutdown_manager()
shutdown_manager.force_shutdown(1)
return True
except Exception as e:
recovery_logger.error(f"Notfall-Stopp fehlgeschlagen: {e}")
return False
def _cleanup_old_entries(self):
"""Bereinigt alte Error-History-Einträge"""
with self.lock:
if len(self.error_history) > self.config["max_history_size"]:
self.error_history = self.error_history[-self.config["max_history_size"]:]
def get_error_statistics(self) -> Dict[str, Any]:
"""Gibt Fehler-Statistiken zurück"""
with self.lock:
total_errors = len(self.error_history)
# Fehler nach Schweregrad
by_severity = {}
for severity in ErrorSeverity:
by_severity[severity.value] = sum(1 for err in self.error_history
if err.severity == severity)
# Fehler nach Pattern
by_pattern = {}
for pattern_name in self.error_patterns.keys():
by_pattern[pattern_name] = sum(1 for err in self.error_history
if err.pattern_name == pattern_name)
# Letzten 24h
last_24h = datetime.now() - timedelta(hours=24)
recent_errors = sum(1 for err in self.error_history
if err.timestamp > last_24h)
# Recovery-Erfolgsrate
attempted_recoveries = sum(1 for err in self.error_history
if err.recovery_attempted)
successful_recoveries = sum(1 for err in self.error_history
if err.recovery_successful)
success_rate = (successful_recoveries / attempted_recoveries * 100) if attempted_recoveries > 0 else 0
return {
"total_errors": total_errors,
"errors_last_24h": recent_errors,
"by_severity": by_severity,
"by_pattern": by_pattern,
"recovery_success_rate": round(success_rate, 1),
"monitoring_active": self.is_active,
"auto_recovery_enabled": self.config["auto_recovery_enabled"]
}
def get_recent_errors(self, limit: int = 50) -> List[Dict[str, Any]]:
"""Gibt kürzliche Fehler zurück"""
with self.lock:
recent = self.error_history[-limit:] if limit else self.error_history
return [{
"timestamp": err.timestamp.isoformat(),
"pattern_name": err.pattern_name,
"error_message": err.error_message,
"severity": err.severity.value,
"context": err.context,
"recovery_attempted": [action.value for action in err.recovery_attempted],
"recovery_successful": err.recovery_successful
} for err in recent]
# Globaler Error-Recovery-Manager
_error_recovery_manager: Optional[ErrorRecoveryManager] = None
_recovery_lock = threading.Lock()
def get_error_recovery_manager() -> ErrorRecoveryManager:
"""
Singleton-Pattern für globalen Error-Recovery-Manager.
Returns:
ErrorRecoveryManager: Globaler Error-Recovery-Manager
"""
global _error_recovery_manager
with _recovery_lock:
if _error_recovery_manager is None:
_error_recovery_manager = ErrorRecoveryManager()
return _error_recovery_manager
def start_error_monitoring():
"""Startet Error-Monitoring"""
manager = get_error_recovery_manager()
manager.start_monitoring()
def stop_error_monitoring():
"""Stoppt Error-Monitoring"""
manager = get_error_recovery_manager()
manager.stop_monitoring()
def force_error_check(log_message: str = None):
"""Erzwingt manuelle Fehlerprüfung"""
if log_message:
manager = get_error_recovery_manager()
manager._analyze_log_line(log_message, "manual_check")

View File

@ -0,0 +1,658 @@
#!/usr/bin/env python3
"""
Robuste System-Control-Funktionen für wartungsfreien Produktionsbetrieb
Bietet sichere Restart-, Shutdown- und Kiosk-Verwaltungsfunktionen
"""
import os
import sys
import subprocess
import time
import signal
import psutil
import threading
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any
from pathlib import Path
import logging
import json
from contextlib import contextmanager
from enum import Enum
# Logging-Setup
try:
from utils.logging_config import get_logger
system_logger = get_logger("system_control")
except ImportError:
logging.basicConfig(level=logging.INFO)
system_logger = logging.getLogger("system_control")
class SystemOperation(Enum):
"""Verfügbare System-Operationen"""
RESTART = "restart"
SHUTDOWN = "shutdown"
KIOSK_RESTART = "kiosk_restart"
KIOSK_ENABLE = "kiosk_enable"
KIOSK_DISABLE = "kiosk_disable"
SERVICE_RESTART = "service_restart"
EMERGENCY_STOP = "emergency_stop"
class SystemControlManager:
"""
Zentraler Manager für alle System-Control-Operationen.
Bietet sichere und robuste Funktionen für wartungsfreien Betrieb.
"""
def __init__(self):
self.is_windows = os.name == 'nt'
self.pending_operations: Dict[str, Dict] = {}
self.operation_history: List[Dict] = []
self.lock = threading.Lock()
# Konfiguration
self.config = {
"restart_delay": 60, # Sekunden
"shutdown_delay": 30, # Sekunden
"kiosk_restart_delay": 10, # Sekunden
"max_operation_history": 100,
"safety_checks": True,
"require_confirmation": True
}
# Service-Namen für verschiedene Plattformen
self.services = {
"https": "myp-https.service",
"kiosk": "myp-kiosk.service",
"watchdog": "kiosk-watchdog.service"
}
system_logger.info("🔧 System-Control-Manager initialisiert")
def is_safe_to_operate(self) -> Tuple[bool, str]:
"""
Prüft ob System-Operationen sicher ausgeführt werden können.
Returns:
Tuple[bool, str]: (is_safe, reason)
"""
try:
# Prüfe Systemlast
load_avg = psutil.getloadavg()[0] if hasattr(psutil, 'getloadavg') else 0
if load_avg > 2.0:
return False, f"Hohe Systemlast: {load_avg:.2f}"
# Prüfe verfügbaren Speicher
memory = psutil.virtual_memory()
if memory.percent > 90:
return False, f"Wenig verfügbarer Speicher: {memory.percent:.1f}% belegt"
# Prüfe aktive Drucker-Jobs
try:
from models import get_db_session, Job
db_session = get_db_session()
active_jobs = db_session.query(Job).filter(
Job.status.in_(["printing", "queued", "preparing"])
).count()
db_session.close()
if active_jobs > 0:
return False, f"Aktive Druckjobs: {active_jobs}"
except Exception as e:
system_logger.warning(f"Job-Prüfung fehlgeschlagen: {e}")
# Prüfe kritische Prozesse
critical_processes = ["chromium", "firefox", "python"]
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent']):
try:
if any(crit in proc.info['name'].lower() for crit in critical_processes):
if proc.info['cpu_percent'] > 80:
return False, f"Kritischer Prozess unter hoher Last: {proc.info['name']}"
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
return True, "System ist sicher für Operationen"
except Exception as e:
system_logger.error(f"Fehler bei Sicherheitsprüfung: {e}")
return False, f"Sicherheitsprüfung fehlgeschlagen: {e}"
def schedule_operation(self,
operation: SystemOperation,
delay_seconds: int = None,
user_id: str = None,
reason: str = None,
force: bool = False) -> Dict[str, Any]:
"""
Plant eine System-Operation mit Verzögerung.
Args:
operation: Art der Operation
delay_seconds: Verzögerung in Sekunden (None = Standard)
user_id: ID des anfragenden Benutzers
reason: Grund für die Operation
force: Sicherheitsprüfungen überspringen
Returns:
Dict mit Operation-Details
"""
with self.lock:
# Sicherheitsprüfung (außer bei Force)
if not force and self.config["safety_checks"]:
is_safe, safety_reason = self.is_safe_to_operate()
if not is_safe:
return {
"success": False,
"error": f"Operation abgelehnt: {safety_reason}",
"safety_check": False
}
# Standard-Verzögerung setzen
if delay_seconds is None:
delay_seconds = {
SystemOperation.RESTART: self.config["restart_delay"],
SystemOperation.SHUTDOWN: self.config["shutdown_delay"],
SystemOperation.KIOSK_RESTART: self.config["kiosk_restart_delay"],
SystemOperation.KIOSK_ENABLE: 5,
SystemOperation.KIOSK_DISABLE: 5,
SystemOperation.SERVICE_RESTART: 10,
SystemOperation.EMERGENCY_STOP: 0
}.get(operation, 30)
# Operations-ID generieren
operation_id = f"{operation.value}_{int(time.time())}"
scheduled_time = datetime.now() + timedelta(seconds=delay_seconds)
# Operation speichern
operation_data = {
"id": operation_id,
"operation": operation.value,
"scheduled_time": scheduled_time,
"delay_seconds": delay_seconds,
"user_id": user_id,
"reason": reason or "Keine Begründung angegeben",
"force": force,
"created_at": datetime.now(),
"status": "scheduled"
}
self.pending_operations[operation_id] = operation_data
# Operation in separatem Thread ausführen
thread = threading.Thread(
target=self._execute_delayed_operation,
args=(operation_id,),
daemon=True
)
thread.start()
system_logger.info(f"🕐 Operation geplant: {operation.value} in {delay_seconds}s")
return {
"success": True,
"operation_id": operation_id,
"scheduled_time": scheduled_time.isoformat(),
"delay_seconds": delay_seconds,
"message": f"Operation '{operation.value}' geplant für {scheduled_time.strftime('%H:%M:%S')}"
}
def _execute_delayed_operation(self, operation_id: str):
"""
Führt geplante Operation nach Verzögerung aus.
Args:
operation_id: ID der auszuführenden Operation
"""
try:
operation_data = self.pending_operations.get(operation_id)
if not operation_data:
return
# Warten bis zur geplanten Zeit
scheduled_time = operation_data["scheduled_time"]
wait_time = (scheduled_time - datetime.now()).total_seconds()
if wait_time > 0:
time.sleep(wait_time)
# Status aktualisieren
operation_data["status"] = "executing"
operation_data["executed_at"] = datetime.now()
# Operation ausführen
operation = SystemOperation(operation_data["operation"])
result = self._execute_operation(operation, operation_data)
# Ergebnis speichern
operation_data["result"] = result
operation_data["status"] = "completed" if result.get("success") else "failed"
operation_data["completed_at"] = datetime.now()
# In Historie verschieben
self._move_to_history(operation_id)
except Exception as e:
system_logger.error(f"Fehler bei verzögerter Operation {operation_id}: {e}")
if operation_id in self.pending_operations:
self.pending_operations[operation_id]["status"] = "error"
self.pending_operations[operation_id]["error"] = str(e)
self._move_to_history(operation_id)
def _execute_operation(self, operation: SystemOperation, operation_data: Dict) -> Dict[str, Any]:
"""
Führt die eigentliche System-Operation aus.
Args:
operation: Art der Operation
operation_data: Operation-Daten
Returns:
Dict mit Ergebnis
"""
try:
system_logger.info(f"▶️ Führe Operation aus: {operation.value}")
if operation == SystemOperation.RESTART:
return self._restart_system(operation_data)
elif operation == SystemOperation.SHUTDOWN:
return self._shutdown_system(operation_data)
elif operation == SystemOperation.KIOSK_RESTART:
return self._restart_kiosk(operation_data)
elif operation == SystemOperation.KIOSK_ENABLE:
return self._enable_kiosk(operation_data)
elif operation == SystemOperation.KIOSK_DISABLE:
return self._disable_kiosk(operation_data)
elif operation == SystemOperation.SERVICE_RESTART:
return self._restart_services(operation_data)
elif operation == SystemOperation.EMERGENCY_STOP:
return self._emergency_stop(operation_data)
else:
return {"success": False, "error": f"Unbekannte Operation: {operation.value}"}
except Exception as e:
system_logger.error(f"Fehler bei Operation {operation.value}: {e}")
return {"success": False, "error": str(e)}
def _restart_system(self, operation_data: Dict) -> Dict[str, Any]:
"""Startet das System neu."""
try:
system_logger.warning("🔄 System-Neustart wird ausgeführt...")
# Cleanup vor Neustart
self._cleanup_before_restart()
# System-Neustart je nach Plattform
if self.is_windows:
subprocess.run(["shutdown", "/r", "/t", "0"], check=True)
else:
subprocess.run(["sudo", "systemctl", "reboot"], check=True)
return {"success": True, "message": "System-Neustart initiiert"}
except subprocess.CalledProcessError as e:
return {"success": False, "error": f"Neustart fehlgeschlagen: {e}"}
except Exception as e:
return {"success": False, "error": f"Unerwarteter Fehler: {e}"}
def _shutdown_system(self, operation_data: Dict) -> Dict[str, Any]:
"""Fährt das System herunter."""
try:
system_logger.warning("🛑 System-Shutdown wird ausgeführt...")
# Cleanup vor Shutdown
self._cleanup_before_restart()
# System-Shutdown je nach Plattform
if self.is_windows:
subprocess.run(["shutdown", "/s", "/t", "0"], check=True)
else:
subprocess.run(["sudo", "systemctl", "poweroff"], check=True)
return {"success": True, "message": "System-Shutdown initiiert"}
except subprocess.CalledProcessError as e:
return {"success": False, "error": f"Shutdown fehlgeschlagen: {e}"}
except Exception as e:
return {"success": False, "error": f"Unerwarteter Fehler: {e}"}
def _restart_kiosk(self, operation_data: Dict) -> Dict[str, Any]:
"""Startet nur den Kiosk-Modus neu."""
try:
system_logger.info("🖥️ Kiosk-Neustart wird ausgeführt...")
success_count = 0
errors = []
# Kiosk-Service neustarten
try:
subprocess.run(["sudo", "systemctl", "restart", self.services["kiosk"]],
check=True, timeout=30)
success_count += 1
system_logger.info("✅ Kiosk-Service neugestartet")
except Exception as e:
errors.append(f"Kiosk-Service: {e}")
# Watchdog-Service neustarten (falls vorhanden)
try:
subprocess.run(["sudo", "systemctl", "restart", self.services["watchdog"]],
check=True, timeout=30)
success_count += 1
system_logger.info("✅ Watchdog-Service neugestartet")
except Exception as e:
errors.append(f"Watchdog-Service: {e}")
# X11-Session neustarten
try:
subprocess.run(["sudo", "systemctl", "restart", "getty@tty1.service"],
check=True, timeout=30)
success_count += 1
system_logger.info("✅ X11-Session neugestartet")
except Exception as e:
errors.append(f"X11-Session: {e}")
if success_count > 0:
return {
"success": True,
"message": f"Kiosk neugestartet ({success_count} Services)",
"errors": errors if errors else None
}
else:
return {
"success": False,
"error": "Alle Kiosk-Neustarts fehlgeschlagen",
"details": errors
}
except Exception as e:
return {"success": False, "error": f"Kiosk-Neustart fehlgeschlagen: {e}"}
def _enable_kiosk(self, operation_data: Dict) -> Dict[str, Any]:
"""Aktiviert den Kiosk-Modus."""
try:
system_logger.info("🖥️ Kiosk-Modus wird aktiviert...")
# Kiosk-Service aktivieren und starten
subprocess.run(["sudo", "systemctl", "enable", self.services["kiosk"]],
check=True, timeout=30)
subprocess.run(["sudo", "systemctl", "start", self.services["kiosk"]],
check=True, timeout=30)
# Watchdog aktivieren
try:
subprocess.run(["sudo", "systemctl", "enable", self.services["watchdog"]],
check=True, timeout=30)
subprocess.run(["sudo", "systemctl", "start", self.services["watchdog"]],
check=True, timeout=30)
except Exception as e:
system_logger.warning(f"Watchdog-Aktivierung fehlgeschlagen: {e}")
return {"success": True, "message": "Kiosk-Modus aktiviert"}
except subprocess.CalledProcessError as e:
return {"success": False, "error": f"Kiosk-Aktivierung fehlgeschlagen: {e}"}
except Exception as e:
return {"success": False, "error": f"Unerwarteter Fehler: {e}"}
def _disable_kiosk(self, operation_data: Dict) -> Dict[str, Any]:
"""Deaktiviert den Kiosk-Modus."""
try:
system_logger.info("🖥️ Kiosk-Modus wird deaktiviert...")
# Kiosk-Service stoppen und deaktivieren
subprocess.run(["sudo", "systemctl", "stop", self.services["kiosk"]],
check=True, timeout=30)
subprocess.run(["sudo", "systemctl", "disable", self.services["kiosk"]],
check=True, timeout=30)
# Watchdog stoppen
try:
subprocess.run(["sudo", "systemctl", "stop", self.services["watchdog"]],
check=True, timeout=30)
subprocess.run(["sudo", "systemctl", "disable", self.services["watchdog"]],
check=True, timeout=30)
except Exception as e:
system_logger.warning(f"Watchdog-Deaktivierung fehlgeschlagen: {e}")
return {"success": True, "message": "Kiosk-Modus deaktiviert"}
except subprocess.CalledProcessError as e:
return {"success": False, "error": f"Kiosk-Deaktivierung fehlgeschlagen: {e}"}
except Exception as e:
return {"success": False, "error": f"Unerwarteter Fehler: {e}"}
def _restart_services(self, operation_data: Dict) -> Dict[str, Any]:
"""Startet wichtige Services neu."""
try:
system_logger.info("🔄 Services werden neugestartet...")
success_count = 0
errors = []
# HTTPS-Service neustarten
try:
subprocess.run(["sudo", "systemctl", "restart", self.services["https"]],
check=True, timeout=60)
success_count += 1
system_logger.info("✅ HTTPS-Service neugestartet")
except Exception as e:
errors.append(f"HTTPS-Service: {e}")
# NetworkManager neustarten (falls nötig)
try:
subprocess.run(["sudo", "systemctl", "restart", "NetworkManager"],
check=True, timeout=30)
success_count += 1
system_logger.info("✅ NetworkManager neugestartet")
except Exception as e:
errors.append(f"NetworkManager: {e}")
if success_count > 0:
return {
"success": True,
"message": f"Services neugestartet ({success_count})",
"errors": errors if errors else None
}
else:
return {
"success": False,
"error": "Alle Service-Neustarts fehlgeschlagen",
"details": errors
}
except Exception as e:
return {"success": False, "error": f"Service-Neustart fehlgeschlagen: {e}"}
def _emergency_stop(self, operation_data: Dict) -> Dict[str, Any]:
"""Notfall-Stopp aller Services."""
try:
system_logger.warning("🚨 Notfall-Stopp wird ausgeführt...")
# Flask-App stoppen
try:
os.kill(os.getpid(), signal.SIGTERM)
except Exception as e:
system_logger.error(f"Flask-Stopp fehlgeschlagen: {e}")
return {"success": True, "message": "Notfall-Stopp initiiert"}
except Exception as e:
return {"success": False, "error": f"Notfall-Stopp fehlgeschlagen: {e}"}
def _cleanup_before_restart(self):
"""Führt Cleanup-Operationen vor Neustart/Shutdown aus."""
try:
system_logger.info("🧹 Cleanup vor Neustart/Shutdown...")
# Shutdown-Manager verwenden falls verfügbar
try:
from utils.shutdown_manager import get_shutdown_manager
shutdown_manager = get_shutdown_manager()
shutdown_manager.shutdown(exit_code=0)
except ImportError:
system_logger.warning("Shutdown-Manager nicht verfügbar")
# Datenbank-Cleanup
try:
from utils.database_cleanup import safe_database_cleanup
safe_database_cleanup(force_mode_switch=False)
except ImportError:
system_logger.warning("Database-Cleanup nicht verfügbar")
# Cache leeren
self._clear_caches()
except Exception as e:
system_logger.error(f"Cleanup fehlgeschlagen: {e}")
def _clear_caches(self):
"""Leert alle Caches."""
try:
# User-Cache leeren
from app import clear_user_cache, clear_printer_status_cache
clear_user_cache()
clear_printer_status_cache()
# System-Cache leeren
if not self.is_windows:
subprocess.run(["sudo", "sync"], timeout=10)
subprocess.run(["sudo", "echo", "3", ">", "/proc/sys/vm/drop_caches"],
shell=True, timeout=10)
except Exception as e:
system_logger.warning(f"Cache-Clearing fehlgeschlagen: {e}")
def _move_to_history(self, operation_id: str):
"""Verschiebt abgeschlossene Operation in Historie."""
with self.lock:
if operation_id in self.pending_operations:
operation_data = self.pending_operations.pop(operation_id)
self.operation_history.append(operation_data)
# Historie begrenzen
if len(self.operation_history) > self.config["max_operation_history"]:
self.operation_history = self.operation_history[-self.config["max_operation_history"]:]
def cancel_operation(self, operation_id: str) -> Dict[str, Any]:
"""
Bricht geplante Operation ab.
Args:
operation_id: ID der abzubrechenden Operation
Returns:
Dict mit Ergebnis
"""
with self.lock:
if operation_id not in self.pending_operations:
return {"success": False, "error": "Operation nicht gefunden"}
operation_data = self.pending_operations[operation_id]
if operation_data["status"] == "executing":
return {"success": False, "error": "Operation bereits in Ausführung"}
operation_data["status"] = "cancelled"
operation_data["cancelled_at"] = datetime.now()
self._move_to_history(operation_id)
system_logger.info(f"❌ Operation abgebrochen: {operation_id}")
return {"success": True, "message": "Operation erfolgreich abgebrochen"}
def get_pending_operations(self) -> List[Dict]:
"""Gibt alle geplanten Operationen zurück."""
with self.lock:
return list(self.pending_operations.values())
def get_operation_history(self, limit: int = 20) -> List[Dict]:
"""Gibt Operation-Historie zurück."""
with self.lock:
return self.operation_history[-limit:] if limit else self.operation_history
def get_system_status(self) -> Dict[str, Any]:
"""Gibt aktuellen System-Status zurück."""
try:
# Service-Status prüfen
service_status = {}
for name, service in self.services.items():
try:
result = subprocess.run(
["sudo", "systemctl", "is-active", service],
capture_output=True, text=True, timeout=10
)
service_status[name] = result.stdout.strip()
except Exception as e:
service_status[name] = f"error: {e}"
# System-Metriken
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
# Aktive Operations
pending_ops = len(self.pending_operations)
return {
"success": True,
"timestamp": datetime.now().isoformat(),
"services": service_status,
"system_metrics": {
"memory_percent": memory.percent,
"memory_available_gb": memory.available / (1024**3),
"disk_percent": disk.percent,
"disk_free_gb": disk.free / (1024**3),
"load_average": psutil.getloadavg()[0] if hasattr(psutil, 'getloadavg') else 0
},
"operations": {
"pending": pending_ops,
"history_count": len(self.operation_history)
},
"is_safe": self.is_safe_to_operate()[0]
}
except Exception as e:
return {"success": False, "error": str(e)}
# Globaler System-Control-Manager
_system_control_manager: Optional[SystemControlManager] = None
_control_lock = threading.Lock()
def get_system_control_manager() -> SystemControlManager:
"""
Singleton-Pattern für globalen System-Control-Manager.
Returns:
SystemControlManager: Globaler System-Control-Manager
"""
global _system_control_manager
with _control_lock:
if _system_control_manager is None:
_system_control_manager = SystemControlManager()
return _system_control_manager
# Convenience-Funktionen
def schedule_system_restart(delay_seconds: int = 60, user_id: str = None, reason: str = None, force: bool = False) -> Dict[str, Any]:
"""Plant System-Neustart."""
manager = get_system_control_manager()
return manager.schedule_operation(SystemOperation.RESTART, delay_seconds, user_id, reason, force)
def schedule_system_shutdown(delay_seconds: int = 30, user_id: str = None, reason: str = None, force: bool = False) -> Dict[str, Any]:
"""Plant System-Shutdown."""
manager = get_system_control_manager()
return manager.schedule_operation(SystemOperation.SHUTDOWN, delay_seconds, user_id, reason, force)
def restart_kiosk(delay_seconds: int = 10, user_id: str = None, reason: str = None) -> Dict[str, Any]:
"""Plant Kiosk-Neustart."""
manager = get_system_control_manager()
return manager.schedule_operation(SystemOperation.KIOSK_RESTART, delay_seconds, user_id, reason)
def get_system_status() -> Dict[str, Any]:
"""Gibt System-Status zurück."""
manager = get_system_control_manager()
return manager.get_system_status()