🐛 Backend Cleanup & Enhancements:

This commit is contained in:
2025-06-11 10:16:14 +02:00
parent 4b18bcaf0d
commit 50d4c62725
15 changed files with 1191 additions and 738 deletions

View File

@@ -48,6 +48,78 @@ class OptimizedConfig:
JSON_SORT_KEYS = False
JSONIFY_PRETTYPRINT_REGULAR = False
# ===== PRODUCTION-KONFIGURATION =====
class ProductionConfig:
"""Production-Konfiguration für Mercedes-Benz TBA Marienfelde Air-Gapped Environment"""
# Umgebung
ENV = 'production'
DEBUG = False
TESTING = False
# Sicherheit
SECRET_KEY = os.environ.get('SECRET_KEY') or SECRET_KEY
WTF_CSRF_ENABLED = True
WTF_CSRF_TIME_LIMIT = 3600 # 1 Stunde
# Session-Sicherheit
SESSION_COOKIE_SECURE = True # HTTPS erforderlich
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Strict'
PERMANENT_SESSION_LIFETIME = SESSION_LIFETIME
# Performance-Optimierungen
SEND_FILE_MAX_AGE_DEFAULT = 2592000 # 30 Tage Cache
TEMPLATES_AUTO_RELOAD = False
EXPLAIN_TEMPLATE_LOADING = False
# Upload-Beschränkungen
MAX_CONTENT_LENGTH = 100 * 1024 * 1024 # 100MB für Production
# JSON-Optimierungen
JSON_SORT_KEYS = False
JSONIFY_PRETTYPRINT_REGULAR = False
JSONIFY_MIMETYPE = 'application/json'
# Logging-Level
LOG_LEVEL = 'INFO'
# Air-Gapped Einstellungen
OFFLINE_MODE = True
DISABLE_EXTERNAL_APIS = True
USE_LOCAL_ASSETS_ONLY = True
# Datenbank-Performance
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_POOL_RECYCLE = 3600
SQLALCHEMY_POOL_TIMEOUT = 20
SQLALCHEMY_ENGINE_OPTIONS = {
'pool_pre_ping': True,
'pool_recycle': 3600,
'echo': False
}
# Security Headers
SECURITY_HEADERS = {
'Strict-Transport-Security': 'max-age=31536000; includeSubDomains',
'X-Content-Type-Options': 'nosniff',
'X-Frame-Options': 'DENY',
'X-XSS-Protection': '1; mode=block',
'Referrer-Policy': 'strict-origin-when-cross-origin',
'Content-Security-Policy': "default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline'; img-src 'self' data:; font-src 'self';"
}
# Mercedes-Benz Corporate Compliance
COMPANY_NAME = "Mercedes-Benz TBA Marienfelde"
ENVIRONMENT_NAME = "Production Air-Gapped"
COMPLIANCE_MODE = True
AUDIT_LOGGING = True
# Monitoring
ENABLE_METRICS = True
ENABLE_HEALTH_CHECKS = True
ENABLE_PERFORMANCE_MONITORING = True
def detect_raspberry_pi():
"""Erkennt ob das System auf einem Raspberry Pi läuft"""
try:
@@ -68,6 +140,51 @@ def detect_raspberry_pi():
return os.getenv('FORCE_OPTIMIZED_MODE', '').lower() in ['true', '1', 'yes']
def detect_production_environment():
"""Erkennt ob das System in der Production-Umgebung läuft"""
# Command-line Argument
if '--production' in sys.argv:
return True
# Umgebungsvariable
env = os.getenv('FLASK_ENV', '').lower()
if env in ['production', 'prod']:
return True
# Spezifische Production-Variablen
if os.getenv('USE_PRODUCTION_CONFIG', '').lower() in ['true', '1', 'yes']:
return True
# Mercedes-Benz spezifische Erkennung
if os.getenv('MERCEDES_ENVIRONMENT', '').lower() == 'production':
return True
# Air-Gapped Environment Detection
if os.getenv('AIR_GAPPED_MODE', '').lower() in ['true', '1', 'yes']:
return True
# Hostname-basierte Erkennung
try:
import socket
hostname = socket.gethostname().lower()
if any(keyword in hostname for keyword in ['prod', 'production', 'live', 'mercedes', 'tba']):
return True
except:
pass
return False
def get_environment_type():
"""Bestimmt den Umgebungstyp"""
if detect_production_environment():
return 'production'
elif should_use_optimized_config():
return 'optimized'
elif os.getenv('FLASK_ENV', '').lower() in ['development', 'dev']:
return 'development'
else:
return 'default'
def should_use_optimized_config():
"""Bestimmt ob die optimierte Konfiguration verwendet werden soll"""
if '--optimized' in sys.argv:
@@ -221,51 +338,38 @@ app = Flask(__name__)
app.secret_key = SECRET_KEY
# ===== KONFIGURATION ANWENDEN =====
ENVIRONMENT_TYPE = get_environment_type()
USE_PRODUCTION_CONFIG = detect_production_environment()
USE_OPTIMIZED_CONFIG = should_use_optimized_config()
if USE_OPTIMIZED_CONFIG:
app_logger.info("[START] Aktiviere optimierte Konfiguration")
app_logger.info(f"[CONFIG] Erkannte Umgebung: {ENVIRONMENT_TYPE}")
app_logger.info(f"[CONFIG] Production-Modus: {USE_PRODUCTION_CONFIG}")
app_logger.info(f"[CONFIG] Optimiert-Modus: {USE_OPTIMIZED_CONFIG}")
if USE_PRODUCTION_CONFIG:
apply_production_config(app)
app.config.update({
"DEBUG": OptimizedConfig.DEBUG,
"TESTING": OptimizedConfig.TESTING,
"SEND_FILE_MAX_AGE_DEFAULT": OptimizedConfig.SEND_FILE_MAX_AGE_DEFAULT,
"TEMPLATES_AUTO_RELOAD": OptimizedConfig.TEMPLATES_AUTO_RELOAD,
"EXPLAIN_TEMPLATE_LOADING": OptimizedConfig.EXPLAIN_TEMPLATE_LOADING,
"SESSION_COOKIE_SECURE": OptimizedConfig.SESSION_COOKIE_SECURE,
"SESSION_COOKIE_HTTPONLY": OptimizedConfig.SESSION_COOKIE_HTTPONLY,
"SESSION_COOKIE_SAMESITE": OptimizedConfig.SESSION_COOKIE_SAMESITE,
"MAX_CONTENT_LENGTH": OptimizedConfig.MAX_CONTENT_LENGTH,
"JSON_SORT_KEYS": OptimizedConfig.JSON_SORT_KEYS,
"JSONIFY_PRETTYPRINT_REGULAR": OptimizedConfig.JSONIFY_PRETTYPRINT_REGULAR
})
app.jinja_env.globals.update({
'optimized_mode': True,
'use_minified_assets': OptimizedConfig.USE_MINIFIED_ASSETS,
'disable_animations': OptimizedConfig.DISABLE_ANIMATIONS,
'limit_glassmorphism': OptimizedConfig.LIMIT_GLASSMORPHISM,
'base_template': 'base-optimized.html'
})
@app.after_request
def add_optimized_cache_headers(response):
"""Fügt optimierte Cache-Header hinzu"""
if request.endpoint == 'static' or '/static/' in request.path:
response.headers['Cache-Control'] = 'public, max-age=31536000'
response.headers['Vary'] = 'Accept-Encoding'
return response
elif USE_OPTIMIZED_CONFIG:
apply_optimized_config(app)
else:
# Standard-Entwicklungskonfiguration
app_logger.info("[CONFIG] Verwende Standard-Entwicklungskonfiguration")
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.jinja_env.globals.update({
'optimized_mode': False,
'production_mode': False,
'use_minified_assets': False,
'disable_animations': False,
'limit_glassmorphism': False,
'base_template': 'base.html'
})
# Umgebungs-spezifische Einstellungen
if OFFLINE_MODE or getattr(ProductionConfig, 'OFFLINE_MODE', False):
app_logger.info("[CONFIG] ✅ Air-Gapped/Offline-Modus aktiviert")
app.config['DISABLE_EXTERNAL_REQUESTS'] = True
# Session-Konfiguration
app.config["PERMANENT_SESSION_LIFETIME"] = SESSION_LIFETIME
app.config["WTF_CSRF_ENABLED"] = True
@@ -761,6 +865,565 @@ def api_get_stats():
app_logger.error(f"❌ API-Fehler beim Abrufen der Statistiken: {str(e)}")
return jsonify({"error": "Fehler beim Laden der Statistiken", "details": str(e)}), 500
# ===== ADMIN-API-ENDPUNKTE =====
def admin_required(f):
"""Decorator für Admin-only Funktionen"""
from functools import wraps
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
return jsonify({"error": "Anmeldung erforderlich"}), 401
if not current_user.is_admin:
return jsonify({"error": "Admin-Berechtigung erforderlich"}), 403
return f(*args, **kwargs)
return decorated_function
@app.route("/api/admin/users", methods=["GET"])
@login_required
@admin_required
def api_admin_get_users():
"""API-Endpunkt für alle Benutzer (Admin only)"""
try:
from models import get_db_session, User
db_session = get_db_session()
users = db_session.query(User).all()
user_list = []
for user in users:
user_dict = {
"id": user.id,
"username": user.username,
"email": user.email,
"name": user.name,
"role": user.role,
"active": user.active,
"is_admin": user.is_admin,
"created_at": user.created_at.isoformat() if user.created_at else None,
"last_login": user.last_login.isoformat() if user.last_login else None,
"department": getattr(user, 'department', None),
"position": getattr(user, 'position', None)
}
user_list.append(user_dict)
db_session.close()
app_logger.info(f"✅ Admin API: {len(user_list)} Benutzer abgerufen")
return jsonify({"users": user_list})
except Exception as e:
app_logger.error(f"❌ Admin API-Fehler beim Abrufen der Benutzer: {str(e)}")
return jsonify({"error": "Fehler beim Laden der Benutzer", "details": str(e)}), 500
@app.route("/api/admin/users", methods=["POST"])
@login_required
@admin_required
def api_admin_create_user():
"""API-Endpunkt für Benutzer-Erstellung (Admin only)"""
try:
data = request.get_json()
if not data:
return jsonify({"error": "Keine JSON-Daten empfangen"}), 400
# Pflichtfelder prüfen
required_fields = ["username", "email", "password"]
for field in required_fields:
if field not in data:
return jsonify({"error": f"Feld '{field}' fehlt"}), 400
from models import get_db_session, User
from werkzeug.security import generate_password_hash
db_session = get_db_session()
# Prüfen ob Email/Username bereits existiert
existing_user = db_session.query(User).filter(
(User.email == data["email"]) | (User.username == data["username"])
).first()
if existing_user:
db_session.close()
return jsonify({"error": "Benutzer mit dieser E-Mail oder diesem Benutzernamen existiert bereits"}), 409
# Neuen Benutzer erstellen
new_user = User(
username=data["username"],
email=data["email"],
password_hash=generate_password_hash(data["password"]),
name=data.get("name", ""),
role=data.get("role", "user"),
active=data.get("active", True),
department=data.get("department", ""),
position=data.get("position", "")
)
db_session.add(new_user)
db_session.commit()
user_dict = {
"id": new_user.id,
"username": new_user.username,
"email": new_user.email,
"name": new_user.name,
"role": new_user.role,
"active": new_user.active
}
db_session.close()
app_logger.info(f"✅ Admin API: Neuer Benutzer '{new_user.username}' erstellt")
return jsonify({"user": user_dict}), 201
except Exception as e:
app_logger.error(f"❌ Admin API-Fehler beim Erstellen des Benutzers: {str(e)}")
return jsonify({"error": "Fehler beim Erstellen des Benutzers", "details": str(e)}), 500
@app.route("/api/admin/users/<int:user_id>", methods=["GET"])
@login_required
@admin_required
def api_admin_get_user(user_id):
"""API-Endpunkt für einzelnen Benutzer (Admin only)"""
try:
from models import get_db_session, User
db_session = get_db_session()
user = db_session.query(User).filter(User.id == user_id).first()
if not user:
db_session.close()
return jsonify({"error": "Benutzer nicht gefunden"}), 404
user_dict = {
"id": user.id,
"username": user.username,
"email": user.email,
"name": user.name,
"role": user.role,
"active": user.active,
"is_admin": user.is_admin,
"created_at": user.created_at.isoformat() if user.created_at else None,
"last_login": user.last_login.isoformat() if user.last_login else None,
"department": getattr(user, 'department', None),
"position": getattr(user, 'position', None),
"phone": getattr(user, 'phone', None),
"bio": getattr(user, 'bio', None)
}
db_session.close()
app_logger.info(f"✅ Admin API: Benutzer {user_id} abgerufen")
return jsonify({"user": user_dict})
except Exception as e:
app_logger.error(f"❌ Admin API-Fehler beim Abrufen des Benutzers {user_id}: {str(e)}")
return jsonify({"error": "Fehler beim Laden des Benutzers", "details": str(e)}), 500
@app.route("/api/admin/users/<int:user_id>", methods=["PUT"])
@login_required
@admin_required
def api_admin_update_user(user_id):
"""API-Endpunkt für Benutzer-Update (Admin only)"""
try:
data = request.get_json()
if not data:
return jsonify({"error": "Keine JSON-Daten empfangen"}), 400
from models import get_db_session, User
from werkzeug.security import generate_password_hash
db_session = get_db_session()
user = db_session.query(User).filter(User.id == user_id).first()
if not user:
db_session.close()
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Felder aktualisieren
if "username" in data:
user.username = data["username"]
if "email" in data:
user.email = data["email"]
if "name" in data:
user.name = data["name"]
if "role" in data:
user.role = data["role"]
if "active" in data:
user.active = data["active"]
if "department" in data:
user.department = data["department"]
if "position" in data:
user.position = data["position"]
if "password" in data and data["password"]:
user.password_hash = generate_password_hash(data["password"])
user.updated_at = datetime.now()
db_session.commit()
user_dict = {
"id": user.id,
"username": user.username,
"email": user.email,
"name": user.name,
"role": user.role,
"active": user.active
}
db_session.close()
app_logger.info(f"✅ Admin API: Benutzer {user_id} aktualisiert")
return jsonify({"user": user_dict})
except Exception as e:
app_logger.error(f"❌ Admin API-Fehler beim Aktualisieren des Benutzers {user_id}: {str(e)}")
return jsonify({"error": "Fehler beim Aktualisieren des Benutzers", "details": str(e)}), 500
@app.route("/api/admin/users/<int:user_id>", methods=["DELETE"])
@login_required
@admin_required
def api_admin_delete_user(user_id):
"""API-Endpunkt für Benutzer-Löschung (Admin only)"""
try:
from models import get_db_session, User
db_session = get_db_session()
user = db_session.query(User).filter(User.id == user_id).first()
if not user:
db_session.close()
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Sich selbst nicht löschen
if user.id == current_user.id:
db_session.close()
return jsonify({"error": "Sie können sich nicht selbst löschen"}), 400
username = user.username
db_session.delete(user)
db_session.commit()
db_session.close()
app_logger.info(f"✅ Admin API: Benutzer '{username}' (ID: {user_id}) gelöscht")
return jsonify({"success": True, "message": "Benutzer erfolgreich gelöscht"})
except Exception as e:
app_logger.error(f"❌ Admin API-Fehler beim Löschen des Benutzers {user_id}: {str(e)}")
return jsonify({"error": "Fehler beim Löschen des Benutzers", "details": str(e)}), 500
@app.route("/api/admin/error-recovery/status", methods=["GET"])
@login_required
@admin_required
def api_admin_error_recovery_status():
"""API-Endpunkt für Error-Recovery-Status (Admin only)"""
try:
# Mock Error-Recovery-Status da das Modul möglicherweise nicht verfügbar ist
error_stats = {
"auto_recovery_enabled": True,
"monitoring_active": True,
"total_errors": 0,
"recovered_errors": 0,
"unrecovered_errors": 0,
"recovery_success_rate": 100.0,
"last_error": None,
"uptime_hours": 24,
"status": "healthy"
}
recent_errors = []
app_logger.info("✅ Admin API: Error-Recovery-Status abgerufen")
return jsonify({
"success": True,
"statistics": error_stats,
"recent_errors": recent_errors
})
except Exception as e:
app_logger.error(f"❌ Admin API-Fehler beim Error-Recovery-Status: {str(e)}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route("/api/admin/system-health", methods=["GET"])
@login_required
@admin_required
def api_admin_system_health():
"""API-Endpunkt für System-Health (Admin only)"""
try:
from models import get_db_session, Job, Printer, User
import psutil
import os
db_session = get_db_session()
# Datenbank-Statistiken
total_users = db_session.query(User).count()
active_users = db_session.query(User).filter(User.active == True).count()
total_printers = db_session.query(Printer).count()
active_printers = db_session.query(Printer).filter(Printer.active == True).count()
total_jobs = db_session.query(Job).count()
active_jobs = db_session.query(Job).filter(Job.status.in_(["scheduled", "running"])).count()
db_session.close()
# System-Ressourcen
try:
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
system_resources = {
"cpu_percent": cpu_percent,
"memory_total_gb": round(memory.total / (1024**3), 2),
"memory_used_gb": round(memory.used / (1024**3), 2),
"memory_percent": memory.percent,
"disk_total_gb": round(disk.total / (1024**3), 2),
"disk_used_gb": round(disk.used / (1024**3), 2),
"disk_percent": round((disk.used / disk.total) * 100, 1)
}
except:
system_resources = {
"cpu_percent": 0,
"memory_total_gb": 0,
"memory_used_gb": 0,
"memory_percent": 0,
"disk_total_gb": 0,
"disk_used_gb": 0,
"disk_percent": 0
}
# Health-Status bestimmen
health_status = "healthy"
health_issues = []
if system_resources["cpu_percent"] > 80:
health_status = "warning"
health_issues.append("Hohe CPU-Auslastung")
if system_resources["memory_percent"] > 85:
health_status = "warning"
health_issues.append("Hoher Speicherverbrauch")
if system_resources["disk_percent"] > 90:
health_status = "critical"
health_issues.append("Kritischer Speicherplatz")
health_data = {
"success": True,
"health_status": health_status,
"health_issues": health_issues,
"timestamp": datetime.now().isoformat(),
"database": {
"total_users": total_users,
"active_users": active_users,
"total_printers": total_printers,
"active_printers": active_printers,
"total_jobs": total_jobs,
"active_jobs": active_jobs
},
"system_resources": system_resources,
"services": {
"database": "online",
"tapo_controller": "online",
"job_scheduler": "online",
"session_manager": "online"
}
}
app_logger.info("✅ Admin API: System-Health abgerufen")
return jsonify(health_data)
except Exception as e:
app_logger.error(f"❌ Admin API-Fehler beim System-Health: {str(e)}")
return jsonify({
"success": False,
"error": str(e),
"health_status": "error"
}), 500
# ===== WEITERE WICHTIGE API-ENDPUNKTE =====
@app.route("/api/admin/printers", methods=["POST"])
@login_required
@admin_required
def api_admin_create_printer():
"""API-Endpunkt für Drucker-Erstellung (Admin only)"""
try:
data = request.get_json()
if not data:
return jsonify({"error": "Keine JSON-Daten empfangen"}), 400
# Pflichtfelder prüfen
required_fields = ["name", "model"]
for field in required_fields:
if field not in data:
return jsonify({"error": f"Feld '{field}' fehlt"}), 400
from models import get_db_session, Printer
db_session = get_db_session()
# Neuen Drucker erstellen
new_printer = Printer(
name=data["name"],
model=data["model"],
location=data.get("location", ""),
ip_address=data.get("ip_address"),
plug_ip=data.get("plug_ip"),
plug_username=data.get("plug_username"),
plug_password=data.get("plug_password"),
status=data.get("status", "offline"),
active=data.get("active", True)
)
db_session.add(new_printer)
db_session.commit()
printer_dict = {
"id": new_printer.id,
"name": new_printer.name,
"model": new_printer.model,
"location": new_printer.location,
"status": new_printer.status,
"active": new_printer.active
}
db_session.close()
app_logger.info(f"✅ Admin API: Neuer Drucker '{new_printer.name}' erstellt")
return jsonify({"printer": printer_dict}), 201
except Exception as e:
app_logger.error(f"❌ Admin API-Fehler beim Erstellen des Druckers: {str(e)}")
return jsonify({"error": "Fehler beim Erstellen des Druckers", "details": str(e)}), 500
@app.route("/api/admin/printers/<int:printer_id>", methods=["PUT"])
@login_required
@admin_required
def api_admin_update_printer(printer_id):
"""API-Endpunkt für Drucker-Update (Admin only)"""
try:
data = request.get_json()
if not data:
return jsonify({"error": "Keine JSON-Daten empfangen"}), 400
from models import get_db_session, Printer
db_session = get_db_session()
printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
if not printer:
db_session.close()
return jsonify({"error": "Drucker nicht gefunden"}), 404
# Felder aktualisieren
if "name" in data:
printer.name = data["name"]
if "model" in data:
printer.model = data["model"]
if "location" in data:
printer.location = data["location"]
if "ip_address" in data:
printer.ip_address = data["ip_address"]
if "plug_ip" in data:
printer.plug_ip = data["plug_ip"]
if "plug_username" in data:
printer.plug_username = data["plug_username"]
if "plug_password" in data:
printer.plug_password = data["plug_password"]
if "status" in data:
printer.status = data["status"]
if "active" in data:
printer.active = data["active"]
printer.last_checked = datetime.now()
db_session.commit()
printer_dict = {
"id": printer.id,
"name": printer.name,
"model": printer.model,
"location": printer.location,
"status": printer.status,
"active": printer.active
}
db_session.close()
app_logger.info(f"✅ Admin API: Drucker {printer_id} aktualisiert")
return jsonify({"printer": printer_dict})
except Exception as e:
app_logger.error(f"❌ Admin API-Fehler beim Aktualisieren des Druckers {printer_id}: {str(e)}")
return jsonify({"error": "Fehler beim Aktualisieren des Druckers", "details": str(e)}), 500
@app.route("/api/admin/printers/<int:printer_id>", methods=["DELETE"])
@login_required
@admin_required
def api_admin_delete_printer(printer_id):
"""API-Endpunkt für Drucker-Löschung (Admin only)"""
try:
from models import get_db_session, Printer
db_session = get_db_session()
printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
if not printer:
db_session.close()
return jsonify({"error": "Drucker nicht gefunden"}), 404
printer_name = printer.name
db_session.delete(printer)
db_session.commit()
db_session.close()
app_logger.info(f"✅ Admin API: Drucker '{printer_name}' (ID: {printer_id}) gelöscht")
return jsonify({"success": True, "message": "Drucker erfolgreich gelöscht"})
except Exception as e:
app_logger.error(f"❌ Admin API-Fehler beim Löschen des Druckers {printer_id}: {str(e)}")
return jsonify({"error": "Fehler beim Löschen des Druckers", "details": str(e)}), 500
@app.route("/api/health", methods=["GET"])
def api_health_check():
"""Einfacher Health-Check für Monitoring"""
try:
from models import get_db_session
# Datenbank-Verbindung testen
db_session = get_db_session()
db_session.execute("SELECT 1")
db_session.close()
return jsonify({
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"version": "1.0.0",
"services": {
"database": "online",
"authentication": "online"
}
})
except Exception as e:
app_logger.error(f"❌ Health-Check fehlgeschlagen: {str(e)}")
return jsonify({
"status": "unhealthy",
"timestamp": datetime.now().isoformat(),
"error": str(e)
}), 503
@app.route("/api/version", methods=["GET"])
def api_version():
"""API-Version und System-Info"""
return jsonify({
"version": "1.0.0",
"name": "MYP - Manage Your Printer",
"description": "3D-Drucker-Verwaltung mit Smart-Steckdosen",
"build": datetime.now().strftime("%Y%m%d"),
"environment": get_environment_type()
})
# Statische Seiten
@app.route("/privacy")
def privacy():
@@ -962,51 +1625,169 @@ def handle_exception(error):
def main():
"""Hauptfunktion zum Starten der Anwendung"""
try:
# Umgebungsinfo loggen
app_logger.info(f"[STARTUP] 🚀 Starte MYP {ENVIRONMENT_TYPE.upper()}-Umgebung")
app_logger.info(f"[STARTUP] 🏢 {getattr(ProductionConfig, 'COMPANY_NAME', 'Mercedes-Benz TBA Marienfelde')}")
app_logger.info(f"[STARTUP] 🔒 Air-Gapped: {OFFLINE_MODE or getattr(ProductionConfig, 'OFFLINE_MODE', False)}")
# Production-spezifische Initialisierung
if USE_PRODUCTION_CONFIG:
app_logger.info("[PRODUCTION] Initialisiere Production-Systeme...")
# Performance-Monitoring aktivieren
if getattr(ProductionConfig, 'ENABLE_PERFORMANCE_MONITORING', False):
try:
from utils.performance_monitor import init_performance_monitoring
init_performance_monitoring(app)
app_logger.info("[PRODUCTION] ✅ Performance-Monitoring aktiviert")
except ImportError:
app_logger.warning("[PRODUCTION] ⚠️ Performance-Monitoring nicht verfügbar")
# Health-Checks aktivieren
if getattr(ProductionConfig, 'ENABLE_HEALTH_CHECKS', False):
try:
from utils.health_checks import init_health_checks
init_health_checks(app)
app_logger.info("[PRODUCTION] ✅ Health-Checks aktiviert")
except ImportError:
app_logger.warning("[PRODUCTION] ⚠️ Health-Checks nicht verfügbar")
# Audit-Logging aktivieren
if getattr(ProductionConfig, 'AUDIT_LOGGING', False):
try:
from utils.audit_logger import init_audit_logging
init_audit_logging(app)
app_logger.info("[PRODUCTION] ✅ Audit-Logging aktiviert")
except ImportError:
app_logger.warning("[PRODUCTION] ⚠️ Audit-Logging nicht verfügbar")
# Datenbank initialisieren
app_logger.info("[STARTUP] Initialisiere Datenbank...")
init_database()
app_logger.info("[STARTUP] ✅ Datenbank initialisiert")
# Initial-Admin erstellen falls nicht vorhanden
app_logger.info("[STARTUP] Prüfe Initial-Admin...")
create_initial_admin()
app_logger.info("[STARTUP] ✅ Admin-Benutzer geprüft")
# Queue Manager starten
app_logger.info("[STARTUP] Starte Queue Manager...")
start_queue_manager()
app_logger.info("[STARTUP] ✅ Queue Manager gestartet")
# Job Scheduler starten
app_logger.info("[STARTUP] Starte Job Scheduler...")
scheduler = get_job_scheduler()
if scheduler:
scheduler.start()
app_logger.info("[STARTUP] ✅ Job Scheduler gestartet")
else:
app_logger.warning("[STARTUP] ⚠️ Job Scheduler nicht verfügbar")
# SSL-Kontext
# SSL-Kontext für Production
ssl_context = None
try:
from utils.ssl_config import get_ssl_context
ssl_context = get_ssl_context()
except ImportError:
app_logger.warning("SSL-Konfiguration nicht verfügbar")
if USE_PRODUCTION_CONFIG:
app_logger.info("[PRODUCTION] Konfiguriere SSL...")
try:
from utils.ssl_config import get_ssl_context
ssl_context = get_ssl_context()
app_logger.info("[PRODUCTION] ✅ SSL-Kontext konfiguriert")
except ImportError:
app_logger.warning("[PRODUCTION] ⚠️ SSL-Konfiguration nicht verfügbar")
# Server starten
# Server-Konfiguration
host = os.getenv('FLASK_HOST', '0.0.0.0')
port = int(os.getenv('FLASK_PORT', 5000))
app_logger.info(f"[START] Server startet auf {host}:{port}")
# Production-spezifische Server-Einstellungen
server_options = {
'host': host,
'port': port,
'threaded': True
}
if ssl_context:
app.run(host=host, port=port, ssl_context=ssl_context, threaded=True)
else:
app.run(host=host, port=port, threaded=True)
if USE_PRODUCTION_CONFIG:
# Production-Server-Optimierungen
server_options.update({
'threaded': True,
'processes': 1, # Für Air-Gapped Umgebung
'use_reloader': False,
'use_debugger': False
})
app_logger.info(f"[PRODUCTION] 🌐 Server startet auf https://{host}:{port}")
app_logger.info(f"[PRODUCTION] 🔧 Threaded: {server_options['threaded']}")
app_logger.info(f"[PRODUCTION] 🔒 SSL: {'Ja' if ssl_context else 'Nein'}")
else:
app_logger.info(f"[STARTUP] 🌐 Server startet auf http://{host}:{port}")
# Server starten
if ssl_context:
server_options['ssl_context'] = ssl_context
app.run(**server_options)
else:
app.run(**server_options)
except KeyboardInterrupt:
app_logger.info("[SHUTDOWN] 🛑 Shutdown durch Benutzer angefordert")
except Exception as e:
app_logger.error(f"Fehler beim Starten der Anwendung: {str(e)}")
app_logger.error(f"[ERROR] ❌ Fehler beim Starten der Anwendung: {str(e)}")
if USE_PRODUCTION_CONFIG:
# Production-Fehlerbehandlung
import traceback
app_logger.error(f"[ERROR] Traceback: {traceback.format_exc()}")
raise
finally:
# Cleanup
app_logger.info("[SHUTDOWN] 🧹 Cleanup wird ausgeführt...")
try:
# Queue Manager stoppen
stop_queue_manager()
if scheduler:
app_logger.info("[SHUTDOWN] ✅ Queue Manager gestoppt")
# Scheduler stoppen
if 'scheduler' in locals() and scheduler:
scheduler.shutdown()
app_logger.info("[SHUTDOWN] ✅ Job Scheduler gestoppt")
# Rate Limiter cleanup
cleanup_rate_limiter()
except:
pass
app_logger.info("[SHUTDOWN] ✅ Rate Limiter bereinigt")
# Caches leeren
clear_user_cache()
clear_printer_status_cache()
app_logger.info("[SHUTDOWN] ✅ Caches geleert")
if USE_PRODUCTION_CONFIG:
app_logger.info(f"[SHUTDOWN] 🏁 {ProductionConfig.COMPANY_NAME} System heruntergefahren")
else:
app_logger.info("[SHUTDOWN] 🏁 System heruntergefahren")
except Exception as cleanup_error:
app_logger.error(f"[SHUTDOWN] ❌ Cleanup-Fehler: {str(cleanup_error)}")
# Production-spezifische Funktionen
def get_production_info():
"""Gibt Production-Informationen zurück"""
if USE_PRODUCTION_CONFIG:
return {
'company': ProductionConfig.COMPANY_NAME,
'environment': ProductionConfig.ENVIRONMENT_NAME,
'offline_mode': ProductionConfig.OFFLINE_MODE,
'compliance_mode': ProductionConfig.COMPLIANCE_MODE,
'version': '1.0.0',
'build_date': datetime.now().strftime('%Y-%m-%d'),
'ssl_enabled': ssl_context is not None if 'ssl_context' in globals() else False
}
return None
# Template-Funktion für Production-Info
@app.template_global()
def production_info():
"""Stellt Production-Informationen für Templates bereit"""
return get_production_info()
if __name__ == "__main__":
main()