# Source: manage-your-printer/utils/multi_location_system.py
# Snapshot: 2025-06-04 10:03:22 +02:00 (899 lines, 32 KiB, Python)

"""
Multi-Standort-Unterstützungssystem für das MYP-System
======================================================
Dieses Modul stellt umfassende Multi-Location-Funktionalität bereit:
- Standort-Management und Hierarchien
- Standort-spezifische Konfigurationen
- Zentrale und dezentrale Verwaltung
- Standort-übergreifende Berichte
- Ressourcen-Sharing zwischen Standorten
- Benutzer-Standort-Zuweisungen
"""
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
import geocoder
import requests
from utils.logging_config import get_logger
from models import User, Printer, Job, get_db_session
logger = get_logger("multi_location")
class LocationType(Enum):
    """Kinds of locations that can appear in the location hierarchy."""

    # Company headquarters
    HEADQUARTERS = "headquarters"
    # Branch office
    BRANCH = "branch"
    # Department
    DEPARTMENT = "department"
    # Floor of a building
    FLOOR = "floor"
    # Single room
    ROOM = "room"
    # Generic area
    AREA = "area"
class AccessLevel(Enum):
    """Access levels a user can hold on a location."""

    # Unrestricted access
    FULL = "full"
    # Read and write
    READ_WRITE = "read_write"
    # Read only
    READ_ONLY = "read_only"
    # No access at all
    NO_ACCESS = "no_access"
@dataclass
class LocationConfig:
    """Per-location configuration.

    The dictionary-valued fields default to ``None`` (meaning "not
    configured"), so they are annotated as ``Optional``. The defaults are
    kept as ``None`` rather than empty dicts so existing ``is None`` checks
    and ``asdict`` output stay the same.
    """

    timezone: str = "Europe/Berlin"
    business_hours: Optional[Dict[str, str]] = None
    maintenance_window: Optional[Dict[str, str]] = None
    auto_approval_enabled: bool = False
    max_job_duration: int = 480  # minutes
    contact_info: Optional[Dict[str, str]] = None
    notification_settings: Optional[Dict[str, Any]] = None
@dataclass
class Location:
    """Definition of a single location.

    ``config`` and ``created_at`` may be passed as ``None``; in that case
    ``__post_init__`` fills them with a default ``LocationConfig`` and the
    current local time.
    """

    id: Optional[int] = None
    name: str = ""
    code: str = ""  # short code identifying the location
    location_type: LocationType = LocationType.BRANCH
    parent_id: Optional[int] = None
    address: str = ""
    city: str = ""
    country: str = ""
    postal_code: str = ""
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    description: str = ""
    config: Optional[LocationConfig] = None
    is_active: bool = True
    created_at: Optional[datetime] = None
    manager_id: Optional[int] = None

    def __post_init__(self):
        """Fill in the defaults that cannot be expressed as simple field defaults."""
        # A fresh LocationConfig per instance avoids sharing one mutable object.
        if self.config is None:
            self.config = LocationConfig()
        if self.created_at is None:
            self.created_at = datetime.now()
@dataclass
class UserLocationAccess:
    """One access grant tying a user to a location."""

    # Who holds the grant and on which location
    user_id: int
    location_id: int
    # Level of access granted
    access_level: AccessLevel
    # Id of the user who issued the grant, and when
    granted_by: int
    granted_at: datetime
    # None means no expiry is set
    expires_at: Optional[datetime] = None
    # Marks the user's primary location
    is_primary: bool = False
