🎉 Backend: Updated API routes and improved error logging for job creation. Changed the jobs blueprint URL prefix to avoid conflicts. Added extended error handling and logging for critical system errors. 🛠️

2025-06-01 16:08:07 +02:00
parent 4816987f8a
commit 63c8b4f378
25 changed files with 1238 additions and 214 deletions
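The changed URL prefix for the jobs blueprint is not part of the hunks shown below. As a rough, hypothetical sketch of the kind of change the message describes (blueprint and prefix names are assumptions, not taken from this commit):

from flask import Blueprint, Flask, jsonify

# Hypothetical jobs blueprint; module and route names may differ in the real code.
jobs_blueprint = Blueprint("jobs", __name__)

@jobs_blueprint.route("/", methods=["GET"])
def list_jobs():
    return jsonify([])  # placeholder handler, for illustration only

app = Flask(__name__)
# Registering under a distinct prefix avoids colliding with legacy routes
# that are still defined directly on the app (e.g. /api/jobs further down).
app.register_blueprint(jobs_blueprint, url_prefix="/api/v1/jobs")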


@@ -3,7 +3,7 @@ import sys
import logging
import atexit
from datetime import datetime, timedelta
from flask import Flask, render_template, request, jsonify, redirect, url_for, flash, send_file, abort, session, make_response, Response, current_app
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
from flask_wtf import CSRFProtect
from flask_wtf.csrf import CSRFError
@@ -18,6 +18,7 @@ import time
import subprocess
import json
import signal
import shutil
from contextlib import contextmanager
# Windows-spezifische Fixes früh importieren (sichere Version)
@@ -1998,36 +1999,303 @@ def dragdrop_demo():
# ===== ERROR MONITORING SYSTEM =====
@app.route("/api/admin/system-health", methods=['GET'])
@login_required
@admin_required
def api_admin_system_health():
"""API-Endpunkt für System-Gesundheitscheck mit Dashboard-Integration."""
"""API-Endpunkt für System-Gesundheitscheck mit erweiterten Fehlermeldungen."""
try:
# Basis-System-Gesundheitscheck durchführen
critical_errors = []
warnings = []
# 1. Datenbankverbindung prüfen
try:
db_session = get_db_session()
db_session.execute(text("SELECT 1")).fetchone()
db_session.close()
except Exception as e:
critical_errors.append({
"type": "critical",
"title": "Datenbankverbindung fehlgeschlagen",
"description": f"Keine Verbindung zur Datenbank möglich: {str(e)[:100]}",
"solution": "Datenbankdienst neustarten oder Konfiguration prüfen",
"timestamp": datetime.now().isoformat()
})
# 2. Verfügbaren Speicherplatz prüfen
try:
import shutil
total, used, free = shutil.disk_usage("/")
free_percentage = (free / total) * 100
if free_percentage < 5:
critical_errors.append({
"type": "critical",
"title": "Kritischer Speicherplatz",
"description": f"Nur noch {free_percentage:.1f}% Speicherplatz verfügbar",
"solution": "Temporäre Dateien löschen oder Speicher erweitern",
"timestamp": datetime.now().isoformat()
})
elif free_percentage < 15:
warnings.append({
"type": "warning",
"title": "Wenig Speicherplatz",
"description": f"Nur noch {free_percentage:.1f}% Speicherplatz verfügbar",
"solution": "Aufräumen empfohlen",
"timestamp": datetime.now().isoformat()
})
except Exception as e:
warnings.append({
"type": "warning",
"title": "Speicherplatz-Prüfung fehlgeschlagen",
"description": f"Konnte Speicherplatz nicht prüfen: {str(e)[:100]}",
"solution": "Manuell prüfen",
"timestamp": datetime.now().isoformat()
})
# 3. Upload-Ordner-Struktur prüfen
upload_paths = [
"uploads/jobs", "uploads/avatars", "uploads/assets",
"uploads/backups", "uploads/logs", "uploads/temp"
]
for path in upload_paths:
full_path = os.path.join(current_app.root_path, path)
if not os.path.exists(full_path):
warnings.append({
"type": "warning",
"title": f"Upload-Ordner fehlt: {path}",
"description": f"Der Upload-Ordner {path} existiert nicht",
"solution": "Ordner automatisch erstellen lassen",
"timestamp": datetime.now().isoformat()
})
# 4. Log-Dateien-Größe prüfen
try:
logs_dir = os.path.join(current_app.root_path, "logs")
if os.path.exists(logs_dir):
total_log_size = sum(
os.path.getsize(os.path.join(logs_dir, f))
for f in os.listdir(logs_dir)
if os.path.isfile(os.path.join(logs_dir, f))
)
# Größe in MB
log_size_mb = total_log_size / (1024 * 1024)
if log_size_mb > 500: # > 500 MB
warnings.append({
"type": "warning",
"title": "Große Log-Dateien",
"description": f"Log-Dateien belegen {log_size_mb:.1f} MB Speicherplatz",
"solution": "Log-Rotation oder Archivierung empfohlen",
"timestamp": datetime.now().isoformat()
})
except Exception as e:
app_logger.warning(f"Fehler beim Prüfen der Log-Dateien-Größe: {str(e)}")
# 5. Aktive Drucker-Verbindungen prüfen
try:
db_session = get_db_session()
total_printers = db_session.query(Printer).count()
online_printers = db_session.query(Printer).filter(Printer.status == 'online').count()
db_session.close()
if total_printers > 0:
offline_percentage = ((total_printers - online_printers) / total_printers) * 100
if offline_percentage > 50:
warnings.append({
"type": "warning",
"title": "Viele Drucker offline",
"description": f"{offline_percentage:.0f}% der Drucker sind offline",
"solution": "Drucker-Verbindungen überprüfen",
"timestamp": datetime.now().isoformat()
})
except Exception as e:
app_logger.warning(f"Fehler beim Prüfen der Drucker-Status: {str(e)}")
# Dashboard-Event senden
emit_system_alert(
"System-Gesundheitscheck durchgeführt",
alert_type="info",
priority="normal"
alert_type="info" if not critical_errors else "warning",
priority="normal" if not critical_errors else "high"
)
health_status = "healthy" if not critical_errors else "unhealthy"
return jsonify({
"success": True,
"health_status": "healthy",
"health_status": health_status,
"critical_errors": critical_errors,
"warnings": warnings,
"timestamp": datetime.now().isoformat()
"timestamp": datetime.now().isoformat(),
"summary": {
"total_issues": len(critical_errors) + len(warnings),
"critical_count": len(critical_errors),
"warning_count": len(warnings)
}
})
except Exception as e:
app_logger.error(f"Fehler beim System-Gesundheitscheck: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
"error": str(e),
"health_status": "error"
}), 500
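A quick way to exercise the health check during development — a minimal sketch assuming a locally running instance on port 5000 and a requests.Session that already carries an admin login cookie (both assumptions):

import requests

session = requests.Session()  # assumed to be authenticated as an admin
resp = session.get("http://localhost:5000/api/admin/system-health")
data = resp.json()
if data.get("success"):
    print(data["health_status"], "-", data["summary"]["total_issues"], "issue(s)")
    for err in data["critical_errors"]:
        print("CRITICAL:", err["title"], "->", err["solution"])
    for warn in data["warnings"]:
        print("warning:", warn["title"])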
@app.route("/api/admin/fix-errors", methods=['POST'])
@login_required
@admin_required
def api_admin_fix_errors():
"""API-Endpunkt für automatische Fehlerbehebung."""
try:
fixed_issues = []
failed_fixes = []
# 1. Fehlende Upload-Ordner erstellen
upload_paths = [
"uploads/jobs", "uploads/avatars", "uploads/assets",
"uploads/backups", "uploads/logs", "uploads/temp",
"uploads/guests" # Ergänzt um guests
]
for path in upload_paths:
full_path = os.path.join(current_app.root_path, path)
if not os.path.exists(full_path):
try:
os.makedirs(full_path, exist_ok=True)
fixed_issues.append(f"Upload-Ordner {path} erstellt")
app_logger.info(f"Upload-Ordner automatisch erstellt: {full_path}")
except Exception as e:
failed_fixes.append(f"Konnte Upload-Ordner {path} nicht erstellen: {str(e)}")
app_logger.error(f"Fehler beim Erstellen des Upload-Ordners {path}: {str(e)}")
# 2. Temporäre Dateien aufräumen (älter als 24 Stunden)
try:
temp_path = os.path.join(current_app.root_path, "uploads/temp")
if os.path.exists(temp_path):
now = time.time()
cleaned_files = 0
for filename in os.listdir(temp_path):
file_path = os.path.join(temp_path, filename)
if os.path.isfile(file_path):
# Dateien älter als 24 Stunden löschen
if now - os.path.getmtime(file_path) > 24 * 3600:
try:
os.remove(file_path)
cleaned_files += 1
except Exception as e:
app_logger.warning(f"Konnte temporäre Datei nicht löschen {filename}: {str(e)}")
if cleaned_files > 0:
fixed_issues.append(f"{cleaned_files} alte temporäre Dateien gelöscht")
app_logger.info(f"Automatische Bereinigung: {cleaned_files} temporäre Dateien gelöscht")
except Exception as e:
failed_fixes.append(f"Temporäre Dateien Bereinigung fehlgeschlagen: {str(e)}")
app_logger.error(f"Fehler bei der temporären Dateien Bereinigung: {str(e)}")
# 3. Datenbankverbindung wiederherstellen
try:
db_session = get_db_session()
db_session.execute(text("SELECT 1")).fetchone()
db_session.close()
fixed_issues.append("Datenbankverbindung erfolgreich getestet")
except Exception as e:
failed_fixes.append(f"Datenbankverbindung konnte nicht wiederhergestellt werden: {str(e)}")
app_logger.error(f"Datenbankverbindung Wiederherstellung fehlgeschlagen: {str(e)}")
# 4. Log-Rotation durchführen bei großen Log-Dateien
try:
logs_dir = os.path.join(current_app.root_path, "logs")
if os.path.exists(logs_dir):
rotated_logs = 0
for log_file in os.listdir(logs_dir):
log_path = os.path.join(logs_dir, log_file)
if os.path.isfile(log_path) and log_file.endswith('.log'):
# Log-Dateien größer als 10 MB rotieren
if os.path.getsize(log_path) > 10 * 1024 * 1024:
try:
# Backup erstellen
backup_name = f"{log_file}.{datetime.now().strftime('%Y%m%d_%H%M%S')}.bak"
backup_path = os.path.join(logs_dir, backup_name)
shutil.copy2(log_path, backup_path)
# Log-Datei leeren (aber nicht löschen)
with open(log_path, 'w') as f:
f.write(f"# Log rotiert am {datetime.now().isoformat()}\n")
rotated_logs += 1
except Exception as e:
app_logger.warning(f"Konnte Log-Datei nicht rotieren {log_file}: {str(e)}")
if rotated_logs > 0:
fixed_issues.append(f"{rotated_logs} große Log-Dateien rotiert")
app_logger.info(f"Automatische Log-Rotation: {rotated_logs} Dateien rotiert")
except Exception as e:
failed_fixes.append(f"Log-Rotation fehlgeschlagen: {str(e)}")
app_logger.error(f"Fehler bei der Log-Rotation: {str(e)}")
# 5. Offline-Drucker Reconnect versuchen
try:
db_session = get_db_session()
offline_printers = db_session.query(Printer).filter(Printer.status != 'online').all()
reconnected_printers = 0
for printer in offline_printers:
try:
# Status-Check durchführen
if printer.plug_ip:
status, is_reachable = check_printer_status(printer.plug_ip, timeout=3)
if is_reachable:
printer.status = 'online'
reconnected_printers += 1
except Exception as e:
app_logger.debug(f"Drucker {printer.name} Reconnect fehlgeschlagen: {str(e)}")
if reconnected_printers > 0:
db_session.commit()
fixed_issues.append(f"{reconnected_printers} Drucker wieder online")
app_logger.info(f"Automatischer Drucker-Reconnect: {reconnected_printers} Drucker")
db_session.close()
except Exception as e:
failed_fixes.append(f"Drucker-Reconnect fehlgeschlagen: {str(e)}")
app_logger.error(f"Fehler beim Drucker-Reconnect: {str(e)}")
# Ergebnis zusammenfassen
total_fixed = len(fixed_issues)
total_failed = len(failed_fixes)
success = total_fixed > 0 or total_failed == 0
app_logger.info(f"Automatische Fehlerbehebung abgeschlossen: {total_fixed} behoben, {total_failed} fehlgeschlagen")
return jsonify({
"success": success,
"message": f"Automatische Reparatur abgeschlossen: {total_fixed} Probleme behoben" +
(f", {total_failed} fehlgeschlagen" if total_failed > 0 else ""),
"fixed_issues": fixed_issues,
"failed_fixes": failed_fixes,
"summary": {
"total_fixed": total_fixed,
"total_failed": total_failed
},
"timestamp": datetime.now().isoformat()
})
except Exception as e:
app_logger.error(f"Fehler bei der automatischen Fehlerbehebung: {str(e)}")
return jsonify({
"success": False,
"error": str(e),
"message": "Automatische Fehlerbehebung fehlgeschlagen"
}), 500
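The repair endpoint is a POST and the app enables CSRFProtect globally, so a client will normally need a CSRF token as well; a sketch under those assumptions (the header name and how the token is obtained are assumptions, not confirmed by this diff):

import requests

session = requests.Session()  # assumed to carry an admin login cookie
resp = session.post(
    "http://localhost:5000/api/admin/fix-errors",
    headers={"X-CSRFToken": "<token obtained from the frontend>"},  # assumed header name
)
result = resp.json()
print(result["message"])
for fix in result["fixed_issues"]:
    print("fixed:", fix)
for fail in result["failed_fixes"]:
    print("failed:", fail)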
@app.route("/api/admin/system-health-dashboard", methods=['GET'])
@@ -3507,6 +3775,7 @@ def cleanup_temp_files():
# ===== WEITERE API-ROUTEN =====
# Legacy-Route für Kompatibilität - sollte durch Blueprint ersetzt werden
@app.route("/api/jobs/current", methods=["GET"])
@login_required
def get_current_job():
@@ -3523,12 +3792,40 @@ def get_current_job():
else:
job_data = None
db_session.close()
return jsonify(job_data)
except Exception as e:
jobs_logger.error(f"Fehler beim Abrufen des aktuellen Jobs: {str(e)}")
db_session.close()
return jsonify({"error": str(e)}), 500
@app.route("/api/jobs/<int:job_id>", methods=["GET"])
@login_required
@job_owner_required
def get_job_detail(job_id):
"""
Gibt Details zu einem spezifischen Job zurück.
"""
db_session = get_db_session()
try:
from sqlalchemy.orm import joinedload
# Eagerly load the user and printer relationships
job = db_session.query(Job).options(joinedload(Job.user), joinedload(Job.printer)).filter(Job.id == job_id).first()
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
# Convert to dict before closing session
job_dict = job.to_dict()
db_session.close()
return jsonify(job_dict)
except Exception as e:
jobs_logger.error(f"Fehler beim Abrufen des Jobs {job_id}: {str(e)}")
db_session.close()
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/jobs/<int:job_id>", methods=["DELETE"])
@login_required
@job_owner_required
@@ -3615,32 +3912,6 @@ def get_jobs():
db_session.close()
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/jobs/<int:job_id>", methods=["GET"])
@login_required
@job_owner_required
def get_job_detail(job_id):
"""Gibt einen einzelnen Job zurück."""
db_session = get_db_session()
try:
from sqlalchemy.orm import joinedload
# Eagerly load the user and printer relationships
job = db_session.query(Job).options(joinedload(Job.user), joinedload(Job.printer)).filter(Job.id == job_id).first()
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
# Convert to dict before closing session
job_dict = job.to_dict()
db_session.close()
return jsonify(job_dict)
except Exception as e:
jobs_logger.error(f"Fehler beim Abrufen des Jobs {job_id}: {str(e)}")
db_session.close()
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route('/api/jobs', methods=['POST'])
@login_required
@measure_execution_time(logger=jobs_logger, task_name="API-Job-Erstellung")
@@ -4799,7 +5070,7 @@ def export_guest_requests():
'success': False,
'message': f'Fehler beim Export: {str(e)}'
}), 500
# ===== AUTO-OPTIMIERUNG-API-ENDPUNKTE =====
@@ -5033,7 +5304,7 @@ def admin_advanced_settings():
except Exception as e:
app_logger.warning(f"Fehler beim Zählen der Log-Dateien: {str(e)}")
db_session.close()
return render_template(
'admin_advanced_settings.html',
@@ -6936,150 +7207,322 @@ def export_admin_logs():
@admin_required
def api_admin_database_status():
"""
API-Endpunkt für erweiterten Datenbank-Gesundheitsstatus.
Führt umfassende Datenbank-Diagnose durch und liefert detaillierte
Statusinformationen für den Admin-Bereich.
Returns:
JSON: Detaillierter Datenbank-Gesundheitsstatus
"""
try:
from models import get_db_session
from sqlalchemy import text
import os
app_logger.info(f"Datenbank-Gesundheitscheck gestartet von Admin-User {current_user.id}")
# Datenbankverbindung mit Timeout
db_session = get_db_session()
start_time = time.time()
# 1. Basis-Datenbankverbindung testen mit Timeout
connection_status = "OK"
connection_time_ms = 0
try:
query_start = time.time()
result = db_session.execute(text("SELECT 1 as test_connection")).fetchone()
connection_time_ms = round((time.time() - query_start) * 1000, 2)
if connection_time_ms > 5000: # 5 Sekunden
connection_status = f"LANGSAM: {connection_time_ms}ms"
elif not result:
connection_status = "FEHLER: Keine Antwort"
except Exception as e:
connection_status = f"FEHLER: {str(e)[:100]}"
app_logger.error(f"Datenbankverbindungsfehler: {str(e)}")
# 2. Erweiterte Schema-Integrität prüfen
schema_status = {"status": "OK", "details": {}, "missing_tables": [], "table_counts": {}}
try:
required_tables = {
'users': 'Benutzer-Verwaltung',
'printers': 'Drucker-Verwaltung',
'jobs': 'Druck-Aufträge',
'guest_requests': 'Gast-Anfragen',
'settings': 'System-Einstellungen'
}
existing_tables = []
table_counts = {}
for table_name, description in required_tables.items():
try:
count_result = db_session.execute(text(f"SELECT COUNT(*) as count FROM {table_name}")).fetchone()
table_count = count_result[0] if count_result else 0
existing_tables.append(table_name)
table_counts[table_name] = table_count
schema_status["details"][table_name] = {
"exists": True,
"count": table_count,
"description": description
}
except Exception as table_error:
schema_status["missing_tables"].append(table_name)
schema_status["details"][table_name] = {
"exists": False,
"error": str(table_error)[:50],
"description": description
}
app_logger.warning(f"Tabelle {table_name} nicht verfügbar: {str(table_error)}")
schema_status["table_counts"] = table_counts
if len(schema_status["missing_tables"]) > 0:
schema_status["status"] = f"WARNUNG: {len(schema_status['missing_tables'])} fehlende Tabellen"
elif len(existing_tables) != len(required_tables):
schema_status["status"] = f"UNVOLLSTÄNDIG: {len(existing_tables)}/{len(required_tables)} Tabellen"
except Exception as e:
schema_status["status"] = f"FEHLER: {str(e)[:100]}"
app_logger.error(f"Schema-Integritätsprüfung fehlgeschlagen: {str(e)}")
# 3. Migrations-Status und Versionsinformationen
migration_info = {"status": "Unbekannt", "version": None, "details": {}}
try:
# Alembic-Version prüfen
try:
result = db_session.execute(text("SELECT version_num FROM alembic_version ORDER BY version_num DESC LIMIT 1")).fetchone()
if result:
migration_info["version"] = result[0]
migration_info["status"] = "Alembic-Migration aktiv"
migration_info["details"]["alembic"] = True
else:
migration_info["status"] = "Keine Alembic-Migration gefunden"
migration_info["details"]["alembic"] = False
except Exception:
# Fallback: Schema-Informationen sammeln
try:
# SQLite-spezifische Abfrage
tables_result = db_session.execute(text("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")).fetchall()
if tables_result:
table_list = [row[0] for row in tables_result]
migration_info["status"] = f"Schema mit {len(table_list)} Tabellen erkannt"
migration_info["details"]["detected_tables"] = table_list
migration_info["details"]["alembic"] = False
else:
migration_info["status"] = "Keine Tabellen erkannt"
except Exception:
# Weitere Datenbank-Engines
migration_info["status"] = "Schema-Erkennung nicht möglich"
migration_info["details"]["alembic"] = False
except Exception as e:
migration_info["status"] = f"FEHLER: {str(e)[:100]}"
app_logger.error(f"Migrations-Statusprüfung fehlgeschlagen: {str(e)}")
# 4. Performance-Benchmarks
performance_info = {"status": "OK", "benchmarks": {}, "overall_score": 100}
try:
benchmarks = {}
# Einfache Select-Query
start = time.time()
db_session.execute(text("SELECT COUNT(*) FROM users")).fetchone()
benchmarks["simple_select"] = round((time.time() - start) * 1000, 2)
# Join-Query (falls möglich)
try:
start = time.time()
db_session.execute(text("SELECT u.username, COUNT(j.id) FROM users u LEFT JOIN jobs j ON u.id = j.user_id GROUP BY u.id LIMIT 5")).fetchall()
benchmarks["join_query"] = round((time.time() - start) * 1000, 2)
except Exception:
benchmarks["join_query"] = None
# Insert/Update-Performance simulieren
try:
start = time.time()
db_session.execute(text("SELECT 1 WHERE EXISTS (SELECT 1 FROM users LIMIT 1)")).fetchone()
benchmarks["exists_check"] = round((time.time() - start) * 1000, 2)
except Exception:
benchmarks["exists_check"] = None
performance_info["benchmarks"] = benchmarks
# Performance-Score berechnen
avg_time = sum(t for t in benchmarks.values() if t is not None) / len([t for t in benchmarks.values() if t is not None])
if avg_time < 10:
performance_info["status"] = "AUSGEZEICHNET"
performance_info["overall_score"] = 100
elif avg_time < 50:
performance_info["status"] = "GUT"
performance_info["overall_score"] = 85
elif avg_time < 200:
performance_info["status"] = "AKZEPTABEL"
performance_info["overall_score"] = 70
elif avg_time < 1000:
performance_info["status"] = "LANGSAM"
performance_info["overall_score"] = 50
else:
performance_info["status"] = "SEHR LANGSAM"
performance_info["overall_score"] = 25
except Exception as e:
performance_info["status"] = f"FEHLER: {str(e)[:100]}"
performance_info["overall_score"] = 0
app_logger.error(f"Performance-Benchmark fehlgeschlagen: {str(e)}")
# 5. Datenbankgröße und Speicher-Informationen
storage_info = {"size": "Unbekannt", "details": {}}
try:
# SQLite-Datei-Größe
db_uri = current_app.config.get('SQLALCHEMY_DATABASE_URI', '')
if 'sqlite:///' in db_uri:
db_file_path = db_uri.replace('sqlite:///', '')
if os.path.exists(db_file_path):
file_size = os.path.getsize(db_file_path)
storage_info["size"] = f"{file_size / (1024 * 1024):.2f} MB"
storage_info["details"]["file_path"] = db_file_path
storage_info["details"]["last_modified"] = datetime.fromtimestamp(os.path.getmtime(db_file_path)).isoformat()
# Speicherplatz-Warnung
try:
import shutil
total, used, free = shutil.disk_usage(os.path.dirname(db_file_path))
free_gb = free / (1024**3)
storage_info["details"]["disk_free_gb"] = round(free_gb, 2)
if free_gb < 1:
storage_info["warning"] = "Kritisch wenig Speicherplatz"
elif free_gb < 5:
storage_info["warning"] = "Wenig Speicherplatz verfügbar"
except Exception:
pass
else:
# Für andere Datenbanken: Versuche Größe über Metadaten zu ermitteln
storage_info["size"] = "Externe Datenbank"
storage_info["details"]["database_type"] = "Nicht-SQLite"
except Exception as e:
storage_info["size"] = f"FEHLER: {str(e)[:50]}"
app_logger.warning(f"Speicher-Informationen nicht verfügbar: {str(e)}")
# 6. Aktuelle Verbindungs-Pool-Informationen
connection_pool_info = {"status": "Nicht verfügbar", "details": {}}
try:
# SQLAlchemy Pool-Status (falls verfügbar)
engine = db_session.get_bind()
if hasattr(engine, 'pool'):
pool = engine.pool
connection_pool_info["details"]["pool_size"] = getattr(pool, 'size', lambda: 'N/A')()
connection_pool_info["details"]["checked_in"] = getattr(pool, 'checkedin', lambda: 'N/A')()
connection_pool_info["details"]["checked_out"] = getattr(pool, 'checkedout', lambda: 'N/A')()
connection_pool_info["status"] = "Pool aktiv"
else:
connection_pool_info["status"] = "Kein Pool konfiguriert"
except Exception as e:
connection_pool_info["status"] = f"Pool-Status nicht verfügbar: {str(e)[:50]}"
db_session.close()
# Gesamtstatus ermitteln
overall_status = "healthy"
health_score = 100
critical_issues = []
warnings = []
# Kritische Probleme
if "FEHLER" in connection_status:
overall_status = "critical"
health_score -= 50
critical_issues.append("Datenbankverbindung fehlgeschlagen")
if "FEHLER" in schema_status["status"]:
overall_status = "critical"
health_score -= 30
critical_issues.append("Schema-Integrität kompromittiert")
if performance_info["overall_score"] < 25:
overall_status = "critical" if overall_status != "critical" else overall_status
health_score -= 25
critical_issues.append("Extreme Performance-Probleme")
if db_info['file_size_mb'] > 100: # Warnung bei >100MB
issues.append(f"Große Datenbankdatei: {db_info['file_size_mb']}MB")
# Warnungen
if "WARNUNG" in schema_status["status"] or len(schema_status["missing_tables"]) > 0:
if overall_status == "healthy":
overall_status = "warning"
health_score -= 15
warnings.append(f"Schema-Probleme: {len(schema_status['missing_tables'])} fehlende Tabellen")
if "LANGSAM" in connection_status:
if overall_status == "healthy":
overall_status = "warning"
health_score -= 10
warnings.append("Langsame Datenbankverbindung")
if "warning" in storage_info:
if overall_status == "healthy":
overall_status = "warning"
health_score -= 15
warnings.append(storage_info["warning"])
if pragma_info.get('freelist_count', 0) > 1000:
issues.append('Hohe Anzahl freier Seiten - VACUUM empfohlen')
health_score = max(0, health_score) # Nicht unter 0
total_time = round((time.time() - start_time) * 1000, 2)
result = {
"success": True,
"status": overall_status,
"health_score": health_score,
"critical_issues": critical_issues,
"warnings": warnings,
"connection": {
"status": connection_status,
"response_time_ms": connection_time_ms
},
"schema": schema_status,
"migration": migration_info,
"performance": performance_info,
"storage": storage_info,
"connection_pool": connection_pool_info,
"timestamp": datetime.now().isoformat(),
"check_duration_ms": total_time,
"summary": {
"database_responsive": "FEHLER" not in connection_status,
"schema_complete": len(schema_status["missing_tables"]) == 0,
"performance_acceptable": performance_info["overall_score"] >= 50,
"storage_adequate": "warning" not in storage_info,
"overall_healthy": overall_status == "healthy"
}
}
app_logger.info(f"Datenbank-Gesundheitscheck abgeschlossen: Status={overall_status}, Score={health_score}, Dauer={total_time}ms")
return jsonify(result)
except Exception as e:
app_logger.error(f"Fehler beim Abrufen des Datenbank-Status: {str(e)}")
app_logger.error(f"Kritischer Fehler beim Datenbank-Gesundheitscheck: {str(e)}")
return jsonify({
"success": False,
"error": f"Kritischer Systemfehler: {str(e)}",
"status": "critical",
"health_score": 0,
"critical_issues": ["System-Gesundheitscheck fehlgeschlagen"],
"warnings": [],
"connection": {"status": "FEHLER bei der Prüfung"},
"schema": {"status": "FEHLER bei der Prüfung"},
"migration": {"status": "FEHLER bei der Prüfung"},
"performance": {"status": "FEHLER bei der Prüfung"},
"storage": {"size": "FEHLER bei der Prüfung"},
"timestamp": datetime.now().isoformat(),
"summary": {
"database_responsive": False,
"schema_complete": False,
"performance_acceptable": False,
"storage_adequate": False,
"overall_healthy": False
}
}), 500
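Since the endpoint aggregates everything into status, health_score and per-area summaries, a monitoring client only needs a few fields; a consumer sketch (the route's URL is outside this hunk, so the constant below is a placeholder, and the authenticated session is an assumption):

import requests

DB_STATUS_URL = "http://localhost:5000/api/admin/..."  # placeholder: real path not shown in this hunk

session = requests.Session()  # assumed to carry an admin login cookie
data = session.get(DB_STATUS_URL).json()

if data["status"] == "critical":
    print("ALERT:", ", ".join(data["critical_issues"]) or f"score {data['health_score']}")
elif data["status"] == "warning":
    print("warn:", ", ".join(data["warnings"]))
else:
    print(f"healthy (score {data['health_score']}, checked in {data['check_duration_ms']} ms)")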
@app.route("/api/admin/system/status", methods=['GET'])
@@ -7313,7 +7756,6 @@ def api_admin_system_status():
# ===== ÖFFENTLICHE STATISTIK-API =====
@app.route("/api/statistics/public", methods=['GET'])
def api_public_statistics():
"""