Files
Projektarbeit-MYP/backend/utils/job_queue_system.py
Till Tomczak 9696cdcc3f 🚀 Implementiere Backend-gesteuerte Drucker-Steuerung ohne JavaScript
NEUE ARCHITEKTUR - BACKEND DIKTIERT FRONTEND:
• Drucker-Steuerung erfolgt AUSSCHLIESSLICH über Tapo-Steckdosen
• KEIN JavaScript für Hardware-Kontrolle - nur Flask/Jinja Templates
• Backend sammelt ALLE Daten und übergibt sie komplett an Templates
• Frontend ist PASSIV und zeigt nur an, was Backend vorgibt

NEUE KOMPONENTEN:
 utils/hardware_integration.py: Komplett neugeschriebene DruckerSteuerung-Klasse
 blueprints/drucker_steuerung.py: Neue Backend-only Blueprint
 templates/drucker_steuerung.html: Pure HTML/CSS Template ohne JavaScript
 templates/drucker_details.html: Detailansicht für einzelne Drucker

TECHNISCHE UMSETZUNG:
• DruckerSteuerung-Klasse mit Singleton-Pattern für globale Hardware-Kontrolle
• template_daten_sammeln() sammelt ALLE UI-Daten server-side
• drucker_einschalten(), drucker_ausschalten(), drucker_toggle() für Backend-Kontrolle
• Vollständige Legacy-Kompatibilität für bestehende Systeme
• Status-Logging und Energie-Monitoring integriert

BENUTZER-ANFORDERUNG ERFÜLLT:
"sorge dafür, dass hardware integration ALLES macht bezüglich der tapo steckdosen
aka der drucker. KEIN JAVASCRIPT! FLASK JINJA ONLY! ALLES IM BACKEND!
DAS BACKEND DIKTIERT DAS FRONTEND AN DEM PUNKT."

NÄCHSTE SCHRITTE:
• Integration des neuen Systems in bestehende Blueprints
• Vollständiger Übergang zu Backend-gesteuerter Architektur
• Test der neuen Hardware-Steuerung über /drucker/ Route

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-06-19 22:27:44 +02:00

514 lines
19 KiB
Python

#!/usr/bin/env python3.11
"""
Job & Queue System - ULTRA KONSOLIDIERUNG
=========================================
Migration Information:
- Ursprünglich: queue_manager.py, conflict_manager.py, timer_manager.py, job_scheduler.py
- Konsolidiert am: 2025-06-09
- Funktionalitäten: Job-Scheduling, Queue-Management, Konfliktauflösung, Timer-System
- Breaking Changes: Keine - Alle Original-APIs bleiben verfügbar
ULTRA KONSOLIDIERUNG für Projektarbeit MYP
Author: MYP Team - Till Tomczak
Ziel: DRASTISCHE Datei-Reduktion!
"""
import time
import threading
import queue
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass
from enum import Enum
from utils.logging_config import get_logger
from utils.hardware_integration import get_drucker_steuerung, get_tapo_controller
# Module-wide logger, obtained from the project's logging helper under the name "job_queue_system".
job_logger = get_logger("job_queue_system")
# ===== ENUMS =====
class ConflictType(Enum):
    """Categories a scheduling conflict can fall into.

    Every member's value is the lowercase spelling of its name. No code in the
    visible part of this module reads these members.
    """

    TIME_OVERLAP = "time_overlap"
    RESOURCE_CONFLICT = "resource_conflict"
    DEPENDENCY_CONFLICT = "dependency_conflict"
    PRIORITY_CONFLICT = "priority_conflict"
