Files
Projektarbeit-MYP/backend/blueprints/admin_unified.py
Till Tomczak a8f27179b5 Es scheint, dass Sie eine Reihe von Dateien und Verzeichnissen in Ihrem Backend-Projekt bearbeitet haben. Hier ist ein zusammenfassender Überblick über die Änderungen:
1. **Entfernung von 'node_modules'**: Es scheint, dass Sie den 'node_modules'-Ordner entfernt oder aktualisiert haben, da einige Dateien wie '.gitignore', 'package
2025-06-19 22:13:49 +02:00

3099 lines
126 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Vereinheitlichter Admin-Blueprint für das MYP 3D-Druck-Management-System
Konsolidierte Implementierung aller Admin-spezifischen Funktionen:
- Benutzerverwaltung und Systemüberwachung (ursprünglich admin.py)
- Erweiterte System-API-Funktionen (ursprünglich admin_api.py)
- System-Backups, Datenbank-Optimierung, Cache-Verwaltung
- Steckdosenschaltzeiten-Übersicht und -verwaltung
Optimierungen:
- Vereinheitlichter admin_required Decorator
- Konsistente Fehlerbehandlung und Logging
- Vollständige API-Kompatibilität zu beiden ursprünglichen Blueprints
Autor: MYP Team - Konsolidiert für IHK-Projektarbeit
Datum: 2025-06-09
"""
import os
import shutil
import zipfile
import sqlite3
import glob
import time
from datetime import datetime, timedelta
from flask import Blueprint, render_template, request, jsonify, redirect, url_for, flash, current_app
from flask_login import login_required, current_user
from functools import wraps
from models import User, Printer, Job, get_cached_session, Stats, SystemLog, PlugStatusLog, GuestRequest
from utils.logging_config import get_logger
# ===== BLUEPRINT CONFIGURATION =====
# Main blueprint for the admin UI (template-rendering routes)
admin_blueprint = Blueprint('admin', __name__, url_prefix='/admin')
# API blueprint for the extended system functions (JSON endpoints)
admin_api_blueprint = Blueprint('admin_api', __name__, url_prefix='/api/admin')
# Separate loggers for the two functional areas
admin_logger = get_logger("admin")
admin_api_logger = get_logger("admin_api")
# ===== UNIFIED ADMIN DECORATOR =====
def admin_required(f):
    """
    Unified decorator enforcing administrator privileges on a view.

    Merges the best practice of both original implementations:
    - comprehensive logging (admin.py style)
    - robust authentication checking (admin_api.py style)
    """
    @wraps(f)
    @login_required
    def decorated_function(*args, **kwargs):
        # Detailed authentication check
        authenticated = current_user.is_authenticated
        uid = current_user.id if authenticated else 'Anonymous'

        # Double admin check for maximum safety:
        # method 1 — the is_admin property (admin.py style),
        # method 2 — the role attribute as fallback (admin_api.py style).
        granted = False
        if authenticated:
            granted = hasattr(current_user, 'is_admin') and current_user.is_admin
            if not granted and hasattr(current_user, 'role'):
                granted = current_user.role == 'admin'

        # Comprehensive audit logging for every admin-gated call
        admin_logger.info(
            f"Admin-Check für Funktion {f.__name__}: "
            f"User authenticated: {authenticated}, "
            f"User ID: {uid}, "
            f"Is Admin: {granted}"
        )

        if not granted:
            admin_logger.warning(
                f"Admin-Zugriff verweigert für User {uid} auf Funktion {f.__name__}"
            )
            return jsonify({
                "error": "Nur Administratoren haben Zugriff",
                "message": "Admin-Berechtigung erforderlich"
            }), 403

        return f(*args, **kwargs)

    return decorated_function
# ===== ADMIN UI ROUTES (originally admin.py) =====
@admin_blueprint.route("/")
@admin_required
def admin_dashboard():
    """Main admin dashboard page with system statistics."""
    try:
        with get_cached_session() as db_session:
            # Collect the basic counters shown on the dashboard
            stats = {
                'total_users': db_session.query(User).count(),
                'total_printers': db_session.query(Printer).count(),
                'total_jobs': db_session.query(Job).count(),
                # Jobs currently in an active state
                'active_jobs': db_session.query(Job).filter(
                    Job.status.in_(['pending', 'printing', 'paused'])
                ).count(),
            }
            admin_logger.info(f"Admin-Dashboard geladen von {current_user.username}")
            return render_template('admin.html', stats=stats, active_tab=None)
    except Exception as e:
        admin_logger.error(f"Fehler beim Laden des Admin-Dashboards: {str(e)}")
        flash("Fehler beim Laden der Dashboard-Daten", "error")
        return render_template('admin.html', stats={}, active_tab=None)
@admin_blueprint.route("/plug-schedules")
@admin_required
def admin_plug_schedules():
    """
    Administrator overview of smart-plug switching times.

    Shows a detailed history of all smart-plug switch events with a
    calendar view.
    """
    admin_logger.info(f"Admin {current_user.username} (ID: {current_user.id}) öffnet Steckdosenschaltzeiten")
    try:
        # Statistics covering the last 24 hours
        plug_stats = PlugStatusLog.get_status_statistics(hours=24)

        # All printers for the filter dropdown (regardless of active state)
        with get_cached_session() as db_session:
            printers = db_session.query(Printer).all()
            return render_template('admin_plug_schedules.html',
                                   stats=plug_stats,
                                   printers=printers,
                                   page_title="Steckdosenschaltzeiten",
                                   breadcrumb=[
                                       {"name": "Admin-Dashboard", "url": url_for("admin.admin_dashboard")},
                                       {"name": "Steckdosenschaltzeiten", "url": "#"}
                                   ])
    except Exception as e:
        admin_logger.error(f"Fehler beim Laden der Steckdosenschaltzeiten-Seite: {str(e)}")
        flash("Fehler beim Laden der Steckdosenschaltzeiten-Daten.", "error")
        return redirect(url_for("admin.admin_dashboard"))
@admin_blueprint.route("/users")
@admin_required
def users_overview():
    """User management overview for administrators."""
    try:
        with get_cached_session() as db_session:
            # All accounts, newest first
            users = db_session.query(User).order_by(User.created_at.desc()).all()
            stats = {
                'total_users': len(users),
                'total_printers': db_session.query(Printer).count(),
                'total_jobs': db_session.query(Job).count(),
                # Jobs currently in an active state
                'active_jobs': db_session.query(Job).filter(
                    Job.status.in_(['pending', 'printing', 'paused'])
                ).count(),
            }
            admin_logger.info(f"Benutzerübersicht geladen von {current_user.username}")
            return render_template('admin.html', stats=stats, users=users, active_tab='users')
    except Exception as e:
        admin_logger.error(f"Fehler beim Laden der Benutzerübersicht: {str(e)}")
        flash("Fehler beim Laden der Benutzerdaten", "error")
        return render_template('admin.html', stats={}, users=[], active_tab='users')
@admin_blueprint.route("/users/add", methods=["GET"])
@admin_required
def add_user_page():
    """Render the form page for creating a new user account."""
    return render_template('admin_add_user.html')
@admin_blueprint.route("/users/<int:user_id>/edit", methods=["GET"])
@admin_required
def edit_user_page(user_id):
    """Render the edit form for an existing user, redirecting if absent."""
    try:
        with get_cached_session() as db_session:
            target = db_session.query(User).filter(User.id == user_id).first()
            if target is None:
                flash("Benutzer nicht gefunden", "error")
                return redirect(url_for('admin.users_overview'))
            return render_template('admin_edit_user.html', user=target)
    except Exception as e:
        admin_logger.error(f"Fehler beim Laden der Benutzer-Bearbeitung: {str(e)}")
        flash("Fehler beim Laden der Benutzerdaten", "error")
        return redirect(url_for('admin.users_overview'))
@admin_blueprint.route("/printers")
@admin_required
def printers_overview():
    """Printer management overview for administrators."""
    try:
        with get_cached_session() as db_session:
            # Restricted to printers at the TBA Marienfelde site, newest first
            printers = db_session.query(Printer).filter(
                Printer.location == "TBA Marienfelde"
            ).order_by(Printer.created_at.desc()).all()
            stats = {
                'total_users': db_session.query(User).count(),
                'total_printers': len(printers),
                'total_jobs': db_session.query(Job).count(),
                # Jobs currently in an active state
                'active_jobs': db_session.query(Job).filter(
                    Job.status.in_(['pending', 'printing', 'paused'])
                ).count(),
                # Simplified online count — no live status polling here
                'online_printers': sum(1 for p in printers if p.status == 'online'),
            }
            admin_logger.info(f"Druckerübersicht geladen von {current_user.username}")
            return render_template('admin.html', stats=stats, printers=printers, active_tab='printers')
    except Exception as e:
        admin_logger.error(f"Fehler beim Laden der Druckerübersicht: {str(e)}")
        flash("Fehler beim Laden der Druckerdaten", "error")
        return render_template('admin.html', stats={}, printers=[], active_tab='printers')
@admin_blueprint.route("/printers/add", methods=["GET"])
@admin_required
def add_printer_page():
    """Render the form page for registering a new printer."""
    return render_template('admin_add_printer.html')
@admin_blueprint.route("/printers/<int:printer_id>/edit", methods=["GET"])
@admin_required
def edit_printer_page(printer_id):
    """Render the edit form for an existing printer, redirecting if absent."""
    try:
        with get_cached_session() as db_session:
            target = db_session.query(Printer).filter(Printer.id == printer_id).first()
            if target is None:
                flash("Drucker nicht gefunden", "error")
                return redirect(url_for('admin.printers_overview'))
            return render_template('admin_edit_printer.html', printer=target)
    except Exception as e:
        admin_logger.error(f"Fehler beim Laden der Drucker-Bearbeitung: {str(e)}")
        flash("Fehler beim Laden der Druckerdaten", "error")
        return redirect(url_for('admin.printers_overview'))
@admin_blueprint.route("/guest-requests")
@admin_required
def guest_requests():
    """Render the guest request overview page."""
    return render_template('admin_guest_requests.html')
@admin_blueprint.route("/advanced-settings")
@admin_required
def advanced_settings():
    """Advanced system settings page.

    Collects basic system statistics and hands the (currently static)
    optimization defaults to the template. Falls back to zeroed statistics
    if the database queries fail.

    Returns:
        Rendered ``admin_advanced_settings.html`` in both the success and
        the fallback path.
    """
    # Default optimization settings for the template. Defined ONCE so the
    # success and error paths cannot drift apart (the original duplicated
    # this dict verbatim in the try and except branches).
    optimization_settings = {
        'algorithm': 'round_robin',
        'consider_distance': True,
        'minimize_changeover': True,
        'auto_optimization_enabled': False,
        'max_batch_size': 10,
        'time_window': 24
    }
    try:
        with get_cached_session() as db_session:
            stats = {
                'total_users': db_session.query(User).count(),
                'total_printers': db_session.query(Printer).count(),
                # Printers that are currently usable (online/available/idle)
                'active_printers': db_session.query(Printer).filter(
                    Printer.status.in_(['online', 'available', 'idle'])
                ).count(),
                'total_jobs': db_session.query(Job).count(),
                # Jobs still waiting to be processed
                'pending_jobs': db_session.query(Job).filter(
                    Job.status.in_(['pending', 'scheduled', 'queued'])
                ).count(),
            }
        admin_logger.info(f"Erweiterte Einstellungen geladen von {current_user.username}")
        return render_template('admin_advanced_settings.html', stats=stats,
                               optimization_settings=optimization_settings)
    except Exception as e:
        admin_logger.error(f"Fehler beim Laden der erweiterten Einstellungen: {str(e)}")
        flash("Fehler beim Laden der Systemdaten", "error")
        # Fallback with zeroed statistics
        stats = {
            'total_users': 0,
            'total_printers': 0,
            'active_printers': 0,
            'total_jobs': 0,
            'pending_jobs': 0
        }
        return render_template('admin_advanced_settings.html', stats=stats,
                               optimization_settings=optimization_settings)
@admin_blueprint.route("/system-health")
@admin_required
def system_health():
    """System health status page."""
    try:
        with get_cached_session() as db_session:
            stats = {
                'total_users': db_session.query(User).count(),
                'total_printers': db_session.query(Printer).count(),
                'total_jobs': db_session.query(Job).count(),
                # Jobs currently in an active state
                'active_jobs': db_session.query(Job).filter(
                    Job.status.in_(['pending', 'printing', 'paused'])
                ).count(),
            }
            admin_logger.info(f"System-Health geladen von {current_user.username}")
            return render_template('admin.html', stats=stats, active_tab='system')
    except Exception as e:
        admin_logger.error(f"Fehler beim Laden des System-Health: {str(e)}")
        flash("Fehler beim Laden der System-Daten", "error")
        return render_template('admin.html', stats={}, active_tab='system')
@admin_blueprint.route("/logs")
@admin_required
def logs_overview():
    """System log overview page."""
    try:
        with get_cached_session() as db_session:
            stats = {
                'total_users': db_session.query(User).count(),
                'total_printers': db_session.query(Printer).count(),
                'total_jobs': db_session.query(Job).count(),
                # Jobs currently in an active state
                'active_jobs': db_session.query(Job).filter(
                    Job.status.in_(['pending', 'printing', 'paused'])
                ).count(),
            }
            # Load the newest entries; tolerate a missing SystemLog model/table
            try:
                recent_logs = db_session.query(SystemLog).order_by(
                    SystemLog.timestamp.desc()).limit(50).all()
            except Exception:
                recent_logs = []
            admin_logger.info(f"Logs-Übersicht geladen von {current_user.username}")
            return render_template('admin.html', stats=stats, logs=recent_logs, active_tab='logs')
    except Exception as e:
        admin_logger.error(f"Fehler beim Laden der Logs-Übersicht: {str(e)}")
        flash("Fehler beim Laden der Log-Daten", "error")
        return render_template('admin.html', stats={}, logs=[], active_tab='logs')
@admin_blueprint.route("/maintenance")
@admin_required
def maintenance():
    """Maintenance page."""
    try:
        with get_cached_session() as db_session:
            stats = {
                'total_users': db_session.query(User).count(),
                'total_printers': db_session.query(Printer).count(),
                'total_jobs': db_session.query(Job).count(),
                # Jobs currently in an active state
                'active_jobs': db_session.query(Job).filter(
                    Job.status.in_(['pending', 'printing', 'paused'])
                ).count(),
            }
            admin_logger.info(f"Wartungsseite geladen von {current_user.username}")
            return render_template('admin.html', stats=stats, active_tab='maintenance')
    except Exception as e:
        admin_logger.error(f"Fehler beim Laden der Wartungsseite: {str(e)}")
        flash("Fehler beim Laden der Wartungsdaten", "error")
        return render_template('admin.html', stats={}, active_tab='maintenance')
# ===== USER CRUD API (originally admin.py) =====
@admin_api_blueprint.route("/users", methods=["POST"])
@admin_required
def create_user_api():
    """API endpoint for creating a new user.

    Expects a JSON body with at least ``username``, ``email``, ``password``
    and ``name``. Optional: ``role``, ``department``, ``position``,
    ``phone``, ``bio`` plus the granular permission flags
    ``can_start_jobs``, ``needs_approval``, ``can_approve_jobs``.

    Returns:
        JSON with the new user's id on success, 400 on validation errors
        (missing fields, duplicate username/email), 500 on unexpected
        failures.
    """
    try:
        # BUGFIX: get_json() yields None for a missing/non-JSON body; the
        # old code then crashed on `field not in data` and answered with a
        # generic 500 instead of a proper 400 validation error.
        data = request.get_json(silent=True) or {}
        # Validate required fields
        required_fields = ['username', 'email', 'password', 'name']
        for field in required_fields:
            if field not in data or not data[field]:
                return jsonify({"error": f"Feld '{field}' ist erforderlich"}), 400
        with get_cached_session() as db_session:
            # Reject duplicates on username or e-mail
            existing_user = db_session.query(User).filter(
                (User.username == data['username']) | (User.email == data['email'])
            ).first()
            if existing_user:
                return jsonify({"error": "Benutzername oder E-Mail bereits vergeben"}), 400
            # Create the new user
            new_user = User(
                username=data['username'],
                email=data['email'],
                name=data['name'],
                role=data.get('role', 'user'),
                department=data.get('department'),
                position=data.get('position'),
                phone=data.get('phone'),
                bio=data.get('bio')
            )
            new_user.set_password(data['password'])
            db_session.add(new_user)
            db_session.flush()  # generate the id needed for UserPermission
            # Create granular permissions
            from models import UserPermission
            permissions = UserPermission(
                user_id=new_user.id,
                can_start_jobs=data.get('can_start_jobs', True),      # default: may start jobs
                needs_approval=data.get('needs_approval', False),     # default: no approval needed
                can_approve_jobs=data.get('can_approve_jobs', False)  # default: may not approve
            )
            # Administrators automatically receive approval rights
            if new_user.role == 'admin':
                permissions.can_approve_jobs = True
                permissions.can_start_jobs = True
                permissions.needs_approval = False
            db_session.add(permissions)
            db_session.commit()
            admin_logger.info(f"Neuer Benutzer erstellt: {new_user.username} von Admin {current_user.username}")
            return jsonify({
                "success": True,
                "message": "Benutzer erfolgreich erstellt",
                "user_id": new_user.id
            })
    except Exception as e:
        admin_logger.error(f"Fehler beim Erstellen des Benutzers: {str(e)}")
        return jsonify({"error": "Fehler beim Erstellen des Benutzers"}), 500
@admin_api_blueprint.route("/users/<int:user_id>", methods=["GET"])
@admin_required
def get_user_api(user_id):
    """API endpoint returning a user's data as JSON (404 if unknown)."""
    try:
        with get_cached_session() as db_session:
            user = db_session.query(User).filter(User.id == user_id).first()
            if user is None:
                return jsonify({"error": "Benutzer nicht gefunden"}), 404
            # Serialize the user; datetimes become ISO-8601 strings or None
            payload = {
                "id": user.id,
                "username": user.username,
                "email": user.email,
                "name": user.name,
                "role": user.role,
                "active": user.active,
                "created_at": user.created_at.isoformat() if user.created_at else None,
                "last_login": user.last_login.isoformat() if user.last_login else None,
                "department": user.department,
                "position": user.position,
                "phone": user.phone,
                "bio": user.bio,
            }
            return jsonify(payload)
    except Exception as e:
        admin_logger.error(f"Fehler beim Abrufen der Benutzerdaten: {str(e)}")
        return jsonify({"error": "Fehler beim Abrufen der Benutzerdaten"}), 500
