""" Wartungsplanungs- und Tracking-System für das MYP-System ======================================================== Dieses Modul stellt umfassende Wartungsfunktionalität bereit: - Geplante und ungeplante Wartungen - Wartungsintervalle und Erinnerungen - Wartungshistorie und Berichte - Automatische Wartungsprüfungen - Ersatzteil-Management - Techniker-Zuweisungen """ import asyncio import json import logging from datetime import datetime, timedelta from typing import Dict, List, Any, Optional, Callable from dataclasses import dataclass, asdict from enum import Enum import threading import schedule import time from utils.logging_config import get_logger from models import Printer, get_db_session from utils.email_notification import send_email_notification from utils.realtime_dashboard import emit_system_alert logger = get_logger("maintenance") class MaintenanceType(Enum): """Arten von Wartungen""" PREVENTIVE = "preventive" # Vorbeugende Wartung CORRECTIVE = "corrective" # Reparatur/Korrektur EMERGENCY = "emergency" # Notfall-Wartung SCHEDULED = "scheduled" # Geplante Wartung INSPECTION = "inspection" # Inspektion class MaintenanceStatus(Enum): """Status einer Wartung""" PLANNED = "planned" # Geplant SCHEDULED = "scheduled" # Terminiert IN_PROGRESS = "in_progress" # In Bearbeitung COMPLETED = "completed" # Abgeschlossen CANCELLED = "cancelled" # Abgebrochen OVERDUE = "overdue" # Überfällig class MaintenancePriority(Enum): """Priorität einer Wartung""" LOW = "low" # Niedrig NORMAL = "normal" # Normal HIGH = "high" # Hoch CRITICAL = "critical" # Kritisch EMERGENCY = "emergency" # Notfall @dataclass class MaintenanceTask: """Wartungsaufgabe""" id: Optional[int] = None printer_id: int = None title: str = "" description: str = "" maintenance_type: MaintenanceType = MaintenanceType.PREVENTIVE priority: MaintenancePriority = MaintenancePriority.NORMAL status: MaintenanceStatus = MaintenanceStatus.PLANNED scheduled_date: Optional[datetime] = None due_date: Optional[datetime] = None estimated_duration: int = 60 # Minuten actual_duration: Optional[int] = None assigned_technician: Optional[str] = None created_at: datetime = None started_at: Optional[datetime] = None completed_at: Optional[datetime] = None notes: str = "" required_parts: List[str] = None actual_parts_used: List[str] = None cost: Optional[float] = None checklist: List[Dict[str, Any]] = None photos: List[str] = None created_by: Optional[int] = None @dataclass class MaintenanceSchedule: """Wartungsplan""" printer_id: int maintenance_type: MaintenanceType interval_days: int next_due: datetime last_completed: Optional[datetime] = None is_active: bool = True description: str = "" checklist_template: List[str] = None @dataclass class MaintenanceMetrics: """Wartungsmetriken""" total_tasks: int = 0 completed_tasks: int = 0 overdue_tasks: int = 0 average_completion_time: float = 0.0 total_cost: float = 0.0 mtbf: float = 0.0 # Mean Time Between Failures mttr: float = 0.0 # Mean Time To Repair uptime_percentage: float = 0.0 class MaintenanceManager: """Manager für Wartungsplanung und -tracking""" def __init__(self): self.tasks: Dict[int, MaintenanceTask] = {} self.schedules: Dict[int, List[MaintenanceSchedule]] = {} self.maintenance_history: List[MaintenanceTask] = [] self.next_task_id = 1 self.is_running = False self._setup_scheduler() def _setup_scheduler(self): """Richtet automatische Wartungsplanung ein""" schedule.every().day.at("06:00").do(self._check_scheduled_maintenance) schedule.every().hour.do(self._check_overdue_tasks) schedule.every().monday.at("08:00").do(self._generate_weekly_report) # Scheduler in separatem Thread def run_scheduler(): while self.is_running: schedule.run_pending() time.sleep(60) # Check every minute self.is_running = True scheduler_thread = threading.Thread(target=run_scheduler, daemon=True) scheduler_thread.start() logger.info("Wartungs-Scheduler gestartet") def create_task(self, task: MaintenanceTask) -> int: """Erstellt eine neue Wartungsaufgabe""" task.id = self.next_task_id self.next_task_id += 1 task.created_at = datetime.now() self.tasks[task.id] = task # Automatische Terminierung für vorbeugende Wartungen if task.maintenance_type == MaintenanceType.PREVENTIVE and not task.scheduled_date: task.scheduled_date = self._calculate_next_maintenance_date(task.printer_id) # Benachrichtigungen senden self._send_task_notifications(task, "created") logger.info(f"Wartungsaufgabe erstellt: {task.title} für Drucker {task.printer_id}") return task.id def update_task_status(self, task_id: int, new_status: MaintenanceStatus, notes: str = "") -> bool: """Aktualisiert den Status einer Wartungsaufgabe""" if task_id not in self.tasks: return False task = self.tasks[task_id] old_status = task.status task.status = new_status # Zeitstempel setzen if new_status == MaintenanceStatus.IN_PROGRESS: task.started_at = datetime.now() elif new_status == MaintenanceStatus.COMPLETED: task.completed_at = datetime.now() if task.started_at: task.actual_duration = int((task.completed_at - task.started_at).total_seconds() / 60) # Zur Historie hinzufügen self.maintenance_history.append(task) # Nächste Wartung planen self._schedule_next_maintenance(task) if notes: task.notes += f"\n{datetime.now().strftime('%d.%m.%Y %H:%M')}: {notes}" # Benachrichtigungen senden if old_status != new_status: self._send_task_notifications(task, "status_changed") logger.info(f"Wartungsaufgabe {task_id} Status: {old_status.value} → {new_status.value}") return True def schedule_maintenance(self, printer_id: int, maintenance_type: MaintenanceType, interval_days: int, description: str = "") -> MaintenanceSchedule: """Plant regelmäßige Wartungen""" schedule_item = MaintenanceSchedule( printer_id=printer_id, maintenance_type=maintenance_type, interval_days=interval_days, next_due=datetime.now() + timedelta(days=interval_days), description=description ) if printer_id not in self.schedules: self.schedules[printer_id] = [] self.schedules[printer_id].append(schedule_item) logger.info(f"Wartungsplan erstellt: {maintenance_type.value} alle {interval_days} Tage für Drucker {printer_id}") return schedule_item def get_upcoming_maintenance(self, days_ahead: int = 7) -> List[MaintenanceTask]: """Holt anstehende Wartungen""" cutoff_date = datetime.now() + timedelta(days=days_ahead) upcoming = [] for task in self.tasks.values(): if (task.status in [MaintenanceStatus.PLANNED, MaintenanceStatus.SCHEDULED] and task.due_date and task.due_date <= cutoff_date): upcoming.append(task) return sorted(upcoming, key=lambda t: t.due_date or datetime.max) def get_overdue_tasks(self) -> List[MaintenanceTask]: """Holt überfällige Wartungen""" now = datetime.now() overdue = [] for task in self.tasks.values(): if (task.status in [MaintenanceStatus.PLANNED, MaintenanceStatus.SCHEDULED] and task.due_date and task.due_date < now): task.status = MaintenanceStatus.OVERDUE overdue.append(task) return overdue def get_maintenance_metrics(self, printer_id: Optional[int] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None) -> MaintenanceMetrics: """Berechnet Wartungsmetriken""" # Filter tasks tasks = self.maintenance_history.copy() if printer_id: tasks = [t for t in tasks if t.printer_id == printer_id] if start_date: tasks = [t for t in tasks if t.completed_at and t.completed_at >= start_date] if end_date: tasks = [t for t in tasks if t.completed_at and t.completed_at <= end_date] if not tasks: return MaintenanceMetrics() completed_tasks = [t for t in tasks if t.status == MaintenanceStatus.COMPLETED] # Grundmetriken total_tasks = len(tasks) completed_count = len(completed_tasks) # Durchschnittliche Bearbeitungszeit completion_times = [t.actual_duration for t in completed_tasks if t.actual_duration] avg_completion_time = sum(completion_times) / len(completion_times) if completion_times else 0 # Gesamtkosten total_cost = sum(t.cost for t in completed_tasks if t.cost) # MTBF und MTTR berechnen mtbf = self._calculate_mtbf(tasks, printer_id) mttr = avg_completion_time / 60 # Konvertiere zu Stunden # Verfügbarkeit berechnen uptime_percentage = self._calculate_uptime(printer_id, start_date, end_date) return MaintenanceMetrics( total_tasks=total_tasks, completed_tasks=completed_count, overdue_tasks=len(self.get_overdue_tasks()), average_completion_time=avg_completion_time, total_cost=total_cost, mtbf=mtbf, mttr=mttr, uptime_percentage=uptime_percentage ) def create_maintenance_checklist(self, maintenance_type: MaintenanceType) -> List[Dict[str, Any]]: """Erstellt eine Wartungs-Checkliste""" checklists = { MaintenanceType.PREVENTIVE: [ {"task": "Drucker äußerlich reinigen", "completed": False, "required": True}, {"task": "Druckbett-Level prüfen", "completed": False, "required": True}, {"task": "Extruder-Düse reinigen", "completed": False, "required": True}, {"task": "Riemen-Spannung prüfen", "completed": False, "required": True}, {"task": "Filament-Führung prüfen", "completed": False, "required": False}, {"task": "Software-Updates prüfen", "completed": False, "required": False}, {"task": "Lüfter reinigen", "completed": False, "required": True}, {"task": "Schrauben nachziehen", "completed": False, "required": False} ], MaintenanceType.CORRECTIVE: [ {"task": "Problem-Diagnose durchführen", "completed": False, "required": True}, {"task": "Defekte Teile identifizieren", "completed": False, "required": True}, {"task": "Ersatzteile bestellen/bereitstellen", "completed": False, "required": True}, {"task": "Reparatur durchführen", "completed": False, "required": True}, {"task": "Funktionstest durchführen", "completed": False, "required": True}, {"task": "Kalibrierung prüfen", "completed": False, "required": True} ], MaintenanceType.INSPECTION: [ {"task": "Sichtprüfung der Mechanik", "completed": False, "required": True}, {"task": "Druckqualität testen", "completed": False, "required": True}, {"task": "Temperaturen prüfen", "completed": False, "required": True}, {"task": "Bewegungen testen", "completed": False, "required": True}, {"task": "Verschleiß bewerten", "completed": False, "required": True} ] } return checklists.get(maintenance_type, []) def _check_scheduled_maintenance(self): """Prüft täglich auf fällige Wartungen""" logger.info("Prüfe fällige Wartungen...") today = datetime.now() for printer_id, schedules in self.schedules.items(): for schedule_item in schedules: if not schedule_item.is_active: continue if schedule_item.next_due <= today: # Erstelle Wartungsaufgabe task = MaintenanceTask( printer_id=printer_id, title=f"{schedule_item.maintenance_type.value.title()} Wartung", description=schedule_item.description, maintenance_type=schedule_item.maintenance_type, priority=MaintenancePriority.NORMAL, due_date=schedule_item.next_due, checklist=self.create_maintenance_checklist(schedule_item.maintenance_type) ) task_id = self.create_task(task) # Nächsten Termin berechnen schedule_item.next_due = today + timedelta(days=schedule_item.interval_days) logger.info(f"Automatische Wartungsaufgabe erstellt: {task_id}") def _check_overdue_tasks(self): """Prüft stündlich auf überfällige Aufgaben""" overdue = self.get_overdue_tasks() if overdue: logger.warning(f"{len(overdue)} überfällige Wartungsaufgaben gefunden") for task in overdue: emit_system_alert( f"Wartung überfällig: {task.title} (Drucker {task.printer_id})", "warning", "high" ) def _generate_weekly_report(self): """Generiert wöchentlichen Wartungsbericht""" logger.info("Generiere wöchentlichen Wartungsbericht...") # Sammle Daten der letzten Woche last_week = datetime.now() - timedelta(days=7) metrics = self.get_maintenance_metrics(start_date=last_week) # Sende Report (Implementation abhängig von verfügbaren Services) # send_maintenance_report(metrics) def _calculate_next_maintenance_date(self, printer_id: int) -> datetime: """Berechnet nächstes Wartungsdatum basierend auf Nutzung""" # Vereinfachte Implementierung - kann erweitert werden base_interval = 30 # Tage # Hier könnte man Nutzungsstatistiken einbeziehen with get_db_session() as db_session: printer = db_session.query(Printer).filter(Printer.id == printer_id).first() if printer: # Berücksichtige letzten Check if printer.last_checked: days_since_check = (datetime.now() - printer.last_checked).days if days_since_check < 15: # Kürzlich gecheckt base_interval += 15 return datetime.now() + timedelta(days=base_interval) def _schedule_next_maintenance(self, completed_task: MaintenanceTask): """Plant nächste Wartung nach Abschluss einer Aufgabe""" if completed_task.maintenance_type == MaintenanceType.PREVENTIVE: # Finde entsprechenden Schedule printer_schedules = self.schedules.get(completed_task.printer_id, []) for schedule_item in printer_schedules: if schedule_item.maintenance_type == completed_task.maintenance_type: schedule_item.last_completed = completed_task.completed_at schedule_item.next_due = datetime.now() + timedelta(days=schedule_item.interval_days) break def _calculate_mtbf(self, tasks: List[MaintenanceTask], printer_id: Optional[int]) -> float: """Berechnet Mean Time Between Failures""" # Vereinfachte MTBF-Berechnung failure_tasks = [t for t in tasks if t.maintenance_type == MaintenanceType.CORRECTIVE] if len(failure_tasks) < 2: return 0.0 # Zeitspanne zwischen ersten und letzten Ausfall first_failure = min(failure_tasks, key=lambda t: t.created_at) last_failure = max(failure_tasks, key=lambda t: t.created_at) total_time = (last_failure.created_at - first_failure.created_at).total_seconds() / 3600 # Stunden failure_count = len(failure_tasks) - 1 return total_time / failure_count if failure_count > 0 else 0.0 def _calculate_uptime(self, printer_id: Optional[int], start_date: Optional[datetime], end_date: Optional[datetime]) -> float: """Berechnet Verfügbarkeit in Prozent""" # Vereinfachte Uptime-Berechnung if not start_date: start_date = datetime.now() - timedelta(days=30) if not end_date: end_date = datetime.now() total_time = (end_date - start_date).total_seconds() # Berechne Downtime aus Wartungszeiten downtime = 0 for task in self.maintenance_history: if printer_id and task.printer_id != printer_id: continue if (task.status == MaintenanceStatus.COMPLETED and task.started_at and task.completed_at and task.started_at >= start_date and task.completed_at <= end_date): downtime += (task.completed_at - task.started_at).total_seconds() uptime = ((total_time - downtime) / total_time) * 100 if total_time > 0 else 0 return max(0, min(100, uptime)) def _send_task_notifications(self, task: MaintenanceTask, event_type: str): """Sendet Benachrichtigungen für Wartungsaufgaben""" try: if event_type == "created": emit_system_alert( f"Neue Wartungsaufgabe: {task.title} (Drucker {task.printer_id})", "info", "normal" ) elif event_type == "status_changed": emit_system_alert( f"Wartungsstatus geändert: {task.title} → {task.status.value}", "info", "normal" ) except Exception as e: logger.error(f"Fehler beim Senden der Wartungsbenachrichtigung: {str(e)}") # Globale Instanz maintenance_manager = MaintenanceManager() def get_maintenance_dashboard_data() -> Dict[str, Any]: """Holt Dashboard-Daten für Wartungen""" upcoming = maintenance_manager.get_upcoming_maintenance() overdue = maintenance_manager.get_overdue_tasks() metrics = maintenance_manager.get_maintenance_metrics() return { 'upcoming_count': len(upcoming), 'overdue_count': len(overdue), 'upcoming_tasks': [asdict(task) for task in upcoming[:5]], 'overdue_tasks': [asdict(task) for task in overdue], 'metrics': asdict(metrics), 'next_scheduled': upcoming[0] if upcoming else None } def create_emergency_maintenance(printer_id: int, description: str, priority: MaintenancePriority = MaintenancePriority.CRITICAL) -> int: """Erstellt eine Notfall-Wartung""" task = MaintenanceTask( printer_id=printer_id, title="Notfall-Wartung", description=description, maintenance_type=MaintenanceType.EMERGENCY, priority=priority, due_date=datetime.now(), # Sofort fällig checklist=maintenance_manager.create_maintenance_checklist(MaintenanceType.CORRECTIVE) ) return maintenance_manager.create_task(task) def schedule_preventive_maintenance(printer_id: int, interval_days: int = 30) -> MaintenanceSchedule: """Plant vorbeugende Wartung""" return maintenance_manager.schedule_maintenance( printer_id=printer_id, maintenance_type=MaintenanceType.PREVENTIVE, interval_days=interval_days, description="Regelmäßige vorbeugende Wartung" ) # JavaScript für Wartungs-Frontend def get_maintenance_javascript() -> str: """JavaScript für Wartungsmanagement""" return """ class MaintenanceManager { constructor() { this.currentTasks = []; this.selectedTask = null; this.init(); } init() { this.loadTasks(); this.setupEventListeners(); this.startAutoRefresh(); } setupEventListeners() { // Task status updates document.addEventListener('click', (e) => { if (e.target.matches('.maintenance-status-btn')) { const taskId = e.target.dataset.taskId; const newStatus = e.target.dataset.status; this.updateTaskStatus(taskId, newStatus); } if (e.target.matches('.maintenance-details-btn')) { const taskId = e.target.dataset.taskId; this.showTaskDetails(taskId); } }); // Create maintenance form const createForm = document.getElementById('create-maintenance-form'); createForm?.addEventListener('submit', (e) => { e.preventDefault(); this.createTask(new FormData(createForm)); }); } async loadTasks() { try { const response = await fetch('/api/maintenance/tasks'); const data = await response.json(); if (data.success) { this.currentTasks = data.tasks; this.renderTasks(); } } catch (error) { console.error('Fehler beim Laden der Wartungsaufgaben:', error); } } async updateTaskStatus(taskId, newStatus) { try { const response = await fetch(`/api/maintenance/tasks/${taskId}/status`, { method: 'PUT', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ status: newStatus }) }); const result = await response.json(); if (result.success) { this.loadTasks(); // Refresh this.showNotification('Wartungsstatus aktualisiert', 'success'); } else { this.showNotification('Fehler beim Aktualisieren', 'error'); } } catch (error) { console.error('Status-Update fehlgeschlagen:', error); } } renderTasks() { const container = document.getElementById('maintenance-tasks-container'); if (!container) return; container.innerHTML = this.currentTasks.map(task => `