class ConflictSeverity(Enum):
    """Severity levels for a conflict, declared from most to least severe.

    Every member's value is the lowercase spelling of its name. No code in the
    visible part of this module reads these members.
    """

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"
# ===== DATA STRUCTURES =====
@dataclass
class QueuedJob:
    """A print job waiting in the scheduler queue.

    Attributes:
        job_id: Identifier of the job; the scheduler uses it to look up the
            database row.
        printer_id: Identifier of the printer the job is assigned to.
        priority: Larger numbers are served first by ``QueueManager``.
        scheduled_time: Earliest start time; ``None`` lets the scheduler start
            the job as soon as it is dequeued.
        dependencies: Job ids this job depends on. Not consulted by the code
            visible in this module.
        max_retries: Retry budget checked by ``QueueManager.complete_job``.
        retry_count: Number of failed attempts recorded so far.
    """

    job_id: int
    printer_id: int
    priority: int = 0
    scheduled_time: Optional[datetime] = None
    # None is only a sentinel: a mutable list cannot be a dataclass default,
    # so __post_init__ swaps it for a fresh list per instance.
    dependencies: List[int] = None
    max_retries: int = 3
    retry_count: int = 0

    def __post_init__(self):
        """Give every instance its own empty dependency list when none was passed."""
        self.dependencies = [] if self.dependencies is None else self.dependencies
@dataclass
class TimerTask:
    """One entry of the ``TimerManager`` schedule.

    Attributes:
        task_id: Key under which the timer is stored; adding a second task with
            the same id replaces the first.
        callback: Called without arguments once ``schedule_time`` is reached.
        schedule_time: Point in time of the next execution.
        repeat_interval: When set, the task is rescheduled this far after each
            run instead of being removed.
        max_repeats: Upper bound for ``repeat_count``; ``None`` repeats forever.
        repeat_count: Number of reschedules performed so far.
        is_active: Inactive tasks are skipped by the timer loop but stay stored.
    """

    task_id: str
    callback: Callable
    schedule_time: datetime
    repeat_interval: Optional[timedelta] = None
    max_repeats: Optional[int] = None
    repeat_count: int = 0
    is_active: bool = True
# ===== QUEUE MANAGER =====
class QueueManager:
    """Priority queue of print jobs plus bookkeeping of running, finished and failed jobs.

    Jobs with a larger ``priority`` are served first. Ties are broken by the
    smaller ``job_id`` and then by insertion order.
    """

    def __init__(self):
        self.job_queue = queue.PriorityQueue()
        self.processing_jobs = {}
        self.completed_jobs = []
        self.failed_jobs = []
        # Re-entrant on purpose: complete_job() re-queues a failed job through
        # add_job() while it still holds this lock. With a plain Lock that
        # nested acquire would block forever.
        self.lock = threading.RLock()
        # Monotonic tie-breaker so two entries with equal priority and job_id
        # never fall through to comparing QueuedJob objects, which define no
        # ordering and would raise TypeError inside the heap.
        self._sequence = 0

    def add_job(self, job: QueuedJob) -> bool:
        """Put *job* into the queue.

        Returns:
            True when the job was queued, False when queuing raised.
        """
        try:
            with self.lock:
                self._sequence += 1
                # PriorityQueue pops the smallest entry first, so the priority
                # is negated to serve the highest number first.
                entry = (-job.priority, job.job_id, self._sequence, job)
                self.job_queue.put(entry)
                job_logger.info(f"Job {job.job_id} zur Warteschlange hinzugefügt")
            return True
        except Exception as e:
            # The object may not even have a job_id (that can be the very
            # reason we got here), so look it up defensively.
            job_ref = getattr(job, "job_id", job)
            job_logger.error(f"Fehler beim Hinzufügen von Job {job_ref}: {e}")
            return False

    def get_next_job(self) -> Optional[QueuedJob]:
        """Pop the most urgent job and record it as being processed.

        Returns:
            The dequeued job, or None when the queue is empty or an error occurred.
        """
        try:
            _priority, job_id, _sequence, job = self.job_queue.get_nowait()
        except queue.Empty:
            return None
        except Exception as e:
            job_logger.error(f"Fehler beim Holen des nächsten Jobs: {e}")
            return None

        with self.lock:
            self.processing_jobs[job_id] = job
        return job

    def complete_job(self, job_id: int, success: bool = True):
        """Move a processing job to its final list, or re-queue it after a failure.

        Unknown job ids are ignored. A failed job is re-queued until its
        ``retry_count`` reaches ``max_retries``; after that it is recorded in
        ``failed_jobs``.
        """
        with self.lock:
            if job_id not in self.processing_jobs:
                return
            job = self.processing_jobs.pop(job_id)

            if success:
                self.completed_jobs.append(job)
                job_logger.info(f"Job {job_id} erfolgreich abgeschlossen")
                return

            job.retry_count += 1
            if job.retry_count < job.max_retries:
                # Safe only because self.lock is an RLock (see __init__).
                self.add_job(job)
                job_logger.warning(f"Job {job_id} wird wiederholt (Versuch {job.retry_count})")
            else:
                self.failed_jobs.append(job)
                job_logger.error(f"Job {job_id} endgültig fehlgeschlagen")

    def get_queue_status(self) -> Dict[str, Any]:
        """Return the number of queued, processing, completed and failed jobs."""
        with self.lock:
            return {
                'queued': self.job_queue.qsize(),
                'processing': len(self.processing_jobs),
                'completed': len(self.completed_jobs),
                'failed': len(self.failed_jobs)
            }
