#!/usr/bin/env python3
"""
Extended permission management for the MYP Platform.

Granular roles and permissions for fine-grained access control.
"""

from datetime import datetime, timedelta
from enum import Enum
from functools import wraps
from typing import List, Dict, Set, Optional

from flask import request, jsonify, abort
from flask_login import login_required, current_user
from sqlalchemy import Column, Integer, String, Boolean, ForeignKey, Table, DateTime, MetaData
from sqlalchemy.orm import relationship

from utils.logging_config import get_logger

logger = get_logger("permissions")


# ===== PERMISSION DEFINITIONS =====

class Permission(Enum):
    """All permissions available in the system."""

    # Basic permissions
    LOGIN = "login"
    VIEW_DASHBOARD = "view_dashboard"

    # Printer permissions
    VIEW_PRINTERS = "view_printers"
    CREATE_PRINTER = "create_printer"
    EDIT_PRINTER = "edit_printer"
    DELETE_PRINTER = "delete_printer"
    CONTROL_PRINTER = "control_printer"  # switch on/off
    VIEW_PRINTER_DETAILS = "view_printer_details"

    # Job permissions
    VIEW_JOBS = "view_jobs"
    CREATE_JOB = "create_job"
    EDIT_OWN_JOB = "edit_own_job"
    EDIT_ALL_JOBS = "edit_all_jobs"
    DELETE_OWN_JOB = "delete_own_job"
    DELETE_ALL_JOBS = "delete_all_jobs"
    EXTEND_JOB = "extend_job"
    CANCEL_JOB = "cancel_job"
    VIEW_JOB_HISTORY = "view_job_history"
    APPROVE_JOBS = "approve_jobs"  # approve and manage jobs

    # User permissions
    VIEW_USERS = "view_users"
    CREATE_USER = "create_user"
    EDIT_USER = "edit_user"
    DELETE_USER = "delete_user"
    MANAGE_ROLES = "manage_roles"
    VIEW_USER_DETAILS = "view_user_details"

    # Admin permissions
    VIEW_ADMIN_PANEL = "view_admin_panel"
    MANAGE_SYSTEM = "manage_system"
    VIEW_LOGS = "view_logs"
    EXPORT_DATA = "export_data"
    BACKUP_DATABASE = "backup_database"
    MANAGE_SETTINGS = "manage_settings"
    ADMIN = "admin"  # general admin permission for administrative functions

    # Guest permissions
    VIEW_GUEST_REQUESTS = "view_guest_requests"
    CREATE_GUEST_REQUEST = "create_guest_request"
    APPROVE_GUEST_REQUEST = "approve_guest_request"
    DENY_GUEST_REQUEST = "deny_guest_request"
    MANAGE_GUEST_REQUESTS = "manage_guest_requests"

    # Statistics permissions
    VIEW_STATS = "view_stats"
    VIEW_DETAILED_STATS = "view_detailed_stats"
    EXPORT_STATS = "export_stats"

    # Calendar permissions
    VIEW_CALENDAR = "view_calendar"
    EDIT_CALENDAR = "edit_calendar"
    MANAGE_SHIFTS = "manage_shifts"

    # Maintenance permissions
    SCHEDULE_MAINTENANCE = "schedule_maintenance"
    VIEW_MAINTENANCE = "view_maintenance"
    PERFORM_MAINTENANCE = "perform_maintenance"


class Role(Enum):
    """Predefined roles with default permissions (see ROLE_PERMISSIONS)."""

    GUEST = "guest"
    USER = "user"
    POWER_USER = "power_user"
    TECHNICIAN = "technician"
    SUPERVISOR = "supervisor"
    ADMIN = "admin"
    SUPER_ADMIN = "super_admin"


# ===== ROLE PERMISSIONS MAPPING =====

# Each role from POWER_USER upwards is built as the union of the previous
# role's set and its own additions, so the sets are cumulative.
ROLE_PERMISSIONS: Dict[Role, Set[Permission]] = {
    Role.GUEST: {
        Permission.LOGIN,
        Permission.VIEW_PRINTERS,
        Permission.CREATE_GUEST_REQUEST,
        Permission.VIEW_CALENDAR,
    },
    Role.USER: {
        Permission.LOGIN,
        Permission.VIEW_DASHBOARD,
        Permission.VIEW_PRINTERS,
        Permission.VIEW_JOBS,
        Permission.CREATE_JOB,
        Permission.EDIT_OWN_JOB,
        Permission.DELETE_OWN_JOB,
        Permission.EXTEND_JOB,
        Permission.CANCEL_JOB,
        Permission.VIEW_STATS,
        Permission.VIEW_CALENDAR,
        Permission.CREATE_GUEST_REQUEST,
    },
}

