🎉 Improved backend structure & added utility modules 🎨📚

This commit is contained in:
2025-06-01 00:26:29 +02:00
parent 91548dfb0e
commit 9e1719df4d
27 changed files with 750 additions and 60 deletions

View File

@ -933,4 +933,36 @@ def get_advanced_table_css() -> str:
gap: 1rem;
}
}
"""
"""
def create_table_config(table_id: str, columns: List[ColumnConfig], **kwargs) -> TableConfig:
"""
Erstellt eine neue Tabellen-Konfiguration.
Args:
table_id: Eindeutige ID für die Tabelle
columns: Liste der Spalten-Konfigurationen
**kwargs: Zusätzliche Konfigurationsoptionen
Returns:
TableConfig: Konfiguration für die erweiterte Tabelle
"""
return TableConfig(
table_id=table_id,
columns=columns,
default_sort=kwargs.get('default_sort', []),
default_filters=kwargs.get('default_filters', []),
pagination=kwargs.get('pagination', PaginationConfig()),
searchable=kwargs.get('searchable', True),
exportable=kwargs.get('exportable', True),
selectable=kwargs.get('selectable', False),
row_actions=kwargs.get('row_actions', [])
)
def get_advanced_tables_js() -> str:
"""Alias für die bestehende Funktion"""
return get_advanced_table_javascript()
def get_advanced_tables_css() -> str:
"""Alias für die bestehende Funktion"""
return get_advanced_table_css()

View File

@ -685,4 +685,106 @@ def get_maintenance_javascript() -> str:
document.addEventListener('DOMContentLoaded', function() {
window.maintenanceManager = new MaintenanceManager();
});
"""
"""
def create_maintenance_task(printer_id: int, title: str, description: str = "",
maintenance_type: MaintenanceType = MaintenanceType.PREVENTIVE,
priority: MaintenancePriority = MaintenancePriority.NORMAL) -> int:
"""
Erstellt eine neue Wartungsaufgabe.
Args:
printer_id: ID des Druckers
title: Titel der Wartungsaufgabe
description: Beschreibung der Aufgabe
maintenance_type: Art der Wartung
priority: Priorität der Aufgabe
Returns:
int: ID der erstellten Aufgabe
"""
task = MaintenanceTask(
printer_id=printer_id,
title=title,
description=description,
maintenance_type=maintenance_type,
priority=priority,
checklist=maintenance_manager.create_maintenance_checklist(maintenance_type)
)
return maintenance_manager.create_task(task)
def schedule_maintenance(printer_id: int, maintenance_type: MaintenanceType,
interval_days: int, description: str = "") -> MaintenanceSchedule:
"""
Plant regelmäßige Wartungen (Alias für maintenance_manager.schedule_maintenance).
Args:
printer_id: ID des Druckers
maintenance_type: Art der Wartung
interval_days: Intervall in Tagen
description: Beschreibung
Returns:
MaintenanceSchedule: Erstellter Wartungsplan
"""
return maintenance_manager.schedule_maintenance(
printer_id=printer_id,
maintenance_type=maintenance_type,
interval_days=interval_days,
description=description
)
def get_maintenance_overview() -> Dict[str, Any]:
"""
Holt eine Übersicht aller Wartungsaktivitäten.
Returns:
Dict: Wartungsübersicht mit Statistiken und anstehenden Aufgaben
"""
upcoming = maintenance_manager.get_upcoming_maintenance()
overdue = maintenance_manager.get_overdue_tasks()
metrics = maintenance_manager.get_maintenance_metrics()
# Aktive Tasks
active_tasks = [task for task in maintenance_manager.tasks.values()
if task.status == MaintenanceStatus.IN_PROGRESS]
# Completed tasks in last 30 days
thirty_days_ago = datetime.now() - timedelta(days=30)
recent_completed = [task for task in maintenance_manager.maintenance_history
if task.completed_at and task.completed_at >= thirty_days_ago]
return {
'summary': {
'total_tasks': len(maintenance_manager.tasks),
'active_tasks': len(active_tasks),
'upcoming_tasks': len(upcoming),
'overdue_tasks': len(overdue),
'completed_this_month': len(recent_completed)
},
'upcoming_tasks': [asdict(task) for task in upcoming[:10]],
'overdue_tasks': [asdict(task) for task in overdue],
'active_tasks': [asdict(task) for task in active_tasks],
'recent_completed': [asdict(task) for task in recent_completed[:5]],
'metrics': asdict(metrics),
'schedules': {
printer_id: [asdict(schedule) for schedule in schedules]
for printer_id, schedules in maintenance_manager.schedules.items()
}
}
def update_maintenance_status(task_id: int, new_status: MaintenanceStatus,
notes: str = "") -> bool:
"""
Aktualisiert den Status einer Wartungsaufgabe (Alias für maintenance_manager.update_task_status).
Args:
task_id: ID der Wartungsaufgabe
new_status: Neuer Status
notes: Optionale Notizen
Returns:
bool: True wenn erfolgreich aktualisiert
"""
return maintenance_manager.update_task_status(task_id, new_status, notes)

