4888 lines
182 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import logging
import atexit
from datetime import datetime, timedelta
from flask import Flask, render_template, request, jsonify, redirect, url_for, flash, send_file, abort, session, make_response, Response
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
from flask_wtf import CSRFProtect
from flask_wtf.csrf import CSRFError
from werkzeug.utils import secure_filename
from werkzeug.security import generate_password_hash, check_password_hash
from sqlalchemy.orm import sessionmaker, joinedload
from sqlalchemy import func, text
from functools import wraps
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Tuple
import time
import subprocess
import json
import signal
from contextlib import contextmanager
# Windows-spezifische Fixes früh importieren (sichere Version)
if os.name == 'nt':
try:
from utils.windows_fixes import get_windows_thread_manager
# apply_all_windows_fixes() wird automatisch beim Import ausgeführt
print("✅ Windows-Fixes (sichere Version) geladen")
except ImportError as e:
# Fallback falls windows_fixes nicht verfügbar
get_windows_thread_manager = None
print(f"⚠️ Windows-Fixes nicht verfügbar: {str(e)}")
else:
get_windows_thread_manager = None
# Lokale Imports
from models import init_database, create_initial_admin, User, Printer, Job, Stats, SystemLog, get_db_session, GuestRequest, UserPermission, Notification
from utils.logging_config import setup_logging, get_logger, measure_execution_time, log_startup_info, debug_request, debug_response
from utils.job_scheduler import JobScheduler, get_job_scheduler
from utils.queue_manager import start_queue_manager, stop_queue_manager, get_queue_manager
from config.settings import SECRET_KEY, UPLOAD_FOLDER, ALLOWED_EXTENSIONS, ENVIRONMENT, SESSION_LIFETIME, SCHEDULER_ENABLED, SCHEDULER_INTERVAL, TAPO_USERNAME, TAPO_PASSWORD
from utils.file_manager import file_manager, save_job_file, save_guest_file, save_avatar_file, delete_file as delete_file_safe
# Blueprints importieren
from blueprints.guest import guest_blueprint
from blueprints.calendar import calendar_blueprint
from blueprints.users import users_blueprint
from blueprints.printers import printers_blueprint
# Scheduler importieren falls verfügbar
try:
from utils.job_scheduler import scheduler
except ImportError:
scheduler = None
# SSL-Kontext importieren falls verfügbar
try:
from utils.ssl_config import get_ssl_context
except ImportError:
def get_ssl_context():
return None
# Template-Helfer importieren falls verfügbar
try:
from utils.template_helpers import register_template_helpers
except ImportError:
def register_template_helpers(app):
pass
# Datenbank-Monitor und Backup-Manager importieren falls verfügbar
try:
from utils.database_monitor import DatabaseMonitor
database_monitor = DatabaseMonitor()
except ImportError:
database_monitor = None
try:
from utils.backup_manager import BackupManager
backup_manager = BackupManager()
except ImportError:
backup_manager = None
# Import neuer Systeme
from utils.rate_limiter import limit_requests, rate_limiter, cleanup_rate_limiter
from utils.security import init_security, require_secure_headers, security_check
from utils.permissions import init_permission_helpers, require_permission, Permission, check_permission
from utils.analytics import analytics_engine, track_event, get_dashboard_stats
# Drucker-Monitor importieren
from utils.printer_monitor import printer_monitor
# Flask-App initialisieren
app = Flask(__name__)
app.secret_key = SECRET_KEY
app.config["PERMANENT_SESSION_LIFETIME"] = SESSION_LIFETIME
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config["WTF_CSRF_ENABLED"] = True
# CSRF-Schutz initialisieren
csrf = CSRFProtect(app)
# Security-System initialisieren
app = init_security(app)
# Permission Template Helpers registrieren
init_permission_helpers(app)
# Template-Helper registrieren
register_template_helpers(app)
# CSRF-Error-Handler - Korrigierte Version für Flask-WTF 1.2.1+
@app.errorhandler(CSRFError)
def csrf_error(error):
"""Behandelt CSRF-Fehler und gibt detaillierte Informationen zurück."""
app_logger.error(f"CSRF-Fehler für {request.path}: {error}")
if request.path.startswith('/api/'):
# Für API-Anfragen: JSON-Response
return jsonify({
"error": "CSRF-Token fehlt oder ungültig",
"reason": str(error),
"help": "Fügen Sie ein gültiges CSRF-Token zu Ihrer Anfrage hinzu"
}), 400
else:
# Für normale Anfragen: Weiterleitung zur Fehlerseite
flash("Sicherheitsfehler: Anfrage wurde abgelehnt. Bitte versuchen Sie es erneut.", "error")
return redirect(request.url)
# Blueprints registrieren
app.register_blueprint(guest_blueprint)
app.register_blueprint(calendar_blueprint)
app.register_blueprint(users_blueprint)
app.register_blueprint(printers_blueprint)
# Login-Manager initialisieren
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = "login"
login_manager.login_message = "Bitte melden Sie sich an, um auf diese Seite zuzugreifen."
login_manager.login_message_category = "info"
@login_manager.user_loader
def load_user(user_id):
"""
Robuster User-Loader mit Error-Handling für Schema-Probleme.
"""
try:
# user_id von Flask-Login ist immer ein String - zu Integer konvertieren
try:
user_id_int = int(user_id)
except (ValueError, TypeError):
app_logger.error(f"Ungültige User-ID: {user_id}")
return None
db_session = get_db_session()
# Robuste Abfrage mit Error-Handling
try:
user = db_session.query(User).filter(User.id == user_id_int).first()
db_session.close()
return user
except Exception as db_error:
# Schema-Problem - versuche manuelle Abfrage
app_logger.warning(f"Schema-Problem beim User-Load für ID {user_id_int}: {str(db_error)}")
# Manuelle Abfrage nur mit Basis-Feldern
try:
result = db_session.execute(
text("SELECT id, email, password_hash, name, role, active FROM users WHERE id = :user_id"),
{"user_id": user_id_int}
).fetchone()
if result:
# Manuell User-Objekt erstellen
user = User()
user.id = result[0]
user.email = result[1] if len(result) > 1 else f"user_{user_id_int}@system.local"
user.password_hash = result[2] if len(result) > 2 else ""
user.name = result[3] if len(result) > 3 else f"User {user_id_int}"
user.role = result[4] if len(result) > 4 else "user"
user.active = result[5] if len(result) > 5 else True
# Standard-Werte für fehlende Felder
user.username = getattr(user, 'username', user.email.split('@')[0])
user.created_at = getattr(user, 'created_at', datetime.now())
user.last_login = getattr(user, 'last_login', None)
user.updated_at = getattr(user, 'updated_at', datetime.now())
db_session.close()
return user
except Exception as manual_error:
app_logger.error(f"Auch manuelle User-Abfrage fehlgeschlagen: {str(manual_error)}")
db_session.close()
return None
except Exception as e:
app_logger.error(f"Kritischer Fehler im User-Loader für ID {user_id}: {str(e)}")
return None
# Jinja2 Context Processors
@app.context_processor
def inject_now():
"""Inject the current datetime into templates."""
return {'now': datetime.now()}
# Custom Jinja2 filter für Datumsformatierung
@app.template_filter('format_datetime')
def format_datetime_filter(value, format='%d.%m.%Y %H:%M'):
"""Format a datetime object to a German-style date and time string"""
if value is None:
return ""
if isinstance(value, str):
try:
value = datetime.fromisoformat(value)
except ValueError:
return value
return value.strftime(format)
# Logging initialisieren
setup_logging()
log_startup_info()
# Logger für verschiedene Komponenten
app_logger = get_logger("app")
auth_logger = get_logger("auth")
jobs_logger = get_logger("jobs")
printers_logger = get_logger("printers")
user_logger = get_logger("user")
kiosk_logger = get_logger("kiosk")
# HTTP-Request/Response-Middleware für automatisches Debug-Logging
@app.before_request
def log_request_info():
"""Loggt detaillierte Informationen über eingehende HTTP-Anfragen."""
# Nur für API-Endpunkte und wenn Debug-Level aktiviert ist
if request.path.startswith('/api/') or app_logger.level <= logging.DEBUG:
debug_request(app_logger, request)
@app.after_request
def log_response_info(response):
"""Loggt detaillierte Informationen über ausgehende HTTP-Antworten."""
# Nur für API-Endpunkte und wenn Debug-Level aktiviert ist
if request.path.startswith('/api/') or app_logger.level <= logging.DEBUG:
# Berechne Response-Zeit aus dem g-Objekt wenn verfügbar
duration_ms = None
if hasattr(request, '_start_time'):
duration_ms = (time.time() - request._start_time) * 1000
debug_response(app_logger, response, duration_ms)
return response
# Start-Zeit für Request-Timing setzen
@app.before_request
def start_timer():
"""Setzt einen Timer für die Request-Bearbeitung."""
request._start_time = time.time()
# Sicheres Passwort-Hash für Kiosk-Deaktivierung
KIOSK_PASSWORD_HASH = generate_password_hash("744563017196A")
print("Alle Blueprints wurden in app.py integriert")
# Custom decorator für Job-Besitzer-Check
def job_owner_required(f):
@wraps(f)
def decorated_function(job_id, *args, **kwargs):
db_session = get_db_session()
job = db_session.query(Job).filter(Job.id == job_id).first()
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
is_owner = job.user_id == int(current_user.id) or job.owner_id == int(current_user.id)
is_admin = current_user.is_admin
if not (is_owner or is_admin):
db_session.close()
return jsonify({"error": "Keine Berechtigung"}), 403
db_session.close()
return f(job_id, *args, **kwargs)
return decorated_function
# Custom decorator für Admin-Check
def admin_required(f):
@wraps(f)
@login_required
def decorated_function(*args, **kwargs):
app_logger.info(f"Admin-Check für Funktion {f.__name__}: User authenticated: {current_user.is_authenticated}, User ID: {current_user.id if current_user.is_authenticated else 'None'}, Is Admin: {current_user.is_admin if current_user.is_authenticated else 'None'}")
if not current_user.is_admin:
app_logger.warning(f"Admin-Zugriff verweigert für User {current_user.id if current_user.is_authenticated else 'Anonymous'} auf Funktion {f.__name__}")
return jsonify({"error": "Nur Administratoren haben Zugriff"}), 403
return f(*args, **kwargs)
return decorated_function
# ===== AUTHENTIFIZIERUNGS-ROUTEN (ehemals auth.py) =====
@app.route("/auth/login", methods=["GET", "POST"])
def login():
if current_user.is_authenticated:
return redirect(url_for("index"))
error = None
if request.method == "POST":
# Unterscheiden zwischen JSON-Anfragen und normalen Formular-Anfragen
is_json_request = request.is_json or request.headers.get('Content-Type') == 'application/json'
# Daten je nach Anfrageart auslesen
if is_json_request:
data = request.get_json()
username = data.get("username") or data.get("email") # Fallback für email
password = data.get("password")
remember_me = data.get("remember_me", False)
else:
# Korrigierte Feldnamen - Template verwendet "email" nicht "username"
username = request.form.get("email") # Geändert von "username" zu "email"
password = request.form.get("password")
remember_me = request.form.get("remember_me") == "on" # Geändert von "remember-me"
if not username or not password:
error = "Benutzername und Passwort müssen angegeben werden."
if is_json_request:
return jsonify({"error": error}), 400
else:
try:
db_session = get_db_session()
# Suche nach Benutzer mit übereinstimmendem Benutzernamen oder E-Mail
user = db_session.query(User).filter(
(User.username == username) | (User.email == username)
).first()
if user and user.check_password(password):
# Update last login timestamp
user.update_last_login()
db_session.commit()
login_user(user, remember=remember_me)
auth_logger.info(f"Benutzer {username} hat sich angemeldet")
next_page = request.args.get("next")
db_session.close()
if is_json_request:
return jsonify({"success": True, "redirect_url": next_page or url_for("index")})
else:
if next_page:
return redirect(next_page)
return redirect(url_for("index"))
else:
error = "Ungültiger Benutzername oder Passwort."
auth_logger.warning(f"Fehlgeschlagener Login-Versuch für Benutzer {username}")
db_session.close()
if is_json_request:
return jsonify({"error": error}), 401
except Exception as e:
# Fehlerbehandlung für Datenbankprobleme
error = "Anmeldefehler. Bitte versuchen Sie es später erneut."
auth_logger.error(f"Fehler bei der Anmeldung: {str(e)}")
if is_json_request:
return jsonify({"error": error}), 500
return render_template("login.html", error=error)
@app.route("/auth/logout", methods=["GET", "POST"])
@login_required
def auth_logout():
"""Meldet den Benutzer ab."""
app_logger.info(f"Benutzer {current_user.email} hat sich abgemeldet")
logout_user()
flash("Sie wurden erfolgreich abgemeldet.", "info")
return redirect(url_for("login"))
@app.route("/auth/reset-password-request", methods=["GET", "POST"])
def reset_password_request():
"""Passwort-Reset anfordern (Placeholder)."""
# TODO: Implement password reset functionality
flash("Passwort-Reset-Funktionalität ist noch nicht implementiert.", "info")
return redirect(url_for("login"))
@app.route("/auth/api/login", methods=["POST"])
def api_login():
"""API-Login-Endpunkt für Frontend"""
try:
data = request.get_json()
if not data:
return jsonify({"error": "Keine Daten erhalten"}), 400
username = data.get("username")
password = data.get("password")
remember_me = data.get("remember_me", False)
if not username or not password:
return jsonify({"error": "Benutzername und Passwort müssen angegeben werden"}), 400
db_session = get_db_session()
user = db_session.query(User).filter(
(User.username == username) | (User.email == username)
).first()
if user and user.check_password(password):
# Update last login timestamp
user.update_last_login()
db_session.commit()
login_user(user, remember=remember_me)
auth_logger.info(f"API-Login erfolgreich für Benutzer {username}")
user_data = {
"id": user.id,
"username": user.username,
"name": user.name,
"email": user.email,
"is_admin": user.is_admin
}
db_session.close()
return jsonify({
"success": True,
"user": user_data,
"redirect_url": url_for("index")
})
else:
auth_logger.warning(f"Fehlgeschlagener API-Login für Benutzer {username}")
db_session.close()
return jsonify({"error": "Ungültiger Benutzername oder Passwort"}), 401
except Exception as e:
auth_logger.error(f"Fehler beim API-Login: {str(e)}")
return jsonify({"error": "Anmeldefehler. Bitte versuchen Sie es später erneut"}), 500
@app.route("/auth/api/callback", methods=["GET", "POST"])
def api_callback():
"""OAuth-Callback-Endpunkt für externe Authentifizierung"""
try:
# OAuth-Provider bestimmen
provider = request.args.get('provider', 'github')
if request.method == "GET":
# Authorization Code aus URL-Parameter extrahieren
code = request.args.get('code')
state = request.args.get('state')
error = request.args.get('error')
if error:
auth_logger.warning(f"OAuth-Fehler von {provider}: {error}")
return jsonify({
"error": f"OAuth-Authentifizierung fehlgeschlagen: {error}",
"redirect_url": url_for("login")
}), 400
if not code:
auth_logger.warning(f"Kein Authorization Code von {provider} erhalten")
return jsonify({
"error": "Kein Authorization Code erhalten",
"redirect_url": url_for("login")
}), 400
# State-Parameter validieren (CSRF-Schutz)
session_state = session.get('oauth_state')
if not state or state != session_state:
auth_logger.warning(f"Ungültiger State-Parameter von {provider}")
return jsonify({
"error": "Ungültiger State-Parameter",
"redirect_url": url_for("login")
}), 400
# OAuth-Token austauschen
if provider == 'github':
user_data = handle_github_callback(code)
else:
auth_logger.error(f"Unbekannter OAuth-Provider: {provider}")
return jsonify({
"error": "Unbekannter OAuth-Provider",
"redirect_url": url_for("login")
}), 400
if not user_data:
return jsonify({
"error": "Fehler beim Abrufen der Benutzerdaten",
"redirect_url": url_for("login")
}), 400
# Benutzer in Datenbank suchen oder erstellen
db_session = get_db_session()
try:
user = db_session.query(User).filter(
User.email == user_data['email']
).first()
if not user:
# Neuen Benutzer erstellen
user = User(
username=user_data['username'],
email=user_data['email'],
name=user_data['name'],
is_admin=False,
oauth_provider=provider,
oauth_id=str(user_data['id'])
)
# Zufälliges Passwort setzen (wird nicht verwendet)
import secrets
user.set_password(secrets.token_urlsafe(32))
db_session.add(user)
db_session.commit()
auth_logger.info(f"Neuer OAuth-Benutzer erstellt: {user.username} via {provider}")
else:
# Bestehenden Benutzer aktualisieren
user.oauth_provider = provider
user.oauth_id = str(user_data['id'])
user.name = user_data['name']
user.updated_at = datetime.now()
db_session.commit()
auth_logger.info(f"OAuth-Benutzer aktualisiert: {user.username} via {provider}")
# Update last login timestamp
user.update_last_login()
db_session.commit()
login_user(user, remember=True)
# Session-State löschen
session.pop('oauth_state', None)
response_data = {
"success": True,
"user": {
"id": user.id,
"username": user.username,
"name": user.name,
"email": user.email,
"is_admin": user.is_admin
},
"redirect_url": url_for("index")
}
db_session.close()
return jsonify(response_data)
except Exception as e:
db_session.rollback()
db_session.close()
auth_logger.error(f"Datenbankfehler bei OAuth-Callback: {str(e)}")
return jsonify({
"error": "Datenbankfehler bei der Benutzeranmeldung",
"redirect_url": url_for("login")
}), 500
elif request.method == "POST":
# POST-Anfragen für manuelle Token-Übermittlung
data = request.get_json()
if not data:
return jsonify({"error": "Keine Daten erhalten"}), 400
access_token = data.get('access_token')
provider = data.get('provider', 'github')
if not access_token:
return jsonify({"error": "Kein Access Token erhalten"}), 400
# Benutzerdaten mit Access Token abrufen
if provider == 'github':
user_data = get_github_user_data(access_token)
else:
return jsonify({"error": "Unbekannter OAuth-Provider"}), 400
if not user_data:
return jsonify({"error": "Fehler beim Abrufen der Benutzerdaten"}), 400
# Benutzer verarbeiten (gleiche Logik wie bei GET)
db_session = get_db_session()
try:
user = db_session.query(User).filter(
User.email == user_data['email']
).first()
if not user:
user = User(
username=user_data['username'],
email=user_data['email'],
name=user_data['name'],
is_admin=False,
oauth_provider=provider,
oauth_id=str(user_data['id'])
)
import secrets
user.set_password(secrets.token_urlsafe(32))
db_session.add(user)
db_session.commit()
auth_logger.info(f"Neuer OAuth-Benutzer erstellt: {user.username} via {provider}")
else:
user.oauth_provider = provider
user.oauth_id = str(user_data['id'])
user.name = user_data['name']
user.updated_at = datetime.now()
db_session.commit()
auth_logger.info(f"OAuth-Benutzer aktualisiert: {user.username} via {provider}")
# Update last login timestamp
user.update_last_login()
db_session.commit()
login_user(user, remember=True)
response_data = {
"success": True,
"user": {
"id": user.id,
"username": user.username,
"name": user.name,
"email": user.email,
"is_admin": user.is_admin
},
"redirect_url": url_for("index")
}
db_session.close()
return jsonify(response_data)
except Exception as e:
db_session.rollback()
db_session.close()
auth_logger.error(f"Datenbankfehler bei OAuth-Callback: {str(e)}")
return jsonify({
"error": "Datenbankfehler bei der Benutzeranmeldung",
"redirect_url": url_for("login")
}), 500
except Exception as e:
auth_logger.error(f"Fehler im OAuth-Callback: {str(e)}")
return jsonify({
"error": "OAuth-Callback-Fehler",
"redirect_url": url_for("login")
}), 500
def handle_github_callback(code):
"""GitHub OAuth-Callback verarbeiten"""
try:
import requests
# GitHub OAuth-Konfiguration (sollte aus Umgebungsvariablen kommen)
client_id = "7c5d8bef1a5519ec1fdc"
client_secret = "5f1e586204358fbd53cf5fb7d418b3f06ccab8fd"
if not client_id or not client_secret:
auth_logger.error("GitHub OAuth-Konfiguration fehlt")
return None
# Access Token anfordern
token_url = "https://github.com/login/oauth/access_token"
token_data = {
'client_id': client_id,
'client_secret': client_secret,
'code': code
}
token_response = requests.post(
token_url,
data=token_data,
headers={'Accept': 'application/json'},
timeout=10
)
if token_response.status_code != 200:
auth_logger.error(f"GitHub Token-Anfrage fehlgeschlagen: {token_response.status_code}")
return None
token_json = token_response.json()
access_token = token_json.get('access_token')
if not access_token:
auth_logger.error("Kein Access Token von GitHub erhalten")
return None
return get_github_user_data(access_token)
except Exception as e:
auth_logger.error(f"Fehler bei GitHub OAuth-Callback: {str(e)}")
return None
def get_github_user_data(access_token):
"""GitHub-Benutzerdaten mit Access Token abrufen"""
try:
import requests
# Benutzerdaten von GitHub API abrufen
user_url = "https://api.github.com/user"
headers = {
'Authorization': f'token {access_token}',
'Accept': 'application/vnd.github.v3+json'
}
user_response = requests.get(user_url, headers=headers, timeout=10)
if user_response.status_code != 200:
auth_logger.error(f"GitHub User-API-Anfrage fehlgeschlagen: {user_response.status_code}")
return None
user_data = user_response.json()
# E-Mail-Adresse separat abrufen (falls nicht öffentlich)
email = user_data.get('email')
if not email:
email_url = "https://api.github.com/user/emails"
email_response = requests.get(email_url, headers=headers, timeout=10)
if email_response.status_code == 200:
emails = email_response.json()
# Primäre E-Mail-Adresse finden
for email_obj in emails:
if email_obj.get('primary', False):
email = email_obj.get('email')
break
# Fallback: Erste E-Mail-Adresse verwenden
if not email and emails:
email = emails[0].get('email')
if not email:
auth_logger.error("Keine E-Mail-Adresse von GitHub erhalten")
return None
return {
'id': user_data.get('id'),
'username': user_data.get('login'),
'name': user_data.get('name') or user_data.get('login'),
'email': email
}
except Exception as e:
auth_logger.error(f"Fehler beim Abrufen der GitHub-Benutzerdaten: {str(e)}")
return None
# ===== BENUTZER-ROUTEN (ehemals user.py) =====
@app.route("/user/profile", methods=["GET"])
@login_required
def user_profile():
"""Profil-Seite anzeigen"""
user_logger.info(f"Benutzer {current_user.username} hat seine Profilseite aufgerufen")
return render_template("profile.html", user=current_user)
@app.route("/user/settings", methods=["GET"])
@login_required
def user_settings():
"""Einstellungen-Seite anzeigen"""
user_logger.info(f"Benutzer {current_user.username} hat seine Einstellungsseite aufgerufen")
return render_template("settings.html", user=current_user)
@app.route("/user/update-profile", methods=["POST"])
@login_required
def user_update_profile():
"""Benutzerprofilinformationen aktualisieren"""
try:
# Überprüfen, ob es sich um eine JSON-Anfrage handelt
is_json_request = request.is_json or request.headers.get('Content-Type') == 'application/json'
if is_json_request:
data = request.get_json()
name = data.get("name")
email = data.get("email")
department = data.get("department")
position = data.get("position")
phone = data.get("phone")
else:
name = request.form.get("name")
email = request.form.get("email")
department = request.form.get("department")
position = request.form.get("position")
phone = request.form.get("phone")
db_session = get_db_session()
user = db_session.query(User).filter(User.id == int(current_user.id)).first()
if user:
# Aktualisiere die Benutzerinformationen
if name:
user.name = name
if email:
user.email = email
if department:
user.department = department
if position:
user.position = position
if phone:
user.phone = phone
user.updated_at = datetime.now()
db_session.commit()
user_logger.info(f"Benutzer {current_user.username} hat sein Profil aktualisiert")
if is_json_request:
return jsonify({
"success": True,
"message": "Profil erfolgreich aktualisiert"
})
else:
flash("Profil erfolgreich aktualisiert", "success")
return redirect(url_for("user_profile"))
else:
error = "Benutzer nicht gefunden."
if is_json_request:
return jsonify({"error": error}), 404
else:
flash(error, "error")
return redirect(url_for("user_profile"))
except Exception as e:
error = f"Fehler beim Aktualisieren des Profils: {str(e)}"
user_logger.error(error)
if request.is_json:
return jsonify({"error": error}), 500
else:
flash(error, "error")
return redirect(url_for("user_profile"))
finally:
db_session.close()
@app.route("/user/api/update-settings", methods=["POST"])
@login_required
def user_api_update_settings():
"""API-Endpunkt für Einstellungen-Updates (JSON)"""
return user_update_profile()
@app.route("/user/update-settings", methods=["POST"])
@login_required
def user_update_settings():
"""Benutzereinstellungen aktualisieren"""
db_session = get_db_session()
try:
# Überprüfen, ob es sich um eine JSON-Anfrage handelt
is_json_request = request.is_json or request.headers.get('Content-Type') == 'application/json'
# Einstellungen aus der Anfrage extrahieren
if is_json_request:
data = request.get_json()
if not data:
return jsonify({"error": "Keine Daten empfangen"}), 400
theme = data.get("theme", "system")
reduced_motion = bool(data.get("reduced_motion", False))
contrast = data.get("contrast", "normal")
notifications = data.get("notifications", {})
privacy = data.get("privacy", {})
else:
theme = request.form.get("theme", "system")
reduced_motion = request.form.get("reduced_motion") == "on"
contrast = request.form.get("contrast", "normal")
notifications = {
"new_jobs": request.form.get("notify_new_jobs") == "on",
"job_updates": request.form.get("notify_job_updates") == "on",
"system": request.form.get("notify_system") == "on",
"email": request.form.get("notify_email") == "on"
}
privacy = {
"activity_logs": request.form.get("activity_logs") == "on",
"two_factor": request.form.get("two_factor") == "on",
"auto_logout": int(request.form.get("auto_logout", "60"))
}
# Validierung der Eingaben
valid_themes = ["light", "dark", "system"]
if theme not in valid_themes:
theme = "system"
valid_contrasts = ["normal", "high"]
if contrast not in valid_contrasts:
contrast = "normal"
# Benutzer aus der Datenbank laden
user = db_session.query(User).filter(User.id == int(current_user.id)).first()
if not user:
error = "Benutzer nicht gefunden."
if is_json_request:
return jsonify({"error": error}), 404
else:
flash(error, "error")
return redirect(url_for("user_settings"))
# Einstellungen-Dictionary erstellen
settings = {
"theme": theme,
"reduced_motion": reduced_motion,
"contrast": contrast,
"notifications": {
"new_jobs": bool(notifications.get("new_jobs", True)),
"job_updates": bool(notifications.get("job_updates", True)),
"system": bool(notifications.get("system", True)),
"email": bool(notifications.get("email", False))
},
"privacy": {
"activity_logs": bool(privacy.get("activity_logs", True)),
"two_factor": bool(privacy.get("two_factor", False)),
"auto_logout": max(5, min(480, int(privacy.get("auto_logout", 60)))) # 5-480 Minuten
},
"last_updated": datetime.now().isoformat()
}
# Prüfen, ob User-Tabelle eine settings-Spalte hat
if hasattr(user, 'settings'):
# Einstellungen in der Datenbank speichern
import json
user.settings = json.dumps(settings)
else:
# Fallback: In Session speichern (temporär)
session['user_settings'] = settings
user.updated_at = datetime.now()
db_session.commit()
user_logger.info(f"Benutzer {current_user.username} hat seine Einstellungen aktualisiert")
if is_json_request:
return jsonify({
"success": True,
"message": "Einstellungen erfolgreich aktualisiert",
"settings": settings
})
else:
flash("Einstellungen erfolgreich aktualisiert", "success")
return redirect(url_for("user_settings"))
except ValueError as e:
error = f"Ungültige Eingabedaten: {str(e)}"
user_logger.warning(f"Ungültige Einstellungsdaten von Benutzer {current_user.username}: {str(e)}")
if is_json_request:
return jsonify({"error": error}), 400
else:
flash(error, "error")
return redirect(url_for("user_settings"))
except Exception as e:
db_session.rollback()
error = f"Fehler beim Aktualisieren der Einstellungen: {str(e)}"
user_logger.error(f"Fehler beim Aktualisieren der Einstellungen für Benutzer {current_user.username}: {str(e)}")
if is_json_request:
return jsonify({"error": "Interner Serverfehler"}), 500
else:
flash("Fehler beim Speichern der Einstellungen", "error")
return redirect(url_for("user_settings"))
finally:
db_session.close()
@app.route("/api/user/settings", methods=["GET"])
@login_required
def get_user_settings():
"""Holt die aktuellen Benutzereinstellungen"""
try:
# Einstellungen aus Session oder Datenbank laden
user_settings = session.get('user_settings', {})
# Standard-Einstellungen falls keine vorhanden
default_settings = {
"theme": "system",
"reduced_motion": False,
"contrast": "normal",
"notifications": {
"new_jobs": True,
"job_updates": True,
"system": True,
"email": False
},
"privacy": {
"activity_logs": True,
"two_factor": False,
"auto_logout": 60
}
}
# Merge mit Standard-Einstellungen
settings = {**default_settings, **user_settings}
return jsonify({
"success": True,
"settings": settings
})
except Exception as e:
user_logger.error(f"Fehler beim Laden der Benutzereinstellungen: {str(e)}")
return jsonify({
"success": False,
"error": "Fehler beim Laden der Einstellungen"
}), 500
@app.route("/user/change-password", methods=["POST"])
@login_required
def user_change_password():
"""Benutzerpasswort ändern"""
try:
# Überprüfen, ob es sich um eine JSON-Anfrage handelt
is_json_request = request.is_json or request.headers.get('Content-Type') == 'application/json'
if is_json_request:
data = request.get_json()
current_password = data.get("current_password")
new_password = data.get("new_password")
confirm_password = data.get("confirm_password")
else:
current_password = request.form.get("current_password")
new_password = request.form.get("new_password")
confirm_password = request.form.get("confirm_password")
# Prüfen, ob alle Felder ausgefüllt sind
if not current_password or not new_password or not confirm_password:
error = "Alle Passwortfelder müssen ausgefüllt sein."
if is_json_request:
return jsonify({"error": error}), 400
else:
flash(error, "error")
return redirect(url_for("user_profile"))
# Prüfen, ob das neue Passwort und die Bestätigung übereinstimmen
if new_password != confirm_password:
error = "Das neue Passwort und die Bestätigung stimmen nicht überein."
if is_json_request:
return jsonify({"error": error}), 400
else:
flash(error, "error")
return redirect(url_for("user_profile"))
db_session = get_db_session()
user = db_session.query(User).filter(User.id == int(current_user.id)).first()
if user and user.check_password(current_password):
# Passwort aktualisieren
user.set_password(new_password)
user.updated_at = datetime.now()
db_session.commit()
user_logger.info(f"Benutzer {current_user.username} hat sein Passwort geändert")
if is_json_request:
return jsonify({
"success": True,
"message": "Passwort erfolgreich geändert"
})
else:
flash("Passwort erfolgreich geändert", "success")
return redirect(url_for("user_profile"))
else:
error = "Das aktuelle Passwort ist nicht korrekt."
if is_json_request:
return jsonify({"error": error}), 401
else:
flash(error, "error")
return redirect(url_for("user_profile"))
except Exception as e:
error = f"Fehler beim Ändern des Passworts: {str(e)}"
user_logger.error(error)
if request.is_json:
return jsonify({"error": error}), 500
else:
flash(error, "error")
return redirect(url_for("user_profile"))
finally:
db_session.close()
@app.route("/user/export", methods=["GET"])
@login_required
def user_export_data():
"""Exportiert alle Benutzerdaten als JSON für DSGVO-Konformität"""
try:
db_session = get_db_session()
user = db_session.query(User).filter(User.id == int(current_user.id)).first()
if not user:
db_session.close()
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Benutzerdaten abrufen
user_data = user.to_dict()
# Jobs des Benutzers abrufen
jobs = db_session.query(Job).filter(Job.user_id == user.id).all()
user_data["jobs"] = [job.to_dict() for job in jobs]
# Aktivitäten und Einstellungen hinzufügen
user_data["settings"] = session.get('user_settings', {})
# Persönliche Statistiken
user_data["statistics"] = {
"total_jobs": len(jobs),
"completed_jobs": len([j for j in jobs if j.status == "finished"]),
"failed_jobs": len([j for j in jobs if j.status == "failed"]),
"account_created": user.created_at.isoformat() if user.created_at else None,
"last_login": user.last_login.isoformat() if user.last_login else None
}
db_session.close()
# Daten als JSON-Datei zum Download anbieten
response = make_response(json.dumps(user_data, indent=4))
response.headers["Content-Disposition"] = f"attachment; filename=user_data_{user.username}.json"
response.headers["Content-Type"] = "application/json"
user_logger.info(f"Benutzer {current_user.username} hat seine Daten exportiert")
return response
except Exception as e:
error = f"Fehler beim Exportieren der Benutzerdaten: {str(e)}"
user_logger.error(error)
return jsonify({"error": error}), 500
@app.route("/user/profile", methods=["PUT"])
@login_required
def user_update_profile_api():
"""API-Endpunkt zum Aktualisieren des Benutzerprofils"""
try:
if not request.is_json:
return jsonify({"error": "Anfrage muss im JSON-Format sein"}), 400
data = request.get_json()
db_session = get_db_session()
user = db_session.query(User).filter(User.id == int(current_user.id)).first()
if not user:
db_session.close()
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Aktualisiere nur die bereitgestellten Felder
if "name" in data:
user.name = data["name"]
if "email" in data:
user.email = data["email"]
if "department" in data:
user.department = data["department"]
if "position" in data:
user.position = data["position"]
if "phone" in data:
user.phone = data["phone"]
if "bio" in data:
user.bio = data["bio"]
user.updated_at = datetime.now()
db_session.commit()
# Aktualisierte Benutzerdaten zurückgeben
user_data = user.to_dict()
db_session.close()
user_logger.info(f"Benutzer {current_user.username} hat sein Profil über die API aktualisiert")
return jsonify({
"success": True,
"message": "Profil erfolgreich aktualisiert",
"user": user_data
})
except Exception as e:
error = f"Fehler beim Aktualisieren des Profils: {str(e)}"
user_logger.error(error)
return jsonify({"error": error}), 500
# ===== KIOSK-KONTROLL-ROUTEN (ehemals kiosk_control.py) =====
@app.route('/api/kiosk/status', methods=['GET'])
def kiosk_get_status():
"""Kiosk-Status abrufen."""
try:
# Prüfen ob Kiosk-Modus aktiv ist
kiosk_active = os.path.exists('/tmp/kiosk_active')
return jsonify({
"active": kiosk_active,
"message": "Kiosk-Status erfolgreich abgerufen"
})
except Exception as e:
kiosk_logger.error(f"Fehler beim Abrufen des Kiosk-Status: {str(e)}")
return jsonify({"error": "Fehler beim Abrufen des Status"}), 500
@app.route('/api/kiosk/deactivate', methods=['POST'])
def kiosk_deactivate():
"""Kiosk-Modus mit Passwort deaktivieren."""
try:
data = request.get_json()
if not data or 'password' not in data:
return jsonify({"error": "Passwort erforderlich"}), 400
password = data['password']
# Passwort überprüfen
if not check_password_hash(KIOSK_PASSWORD_HASH, password):
kiosk_logger.warning(f"Fehlgeschlagener Kiosk-Deaktivierungsversuch von IP: {request.remote_addr}")
return jsonify({"error": "Ungültiges Passwort"}), 401
# Kiosk deaktivieren
try:
# Kiosk-Service stoppen
subprocess.run(['sudo', 'systemctl', 'stop', 'myp-kiosk'], check=True)
subprocess.run(['sudo', 'systemctl', 'disable', 'myp-kiosk'], check=True)
# Kiosk-Marker entfernen
if os.path.exists('/tmp/kiosk_active'):
os.remove('/tmp/kiosk_active')
# Normale Desktop-Umgebung wiederherstellen
subprocess.run(['sudo', 'systemctl', 'set-default', 'graphical.target'], check=True)
kiosk_logger.info(f"Kiosk-Modus erfolgreich deaktiviert von IP: {request.remote_addr}")
return jsonify({
"success": True,
"message": "Kiosk-Modus erfolgreich deaktiviert. System wird neu gestartet."
})
except subprocess.CalledProcessError as e:
kiosk_logger.error(f"Fehler beim Deaktivieren des Kiosk-Modus: {str(e)}")
return jsonify({"error": "Fehler beim Deaktivieren des Kiosk-Modus"}), 500
except Exception as e:
kiosk_logger.error(f"Unerwarteter Fehler bei Kiosk-Deaktivierung: {str(e)}")
return jsonify({"error": "Unerwarteter Fehler"}), 500
@app.route('/api/kiosk/activate', methods=['POST'])
@login_required
def kiosk_activate():
"""Kiosk-Modus aktivieren (nur für Admins)."""
try:
# Admin-Authentifizierung prüfen
if not current_user.is_admin:
kiosk_logger.warning(f"Nicht-Admin-Benutzer {current_user.username} versuchte Kiosk-Aktivierung")
return jsonify({"error": "Nur Administratoren können den Kiosk-Modus aktivieren"}), 403
# Kiosk aktivieren
try:
# Kiosk-Marker setzen
with open('/tmp/kiosk_active', 'w') as f:
f.write('1')
# Kiosk-Service aktivieren
subprocess.run(['sudo', 'systemctl', 'enable', 'myp-kiosk'], check=True)
subprocess.run(['sudo', 'systemctl', 'start', 'myp-kiosk'], check=True)
kiosk_logger.info(f"Kiosk-Modus erfolgreich aktiviert von Admin {current_user.username} (IP: {request.remote_addr})")
return jsonify({
"success": True,
"message": "Kiosk-Modus erfolgreich aktiviert"
})
except subprocess.CalledProcessError as e:
kiosk_logger.error(f"Fehler beim Aktivieren des Kiosk-Modus: {str(e)}")
return jsonify({"error": "Fehler beim Aktivieren des Kiosk-Modus"}), 500
except Exception as e:
kiosk_logger.error(f"Unerwarteter Fehler bei Kiosk-Aktivierung: {str(e)}")
return jsonify({"error": "Unerwarteter Fehler"}), 500
@app.route('/api/kiosk/restart', methods=['POST'])
def kiosk_restart_system():
"""System neu starten (nur nach Kiosk-Deaktivierung)."""
try:
data = request.get_json()
if not data or 'password' not in data:
return jsonify({"error": "Passwort erforderlich"}), 400
password = data['password']
# Passwort überprüfen
if not check_password_hash(KIOSK_PASSWORD_HASH, password):
kiosk_logger.warning(f"Fehlgeschlagener Neustart-Versuch von IP: {request.remote_addr}")
return jsonify({"error": "Ungültiges Passwort"}), 401
kiosk_logger.info(f"System-Neustart initiiert von IP: {request.remote_addr}")
# System nach kurzer Verzögerung neu starten
subprocess.Popen(['sudo', 'shutdown', '-r', '+1'])
return jsonify({
"success": True,
"message": "System wird in 1 Minute neu gestartet"
})
except Exception as e:
kiosk_logger.error(f"Fehler beim System-Neustart: {str(e)}")
return jsonify({"error": "Fehler beim Neustart"}), 500
# ===== HILFSFUNKTIONEN =====
@measure_execution_time(logger=printers_logger, task_name="Drucker-Status-Prüfung")
def check_printer_status(ip_address: str, timeout: int = 7) -> Tuple[str, bool]:
"""
Überprüft den Status eines Druckers anhand der IP-Adresse.
Gibt den Status und die Erreichbarkeit zurück.
Args:
ip_address: IP-Adresse des Druckers oder der Steckdose
timeout: Timeout in Sekunden
Returns:
Tuple[str, bool]: (Status, Erreichbarkeit)
"""
status = "offline"
reachable = False
try:
# Überprüfen, ob die Steckdose online ist
import socket
# Erst Port 9999 versuchen (Tapo-Standard)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((ip_address, 9999))
sock.close()
if result == 0:
reachable = True
try:
# TP-Link Tapo Steckdose mit PyP100 überprüfen
from PyP100 import PyP100
p100 = PyP100.P100(ip_address, TAPO_USERNAME, TAPO_PASSWORD)
p100.handshake() # Authentifizierung
p100.login() # Login
# Geräteinformationen abrufen
device_info = p100.getDeviceInfo()
# Status auswerten
if device_info.get('device_on', False):
status = "online"
else:
status = "standby"
printers_logger.info(f"✅ Tapo-Steckdose {ip_address}: Status = {status}")
except Exception as e:
printers_logger.error(f"❌ Fehler bei Tapo-Status-Check für {ip_address}: {str(e)}")
reachable = False
status = "error"
else:
# Alternativ HTTP/HTTPS versuchen
try:
# HTTP auf Port 80
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((ip_address, 80))
sock.close()
if result == 0:
reachable = True
status = "online" # Standarddrucker ohne Tapo-Steckdose
else:
# HTTPS auf Port 443
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((ip_address, 443))
sock.close()
if result == 0:
reachable = True
status = "online"
except Exception as e:
printers_logger.error(f"❌ Fehler bei Socket-Check für {ip_address}: {str(e)}")
reachable = False
status = "error"
except Exception as e:
printers_logger.error(f"❌ Fehler bei Verbindungsprüfung zu {ip_address}: {str(e)}")
status = "error"
reachable = False
return status, reachable
@measure_execution_time(logger=printers_logger, task_name="Mehrere-Drucker-Status-Prüfung")
def check_multiple_printers_status(printers: List[Dict], timeout: int = 7) -> Dict[int, Tuple[str, bool]]:
"""
Überprüft den Status mehrerer Drucker parallel.
Args:
printers: Liste der zu prüfenden Drucker
timeout: Timeout für jeden einzelnen Drucker
Returns:
Dict[int, Tuple[str, bool]]: Dictionary mit Drucker-ID als Key und (Status, Aktiv) als Value
"""
results = {}
# Wenn keine Drucker vorhanden sind, gebe leeres Dict zurück
if not printers:
printers_logger.info(" Keine Drucker zum Status-Check gefunden")
return results
printers_logger.info(f"🔍 Prüfe Status von {len(printers)} Druckern parallel...")
# Parallel-Ausführung mit ThreadPoolExecutor
# Sicherstellen, dass max_workers mindestens 1 ist
max_workers = min(max(len(printers), 1), 10)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Futures für alle Drucker erstellen
future_to_printer = {
executor.submit(check_printer_status, printer.get('ip_address'), timeout): printer
for printer in printers
}
# Ergebnisse sammeln
for future in as_completed(future_to_printer, timeout=timeout + 2):
printer = future_to_printer[future]
try:
status, active = future.result()
results[printer['id']] = (status, active)
printers_logger.info(f"Drucker {printer['name']} ({printer.get('ip_address')}): {status}")
except Exception as e:
printers_logger.error(f"Fehler bei Status-Check für Drucker {printer['name']}: {str(e)}")
results[printer['id']] = ("offline", False)
printers_logger.info(f"✅ Status-Check abgeschlossen für {len(results)} Drucker")
return results
# ===== UI-ROUTEN =====
@app.route("/")
def index():
if current_user.is_authenticated:
return render_template("index.html")
return redirect(url_for("login"))
@app.route("/dashboard")
@login_required
def dashboard():
return render_template("dashboard.html")
@app.route("/profile")
@login_required
def profile_redirect():
"""Leitet zur neuen Profilseite im User-Blueprint weiter."""
return redirect(url_for("user_profile"))
@app.route("/profil")
@login_required
def profil_redirect():
"""Leitet zur neuen Profilseite im User-Blueprint weiter (deutsche URL)."""
return redirect(url_for("user_profile"))
@app.route("/settings")
@login_required
def settings_redirect():
"""Leitet zur neuen Einstellungsseite im User-Blueprint weiter."""
return redirect(url_for("user_settings"))
@app.route("/einstellungen")
@login_required
def einstellungen_redirect():
"""Leitet zur neuen Einstellungsseite im User-Blueprint weiter (deutsche URL)."""
return redirect(url_for("user_settings"))
@app.route("/admin")
@login_required
def admin():
"""Leitet zur neuen Admin-Dashboard-Route weiter."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
return redirect(url_for("admin_page"))
@app.route("/demo")
@login_required
def components_demo():
"""Demo-Seite für UI-Komponenten"""
return render_template("components_demo.html")
@app.route("/printers")
@login_required
def printers_page():
"""Zeigt die Übersichtsseite für Drucker an."""
return render_template("printers.html")
@app.route("/jobs")
@login_required
def jobs_page():
"""Zeigt die Übersichtsseite für Druckaufträge an."""
return render_template("jobs.html")
@app.route("/jobs/new")
@login_required
def new_job_page():
"""Zeigt die Seite zum Erstellen neuer Druckaufträge an."""
return render_template("jobs.html")
@app.route("/stats")
@login_required
def stats_page():
"""Zeigt die Statistik-Seite an."""
return render_template("stats.html")
@app.route("/admin-dashboard")
@login_required
def admin_page():
"""Zeigt die Administrationsseite an."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
# Aktives Tab aus der URL auslesen oder Default-Wert verwenden
active_tab = request.args.get('tab', 'users')
# Daten für das Admin-Panel direkt beim Laden vorbereiten
stats = {}
users = []
printers = []
scheduler_status = {"running": False, "message": "Nicht verfügbar"}
system_info = {"cpu": 0, "memory": 0, "disk": 0}
logs = []
db_session = get_db_session()
try:
# Statistiken laden
from sqlalchemy.orm import joinedload
# Benutzeranzahl
stats["total_users"] = db_session.query(User).count()
# Druckeranzahl und Online-Status
all_printers = db_session.query(Printer).all()
stats["total_printers"] = len(all_printers)
stats["online_printers"] = len([p for p in all_printers if p.status == "online"])
# Aktive Jobs und Warteschlange
stats["active_jobs"] = db_session.query(Job).filter(
Job.status.in_(["printing", "running"])
).count()
stats["queued_jobs"] = db_session.query(Job).filter(
Job.status == "scheduled"
).count()
# Erfolgsrate
total_jobs = db_session.query(Job).filter(
Job.status.in_(["completed", "failed", "cancelled"])
).count()
successful_jobs = db_session.query(Job).filter(
Job.status == "completed"
).count()
if total_jobs > 0:
stats["success_rate"] = int((successful_jobs / total_jobs) * 100)
else:
stats["success_rate"] = 0
# Benutzer laden
if active_tab == 'users':
users = db_session.query(User).all()
users = [user.to_dict() for user in users]
# Drucker laden
if active_tab == 'printers':
printers = db_session.query(Printer).all()
printers = [printer.to_dict() for printer in printers]
# Scheduler-Status laden
if active_tab == 'scheduler':
try:
from utils.scheduler import scheduler_is_running
is_running = scheduler_is_running()
scheduler_status = {
"running": is_running,
"message": "Der Scheduler läuft" if is_running else "Der Scheduler ist gestoppt"
}
except (ImportError, AttributeError):
scheduler_status = {
"running": False,
"message": "Scheduler-Status nicht verfügbar"
}
# System-Informationen laden
if active_tab == 'system':
import os
import psutil
# CPU und Memory
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
# Uptime
boot_time = psutil.boot_time()
uptime_seconds = time.time() - boot_time
uptime_days = int(uptime_seconds // 86400)
uptime_hours = int((uptime_seconds % 86400) // 3600)
uptime_minutes = int((uptime_seconds % 3600) // 60)
# Datenbank-Status
db_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'database', 'myp.db')
db_size = 0
if os.path.exists(db_path):
db_size = os.path.getsize(db_path) / (1024 * 1024) # MB
# Scheduler-Status
scheduler_running = False
scheduler_jobs = 0
try:
from utils.job_scheduler import scheduler
scheduler_running = scheduler.running
if hasattr(scheduler, 'get_jobs'):
scheduler_jobs = len(scheduler.get_jobs())
except:
pass
# Nächster Job
next_job = db_session.query(Job).filter(
Job.status == "scheduled"
).order_by(Job.created_at.asc()).first()
next_job_time = "Keine geplanten Jobs"
if next_job:
next_job_time = next_job.created_at.strftime("%d.%m.%Y %H:%M")
system_info = {
"cpu_usage": round(cpu_percent, 1),
"memory_usage": round(memory.percent, 1),
"disk_usage": round((disk.used / disk.total) * 100, 1),
"uptime": f"{uptime_days}d {uptime_hours}h {uptime_minutes}m",
"db_size": f"{db_size:.1f} MB",
"db_connections": "Aktiv",
"scheduler_running": scheduler_running,
"scheduler_jobs": scheduler_jobs,
"next_job": next_job_time
}
# Logs laden
if active_tab == 'logs':
import os
log_level = request.args.get('log_level', 'all')
log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs')
# Logeinträge sammeln
app_logs = []
for category in ['app', 'auth', 'jobs', 'printers', 'scheduler', 'errors']:
log_file = os.path.join(log_dir, category, f'{category}.log')
if os.path.exists(log_file):
with open(log_file, 'r') as f:
for line in f.readlines()[-100:]: # Nur die letzten 100 Zeilen pro Datei
if log_level != 'all':
if log_level.upper() not in line:
continue
app_logs.append({
'timestamp': line.split(' - ')[0] if ' - ' in line else '',
'level': line.split(' - ')[1].split(' - ')[0] if ' - ' in line and len(line.split(' - ')) > 2 else 'INFO',
'category': category,
'message': ' - '.join(line.split(' - ')[2:]) if ' - ' in line and len(line.split(' - ')) > 2 else line
})
# Nach Zeitstempel sortieren (neueste zuerst)
logs = sorted(app_logs, key=lambda x: x['timestamp'] if x['timestamp'] else '', reverse=True)[:100]
except Exception as e:
app_logger.error(f"Fehler beim Laden der Admin-Daten: {str(e)}")
finally:
db_session.close()
return render_template(
"admin.html",
active_tab=active_tab,
stats=stats,
users=users,
printers=printers,
scheduler_status=scheduler_status,
system_info=system_info,
logs=logs
)
# ===== ERROR MONITORING SYSTEM =====
@app.route("/api/admin/system-health", methods=['GET'])
@login_required
def api_admin_system_health():
"""API-Endpunkt für System-Gesundheitscheck."""
if not current_user.is_admin:
return jsonify({"error": "Berechtigung verweigert"}), 403
db_session = get_db_session()
critical_errors = []
warnings = []
try:
# 1. Datenbank-Schema-Integrität prüfen
try:
# Test verschiedene kritische Tabellen und Spalten
db_session.execute(text("SELECT COUNT(*) FROM guest_requests WHERE duration_minutes IS NOT NULL"))
schema_integrity = "OK"
except Exception as e:
critical_errors.append({
"type": "database_schema",
"message": f"Datenbank-Schema-Fehler: {str(e)}",
"severity": "critical",
"suggested_fix": "Datenbank-Migration ausführen",
"timestamp": datetime.now().isoformat()
})
schema_integrity = "FEHLER"
# 2. Prüfe kritische Spalten in wichtigen Tabellen
schema_checks = [
("guest_requests", "duration_minutes"),
("guest_requests", "file_name"),
("guest_requests", "processed_by"),
("users", "updated_at"),
("jobs", "duration_minutes")
]
missing_columns = []
for table, column in schema_checks:
try:
db_session.execute(text(f"SELECT {column} FROM {table} LIMIT 1"))
except Exception:
missing_columns.append(f"{table}.{column}")
if missing_columns:
critical_errors.append({
"type": "missing_columns",
"message": f"Fehlende Datenbank-Spalten: {', '.join(missing_columns)}",
"severity": "critical",
"suggested_fix": "python utils/database_schema_migration.py ausführen",
"timestamp": datetime.now().isoformat(),
"details": missing_columns
})
# 3. Prüfe auf wiederkehrende Datenbankfehler in den Logs
import os
log_file = os.path.join("logs", "app", f"myp_app_{datetime.now().strftime('%Y_%m_%d')}.log")
recent_db_errors = 0
if os.path.exists(log_file):
try:
with open(log_file, 'r', encoding='utf-8') as f:
last_lines = f.readlines()[-100:] # Letzte 100 Zeilen
for line in last_lines:
if "OperationalError" in line or "no such column" in line:
recent_db_errors += 1
except Exception:
pass
if recent_db_errors > 5:
critical_errors.append({
"type": "frequent_db_errors",
"message": f"{recent_db_errors} Datenbankfehler in letzter Zeit erkannt",
"severity": "high",
"suggested_fix": "System-Logs überprüfen und Migration ausführen",
"timestamp": datetime.now().isoformat()
})
# 4. Prüfe Drucker-Konnektivität
offline_printers = db_session.query(Printer).filter(
Printer.status == "offline",
Printer.active == True
).count()
if offline_printers > 0:
warnings.append({
"type": "printer_offline",
"message": f"{offline_printers} aktive Drucker sind offline",
"severity": "warning",
"suggested_fix": "Drucker-Status überprüfen",
"timestamp": datetime.now().isoformat()
})
# 5. System-Performance Metriken
import psutil
cpu_usage = psutil.cpu_percent(interval=1)
memory_usage = psutil.virtual_memory().percent
disk_usage = psutil.disk_usage('/').percent
if cpu_usage > 90:
warnings.append({
"type": "high_cpu",
"message": f"Hohe CPU-Auslastung: {cpu_usage:.1f}%",
"severity": "warning",
"suggested_fix": "System-Ressourcen überprüfen",
"timestamp": datetime.now().isoformat()
})
if memory_usage > 85:
warnings.append({
"type": "high_memory",
"message": f"Hohe Speicher-Auslastung: {memory_usage:.1f}%",
"severity": "warning",
"suggested_fix": "Speicher-Verbrauch optimieren",
"timestamp": datetime.now().isoformat()
})
# 6. Letzte Migration info
try:
backup_dir = os.path.join("database", "backups")
if os.path.exists(backup_dir):
backup_files = [f for f in os.listdir(backup_dir) if f.endswith('.backup')]
if backup_files:
latest_backup = max(backup_files, key=lambda x: os.path.getctime(os.path.join(backup_dir, x)))
last_migration = latest_backup.replace('.backup', '').replace('myp.db.backup_', '')
else:
last_migration = "Keine Backups gefunden"
else:
last_migration = "Backup-Verzeichnis nicht gefunden"
except Exception:
last_migration = "Unbekannt"
return jsonify({
"success": True,
"health_status": "critical" if critical_errors else ("warning" if warnings else "healthy"),
"critical_errors": critical_errors,
"warnings": warnings,
"schema_integrity": schema_integrity,
"last_migration": last_migration,
"recent_errors_count": recent_db_errors,
"system_metrics": {
"cpu_usage": cpu_usage,
"memory_usage": memory_usage,
"disk_usage": disk_usage
},
"timestamp": datetime.now().isoformat()
})
except Exception as e:
app_logger.error(f"Fehler beim System-Gesundheitscheck: {str(e)}")
return jsonify({
"success": False,
"error": "Fehler beim System-Gesundheitscheck",
"critical_errors": [{
"type": "system_check_failed",
"message": f"System-Check fehlgeschlagen: {str(e)}",
"severity": "critical",
"suggested_fix": "System-Logs überprüfen",
"timestamp": datetime.now().isoformat()
}]
}), 500
finally:
db_session.close()
@app.route("/api/admin/fix-errors", methods=['POST'])
@login_required
@csrf.exempt
def api_admin_fix_errors():
"""API-Endpunkt um automatische Fehler-Reparatur auszuführen."""
if not current_user.is_admin:
return jsonify({"error": "Berechtigung verweigert"}), 403
try:
# Automatische Migration ausführen
import subprocess
import sys
# Migration in separatem Prozess ausführen
result = subprocess.run(
[sys.executable, "utils/database_schema_migration.py"],
cwd=os.path.dirname(os.path.abspath(__file__)),
capture_output=True,
text=True,
timeout=60
)
if result.returncode == 0:
app_logger.info(f"Automatische Migration erfolgreich ausgeführt von Admin {current_user.email}")
return jsonify({
"success": True,
"message": "Automatische Reparatur erfolgreich durchgeführt",
"details": result.stdout
})
else:
app_logger.error(f"Automatische Migration fehlgeschlagen: {result.stderr}")
return jsonify({
"success": False,
"error": "Automatische Reparatur fehlgeschlagen",
"details": result.stderr
}), 500
except subprocess.TimeoutExpired:
return jsonify({
"success": False,
"error": "Migration-Timeout - Vorgang dauerte zu lange"
}), 500
except Exception as e:
app_logger.error(f"Fehler bei automatischer Reparatur: {str(e)}")
return jsonify({
"success": False,
"error": f"Fehler bei automatischer Reparatur: {str(e)}"
}), 500
# Direkter Zugriff auf Logout-Route (für Fallback)
@app.route("/logout", methods=["GET", "POST"])
def logout_redirect():
"""Leitet zur Blueprint-Logout-Route weiter."""
return redirect(url_for("auth_logout"))
# ===== JOB-ROUTEN =====
@app.route("/api/jobs", methods=["GET"])
@login_required
def get_jobs():
db_session = get_db_session()
try:
# Import joinedload for eager loading
from sqlalchemy.orm import joinedload
# Admin sieht alle Jobs, User nur eigene
if current_user.is_admin:
# Eagerly load the user and printer relationships to avoid detached instance errors
jobs = db_session.query(Job).options(joinedload(Job.user), joinedload(Job.printer)).all()
else:
jobs = db_session.query(Job).options(joinedload(Job.user), joinedload(Job.printer)).filter(Job.user_id == int(current_user.id)).all()
# Convert jobs to dictionaries before closing the session
job_dicts = [job.to_dict() for job in jobs]
db_session.close()
return jsonify({
"jobs": job_dicts
})
except Exception as e:
jobs_logger.error(f"Fehler beim Abrufen von Jobs: {str(e)}")
db_session.close()
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/jobs/<int:job_id>", methods=["GET"])
@login_required
@job_owner_required
def get_job(job_id):
db_session = get_db_session()
try:
from sqlalchemy.orm import joinedload
# Eagerly load the user and printer relationships
job = db_session.query(Job).options(joinedload(Job.user), joinedload(Job.printer)).filter(Job.id == job_id).first()
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
# Convert to dict before closing session
job_dict = job.to_dict()
db_session.close()
return jsonify(job_dict)
except Exception as e:
jobs_logger.error(f"Fehler beim Abrufen des Jobs {job_id}: {str(e)}")
db_session.close()
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route('/api/jobs/check-waiting', methods=['POST'])
@login_required
def check_waiting_jobs():
"""Überprüft wartende Jobs und startet sie, wenn Drucker online gehen."""
try:
db_session = get_db_session()
# Alle wartenden Jobs finden
waiting_jobs = db_session.query(Job).filter(
Job.status == "waiting_for_printer"
).all()
if not waiting_jobs:
db_session.close()
return jsonify({
"message": "Keine wartenden Jobs gefunden",
"updated_jobs": []
})
updated_jobs = []
for job in waiting_jobs:
# Drucker-Status prüfen
printer = db_session.query(Printer).get(job.printer_id)
if printer and printer.plug_ip:
status, active = check_printer_status(printer.plug_ip)
if status == "online" and active:
# Drucker ist jetzt online - Job kann geplant werden
job.status = "scheduled"
updated_jobs.append({
"id": job.id,
"name": job.name,
"printer_name": printer.name,
"status": "scheduled"
})
jobs_logger.info(f"Job {job.id} von 'waiting_for_printer' zu 'scheduled' geändert - Drucker {printer.name} ist online")
if updated_jobs:
db_session.commit()
db_session.close()
return jsonify({
"message": f"{len(updated_jobs)} Jobs aktualisiert",
"updated_jobs": updated_jobs
})
except Exception as e:
jobs_logger.error(f"Fehler beim Überprüfen wartender Jobs: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route('/api/jobs/active', methods=['GET'])
@login_required
def get_active_jobs():
"""
Gibt alle aktiven Jobs zurück.
"""
try:
db_session = get_db_session()
from sqlalchemy.orm import joinedload
active_jobs = db_session.query(Job).options(
joinedload(Job.user),
joinedload(Job.printer)
).filter(
Job.status.in_(["scheduled", "running"])
).all()
result = []
for job in active_jobs:
job_dict = job.to_dict()
# Aktuelle Restzeit berechnen
if job.status == "running" and job.end_at:
remaining_time = job.end_at - datetime.now()
if remaining_time.total_seconds() > 0:
job_dict["remaining_minutes"] = int(remaining_time.total_seconds() / 60)
else:
job_dict["remaining_minutes"] = 0
result.append(job_dict)
db_session.close()
return jsonify({"jobs": result})
except Exception as e:
jobs_logger.error(f"Fehler beim Abrufen aktiver Jobs: {str(e)}")
return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
@app.route('/api/jobs', methods=['POST'])
@login_required
@measure_execution_time(logger=jobs_logger, task_name="API-Job-Erstellung")
def create_job():
"""
Erstellt einen neuen Job mit dem Status "scheduled".
Body: {
"printer_id": int,
"start_iso": str, # ISO-Datum-String
"duration_minutes": int
}
"""
try:
data = request.json
# Pflichtfelder prüfen
required_fields = ["printer_id", "start_iso", "duration_minutes"]
for field in required_fields:
if field not in data:
return jsonify({"error": f"Feld '{field}' fehlt"}), 400
# Daten extrahieren und validieren
printer_id = int(data["printer_id"])
start_iso = data["start_iso"]
duration_minutes = int(data["duration_minutes"])
# Optional: Jobtitel und Dateipfad
name = data.get("name", f"Druckjob vom {datetime.now().strftime('%d.%m.%Y')}")
file_path = data.get("file_path")
# Start-Zeit parsen
try:
start_at = datetime.fromisoformat(start_iso)
except ValueError:
return jsonify({"error": "Ungültiges Startdatum"}), 400
# Dauer validieren
if duration_minutes <= 0:
return jsonify({"error": "Dauer muss größer als 0 sein"}), 400
# End-Zeit berechnen
end_at = start_at + timedelta(minutes=duration_minutes)
db_session = get_db_session()
# Prüfen, ob der Drucker existiert
printer = db_session.query(Printer).get(printer_id)
if not printer:
db_session.close()
return jsonify({"error": "Drucker nicht gefunden"}), 404
# Prüfen, ob der Drucker online ist
printer_status, printer_active = check_printer_status(printer.plug_ip if printer.plug_ip else "")
# Status basierend auf Drucker-Verfügbarkeit setzen
if printer_status == "online" and printer_active:
job_status = "scheduled"
else:
job_status = "waiting_for_printer"
# Neuen Job erstellen
new_job = Job(
name=name,
printer_id=printer_id,
user_id=current_user.id,
owner_id=current_user.id,
start_at=start_at,
end_at=end_at,
status=job_status,
file_path=file_path,
duration_minutes=duration_minutes
)
db_session.add(new_job)
db_session.commit()
# Job-Objekt für die Antwort serialisieren
job_dict = new_job.to_dict()
db_session.close()
jobs_logger.info(f"Neuer Job {new_job.id} erstellt für Drucker {printer_id}, Start: {start_at}, Dauer: {duration_minutes} Minuten")
return jsonify({"job": job_dict}), 201
except Exception as e:
jobs_logger.error(f"Fehler beim Erstellen eines Jobs: {str(e)}")
return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
@app.route('/api/jobs/<int:job_id>/extend', methods=['POST'])
@login_required
@job_owner_required
def extend_job(job_id):
"""
Verlängert die Endzeit eines Jobs.
Body: {
"extra_minutes": int
}
"""
try:
data = request.json
# Prüfen, ob die erforderlichen Daten vorhanden sind
if "extra_minutes" not in data:
return jsonify({"error": "Feld 'extra_minutes' fehlt"}), 400
extra_minutes = int(data["extra_minutes"])
# Validieren
if extra_minutes <= 0:
return jsonify({"error": "Zusätzliche Minuten müssen größer als 0 sein"}), 400
db_session = get_db_session()
job = db_session.query(Job).get(job_id)
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
# Prüfen, ob der Job verlängert werden kann
if job.status not in ["scheduled", "running"]:
db_session.close()
return jsonify({"error": f"Job kann im Status '{job.status}' nicht verlängert werden"}), 400
# Endzeit aktualisieren
job.end_at = job.end_at + timedelta(minutes=extra_minutes)
job.duration_minutes += extra_minutes
db_session.commit()
# Job-Objekt für die Antwort serialisieren
job_dict = job.to_dict()
db_session.close()
jobs_logger.info(f"Job {job_id} um {extra_minutes} Minuten verlängert, neue Endzeit: {job.end_at}")
return jsonify({"job": job_dict})
except Exception as e:
jobs_logger.error(f"Fehler beim Verlängern von Job {job_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
@app.route('/api/jobs/<int:job_id>/finish', methods=['POST'])
@login_required
def finish_job(job_id):
"""
Beendet einen Job manuell und schaltet die Steckdose aus.
Nur für Administratoren erlaubt.
"""
try:
# Prüfen, ob der Benutzer Administrator ist
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Jobs manuell beenden"}), 403
db_session = get_db_session()
job = db_session.query(Job).options(joinedload(Job.printer)).get(job_id)
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
# Prüfen, ob der Job beendet werden kann
if job.status not in ["scheduled", "running"]:
db_session.close()
return jsonify({"error": f"Job kann im Status '{job.status}' nicht beendet werden"}), 400
# Steckdose ausschalten
from utils.job_scheduler import toggle_plug
if not toggle_plug(job.printer_id, False):
# Trotzdem weitermachen, aber Warnung loggen
jobs_logger.warning(f"Steckdose für Job {job_id} konnte nicht ausgeschaltet werden")
# Job als beendet markieren
job.status = "finished"
job.actual_end_time = datetime.now()
db_session.commit()
# Job-Objekt für die Antwort serialisieren
job_dict = job.to_dict()
db_session.close()
jobs_logger.info(f"Job {job_id} manuell beendet durch Admin {current_user.id}")
return jsonify({"job": job_dict})
except Exception as e:
jobs_logger.error(f"Fehler beim manuellen Beenden von Job {job_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
# ===== DRUCKER-ROUTEN =====
@app.route("/api/printers", methods=["GET"])
@login_required
def get_printers():
"""Gibt alle Drucker zurück - OHNE Status-Check für schnelleres Laden."""
db_session = get_db_session()
try:
# Windows-kompatible Timeout-Implementierung
import threading
import time
printers = None
timeout_occurred = False
def fetch_printers():
nonlocal printers, timeout_occurred
try:
printers = db_session.query(Printer).all()
except Exception as e:
printers_logger.error(f"Datenbankfehler beim Laden der Drucker: {str(e)}")
timeout_occurred = True
# Starte Datenbankabfrage in separatem Thread
thread = threading.Thread(target=fetch_printers)
thread.daemon = True
thread.start()
thread.join(timeout=5) # 5 Sekunden Timeout
if thread.is_alive() or timeout_occurred or printers is None:
printers_logger.warning("Database timeout when fetching printers for basic loading")
return jsonify({
'error': 'Database timeout beim Laden der Drucker',
'timeout': True,
'printers': []
}), 408
# Drucker-Daten OHNE Status-Check zusammenstellen für schnelles Laden
printer_data = []
current_time = datetime.now()
for printer in printers:
printer_data.append({
"id": printer.id,
"name": printer.name,
"model": printer.model or 'Unbekanntes Modell',
"location": printer.location or 'Unbekannter Standort',
"mac_address": printer.mac_address,
"plug_ip": printer.plug_ip,
"status": printer.status or "offline", # Letzter bekannter Status
"active": printer.active if hasattr(printer, 'active') else True,
"ip_address": printer.plug_ip if printer.plug_ip else getattr(printer, 'ip_address', None),
"created_at": printer.created_at.isoformat() if printer.created_at else current_time.isoformat(),
"last_checked": printer.last_checked.isoformat() if hasattr(printer, 'last_checked') and printer.last_checked else None
})
db_session.close()
printers_logger.info(f"Schnelles Laden abgeschlossen: {len(printer_data)} Drucker geladen (ohne Status-Check)")
return jsonify({
"printers": printer_data,
"count": len(printer_data),
"message": "Drucker erfolgreich geladen"
})
except Exception as e:
db_session.rollback()
db_session.close()
printers_logger.error(f"Fehler beim Abrufen der Drucker: {str(e)}")
return jsonify({
"error": f"Fehler beim Laden der Drucker: {str(e)}",
"printers": []
}), 500
@app.route("/api/printers/status", methods=["GET"])
@login_required
@measure_execution_time(logger=printers_logger, task_name="API-Drucker-Status-Abfrage")
def get_printers_with_status():
"""Gibt alle Drucker MIT aktuellem Status-Check zurück - für Aktualisierung."""
db_session = get_db_session()
try:
# Windows-kompatible Timeout-Implementierung
import threading
import time
printers = None
timeout_occurred = False
def fetch_printers():
nonlocal printers, timeout_occurred
try:
printers = db_session.query(Printer).all()
except Exception as e:
printers_logger.error(f"Datenbankfehler beim Status-Check: {str(e)}")
timeout_occurred = True
# Starte Datenbankabfrage in separatem Thread
thread = threading.Thread(target=fetch_printers)
thread.daemon = True
thread.start()
thread.join(timeout=8) # 8 Sekunden Timeout für Status-Check
if thread.is_alive() or timeout_occurred or printers is None:
printers_logger.warning("Database timeout when fetching printers for status check")
return jsonify({
'error': 'Database timeout beim Status-Check der Drucker',
'timeout': True
}), 408
# Drucker-Daten für Status-Check vorbereiten
printer_data = []
for printer in printers:
# Verwende plug_ip als primäre IP-Adresse, fallback auf ip_address
ip_to_check = printer.plug_ip if printer.plug_ip else getattr(printer, 'ip_address', None)
printer_data.append({
'id': printer.id,
'name': printer.name,
'ip_address': ip_to_check,
'location': printer.location,
'model': printer.model
})
# Status aller Drucker parallel überprüfen mit 7-Sekunden-Timeout
printers_logger.info(f"Starte Status-Check für {len(printer_data)} Drucker mit 7-Sekunden-Timeout")
# Fallback: Wenn keine IP-Adressen vorhanden sind, alle als offline markieren
if not any(p['ip_address'] for p in printer_data):
printers_logger.warning("Keine IP-Adressen für Drucker gefunden - alle als offline markiert")
status_results = {p['id']: ("offline", False) for p in printer_data}
else:
try:
status_results = check_multiple_printers_status(printer_data, timeout=7)
except Exception as e:
printers_logger.error(f"Fehler beim Status-Check: {str(e)}")
# Fallback: alle als offline markieren
status_results = {p['id']: ("offline", False) for p in printer_data}
# Ergebnisse zusammenstellen und Datenbank aktualisieren
status_data = []
current_time = datetime.now()
for printer in printers:
if printer.id in status_results:
status, active = status_results[printer.id]
# Mapping für Frontend-Kompatibilität
if status == "online":
frontend_status = "available"
else:
frontend_status = "offline"
else:
# Fallback falls kein Ergebnis vorliegt
frontend_status = "offline"
active = False
# Status in der Datenbank aktualisieren
printer.status = frontend_status
printer.active = active
# Setze last_checked falls das Feld existiert
if hasattr(printer, 'last_checked'):
printer.last_checked = current_time
status_data.append({
"id": printer.id,
"name": printer.name,
"model": printer.model or 'Unbekanntes Modell',
"location": printer.location or 'Unbekannter Standort',
"mac_address": printer.mac_address,
"plug_ip": printer.plug_ip,
"status": frontend_status,
"active": active,
"ip_address": printer.plug_ip if printer.plug_ip else getattr(printer, 'ip_address', None),
"created_at": printer.created_at.isoformat() if printer.created_at else current_time.isoformat(),
"last_checked": current_time.isoformat()
})
# Speichere die aktualisierten Status
try:
db_session.commit()
printers_logger.info("Drucker-Status erfolgreich in Datenbank aktualisiert")
except Exception as e:
printers_logger.warning(f"Fehler beim Speichern der Status-Updates: {str(e)}")
# Nicht kritisch, Status-Check kann trotzdem zurückgegeben werden
db_session.close()
online_count = len([s for s in status_data if s['status'] == 'available'])
printers_logger.info(f"Status-Check abgeschlossen: {online_count} von {len(status_data)} Drucker online")
return jsonify(status_data)
except Exception as e:
db_session.rollback()
db_session.close()
printers_logger.error(f"Fehler beim Status-Check der Drucker: {str(e)}")
return jsonify({
"error": f"Fehler beim Status-Check: {str(e)}",
"printers": []
}), 500
@app.route("/api/jobs/current", methods=["GET"])
@login_required
def get_current_job():
"""Gibt den aktuellen Job des Benutzers zurück."""
db_session = get_db_session()
try:
current_job = db_session.query(Job).filter(
Job.user_id == int(current_user.id),
Job.status.in_(["scheduled", "running"])
).order_by(Job.start_at).first()
if current_job:
job_data = current_job.to_dict()
else:
job_data = None
db_session.close()
return jsonify(job_data)
except Exception as e:
db_session.close()
return jsonify({"error": str(e)}), 500
# ===== WEITERE API-ROUTEN =====
@app.route("/api/printers/<int:printer_id>", methods=["GET"])
@login_required
def get_printer(printer_id):
"""Gibt einen spezifischen Drucker zurück."""
db_session = get_db_session()
try:
printer = db_session.query(Printer).get(printer_id)
if not printer:
db_session.close()
return jsonify({"error": "Drucker nicht gefunden"}), 404
# Status-Check für diesen Drucker
ip_to_check = printer.plug_ip if printer.plug_ip else getattr(printer, 'ip_address', None)
if ip_to_check:
status, active = check_printer_status(ip_to_check)
printer.status = "available" if status == "online" else "offline"
printer.active = active
db_session.commit()
printer_data = {
"id": printer.id,
"name": printer.name,
"model": printer.model or 'Unbekanntes Modell',
"location": printer.location or 'Unbekannter Standort',
"mac_address": printer.mac_address,
"plug_ip": printer.plug_ip,
"status": printer.status or "offline",
"active": printer.active if hasattr(printer, 'active') else True,
"ip_address": ip_to_check,
"created_at": printer.created_at.isoformat() if printer.created_at else datetime.now().isoformat()
}
db_session.close()
return jsonify(printer_data)
except Exception as e:
db_session.close()
printers_logger.error(f"Fehler beim Abrufen des Druckers {printer_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/printers", methods=["POST"])
@login_required
def create_printer():
"""Erstellt einen neuen Drucker (nur für Admins)."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Drucker erstellen"}), 403
try:
data = request.json
# Pflichtfelder prüfen
required_fields = ["name", "plug_ip"]
for field in required_fields:
if field not in data:
return jsonify({"error": f"Feld '{field}' fehlt"}), 400
db_session = get_db_session()
# Prüfen, ob bereits ein Drucker mit diesem Namen existiert
existing_printer = db_session.query(Printer).filter(Printer.name == data["name"]).first()
if existing_printer:
db_session.close()
return jsonify({"error": "Ein Drucker mit diesem Namen existiert bereits"}), 400
# Neuen Drucker erstellen
new_printer = Printer(
name=data["name"],
model=data.get("model", ""),
location=data.get("location", ""),
mac_address=data.get("mac_address", ""),
plug_ip=data["plug_ip"],
status="offline",
active=True, # Neue Drucker sind standardmäßig aktiv
created_at=datetime.now()
)
db_session.add(new_printer)
db_session.commit()
# Sofortiger Status-Check für den neuen Drucker
ip_to_check = new_printer.plug_ip
if ip_to_check:
status, active = check_printer_status(ip_to_check)
new_printer.status = "available" if status == "online" else "offline"
new_printer.active = active
db_session.commit()
printer_data = {
"id": new_printer.id,
"name": new_printer.name,
"model": new_printer.model,
"location": new_printer.location,
"mac_address": new_printer.mac_address,
"plug_ip": new_printer.plug_ip,
"status": new_printer.status,
"active": new_printer.active,
"created_at": new_printer.created_at.isoformat()
}
db_session.close()
printers_logger.info(f"Neuer Drucker '{new_printer.name}' erstellt von Admin {current_user.id}")
return jsonify({"printer": printer_data, "message": "Drucker erfolgreich erstellt"}), 201
except Exception as e:
printers_logger.error(f"Fehler beim Erstellen eines Druckers: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/printers/add", methods=["POST"])
@login_required
def add_printer():
"""Alternativer Endpunkt zum Hinzufügen von Druckern (für Frontend-Kompatibilität)."""
return create_printer()
@app.route("/api/printers/<int:printer_id>", methods=["PUT"])
@login_required
def update_printer(printer_id):
"""Aktualisiert einen Drucker (nur für Admins)."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Drucker bearbeiten"}), 403
try:
data = request.json
db_session = get_db_session()
printer = db_session.query(Printer).get(printer_id)
if not printer:
db_session.close()
return jsonify({"error": "Drucker nicht gefunden"}), 404
# Aktualisierbare Felder
updatable_fields = ["name", "model", "location", "mac_address", "plug_ip"]
for field in updatable_fields:
if field in data:
setattr(printer, field, data[field])
db_session.commit()
printer_data = {
"id": printer.id,
"name": printer.name,
"model": printer.model,
"location": printer.location,
"mac_address": printer.mac_address,
"plug_ip": printer.plug_ip,
"status": printer.status,
"created_at": printer.created_at.isoformat() if printer.created_at else datetime.now().isoformat()
}
db_session.close()
printers_logger.info(f"Drucker {printer_id} aktualisiert von Admin {current_user.id}")
return jsonify({"printer": printer_data})
except Exception as e:
printers_logger.error(f"Fehler beim Aktualisieren des Druckers {printer_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/printers/<int:printer_id>", methods=["DELETE"])
@login_required
def delete_printer(printer_id):
"""Löscht einen Drucker (nur für Admins)."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Drucker löschen"}), 403
try:
db_session = get_db_session()
printer = db_session.query(Printer).get(printer_id)
if not printer:
db_session.close()
return jsonify({"error": "Drucker nicht gefunden"}), 404
# Prüfen, ob noch aktive Jobs für diesen Drucker existieren
active_jobs = db_session.query(Job).filter(
Job.printer_id == printer_id,
Job.status.in_(["scheduled", "running"])
).count()
if active_jobs > 0:
db_session.close()
return jsonify({"error": f"Drucker kann nicht gelöscht werden: {active_jobs} aktive Jobs vorhanden"}), 400
printer_name = printer.name
db_session.delete(printer)
db_session.commit()
db_session.close()
printers_logger.info(f"Drucker '{printer_name}' (ID: {printer_id}) gelöscht von Admin {current_user.id}")
return jsonify({"message": "Drucker erfolgreich gelöscht"})
except Exception as e:
printers_logger.error(f"Fehler beim Löschen des Druckers {printer_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/jobs/<int:job_id>", methods=["DELETE"])
@login_required
@job_owner_required
def delete_job(job_id):
"""Löscht einen Job."""
try:
db_session = get_db_session()
job = db_session.query(Job).get(job_id)
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
# Prüfen, ob der Job gelöscht werden kann
if job.status == "running":
db_session.close()
return jsonify({"error": "Laufende Jobs können nicht gelöscht werden"}), 400
job_name = job.name
db_session.delete(job)
db_session.commit()
db_session.close()
jobs_logger.info(f"Job '{job_name}' (ID: {job_id}) gelöscht von Benutzer {current_user.id}")
return jsonify({"message": "Job erfolgreich gelöscht"})
except Exception as e:
jobs_logger.error(f"Fehler beim Löschen des Jobs {job_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/jobs/<int:job_id>/cancel", methods=["POST"])
@login_required
@job_owner_required
def cancel_job(job_id):
"""Bricht einen Job ab."""
try:
db_session = get_db_session()
job = db_session.query(Job).get(job_id)
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
# Prüfen, ob der Job abgebrochen werden kann
if job.status not in ["scheduled", "running"]:
db_session.close()
return jsonify({"error": f"Job kann im Status '{job.status}' nicht abgebrochen werden"}), 400
# Job als abgebrochen markieren
job.status = "cancelled"
job.actual_end_time = datetime.now()
# Wenn der Job läuft, Steckdose ausschalten
if job.status == "running":
from utils.job_scheduler import toggle_plug
toggle_plug(job.printer_id, False)
db_session.commit()
job_dict = job.to_dict()
db_session.close()
jobs_logger.info(f"Job {job_id} abgebrochen von Benutzer {current_user.id}")
return jsonify({"job": job_dict})
except Exception as e:
jobs_logger.error(f"Fehler beim Abbrechen des Jobs {job_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/stats", methods=["GET"])
@login_required
def get_stats():
"""Gibt Statistiken zurück."""
try:
db_session = get_db_session()
# Grundlegende Statistiken
total_users = db_session.query(User).count()
total_printers = db_session.query(Printer).count()
total_jobs = db_session.query(Job).count()
# Jobs nach Status
completed_jobs = db_session.query(Job).filter(Job.status == "completed").count()
failed_jobs = db_session.query(Job).filter(Job.status == "failed").count()
cancelled_jobs = db_session.query(Job).filter(Job.status == "cancelled").count()
active_jobs = db_session.query(Job).filter(Job.status.in_(["scheduled", "running"])).count()
# Online-Drucker
online_printers = db_session.query(Printer).filter(Printer.status == "available").count()
# Erfolgsrate
finished_jobs = completed_jobs + failed_jobs + cancelled_jobs
success_rate = (completed_jobs / finished_jobs * 100) if finished_jobs > 0 else 0
# Benutzer-spezifische Statistiken (falls nicht Admin)
user_stats = {}
if not current_user.is_admin:
user_jobs = db_session.query(Job).filter(Job.user_id == int(current_user.id)).count()
user_completed = db_session.query(Job).filter(
Job.user_id == int(current_user.id),
Job.status == "completed"
).count()
user_stats = {
"total_jobs": user_jobs,
"completed_jobs": user_completed,
"success_rate": (user_completed / user_jobs * 100) if user_jobs > 0 else 0
}
db_session.close()
stats = {
"total_users": total_users,
"total_printers": total_printers,
"online_printers": online_printers,
"total_jobs": total_jobs,
"completed_jobs": completed_jobs,
"failed_jobs": failed_jobs,
"cancelled_jobs": cancelled_jobs,
"active_jobs": active_jobs,
"success_rate": round(success_rate, 1),
"user_stats": user_stats
}
return jsonify(stats)
except Exception as e:
app_logger.error(f"Fehler beim Abrufen der Statistiken: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/stats/charts/job-status", methods=["GET"])
@login_required
def get_job_status_chart_data():
"""Gibt Diagrammdaten für Job-Status-Verteilung zurück."""
try:
db_session = get_db_session()
# Job-Status zählen
job_status_counts = {
'completed': db_session.query(Job).filter(Job.status == 'completed').count(),
'failed': db_session.query(Job).filter(Job.status == 'failed').count(),
'cancelled': db_session.query(Job).filter(Job.status == 'cancelled').count(),
'running': db_session.query(Job).filter(Job.status == 'running').count(),
'scheduled': db_session.query(Job).filter(Job.status == 'scheduled').count()
}
db_session.close()
chart_data = {
'labels': ['Abgeschlossen', 'Fehlgeschlagen', 'Abgebrochen', 'Läuft', 'Geplant'],
'datasets': [{
'label': 'Anzahl Jobs',
'data': [
job_status_counts['completed'],
job_status_counts['failed'],
job_status_counts['cancelled'],
job_status_counts['running'],
job_status_counts['scheduled']
],
'backgroundColor': [
'#10b981', # Grün für abgeschlossen
'#ef4444', # Rot für fehlgeschlagen
'#6b7280', # Grau für abgebrochen
'#3b82f6', # Blau für läuft
'#f59e0b' # Orange für geplant
]
}]
}
return jsonify(chart_data)
except Exception as e:
app_logger.error(f"Fehler beim Abrufen der Job-Status-Diagrammdaten: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/stats/charts/printer-usage", methods=["GET"])
@login_required
def get_printer_usage_chart_data():
"""Gibt Diagrammdaten für Drucker-Nutzung zurück."""
try:
db_session = get_db_session()
# Drucker mit Job-Anzahl
printer_usage = db_session.query(
Printer.name,
func.count(Job.id).label('job_count')
).outerjoin(Job).group_by(Printer.id, Printer.name).all()
db_session.close()
chart_data = {
'labels': [usage[0] for usage in printer_usage],
'datasets': [{
'label': 'Anzahl Jobs',
'data': [usage[1] for usage in printer_usage],
'backgroundColor': '#3b82f6',
'borderColor': '#1d4ed8',
'borderWidth': 1
}]
}
return jsonify(chart_data)
except Exception as e:
app_logger.error(f"Fehler beim Abrufen der Drucker-Nutzung-Diagrammdaten: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/stats/charts/jobs-timeline", methods=["GET"])
@login_required
def get_jobs_timeline_chart_data():
"""Gibt Diagrammdaten für Jobs-Timeline der letzten 30 Tage zurück."""
try:
db_session = get_db_session()
# Letzte 30 Tage
end_date = datetime.now().date()
start_date = end_date - timedelta(days=30)
# Jobs pro Tag der letzten 30 Tage
daily_jobs = db_session.query(
func.date(Job.created_at).label('date'),
func.count(Job.id).label('count')
).filter(
func.date(Job.created_at) >= start_date,
func.date(Job.created_at) <= end_date
).group_by(func.date(Job.created_at)).all()
# Alle Tage füllen (auch ohne Jobs)
date_dict = {job_date: count for job_date, count in daily_jobs}
labels = []
data = []
current_date = start_date
while current_date <= end_date:
labels.append(current_date.strftime('%d.%m'))
data.append(date_dict.get(current_date, 0))
current_date += timedelta(days=1)
db_session.close()
chart_data = {
'labels': labels,
'datasets': [{
'label': 'Jobs pro Tag',
'data': data,
'fill': True,
'backgroundColor': 'rgba(59, 130, 246, 0.1)',
'borderColor': '#3b82f6',
'tension': 0.4
}]
}
return jsonify(chart_data)
except Exception as e:
app_logger.error(f"Fehler beim Abrufen der Jobs-Timeline-Diagrammdaten: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/stats/charts/user-activity", methods=["GET"])
@login_required
def get_user_activity_chart_data():
"""Gibt Diagrammdaten für Top-Benutzer-Aktivität zurück."""
try:
db_session = get_db_session()
# Top 10 Benutzer nach Job-Anzahl
top_users = db_session.query(
User.username,
func.count(Job.id).label('job_count')
).join(Job).group_by(
User.id, User.username
).order_by(
func.count(Job.id).desc()
).limit(10).all()
db_session.close()
chart_data = {
'labels': [user[0] for user in top_users],
'datasets': [{
'label': 'Anzahl Jobs',
'data': [user[1] for user in top_users],
'backgroundColor': '#8b5cf6',
'borderColor': '#7c3aed',
'borderWidth': 1
}]
}
return jsonify(chart_data)
except Exception as e:
app_logger.error(f"Fehler beim Abrufen der Benutzer-Aktivität-Diagrammdaten: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/stats/export", methods=["GET"])
@login_required
def export_stats():
"""Exportiert Statistiken als CSV."""
try:
db_session = get_db_session()
# Basis-Statistiken sammeln
total_users = db_session.query(User).count()
total_printers = db_session.query(Printer).count()
total_jobs = db_session.query(Job).count()
completed_jobs = db_session.query(Job).filter(Job.status == "completed").count()
failed_jobs = db_session.query(Job).filter(Job.status == "failed").count()
# CSV-Inhalt erstellen
import io
import csv
output = io.StringIO()
writer = csv.writer(output)
# Header
writer.writerow(['Metrik', 'Wert'])
# Daten
writer.writerow(['Gesamte Benutzer', total_users])
writer.writerow(['Gesamte Drucker', total_printers])
writer.writerow(['Gesamte Jobs', total_jobs])
writer.writerow(['Abgeschlossene Jobs', completed_jobs])
writer.writerow(['Fehlgeschlagene Jobs', failed_jobs])
writer.writerow(['Erfolgsrate (%)', round((completed_jobs / total_jobs * 100), 2) if total_jobs > 0 else 0])
writer.writerow(['Exportiert am', datetime.now().strftime('%d.%m.%Y %H:%M:%S')])
db_session.close()
# Response vorbereiten
output.seek(0)
response = Response(
output.getvalue(),
mimetype='text/csv',
headers={
'Content-Disposition': f'attachment; filename=statistiken_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
}
)
return response
except Exception as e:
app_logger.error(f"Fehler beim Exportieren der Statistiken: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/admin/users", methods=["GET"])
@login_required
def get_users():
"""Gibt alle Benutzer zurück (nur für Admins)."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Benutzer anzeigen"}), 403
try:
db_session = get_db_session()
users = db_session.query(User).all()
user_data = []
for user in users:
user_data.append({
"id": user.id,
"username": user.username,
"email": user.email,
"first_name": user.first_name,
"last_name": user.last_name,
"is_admin": user.is_admin,
"created_at": user.created_at.isoformat() if user.created_at else None,
"last_login": user.last_login.isoformat() if hasattr(user, 'last_login') and user.last_login else None
})
db_session.close()
return jsonify({"users": user_data})
except Exception as e:
app_logger.error(f"Fehler beim Abrufen der Benutzer: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/admin/users/<int:user_id>", methods=["PUT"])
@login_required
def update_user(user_id):
"""Aktualisiert einen Benutzer (nur für Admins)."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Benutzer bearbeiten"}), 403
try:
data = request.json
db_session = get_db_session()
user = db_session.get(User, user_id)
if not user:
db_session.close()
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Aktualisierbare Felder
updatable_fields = ["username", "email", "first_name", "last_name", "is_admin"]
for field in updatable_fields:
if field in data:
setattr(user, field, data[field])
# Passwort separat behandeln
if "password" in data and data["password"]:
user.set_password(data["password"])
db_session.commit()
user_data = {
"id": user.id,
"username": user.username,
"email": user.email,
"first_name": user.first_name,
"last_name": user.last_name,
"is_admin": user.is_admin,
"created_at": user.created_at.isoformat() if user.created_at else None
}
db_session.close()
user_logger.info(f"Benutzer {user_id} aktualisiert von Admin {current_user.id}")
return jsonify({"user": user_data})
except Exception as e:
user_logger.error(f"Fehler beim Aktualisieren des Benutzers {user_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/admin/users/<int:user_id>", methods=["DELETE"])
@login_required
def delete_user(user_id):
"""Löscht einen Benutzer (nur für Admins)."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Benutzer löschen"}), 403
# Verhindern, dass sich der Admin selbst löscht
if user_id == current_user.id:
return jsonify({"error": "Sie können sich nicht selbst löschen"}), 400
try:
db_session = get_db_session()
user = db_session.get(User, user_id)
if not user:
db_session.close()
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Prüfen, ob noch aktive Jobs für diesen Benutzer existieren
active_jobs = db_session.query(Job).filter(
Job.user_id == user_id,
Job.status.in_(["scheduled", "running"])
).count()
if active_jobs > 0:
db_session.close()
return jsonify({"error": f"Benutzer kann nicht gelöscht werden: {active_jobs} aktive Jobs vorhanden"}), 400
username = user.username
db_session.delete(user)
db_session.commit()
db_session.close()
user_logger.info(f"Benutzer '{username}' (ID: {user_id}) gelöscht von Admin {current_user.id}")
return jsonify({"message": "Benutzer erfolgreich gelöscht"})
except Exception as e:
user_logger.error(f"Fehler beim Löschen des Benutzers {user_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
# ===== FEHLERBEHANDLUNG =====
@app.errorhandler(404)
def not_found_error(error):
return render_template('errors/404.html'), 404
@app.errorhandler(500)
def internal_error(error):
return render_template('errors/500.html'), 500
@app.errorhandler(403)
def forbidden_error(error):
return render_template('errors/403.html'), 403
# ===== ADMIN - DATENBANK-VERWALTUNG =====
@app.route('/api/admin/database/stats', methods=['GET'])
@admin_required
def get_database_stats():
"""Gibt Datenbank-Statistiken zurück."""
try:
if database_monitor is None:
return jsonify({
"success": False,
"error": "Database Monitor nicht verfügbar"
}), 503
stats = database_monitor.get_database_stats()
return jsonify({
"success": True,
"stats": stats
})
except Exception as e:
app_logger.error(f"Fehler beim Abrufen der Datenbank-Statistiken: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route('/api/admin/database/health', methods=['GET'])
@admin_required
def check_database_health():
"""Führt eine Datenbank-Gesundheitsprüfung durch."""
try:
if database_monitor is None:
return jsonify({
"success": False,
"error": "Database Monitor nicht verfügbar"
}), 503
health = database_monitor.check_database_health()
return jsonify({
"success": True,
"health": health
})
except Exception as e:
app_logger.error(f"Fehler bei Datenbank-Gesundheitsprüfung: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route('/api/admin/database/optimize', methods=['POST'])
@admin_required
def optimize_database():
"""Führt Datenbank-Optimierung durch."""
try:
if database_monitor is None:
return jsonify({
"success": False,
"error": "Database Monitor nicht verfügbar"
}), 503
result = database_monitor.optimize_database()
return jsonify({
"success": result["success"],
"result": result
})
except Exception as e:
app_logger.error(f"Fehler bei Datenbank-Optimierung: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route('/api/admin/database/backup', methods=['POST'])
@admin_required
def create_database_backup():
"""Erstellt ein manuelles Datenbank-Backup."""
try:
if backup_manager is None:
return jsonify({
"success": False,
"error": "Backup Manager nicht verfügbar"
}), 503
data = request.get_json() or {}
compress = data.get('compress', True)
backup_path = backup_manager.create_backup(compress=compress)
return jsonify({
"success": True,
"backup_path": backup_path,
"message": "Backup erfolgreich erstellt"
})
except Exception as e:
app_logger.error(f"Fehler beim Erstellen des Backups: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route('/api/admin/database/backups', methods=['GET'])
@admin_required
def list_database_backups():
"""Listet alle verfügbaren Datenbank-Backups auf."""
try:
if backup_manager is None:
return jsonify({
"success": False,
"error": "Backup Manager nicht verfügbar"
}), 503
backups = backup_manager.get_backup_list()
# Konvertiere datetime-Objekte zu Strings für JSON
for backup in backups:
backup['created'] = backup['created'].isoformat()
return jsonify({
"success": True,
"backups": backups
})
except Exception as e:
app_logger.error(f"Fehler beim Abrufen der Backup-Liste: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route('/api/admin/database/backup/restore', methods=['POST'])
@admin_required
def restore_database_backup():
"""Stellt ein Datenbank-Backup wieder her."""
try:
if backup_manager is None:
return jsonify({
"success": False,
"error": "Backup Manager nicht verfügbar"
}), 503
data = request.get_json()
if not data or 'backup_path' not in data:
return jsonify({
"success": False,
"error": "Backup-Pfad erforderlich"
}), 400
backup_path = data['backup_path']
# Sicherheitsprüfung: Nur Backups aus dem Backup-Verzeichnis erlauben
if not backup_path.startswith(backup_manager.backup_dir):
return jsonify({
"success": False,
"error": "Ungültiger Backup-Pfad"
}), 400
success = backup_manager.restore_backup(backup_path)
if success:
return jsonify({
"success": True,
"message": "Backup erfolgreich wiederhergestellt"
})
else:
return jsonify({
"success": False,
"error": "Fehler beim Wiederherstellen des Backups"
}), 500
except Exception as e:
app_logger.error(f"Fehler beim Wiederherstellen des Backups: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route('/api/admin/database/backup/cleanup', methods=['POST'])
@admin_required
def cleanup_old_backups():
"""Löscht alte Datenbank-Backups."""
try:
backup_dir = os.path.join(os.path.dirname(__file__), 'database', 'backups')
if not os.path.exists(backup_dir):
return jsonify({"error": "Backup-Verzeichnis nicht gefunden"}), 404
# Backups älter als 30 Tage löschen
cutoff_date = datetime.now() - timedelta(days=30)
deleted_count = 0
for filename in os.listdir(backup_dir):
if filename.endswith('.sql'):
file_path = os.path.join(backup_dir, filename)
file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
if file_mtime < cutoff_date:
os.remove(file_path)
deleted_count += 1
return jsonify({
"message": f"{deleted_count} alte Backups gelöscht",
"deleted_count": deleted_count
})
except Exception as e:
app_logger.error(f"Fehler beim Löschen alter Backups: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route('/api/admin/stats/live', methods=['GET'])
@admin_required
def get_admin_live_stats():
"""Liefert Live-Statistiken für das Admin-Dashboard."""
try:
db_session = get_db_session()
# Aktuelle Statistiken sammeln
total_users = db_session.query(User).count()
total_printers = db_session.query(Printer).count()
total_jobs = db_session.query(Job).count()
active_jobs = db_session.query(Job).filter(Job.status.in_(["scheduled", "running"])).count()
# Printer-Status
available_printers = db_session.query(Printer).filter(Printer.status == "available").count()
offline_printers = db_session.query(Printer).filter(Printer.status == "offline").count()
maintenance_printers = db_session.query(Printer).filter(Printer.status == "maintenance").count()
# Jobs heute
today = datetime.now().date()
jobs_today = db_session.query(Job).filter(
func.date(Job.created_at) == today
).count()
# Erfolgreiche Jobs heute
completed_jobs_today = db_session.query(Job).filter(
func.date(Job.created_at) == today,
Job.status == "completed"
).count()
db_session.close()
stats = {
"users": {
"total": total_users
},
"printers": {
"total": total_printers,
"available": available_printers,
"offline": offline_printers,
"maintenance": maintenance_printers
},
"jobs": {
"total": total_jobs,
"active": active_jobs,
"today": jobs_today,
"completed_today": completed_jobs_today
},
"timestamp": datetime.now().isoformat()
}
return jsonify(stats)
except Exception as e:
app_logger.error(f"Fehler beim Abrufen der Live-Statistiken: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route('/api/admin/system/status', methods=['GET'])
@admin_required
def get_system_status():
"""Liefert System-Status-Informationen."""
try:
import psutil
import platform
# CPU und Memory
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
# Netzwerk (vereinfacht)
network = psutil.net_io_counters()
system_info = {
"platform": platform.system(),
"platform_release": platform.release(),
"platform_version": platform.version(),
"machine": platform.machine(),
"processor": platform.processor(),
"cpu": {
"percent": cpu_percent,
"count": psutil.cpu_count()
},
"memory": {
"total": memory.total,
"available": memory.available,
"percent": memory.percent,
"used": memory.used
},
"disk": {
"total": disk.total,
"used": disk.used,
"free": disk.free,
"percent": (disk.used / disk.total) * 100
},
"network": {
"bytes_sent": network.bytes_sent,
"bytes_recv": network.bytes_recv
},
"timestamp": datetime.now().isoformat()
}
return jsonify(system_info)
except ImportError:
return jsonify({
"error": "psutil nicht installiert",
"message": "Systemstatus kann nicht abgerufen werden"
}), 500
except Exception as e:
app_logger.error(f"Fehler beim Abrufen des Systemstatus: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route('/api/admin/database/status', methods=['GET'])
@admin_required
def get_database_status():
"""Liefert Datenbank-Status-Informationen."""
try:
db_session = get_db_session()
# Tabellen-Informationen sammeln
table_stats = {}
# User-Tabelle
user_count = db_session.query(User).count()
latest_user = db_session.query(User).order_by(User.created_at.desc()).first()
# Printer-Tabelle
printer_count = db_session.query(Printer).count()
latest_printer = db_session.query(Printer).order_by(Printer.created_at.desc()).first()
# Job-Tabelle
job_count = db_session.query(Job).count()
latest_job = db_session.query(Job).order_by(Job.created_at.desc()).first()
table_stats = {
"users": {
"count": user_count,
"latest": latest_user.created_at.isoformat() if latest_user else None
},
"printers": {
"count": printer_count,
"latest": latest_printer.created_at.isoformat() if latest_printer else None
},
"jobs": {
"count": job_count,
"latest": latest_job.created_at.isoformat() if latest_job else None
}
}
db_session.close()
# Datenbank-Dateigröße (falls SQLite)
db_file_size = None
try:
db_path = os.path.join(os.path.dirname(__file__), 'database', 'app.db')
if os.path.exists(db_path):
db_file_size = os.path.getsize(db_path)
except:
pass
status = {
"tables": table_stats,
"database_size": db_file_size,
"timestamp": datetime.now().isoformat(),
"connection_status": "connected"
}
return jsonify(status)
except Exception as e:
app_logger.error(f"Fehler beim Abrufen des Datenbankstatus: {str(e)}")
return jsonify({
"error": "Datenbankfehler",
"connection_status": "error",
"timestamp": datetime.now().isoformat()
}), 500
# ===== WEITERE UI-ROUTEN =====
@app.route("/terms")
def terms():
"""Zeigt die Nutzungsbedingungen an."""
return render_template("terms.html")
@app.route("/privacy")
def privacy():
"""Zeigt die Datenschutzerklärung an."""
return render_template("privacy.html")
@app.route("/admin/users/add")
@login_required
def admin_add_user_page():
"""Zeigt die Seite zum Hinzufügen eines neuen Benutzers an."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
return render_template("admin_add_user.html")
@app.route("/admin/printers/add")
@login_required
def admin_add_printer_page():
"""Zeigt die Seite zum Hinzufügen eines neuen Druckers an."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
return render_template("admin_add_printer.html")
@app.route("/admin/printers/<int:printer_id>/manage")
@login_required
def admin_manage_printer_page(printer_id):
"""Zeigt die Drucker-Verwaltungsseite an."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
db_session = get_db_session()
try:
printer = db_session.get(Printer, printer_id)
if not printer:
flash("Drucker nicht gefunden.", "error")
return redirect(url_for("admin_page"))
printer_data = {
"id": printer.id,
"name": printer.name,
"model": printer.model or 'Unbekanntes Modell',
"location": printer.location or 'Unbekannter Standort',
"mac_address": printer.mac_address,
"plug_ip": printer.plug_ip,
"status": printer.status or "offline",
"active": printer.active if hasattr(printer, 'active') else True,
"created_at": printer.created_at.isoformat() if printer.created_at else datetime.now().isoformat()
}
db_session.close()
return render_template("admin_manage_printer.html", printer=printer_data)
except Exception as e:
db_session.close()
app_logger.error(f"Fehler beim Laden der Drucker-Verwaltung: {str(e)}")
flash("Fehler beim Laden der Drucker-Daten.", "error")
return redirect(url_for("admin_page"))
@app.route("/admin/printers/<int:printer_id>/settings")
@login_required
def admin_printer_settings_page(printer_id):
"""Zeigt die Drucker-Einstellungsseite an."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
db_session = get_db_session()
try:
printer = db_session.get(Printer, printer_id)
if not printer:
flash("Drucker nicht gefunden.", "error")
return redirect(url_for("admin_page"))
printer_data = {
"id": printer.id,
"name": printer.name,
"model": printer.model or 'Unbekanntes Modell',
"location": printer.location or 'Unbekannter Standort',
"mac_address": printer.mac_address,
"plug_ip": printer.plug_ip,
"status": printer.status or "offline",
"active": printer.active if hasattr(printer, 'active') else True,
"created_at": printer.created_at.isoformat() if printer.created_at else datetime.now().isoformat()
}
db_session.close()
return render_template("admin_printer_settings.html", printer=printer_data)
except Exception as e:
db_session.close()
app_logger.error(f"Fehler beim Laden der Drucker-Einstellungen: {str(e)}")
flash("Fehler beim Laden der Drucker-Daten.", "error")
return redirect(url_for("admin_page"))
@app.route("/admin/guest-requests")
@login_required
@admin_required
def admin_guest_requests():
"""Admin-Seite für Gastanfragen Verwaltung"""
try:
app_logger.info(f"Admin-Gastanfragen Seite aufgerufen von User {current_user.id}")
return render_template("admin_guest_requests.html")
except Exception as e:
app_logger.error(f"Fehler beim Laden der Admin-Gastanfragen Seite: {str(e)}")
flash("Fehler beim Laden der Gastanfragen-Verwaltung.", "danger")
return redirect(url_for("admin"))
@app.route("/requests/overview")
@login_required
@admin_required
def admin_guest_requests_overview():
"""Admin-Oberfläche für die Verwaltung von Gastanfragen mit direkten Aktionen."""
try:
app_logger.info(f"Admin-Gastanträge Übersicht aufgerufen von User {current_user.id}")
return render_template("admin_guest_requests_overview.html")
except Exception as e:
app_logger.error(f"Fehler beim Laden der Admin-Gastanträge Übersicht: {str(e)}")
flash("Fehler beim Laden der Gastanträge-Übersicht.", "danger")
return redirect(url_for("admin"))
# ===== ADMIN API-ROUTEN FÜR BENUTZER UND DRUCKER =====
@app.route("/api/admin/users", methods=["POST"])
@login_required
def create_user_api():
"""Erstellt einen neuen Benutzer (nur für Admins)."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Benutzer erstellen"}), 403
try:
data = request.json
# Pflichtfelder prüfen
required_fields = ["username", "email", "password"]
for field in required_fields:
if field not in data or not data[field]:
return jsonify({"error": f"Feld '{field}' ist erforderlich"}), 400
db_session = get_db_session()
# Prüfen, ob bereits ein Benutzer mit diesem Benutzernamen oder E-Mail existiert
existing_user = db_session.query(User).filter(
(User.username == data["username"]) | (User.email == data["email"])
).first()
if existing_user:
db_session.close()
return jsonify({"error": "Ein Benutzer mit diesem Benutzernamen oder E-Mail existiert bereits"}), 400
# Neuen Benutzer erstellen
new_user = User(
username=data["username"],
email=data["email"],
first_name=data.get("first_name", ""),
last_name=data.get("last_name", ""),
is_admin=data.get("is_admin", False),
created_at=datetime.now()
)
# Passwort setzen
new_user.set_password(data["password"])
db_session.add(new_user)
db_session.commit()
user_data = {
"id": new_user.id,
"username": new_user.username,
"email": new_user.email,
"first_name": new_user.first_name,
"last_name": new_user.last_name,
"is_admin": new_user.is_admin,
"created_at": new_user.created_at.isoformat()
}
db_session.close()
user_logger.info(f"Neuer Benutzer '{new_user.username}' erstellt von Admin {current_user.id}")
return jsonify({"user": user_data}), 201
except Exception as e:
user_logger.error(f"Fehler beim Erstellen eines Benutzers: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/admin/printers/<int:printer_id>/toggle", methods=["POST"])
@login_required
def toggle_printer_power(printer_id):
"""
Schaltet einen Drucker über die zugehörige Steckdose ein/aus.
"""
if not current_user.is_admin:
return jsonify({"error": "Administratorrechte erforderlich"}), 403
try:
# Robuste JSON-Datenverarbeitung
data = {}
try:
if request.is_json and request.get_json():
data = request.get_json()
elif request.form:
# Fallback für Form-Daten
data = request.form.to_dict()
except Exception as json_error:
printers_logger.warning(f"Fehler beim Parsen der JSON-Daten für Drucker {printer_id}: {str(json_error)}")
# Verwende Standard-Werte wenn JSON-Parsing fehlschlägt
data = {}
# Standard-Zustand ermitteln (Toggle-Verhalten)
db_session = get_db_session()
printer = db_session.query(Printer).get(printer_id)
if not printer:
db_session.close()
return jsonify({"error": "Drucker nicht gefunden"}), 404
# Aktuellen Status ermitteln für Toggle-Verhalten
current_status = getattr(printer, 'status', 'offline')
current_active = getattr(printer, 'active', False)
# Zielzustand bestimmen
if 'state' in data:
# Expliziter Zustand angegeben
state = bool(data.get("state", True))
else:
# Toggle-Verhalten: Umschalten basierend auf aktuellem Status
state = not (current_status == "available" and current_active)
db_session.close()
# Steckdose schalten
from utils.job_scheduler import toggle_plug
success = toggle_plug(printer_id, state)
if success:
action = "eingeschaltet" if state else "ausgeschaltet"
printers_logger.info(f"Drucker {printer.name} (ID: {printer_id}) erfolgreich {action} von Admin {current_user.name}")
return jsonify({
"success": True,
"message": f"Drucker erfolgreich {action}",
"printer_id": printer_id,
"printer_name": printer.name,
"state": state,
"action": action
})
else:
printers_logger.error(f"Fehler beim Schalten der Steckdose für Drucker {printer_id}")
return jsonify({
"success": False,
"error": "Fehler beim Schalten der Steckdose",
"printer_id": printer_id
}), 500
except Exception as e:
printers_logger.error(f"Fehler beim Schalten von Drucker {printer_id}: {str(e)}")
return jsonify({
"success": False,
"error": "Interner Serverfehler",
"details": str(e)
}), 500
@app.route("/api/admin/printers/<int:printer_id>/test-tapo", methods=["POST"])
@login_required
@admin_required
def test_printer_tapo_connection(printer_id):
"""
Testet die Tapo-Steckdosen-Verbindung für einen Drucker.
"""
try:
db_session = get_db_session()
printer = db_session.query(Printer).get(printer_id)
if not printer:
db_session.close()
return jsonify({"error": "Drucker nicht gefunden"}), 404
if not printer.plug_ip or not printer.plug_username or not printer.plug_password:
db_session.close()
return jsonify({
"error": "Unvollständige Tapo-Konfiguration",
"missing": [
key for key, value in {
"plug_ip": printer.plug_ip,
"plug_username": printer.plug_username,
"plug_password": printer.plug_password
}.items() if not value
]
}), 400
db_session.close()
# Tapo-Verbindung testen
from utils.job_scheduler import test_tapo_connection
test_result = test_tapo_connection(
printer.plug_ip,
printer.plug_username,
printer.plug_password
)
return jsonify({
"printer_id": printer_id,
"printer_name": printer.name,
"tapo_test": test_result
})
except Exception as e:
printers_logger.error(f"Fehler beim Testen der Tapo-Verbindung für Drucker {printer_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler beim Verbindungstest"}), 500
@app.route("/api/admin/printers/test-all-tapo", methods=["POST"])
@login_required
@admin_required
def test_all_printers_tapo_connection():
"""
Testet die Tapo-Steckdosen-Verbindung für alle Drucker.
Nützlich für Diagnose und Setup-Validierung.
"""
try:
db_session = get_db_session()
printers = db_session.query(Printer).filter(Printer.active == True).all()
db_session.close()
if not printers:
return jsonify({
"message": "Keine aktiven Drucker gefunden",
"results": []
})
# Alle Drucker testen
from utils.job_scheduler import test_tapo_connection
results = []
for printer in printers:
result = {
"printer_id": printer.id,
"printer_name": printer.name,
"plug_ip": printer.plug_ip,
"has_config": bool(printer.plug_ip and printer.plug_username and printer.plug_password)
}
if result["has_config"]:
# Tapo-Verbindung testen
test_result = test_tapo_connection(
printer.plug_ip,
printer.plug_username,
printer.plug_password
)
result["tapo_test"] = test_result
else:
result["tapo_test"] = {
"success": False,
"error": "Unvollständige Tapo-Konfiguration",
"device_info": None,
"status": "unconfigured"
}
result["missing_config"] = [
key for key, value in {
"plug_ip": printer.plug_ip,
"plug_username": printer.plug_username,
"plug_password": printer.plug_password
}.items() if not value
]
results.append(result)
# Zusammenfassung erstellen
total_printers = len(results)
successful_connections = sum(1 for r in results if r["tapo_test"]["success"])
configured_printers = sum(1 for r in results if r["has_config"])
return jsonify({
"summary": {
"total_printers": total_printers,
"configured_printers": configured_printers,
"successful_connections": successful_connections,
"success_rate": round(successful_connections / total_printers * 100, 1) if total_printers > 0 else 0
},
"results": results
})
except Exception as e:
printers_logger.error(f"Fehler beim Testen aller Tapo-Verbindungen: {str(e)}")
return jsonify({"error": "Interner Serverfehler beim Massentest"}), 500
# ===== ADMIN FORM ENDPOINTS =====
@app.route("/admin/users/create", methods=["POST"])
@login_required
def admin_create_user_form():
"""Erstellt einen neuen Benutzer über HTML-Form (nur für Admins)."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
try:
# Form-Daten lesen
email = request.form.get("email", "").strip()
name = request.form.get("name", "").strip()
password = request.form.get("password", "").strip()
role = request.form.get("role", "user").strip()
# Pflichtfelder prüfen
if not email or not password:
flash("E-Mail und Passwort sind erforderlich.", "error")
return redirect(url_for("admin_add_user_page"))
# E-Mail validieren
import re
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_pattern, email):
flash("Ungültige E-Mail-Adresse.", "error")
return redirect(url_for("admin_add_user_page"))
db_session = get_db_session()
# Prüfen, ob bereits ein Benutzer mit dieser E-Mail existiert
existing_user = db_session.query(User).filter(User.email == email).first()
if existing_user:
db_session.close()
flash("Ein Benutzer mit dieser E-Mail existiert bereits.", "error")
return redirect(url_for("admin_add_user_page"))
# E-Mail als Username verwenden (falls kein separates Username-Feld)
username = email.split('@')[0]
counter = 1
original_username = username
while db_session.query(User).filter(User.username == username).first():
username = f"{original_username}{counter}"
counter += 1
# Neuen Benutzer erstellen
new_user = User(
username=username,
email=email,
first_name=name.split(' ')[0] if name else "",
last_name=" ".join(name.split(' ')[1:]) if name and ' ' in name else "",
is_admin=(role == "admin"),
created_at=datetime.now()
)
# Passwort setzen
new_user.set_password(password)
db_session.add(new_user)
db_session.commit()
db_session.close()
user_logger.info(f"Neuer Benutzer '{new_user.username}' erstellt von Admin {current_user.id}")
flash(f"Benutzer '{new_user.email}' erfolgreich erstellt.", "success")
return redirect(url_for("admin_page", tab="users"))
except Exception as e:
user_logger.error(f"Fehler beim Erstellen eines Benutzers über Form: {str(e)}")
flash("Fehler beim Erstellen des Benutzers.", "error")
return redirect(url_for("admin_add_user_page"))
@app.route("/admin/printers/create", methods=["POST"])
@login_required
def admin_create_printer_form():
"""Erstellt einen neuen Drucker über HTML-Form (nur für Admins)."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
try:
# Form-Daten lesen
name = request.form.get("name", "").strip()
ip_address = request.form.get("ip_address", "").strip()
model = request.form.get("model", "").strip()
location = request.form.get("location", "").strip()
description = request.form.get("description", "").strip()
status = request.form.get("status", "available").strip()
# Pflichtfelder prüfen
if not name or not ip_address:
flash("Name und IP-Adresse sind erforderlich.", "error")
return redirect(url_for("admin_add_printer_page"))
# IP-Adresse validieren
import re
ip_pattern = r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
if not re.match(ip_pattern, ip_address):
flash("Ungültige IP-Adresse.", "error")
return redirect(url_for("admin_add_printer_page"))
db_session = get_db_session()
# Prüfen, ob bereits ein Drucker mit diesem Namen existiert
existing_printer = db_session.query(Printer).filter(Printer.name == name).first()
if existing_printer:
db_session.close()
flash("Ein Drucker mit diesem Namen existiert bereits.", "error")
return redirect(url_for("admin_add_printer_page"))
# Neuen Drucker erstellen
new_printer = Printer(
name=name,
model=model,
location=location,
description=description,
mac_address="", # Wird später ausgefüllt
plug_ip=ip_address,
status=status,
created_at=datetime.now()
)
db_session.add(new_printer)
db_session.commit()
db_session.close()
printers_logger.info(f"Neuer Drucker '{new_printer.name}' erstellt von Admin {current_user.id}")
flash(f"Drucker '{new_printer.name}' erfolgreich erstellt.", "success")
return redirect(url_for("admin_page", tab="printers"))
except Exception as e:
printers_logger.error(f"Fehler beim Erstellen eines Druckers über Form: {str(e)}")
flash("Fehler beim Erstellen des Druckers.", "error")
return redirect(url_for("admin_add_printer_page"))
@app.route("/admin/users/<int:user_id>/edit", methods=["GET"])
@login_required
def admin_edit_user_page(user_id):
"""Zeigt die Benutzer-Bearbeitungsseite an."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
db_session = get_db_session()
try:
user = db_session.get(User, user_id)
if not user:
flash("Benutzer nicht gefunden.", "error")
return redirect(url_for("admin_page", tab="users"))
user_data = {
"id": user.id,
"username": user.username,
"email": user.email,
"name": user.name or "",
"is_admin": user.is_admin,
"active": user.active,
"created_at": user.created_at.isoformat() if user.created_at else datetime.now().isoformat()
}
db_session.close()
return render_template("admin_edit_user.html", user=user_data)
except Exception as e:
db_session.close()
app_logger.error(f"Fehler beim Laden der Benutzer-Daten: {str(e)}")
flash("Fehler beim Laden der Benutzer-Daten.", "error")
return redirect(url_for("admin_page", tab="users"))
@app.route("/admin/users/<int:user_id>/update", methods=["POST"])
@login_required
def admin_update_user_form(user_id):
"""Aktualisiert einen Benutzer über HTML-Form (nur für Admins)."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
try:
# Form-Daten lesen
email = request.form.get("email", "").strip()
name = request.form.get("name", "").strip()
password = request.form.get("password", "").strip()
role = request.form.get("role", "user").strip()
is_active = request.form.get("is_active", "true").strip() == "true"
# Pflichtfelder prüfen
if not email:
flash("E-Mail-Adresse ist erforderlich.", "error")
return redirect(url_for("admin_edit_user_page", user_id=user_id))
# E-Mail validieren
import re
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_pattern, email):
flash("Ungültige E-Mail-Adresse.", "error")
return redirect(url_for("admin_edit_user_page", user_id=user_id))
db_session = get_db_session()
user = db_session.query(User).get(user_id)
if not user:
db_session.close()
flash("Benutzer nicht gefunden.", "error")
return redirect(url_for("admin_page", tab="users"))
# Prüfen, ob bereits ein anderer Benutzer mit dieser E-Mail existiert
existing_user = db_session.query(User).filter(
User.email == email,
User.id != user_id
).first()
if existing_user:
db_session.close()
flash("Ein Benutzer mit dieser E-Mail-Adresse existiert bereits.", "error")
return redirect(url_for("admin_edit_user_page", user_id=user_id))
# Benutzer aktualisieren
user.email = email
if name:
user.name = name
# Passwort nur ändern, wenn eines angegeben wurde
if password:
user.password_hash = generate_password_hash(password)
user.role = "admin" if role == "admin" else "user"
user.active = is_active
db_session.commit()
db_session.close()
auth_logger.info(f"Benutzer '{user.email}' (ID: {user_id}) aktualisiert von Admin {current_user.id}")
flash(f"Benutzer '{user.email}' erfolgreich aktualisiert.", "success")
return redirect(url_for("admin_page", tab="users"))
except Exception as e:
auth_logger.error(f"Fehler beim Aktualisieren eines Benutzers über Form: {str(e)}")
flash("Fehler beim Aktualisieren des Benutzers.", "error")
return redirect(url_for("admin_edit_user_page", user_id=user_id))
@app.route("/admin/printers/<int:printer_id>/update", methods=["POST"])
@login_required
def admin_update_printer_form(printer_id):
"""Aktualisiert einen Drucker über HTML-Form (nur für Admins)."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
try:
# Form-Daten lesen
name = request.form.get("name", "").strip()
ip_address = request.form.get("ip_address", "").strip()
model = request.form.get("model", "").strip()
location = request.form.get("location", "").strip()
description = request.form.get("description", "").strip()
status = request.form.get("status", "available").strip()
# Pflichtfelder prüfen
if not name or not ip_address:
flash("Name und IP-Adresse sind erforderlich.", "error")
return redirect(url_for("admin_edit_printer_page", printer_id=printer_id))
# IP-Adresse validieren
import re
ip_pattern = r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
if not re.match(ip_pattern, ip_address):
flash("Ungültige IP-Adresse.", "error")
return redirect(url_for("admin_edit_printer_page", printer_id=printer_id))
db_session = get_db_session()
printer = db_session.query(Printer).get(printer_id)
if not printer:
db_session.close()
flash("Drucker nicht gefunden.", "error")
return redirect(url_for("admin_page", tab="printers"))
# Drucker aktualisieren
printer.name = name
printer.model = model
printer.location = location
printer.description = description
printer.plug_ip = ip_address
printer.status = status
db_session.commit()
db_session.close()
printers_logger.info(f"Drucker '{printer.name}' (ID: {printer_id}) aktualisiert von Admin {current_user.id}")
flash(f"Drucker '{printer.name}' erfolgreich aktualisiert.", "success")
return redirect(url_for("admin_page", tab="printers"))
except Exception as e:
printers_logger.error(f"Fehler beim Aktualisieren eines Druckers über Form: {str(e)}")
flash("Fehler beim Aktualisieren des Druckers.", "error")
return redirect(url_for("admin_edit_printer_page", printer_id=printer_id))
# ===== STARTUP UND MAIN =====
if __name__ == "__main__":
import sys
import signal
import os
# Debug-Modus prüfen
debug_mode = len(sys.argv) > 1 and sys.argv[1] == "--debug"
# Windows-spezifische Umgebungsvariablen setzen für bessere Flask-Kompatibilität
if os.name == 'nt' and debug_mode:
# Entferne problematische Werkzeug-Variablen
os.environ.pop('WERKZEUG_SERVER_FD', None)
os.environ.pop('WERKZEUG_RUN_MAIN', None)
# Setze saubere Umgebung
os.environ['FLASK_ENV'] = 'development'
os.environ['PYTHONIOENCODING'] = 'utf-8'
os.environ['PYTHONUTF8'] = '1'
# Windows-spezifisches Signal-Handling für ordnungsgemäßes Shutdown
def signal_handler(sig, frame):
"""Signal-Handler für ordnungsgemäßes Shutdown."""
app_logger.warning(f"🛑 Signal {sig} empfangen - fahre System herunter...")
try:
# Queue Manager stoppen
app_logger.info("🔄 Beende Queue Manager...")
stop_queue_manager()
# Scheduler stoppen falls aktiviert
if SCHEDULER_ENABLED and scheduler:
try:
scheduler.stop()
app_logger.info("Job-Scheduler gestoppt")
except Exception as e:
app_logger.error(f"Fehler beim Stoppen des Schedulers: {str(e)}")
app_logger.info("✅ Shutdown abgeschlossen")
sys.exit(0)
except Exception as e:
app_logger.error(f"❌ Fehler beim Shutdown: {str(e)}")
sys.exit(1)
# Signal-Handler registrieren (Windows-kompatibel)
if os.name == 'nt': # Windows
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Zusätzlich für Flask-Development-Server
signal.signal(signal.SIGBREAK, signal_handler)
else: # Unix/Linux
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGHUP, signal_handler)
try:
# Datenbank initialisieren
init_database()
create_initial_admin()
# Template-Hilfsfunktionen registrieren
register_template_helpers(app)
# Drucker-Monitor Steckdosen-Initialisierung beim Start
try:
app_logger.info("🖨️ Starte automatische Steckdosen-Initialisierung...")
initialization_results = printer_monitor.initialize_all_outlets_on_startup()
if initialization_results:
success_count = sum(1 for success in initialization_results.values() if success)
total_count = len(initialization_results)
app_logger.info(f"✅ Steckdosen-Initialisierung: {success_count}/{total_count} Drucker erfolgreich")
if success_count < total_count:
app_logger.warning(f"⚠️ {total_count - success_count} Drucker konnten nicht initialisiert werden")
else:
app_logger.info(" Keine Drucker zur Initialisierung gefunden")
except Exception as e:
app_logger.error(f"❌ Fehler bei automatischer Steckdosen-Initialisierung: {str(e)}")
# Queue-Manager für automatische Drucker-Überwachung starten
# Nur im Produktionsmodus starten (nicht im Debug-Modus)
if not debug_mode:
try:
queue_manager = start_queue_manager()
app_logger.info("✅ Printer Queue Manager erfolgreich gestartet")
# Verbesserte Shutdown-Handler registrieren
def cleanup_queue_manager():
try:
app_logger.info("🔄 Beende Queue Manager...")
stop_queue_manager()
except Exception as e:
app_logger.error(f"❌ Fehler beim Queue Manager Cleanup: {str(e)}")
atexit.register(cleanup_queue_manager)
except Exception as e:
app_logger.error(f"❌ Fehler beim Starten des Queue-Managers: {str(e)}")
else:
app_logger.info("🔄 Debug-Modus: Queue Manager deaktiviert für Entwicklung")
# Scheduler starten (falls aktiviert)
if SCHEDULER_ENABLED:
try:
scheduler.start()
app_logger.info("Job-Scheduler gestartet")
except Exception as e:
app_logger.error(f"Fehler beim Starten des Schedulers: {str(e)}")
if debug_mode:
# Debug-Modus: HTTP auf Port 5000
app_logger.info("Starte Debug-Server auf 0.0.0.0:5000 (HTTP)")
# Windows-spezifische Flask-Konfiguration
run_kwargs = {
"host": "0.0.0.0",
"port": 5000,
"debug": True,
"threaded": True
}
if os.name == 'nt':
# Windows: Deaktiviere Auto-Reload um WERKZEUG_SERVER_FD Fehler zu vermeiden
run_kwargs["use_reloader"] = False
run_kwargs["passthrough_errors"] = False
app_logger.info("Windows-Debug-Modus: Auto-Reload deaktiviert")
app.run(**run_kwargs)
else:
# Produktions-Modus: HTTPS auf Port 443
ssl_context = get_ssl_context()
if ssl_context:
app_logger.info("Starte HTTPS-Server auf 0.0.0.0:443")
app.run(
host="0.0.0.0",
port=443,
debug=False,
ssl_context=ssl_context,
threaded=True
)
else:
app_logger.info("Starte HTTP-Server auf 0.0.0.0:8080")
app.run(
host="0.0.0.0",
port=8080,
debug=False,
threaded=True
)
except KeyboardInterrupt:
app_logger.info("🔄 Tastatur-Unterbrechung empfangen - beende Anwendung...")
signal_handler(signal.SIGINT, None)
except Exception as e:
app_logger.error(f"Fehler beim Starten der Anwendung: {str(e)}")
# Cleanup bei Fehler
try:
stop_queue_manager()
except:
pass
sys.exit(1)
# ===== FILE-UPLOAD-ROUTEN =====
@app.route('/api/upload/job', methods=['POST'])
@login_required
def upload_job_file():
"""
Lädt eine Datei für einen Druckjob hoch
Form Data:
file: Die hochzuladende Datei
job_name: Name des Jobs (optional)
"""
try:
if 'file' not in request.files:
return jsonify({'error': 'Keine Datei ausgewählt'}), 400
file = request.files['file']
job_name = request.form.get('job_name', '')
if file.filename == '':
return jsonify({'error': 'Keine Datei ausgewählt'}), 400
# Metadaten für die Datei
metadata = {
'uploader_id': current_user.id,
'uploader_name': current_user.username,
'job_name': job_name
}
# Datei speichern
result = save_job_file(file, current_user.id, metadata)
if result:
relative_path, absolute_path, file_metadata = result
app_logger.info(f"Job-Datei hochgeladen: {file_metadata['original_filename']} von User {current_user.id}")
return jsonify({
'success': True,
'message': 'Datei erfolgreich hochgeladen',
'file_path': relative_path,
'filename': file_metadata['original_filename'],
'unique_filename': file_metadata['unique_filename'],
'file_size': file_metadata['file_size'],
'metadata': file_metadata
})
else:
return jsonify({'error': 'Fehler beim Speichern der Datei'}), 500
except Exception as e:
app_logger.error(f"Fehler beim Hochladen der Job-Datei: {str(e)}")
return jsonify({'error': f'Fehler beim Hochladen: {str(e)}'}), 500
@app.route('/api/upload/guest', methods=['POST'])
def upload_guest_file():
"""
Lädt eine Datei für einen Gastauftrag hoch
Form Data:
file: Die hochzuladende Datei
guest_name: Name des Gasts (optional)
guest_email: E-Mail des Gasts (optional)
"""
try:
if 'file' not in request.files:
return jsonify({'error': 'Keine Datei ausgewählt'}), 400
file = request.files['file']
guest_name = request.form.get('guest_name', '')
guest_email = request.form.get('guest_email', '')
if file.filename == '':
return jsonify({'error': 'Keine Datei ausgewählt'}), 400
# Metadaten für die Datei
metadata = {
'guest_name': guest_name,
'guest_email': guest_email
}
# Datei speichern
result = save_guest_file(file, metadata)
if result:
relative_path, absolute_path, file_metadata = result
app_logger.info(f"Gast-Datei hochgeladen: {file_metadata['original_filename']} für {guest_name or 'Unbekannt'}")
return jsonify({
'success': True,
'message': 'Datei erfolgreich hochgeladen',
'file_path': relative_path,
'filename': file_metadata['original_filename'],
'unique_filename': file_metadata['unique_filename'],
'file_size': file_metadata['file_size'],
'metadata': file_metadata
})
else:
return jsonify({'error': 'Fehler beim Speichern der Datei'}), 500
except Exception as e:
app_logger.error(f"Fehler beim Hochladen der Gast-Datei: {str(e)}")
return jsonify({'error': f'Fehler beim Hochladen: {str(e)}'}), 500
@app.route('/api/upload/avatar', methods=['POST'])
@login_required
def upload_avatar():
"""
Lädt ein Avatar-Bild für den aktuellen Benutzer hoch
Form Data:
file: Das Avatar-Bild
"""
try:
if 'file' not in request.files:
return jsonify({'error': 'Keine Datei ausgewählt'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': 'Keine Datei ausgewählt'}), 400
# Nur Bilder erlauben
allowed_extensions = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
if not file.filename or '.' not in file.filename:
return jsonify({'error': 'Ungültiger Dateityp'}), 400
file_ext = file.filename.rsplit('.', 1)[1].lower()
if file_ext not in allowed_extensions:
return jsonify({'error': 'Nur Bilddateien sind erlaubt (PNG, JPG, JPEG, GIF, WebP)'}), 400
# Alte Avatar-Datei löschen falls vorhanden
db_session = get_db_session()
user = db_session.query(User).get(current_user.id)
if user and user.avatar_path:
delete_file_safe(user.avatar_path)
# Neue Avatar-Datei speichern
result = save_avatar_file(file, current_user.id)
if result:
relative_path, absolute_path, file_metadata = result
# Avatar-Pfad in der Datenbank aktualisieren
user.avatar_path = relative_path
db_session.commit()
db_session.close()
app_logger.info(f"Avatar hochgeladen für User {current_user.id}")
return jsonify({
'success': True,
'message': 'Avatar erfolgreich hochgeladen',
'file_path': relative_path,
'filename': file_metadata['original_filename'],
'unique_filename': file_metadata['unique_filename'],
'file_size': file_metadata['file_size']
})
else:
db_session.close()
return jsonify({'error': 'Fehler beim Speichern des Avatars'}), 500
except Exception as e:
app_logger.error(f"Fehler beim Hochladen des Avatars: {str(e)}")
return jsonify({'error': f'Fehler beim Hochladen: {str(e)}'}), 500
@app.route('/api/upload/asset', methods=['POST'])
@login_required
@admin_required
def upload_asset():
"""
Lädt ein statisches Asset hoch (nur für Administratoren)
Form Data:
file: Die Asset-Datei
asset_name: Name des Assets (optional)
"""
try:
if 'file' not in request.files:
return jsonify({'error': 'Keine Datei ausgewählt'}), 400
file = request.files['file']
asset_name = request.form.get('asset_name', '')
if file.filename == '':
return jsonify({'error': 'Keine Datei ausgewählt'}), 400
# Metadaten für die Datei
metadata = {
'uploader_id': current_user.id,
'uploader_name': current_user.username,
'asset_name': asset_name
}
# Datei speichern
result = save_asset_file(file, current_user.id, metadata)
if result:
relative_path, absolute_path, file_metadata = result
app_logger.info(f"Asset hochgeladen: {file_metadata['original_filename']} von Admin {current_user.id}")
return jsonify({
'success': True,
'message': 'Asset erfolgreich hochgeladen',
'file_path': relative_path,
'filename': file_metadata['original_filename'],
'unique_filename': file_metadata['unique_filename'],
'file_size': file_metadata['file_size'],
'metadata': file_metadata
})
else:
return jsonify({'error': 'Fehler beim Speichern des Assets'}), 500
except Exception as e:
app_logger.error(f"Fehler beim Hochladen des Assets: {str(e)}")
return jsonify({'error': f'Fehler beim Hochladen: {str(e)}'}), 500
@app.route('/api/upload/log', methods=['POST'])
@login_required
@admin_required
def upload_log():
"""
Lädt eine Log-Datei hoch (nur für Administratoren)
Form Data:
file: Die Log-Datei
log_type: Typ des Logs (optional)
"""
try:
if 'file' not in request.files:
return jsonify({'error': 'Keine Datei ausgewählt'}), 400
file = request.files['file']
log_type = request.form.get('log_type', 'allgemein')
if file.filename == '':
return jsonify({'error': 'Keine Datei ausgewählt'}), 400
# Metadaten für die Datei
metadata = {
'uploader_id': current_user.id,
'uploader_name': current_user.username,
'log_type': log_type
}
# Datei speichern
result = save_log_file(file, current_user.id, metadata)
if result:
relative_path, absolute_path, file_metadata = result
app_logger.info(f"Log-Datei hochgeladen: {file_metadata['original_filename']} von Admin {current_user.id}")
return jsonify({
'success': True,
'message': 'Log-Datei erfolgreich hochgeladen',
'file_path': relative_path,
'filename': file_metadata['original_filename'],
'unique_filename': file_metadata['unique_filename'],
'file_size': file_metadata['file_size'],
'metadata': file_metadata
})
else:
return jsonify({'error': 'Fehler beim Speichern der Log-Datei'}), 500
except Exception as e:
app_logger.error(f"Fehler beim Hochladen der Log-Datei: {str(e)}")
return jsonify({'error': f'Fehler beim Hochladen: {str(e)}'}), 500
@app.route('/api/upload/backup', methods=['POST'])
@login_required
@admin_required
def upload_backup():
"""
Lädt eine Backup-Datei hoch (nur für Administratoren)
Form Data:
file: Die Backup-Datei
backup_type: Typ des Backups (optional)
"""
try:
if 'file' not in request.files:
return jsonify({'error': 'Keine Datei ausgewählt'}), 400
file = request.files['file']
backup_type = request.form.get('backup_type', 'allgemein')
if file.filename == '':
return jsonify({'error': 'Keine Datei ausgewählt'}), 400
# Metadaten für die Datei
metadata = {
'uploader_id': current_user.id,
'uploader_name': current_user.username,
'backup_type': backup_type
}
# Datei speichern
result = save_backup_file(file, current_user.id, metadata)
if result:
relative_path, absolute_path, file_metadata = result
app_logger.info(f"Backup-Datei hochgeladen: {file_metadata['original_filename']} von Admin {current_user.id}")
return jsonify({
'success': True,
'message': 'Backup-Datei erfolgreich hochgeladen',
'file_path': relative_path,
'filename': file_metadata['original_filename'],
'unique_filename': file_metadata['unique_filename'],
'file_size': file_metadata['file_size'],
'metadata': file_metadata
})
else:
return jsonify({'error': 'Fehler beim Speichern der Backup-Datei'}), 500
except Exception as e:
app_logger.error(f"Fehler beim Hochladen der Backup-Datei: {str(e)}")
return jsonify({'error': f'Fehler beim Hochladen: {str(e)}'}), 500
@app.route('/api/upload/temp', methods=['POST'])
@login_required
def upload_temp_file():
"""
Lädt eine temporäre Datei hoch
Form Data:
file: Die temporäre Datei
purpose: Verwendungszweck (optional)
"""
try:
if 'file' not in request.files:
return jsonify({'error': 'Keine Datei ausgewählt'}), 400
file = request.files['file']
purpose = request.form.get('purpose', '')
if file.filename == '':
return jsonify({'error': 'Keine Datei ausgewählt'}), 400
# Metadaten für die Datei
metadata = {
'uploader_id': current_user.id,
'uploader_name': current_user.username,
'purpose': purpose
}
# Datei speichern
result = save_temp_file(file, current_user.id, metadata)
if result:
relative_path, absolute_path, file_metadata = result
app_logger.info(f"Temporäre Datei hochgeladen: {file_metadata['original_filename']} von User {current_user.id}")
return jsonify({
'success': True,
'message': 'Temporäre Datei erfolgreich hochgeladen',
'file_path': relative_path,
'filename': file_metadata['original_filename'],
'unique_filename': file_metadata['unique_filename'],
'file_size': file_metadata['file_size'],
'metadata': file_metadata
})
else:
return jsonify({'error': 'Fehler beim Speichern der temporären Datei'}), 500
except Exception as e:
app_logger.error(f"Fehler beim Hochladen der temporären Datei: {str(e)}")
return jsonify({'error': f'Fehler beim Hochladen: {str(e)}'}), 500
@app.route('/api/files/<path:file_path>', methods=['GET'])
@login_required
def serve_uploaded_file(file_path):
"""
Stellt hochgeladene Dateien bereit (mit Zugriffskontrolle)
"""
try:
# Datei-Info abrufen
file_info = file_manager.get_file_info(file_path)
if not file_info:
return jsonify({'error': 'Datei nicht gefunden'}), 404
# Zugriffskontrolle basierend auf Dateikategorie
if file_path.startswith('jobs/'):
# Job-Dateien: Nur Besitzer und Admins
if not current_user.is_admin:
# Prüfen ob Benutzer der Besitzer ist
if f"user_{current_user.id}" not in file_path:
return jsonify({'error': 'Zugriff verweigert'}), 403
elif file_path.startswith('guests/'):
# Gast-Dateien: Nur Admins
if not current_user.is_admin:
return jsonify({'error': 'Zugriff verweigert'}), 403
elif file_path.startswith('avatars/'):
# Avatar-Dateien: Öffentlich zugänglich für angemeldete Benutzer
pass
elif file_path.startswith('temp/'):
# Temporäre Dateien: Nur Besitzer und Admins
if not current_user.is_admin:
# Prüfen ob Benutzer der Besitzer ist
if f"user_{current_user.id}" not in file_path:
return jsonify({'error': 'Zugriff verweigert'}), 403
else:
# Andere Dateien (assets, logs, backups): Nur Admins
if not current_user.is_admin:
return jsonify({'error': 'Zugriff verweigert'}), 403
# Datei bereitstellen
return send_file(file_info['absolute_path'], as_attachment=False)
except Exception as e:
app_logger.error(f"Fehler beim Bereitstellen der Datei {file_path}: {str(e)}")
return jsonify({'error': 'Fehler beim Laden der Datei'}), 500
@app.route('/api/files/<path:file_path>', methods=['DELETE'])
@login_required
def delete_uploaded_file(file_path):
"""
Löscht eine hochgeladene Datei (mit Zugriffskontrolle)
"""
try:
# Datei-Info abrufen
file_info = file_manager.get_file_info(file_path)
if not file_info:
return jsonify({'error': 'Datei nicht gefunden'}), 404
# Zugriffskontrolle basierend auf Dateikategorie
if file_path.startswith('jobs/'):
# Job-Dateien: Nur Besitzer und Admins
if not current_user.is_admin:
# Prüfen ob Benutzer der Besitzer ist
if f"user_{current_user.id}" not in file_path:
return jsonify({'error': 'Zugriff verweigert'}), 403
elif file_path.startswith('guests/'):
# Gast-Dateien: Nur Admins
if not current_user.is_admin:
return jsonify({'error': 'Zugriff verweigert'}), 403
elif file_path.startswith('avatars/'):
# Avatar-Dateien: Nur Besitzer und Admins
if not current_user.is_admin:
# Prüfen ob Benutzer der Besitzer ist
if f"user_{current_user.id}" not in file_path:
return jsonify({'error': 'Zugriff verweigert'}), 403
elif file_path.startswith('temp/'):
# Temporäre Dateien: Nur Besitzer und Admins
if not current_user.is_admin:
# Prüfen ob Benutzer der Besitzer ist
if f"user_{current_user.id}" not in file_path:
return jsonify({'error': 'Zugriff verweigert'}), 403
else:
# Andere Dateien (assets, logs, backups): Nur Admins
if not current_user.is_admin:
return jsonify({'error': 'Zugriff verweigert'}), 403
# Datei löschen
if delete_file_safe(file_path):
app_logger.info(f"Datei gelöscht: {file_path} von User {current_user.id}")
return jsonify({'success': True, 'message': 'Datei erfolgreich gelöscht'})
else:
return jsonify({'error': 'Fehler beim Löschen der Datei'}), 500
except Exception as e:
app_logger.error(f"Fehler beim Löschen der Datei {file_path}: {str(e)}")
return jsonify({'error': f'Fehler beim Löschen der Datei: {str(e)}'}), 500
@app.route('/api/admin/files/stats', methods=['GET'])
@login_required
@admin_required
def get_file_stats():
"""
Gibt Statistiken zu allen Dateien zurück (nur für Administratoren)
"""
try:
stats = file_manager.get_category_stats()
# Gesamtstatistiken berechnen
total_files = sum(category.get('file_count', 0) for category in stats.values())
total_size = sum(category.get('total_size', 0) for category in stats.values())
return jsonify({
'success': True,
'categories': stats,
'totals': {
'file_count': total_files,
'total_size': total_size,
'total_size_mb': round(total_size / (1024 * 1024), 2)
}
})
except Exception as e:
app_logger.error(f"Fehler beim Abrufen der Datei-Statistiken: {str(e)}")
return jsonify({'error': f'Fehler beim Abrufen der Statistiken: {str(e)}'}), 500
@app.route('/api/admin/files/cleanup', methods=['POST'])
@login_required
@admin_required
def cleanup_temp_files():
"""
Räumt temporäre Dateien auf (nur für Administratoren)
"""
try:
data = request.get_json() or {}
max_age_hours = data.get('max_age_hours', 24)
# Temporäre Dateien aufräumen
deleted_count = file_manager.cleanup_temp_files(max_age_hours)
app_logger.info(f"Temporäre Dateien aufgeräumt: {deleted_count} Dateien gelöscht")
return jsonify({
'success': True,
'message': f'{deleted_count} temporäre Dateien erfolgreich gelöscht',
'deleted_count': deleted_count
})
except Exception as e:
app_logger.error(f"Fehler beim Aufräumen temporärer Dateien: {str(e)}")
return jsonify({'error': f'Fehler beim Aufräumen: {str(e)}'}), 500