class MultiLocationManager:
    """In-memory manager for multi-location functionality.

    Locations are kept in ``self.locations`` (keyed by location id) and
    per-user access grants in ``self.user_access`` (keyed by user id).
    A default headquarters location with id 1 is created on construction.
    """

    def __init__(self):
        self.locations: Dict[int, Location] = {}
        self.user_access: Dict[int, List[UserLocationAccess]] = {}
        self.next_location_id = 1
        # Seed the registry with the default location.
        self._create_default_location()

    def _create_default_location(self):
        """Create the default location and reserve id 1 for it."""
        default_location = Location(
            id=1,
            name="Hauptstandort",
            code="HQ",
            location_type=LocationType.HEADQUARTERS,
            address="Mercedes-Benz Platz",
            city="Stuttgart",
            country="Deutschland",
            description="Hauptstandort des MYP-Systems"
        )
        self.locations[1] = default_location
        self.next_location_id = 2
        logger.info("Standard-Standort erstellt")

    def create_location(self, location: Location) -> int:
        """Register a new location and return its assigned id.

        Any id already set on ``location`` is overwritten with the next free
        id. Coordinates are looked up only when one of them is missing.
        """
        location.id = self.next_location_id
        self.next_location_id += 1
        # Compare against None: 0.0 is a valid latitude/longitude and must
        # not trigger a geocoding lookup that overwrites it.
        if location.latitude is None or location.longitude is None:
            self._geocode_location(location)
        self.locations[location.id] = location
        logger.info(f"Standort erstellt: {location.name} ({location.code})")
        return location.id

    def update_location(self, location_id: int, updates: Dict[str, Any]) -> bool:
        """Apply ``updates`` to an existing location.

        Only dataclass fields of the location are updated; unknown keys are
        ignored. The ``id`` field is never changed because it doubles as the
        key in ``self.locations``.

        Returns:
            bool: False if the location does not exist, True otherwise.
        """
        from dataclasses import fields

        if location_id not in self.locations:
            return False
        location = self.locations[location_id]
        updatable = {f.name for f in fields(location)} - {'id'}
        for key, value in updates.items():
            if key in updatable:
                setattr(location, key, value)
        # Re-resolve coordinates when any address part used for geocoding changed.
        if any(key in updates for key in ('address', 'city', 'country')):
            self._geocode_location(location)
        logger.info(f"Standort aktualisiert: {location.name}")
        return True

    def delete_location(self, location_id: int) -> bool:
        """Soft-delete a location by marking it inactive.

        Deletion is refused when the location is unknown, still has active
        child locations, or has active resources.
        """
        if location_id not in self.locations:
            return False
        location = self.locations[location_id]
        # Refuse while active child locations exist.
        if self.get_child_locations(location_id):
            logger.warning(f"Standort {location.name} kann nicht gelöscht werden: hat Unterstandorte")
            return False
        # Refuse while active resources are attached.
        if self._has_active_resources(location_id):
            logger.warning(f"Standort {location.name} kann nicht gelöscht werden: hat aktive Ressourcen")
            return False
        location.is_active = False
        logger.info(f"Standort deaktiviert: {location.name}")
        return True

    def get_location_hierarchy(self, location_id: Optional[int] = None) -> Dict[str, Any]:
        """Return the location hierarchy.

        With a ``location_id`` the subtree rooted at that location is
        returned (``{}`` if it is unknown). Without one, a dict with key
        ``'locations'`` listing all active root locations is returned.
        """
        if location_id:
            location = self.locations.get(location_id)
            if not location:
                return {}
            return self._build_hierarchy_node(location)
        root_locations = [loc for loc in self.locations.values()
                          if loc.parent_id is None and loc.is_active]
        return {
            'locations': [self._build_hierarchy_node(loc) for loc in root_locations]
        }

    def _build_hierarchy_node(self, location: Location,
                              _ancestors: Optional[frozenset] = None) -> Dict[str, Any]:
        """Build the hierarchy node for ``location`` including its subtree.

        ``_ancestors`` holds the ids on the path from the subtree root to this
        node. Children already on that path are skipped, so a ``parent_id``
        cycle cannot cause unbounded recursion.
        """
        ancestors = (_ancestors or frozenset()) | {location.id}
        children = [child for child in self.get_child_locations(location.id)
                    if child.id not in ancestors]
        return {
            'id': location.id,
            'name': location.name,
            'code': location.code,
            'type': location.location_type.value,
            'children': [self._build_hierarchy_node(child, ancestors) for child in children],
            'resource_count': self._count_location_resources(location.id)
        }

    def get_child_locations(self, parent_id: int) -> List[Location]:
        """Return all active locations whose ``parent_id`` equals ``parent_id``."""
        return [loc for loc in self.locations.values()
                if loc.parent_id == parent_id and loc.is_active]

    def get_location_path(self, location_id: int) -> List[Location]:
        """Return the chain of locations from the root down to ``location_id``.

        The walk stops at an unknown id, at a location without parent, or
        when an id repeats (a ``parent_id`` cycle), so it always terminates.
        """
        path: List[Location] = []
        seen = set()
        current_id = location_id
        while current_id and current_id not in seen:
            location = self.locations.get(current_id)
            if not location:
                break
            seen.add(current_id)
            path.append(location)
            current_id = location.parent_id
        # Collected leaf-to-root; callers expect root-to-leaf.
        path.reverse()
        return path

    def grant_location_access(self, user_id: int, location_id: int,
                              access_level: AccessLevel, granted_by: int,
                              expires_at: Optional[datetime] = None,
                              is_primary: bool = False) -> bool:
        """Grant a user access to a location.

        An existing grant for the same location is replaced. When
        ``is_primary`` is set, the primary flag is cleared on the user's
        other grants.

        Returns:
            bool: False if the location does not exist, True otherwise.
        """
        if location_id not in self.locations:
            return False
        access = UserLocationAccess(
            user_id=user_id,
            location_id=location_id,
            access_level=access_level,
            granted_by=granted_by,
            granted_at=datetime.now(),
            expires_at=expires_at,
            is_primary=is_primary
        )
        # Drop any previous grant for this location.
        remaining = [acc for acc in self.user_access.get(user_id, [])
                     if acc.location_id != location_id]
        # Only one grant per user may be primary.
        if is_primary:
            for access_item in remaining:
                access_item.is_primary = False
        remaining.append(access)
        self.user_access[user_id] = remaining
        logger.info(f"Standort-Zugriff gewährt: User {user_id} → Location {location_id} ({access_level.value})")
        return True

    def revoke_location_access(self, user_id: int, location_id: int) -> bool:
        """Remove a user's grant for a location; return True if one was removed."""
        if user_id not in self.user_access:
            return False
        original_count = len(self.user_access[user_id])
        self.user_access[user_id] = [
            acc for acc in self.user_access[user_id]
            if acc.location_id != location_id
        ]
        success = len(self.user_access[user_id]) < original_count
        if success:
            logger.info(f"Standort-Zugriff entzogen: User {user_id} → Location {location_id}")
        return success

    def get_user_locations(self, user_id: int, access_level: Optional[AccessLevel] = None) -> List[Location]:
        """Return the active locations a user currently has access to.

        Expired grants are skipped. If ``access_level`` is given, only grants
        with exactly that level are considered.
        """
        if user_id not in self.user_access:
            return []
        accessible_locations = []
        now = datetime.now()
        for access in self.user_access[user_id]:
            # Skip expired grants.
            if access.expires_at and access.expires_at < now:
                continue
            # Skip grants with a different level when a filter is given.
            if access_level and access.access_level != access_level:
                continue
            location = self.locations.get(access.location_id)
            if location and location.is_active:
                accessible_locations.append(location)
        return accessible_locations

    def get_user_primary_location(self, user_id: int) -> Optional[Location]:
        """Return the user's primary location, or a fallback.

        A primary grant only counts while it has not expired and its location
        exists and is active, the same rules ``get_user_locations`` applies.
        Otherwise the first accessible location is returned, or None.
        """
        if user_id not in self.user_access:
            return None
        now = datetime.now()
        for access in self.user_access[user_id]:
            if not access.is_primary:
                continue
            if access.expires_at and access.expires_at < now:
                continue
            location = self.locations.get(access.location_id)
            if location and location.is_active:
                return location
        # Fallback: first accessible location.
        user_locations = self.get_user_locations(user_id)
        return user_locations[0] if user_locations else None

    def check_user_access(self, user_id: int, location_id: int,
                          required_level: AccessLevel = AccessLevel.READ_ONLY) -> bool:
        """Return True if the user holds an unexpired grant of at least ``required_level``."""
        if user_id not in self.user_access:
            return False
        # Rank the levels so "at least" can be compared numerically.
        access_levels = {
            AccessLevel.NO_ACCESS: 0,
            AccessLevel.READ_ONLY: 1,
            AccessLevel.READ_WRITE: 2,
            AccessLevel.FULL: 3
        }
        required_level_value = access_levels[required_level]
        now = datetime.now()
        for access in self.user_access[user_id]:
            if access.location_id != location_id:
                continue
            # Skip expired grants.
            if access.expires_at and access.expires_at < now:
                continue
            if access_levels[access.access_level] >= required_level_value:
                return True
        return False

    def get_location_resources(self, location_id: int) -> Dict[str, Any]:
        """Return the resources of a location (currently placeholder data).

        Returns ``{}`` for an unknown location. The database session is
        opened but not queried yet; the real query is still commented out.
        """
        if location_id not in self.locations:
            return {}
        resources = {
            'printers': [],
            'active_jobs': [],
            'users': [],
            'pending_maintenance': 0
        }
        # A real implementation would query the database here.
        with get_db_session() as db_session:
            # Printers of the location (simplified - requires location_id on the Printer model)
            # printers = db_session.query(Printer).filter(Printer.location_id == location_id).all()
            # resources['printers'] = [p.to_dict() for p in printers]
            pass
        return resources

    def get_location_statistics(self, location_id: int,
                                start_date: Optional[datetime] = None,
                                end_date: Optional[datetime] = None) -> Dict[str, Any]:
        """Return the statistics structure for a location (currently all zeros).

        The period defaults to the last 30 days ending now. Unknown locations
        are reported with the name ``'Unbekannt'``.
        """
        if start_date is None:
            start_date = datetime.now() - timedelta(days=30)
        if end_date is None:
            end_date = datetime.now()
        location = self.locations.get(location_id)
        stats = {
            'location': location.name if location is not None else 'Unbekannt',
            'period': {
                'start': start_date.isoformat(),
                'end': end_date.isoformat()
            },
            'totals': {
                'printers': 0,
                'jobs_completed': 0,
                'jobs_failed': 0,
                'print_time_hours': 0,
                'material_used_kg': 0,
                'users_active': 0
            },
            'averages': {
                'jobs_per_day': 0,
                'job_duration_minutes': 0,
                'printer_utilization': 0
            },
            'trends': {
                'daily_jobs': [],
                'printer_usage': []
            }
        }
        # A real implementation would run database queries here.
        return stats

    def get_multi_location_report(self, location_ids: Optional[List[int]] = None) -> Dict[str, Any]:
        """Build a report across several locations.

        With no (or empty) ``location_ids`` all known locations are included.
        Unknown ids are skipped, and ``summary['total_locations']`` counts
        only the locations actually contained in the report.
        """
        if not location_ids:
            location_ids = list(self.locations)
        report = {
            'generated_at': datetime.now().isoformat(),
            'locations': [],
            'summary': {
                'total_locations': 0,
                'total_printers': 0,
                'total_users': 0,
                'total_jobs': 0,
                'cross_location_sharing': []
            }
        }
        summary = report['summary']
        for location_id in location_ids:
            location = self.locations.get(location_id)
            if not location:
                continue
            location_stats = self.get_location_statistics(location_id)
            report['locations'].append({
                'id': location.id,
                'name': location.name,
                'code': location.code,
                'type': location.location_type.value,
                'statistics': location_stats
            })
            # Accumulate the overall totals.
            totals = location_stats.get('totals', {})
            summary['total_printers'] += totals.get('printers', 0)
            summary['total_users'] += totals.get('users_active', 0)
            summary['total_jobs'] += totals.get('jobs_completed', 0)
        summary['total_locations'] = len(report['locations'])
        return report

    @staticmethod
    def _haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
        """Return the Haversine distance in kilometres between two coordinates in degrees."""
        from math import radians, sin, cos, sqrt, atan2

        earth_radius_km = 6371
        lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
        dlat = lat2 - lat1
        dlon = lon2 - lon1
        a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
        # Rounding can push a marginally outside [0, 1] for (near-)antipodal
        # points, which would make sqrt(1 - a) raise a domain error.
        a = min(1.0, max(0.0, a))
        return earth_radius_km * 2 * atan2(sqrt(a), sqrt(1 - a))

    def find_nearest_locations(self, latitude: float, longitude: float,
                               radius_km: float = 50, limit: int = 5) -> List[Tuple[Location, float]]:
        """Return up to ``limit`` active locations within ``radius_km``, nearest first.

        Each entry is a ``(location, distance_km)`` tuple. Locations without
        coordinates are skipped; a coordinate of 0.0 counts as present.
        """
        nearby_locations = []
        for location in self.locations.values():
            if not location.is_active:
                continue
            if location.latitude is None or location.longitude is None:
                continue
            distance = self._haversine_km(
                latitude, longitude,
                location.latitude, location.longitude
            )
            if distance <= radius_km:
                nearby_locations.append((location, distance))
        # Nearest first.
        nearby_locations.sort(key=lambda entry: entry[1])
        return nearby_locations[:limit]

    def _geocode_location(self, location: Location):
        """Try to resolve coordinates for a location from its address parts.

        Best effort: any failure is logged and the coordinates are left
        untouched, so creating or updating a location never fails because
        of the geocoding lookup.
        """
        try:
            address_parts = [location.address, location.city, location.country]
            full_address = ', '.join(filter(None, address_parts))
            if not full_address:
                return
            # Look the address up via the geocoder library.
            result = geocoder.osm(full_address)
            if result.ok:
                location.latitude = result.lat
                location.longitude = result.lng
                logger.info(f"Koordinaten ermittelt für {location.name}: {location.latitude}, {location.longitude}")
            else:
                logger.warning(f"Koordinaten konnten nicht ermittelt werden für {location.name}")
        except Exception as e:
            # Deliberately broad: a geocoding failure must not abort the caller.
            logger.error(f"Fehler bei Geocoding für {location.name}: {str(e)}")

    def _has_active_resources(self, location_id: int) -> bool:
        """Report whether the location has active resources (placeholder: always False)."""
        # Simplified implementation; a real one would check the database.
        return False

    def _count_location_resources(self, location_id: int) -> Dict[str, int]:
        """Count the resources of a location (placeholder: all zero)."""
        # Simplified implementation.
        return {
            'printers': 0,
            'users': 0,
            'jobs': 0
        }
