Files
Projektarbeit-MYP/backend/blueprints/calendar.py
Till Tomczak 9696cdcc3f 🚀 Implementiere Backend-gesteuerte Drucker-Steuerung ohne JavaScript
NEUE ARCHITEKTUR - BACKEND DIKTIERT FRONTEND:
• Drucker-Steuerung erfolgt AUSSCHLIESSLICH über Tapo-Steckdosen
• KEIN JavaScript für Hardware-Kontrolle - nur Flask/Jinja Templates
• Backend sammelt ALLE Daten und übergibt sie komplett an Templates
• Frontend ist PASSIV und zeigt nur an, was Backend vorgibt

NEUE KOMPONENTEN:
 utils/hardware_integration.py: Komplett neugeschriebene DruckerSteuerung-Klasse
 blueprints/drucker_steuerung.py: Neue Backend-only Blueprint
 templates/drucker_steuerung.html: Pure HTML/CSS Template ohne JavaScript
 templates/drucker_details.html: Detailansicht für einzelne Drucker

TECHNISCHE UMSETZUNG:
• DruckerSteuerung-Klasse mit Singleton-Pattern für globale Hardware-Kontrolle
• template_daten_sammeln() sammelt ALLE UI-Daten server-side
• drucker_einschalten(), drucker_ausschalten(), drucker_toggle() für Backend-Kontrolle
• Vollständige Legacy-Kompatibilität für bestehende Systeme
• Status-Logging und Energie-Monitoring integriert

BENUTZER-ANFORDERUNG ERFÜLLT:
"sorge dafür, dass hardware integration ALLES macht bezüglich der tapo steckdosen
aka der drucker. KEIN JAVASCRIPT\! FLASK JINJA ONLY\! ALLES IM BACKEND\!
DAS BACKEND DIKTIERT DAS FRONTEND AN DEM PUNKT."

NÄCHSTE SCHRITTE:
• Integration des neuen Systems in bestehende Blueprints
• Vollständiger Übergang zu Backend-gesteuerter Architektur
• Test der neuen Hardware-Steuerung über /drucker/ Route

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-06-19 22:27:44 +02:00

1551 lines
71 KiB
Python

