manage-your-printer/utils/maintenance_system.py
2025-06-04 10:03:22 +02:00

790 lines
32 KiB
Python

"""
Wartungsplanungs- und Tracking-System für das MYP-System
========================================================
Dieses Modul stellt umfassende Wartungsfunktionalität bereit:
- Geplante und ungeplante Wartungen
- Wartungsintervalle und Erinnerungen
- Wartungshistorie und Berichte
- Automatische Wartungsprüfungen
- Ersatzteil-Management
- Techniker-Zuweisungen
"""
import asyncio
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, asdict
from enum import Enum
import threading
import schedule
import time
from utils.logging_config import get_logger
from models import Printer, get_db_session
from utils.email_notification import send_email_notification
from utils.realtime_dashboard import emit_system_alert
# Module-level logger, obtained from the project's logging helper under the name "maintenance".
logger = get_logger("maintenance")
class MaintenanceType(Enum):
    """Kinds of maintenance work a task or schedule can represent."""

    # Preventive maintenance
    PREVENTIVE = "preventive"
    # Repair / correction
    CORRECTIVE = "corrective"
    # Emergency maintenance
    EMERGENCY = "emergency"
    # Scheduled maintenance
    SCHEDULED = "scheduled"
    # Inspection
    INSPECTION = "inspection"
class MaintenanceStatus(Enum):
    """Lifecycle states of a maintenance task."""

    # Planned, no fixed appointment yet
    PLANNED = "planned"
    # Appointment has been set
    SCHEDULED = "scheduled"
    # Work is in progress
    IN_PROGRESS = "in_progress"
    # Work has been finished
    COMPLETED = "completed"
    # Task was cancelled
    CANCELLED = "cancelled"
    # Due date has passed
    OVERDUE = "overdue"
class MaintenancePriority(Enum):
    """Priority levels of a maintenance task, listed from lowest to highest."""

    # Low
    LOW = "low"
    # Normal
    NORMAL = "normal"
    # High
    HIGH = "high"
    # Critical
    CRITICAL = "critical"
    # Emergency
    EMERGENCY = "emergency"
@dataclass
class MaintenanceTask:
    """A single maintenance task for one printer.

    Every field has a default, so a task can be built from keyword arguments
    alone. Fields that default to ``None`` are annotated ``Optional`` so the
    annotations match the values a freshly constructed task actually holds.
    """

    # Task ID; stays None until an ID is assigned to the task.
    id: Optional[int] = None
    printer_id: Optional[int] = None
    title: str = ""
    description: str = ""
    maintenance_type: MaintenanceType = MaintenanceType.PREVENTIVE
    priority: MaintenancePriority = MaintenancePriority.NORMAL
    status: MaintenanceStatus = MaintenanceStatus.PLANNED
    scheduled_date: Optional[datetime] = None
    due_date: Optional[datetime] = None
    estimated_duration: int = 60  # minutes
    actual_duration: Optional[int] = None
    assigned_technician: Optional[str] = None
    created_at: Optional[datetime] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    notes: str = ""
    # The list-valued fields default to None (not to an empty list); code that
    # appends to them has to create the list first.
    required_parts: Optional[List[str]] = None
    actual_parts_used: Optional[List[str]] = None
    cost: Optional[float] = None
    checklist: Optional[List[Dict[str, Any]]] = None
    photos: Optional[List[str]] = None
    created_by: Optional[int] = None
@dataclass
class MaintenanceSchedule:
    """A recurring maintenance plan for one printer."""

    printer_id: int
    maintenance_type: MaintenanceType
    # Number of days between two maintenance runs.
    interval_days: int
    # Point in time at which the next maintenance becomes due.
    next_due: datetime
    last_completed: Optional[datetime] = None
    is_active: bool = True
    description: str = ""
    # Defaults to None (not an empty list), hence Optional.
    checklist_template: Optional[List[str]] = None