# Global instance, created at import time
location_manager = MultiLocationManager()
# Alias for import compatibility
LocationManager = MultiLocationManager
def create_location(name: str, code: str, location_type: LocationType = LocationType.BRANCH,
                    address: str = "", city: str = "", country: str = "",
                    parent_id: Optional[int] = None) -> int:
    """
    Create a new location through the global manager.

    Args:
        name: Name of the location
        code: Short code for the location
        location_type: Kind of location
        address: Street address
        city: City
        country: Country
        parent_id: Id of the parent location

    Returns:
        int: Id of the created location
    """
    location_fields = {
        'name': name,
        'code': code,
        'location_type': location_type,
        'address': address,
        'city': city,
        'country': country,
        'parent_id': parent_id,
    }
    new_location = Location(**location_fields)
    return location_manager.create_location(new_location)
def assign_user_to_location(user_id: int, location_id: int,
                            access_level: AccessLevel = AccessLevel.READ_WRITE,
                            granted_by: int = 1, is_primary: bool = False) -> bool:
    """
    Assign a user to a location through the global manager.

    Args:
        user_id: Id of the user
        location_id: Id of the location
        access_level: Access level to grant
        granted_by: Id of the granting user
        is_primary: Whether this is the user's primary location

    Returns:
        bool: True on success
    """
    grant = location_manager.grant_location_access
    # No expiry is passed, so the manager's default for expires_at applies.
    return grant(user_id, location_id, access_level, granted_by, is_primary=is_primary)
