"""
Unified user-management blueprint for the MYP system.

Consolidated implementation of all user-related functionality:
- User self-service (originally user.py)
- Administrative user management (originally users.py)
- Unified API interfaces

Functional areas:
- /user/*         - self-service for logged-in users
- /admin/users/*  - administrative user management
- /api/users/*    - unified API layer

Optimizations:
- Uniform database session handling
- Consistent error handling and logging
- Full API compatibility with both original blueprints

Author: MYP Team - consolidated for the IHK project work
Date: 2025-06-09
"""

import json
from datetime import datetime

from flask import Blueprint, render_template, request, jsonify, redirect, url_for, flash, make_response, abort
from flask_login import login_required, current_user
from werkzeug.security import check_password_hash
from sqlalchemy.exc import SQLAlchemyError
from functools import wraps

from models import User, UserPermission, get_cached_session
from utils.logging_config import get_logger

# ===== BLUEPRINT CONFIGURATION =====

# Main blueprint for user management
users_blueprint = Blueprint('users', __name__)

# Loggers for the two functional areas (self-service / administration)
user_logger = get_logger("user")
users_logger = get_logger("users")

# Profile attributes a user may change through the JSON profile API.
_PROFILE_FIELDS = ('name', 'email', 'department', 'position', 'phone', 'bio')

# Preference attributes a user may change through the JSON settings APIs.
_SETTINGS_FIELDS = (
    'theme_preference',
    'language_preference',
    'email_notifications',
    'browser_notifications',
    'dashboard_layout',
    'compact_mode',
    'show_completed_jobs',
    'auto_refresh_interval',
)


# ===== INTERNAL HELPERS =====