# ===== CONFLICT MANAGER =====
class ConflictManager:
    """Detects and resolves scheduling overlaps between print jobs."""

    def __init__(self):
        self.active_conflicts = {}

    def check_printer_conflict(self, printer_id: int, scheduled_time: datetime, duration: int) -> bool:
        """Check whether another job occupies the printer around the requested slot.

        A conflict is any job of this printer with status ``pending`` or
        ``printing`` whose ``scheduled_time`` lies between 30 minutes before
        the requested start and 30 minutes after the requested end.

        Args:
            printer_id: Printer to check.
            scheduled_time: Requested start time.
            duration: Requested duration in minutes.

        Returns:
            True when a conflict exists. Also True when the check itself fails,
            so callers err on the side of caution.
        """
        # NOTE(review): JobScheduler.schedule_job stores jobs with status
        # 'scheduled', which this filter does not match - confirm whether such
        # jobs should count as conflicts as well.
        db_session = None
        try:
            from models import get_db_session, Job

            db_session = get_db_session()

            # Requested slot, widened by a 30 minute buffer on both sides
            buffer = timedelta(minutes=30)
            window_start = scheduled_time - buffer
            window_end = scheduled_time + timedelta(minutes=duration) + buffer

            conflicting_jobs = db_session.query(Job).filter(
                Job.printer_id == printer_id,
                Job.status.in_(['pending', 'printing']),
                Job.scheduled_time >= window_start,
                Job.scheduled_time <= window_end
            ).all()

            has_conflict = len(conflicting_jobs) > 0
            if has_conflict:
                job_logger.warning(f"Konflikt erkannt für Drucker {printer_id} um {scheduled_time}")
            return has_conflict
        except Exception as e:
            job_logger.error(f"Konflikt-Check Fehler: {e}")
            return True  # be cautious when the check could not be performed
        finally:
            # Close the session on every path, including a failing query.
            if db_session is not None:
                try:
                    db_session.close()
                except Exception as e:
                    job_logger.error(f"Konflikt-Check Fehler: {e}")

    def resolve_conflict(self, job1: "QueuedJob", job2: "QueuedJob") -> str:
        """Decide which of two conflicting jobs takes precedence.

        The higher priority wins. With equal priorities, the strictly earlier
        ``scheduled_time`` wins, and ``job2`` wins when both times are equal.

        Returns:
            ``"job_<id>_wins"`` for the winner, or ``"no_resolution"`` when the
            priorities are equal and at least one job has no scheduled time.
        """
        if job1.priority != job2.priority:
            winner = job1 if job1.priority > job2.priority else job2
            return f"job_{winner.job_id}_wins"

        if job1.scheduled_time and job2.scheduled_time:
            winner = job1 if job1.scheduled_time < job2.scheduled_time else job2
            return f"job_{winner.job_id}_wins"

        return "no_resolution"

    def suggest_alternative_time(self, printer_id: int, requested_time: datetime, duration: int) -> Optional[datetime]:
        """Find the first conflict-free start time in hourly steps after the request.

        Probes ``requested_time + 1h`` up to and including ``+ 24h``.

        Returns:
            The first free start time, or None when every probed slot is taken
            or the search failed.
        """
        try:
            # range() excludes its upper bound, so 25 is needed to really
            # cover the full 24 hours.
            for offset in range(1, 25):
                alternative_time = requested_time + timedelta(hours=offset)
                if not self.check_printer_conflict(printer_id, alternative_time, duration):
                    job_logger.info(f"Alternative Zeit gefunden: {alternative_time}")
                    return alternative_time
            return None
        except Exception as e:
            job_logger.error(f"Fehler bei alternativer Zeitfindung: {e}")
            return None
