#!/usr/bin/env python3
"""
|
|
Erweiterte Druckerkonflikt-Management-Engine - MYP Platform
|
|
|
|
Dieses Modul behandelt alle Arten von Druckerkonflikten:
|
|
- Zeitüberschneidungen
|
|
- Ressourcenkonflikte
|
|
- Prioritätskonflikte
|
|
- Automatische Lösungsfindung
|
|
- Benutzerbenachrichtigungen
|
|
"""
|
|
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import List, Dict, Tuple, Optional, Set
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import and_, or_
|
|
|
|
from models import Job, Printer, User, get_cached_session
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
class ConflictType(Enum):
    """Kinds of scheduling conflict the system distinguishes.

    The member values are German identifiers and are part of the runtime
    data; they must not be translated.
    """

    # The requested time window intersects another job on the same printer.
    TIME_OVERLAP = "zeitüberschneidung"
    # The requested printer does not exist or is not active.
    PRINTER_OFFLINE = "drucker_offline"
    # A required resource is not available.
    RESOURCE_UNAVAILABLE = "ressource_nicht_verfügbar"
    # A higher-priority job competes with a lower-priority one.
    PRIORITY_CONFLICT = "prioritätskonflikt"
    # The job collides with maintenance.
    MAINTENANCE_CONFLICT = "wartungskonflikt"
|
|
|
|
class ConflictSeverity(Enum):
    """Severity levels of a conflict, declared from most to least severe.

    The declaration order is significant: ConflictManager.resolve_conflicts
    sorts conflicts by a member's position in this enum, so CRITICAL must
    stay first and INFO last.
    """

    # Prevents the job from being executed at all.
    CRITICAL = "kritisch"
    # Strongly degrades job quality.
    HIGH = "hoch"
    # Degrades job efficiency.
    MEDIUM = "mittel"
    # Minor impairment.
    LOW = "niedrig"
    # Informational only.
    INFO = "information"
|
|
|
|
class ResolutionStrategy(Enum):
    """Strategies that can be applied to resolve a conflict.

    The string values are stored in the ``'strategy'`` key of suggested
    solutions and converted back with ``ResolutionStrategy(value)``, so they
    must remain unchanged.
    """

    # Move the job to a different printer.
    AUTO_REASSIGN = "automatische_neuzuweisung"
    # Move the job to a different time window.
    TIME_SHIFT = "zeitverschiebung"
    # Let a higher-priority job displace a lower-priority one.
    PRIORITY_PREEMPTION = "prioritäts_verdrängung"
    # Put the job into a queue.
    QUEUE_PLACEMENT = "warteschlange"
    # Hand the conflict over to a human.
    MANUAL_INTERVENTION = "manuelle_behandlung"
    # Replace the missing resource with a substitute.
    RESOURCE_SUBSTITUTION = "ressourcen_ersatz"
|
|
|
|
@dataclass
class ConflictDetails:
    """Describes one detected conflict for a job that is being scheduled.

    All fields are required and positional order is part of the constructor
    signature.
    """

    # What kind of conflict this is.
    conflict_type: ConflictType
    # How severe the conflict is.
    severity: ConflictSeverity
    # ID of the job being scheduled (detectors pass 0 when job_data has no 'job_id').
    affected_job_id: int
    # IDs of already existing jobs that collide with the affected job.
    conflicting_job_ids: List[int]
    # Printer the conflict occurs on, if any.
    affected_printer_id: Optional[int]
    # Start of the conflicting time span.
    # NOTE(review): printer-availability conflicts pass job_data.get('start_time'),
    # which may be None - confirm whether this should be Optional.
    conflict_start: datetime
    # End of the conflicting time span (same caveat as conflict_start).
    conflict_end: datetime
    # Human-readable description of the conflict.
    description: str
    # Candidate solutions; each dict carries at least 'strategy' and 'confidence'.
    suggested_solutions: List[Dict]
    # Human-readable estimate of the consequences.
    estimated_impact: str
    # Whether the manager may try to resolve this conflict without a human.
    auto_resolvable: bool