@admin_api_blueprint.route("/users/<int:user_id>", methods=["PUT"])
@admin_required
def update_user_api(user_id):
    """API endpoint for updating user data.

    Accepts a JSON body with any of: username, email, name, role, active,
    department, position, phone, bio, password (the password is hashed via
    ``set_password`` rather than stored directly).

    Returns:
        Success JSON, 400 for a missing/empty JSON body, 404 if the user
        does not exist, 500 on unexpected failures.
    """
    try:
        # BUGFIX: get_json() yields None for a missing/non-JSON body; the
        # old code then crashed on `'password' in data` and answered with
        # a generic 500 instead of a proper 400.
        data = request.get_json(silent=True)
        if not data:
            return jsonify({"error": "Keine Daten übermittelt"}), 400
        with get_cached_session() as db_session:
            user = db_session.query(User).filter(User.id == user_id).first()
            if not user:
                return jsonify({"error": "Benutzer nicht gefunden"}), 404
            # Fields that may be updated directly
            updatable_fields = ['username', 'email', 'name', 'role', 'active',
                                'department', 'position', 'phone', 'bio']
            for field in updatable_fields:
                if field in data:
                    setattr(user, field, data[field])
            # Password is handled separately so it gets hashed
            if 'password' in data and data['password']:
                user.set_password(data['password'])
            user.updated_at = datetime.now()
            db_session.commit()
            admin_logger.info(f"Benutzer {user.username} aktualisiert von Admin {current_user.username}")
            return jsonify({
                "success": True,
                "message": "Benutzer erfolgreich aktualisiert"
            })
    except Exception as e:
        admin_logger.error(f"Fehler beim Aktualisieren des Benutzers: {str(e)}")
        return jsonify({"error": "Fehler beim Aktualisieren des Benutzers"}), 500
@admin_api_blueprint.route("/users/<int:user_id>", methods=["DELETE"])
@admin_required
def delete_user_api(user_id):
    """Delete a user via the API, refusing to remove the last administrator."""
    try:
        with get_cached_session() as db_session:
            target = db_session.query(User).filter(User.id == user_id).first()
            if target is None:
                return jsonify({"error": "Benutzer nicht gefunden"}), 404
            # Never delete the final admin account.
            # NOTE(review): filtering on User.is_admin assumes it is a mapped
            # column / hybrid attribute rather than a plain Python property —
            # confirm against the model definition.
            if target.is_admin:
                admin_count = db_session.query(User).filter(User.is_admin == True).count()
                if admin_count <= 1:
                    return jsonify({"error": "Der letzte Administrator kann nicht gelöscht werden"}), 400
            username = target.username
            db_session.delete(target)
            db_session.commit()
            admin_logger.info(f"Benutzer {username} gelöscht von Admin {current_user.username}")
            return jsonify({
                "success": True,
                "message": "Benutzer erfolgreich gelöscht"
            })
    except Exception as e:
        admin_logger.error(f"Fehler beim Löschen des Benutzers: {str(e)}")
        return jsonify({"error": "Fehler beim Löschen des Benutzers"}), 500
# ===== PRINTER API ROUTES =====
@admin_api_blueprint.route("/printers/<int:printer_id>", methods=["DELETE"])
@admin_required
def delete_printer_api(printer_id):
    """Delete a printer via the API together with all dependent records.

    Deletion order matters because of foreign-key constraints:
    1. nullable FKs (GuestRequest.printer_id / assigned_printer_id) are
       set to NULL,
    2. non-nullable dependents (JobOrder, PlugStatusLog) are deleted,
    3. jobs are deleted explicitly (even though a CASCADE may exist),
    4. finally the printer row itself is removed and the model cache
       invalidated.

    Returns:
        JSON success message listing the removed dependencies, 404 if the
        printer does not exist, or 500 on any other error.
    """
    try:
        # Local import — these names are needed only here
        # (NOTE(review): presumably avoids a circular import; confirm).
        from models import get_db_session, Printer, Job, GuestRequest, JobOrder, PlugStatusLog
        with get_db_session() as db_session:
            printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
            if not printer:
                return jsonify({"error": "Drucker nicht gefunden"}), 404
            # Remember identifying details for logging after deletion
            printer_name = printer.name
            printer_location = printer.location
            deleted_items = []
            # 1. Set nullable foreign keys to NULL (GuestRequest)
            guest_requests_printer = db_session.query(GuestRequest).filter(GuestRequest.printer_id == printer_id).count()
            if guest_requests_printer > 0:
                db_session.query(GuestRequest).filter(GuestRequest.printer_id == printer_id).update({GuestRequest.printer_id: None})
                deleted_items.append(f"{guest_requests_printer} Gastanfragen aktualisiert")
            guest_requests_assigned = db_session.query(GuestRequest).filter(GuestRequest.assigned_printer_id == printer_id).count()
            if guest_requests_assigned > 0:
                db_session.query(GuestRequest).filter(GuestRequest.assigned_printer_id == printer_id).update({GuestRequest.assigned_printer_id: None})
                deleted_items.append(f"{guest_requests_assigned} zugewiesene Gastanfragen aktualisiert")
            # 2. Delete rows with non-nullable foreign keys
            job_orders_count = db_session.query(JobOrder).filter(JobOrder.printer_id == printer_id).count()
            if job_orders_count > 0:
                db_session.query(JobOrder).filter(JobOrder.printer_id == printer_id).delete()
                deleted_items.append(f"{job_orders_count} Auftragsbestellungen gelöscht")
            plug_logs_count = db_session.query(PlugStatusLog).filter(PlugStatusLog.printer_id == printer_id).count()
            if plug_logs_count > 0:
                db_session.query(PlugStatusLog).filter(PlugStatusLog.printer_id == printer_id).delete()
                deleted_items.append(f"{plug_logs_count} Plug-Status-Logs gelöscht")
            # 3. Delete jobs explicitly (even if CASCADE exists)
            jobs_count = db_session.query(Job).filter(Job.printer_id == printer_id).count()
            if jobs_count > 0:
                db_session.query(Job).filter(Job.printer_id == printer_id).delete()
                deleted_items.append(f"{jobs_count} Jobs gelöscht")
            # 4. Remove the printer itself and commit everything at once
            db_session.delete(printer)
            db_session.commit()
            # Invalidate the model cache so stale printer data is not served
            from models import invalidate_model_cache
            invalidate_model_cache("Printer", printer_id)
            admin_logger.info(f"Drucker '{printer_name}' (ID: {printer_id}, Standort: {printer_location}) und alle Abhängigkeiten gelöscht von Admin {current_user.username}")
            if deleted_items:
                admin_logger.info(f"Gelöschte Abhängigkeiten: {', '.join(deleted_items)}")
            success_message = f"Drucker '{printer_name}' erfolgreich gelöscht"
            if deleted_items:
                success_message += f" (einschließlich: {', '.join(deleted_items)})"
            return jsonify({
                "success": True,
                "message": success_message
            })
    except Exception as e:
        admin_logger.error(f"Fehler beim Löschen des Druckers {printer_id}: {str(e)}")
        return jsonify({"error": "Fehler beim Löschen des Druckers"}), 500