# Power user extends the user permissions
ROLE_PERMISSIONS[Role.POWER_USER] = ROLE_PERMISSIONS[Role.USER] | {
    Permission.VIEW_PRINTER_DETAILS,
    Permission.VIEW_JOB_HISTORY,
    Permission.VIEW_DETAILED_STATS,
    Permission.EXPORT_STATS,
    Permission.VIEW_GUEST_REQUESTS,
}

# Technician extends the power user permissions
ROLE_PERMISSIONS[Role.TECHNICIAN] = ROLE_PERMISSIONS[Role.POWER_USER] | {
    Permission.CONTROL_PRINTER,
    Permission.EDIT_PRINTER,
    Permission.SCHEDULE_MAINTENANCE,
    Permission.VIEW_MAINTENANCE,
    Permission.PERFORM_MAINTENANCE,
    Permission.EDIT_CALENDAR,
}

# Supervisor extends the technician permissions
ROLE_PERMISSIONS[Role.SUPERVISOR] = ROLE_PERMISSIONS[Role.TECHNICIAN] | {
    Permission.CREATE_PRINTER,
    Permission.EDIT_ALL_JOBS,
    Permission.DELETE_ALL_JOBS,
    Permission.VIEW_USERS,
    Permission.APPROVE_GUEST_REQUEST,
    Permission.DENY_GUEST_REQUEST,
    Permission.MANAGE_GUEST_REQUESTS,
    Permission.MANAGE_SHIFTS,
    Permission.VIEW_USER_DETAILS,
    Permission.APPROVE_JOBS,  # approve and manage jobs
}

# Admin extends the supervisor permissions
ROLE_PERMISSIONS[Role.ADMIN] = ROLE_PERMISSIONS[Role.SUPERVISOR] | {
    Permission.DELETE_PRINTER,
    Permission.VIEW_ADMIN_PANEL,
    Permission.CREATE_USER,
    Permission.EDIT_USER,
    Permission.DELETE_USER,
    Permission.EXPORT_DATA,
    Permission.VIEW_LOGS,
    Permission.MANAGE_SETTINGS,
    Permission.ADMIN,  # general admin permission
}

# Super admin has every permission
ROLE_PERMISSIONS[Role.SUPER_ADMIN] = set(Permission)


# ===== DATABASE MODELS EXTENSIONS =====

# Metadata object the association tables below are registered on.
# NOTE(review): this is a fresh MetaData(); the 'users', 'permissions' and
# 'roles' tables referenced by the foreign keys are not defined on it in this
# file - confirm they are resolvable where these tables are actually used.
metadata = MetaData()

# Many-to-many table for user <-> permission
user_permissions = Table(
    'user_permissions', metadata,
    Column('user_id', Integer, ForeignKey('users.id'), primary_key=True),
    Column('permission_id', Integer, ForeignKey('permissions.id'), primary_key=True)
)

# Many-to-many table for user <-> role
user_roles = Table(
    'user_roles', metadata,
    Column('user_id', Integer, ForeignKey('users.id'), primary_key=True),
    Column('role_id', Integer, ForeignKey('roles.id'), primary_key=True)
)


# NOTE(review): the three model classes below do not derive from a declarative
# base in this file, so as written they are plain Python classes: SQLAlchemy
# does not map them and their default constructor accepts no keyword
# arguments. Confirm whether mapped variants live elsewhere (e.g. in the
# project's models module) before relying on them.

class PermissionModel:
    """Database model describing a permission."""

    __tablename__ = 'permissions'

    id = Column(Integer, primary_key=True)
    name = Column(String(100), unique=True, nullable=False)
    description = Column(String(255))
    category = Column(String(50))  # grouping of permissions
    created_at = Column(DateTime, default=datetime.now)


class RoleModel:
    """Database model describing a role."""

    __tablename__ = 'roles'

    id = Column(Integer, primary_key=True)
    name = Column(String(50), unique=True, nullable=False)
    display_name = Column(String(100))
    description = Column(String(255))
    is_system_role = Column(Boolean, default=False)  # system roles cannot be deleted
    created_at = Column(DateTime, default=datetime.now)

    # Relationships
    # NOTE(review): neither a "role_permissions" table nor a "roles" attribute
    # on PermissionModel is defined in this file - verify both exist elsewhere.
    permissions = relationship("PermissionModel", secondary="role_permissions", back_populates="roles")