@dataclass
class MaintenanceMetrics:
    """Aggregated maintenance figures; every field starts at zero."""

    # Task counters
    total_tasks: int = 0
    completed_tasks: int = 0
    overdue_tasks: int = 0

    # Averages and totals
    average_completion_time: float = 0.0
    total_cost: float = 0.0

    # Reliability figures
    mtbf: float = 0.0  # mean time between failures
    mttr: float = 0.0  # mean time to repair
    uptime_percentage: float = 0.0
class MaintenanceManager:
    """Manager for maintenance planning and tracking.

    Tasks, recurring schedules and the history of completed tasks are kept in
    plain in-memory containers on the instance. Creating an instance registers
    the periodic jobs with the ``schedule`` module and starts a daemon thread
    that runs them.
    """

    def __init__(self):
        # Task registry keyed by task ID.
        self.tasks: Dict[int, MaintenanceTask] = {}
        # Recurring maintenance plans keyed by printer ID.
        self.schedules: Dict[int, List[MaintenanceSchedule]] = {}
        # Completed tasks in the order they were completed.
        self.maintenance_history: List[MaintenanceTask] = []
        self.next_task_id = 1
        self.is_running = False
        # IDs of tasks for which an overdue alert has already been emitted.
        self._overdue_alerted = set()
        self._setup_scheduler()

    def _setup_scheduler(self):
        """Register the periodic jobs and start the scheduler thread."""
        schedule.every().day.at("06:00").do(self._check_scheduled_maintenance)
        schedule.every().hour.do(self._check_overdue_tasks)
        schedule.every().monday.at("08:00").do(self._generate_weekly_report)

        def run_scheduler():
            while self.is_running:
                try:
                    schedule.run_pending()
                except Exception as e:
                    # A failing job must not end the loop, otherwise no job
                    # would ever run again for the lifetime of the process.
                    logger.error(f"Fehler im Wartungs-Scheduler: {str(e)}")
                time.sleep(60)  # check once per minute

        self.is_running = True
        scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
        scheduler_thread.start()
        logger.info("Wartungs-Scheduler gestartet")

    def create_task(self, task: "MaintenanceTask") -> int:
        """Register a new maintenance task and return its ID.

        Assigns the next free ID and the creation timestamp. A preventive task
        without a scheduled date gets one calculated automatically. A
        "created" notification is sent afterwards.
        """
        task.id = self.next_task_id
        self.next_task_id += 1
        task.created_at = datetime.now()
        self.tasks[task.id] = task

        if task.maintenance_type == MaintenanceType.PREVENTIVE and not task.scheduled_date:
            task.scheduled_date = self._calculate_next_maintenance_date(task.printer_id)

        self._send_task_notifications(task, "created")
        logger.info(f"Wartungsaufgabe erstellt: {task.title} für Drucker {task.printer_id}")
        return task.id

    def update_task_status(self, task_id: int, new_status: "MaintenanceStatus", notes: str = "") -> bool:
        """Set a new status on a task.

        Args:
            task_id: ID of the task to update.
            new_status: Status to set.
            notes: Optional text appended to the task notes with a timestamp.

        Returns:
            False if no task with this ID exists, True otherwise.
        """
        task = self.tasks.get(task_id)
        if task is None:
            return False

        old_status = task.status
        task.status = new_status

        if new_status == MaintenanceStatus.IN_PROGRESS:
            task.started_at = datetime.now()
        elif new_status == MaintenanceStatus.COMPLETED and old_status != MaintenanceStatus.COMPLETED:
            # Only the transition into COMPLETED is recorded; completing an
            # already completed task again must not duplicate the history
            # entry or reschedule the next maintenance a second time.
            task.completed_at = datetime.now()
            if task.started_at:
                task.actual_duration = int((task.completed_at - task.started_at).total_seconds() / 60)
            self.maintenance_history.append(task)
            self._schedule_next_maintenance(task)

        if notes:
            task.notes += f"\n{datetime.now().strftime('%d.%m.%Y %H:%M')}: {notes}"

        if old_status != new_status:
            self._send_task_notifications(task, "status_changed")

        logger.info(f"Wartungsaufgabe {task_id} Status: {old_status.value} → {new_status.value}")
        return True

    def schedule_maintenance(self, printer_id: int, maintenance_type: "MaintenanceType",
                             interval_days: int, description: str = "") -> "MaintenanceSchedule":
        """Create a recurring maintenance plan for a printer.

        The first due date is ``interval_days`` from now. The new plan is
        stored under the printer ID and returned.
        """
        schedule_item = MaintenanceSchedule(
            printer_id=printer_id,
            maintenance_type=maintenance_type,
            interval_days=interval_days,
            next_due=datetime.now() + timedelta(days=interval_days),
            description=description
        )
        self.schedules.setdefault(printer_id, []).append(schedule_item)
        logger.info(f"Wartungsplan erstellt: {maintenance_type.value} alle {interval_days} Tage für Drucker {printer_id}")
        return schedule_item

    def get_upcoming_maintenance(self, days_ahead: int = 7) -> List["MaintenanceTask"]:
        """Return planned or scheduled tasks due within ``days_ahead`` days.

        The result is sorted by due date, earliest first.
        """
        cutoff_date = datetime.now() + timedelta(days=days_ahead)
        open_states = (MaintenanceStatus.PLANNED, MaintenanceStatus.SCHEDULED)
        upcoming = [
            task for task in self.tasks.values()
            if task.status in open_states and task.due_date and task.due_date <= cutoff_date
        ]
        return sorted(upcoming, key=lambda t: t.due_date or datetime.max)

    def get_overdue_tasks(self) -> List["MaintenanceTask"]:
        """Return all overdue tasks.

        Planned or scheduled tasks whose due date has passed are switched to
        OVERDUE as a side effect. Tasks that already carry the OVERDUE status
        are included as well, so repeated calls give the same answer.
        """
        now = datetime.now()
        open_states = (MaintenanceStatus.PLANNED, MaintenanceStatus.SCHEDULED)
        overdue = []
        for task in self.tasks.values():
            if task.status in open_states and task.due_date and task.due_date < now:
                task.status = MaintenanceStatus.OVERDUE
            if task.status == MaintenanceStatus.OVERDUE:
                overdue.append(task)
        return overdue

    def get_maintenance_metrics(self, printer_id: Optional[int] = None,
                                start_date: Optional[datetime] = None,
                                end_date: Optional[datetime] = None) -> "MaintenanceMetrics":
        """Compute maintenance metrics from the task history.

        Args:
            printer_id: Restrict the metrics to this printer (0 is a valid ID).
            start_date: Only count tasks completed at or after this time.
            end_date: Only count tasks completed at or before this time.

        Returns:
            The computed metrics, or all-zero metrics if no history entry
            matches the filters.
        """
        tasks = list(self.maintenance_history)
        if printer_id is not None:
            tasks = [t for t in tasks if t.printer_id == printer_id]
        if start_date:
            tasks = [t for t in tasks if t.completed_at and t.completed_at >= start_date]
        if end_date:
            tasks = [t for t in tasks if t.completed_at and t.completed_at <= end_date]
        if not tasks:
            return MaintenanceMetrics()

        completed_tasks = [t for t in tasks if t.status == MaintenanceStatus.COMPLETED]

        # Average processing time in minutes.
        completion_times = [t.actual_duration for t in completed_tasks if t.actual_duration]
        avg_completion_time = sum(completion_times) / len(completion_times) if completion_times else 0.0

        total_cost = sum((t.cost for t in completed_tasks if t.cost), 0.0)

        # The overdue count follows the same printer filter as the other figures.
        overdue = self.get_overdue_tasks()
        if printer_id is not None:
            overdue = [t for t in overdue if t.printer_id == printer_id]

        return MaintenanceMetrics(
            total_tasks=len(tasks),
            completed_tasks=len(completed_tasks),
            overdue_tasks=len(overdue),
            average_completion_time=avg_completion_time,
            total_cost=total_cost,
            mtbf=self._calculate_mtbf(tasks, printer_id),
            mttr=avg_completion_time / 60,  # minutes -> hours
            uptime_percentage=self._calculate_uptime(printer_id, start_date, end_date)
        )

    def create_maintenance_checklist(self, maintenance_type: "MaintenanceType") -> List[Dict[str, Any]]:
        """Build a fresh checklist for the given maintenance type.

        Returns an empty list for types without a predefined checklist.
        """
        checklists = {
            MaintenanceType.PREVENTIVE: [
                {"task": "Drucker äußerlich reinigen", "completed": False, "required": True},
                {"task": "Druckbett-Level prüfen", "completed": False, "required": True},
                {"task": "Extruder-Düse reinigen", "completed": False, "required": True},
                {"task": "Riemen-Spannung prüfen", "completed": False, "required": True},
                {"task": "Filament-Führung prüfen", "completed": False, "required": False},
                {"task": "Software-Updates prüfen", "completed": False, "required": False},
                {"task": "Lüfter reinigen", "completed": False, "required": True},
                {"task": "Schrauben nachziehen", "completed": False, "required": False}
            ],
            MaintenanceType.CORRECTIVE: [
                {"task": "Problem-Diagnose durchführen", "completed": False, "required": True},
                {"task": "Defekte Teile identifizieren", "completed": False, "required": True},
                {"task": "Ersatzteile bestellen/bereitstellen", "completed": False, "required": True},
                {"task": "Reparatur durchführen", "completed": False, "required": True},
                {"task": "Funktionstest durchführen", "completed": False, "required": True},
                {"task": "Kalibrierung prüfen", "completed": False, "required": True}
            ],
            MaintenanceType.INSPECTION: [
                {"task": "Sichtprüfung der Mechanik", "completed": False, "required": True},
                {"task": "Druckqualität testen", "completed": False, "required": True},
                {"task": "Temperaturen prüfen", "completed": False, "required": True},
                {"task": "Bewegungen testen", "completed": False, "required": True},
                {"task": "Verschleiß bewerten", "completed": False, "required": True}
            ]
        }
        return checklists.get(maintenance_type, [])

    def _check_scheduled_maintenance(self):
        """Daily job: create a task for every active schedule that is due."""
        logger.info("Prüfe fällige Wartungen...")
        today = datetime.now()
        # Iterate over a snapshot: schedules may be added while the job runs.
        for printer_id, schedules in list(self.schedules.items()):
            for schedule_item in schedules:
                if not schedule_item.is_active or schedule_item.next_due > today:
                    continue
                task = MaintenanceTask(
                    printer_id=printer_id,
                    title=f"{schedule_item.maintenance_type.value.title()} Wartung",
                    description=schedule_item.description,
                    maintenance_type=schedule_item.maintenance_type,
                    priority=MaintenancePriority.NORMAL,
                    due_date=schedule_item.next_due,
                    checklist=self.create_maintenance_checklist(schedule_item.maintenance_type)
                )
                task_id = self.create_task(task)
                # Move the schedule on to its next due date.
                schedule_item.next_due = today + timedelta(days=schedule_item.interval_days)
                logger.info(f"Automatische Wartungsaufgabe erstellt: {task_id}")

    def _check_overdue_tasks(self):
        """Hourly job: emit one alert per task that has become overdue."""
        pending = [t for t in self.get_overdue_tasks() if t.id not in self._overdue_alerted]
        if not pending:
            return
        logger.warning(f"{len(pending)} überfällige Wartungsaufgaben gefunden")
        for task in pending:
            emit_system_alert(
                f"Wartung überfällig: {task.title} (Drucker {task.printer_id})",
                "warning",
                "high"
            )
            # Remember the task so later runs do not repeat the alert.
            self._overdue_alerted.add(task.id)

    def _generate_weekly_report(self):
        """Weekly job: collect the metrics of the last seven days.

        Sending the report is not implemented; the key figures are logged.
        """
        logger.info("Generiere wöchentlichen Wartungsbericht...")
        last_week = datetime.now() - timedelta(days=7)
        metrics = self.get_maintenance_metrics(start_date=last_week)
        logger.info(
            f"Wartungsbericht: {metrics.completed_tasks} abgeschlossen, "
            f"{metrics.overdue_tasks} überfällig"
        )
        # Sending depends on the available services:
        # send_maintenance_report(metrics)

    def _calculate_next_maintenance_date(self, printer_id: int) -> datetime:
        """Return the date of the next maintenance for a printer.

        Starts from a 30-day interval and extends it by 15 days if the
        printer's ``last_checked`` timestamp is less than 15 days old.
        """
        base_interval = 30  # days
        with get_db_session() as db_session:
            printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
            if printer and printer.last_checked:
                days_since_check = (datetime.now() - printer.last_checked).days
                if days_since_check < 15:  # checked recently
                    base_interval += 15
        return datetime.now() + timedelta(days=base_interval)

    def _schedule_next_maintenance(self, completed_task: "MaintenanceTask"):
        """Advance the matching schedule after a preventive task was completed."""
        if completed_task.maintenance_type != MaintenanceType.PREVENTIVE:
            return
        for schedule_item in self.schedules.get(completed_task.printer_id, []):
            if schedule_item.maintenance_type == completed_task.maintenance_type:
                schedule_item.last_completed = completed_task.completed_at
                schedule_item.next_due = datetime.now() + timedelta(days=schedule_item.interval_days)
                break  # only the first matching schedule is updated

    def _calculate_mtbf(self, tasks: List["MaintenanceTask"], printer_id: Optional[int]) -> float:
        """Return a simplified mean time between failures in hours.

        Corrective tasks count as failures. The time between the first and the
        last failure is divided by the number of gaps between failures. With
        fewer than two failures the result is 0.0. ``printer_id`` is unused.
        """
        failure_times = sorted(
            t.created_at for t in tasks
            if t.maintenance_type == MaintenanceType.CORRECTIVE and t.created_at is not None
        )
        if len(failure_times) < 2:
            return 0.0
        total_hours = (failure_times[-1] - failure_times[0]).total_seconds() / 3600
        return total_hours / (len(failure_times) - 1)

    def _calculate_uptime(self, printer_id: Optional[int], start_date: Optional[datetime],
                          end_date: Optional[datetime]) -> float:
        """Return a simplified availability in percent (0 to 100).

        Downtime is the summed duration of completed tasks that started and
        finished inside the window. Without ``start_date`` the window begins
        30 days ago; without ``end_date`` it ends now.
        """
        if not start_date:
            start_date = datetime.now() - timedelta(days=30)
        if not end_date:
            end_date = datetime.now()
        total_time = (end_date - start_date).total_seconds()

        downtime = 0
        for task in self.maintenance_history:
            # "is not None" so that printer ID 0 is filtered like any other ID.
            if printer_id is not None and task.printer_id != printer_id:
                continue
            if (task.status == MaintenanceStatus.COMPLETED and
                    task.started_at and task.completed_at and
                    task.started_at >= start_date and task.completed_at <= end_date):
                downtime += (task.completed_at - task.started_at).total_seconds()

        uptime = ((total_time - downtime) / total_time) * 100 if total_time > 0 else 0
        return max(0, min(100, uptime))

    def _send_task_notifications(self, task: "MaintenanceTask", event_type: str):
        """Emit a system alert for a task event ("created" or "status_changed").

        Errors while sending are logged and not propagated.
        """
        try:
            if event_type == "created":
                emit_system_alert(
                    f"Neue Wartungsaufgabe: {task.title} (Drucker {task.printer_id})",
                    "info",
                    "normal"
                )
            elif event_type == "status_changed":
                emit_system_alert(
                    f"Wartungsstatus geändert: {task.title} → {task.status.value}",
                    "info",
                    "normal"
                )
        except Exception as e:
            logger.error(f"Fehler beim Senden der Wartungsbenachrichtigung: {str(e)}")