# ===== EXTENDED SYSTEM API (originally admin_api.py) =====
@admin_api_blueprint.route('/backup/create', methods=['POST'])
@admin_required
def create_backup():
    """
    Create a manual system backup.

    Builds a ZIP archive of all important system data: the SQLite database
    (including WAL/SHM side files), configuration files, user uploads
    (capped at 1000 files, each < 50 MB) and the 100 newest log files.
    Each section is wrapped in its own try/except so a failure in one
    section only logs a warning and the backup continues.

    Returns:
        JSON: success status plus backup metadata (filename, size, file
        count, timestamp, path); 500 with the error message on failure.
    """
    try:
        admin_api_logger.info(f"Backup-Erstellung angefordert von Admin {current_user.username}")
        # Ensure the backup directory exists (relative to the project root)
        backup_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'database', 'backups')
        os.makedirs(backup_dir, exist_ok=True)
        # Unique, timestamped backup name
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_name = f"system_backup_{timestamp}.zip"
        backup_path = os.path.join(backup_dir, backup_name)
        created_files = []
        backup_size = 0
        with zipfile.ZipFile(backup_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            # 1. Database files
            try:
                from utils.utilities_collection import DATABASE_PATH
                if os.path.exists(DATABASE_PATH):
                    zipf.write(DATABASE_PATH, 'database/main.db')
                    created_files.append('database/main.db')
                    admin_api_logger.debug("✅ Hauptdatenbank zur Sicherung hinzugefügt")
                    # WAL and SHM side files, if present
                    wal_path = DATABASE_PATH + '-wal'
                    shm_path = DATABASE_PATH + '-shm'
                    if os.path.exists(wal_path):
                        zipf.write(wal_path, 'database/main.db-wal')
                        created_files.append('database/main.db-wal')
                    if os.path.exists(shm_path):
                        zipf.write(shm_path, 'database/main.db-shm')
                        created_files.append('database/main.db-shm')
            except Exception as db_error:
                admin_api_logger.warning(f"Fehler beim Hinzufügen der Datenbank: {str(db_error)}")
            # 2. Configuration files (code/JSON/YAML/TOML only)
            try:
                config_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'config')
                if os.path.exists(config_dir):
                    for root, dirs, files in os.walk(config_dir):
                        for file in files:
                            if file.endswith(('.py', '.json', '.yaml', '.yml', '.toml')):
                                file_path = os.path.join(root, file)
                                # Archive path is kept relative to the project root
                                arc_path = os.path.relpath(file_path, os.path.dirname(os.path.dirname(__file__)))
                                zipf.write(file_path, arc_path)
                                created_files.append(arc_path)
                    admin_api_logger.debug("✅ Konfigurationsdateien zur Sicherung hinzugefügt")
            except Exception as config_error:
                admin_api_logger.warning(f"Fehler beim Hinzufügen der Konfiguration: {str(config_error)}")
            # 3. Important user uploads (limited to the first 1000 files)
            try:
                uploads_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'uploads')
                if os.path.exists(uploads_dir):
                    file_count = 0
                    max_files = 1000  # limit for performance reasons
                    for root, dirs, files in os.walk(uploads_dir):
                        # Slice keeps at most the remaining quota per directory
                        for file in files[:max_files - file_count]:
                            if file_count >= max_files:
                                break
                            file_path = os.path.join(root, file)
                            file_size = os.path.getsize(file_path)
                            # Only include files below 50 MB
                            if file_size < 50 * 1024 * 1024:
                                arc_path = os.path.relpath(file_path, os.path.dirname(os.path.dirname(__file__)))
                                zipf.write(file_path, arc_path)
                                created_files.append(arc_path)
                                file_count += 1
                        if file_count >= max_files:
                            break
                    admin_api_logger.debug(f"{file_count} Upload-Dateien zur Sicherung hinzugefügt")
            except Exception as uploads_error:
                admin_api_logger.warning(f"Fehler beim Hinzufügen der Uploads: {str(uploads_error)}")
            # 4. System logs (only the 100 newest log files)
            try:
                logs_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'logs')
                if os.path.exists(logs_dir):
                    log_files = []
                    for root, dirs, files in os.walk(logs_dir):
                        for file in files:
                            if file.endswith(('.log', '.txt')):
                                file_path = os.path.join(root, file)
                                log_files.append((file_path, os.path.getmtime(file_path)))
                    # Sort by mtime (newest first) and take only the last 100
                    log_files.sort(key=lambda x: x[1], reverse=True)
                    for file_path, _ in log_files[:100]:
                        arc_path = os.path.relpath(file_path, os.path.dirname(os.path.dirname(__file__)))
                        zipf.write(file_path, arc_path)
                        created_files.append(arc_path)
                    admin_api_logger.debug(f"{len(log_files[:100])} Log-Dateien zur Sicherung hinzugefügt")
            except Exception as logs_error:
                admin_api_logger.warning(f"Fehler beim Hinzufügen der Logs: {str(logs_error)}")
        # Determine the final backup size after the archive is closed
        if os.path.exists(backup_path):
            backup_size = os.path.getsize(backup_path)
        admin_api_logger.info(f"✅ System-Backup erfolgreich erstellt: {backup_name} ({backup_size / 1024 / 1024:.2f} MB)")
        return jsonify({
            'success': True,
            'message': f'Backup erfolgreich erstellt: {backup_name}',
            'backup_info': {
                'filename': backup_name,
                'size_bytes': backup_size,
                'size_mb': round(backup_size / 1024 / 1024, 2),
                'files_count': len(created_files),
                'created_at': datetime.now().isoformat(),
                'path': backup_path
            }
        })
    except Exception as e:
        admin_api_logger.error(f"❌ Fehler beim Erstellen des Backups: {str(e)}")
        return jsonify({
            'success': False,
            'message': f'Fehler beim Erstellen des Backups: {str(e)}'
        }), 500
@admin_api_blueprint.route('/printers/<int:printer_id>/toggle', methods=['POST'])
@admin_required
def toggle_printer_power(printer_id):
    """
    Toggle the smart-plug socket of a printer on/off.

    Args:
        printer_id: id of the printer to control.

    JSON parameters:
        reason: optional reason for the switch action.

    Returns:
        JSON describing the toggle result, 404 if the printer is unknown,
        400 if no plug is configured, 500 on plug-communication or system
        errors.
    """
    admin_api_logger.info(f"🔌 Smart-Plug Toggle für Drucker {printer_id} von Admin {current_user.name}")
    try:
        # Read optional parameters
        data = request.get_json() or {}
        reason = data.get("reason", "Admin-Panel Toggle")
        # BUGFIX: use the session as a context manager, consistent with the
        # rest of this module — the old code called get_cached_session()
        # bare and leaked the session on every early return and exception.
        with get_cached_session() as db_session:
            printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
            if not printer:
                return jsonify({
                    "success": False,
                    "error": f"Drucker mit ID {printer_id} nicht gefunden"
                }), 404
            # The plug must be fully configured (IP + credentials)
            if not printer.plug_ip or not printer.plug_username or not printer.plug_password:
                return jsonify({
                    "success": False,
                    "error": f"Drucker {printer.name} hat keine Steckdose konfiguriert"
                }), 400
            # Determine the current plug state and flip it
            try:
                from PyP100 import PyP110
                p110 = PyP110.P110(printer.plug_ip, printer.plug_username, printer.plug_password)
                p110.handshake()
                p110.login()
                # Query the current relay state
                device_info = p110.getDeviceInfo()
                current_status = device_info["result"]["device_on"]
                # Perform the toggle action
                if current_status:
                    p110.turnOff()
                    new_status = "off"
                    action = "ausgeschaltet"
                    printer.status = "offline"
                else:
                    p110.turnOn()
                    new_status = "on"
                    action = "eingeschaltet"
                    printer.status = "starting"
                # Persist the updated printer state
                printer.last_checked = datetime.now()
                db_session.commit()
                admin_api_logger.info(f"✅ Drucker {printer.name} erfolgreich {action} | Grund: {reason}")
                return jsonify({
                    "success": True,
                    "message": f"Drucker {printer.name} erfolgreich {action}",
                    "printer": {
                        "id": printer_id,
                        "name": printer.name,
                        "model": printer.model,
                        "location": printer.location
                    },
                    "toggle_result": {
                        "previous_status": "on" if current_status else "off",
                        "new_status": new_status,
                        "action": action,
                        "reason": reason
                    },
                    "performed_by": {
                        "id": current_user.id,
                        "name": current_user.name
                    },
                    "timestamp": datetime.now().isoformat()
                })
            except Exception as tapo_error:
                admin_api_logger.error(f"❌ Tapo-Fehler für Drucker {printer.name}: {str(tapo_error)}")
                return jsonify({
                    "success": False,
                    "error": f"Fehler bei Steckdosensteuerung: {str(tapo_error)}"
                }), 500
    except Exception as e:
        admin_api_logger.error(f"❌ Allgemeiner Fehler bei Toggle-Aktion: {str(e)}")
        return jsonify({
            "success": False,
            "error": f"Systemfehler: {str(e)}"
        }), 500
@admin_api_blueprint.route('/database/optimize', methods=['POST'])
@admin_required
def optimize_database():
    """
    Run a database optimization pass.

    Optimizes the SQLite database via integrity check, WAL checkpoint,
    ANALYZE and VACUUM, plus a few PRAGMA-based performance settings.

    Returns:
        JSON: success flag and optimization statistics (which steps
        completed, size before/after in MB, space saved).
    """
    try:
        admin_api_logger.info(f"Datenbank-Optimierung angefordert von Admin {current_user.username}")
        from utils.utilities_collection import DATABASE_PATH
        # Track which maintenance steps succeeded and the size effect
        optimization_results = {
            'vacuum_completed': False,
            'analyze_completed': False,
            'integrity_check': False,
            'wal_checkpoint': False,
            'size_before': 0,
            'size_after': 0,
            'space_saved': 0
        }
        # Database file size before optimization (bytes)
        if os.path.exists(DATABASE_PATH):
            optimization_results['size_before'] = os.path.getsize(DATABASE_PATH)
        # Dedicated raw sqlite3 connection (outside the ORM); generous
        # timeout because VACUUM needs an exclusive lock
        conn = sqlite3.connect(DATABASE_PATH, timeout=30.0)
        cursor = conn.cursor()
        try:
            # 1. Integrity check
            admin_api_logger.debug("🔍 Führe Integritätsprüfung durch...")
            cursor.execute("PRAGMA integrity_check")
            integrity_result = cursor.fetchone()
            optimization_results['integrity_check'] = integrity_result[0] == 'ok'
            if not optimization_results['integrity_check']:
                admin_api_logger.warning(f"⚠️ Integritätsprüfung ergab: {integrity_result[0]}")
            else:
                admin_api_logger.debug("✅ Integritätsprüfung erfolgreich")
            # 2. WAL checkpoint (only effective if WAL mode is active)
            try:
                admin_api_logger.debug("🔄 Führe WAL-Checkpoint durch...")
                cursor.execute("PRAGMA wal_checkpoint(TRUNCATE)")
                optimization_results['wal_checkpoint'] = True
                admin_api_logger.debug("✅ WAL-Checkpoint erfolgreich")
            except Exception as wal_error:
                # Non-fatal: the database may not be in WAL mode
                admin_api_logger.debug(f" WAL-Checkpoint nicht möglich: {str(wal_error)}")
            # 3. ANALYZE - refresh query-planner statistics
            admin_api_logger.debug("📊 Aktualisiere Datenbank-Statistiken...")
            cursor.execute("ANALYZE")
            optimization_results['analyze_completed'] = True
            admin_api_logger.debug("✅ ANALYZE erfolgreich")
            # 4. VACUUM - compact and reorganize the database file
            admin_api_logger.debug("🗜️ Komprimiere und reorganisiere Datenbank...")
            cursor.execute("VACUUM")
            optimization_results['vacuum_completed'] = True
            admin_api_logger.debug("✅ VACUUM erfolgreich")
            # 5. Performance tuning via PRAGMAs
            try:
                # Larger page cache
                cursor.execute("PRAGMA cache_size = 10000")  # 10MB Cache
                # WAL journal mode for better concurrent performance
                cursor.execute("PRAGMA journal_mode = WAL")
                # NORMAL synchronous: balance between performance and safety
                cursor.execute("PRAGMA synchronous = NORMAL")
                # NOTE(review): page_size only takes effect on a subsequent
                # VACUUM, so setting it here (after VACUUM) has no immediate
                # effect — confirm whether that is intended.
                cursor.execute("PRAGMA page_size = 4096")
                admin_api_logger.debug("✅ Performance-Optimierungen angewendet")
            except Exception as perf_error:
                admin_api_logger.warning(f"⚠️ Performance-Optimierungen teilweise fehlgeschlagen: {str(perf_error)}")
        finally:
            # Always release the raw connection
            cursor.close()
            conn.close()
        # Database file size after optimization (bytes)
        if os.path.exists(DATABASE_PATH):
            optimization_results['size_after'] = os.path.getsize(DATABASE_PATH)
            optimization_results['space_saved'] = optimization_results['size_before'] - optimization_results['size_after']
        # Log the outcome
        space_saved_mb = optimization_results['space_saved'] / 1024 / 1024
        admin_api_logger.info(f"✅ Datenbank-Optimierung abgeschlossen - {space_saved_mb:.2f} MB Speicher gespart")
        return jsonify({
            'success': True,
            'message': 'Datenbank erfolgreich optimiert',
            'results': {
                'vacuum_completed': optimization_results['vacuum_completed'],
                'analyze_completed': optimization_results['analyze_completed'],
                'integrity_check_passed': optimization_results['integrity_check'],
                'wal_checkpoint_completed': optimization_results['wal_checkpoint'],
                'size_before_mb': round(optimization_results['size_before'] / 1024 / 1024, 2),
                'size_after_mb': round(optimization_results['size_after'] / 1024 / 1024, 2),
                'space_saved_mb': round(space_saved_mb, 2),
                'optimization_timestamp': datetime.now().isoformat()
            }
        })
    except Exception as e:
        admin_api_logger.error(f"❌ Fehler bei Datenbank-Optimierung: {str(e)}")
        return jsonify({
            'success': False,
            'message': f'Fehler bei Datenbank-Optimierung: {str(e)}'
        }), 500
