🔧 Aktualisierung der Backend-Logik und Optimierung der SQLite-Datenbankkonfiguration für Raspberry Pi: Hinzufügen spezifischer Optimierungen, Verbesserung der Fehlerbehandlung und Protokollierung. Einführung von Caching-Mechanismen und Anpassungen für schwache Hardware. 📈

This commit is contained in:
2025-06-01 22:43:42 +02:00
parent 317f7dc9dc
commit 62efe03887
40 changed files with 3856 additions and 229 deletions

Binary file not shown.

335
backend/blueprints/admin.py Normal file
View File

@@ -0,0 +1,335 @@
"""
Admin-Blueprint für das 3D-Druck-Management-System
Dieses Modul enthält alle Admin-spezifischen Routen und Funktionen,
einschließlich Benutzerverwaltung, Systemüberwachung und Drucker-Administration.
"""
from flask import Blueprint, render_template, request, jsonify, redirect, url_for, flash
from flask_login import login_required, current_user
from functools import wraps
from models import User, Printer, Job, get_db_session, Stats, SystemLog
from utils.logging_config import get_logger
from datetime import datetime
# Blueprint erstellen
admin_blueprint = Blueprint('admin', __name__, url_prefix='/admin')
# Logger initialisieren
admin_logger = get_logger("admin")
def admin_required(f):
"""Decorator für Admin-Berechtigung"""
@wraps(f)
@login_required
def decorated_function(*args, **kwargs):
admin_logger.info(f"Admin-Check für Funktion {f.__name__}: User authenticated: {current_user.is_authenticated}, User ID: {current_user.id if current_user.is_authenticated else 'None'}, Is Admin: {current_user.is_admin if current_user.is_authenticated else 'None'}")
if not current_user.is_admin:
admin_logger.warning(f"Admin-Zugriff verweigert für User {current_user.id if current_user.is_authenticated else 'Anonymous'} auf Funktion {f.__name__}")
return jsonify({"error": "Nur Administratoren haben Zugriff"}), 403
return f(*args, **kwargs)
return decorated_function
@admin_blueprint.route("/")
@login_required
@admin_required
def admin_dashboard():
"""Admin-Dashboard-Hauptseite"""
try:
db_session = get_db_session()
# Grundlegende Statistiken sammeln
total_users = db_session.query(User).count()
total_printers = db_session.query(Printer).count()
total_jobs = db_session.query(Job).count()
# Aktive Jobs zählen
active_jobs = db_session.query(Job).filter(
Job.status.in_(['pending', 'printing', 'paused'])
).count()
db_session.close()
stats = {
'total_users': total_users,
'total_printers': total_printers,
'total_jobs': total_jobs,
'active_jobs': active_jobs
}
return render_template('admin/dashboard.html', stats=stats)
except Exception as e:
admin_logger.error(f"Fehler beim Laden des Admin-Dashboards: {str(e)}")
flash("Fehler beim Laden der Dashboard-Daten", "error")
return render_template('admin/dashboard.html', stats={})
@admin_blueprint.route("/users")
@login_required
@admin_required
def users_overview():
"""Benutzerübersicht für Administratoren"""
return render_template('admin/users.html')
@admin_blueprint.route("/users/add", methods=["GET"])
@login_required
@admin_required
def add_user_page():
"""Seite zum Hinzufügen eines neuen Benutzers"""
return render_template('admin/add_user.html')
@admin_blueprint.route("/users/<int:user_id>/edit", methods=["GET"])
@login_required
@admin_required
def edit_user_page(user_id):
"""Seite zum Bearbeiten eines Benutzers"""
try:
db_session = get_db_session()
user = db_session.query(User).filter(User.id == user_id).first()
if not user:
db_session.close()
flash("Benutzer nicht gefunden", "error")
return redirect(url_for('admin.users_overview'))
db_session.close()
return render_template('admin/edit_user.html', user=user)
except Exception as e:
admin_logger.error(f"Fehler beim Laden der Benutzer-Bearbeitung: {str(e)}")
flash("Fehler beim Laden der Benutzerdaten", "error")
return redirect(url_for('admin.users_overview'))
@admin_blueprint.route("/printers")
@login_required
@admin_required
def printers_overview():
"""Druckerübersicht für Administratoren"""
return render_template('admin/printers.html')
@admin_blueprint.route("/printers/add", methods=["GET"])
@login_required
@admin_required
def add_printer_page():
"""Seite zum Hinzufügen eines neuen Druckers"""
return render_template('admin/add_printer.html')
@admin_blueprint.route("/printers/<int:printer_id>/edit", methods=["GET"])
@login_required
@admin_required
def edit_printer_page(printer_id):
"""Seite zum Bearbeiten eines Druckers"""
try:
db_session = get_db_session()
printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
if not printer:
db_session.close()
flash("Drucker nicht gefunden", "error")
return redirect(url_for('admin.printers_overview'))
db_session.close()
return render_template('admin/edit_printer.html', printer=printer)
except Exception as e:
admin_logger.error(f"Fehler beim Laden der Drucker-Bearbeitung: {str(e)}")
flash("Fehler beim Laden der Druckerdaten", "error")
return redirect(url_for('admin.printers_overview'))
@admin_blueprint.route("/guest-requests")
@login_required
@admin_required
def guest_requests():
"""Gäste-Anfragen-Übersicht"""
return render_template('admin/guest_requests.html')
@admin_blueprint.route("/advanced-settings")
@login_required
@admin_required
def advanced_settings():
"""Erweiterte Systemeinstellungen"""
return render_template('admin/advanced_settings.html')
@admin_blueprint.route("/system-health")
@login_required
@admin_required
def system_health():
"""System-Gesundheitsstatus"""
return render_template('admin/system_health.html')
@admin_blueprint.route("/logs")
@login_required
@admin_required
def logs_overview():
"""System-Logs-Übersicht"""
return render_template('admin/logs.html')
@admin_blueprint.route("/maintenance")
@login_required
@admin_required
def maintenance():
"""Wartungsseite"""
return render_template('admin/maintenance.html')
# API-Endpunkte für Admin-Funktionen
@admin_blueprint.route("/api/users", methods=["POST"])
@login_required
@admin_required
def create_user_api():
"""API-Endpunkt zum Erstellen eines neuen Benutzers"""
try:
data = request.get_json()
# Validierung der erforderlichen Felder
required_fields = ['username', 'email', 'password', 'name']
for field in required_fields:
if field not in data or not data[field]:
return jsonify({"error": f"Feld '{field}' ist erforderlich"}), 400
db_session = get_db_session()
# Überprüfung auf bereits existierende Benutzer
existing_user = db_session.query(User).filter(
(User.username == data['username']) | (User.email == data['email'])
).first()
if existing_user:
db_session.close()
return jsonify({"error": "Benutzername oder E-Mail bereits vergeben"}), 400
# Neuen Benutzer erstellen
new_user = User(
username=data['username'],
email=data['email'],
name=data['name'],
role=data.get('role', 'user'),
department=data.get('department'),
position=data.get('position'),
phone=data.get('phone'),
bio=data.get('bio')
)
new_user.set_password(data['password'])
db_session.add(new_user)
db_session.commit()
admin_logger.info(f"Neuer Benutzer erstellt: {new_user.username} von Admin {current_user.username}")
db_session.close()
return jsonify({
"success": True,
"message": "Benutzer erfolgreich erstellt",
"user_id": new_user.id
})
except Exception as e:
admin_logger.error(f"Fehler beim Erstellen des Benutzers: {str(e)}")
return jsonify({"error": "Fehler beim Erstellen des Benutzers"}), 500
@admin_blueprint.route("/api/users/<int:user_id>", methods=["GET"])
@login_required
@admin_required
def get_user_api(user_id):
"""API-Endpunkt zum Abrufen von Benutzerdaten"""
try:
db_session = get_db_session()
user = db_session.query(User).filter(User.id == user_id).first()
if not user:
db_session.close()
return jsonify({"error": "Benutzer nicht gefunden"}), 404
user_data = {
"id": user.id,
"username": user.username,
"email": user.email,
"name": user.name,
"role": user.role,
"active": user.active,
"created_at": user.created_at.isoformat() if user.created_at else None,
"last_login": user.last_login.isoformat() if user.last_login else None,
"department": user.department,
"position": user.position,
"phone": user.phone,
"bio": user.bio
}
db_session.close()
return jsonify(user_data)
except Exception as e:
admin_logger.error(f"Fehler beim Abrufen der Benutzerdaten: {str(e)}")
return jsonify({"error": "Fehler beim Abrufen der Benutzerdaten"}), 500
@admin_blueprint.route("/api/users/<int:user_id>", methods=["PUT"])
@login_required
@admin_required
def update_user_api(user_id):
"""API-Endpunkt zum Aktualisieren von Benutzerdaten"""
try:
data = request.get_json()
db_session = get_db_session()
user = db_session.query(User).filter(User.id == user_id).first()
if not user:
db_session.close()
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Aktualisierbare Felder
updatable_fields = ['username', 'email', 'name', 'role', 'active', 'department', 'position', 'phone', 'bio']
for field in updatable_fields:
if field in data:
setattr(user, field, data[field])
# Passwort separat behandeln
if 'password' in data and data['password']:
user.set_password(data['password'])
user.updated_at = datetime.now()
db_session.commit()
admin_logger.info(f"Benutzer {user.username} aktualisiert von Admin {current_user.username}")
db_session.close()
return jsonify({
"success": True,
"message": "Benutzer erfolgreich aktualisiert"
})
except Exception as e:
admin_logger.error(f"Fehler beim Aktualisieren des Benutzers: {str(e)}")
return jsonify({"error": "Fehler beim Aktualisieren des Benutzers"}), 500
@admin_blueprint.route("/api/users/<int:user_id>", methods=["DELETE"])
@login_required
@admin_required
def delete_user_api(user_id):
"""API-Endpunkt zum Löschen eines Benutzers"""
try:
if user_id == current_user.id:
return jsonify({"error": "Sie können sich nicht selbst löschen"}), 400
db_session = get_db_session()
user = db_session.query(User).filter(User.id == user_id).first()
if not user:
db_session.close()
return jsonify({"error": "Benutzer nicht gefunden"}), 404
username = user.username
db_session.delete(user)
db_session.commit()
admin_logger.info(f"Benutzer {username} gelöscht von Admin {current_user.username}")
db_session.close()
return jsonify({
"success": True,
"message": "Benutzer erfolgreich gelöscht"
})
except Exception as e:
admin_logger.error(f"Fehler beim Löschen des Benutzers: {str(e)}")
return jsonify({"error": "Fehler beim Löschen des Benutzers"}), 500

