Files
Projektarbeit-MYP/backend/app.py

674 lines
22 KiB
Python

"""
Hauptanwendung für das 3D-Druck-Management-System
Diese Datei initialisiert die Flask-Anwendung und registriert alle Blueprints.
Die eigentlichen Routen sind in den jeweiligen Blueprint-Modulen definiert.
"""
import os
import sys
import logging
import atexit
import signal
from datetime import datetime
from flask import Flask, render_template, request, jsonify, redirect, url_for, session, abort
from flask_login import LoginManager, current_user, logout_user, login_required
from flask_wtf import CSRFProtect
from flask_wtf.csrf import CSRFError
from sqlalchemy import event
from contextlib import contextmanager
import threading
# ===== OPTIMIERTE KONFIGURATION FÜR RASPBERRY PI =====
class OptimizedConfig:
"""Konfiguration für performance-optimierte Bereitstellung auf Raspberry Pi"""
# Performance-Optimierungs-Flags
OPTIMIZED_MODE = True
USE_MINIFIED_ASSETS = True
DISABLE_ANIMATIONS = True
LIMIT_GLASSMORPHISM = True
# Flask-Performance-Einstellungen
DEBUG = False
TESTING = False
SEND_FILE_MAX_AGE_DEFAULT = 31536000 # 1 Jahr Cache für statische Dateien
# Template-Einstellungen
TEMPLATES_AUTO_RELOAD = False
EXPLAIN_TEMPLATE_LOADING = False
# Session-Konfiguration
SESSION_COOKIE_SECURE = True
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Lax'
# Performance-Optimierungen
MAX_CONTENT_LENGTH = 16 * 1024 * 1024 # 16MB max Upload
JSON_SORT_KEYS = False
JSONIFY_PRETTYPRINT_REGULAR = False
def detect_raspberry_pi():
"""Erkennt ob das System auf einem Raspberry Pi läuft"""
try:
with open('/proc/cpuinfo', 'r') as f:
cpuinfo = f.read()
if 'Raspberry Pi' in cpuinfo or 'BCM' in cpuinfo:
return True
except:
pass
try:
import platform
machine = platform.machine().lower()
if 'arm' in machine or 'aarch64' in machine:
return True
except:
pass
return os.getenv('FORCE_OPTIMIZED_MODE', '').lower() in ['true', '1', 'yes']
def should_use_optimized_config():
"""Bestimmt ob die optimierte Konfiguration verwendet werden soll"""
if '--optimized' in sys.argv:
return True
if detect_raspberry_pi():
return True
if os.getenv('USE_OPTIMIZED_CONFIG', '').lower() in ['true', '1', 'yes']:
return True
try:
import psutil
memory_gb = psutil.virtual_memory().total / (1024**3)
if memory_gb < 2.0:
return True
except:
pass
return False
# Windows-spezifische Fixes
if os.name == 'nt':
try:
from utils.windows_fixes import get_windows_thread_manager
print("[OK] Windows-Fixes (sichere Version) geladen")
except ImportError as e:
get_windows_thread_manager = None
print(f"[WARN] Windows-Fixes nicht verfügbar: {str(e)}")
else:
get_windows_thread_manager = None
# Lokale Imports
from models import init_database, create_initial_admin, User, get_db_session
from utils.logging_config import setup_logging, get_logger, log_startup_info
from utils.job_scheduler import JobScheduler, get_job_scheduler
from utils.queue_manager import start_queue_manager, stop_queue_manager
from utils.settings import SECRET_KEY, SESSION_LIFETIME
# ===== OFFLINE-MODUS KONFIGURATION =====
OFFLINE_MODE = True # Produktionseinstellung für Offline-Betrieb
# Blueprints importieren
from blueprints.auth import auth_blueprint
# from blueprints.user import user_blueprint # Konsolidiert in user_management
from blueprints.admin_unified import admin_blueprint, admin_api_blueprint
from blueprints.guest import guest_blueprint
from blueprints.calendar import calendar_blueprint
from blueprints.user_management import users_blueprint # Konsolidierte User-Verwaltung
from blueprints.printers import printers_blueprint
from blueprints.jobs import jobs_blueprint
from blueprints.kiosk import kiosk_blueprint
from blueprints.uploads import uploads_blueprint
from blueprints.sessions import sessions_blueprint
from blueprints.tapo_control import tapo_blueprint # Tapo-Steckdosen-Steuerung
from blueprints.api_simple import api_blueprint # Einfache API-Endpunkte
# Import der Sicherheits- und Hilfssysteme
from utils.rate_limiter import cleanup_rate_limiter
from utils.security import init_security
from utils.permissions import init_permission_helpers
# Logging initialisieren
setup_logging()
log_startup_info()
# Logger für verschiedene Komponenten
app_logger = get_logger("app")
# Thread-sichere Caches
_user_cache = {}
_user_cache_lock = threading.RLock()
_printer_status_cache = {}
_printer_status_cache_lock = threading.RLock()
# Cache-Konfiguration
USER_CACHE_TTL = 300 # 5 Minuten
PRINTER_STATUS_CACHE_TTL = 30 # 30 Sekunden
def clear_user_cache(user_id=None):
"""Löscht User-Cache"""
with _user_cache_lock:
if user_id:
_user_cache.pop(user_id, None)
else:
_user_cache.clear()
def clear_printer_status_cache():
"""Löscht Drucker-Status-Cache"""
with _printer_status_cache_lock:
_printer_status_cache.clear()
# ===== AGGRESSIVE SHUTDOWN HANDLER =====
def aggressive_shutdown_handler(sig, frame):
"""Aggressiver Signal-Handler für sofortiges Herunterfahren bei Strg+C"""
print("\n[ALERT] STRG+C ERKANNT - SOFORTIGES SHUTDOWN!")
try:
# Caches leeren
clear_user_cache()
clear_printer_status_cache()
# Queue Manager stoppen
try:
stop_queue_manager()
print("[OK] Queue Manager gestoppt")
except Exception as e:
print(f"[WARN] Queue Manager Stop fehlgeschlagen: {e}")
# Datenbank-Cleanup
try:
from models import _engine, _scoped_session
if _scoped_session:
_scoped_session.remove()
if _engine:
_engine.dispose()
print("[OK] Datenbank geschlossen")
except Exception as e:
print(f"[WARN] Datenbank-Cleanup fehlgeschlagen: {e}")
except Exception as e:
print(f"[ERROR] Fehler beim Cleanup: {e}")
print("[STOP] SOFORTIGES PROGRAMM-ENDE")
os._exit(0)
def register_aggressive_shutdown():
"""Registriert den aggressiven Shutdown-Handler"""
signal.signal(signal.SIGINT, aggressive_shutdown_handler)
signal.signal(signal.SIGTERM, aggressive_shutdown_handler)
if os.name == 'nt':
try:
signal.signal(signal.SIGBREAK, aggressive_shutdown_handler)
except AttributeError:
pass
else:
try:
signal.signal(signal.SIGHUP, aggressive_shutdown_handler)
except AttributeError:
pass
atexit.register(lambda: print("[RESTART] Atexit-Handler ausgeführt"))
print("[ALERT] AGGRESSIVER STRG+C SHUTDOWN-HANDLER AKTIVIERT")
# Shutdown-Handler registrieren
register_aggressive_shutdown()
# Flask-App initialisieren
app = Flask(__name__)
app.secret_key = SECRET_KEY
# ===== KONFIGURATION ANWENDEN =====
USE_OPTIMIZED_CONFIG = should_use_optimized_config()
if USE_OPTIMIZED_CONFIG:
app_logger.info("[START] Aktiviere optimierte Konfiguration")
app.config.update({
"DEBUG": OptimizedConfig.DEBUG,
"TESTING": OptimizedConfig.TESTING,
"SEND_FILE_MAX_AGE_DEFAULT": OptimizedConfig.SEND_FILE_MAX_AGE_DEFAULT,
"TEMPLATES_AUTO_RELOAD": OptimizedConfig.TEMPLATES_AUTO_RELOAD,
"EXPLAIN_TEMPLATE_LOADING": OptimizedConfig.EXPLAIN_TEMPLATE_LOADING,
"SESSION_COOKIE_SECURE": OptimizedConfig.SESSION_COOKIE_SECURE,
"SESSION_COOKIE_HTTPONLY": OptimizedConfig.SESSION_COOKIE_HTTPONLY,
"SESSION_COOKIE_SAMESITE": OptimizedConfig.SESSION_COOKIE_SAMESITE,
"MAX_CONTENT_LENGTH": OptimizedConfig.MAX_CONTENT_LENGTH,
"JSON_SORT_KEYS": OptimizedConfig.JSON_SORT_KEYS,
"JSONIFY_PRETTYPRINT_REGULAR": OptimizedConfig.JSONIFY_PRETTYPRINT_REGULAR
})
app.jinja_env.globals.update({
'optimized_mode': True,
'use_minified_assets': OptimizedConfig.USE_MINIFIED_ASSETS,
'disable_animations': OptimizedConfig.DISABLE_ANIMATIONS,
'limit_glassmorphism': OptimizedConfig.LIMIT_GLASSMORPHISM,
'base_template': 'base-optimized.html'
})
@app.after_request
def add_optimized_cache_headers(response):
"""Fügt optimierte Cache-Header hinzu"""
if request.endpoint == 'static' or '/static/' in request.path:
response.headers['Cache-Control'] = 'public, max-age=31536000'
response.headers['Vary'] = 'Accept-Encoding'
return response
else:
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.jinja_env.globals.update({
'optimized_mode': False,
'use_minified_assets': False,
'disable_animations': False,
'limit_glassmorphism': False,
'base_template': 'base.html'
})
# Session-Konfiguration
app.config["PERMANENT_SESSION_LIFETIME"] = SESSION_LIFETIME
app.config["WTF_CSRF_ENABLED"] = True
# CSRF-Schutz initialisieren
csrf = CSRFProtect(app)
@app.errorhandler(CSRFError)
def csrf_error(error):
"""Behandelt CSRF-Fehler"""
app_logger.warning(f"CSRF-Fehler: {error.description}")
return jsonify({"error": "CSRF-Token ungültig oder fehlt"}), 400
# Login-Manager initialisieren
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = "auth.login"
login_manager.login_message = "Bitte melden Sie sich an, um auf diese Seite zuzugreifen."
@login_manager.user_loader
def load_user(user_id):
"""Lädt einen Benutzer für Flask-Login"""
try:
with get_db_session() as db_session:
user = db_session.query(User).filter_by(id=int(user_id)).first()
if user:
db_session.expunge(user)
return user
except Exception as e:
app_logger.error(f"Fehler beim Laden des Benutzers {user_id}: {str(e)}")
return None
# ===== BLUEPRINTS REGISTRIEREN =====
app.register_blueprint(auth_blueprint)
# app.register_blueprint(user_blueprint) # Konsolidiert in users_blueprint
# Vereinheitlichte Admin-Blueprints registrieren
app.register_blueprint(admin_blueprint)
app.register_blueprint(admin_api_blueprint)
app.register_blueprint(guest_blueprint)
app.register_blueprint(calendar_blueprint)
app.register_blueprint(users_blueprint) # Konsolidierte User-Verwaltung
app.register_blueprint(printers_blueprint)
app.register_blueprint(jobs_blueprint)
app.register_blueprint(kiosk_blueprint)
app.register_blueprint(uploads_blueprint)
app.register_blueprint(sessions_blueprint)
app.register_blueprint(tapo_blueprint) # Tapo-Steckdosen-Steuerung
app.register_blueprint(api_blueprint) # Einfache API-Endpunkte
# ===== HILFSSYSTEME INITIALISIEREN =====
init_security(app)
init_permission_helpers(app)
# ===== KONTEXT-PROZESSOREN =====
@app.context_processor
def inject_now():
"""Injiziert die aktuelle Zeit in alle Templates"""
return {'now': datetime.now}
@app.template_filter('format_datetime')
def format_datetime_filter(value, format='%d.%m.%Y %H:%M'):
"""Template-Filter für Datums-Formatierung"""
if value is None:
return ""
if isinstance(value, str):
try:
value = datetime.fromisoformat(value)
except:
return value
return value.strftime(format)
@app.template_global()
def is_optimized_mode():
"""Prüft ob der optimierte Modus aktiv ist"""
return USE_OPTIMIZED_CONFIG
# ===== REQUEST HOOKS =====
@app.before_request
def log_request_info():
"""Loggt Request-Informationen"""
if request.endpoint != 'static':
app_logger.debug(f"Request: {request.method} {request.path}")
@app.after_request
def log_response_info(response):
"""Loggt Response-Informationen"""
if request.endpoint != 'static':
app_logger.debug(f"Response: {response.status_code}")
return response
@app.before_request
def check_session_activity():
"""Prüft Session-Aktivität und meldet inaktive Benutzer ab"""
if current_user.is_authenticated:
last_activity = session.get('last_activity')
if last_activity:
try:
last_activity_time = datetime.fromisoformat(last_activity)
if (datetime.now() - last_activity_time).total_seconds() > SESSION_LIFETIME.total_seconds():
app_logger.info(f"Session abgelaufen für Benutzer {current_user.id}")
logout_user()
return redirect(url_for('auth.login'))
except:
pass
# Aktivität aktualisieren
session['last_activity'] = datetime.now().isoformat()
session.permanent = True
# ===== HAUPTROUTEN =====
@app.route("/")
def index():
"""Startseite - leitet zur Login-Seite oder zum Dashboard"""
if current_user.is_authenticated:
return redirect(url_for("dashboard"))
return redirect(url_for("auth.login"))
@app.route("/dashboard")
@login_required
def dashboard():
"""Haupt-Dashboard"""
return render_template("dashboard.html")
@app.route("/admin")
@login_required
def admin():
"""Admin-Dashboard"""
if not current_user.is_admin:
abort(403)
return redirect(url_for("admin.admin_dashboard"))
# ===== HAUPTSEITEN =====
@app.route("/printers")
@login_required
def printers_page():
"""Zeigt die Übersichtsseite für Drucker an."""
return render_template("printers.html")
@app.route("/jobs")
@login_required
def jobs_page():
"""Zeigt die Übersichtsseite für Druckaufträge an."""
return render_template("jobs.html")
@app.route("/jobs/new")
@login_required
def new_job_page():
"""Zeigt die Seite zum Erstellen neuer Druckaufträge an."""
return render_template("jobs.html")
@app.route("/stats")
@login_required
def stats_page():
"""Zeigt die Statistiken-Seite an"""
return render_template("stats.html", title="Statistiken")
# Statische Seiten
@app.route("/privacy")
def privacy():
"""Datenschutzerklärung"""
return render_template("privacy.html")
@app.route("/terms")
def terms():
"""Nutzungsbedingungen"""
return render_template("terms.html")
@app.route("/imprint")
def imprint():
"""Impressum"""
return render_template("imprint.html")
@app.route("/legal")
def legal():
"""Rechtliche Hinweise - Weiterleitung zum Impressum"""
return redirect(url_for("imprint"))
# ===== FEHLERBEHANDLUNG =====
@app.errorhandler(400)
def bad_request_error(error):
"""400-Fehlerseite - Ungültige Anfrage"""
app_logger.warning(f"Bad Request (400): {request.url} - {str(error)}")
if request.is_json:
return jsonify({
"error": "Ungültige Anfrage",
"message": "Die Anfrage konnte nicht verarbeitet werden",
"status_code": 400
}), 400
return render_template('errors/400.html'), 400
@app.errorhandler(401)
def unauthorized_error(error):
"""401-Fehlerseite - Nicht autorisiert"""
app_logger.warning(f"Unauthorized (401): {request.url} - User: {getattr(current_user, 'username', 'Anonymous')}")
if request.is_json:
return jsonify({
"error": "Nicht autorisiert",
"message": "Anmeldung erforderlich",
"status_code": 401
}), 401
return redirect(url_for('auth.login'))
@app.errorhandler(403)
def forbidden_error(error):
"""403-Fehlerseite - Zugriff verweigert"""
app_logger.warning(f"Forbidden (403): {request.url} - User: {getattr(current_user, 'username', 'Anonymous')}")
if request.is_json:
return jsonify({
"error": "Zugriff verweigert",
"message": "Sie haben keine Berechtigung für diese Aktion",
"status_code": 403
}), 403
return render_template('errors/403.html'), 403
@app.errorhandler(404)
def not_found_error(error):
"""404-Fehlerseite - Seite nicht gefunden"""
app_logger.info(f"Not Found (404): {request.url}")
if request.is_json:
return jsonify({
"error": "Nicht gefunden",
"message": "Die angeforderte Ressource wurde nicht gefunden",
"status_code": 404
}), 404
return render_template('errors/404.html'), 404
@app.errorhandler(405)
def method_not_allowed_error(error):
"""405-Fehlerseite - Methode nicht erlaubt"""
app_logger.warning(f"Method Not Allowed (405): {request.method} {request.url}")
if request.is_json:
return jsonify({
"error": "Methode nicht erlaubt",
"message": f"Die HTTP-Methode {request.method} ist für diese URL nicht erlaubt",
"status_code": 405
}), 405
return render_template('errors/405.html'), 405
@app.errorhandler(413)
def payload_too_large_error(error):
"""413-Fehlerseite - Datei zu groß"""
app_logger.warning(f"Payload Too Large (413): {request.url}")
if request.is_json:
return jsonify({
"error": "Datei zu groß",
"message": "Die hochgeladene Datei ist zu groß",
"status_code": 413
}), 413
return render_template('errors/413.html'), 413
@app.errorhandler(429)
def rate_limit_error(error):
"""429-Fehlerseite - Zu viele Anfragen"""
app_logger.warning(f"Rate Limit Exceeded (429): {request.url} - IP: {request.remote_addr}")
if request.is_json:
return jsonify({
"error": "Zu viele Anfragen",
"message": "Sie haben zu viele Anfragen gesendet. Bitte versuchen Sie es später erneut",
"status_code": 429
}), 429
return render_template('errors/429.html'), 429
@app.errorhandler(500)
def internal_error(error):
"""500-Fehlerseite - Interner Serverfehler"""
import traceback
error_id = datetime.now().strftime("%Y%m%d_%H%M%S")
# Detailliertes Logging für Debugging
app_logger.error(f"Internal Server Error (500) - ID: {error_id}")
app_logger.error(f"URL: {request.url}")
app_logger.error(f"Method: {request.method}")
app_logger.error(f"User: {getattr(current_user, 'username', 'Anonymous')}")
app_logger.error(f"Error: {str(error)}")
app_logger.error(f"Traceback: {traceback.format_exc()}")
if request.is_json:
return jsonify({
"error": "Interner Serverfehler",
"message": "Ein unerwarteter Fehler ist aufgetreten. Bitte versuchen Sie es später erneut",
"error_id": error_id,
"status_code": 500
}), 500
return render_template('errors/500.html', error_id=error_id), 500
@app.errorhandler(502)
def bad_gateway_error(error):
"""502-Fehlerseite - Bad Gateway"""
app_logger.error(f"Bad Gateway (502): {request.url}")
if request.is_json:
return jsonify({
"error": "Gateway-Fehler",
"message": "Der Server ist vorübergehend nicht verfügbar",
"status_code": 502
}), 502
return render_template('errors/502.html'), 502
@app.errorhandler(503)
def service_unavailable_error(error):
"""503-Fehlerseite - Service nicht verfügbar"""
app_logger.error(f"Service Unavailable (503): {request.url}")
if request.is_json:
return jsonify({
"error": "Service nicht verfügbar",
"message": "Der Service ist vorübergehend nicht verfügbar. Bitte versuchen Sie es später erneut",
"status_code": 503
}), 503
return render_template('errors/503.html'), 503
@app.errorhandler(505)
def http_version_not_supported_error(error):
"""505-Fehlerseite - HTTP-Version nicht unterstützt"""
app_logger.error(f"HTTP Version Not Supported (505): {request.url}")
if request.is_json:
return jsonify({
"error": "HTTP-Version nicht unterstützt",
"message": "Die verwendete HTTP-Version wird vom Server nicht unterstützt",
"status_code": 505
}), 505
return render_template('errors/505.html'), 505
# Allgemeiner Exception-Handler für unbehandelte Ausnahmen
@app.errorhandler(Exception)
def handle_exception(error):
"""Allgemeiner Handler für unbehandelte Ausnahmen"""
import traceback
error_id = datetime.now().strftime("%Y%m%d_%H%M%S")
# Detailliertes Logging
app_logger.error(f"Unhandled Exception - ID: {error_id}")
app_logger.error(f"URL: {request.url}")
app_logger.error(f"Method: {request.method}")
app_logger.error(f"User: {getattr(current_user, 'username', 'Anonymous')}")
app_logger.error(f"Exception Type: {type(error).__name__}")
app_logger.error(f"Exception: {str(error)}")
app_logger.error(f"Traceback: {traceback.format_exc()}")
# Für HTTP-Exceptions die ursprüngliche Behandlung verwenden
if hasattr(error, 'code'):
return error
# Für alle anderen Exceptions als 500 behandeln
if request.is_json:
return jsonify({
"error": "Unerwarteter Fehler",
"message": "Ein unerwarteter Fehler ist aufgetreten. Bitte versuchen Sie es später erneut",
"error_id": error_id,
"status_code": 500
}), 500
return render_template('errors/500.html', error_id=error_id), 500
# ===== HAUPTFUNKTION =====
def main():
"""Hauptfunktion zum Starten der Anwendung"""
try:
# Datenbank initialisieren
init_database()
# Initial-Admin erstellen falls nicht vorhanden
create_initial_admin()
# Queue Manager starten
start_queue_manager()
# Job Scheduler starten
scheduler = get_job_scheduler()
if scheduler:
scheduler.start()
# SSL-Kontext
ssl_context = None
try:
from utils.ssl_config import get_ssl_context
ssl_context = get_ssl_context()
except ImportError:
app_logger.warning("SSL-Konfiguration nicht verfügbar")
# Server starten
host = os.getenv('FLASK_HOST', '0.0.0.0')
port = int(os.getenv('FLASK_PORT', 5000))
app_logger.info(f"[START] Server startet auf {host}:{port}")
if ssl_context:
app.run(host=host, port=port, ssl_context=ssl_context, threaded=True)
else:
app.run(host=host, port=port, threaded=True)
except Exception as e:
app_logger.error(f"Fehler beim Starten der Anwendung: {str(e)}")
raise
finally:
# Cleanup
try:
stop_queue_manager()
if scheduler:
scheduler.shutdown()
cleanup_rate_limiter()
except:
pass
if __name__ == "__main__":
main()