class UserPermissionOverride:
    """Temporary or special per-user permission overrides."""

    __tablename__ = 'user_permission_overrides'

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
    permission = Column(String(100), nullable=False)
    granted = Column(Boolean, nullable=False)  # True = granted, False = denied
    reason = Column(String(255))
    granted_by = Column(Integer, ForeignKey('users.id'))
    expires_at = Column(DateTime, nullable=True)  # NULL = permanent
    created_at = Column(DateTime, default=datetime.now)


# ===== PERMISSION CHECKER CLASS =====

class PermissionChecker:
    """Main class for permission checks on behalf of one user.

    Results of ``has_permission`` are cached per instance for five minutes.
    """

    def __init__(self, user=None):
        """Bind the checker to ``user``, falling back to ``current_user``."""
        self.user = user or current_user
        self._permission_cache = {}
        self._cache_timeout = timedelta(minutes=5)
        self._cache_timestamp = None

    def has_permission(self, permission: Permission) -> bool:
        """Check whether the user holds a given permission.

        Args:
            permission: The permission to check.

        Returns:
            bool: True if the permission is held. Always False for a missing
            or unauthenticated user.
        """
        if not self.user or not self.user.is_authenticated:
            return False

        # Serve from the cache when it is still fresh
        if self._is_cache_valid() and permission.value in self._permission_cache:
            return self._permission_cache[permission.value]

        # Recompute and remember the result
        result = self._calculate_permission(permission)
        self._update_cache(permission.value, result)
        return result

    def _calculate_permission(self, permission: Permission) -> bool:
        """Compute (uncached) whether the permission is held.

        Order of evaluation: super-admin flag, explicit overrides (which may
        grant or deny), role-based permissions, then permissions attached
        directly to the user.
        """
        # Super admin has all rights
        if getattr(self.user, 'is_super_admin', False):
            return True

        # Explicit overrides win over roles, in either direction
        override = self._check_permission_override(permission)
        if override is not None:
            return override

        # Role-based permissions
        if any(permission in ROLE_PERMISSIONS.get(role, set())
               for role in self._get_user_roles()):
            return True

        # Direct user permissions. Compare by enum *value*, ignoring case, so
        # that a stored name such as "VIEW_JOBS" matches instead of raising
        # ValueError from Permission(...).
        direct_permissions = getattr(self.user, 'permissions', None) or ()
        wanted = permission.value
        return any(
            str(getattr(entry, 'name', '')).lower() == wanted
            for entry in direct_permissions
        )

    def _check_permission_override(self, permission: Permission) -> Optional[bool]:
        """Return the first active override for ``permission``, if any.

        Returns:
            The override's ``granted`` value, or None when the user has no
            matching override that is permanent or not yet expired.
        """
        overrides = getattr(self.user, 'permission_overrides', None)
        if overrides is None:
            return None

        now = datetime.now()
        for override in overrides:
            if override.permission != permission.value:
                continue
            # expires_at of None means the override is permanent
            if override.expires_at is None or override.expires_at > now:
                logger.info(f"Permission override angewendet: {permission.value} = {override.granted} für User {self.user.id}")
                return override.granted

        return None

    def _get_user_roles(self) -> List[Role]:
        """Collect the roles of the user.

        Combines the legacy ``is_admin`` flag with the entries of
        ``user.roles``; unknown role names are logged and skipped. Falls back
        to ``Role.USER`` when nothing else applies.
        """
        roles = []

        # Legacy admin check
        if getattr(self.user, 'is_admin', False):
            roles.append(Role.ADMIN)

        # New role system
        for role_model in getattr(self.user, 'roles', None) or ():
            try:
                roles.append(Role(role_model.name))
            except ValueError:
                logger.warning(f"Unbekannte Rolle: {role_model.name}")

        # Default role when no other is defined
        if not roles:
            roles.append(Role.USER)

        return roles

    def _is_cache_valid(self) -> bool:
        """Return True while the cache is younger than the cache timeout."""
        if self._cache_timestamp is None:
            return False
        return datetime.now() - self._cache_timestamp < self._cache_timeout

    def _update_cache(self, permission: str, has_permission: bool):
        """Store a result in the cache, resetting the cache if it expired."""
        # _is_cache_valid() is already False when no timestamp has been set
        if not self._is_cache_valid():
            self._permission_cache = {}
            self._cache_timestamp = datetime.now()

        self._permission_cache[permission] = has_permission

    def get_all_permissions(self) -> Set[Permission]:
        """Return every permission the user currently holds."""
        return {perm for perm in Permission if self.has_permission(perm)}

    def can_access_resource(self, resource_type: str, resource_id: Optional[int] = None,
                            action: str = "view") -> bool:
        """Check access to a specific resource.

        Args:
            resource_type: Kind of resource ("job", "printer" or "user").
            resource_id: ID of the resource (optional).
            action: Action such as "view", "edit" or "delete".

        Returns:
            bool: True if access is allowed; False for unknown resource types.
        """
        handlers = {
            "job": self._check_job_access,
            "printer": self._check_printer_access,
            "user": self._check_user_access,
        }
        handler = handlers.get(resource_type)
        if handler is None:
            return False
        return handler(resource_id, action)

    def _check_job_access(self, job_id: Optional[int], action: str) -> bool:
        """Check job-specific access rights.

        "edit" and "delete" are allowed with the corresponding *_ALL_JOBS
        permission, or with the *_OWN_JOB permission when the job belongs to
        the user.
        """
        if action == "view":
            return self.has_permission(Permission.VIEW_JOBS)

        if action == "edit":
            all_perm, own_perm = Permission.EDIT_ALL_JOBS, Permission.EDIT_OWN_JOB
        elif action == "delete":
            all_perm, own_perm = Permission.DELETE_ALL_JOBS, Permission.DELETE_OWN_JOB
        else:
            return False

        if self.has_permission(all_perm):
            return True
        # "is not None" rather than truthiness so that an id of 0 is honoured
        if self.has_permission(own_perm) and job_id is not None:
            return self._is_own_job(job_id)
        return False

    def _check_printer_access(self, printer_id: Optional[int], action: str) -> bool:
        """Check printer-specific access rights (``printer_id`` is not used)."""
        required = {
            "view": Permission.VIEW_PRINTERS,
            "edit": Permission.EDIT_PRINTER,
            "delete": Permission.DELETE_PRINTER,
            "control": Permission.CONTROL_PRINTER,
        }.get(action)
        if required is None:
            return False
        return self.has_permission(required)

    def _check_user_access(self, user_id: Optional[int], action: str) -> bool:
        """Check user-specific access rights.

        Users may always view and edit their own profile; nobody may delete
        their own account through this check.
        """
        if action == "view":
            return self.has_permission(Permission.VIEW_USERS) or self._is_self(user_id)
        if action == "edit":
            return self.has_permission(Permission.EDIT_USER) or self._is_self(user_id)
        if action == "delete":
            return self.has_permission(Permission.DELETE_USER) and not self._is_self(user_id)
        return False

    def _is_self(self, user_id: Optional[int]) -> bool:
        """Return True when ``user_id`` is the id of the checked user."""
        # A user object without an id (e.g. an anonymous user) never matches;
        # this also avoids an AttributeError on such objects.
        own_id = getattr(self.user, 'id', None)
        return own_id is not None and user_id == own_id

    def _is_own_job(self, job_id: int) -> bool:
        """Return True when the job with ``job_id`` belongs to the user.

        Any error during the lookup is logged and treated as "not own job".
        """
        try:
            from models import Job, get_db_session

            db_session = get_db_session()
            try:
                job = db_session.query(Job).filter(Job.id == job_id).first()
                if job is None:
                    return False
                own_id = self.user.id
                # owner_id is read defensively in case the model lacks it
                return bool(
                    job.user_id == own_id
                    or getattr(job, 'owner_id', None) == own_id
                )
            finally:
                # Always release the session, even if the query raised
                db_session.close()
        except Exception as e:
            logger.error(f"Fehler bei Job-Ownership-Check: {e}")
            return False