# Global instance, created when the module is imported and used by the module-level helper functions
maintenance_manager = MaintenanceManager()
def get_maintenance_dashboard_data() -> Dict[str, Any]:
    """Collect the maintenance data shown on the dashboard.

    Returns:
        A dict with the counts of upcoming and overdue tasks, the first five
        upcoming tasks and all overdue tasks as plain dicts, the metrics as a
        dict, and the next upcoming task as a dict (None if there is none).
    """
    # Query overdue tasks first: that call switches past-due tasks to OVERDUE,
    # so they are not reported as upcoming and overdue at the same time.
    overdue = maintenance_manager.get_overdue_tasks()
    upcoming = maintenance_manager.get_upcoming_maintenance()
    metrics = maintenance_manager.get_maintenance_metrics()
    return {
        'upcoming_count': len(upcoming),
        'overdue_count': len(overdue),
        'upcoming_tasks': [asdict(task) for task in upcoming[:5]],
        'overdue_tasks': [asdict(task) for task in overdue],
        'metrics': asdict(metrics),
        # Converted like every other task in this dict instead of handing out
        # the raw dataclass instance.
        'next_scheduled': asdict(upcoming[0]) if upcoming else None
    }
def create_emergency_maintenance(printer_id: int, description: str,
                                 priority: MaintenancePriority = MaintenancePriority.CRITICAL) -> int:
    """Create an emergency maintenance task and return its ID.

    The task is due immediately and carries the checklist for corrective
    maintenance.
    """
    due_now = datetime.now()  # due immediately
    corrective_checklist = maintenance_manager.create_maintenance_checklist(MaintenanceType.CORRECTIVE)
    emergency_task = MaintenanceTask(
        printer_id=printer_id,
        title="Notfall-Wartung",
        description=description,
        maintenance_type=MaintenanceType.EMERGENCY,
        priority=priority,
        due_date=due_now,
        checklist=corrective_checklist,
    )
    task_id = maintenance_manager.create_task(emergency_task)
    return task_id
