Es scheint, dass Sie eine Reihe von Dateien und Verzeichnissen in einem Backend-Projekt bearbeitet haben. Hier ist eine Zusammenfassung der Änderungen:

This commit is contained in:
Tomczak
2025-06-19 11:49:24 +02:00
parent c16bcca9e6
commit 9bf89f8ddb
389 changed files with 6135 additions and 2886 deletions

View File

@ -1,33 +1,40 @@
"""
Vereinheitlichter Admin-Blueprint für das MYP 3D-Druck-Management-System
Vereinheitlichtes Admin-Blueprint für das MYP System
Konsolidierte Implementierung aller Admin-spezifischen Funktionen:
- Benutzerverwaltung und Systemüberwachung (ursprünglich admin.py)
- Erweiterte System-API-Funktionen (ursprünglich admin_api.py)
- System-Backups, Datenbank-Optimierung, Cache-Verwaltung
- Steckdosenschaltzeiten-Übersicht und -verwaltung
Konsolidiert alle administrativen Funktionen in einem einzigen Blueprint:
- Admin-Dashboard und Übersichtsseiten
- Benutzer- und Druckerverwaltung
- System-Wartung und -überwachung
- API-Endpunkte für alle Admin-Funktionen
Optimierungen:
- Vereinheitlichter admin_required Decorator
- Konsistente Fehlerbehandlung und Logging
- Vollständige API-Kompatibilität zu beiden ursprünglichen Blueprints
Optimiert für die Mercedes-Benz TBA Marienfelde Umgebung mit:
- Einheitlichem Error-Handling und Logging
- Konsistentem Session-Management
- Vollständiger API-Kompatibilität
Autor: MYP Team - Konsolidiert für IHK-Projektarbeit
Datum: 2025-06-09
"""
import os
import shutil
import zipfile
import sqlite3
import glob
import json
import time
import zipfile
from datetime import datetime, timedelta
from flask import Blueprint, render_template, request, jsonify, redirect, url_for, flash, current_app
from flask_login import login_required, current_user
from functools import wraps
from models import User, Printer, Job, get_cached_session, Stats, SystemLog, PlugStatusLog, GuestRequest
from utils.logging_config import get_logger
from flask import Blueprint, render_template, request, jsonify, flash, redirect, url_for, current_app
from flask_login import login_required, current_user
from werkzeug.utils import secure_filename
from sqlalchemy import text, func, desc, asc
from sqlalchemy.exc import SQLAlchemyError
# Models und Utils importieren
from models import (
User, UserPermission, Printer, Job, GuestRequest, SystemLog,
get_db_session, get_cached_session, PlugStatusLog
)
from utils.logging_config import get_logger, measure_execution_time
# ===== BLUEPRINT-KONFIGURATION =====
@ -110,12 +117,18 @@ def admin_dashboard():
active_jobs = db_session.query(Job).filter(
Job.status.in_(['pending', 'printing', 'paused'])
).count()
# Online-Drucker zählen (ohne Live-Status-Check für bessere Performance)
online_printers = db_session.query(Printer).filter(
Printer.status == 'online'
).count()
stats = {
'total_users': total_users,
'total_printers': total_printers,
'total_jobs': total_jobs,
'active_jobs': active_jobs
'active_jobs': active_jobs,
'online_printers': online_printers
}
admin_logger.info(f"Admin-Dashboard geladen von {current_user.username}")
@ -181,7 +194,8 @@ def users_overview():
'total_users': total_users,
'total_printers': total_printers,
'total_jobs': total_jobs,
'active_jobs': active_jobs
'active_jobs': active_jobs,
'online_printers': 0
}
admin_logger.info(f"Benutzerübersicht geladen von {current_user.username}")
@ -374,7 +388,8 @@ def system_health():
'total_users': total_users,
'total_printers': total_printers,
'total_jobs': total_jobs,
'active_jobs': active_jobs
'active_jobs': active_jobs,
'online_printers': 0
}
admin_logger.info(f"System-Health geladen von {current_user.username}")
@ -411,7 +426,8 @@ def logs_overview():
'total_users': total_users,
'total_printers': total_printers,
'total_jobs': total_jobs,
'active_jobs': active_jobs
'active_jobs': active_jobs,
'online_printers': 0
}
admin_logger.info(f"Logs-Übersicht geladen von {current_user.username}")
@ -422,10 +438,52 @@ def logs_overview():
flash("Fehler beim Laden der Log-Daten", "error")
return render_template('admin.html', stats={}, logs=[], active_tab='logs')
@admin_blueprint.route("/maintenance")
@admin_blueprint.route("/maintenance", methods=["GET", "POST"])
@admin_required
def maintenance():
"""Wartungsseite"""
"""Wartungsseite und Wartungsaktionen"""
# POST-Request: Wartungsaktion ausführen
if request.method == "POST":
action = request.form.get('action')
admin_logger.info(f"Wartungsaktion '{action}' von {current_user.username} ausgeführt")
try:
if action == 'clear_cache':
# Cache leeren
from models import clear_cache
clear_cache()
flash("Cache erfolgreich geleert", "success")
elif action == 'optimize_db':
# Datenbank optimieren
from models import engine
with engine.connect() as conn:
conn.execute(text("PRAGMA optimize"))
conn.execute(text("VACUUM"))
flash("Datenbank erfolgreich optimiert", "success")
elif action == 'create_backup':
# Backup erstellen
try:
from utils.backup_manager import BackupManager
backup_manager = BackupManager()
backup_path = backup_manager.create_backup()
flash(f"Backup erfolgreich erstellt: {backup_path}", "success")
except ImportError:
flash("Backup-System nicht verfügbar", "warning")
except Exception as backup_error:
flash(f"Backup-Fehler: {str(backup_error)}", "error")
else:
flash("Unbekannte Wartungsaktion", "error")
except Exception as e:
admin_logger.error(f"Fehler bei Wartungsaktion '{action}': {str(e)}")
flash(f"Fehler bei Wartungsaktion: {str(e)}", "error")
return redirect(url_for('admin.maintenance'))
# GET-Request: Wartungsseite anzeigen
try:
with get_cached_session() as db_session:
# Grundlegende Statistiken sammeln
@ -442,7 +500,8 @@ def maintenance():
'total_users': total_users,
'total_printers': total_printers,
'total_jobs': total_jobs,
'active_jobs': active_jobs
'active_jobs': active_jobs,
'online_printers': 0
}
admin_logger.info(f"Wartungsseite geladen von {current_user.username}")
@ -460,21 +519,45 @@ def maintenance():
def create_user_api():
"""API-Endpunkt zum Erstellen eines neuen Benutzers"""
try:
data = request.get_json()
# Sowohl JSON als auch Form-Daten unterstützen
if request.is_json:
data = request.get_json()
else:
data = request.form.to_dict()
# Checkbox-Werte korrekt parsen
for key in ['can_start_jobs', 'needs_approval', 'can_approve_jobs']:
if key in data:
data[key] = data[key] in ['true', 'on', '1', True]
admin_logger.info(f"Benutzer-Erstellung angefordert von {current_user.username}: {data.get('username', 'unknown')}")
# Validierung der erforderlichen Felder
required_fields = ['username', 'email', 'password', 'name']
for field in required_fields:
if field not in data or not data[field]:
admin_logger.error(f"Erforderliches Feld '{field}' fehlt bei Benutzer-Erstellung")
return jsonify({"error": f"Feld '{field}' ist erforderlich"}), 400
with get_cached_session() as db_session:
# Datenvalidierung
if len(data['username']) < 3:
return jsonify({"error": "Benutzername muss mindestens 3 Zeichen lang sein"}), 400
if len(data['password']) < 8:
return jsonify({"error": "Passwort muss mindestens 8 Zeichen lang sein"}), 400
if '@' not in data['email']:
return jsonify({"error": "Ungültige E-Mail-Adresse"}), 400
# Datenbank-Session korrekt verwenden
db_session = get_db_session()
try:
# Überprüfung auf bereits existierende Benutzer
existing_user = db_session.query(User).filter(
(User.username == data['username']) | (User.email == data['email'])
).first()
if existing_user:
admin_logger.warning(f"Benutzer-Erstellung fehlgeschlagen: Benutzername oder E-Mail bereits vergeben")
return jsonify({"error": "Benutzername oder E-Mail bereits vergeben"}), 400
# Neuen Benutzer erstellen
@ -486,7 +569,9 @@ def create_user_api():
department=data.get('department'),
position=data.get('position'),
phone=data.get('phone'),
bio=data.get('bio')
bio=data.get('bio'),
active=True,
created_at=datetime.now()
)
new_user.set_password(data['password'])
@ -511,16 +596,25 @@ def create_user_api():
db_session.add(permissions)
db_session.commit()
admin_logger.info(f"Neuer Benutzer erstellt: {new_user.username} von Admin {current_user.username}")
admin_logger.info(f"Neuer Benutzer erfolgreich erstellt: {new_user.username} (ID: {new_user.id}) von Admin {current_user.username}")
return jsonify({
"success": True,
"message": "Benutzer erfolgreich erstellt",
"user_id": new_user.id
"user_id": new_user.id,
"username": new_user.username,
"role": new_user.role
})
except Exception as db_error:
admin_logger.error(f"❌ Datenbankfehler bei Benutzer-Erstellung: {str(db_error)}")
db_session.rollback()
return jsonify({"error": "Datenbankfehler beim Erstellen des Benutzers"}), 500
finally:
db_session.close()
except Exception as e:
admin_logger.error(f"Fehler beim Erstellen des Benutzers: {str(e)}")
admin_logger.error(f"❌ Allgemeiner Fehler bei Benutzer-Erstellung: {str(e)}")
return jsonify({"error": "Fehler beim Erstellen des Benutzers"}), 500
@admin_api_blueprint.route("/users/<int:user_id>", methods=["GET"])
@ -845,108 +939,108 @@ def create_backup():
@admin_api_blueprint.route('/printers/<int:printer_id>/toggle', methods=['POST'])
@admin_required
def toggle_printer_power(printer_id):
"""
Schaltet die Smart-Plug-Steckdose eines Druckers ein/aus (Toggle-Funktion).
Args:
printer_id: ID des zu steuernden Druckers
JSON-Parameter:
- reason: Grund für die Schaltung (optional)
Returns:
JSON mit Ergebnis der Toggle-Aktion
"""
admin_api_logger.info(f"🔌 Smart-Plug Toggle für Drucker {printer_id} von Admin {current_user.name}")
"""Schaltet die Steckdose eines Druckers ein oder aus"""
try:
# Parameter auslesen
data = request.get_json() or {}
reason = data.get("reason", "Admin-Panel Toggle")
from models import get_db_session, Printer, PlugStatusLog
from utils.hardware_integration import get_tapo_controller
from sqlalchemy import text
# Drucker aus Datenbank holen
db_session = get_cached_session()
printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
admin_logger.info(f"🔌 Smart-Plug Toggle für Drucker {printer_id} von Admin {current_user.name}")
if not printer:
return jsonify({
"success": False,
"error": f"Drucker mit ID {printer_id} nicht gefunden"
}), 404
# Prüfen, ob Drucker eine Steckdose konfiguriert hat
if not printer.plug_ip or not printer.plug_username or not printer.plug_password:
return jsonify({
"success": False,
"error": f"Drucker {printer.name} hat keine Steckdose konfiguriert"
}), 400
# Request-Daten parsen
if request.is_json:
data = request.get_json()
action = data.get('action', 'toggle')
else:
action = request.form.get('action', 'toggle')
# Aktuellen Status der Steckdose ermitteln
# Drucker aus Datenbank laden
db_session = get_db_session()
try:
from PyP100 import PyP110
p110 = PyP110.P110(printer.plug_ip, printer.plug_username, printer.plug_password)
p110.handshake()
p110.login()
printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
# Aktuellen Status abrufen
device_info = p110.getDeviceInfo()
current_status = device_info["result"]["device_on"]
if not printer:
return jsonify({"error": "Drucker nicht gefunden"}), 404
# Toggle-Aktion durchführen
if current_status:
# Ausschalten
p110.turnOff()
new_status = "off"
action = "ausgeschaltet"
printer.status = "offline"
if not printer.plug_ip:
return jsonify({"error": "Keine Steckdose für diesen Drucker konfiguriert"}), 400
# Tapo-Controller holen
tapo_controller = get_tapo_controller()
# Aktueller Status der Steckdose prüfen
is_reachable, current_status = tapo_controller.check_outlet_status(printer.plug_ip, printer_id=printer_id)
if not is_reachable:
# Status auf offline setzen
printer.status = 'offline'
printer.last_checked = datetime.now()
db_session.commit()
return jsonify({
"error": f"Steckdose {printer.plug_ip} nicht erreichbar",
"printer_status": "offline"
}), 400
# Neue Aktion bestimmen
if action == 'toggle':
new_state = not (current_status == 'on')
elif action in ['on', 'off']:
new_state = (action == 'on')
else:
# Einschalten
p110.turnOn()
new_status = "on"
action = "eingeschaltet"
printer.status = "starting"
return jsonify({"error": "Ungültige Aktion"}), 400
# Drucker-Status in DB aktualisieren
printer.last_checked = datetime.now()
db_session.commit()
# Steckdose schalten
success = tapo_controller.toggle_plug(printer.plug_ip, new_state)
admin_api_logger.info(f"✅ Drucker {printer.name} erfolgreich {action} | Grund: {reason}")
return jsonify({
"success": True,
"message": f"Drucker {printer.name} erfolgreich {action}",
"printer": {
"id": printer_id,
"name": printer.name,
"model": printer.model,
"location": printer.location
},
"toggle_result": {
"previous_status": "on" if current_status else "off",
if success:
# Drucker-Status aktualisieren
new_status = 'busy' if new_state else 'idle'
printer.status = new_status
printer.last_checked = datetime.now()
printer.updated_at = datetime.now()
# Status-Änderung protokollieren - MIT korrekter Drucker-ID
try:
PlugStatusLog.log_status_change(
printer_id=printer_id, # KORRIGIERT: Explizit Drucker-ID übergeben
status='on' if new_state else 'off',
source='admin',
user_id=current_user.id,
ip_address=printer.plug_ip,
notes=f"Toggle durch Admin {current_user.name}"
)
except Exception as log_error:
admin_logger.error(f"❌ Status-Protokollierung fehlgeschlagen: {str(log_error)}")
# Weiter machen, auch wenn Protokollierung fehlschlägt
db_session.commit()
admin_logger.info(f"✅ Drucker {printer_id} erfolgreich {'eingeschaltet' if new_state else 'ausgeschaltet'}")
return jsonify({
"success": True,
"message": f"Drucker erfolgreich {'eingeschaltet' if new_state else 'ausgeschaltet'}",
"printer_id": printer_id,
"new_status": new_status,
"action": action,
"reason": reason
},
"performed_by": {
"id": current_user.id,
"name": current_user.name
},
"timestamp": datetime.now().isoformat()
})
except Exception as tapo_error:
admin_api_logger.error(f"❌ Tapo-Fehler für Drucker {printer.name}: {str(tapo_error)}")
return jsonify({
"success": False,
"error": f"Fehler bei Steckdosensteuerung: {str(tapo_error)}"
}), 500
"plug_status": 'on' if new_state else 'off'
})
else:
return jsonify({
"error": f"Fehler beim Schalten der Steckdose",
"printer_id": printer_id
}), 500
except Exception as db_error:
admin_logger.error(f"❌ Datenbankfehler bei Toggle-Aktion: {str(db_error)}")
db_session.rollback()
return jsonify({"error": "Datenbankfehler"}), 500
finally:
db_session.close()
except Exception as e:
admin_api_logger.error(f"❌ Allgemeiner Fehler bei Toggle-Aktion: {str(e)}")
return jsonify({
"success": False,
"error": f"Systemfehler: {str(e)}"
}), 500
admin_logger.error(f"❌ Allgemeiner Fehler bei Toggle-Aktion: {str(e)}")
return jsonify({"error": f"Systemfehler: {str(e)}"}), 500
@admin_api_blueprint.route('/database/optimize', methods=['POST'])
@admin_required
@ -2121,103 +2215,154 @@ def api_admin_live_stats():
@admin_required
def api_admin_system_health():
"""
API-Endpunkt für System-Health-Check
Detaillierte System-Gesundheitsprüfung für das Admin-Panel.
Überprüft verschiedene System-Komponenten:
- Datenbank-Verbindung
- Dateisystem
- Speicherplatz
- Service-Status
Testet alle kritischen Systemkomponenten und gibt strukturierte
Gesundheitsinformationen zurück.
Returns:
JSON mit detaillierten System-Health-Informationen
"""
admin_logger.info(f"System-Health-Check durchgeführt von {current_user.username}")
try:
from models import get_db_session
from sqlalchemy import text
import os
import time
health_status = {
'database': 'unknown',
'filesystem': 'unknown',
'storage': {},
'services': {},
'timestamp': datetime.now().isoformat()
"overall_status": "healthy",
"timestamp": datetime.now().isoformat(),
"checks": {}
}
# Datenbank-Check
# 1. Datenbank-Health-Check
try:
with get_cached_session() as db_session:
# Einfacher Query-Test
db_session.execute("SELECT 1")
health_status['database'] = 'healthy'
except Exception as db_error:
health_status['database'] = 'unhealthy'
admin_api_logger.error(f"Datenbank-Health-Check fehlgeschlagen: {str(db_error)}")
# Dateisystem-Check
try:
# Prüfe wichtige Verzeichnisse
important_dirs = [
'backend/uploads',
'backend/database',
'backend/logs'
]
db_session = get_db_session()
start_time = time.time()
all_accessible = True
for dir_path in important_dirs:
if not os.path.exists(dir_path) or not os.access(dir_path, os.W_OK):
all_accessible = False
break
# KORRIGIERT: Verwende text() für SQL-Ausdruck
db_session.execute(text("SELECT 1"))
db_response_time = round((time.time() - start_time) * 1000, 2)
health_status['filesystem'] = 'healthy' if all_accessible else 'unhealthy'
except Exception as fs_error:
health_status['filesystem'] = 'unhealthy'
admin_api_logger.error(f"Dateisystem-Health-Check fehlgeschlagen: {str(fs_error)}")
# Speicherplatz-Check
try:
statvfs = os.statvfs('.')
total_space = statvfs.f_blocks * statvfs.f_frsize
free_space = statvfs.f_bavail * statvfs.f_frsize
used_space = total_space - free_space
db_session.close()
health_status['storage'] = {
'total_gb': round(total_space / (1024**3), 2),
'used_gb': round(used_space / (1024**3), 2),
'free_gb': round(free_space / (1024**3), 2),
'percent_used': round((used_space / total_space) * 100, 1)
health_status["checks"]["database"] = {
"status": "healthy",
"response_time_ms": db_response_time,
"message": "Datenbank ist erreichbar"
}
except Exception as storage_error:
admin_api_logger.error(f"Speicherplatz-Check fehlgeschlagen: {str(storage_error)}")
except Exception as db_error:
admin_logger.error(f"Datenbank-Health-Check fehlgeschlagen: {str(db_error)}")
health_status["checks"]["database"] = {
"status": "critical",
"error": str(db_error),
"message": "Datenbank nicht erreichbar"
}
health_status["overall_status"] = "unhealthy"
# Service-Status (vereinfacht)
health_status['services'] = {
'web_server': 'running', # Immer running, da wir antworten
'job_scheduler': 'unknown', # Könnte später implementiert werden
'tapo_controller': 'unknown' # Könnte später implementiert werden
}
# 2. Speicherplatz-Check (Windows-kompatibel)
try:
import shutil
disk_usage = shutil.disk_usage('.')
free_space_gb = disk_usage.free / (1024**3)
total_space_gb = disk_usage.total / (1024**3)
used_percent = ((disk_usage.total - disk_usage.free) / disk_usage.total) * 100
if used_percent > 90:
disk_status = "critical"
health_status["overall_status"] = "unhealthy"
elif used_percent > 80:
disk_status = "warning"
if health_status["overall_status"] == "healthy":
health_status["overall_status"] = "warning"
else:
disk_status = "healthy"
health_status["checks"]["disk_space"] = {
"status": disk_status,
"free_space_gb": round(free_space_gb, 2),
"total_space_gb": round(total_space_gb, 2),
"used_percent": round(used_percent, 1),
"message": f"Speicherplatz: {round(used_percent, 1)}% belegt"
}
except Exception as disk_error:
admin_logger.error(f"Speicherplatz-Check fehlgeschlagen: {str(disk_error)}")
health_status["checks"]["disk_space"] = {
"status": "warning",
"error": str(disk_error),
"message": "Speicherplatz-Information nicht verfügbar"
}
# Gesamt-Status berechnen
if health_status['database'] == 'healthy' and health_status['filesystem'] == 'healthy':
overall_status = 'healthy'
elif health_status['database'] == 'unhealthy' or health_status['filesystem'] == 'unhealthy':
overall_status = 'unhealthy'
else:
overall_status = 'degraded'
# 3. Tapo-Controller-Health-Check
try:
from utils.hardware_integration import get_tapo_controller
tapo_controller = get_tapo_controller()
# Teste mit einer beispiel-IP
test_result = tapo_controller.is_plug_reachable("192.168.0.100")
health_status["checks"]["tapo_controller"] = {
"status": "healthy",
"message": "Tapo-Controller verfügbar",
"test_result": test_result
}
except Exception as tapo_error:
health_status["checks"]["tapo_controller"] = {
"status": "warning",
"error": str(tapo_error),
"message": "Tapo-Controller Problem"
}
health_status['overall'] = overall_status
# 4. Session-System-Check
try:
from flask import session
session_test = session.get('_id', 'unknown')
health_status["checks"]["session_system"] = {
"status": "healthy",
"message": "Session-System funktionsfähig",
"session_id": session_test[:8] + "..." if len(session_test) > 8 else session_test
}
except Exception as session_error:
health_status["checks"]["session_system"] = {
"status": "warning",
"error": str(session_error),
"message": "Session-System Problem"
}
admin_api_logger.info(f"System-Health-Check durchgeführt: {overall_status}")
# 5. Logging-System-Check
try:
admin_logger.debug("Health-Check Test-Log-Eintrag")
health_status["checks"]["logging_system"] = {
"status": "healthy",
"message": "Logging-System funktionsfähig"
}
except Exception as log_error:
health_status["checks"]["logging_system"] = {
"status": "warning",
"error": str(log_error),
"message": "Logging-System Problem"
}
admin_logger.info(f"System-Health-Check durchgeführt: {health_status['overall_status']}")
return jsonify({
'success': True,
'health': health_status,
'message': f'System-Status: {overall_status}'
"success": True,
"health": health_status
})
except Exception as e:
admin_api_logger.error(f"Fehler beim System-Health-Check: {str(e)}")
admin_logger.error(f"Allgemeiner Fehler beim System-Health-Check: {str(e)}")
return jsonify({
'success': False,
'error': 'Fehler beim Health-Check',
'message': str(e),
'health': {
'overall': 'error',
'timestamp': datetime.now().isoformat()
"success": False,
"error": "Fehler beim System-Health-Check",
"details": str(e),
"health": {
"overall_status": "critical",
"timestamp": datetime.now().isoformat(),
"checks": {}
}
}), 500
@ -2285,134 +2430,165 @@ def api_admin_error_recovery_status():
"""
API-Endpunkt für Error-Recovery-Status.
Gibt Informationen über das Error-Recovery-System zurück,
einschließlich Status, Statistiken und letzter Aktionen.
Bietet detaillierte Informationen über:
- Systemfehler-Status
- Recovery-Mechanismen
- Fehlerbehebungsempfehlungen
- Auto-Recovery-Status
Returns:
JSON mit Error-Recovery-Informationen
"""
admin_logger.info(f"Error-Recovery-Status angefordert von {current_user.username}")
try:
admin_api_logger.info(f"Error-Recovery-Status angefordert von {current_user.username}")
from models import get_db_session
from sqlalchemy import text
import os
# Error-Recovery-Basis-Status sammeln
recovery_status = {
'enabled': True, # Error-Recovery ist standardmäßig aktiviert
'last_check': datetime.now().isoformat(),
'status': 'active',
'errors_detected': 0,
'errors_recovered': 0,
'last_recovery_action': None,
'monitoring_active': True,
'recovery_methods': [
'automatic_restart',
'service_health_check',
'database_recovery',
'cache_cleanup'
]
"overall_status": "stable",
"timestamp": datetime.now().isoformat(),
"error_levels": {
"critical": 0,
"warning": 0,
"info": 0
},
"components": {},
"recommendations": []
}
# Versuche Log-Informationen zu sammeln
# 1. Datenbank-Gesundheit für Error-Recovery
try:
# Prüfe auf kürzliche Fehler in System-Logs
with get_cached_session() as db_session:
# Letzte Stunde nach Error-Logs suchen
last_hour = datetime.now() - timedelta(hours=1)
error_logs = db_session.query(SystemLog).filter(
SystemLog.level == 'ERROR',
SystemLog.timestamp >= last_hour
).count()
recovery_logs = db_session.query(SystemLog).filter(
SystemLog.message.like('%Recovery%'),
SystemLog.timestamp >= last_hour
).count()
recovery_status['errors_detected'] = error_logs
recovery_status['errors_recovered'] = recovery_logs
# Letzten Recovery-Eintrag finden
last_recovery = db_session.query(SystemLog).filter(
SystemLog.message.like('%Recovery%')
).order_by(SystemLog.timestamp.desc()).first()
if last_recovery:
recovery_status['last_recovery_action'] = {
'timestamp': last_recovery.timestamp.isoformat(),
'action': 'system_log_recovery',
'message': last_recovery.message,
'module': last_recovery.module
}
except Exception as log_error:
admin_api_logger.warning(f"Log-Analyse für Error-Recovery fehlgeschlagen: {str(log_error)}")
recovery_status['errors_detected'] = 0
recovery_status['errors_recovered'] = 0
# System-Load als Indikator für potenzielle Probleme
try:
import psutil
cpu_percent = psutil.cpu_percent(interval=1)
memory_percent = psutil.virtual_memory().percent
db_session = get_db_session()
# KORRIGIERT: Verwende text() für SQL-Ausdruck
db_session.execute(text("SELECT 1"))
db_session.close()
# Hohe System-Last kann auf Probleme hindeuten
if cpu_percent > 80 or memory_percent > 85:
recovery_status['status'] = 'warning'
recovery_status['last_recovery_action'] = {
'timestamp': datetime.now().isoformat(),
'action': 'system_load_warning',
'details': {
'cpu_percent': cpu_percent,
'memory_percent': memory_percent
}
}
# System-Performance-Daten hinzufügen
recovery_status['system_performance'] = {
'cpu_percent': cpu_percent,
'memory_percent': memory_percent,
'status': 'normal' if cpu_percent < 80 and memory_percent < 85 else 'high_load'
recovery_status["components"]["database"] = {
"status": "healthy",
"message": "Datenbank verfügbar"
}
except ImportError:
admin_api_logger.info("psutil nicht verfügbar für Error-Recovery-Monitoring")
recovery_status['system_performance'] = {
'available': False,
'message': 'psutil-Bibliothek nicht installiert'
}
except Exception as system_error:
admin_api_logger.warning(f"System-Load-Check für Error-Recovery fehlgeschlagen: {str(system_error)}")
recovery_status['system_performance'] = {
'available': False,
'error': str(system_error)
}
# Datenbank-Gesundheit als Recovery-Indikator
try:
with get_cached_session() as db_session:
# Einfacher DB-Test
db_session.execute("SELECT 1")
recovery_status['database_health'] = 'healthy'
except Exception as db_error:
recovery_status['database_health'] = 'unhealthy'
recovery_status['status'] = 'critical'
admin_api_logger.error(f"Datenbank-Health-Check für Error-Recovery fehlgeschlagen: {str(db_error)}")
admin_logger.error(f"Datenbank-Health-Check für Error-Recovery fehlgeschlagen: {str(db_error)}")
recovery_status["components"]["database"] = {
"status": "critical",
"error": str(db_error),
"message": "Datenbank nicht verfügbar"
}
recovery_status["error_levels"]["critical"] += 1
recovery_status["overall_status"] = "critical"
recovery_status["recommendations"].append("Datenbank-Verbindung prüfen und neu starten")
admin_api_logger.info(f"Error-Recovery-Status abgerufen: {recovery_status['status']}")
# 2. Log-Dateien-Status
try:
log_dirs = ["logs/admin_api", "logs/app", "logs/tapo_control"]
log_status = "healthy"
for log_dir in log_dirs:
if not os.path.exists(log_dir):
log_status = "warning"
recovery_status["error_levels"]["warning"] += 1
break
recovery_status["components"]["logging"] = {
"status": log_status,
"message": "Logging-System verfügbar" if log_status == "healthy" else "Einige Log-Verzeichnisse fehlen"
}
if log_status == "warning":
recovery_status["recommendations"].append("Log-Verzeichnisse prüfen und erstellen")
except Exception as log_error:
recovery_status["components"]["logging"] = {
"status": "warning",
"error": str(log_error),
"message": "Log-System Problem"
}
recovery_status["error_levels"]["warning"] += 1
# 3. Session-Management
try:
from flask import session
session_test = session.get('_id', None)
recovery_status["components"]["session_management"] = {
"status": "healthy",
"message": "Session-System funktionsfähig",
"active_session": bool(session_test)
}
except Exception as session_error:
recovery_status["components"]["session_management"] = {
"status": "warning",
"error": str(session_error),
"message": "Session-System Problem"
}
recovery_status["error_levels"]["warning"] += 1
recovery_status["recommendations"].append("Session-System neu starten")
# 4. Tapo-Controller-Status
try:
from utils.hardware_integration import get_tapo_controller
tapo_controller = get_tapo_controller()
recovery_status["components"]["tapo_controller"] = {
"status": "healthy",
"message": "Tapo-Controller verfügbar"
}
except Exception as tapo_error:
recovery_status["components"]["tapo_controller"] = {
"status": "warning",
"error": str(tapo_error),
"message": "Tapo-Controller nicht verfügbar"
}
recovery_status["error_levels"]["warning"] += 1
recovery_status["recommendations"].append("Tapo-Controller-Konfiguration prüfen")
# 5. Auto-Recovery-Mechanismen
recovery_status["auto_recovery"] = {
"enabled": True,
"mechanisms": [
"Automatische Datenbank-Reconnection",
"Session-Cleanup bei Fehlern",
"Tapo-Connection-Retry",
"Graceful Error-Handling"
],
"last_recovery": "Nicht verfügbar"
}
# 6. Gesamt-Status bestimmen
total_errors = sum(recovery_status["error_levels"].values())
if recovery_status["error_levels"]["critical"] > 0:
recovery_status["overall_status"] = "critical"
elif recovery_status["error_levels"]["warning"] > 2:
recovery_status["overall_status"] = "degraded"
elif recovery_status["error_levels"]["warning"] > 0:
recovery_status["overall_status"] = "warning"
else:
recovery_status["overall_status"] = "stable"
# 7. Allgemeine Empfehlungen hinzufügen
if total_errors == 0:
recovery_status["recommendations"].append("System läuft stabil - keine Maßnahmen erforderlich")
elif recovery_status["overall_status"] == "critical":
recovery_status["recommendations"].append("Sofortige Maßnahmen erforderlich - System-Neustart empfohlen")
admin_logger.info(f"Error-Recovery-Status abgerufen: {recovery_status['overall_status']}")
return jsonify({
'success': True,
'error_recovery': recovery_status,
'message': f"Error-Recovery-Status: {recovery_status['status']}"
"success": True,
"recovery_status": recovery_status
})
except Exception as e:
admin_api_logger.error(f"Fehler beim Abrufen des Error-Recovery-Status: {str(e)}")
admin_logger.error(f"Fehler beim Error-Recovery-Status: {str(e)}")
return jsonify({
'success': False,
'error': 'Error-Recovery-Status nicht verfügbar',
'details': str(e),
'error_recovery': {
'status': 'error',
'enabled': False,
'last_check': datetime.now().isoformat()
"success": False,
"error": "Fehler beim Abrufen des Error-Recovery-Status",
"details": str(e),
"recovery_status": {
"overall_status": "error",
"timestamp": datetime.now().isoformat(),
"message": "Error-Recovery-System nicht verfügbar"
}
}), 500