View File

@ -520,23 +520,138 @@ class MultiLocationManager:
}
# Globale Instanz
multi_location_manager = MultiLocationManager()
location_manager = MultiLocationManager()
# Alias für Import-Kompatibilität
LocationManager = MultiLocationManager
def create_location(name: str, code: str, location_type: LocationType = LocationType.BRANCH,
address: str = "", city: str = "", country: str = "",
parent_id: Optional[int] = None) -> int:
"""
Erstellt einen neuen Standort (globale Funktion).
Args:
name: Name des Standorts
code: Kurzer Code für den Standort
location_type: Art des Standorts
address: Adresse
city: Stadt
country: Land
parent_id: Parent-Standort ID
Returns:
int: ID des erstellten Standorts
"""
location = Location(
name=name,
code=code,
location_type=location_type,
address=address,
city=city,
country=country,
parent_id=parent_id
)
return location_manager.create_location(location)
def assign_user_to_location(user_id: int, location_id: int,
access_level: AccessLevel = AccessLevel.READ_WRITE,
granted_by: int = 1, is_primary: bool = False) -> bool:
"""
Weist einen Benutzer einem Standort zu.
Args:
user_id: ID des Benutzers
location_id: ID des Standorts
access_level: Zugriffslevel
granted_by: ID des gewährenden Benutzers
is_primary: Ob dies der primäre Standort ist
Returns:
bool: True wenn erfolgreich
"""
return location_manager.grant_location_access(
user_id=user_id,
location_id=location_id,
access_level=access_level,
granted_by=granted_by,
is_primary=is_primary
)
def get_user_locations(user_id: int) -> List[Location]:
"""
Holt alle Standorte eines Benutzers (globale Funktion).
Args:
user_id: ID des Benutzers
Returns:
List[Location]: Liste der zugänglichen Standorte
"""
return location_manager.get_user_locations(user_id)
def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""
Berechnet die Entfernung zwischen zwei Koordinaten (Haversine-Formel).
Args:
lat1, lon1: Koordinaten des ersten Punkts
lat2, lon2: Koordinaten des zweiten Punkts
Returns:
float: Entfernung in Kilometern
"""
from math import radians, sin, cos, sqrt, atan2
R = 6371 # Erdradius in km
lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
dlat = lat2 - lat1
dlon = lon2 - lon1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
c = 2 * atan2(sqrt(a), sqrt(1-a))
return R * c
def find_nearest_location(latitude: float, longitude: float,
radius_km: float = 50) -> Optional[Location]:
"""
Findet den nächstgelegenen Standort.
Args:
latitude: Breitengrad
longitude: Längengrad
radius_km: Suchradius in Kilometern
Returns:
Optional[Location]: Nächstgelegener Standort oder None
"""
nearest_locations = location_manager.find_nearest_locations(
latitude=latitude,
longitude=longitude,
radius_km=radius_km,
limit=1
)
return nearest_locations[0][0] if nearest_locations else None
def get_location_dashboard_data(user_id: int) -> Dict[str, Any]:
"""Holt Dashboard-Daten für Standorte eines Benutzers"""
user_locations = multi_location_manager.get_user_locations(user_id)
primary_location = multi_location_manager.get_user_primary_location(user_id)
user_locations = location_manager.get_user_locations(user_id)
primary_location = location_manager.get_user_primary_location(user_id)
dashboard_data = {
'user_locations': [asdict(loc) for loc in user_locations],
'primary_location': asdict(primary_location) if primary_location else None,
'location_count': len(user_locations),
'hierarchy': multi_location_manager.get_location_hierarchy()
'hierarchy': location_manager.get_location_hierarchy()
}
# Füge Statistiken für jeden Standort hinzu
for location in user_locations:
location_stats = multi_location_manager.get_location_statistics(location.id)
location_stats = location_manager.get_location_statistics(location.id)
dashboard_data[f'stats_{location.id}'] = location_stats
return dashboard_data
@ -553,7 +668,7 @@ def create_location_from_address(name: str, address: str, city: str,
country=country
)
return multi_location_manager.create_location(location)
return location_manager.create_location(location)
# JavaScript für Multi-Location Frontend
def get_multi_location_javascript() -> str:

View File

@ -43,6 +43,7 @@ class Permission(Enum):
EXTEND_JOB = "extend_job"
CANCEL_JOB = "cancel_job"
VIEW_JOB_HISTORY = "view_job_history"
APPROVE_JOBS = "approve_jobs" # Berechtigung zum Genehmigen und Verwalten von Jobs
# Benutzer-Berechtigungen
VIEW_USERS = "view_users"
@ -59,6 +60,7 @@ class Permission(Enum):
EXPORT_DATA = "export_data"
BACKUP_DATABASE = "backup_database"
MANAGE_SETTINGS = "manage_settings"
ADMIN = "admin" # Allgemeine Admin-Berechtigung für administrative Funktionen
# Gast-Berechtigungen
VIEW_GUEST_REQUESTS = "view_guest_requests"
@ -149,6 +151,7 @@ ROLE_PERMISSIONS[Role.SUPERVISOR] = ROLE_PERMISSIONS[Role.TECHNICIAN] | {
Permission.MANAGE_GUEST_REQUESTS,
Permission.MANAGE_SHIFTS,
Permission.VIEW_USER_DETAILS,
Permission.APPROVE_JOBS, # Jobs genehmigen und verwalten
}
# Admin erweitert Supervisor-Permissions
@ -161,6 +164,7 @@ ROLE_PERMISSIONS[Role.ADMIN] = ROLE_PERMISSIONS[Role.SUPERVISOR] | {
Permission.EXPORT_DATA,
Permission.VIEW_LOGS,
Permission.MANAGE_SETTINGS,
Permission.ADMIN, # Allgemeine Admin-Berechtigung hinzufügen
}
# Super Admin hat alle Berechtigungen

View File

@ -25,12 +25,28 @@ from concurrent.futures import ThreadPoolExecutor
# WebSocket-Support
try:
from flask_socketio import SocketIO, emit, join_room, leave_room, disconnect
from eventlet import wsgi, listen
import eventlet
# Eventlet separat importieren für bessere Fehlerbehandlung
try:
from eventlet import wsgi, listen
import eventlet
EVENTLET_AVAILABLE = True
except (ImportError, AttributeError) as e:
# Eventlet nicht verfügbar oder nicht kompatibel (z.B. Python 3.13)
print(f"⚠️ Eventlet nicht verfügbar: {e}")
print("🔄 Fallback auf standard threading mode für WebSocket")
EVENTLET_AVAILABLE = False
eventlet = None
wsgi = None
listen = None
WEBSOCKET_AVAILABLE = True
except ImportError:
except ImportError as e:
print(f"⚠️ Flask-SocketIO nicht verfügbar: {e}")
print("📡 Dashboard läuft ohne Real-time Updates")
WEBSOCKET_AVAILABLE = False
EVENTLET_AVAILABLE = False
SocketIO = None
eventlet = None
from flask import request, session, current_app
from flask_login import current_user
@ -122,16 +138,24 @@ class DashboardManager:
logger.warning("WebSocket-Funktionalität nicht verfügbar. Flask-SocketIO installieren.")
return None
# Bestimme den async_mode basierend auf verfügbaren Bibliotheken
if EVENTLET_AVAILABLE:
async_mode = 'eventlet'
logger.info("Dashboard WebSocket-Server wird mit eventlet initialisiert")
else:
async_mode = 'threading'
logger.info("Dashboard WebSocket-Server wird mit threading initialisiert (eventlet-Fallback)")
self.socketio = SocketIO(
app,
cors_allowed_origins=cors_allowed_origins,
logger=False,
engineio_logger=False,
async_mode='eventlet'
async_mode=async_mode
)
self._setup_socket_handlers()
logger.info("Dashboard WebSocket-Server initialisiert")
logger.info(f"Dashboard WebSocket-Server initialisiert (async_mode: {async_mode})")
return self.socketio
def _setup_socket_handlers(self):