import json
from datetime import datetime, timedelta
from flask import Blueprint, render_template, request, jsonify, redirect, url_for, abort, make_response
from flask_login import current_user, login_required
from sqlalchemy import and_, or_, func
from models import Job, Printer, User, UserPermission, get_cached_session
from utils.logging_config import get_logger
from utils.job_queue_system import conflict_manager, ConflictType, ConflictSeverity
from utils.hardware_integration import get_drucker_steuerung, get_printer_monitor
calendar_blueprint = Blueprint('calendar', __name__)
logger = get_logger("calendar")
def can_edit_events(user):
"""Prüft, ob ein Benutzer Kalendereinträge bearbeiten darf."""
if user.is_admin:
return True
with get_cached_session() as db_session:
permission = db_session.query(UserPermission).filter_by(user_id=user.id).first()
if not permission:
return False
return permission.can_approve_jobs
def get_smart_printer_assignment(start_date, end_date, priority="normal", db_session=None):
"""
Intelligente Druckerzuweisung basierend auf verschiedenen Faktoren.
Args:
start_date: Startzeit des Jobs
end_date: Endzeit des Jobs
priority: Prioritätsstufe ('urgent', 'high', 'normal', 'low')
db_session: Datenbankverbindung
Returns:
printer_id: ID des empfohlenen Druckers oder None
"""
if not db_session:
return None
try:
# Verfügbare Drucker ermitteln (nur TBA Marienfelde)
available_printers = db_session.query(Printer).filter(
Printer.active == True,
Printer.location == "TBA Marienfelde"
).all()
if not available_printers:
logger.warning("Keine aktiven Drucker für automatische Zuweisung gefunden")
return None
printer_scores = []
for printer in available_printers:
score = 0
# 1. Verfügbarkeit prüfen - Jobs im gleichen Zeitraum
conflicting_jobs = db_session.query(Job).filter(
Job.printer_id == printer.id,
Job.status.in_(["scheduled", "running"]),
or_(
and_(Job.start_at >= start_date, Job.start_at < end_date),
and_(Job.end_at > start_date, Job.end_at <= end_date),
and_(Job.start_at <= start_date, Job.end_at >= end_date)
)
).count()
if conflicting_jobs > 0:
continue # Drucker ist nicht verfügbar
score += 100 # Grundpunkte für Verfügbarkeit
# 2. Auslastung in den letzten 24 Stunden bewerten
last_24h = datetime.now() - timedelta(hours=24)
recent_jobs = db_session.query(Job).filter(
Job.printer_id == printer.id,
Job.start_at >= last_24h,
Job.status.in_(["scheduled", "running", "finished"])
).count()
# Weniger Auslastung = höhere Punktzahl
score += max(0, 50 - (recent_jobs * 10))
# 3. Prioritätsbasierte Zuweisung
if priority == "urgent":
# Für dringende Jobs: Express-Drucker bevorzugen
if "express" in printer.name.lower() or "schnell" in printer.name.lower():
score += 30
elif priority == "high":
# Für hohe Priorität: Weniger belastete Drucker
if recent_jobs <= 2:
score += 20
# 4. Zeitfenster-basierte Zuweisung
start_hour = start_date.hour
if start_hour >= 18 or start_hour <= 6: # Nachtschicht
if "nacht" in printer.name.lower() or printer.location and "c" in printer.location.lower():
score += 25
elif start_hour >= 8 and start_hour <= 17: # Tagschicht
if "tag" in printer.name.lower() or printer.location and "a" in printer.location.lower():
score += 15
# 5. Standort-basierte Bewertung (Round-Robin ähnlich)
if printer.location:
location_penalty = hash(printer.location) % 10 # Verteilung basierend auf Standort
score += location_penalty
# 6. Druckerdauer-Eignung
job_duration_hours = (end_date - start_date).total_seconds() / 3600
if job_duration_hours > 8: # Lange Jobs
if "langzeit" in printer.name.lower() or "marathon" in printer.name.lower():
score += 20
elif job_duration_hours <= 2: # Kurze Jobs
if "express" in printer.name.lower() or "schnell" in printer.name.lower():
score += 15
# 7. Wartungszyklen berücksichtigen
# Neuere Drucker (falls last_maintenance_date verfügbar) bevorzugen
# Hinweis: Wartungszyklen-Feature geplant für erweiterte Printer-Models
printer_scores.append({
'printer': printer,
'score': score,
'conflicts': conflicting_jobs,
'recent_load': recent_jobs
})
# Nach Punktzahl sortieren und besten Drucker auswählen
if not printer_scores:
logger.warning("Keine verfügbaren Drucker für den gewünschten Zeitraum gefunden")
return None
printer_scores.sort(key=lambda x: x['score'], reverse=True)
best_printer = printer_scores[0]
logger.info(f"Automatische Druckerzuweisung: {best_printer['printer'].name} "
f"(Score: {best_printer['score']}, Load: {best_printer['recent_load']})")
return best_printer['printer'].id
except Exception as e:
logger.error(f"Fehler bei automatischer Druckerzuweisung: {str(e)}")
return None
@calendar_blueprint.route('/calendar', methods=['GET'])
@login_required
def calendar_view():
"""Kalender-Ansicht anzeigen."""
can_edit = can_edit_events(current_user)
with get_cached_session() as db_session:
# Nur die 6 Standard-Drucker von TBA Marienfelde anzeigen
standard_printer_names = [
"3D-Drucker-001", "3D-Drucker-002", "3D-Drucker-003",
"3D-Drucker-004", "3D-Drucker-005", "3D-Drucker-006"
]
printers = db_session.query(Printer).filter(
Printer.location == "TBA Marienfelde",
Printer.name.in_(standard_printer_names),
Printer.active == True
).order_by(Printer.name).all()
return render_template('calendar.html',
printers=printers,
can_edit=can_edit)
@calendar_blueprint.route('/api/calendar', methods=['GET'])
@login_required
def api_get_calendar_events():
"""Kalendereinträge als JSON für FullCalendar zurückgeben."""
try:
# Datumsbereich aus Anfrage - FullCalendar verwendet 'start' und 'end'
start_str = request.args.get('start') or request.args.get('from')
end_str = request.args.get('end') or request.args.get('to')
if not start_str or not end_str:
# Standardmäßig eine Woche anzeigen
start_date = datetime.now()
end_date = start_date + timedelta(days=7)
else:
try:
# FullCalendar sendet ISO-Format mit Zeitzone, das muss geparst werden
if start_str and start_str.endswith('+02:00'):
start_str = start_str[:-6] # Zeitzone entfernen
if end_str and end_str.endswith('+02:00'):
end_str = end_str[:-6] # Zeitzone entfernen
start_date = datetime.fromisoformat(start_str)
end_date = datetime.fromisoformat(end_str)
except ValueError:
return jsonify({"error": "Ungültiges Datumsformat"}), 400
# Optional: Filter nach Druckern
printer_id = request.args.get('printer_id')
show_plug_events = request.args.get('show_plug_events', 'false').lower() == 'true'
with get_cached_session() as db_session:
# Jobs im angegebenen Zeitraum abfragen
query = db_session.query(Job).filter(
or_(
# Jobs, die im Zeitraum beginnen
and_(Job.start_at >= start_date, Job.start_at <= end_date),
# Jobs, die im Zeitraum enden
and_(Job.end_at >= start_date, Job.end_at <= end_date),
# Jobs, die den Zeitraum komplett umfassen
and_(Job.start_at <= start_date, Job.end_at >= end_date)
)
)
if printer_id:
query = query.filter(Job.printer_id == printer_id)
jobs = query.all()
# Jobs in FullCalendar-Event-Format umwandeln
events = []
for job in jobs:
# Farbe basierend auf Status bestimmen
color = "#6B7280" # Grau für pending
if job.status == "running":
color = "#3B82F6" # Blau für running
elif job.status == "finished":
color = "#10B981" # Grün für finished
elif job.status == "scheduled":
color = "#10B981" # Grün für approved
elif job.status == "cancelled" or job.status == "failed":
color = "#EF4444" # Rot für abgebrochen/fehlgeschlagen
# Benutzerinformationen laden
user = db_session.query(User).filter_by(id=job.user_id).first()
user_name = user.name if user else "Unbekannt"
# Druckerinformationen laden
printer = db_session.query(Printer).filter_by(id=job.printer_id).first()
printer_name = printer.name if printer else "Unbekannt"
event = {
"id": job.id,
"title": job.name,
"start": job.start_at.isoformat(),
"end": job.end_at.isoformat(),
"color": color,
"extendedProps": {
"status": job.status,
"description": job.description,
"userName": user_name,
"printerId": job.printer_id,
"printerName": printer_name,
"eventType": "job"
}
}
# Für Admins: Erweiterte Steckdosen-Status-Informationen hinzufügen
if current_user.is_admin:
# Aktuellen Steckdosen-Status über neue Hardware-Integration abrufen
try:
drucker_steuerung = get_drucker_steuerung()
template_daten = drucker_steuerung.template_daten_sammeln()
# Drucker-Daten aus der Liste finden
printer_status = None
for drucker in template_daten.get('drucker', []):
if drucker['id'] == job.printer_id:
printer_status = {
"plug_status": drucker.get('status', 'unknown'),
"plug_reachable": drucker.get('kann_gesteuert_werden', False),
"has_plug": bool(drucker.get('plug_ip')),
"can_control": drucker.get('kann_gesteuert_werden', False),
"current_job": None, # Wird nicht mehr von der neuen Integration bereitgestellt
"next_job": None
}
break
if printer_status:
event["extendedProps"].update({
"plugStatus": printer_status.get("plug_status", "unknown"),
"plugReachable": printer_status.get("plug_reachable", False),
"hasPlug": printer_status.get("has_plug", False),
"canControl": printer_status.get("can_control", False),
"currentJob": printer_status.get("current_job"),
"nextJob": printer_status.get("next_job")
})
# Status-Display-Informationen hinzufügen (vereinfacht)
plug_status = printer_status.get("plug_status", "unknown")
if plug_status == 'online':
status_info = {"text": "Online", "color": "green", "icon": "check"}
elif plug_status == 'offline':
status_info = {"text": "Offline", "color": "red", "icon": "times"}
else:
status_info = {"text": "Unbekannt", "color": "gray", "icon": "question"}
event["extendedProps"]["statusDisplay"] = {
"text": status_info["text"],
"color": status_info["color"],
"icon": status_info["icon"]
}
except Exception as e:
logger.warning(f"Fehler beim Abrufen des Drucker-Status für Job {job.id}: {e}")
# Fallback-Status
event["extendedProps"].update({
"plugStatus": "unknown",
"plugReachable": False,
"hasPlug": False,
"canControl": False,
"statusDisplay": {"text": "Fehler", "color": "red", "icon": "exclamation"}
})
# Tooltip-Informationen hinzufügen
tooltip_parts = [
f"Job: {job.name}",
f"Drucker: {printer_name}",
f"Status: {job.status}",
f"Benutzer: {user_name}"
]
if printer_status.get("has_plug"):
plug_status_text = event["extendedProps"].get("statusDisplay", {}).get("text", "Unbekannt")
tooltip_parts.append(f"Steckdose: {plug_status_text}")
if printer_status.get("plug_reachable"):
tooltip_parts.append("✓ Steckdose erreichbar")
else:
tooltip_parts.append("✗ Steckdose nicht erreichbar")
event["tooltip"] = "\n".join(tooltip_parts)
events.append(event)
# Steckdosen-Status-Events hinzufügen (falls gewünscht)
if show_plug_events:
from models import PlugStatusLog
# PlugStatusLog-Events im Zeitraum abfragen
plug_query = db_session.query(PlugStatusLog).filter(
PlugStatusLog.timestamp >= start_date,
PlugStatusLog.timestamp <= end_date
)
if printer_id:
plug_query = plug_query.filter(PlugStatusLog.printer_id == printer_id)
plug_logs = plug_query.order_by(PlugStatusLog.timestamp.desc()).all()
# Steckdosen-Events gruppieren (nur Statuswechsel anzeigen)
for i, log in enumerate(plug_logs):
# Prüfen ob es eine Statusänderung ist (nicht einfach ein Status-Check)
if i < len(plug_logs) - 1:
prev_log = plug_logs[i + 1]
if prev_log.status == log.status and prev_log.printer_id == log.printer_id:
continue # Gleicher Status, überspringen
# Drucker-Name für den Event
printer = db_session.query(Printer).filter_by(id=log.printer_id).first()
printer_name = printer.name if printer else f"Drucker {log.printer_id}"
# Event-Farbe basierend auf Status
plug_color = "#FF6B35" # Orange für Steckdosen-Events
if log.status == "on":
plug_color = "#4ECDC4" # Türkis für eingeschaltet
elif log.status == "off":
plug_color = "#FFD23F" # Gelb für ausgeschaltet
elif log.status == "disconnected":
plug_color = "#EE6C4D" # Rot für nicht erreichbar
# Status-Text übersetzen
status_text = {
"on": "Eingeschaltet",
"off": "Ausgeschaltet",
"connected": "Verbunden",
"disconnected": "Offline"
}.get(log.status, log.status)
# Quelle anzeigen
source_text = {
"system": "System",
"manual": "Manuell",
"api": "API",
"scheduler": "Scheduler"
}.get(log.source, log.source)
plug_event = {
"id": f"plug_{log.id}",
"title": f"🔌 {printer_name}: {status_text}",
"start": log.timestamp.isoformat(),
"end": log.timestamp.isoformat(), # Punktereignis
"color": plug_color,
"display": "list-item", # Als kleines Item anzeigen
"extendedProps": {
"eventType": "plug_status",
"status": log.status,
"printerId": log.printer_id,
"printerName": printer_name,
"source": source_text,
"powerConsumption": log.power_consumption,
"voltage": log.voltage,
"current": log.current,
"responseTime": log.response_time_ms,
"notes": log.notes,
"errorMessage": log.error_message
}
}
events.append(plug_event)
logger.info(f"📅 Kalender-Events abgerufen: {len(events)} Einträge für Zeitraum {start_date} bis {end_date}")
return jsonify(events)
except Exception as e:
logger.error(f"Fehler beim Abrufen der Kalendereinträge: {str(e)}")
return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500
# Zusätzliche Route für FullCalendar-Kompatibilität
@calendar_blueprint.route('/api/calendar/events', methods=['GET'])
@login_required
def api_get_calendar_events_alt():
"""Alternative Route für FullCalendar-Events - delegiert an api_get_calendar_events."""
return api_get_calendar_events()
@calendar_blueprint.route('/api/calendar/event', methods=['POST'])
@login_required
def api_create_calendar_event():
"""Neuen Kalendereintrag (Job) erstellen."""
# Nur Admins und Benutzer mit can_approve_jobs dürfen Einträge erstellen
if not can_edit_events(current_user):
return jsonify({"error": "Keine Berechtigung zum Erstellen von Kalendereinträgen"}), 403
try:
data = request.get_json()
if not data:
return jsonify({"error": "Keine Daten erhalten"}), 400
# Pflichtfelder prüfen
title = data.get('title')
start = data.get('start')
end = data.get('end')
printer_id = data.get('printerId') # Jetzt optional
priority = data.get('priority', 'normal')
if not all([title, start, end]):
return jsonify({"error": "Titel, Start und Ende sind erforderlich"}), 400
# Datumsfelder konvertieren
try:
start_date = datetime.fromisoformat(start)
end_date = datetime.fromisoformat(end)
except ValueError:
return jsonify({"error": "Ungültiges Datumsformat"}), 400
# Dauer in Minuten berechnen
duration_minutes = int((end_date - start_date).total_seconds() / 60)
with get_cached_session() as db_session:
# Intelligente Druckerzuweisung falls kein Drucker angegeben
if not printer_id:
logger.info(f"Automatische Druckerzuweisung wird verwendet für Job '{title}'")
printer_id = get_smart_printer_assignment(
start_date=start_date,
end_date=end_date,
priority=priority,
db_session=db_session
)
if not printer_id:
return jsonify({
"error": "Keine verfügbaren Drucker für den gewünschten Zeitraum gefunden. "
"Bitte wählen Sie einen spezifischen Drucker oder einen anderen Zeitraum."
}), 409
# Drucker prüfen/validieren
printer = db_session.query(Printer).filter_by(id=printer_id).first()
if not printer:
return jsonify({"error": "Drucker nicht gefunden"}), 404
if not printer.active:
return jsonify({"error": f"Drucker '{printer.name}' ist nicht aktiv"}), 400
# Nochmals Verfügbarkeit prüfen bei automatischer Zuweisung
if not data.get('printerId'): # War automatische Zuweisung
conflicting_jobs = db_session.query(Job).filter(
Job.printer_id == printer_id,
Job.status.in_(["scheduled", "running"]),
or_(
and_(Job.start_at >= start_date, Job.start_at < end_date),
and_(Job.end_at > start_date, Job.end_at <= end_date),
and_(Job.start_at <= start_date, Job.end_at >= end_date)
)
).first()
if conflicting_jobs:
return jsonify({
"error": f"Automatisch zugewiesener Drucker '{printer.name}' ist nicht mehr verfügbar. "
"Bitte wählen Sie einen anderen Zeitraum oder spezifischen Drucker."
}), 409
# Neuen Job erstellen
job = Job(
name=title,
description=data.get('description', ''),
user_id=current_user.id,
printer_id=printer_id,
start_at=start_date,
end_at=end_date,
status="scheduled",
duration_minutes=duration_minutes,
owner_id=current_user.id
)
db_session.add(job)
db_session.commit()
assignment_type = "automatisch" if not data.get('printerId') else "manuell"
logger.info(f"Neuer Kalendereintrag erstellt: ID {job.id}, Name: {title}, "
f"Drucker: {printer.name} ({assignment_type} zugewiesen)")
return jsonify({
"success": True,
"id": job.id,
"title": job.name,
"start": job.start_at.isoformat(),
"end": job.end_at.isoformat(),
"status": job.status,
"printer": {
"id": printer.id,
"name": printer.name,
"location": printer.location,
"assignment_type": assignment_type
},
"message": f"Auftrag erfolgreich erstellt und {assignment_type} dem Drucker '{printer.name}' zugewiesen."
})
except Exception as e:
logger.error(f"Fehler beim Erstellen des Kalendereintrags: {str(e)}")
return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500
@calendar_blueprint.route('/api/calendar/event/<int:event_id>', methods=['PUT'])
@login_required
def api_update_calendar_event(event_id):
"""Kalendereintrag (Job) aktualisieren."""
# Nur Admins und Benutzer mit can_approve_jobs dürfen Einträge bearbeiten
if not can_edit_events(current_user):
return jsonify({"error": "Keine Berechtigung zum Bearbeiten von Kalendereinträgen"}), 403
try:
data = request.get_json()
if not data:
return jsonify({"error": "Keine Daten erhalten"}), 400
with get_cached_session() as db_session:
job = db_session.query(Job).filter_by(id=event_id).first()
if not job:
return jsonify({"error": "Kalendereintrag nicht gefunden"}), 404
# Felder aktualisieren, die im Request enthalten sind
if 'title' in data:
job.name = data['title']
if 'description' in data:
job.description = data['description']
if 'start' in data and 'end' in data:
try:
start_date = datetime.fromisoformat(data['start'])
end_date = datetime.fromisoformat(data['end'])
job.start_at = start_date
job.end_at = end_date
job.duration_minutes = int((end_date - start_date).total_seconds() / 60)
except ValueError:
return jsonify({"error": "Ungültiges Datumsformat"}), 400
if 'printerId' in data:
printer = db_session.query(Printer).filter_by(id=data['printerId']).first()
if not printer:
return jsonify({"error": "Drucker nicht gefunden"}), 404
job.printer_id = data['printerId']
if 'status' in data:
# Status nur ändern, wenn er gültig ist
valid_statuses = ["scheduled", "running", "finished", "cancelled"]
if data['status'] in valid_statuses:
job.status = data['status']
db_session.commit()
logger.info(f"Kalendereintrag aktualisiert: ID {job.id}")
return jsonify({
"success": True,
"id": job.id,
"title": job.name,
"start": job.start_at.isoformat(),
"end": job.end_at.isoformat(),
"status": job.status
})
except Exception as e:
logger.error(f"Fehler beim Aktualisieren des Kalendereintrags: {str(e)}")
return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500
@calendar_blueprint.route('/api/calendar/event/<int:event_id>', methods=['DELETE'])
@login_required
def api_delete_calendar_event(event_id):
"""Kalendereintrag (Job) löschen."""
# Nur Admins und Benutzer mit can_approve_jobs dürfen Einträge löschen
if not can_edit_events(current_user):
return jsonify({"error": "Keine Berechtigung zum Löschen von Kalendereinträgen"}), 403
try:
with get_cached_session() as db_session:
job = db_session.query(Job).filter_by(id=event_id).first()
if not job:
return jsonify({"error": "Kalendereintrag nicht gefunden"}), 404
db_session.delete(job)
db_session.commit()
logger.info(f"Kalendereintrag gelöscht: ID {event_id}")
return jsonify({"success": True})
except Exception as e:
logger.error(f"Fehler beim Löschen des Kalendereintrags: {str(e)}")
return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500
@calendar_blueprint.route('/api/calendar/smart-recommendation', methods=['POST'])
@login_required
def api_get_smart_recommendation():
"""Intelligente Druckerempfehlung für gegebenes Zeitfenster abrufen."""
try:
data = request.get_json()
if not data:
return jsonify({"error": "Keine Daten erhalten"}), 400
start = data.get('start')
end = data.get('end')
priority = data.get('priority', 'normal')
if not all([start, end]):
return jsonify({"error": "Start und Ende sind erforderlich"}), 400
# Datumsfelder konvertieren
try:
start_date = datetime.fromisoformat(start)
end_date = datetime.fromisoformat(end)
except ValueError:
return jsonify({"error": "Ungültiges Datumsformat"}), 400
with get_cached_session() as db_session:
# Empfohlenen Drucker ermitteln
recommended_printer_id = get_smart_printer_assignment(
start_date=start_date,
end_date=end_date,
priority=priority,
db_session=db_session
)
if not recommended_printer_id:
return jsonify({
"success": False,
"message": "Keine verfügbaren Drucker für den gewünschten Zeitraum gefunden."
})
# Drucker-Details abrufen
printer = db_session.query(Printer).filter_by(id=recommended_printer_id).first()
if not printer:
return jsonify({
"success": False,
"message": "Empfohlener Drucker nicht mehr verfügbar."
})
# Zusätzliche Statistiken für die Empfehlung berechnen
last_24h = datetime.now() - timedelta(hours=24)
recent_jobs_count = db_session.query(Job).filter(
Job.printer_id == printer.id,
Job.start_at >= last_24h,
Job.status.in_(["scheduled", "running", "finished"])
).count()
# Verfügbarkeit als Prozentsatz berechnen
total_time_slots = 24 # Stunden pro Tag
availability_percent = max(0, 100 - (recent_jobs_count * 4)) # Grobe Schätzung
# Auslastung berechnen
utilization_percent = min(100, recent_jobs_count * 8) # Grobe Schätzung
# Eignung basierend auf Priorität und Zeitfenster bestimmen
suitability = "Gut"
if priority == "urgent" and ("express" in printer.name.lower() or "schnell" in printer.name.lower()):
suitability = "Perfekt"
elif priority == "high" and recent_jobs_count <= 2:
suitability = "Ausgezeichnet"
elif recent_jobs_count == 0:
suitability = "Optimal"
# Begründung generieren
reason = f"Optimale Verfügbarkeit und geringe Auslastung im gewählten Zeitraum"
job_duration_hours = (end_date - start_date).total_seconds() / 3600
start_hour = start_date.hour
if priority == "urgent":
reason = "Schnellster verfügbarer Drucker für dringende Aufträge"
elif start_hour >= 18 or start_hour <= 6:
reason = "Speziell für Nachtschichten optimiert"
elif job_duration_hours > 8:
reason = "Zuverlässig für lange Druckaufträge"
elif job_duration_hours <= 2:
reason = "Optimal für schnelle Druckaufträge"
return jsonify({
"success": True,
"recommendation": {
"printer_id": printer.id,
"printer_name": f"{printer.name}",
"location": printer.location or "Haupthalle",
"reason": reason,
"availability": f"{availability_percent}%",
"utilization": f"{utilization_percent}%",
"suitability": suitability,
"recent_jobs": recent_jobs_count,
"priority_optimized": priority in ["urgent", "high"] and suitability in ["Perfekt", "Ausgezeichnet"]
}
})
except Exception as e:
logger.error(f"Fehler beim Abrufen der intelligenten Empfehlung: {str(e)}")
return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500
@calendar_blueprint.route('/api/calendar/export', methods=['GET'])
@login_required
def api_export_calendar():
"""
Exportiert Kalenderdaten in verschiedenen Formaten (CSV, JSON, Excel).
URL-Parameter:
- format: csv, json, excel (Standard: csv)
- start_date: ISO-Format Start-Datum (Optional)
- end_date: ISO-Format End-Datum (Optional)
- printer_id: Spezifischer Drucker-Filter (Optional)
- status: Status-Filter (Optional)
"""
try:
# Parameter aus Request extrahieren
export_format = request.args.get('format', 'csv').lower()
start_str = request.args.get('start_date')
end_str = request.args.get('end_date')
printer_id = request.args.get('printer_id')
status_filter = request.args.get('status')
# Datumsbereich bestimmen (Standard: nächste 4 Wochen)
if start_str and end_str:
try:
start_date = datetime.fromisoformat(start_str)
end_date = datetime.fromisoformat(end_str)
except ValueError:
return jsonify({"error": "Ungültiges Datumsformat. Verwenden Sie ISO-Format."}), 400
else:
# Standard: Kommende 4 Wochen
start_date = datetime.now()
end_date = start_date + timedelta(days=28)
with get_cached_session() as db_session:
# Basis-Query für Jobs im Zeitraum
query = db_session.query(Job).filter(
or_(
# Jobs, die im Zeitraum beginnen
and_(Job.start_at >= start_date, Job.start_at <= end_date),
# Jobs, die im Zeitraum enden
and_(Job.end_at >= start_date, Job.end_at <= end_date),
# Jobs, die den Zeitraum komplett umfassen
and_(Job.start_at <= start_date, Job.end_at >= end_date)
)
)
# Filter anwenden
if printer_id:
try:
printer_id_int = int(printer_id)
query = query.filter(Job.printer_id == printer_id_int)
except ValueError:
return jsonify({"error": "Ungültige Drucker-ID"}), 400
if status_filter:
query = query.filter(Job.status == status_filter)
# Jobs abrufen und sortieren
jobs = query.order_by(Job.start_at.asc()).all()
if not jobs:
return jsonify({"message": "Keine Daten im angegebenen Zeitraum gefunden"}), 404
# Zusätzliche Daten für Export sammeln
export_data = []
for job in jobs:
# Benutzerinformationen
user = db_session.query(User).filter_by(id=job.user_id).first()
user_name = user.name if user else "Unbekannt"
user_email = user.email if user else ""
# Druckerinformationen
printer = db_session.query(Printer).filter_by(id=job.printer_id).first()
printer_name = printer.name if printer else "Unbekannt"
printer_location = printer.location if printer else ""
printer_model = printer.model if printer else ""
# Status-Übersetzung
status_mapping = {
"scheduled": "Geplant",
"running": "Läuft",
"finished": "Abgeschlossen",
"cancelled": "Abgebrochen",
"failed": "Fehlgeschlagen",
"pending": "Wartend"
}
status_german = status_mapping.get(job.status, job.status)
# Priorität-Übersetzung
priority_mapping = {
"urgent": "Dringend",
"high": "Hoch",
"normal": "Standard",
"low": "Niedrig"
}
priority_german = priority_mapping.get(job.priority if hasattr(job, 'priority') and job.priority else 'normal', 'Standard')
export_entry = {
"Job_ID": job.id,
"Auftragsname": job.name,
"Beschreibung": job.description or "",
"Status": status_german,
"Priorität": priority_german,
"Benutzer": user_name,
"Benutzer_E-Mail": user_email,
"Drucker": printer_name,
"Drucker_Standort": printer_location,
"Drucker_Modell": printer_model,
"Startzeit": job.start_at.strftime('%d.%m.%Y %H:%M:%S') if job.start_at else "",
"Endzeit": job.end_at.strftime('%d.%m.%Y %H:%M:%S') if job.end_at else "",
"Dauer_Minuten": job.duration_minutes or 0,
"Dauer_Stunden": round((job.duration_minutes or 0) / 60, 2),
"Erstellt_am": job.created_at.strftime('%d.%m.%Y %H:%M:%S') if job.created_at else "",
"Abgeschlossen_am": job.completed_at.strftime('%d.%m.%Y %H:%M:%S') if job.completed_at else "",
"Dateiname": job.filename if hasattr(job, 'filename') and job.filename else "",
"Materialverbrauch": getattr(job, 'material_used', "") or "",
"Kosten_EUR": getattr(job, 'cost', "") or ""
}
export_data.append(export_entry)
# Format-spezifischer Export
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
if export_format == 'csv':
import csv
import io
# CSV-Export implementierung
output = io.StringIO()
writer = csv.writer(output, delimiter=';', quoting=csv.QUOTE_MINIMAL)
# Header-Zeile
if export_data:
headers = list(export_data[0].keys())
writer.writerow(headers)
# Daten-Zeilen
for row in export_data:
formatted_row = []
for value in row.values():
if value is None:
formatted_row.append('')
else:
formatted_row.append(str(value))
writer.writerow(formatted_row)
# Response erstellen
csv_content = output.getvalue()
output.close()
# BOM für UTF-8 hinzufügen (Excel-Kompatibilität)
csv_content = '\ufeff' + csv_content
response = make_response(csv_content)
response.headers['Content-Type'] = 'text/csv; charset=utf-8'
response.headers['Content-Disposition'] = f'attachment; filename="schichtplan_export_{timestamp}.csv"'
logger.info(f"📊 CSV-Export erstellt: {len(export_data)} Einträge für Benutzer {current_user.username}")
return response
elif export_format == 'json':
# JSON-Export implementierung
export_metadata = {
"export_info": {
"erstellt_am": datetime.now().strftime('%d.%m.%Y %H:%M:%S'),
"exportiert_von": current_user.username,
"zeitraum_von": start_date.strftime('%d.%m.%Y'),
"zeitraum_bis": end_date.strftime('%d.%m.%Y'),
"anzahl_jobs": len(export_data),
"filter_drucker_id": printer_id,
"filter_status": status_filter
},
"produktionsplan": export_data
}
json_content = json.dumps(export_metadata, indent=2, ensure_ascii=False)
response = make_response(json_content)
response.headers['Content-Type'] = 'application/json; charset=utf-8'
response.headers['Content-Disposition'] = f'attachment; filename="schichtplan_export_{timestamp}.json"'
logger.info(f"📊 JSON-Export erstellt: {len(export_data)} Einträge für Benutzer {current_user.username}")
return response
elif export_format == 'excel':
# Excel-Export implementierung
try:
import pandas as pd
import io
# DataFrame erstellen
df = pd.DataFrame(export_data)
# Excel-Datei in Memory erstellen
output = io.BytesIO()
with pd.ExcelWriter(output, engine='openpyxl') as writer:
# Hauptdaten-Sheet
df.to_excel(writer, sheet_name='Produktionsplan', index=False)
# Zusammenfassung-Sheet
summary_data = {
'Metrik': [
'Gesamte Jobs',
'Geplante Jobs',
'Laufende Jobs',
'Abgeschlossene Jobs',
'Abgebrochene Jobs',
'Gesamte Produktionszeit (Stunden)',
'Durchschnittliche Job-Dauer (Stunden)',
'Anzahl verschiedener Drucker',
'Anzahl verschiedener Benutzer',
'Export erstellt am',
'Zeitraum von',
'Zeitraum bis'
],
'Wert': [
len(export_data),
len([j for j in export_data if j['Status'] == 'Geplant']),
len([j for j in export_data if j['Status'] == 'Läuft']),
len([j for j in export_data if j['Status'] == 'Abgeschlossen']),
len([j for j in export_data if j['Status'] in ['Abgebrochen', 'Fehlgeschlagen']]),
round(sum([j['Dauer_Stunden'] for j in export_data]), 2),
round(sum([j['Dauer_Stunden'] for j in export_data]) / len(export_data), 2) if export_data else 0,
len(set([j['Drucker'] for j in export_data])),
len(set([j['Benutzer'] for j in export_data])),
datetime.now().strftime('%d.%m.%Y %H:%M:%S'),
start_date.strftime('%d.%m.%Y'),
end_date.strftime('%d.%m.%Y')
]
}
summary_df = pd.DataFrame(summary_data)
summary_df.to_excel(writer, sheet_name='Zusammenfassung', index=False)
# Auto-fit Spaltenbreite
for sheet_name in writer.sheets:
worksheet = writer.sheets[sheet_name]
for column in worksheet.columns:
max_length = 0
column_letter = column[0].column_letter
for cell in column:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
except:
pass
adjusted_width = min(max_length + 2, 50)
worksheet.column_dimensions[column_letter].width = adjusted_width
output.seek(0)
response = make_response(output.getvalue())
response.headers['Content-Type'] = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
response.headers['Content-Disposition'] = f'attachment; filename="schichtplan_export_{timestamp}.xlsx"'
logger.info(f"📊 Excel-Export erstellt: {len(export_data)} Einträge für Benutzer {current_user.username}")
return response
except ImportError:
return jsonify({
"error": "Excel-Export nicht verfügbar. Pandas/openpyxl-Bibliotheken fehlen. "
"Verwenden Sie CSV oder JSON Export."
}), 501
except Exception as e:
logger.error(f"Fehler beim Excel-Export: {str(e)}")
return jsonify({"error": f"Fehler beim Excel-Export: {str(e)}"}), 500
else:
return jsonify({"error": f"Unbekanntes Export-Format: {export_format}. Unterstützt: csv, json, excel"}), 400
except Exception as e:
logger.error(f"Fehler beim Kalender-Export: {str(e)}")
return jsonify({"error": f"Fehler beim Export: {str(e)}"}), 500
@calendar_blueprint.route('/api/calendar/check-conflicts', methods=['POST'])
@login_required
def api_check_conflicts():
"""
Prüft explizit auf Konflikte für eine geplante Reservierung
und gibt detaillierte Konfliktinformationen zurück.
"""
try:
data = request.get_json()
if not data:
return jsonify({"error": "Keine Daten erhalten"}), 400
# Pflichtfelder prüfen
required_fields = ['start_time', 'end_time']
for field in required_fields:
if field not in data:
return jsonify({"error": f"Feld '{field}' fehlt"}), 400
# Daten extrahieren
try:
start_time = datetime.fromisoformat(data['start_time'])
end_time = datetime.fromisoformat(data['end_time'])
printer_id = data.get('printer_id')
priority = data.get('priority', 'normal')
duration_minutes = int((end_time - start_time).total_seconds() / 60)
except (ValueError, TypeError) as e:
return jsonify({"error": f"Ungültige Datenformate: {str(e)}"}), 400
with get_cached_session() as db_session:
# Job-Daten für Konfliktanalyse vorbereiten
job_data = {
'printer_id': printer_id,
'start_time': start_time,
'end_time': end_time,
'priority': priority,
'duration_minutes': duration_minutes
}
# Konfliktanalyse durchführen
conflicts = conflict_manager.detect_conflicts(job_data, db_session)
# Konfliktinformationen für Response aufbereiten
conflict_info = []
total_severity_score = 0
for conflict in conflicts:
severity_scores = {
ConflictSeverity.CRITICAL: 4,
ConflictSeverity.HIGH: 3,
ConflictSeverity.MEDIUM: 2,
ConflictSeverity.LOW: 1,
ConflictSeverity.INFO: 0
}
total_severity_score += severity_scores.get(conflict.severity, 0)
# Konflikthafte Jobs laden für Details
conflicting_job_details = []
for job_id in conflict.conflicting_job_ids:
job = db_session.query(Job).filter_by(id=job_id).first()
if job:
conflicting_job_details.append({
'id': job.id,
'name': job.name,
'start_time': job.start_at.isoformat() if job.start_at else None,
'end_time': job.end_at.isoformat() if job.end_at else None,
'user_name': job.user.name if job.user else "Unbekannt"
})
# Drucker-Details laden
printer_info = None
if conflict.affected_printer_id:
printer = db_session.query(Printer).filter_by(id=conflict.affected_printer_id).first()
if printer:
printer_info = {
'id': printer.id,
'name': printer.name,
'location': printer.location,
'status': printer.status,
'active': printer.active
}
conflict_detail = {
'type': conflict.conflict_type.value,
'severity': conflict.severity.value,
'description': conflict.description,
'estimated_impact': conflict.estimated_impact,
'auto_resolvable': conflict.auto_resolvable,
'conflict_start': conflict.conflict_start.isoformat() if conflict.conflict_start else None,
'conflict_end': conflict.conflict_end.isoformat() if conflict.conflict_end else None,
'affected_printer': printer_info,
'conflicting_jobs': conflicting_job_details,
'suggested_solutions': conflict.suggested_solutions
}
conflict_info.append(conflict_detail)
# Gesamtbewertung
has_conflicts = len(conflicts) > 0
can_proceed = all(c.auto_resolvable for c in conflicts) or len(conflicts) == 0
# Empfehlungen basierend auf Konflikten
recommendations = []
if has_conflicts:
if can_proceed:
recommendations.append({
'type': 'auto_resolve',
'message': 'Konflikte können automatisch gelöst werden',
'action': 'Automatische Lösung anwenden'
})
else:
recommendations.append({
'type': 'manual_intervention',
'message': 'Manuelle Anpassung erforderlich',
'action': 'Zeitraum oder Drucker ändern'
})
# Alternative Zeitfenster vorschlagen
if printer_id:
alternative_slots = conflict_manager._find_alternative_time_slots(job_data, db_session)
if alternative_slots:
slot_suggestions = []
for start, end, confidence in alternative_slots[:3]:
slot_suggestions.append({
'start_time': start.isoformat(),
'end_time': end.isoformat(),
'confidence': confidence,
'description': f"{start.strftime('%H:%M')} - {end.strftime('%H:%M')}"
})
recommendations.append({
'type': 'alternative_times',
'message': 'Alternative Zeitfenster verfügbar',
'suggestions': slot_suggestions
})
# Alternative Drucker vorschlagen
alternative_printers = conflict_manager._find_alternative_printers(job_data, db_session)
if alternative_printers:
printer_suggestions = []
for printer_id_alt, confidence in alternative_printers[:3]:
printer = db_session.query(Printer).filter_by(id=printer_id_alt).first()
if printer:
printer_suggestions.append({
'printer_id': printer.id,
'printer_name': printer.name,
'location': printer.location,
'confidence': confidence
})
recommendations.append({
'type': 'alternative_printers',
'message': 'Alternative Drucker verfügbar',
'suggestions': printer_suggestions
})
logger.info(f"🔍 Konfliktprüfung abgeschlossen: {len(conflicts)} Konflikte, "
f"Schweregrad: {total_severity_score}, Automatisch lösbar: {can_proceed}")
return jsonify({
'has_conflicts': has_conflicts,
'can_proceed': can_proceed,
'severity_score': total_severity_score,
'conflict_count': len(conflicts),
'conflicts': conflict_info,
'recommendations': recommendations,
'summary': {
'critical_conflicts': len([c for c in conflicts if c.severity == ConflictSeverity.CRITICAL]),
'high_conflicts': len([c for c in conflicts if c.severity == ConflictSeverity.HIGH]),
'medium_conflicts': len([c for c in conflicts if c.severity == ConflictSeverity.MEDIUM]),
'low_conflicts': len([c for c in conflicts if c.severity == ConflictSeverity.LOW])
}
})
except Exception as e:
logger.error(f"❌ Fehler bei Konfliktprüfung: {str(e)}", exc_info=True)
return jsonify({"error": "Fehler bei der Konfliktanalyse"}), 500
@calendar_blueprint.route('/api/calendar/resolve-conflicts', methods=['POST'])
@login_required
def api_resolve_conflicts():
"""
Löst erkannte Konflikte automatisch und erstellt den Job
mit den optimalen Parametern.
"""
try:
data = request.get_json()
if not data:
return jsonify({"error": "Keine Daten erhalten"}), 400
# Pflichtfelder prüfen
required_fields = ['start_time', 'end_time', 'title']
for field in required_fields:
if field not in data:
return jsonify({"error": f"Feld '{field}' fehlt"}), 400
# Daten extrahieren
try:
start_time = datetime.fromisoformat(data['start_time'])
end_time = datetime.fromisoformat(data['end_time'])
title = data['title']
description = data.get('description', '')
printer_id = data.get('printer_id')
priority = data.get('priority', 'normal')
duration_minutes = int((end_time - start_time).total_seconds() / 60)
auto_resolve = data.get('auto_resolve', True)
except (ValueError, TypeError) as e:
return jsonify({"error": f"Ungültige Datenformate: {str(e)}"}), 400
with get_cached_session() as db_session:
# Job-Daten für Konfliktanalyse vorbereiten
job_data = {
'printer_id': printer_id,
'start_time': start_time,
'end_time': end_time,
'priority': priority,
'duration_minutes': duration_minutes,
'title': title,
'description': description
}
# Konfliktanalyse durchführen
conflicts = conflict_manager.detect_conflicts(job_data, db_session)
final_job_data = job_data.copy()
resolution_messages = []
if conflicts and auto_resolve:
# Konflikte automatisch lösen
resolutions = conflict_manager.resolve_conflicts(conflicts, job_data, db_session)
# Erste erfolgreiche Lösung anwenden
successful_resolution = next((r for r in resolutions if r.success), None)
if successful_resolution:
# Job-Parameter basierend auf Lösung anpassen
if successful_resolution.new_printer_id:
final_job_data['printer_id'] = successful_resolution.new_printer_id
if successful_resolution.new_start_time:
final_job_data['start_time'] = successful_resolution.new_start_time
if successful_resolution.new_end_time:
final_job_data['end_time'] = successful_resolution.new_end_time
resolution_messages.append(successful_resolution.message)
logger.info(f"🔧 Konflikt automatisch gelöst: {successful_resolution.strategy_used.value}")
else:
return jsonify({
"error": "Konflikte können nicht automatisch gelöst werden",
"conflicts": [c.description for c in conflicts],
"requires_manual_intervention": True
}), 409
elif conflicts and not auto_resolve:
return jsonify({
"error": "Konflikte erkannt - automatische Lösung deaktiviert",
"conflicts": [c.description for c in conflicts],
"suggestions": [s for c in conflicts for s in c.suggested_solutions]
}), 409
# Finalen Drucker ermitteln
if not final_job_data.get('printer_id'):
# Intelligente Druckerzuweisung
printer_id = get_smart_printer_assignment(
start_date=final_job_data['start_time'],
end_date=final_job_data['end_time'],
priority=priority,
db_session=db_session
)
if not printer_id:
return jsonify({
"error": "Keine verfügbaren Drucker für den gewünschten Zeitraum gefunden"
}), 409
final_job_data['printer_id'] = printer_id
resolution_messages.append("Drucker automatisch zugewiesen")
# Drucker validieren
printer = db_session.query(Printer).filter_by(id=final_job_data['printer_id']).first()
if not printer:
return jsonify({"error": "Zugewiesener Drucker nicht gefunden"}), 404
if not printer.active:
return jsonify({"error": f"Drucker '{printer.name}' ist nicht aktiv"}), 400
# Job erstellen
job = Job(
name=title,
description=description,
user_id=current_user.id,
printer_id=final_job_data['printer_id'],
start_at=final_job_data['start_time'],
end_at=final_job_data['end_time'],
status="scheduled",
duration_minutes=duration_minutes,
owner_id=current_user.id
)
db_session.add(job)
db_session.commit()
assignment_type = "automatisch mit Konfliktlösung" if conflicts else "automatisch"
logger.info(f"✅ Job mit Konfliktlösung erstellt: ID {job.id}, "
f"Drucker: {printer.name} ({assignment_type})")
return jsonify({
"success": True,
"job": {
"id": job.id,
"title": job.name,
"start": job.start_at.isoformat(),
"end": job.end_at.isoformat(),
"status": job.status
},
"printer": {
"id": printer.id,
"name": printer.name,
"location": printer.location
},
"conflict_resolution": {
"conflicts_detected": len(conflicts),
"conflicts_resolved": len([r for r in (resolutions if conflicts else []) if r.success]),
"messages": resolution_messages,
"assignment_type": assignment_type
}
})
except Exception as e:
logger.error(f"❌ Fehler bei Konfliktlösung und Job-Erstellung: {str(e)}", exc_info=True)
return jsonify({"error": "Fehler bei der Verarbeitung"}), 500
@calendar_blueprint.route('/api/calendar/statistics', methods=['GET'])
@login_required
def api_calendar_statistics():
"""Kalender-Statistiken für Dashboard-Anzeige."""
try:
with get_cached_session() as db_session:
# Aktive Jobs (running)
active_jobs = db_session.query(Job).filter(
Job.status == "running"
).count()
# Jobs in Warteschlange (scheduled)
queued_jobs = db_session.query(Job).filter(
Job.status == "scheduled"
).count()
# Heute geplante Jobs
today = datetime.now().date()
today_start = datetime.combine(today, datetime.min.time())
today_end = datetime.combine(today, datetime.max.time())
today_jobs = db_session.query(Job).filter(
Job.start_at >= today_start,
Job.start_at <= today_end,
Job.status.in_(["scheduled", "running"])
).all()
# Gesamte Produktionszeit heute (in Stunden)
total_time = 0
for job in today_jobs:
if job.duration_minutes:
total_time += job.duration_minutes / 60
# Auslastung berechnen (basierend auf verfügbaren Druckern)
available_printers = db_session.query(Printer).filter(
Printer.active == True,
Printer.location == "TBA Marienfelde"
).count()
# Maximale Arbeitszeit pro Tag (10 Stunden pro Drucker)
max_daily_hours = available_printers * 10
utilization = min(100, (total_time / max_daily_hours * 100)) if max_daily_hours > 0 else 0
statistics = {
"active_jobs": active_jobs,
"queued_jobs": queued_jobs,
"total_time": round(total_time, 1),
"utilization": round(utilization, 1),
"available_printers": available_printers,
"today_jobs_count": len(today_jobs)
}
logger.debug(f"📊 Kalender-Statistiken abgerufen: {statistics}")
return jsonify(statistics)
except Exception as e:
logger.error(f"Fehler beim Abrufen der Kalender-Statistiken: {str(e)}")
return jsonify({
"active_jobs": 0,
"queued_jobs": 0,
"total_time": 0,
"utilization": 0,
"available_printers": 0,
"today_jobs_count": 0
}), 200 # Fallback-Daten zurückgeben statt Fehler
@calendar_blueprint.route('/api/calendar/printer-availability', methods=['GET'])
@login_required
def api_printer_availability():
"""
Zeigt detaillierte Verfügbarkeit aller Drucker für einen Zeitraum an.
"""
try:
# Parameter extrahieren
start_str = request.args.get('start')
end_str = request.args.get('end')
if not start_str or not end_str:
return jsonify({"error": "Start- und Endzeit erforderlich"}), 400
try:
start_time = datetime.fromisoformat(start_str)
end_time = datetime.fromisoformat(end_str)
except ValueError:
return jsonify({"error": "Ungültiges Datumsformat"}), 400
with get_cached_session() as db_session:
# Nur TBA Marienfelde Drucker laden für Verfügbarkeitsanalyse
printers = db_session.query(Printer).filter(
Printer.location == "TBA Marienfelde"
).all()
availability_info = []
for printer in printers:
# Jobs im Zeitraum finden
jobs_in_period = db_session.query(Job).filter(
Job.printer_id == printer.id,
Job.status.in_(["scheduled", "running"]),
or_(
and_(Job.start_at >= start_time, Job.start_at < end_time),
and_(Job.end_at > start_time, Job.end_at <= end_time),
and_(Job.start_at <= start_time, Job.end_at >= end_time)
)
).all()
# Auslastung der letzten 24 Stunden
last_24h = datetime.now() - timedelta(hours=24)
recent_jobs = db_session.query(Job).filter(
Job.printer_id == printer.id,
Job.start_at >= last_24h,
Job.status.in_(["scheduled", "running", "finished"])
).count()
# Verfügbarkeits-Status bestimmen
is_available = len(jobs_in_period) == 0
if recent_jobs == 0:
availability_status = "optimal"
availability_icon = "🟢"
elif recent_jobs <= 2:
availability_status = "gut"
availability_icon = "🟡"
elif recent_jobs <= 5:
availability_status = "mäßig"
availability_icon = "🟠"
else:
availability_status = "hoch_belegt"
availability_icon = "🔴"
if not printer.active:
availability_status = "offline"
availability_icon = ""
# Nächste freie Zeitfenster finden
next_free_slots = []
if not is_available:
# Einfache Implementierung: Zeitfenster nach bestehenden Jobs
last_job_end = max([job.end_at for job in jobs_in_period])
next_free_slots.append({
'start': last_job_end.isoformat(),
'description': f"Frei ab {last_job_end.strftime('%H:%M')}"
})
# Belegte Zeitfenster
occupied_slots = []
for job in jobs_in_period:
occupied_slots.append({
'job_id': job.id,
'job_name': job.name,
'start': job.start_at.isoformat() if job.start_at else None,
'end': job.end_at.isoformat() if job.end_at else None,
'user_name': job.user.name if job.user else "Unbekannt",
'status': job.status
})
printer_info = {
'printer_id': printer.id,
'printer_name': printer.name,
'location': printer.location,
'model': printer.model,
'is_available': is_available,
'availability_status': availability_status,
'availability_icon': availability_icon,
'recent_jobs_24h': recent_jobs,
'jobs_in_period': len(jobs_in_period),
'occupied_slots': occupied_slots,
'next_free_slots': next_free_slots,
'status_description': {
'optimal': 'Keine kürzlichen Jobs, sofort verfügbar',
'gut': 'Wenige kürzliche Jobs, gute Verfügbarkeit',
'mäßig': 'Moderate Auslastung',
'hoch_belegt': 'Hohe Auslastung, möglicherweise Wartezeit',
'offline': 'Drucker offline oder nicht aktiv'
}.get(availability_status, availability_status)
}
availability_info.append(printer_info)
# Nach Verfügbarkeit sortieren (beste zuerst)
availability_info.sort(key=lambda x: (
not x['is_available'], # Verfügbare zuerst
x['recent_jobs_24h'], # Dann nach geringster Auslastung
x['printer_name'] # Dann alphabetisch
))
# Zusammenfassung erstellen
total_printers = len(availability_info)
available_printers = len([p for p in availability_info if p['is_available']])
optimal_printers = len([p for p in availability_info if p['availability_status'] == 'optimal'])
summary = {
'total_printers': total_printers,
'available_printers': available_printers,
'optimal_printers': optimal_printers,
'availability_rate': round((available_printers / total_printers * 100) if total_printers > 0 else 0, 1),
'period': {
'start': start_time.isoformat(),
'end': end_time.isoformat(),
'duration_hours': round((end_time - start_time).total_seconds() / 3600, 1)
}
}
logger.info(f"📊 Verfügbarkeitsabfrage: {available_printers}/{total_printers} Drucker verfügbar")
return jsonify({
'summary': summary,
'printers': availability_info
})
except Exception as e:
logger.error(f"❌ Fehler bei Verfügbarkeitsabfrage: {str(e)}", exc_info=True)
return jsonify({"error": "Fehler bei der Verfügbarkeitsanalyse"}), 500