def schedule_preventive_maintenance(printer_id: int, interval_days: int = 30) -> MaintenanceSchedule:
    """Set up a recurring preventive maintenance plan for a printer."""
    plan = maintenance_manager.schedule_maintenance(
        printer_id,
        MaintenanceType.PREVENTIVE,
        interval_days,
        "Regelmäßige vorbeugende Wartung",
    )
    return plan
# JavaScript für Wartungs-Frontend
def get_maintenance_javascript() -> str:
    """Return the browser-side JavaScript for maintenance management.

    The script defines a ``MaintenanceManager`` class that loads tasks from
    ``/api/maintenance/tasks``, sends status changes with PUT to
    ``/api/maintenance/tasks/<id>/status``, renders task cards into the
    element with the ID ``maintenance-tasks-container`` and reloads the tasks
    every 30 seconds. An instance is stored in ``window.maintenanceManager``
    once the DOM is ready.

    NOTE(review): the script calls ``createTask``, ``saveNotes`` and
    ``updateChecklistItem``, none of which it defines, and it interpolates
    task fields into ``innerHTML`` unescaped.
    """
    script = """
class MaintenanceManager {
constructor() {
this.currentTasks = [];
this.selectedTask = null;
this.init();
}
init() {
this.loadTasks();
this.setupEventListeners();
this.startAutoRefresh();
}
setupEventListeners() {
// Task status updates
document.addEventListener('click', (e) => {
if (e.target.matches('.maintenance-status-btn')) {
const taskId = e.target.dataset.taskId;
const newStatus = e.target.dataset.status;
this.updateTaskStatus(taskId, newStatus);
}
if (e.target.matches('.maintenance-details-btn')) {
const taskId = e.target.dataset.taskId;
this.showTaskDetails(taskId);
}
});
// Create maintenance form
const createForm = document.getElementById('create-maintenance-form');
createForm?.addEventListener('submit', (e) => {
e.preventDefault();
this.createTask(new FormData(createForm));
});
}
async loadTasks() {
try {
const response = await fetch('/api/maintenance/tasks');
const data = await response.json();
if (data.success) {
this.currentTasks = data.tasks;
this.renderTasks();
}
} catch (error) {
console.error('Fehler beim Laden der Wartungsaufgaben:', error);
}
}
async updateTaskStatus(taskId, newStatus) {
try {
const response = await fetch(`/api/maintenance/tasks/${taskId}/status`, {
method: 'PUT',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ status: newStatus })
});
const result = await response.json();
if (result.success) {
this.loadTasks(); // Refresh
this.showNotification('Wartungsstatus aktualisiert', 'success');
} else {
this.showNotification('Fehler beim Aktualisieren', 'error');
}
} catch (error) {
console.error('Status-Update fehlgeschlagen:', error);
}
}
renderTasks() {
const container = document.getElementById('maintenance-tasks-container');
if (!container) return;
container.innerHTML = this.currentTasks.map(task => `
<div class="maintenance-task-card ${task.status} priority-${task.priority}">
<div class="task-header">
<h3>${task.title}</h3>
<span class="task-priority">${task.priority}</span>
</div>
<div class="task-info">
<p><strong>Drucker:</strong> ${task.printer_id}</p>
<p><strong>Typ:</strong> ${task.maintenance_type}</p>
<p><strong>Fällig:</strong> ${this.formatDate(task.due_date)}</p>
<p><strong>Status:</strong> ${task.status}</p>
</div>
<div class="task-actions">
<button class="maintenance-status-btn" data-task-id="${task.id}" data-status="in_progress">
Starten
</button>
<button class="maintenance-status-btn" data-task-id="${task.id}" data-status="completed">
Abschließen
</button>
<button class="maintenance-details-btn" data-task-id="${task.id}">
Details
</button>
</div>
</div>
`).join('');
}
showTaskDetails(taskId) {
const task = this.currentTasks.find(t => t.id == taskId);
if (!task) return;
// Create modal with task details
const modal = document.createElement('div');
modal.className = 'maintenance-modal';
modal.innerHTML = `
<div class="modal-content">
<div class="modal-header">
<h2>${task.title}</h2>
<button class="close-modal">&times;</button>
</div>
<div class="modal-body">
<div class="task-details">
<p><strong>Beschreibung:</strong> ${task.description}</p>
<p><strong>Techniker:</strong> ${task.assigned_technician || 'Nicht zugewiesen'}</p>
<p><strong>Geschätzte Dauer:</strong> ${task.estimated_duration} Minuten</p>
${task.checklist ? this.renderChecklist(task.checklist) : ''}
<div class="task-notes">
<h4>Notizen:</h4>
<textarea id="task-notes-${taskId}" rows="4" cols="50">${task.notes || ''}</textarea>
<button onclick="maintenanceManager.saveNotes(${taskId})">Notizen speichern</button>
</div>
</div>
</div>
</div>
`;
document.body.appendChild(modal);
// Close modal handlers
modal.querySelector('.close-modal').onclick = () => modal.remove();
modal.onclick = (e) => {
if (e.target === modal) modal.remove();
};
}
renderChecklist(checklist) {
return `
<div class="maintenance-checklist">
<h4>Checkliste:</h4>
${checklist.map((item, index) => `
<label class="checklist-item">
<input type="checkbox" ${item.completed ? 'checked' : ''}
onchange="maintenanceManager.updateChecklistItem(${index}, this.checked)">
${item.task}
${item.required ? '<span class="required">*</span>' : ''}
</label>
`).join('')}
</div>
`;
}
formatDate(dateString) {
if (!dateString) return 'Nicht gesetzt';
const date = new Date(dateString);
return date.toLocaleDateString('de-DE') + ' ' + date.toLocaleTimeString('de-DE', {hour: '2-digit', minute: '2-digit'});
}
showNotification(message, type = 'info') {
const notification = document.createElement('div');
notification.className = `notification notification-${type}`;
notification.textContent = message;
document.body.appendChild(notification);
setTimeout(() => {
notification.remove();
}, 3000);
}
startAutoRefresh() {
setInterval(() => {
this.loadTasks();
}, 30000); // Refresh every 30 seconds
}
}
// Initialize when DOM is ready
document.addEventListener('DOMContentLoaded', function() {
window.maintenanceManager = new MaintenanceManager();
});
"""
    return script
