3253 lines
121 KiB
Python

import os
import sys
import logging
import threading
import time
import subprocess
import socket
import json
import secrets
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from functools import wraps
from typing import Optional, Dict, List, Tuple, Any, Union
from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, session, send_file, Response, make_response
from flask_login import LoginManager, login_user, logout_user, login_required, current_user, UserMixin
from werkzeug.security import check_password_hash, generate_password_hash
from werkzeug.utils import secure_filename
import sqlalchemy.exc
import sqlalchemy
from sqlalchemy.orm import joinedload
from sqlalchemy import func
from PyP100 import PyP110
from flask_wtf.csrf import CSRFProtect
from config.settings import (
SECRET_KEY, TAPO_USERNAME, TAPO_PASSWORD, PRINTERS,
FLASK_HOST, FLASK_PORT, FLASK_DEBUG, SESSION_LIFETIME,
SCHEDULER_INTERVAL, SCHEDULER_ENABLED, get_ssl_context, FLASK_FALLBACK_PORT,
SSL_ENABLED, SSL_CERT_PATH, SSL_KEY_PATH
)
from utils.logging_config import setup_logging, get_logger, log_startup_info
from models import User, Printer, Job, Stats, get_db_session, init_database, create_initial_admin
from utils.job_scheduler import scheduler
from utils.template_helpers import register_template_helpers
from utils.database_utils import backup_manager, database_monitor, maintenance_scheduler
# Flask-App initialisieren
app = Flask(__name__)
app.secret_key = SECRET_KEY
app.config["PERMANENT_SESSION_LIFETIME"] = SESSION_LIFETIME
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config["WTF_CSRF_ENABLED"] = True
# CSRF-Schutz initialisieren
csrf = CSRFProtect(app)
# Login-Manager initialisieren
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = "login"
login_manager.login_message = "Bitte melden Sie sich an, um auf diese Seite zuzugreifen."
login_manager.login_message_category = "info"
@login_manager.user_loader
def load_user(user_id):
db_session = get_db_session()
user = db_session.query(User).filter(User.id == user_id).first()
db_session.close()
return user
# Jinja2 Context Processors
@app.context_processor
def inject_now():
"""Inject the current datetime into templates."""
return {'now': datetime.now()}
# Custom Jinja2 filter für Datumsformatierung
@app.template_filter('format_datetime')
def format_datetime_filter(value, format='%d.%m.%Y %H:%M'):
"""Format a datetime object to a German-style date and time string"""
if value is None:
return ""
if isinstance(value, str):
try:
value = datetime.fromisoformat(value)
except ValueError:
return value
return value.strftime(format)
# Logging initialisieren
setup_logging()
log_startup_info()
# Logger für verschiedene Komponenten
app_logger = get_logger("app")
auth_logger = get_logger("auth")
jobs_logger = get_logger("jobs")
printers_logger = get_logger("printers")
user_logger = get_logger("user")
kiosk_logger = get_logger("kiosk")
# Sicheres Passwort-Hash für Kiosk-Deaktivierung
KIOSK_PASSWORD_HASH = generate_password_hash("744563017196A")
print("Alle Blueprints wurden in app.py integriert")
# Custom decorator für Job-Besitzer-Check
def job_owner_required(f):
@wraps(f)
def decorated_function(job_id, *args, **kwargs):
db_session = get_db_session()
job = db_session.query(Job).filter(Job.id == job_id).first()
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
is_owner = job.user_id == int(current_user.id) or job.owner_id == int(current_user.id)
is_admin = current_user.is_admin
if not (is_owner or is_admin):
db_session.close()
return jsonify({"error": "Keine Berechtigung"}), 403
db_session.close()
return f(job_id, *args, **kwargs)
return decorated_function
# Custom decorator für Admin-Check
def admin_required(f):
@wraps(f)
@login_required
def decorated_function(*args, **kwargs):
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren haben Zugriff"}), 403
return f(*args, **kwargs)
return decorated_function
# ===== AUTHENTIFIZIERUNGS-ROUTEN (ehemals auth.py) =====
@app.route("/auth/login", methods=["GET", "POST"])
def login():
if current_user.is_authenticated:
return redirect(url_for("index"))
error = None
if request.method == "POST":
# Unterscheiden zwischen JSON-Anfragen und normalen Formular-Anfragen
is_json_request = request.is_json or request.headers.get('Content-Type') == 'application/json'
# Daten je nach Anfrageart auslesen
if is_json_request:
data = request.get_json()
username = data.get("username")
password = data.get("password")
remember_me = data.get("remember_me", False)
else:
username = request.form.get("username")
password = request.form.get("password")
remember_me = request.form.get("remember-me") == "on"
if not username or not password:
error = "Benutzername und Passwort müssen angegeben werden."
if is_json_request:
return jsonify({"error": error}), 400
else:
try:
db_session = get_db_session()
# Suche nach Benutzer mit übereinstimmendem Benutzernamen oder E-Mail
user = db_session.query(User).filter(
(User.username == username) | (User.email == username)
).first()
if user and user.check_password(password):
# Update last login timestamp
user.update_last_login()
db_session.commit()
login_user(user, remember=remember_me)
auth_logger.info(f"Benutzer {username} hat sich angemeldet")
next_page = request.args.get("next")
db_session.close()
if is_json_request:
return jsonify({"success": True, "redirect_url": next_page or url_for("index")})
else:
if next_page:
return redirect(next_page)
return redirect(url_for("index"))
else:
error = "Ungültiger Benutzername oder Passwort."
auth_logger.warning(f"Fehlgeschlagener Login-Versuch für Benutzer {username}")
db_session.close()
if is_json_request:
return jsonify({"error": error}), 401
except Exception as e:
# Fehlerbehandlung für Datenbankprobleme
error = "Anmeldefehler. Bitte versuchen Sie es später erneut."
auth_logger.error(f"Fehler bei der Anmeldung: {str(e)}")
if is_json_request:
return jsonify({"error": error}), 500
return render_template("login.html", error=error)
@app.route("/auth/logout", methods=["GET", "POST"])
@login_required
def auth_logout():
username = current_user.username if hasattr(current_user, "username") else "Unbekannt"
logout_user()
auth_logger.info(f"Benutzer {username} hat sich abgemeldet")
# Unterscheiden zwischen JSON-Anfragen und normalen Anfragen
if request.is_json or request.headers.get('Content-Type') == 'application/json':
return jsonify({"success": True, "redirect_url": url_for("login")})
else:
return redirect(url_for("login"))
@app.route("/auth/api/login", methods=["POST"])
def api_login():
"""API-Login-Endpunkt für Frontend"""
try:
data = request.get_json()
if not data:
return jsonify({"error": "Keine Daten erhalten"}), 400
username = data.get("username")
password = data.get("password")
remember_me = data.get("remember_me", False)
if not username or not password:
return jsonify({"error": "Benutzername und Passwort müssen angegeben werden"}), 400
db_session = get_db_session()
user = db_session.query(User).filter(
(User.username == username) | (User.email == username)
).first()
if user and user.check_password(password):
# Update last login timestamp
user.update_last_login()
db_session.commit()
login_user(user, remember=remember_me)
auth_logger.info(f"API-Login erfolgreich für Benutzer {username}")
user_data = {
"id": user.id,
"username": user.username,
"name": user.name,
"email": user.email,
"is_admin": user.is_admin
}
db_session.close()
return jsonify({
"success": True,
"user": user_data,
"redirect_url": url_for("index")
})
else:
auth_logger.warning(f"Fehlgeschlagener API-Login für Benutzer {username}")
db_session.close()
return jsonify({"error": "Ungültiger Benutzername oder Passwort"}), 401
except Exception as e:
auth_logger.error(f"Fehler beim API-Login: {str(e)}")
return jsonify({"error": "Anmeldefehler. Bitte versuchen Sie es später erneut"}), 500
@app.route("/auth/api/callback", methods=["GET", "POST"])
def api_callback():
"""OAuth-Callback-Endpunkt für externe Authentifizierung"""
try:
# OAuth-Provider bestimmen
provider = request.args.get('provider', 'github')
if request.method == "GET":
# Authorization Code aus URL-Parameter extrahieren
code = request.args.get('code')
state = request.args.get('state')
error = request.args.get('error')
if error:
auth_logger.warning(f"OAuth-Fehler von {provider}: {error}")
return jsonify({
"error": f"OAuth-Authentifizierung fehlgeschlagen: {error}",
"redirect_url": url_for("login")
}), 400
if not code:
auth_logger.warning(f"Kein Authorization Code von {provider} erhalten")
return jsonify({
"error": "Kein Authorization Code erhalten",
"redirect_url": url_for("login")
}), 400
# State-Parameter validieren (CSRF-Schutz)
session_state = session.get('oauth_state')
if not state or state != session_state:
auth_logger.warning(f"Ungültiger State-Parameter von {provider}")
return jsonify({
"error": "Ungültiger State-Parameter",
"redirect_url": url_for("login")
}), 400
# OAuth-Token austauschen
if provider == 'github':
user_data = handle_github_callback(code)
else:
auth_logger.error(f"Unbekannter OAuth-Provider: {provider}")
return jsonify({
"error": "Unbekannter OAuth-Provider",
"redirect_url": url_for("login")
}), 400
if not user_data:
return jsonify({
"error": "Fehler beim Abrufen der Benutzerdaten",
"redirect_url": url_for("login")
}), 400
# Benutzer in Datenbank suchen oder erstellen
db_session = get_db_session()
try:
user = db_session.query(User).filter(
User.email == user_data['email']
).first()
if not user:
# Neuen Benutzer erstellen
user = User(
username=user_data['username'],
email=user_data['email'],
name=user_data['name'],
is_admin=False,
oauth_provider=provider,
oauth_id=str(user_data['id'])
)
# Zufälliges Passwort setzen (wird nicht verwendet)
import secrets
user.set_password(secrets.token_urlsafe(32))
db_session.add(user)
db_session.commit()
auth_logger.info(f"Neuer OAuth-Benutzer erstellt: {user.username} via {provider}")
else:
# Bestehenden Benutzer aktualisieren
user.oauth_provider = provider
user.oauth_id = str(user_data['id'])
user.name = user_data['name']
user.updated_at = datetime.now()
db_session.commit()
auth_logger.info(f"OAuth-Benutzer aktualisiert: {user.username} via {provider}")
# Update last login timestamp
user.update_last_login()
db_session.commit()
login_user(user, remember=True)
# Session-State löschen
session.pop('oauth_state', None)
response_data = {
"success": True,
"user": {
"id": user.id,
"username": user.username,
"name": user.name,
"email": user.email,
"is_admin": user.is_admin
},
"redirect_url": url_for("index")
}
db_session.close()
return jsonify(response_data)
except Exception as e:
db_session.rollback()
db_session.close()
auth_logger.error(f"Datenbankfehler bei OAuth-Callback: {str(e)}")
return jsonify({
"error": "Datenbankfehler bei der Benutzeranmeldung",
"redirect_url": url_for("login")
}), 500
elif request.method == "POST":
# POST-Anfragen für manuelle Token-Übermittlung
data = request.get_json()
if not data:
return jsonify({"error": "Keine Daten erhalten"}), 400
access_token = data.get('access_token')
provider = data.get('provider', 'github')
if not access_token:
return jsonify({"error": "Kein Access Token erhalten"}), 400
# Benutzerdaten mit Access Token abrufen
if provider == 'github':
user_data = get_github_user_data(access_token)
else:
return jsonify({"error": "Unbekannter OAuth-Provider"}), 400
if not user_data:
return jsonify({"error": "Fehler beim Abrufen der Benutzerdaten"}), 400
# Benutzer verarbeiten (gleiche Logik wie bei GET)
db_session = get_db_session()
try:
user = db_session.query(User).filter(
User.email == user_data['email']
).first()
if not user:
user = User(
username=user_data['username'],
email=user_data['email'],
name=user_data['name'],
is_admin=False,
oauth_provider=provider,
oauth_id=str(user_data['id'])
)
import secrets
user.set_password(secrets.token_urlsafe(32))
db_session.add(user)
db_session.commit()
auth_logger.info(f"Neuer OAuth-Benutzer erstellt: {user.username} via {provider}")
else:
user.oauth_provider = provider
user.oauth_id = str(user_data['id'])
user.name = user_data['name']
user.updated_at = datetime.now()
db_session.commit()
auth_logger.info(f"OAuth-Benutzer aktualisiert: {user.username} via {provider}")
# Update last login timestamp
user.update_last_login()
db_session.commit()
login_user(user, remember=True)
response_data = {
"success": True,
"user": {
"id": user.id,
"username": user.username,
"name": user.name,
"email": user.email,
"is_admin": user.is_admin
},
"redirect_url": url_for("index")
}
db_session.close()
return jsonify(response_data)
except Exception as e:
db_session.rollback()
db_session.close()
auth_logger.error(f"Datenbankfehler bei OAuth-Callback: {str(e)}")
return jsonify({
"error": "Datenbankfehler bei der Benutzeranmeldung",
"redirect_url": url_for("login")
}), 500
except Exception as e:
auth_logger.error(f"Fehler im OAuth-Callback: {str(e)}")
return jsonify({
"error": "OAuth-Callback-Fehler",
"redirect_url": url_for("login")
}), 500
def handle_github_callback(code):
"""GitHub OAuth-Callback verarbeiten"""
try:
import requests
# GitHub OAuth-Konfiguration (sollte aus Umgebungsvariablen kommen)
client_id = "7c5d8bef1a5519ec1fdc"
client_secret = "5f1e586204358fbd53cf5fb7d418b3f06ccab8fd"
if not client_id or not client_secret:
auth_logger.error("GitHub OAuth-Konfiguration fehlt")
return None
# Access Token anfordern
token_url = "https://github.com/login/oauth/access_token"
token_data = {
'client_id': client_id,
'client_secret': client_secret,
'code': code
}
token_response = requests.post(
token_url,
data=token_data,
headers={'Accept': 'application/json'},
timeout=10
)
if token_response.status_code != 200:
auth_logger.error(f"GitHub Token-Anfrage fehlgeschlagen: {token_response.status_code}")
return None
token_json = token_response.json()
access_token = token_json.get('access_token')
if not access_token:
auth_logger.error("Kein Access Token von GitHub erhalten")
return None
return get_github_user_data(access_token)
except Exception as e:
auth_logger.error(f"Fehler bei GitHub OAuth-Callback: {str(e)}")
return None
def get_github_user_data(access_token):
"""GitHub-Benutzerdaten mit Access Token abrufen"""
try:
import requests
# Benutzerdaten von GitHub API abrufen
user_url = "https://api.github.com/user"
headers = {
'Authorization': f'token {access_token}',
'Accept': 'application/vnd.github.v3+json'
}
user_response = requests.get(user_url, headers=headers, timeout=10)
if user_response.status_code != 200:
auth_logger.error(f"GitHub User-API-Anfrage fehlgeschlagen: {user_response.status_code}")
return None
user_data = user_response.json()
# E-Mail-Adresse separat abrufen (falls nicht öffentlich)
email = user_data.get('email')
if not email:
email_url = "https://api.github.com/user/emails"
email_response = requests.get(email_url, headers=headers, timeout=10)
if email_response.status_code == 200:
emails = email_response.json()
# Primäre E-Mail-Adresse finden
for email_obj in emails:
if email_obj.get('primary', False):
email = email_obj.get('email')
break
# Fallback: Erste E-Mail-Adresse verwenden
if not email and emails:
email = emails[0].get('email')
if not email:
auth_logger.error("Keine E-Mail-Adresse von GitHub erhalten")
return None
return {
'id': user_data.get('id'),
'username': user_data.get('login'),
'name': user_data.get('name') or user_data.get('login'),
'email': email
}
except Exception as e:
auth_logger.error(f"Fehler beim Abrufen der GitHub-Benutzerdaten: {str(e)}")
return None
# ===== BENUTZER-ROUTEN (ehemals user.py) =====
@app.route("/user/profile", methods=["GET"])
@login_required
def user_profile():
"""Profil-Seite anzeigen"""
user_logger.info(f"Benutzer {current_user.username} hat seine Profilseite aufgerufen")
return render_template("profile.html", user=current_user)
@app.route("/user/settings", methods=["GET"])
@login_required
def user_settings():
"""Einstellungen-Seite anzeigen"""
user_logger.info(f"Benutzer {current_user.username} hat seine Einstellungsseite aufgerufen")
return render_template("settings.html", user=current_user)
@app.route("/user/update-profile", methods=["POST"])
@login_required
def user_update_profile():
"""Benutzerprofilinformationen aktualisieren"""
try:
# Überprüfen, ob es sich um eine JSON-Anfrage handelt
is_json_request = request.is_json or request.headers.get('Content-Type') == 'application/json'
if is_json_request:
data = request.get_json()
name = data.get("name")
email = data.get("email")
department = data.get("department")
position = data.get("position")
phone = data.get("phone")
else:
name = request.form.get("name")
email = request.form.get("email")
department = request.form.get("department")
position = request.form.get("position")
phone = request.form.get("phone")
db_session = get_db_session()
user = db_session.query(User).filter(User.id == current_user.id).first()
if user:
# Aktualisiere die Benutzerinformationen
if name:
user.name = name
if email:
user.email = email
if department:
user.department = department
if position:
user.position = position
if phone:
user.phone = phone
user.updated_at = datetime.now()
db_session.commit()
user_logger.info(f"Benutzer {current_user.username} hat sein Profil aktualisiert")
if is_json_request:
return jsonify({
"success": True,
"message": "Profil erfolgreich aktualisiert"
})
else:
flash("Profil erfolgreich aktualisiert", "success")
return redirect(url_for("user_profile"))
else:
error = "Benutzer nicht gefunden."
if is_json_request:
return jsonify({"error": error}), 404
else:
flash(error, "error")
return redirect(url_for("user_profile"))
except Exception as e:
error = f"Fehler beim Aktualisieren des Profils: {str(e)}"
user_logger.error(error)
if request.is_json:
return jsonify({"error": error}), 500
else:
flash(error, "error")
return redirect(url_for("user_profile"))
finally:
db_session.close()
@app.route("/user/api/update-settings", methods=["POST"])
@login_required
def user_api_update_settings():
"""API-Endpunkt für Einstellungen-Updates (JSON)"""
return user_update_settings()
@app.route("/user/update-settings", methods=["POST"])
@login_required
def user_update_settings():
"""Benutzereinstellungen aktualisieren"""
db_session = get_db_session()
try:
# Überprüfen, ob es sich um eine JSON-Anfrage handelt
is_json_request = request.is_json or request.headers.get('Content-Type') == 'application/json'
# Einstellungen aus der Anfrage extrahieren
if is_json_request:
data = request.get_json()
if not data:
return jsonify({"error": "Keine Daten empfangen"}), 400
theme = data.get("theme", "system")
reduced_motion = bool(data.get("reduced_motion", False))
contrast = data.get("contrast", "normal")
notifications = data.get("notifications", {})
privacy = data.get("privacy", {})
else:
theme = request.form.get("theme", "system")
reduced_motion = request.form.get("reduced_motion") == "on"
contrast = request.form.get("contrast", "normal")
notifications = {
"new_jobs": request.form.get("notify_new_jobs") == "on",
"job_updates": request.form.get("notify_job_updates") == "on",
"system": request.form.get("notify_system") == "on",
"email": request.form.get("notify_email") == "on"
}
privacy = {
"activity_logs": request.form.get("activity_logs") == "on",
"two_factor": request.form.get("two_factor") == "on",
"auto_logout": int(request.form.get("auto_logout", "60"))
}
# Validierung der Eingaben
valid_themes = ["light", "dark", "system"]
if theme not in valid_themes:
theme = "system"
valid_contrasts = ["normal", "high"]
if contrast not in valid_contrasts:
contrast = "normal"
# Benutzer aus der Datenbank laden
user = db_session.query(User).filter(User.id == current_user.id).first()
if not user:
error = "Benutzer nicht gefunden."
if is_json_request:
return jsonify({"error": error}), 404
else:
flash(error, "error")
return redirect(url_for("user_settings"))
# Einstellungen-Dictionary erstellen
settings = {
"theme": theme,
"reduced_motion": reduced_motion,
"contrast": contrast,
"notifications": {
"new_jobs": bool(notifications.get("new_jobs", True)),
"job_updates": bool(notifications.get("job_updates", True)),
"system": bool(notifications.get("system", True)),
"email": bool(notifications.get("email", False))
},
"privacy": {
"activity_logs": bool(privacy.get("activity_logs", True)),
"two_factor": bool(privacy.get("two_factor", False)),
"auto_logout": max(5, min(480, int(privacy.get("auto_logout", 60)))) # 5-480 Minuten
},
"last_updated": datetime.now().isoformat()
}
# Prüfen, ob User-Tabelle eine settings-Spalte hat
if hasattr(user, 'settings'):
# Einstellungen in der Datenbank speichern
import json
user.settings = json.dumps(settings)
else:
# Fallback: In Session speichern (temporär)
session['user_settings'] = settings
user.updated_at = datetime.now()
db_session.commit()
user_logger.info(f"Benutzer {current_user.username} hat seine Einstellungen aktualisiert")
if is_json_request:
return jsonify({
"success": True,
"message": "Einstellungen erfolgreich aktualisiert",
"settings": settings
})
else:
flash("Einstellungen erfolgreich aktualisiert", "success")
return redirect(url_for("user_settings"))
except ValueError as e:
error = f"Ungültige Eingabedaten: {str(e)}"
user_logger.warning(f"Ungültige Einstellungsdaten von Benutzer {current_user.username}: {str(e)}")
if is_json_request:
return jsonify({"error": error}), 400
else:
flash(error, "error")
return redirect(url_for("user_settings"))
except Exception as e:
db_session.rollback()
error = f"Fehler beim Aktualisieren der Einstellungen: {str(e)}"
user_logger.error(f"Fehler beim Aktualisieren der Einstellungen für Benutzer {current_user.username}: {str(e)}")
if is_json_request:
return jsonify({"error": "Interner Serverfehler"}), 500
else:
flash("Fehler beim Speichern der Einstellungen", "error")
return redirect(url_for("user_settings"))
finally:
db_session.close()
@app.route("/user/change-password", methods=["POST"])
@login_required
def user_change_password():
"""Benutzerpasswort ändern"""
try:
# Überprüfen, ob es sich um eine JSON-Anfrage handelt
is_json_request = request.is_json or request.headers.get('Content-Type') == 'application/json'
if is_json_request:
data = request.get_json()
current_password = data.get("current_password")
new_password = data.get("new_password")
confirm_password = data.get("confirm_password")
else:
current_password = request.form.get("current_password")
new_password = request.form.get("new_password")
confirm_password = request.form.get("confirm_password")
# Prüfen, ob alle Felder ausgefüllt sind
if not current_password or not new_password or not confirm_password:
error = "Alle Passwortfelder müssen ausgefüllt sein."
if is_json_request:
return jsonify({"error": error}), 400
else:
flash(error, "error")
return redirect(url_for("user_profile"))
# Prüfen, ob das neue Passwort und die Bestätigung übereinstimmen
if new_password != confirm_password:
error = "Das neue Passwort und die Bestätigung stimmen nicht überein."
if is_json_request:
return jsonify({"error": error}), 400
else:
flash(error, "error")
return redirect(url_for("user_profile"))
db_session = get_db_session()
user = db_session.query(User).filter(User.id == current_user.id).first()
if user and user.check_password(current_password):
# Passwort aktualisieren
user.set_password(new_password)
user.updated_at = datetime.now()
db_session.commit()
user_logger.info(f"Benutzer {current_user.username} hat sein Passwort geändert")
if is_json_request:
return jsonify({
"success": True,
"message": "Passwort erfolgreich geändert"
})
else:
flash("Passwort erfolgreich geändert", "success")
return redirect(url_for("user_profile"))
else:
error = "Das aktuelle Passwort ist nicht korrekt."
if is_json_request:
return jsonify({"error": error}), 401
else:
flash(error, "error")
return redirect(url_for("user_profile"))
except Exception as e:
error = f"Fehler beim Ändern des Passworts: {str(e)}"
user_logger.error(error)
if request.is_json:
return jsonify({"error": error}), 500
else:
flash(error, "error")
return redirect(url_for("user_profile"))
finally:
db_session.close()
@app.route("/user/export", methods=["GET"])
@login_required
def user_export_data():
"""Exportiert alle Benutzerdaten als JSON für DSGVO-Konformität"""
try:
db_session = get_db_session()
user = db_session.query(User).filter(User.id == current_user.id).first()
if not user:
db_session.close()
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Benutzerdaten abrufen
user_data = user.to_dict()
# Jobs des Benutzers abrufen
jobs = db_session.query(Job).filter(Job.user_id == user.id).all()
user_data["jobs"] = [job.to_dict() for job in jobs]
# Aktivitäten und Einstellungen hinzufügen
user_data["settings"] = session.get('user_settings', {})
# Persönliche Statistiken
user_data["statistics"] = {
"total_jobs": len(jobs),
"completed_jobs": len([j for j in jobs if j.status == "finished"]),
"failed_jobs": len([j for j in jobs if j.status == "failed"]),
"account_created": user.created_at.isoformat() if user.created_at else None,
"last_login": user.last_login.isoformat() if user.last_login else None
}
db_session.close()
# Daten als JSON-Datei zum Download anbieten
response = make_response(json.dumps(user_data, indent=4))
response.headers["Content-Disposition"] = f"attachment; filename=user_data_{user.username}.json"
response.headers["Content-Type"] = "application/json"
user_logger.info(f"Benutzer {current_user.username} hat seine Daten exportiert")
return response
except Exception as e:
error = f"Fehler beim Exportieren der Benutzerdaten: {str(e)}"
user_logger.error(error)
return jsonify({"error": error}), 500
@app.route("/user/profile", methods=["PUT"])
@login_required
def user_update_profile_api():
"""API-Endpunkt zum Aktualisieren des Benutzerprofils"""
try:
if not request.is_json:
return jsonify({"error": "Anfrage muss im JSON-Format sein"}), 400
data = request.get_json()
db_session = get_db_session()
user = db_session.query(User).filter(User.id == current_user.id).first()
if not user:
db_session.close()
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Aktualisiere nur die bereitgestellten Felder
if "name" in data:
user.name = data["name"]
if "email" in data:
user.email = data["email"]
if "department" in data:
user.department = data["department"]
if "position" in data:
user.position = data["position"]
if "phone" in data:
user.phone = data["phone"]
if "bio" in data:
user.bio = data["bio"]
user.updated_at = datetime.now()
db_session.commit()
# Aktualisierte Benutzerdaten zurückgeben
user_data = user.to_dict()
db_session.close()
user_logger.info(f"Benutzer {current_user.username} hat sein Profil über die API aktualisiert")
return jsonify({
"success": True,
"message": "Profil erfolgreich aktualisiert",
"user": user_data
})
except Exception as e:
error = f"Fehler beim Aktualisieren des Profils: {str(e)}"
user_logger.error(error)
return jsonify({"error": error}), 500
# ===== KIOSK-KONTROLL-ROUTEN (ehemals kiosk_control.py) =====
@app.route('/api/kiosk/status', methods=['GET'])
def kiosk_get_status():
"""Kiosk-Status abrufen."""
try:
# Prüfen ob Kiosk-Modus aktiv ist
kiosk_active = os.path.exists('/tmp/kiosk_active')
return jsonify({
"active": kiosk_active,
"message": "Kiosk-Status erfolgreich abgerufen"
})
except Exception as e:
kiosk_logger.error(f"Fehler beim Abrufen des Kiosk-Status: {str(e)}")
return jsonify({"error": "Fehler beim Abrufen des Status"}), 500
@app.route('/api/kiosk/deactivate', methods=['POST'])
def kiosk_deactivate():
"""Kiosk-Modus mit Passwort deaktivieren."""
try:
data = request.get_json()
if not data or 'password' not in data:
return jsonify({"error": "Passwort erforderlich"}), 400
password = data['password']
# Passwort überprüfen
if not check_password_hash(KIOSK_PASSWORD_HASH, password):
kiosk_logger.warning(f"Fehlgeschlagener Kiosk-Deaktivierungsversuch von IP: {request.remote_addr}")
return jsonify({"error": "Ungültiges Passwort"}), 401
# Kiosk deaktivieren
try:
# Kiosk-Service stoppen
subprocess.run(['sudo', 'systemctl', 'stop', 'myp-kiosk'], check=True)
subprocess.run(['sudo', 'systemctl', 'disable', 'myp-kiosk'], check=True)
# Kiosk-Marker entfernen
if os.path.exists('/tmp/kiosk_active'):
os.remove('/tmp/kiosk_active')
# Normale Desktop-Umgebung wiederherstellen
subprocess.run(['sudo', 'systemctl', 'set-default', 'graphical.target'], check=True)
kiosk_logger.info(f"Kiosk-Modus erfolgreich deaktiviert von IP: {request.remote_addr}")
return jsonify({
"success": True,
"message": "Kiosk-Modus erfolgreich deaktiviert. System wird neu gestartet."
})
except subprocess.CalledProcessError as e:
kiosk_logger.error(f"Fehler beim Deaktivieren des Kiosk-Modus: {str(e)}")
return jsonify({"error": "Fehler beim Deaktivieren des Kiosk-Modus"}), 500
except Exception as e:
kiosk_logger.error(f"Unerwarteter Fehler bei Kiosk-Deaktivierung: {str(e)}")
return jsonify({"error": "Unerwarteter Fehler"}), 500
@app.route('/api/kiosk/activate', methods=['POST'])
@login_required
def kiosk_activate():
"""Kiosk-Modus aktivieren (nur für Admins)."""
try:
# Admin-Authentifizierung prüfen
if not current_user.is_admin:
kiosk_logger.warning(f"Nicht-Admin-Benutzer {current_user.username} versuchte Kiosk-Aktivierung")
return jsonify({"error": "Nur Administratoren können den Kiosk-Modus aktivieren"}), 403
# Kiosk aktivieren
try:
# Kiosk-Marker setzen
with open('/tmp/kiosk_active', 'w') as f:
f.write('1')
# Kiosk-Service aktivieren
subprocess.run(['sudo', 'systemctl', 'enable', 'myp-kiosk'], check=True)
subprocess.run(['sudo', 'systemctl', 'start', 'myp-kiosk'], check=True)
kiosk_logger.info(f"Kiosk-Modus erfolgreich aktiviert von Admin {current_user.username} (IP: {request.remote_addr})")
return jsonify({
"success": True,
"message": "Kiosk-Modus erfolgreich aktiviert"
})
except subprocess.CalledProcessError as e:
kiosk_logger.error(f"Fehler beim Aktivieren des Kiosk-Modus: {str(e)}")
return jsonify({"error": "Fehler beim Aktivieren des Kiosk-Modus"}), 500
except Exception as e:
kiosk_logger.error(f"Unerwarteter Fehler bei Kiosk-Aktivierung: {str(e)}")
return jsonify({"error": "Unerwarteter Fehler"}), 500
@app.route('/api/kiosk/restart', methods=['POST'])
def kiosk_restart_system():
"""System neu starten (nur nach Kiosk-Deaktivierung)."""
try:
data = request.get_json()
if not data or 'password' not in data:
return jsonify({"error": "Passwort erforderlich"}), 400
password = data['password']
# Passwort überprüfen
if not check_password_hash(KIOSK_PASSWORD_HASH, password):
kiosk_logger.warning(f"Fehlgeschlagener Neustart-Versuch von IP: {request.remote_addr}")
return jsonify({"error": "Ungültiges Passwort"}), 401
kiosk_logger.info(f"System-Neustart initiiert von IP: {request.remote_addr}")
# System nach kurzer Verzögerung neu starten
subprocess.Popen(['sudo', 'shutdown', '-r', '+1'])
return jsonify({
"success": True,
"message": "System wird in 1 Minute neu gestartet"
})
except Exception as e:
kiosk_logger.error(f"Fehler beim System-Neustart: {str(e)}")
return jsonify({"error": "Fehler beim Neustart"}), 500
# ===== HILFSFUNKTIONEN =====
def check_printer_status(ip_address: str, timeout: int = 7) -> Tuple[str, bool]:
"""
Überprüft den Status eines Druckers über Ping mit Timeout.
Args:
ip_address: IP-Adresse des Druckers
timeout: Timeout in Sekunden (Standard: 7)
Returns:
Tuple[str, bool]: (Status, Aktiv) - Status ist "online" oder "offline", Aktiv ist True/False
"""
if not ip_address or ip_address.strip() == "":
printers_logger.debug(f"Keine IP-Adresse angegeben")
return "offline", False
try:
# IP-Adresse validieren
import ipaddress
try:
ipaddress.ip_address(ip_address.strip())
except ValueError:
printers_logger.warning(f"Ungültige IP-Adresse: {ip_address}")
return "offline", False
# Windows-spezifischer Ping-Befehl mit Timeout
if os.name == 'nt': # Windows
cmd = ['ping', '-n', '1', '-w', str(timeout * 1000), ip_address.strip()]
else: # Unix/Linux/macOS
cmd = ['ping', '-c', '1', '-W', str(timeout), ip_address.strip()]
printers_logger.debug(f"Ping-Befehl für {ip_address}: {' '.join(cmd)}")
# Ping ausführen mit Timeout
result = subprocess.run(
cmd,
capture_output=True,
text=True,
encoding='utf-8',
errors='ignore', # Ignoriere Unicode-Fehler
timeout=timeout + 2 # Zusätzlicher Timeout für subprocess
)
# Erfolgreicher Ping (Return Code 0)
if result.returncode == 0:
printers_logger.debug(f"Ping erfolgreich für {ip_address}")
return "online", True
else:
printers_logger.debug(f"Ping fehlgeschlagen für {ip_address} (Return Code: {result.returncode})")
return "offline", False
except subprocess.TimeoutExpired:
printers_logger.warning(f"Ping-Timeout für Drucker {ip_address} nach {timeout} Sekunden")
return "offline", False
except Exception as e:
printers_logger.error(f"Fehler beim Ping für Drucker {ip_address}: {str(e)}")
return "offline", False
def check_multiple_printers_status(printers: List[Dict], timeout: int = 7) -> Dict[int, Tuple[str, bool]]:
"""
Überprüft den Status mehrerer Drucker parallel mit Timeout.
Args:
printers: Liste von Drucker-Dictionaries mit 'id' und 'ip_address'
timeout: Timeout in Sekunden pro Drucker (Standard: 7)
Returns:
Dict[int, Tuple[str, bool]]: Dictionary mit Drucker-ID als Key und (Status, Aktiv) als Value
"""
results = {}
# Parallel-Ausführung mit ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=min(len(printers), 10)) as executor:
# Futures für alle Drucker erstellen
future_to_printer = {
executor.submit(check_printer_status, printer.get('ip_address'), timeout): printer
for printer in printers
}
# Ergebnisse sammeln
for future in as_completed(future_to_printer, timeout=timeout + 2):
printer = future_to_printer[future]
try:
status, active = future.result()
results[printer['id']] = (status, active)
printers_logger.info(f"Drucker {printer['name']} ({printer.get('ip_address')}): {status}")
except Exception as e:
printers_logger.error(f"Fehler bei Status-Check für Drucker {printer['name']}: {str(e)}")
results[printer['id']] = ("offline", False)
return results
# ===== UI-ROUTEN =====
@app.route("/")
def index():
if current_user.is_authenticated:
return render_template("index.html")
return redirect(url_for("login"))
@app.route("/dashboard")
@login_required
def dashboard():
return render_template("dashboard.html")
@app.route("/profile")
@login_required
def profile_redirect():
"""Leitet zur neuen Profilseite im User-Blueprint weiter."""
return redirect(url_for("user_profile"))
@app.route("/profil")
@login_required
def profil_redirect():
"""Leitet zur neuen Profilseite im User-Blueprint weiter (deutsche URL)."""
return redirect(url_for("user_profile"))
@app.route("/settings")
@login_required
def settings_redirect():
"""Leitet zur neuen Einstellungsseite im User-Blueprint weiter."""
return redirect(url_for("user_settings"))
@app.route("/einstellungen")
@login_required
def einstellungen_redirect():
"""Leitet zur neuen Einstellungsseite im User-Blueprint weiter (deutsche URL)."""
return redirect(url_for("user_settings"))
@app.route("/admin")
@login_required
def admin():
"""Leitet zur neuen Admin-Dashboard-Route weiter."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
return redirect(url_for("admin_page"))
@app.route("/demo")
@login_required
def components_demo():
"""Demo-Seite für UI-Komponenten"""
return render_template("components_demo.html")
@app.route("/printers")
@login_required
def printers_page():
"""Zeigt die Übersichtsseite für Drucker an."""
return render_template("printers.html")
@app.route("/jobs")
@login_required
def jobs_page():
"""Zeigt die Übersichtsseite für Druckaufträge an."""
return render_template("jobs.html")
@app.route("/stats")
@login_required
def stats_page():
"""Zeigt die Statistik-Seite an."""
return render_template("stats.html")
@app.route("/admin-dashboard")
@login_required
def admin_page():
"""Zeigt die Administrationsseite an."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
# Aktives Tab aus der URL auslesen oder Default-Wert verwenden
active_tab = request.args.get('tab', 'users')
# Daten für das Admin-Panel direkt beim Laden vorbereiten
stats = {}
users = []
printers = []
scheduler_status = {"running": False, "message": "Nicht verfügbar"}
system_info = {"cpu": 0, "memory": 0, "disk": 0}
logs = []
db_session = get_db_session()
try:
# Statistiken laden
from sqlalchemy.orm import joinedload
# Benutzeranzahl
stats["total_users"] = db_session.query(User).count()
# Druckeranzahl und Online-Status
all_printers = db_session.query(Printer).all()
stats["total_printers"] = len(all_printers)
stats["online_printers"] = len([p for p in all_printers if p.status == "online"])
# Aktive Jobs und Warteschlange
stats["active_jobs"] = db_session.query(Job).filter(
Job.status.in_(["printing", "running"])
).count()
stats["queued_jobs"] = db_session.query(Job).filter(
Job.status == "scheduled"
).count()
# Erfolgsrate
total_jobs = db_session.query(Job).filter(
Job.status.in_(["completed", "failed", "cancelled"])
).count()
successful_jobs = db_session.query(Job).filter(
Job.status == "completed"
).count()
if total_jobs > 0:
stats["success_rate"] = int((successful_jobs / total_jobs) * 100)
else:
stats["success_rate"] = 0
# Benutzer laden
if active_tab == 'users':
users = db_session.query(User).all()
users = [user.to_dict() for user in users]
# Drucker laden
if active_tab == 'printers':
printers = db_session.query(Printer).all()
printers = [printer.to_dict() for printer in printers]
# Scheduler-Status laden
if active_tab == 'scheduler':
from utils.scheduler import scheduler_is_running
scheduler_status = {
"running": scheduler_is_running(),
"message": "Der Scheduler läuft" if scheduler_is_running() else "Der Scheduler ist gestoppt"
}
# System-Informationen laden
if active_tab == 'system':
import os
import psutil
# CPU und Memory
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
# Uptime
boot_time = psutil.boot_time()
uptime_seconds = time.time() - boot_time
uptime_days = int(uptime_seconds // 86400)
uptime_hours = int((uptime_seconds % 86400) // 3600)
uptime_minutes = int((uptime_seconds % 3600) // 60)
# Datenbank-Status
db_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'database', 'myp.db')
db_size = 0
if os.path.exists(db_path):
db_size = os.path.getsize(db_path) / (1024 * 1024) # MB
# Scheduler-Status
scheduler_running = False
scheduler_jobs = 0
try:
from utils.job_scheduler import scheduler
scheduler_running = scheduler.running
if hasattr(scheduler, 'get_jobs'):
scheduler_jobs = len(scheduler.get_jobs())
except:
pass
# Nächster Job
next_job = db_session.query(Job).filter(
Job.status == "scheduled"
).order_by(Job.created_at.asc()).first()
next_job_time = "Keine geplanten Jobs"
if next_job:
next_job_time = next_job.created_at.strftime("%d.%m.%Y %H:%M")
system_info = {
"cpu_usage": round(cpu_percent, 1),
"memory_usage": round(memory.percent, 1),
"disk_usage": round((disk.used / disk.total) * 100, 1),
"uptime": f"{uptime_days}d {uptime_hours}h {uptime_minutes}m",
"db_size": f"{db_size:.1f} MB",
"db_connections": "Aktiv",
"scheduler_running": scheduler_running,
"scheduler_jobs": scheduler_jobs,
"next_job": next_job_time
}
# Logs laden
if active_tab == 'logs':
import os
log_level = request.args.get('log_level', 'all')
log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs')
# Logeinträge sammeln
app_logs = []
for category in ['app', 'auth', 'jobs', 'printers', 'scheduler', 'errors']:
log_file = os.path.join(log_dir, category, f'{category}.log')
if os.path.exists(log_file):
with open(log_file, 'r') as f:
for line in f.readlines()[-100:]: # Nur die letzten 100 Zeilen pro Datei
if log_level != 'all':
if log_level.upper() not in line:
continue
app_logs.append({
'timestamp': line.split(' - ')[0] if ' - ' in line else '',
'level': line.split(' - ')[1].split(' - ')[0] if ' - ' in line and len(line.split(' - ')) > 2 else 'INFO',
'category': category,
'message': ' - '.join(line.split(' - ')[2:]) if ' - ' in line and len(line.split(' - ')) > 2 else line
})
# Nach Zeitstempel sortieren (neueste zuerst)
logs = sorted(app_logs, key=lambda x: x['timestamp'] if x['timestamp'] else '', reverse=True)[:100]
except Exception as e:
app_logger.error(f"Fehler beim Laden der Admin-Daten: {str(e)}")
finally:
db_session.close()
return render_template(
"admin.html",
active_tab=active_tab,
stats=stats,
users=users,
printers=printers,
scheduler_status=scheduler_status,
system_info=system_info,
logs=logs
)
# Direkter Zugriff auf Logout-Route (für Fallback)
@app.route("/logout", methods=["GET", "POST"])
def logout_redirect():
"""Leitet zur Blueprint-Logout-Route weiter."""
return redirect(url_for("auth_logout"))
# ===== JOB-ROUTEN =====
@app.route("/api/jobs", methods=["GET"])
@login_required
def get_jobs():
db_session = get_db_session()
try:
# Import joinedload for eager loading
from sqlalchemy.orm import joinedload
# Admin sieht alle Jobs, User nur eigene
if current_user.is_admin:
# Eagerly load the user and printer relationships to avoid detached instance errors
jobs = db_session.query(Job).options(joinedload(Job.user), joinedload(Job.printer)).all()
else:
jobs = db_session.query(Job).options(joinedload(Job.user), joinedload(Job.printer)).filter(Job.user_id == int(current_user.id)).all()
# Convert jobs to dictionaries before closing the session
job_dicts = [job.to_dict() for job in jobs]
db_session.close()
return jsonify({
"jobs": job_dicts
})
except Exception as e:
jobs_logger.error(f"Fehler beim Abrufen von Jobs: {str(e)}")
db_session.close()
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/jobs/<int:job_id>", methods=["GET"])
@login_required
@job_owner_required
def get_job(job_id):
db_session = get_db_session()
try:
from sqlalchemy.orm import joinedload
# Eagerly load the user and printer relationships
job = db_session.query(Job).options(joinedload(Job.user), joinedload(Job.printer)).filter(Job.id == job_id).first()
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
# Convert to dict before closing session
job_dict = job.to_dict()
db_session.close()
return jsonify(job_dict)
except Exception as e:
jobs_logger.error(f"Fehler beim Abrufen des Jobs {job_id}: {str(e)}")
db_session.close()
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route('/api/jobs/active', methods=['GET'])
@login_required
def get_active_jobs():
"""
Gibt alle aktiven Jobs zurück.
"""
try:
db_session = get_db_session()
from sqlalchemy.orm import joinedload
active_jobs = db_session.query(Job).options(
joinedload(Job.user),
joinedload(Job.printer)
).filter(
Job.status.in_(["scheduled", "running"])
).all()
result = []
for job in active_jobs:
job_dict = job.to_dict()
# Aktuelle Restzeit berechnen
if job.status == "running" and job.end_at:
remaining_time = job.end_at - datetime.now()
if remaining_time.total_seconds() > 0:
job_dict["remaining_minutes"] = int(remaining_time.total_seconds() / 60)
else:
job_dict["remaining_minutes"] = 0
result.append(job_dict)
db_session.close()
return jsonify({"jobs": result})
except Exception as e:
jobs_logger.error(f"Fehler beim Abrufen aktiver Jobs: {str(e)}")
return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
@app.route('/api/jobs', methods=['POST'])
@login_required
def create_job():
"""
Erstellt einen neuen Job mit dem Status "scheduled".
Body: {
"printer_id": int,
"start_iso": str, # ISO-Datum-String
"duration_minutes": int
}
"""
try:
data = request.json
# Pflichtfelder prüfen
required_fields = ["printer_id", "start_iso", "duration_minutes"]
for field in required_fields:
if field not in data:
return jsonify({"error": f"Feld '{field}' fehlt"}), 400
# Daten extrahieren und validieren
printer_id = int(data["printer_id"])
start_iso = data["start_iso"]
duration_minutes = int(data["duration_minutes"])
# Optional: Jobtitel und Dateipfad
name = data.get("name", f"Druckjob vom {datetime.now().strftime('%d.%m.%Y')}")
file_path = data.get("file_path")
# Start-Zeit parsen
try:
start_at = datetime.fromisoformat(start_iso)
except ValueError:
return jsonify({"error": "Ungültiges Startdatum"}), 400
# Dauer validieren
if duration_minutes <= 0:
return jsonify({"error": "Dauer muss größer als 0 sein"}), 400
# End-Zeit berechnen
end_at = start_at + timedelta(minutes=duration_minutes)
db_session = get_db_session()
# Prüfen, ob der Drucker existiert
printer = db_session.query(Printer).get(printer_id)
if not printer:
db_session.close()
return jsonify({"error": "Drucker nicht gefunden"}), 404
# Neuen Job erstellen
new_job = Job(
name=name,
printer_id=printer_id,
user_id=current_user.id,
owner_id=current_user.id,
start_at=start_at,
end_at=end_at,
status="scheduled",
file_path=file_path,
duration_minutes=duration_minutes
)
db_session.add(new_job)
db_session.commit()
# Job-Objekt für die Antwort serialisieren
job_dict = new_job.to_dict()
db_session.close()
jobs_logger.info(f"Neuer Job {new_job.id} erstellt für Drucker {printer_id}, Start: {start_at}, Dauer: {duration_minutes} Minuten")
return jsonify({"job": job_dict}), 201
except Exception as e:
jobs_logger.error(f"Fehler beim Erstellen eines Jobs: {str(e)}")
return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
@app.route('/api/jobs/<int:job_id>/extend', methods=['POST'])
@login_required
@job_owner_required
def extend_job(job_id):
"""
Verlängert die Endzeit eines Jobs.
Body: {
"extra_minutes": int
}
"""
try:
data = request.json
# Prüfen, ob die erforderlichen Daten vorhanden sind
if "extra_minutes" not in data:
return jsonify({"error": "Feld 'extra_minutes' fehlt"}), 400
extra_minutes = int(data["extra_minutes"])
# Validieren
if extra_minutes <= 0:
return jsonify({"error": "Zusätzliche Minuten müssen größer als 0 sein"}), 400
db_session = get_db_session()
job = db_session.query(Job).get(job_id)
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
# Prüfen, ob der Job verlängert werden kann
if job.status not in ["scheduled", "running"]:
db_session.close()
return jsonify({"error": f"Job kann im Status '{job.status}' nicht verlängert werden"}), 400
# Endzeit aktualisieren
job.end_at = job.end_at + timedelta(minutes=extra_minutes)
job.duration_minutes += extra_minutes
db_session.commit()
# Job-Objekt für die Antwort serialisieren
job_dict = job.to_dict()
db_session.close()
jobs_logger.info(f"Job {job_id} um {extra_minutes} Minuten verlängert, neue Endzeit: {job.end_at}")
return jsonify({"job": job_dict})
except Exception as e:
jobs_logger.error(f"Fehler beim Verlängern von Job {job_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
@app.route('/api/jobs/<int:job_id>/finish', methods=['POST'])
@login_required
def finish_job(job_id):
"""
Beendet einen Job manuell und schaltet die Steckdose aus.
Nur für Administratoren erlaubt.
"""
try:
# Prüfen, ob der Benutzer Administrator ist
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Jobs manuell beenden"}), 403
db_session = get_db_session()
job = db_session.query(Job).options(joinedload(Job.printer)).get(job_id)
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
# Prüfen, ob der Job beendet werden kann
if job.status not in ["scheduled", "running"]:
db_session.close()
return jsonify({"error": f"Job kann im Status '{job.status}' nicht beendet werden"}), 400
# Steckdose ausschalten
from utils.job_scheduler import toggle_plug
if not toggle_plug(job.printer_id, False):
# Trotzdem weitermachen, aber Warnung loggen
jobs_logger.warning(f"Steckdose für Job {job_id} konnte nicht ausgeschaltet werden")
# Job als beendet markieren
job.status = "finished"
job.actual_end_time = datetime.now()
db_session.commit()
# Job-Objekt für die Antwort serialisieren
job_dict = job.to_dict()
db_session.close()
jobs_logger.info(f"Job {job_id} manuell beendet durch Admin {current_user.id}")
return jsonify({"job": job_dict})
except Exception as e:
jobs_logger.error(f"Fehler beim manuellen Beenden von Job {job_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
# ===== DRUCKER-ROUTEN =====
@app.route("/api/printers", methods=["GET"])
@login_required
def get_printers():
"""Gibt alle Drucker zurück - OHNE Status-Check für schnelleres Laden."""
db_session = get_db_session()
try:
# Set timeout for database query
import signal
def timeout_handler(signum, frame):
raise TimeoutError("Database query timeout")
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(5) # 5 second timeout for basic printer loading (reduziert)
try:
printers = db_session.query(Printer).all()
signal.alarm(0) # Clear alarm
except TimeoutError:
printers_logger.warning("Database timeout when fetching printers for basic loading")
return jsonify({
'error': 'Database timeout beim Laden der Drucker',
'timeout': True,
'printers': []
}), 408
# Drucker-Daten OHNE Status-Check zusammenstellen für schnelles Laden
printer_data = []
current_time = datetime.now()
for printer in printers:
printer_data.append({
"id": printer.id,
"name": printer.name,
"model": printer.model or 'Unbekanntes Modell',
"location": printer.location or 'Unbekannter Standort',
"mac_address": printer.mac_address,
"plug_ip": printer.plug_ip,
"status": printer.status or "offline", # Letzter bekannter Status
"active": printer.active if hasattr(printer, 'active') else True,
"ip_address": printer.plug_ip if printer.plug_ip else getattr(printer, 'ip_address', None),
"created_at": printer.created_at.isoformat() if printer.created_at else current_time.isoformat(),
"last_checked": printer.last_checked.isoformat() if hasattr(printer, 'last_checked') and printer.last_checked else None
})
db_session.close()
printers_logger.info(f"Schnelles Laden abgeschlossen: {len(printer_data)} Drucker geladen (ohne Status-Check)")
return jsonify({
"printers": printer_data,
"count": len(printer_data),
"message": "Drucker erfolgreich geladen"
})
except Exception as e:
signal.alarm(0) # Clear any remaining alarm
db_session.rollback()
db_session.close()
printers_logger.error(f"Fehler beim Abrufen der Drucker: {str(e)}")
return jsonify({
"error": f"Fehler beim Laden der Drucker: {str(e)}",
"printers": []
}), 500
@app.route("/api/printers/status", methods=["GET"])
@login_required
def get_printers_with_status():
"""Gibt alle Drucker MIT aktuellem Status-Check zurück - für Aktualisierung."""
db_session = get_db_session()
try:
# Set timeout for database query
import signal
def timeout_handler(signum, frame):
raise TimeoutError("Database query timeout")
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(8) # 8 second timeout for status check loading
try:
printers = db_session.query(Printer).all()
signal.alarm(0) # Clear alarm
except TimeoutError:
printers_logger.warning("Database timeout when fetching printers for status check")
return jsonify({
'error': 'Database timeout beim Status-Check der Drucker',
'timeout': True
}), 408
# Drucker-Daten für Status-Check vorbereiten
printer_data = []
for printer in printers:
# Verwende plug_ip als primäre IP-Adresse, fallback auf ip_address
ip_to_check = printer.plug_ip if printer.plug_ip else getattr(printer, 'ip_address', None)
printer_data.append({
'id': printer.id,
'name': printer.name,
'ip_address': ip_to_check,
'location': printer.location,
'model': printer.model
})
# Status aller Drucker parallel überprüfen mit 7-Sekunden-Timeout
printers_logger.info(f"Starte Status-Check für {len(printer_data)} Drucker mit 7-Sekunden-Timeout")
# Fallback: Wenn keine IP-Adressen vorhanden sind, alle als offline markieren
if not any(p['ip_address'] for p in printer_data):
printers_logger.warning("Keine IP-Adressen für Drucker gefunden - alle als offline markiert")
status_results = {p['id']: ("offline", False) for p in printer_data}
else:
try:
status_results = check_multiple_printers_status(printer_data, timeout=7)
except Exception as e:
printers_logger.error(f"Fehler beim Status-Check: {str(e)}")
# Fallback: alle als offline markieren
status_results = {p['id']: ("offline", False) for p in printer_data}
# Ergebnisse zusammenstellen und Datenbank aktualisieren
status_data = []
current_time = datetime.now()
for printer in printers:
if printer.id in status_results:
status, active = status_results[printer.id]
# Mapping für Frontend-Kompatibilität
if status == "online":
frontend_status = "available"
else:
frontend_status = "offline"
else:
# Fallback falls kein Ergebnis vorliegt
frontend_status = "offline"
active = False
# Status in der Datenbank aktualisieren
printer.status = frontend_status
printer.active = active
# Setze last_checked falls das Feld existiert
if hasattr(printer, 'last_checked'):
printer.last_checked = current_time
status_data.append({
"id": printer.id,
"name": printer.name,
"model": printer.model or 'Unbekanntes Modell',
"location": printer.location or 'Unbekannter Standort',
"mac_address": printer.mac_address,
"plug_ip": printer.plug_ip,
"status": frontend_status,
"active": active,
"ip_address": printer.plug_ip if printer.plug_ip else getattr(printer, 'ip_address', None),
"created_at": printer.created_at.isoformat() if printer.created_at else current_time.isoformat(),
"last_checked": current_time.isoformat()
})
# Speichere die aktualisierten Status
try:
db_session.commit()
printers_logger.info("Drucker-Status erfolgreich in Datenbank aktualisiert")
except Exception as e:
printers_logger.warning(f"Fehler beim Speichern der Status-Updates: {str(e)}")
# Nicht kritisch, Status-Check kann trotzdem zurückgegeben werden
db_session.close()
online_count = len([s for s in status_data if s['status'] == 'available'])
printers_logger.info(f"Status-Check abgeschlossen: {online_count} von {len(status_data)} Drucker online")
return jsonify(status_data)
except Exception as e:
signal.alarm(0) # Clear any remaining alarm
db_session.rollback()
db_session.close()
printers_logger.error(f"Fehler beim Status-Check der Drucker: {str(e)}")
return jsonify({
"error": f"Fehler beim Status-Check: {str(e)}",
"printers": []
}), 500
@app.route("/api/jobs/current", methods=["GET"])
@login_required
def get_current_job():
"""Gibt den aktuellen Job des Benutzers zurück."""
db_session = get_db_session()
try:
current_job = db_session.query(Job).filter(
Job.user_id == current_user.id,
Job.status.in_(["scheduled", "running"])
).order_by(Job.start_at).first()
if current_job:
job_data = current_job.to_dict()
else:
job_data = None
db_session.close()
return jsonify(job_data)
except Exception as e:
db_session.close()
return jsonify({"error": str(e)}), 500
# ===== WEITERE API-ROUTEN =====
@app.route("/api/printers/<int:printer_id>", methods=["GET"])
@login_required
def get_printer(printer_id):
"""Gibt einen spezifischen Drucker zurück."""
db_session = get_db_session()
try:
printer = db_session.query(Printer).get(printer_id)
if not printer:
db_session.close()
return jsonify({"error": "Drucker nicht gefunden"}), 404
# Status-Check für diesen Drucker
ip_to_check = printer.plug_ip if printer.plug_ip else getattr(printer, 'ip_address', None)
if ip_to_check:
status, active = check_printer_status(ip_to_check)
printer.status = "available" if status == "online" else "offline"
printer.active = active
db_session.commit()
printer_data = {
"id": printer.id,
"name": printer.name,
"model": printer.model or 'Unbekanntes Modell',
"location": printer.location or 'Unbekannter Standort',
"mac_address": printer.mac_address,
"plug_ip": printer.plug_ip,
"status": printer.status or "offline",
"active": printer.active if hasattr(printer, 'active') else True,
"ip_address": ip_to_check,
"created_at": printer.created_at.isoformat() if printer.created_at else datetime.now().isoformat()
}
db_session.close()
return jsonify(printer_data)
except Exception as e:
db_session.close()
printers_logger.error(f"Fehler beim Abrufen des Druckers {printer_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/printers", methods=["POST"])
@login_required
def create_printer():
"""Erstellt einen neuen Drucker (nur für Admins)."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Drucker erstellen"}), 403
try:
data = request.json
# Pflichtfelder prüfen
required_fields = ["name", "plug_ip"]
for field in required_fields:
if field not in data:
return jsonify({"error": f"Feld '{field}' fehlt"}), 400
db_session = get_db_session()
# Prüfen, ob bereits ein Drucker mit diesem Namen existiert
existing_printer = db_session.query(Printer).filter(Printer.name == data["name"]).first()
if existing_printer:
db_session.close()
return jsonify({"error": "Ein Drucker mit diesem Namen existiert bereits"}), 400
# Neuen Drucker erstellen
new_printer = Printer(
name=data["name"],
model=data.get("model", ""),
location=data.get("location", ""),
mac_address=data.get("mac_address", ""),
plug_ip=data["plug_ip"],
status="offline",
created_at=datetime.now()
)
db_session.add(new_printer)
db_session.commit()
printer_data = {
"id": new_printer.id,
"name": new_printer.name,
"model": new_printer.model,
"location": new_printer.location,
"mac_address": new_printer.mac_address,
"plug_ip": new_printer.plug_ip,
"status": new_printer.status,
"created_at": new_printer.created_at.isoformat()
}
db_session.close()
printers_logger.info(f"Neuer Drucker '{new_printer.name}' erstellt von Admin {current_user.id}")
return jsonify({"printer": printer_data}), 201
except Exception as e:
printers_logger.error(f"Fehler beim Erstellen eines Druckers: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/printers/<int:printer_id>", methods=["PUT"])
@login_required
def update_printer(printer_id):
"""Aktualisiert einen Drucker (nur für Admins)."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Drucker bearbeiten"}), 403
try:
data = request.json
db_session = get_db_session()
printer = db_session.query(Printer).get(printer_id)
if not printer:
db_session.close()
return jsonify({"error": "Drucker nicht gefunden"}), 404
# Aktualisierbare Felder
updatable_fields = ["name", "model", "location", "mac_address", "plug_ip"]
for field in updatable_fields:
if field in data:
setattr(printer, field, data[field])
db_session.commit()
printer_data = {
"id": printer.id,
"name": printer.name,
"model": printer.model,
"location": printer.location,
"mac_address": printer.mac_address,
"plug_ip": printer.plug_ip,
"status": printer.status,
"created_at": printer.created_at.isoformat() if printer.created_at else datetime.now().isoformat()
}
db_session.close()
printers_logger.info(f"Drucker {printer_id} aktualisiert von Admin {current_user.id}")
return jsonify({"printer": printer_data})
except Exception as e:
printers_logger.error(f"Fehler beim Aktualisieren des Druckers {printer_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/printers/<int:printer_id>", methods=["DELETE"])
@login_required
def delete_printer(printer_id):
"""Löscht einen Drucker (nur für Admins)."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Drucker löschen"}), 403
try:
db_session = get_db_session()
printer = db_session.query(Printer).get(printer_id)
if not printer:
db_session.close()
return jsonify({"error": "Drucker nicht gefunden"}), 404
# Prüfen, ob noch aktive Jobs für diesen Drucker existieren
active_jobs = db_session.query(Job).filter(
Job.printer_id == printer_id,
Job.status.in_(["scheduled", "running"])
).count()
if active_jobs > 0:
db_session.close()
return jsonify({"error": f"Drucker kann nicht gelöscht werden: {active_jobs} aktive Jobs vorhanden"}), 400
printer_name = printer.name
db_session.delete(printer)
db_session.commit()
db_session.close()
printers_logger.info(f"Drucker '{printer_name}' (ID: {printer_id}) gelöscht von Admin {current_user.id}")
return jsonify({"message": "Drucker erfolgreich gelöscht"})
except Exception as e:
printers_logger.error(f"Fehler beim Löschen des Druckers {printer_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/jobs/<int:job_id>", methods=["DELETE"])
@login_required
@job_owner_required
def delete_job(job_id):
"""Löscht einen Job."""
try:
db_session = get_db_session()
job = db_session.query(Job).get(job_id)
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
# Prüfen, ob der Job gelöscht werden kann
if job.status == "running":
db_session.close()
return jsonify({"error": "Laufende Jobs können nicht gelöscht werden"}), 400
job_name = job.name
db_session.delete(job)
db_session.commit()
db_session.close()
jobs_logger.info(f"Job '{job_name}' (ID: {job_id}) gelöscht von Benutzer {current_user.id}")
return jsonify({"message": "Job erfolgreich gelöscht"})
except Exception as e:
jobs_logger.error(f"Fehler beim Löschen des Jobs {job_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/jobs/<int:job_id>/cancel", methods=["POST"])
@login_required
@job_owner_required
def cancel_job(job_id):
"""Bricht einen Job ab."""
try:
db_session = get_db_session()
job = db_session.query(Job).get(job_id)
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
# Prüfen, ob der Job abgebrochen werden kann
if job.status not in ["scheduled", "running"]:
db_session.close()
return jsonify({"error": f"Job kann im Status '{job.status}' nicht abgebrochen werden"}), 400
# Job als abgebrochen markieren
job.status = "cancelled"
job.actual_end_time = datetime.now()
# Wenn der Job läuft, Steckdose ausschalten
if job.status == "running":
from utils.job_scheduler import toggle_plug
toggle_plug(job.printer_id, False)
db_session.commit()
job_dict = job.to_dict()
db_session.close()
jobs_logger.info(f"Job {job_id} abgebrochen von Benutzer {current_user.id}")
return jsonify({"job": job_dict})
except Exception as e:
jobs_logger.error(f"Fehler beim Abbrechen des Jobs {job_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/stats", methods=["GET"])
@login_required
def get_stats():
"""Gibt Statistiken zurück."""
try:
db_session = get_db_session()
# Grundlegende Statistiken
total_users = db_session.query(User).count()
total_printers = db_session.query(Printer).count()
total_jobs = db_session.query(Job).count()
# Jobs nach Status
completed_jobs = db_session.query(Job).filter(Job.status == "completed").count()
failed_jobs = db_session.query(Job).filter(Job.status == "failed").count()
cancelled_jobs = db_session.query(Job).filter(Job.status == "cancelled").count()
active_jobs = db_session.query(Job).filter(Job.status.in_(["scheduled", "running"])).count()
# Online-Drucker
online_printers = db_session.query(Printer).filter(Printer.status == "available").count()
# Erfolgsrate
finished_jobs = completed_jobs + failed_jobs + cancelled_jobs
success_rate = (completed_jobs / finished_jobs * 100) if finished_jobs > 0 else 0
# Benutzer-spezifische Statistiken (falls nicht Admin)
user_stats = {}
if not current_user.is_admin:
user_jobs = db_session.query(Job).filter(Job.user_id == current_user.id).count()
user_completed = db_session.query(Job).filter(
Job.user_id == current_user.id,
Job.status == "completed"
).count()
user_stats = {
"total_jobs": user_jobs,
"completed_jobs": user_completed,
"success_rate": (user_completed / user_jobs * 100) if user_jobs > 0 else 0
}
db_session.close()
stats = {
"total_users": total_users,
"total_printers": total_printers,
"online_printers": online_printers,
"total_jobs": total_jobs,
"completed_jobs": completed_jobs,
"failed_jobs": failed_jobs,
"cancelled_jobs": cancelled_jobs,
"active_jobs": active_jobs,
"success_rate": round(success_rate, 1),
"user_stats": user_stats
}
return jsonify(stats)
except Exception as e:
app_logger.error(f"Fehler beim Abrufen der Statistiken: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/admin/users", methods=["GET"])
@login_required
def get_users():
"""Gibt alle Benutzer zurück (nur für Admins)."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Benutzer anzeigen"}), 403
try:
db_session = get_db_session()
users = db_session.query(User).all()
user_data = []
for user in users:
user_data.append({
"id": user.id,
"username": user.username,
"email": user.email,
"first_name": user.first_name,
"last_name": user.last_name,
"is_admin": user.is_admin,
"created_at": user.created_at.isoformat() if user.created_at else None,
"last_login": user.last_login.isoformat() if hasattr(user, 'last_login') and user.last_login else None
})
db_session.close()
return jsonify({"users": user_data})
except Exception as e:
app_logger.error(f"Fehler beim Abrufen der Benutzer: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/admin/users/<int:user_id>", methods=["PUT"])
@login_required
def update_user(user_id):
"""Aktualisiert einen Benutzer (nur für Admins)."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Benutzer bearbeiten"}), 403
try:
data = request.json
db_session = get_db_session()
user = db_session.get(User, user_id)
if not user:
db_session.close()
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Aktualisierbare Felder
updatable_fields = ["username", "email", "first_name", "last_name", "is_admin"]
for field in updatable_fields:
if field in data:
setattr(user, field, data[field])
# Passwort separat behandeln
if "password" in data and data["password"]:
user.set_password(data["password"])
db_session.commit()
user_data = {
"id": user.id,
"username": user.username,
"email": user.email,
"first_name": user.first_name,
"last_name": user.last_name,
"is_admin": user.is_admin,
"created_at": user.created_at.isoformat() if user.created_at else None
}
db_session.close()
user_logger.info(f"Benutzer {user_id} aktualisiert von Admin {current_user.id}")
return jsonify({"user": user_data})
except Exception as e:
user_logger.error(f"Fehler beim Aktualisieren des Benutzers {user_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/admin/users/<int:user_id>", methods=["DELETE"])
@login_required
def delete_user(user_id):
"""Löscht einen Benutzer (nur für Admins)."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Benutzer löschen"}), 403
# Verhindern, dass sich der Admin selbst löscht
if user_id == current_user.id:
return jsonify({"error": "Sie können sich nicht selbst löschen"}), 400
try:
db_session = get_db_session()
user = db_session.get(User, user_id)
if not user:
db_session.close()
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Prüfen, ob noch aktive Jobs für diesen Benutzer existieren
active_jobs = db_session.query(Job).filter(
Job.user_id == user_id,
Job.status.in_(["scheduled", "running"])
).count()
if active_jobs > 0:
db_session.close()
return jsonify({"error": f"Benutzer kann nicht gelöscht werden: {active_jobs} aktive Jobs vorhanden"}), 400
username = user.username
db_session.delete(user)
db_session.commit()
db_session.close()
user_logger.info(f"Benutzer '{username}' (ID: {user_id}) gelöscht von Admin {current_user.id}")
return jsonify({"message": "Benutzer erfolgreich gelöscht"})
except Exception as e:
user_logger.error(f"Fehler beim Löschen des Benutzers {user_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
# ===== FEHLERBEHANDLUNG =====
@app.errorhandler(404)
def not_found_error(error):
return render_template('errors/404.html'), 404
@app.errorhandler(500)
def internal_error(error):
return render_template('errors/500.html'), 500
@app.errorhandler(403)
def forbidden_error(error):
return render_template('errors/403.html'), 403
# ===== ADMIN - DATENBANK-VERWALTUNG =====
@app.route('/api/admin/database/stats', methods=['GET'])
@admin_required
def get_database_stats():
"""Gibt Datenbank-Statistiken zurück."""
try:
stats = database_monitor.get_database_stats()
return jsonify({
"success": True,
"stats": stats
})
except Exception as e:
app_logger.error(f"Fehler beim Abrufen der Datenbank-Statistiken: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route('/api/admin/database/health', methods=['GET'])
@admin_required
def check_database_health():
"""Führt eine Datenbank-Gesundheitsprüfung durch."""
try:
health = database_monitor.check_database_health()
return jsonify({
"success": True,
"health": health
})
except Exception as e:
app_logger.error(f"Fehler bei Datenbank-Gesundheitsprüfung: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route('/api/admin/database/optimize', methods=['POST'])
@admin_required
def optimize_database():
"""Führt Datenbank-Optimierung durch."""
try:
result = database_monitor.optimize_database()
return jsonify({
"success": result["success"],
"result": result
})
except Exception as e:
app_logger.error(f"Fehler bei Datenbank-Optimierung: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route('/api/admin/database/backup', methods=['POST'])
@admin_required
def create_database_backup():
"""Erstellt ein manuelles Datenbank-Backup."""
try:
data = request.get_json() or {}
compress = data.get('compress', True)
backup_path = backup_manager.create_backup(compress=compress)
return jsonify({
"success": True,
"backup_path": backup_path,
"message": "Backup erfolgreich erstellt"
})
except Exception as e:
app_logger.error(f"Fehler beim Erstellen des Backups: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route('/api/admin/database/backups', methods=['GET'])
@admin_required
def list_database_backups():
"""Listet alle verfügbaren Datenbank-Backups auf."""
try:
backups = backup_manager.get_backup_list()
# Konvertiere datetime-Objekte zu Strings für JSON
for backup in backups:
backup['created'] = backup['created'].isoformat()
return jsonify({
"success": True,
"backups": backups
})
except Exception as e:
app_logger.error(f"Fehler beim Abrufen der Backup-Liste: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route('/api/admin/database/backup/restore', methods=['POST'])
@admin_required
def restore_database_backup():
"""Stellt ein Datenbank-Backup wieder her."""
try:
data = request.get_json()
if not data or 'backup_path' not in data:
return jsonify({
"success": False,
"error": "Backup-Pfad erforderlich"
}), 400
backup_path = data['backup_path']
# Sicherheitsprüfung: Nur Backups aus dem Backup-Verzeichnis erlauben
if not backup_path.startswith(backup_manager.backup_dir):
return jsonify({
"success": False,
"error": "Ungültiger Backup-Pfad"
}), 400
success = backup_manager.restore_backup(backup_path)
if success:
return jsonify({
"success": True,
"message": "Backup erfolgreich wiederhergestellt"
})
else:
return jsonify({
"success": False,
"error": "Fehler beim Wiederherstellen des Backups"
}), 500
except Exception as e:
app_logger.error(f"Fehler beim Wiederherstellen des Backups: {str(e)}")
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route('/api/admin/database/backup/cleanup', methods=['POST'])
@admin_required
def cleanup_old_backups():
"""Löscht alte Datenbank-Backups."""
try:
backup_dir = os.path.join(os.path.dirname(__file__), 'database', 'backups')
if not os.path.exists(backup_dir):
return jsonify({"error": "Backup-Verzeichnis nicht gefunden"}), 404
# Backups älter als 30 Tage löschen
cutoff_date = datetime.now() - timedelta(days=30)
deleted_count = 0
for filename in os.listdir(backup_dir):
if filename.endswith('.sql'):
file_path = os.path.join(backup_dir, filename)
file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
if file_mtime < cutoff_date:
os.remove(file_path)
deleted_count += 1
return jsonify({
"message": f"{deleted_count} alte Backups gelöscht",
"deleted_count": deleted_count
})
except Exception as e:
app_logger.error(f"Fehler beim Löschen alter Backups: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route('/api/admin/stats/live', methods=['GET'])
@admin_required
def get_admin_live_stats():
"""Liefert Live-Statistiken für das Admin-Dashboard."""
try:
db_session = get_db_session()
# Aktuelle Statistiken sammeln
total_users = db_session.query(User).count()
total_printers = db_session.query(Printer).count()
total_jobs = db_session.query(Job).count()
active_jobs = db_session.query(Job).filter(Job.status.in_(["scheduled", "running"])).count()
# Printer-Status
available_printers = db_session.query(Printer).filter(Printer.status == "available").count()
offline_printers = db_session.query(Printer).filter(Printer.status == "offline").count()
maintenance_printers = db_session.query(Printer).filter(Printer.status == "maintenance").count()
# Jobs heute
today = datetime.now().date()
jobs_today = db_session.query(Job).filter(
func.date(Job.created_at) == today
).count()
# Erfolgreiche Jobs heute
completed_jobs_today = db_session.query(Job).filter(
func.date(Job.created_at) == today,
Job.status == "completed"
).count()
db_session.close()
stats = {
"users": {
"total": total_users
},
"printers": {
"total": total_printers,
"available": available_printers,
"offline": offline_printers,
"maintenance": maintenance_printers
},
"jobs": {
"total": total_jobs,
"active": active_jobs,
"today": jobs_today,
"completed_today": completed_jobs_today
},
"timestamp": datetime.now().isoformat()
}
return jsonify(stats)
except Exception as e:
app_logger.error(f"Fehler beim Abrufen der Live-Statistiken: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route('/api/admin/system/status', methods=['GET'])
@admin_required
def get_system_status():
"""Liefert System-Status-Informationen."""
try:
import psutil
import platform
# CPU und Memory
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
# Netzwerk (vereinfacht)
network = psutil.net_io_counters()
system_info = {
"platform": platform.system(),
"platform_release": platform.release(),
"platform_version": platform.version(),
"machine": platform.machine(),
"processor": platform.processor(),
"cpu": {
"percent": cpu_percent,
"count": psutil.cpu_count()
},
"memory": {
"total": memory.total,
"available": memory.available,
"percent": memory.percent,
"used": memory.used
},
"disk": {
"total": disk.total,
"used": disk.used,
"free": disk.free,
"percent": (disk.used / disk.total) * 100
},
"network": {
"bytes_sent": network.bytes_sent,
"bytes_recv": network.bytes_recv
},
"timestamp": datetime.now().isoformat()
}
return jsonify(system_info)
except ImportError:
return jsonify({
"error": "psutil nicht installiert",
"message": "Systemstatus kann nicht abgerufen werden"
}), 500
except Exception as e:
app_logger.error(f"Fehler beim Abrufen des Systemstatus: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route('/api/admin/database/status', methods=['GET'])
@admin_required
def get_database_status():
"""Liefert Datenbank-Status-Informationen."""
try:
db_session = get_db_session()
# Tabellen-Informationen sammeln
table_stats = {}
# User-Tabelle
user_count = db_session.query(User).count()
latest_user = db_session.query(User).order_by(User.created_at.desc()).first()
# Printer-Tabelle
printer_count = db_session.query(Printer).count()
latest_printer = db_session.query(Printer).order_by(Printer.created_at.desc()).first()
# Job-Tabelle
job_count = db_session.query(Job).count()
latest_job = db_session.query(Job).order_by(Job.created_at.desc()).first()
table_stats = {
"users": {
"count": user_count,
"latest": latest_user.created_at.isoformat() if latest_user else None
},
"printers": {
"count": printer_count,
"latest": latest_printer.created_at.isoformat() if latest_printer else None
},
"jobs": {
"count": job_count,
"latest": latest_job.created_at.isoformat() if latest_job else None
}
}
db_session.close()
# Datenbank-Dateigröße (falls SQLite)
db_file_size = None
try:
db_path = os.path.join(os.path.dirname(__file__), 'database', 'app.db')
if os.path.exists(db_path):
db_file_size = os.path.getsize(db_path)
except:
pass
status = {
"tables": table_stats,
"database_size": db_file_size,
"timestamp": datetime.now().isoformat(),
"connection_status": "connected"
}
return jsonify(status)
except Exception as e:
app_logger.error(f"Fehler beim Abrufen des Datenbankstatus: {str(e)}")
return jsonify({
"error": "Datenbankfehler",
"connection_status": "error",
"timestamp": datetime.now().isoformat()
}), 500
# ===== WEITERE UI-ROUTEN =====
@app.route("/terms")
def terms():
"""Zeigt die Nutzungsbedingungen an."""
return render_template("terms.html")
@app.route("/privacy")
def privacy():
"""Zeigt die Datenschutzerklärung an."""
return render_template("privacy.html")
@app.route("/admin/users/add")
@login_required
def admin_add_user_page():
"""Zeigt die Seite zum Hinzufügen eines neuen Benutzers an."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
return render_template("admin_add_user.html")
@app.route("/admin/printers/add")
@login_required
def admin_add_printer_page():
"""Zeigt die Seite zum Hinzufügen eines neuen Druckers an."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
return render_template("admin_add_printer.html")
@app.route("/admin/printers/<int:printer_id>/manage")
@login_required
def admin_manage_printer_page(printer_id):
"""Zeigt die Drucker-Verwaltungsseite an."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
db_session = get_db_session()
try:
printer = db_session.get(Printer, printer_id)
if not printer:
flash("Drucker nicht gefunden.", "error")
return redirect(url_for("admin_page"))
printer_data = {
"id": printer.id,
"name": printer.name,
"model": printer.model or 'Unbekanntes Modell',
"location": printer.location or 'Unbekannter Standort',
"mac_address": printer.mac_address,
"plug_ip": printer.plug_ip,
"status": printer.status or "offline",
"active": printer.active if hasattr(printer, 'active') else True,
"created_at": printer.created_at.isoformat() if printer.created_at else datetime.now().isoformat()
}
db_session.close()
return render_template("admin_manage_printer.html", printer=printer_data)
except Exception as e:
db_session.close()
app_logger.error(f"Fehler beim Laden der Drucker-Verwaltung: {str(e)}")
flash("Fehler beim Laden der Drucker-Daten.", "error")
return redirect(url_for("admin_page"))
@app.route("/admin/printers/<int:printer_id>/settings")
@login_required
def admin_printer_settings_page(printer_id):
"""Zeigt die Drucker-Einstellungsseite an."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
db_session = get_db_session()
try:
printer = db_session.get(Printer, printer_id)
if not printer:
flash("Drucker nicht gefunden.", "error")
return redirect(url_for("admin_page"))
printer_data = {
"id": printer.id,
"name": printer.name,
"model": printer.model or 'Unbekanntes Modell',
"location": printer.location or 'Unbekannter Standort',
"mac_address": printer.mac_address,
"plug_ip": printer.plug_ip,
"status": printer.status or "offline",
"active": printer.active if hasattr(printer, 'active') else True,
"created_at": printer.created_at.isoformat() if printer.created_at else datetime.now().isoformat()
}
db_session.close()
return render_template("admin_printer_settings.html", printer=printer_data)
except Exception as e:
db_session.close()
app_logger.error(f"Fehler beim Laden der Drucker-Einstellungen: {str(e)}")
flash("Fehler beim Laden der Drucker-Daten.", "error")
return redirect(url_for("admin_page"))
# ===== ADMIN API-ROUTEN FÜR BENUTZER UND DRUCKER =====
@app.route("/api/admin/users", methods=["POST"])
@login_required
def create_user_api():
"""Erstellt einen neuen Benutzer (nur für Admins)."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Benutzer erstellen"}), 403
try:
data = request.json
# Pflichtfelder prüfen
required_fields = ["username", "email", "password"]
for field in required_fields:
if field not in data or not data[field]:
return jsonify({"error": f"Feld '{field}' ist erforderlich"}), 400
db_session = get_db_session()
# Prüfen, ob bereits ein Benutzer mit diesem Benutzernamen oder E-Mail existiert
existing_user = db_session.query(User).filter(
(User.username == data["username"]) | (User.email == data["email"])
).first()
if existing_user:
db_session.close()
return jsonify({"error": "Ein Benutzer mit diesem Benutzernamen oder E-Mail existiert bereits"}), 400
# Neuen Benutzer erstellen
new_user = User(
username=data["username"],
email=data["email"],
first_name=data.get("first_name", ""),
last_name=data.get("last_name", ""),
is_admin=data.get("is_admin", False),
created_at=datetime.now()
)
# Passwort setzen
new_user.set_password(data["password"])
db_session.add(new_user)
db_session.commit()
user_data = {
"id": new_user.id,
"username": new_user.username,
"email": new_user.email,
"first_name": new_user.first_name,
"last_name": new_user.last_name,
"is_admin": new_user.is_admin,
"created_at": new_user.created_at.isoformat()
}
db_session.close()
user_logger.info(f"Neuer Benutzer '{new_user.username}' erstellt von Admin {current_user.id}")
return jsonify({"user": user_data}), 201
except Exception as e:
user_logger.error(f"Fehler beim Erstellen eines Benutzers: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/admin/printers/<int:printer_id>/toggle", methods=["POST"])
@login_required
def toggle_printer_power(printer_id):
"""Schaltet einen Drucker ein oder aus (nur für Admins)."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Drucker steuern"}), 403
try:
data = request.json
power_on = data.get("power_on", True)
db_session = get_db_session()
printer = db_session.query(Printer).get(printer_id)
if not printer:
db_session.close()
return jsonify({"error": "Drucker nicht gefunden"}), 404
# Steckdose schalten
from utils.job_scheduler import toggle_plug
success = toggle_plug(printer_id, power_on)
if success:
# Status in der Datenbank aktualisieren
printer.status = "available" if power_on else "offline"
printer.active = power_on
db_session.commit()
action = "eingeschaltet" if power_on else "ausgeschaltet"
printers_logger.info(f"Drucker {printer.name} {action} von Admin {current_user.id}")
db_session.close()
return jsonify({
"success": True,
"message": f"Drucker erfolgreich {action}",
"status": printer.status
})
else:
db_session.close()
return jsonify({"error": "Fehler beim Schalten der Steckdose"}), 500
except Exception as e:
printers_logger.error(f"Fehler beim Schalten des Druckers {printer_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
# ===== ADMIN FORM ENDPOINTS =====
@app.route("/admin/users/create", methods=["POST"])
@login_required
def admin_create_user_form():
"""Erstellt einen neuen Benutzer über HTML-Form (nur für Admins)."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
try:
# Form-Daten lesen
email = request.form.get("email", "").strip()
name = request.form.get("name", "").strip()
password = request.form.get("password", "").strip()
role = request.form.get("role", "user").strip()
# Pflichtfelder prüfen
if not email or not password:
flash("E-Mail und Passwort sind erforderlich.", "error")
return redirect(url_for("admin_add_user_page"))
# E-Mail validieren
import re
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_pattern, email):
flash("Ungültige E-Mail-Adresse.", "error")
return redirect(url_for("admin_add_user_page"))
db_session = get_db_session()
# Prüfen, ob bereits ein Benutzer mit dieser E-Mail existiert
existing_user = db_session.query(User).filter(User.email == email).first()
if existing_user:
db_session.close()
flash("Ein Benutzer mit dieser E-Mail existiert bereits.", "error")
return redirect(url_for("admin_add_user_page"))
# E-Mail als Username verwenden (falls kein separates Username-Feld)
username = email.split('@')[0]
counter = 1
original_username = username
while db_session.query(User).filter(User.username == username).first():
username = f"{original_username}{counter}"
counter += 1
# Neuen Benutzer erstellen
new_user = User(
username=username,
email=email,
first_name=name.split(' ')[0] if name else "",
last_name=" ".join(name.split(' ')[1:]) if name and ' ' in name else "",
is_admin=(role == "admin"),
created_at=datetime.now()
)
# Passwort setzen
new_user.set_password(password)
db_session.add(new_user)
db_session.commit()
db_session.close()
user_logger.info(f"Neuer Benutzer '{new_user.username}' erstellt von Admin {current_user.id}")
flash(f"Benutzer '{new_user.email}' erfolgreich erstellt.", "success")
return redirect(url_for("admin_page", tab="users"))
except Exception as e:
user_logger.error(f"Fehler beim Erstellen eines Benutzers über Form: {str(e)}")
flash("Fehler beim Erstellen des Benutzers.", "error")
return redirect(url_for("admin_add_user_page"))
@app.route("/admin/printers/create", methods=["POST"])
@login_required
def admin_create_printer_form():
"""Erstellt einen neuen Drucker über HTML-Form (nur für Admins)."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
try:
# Form-Daten lesen
name = request.form.get("name", "").strip()
ip_address = request.form.get("ip_address", "").strip()
model = request.form.get("model", "").strip()
location = request.form.get("location", "").strip()
description = request.form.get("description", "").strip()
status = request.form.get("status", "available").strip()
# Pflichtfelder prüfen
if not name or not ip_address:
flash("Name und IP-Adresse sind erforderlich.", "error")
return redirect(url_for("admin_add_printer_page"))
# IP-Adresse validieren
import re
ip_pattern = r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
if not re.match(ip_pattern, ip_address):
flash("Ungültige IP-Adresse.", "error")
return redirect(url_for("admin_add_printer_page"))
db_session = get_db_session()
# Prüfen, ob bereits ein Drucker mit diesem Namen existiert
existing_printer = db_session.query(Printer).filter(Printer.name == name).first()
if existing_printer:
db_session.close()
flash("Ein Drucker mit diesem Namen existiert bereits.", "error")
return redirect(url_for("admin_add_printer_page"))
# Neuen Drucker erstellen
new_printer = Printer(
name=name,
model=model,
location=location,
description=description,
mac_address="", # Wird später ausgefüllt
plug_ip=ip_address,
status=status,
created_at=datetime.now()
)
db_session.add(new_printer)
db_session.commit()
db_session.close()
printers_logger.info(f"Neuer Drucker '{new_printer.name}' erstellt von Admin {current_user.id}")
flash(f"Drucker '{new_printer.name}' erfolgreich erstellt.", "success")
return redirect(url_for("admin_page", tab="printers"))
except Exception as e:
printers_logger.error(f"Fehler beim Erstellen eines Druckers über Form: {str(e)}")
flash("Fehler beim Erstellen des Druckers.", "error")
return redirect(url_for("admin_add_printer_page"))
@app.route("/admin/users/<int:user_id>/edit", methods=["GET"])
@login_required
def admin_edit_user_page(user_id):
"""Zeigt die Benutzer-Bearbeitungsseite an."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
db_session = get_db_session()
try:
user = db_session.get(User, user_id)
if not user:
flash("Benutzer nicht gefunden.", "error")
return redirect(url_for("admin_page", tab="users"))
user_data = {
"id": user.id,
"username": user.username,
"email": user.email,
"name": user.name or "",
"is_admin": user.is_admin,
"active": user.active,
"created_at": user.created_at.isoformat() if user.created_at else datetime.now().isoformat()
}
db_session.close()
return render_template("admin_edit_user.html", user=user_data)
except Exception as e:
db_session.close()
app_logger.error(f"Fehler beim Laden der Benutzer-Daten: {str(e)}")
flash("Fehler beim Laden der Benutzer-Daten.", "error")
return redirect(url_for("admin_page", tab="users"))
@app.route("/admin/users/<int:user_id>/update", methods=["POST"])
@login_required
def admin_update_user_form(user_id):
"""Aktualisiert einen Benutzer über HTML-Form (nur für Admins)."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
try:
# Form-Daten lesen
email = request.form.get("email", "").strip()
name = request.form.get("name", "").strip()
password = request.form.get("password", "").strip()
role = request.form.get("role", "user").strip()
is_active = request.form.get("is_active", "true").strip() == "true"
# Pflichtfelder prüfen
if not email:
flash("E-Mail-Adresse ist erforderlich.", "error")
return redirect(url_for("admin_edit_user_page", user_id=user_id))
# E-Mail validieren
import re
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_pattern, email):
flash("Ungültige E-Mail-Adresse.", "error")
return redirect(url_for("admin_edit_user_page", user_id=user_id))
db_session = get_db_session()
user = db_session.query(User).get(user_id)
if not user:
db_session.close()
flash("Benutzer nicht gefunden.", "error")
return redirect(url_for("admin_page", tab="users"))
# Prüfen, ob bereits ein anderer Benutzer mit dieser E-Mail existiert
existing_user = db_session.query(User).filter(
User.email == email,
User.id != user_id
).first()
if existing_user:
db_session.close()
flash("Ein Benutzer mit dieser E-Mail-Adresse existiert bereits.", "error")
return redirect(url_for("admin_edit_user_page", user_id=user_id))
# Benutzer aktualisieren
user.email = email
if name:
user.name = name
# Passwort nur ändern, wenn eines angegeben wurde
if password:
user.password_hash = generate_password_hash(password)
user.role = "admin" if role == "admin" else "user"
user.active = is_active
db_session.commit()
db_session.close()
auth_logger.info(f"Benutzer '{user.email}' (ID: {user_id}) aktualisiert von Admin {current_user.id}")
flash(f"Benutzer '{user.email}' erfolgreich aktualisiert.", "success")
return redirect(url_for("admin_page", tab="users"))
except Exception as e:
auth_logger.error(f"Fehler beim Aktualisieren eines Benutzers über Form: {str(e)}")
flash("Fehler beim Aktualisieren des Benutzers.", "error")
return redirect(url_for("admin_edit_user_page", user_id=user_id))
@app.route("/admin/printers/<int:printer_id>/update", methods=["POST"])
@login_required
def admin_update_printer_form(printer_id):
"""Aktualisiert einen Drucker über HTML-Form (nur für Admins)."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
try:
# Form-Daten lesen
name = request.form.get("name", "").strip()
ip_address = request.form.get("ip_address", "").strip()
model = request.form.get("model", "").strip()
location = request.form.get("location", "").strip()
description = request.form.get("description", "").strip()
status = request.form.get("status", "available").strip()
# Pflichtfelder prüfen
if not name or not ip_address:
flash("Name und IP-Adresse sind erforderlich.", "error")
return redirect(url_for("admin_printer_settings_page", printer_id=printer_id))
# IP-Adresse validieren
import re
ip_pattern = r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
if not re.match(ip_pattern, ip_address):
flash("Ungültige IP-Adresse.", "error")
return redirect(url_for("admin_printer_settings_page", printer_id=printer_id))
db_session = get_db_session()
printer = db_session.query(Printer).get(printer_id)
if not printer:
db_session.close()
flash("Drucker nicht gefunden.", "error")
return redirect(url_for("admin_page", tab="printers"))
# Prüfen, ob bereits ein anderer Drucker mit diesem Namen existiert
existing_printer = db_session.query(Printer).filter(
Printer.name == name,
Printer.id != printer_id
).first()
if existing_printer:
db_session.close()
flash("Ein Drucker mit diesem Namen existiert bereits.", "error")
return redirect(url_for("admin_printer_settings_page", printer_id=printer_id))
# Drucker aktualisieren
printer.name = name
printer.model = model
printer.location = location
printer.description = description
printer.plug_ip = ip_address
printer.status = status
db_session.commit()
db_session.close()
printers_logger.info(f"Drucker '{printer.name}' (ID: {printer_id}) aktualisiert von Admin {current_user.id}")
flash(f"Drucker '{printer.name}' erfolgreich aktualisiert.", "success")
return redirect(url_for("admin_manage_printer_page", printer_id=printer_id))
except Exception as e:
printers_logger.error(f"Fehler beim Aktualisieren eines Druckers über Form: {str(e)}")
flash("Fehler beim Aktualisieren des Druckers.", "error")
return redirect(url_for("admin_printer_settings_page", printer_id=printer_id))
# ===== STARTUP UND MAIN =====
if __name__ == "__main__":
import sys
# Debug-Modus prüfen
debug_mode = len(sys.argv) > 1 and sys.argv[1] == "--debug"
try:
# Datenbank initialisieren
init_database()
create_initial_admin()
# Template-Hilfsfunktionen registrieren
register_template_helpers(app)
# Scheduler starten (falls aktiviert)
if SCHEDULER_ENABLED:
try:
scheduler.start()
app_logger.info("Job-Scheduler gestartet")
except Exception as e:
app_logger.error(f"Fehler beim Starten des Schedulers: {str(e)}")
if debug_mode:
# Debug-Modus: HTTP auf Port 5000
app_logger.info("Starte Debug-Server auf 0.0.0.0:5000 (HTTP)")
app.run(
host="0.0.0.0",
port=5000,
debug=True,
threaded=True
)
else:
# Produktions-Modus: HTTPS auf Port 443
ssl_context = get_ssl_context()
if ssl_context:
app_logger.info("Starte HTTPS-Server auf 0.0.0.0:443")
app.run(
host="0.0.0.0",
port=443,
debug=False,
ssl_context=ssl_context,
threaded=True
)
else:
app_logger.info("Starte HTTP-Server auf 0.0.0.0:80")
app.run(
host="0.0.0.0",
port=80,
debug=False,
threaded=True
)
except Exception as e:
app_logger.error(f"Fehler beim Starten der Anwendung: {str(e)}")
sys.exit(1)