336
backend/blueprints/auth.py Normal file
View File

@@ -0,0 +1,336 @@
"""
Authentifizierungs-Blueprint für das 3D-Druck-Management-System
Dieses Modul enthält alle Routen und Funktionen für die Benutzerauthentifizierung,
einschließlich Login, Logout, OAuth-Callbacks und Passwort-Reset.
"""
import logging
from datetime import datetime
from flask import Blueprint, render_template, request, jsonify, redirect, url_for, flash, session
from flask_login import login_user, logout_user, login_required, current_user
from werkzeug.security import check_password_hash
from models import User, get_db_session
from utils.logging_config import get_logger
# Blueprint erstellen
auth_blueprint = Blueprint('auth', __name__, url_prefix='/auth')
# Logger initialisieren
auth_logger = get_logger("auth")
@auth_blueprint.route("/login", methods=["GET", "POST"])
def login():
"""Benutzeranmeldung mit E-Mail/Benutzername und Passwort"""
if current_user.is_authenticated:
return redirect(url_for("index"))
error = None
if request.method == "POST":
# Debug-Logging für Request-Details
auth_logger.debug(f"Login-Request: Content-Type={request.content_type}, Headers={dict(request.headers)}")
# Erweiterte Content-Type-Erkennung für AJAX-Anfragen
content_type = request.content_type or ""
is_json_request = (
request.is_json or
"application/json" in content_type or
request.headers.get('X-Requested-With') == 'XMLHttpRequest' or
request.headers.get('Accept', '').startswith('application/json')
)
# Robuste Datenextraktion
username = None
password = None
remember_me = False
try:
if is_json_request:
# JSON-Request verarbeiten
try:
data = request.get_json(force=True) or {}
username = data.get("username") or data.get("email")
password = data.get("password")
remember_me = data.get("remember_me", False)
except Exception as json_error:
auth_logger.warning(f"JSON-Parsing fehlgeschlagen: {str(json_error)}")
# Fallback zu Form-Daten
username = request.form.get("email")
password = request.form.get("password")
remember_me = request.form.get("remember_me") == "on"
else:
# Form-Request verarbeiten
username = request.form.get("email")
password = request.form.get("password")
remember_me = request.form.get("remember_me") == "on"
# Zusätzlicher Fallback für verschiedene Feldnamen
if not username:
username = request.form.get("username") or request.values.get("email") or request.values.get("username")
if not password:
password = request.form.get("password") or request.values.get("password")
except Exception as extract_error:
auth_logger.error(f"Fehler beim Extrahieren der Login-Daten: {str(extract_error)}")
error = "Fehler beim Verarbeiten der Anmeldedaten."
if is_json_request:
return jsonify({"error": error, "success": False}), 400
if not username or not password:
error = "E-Mail-Adresse und Passwort müssen angegeben werden."
auth_logger.warning(f"Unvollständige Login-Daten: username={bool(username)}, password={bool(password)}")
if is_json_request:
return jsonify({"error": error, "success": False}), 400
else:
db_session = None
try:
db_session = get_db_session()
# Suche nach Benutzer mit übereinstimmendem Benutzernamen oder E-Mail
user = db_session.query(User).filter(
(User.username == username) | (User.email == username)
).first()
if user and user.check_password(password):
# Update last login timestamp
user.update_last_login()
db_session.commit()
login_user(user, remember=remember_me)
auth_logger.info(f"Benutzer {username} hat sich erfolgreich angemeldet")
next_page = request.args.get("next")
if is_json_request:
return jsonify({
"success": True,
"message": "Anmeldung erfolgreich",
"redirect_url": next_page or url_for("index")
})
else:
if next_page:
return redirect(next_page)
return redirect(url_for("index"))
else:
error = "Ungültige E-Mail-Adresse oder Passwort."
auth_logger.warning(f"Fehlgeschlagener Login-Versuch für Benutzer {username}")
if is_json_request:
return jsonify({"error": error, "success": False}), 401
except Exception as e:
# Fehlerbehandlung für Datenbankprobleme
error = "Anmeldefehler. Bitte versuchen Sie es später erneut."
auth_logger.error(f"Fehler bei der Anmeldung: {str(e)}")
if is_json_request:
return jsonify({"error": error, "success": False}), 500
finally:
# Sicherstellen, dass die Datenbankverbindung geschlossen wird
if db_session:
try:
db_session.close()
except Exception as close_error:
auth_logger.error(f"Fehler beim Schließen der DB-Session: {str(close_error)}")
return render_template("login.html", error=error)
@auth_blueprint.route("/logout", methods=["GET", "POST"])
@login_required
def logout():
"""Meldet den Benutzer ab"""
auth_logger.info(f"Benutzer {current_user.email} hat sich abgemeldet")
logout_user()
flash("Sie wurden erfolgreich abgemeldet.", "info")
return redirect(url_for("auth.login"))
@auth_blueprint.route("/reset-password-request", methods=["GET", "POST"])
def reset_password_request():
"""Passwort-Reset anfordern (Placeholder)"""
# TODO: Implement password reset functionality
flash("Passwort-Reset-Funktionalität ist noch nicht implementiert.", "info")
return redirect(url_for("auth.login"))
@auth_blueprint.route("/api/login", methods=["POST"])
def api_login():
"""API-Login-Endpunkt für Frontend"""
try:
data = request.get_json()
if not data:
return jsonify({"error": "Keine Daten erhalten"}), 400
username = data.get("username")
password = data.get("password")
remember_me = data.get("remember_me", False)
if not username or not password:
return jsonify({"error": "Benutzername und Passwort müssen angegeben werden"}), 400
db_session = get_db_session()
user = db_session.query(User).filter(
(User.username == username) | (User.email == username)
).first()
if user and user.check_password(password):
# Update last login timestamp
user.update_last_login()
db_session.commit()
login_user(user, remember=remember_me)
auth_logger.info(f"API-Login erfolgreich für Benutzer {username}")
user_data = {
"id": user.id,
"username": user.username,
"name": user.name,
"email": user.email,
"is_admin": user.is_admin
}
db_session.close()
return jsonify({
"success": True,
"user": user_data,
"redirect_url": url_for("index")
})
else:
auth_logger.warning(f"Fehlgeschlagener API-Login für Benutzer {username}")
db_session.close()
return jsonify({"error": "Ungültiger Benutzername oder Passwort"}), 401
except Exception as e:
auth_logger.error(f"Fehler beim API-Login: {str(e)}")
return jsonify({"error": "Anmeldefehler. Bitte versuchen Sie es später erneut"}), 500
@auth_blueprint.route("/api/callback", methods=["GET", "POST"])
def api_callback():
"""OAuth-Callback-Endpunkt für externe Authentifizierung"""
try:
# OAuth-Provider bestimmen
provider = request.args.get('provider', 'github')
if request.method == "GET":
# Authorization Code aus URL-Parameter extrahieren
code = request.args.get('code')
state = request.args.get('state')
error = request.args.get('error')
if error:
auth_logger.warning(f"OAuth-Fehler von {provider}: {error}")
return jsonify({
"error": f"OAuth-Authentifizierung fehlgeschlagen: {error}",
"redirect_url": url_for("auth.login")
}), 400
if not code:
auth_logger.warning(f"Kein Authorization Code von {provider} erhalten")
return jsonify({
"error": "Kein Authorization Code erhalten",
"redirect_url": url_for("auth.login")
}), 400
# State-Parameter validieren (CSRF-Schutz)
session_state = session.get('oauth_state')
if not state or state != session_state:
auth_logger.warning(f"Ungültiger State-Parameter von {provider}")
return jsonify({
"error": "Ungültiger State-Parameter",
"redirect_url": url_for("auth.login")
}), 400
# OAuth-Token austauschen
if provider == 'github':
user_data = handle_github_callback(code)
else:
auth_logger.error(f"Unbekannter OAuth-Provider: {provider}")
return jsonify({
"error": "Unbekannter OAuth-Provider",
"redirect_url": url_for("auth.login")
}), 400
if not user_data:
return jsonify({
"error": "Fehler beim Abrufen der Benutzerdaten",
"redirect_url": url_for("auth.login")
}), 400
# Benutzer in Datenbank suchen oder erstellen
db_session = get_db_session()
try:
user = db_session.query(User).filter(
User.email == user_data['email']
).first()
if not user:
# Neuen Benutzer erstellen
user = User(
username=user_data['username'],
email=user_data['email'],
name=user_data['name'],
role="user",
oauth_provider=provider,
oauth_id=str(user_data['id'])
)
# Zufälliges Passwort setzen (wird nicht verwendet)
import secrets
user.set_password(secrets.token_urlsafe(32))
db_session.add(user)
db_session.commit()
auth_logger.info(f"Neuer OAuth-Benutzer erstellt: {user.username} via {provider}")
else:
# Bestehenden Benutzer aktualisieren
user.oauth_provider = provider
user.oauth_id = str(user_data['id'])
user.name = user_data['name']
user.updated_at = datetime.now()
db_session.commit()
auth_logger.info(f"OAuth-Benutzer aktualisiert: {user.username} via {provider}")
# Update last login timestamp
user.update_last_login()
db_session.commit()
login_user(user, remember=True)
# Session-State löschen
session.pop('oauth_state', None)
response_data = {
"success": True,
"user": {
"id": user.id,
"username": user.username,
"name": user.name,
"email": user.email,
"is_admin": user.is_admin
},
"redirect_url": url_for("index")
}
db_session.close()
return jsonify(response_data)
except Exception as e:
db_session.rollback()
db_session.close()
auth_logger.error(f"Datenbankfehler bei OAuth-Callback: {str(e)}")
return jsonify({
"error": "Datenbankfehler bei der Benutzeranmeldung",
"redirect_url": url_for("auth.login")
}), 500
except Exception as e:
auth_logger.error(f"Fehler im OAuth-Callback: {str(e)}")
return jsonify({
"error": "OAuth-Callback-Fehler",
"redirect_url": url_for("auth.login")
}), 500
def handle_github_callback(code):
"""Verarbeite GitHub OAuth Callback"""
# TODO: Implementiere GitHub OAuth Handling
auth_logger.warning("GitHub OAuth Callback noch nicht implementiert")
return None
def get_github_user_data(access_token):
"""Lade Benutzerdaten von GitHub API"""
# TODO: Implementiere GitHub API Abfrage
auth_logger.warning("GitHub User Data Abfrage noch nicht implementiert")
return None