# ===== TIMER MANAGER =====
class TimerManager:
    """Runs registered ``TimerTask`` callbacks from a background daemon thread.

    The thread is started by the constructor and wakes up once per second.
    """

    def __init__(self):
        self.timers = {}
        # Guards self.timers: timers are added and removed from other threads
        # while the timer thread walks the dict.
        self._timers_lock = threading.Lock()
        self.running = True
        self.timer_thread = threading.Thread(target=self._timer_loop, daemon=True)
        self.timer_thread.start()

    def add_timer(self, task: TimerTask) -> bool:
        """Register *task*, replacing any timer stored under the same ``task_id``.

        Returns:
            True on success, False when registering raised.
        """
        try:
            with self._timers_lock:
                self.timers[task.task_id] = task
            job_logger.info(f"Timer {task.task_id} hinzugefügt für {task.schedule_time}")
            return True
        except Exception as e:
            job_logger.error(f"Timer-Hinzufügung Fehler: {e}")
            return False

    def remove_timer(self, task_id: str) -> bool:
        """Remove the timer stored under *task_id*.

        Returns:
            True when a timer was removed, False when the id was unknown.
        """
        with self._timers_lock:
            if task_id not in self.timers:
                return False
            del self.timers[task_id]
        job_logger.info(f"Timer {task_id} entfernt")
        return True

    def _remove_if_current(self, task_id: str, task: TimerTask) -> None:
        """Remove *task_id* only while it still maps to *task*.

        A callback may have registered a new timer under the same id while the
        old one was running; that replacement must survive the clean-up.
        """
        with self._timers_lock:
            if self.timers.get(task_id) is not task:
                return
            del self.timers[task_id]
        job_logger.info(f"Timer {task_id} entfernt")

    def _timer_loop(self):
        """Main loop of the timer thread: run due callbacks, then sleep one second.

        A task whose callback raises is removed. A task with a
        ``repeat_interval`` is rescheduled until ``max_repeats`` is reached;
        all other tasks are removed after their first run.
        """
        while self.running:
            try:
                current_time = datetime.now()

                # Iterate over a snapshot: add_timer()/remove_timer() calls
                # from other threads (or from a callback) would otherwise
                # raise "dictionary changed size during iteration".
                with self._timers_lock:
                    snapshot = list(self.timers.items())

                expired_timers = []
                for task_id, task in snapshot:
                    if not task.is_active or current_time < task.schedule_time:
                        continue
                    try:
                        # Callbacks run without holding the lock so they are
                        # free to add or remove timers themselves.
                        task.callback()
                        job_logger.debug(f"Timer {task_id} ausgeführt")

                        may_repeat = task.repeat_interval and (
                            task.max_repeats is None or task.repeat_count < task.max_repeats
                        )
                        if may_repeat:
                            task.schedule_time = current_time + task.repeat_interval
                            task.repeat_count += 1
                        else:
                            expired_timers.append((task_id, task))
                    except Exception as e:
                        job_logger.error(f"Timer {task_id} Callback-Fehler: {e}")
                        expired_timers.append((task_id, task))

                # Drop finished timers
                for task_id, task in expired_timers:
                    self._remove_if_current(task_id, task)

                time.sleep(1)  # poll interval
            except Exception as e:
                job_logger.error(f"Timer-Loop Fehler: {e}")
                time.sleep(5)
