🎉 Refactored backend structure: Removed unused files including app_cleaned.py, admin_api.py, admin.py, user.py, and others. Updated settings.local.json to include additional Bash commands. Enhanced admin templates for better navigation and functionality. Improved logging and error handling across various modules.

This commit is contained in:
2025-06-09 19:33:06 +02:00
parent 876b5a64e4
commit c7f9738bbe
115 changed files with 23507 additions and 9958 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,225 @@
"""
Einfache API-Endpunkte für Tapo-Steckdosen-Steuerung
Minimale REST-API für externe Zugriffe
"""
from flask import Blueprint, jsonify, request
from flask_login import login_required, current_user
import ipaddress
import time
from utils.tapo_controller import tapo_controller
from utils.logging_config import get_logger
from utils.permissions import require_permission, Permission
from models import get_db_session, Printer
# Blueprint initialisieren
api_blueprint = Blueprint('api', __name__, url_prefix='/api/v1')
# Logger konfigurieren
api_logger = get_logger("api_simple")
@api_blueprint.route("/tapo/outlets", methods=["GET"])
@login_required
@require_permission(Permission.CONTROL_PRINTER)
def list_outlets():
"""Listet alle verfügbaren Tapo-Steckdosen auf."""
try:
db_session = get_db_session()
printers_with_tapo = db_session.query(Printer).filter(
Printer.active == True,
Printer.plug_ip.isnot(None)
).all()
outlets = []
for printer in printers_with_tapo:
outlets.append({
'id': printer.id,
'name': printer.name,
'ip': printer.plug_ip,
'location': printer.location or "Unbekannt",
'model': printer.model or "P110"
})
db_session.close()
return jsonify({
'success': True,
'outlets': outlets,
'count': len(outlets),
'timestamp': time.time()
})
except Exception as e:
api_logger.error(f"Fehler beim Auflisten der Outlets: {e}")
return jsonify({
'success': False,
'error': str(e)
}), 500
@api_blueprint.route("/tapo/outlet/<ip>/status", methods=["GET"])
@login_required
@require_permission(Permission.CONTROL_PRINTER)
def get_outlet_status_api(ip):
"""Holt den Status einer spezifischen Steckdose."""
try:
# IP validieren
try:
ipaddress.ip_address(ip)
except ValueError:
return jsonify({
'success': False,
'error': 'Ungültige IP-Adresse'
}), 400
# Status prüfen
reachable, status = tapo_controller.check_outlet_status(ip)
return jsonify({
'success': True,
'ip': ip,
'status': status,
'reachable': reachable,
'timestamp': time.time()
})
except Exception as e:
api_logger.error(f"API Fehler beim Status-Check für {ip}: {e}")
return jsonify({
'success': False,
'error': str(e)
}), 500
@api_blueprint.route("/tapo/outlet/<ip>/control", methods=["POST"])
@login_required
@require_permission(Permission.CONTROL_PRINTER)
def control_outlet_api(ip):
"""Schaltet eine Steckdose ein oder aus."""
try:
# IP validieren
try:
ipaddress.ip_address(ip)
except ValueError:
return jsonify({
'success': False,
'error': 'Ungültige IP-Adresse'
}), 400
# Aktion aus Request Body
data = request.get_json()
if not data or 'action' not in data:
return jsonify({
'success': False,
'error': 'Aktion erforderlich (on/off)'
}), 400
action = data['action']
if action not in ['on', 'off']:
return jsonify({
'success': False,
'error': 'Ungültige Aktion. Nur "on" oder "off" erlaubt.'
}), 400
# Steckdose schalten
state = action == 'on'
success = tapo_controller.toggle_plug(ip, state)
if success:
action_text = "eingeschaltet" if state else "ausgeschaltet"
api_logger.info(f"✅ API: Steckdose {ip} {action_text} durch {current_user.name}")
return jsonify({
'success': True,
'ip': ip,
'action': action,
'message': f'Steckdose {action_text}',
'timestamp': time.time()
})
else:
return jsonify({
'success': False,
'error': f'Fehler beim Schalten der Steckdose'
}), 500
except Exception as e:
api_logger.error(f"API Fehler beim Schalten von {ip}: {e}")
return jsonify({
'success': False,
'error': str(e)
}), 500
@api_blueprint.route("/tapo/status/all", methods=["GET"])
@login_required
@require_permission(Permission.CONTROL_PRINTER)
def get_all_status_api():
"""Holt den Status aller Steckdosen."""
try:
db_session = get_db_session()
printers_with_tapo = db_session.query(Printer).filter(
Printer.active == True,
Printer.plug_ip.isnot(None)
).all()
outlets = []
online_count = 0
for printer in printers_with_tapo:
try:
reachable, status = tapo_controller.check_outlet_status(
printer.plug_ip,
printer_id=printer.id
)
if reachable:
online_count += 1
outlets.append({
'id': printer.id,
'name': printer.name,
'ip': printer.plug_ip,
'location': printer.location or "Unbekannt",
'status': status,
'reachable': reachable
})
except Exception as e:
api_logger.warning(f"API Status-Check für {printer.plug_ip} fehlgeschlagen: {e}")
outlets.append({
'id': printer.id,
'name': printer.name,
'ip': printer.plug_ip,
'location': printer.location or "Unbekannt",
'status': 'error',
'reachable': False,
'error': str(e)
})
db_session.close()
return jsonify({
'success': True,
'outlets': outlets,
'summary': {
'total': len(outlets),
'online': online_count,
'offline': len(outlets) - online_count
},
'timestamp': time.time()
})
except Exception as e:
api_logger.error(f"API Fehler beim Abrufen aller Status: {e}")
return jsonify({
'success': False,
'error': str(e)
}), 500
@api_blueprint.route("/health", methods=["GET"])
def health_check():
"""API Gesundheitscheck."""
return jsonify({
'success': True,
'service': 'MYP Platform Tapo API',
'version': '1.0',
'timestamp': time.time()
})

View File

