# -*- coding: utf-8 -*- """ Hauptanwendung für das 3D-Druck-Management-System Diese Datei initialisiert die Flask-Anwendung und registriert alle Blueprints. Die eigentlichen Routen sind in den jeweiligen Blueprint-Modulen definiert. """ import os import sys import logging import atexit import signal import pickle import hashlib from datetime import datetime, timedelta from flask import Flask, render_template, request, jsonify, redirect, url_for, session, abort, send_from_directory, flash from flask_login import LoginManager, current_user, logout_user, login_required from flask_wtf import CSRFProtect from flask_wtf.csrf import CSRFError from sqlalchemy import event import threading # ===== MINIMALE SESSION-DATENKLASSE ===== class MinimalSessionInterface: """Minimale Session-Implementierung zur Reduzierung der Cookie-Größe""" @staticmethod def reduce_session_data(): """Reduziert Session-Daten auf absolutes Minimum""" from flask import session # Nur kritische Daten behalten essential_keys = ['_user_id', '_id', '_fresh', 'csrf_token'] # Alle nicht-essentiellen Keys entfernen keys_to_remove = [] for key in session.keys(): if key not in essential_keys: keys_to_remove.append(key) for key in keys_to_remove: session.pop(key, None) @staticmethod def get_minimal_session_data(): """Gibt nur minimale Session-Daten zurück""" from flask import session return { 'user_id': session.get('_user_id'), 'session_id': session.get('_id'), 'is_fresh': session.get('_fresh', False) } # Globale Session-Interface-Instanz minimal_session = MinimalSessionInterface() # ===== SESSION-OPTIMIERUNG ===== class SessionManager: """Optimierter Session-Manager für große Session-Daten""" def __init__(self, app=None): self.app = app self.session_storage_path = None def init_app(self, app): """Initialisiert den Session-Manager für die Flask-App""" self.app = app self.session_storage_path = os.path.join( app.instance_path, 'sessions' ) os.makedirs(self.session_storage_path, exist_ok=True) def store_large_session_data(self, key, data): """Speichert große Session-Daten im Dateisystem""" if not self.session_storage_path: return False try: from flask import session, request session_id = session.get('session_id') if not session_id: timestamp = datetime.now().isoformat() remote_addr = request.remote_addr session_string = remote_addr + "_" + timestamp session_id = hashlib.md5(session_string.encode()).hexdigest() session['session_id'] = session_id file_path = os.path.join( self.session_storage_path, session_id + "_" + key + ".pkl" ) with open(file_path, 'wb') as f: pickle.dump(data, f) # Nur Referenz in Session speichern session[key + "_ref"] = True return True except Exception as e: logging.error("Fehler beim Speichern der Session-Daten: " + str(e)) return False def load_large_session_data(self, key): """Lädt große Session-Daten aus dem Dateisystem""" if not self.session_storage_path: return None try: session_id = session.get('session_id') if not session_id or not session.get(key + "_ref"): return None file_path = os.path.join( self.session_storage_path, session_id + "_" + key + ".pkl" ) if not os.path.exists(file_path): return None with open(file_path, 'rb') as f: return pickle.load(f) except Exception as e: logging.error("Fehler beim Laden der Session-Daten: " + str(e)) return None def cleanup_expired_sessions(self): """Bereinigt abgelaufene Session-Dateien""" if not self.session_storage_path: return try: current_time = datetime.now() for filename in os.listdir(self.session_storage_path): file_path = os.path.join(self.session_storage_path, filename) file_time = datetime.fromtimestamp(os.path.getmtime(file_path)) # Lösche Dateien älter als 24 Stunden if current_time - file_time > timedelta(hours=24): os.remove(file_path) except Exception as e: logging.error("Fehler bei Session-Cleanup: " + str(e)) # Globaler Session-Manager session_manager = SessionManager() # ===== PRODUCTION-KONFIGURATION ===== class ProductionConfig: """Production-Konfiguration für Mercedes-Benz TBA Marienfelde Air-Gapped Environment Enthält alle Performance-Optimierungen, die vorher in OptimizedConfig waren, plus Production-spezifische Sicherheits- und Compliance-Einstellungen. """ # Umgebung ENV = 'production' DEBUG = False TESTING = False # Performance-Optimierungen (ehemals OptimizedConfig) OPTIMIZED_MODE = True USE_MINIFIED_ASSETS = True DISABLE_ANIMATIONS = True LIMIT_GLASSMORPHISM = True # Sicherheit (SECRET_KEY wird später gesetzt) WTF_CSRF_ENABLED = True WTF_CSRF_TIME_LIMIT = 3600 # 1 Stunde # Session-Sicherheit SESSION_COOKIE_SECURE = True # HTTPS erforderlich SESSION_COOKIE_HTTPONLY = True SESSION_COOKIE_SAMESITE = 'Strict' # PERMANENT_SESSION_LIFETIME wird später gesetzt # Performance-Optimierungen SEND_FILE_MAX_AGE_DEFAULT = 31536000 # 1 Jahr Cache für statische Dateien TEMPLATES_AUTO_RELOAD = False EXPLAIN_TEMPLATE_LOADING = False # Upload-Beschränkungen MAX_CONTENT_LENGTH = 100 * 1024 * 1024 # 100MB für Production # JSON-Optimierungen JSON_SORT_KEYS = False JSONIFY_PRETTYPRINT_REGULAR = False JSONIFY_MIMETYPE = 'application/json' # Logging-Level LOG_LEVEL = 'INFO' # Air-Gapped Einstellungen OFFLINE_MODE = True DISABLE_EXTERNAL_APIS = True USE_LOCAL_ASSETS_ONLY = True # Datenbank-Performance SQLALCHEMY_TRACK_MODIFICATIONS = False SQLALCHEMY_POOL_RECYCLE = 3600 SQLALCHEMY_POOL_TIMEOUT = 20 SQLALCHEMY_ENGINE_OPTIONS = { 'pool_pre_ping': True, 'pool_recycle': 3600, 'echo': False } # Security Headers SECURITY_HEADERS = { 'Strict-Transport-Security': 'max-age=31536000; includeSubDomains', 'X-Content-Type-Options': 'nosniff', 'X-Frame-Options': 'DENY', 'X-XSS-Protection': '1; mode=block', 'Referrer-Policy': 'strict-origin-when-cross-origin', 'Content-Security-Policy': "default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline'; img-src 'self' data:; font-src 'self';" } # Mercedes-Benz Corporate Compliance COMPANY_NAME = "Mercedes-Benz TBA Marienfelde" ENVIRONMENT_NAME = "Production Air-Gapped" COMPLIANCE_MODE = True AUDIT_LOGGING = True # Monitoring ENABLE_METRICS = True ENABLE_HEALTH_CHECKS = True ENABLE_PERFORMANCE_MONITORING = True # ===== DEVELOPMENT-KONFIGURATION ===== class DevelopmentConfig: """Development-Konfiguration für lokale Entwicklung Konsolidiert alle Nicht-Production-Modi (development, default, fallback). Optimiert für Entwicklerfreundlichkeit und Debugging. """ # Umgebung ENV = 'development' DEBUG = True TESTING = False # Performance (moderat optimiert für bessere Entwicklererfahrung) OPTIMIZED_MODE = False USE_MINIFIED_ASSETS = False DISABLE_ANIMATIONS = False LIMIT_GLASSMORPHISM = False # Sicherheit (relaxed für Development) WTF_CSRF_ENABLED = True WTF_CSRF_TIME_LIMIT = 7200 # 2 Stunden für längere Dev-Sessions # Session-Sicherheit (relaxed) SESSION_COOKIE_SECURE = False # HTTP OK für Development SESSION_COOKIE_HTTPONLY = True SESSION_COOKIE_SAMESITE = 'Lax' # Performance (Developer-freundlich) SEND_FILE_MAX_AGE_DEFAULT = 1 # Keine Cache für Development TEMPLATES_AUTO_RELOAD = True EXPLAIN_TEMPLATE_LOADING = True # Upload-Beschränkungen (generous für Testing) MAX_CONTENT_LENGTH = 50 * 1024 * 1024 # 50MB für Development # JSON (Pretty für Debugging) JSON_SORT_KEYS = True JSONIFY_PRETTYPRINT_REGULAR = True JSONIFY_MIMETYPE = 'application/json' # Logging-Level LOG_LEVEL = 'DEBUG' # Entwicklungs-Einstellungen OFFLINE_MODE = False DISABLE_EXTERNAL_APIS = False USE_LOCAL_ASSETS_ONLY = False # Datenbank (Developer-freundlich) SQLALCHEMY_TRACK_MODIFICATIONS = True # Für Debugging SQLALCHEMY_POOL_RECYCLE = 1800 # 30 Minuten SQLALCHEMY_POOL_TIMEOUT = 30 SQLALCHEMY_ENGINE_OPTIONS = { 'pool_pre_ping': True, 'pool_recycle': 1800, 'echo': True # SQL-Logging für Development } # Development-spezifische Einstellungen COMPANY_NAME = "MYP Development Environment" ENVIRONMENT_NAME = "Development/Testing" COMPLIANCE_MODE = False AUDIT_LOGGING = False # Monitoring (minimal für Development) ENABLE_METRICS = False ENABLE_HEALTH_CHECKS = False ENABLE_PERFORMANCE_MONITORING = False def detect_raspberry_pi(): """Erkennt ob das System auf einem Raspberry Pi läuft""" try: with open('/proc/cpuinfo', 'r') as f: cpuinfo = f.read() if 'Raspberry Pi' in cpuinfo or 'BCM' in cpuinfo: return True except: pass try: import platform machine = platform.machine().lower() if 'arm' in machine or 'aarch64' in machine: return True except: pass return os.getenv('FORCE_OPTIMIZED_MODE', '').lower() in ['true', '1', 'yes'] def detect_production_environment(): """Erkennt ob das System in der Production-Umgebung läuft""" # Command-line Argument if '--production' in sys.argv: return True # Umgebungsvariable env = os.getenv('FLASK_ENV', '').lower() if env in ['production', 'prod']: return True # Spezifische Production-Variablen if os.getenv('USE_PRODUCTION_CONFIG', '').lower() in ['true', '1', 'yes']: return True # Mercedes-Benz spezifische Erkennung if os.getenv('MERCEDES_ENVIRONMENT', '').lower() == 'production': return True # Air-Gapped Environment Detection if os.getenv('AIR_GAPPED_MODE', '').lower() in ['true', '1', 'yes']: return True # Hostname-basierte Erkennung try: import socket hostname = socket.gethostname().lower() if any(keyword in hostname for keyword in ['prod', 'production', 'live', 'mercedes', 'tba']): return True except: pass # Automatische Production-Erkennung für Raspberry Pi oder Low-Memory-Systeme if detect_raspberry_pi(): return True try: import psutil memory_gb = psutil.virtual_memory().total / (1024**3) if memory_gb < 2.0: # Unter 2GB RAM = wahrscheinlich Production-Umgebung return True except: pass return False def get_environment_type(): """Bestimmt den Umgebungstyp - nur noch production oder development""" if detect_production_environment(): return 'production' else: return 'development' # ===== GLOBALE KONFIGURATIONSVARIABLEN ===== # Diese werden später nach den Funktionsdefinitionen gesetzt # Windows-spezifische Fixes if os.name == 'nt': try: from utils.core_system import get_windows_thread_manager print("[OK] Windows-Fixes (sichere Version) geladen") except ImportError as e: get_windows_thread_manager = None print("[WARN] Windows-Fixes nicht verfügbar: ") else: get_windows_thread_manager = None # Lokale Imports from models import init_database, create_initial_admin, User, get_db_session from utils.logging_config import setup_logging, get_logger, log_startup_info from utils.job_scheduler import JobScheduler, get_job_scheduler from utils.job_queue_system import queue_manager, start_queue_manager, stop_queue_manager from utils.utilities_collection import SECRET_KEY, SESSION_LIFETIME # Blueprints importieren from blueprints.auth import auth_blueprint from blueprints.admin_unified import admin_blueprint, admin_api_blueprint from blueprints.guest import guest_blueprint from blueprints.calendar import calendar_blueprint from blueprints.user_management import users_blueprint # Konsolidierte User-Verwaltung from blueprints.printers import printers_blueprint from blueprints.jobs import jobs_blueprint from blueprints.kiosk import kiosk_blueprint from blueprints.uploads import uploads_blueprint from blueprints.sessions import sessions_blueprint from blueprints.tapo_control import tapo_blueprint # Tapo-Steckdosen-Steuerung from blueprints.api import api_blueprint # API-Endpunkte mit Session-Management from blueprints.legal_pages import legal_bp # Rechtliche Seiten (Impressum, Datenschutz, etc.) # Import der Sicherheits- und Hilfssysteme from utils.security_suite import init_security # Logging initialisieren setup_logging() log_startup_info() # Logger für verschiedene Komponenten app_logger = get_logger("app") # ===== FLASK-APP INITIALISIERUNG ===== app = Flask(__name__) # Konfiguration anwenden basierend auf Environment wird später gemacht # (nach Definition der apply_*_config Funktionen) # Session-Manager initialisieren session_manager.init_app(app) # Login-Manager initialisieren login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = 'auth.login' login_manager.login_message = 'Bitte melden Sie sich an, um auf diese Seite zuzugreifen.' login_manager.login_message_category = 'info' # CSRF-Schutz initialisieren csrf = CSRFProtect(app) # Thread-sichere Caches _user_cache = {} _user_cache_lock = threading.RLock() _printer_status_cache = {} _printer_status_cache_lock = threading.RLock() # Cache-Konfiguration USER_CACHE_TTL = 300 # 5 Minuten PRINTER_STATUS_CACHE_TTL = 30 # 30 Sekunden def clear_user_cache(user_id=None): """Löscht User-Cache""" with _user_cache_lock: if user_id: _user_cache.pop(user_id, None) else: _user_cache.clear() def clear_printer_status_cache(): """Löscht Drucker-Status-Cache""" with _printer_status_cache_lock: _printer_status_cache.clear() # ===== AGGRESSIVE SHUTDOWN HANDLER ===== def aggressive_shutdown_handler(sig, frame): """Aggressiver Signal-Handler für sofortiges Herunterfahren bei Strg+C""" print("\n[ALERT] STRG+C ERKANNT - SOFORTIGES SHUTDOWN!") try: # Caches leeren clear_user_cache() clear_printer_status_cache() # Queue Manager stoppen try: stop_queue_manager() print("[OK] Queue Manager gestoppt") except Exception as e: print("[WARN] Queue Manager Stop fehlgeschlagen: ") # Datenbank-Cleanup try: from models import _engine, _scoped_session if _scoped_session: _scoped_session.remove() if _engine: _engine.dispose() print("[OK] Datenbank geschlossen") except Exception as e: print("[WARN] Datenbank-Cleanup fehlgeschlagen: ") except Exception as e: print("[ERROR] Fehler beim Cleanup: ") print("[STOP] SOFORTIGES PROGRAMM-ENDE") os._exit(0) def register_aggressive_shutdown(): """Registriert den aggressiven Shutdown-Handler""" signal.signal(signal.SIGINT, aggressive_shutdown_handler) signal.signal(signal.SIGTERM, aggressive_shutdown_handler) if os.name == 'nt': try: signal.signal(signal.SIGBREAK, aggressive_shutdown_handler) except AttributeError: pass else: try: signal.signal(signal.SIGHUP, aggressive_shutdown_handler) except AttributeError: pass atexit.register(lambda: print("[RESTART] Atexit-Handler ausgeführt")) print("[ALERT] AGGRESSIVER STRG+C SHUTDOWN-HANDLER AKTIVIERT") # Shutdown-Handler registrieren register_aggressive_shutdown() def apply_production_config(app): """Wendet die Production-Konfiguration auf die Flask-App an""" app_logger.info("[PRODUCTION] Aktiviere Production-Konfiguration für Mercedes-Benz TBA") # Dynamische Werte setzen from utils.utilities_collection import SECRET_KEY, SESSION_LIFETIME ProductionConfig.SECRET_KEY = os.environ.get('SECRET_KEY') or SECRET_KEY ProductionConfig.PERMANENT_SESSION_LIFETIME = SESSION_LIFETIME # Flask-Basis-Konfiguration app.config.update({ "ENV": ProductionConfig.ENV, "DEBUG": ProductionConfig.DEBUG, "TESTING": ProductionConfig.TESTING, "SECRET_KEY": ProductionConfig.SECRET_KEY, "WTF_CSRF_ENABLED": ProductionConfig.WTF_CSRF_ENABLED, "WTF_CSRF_TIME_LIMIT": ProductionConfig.WTF_CSRF_TIME_LIMIT, "SESSION_COOKIE_SECURE": ProductionConfig.SESSION_COOKIE_SECURE, "SESSION_COOKIE_HTTPONLY": ProductionConfig.SESSION_COOKIE_HTTPONLY, "SESSION_COOKIE_SAMESITE": ProductionConfig.SESSION_COOKIE_SAMESITE, "PERMANENT_SESSION_LIFETIME": ProductionConfig.PERMANENT_SESSION_LIFETIME, "SEND_FILE_MAX_AGE_DEFAULT": ProductionConfig.SEND_FILE_MAX_AGE_DEFAULT, "TEMPLATES_AUTO_RELOAD": ProductionConfig.TEMPLATES_AUTO_RELOAD, "EXPLAIN_TEMPLATE_LOADING": ProductionConfig.EXPLAIN_TEMPLATE_LOADING, "MAX_CONTENT_LENGTH": ProductionConfig.MAX_CONTENT_LENGTH, "JSON_SORT_KEYS": ProductionConfig.JSON_SORT_KEYS, "JSONIFY_PRETTYPRINT_REGULAR": ProductionConfig.JSONIFY_PRETTYPRINT_REGULAR, "JSONIFY_MIMETYPE": ProductionConfig.JSONIFY_MIMETYPE, "SQLALCHEMY_TRACK_MODIFICATIONS": ProductionConfig.SQLALCHEMY_TRACK_MODIFICATIONS, "SQLALCHEMY_ENGINE_OPTIONS": ProductionConfig.SQLALCHEMY_ENGINE_OPTIONS }) # Jinja2-Umgebung für Production app.jinja_env.globals.update({ 'production_mode': True, 'development_mode': False, 'optimized_mode': ProductionConfig.OPTIMIZED_MODE, 'use_minified_assets': ProductionConfig.USE_MINIFIED_ASSETS, 'disable_animations': ProductionConfig.DISABLE_ANIMATIONS, 'limit_glassmorphism': ProductionConfig.LIMIT_GLASSMORPHISM, 'environment_name': ProductionConfig.ENVIRONMENT_NAME, 'company_name': ProductionConfig.COMPANY_NAME, 'compliance_mode': ProductionConfig.COMPLIANCE_MODE, 'offline_mode': ProductionConfig.OFFLINE_MODE, 'use_local_assets_only': ProductionConfig.USE_LOCAL_ASSETS_ONLY, 'base_template': 'base-production.html' }) app_logger.info("[PRODUCTION] ✅ Konfiguration aktiviert") app_logger.info("[PRODUCTION] ✅ Environment: ") app_logger.info("[PRODUCTION] ✅ Air-Gapped Mode: ") app_logger.info("[PRODUCTION] ✅ Compliance Mode: ") app_logger.info("[PRODUCTION] ✅ Performance Optimized: ") def apply_development_config(app): """Wendet die Development-Konfiguration auf die Flask-App an""" app_logger.info("[DEVELOPMENT] Aktiviere Development-Konfiguration") # Dynamische Werte setzen from utils.utilities_collection import SECRET_KEY, SESSION_LIFETIME DevelopmentConfig.SECRET_KEY = os.environ.get('SECRET_KEY') or SECRET_KEY DevelopmentConfig.PERMANENT_SESSION_LIFETIME = SESSION_LIFETIME # Flask-Basis-Konfiguration app.config.update({ "ENV": DevelopmentConfig.ENV, "DEBUG": DevelopmentConfig.DEBUG, "TESTING": DevelopmentConfig.TESTING, "SECRET_KEY": DevelopmentConfig.SECRET_KEY, "WTF_CSRF_ENABLED": DevelopmentConfig.WTF_CSRF_ENABLED, "WTF_CSRF_TIME_LIMIT": DevelopmentConfig.WTF_CSRF_TIME_LIMIT, "SESSION_COOKIE_SECURE": DevelopmentConfig.SESSION_COOKIE_SECURE, "SESSION_COOKIE_HTTPONLY": DevelopmentConfig.SESSION_COOKIE_HTTPONLY, "SESSION_COOKIE_SAMESITE": DevelopmentConfig.SESSION_COOKIE_SAMESITE, "PERMANENT_SESSION_LIFETIME": DevelopmentConfig.PERMANENT_SESSION_LIFETIME, "SEND_FILE_MAX_AGE_DEFAULT": DevelopmentConfig.SEND_FILE_MAX_AGE_DEFAULT, "TEMPLATES_AUTO_RELOAD": DevelopmentConfig.TEMPLATES_AUTO_RELOAD, "EXPLAIN_TEMPLATE_LOADING": DevelopmentConfig.EXPLAIN_TEMPLATE_LOADING, "MAX_CONTENT_LENGTH": DevelopmentConfig.MAX_CONTENT_LENGTH, "JSON_SORT_KEYS": DevelopmentConfig.JSON_SORT_KEYS, "JSONIFY_PRETTYPRINT_REGULAR": DevelopmentConfig.JSONIFY_PRETTYPRINT_REGULAR, "JSONIFY_MIMETYPE": DevelopmentConfig.JSONIFY_MIMETYPE, "SQLALCHEMY_TRACK_MODIFICATIONS": DevelopmentConfig.SQLALCHEMY_TRACK_MODIFICATIONS, "SQLALCHEMY_ENGINE_OPTIONS": DevelopmentConfig.SQLALCHEMY_ENGINE_OPTIONS }) # Jinja2-Umgebung für Development app.jinja_env.globals.update({ 'production_mode': False, 'development_mode': True, 'optimized_mode': DevelopmentConfig.OPTIMIZED_MODE, 'use_minified_assets': DevelopmentConfig.USE_MINIFIED_ASSETS, 'disable_animations': DevelopmentConfig.DISABLE_ANIMATIONS, 'limit_glassmorphism': DevelopmentConfig.LIMIT_GLASSMORPHISM, 'environment_name': DevelopmentConfig.ENVIRONMENT_NAME, 'company_name': DevelopmentConfig.COMPANY_NAME, 'compliance_mode': DevelopmentConfig.COMPLIANCE_MODE, 'offline_mode': DevelopmentConfig.OFFLINE_MODE, 'use_local_assets_only': DevelopmentConfig.USE_LOCAL_ASSETS_ONLY, 'base_template': 'base.html' }) app_logger.info("[DEVELOPMENT] ✅ Konfiguration aktiviert") app_logger.info("[DEVELOPMENT] ✅ Environment: ") app_logger.info("[DEVELOPMENT] ✅ Debug Mode: ") app_logger.info("[DEVELOPMENT] ✅ SQL Echo: ") # ===== KONFIGURATION ANWENDEN ===== # Jetzt können wir die Funktionen aufrufen, da sie definiert sind ENVIRONMENT_TYPE = get_environment_type() USE_PRODUCTION_CONFIG = detect_production_environment() OFFLINE_MODE = USE_PRODUCTION_CONFIG app_logger.info("[CONFIG] Erkannte Umgebung: ") app_logger.info("[CONFIG] Production-Modus: ") if USE_PRODUCTION_CONFIG: apply_production_config(app) else: # Development-Konfiguration (konsolidiert default/fallback) app_logger.info("[CONFIG] Verwende Development-Konfiguration") apply_development_config(app) # Umgebungs-spezifische Einstellungen if OFFLINE_MODE: app_logger.info("[CONFIG] ✅ Air-Gapped/Offline-Modus aktiviert") app.config['DISABLE_EXTERNAL_REQUESTS'] = True # Session-Konfiguration app.config["PERMANENT_SESSION_LIFETIME"] = SESSION_LIFETIME app.config["WTF_CSRF_ENABLED"] = True app.config["WTF_CSRF_TIME_LIMIT"] = 3600 # 1 Stunde app.config["WTF_CSRF_SSL_STRICT"] = False # Für Development app.config["WTF_CSRF_CHECK_DEFAULT"] = True app.config["WTF_CSRF_METHODS"] = ['POST', 'PUT', 'PATCH', 'DELETE'] # CSRF-Schutz initialisieren csrf = CSRFProtect(app) # CSRF-Token in Session verfügbar machen @app.before_request def csrf_protect(): """Stellt sicher, dass CSRF-Token verfügbar ist""" if request.endpoint and request.endpoint.startswith('static'): return # Guest-API-Endpunkte von CSRF befreien if request.path.startswith('/api/guest/'): return # Kein CSRF für Guest-APIs # Tapo-Hardware-Steuerung von CSRF befreien (Geräte verwenden kein CSRF) if request.path.startswith('/tapo/'): return # Kein CSRF für Tapo-Hardware-Steuerung # Drucker-API-Endpunkte mit Tapo-Integration von CSRF befreien tapo_api_paths = [ '/api/printers/control/', # Stromsteuerung über PyP100 '/api/printers/tapo/', # Alle Tapo-spezifischen APIs '/api/printers/force-refresh', # Force-Refresh (verwendet Tapo-Status) '/api/printers/status', # Status-API (verwendet Tapo-Status) '/api/admin/printers/', # Admin-Printer-APIs (Toggle-Funktion) ] for path in tapo_api_paths: if request.path.startswith(path): return # Kein CSRF für Tapo-Hardware-APIs try: from flask_wtf.csrf import generate_csrf token = generate_csrf() session['_csrf_token'] = token except Exception as e: app_logger.warning("CSRF-Token konnte nicht in Session gesetzt werden: ") # Template-Funktionen für CSRF-Token @app.template_global() def csrf_token(): """CSRF-Token für Templates verfügbar machen.""" try: from flask_wtf.csrf import generate_csrf token = generate_csrf() app_logger.debug("CSRF-Token generiert: ...") return token except Exception as e: app_logger.error("CSRF-Token konnte nicht generiert werden: ") # Fallback: Einfaches Token basierend auf Session import secrets fallback_token = secrets.token_urlsafe(32) app_logger.warning("Verwende Fallback-Token: ...") return fallback_token @app.errorhandler(CSRFError) def csrf_error(error): """Behandelt CSRF-Fehler mit detaillierter Diagnose""" # Guest-APIs sollten nie CSRF-Fehler haben if request.path.startswith('/api/guest/'): app_logger.warning("CSRF-Fehler bei Guest-API (sollte nicht passieren): ") return jsonify({ "success": False, "error": "Unerwarteter Sicherheitsfehler bei Guest-API" }), 500 app_logger.error("CSRF-Fehler für : ") app_logger.error("Request Headers: ") app_logger.error("Request Form: ") if request.path.startswith('/api/'): # Für API-Anfragen: JSON-Response mit Hilfe return jsonify({ "error": "CSRF-Token ungültig oder fehlt", "description": str(error.description), "help": "Fügen Sie ein gültiges CSRF-Token zu Ihrer Anfrage hinzu", "csrf_token": csrf_token() # Neues Token für Retry }), 400 else: # Für normale Anfragen: Weiterleitung mit Flash-Message from flask import flash, redirect flash("Sicherheitsfehler: Anfrage wurde abgelehnt. Bitte versuchen Sie es erneut.", "error") return redirect(request.url) # Login-Manager initialisieren login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = "auth.login" login_manager.login_message = "Bitte melden Sie sich an, um auf diese Seite zuzugreifen." # Session-Manager initialisieren session_manager.init_app(app) @login_manager.user_loader def load_user(user_id): """ Lädt einen Benutzer für Flask-Login. KRITISCHER FIX: Alle User-Attribute vollständig laden VOR expunge(), um Lazy-Loading-Probleme mit theme_preference und anderen Attributen zu vermeiden. """ try: with get_db_session() as db_session: user = db_session.query(User).filter_by(id=int(user_id)).first() if user: # ALLE User-Attribute explizit laden VOR expunge() # Dies verhindert lazy-loading Fehler nach Session-Trennung _ = user.theme_preference _ = user.language_preference _ = user.email_notifications _ = user.browser_notifications _ = user.dashboard_layout _ = user.compact_mode _ = user.show_completed_jobs _ = user.auto_refresh_interval _ = user.auto_logout_timeout _ = user.department _ = user.position _ = user.phone _ = user.bio _ = user.settings # Jetzt sicher expunge (alle Daten sind geladen) db_session.expunge(user) return user except Exception as e: app_logger.error("Fehler beim Laden des Benutzers : ") return None # ===== BLUEPRINTS REGISTRIEREN ===== app.register_blueprint(auth_blueprint) # Vereinheitlichte Admin-Blueprints registrieren app.register_blueprint(admin_blueprint) app.register_blueprint(admin_api_blueprint) app.register_blueprint(guest_blueprint) app.register_blueprint(calendar_blueprint) app.register_blueprint(users_blueprint) # Konsolidierte User-Verwaltung app.register_blueprint(printers_blueprint) app.register_blueprint(jobs_blueprint) app.register_blueprint(kiosk_blueprint) app.register_blueprint(uploads_blueprint) app.register_blueprint(sessions_blueprint) app.register_blueprint(tapo_blueprint) # Tapo-Steckdosen-Steuerung app.register_blueprint(api_blueprint) # Einfache API-Endpunkte app.register_blueprint(legal_bp) # Rechtliche Seiten (Impressum, Datenschutz, etc.) # Energiemonitoring-Blueprints registrieren from blueprints.energy_monitoring import energy_blueprint, energy_api_blueprint app.register_blueprint(energy_blueprint) # Frontend-Routen für Energiemonitoring app.register_blueprint(energy_api_blueprint) # API-Endpunkte für Energiedaten # Drucker-Steuerungs-Blueprint registrieren (Backend-only Hardware-Kontrolle) from blueprints.drucker_steuerung import drucker_blueprint app.register_blueprint(drucker_blueprint) # Backend-only Drucker-Steuerung ohne JavaScript # ===== HILFSSYSTEME INITIALISIEREN ===== init_security(app) # ===== KONTEXT-PROZESSOREN ===== @app.context_processor def inject_now(): """Injiziert die aktuelle Zeit in alle Templates""" return {'now': datetime.now} @app.context_processor def inject_current_route(): """ Stellt current_route für alle Templates bereit. Verhindert Template-Fehler wenn request.endpoint None ist (z.B. bei 404-Fehlern). """ current_route = getattr(request, 'endpoint', None) or '' return {'current_route': current_route} @app.context_processor def inject_moment(): """ Injiziert eine moment-ähnliche Funktion für Template-Kompatibilität. Behebt UndefinedError: 'moment' is not defined in Templates. Ersetzt moment.js durch native Python datetime-Funktionalität. """ def moment(): """Mock moment() Funktion die ein datetime-ähnliches Objekt zurückgibt""" class MomentLike: def __init__(self): self.dt = datetime.now() def format(self, format_str): """Konvertiert moment.js Format-Strings zu Python strftime""" # Moment.js -> Python strftime Mapping format_mapping = { 'DD.MM.YYYY': '%d.%m.%Y', 'DD/MM/YYYY': '%d/%m/%Y', 'YYYY-MM-DD': '%Y-%m-%d', 'HH:mm': '%H:%M', 'HH:mm:ss': '%H:%M:%S', 'DD.MM.YYYY HH:mm': '%d.%m.%Y %H:%M' } # Direkte Ersetzung wenn bekanntes Format if format_str in format_mapping: return self.dt.strftime(format_mapping[format_str]) # Fallback: Standard deutsche Formatierung return self.dt.strftime('%d.%m.%Y') return MomentLike() return {'moment': moment} @app.template_filter('format_datetime') def format_datetime_filter(value, format='%d.%m.%Y %H:%M'): """Template-Filter für Datums-Formatierung""" if value is None: return "" if isinstance(value, str): try: value = datetime.fromisoformat(value) except: return value return value.strftime(format) @app.template_global() def is_optimized_mode(): """Prüft ob der optimierte Modus aktiv ist""" return USE_PRODUCTION_CONFIG # ===== REQUEST HOOKS ===== @app.before_request def log_request_info(): """Loggt Request-Informationen""" if request.endpoint != 'static': app_logger.debug("Request: ") @app.after_request def log_response_info(response): """Loggt Response-Informationen""" if request.endpoint != 'static': app_logger.debug("Response: ") return response @app.after_request def minimize_session_cookie(response): """Reduziert Session-Cookie automatisch nach jedem Request""" if current_user.is_authenticated: # Drastische Session-Cookie-Reduktion minimal_session.reduce_session_data() return response @app.before_request def check_session_activity(): """Prüft Session-Aktivität und meldet inaktive Benutzer ab mit MINIMAL Cookie-Management""" if current_user.is_authenticated: from utils.utilities_collection import SESSION_LIFETIME # DRASTISCHE Session-Reduktion - alle nicht-kritischen Daten entfernen minimal_session.reduce_session_data() # Session-Aktivität über externen Store (nicht in Cookie) session_data = session_manager.load_large_session_data('activity') or {} now = datetime.now() # Aktivitätsprüfung über externen Store last_activity = session_data.get('last_activity') if last_activity: try: last_activity_time = datetime.fromisoformat(last_activity) # SESSION_LIFETIME ist bereits in Sekunden (Integer), nicht timedelta session_age_seconds = (now - last_activity_time).total_seconds() if session_age_seconds > SESSION_LIFETIME: app_logger.info("Session abgelaufen für Benutzer ") logout_user() return redirect(url_for('auth.login')) except Exception as e: app_logger.warning("Fehler beim Parsen der Session-Zeit: ") # Aktivität NICHT in Session-Cookie speichern, sondern extern session_data['last_activity'] = now.isoformat() session_manager.store_large_session_data('activity', session_data) # Session permanent ohne zusätzliche Daten session.permanent = True # ===== HAUPTROUTEN ===== @app.route("/") def index(): """Startseite - leitet zur Login-Seite oder zum Dashboard""" if current_user.is_authenticated: return redirect(url_for("dashboard")) return redirect(url_for("auth.login")) @app.route("/dashboard") @login_required def dashboard(): """Haupt-Dashboard mit vollständigen Daten für die Anzeige""" try: from models import get_db_session, Job, Printer, User from datetime import datetime, timedelta db_session = get_db_session() # ===== AKTIVE JOBS LADEN ===== active_jobs_query = db_session.query(Job).filter( Job.status.in_(['scheduled', 'running', 'printing', 'pending']) ) # Normale Benutzer sehen nur ihre eigenen Jobs if not current_user.is_admin: active_jobs_query = active_jobs_query.filter(Job.user_id == current_user.id) active_jobs = active_jobs_query.order_by(Job.created_at.desc()).limit(10).all() # ===== DRUCKER-DATEN LADEN ===== printers = db_session.query(Printer).filter(Printer.active == True).all() # ===== STATISTIKEN BERECHNEN ===== # Aktive Jobs zählen active_jobs_count = len(active_jobs) # Verfügbare Drucker zählen available_printers_count = len([p for p in printers if p.status in ['idle', 'ready']]) # Gesamtanzahl Jobs (benutzerabhängig) if current_user.is_admin: total_jobs_count = db_session.query(Job).count() completed_jobs_count = db_session.query(Job).filter(Job.status == 'completed').count() else: total_jobs_count = db_session.query(Job).filter(Job.user_id == current_user.id).count() completed_jobs_count = db_session.query(Job).filter( Job.user_id == current_user.id, Job.status == 'completed' ).count() # Erfolgsrate berechnen success_rate = round((completed_jobs_count / total_jobs_count * 100), 1) if total_jobs_count > 0 else 0 # ===== AKTIVE JOBS FÜR DASHBOARD FORMATIEREN ===== dashboard_active_jobs = [] for job in active_jobs: job_dict = job.to_dict() # Nutzt die erweiterte to_dict Methode # Zusätzliche Dashboard-spezifische Felder job_dict.update({ 'id': job.id, 'name': job.name or f"Job #{job.id}", 'printer': job.printer.name if job.printer else "Unbekannter Drucker", 'start_time': job.start_at.strftime('%d.%m.%Y %H:%M') if job.start_at else "Nicht festgelegt", 'status_text': job._get_status_text(job.status), 'progress': job._calculate_progress(), 'file_name': job.name # Für Template-Kompatibilität }) dashboard_active_jobs.append(job_dict) # ===== DRUCKER FÜR DASHBOARD FORMATIEREN ===== dashboard_printers = [] for printer in printers: printer_dict = printer.to_dict() # Status-Icon und -Farbe bestimmen status_mapping = { 'online': {'icon': 'fas fa-check-circle', 'color': 'text-green-500'}, 'idle': {'icon': 'fas fa-check-circle', 'color': 'text-green-500'}, 'busy': {'icon': 'fas fa-clock', 'color': 'text-yellow-500'}, 'offline': {'icon': 'fas fa-times-circle', 'color': 'text-red-500'}, 'error': {'icon': 'fas fa-exclamation-triangle', 'color': 'text-red-500'} } status_info = status_mapping.get(printer.status, status_mapping['offline']) printer_dict.update({ 'status_icon': status_info['icon'], 'status_color': status_info['color'], 'is_available': printer.status in ['idle', 'online', 'ready'] }) dashboard_printers.append(printer_dict) # ===== LETZTE AKTIVITÄTEN ===== # Letzte Jobs als Aktivitäten recent_jobs = db_session.query(Job).filter( Job.user_id == current_user.id if not current_user.is_admin else True ).order_by(Job.created_at.desc()).limit(5).all() activities = [] for job in recent_jobs: time_diff = datetime.now() - job.created_at if time_diff.days == 0: if time_diff.seconds < 3600: time_str = f"vor {time_diff.seconds // 60} Minuten" else: time_str = f"vor {time_diff.seconds // 3600} Stunden" else: time_str = f"vor {time_diff.days} Tagen" activities.append({ 'description': f"Job '{job.name}' - Status: {job._get_status_text(job.status)}", 'time': time_str }) # ===== TRENDS BERECHNEN (optional) ===== yesterday = datetime.now() - timedelta(days=1) week_ago = datetime.now() - timedelta(days=7) # Jobs von heute vs. gestern jobs_today = db_session.query(Job).filter( Job.created_at >= datetime.now().replace(hour=0, minute=0, second=0) ).count() jobs_yesterday = db_session.query(Job).filter( Job.created_at >= yesterday.replace(hour=0, minute=0, second=0), Job.created_at < datetime.now().replace(hour=0, minute=0, second=0) ).count() active_jobs_trend = None if jobs_yesterday > 0: trend_percent = round(((jobs_today - jobs_yesterday) / jobs_yesterday) * 100, 1) active_jobs_trend = { 'value': trend_percent, 'direction': 'up' if trend_percent > 0 else 'down' if trend_percent < 0 else 'stable' } db_session.close() # ===== TEMPLATE-DATEN ZUSAMMENSTELLEN ===== template_data = { # Statistiken 'active_jobs_count': active_jobs_count, 'available_printers_count': available_printers_count, 'total_jobs_count': total_jobs_count, 'success_rate': success_rate, # Trends 'active_jobs_trend': active_jobs_trend, 'printer_availability_trend': None, # Kann später implementiert werden 'total_jobs_trend': None, 'success_rate_trend': None, # Daten für Listen 'active_jobs': dashboard_active_jobs, 'printers': dashboard_printers, 'activities': activities, # Metadaten 'last_updated': datetime.now().strftime('%d.%m.%Y %H:%M'), 'user_name': current_user.name, 'is_admin': current_user.is_admin } app_logger.info(f"✅ Dashboard geladen für {current_user.name}: {active_jobs_count} aktive Jobs, {available_printers_count} verfügbare Drucker") return render_template("dashboard.html", **template_data) except Exception as e: app_logger.error(f"❌ Fehler beim Laden des Dashboards: {str(e)}", exc_info=True) # Fallback-Dashboard mit minimalen Daten fallback_data = { 'active_jobs_count': 0, 'available_printers_count': 0, 'total_jobs_count': 0, 'success_rate': 0, 'active_jobs': [], 'printers': [], 'activities': [], 'error': f"Fehler beim Laden der Dashboard-Daten: {str(e)}", 'last_updated': datetime.now().strftime('%d.%m.%Y %H:%M'), 'user_name': current_user.name if current_user.is_authenticated else "Unbekannt", 'is_admin': current_user.is_admin if current_user.is_authenticated else False } return render_template("dashboard.html", **fallback_data) @app.route("/csrf-test") def csrf_test_page(): """CSRF-Test-Seite für Diagnose und Debugging""" return render_template("csrf_test.html") @app.route("/api/csrf-test", methods=["POST"]) def csrf_test_api(): """API-Endpunkt für CSRF-Tests""" try: # Test-Daten aus Request extrahieren if request.is_json: data = request.get_json() test_data = data.get('test_data', 'Keine Daten') else: test_data = request.form.get('test_data', 'Keine Daten') app_logger.info("CSRF-Test erfolgreich: ") return jsonify({ "success": True, "message": "CSRF-Test erfolgreich", "data": test_data, "timestamp": datetime.now().isoformat() }), 200 except Exception as e: app_logger.error("CSRF-Test Fehler: ") return jsonify({ "success": False, "error": str(e) }), 500 @app.route("/admin") @login_required def admin(): """Admin-Dashboard""" if not current_user.is_admin: abort(403) return redirect(url_for("admin.admin_dashboard")) # ===== HAUPTSEITEN ===== @app.route("/printers") @login_required def printers_page(): """Zeigt die Übersichtsseite für Drucker an mit Server-Side Rendering.""" try: from utils.hardware_integration import get_tapo_controller from models import get_db_session, Printer # Drucker-Daten server-side laden db_session = get_db_session() all_printers = db_session.query(Printer).filter(Printer.active == True).all() # Live-Status direkt über TapoController abrufen tapo_controller = get_tapo_controller() # Drucker-Daten mit Status anreichern printers_with_status = [] for printer in all_printers: printer_info = { 'id': printer.id, 'name': printer.name, 'model': printer.model or 'Unbekannt', 'location': printer.location or 'Unbekannt', 'ip_address': printer.ip_address, 'plug_ip': printer.plug_ip, 'active': printer.active, 'status': 'offline' } # Status direkt über TapoController prüfen und in DB persistieren if printer.plug_ip: try: reachable, plug_status = tapo_controller.check_outlet_status( printer.plug_ip, printer_id=printer.id ) # Drucker-Status basierend auf Steckdosen-Status aktualisieren if not reachable: # Nicht erreichbar = offline printer.status = 'offline' status_text = 'Offline' status_color = 'red' elif plug_status == 'on': # Steckdose an = belegt printer.status = 'busy' status_text = 'Belegt' status_color = 'green' elif plug_status == 'off': # Steckdose aus = verfügbar printer.status = 'idle' status_text = 'Verfügbar' status_color = 'gray' else: # Unbekannter Status = offline printer.status = 'offline' status_text = 'Unbekannt' status_color = 'red' # Zeitstempel aktualisieren und in DB speichern printer.last_checked = datetime.now() printer.updated_at = datetime.now() # Status-Änderung protokollieren (nur bei tatsächlicher Änderung) from models import PlugStatusLog current_db_status = printer.status log_status = 'connected' if reachable else 'disconnected' if plug_status == 'on': log_status = 'on' elif plug_status == 'off': log_status = 'off' # Nur loggen wenn sich der Status geändert hat (vereinfachte Prüfung) try: PlugStatusLog.log_status_change( printer_id=printer.id, status=log_status, source='system', ip_address=printer.plug_ip, notes="Automatische Status-Prüfung beim Laden der Drucker-Seite" ) app_logger.debug("📊 Auto-Status protokolliert: Drucker -> ") except Exception as log_error: app_logger.error("❌ Fehler beim Auto-Protokollieren: ") printer_info.update({ 'plug_status': plug_status, 'plug_reachable': reachable, 'can_control': reachable, 'status': printer.status, 'last_checked': datetime.now().isoformat() }) # Status-Display für UI printer_info['status_display'] = { 'text': status_text, 'color': status_color } except Exception as e: printer_info.update({ 'plug_status': 'unknown', 'plug_reachable': False, 'can_control': False, 'error': str(e), 'status_display': {'text': 'Fehler', 'color': 'red'} }) else: printer_info.update({ 'plug_status': 'no_plug', 'plug_reachable': False, 'can_control': False, 'status_display': {'text': 'Kein Smart-Plug', 'color': 'gray'} }) printers_with_status.append(printer_info) # Alle Status-Updates in die Datenbank committen try: db_session.commit() app_logger.debug("✅ Status-Updates für Drucker erfolgreich gespeichert") except Exception as commit_error: app_logger.error("❌ Fehler beim Speichern der Status-Updates: ") db_session.rollback() # Einzigartige Werte für Filter models = list(set([p['model'] for p in printers_with_status if p['model'] != 'Unbekannt'])) locations = list(set([p['location'] for p in printers_with_status if p['location'] != 'Unbekannt'])) db_session.close() return render_template("printers.html", printers=printers_with_status, models=models, locations=locations, no_javascript=True) except Exception as e: app_logger.error("Fehler beim Laden der Drucker-Seite: ") return render_template("printers.html", printers=[], models=[], locations=[], error=str(e), no_javascript=True) @app.route("/printers/control", methods=["POST"]) @login_required def printer_control(): """Server-Side Drucker-Steuerung ohne JavaScript.""" try: from utils.hardware_integration import get_tapo_controller from models import get_db_session, Printer printer_id = request.form.get('printer_id') action = request.form.get('action') # 'on' oder 'off' if not printer_id or not action: flash('Ungültige Parameter für Drucker-Steuerung', 'error') return redirect(url_for('printers_page')) if action not in ['on', 'off']: flash('Ungültige Aktion. Nur "on" oder "off" erlaubt.', 'error') return redirect(url_for('printers_page')) # Drucker aus Datenbank laden db_session = get_db_session() printer = db_session.query(Printer).filter(Printer.id == int(printer_id)).first() if not printer: flash('Drucker nicht gefunden', 'error') db_session.close() return redirect(url_for('printers_page')) if not printer.plug_ip: flash('Keine Steckdose für diesen Drucker konfiguriert', 'error') db_session.close() return redirect(url_for('printers_page')) # Erst Erreichbarkeit der Steckdose prüfen tapo_controller = get_tapo_controller() # Prüfe ob Steckdose erreichbar ist reachable, current_status = tapo_controller.check_outlet_status(printer.plug_ip, printer_id=int(printer_id)) if not reachable: # Steckdose nicht erreichbar = Drucker offline printer.status = 'offline' printer.last_checked = datetime.now() printer.updated_at = datetime.now() # Status-Änderung protokollieren from models import PlugStatusLog try: PlugStatusLog.log_status_change( printer_id=int(printer_id), status='disconnected', source='system', user_id=current_user.id, ip_address=printer.plug_ip, error_message="Steckdose nicht erreichbar", notes="Erreichbarkeitsprüfung durch fehlgeschlagen" ) app_logger.debug("📊 Offline-Status protokolliert: Drucker -> disconnected") except Exception as log_error: app_logger.error("❌ Fehler beim Protokollieren des Offline-Status: ") db_session.commit() flash(f'Steckdose nicht erreichbar - Drucker als offline markiert', 'error') app_logger.warning("⚠️ Steckdose für Drucker nicht erreichbar") db_session.close() return redirect(url_for('printers_page')) # Steckdose erreichbar - Steuerung ausführen state = action == 'on' success = tapo_controller.toggle_plug(printer.plug_ip, state) if success: # Drucker-Status basierend auf Steckdosen-Aktion aktualisieren if action == 'on': # Steckdose an = Drucker belegt (busy) printer.status = 'busy' status_text = "belegt" plug_status = 'on' else: # Steckdose aus = Drucker verfügbar (idle) printer.status = 'idle' status_text = "verfügbar" plug_status = 'off' # Zeitstempel der letzten Überprüfung aktualisieren printer.last_checked = datetime.now() printer.updated_at = datetime.now() # Status-Änderung in PlugStatusLog protokollieren mit Energiedaten from models import PlugStatusLog try: # Energiedaten abrufen falls verfügbar energy_data = {} try: reachable, current_status = tapo_controller.check_outlet_status(printer.plug_ip, printer_id=int(printer_id)) if reachable: # Versuche Energiedaten zu holen (falls P110) extra_info = tapo_controller._get_extra_device_info(printer.plug_ip) if extra_info: energy_data = { 'power_consumption': extra_info.get('power_consumption'), 'voltage': extra_info.get('voltage'), 'current': extra_info.get('current'), 'firmware_version': extra_info.get('firmware_version') } except Exception as energy_error: app_logger.debug("⚡ Energiedaten für nicht verfügbar: ") action_text = "eingeschaltet" if action == 'on' else "ausgeschaltet" PlugStatusLog.log_status_change( printer_id=int(printer_id), status=plug_status, source='manual', user_id=current_user.id, ip_address=printer.plug_ip, power_consumption=energy_data.get('power_consumption'), voltage=energy_data.get('voltage'), current=energy_data.get('current'), firmware_version=energy_data.get('firmware_version'), notes="Manuell durch " ) app_logger.debug("📊 Status-Änderung mit Energiedaten protokolliert: Drucker -> ") except Exception as log_error: app_logger.error("❌ Fehler beim Protokollieren der Status-Änderung: ") # Änderungen in Datenbank speichern db_session.commit() action_text = "eingeschaltet" if action == 'on' else "ausgeschaltet" flash(f'Drucker erfolgreich - Status: ', 'success') app_logger.info("✅ Drucker erfolgreich durch - Status: ") else: action_text = "einschalten" if action == 'on' else "ausschalten" flash(f'Fehler beim der Steckdose', 'error') app_logger.error("❌ Fehler beim von Drucker ") db_session.close() return redirect(url_for('printers_page')) except Exception as e: app_logger.error("Unerwarteter Fehler bei Drucker-Steuerung: ") flash(f'Systemfehler: ', 'error') return redirect(url_for('printers_page')) @app.route("/jobs") @login_required def jobs_page(): """Zeigt die Übersichtsseite für Druckaufträge an.""" return render_template("jobs.html") @app.route("/jobs/new") @login_required def new_job_page(): """Zeigt die Seite zum Erstellen neuer Druckaufträge an.""" return render_template("jobs.html") @app.route("/stats") @login_required def stats_page(): """Zeigt die Statistiken-Seite an""" return render_template("stats.html", title="Statistiken") # ===== API-ENDPUNKTE FÜR FRONTEND-KOMPATIBILITÄT ===== # Jobs-API wird über Blueprint gehandhabt - keine doppelten Routen hier @app.route('/sw.js') def service_worker(): """Service Worker für PWA-Funktionalität""" return send_from_directory('static', 'sw.js', mimetype='application/javascript') @app.route("/api/jobs//start", methods=["POST"]) @login_required def api_start_job(job_id): """API-Endpunkt für Job-Start - leitet an Jobs-Blueprint weiter""" from blueprints.jobs import start_job return start_job(job_id) @app.route("/api/jobs//pause", methods=["POST"]) @login_required def api_pause_job(job_id): """API-Endpunkt für Job-Pause - leitet an Jobs-Blueprint weiter""" from blueprints.jobs import pause_job return pause_job(job_id) @app.route("/api/jobs//resume", methods=["POST"]) @login_required def api_resume_job(job_id): """API-Endpunkt für Job-Resume - leitet an Jobs-Blueprint weiter""" from blueprints.jobs import resume_job return resume_job(job_id) @app.route("/api/jobs//finish", methods=["POST"]) @login_required def api_finish_job(job_id): """API-Endpunkt für Job-Finish - leitet an Jobs-Blueprint weiter""" from blueprints.jobs import finish_job return finish_job(job_id) @app.route("/api/printers", methods=["GET"]) @login_required def api_get_printers(): """API-Endpunkt für Drucker-Liste mit konsistenter Response-Struktur Query-Parameter: - include_inactive: 'true' um auch inaktive Drucker anzuzeigen (default: 'false') - show_all: 'true' um ALLE Drucker anzuzeigen, unabhängig vom Status (default: 'false') """ try: from models import get_db_session, Printer # Query-Parameter auslesen - Standardmäßig nur aktive TBA Marienfelde Drucker include_inactive = request.args.get('include_inactive', 'false').lower() == 'true' show_all = request.args.get('show_all', 'false').lower() == 'true' db_session = get_db_session() # Basis-Query - NUR aktive TBA Marienfelde Drucker (die korrekten 6) query = db_session.query(Printer) if show_all: # Nur wenn explizit angefordert: ALLE Drucker zeigen pass # Keine Filter else: # Standard: Nur aktive TBA Marienfelde Drucker mit korrekten Namen correct_names = ['Drucker 1', 'Drucker 2', 'Drucker 3', 'Drucker 4', 'Drucker 5', 'Drucker 6'] query = query.filter( Printer.location == 'TBA Marienfelde', Printer.active == True, Printer.name.in_(correct_names) ) if not include_inactive: # Zusätzlich: Keine offline/unreachable Drucker (außer wenn explizit gewünscht) pass # Status-Filter wird später in der UI angewendet printers = query.all() printer_list = [] for printer in printers: # Status-Bestimmung: Wenn nicht erreichbar, dann "offline" status = printer.status or "offline" # Zusätzliche Status-Informationen is_reachable = status not in ["offline", "unreachable", "error"] printer_dict = { "id": printer.id, "name": printer.name, "model": printer.model or "Unbekanntes Modell", "location": printer.location or "Unbekannter Standort", "status": status, "ip_address": printer.ip_address, "plug_ip": printer.plug_ip, "active": getattr(printer, 'active', True), "is_reachable": is_reachable, # Zusätzliches Feld für UI "is_selectable": True, # WICHTIG: Alle Drucker sind auswählbar! "created_at": printer.created_at.isoformat() if printer.created_at else datetime.now().isoformat(), "last_checked": printer.last_checked.isoformat() if printer.last_checked else None, "display_status": " - " # Für Dropdown-Anzeige } printer_list.append(printer_dict) db_session.close() app_logger.info("✅ API: Drucker abgerufen (include_inactive=)") # Konsistente Response-Struktur wie erwartet return jsonify({ "success": True, "printers": printer_list, "count": len(printer_list), "message": "Drucker erfolgreich geladen", "filters": { "include_inactive": include_inactive, "show_all": show_all } }) except Exception as e: app_logger.error("❌ API-Fehler beim Abrufen der Drucker: ") return jsonify({ "success": False, "error": "Fehler beim Laden der Drucker", "details": str(e), "printers": [], "count": 0 }), 500 @app.route("/api/printers/status", methods=["GET"]) @login_required def api_get_printer_status(): """API-Endpunkt für Drucker-Status mit verbessertem Status-Management""" try: # Verwende den konsolidierten Hardware Integration Monitor from utils.hardware_integration import printer_monitor # Status für alle Drucker abrufen status_data = printer_monitor.get_live_printer_status() status_list = list(status_data.values()) # Erweitere Status mit UI-freundlichen Informationen for status in status_list: # Status-Display-Informationen hinzufügen plug_status = status.get("plug_status", "unknown") if plug_status in printer_monitor.STATUS_DISPLAY: status["status_display"] = printer_monitor.STATUS_DISPLAY[plug_status] else: status["status_display"] = { "text": "Unbekannt", "color": "gray", "icon": "question" } app_logger.info("✅ API: Status für Drucker abgerufen") # Erfolgreiche Response mit konsistenter Struktur return jsonify({ "success": True, "printers": status_list, "count": len(status_list), "timestamp": datetime.now().isoformat() }) except Exception as e: app_logger.error("❌ API-Fehler beim Abrufen des Drucker-Status: ", exc_info=True) # Fallback: Mindestens die Drucker-Grunddaten zurückgeben try: from models import get_db_session, Printer db_session = get_db_session() printers = db_session.query(Printer).all() basic_status = [] for printer in printers: basic_status.append({ "id": printer.id, "name": printer.name, "location": printer.location, "model": printer.model, "plug_status": "unreachable", "plug_reachable": False, "has_plug": bool(printer.plug_ip), "error": "Status-Manager nicht verfügbar" }) db_session.close() return jsonify({ "success": False, "error": "Eingeschränkte Status-Informationen", "printers": basic_status, "count": len(basic_status), "timestamp": datetime.now().isoformat() }) except: return jsonify({ "success": False, "error": "Fehler beim Laden des Drucker-Status", "details": str(e), "printers": [], "count": 0 }), 500 @app.route("/api/health", methods=["GET"]) def api_health_check(): """Einfacher Health-Check für Monitoring""" try: from models import get_db_session # Datenbank-Verbindung testen db_session = get_db_session() db_session.execute("SELECT 1") db_session.close() return jsonify({ "status": "healthy", "timestamp": datetime.now().isoformat(), "version": "1.0.0", "services": { "database": "online", "authentication": "online" } }) except Exception as e: app_logger.error("❌ Health-Check fehlgeschlagen: ") return jsonify({ "status": "unhealthy", "timestamp": datetime.now().isoformat(), "error": str(e) }), 503 @app.route("/api/version", methods=["GET"]) def api_version(): """API-Version und System-Info""" return jsonify({ "version": "1.0.0", "name": "MYP - Manage Your Printer", "description": "3D-Drucker-Verwaltung mit Smart-Steckdosen", "build": datetime.now().strftime("%Y%m%d"), "environment": get_environment_type() }) @app.route("/api/stats", methods=['GET']) @login_required def api_stats(): """ Allgemeine System-Statistiken API-Endpunkt. Stellt grundlegende Statistiken über das System zur Verfügung. """ try: from models import get_db_session, User, Printer, Job db_session = get_db_session() try: # Grundlegende Counts total_users = db_session.query(User).count() total_printers = db_session.query(Printer).count() total_jobs = db_session.query(Job).count() # Aktive Jobs active_jobs = db_session.query(Job).filter( Job.status.in_(['pending', 'printing', 'paused']) ).count() # Abgeschlossene Jobs heute from datetime import date today = date.today() completed_today = db_session.query(Job).filter( Job.status == 'completed', Job.created_at >= today ).count() # Online-Drucker (aktive Drucker) online_printers = db_session.query(Printer).filter( Printer.active == True ).count() finally: db_session.close() stats = { 'total_users': total_users, 'total_printers': total_printers, 'total_jobs': total_jobs, 'active_jobs': active_jobs, 'completed_today': completed_today, 'online_printers': online_printers, 'timestamp': datetime.now().isoformat() } app_logger.info("✅ API-Statistiken abgerufen von ") return jsonify({ 'success': True, 'stats': stats, 'message': 'Statistiken erfolgreich geladen' }) except Exception as e: app_logger.error("❌ Fehler beim Abrufen der API-Statistiken: ") return jsonify({ 'success': False, 'error': 'Fehler beim Laden der Statistiken', 'details': str(e) }), 500 # Statische Seiten - Weiterleitungen zu Legal Blueprint @app.route("/privacy") def privacy(): """Datenschutzerklärung - Weiterleitung zum Legal Blueprint""" return redirect(url_for("legal.privacy")) @app.route("/terms") def terms(): """Nutzungsbedingungen - Weiterleitung zum Legal Blueprint""" return redirect(url_for("legal.terms")) @app.route("/imprint") def imprint(): """Impressum - Weiterleitung zum Legal Blueprint""" return redirect(url_for("legal.imprint")) @app.route("/legal") def legal(): """Rechtliche Hinweise - Weiterleitung zum Legal Blueprint""" return redirect(url_for("legal.legal")) # ===== FEHLERBEHANDLUNG ===== @app.errorhandler(400) def bad_request_error(error): """400-Fehlerseite - Ungültige Anfrage""" app_logger.warning("Bad Request (400): - ") if request.is_json: return jsonify({ "error": "Ungültige Anfrage", "message": "Die Anfrage konnte nicht verarbeitet werden", "status_code": 400 }), 400 return render_template('errors/400.html'), 400 @app.errorhandler(401) def unauthorized_error(error): """401-Fehlerseite - Nicht autorisiert""" app_logger.warning("Unauthorized (401): - User: ") if request.is_json: return jsonify({ "error": "Nicht autorisiert", "message": "Anmeldung erforderlich", "status_code": 401 }), 401 return redirect(url_for('auth.login')) @app.errorhandler(403) def forbidden_error(error): """403-Fehlerseite - Zugriff verweigert""" app_logger.warning("Forbidden (403): - User: ") if request.is_json: return jsonify({ "error": "Zugriff verweigert", "message": "Sie haben keine Berechtigung für diese Aktion", "status_code": 403 }), 403 try: return render_template('errors/403.html'), 403 except Exception as template_error: # Fallback bei Template-Fehlern app_logger.error("Template-Fehler in 403-Handler: ") return "