def get_user_locations(user_id: int) -> List[Location]:
    """
    Return all locations of a user via the global manager.

    Args:
        user_id: Id of the user

    Returns:
        List[Location]: The locations the user can access
    """
    accessible = location_manager.get_user_locations(user_id)
    return accessible
def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """
    Compute the distance between two coordinates with the Haversine formula.

    Args:
        lat1, lon1: Coordinates of the first point, in degrees
        lat2, lon2: Coordinates of the second point, in degrees

    Returns:
        float: Distance in kilometres
    """
    from math import radians, sin, cos, sqrt, atan2

    R = 6371  # Earth radius in km
    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    # Rounding can push a marginally outside [0, 1] for (near-)antipodal
    # points, which would make sqrt(1 - a) raise a domain error.
    a = min(1.0, max(0.0, a))
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    return R * c
def find_nearest_location(latitude: float, longitude: float,
                          radius_km: float = 50) -> Optional[Location]:
    """
    Find the single nearest location through the global manager.

    Args:
        latitude: Latitude
        longitude: Longitude
        radius_km: Search radius in kilometres

    Returns:
        Optional[Location]: Nearest location, or None if none is in range
    """
    # Ask for one hit only; the manager returns (location, distance) tuples.
    candidates = location_manager.find_nearest_locations(latitude, longitude, radius_km, 1)
    if not candidates:
        return None
    closest, _distance = candidates[0]
    return closest