# ===== DECORATORS =====

def _forbidden_response(payload: dict):
    """Answer a denied request: JSON 403 for /api/ paths, abort(403) otherwise."""
    if request.path.startswith('/api/'):
        return jsonify(payload), 403
    abort(403)


def require_permission(permission: Permission):
    """Decorator that requires a specific permission.

    Args:
        permission: The required permission.
    """
    def decorator(f):
        @wraps(f)
        @login_required
        def wrapper(*args, **kwargs):
            checker = PermissionChecker()

            if not checker.has_permission(permission):
                logger.warning(f"Zugriff verweigert: User {current_user.id} hat keine Berechtigung {permission.value}")
                return _forbidden_response({
                    'error': 'Insufficient permissions',
                    'message': f'Berechtigung "{permission.value}" erforderlich',
                    'required_permission': permission.value
                })

            return f(*args, **kwargs)

        return wrapper
    return decorator


def require_role(role: Role):
    """Decorator that requires a specific role.

    The check is an exact membership test against the user's roles.

    Args:
        role: The required role.
    """
    def decorator(f):
        @wraps(f)
        @login_required
        def wrapper(*args, **kwargs):
            checker = PermissionChecker()
            held_roles = checker._get_user_roles()

            if role not in held_roles:
                logger.warning(f"Zugriff verweigert: User {current_user.id} hat nicht die Rolle {role.value}")
                return _forbidden_response({
                    'error': 'Insufficient role',
                    'message': f'Rolle "{role.value}" erforderlich',
                    'required_role': role.value
                })

            return f(*args, **kwargs)

        return wrapper
    return decorator