def create_maintenance_task(printer_id: int, title: str, description: str = "",
                            maintenance_type: MaintenanceType = MaintenanceType.PREVENTIVE,
                            priority: MaintenancePriority = MaintenancePriority.NORMAL) -> int:
    """
    Create a new maintenance task with the checklist for its type.

    Args:
        printer_id: ID of the printer
        title: Title of the maintenance task
        description: Description of the task
        maintenance_type: Kind of maintenance
        priority: Priority of the task

    Returns:
        int: ID of the created task
    """
    type_checklist = maintenance_manager.create_maintenance_checklist(maintenance_type)
    new_task = MaintenanceTask(
        printer_id=printer_id,
        title=title,
        description=description,
        maintenance_type=maintenance_type,
        priority=priority,
        checklist=type_checklist,
    )
    task_id = maintenance_manager.create_task(new_task)
    return task_id
def schedule_maintenance(printer_id: int, maintenance_type: MaintenanceType,
                         interval_days: int, description: str = "") -> MaintenanceSchedule:
    """
    Set up recurring maintenance (alias for maintenance_manager.schedule_maintenance).

    Args:
        printer_id: ID of the printer
        maintenance_type: Kind of maintenance
        interval_days: Interval in days
        description: Description

    Returns:
        MaintenanceSchedule: The created maintenance plan
    """
    plan = maintenance_manager.schedule_maintenance(
        printer_id,
        maintenance_type,
        interval_days,
        description,
    )
    return plan
