""" Multi-Standort-Unterstützungssystem für das MYP-System ====================================================== Dieses Modul stellt umfassende Multi-Location-Funktionalität bereit: - Standort-Management und Hierarchien - Standort-spezifische Konfigurationen - Zentrale und dezentrale Verwaltung - Standort-übergreifende Berichte - Ressourcen-Sharing zwischen Standorten - Benutzer-Standort-Zuweisungen """ import json import logging from datetime import datetime, timedelta from typing import Dict, List, Any, Optional, Tuple from dataclasses import dataclass, asdict from enum import Enum import geocoder import requests from utils.logging_config import get_logger from models import User, Printer, Job, get_db_session logger = get_logger("multi_location") class LocationType(Enum): """Arten von Standorten""" HEADQUARTERS = "headquarters" # Hauptsitz BRANCH = "branch" # Niederlassung DEPARTMENT = "department" # Abteilung FLOOR = "floor" # Stockwerk ROOM = "room" # Raum AREA = "area" # Bereich class AccessLevel(Enum): """Zugriffslevel für Standorte""" FULL = "full" # Vollzugriff READ_WRITE = "read_write" # Lesen und Schreiben READ_ONLY = "read_only" # Nur Lesen NO_ACCESS = "no_access" # Kein Zugriff @dataclass class LocationConfig: """Standort-spezifische Konfiguration""" timezone: str = "Europe/Berlin" business_hours: Dict[str, str] = None maintenance_window: Dict[str, str] = None auto_approval_enabled: bool = False max_job_duration: int = 480 # Minuten contact_info: Dict[str, str] = None notification_settings: Dict[str, Any] = None @dataclass class Location: """Standort-Definition""" id: Optional[int] = None name: str = "" code: str = "" # Kurzer Code für den Standort location_type: LocationType = LocationType.BRANCH parent_id: Optional[int] = None address: str = "" city: str = "" country: str = "" postal_code: str = "" latitude: Optional[float] = None longitude: Optional[float] = None description: str = "" config: LocationConfig = None is_active: bool = True created_at: datetime = None manager_id: Optional[int] = None def __post_init__(self): if self.config is None: self.config = LocationConfig() if self.created_at is None: self.created_at = datetime.now() @dataclass class UserLocationAccess: """Benutzer-Standort-Zugriff""" user_id: int location_id: int access_level: AccessLevel granted_by: int granted_at: datetime expires_at: Optional[datetime] = None is_primary: bool = False class MultiLocationManager: """Manager für Multi-Standort-Funktionalität""" def __init__(self): self.locations: Dict[int, Location] = {} self.user_access: Dict[int, List[UserLocationAccess]] = {} self.next_location_id = 1 # Standard-Standort erstellen self._create_default_location() def _create_default_location(self): """Erstellt Standard-Standort falls keiner existiert""" default_location = Location( id=1, name="Hauptstandort", code="HQ", location_type=LocationType.HEADQUARTERS, address="Mercedes-Benz Platz", city="Stuttgart", country="Deutschland", description="Hauptstandort des MYP-Systems" ) self.locations[1] = default_location self.next_location_id = 2 logger.info("Standard-Standort erstellt") def create_location(self, location: Location) -> int: """Erstellt einen neuen Standort""" location.id = self.next_location_id self.next_location_id += 1 # Koordinaten automatisch ermitteln if not location.latitude or not location.longitude: self._geocode_location(location) self.locations[location.id] = location logger.info(f"Standort erstellt: {location.name} ({location.code})") return location.id def update_location(self, location_id: int, updates: Dict[str, Any]) -> bool: """Aktualisiert einen Standort""" if location_id not in self.locations: return False location = self.locations[location_id] for key, value in updates.items(): if hasattr(location, key): setattr(location, key, value) # Koordinaten neu ermitteln bei Adressänderung if 'address' in updates or 'city' in updates: self._geocode_location(location) logger.info(f"Standort aktualisiert: {location.name}") return True def delete_location(self, location_id: int) -> bool: """Löscht einen Standort (Soft Delete)""" if location_id not in self.locations: return False location = self.locations[location_id] # Prüfe ob Standort Kinder hat children = self.get_child_locations(location_id) if children: logger.warning(f"Standort {location.name} kann nicht gelöscht werden: hat Unterstandorte") return False # Prüfe auf aktive Ressourcen if self._has_active_resources(location_id): logger.warning(f"Standort {location.name} kann nicht gelöscht werden: hat aktive Ressourcen") return False location.is_active = False logger.info(f"Standort deaktiviert: {location.name}") return True def get_location_hierarchy(self, location_id: Optional[int] = None) -> Dict[str, Any]: """Holt Standort-Hierarchie""" if location_id: # Spezifische Hierarchie ab einem Standort location = self.locations.get(location_id) if not location: return {} return self._build_hierarchy_node(location) else: # Komplette Hierarchie root_locations = [loc for loc in self.locations.values() if loc.parent_id is None and loc.is_active] return { 'locations': [self._build_hierarchy_node(loc) for loc in root_locations] } def _build_hierarchy_node(self, location: Location) -> Dict[str, Any]: """Erstellt einen Hierarchie-Knoten""" children = self.get_child_locations(location.id) return { 'id': location.id, 'name': location.name, 'code': location.code, 'type': location.location_type.value, 'children': [self._build_hierarchy_node(child) for child in children], 'resource_count': self._count_location_resources(location.id) } def get_child_locations(self, parent_id: int) -> List[Location]: """Holt alle Kinder-Standorte""" return [loc for loc in self.locations.values() if loc.parent_id == parent_id and loc.is_active] def get_location_path(self, location_id: int) -> List[Location]: """Holt den Pfad vom Root zum Standort""" path = [] current_id = location_id while current_id: location = self.locations.get(current_id) if not location: break path.insert(0, location) current_id = location.parent_id return path def grant_location_access(self, user_id: int, location_id: int, access_level: AccessLevel, granted_by: int, expires_at: Optional[datetime] = None, is_primary: bool = False) -> bool: """Gewährt Benutzer-Zugriff auf einen Standort""" if location_id not in self.locations: return False access = UserLocationAccess( user_id=user_id, location_id=location_id, access_level=access_level, granted_by=granted_by, granted_at=datetime.now(), expires_at=expires_at, is_primary=is_primary ) if user_id not in self.user_access: self.user_access[user_id] = [] # Entferne vorherigen Zugriff für diesen Standort self.user_access[user_id] = [ acc for acc in self.user_access[user_id] if acc.location_id != location_id ] # Setze anderen primary-Zugriff zurück falls nötig if is_primary: for access_item in self.user_access[user_id]: access_item.is_primary = False self.user_access[user_id].append(access) logger.info(f"Standort-Zugriff gewährt: User {user_id} → Location {location_id} ({access_level.value})") return True def revoke_location_access(self, user_id: int, location_id: int) -> bool: """Entzieht Benutzer-Zugriff auf einen Standort""" if user_id not in self.user_access: return False original_count = len(self.user_access[user_id]) self.user_access[user_id] = [ acc for acc in self.user_access[user_id] if acc.location_id != location_id ] success = len(self.user_access[user_id]) < original_count if success: logger.info(f"Standort-Zugriff entzogen: User {user_id} → Location {location_id}") return success def get_user_locations(self, user_id: int, access_level: Optional[AccessLevel] = None) -> List[Location]: """Holt alle Standorte eines Benutzers""" if user_id not in self.user_access: return [] accessible_locations = [] now = datetime.now() for access in self.user_access[user_id]: # Prüfe Ablaufzeit if access.expires_at and access.expires_at < now: continue # Prüfe Access Level if access_level and access.access_level != access_level: continue location = self.locations.get(access.location_id) if location and location.is_active: accessible_locations.append(location) return accessible_locations def get_user_primary_location(self, user_id: int) -> Optional[Location]: """Holt den primären Standort eines Benutzers""" if user_id not in self.user_access: return None for access in self.user_access[user_id]: if access.is_primary: return self.locations.get(access.location_id) # Fallback: ersten verfügbaren Standort nehmen user_locations = self.get_user_locations(user_id) return user_locations[0] if user_locations else None def check_user_access(self, user_id: int, location_id: int, required_level: AccessLevel = AccessLevel.READ_ONLY) -> bool: """Prüft ob Benutzer Zugriff auf Standort hat""" if user_id not in self.user_access: return False access_levels = { AccessLevel.NO_ACCESS: 0, AccessLevel.READ_ONLY: 1, AccessLevel.READ_WRITE: 2, AccessLevel.FULL: 3 } required_level_value = access_levels[required_level] now = datetime.now() for access in self.user_access[user_id]: if access.location_id != location_id: continue # Prüfe Ablaufzeit if access.expires_at and access.expires_at < now: continue user_level_value = access_levels[access.access_level] if user_level_value >= required_level_value: return True return False def get_location_resources(self, location_id: int) -> Dict[str, Any]: """Holt alle Ressourcen eines Standorts""" if location_id not in self.locations: return {} # Simuliere Datenbankabfrage für Drucker und Jobs resources = { 'printers': [], 'active_jobs': [], 'users': [], 'pending_maintenance': 0 } # In echter Implementierung würde hier die Datenbank abgefragt with get_db_session() as db_session: # Drucker des Standorts (vereinfacht - benötigt location_id in Printer-Model) # printers = db_session.query(Printer).filter(Printer.location_id == location_id).all() # resources['printers'] = [p.to_dict() for p in printers] pass return resources def get_location_statistics(self, location_id: int, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None) -> Dict[str, Any]: """Holt Statistiken für einen Standort""" if not start_date: start_date = datetime.now() - timedelta(days=30) if not end_date: end_date = datetime.now() # Sammle Statistiken stats = { 'location': self.locations.get(location_id, {}).name if location_id in self.locations else 'Unbekannt', 'period': { 'start': start_date.isoformat(), 'end': end_date.isoformat() }, 'totals': { 'printers': 0, 'jobs_completed': 0, 'jobs_failed': 0, 'print_time_hours': 0, 'material_used_kg': 0, 'users_active': 0 }, 'averages': { 'jobs_per_day': 0, 'job_duration_minutes': 0, 'printer_utilization': 0 }, 'trends': { 'daily_jobs': [], 'printer_usage': [] } } # In echter Implementierung würden hier Datenbankabfragen stehen return stats def get_multi_location_report(self, location_ids: List[int] = None) -> Dict[str, Any]: """Erstellt standortübergreifenden Bericht""" if not location_ids: location_ids = list(self.locations.keys()) report = { 'generated_at': datetime.now().isoformat(), 'locations': [], 'summary': { 'total_locations': len(location_ids), 'total_printers': 0, 'total_users': 0, 'total_jobs': 0, 'cross_location_sharing': [] } } for location_id in location_ids: location = self.locations.get(location_id) if not location: continue location_stats = self.get_location_statistics(location_id) location_data = { 'id': location.id, 'name': location.name, 'code': location.code, 'type': location.location_type.value, 'statistics': location_stats } report['locations'].append(location_data) # Summiere für Gesamtübersicht totals = location_stats.get('totals', {}) report['summary']['total_printers'] += totals.get('printers', 0) report['summary']['total_users'] += totals.get('users_active', 0) report['summary']['total_jobs'] += totals.get('jobs_completed', 0) return report def find_nearest_locations(self, latitude: float, longitude: float, radius_km: float = 50, limit: int = 5) -> List[Tuple[Location, float]]: """Findet nächstgelegene Standorte""" from math import radians, sin, cos, sqrt, atan2 def calculate_distance(lat1, lon1, lat2, lon2): """Berechnet Entfernung zwischen zwei Koordinaten (Haversine)""" R = 6371 # Erdradius in km lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2]) dlat = lat2 - lat1 dlon = lon2 - lon1 a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2 c = 2 * atan2(sqrt(a), sqrt(1-a)) return R * c nearby_locations = [] for location in self.locations.values(): if not location.is_active or not location.latitude or not location.longitude: continue distance = calculate_distance( latitude, longitude, location.latitude, location.longitude ) if distance <= radius_km: nearby_locations.append((location, distance)) # Sortiere nach Entfernung nearby_locations.sort(key=lambda x: x[1]) return nearby_locations[:limit] def _geocode_location(self, location: Location): """Ermittelt Koordinaten für einen Standort""" try: address_parts = [location.address, location.city, location.country] full_address = ', '.join(filter(None, address_parts)) if not full_address: return # Verwende geocoder library result = geocoder.osm(full_address) if result.ok: location.latitude = result.lat location.longitude = result.lng logger.info(f"Koordinaten ermittelt für {location.name}: {location.latitude}, {location.longitude}") else: logger.warning(f"Koordinaten konnten nicht ermittelt werden für {location.name}") except Exception as e: logger.error(f"Fehler bei Geocoding für {location.name}: {str(e)}") def _has_active_resources(self, location_id: int) -> bool: """Prüft ob Standort aktive Ressourcen hat""" # Vereinfachte Implementierung # In echter Implementation würde hier die Datenbank geprüft return False def _count_location_resources(self, location_id: int) -> Dict[str, int]: """Zählt Ressourcen eines Standorts""" # Vereinfachte Implementierung return { 'printers': 0, 'users': 0, 'jobs': 0 } # Globale Instanz location_manager = MultiLocationManager() # Alias für Import-Kompatibilität LocationManager = MultiLocationManager def create_location(name: str, code: str, location_type: LocationType = LocationType.BRANCH, address: str = "", city: str = "", country: str = "", parent_id: Optional[int] = None) -> int: """ Erstellt einen neuen Standort (globale Funktion). Args: name: Name des Standorts code: Kurzer Code für den Standort location_type: Art des Standorts address: Adresse city: Stadt country: Land parent_id: Parent-Standort ID Returns: int: ID des erstellten Standorts """ location = Location( name=name, code=code, location_type=location_type, address=address, city=city, country=country, parent_id=parent_id ) return location_manager.create_location(location) def assign_user_to_location(user_id: int, location_id: int, access_level: AccessLevel = AccessLevel.READ_WRITE, granted_by: int = 1, is_primary: bool = False) -> bool: """ Weist einen Benutzer einem Standort zu. Args: user_id: ID des Benutzers location_id: ID des Standorts access_level: Zugriffslevel granted_by: ID des gewährenden Benutzers is_primary: Ob dies der primäre Standort ist Returns: bool: True wenn erfolgreich """ return location_manager.grant_location_access( user_id=user_id, location_id=location_id, access_level=access_level, granted_by=granted_by, is_primary=is_primary ) def get_user_locations(user_id: int) -> List[Location]: """ Holt alle Standorte eines Benutzers (globale Funktion). Args: user_id: ID des Benutzers Returns: List[Location]: Liste der zugänglichen Standorte """ return location_manager.get_user_locations(user_id) def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float: """ Berechnet die Entfernung zwischen zwei Koordinaten (Haversine-Formel). Args: lat1, lon1: Koordinaten des ersten Punkts lat2, lon2: Koordinaten des zweiten Punkts Returns: float: Entfernung in Kilometern """ from math import radians, sin, cos, sqrt, atan2 R = 6371 # Erdradius in km lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2]) dlat = lat2 - lat1 dlon = lon2 - lon1 a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2 c = 2 * atan2(sqrt(a), sqrt(1-a)) return R * c def find_nearest_location(latitude: float, longitude: float, radius_km: float = 50) -> Optional[Location]: """ Findet den nächstgelegenen Standort. Args: latitude: Breitengrad longitude: Längengrad radius_km: Suchradius in Kilometern Returns: Optional[Location]: Nächstgelegener Standort oder None """ nearest_locations = location_manager.find_nearest_locations( latitude=latitude, longitude=longitude, radius_km=radius_km, limit=1 ) return nearest_locations[0][0] if nearest_locations else None def get_location_dashboard_data(user_id: int) -> Dict[str, Any]: """Holt Dashboard-Daten für Standorte eines Benutzers""" user_locations = location_manager.get_user_locations(user_id) primary_location = location_manager.get_user_primary_location(user_id) dashboard_data = { 'user_locations': [asdict(loc) for loc in user_locations], 'primary_location': asdict(primary_location) if primary_location else None, 'location_count': len(user_locations), 'hierarchy': location_manager.get_location_hierarchy() } # Füge Statistiken für jeden Standort hinzu for location in user_locations: location_stats = location_manager.get_location_statistics(location.id) dashboard_data[f'stats_{location.id}'] = location_stats return dashboard_data def create_location_from_address(name: str, address: str, city: str, country: str, location_type: LocationType = LocationType.BRANCH) -> int: """Erstellt Standort aus Adresse mit automatischer Geocodierung""" location = Location( name=name, code=name[:3].upper(), location_type=location_type, address=address, city=city, country=country ) return location_manager.create_location(location) # JavaScript für Multi-Location Frontend def get_multi_location_javascript() -> str: """JavaScript für Multi-Location Management""" return """ class MultiLocationManager { constructor() { this.currentLocation = null; this.userLocations = []; this.locationHierarchy = {}; this.init(); } init() { this.loadUserLocations(); this.setupEventListeners(); } setupEventListeners() { // Location switcher document.addEventListener('change', (e) => { if (e.target.matches('.location-selector')) { const locationId = parseInt(e.target.value); this.switchLocation(locationId); } }); // Location management buttons document.addEventListener('click', (e) => { if (e.target.matches('.manage-locations-btn')) { this.showLocationManager(); } if (e.target.matches('.location-hierarchy-btn')) { this.showLocationHierarchy(); } }); } async loadUserLocations() { try { const response = await fetch('/api/locations/user'); const data = await response.json(); if (data.success) { this.userLocations = data.locations; this.currentLocation = data.primary_location; this.locationHierarchy = data.hierarchy; this.updateLocationSelector(); this.updateLocationDisplay(); } } catch (error) { console.error('Fehler beim Laden der Standorte:', error); } } updateLocationSelector() { const selectors = document.querySelectorAll('.location-selector'); selectors.forEach(selector => { selector.innerHTML = this.userLocations.map(location => `` ).join(''); }); } updateLocationDisplay() { const displays = document.querySelectorAll('.current-location-display'); displays.forEach(display => { if (this.currentLocation) { display.innerHTML = `
Typ: ${location.type}
Adresse: ${location.address || 'Nicht angegeben'}
Stadt: ${location.city || 'Nicht angegeben'}