#!/usr/bin/env python3 """ Erweiterte Druckerkonflikt-Management-Engine - MYP Platform Dieses Modul behandelt alle Arten von Druckerkonflikten: - Zeitüberschneidungen - Ressourcenkonflikte - Prioritätskonflikte - Automatische Lösungsfindung - Benutzerbenachrichtigungen """ import logging from datetime import datetime, timedelta from typing import List, Dict, Tuple, Optional, Set from dataclasses import dataclass from enum import Enum from sqlalchemy.orm import Session from sqlalchemy import and_, or_ from models import Job, Printer, User, get_cached_session # Logging setup logger = logging.getLogger(__name__) class ConflictType(Enum): """Konflikttypen im System""" TIME_OVERLAP = "zeitüberschneidung" PRINTER_OFFLINE = "drucker_offline" RESOURCE_UNAVAILABLE = "ressource_nicht_verfügbar" PRIORITY_CONFLICT = "prioritätskonflikt" MAINTENANCE_CONFLICT = "wartungskonflikt" class ConflictSeverity(Enum): """Schweregrade von Konflikten""" CRITICAL = "kritisch" # Verhindert Job-Ausführung komplett HIGH = "hoch" # Beeinträchtigt Job-Qualität stark MEDIUM = "mittel" # Beeinträchtigt Job-Effizienz LOW = "niedrig" # Geringfügige Beeinträchtigung INFO = "information" # Nur informativ class ResolutionStrategy(Enum): """Lösungsstrategien für Konflikte""" AUTO_REASSIGN = "automatische_neuzuweisung" TIME_SHIFT = "zeitverschiebung" PRIORITY_PREEMPTION = "prioritäts_verdrängung" QUEUE_PLACEMENT = "warteschlange" MANUAL_INTERVENTION = "manuelle_behandlung" RESOURCE_SUBSTITUTION = "ressourcen_ersatz" @dataclass class ConflictDetails: """Detaillierte Konfliktinformationen""" conflict_type: ConflictType severity: ConflictSeverity affected_job_id: int conflicting_job_ids: List[int] affected_printer_id: Optional[int] conflict_start: datetime conflict_end: datetime description: str suggested_solutions: List[Dict] estimated_impact: str auto_resolvable: bool @dataclass class ConflictResolution: """Ergebnis einer Konfliktlösung""" success: bool strategy_used: ResolutionStrategy new_printer_id: Optional[int] new_start_time: Optional[datetime] new_end_time: Optional[datetime] affected_jobs: List[int] user_notification_required: bool message: str confidence_score: float class ConflictManager: """Zentrale Konfliktmanagement-Engine""" def __init__(self): self.priority_weights = { 'urgent': 4, 'high': 3, 'normal': 2, 'low': 1 } self.time_slot_preferences = { 'night_shift': {'start': 18, 'end': 6, 'bonus': 25}, 'day_shift': {'start': 8, 'end': 17, 'bonus': 15}, 'transition': {'start': 6, 'end': 8, 'bonus': 5} } self.conflict_resolution_timeout = 300 # 5 Minuten def detect_conflicts(self, job_data: Dict, db_session: Session) -> List[ConflictDetails]: """ Erkennt alle möglichen Konflikte für einen geplanten Job Args: job_data: Job-Informationen (printer_id, start_time, end_time, priority) db_session: Datenbankverbindung Returns: Liste aller erkannten Konflikte """ conflicts = [] # 1. Zeitüberschneidungs-Konflikte prüfen time_conflicts = self._detect_time_conflicts(job_data, db_session) conflicts.extend(time_conflicts) # 2. Drucker-Verfügbarkeits-Konflikte prüfen printer_conflicts = self._detect_printer_conflicts(job_data, db_session) conflicts.extend(printer_conflicts) # 3. Ressourcen-Konflikte prüfen resource_conflicts = self._detect_resource_conflicts(job_data, db_session) conflicts.extend(resource_conflicts) # 4. Prioritäts-Konflikte prüfen priority_conflicts = self._detect_priority_conflicts(job_data, db_session) conflicts.extend(priority_conflicts) logger.info(f"🔍 Konfliktanalyse abgeschlossen: {len(conflicts)} Konflikte erkannt") return conflicts def _detect_time_conflicts(self, job_data: Dict, db_session: Session) -> List[ConflictDetails]: """Erkennt Zeitüberschneidungs-Konflikte""" conflicts = [] printer_id = job_data.get('printer_id') start_time = job_data.get('start_time') end_time = job_data.get('end_time') if not all([printer_id, start_time, end_time]): return conflicts # Konflikthafte Jobs finden conflicting_jobs = db_session.query(Job).filter( Job.printer_id == printer_id, Job.status.in_(["scheduled", "running"]), or_( and_(Job.start_at >= start_time, Job.start_at < end_time), and_(Job.end_at > start_time, Job.end_at <= end_time), and_(Job.start_at <= start_time, Job.end_at >= end_time) ) ).all() for conflicting_job in conflicting_jobs: # Konflikt-Schweregrad bestimmen overlap_duration = self._calculate_overlap_duration( start_time, end_time, conflicting_job.start_at, conflicting_job.end_at ) if overlap_duration.total_seconds() > 3600: # > 1 Stunde severity = ConflictSeverity.CRITICAL elif overlap_duration.total_seconds() > 1800: # > 30 Minuten severity = ConflictSeverity.HIGH else: severity = ConflictSeverity.MEDIUM # Lösungsvorschläge generieren suggestions = self._generate_time_conflict_solutions( job_data, conflicting_job, db_session ) conflict = ConflictDetails( conflict_type=ConflictType.TIME_OVERLAP, severity=severity, affected_job_id=job_data.get('job_id', 0), conflicting_job_ids=[conflicting_job.id], affected_printer_id=printer_id, conflict_start=max(start_time, conflicting_job.start_at), conflict_end=min(end_time, conflicting_job.end_at), description=f"Zeitüberschneidung mit Job '{conflicting_job.name}' " f"({overlap_duration.total_seconds()/60:.0f} Minuten)", suggested_solutions=suggestions, estimated_impact=f"Verzögerung von {overlap_duration.total_seconds()/60:.0f} Minuten", auto_resolvable=len(suggestions) > 0 ) conflicts.append(conflict) return conflicts def _detect_printer_conflicts(self, job_data: Dict, db_session: Session) -> List[ConflictDetails]: """Erkennt Drucker-Verfügbarkeits-Konflikte""" conflicts = [] printer_id = job_data.get('printer_id') if not printer_id: return conflicts printer = db_session.query(Printer).filter_by(id=printer_id).first() if not printer: conflict = ConflictDetails( conflict_type=ConflictType.PRINTER_OFFLINE, severity=ConflictSeverity.CRITICAL, affected_job_id=job_data.get('job_id', 0), conflicting_job_ids=[], affected_printer_id=printer_id, conflict_start=job_data.get('start_time'), conflict_end=job_data.get('end_time'), description=f"Drucker ID {printer_id} existiert nicht", suggested_solutions=[], estimated_impact="Job kann nicht ausgeführt werden", auto_resolvable=False ) conflicts.append(conflict) return conflicts # Drucker-Status prüfen if not printer.active: suggestions = self._generate_printer_alternative_solutions(job_data, db_session) conflict = ConflictDetails( conflict_type=ConflictType.PRINTER_OFFLINE, severity=ConflictSeverity.HIGH, affected_job_id=job_data.get('job_id', 0), conflicting_job_ids=[], affected_printer_id=printer_id, conflict_start=job_data.get('start_time'), conflict_end=job_data.get('end_time'), description=f"Drucker '{printer.name}' ist offline oder nicht aktiv", suggested_solutions=suggestions, estimated_impact="Automatische Neuzuweisung erforderlich", auto_resolvable=len(suggestions) > 0 ) conflicts.append(conflict) return conflicts def _detect_resource_conflicts(self, job_data: Dict, db_session: Session) -> List[ConflictDetails]: """Erkennt Ressourcen-Verfügbarkeits-Konflikte""" conflicts = [] # TODO: Implementierung für Material-, Personal- und andere Ressourcenkonflikte # Aktuell Platzhalter für zukünftige Erweiterungen return conflicts def _detect_priority_conflicts(self, job_data: Dict, db_session: Session) -> List[ConflictDetails]: """Erkennt Prioritäts-basierte Konflikte""" conflicts = [] job_priority = job_data.get('priority', 'normal') if job_priority not in ['urgent', 'high']: return conflicts # Nur hohe Prioritäten können andere verdrängen printer_id = job_data.get('printer_id') start_time = job_data.get('start_time') end_time = job_data.get('end_time') if not all([printer_id, start_time, end_time]): return conflicts # Niedrigerprioisierte Jobs im gleichen Zeitraum finden lower_priority_jobs = db_session.query(Job).filter( Job.printer_id == printer_id, Job.status.in_(["scheduled"]), or_( and_(Job.start_at >= start_time, Job.start_at < end_time), and_(Job.end_at > start_time, Job.end_at <= end_time), and_(Job.start_at <= start_time, Job.end_at >= end_time) ) ).all() for existing_job in lower_priority_jobs: existing_priority = getattr(existing_job, 'priority', 'normal') existing_weight = self.priority_weights.get(existing_priority, 2) new_weight = self.priority_weights.get(job_priority, 2) if new_weight > existing_weight: suggestions = self._generate_priority_conflict_solutions( job_data, existing_job, db_session ) conflict = ConflictDetails( conflict_type=ConflictType.PRIORITY_CONFLICT, severity=ConflictSeverity.MEDIUM, affected_job_id=job_data.get('job_id', 0), conflicting_job_ids=[existing_job.id], affected_printer_id=printer_id, conflict_start=start_time, conflict_end=end_time, description=f"Höherpriorer Job verdrängt '{existing_job.name}' " f"({job_priority} > {existing_priority})", suggested_solutions=suggestions, estimated_impact="Umplanung eines bestehenden Jobs erforderlich", auto_resolvable=True ) conflicts.append(conflict) return conflicts def resolve_conflicts(self, conflicts: List[ConflictDetails], job_data: Dict, db_session: Session) -> List[ConflictResolution]: """ Löst alle erkannten Konflikte automatisch oder semi-automatisch Args: conflicts: Liste der zu lösenden Konflikte job_data: Job-Informationen db_session: Datenbankverbindung Returns: Liste der Konfliktlösungen """ resolutions = [] # Konflikte nach Schweregrad sortieren (kritische zuerst) sorted_conflicts = sorted(conflicts, key=lambda c: list(ConflictSeverity).index(c.severity)) for conflict in sorted_conflicts: if conflict.auto_resolvable and conflict.suggested_solutions: resolution = self._auto_resolve_conflict(conflict, job_data, db_session) resolutions.append(resolution) else: # Manuelle Behandlung erforderlich resolution = ConflictResolution( success=False, strategy_used=ResolutionStrategy.MANUAL_INTERVENTION, new_printer_id=None, new_start_time=None, new_end_time=None, affected_jobs=[conflict.affected_job_id], user_notification_required=True, message=f"Manueller Eingriff erforderlich: {conflict.description}", confidence_score=0.0 ) resolutions.append(resolution) logger.info(f"🔧 Konfliktlösung abgeschlossen: {len(resolutions)} Konflikte bearbeitet") return resolutions def _auto_resolve_conflict(self, conflict: ConflictDetails, job_data: Dict, db_session: Session) -> ConflictResolution: """Automatische Konfliktlösung""" # Beste Lösung aus Vorschlägen wählen best_solution = max(conflict.suggested_solutions, key=lambda s: s.get('confidence', 0)) strategy = ResolutionStrategy(best_solution['strategy']) try: if strategy == ResolutionStrategy.AUTO_REASSIGN: return self._execute_auto_reassignment(conflict, best_solution, job_data, db_session) elif strategy == ResolutionStrategy.TIME_SHIFT: return self._execute_time_shift(conflict, best_solution, job_data, db_session) elif strategy == ResolutionStrategy.PRIORITY_PREEMPTION: return self._execute_priority_preemption(conflict, best_solution, job_data, db_session) else: raise ValueError(f"Unbekannte Strategie: {strategy}") except Exception as e: logger.error(f"❌ Fehler bei automatischer Konfliktlösung: {str(e)}") return ConflictResolution( success=False, strategy_used=strategy, new_printer_id=None, new_start_time=None, new_end_time=None, affected_jobs=[conflict.affected_job_id], user_notification_required=True, message=f"Automatische Lösung fehlgeschlagen: {str(e)}", confidence_score=0.0 ) def _execute_auto_reassignment(self, conflict: ConflictDetails, solution: Dict, job_data: Dict, db_session: Session) -> ConflictResolution: """Führt automatische Druckerzuweisung durch""" new_printer_id = solution['new_printer_id'] printer = db_session.query(Printer).filter_by(id=new_printer_id).first() if not printer or not printer.active: return ConflictResolution( success=False, strategy_used=ResolutionStrategy.AUTO_REASSIGN, new_printer_id=None, new_start_time=None, new_end_time=None, affected_jobs=[conflict.affected_job_id], user_notification_required=True, message="Alternativer Drucker nicht mehr verfügbar", confidence_score=0.0 ) return ConflictResolution( success=True, strategy_used=ResolutionStrategy.AUTO_REASSIGN, new_printer_id=new_printer_id, new_start_time=job_data.get('start_time'), new_end_time=job_data.get('end_time'), affected_jobs=[conflict.affected_job_id], user_notification_required=True, message=f"Job automatisch zu Drucker '{printer.name}' verschoben", confidence_score=solution.get('confidence', 0.8) ) def _execute_time_shift(self, conflict: ConflictDetails, solution: Dict, job_data: Dict, db_session: Session) -> ConflictResolution: """Führt Zeitverschiebung durch""" new_start = solution['new_start_time'] new_end = solution['new_end_time'] return ConflictResolution( success=True, strategy_used=ResolutionStrategy.TIME_SHIFT, new_printer_id=job_data.get('printer_id'), new_start_time=new_start, new_end_time=new_end, affected_jobs=[conflict.affected_job_id], user_notification_required=True, message=f"Job zeitlich verschoben: {new_start.strftime('%H:%M')} - {new_end.strftime('%H:%M')}", confidence_score=solution.get('confidence', 0.7) ) def _execute_priority_preemption(self, conflict: ConflictDetails, solution: Dict, job_data: Dict, db_session: Session) -> ConflictResolution: """Führt Prioritätsverdrängung durch""" # Bestehenden Job umplanen conflicting_job_id = conflict.conflicting_job_ids[0] affected_jobs = [conflict.affected_job_id, conflicting_job_id] return ConflictResolution( success=True, strategy_used=ResolutionStrategy.PRIORITY_PREEMPTION, new_printer_id=job_data.get('printer_id'), new_start_time=job_data.get('start_time'), new_end_time=job_data.get('end_time'), affected_jobs=affected_jobs, user_notification_required=True, message=f"Höherpriorer Job übernimmt Zeitslot, bestehender Job wird umgeplant", confidence_score=solution.get('confidence', 0.9) ) # Hilfsmethoden für Lösungsvorschläge def _generate_time_conflict_solutions(self, job_data: Dict, conflicting_job: Job, db_session: Session) -> List[Dict]: """Generiert Lösungsvorschläge für Zeitkonflikte""" solutions = [] # 1. Alternative Drucker vorschlagen alternative_printers = self._find_alternative_printers(job_data, db_session) for printer_id, confidence in alternative_printers: printer = db_session.query(Printer).filter_by(id=printer_id).first() solutions.append({ 'strategy': ResolutionStrategy.AUTO_REASSIGN.value, 'new_printer_id': printer_id, 'printer_name': printer.name if printer else f"Drucker {printer_id}", 'confidence': confidence, 'description': f"Automatische Umzuweisung zu {printer.name if printer else f'Drucker {printer_id}'}" }) # 2. Zeitverschiebung vorschlagen time_alternatives = self._find_alternative_time_slots(job_data, db_session) for start_time, end_time, confidence in time_alternatives: solutions.append({ 'strategy': ResolutionStrategy.TIME_SHIFT.value, 'new_start_time': start_time, 'new_end_time': end_time, 'confidence': confidence, 'description': f"Zeitverschiebung: {start_time.strftime('%H:%M')} - {end_time.strftime('%H:%M')}" }) return solutions def _generate_printer_alternative_solutions(self, job_data: Dict, db_session: Session) -> List[Dict]: """Generiert Lösungsvorschläge für Drucker-Ausfälle""" solutions = [] alternative_printers = self._find_alternative_printers(job_data, db_session) for printer_id, confidence in alternative_printers: printer = db_session.query(Printer).filter_by(id=printer_id).first() solutions.append({ 'strategy': ResolutionStrategy.AUTO_REASSIGN.value, 'new_printer_id': printer_id, 'printer_name': printer.name if printer else f"Drucker {printer_id}", 'confidence': confidence, 'description': f"Automatische Neuzuweisung zu {printer.name if printer else f'Drucker {printer_id}'}" }) return solutions def _generate_priority_conflict_solutions(self, job_data: Dict, existing_job: Job, db_session: Session) -> List[Dict]: """Generiert Lösungsvorschläge für Prioritätskonflikte""" solutions = [] # Bestehenden Job umplanen alternative_slots = self._find_alternative_time_slots({ 'printer_id': existing_job.printer_id, 'start_time': existing_job.start_at, 'end_time': existing_job.end_at, 'duration_minutes': existing_job.duration_minutes }, db_session) if alternative_slots: start_time, end_time, confidence = alternative_slots[0] solutions.append({ 'strategy': ResolutionStrategy.PRIORITY_PREEMPTION.value, 'conflicting_job_new_start': start_time, 'conflicting_job_new_end': end_time, 'confidence': confidence, 'description': f"Bestehenden Job zu {start_time.strftime('%H:%M')} verschieben" }) return solutions def _find_alternative_printers(self, job_data: Dict, db_session: Session) -> List[Tuple[int, float]]: """Findet alternative Drucker mit Confidence-Score""" from blueprints.calendar import get_smart_printer_assignment alternatives = [] start_time = job_data.get('start_time') end_time = job_data.get('end_time') priority = job_data.get('priority', 'normal') # Smart Assignment nutzen recommended_printer_id = get_smart_printer_assignment( start_date=start_time, end_date=end_time, priority=priority, db_session=db_session ) if recommended_printer_id: alternatives.append((recommended_printer_id, 0.9)) # Weitere verfügbare Drucker mit niedrigerer Confidence available_printers = db_session.query(Printer).filter( Printer.active == True, Printer.id != job_data.get('printer_id'), Printer.id != recommended_printer_id ).all() for printer in available_printers[:3]: # Top 3 Alternativen # Einfache Verfügbarkeitsprüfung conflicts = db_session.query(Job).filter( Job.printer_id == printer.id, Job.status.in_(["scheduled", "running"]), or_( and_(Job.start_at >= start_time, Job.start_at < end_time), and_(Job.end_at > start_time, Job.end_at <= end_time), and_(Job.start_at <= start_time, Job.end_at >= end_time) ) ).count() if conflicts == 0: alternatives.append((printer.id, 0.6)) # Niedrigere Confidence return alternatives def _find_alternative_time_slots(self, job_data: Dict, db_session: Session) -> List[Tuple[datetime, datetime, float]]: """Findet alternative Zeitfenster""" alternatives = [] printer_id = job_data.get('printer_id') original_start = job_data.get('start_time') duration_minutes = job_data.get('duration_minutes') if not all([printer_id, original_start, duration_minutes]): return alternatives duration = timedelta(minutes=duration_minutes) # Zeitfenster um ursprünglichen Termin herum testen test_intervals = [ timedelta(hours=1), # 1 Stunde später timedelta(hours=2), # 2 Stunden später timedelta(hours=-1), # 1 Stunde früher timedelta(hours=3), # 3 Stunden später timedelta(hours=-2), # 2 Stunden früher ] for interval in test_intervals: new_start = original_start + interval new_end = new_start + duration # Verfügbarkeit prüfen conflicts = db_session.query(Job).filter( Job.printer_id == printer_id, Job.status.in_(["scheduled", "running"]), or_( and_(Job.start_at >= new_start, Job.start_at < new_end), and_(Job.end_at > new_start, Job.end_at <= new_end), and_(Job.start_at <= new_start, Job.end_at >= new_end) ) ).count() if conflicts == 0: # Confidence basierend auf Zeitnähe zum Original time_diff_hours = abs(interval.total_seconds() / 3600) confidence = max(0.3, 1.0 - (time_diff_hours * 0.1)) alternatives.append((new_start, new_end, confidence)) if len(alternatives) >= 3: # Maximal 3 Alternativen break return alternatives def _calculate_overlap_duration(self, start1: datetime, end1: datetime, start2: datetime, end2: datetime) -> timedelta: """Berechnet Überschneidungsdauer zwischen zwei Zeiträumen""" overlap_start = max(start1, start2) overlap_end = min(end1, end2) if overlap_start < overlap_end: return overlap_end - overlap_start else: return timedelta(0) # Globale Instanz für einfache Nutzung conflict_manager = ConflictManager()