def get_maintenance_overview() -> Dict[str, Any]:
    """
    Build an overview of all maintenance activity.

    Returns:
        Dict: Summary counts, the task lists (upcoming, overdue, active,
        recently completed) as plain dicts, the metrics and all schedules
        grouped by printer ID
    """
    # Query overdue tasks first: that call switches past-due tasks to OVERDUE,
    # so they are not reported as upcoming and overdue at the same time.
    overdue = maintenance_manager.get_overdue_tasks()
    upcoming = maintenance_manager.get_upcoming_maintenance()
    metrics = maintenance_manager.get_maintenance_metrics()

    # Tasks currently being worked on
    active_tasks = [task for task in maintenance_manager.tasks.values()
                    if task.status == MaintenanceStatus.IN_PROGRESS]

    # Tasks completed within the last 30 days, newest first, so that the
    # five-item excerpt below shows the latest completions
    thirty_days_ago = datetime.now() - timedelta(days=30)
    recent_completed = sorted(
        (task for task in maintenance_manager.maintenance_history
         if task.completed_at and task.completed_at >= thirty_days_ago),
        key=lambda task: task.completed_at,
        reverse=True,
    )

    return {
        'summary': {
            'total_tasks': len(maintenance_manager.tasks),
            'active_tasks': len(active_tasks),
            'upcoming_tasks': len(upcoming),
            'overdue_tasks': len(overdue),
            'completed_this_month': len(recent_completed)
        },
        'upcoming_tasks': [asdict(task) for task in upcoming[:10]],
        'overdue_tasks': [asdict(task) for task in overdue],
        'active_tasks': [asdict(task) for task in active_tasks],
        'recent_completed': [asdict(task) for task in recent_completed[:5]],
        'metrics': asdict(metrics),
        'schedules': {
            printer_id: [asdict(plan) for plan in plans]
            for printer_id, plans in maintenance_manager.schedules.items()
        }
    }
def update_maintenance_status(task_id: int, new_status: MaintenanceStatus,
                              notes: str = "") -> bool:
    """
    Change the status of a maintenance task (alias for maintenance_manager.update_task_status).

    Args:
        task_id: ID of the maintenance task
        new_status: New status
        notes: Optional notes

    Returns:
        bool: True if the update succeeded
    """
    was_updated = maintenance_manager.update_task_status(task_id, new_status, notes)
    return was_updated