manage-your-printer/utils/conflict_manager.py
2025-06-04 10:03:22 +02:00

624 lines
26 KiB
Python

#!/usr/bin/env python3
"""
Erweiterte Druckerkonflikt-Management-Engine - MYP Platform
Dieses Modul behandelt alle Arten von Druckerkonflikten:
- Zeitüberschneidungen
- Ressourcenkonflikte
- Prioritätskonflikte
- Automatische Lösungsfindung
- Benutzerbenachrichtigungen
"""
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Tuple, Optional, Set
from dataclasses import dataclass
from enum import Enum
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_
from models import Job, Printer, User, get_cached_session
# Logging setup
logger = logging.getLogger(__name__)
class ConflictType(Enum):
"""Konflikttypen im System"""
TIME_OVERLAP = "zeitüberschneidung"
PRINTER_OFFLINE = "drucker_offline"
RESOURCE_UNAVAILABLE = "ressource_nicht_verfügbar"
PRIORITY_CONFLICT = "prioritätskonflikt"
MAINTENANCE_CONFLICT = "wartungskonflikt"
class ConflictSeverity(Enum):
"""Schweregrade von Konflikten"""
CRITICAL = "kritisch" # Verhindert Job-Ausführung komplett
HIGH = "hoch" # Beeinträchtigt Job-Qualität stark
MEDIUM = "mittel" # Beeinträchtigt Job-Effizienz
LOW = "niedrig" # Geringfügige Beeinträchtigung
INFO = "information" # Nur informativ
class ResolutionStrategy(Enum):
"""Lösungsstrategien für Konflikte"""
AUTO_REASSIGN = "automatische_neuzuweisung"
TIME_SHIFT = "zeitverschiebung"
PRIORITY_PREEMPTION = "prioritäts_verdrängung"
QUEUE_PLACEMENT = "warteschlange"
MANUAL_INTERVENTION = "manuelle_behandlung"
RESOURCE_SUBSTITUTION = "ressourcen_ersatz"
@dataclass
class ConflictDetails:
"""Detaillierte Konfliktinformationen"""
conflict_type: ConflictType
severity: ConflictSeverity
affected_job_id: int
conflicting_job_ids: List[int]
affected_printer_id: Optional[int]
conflict_start: datetime
conflict_end: datetime
description: str
suggested_solutions: List[Dict]
estimated_impact: str
auto_resolvable: bool
@dataclass
class ConflictResolution:
"""Ergebnis einer Konfliktlösung"""
success: bool
strategy_used: ResolutionStrategy
new_printer_id: Optional[int]
new_start_time: Optional[datetime]
new_end_time: Optional[datetime]
affected_jobs: List[int]
user_notification_required: bool
message: str
confidence_score: float
class ConflictManager:
"""Zentrale Konfliktmanagement-Engine"""
def __init__(self):
self.priority_weights = {
'urgent': 4,
'high': 3,
'normal': 2,
'low': 1
}
self.time_slot_preferences = {
'night_shift': {'start': 18, 'end': 6, 'bonus': 25},
'day_shift': {'start': 8, 'end': 17, 'bonus': 15},
'transition': {'start': 6, 'end': 8, 'bonus': 5}
}
self.conflict_resolution_timeout = 300 # 5 Minuten
def detect_conflicts(self, job_data: Dict, db_session: Session) -> List[ConflictDetails]:
"""
Erkennt alle möglichen Konflikte für einen geplanten Job
Args:
job_data: Job-Informationen (printer_id, start_time, end_time, priority)
db_session: Datenbankverbindung
Returns:
Liste aller erkannten Konflikte
"""
conflicts = []
# 1. Zeitüberschneidungs-Konflikte prüfen
time_conflicts = self._detect_time_conflicts(job_data, db_session)
conflicts.extend(time_conflicts)
# 2. Drucker-Verfügbarkeits-Konflikte prüfen
printer_conflicts = self._detect_printer_conflicts(job_data, db_session)
conflicts.extend(printer_conflicts)
# 3. Ressourcen-Konflikte prüfen
resource_conflicts = self._detect_resource_conflicts(job_data, db_session)
conflicts.extend(resource_conflicts)
# 4. Prioritäts-Konflikte prüfen
priority_conflicts = self._detect_priority_conflicts(job_data, db_session)
conflicts.extend(priority_conflicts)
logger.info(f"🔍 Konfliktanalyse abgeschlossen: {len(conflicts)} Konflikte erkannt")
return conflicts
def _detect_time_conflicts(self, job_data: Dict, db_session: Session) -> List[ConflictDetails]:
"""Erkennt Zeitüberschneidungs-Konflikte"""
conflicts = []
printer_id = job_data.get('printer_id')
start_time = job_data.get('start_time')
end_time = job_data.get('end_time')
if not all([printer_id, start_time, end_time]):
return conflicts
# Konflikthafte Jobs finden
conflicting_jobs = db_session.query(Job).filter(
Job.printer_id == printer_id,
Job.status.in_(["scheduled", "running"]),
or_(
and_(Job.start_at >= start_time, Job.start_at < end_time),
and_(Job.end_at > start_time, Job.end_at <= end_time),
and_(Job.start_at <= start_time, Job.end_at >= end_time)
)
).all()
for conflicting_job in conflicting_jobs:
# Konflikt-Schweregrad bestimmen
overlap_duration = self._calculate_overlap_duration(
start_time, end_time,
conflicting_job.start_at, conflicting_job.end_at
)
if overlap_duration.total_seconds() > 3600: # > 1 Stunde
severity = ConflictSeverity.CRITICAL
elif overlap_duration.total_seconds() > 1800: # > 30 Minuten
severity = ConflictSeverity.HIGH
else:
severity = ConflictSeverity.MEDIUM
# Lösungsvorschläge generieren
suggestions = self._generate_time_conflict_solutions(
job_data, conflicting_job, db_session
)
conflict = ConflictDetails(
conflict_type=ConflictType.TIME_OVERLAP,
severity=severity,
affected_job_id=job_data.get('job_id', 0),
conflicting_job_ids=[conflicting_job.id],
affected_printer_id=printer_id,
conflict_start=max(start_time, conflicting_job.start_at),
conflict_end=min(end_time, conflicting_job.end_at),
description=f"Zeitüberschneidung mit Job '{conflicting_job.name}' "
f"({overlap_duration.total_seconds()/60:.0f} Minuten)",
suggested_solutions=suggestions,
estimated_impact=f"Verzögerung von {overlap_duration.total_seconds()/60:.0f} Minuten",
auto_resolvable=len(suggestions) > 0
)
conflicts.append(conflict)
return conflicts
def _detect_printer_conflicts(self, job_data: Dict, db_session: Session) -> List[ConflictDetails]:
"""Erkennt Drucker-Verfügbarkeits-Konflikte"""
conflicts = []
printer_id = job_data.get('printer_id')
if not printer_id:
return conflicts
printer = db_session.query(Printer).filter_by(id=printer_id).first()
if not printer:
conflict = ConflictDetails(
conflict_type=ConflictType.PRINTER_OFFLINE,
severity=ConflictSeverity.CRITICAL,
affected_job_id=job_data.get('job_id', 0),
conflicting_job_ids=[],
affected_printer_id=printer_id,
conflict_start=job_data.get('start_time'),
conflict_end=job_data.get('end_time'),
description=f"Drucker ID {printer_id} existiert nicht",
suggested_solutions=[],
estimated_impact="Job kann nicht ausgeführt werden",
auto_resolvable=False
)
conflicts.append(conflict)
return conflicts
# Drucker-Status prüfen
if not printer.active:
suggestions = self._generate_printer_alternative_solutions(job_data, db_session)
conflict = ConflictDetails(
conflict_type=ConflictType.PRINTER_OFFLINE,
severity=ConflictSeverity.HIGH,
affected_job_id=job_data.get('job_id', 0),
conflicting_job_ids=[],
affected_printer_id=printer_id,
conflict_start=job_data.get('start_time'),
conflict_end=job_data.get('end_time'),
description=f"Drucker '{printer.name}' ist offline oder nicht aktiv",
suggested_solutions=suggestions,
estimated_impact="Automatische Neuzuweisung erforderlich",
auto_resolvable=len(suggestions) > 0
)
conflicts.append(conflict)
return conflicts
def _detect_resource_conflicts(self, job_data: Dict, db_session: Session) -> List[ConflictDetails]:
"""Erkennt Ressourcen-Verfügbarkeits-Konflikte"""
conflicts = []
# TODO: Implementierung für Material-, Personal- und andere Ressourcenkonflikte
# Aktuell Platzhalter für zukünftige Erweiterungen
return conflicts
def _detect_priority_conflicts(self, job_data: Dict, db_session: Session) -> List[ConflictDetails]:
"""Erkennt Prioritäts-basierte Konflikte"""
conflicts = []
job_priority = job_data.get('priority', 'normal')
if job_priority not in ['urgent', 'high']:
return conflicts # Nur hohe Prioritäten können andere verdrängen
printer_id = job_data.get('printer_id')
start_time = job_data.get('start_time')
end_time = job_data.get('end_time')
if not all([printer_id, start_time, end_time]):
return conflicts
# Niedrigerprioisierte Jobs im gleichen Zeitraum finden
lower_priority_jobs = db_session.query(Job).filter(
Job.printer_id == printer_id,
Job.status.in_(["scheduled"]),
or_(
and_(Job.start_at >= start_time, Job.start_at < end_time),
and_(Job.end_at > start_time, Job.end_at <= end_time),
and_(Job.start_at <= start_time, Job.end_at >= end_time)
)
).all()
for existing_job in lower_priority_jobs:
existing_priority = getattr(existing_job, 'priority', 'normal')
existing_weight = self.priority_weights.get(existing_priority, 2)
new_weight = self.priority_weights.get(job_priority, 2)
if new_weight > existing_weight:
suggestions = self._generate_priority_conflict_solutions(
job_data, existing_job, db_session
)
conflict = ConflictDetails(
conflict_type=ConflictType.PRIORITY_CONFLICT,
severity=ConflictSeverity.MEDIUM,
affected_job_id=job_data.get('job_id', 0),
conflicting_job_ids=[existing_job.id],
affected_printer_id=printer_id,
conflict_start=start_time,
conflict_end=end_time,
description=f"Höherpriorer Job verdrängt '{existing_job.name}' "
f"({job_priority} > {existing_priority})",
suggested_solutions=suggestions,
estimated_impact="Umplanung eines bestehenden Jobs erforderlich",
auto_resolvable=True
)
conflicts.append(conflict)
return conflicts
def resolve_conflicts(self, conflicts: List[ConflictDetails],
job_data: Dict, db_session: Session) -> List[ConflictResolution]:
"""
Löst alle erkannten Konflikte automatisch oder semi-automatisch
Args:
conflicts: Liste der zu lösenden Konflikte
job_data: Job-Informationen
db_session: Datenbankverbindung
Returns:
Liste der Konfliktlösungen
"""
resolutions = []
# Konflikte nach Schweregrad sortieren (kritische zuerst)
sorted_conflicts = sorted(conflicts,
key=lambda c: list(ConflictSeverity).index(c.severity))
for conflict in sorted_conflicts:
if conflict.auto_resolvable and conflict.suggested_solutions:
resolution = self._auto_resolve_conflict(conflict, job_data, db_session)
resolutions.append(resolution)
else:
# Manuelle Behandlung erforderlich
resolution = ConflictResolution(
success=False,
strategy_used=ResolutionStrategy.MANUAL_INTERVENTION,
new_printer_id=None,
new_start_time=None,
new_end_time=None,
affected_jobs=[conflict.affected_job_id],
user_notification_required=True,
message=f"Manueller Eingriff erforderlich: {conflict.description}",
confidence_score=0.0
)
resolutions.append(resolution)
logger.info(f"🔧 Konfliktlösung abgeschlossen: {len(resolutions)} Konflikte bearbeitet")
return resolutions
def _auto_resolve_conflict(self, conflict: ConflictDetails,
job_data: Dict, db_session: Session) -> ConflictResolution:
"""Automatische Konfliktlösung"""
# Beste Lösung aus Vorschlägen wählen
best_solution = max(conflict.suggested_solutions,
key=lambda s: s.get('confidence', 0))
strategy = ResolutionStrategy(best_solution['strategy'])
try:
if strategy == ResolutionStrategy.AUTO_REASSIGN:
return self._execute_auto_reassignment(conflict, best_solution, job_data, db_session)
elif strategy == ResolutionStrategy.TIME_SHIFT:
return self._execute_time_shift(conflict, best_solution, job_data, db_session)
elif strategy == ResolutionStrategy.PRIORITY_PREEMPTION:
return self._execute_priority_preemption(conflict, best_solution, job_data, db_session)
else:
raise ValueError(f"Unbekannte Strategie: {strategy}")
except Exception as e:
logger.error(f"❌ Fehler bei automatischer Konfliktlösung: {str(e)}")
return ConflictResolution(
success=False,
strategy_used=strategy,
new_printer_id=None,
new_start_time=None,
new_end_time=None,
affected_jobs=[conflict.affected_job_id],
user_notification_required=True,
message=f"Automatische Lösung fehlgeschlagen: {str(e)}",
confidence_score=0.0
)
def _execute_auto_reassignment(self, conflict: ConflictDetails, solution: Dict,
job_data: Dict, db_session: Session) -> ConflictResolution:
"""Führt automatische Druckerzuweisung durch"""
new_printer_id = solution['new_printer_id']
printer = db_session.query(Printer).filter_by(id=new_printer_id).first()
if not printer or not printer.active:
return ConflictResolution(
success=False,
strategy_used=ResolutionStrategy.AUTO_REASSIGN,
new_printer_id=None,
new_start_time=None,
new_end_time=None,
affected_jobs=[conflict.affected_job_id],
user_notification_required=True,
message="Alternativer Drucker nicht mehr verfügbar",
confidence_score=0.0
)
return ConflictResolution(
success=True,
strategy_used=ResolutionStrategy.AUTO_REASSIGN,
new_printer_id=new_printer_id,
new_start_time=job_data.get('start_time'),
new_end_time=job_data.get('end_time'),
affected_jobs=[conflict.affected_job_id],
user_notification_required=True,
message=f"Job automatisch zu Drucker '{printer.name}' verschoben",
confidence_score=solution.get('confidence', 0.8)
)
def _execute_time_shift(self, conflict: ConflictDetails, solution: Dict,
job_data: Dict, db_session: Session) -> ConflictResolution:
"""Führt Zeitverschiebung durch"""
new_start = solution['new_start_time']
new_end = solution['new_end_time']
return ConflictResolution(
success=True,
strategy_used=ResolutionStrategy.TIME_SHIFT,
new_printer_id=job_data.get('printer_id'),
new_start_time=new_start,
new_end_time=new_end,
affected_jobs=[conflict.affected_job_id],
user_notification_required=True,
message=f"Job zeitlich verschoben: {new_start.strftime('%H:%M')} - {new_end.strftime('%H:%M')}",
confidence_score=solution.get('confidence', 0.7)
)
def _execute_priority_preemption(self, conflict: ConflictDetails, solution: Dict,
job_data: Dict, db_session: Session) -> ConflictResolution:
"""Führt Prioritätsverdrängung durch"""
# Bestehenden Job umplanen
conflicting_job_id = conflict.conflicting_job_ids[0]
affected_jobs = [conflict.affected_job_id, conflicting_job_id]
return ConflictResolution(
success=True,
strategy_used=ResolutionStrategy.PRIORITY_PREEMPTION,
new_printer_id=job_data.get('printer_id'),
new_start_time=job_data.get('start_time'),
new_end_time=job_data.get('end_time'),
affected_jobs=affected_jobs,
user_notification_required=True,
message=f"Höherpriorer Job übernimmt Zeitslot, bestehender Job wird umgeplant",
confidence_score=solution.get('confidence', 0.9)
)
# Hilfsmethoden für Lösungsvorschläge
def _generate_time_conflict_solutions(self, job_data: Dict,
conflicting_job: Job, db_session: Session) -> List[Dict]:
"""Generiert Lösungsvorschläge für Zeitkonflikte"""
solutions = []
# 1. Alternative Drucker vorschlagen
alternative_printers = self._find_alternative_printers(job_data, db_session)
for printer_id, confidence in alternative_printers:
printer = db_session.query(Printer).filter_by(id=printer_id).first()
solutions.append({
'strategy': ResolutionStrategy.AUTO_REASSIGN.value,
'new_printer_id': printer_id,
'printer_name': printer.name if printer else f"Drucker {printer_id}",
'confidence': confidence,
'description': f"Automatische Umzuweisung zu {printer.name if printer else f'Drucker {printer_id}'}"
})
# 2. Zeitverschiebung vorschlagen
time_alternatives = self._find_alternative_time_slots(job_data, db_session)
for start_time, end_time, confidence in time_alternatives:
solutions.append({
'strategy': ResolutionStrategy.TIME_SHIFT.value,
'new_start_time': start_time,
'new_end_time': end_time,
'confidence': confidence,
'description': f"Zeitverschiebung: {start_time.strftime('%H:%M')} - {end_time.strftime('%H:%M')}"
})
return solutions
def _generate_printer_alternative_solutions(self, job_data: Dict, db_session: Session) -> List[Dict]:
"""Generiert Lösungsvorschläge für Drucker-Ausfälle"""
solutions = []
alternative_printers = self._find_alternative_printers(job_data, db_session)
for printer_id, confidence in alternative_printers:
printer = db_session.query(Printer).filter_by(id=printer_id).first()
solutions.append({
'strategy': ResolutionStrategy.AUTO_REASSIGN.value,
'new_printer_id': printer_id,
'printer_name': printer.name if printer else f"Drucker {printer_id}",
'confidence': confidence,
'description': f"Automatische Neuzuweisung zu {printer.name if printer else f'Drucker {printer_id}'}"
})
return solutions
def _generate_priority_conflict_solutions(self, job_data: Dict,
existing_job: Job, db_session: Session) -> List[Dict]:
"""Generiert Lösungsvorschläge für Prioritätskonflikte"""
solutions = []
# Bestehenden Job umplanen
alternative_slots = self._find_alternative_time_slots({
'printer_id': existing_job.printer_id,
'start_time': existing_job.start_at,
'end_time': existing_job.end_at,
'duration_minutes': existing_job.duration_minutes
}, db_session)
if alternative_slots:
start_time, end_time, confidence = alternative_slots[0]
solutions.append({
'strategy': ResolutionStrategy.PRIORITY_PREEMPTION.value,
'conflicting_job_new_start': start_time,
'conflicting_job_new_end': end_time,
'confidence': confidence,
'description': f"Bestehenden Job zu {start_time.strftime('%H:%M')} verschieben"
})
return solutions
def _find_alternative_printers(self, job_data: Dict, db_session: Session) -> List[Tuple[int, float]]:
"""Findet alternative Drucker mit Confidence-Score"""
from blueprints.calendar import get_smart_printer_assignment
alternatives = []
start_time = job_data.get('start_time')
end_time = job_data.get('end_time')
priority = job_data.get('priority', 'normal')
# Smart Assignment nutzen
recommended_printer_id = get_smart_printer_assignment(
start_date=start_time,
end_date=end_time,
priority=priority,
db_session=db_session
)
if recommended_printer_id:
alternatives.append((recommended_printer_id, 0.9))
# Weitere verfügbare Drucker mit niedrigerer Confidence
available_printers = db_session.query(Printer).filter(
Printer.active == True,
Printer.id != job_data.get('printer_id'),
Printer.id != recommended_printer_id
).all()
for printer in available_printers[:3]: # Top 3 Alternativen
# Einfache Verfügbarkeitsprüfung
conflicts = db_session.query(Job).filter(
Job.printer_id == printer.id,
Job.status.in_(["scheduled", "running"]),
or_(
and_(Job.start_at >= start_time, Job.start_at < end_time),
and_(Job.end_at > start_time, Job.end_at <= end_time),
and_(Job.start_at <= start_time, Job.end_at >= end_time)
)
).count()
if conflicts == 0:
alternatives.append((printer.id, 0.6)) # Niedrigere Confidence
return alternatives
def _find_alternative_time_slots(self, job_data: Dict, db_session: Session) -> List[Tuple[datetime, datetime, float]]:
"""Findet alternative Zeitfenster"""
alternatives = []
printer_id = job_data.get('printer_id')
original_start = job_data.get('start_time')
duration_minutes = job_data.get('duration_minutes')
if not all([printer_id, original_start, duration_minutes]):
return alternatives
duration = timedelta(minutes=duration_minutes)
# Zeitfenster um ursprünglichen Termin herum testen
test_intervals = [
timedelta(hours=1), # 1 Stunde später
timedelta(hours=2), # 2 Stunden später
timedelta(hours=-1), # 1 Stunde früher
timedelta(hours=3), # 3 Stunden später
timedelta(hours=-2), # 2 Stunden früher
]
for interval in test_intervals:
new_start = original_start + interval
new_end = new_start + duration
# Verfügbarkeit prüfen
conflicts = db_session.query(Job).filter(
Job.printer_id == printer_id,
Job.status.in_(["scheduled", "running"]),
or_(
and_(Job.start_at >= new_start, Job.start_at < new_end),
and_(Job.end_at > new_start, Job.end_at <= new_end),
and_(Job.start_at <= new_start, Job.end_at >= new_end)
)
).count()
if conflicts == 0:
# Confidence basierend auf Zeitnähe zum Original
time_diff_hours = abs(interval.total_seconds() / 3600)
confidence = max(0.3, 1.0 - (time_diff_hours * 0.1))
alternatives.append((new_start, new_end, confidence))
if len(alternatives) >= 3: # Maximal 3 Alternativen
break
return alternatives
def _calculate_overlap_duration(self, start1: datetime, end1: datetime,
start2: datetime, end2: datetime) -> timedelta:
"""Berechnet Überschneidungsdauer zwischen zwei Zeiträumen"""
overlap_start = max(start1, start2)
overlap_end = min(end1, end2)
if overlap_start < overlap_end:
return overlap_end - overlap_start
else:
return timedelta(0)
# Globale Instanz für einfache Nutzung
conflict_manager = ConflictManager()