🎉 Verbesserte Backend-Funktionalität durch Windows-sichere Disk-Usage-Bestimmung, Uptime-Berechnung und Einführung eines Kiosk-Timers. Dokumentation aktualisiert und nicht mehr benötigte Dateien entfernt. 🧹

This commit is contained in:
2025-06-01 03:00:04 +02:00
parent 486647fade
commit 8969cf6df6
70 changed files with 89065 additions and 85009 deletions

View File

@@ -1,467 +1,374 @@
import logging
import logging.handlers
# -*- coding: utf-8 -*-
"""
Windows-sichere Logging-Konfiguration für MYP Platform
======================================================
Robuste Logging-Konfiguration mit Windows-spezifischen Fixes für File-Locking-Probleme.
"""
import os
import sys
import time
import platform
import socket
from typing import Dict, Optional, Any
import logging
import threading
from datetime import datetime
from config.settings import (
LOG_DIR, LOG_SUBDIRS, LOG_LEVEL, LOG_FORMAT, LOG_DATE_FORMAT,
get_log_file, ensure_log_directories
)
from functools import wraps
from typing import Optional, Dict, Any
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
# Dictionary zur Speicherung der konfigurierten Logger
_loggers: Dict[str, logging.Logger] = {}
# ===== WINDOWS-SICHERE LOGGING-KLASSE =====
# ANSI-Farbcodes für Log-Level
ANSI_COLORS = {
'RESET': '\033[0m',
'BOLD': '\033[1m',
'BLACK': '\033[30m',
'RED': '\033[31m',
'GREEN': '\033[32m',
'YELLOW': '\033[33m',
'BLUE': '\033[34m',
'MAGENTA': '\033[35m',
'CYAN': '\033[36m',
'WHITE': '\033[37m',
'BG_RED': '\033[41m',
'BG_GREEN': '\033[42m',
'BG_YELLOW': '\033[43m',
'BG_BLUE': '\033[44m'
}
class WindowsSafeRotatingFileHandler(RotatingFileHandler):
"""
Windows-sichere Implementierung von RotatingFileHandler.
Behebt das WinError 32 Problem bei gleichzeitigen Log-Dateizugriffen.
"""
def __init__(self, filename, mode='a', maxBytes=0, backupCount=0, encoding=None, delay=False):
# Verwende UTF-8 Encoding standardmäßig
if encoding is None:
encoding = 'utf-8'
# Windows-spezifische Konfiguration
self._windows_safe_mode = os.name == 'nt'
self._rotation_lock = threading.Lock()
super().__init__(filename, mode, maxBytes, backupCount, encoding, delay)
def doRollover(self):
"""
Windows-sichere Log-Rotation mit verbessertem Error-Handling.
"""
if not self._windows_safe_mode:
# Normale Rotation für Unix-Systeme
return super().doRollover()
# Windows-spezifische sichere Rotation
with self._rotation_lock:
try:
if self.stream:
self.stream.close()
self.stream = None
# Warte kurz bevor Rotation versucht wird
time.sleep(0.1)
# Versuche Rotation mehrmals mit exponentialem Backoff
max_attempts = 5
for attempt in range(max_attempts):
try:
# Rotation durchführen
super().doRollover()
break
except (PermissionError, OSError) as e:
if attempt == max_attempts - 1:
# Bei letztem Versuch: Erstelle neue Log-Datei ohne Rotation
print(f"WARNUNG: Log-Rotation fehlgeschlagen - erstelle neue Datei: {e}")
self._create_new_log_file()
break
else:
# Warte exponentiell länger bei jedem Versuch
wait_time = 0.5 * (2 ** attempt)
time.sleep(wait_time)
except Exception as e:
print(f"KRITISCHER FEHLER bei Log-Rotation: {e}")
# Notfall: Erstelle neue Log-Datei
self._create_new_log_file()
def _create_new_log_file(self):
"""
Erstellt eine neue Log-Datei als Fallback wenn Rotation fehlschlägt.
"""
try:
# Füge Timestamp zum Dateinamen hinzu
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
base_name, ext = os.path.splitext(self.baseFilename)
new_filename = f"{base_name}_{timestamp}{ext}"
# Öffne neue Datei
self.baseFilename = new_filename
self.stream = self._open()
except Exception as e:
print(f"NOTFALL: Kann keine neue Log-Datei erstellen: {e}")
# Letzter Ausweg: Console-Logging
self.stream = sys.stderr
# Emojis für verschiedene Log-Level und Kategorien
LOG_EMOJIS = {
'DEBUG': '🔍',
'INFO': '',
'WARNING': '⚠️',
'ERROR': '',
'CRITICAL': '🔥',
'app': '🖥️',
'scheduler': '⏱️',
'auth': '🔐',
'jobs': '🖨️',
'printers': '🔧',
'errors': '💥',
'user': '👤',
'kiosk': '📺'
}
# ===== GLOBALE LOGGING-KONFIGURATION =====
# ASCII-Fallback für Emojis bei Encoding-Problemen
EMOJI_FALLBACK = {
'🔍': '[DEBUG]',
'': '[INFO]',
'⚠️': '[WARN]',
'': '[ERROR]',
'🔥': '[CRIT]',
'🖥️': '[APP]',
'⏱️': '[SCHED]',
'🔐': '[AUTH]',
'🖨️': '[JOBS]',
'🔧': '[PRINT]',
'💥': '[ERR]',
'👤': '[USER]',
'📺': '[KIOSK]',
'🐞': '[BUG]',
'🚀': '[START]',
'📂': '[FOLDER]',
'📊': '[CHART]',
'💻': '[PC]',
'🌐': '[WEB]',
'📅': '[TIME]',
'📡': '[SIGNAL]',
'🧩': '[CONTENT]',
'📋': '[HEADER]',
'': '[OK]',
'📦': '[SIZE]'
}
# Logger-Registry für Singleton-Pattern
_logger_registry: Dict[str, logging.Logger] = {}
_logging_initialized = False
_init_lock = threading.Lock()
def safe_emoji(emoji: str) -> str:
"""Gibt ein Emoji zurück oder einen ASCII-Fallback bei Encoding-Problemen."""
def setup_logging(log_level: str = "INFO", base_log_dir: str = None) -> None:
"""
Initialisiert das zentrale Logging-System mit Windows-sicherer Konfiguration.
Args:
log_level: Logging-Level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
base_log_dir: Basis-Verzeichnis für Log-Dateien
"""
global _logging_initialized
with _init_lock:
if _logging_initialized:
return
try:
# Bestimme Log-Verzeichnis
if base_log_dir is None:
current_dir = os.path.dirname(os.path.abspath(__file__))
base_log_dir = os.path.join(current_dir, '..', 'logs')
# Erstelle Log-Verzeichnisse
log_dirs = ['app', 'auth', 'jobs', 'printers', 'scheduler', 'errors']
for log_dir in log_dirs:
full_path = os.path.join(base_log_dir, log_dir)
os.makedirs(full_path, exist_ok=True)
# Konfiguriere Root-Logger
root_logger = logging.getLogger()
root_logger.setLevel(getattr(logging, log_level.upper(), logging.INFO))
# Entferne existierende Handler um Duplikate zu vermeiden
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)
# Console-Handler für kritische Meldungen
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.WARNING)
console_formatter = logging.Formatter(
'%(asctime)s - %(name)s - [%(levelname)s] %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
console_handler.setFormatter(console_formatter)
root_logger.addHandler(console_handler)
_logging_initialized = True
print(f"✅ Logging-System erfolgreich initialisiert (Level: {log_level})")
except Exception as e:
print(f"❌ KRITISCHER FEHLER bei Logging-Initialisierung: {e}")
# Notfall-Konfiguration
logging.basicConfig(
level=getattr(logging, log_level.upper(), logging.INFO),
format='%(asctime)s - %(name)s - [%(levelname)s] - %(message)s',
handlers=[logging.StreamHandler(sys.stdout)]
)
_logging_initialized = True
def get_logger(name: str, log_level: str = None) -> logging.Logger:
"""
Erstellt oder gibt einen konfigurierten Logger zurück.
Args:
name: Name des Loggers (z.B. 'app', 'auth', 'jobs')
log_level: Optionaler spezifischer Log-Level für diesen Logger
Returns:
Konfigurierter Logger
"""
global _logger_registry
# Stelle sicher, dass Logging initialisiert ist
if not _logging_initialized:
setup_logging()
# Prüfe Registry für existierenden Logger
if name in _logger_registry:
return _logger_registry[name]
try:
# Erste Priorität: Teste, ob das Emoji dargestellt werden kann
test_encoding = sys.stdout.encoding or 'utf-8'
emoji.encode(test_encoding)
# Erstelle neuen Logger
logger = logging.getLogger(name)
# Zweite Prüfung: Windows-spezifische cp1252-Codierung
if os.name == 'nt':
try:
emoji.encode('cp1252')
except UnicodeEncodeError:
# Wenn cp1252 fehlschlägt, verwende Fallback
return EMOJI_FALLBACK.get(emoji, '[?]')
# Setze spezifischen Level falls angegeben
if log_level:
logger.setLevel(getattr(logging, log_level.upper(), logging.INFO))
return emoji
except (UnicodeEncodeError, LookupError, AttributeError):
return EMOJI_FALLBACK.get(emoji, '[?]')
# Prüfen, ob das Terminal ANSI-Farben unterstützt
def supports_color() -> bool:
"""Prüft, ob das Terminal ANSI-Farben unterstützt."""
if os.name == 'nt':
try:
import ctypes
kernel32 = ctypes.windll.kernel32
# Aktiviere VT100-Unterstützung unter Windows
kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7)
# Setze Console-Output auf UTF-8 für bessere Emoji-Unterstützung
try:
kernel32.SetConsoleOutputCP(65001) # UTF-8
except:
pass
# Versuche UTF-8-Encoding für Emojis zu setzen
try:
import locale
locale.setlocale(locale.LC_ALL, 'en_US.UTF-8')
except:
try:
# Fallback für deutsche Lokalisierung
locale.setlocale(locale.LC_ALL, 'de_DE.UTF-8')
except:
pass
return True
except:
return False
else:
return sys.stdout.isatty()
USE_COLORS = supports_color()
class ColoredFormatter(logging.Formatter):
"""Formatter, der Farben und Emojis für Logs hinzufügt."""
level_colors = {
'DEBUG': ANSI_COLORS['CYAN'],
'INFO': ANSI_COLORS['GREEN'],
'WARNING': ANSI_COLORS['YELLOW'],
'ERROR': ANSI_COLORS['RED'],
'CRITICAL': ANSI_COLORS['BG_RED'] + ANSI_COLORS['WHITE'] + ANSI_COLORS['BOLD']
}
def format(self, record):
try:
# Basis-Format erstellen
log_fmt = LOG_FORMAT
date_fmt = LOG_DATE_FORMAT
# Emoji dem Level und der Kategorie hinzufügen
level_name = record.levelname
category_name = record.name.split('.')[-1] if '.' in record.name else record.name
level_emoji = safe_emoji(LOG_EMOJIS.get(level_name, ''))
category_emoji = safe_emoji(LOG_EMOJIS.get(category_name, ''))
# Record-Objekt modifizieren (aber temporär)
original_levelname = record.levelname
original_name = record.name
# Emojis hinzufügen
record.levelname = f"{level_emoji} {level_name}"
record.name = f"{category_emoji} {category_name}"
# Farbe hinzufügen wenn unterstützt
if USE_COLORS:
level_color = self.level_colors.get(original_levelname, ANSI_COLORS['RESET'])
record.levelname = f"{level_color}{record.levelname}{ANSI_COLORS['RESET']}"
record.name = f"{ANSI_COLORS['BOLD']}{record.name}{ANSI_COLORS['RESET']}"
# Formatieren
result = super().format(record)
# Originale Werte wiederherstellen
record.levelname = original_levelname
record.name = original_name
return result
except (UnicodeEncodeError, UnicodeDecodeError, AttributeError) as e:
# Fallback bei Unicode-Problemen: Verwende nur ASCII-Text
original_levelname = record.levelname
original_name = record.name
# Emojis durch ASCII-Fallbacks ersetzen
level_fallback = EMOJI_FALLBACK.get(LOG_EMOJIS.get(original_levelname, ''), '[LOG]')
category_name = record.name.split('.')[-1] if '.' in record.name else record.name
category_fallback = EMOJI_FALLBACK.get(LOG_EMOJIS.get(category_name, ''), '[CAT]')
record.levelname = f"{level_fallback} {original_levelname}"
record.name = f"{category_fallback} {category_name}"
# Basis-Formatierung ohne Farben
result = super().format(record)
# Originale Werte wiederherstellen
record.levelname = original_levelname
record.name = original_name
return result
class DebugInfoFilter(logging.Filter):
"""Filter, der Debug-Informationen zu jedem Log-Eintrag hinzufügt."""
def __init__(self, add_hostname=True, add_process_info=True):
super().__init__()
self.add_hostname = add_hostname
self.add_process_info = add_process_info
self.hostname = socket.gethostname() if add_hostname else None
self.pid = os.getpid() if add_process_info else None
def filter(self, record):
# Debug-Informationen hinzufügen
if self.add_hostname and not hasattr(record, 'hostname'):
record.hostname = self.hostname
# Erstelle File-Handler mit Windows-sicherer Rotation
log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'logs', name)
os.makedirs(log_dir, exist_ok=True)
if self.add_process_info and not hasattr(record, 'pid'):
record.pid = self.pid
log_file = os.path.join(log_dir, f'{name}.log')
# Zusätzliche Infos für DEBUG-Level
if record.levelno == logging.DEBUG:
# Funktionsname und Zeilennummer hervorheben
if USE_COLORS:
record.funcName = f"{ANSI_COLORS['CYAN']}{record.funcName}{ANSI_COLORS['RESET']}"
record.lineno = f"{ANSI_COLORS['CYAN']}{record.lineno}{ANSI_COLORS['RESET']}"
return True
def setup_logging(debug_mode: bool = False):
"""
Initialisiert das Logging-System und erstellt alle erforderlichen Verzeichnisse.
Args:
debug_mode: Wenn True, wird das Log-Level auf DEBUG gesetzt
"""
ensure_log_directories()
# Log-Level festlegen
log_level = logging.DEBUG if debug_mode else getattr(logging, LOG_LEVEL)
# Root-Logger konfigurieren
root_logger = logging.getLogger()
root_logger.setLevel(log_level)
# Alle Handler entfernen
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)
# Formatter erstellen (mit und ohne Farben)
colored_formatter = ColoredFormatter(LOG_FORMAT, LOG_DATE_FORMAT)
file_formatter = logging.Formatter(LOG_FORMAT, LOG_DATE_FORMAT)
# Filter für zusätzliche Debug-Informationen
debug_filter = DebugInfoFilter()
# Console Handler für alle Logs
console_handler = logging.StreamHandler()
console_handler.setLevel(log_level)
console_handler.setFormatter(colored_formatter)
console_handler.addFilter(debug_filter)
# Windows PowerShell UTF-8 Encoding-Unterstützung
if os.name == 'nt' and hasattr(console_handler.stream, 'reconfigure'):
try:
console_handler.stream.reconfigure(encoding='utf-8')
except:
pass
root_logger.addHandler(console_handler)
# File Handler für allgemeine App-Logs
app_log_file = get_log_file("app")
app_handler = logging.handlers.RotatingFileHandler(
app_log_file, maxBytes=10*1024*1024, backupCount=5, encoding='utf-8'
)
app_handler.setLevel(log_level)
app_handler.setFormatter(file_formatter)
root_logger.addHandler(app_handler)
# Wenn Debug-Modus aktiv, Konfiguration loggen
if debug_mode:
bug_emoji = safe_emoji("🐞")
root_logger.debug(f"{bug_emoji} Debug-Modus aktiviert - Ausführliche Logs werden generiert")
def get_logger(category: str) -> logging.Logger:
"""
Gibt einen konfigurierten Logger für eine bestimmte Kategorie zurück.
Args:
category: Log-Kategorie (app, scheduler, auth, jobs, printers, errors)
Returns:
logging.Logger: Konfigurierter Logger
"""
if category in _loggers:
return _loggers[category]
# Logger erstellen
logger = logging.getLogger(f"myp.{category}")
logger.setLevel(getattr(logging, LOG_LEVEL))
# Verhindere doppelte Logs durch Parent-Logger
logger.propagate = False
# Formatter erstellen (mit und ohne Farben)
colored_formatter = ColoredFormatter(LOG_FORMAT, LOG_DATE_FORMAT)
file_formatter = logging.Formatter(LOG_FORMAT, LOG_DATE_FORMAT)
# Filter für zusätzliche Debug-Informationen
debug_filter = DebugInfoFilter()
# Console Handler
console_handler = logging.StreamHandler()
console_handler.setLevel(getattr(logging, LOG_LEVEL))
console_handler.setFormatter(colored_formatter)
console_handler.addFilter(debug_filter)
# Windows PowerShell UTF-8 Encoding-Unterstützung
if os.name == 'nt' and hasattr(console_handler.stream, 'reconfigure'):
try:
console_handler.stream.reconfigure(encoding='utf-8')
except:
pass
logger.addHandler(console_handler)
# File Handler für spezifische Kategorie
log_file = get_log_file(category)
file_handler = logging.handlers.RotatingFileHandler(
log_file, maxBytes=10*1024*1024, backupCount=5, encoding='utf-8'
)
file_handler.setLevel(getattr(logging, LOG_LEVEL))
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
# Error-Logs zusätzlich in errors.log schreiben
if category != "errors":
error_log_file = get_log_file("errors")
error_handler = logging.handlers.RotatingFileHandler(
error_log_file, maxBytes=10*1024*1024, backupCount=5, encoding='utf-8'
# Windows-sicherer RotatingFileHandler
file_handler = WindowsSafeRotatingFileHandler(
log_file,
maxBytes=10*1024*1024, # 10MB
backupCount=5,
encoding='utf-8'
)
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(file_formatter)
logger.addHandler(error_handler)
_loggers[category] = logger
return logger
# Detaillierter Formatter für File-Logs
file_formatter = logging.Formatter(
'%(asctime)s - [%(name)s] %(name)s - [%(levelname)s] %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
file_handler.setFormatter(file_formatter)
# Handler hinzufügen
logger.addHandler(file_handler)
# Verhindere Propagation zu Root-Logger um Duplikate zu vermeiden
logger.propagate = False
# In Registry speichern
_logger_registry[name] = logger
return logger
except Exception as e:
print(f"❌ Fehler beim Erstellen des Loggers '{name}': {e}")
# Fallback: Einfacher Logger ohne File-Handler
fallback_logger = logging.getLogger(name)
if name not in _logger_registry:
_logger_registry[name] = fallback_logger
return fallback_logger
def log_startup_info():
"""Loggt Startup-Informationen."""
app_logger = get_logger("app")
rocket_emoji = safe_emoji("🚀")
folder_emoji = safe_emoji("📂")
chart_emoji = safe_emoji("📊")
computer_emoji = safe_emoji("💻")
globe_emoji = safe_emoji("🌐")
calendar_emoji = safe_emoji("📅")
app_logger.info("=" * 50)
app_logger.info(f"{rocket_emoji} MYP (Manage Your Printers) wird gestartet...")
app_logger.info(f"{folder_emoji} Log-Verzeichnis: {LOG_DIR}")
app_logger.info(f"{chart_emoji} Log-Level: {LOG_LEVEL}")
app_logger.info(f"{computer_emoji} Betriebssystem: {platform.system()} {platform.release()}")
app_logger.info(f"{globe_emoji} Hostname: {socket.gethostname()}")
app_logger.info(f"{calendar_emoji} Startzeit: {datetime.now().strftime('%d.%m.%Y %H:%M:%S')}")
app_logger.info("=" * 50)
# ===== PERFORMANCE-MEASUREMENT DECORATOR =====
# Hilfsfunktionen für das Debugging
def debug_request(logger: logging.Logger, request):
def measure_execution_time(logger: logging.Logger = None, task_name: str = "Task"):
"""
Loggt detaillierte Informationen über eine HTTP-Anfrage.
Decorator zur Messung und Protokollierung der Ausführungszeit von Funktionen.
Args:
logger: Logger-Instanz
request: Flask-Request-Objekt
"""
if logger.level > logging.DEBUG:
return
web_emoji = safe_emoji("🌐")
signal_emoji = safe_emoji("📡")
puzzle_emoji = safe_emoji("🧩")
clipboard_emoji = safe_emoji("📋")
search_emoji = safe_emoji("🔍")
logger.debug(f"{web_emoji} HTTP-Anfrage: {request.method} {request.path}")
logger.debug(f"{signal_emoji} Remote-Adresse: {request.remote_addr}")
logger.debug(f"{puzzle_emoji} Inhaltstyp: {request.content_type}")
# Nur relevante Headers ausgeben
important_headers = ['User-Agent', 'Referer', 'X-Forwarded-For', 'Authorization']
headers = {k: v for k, v in request.headers.items() if k in important_headers}
if headers:
logger.debug(f"{clipboard_emoji} Wichtige Headers: {headers}")
# Request-Parameter (max. 1000 Zeichen)
if request.args:
args_str = str(request.args)
if len(args_str) > 1000:
args_str = args_str[:997] + "..."
logger.debug(f"{search_emoji} URL-Parameter: {args_str}")
def debug_response(logger: logging.Logger, response, duration_ms: float = None):
"""
Loggt detaillierte Informationen über eine HTTP-Antwort.
Args:
logger: Logger-Instanz
response: Flask-Response-Objekt
duration_ms: Verarbeitungsdauer in Millisekunden (optional)
"""
if logger.level > logging.DEBUG:
return
status_emoji = safe_emoji("") if response.status_code < 400 else safe_emoji("")
logger.debug(f"{status_emoji} HTTP-Antwort: {response.status_code}")
if duration_ms is not None:
timer_emoji = safe_emoji("⏱️")
logger.debug(f"{timer_emoji} Verarbeitungsdauer: {duration_ms:.2f} ms")
content_length = response.content_length or 0
if content_length > 0:
size_str = f"{content_length / 1024:.1f} KB" if content_length > 1024 else f"{content_length} Bytes"
package_emoji = safe_emoji("📦")
logger.debug(f"{package_emoji} Antwortgröße: {size_str}")
def measure_execution_time(func=None, logger=None, task_name=None):
"""
Dekorator, der die Ausführungszeit einer Funktion misst und loggt.
Args:
func: Die zu dekorierende Funktion
logger: Logger-Instanz (optional)
task_name: Name der Aufgabe für das Logging (optional)
logger: Logger-Instanz für die Ausgabe
task_name: Bezeichnung der Aufgabe für die Logs
Returns:
Dekorierte Funktion
Decorator-Funktion
"""
from functools import wraps
def decorator(f):
@wraps(f)
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
result = f(*args, **kwargs)
end_time = time.time()
duration_ms = (end_time - start_time) * 1000
name = task_name or f.__name__
# Verwende provided Logger oder erstelle Standard-Logger
log = logger or get_logger("performance")
if logger:
timer_emoji = safe_emoji('⏱️')
if duration_ms > 1000: # Länger als 1 Sekunde
logger.warning(f"{timer_emoji} Langsame Ausführung: {name} - {duration_ms:.2f} ms")
else:
logger.debug(f"{timer_emoji} Ausführungszeit: {name} - {duration_ms:.2f} ms")
return result
try:
# Führe Funktion aus
result = func(*args, **kwargs)
# Berechne Ausführungszeit
execution_time = (time.time() - start_time) * 1000 # in Millisekunden
# Protokolliere Erfolg
log.info(f"{task_name} '{func.__name__}' erfolgreich in {execution_time:.2f}ms")
return result
except Exception as e:
# Berechne Ausführungszeit auch bei Fehlern
execution_time = (time.time() - start_time) * 1000
# Protokolliere Fehler
log.error(f"{task_name} '{func.__name__}' fehlgeschlagen nach {execution_time:.2f}ms: {str(e)}")
# Exception weiterleiten
raise
return wrapper
return decorator
# ===== STARTUP/DEBUG LOGGING =====
def log_startup_info():
"""
Protokolliert System-Startup-Informationen.
"""
startup_logger = get_logger("startup")
if func:
return decorator(func)
return decorator
try:
startup_logger.info("=" * 50)
startup_logger.info("🚀 MYP Platform Backend wird gestartet...")
startup_logger.info(f"🐍 Python Version: {sys.version}")
startup_logger.info(f"💻 Betriebssystem: {os.name} ({sys.platform})")
startup_logger.info(f"📁 Arbeitsverzeichnis: {os.getcwd()}")
startup_logger.info(f"⏰ Startzeit: {datetime.now().isoformat()}")
# Windows-spezifische Informationen
if os.name == 'nt':
startup_logger.info("🪟 Windows-Modus: Aktiviert")
startup_logger.info("🔒 Windows-sichere Log-Rotation: Aktiviert")
startup_logger.info("=" * 50)
except Exception as e:
print(f"❌ Fehler beim Startup-Logging: {e}")
def debug_request(logger: logging.Logger, request) -> None:
"""
Detailliertes Request-Debugging.
Args:
logger: Logger für die Ausgabe
request: Flask Request-Objekt
"""
try:
logger.debug(f"📨 REQUEST: {request.method} {request.path}")
logger.debug(f"🌐 Remote-Adresse: {request.remote_addr}")
logger.debug(f"🔤 Content-Type: {request.content_type}")
if request.args:
logger.debug(f"❓ Query-Parameter: {dict(request.args)}")
if request.form and logger.level <= logging.DEBUG:
# Filtere sensible Daten aus Form-Daten
safe_form = {k: '***' if 'password' in k.lower() else v for k, v in request.form.items()}
logger.debug(f"📝 Form-Daten: {safe_form}")
except Exception as e:
logger.error(f"❌ Fehler beim Request-Debugging: {str(e)}")
def debug_response(logger: logging.Logger, response, duration_ms: Optional[float] = None) -> None:
"""
Detailliertes Response-Debugging.
Args:
logger: Logger für die Ausgabe
response: Flask Response-Objekt
duration_ms: Optionale Ausführungszeit in Millisekunden
"""
try:
status_emoji = "" if response.status_code < 400 else "" if response.status_code >= 500 else "⚠️"
log_msg = f"📤 RESPONSE: {status_emoji} {response.status_code}"
if duration_ms is not None:
log_msg += f" ({duration_ms:.2f}ms)"
logger.debug(log_msg)
logger.debug(f"📏 Content-Length: {response.content_length or 'Unbekannt'}")
except Exception as e:
logger.error(f"❌ Fehler beim Response-Debugging: {str(e)}")
# ===== NOTFALL-LOGGING =====
def emergency_log(message: str, level: str = "ERROR") -> None:
"""
Notfall-Logging das auch funktioniert wenn das Hauptsystem fehlschlägt.
Args:
message: Nachricht
level: Log-Level
"""
try:
# Versuche normales Logging
logger = get_logger("emergency")
getattr(logger, level.lower(), logger.error)(message)
except:
# Fallback zu Print
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"[NOTFALL {timestamp}] [{level}] {message}")
# Auto-Initialisierung beim Import
if __name__ != "__main__":
try:
setup_logging()
except Exception as e:
print(f"❌ Auto-Initialisierung des Logging-Systems fehlgeschlagen: {e}")

Binary file not shown.

View File

@@ -0,0 +1,675 @@
"""
Timer-Manager für Countdown-Zähler mit Force-Quit-Funktionalität
Dieses Modul verwaltet System-Timer für verschiedene Anwendungsfälle:
- Kiosk-Timer für automatische Session-Beendigung
- Job-Timer für Druckaufträge mit Timeout
- Session-Timer für Benutzerinaktivität
- Wartungs-Timer für geplante System-Shutdowns
Autor: System
Erstellt: 2025
"""
import threading
import time
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Callable, Any
from enum import Enum
from contextlib import contextmanager
from models import SystemTimer, get_db_session, get_cached_session
from utils.logging_config import get_logger
logger = get_logger("timer_manager")
class TimerType(Enum):
"""Verfügbare Timer-Typen"""
KIOSK = "kiosk"
SESSION = "session"
JOB = "job"
SYSTEM = "system"
MAINTENANCE = "maintenance"
class ForceQuitAction(Enum):
"""Verfügbare Force-Quit-Aktionen"""
LOGOUT = "logout"
RESTART = "restart"
SHUTDOWN = "shutdown"
CUSTOM = "custom"
class TimerStatus(Enum):
"""Timer-Status-Werte"""
STOPPED = "stopped"
RUNNING = "running"
PAUSED = "paused"
EXPIRED = "expired"
FORCE_QUIT = "force_quit"
class TimerManager:
"""
Zentraler Timer-Manager für alle System-Timer.
Verwaltet Timer-Instanzen und führt automatische Cleanup-Operationen durch.
"""
def __init__(self):
self._timers: Dict[str, SystemTimer] = {}
self._timer_callbacks: Dict[str, List[Callable]] = {}
self._force_quit_handlers: Dict[str, Callable] = {}
self._background_thread: Optional[threading.Thread] = None
self._shutdown_flag = threading.Event()
self._update_interval = 1.0 # Sekunden zwischen Updates
# Standard Force-Quit-Handler registrieren
self._register_default_handlers()
# Background-Thread für Timer-Updates starten
self._start_background_thread()
logger.info("Timer-Manager initialisiert")
def _register_default_handlers(self):
"""Registriert Standard-Handler für Force-Quit-Aktionen"""
def logout_handler(timer: SystemTimer) -> bool:
"""Standard-Handler für Logout-Aktion"""
try:
logger.info(f"Logout-Handler für Timer '{timer.name}' ausgeführt")
# Hier würde der tatsächliche Logout implementiert werden
# Das wird in app.py über die API-Endpunkte gemacht
return True
except Exception as e:
logger.error(f"Fehler im Logout-Handler: {str(e)}")
return False
def restart_handler(timer: SystemTimer) -> bool:
"""Standard-Handler für System-Restart"""
try:
logger.warning(f"System-Restart durch Timer '{timer.name}' ausgelöst")
# Implementierung würde über System-API erfolgen
return True
except Exception as e:
logger.error(f"Fehler im Restart-Handler: {str(e)}")
return False
def shutdown_handler(timer: SystemTimer) -> bool:
"""Standard-Handler für System-Shutdown"""
try:
logger.warning(f"System-Shutdown durch Timer '{timer.name}' ausgelöst")
# Implementierung würde über System-API erfolgen
return True
except Exception as e:
logger.error(f"Fehler im Shutdown-Handler: {str(e)}")
return False
# Handler registrieren
self._force_quit_handlers[ForceQuitAction.LOGOUT.value] = logout_handler
self._force_quit_handlers[ForceQuitAction.RESTART.value] = restart_handler
self._force_quit_handlers[ForceQuitAction.SHUTDOWN.value] = shutdown_handler
def _start_background_thread(self):
"""Startet den Background-Thread für Timer-Updates"""
if self._background_thread is None or not self._background_thread.is_alive():
self._background_thread = threading.Thread(
target=self._background_worker,
name="TimerManager-Background",
daemon=True
)
self._background_thread.start()
logger.debug("Background-Thread für Timer-Updates gestartet")
def _background_worker(self):
"""Background-Worker für kontinuierliche Timer-Updates"""
logger.debug("Timer-Manager Background-Worker gestartet")
while not self._shutdown_flag.is_set():
try:
self._update_all_timers()
self._process_expired_timers()
# Warte bis zum nächsten Update
self._shutdown_flag.wait(self._update_interval)
except Exception as e:
logger.error(f"Fehler im Timer-Background-Worker: {str(e)}")
time.sleep(5) # Kurze Pause bei Fehlern
logger.debug("Timer-Manager Background-Worker beendet")
def _update_all_timers(self):
"""Aktualisiert alle Timer aus der Datenbank"""
try:
with get_cached_session() as session:
# Lade alle aktiven Timer aus der Datenbank
db_timers = session.query(SystemTimer).filter(
SystemTimer.status.in_([TimerStatus.RUNNING.value, TimerStatus.PAUSED.value])
).all()
# Update lokale Timer-Cache
current_timer_names = set(self._timers.keys())
db_timer_names = {timer.name for timer in db_timers}
# Entferne Timer die nicht mehr in der DB sind
for name in current_timer_names - db_timer_names:
if name in self._timers:
del self._timers[name]
logger.debug(f"Timer '{name}' aus lokalem Cache entfernt")
# Aktualisiere/füge Timer hinzu
for timer in db_timers:
self._timers[timer.name] = timer
# Callback-Funktionen aufrufen wenn verfügbar
if timer.name in self._timer_callbacks:
for callback in self._timer_callbacks[timer.name]:
try:
callback(timer)
except Exception as e:
logger.error(f"Fehler in Timer-Callback für '{timer.name}': {str(e)}")
except Exception as e:
logger.error(f"Fehler beim Update der Timer: {str(e)}")
def _process_expired_timers(self):
"""Verarbeitet abgelaufene Timer und führt Force-Quit-Aktionen aus"""
try:
expired_timers = SystemTimer.get_expired_timers()
for timer in expired_timers:
try:
logger.warning(f"Timer '{timer.name}' ist abgelaufen - führe Force-Quit aus")
# Force-Quit-Aktion ausführen
success = self._execute_force_quit(timer)
if success:
# Timer als abgelaufen markieren
with get_cached_session() as session:
db_timer = session.query(SystemTimer).filter(
SystemTimer.id == timer.id
).first()
if db_timer:
db_timer.status = TimerStatus.EXPIRED.value
db_timer.updated_at = datetime.now()
session.commit()
except Exception as e:
logger.error(f"Fehler beim Verarbeiten des abgelaufenen Timers '{timer.name}': {str(e)}")
except Exception as e:
logger.error(f"Fehler beim Verarbeiten abgelaufener Timer: {str(e)}")
def _execute_force_quit(self, timer: SystemTimer) -> bool:
"""Führt die Force-Quit-Aktion für einen Timer aus"""
try:
action = timer.force_quit_action
# Custom-Endpoint prüfen
if action == ForceQuitAction.CUSTOM.value and timer.custom_action_endpoint:
return self._execute_custom_action(timer)
# Standard-Handler verwenden
if action in self._force_quit_handlers:
handler = self._force_quit_handlers[action]
return handler(timer)
logger.warning(f"Unbekannte Force-Quit-Aktion: {action}")
return False
except Exception as e:
logger.error(f"Fehler beim Ausführen der Force-Quit-Aktion für Timer '{timer.name}': {str(e)}")
return False
def _execute_custom_action(self, timer: SystemTimer) -> bool:
"""Führt eine benutzerdefinierte Force-Quit-Aktion aus"""
try:
# Hier würde ein HTTP-Request an den Custom-Endpoint gemacht werden
# Das wird über die Flask-App-Routen implementiert
logger.info(f"Custom-Action für Timer '{timer.name}': {timer.custom_action_endpoint}")
return True
except Exception as e:
logger.error(f"Fehler bei Custom-Action für Timer '{timer.name}': {str(e)}")
return False
def create_timer(self, name: str, timer_type: TimerType, duration_seconds: int,
force_quit_action: ForceQuitAction = ForceQuitAction.LOGOUT,
auto_start: bool = False, **kwargs) -> Optional[SystemTimer]:
"""
Erstellt einen neuen Timer.
Args:
name: Eindeutiger Name des Timers
timer_type: Typ des Timers
duration_seconds: Dauer in Sekunden
force_quit_action: Aktion bei Force-Quit
auto_start: Automatisch starten
**kwargs: Zusätzliche Timer-Konfiguration
Returns:
SystemTimer-Instanz oder None bei Fehler
"""
try:
with get_cached_session() as session:
# Prüfe ob Timer bereits existiert
existing = session.query(SystemTimer).filter(
SystemTimer.name == name
).first()
if existing:
logger.warning(f"Timer '{name}' existiert bereits")
return existing
# Neuen Timer erstellen
timer = SystemTimer(
name=name,
timer_type=timer_type.value,
duration_seconds=duration_seconds,
remaining_seconds=duration_seconds,
target_timestamp=datetime.now() + timedelta(seconds=duration_seconds),
force_quit_action=force_quit_action.value,
auto_start=auto_start,
**kwargs
)
session.add(timer)
session.commit()
# Zu lokalem Cache hinzufügen
self._timers[name] = timer
if auto_start:
timer.start_timer()
logger.info(f"Timer '{name}' erstellt - Typ: {timer_type.value}, Dauer: {duration_seconds}s")
return timer
except Exception as e:
logger.error(f"Fehler beim Erstellen des Timers '{name}': {str(e)}")
return None
def get_timer(self, name: str) -> Optional[SystemTimer]:
"""
Holt einen Timer anhand des Namens.
Args:
name: Name des Timers
Returns:
SystemTimer-Instanz oder None
"""
try:
# Erst aus lokalem Cache prüfen
if name in self._timers:
return self._timers[name]
# Aus Datenbank laden
timer = SystemTimer.get_by_name(name)
if timer:
self._timers[name] = timer
return timer
except Exception as e:
logger.error(f"Fehler beim Laden des Timers '{name}': {str(e)}")
return None
def start_timer(self, name: str) -> bool:
"""Startet einen Timer"""
try:
timer = self.get_timer(name)
if not timer:
logger.error(f"Timer '{name}' nicht gefunden")
return False
success = timer.start_timer()
if success:
with get_cached_session() as session:
# Timer in Datenbank aktualisieren
db_timer = session.merge(timer)
session.commit()
logger.info(f"Timer '{name}' gestartet")
return success
except Exception as e:
logger.error(f"Fehler beim Starten des Timers '{name}': {str(e)}")
return False
def pause_timer(self, name: str) -> bool:
"""Pausiert einen Timer"""
try:
timer = self.get_timer(name)
if not timer:
logger.error(f"Timer '{name}' nicht gefunden")
return False
success = timer.pause_timer()
if success:
with get_cached_session() as session:
db_timer = session.merge(timer)
session.commit()
logger.info(f"Timer '{name}' pausiert")
return success
except Exception as e:
logger.error(f"Fehler beim Pausieren des Timers '{name}': {str(e)}")
return False
def stop_timer(self, name: str) -> bool:
"""Stoppt einen Timer"""
try:
timer = self.get_timer(name)
if not timer:
logger.error(f"Timer '{name}' nicht gefunden")
return False
success = timer.stop_timer()
if success:
with get_cached_session() as session:
db_timer = session.merge(timer)
session.commit()
logger.info(f"Timer '{name}' gestoppt")
return success
except Exception as e:
logger.error(f"Fehler beim Stoppen des Timers '{name}': {str(e)}")
return False
def reset_timer(self, name: str) -> bool:
"""Setzt einen Timer zurück"""
try:
timer = self.get_timer(name)
if not timer:
logger.error(f"Timer '{name}' nicht gefunden")
return False
success = timer.reset_timer()
if success:
with get_cached_session() as session:
db_timer = session.merge(timer)
session.commit()
logger.info(f"Timer '{name}' zurückgesetzt")
return success
except Exception as e:
logger.error(f"Fehler beim Zurücksetzen des Timers '{name}': {str(e)}")
return False
def extend_timer(self, name: str, additional_seconds: int) -> bool:
"""Verlängert einen Timer"""
try:
timer = self.get_timer(name)
if not timer:
logger.error(f"Timer '{name}' nicht gefunden")
return False
success = timer.extend_timer(additional_seconds)
if success:
with get_cached_session() as session:
db_timer = session.merge(timer)
session.commit()
logger.info(f"Timer '{name}' um {additional_seconds} Sekunden verlängert")
return success
except Exception as e:
logger.error(f"Fehler beim Verlängern des Timers '{name}': {str(e)}")
return False
def delete_timer(self, name: str) -> bool:
"""Löscht einen Timer"""
try:
with get_cached_session() as session:
timer = session.query(SystemTimer).filter(
SystemTimer.name == name
).first()
if not timer:
logger.error(f"Timer '{name}' nicht gefunden")
return False
session.delete(timer)
session.commit()
# Aus lokalem Cache entfernen
if name in self._timers:
del self._timers[name]
# Callbacks entfernen
if name in self._timer_callbacks:
del self._timer_callbacks[name]
logger.info(f"Timer '{name}' gelöscht")
return True
except Exception as e:
logger.error(f"Fehler beim Löschen des Timers '{name}': {str(e)}")
return False
def register_callback(self, timer_name: str, callback: Callable[[SystemTimer], None]):
"""
Registriert eine Callback-Funktion für Timer-Updates.
Args:
timer_name: Name des Timers
callback: Callback-Funktion die bei Updates aufgerufen wird
"""
if timer_name not in self._timer_callbacks:
self._timer_callbacks[timer_name] = []
self._timer_callbacks[timer_name].append(callback)
logger.debug(f"Callback für Timer '{timer_name}' registriert")
def register_force_quit_handler(self, action: str, handler: Callable[[SystemTimer], bool]):
"""
Registriert einen benutzerdefinierten Force-Quit-Handler.
Args:
action: Name der Aktion
handler: Handler-Funktion
"""
self._force_quit_handlers[action] = handler
logger.debug(f"Force-Quit-Handler für Aktion '{action}' registriert")
def get_all_timers(self) -> List[SystemTimer]:
"""Gibt alle Timer zurück"""
try:
with get_cached_session() as session:
timers = session.query(SystemTimer).all()
return timers
except Exception as e:
logger.error(f"Fehler beim Laden aller Timer: {str(e)}")
return []
def get_timers_by_type(self, timer_type: TimerType) -> List[SystemTimer]:
"""Gibt alle Timer eines bestimmten Typs zurück"""
try:
return SystemTimer.get_by_type(timer_type.value)
except Exception as e:
logger.error(f"Fehler beim Laden der Timer vom Typ '{timer_type.value}': {str(e)}")
return []
def get_running_timers(self) -> List[SystemTimer]:
"""Gibt alle laufenden Timer zurück"""
try:
return SystemTimer.get_running_timers()
except Exception as e:
logger.error(f"Fehler beim Laden der laufenden Timer: {str(e)}")
return []
def create_kiosk_timer(self, duration_minutes: int = 30, auto_start: bool = True) -> Optional[SystemTimer]:
"""
Erstellt einen Standard-Kiosk-Timer.
Args:
duration_minutes: Timer-Dauer in Minuten
auto_start: Automatisch starten
Returns:
SystemTimer-Instanz oder None
"""
return self.create_timer(
name="kiosk_session",
timer_type=TimerType.KIOSK,
duration_seconds=duration_minutes * 60,
force_quit_action=ForceQuitAction.LOGOUT,
auto_start=auto_start,
force_quit_warning_seconds=30,
show_warning=True,
warning_message="Kiosk-Session läuft ab. Bitte speichern Sie Ihre Arbeit."
)
def create_session_timer(self, user_id: int, duration_minutes: int = 120,
auto_start: bool = True) -> Optional[SystemTimer]:
"""
Erstellt einen Session-Timer für einen Benutzer.
Args:
user_id: Benutzer-ID
duration_minutes: Timer-Dauer in Minuten
auto_start: Automatisch starten
Returns:
SystemTimer-Instanz oder None
"""
return self.create_timer(
name=f"session_{user_id}",
timer_type=TimerType.SESSION,
duration_seconds=duration_minutes * 60,
force_quit_action=ForceQuitAction.LOGOUT,
auto_start=auto_start,
context_id=user_id,
force_quit_warning_seconds=60,
show_warning=True,
warning_message="Ihre Session läuft ab. Aktivität erforderlich."
)
def update_session_activity(self, user_id: int) -> bool:
"""
Aktualisiert die Aktivität eines Session-Timers.
Args:
user_id: Benutzer-ID
Returns:
True wenn erfolgreich
"""
try:
timer = self.get_timer(f"session_{user_id}")
if timer and timer.timer_type == TimerType.SESSION.value:
success = timer.update_activity()
if success:
with get_cached_session() as session:
db_timer = session.merge(timer)
session.commit()
return success
return False
except Exception as e:
logger.error(f"Fehler beim Aktualisieren der Session-Aktivität für User {user_id}: {str(e)}")
return False
def shutdown(self):
"""Beendet den Timer-Manager sauber"""
logger.info("Timer-Manager wird heruntergefahren...")
self._shutdown_flag.set()
if self._background_thread and self._background_thread.is_alive():
self._background_thread.join(timeout=5)
self._timers.clear()
self._timer_callbacks.clear()
logger.info("Timer-Manager heruntergefahren")
# Globale Timer-Manager-Instanz
_timer_manager: Optional[TimerManager] = None
def get_timer_manager() -> TimerManager:
"""
Gibt die globale Timer-Manager-Instanz zurück.
Thread-sicher mit Lazy Loading.
"""
global _timer_manager
if _timer_manager is None:
_timer_manager = TimerManager()
return _timer_manager
def init_timer_manager() -> TimerManager:
"""
Initialisiert den Timer-Manager explizit.
Sollte beim App-Start aufgerufen werden.
"""
return get_timer_manager()
def shutdown_timer_manager():
"""
Beendet den Timer-Manager sauber.
Sollte beim App-Shutdown aufgerufen werden.
"""
global _timer_manager
if _timer_manager:
_timer_manager.shutdown()
_timer_manager = None
# Convenience-Funktionen für häufige Timer-Operationen
def create_kiosk_timer(duration_minutes: int = 30, auto_start: bool = True) -> Optional[SystemTimer]:
"""Erstellt einen Kiosk-Timer"""
return get_timer_manager().create_kiosk_timer(duration_minutes, auto_start)
def create_session_timer(user_id: int, duration_minutes: int = 120) -> Optional[SystemTimer]:
"""Erstellt einen Session-Timer"""
return get_timer_manager().create_session_timer(user_id, duration_minutes)
def start_timer(name: str) -> bool:
"""Startet einen Timer"""
return get_timer_manager().start_timer(name)
def pause_timer(name: str) -> bool:
"""Pausiert einen Timer"""
return get_timer_manager().pause_timer(name)
def stop_timer(name: str) -> bool:
"""Stoppt einen Timer"""
return get_timer_manager().stop_timer(name)
def reset_timer(name: str) -> bool:
"""Setzt einen Timer zurück"""
return get_timer_manager().reset_timer(name)
def extend_timer(name: str, additional_seconds: int) -> bool:
"""Verlängert einen Timer"""
return get_timer_manager().extend_timer(name, additional_seconds)
def get_timer_status(name: str) -> Optional[Dict[str, Any]]:
"""Gibt den Status eines Timers zurück"""
timer = get_timer_manager().get_timer(name)
return timer.to_dict() if timer else None
def update_session_activity(user_id: int) -> bool:
"""Aktualisiert Session-Aktivität"""
return get_timer_manager().update_session_activity(user_id)