def require_resource_access(resource_type: str, action: str = "view"):
    """Decorator for resource-specific permission checks.

    The resource id is taken from the view's keyword arguments: ``id`` first,
    then ``<resource_type>_id``.

    Args:
        resource_type: Kind of resource.
        action: Required action.
    """
    def decorator(f):
        @wraps(f)
        @login_required
        def wrapper(*args, **kwargs):
            # Extract the resource id from the URL parameters; test against
            # None so that an id of 0 is not discarded.
            resource_id = kwargs.get('id')
            if resource_id is None:
                resource_id = kwargs.get(f'{resource_type}_id')

            checker = PermissionChecker()

            if not checker.can_access_resource(resource_type, resource_id, action):
                logger.warning(f"Ressourcen-Zugriff verweigert: User {current_user.id}, {resource_type}:{resource_id}, Action: {action}")
                return _forbidden_response({
                    'error': 'Resource access denied',
                    'message': f'Zugriff auf {resource_type} nicht erlaubt',
                    'resource_type': resource_type,
                    'action': action
                })

            return f(*args, **kwargs)

        return wrapper
    return decorator


# ===== UTILITY FUNCTIONS =====

def check_permission(permission: Permission, user=None) -> bool:
    """Standalone permission check.

    Args:
        permission: The permission to check.
        user: User (optional, default: current_user).

    Returns:
        bool: True if the permission is held.
    """
    return PermissionChecker(user).has_permission(permission)


def get_user_permissions(user=None) -> Set[Permission]:
    """Return all permissions of a user.

    Args:
        user: User (optional, default: current_user).

    Returns:
        Set[Permission]: All permissions the user holds.
    """
    return PermissionChecker(user).get_all_permissions()


def grant_temporary_permission(user_id: int, permission: Permission, duration_hours: int = 24,
                               reason: str = "", granted_by_id: int = None) -> bool:
    """Grant a temporary permission by storing a UserPermissionOverride.

    Args:
        user_id: ID of the user receiving the permission.
        permission: The permission to grant.
        duration_hours: Validity in hours.
        reason: Justification.
        granted_by_id: ID of the granting user; defaults to the authenticated
            current user, if any.

    Returns:
        bool: True when the override was committed, False when any error
        occurred (the error is logged, not raised).
    """
    db_session = None
    try:
        from models import get_db_session

        db_session = get_db_session()

        override = UserPermissionOverride(
            user_id=user_id,
            permission=permission.value,
            granted=True,
            reason=reason,
            granted_by=granted_by_id or (current_user.id if current_user.is_authenticated else None),
            expires_at=datetime.now() + timedelta(hours=duration_hours)
        )

        db_session.add(override)
        db_session.commit()
    except Exception as e:
        logger.error(f"Fehler beim Gewähren temporärer Berechtigung: {e}")
        return False
    else:
        logger.info(f"Temporäre Berechtigung gewährt: {permission.value} für User {user_id} ({duration_hours}h)")
        return True
    finally:
        # Release the session on success and on failure alike
        if db_session is not None:
            db_session.close()


# ===== TEMPLATE HELPERS =====

def init_permission_helpers(app):
    """Register template helpers for permission checks.

    Adds the template globals ``has_permission``, ``has_role`` and
    ``can_access``.

    Args:
        app: Flask app instance.
    """

    @app.template_global()
    def has_permission(permission_name: str) -> bool:
        """Template helper: permission check by value; unknown names are False."""
        try:
            permission = Permission(permission_name)
        except ValueError:
            return False
        return check_permission(permission)

    @app.template_global()
    def has_role(role_name: str) -> bool:
        """Template helper: role check by value; unknown names are False."""
        try:
            role = Role(role_name)
        except ValueError:
            return False
        return role in PermissionChecker()._get_user_roles()

    @app.template_global()
    def can_access(resource_type: str, resource_id: int = None, action: str = "view") -> bool:
        """Template helper: resource access check."""
        return PermissionChecker().can_access_resource(resource_type, resource_id, action)

    logger.info("🔐 Permission Template Helpers registriert")