def get_location_dashboard_data(user_id: int) -> Dict[str, Any]:
    """Collect the dashboard data for a user's locations.

    The result holds the user's locations and primary location as plain
    dicts, the location count, the full hierarchy, and one
    ``stats_<location id>`` entry per accessible location.
    """
    locations = location_manager.get_user_locations(user_id)
    primary = location_manager.get_user_primary_location(user_id)
    primary_as_dict = asdict(primary) if primary else None

    data = {
        'user_locations': [asdict(entry) for entry in locations],
        'primary_location': primary_as_dict,
        'location_count': len(locations),
        'hierarchy': location_manager.get_location_hierarchy()
    }
    # Add the statistics of every accessible location.
    data.update({
        f'stats_{entry.id}': location_manager.get_location_statistics(entry.id)
        for entry in locations
    })
    return data
def create_location_from_address(name: str, address: str, city: str,
                                 country: str, location_type: LocationType = LocationType.BRANCH) -> int:
    """Create a location from an address; the manager geocodes it on creation.

    The location code is the upper-cased first three characters of ``name``.
    """
    short_code = name[:3].upper()
    new_location = Location(
        name=name,
        code=short_code,
        location_type=location_type,
        address=address,
        city=city,
        country=country,
    )
    return location_manager.create_location(new_location)