@@ -0,0 +1,387 @@
"""
Tapo-Steckdosen-Steuerung Blueprint
Eigenständige Web-Interface für direkte Tapo-Steckdosen-Kontrolle
"""
from flask import Blueprint, render_template, request, jsonify, flash, redirect, url_for
from flask_login import login_required, current_user
from datetime import datetime
import ipaddress
import time
from blueprints.admin_unified import admin_required
from utils.tapo_controller import tapo_controller
from utils.logging_config import get_logger
from utils.performance_tracker import measure_execution_time
from utils.permissions import require_permission, Permission
from models import get_db_session, Printer
# Blueprint initialisieren
tapo_blueprint = Blueprint('tapo', __name__, url_prefix='/tapo')
# Logger konfigurieren
tapo_logger = get_logger("tapo_control")
@tapo_blueprint.route("/")
@login_required
@require_permission(Permission.CONTROL_PRINTER)
def tapo_dashboard():
"""Haupt-Dashboard für Tapo-Steckdosen-Steuerung."""
try:
tapo_logger.info(f"Tapo Dashboard aufgerufen von Benutzer: {current_user.name}")
# Alle konfigurierten Tapo-Steckdosen aus der Datenbank laden
db_session = get_db_session()
printers_with_tapo = db_session.query(Printer).filter(
Printer.active == True,
Printer.plug_ip.isnot(None)
).all()
# Status aller Steckdosen abrufen - auch wenn sie nicht erreichbar sind
outlets_status = {}
online_count = 0
for printer in printers_with_tapo:
try:
tapo_logger.debug(f"Prüfe Tapo-Steckdose für Drucker {printer.name} ({printer.plug_ip})")
reachable, status = tapo_controller.check_outlet_status(
printer.plug_ip,
printer_id=printer.id
)
if reachable:
online_count += 1
tapo_logger.info(f"✅ Tapo-Steckdose {printer.plug_ip} erreichbar - Status: {status}")
else:
tapo_logger.warning(f"⚠️ Tapo-Steckdose {printer.plug_ip} nicht erreichbar")
outlets_status[printer.plug_ip] = {
'printer_name': printer.name,
'printer_id': printer.id,
'ip': printer.plug_ip,
'status': status,
'reachable': reachable,
'location': printer.location or "Unbekannt"
}
except Exception as e:
tapo_logger.error(f"❌ Fehler beim Status-Check für {printer.plug_ip}: {e}")
outlets_status[printer.plug_ip] = {
'printer_name': printer.name,
'printer_id': printer.id,
'ip': printer.plug_ip,
'status': 'error',
'reachable': False,
'location': printer.location or "Unbekannt",
'error': str(e)
}
db_session.close()
tapo_logger.info(f"Dashboard geladen: {len(outlets_status)} Steckdosen, {online_count} online")
return render_template('tapo_control.html',
outlets=outlets_status,
total_outlets=len(outlets_status),
online_outlets=online_count)
except Exception as e:
tapo_logger.error(f"Fehler beim Laden des Tapo-Dashboards: {e}")
flash(f"Fehler beim Laden der Tapo-Steckdosen: {str(e)}", "error")
return render_template('tapo_control.html', outlets={}, total_outlets=0, online_outlets=0)
@tapo_blueprint.route("/control", methods=["POST"])
@login_required
@require_permission(Permission.CONTROL_PRINTER)
@measure_execution_time(logger=tapo_logger, task_name="Tapo-Steckdosen-Steuerung")
def control_outlet():
"""Schaltet eine Tapo-Steckdose direkt ein oder aus."""
try:
data = request.get_json()
ip = data.get('ip')
action = data.get('action') # 'on' oder 'off'
if not ip or not action:
return jsonify({
'success': False,
'error': 'IP-Adresse und Aktion sind erforderlich'
}), 400
# IP-Adresse validieren
try:
ipaddress.ip_address(ip)
except ValueError:
return jsonify({
'success': False,
'error': 'Ungültige IP-Adresse'
}), 400
if action not in ['on', 'off']:
return jsonify({
'success': False,
'error': 'Ungültige Aktion. Nur "on" oder "off" erlaubt.'
}), 400
# Steckdose schalten
state = action == 'on'
success = tapo_controller.toggle_plug(ip, state)
if success:
action_text = "eingeschaltet" if state else "ausgeschaltet"
tapo_logger.info(f"✅ Tapo-Steckdose {ip} erfolgreich {action_text} durch {current_user.name}")
return jsonify({
'success': True,
'message': f'Steckdose {ip} erfolgreich {action_text}',
'ip': ip,
'action': action,
'timestamp': datetime.now().isoformat(),
'user': current_user.name
})
else:
action_text = "einschalten" if state else "ausschalten"
tapo_logger.error(f"❌ Fehler beim {action_text} der Steckdose {ip}")
return jsonify({
'success': False,
'error': f'Fehler beim {action_text} der Steckdose {ip}'
}), 500
except Exception as e:
tapo_logger.error(f"Unerwarteter Fehler bei Steckdosen-Steuerung: {e}")
return jsonify({
'success': False,
'error': f'Interner Serverfehler: {str(e)}'
}), 500
@tapo_blueprint.route("/status/<ip>", methods=["GET"])
@login_required
@require_permission(Permission.CONTROL_PRINTER)
def get_outlet_status(ip):
"""Holt den aktuellen Status einer spezifischen Tapo-Steckdose."""
try:
# IP-Adresse validieren
try:
ipaddress.ip_address(ip)
except ValueError:
return jsonify({
'success': False,
'error': 'Ungültige IP-Adresse'
}), 400
# Status prüfen
reachable, status = tapo_controller.check_outlet_status(ip)
return jsonify({
'success': True,
'ip': ip,
'status': status,
'reachable': reachable,
'timestamp': datetime.now().isoformat()
})
except Exception as e:
tapo_logger.error(f"Fehler beim Abrufen des Status für {ip}: {e}")
return jsonify({
'success': False,
'error': f'Fehler beim Status-Check: {str(e)}'
}), 500
@tapo_blueprint.route("/discover", methods=["POST"])
@login_required
@admin_required
@measure_execution_time(logger=tapo_logger, task_name="Tapo-Steckdosen-Erkennung")
def discover_outlets():
"""Startet die automatische Erkennung von Tapo-Steckdosen."""
try:
tapo_logger.info(f"🔍 Starte Tapo-Steckdosen-Erkennung auf Anfrage von {current_user.name}")
# Discovery starten
results = tapo_controller.auto_discover_outlets()
success_count = sum(1 for success in results.values() if success)
total_count = len(results)
return jsonify({
'success': True,
'message': f'Erkennung abgeschlossen: {success_count}/{total_count} Steckdosen gefunden',
'results': results,
'discovered_count': success_count,
'total_tested': total_count,
'timestamp': datetime.now().isoformat()
})
except Exception as e:
tapo_logger.error(f"Fehler bei der Steckdosen-Erkennung: {e}")
return jsonify({
'success': False,
'error': f'Fehler bei der Erkennung: {str(e)}'
}), 500
@tapo_blueprint.route("/test/<ip>", methods=["POST"])
@login_required
@require_permission(Permission.CONTROL_PRINTER)
def test_connection(ip):
"""Testet die Verbindung zu einer spezifischen Tapo-Steckdose."""
try:
# IP-Adresse validieren
try:
ipaddress.ip_address(ip)
except ValueError:
return jsonify({
'success': False,
'error': 'Ungültige IP-Adresse'
}), 400
# Verbindung testen
test_result = tapo_controller.test_connection(ip)
return jsonify({
'success': True,
'ip': ip,
'test_result': test_result,
'timestamp': datetime.now().isoformat()
})
except Exception as e:
tapo_logger.error(f"Fehler beim Verbindungstest für {ip}: {e}")
return jsonify({
'success': False,
'error': f'Fehler beim Verbindungstest: {str(e)}'
}), 500
@tapo_blueprint.route("/all-status", methods=["GET"])
@login_required
@require_permission(Permission.CONTROL_PRINTER)
def get_all_status():
"""Holt den Status aller konfigurierten Tapo-Steckdosen."""
try:
# Alle Tapo-Steckdosen aus der Datenbank laden
db_session = get_db_session()
printers_with_tapo = db_session.query(Printer).filter(
Printer.active == True,
Printer.plug_ip.isnot(None)
).all()
all_status = {}
online_count = 0
total_count = len(printers_with_tapo)
tapo_logger.info(f"Status-Abfrage für {total_count} Tapo-Steckdosen gestartet")
for printer in printers_with_tapo:
try:
tapo_logger.debug(f"Prüfe Status für {printer.plug_ip} ({printer.name})")
reachable, status = tapo_controller.check_outlet_status(
printer.plug_ip,
printer_id=printer.id
)
if reachable:
online_count += 1
tapo_logger.debug(f"{printer.plug_ip} erreichbar - Status: {status}")
else:
tapo_logger.debug(f"⚠️ {printer.plug_ip} nicht erreichbar")
all_status[printer.plug_ip] = {
'printer_name': printer.name,
'printer_id': printer.id,
'status': status,
'reachable': reachable,
'location': printer.location or "Unbekannt",
'last_checked': time.time()
}
except Exception as e:
tapo_logger.warning(f"❌ Status-Check für {printer.plug_ip} fehlgeschlagen: {e}")
all_status[printer.plug_ip] = {
'printer_name': printer.name,
'printer_id': printer.id,
'status': 'error',
'reachable': False,
'location': printer.location or "Unbekannt",
'error': str(e),
'last_checked': time.time()
}
db_session.close()
# Zusammenfassung loggen
tapo_logger.info(f"Status-Abfrage abgeschlossen: {online_count}/{total_count} Steckdosen erreichbar")
response_data = {
'success': True,
'outlets': all_status,
'summary': {
'total': total_count,
'online': online_count,
'offline': total_count - online_count,
'last_update': time.time()
},
'timestamp': datetime.now().isoformat()
}
return jsonify(response_data)
except Exception as e:
tapo_logger.error(f"Fehler beim Abrufen aller Tapo-Status: {e}")
return jsonify({
'success': False,
'error': str(e),
'outlets': {},
'summary': {'total': 0, 'online': 0, 'offline': 0}
}), 500
@tapo_blueprint.route("/manual-control", methods=["GET", "POST"])
@login_required
@admin_required
def manual_control():
"""Manuelle Steuerung für beliebige IP-Adressen (Admin-only)."""
if request.method == 'GET':
return render_template('tapo_manual_control.html')
try:
ip = request.form.get('ip')
action = request.form.get('action')
if not ip or not action:
flash('IP-Adresse und Aktion sind erforderlich', 'error')
return redirect(url_for('tapo.manual_control'))
# IP-Adresse validieren
try:
ipaddress.ip_address(ip)
except ValueError:
flash('Ungültige IP-Adresse', 'error')
return redirect(url_for('tapo.manual_control'))
if action not in ['on', 'off', 'status']:
flash('Ungültige Aktion', 'error')
return redirect(url_for('tapo.manual_control'))
if action == 'status':
# Status abfragen
reachable, status = tapo_controller.check_outlet_status(ip)
if reachable:
flash(f'Steckdose {ip} ist {status.upper()}', 'success')
else:
flash(f'Steckdose {ip} ist nicht erreichbar', 'error')
else:
# Ein/Ausschalten
state = action == 'on'
success = tapo_controller.toggle_plug(ip, state)
if success:
action_text = "eingeschaltet" if state else "ausgeschaltet"
flash(f'Steckdose {ip} erfolgreich {action_text}', 'success')
tapo_logger.info(f"✅ Manuelle Steuerung: {ip} {action_text} durch {current_user.name}")
else:
action_text = "einschalten" if state else "ausschalten"
flash(f'Fehler beim {action_text} der Steckdose {ip}', 'error')
return redirect(url_for('tapo.manual_control'))
except Exception as e:
tapo_logger.error(f"Fehler bei manueller Steuerung: {e}")
flash(f'Fehler: {str(e)}', 'error')
return redirect(url_for('tapo.manual_control'))