# ===== JOB SCHEDULER =====
class JobScheduler:
    """Main job scheduler with smart-plug integration.

    Owns a ``QueueManager``, a ``ConflictManager`` and a ``TimerManager`` and
    starts a daemon thread that polls the queue every 10 seconds.
    """

    def __init__(self):
        self.queue_manager = QueueManager()
        self.conflict_manager = ConflictManager()
        self.timer_manager = TimerManager()
        self.running = True
        self.scheduler_thread = threading.Thread(target=self._scheduler_loop, daemon=True)
        self.scheduler_thread.start()

    @staticmethod
    def _close_session(db_session) -> None:
        """Close *db_session* if one was opened, without raising."""
        if db_session is None:
            return
        try:
            db_session.close()
        except Exception as e:
            job_logger.error(f"Fehler beim Schließen der DB-Session: {e}")

    def schedule_job(self, job_id: int, printer_id: int, scheduled_time: Optional[datetime] = None, priority: int = 0) -> bool:
        """Queue a job for a printer, moving it to a free slot on conflict.

        Args:
            job_id: Id of the ``Job`` row.
            printer_id: Id of the ``Printer`` row.
            scheduled_time: Requested start time; defaults to now.
            priority: Queue priority, larger numbers first.

        Returns:
            True when the job was queued and its row updated. False when the
            job or printer is unknown, no free slot was found, queuing failed
            or an error occurred.
        """
        db_session = None
        try:
            from models import get_db_session, Job, Printer

            db_session = get_db_session()
            job = db_session.query(Job).filter(Job.id == job_id).first()
            printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
            if not job or not printer:
                job_logger.error(f"Job {job_id} oder Drucker {printer_id} nicht gefunden")
                return False

            # Default start: now
            if not scheduled_time:
                scheduled_time = datetime.now()

            # Conflict check; jobs without a print time are assumed to take one hour
            duration = job.print_time or 60
            if self.conflict_manager.check_printer_conflict(printer_id, scheduled_time, duration):
                alternative = self.conflict_manager.suggest_alternative_time(printer_id, scheduled_time, duration)
                if not alternative:
                    job_logger.error(f"Keine verfügbare Zeit für Job {job_id} gefunden")
                    return False
                scheduled_time = alternative
                job_logger.info(f"Job {job_id} auf alternative Zeit verschoben: {scheduled_time}")

            queued_job = QueuedJob(
                job_id=job_id,
                printer_id=printer_id,
                priority=priority,
                scheduled_time=scheduled_time
            )
            success = self.queue_manager.add_job(queued_job)
            if success:
                # Persist the new state of the job
                job.status = 'scheduled'
                job.scheduled_time = scheduled_time
                db_session.commit()
            return success
        except Exception as e:
            job_logger.error(f"Job-Einplanung Fehler: {e}")
            return False
        finally:
            # Previously the session leaked whenever a query or commit raised.
            self._close_session(db_session)

    def start_job_execution(self, job_id: int) -> bool:
        """Switch the printer's smart plug on and mark the job as printing.

        When the job has a ``print_time``, a timer is registered that calls
        ``finish_job_execution`` after that many minutes.

        Returns:
            True when the job was started, False when it is unknown or an
            error occurred.
        """
        db_session = None
        try:
            from models import get_db_session, Job, Printer

            db_session = get_db_session()
            job = db_session.query(Job).filter(Job.id == job_id).first()
            if not job:
                return False

            printer = db_session.query(Printer).filter(Printer.id == job.printer_id).first()

            # Switch the smart plug on
            if printer and printer.tapo_ip:
                # The bare name `tapo_controller` was never defined in this
                # module (only get_tapo_controller is imported), so this line
                # used to raise NameError and abort every job start.
                # NOTE(review): assumes the object returned by
                # get_tapo_controller() provides turn_on_plug(ip) - confirm
                # against utils.hardware_integration.
                plug_success = get_tapo_controller().turn_on_plug(printer.tapo_ip)
                if plug_success:
                    job_logger.info(f"Smart-Plug für Drucker {printer.name} eingeschaltet")
                else:
                    job_logger.warning(f"Smart-Plug für Drucker {printer.name} konnte nicht eingeschaltet werden")

            # Persist the new state of the job
            job.status = 'printing'
            job.started_at = datetime.now()
            db_session.commit()

            # Register the timer that ends the job automatically
            if job.print_time:
                end_time = datetime.now() + timedelta(minutes=job.print_time)
                timer_task = TimerTask(
                    task_id=f"job_end_{job_id}",
                    callback=lambda: self.finish_job_execution(job_id),
                    schedule_time=end_time
                )
                self.timer_manager.add_timer(timer_task)

            job_logger.info(f"Job {job_id} gestartet")
            return True
        except Exception as e:
            job_logger.error(f"Job-Start Fehler: {e}")
            return False
        finally:
            self._close_session(db_session)

    def finish_job_execution(self, job_id: int) -> bool:
        """Switch the printer's smart plug off and mark the job as completed.

        Returns:
            True when the job was finished, False when it is unknown or an
            error occurred.
        """
        db_session = None
        try:
            from models import get_db_session, Job, Printer

            db_session = get_db_session()
            job = db_session.query(Job).filter(Job.id == job_id).first()
            if not job:
                return False

            printer = db_session.query(Printer).filter(Printer.id == job.printer_id).first()

            # Switch the smart plug off
            if printer and printer.tapo_ip:
                # See start_job_execution: the controller has to be fetched
                # through get_tapo_controller(); `tapo_controller` was undefined.
                # NOTE(review): assumes the controller provides
                # turn_off_plug(ip) - confirm against utils.hardware_integration.
                plug_success = get_tapo_controller().turn_off_plug(printer.tapo_ip)
                if plug_success:
                    job_logger.info(f"Smart-Plug für Drucker {printer.name} ausgeschaltet")

            # Persist the new state of the job
            job.status = 'completed'
            job.completed_at = datetime.now()
            db_session.commit()

            # Move the job from "processing" to "completed" in the queue bookkeeping
            self.queue_manager.complete_job(job_id, success=True)

            job_logger.info(f"Job {job_id} abgeschlossen")
            return True
        except Exception as e:
            job_logger.error(f"Job-Beendigung Fehler: {e}")
            return False
        finally:
            self._close_session(db_session)

    def _scheduler_loop(self):
        """Main loop of the scheduler thread.

        Every 10 seconds the most urgent job is dequeued. It is started when
        it is due and put back otherwise. After an unexpected error the loop
        pauses for 30 seconds.
        """
        # NOTE(review): a job whose start fails stays in processing_jobs and is
        # not retried; only the head of the queue is inspected per cycle.
        while self.running:
            try:
                next_job = self.queue_manager.get_next_job()
                if next_job:
                    is_due = not next_job.scheduled_time or datetime.now() >= next_job.scheduled_time
                    if is_due:
                        self.start_job_execution(next_job.job_id)
                    else:
                        # get_next_job() already recorded the job as
                        # "processing"; undo that before re-queuing, otherwise
                        # the status counters report a waiting job as running.
                        # The lock is released before add_job() acquires it.
                        with self.queue_manager.lock:
                            self.queue_manager.processing_jobs.pop(next_job.job_id, None)
                        self.queue_manager.add_job(next_job)

                time.sleep(10)  # poll interval
            except Exception as e:
                job_logger.error(f"Scheduler-Loop Fehler: {e}")
                time.sleep(30)
