2025-06-04 10:03:22 +02:00

359 lines
14 KiB
Python

"""
Benutzer-Blueprint für das 3D-Druck-Management-System
Dieses Modul enthält alle Benutzer-spezifischen Routen und Funktionen,
einschließlich Profilverwaltung, Einstellungen und Passwort-Änderung.
"""
import json
from datetime import datetime
from flask import Blueprint, render_template, request, jsonify, redirect, url_for, flash, make_response
from flask_login import login_required, current_user
from werkzeug.security import check_password_hash
from models import User, get_db_session
from utils.logging_config import get_logger
# Blueprint erstellen
user_blueprint = Blueprint('user', __name__, url_prefix='/user')
# Logger initialisieren
user_logger = get_logger("user")
@user_blueprint.route("/profile", methods=["GET"])
@login_required
def profile():
"""Benutzerprofil anzeigen"""
return render_template('user/profile.html', user=current_user)
@user_blueprint.route("/settings", methods=["GET"])
@login_required
def settings():
"""Benutzereinstellungen anzeigen"""
return render_template('user/settings.html', user=current_user)
@user_blueprint.route("/update-profile", methods=["POST"])
@login_required
def update_profile():
"""Benutzerprofil aktualisieren (Form-basiert)"""
try:
db_session = get_db_session()
user = db_session.query(User).filter(User.id == current_user.id).first()
if not user:
db_session.close()
flash("Benutzer nicht gefunden", "error")
return redirect(url_for('user.profile'))
# Aktualisierbare Felder aus dem Formular
user.name = request.form.get('name', user.name)
user.email = request.form.get('email', user.email)
user.department = request.form.get('department', user.department)
user.position = request.form.get('position', user.position)
user.phone = request.form.get('phone', user.phone)
user.bio = request.form.get('bio', user.bio)
user.updated_at = datetime.now()
db_session.commit()
user_logger.info(f"Profil aktualisiert für Benutzer {user.username}")
flash("Profil erfolgreich aktualisiert", "success")
db_session.close()
return redirect(url_for('user.profile'))
except Exception as e:
user_logger.error(f"Fehler beim Aktualisieren des Profils: {str(e)}")
flash("Fehler beim Aktualisieren des Profils", "error")
return redirect(url_for('user.profile'))
@user_blueprint.route("/api/update-settings", methods=["POST"])
@login_required
def api_update_settings():
"""API-Endpunkt für Einstellungen-Updates"""
try:
data = request.get_json()
db_session = get_db_session()
user = db_session.query(User).filter(User.id == current_user.id).first()
if not user:
db_session.close()
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Einstellungen JSON aktualisieren
current_settings = user.settings or {}
if isinstance(current_settings, str):
try:
current_settings = json.loads(current_settings)
except json.JSONDecodeError:
current_settings = {}
# Neue Einstellungen hinzufügen/aktualisieren
for key, value in data.items():
current_settings[key] = value
user.settings = json.dumps(current_settings)
user.updated_at = datetime.now()
db_session.commit()
user_logger.info(f"Einstellungen aktualisiert für Benutzer {user.username}")
db_session.close()
return jsonify({
"success": True,
"message": "Einstellungen erfolgreich aktualisiert"
})
except Exception as e:
user_logger.error(f"Fehler beim Aktualisieren der Einstellungen: {str(e)}")
return jsonify({"error": "Fehler beim Aktualisieren der Einstellungen"}), 500
@user_blueprint.route("/update-settings", methods=["POST"])
@login_required
def update_settings():
"""Benutzereinstellungen aktualisieren (Form-basiert)"""
try:
db_session = get_db_session()
user = db_session.query(User).filter(User.id == current_user.id).first()
if not user:
db_session.close()
flash("Benutzer nicht gefunden", "error")
return redirect(url_for('user.settings'))
# Einstellungen aus dem Formular sammeln
settings = {}
# Theme-Einstellungen
settings['theme'] = request.form.get('theme', 'light')
settings['language'] = request.form.get('language', 'de')
# Benachrichtigungseinstellungen
settings['email_notifications'] = request.form.get('email_notifications') == 'on'
settings['push_notifications'] = request.form.get('push_notifications') == 'on'
settings['job_completion_notifications'] = request.form.get('job_completion_notifications') == 'on'
settings['printer_error_notifications'] = request.form.get('printer_error_notifications') == 'on'
# Dashboard-Einstellungen
settings['default_dashboard_view'] = request.form.get('default_dashboard_view', 'overview')
settings['auto_refresh_interval'] = int(request.form.get('auto_refresh_interval', 30))
# Privacy-Einstellungen
settings['show_profile_publicly'] = request.form.get('show_profile_publicly') == 'on'
settings['allow_job_sharing'] = request.form.get('allow_job_sharing') == 'on'
user.settings = json.dumps(settings)
user.updated_at = datetime.now()
db_session.commit()
user_logger.info(f"Einstellungen aktualisiert für Benutzer {user.username}")
flash("Einstellungen erfolgreich aktualisiert", "success")
db_session.close()
return redirect(url_for('user.settings'))
except Exception as e:
user_logger.error(f"Fehler beim Aktualisieren der Einstellungen: {str(e)}")
flash("Fehler beim Aktualisieren der Einstellungen", "error")
return redirect(url_for('user.settings'))
@user_blueprint.route("/change-password", methods=["POST"])
@login_required
def change_password():
"""Passwort ändern"""
try:
# Daten aus Form oder JSON extrahieren
if request.is_json:
data = request.get_json()
current_password = data.get('current_password')
new_password = data.get('new_password')
confirm_password = data.get('confirm_password')
else:
current_password = request.form.get('current_password')
new_password = request.form.get('new_password')
confirm_password = request.form.get('confirm_password')
# Validierung
if not all([current_password, new_password, confirm_password]):
error_msg = "Alle Passwort-Felder sind erforderlich"
if request.is_json:
return jsonify({"error": error_msg}), 400
flash(error_msg, "error")
return redirect(url_for('user.settings'))
if new_password != confirm_password:
error_msg = "Neue Passwörter stimmen nicht überein"
if request.is_json:
return jsonify({"error": error_msg}), 400
flash(error_msg, "error")
return redirect(url_for('user.settings'))
if len(new_password) < 8:
error_msg = "Das neue Passwort muss mindestens 8 Zeichen lang sein"
if request.is_json:
return jsonify({"error": error_msg}), 400
flash(error_msg, "error")
return redirect(url_for('user.settings'))
db_session = get_db_session()
user = db_session.query(User).filter(User.id == current_user.id).first()
if not user:
db_session.close()
error_msg = "Benutzer nicht gefunden"
if request.is_json:
return jsonify({"error": error_msg}), 404
flash(error_msg, "error")
return redirect(url_for('user.settings'))
# Aktuelles Passwort überprüfen
if not user.check_password(current_password):
db_session.close()
error_msg = "Aktuelles Passwort ist falsch"
if request.is_json:
return jsonify({"error": error_msg}), 401
flash(error_msg, "error")
return redirect(url_for('user.settings'))
# Neues Passwort setzen
user.set_password(new_password)
user.updated_at = datetime.now()
db_session.commit()
user_logger.info(f"Passwort geändert für Benutzer {user.username}")
db_session.close()
success_msg = "Passwort erfolgreich geändert"
if request.is_json:
return jsonify({"success": True, "message": success_msg})
flash(success_msg, "success")
return redirect(url_for('user.settings'))
except Exception as e:
user_logger.error(f"Fehler beim Ändern des Passworts: {str(e)}")
error_msg = "Fehler beim Ändern des Passworts"
if request.is_json:
return jsonify({"error": error_msg}), 500
flash(error_msg, "error")
return redirect(url_for('user.settings'))
@user_blueprint.route("/export", methods=["GET"])
@login_required
def export_data():
"""Benutzerdaten exportieren (DSGVO-Compliance)"""
try:
db_session = get_db_session()
user = db_session.query(User).filter(User.id == current_user.id).first()
if not user:
db_session.close()
flash("Benutzer nicht gefunden", "error")
return redirect(url_for('user.settings'))
# Benutzerdaten sammeln
user_data = {
"personal_information": {
"id": user.id,
"username": user.username,
"email": user.email,
"name": user.name,
"department": user.department,
"position": user.position,
"phone": user.phone,
"bio": user.bio,
"role": user.role,
"created_at": user.created_at.isoformat() if user.created_at else None,
"last_login": user.last_login.isoformat() if user.last_login else None,
"updated_at": user.updated_at.isoformat() if user.updated_at else None,
"last_activity": user.last_activity.isoformat() if user.last_activity else None
},
"settings": json.loads(user.settings) if user.settings else {},
"jobs": [],
"export_date": datetime.now().isoformat(),
"export_note": "Dies ist ein Export Ihrer persönlichen Daten gemäß DSGVO Art. 20"
}
# Benutzer-Jobs sammeln (falls verfügbar)
try:
from models import Job
user_jobs = db_session.query(Job).filter(Job.user_id == user.id).all()
for job in user_jobs:
user_data["jobs"].append({
"id": job.id,
"filename": job.filename,
"status": job.status,
"created_at": job.created_at.isoformat() if job.created_at else None,
"estimated_duration": job.estimated_duration,
"material_used": job.material_used,
"notes": job.notes
})
except Exception as job_error:
user_logger.warning(f"Fehler beim Sammeln der Job-Daten: {str(job_error)}")
db_session.close()
# JSON-Response erstellen
response = make_response(jsonify(user_data))
response.headers['Content-Disposition'] = f'attachment; filename=user_data_{user.username}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json'
response.headers['Content-Type'] = 'application/json'
user_logger.info(f"Datenexport erstellt für Benutzer {user.username}")
return response
except Exception as e:
user_logger.error(f"Fehler beim Datenexport: {str(e)}")
flash("Fehler beim Erstellen des Datenexports", "error")
return redirect(url_for('user.settings'))
@user_blueprint.route("/profile", methods=["PUT"])
@login_required
def update_profile_api():
"""API-Endpunkt für Profil-Updates"""
try:
data = request.get_json()
db_session = get_db_session()
user = db_session.query(User).filter(User.id == current_user.id).first()
if not user:
db_session.close()
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Aktualisierbare Felder (ohne sensitive Daten)
updatable_fields = ['name', 'email', 'department', 'position', 'phone', 'bio']
for field in updatable_fields:
if field in data:
setattr(user, field, data[field])
user.updated_at = datetime.now()
db_session.commit()
user_logger.info(f"Profil über API aktualisiert für Benutzer {user.username}")
# Aktuelle Benutzerdaten zurückgeben
user_data = {
"id": user.id,
"username": user.username,
"email": user.email,
"name": user.name,
"department": user.department,
"position": user.position,
"phone": user.phone,
"bio": user.bio,
"role": user.role,
"updated_at": user.updated_at.isoformat()
}
db_session.close()
return jsonify({
"success": True,
"message": "Profil erfolgreich aktualisiert",
"user": user_data
})
except Exception as e:
user_logger.error(f"Fehler beim API-Profil-Update: {str(e)}")
return jsonify({"error": "Fehler beim Aktualisieren des Profils"}), 500