View File

@@ -0,0 +1,626 @@
"""
Vereinheitlichtes User-Management-Blueprint für das MYP System
Konsolidierte Implementierung aller benutzerbezogenen Funktionen:
- Benutzer-Selbstverwaltung (ursprünglich user.py)
- Administrative Benutzerverwaltung (ursprünglich users.py)
- Vereinheitlichte API-Schnittstellen
Funktionsbereiche:
- /user/* - Selbstverwaltung für eingeloggte Benutzer
- /admin/users/* - Administrative Benutzerverwaltung
- /api/users/* - Unified API Layer
Optimierungen:
- Einheitliche Database-Session-Verwaltung
- Konsistente Error-Handling und Logging
- Vollständige API-Kompatibilität zu beiden ursprünglichen Blueprints
Autor: MYP Team - Konsolidiert für IHK-Projektarbeit
Datum: 2025-06-09
"""
import json
from datetime import datetime
from flask import Blueprint, render_template, request, jsonify, redirect, url_for, flash, make_response, abort
from flask_login import login_required, current_user
from werkzeug.security import check_password_hash
from sqlalchemy.exc import SQLAlchemyError
from functools import wraps
from models import User, UserPermission, get_cached_session
from utils.logging_config import get_logger
# ===== BLUEPRINT-KONFIGURATION =====
# Hauptblueprint für User-Management
users_blueprint = Blueprint('users', __name__)
# Logger für verschiedene Funktionsbereiche
user_logger = get_logger("user")
users_logger = get_logger("users")
# ===== DECORATOR-FUNKTIONEN =====
def users_admin_required(f):
"""
Decorator für Admin-Berechtigung bei Benutzerverwaltung.
Erweitert den Standard-Admin-Check um spezifische User-Management-Rechte.
"""
@wraps(f)
@login_required
def decorated_function(*args, **kwargs):
# Grundlegende Admin-Prüfung
if not current_user.is_authenticated:
users_logger.warning("Unauthenticated access attempt to user management")
abort(401)
# Admin-Status prüfen (doppelte Methode für Robustheit)
is_admin = False
if hasattr(current_user, 'is_admin') and current_user.is_admin:
is_admin = True
elif hasattr(current_user, 'role') and current_user.role == 'admin':
is_admin = True
if not is_admin:
users_logger.warning(f"Non-admin user {current_user.id} attempted to access user management")
abort(403)
users_logger.info(f"Admin access granted to {current_user.username} for function {f.__name__}")
return f(*args, **kwargs)
return decorated_function
# ===== BENUTZER-SELBSTVERWALTUNG (ursprünglich user.py) =====
@users_blueprint.route('/user/profile', methods=['GET'])
@login_required
def user_profile():
"""Benutzerprofil-Seite anzeigen"""
try:
user_logger.info(f"User {current_user.username} accessed profile page")
return render_template('profile.html', user=current_user)
except Exception as e:
user_logger.error(f"Error loading profile page: {str(e)}")
flash("Fehler beim Laden des Profils", "error")
return redirect(url_for('dashboard'))
@users_blueprint.route('/user/settings', methods=['GET'])
@login_required
def user_settings():
"""Benutzereinstellungen-Seite anzeigen"""
try:
user_logger.info(f"User {current_user.username} accessed settings page")
return render_template('settings.html', user=current_user)
except Exception as e:
user_logger.error(f"Error loading settings page: {str(e)}")
flash("Fehler beim Laden der Einstellungen", "error")
return redirect(url_for('dashboard'))
@users_blueprint.route('/user/update-profile', methods=['POST'])
@login_required
def update_profile_form():
"""Profil via Formular aktualisieren"""
try:
with get_cached_session() as session:
user = session.query(User).filter(User.id == current_user.id).first()
if not user:
flash("Benutzer nicht gefunden", "error")
return redirect(url_for('users.user_profile'))
# Formular-Daten extrahieren
user.name = request.form.get('name', user.name)
user.email = request.form.get('email', user.email)
user.department = request.form.get('department', user.department)
user.position = request.form.get('position', user.position)
user.phone = request.form.get('phone', user.phone)
user.bio = request.form.get('bio', user.bio)
user.updated_at = datetime.now()
session.commit()
user_logger.info(f"User {user.username} updated profile via form")
flash("Profil erfolgreich aktualisiert", "success")
return redirect(url_for('users.user_profile'))
except Exception as e:
user_logger.error(f"Error updating profile via form: {str(e)}")
flash("Fehler beim Aktualisieren des Profils", "error")
return redirect(url_for('users.user_profile'))
@users_blueprint.route('/user/profile', methods=['PUT'])
@login_required
def update_profile_api():
"""Profil via API aktualisieren"""
try:
data = request.get_json()
with get_cached_session() as session:
user = session.query(User).filter(User.id == current_user.id).first()
if not user:
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# API-Daten verarbeiten
updatable_fields = ['name', 'email', 'department', 'position', 'phone', 'bio']
for field in updatable_fields:
if field in data:
setattr(user, field, data[field])
user.updated_at = datetime.now()
session.commit()
user_logger.info(f"User {user.username} updated profile via API")
return jsonify({
"success": True,
"message": "Profil erfolgreich aktualisiert"
})
except Exception as e:
user_logger.error(f"Error updating profile via API: {str(e)}")
return jsonify({"error": "Fehler beim Aktualisieren des Profils"}), 500
@users_blueprint.route('/api/user/settings', methods=['GET', 'POST'])
@login_required
def user_settings_api():
"""Benutzereinstellungen via API abrufen oder aktualisieren"""
try:
with get_cached_session() as session:
user = session.query(User).filter(User.id == current_user.id).first()
if not user:
return jsonify({"error": "Benutzer nicht gefunden"}), 404
if request.method == 'GET':
# Einstellungen abrufen
settings = {
'theme_preference': getattr(user, 'theme_preference', 'auto'),
'language_preference': getattr(user, 'language_preference', 'de'),
'email_notifications': getattr(user, 'email_notifications', True),
'browser_notifications': getattr(user, 'browser_notifications', True),
'dashboard_layout': getattr(user, 'dashboard_layout', 'default'),
'compact_mode': getattr(user, 'compact_mode', False),
'show_completed_jobs': getattr(user, 'show_completed_jobs', True),
'auto_refresh_interval': getattr(user, 'auto_refresh_interval', 30),
'privacy': {
'auto_logout': getattr(user, 'auto_logout_timeout', 0)
}
}
user_logger.info(f"User {user.username} retrieved settings via API")
return jsonify({
"success": True,
"settings": settings
})
elif request.method == 'POST':
# Einstellungen aktualisieren
data = request.get_json()
# Einstellungen aktualisieren
settings_fields = [
'theme_preference', 'language_preference', 'email_notifications',
'browser_notifications', 'dashboard_layout', 'compact_mode',
'show_completed_jobs', 'auto_refresh_interval'
]
for field in settings_fields:
if field in data:
setattr(user, field, data[field])
# Privacy-Einstellungen
if 'privacy' in data and isinstance(data['privacy'], dict):
if 'auto_logout' in data['privacy']:
user.auto_logout_timeout = data['privacy']['auto_logout']
user.updated_at = datetime.now()
session.commit()
user_logger.info(f"User {user.username} updated settings via API")
return jsonify({
"success": True,
"message": "Einstellungen erfolgreich aktualisiert"
})
except Exception as e:
user_logger.error(f"Error handling user settings API: {str(e)}")
return jsonify({"error": "Fehler beim Verarbeiten der Einstellungen"}), 500
@users_blueprint.route('/user/api/update-settings', methods=['POST'])
@login_required
def update_settings_api():
"""Benutzereinstellungen via API aktualisieren (Legacy-Kompatibilität)"""
try:
data = request.get_json()
with get_cached_session() as session:
user = session.query(User).filter(User.id == current_user.id).first()
if not user:
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Einstellungen aktualisieren
settings_fields = [
'theme_preference', 'language_preference', 'email_notifications',
'browser_notifications', 'dashboard_layout', 'compact_mode',
'show_completed_jobs', 'auto_refresh_interval'
]
for field in settings_fields:
if field in data:
setattr(user, field, data[field])
user.updated_at = datetime.now()
session.commit()
user_logger.info(f"User {user.username} updated settings via API")
return jsonify({
"success": True,
"message": "Einstellungen erfolgreich aktualisiert"
})
except Exception as e:
user_logger.error(f"Error updating settings via API: {str(e)}")
return jsonify({"error": "Fehler beim Aktualisieren der Einstellungen"}), 500
@users_blueprint.route('/user/update-settings', methods=['POST'])
@login_required
def update_settings_form():
"""Benutzereinstellungen via Formular aktualisieren"""
try:
with get_cached_session() as session:
user = session.query(User).filter(User.id == current_user.id).first()
if not user:
flash("Benutzer nicht gefunden", "error")
return redirect(url_for('users.user_settings'))
# Formular-Einstellungen verarbeiten
user.theme_preference = request.form.get('theme_preference', user.theme_preference)
user.language_preference = request.form.get('language_preference', user.language_preference)
user.email_notifications = 'email_notifications' in request.form
user.browser_notifications = 'browser_notifications' in request.form
user.dashboard_layout = request.form.get('dashboard_layout', user.dashboard_layout)
user.compact_mode = 'compact_mode' in request.form
user.show_completed_jobs = 'show_completed_jobs' in request.form
auto_refresh = request.form.get('auto_refresh_interval')
if auto_refresh:
user.auto_refresh_interval = int(auto_refresh)
user.updated_at = datetime.now()
session.commit()
user_logger.info(f"User {user.username} updated settings via form")
flash("Einstellungen erfolgreich aktualisiert", "success")
return redirect(url_for('users.user_settings'))
except Exception as e:
user_logger.error(f"Error updating settings via form: {str(e)}")
flash("Fehler beim Aktualisieren der Einstellungen", "error")
return redirect(url_for('users.user_settings'))
@users_blueprint.route('/user/change-password', methods=['POST'])
@login_required
def change_password():
"""Passwort ändern (unterstützt Form und JSON)"""
try:
# Daten aus Request extrahieren (Form oder JSON)
if request.is_json:
data = request.get_json()
current_password = data.get('current_password')
new_password = data.get('new_password')
confirm_password = data.get('confirm_password')
else:
current_password = request.form.get('current_password')
new_password = request.form.get('new_password')
confirm_password = request.form.get('confirm_password')
# Validierung
if not all([current_password, new_password, confirm_password]):
error_msg = "Alle Passwort-Felder sind erforderlich"
if request.is_json:
return jsonify({"error": error_msg}), 400
flash(error_msg, "error")
return redirect(url_for('users.user_settings'))
if new_password != confirm_password:
error_msg = "Neue Passwörter stimmen nicht überein"
if request.is_json:
return jsonify({"error": error_msg}), 400
flash(error_msg, "error")
return redirect(url_for('users.user_settings'))
if len(new_password) < 8:
error_msg = "Neues Passwort muss mindestens 8 Zeichen lang sein"
if request.is_json:
return jsonify({"error": error_msg}), 400
flash(error_msg, "error")
return redirect(url_for('users.user_settings'))
# Aktuelles Passwort prüfen
with get_cached_session() as session:
user = session.query(User).filter(User.id == current_user.id).first()
if not user or not check_password_hash(user.password_hash, current_password):
error_msg = "Aktuelles Passwort ist falsch"
if request.is_json:
return jsonify({"error": error_msg}), 400
flash(error_msg, "error")
return redirect(url_for('users.user_settings'))
# Neues Passwort setzen
user.set_password(new_password)
user.updated_at = datetime.now()
session.commit()
user_logger.info(f"User {user.username} changed password successfully")
success_msg = "Passwort erfolgreich geändert"
if request.is_json:
return jsonify({"success": True, "message": success_msg})
flash(success_msg, "success")
return redirect(url_for('users.user_settings'))
except Exception as e:
user_logger.error(f"Error changing password: {str(e)}")
error_msg = "Fehler beim Ändern des Passworts"
if request.is_json:
return jsonify({"error": error_msg}), 500
flash(error_msg, "error")
return redirect(url_for('users.user_settings'))
@users_blueprint.route('/user/export', methods=['GET'])
@login_required
def export_user_data():
"""DSGVO-konformer Datenexport für Benutzer"""
try:
with get_cached_session() as session:
user = session.query(User).filter(User.id == current_user.id).first()
if not user:
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Umfassende Benutzerdaten sammeln
user_data = {
"export_info": {
"generated_at": datetime.now().isoformat(),
"user_id": user.id,
"export_type": "DSGVO_complete_data_export"
},
"personal_data": {
"username": user.username,
"email": user.email,
"name": user.name,
"department": user.department,
"position": user.position,
"phone": user.phone,
"bio": user.bio,
"role": user.role,
"active": user.active,
"created_at": user.created_at.isoformat() if user.created_at else None,
"updated_at": user.updated_at.isoformat() if user.updated_at else None,
"last_login": user.last_login.isoformat() if user.last_login else None
},
"preferences": {
"theme_preference": user.theme_preference,
"language_preference": user.language_preference,
"email_notifications": user.email_notifications,
"browser_notifications": user.browser_notifications,
"dashboard_layout": user.dashboard_layout,
"compact_mode": user.compact_mode,
"show_completed_jobs": user.show_completed_jobs,
"auto_refresh_interval": user.auto_refresh_interval
},
"system_info": {
"total_jobs_created": len(user.jobs) if hasattr(user, 'jobs') else 0,
"account_status": "active" if user.active else "inactive"
}
}
# Jobs-Daten hinzufügen (falls verfügbar)
if hasattr(user, 'jobs'):
user_data["job_history"] = []
for job in user.jobs:
job_info = {
"id": job.id,
"title": job.title,
"status": job.status,
"created_at": job.created_at.isoformat() if job.created_at else None,
"updated_at": job.updated_at.isoformat() if job.updated_at else None
}
user_data["job_history"].append(job_info)
# JSON-Response mit Download-Headers erstellen
response = make_response(jsonify(user_data))
response.headers['Content-Type'] = 'application/json; charset=utf-8'
response.headers['Content-Disposition'] = f'attachment; filename=user_data_export_{user.username}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json'
user_logger.info(f"User {user.username} exported personal data (DSGVO)")
return response
except Exception as e:
user_logger.error(f"Error exporting user data: {str(e)}")
return jsonify({"error": "Fehler beim Exportieren der Benutzerdaten"}), 500
# ===== ADMINISTRATIVE BENUTZERVERWALTUNG (ursprünglich users.py) =====
@users_blueprint.route('/admin/users/<int:user_id>/permissions', methods=['GET'])
@users_admin_required
def user_permissions_page(user_id):
"""Admin-Seite für Benutzerberechtigungen"""
try:
with get_cached_session() as session:
user = session.query(User).filter(User.id == user_id).first()
if not user:
flash("Benutzer nicht gefunden", "error")
return redirect(url_for('admin.users_overview'))
# UserPermissions laden oder erstellen
permissions = session.query(UserPermission).filter(UserPermission.user_id == user_id).first()
if not permissions:
permissions = UserPermission(user_id=user_id)
session.add(permissions)
session.commit()
users_logger.info(f"Admin {current_user.username} accessed permissions for user {user.username}")
return render_template('admin/user_permissions.html', user=user, permissions=permissions)
except Exception as e:
users_logger.error(f"Error loading user permissions page: {str(e)}")
flash("Fehler beim Laden der Benutzerberechtigungen", "error")
return redirect(url_for('admin.users_overview'))
@users_blueprint.route('/api/users/<int:user_id>/permissions', methods=['GET'])
@users_admin_required
def get_user_permissions_api(user_id):
"""API-Endpunkt für Benutzerberechtigungen"""
try:
with get_cached_session() as session:
user = session.query(User).filter(User.id == user_id).first()
if not user:
return jsonify({"error": "Benutzer nicht gefunden"}), 404
permissions = session.query(UserPermission).filter(UserPermission.user_id == user_id).first()
if not permissions:
permissions = UserPermission(user_id=user_id)
session.add(permissions)
session.commit()
permissions_data = {
"user_id": user_id,
"username": user.username,
"can_start_jobs": permissions.can_start_jobs,
"needs_approval": permissions.needs_approval,
"can_approve_jobs": permissions.can_approve_jobs
}
return jsonify(permissions_data)
except Exception as e:
users_logger.error(f"Error getting user permissions via API: {str(e)}")
return jsonify({"error": "Fehler beim Abrufen der Benutzerberechtigungen"}), 500
@users_blueprint.route('/api/users/<int:user_id>/permissions', methods=['PUT'])
@users_admin_required
def update_user_permissions_api(user_id):
"""Benutzerberechtigungen via API aktualisieren"""
try:
data = request.get_json()
with get_cached_session() as session:
user = session.query(User).filter(User.id == user_id).first()
if not user:
return jsonify({"error": "Benutzer nicht gefunden"}), 404
permissions = session.query(UserPermission).filter(UserPermission.user_id == user_id).first()
if not permissions:
permissions = UserPermission(user_id=user_id)
session.add(permissions)
# Berechtigungen aktualisieren
if 'can_start_jobs' in data:
permissions.can_start_jobs = data['can_start_jobs']
if 'needs_approval' in data:
permissions.needs_approval = data['needs_approval']
if 'can_approve_jobs' in data:
permissions.can_approve_jobs = data['can_approve_jobs']
session.commit()
users_logger.info(f"Admin {current_user.username} updated permissions for user {user.username}")
return jsonify({
"success": True,
"message": "Berechtigungen erfolgreich aktualisiert"
})
except Exception as e:
users_logger.error(f"Error updating user permissions via API: {str(e)}")
return jsonify({"error": "Fehler beim Aktualisieren der Berechtigungen"}), 500
@users_blueprint.route('/admin/users/<int:user_id>/permissions/update', methods=['POST'])
@users_admin_required
def update_user_permissions_form(user_id):
"""Benutzerberechtigungen via Formular aktualisieren"""
try:
with get_cached_session() as session:
user = session.query(User).filter(User.id == user_id).first()
if not user:
flash("Benutzer nicht gefunden", "error")
return redirect(url_for('admin.users_overview'))
permissions = session.query(UserPermission).filter(UserPermission.user_id == user_id).first()
if not permissions:
permissions = UserPermission(user_id=user_id)
session.add(permissions)
# Formular-Daten verarbeiten
permissions.can_start_jobs = 'can_start_jobs' in request.form
permissions.needs_approval = 'needs_approval' in request.form
permissions.can_approve_jobs = 'can_approve_jobs' in request.form
session.commit()
users_logger.info(f"Admin {current_user.username} updated permissions for user {user.username} via form")
flash("Berechtigungen erfolgreich aktualisiert", "success")
return redirect(url_for('users.user_permissions_page', user_id=user_id))
except Exception as e:
users_logger.error(f"Error updating user permissions via form: {str(e)}")
flash("Fehler beim Aktualisieren der Berechtigungen", "error")
return redirect(url_for('users.user_permissions_page', user_id=user_id))
@users_blueprint.route('/admin/users/<int:user_id>/edit/permissions', methods=['GET'])
@users_admin_required
def edit_user_permissions_section(user_id):
"""Berechtigungsbereich für Benutzer bearbeiten"""
try:
with get_cached_session() as session:
user = session.query(User).filter(User.id == user_id).first()
if not user:
return jsonify({"error": "Benutzer nicht gefunden"}), 404
permissions = session.query(UserPermission).filter(UserPermission.user_id == user_id).first()
if not permissions:
permissions = UserPermission(user_id=user_id)
return render_template('admin/edit_user_permissions_section.html', user=user, permissions=permissions)
except Exception as e:
users_logger.error(f"Error loading user permissions edit section: {str(e)}")
return jsonify({"error": "Fehler beim Laden der Berechtigungsbearbeitung"}), 500
@users_blueprint.route('/api/users/<int:user_id>', methods=['GET'])
@users_admin_required
def get_user_details_api(user_id):
"""API-Endpunkt für detaillierte Benutzerdaten"""
try:
with get_cached_session() as session:
user = session.query(User).filter(User.id == user_id).first()
if not user:
return jsonify({"error": "Benutzer nicht gefunden"}), 404
user_data = {
"id": user.id,
"username": user.username,
"email": user.email,
"name": user.name,
"role": user.role,
"active": user.active,
"created_at": user.created_at.isoformat() if user.created_at else None,
"last_login": user.last_login.isoformat() if user.last_login else None,
"department": user.department,
"position": user.position,
"phone": user.phone,
"bio": user.bio,
"theme_preference": user.theme_preference,
"language_preference": user.language_preference,
"email_notifications": user.email_notifications,
"browser_notifications": user.browser_notifications
}
return jsonify(user_data)
except Exception as e:
users_logger.error(f"Error getting user details via API: {str(e)}")
return jsonify({"error": "Fehler beim Abrufen der Benutzerdaten"}), 500