# JavaScript für Multi-Location Frontend
def get_multi_location_javascript() -> str:
    """Return the client-side JavaScript for multi-location management.

    The script defines a browser-side ``MultiLocationManager`` class and
    instantiates it on ``DOMContentLoaded``. It talks to
    ``/api/locations/user`` and ``/api/locations/switch``.

    All location strings received from the API are passed through
    ``escapeHtml`` before being placed into ``innerHTML``, so a location name
    or address containing markup cannot inject HTML or script.
    """
    return """
class MultiLocationManager {
    constructor() {
        this.currentLocation = null;
        this.userLocations = [];
        this.locationHierarchy = {};
        this.init();
    }

    init() {
        this.loadUserLocations();
        this.setupEventListeners();
    }

    // Escape a value for safe use inside HTML text and attribute values
    escapeHtml(value) {
        const entities = {
            '&': '&amp;',
            '<': '&lt;',
            '>': '&gt;',
            '"': '&quot;',
            "'": '&#39;'
        };
        return String(value ?? '').replace(/[&<>"']/g, (ch) => entities[ch]);
    }

    setupEventListeners() {
        // Location switcher
        document.addEventListener('change', (e) => {
            if (e.target.matches('.location-selector')) {
                const locationId = parseInt(e.target.value, 10);
                this.switchLocation(locationId);
            }
        });
        // Location management buttons
        document.addEventListener('click', (e) => {
            if (e.target.matches('.manage-locations-btn')) {
                this.showLocationManager();
            }
            if (e.target.matches('.location-hierarchy-btn')) {
                this.showLocationHierarchy();
            }
        });
    }

    async loadUserLocations() {
        try {
            const response = await fetch('/api/locations/user');
            const data = await response.json();
            if (data.success) {
                this.userLocations = data.locations;
                this.currentLocation = data.primary_location;
                this.locationHierarchy = data.hierarchy;
                this.updateLocationSelector();
                this.updateLocationDisplay();
            }
        } catch (error) {
            console.error('Fehler beim Laden der Standorte:', error);
        }
    }

    updateLocationSelector() {
        const selectors = document.querySelectorAll('.location-selector');
        selectors.forEach(selector => {
            selector.innerHTML = this.userLocations.map(location =>
                `<option value="${this.escapeHtml(location.id)}" ${location.id === this.currentLocation?.id ? 'selected' : ''}>
                    ${this.escapeHtml(location.name)} (${this.escapeHtml(location.code)})
                </option>`
            ).join('');
        });
    }

    updateLocationDisplay() {
        const displays = document.querySelectorAll('.current-location-display');
        displays.forEach(display => {
            if (this.currentLocation) {
                display.innerHTML = `
                    <div class="location-info">
                        <strong>${this.escapeHtml(this.currentLocation.name)}</strong>
                        <span class="location-type">${this.escapeHtml(this.currentLocation.type)}</span>
                        ${this.currentLocation.city ? `<span class="location-city">${this.escapeHtml(this.currentLocation.city)}</span>` : ''}
                    </div>
                `;
            } else {
                display.innerHTML = '<span class="no-location">Kein Standort ausgewählt</span>';
            }
        });
    }

    async switchLocation(locationId) {
        try {
            const response = await fetch('/api/locations/switch', {
                method: 'POST',
                headers: { 'Content-Type': 'application/json' },
                body: JSON.stringify({ location_id: locationId })
            });
            const result = await response.json();
            if (result.success) {
                this.currentLocation = this.userLocations.find(loc => loc.id === locationId);
                this.updateLocationDisplay();
                // Reload the page to refresh location-specific data
                window.location.reload();
            } else {
                this.showNotification('Fehler beim Wechseln des Standorts', 'error');
            }
        } catch (error) {
            console.error('Standort-Wechsel fehlgeschlagen:', error);
        }
    }

    showLocationManager() {
        const modal = document.createElement('div');
        modal.className = 'location-manager-modal';
        modal.innerHTML = `
            <div class="modal-content">
                <div class="modal-header">
                    <h2>Standort-Verwaltung</h2>
                    <button class="close-modal">&times;</button>
                </div>
                <div class="modal-body">
                    <div class="location-list">
                        ${this.renderLocationList()}
                    </div>
                    <div class="location-actions">
                        <button class="btn-create-location">Neuen Standort erstellen</button>
                    </div>
                </div>
            </div>
        `;
        document.body.appendChild(modal);
        // Event handlers
        modal.querySelector('.close-modal').onclick = () => modal.remove();
        modal.onclick = (e) => {
            if (e.target === modal) modal.remove();
        };
    }

    renderLocationList() {
        return this.userLocations.map(location => `
            <div class="location-item">
                <div class="location-details">
                    <h4>${this.escapeHtml(location.name)} (${this.escapeHtml(location.code)})</h4>
                    <p><strong>Typ:</strong> ${this.escapeHtml(location.type)}</p>
                    <p><strong>Adresse:</strong> ${this.escapeHtml(location.address || 'Nicht angegeben')}</p>
                    <p><strong>Stadt:</strong> ${this.escapeHtml(location.city || 'Nicht angegeben')}</p>
                </div>
                <div class="location-actions">
                    <button class="btn-edit-location" data-location-id="${this.escapeHtml(location.id)}">Bearbeiten</button>
                    <button class="btn-view-stats" data-location-id="${this.escapeHtml(location.id)}">Statistiken</button>
                </div>
            </div>
        `).join('');
    }

    showLocationHierarchy() {
        const modal = document.createElement('div');
        modal.className = 'hierarchy-modal';
        modal.innerHTML = `
            <div class="modal-content">
                <div class="modal-header">
                    <h2>Standort-Hierarchie</h2>
                    <button class="close-modal">&times;</button>
                </div>
                <div class="modal-body">
                    <div class="hierarchy-tree">
                        ${this.renderHierarchyTree(this.locationHierarchy.locations || [])}
                    </div>
                </div>
            </div>
        `;
        document.body.appendChild(modal);
        modal.querySelector('.close-modal').onclick = () => modal.remove();
        modal.onclick = (e) => {
            if (e.target === modal) modal.remove();
        };
    }

    renderHierarchyTree(locations, level = 0) {
        return locations.map(location => `
            <div class="hierarchy-node" style="margin-left: ${level * 20}px;">
                <div class="node-content">
                    <span class="node-icon">${this.getLocationTypeIcon(location.type)}</span>
                    <span class="node-name">${this.escapeHtml(location.name)}</span>
                    <span class="node-code">(${this.escapeHtml(location.code)})</span>
                    <span class="resource-count">${this.escapeHtml(location.resource_count?.printers || 0)} Drucker</span>
                </div>
                ${location.children && location.children.length > 0 ?
                    this.renderHierarchyTree(location.children, level + 1) : ''}
            </div>
        `).join('');
    }

    getLocationTypeIcon(type) {
        const icons = {
            'headquarters': '🏢',
            'branch': '🏪',
            'department': '🏬',
            'floor': '🏢',
            'room': '🚪',
            'area': '📍'
        };
        return icons[type] || '📍';
    }

    showNotification(message, type = 'info') {
        const notification = document.createElement('div');
        notification.className = `notification notification-${type}`;
        notification.textContent = message;
        document.body.appendChild(notification);
        setTimeout(() => {
            notification.remove();
        }, 3000);
    }
}

// Initialize when DOM is ready
document.addEventListener('DOMContentLoaded', function() {
    window.multiLocationManager = new MultiLocationManager();
});
"""