🎉 Refactor & Update Backend Files, Documentation 📚

This commit is contained in:
2025-06-01 00:05:09 +02:00
parent 193164964e
commit b2bdc2d123
12 changed files with 4178 additions and 1868 deletions

View File

@@ -21,7 +21,7 @@ from flask import request, jsonify, current_app
from flask_login import current_user
from utils.logging_config import get_logger
from models import Job, Printer, get_db_session
from models import Job, Printer, JobOrder, get_db_session
from utils.file_manager import save_job_file, save_temp_file
from config.settings import ALLOWED_EXTENSIONS, MAX_FILE_SIZE, UPLOAD_FOLDER
@@ -119,34 +119,282 @@ class DragDropManager:
def update_job_order(self, printer_id: int, job_ids: List[int]) -> bool:
"""Aktualisiert die Job-Reihenfolge für einen Drucker"""
try:
with get_db_session() as db_session:
# Validiere dass alle Jobs existieren und zum Drucker gehören
jobs = db_session.query(Job).filter(
Job.id.in_(job_ids),
Job.printer_id == printer_id,
Job.status.in_(['scheduled', 'paused'])
).all()
if len(jobs) != len(job_ids):
logger.warning(f"Nicht alle Jobs gefunden oder gehören zu Drucker {printer_id}")
return False
# Aktuelle Benutzer-ID für Audit-Trail
user_id = current_user.id if current_user.is_authenticated else None
# Validierung der Eingaben
if not isinstance(printer_id, int) or printer_id <= 0:
logger.error(f"Ungültige Drucker-ID: {printer_id}")
return False
if not isinstance(job_ids, list) or not job_ids:
logger.error(f"Ungültige Job-IDs Liste: {job_ids}")
return False
# Duplikate entfernen und Reihenfolge beibehalten
unique_job_ids = []
seen = set()
for job_id in job_ids:
if job_id not in seen:
unique_job_ids.append(job_id)
seen.add(job_id)
if len(unique_job_ids) != len(job_ids):
logger.warning(f"Duplikate in Job-IDs entfernt: {job_ids} -> {unique_job_ids}")
job_ids = unique_job_ids
# Datenbank-Implementierung mit JobOrder-Tabelle
success = JobOrder.update_printer_order(
printer_id=printer_id,
job_ids=job_ids,
modified_by_user_id=user_id
)
if success:
# Cache aktualisieren
self.job_order_cache[printer_id] = job_ids
# Optional: In Datenbank speichern (erweiterte Implementierung)
# Hier könnte man ein separates Job-Order-Table verwenden
logger.info(f"Job-Reihenfolge für Drucker {printer_id} erfolgreich aktualisiert: {job_ids}")
logger.info(f"Aktualisiert von Benutzer: {user_id}")
# Optional: Bereinigung ungültiger Einträge im Hintergrund
self._schedule_cleanup()
logger.info(f"Job-Reihenfolge für Drucker {printer_id} aktualisiert: {job_ids}")
return True
else:
logger.error(f"Fehler beim Speichern der Job-Reihenfolge in der Datenbank")
return False
except Exception as e:
logger.error(f"Fehler beim Aktualisieren der Job-Reihenfolge: {str(e)}")
logger.error(f"Unerwarteter Fehler beim Aktualisieren der Job-Reihenfolge: {str(e)}")
return False
def get_job_order(self, printer_id: int) -> List[int]:
"""Holt die aktuelle Job-Reihenfolge für einen Drucker"""
return self.job_order_cache.get(printer_id, [])
try:
# Erst aus Cache versuchen
if printer_id in self.job_order_cache:
cached_order = self.job_order_cache[printer_id]
logger.debug(f"Job-Reihenfolge aus Cache für Drucker {printer_id}: {cached_order}")
return cached_order
# Aus Datenbank laden
job_ids = JobOrder.get_ordered_job_ids(printer_id)
# Cache aktualisieren
self.job_order_cache[printer_id] = job_ids
logger.debug(f"Job-Reihenfolge aus Datenbank geladen für Drucker {printer_id}: {job_ids}")
return job_ids
except Exception as e:
logger.error(f"Fehler beim Laden der Job-Reihenfolge für Drucker {printer_id}: {str(e)}")
return []
def get_ordered_jobs_for_printer(self, printer_id: int) -> List[Job]:
"""
Holt die Jobs für einen Drucker in der korrekten Reihenfolge.
Args:
printer_id: ID des Druckers
Returns:
List[Job]: Jobs sortiert nach der benutzerdefinierten Reihenfolge
"""
try:
# Job-IDs in der korrekten Reihenfolge holen
ordered_job_ids = self.get_job_order(printer_id)
if not ordered_job_ids:
# Fallback: Jobs nach Standard-Kriterien sortieren
with get_db_session() as db_session:
jobs = db_session.query(Job).filter(
Job.printer_id == printer_id,
Job.status.in_(['scheduled', 'paused'])
).order_by(Job.created_at).all()
return jobs
# Jobs in der definierten Reihenfolge laden
with get_db_session() as db_session:
# Alle relevanten Jobs laden
all_jobs = db_session.query(Job).filter(
Job.printer_id == printer_id,
Job.status.in_(['scheduled', 'paused'])
).all()
# Dictionary für schnelle Zugriffe
jobs_dict = {job.id: job for job in all_jobs}
# Jobs in der korrekten Reihenfolge zusammenstellen
ordered_jobs = []
for job_id in ordered_job_ids:
if job_id in jobs_dict:
ordered_jobs.append(jobs_dict[job_id])
# Jobs hinzufügen, die nicht in der Reihenfolge sind (neue Jobs)
ordered_job_ids_set = set(ordered_job_ids)
unordered_jobs = [job for job in all_jobs if job.id not in ordered_job_ids_set]
if unordered_jobs:
# Neue Jobs nach Erstellungsdatum sortieren und anhängen
unordered_jobs.sort(key=lambda x: x.created_at)
ordered_jobs.extend(unordered_jobs)
# Reihenfolge automatisch aktualisieren für neue Jobs
new_order = [job.id for job in ordered_jobs]
self.update_job_order(printer_id, new_order)
logger.debug(f"Jobs für Drucker {printer_id} in Reihenfolge geladen: {len(ordered_jobs)} Jobs")
return ordered_jobs
except Exception as e:
logger.error(f"Fehler beim Laden der sortierten Jobs für Drucker {printer_id}: {str(e)}")
# Fallback: Unsortierte Jobs zurückgeben
try:
with get_db_session() as db_session:
jobs = db_session.query(Job).filter(
Job.printer_id == printer_id,
Job.status.in_(['scheduled', 'paused'])
).order_by(Job.created_at).all()
return jobs
except Exception as fallback_error:
logger.error(f"Auch Fallback fehlgeschlagen: {str(fallback_error)}")
return []
def remove_job_from_order(self, job_id: int) -> bool:
"""
Entfernt einen Job aus allen Drucker-Reihenfolgen.
Args:
job_id: ID des zu entfernenden Jobs
Returns:
bool: True wenn erfolgreich
"""
try:
# Aus Datenbank entfernen
JobOrder.remove_job_from_orders(job_id)
# Cache aktualisieren: Job aus allen Caches entfernen
for printer_id in list(self.job_order_cache.keys()):
if job_id in self.job_order_cache[printer_id]:
self.job_order_cache[printer_id].remove(job_id)
logger.debug(f"Job {job_id} aus Cache für Drucker {printer_id} entfernt")
logger.info(f"Job {job_id} erfolgreich aus allen Reihenfolgen entfernt")
return True
except Exception as e:
logger.error(f"Fehler beim Entfernen des Jobs {job_id} aus Reihenfolgen: {str(e)}")
return False
def cleanup_invalid_orders(self):
"""Bereinigt ungültige Job-Reihenfolgen"""
try:
# Datenbank-Bereinigung
JobOrder.cleanup_invalid_orders()
# Cache komplett leeren (wird bei Bedarf neu geladen)
self.job_order_cache.clear()
logger.info("Job-Reihenfolgen bereinigt")
except Exception as e:
logger.error(f"Fehler bei der Bereinigung der Job-Reihenfolgen: {str(e)}")
def _schedule_cleanup(self):
"""Plant eine Bereinigung für später (non-blocking)"""
try:
# In produktiver Umgebung könnte hier ein Background-Task gestartet werden
# Für jetzt führen wir eine schnelle Bereinigung durch
import threading
def cleanup_worker():
try:
self.cleanup_invalid_orders()
except Exception as e:
logger.error(f"Hintergrund-Bereinigung fehlgeschlagen: {str(e)}")
cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
cleanup_thread.start()
except Exception as e:
logger.debug(f"Konnte Hintergrund-Bereinigung nicht starten: {str(e)}")
def get_printer_summary(self, printer_id: int) -> Dict[str, Any]:
"""
Erstellt eine Zusammenfassung der Job-Reihenfolge für einen Drucker.
Args:
printer_id: ID des Druckers
Returns:
Dict: Zusammenfassung mit Jobs, Reihenfolge, Statistiken
"""
try:
ordered_jobs = self.get_ordered_jobs_for_printer(printer_id)
# Statistiken berechnen
total_duration = sum(job.duration_minutes for job in ordered_jobs)
total_jobs = len(ordered_jobs)
# Nächster Job
next_job = ordered_jobs[0] if ordered_jobs else None
# Job-Details für die Ausgabe
job_details = []
for position, job in enumerate(ordered_jobs):
job_details.append({
'position': position,
'job_id': job.id,
'name': job.name,
'duration_minutes': job.duration_minutes,
'user_name': job.user.name if job.user else 'Unbekannt',
'created_at': job.created_at.isoformat() if job.created_at else None,
'status': job.status
})
return {
'printer_id': printer_id,
'total_jobs': total_jobs,
'total_duration_minutes': total_duration,
'estimated_completion': self._calculate_completion_time(ordered_jobs),
'next_job': {
'id': next_job.id,
'name': next_job.name,
'user': next_job.user.name if next_job and next_job.user else None
} if next_job else None,
'jobs': job_details,
'last_updated': datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Fehler beim Erstellen der Drucker-Zusammenfassung für {printer_id}: {str(e)}")
return {
'printer_id': printer_id,
'total_jobs': 0,
'total_duration_minutes': 0,
'error': str(e)
}
def _calculate_completion_time(self, jobs: List[Job]) -> Optional[str]:
"""Berechnet die geschätzte Fertigstellungszeit"""
try:
if not jobs:
return None
total_minutes = sum(job.duration_minutes for job in jobs)
completion_time = datetime.now()
completion_time = completion_time.replace(
minute=(completion_time.minute + total_minutes) % 60,
hour=(completion_time.hour + (completion_time.minute + total_minutes) // 60) % 24
)
return completion_time.isoformat()
except Exception:
return None
# Globale Instanz
drag_drop_manager = DragDropManager()

View File

@@ -0,0 +1,175 @@
"""
Offline-kompatible E-Mail-Benachrichtigung für MYP-System
========================================================
Da das System im Produktionsbetrieb offline läuft, werden alle E-Mail-Benachrichtigungen
nur geloggt aber nicht tatsächlich versendet.
"""
import logging
from datetime import datetime
from typing import Optional, Dict, Any
from utils.logging_config import get_logger
logger = get_logger("email_notification")
class OfflineEmailNotification:
"""
Offline-E-Mail-Benachrichtigung die nur Logs erstellt.
Simuliert E-Mail-Versand für Offline-Betrieb.
"""
def __init__(self):
self.enabled = False # Immer deaktiviert im Offline-Modus
logger.info("📧 Offline-E-Mail-Benachrichtigung initialisiert (kein echter E-Mail-Versand)")
def send_email(self, to: str, subject: str, body: str, **kwargs) -> bool:
"""
Simuliert E-Mail-Versand durch Logging.
Args:
to: E-Mail-Empfänger
subject: E-Mail-Betreff
body: E-Mail-Inhalt
**kwargs: Zusätzliche Parameter
Returns:
bool: Immer True (Simulation erfolgreich)
"""
logger.info(f"📧 [OFFLINE-SIMULATION] E-Mail würde versendet werden:")
logger.info(f" 📮 An: {to}")
logger.info(f" 📋 Betreff: {subject}")
logger.info(f" 📝 Inhalt: {body[:100]}{'...' if len(body) > 100 else ''}")
logger.info(f" 🕒 Zeitpunkt: {datetime.now().strftime('%d.%m.%Y %H:%M:%S')}")
if kwargs:
logger.info(f" ⚙️ Zusätzliche Parameter: {kwargs}")
return True
def send_notification_email(self, recipient: str, notification_type: str,
data: Dict[str, Any]) -> bool:
"""
Sendet Benachrichtigungs-E-Mail (Offline-Simulation).
Args:
recipient: E-Mail-Empfänger
notification_type: Art der Benachrichtigung
data: Daten für die Benachrichtigung
Returns:
bool: Immer True (Simulation erfolgreich)
"""
subject = f"MYP-Benachrichtigung: {notification_type}"
body = f"Benachrichtigung vom MYP-System:\n\n{data}"
return self.send_email(recipient, subject, body, notification_type=notification_type)
def send_maintenance_notification(self, recipient: str, task_title: str,
task_description: str) -> bool:
"""
Sendet Wartungs-Benachrichtigung (Offline-Simulation).
Args:
recipient: E-Mail-Empfänger
task_title: Titel der Wartungsaufgabe
task_description: Beschreibung der Wartungsaufgabe
Returns:
bool: Immer True (Simulation erfolgreich)
"""
subject = f"MYP-Wartungsaufgabe: {task_title}"
body = f"""
Neue Wartungsaufgabe im MYP-System:
Titel: {task_title}
Beschreibung: {task_description}
Erstellt: {datetime.now().strftime('%d.%m.%Y %H:%M:%S')}
Bitte loggen Sie sich in das MYP-System ein, um weitere Details zu sehen.
"""
return self.send_email(recipient, subject, body, task_type="maintenance")
# Globale Instanz für einfache Verwendung
email_notifier = OfflineEmailNotification()
def send_email_notification(recipient: str, subject: str, body: str, **kwargs) -> bool:
"""
Haupt-Funktion für E-Mail-Versand (Offline-kompatibel).
Args:
recipient: E-Mail-Empfänger
subject: E-Mail-Betreff
body: E-Mail-Inhalt
**kwargs: Zusätzliche Parameter
Returns:
bool: True wenn "erfolgreich" (geloggt)
"""
return email_notifier.send_email(recipient, subject, body, **kwargs)
def send_maintenance_email(recipient: str, task_title: str, task_description: str) -> bool:
"""
Sendet Wartungs-E-Mail (Offline-kompatibel).
Args:
recipient: E-Mail-Empfänger
task_title: Titel der Wartungsaufgabe
task_description: Beschreibung der Wartungsaufgabe
Returns:
bool: True wenn "erfolgreich" (geloggt)
"""
return email_notifier.send_maintenance_notification(recipient, task_title, task_description)
def send_guest_approval_email(recipient: str, otp_code: str, expires_at: str) -> bool:
"""
Sendet Gastauftrags-Genehmigung-E-Mail (Offline-kompatibel).
Args:
recipient: E-Mail-Empfänger
otp_code: OTP-Code für den Gastauftrag
expires_at: Ablaufzeit des OTP-Codes
Returns:
bool: True wenn "erfolgreich" (geloggt)
"""
subject = "MYP-Gastauftrag genehmigt"
body = f"""
Ihr Gastauftrag wurde genehmigt!
OTP-Code: {otp_code}
Gültig bis: {expires_at}
Bitte verwenden Sie diesen Code am MYP-Terminal, um Ihren Druckauftrag zu starten.
"""
return email_notifier.send_email(recipient, subject, body,
otp_code=otp_code, expires_at=expires_at)
def send_guest_rejection_email(recipient: str, reason: str) -> bool:
"""
Sendet Gastauftrags-Ablehnungs-E-Mail (Offline-kompatibel).
Args:
recipient: E-Mail-Empfänger
reason: Grund für die Ablehnung
Returns:
bool: True wenn "erfolgreich" (geloggt)
"""
subject = "MYP-Gastauftrag abgelehnt"
body = f"""
Ihr Gastauftrag wurde leider abgelehnt.
Grund: {reason}
Bei Fragen wenden Sie sich bitte an das MYP-Team.
"""
return email_notifier.send_email(recipient, subject, body, rejection_reason=reason)
# Für Backward-Kompatibilität
send_notification = send_email_notification

View File

@@ -0,0 +1,688 @@
"""
Wartungsplanungs- und Tracking-System für das MYP-System
========================================================
Dieses Modul stellt umfassende Wartungsfunktionalität bereit:
- Geplante und ungeplante Wartungen
- Wartungsintervalle und Erinnerungen
- Wartungshistorie und Berichte
- Automatische Wartungsprüfungen
- Ersatzteil-Management
- Techniker-Zuweisungen
"""
import asyncio
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, asdict
from enum import Enum
import threading
import schedule
import time
from utils.logging_config import get_logger
from models import Printer, get_db_session
from utils.email_notification import send_email_notification
from utils.realtime_dashboard import emit_system_alert
logger = get_logger("maintenance")
class MaintenanceType(Enum):
"""Arten von Wartungen"""
PREVENTIVE = "preventive" # Vorbeugende Wartung
CORRECTIVE = "corrective" # Reparatur/Korrektur
EMERGENCY = "emergency" # Notfall-Wartung
SCHEDULED = "scheduled" # Geplante Wartung
INSPECTION = "inspection" # Inspektion
class MaintenanceStatus(Enum):
"""Status einer Wartung"""
PLANNED = "planned" # Geplant
SCHEDULED = "scheduled" # Terminiert
IN_PROGRESS = "in_progress" # In Bearbeitung
COMPLETED = "completed" # Abgeschlossen
CANCELLED = "cancelled" # Abgebrochen
OVERDUE = "overdue" # Überfällig
class MaintenancePriority(Enum):
"""Priorität einer Wartung"""
LOW = "low" # Niedrig
NORMAL = "normal" # Normal
HIGH = "high" # Hoch
CRITICAL = "critical" # Kritisch
EMERGENCY = "emergency" # Notfall
@dataclass
class MaintenanceTask:
"""Wartungsaufgabe"""
id: Optional[int] = None
printer_id: int = None
title: str = ""
description: str = ""
maintenance_type: MaintenanceType = MaintenanceType.PREVENTIVE
priority: MaintenancePriority = MaintenancePriority.NORMAL
status: MaintenanceStatus = MaintenanceStatus.PLANNED
scheduled_date: Optional[datetime] = None
due_date: Optional[datetime] = None
estimated_duration: int = 60 # Minuten
actual_duration: Optional[int] = None
assigned_technician: Optional[str] = None
created_at: datetime = None
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
notes: str = ""
required_parts: List[str] = None
actual_parts_used: List[str] = None
cost: Optional[float] = None
checklist: List[Dict[str, Any]] = None
photos: List[str] = None
created_by: Optional[int] = None
@dataclass
class MaintenanceSchedule:
"""Wartungsplan"""
printer_id: int
maintenance_type: MaintenanceType
interval_days: int
next_due: datetime
last_completed: Optional[datetime] = None
is_active: bool = True
description: str = ""
checklist_template: List[str] = None
@dataclass
class MaintenanceMetrics:
"""Wartungsmetriken"""
total_tasks: int = 0
completed_tasks: int = 0
overdue_tasks: int = 0
average_completion_time: float = 0.0
total_cost: float = 0.0
mtbf: float = 0.0 # Mean Time Between Failures
mttr: float = 0.0 # Mean Time To Repair
uptime_percentage: float = 0.0
class MaintenanceManager:
"""Manager für Wartungsplanung und -tracking"""
def __init__(self):
self.tasks: Dict[int, MaintenanceTask] = {}
self.schedules: Dict[int, List[MaintenanceSchedule]] = {}
self.maintenance_history: List[MaintenanceTask] = []
self.next_task_id = 1
self.is_running = False
self._setup_scheduler()
def _setup_scheduler(self):
"""Richtet automatische Wartungsplanung ein"""
schedule.every().day.at("06:00").do(self._check_scheduled_maintenance)
schedule.every().hour.do(self._check_overdue_tasks)
schedule.every().monday.at("08:00").do(self._generate_weekly_report)
# Scheduler in separatem Thread
def run_scheduler():
while self.is_running:
schedule.run_pending()
time.sleep(60) # Check every minute
self.is_running = True
scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
scheduler_thread.start()
logger.info("Wartungs-Scheduler gestartet")
def create_task(self, task: MaintenanceTask) -> int:
"""Erstellt eine neue Wartungsaufgabe"""
task.id = self.next_task_id
self.next_task_id += 1
task.created_at = datetime.now()
self.tasks[task.id] = task
# Automatische Terminierung für vorbeugende Wartungen
if task.maintenance_type == MaintenanceType.PREVENTIVE and not task.scheduled_date:
task.scheduled_date = self._calculate_next_maintenance_date(task.printer_id)
# Benachrichtigungen senden
self._send_task_notifications(task, "created")
logger.info(f"Wartungsaufgabe erstellt: {task.title} für Drucker {task.printer_id}")
return task.id
def update_task_status(self, task_id: int, new_status: MaintenanceStatus, notes: str = "") -> bool:
"""Aktualisiert den Status einer Wartungsaufgabe"""
if task_id not in self.tasks:
return False
task = self.tasks[task_id]
old_status = task.status
task.status = new_status
# Zeitstempel setzen
if new_status == MaintenanceStatus.IN_PROGRESS:
task.started_at = datetime.now()
elif new_status == MaintenanceStatus.COMPLETED:
task.completed_at = datetime.now()
if task.started_at:
task.actual_duration = int((task.completed_at - task.started_at).total_seconds() / 60)
# Zur Historie hinzufügen
self.maintenance_history.append(task)
# Nächste Wartung planen
self._schedule_next_maintenance(task)
if notes:
task.notes += f"\n{datetime.now().strftime('%d.%m.%Y %H:%M')}: {notes}"
# Benachrichtigungen senden
if old_status != new_status:
self._send_task_notifications(task, "status_changed")
logger.info(f"Wartungsaufgabe {task_id} Status: {old_status.value}{new_status.value}")
return True
def schedule_maintenance(self, printer_id: int, maintenance_type: MaintenanceType,
interval_days: int, description: str = "") -> MaintenanceSchedule:
"""Plant regelmäßige Wartungen"""
schedule_item = MaintenanceSchedule(
printer_id=printer_id,
maintenance_type=maintenance_type,
interval_days=interval_days,
next_due=datetime.now() + timedelta(days=interval_days),
description=description
)
if printer_id not in self.schedules:
self.schedules[printer_id] = []
self.schedules[printer_id].append(schedule_item)
logger.info(f"Wartungsplan erstellt: {maintenance_type.value} alle {interval_days} Tage für Drucker {printer_id}")
return schedule_item
def get_upcoming_maintenance(self, days_ahead: int = 7) -> List[MaintenanceTask]:
"""Holt anstehende Wartungen"""
cutoff_date = datetime.now() + timedelta(days=days_ahead)
upcoming = []
for task in self.tasks.values():
if (task.status in [MaintenanceStatus.PLANNED, MaintenanceStatus.SCHEDULED] and
task.due_date and task.due_date <= cutoff_date):
upcoming.append(task)
return sorted(upcoming, key=lambda t: t.due_date or datetime.max)
def get_overdue_tasks(self) -> List[MaintenanceTask]:
"""Holt überfällige Wartungen"""
now = datetime.now()
overdue = []
for task in self.tasks.values():
if (task.status in [MaintenanceStatus.PLANNED, MaintenanceStatus.SCHEDULED] and
task.due_date and task.due_date < now):
task.status = MaintenanceStatus.OVERDUE
overdue.append(task)
return overdue
def get_maintenance_metrics(self, printer_id: Optional[int] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None) -> MaintenanceMetrics:
"""Berechnet Wartungsmetriken"""
# Filter tasks
tasks = self.maintenance_history.copy()
if printer_id:
tasks = [t for t in tasks if t.printer_id == printer_id]
if start_date:
tasks = [t for t in tasks if t.completed_at and t.completed_at >= start_date]
if end_date:
tasks = [t for t in tasks if t.completed_at and t.completed_at <= end_date]
if not tasks:
return MaintenanceMetrics()
completed_tasks = [t for t in tasks if t.status == MaintenanceStatus.COMPLETED]
# Grundmetriken
total_tasks = len(tasks)
completed_count = len(completed_tasks)
# Durchschnittliche Bearbeitungszeit
completion_times = [t.actual_duration for t in completed_tasks if t.actual_duration]
avg_completion_time = sum(completion_times) / len(completion_times) if completion_times else 0
# Gesamtkosten
total_cost = sum(t.cost for t in completed_tasks if t.cost)
# MTBF und MTTR berechnen
mtbf = self._calculate_mtbf(tasks, printer_id)
mttr = avg_completion_time / 60 # Konvertiere zu Stunden
# Verfügbarkeit berechnen
uptime_percentage = self._calculate_uptime(printer_id, start_date, end_date)
return MaintenanceMetrics(
total_tasks=total_tasks,
completed_tasks=completed_count,
overdue_tasks=len(self.get_overdue_tasks()),
average_completion_time=avg_completion_time,
total_cost=total_cost,
mtbf=mtbf,
mttr=mttr,
uptime_percentage=uptime_percentage
)
def create_maintenance_checklist(self, maintenance_type: MaintenanceType) -> List[Dict[str, Any]]:
"""Erstellt eine Wartungs-Checkliste"""
checklists = {
MaintenanceType.PREVENTIVE: [
{"task": "Drucker äußerlich reinigen", "completed": False, "required": True},
{"task": "Druckbett-Level prüfen", "completed": False, "required": True},
{"task": "Extruder-Düse reinigen", "completed": False, "required": True},
{"task": "Riemen-Spannung prüfen", "completed": False, "required": True},
{"task": "Filament-Führung prüfen", "completed": False, "required": False},
{"task": "Software-Updates prüfen", "completed": False, "required": False},
{"task": "Lüfter reinigen", "completed": False, "required": True},
{"task": "Schrauben nachziehen", "completed": False, "required": False}
],
MaintenanceType.CORRECTIVE: [
{"task": "Problem-Diagnose durchführen", "completed": False, "required": True},
{"task": "Defekte Teile identifizieren", "completed": False, "required": True},
{"task": "Ersatzteile bestellen/bereitstellen", "completed": False, "required": True},
{"task": "Reparatur durchführen", "completed": False, "required": True},
{"task": "Funktionstest durchführen", "completed": False, "required": True},
{"task": "Kalibrierung prüfen", "completed": False, "required": True}
],
MaintenanceType.INSPECTION: [
{"task": "Sichtprüfung der Mechanik", "completed": False, "required": True},
{"task": "Druckqualität testen", "completed": False, "required": True},
{"task": "Temperaturen prüfen", "completed": False, "required": True},
{"task": "Bewegungen testen", "completed": False, "required": True},
{"task": "Verschleiß bewerten", "completed": False, "required": True}
]
}
return checklists.get(maintenance_type, [])
def _check_scheduled_maintenance(self):
"""Prüft täglich auf fällige Wartungen"""
logger.info("Prüfe fällige Wartungen...")
today = datetime.now()
for printer_id, schedules in self.schedules.items():
for schedule_item in schedules:
if not schedule_item.is_active:
continue
if schedule_item.next_due <= today:
# Erstelle Wartungsaufgabe
task = MaintenanceTask(
printer_id=printer_id,
title=f"{schedule_item.maintenance_type.value.title()} Wartung",
description=schedule_item.description,
maintenance_type=schedule_item.maintenance_type,
priority=MaintenancePriority.NORMAL,
due_date=schedule_item.next_due,
checklist=self.create_maintenance_checklist(schedule_item.maintenance_type)
)
task_id = self.create_task(task)
# Nächsten Termin berechnen
schedule_item.next_due = today + timedelta(days=schedule_item.interval_days)
logger.info(f"Automatische Wartungsaufgabe erstellt: {task_id}")
def _check_overdue_tasks(self):
"""Prüft stündlich auf überfällige Aufgaben"""
overdue = self.get_overdue_tasks()
if overdue:
logger.warning(f"{len(overdue)} überfällige Wartungsaufgaben gefunden")
for task in overdue:
emit_system_alert(
f"Wartung überfällig: {task.title} (Drucker {task.printer_id})",
"warning",
"high"
)
def _generate_weekly_report(self):
"""Generiert wöchentlichen Wartungsbericht"""
logger.info("Generiere wöchentlichen Wartungsbericht...")
# Sammle Daten der letzten Woche
last_week = datetime.now() - timedelta(days=7)
metrics = self.get_maintenance_metrics(start_date=last_week)
# Sende Report (Implementation abhängig von verfügbaren Services)
# send_maintenance_report(metrics)
def _calculate_next_maintenance_date(self, printer_id: int) -> datetime:
"""Berechnet nächstes Wartungsdatum basierend auf Nutzung"""
# Vereinfachte Implementierung - kann erweitert werden
base_interval = 30 # Tage
# Hier könnte man Nutzungsstatistiken einbeziehen
with get_db_session() as db_session:
printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
if printer:
# Berücksichtige letzten Check
if printer.last_checked:
days_since_check = (datetime.now() - printer.last_checked).days
if days_since_check < 15: # Kürzlich gecheckt
base_interval += 15
return datetime.now() + timedelta(days=base_interval)
def _schedule_next_maintenance(self, completed_task: MaintenanceTask):
"""Plant nächste Wartung nach Abschluss einer Aufgabe"""
if completed_task.maintenance_type == MaintenanceType.PREVENTIVE:
# Finde entsprechenden Schedule
printer_schedules = self.schedules.get(completed_task.printer_id, [])
for schedule_item in printer_schedules:
if schedule_item.maintenance_type == completed_task.maintenance_type:
schedule_item.last_completed = completed_task.completed_at
schedule_item.next_due = datetime.now() + timedelta(days=schedule_item.interval_days)
break
def _calculate_mtbf(self, tasks: List[MaintenanceTask], printer_id: Optional[int]) -> float:
"""Berechnet Mean Time Between Failures"""
# Vereinfachte MTBF-Berechnung
failure_tasks = [t for t in tasks if t.maintenance_type == MaintenanceType.CORRECTIVE]
if len(failure_tasks) < 2:
return 0.0
# Zeitspanne zwischen ersten und letzten Ausfall
first_failure = min(failure_tasks, key=lambda t: t.created_at)
last_failure = max(failure_tasks, key=lambda t: t.created_at)
total_time = (last_failure.created_at - first_failure.created_at).total_seconds() / 3600 # Stunden
failure_count = len(failure_tasks) - 1
return total_time / failure_count if failure_count > 0 else 0.0
def _calculate_uptime(self, printer_id: Optional[int], start_date: Optional[datetime],
end_date: Optional[datetime]) -> float:
"""Berechnet Verfügbarkeit in Prozent"""
# Vereinfachte Uptime-Berechnung
if not start_date:
start_date = datetime.now() - timedelta(days=30)
if not end_date:
end_date = datetime.now()
total_time = (end_date - start_date).total_seconds()
# Berechne Downtime aus Wartungszeiten
downtime = 0
for task in self.maintenance_history:
if printer_id and task.printer_id != printer_id:
continue
if (task.status == MaintenanceStatus.COMPLETED and
task.started_at and task.completed_at and
task.started_at >= start_date and task.completed_at <= end_date):
downtime += (task.completed_at - task.started_at).total_seconds()
uptime = ((total_time - downtime) / total_time) * 100 if total_time > 0 else 0
return max(0, min(100, uptime))
def _send_task_notifications(self, task: MaintenanceTask, event_type: str):
"""Sendet Benachrichtigungen für Wartungsaufgaben"""
try:
if event_type == "created":
emit_system_alert(
f"Neue Wartungsaufgabe: {task.title} (Drucker {task.printer_id})",
"info",
"normal"
)
elif event_type == "status_changed":
emit_system_alert(
f"Wartungsstatus geändert: {task.title}{task.status.value}",
"info",
"normal"
)
except Exception as e:
logger.error(f"Fehler beim Senden der Wartungsbenachrichtigung: {str(e)}")
# Globale Instanz
maintenance_manager = MaintenanceManager()
def get_maintenance_dashboard_data() -> Dict[str, Any]:
"""Holt Dashboard-Daten für Wartungen"""
upcoming = maintenance_manager.get_upcoming_maintenance()
overdue = maintenance_manager.get_overdue_tasks()
metrics = maintenance_manager.get_maintenance_metrics()
return {
'upcoming_count': len(upcoming),
'overdue_count': len(overdue),
'upcoming_tasks': [asdict(task) for task in upcoming[:5]],
'overdue_tasks': [asdict(task) for task in overdue],
'metrics': asdict(metrics),
'next_scheduled': upcoming[0] if upcoming else None
}
def create_emergency_maintenance(printer_id: int, description: str,
priority: MaintenancePriority = MaintenancePriority.CRITICAL) -> int:
"""Erstellt eine Notfall-Wartung"""
task = MaintenanceTask(
printer_id=printer_id,
title="Notfall-Wartung",
description=description,
maintenance_type=MaintenanceType.EMERGENCY,
priority=priority,
due_date=datetime.now(), # Sofort fällig
checklist=maintenance_manager.create_maintenance_checklist(MaintenanceType.CORRECTIVE)
)
return maintenance_manager.create_task(task)
def schedule_preventive_maintenance(printer_id: int, interval_days: int = 30) -> MaintenanceSchedule:
"""Plant vorbeugende Wartung"""
return maintenance_manager.schedule_maintenance(
printer_id=printer_id,
maintenance_type=MaintenanceType.PREVENTIVE,
interval_days=interval_days,
description="Regelmäßige vorbeugende Wartung"
)
# JavaScript für Wartungs-Frontend
def get_maintenance_javascript() -> str:
"""JavaScript für Wartungsmanagement"""
return """
class MaintenanceManager {
constructor() {
this.currentTasks = [];
this.selectedTask = null;
this.init();
}
init() {
this.loadTasks();
this.setupEventListeners();
this.startAutoRefresh();
}
setupEventListeners() {
// Task status updates
document.addEventListener('click', (e) => {
if (e.target.matches('.maintenance-status-btn')) {
const taskId = e.target.dataset.taskId;
const newStatus = e.target.dataset.status;
this.updateTaskStatus(taskId, newStatus);
}
if (e.target.matches('.maintenance-details-btn')) {
const taskId = e.target.dataset.taskId;
this.showTaskDetails(taskId);
}
});
// Create maintenance form
const createForm = document.getElementById('create-maintenance-form');
createForm?.addEventListener('submit', (e) => {
e.preventDefault();
this.createTask(new FormData(createForm));
});
}
async loadTasks() {
try {
const response = await fetch('/api/maintenance/tasks');
const data = await response.json();
if (data.success) {
this.currentTasks = data.tasks;
this.renderTasks();
}
} catch (error) {
console.error('Fehler beim Laden der Wartungsaufgaben:', error);
}
}
async updateTaskStatus(taskId, newStatus) {
try {
const response = await fetch(`/api/maintenance/tasks/${taskId}/status`, {
method: 'PUT',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ status: newStatus })
});
const result = await response.json();
if (result.success) {
this.loadTasks(); // Refresh
this.showNotification('Wartungsstatus aktualisiert', 'success');
} else {
this.showNotification('Fehler beim Aktualisieren', 'error');
}
} catch (error) {
console.error('Status-Update fehlgeschlagen:', error);
}
}
renderTasks() {
const container = document.getElementById('maintenance-tasks-container');
if (!container) return;
container.innerHTML = this.currentTasks.map(task => `
<div class="maintenance-task-card ${task.status} priority-${task.priority}">
<div class="task-header">
<h3>${task.title}</h3>
<span class="task-priority">${task.priority}</span>
</div>
<div class="task-info">
<p><strong>Drucker:</strong> ${task.printer_id}</p>
<p><strong>Typ:</strong> ${task.maintenance_type}</p>
<p><strong>Fällig:</strong> ${this.formatDate(task.due_date)}</p>
<p><strong>Status:</strong> ${task.status}</p>
</div>
<div class="task-actions">
<button class="maintenance-status-btn" data-task-id="${task.id}" data-status="in_progress">
Starten
</button>
<button class="maintenance-status-btn" data-task-id="${task.id}" data-status="completed">
Abschließen
</button>
<button class="maintenance-details-btn" data-task-id="${task.id}">
Details
</button>
</div>
</div>
`).join('');
}
showTaskDetails(taskId) {
const task = this.currentTasks.find(t => t.id == taskId);
if (!task) return;
// Create modal with task details
const modal = document.createElement('div');
modal.className = 'maintenance-modal';
modal.innerHTML = `
<div class="modal-content">
<div class="modal-header">
<h2>${task.title}</h2>
<button class="close-modal">&times;</button>
</div>
<div class="modal-body">
<div class="task-details">
<p><strong>Beschreibung:</strong> ${task.description}</p>
<p><strong>Techniker:</strong> ${task.assigned_technician || 'Nicht zugewiesen'}</p>
<p><strong>Geschätzte Dauer:</strong> ${task.estimated_duration} Minuten</p>
${task.checklist ? this.renderChecklist(task.checklist) : ''}
<div class="task-notes">
<h4>Notizen:</h4>
<textarea id="task-notes-${taskId}" rows="4" cols="50">${task.notes || ''}</textarea>
<button onclick="maintenanceManager.saveNotes(${taskId})">Notizen speichern</button>
</div>
</div>
</div>
</div>
`;
document.body.appendChild(modal);
// Close modal handlers
modal.querySelector('.close-modal').onclick = () => modal.remove();
modal.onclick = (e) => {
if (e.target === modal) modal.remove();
};
}
renderChecklist(checklist) {
return `
<div class="maintenance-checklist">
<h4>Checkliste:</h4>
${checklist.map((item, index) => `
<label class="checklist-item">
<input type="checkbox" ${item.completed ? 'checked' : ''}
onchange="maintenanceManager.updateChecklistItem(${index}, this.checked)">
${item.task}
${item.required ? '<span class="required">*</span>' : ''}
</label>
`).join('')}
</div>
`;
}
formatDate(dateString) {
if (!dateString) return 'Nicht gesetzt';
const date = new Date(dateString);
return date.toLocaleDateString('de-DE') + ' ' + date.toLocaleTimeString('de-DE', {hour: '2-digit', minute: '2-digit'});
}
showNotification(message, type = 'info') {
const notification = document.createElement('div');
notification.className = `notification notification-${type}`;
notification.textContent = message;
document.body.appendChild(notification);
setTimeout(() => {
notification.remove();
}, 3000);
}
startAutoRefresh() {
setInterval(() => {
this.loadTasks();
}, 30000); // Refresh every 30 seconds
}
}
// Initialize when DOM is ready
document.addEventListener('DOMContentLoaded', function() {
window.maintenanceManager = new MaintenanceManager();
});
"""

View File

@@ -0,0 +1,784 @@
"""
Multi-Standort-Unterstützungssystem für das MYP-System
======================================================
Dieses Modul stellt umfassende Multi-Location-Funktionalität bereit:
- Standort-Management und Hierarchien
- Standort-spezifische Konfigurationen
- Zentrale und dezentrale Verwaltung
- Standort-übergreifende Berichte
- Ressourcen-Sharing zwischen Standorten
- Benutzer-Standort-Zuweisungen
"""
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
import geocoder
import requests
from utils.logging_config import get_logger
from models import User, Printer, Job, get_db_session
logger = get_logger("multi_location")
class LocationType(Enum):
"""Arten von Standorten"""
HEADQUARTERS = "headquarters" # Hauptsitz
BRANCH = "branch" # Niederlassung
DEPARTMENT = "department" # Abteilung
FLOOR = "floor" # Stockwerk
ROOM = "room" # Raum
AREA = "area" # Bereich
class AccessLevel(Enum):
"""Zugriffslevel für Standorte"""
FULL = "full" # Vollzugriff
READ_WRITE = "read_write" # Lesen und Schreiben
READ_ONLY = "read_only" # Nur Lesen
NO_ACCESS = "no_access" # Kein Zugriff
@dataclass
class LocationConfig:
"""Standort-spezifische Konfiguration"""
timezone: str = "Europe/Berlin"
business_hours: Dict[str, str] = None
maintenance_window: Dict[str, str] = None
auto_approval_enabled: bool = False
max_job_duration: int = 480 # Minuten
contact_info: Dict[str, str] = None
notification_settings: Dict[str, Any] = None
@dataclass
class Location:
"""Standort-Definition"""
id: Optional[int] = None
name: str = ""
code: str = "" # Kurzer Code für den Standort
location_type: LocationType = LocationType.BRANCH
parent_id: Optional[int] = None
address: str = ""
city: str = ""
country: str = ""
postal_code: str = ""
latitude: Optional[float] = None
longitude: Optional[float] = None
description: str = ""
config: LocationConfig = None
is_active: bool = True
created_at: datetime = None
manager_id: Optional[int] = None
def __post_init__(self):
if self.config is None:
self.config = LocationConfig()
if self.created_at is None:
self.created_at = datetime.now()
@dataclass
class UserLocationAccess:
"""Benutzer-Standort-Zugriff"""
user_id: int
location_id: int
access_level: AccessLevel
granted_by: int
granted_at: datetime
expires_at: Optional[datetime] = None
is_primary: bool = False
class MultiLocationManager:
"""Manager für Multi-Standort-Funktionalität"""
def __init__(self):
self.locations: Dict[int, Location] = {}
self.user_access: Dict[int, List[UserLocationAccess]] = {}
self.next_location_id = 1
# Standard-Standort erstellen
self._create_default_location()
def _create_default_location(self):
"""Erstellt Standard-Standort falls keiner existiert"""
default_location = Location(
id=1,
name="Hauptstandort",
code="HQ",
location_type=LocationType.HEADQUARTERS,
address="Mercedes-Benz Platz",
city="Stuttgart",
country="Deutschland",
description="Hauptstandort des MYP-Systems"
)
self.locations[1] = default_location
self.next_location_id = 2
logger.info("Standard-Standort erstellt")
def create_location(self, location: Location) -> int:
"""Erstellt einen neuen Standort"""
location.id = self.next_location_id
self.next_location_id += 1
# Koordinaten automatisch ermitteln
if not location.latitude or not location.longitude:
self._geocode_location(location)
self.locations[location.id] = location
logger.info(f"Standort erstellt: {location.name} ({location.code})")
return location.id
def update_location(self, location_id: int, updates: Dict[str, Any]) -> bool:
"""Aktualisiert einen Standort"""
if location_id not in self.locations:
return False
location = self.locations[location_id]
for key, value in updates.items():
if hasattr(location, key):
setattr(location, key, value)
# Koordinaten neu ermitteln bei Adressänderung
if 'address' in updates or 'city' in updates:
self._geocode_location(location)
logger.info(f"Standort aktualisiert: {location.name}")
return True
def delete_location(self, location_id: int) -> bool:
"""Löscht einen Standort (Soft Delete)"""
if location_id not in self.locations:
return False
location = self.locations[location_id]
# Prüfe ob Standort Kinder hat
children = self.get_child_locations(location_id)
if children:
logger.warning(f"Standort {location.name} kann nicht gelöscht werden: hat Unterstandorte")
return False
# Prüfe auf aktive Ressourcen
if self._has_active_resources(location_id):
logger.warning(f"Standort {location.name} kann nicht gelöscht werden: hat aktive Ressourcen")
return False
location.is_active = False
logger.info(f"Standort deaktiviert: {location.name}")
return True
def get_location_hierarchy(self, location_id: Optional[int] = None) -> Dict[str, Any]:
"""Holt Standort-Hierarchie"""
if location_id:
# Spezifische Hierarchie ab einem Standort
location = self.locations.get(location_id)
if not location:
return {}
return self._build_hierarchy_node(location)
else:
# Komplette Hierarchie
root_locations = [loc for loc in self.locations.values()
if loc.parent_id is None and loc.is_active]
return {
'locations': [self._build_hierarchy_node(loc) for loc in root_locations]
}
def _build_hierarchy_node(self, location: Location) -> Dict[str, Any]:
"""Erstellt einen Hierarchie-Knoten"""
children = self.get_child_locations(location.id)
return {
'id': location.id,
'name': location.name,
'code': location.code,
'type': location.location_type.value,
'children': [self._build_hierarchy_node(child) for child in children],
'resource_count': self._count_location_resources(location.id)
}
def get_child_locations(self, parent_id: int) -> List[Location]:
"""Holt alle Kinder-Standorte"""
return [loc for loc in self.locations.values()
if loc.parent_id == parent_id and loc.is_active]
def get_location_path(self, location_id: int) -> List[Location]:
"""Holt den Pfad vom Root zum Standort"""
path = []
current_id = location_id
while current_id:
location = self.locations.get(current_id)
if not location:
break
path.insert(0, location)
current_id = location.parent_id
return path
def grant_location_access(self, user_id: int, location_id: int,
access_level: AccessLevel, granted_by: int,
expires_at: Optional[datetime] = None,
is_primary: bool = False) -> bool:
"""Gewährt Benutzer-Zugriff auf einen Standort"""
if location_id not in self.locations:
return False
access = UserLocationAccess(
user_id=user_id,
location_id=location_id,
access_level=access_level,
granted_by=granted_by,
granted_at=datetime.now(),
expires_at=expires_at,
is_primary=is_primary
)
if user_id not in self.user_access:
self.user_access[user_id] = []
# Entferne vorherigen Zugriff für diesen Standort
self.user_access[user_id] = [
acc for acc in self.user_access[user_id]
if acc.location_id != location_id
]
# Setze anderen primary-Zugriff zurück falls nötig
if is_primary:
for access_item in self.user_access[user_id]:
access_item.is_primary = False
self.user_access[user_id].append(access)
logger.info(f"Standort-Zugriff gewährt: User {user_id} → Location {location_id} ({access_level.value})")
return True
def revoke_location_access(self, user_id: int, location_id: int) -> bool:
"""Entzieht Benutzer-Zugriff auf einen Standort"""
if user_id not in self.user_access:
return False
original_count = len(self.user_access[user_id])
self.user_access[user_id] = [
acc for acc in self.user_access[user_id]
if acc.location_id != location_id
]
success = len(self.user_access[user_id]) < original_count
if success:
logger.info(f"Standort-Zugriff entzogen: User {user_id} → Location {location_id}")
return success
def get_user_locations(self, user_id: int, access_level: Optional[AccessLevel] = None) -> List[Location]:
"""Holt alle Standorte eines Benutzers"""
if user_id not in self.user_access:
return []
accessible_locations = []
now = datetime.now()
for access in self.user_access[user_id]:
# Prüfe Ablaufzeit
if access.expires_at and access.expires_at < now:
continue
# Prüfe Access Level
if access_level and access.access_level != access_level:
continue
location = self.locations.get(access.location_id)
if location and location.is_active:
accessible_locations.append(location)
return accessible_locations
def get_user_primary_location(self, user_id: int) -> Optional[Location]:
"""Holt den primären Standort eines Benutzers"""
if user_id not in self.user_access:
return None
for access in self.user_access[user_id]:
if access.is_primary:
return self.locations.get(access.location_id)
# Fallback: ersten verfügbaren Standort nehmen
user_locations = self.get_user_locations(user_id)
return user_locations[0] if user_locations else None
def check_user_access(self, user_id: int, location_id: int,
required_level: AccessLevel = AccessLevel.READ_ONLY) -> bool:
"""Prüft ob Benutzer Zugriff auf Standort hat"""
if user_id not in self.user_access:
return False
access_levels = {
AccessLevel.NO_ACCESS: 0,
AccessLevel.READ_ONLY: 1,
AccessLevel.READ_WRITE: 2,
AccessLevel.FULL: 3
}
required_level_value = access_levels[required_level]
now = datetime.now()
for access in self.user_access[user_id]:
if access.location_id != location_id:
continue
# Prüfe Ablaufzeit
if access.expires_at and access.expires_at < now:
continue
user_level_value = access_levels[access.access_level]
if user_level_value >= required_level_value:
return True
return False
def get_location_resources(self, location_id: int) -> Dict[str, Any]:
"""Holt alle Ressourcen eines Standorts"""
if location_id not in self.locations:
return {}
# Simuliere Datenbankabfrage für Drucker und Jobs
resources = {
'printers': [],
'active_jobs': [],
'users': [],
'pending_maintenance': 0
}
# In echter Implementierung würde hier die Datenbank abgefragt
with get_db_session() as db_session:
# Drucker des Standorts (vereinfacht - benötigt location_id in Printer-Model)
# printers = db_session.query(Printer).filter(Printer.location_id == location_id).all()
# resources['printers'] = [p.to_dict() for p in printers]
pass
return resources
def get_location_statistics(self, location_id: int,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None) -> Dict[str, Any]:
"""Holt Statistiken für einen Standort"""
if not start_date:
start_date = datetime.now() - timedelta(days=30)
if not end_date:
end_date = datetime.now()
# Sammle Statistiken
stats = {
'location': self.locations.get(location_id, {}).name if location_id in self.locations else 'Unbekannt',
'period': {
'start': start_date.isoformat(),
'end': end_date.isoformat()
},
'totals': {
'printers': 0,
'jobs_completed': 0,
'jobs_failed': 0,
'print_time_hours': 0,
'material_used_kg': 0,
'users_active': 0
},
'averages': {
'jobs_per_day': 0,
'job_duration_minutes': 0,
'printer_utilization': 0
},
'trends': {
'daily_jobs': [],
'printer_usage': []
}
}
# In echter Implementierung würden hier Datenbankabfragen stehen
return stats
def get_multi_location_report(self, location_ids: List[int] = None) -> Dict[str, Any]:
"""Erstellt standortübergreifenden Bericht"""
if not location_ids:
location_ids = list(self.locations.keys())
report = {
'generated_at': datetime.now().isoformat(),
'locations': [],
'summary': {
'total_locations': len(location_ids),
'total_printers': 0,
'total_users': 0,
'total_jobs': 0,
'cross_location_sharing': []
}
}
for location_id in location_ids:
location = self.locations.get(location_id)
if not location:
continue
location_stats = self.get_location_statistics(location_id)
location_data = {
'id': location.id,
'name': location.name,
'code': location.code,
'type': location.location_type.value,
'statistics': location_stats
}
report['locations'].append(location_data)
# Summiere für Gesamtübersicht
totals = location_stats.get('totals', {})
report['summary']['total_printers'] += totals.get('printers', 0)
report['summary']['total_users'] += totals.get('users_active', 0)
report['summary']['total_jobs'] += totals.get('jobs_completed', 0)
return report
def find_nearest_locations(self, latitude: float, longitude: float,
radius_km: float = 50, limit: int = 5) -> List[Tuple[Location, float]]:
"""Findet nächstgelegene Standorte"""
from math import radians, sin, cos, sqrt, atan2
def calculate_distance(lat1, lon1, lat2, lon2):
"""Berechnet Entfernung zwischen zwei Koordinaten (Haversine)"""
R = 6371 # Erdradius in km
lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
dlat = lat2 - lat1
dlon = lon2 - lon1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
c = 2 * atan2(sqrt(a), sqrt(1-a))
return R * c
nearby_locations = []
for location in self.locations.values():
if not location.is_active or not location.latitude or not location.longitude:
continue
distance = calculate_distance(
latitude, longitude,
location.latitude, location.longitude
)
if distance <= radius_km:
nearby_locations.append((location, distance))
# Sortiere nach Entfernung
nearby_locations.sort(key=lambda x: x[1])
return nearby_locations[:limit]
def _geocode_location(self, location: Location):
"""Ermittelt Koordinaten für einen Standort"""
try:
address_parts = [location.address, location.city, location.country]
full_address = ', '.join(filter(None, address_parts))
if not full_address:
return
# Verwende geocoder library
result = geocoder.osm(full_address)
if result.ok:
location.latitude = result.lat
location.longitude = result.lng
logger.info(f"Koordinaten ermittelt für {location.name}: {location.latitude}, {location.longitude}")
else:
logger.warning(f"Koordinaten konnten nicht ermittelt werden für {location.name}")
except Exception as e:
logger.error(f"Fehler bei Geocoding für {location.name}: {str(e)}")
def _has_active_resources(self, location_id: int) -> bool:
"""Prüft ob Standort aktive Ressourcen hat"""
# Vereinfachte Implementierung
# In echter Implementation würde hier die Datenbank geprüft
return False
def _count_location_resources(self, location_id: int) -> Dict[str, int]:
"""Zählt Ressourcen eines Standorts"""
# Vereinfachte Implementierung
return {
'printers': 0,
'users': 0,
'jobs': 0
}
# Globale Instanz
multi_location_manager = MultiLocationManager()
def get_location_dashboard_data(user_id: int) -> Dict[str, Any]:
"""Holt Dashboard-Daten für Standorte eines Benutzers"""
user_locations = multi_location_manager.get_user_locations(user_id)
primary_location = multi_location_manager.get_user_primary_location(user_id)
dashboard_data = {
'user_locations': [asdict(loc) for loc in user_locations],
'primary_location': asdict(primary_location) if primary_location else None,
'location_count': len(user_locations),
'hierarchy': multi_location_manager.get_location_hierarchy()
}
# Füge Statistiken für jeden Standort hinzu
for location in user_locations:
location_stats = multi_location_manager.get_location_statistics(location.id)
dashboard_data[f'stats_{location.id}'] = location_stats
return dashboard_data
def create_location_from_address(name: str, address: str, city: str,
country: str, location_type: LocationType = LocationType.BRANCH) -> int:
"""Erstellt Standort aus Adresse mit automatischer Geocodierung"""
location = Location(
name=name,
code=name[:3].upper(),
location_type=location_type,
address=address,
city=city,
country=country
)
return multi_location_manager.create_location(location)
# JavaScript für Multi-Location Frontend
def get_multi_location_javascript() -> str:
"""JavaScript für Multi-Location Management"""
return """
class MultiLocationManager {
constructor() {
this.currentLocation = null;
this.userLocations = [];
this.locationHierarchy = {};
this.init();
}
init() {
this.loadUserLocations();
this.setupEventListeners();
}
setupEventListeners() {
// Location switcher
document.addEventListener('change', (e) => {
if (e.target.matches('.location-selector')) {
const locationId = parseInt(e.target.value);
this.switchLocation(locationId);
}
});
// Location management buttons
document.addEventListener('click', (e) => {
if (e.target.matches('.manage-locations-btn')) {
this.showLocationManager();
}
if (e.target.matches('.location-hierarchy-btn')) {
this.showLocationHierarchy();
}
});
}
async loadUserLocations() {
try {
const response = await fetch('/api/locations/user');
const data = await response.json();
if (data.success) {
this.userLocations = data.locations;
this.currentLocation = data.primary_location;
this.locationHierarchy = data.hierarchy;
this.updateLocationSelector();
this.updateLocationDisplay();
}
} catch (error) {
console.error('Fehler beim Laden der Standorte:', error);
}
}
updateLocationSelector() {
const selectors = document.querySelectorAll('.location-selector');
selectors.forEach(selector => {
selector.innerHTML = this.userLocations.map(location =>
`<option value="${location.id}" ${location.id === this.currentLocation?.id ? 'selected' : ''}>
${location.name} (${location.code})
</option>`
).join('');
});
}
updateLocationDisplay() {
const displays = document.querySelectorAll('.current-location-display');
displays.forEach(display => {
if (this.currentLocation) {
display.innerHTML = `
<div class="location-info">
<strong>${this.currentLocation.name}</strong>
<span class="location-type">${this.currentLocation.type}</span>
${this.currentLocation.city ? `<span class="location-city">${this.currentLocation.city}</span>` : ''}
</div>
`;
} else {
display.innerHTML = '<span class="no-location">Kein Standort ausgewählt</span>';
}
});
}
async switchLocation(locationId) {
try {
const response = await fetch('/api/locations/switch', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ location_id: locationId })
});
const result = await response.json();
if (result.success) {
this.currentLocation = this.userLocations.find(loc => loc.id === locationId);
this.updateLocationDisplay();
// Seite neu laden um location-spezifische Daten zu aktualisieren
window.location.reload();
} else {
this.showNotification('Fehler beim Wechseln des Standorts', 'error');
}
} catch (error) {
console.error('Standort-Wechsel fehlgeschlagen:', error);
}
}
showLocationManager() {
const modal = document.createElement('div');
modal.className = 'location-manager-modal';
modal.innerHTML = `
<div class="modal-content">
<div class="modal-header">
<h2>Standort-Verwaltung</h2>
<button class="close-modal">&times;</button>
</div>
<div class="modal-body">
<div class="location-list">
${this.renderLocationList()}
</div>
<div class="location-actions">
<button class="btn-create-location">Neuen Standort erstellen</button>
</div>
</div>
</div>
`;
document.body.appendChild(modal);
// Event handlers
modal.querySelector('.close-modal').onclick = () => modal.remove();
modal.onclick = (e) => {
if (e.target === modal) modal.remove();
};
}
renderLocationList() {
return this.userLocations.map(location => `
<div class="location-item">
<div class="location-details">
<h4>${location.name} (${location.code})</h4>
<p><strong>Typ:</strong> ${location.type}</p>
<p><strong>Adresse:</strong> ${location.address || 'Nicht angegeben'}</p>
<p><strong>Stadt:</strong> ${location.city || 'Nicht angegeben'}</p>
</div>
<div class="location-actions">
<button class="btn-edit-location" data-location-id="${location.id}">Bearbeiten</button>
<button class="btn-view-stats" data-location-id="${location.id}">Statistiken</button>
</div>
</div>
`).join('');
}
showLocationHierarchy() {
const modal = document.createElement('div');
modal.className = 'hierarchy-modal';
modal.innerHTML = `
<div class="modal-content">
<div class="modal-header">
<h2>Standort-Hierarchie</h2>
<button class="close-modal">&times;</button>
</div>
<div class="modal-body">
<div class="hierarchy-tree">
${this.renderHierarchyTree(this.locationHierarchy.locations || [])}
</div>
</div>
</div>
`;
document.body.appendChild(modal);
modal.querySelector('.close-modal').onclick = () => modal.remove();
modal.onclick = (e) => {
if (e.target === modal) modal.remove();
};
}
renderHierarchyTree(locations, level = 0) {
return locations.map(location => `
<div class="hierarchy-node" style="margin-left: ${level * 20}px;">
<div class="node-content">
<span class="node-icon">${this.getLocationTypeIcon(location.type)}</span>
<span class="node-name">${location.name}</span>
<span class="node-code">(${location.code})</span>
<span class="resource-count">${location.resource_count.printers || 0} Drucker</span>
</div>
${location.children && location.children.length > 0 ?
this.renderHierarchyTree(location.children, level + 1) : ''}
</div>
`).join('');
}
getLocationTypeIcon(type) {
const icons = {
'headquarters': '🏢',
'branch': '🏪',
'department': '🏬',
'floor': '🏢',
'room': '🚪',
'area': '📍'
};
return icons[type] || '📍';
}
showNotification(message, type = 'info') {
const notification = document.createElement('div');
notification.className = `notification notification-${type}`;
notification.textContent = message;
document.body.appendChild(notification);
setTimeout(() => {
notification.remove();
}, 3000);
}
}
// Initialize when DOM is ready
document.addEventListener('DOMContentLoaded', function() {
window.multiLocationManager = new MultiLocationManager();
});
"""