${task.title}

${task.priority}

Drucker: ${task.printer_id}

Typ: ${task.maintenance_type}

Fällig: ${this.formatDate(task.due_date)}

Status: ${task.status}

`).join(''); } showTaskDetails(taskId) { const task = this.currentTasks.find(t => t.id == taskId); if (!task) return; // Create modal with task details const modal = document.createElement('div'); modal.className = 'maintenance-modal'; modal.innerHTML = ` `; document.body.appendChild(modal); // Close modal handlers modal.querySelector('.close-modal').onclick = () => modal.remove(); modal.onclick = (e) => { if (e.target === modal) modal.remove(); }; } renderChecklist(checklist) { return `

Checkliste:

${checklist.map((item, index) => ` `).join('')}
`; } formatDate(dateString) { if (!dateString) return 'Nicht gesetzt'; const date = new Date(dateString); return date.toLocaleDateString('de-DE') + ' ' + date.toLocaleTimeString('de-DE', {hour: '2-digit', minute: '2-digit'}); } showNotification(message, type = 'info') { const notification = document.createElement('div'); notification.className = `notification notification-${type}`; notification.textContent = message; document.body.appendChild(notification); setTimeout(() => { notification.remove(); }, 3000); } startAutoRefresh() { setInterval(() => { this.loadTasks(); }, 30000); // Refresh every 30 seconds } } // Initialize when DOM is ready document.addEventListener('DOMContentLoaded', function() { window.maintenanceManager = new MaintenanceManager(); }); """ def create_maintenance_task(printer_id: int, title: str, description: str = "", maintenance_type: MaintenanceType = MaintenanceType.PREVENTIVE, priority: MaintenancePriority = MaintenancePriority.NORMAL) -> int: """ Erstellt eine neue Wartungsaufgabe. Args: printer_id: ID des Druckers title: Titel der Wartungsaufgabe description: Beschreibung der Aufgabe maintenance_type: Art der Wartung priority: Priorität der Aufgabe Returns: int: ID der erstellten Aufgabe """ task = MaintenanceTask( printer_id=printer_id, title=title, description=description, maintenance_type=maintenance_type, priority=priority, checklist=maintenance_manager.create_maintenance_checklist(maintenance_type) ) return maintenance_manager.create_task(task) def schedule_maintenance(printer_id: int, maintenance_type: MaintenanceType, interval_days: int, description: str = "") -> MaintenanceSchedule: """ Plant regelmäßige Wartungen (Alias für maintenance_manager.schedule_maintenance). Args: printer_id: ID des Druckers maintenance_type: Art der Wartung interval_days: Intervall in Tagen description: Beschreibung Returns: MaintenanceSchedule: Erstellter Wartungsplan """ return maintenance_manager.schedule_maintenance( printer_id=printer_id, maintenance_type=maintenance_type, interval_days=interval_days, description=description ) def get_maintenance_overview() -> Dict[str, Any]: """ Holt eine Übersicht aller Wartungsaktivitäten. Returns: Dict: Wartungsübersicht mit Statistiken und anstehenden Aufgaben """ upcoming = maintenance_manager.get_upcoming_maintenance() overdue = maintenance_manager.get_overdue_tasks() metrics = maintenance_manager.get_maintenance_metrics() # Aktive Tasks active_tasks = [task for task in maintenance_manager.tasks.values() if task.status == MaintenanceStatus.IN_PROGRESS] # Completed tasks in last 30 days thirty_days_ago = datetime.now() - timedelta(days=30) recent_completed = [task for task in maintenance_manager.maintenance_history if task.completed_at and task.completed_at >= thirty_days_ago] return { 'summary': { 'total_tasks': len(maintenance_manager.tasks), 'active_tasks': len(active_tasks), 'upcoming_tasks': len(upcoming), 'overdue_tasks': len(overdue), 'completed_this_month': len(recent_completed) }, 'upcoming_tasks': [asdict(task) for task in upcoming[:10]], 'overdue_tasks': [asdict(task) for task in overdue], 'active_tasks': [asdict(task) for task in active_tasks], 'recent_completed': [asdict(task) for task in recent_completed[:5]], 'metrics': asdict(metrics), 'schedules': { printer_id: [asdict(schedule) for schedule in schedules] for printer_id, schedules in maintenance_manager.schedules.items() } } def update_maintenance_status(task_id: int, new_status: MaintenanceStatus, notes: str = "") -> bool: """ Aktualisiert den Status einer Wartungsaufgabe (Alias für maintenance_manager.update_task_status). Args: task_id: ID der Wartungsaufgabe new_status: Neuer Status notes: Optionale Notizen Returns: bool: True wenn erfolgreich aktualisiert """ return maintenance_manager.update_task_status(task_id, new_status, notes)