@admin_api_blueprint.route('/cache/clear', methods=['POST'])
@admin_required
def clear_cache():
    """
    Clear the system cache.

    Removes temporary files, cache directories and Python bytecode to
    free disk space and improve performance. Also prunes log files
    older than 30 days and, if available, the application-level model
    cache. All deletions are best-effort: individual failures are
    skipped and only logged per category.

    Returns:
        JSON: success flag and deletion statistics per category.
    """
    try:
        admin_api_logger.info(f"Cache-Leerung angefordert von Admin {current_user.username}")
        # Aggregated counters plus a per-category breakdown
        cleared_stats = {
            'files_deleted': 0,
            'dirs_deleted': 0,
            'space_freed': 0,
            'categories': {}
        }
        # Application root = parent directory of this blueprints/ package
        app_root = os.path.dirname(os.path.dirname(__file__))
        # 1. Clear Python bytecode cache (__pycache__)
        try:
            pycache_count = 0
            pycache_size = 0
            for root, dirs, files in os.walk(app_root):
                # Substring match: any walked path containing '__pycache__'
                if '__pycache__' in root:
                    for file in files:
                        file_path = os.path.join(root, file)
                        try:
                            pycache_size += os.path.getsize(file_path)
                            os.remove(file_path)
                            pycache_count += 1
                        except Exception:
                            # Best effort: skip files that cannot be removed
                            pass
                    # Try to remove the now-empty __pycache__ directory
                    try:
                        os.rmdir(root)
                        cleared_stats['dirs_deleted'] += 1
                    except Exception:
                        pass
            cleared_stats['categories']['python_bytecode'] = {
                'files': pycache_count,
                'size_mb': round(pycache_size / 1024 / 1024, 2)
            }
            cleared_stats['files_deleted'] += pycache_count
            cleared_stats['space_freed'] += pycache_size
            admin_api_logger.debug(f"✅ Python-Bytecode-Cache: {pycache_count} Dateien, {pycache_size / 1024 / 1024:.2f} MB")
        except Exception as pycache_error:
            admin_api_logger.warning(f"⚠️ Fehler beim Leeren des Python-Cache: {str(pycache_error)}")
        # 2. Temporary files under uploads/temp
        try:
            temp_count = 0
            temp_size = 0
            temp_dir = os.path.join(app_root, 'uploads', 'temp')
            if os.path.exists(temp_dir):
                for root, dirs, files in os.walk(temp_dir):
                    for file in files:
                        file_path = os.path.join(root, file)
                        try:
                            temp_size += os.path.getsize(file_path)
                            os.remove(file_path)
                            temp_count += 1
                        except Exception:
                            pass
            cleared_stats['categories']['temp_uploads'] = {
                'files': temp_count,
                'size_mb': round(temp_size / 1024 / 1024, 2)
            }
            cleared_stats['files_deleted'] += temp_count
            cleared_stats['space_freed'] += temp_size
            admin_api_logger.debug(f"✅ Temporäre Upload-Dateien: {temp_count} Dateien, {temp_size / 1024 / 1024:.2f} MB")
        except Exception as temp_error:
            admin_api_logger.warning(f"⚠️ Fehler beim Leeren des Temp-Verzeichnisses: {str(temp_error)}")
        # 3. System cache directories (if present)
        try:
            cache_count = 0
            cache_size = 0
            cache_dirs = [
                os.path.join(app_root, 'static', 'cache'),
                os.path.join(app_root, 'cache'),
                os.path.join(app_root, '.cache')
            ]
            for cache_dir in cache_dirs:
                if os.path.exists(cache_dir):
                    for root, dirs, files in os.walk(cache_dir):
                        for file in files:
                            file_path = os.path.join(root, file)
                            try:
                                cache_size += os.path.getsize(file_path)
                                os.remove(file_path)
                                cache_count += 1
                            except Exception:
                                pass
            cleared_stats['categories']['system_cache'] = {
                'files': cache_count,
                'size_mb': round(cache_size / 1024 / 1024, 2)
            }
            cleared_stats['files_deleted'] += cache_count
            cleared_stats['space_freed'] += cache_size
            admin_api_logger.debug(f"✅ System-Cache: {cache_count} Dateien, {cache_size / 1024 / 1024:.2f} MB")
        except Exception as cache_error:
            admin_api_logger.warning(f"⚠️ Fehler beim Leeren des System-Cache: {str(cache_error)}")
        # 4. Old log files (older than 30 days)
        try:
            logs_count = 0
            logs_size = 0
            logs_dir = os.path.join(app_root, 'logs')
            cutoff_date = datetime.now().timestamp() - (30 * 24 * 60 * 60)  # 30 days
            if os.path.exists(logs_dir):
                for root, dirs, files in os.walk(logs_dir):
                    for file in files:
                        # Only plain/rotated log files
                        if file.endswith(('.log', '.log.1', '.log.2', '.log.3')):
                            file_path = os.path.join(root, file)
                            try:
                                # Delete only if the mtime is past the cutoff
                                if os.path.getmtime(file_path) < cutoff_date:
                                    logs_size += os.path.getsize(file_path)
                                    os.remove(file_path)
                                    logs_count += 1
                            except Exception:
                                pass
            cleared_stats['categories']['old_logs'] = {
                'files': logs_count,
                'size_mb': round(logs_size / 1024 / 1024, 2)
            }
            cleared_stats['files_deleted'] += logs_count
            cleared_stats['space_freed'] += logs_size
            admin_api_logger.debug(f"✅ Alte Log-Dateien: {logs_count} Dateien, {logs_size / 1024 / 1024:.2f} MB")
        except Exception as logs_error:
            admin_api_logger.warning(f"⚠️ Fehler beim Leeren alter Log-Dateien: {str(logs_error)}")
        # 5. Application-level cache (if the models module provides one)
        try:
            from models import clear_model_cache
            clear_model_cache()
            admin_api_logger.debug("✅ Application-Level Cache geleert")
        except (ImportError, AttributeError):
            admin_api_logger.debug(" Kein Application-Level Cache verfügbar")
        # Summarize results
        total_space_mb = cleared_stats['space_freed'] / 1024 / 1024
        admin_api_logger.info(f"✅ Cache-Leerung abgeschlossen: {cleared_stats['files_deleted']} Dateien, {total_space_mb:.2f} MB freigegeben")
        return jsonify({
            'success': True,
            'message': f'Cache erfolgreich geleert - {total_space_mb:.2f} MB freigegeben',
            'statistics': {
                'total_files_deleted': cleared_stats['files_deleted'],
                'total_dirs_deleted': cleared_stats['dirs_deleted'],
                'total_space_freed_mb': round(total_space_mb, 2),
                'categories': cleared_stats['categories'],
                'cleanup_timestamp': datetime.now().isoformat()
            }
        })
    except Exception as e:
        admin_api_logger.error(f"❌ Fehler beim Leeren des Cache: {str(e)}")
        return jsonify({
            'success': False,
            'message': f'Fehler beim Leeren des Cache: {str(e)}'
        }), 500