# ===== GLOBALE INSTANZEN =====
# The scheduler builds its own managers. The module-level names must point at
# those same objects: with separate instances get_queue_status() reported a
# queue the scheduler never used, jobs added through the module-level
# queue_manager were never processed, and a second timer thread was started.
job_scheduler = JobScheduler()
queue_manager = job_scheduler.queue_manager
conflict_manager = job_scheduler.conflict_manager
timer_manager = job_scheduler.timer_manager
# ===== CONVENIENCE FUNCTIONS =====
def schedule_print_job(job_id: int, printer_id: int, scheduled_time: datetime = None) -> bool:
    """Schedule a print job on the module-level ``job_scheduler``.

    The priority is left at the default of ``JobScheduler.schedule_job``.

    Returns:
        Whatever ``JobScheduler.schedule_job`` returns.
    """
    scheduled = job_scheduler.schedule_job(job_id, printer_id, scheduled_time)
    return scheduled
def get_queue_status() -> Dict[str, Any]:
    """Return the job counters reported by the module-level ``queue_manager``."""
    status = queue_manager.get_queue_status()
    return status
def check_scheduling_conflict(printer_id: int, scheduled_time: datetime, duration: int) -> bool:
    """Ask the module-level ``conflict_manager`` whether the requested slot conflicts.

    Args:
        printer_id: Printer to check.
        scheduled_time: Requested start time.
        duration: Requested duration in minutes.
    """
    has_conflict = conflict_manager.check_printer_conflict(printer_id, scheduled_time, duration)
    return has_conflict