359
backend/blueprints/user.py Normal file
View File

@@ -0,0 +1,359 @@
"""
Benutzer-Blueprint für das 3D-Druck-Management-System
Dieses Modul enthält alle Benutzer-spezifischen Routen und Funktionen,
einschließlich Profilverwaltung, Einstellungen und Passwort-Änderung.
"""
import json
from datetime import datetime
from flask import Blueprint, render_template, request, jsonify, redirect, url_for, flash, make_response
from flask_login import login_required, current_user
from werkzeug.security import check_password_hash
from models import User, get_db_session
from utils.logging_config import get_logger
# Blueprint erstellen
user_blueprint = Blueprint('user', __name__, url_prefix='/user')
# Logger initialisieren
user_logger = get_logger("user")
@user_blueprint.route("/profile", methods=["GET"])
@login_required
def profile():
"""Benutzerprofil anzeigen"""
return render_template('user/profile.html', user=current_user)
@user_blueprint.route("/settings", methods=["GET"])
@login_required
def settings():
"""Benutzereinstellungen anzeigen"""
return render_template('user/settings.html', user=current_user)
@user_blueprint.route("/update-profile", methods=["POST"])
@login_required
def update_profile():
"""Benutzerprofil aktualisieren (Form-basiert)"""
try:
db_session = get_db_session()
user = db_session.query(User).filter(User.id == current_user.id).first()
if not user:
db_session.close()
flash("Benutzer nicht gefunden", "error")
return redirect(url_for('user.profile'))
# Aktualisierbare Felder aus dem Formular
user.name = request.form.get('name', user.name)
user.email = request.form.get('email', user.email)
user.department = request.form.get('department', user.department)
user.position = request.form.get('position', user.position)
user.phone = request.form.get('phone', user.phone)
user.bio = request.form.get('bio', user.bio)
user.updated_at = datetime.now()
db_session.commit()
user_logger.info(f"Profil aktualisiert für Benutzer {user.username}")
flash("Profil erfolgreich aktualisiert", "success")
db_session.close()
return redirect(url_for('user.profile'))
except Exception as e:
user_logger.error(f"Fehler beim Aktualisieren des Profils: {str(e)}")
flash("Fehler beim Aktualisieren des Profils", "error")
return redirect(url_for('user.profile'))
@user_blueprint.route("/api/update-settings", methods=["POST"])
@login_required
def api_update_settings():
"""API-Endpunkt für Einstellungen-Updates"""
try:
data = request.get_json()
db_session = get_db_session()
user = db_session.query(User).filter(User.id == current_user.id).first()
if not user:
db_session.close()
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Einstellungen JSON aktualisieren
current_settings = user.settings or {}
if isinstance(current_settings, str):
try:
current_settings = json.loads(current_settings)
except json.JSONDecodeError:
current_settings = {}
# Neue Einstellungen hinzufügen/aktualisieren
for key, value in data.items():
current_settings[key] = value
user.settings = json.dumps(current_settings)
user.updated_at = datetime.now()
db_session.commit()
user_logger.info(f"Einstellungen aktualisiert für Benutzer {user.username}")
db_session.close()
return jsonify({
"success": True,
"message": "Einstellungen erfolgreich aktualisiert"
})
except Exception as e:
user_logger.error(f"Fehler beim Aktualisieren der Einstellungen: {str(e)}")
return jsonify({"error": "Fehler beim Aktualisieren der Einstellungen"}), 500
@user_blueprint.route("/update-settings", methods=["POST"])
@login_required
def update_settings():
"""Benutzereinstellungen aktualisieren (Form-basiert)"""
try:
db_session = get_db_session()
user = db_session.query(User).filter(User.id == current_user.id).first()
if not user:
db_session.close()
flash("Benutzer nicht gefunden", "error")
return redirect(url_for('user.settings'))
# Einstellungen aus dem Formular sammeln
settings = {}
# Theme-Einstellungen
settings['theme'] = request.form.get('theme', 'light')
settings['language'] = request.form.get('language', 'de')
# Benachrichtigungseinstellungen
settings['email_notifications'] = request.form.get('email_notifications') == 'on'
settings['push_notifications'] = request.form.get('push_notifications') == 'on'
settings['job_completion_notifications'] = request.form.get('job_completion_notifications') == 'on'
settings['printer_error_notifications'] = request.form.get('printer_error_notifications') == 'on'
# Dashboard-Einstellungen
settings['default_dashboard_view'] = request.form.get('default_dashboard_view', 'overview')
settings['auto_refresh_interval'] = int(request.form.get('auto_refresh_interval', 30))
# Privacy-Einstellungen
settings['show_profile_publicly'] = request.form.get('show_profile_publicly') == 'on'
settings['allow_job_sharing'] = request.form.get('allow_job_sharing') == 'on'
user.settings = json.dumps(settings)
user.updated_at = datetime.now()
db_session.commit()
user_logger.info(f"Einstellungen aktualisiert für Benutzer {user.username}")
flash("Einstellungen erfolgreich aktualisiert", "success")
db_session.close()
return redirect(url_for('user.settings'))
except Exception as e:
user_logger.error(f"Fehler beim Aktualisieren der Einstellungen: {str(e)}")
flash("Fehler beim Aktualisieren der Einstellungen", "error")
return redirect(url_for('user.settings'))
@user_blueprint.route("/change-password", methods=["POST"])
@login_required
def change_password():
"""Passwort ändern"""
try:
# Daten aus Form oder JSON extrahieren
if request.is_json:
data = request.get_json()
current_password = data.get('current_password')
new_password = data.get('new_password')
confirm_password = data.get('confirm_password')
else:
current_password = request.form.get('current_password')
new_password = request.form.get('new_password')
confirm_password = request.form.get('confirm_password')
# Validierung
if not all([current_password, new_password, confirm_password]):
error_msg = "Alle Passwort-Felder sind erforderlich"
if request.is_json:
return jsonify({"error": error_msg}), 400
flash(error_msg, "error")
return redirect(url_for('user.settings'))
if new_password != confirm_password:
error_msg = "Neue Passwörter stimmen nicht überein"
if request.is_json:
return jsonify({"error": error_msg}), 400
flash(error_msg, "error")
return redirect(url_for('user.settings'))
if len(new_password) < 8:
error_msg = "Das neue Passwort muss mindestens 8 Zeichen lang sein"
if request.is_json:
return jsonify({"error": error_msg}), 400
flash(error_msg, "error")
return redirect(url_for('user.settings'))
db_session = get_db_session()
user = db_session.query(User).filter(User.id == current_user.id).first()
if not user:
db_session.close()
error_msg = "Benutzer nicht gefunden"
if request.is_json:
return jsonify({"error": error_msg}), 404
flash(error_msg, "error")
return redirect(url_for('user.settings'))
# Aktuelles Passwort überprüfen
if not user.check_password(current_password):
db_session.close()
error_msg = "Aktuelles Passwort ist falsch"
if request.is_json:
return jsonify({"error": error_msg}), 401
flash(error_msg, "error")
return redirect(url_for('user.settings'))
# Neues Passwort setzen
user.set_password(new_password)
user.updated_at = datetime.now()
db_session.commit()
user_logger.info(f"Passwort geändert für Benutzer {user.username}")
db_session.close()
success_msg = "Passwort erfolgreich geändert"
if request.is_json:
return jsonify({"success": True, "message": success_msg})
flash(success_msg, "success")
return redirect(url_for('user.settings'))
except Exception as e:
user_logger.error(f"Fehler beim Ändern des Passworts: {str(e)}")
error_msg = "Fehler beim Ändern des Passworts"
if request.is_json:
return jsonify({"error": error_msg}), 500
flash(error_msg, "error")
return redirect(url_for('user.settings'))
@user_blueprint.route("/export", methods=["GET"])
@login_required
def export_data():
"""Benutzerdaten exportieren (DSGVO-Compliance)"""
try:
db_session = get_db_session()
user = db_session.query(User).filter(User.id == current_user.id).first()
if not user:
db_session.close()
flash("Benutzer nicht gefunden", "error")
return redirect(url_for('user.settings'))
# Benutzerdaten sammeln
user_data = {
"personal_information": {
"id": user.id,
"username": user.username,
"email": user.email,
"name": user.name,
"department": user.department,
"position": user.position,
"phone": user.phone,
"bio": user.bio,
"role": user.role,
"created_at": user.created_at.isoformat() if user.created_at else None,
"last_login": user.last_login.isoformat() if user.last_login else None,
"updated_at": user.updated_at.isoformat() if user.updated_at else None,
"last_activity": user.last_activity.isoformat() if user.last_activity else None
},
"settings": json.loads(user.settings) if user.settings else {},
"jobs": [],
"export_date": datetime.now().isoformat(),
"export_note": "Dies ist ein Export Ihrer persönlichen Daten gemäß DSGVO Art. 20"
}
# Benutzer-Jobs sammeln (falls verfügbar)
try:
from models import Job
user_jobs = db_session.query(Job).filter(Job.user_id == user.id).all()
for job in user_jobs:
user_data["jobs"].append({
"id": job.id,
"filename": job.filename,
"status": job.status,
"created_at": job.created_at.isoformat() if job.created_at else None,
"estimated_duration": job.estimated_duration,
"material_used": job.material_used,
"notes": job.notes
})
except Exception as job_error:
user_logger.warning(f"Fehler beim Sammeln der Job-Daten: {str(job_error)}")
db_session.close()
# JSON-Response erstellen
response = make_response(jsonify(user_data))
response.headers['Content-Disposition'] = f'attachment; filename=user_data_{user.username}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json'
response.headers['Content-Type'] = 'application/json'
user_logger.info(f"Datenexport erstellt für Benutzer {user.username}")
return response
except Exception as e:
user_logger.error(f"Fehler beim Datenexport: {str(e)}")
flash("Fehler beim Erstellen des Datenexports", "error")
return redirect(url_for('user.settings'))
@user_blueprint.route("/profile", methods=["PUT"])
@login_required
def update_profile_api():
"""API-Endpunkt für Profil-Updates"""
try:
data = request.get_json()
db_session = get_db_session()
user = db_session.query(User).filter(User.id == current_user.id).first()
if not user:
db_session.close()
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Aktualisierbare Felder (ohne sensitive Daten)
updatable_fields = ['name', 'email', 'department', 'position', 'phone', 'bio']
for field in updatable_fields:
if field in data:
setattr(user, field, data[field])
user.updated_at = datetime.now()
db_session.commit()
user_logger.info(f"Profil über API aktualisiert für Benutzer {user.username}")
# Aktuelle Benutzerdaten zurückgeben
user_data = {
"id": user.id,
"username": user.username,
"email": user.email,
"name": user.name,
"department": user.department,
"position": user.position,
"phone": user.phone,
"bio": user.bio,
"role": user.role,
"updated_at": user.updated_at.isoformat()
}
db_session.close()
return jsonify({
"success": True,
"message": "Profil erfolgreich aktualisiert",
"user": user_data
})
except Exception as e:
user_logger.error(f"Fehler beim API-Profil-Update: {str(e)}")
return jsonify({"error": "Fehler beim Aktualisieren des Profils"}), 500