403 - Zugriff verweigert

Sie haben keine Berechtigung für diese Aktion.

", 403 @app.errorhandler(404) def not_found_error(error): """404-Fehlerseite - Seite nicht gefunden""" app_logger.info("Not Found (404): ") if request.is_json: return jsonify({ "error": "Nicht gefunden", "message": "Die angeforderte Ressource wurde nicht gefunden", "status_code": 404 }), 404 try: return render_template('errors/404.html'), 404 except Exception as template_error: # Fallback bei Template-Fehlern app_logger.error("Template-Fehler in 404-Handler: ") return "

404 - Nicht gefunden

Die angeforderte Seite wurde nicht gefunden.

", 404 @app.errorhandler(405) def method_not_allowed_error(error): """405-Fehlerseite - Methode nicht erlaubt""" app_logger.warning("Method Not Allowed (405): ") if request.is_json: return jsonify({ "error": "Methode nicht erlaubt", "message": "Die HTTP-Methode ist für diese URL nicht erlaubt", "status_code": 405 }), 405 return render_template('errors/405.html'), 405 @app.errorhandler(413) def payload_too_large_error(error): """413-Fehlerseite - Datei zu groß""" app_logger.warning("Payload Too Large (413): ") if request.is_json: return jsonify({ "error": "Datei zu groß", "message": "Die hochgeladene Datei ist zu groß", "status_code": 413 }), 413 return render_template('errors/413.html'), 413 @app.errorhandler(429) def rate_limit_error(error): """429-Fehlerseite - Zu viele Anfragen""" app_logger.warning("Rate Limit Exceeded (429): - IP: ") if request.is_json: return jsonify({ "error": "Zu viele Anfragen", "message": "Sie haben zu viele Anfragen gesendet. Bitte versuchen Sie es später erneut", "status_code": 429 }), 429 return render_template('errors/429.html'), 429 @app.errorhandler(500) def internal_error(error): """500-Fehlerseite - Interner Serverfehler""" import traceback error_id = datetime.now().strftime("%Y%m%d_%H%M%S") # Detailliertes Logging für Debugging app_logger.error("Internal Server Error (500) - ID: ") app_logger.error("URL: ") app_logger.error("Method: ") app_logger.error("User: ") app_logger.error("Error: ") app_logger.error("Traceback: ") if request.is_json: return jsonify({ "error": "Interner Serverfehler", "message": "Ein unerwarteter Fehler ist aufgetreten. Bitte versuchen Sie es später erneut", "error_id": error_id, "status_code": 500 }), 500 try: return render_template('errors/500.html', error_id=error_id), 500 except Exception as template_error: # Fallback bei Template-Fehlern app_logger.error("Template-Fehler in 500-Handler: ") return "

500 - Interner Serverfehler

Ein unerwarteter Fehler ist aufgetreten. Fehler-ID:

", 500 @app.errorhandler(502) def bad_gateway_error(error): """502-Fehlerseite - Bad Gateway""" app_logger.error("Bad Gateway (502): ") if request.is_json: return jsonify({ "error": "Gateway-Fehler", "message": "Der Server ist vorübergehend nicht verfügbar", "status_code": 502 }), 502 return render_template('errors/502.html'), 502 @app.errorhandler(503) def service_unavailable_error(error): """503-Fehlerseite - Service nicht verfügbar""" app_logger.error("Service Unavailable (503): ") if request.is_json: return jsonify({ "error": "Service nicht verfügbar", "message": "Der Service ist vorübergehend nicht verfügbar. Bitte versuchen Sie es später erneut", "status_code": 503 }), 503 return render_template('errors/503.html'), 503 @app.errorhandler(505) def http_version_not_supported_error(error): """505-Fehlerseite - HTTP-Version nicht unterstützt""" app_logger.error("HTTP Version Not Supported (505): ") if request.is_json: return jsonify({ "error": "HTTP-Version nicht unterstützt", "message": "Die verwendete HTTP-Version wird vom Server nicht unterstützt", "status_code": 505 }), 505 return render_template('errors/505.html'), 505 # Allgemeiner Exception-Handler für unbehandelte Ausnahmen @app.errorhandler(Exception) def handle_exception(error): """Allgemeiner Handler für unbehandelte Ausnahmen""" import traceback error_id = datetime.now().strftime("%Y%m%d_%H%M%S") # Detailliertes Logging app_logger.error("Unhandled Exception - ID: ") app_logger.error("URL: ") app_logger.error("Method: ") app_logger.error("User: ") app_logger.error("Exception Type: ") app_logger.error("Exception: ") app_logger.error("Traceback: ") # Für HTTP-Exceptions die ursprüngliche Behandlung verwenden if hasattr(error, 'code'): return error # Für alle anderen Exceptions als 500 behandeln if request.is_json: return jsonify({ "error": "Unerwarteter Fehler", "message": "Ein unerwarteter Fehler ist aufgetreten. Bitte versuchen Sie es später erneut", "error_id": error_id, "status_code": 500 }), 500 try: return render_template('errors/500.html', error_id=error_id), 500 except Exception as template_error: # Fallback bei Template-Fehlern app_logger.error("Template-Fehler im Exception-Handler: ") return "

500 - Unerwarteter Fehler

Ein unerwarteter Fehler ist aufgetreten. Fehler-ID:

", 500 # ===== APP-FACTORY ===== def create_app(config_name=None): """ Flask-App-Factory für Tests und modulare Initialisierung Args: config_name: 'production', 'development' oder None (auto-detect) Returns: Flask: Konfigurierte Flask-App-Instanz """ # Bestimme Konfiguration if config_name is None: config_name = get_environment_type() # Setze Environment-Variablen basierend auf config_name if config_name == 'production': os.environ['FLASK_ENV'] = 'production' os.environ['USE_PRODUCTION_CONFIG'] = 'true' else: os.environ['FLASK_ENV'] = 'development' os.environ['USE_PRODUCTION_CONFIG'] = 'false' # Globale Variablen neu setzen global ENVIRONMENT_TYPE, USE_PRODUCTION_CONFIG, OFFLINE_MODE ENVIRONMENT_TYPE = config_name USE_PRODUCTION_CONFIG = (config_name == 'production') OFFLINE_MODE = USE_PRODUCTION_CONFIG # App-Konfiguration anwenden if USE_PRODUCTION_CONFIG: apply_production_config(app) app_logger.info("[FACTORY] ✅ Production-Konfiguration angewendet") else: apply_development_config(app) app_logger.info("[FACTORY] ✅ Development-Konfiguration angewendet") # Session-Manager initialisieren session_manager.init_app(app) # Sicherheitssuite initialisieren try: init_security(app) app_logger.info("[FACTORY] ✅ Sicherheitssuite initialisiert") except Exception as e: app_logger.warning("[FACTORY] ⚠️ Sicherheitssuite-Fehler: ") app_logger.info("[FACTORY] 🏭 Flask-App erstellt ()") return app # ===== HAUPTFUNKTION ===== def main(): """Hauptfunktion zum Starten der Anwendung""" try: # Umgebungsinfo loggen app_logger.info("[STARTUP] 🚀 Starte MYP -Umgebung") app_logger.info("[STARTUP] 🏢 ") app_logger.info("[STARTUP] 🔒 Air-Gapped: ") # Production-spezifische Initialisierung if USE_PRODUCTION_CONFIG: app_logger.info("[PRODUCTION] Initialisiere Production-Systeme...") # Performance-Monitoring aktivieren if getattr(ProductionConfig, 'ENABLE_PERFORMANCE_MONITORING', False): try: from utils.monitoring_analytics import performance_tracker # Performance monitoring initialized via global instance app_logger.info("[PRODUCTION] ✅ Performance-Monitoring aktiviert") except ImportError: app_logger.warning("[PRODUCTION] ⚠️ Performance-Monitoring nicht verfügbar") # Health-Checks aktivieren if getattr(ProductionConfig, 'ENABLE_HEALTH_CHECKS', False): try: from utils.monitoring_analytics import get_health_check # Simple health check initialization app_logger.info("[PRODUCTION] ✅ Health-Checks aktiviert") except ImportError: app_logger.warning("[PRODUCTION] ⚠️ Health-Checks nicht verfügbar") # Audit-Logging aktivieren if getattr(ProductionConfig, 'AUDIT_LOGGING', False): try: from utils.audit_logger import init_audit_logging init_audit_logging(app) app_logger.info("[PRODUCTION] ✅ Audit-Logging aktiviert") except ImportError: app_logger.warning("[PRODUCTION] ⚠️ Audit-Logging nicht verfügbar") # Datenbank initialisieren app_logger.info("[STARTUP] Initialisiere Datenbank...") init_database() app_logger.info("[STARTUP] ✅ Datenbank initialisiert") # Initial-Admin erstellen falls nicht vorhanden app_logger.info("[STARTUP] Prüfe Initial-Admin...") create_initial_admin() app_logger.info("[STARTUP] ✅ Admin-Benutzer geprüft") # Statische Drucker für TBA Marienfelde erstellen/aktualisieren app_logger.info("[STARTUP] Initialisiere statische Drucker...") from models import create_initial_printers success = create_initial_printers() if success: app_logger.info("[STARTUP] ✅ Statische Drucker konfiguriert") else: app_logger.warning("[STARTUP] ⚠️ Fehler bei Drucker-Initialisierung") # Queue Manager starten app_logger.info("[STARTUP] Starte Queue Manager...") start_queue_manager() app_logger.info("[STARTUP] ✅ Queue Manager gestartet") # Job Scheduler starten app_logger.info("[STARTUP] Starte Job Scheduler...") scheduler = get_job_scheduler() if scheduler: scheduler.start() app_logger.info("[STARTUP] ✅ Job Scheduler gestartet") # Steckdosen beim Systemstart initialisieren app_logger.info("[STARTUP] Initialisiere Steckdosen (alle auf 'aus' = frei)...") try: initialization_results = scheduler.initialize_all_outlets_on_startup() if initialization_results: success_count = sum(1 for result in initialization_results.values() if result.get('success', False)) total_count = len(initialization_results) if success_count == total_count: app_logger.info(f"[STARTUP] ✅ Alle {total_count} Steckdosen erfolgreich initialisiert") elif success_count > 0: app_logger.info(f"[STARTUP] ⚡ {success_count}/{total_count} Steckdosen erfolgreich initialisiert") else: app_logger.warning(f"[STARTUP] ⚠️ Keine der {total_count} Steckdosen konnte initialisiert werden") else: app_logger.info("[STARTUP] ℹ️ Keine Steckdosen zur Initialisierung gefunden") except Exception as e: app_logger.warning(f"[STARTUP] ⚠️ Fehler bei Steckdosen-Initialisierung: {str(e)}") else: app_logger.warning("[STARTUP] ⚠️ Job Scheduler nicht verfügbar") # SSL-Kontext für Production ssl_context = None if USE_PRODUCTION_CONFIG: app_logger.info("[PRODUCTION] Konfiguriere SSL...") try: from utils.ssl_suite import ssl_config ssl_context = ssl_config.get_ssl_context() app_logger.info("[PRODUCTION] ✅ SSL-Kontext konfiguriert") except ImportError: app_logger.warning("[PRODUCTION] ⚠️ SSL-Konfiguration nicht verfügbar") # Server-Konfiguration host = os.getenv('FLASK_HOST', '0.0.0.0') port = int(os.getenv('FLASK_PORT', 5000)) # Production-spezifische Server-Einstellungen server_options = { 'host': host, 'port': port, 'threaded': True } if USE_PRODUCTION_CONFIG: # Production-Server-Optimierungen server_options.update({ 'threaded': True, 'processes': 1, # Für Air-Gapped Umgebung 'use_reloader': False, 'use_debugger': False }) app_logger.info("[PRODUCTION] 🌐 Server startet auf https://:") app_logger.info("[PRODUCTION] 🔧 Threaded: ") app_logger.info("[PRODUCTION] 🔒 SSL: ") else: app_logger.info("[STARTUP] 🌐 Server startet auf http://:") # Server starten if ssl_context: server_options['ssl_context'] = ssl_context app.run(**server_options) else: app.run(**server_options) except KeyboardInterrupt: app_logger.info("[SHUTDOWN] 🛑 Shutdown durch Benutzer angefordert") except Exception as e: app_logger.error("[ERROR] ❌ Fehler beim Starten der Anwendung: ") if USE_PRODUCTION_CONFIG: # Production-Fehlerbehandlung import traceback app_logger.error("[ERROR] Traceback: ") raise finally: # Cleanup app_logger.info("[SHUTDOWN] 🧹 Cleanup wird ausgeführt...") try: # Queue Manager stoppen stop_queue_manager() app_logger.info("[SHUTDOWN] ✅ Queue Manager gestoppt") # Scheduler stoppen if 'scheduler' in locals() and scheduler: scheduler.shutdown() app_logger.info("[SHUTDOWN] ✅ Job Scheduler gestoppt") app_logger.info("[SHUTDOWN] ✅ Rate Limiter bereinigt") # Caches leeren clear_user_cache() clear_printer_status_cache() app_logger.info("[SHUTDOWN] ✅ Caches geleert") if USE_PRODUCTION_CONFIG: app_logger.info("[SHUTDOWN] 🏁 System heruntergefahren") else: app_logger.info("[SHUTDOWN] 🏁 System heruntergefahren") except Exception as cleanup_error: app_logger.error("[SHUTDOWN] ❌ Cleanup-Fehler: ") # Production-spezifische Funktionen def get_production_info(): """Gibt Production-Informationen zurück""" if USE_PRODUCTION_CONFIG: return { 'company': ProductionConfig.COMPANY_NAME, 'environment': ProductionConfig.ENVIRONMENT_NAME, 'offline_mode': ProductionConfig.OFFLINE_MODE, 'compliance_mode': ProductionConfig.COMPLIANCE_MODE, 'version': '1.0.0', 'build_date': datetime.now().strftime('%Y-%m-%d'), 'ssl_enabled': USE_PRODUCTION_CONFIG } return None # Template-Funktion für Production-Info @app.template_global() def production_info(): """Stellt Production-Informationen für Templates bereit""" return get_production_info() # Nach der Initialisierung der Blueprints und vor dem App-Start try: # Admin-Berechtigungen beim Start korrigieren from utils.permissions import fix_all_admin_permissions result = fix_all_admin_permissions() if result['success']: app_logger.info("Admin-Berechtigungen beim Start korrigiert: erstellt, aktualisiert") else: app_logger.warning("Fehler beim Korrigieren der Admin-Berechtigungen: ") except Exception as e: app_logger.error("Fehler beim Korrigieren der Admin-Berechtigungen beim Start: ") if __name__ == "__main__": main()