View File

@@ -0,0 +1,664 @@
"""
Vereinheitlichtes User-Management-Blueprint für das MYP System
Konsolidierte Implementierung aller benutzerbezogenen Funktionen:
- Benutzer-Selbstverwaltung (ursprünglich user.py)
- Administrative Benutzerverwaltung (ursprünglich users.py)
- Vereinheitlichte API-Schnittstellen
Funktionsbereiche:
- /user/* - Selbstverwaltung für eingeloggte Benutzer
- /admin/users/* - Administrative Benutzerverwaltung
- /api/users/* - Unified API Layer
Optimierungen:
- Einheitliche Database-Session-Verwaltung
- Konsistente Error-Handling und Logging
- Vollständige API-Kompatibilität zu beiden ursprünglichen Blueprints
Autor: MYP Team - Konsolidiert für IHK-Projektarbeit
Datum: 2025-06-09
"""
import json
from datetime import datetime
from flask import Blueprint, render_template, request, jsonify, redirect, url_for, flash, make_response, abort
from flask_login import login_required, current_user
from werkzeug.security import check_password_hash
from sqlalchemy.exc import SQLAlchemyError
from functools import wraps
from models import User, UserPermission, get_cached_session
from utils.logging_config import get_logger
# ===== BLUEPRINT-KONFIGURATION =====
# Hauptblueprint für User-Management
users_blueprint = Blueprint('users', __name__)
# Logger für verschiedene Funktionsbereiche
user_logger = get_logger("user")
users_logger = get_logger("users")
# ===== DECORATOR-FUNKTIONEN =====
def users_admin_required(f):
"""
Decorator für Admin-Berechtigung bei Benutzerverwaltung.
Erweitert den Standard-Admin-Check um spezifische User-Management-Rechte.
"""
@wraps(f)
@login_required
def decorated_function(*args, **kwargs):
# Grundlegende Admin-Prüfung
if not current_user.is_authenticated:
users_logger.warning("Unauthenticated access attempt to user management")
abort(401)
# Admin-Status prüfen (doppelte Methode für Robustheit)
is_admin = False
if hasattr(current_user, 'is_admin') and current_user.is_admin:
is_admin = True
elif hasattr(current_user, 'role') and current_user.role == 'admin':
is_admin = True
if not is_admin:
users_logger.warning(f"Non-admin user {current_user.id} attempted to access user management")
abort(403)
users_logger.info(f"Admin access granted to {current_user.username} for function {f.__name__}")
return f(*args, **kwargs)
return decorated_function
# ===== BENUTZER-SELBSTVERWALTUNG (ursprünglich user.py) =====
@users_blueprint.route('/user/profile', methods=['GET'])
@login_required
def user_profile():
"""Benutzerprofil-Seite anzeigen"""
try:
user_logger.info(f"User {current_user.username} accessed profile page")
return render_template('user/profile.html', user=current_user)
except Exception as e:
user_logger.error(f"Error loading profile page: {str(e)}")
flash("Fehler beim Laden des Profils", "error")
return redirect(url_for('dashboard'))
@users_blueprint.route('/user/settings', methods=['GET'])
@login_required
def user_settings():
"""Benutzereinstellungen-Seite anzeigen"""
try:
user_logger.info(f"User {current_user.username} accessed settings page")
return render_template('user/settings.html', user=current_user)
except Exception as e:
user_logger.error(f"Error loading settings page: {str(e)}")
flash("Fehler beim Laden der Einstellungen", "error")
return redirect(url_for('dashboard'))
@users_blueprint.route('/user/update-profile', methods=['POST'])
@login_required
def update_profile_form():
"""Profil via Formular aktualisieren"""
try:
with get_cached_session() as session:
user = session.query(User).filter(User.id == current_user.id).first()
if not user:
flash("Benutzer nicht gefunden", "error")
return redirect(url_for('users.user_profile'))
# Formular-Daten extrahieren
user.name = request.form.get('name', user.name)
user.email = request.form.get('email', user.email)
user.department = request.form.get('department', user.department)
user.position = request.form.get('position', user.position)
user.phone = request.form.get('phone', user.phone)
user.bio = request.form.get('bio', user.bio)
user.updated_at = datetime.now()
session.commit()
user_logger.info(f"User {user.username} updated profile via form")
flash("Profil erfolgreich aktualisiert", "success")
return redirect(url_for('users.user_profile'))
except Exception as e:
user_logger.error(f"Error updating profile via form: {str(e)}")
flash("Fehler beim Aktualisieren des Profils", "error")
return redirect(url_for('users.user_profile'))
@users_blueprint.route('/user/profile', methods=['PUT'])
@login_required
def update_profile_api():
"""Profil via API aktualisieren"""
try:
data = request.get_json()
with get_cached_session() as session:
user = session.query(User).filter(User.id == current_user.id).first()
if not user:
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# API-Daten verarbeiten
updatable_fields = ['name', 'email', 'department', 'position', 'phone', 'bio']
for field in updatable_fields:
if field in data:
setattr(user, field, data[field])
user.updated_at = datetime.now()
session.commit()
user_logger.info(f"User {user.username} updated profile via API")
return jsonify({
"success": True,
"message": "Profil erfolgreich aktualisiert"
})
except Exception as e:
user_logger.error(f"Error updating profile via API: {str(e)}")
return jsonify({"error": "Fehler beim Aktualisieren des Profils"}), 500
@users_blueprint.route('/api/user/settings', methods=['GET', 'POST'])
@login_required
def user_settings_api():
"""Benutzereinstellungen via API abrufen oder aktualisieren"""
try:
with get_cached_session() as session:
user = session.query(User).filter(User.id == current_user.id).first()
if not user:
return jsonify({"error": "Benutzer nicht gefunden"}), 404
if request.method == 'GET':
# Einstellungen abrufen
settings = {
'theme_preference': getattr(user, 'theme_preference', 'auto'),
'language_preference': getattr(user, 'language_preference', 'de'),
'email_notifications': getattr(user, 'email_notifications', True),
'browser_notifications': getattr(user, 'browser_notifications', True),
'dashboard_layout': getattr(user, 'dashboard_layout', 'default'),
'compact_mode': getattr(user, 'compact_mode', False),
'show_completed_jobs': getattr(user, 'show_completed_jobs', True),
'auto_refresh_interval': getattr(user, 'auto_refresh_interval', 30),
'privacy': {
'auto_logout': getattr(user, 'auto_logout_timeout', 0)
}
}
user_logger.info(f"User {user.username} retrieved settings via API")
return jsonify({
"success": True,
"settings": settings
})
elif request.method == 'POST':
# Einstellungen aktualisieren
data = request.get_json()
# Einstellungen aktualisieren
settings_fields = [
'theme_preference', 'language_preference', 'email_notifications',
'browser_notifications', 'dashboard_layout', 'compact_mode',
'show_completed_jobs', 'auto_refresh_interval'
]
for field in settings_fields:
if field in data:
setattr(user, field, data[field])
# Privacy-Einstellungen
if 'privacy' in data and isinstance(data['privacy'], dict):
if 'auto_logout' in data['privacy']:
user.auto_logout_timeout = data['privacy']['auto_logout']
user.updated_at = datetime.now()
session.commit()
user_logger.info(f"User {user.username} updated settings via API")
return jsonify({
"success": True,
"message": "Einstellungen erfolgreich aktualisiert"
})
except Exception as e:
user_logger.error(f"Error handling user settings API: {str(e)}")
return jsonify({"error": "Fehler beim Verarbeiten der Einstellungen"}), 500
@users_blueprint.route('/user/api/update-settings', methods=['POST'])
@login_required
def update_settings_api():
"""Benutzereinstellungen via API aktualisieren (Legacy-Kompatibilität)"""
try:
data = request.get_json()
with get_cached_session() as session:
user = session.query(User).filter(User.id == current_user.id).first()
if not user:
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Einstellungen aktualisieren
settings_fields = [
'theme_preference', 'language_preference', 'email_notifications',
'browser_notifications', 'dashboard_layout', 'compact_mode',
'show_completed_jobs', 'auto_refresh_interval'
]
for field in settings_fields:
if field in data:
setattr(user, field, data[field])
user.updated_at = datetime.now()
session.commit()
user_logger.info(f"User {user.username} updated settings via API")
return jsonify({
"success": True,
"message": "Einstellungen erfolgreich aktualisiert"
})
except Exception as e:
user_logger.error(f"Error updating settings via API: {str(e)}")
return jsonify({"error": "Fehler beim Aktualisieren der Einstellungen"}), 500
@users_blueprint.route('/user/update-settings', methods=['POST'])
@login_required
def update_settings_form():
"""Benutzereinstellungen via Formular aktualisieren"""
try:
with get_cached_session() as session:
user = session.query(User).filter(User.id == current_user.id).first()
if not user:
flash("Benutzer nicht gefunden", "error")
return redirect(url_for('users.user_settings'))
# Formular-Einstellungen verarbeiten
user.theme_preference = request.form.get('theme_preference', user.theme_preference)
user.language_preference = request.form.get('language_preference', user.language_preference)
user.email_notifications = 'email_notifications' in request.form
user.browser_notifications = 'browser_notifications' in request.form
user.dashboard_layout = request.form.get('dashboard_layout', user.dashboard_layout)
user.compact_mode = 'compact_mode' in request.form
user.show_completed_jobs = 'show_completed_jobs' in request.form
auto_refresh = request.form.get('auto_refresh_interval')
if auto_refresh:
user.auto_refresh_interval = int(auto_refresh)
user.updated_at = datetime.now()
session.commit()
user_logger.info(f"User {user.username} updated settings via form")
flash("Einstellungen erfolgreich aktualisiert", "success")
return redirect(url_for('users.user_settings'))
except Exception as e:
user_logger.error(f"Error updating settings via form: {str(e)}")
flash("Fehler beim Aktualisieren der Einstellungen", "error")
return redirect(url_for('users.user_settings'))
@users_blueprint.route('/user/change-password', methods=['POST'])
@login_required
def change_password():
"""Passwort ändern (unterstützt Form und JSON)"""
try:
# Daten aus Request extrahieren (Form oder JSON)
if request.is_json:
data = request.get_json()
current_password = data.get('current_password')
new_password = data.get('new_password')
confirm_password = data.get('confirm_password')
else:
current_password = request.form.get('current_password')
new_password = request.form.get('new_password')
confirm_password = request.form.get('confirm_password')
# Validierung
if not all([current_password, new_password, confirm_password]):
error_msg = "Alle Passwort-Felder sind erforderlich"
if request.is_json:
return jsonify({"error": error_msg}), 400
flash(error_msg, "error")
return redirect(url_for('users.user_settings'))
if new_password != confirm_password:
error_msg = "Neue Passwörter stimmen nicht überein"
if request.is_json:
return jsonify({"error": error_msg}), 400
flash(error_msg, "error")
return redirect(url_for('users.user_settings'))
if len(new_password) < 8:
error_msg = "Neues Passwort muss mindestens 8 Zeichen lang sein"
if request.is_json:
return jsonify({"error": error_msg}), 400
flash(error_msg, "error")
return redirect(url_for('users.user_settings'))
# Aktuelles Passwort prüfen
with get_cached_session() as session:
user = session.query(User).filter(User.id == current_user.id).first()
if not user or not check_password_hash(user.password_hash, current_password):
error_msg = "Aktuelles Passwort ist falsch"
if request.is_json:
return jsonify({"error": error_msg}), 400
flash(error_msg, "error")
return redirect(url_for('users.user_settings'))
# Neues Passwort setzen
user.set_password(new_password)
user.updated_at = datetime.now()
session.commit()
user_logger.info(f"User {user.username} changed password successfully")
success_msg = "Passwort erfolgreich geändert"
if request.is_json:
return jsonify({"success": True, "message": success_msg})
flash(success_msg, "success")
return redirect(url_for('users.user_settings'))
except Exception as e:
user_logger.error(f"Error changing password: {str(e)}")
error_msg = "Fehler beim Ändern des Passworts"
if request.is_json:
return jsonify({"error": error_msg}), 500
flash(error_msg, "error")
return redirect(url_for('users.user_settings'))
@users_blueprint.route('/user/export', methods=['GET'])
@login_required
def export_user_data():
"""DSGVO-konformer Datenexport für Benutzer"""
try:
with get_cached_session() as session:
user = session.query(User).filter(User.id == current_user.id).first()
if not user:
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Umfassende Benutzerdaten sammeln
user_data = {
"export_info": {
"generated_at": datetime.now().isoformat(),
"user_id": user.id,
"export_type": "DSGVO_complete_data_export"
},
"personal_data": {
"username": user.username,
"email": user.email,
"name": user.name,
"department": user.department,
"position": user.position,
"phone": user.phone,
"bio": user.bio,
"role": user.role,
"active": user.active,
"created_at": user.created_at.isoformat() if user.created_at else None,
"updated_at": user.updated_at.isoformat() if user.updated_at else None,
"last_login": user.last_login.isoformat() if user.last_login else None
},
"preferences": {
"theme_preference": user.theme_preference,
"language_preference": user.language_preference,
"email_notifications": user.email_notifications,
"browser_notifications": user.browser_notifications,
"dashboard_layout": user.dashboard_layout,
"compact_mode": user.compact_mode,
"show_completed_jobs": user.show_completed_jobs,
"auto_refresh_interval": user.auto_refresh_interval
},
"system_info": {
"total_jobs_created": len(user.jobs) if hasattr(user, 'jobs') else 0,
"account_status": "active" if user.active else "inactive"
}
}
# Jobs-Daten hinzufügen (falls verfügbar)
if hasattr(user, 'jobs'):
user_data["job_history"] = []
for job in user.jobs:
job_info = {
"id": job.id,
"title": job.title,
"status": job.status,
"created_at": job.created_at.isoformat() if job.created_at else None,
"updated_at": job.updated_at.isoformat() if job.updated_at else None
}
user_data["job_history"].append(job_info)
# JSON-Response mit Download-Headers erstellen
response = make_response(jsonify(user_data))
response.headers['Content-Type'] = 'application/json; charset=utf-8'
response.headers['Content-Disposition'] = f'attachment; filename=user_data_export_{user.username}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json'
user_logger.info(f"User {user.username} exported personal data (DSGVO)")
return response
except Exception as e:
user_logger.error(f"Error exporting user data: {str(e)}")
return jsonify({"error": "Fehler beim Exportieren der Benutzerdaten"}), 500
# ===== ADMINISTRATIVE BENUTZERVERWALTUNG (ursprünglich users.py) =====
@users_blueprint.route('/admin/users/<int:user_id>/permissions', methods=['GET'])
@users_admin_required
def user_permissions_page(user_id):
"""Admin-Seite für Benutzerberechtigungen"""
try:
with get_cached_session() as session:
user = session.query(User).filter(User.id == user_id).first()
if not user:
flash("Benutzer nicht gefunden", "error")
return redirect(url_for('admin.users_overview'))
# UserPermissions laden oder erstellen
permissions = session.query(UserPermission).filter(UserPermission.user_id == user_id).first()
if not permissions:
permissions = UserPermission(user_id=user_id)
session.add(permissions)
session.commit()
users_logger.info(f"Admin {current_user.username} accessed permissions for user {user.username}")
return render_template('admin/user_permissions.html', user=user, permissions=permissions)
except Exception as e:
users_logger.error(f"Error loading user permissions page: {str(e)}")
flash("Fehler beim Laden der Benutzerberechtigungen", "error")
return redirect(url_for('admin.users_overview'))
@users_blueprint.route('/api/users/<int:user_id>/permissions', methods=['GET'])
@users_admin_required
def get_user_permissions_api(user_id):
"""API-Endpunkt für Benutzerberechtigungen"""
try:
with get_cached_session() as session:
user = session.query(User).filter(User.id == user_id).first()
if not user:
return jsonify({"error": "Benutzer nicht gefunden"}), 404
permissions = session.query(UserPermission).filter(UserPermission.user_id == user_id).first()
if not permissions:
permissions = UserPermission(user_id=user_id)
session.add(permissions)
session.commit()
permissions_data = {
"user_id": user_id,
"username": user.username,
"can_start_jobs": permissions.can_start_jobs,
"needs_approval": permissions.needs_approval,
"can_approve_jobs": permissions.can_approve_jobs,
"max_concurrent_jobs": permissions.max_concurrent_jobs,
"created_at": permissions.created_at.isoformat() if permissions.created_at else None,
"updated_at": permissions.updated_at.isoformat() if permissions.updated_at else None
}
return jsonify(permissions_data)
except Exception as e:
users_logger.error(f"Error getting user permissions via API: {str(e)}")
return jsonify({"error": "Fehler beim Abrufen der Benutzerberechtigungen"}), 500
@users_blueprint.route('/api/users/<int:user_id>/permissions', methods=['PUT'])
@users_admin_required
def update_user_permissions_api(user_id):
"""Benutzerberechtigungen via API aktualisieren"""
try:
data = request.get_json()
with get_cached_session() as session:
user = session.query(User).filter(User.id == user_id).first()
if not user:
return jsonify({"error": "Benutzer nicht gefunden"}), 404
permissions = session.query(UserPermission).filter(UserPermission.user_id == user_id).first()
if not permissions:
permissions = UserPermission(user_id=user_id)
session.add(permissions)
# Berechtigungen aktualisieren
permission_fields = ['can_start_jobs', 'needs_approval', 'can_approve_jobs', 'max_concurrent_jobs']
for field in permission_fields:
if field in data:
setattr(permissions, field, data[field])
permissions.updated_at = datetime.now()
session.commit()
users_logger.info(f"Admin {current_user.username} updated permissions for user {user.username} via API")
return jsonify({
"success": True,
"message": "Benutzerberechtigungen erfolgreich aktualisiert"
})
except SQLAlchemyError as e:
users_logger.error(f"Database error updating permissions: {str(e)}")
return jsonify({"error": "Datenbankfehler beim Aktualisieren der Berechtigungen"}), 500
except Exception as e:
users_logger.error(f"Error updating user permissions via API: {str(e)}")
return jsonify({"error": "Fehler beim Aktualisieren der Benutzerberechtigungen"}), 500
@users_blueprint.route('/admin/users/<int:user_id>/permissions/update', methods=['POST'])
@users_admin_required
def update_user_permissions_form(user_id):
"""Benutzerberechtigungen via Formular aktualisieren"""
try:
with get_cached_session() as session:
user = session.query(User).filter(User.id == user_id).first()
if not user:
flash("Benutzer nicht gefunden", "error")
return redirect(url_for('admin.users_overview'))
permissions = session.query(UserPermission).filter(UserPermission.user_id == user_id).first()
if not permissions:
permissions = UserPermission(user_id=user_id)
session.add(permissions)
# Formular-Daten verarbeiten (Checkboxen)
permissions.can_start_jobs = 'can_start_jobs' in request.form
permissions.needs_approval = 'needs_approval' in request.form
permissions.can_approve_jobs = 'can_approve_jobs' in request.form
# Max concurrent jobs
max_jobs = request.form.get('max_concurrent_jobs')
if max_jobs:
try:
permissions.max_concurrent_jobs = int(max_jobs)
except ValueError:
permissions.max_concurrent_jobs = 3 # Default
permissions.updated_at = datetime.now()
session.commit()
users_logger.info(f"Admin {current_user.username} updated permissions for user {user.username} via form")
flash(f"Berechtigungen für {user.username} erfolgreich aktualisiert", "success")
return redirect(url_for('users.user_permissions_page', user_id=user_id))
except SQLAlchemyError as e:
users_logger.error(f"Database error updating permissions via form: {str(e)}")
flash("Datenbankfehler beim Aktualisieren der Berechtigungen", "error")
return redirect(url_for('users.user_permissions_page', user_id=user_id))
except Exception as e:
users_logger.error(f"Error updating user permissions via form: {str(e)}")
flash("Fehler beim Aktualisieren der Benutzerberechtigungen", "error")
return redirect(url_for('users.user_permissions_page', user_id=user_id))
@users_blueprint.route('/admin/users/<int:user_id>/edit/permissions', methods=['GET'])
@users_admin_required
def edit_user_permissions_section(user_id):
"""Berechtigungsbereich für Benutzer-Bearbeitungsformular"""
try:
with get_cached_session() as session:
user = session.query(User).filter(User.id == user_id).first()
if not user:
return jsonify({"error": "Benutzer nicht gefunden"}), 404
permissions = session.query(UserPermission).filter(UserPermission.user_id == user_id).first()
if not permissions:
permissions = UserPermission(user_id=user_id)
session.add(permissions)
session.commit()
# Template-Fragment für AJAX-Anfragen
return render_template('admin/user_permissions_section.html', user=user, permissions=permissions)
except Exception as e:
users_logger.error(f"Error loading permissions section: {str(e)}")
return jsonify({"error": "Fehler beim Laden der Berechtigungen"}), 500
# ===== UNIFIED API LAYER =====
@users_blueprint.route('/api/users/<int:user_id>', methods=['GET'])
@users_admin_required
def get_user_details_api(user_id):
"""Vollständige Benutzerdaten via API (Admin-Zugriff)"""
try:
with get_cached_session() as session:
user = session.query(User).filter(User.id == user_id).first()
if not user:
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Berechtigungen laden
permissions = session.query(UserPermission).filter(UserPermission.user_id == user_id).first()
user_data = {
"id": user.id,
"username": user.username,
"email": user.email,
"name": user.name,
"role": user.role,
"active": user.active,
"department": user.department,
"position": user.position,
"phone": user.phone,
"bio": user.bio,
"created_at": user.created_at.isoformat() if user.created_at else None,
"updated_at": user.updated_at.isoformat() if user.updated_at else None,
"last_login": user.last_login.isoformat() if user.last_login else None,
"preferences": {
"theme_preference": user.theme_preference,
"language_preference": user.language_preference,
"email_notifications": user.email_notifications,
"browser_notifications": user.browser_notifications,
"dashboard_layout": user.dashboard_layout,
"compact_mode": user.compact_mode,
"show_completed_jobs": user.show_completed_jobs,
"auto_refresh_interval": user.auto_refresh_interval
}
}
# Berechtigungen hinzufügen (falls verfügbar)
if permissions:
user_data["permissions"] = {
"can_start_jobs": permissions.can_start_jobs,
"needs_approval": permissions.needs_approval,
"can_approve_jobs": permissions.can_approve_jobs,
"max_concurrent_jobs": permissions.max_concurrent_jobs
}
return jsonify(user_data)
except Exception as e:
users_logger.error(f"Error getting user details via API: {str(e)}")
return jsonify({"error": "Fehler beim Abrufen der Benutzerdaten"}), 500