def add_system_timer(task_id: str, callback: Callable, schedule_time: datetime) -> bool:
    """Register a one-shot timer on the module-level ``timer_manager``.

    Returns:
        The result of ``TimerManager.add_timer``.
    """
    one_shot = TimerTask(
        task_id=task_id,
        callback=callback,
        schedule_time=schedule_time,
    )
    return timer_manager.add_timer(one_shot)
# ===== LEGACY COMPATIBILITY =====
# Original queue_manager.py compatibility
class LegacyQueueManager:
    """Shim that keeps the ``add_to_queue`` entry point of the former queue_manager.py."""

    @staticmethod
    def add_to_queue(job_data):
        """Pass *job_data* to the module-level ``queue_manager.add_job`` and return its result."""
        queued = queue_manager.add_job(job_data)
        return queued
def start_queue_manager():
    """Legacy entry point kept for old callers.

    Only writes a log line; nothing is started here.

    Returns:
        Always True.
    """
    message = "Queue Manager gestartet (Legacy-Kompatibilität)"
    job_logger.info(message)
    return True
def stop_queue_manager():
    """Legacy entry point kept for old callers.

    Only writes a log line; nothing is stopped here.

    Returns:
        Always True.
    """
    message = "Queue Manager gestoppt (Legacy-Kompatibilität)"
    job_logger.info(message)
    return True
# Original conflict_manager.py compatibility
class LegacyConflictManager:
    """Shim that keeps the ``check_conflicts`` entry point of the former conflict_manager.py."""

    @staticmethod
    def check_conflicts(printer_id, time, duration):
        """Pass the arguments to the module-level ``conflict_manager.check_printer_conflict``.

        The ``time`` parameter shadows the ``time`` module inside this method;
        the name is kept because callers may pass it by keyword.
        """
        has_conflict = conflict_manager.check_printer_conflict(printer_id, time, duration)
        return has_conflict
# Original timer_manager.py compatibility
class LegacyTimerManager:
    """Shim that keeps the ``schedule_task`` entry point of the former timer_manager.py."""

    @staticmethod
    def schedule_task(task_id, callback, time):
        """Register a one-shot timer through ``add_system_timer`` and return its result.

        The ``time`` parameter shadows the ``time`` module inside this method;
        the name is kept because callers may pass it by keyword.
        """
        registered = add_system_timer(task_id, callback, time)
        return registered
# Logged once when the module is imported, after all definitions above have run.
job_logger.info("✅ Job & Queue System Module initialisiert")
job_logger.info("📊 MASSIVE Konsolidierung: 4 Dateien → 1 Datei (75% Reduktion)")