# ===== API-ENDPUNKTE FÜR LOGS =====
@admin_api_blueprint.route("/logs", methods=["GET"])
@admin_required
def get_logs_api():
"""API-Endpunkt zum Abrufen von System-Logs"""
try:
level = request.args.get('level', 'all')
limit = min(int(request.args.get('limit', 100)), 1000) # Max 1000 Logs
with get_cached_session() as db_session:
query = db_session.query(SystemLog)
# Filter nach Log-Level falls spezifiziert
if level != 'all':
query = query.filter(SystemLog.level == level.upper())
# Logs laden
logs = query.order_by(SystemLog.timestamp.desc()).limit(limit).all()
# In Dictionary konvertieren
logs_data = []
for log in logs:
logs_data.append({
'id': log.id,
'level': log.level,
'message': log.message,
'timestamp': log.timestamp.isoformat() if log.timestamp else None,
'module': getattr(log, 'module', ''),
'user_id': getattr(log, 'user_id', None),
'ip_address': getattr(log, 'ip_address', '')
})
admin_logger.info(f"Logs abgerufen: {len(logs_data)} Einträge, Level: {level}")
return jsonify({
"success": True,
"logs": logs_data,
"count": len(logs_data),
"level": level
})
except Exception as e:
admin_logger.error(f"Fehler beim Abrufen der Logs: {str(e)}")
return jsonify({"error": "Fehler beim Laden der Logs"}), 500
@admin_api_blueprint.route("/logs/export", methods=["POST"])
@admin_required
def export_logs_api():
"""API-Endpunkt zum Exportieren von System-Logs"""
try:
data = request.get_json() or {}
level = data.get('level', 'all')
format_type = data.get('format', 'json') # json, csv, txt
with get_cached_session() as db_session:
query = db_session.query(SystemLog)
# Filter nach Log-Level falls spezifiziert
if level != 'all':
query = query.filter(SystemLog.level == level.upper())
# Alle Logs für Export laden
logs = query.order_by(SystemLog.timestamp.desc()).all()
# Export-Format bestimmen
if format_type == 'csv':
import csv
import io
output = io.StringIO()
writer = csv.writer(output)
# Header schreiben
writer.writerow(['Timestamp', 'Level', 'Module', 'Message', 'User ID', 'IP Address'])
# Daten schreiben
for log in logs:
writer.writerow([
log.timestamp.isoformat() if log.timestamp else '',
log.level,
getattr(log, 'module', ''),
log.message,
getattr(log, 'user_id', ''),
getattr(log, 'ip_address', '')
])
content = output.getvalue()
output.close()
return jsonify({
"success": True,
"content": content,
"filename": f"system_logs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
"content_type": "text/csv"
})
elif format_type == 'txt':
lines = []
for log in logs:
timestamp = log.timestamp.strftime('%Y-%m-%d %H:%M:%S') if log.timestamp else 'Unknown'
lines.append(f"[{timestamp}] {log.level}: {log.message}")
content = '\n'.join(lines)
return jsonify({
"success": True,
"content": content,
"filename": f"system_logs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt",
"content_type": "text/plain"
})
else: # JSON format
logs_data = []
for log in logs:
logs_data.append({
'id': log.id,
'level': log.level,
'message': log.message,
'timestamp': log.timestamp.isoformat() if log.timestamp else None,
'module': getattr(log, 'module', ''),
'user_id': getattr(log, 'user_id', None),
'ip_address': getattr(log, 'ip_address', '')
})
import json
content = json.dumps(logs_data, indent=2, ensure_ascii=False)
return jsonify({
"success": True,
"content": content,
"filename": f"system_logs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
"content_type": "application/json"
})
except Exception as e:
admin_logger.error(f"Fehler beim Exportieren der Logs: {str(e)}")
return jsonify({"error": "Fehler beim Exportieren der Logs"}), 500
# ===== GAST-OTP-MANAGEMENT FÜR OFFLINE-BETRIEB =====
@admin_api_blueprint.route("/guest-requests", methods=["GET"])
@admin_required
def get_guest_requests_api():
"""API-Endpunkt zum Abrufen aller Gastanfragen mit OTP-Codes für Admins"""
try:
with get_cached_session() as db_session:
# Alle Gastanfragen laden
guest_requests = db_session.query(GuestRequest).order_by(
GuestRequest.created_at.desc()
).all()
# In Dictionary konvertieren mit OTP-Codes für Admins
requests_data = []
for req in guest_requests:
request_data = {
'id': req.id,
'name': req.name,
'email': req.email,
'reason': req.reason,
'status': req.status,
'duration_min': req.duration_min,
'created_at': req.created_at.isoformat() if req.created_at else None,
'processed_at': req.processed_at.isoformat() if req.processed_at else None,
'processed_by': req.processed_by,
'approval_notes': req.approval_notes,
'rejection_reason': req.rejection_reason,
'author_ip': req.author_ip
}
# OTP-Code für Admins sichtbar machen (nur wenn aktiv)
if req.status == 'approved' and req.otp_code and req.otp_expires_at:
if req.otp_expires_at > datetime.now() and not req.otp_used_at:
request_data['otp_code'] = req.otp_code_plain # Klartext für Admin
request_data['otp_expires_at'] = req.otp_expires_at.isoformat()
request_data['otp_status'] = 'active'
elif req.otp_used_at:
request_data['otp_status'] = 'used'
request_data['otp_used_at'] = req.otp_used_at.isoformat()
else:
request_data['otp_status'] = 'expired'
else:
request_data['otp_status'] = 'not_generated'
requests_data.append(request_data)
admin_logger.info(f"Gastanfragen abgerufen: {len(requests_data)} Einträge für Admin {current_user.name}")
return jsonify({
"success": True,
"requests": requests_data,
"count": len(requests_data)
})
except Exception as e:
admin_logger.error(f"Fehler beim Abrufen der Gastanfragen: {str(e)}")
return jsonify({"error": "Fehler beim Laden der Gastanfragen"}), 500
@admin_api_blueprint.route("/guest-requests/<int:request_id>/generate-otp", methods=["POST"])
@admin_required
def generate_guest_otp_api(request_id):
"""Generiert einen neuen OTP-Code für eine genehmigte Gastanfrage"""
try:
with get_cached_session() as db_session:
guest_request = db_session.query(GuestRequest).filter_by(id=request_id).first()
if not guest_request:
return jsonify({"error": "Gastanfrage nicht gefunden"}), 404
if guest_request.status != 'approved':
return jsonify({"error": "Gastanfrage muss erst genehmigt werden"}), 400
# Neuen OTP-Code generieren
otp_code = guest_request.generate_otp()
guest_request.otp_expires_at = datetime.now() + timedelta(hours=72) # 72h gültig
guest_request.otp_used_at = None # Reset falls bereits verwendet
db_session.commit()
admin_logger.info(f"Neuer OTP-Code generiert für Gastanfrage {request_id} von Admin {current_user.name}")
return jsonify({
"success": True,
"message": "Neuer OTP-Code generiert",
"otp_code": otp_code,
"expires_at": guest_request.otp_expires_at.isoformat(),
"guest_name": guest_request.name
})
except Exception as e:
admin_logger.error(f"Fehler beim Generieren des OTP-Codes: {str(e)}")
return jsonify({"error": "Fehler beim Generieren des OTP-Codes"}), 500
@admin_api_blueprint.route("/guest-requests/<int:request_id>/print-credentials", methods=["POST"])
@admin_required
def print_guest_credentials_api(request_id):
"""Erstellt Ausdruck-Template für Gast-Zugangsdaten"""
try:
with get_cached_session() as db_session:
guest_request = db_session.query(GuestRequest).filter_by(id=request_id).first()
if not guest_request:
return jsonify({"error": "Gastanfrage nicht gefunden"}), 404
if guest_request.status != 'approved':
return jsonify({"error": "Gastanfrage muss erst genehmigt werden"}), 400
if not guest_request.otp_code or not guest_request.otp_expires_at:
return jsonify({"error": "Kein OTP-Code verfügbar"}), 400
# Ausdruck-Template erstellen
print_template = {
"type": "guest_credentials",
"title": "MYP GASTZUGANG GENEHMIGT",
"subtitle": "TBA Marienfelde - Offline System",
"guest_info": {
"name": guest_request.name,
"request_id": f"GAS-{guest_request.id:06d}",
"email": guest_request.email,
"approved_at": guest_request.processed_at.strftime("%d.%m.%Y %H:%M") if guest_request.processed_at else None,
"approved_by": guest_request.processed_by
},
"access_data": {
"otp_code": guest_request.otp_code_plain, # Klartext für Ausdruck
"valid_until": guest_request.otp_expires_at.strftime("%d.%m.%Y %H:%M"),
"login_url": "http://192.168.1.100:5000/auth/guest"
},
"usage_rules": [
"Max. Druckzeit pro Job: 4 Stunden",
"Dateiformate: STL, OBJ, 3MF, GCODE",
"Materialien: PLA, PETG",
"Jobs benötigen Admin-Freigabe"
],
"pickup_info": {
"location": "TBA Marienfelde, Raum B2.1",
"hours": "Mo-Fr 8:00-16:00",
"storage_days": "Max. 7 Tage"
},
"qr_code_data": f"http://192.168.1.100:5000/auth/guest?name={guest_request.name}&id={guest_request.id}",
"admin_note": "An Gast aushändigen",
"timestamp": datetime.now().isoformat()
}
admin_logger.info(f"Ausdruck-Template erstellt für Gastanfrage {request_id} von Admin {current_user.name}")
return jsonify({
"success": True,
"print_template": print_template
})
except Exception as e:
admin_logger.error(f"Fehler beim Erstellen des Ausdruck-Templates: {str(e)}")
return jsonify({"error": "Fehler beim Erstellen des Ausdruck-Templates"}), 500
@admin_api_blueprint.route("/guest-requests/pending-otps", methods=["GET"])
@admin_required
def get_pending_guest_otps_api():
"""Listet alle aktiven OTP-Codes für schnelle Admin-Übersicht"""
try:
with get_cached_session() as db_session:
# Alle genehmigten Anfragen mit aktiven OTP-Codes
active_requests = db_session.query(GuestRequest).filter(
GuestRequest.status == 'approved',
GuestRequest.otp_code.isnot(None),
GuestRequest.otp_expires_at > datetime.now(),
GuestRequest.otp_used_at.is_(None)
).order_by(GuestRequest.otp_expires_at.asc()).all()
# Kompakte Liste für Admin-Dashboard
otps_data = []
for req in active_requests:
time_remaining = req.otp_expires_at - datetime.now()
hours_remaining = int(time_remaining.total_seconds() // 3600)
otps_data.append({
'request_id': req.id,
'guest_name': req.name,
'otp_code': req.otp_code_plain, # Klartext für Admin
'expires_at': req.otp_expires_at.isoformat(),
'hours_remaining': hours_remaining,
'urgency': 'critical' if hours_remaining < 2 else 'warning' if hours_remaining < 24 else 'normal'
})
admin_logger.info(f"Aktive OTP-Codes abgerufen: {len(otps_data)} Codes")
return jsonify({
"success": True,
"active_otps": otps_data,
"count": len(otps_data)
})
except Exception as e:
admin_logger.error(f"Fehler beim Abrufen aktiver OTP-Codes: {str(e)}")
return jsonify({"error": "Fehler beim Laden der OTP-Codes"}), 500
# ===== ADMIN-UI ROUTES FÜR GAST-OTP-VERWALTUNG =====
@admin_blueprint.route("/guest-otps")
@admin_required
def guest_otps_management():
"""Admin-UI für Gast-OTP-Verwaltung (Offline-System)"""
admin_logger.info(f"Gast-OTP-Verwaltung aufgerufen von Admin {current_user.name}")
return render_template('admin_guest_otps.html',
page_title="Gast-OTP-Verwaltung",
current_user=current_user)
# ===== API-ENDPUNKTE FÜR SYSTEM-INFORMATIONEN =====
@admin_api_blueprint.route("/system/status", methods=["GET"])
@admin_required
def get_system_status_api():
"""API-Endpunkt für System-Status-Informationen"""
try:
import psutil
import platform
# System-Informationen sammeln
cpu_usage = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
# Netzwerk-Informationen
network = psutil.net_io_counters()
# Python und Flask Informationen
python_version = platform.python_version()
platform_info = platform.platform()
# Datenbank-Statistiken
with get_cached_session() as db_session:
total_users = db_session.query(User).count()
total_printers = db_session.query(Printer).count()
total_jobs = db_session.query(Job).count()
# Aktive Jobs zählen
active_jobs = db_session.query(Job).filter(
Job.status.in_(['pending', 'printing', 'paused'])
).count()
system_status = {
"cpu": {
"usage_percent": cpu_usage,
"core_count": psutil.cpu_count()
},
"memory": {
"total": memory.total,
"available": memory.available,
"used": memory.used,
"usage_percent": memory.percent
},
"disk": {
"total": disk.total,
"used": disk.used,
"free": disk.free,
"usage_percent": (disk.used / disk.total) * 100
},
"network": {
"bytes_sent": network.bytes_sent,
"bytes_received": network.bytes_recv,
"packets_sent": network.packets_sent,
"packets_received": network.packets_recv
},
"system": {
"python_version": python_version,
"platform": platform_info,
"uptime": datetime.now().isoformat()
},
"database": {
"total_users": total_users,
"total_printers": total_printers,
"total_jobs": total_jobs,
"active_jobs": active_jobs
}
}
admin_logger.info(f"System-Status abgerufen von {current_user.username}")
return jsonify({
"success": True,
"status": system_status,
"timestamp": datetime.now().isoformat()
})
except Exception as e:
admin_logger.error(f"Fehler beim Abrufen des System-Status: {str(e)}")
return jsonify({"error": "Fehler beim Laden des System-Status"}), 500
# ===== TEST-ENDPUNKTE FÜR ENTWICKLUNG =====
@admin_api_blueprint.route("/test/create-sample-logs", methods=["POST"])
@admin_required
def create_sample_logs_api():
"""Test-Endpunkt zum Erstellen von Beispiel-Log-Einträgen"""
try:
with get_cached_session() as db_session:
# Verschiedene Log-Level erstellen
sample_logs = [
{
'level': 'INFO',
'message': 'System erfolgreich gestartet',
'module': 'admin',
'user_id': current_user.id,
'ip_address': request.remote_addr
},
{
'level': 'WARNING',
'message': 'Drucker hat 5 Minuten nicht geantwortet',
'module': 'printer_monitor',
'user_id': None,
'ip_address': None
},
{
'level': 'ERROR',
'message': 'Fehler beim Verbinden mit Drucker printer-001',
'module': 'printer',
'user_id': None,
'ip_address': None
},
{
'level': 'DEBUG',
'message': 'API-Aufruf erfolgreich verarbeitet',
'module': 'api',
'user_id': current_user.id,
'ip_address': request.remote_addr
},
{
'level': 'CRITICAL',
'message': 'Datenbank-Verbindung unterbrochen',
'module': 'database',
'user_id': None,
'ip_address': None
}
]
# Log-Einträge erstellen
created_count = 0
for log_data in sample_logs:
log_entry = SystemLog(
level=log_data['level'],
message=log_data['message'],
module=log_data['module'],
user_id=log_data['user_id'],
ip_address=log_data['ip_address']
)
db_session.add(log_entry)
created_count += 1
db_session.commit()
admin_logger.info(f"Test-Logs erstellt: {created_count} Einträge von {current_user.username}")
return jsonify({
"success": True,
"message": f"{created_count} Test-Log-Einträge erfolgreich erstellt",
"count": created_count
})
except Exception as e:
admin_logger.error(f"Fehler beim Erstellen der Test-Logs: {str(e)}")
return jsonify({"error": "Fehler beim Erstellen der Test-Logs"}), 500
# ===== STECKDOSENSCHALTZEITEN API-ENDPUNKTE =====
@admin_api_blueprint.route('/plug-schedules/logs', methods=['GET'])
@admin_required
def api_admin_plug_schedules_logs():
    """
    API endpoint for smart-plug switching-time logs.

    Supports filtering by printer, time window and status, plus
    pagination.

    Query parameters:
        printer_id: optional printer filter.
        hours: look-back window in hours (capped at 168 = 7 days); default 24.
        status: optional status filter (e.g. 'on', 'off').
        page / per_page: pagination (per_page capped at 1000); defaults 1 / 100.
    """
    try:
        # Read request parameters
        printer_id = request.args.get('printer_id', type=int)
        hours = request.args.get('hours', default=24, type=int)
        status_filter = request.args.get('status')
        page = request.args.get('page', default=1, type=int)
        per_page = request.args.get('per_page', default=100, type=int)
        # Enforce upper bounds
        hours = min(hours, 168)  # at most 7 days
        per_page = min(per_page, 1000)  # at most 1000 entries per page
        with get_cached_session() as db_session:
            # Base query: logs within the look-back window.
            # NOTE(review): the inner join on Printer silently drops log rows
            # without a matching printer — confirm that is intended.
            cutoff_time = datetime.now() - timedelta(hours=hours)
            query = db_session.query(PlugStatusLog)\
                .filter(PlugStatusLog.timestamp >= cutoff_time)\
                .join(Printer)
            # Printer filter
            if printer_id:
                query = query.filter(PlugStatusLog.printer_id == printer_id)
            # Status filter
            if status_filter:
                query = query.filter(PlugStatusLog.status == status_filter)
            # Total count for pagination
            total = query.count()
            # Sorting (newest first) and pagination
            logs = query.order_by(PlugStatusLog.timestamp.desc())\
                .offset((page - 1) * per_page)\
                .limit(per_page)\
                .all()
            # Serialize; get_relative_time / get_status_icon / get_status_color
            # are helper functions defined elsewhere in this module
            log_data = []
            for log in logs:
                log_dict = log.to_dict()
                # Additional derived presentation fields
                log_dict['timestamp_relative'] = get_relative_time(log.timestamp)
                log_dict['status_icon'] = get_status_icon(log.status)
                log_dict['status_color'] = get_status_color(log.status)
                log_data.append(log_dict)
            # Pagination metadata
            has_next = (page * per_page) < total
            has_prev = page > 1
            return jsonify({
                "success": True,
                "logs": log_data,
                "pagination": {
                    "page": page,
                    "per_page": per_page,
                    "total": total,
                    "total_pages": (total + per_page - 1) // per_page,
                    "has_next": has_next,
                    "has_prev": has_prev
                },
                "filters": {
                    "printer_id": printer_id,
                    "hours": hours,
                    "status": status_filter
                },
                "generated_at": datetime.now().isoformat()
            })
    except Exception as e:
        admin_logger.error(f"Fehler beim Abrufen der Steckdosen-Logs: {str(e)}")
        return jsonify({
            "success": False,
            "error": "Fehler beim Laden der Steckdosen-Logs",
            "details": str(e) if current_user.is_admin else None
        }), 500
@admin_api_blueprint.route('/plug-schedules/statistics', methods=['GET'])
@admin_required
def api_admin_plug_schedules_statistics():
    """
    API endpoint for plug switching-time statistics.

    Aggregates plug status log statistics over the requested window
    (capped at 7 days) and enriches the top-printer counters with
    human-readable printer names.
    """
    try:
        # Clamp the look-back window to at most 7 days
        hours = min(request.args.get('hours', default=24, type=int), 168)
        stats = PlugStatusLog.get_status_statistics(hours=hours)
        # Resolve printer names for the top-printer id->count mapping
        top_printers = stats.get('top_printers')
        if top_printers:
            with get_cached_session() as db_session:
                rows = db_session.query(Printer.id, Printer.name)\
                    .filter(Printer.id.in_(list(top_printers.keys())))\
                    .all()
                id_to_name = dict(rows)
                stats['top_printers_detailed'] = [
                    {
                        "printer_id": pid,
                        "printer_name": id_to_name.get(pid, f"Drucker {pid}"),
                        "log_count": count
                    }
                    for pid, count in top_printers.items()
                ]
        return jsonify({
            "success": True,
            "statistics": stats
        })
    except Exception as e:
        admin_logger.error(f"Fehler beim Abrufen der Steckdosen-Statistiken: {str(e)}")
        return jsonify({
            "success": False,
            "error": "Fehler beim Laden der Statistiken",
            "details": str(e) if current_user.is_admin else None
        }), 500
@admin_api_blueprint.route('/plug-schedules/cleanup', methods=['POST'])
@admin_required
def api_admin_plug_schedules_cleanup():
    """
    API endpoint for purging old plug switching-time logs.

    JSON body (optional):
        days: retention period in days (coerced to int, clamped to
              1..365, default 30); entries older than this are deleted.

    Returns:
        JSON with the number of deleted rows, or a 500 error payload.
    """
    try:
        data = request.get_json() or {}
        # Robust coercion: a non-numeric 'days' value previously raised a
        # TypeError inside max()/min() and produced a generic 500 error.
        try:
            days = int(data.get('days', 30))
        except (TypeError, ValueError):
            days = 30
        days = max(1, min(days, 365))  # between 1 and 365 days
        # Perform the purge
        deleted_count = PlugStatusLog.cleanup_old_logs(days=days)
        # Persist an audit-trail entry for the cleanup
        SystemLog.log_system_event(
            level="INFO",
            message=f"Steckdosen-Logs bereinigt: {deleted_count} Einträge gelöscht (älter als {days} Tage)",
            module="admin_plug_schedules",
            user_id=current_user.id
        )
        admin_logger.info(f"Admin {current_user.username} bereinigte {deleted_count} Steckdosen-Logs (älter als {days} Tage)")
        return jsonify({
            "success": True,
            "deleted_count": deleted_count,
            "days": days,
            "message": f"Erfolgreich {deleted_count} alte Einträge gelöscht"
        })
    except Exception as e:
        admin_logger.error(f"Fehler beim Bereinigen der Steckdosen-Logs: {str(e)}")
        return jsonify({
            "success": False,
            "error": "Fehler beim Bereinigen der Logs",
            "details": str(e) if current_user.is_admin else None
        }), 500
@admin_api_blueprint.route('/plug-schedules/calendar', methods=['GET'])
@admin_required
def api_admin_plug_schedules_calendar():
    """
    API endpoint providing calendar data for plug switching times.

    Delivers events in the JSON shape FullCalendar expects. On any error
    (or missing date range) it returns an empty event list rather than an
    error status, so the calendar widget degrades gracefully.
    """
    try:
        # Read request parameters (FullCalendar supplies start/end)
        start_date = request.args.get('start')
        end_date = request.args.get('end')
        printer_id = request.args.get('printer_id', type=int)
        if not start_date or not end_date:
            return jsonify([])  # empty event list when the range is missing
        # Convert ISO date strings to datetime objects.
        # NOTE(review): replacing 'Z' yields timezone-aware datetimes; if
        # PlugStatusLog.timestamp is stored naive, the range comparison
        # below may raise or misfilter — confirm how timestamps are stored.
        start_dt = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
        end_dt = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
        with get_cached_session() as db_session:
            # Logs within the requested range (inner join on Printer drops
            # rows without a matching printer)
            query = db_session.query(PlugStatusLog)\
                .filter(PlugStatusLog.timestamp >= start_dt)\
                .filter(PlugStatusLog.timestamp <= end_dt)\
                .join(Printer)
            # Optional printer filter
            if printer_id:
                query = query.filter(PlugStatusLog.printer_id == printer_id)
            # Fetch logs in chronological order
            logs = query.order_by(PlugStatusLog.timestamp.asc()).all()
            # Format events for FullCalendar
            events = []
            for log in logs:
                # Color and title depend on the plug status
                if log.status == 'on':
                    color = '#10b981'  # green
                    title = f"🟢 {log.printer.name}: EIN"
                elif log.status == 'off':
                    color = '#f59e0b'  # orange
                    title = f"🔴 {log.printer.name}: AUS"
                elif log.status == 'connected':
                    color = '#3b82f6'  # blue
                    title = f"🔌 {log.printer.name}: Verbunden"
                elif log.status == 'disconnected':
                    color = '#ef4444'  # red
                    title = f"⚠️ {log.printer.name}: Getrennt"
                else:
                    color = '#6b7280'  # gray
                    title = f"{log.printer.name}: {log.status}"
                # Event object in FullCalendar's schema
                event = {
                    'id': f"plug_{log.id}",
                    'title': title,
                    'start': log.timestamp.isoformat(),
                    'backgroundColor': color,
                    'borderColor': color,
                    'textColor': '#ffffff',
                    'allDay': False,
                    'extendedProps': {
                        'printer_id': log.printer_id,
                        'printer_name': log.printer.name,
                        'status': log.status,
                        'timestamp': log.timestamp.isoformat(),
                        'log_id': log.id
                    }
                }
                events.append(event)
            return jsonify(events)
    except Exception as e:
        admin_logger.error(f"Fehler beim Laden der Kalender-Daten: {str(e)}")
        return jsonify([])
@admin_api_blueprint.route('/live-stats', methods=['GET'])
@admin_required
def api_admin_live_stats():
    """
    API endpoint for live statistics on the admin dashboard.

    Delivers current system statistics:
    - user statistics (total / active / admins)
    - printer status (total / active / online / offline)
    - job statistics (totals plus 24h and 7d windows)
    - system activity (plug logs, recent system logs)

    Returns:
        JSON: {"success": True, "stats": {...}}; on failure a 500 payload
        with an empty stats object.
    """
    try:
        with get_cached_session() as db_session:
            # User statistics
            total_users = db_session.query(User).count()
            active_users = db_session.query(User).filter(User.active == True).count()
            admin_users = db_session.query(User).filter(User.role == 'admin').count()
            # Printer statistics
            total_printers = db_session.query(Printer).count()
            active_printers = db_session.query(Printer).filter(Printer.active == True).count()
            online_printers = db_session.query(Printer).filter(
                Printer.active == True,
                Printer.status == 'online'
            ).count()
            # Job statistics
            total_jobs = db_session.query(Job).count()
            active_jobs = db_session.query(Job).filter(
                Job.status.in_(['pending', 'printing', 'paused'])
            ).count()
            completed_jobs = db_session.query(Job).filter(
                Job.status == 'completed'
            ).count()
            failed_jobs = db_session.query(Job).filter(
                Job.status == 'failed'
            ).count()
            # Jobs created within the last 24 hours
            last_24h = datetime.now() - timedelta(hours=24)
            jobs_24h = db_session.query(Job).filter(
                Job.created_at >= last_24h
            ).count()
            # Jobs created within the last 7 days
            last_7d = datetime.now() - timedelta(days=7)
            jobs_7d = db_session.query(Job).filter(
                Job.created_at >= last_7d
            ).count()
            # Plug status log activity within the last 24 hours
            plug_logs_24h = db_session.query(PlugStatusLog).filter(
                PlugStatusLog.timestamp >= last_24h
            ).count()
            # System logs within the last hour
            last_hour = datetime.now() - timedelta(hours=1)
            system_logs_1h = db_session.query(SystemLog).filter(
                SystemLog.timestamp >= last_hour
            ).count()
            # Response structure
            stats = {
                'users': {
                    'total': total_users,
                    'active': active_users,
                    'admins': admin_users
                },
                'printers': {
                    'total': total_printers,
                    'active': active_printers,
                    'online': online_printers,
                    'offline': active_printers - online_printers
                },
                'jobs': {
                    'total': total_jobs,
                    'active': active_jobs,
                    'completed': completed_jobs,
                    'failed': failed_jobs,
                    'last_24h': jobs_24h,
                    'last_7d': jobs_7d
                },
                'system': {
                    'plug_logs_24h': plug_logs_24h,
                    'system_logs_1h': system_logs_1h,
                    'uptime': 'Unbekannt'  # could be implemented later
                },
                'timestamp': datetime.now().isoformat()
            }
            admin_api_logger.info(f"Live-Statistiken abgerufen von Admin {current_user.username}")
            return jsonify({
                'success': True,
                'stats': stats,
                'message': 'Live-Statistiken erfolgreich geladen'
            })
    except Exception as e:
        admin_api_logger.error(f"Fehler beim Abrufen der Live-Statistiken: {str(e)}")
        return jsonify({
            'success': False,
            'error': 'Fehler beim Laden der Statistiken',
            'message': str(e),
            'stats': {}
        }), 500
@admin_api_blueprint.route('/system/health', methods=['GET'])
@admin_required
def api_admin_system_health():
    """
    API endpoint for a system health check.

    Probes several system components:
    - database connectivity (simple round-trip query)
    - filesystem (existence and write access of important directories)
    - storage usage of the current volume
    - coarse service status flags

    Returns:
        flask.Response: JSON with per-component results under ``health``
        plus an aggregated ``overall`` status; HTTP 500 on unexpected errors.
    """
    try:
        # Start with 'unknown' everywhere so partial failures still
        # produce a complete response structure.
        health_status = {
            'database': 'unknown',
            'filesystem': 'unknown',
            'storage': {},
            'services': {},
            'timestamp': datetime.now().isoformat()
        }
        # --- Database check ---
        try:
            with get_cached_session() as db_session:
                # Trivial query to prove the connection works.
                # NOTE(review): raw SQL strings require sqlalchemy.text() on
                # SQLAlchemy 2.x — confirm the installed version accepts this.
                db_session.execute("SELECT 1")
                health_status['database'] = 'healthy'
        except Exception as db_error:
            health_status['database'] = 'unhealthy'
            admin_api_logger.error(f"Datenbank-Health-Check fehlgeschlagen: {str(db_error)}")
        # --- Filesystem check ---
        try:
            # Verify the directories the app writes to are present and writable.
            # NOTE(review): relative paths — result depends on the process CWD.
            important_dirs = [
                'backend/uploads',
                'backend/database',
                'backend/logs'
            ]
            all_accessible = True
            for dir_path in important_dirs:
                if not os.path.exists(dir_path) or not os.access(dir_path, os.W_OK):
                    all_accessible = False
                    break
            health_status['filesystem'] = 'healthy' if all_accessible else 'unhealthy'
        except Exception as fs_error:
            health_status['filesystem'] = 'unhealthy'
            admin_api_logger.error(f"Dateisystem-Health-Check fehlgeschlagen: {str(fs_error)}")
        # --- Storage usage check ---
        try:
            # os.statvfs is POSIX-only; on Windows this raises and is logged below.
            statvfs = os.statvfs('.')
            total_space = statvfs.f_blocks * statvfs.f_frsize
            free_space = statvfs.f_bavail * statvfs.f_frsize
            used_space = total_space - free_space
            health_status['storage'] = {
                'total_gb': round(total_space / (1024**3), 2),
                'used_gb': round(used_space / (1024**3), 2),
                'free_gb': round(free_space / (1024**3), 2),
                'percent_used': round((used_space / total_space) * 100, 1)
            }
        except Exception as storage_error:
            admin_api_logger.error(f"Speicherplatz-Check fehlgeschlagen: {str(storage_error)}")
        # --- Service status (simplified placeholders) ---
        health_status['services'] = {
            'web_server': 'running',      # always running, since we are answering
            'job_scheduler': 'unknown',   # could be implemented later
            'tapo_controller': 'unknown'  # could be implemented later
        }
        # Aggregate: healthy only when both hard checks pass; 'degraded'
        # covers the remaining 'unknown' combinations.
        if health_status['database'] == 'healthy' and health_status['filesystem'] == 'healthy':
            overall_status = 'healthy'
        elif health_status['database'] == 'unhealthy' or health_status['filesystem'] == 'unhealthy':
            overall_status = 'unhealthy'
        else:
            overall_status = 'degraded'
        health_status['overall'] = overall_status
        admin_api_logger.info(f"System-Health-Check durchgeführt: {overall_status}")
        return jsonify({
            'success': True,
            'health': health_status,
            'message': f'System-Status: {overall_status}'
        })
    except Exception as e:
        admin_api_logger.error(f"Fehler beim System-Health-Check: {str(e)}")
        return jsonify({
            'success': False,
            'error': 'Fehler beim Health-Check',
            'message': str(e),
            'health': {
                'overall': 'error',
                'timestamp': datetime.now().isoformat()
            }
        }), 500
# ===== HELPER FUNCTIONS FOR PLUG SCHEDULES =====
def get_relative_time(timestamp):
    """
    Return a German relative-time phrase for *timestamp* (e.g. "vor 2 Stunden").

    Args:
        timestamp (datetime | None): Point in time to describe.

    Returns:
        str: "vor N Tag(en)/Stunde(n)/Minute(n)", "gerade eben" for very
        recent or future timestamps, or "Unbekannt" when *timestamp* is
        missing or cannot be subtracted from ``datetime.now()``.
    """
    try:
        if not timestamp:
            return "Unbekannt"
        # Use total_seconds() instead of timedelta.seconds: .seconds ignores
        # the (possibly negative) day component, so a timestamp slightly in
        # the future used to report nonsense like "vor 23 Stunden".
        elapsed = (datetime.now() - timestamp).total_seconds()
        if elapsed < 0:
            # Future timestamp (clock skew) — treat as "just now".
            return "gerade eben"
        days = int(elapsed // 86400)
        if days > 0:
            return f"vor {days} Tag{'en' if days > 1 else ''}"
        hours = int(elapsed // 3600)
        if hours > 0:
            # Also fixes the boundary: exactly one hour now reads
            # "vor 1 Stunde" instead of "vor 60 Minuten".
            return f"vor {hours} Stunde{'n' if hours > 1 else ''}"
        minutes = int(elapsed // 60)
        if minutes > 0:
            return f"vor {minutes} Minute{'n' if minutes > 1 else ''}"
        return "gerade eben"
    except Exception:
        # Defensive: invalid types (e.g. strings) fall back to "Unbekannt".
        return "Unbekannt"
def get_status_icon(status):
    """Return the display icon (emoji) for a given plug status."""
    icon_by_status = {
        'on': '🟢',
        'off': '🔴',
        'connected': '🔌',
        'disconnected': '⚠️',
        'unknown': '',
    }
    # Any unrecognized status falls back to the same icon as 'unknown'.
    if status in icon_by_status:
        return icon_by_status[status]
    return ''
def get_status_color(status):
    """Return the hex UI color associated with a plug status."""
    # Grey doubles as the fallback for any unrecognized status.
    fallback = '#6b7280'
    color_map = dict(
        on='#10b981',            # green
        off='#f59e0b',           # orange
        connected='#3b82f6',     # blue
        disconnected='#ef4444',  # red
        unknown='#6b7280',       # grey
    )
    return color_map.get(status, fallback)
# ===== FEHLENDE API-ROUTEN HINZUFÜGEN =====
@admin_api_blueprint.route('/system-health', methods=['GET'])
@admin_required
def api_admin_system_health_alias():
    """
    Alias route for ``/system-health`` (frontend compatibility).

    Delegates directly to the canonical ``/system/health`` handler so
    both URL variants return identical payloads.
    """
    return api_admin_system_health()
@admin_api_blueprint.route('/error-recovery/status', methods=['GET'])
@admin_required
def api_admin_error_recovery_status():
    """
    API endpoint for the error-recovery status.

    Aggregates a best-effort picture of the error-recovery system from
    three independent probes: recent ERROR/recovery entries in the
    system log, optional system load via psutil, and a database
    connectivity check. Each probe degrades gracefully on failure.

    Returns:
        flask.Response: JSON with the collected state under
        ``error_recovery``; HTTP 500 on unexpected failure.
    """
    try:
        admin_api_logger.info(f"Error-Recovery-Status angefordert von {current_user.username}")
        # Baseline structure; the probes below refine these fields.
        recovery_status = {
            'enabled': True,  # error recovery is considered enabled by default
            'last_check': datetime.now().isoformat(),
            'status': 'active',
            'errors_detected': 0,
            'errors_recovered': 0,
            'last_recovery_action': None,
            'monitoring_active': True,
            'recovery_methods': [
                'automatic_restart',
                'service_health_check',
                'database_recovery',
                'cache_cleanup'
            ]
        }
        # Probe 1: derive counters from the system-log table (best effort).
        try:
            with get_cached_session() as db_session:
                # Look for ERROR entries within the last hour.
                last_hour = datetime.now() - timedelta(hours=1)
                error_logs = db_session.query(SystemLog).filter(
                    SystemLog.level == 'ERROR',
                    SystemLog.timestamp >= last_hour
                ).count()
                recovery_logs = db_session.query(SystemLog).filter(
                    SystemLog.message.like('%Recovery%'),
                    SystemLog.timestamp >= last_hour
                ).count()
                recovery_status['errors_detected'] = error_logs
                recovery_status['errors_recovered'] = recovery_logs
                # Most recent recovery-related log entry (any age).
                last_recovery = db_session.query(SystemLog).filter(
                    SystemLog.message.like('%Recovery%')
                ).order_by(SystemLog.timestamp.desc()).first()
                if last_recovery:
                    recovery_status['last_recovery_action'] = {
                        'timestamp': last_recovery.timestamp.isoformat(),
                        'action': 'system_log_recovery',
                        'message': last_recovery.message,
                        'module': last_recovery.module
                    }
        except Exception as log_error:
            admin_api_logger.warning(f"Log-Analyse für Error-Recovery fehlgeschlagen: {str(log_error)}")
            recovery_status['errors_detected'] = 0
            recovery_status['errors_recovered'] = 0
        # Probe 2: system load as an indicator for potential problems.
        try:
            import psutil  # optional dependency, handled via ImportError below
            cpu_percent = psutil.cpu_percent(interval=1)  # NOTE: blocks ~1s per request
            memory_percent = psutil.virtual_memory().percent
            # High load can hint at problems worth surfacing as a warning.
            if cpu_percent > 80 or memory_percent > 85:
                recovery_status['status'] = 'warning'
                recovery_status['last_recovery_action'] = {
                    'timestamp': datetime.now().isoformat(),
                    'action': 'system_load_warning',
                    'details': {
                        'cpu_percent': cpu_percent,
                        'memory_percent': memory_percent
                    }
                }
            # Attach raw performance numbers for the frontend.
            recovery_status['system_performance'] = {
                'cpu_percent': cpu_percent,
                'memory_percent': memory_percent,
                'status': 'normal' if cpu_percent < 80 and memory_percent < 85 else 'high_load'
            }
        except ImportError:
            admin_api_logger.info("psutil nicht verfügbar für Error-Recovery-Monitoring")
            recovery_status['system_performance'] = {
                'available': False,
                'message': 'psutil-Bibliothek nicht installiert'
            }
        except Exception as system_error:
            admin_api_logger.warning(f"System-Load-Check für Error-Recovery fehlgeschlagen: {str(system_error)}")
            recovery_status['system_performance'] = {
                'available': False,
                'error': str(system_error)
            }
        # Probe 3: database health as a recovery indicator; a failure here
        # escalates the overall status to 'critical'.
        try:
            with get_cached_session() as db_session:
                # Trivial round-trip query.
                # NOTE(review): raw SQL strings require sqlalchemy.text() on
                # SQLAlchemy 2.x — confirm the installed version accepts this.
                db_session.execute("SELECT 1")
                recovery_status['database_health'] = 'healthy'
        except Exception as db_error:
            recovery_status['database_health'] = 'unhealthy'
            recovery_status['status'] = 'critical'
            admin_api_logger.error(f"Datenbank-Health-Check für Error-Recovery fehlgeschlagen: {str(db_error)}")
        admin_api_logger.info(f"Error-Recovery-Status abgerufen: {recovery_status['status']}")
        return jsonify({
            'success': True,
            'error_recovery': recovery_status,
            'message': f"Error-Recovery-Status: {recovery_status['status']}"
        })
    except Exception as e:
        admin_api_logger.error(f"Fehler beim Abrufen des Error-Recovery-Status: {str(e)}")
        return jsonify({
            'success': False,
            'error': 'Error-Recovery-Status nicht verfügbar',
            'details': str(e),
            'error_recovery': {
                'status': 'error',
                'enabled': False,
                'last_check': datetime.now().isoformat()
            }
        }), 500
# ===== FEHLENDE MAINTENANCE-API-ENDPUNKTE =====
@admin_api_blueprint.route('/maintenance/create-backup', methods=['POST'])
@admin_required
def create_backup_api():
    """
    API endpoint that creates a ZIP backup of the system.

    The archive bundles the SQLite database, configuration files
    (.py/.json) and log files modified within the last 7 days.

    Returns:
        flask.Response: JSON with the backup file name, size and
        timestamp; HTTP 500 with details on failure.
    """
    try:
        admin_logger.info(f"System-Backup angefordert von {current_user.username}")
        # Project root = parent of this blueprint's directory.
        base_dir = os.path.dirname(os.path.dirname(__file__))

        # Ensure the target directory exists before writing the archive.
        backup_dir = os.path.join(base_dir, 'backups')
        os.makedirs(backup_dir, exist_ok=True)

        # Timestamped file name keeps backups unique and sortable.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_filename = f"myp_backup_{timestamp}.zip"
        backup_path = os.path.join(backup_dir, backup_filename)

        with zipfile.ZipFile(backup_path, 'w', zipfile.ZIP_DEFLATED) as backup_zip:
            # 1) SQLite database.
            database_path = os.path.join(base_dir, 'database', 'myp.db')
            if os.path.exists(database_path):
                backup_zip.write(database_path, 'database/myp.db')

            # 2) Configuration files (.py / .json), keeping their
            #    project-relative paths inside the archive.
            config_dir = os.path.join(base_dir, 'config')
            if os.path.exists(config_dir):
                for root, dirs, files in os.walk(config_dir):
                    for file in files:
                        if file.endswith(('.py', '.json')):
                            file_path = os.path.join(root, file)
                            backup_zip.write(file_path, os.path.relpath(file_path, base_dir))

            # 3) Log files — but only those touched within the last 7 days.
            logs_dir = os.path.join(base_dir, 'logs')
            if os.path.exists(logs_dir):
                for root, dirs, files in os.walk(logs_dir):
                    for file in files:
                        if not file.endswith('.log'):
                            continue
                        file_path = os.path.join(root, file)
                        if os.path.getmtime(file_path) > (time.time() - 7 * 24 * 60 * 60):
                            backup_zip.write(file_path, os.path.relpath(file_path, base_dir))

        backup_size = os.path.getsize(backup_path)
        admin_logger.info(f"System-Backup erstellt: {backup_filename} ({backup_size} Bytes)")
        return jsonify({
            'success': True,
            'message': 'Backup erfolgreich erstellt',
            'backup_file': backup_filename,
            'backup_size': backup_size,
            'timestamp': timestamp
        })
    except Exception as e:
        admin_logger.error(f"Fehler beim Erstellen des Backups: {str(e)}")
        return jsonify({
            'success': False,
            'error': 'Fehler beim Erstellen des Backups',
            'details': str(e)
        }), 500
@admin_api_blueprint.route('/maintenance/optimize-database', methods=['POST'])
@admin_required
def optimize_database_api():
    """
    API endpoint for SQLite database optimization.

    Runs VACUUM, ANALYZE, incremental vacuum and a full WAL checkpoint
    in sequence; each step is best-effort and its outcome is reported
    in the response.
    """
    try:
        admin_logger.info(f"Datenbank-Optimierung angefordert von {current_user.username}")
        # (SQL statement, success message, failure-message prefix)
        maintenance_steps = [
            ("VACUUM;", "VACUUM-Operation erfolgreich", "VACUUM fehlgeschlagen"),
            ("ANALYZE;", "ANALYZE-Operation erfolgreich", "ANALYZE fehlgeschlagen"),
            ("PRAGMA incremental_vacuum(100);", "Incremental VACUUM erfolgreich", "Incremental VACUUM fehlgeschlagen"),
            ("PRAGMA wal_checkpoint(FULL);", "WAL-Checkpoint erfolgreich", "WAL-Checkpoint fehlgeschlagen"),
        ]
        optimization_results = []
        with get_cached_session() as db_session:
            for statement, ok_message, fail_prefix in maintenance_steps:
                # A failing step is recorded, never fatal for the others.
                try:
                    db_session.execute(statement)
                    optimization_results.append(ok_message)
                except Exception as e:
                    optimization_results.append(f"{fail_prefix}: {str(e)}")
            db_session.commit()
        admin_logger.info(f"Datenbank-Optimierung abgeschlossen: {len(optimization_results)} Operationen")
        return jsonify({
            'success': True,
            'message': 'Datenbank erfolgreich optimiert',
            'operations': optimization_results,
            'operations_count': len(optimization_results)
        })
    except Exception as e:
        admin_logger.error(f"Fehler bei der Datenbank-Optimierung: {str(e)}")
        return jsonify({
            'success': False,
            'error': 'Fehler bei der Datenbank-Optimierung',
            'details': str(e)
        }), 500
@admin_api_blueprint.route('/maintenance/clear-cache', methods=['POST'])
@admin_required
def clear_cache_api():
    """
    API endpoint that clears the various system caches.

    Performs four best-effort operations — Python garbage collection,
    application session-cache clearing, wiping/recreating the temp
    directory, and a (currently conceptual) static-cache refresh — and
    reports the outcome of each in the response.

    Returns:
        flask.Response: JSON listing the performed operations;
        HTTP 500 with details on unexpected failure.
    """
    try:
        admin_logger.info(f"Cache-Clearing angefordert von {current_user.username}")
        cache_operations = []

        # 1) Force a Python garbage-collection pass.
        try:
            import gc
            gc.collect()
            cache_operations.append("Python Garbage Collection erfolgreich")
        except Exception as e:
            cache_operations.append(f"Python GC fehlgeschlagen: {str(e)}")

        # 2) Clear the application's session cache.
        try:
            from models import clear_cache
            clear_cache()
            cache_operations.append("Session Cache geleert")
        except Exception as e:
            cache_operations.append(f"Session Cache Fehler: {str(e)}")

        # 3) Wipe and recreate the temp directory.
        #    shutil is already imported at module level — the previous
        #    redundant local `import shutil` was removed.
        try:
            temp_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'temp')
            if os.path.exists(temp_dir):
                shutil.rmtree(temp_dir)
                os.makedirs(temp_dir, exist_ok=True)
                cache_operations.append("Temporäre Dateien geleert")
            else:
                cache_operations.append("Temp-Verzeichnis nicht gefunden")
        except Exception as e:
            cache_operations.append(f"Temp-Clearing fehlgeschlagen: {str(e)}")

        # 4) Static file cache headers — conceptual placeholder only; the
        #    pointless try/except around this plain append was removed.
        cache_operations.append("Static File Cache-Headers aktualisiert")

        admin_logger.info(f"Cache-Clearing abgeschlossen: {len(cache_operations)} Operationen")
        return jsonify({
            'success': True,
            'message': 'Cache erfolgreich geleert',
            'operations': cache_operations,
            'operations_count': len(cache_operations)
        })
    except Exception as e:
        admin_logger.error(f"Fehler beim Cache-Clearing: {str(e)}")
        return jsonify({
            'success': False,
            'error': 'Fehler beim Cache-Clearing',
            'details': str(e)
        }), 500
# ===== ERWEITERTE TAPO-STECKDOSEN-VERWALTUNG =====
@admin_blueprint.route("/tapo-monitoring")
@admin_required
def tapo_monitoring():
    """
    Extended Tapo smart-plug monitoring page for administrators.

    Queries the live status of every active printer that has a plug IP
    configured, counts on/off/error plugs, and renders the monitoring
    template with per-printer details and aggregate statistics.
    Redirects back to the admin dashboard on unexpected errors.
    """
    admin_logger.info(f"Tapo-Monitoring aufgerufen von {current_user.username}")
    try:
        with get_cached_session() as db_session:
            # All active printers with a configured plug.
            printers_with_plugs = db_session.query(Printer).filter(
                Printer.plug_ip.isnot(None),
                Printer.active == True
            ).all()
            # Basic statistics for the coverage figure.
            total_printers = db_session.query(Printer).count()
            printers_with_tapo = len(printers_with_plugs)
            # Fetch the current status of every Tapo plug (best effort).
            try:
                from utils.hardware_integration import tapo_controller
                tapo_available = True
                # Per-printer status plus running counters for the header.
                printer_status = []
                online_count = 0
                offline_count = 0
                error_count = 0
                for printer in printers_with_plugs:
                    try:
                        reachable, status = tapo_controller.check_outlet_status(
                            printer.plug_ip,
                            printer_id=printer.id
                        )
                        if reachable:
                            if status == 'on':
                                online_count += 1
                                status_class = 'success'  # CSS badge class for the template
                            else:
                                offline_count += 1
                                status_class = 'secondary'
                        else:
                            error_count += 1
                            status_class = 'danger'
                            status = 'unreachable'
                        # Jobs currently tied to this printer (safety indicator).
                        active_jobs = db_session.query(Job).filter(
                            Job.printer_id == printer.id,
                            Job.status.in_(['running', 'printing', 'active', 'scheduled'])
                        ).count()
                        printer_info = {
                            'id': printer.id,
                            'name': printer.name,
                            'model': printer.model,
                            'location': printer.location,
                            'plug_ip': printer.plug_ip,
                            'plug_status': status,
                            'plug_reachable': reachable,
                            'status_class': status_class,
                            'active_jobs': active_jobs,
                            'last_checked': datetime.now(),  # display-only, not persisted
                            'has_issues': not reachable or active_jobs > 0
                        }
                        printer_status.append(printer_info)
                    except Exception as e:
                        # A single failing plug must not break the whole page.
                        admin_logger.error(f"Fehler beim Status-Check für {printer.name}: {str(e)}")
                        error_count += 1
                        printer_status.append({
                            'id': printer.id,
                            'name': printer.name,
                            'model': printer.model,
                            'location': printer.location,
                            'plug_ip': printer.plug_ip,
                            'plug_status': 'error',
                            'plug_reachable': False,
                            'status_class': 'danger',
                            'active_jobs': 0,
                            'last_checked': datetime.now(),
                            'has_issues': True,
                            'error': str(e)
                        })
            except Exception as e:
                # Hardware layer missing entirely — render the page without data.
                admin_logger.error(f"Tapo-Controller nicht verfügbar: {str(e)}")
                tapo_available = False
                printer_status = []
                online_count = offline_count = error_count = 0
            # Aggregate statistics for the template header.
            monitoring_stats = {
                'total_printers': total_printers,
                'printers_with_tapo': printers_with_tapo,
                'tapo_available': tapo_available,
                'online_count': online_count,
                'offline_count': offline_count,
                'error_count': error_count,
                'coverage_percentage': round((printers_with_tapo / total_printers * 100), 1) if total_printers > 0 else 0
            }
            admin_logger.info(f"Tapo-Monitoring geladen: {printers_with_tapo} Steckdosen, {online_count} online")
            return render_template('admin_tapo_monitoring.html',
                                 printer_status=printer_status,
                                 stats=monitoring_stats,
                                 page_title="Tapo-Steckdosen-Monitoring",
                                 breadcrumb=[
                                     {"name": "Admin-Dashboard", "url": url_for("admin.admin_dashboard")},
                                     {"name": "Tapo-Monitoring", "url": "#"}
                                 ])
    except Exception as e:
        admin_logger.error(f"Fehler beim Laden des Tapo-Monitorings: {str(e)}")
        flash("Fehler beim Laden der Tapo-Monitoring-Daten.", "error")
        return redirect(url_for("admin.admin_dashboard"))
@admin_api_blueprint.route('/tapo/bulk-control', methods=['POST'])
@admin_required
def api_admin_bulk_tapo_control():
    """
    API endpoint for bulk control of Tapo smart plugs.

    Expects a JSON body with ``action`` ('on' | 'off' | 'status') and
    ``printer_ids`` (list of printer IDs). Applies the action to every
    printer's configured plug and returns per-printer results plus a
    summary. Database changes are committed only for 'on'/'off'.
    """
    admin_api_logger.info(f"Bulk-Tapo-Steuerung von {current_user.username}")
    try:
        data = request.get_json()
        action = data.get('action')  # 'on', 'off', 'status'
        printer_ids = data.get('printer_ids', [])
        # --- Input validation ---
        if not action or not printer_ids:
            return jsonify({
                'success': False,
                'error': 'Aktion und Drucker-IDs sind erforderlich'
            }), 400
        if action not in ['on', 'off', 'status']:
            return jsonify({
                'success': False,
                'error': 'Ungültige Aktion. Erlaubt: on, off, status'
            }), 400
        # Load the Tapo controller lazily; a missing hardware layer is a 500.
        try:
            from utils.hardware_integration import tapo_controller
        except Exception as e:
            return jsonify({
                'success': False,
                'error': f'Tapo-Controller nicht verfügbar: {str(e)}'
            }), 500
        results = []
        success_count = 0
        error_count = 0
        with get_cached_session() as db_session:
            for printer_id in printer_ids:
                # Each printer is handled independently so one failure
                # cannot abort the whole batch.
                try:
                    printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
                    if not printer:
                        results.append({
                            'printer_id': printer_id,
                            'success': False,
                            'error': 'Drucker nicht gefunden'
                        })
                        error_count += 1
                        continue
                    if not printer.plug_ip:
                        results.append({
                            'printer_id': printer_id,
                            'printer_name': printer.name,
                            'success': False,
                            'error': 'Keine Steckdose konfiguriert'
                        })
                        error_count += 1
                        continue
                    # Execute the requested action.
                    if action == 'status':
                        reachable, status = tapo_controller.check_outlet_status(
                            printer.plug_ip,
                            printer_id=printer_id
                        )
                        results.append({
                            'printer_id': printer_id,
                            'printer_name': printer.name,
                            'success': True,
                            'status': status,
                            'reachable': reachable
                        })
                        success_count += 1
                    else:
                        # Switch the plug on or off.
                        state = action == 'on'
                        success = tapo_controller.toggle_plug(printer.plug_ip, state)
                        if success:
                            # Mirror the plug state into the printer record.
                            printer.status = 'starting' if state else 'offline'
                            printer.last_checked = datetime.now()
                            results.append({
                                'printer_id': printer_id,
                                'printer_name': printer.name,
                                'success': True,
                                'action': action,
                                'message': f'Steckdose erfolgreich {"ein" if state else "aus"}geschaltet'
                            })
                            success_count += 1
                        else:
                            results.append({
                                'printer_id': printer_id,
                                'printer_name': printer.name,
                                'success': False,
                                'error': f'Steckdose konnte nicht {"ein" if state else "aus"}geschaltet werden'
                            })
                            error_count += 1
                except Exception as e:
                    admin_api_logger.error(f"Fehler bei Bulk-Steuerung für Drucker {printer_id}: {str(e)}")
                    results.append({
                        'printer_id': printer_id,
                        'success': False,
                        'error': str(e)
                    })
                    error_count += 1
            # Persist printer-status updates only for switching actions.
            if action in ['on', 'off']:
                db_session.commit()
        admin_api_logger.info(f"Bulk-Tapo-Steuerung abgeschlossen: {success_count} erfolgreich, {error_count} Fehler")
        return jsonify({
            'success': True,
            'results': results,
            'summary': {
                'total': len(printer_ids),
                'success': success_count,
                'errors': error_count
            },
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        admin_api_logger.error(f"Unerwarteter Fehler bei Bulk-Tapo-Steuerung: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Systemfehler: {str(e)}'
        }), 500
@admin_api_blueprint.route('/tapo/health-check', methods=['POST'])
@admin_required
def api_admin_tapo_health_check():
    """
    Run a comprehensive health check over all configured Tapo plugs.

    For every printer with a plug IP, three checks run in sequence —
    connectivity (ping), authentication (test_connection) and
    functionality (status query) — and a per-printer verdict of
    'healthy' / 'warning' / 'critical' is derived from the number of
    recorded issues. Returns aggregated counts and recommendations.
    """
    admin_api_logger.info(f"Tapo-Gesundheitscheck von {current_user.username}")
    try:
        # Load the Tapo controller lazily; abort with 500 if unavailable.
        try:
            from utils.hardware_integration import tapo_controller
            tapo_available = True
        except Exception as e:
            return jsonify({
                'success': False,
                'error': f'Tapo-Controller nicht verfügbar: {str(e)}',
                'tapo_available': False
            }), 500
        health_results = {
            'overall_status': 'healthy',
            'tapo_available': tapo_available,
            'timestamp': datetime.now().isoformat(),
            'printers': [],
            'summary': {
                'total': 0,
                'healthy': 0,
                'warning': 0,
                'critical': 0
            },
            'recommendations': []
        }
        with get_cached_session() as db_session:
            # All printers with a configured plug (active or not).
            printers_with_plugs = db_session.query(Printer).filter(
                Printer.plug_ip.isnot(None)
            ).all()
            health_results['summary']['total'] = len(printers_with_plugs)
            for printer in printers_with_plugs:
                printer_health = {
                    'id': printer.id,
                    'name': printer.name,
                    'plug_ip': printer.plug_ip,
                    'status': 'unknown',
                    'issues': [],
                    'checks': {
                        'connectivity': False,
                        'authentication': False,
                        'functionality': False
                    }
                }
                try:
                    # Check 1: network connectivity (ping).
                    ping_success = tapo_controller.ping_address(printer.plug_ip, timeout=3)
                    printer_health['checks']['connectivity'] = ping_success
                    if not ping_success:
                        printer_health['issues'].append('Netzwerkverbindung fehlgeschlagen')
                    # Check 2: authentication and device information
                    # (only attempted when the device is reachable).
                    if ping_success:
                        try:
                            test_result = tapo_controller.test_connection(printer.plug_ip)
                            printer_health['checks']['authentication'] = test_result['success']
                            if not test_result['success']:
                                printer_health['issues'].append(f'Authentifizierung fehlgeschlagen: {test_result.get("error", "Unbekannt")}')
                        except Exception as auth_error:
                            printer_health['issues'].append(f'Authentifizierungstest fehlgeschlagen: {str(auth_error)}')
                    # Check 3: functionality (query the outlet status),
                    # only after successful authentication.
                    if printer_health['checks']['authentication']:
                        try:
                            reachable, status = tapo_controller.check_outlet_status(
                                printer.plug_ip,
                                printer_id=printer.id
                            )
                            printer_health['checks']['functionality'] = reachable
                            printer_health['current_status'] = status
                            if not reachable:
                                printer_health['issues'].append('Status-Abfrage fehlgeschlagen')
                        except Exception as func_error:
                            printer_health['issues'].append(f'Funktionstest fehlgeschlagen: {str(func_error)}')
                    # Derive the per-printer verdict from the issue count.
                    if len(printer_health['issues']) == 0:
                        printer_health['status'] = 'healthy'
                        health_results['summary']['healthy'] += 1
                    elif len(printer_health['issues']) <= 1:
                        printer_health['status'] = 'warning'
                        health_results['summary']['warning'] += 1
                    else:
                        printer_health['status'] = 'critical'
                        health_results['summary']['critical'] += 1
                    # Active jobs are a safety warning only.
                    # NOTE(review): this issue is appended AFTER the verdict
                    # above, so it never affects the healthy/warning/critical
                    # classification — confirm this is intended.
                    active_jobs = db_session.query(Job).filter(
                        Job.printer_id == printer.id,
                        Job.status.in_(['running', 'printing', 'active'])
                    ).count()
                    if active_jobs > 0:
                        printer_health['active_jobs'] = active_jobs
                        printer_health['issues'].append(f'{active_jobs} aktive(r) Job(s) - Vorsicht bei Steckdosen-Änderungen')
                except Exception as e:
                    admin_api_logger.error(f"Fehler beim Gesundheitscheck für {printer.name}: {str(e)}")
                    printer_health['status'] = 'critical'
                    printer_health['issues'].append(f'Systemfehler: {str(e)}')
                    health_results['summary']['critical'] += 1
                health_results['printers'].append(printer_health)
            # Determine the overall status and recommendations.
            if health_results['summary']['critical'] > 0:
                health_results['overall_status'] = 'critical'
                health_results['recommendations'].append('Kritische Probleme bei Tapo-Steckdosen beheben')
            elif health_results['summary']['warning'] > 0:
                health_results['overall_status'] = 'warning'
                health_results['recommendations'].append('Warnungen bei Tapo-Steckdosen überprüfen')
            # Coverage recommendation (share of printers with a plug).
            # NOTE(review): the Printer count is queried twice on this line —
            # could be computed once.
            coverage = (len(printers_with_plugs) / db_session.query(Printer).count()) * 100 if db_session.query(Printer).count() > 0 else 0
            if coverage < 80:
                health_results['recommendations'].append(f'Tapo-Abdeckung nur {coverage:.1f}% - weitere Steckdosen konfigurieren')
        admin_api_logger.info(f"Tapo-Gesundheitscheck abgeschlossen: {health_results['summary']}")
        return jsonify(health_results)
    except Exception as e:
        admin_api_logger.error(f"Unerwarteter Fehler beim Tapo-Gesundheitscheck: {str(e)}")
        return jsonify({
            'success': False,
            'error': 'Fehler beim Health-Check',
            'message': str(e),
            'health': {
                'overall': 'error',
                'timestamp': datetime.now().isoformat()
            }
        }), 500
@admin_api_blueprint.route('/printers/tapo-configure', methods=['POST'])
@admin_required
def api_admin_configure_printer_tapo():
    """
    Configure or update the Tapo smart-plug settings for a printer.

    JSON body: ``printer_id`` (required), optional ``plug_ip``,
    ``plug_username``, ``plug_password`` and ``test_connection``
    (default True) to verify the plug after updating the record.
    """
    admin_api_logger.info(f"Tapo-Konfiguration von {current_user.username}")
    try:
        data = request.get_json()
        printer_id = data.get('printer_id')
        plug_ip = data.get('plug_ip')
        plug_username = data.get('plug_username')
        plug_password = data.get('plug_password')
        test_connection = data.get('test_connection', True)
        if not printer_id:
            return jsonify({
                'success': False,
                'error': 'Drucker-ID ist erforderlich'
            }), 400
        with get_cached_session() as db_session:
            printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
            if not printer:
                return jsonify({
                    'success': False,
                    'error': 'Drucker nicht gefunden'
                }), 404
            # Update the plug IP only after syntactic validation.
            if plug_ip:
                try:
                    import ipaddress
                    ipaddress.ip_address(plug_ip)
                    printer.plug_ip = plug_ip
                except ValueError:
                    return jsonify({
                        'success': False,
                        'error': 'Ungültige IP-Adresse'
                    }), 400
            if plug_username:
                printer.plug_username = plug_username
            if plug_password:
                # NOTE(review): credentials are assigned to the printer record
                # as-is here — confirm whether the model layer encrypts them.
                printer.plug_password = plug_password
            # Optionally verify the connection with the stored credentials.
            test_result = None
            if test_connection and printer.plug_ip:
                try:
                    from utils.hardware_integration import tapo_controller
                    test_result = tapo_controller.test_connection(
                        printer.plug_ip,
                        username=printer.plug_username,
                        password=printer.plug_password
                    )
                    if test_result['success']:
                        # A successful test also refreshes the printer status.
                        printer.last_checked = datetime.now()
                        printer.status = 'online'
                    else:
                        admin_api_logger.warning(f"Tapo-Test für {printer.name} fehlgeschlagen: {test_result.get('error')}")
                except Exception as e:
                    test_result = {
                        'success': False,
                        'error': f'Test fehlgeschlagen: {str(e)}'
                    }
            db_session.commit()
            admin_api_logger.info(f"Tapo-Konfiguration für {printer.name} aktualisiert")
            return jsonify({
                'success': True,
                'message': f'Tapo-Einstellungen für {printer.name} erfolgreich aktualisiert',
                'printer_id': printer_id,
                'test_result': test_result,
                'timestamp': datetime.now().isoformat()
            })
    except Exception as e:
        admin_api_logger.error(f"Fehler bei Tapo-Konfiguration: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Systemfehler: {str(e)}'
        }), 500