def _json_object_or_none():
    """Return the request body as a dict, or None if it is not a JSON object.

    ``silent=True`` makes Flask return None for a missing or malformed JSON
    body instead of raising, so callers can answer with a 400 rather than
    letting the error fall into their generic 500 handler.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else None


def _invalid_json_response():
    """Build the 400 response used when a JSON object body was expected."""
    return jsonify({"error": "Ungültige JSON-Daten"}), 400


def _apply_fields(target, data, fields):
    """Copy every key of *fields* that is present in *data* onto *target*."""
    for field in fields:
        if field in data:
            setattr(target, field, data[field])


def _password_change_reply(message, *, ok, status=400):
    """Answer a password-change request as JSON or as flash + redirect.

    JSON clients receive ``{"error": message}`` with *status* on failure and
    ``{"success": True, "message": message}`` on success; form clients get a
    flashed message and a redirect back to the settings page.
    """
    if request.is_json:
        if ok:
            return jsonify({"success": True, "message": message})
        return jsonify({"error": message}), status
    flash(message, "success" if ok else "error")
    return redirect(url_for('users.user_settings'))


# ===== DECORATORS =====

def users_admin_required(f):
    """Restrict a view to authenticated administrators.

    Aborts with 401 for unauthenticated users and with 403 for users that are
    neither flagged via ``is_admin`` nor have ``role == 'admin'``. Successful
    access is logged together with the wrapped function's name.
    """
    @wraps(f)
    @login_required
    def decorated_function(*args, **kwargs):
        # Basic authentication check
        if not current_user.is_authenticated:
            users_logger.warning("Unauthenticated access attempt to user management")
            abort(401)

        # Accept either way the user object may express admin status
        is_admin = False
        if hasattr(current_user, 'is_admin') and current_user.is_admin:
            is_admin = True
        elif hasattr(current_user, 'role') and current_user.role == 'admin':
            is_admin = True

        if not is_admin:
            users_logger.warning(f"Non-admin user {current_user.id} attempted to access user management")
            abort(403)

        users_logger.info(f"Admin access granted to {current_user.username} for function {f.__name__}")
        return f(*args, **kwargs)

    return decorated_function


# ===== USER SELF-SERVICE (originally user.py) =====

@users_blueprint.route('/user/profile', methods=['GET'])
@login_required
def user_profile():
    """Render the profile page of the logged-in user."""
    try:
        user_logger.info(f"User {current_user.username} accessed profile page")
        return render_template('profile.html', user=current_user)
    except Exception as e:
        user_logger.error(f"Error loading profile page: {str(e)}")
        flash("Fehler beim Laden des Profils", "error")
        return redirect(url_for('dashboard'))


@users_blueprint.route('/user/settings', methods=['GET'])
@login_required
def user_settings():
    """Render the settings page of the logged-in user."""
    try:
        user_logger.info(f"User {current_user.username} accessed settings page")
        return render_template('settings.html', user=current_user)
    except Exception as e:
        user_logger.error(f"Error loading settings page: {str(e)}")
        flash("Fehler beim Laden der Einstellungen", "error")
        return redirect(url_for('dashboard'))


@users_blueprint.route('/user/update-profile', methods=['POST'])
@login_required
def update_profile_form():
    """Update the logged-in user's profile from submitted form data.

    Fields missing from the form keep their current value. Always answers
    with a redirect to the profile page plus a flashed status message.
    """
    try:
        with get_cached_session() as session:
            user = session.query(User).filter(User.id == current_user.id).first()
            if not user:
                flash("Benutzer nicht gefunden", "error")
                return redirect(url_for('users.user_profile'))

            # Extract form data, falling back to the stored values
            user.name = request.form.get('name', user.name)
            user.email = request.form.get('email', user.email)
            user.department = request.form.get('department', user.department)
            user.position = request.form.get('position', user.position)
            user.phone = request.form.get('phone', user.phone)
            user.bio = request.form.get('bio', user.bio)
            user.updated_at = datetime.now()

            session.commit()

            user_logger.info(f"User {user.username} updated profile via form")
            flash("Profil erfolgreich aktualisiert", "success")
            return redirect(url_for('users.user_profile'))
    except Exception as e:
        user_logger.error(f"Error updating profile via form: {str(e)}")
        flash("Fehler beim Aktualisieren des Profils", "error")
        return redirect(url_for('users.user_profile'))


@users_blueprint.route('/user/profile', methods=['PUT'])
@login_required
def update_profile_api():
    """Update the logged-in user's profile from a JSON object body.

    Returns 400 if the body is not a JSON object, 404 if the user row is
    missing, 500 on unexpected errors and a success payload otherwise.
    """
    try:
        data = _json_object_or_none()
        if data is None:
            return _invalid_json_response()

        with get_cached_session() as session:
            user = session.query(User).filter(User.id == current_user.id).first()
            if not user:
                return jsonify({"error": "Benutzer nicht gefunden"}), 404

            # Only whitelisted profile attributes are taken from the payload
            _apply_fields(user, data, _PROFILE_FIELDS)
            user.updated_at = datetime.now()

            session.commit()

            user_logger.info(f"User {user.username} updated profile via API")
            return jsonify({
                "success": True,
                "message": "Profil erfolgreich aktualisiert"
            })
    except Exception as e:
        user_logger.error(f"Error updating profile via API: {str(e)}")
        return jsonify({"error": "Fehler beim Aktualisieren des Profils"}), 500


@users_blueprint.route('/api/user/settings', methods=['GET', 'POST'])
@login_required
def user_settings_api():
    """Read (GET) or update (POST) the logged-in user's settings via JSON.

    Any method other than POST is answered like GET, so the HEAD requests
    Flask routes here automatically also receive a valid response.
    """
    try:
        with get_cached_session() as session:
            user = session.query(User).filter(User.id == current_user.id).first()
            if not user:
                return jsonify({"error": "Benutzer nicht gefunden"}), 404

            if request.method == 'POST':
                # Update settings
                data = _json_object_or_none()
                if data is None:
                    return _invalid_json_response()

                _apply_fields(user, data, _SETTINGS_FIELDS)

                # Privacy settings live in a nested object
                if 'privacy' in data and isinstance(data['privacy'], dict):
                    if 'auto_logout' in data['privacy']:
                        user.auto_logout_timeout = data['privacy']['auto_logout']

                user.updated_at = datetime.now()
                session.commit()

                user_logger.info(f"User {user.username} updated settings via API")
                return jsonify({
                    "success": True,
                    "message": "Einstellungen erfolgreich aktualisiert"
                })

            # Read settings; getattr defaults cover attributes the model lacks
            settings = {
                'theme_preference': getattr(user, 'theme_preference', 'auto'),
                'language_preference': getattr(user, 'language_preference', 'de'),
                'email_notifications': getattr(user, 'email_notifications', True),
                'browser_notifications': getattr(user, 'browser_notifications', True),
                'dashboard_layout': getattr(user, 'dashboard_layout', 'default'),
                'compact_mode': getattr(user, 'compact_mode', False),
                'show_completed_jobs': getattr(user, 'show_completed_jobs', True),
                'auto_refresh_interval': getattr(user, 'auto_refresh_interval', 30),
                'privacy': {
                    'auto_logout': getattr(user, 'auto_logout_timeout', 0)
                }
            }

            user_logger.info(f"User {user.username} retrieved settings via API")
            return jsonify({
                "success": True,
                "settings": settings
            })
    except Exception as e:
        user_logger.error(f"Error handling user settings API: {str(e)}")
        return jsonify({"error": "Fehler beim Verarbeiten der Einstellungen"}), 500


@users_blueprint.route('/user/api/update-settings', methods=['POST'])
@login_required
def update_settings_api():
    """Update the logged-in user's settings via JSON (legacy endpoint).

    Unlike ``/api/user/settings`` this endpoint does not process the nested
    ``privacy`` object. Returns 400 if the body is not a JSON object.
    """
    try:
        data = _json_object_or_none()
        if data is None:
            return _invalid_json_response()

        with get_cached_session() as session:
            user = session.query(User).filter(User.id == current_user.id).first()
            if not user:
                return jsonify({"error": "Benutzer nicht gefunden"}), 404

            # Update settings
            _apply_fields(user, data, _SETTINGS_FIELDS)
            user.updated_at = datetime.now()

            session.commit()

            user_logger.info(f"User {user.username} updated settings via API")
            return jsonify({
                "success": True,
                "message": "Einstellungen erfolgreich aktualisiert"
            })
    except Exception as e:
        user_logger.error(f"Error updating settings via API: {str(e)}")
        return jsonify({"error": "Fehler beim Aktualisieren der Einstellungen"}), 500


@users_blueprint.route('/user/update-settings', methods=['POST'])
@login_required
def update_settings_form():
    """Update the logged-in user's settings from submitted form data.

    Boolean settings are derived from the presence of their form key, as
    done here for checkbox-style fields. Always answers with a redirect to
    the settings page plus a flashed status message.
    """
    try:
        with get_cached_session() as session:
            user = session.query(User).filter(User.id == current_user.id).first()
            if not user:
                flash("Benutzer nicht gefunden", "error")
                return redirect(url_for('users.user_settings'))

            # Process the form settings
            user.theme_preference = request.form.get('theme_preference', user.theme_preference)
            user.language_preference = request.form.get('language_preference', user.language_preference)
            user.email_notifications = 'email_notifications' in request.form
            user.browser_notifications = 'browser_notifications' in request.form
            user.dashboard_layout = request.form.get('dashboard_layout', user.dashboard_layout)
            user.compact_mode = 'compact_mode' in request.form
            user.show_completed_jobs = 'show_completed_jobs' in request.form

            # A non-numeric value raises ValueError and is reported through
            # the generic error path below.
            auto_refresh = request.form.get('auto_refresh_interval')
            if auto_refresh:
                user.auto_refresh_interval = int(auto_refresh)

            user.updated_at = datetime.now()
            session.commit()

            user_logger.info(f"User {user.username} updated settings via form")
            flash("Einstellungen erfolgreich aktualisiert", "success")
            return redirect(url_for('users.user_settings'))
    except Exception as e:
        user_logger.error(f"Error updating settings via form: {str(e)}")
        flash("Fehler beim Aktualisieren der Einstellungen", "error")
        return redirect(url_for('users.user_settings'))


@users_blueprint.route('/user/change-password', methods=['POST'])
@login_required
def change_password():
    """Change the logged-in user's password (form or JSON request).

    Validates that all three fields are present, that the new password and
    its confirmation match, that the new password has at least 8 characters
    and that the current password is correct. JSON clients get a JSON reply
    (400 on validation failure, 500 on unexpected errors); form clients get
    a flashed message and a redirect to the settings page.
    """
    try:
        # Read the fields from JSON or form data; a missing or non-object
        # JSON body is treated as "no fields supplied".
        if request.is_json:
            fields = _json_object_or_none() or {}
        else:
            fields = request.form
        current_password = fields.get('current_password')
        new_password = fields.get('new_password')
        confirm_password = fields.get('confirm_password')

        # Validation
        if not all([current_password, new_password, confirm_password]):
            return _password_change_reply("Alle Passwort-Felder sind erforderlich", ok=False)

        if new_password != confirm_password:
            return _password_change_reply("Neue Passwörter stimmen nicht überein", ok=False)

        if len(new_password) < 8:
            return _password_change_reply("Neues Passwort muss mindestens 8 Zeichen lang sein", ok=False)

        with get_cached_session() as session:
            user = session.query(User).filter(User.id == current_user.id).first()

            # Verify the current password
            if not user or not check_password_hash(user.password_hash, current_password):
                return _password_change_reply("Aktuelles Passwort ist falsch", ok=False)

            # Set the new password
            user.set_password(new_password)
            user.updated_at = datetime.now()
            session.commit()

            user_logger.info(f"User {user.username} changed password successfully")
            return _password_change_reply("Passwort erfolgreich geändert", ok=True)
    except Exception as e:
        user_logger.error(f"Error changing password: {str(e)}")
        return _password_change_reply("Fehler beim Ändern des Passworts", ok=False, status=500)


@users_blueprint.route('/user/export', methods=['GET'])
@login_required
def export_user_data():
    """Export the logged-in user's data as a downloadable JSON file (GDPR).

    The export contains personal data, preferences, summary information and,
    if the user object has a ``jobs`` attribute, the job history.
    """
    try:
        with get_cached_session() as session:
            user = session.query(User).filter(User.id == current_user.id).first()
            if not user:
                return jsonify({"error": "Benutzer nicht gefunden"}), 404

            has_jobs = hasattr(user, 'jobs')

            # Collect the complete user data
            user_data = {
                "export_info": {
                    "generated_at": datetime.now().isoformat(),
                    "user_id": user.id,
                    "export_type": "DSGVO_complete_data_export"
                },
                "personal_data": {
                    "username": user.username,
                    "email": user.email,
                    "name": user.name,
                    "department": user.department,
                    "position": user.position,
                    "phone": user.phone,
                    "bio": user.bio,
                    "role": user.role,
                    "active": user.active,
                    "created_at": user.created_at.isoformat() if user.created_at else None,
                    "updated_at": user.updated_at.isoformat() if user.updated_at else None,
                    "last_login": user.last_login.isoformat() if user.last_login else None
                },
                "preferences": {
                    "theme_preference": user.theme_preference,
                    "language_preference": user.language_preference,
                    "email_notifications": user.email_notifications,
                    "browser_notifications": user.browser_notifications,
                    "dashboard_layout": user.dashboard_layout,
                    "compact_mode": user.compact_mode,
                    "show_completed_jobs": user.show_completed_jobs,
                    "auto_refresh_interval": user.auto_refresh_interval
                },
                "system_info": {
                    "total_jobs_created": len(user.jobs) if has_jobs else 0,
                    "account_status": "active" if user.active else "inactive"
                }
            }

            # Add job data (if available)
            if has_jobs:
                user_data["job_history"] = [
                    {
                        "id": job.id,
                        "title": job.title,
                        "status": job.status,
                        "created_at": job.created_at.isoformat() if job.created_at else None,
                        "updated_at": job.updated_at.isoformat() if job.updated_at else None
                    }
                    for job in user.jobs
                ]

            # Build the JSON response with download headers
            response = make_response(jsonify(user_data))
            response.headers['Content-Type'] = 'application/json; charset=utf-8'
            response.headers['Content-Disposition'] = f'attachment; filename=user_data_export_{user.username}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json'

            user_logger.info(f"User {user.username} exported personal data (DSGVO)")
            return response
    except Exception as e:
        user_logger.error(f"Error exporting user data: {str(e)}")
        return jsonify({"error": "Fehler beim Exportieren der Benutzerdaten"}), 500


# ===== ADMINISTRATIVE USER MANAGEMENT (originally users.py) =====

@users_blueprint.route('/admin/users/<int:user_id>/permissions', methods=['GET'])
@users_admin_required
def user_permissions_page(user_id):
    """Render the admin page for one user's permissions.

    A missing ``UserPermission`` row is created and committed on the fly.
    """
    try:
        with get_cached_session() as session:
            user = session.query(User).filter(User.id == user_id).first()
            if not user:
                flash("Benutzer nicht gefunden", "error")
                return redirect(url_for('admin.users_overview'))

            # Load the permissions or create them
            permissions = session.query(UserPermission).filter(UserPermission.user_id == user_id).first()
            if not permissions:
                permissions = UserPermission(user_id=user_id)
                session.add(permissions)
                session.commit()

            users_logger.info(f"Admin {current_user.username} accessed permissions for user {user.username}")
            return render_template('admin/user_permissions.html', user=user, permissions=permissions)
    except Exception as e:
        users_logger.error(f"Error loading user permissions page: {str(e)}")
        flash("Fehler beim Laden der Benutzerberechtigungen", "error")
        return redirect(url_for('admin.users_overview'))


@users_blueprint.route('/api/users/<int:user_id>/permissions', methods=['GET'])
@users_admin_required
def get_user_permissions_api(user_id):
    """Return one user's permissions as JSON.

    A missing ``UserPermission`` row is created and committed on the fly.
    Returns 404 if the user does not exist.
    """
    try:
        with get_cached_session() as session:
            user = session.query(User).filter(User.id == user_id).first()
            if not user:
                return jsonify({"error": "Benutzer nicht gefunden"}), 404

            permissions = session.query(UserPermission).filter(UserPermission.user_id == user_id).first()
            if not permissions:
                permissions = UserPermission(user_id=user_id)
                session.add(permissions)
                session.commit()

            permissions_data = {
                "user_id": user_id,
                "username": user.username,
                "can_start_jobs": permissions.can_start_jobs,
                "needs_approval": permissions.needs_approval,
                "can_approve_jobs": permissions.can_approve_jobs
            }
            return jsonify(permissions_data)
    except Exception as e:
        users_logger.error(f"Error getting user permissions via API: {str(e)}")
        return jsonify({"error": "Fehler beim Abrufen der Benutzerberechtigungen"}), 500


@users_blueprint.route('/api/users/<int:user_id>/permissions', methods=['PUT'])
@users_admin_required
def update_user_permissions_api(user_id):
    """Update one user's permissions from a JSON object body.

    Only the keys present in the payload are changed. Returns 400 if the
    body is not a JSON object and 404 if the user does not exist.
    """
    try:
        data = _json_object_or_none()
        if data is None:
            return _invalid_json_response()

        with get_cached_session() as session:
            user = session.query(User).filter(User.id == user_id).first()
            if not user:
                return jsonify({"error": "Benutzer nicht gefunden"}), 404

            permissions = session.query(UserPermission).filter(UserPermission.user_id == user_id).first()
            if not permissions:
                permissions = UserPermission(user_id=user_id)
                session.add(permissions)

            # Update the permissions
            _apply_fields(permissions, data, ('can_start_jobs', 'needs_approval', 'can_approve_jobs'))

            session.commit()

            users_logger.info(f"Admin {current_user.username} updated permissions for user {user.username}")
            return jsonify({
                "success": True,
                "message": "Berechtigungen erfolgreich aktualisiert"
            })
    except Exception as e:
        users_logger.error(f"Error updating user permissions via API: {str(e)}")
        return jsonify({"error": "Fehler beim Aktualisieren der Berechtigungen"}), 500


@users_blueprint.route('/admin/users/<int:user_id>/permissions/update', methods=['POST'])
@users_admin_required
def update_user_permissions_form(user_id):
    """Update one user's permissions from submitted form data.

    Each permission is set to whether its key is present in the form.
    Answers with a redirect plus a flashed status message.
    """
    try:
        with get_cached_session() as session:
            user = session.query(User).filter(User.id == user_id).first()
            if not user:
                flash("Benutzer nicht gefunden", "error")
                return redirect(url_for('admin.users_overview'))

            permissions = session.query(UserPermission).filter(UserPermission.user_id == user_id).first()
            if not permissions:
                permissions = UserPermission(user_id=user_id)
                session.add(permissions)

            # Process the form data
            permissions.can_start_jobs = 'can_start_jobs' in request.form
            permissions.needs_approval = 'needs_approval' in request.form
            permissions.can_approve_jobs = 'can_approve_jobs' in request.form

            session.commit()

            users_logger.info(f"Admin {current_user.username} updated permissions for user {user.username} via form")
            flash("Berechtigungen erfolgreich aktualisiert", "success")
            return redirect(url_for('users.user_permissions_page', user_id=user_id))
    except Exception as e:
        users_logger.error(f"Error updating user permissions via form: {str(e)}")
        flash("Fehler beim Aktualisieren der Berechtigungen", "error")
        return redirect(url_for('users.user_permissions_page', user_id=user_id))


@users_blueprint.route('/admin/users/<int:user_id>/edit/permissions', methods=['GET'])
@users_admin_required
def edit_user_permissions_section(user_id):
    """Render the permissions edit section for one user.

    If no ``UserPermission`` row exists, an unsaved instance is passed to the
    template; nothing is added to the session here.
    """
    try:
        with get_cached_session() as session:
            user = session.query(User).filter(User.id == user_id).first()
            if not user:
                return jsonify({"error": "Benutzer nicht gefunden"}), 404

            permissions = session.query(UserPermission).filter(UserPermission.user_id == user_id).first()
            if not permissions:
                permissions = UserPermission(user_id=user_id)

            return render_template('admin/edit_user_permissions_section.html', user=user, permissions=permissions)
    except Exception as e:
        users_logger.error(f"Error loading user permissions edit section: {str(e)}")
        return jsonify({"error": "Fehler beim Laden der Berechtigungsbearbeitung"}), 500


@users_blueprint.route('/api/users/<int:user_id>', methods=['GET'])
@users_admin_required
def get_user_details_api(user_id):
    """Return detailed data for one user as JSON (404 if not found)."""
    try:
        with get_cached_session() as session:
            user = session.query(User).filter(User.id == user_id).first()
            if not user:
                return jsonify({"error": "Benutzer nicht gefunden"}), 404

            user_data = {
                "id": user.id,
                "username": user.username,
                "email": user.email,
                "name": user.name,
                "role": user.role,
                "active": user.active,
                "created_at": user.created_at.isoformat() if user.created_at else None,
                "last_login": user.last_login.isoformat() if user.last_login else None,
                "department": user.department,
                "position": user.position,
                "phone": user.phone,
                "bio": user.bio,
                "theme_preference": user.theme_preference,
                "language_preference": user.language_preference,
                "email_notifications": user.email_notifications,
                "browser_notifications": user.browser_notifications
            }
            return jsonify(user_data)
    except Exception as e:
        users_logger.error(f"Error getting user details via API: {str(e)}")
        return jsonify({"error": "Fehler beim Abrufen der Benutzerdaten"}), 500