|
|
|
|
@dataclass
class ConflictResolution:
    """Outcome of attempting to resolve a single conflict.

    All fields are required and positional order is part of the constructor
    signature.
    """

    # True when the chosen strategy produced a usable result.
    success: bool
    # Strategy that was applied (or attempted).
    strategy_used: ResolutionStrategy
    # Printer the job should run on after resolution, if known.
    new_printer_id: Optional[int]
    # Start of the job after resolution, if known.
    new_start_time: Optional[datetime]
    # End of the job after resolution, if known.
    new_end_time: Optional[datetime]
    # IDs of every job touched by this resolution.
    affected_jobs: List[int]
    # Whether the user has to be informed about the outcome.
    user_notification_required: bool
    # Human-readable summary of what happened.
    message: str
    # Confidence score taken from the chosen solution; 0.0 on failure.
    confidence_score: float
|
|
|
|
class ConflictManager:
    """Central conflict management engine.

    Detects conflicts for a job that is about to be scheduled (time overlap,
    printer availability, resources, priority) and derives resolutions from
    the solutions suggested for each conflict.

    None of the methods in this class write to the database; resolutions are
    returned as ConflictResolution objects for the caller to apply.
    """

    # Job states treated as occupying a printer's time slot.
    BLOCKING_STATUSES = ("scheduled", "running")
    # Job states considered when looking for jobs a higher priority may displace.
    PREEMPTIBLE_STATUSES = ("scheduled",)
    # Offsets from the original start that are probed for a free slot,
    # in order of preference.
    TIME_SHIFT_OFFSETS = (
        timedelta(hours=1),   # 1 hour later
        timedelta(hours=2),   # 2 hours later
        timedelta(hours=-1),  # 1 hour earlier
        timedelta(hours=3),   # 3 hours later
        timedelta(hours=-2),  # 2 hours earlier
    )
    # Upper bound on alternative time slots returned per search.
    MAX_TIME_ALTERNATIVES = 3
    # Upper bound on free fallback printers (besides the recommended one).
    MAX_FALLBACK_PRINTERS = 3

    def __init__(self):
        """Initialise priority weights, time-slot preferences and the timeout."""
        # Higher weight wins when two jobs compete for the same slot.
        self.priority_weights = {
            'urgent': 4,
            'high': 3,
            'normal': 2,
            'low': 1
        }

        # Hours of day per shift with an associated bonus value.
        self.time_slot_preferences = {
            'night_shift': {'start': 18, 'end': 6, 'bonus': 25},
            'day_shift': {'start': 8, 'end': 17, 'bonus': 15},
            'transition': {'start': 6, 'end': 8, 'bonus': 5}
        }

        self.conflict_resolution_timeout = 300  # 5 minutes

    # ------------------------------------------------------------------
    # Detection
    # ------------------------------------------------------------------

    def detect_conflicts(self, job_data: Dict, db_session: "Session") -> List["ConflictDetails"]:
        """Detect every conflict for a planned job.

        Args:
            job_data: Job information. Keys read by the detectors are
                'printer_id', 'start_time', 'end_time', 'priority' and the
                optional 'job_id' and 'duration_minutes'.
            db_session: Database session used for all queries.

        Returns:
            All detected conflicts, in detector order (time, printer,
            resource, priority).
        """
        detectors = (
            self._detect_time_conflicts,      # 1. time overlaps
            self._detect_printer_conflicts,   # 2. printer availability
            self._detect_resource_conflicts,  # 3. resources
            self._detect_priority_conflicts,  # 4. priorities
        )

        conflicts = []
        for detect in detectors:
            conflicts.extend(detect(job_data, db_session))

        logger.info(f"🔍 Konfliktanalyse abgeschlossen: {len(conflicts)} Konflikte erkannt")
        return conflicts

    def _overlapping_jobs(self, db_session: "Session", printer_id, start_time, end_time,
                          statuses=None, exclude_job_id=None):
        """Build the query for jobs on a printer that intersect a time window.

        Args:
            db_session: Database session.
            printer_id: Printer whose jobs are inspected.
            start_time: Start of the window.
            end_time: End of the window.
            statuses: Job states to consider; defaults to BLOCKING_STATUSES.
            exclude_job_id: Optional job ID to leave out, so that a job which
                already exists is never reported as conflicting with itself.

        Returns:
            An un-executed query; callers use ``.all()`` or ``.count()``.
        """
        if statuses is None:
            statuses = self.BLOCKING_STATUSES

        query = db_session.query(Job).filter(
            Job.printer_id == printer_id,
            Job.status.in_(list(statuses)),
            or_(
                # existing job starts inside the window
                and_(Job.start_at >= start_time, Job.start_at < end_time),
                # existing job ends inside the window
                and_(Job.end_at > start_time, Job.end_at <= end_time),
                # existing job spans the whole window
                and_(Job.start_at <= start_time, Job.end_at >= end_time),
            ),
        )
        if exclude_job_id:
            query = query.filter(Job.id != exclude_job_id)
        return query

    def _detect_time_conflicts(self, job_data: Dict, db_session: "Session") -> List["ConflictDetails"]:
        """Detect time-overlap conflicts on the requested printer.

        Returns an empty list when 'printer_id', 'start_time' or 'end_time'
        is missing. Severity depends on the overlap length: more than one
        hour is CRITICAL, more than 30 minutes is HIGH, otherwise MEDIUM.
        """
        printer_id = job_data.get('printer_id')
        start_time = job_data.get('start_time')
        end_time = job_data.get('end_time')

        if not all([printer_id, start_time, end_time]):
            return []

        conflicting_jobs = self._overlapping_jobs(
            db_session, printer_id, start_time, end_time,
            exclude_job_id=job_data.get('job_id'),
        ).all()

        conflicts = []
        for conflicting_job in conflicting_jobs:
            overlap_duration = self._calculate_overlap_duration(
                start_time, end_time,
                conflicting_job.start_at, conflicting_job.end_at
            )
            overlap_seconds = overlap_duration.total_seconds()

            if overlap_seconds > 3600:  # > 1 hour
                severity = ConflictSeverity.CRITICAL
            elif overlap_seconds > 1800:  # > 30 minutes
                severity = ConflictSeverity.HIGH
            else:
                severity = ConflictSeverity.MEDIUM

            suggestions = self._generate_time_conflict_solutions(
                job_data, conflicting_job, db_session
            )

            conflicts.append(ConflictDetails(
                conflict_type=ConflictType.TIME_OVERLAP,
                severity=severity,
                affected_job_id=job_data.get('job_id', 0),
                conflicting_job_ids=[conflicting_job.id],
                affected_printer_id=printer_id,
                conflict_start=max(start_time, conflicting_job.start_at),
                conflict_end=min(end_time, conflicting_job.end_at),
                description=f"Zeitüberschneidung mit Job '{conflicting_job.name}' "
                            f"({overlap_seconds/60:.0f} Minuten)",
                suggested_solutions=suggestions,
                estimated_impact=f"Verzögerung von {overlap_seconds/60:.0f} Minuten",
                auto_resolvable=bool(suggestions)
            ))

        return conflicts

    def _detect_printer_conflicts(self, job_data: Dict, db_session: "Session") -> List["ConflictDetails"]:
        """Detect printer-availability conflicts.

        A printer that does not exist yields a CRITICAL, non-resolvable
        conflict; an inactive printer yields a HIGH conflict with alternative
        printers as suggestions. Returns an empty list without 'printer_id'.
        """
        conflicts = []

        printer_id = job_data.get('printer_id')
        if not printer_id:
            return conflicts

        printer = db_session.query(Printer).filter_by(id=printer_id).first()
        if not printer:
            conflicts.append(ConflictDetails(
                conflict_type=ConflictType.PRINTER_OFFLINE,
                severity=ConflictSeverity.CRITICAL,
                affected_job_id=job_data.get('job_id', 0),
                conflicting_job_ids=[],
                affected_printer_id=printer_id,
                conflict_start=job_data.get('start_time'),
                conflict_end=job_data.get('end_time'),
                description=f"Drucker ID {printer_id} existiert nicht",
                suggested_solutions=[],
                estimated_impact="Job kann nicht ausgeführt werden",
                auto_resolvable=False
            ))
            return conflicts

        # Check printer status
        if not printer.active:
            suggestions = self._generate_printer_alternative_solutions(job_data, db_session)

            conflicts.append(ConflictDetails(
                conflict_type=ConflictType.PRINTER_OFFLINE,
                severity=ConflictSeverity.HIGH,
                affected_job_id=job_data.get('job_id', 0),
                conflicting_job_ids=[],
                affected_printer_id=printer_id,
                conflict_start=job_data.get('start_time'),
                conflict_end=job_data.get('end_time'),
                description=f"Drucker '{printer.name}' ist offline oder nicht aktiv",
                suggested_solutions=suggestions,
                estimated_impact="Automatische Neuzuweisung erforderlich",
                auto_resolvable=bool(suggestions)
            ))

        return conflicts

    def _detect_resource_conflicts(self, job_data: Dict, db_session: "Session") -> List["ConflictDetails"]:
        """Detect resource-availability conflicts.

        Placeholder: always returns an empty list.
        """
        # TODO: implement material, staff and other resource conflicts.
        return []

    def _detect_priority_conflicts(self, job_data: Dict, db_session: "Session") -> List["ConflictDetails"]:
        """Detect priority-based conflicts.

        Only jobs with priority 'urgent' or 'high' can displace others. For
        each overlapping preemptible job with a strictly lower weight a
        MEDIUM conflict is reported. A conflict is marked auto-resolvable
        only when a new slot for the displaced job was actually found.
        """
        job_priority = job_data.get('priority', 'normal')
        if job_priority not in ['urgent', 'high']:
            return []  # only high priorities can displace other jobs

        printer_id = job_data.get('printer_id')
        start_time = job_data.get('start_time')
        end_time = job_data.get('end_time')

        if not all([printer_id, start_time, end_time]):
            return []

        candidates = self._overlapping_jobs(
            db_session, printer_id, start_time, end_time,
            statuses=self.PREEMPTIBLE_STATUSES,
            exclude_job_id=job_data.get('job_id'),
        ).all()

        new_weight = self.priority_weights.get(job_priority, 2)

        conflicts = []
        for existing_job in candidates:
            # A missing or empty priority is treated as 'normal'.
            existing_priority = getattr(existing_job, 'priority', None) or 'normal'
            existing_weight = self.priority_weights.get(existing_priority, 2)

            if new_weight <= existing_weight:
                continue

            suggestions = self._generate_priority_conflict_solutions(
                job_data, existing_job, db_session
            )

            conflicts.append(ConflictDetails(
                conflict_type=ConflictType.PRIORITY_CONFLICT,
                severity=ConflictSeverity.MEDIUM,
                affected_job_id=job_data.get('job_id', 0),
                conflicting_job_ids=[existing_job.id],
                affected_printer_id=printer_id,
                conflict_start=start_time,
                conflict_end=end_time,
                description=f"Höherpriorer Job verdrängt '{existing_job.name}' "
                            f"({job_priority} > {existing_priority})",
                suggested_solutions=suggestions,
                estimated_impact="Umplanung eines bestehenden Jobs erforderlich",
                # Consistent with the other detectors: without a suggestion
                # there is nothing to apply automatically.
                auto_resolvable=bool(suggestions)
            ))

        return conflicts

    # ------------------------------------------------------------------
    # Resolution
    # ------------------------------------------------------------------

    def resolve_conflicts(self, conflicts: List["ConflictDetails"],
                          job_data: Dict, db_session: "Session") -> List["ConflictResolution"]:
        """Resolve all detected conflicts automatically or flag them for manual handling.

        Args:
            conflicts: Conflicts to resolve.
            job_data: Job information.
            db_session: Database session.

        Returns:
            One ConflictResolution per conflict, most severe conflicts first.
        """
        # Rank follows the declaration order of ConflictSeverity (critical first).
        severity_rank = {severity: rank for rank, severity in enumerate(ConflictSeverity)}
        ordered = sorted(
            conflicts,
            key=lambda c: severity_rank.get(c.severity, len(severity_rank)),
        )

        resolutions = []
        for conflict in ordered:
            if conflict.auto_resolvable and conflict.suggested_solutions:
                resolutions.append(self._auto_resolve_conflict(conflict, job_data, db_session))
            else:
                # Manual handling required
                resolutions.append(self._failed_resolution(
                    ResolutionStrategy.MANUAL_INTERVENTION,
                    conflict,
                    f"Manueller Eingriff erforderlich: {conflict.description}",
                ))

        logger.info(f"🔧 Konfliktlösung abgeschlossen: {len(resolutions)} Konflikte bearbeitet")
        return resolutions

    @staticmethod
    def _failed_resolution(strategy, conflict: "ConflictDetails", message: str) -> "ConflictResolution":
        """Build an unsuccessful resolution that requires user notification."""
        return ConflictResolution(
            success=False,
            strategy_used=strategy,
            new_printer_id=None,
            new_start_time=None,
            new_end_time=None,
            affected_jobs=[conflict.affected_job_id],
            user_notification_required=True,
            message=message,
            confidence_score=0.0
        )

    def _auto_resolve_conflict(self, conflict: "ConflictDetails",
                               job_data: Dict, db_session: "Session") -> "ConflictResolution":
        """Apply the suggested solution with the highest confidence.

        Any error, including an empty suggestion list or an unknown strategy
        value, is logged and reported as a failed resolution instead of being
        raised.
        """
        # Fallback used in the error report if the strategy cannot be parsed.
        strategy = ResolutionStrategy.MANUAL_INTERVENTION

        try:
            best_solution = max(conflict.suggested_solutions,
                                key=lambda s: s.get('confidence', 0))
            strategy = ResolutionStrategy(best_solution['strategy'])

            handlers = {
                ResolutionStrategy.AUTO_REASSIGN: self._execute_auto_reassignment,
                ResolutionStrategy.TIME_SHIFT: self._execute_time_shift,
                ResolutionStrategy.PRIORITY_PREEMPTION: self._execute_priority_preemption,
            }
            handler = handlers.get(strategy)
            if handler is None:
                raise ValueError(f"Unbekannte Strategie: {strategy}")

            return handler(conflict, best_solution, job_data, db_session)

        except Exception as e:
            # Boundary of the automatic path: never let one bad solution
            # abort the processing of the remaining conflicts.
            logger.error(f"❌ Fehler bei automatischer Konfliktlösung: {str(e)}", exc_info=True)
            return self._failed_resolution(
                strategy, conflict, f"Automatische Lösung fehlgeschlagen: {str(e)}"
            )

    def _execute_auto_reassignment(self, conflict: "ConflictDetails", solution: Dict,
                                   job_data: Dict, db_session: "Session") -> "ConflictResolution":
        """Reassign the job to the printer named in the solution.

        Fails when that printer no longer exists or is not active. The job's
        time window is kept unchanged.
        """
        new_printer_id = solution['new_printer_id']
        printer = db_session.query(Printer).filter_by(id=new_printer_id).first()

        if not printer or not printer.active:
            return self._failed_resolution(
                ResolutionStrategy.AUTO_REASSIGN,
                conflict,
                "Alternativer Drucker nicht mehr verfügbar",
            )

        return ConflictResolution(
            success=True,
            strategy_used=ResolutionStrategy.AUTO_REASSIGN,
            new_printer_id=new_printer_id,
            new_start_time=job_data.get('start_time'),
            new_end_time=job_data.get('end_time'),
            affected_jobs=[conflict.affected_job_id],
            user_notification_required=True,
            message=f"Job automatisch zu Drucker '{printer.name}' verschoben",
            confidence_score=solution.get('confidence', 0.8)
        )

    def _execute_time_shift(self, conflict: "ConflictDetails", solution: Dict,
                            job_data: Dict, db_session: "Session") -> "ConflictResolution":
        """Move the job to the time window named in the solution, on the same printer."""
        new_start = solution['new_start_time']
        new_end = solution['new_end_time']

        return ConflictResolution(
            success=True,
            strategy_used=ResolutionStrategy.TIME_SHIFT,
            new_printer_id=job_data.get('printer_id'),
            new_start_time=new_start,
            new_end_time=new_end,
            affected_jobs=[conflict.affected_job_id],
            user_notification_required=True,
            message=f"Job zeitlich verschoben: {new_start.strftime('%H:%M')} - {new_end.strftime('%H:%M')}",
            confidence_score=solution.get('confidence', 0.7)
        )

    def _execute_priority_preemption(self, conflict: "ConflictDetails", solution: Dict,
                                     job_data: Dict, db_session: "Session") -> "ConflictResolution":
        """Let the new job keep its slot and mark the existing job as affected.

        The displaced job's proposed new window stays in the solution dict
        ('conflicting_job_new_start' / 'conflicting_job_new_end'); it is not
        written to the database here.
        """
        conflicting_job_id = conflict.conflicting_job_ids[0]
        affected_jobs = [conflict.affected_job_id, conflicting_job_id]

        return ConflictResolution(
            success=True,
            strategy_used=ResolutionStrategy.PRIORITY_PREEMPTION,
            new_printer_id=job_data.get('printer_id'),
            new_start_time=job_data.get('start_time'),
            new_end_time=job_data.get('end_time'),
            affected_jobs=affected_jobs,
            user_notification_required=True,
            message="Höherpriorer Job übernimmt Zeitslot, bestehender Job wird umgeplant",
            confidence_score=solution.get('confidence', 0.9)
        )

    # ------------------------------------------------------------------
    # Helpers that generate suggested solutions
    # ------------------------------------------------------------------

    def _reassignment_solutions(self, job_data: Dict, db_session: "Session",
                                description_prefix: str) -> List[Dict]:
        """Turn every alternative printer into an AUTO_REASSIGN solution dict."""
        solutions = []
        for printer_id, confidence in self._find_alternative_printers(job_data, db_session):
            printer = db_session.query(Printer).filter_by(id=printer_id).first()
            printer_name = printer.name if printer else f"Drucker {printer_id}"
            solutions.append({
                'strategy': ResolutionStrategy.AUTO_REASSIGN.value,
                'new_printer_id': printer_id,
                'printer_name': printer_name,
                'confidence': confidence,
                'description': f"{description_prefix} {printer_name}"
            })
        return solutions

    def _generate_time_conflict_solutions(self, job_data: Dict,
                                          conflicting_job: "Job", db_session: "Session") -> List[Dict]:
        """Suggest solutions for a time conflict: other printers first, then other time slots."""
        # 1. Suggest alternative printers
        solutions = self._reassignment_solutions(
            job_data, db_session, "Automatische Umzuweisung zu"
        )

        # 2. Suggest a time shift
        for start_time, end_time, confidence in self._find_alternative_time_slots(job_data, db_session):
            solutions.append({
                'strategy': ResolutionStrategy.TIME_SHIFT.value,
                'new_start_time': start_time,
                'new_end_time': end_time,
                'confidence': confidence,
                'description': f"Zeitverschiebung: {start_time.strftime('%H:%M')} - {end_time.strftime('%H:%M')}"
            })

        return solutions

    def _generate_printer_alternative_solutions(self, job_data: Dict, db_session: "Session") -> List[Dict]:
        """Suggest alternative printers for a job whose printer is not active."""
        return self._reassignment_solutions(
            job_data, db_session, "Automatische Neuzuweisung zu"
        )

    def _generate_priority_conflict_solutions(self, job_data: Dict,
                                              existing_job: "Job", db_session: "Session") -> List[Dict]:
        """Suggest a new slot for an existing job displaced by a higher priority.

        The existing job is excluded from the availability check of its own
        new slot, and slots that would still overlap the preempting job's
        window are skipped (that job is not in the database yet, so the
        query alone cannot see it).

        Returns:
            A list with at most one PRIORITY_PREEMPTION solution.
        """
        alternative_slots = self._find_alternative_time_slots({
            'job_id': existing_job.id,
            'printer_id': existing_job.printer_id,
            'start_time': existing_job.start_at,
            'end_time': existing_job.end_at,
            'duration_minutes': existing_job.duration_minutes
        }, db_session)

        preempt_start = job_data.get('start_time')
        preempt_end = job_data.get('end_time')

        for start_time, end_time, confidence in alternative_slots:
            if preempt_start and preempt_end:
                clash = self._calculate_overlap_duration(
                    start_time, end_time, preempt_start, preempt_end
                )
                if clash > timedelta(0):
                    continue

            return [{
                'strategy': ResolutionStrategy.PRIORITY_PREEMPTION.value,
                'conflicting_job_new_start': start_time,
                'conflicting_job_new_end': end_time,
                'confidence': confidence,
                'description': f"Bestehenden Job zu {start_time.strftime('%H:%M')} verschieben"
            }]

        return []

    def _find_alternative_printers(self, job_data: Dict, db_session: "Session") -> List[Tuple[int, float]]:
        """Find alternative printers together with a confidence score.

        The printer recommended by ``get_smart_printer_assignment`` gets a
        confidence of 0.9 (unless it is the original printer). Up to
        MAX_FALLBACK_PRINTERS other active printers that have no overlapping
        job in the requested window follow with a confidence of 0.6.
        """
        # Imported locally - presumably to avoid a circular import; confirm.
        from blueprints.calendar import get_smart_printer_assignment

        alternatives = []
        original_printer_id = job_data.get('printer_id')
        start_time = job_data.get('start_time')
        end_time = job_data.get('end_time')
        priority = job_data.get('priority', 'normal')

        recommended_printer_id = get_smart_printer_assignment(
            start_date=start_time,
            end_date=end_time,
            priority=priority,
            db_session=db_session
        )

        # Recommending the printer the job already conflicts on is no alternative.
        if recommended_printer_id and recommended_printer_id != original_printer_id:
            alternatives.append((recommended_printer_id, 0.9))

        filters = [
            Printer.active == True,  # noqa: E712 - SQL expression, not a Python comparison
            Printer.id != original_printer_id,
        ]
        if recommended_printer_id is not None:
            filters.append(Printer.id != recommended_printer_id)
        candidates = db_session.query(Printer).filter(*filters).all()

        # Check availability first and cap afterwards, so that busy printers
        # at the front of the list do not hide free ones further back.
        fallback_count = 0
        for printer in candidates:
            if fallback_count >= self.MAX_FALLBACK_PRINTERS:
                break

            busy = self._overlapping_jobs(
                db_session, printer.id, start_time, end_time,
                exclude_job_id=job_data.get('job_id'),
            ).count()

            if busy == 0:
                alternatives.append((printer.id, 0.6))  # lower confidence
                fallback_count += 1

        return alternatives

    @staticmethod
    def _job_duration(job_data: Dict) -> Optional[timedelta]:
        """Return the job's duration, or None when it cannot be determined.

        'duration_minutes' takes precedence; otherwise the duration is
        derived from 'start_time' and 'end_time' when both are present and
        the end lies after the start.
        """
        minutes = job_data.get('duration_minutes')
        if minutes:
            return timedelta(minutes=minutes)

        start_time = job_data.get('start_time')
        end_time = job_data.get('end_time')
        if start_time and end_time and end_time > start_time:
            return end_time - start_time

        return None

    def _find_alternative_time_slots(self, job_data: Dict, db_session: "Session") -> List[Tuple[datetime, datetime, float]]:
        """Find free alternative time windows on the job's printer.

        Probes TIME_SHIFT_OFFSETS around the original start and returns up
        to MAX_TIME_ALTERNATIVES free windows as (start, end, confidence).
        Confidence drops by 0.1 per hour of distance from the original start
        and never falls below 0.3. If job_data carries a 'job_id', that job
        is ignored in the availability check.

        Returns an empty list when the printer, the start or the duration
        is unknown.
        """
        printer_id = job_data.get('printer_id')
        original_start = job_data.get('start_time')
        duration = self._job_duration(job_data)

        if not printer_id or not original_start or not duration:
            return []

        exclude_job_id = job_data.get('job_id')

        alternatives = []
        for offset in self.TIME_SHIFT_OFFSETS:
            new_start = original_start + offset
            new_end = new_start + duration

            busy = self._overlapping_jobs(
                db_session, printer_id, new_start, new_end,
                exclude_job_id=exclude_job_id,
            ).count()
            if busy:
                continue

            # Confidence based on closeness to the original start.
            time_diff_hours = abs(offset.total_seconds() / 3600)
            confidence = max(0.3, 1.0 - (time_diff_hours * 0.1))
            alternatives.append((new_start, new_end, confidence))

            if len(alternatives) >= self.MAX_TIME_ALTERNATIVES:
                break

        return alternatives

    def _calculate_overlap_duration(self, start1: datetime, end1: datetime,
                                    start2: datetime, end2: datetime) -> timedelta:
        """Return the length of the intersection of two time ranges.

        Returns ``timedelta(0)`` when the ranges do not overlap or merely touch.
        """
        overlap_start = max(start1, start2)
        overlap_end = min(end1, end2)

        if overlap_start < overlap_end:
            return overlap_end - overlap_start
        return timedelta(0)
|
|
|
|
# Global instance for convenient shared use
conflict_manager = ConflictManager()