Files
Projektarbeit-MYP/backend/utils/permissions.py

252 lines
8.2 KiB
Python

"""
Berechtigungssystem für das MYP 3D-Druck-Management-System
Dieses Modul stellt Decorator und Utility-Funktionen für die Rechteverwaltung bereit.
"""
import logging
from functools import wraps
from flask import jsonify, abort
from flask_login import login_required, current_user
from models import get_db_session, User, UserPermission
from utils.logging_config import get_logger
# Logger initialisieren
permissions_logger = get_logger("permissions")
def admin_required(f):
"""
Decorator für Admin-Berechtigung.
Überprüft sowohl die Rolle als auch die is_admin Property.
Falls UserPermission fehlt, wird sie automatisch erstellt.
"""
@wraps(f)
@login_required
def decorated_function(*args, **kwargs):
# Authentifizierung prüfen
if not current_user.is_authenticated:
permissions_logger.warning("Nicht authentifizierter Zugriff auf Admin-Funktion")
abort(401)
# Admin-Status prüfen
is_admin = False
# Methode 1: is_admin Property
if hasattr(current_user, 'is_admin') and current_user.is_admin:
is_admin = True
# Methode 2: Role-basierte Prüfung
if hasattr(current_user, 'role') and current_user.role == 'admin':
is_admin = True
if not is_admin:
permissions_logger.warning(f"Nicht-Admin Benutzer {current_user.id} versucht Admin-Zugriff")
abort(403)
# Stelle sicher, dass Admin-Benutzer UserPermissions haben
ensure_admin_permissions(current_user.id)
return f(*args, **kwargs)
return decorated_function
def ensure_admin_permissions(user_id: int):
"""
Stellt sicher, dass Admin-Benutzer die korrekten UserPermissions haben.
Args:
user_id: ID des Admin-Benutzers
"""
try:
db_session = get_db_session()
# Prüfe ob UserPermission existiert
permission = db_session.query(UserPermission).filter_by(user_id=user_id).first()
if not permission:
# Erstelle neue UserPermission für Admin
permission = UserPermission(
user_id=user_id,
can_start_jobs=True,
needs_approval=False,
can_approve_jobs=True
)
db_session.add(permission)
permissions_logger.info(f"UserPermission für Admin-Benutzer {user_id} erstellt")
else:
# Aktualisiere bestehende Permission für Admin
permission.can_start_jobs = True
permission.needs_approval = False
permission.can_approve_jobs = True
permissions_logger.info(f"UserPermission für Admin-Benutzer {user_id} aktualisiert")
db_session.commit()
db_session.close()
except Exception as e:
permissions_logger.error(f"Fehler beim Sicherstellen der Admin-Berechtigungen: {str(e)}")
def fix_all_admin_permissions():
"""
Korrigiert die Berechtigungen für alle Admin-Benutzer im System.
Returns:
dict: Anzahl der korrigierten Benutzer
"""
try:
db_session = get_db_session()
# Alle Admin-Benutzer finden
admin_users = db_session.query(User).filter(
(User.role == 'admin') | (User.is_admin == True)
).all()
corrected_count = 0
created_count = 0
for admin in admin_users:
# Prüfe UserPermission
permission = db_session.query(UserPermission).filter_by(user_id=admin.id).first()
if not permission:
# Erstelle neue UserPermission
permission = UserPermission(
user_id=admin.id,
can_start_jobs=True,
needs_approval=False,
can_approve_jobs=True
)
db_session.add(permission)
created_count += 1
permissions_logger.info(f"UserPermission für Admin {admin.username} (ID: {admin.id}) erstellt")
else:
# Korrigiere bestehende Permission
if not permission.can_approve_jobs or permission.needs_approval or not permission.can_start_jobs:
permission.can_start_jobs = True
permission.needs_approval = False
permission.can_approve_jobs = True
corrected_count += 1
permissions_logger.info(f"UserPermission für Admin {admin.username} (ID: {admin.id}) korrigiert")
db_session.commit()
db_session.close()
permissions_logger.info(f"Admin-Berechtigungen korrigiert: {created_count} erstellt, {corrected_count} aktualisiert")
return {
'success': True,
'created': created_count,
'corrected': corrected_count,
'total_admins': len(admin_users)
}
except Exception as e:
permissions_logger.error(f"Fehler beim Korrigieren der Admin-Berechtigungen: {str(e)}")
return {
'success': False,
'error': str(e)
}
def can_approve_jobs(user_id: int) -> bool:
"""
Prüft, ob ein Benutzer Gastanfragen genehmigen darf.
Args:
user_id: ID des Benutzers
Returns:
bool: True wenn der Benutzer genehmigen darf
"""
try:
db_session = get_db_session()
# Benutzer laden
user = db_session.query(User).filter_by(id=user_id).first()
if not user:
db_session.close()
return False
# Admin-Benutzer können immer genehmigen
if user.role == 'admin' or (hasattr(user, 'is_admin') and user.is_admin):
ensure_admin_permissions(user_id)
db_session.close()
return True
# UserPermission prüfen
permission = db_session.query(UserPermission).filter_by(user_id=user_id).first()
db_session.close()
if not permission:
return False
return permission.can_approve_jobs
except Exception as e:
permissions_logger.error(f"Fehler beim Prüfen der Genehmigungsberechtigung: {str(e)}")
return False
def approver_required(f):
"""
Decorator für Genehmigungsberechtigungen.
Prüft ob der Benutzer Gastanfragen genehmigen darf.
"""
@wraps(f)
@login_required
def decorated_function(*args, **kwargs):
if not can_approve_jobs(current_user.id):
permissions_logger.warning(f"Benutzer {current_user.id} ({current_user.username}) hat keine Genehmigungsberechtigung")
abort(403)
return f(*args, **kwargs)
return decorated_function
def get_user_permissions(user_id: int) -> dict:
"""
Holt die Berechtigungen eines Benutzers.
Args:
user_id: ID des Benutzers
Returns:
dict: Berechtigungen des Benutzers
"""
try:
db_session = get_db_session()
user = db_session.query(User).filter_by(id=user_id).first()
if not user:
db_session.close()
return {}
permission = db_session.query(UserPermission).filter_by(user_id=user_id).first()
# Standard-Berechtigungen für Admin
if user.role == 'admin' or (hasattr(user, 'is_admin') and user.is_admin):
result = {
'is_admin': True,
'can_start_jobs': True,
'needs_approval': False,
'can_approve_jobs': True
}
elif permission:
result = {
'is_admin': False,
'can_start_jobs': permission.can_start_jobs,
'needs_approval': permission.needs_approval,
'can_approve_jobs': permission.can_approve_jobs
}
else:
result = {
'is_admin': False,
'can_start_jobs': False,
'needs_approval': True,
'can_approve_jobs': False
}
db_session.close()
return result
except Exception as e:
permissions_logger.error(f"Fehler beim Abrufen der Benutzer-Berechtigungen: {str(e)}")
return {}