Files
Projektarbeit-MYP/backend/app.py

1012 lines
35 KiB
Python

"""
Hauptanwendung für das 3D-Druck-Management-System
Diese Datei initialisiert die Flask-Anwendung und registriert alle Blueprints.
Die eigentlichen Routen sind in den jeweiligen Blueprint-Modulen definiert.
"""
import os
import sys
import logging
import atexit
import signal
from datetime import datetime
from flask import Flask, render_template, request, jsonify, redirect, url_for, session, abort
from flask_login import LoginManager, current_user, logout_user, login_required
from flask_wtf import CSRFProtect
from flask_wtf.csrf import CSRFError
from sqlalchemy import event
from contextlib import contextmanager
import threading
# ===== OPTIMIERTE KONFIGURATION FÜR RASPBERRY PI =====
class OptimizedConfig:
"""Konfiguration für performance-optimierte Bereitstellung auf Raspberry Pi"""
# Performance-Optimierungs-Flags
OPTIMIZED_MODE = True
USE_MINIFIED_ASSETS = True
DISABLE_ANIMATIONS = True
LIMIT_GLASSMORPHISM = True
# Flask-Performance-Einstellungen
DEBUG = False
TESTING = False
SEND_FILE_MAX_AGE_DEFAULT = 31536000 # 1 Jahr Cache für statische Dateien
# Template-Einstellungen
TEMPLATES_AUTO_RELOAD = False
EXPLAIN_TEMPLATE_LOADING = False
# Session-Konfiguration
SESSION_COOKIE_SECURE = True
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Lax'
# Performance-Optimierungen
MAX_CONTENT_LENGTH = 16 * 1024 * 1024 # 16MB max Upload
JSON_SORT_KEYS = False
JSONIFY_PRETTYPRINT_REGULAR = False
def detect_raspberry_pi():
"""Erkennt ob das System auf einem Raspberry Pi läuft"""
try:
with open('/proc/cpuinfo', 'r') as f:
cpuinfo = f.read()
if 'Raspberry Pi' in cpuinfo or 'BCM' in cpuinfo:
return True
except:
pass
try:
import platform
machine = platform.machine().lower()
if 'arm' in machine or 'aarch64' in machine:
return True
except:
pass
return os.getenv('FORCE_OPTIMIZED_MODE', '').lower() in ['true', '1', 'yes']
def should_use_optimized_config():
"""Bestimmt ob die optimierte Konfiguration verwendet werden soll"""
if '--optimized' in sys.argv:
return True
if detect_raspberry_pi():
return True
if os.getenv('USE_OPTIMIZED_CONFIG', '').lower() in ['true', '1', 'yes']:
return True
try:
import psutil
memory_gb = psutil.virtual_memory().total / (1024**3)
if memory_gb < 2.0:
return True
except:
pass
return False
# Windows-spezifische Fixes
if os.name == 'nt':
try:
from utils.windows_fixes import get_windows_thread_manager
print("[OK] Windows-Fixes (sichere Version) geladen")
except ImportError as e:
get_windows_thread_manager = None
print(f"[WARN] Windows-Fixes nicht verfügbar: {str(e)}")
else:
get_windows_thread_manager = None
# Lokale Imports
from models import init_database, create_initial_admin, User, get_db_session
from utils.logging_config import setup_logging, get_logger, log_startup_info
from utils.job_scheduler import JobScheduler, get_job_scheduler
from utils.queue_manager import start_queue_manager, stop_queue_manager
from utils.settings import SECRET_KEY, SESSION_LIFETIME
# ===== OFFLINE-MODUS KONFIGURATION =====
OFFLINE_MODE = True # Produktionseinstellung für Offline-Betrieb
# Blueprints importieren
from blueprints.auth import auth_blueprint
# from blueprints.user import user_blueprint # Konsolidiert in user_management
from blueprints.admin_unified import admin_blueprint, admin_api_blueprint
from blueprints.guest import guest_blueprint
from blueprints.calendar import calendar_blueprint
from blueprints.user_management import users_blueprint # Konsolidierte User-Verwaltung
from blueprints.printers import printers_blueprint
from blueprints.jobs import jobs_blueprint
from blueprints.kiosk import kiosk_blueprint
from blueprints.uploads import uploads_blueprint
from blueprints.sessions import sessions_blueprint
from blueprints.tapo_control import tapo_blueprint # Tapo-Steckdosen-Steuerung
from blueprints.api_simple import api_blueprint # Einfache API-Endpunkte
# Import der Sicherheits- und Hilfssysteme
from utils.rate_limiter import cleanup_rate_limiter
from utils.security import init_security
from utils.permissions import init_permission_helpers
# Logging initialisieren
setup_logging()
log_startup_info()
# Logger für verschiedene Komponenten
app_logger = get_logger("app")
# Thread-sichere Caches
_user_cache = {}
_user_cache_lock = threading.RLock()
_printer_status_cache = {}
_printer_status_cache_lock = threading.RLock()
# Cache-Konfiguration
USER_CACHE_TTL = 300 # 5 Minuten
PRINTER_STATUS_CACHE_TTL = 30 # 30 Sekunden
def clear_user_cache(user_id=None):
"""Löscht User-Cache"""
with _user_cache_lock:
if user_id:
_user_cache.pop(user_id, None)
else:
_user_cache.clear()
def clear_printer_status_cache():
"""Löscht Drucker-Status-Cache"""
with _printer_status_cache_lock:
_printer_status_cache.clear()
# ===== AGGRESSIVE SHUTDOWN HANDLER =====
def aggressive_shutdown_handler(sig, frame):
"""Aggressiver Signal-Handler für sofortiges Herunterfahren bei Strg+C"""
print("\n[ALERT] STRG+C ERKANNT - SOFORTIGES SHUTDOWN!")
try:
# Caches leeren
clear_user_cache()
clear_printer_status_cache()
# Queue Manager stoppen
try:
stop_queue_manager()
print("[OK] Queue Manager gestoppt")
except Exception as e:
print(f"[WARN] Queue Manager Stop fehlgeschlagen: {e}")
# Datenbank-Cleanup
try:
from models import _engine, _scoped_session
if _scoped_session:
_scoped_session.remove()
if _engine:
_engine.dispose()
print("[OK] Datenbank geschlossen")
except Exception as e:
print(f"[WARN] Datenbank-Cleanup fehlgeschlagen: {e}")
except Exception as e:
print(f"[ERROR] Fehler beim Cleanup: {e}")
print("[STOP] SOFORTIGES PROGRAMM-ENDE")
os._exit(0)
def register_aggressive_shutdown():
"""Registriert den aggressiven Shutdown-Handler"""
signal.signal(signal.SIGINT, aggressive_shutdown_handler)
signal.signal(signal.SIGTERM, aggressive_shutdown_handler)
if os.name == 'nt':
try:
signal.signal(signal.SIGBREAK, aggressive_shutdown_handler)
except AttributeError:
pass
else:
try:
signal.signal(signal.SIGHUP, aggressive_shutdown_handler)
except AttributeError:
pass
atexit.register(lambda: print("[RESTART] Atexit-Handler ausgeführt"))
print("[ALERT] AGGRESSIVER STRG+C SHUTDOWN-HANDLER AKTIVIERT")
# Shutdown-Handler registrieren
register_aggressive_shutdown()
# Flask-App initialisieren
app = Flask(__name__)
app.secret_key = SECRET_KEY
# ===== KONFIGURATION ANWENDEN =====
USE_OPTIMIZED_CONFIG = should_use_optimized_config()
if USE_OPTIMIZED_CONFIG:
app_logger.info("[START] Aktiviere optimierte Konfiguration")
app.config.update({
"DEBUG": OptimizedConfig.DEBUG,
"TESTING": OptimizedConfig.TESTING,
"SEND_FILE_MAX_AGE_DEFAULT": OptimizedConfig.SEND_FILE_MAX_AGE_DEFAULT,
"TEMPLATES_AUTO_RELOAD": OptimizedConfig.TEMPLATES_AUTO_RELOAD,
"EXPLAIN_TEMPLATE_LOADING": OptimizedConfig.EXPLAIN_TEMPLATE_LOADING,
"SESSION_COOKIE_SECURE": OptimizedConfig.SESSION_COOKIE_SECURE,
"SESSION_COOKIE_HTTPONLY": OptimizedConfig.SESSION_COOKIE_HTTPONLY,
"SESSION_COOKIE_SAMESITE": OptimizedConfig.SESSION_COOKIE_SAMESITE,
"MAX_CONTENT_LENGTH": OptimizedConfig.MAX_CONTENT_LENGTH,
"JSON_SORT_KEYS": OptimizedConfig.JSON_SORT_KEYS,
"JSONIFY_PRETTYPRINT_REGULAR": OptimizedConfig.JSONIFY_PRETTYPRINT_REGULAR
})
app.jinja_env.globals.update({
'optimized_mode': True,
'use_minified_assets': OptimizedConfig.USE_MINIFIED_ASSETS,
'disable_animations': OptimizedConfig.DISABLE_ANIMATIONS,
'limit_glassmorphism': OptimizedConfig.LIMIT_GLASSMORPHISM,
'base_template': 'base-optimized.html'
})
@app.after_request
def add_optimized_cache_headers(response):
"""Fügt optimierte Cache-Header hinzu"""
if request.endpoint == 'static' or '/static/' in request.path:
response.headers['Cache-Control'] = 'public, max-age=31536000'
response.headers['Vary'] = 'Accept-Encoding'
return response
else:
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.jinja_env.globals.update({
'optimized_mode': False,
'use_minified_assets': False,
'disable_animations': False,
'limit_glassmorphism': False,
'base_template': 'base.html'
})
# Session-Konfiguration
app.config["PERMANENT_SESSION_LIFETIME"] = SESSION_LIFETIME
app.config["WTF_CSRF_ENABLED"] = True
# CSRF-Schutz initialisieren
csrf = CSRFProtect(app)
@app.errorhandler(CSRFError)
def csrf_error(error):
"""Behandelt CSRF-Fehler"""
app_logger.warning(f"CSRF-Fehler: {error.description}")
return jsonify({"error": "CSRF-Token ungültig oder fehlt"}), 400
# Login-Manager initialisieren
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = "auth.login"
login_manager.login_message = "Bitte melden Sie sich an, um auf diese Seite zuzugreifen."
@login_manager.user_loader
def load_user(user_id):
"""Lädt einen Benutzer für Flask-Login"""
try:
with get_db_session() as db_session:
user = db_session.query(User).filter_by(id=int(user_id)).first()
if user:
db_session.expunge(user)
return user
except Exception as e:
app_logger.error(f"Fehler beim Laden des Benutzers {user_id}: {str(e)}")
return None
# ===== BLUEPRINTS REGISTRIEREN =====
app.register_blueprint(auth_blueprint)
# app.register_blueprint(user_blueprint) # Konsolidiert in users_blueprint
# Vereinheitlichte Admin-Blueprints registrieren
app.register_blueprint(admin_blueprint)
app.register_blueprint(admin_api_blueprint)
app.register_blueprint(guest_blueprint)
app.register_blueprint(calendar_blueprint)
app.register_blueprint(users_blueprint) # Konsolidierte User-Verwaltung
app.register_blueprint(printers_blueprint)
app.register_blueprint(jobs_blueprint)
app.register_blueprint(kiosk_blueprint)
app.register_blueprint(uploads_blueprint)
app.register_blueprint(sessions_blueprint)
app.register_blueprint(tapo_blueprint) # Tapo-Steckdosen-Steuerung
app.register_blueprint(api_blueprint) # Einfache API-Endpunkte
# ===== HILFSSYSTEME INITIALISIEREN =====
init_security(app)
init_permission_helpers(app)
# ===== KONTEXT-PROZESSOREN =====
@app.context_processor
def inject_now():
"""Injiziert die aktuelle Zeit in alle Templates"""
return {'now': datetime.now}
@app.template_filter('format_datetime')
def format_datetime_filter(value, format='%d.%m.%Y %H:%M'):
"""Template-Filter für Datums-Formatierung"""
if value is None:
return ""
if isinstance(value, str):
try:
value = datetime.fromisoformat(value)
except:
return value
return value.strftime(format)
@app.template_global()
def is_optimized_mode():
"""Prüft ob der optimierte Modus aktiv ist"""
return USE_OPTIMIZED_CONFIG
# ===== REQUEST HOOKS =====
@app.before_request
def log_request_info():
"""Loggt Request-Informationen"""
if request.endpoint != 'static':
app_logger.debug(f"Request: {request.method} {request.path}")
@app.after_request
def log_response_info(response):
"""Loggt Response-Informationen"""
if request.endpoint != 'static':
app_logger.debug(f"Response: {response.status_code}")
return response
@app.before_request
def check_session_activity():
"""Prüft Session-Aktivität und meldet inaktive Benutzer ab"""
if current_user.is_authenticated:
last_activity = session.get('last_activity')
if last_activity:
try:
last_activity_time = datetime.fromisoformat(last_activity)
if (datetime.now() - last_activity_time).total_seconds() > SESSION_LIFETIME.total_seconds():
app_logger.info(f"Session abgelaufen für Benutzer {current_user.id}")
logout_user()
return redirect(url_for('auth.login'))
except:
pass
# Aktivität aktualisieren
session['last_activity'] = datetime.now().isoformat()
session.permanent = True
# ===== HAUPTROUTEN =====
@app.route("/")
def index():
"""Startseite - leitet zur Login-Seite oder zum Dashboard"""
if current_user.is_authenticated:
return redirect(url_for("dashboard"))
return redirect(url_for("auth.login"))
@app.route("/dashboard")
@login_required
def dashboard():
"""Haupt-Dashboard"""
return render_template("dashboard.html")
@app.route("/admin")
@login_required
def admin():
"""Admin-Dashboard"""
if not current_user.is_admin:
abort(403)
return redirect(url_for("admin.admin_dashboard"))
# ===== HAUPTSEITEN =====
@app.route("/printers")
@login_required
def printers_page():
"""Zeigt die Übersichtsseite für Drucker an."""
return render_template("printers.html")
@app.route("/jobs")
@login_required
def jobs_page():
"""Zeigt die Übersichtsseite für Druckaufträge an."""
return render_template("jobs.html")
@app.route("/jobs/new")
@login_required
def new_job_page():
"""Zeigt die Seite zum Erstellen neuer Druckaufträge an."""
return render_template("jobs.html")
@app.route("/stats")
@login_required
def stats_page():
"""Zeigt die Statistiken-Seite an"""
return render_template("stats.html", title="Statistiken")
# ===== API-ENDPUNKTE FÜR FRONTEND-KOMPATIBILITÄT =====
@app.route("/api/jobs", methods=["GET"])
@login_required
def api_get_jobs():
"""API-Endpunkt für Jobs - leitet an Jobs-Blueprint weiter"""
from blueprints.jobs import get_jobs
return get_jobs()
@app.route("/api/jobs", methods=["POST"])
@login_required
def api_create_job():
"""API-Endpunkt für Job-Erstellung - leitet an Jobs-Blueprint weiter"""
from blueprints.jobs import create_job
return create_job()
@app.route("/api/jobs/<int:job_id>", methods=["GET"])
@login_required
def api_get_job(job_id):
"""API-Endpunkt für einzelnen Job - leitet an Jobs-Blueprint weiter"""
from blueprints.jobs import get_job
return get_job(job_id)
@app.route("/api/jobs/<int:job_id>", methods=["PUT"])
@login_required
def api_update_job(job_id):
"""API-Endpunkt für Job-Update - leitet an Jobs-Blueprint weiter"""
from blueprints.jobs import update_job
return update_job(job_id)
@app.route("/api/jobs/<int:job_id>", methods=["DELETE"])
@login_required
def api_delete_job(job_id):
"""API-Endpunkt für Job-Löschung - leitet an Jobs-Blueprint weiter"""
from blueprints.jobs import delete_job
return delete_job(job_id)
@app.route("/api/jobs/active", methods=["GET"])
@login_required
def api_get_active_jobs():
"""API-Endpunkt für aktive Jobs - leitet an Jobs-Blueprint weiter"""
from blueprints.jobs import get_active_jobs
return get_active_jobs()
@app.route("/api/jobs/current", methods=["GET"])
@login_required
def api_get_current_job():
"""API-Endpunkt für aktuellen Job - leitet an Jobs-Blueprint weiter"""
from blueprints.jobs import get_current_job
return get_current_job()
@app.route("/api/jobs/<int:job_id>/start", methods=["POST"])
@login_required
def api_start_job(job_id):
"""API-Endpunkt für Job-Start - leitet an Jobs-Blueprint weiter"""
from blueprints.jobs import start_job
return start_job(job_id)
@app.route("/api/jobs/<int:job_id>/pause", methods=["POST"])
@login_required
def api_pause_job(job_id):
"""API-Endpunkt für Job-Pause - leitet an Jobs-Blueprint weiter"""
from blueprints.jobs import pause_job
return pause_job(job_id)
@app.route("/api/jobs/<int:job_id>/resume", methods=["POST"])
@login_required
def api_resume_job(job_id):
"""API-Endpunkt für Job-Resume - leitet an Jobs-Blueprint weiter"""
from blueprints.jobs import resume_job
return resume_job(job_id)
@app.route("/api/jobs/<int:job_id>/finish", methods=["POST"])
@login_required
def api_finish_job(job_id):
"""API-Endpunkt für Job-Finish - leitet an Jobs-Blueprint weiter"""
from blueprints.jobs import finish_job
return finish_job(job_id)
@app.route("/api/printers", methods=["GET"])
@login_required
def api_get_printers():
"""API-Endpunkt für Drucker-Liste"""
try:
from models import get_db_session, Printer
db_session = get_db_session()
# Alle Drucker für API-Abfragen anzeigen (unabhängig von active-Status)
printers = db_session.query(Printer).all()
printer_list = []
for printer in printers:
printer_dict = {
"id": printer.id,
"name": printer.name,
"model": printer.model,
"location": printer.location,
"status": printer.status,
"ip_address": printer.ip_address,
"plug_ip": printer.plug_ip,
"active": printer.active,
"last_checked": printer.last_checked.isoformat() if printer.last_checked else None
}
printer_list.append(printer_dict)
db_session.close()
app_logger.info(f"✅ API: {len(printer_list)} Drucker abgerufen")
return jsonify({"printers": printer_list})
except Exception as e:
app_logger.error(f"❌ API-Fehler beim Abrufen der Drucker: {str(e)}")
return jsonify({"error": "Fehler beim Laden der Drucker", "details": str(e)}), 500
@app.route("/api/printers/status", methods=["GET"])
@login_required
def api_get_printer_status():
"""API-Endpunkt für Drucker-Status"""
try:
from models import get_db_session, Printer
from utils.tapo_controller import tapo_controller
db_session = get_db_session()
# Alle Drucker für Status-Abfragen anzeigen (unabhängig von active-Status)
printers = db_session.query(Printer).all()
status_list = []
for printer in printers:
# Tapo-Steckdosen-Status prüfen
if printer.plug_ip:
try:
reachable, plug_status = tapo_controller.check_outlet_status(
printer.plug_ip,
printer_id=printer.id
)
status_dict = {
"id": printer.id,
"name": printer.name,
"status": printer.status,
"plug_status": plug_status,
"plug_reachable": reachable,
"plug_ip": printer.plug_ip,
"location": printer.location
}
except Exception as e:
app_logger.warning(f"⚠️ Fehler bei Steckdosen-Status für {printer.name}: {str(e)}")
status_dict = {
"id": printer.id,
"name": printer.name,
"status": "error",
"plug_status": "unknown",
"plug_reachable": False,
"plug_ip": printer.plug_ip,
"location": printer.location,
"error": str(e)
}
else:
status_dict = {
"id": printer.id,
"name": printer.name,
"status": printer.status,
"plug_status": "no_plug",
"plug_reachable": False,
"plug_ip": None,
"location": printer.location
}
status_list.append(status_dict)
db_session.close()
app_logger.info(f"✅ API: Status für {len(status_list)} Drucker abgerufen")
return jsonify({"printers": status_list})
except Exception as e:
app_logger.error(f"❌ API-Fehler beim Abrufen des Drucker-Status: {str(e)}")
return jsonify({"error": "Fehler beim Laden des Drucker-Status", "details": str(e)}), 500
# ===== SESSION-API-ENDPUNKTE =====
@app.route("/api/session/status", methods=["GET"])
@login_required
def api_session_status():
"""API-Endpunkt für Session-Status"""
try:
last_activity = session.get('last_activity')
if last_activity:
last_activity_time = datetime.fromisoformat(last_activity)
time_since_activity = (datetime.now() - last_activity_time).total_seconds()
time_left_seconds = max(0, SESSION_LIFETIME.total_seconds() - time_since_activity)
else:
time_left_seconds = SESSION_LIFETIME.total_seconds()
return jsonify({
"success": True,
"user": {
"id": current_user.id,
"email": current_user.email,
"name": current_user.name,
"is_admin": current_user.is_admin
},
"session": {
"time_left_seconds": int(time_left_seconds),
"max_inactive_minutes": int(SESSION_LIFETIME.total_seconds() / 60),
"last_activity": last_activity or datetime.now().isoformat()
}
})
except Exception as e:
app_logger.error(f"❌ Session-Status-Fehler: {str(e)}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route("/api/session/heartbeat", methods=["POST"])
@login_required
def api_session_heartbeat():
"""API-Endpunkt für Session-Heartbeat"""
try:
# Session-Aktivität aktualisieren
session['last_activity'] = datetime.now().isoformat()
session.permanent = True
# Verbleibende Zeit berechnen
time_left_seconds = SESSION_LIFETIME.total_seconds()
return jsonify({
"success": True,
"time_left_seconds": int(time_left_seconds),
"timestamp": datetime.now().isoformat()
})
except Exception as e:
app_logger.error(f"❌ Session-Heartbeat-Fehler: {str(e)}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route("/api/session/extend", methods=["POST"])
@login_required
def api_session_extend():
"""API-Endpunkt für Session-Verlängerung"""
try:
data = request.get_json() or {}
extend_minutes = data.get('extend_minutes', 30)
# Session verlängern
session['last_activity'] = datetime.now().isoformat()
session.permanent = True
return jsonify({
"success": True,
"extended_minutes": extend_minutes,
"new_expiry": (datetime.now() + SESSION_LIFETIME).isoformat()
})
except Exception as e:
app_logger.error(f"❌ Session-Extend-Fehler: {str(e)}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route("/api/jobs/recent", methods=["GET"])
@login_required
def api_get_recent_jobs():
"""API-Endpunkt für kürzlich erstellte Jobs"""
try:
from models import get_db_session, Job
db_session = get_db_session()
# Letzte 10 Jobs des Benutzers (oder alle für Admin)
query = db_session.query(Job).order_by(Job.created_at.desc())
if not current_user.is_admin:
query = query.filter(Job.user_id == current_user.id)
recent_jobs = query.limit(10).all()
job_list = []
for job in recent_jobs:
job_dict = {
"id": job.id,
"name": job.name,
"status": job.status,
"created_at": job.created_at.isoformat() if job.created_at else None,
"start_at": job.start_at.isoformat() if job.start_at else None,
"duration_minutes": job.duration_minutes,
"printer_name": job.printer.name if job.printer else "Unbekannt"
}
job_list.append(job_dict)
db_session.close()
app_logger.info(f"✅ API: {len(job_list)} kürzliche Jobs abgerufen")
return jsonify({"jobs": job_list})
except Exception as e:
app_logger.error(f"❌ API-Fehler beim Abrufen kürzlicher Jobs: {str(e)}")
return jsonify({"error": "Fehler beim Laden kürzlicher Jobs", "details": str(e)}), 500
@app.route("/api/stats", methods=["GET"])
@login_required
def api_get_stats():
"""API-Endpunkt für System-Statistiken"""
try:
from models import get_db_session, Job, Printer
db_session = get_db_session()
# Grundlegende Statistiken
total_jobs = db_session.query(Job).count()
active_jobs = db_session.query(Job).filter(Job.status.in_(["scheduled", "running"])).count()
completed_jobs = db_session.query(Job).filter(Job.status == "finished").count()
total_printers = db_session.query(Printer).filter(Printer.active == True).count()
# Benutzer-spezifische Statistiken
if not current_user.is_admin:
user_jobs = db_session.query(Job).filter(Job.user_id == current_user.id).count()
user_active_jobs = db_session.query(Job).filter(
Job.user_id == current_user.id,
Job.status.in_(["scheduled", "running"])
).count()
else:
user_jobs = total_jobs
user_active_jobs = active_jobs
db_session.close()
stats = {
"total_jobs": total_jobs,
"active_jobs": active_jobs,
"completed_jobs": completed_jobs,
"total_printers": total_printers,
"user_jobs": user_jobs,
"user_active_jobs": user_active_jobs,
"timestamp": datetime.now().isoformat()
}
app_logger.info(f"✅ API: Statistiken abgerufen")
return jsonify(stats)
except Exception as e:
app_logger.error(f"❌ API-Fehler beim Abrufen der Statistiken: {str(e)}")
return jsonify({"error": "Fehler beim Laden der Statistiken", "details": str(e)}), 500
# Statische Seiten
@app.route("/privacy")
def privacy():
"""Datenschutzerklärung"""
return render_template("privacy.html")
@app.route("/terms")
def terms():
"""Nutzungsbedingungen"""
return render_template("terms.html")
@app.route("/imprint")
def imprint():
"""Impressum"""
return render_template("imprint.html")
@app.route("/legal")
def legal():
"""Rechtliche Hinweise - Weiterleitung zum Impressum"""
return redirect(url_for("imprint"))
# ===== FEHLERBEHANDLUNG =====
@app.errorhandler(400)
def bad_request_error(error):
"""400-Fehlerseite - Ungültige Anfrage"""
app_logger.warning(f"Bad Request (400): {request.url} - {str(error)}")
if request.is_json:
return jsonify({
"error": "Ungültige Anfrage",
"message": "Die Anfrage konnte nicht verarbeitet werden",
"status_code": 400
}), 400
return render_template('errors/400.html'), 400
@app.errorhandler(401)
def unauthorized_error(error):
"""401-Fehlerseite - Nicht autorisiert"""
app_logger.warning(f"Unauthorized (401): {request.url} - User: {getattr(current_user, 'username', 'Anonymous')}")
if request.is_json:
return jsonify({
"error": "Nicht autorisiert",
"message": "Anmeldung erforderlich",
"status_code": 401
}), 401
return redirect(url_for('auth.login'))
@app.errorhandler(403)
def forbidden_error(error):
"""403-Fehlerseite - Zugriff verweigert"""
app_logger.warning(f"Forbidden (403): {request.url} - User: {getattr(current_user, 'username', 'Anonymous')}")
if request.is_json:
return jsonify({
"error": "Zugriff verweigert",
"message": "Sie haben keine Berechtigung für diese Aktion",
"status_code": 403
}), 403
return render_template('errors/403.html'), 403
@app.errorhandler(404)
def not_found_error(error):
"""404-Fehlerseite - Seite nicht gefunden"""
app_logger.info(f"Not Found (404): {request.url}")
if request.is_json:
return jsonify({
"error": "Nicht gefunden",
"message": "Die angeforderte Ressource wurde nicht gefunden",
"status_code": 404
}), 404
return render_template('errors/404.html'), 404
@app.errorhandler(405)
def method_not_allowed_error(error):
"""405-Fehlerseite - Methode nicht erlaubt"""
app_logger.warning(f"Method Not Allowed (405): {request.method} {request.url}")
if request.is_json:
return jsonify({
"error": "Methode nicht erlaubt",
"message": f"Die HTTP-Methode {request.method} ist für diese URL nicht erlaubt",
"status_code": 405
}), 405
return render_template('errors/405.html'), 405
@app.errorhandler(413)
def payload_too_large_error(error):
"""413-Fehlerseite - Datei zu groß"""
app_logger.warning(f"Payload Too Large (413): {request.url}")
if request.is_json:
return jsonify({
"error": "Datei zu groß",
"message": "Die hochgeladene Datei ist zu groß",
"status_code": 413
}), 413
return render_template('errors/413.html'), 413
@app.errorhandler(429)
def rate_limit_error(error):
"""429-Fehlerseite - Zu viele Anfragen"""
app_logger.warning(f"Rate Limit Exceeded (429): {request.url} - IP: {request.remote_addr}")
if request.is_json:
return jsonify({
"error": "Zu viele Anfragen",
"message": "Sie haben zu viele Anfragen gesendet. Bitte versuchen Sie es später erneut",
"status_code": 429
}), 429
return render_template('errors/429.html'), 429
@app.errorhandler(500)
def internal_error(error):
"""500-Fehlerseite - Interner Serverfehler"""
import traceback
error_id = datetime.now().strftime("%Y%m%d_%H%M%S")
# Detailliertes Logging für Debugging
app_logger.error(f"Internal Server Error (500) - ID: {error_id}")
app_logger.error(f"URL: {request.url}")
app_logger.error(f"Method: {request.method}")
app_logger.error(f"User: {getattr(current_user, 'username', 'Anonymous')}")
app_logger.error(f"Error: {str(error)}")
app_logger.error(f"Traceback: {traceback.format_exc()}")
if request.is_json:
return jsonify({
"error": "Interner Serverfehler",
"message": "Ein unerwarteter Fehler ist aufgetreten. Bitte versuchen Sie es später erneut",
"error_id": error_id,
"status_code": 500
}), 500
return render_template('errors/500.html', error_id=error_id), 500
@app.errorhandler(502)
def bad_gateway_error(error):
"""502-Fehlerseite - Bad Gateway"""
app_logger.error(f"Bad Gateway (502): {request.url}")
if request.is_json:
return jsonify({
"error": "Gateway-Fehler",
"message": "Der Server ist vorübergehend nicht verfügbar",
"status_code": 502
}), 502
return render_template('errors/502.html'), 502
@app.errorhandler(503)
def service_unavailable_error(error):
"""503-Fehlerseite - Service nicht verfügbar"""
app_logger.error(f"Service Unavailable (503): {request.url}")
if request.is_json:
return jsonify({
"error": "Service nicht verfügbar",
"message": "Der Service ist vorübergehend nicht verfügbar. Bitte versuchen Sie es später erneut",
"status_code": 503
}), 503
return render_template('errors/503.html'), 503
@app.errorhandler(505)
def http_version_not_supported_error(error):
"""505-Fehlerseite - HTTP-Version nicht unterstützt"""
app_logger.error(f"HTTP Version Not Supported (505): {request.url}")
if request.is_json:
return jsonify({
"error": "HTTP-Version nicht unterstützt",
"message": "Die verwendete HTTP-Version wird vom Server nicht unterstützt",
"status_code": 505
}), 505
return render_template('errors/505.html'), 505
# Allgemeiner Exception-Handler für unbehandelte Ausnahmen
@app.errorhandler(Exception)
def handle_exception(error):
"""Allgemeiner Handler für unbehandelte Ausnahmen"""
import traceback
error_id = datetime.now().strftime("%Y%m%d_%H%M%S")
# Detailliertes Logging
app_logger.error(f"Unhandled Exception - ID: {error_id}")
app_logger.error(f"URL: {request.url}")
app_logger.error(f"Method: {request.method}")
app_logger.error(f"User: {getattr(current_user, 'username', 'Anonymous')}")
app_logger.error(f"Exception Type: {type(error).__name__}")
app_logger.error(f"Exception: {str(error)}")
app_logger.error(f"Traceback: {traceback.format_exc()}")
# Für HTTP-Exceptions die ursprüngliche Behandlung verwenden
if hasattr(error, 'code'):
return error
# Für alle anderen Exceptions als 500 behandeln
if request.is_json:
return jsonify({
"error": "Unerwarteter Fehler",
"message": "Ein unerwarteter Fehler ist aufgetreten. Bitte versuchen Sie es später erneut",
"error_id": error_id,
"status_code": 500
}), 500
return render_template('errors/500.html', error_id=error_id), 500
# ===== HAUPTFUNKTION =====
def main():
"""Hauptfunktion zum Starten der Anwendung"""
try:
# Datenbank initialisieren
init_database()
# Initial-Admin erstellen falls nicht vorhanden
create_initial_admin()
# Queue Manager starten
start_queue_manager()
# Job Scheduler starten
scheduler = get_job_scheduler()
if scheduler:
scheduler.start()
# SSL-Kontext
ssl_context = None
try:
from utils.ssl_config import get_ssl_context
ssl_context = get_ssl_context()
except ImportError:
app_logger.warning("SSL-Konfiguration nicht verfügbar")
# Server starten
host = os.getenv('FLASK_HOST', '0.0.0.0')
port = int(os.getenv('FLASK_PORT', 5000))
app_logger.info(f"[START] Server startet auf {host}:{port}")
if ssl_context:
app.run(host=host, port=port, ssl_context=ssl_context, threaded=True)
else:
app.run(host=host, port=port, threaded=True)
except Exception as e:
app_logger.error(f"Fehler beim Starten der Anwendung: {str(e)}")
raise
finally:
# Cleanup
try:
stop_queue_manager()
if scheduler:
scheduler.shutdown()
cleanup_rate_limiter()
except:
pass
if __name__ == "__main__":
main()