import os import sys import logging import atexit from datetime import datetime, timedelta from flask import Flask, render_template, request, jsonify, redirect, url_for, flash, send_file, abort, session, make_response, Response, current_app from flask_login import LoginManager, login_user, logout_user, login_required, current_user from flask_wtf import CSRFProtect from flask_wtf.csrf import CSRFError from werkzeug.utils import secure_filename from werkzeug.security import generate_password_hash, check_password_hash from sqlalchemy.orm import sessionmaker, joinedload from sqlalchemy import func, text from functools import wraps, lru_cache from concurrent.futures import ThreadPoolExecutor, as_completed from typing import List, Dict, Tuple, Optional import time import subprocess import json import signal import shutil from contextlib import contextmanager import threading # ===== OPTIMIERTE KONFIGURATION FÜR RASPBERRY PI ===== class OptimizedConfig: """Configuration for performance-optimized deployment on Raspberry Pi""" # Performance optimization flags OPTIMIZED_MODE = True USE_MINIFIED_ASSETS = True DISABLE_ANIMATIONS = True LIMIT_GLASSMORPHISM = True # Flask performance settings DEBUG = False TESTING = False SEND_FILE_MAX_AGE_DEFAULT = 31536000 # 1 year cache for static files # Template settings TEMPLATES_AUTO_RELOAD = False EXPLAIN_TEMPLATE_LOADING = False # Session configuration SESSION_COOKIE_SECURE = True SESSION_COOKIE_HTTPONLY = True SESSION_COOKIE_SAMESITE = 'Lax' # Performance optimizations MAX_CONTENT_LENGTH = 16 * 1024 * 1024 # 16MB max upload JSON_SORT_KEYS = False JSONIFY_PRETTYPRINT_REGULAR = False # Database optimizations SQLALCHEMY_ECHO = False SQLALCHEMY_TRACK_MODIFICATIONS = False SQLALCHEMY_ENGINE_OPTIONS = { 'pool_size': 5, 'pool_recycle': 3600, 'pool_pre_ping': True, 'connect_args': { 'check_same_thread': False } } # Cache configuration CACHE_TYPE = 'simple' CACHE_DEFAULT_TIMEOUT = 300 CACHE_KEY_PREFIX = 'myp_' # Static file caching headers SEND_FILE_MAX_AGE_DEFAULT = 31536000 # 1 year @staticmethod def init_app(app): """Initialize application with optimized settings""" # Set optimized template app.jinja_env.globals['optimized_mode'] = True app.jinja_env.globals['base_template'] = 'base-optimized.html' # Add cache headers for static files @app.after_request def add_cache_headers(response): if 'static' in response.headers.get('Location', ''): response.headers['Cache-Control'] = 'public, max-age=31536000' response.headers['Vary'] = 'Accept-Encoding' return response # Disable unnecessary features app.config['EXPLAIN_TEMPLATE_LOADING'] = False app.config['TEMPLATES_AUTO_RELOAD'] = False print("🚀 Running in OPTIMIZED mode for Raspberry Pi") def detect_raspberry_pi(): """Erkennt ob das System auf einem Raspberry Pi läuft""" try: # Prüfe auf Raspberry Pi Hardware with open('/proc/cpuinfo', 'r') as f: cpuinfo = f.read() if 'Raspberry Pi' in cpuinfo or 'BCM' in cpuinfo: return True except: pass try: # Prüfe auf ARM-Architektur import platform machine = platform.machine().lower() if 'arm' in machine or 'aarch64' in machine: return True except: pass # Umgebungsvariable für manuelle Aktivierung return os.getenv('FORCE_OPTIMIZED_MODE', '').lower() in ['true', '1', 'yes'] def should_use_optimized_config(): """Bestimmt ob die optimierte Konfiguration verwendet werden soll""" # Kommandozeilen-Argument prüfen if '--optimized' in sys.argv: return True # Raspberry Pi-Erkennung if detect_raspberry_pi(): return True # Umgebungsvariable if os.getenv('USE_OPTIMIZED_CONFIG', '').lower() in ['true', '1', 'yes']: return True # Schwache Hardware-Erkennung (weniger als 2GB RAM) try: import psutil memory_gb = psutil.virtual_memory().total / (1024**3) if memory_gb < 2.0: return True except: pass return False # Windows-spezifische Fixes früh importieren (sichere Version) if os.name == 'nt': try: from utils.windows_fixes import get_windows_thread_manager # apply_all_windows_fixes() wird automatisch beim Import ausgeführt print("✅ Windows-Fixes (sichere Version) geladen") except ImportError as e: # Fallback falls windows_fixes nicht verfügbar get_windows_thread_manager = None print(f"⚠️ Windows-Fixes nicht verfügbar: {str(e)}") else: get_windows_thread_manager = None # Lokale Imports from models import init_database, create_initial_admin, User, Printer, Job, Stats, SystemLog, get_db_session, GuestRequest, UserPermission, Notification, JobOrder, Base, get_engine, PlugStatusLog from utils.logging_config import setup_logging, get_logger, measure_execution_time, log_startup_info, debug_request, debug_response from utils.job_scheduler import JobScheduler, get_job_scheduler from utils.queue_manager import start_queue_manager, stop_queue_manager, get_queue_manager from config.settings import SECRET_KEY, UPLOAD_FOLDER, ALLOWED_EXTENSIONS, ENVIRONMENT, SESSION_LIFETIME, SCHEDULER_ENABLED, SCHEDULER_INTERVAL, TAPO_USERNAME, TAPO_PASSWORD from utils.file_manager import file_manager, save_job_file, save_guest_file, save_avatar_file, save_asset_file, save_log_file, save_backup_file, save_temp_file, delete_file as delete_file_safe # ===== OFFLINE-MODUS KONFIGURATION ===== # System läuft im Offline-Modus ohne Internetverbindung OFFLINE_MODE = True # Produktionseinstellung für Offline-Betrieb # ===== BEDINGTE IMPORTS FÜR OFFLINE-MODUS ===== if not OFFLINE_MODE: # Nur laden wenn Online-Modus import requests else: # Offline-Mock für requests class OfflineRequestsMock: """Mock-Klasse für requests im Offline-Modus""" @staticmethod def get(*args, **kwargs): raise ConnectionError("System läuft im Offline-Modus - keine Internet-Verbindung verfügbar") @staticmethod def post(*args, **kwargs): raise ConnectionError("System läuft im Offline-Modus - keine Internet-Verbindung verfügbar") requests = OfflineRequestsMock() # Datenbank-Engine für Kompatibilität mit init_simple_db.py from models import engine as db_engine # Blueprints importieren from blueprints.guest import guest_blueprint from blueprints.calendar import calendar_blueprint from blueprints.users import users_blueprint from blueprints.printers import printers_blueprint from blueprints.jobs import jobs_blueprint # Scheduler importieren falls verfügbar try: from utils.job_scheduler import scheduler except ImportError: scheduler = None # SSL-Kontext importieren falls verfügbar try: from utils.ssl_config import get_ssl_context except ImportError: def get_ssl_context(): return None # Template-Helfer importieren falls verfügbar try: from utils.template_helpers import register_template_helpers except ImportError: def register_template_helpers(app): pass # Datenbank-Monitor und Backup-Manager importieren falls verfügbar try: from utils.database_utils import DatabaseMonitor database_monitor = DatabaseMonitor() except ImportError: database_monitor = None try: from utils.backup_manager import BackupManager backup_manager = BackupManager() except ImportError: backup_manager = None # Import neuer Systeme from utils.rate_limiter import limit_requests, rate_limiter, cleanup_rate_limiter from utils.security import init_security, require_secure_headers, security_check from utils.permissions import init_permission_helpers, require_permission, Permission, check_permission from utils.analytics import analytics_engine, track_event, get_dashboard_stats # Import der neuen System-Module from utils.form_validation import ( FormValidator, ValidationError, ValidationResult, get_user_registration_validator, get_job_creation_validator, get_printer_creation_validator, get_guest_request_validator, validate_form, get_client_validation_js ) from utils.report_generator import ( ReportFactory, ReportConfig, JobReportBuilder, UserReportBuilder, PrinterReportBuilder, generate_comprehensive_report ) from utils.realtime_dashboard import ( DashboardManager, EventType, DashboardEvent, emit_job_event, emit_printer_event, emit_system_alert, get_dashboard_client_js ) from utils.drag_drop_system import ( drag_drop_manager, DragDropConfig, validate_file_upload, get_drag_drop_javascript, get_drag_drop_css ) from utils.advanced_tables import ( AdvancedTableQuery, TableDataProcessor, ColumnConfig, create_table_config, get_advanced_tables_js, get_advanced_tables_css ) from utils.maintenance_system import ( MaintenanceManager, MaintenanceType, MaintenanceStatus, create_maintenance_task, schedule_maintenance, get_maintenance_overview, update_maintenance_status ) from utils.multi_location_system import ( LocationManager, LocationType, AccessLevel, create_location, assign_user_to_location, get_user_locations, calculate_distance, find_nearest_location ) # Drucker-Monitor importieren from utils.printer_monitor import printer_monitor # Logging initialisieren (früh, damit andere Module es verwenden können) setup_logging() log_startup_info() # app_logger für verschiedene Komponenten (früh definieren) app_logger = get_logger("app") auth_logger = get_logger("auth") jobs_logger = get_logger("jobs") printers_logger = get_logger("printers") user_logger = get_logger("user") kiosk_logger = get_logger("kiosk") # Timeout Force-Quit Manager importieren (nach Logger-Definition) try: from utils.timeout_force_quit_manager import ( get_timeout_manager, start_force_quit_timeout, cancel_force_quit_timeout, extend_force_quit_timeout, get_force_quit_status, register_shutdown_callback, timeout_context ) TIMEOUT_FORCE_QUIT_AVAILABLE = True app_logger.info("✅ Timeout Force-Quit Manager geladen") except ImportError as e: TIMEOUT_FORCE_QUIT_AVAILABLE = False app_logger.warning(f"⚠️ Timeout Force-Quit Manager nicht verfügbar: {e}") # ===== PERFORMANCE-OPTIMIERTE CACHES ===== # Thread-sichere Caches für häufig abgerufene Daten _user_cache = {} _user_cache_lock = threading.RLock() _printer_status_cache = {} _printer_status_cache_lock = threading.RLock() _printer_status_cache_ttl = {} # Cache-Konfiguration USER_CACHE_TTL = 300 # 5 Minuten PRINTER_STATUS_CACHE_TTL = 30 # 30 Sekunden def clear_user_cache(user_id: Optional[int] = None): """Löscht User-Cache (komplett oder für spezifischen User)""" with _user_cache_lock: if user_id: _user_cache.pop(user_id, None) else: _user_cache.clear() def clear_printer_status_cache(): """Löscht Drucker-Status-Cache""" with _printer_status_cache_lock: _printer_status_cache.clear() _printer_status_cache_ttl.clear() # ===== AGGRESSIVE SOFORT-SHUTDOWN HANDLER FÜR STRG+C ===== def aggressive_shutdown_handler(sig, frame): """ Aggressiver Signal-Handler für sofortiges Herunterfahren bei Strg+C. Schließt sofort alle Datenbankverbindungen und beendet das Programm um jeden Preis. """ print("\n🚨 STRG+C ERKANNT - SOFORTIGES SHUTDOWN!") print("🔥 Schließe Datenbank sofort und beende Programm um jeden Preis!") try: # 1. Caches leeren clear_user_cache() clear_printer_status_cache() # 2. Sofort alle Datenbank-Sessions und Engine schließen try: from models import _engine, _scoped_session, _session_factory if _scoped_session: try: _scoped_session.remove() print("✅ Scoped Sessions geschlossen") except Exception as e: print(f"⚠️ Fehler beim Schließen der Scoped Sessions: {e}") if _engine: try: _engine.dispose() print("✅ Datenbank-Engine geschlossen") except Exception as e: print(f"⚠️ Fehler beim Schließen der Engine: {e}") except ImportError: print("⚠️ Models nicht verfügbar für Database-Cleanup") # 3. Alle offenen DB-Sessions forciert schließen try: import gc # Garbage Collection für nicht geschlossene Sessions gc.collect() print("✅ Garbage Collection ausgeführt") except Exception as e: print(f"⚠️ Garbage Collection fehlgeschlagen: {e}") # 4. SQLite WAL-Dateien forciert synchronisieren try: import sqlite3 from config.settings import DATABASE_PATH conn = sqlite3.connect(DATABASE_PATH, timeout=1.0) conn.execute("PRAGMA wal_checkpoint(TRUNCATE)") conn.close() print("✅ SQLite WAL-Checkpoint ausgeführt") except Exception as e: print(f"⚠️ WAL-Checkpoint fehlgeschlagen: {e}") # 5. Queue Manager stoppen falls verfügbar try: from utils.queue_manager import stop_queue_manager stop_queue_manager() print("✅ Queue Manager gestoppt") except Exception as e: print(f"⚠️ Queue Manager Stop fehlgeschlagen: {e}") except Exception as e: print(f"❌ Fehler beim Database-Cleanup: {e}") print("🛑 SOFORTIGES PROGRAMM-ENDE - EXIT CODE 0") # Sofortiger Exit ohne weitere Cleanup-Routinen os._exit(0) def register_aggressive_shutdown(): """ Registriert den aggressiven Shutdown-Handler für alle relevanten Signale. Muss VOR allen anderen Signal-Handlern registriert werden. """ # Signal-Handler für alle Plattformen registrieren signal.signal(signal.SIGINT, aggressive_shutdown_handler) # Strg+C signal.signal(signal.SIGTERM, aggressive_shutdown_handler) # Terminate Signal # Windows-spezifische Signale if os.name == 'nt': try: signal.signal(signal.SIGBREAK, aggressive_shutdown_handler) # Strg+Break print("✅ Windows SIGBREAK Handler registriert") except AttributeError: pass # SIGBREAK nicht auf allen Windows-Versionen verfügbar else: # Unix/Linux-spezifische Signale try: signal.signal(signal.SIGHUP, aggressive_shutdown_handler) # Hangup Signal print("✅ Unix SIGHUP Handler registriert") except AttributeError: pass # Atexit-Handler als Backup registrieren atexit.register(lambda: print("🔄 Atexit-Handler ausgeführt - Programm beendet")) print("🚨 AGGRESSIVER STRG+C SHUTDOWN-HANDLER AKTIVIERT") print("📋 Bei Strg+C wird die Datenbank sofort geschlossen und das Programm beendet!") # Aggressive Shutdown-Handler sofort registrieren register_aggressive_shutdown() # ===== ENDE AGGRESSIVE SHUTDOWN HANDLER ===== # Flask-App initialisieren app = Flask(__name__) app.secret_key = SECRET_KEY # ===== OPTIMIERTE KONFIGURATION ANWENDEN ===== # Prüfe ob optimierte Konfiguration verwendet werden soll USE_OPTIMIZED_CONFIG = should_use_optimized_config() if USE_OPTIMIZED_CONFIG: app_logger.info("🚀 Aktiviere optimierte Konfiguration für schwache Hardware/Raspberry Pi") # Optimierte Flask-Konfiguration anwenden app.config.update({ "DEBUG": OptimizedConfig.DEBUG, "TESTING": OptimizedConfig.TESTING, "SEND_FILE_MAX_AGE_DEFAULT": OptimizedConfig.SEND_FILE_MAX_AGE_DEFAULT, "TEMPLATES_AUTO_RELOAD": OptimizedConfig.TEMPLATES_AUTO_RELOAD, "EXPLAIN_TEMPLATE_LOADING": OptimizedConfig.EXPLAIN_TEMPLATE_LOADING, "SESSION_COOKIE_SECURE": OptimizedConfig.SESSION_COOKIE_SECURE, "SESSION_COOKIE_HTTPONLY": OptimizedConfig.SESSION_COOKIE_HTTPONLY, "SESSION_COOKIE_SAMESITE": OptimizedConfig.SESSION_COOKIE_SAMESITE, "MAX_CONTENT_LENGTH": OptimizedConfig.MAX_CONTENT_LENGTH, "JSON_SORT_KEYS": OptimizedConfig.JSON_SORT_KEYS, "JSONIFY_PRETTYPRINT_REGULAR": OptimizedConfig.JSONIFY_PRETTYPRINT_REGULAR, "SQLALCHEMY_ECHO": OptimizedConfig.SQLALCHEMY_ECHO, "SQLALCHEMY_TRACK_MODIFICATIONS": OptimizedConfig.SQLALCHEMY_TRACK_MODIFICATIONS, "SQLALCHEMY_ENGINE_OPTIONS": OptimizedConfig.SQLALCHEMY_ENGINE_OPTIONS }) # Session-Konfiguration app.config["PERMANENT_SESSION_LIFETIME"] = SESSION_LIFETIME app.config["WTF_CSRF_ENABLED"] = True # Jinja2-Globals für optimierte Templates app.jinja_env.globals.update({ 'optimized_mode': True, 'use_minified_assets': OptimizedConfig.USE_MINIFIED_ASSETS, 'disable_animations': OptimizedConfig.DISABLE_ANIMATIONS, 'limit_glassmorphism': OptimizedConfig.LIMIT_GLASSMORPHISM, 'base_template': 'base-optimized.html' }) # Optimierte After-Request-Handler @app.after_request def add_optimized_cache_headers(response): """Fügt optimierte Cache-Header für statische Dateien hinzu""" if request.endpoint == 'static' or '/static/' in request.path: response.headers['Cache-Control'] = 'public, max-age=31536000' response.headers['Vary'] = 'Accept-Encoding' # Preload-Header für kritische Assets if request.path.endswith(('.css', '.js')): response.headers['X-Optimized-Asset'] = 'true' return response app_logger.info("✅ Optimierte Konfiguration aktiviert") else: # Standard-Konfiguration app.config["PERMANENT_SESSION_LIFETIME"] = SESSION_LIFETIME app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False app.config["WTF_CSRF_ENABLED"] = True # Standard Jinja2-Globals app.jinja_env.globals.update({ 'optimized_mode': False, 'use_minified_assets': False, 'disable_animations': False, 'limit_glassmorphism': False, 'base_template': 'base.html' }) app_logger.info("📋 Standard-Konfiguration verwendet") # Globale db-Variable für Kompatibilität mit init_simple_db.py db = db_engine # System-Manager initialisieren dashboard_manager = DashboardManager() maintenance_manager = MaintenanceManager() location_manager = LocationManager() # SocketIO für Realtime Dashboard initialisieren socketio = dashboard_manager.init_socketio(app, cors_allowed_origins="*") # CSRF-Schutz initialisieren csrf = CSRFProtect(app) # Security-System initialisieren app = init_security(app) # Permission Template Helpers registrieren init_permission_helpers(app) # Template-Helper registrieren register_template_helpers(app) # CSRF-Error-Handler - Korrigierte Version für Flask-WTF 1.2.1+ @app.errorhandler(CSRFError) def csrf_error(error): """Behandelt CSRF-Fehler und gibt detaillierte Informationen zurück.""" app_logger.error(f"CSRF-Fehler für {request.path}: {error}") if request.path.startswith('/api/'): # Für API-Anfragen: JSON-Response return jsonify({ "error": "CSRF-Token fehlt oder ungültig", "reason": str(error), "help": "Fügen Sie ein gültiges CSRF-Token zu Ihrer Anfrage hinzu" }), 400 else: # Für normale Anfragen: Weiterleitung zur Fehlerseite flash("Sicherheitsfehler: Anfrage wurde abgelehnt. Bitte versuchen Sie es erneut.", "error") return redirect(request.url) # Blueprints registrieren app.register_blueprint(guest_blueprint) app.register_blueprint(calendar_blueprint) app.register_blueprint(users_blueprint) app.register_blueprint(printers_blueprint) app.register_blueprint(jobs_blueprint) # Login-Manager initialisieren login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = "login" login_manager.login_message = "Bitte melden Sie sich an, um auf diese Seite zuzugreifen." login_manager.login_message_category = "info" @login_manager.user_loader def load_user(user_id): """ Performance-optimierter User-Loader mit Caching und robustem Error-Handling. """ try: # user_id von Flask-Login ist immer ein String - zu Integer konvertieren try: user_id_int = int(user_id) except (ValueError, TypeError): app_logger.error(f"Ungültige User-ID: {user_id}") return None # Cache-Check mit TTL current_time = time.time() with _user_cache_lock: if user_id_int in _user_cache: cached_user, cache_time = _user_cache[user_id_int] if current_time - cache_time < USER_CACHE_TTL: return cached_user else: # Cache abgelaufen - entfernen del _user_cache[user_id_int] # Versuche Benutzer über robustes Caching-System zu laden try: from models import User cached_user = User.get_by_id_cached(user_id_int) if cached_user: # In lokalen Cache speichern with _user_cache_lock: _user_cache[user_id_int] = (cached_user, current_time) return cached_user except Exception as cache_error: app_logger.debug(f"Cache-Abfrage fehlgeschlagen: {str(cache_error)}") db_session = get_db_session() # Primäre Abfrage mit SQLAlchemy ORM try: user = db_session.query(User).filter(User.id == user_id_int).first() if user: # In Cache speichern with _user_cache_lock: _user_cache[user_id_int] = (user, current_time) db_session.close() return user except Exception as orm_error: # SQLAlchemy ORM-Fehler - versuche Core-Query app_logger.warning(f"ORM-Abfrage fehlgeschlagen für User-ID {user_id_int}: {str(orm_error)}") try: # Verwende SQLAlchemy Core für robuste Abfrage from sqlalchemy import text # Sichere Parameter-Bindung mit expliziter Typisierung stmt = text(""" SELECT id, email, username, password_hash, name, role, active, created_at, last_login, updated_at, settings, department, position, phone, bio, last_activity FROM users WHERE id = :user_id """) result = db_session.execute(stmt, {"user_id": user_id_int}).fetchone() if result: # User-Objekt manuell erstellen mit robusten Defaults user = User() # Sichere Feld-Zuordnung mit Fallbacks user.id = int(result[0]) if result[0] is not None else user_id_int user.email = str(result[1]) if result[1] else f"user_{user_id_int}@system.local" user.username = str(result[2]) if result[2] else f"user_{user_id_int}" user.password_hash = str(result[3]) if result[3] else "" user.name = str(result[4]) if result[4] else f"User {user_id_int}" user.role = str(result[5]) if result[5] else "user" user.active = bool(result[6]) if result[6] is not None else True # Datetime-Felder mit robuster Behandlung try: user.created_at = result[7] if result[7] else datetime.now() user.last_login = result[8] if result[8] else None user.updated_at = result[9] if result[9] else datetime.now() user.last_activity = result[15] if len(result) > 15 and result[15] else datetime.now() except (IndexError, TypeError, ValueError): user.created_at = datetime.now() user.last_login = None user.updated_at = datetime.now() user.last_activity = datetime.now() # Optional-Felder try: user.settings = result[10] if len(result) > 10 else None user.department = result[11] if len(result) > 11 else None user.position = result[12] if len(result) > 12 else None user.phone = result[13] if len(result) > 13 else None user.bio = result[14] if len(result) > 14 else None except (IndexError, TypeError): user.settings = None user.department = None user.position = None user.phone = None user.bio = None # In Cache speichern with _user_cache_lock: _user_cache[user_id_int] = (user, current_time) app_logger.info(f"User {user_id_int} erfolgreich über Core-Query geladen") db_session.close() return user except Exception as core_error: app_logger.error(f"Auch Core-Query fehlgeschlagen für User-ID {user_id_int}: {str(core_error)}") # Letzter Fallback: Minimale Existenz-Prüfung und Notfall-User try: exists_stmt = text("SELECT COUNT(*) FROM users WHERE id = :user_id") exists_result = db_session.execute(exists_stmt, {"user_id": user_id_int}).fetchone() if exists_result and exists_result[0] > 0: # User existiert - erstelle Notfall-Objekt user = User() user.id = user_id_int user.email = f"recovery_user_{user_id_int}@system.local" user.username = f"recovery_user_{user_id_int}" user.password_hash = "" user.name = f"Recovery User {user_id_int}" user.role = "user" user.active = True user.created_at = datetime.now() user.last_login = None user.updated_at = datetime.now() user.last_activity = datetime.now() # In Cache speichern with _user_cache_lock: _user_cache[user_id_int] = (user, current_time) app_logger.warning(f"Notfall-User-Objekt für ID {user_id_int} erstellt (DB korrupt)") db_session.close() return user except Exception as fallback_error: app_logger.error(f"Auch Fallback-User-Erstellung fehlgeschlagen: {str(fallback_error)}") db_session.close() return None except Exception as e: app_logger.error(f"Kritischer Fehler im User-Loader für ID {user_id}: {str(e)}") # Session sicher schließen falls noch offen try: if 'db_session' in locals(): db_session.close() except: pass return None # Jinja2 Context Processors @app.context_processor def inject_now(): """Inject the current datetime into templates.""" return {'now': datetime.now()} # Custom Jinja2 filter für Datumsformatierung @app.template_filter('format_datetime') def format_datetime_filter(value, format='%d.%m.%Y %H:%M'): """Format a datetime object to a German-style date and time string""" if value is None: return "" if isinstance(value, str): try: value = datetime.fromisoformat(value) except ValueError: return value return value.strftime(format) # Template-Helper für Optimierungsstatus @app.template_global() def is_optimized_mode(): """Prüft ob die Anwendung im optimierten Modus läuft""" return USE_OPTIMIZED_CONFIG @app.template_global() def get_optimization_info(): """Gibt Optimierungsinformationen für Templates zurück""" return { 'active': USE_OPTIMIZED_CONFIG, 'raspberry_pi': detect_raspberry_pi(), 'minified_assets': app.jinja_env.globals.get('use_minified_assets', False), 'disabled_animations': app.jinja_env.globals.get('disable_animations', False), 'limited_glassmorphism': app.jinja_env.globals.get('limit_glassmorphism', False) } # HTTP-Request/Response-Middleware für automatisches Debug-Logging @app.before_request def log_request_info(): """Loggt detaillierte Informationen über eingehende HTTP-Anfragen.""" # Nur für API-Endpunkte und wenn Debug-Level aktiviert ist if request.path.startswith('/api/') or app_logger.level <= logging.DEBUG: debug_request(app_logger, request) @app.after_request def log_response_info(response): """Loggt detaillierte Informationen über ausgehende HTTP-Antworten.""" # Nur für API-Endpunkte und wenn Debug-Level aktiviert ist if request.path.startswith('/api/') or app_logger.level <= logging.DEBUG: # Berechne Response-Zeit aus dem g-Objekt wenn verfügbar duration_ms = None if hasattr(request, '_start_time'): duration_ms = (time.time() - request._start_time) * 1000 debug_response(app_logger, response, duration_ms) return response # Start-Zeit für Request-Timing setzen @app.before_request def start_timer(): """Setzt einen Timer für die Request-Bearbeitung.""" request._start_time = time.time() # Sicheres Passwort-Hash für Kiosk-Deaktivierung KIOSK_PASSWORD_HASH = generate_password_hash("744563017196A") print("Alle Blueprints wurden in app.py integriert") # Custom decorator für Job-Besitzer-Check def job_owner_required(f): @wraps(f) def decorated_function(job_id, *args, **kwargs): db_session = get_db_session() job = db_session.query(Job).filter(Job.id == job_id).first() if not job: db_session.close() return jsonify({"error": "Job nicht gefunden"}), 404 is_owner = job.user_id == int(current_user.id) or job.owner_id == int(current_user.id) is_admin = current_user.is_admin if not (is_owner or is_admin): db_session.close() return jsonify({"error": "Keine Berechtigung"}), 403 db_session.close() return f(job_id, *args, **kwargs) return decorated_function # Custom decorator für Admin-Check def admin_required(f): @wraps(f) @login_required def decorated_function(*args, **kwargs): app_logger.info(f"Admin-Check für Funktion {f.__name__}: User authenticated: {current_user.is_authenticated}, User ID: {current_user.id if current_user.is_authenticated else 'None'}, Is Admin: {current_user.is_admin if current_user.is_authenticated else 'None'}") if not current_user.is_admin: app_logger.warning(f"Admin-Zugriff verweigert für User {current_user.id if current_user.is_authenticated else 'Anonymous'} auf Funktion {f.__name__}") return jsonify({"error": "Nur Administratoren haben Zugriff"}), 403 return f(*args, **kwargs) return decorated_function # ===== AUTHENTIFIZIERUNGS-ROUTEN (ehemals auth.py) ===== @app.route("/auth/login", methods=["GET", "POST"]) def login(): if current_user.is_authenticated: return redirect(url_for("index")) error = None if request.method == "POST": # Debug-Logging für Request-Details auth_logger.debug(f"Login-Request: Content-Type={request.content_type}, Headers={dict(request.headers)}") # Erweiterte Content-Type-Erkennung für AJAX-Anfragen content_type = request.content_type or "" is_json_request = ( request.is_json or "application/json" in content_type or request.headers.get('X-Requested-With') == 'XMLHttpRequest' or request.headers.get('Accept', '').startswith('application/json') ) # Robuste Datenextraktion username = None password = None remember_me = False try: if is_json_request: # JSON-Request verarbeiten try: data = request.get_json(force=True) or {} username = data.get("username") or data.get("email") password = data.get("password") remember_me = data.get("remember_me", False) except Exception as json_error: auth_logger.warning(f"JSON-Parsing fehlgeschlagen: {str(json_error)}") # Fallback zu Form-Daten username = request.form.get("email") password = request.form.get("password") remember_me = request.form.get("remember_me") == "on" else: # Form-Request verarbeiten username = request.form.get("email") password = request.form.get("password") remember_me = request.form.get("remember_me") == "on" # Zusätzlicher Fallback für verschiedene Feldnamen if not username: username = request.form.get("username") or request.values.get("email") or request.values.get("username") if not password: password = request.form.get("password") or request.values.get("password") except Exception as extract_error: auth_logger.error(f"Fehler beim Extrahieren der Login-Daten: {str(extract_error)}") error = "Fehler beim Verarbeiten der Anmeldedaten." if is_json_request: return jsonify({"error": error, "success": False}), 400 if not username or not password: error = "E-Mail-Adresse und Passwort müssen angegeben werden." auth_logger.warning(f"Unvollständige Login-Daten: username={bool(username)}, password={bool(password)}") if is_json_request: return jsonify({"error": error, "success": False}), 400 else: db_session = None try: db_session = get_db_session() # Suche nach Benutzer mit übereinstimmendem Benutzernamen oder E-Mail user = db_session.query(User).filter( (User.username == username) | (User.email == username) ).first() if user and user.check_password(password): # Update last login timestamp user.update_last_login() db_session.commit() # Cache invalidieren für diesen User clear_user_cache(user.id) login_user(user, remember=remember_me) auth_logger.info(f"Benutzer {username} hat sich erfolgreich angemeldet") next_page = request.args.get("next") if is_json_request: return jsonify({ "success": True, "message": "Anmeldung erfolgreich", "redirect_url": next_page or url_for("index") }) else: if next_page: return redirect(next_page) return redirect(url_for("index")) else: error = "Ungültige E-Mail-Adresse oder Passwort." auth_logger.warning(f"Fehlgeschlagener Login-Versuch für Benutzer {username}") if is_json_request: return jsonify({"error": error, "success": False}), 401 except Exception as e: # Fehlerbehandlung für Datenbankprobleme error = "Anmeldefehler. Bitte versuchen Sie es später erneut." auth_logger.error(f"Fehler bei der Anmeldung: {str(e)}") if is_json_request: return jsonify({"error": error, "success": False}), 500 finally: # Sicherstellen, dass die Datenbankverbindung geschlossen wird if db_session: try: db_session.close() except Exception as close_error: auth_logger.error(f"Fehler beim Schließen der DB-Session: {str(close_error)}") return render_template("login.html", error=error) @app.route("/auth/logout", methods=["GET", "POST"]) @login_required def auth_logout(): """Meldet den Benutzer ab.""" user_id = current_user.id app_logger.info(f"Benutzer {current_user.email} hat sich abgemeldet") logout_user() # Cache für abgemeldeten User löschen clear_user_cache(user_id) flash("Sie wurden erfolgreich abgemeldet.", "info") return redirect(url_for("login")) @app.route("/auth/reset-password-request", methods=["GET", "POST"]) def reset_password_request(): """Passwort-Reset anfordern (Placeholder).""" # TODO: Implement password reset functionality flash("Passwort-Reset-Funktionalität ist noch nicht implementiert.", "info") return redirect(url_for("login")) @app.route("/auth/api/login", methods=["POST"]) def api_login(): """API-Login-Endpunkt für Frontend""" try: data = request.get_json() if not data: return jsonify({"error": "Keine Daten erhalten"}), 400 username = data.get("username") password = data.get("password") remember_me = data.get("remember_me", False) if not username or not password: return jsonify({"error": "Benutzername und Passwort müssen angegeben werden"}), 400 db_session = get_db_session() user = db_session.query(User).filter( (User.username == username) | (User.email == username) ).first() if user and user.check_password(password): # Update last login timestamp user.update_last_login() db_session.commit() # Cache invalidieren für diesen User clear_user_cache(user.id) login_user(user, remember=remember_me) auth_logger.info(f"API-Login erfolgreich für Benutzer {username}") user_data = { "id": user.id, "username": user.username, "name": user.name, "email": user.email, "is_admin": user.is_admin } db_session.close() return jsonify({ "success": True, "user": user_data, "redirect_url": url_for("index") }) else: auth_logger.warning(f"Fehlgeschlagener API-Login für Benutzer {username}") db_session.close() return jsonify({"error": "Ungültiger Benutzername oder Passwort"}), 401 except Exception as e: auth_logger.error(f"Fehler beim API-Login: {str(e)}") return jsonify({"error": "Anmeldefehler. Bitte versuchen Sie es später erneut"}), 500 @app.route("/auth/api/callback", methods=["GET", "POST"]) def api_callback(): """OAuth-Callback-Endpunkt für externe Authentifizierung""" try: # OAuth-Provider bestimmen provider = request.args.get('provider', 'github') if request.method == "GET": # Authorization Code aus URL-Parameter extrahieren code = request.args.get('code') state = request.args.get('state') error = request.args.get('error') if error: auth_logger.warning(f"OAuth-Fehler von {provider}: {error}") return jsonify({ "error": f"OAuth-Authentifizierung fehlgeschlagen: {error}", "redirect_url": url_for("login") }), 400 if not code: auth_logger.warning(f"Kein Authorization Code von {provider} erhalten") return jsonify({ "error": "Kein Authorization Code erhalten", "redirect_url": url_for("login") }), 400 # State-Parameter validieren (CSRF-Schutz) session_state = session.get('oauth_state') if not state or state != session_state: auth_logger.warning(f"Ungültiger State-Parameter von {provider}") return jsonify({ "error": "Ungültiger State-Parameter", "redirect_url": url_for("login") }), 400 # OAuth-Token austauschen if provider == 'github': user_data = handle_github_callback(code) else: auth_logger.error(f"Unbekannter OAuth-Provider: {provider}") return jsonify({ "error": "Unbekannter OAuth-Provider", "redirect_url": url_for("login") }), 400 if not user_data: return jsonify({ "error": "Fehler beim Abrufen der Benutzerdaten", "redirect_url": url_for("login") }), 400 # Benutzer in Datenbank suchen oder erstellen db_session = get_db_session() try: user = db_session.query(User).filter( User.email == user_data['email'] ).first() if not user: # Neuen Benutzer erstellen user = User( username=user_data['username'], email=user_data['email'], name=user_data['name'], role="user", oauth_provider=provider, oauth_id=str(user_data['id']) ) # Zufälliges Passwort setzen (wird nicht verwendet) import secrets user.set_password(secrets.token_urlsafe(32)) db_session.add(user) db_session.commit() auth_logger.info(f"Neuer OAuth-Benutzer erstellt: {user.username} via {provider}") else: # Bestehenden Benutzer aktualisieren user.oauth_provider = provider user.oauth_id = str(user_data['id']) user.name = user_data['name'] user.updated_at = datetime.now() db_session.commit() auth_logger.info(f"OAuth-Benutzer aktualisiert: {user.username} via {provider}") # Update last login timestamp user.update_last_login() db_session.commit() # Cache invalidieren für diesen User clear_user_cache(user.id) login_user(user, remember=True) # Session-State löschen session.pop('oauth_state', None) response_data = { "success": True, "user": { "id": user.id, "username": user.username, "name": user.name, "email": user.email, "is_admin": user.is_admin }, "redirect_url": url_for("index") } db_session.close() return jsonify(response_data) except Exception as e: db_session.rollback() db_session.close() auth_logger.error(f"Datenbankfehler bei OAuth-Callback: {str(e)}") return jsonify({ "error": "Datenbankfehler bei der Benutzeranmeldung", "redirect_url": url_for("login") }), 500 elif request.method == "POST": # POST-Anfragen für manuelle Token-Übermittlung data = request.get_json() if not data: return jsonify({"error": "Keine Daten erhalten"}), 400 access_token = data.get('access_token') provider = data.get('provider', 'github') if not access_token: return jsonify({"error": "Kein Access Token erhalten"}), 400 # Benutzerdaten mit Access Token abrufen if provider == 'github': user_data = get_github_user_data(access_token) else: return jsonify({"error": "Unbekannter OAuth-Provider"}), 400 if not user_data: return jsonify({"error": "Fehler beim Abrufen der Benutzerdaten"}), 400 # Benutzer verarbeiten (gleiche Logik wie bei GET) db_session = get_db_session() try: user = db_session.query(User).filter( User.email == user_data['email'] ).first() if not user: user = User( username=user_data['username'], email=user_data['email'], name=user_data['name'], role="user", oauth_provider=provider, oauth_id=str(user_data['id']) ) import secrets user.set_password(secrets.token_urlsafe(32)) db_session.add(user) db_session.commit() auth_logger.info(f"Neuer OAuth-Benutzer erstellt: {user.username} via {provider}") else: user.oauth_provider = provider user.oauth_id = str(user_data['id']) user.name = user_data['name'] user.updated_at = datetime.now() db_session.commit() auth_logger.info(f"OAuth-Benutzer aktualisiert: {user.username} via {provider}") # Update last login timestamp user.update_last_login() db_session.commit() # Cache invalidieren für diesen User clear_user_cache(user.id) login_user(user, remember=True) response_data = { "success": True, "user": { "id": user.id, "username": user.username, "name": user.name, "email": user.email, "is_admin": user.is_admin }, "redirect_url": url_for("index") } db_session.close() return jsonify(response_data) except Exception as e: db_session.rollback() db_session.close() auth_logger.error(f"Datenbankfehler bei OAuth-Callback: {str(e)}") return jsonify({ "error": "Datenbankfehler bei der Benutzeranmeldung", "redirect_url": url_for("login") }), 500 except Exception as e: auth_logger.error(f"Fehler im OAuth-Callback: {str(e)}") return jsonify({ "error": "OAuth-Callback-Fehler", "redirect_url": url_for("login") }), 500 @lru_cache(maxsize=128) def handle_github_callback(code): """GitHub OAuth-Callback verarbeiten (mit Caching)""" try: import requests # GitHub OAuth-Konfiguration (sollte aus Umgebungsvariablen kommen) client_id = "7c5d8bef1a5519ec1fdc" client_secret = "5f1e586204358fbd53cf5fb7d418b3f06ccab8fd" if not client_id or not client_secret: auth_logger.error("GitHub OAuth-Konfiguration fehlt") return None # Access Token anfordern token_url = "https://github.com/login/oauth/access_token" token_data = { 'client_id': client_id, 'client_secret': client_secret, 'code': code } token_response = requests.post( token_url, data=token_data, headers={'Accept': 'application/json'}, timeout=10 ) if token_response.status_code != 200: auth_logger.error(f"GitHub Token-Anfrage fehlgeschlagen: {token_response.status_code}") return None token_json = token_response.json() access_token = token_json.get('access_token') if not access_token: auth_logger.error("Kein Access Token von GitHub erhalten") return None return get_github_user_data(access_token) except Exception as e: auth_logger.error(f"Fehler bei GitHub OAuth-Callback: {str(e)}") return None def get_github_user_data(access_token): """GitHub-Benutzerdaten mit Access Token abrufen""" try: import requests # Benutzerdaten von GitHub API abrufen user_url = "https://api.github.com/user" headers = { 'Authorization': f'token {access_token}', 'Accept': 'application/vnd.github.v3+json' } user_response = requests.get(user_url, headers=headers, timeout=10) if user_response.status_code != 200: auth_logger.error(f"GitHub User-API-Anfrage fehlgeschlagen: {user_response.status_code}") return None user_data = user_response.json() # E-Mail-Adresse separat abrufen (falls nicht öffentlich) email = user_data.get('email') if not email: email_url = "https://api.github.com/user/emails" email_response = requests.get(email_url, headers=headers, timeout=10) if email_response.status_code == 200: emails = email_response.json() # Primäre E-Mail-Adresse finden for email_obj in emails: if email_obj.get('primary', False): email = email_obj.get('email') break # Fallback: Erste E-Mail-Adresse verwenden if not email and emails: email = emails[0].get('email') if not email: auth_logger.error("Keine E-Mail-Adresse von GitHub erhalten") return None return { 'id': user_data.get('id'), 'username': user_data.get('login'), 'name': user_data.get('name') or user_data.get('login'), 'email': email } except Exception as e: auth_logger.error(f"Fehler beim Abrufen der GitHub-Benutzerdaten: {str(e)}") return None # ===== KIOSK-KONTROLL-ROUTEN (ehemals kiosk_control.py) ===== @app.route('/api/kiosk/status', methods=['GET']) def kiosk_get_status(): """Kiosk-Status abrufen.""" try: # Prüfen ob Kiosk-Modus aktiv ist kiosk_active = os.path.exists('/tmp/kiosk_active') return jsonify({ "active": kiosk_active, "message": "Kiosk-Status erfolgreich abgerufen" }) except Exception as e: kiosk_logger.error(f"Fehler beim Abrufen des Kiosk-Status: {str(e)}") return jsonify({"error": "Fehler beim Abrufen des Status"}), 500 @app.route('/api/kiosk/deactivate', methods=['POST']) def kiosk_deactivate(): """Kiosk-Modus mit Passwort deaktivieren.""" try: data = request.get_json() if not data or 'password' not in data: return jsonify({"error": "Passwort erforderlich"}), 400 password = data['password'] # Passwort überprüfen if not check_password_hash(KIOSK_PASSWORD_HASH, password): kiosk_logger.warning(f"Fehlgeschlagener Kiosk-Deaktivierungsversuch von IP: {request.remote_addr}") return jsonify({"error": "Ungültiges Passwort"}), 401 # Kiosk deaktivieren try: # Kiosk-Service stoppen subprocess.run(['sudo', 'systemctl', 'stop', 'myp-kiosk'], check=True) subprocess.run(['sudo', 'systemctl', 'disable', 'myp-kiosk'], check=True) # Kiosk-Marker entfernen if os.path.exists('/tmp/kiosk_active'): os.remove('/tmp/kiosk_active') # Normale Desktop-Umgebung wiederherstellen subprocess.run(['sudo', 'systemctl', 'set-default', 'graphical.target'], check=True) kiosk_logger.info(f"Kiosk-Modus erfolgreich deaktiviert von IP: {request.remote_addr}") return jsonify({ "success": True, "message": "Kiosk-Modus erfolgreich deaktiviert. System wird neu gestartet." }) except subprocess.CalledProcessError as e: kiosk_logger.error(f"Fehler beim Deaktivieren des Kiosk-Modus: {str(e)}") return jsonify({"error": "Fehler beim Deaktivieren des Kiosk-Modus"}), 500 except Exception as e: kiosk_logger.error(f"Unerwarteter Fehler bei Kiosk-Deaktivierung: {str(e)}") return jsonify({"error": "Unerwarteter Fehler"}), 500 @app.route('/api/kiosk/activate', methods=['POST']) @login_required def kiosk_activate(): """Kiosk-Modus aktivieren (nur für Admins).""" try: # Admin-Authentifizierung prüfen if not current_user.is_admin: kiosk_logger.warning(f"Nicht-Admin-Benutzer {current_user.username} versuchte Kiosk-Aktivierung") return jsonify({"error": "Nur Administratoren können den Kiosk-Modus aktivieren"}), 403 # Kiosk aktivieren try: # Kiosk-Marker setzen with open('/tmp/kiosk_active', 'w') as f: f.write('1') # Kiosk-Service aktivieren subprocess.run(['sudo', 'systemctl', 'enable', 'myp-kiosk'], check=True) subprocess.run(['sudo', 'systemctl', 'start', 'myp-kiosk'], check=True) kiosk_logger.info(f"Kiosk-Modus erfolgreich aktiviert von Admin {current_user.username} (IP: {request.remote_addr})") return jsonify({ "success": True, "message": "Kiosk-Modus erfolgreich aktiviert" }) except subprocess.CalledProcessError as e: kiosk_logger.error(f"Fehler beim Aktivieren des Kiosk-Modus: {str(e)}") return jsonify({"error": "Fehler beim Aktivieren des Kiosk-Modus"}), 500 except Exception as e: kiosk_logger.error(f"Unerwarteter Fehler bei Kiosk-Aktivierung: {str(e)}") return jsonify({"error": "Unerwarteter Fehler"}), 500 @app.route('/api/kiosk/restart', methods=['POST']) def kiosk_restart_system(): """System neu starten (nur nach Kiosk-Deaktivierung).""" try: data = request.get_json() if not data or 'password' not in data: return jsonify({"error": "Passwort erforderlich"}), 400 password = data['password'] # Passwort überprüfen if not check_password_hash(KIOSK_PASSWORD_HASH, password): kiosk_logger.warning(f"Fehlgeschlagener Neustart-Versuch von IP: {request.remote_addr}") return jsonify({"error": "Ungültiges Passwort"}), 401 kiosk_logger.info(f"System-Neustart initiiert von IP: {request.remote_addr}") # System nach kurzer Verzögerung neu starten subprocess.Popen(['sudo', 'shutdown', '-r', '+1']) return jsonify({ "success": True, "message": "System wird in 1 Minute neu gestartet" }) except Exception as e: kiosk_logger.error(f"Fehler beim System-Neustart: {str(e)}") return jsonify({"error": "Fehler beim Neustart"}), 500 # ===== ERWEITERTE SYSTEM-CONTROL API-ENDPUNKTE ===== @app.route('/api/admin/system/restart', methods=['POST']) @login_required @admin_required def api_admin_system_restart(): """Robuster System-Neustart mit Sicherheitsprüfungen.""" try: from utils.system_control import schedule_system_restart data = request.get_json() or {} delay_seconds = data.get('delay_seconds', 60) reason = data.get('reason', 'Manueller Admin-Neustart') force = data.get('force', False) # Begrenze Verzögerung auf sinnvolle Werte delay_seconds = max(10, min(3600, delay_seconds)) # 10s bis 1h result = schedule_system_restart( delay_seconds=delay_seconds, user_id=str(current_user.id), reason=reason, force=force ) if result.get('success'): app_logger.warning(f"System-Neustart geplant von Admin {current_user.username}: {reason}") return jsonify(result) else: return jsonify(result), 400 except Exception as e: app_logger.error(f"Fehler bei System-Neustart-Planung: {e}") return jsonify({"success": False, "error": str(e)}), 500 @app.route('/api/admin/system/shutdown', methods=['POST']) @login_required @admin_required def api_admin_system_shutdown(): """Robuster System-Shutdown mit Sicherheitsprüfungen.""" try: from utils.system_control import schedule_system_shutdown data = request.get_json() or {} delay_seconds = data.get('delay_seconds', 30) reason = data.get('reason', 'Manueller Admin-Shutdown') force = data.get('force', False) # Begrenze Verzögerung auf sinnvolle Werte delay_seconds = max(10, min(3600, delay_seconds)) # 10s bis 1h result = schedule_system_shutdown( delay_seconds=delay_seconds, user_id=str(current_user.id), reason=reason, force=force ) if result.get('success'): app_logger.warning(f"System-Shutdown geplant von Admin {current_user.username}: {reason}") return jsonify(result) else: return jsonify(result), 400 except Exception as e: app_logger.error(f"Fehler bei System-Shutdown-Planung: {e}") return jsonify({"success": False, "error": str(e)}), 500 @app.route('/api/admin/kiosk/restart', methods=['POST']) @login_required @admin_required def api_admin_kiosk_restart(): """Kiosk-Display neustarten ohne System-Neustart.""" try: from utils.system_control import restart_kiosk data = request.get_json() or {} delay_seconds = data.get('delay_seconds', 10) reason = data.get('reason', 'Manueller Kiosk-Neustart') # Begrenze Verzögerung delay_seconds = max(0, min(300, delay_seconds)) # 0s bis 5min result = restart_kiosk( delay_seconds=delay_seconds, user_id=str(current_user.id), reason=reason ) if result.get('success'): app_logger.info(f"Kiosk-Neustart geplant von Admin {current_user.username}: {reason}") return jsonify(result) else: return jsonify(result), 400 except Exception as e: app_logger.error(f"Fehler bei Kiosk-Neustart-Planung: {e}") return jsonify({"success": False, "error": str(e)}), 500 @app.route('/api/admin/system/status', methods=['GET']) @login_required @admin_required def api_admin_system_status_extended(): """Erweiterte System-Status-Informationen.""" try: from utils.system_control import get_system_status from utils.error_recovery import get_error_recovery_manager # System-Control-Status system_status = get_system_status() # Error-Recovery-Status error_manager = get_error_recovery_manager() error_stats = error_manager.get_error_statistics() # Kombiniere alle Informationen combined_status = { **system_status, "error_recovery": error_stats, "resilience_features": { "auto_recovery_enabled": error_stats.get('auto_recovery_enabled', False), "monitoring_active": error_stats.get('monitoring_active', False), "recovery_success_rate": error_stats.get('recovery_success_rate', 0) } } return jsonify(combined_status) except Exception as e: app_logger.error(f"Fehler bei System-Status-Abfrage: {e}") return jsonify({"success": False, "error": str(e)}), 500 @app.route('/api/admin/system/operations', methods=['GET']) @login_required @admin_required def api_admin_system_operations(): """Gibt geplante und vergangene System-Operationen zurück.""" try: from utils.system_control import get_system_control_manager manager = get_system_control_manager() return jsonify({ "success": True, "pending_operations": manager.get_pending_operations(), "operation_history": manager.get_operation_history(limit=50) }) except Exception as e: app_logger.error(f"Fehler bei Operations-Abfrage: {e}") return jsonify({"success": False, "error": str(e)}), 500 @app.route('/api/admin/system/operations//cancel', methods=['POST']) @login_required @admin_required def api_admin_cancel_operation(operation_id): """Bricht geplante System-Operation ab.""" try: from utils.system_control import get_system_control_manager manager = get_system_control_manager() result = manager.cancel_operation(operation_id) if result.get('success'): app_logger.info(f"Operation {operation_id} abgebrochen von Admin {current_user.username}") return jsonify(result) else: return jsonify(result), 400 except Exception as e: app_logger.error(f"Fehler beim Abbrechen von Operation {operation_id}: {e}") return jsonify({"success": False, "error": str(e)}), 500 @app.route('/api/admin/error-recovery/status', methods=['GET']) @login_required @admin_required def api_admin_error_recovery_status(): """Gibt Error-Recovery-Status und -Statistiken zurück.""" try: from utils.error_recovery import get_error_recovery_manager manager = get_error_recovery_manager() return jsonify({ "success": True, "statistics": manager.get_error_statistics(), "recent_errors": manager.get_recent_errors(limit=20) }) except Exception as e: app_logger.error(f"Fehler bei Error-Recovery-Status-Abfrage: {e}") return jsonify({"success": False, "error": str(e)}), 500 @app.route('/api/admin/error-recovery/toggle', methods=['POST']) @login_required @admin_required def api_admin_toggle_error_recovery(): """Aktiviert/Deaktiviert Error-Recovery-Monitoring.""" try: from utils.error_recovery import get_error_recovery_manager data = request.get_json() or {} enable = data.get('enable', True) manager = get_error_recovery_manager() if enable: manager.start_monitoring() message = "Error-Recovery-Monitoring aktiviert" else: manager.stop_monitoring() message = "Error-Recovery-Monitoring deaktiviert" app_logger.info(f"{message} von Admin {current_user.username}") return jsonify({ "success": True, "message": message, "monitoring_active": manager.is_active }) except Exception as e: app_logger.error(f"Fehler beim Toggle von Error-Recovery: {e}") return jsonify({"success": False, "error": str(e)}), 500 # ===== BENUTZER-ROUTEN (ehemals user.py) ===== @app.route("/user/profile", methods=["GET"]) @login_required def user_profile(): """Profil-Seite anzeigen""" user_logger.info(f"Benutzer {current_user.username} hat seine Profilseite aufgerufen") return render_template("profile.html", user=current_user) @app.route("/user/settings", methods=["GET"]) @login_required def user_settings(): """Einstellungen-Seite anzeigen""" user_logger.info(f"Benutzer {current_user.username} hat seine Einstellungsseite aufgerufen") return render_template("settings.html", user=current_user) @app.route("/user/update-profile", methods=["POST"]) @login_required def user_update_profile(): """Benutzerprofilinformationen aktualisieren""" try: # Überprüfen, ob es sich um eine JSON-Anfrage handelt is_json_request = request.is_json or request.headers.get('Content-Type') == 'application/json' if is_json_request: data = request.get_json() name = data.get("name") email = data.get("email") department = data.get("department") position = data.get("position") phone = data.get("phone") else: name = request.form.get("name") email = request.form.get("email") department = request.form.get("department") position = request.form.get("position") phone = request.form.get("phone") db_session = get_db_session() user = db_session.query(User).filter(User.id == int(current_user.id)).first() if user: # Aktualisiere die Benutzerinformationen if name: user.name = name if email: user.email = email if department: user.department = department if position: user.position = position if phone: user.phone = phone user.updated_at = datetime.now() db_session.commit() user_logger.info(f"Benutzer {current_user.username} hat sein Profil aktualisiert") if is_json_request: return jsonify({ "success": True, "message": "Profil erfolgreich aktualisiert" }) else: flash("Profil erfolgreich aktualisiert", "success") return redirect(url_for("user_profile")) else: error = "Benutzer nicht gefunden." if is_json_request: return jsonify({"error": error}), 404 else: flash(error, "error") return redirect(url_for("user_profile")) except Exception as e: error = f"Fehler beim Aktualisieren des Profils: {str(e)}" user_logger.error(error) if request.is_json: return jsonify({"error": error}), 500 else: flash(error, "error") return redirect(url_for("user_profile")) finally: db_session.close() @app.route("/user/api/update-settings", methods=["POST"]) @login_required def user_api_update_settings(): """API-Endpunkt für Einstellungen-Updates (JSON)""" return user_update_profile() @app.route("/user/update-settings", methods=["POST"]) @login_required def user_update_settings(): """Benutzereinstellungen aktualisieren""" db_session = get_db_session() try: # Überprüfen, ob es sich um eine JSON-Anfrage handelt is_json_request = request.is_json or request.headers.get('Content-Type') == 'application/json' # Einstellungen aus der Anfrage extrahieren if is_json_request: data = request.get_json() if not data: return jsonify({"error": "Keine Daten empfangen"}), 400 theme = data.get("theme", "system") reduced_motion = bool(data.get("reduced_motion", False)) contrast = data.get("contrast", "normal") notifications = data.get("notifications", {}) privacy = data.get("privacy", {}) else: theme = request.form.get("theme", "system") reduced_motion = request.form.get("reduced_motion") == "on" contrast = request.form.get("contrast", "normal") notifications = { "new_jobs": request.form.get("notify_new_jobs") == "on", "job_updates": request.form.get("notify_job_updates") == "on", "system": request.form.get("notify_system") == "on", "email": request.form.get("notify_email") == "on" } privacy = { "activity_logs": request.form.get("activity_logs") == "on", "two_factor": request.form.get("two_factor") == "on", "auto_logout": int(request.form.get("auto_logout", "60")) } # Validierung der Eingaben valid_themes = ["light", "dark", "system"] if theme not in valid_themes: theme = "system" valid_contrasts = ["normal", "high"] if contrast not in valid_contrasts: contrast = "normal" # Benutzer aus der Datenbank laden user = db_session.query(User).filter(User.id == int(current_user.id)).first() if not user: error = "Benutzer nicht gefunden." if is_json_request: return jsonify({"error": error}), 404 else: flash(error, "error") return redirect(url_for("user_settings")) # Einstellungen-Dictionary erstellen settings = { "theme": theme, "reduced_motion": reduced_motion, "contrast": contrast, "notifications": { "new_jobs": bool(notifications.get("new_jobs", True)), "job_updates": bool(notifications.get("job_updates", True)), "system": bool(notifications.get("system", True)), "email": bool(notifications.get("email", False)) }, "privacy": { "activity_logs": bool(privacy.get("activity_logs", True)), "two_factor": bool(privacy.get("two_factor", False)), "auto_logout": max(5, min(480, int(privacy.get("auto_logout", 60)))) # 5-480 Minuten }, "last_updated": datetime.now().isoformat() } # Prüfen, ob User-Tabelle eine settings-Spalte hat if hasattr(user, 'settings'): # Einstellungen in der Datenbank speichern import json user.settings = json.dumps(settings) else: # Fallback: In Session speichern (temporär) session['user_settings'] = settings user.updated_at = datetime.now() db_session.commit() user_logger.info(f"Benutzer {current_user.username} hat seine Einstellungen aktualisiert") if is_json_request: return jsonify({ "success": True, "message": "Einstellungen erfolgreich aktualisiert", "settings": settings }) else: flash("Einstellungen erfolgreich aktualisiert", "success") return redirect(url_for("user_settings")) except ValueError as e: error = f"Ungültige Eingabedaten: {str(e)}" user_logger.warning(f"Ungültige Einstellungsdaten von Benutzer {current_user.username}: {str(e)}") if is_json_request: return jsonify({"error": error}), 400 else: flash(error, "error") return redirect(url_for("user_settings")) except Exception as e: db_session.rollback() error = f"Fehler beim Aktualisieren der Einstellungen: {str(e)}" user_logger.error(f"Fehler beim Aktualisieren der Einstellungen für Benutzer {current_user.username}: {str(e)}") if is_json_request: return jsonify({"error": "Interner Serverfehler"}), 500 else: flash("Fehler beim Speichern der Einstellungen", "error") return redirect(url_for("user_settings")) finally: db_session.close() @app.route("/api/user/settings", methods=["GET", "POST"]) @login_required def get_user_settings(): """Holt die aktuellen Benutzereinstellungen (GET) oder speichert sie (POST)""" if request.method == "GET": try: # Einstellungen aus Session oder Datenbank laden user_settings = session.get('user_settings', {}) # Standard-Einstellungen falls keine vorhanden default_settings = { "theme": "system", "reduced_motion": False, "contrast": "normal", "notifications": { "new_jobs": True, "job_updates": True, "system": True, "email": False }, "privacy": { "activity_logs": True, "two_factor": False, "auto_logout": 60 } } # Merge mit Standard-Einstellungen settings = {**default_settings, **user_settings} return jsonify({ "success": True, "settings": settings }) except Exception as e: user_logger.error(f"Fehler beim Laden der Benutzereinstellungen: {str(e)}") return jsonify({ "success": False, "error": "Fehler beim Laden der Einstellungen" }), 500 elif request.method == "POST": """Benutzereinstellungen über API aktualisieren""" db_session = get_db_session() try: # JSON-Daten extrahieren if not request.is_json: return jsonify({"error": "Anfrage muss im JSON-Format sein"}), 400 data = request.get_json() if not data: return jsonify({"error": "Keine Daten empfangen"}), 400 # Einstellungen aus der Anfrage extrahieren theme = data.get("theme", "system") reduced_motion = bool(data.get("reduced_motion", False)) contrast = data.get("contrast", "normal") notifications = data.get("notifications", {}) privacy = data.get("privacy", {}) # Validierung der Eingaben valid_themes = ["light", "dark", "system"] if theme not in valid_themes: theme = "system" valid_contrasts = ["normal", "high"] if contrast not in valid_contrasts: contrast = "normal" # Benutzer aus der Datenbank laden user = db_session.query(User).filter(User.id == int(current_user.id)).first() if not user: return jsonify({"error": "Benutzer nicht gefunden"}), 404 # Einstellungen-Dictionary erstellen settings = { "theme": theme, "reduced_motion": reduced_motion, "contrast": contrast, "notifications": { "new_jobs": bool(notifications.get("new_jobs", True)), "job_updates": bool(notifications.get("job_updates", True)), "system": bool(notifications.get("system", True)), "email": bool(notifications.get("email", False)) }, "privacy": { "activity_logs": bool(privacy.get("activity_logs", True)), "two_factor": bool(privacy.get("two_factor", False)), "auto_logout": max(5, min(480, int(privacy.get("auto_logout", 60)))) # 5-480 Minuten }, "last_updated": datetime.now().isoformat() } # Prüfen, ob User-Tabelle eine settings-Spalte hat if hasattr(user, 'settings'): # Einstellungen in der Datenbank speichern import json user.settings = json.dumps(settings) else: # Fallback: In Session speichern (temporär) session['user_settings'] = settings user.updated_at = datetime.now() db_session.commit() user_logger.info(f"Benutzer {current_user.username} hat seine Einstellungen über die API aktualisiert") return jsonify({ "success": True, "message": "Einstellungen erfolgreich aktualisiert", "settings": settings }) except ValueError as e: error = f"Ungültige Eingabedaten: {str(e)}" user_logger.warning(f"Ungültige Einstellungsdaten von Benutzer {current_user.username}: {str(e)}") return jsonify({"error": error}), 400 except Exception as e: db_session.rollback() error = f"Fehler beim Aktualisieren der Einstellungen: {str(e)}" user_logger.error(f"Fehler beim Aktualisieren der Einstellungen für Benutzer {current_user.username}: {str(e)}") return jsonify({"error": "Interner Serverfehler"}), 500 finally: db_session.close() @app.route("/user/change-password", methods=["POST"]) @login_required def user_change_password(): """Benutzerpasswort ändern""" try: # Überprüfen, ob es sich um eine JSON-Anfrage handelt is_json_request = request.is_json or request.headers.get('Content-Type') == 'application/json' if is_json_request: data = request.get_json() current_password = data.get("current_password") new_password = data.get("new_password") confirm_password = data.get("confirm_password") else: current_password = request.form.get("current_password") new_password = request.form.get("new_password") confirm_password = request.form.get("confirm_password") # Prüfen, ob alle Felder ausgefüllt sind if not current_password or not new_password or not confirm_password: error = "Alle Passwortfelder müssen ausgefüllt sein." if is_json_request: return jsonify({"error": error}), 400 else: flash(error, "error") return redirect(url_for("user_profile")) # Prüfen, ob das neue Passwort und die Bestätigung übereinstimmen if new_password != confirm_password: error = "Das neue Passwort und die Bestätigung stimmen nicht überein." if is_json_request: return jsonify({"error": error}), 400 else: flash(error, "error") return redirect(url_for("user_profile")) db_session = get_db_session() user = db_session.query(User).filter(User.id == int(current_user.id)).first() if user and user.check_password(current_password): # Passwort aktualisieren user.set_password(new_password) user.updated_at = datetime.now() db_session.commit() user_logger.info(f"Benutzer {current_user.username} hat sein Passwort geändert") if is_json_request: return jsonify({ "success": True, "message": "Passwort erfolgreich geändert" }) else: flash("Passwort erfolgreich geändert", "success") return redirect(url_for("user_profile")) else: error = "Das aktuelle Passwort ist nicht korrekt." if is_json_request: return jsonify({"error": error}), 401 else: flash(error, "error") return redirect(url_for("user_profile")) except Exception as e: error = f"Fehler beim Ändern des Passworts: {str(e)}" user_logger.error(error) if request.is_json: return jsonify({"error": error}), 500 else: flash(error, "error") return redirect(url_for("user_profile")) finally: db_session.close() @app.route("/user/export", methods=["GET"]) @login_required def user_export_data(): """Exportiert alle Benutzerdaten als JSON für DSGVO-Konformität""" try: db_session = get_db_session() user = db_session.query(User).filter(User.id == int(current_user.id)).first() if not user: db_session.close() return jsonify({"error": "Benutzer nicht gefunden"}), 404 # Benutzerdaten abrufen user_data = user.to_dict() # Jobs des Benutzers abrufen jobs = db_session.query(Job).filter(Job.user_id == user.id).all() user_data["jobs"] = [job.to_dict() for job in jobs] # Aktivitäten und Einstellungen hinzufügen user_data["settings"] = session.get('user_settings', {}) # Persönliche Statistiken user_data["statistics"] = { "total_jobs": len(jobs), "completed_jobs": len([j for j in jobs if j.status == "finished"]), "failed_jobs": len([j for j in jobs if j.status == "failed"]), "account_created": user.created_at.isoformat() if user.created_at else None, "last_login": user.last_login.isoformat() if user.last_login else None } db_session.close() # Daten als JSON-Datei zum Download anbieten response = make_response(json.dumps(user_data, indent=4)) response.headers["Content-Disposition"] = f"attachment; filename=user_data_{user.username}.json" response.headers["Content-Type"] = "application/json" user_logger.info(f"Benutzer {current_user.username} hat seine Daten exportiert") return response except Exception as e: error = f"Fehler beim Exportieren der Benutzerdaten: {str(e)}" user_logger.error(error) return jsonify({"error": error}), 500 @app.route("/user/profile", methods=["PUT"]) @login_required def user_update_profile_api(): """API-Endpunkt zum Aktualisieren des Benutzerprofils""" try: if not request.is_json: return jsonify({"error": "Anfrage muss im JSON-Format sein"}), 400 data = request.get_json() db_session = get_db_session() user = db_session.get(User, int(current_user.id)) if not user: db_session.close() return jsonify({"error": "Benutzer nicht gefunden"}), 404 # Aktualisiere nur die bereitgestellten Felder if "name" in data: user.name = data["name"] if "email" in data: user.email = data["email"] if "department" in data: user.department = data["department"] if "position" in data: user.position = data["position"] if "phone" in data: user.phone = data["phone"] if "bio" in data: user.bio = data["bio"] user.updated_at = datetime.now() db_session.commit() # Aktualisierte Benutzerdaten zurückgeben user_data = user.to_dict() db_session.close() user_logger.info(f"Benutzer {current_user.username} hat sein Profil über die API aktualisiert") return jsonify({ "success": True, "message": "Profil erfolgreich aktualisiert", "user": user_data }) except Exception as e: error = f"Fehler beim Aktualisieren des Profils: {str(e)}" user_logger.error(error) return jsonify({"error": error}), 500 # ===== HILFSFUNKTIONEN ===== @measure_execution_time(logger=printers_logger, task_name="Drucker-Status-Prüfung") def check_printer_status(ip_address: str, timeout: int = 7) -> Tuple[str, bool]: """ Überprüft den Status eines Druckers anhand der Steckdosen-Logik: - Steckdose erreichbar aber AUS = Drucker ONLINE (bereit zum Drucken) - Steckdose erreichbar und AN = Drucker PRINTING (druckt gerade) - Steckdose nicht erreichbar = Drucker OFFLINE (kritischer Fehler) Args: ip_address: IP-Adresse des Druckers oder der Steckdose timeout: Timeout in Sekunden Returns: Tuple[str, bool]: (Status, Erreichbarkeit) """ status = "offline" reachable = False try: # Überprüfen, ob die Steckdose erreichbar ist import socket # Erst Port 9999 versuchen (Tapo-Standard) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) result = sock.connect_ex((ip_address, 9999)) sock.close() if result == 0: reachable = True try: # TP-Link Tapo Steckdose mit PyP100 überprüfen from PyP100 import PyP100 p100 = PyP100.P100(ip_address, TAPO_USERNAME, TAPO_PASSWORD) p100.handshake() # Authentifizierung p100.login() # Login # Geräteinformationen abrufen device_info = p100.getDeviceInfo() # 🎯 KORREKTE LOGIK: Status auswerten if device_info.get('device_on', False): # Steckdose an = Drucker PRINTING (druckt gerade) status = "printing" printers_logger.info(f"🖨️ Drucker {ip_address}: PRINTING (Steckdose an - druckt gerade)") else: # Steckdose aus = Drucker ONLINE (bereit zum Drucken) status = "online" printers_logger.info(f"✅ Drucker {ip_address}: ONLINE (Steckdose aus - bereit zum Drucken)") except Exception as e: printers_logger.error(f"❌ Fehler bei Tapo-Status-Check für {ip_address}: {str(e)}") reachable = False status = "error" else: # Steckdose nicht erreichbar = kritischer Fehler printers_logger.warning(f"❌ Drucker {ip_address}: OFFLINE (Steckdose nicht erreichbar)") reachable = False status = "offline" except Exception as e: printers_logger.error(f"❌ Unerwarteter Fehler bei Status-Check für {ip_address}: {str(e)}") reachable = False status = "error" return status, reachable @measure_execution_time(logger=printers_logger, task_name="Mehrere-Drucker-Status-Prüfung") def check_multiple_printers_status(printers: List[Dict], timeout: int = 7) -> Dict[int, Tuple[str, bool]]: """ Überprüft den Status mehrerer Drucker parallel. Args: printers: Liste der zu prüfenden Drucker timeout: Timeout für jeden einzelnen Drucker Returns: Dict[int, Tuple[str, bool]]: Dictionary mit Drucker-ID als Key und (Status, Aktiv) als Value """ results = {} # Wenn keine Drucker vorhanden sind, gebe leeres Dict zurück if not printers: printers_logger.info("ℹ️ Keine Drucker zum Status-Check gefunden") return results printers_logger.info(f"🔍 Prüfe Status von {len(printers)} Druckern parallel...") # Parallel-Ausführung mit ThreadPoolExecutor # Sicherstellen, dass max_workers mindestens 1 ist max_workers = min(max(len(printers), 1), 10) with ThreadPoolExecutor(max_workers=max_workers) as executor: # Futures für alle Drucker erstellen future_to_printer = { executor.submit(check_printer_status, printer.get('ip_address'), timeout): printer for printer in printers } # Ergebnisse sammeln for future in as_completed(future_to_printer, timeout=timeout + 2): printer = future_to_printer[future] try: status, active = future.result() results[printer['id']] = (status, active) printers_logger.info(f"Drucker {printer['name']} ({printer.get('ip_address')}): {status}") except Exception as e: printers_logger.error(f"Fehler bei Status-Check für Drucker {printer['name']}: {str(e)}") results[printer['id']] = ("offline", False) printers_logger.info(f"✅ Status-Check abgeschlossen für {len(results)} Drucker") return results # ===== UI-ROUTEN ===== @app.route("/admin-dashboard") @login_required @admin_required def admin_page(): """Admin-Dashboard-Seite mit Live-Funktionen""" # Daten für das Template sammeln (gleiche Logik wie admin-dashboard) db_session = get_db_session() try: # Erfolgsrate berechnen completed_jobs = db_session.query(Job).filter(Job.status == 'completed').count() if db_session else 0 total_jobs = db_session.query(Job).count() if db_session else 0 success_rate = round((completed_jobs / total_jobs * 100), 1) if total_jobs > 0 else 0 # Statistiken sammeln stats = { 'total_users': db_session.query(User).count(), 'total_printers': db_session.query(Printer).count(), 'online_printers': db_session.query(Printer).filter(Printer.status == 'online').count(), 'active_jobs': db_session.query(Job).filter(Job.status.in_(['running', 'queued'])).count(), 'queued_jobs': db_session.query(Job).filter(Job.status == 'queued').count(), 'success_rate': success_rate } # Tab-Parameter mit erweiterten Optionen active_tab = request.args.get('tab', 'users') valid_tabs = ['users', 'printers', 'jobs', 'system', 'logs'] # Validierung des Tab-Parameters if active_tab not in valid_tabs: active_tab = 'users' # Benutzer laden (für users tab) users = [] if active_tab == 'users': users = db_session.query(User).all() # Drucker laden (für printers tab) printers = [] if active_tab == 'printers': printers = db_session.query(Printer).all() db_session.close() return render_template("admin.html", stats=stats, active_tab=active_tab, users=users, printers=printers) except Exception as e: app_logger.error(f"Fehler beim Laden der Admin-Daten: {str(e)}") db_session.close() flash("Fehler beim Laden des Admin-Bereichs.", "error") return redirect(url_for("index")) @app.route("/") def index(): if current_user.is_authenticated: return render_template("index.html") return redirect(url_for("login")) @app.route("/dashboard") @login_required def dashboard(): return render_template("dashboard.html") @app.route("/profile") @login_required def profile_redirect(): """Leitet zur neuen Profilseite im User-Blueprint weiter.""" return redirect(url_for("user_profile")) @app.route("/profil") @login_required def profil_redirect(): """Leitet zur neuen Profilseite im User-Blueprint weiter (deutsche URL).""" return redirect(url_for("user_profile")) @app.route("/settings") @login_required def settings_redirect(): """Leitet zur neuen Einstellungsseite im User-Blueprint weiter.""" return redirect(url_for("user_settings")) @app.route("/einstellungen") @login_required def einstellungen_redirect(): """Leitet zur neuen Einstellungsseite im User-Blueprint weiter (deutsche URL).""" return redirect(url_for("user_settings")) @app.route("/admin") @login_required @admin_required def admin(): return render_template(url_for("admin_page")) @app.route("/socket-test") @login_required @admin_required def socket_test(): """ Steckdosen-Test-Seite für Ausbilder und Administratoren. """ app_logger.info(f"Admin {current_user.name} hat die Steckdosen-Test-Seite aufgerufen") return render_template("socket_test.html") @app.route("/demo") @login_required def components_demo(): """Demo-Seite für UI-Komponenten""" return render_template("components_demo.html") @app.route("/printers") @login_required def printers_page(): """Zeigt die Übersichtsseite für Drucker an.""" return render_template("printers.html") @app.route("/jobs") @login_required def jobs_page(): """Zeigt die Übersichtsseite für Druckaufträge an.""" return render_template("jobs.html") @app.route("/jobs/new") @login_required def new_job_page(): """Zeigt die Seite zum Erstellen neuer Druckaufträge an.""" return render_template("jobs.html") @app.route("/stats") @login_required def stats_page(): """Zeigt die Statistiken-Seite an""" return render_template("stats.html", title="Statistiken") @app.route("/privacy") def privacy(): """Datenschutzerklärung-Seite""" return render_template("privacy.html", title="Datenschutzerklärung") @app.route("/terms") def terms(): """Nutzungsbedingungen-Seite""" return render_template("terms.html", title="Nutzungsbedingungen") @app.route("/imprint") def imprint(): """Impressum-Seite""" return render_template("imprint.html", title="Impressum") @app.route("/legal") def legal(): """Rechtliche Hinweise-Übersichtsseite""" return render_template("legal.html", title="Rechtliche Hinweise") # ===== NEUE SYSTEM UI-ROUTEN ===== @app.route("/dashboard/realtime") @login_required def realtime_dashboard(): """Echtzeit-Dashboard mit WebSocket-Updates""" return render_template("realtime_dashboard.html", title="Echtzeit-Dashboard") @app.route("/reports") @login_required def reports_page(): """Reports-Generierung-Seite""" return render_template("reports.html", title="Reports") @app.route("/maintenance") @login_required def maintenance_page(): """Wartungs-Management-Seite""" return render_template("maintenance.html", title="Wartung") @app.route("/locations") @login_required @admin_required def locations_page(): """Multi-Location-System Verwaltungsseite.""" return render_template("locations.html", title="Standortverwaltung") @app.route("/admin/steckdosenschaltzeiten") @login_required @admin_required def admin_plug_schedules(): """ Administrator-Übersicht für Steckdosenschaltzeiten. Zeigt detaillierte Historie aller Smart Plug Schaltzeiten mit Kalenderansicht. """ app_logger.info(f"Admin {current_user.name} (ID: {current_user.id}) öffnet Steckdosenschaltzeiten") try: # Statistiken für die letzten 24 Stunden abrufen stats_24h = PlugStatusLog.get_status_statistics(hours=24) # Alle Drucker für Filter-Dropdown db_session = get_db_session() printers = db_session.query(Printer).filter(Printer.active == True).all() db_session.close() return render_template('admin_plug_schedules.html', stats=stats_24h, printers=printers, page_title="Steckdosenschaltzeiten", breadcrumb=[ {"name": "Admin-Dashboard", "url": url_for("admin_page")}, {"name": "Steckdosenschaltzeiten", "url": "#"} ]) except Exception as e: app_logger.error(f"Fehler beim Laden der Steckdosenschaltzeiten-Seite: {str(e)}") flash("Fehler beim Laden der Steckdosenschaltzeiten-Daten.", "error") return redirect(url_for("admin_page")) @app.route("/validation-demo") @login_required def validation_demo(): """Formular-Validierung Demo-Seite""" return render_template("validation_demo.html", title="Formular-Validierung Demo") @app.route("/tables-demo") @login_required def tables_demo(): """Advanced Tables Demo-Seite""" return render_template("tables_demo.html", title="Erweiterte Tabellen Demo") @app.route("/dragdrop-demo") @login_required def dragdrop_demo(): """Drag & Drop Demo-Seite""" return render_template("dragdrop_demo.html", title="Drag & Drop Demo") # ===== ERROR MONITORING SYSTEM ===== @app.route("/api/admin/system-health", methods=['GET']) @login_required @admin_required def api_admin_system_health(): """API-Endpunkt für System-Gesundheitscheck mit erweiterten Fehlermeldungen.""" try: critical_errors = [] warnings = [] # 1. Datenbankverbindung prüfen try: db_session = get_db_session() db_session.execute(text("SELECT 1")).fetchone() db_session.close() except Exception as e: critical_errors.append({ "type": "critical", "title": "Datenbankverbindung fehlgeschlagen", "description": f"Keine Verbindung zur Datenbank möglich: {str(e)[:100]}", "solution": "Datenbankdienst neustarten oder Konfiguration prüfen", "timestamp": datetime.now().isoformat() }) # 2. Verfügbaren Speicherplatz prüfen try: import shutil total, used, free = shutil.disk_usage("/") free_percentage = (free / total) * 100 if free_percentage < 5: critical_errors.append({ "type": "critical", "title": "Kritischer Speicherplatz", "description": f"Nur noch {free_percentage:.1f}% Speicherplatz verfügbar", "solution": "Temporäre Dateien löschen oder Speicher erweitern", "timestamp": datetime.now().isoformat() }) elif free_percentage < 15: warnings.append({ "type": "warning", "title": "Wenig Speicherplatz", "description": f"Nur noch {free_percentage:.1f}% Speicherplatz verfügbar", "solution": "Aufräumen empfohlen", "timestamp": datetime.now().isoformat() }) except Exception as e: warnings.append({ "type": "warning", "title": "Speicherplatz-Prüfung fehlgeschlagen", "description": f"Konnte Speicherplatz nicht prüfen: {str(e)[:100]}", "solution": "Manuell prüfen", "timestamp": datetime.now().isoformat() }) # 3. Upload-Ordner-Struktur prüfen upload_paths = [ "uploads/jobs", "uploads/avatars", "uploads/assets", "uploads/backups", "uploads/logs", "uploads/temp" ] for path in upload_paths: full_path = os.path.join(current_app.root_path, path) if not os.path.exists(full_path): warnings.append({ "type": "warning", "title": f"Upload-Ordner fehlt: {path}", "description": f"Der Upload-Ordner {path} existiert nicht", "solution": "Ordner automatisch erstellen lassen", "timestamp": datetime.now().isoformat() }) # 4. Log-Dateien-Größe prüfen try: logs_dir = os.path.join(current_app.root_path, "logs") if os.path.exists(logs_dir): total_log_size = sum( os.path.getsize(os.path.join(logs_dir, f)) for f in os.listdir(logs_dir) if os.path.isfile(os.path.join(logs_dir, f)) ) # Größe in MB log_size_mb = total_log_size / (1024 * 1024) if log_size_mb > 500: # > 500 MB warnings.append({ "type": "warning", "title": "Große Log-Dateien", "description": f"Log-Dateien belegen {log_size_mb:.1f} MB Speicherplatz", "solution": "Log-Rotation oder Archivierung empfohlen", "timestamp": datetime.now().isoformat() }) except Exception as e: app_logger.warning(f"Fehler beim Prüfen der Log-Dateien-Größe: {str(e)}") # 5. Aktive Drucker-Verbindungen prüfen try: db_session = get_db_session() total_printers = db_session.query(Printer).count() online_printers = db_session.query(Printer).filter(Printer.status == 'online').count() db_session.close() if total_printers > 0: offline_percentage = ((total_printers - online_printers) / total_printers) * 100 if offline_percentage > 50: warnings.append({ "type": "warning", "title": "Viele Drucker offline", "description": f"{offline_percentage:.0f}% der Drucker sind offline", "solution": "Drucker-Verbindungen überprüfen", "timestamp": datetime.now().isoformat() }) except Exception as e: app_logger.warning(f"Fehler beim Prüfen der Drucker-Status: {str(e)}") # Dashboard-Event senden emit_system_alert( "System-Gesundheitscheck durchgeführt", alert_type="info" if not critical_errors else "warning", priority="normal" if not critical_errors else "high" ) health_status = "healthy" if not critical_errors else "unhealthy" return jsonify({ "success": True, "health_status": health_status, "critical_errors": critical_errors, "warnings": warnings, "timestamp": datetime.now().isoformat(), "summary": { "total_issues": len(critical_errors) + len(warnings), "critical_count": len(critical_errors), "warning_count": len(warnings) } }) except Exception as e: app_logger.error(f"Fehler beim System-Gesundheitscheck: {str(e)}") return jsonify({ "success": False, "error": str(e), "health_status": "error" }), 500 @app.route("/api/admin/fix-errors", methods=['POST']) @login_required @admin_required def api_admin_fix_errors(): """API-Endpunkt für automatische Fehlerbehebung.""" try: fixed_issues = [] failed_fixes = [] # 1. Fehlende Upload-Ordner erstellen upload_paths = [ "uploads/jobs", "uploads/avatars", "uploads/assets", "uploads/backups", "uploads/logs", "uploads/temp", "uploads/guests" # Ergänzt um guests ] for path in upload_paths: full_path = os.path.join(current_app.root_path, path) if not os.path.exists(full_path): try: os.makedirs(full_path, exist_ok=True) fixed_issues.append(f"Upload-Ordner {path} erstellt") app_logger.info(f"Upload-Ordner automatisch erstellt: {full_path}") except Exception as e: failed_fixes.append(f"Konnte Upload-Ordner {path} nicht erstellen: {str(e)}") app_logger.error(f"Fehler beim Erstellen des Upload-Ordners {path}: {str(e)}") # 2. Temporäre Dateien aufräumen (älter als 24 Stunden) try: temp_path = os.path.join(current_app.root_path, "uploads/temp") if os.path.exists(temp_path): now = time.time() cleaned_files = 0 for filename in os.listdir(temp_path): file_path = os.path.join(temp_path, filename) if os.path.isfile(file_path): # Dateien älter als 24 Stunden löschen if now - os.path.getmtime(file_path) > 24 * 3600: try: os.remove(file_path) cleaned_files += 1 except Exception as e: app_logger.warning(f"Konnte temporäre Datei nicht löschen {filename}: {str(e)}") if cleaned_files > 0: fixed_issues.append(f"{cleaned_files} alte temporäre Dateien gelöscht") app_logger.info(f"Automatische Bereinigung: {cleaned_files} temporäre Dateien gelöscht") except Exception as e: failed_fixes.append(f"Temporäre Dateien Bereinigung fehlgeschlagen: {str(e)}") app_logger.error(f"Fehler bei der temporären Dateien Bereinigung: {str(e)}") # 3. Datenbankverbindung wiederherstellen try: db_session = get_db_session() db_session.execute(text("SELECT 1")).fetchone() db_session.close() fixed_issues.append("Datenbankverbindung erfolgreich getestet") except Exception as e: failed_fixes.append(f"Datenbankverbindung konnte nicht wiederhergestellt werden: {str(e)}") app_logger.error(f"Datenbankverbindung Wiederherstellung fehlgeschlagen: {str(e)}") # 4. Log-Rotation durchführen bei großen Log-Dateien try: logs_dir = os.path.join(current_app.root_path, "logs") if os.path.exists(logs_dir): rotated_logs = 0 for log_file in os.listdir(logs_dir): log_path = os.path.join(logs_dir, log_file) if os.path.isfile(log_path) and log_file.endswith('.log'): # Log-Dateien größer als 10 MB rotieren if os.path.getsize(log_path) > 10 * 1024 * 1024: try: # Backup erstellen backup_name = f"{log_file}.{datetime.now().strftime('%Y%m%d_%H%M%S')}.bak" backup_path = os.path.join(logs_dir, backup_name) shutil.copy2(log_path, backup_path) # Log-Datei leeren (aber nicht löschen) with open(log_path, 'w') as f: f.write(f"# Log rotiert am {datetime.now().isoformat()}\n") rotated_logs += 1 except Exception as e: app_logger.warning(f"Konnte Log-Datei nicht rotieren {log_file}: {str(e)}") if rotated_logs > 0: fixed_issues.append(f"{rotated_logs} große Log-Dateien rotiert") app_logger.info(f"Automatische Log-Rotation: {rotated_logs} Dateien rotiert") except Exception as e: failed_fixes.append(f"Log-Rotation fehlgeschlagen: {str(e)}") app_logger.error(f"Fehler bei der Log-Rotation: {str(e)}") # 5. Offline-Drucker Reconnect versuchen try: db_session = get_db_session() offline_printers = db_session.query(Printer).filter(Printer.status != 'online').all() reconnected_printers = 0 for printer in offline_printers: try: # Status-Check durchführen if printer.plug_ip: status, is_reachable = check_printer_status(printer.plug_ip, timeout=3) if is_reachable: printer.status = 'online' reconnected_printers += 1 except Exception as e: app_logger.debug(f"Drucker {printer.name} Reconnect fehlgeschlagen: {str(e)}") if reconnected_printers > 0: db_session.commit() fixed_issues.append(f"{reconnected_printers} Drucker wieder online") app_logger.info(f"Automatischer Drucker-Reconnect: {reconnected_printers} Drucker") db_session.close() except Exception as e: failed_fixes.append(f"Drucker-Reconnect fehlgeschlagen: {str(e)}") app_logger.error(f"Fehler beim Drucker-Reconnect: {str(e)}") # Ergebnis zusammenfassen total_fixed = len(fixed_issues) total_failed = len(failed_fixes) success = total_fixed > 0 or total_failed == 0 app_logger.info(f"Automatische Fehlerbehebung abgeschlossen: {total_fixed} behoben, {total_failed} fehlgeschlagen") return jsonify({ "success": success, "message": f"Automatische Reparatur abgeschlossen: {total_fixed} Probleme behoben" + (f", {total_failed} fehlgeschlagen" if total_failed > 0 else ""), "fixed_issues": fixed_issues, "failed_fixes": failed_fixes, "summary": { "total_fixed": total_fixed, "total_failed": total_failed }, "timestamp": datetime.now().isoformat() }) except Exception as e: app_logger.error(f"Fehler bei der automatischen Fehlerbehebung: {str(e)}") return jsonify({ "success": False, "error": str(e), "message": "Automatische Fehlerbehebung fehlgeschlagen" }), 500 @app.route("/api/admin/system-health-dashboard", methods=['GET']) @login_required @admin_required def api_admin_system_health_dashboard(): """API-Endpunkt für System-Gesundheitscheck mit Dashboard-Integration.""" try: # Basis-System-Gesundheitscheck durchführen critical_errors = [] warnings = [] # Dashboard-Event für System-Check senden emit_system_alert( "System-Gesundheitscheck durchgeführt", alert_type="info", priority="normal" ) return jsonify({ "success": True, "health_status": "healthy", "critical_errors": critical_errors, "warnings": warnings, "timestamp": datetime.now().isoformat() }) except Exception as e: app_logger.error(f"Fehler beim System-Gesundheitscheck: {str(e)}") return jsonify({ "success": False, "error": str(e) }), 500 def admin_printer_settings_page(printer_id): """Zeigt die Drucker-Einstellungsseite an.""" if not current_user.is_admin: flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error") return redirect(url_for("index")) db_session = get_db_session() try: printer = db_session.get(Printer, printer_id) if not printer: flash("Drucker nicht gefunden.", "error") return redirect(url_for("admin_page")) printer_data = { "id": printer.id, "name": printer.name, "model": printer.model or 'Unbekanntes Modell', "location": printer.location or 'Unbekannter Standort', "mac_address": printer.mac_address, "plug_ip": printer.plug_ip, "status": printer.status or "offline", "active": printer.active if hasattr(printer, 'active') else True, "created_at": printer.created_at.isoformat() if printer.created_at else datetime.now().isoformat() } db_session.close() return render_template("admin_printer_settings.html", printer=printer_data) except Exception as e: db_session.close() app_logger.error(f"Fehler beim Laden der Drucker-Einstellungen: {str(e)}") flash("Fehler beim Laden der Drucker-Daten.", "error") return redirect(url_for("admin_page")) @app.route("/admin/guest-requests") @login_required @admin_required def admin_guest_requests(): """Admin-Seite für Gastanfragen Verwaltung""" try: app_logger.info(f"Admin-Gastanfragen Seite aufgerufen von User {current_user.id}") return render_template("admin_guest_requests.html") except Exception as e: app_logger.error(f"Fehler beim Laden der Admin-Gastanfragen Seite: {str(e)}") flash("Fehler beim Laden der Gastanfragen-Verwaltung.", "danger") return redirect(url_for("admin")) @app.route("/requests/overview") @login_required @admin_required def admin_guest_requests_overview(): """Admin-Oberfläche für die Verwaltung von Gastanfragen mit direkten Aktionen.""" try: app_logger.info(f"Admin-Gastanträge Übersicht aufgerufen von User {current_user.id}") return render_template("admin_guest_requests_overview.html") except Exception as e: app_logger.error(f"Fehler beim Laden der Admin-Gastanträge Übersicht: {str(e)}") flash("Fehler beim Laden der Gastanträge-Übersicht.", "danger") return redirect(url_for("admin")) # ===== ADMIN API-ROUTEN FÜR BENUTZER UND DRUCKER ===== @app.route("/api/admin/users", methods=["POST"]) @login_required def create_user_api(): """Erstellt einen neuen Benutzer (nur für Admins).""" if not current_user.is_admin: return jsonify({"error": "Nur Administratoren können Benutzer erstellen"}), 403 try: # JSON-Daten sicher extrahieren data = request.get_json() if not data: return jsonify({"error": "Keine JSON-Daten empfangen"}), 400 # Pflichtfelder prüfen mit detaillierteren Meldungen required_fields = ["username", "email", "password"] missing_fields = [] for field in required_fields: if field not in data: missing_fields.append(f"'{field}' fehlt") elif not data[field] or not str(data[field]).strip(): missing_fields.append(f"'{field}' ist leer") if missing_fields: return jsonify({ "error": "Pflichtfelder fehlen oder sind leer", "details": missing_fields }), 400 # Daten extrahieren und bereinigen username = str(data["username"]).strip() email = str(data["email"]).strip().lower() password = str(data["password"]) name = str(data.get("name", "")).strip() # E-Mail-Validierung import re email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' if not re.match(email_pattern, email): return jsonify({"error": "Ungültige E-Mail-Adresse"}), 400 # Username-Validierung (nur alphanumerische Zeichen und Unterstriche) username_pattern = r'^[a-zA-Z0-9_]{3,30}$' if not re.match(username_pattern, username): return jsonify({ "error": "Ungültiger Benutzername", "details": "Benutzername muss 3-30 Zeichen lang sein und darf nur Buchstaben, Zahlen und Unterstriche enthalten" }), 400 # Passwort-Validierung if len(password) < 6: return jsonify({ "error": "Passwort zu kurz", "details": "Passwort muss mindestens 6 Zeichen lang sein" }), 400 # Starke Passwort-Validierung (optional) if len(password) < 8: user_logger.warning(f"Schwaches Passwort für neuen Benutzer {username}") db_session = get_db_session() try: # Prüfen, ob bereits ein Benutzer mit diesem Benutzernamen existiert existing_username = db_session.query(User).filter(User.username == username).first() if existing_username: db_session.close() return jsonify({ "error": "Benutzername bereits vergeben", "details": f"Ein Benutzer mit dem Benutzernamen '{username}' existiert bereits" }), 400 # Prüfen, ob bereits ein Benutzer mit dieser E-Mail existiert existing_email = db_session.query(User).filter(User.email == email).first() if existing_email: db_session.close() return jsonify({ "error": "E-Mail-Adresse bereits vergeben", "details": f"Ein Benutzer mit der E-Mail-Adresse '{email}' existiert bereits" }), 400 # Rolle bestimmen is_admin = bool(data.get("is_admin", False)) role = "admin" if is_admin else "user" # Neuen Benutzer erstellen new_user = User( username=username, email=email, name=name if name else username, # Fallback auf username wenn name leer role=role, active=True, created_at=datetime.now() ) # Optionale Felder setzen if "department" in data and data["department"]: new_user.department = str(data["department"]).strip() if "position" in data and data["position"]: new_user.position = str(data["position"]).strip() if "phone" in data and data["phone"]: new_user.phone = str(data["phone"]).strip() # Passwort setzen new_user.set_password(password) # Benutzer zur Datenbank hinzufügen db_session.add(new_user) db_session.commit() # Erfolgreiche Antwort mit Benutzerdaten user_data = { "id": new_user.id, "username": new_user.username, "email": new_user.email, "name": new_user.name, "role": new_user.role, "is_admin": new_user.is_admin, "active": new_user.active, "department": new_user.department, "position": new_user.position, "phone": new_user.phone, "created_at": new_user.created_at.isoformat() } db_session.close() user_logger.info(f"Neuer Benutzer '{new_user.username}' ({new_user.email}) erfolgreich erstellt von Admin {current_user.id}") return jsonify({ "success": True, "message": f"Benutzer '{new_user.username}' erfolgreich erstellt", "user": user_data }), 201 except Exception as db_error: db_session.rollback() db_session.close() user_logger.error(f"Datenbankfehler beim Erstellen des Benutzers: {str(db_error)}") return jsonify({ "error": "Datenbankfehler beim Erstellen des Benutzers", "details": "Bitte versuchen Sie es erneut" }), 500 except ValueError as ve: user_logger.warning(f"Validierungsfehler beim Erstellen eines Benutzers: {str(ve)}") return jsonify({ "error": "Ungültige Eingabedaten", "details": str(ve) }), 400 except Exception as e: user_logger.error(f"Unerwarteter Fehler beim Erstellen eines Benutzers: {str(e)}") return jsonify({ "error": "Interner Serverfehler", "details": "Ein unerwarteter Fehler ist aufgetreten" }), 500 @app.route("/api/admin/users/", methods=["GET"]) @login_required @admin_required def get_user_api(user_id): """Gibt einen einzelnen Benutzer zurück (nur für Admins).""" try: db_session = get_db_session() user = db_session.get(User, user_id) if not user: db_session.close() return jsonify({"error": "Benutzer nicht gefunden"}), 404 user_data = { "id": user.id, "username": user.username, "email": user.email, "name": user.name or "", "role": user.role, "is_admin": user.is_admin, "is_active": user.is_active, "created_at": user.created_at.isoformat() if user.created_at else None, "last_login": user.last_login.isoformat() if hasattr(user, 'last_login') and user.last_login else None } db_session.close() return jsonify({"success": True, "user": user_data}) except Exception as e: user_logger.error(f"Fehler beim Abrufen des Benutzers {user_id}: {str(e)}") return jsonify({"error": "Interner Serverfehler"}), 500 @app.route("/api/admin/users/", methods=["PUT"]) @login_required @admin_required def update_user_api(user_id): """Aktualisiert einen Benutzer (nur für Admins).""" try: data = request.json db_session = get_db_session() user = db_session.get(User, user_id) if not user: db_session.close() return jsonify({"error": "Benutzer nicht gefunden"}), 404 # Prüfen, ob bereits ein anderer Benutzer mit dieser E-Mail existiert if "email" in data and data["email"] != user.email: existing_user = db_session.query(User).filter( User.email == data["email"], User.id != user_id ).first() if existing_user: db_session.close() return jsonify({"error": "Ein Benutzer mit dieser E-Mail-Adresse existiert bereits"}), 400 # Aktualisierbare Felder if "email" in data: user.email = data["email"] if "username" in data: user.username = data["username"] if "name" in data: user.name = data["name"] if "is_admin" in data: user.role = "admin" if data["is_admin"] else "user" if "is_active" in data: user.is_active = data["is_active"] # Passwort separat behandeln if "password" in data and data["password"]: user.set_password(data["password"]) db_session.commit() user_data = { "id": user.id, "username": user.username, "email": user.email, "name": user.name, "role": user.role, "is_admin": user.is_admin, "is_active": user.is_active, "created_at": user.created_at.isoformat() if user.created_at else None } db_session.close() user_logger.info(f"Benutzer {user_id} aktualisiert von Admin {current_user.id}") return jsonify({"success": True, "user": user_data}) except Exception as e: user_logger.error(f"Fehler beim Aktualisieren des Benutzers {user_id}: {str(e)}") return jsonify({"error": "Interner Serverfehler"}), 500 @app.route("/api/admin/printers//toggle", methods=["POST"]) @login_required def toggle_printer_power(printer_id): """ Schaltet einen Drucker über die zugehörige Steckdose ein/aus. """ if not current_user.is_admin: return jsonify({"error": "Administratorrechte erforderlich"}), 403 try: # Robuste JSON-Datenverarbeitung data = {} try: if request.is_json and request.get_json(): data = request.get_json() elif request.form: # Fallback für Form-Daten data = request.form.to_dict() except Exception as json_error: printers_logger.warning(f"Fehler beim Parsen der JSON-Daten für Drucker {printer_id}: {str(json_error)}") # Verwende Standard-Werte wenn JSON-Parsing fehlschlägt data = {} # Standard-Zustand ermitteln (Toggle-Verhalten) db_session = get_db_session() printer = db_session.get(Printer, printer_id) if not printer: db_session.close() return jsonify({"error": "Drucker nicht gefunden"}), 404 # Aktuellen Status ermitteln für Toggle-Verhalten current_status = getattr(printer, 'status', 'offline') current_active = getattr(printer, 'active', False) # Zielzustand bestimmen if 'state' in data: # Expliziter Zustand angegeben state = bool(data.get("state", True)) else: # Toggle-Verhalten: Umschalten basierend auf aktuellem Status state = not (current_status == "available" and current_active) db_session.close() # Steckdose schalten from utils.job_scheduler import toggle_plug success = toggle_plug(printer_id, state) if success: action = "eingeschaltet" if state else "ausgeschaltet" printers_logger.info(f"Drucker {printer.name} (ID: {printer_id}) erfolgreich {action} von Admin {current_user.name}") return jsonify({ "success": True, "message": f"Drucker erfolgreich {action}", "printer_id": printer_id, "printer_name": printer.name, "state": state, "action": action }) else: printers_logger.error(f"Fehler beim Schalten der Steckdose für Drucker {printer_id}") return jsonify({ "success": False, "error": "Fehler beim Schalten der Steckdose", "printer_id": printer_id }), 500 except Exception as e: printers_logger.error(f"Fehler beim Schalten von Drucker {printer_id}: {str(e)}") return jsonify({ "success": False, "error": "Interner Serverfehler", "details": str(e) }), 500 @app.route("/api/admin/printers//test-tapo", methods=["POST"]) @login_required @admin_required def test_printer_tapo_connection(printer_id): """ Testet die Tapo-Steckdosen-Verbindung für einen Drucker. """ try: db_session = get_db_session() printer = db_session.get(Printer, printer_id) if not printer: db_session.close() return jsonify({"error": "Drucker nicht gefunden"}), 404 if not printer.plug_ip or not printer.plug_username or not printer.plug_password: db_session.close() return jsonify({ "error": "Unvollständige Tapo-Konfiguration", "missing": [ key for key, value in { "plug_ip": printer.plug_ip, "plug_username": printer.plug_username, "plug_password": printer.plug_password }.items() if not value ] }), 400 db_session.close() # Tapo-Verbindung testen from utils.job_scheduler import test_tapo_connection test_result = test_tapo_connection( printer.plug_ip, printer.plug_username, printer.plug_password ) return jsonify({ "printer_id": printer_id, "printer_name": printer.name, "tapo_test": test_result }) except Exception as e: printers_logger.error(f"Fehler beim Testen der Tapo-Verbindung für Drucker {printer_id}: {str(e)}") return jsonify({"error": "Interner Serverfehler beim Verbindungstest"}), 500 @app.route("/api/admin/printers/test-all-tapo", methods=["POST"]) @login_required @admin_required def test_all_printers_tapo_connection(): """ Testet die Tapo-Steckdosen-Verbindung für alle Drucker. Nützlich für Diagnose und Setup-Validierung. """ try: db_session = get_db_session() printers = db_session.query(Printer).filter(Printer.active == True).all() db_session.close() if not printers: return jsonify({ "message": "Keine aktiven Drucker gefunden", "results": [] }) # Alle Drucker testen from utils.job_scheduler import test_tapo_connection results = [] for printer in printers: result = { "printer_id": printer.id, "printer_name": printer.name, "plug_ip": printer.plug_ip, "has_config": bool(printer.plug_ip and printer.plug_username and printer.plug_password) } if result["has_config"]: # Tapo-Verbindung testen test_result = test_tapo_connection( printer.plug_ip, printer.plug_username, printer.plug_password ) result["tapo_test"] = test_result else: result["tapo_test"] = { "success": False, "error": "Unvollständige Tapo-Konfiguration", "device_info": None, "status": "unconfigured" } result["missing_config"] = [ key for key, value in { "plug_ip": printer.plug_ip, "plug_username": printer.plug_username, "plug_password": printer.plug_password }.items() if not value ] results.append(result) # Zusammenfassung erstellen total_printers = len(results) successful_connections = sum(1 for r in results if r["tapo_test"]["success"]) configured_printers = sum(1 for r in results if r["has_config"]) return jsonify({ "summary": { "total_printers": total_printers, "configured_printers": configured_printers, "successful_connections": successful_connections, "success_rate": round(successful_connections / total_printers * 100, 1) if total_printers > 0 else 0 }, "results": results }) except Exception as e: printers_logger.error(f"Fehler beim Testen aller Tapo-Verbindungen: {str(e)}") return jsonify({"error": "Interner Serverfehler beim Massentest"}), 500 # ===== ADMIN FORM ENDPOINTS ===== @app.route("/admin/users/add", methods=["GET"]) @login_required @admin_required def admin_add_user_page(): """Zeigt die Seite zum Hinzufügen neuer Benutzer an.""" try: app_logger.info(f"Admin-Benutzer-Hinzufügen-Seite aufgerufen von User {current_user.id}") return render_template("admin_add_user.html") except Exception as e: app_logger.error(f"Fehler beim Laden der Benutzer-Hinzufügen-Seite: {str(e)}") flash("Fehler beim Laden der Benutzer-Hinzufügen-Seite.", "error") return redirect(url_for("admin_page", tab="users")) @app.route("/admin/printers/add", methods=["GET"]) @login_required @admin_required def admin_add_printer_page(): """Zeigt die Seite zum Hinzufügen neuer Drucker an.""" try: app_logger.info(f"Admin-Drucker-Hinzufügen-Seite aufgerufen von User {current_user.id}") return render_template("admin_add_printer.html") except Exception as e: app_logger.error(f"Fehler beim Laden der Drucker-Hinzufügen-Seite: {str(e)}") flash("Fehler beim Laden der Drucker-Hinzufügen-Seite.", "error") return redirect(url_for("admin_page", tab="printers")) @app.route("/admin/printers//edit", methods=["GET"]) @login_required @admin_required def admin_edit_printer_page(printer_id): """Zeigt die Drucker-Bearbeitungsseite an.""" try: db_session = get_db_session() printer = db_session.get(Printer, printer_id) if not printer: db_session.close() flash("Drucker nicht gefunden.", "error") return redirect(url_for("admin_page", tab="printers")) printer_data = { "id": printer.id, "name": printer.name, "model": printer.model or 'Unbekanntes Modell', "location": printer.location or 'Unbekannter Standort', "mac_address": printer.mac_address, "plug_ip": printer.plug_ip, "status": printer.status or "offline", "active": printer.active if hasattr(printer, 'active') else True, "created_at": printer.created_at.isoformat() if printer.created_at else datetime.now().isoformat() } db_session.close() app_logger.info(f"Admin-Drucker-Bearbeiten-Seite aufgerufen für Drucker {printer_id} von User {current_user.id}") return render_template("admin_edit_printer.html", printer=printer_data) except Exception as e: app_logger.error(f"Fehler beim Laden der Drucker-Bearbeitungsseite: {str(e)}") flash("Fehler beim Laden der Drucker-Daten.", "error") return redirect(url_for("admin_page", tab="printers")) @app.route("/admin/users/create", methods=["POST"]) @login_required def admin_create_user_form(): """Erstellt einen neuen Benutzer über HTML-Form (nur für Admins).""" if not current_user.is_admin: flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error") return redirect(url_for("index")) try: # Form-Daten lesen email = request.form.get("email", "").strip() name = request.form.get("name", "").strip() password = request.form.get("password", "").strip() role = request.form.get("role", "user").strip() # Pflichtfelder prüfen if not email or not password: flash("E-Mail und Passwort sind erforderlich.", "error") return redirect(url_for("admin_add_user_page")) # E-Mail validieren import re email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' if not re.match(email_pattern, email): flash("Ungültige E-Mail-Adresse.", "error") return redirect(url_for("admin_add_user_page")) db_session = get_db_session() # Prüfen, ob bereits ein Benutzer mit dieser E-Mail existiert existing_user = db_session.query(User).filter(User.email == email).first() if existing_user: db_session.close() flash("Ein Benutzer mit dieser E-Mail existiert bereits.", "error") return redirect(url_for("admin_add_user_page")) # E-Mail als Username verwenden (falls kein separates Username-Feld) username = email.split('@')[0] counter = 1 original_username = username while db_session.query(User).filter(User.username == username).first(): username = f"{original_username}{counter}" counter += 1 # Neuen Benutzer erstellen new_user = User( username=username, email=email, name=name, role=role, created_at=datetime.now() ) # Passwort setzen new_user.set_password(password) db_session.add(new_user) db_session.commit() db_session.close() user_logger.info(f"Neuer Benutzer '{new_user.username}' erstellt von Admin {current_user.id}") flash(f"Benutzer '{new_user.email}' erfolgreich erstellt.", "success") return redirect(url_for("admin_page", tab="users")) except Exception as e: user_logger.error(f"Fehler beim Erstellen eines Benutzers über Form: {str(e)}") flash("Fehler beim Erstellen des Benutzers.", "error") return redirect(url_for("admin_add_user_page")) @app.route("/admin/printers/create", methods=["POST"]) @login_required def admin_create_printer_form(): """Erstellt einen neuen Drucker über HTML-Form (nur für Admins).""" if not current_user.is_admin: flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error") return redirect(url_for("index")) try: # Form-Daten lesen name = request.form.get("name", "").strip() ip_address = request.form.get("ip_address", "").strip() model = request.form.get("model", "").strip() location = request.form.get("location", "").strip() description = request.form.get("description", "").strip() status = request.form.get("status", "available").strip() # Pflichtfelder prüfen if not name or not ip_address: flash("Name und IP-Adresse sind erforderlich.", "error") return redirect(url_for("admin_add_printer_page")) # IP-Adresse validieren import re ip_pattern = r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$' if not re.match(ip_pattern, ip_address): flash("Ungültige IP-Adresse.", "error") return redirect(url_for("admin_add_printer_page")) db_session = get_db_session() # Prüfen, ob bereits ein Drucker mit diesem Namen existiert existing_printer = db_session.query(Printer).filter(Printer.name == name).first() if existing_printer: db_session.close() flash("Ein Drucker mit diesem Namen existiert bereits.", "error") return redirect(url_for("admin_add_printer_page")) # Neuen Drucker erstellen new_printer = Printer( name=name, model=model, location=location, description=description, mac_address="", # Wird später ausgefüllt plug_ip=ip_address, status=status, created_at=datetime.now() ) db_session.add(new_printer) db_session.commit() db_session.close() printers_logger.info(f"Neuer Drucker '{new_printer.name}' erstellt von Admin {current_user.id}") flash(f"Drucker '{new_printer.name}' erfolgreich erstellt.", "success") return redirect(url_for("admin_page", tab="printers")) except Exception as e: printers_logger.error(f"Fehler beim Erstellen eines Druckers über Form: {str(e)}") flash("Fehler beim Erstellen des Druckers.", "error") return redirect(url_for("admin_add_printer_page")) @app.route("/admin/users//edit", methods=["GET"]) @login_required def admin_edit_user_page(user_id): """Zeigt die Benutzer-Bearbeitungsseite an.""" if not current_user.is_admin: flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error") return redirect(url_for("index")) db_session = get_db_session() try: user = db_session.get(User, user_id) if not user: flash("Benutzer nicht gefunden.", "error") return redirect(url_for("admin_page", tab="users")) user_data = { "id": user.id, "username": user.username, "email": user.email, "name": user.name or "", "is_admin": user.is_admin, "active": user.active, "created_at": user.created_at.isoformat() if user.created_at else datetime.now().isoformat() } db_session.close() return render_template("admin_edit_user.html", user=user_data) except Exception as e: db_session.close() app_logger.error(f"Fehler beim Laden der Benutzer-Daten: {str(e)}") flash("Fehler beim Laden der Benutzer-Daten.", "error") return redirect(url_for("admin_page", tab="users")) @app.route("/admin/users//update", methods=["POST"]) @login_required def admin_update_user_form(user_id): """Aktualisiert einen Benutzer über HTML-Form (nur für Admins).""" if not current_user.is_admin: flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error") return redirect(url_for("index")) try: # Form-Daten lesen email = request.form.get("email", "").strip() name = request.form.get("name", "").strip() password = request.form.get("password", "").strip() role = request.form.get("role", "user").strip() is_active = request.form.get("is_active", "true").strip() == "true" # Pflichtfelder prüfen if not email: flash("E-Mail-Adresse ist erforderlich.", "error") return redirect(url_for("admin_edit_user_page", user_id=user_id)) # E-Mail validieren import re email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' if not re.match(email_pattern, email): flash("Ungültige E-Mail-Adresse.", "error") return redirect(url_for("admin_edit_user_page", user_id=user_id)) db_session = get_db_session() user = db_session.get(User, user_id) if not user: db_session.close() flash("Benutzer nicht gefunden.", "error") return redirect(url_for("admin_page", tab="users")) # Prüfen, ob bereits ein anderer Benutzer mit dieser E-Mail existiert existing_user = db_session.query(User).filter( User.email == email, User.id != user_id ).first() if existing_user: db_session.close() flash("Ein Benutzer mit dieser E-Mail-Adresse existiert bereits.", "error") return redirect(url_for("admin_edit_user_page", user_id=user_id)) # Benutzer aktualisieren user.email = email if name: user.name = name # Passwort nur ändern, wenn eines angegeben wurde if password: user.password_hash = generate_password_hash(password) user.role = "admin" if role == "admin" else "user" user.active = is_active db_session.commit() db_session.close() auth_logger.info(f"Benutzer '{user.email}' (ID: {user_id}) aktualisiert von Admin {current_user.id}") flash(f"Benutzer '{user.email}' erfolgreich aktualisiert.", "success") return redirect(url_for("admin_page", tab="users")) except Exception as e: auth_logger.error(f"Fehler beim Aktualisieren eines Benutzers über Form: {str(e)}") flash("Fehler beim Aktualisieren des Benutzers.", "error") return redirect(url_for("admin_edit_user_page", user_id=user_id)) @app.route("/admin/printers//update", methods=["POST"]) @login_required def admin_update_printer_form(printer_id): """Aktualisiert einen Drucker über HTML-Form (nur für Admins).""" if not current_user.is_admin: flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error") return redirect(url_for("index")) try: # Form-Daten lesen name = request.form.get("name", "").strip() ip_address = request.form.get("ip_address", "").strip() model = request.form.get("model", "").strip() location = request.form.get("location", "").strip() description = request.form.get("description", "").strip() status = request.form.get("status", "available").strip() # Pflichtfelder prüfen if not name or not ip_address: flash("Name und IP-Adresse sind erforderlich.", "error") return redirect(url_for("admin_edit_printer_page", printer_id=printer_id)) # IP-Adresse validieren import re ip_pattern = r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$' if not re.match(ip_pattern, ip_address): flash("Ungültige IP-Adresse.", "error") return redirect(url_for("admin_edit_printer_page", printer_id=printer_id)) db_session = get_db_session() printer = db_session.get(Printer, printer_id) if not printer: db_session.close() flash("Drucker nicht gefunden.", "error") return redirect(url_for("admin_page", tab="printers")) # Drucker aktualisieren printer.name = name printer.model = model printer.location = location printer.description = description printer.plug_ip = ip_address printer.status = status db_session.commit() db_session.close() printers_logger.info(f"Drucker '{printer.name}' (ID: {printer_id}) aktualisiert von Admin {current_user.id}") flash(f"Drucker '{printer.name}' erfolgreich aktualisiert.", "success") return redirect(url_for("admin_page", tab="printers")) except Exception as e: printers_logger.error(f"Fehler beim Aktualisieren eines Druckers über Form: {str(e)}") flash("Fehler beim Aktualisieren des Druckers.", "error") return redirect(url_for("admin_edit_printer_page", printer_id=printer_id)) @app.route("/api/admin/users/", methods=["DELETE"]) @login_required @admin_required def delete_user(user_id): """Löscht einen Benutzer (nur für Admins).""" # Verhindern, dass sich der Admin selbst löscht if user_id == current_user.id: return jsonify({"error": "Sie können sich nicht selbst löschen"}), 400 try: db_session = get_db_session() user = db_session.get(User, user_id) if not user: db_session.close() return jsonify({"error": "Benutzer nicht gefunden"}), 404 # Prüfen, ob noch aktive Jobs für diesen Benutzer existieren active_jobs = db_session.query(Job).filter( Job.user_id == user_id, Job.status.in_(["scheduled", "running"]) ).count() if active_jobs > 0: db_session.close() return jsonify({"error": f"Benutzer kann nicht gelöscht werden: {active_jobs} aktive Jobs vorhanden"}), 400 username = user.username or user.email db_session.delete(user) db_session.commit() db_session.close() user_logger.info(f"Benutzer '{username}' (ID: {user_id}) gelöscht von Admin {current_user.id}") return jsonify({"success": True, "message": "Benutzer erfolgreich gelöscht"}) except Exception as e: user_logger.error(f"Fehler beim Löschen des Benutzers {user_id}: {str(e)}") return jsonify({"error": "Interner Serverfehler"}), 500 # ===== FILE-UPLOAD-ROUTEN ===== @app.route('/api/upload/job', methods=['POST']) @login_required def upload_job_file(): """ Lädt eine Datei für einen Druckjob hoch Form Data: file: Die hochzuladende Datei job_name: Name des Jobs (optional) """ try: if 'file' not in request.files: return jsonify({'error': 'Keine Datei ausgewählt'}), 400 file = request.files['file'] job_name = request.form.get('job_name', '') if file.filename == '': return jsonify({'error': 'Keine Datei ausgewählt'}), 400 # Metadaten für die Datei metadata = { 'uploader_id': current_user.id, 'uploader_name': current_user.username, 'job_name': job_name } # Datei speichern result = save_job_file(file, current_user.id, metadata) if result: relative_path, absolute_path, file_metadata = result app_logger.info(f"Job-Datei hochgeladen: {file_metadata['original_filename']} von User {current_user.id}") return jsonify({ 'success': True, 'message': 'Datei erfolgreich hochgeladen', 'file_path': relative_path, 'filename': file_metadata['original_filename'], 'unique_filename': file_metadata['unique_filename'], 'file_size': file_metadata['file_size'], 'metadata': file_metadata }) else: return jsonify({'error': 'Fehler beim Speichern der Datei'}), 500 except Exception as e: app_logger.error(f"Fehler beim Hochladen der Job-Datei: {str(e)}") return jsonify({'error': f'Fehler beim Hochladen: {str(e)}'}), 500 @app.route('/api/upload/guest', methods=['POST']) def upload_guest_file(): """ Lädt eine Datei für einen Gastauftrag hoch Form Data: file: Die hochzuladende Datei guest_name: Name des Gasts (optional) guest_email: E-Mail des Gasts (optional) """ try: if 'file' not in request.files: return jsonify({'error': 'Keine Datei ausgewählt'}), 400 file = request.files['file'] guest_name = request.form.get('guest_name', '') guest_email = request.form.get('guest_email', '') if file.filename == '': return jsonify({'error': 'Keine Datei ausgewählt'}), 400 # Metadaten für die Datei metadata = { 'guest_name': guest_name, 'guest_email': guest_email } # Datei speichern result = save_guest_file(file, metadata) if result: relative_path, absolute_path, file_metadata = result app_logger.info(f"Gast-Datei hochgeladen: {file_metadata['original_filename']} für {guest_name or 'Unbekannt'}") return jsonify({ 'success': True, 'message': 'Datei erfolgreich hochgeladen', 'file_path': relative_path, 'filename': file_metadata['original_filename'], 'unique_filename': file_metadata['unique_filename'], 'file_size': file_metadata['file_size'], 'metadata': file_metadata }) else: return jsonify({'error': 'Fehler beim Speichern der Datei'}), 500 except Exception as e: app_logger.error(f"Fehler beim Hochladen der Gast-Datei: {str(e)}") return jsonify({'error': f'Fehler beim Hochladen: {str(e)}'}), 500 @app.route('/api/upload/avatar', methods=['POST']) @login_required def upload_avatar(): """ Lädt ein Avatar-Bild für den aktuellen Benutzer hoch Form Data: file: Das Avatar-Bild """ try: if 'file' not in request.files: return jsonify({'error': 'Keine Datei ausgewählt'}), 400 file = request.files['file'] if file.filename == '': return jsonify({'error': 'Keine Datei ausgewählt'}), 400 # Nur Bilder erlauben allowed_extensions = {'png', 'jpg', 'jpeg', 'gif', 'webp'} if not file.filename or '.' not in file.filename: return jsonify({'error': 'Ungültiger Dateityp'}), 400 file_ext = file.filename.rsplit('.', 1)[1].lower() if file_ext not in allowed_extensions: return jsonify({'error': 'Nur Bilddateien sind erlaubt (PNG, JPG, JPEG, GIF, WebP)'}), 400 # Alte Avatar-Datei löschen falls vorhanden db_session = get_db_session() user = db_session.get(User, current_user.id) if user and user.avatar_path: delete_file_safe(user.avatar_path) # Neue Avatar-Datei speichern result = save_avatar_file(file, current_user.id) if result: relative_path, absolute_path, file_metadata = result # Avatar-Pfad in der Datenbank aktualisieren user.avatar_path = relative_path db_session.commit() db_session.close() app_logger.info(f"Avatar hochgeladen für User {current_user.id}") return jsonify({ 'success': True, 'message': 'Avatar erfolgreich hochgeladen', 'file_path': relative_path, 'filename': file_metadata['original_filename'], 'unique_filename': file_metadata['unique_filename'], 'file_size': file_metadata['file_size'] }) else: db_session.close() return jsonify({'error': 'Fehler beim Speichern des Avatars'}), 500 except Exception as e: app_logger.error(f"Fehler beim Hochladen des Avatars: {str(e)}") return jsonify({'error': f'Fehler beim Hochladen: {str(e)}'}), 500 @app.route('/api/upload/asset', methods=['POST']) @login_required @admin_required def upload_asset(): """ Lädt ein statisches Asset hoch (nur für Administratoren) Form Data: file: Die Asset-Datei asset_name: Name des Assets (optional) """ try: if 'file' not in request.files: return jsonify({'error': 'Keine Datei ausgewählt'}), 400 file = request.files['file'] asset_name = request.form.get('asset_name', '') if file.filename == '': return jsonify({'error': 'Keine Datei ausgewählt'}), 400 # Metadaten für die Datei metadata = { 'uploader_id': current_user.id, 'uploader_name': current_user.username, 'asset_name': asset_name } # Datei speichern result = save_asset_file(file, current_user.id, metadata) if result: relative_path, absolute_path, file_metadata = result app_logger.info(f"Asset hochgeladen: {file_metadata['original_filename']} von Admin {current_user.id}") return jsonify({ 'success': True, 'message': 'Asset erfolgreich hochgeladen', 'file_path': relative_path, 'filename': file_metadata['original_filename'], 'unique_filename': file_metadata['unique_filename'], 'file_size': file_metadata['file_size'], 'metadata': file_metadata }) else: return jsonify({'error': 'Fehler beim Speichern des Assets'}), 500 except Exception as e: app_logger.error(f"Fehler beim Hochladen des Assets: {str(e)}") return jsonify({'error': f'Fehler beim Hochladen: {str(e)}'}), 500 @app.route('/api/upload/log', methods=['POST']) @login_required @admin_required def upload_log(): """ Lädt eine Log-Datei hoch (nur für Administratoren) Form Data: file: Die Log-Datei log_type: Typ des Logs (optional) """ try: if 'file' not in request.files: return jsonify({'error': 'Keine Datei ausgewählt'}), 400 file = request.files['file'] log_type = request.form.get('log_type', 'allgemein') if file.filename == '': return jsonify({'error': 'Keine Datei ausgewählt'}), 400 # Metadaten für die Datei metadata = { 'uploader_id': current_user.id, 'uploader_name': current_user.username, 'log_type': log_type } # Datei speichern result = save_log_file(file, current_user.id, metadata) if result: relative_path, absolute_path, file_metadata = result app_logger.info(f"Log-Datei hochgeladen: {file_metadata['original_filename']} von Admin {current_user.id}") return jsonify({ 'success': True, 'message': 'Log-Datei erfolgreich hochgeladen', 'file_path': relative_path, 'filename': file_metadata['original_filename'], 'unique_filename': file_metadata['unique_filename'], 'file_size': file_metadata['file_size'], 'metadata': file_metadata }) else: return jsonify({'error': 'Fehler beim Speichern der Log-Datei'}), 500 except Exception as e: app_logger.error(f"Fehler beim Hochladen der Log-Datei: {str(e)}") return jsonify({'error': f'Fehler beim Hochladen: {str(e)}'}), 500 @app.route('/api/upload/backup', methods=['POST']) @login_required @admin_required def upload_backup(): """ Lädt eine Backup-Datei hoch (nur für Administratoren) Form Data: file: Die Backup-Datei backup_type: Typ des Backups (optional) """ try: if 'file' not in request.files: return jsonify({'error': 'Keine Datei ausgewählt'}), 400 file = request.files['file'] backup_type = request.form.get('backup_type', 'allgemein') if file.filename == '': return jsonify({'error': 'Keine Datei ausgewählt'}), 400 # Metadaten für die Datei metadata = { 'uploader_id': current_user.id, 'uploader_name': current_user.username, 'backup_type': backup_type } # Datei speichern result = save_backup_file(file, current_user.id, metadata) if result: relative_path, absolute_path, file_metadata = result app_logger.info(f"Backup-Datei hochgeladen: {file_metadata['original_filename']} von Admin {current_user.id}") return jsonify({ 'success': True, 'message': 'Backup-Datei erfolgreich hochgeladen', 'file_path': relative_path, 'filename': file_metadata['original_filename'], 'unique_filename': file_metadata['unique_filename'], 'file_size': file_metadata['file_size'], 'metadata': file_metadata }) else: return jsonify({'error': 'Fehler beim Speichern der Backup-Datei'}), 500 except Exception as e: app_logger.error(f"Fehler beim Hochladen der Backup-Datei: {str(e)}") return jsonify({'error': f'Fehler beim Hochladen: {str(e)}'}), 500 @app.route('/api/upload/temp', methods=['POST']) @login_required def upload_temp_file(): """ Lädt eine temporäre Datei hoch Form Data: file: Die temporäre Datei purpose: Verwendungszweck (optional) """ try: if 'file' not in request.files: return jsonify({'error': 'Keine Datei ausgewählt'}), 400 file = request.files['file'] purpose = request.form.get('purpose', '') if file.filename == '': return jsonify({'error': 'Keine Datei ausgewählt'}), 400 # Metadaten für die Datei metadata = { 'uploader_id': current_user.id, 'uploader_name': current_user.username, 'purpose': purpose } # Datei speichern result = save_temp_file(file, current_user.id, metadata) if result: relative_path, absolute_path, file_metadata = result app_logger.info(f"Temporäre Datei hochgeladen: {file_metadata['original_filename']} von User {current_user.id}") return jsonify({ 'success': True, 'message': 'Temporäre Datei erfolgreich hochgeladen', 'file_path': relative_path, 'filename': file_metadata['original_filename'], 'unique_filename': file_metadata['unique_filename'], 'file_size': file_metadata['file_size'], 'metadata': file_metadata }) else: return jsonify({'error': 'Fehler beim Speichern der temporären Datei'}), 500 except Exception as e: app_logger.error(f"Fehler beim Hochladen der temporären Datei: {str(e)}") return jsonify({'error': f'Fehler beim Hochladen: {str(e)}'}), 500 @app.route('/api/files/', methods=['GET']) @login_required def serve_uploaded_file(file_path): """ Stellt hochgeladene Dateien bereit (mit Zugriffskontrolle) """ try: # Datei-Info abrufen file_info = file_manager.get_file_info(file_path) if not file_info: return jsonify({'error': 'Datei nicht gefunden'}), 404 # Zugriffskontrolle basierend auf Dateikategorie if file_path.startswith('jobs/'): # Job-Dateien: Nur Besitzer und Admins if not current_user.is_admin: # Prüfen ob Benutzer der Besitzer ist if f"user_{current_user.id}" not in file_path: return jsonify({'error': 'Zugriff verweigert'}), 403 elif file_path.startswith('guests/'): # Gast-Dateien: Nur Admins if not current_user.is_admin: return jsonify({'error': 'Zugriff verweigert'}), 403 elif file_path.startswith('avatars/'): # Avatar-Dateien: Öffentlich zugänglich für angemeldete Benutzer pass elif file_path.startswith('temp/'): # Temporäre Dateien: Nur Besitzer und Admins if not current_user.is_admin: # Prüfen ob Benutzer der Besitzer ist if f"user_{current_user.id}" not in file_path: return jsonify({'error': 'Zugriff verweigert'}), 403 else: # Andere Dateien (assets, logs, backups): Nur Admins if not current_user.is_admin: return jsonify({'error': 'Zugriff verweigert'}), 403 # Datei bereitstellen return send_file(file_info['absolute_path'], as_attachment=False) except Exception as e: app_logger.error(f"Fehler beim Bereitstellen der Datei {file_path}: {str(e)}") return jsonify({'error': 'Fehler beim Laden der Datei'}), 500 @app.route('/api/files/', methods=['DELETE']) @login_required def delete_uploaded_file(file_path): """ Löscht eine hochgeladene Datei (mit Zugriffskontrolle) """ try: # Datei-Info abrufen file_info = file_manager.get_file_info(file_path) if not file_info: return jsonify({'error': 'Datei nicht gefunden'}), 404 # Zugriffskontrolle basierend auf Dateikategorie if file_path.startswith('jobs/'): # Job-Dateien: Nur Besitzer und Admins if not current_user.is_admin: # Prüfen ob Benutzer der Besitzer ist if f"user_{current_user.id}" not in file_path: return jsonify({'error': 'Zugriff verweigert'}), 403 elif file_path.startswith('guests/'): # Gast-Dateien: Nur Admins if not current_user.is_admin: return jsonify({'error': 'Zugriff verweigert'}), 403 elif file_path.startswith('avatars/'): # Avatar-Dateien: Nur Besitzer und Admins if not current_user.is_admin: # Prüfen ob Benutzer der Besitzer ist if f"user_{current_user.id}" not in file_path: return jsonify({'error': 'Zugriff verweigert'}), 403 elif file_path.startswith('temp/'): # Temporäre Dateien: Nur Besitzer und Admins if not current_user.is_admin: # Prüfen ob Benutzer der Besitzer ist if f"user_{current_user.id}" not in file_path: return jsonify({'error': 'Zugriff verweigert'}), 403 else: # Andere Dateien (assets, logs, backups): Nur Admins if not current_user.is_admin: return jsonify({'error': 'Zugriff verweigert'}), 403 # Datei löschen if delete_file_safe(file_path): app_logger.info(f"Datei gelöscht: {file_path} von User {current_user.id}") return jsonify({'success': True, 'message': 'Datei erfolgreich gelöscht'}) else: return jsonify({'error': 'Fehler beim Löschen der Datei'}), 500 except Exception as e: app_logger.error(f"Fehler beim Löschen der Datei {file_path}: {str(e)}") return jsonify({'error': f'Fehler beim Löschen der Datei: {str(e)}'}), 500 @app.route('/api/admin/files/stats', methods=['GET']) @login_required @admin_required def get_file_stats(): """ Gibt Statistiken zu allen Dateien zurück (nur für Administratoren) """ try: stats = file_manager.get_category_stats() # Gesamtstatistiken berechnen total_files = sum(category.get('file_count', 0) for category in stats.values()) total_size = sum(category.get('total_size', 0) for category in stats.values()) return jsonify({ 'success': True, 'categories': stats, 'totals': { 'file_count': total_files, 'total_size': total_size, 'total_size_mb': round(total_size / (1024 * 1024), 2) } }) except Exception as e: app_logger.error(f"Fehler beim Abrufen der Datei-Statistiken: {str(e)}") return jsonify({'error': f'Fehler beim Abrufen der Statistiken: {str(e)}'}), 500 @app.route('/api/admin/files/cleanup', methods=['POST']) @login_required @admin_required def cleanup_temp_files(): """ Räumt temporäre Dateien auf (nur für Administratoren) """ try: data = request.get_json() or {} max_age_hours = data.get('max_age_hours', 24) # Temporäre Dateien aufräumen deleted_count = file_manager.cleanup_temp_files(max_age_hours) app_logger.info(f"Temporäre Dateien aufgeräumt: {deleted_count} Dateien gelöscht") return jsonify({ 'success': True, 'message': f'{deleted_count} temporäre Dateien erfolgreich gelöscht', 'deleted_count': deleted_count }) except Exception as e: app_logger.error(f"Fehler beim Aufräumen temporärer Dateien: {str(e)}") return jsonify({'error': f'Fehler beim Aufräumen: {str(e)}'}), 500 # ===== WEITERE API-ROUTEN ===== # ===== JOB-MANAGEMENT-ROUTEN ===== @app.route("/api/jobs/current", methods=["GET"]) @login_required def get_current_job(): """ Gibt den aktuellen Job des Benutzers zurück. Legacy-Route für Kompatibilität - sollte durch Blueprint ersetzt werden. """ db_session = get_db_session() try: current_job = db_session.query(Job).filter( Job.user_id == int(current_user.id), Job.status.in_(["scheduled", "running"]) ).order_by(Job.start_at).first() if current_job: job_data = current_job.to_dict() else: job_data = None return jsonify(job_data) except Exception as e: jobs_logger.error(f"Fehler beim Abrufen des aktuellen Jobs: {str(e)}") return jsonify({"error": str(e)}), 500 finally: db_session.close() @app.route("/api/jobs/", methods=["GET"]) @login_required @job_owner_required def get_job_detail(job_id): """ Gibt Details zu einem spezifischen Job zurück. """ db_session = get_db_session() try: # Eagerly load the user and printer relationships job = db_session.query(Job).options( joinedload(Job.user), joinedload(Job.printer) ).filter(Job.id == job_id).first() if not job: return jsonify({"error": "Job nicht gefunden"}), 404 # Convert to dict before closing session job_dict = job.to_dict() return jsonify(job_dict) except Exception as e: jobs_logger.error(f"Fehler beim Abrufen des Jobs {job_id}: {str(e)}") return jsonify({"error": "Interner Serverfehler"}), 500 finally: db_session.close() @app.route("/api/jobs/", methods=["DELETE"]) @login_required @job_owner_required def delete_job(job_id): """ Löscht einen Job. """ db_session = get_db_session() try: job = db_session.get(Job, job_id) if not job: return jsonify({"error": "Job nicht gefunden"}), 404 # Prüfen, ob der Job gelöscht werden kann if job.status == "running": return jsonify({"error": "Laufende Jobs können nicht gelöscht werden"}), 400 job_name = job.name db_session.delete(job) db_session.commit() jobs_logger.info(f"Job '{job_name}' (ID: {job_id}) gelöscht von Benutzer {current_user.id}") return jsonify({"success": True, "message": "Job erfolgreich gelöscht"}) except Exception as e: jobs_logger.error(f"Fehler beim Löschen des Jobs {job_id}: {str(e)}") return jsonify({"error": "Interner Serverfehler"}), 500 finally: db_session.close() @app.route("/api/jobs", methods=["GET"]) @login_required def get_jobs(): """ Gibt alle Jobs zurück. Admins sehen alle Jobs, normale Benutzer nur ihre eigenen. Unterstützt Paginierung und Filterung. """ db_session = get_db_session() try: from sqlalchemy.orm import joinedload # Paginierung und Filter-Parameter page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 50, type=int) status_filter = request.args.get('status') # Query aufbauen mit Eager Loading query = db_session.query(Job).options( joinedload(Job.user), joinedload(Job.printer) ) # Admin sieht alle Jobs, User nur eigene if not current_user.is_admin: query = query.filter(Job.user_id == int(current_user.id)) # Status-Filter anwenden if status_filter: query = query.filter(Job.status == status_filter) # Sortierung: neueste zuerst query = query.order_by(Job.created_at.desc()) # Gesamtanzahl für Paginierung ermitteln total_count = query.count() # Paginierung anwenden offset = (page - 1) * per_page jobs = query.offset(offset).limit(per_page).all() # Convert jobs to dictionaries before closing the session job_dicts = [job.to_dict() for job in jobs] jobs_logger.info(f"Jobs abgerufen: {len(job_dicts)} von {total_count} (Seite {page})") return jsonify({ "jobs": job_dicts, "pagination": { "page": page, "per_page": per_page, "total": total_count, "pages": (total_count + per_page - 1) // per_page } }) except Exception as e: jobs_logger.error(f"Fehler beim Abrufen von Jobs: {str(e)}") return jsonify({"error": "Interner Serverfehler"}), 500 finally: db_session.close() @app.route('/api/jobs', methods=['POST']) @login_required @measure_execution_time(logger=jobs_logger, task_name="API-Job-Erstellung") def create_job(): """ Erstellt einen neuen Job. Body: { "name": str (optional), "description": str (optional), "printer_id": int, "start_iso": str, "duration_minutes": int, "file_path": str (optional) } """ db_session = get_db_session() try: data = request.json # Pflichtfelder prüfen required_fields = ["printer_id", "start_iso", "duration_minutes"] for field in required_fields: if field not in data: return jsonify({"error": f"Feld '{field}' fehlt"}), 400 # Daten extrahieren und validieren printer_id = int(data["printer_id"]) start_iso = data["start_iso"] duration_minutes = int(data["duration_minutes"]) # Optional: Jobtitel, Beschreibung und Dateipfad name = data.get("name", f"Druckjob vom {datetime.now().strftime('%d.%m.%Y %H:%M')}") description = data.get("description", "") file_path = data.get("file_path") # Start-Zeit parsen try: start_at = datetime.fromisoformat(start_iso.replace('Z', '+00:00')) except ValueError: return jsonify({"error": "Ungültiges Startdatum"}), 400 # Dauer validieren if duration_minutes <= 0: return jsonify({"error": "Dauer muss größer als 0 sein"}), 400 # End-Zeit berechnen end_at = start_at + timedelta(minutes=duration_minutes) # Prüfen, ob der Drucker existiert printer = db_session.get(Printer, printer_id) if not printer: return jsonify({"error": "Drucker nicht gefunden"}), 404 # Prüfen, ob der Drucker online ist printer_status, printer_active = check_printer_status(printer.plug_ip if printer.plug_ip else "") # Status basierend auf Drucker-Verfügbarkeit setzen if printer_status == "online" and printer_active: job_status = "scheduled" else: job_status = "waiting_for_printer" # Neuen Job erstellen new_job = Job( name=name, description=description, printer_id=printer_id, user_id=current_user.id, owner_id=current_user.id, start_at=start_at, end_at=end_at, status=job_status, file_path=file_path, duration_minutes=duration_minutes ) db_session.add(new_job) db_session.commit() # Job-Objekt für die Antwort serialisieren job_dict = new_job.to_dict() jobs_logger.info(f"Neuer Job {new_job.id} erstellt für Drucker {printer_id}, Start: {start_at}, Dauer: {duration_minutes} Minuten") return jsonify({"job": job_dict}), 201 except Exception as e: jobs_logger.error(f"Fehler beim Erstellen eines Jobs: {str(e)}") return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500 finally: db_session.close() @app.route('/api/jobs/', methods=['PUT']) @login_required @job_owner_required def update_job(job_id): """ Aktualisiert einen existierenden Job. """ db_session = get_db_session() try: data = request.json job = db_session.get(Job, job_id) if not job: return jsonify({"error": "Job nicht gefunden"}), 404 # Prüfen, ob der Job bearbeitet werden kann if job.status in ["finished", "aborted"]: return jsonify({"error": f"Job kann im Status '{job.status}' nicht bearbeitet werden"}), 400 # Felder aktualisieren, falls vorhanden if "name" in data: job.name = data["name"] if "description" in data: job.description = data["description"] if "notes" in data: job.notes = data["notes"] if "start_iso" in data: try: new_start = datetime.fromisoformat(data["start_iso"].replace('Z', '+00:00')) job.start_at = new_start # End-Zeit neu berechnen falls Duration verfügbar if job.duration_minutes: job.end_at = new_start + timedelta(minutes=job.duration_minutes) except ValueError: return jsonify({"error": "Ungültiges Startdatum"}), 400 if "duration_minutes" in data: duration = int(data["duration_minutes"]) if duration <= 0: return jsonify({"error": "Dauer muss größer als 0 sein"}), 400 job.duration_minutes = duration # End-Zeit neu berechnen if job.start_at: job.end_at = job.start_at + timedelta(minutes=duration) # Aktualisierungszeitpunkt setzen job.updated_at = datetime.now() db_session.commit() # Job-Objekt für die Antwort serialisieren job_dict = job.to_dict() jobs_logger.info(f"Job {job_id} aktualisiert") return jsonify({"job": job_dict}) except Exception as e: jobs_logger.error(f"Fehler beim Aktualisieren von Job {job_id}: {str(e)}") return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500 finally: db_session.close() @app.route('/api/jobs/active', methods=['GET']) @login_required def get_active_jobs(): """ Gibt alle aktiven Jobs zurück. """ db_session = get_db_session() try: from sqlalchemy.orm import joinedload query = db_session.query(Job).options( joinedload(Job.user), joinedload(Job.printer) ).filter( Job.status.in_(["scheduled", "running"]) ) # Normale Benutzer sehen nur ihre eigenen aktiven Jobs if not current_user.is_admin: query = query.filter(Job.user_id == current_user.id) active_jobs = query.all() result = [] for job in active_jobs: job_dict = job.to_dict() # Aktuelle Restzeit berechnen if job.status == "running" and job.end_at: remaining_time = job.end_at - datetime.now() if remaining_time.total_seconds() > 0: job_dict["remaining_minutes"] = int(remaining_time.total_seconds() / 60) else: job_dict["remaining_minutes"] = 0 result.append(job_dict) return jsonify({"jobs": result}) except Exception as e: jobs_logger.error(f"Fehler beim Abrufen aktiver Jobs: {str(e)}") return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500 finally: db_session.close() # ===== DRUCKER-ROUTEN ===== @app.route("/api/printers", methods=["GET"]) @login_required def get_printers(): """Gibt alle Drucker zurück - OHNE Status-Check für schnelleres Laden.""" db_session = get_db_session() try: # Windows-kompatible Timeout-Implementierung import threading import time printers = None timeout_occurred = False def fetch_printers(): nonlocal printers, timeout_occurred try: printers = db_session.query(Printer).all() except Exception as e: printers_logger.error(f"Datenbankfehler beim Laden der Drucker: {str(e)}") timeout_occurred = True # Starte Datenbankabfrage in separatem Thread thread = threading.Thread(target=fetch_printers) thread.daemon = True thread.start() thread.join(timeout=5) # 5 Sekunden Timeout if thread.is_alive() or timeout_occurred or printers is None: printers_logger.warning("Database timeout when fetching printers for basic loading") return jsonify({ 'error': 'Database timeout beim Laden der Drucker', 'timeout': True, 'printers': [] }), 408 # Drucker-Daten OHNE Status-Check zusammenstellen für schnelles Laden printer_data = [] current_time = datetime.now() for printer in printers: printer_data.append({ "id": printer.id, "name": printer.name, "model": printer.model or 'Unbekanntes Modell', "location": printer.location or 'Unbekannter Standort', "mac_address": printer.mac_address, "plug_ip": printer.plug_ip, "status": printer.status or "offline", # Letzter bekannter Status "active": printer.active if hasattr(printer, 'active') else True, "ip_address": printer.plug_ip if printer.plug_ip else getattr(printer, 'ip_address', None), "created_at": printer.created_at.isoformat() if printer.created_at else current_time.isoformat(), "last_checked": printer.last_checked.isoformat() if hasattr(printer, 'last_checked') and printer.last_checked else None }) db_session.close() printers_logger.info(f"Schnelles Laden abgeschlossen: {len(printer_data)} Drucker geladen (ohne Status-Check)") return jsonify({ "success": True, "printers": printer_data, "count": len(printer_data), "message": "Drucker erfolgreich geladen" }) except Exception as e: db_session.rollback() db_session.close() printers_logger.error(f"Fehler beim Abrufen der Drucker: {str(e)}") return jsonify({ "error": f"Fehler beim Laden der Drucker: {str(e)}", "printers": [] }), 500 # ===== ERWEITERTE SESSION-MANAGEMENT UND AUTO-LOGOUT ===== @app.before_request def check_session_activity(): """ Überprüft Session-Aktivität und meldet Benutzer bei Inaktivität automatisch ab. """ # Skip für nicht-authentifizierte Benutzer und Login-Route if not current_user.is_authenticated or request.endpoint in ['login', 'static', 'auth_logout']: return # Skip für AJAX/API calls die nicht als Session-Aktivität zählen sollen if request.path.startswith('/api/') and request.path.endswith('/heartbeat'): return now = datetime.now() # Session-Aktivität tracken if 'last_activity' in session: last_activity = datetime.fromisoformat(session['last_activity']) inactive_duration = now - last_activity # Definiere Inaktivitäts-Limits basierend auf Benutzerrolle max_inactive_minutes = 30 # Standard: 30 Minuten if hasattr(current_user, 'is_admin') and current_user.is_admin: max_inactive_minutes = 60 # Admins: 60 Minuten max_inactive_duration = timedelta(minutes=max_inactive_minutes) # Benutzer abmelden wenn zu lange inaktiv if inactive_duration > max_inactive_duration: auth_logger.info(f"🕒 Automatische Abmeldung: Benutzer {current_user.email} war {inactive_duration.total_seconds()/60:.1f} Minuten inaktiv (Limit: {max_inactive_minutes}min)") # Session-Daten vor Logout speichern für Benachrichtigung logout_reason = f"Automatische Abmeldung nach {max_inactive_minutes} Minuten Inaktivität" logout_time = now.isoformat() # Benutzer abmelden logout_user() # Session komplett leeren session.clear() # JSON-Response für AJAX-Requests if request.headers.get('X-Requested-With') == 'XMLHttpRequest' or request.is_json: return jsonify({ "error": "Session abgelaufen", "reason": "auto_logout_inactivity", "message": f"Sie wurden nach {max_inactive_minutes} Minuten Inaktivität automatisch abgemeldet", "redirect_url": url_for("login") }), 401 # HTML-Redirect für normale Requests flash(f"Sie wurden nach {max_inactive_minutes} Minuten Inaktivität automatisch abgemeldet.", "warning") return redirect(url_for("login")) # Session-Aktivität aktualisieren (aber nicht bei jedem API-Call) if not request.path.startswith('/api/stats/') and not request.path.startswith('/api/heartbeat'): session['last_activity'] = now.isoformat() session['user_agent'] = request.headers.get('User-Agent', '')[:200] # Begrenzt auf 200 Zeichen session['ip_address'] = request.remote_addr # Session-Sicherheit: Überprüfe IP-Adresse und User-Agent (Optional) if 'session_ip' in session and session['session_ip'] != request.remote_addr: auth_logger.warning(f"⚠️ IP-Adresse geändert für Benutzer {current_user.email}: {session['session_ip']} → {request.remote_addr}") # Optional: Benutzer abmelden bei IP-Wechsel (kann bei VPN/Proxy problematisch sein) session['security_warning'] = "IP-Adresse hat sich geändert" @app.before_request def setup_session_security(): """ Initialisiert Session-Sicherheit für neue Sessions. """ if current_user.is_authenticated and 'session_created' not in session: session['session_created'] = datetime.now().isoformat() session['session_ip'] = request.remote_addr session['last_activity'] = datetime.now().isoformat() session.permanent = True # Session als permanent markieren auth_logger.info(f"🔐 Neue Session erstellt für Benutzer {current_user.email} von IP {request.remote_addr}") # ===== SESSION-MANAGEMENT API-ENDPUNKTE ===== @app.route('/api/session/heartbeat', methods=['POST']) @login_required def session_heartbeat(): """ Heartbeat-Endpunkt um Session am Leben zu halten. Wird vom Frontend alle 5 Minuten aufgerufen. """ try: now = datetime.now() session['last_activity'] = now.isoformat() # Berechne verbleibende Session-Zeit last_activity = datetime.fromisoformat(session.get('last_activity', now.isoformat())) max_inactive_minutes = 60 if hasattr(current_user, 'is_admin') and current_user.is_admin else 30 time_left = max_inactive_minutes * 60 - (now - last_activity).total_seconds() return jsonify({ "success": True, "session_active": True, "time_left_seconds": max(0, int(time_left)), "max_inactive_minutes": max_inactive_minutes, "current_time": now.isoformat() }) except Exception as e: auth_logger.error(f"Fehler beim Session-Heartbeat: {str(e)}") return jsonify({"error": "Heartbeat fehlgeschlagen"}), 500 @app.route('/api/session/status', methods=['GET']) @login_required def session_status(): """ Gibt detaillierten Session-Status zurück. """ try: now = datetime.now() last_activity = datetime.fromisoformat(session.get('last_activity', now.isoformat())) session_created = datetime.fromisoformat(session.get('session_created', now.isoformat())) max_inactive_minutes = 60 if hasattr(current_user, 'is_admin') and current_user.is_admin else 30 inactive_duration = (now - last_activity).total_seconds() time_left = max_inactive_minutes * 60 - inactive_duration return jsonify({ "success": True, "user": { "id": current_user.id, "email": current_user.email, "name": current_user.name, "is_admin": getattr(current_user, 'is_admin', False) }, "session": { "created": session_created.isoformat(), "last_activity": last_activity.isoformat(), "inactive_seconds": int(inactive_duration), "time_left_seconds": max(0, int(time_left)), "max_inactive_minutes": max_inactive_minutes, "ip_address": session.get('session_ip', 'unbekannt'), "user_agent": session.get('user_agent', 'unbekannt')[:50] + "..." if len(session.get('user_agent', '')) > 50 else session.get('user_agent', 'unbekannt') }, "warnings": [] }) except Exception as e: auth_logger.error(f"Fehler beim Abrufen des Session-Status: {str(e)}") return jsonify({"error": "Session-Status nicht verfügbar"}), 500 @app.route('/api/session/extend', methods=['POST']) @login_required def extend_session(): """Verlängert die aktuelle Session um die Standard-Lebensdauer""" try: # Session-Lebensdauer zurücksetzen session.permanent = True # Aktivität für Rate Limiting aktualisieren current_user.update_last_activity() # Optional: Session-Statistiken für Admin user_agent = request.headers.get('User-Agent', 'Unknown') ip_address = request.environ.get('HTTP_X_FORWARDED_FOR', request.remote_addr) app_logger.info(f"Session verlängert für User {current_user.id} (IP: {ip_address})") return jsonify({ 'success': True, 'message': 'Session erfolgreich verlängert', 'expires_at': (datetime.now() + SESSION_LIFETIME).isoformat() }) except Exception as e: app_logger.error(f"Fehler beim Verlängern der Session: {str(e)}") return jsonify({ 'success': False, 'error': 'Fehler beim Verlängern der Session' }), 500 # ===== GASTANTRÄGE API-ROUTEN ===== @app.route('/api/admin/guest-requests/test', methods=['GET']) def test_admin_guest_requests(): """Test-Endpunkt für Guest Requests Routing""" app_logger.info("Test-Route /api/admin/guest-requests/test aufgerufen") return jsonify({ 'success': True, 'message': 'Test-Route funktioniert', 'user_authenticated': current_user.is_authenticated, 'user_is_admin': current_user.is_admin if current_user.is_authenticated else False }) @app.route('/api/guest-status', methods=['POST']) def get_guest_request_status(): """ Öffentliche Route für Gäste um ihren Auftragsstatus mit OTP-Code zu prüfen. Keine Authentifizierung erforderlich. """ try: data = request.get_json() if not data: return jsonify({ 'success': False, 'message': 'Keine Daten empfangen' }), 400 otp_code = data.get('otp_code', '').strip() email = data.get('email', '').strip() # Optional für zusätzliche Verifikation if not otp_code: return jsonify({ 'success': False, 'message': 'OTP-Code ist erforderlich' }), 400 db_session = get_db_session() # Alle Gastaufträge finden, die den OTP-Code haben könnten # Da OTP gehashed ist, müssen wir durch alle iterieren guest_requests = db_session.query(GuestRequest).filter( GuestRequest.otp_code.isnot(None) ).all() found_request = None for request_obj in guest_requests: if request_obj.verify_otp(otp_code): # Zusätzliche E-Mail-Verifikation falls angegeben if email and request_obj.email.lower() != email.lower(): continue found_request = request_obj break if not found_request: db_session.close() app_logger.warning(f"Ungültiger OTP-Code für Gast-Status-Abfrage: {otp_code[:4]}****") return jsonify({ 'success': False, 'message': 'Ungültiger Code oder E-Mail-Adresse' }), 404 # Status-Informationen für den Gast zusammenstellen status_info = { 'id': found_request.id, 'name': found_request.name, 'file_name': found_request.file_name, 'status': found_request.status, 'created_at': found_request.created_at.isoformat() if found_request.created_at else None, 'updated_at': found_request.updated_at.isoformat() if found_request.updated_at else None, 'duration_minutes': found_request.duration_minutes, 'copies': found_request.copies, 'reason': found_request.reason } # Status-spezifische Informationen hinzufügen if found_request.status == 'approved': status_info.update({ 'approved_at': found_request.approved_at.isoformat() if found_request.approved_at else None, 'approval_notes': found_request.approval_notes, 'message': 'Ihr Auftrag wurde genehmigt! Sie können mit dem Drucken beginnen.' }) elif found_request.status == 'rejected': status_info.update({ 'rejected_at': found_request.rejected_at.isoformat() if found_request.rejected_at else None, 'rejection_reason': found_request.rejection_reason, 'message': 'Ihr Auftrag wurde leider abgelehnt.' }) elif found_request.status == 'pending': # Berechne wie lange der Auftrag schon wartet if found_request.created_at: waiting_time = datetime.now() - found_request.created_at hours_waiting = int(waiting_time.total_seconds() / 3600) status_info.update({ 'hours_waiting': hours_waiting, 'message': f'Ihr Auftrag wird bearbeitet. Wartezeit: {hours_waiting} Stunden.' }) else: status_info['message'] = 'Ihr Auftrag wird bearbeitet.' db_session.commit() # OTP als verwendet markieren db_session.close() app_logger.info(f"Gast-Status-Abfrage erfolgreich für Request {found_request.id}") return jsonify({ 'success': True, 'request': status_info }) except Exception as e: app_logger.error(f"Fehler bei Gast-Status-Abfrage: {str(e)}") return jsonify({ 'success': False, 'message': 'Fehler beim Abrufen des Status' }), 500 @app.route('/guest-status') def guest_status_page(): """ Öffentliche Seite für Gäste um ihren Auftragsstatus zu prüfen. """ return render_template('guest_status.html') @app.route('/api/admin/guest-requests', methods=['GET']) @admin_required def get_admin_guest_requests(): """Gibt alle Gastaufträge für Admin-Verwaltung zurück""" try: app_logger.info(f"API-Aufruf /api/admin/guest-requests von User {current_user.id if current_user.is_authenticated else 'Anonymous'}") db_session = get_db_session() # Parameter auslesen status = request.args.get('status', 'all') page = int(request.args.get('page', 0)) page_size = int(request.args.get('page_size', 50)) search = request.args.get('search', '') sort = request.args.get('sort', 'newest') urgent = request.args.get('urgent', 'all') # Basis-Query query = db_session.query(GuestRequest) # Status-Filter if status != 'all': query = query.filter(GuestRequest.status == status) # Suchfilter if search: search_term = f"%{search}%" query = query.filter( (GuestRequest.name.ilike(search_term)) | (GuestRequest.email.ilike(search_term)) | (GuestRequest.file_name.ilike(search_term)) | (GuestRequest.reason.ilike(search_term)) ) # Dringlichkeitsfilter if urgent == 'urgent': urgent_cutoff = datetime.now() - timedelta(hours=24) query = query.filter( GuestRequest.status == 'pending', GuestRequest.created_at < urgent_cutoff ) elif urgent == 'normal': urgent_cutoff = datetime.now() - timedelta(hours=24) query = query.filter( (GuestRequest.status != 'pending') | (GuestRequest.created_at >= urgent_cutoff) ) # Gesamtanzahl vor Pagination total = query.count() # Sortierung if sort == 'oldest': query = query.order_by(GuestRequest.created_at.asc()) elif sort == 'urgent': # Urgent first, then by creation date desc query = query.order_by(GuestRequest.created_at.asc()).order_by(GuestRequest.created_at.desc()) else: # newest query = query.order_by(GuestRequest.created_at.desc()) # Pagination offset = page * page_size requests = query.offset(offset).limit(page_size).all() # Statistiken berechnen stats = { 'total': db_session.query(GuestRequest).count(), 'pending': db_session.query(GuestRequest).filter(GuestRequest.status == 'pending').count(), 'approved': db_session.query(GuestRequest).filter(GuestRequest.status == 'approved').count(), 'rejected': db_session.query(GuestRequest).filter(GuestRequest.status == 'rejected').count(), } # Requests zu Dictionary konvertieren requests_data = [] for req in requests: # Priorität berechnen now = datetime.now() hours_old = (now - req.created_at).total_seconds() / 3600 if req.created_at else 0 is_urgent = hours_old > 24 and req.status == 'pending' request_data = { 'id': req.id, 'name': req.name, 'email': req.email, 'file_name': req.file_name, 'file_path': req.file_path, 'duration_minutes': req.duration_minutes, 'copies': req.copies, 'reason': req.reason, 'status': req.status, 'created_at': req.created_at.isoformat() if req.created_at else None, 'updated_at': req.updated_at.isoformat() if req.updated_at else None, 'approved_at': req.approved_at.isoformat() if req.approved_at else None, 'rejected_at': req.rejected_at.isoformat() if req.rejected_at else None, 'approval_notes': req.approval_notes, 'rejection_reason': req.rejection_reason, 'is_urgent': is_urgent, 'hours_old': round(hours_old, 1), 'author_ip': req.author_ip } requests_data.append(request_data) db_session.close() app_logger.info(f"Admin-Gastaufträge geladen: {len(requests_data)} von {total} (Status: {status})") return jsonify({ 'success': True, 'requests': requests_data, 'stats': stats, 'total': total, 'page': page, 'page_size': page_size, 'has_more': offset + page_size < total }) except Exception as e: app_logger.error(f"Fehler beim Laden der Admin-Gastaufträge: {str(e)}", exc_info=True) return jsonify({ 'success': False, 'message': f'Fehler beim Laden der Gastaufträge: {str(e)}' }), 500 @app.route('/api/guest-requests//approve', methods=['POST']) @admin_required def approve_guest_request(request_id): """Genehmigt einen Gastauftrag""" try: db_session = get_db_session() guest_request = db_session.get(GuestRequest, request_id) if not guest_request: db_session.close() return jsonify({ 'success': False, 'message': 'Gastauftrag nicht gefunden' }), 404 if guest_request.status != 'pending': db_session.close() return jsonify({ 'success': False, 'message': f'Gastauftrag kann im Status "{guest_request.status}" nicht genehmigt werden' }), 400 # Daten aus Request Body data = request.get_json() or {} notes = data.get('notes', '') printer_id = data.get('printer_id') # Status aktualisieren guest_request.status = 'approved' guest_request.approved_at = datetime.now() guest_request.approved_by = current_user.id guest_request.approval_notes = notes guest_request.updated_at = datetime.now() # Falls Drucker zugewiesen werden soll if printer_id: printer = db_session.get(Printer, printer_id) if printer: guest_request.assigned_printer_id = printer_id # OTP-Code generieren falls noch nicht vorhanden (nutze die Methode aus models.py) otp_code = None if not guest_request.otp_code: otp_code = guest_request.generate_otp() guest_request.otp_expires_at = datetime.now() + timedelta(hours=48) # 48h gültig db_session.commit() # Benachrichtigung an den Gast senden (falls E-Mail verfügbar) if guest_request.email and otp_code: try: # Hier würde normalerweise eine E-Mail gesendet werden app_logger.info(f"Genehmigungs-E-Mail würde an {guest_request.email} gesendet (OTP für Status-Abfrage verfügbar)") except Exception as e: app_logger.warning(f"Fehler beim Senden der E-Mail-Benachrichtigung: {str(e)}") db_session.close() app_logger.info(f"Gastauftrag {request_id} von Admin {current_user.id} genehmigt") response_data = { 'success': True, 'message': 'Gastauftrag erfolgreich genehmigt' } # OTP-Code nur zurückgeben wenn er neu generiert wurde (für Admin-Info) if otp_code: response_data['otp_code_generated'] = True response_data['status_check_url'] = url_for('guest_status_page', _external=True) return jsonify(response_data) except Exception as e: app_logger.error(f"Fehler beim Genehmigen des Gastauftrags {request_id}: {str(e)}") return jsonify({ 'success': False, 'message': f'Fehler beim Genehmigen: {str(e)}' }), 500 @app.route('/api/guest-requests//reject', methods=['POST']) @admin_required def reject_guest_request(request_id): """Lehnt einen Gastauftrag ab""" try: db_session = get_db_session() guest_request = db_session.get(GuestRequest, request_id) if not guest_request: db_session.close() return jsonify({ 'success': False, 'message': 'Gastauftrag nicht gefunden' }), 404 if guest_request.status != 'pending': db_session.close() return jsonify({ 'success': False, 'message': f'Gastauftrag kann im Status "{guest_request.status}" nicht abgelehnt werden' }), 400 # Daten aus Request Body data = request.get_json() or {} reason = data.get('reason', '').strip() if not reason: db_session.close() return jsonify({ 'success': False, 'message': 'Ablehnungsgrund ist erforderlich' }), 400 # Status aktualisieren guest_request.status = 'rejected' guest_request.rejected_at = datetime.now() guest_request.rejected_by = current_user.id guest_request.rejection_reason = reason guest_request.updated_at = datetime.now() db_session.commit() # Benachrichtigung an den Gast senden (falls E-Mail verfügbar) if guest_request.email: try: # Hier würde normalerweise eine E-Mail gesendet werden app_logger.info(f"Ablehnungs-E-Mail würde an {guest_request.email} gesendet (Grund: {reason})") except Exception as e: app_logger.warning(f"Fehler beim Senden der Ablehnungs-E-Mail: {str(e)}") db_session.close() app_logger.info(f"Gastauftrag {request_id} von Admin {current_user.id} abgelehnt (Grund: {reason})") return jsonify({ 'success': True, 'message': 'Gastauftrag erfolgreich abgelehnt' }) except Exception as e: app_logger.error(f"Fehler beim Ablehnen des Gastauftrags {request_id}: {str(e)}") return jsonify({ 'success': False, 'message': f'Fehler beim Ablehnen: {str(e)}' }), 500 @app.route('/api/guest-requests/', methods=['DELETE']) @admin_required def delete_guest_request(request_id): """Löscht einen Gastauftrag""" try: db_session = get_db_session() guest_request = db_session.get(GuestRequest, request_id) if not guest_request: db_session.close() return jsonify({ 'success': False, 'message': 'Gastauftrag nicht gefunden' }), 404 # Datei löschen falls vorhanden if guest_request.file_path and os.path.exists(guest_request.file_path): try: os.remove(guest_request.file_path) app_logger.info(f"Datei {guest_request.file_path} für Gastauftrag {request_id} gelöscht") except Exception as e: app_logger.warning(f"Fehler beim Löschen der Datei: {str(e)}") # Gastauftrag aus Datenbank löschen request_name = guest_request.name db_session.delete(guest_request) db_session.commit() db_session.close() app_logger.info(f"Gastauftrag {request_id} ({request_name}) von Admin {current_user.id} gelöscht") return jsonify({ 'success': True, 'message': 'Gastauftrag erfolgreich gelöscht' }) except Exception as e: app_logger.error(f"Fehler beim Löschen des Gastauftrags {request_id}: {str(e)}") return jsonify({ 'success': False, 'message': f'Fehler beim Löschen: {str(e)}' }), 500 @app.route('/api/guest-requests/', methods=['GET']) @admin_required def get_guest_request_detail(request_id): """Gibt Details eines spezifischen Gastauftrags zurück""" try: db_session = get_db_session() guest_request = db_session.get(GuestRequest, request_id) if not guest_request: db_session.close() return jsonify({ 'success': False, 'message': 'Gastauftrag nicht gefunden' }), 404 # Detaildaten zusammenstellen request_data = { 'id': guest_request.id, 'name': guest_request.name, 'email': guest_request.email, 'file_name': guest_request.file_name, 'file_path': guest_request.file_path, 'file_size': None, 'duration_minutes': guest_request.duration_minutes, 'copies': guest_request.copies, 'reason': guest_request.reason, 'status': guest_request.status, 'created_at': guest_request.created_at.isoformat() if guest_request.created_at else None, 'updated_at': guest_request.updated_at.isoformat() if guest_request.updated_at else None, 'approved_at': guest_request.approved_at.isoformat() if guest_request.approved_at else None, 'rejected_at': guest_request.rejected_at.isoformat() if guest_request.rejected_at else None, 'approval_notes': guest_request.approval_notes, 'rejection_reason': guest_request.rejection_reason, 'otp_code': guest_request.otp_code, 'otp_expires_at': guest_request.otp_expires_at.isoformat() if guest_request.otp_expires_at else None, 'author_ip': guest_request.author_ip } # Dateigröße ermitteln if guest_request.file_path and os.path.exists(guest_request.file_path): try: file_size = os.path.getsize(guest_request.file_path) request_data['file_size'] = file_size request_data['file_size_mb'] = round(file_size / (1024 * 1024), 2) except Exception as e: app_logger.warning(f"Fehler beim Ermitteln der Dateigröße: {str(e)}") # Bearbeiter-Informationen hinzufügen if guest_request.approved_by: approved_by_user = db_session.get(User, guest_request.approved_by) if approved_by_user: request_data['approved_by_name'] = approved_by_user.name or approved_by_user.username if guest_request.rejected_by: rejected_by_user = db_session.get(User, guest_request.rejected_by) if rejected_by_user: request_data['rejected_by_name'] = rejected_by_user.name or rejected_by_user.username # Zugewiesener Drucker if hasattr(guest_request, 'assigned_printer_id') and guest_request.assigned_printer_id: assigned_printer = db_session.get(Printer, guest_request.assigned_printer_id) if assigned_printer: request_data['assigned_printer'] = { 'id': assigned_printer.id, 'name': assigned_printer.name, 'location': assigned_printer.location, 'status': assigned_printer.status } db_session.close() return jsonify({ 'success': True, 'request': request_data }) except Exception as e: app_logger.error(f"Fehler beim Abrufen der Gastauftrag-Details {request_id}: {str(e)}") return jsonify({ 'success': False, 'message': f'Fehler beim Abrufen der Details: {str(e)}' }), 500 @app.route('/api/admin/guest-requests/stats', methods=['GET']) @admin_required def get_guest_requests_stats(): """Gibt detaillierte Statistiken zu Gastaufträgen zurück""" try: db_session = get_db_session() # Basis-Statistiken total = db_session.query(GuestRequest).count() pending = db_session.query(GuestRequest).filter(GuestRequest.status == 'pending').count() approved = db_session.query(GuestRequest).filter(GuestRequest.status == 'approved').count() rejected = db_session.query(GuestRequest).filter(GuestRequest.status == 'rejected').count() # Zeitbasierte Statistiken today = datetime.now().date() week_ago = datetime.now() - timedelta(days=7) month_ago = datetime.now() - timedelta(days=30) today_requests = db_session.query(GuestRequest).filter( func.date(GuestRequest.created_at) == today ).count() week_requests = db_session.query(GuestRequest).filter( GuestRequest.created_at >= week_ago ).count() month_requests = db_session.query(GuestRequest).filter( GuestRequest.created_at >= month_ago ).count() # Dringende Requests (älter als 24h und pending) urgent_cutoff = datetime.now() - timedelta(hours=24) urgent_requests = db_session.query(GuestRequest).filter( GuestRequest.status == 'pending', GuestRequest.created_at < urgent_cutoff ).count() # Durchschnittliche Bearbeitungszeit avg_processing_time = None try: processed_requests = db_session.query(GuestRequest).filter( GuestRequest.status.in_(['approved', 'rejected']), GuestRequest.updated_at.isnot(None) ).all() if processed_requests: total_time = sum([ (req.updated_at - req.created_at).total_seconds() for req in processed_requests if req.updated_at and req.created_at ]) avg_processing_time = round(total_time / len(processed_requests) / 3600, 2) # Stunden except Exception as e: app_logger.warning(f"Fehler beim Berechnen der durchschnittlichen Bearbeitungszeit: {str(e)}") # Erfolgsrate success_rate = 0 if approved + rejected > 0: success_rate = round((approved / (approved + rejected)) * 100, 1) stats = { 'total': total, 'pending': pending, 'approved': approved, 'rejected': rejected, 'urgent': urgent_requests, 'today': today_requests, 'week': week_requests, 'month': month_requests, 'success_rate': success_rate, 'avg_processing_time_hours': avg_processing_time, 'completion_rate': round(((approved + rejected) / total * 100), 1) if total > 0 else 0 } db_session.close() return jsonify({ 'success': True, 'stats': stats, 'generated_at': datetime.now().isoformat() }) except Exception as e: app_logger.error(f"Fehler beim Abrufen der Gastauftrag-Statistiken: {str(e)}") return jsonify({ 'success': False, 'message': f'Fehler beim Abrufen der Statistiken: {str(e)}' }), 500 @app.route('/api/admin/guest-requests/export', methods=['GET']) @admin_required def export_guest_requests(): """Exportiert Gastaufträge als CSV""" try: db_session = get_db_session() # Filter-Parameter status = request.args.get('status', 'all') start_date = request.args.get('start_date') end_date = request.args.get('end_date') # Query aufbauen query = db_session.query(GuestRequest) if status != 'all': query = query.filter(GuestRequest.status == status) if start_date: try: start_dt = datetime.fromisoformat(start_date) query = query.filter(GuestRequest.created_at >= start_dt) except ValueError: pass if end_date: try: end_dt = datetime.fromisoformat(end_date) query = query.filter(GuestRequest.created_at <= end_dt) except ValueError: pass requests = query.order_by(GuestRequest.created_at.desc()).all() # CSV-Daten erstellen import csv import io output = io.StringIO() writer = csv.writer(output) # Header writer.writerow([ 'ID', 'Name', 'E-Mail', 'Datei', 'Status', 'Erstellt am', 'Dauer (Min)', 'Kopien', 'Begründung', 'Genehmigt am', 'Abgelehnt am', 'Bearbeitungsnotizen', 'Ablehnungsgrund', 'OTP-Code' ]) # Daten for req in requests: writer.writerow([ req.id, req.name or '', req.email or '', req.file_name or '', req.status, req.created_at.strftime('%Y-%m-%d %H:%M:%S') if req.created_at else '', req.duration_minutes or '', req.copies or '', req.reason or '', req.approved_at.strftime('%Y-%m-%d %H:%M:%S') if req.approved_at else '', req.rejected_at.strftime('%Y-%m-%d %H:%M:%S') if req.rejected_at else '', req.approval_notes or '', req.rejection_reason or '', req.otp_code or '' ]) db_session.close() # Response erstellen output.seek(0) filename = f"gastantraege_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" response = make_response(output.getvalue()) response.headers['Content-Type'] = 'text/csv; charset=utf-8' response.headers['Content-Disposition'] = f'attachment; filename="{filename}"' app_logger.info(f"Gastaufträge-Export erstellt: {len(requests)} Datensätze") return response except Exception as e: app_logger.error(f"Fehler beim Exportieren der Gastaufträge: {str(e)}") return jsonify({ 'success': False, 'message': f'Fehler beim Export: {str(e)}' }), 500 # ===== AUTO-OPTIMIERUNG-API-ENDPUNKTE ===== @app.route('/api/optimization/auto-optimize', methods=['POST']) @login_required def auto_optimize_jobs(): """ Automatische Optimierung der Druckaufträge durchführen Implementiert intelligente Job-Verteilung basierend auf verschiedenen Algorithmen """ try: data = request.get_json() settings = data.get('settings', {}) enabled = data.get('enabled', False) db_session = get_db_session() # Aktuelle Jobs in der Warteschlange abrufen pending_jobs = db_session.query(Job).filter( Job.status.in_(['queued', 'pending']) ).all() if not pending_jobs: db_session.close() return jsonify({ 'success': True, 'message': 'Keine Jobs zur Optimierung verfügbar', 'optimized_jobs': 0 }) # Verfügbare Drucker abrufen available_printers = db_session.query(Printer).filter(Printer.active == True).all() if not available_printers: db_session.close() return jsonify({ 'success': False, 'error': 'Keine verfügbaren Drucker für Optimierung' }) # Optimierungs-Algorithmus anwenden algorithm = settings.get('algorithm', 'round_robin') optimized_count = 0 if algorithm == 'round_robin': optimized_count = apply_round_robin_optimization(pending_jobs, available_printers, db_session) elif algorithm == 'load_balance': optimized_count = apply_load_balance_optimization(pending_jobs, available_printers, db_session) elif algorithm == 'priority_based': optimized_count = apply_priority_optimization(pending_jobs, available_printers, db_session) db_session.commit() jobs_logger.info(f"Auto-Optimierung durchgeführt: {optimized_count} Jobs optimiert mit Algorithmus {algorithm}") # System-Log erstellen log_entry = SystemLog( level='INFO', component='optimization', message=f'Auto-Optimierung durchgeführt: {optimized_count} Jobs optimiert', user_id=current_user.id if current_user.is_authenticated else None, details=json.dumps({ 'algorithm': algorithm, 'optimized_jobs': optimized_count, 'settings': settings }) ) db_session.add(log_entry) db_session.commit() db_session.close() return jsonify({ 'success': True, 'optimized_jobs': optimized_count, 'algorithm': algorithm, 'message': f'Optimierung erfolgreich: {optimized_count} Jobs wurden optimiert' }) except Exception as e: app_logger.error(f"Fehler bei der Auto-Optimierung: {str(e)}") return jsonify({ 'success': False, 'error': f'Optimierung fehlgeschlagen: {str(e)}' }), 500 @app.route('/api/optimization/settings', methods=['GET', 'POST']) @login_required def optimization_settings(): """Optimierungs-Einstellungen abrufen und speichern""" db_session = get_db_session() if request.method == 'GET': try: # Standard-Einstellungen oder benutzerdefinierte laden default_settings = { 'algorithm': 'round_robin', 'consider_distance': True, 'minimize_changeover': True, 'max_batch_size': 10, 'time_window': 24, 'auto_optimization_enabled': False } # Benutzereinstellungen aus der Session laden oder Standardwerte verwenden user_settings = session.get('user_settings', {}) optimization_settings = user_settings.get('optimization', default_settings) # Sicherstellen, dass alle erforderlichen Schlüssel vorhanden sind for key, value in default_settings.items(): if key not in optimization_settings: optimization_settings[key] = value return jsonify({ 'success': True, 'settings': optimization_settings }) except Exception as e: app_logger.error(f"Fehler beim Abrufen der Optimierungs-Einstellungen: {str(e)}") return jsonify({ 'success': False, 'error': 'Fehler beim Laden der Einstellungen' }), 500 elif request.method == 'POST': try: settings = request.get_json() # Validierung der Einstellungen if not validate_optimization_settings(settings): return jsonify({ 'success': False, 'error': 'Ungültige Optimierungs-Einstellungen' }), 400 # Einstellungen in der Session speichern user_settings = session.get('user_settings', {}) if 'optimization' not in user_settings: user_settings['optimization'] = {} # Aktualisiere die Optimierungseinstellungen user_settings['optimization'].update(settings) session['user_settings'] = user_settings # Einstellungen in der Datenbank speichern, wenn möglich if hasattr(current_user, 'settings'): import json current_user.settings = json.dumps(user_settings) current_user.updated_at = datetime.now() db_session.commit() app_logger.info(f"Optimierungs-Einstellungen für Benutzer {current_user.id} aktualisiert") return jsonify({ 'success': True, 'message': 'Optimierungs-Einstellungen erfolgreich gespeichert' }) except Exception as e: db_session.rollback() app_logger.error(f"Fehler beim Speichern der Optimierungs-Einstellungen: {str(e)}") return jsonify({ 'success': False, 'error': f'Fehler beim Speichern der Einstellungen: {str(e)}' }), 500 finally: db_session.close() @app.route('/admin/advanced-settings') @login_required @admin_required def admin_advanced_settings(): """Erweiterte Admin-Einstellungen - HTML-Seite""" try: app_logger.info(f"🔧 Erweiterte Einstellungen aufgerufen von Admin {current_user.username}") db_session = get_db_session() # Aktuelle Optimierungs-Einstellungen laden default_settings = { 'algorithm': 'round_robin', 'consider_distance': True, 'minimize_changeover': True, 'max_batch_size': 10, 'time_window': 24, 'auto_optimization_enabled': False } user_settings = session.get('user_settings', {}) optimization_settings = user_settings.get('optimization', default_settings) # Performance-Optimierungs-Status hinzufügen performance_optimization = { 'active': USE_OPTIMIZED_CONFIG, 'raspberry_pi_detected': detect_raspberry_pi(), 'forced_mode': os.getenv('FORCE_OPTIMIZED_MODE', '').lower() in ['true', '1', 'yes'], 'cli_mode': '--optimized' in sys.argv, 'current_settings': { 'minified_assets': app.jinja_env.globals.get('use_minified_assets', False), 'disabled_animations': app.jinja_env.globals.get('disable_animations', False), 'limited_glassmorphism': app.jinja_env.globals.get('limit_glassmorphism', False), 'template_caching': not app.config.get('TEMPLATES_AUTO_RELOAD', True), 'json_optimization': not app.config.get('JSON_SORT_KEYS', True), 'static_cache_age': app.config.get('SEND_FILE_MAX_AGE_DEFAULT', 0) } } # System-Statistiken sammeln stats = { 'total_users': db_session.query(User).count(), 'total_printers': db_session.query(Printer).count(), 'active_printers': db_session.query(Printer).filter(Printer.active == True).count(), 'total_jobs': db_session.query(Job).count(), 'pending_jobs': db_session.query(Job).filter(Job.status.in_(['queued', 'pending'])).count(), 'completed_jobs': db_session.query(Job).filter(Job.status == 'completed').count() } # Wartungs-Informationen maintenance_info = { 'last_backup': 'Nie', 'last_optimization': 'Nie', 'cache_size': '0 MB', 'log_files_count': 0 } # Backup-Informationen laden try: backup_dir = os.path.join(app.root_path, 'database', 'backups') if os.path.exists(backup_dir): backup_files = [f for f in os.listdir(backup_dir) if f.startswith('myp_backup_') and f.endswith('.zip')] if backup_files: backup_files.sort(reverse=True) latest_backup = backup_files[0] backup_path = os.path.join(backup_dir, latest_backup) backup_time = datetime.fromtimestamp(os.path.getctime(backup_path)) maintenance_info['last_backup'] = backup_time.strftime('%d.%m.%Y %H:%M') except Exception as e: app_logger.warning(f"Fehler beim Laden der Backup-Informationen: {str(e)}") # Log-Dateien zählen try: logs_dir = os.path.join(app.root_path, 'logs') if os.path.exists(logs_dir): log_count = 0 for root, dirs, files in os.walk(logs_dir): log_count += len([f for f in files if f.endswith('.log')]) maintenance_info['log_files_count'] = log_count except Exception as e: app_logger.warning(f"Fehler beim Zählen der Log-Dateien: {str(e)}") db_session.close() return render_template( 'admin_advanced_settings.html', title='Erweiterte Einstellungen', optimization_settings=optimization_settings, performance_optimization=performance_optimization, stats=stats, maintenance_info=maintenance_info ) except Exception as e: app_logger.error(f"❌ Fehler beim Laden der erweiterten Einstellungen: {str(e)}") flash('Fehler beim Laden der erweiterten Einstellungen', 'error') return redirect(url_for('admin_page')) @app.route("/admin/performance-optimization") @login_required @admin_required def admin_performance_optimization(): """Performance-Optimierungs-Verwaltungsseite für Admins""" try: app_logger.info(f"🚀 Performance-Optimierung-Seite aufgerufen von Admin {current_user.username}") # Aktuelle Optimierungseinstellungen sammeln optimization_status = { 'mode_active': USE_OPTIMIZED_CONFIG, 'detection': { 'raspberry_pi': detect_raspberry_pi(), 'forced_mode': os.getenv('FORCE_OPTIMIZED_MODE', '').lower() in ['true', '1', 'yes'], 'cli_mode': '--optimized' in sys.argv, 'low_memory': False }, 'settings': { 'minified_assets': app.jinja_env.globals.get('use_minified_assets', False), 'disabled_animations': app.jinja_env.globals.get('disable_animations', False), 'limited_glassmorphism': app.jinja_env.globals.get('limit_glassmorphism', False), 'template_caching': not app.config.get('TEMPLATES_AUTO_RELOAD', True), 'json_optimization': not app.config.get('JSON_SORT_KEYS', True), 'debug_disabled': not app.config.get('DEBUG', False), 'secure_sessions': app.config.get('SESSION_COOKIE_SECURE', False) }, 'performance': { 'static_cache_age_hours': app.config.get('SEND_FILE_MAX_AGE_DEFAULT', 0) / 3600, 'max_upload_mb': app.config.get('MAX_CONTENT_LENGTH', 0) / (1024 * 1024) if app.config.get('MAX_CONTENT_LENGTH') else 0, 'sqlalchemy_echo': app.config.get('SQLALCHEMY_ECHO', True) } } # Memory-Erkennung hinzufügen try: import psutil memory_gb = psutil.virtual_memory().total / (1024**3) optimization_status['detection']['low_memory'] = memory_gb < 2.0 optimization_status['system_memory_gb'] = round(memory_gb, 2) except ImportError: optimization_status['system_memory_gb'] = None return render_template( 'admin_performance_optimization.html', title='Performance-Optimierung', optimization_status=optimization_status ) except Exception as e: app_logger.error(f"❌ Fehler beim Laden der Performance-Optimierung-Seite: {str(e)}") flash('Fehler beim Laden der Performance-Optimierung-Seite', 'error') return redirect(url_for('admin_page')) @app.route('/api/admin/maintenance/cleanup-logs', methods=['POST']) @login_required @admin_required def api_cleanup_logs(): """Bereinigt alte Log-Dateien""" try: app_logger.info(f"📋 Log-Bereinigung gestartet von Benutzer {current_user.username}") cleanup_results = { 'files_removed': 0, 'space_freed_mb': 0, 'directories_cleaned': [], 'errors': [] } # Log-Verzeichnis bereinigen logs_dir = os.path.join(app.root_path, 'logs') if os.path.exists(logs_dir): cutoff_date = datetime.now() - timedelta(days=30) for root, dirs, files in os.walk(logs_dir): for file in files: if file.endswith('.log'): file_path = os.path.join(root, file) try: file_time = datetime.fromtimestamp(os.path.getctime(file_path)) if file_time < cutoff_date: file_size = os.path.getsize(file_path) os.remove(file_path) cleanup_results['files_removed'] += 1 cleanup_results['space_freed_mb'] += file_size / (1024 * 1024) except Exception as e: cleanup_results['errors'].append(f"Fehler bei {file}: {str(e)}") # Verzeichnis zu bereinigten hinzufügen rel_dir = os.path.relpath(root, logs_dir) if rel_dir != '.' and rel_dir not in cleanup_results['directories_cleaned']: cleanup_results['directories_cleaned'].append(rel_dir) # Temporäre Upload-Dateien bereinigen (älter als 7 Tage) uploads_temp_dir = os.path.join(app.root_path, 'uploads', 'temp') if os.path.exists(uploads_temp_dir): temp_cutoff_date = datetime.now() - timedelta(days=7) for root, dirs, files in os.walk(uploads_temp_dir): for file in files: file_path = os.path.join(root, file) try: file_time = datetime.fromtimestamp(os.path.getctime(file_path)) if file_time < temp_cutoff_date: file_size = os.path.getsize(file_path) os.remove(file_path) cleanup_results['files_removed'] += 1 cleanup_results['space_freed_mb'] += file_size / (1024 * 1024) except Exception as e: cleanup_results['errors'].append(f"Temp-Datei {file}: {str(e)}") cleanup_results['space_freed_mb'] = round(cleanup_results['space_freed_mb'], 2) app_logger.info(f"✅ Log-Bereinigung abgeschlossen: {cleanup_results['files_removed']} Dateien entfernt, {cleanup_results['space_freed_mb']} MB freigegeben") return jsonify({ 'success': True, 'message': f'Log-Bereinigung erfolgreich: {cleanup_results["files_removed"]} Dateien entfernt', 'details': cleanup_results }) except Exception as e: app_logger.error(f"❌ Fehler bei Log-Bereinigung: {str(e)}") return jsonify({ 'success': False, 'message': f'Fehler bei der Log-Bereinigung: {str(e)}' }), 500 @app.route('/api/admin/maintenance/system-check', methods=['POST']) @login_required @admin_required def api_system_check(): """Führt eine System-Integritätsprüfung durch""" try: app_logger.info(f"🔍 System-Integritätsprüfung gestartet von Benutzer {current_user.username}") check_results = { 'database_integrity': False, 'file_permissions': False, 'disk_space': False, 'memory_usage': False, 'critical_files': False, 'errors': [], 'warnings': [], 'details': {} } # 1. Datenbank-Integritätsprüfung try: db_session = get_db_session() # Einfache Abfrage zur Überprüfung der DB-Verbindung user_count = db_session.query(User).count() printer_count = db_session.query(Printer).count() check_results['database_integrity'] = True check_results['details']['database'] = { 'users': user_count, 'printers': printer_count, 'connection': 'OK' } db_session.close() except Exception as e: check_results['errors'].append(f"Datenbank-Integritätsprüfung: {str(e)}") check_results['details']['database'] = {'error': str(e)} # 2. Festplattenspeicher prüfen try: import shutil total, used, free = shutil.disk_usage(app.root_path) free_gb = free / (1024**3) used_percent = (used / total) * 100 check_results['disk_space'] = free_gb > 1.0 # Mindestens 1GB frei check_results['details']['disk_space'] = { 'free_gb': round(free_gb, 2), 'used_percent': round(used_percent, 2), 'total_gb': round(total / (1024**3), 2) } if used_percent > 90: check_results['warnings'].append(f"Festplatte zu {used_percent:.1f}% belegt") except Exception as e: check_results['errors'].append(f"Festplattenspeicher-Prüfung: {str(e)}") # 3. Speicherverbrauch prüfen try: import psutil memory = psutil.virtual_memory() check_results['memory_usage'] = memory.percent < 90 check_results['details']['memory'] = { 'used_percent': round(memory.percent, 2), 'available_gb': round(memory.available / (1024**3), 2), 'total_gb': round(memory.total / (1024**3), 2) } if memory.percent > 85: check_results['warnings'].append(f"Speicherverbrauch bei {memory.percent:.1f}%") except ImportError: check_results['warnings'].append("psutil nicht verfügbar - Speicherprüfung übersprungen") except Exception as e: check_results['errors'].append(f"Speicher-Prüfung: {str(e)}") # 4. Kritische Dateien prüfen try: critical_files = [ 'app.py', 'models.py', 'requirements.txt', os.path.join('instance', 'database.db') ] missing_files = [] for file_path in critical_files: full_path = os.path.join(app.root_path, file_path) if not os.path.exists(full_path): missing_files.append(file_path) check_results['critical_files'] = len(missing_files) == 0 check_results['details']['critical_files'] = { 'checked': len(critical_files), 'missing': missing_files } if missing_files: check_results['errors'].append(f"Fehlende kritische Dateien: {', '.join(missing_files)}") except Exception as e: check_results['errors'].append(f"Datei-Prüfung: {str(e)}") # 5. Dateiberechtigungen prüfen try: test_dirs = ['logs', 'uploads', 'instance'] permission_issues = [] for dir_name in test_dirs: dir_path = os.path.join(app.root_path, dir_name) if os.path.exists(dir_path): if not os.access(dir_path, os.W_OK): permission_issues.append(dir_name) check_results['file_permissions'] = len(permission_issues) == 0 check_results['details']['file_permissions'] = { 'checked_directories': test_dirs, 'permission_issues': permission_issues } if permission_issues: check_results['errors'].append(f"Schreibrechte fehlen: {', '.join(permission_issues)}") except Exception as e: check_results['errors'].append(f"Berechtigungs-Prüfung: {str(e)}") # Gesamtergebnis bewerten passed_checks = sum([ check_results['database_integrity'], check_results['file_permissions'], check_results['disk_space'], check_results['memory_usage'], check_results['critical_files'] ]) total_checks = 5 success_rate = (passed_checks / total_checks) * 100 check_results['overall_health'] = 'excellent' if success_rate >= 100 else \ 'good' if success_rate >= 80 else \ 'warning' if success_rate >= 60 else 'critical' check_results['success_rate'] = round(success_rate, 1) app_logger.info(f"✅ System-Integritätsprüfung abgeschlossen: {success_rate:.1f}% ({passed_checks}/{total_checks} Tests bestanden)") return jsonify({ 'success': True, 'message': f'System-Integritätsprüfung abgeschlossen: {success_rate:.1f}% Erfolgsrate', 'details': check_results }) except Exception as e: app_logger.error(f"❌ Fehler bei System-Integritätsprüfung: {str(e)}") return jsonify({ 'success': False, 'message': f'Fehler bei der System-Integritätsprüfung: {str(e)}' }), 500 # ===== OPTIMIERUNGS-ALGORITHMUS-FUNKTIONEN ===== def apply_round_robin_optimization(jobs, printers, db_session): """ Round-Robin-Optimierung: Gleichmäßige Verteilung der Jobs auf Drucker Verteilt Jobs nacheinander auf verfügbare Drucker für optimale Balance """ optimized_count = 0 printer_index = 0 for job in jobs: if printer_index >= len(printers): printer_index = 0 # Job dem nächsten Drucker zuweisen job.printer_id = printers[printer_index].id job.assigned_at = datetime.now() optimized_count += 1 printer_index += 1 return optimized_count def apply_load_balance_optimization(jobs, printers, db_session): """ Load-Balancing-Optimierung: Jobs basierend auf aktueller Auslastung verteilen Berücksichtigt die aktuelle Drucker-Auslastung für optimale Verteilung """ optimized_count = 0 # Aktuelle Drucker-Auslastung berechnen printer_loads = {} for printer in printers: current_jobs = db_session.query(Job).filter( Job.printer_id == printer.id, Job.status.in_(['running', 'queued']) ).count() printer_loads[printer.id] = current_jobs for job in jobs: # Drucker mit geringster Auslastung finden min_load_printer_id = min(printer_loads, key=printer_loads.get) job.printer_id = min_load_printer_id job.assigned_at = datetime.now() # Auslastung für nächste Iteration aktualisieren printer_loads[min_load_printer_id] += 1 optimized_count += 1 return optimized_count def apply_priority_optimization(jobs, printers, db_session): """ Prioritätsbasierte Optimierung: Jobs nach Priorität und verfügbaren Druckern verteilen Hochpriorisierte Jobs erhalten bevorzugte Druckerzuweisung """ optimized_count = 0 # Jobs nach Priorität sortieren priority_order = {'urgent': 1, 'high': 2, 'normal': 3, 'low': 4} sorted_jobs = sorted(jobs, key=lambda j: priority_order.get(getattr(j, 'priority', 'normal'), 3)) # Hochpriorisierte Jobs den besten verfügbaren Druckern zuweisen printer_assignments = {printer.id: 0 for printer in printers} for job in sorted_jobs: # Drucker mit geringster Anzahl zugewiesener Jobs finden best_printer_id = min(printer_assignments, key=printer_assignments.get) job.printer_id = best_printer_id job.assigned_at = datetime.now() printer_assignments[best_printer_id] += 1 optimized_count += 1 return optimized_count def validate_optimization_settings(settings): """ Validiert die Optimierungs-Einstellungen auf Korrektheit und Sicherheit Verhindert ungültige Parameter die das System beeinträchtigen könnten """ try: # Algorithmus validieren valid_algorithms = ['round_robin', 'load_balance', 'priority_based'] if settings.get('algorithm') not in valid_algorithms: return False # Numerische Werte validieren max_batch_size = settings.get('max_batch_size', 10) if not isinstance(max_batch_size, int) or max_batch_size < 1 or max_batch_size > 50: return False time_window = settings.get('time_window', 24) if not isinstance(time_window, int) or time_window < 1 or time_window > 168: return False return True except Exception: return False # ===== FORM VALIDATION API ===== @app.route('/api/validation/client-js', methods=['GET']) def get_validation_js(): """Liefert Client-seitige Validierungs-JavaScript""" try: js_content = get_client_validation_js() response = make_response(js_content) response.headers['Content-Type'] = 'application/javascript' response.headers['Cache-Control'] = 'public, max-age=3600' # 1 Stunde Cache return response except Exception as e: app_logger.error(f"Fehler beim Laden des Validierungs-JS: {str(e)}") return "console.error('Validierungs-JavaScript konnte nicht geladen werden');", 500 @app.route('/api/validation/validate-form', methods=['POST']) def validate_form_api(): """API-Endpunkt für Formular-Validierung""" try: data = request.get_json() or {} form_type = data.get('form_type') form_data = data.get('data', {}) # Validator basierend auf Form-Typ auswählen if form_type == 'user_registration': validator = get_user_registration_validator() elif form_type == 'job_creation': validator = get_job_creation_validator() elif form_type == 'printer_creation': validator = get_printer_creation_validator() elif form_type == 'guest_request': validator = get_guest_request_validator() else: return jsonify({'success': False, 'error': 'Unbekannter Formular-Typ'}), 400 # Validierung durchführen result = validator.validate(form_data) return jsonify({ 'success': result.is_valid, 'errors': result.errors, 'warnings': result.warnings, 'cleaned_data': result.cleaned_data if result.is_valid else {} }) except Exception as e: app_logger.error(f"Fehler bei Formular-Validierung: {str(e)}") return jsonify({'success': False, 'error': str(e)}), 500 # ===== REPORT GENERATOR API ===== @app.route('/api/reports/generate', methods=['POST']) @login_required def generate_report(): """Generiert Reports in verschiedenen Formaten""" try: data = request.get_json() or {} report_type = data.get('type', 'comprehensive') format_type = data.get('format', 'pdf') filters = data.get('filters', {}) # Report-Konfiguration erstellen config = ReportConfig( title=f"MYP System Report - {report_type.title()}", subtitle=f"Generiert am {datetime.now().strftime('%d.%m.%Y %H:%M')}", author=current_user.name if current_user.is_authenticated else "System" ) # Report-Daten basierend auf Typ sammeln if report_type == 'jobs': report_data = JobReportBuilder.build_jobs_report( start_date=filters.get('start_date'), end_date=filters.get('end_date'), user_id=filters.get('user_id'), printer_id=filters.get('printer_id') ) elif report_type == 'users': report_data = UserReportBuilder.build_users_report( include_inactive=filters.get('include_inactive', False) ) elif report_type == 'printers': report_data = PrinterReportBuilder.build_printers_report( include_inactive=filters.get('include_inactive', False) ) else: # Umfassender Report report_bytes = generate_comprehensive_report( format_type=format_type, start_date=filters.get('start_date'), end_date=filters.get('end_date'), user_id=current_user.id if not current_user.is_admin else None ) response = make_response(report_bytes) response.headers['Content-Type'] = f'application/{format_type}' response.headers['Content-Disposition'] = f'attachment; filename="myp_report.{format_type}"' return response # Generator erstellen und Report generieren generator = ReportFactory.create_generator(format_type, config) # Daten zum Generator hinzufügen for section_name, section_data in report_data.items(): if isinstance(section_data, list): generator.add_data_section(section_name, section_data) # Report in BytesIO generieren import io output = io.BytesIO() if generator.generate(output): output.seek(0) response = make_response(output.read()) response.headers['Content-Type'] = f'application/{format_type}' response.headers['Content-Disposition'] = f'attachment; filename="myp_{report_type}_report.{format_type}"' return response else: return jsonify({'error': 'Report-Generierung fehlgeschlagen'}), 500 except Exception as e: app_logger.error(f"Fehler bei Report-Generierung: {str(e)}") return jsonify({'error': str(e)}), 500 # ===== REALTIME DASHBOARD API ===== @app.route('/api/dashboard/config', methods=['GET']) @login_required def get_dashboard_config(): """Holt Dashboard-Konfiguration für aktuellen Benutzer""" try: config = dashboard_manager.get_dashboard_config(current_user.id) return jsonify(config) except Exception as e: app_logger.error(f"Fehler beim Laden der Dashboard-Konfiguration: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/api/dashboard/widgets//data', methods=['GET']) @login_required def get_widget_data(widget_id): """Holt Daten für ein spezifisches Widget""" try: data = dashboard_manager._get_widget_data(widget_id) return jsonify({ 'widget_id': widget_id, 'data': data, 'timestamp': datetime.now().isoformat() }) except Exception as e: app_logger.error(f"Fehler beim Laden der Widget-Daten für {widget_id}: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/api/dashboard/emit-event', methods=['POST']) @login_required def emit_dashboard_event(): """Sendet ein Dashboard-Ereignis""" try: data = request.get_json() or {} event_type = EventType(data.get('event_type')) event_data = data.get('data', {}) priority = data.get('priority', 'normal') event = DashboardEvent( event_type=event_type, data=event_data, timestamp=datetime.now(), user_id=current_user.id, priority=priority ) dashboard_manager.emit_event(event) return jsonify({'success': True}) except Exception as e: app_logger.error(f"Fehler beim Senden des Dashboard-Ereignisses: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/api/dashboard/client-js', methods=['GET']) def get_dashboard_js(): """Liefert Client-seitige Dashboard-JavaScript""" try: js_content = get_dashboard_client_js() response = make_response(js_content) response.headers['Content-Type'] = 'application/javascript' response.headers['Cache-Control'] = 'public, max-age=1800' # 30 Minuten Cache return response except Exception as e: app_logger.error(f"Fehler beim Laden des Dashboard-JS: {str(e)}") return "console.error('Dashboard-JavaScript konnte nicht geladen werden');", 500 # ===== DRAG & DROP API ===== @app.route('/api/dragdrop/update-job-order', methods=['POST']) @login_required def update_job_order(): """Aktualisiert die Job-Reihenfolge per Drag & Drop""" try: data = request.get_json() or {} printer_id = data.get('printer_id') job_ids = data.get('job_ids', []) if not printer_id or not isinstance(job_ids, list): return jsonify({'error': 'Ungültige Parameter'}), 400 success = drag_drop_manager.update_job_order(printer_id, job_ids) if success: # Dashboard-Event senden emit_system_alert( f"Job-Reihenfolge für Drucker {printer_id} aktualisiert", alert_type="info", priority="normal" ) return jsonify({ 'success': True, 'message': 'Job-Reihenfolge erfolgreich aktualisiert' }) else: return jsonify({'error': 'Fehler beim Aktualisieren der Job-Reihenfolge'}), 500 except Exception as e: app_logger.error(f"Fehler beim Aktualisieren der Job-Reihenfolge: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/api/dragdrop/get-job-order/', methods=['GET']) @login_required def get_job_order_api(printer_id): """Holt die aktuelle Job-Reihenfolge für einen Drucker""" try: job_ids = drag_drop_manager.get_job_order(printer_id) ordered_jobs = drag_drop_manager.get_ordered_jobs_for_printer(printer_id) job_data = [] for job in ordered_jobs: job_data.append({ 'id': job.id, 'name': job.name, 'duration_minutes': job.duration_minutes, 'user_name': job.user.name if job.user else 'Unbekannt', 'status': job.status, 'created_at': job.created_at.isoformat() if job.created_at else None }) return jsonify({ 'printer_id': printer_id, 'job_ids': job_ids, 'jobs': job_data, 'total_jobs': len(job_data) }) except Exception as e: app_logger.error(f"Fehler beim Abrufen der Job-Reihenfolge: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/api/dragdrop/upload-session', methods=['POST']) @login_required def create_upload_session(): """Erstellt eine neue Upload-Session""" try: import uuid session_id = str(uuid.uuid4()) drag_drop_manager.create_upload_session(session_id) return jsonify({ 'session_id': session_id, 'success': True }) except Exception as e: app_logger.error(f"Fehler beim Erstellen der Upload-Session: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/api/dragdrop/upload-progress/', methods=['GET']) @login_required def get_upload_progress(session_id): """Holt Upload-Progress für eine Session""" try: progress = drag_drop_manager.get_session_progress(session_id) return jsonify(progress) except Exception as e: app_logger.error(f"Fehler beim Abrufen des Upload-Progress: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/api/dragdrop/client-js', methods=['GET']) def get_dragdrop_js(): """Liefert Client-seitige Drag & Drop JavaScript""" try: js_content = get_drag_drop_javascript() response = make_response(js_content) response.headers['Content-Type'] = 'application/javascript' response.headers['Cache-Control'] = 'public, max-age=3600' return response except Exception as e: app_logger.error(f"Fehler beim Laden des Drag & Drop JS: {str(e)}") return "console.error('Drag & Drop JavaScript konnte nicht geladen werden');", 500 @app.route('/api/dragdrop/client-css', methods=['GET']) def get_dragdrop_css(): """Liefert Client-seitige Drag & Drop CSS""" try: css_content = get_drag_drop_css() response = make_response(css_content) response.headers['Content-Type'] = 'text/css' response.headers['Cache-Control'] = 'public, max-age=3600' return response except Exception as e: app_logger.error(f"Fehler beim Laden des Drag & Drop CSS: {str(e)}") return "/* Drag & Drop CSS konnte nicht geladen werden */", 500 # ===== ADVANCED TABLES API ===== @app.route('/api/tables/query', methods=['POST']) @login_required def query_advanced_table(): """Führt erweiterte Tabellen-Abfragen durch""" try: data = request.get_json() or {} table_type = data.get('table_type') query_params = data.get('query', {}) # Tabellen-Konfiguration erstellen if table_type == 'jobs': config = create_table_config( 'jobs', ['id', 'name', 'user_name', 'printer_name', 'status', 'created_at'], base_query='Job' ) elif table_type == 'printers': config = create_table_config( 'printers', ['id', 'name', 'model', 'location', 'status', 'ip_address'], base_query='Printer' ) elif table_type == 'users': config = create_table_config( 'users', ['id', 'name', 'email', 'role', 'active', 'last_login'], base_query='User' ) else: return jsonify({'error': 'Unbekannter Tabellen-Typ'}), 400 # Erweiterte Abfrage erstellen query_builder = AdvancedTableQuery(config) # Filter anwenden if 'filters' in query_params: for filter_data in query_params['filters']: query_builder.add_filter( filter_data['column'], filter_data['operator'], filter_data['value'] ) # Sortierung anwenden if 'sort' in query_params: query_builder.set_sorting( query_params['sort']['column'], query_params['sort']['direction'] ) # Paginierung anwenden if 'pagination' in query_params: query_builder.set_pagination( query_params['pagination']['page'], query_params['pagination']['per_page'] ) # Abfrage ausführen result = query_builder.execute() return jsonify(result) except Exception as e: app_logger.error(f"Fehler bei erweiterte Tabellen-Abfrage: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/api/tables/export', methods=['POST']) @login_required def export_table_data(): """Exportiert Tabellen-Daten in verschiedenen Formaten""" try: data = request.get_json() or {} table_type = data.get('table_type') export_format = data.get('format', 'csv') query_params = data.get('query', {}) # Vollständige Export-Logik implementierung app_logger.info(f"📊 Starte Tabellen-Export: {table_type} als {export_format}") # Tabellen-Konfiguration basierend auf Typ erstellen if table_type == 'jobs': config = create_table_config( 'jobs', ['id', 'filename', 'status', 'printer_name', 'user_name', 'created_at', 'completed_at'], base_query='Job' ) elif table_type == 'printers': config = create_table_config( 'printers', ['id', 'name', 'ip_address', 'status', 'location', 'model'], base_query='Printer' ) elif table_type == 'users': config = create_table_config( 'users', ['id', 'name', 'email', 'role', 'active', 'last_login'], base_query='User' ) else: return jsonify({'error': 'Unbekannter Tabellen-Typ für Export'}), 400 # Erweiterte Abfrage für Export-Daten erstellen query_builder = AdvancedTableQuery(config) # Filter aus Query-Parametern anwenden if 'filters' in query_params: for filter_data in query_params['filters']: query_builder.add_filter( filter_data['column'], filter_data['operator'], filter_data['value'] ) # Sortierung anwenden if 'sort' in query_params: query_builder.set_sorting( query_params['sort']['column'], query_params['sort']['direction'] ) # Für Export: Alle Daten ohne Paginierung query_builder.set_pagination(1, 10000) # Maximale Anzahl für Export # Daten abrufen result = query_builder.execute() export_data = result.get('data', []) if export_format == 'csv': import csv import io # CSV-Export implementierung output = io.StringIO() writer = csv.writer(output, delimiter=';', quoting=csv.QUOTE_MINIMAL) # Header-Zeile schreiben if export_data: headers = list(export_data[0].keys()) writer.writerow(headers) # Daten-Zeilen schreiben for row in export_data: # Werte für CSV formatieren formatted_row = [] for value in row.values(): if value is None: formatted_row.append('') elif isinstance(value, datetime): formatted_row.append(value.strftime('%d.%m.%Y %H:%M:%S')) else: formatted_row.append(str(value)) writer.writerow(formatted_row) # Response erstellen csv_content = output.getvalue() output.close() response = make_response(csv_content) response.headers['Content-Type'] = 'text/csv; charset=utf-8' response.headers['Content-Disposition'] = f'attachment; filename="{table_type}_export_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv"' app_logger.info(f"✅ CSV-Export erfolgreich: {len(export_data)} Datensätze") return response elif export_format == 'json': # JSON-Export implementierung json_content = json.dumps(export_data, indent=2, default=str, ensure_ascii=False) response = make_response(json_content) response.headers['Content-Type'] = 'application/json; charset=utf-8' response.headers['Content-Disposition'] = f'attachment; filename="{table_type}_export_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json"' app_logger.info(f"✅ JSON-Export erfolgreich: {len(export_data)} Datensätze") return response elif export_format == 'excel': # Excel-Export implementierung (falls openpyxl verfügbar) try: import openpyxl from openpyxl.utils.dataframe import dataframe_to_rows import pandas as pd # DataFrame erstellen df = pd.DataFrame(export_data) # Excel-Datei in Memory erstellen output = io.BytesIO() with pd.ExcelWriter(output, engine='openpyxl') as writer: df.to_excel(writer, sheet_name=table_type.capitalize(), index=False) output.seek(0) response = make_response(output.getvalue()) response.headers['Content-Type'] = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' response.headers['Content-Disposition'] = f'attachment; filename="{table_type}_export_{datetime.now().strftime("%Y%m%d_%H%M%S")}.xlsx"' app_logger.info(f"✅ Excel-Export erfolgreich: {len(export_data)} Datensätze") return response except ImportError: app_logger.warning("⚠️ Excel-Export nicht verfügbar - openpyxl/pandas fehlt") return jsonify({'error': 'Excel-Export nicht verfügbar - erforderliche Bibliotheken fehlen'}), 400 except Exception as e: app_logger.error(f"Fehler beim Tabellen-Export: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/api/tables/client-js', methods=['GET']) def get_tables_js(): """Liefert Client-seitige Advanced Tables JavaScript""" try: js_content = get_advanced_tables_js() response = make_response(js_content) response.headers['Content-Type'] = 'application/javascript' response.headers['Cache-Control'] = 'public, max-age=3600' return response except Exception as e: app_logger.error(f"Fehler beim Laden des Tables-JS: {str(e)}") return "console.error('Advanced Tables JavaScript konnte nicht geladen werden');", 500 @app.route('/api/tables/client-css', methods=['GET']) def get_tables_css(): """Liefert Client-seitige Advanced Tables CSS""" try: css_content = get_advanced_tables_css() response = make_response(css_content) response.headers['Content-Type'] = 'text/css' response.headers['Cache-Control'] = 'public, max-age=3600' return response except Exception as e: app_logger.error(f"Fehler beim Laden des Tables-CSS: {str(e)}") return "/* Advanced Tables CSS konnte nicht geladen werden */", 500 # ===== MAINTENANCE SYSTEM API ===== @app.route('/api/admin/maintenance/clear-cache', methods=['POST']) @login_required @admin_required def api_clear_cache(): """Leert den System-Cache""" try: app_logger.info(f"🧹 Cache-Löschung gestartet von Benutzer {current_user.username}") # Flask-Cache leeren (falls vorhanden) if hasattr(app, 'cache'): app.cache.clear() # Temporäre Dateien löschen import tempfile temp_dir = tempfile.gettempdir() myp_temp_files = [] try: for root, dirs, files in os.walk(temp_dir): for file in files: if 'myp_' in file.lower() or 'tba_' in file.lower(): file_path = os.path.join(root, file) try: os.remove(file_path) myp_temp_files.append(file) except: pass except Exception as e: app_logger.warning(f"Fehler beim Löschen temporärer Dateien: {str(e)}") # Python-Cache leeren import gc gc.collect() app_logger.info(f"✅ Cache erfolgreich geleert. {len(myp_temp_files)} temporäre Dateien entfernt") return jsonify({ 'success': True, 'message': f'Cache erfolgreich geleert. {len(myp_temp_files)} temporäre Dateien entfernt.', 'details': { 'temp_files_removed': len(myp_temp_files), 'timestamp': datetime.now().isoformat() } }) except Exception as e: app_logger.error(f"❌ Fehler beim Leeren des Cache: {str(e)}") return jsonify({ 'success': False, 'message': f'Fehler beim Leeren des Cache: {str(e)}' }), 500 @app.route('/api/admin/maintenance/optimize-database', methods=['POST']) @login_required @admin_required def api_optimize_database(): """Optimiert die Datenbank""" db_session = get_db_session() try: app_logger.info(f"🔧 Datenbank-Optimierung gestartet von Benutzer {current_user.username}") optimization_results = { 'tables_analyzed': 0, 'indexes_rebuilt': 0, 'space_freed_mb': 0, 'errors': [] } # SQLite-spezifische Optimierungen try: # VACUUM - komprimiert die Datenbank db_session.execute(text("VACUUM;")) optimization_results['space_freed_mb'] += 1 # Geschätzt # ANALYZE - aktualisiert Statistiken db_session.execute(text("ANALYZE;")) optimization_results['tables_analyzed'] += 1 # REINDEX - baut Indizes neu auf db_session.execute(text("REINDEX;")) optimization_results['indexes_rebuilt'] += 1 db_session.commit() except Exception as e: optimization_results['errors'].append(f"SQLite-Optimierung: {str(e)}") app_logger.warning(f"Fehler bei SQLite-Optimierung: {str(e)}") # Verwaiste Dateien bereinigen try: uploads_dir = os.path.join(app.root_path, 'uploads') if os.path.exists(uploads_dir): orphaned_files = 0 for root, dirs, files in os.walk(uploads_dir): for file in files: file_path = os.path.join(root, file) # Prüfe ob Datei älter als 7 Tage und nicht referenziert file_age = datetime.now() - datetime.fromtimestamp(os.path.getctime(file_path)) if file_age.days > 7: try: os.remove(file_path) orphaned_files += 1 except: pass optimization_results['orphaned_files_removed'] = orphaned_files except Exception as e: optimization_results['errors'].append(f"Datei-Bereinigung: {str(e)}") app_logger.info(f"✅ Datenbank-Optimierung abgeschlossen: {optimization_results}") return jsonify({ 'success': True, 'message': 'Datenbank erfolgreich optimiert', 'details': optimization_results }) except Exception as e: db_session.rollback() app_logger.error(f"❌ Fehler bei Datenbank-Optimierung: {str(e)}") return jsonify({ 'success': False, 'message': f'Fehler bei der Datenbank-Optimierung: {str(e)}' }), 500 finally: db_session.close() @app.route('/api/admin/maintenance/create-backup', methods=['POST']) @login_required @admin_required def api_create_backup(): """Erstellt ein System-Backup""" try: app_logger.info(f"💾 Backup-Erstellung gestartet von Benutzer {current_user.username}") import zipfile # Backup-Verzeichnis erstellen backup_dir = os.path.join(app.root_path, 'database', 'backups') os.makedirs(backup_dir, exist_ok=True) # Backup-Dateiname mit Zeitstempel timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') backup_filename = f'myp_backup_{timestamp}.zip' backup_path = os.path.join(backup_dir, backup_filename) backup_info = { 'filename': backup_filename, 'created_at': datetime.now().isoformat(), 'created_by': current_user.username, 'size_mb': 0, 'files_included': [] } # ZIP-Backup erstellen with zipfile.ZipFile(backup_path, 'w', zipfile.ZIP_DEFLATED) as zipf: # Datenbank-Datei hinzufügen db_path = os.path.join(app.root_path, 'instance', 'database.db') if os.path.exists(db_path): zipf.write(db_path, 'database.db') backup_info['files_included'].append('database.db') # Konfigurationsdateien hinzufügen config_files = ['config.py', 'requirements.txt', '.env'] for config_file in config_files: config_path = os.path.join(app.root_path, config_file) if os.path.exists(config_path): zipf.write(config_path, config_file) backup_info['files_included'].append(config_file) # Wichtige Upload-Verzeichnisse hinzufügen (nur kleine Dateien) uploads_dir = os.path.join(app.root_path, 'uploads') if os.path.exists(uploads_dir): for root, dirs, files in os.walk(uploads_dir): for file in files: file_path = os.path.join(root, file) file_size = os.path.getsize(file_path) # Nur Dateien unter 10MB hinzufügen if file_size < 10 * 1024 * 1024: rel_path = os.path.relpath(file_path, app.root_path) zipf.write(file_path, rel_path) backup_info['files_included'].append(rel_path) # Backup-Größe berechnen backup_size = os.path.getsize(backup_path) backup_info['size_mb'] = round(backup_size / (1024 * 1024), 2) # Alte Backups bereinigen (nur die letzten 10 behalten) try: backup_files = [] for file in os.listdir(backup_dir): if file.startswith('myp_backup_') and file.endswith('.zip'): file_path = os.path.join(backup_dir, file) backup_files.append((file_path, os.path.getctime(file_path))) # Nach Erstellungszeit sortieren backup_files.sort(key=lambda x: x[1], reverse=True) # Alte Backups löschen (mehr als 10) for old_backup, _ in backup_files[10:]: try: os.remove(old_backup) app_logger.info(f"Altes Backup gelöscht: {os.path.basename(old_backup)}") except: pass except Exception as e: app_logger.warning(f"Fehler beim Bereinigen alter Backups: {str(e)}") app_logger.info(f"✅ Backup erfolgreich erstellt: {backup_filename} ({backup_info['size_mb']} MB)") return jsonify({ 'success': True, 'message': f'Backup erfolgreich erstellt: {backup_filename}', 'details': backup_info }) except Exception as e: app_logger.error(f"❌ Fehler bei Backup-Erstellung: {str(e)}") return jsonify({ 'success': False, 'message': f'Fehler bei der Backup-Erstellung: {str(e)}' }), 500 @app.route('/api/maintenance/tasks', methods=['GET', 'POST']) @login_required def maintenance_tasks(): """Wartungsaufgaben abrufen oder erstellen""" if request.method == 'GET': try: filters = { 'printer_id': request.args.get('printer_id', type=int), 'status': request.args.get('status'), 'priority': request.args.get('priority'), 'due_date_from': request.args.get('due_date_from'), 'due_date_to': request.args.get('due_date_to') } tasks = maintenance_manager.get_tasks(filters) return jsonify({ 'tasks': [task.to_dict() for task in tasks], 'total': len(tasks) }) except Exception as e: app_logger.error(f"Fehler beim Abrufen der Wartungsaufgaben: {str(e)}") return jsonify({'error': str(e)}), 500 elif request.method == 'POST': try: data = request.get_json() or {} task = create_maintenance_task( printer_id=data.get('printer_id'), task_type=MaintenanceType(data.get('task_type')), title=data.get('title'), description=data.get('description'), priority=data.get('priority', 'normal'), assigned_to=data.get('assigned_to'), due_date=data.get('due_date') ) if task: # Dashboard-Event senden emit_system_alert( f"Neue Wartungsaufgabe erstellt: {task.title}", alert_type="info", priority=task.priority ) return jsonify({ 'success': True, 'task': task.to_dict(), 'message': 'Wartungsaufgabe erfolgreich erstellt' }) else: return jsonify({'error': 'Fehler beim Erstellen der Wartungsaufgabe'}), 500 except Exception as e: app_logger.error(f"Fehler beim Erstellen der Wartungsaufgabe: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/api/maintenance/tasks//status', methods=['PUT']) @login_required def update_maintenance_task_status(task_id): """Aktualisiert den Status einer Wartungsaufgabe""" try: data = request.get_json() or {} new_status = MaintenanceStatus(data.get('status')) notes = data.get('notes', '') success = update_maintenance_status( task_id=task_id, new_status=new_status, updated_by=current_user.id, notes=notes ) if success: return jsonify({ 'success': True, 'message': 'Wartungsaufgaben-Status erfolgreich aktualisiert' }) else: return jsonify({'error': 'Fehler beim Aktualisieren des Status'}), 500 except Exception as e: app_logger.error(f"Fehler beim Aktualisieren des Wartungsaufgaben-Status: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/api/maintenance/overview', methods=['GET']) @login_required def get_maintenance_overview(): """Holt Wartungs-Übersicht""" try: overview = get_maintenance_overview() return jsonify(overview) except Exception as e: app_logger.error(f"Fehler beim Abrufen der Wartungs-Übersicht: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/api/maintenance/schedule', methods=['POST']) @login_required @admin_required def schedule_maintenance_api(): """Plant automatische Wartungen""" try: data = request.get_json() or {} schedule = schedule_maintenance( printer_id=data.get('printer_id'), maintenance_type=MaintenanceType(data.get('maintenance_type')), interval_days=data.get('interval_days'), start_date=data.get('start_date') ) if schedule: return jsonify({ 'success': True, 'schedule': schedule.to_dict(), 'message': 'Wartungsplan erfolgreich erstellt' }) else: return jsonify({'error': 'Fehler beim Erstellen des Wartungsplans'}), 500 except Exception as e: app_logger.error(f"Fehler beim Planen der Wartung: {str(e)}") return jsonify({'error': str(e)}), 500 # ===== MULTI-LOCATION SYSTEM API ===== @app.route('/api/locations', methods=['GET', 'POST']) @login_required def locations(): """Standorte abrufen oder erstellen""" if request.method == 'GET': try: filters = { 'location_type': request.args.get('type'), 'active_only': request.args.get('active_only', 'true').lower() == 'true' } locations = location_manager.get_locations(filters) return jsonify({ 'locations': [loc.to_dict() for loc in locations], 'total': len(locations) }) except Exception as e: app_logger.error(f"Fehler beim Abrufen der Standorte: {str(e)}") return jsonify({'error': str(e)}), 500 elif request.method == 'POST': try: data = request.get_json() or {} location = create_location( name=data.get('name'), location_type=LocationType(data.get('type')), address=data.get('address'), description=data.get('description'), coordinates=data.get('coordinates'), parent_location_id=data.get('parent_location_id') ) if location: return jsonify({ 'success': True, 'location': location.to_dict(), 'message': 'Standort erfolgreich erstellt' }) else: return jsonify({'error': 'Fehler beim Erstellen des Standorts'}), 500 except Exception as e: app_logger.error(f"Fehler beim Erstellen des Standorts: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/api/locations//users', methods=['GET', 'POST']) @login_required @admin_required def location_users(location_id): """Benutzer-Zuweisungen für einen Standort verwalten""" if request.method == 'GET': try: users = location_manager.get_location_users(location_id) return jsonify({ 'location_id': location_id, 'users': [user.to_dict() for user in users], 'total': len(users) }) except Exception as e: app_logger.error(f"Fehler beim Abrufen der Standort-Benutzer: {str(e)}") return jsonify({'error': str(e)}), 500 elif request.method == 'POST': try: data = request.get_json() or {} success = assign_user_to_location( user_id=data.get('user_id'), location_id=location_id, access_level=AccessLevel(data.get('access_level', 'READ')), valid_until=data.get('valid_until') ) if success: return jsonify({ 'success': True, 'message': 'Benutzer erfolgreich zu Standort zugewiesen' }) else: return jsonify({'error': 'Fehler bei der Benutzer-Zuweisung'}), 500 except Exception as e: app_logger.error(f"Fehler bei der Benutzer-Zuweisung: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/api/locations/user/', methods=['GET']) @login_required def get_user_locations_api(user_id): """Holt alle Standorte eines Benutzers""" try: # Berechtigung prüfen if current_user.id != user_id and not current_user.is_admin: return jsonify({'error': 'Keine Berechtigung'}), 403 locations = get_user_locations(user_id) return jsonify({ 'user_id': user_id, 'locations': [loc.to_dict() for loc in locations], 'total': len(locations) }) except Exception as e: app_logger.error(f"Fehler beim Abrufen der Benutzer-Standorte: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/api/locations/distance', methods=['POST']) @login_required def calculate_distance_api(): """Berechnet Entfernung zwischen zwei Standorten""" try: data = request.get_json() or {} coord1 = data.get('coordinates1') # [lat, lon] coord2 = data.get('coordinates2') # [lat, lon] if not coord1 or not coord2: return jsonify({'error': 'Koordinaten erforderlich'}), 400 distance = calculate_distance(coord1, coord2) return jsonify({ 'distance_km': distance, 'distance_m': distance * 1000 }) except Exception as e: app_logger.error(f"Fehler bei Entfernungsberechnung: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/api/locations/nearest', methods=['POST']) @login_required def find_nearest_location_api(): """Findet den nächstgelegenen Standort""" try: data = request.get_json() or {} coordinates = data.get('coordinates') # [lat, lon] location_type = data.get('location_type') max_distance = data.get('max_distance', 50) # km if not coordinates: return jsonify({'error': 'Koordinaten erforderlich'}), 400 nearest = find_nearest_location( coordinates=coordinates, location_type=LocationType(location_type) if location_type else None, max_distance_km=max_distance ) if nearest: location, distance = nearest return jsonify({ 'location': location.to_dict(), 'distance_km': distance }) else: return jsonify({ 'location': None, 'message': 'Kein Standort in der Nähe gefunden' }) except Exception as e: app_logger.error(f"Fehler bei der Suche nach nächstem Standort: {str(e)}") return jsonify({'error': str(e)}), 500 def setup_database_with_migrations(): """ Datenbank initialisieren und alle erforderlichen Tabellen erstellen. Führt Migrationen für neue Tabellen wie JobOrder durch. """ try: app_logger.info("🔄 Starte Datenbank-Setup und Migrationen...") # Standard-Datenbank-Initialisierung init_database() # Explizite Migration für JobOrder-Tabelle engine = get_engine() # Erstelle alle Tabellen (nur neue werden tatsächlich erstellt) Base.metadata.create_all(engine) # Prüfe ob JobOrder-Tabelle existiert from sqlalchemy import inspect inspector = inspect(engine) existing_tables = inspector.get_table_names() if 'job_orders' in existing_tables: app_logger.info("✅ JobOrder-Tabelle bereits vorhanden") else: # Tabelle manuell erstellen JobOrder.__table__.create(engine, checkfirst=True) app_logger.info("✅ JobOrder-Tabelle erfolgreich erstellt") # Initial-Admin erstellen falls nicht vorhanden create_initial_admin() app_logger.info("✅ Datenbank-Setup und Migrationen erfolgreich abgeschlossen") except Exception as e: app_logger.error(f"❌ Fehler bei Datenbank-Setup: {str(e)}") raise e # ===== LOG-MANAGEMENT API ===== @app.route("/api/logs", methods=['GET']) @login_required @admin_required def api_logs(): """ API-Endpunkt für Log-Daten-Abruf Query Parameter: level: Log-Level Filter (DEBUG, INFO, WARNING, ERROR, CRITICAL) limit: Anzahl der Einträge (Standard: 100, Max: 1000) offset: Offset für Paginierung (Standard: 0) search: Suchbegriff für Log-Nachrichten start_date: Start-Datum (ISO-Format) end_date: End-Datum (ISO-Format) """ try: # Parameter aus Query-String extrahieren level = request.args.get('level', '').upper() limit = min(int(request.args.get('limit', 100)), 1000) offset = int(request.args.get('offset', 0)) search = request.args.get('search', '').strip() start_date = request.args.get('start_date') end_date = request.args.get('end_date') # Log-Dateien aus dem logs-Verzeichnis lesen import os import glob from datetime import datetime, timedelta logs_dir = os.path.join(os.path.dirname(__file__), 'logs') log_entries = [] if os.path.exists(logs_dir): # Alle .log Dateien finden log_files = glob.glob(os.path.join(logs_dir, '*.log')) log_files.sort(key=os.path.getmtime, reverse=True) # Neueste zuerst # Datum-Filter vorbereiten start_dt = None end_dt = None if start_date: try: start_dt = datetime.fromisoformat(start_date.replace('Z', '+00:00')) except: pass if end_date: try: end_dt = datetime.fromisoformat(end_date.replace('Z', '+00:00')) except: pass # Log-Dateien durchgehen (maximal die letzten 5 Dateien) for log_file in log_files[:5]: try: with open(log_file, 'r', encoding='utf-8') as f: lines = f.readlines() # Zeilen rückwärts durchgehen (neueste zuerst) for line in reversed(lines): line = line.strip() if not line: continue # Log-Zeile parsen try: # Format: 2025-06-01 00:34:08 - logger_name - [LEVEL] MESSAGE parts = line.split(' - ', 3) if len(parts) >= 4: timestamp_str = parts[0] logger_name = parts[1] level_part = parts[2] message = parts[3] # Level extrahieren if level_part.startswith('[') and ']' in level_part: log_level = level_part.split(']')[0][1:] else: log_level = 'INFO' # Timestamp parsen try: log_timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S') except: continue # Filter anwenden if level and log_level != level: continue if start_dt and log_timestamp < start_dt: continue if end_dt and log_timestamp > end_dt: continue if search and search.lower() not in message.lower(): continue log_entries.append({ 'timestamp': log_timestamp.isoformat(), 'level': log_level, 'logger': logger_name, 'message': message, 'file': os.path.basename(log_file) }) except Exception as parse_error: # Fehlerhafte Zeile überspringen continue except Exception as file_error: app_logger.error(f"Fehler beim Lesen der Log-Datei {log_file}: {str(file_error)}") continue # Sortieren nach Timestamp (neueste zuerst) log_entries.sort(key=lambda x: x['timestamp'], reverse=True) # Paginierung anwenden total_count = len(log_entries) paginated_entries = log_entries[offset:offset + limit] return jsonify({ 'success': True, 'logs': paginated_entries, 'pagination': { 'total': total_count, 'limit': limit, 'offset': offset, 'has_more': offset + limit < total_count }, 'filters': { 'level': level or None, 'search': search or None, 'start_date': start_date, 'end_date': end_date } }) except Exception as e: app_logger.error(f"Fehler beim Abrufen der Log-Daten: {str(e)}") return jsonify({ 'error': f'Fehler beim Abrufen der Log-Daten: {str(e)}' }), 500 @app.route('/api/admin/logs', methods=['GET']) @login_required @admin_required def api_admin_logs(): """ Admin-spezifischer API-Endpunkt für Log-Daten-Abruf Erweiterte Version von /api/logs mit zusätzlichen Admin-Funktionen """ try: # Parameter aus Query-String extrahieren level = request.args.get('level', '').upper() if level == 'ALL': level = '' limit = min(int(request.args.get('limit', 100)), 1000) offset = int(request.args.get('offset', 0)) search = request.args.get('search', '').strip() component = request.args.get('component', '') # Verbesserter Log-Parser mit mehr Kategorien import os import glob from datetime import datetime, timedelta logs_dir = os.path.join(os.path.dirname(__file__), 'logs') log_entries = [] if os.path.exists(logs_dir): # Alle .log Dateien aus allen Unterverzeichnissen finden log_patterns = [ os.path.join(logs_dir, '*.log'), os.path.join(logs_dir, '*', '*.log'), os.path.join(logs_dir, '*', '*', '*.log') ] all_log_files = [] for pattern in log_patterns: all_log_files.extend(glob.glob(pattern)) # Nach Modifikationszeit sortieren (neueste zuerst) all_log_files.sort(key=os.path.getmtime, reverse=True) # Maximal 10 Dateien verarbeiten für Performance for log_file in all_log_files[:10]: try: # Kategorie aus Dateipfad ableiten rel_path = os.path.relpath(log_file, logs_dir) file_component = os.path.dirname(rel_path) if os.path.dirname(rel_path) != '.' else 'system' # Component-Filter anwenden if component and component.lower() != file_component.lower(): continue with open(log_file, 'r', encoding='utf-8', errors='ignore') as f: lines = f.readlines()[-500:] # Nur die letzten 500 Zeilen pro Datei # Zeilen verarbeiten (neueste zuerst) for line in reversed(lines): line = line.strip() if not line or line.startswith('#'): continue # Verschiedene Log-Formate unterstützen log_entry = None # Format 1: 2025-06-01 00:34:08 - logger_name - [LEVEL] MESSAGE if ' - ' in line and '[' in line and ']' in line: try: parts = line.split(' - ', 3) if len(parts) >= 4: timestamp_str = parts[0] logger_name = parts[1] level_part = parts[2] message = parts[3] # Level extrahieren if '[' in level_part and ']' in level_part: log_level = level_part.split('[')[1].split(']')[0] else: log_level = 'INFO' # Timestamp parsen log_timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S') log_entry = { 'timestamp': log_timestamp.isoformat(), 'level': log_level.upper(), 'component': file_component, 'logger': logger_name, 'message': message.strip(), 'source_file': os.path.basename(log_file) } except: pass # Format 2: [TIMESTAMP] LEVEL: MESSAGE elif line.startswith('[') and ']' in line and ':' in line: try: bracket_end = line.find(']') timestamp_str = line[1:bracket_end] rest = line[bracket_end+1:].strip() if ':' in rest: level_msg = rest.split(':', 1) log_level = level_msg[0].strip() message = level_msg[1].strip() # Timestamp parsen (verschiedene Formate probieren) log_timestamp = None for fmt in ['%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M:%S.%f', '%d.%m.%Y %H:%M:%S']: try: log_timestamp = datetime.strptime(timestamp_str, fmt) break except: continue if log_timestamp: log_entry = { 'timestamp': log_timestamp.isoformat(), 'level': log_level.upper(), 'component': file_component, 'logger': file_component, 'message': message, 'source_file': os.path.basename(log_file) } except: pass # Format 3: Einfaches Format ohne spezielle Struktur else: # Als INFO-Level behandeln mit aktuellem Timestamp log_entry = { 'timestamp': datetime.now().isoformat(), 'level': 'INFO', 'component': file_component, 'logger': file_component, 'message': line, 'source_file': os.path.basename(log_file) } # Entry hinzufügen wenn erfolgreich geparst if log_entry: # Filter anwenden if level and log_entry['level'] != level: continue if search and search.lower() not in log_entry['message'].lower(): continue log_entries.append(log_entry) # Limit pro Datei (Performance) if len([e for e in log_entries if e['source_file'] == os.path.basename(log_file)]) >= 50: break except Exception as file_error: app_logger.warning(f"Fehler beim Verarbeiten der Log-Datei {log_file}: {str(file_error)}") continue # Eindeutige Entries und Sortierung unique_entries = [] seen_messages = set() for entry in log_entries: # Duplikate vermeiden basierend auf Timestamp + Message key = f"{entry['timestamp']}_{entry['message'][:100]}" if key not in seen_messages: seen_messages.add(key) unique_entries.append(entry) # Nach Timestamp sortieren (neueste zuerst) unique_entries.sort(key=lambda x: x['timestamp'], reverse=True) # Paginierung anwenden total_count = len(unique_entries) paginated_entries = unique_entries[offset:offset + limit] # Statistiken sammeln level_stats = {} component_stats = {} for entry in unique_entries: level_stats[entry['level']] = level_stats.get(entry['level'], 0) + 1 component_stats[entry['component']] = component_stats.get(entry['component'], 0) + 1 app_logger.debug(f"📋 Log-API: {total_count} Einträge gefunden, {len(paginated_entries)} zurückgegeben") return jsonify({ 'success': True, 'logs': paginated_entries, 'pagination': { 'total': total_count, 'limit': limit, 'offset': offset, 'has_more': offset + limit < total_count }, 'filters': { 'level': level or None, 'search': search or None, 'component': component or None }, 'statistics': { 'total_entries': total_count, 'level_distribution': level_stats, 'component_distribution': component_stats } }) except Exception as e: app_logger.error(f"Fehler beim Abrufen der Admin-Log-Daten: {str(e)}") return jsonify({ 'success': False, 'error': f'Fehler beim Abrufen der Log-Daten: {str(e)}', 'logs': [] }), 500 @app.route('/api/admin/logs/export', methods=['GET']) @login_required @admin_required def export_admin_logs(): """ Exportiert System-Logs als ZIP-Datei Sammelt alle verfügbaren Log-Dateien und komprimiert sie in eine herunterladbare ZIP-Datei """ try: import os import zipfile import tempfile from datetime import datetime # Temporäre ZIP-Datei erstellen temp_dir = tempfile.mkdtemp() zip_filename = f"myp_logs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip" zip_path = os.path.join(temp_dir, zip_filename) log_dir = os.path.join(os.path.dirname(__file__), 'logs') # Prüfen ob Log-Verzeichnis existiert if not os.path.exists(log_dir): app_logger.warning(f"Log-Verzeichnis nicht gefunden: {log_dir}") return jsonify({ "success": False, "message": "Log-Verzeichnis nicht gefunden" }), 404 # ZIP-Datei erstellen und Log-Dateien hinzufügen files_added = 0 with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for root, dirs, files in os.walk(log_dir): for file in files: if file.endswith('.log'): file_path = os.path.join(root, file) try: # Relativen Pfad für Archiv erstellen arcname = os.path.relpath(file_path, log_dir) zipf.write(file_path, arcname) files_added += 1 app_logger.debug(f"Log-Datei hinzugefügt: {arcname}") except Exception as file_error: app_logger.warning(f"Fehler beim Hinzufügen der Datei {file_path}: {str(file_error)}") continue # Prüfen ob Dateien hinzugefügt wurden if files_added == 0: # Leere ZIP-Datei löschen try: os.remove(zip_path) os.rmdir(temp_dir) except: pass return jsonify({ "success": False, "message": "Keine Log-Dateien zum Exportieren gefunden" }), 404 app_logger.info(f"System-Logs exportiert: {files_added} Dateien in {zip_filename}") # ZIP-Datei als Download senden return send_file( zip_path, as_attachment=True, download_name=zip_filename, mimetype='application/zip' ) except Exception as e: app_logger.error(f"Fehler beim Exportieren der Logs: {str(e)}") return jsonify({ "success": False, "message": f"Fehler beim Exportieren: {str(e)}" }), 500 # ===== FEHLENDE ADMIN API-ENDPUNKTE ===== @app.route("/api/admin/database/status", methods=['GET']) @login_required @admin_required def api_admin_database_status(): """ API-Endpunkt für erweiterten Datenbank-Gesundheitsstatus. Führt umfassende Datenbank-Diagnose durch und liefert detaillierte Statusinformationen für den Admin-Bereich. Returns: JSON: Detaillierter Datenbank-Gesundheitsstatus """ try: app_logger.info(f"Datenbank-Gesundheitscheck gestartet von Admin-User {current_user.id}") # Datenbankverbindung mit Timeout db_session = get_db_session() start_time = time.time() # 1. Basis-Datenbankverbindung testen mit Timeout connection_status = "OK" connection_time_ms = 0 try: query_start = time.time() result = db_session.execute(text("SELECT 1 as test_connection")).fetchone() connection_time_ms = round((time.time() - query_start) * 1000, 2) if connection_time_ms > 5000: # 5 Sekunden connection_status = f"LANGSAM: {connection_time_ms}ms" elif not result: connection_status = "FEHLER: Keine Antwort" except Exception as e: connection_status = f"FEHLER: {str(e)[:100]}" app_logger.error(f"Datenbankverbindungsfehler: {str(e)}") # 2. Erweiterte Schema-Integrität prüfen schema_status = {"status": "OK", "details": {}, "missing_tables": [], "table_counts": {}} try: required_tables = { 'users': 'Benutzer-Verwaltung', 'printers': 'Drucker-Verwaltung', 'jobs': 'Druck-Aufträge', 'guest_requests': 'Gast-Anfragen', 'settings': 'System-Einstellungen' } existing_tables = [] table_counts = {} for table_name, description in required_tables.items(): try: count_result = db_session.execute(text(f"SELECT COUNT(*) as count FROM {table_name}")).fetchone() table_count = count_result[0] if count_result else 0 existing_tables.append(table_name) table_counts[table_name] = table_count schema_status["details"][table_name] = { "exists": True, "count": table_count, "description": description } except Exception as table_error: schema_status["missing_tables"].append(table_name) schema_status["details"][table_name] = { "exists": False, "error": str(table_error)[:50], "description": description } app_logger.warning(f"Tabelle {table_name} nicht verfügbar: {str(table_error)}") schema_status["table_counts"] = table_counts if len(schema_status["missing_tables"]) > 0: schema_status["status"] = f"WARNUNG: {len(schema_status['missing_tables'])} fehlende Tabellen" elif len(existing_tables) != len(required_tables): schema_status["status"] = f"UNVOLLSTÄNDIG: {len(existing_tables)}/{len(required_tables)} Tabellen" except Exception as e: schema_status["status"] = f"FEHLER: {str(e)[:100]}" app_logger.error(f"Schema-Integritätsprüfung fehlgeschlagen: {str(e)}") # 3. Migrations-Status und Versionsinformationen migration_info = {"status": "Unbekannt", "version": None, "details": {}} try: # Alembic-Version prüfen try: result = db_session.execute(text("SELECT version_num FROM alembic_version ORDER BY version_num DESC LIMIT 1")).fetchone() if result: migration_info["version"] = result[0] migration_info["status"] = "Alembic-Migration aktiv" migration_info["details"]["alembic"] = True else: migration_info["status"] = "Keine Alembic-Migration gefunden" migration_info["details"]["alembic"] = False except Exception: # Fallback: Schema-Informationen sammeln try: # SQLite-spezifische Abfrage tables_result = db_session.execute(text("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")).fetchall() if tables_result: table_list = [row[0] for row in tables_result] migration_info["status"] = f"Schema mit {len(table_list)} Tabellen erkannt" migration_info["details"]["detected_tables"] = table_list migration_info["details"]["alembic"] = False else: migration_info["status"] = "Keine Tabellen erkannt" except Exception: # Weitere Datenbank-Engines migration_info["status"] = "Schema-Erkennung nicht möglich" migration_info["details"]["alembic"] = False except Exception as e: migration_info["status"] = f"FEHLER: {str(e)[:100]}" app_logger.error(f"Migrations-Statusprüfung fehlgeschlagen: {str(e)}") # 4. Performance-Benchmarks performance_info = {"status": "OK", "benchmarks": {}, "overall_score": 100} try: benchmarks = {} # Einfache Select-Query start = time.time() db_session.execute(text("SELECT COUNT(*) FROM users")).fetchone() benchmarks["simple_select"] = round((time.time() - start) * 1000, 2) # Join-Query (falls möglich) try: start = time.time() db_session.execute(text("SELECT u.username, COUNT(j.id) FROM users u LEFT JOIN jobs j ON u.id = j.user_id GROUP BY u.id LIMIT 5")).fetchall() benchmarks["join_query"] = round((time.time() - start) * 1000, 2) except Exception: benchmarks["join_query"] = None # Insert/Update-Performance simulieren try: start = time.time() db_session.execute(text("SELECT 1 WHERE EXISTS (SELECT 1 FROM users LIMIT 1)")).fetchone() benchmarks["exists_check"] = round((time.time() - start) * 1000, 2) except Exception: benchmarks["exists_check"] = None performance_info["benchmarks"] = benchmarks # Performance-Score berechnen avg_time = sum(t for t in benchmarks.values() if t is not None) / len([t for t in benchmarks.values() if t is not None]) if avg_time < 10: performance_info["status"] = "AUSGEZEICHNET" performance_info["overall_score"] = 100 elif avg_time < 50: performance_info["status"] = "GUT" performance_info["overall_score"] = 85 elif avg_time < 200: performance_info["status"] = "AKZEPTABEL" performance_info["overall_score"] = 70 elif avg_time < 1000: performance_info["status"] = "LANGSAM" performance_info["overall_score"] = 50 else: performance_info["status"] = "SEHR LANGSAM" performance_info["overall_score"] = 25 except Exception as e: performance_info["status"] = f"FEHLER: {str(e)[:100]}" performance_info["overall_score"] = 0 app_logger.error(f"Performance-Benchmark fehlgeschlagen: {str(e)}") # 5. Datenbankgröße und Speicher-Informationen storage_info = {"size": "Unbekannt", "details": {}} try: # SQLite-Datei-Größe db_uri = current_app.config.get('SQLALCHEMY_DATABASE_URI', '') if 'sqlite:///' in db_uri: db_file_path = db_uri.replace('sqlite:///', '') if os.path.exists(db_file_path): file_size = os.path.getsize(db_file_path) storage_info["size"] = f"{file_size / (1024 * 1024):.2f} MB" storage_info["details"]["file_path"] = db_file_path storage_info["details"]["last_modified"] = datetime.fromtimestamp(os.path.getmtime(db_file_path)).isoformat() # Speicherplatz-Warnung try: import shutil total, used, free = shutil.disk_usage(os.path.dirname(db_file_path)) free_gb = free / (1024**3) storage_info["details"]["disk_free_gb"] = round(free_gb, 2) if free_gb < 1: storage_info["warning"] = "Kritisch wenig Speicherplatz" elif free_gb < 5: storage_info["warning"] = "Wenig Speicherplatz verfügbar" except Exception: pass else: # Für andere Datenbanken: Versuche Größe über Metadaten zu ermitteln storage_info["size"] = "Externe Datenbank" storage_info["details"]["database_type"] = "Nicht-SQLite" except Exception as e: storage_info["size"] = f"FEHLER: {str(e)[:50]}" app_logger.warning(f"Speicher-Informationen nicht verfügbar: {str(e)}") # 6. Aktuelle Verbindungs-Pool-Informationen connection_pool_info = {"status": "Nicht verfügbar", "details": {}} try: # SQLAlchemy Pool-Status (falls verfügbar) engine = db_session.get_bind() if hasattr(engine, 'pool'): pool = engine.pool connection_pool_info["details"]["pool_size"] = getattr(pool, 'size', lambda: 'N/A')() connection_pool_info["details"]["checked_in"] = getattr(pool, 'checkedin', lambda: 'N/A')() connection_pool_info["details"]["checked_out"] = getattr(pool, 'checkedout', lambda: 'N/A')() connection_pool_info["status"] = "Pool aktiv" else: connection_pool_info["status"] = "Kein Pool konfiguriert" except Exception as e: connection_pool_info["status"] = f"Pool-Status nicht verfügbar: {str(e)[:50]}" db_session.close() # Gesamtstatus ermitteln overall_status = "healthy" health_score = 100 critical_issues = [] warnings = [] # Kritische Probleme if "FEHLER" in connection_status: overall_status = "critical" health_score -= 50 critical_issues.append("Datenbankverbindung fehlgeschlagen") if "FEHLER" in schema_status["status"]: overall_status = "critical" health_score -= 30 critical_issues.append("Schema-Integrität kompromittiert") if performance_info["overall_score"] < 25: overall_status = "critical" if overall_status != "critical" else overall_status health_score -= 25 critical_issues.append("Extreme Performance-Probleme") # Warnungen if "WARNUNG" in schema_status["status"] or len(schema_status["missing_tables"]) > 0: if overall_status == "healthy": overall_status = "warning" health_score -= 15 warnings.append(f"Schema-Probleme: {len(schema_status['missing_tables'])} fehlende Tabellen") if "LANGSAM" in connection_status: if overall_status == "healthy": overall_status = "warning" health_score -= 10 warnings.append("Langsame Datenbankverbindung") if "warning" in storage_info: if overall_status == "healthy": overall_status = "warning" health_score -= 15 warnings.append(storage_info["warning"]) health_score = max(0, health_score) # Nicht unter 0 total_time = round((time.time() - start_time) * 1000, 2) result = { "success": True, "status": overall_status, "health_score": health_score, "critical_issues": critical_issues, "warnings": warnings, "connection": { "status": connection_status, "response_time_ms": connection_time_ms }, "schema": schema_status, "migration": migration_info, "performance": performance_info, "storage": storage_info, "connection_pool": connection_pool_info, "timestamp": datetime.now().isoformat(), "check_duration_ms": total_time, "summary": { "database_responsive": "FEHLER" not in connection_status, "schema_complete": len(schema_status["missing_tables"]) == 0, "performance_acceptable": performance_info["overall_score"] >= 50, "storage_adequate": "warning" not in storage_info, "overall_healthy": overall_status == "healthy" } } app_logger.info(f"Datenbank-Gesundheitscheck abgeschlossen: Status={overall_status}, Score={health_score}, Dauer={total_time}ms") return jsonify(result) except Exception as e: app_logger.error(f"Kritischer Fehler beim Datenbank-Gesundheitscheck: {str(e)}") return jsonify({ "success": False, "error": f"Kritischer Systemfehler: {str(e)}", "status": "critical", "health_score": 0, "critical_issues": ["System-Gesundheitscheck fehlgeschlagen"], "warnings": [], "connection": {"status": "FEHLER bei der Prüfung"}, "schema": {"status": "FEHLER bei der Prüfung"}, "migration": {"status": "FEHLER bei der Prüfung"}, "performance": {"status": "FEHLER bei der Prüfung"}, "storage": {"size": "FEHLER bei der Prüfung"}, "timestamp": datetime.now().isoformat(), "summary": { "database_responsive": False, "schema_complete": False, "performance_acceptable": False, "storage_adequate": False, "overall_healthy": False } }), 500 @app.route("/api/admin/system/status", methods=['GET']) @login_required @admin_required def api_admin_system_status(): """ API-Endpunkt für System-Status-Informationen Liefert detaillierte Informationen über den Zustand des Systems """ try: import psutil import platform import subprocess # System-Informationen mit robuster String-Behandlung system_info = { 'platform': str(platform.system() or 'Unknown'), 'platform_release': str(platform.release() or 'Unknown'), 'platform_version': str(platform.version() or 'Unknown'), 'architecture': str(platform.machine() or 'Unknown'), 'processor': str(platform.processor() or 'Unknown'), 'python_version': str(platform.python_version() or 'Unknown'), 'hostname': str(platform.node() or 'Unknown') } # CPU-Informationen mit Fehlerbehandlung try: cpu_freq = psutil.cpu_freq() cpu_info = { 'physical_cores': psutil.cpu_count(logical=False) or 0, 'total_cores': psutil.cpu_count(logical=True) or 0, 'max_frequency': float(cpu_freq.max) if cpu_freq and cpu_freq.max else 0.0, 'current_frequency': float(cpu_freq.current) if cpu_freq and cpu_freq.current else 0.0, 'cpu_usage_percent': float(psutil.cpu_percent(interval=1)), 'load_average': list(psutil.getloadavg()) if hasattr(psutil, 'getloadavg') else [0.0, 0.0, 0.0] } except Exception as cpu_error: app_logger.warning(f"CPU-Informationen nicht verfügbar: {str(cpu_error)}") cpu_info = { 'physical_cores': 0, 'total_cores': 0, 'max_frequency': 0.0, 'current_frequency': 0.0, 'cpu_usage_percent': 0.0, 'load_average': [0.0, 0.0, 0.0] } # Memory-Informationen mit robuster Fehlerbehandlung try: memory = psutil.virtual_memory() memory_info = { 'total_gb': round(float(memory.total) / (1024**3), 2), 'available_gb': round(float(memory.available) / (1024**3), 2), 'used_gb': round(float(memory.used) / (1024**3), 2), 'percentage': float(memory.percent), 'free_gb': round(float(memory.free) / (1024**3), 2) } except Exception as memory_error: app_logger.warning(f"Memory-Informationen nicht verfügbar: {str(memory_error)}") memory_info = { 'total_gb': 0.0, 'available_gb': 0.0, 'used_gb': 0.0, 'percentage': 0.0, 'free_gb': 0.0 } # Disk-Informationen mit Pfad-Behandlung try: disk_path = '/' if os.name != 'nt' else 'C:\\' disk = psutil.disk_usage(disk_path) disk_info = { 'total_gb': round(float(disk.total) / (1024**3), 2), 'used_gb': round(float(disk.used) / (1024**3), 2), 'free_gb': round(float(disk.free) / (1024**3), 2), 'percentage': round((float(disk.used) / float(disk.total)) * 100, 1) } except Exception as disk_error: app_logger.warning(f"Disk-Informationen nicht verfügbar: {str(disk_error)}") disk_info = { 'total_gb': 0.0, 'used_gb': 0.0, 'free_gb': 0.0, 'percentage': 0.0 } # Netzwerk-Informationen try: network = psutil.net_io_counters() network_info = { 'bytes_sent_mb': round(float(network.bytes_sent) / (1024**2), 2), 'bytes_recv_mb': round(float(network.bytes_recv) / (1024**2), 2), 'packets_sent': int(network.packets_sent), 'packets_recv': int(network.packets_recv) } except Exception as network_error: app_logger.warning(f"Netzwerk-Informationen nicht verfügbar: {str(network_error)}") network_info = {'error': 'Netzwerk-Informationen nicht verfügbar'} # Prozess-Informationen try: current_process = psutil.Process() process_info = { 'pid': int(current_process.pid), 'memory_mb': round(float(current_process.memory_info().rss) / (1024**2), 2), 'cpu_percent': float(current_process.cpu_percent()), 'num_threads': int(current_process.num_threads()), 'create_time': datetime.fromtimestamp(float(current_process.create_time())).isoformat(), 'status': str(current_process.status()) } except Exception as process_error: app_logger.warning(f"Prozess-Informationen nicht verfügbar: {str(process_error)}") process_info = {'error': 'Prozess-Informationen nicht verfügbar'} # Uptime mit robuster Formatierung try: boot_time = psutil.boot_time() current_time = time.time() uptime_seconds = int(current_time - boot_time) # Sichere uptime-Formatierung ohne problematische Format-Strings if uptime_seconds > 0: days = uptime_seconds // 86400 remaining_seconds = uptime_seconds % 86400 hours = remaining_seconds // 3600 minutes = (remaining_seconds % 3600) // 60 # String-Aufbau ohne Format-Operationen uptime_parts = [] if days > 0: uptime_parts.append(str(days) + "d") if hours > 0: uptime_parts.append(str(hours) + "h") if minutes > 0: uptime_parts.append(str(minutes) + "m") uptime_formatted = " ".join(uptime_parts) if uptime_parts else "0m" else: uptime_formatted = "0m" uptime_info = { 'boot_time': datetime.fromtimestamp(float(boot_time)).isoformat(), 'uptime_seconds': uptime_seconds, 'uptime_formatted': uptime_formatted } except Exception as uptime_error: app_logger.warning(f"Uptime-Informationen nicht verfügbar: {str(uptime_error)}") uptime_info = {'error': 'Uptime-Informationen nicht verfügbar'} # Service-Status (Windows/Linux kompatibel) mit robuster Behandlung services_status = {} try: if os.name == 'nt': # Windows # Windows-Services prüfen services_to_check = ['Schedule', 'Themes', 'Spooler'] for service in services_to_check: try: result = subprocess.run( ['sc', 'query', service], capture_output=True, text=True, timeout=5 ) services_status[service] = 'running' if 'RUNNING' in str(result.stdout) else 'stopped' except Exception: services_status[service] = 'unknown' else: # Linux # Linux-Services prüfen services_to_check = ['systemd', 'cron', 'cups'] for service in services_to_check: try: result = subprocess.run( ['systemctl', 'is-active', service], capture_output=True, text=True, timeout=5 ) services_status[service] = str(result.stdout).strip() except Exception: services_status[service] = 'unknown' except Exception as services_error: app_logger.warning(f"Service-Status nicht verfügbar: {str(services_error)}") services_status = {'error': 'Service-Status nicht verfügbar'} # System-Gesundheit bewerten health_status = 'healthy' issues = [] try: if isinstance(cpu_info.get('cpu_usage_percent'), (int, float)) and cpu_info['cpu_usage_percent'] > 80: health_status = 'warning' issues.append('Hohe CPU-Auslastung: ' + str(round(cpu_info['cpu_usage_percent'], 1)) + '%') if isinstance(memory_info.get('percentage'), (int, float)) and memory_info['percentage'] > 85: health_status = 'warning' issues.append('Hohe Memory-Auslastung: ' + str(round(memory_info['percentage'], 1)) + '%') if isinstance(disk_info.get('percentage'), (int, float)) and disk_info['percentage'] > 90: health_status = 'critical' issues.append('Kritisch wenig Speicherplatz: ' + str(round(disk_info['percentage'], 1)) + '%') if isinstance(process_info.get('memory_mb'), (int, float)) and process_info['memory_mb'] > 500: issues.append('Hoher Memory-Verbrauch der Anwendung: ' + str(round(process_info['memory_mb'], 1)) + 'MB') except Exception as health_error: app_logger.warning(f"System-Gesundheit-Bewertung nicht möglich: {str(health_error)}") return jsonify({ 'success': True, 'health_status': health_status, 'issues': issues, 'system_info': system_info, 'cpu_info': cpu_info, 'memory_info': memory_info, 'disk_info': disk_info, 'network_info': network_info, 'process_info': process_info, 'uptime_info': uptime_info, 'services_status': services_status, 'timestamp': datetime.now().isoformat() }) except Exception as e: app_logger.error(f"Fehler beim Abrufen des System-Status: {str(e)}") return jsonify({ 'success': False, 'error': 'Fehler beim Abrufen des System-Status: ' + str(e), 'health_status': 'error' }), 500 # ===== OPTIMIERUNGSSTATUS API ===== @app.route("/api/system/optimization-status", methods=['GET']) def api_optimization_status(): """ API-Endpunkt für den aktuellen Optimierungsstatus. Gibt Informationen über aktivierte Optimierungen zurück. """ try: status = { "optimized_mode_active": USE_OPTIMIZED_CONFIG, "hardware_detected": { "is_raspberry_pi": detect_raspberry_pi(), "forced_optimization": os.getenv('FORCE_OPTIMIZED_MODE', '').lower() in ['true', '1', 'yes'], "cli_optimization": '--optimized' in sys.argv }, "active_optimizations": { "minified_assets": app.jinja_env.globals.get('use_minified_assets', False), "disabled_animations": app.jinja_env.globals.get('disable_animations', False), "limited_glassmorphism": app.jinja_env.globals.get('limit_glassmorphism', False), "cache_headers": USE_OPTIMIZED_CONFIG, "template_caching": not app.config.get('TEMPLATES_AUTO_RELOAD', True), "json_optimization": not app.config.get('JSON_SORT_KEYS', True) }, "performance_settings": { "max_upload_mb": app.config.get('MAX_CONTENT_LENGTH', 0) / (1024 * 1024) if app.config.get('MAX_CONTENT_LENGTH') else None, "static_cache_age": app.config.get('SEND_FILE_MAX_AGE_DEFAULT', 0), "sqlalchemy_echo": app.config.get('SQLALCHEMY_ECHO', True), "session_secure": app.config.get('SESSION_COOKIE_SECURE', False) } } # Zusätzliche System-Informationen wenn verfügbar try: import psutil import platform status["system_info"] = { "cpu_count": psutil.cpu_count(), "memory_gb": round(psutil.virtual_memory().total / (1024**3), 2), "platform": platform.machine(), "system": platform.system() } except ImportError: status["system_info"] = {"error": "psutil nicht verfügbar"} return jsonify({ "success": True, "status": status, "timestamp": datetime.now().isoformat() }) except Exception as e: app_logger.error(f"Fehler beim Abrufen des Optimierungsstatus: {str(e)}") return jsonify({ "success": False, "error": str(e) }), 500 @app.route("/api/admin/optimization/toggle", methods=['POST']) @login_required @admin_required def api_admin_toggle_optimization(): """ API-Endpunkt zum Umschalten der Optimierungen zur Laufzeit (nur Admins). Achtung: Einige Optimierungen erfordern einen Neustart. """ try: data = request.get_json() or {} # Welche Optimierung soll umgeschaltet werden? optimization_type = data.get('type') enabled = data.get('enabled', True) changes_made = [] restart_required = False if optimization_type == 'animations': app.jinja_env.globals['disable_animations'] = enabled changes_made.append(f"Animationen {'deaktiviert' if enabled else 'aktiviert'}") elif optimization_type == 'glassmorphism': app.jinja_env.globals['limit_glassmorphism'] = enabled changes_made.append(f"Glassmorphism {'begrenzt' if enabled else 'vollständig'}") elif optimization_type == 'minified_assets': app.jinja_env.globals['use_minified_assets'] = enabled changes_made.append(f"Minifizierte Assets {'aktiviert' if enabled else 'deaktiviert'}") elif optimization_type == 'template_caching': app.config['TEMPLATES_AUTO_RELOAD'] = not enabled changes_made.append(f"Template-Caching {'aktiviert' if enabled else 'deaktiviert'}") restart_required = True elif optimization_type == 'debug_mode': app.config['DEBUG'] = not enabled changes_made.append(f"Debug-Modus {'deaktiviert' if enabled else 'aktiviert'}") restart_required = True else: return jsonify({ "success": False, "error": "Unbekannter Optimierungstyp" }), 400 app_logger.info(f"Admin {current_user.username} hat Optimierung '{optimization_type}' auf {enabled} gesetzt") return jsonify({ "success": True, "changes": changes_made, "restart_required": restart_required, "message": f"Optimierung '{optimization_type}' erfolgreich {'aktiviert' if enabled else 'deaktiviert'}" }) except Exception as e: app_logger.error(f"Fehler beim Umschalten der Optimierung: {str(e)}") return jsonify({ "success": False, "error": str(e) }), 500 # ===== ÖFFENTLICHE STATISTIK-API ===== @app.route("/api/statistics/public", methods=['GET']) def api_public_statistics(): """ Öffentliche Statistiken ohne Authentifizierung. Stellt grundlegende, nicht-sensible Systemstatistiken bereit, die auf der Startseite angezeigt werden können. Returns: JSON: Öffentliche Statistiken """ try: db_session = get_db_session() # Grundlegende, nicht-sensible Statistiken total_jobs = db_session.query(Job).count() completed_jobs = db_session.query(Job).filter(Job.status == "finished").count() total_printers = db_session.query(Printer).count() active_printers = db_session.query(Printer).filter( Printer.active == True, Printer.status.in_(["online", "available", "idle"]) ).count() # Erfolgsrate berechnen success_rate = round((completed_jobs / total_jobs * 100) if total_jobs > 0 else 0, 1) # Anonymisierte Benutzerstatistiken total_users = db_session.query(User).filter(User.active == True).count() # Letzte 30 Tage Aktivität (anonymisiert) thirty_days_ago = datetime.now() - timedelta(days=30) recent_jobs = db_session.query(Job).filter( Job.created_at >= thirty_days_ago ).count() db_session.close() public_stats = { "system_info": { "total_jobs": total_jobs, "completed_jobs": completed_jobs, "success_rate": success_rate, "total_printers": total_printers, "active_printers": active_printers, "active_users": total_users, "recent_activity": recent_jobs }, "health_indicators": { "system_status": "operational", "printer_availability": round((active_printers / total_printers * 100) if total_printers > 0 else 0, 1), "last_updated": datetime.now().isoformat() }, "features": { "multi_location_support": True, "real_time_monitoring": True, "automated_scheduling": True, "advanced_reporting": True } } return jsonify(public_stats) except Exception as e: app_logger.error(f"Fehler bei öffentlichen Statistiken: {str(e)}") # Fallback-Statistiken bei Fehler return jsonify({ "system_info": { "total_jobs": 0, "completed_jobs": 0, "success_rate": 0, "total_printers": 0, "active_printers": 0, "active_users": 0, "recent_activity": 0 }, "health_indicators": { "system_status": "maintenance", "printer_availability": 0, "last_updated": datetime.now().isoformat() }, "features": { "multi_location_support": True, "real_time_monitoring": True, "automated_scheduling": True, "advanced_reporting": True }, "error": "Statistiken temporär nicht verfügbar" }), 200 # 200 statt 500 um Frontend nicht zu brechen @app.route("/api/stats", methods=['GET']) @login_required def api_stats(): """ API-Endpunkt für allgemeine Statistiken Liefert zusammengefasste Statistiken für normale Benutzer und Admins """ try: db_session = get_db_session() # Basis-Statistiken die alle Benutzer sehen können user_stats = {} if current_user.is_authenticated: # Benutzer-spezifische Statistiken user_jobs = db_session.query(Job).filter(Job.user_id == current_user.id) user_stats = { 'my_jobs': { 'total': user_jobs.count(), 'completed': user_jobs.filter(Job.status == 'completed').count(), 'failed': user_jobs.filter(Job.status == 'failed').count(), 'running': user_jobs.filter(Job.status == 'running').count(), 'queued': user_jobs.filter(Job.status == 'queued').count() }, 'my_activity': { 'jobs_today': user_jobs.filter( Job.created_at >= datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) ).count() if hasattr(Job, 'created_at') else 0, 'jobs_this_week': user_jobs.filter( Job.created_at >= datetime.now() - timedelta(days=7) ).count() if hasattr(Job, 'created_at') else 0 } } # System-weite Statistiken (für alle Benutzer) general_stats = { 'system': { 'total_printers': db_session.query(Printer).count(), 'online_printers': db_session.query(Printer).filter(Printer.status == 'online').count(), 'total_users': db_session.query(User).count(), 'jobs_today': db_session.query(Job).filter( Job.created_at >= datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) ).count() if hasattr(Job, 'created_at') else 0 } } # Admin-spezifische erweiterte Statistiken admin_stats = {} if current_user.is_admin: try: # Erweiterte Statistiken für Admins total_jobs = db_session.query(Job).count() completed_jobs = db_session.query(Job).filter(Job.status == 'completed').count() failed_jobs = db_session.query(Job).filter(Job.status == 'failed').count() # Erfolgsrate berechnen success_rate = 0 if completed_jobs + failed_jobs > 0: success_rate = round((completed_jobs / (completed_jobs + failed_jobs)) * 100, 1) admin_stats = { 'detailed_jobs': { 'total': total_jobs, 'completed': completed_jobs, 'failed': failed_jobs, 'success_rate': success_rate, 'running': db_session.query(Job).filter(Job.status == 'running').count(), 'queued': db_session.query(Job).filter(Job.status == 'queued').count() }, 'printers': { 'total': db_session.query(Printer).count(), 'online': db_session.query(Printer).filter(Printer.status == 'online').count(), 'offline': db_session.query(Printer).filter(Printer.status == 'offline').count(), 'maintenance': db_session.query(Printer).filter(Printer.status == 'maintenance').count() }, 'users': { 'total': db_session.query(User).count(), 'active_today': db_session.query(User).filter( User.last_login >= datetime.now() - timedelta(days=1) ).count() if hasattr(User, 'last_login') else 0, 'admins': db_session.query(User).filter(User.role == 'admin').count() } } # Zeitbasierte Trends (letzte 7 Tage) daily_stats = [] for i in range(7): day = datetime.now() - timedelta(days=i) day_start = day.replace(hour=0, minute=0, second=0, microsecond=0) day_end = day_start + timedelta(days=1) jobs_count = db_session.query(Job).filter( Job.created_at >= day_start, Job.created_at < day_end ).count() if hasattr(Job, 'created_at') else 0 daily_stats.append({ 'date': day.strftime('%Y-%m-%d'), 'jobs': jobs_count }) admin_stats['trends'] = { 'daily_jobs': list(reversed(daily_stats)) # Älteste zuerst } except Exception as admin_error: app_logger.warning(f"Fehler bei Admin-Statistiken: {str(admin_error)}") admin_stats = {'error': 'Admin-Statistiken nicht verfügbar'} db_session.close() # Response zusammenstellen response_data = { 'success': True, 'timestamp': datetime.now().isoformat(), 'user_stats': user_stats, 'general_stats': general_stats } # Admin-Statistiken nur für Admins hinzufügen if current_user.is_admin: response_data['admin_stats'] = admin_stats return jsonify(response_data) except Exception as e: app_logger.error(f"Fehler beim Abrufen der Statistiken: {str(e)}") return jsonify({ 'success': False, 'error': f'Fehler beim Abrufen der Statistiken: {str(e)}' }), 500 # ===== LIVE ADMIN STATISTIKEN API ===== @app.route("/api/admin/stats/live", methods=['GET']) @login_required @admin_required def api_admin_stats_live(): """ API-Endpunkt für Live-Statistiken im Admin-Dashboard Liefert aktuelle System-Statistiken für Echtzeit-Updates """ try: db_session = get_db_session() # Basis-Statistiken sammeln stats = { 'timestamp': datetime.now().isoformat(), 'users': { 'total': db_session.query(User).count(), 'active_today': 0, 'new_this_week': 0 }, 'printers': { 'total': db_session.query(Printer).count(), 'online': db_session.query(Printer).filter(Printer.status == 'online').count(), 'offline': db_session.query(Printer).filter(Printer.status == 'offline').count(), 'maintenance': db_session.query(Printer).filter(Printer.status == 'maintenance').count() }, 'jobs': { 'total': db_session.query(Job).count(), 'running': db_session.query(Job).filter(Job.status == 'running').count(), 'queued': db_session.query(Job).filter(Job.status == 'queued').count(), 'completed_today': 0, 'failed_today': 0 } } # Benutzer-Aktivität mit robuster Datums-Behandlung try: if hasattr(User, 'last_login'): yesterday = datetime.now() - timedelta(days=1) stats['users']['active_today'] = db_session.query(User).filter( User.last_login >= yesterday ).count() if hasattr(User, 'created_at'): week_ago = datetime.now() - timedelta(days=7) stats['users']['new_this_week'] = db_session.query(User).filter( User.created_at >= week_ago ).count() except Exception as user_stats_error: app_logger.warning(f"Benutzer-Statistiken nicht verfügbar: {str(user_stats_error)}") # Job-Aktivität mit robuster Datums-Behandlung try: if hasattr(Job, 'updated_at'): today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) stats['jobs']['completed_today'] = db_session.query(Job).filter( Job.status == 'completed', Job.updated_at >= today_start ).count() stats['jobs']['failed_today'] = db_session.query(Job).filter( Job.status == 'failed', Job.updated_at >= today_start ).count() except Exception as job_stats_error: app_logger.warning(f"Job-Statistiken nicht verfügbar: {str(job_stats_error)}") # System-Performance-Metriken mit robuster psutil-Behandlung try: import psutil import os # CPU und Memory mit Fehlerbehandlung cpu_percent = psutil.cpu_percent(interval=1) memory_percent = psutil.virtual_memory().percent # Disk-Pfad sicher bestimmen disk_path = '/' if os.name != 'nt' else 'C:\\' disk_percent = psutil.disk_usage(disk_path).percent # Uptime sicher berechnen boot_time = psutil.boot_time() current_time = time.time() uptime_seconds = int(current_time - boot_time) stats['system'] = { 'cpu_percent': float(cpu_percent), 'memory_percent': float(memory_percent), 'disk_percent': float(disk_percent), 'uptime_seconds': uptime_seconds } except Exception as system_stats_error: app_logger.warning(f"System-Performance-Metriken nicht verfügbar: {str(system_stats_error)}") stats['system'] = { 'cpu_percent': 0.0, 'memory_percent': 0.0, 'disk_percent': 0.0, 'uptime_seconds': 0 } # Erfolgsrate berechnen (letzte 24 Stunden) mit robuster Behandlung try: if hasattr(Job, 'updated_at'): day_ago = datetime.now() - timedelta(days=1) completed_jobs = db_session.query(Job).filter( Job.status == 'completed', Job.updated_at >= day_ago ).count() failed_jobs = db_session.query(Job).filter( Job.status == 'failed', Job.updated_at >= day_ago ).count() total_finished = completed_jobs + failed_jobs success_rate = (float(completed_jobs) / float(total_finished) * 100) if total_finished > 0 else 100.0 stats['performance'] = { 'success_rate': round(success_rate, 1), 'completed_24h': completed_jobs, 'failed_24h': failed_jobs, 'total_finished_24h': total_finished } else: stats['performance'] = { 'success_rate': 100.0, 'completed_24h': 0, 'failed_24h': 0, 'total_finished_24h': 0 } except Exception as perf_error: app_logger.warning(f"Fehler bei Performance-Berechnung: {str(perf_error)}") stats['performance'] = { 'success_rate': 0.0, 'completed_24h': 0, 'failed_24h': 0, 'total_finished_24h': 0 } # Queue-Status (falls Queue Manager läuft) try: from utils.queue_manager import get_queue_status queue_status = get_queue_status() stats['queue'] = queue_status except Exception as queue_error: stats['queue'] = { 'status': 'unknown', 'pending_jobs': 0, 'active_workers': 0 } # Letzte Aktivitäten (Top 5) mit robuster Job-Behandlung try: recent_jobs = db_session.query(Job).order_by(Job.id.desc()).limit(5).all() stats['recent_activity'] = [] for job in recent_jobs: try: activity_item = { 'id': int(job.id), 'filename': str(getattr(job, 'filename', 'Unbekannt')), 'status': str(job.status), 'user': str(job.user.username) if job.user else 'Unbekannt', 'created_at': job.created_at.isoformat() if hasattr(job, 'created_at') and job.created_at else None } stats['recent_activity'].append(activity_item) except Exception as activity_item_error: app_logger.warning(f"Fehler bei Activity-Item: {str(activity_item_error)}") except Exception as activity_error: app_logger.warning(f"Fehler bei Recent Activity: {str(activity_error)}") stats['recent_activity'] = [] db_session.close() return jsonify({ 'success': True, 'stats': stats }) except Exception as e: app_logger.error(f"Fehler beim Abrufen der Live-Statistiken: {str(e)}") return jsonify({ 'error': 'Fehler beim Abrufen der Live-Statistiken: ' + str(e) }), 500 @app.route('/api/dashboard/refresh', methods=['POST']) @login_required def refresh_dashboard(): """ Aktualisiert Dashboard-Daten und gibt aktuelle Statistiken zurück. Dieser Endpunkt wird vom Frontend aufgerufen, um Dashboard-Statistiken zu aktualisieren ohne die gesamte Seite neu zu laden. Returns: JSON: Erfolgs-Status und aktuelle Dashboard-Statistiken """ try: app_logger.info(f"Dashboard-Refresh angefordert von User {current_user.id}") db_session = get_db_session() # Aktuelle Statistiken abrufen try: stats = { 'active_jobs': db_session.query(Job).filter(Job.status == 'running').count(), 'available_printers': db_session.query(Printer).filter(Printer.active == True).count(), 'total_jobs': db_session.query(Job).count(), 'pending_jobs': db_session.query(Job).filter(Job.status == 'queued').count() } # Erfolgsrate berechnen total_jobs = stats['total_jobs'] if total_jobs > 0: completed_jobs = db_session.query(Job).filter(Job.status == 'completed').count() stats['success_rate'] = round((completed_jobs / total_jobs) * 100, 1) else: stats['success_rate'] = 0 # Zusätzliche Statistiken für umfassendere Dashboard-Aktualisierung stats['completed_jobs'] = db_session.query(Job).filter(Job.status == 'completed').count() stats['failed_jobs'] = db_session.query(Job).filter(Job.status == 'failed').count() stats['cancelled_jobs'] = db_session.query(Job).filter(Job.status == 'cancelled').count() stats['total_users'] = db_session.query(User).filter(User.active == True).count() # Drucker-Status-Details stats['online_printers'] = db_session.query(Printer).filter( Printer.active == True, Printer.status == 'online' ).count() stats['offline_printers'] = db_session.query(Printer).filter( Printer.active == True, Printer.status != 'online' ).count() except Exception as stats_error: app_logger.error(f"Fehler beim Abrufen der Dashboard-Statistiken: {str(stats_error)}") # Fallback mit Basis-Statistiken stats = { 'active_jobs': 0, 'available_printers': 0, 'total_jobs': 0, 'pending_jobs': 0, 'success_rate': 0, 'completed_jobs': 0, 'failed_jobs': 0, 'cancelled_jobs': 0, 'total_users': 0, 'online_printers': 0, 'offline_printers': 0 } db_session.close() app_logger.info(f"Dashboard-Refresh erfolgreich: {stats}") return jsonify({ 'success': True, 'stats': stats, 'timestamp': datetime.now().isoformat(), 'message': 'Dashboard-Daten erfolgreich aktualisiert' }) except Exception as e: app_logger.error(f"Fehler beim Dashboard-Refresh: {str(e)}", exc_info=True) return jsonify({ 'success': False, 'error': 'Fehler beim Aktualisieren der Dashboard-Daten', 'details': str(e) if app.debug else None }), 500 # ===== STECKDOSEN-MONITORING API-ROUTEN ===== @app.route("/api/admin/plug-schedules/logs", methods=['GET']) @login_required @admin_required def api_admin_plug_schedules_logs(): """ API-Endpoint für Steckdosenschaltzeiten-Logs. Unterstützt Filterung nach Drucker, Zeitraum und Status. """ try: # Parameter aus Request printer_id = request.args.get('printer_id', type=int) hours = request.args.get('hours', default=24, type=int) status_filter = request.args.get('status') page = request.args.get('page', default=1, type=int) per_page = request.args.get('per_page', default=100, type=int) # Maximale Grenzen setzen hours = min(hours, 168) # Maximal 7 Tage per_page = min(per_page, 1000) # Maximal 1000 Einträge pro Seite db_session = get_db_session() try: # Basis-Query cutoff_time = datetime.now() - timedelta(hours=hours) query = db_session.query(PlugStatusLog)\ .filter(PlugStatusLog.timestamp >= cutoff_time)\ .join(Printer) # Drucker-Filter if printer_id: query = query.filter(PlugStatusLog.printer_id == printer_id) # Status-Filter if status_filter: query = query.filter(PlugStatusLog.status == status_filter) # Gesamtanzahl für Paginierung total = query.count() # Sortierung und Paginierung logs = query.order_by(PlugStatusLog.timestamp.desc())\ .offset((page - 1) * per_page)\ .limit(per_page)\ .all() # Daten serialisieren log_data = [] for log in logs: log_dict = log.to_dict() # Zusätzliche berechnete Felder log_dict['timestamp_relative'] = get_relative_time(log.timestamp) log_dict['status_icon'] = get_status_icon(log.status) log_dict['status_color'] = get_status_color(log.status) log_data.append(log_dict) # Paginierungs-Metadaten has_next = (page * per_page) < total has_prev = page > 1 return jsonify({ "success": True, "logs": log_data, "pagination": { "page": page, "per_page": per_page, "total": total, "total_pages": (total + per_page - 1) // per_page, "has_next": has_next, "has_prev": has_prev }, "filters": { "printer_id": printer_id, "hours": hours, "status": status_filter }, "generated_at": datetime.now().isoformat() }) finally: db_session.close() except Exception as e: app_logger.error(f"Fehler beim Abrufen der Steckdosen-Logs: {str(e)}") return jsonify({ "success": False, "error": "Fehler beim Laden der Steckdosen-Logs", "details": str(e) if current_user.is_admin else None }), 500 @app.route("/api/admin/plug-schedules/statistics", methods=['GET']) @login_required @admin_required def api_admin_plug_schedules_statistics(): """ API-Endpoint für Steckdosenschaltzeiten-Statistiken. """ try: hours = request.args.get('hours', default=24, type=int) hours = min(hours, 168) # Maximal 7 Tage # Statistiken abrufen stats = PlugStatusLog.get_status_statistics(hours=hours) # Drucker-Namen für die Top-Liste hinzufügen if stats.get('top_printers'): db_session = get_db_session() try: printer_ids = list(stats['top_printers'].keys()) printers = db_session.query(Printer.id, Printer.name)\ .filter(Printer.id.in_(printer_ids))\ .all() printer_names = {p.id: p.name for p in printers} # Top-Drucker mit Namen anreichern top_printers_with_names = [] for printer_id, count in stats['top_printers'].items(): top_printers_with_names.append({ "printer_id": printer_id, "printer_name": printer_names.get(printer_id, f"Drucker {printer_id}"), "log_count": count }) stats['top_printers_detailed'] = top_printers_with_names finally: db_session.close() return jsonify({ "success": True, "statistics": stats }) except Exception as e: app_logger.error(f"Fehler beim Abrufen der Steckdosen-Statistiken: {str(e)}") return jsonify({ "success": False, "error": "Fehler beim Laden der Statistiken", "details": str(e) if current_user.is_admin else None }), 500 @app.route("/api/admin/plug-schedules/cleanup", methods=['POST']) @login_required @admin_required def api_admin_plug_schedules_cleanup(): """ API-Endpoint zum Bereinigen alter Steckdosenschaltzeiten-Logs. """ try: data = request.get_json() or {} days = data.get('days', 30) days = max(1, min(days, 365)) # Zwischen 1 und 365 Tagen # Bereinigung durchführen deleted_count = PlugStatusLog.cleanup_old_logs(days=days) # Erfolg loggen SystemLog.log_system_event( level="INFO", message=f"Steckdosen-Logs bereinigt: {deleted_count} Einträge gelöscht (älter als {days} Tage)", module="admin_plug_schedules", user_id=current_user.id ) app_logger.info(f"Admin {current_user.name} berinigte {deleted_count} Steckdosen-Logs (älter als {days} Tage)") return jsonify({ "success": True, "deleted_count": deleted_count, "days": days, "message": f"Erfolgreich {deleted_count} alte Einträge gelöscht" }) except Exception as e: app_logger.error(f"Fehler beim Bereinigen der Steckdosen-Logs: {str(e)}") return jsonify({ "success": False, "error": "Fehler beim Bereinigen der Logs", "details": str(e) if current_user.is_admin else None }), 500 @app.route("/api/admin/plug-schedules/calendar", methods=['GET']) @login_required @admin_required def api_admin_plug_schedules_calendar(): """ API-Endpoint für Kalender-Daten der Steckdosenschaltzeiten. Liefert Events für FullCalendar im JSON-Format. """ try: # Parameter aus Request start_date = request.args.get('start') end_date = request.args.get('end') printer_id = request.args.get('printer_id', type=int) if not start_date or not end_date: return jsonify([]) # Leere Events bei fehlenden Daten # Datum-Strings zu datetime konvertieren start_dt = datetime.fromisoformat(start_date.replace('Z', '+00:00')) end_dt = datetime.fromisoformat(end_date.replace('Z', '+00:00')) db_session = get_db_session() try: # Query für Logs im Zeitraum query = db_session.query(PlugStatusLog)\ .filter(PlugStatusLog.timestamp >= start_dt)\ .filter(PlugStatusLog.timestamp <= end_dt)\ .join(Printer) # Drucker-Filter if printer_id: query = query.filter(PlugStatusLog.printer_id == printer_id) # Logs abrufen und nach Drucker gruppieren logs = query.order_by(PlugStatusLog.timestamp.asc()).all() # Events für FullCalendar formatieren events = [] for log in logs: # Farbe und Titel basierend auf Status if log.status == 'on': color = '#10b981' # Grün title = f"🟢 {log.printer.name}: EIN" elif log.status == 'off': color = '#f59e0b' # Orange title = f"🔴 {log.printer.name}: AUS" elif log.status == 'connected': color = '#3b82f6' # Blau title = f"🔌 {log.printer.name}: Verbunden" elif log.status == 'disconnected': color = '#ef4444' # Rot title = f"❌ {log.printer.name}: Getrennt" else: color = '#6b7280' # Grau title = f"❓ {log.printer.name}: {log.status}" # Event-Objekt für FullCalendar event = { 'id': f"plug_{log.id}", 'title': title, 'start': log.timestamp.isoformat(), 'backgroundColor': color, 'borderColor': color, 'textColor': '#ffffff', 'allDay': False, 'extendedProps': { 'printer_id': log.printer_id, 'printer_name': log.printer.name, 'status': log.status, 'source': log.source, 'user_id': log.user_id, 'user_name': log.user.name if log.user else None, 'notes': log.notes, 'response_time_ms': log.response_time_ms, 'error_message': log.error_message, 'power_consumption': log.power_consumption, 'voltage': log.voltage, 'current': log.current } } events.append(event) return jsonify(events) finally: db_session.close() except Exception as e: app_logger.error(f"Fehler beim Abrufen der Kalender-Daten: {str(e)}") return jsonify([]), 500 def get_relative_time(timestamp): """ Hilfsfunktion für relative Zeitangaben. """ if not timestamp: return "Unbekannt" now = datetime.now() diff = now - timestamp if diff.total_seconds() < 60: return "Gerade eben" elif diff.total_seconds() < 3600: minutes = int(diff.total_seconds() / 60) return f"vor {minutes} Minute{'n' if minutes != 1 else ''}" elif diff.total_seconds() < 86400: hours = int(diff.total_seconds() / 3600) return f"vor {hours} Stunde{'n' if hours != 1 else ''}" else: days = int(diff.total_seconds() / 86400) return f"vor {days} Tag{'en' if days != 1 else ''}" def get_status_icon(status): """ Hilfsfunktion für Status-Icons. """ icons = { 'connected': '🔌', 'disconnected': '❌', 'on': '🟢', 'off': '🔴' } return icons.get(status, '❓') def get_status_color(status): """ Hilfsfunktion für Status-Farben (CSS-Klassen). """ colors = { 'connected': 'text-blue-600', 'disconnected': 'text-red-600', 'on': 'text-green-600', 'off': 'text-orange-600' } return colors.get(status, 'text-gray-600') # ===== STARTUP UND MAIN ===== if __name__ == "__main__": """ Start-Modi: ----------- python app.py # Normal (Production Server auf 127.0.0.1:5000) python app.py --debug # Debug-Modus (Flask Dev Server) python app.py --optimized # Kiosk-Modus (Production Server + Optimierungen) python app.py --kiosk # Alias für --optimized python app.py --production # Force Production Server auch im Debug Kiosk-Fix: - Verwendet Waitress statt Flask Dev Server (keine "unreachable" mehr) - Bindet nur auf IPv4 (127.0.0.1) statt IPv6 (behebt Timeout-Probleme) - Automatische Bereinigung hängender Prozesse - Performance-Optimierungen aktiviert """ import sys import signal import os # Start-Modus prüfen debug_mode = len(sys.argv) > 1 and sys.argv[1] == "--debug" kiosk_mode = "--optimized" in sys.argv or "--kiosk" in sys.argv or os.getenv('KIOSK_MODE', '').lower() == 'true' # Bei Kiosk/Optimized Modus automatisch Production-Server verwenden if kiosk_mode: os.environ['FORCE_OPTIMIZED_MODE'] = 'true' os.environ['USE_OPTIMIZED_CONFIG'] = 'true' app_logger.info("🖥️ KIOSK-MODUS ERKANNT - aktiviere Optimierungen") # Windows-spezifische Umgebungsvariablen setzen für bessere Flask-Kompatibilität if os.name == 'nt' and debug_mode: # Entferne problematische Werkzeug-Variablen os.environ.pop('WERKZEUG_SERVER_FD', None) os.environ.pop('WERKZEUG_RUN_MAIN', None) # Setze saubere Umgebung os.environ['FLASK_ENV'] = 'development' os.environ['PYTHONIOENCODING'] = 'utf-8' os.environ['PYTHONUTF8'] = '1' # ===== INITIALISIERE ZENTRALEN SHUTDOWN-MANAGER ===== try: from utils.shutdown_manager import get_shutdown_manager shutdown_manager = get_shutdown_manager(timeout=45) # 45 Sekunden Gesamt-Timeout app_logger.info("✅ Zentraler Shutdown-Manager initialisiert") except ImportError as e: app_logger.error(f"❌ Shutdown-Manager konnte nicht geladen werden: {e}") # Fallback auf die alte Methode shutdown_manager = None # ===== INITIALISIERE FEHLERRESILIENZ-SYSTEME ===== try: from utils.error_recovery import start_error_monitoring, stop_error_monitoring from utils.system_control import get_system_control_manager # Error-Recovery-Monitoring starten start_error_monitoring() app_logger.info("✅ Error-Recovery-Monitoring gestartet") # System-Control-Manager initialisieren system_control_manager = get_system_control_manager() app_logger.info("✅ System-Control-Manager initialisiert") # Integriere in Shutdown-Manager if shutdown_manager: shutdown_manager.register_cleanup_function( func=stop_error_monitoring, name="Error Recovery Monitoring", priority=2, timeout=10 ) except Exception as e: app_logger.error(f"❌ Fehlerresilienz-Systeme konnten nicht initialisiert werden: {e}") # ===== KIOSK-SERVICE-OPTIMIERUNG ===== try: # Stelle sicher, dass der Kiosk-Service korrekt konfiguriert ist kiosk_service_exists = os.path.exists('/etc/systemd/system/myp-kiosk.service') if not kiosk_service_exists: app_logger.warning("⚠️ Kiosk-Service nicht gefunden - Kiosk-Funktionen eventuell eingeschränkt") else: app_logger.info("✅ Kiosk-Service-Konfiguration gefunden") except Exception as e: app_logger.error(f"❌ Kiosk-Service-Check fehlgeschlagen: {e}") # Windows-spezifisches Signal-Handling als Fallback def fallback_signal_handler(sig, frame): """Fallback Signal-Handler für ordnungsgemäßes Shutdown.""" app_logger.warning(f"🛑 Signal {sig} empfangen - fahre System herunter (Fallback)...") try: # Queue Manager stoppen stop_queue_manager() # Scheduler stoppen falls aktiviert if SCHEDULER_ENABLED and scheduler: try: if hasattr(scheduler, 'shutdown'): scheduler.shutdown(wait=True) else: scheduler.stop() except Exception as e: app_logger.error(f"Fehler beim Stoppen des Schedulers: {str(e)}") app_logger.info("✅ Fallback-Shutdown abgeschlossen") sys.exit(0) except Exception as e: app_logger.error(f"❌ Fehler beim Fallback-Shutdown: {str(e)}") sys.exit(1) # Signal-Handler registrieren (Windows-kompatibel) if os.name == 'nt': # Windows signal.signal(signal.SIGINT, fallback_signal_handler) signal.signal(signal.SIGTERM, fallback_signal_handler) signal.signal(signal.SIGBREAK, fallback_signal_handler) else: # Unix/Linux signal.signal(signal.SIGINT, fallback_signal_handler) signal.signal(signal.SIGTERM, fallback_signal_handler) signal.signal(signal.SIGHUP, fallback_signal_handler) try: # Datenbank initialisieren und Migrationen durchführen setup_database_with_migrations() # Template-Hilfsfunktionen registrieren register_template_helpers(app) # Optimierungsstatus beim Start anzeigen if USE_OPTIMIZED_CONFIG: app_logger.info("🚀 === OPTIMIERTE KONFIGURATION AKTIV ===") app_logger.info(f"📊 Hardware erkannt: Raspberry Pi={detect_raspberry_pi()}") app_logger.info(f"⚙️ Erzwungen: {os.getenv('FORCE_OPTIMIZED_MODE', '').lower() in ['true', '1', 'yes']}") app_logger.info(f"🔧 CLI-Parameter: {'--optimized' in sys.argv}") app_logger.info("🔧 Aktive Optimierungen:") app_logger.info(f" - Minifizierte Assets: {app.jinja_env.globals.get('use_minified_assets', False)}") app_logger.info(f" - Animationen deaktiviert: {app.jinja_env.globals.get('disable_animations', False)}") app_logger.info(f" - Glassmorphism begrenzt: {app.jinja_env.globals.get('limit_glassmorphism', False)}") app_logger.info(f" - Template-Caching: {not app.config.get('TEMPLATES_AUTO_RELOAD', True)}") app_logger.info(f" - Static Cache: {app.config.get('SEND_FILE_MAX_AGE_DEFAULT', 0) / 3600:.1f}h") app_logger.info("🚀 ========================================") else: app_logger.info("📋 Standard-Konfiguration aktiv (keine Optimierungen)") # Drucker-Monitor Steckdosen-Initialisierung beim Start try: app_logger.info("🖨️ Starte automatische Steckdosen-Initialisierung...") initialization_results = printer_monitor.initialize_all_outlets_on_startup() if initialization_results: success_count = sum(1 for success in initialization_results.values() if success) total_count = len(initialization_results) app_logger.info(f"✅ Steckdosen-Initialisierung: {success_count}/{total_count} Drucker erfolgreich") if success_count < total_count: app_logger.warning(f"⚠️ {total_count - success_count} Drucker konnten nicht initialisiert werden") else: app_logger.info("ℹ️ Keine Drucker zur Initialisierung gefunden") except Exception as e: app_logger.error(f"❌ Fehler bei automatischer Steckdosen-Initialisierung: {str(e)}") # ===== SHUTDOWN-MANAGER KONFIGURATION ===== if shutdown_manager: # Queue Manager beim Shutdown-Manager registrieren try: import utils.queue_manager as queue_module shutdown_manager.register_queue_manager(queue_module) app_logger.debug("✅ Queue Manager beim Shutdown-Manager registriert") except Exception as e: app_logger.warning(f"⚠️ Queue Manager Registrierung fehlgeschlagen: {e}") # Scheduler beim Shutdown-Manager registrieren shutdown_manager.register_scheduler(scheduler, SCHEDULER_ENABLED) # Datenbank-Cleanup beim Shutdown-Manager registrieren shutdown_manager.register_database_cleanup() # Windows Thread Manager beim Shutdown-Manager registrieren shutdown_manager.register_windows_thread_manager() # Queue-Manager für automatische Drucker-Überwachung starten # Nur im Produktionsmodus starten (nicht im Debug-Modus) if not debug_mode: try: queue_manager = start_queue_manager() app_logger.info("✅ Printer Queue Manager erfolgreich gestartet") except Exception as e: app_logger.error(f"❌ Fehler beim Starten des Queue-Managers: {str(e)}") else: app_logger.info("🔄 Debug-Modus: Queue Manager deaktiviert für Entwicklung") # Scheduler starten (falls aktiviert) if SCHEDULER_ENABLED: try: scheduler.start() app_logger.info("Job-Scheduler gestartet") except Exception as e: app_logger.error(f"Fehler beim Starten des Schedulers: {str(e)}") # ===== KIOSK-OPTIMIERTER SERVER-START ===== # Verwende Waitress für Produktion (behebt "unreachable" und Performance-Probleme) use_production_server = not debug_mode or "--production" in sys.argv # Kill hängende Prozesse auf Port 5000 (Windows-Fix) if os.name == 'nt' and use_production_server: try: app_logger.info("🔄 Bereinige hängende Prozesse auf Port 5000...") import subprocess result = subprocess.run(["netstat", "-ano"], capture_output=True, text=True, shell=True) hanging_pids = set() for line in result.stdout.split('\n'): if ":5000" in line and ("WARTEND" in line or "ESTABLISHED" in line): parts = line.split() if len(parts) >= 5 and parts[-1].isdigit(): pid = int(parts[-1]) if pid != 0: hanging_pids.add(pid) for pid in hanging_pids: try: subprocess.run(["taskkill", "/F", "/PID", str(pid)], capture_output=True, shell=True) app_logger.info(f"✅ Prozess {pid} beendet") except: pass if hanging_pids: time.sleep(2) # Kurz warten nach Cleanup except Exception as e: app_logger.warning(f"⚠️ Prozess-Cleanup fehlgeschlagen: {e}") if debug_mode and "--production" not in sys.argv: # Debug-Modus: Flask Development Server app_logger.info("🔧 Starte Debug-Server auf 0.0.0.0:5000 (HTTP)") run_kwargs = { "host": "0.0.0.0", "port": 5000, "debug": True, "threaded": True } if os.name == 'nt': run_kwargs["use_reloader"] = False run_kwargs["passthrough_errors"] = False app_logger.info("Windows-Debug-Modus: Auto-Reload deaktiviert") app.run(**run_kwargs) else: # Produktions-Modus: Verwende Waitress WSGI Server try: from waitress import serve # IPv4-only für bessere Kompatibilität (behebt IPv6-Probleme) host = "127.0.0.1" # Nur IPv4! port = 5000 app_logger.info(f"🚀 Starte Production Server (Waitress) auf {host}:{port}") app_logger.info("💡 Kiosk-Browser sollte http://127.0.0.1:5000 verwenden") app_logger.info("✅ IPv6-Probleme behoben durch IPv4-only Binding") app_logger.info("✅ Performance optimiert für Kiosk-Betrieb") # Waitress-Konfiguration für optimale Performance serve( app, host=host, port=port, threads=6, # Multi-threading für bessere Performance connection_limit=200, cleanup_interval=30, channel_timeout=120, log_untrusted_proxy_headers=False, clear_untrusted_proxy_headers=True, max_request_header_size=8192, max_request_body_size=104857600, # 100MB expose_tracebacks=False, ident="MYP-Kiosk-Server" ) except ImportError: # Fallback auf Flask wenn Waitress nicht verfügbar app_logger.warning("⚠️ Waitress nicht installiert - verwende Flask-Server") app_logger.warning("💡 Installiere mit: pip install waitress") ssl_context = get_ssl_context() if ssl_context: app_logger.info("Starte HTTPS-Server auf 0.0.0.0:443") app.run( host="0.0.0.0", port=443, debug=False, ssl_context=ssl_context, threaded=True ) else: app_logger.info("Starte HTTP-Server auf 0.0.0.0:80") app.run( host="0.0.0.0", port=80, debug=False, threaded=True ) except KeyboardInterrupt: app_logger.info("🔄 Tastatur-Unterbrechung empfangen - beende Anwendung...") if shutdown_manager: shutdown_manager.shutdown() else: fallback_signal_handler(signal.SIGINT, None) except Exception as e: app_logger.error(f"Fehler beim Starten der Anwendung: {str(e)}") # Cleanup bei Fehler if shutdown_manager: shutdown_manager.force_shutdown(1) else: try: stop_queue_manager() except: pass sys.exit(1)