841 lines
37 KiB
Python

import json
from datetime import datetime, timedelta
from flask import Blueprint, render_template, request, jsonify, redirect, url_for, abort, make_response
from flask_login import current_user, login_required
from sqlalchemy import and_, or_, func
from models import Job, Printer, User, UserPermission, get_cached_session
from utils.logging_config import get_logger
calendar_blueprint = Blueprint('calendar', __name__)
logger = get_logger("calendar")
def can_edit_events(user):
"""Prüft, ob ein Benutzer Kalendereinträge bearbeiten darf."""
if user.is_admin:
return True
with get_cached_session() as db_session:
permission = db_session.query(UserPermission).filter_by(user_id=user.id).first()
if not permission:
return False
return permission.can_approve_jobs
def get_smart_printer_assignment(start_date, end_date, priority="normal", db_session=None):
"""
Intelligente Druckerzuweisung basierend auf verschiedenen Faktoren.
Args:
start_date: Startzeit des Jobs
end_date: Endzeit des Jobs
priority: Prioritätsstufe ('urgent', 'high', 'normal', 'low')
db_session: Datenbankverbindung
Returns:
printer_id: ID des empfohlenen Druckers oder None
"""
if not db_session:
return None
try:
# Verfügbare Drucker ermitteln
available_printers = db_session.query(Printer).filter(
Printer.active == True
).all()
if not available_printers:
logger.warning("Keine aktiven Drucker für automatische Zuweisung gefunden")
return None
printer_scores = []
for printer in available_printers:
score = 0
# 1. Verfügbarkeit prüfen - Jobs im gleichen Zeitraum
conflicting_jobs = db_session.query(Job).filter(
Job.printer_id == printer.id,
Job.status.in_(["scheduled", "running"]),
or_(
and_(Job.start_at >= start_date, Job.start_at < end_date),
and_(Job.end_at > start_date, Job.end_at <= end_date),
and_(Job.start_at <= start_date, Job.end_at >= end_date)
)
).count()
if conflicting_jobs > 0:
continue # Drucker ist nicht verfügbar
score += 100 # Grundpunkte für Verfügbarkeit
# 2. Auslastung in den letzten 24 Stunden bewerten
last_24h = datetime.now() - timedelta(hours=24)
recent_jobs = db_session.query(Job).filter(
Job.printer_id == printer.id,
Job.start_at >= last_24h,
Job.status.in_(["scheduled", "running", "finished"])
).count()
# Weniger Auslastung = höhere Punktzahl
score += max(0, 50 - (recent_jobs * 10))
# 3. Prioritätsbasierte Zuweisung
if priority == "urgent":
# Für dringende Jobs: Express-Drucker bevorzugen
if "express" in printer.name.lower() or "schnell" in printer.name.lower():
score += 30
elif priority == "high":
# Für hohe Priorität: Weniger belastete Drucker
if recent_jobs <= 2:
score += 20
# 4. Zeitfenster-basierte Zuweisung
start_hour = start_date.hour
if start_hour >= 18 or start_hour <= 6: # Nachtschicht
if "nacht" in printer.name.lower() or printer.location and "c" in printer.location.lower():
score += 25
elif start_hour >= 8 and start_hour <= 17: # Tagschicht
if "tag" in printer.name.lower() or printer.location and "a" in printer.location.lower():
score += 15
# 5. Standort-basierte Bewertung (Round-Robin ähnlich)
if printer.location:
location_penalty = hash(printer.location) % 10 # Verteilung basierend auf Standort
score += location_penalty
# 6. Druckerdauer-Eignung
job_duration_hours = (end_date - start_date).total_seconds() / 3600
if job_duration_hours > 8: # Lange Jobs
if "langzeit" in printer.name.lower() or "marathon" in printer.name.lower():
score += 20
elif job_duration_hours <= 2: # Kurze Jobs
if "express" in printer.name.lower() or "schnell" in printer.name.lower():
score += 15
# 7. Wartungszyklen berücksichtigen
# Neuere Drucker (falls last_maintenance_date verfügbar) bevorzugen
# TODO: Implementierung abhängig von Printer-Model-Erweiterungen
printer_scores.append({
'printer': printer,
'score': score,
'conflicts': conflicting_jobs,
'recent_load': recent_jobs
})
# Nach Punktzahl sortieren und besten Drucker auswählen
if not printer_scores:
logger.warning("Keine verfügbaren Drucker für den gewünschten Zeitraum gefunden")
return None
printer_scores.sort(key=lambda x: x['score'], reverse=True)
best_printer = printer_scores[0]
logger.info(f"Automatische Druckerzuweisung: {best_printer['printer'].name} "
f"(Score: {best_printer['score']}, Load: {best_printer['recent_load']})")
return best_printer['printer'].id
except Exception as e:
logger.error(f"Fehler bei automatischer Druckerzuweisung: {str(e)}")
return None
@calendar_blueprint.route('/calendar', methods=['GET'])
@login_required
def calendar_view():
"""Kalender-Ansicht anzeigen."""
can_edit = can_edit_events(current_user)
with get_cached_session() as db_session:
printers = db_session.query(Printer).filter_by(active=True).all()
return render_template('calendar.html',
printers=printers,
can_edit=can_edit)
@calendar_blueprint.route('/api/calendar', methods=['GET'])
@login_required
def api_get_calendar_events():
"""Kalendereinträge als JSON für FullCalendar zurückgeben."""
try:
# Datumsbereich aus Anfrage - FullCalendar verwendet 'start' und 'end'
start_str = request.args.get('start') or request.args.get('from')
end_str = request.args.get('end') or request.args.get('to')
if not start_str or not end_str:
# Standardmäßig eine Woche anzeigen
start_date = datetime.now()
end_date = start_date + timedelta(days=7)
else:
try:
# FullCalendar sendet ISO-Format mit Zeitzone, das muss geparst werden
if start_str and start_str.endswith('+02:00'):
start_str = start_str[:-6] # Zeitzone entfernen
if end_str and end_str.endswith('+02:00'):
end_str = end_str[:-6] # Zeitzone entfernen
start_date = datetime.fromisoformat(start_str)
end_date = datetime.fromisoformat(end_str)
except ValueError:
return jsonify({"error": "Ungültiges Datumsformat"}), 400
# Optional: Filter nach Druckern
printer_id = request.args.get('printer_id')
with get_cached_session() as db_session:
# Jobs im angegebenen Zeitraum abfragen
query = db_session.query(Job).filter(
or_(
# Jobs, die im Zeitraum beginnen
and_(Job.start_at >= start_date, Job.start_at <= end_date),
# Jobs, die im Zeitraum enden
and_(Job.end_at >= start_date, Job.end_at <= end_date),
# Jobs, die den Zeitraum komplett umfassen
and_(Job.start_at <= start_date, Job.end_at >= end_date)
)
)
if printer_id:
query = query.filter(Job.printer_id == printer_id)
jobs = query.all()
# Jobs in FullCalendar-Event-Format umwandeln
events = []
for job in jobs:
# Farbe basierend auf Status bestimmen
color = "#6B7280" # Grau für pending
if job.status == "running":
color = "#3B82F6" # Blau für running
elif job.status == "finished":
color = "#10B981" # Grün für finished
elif job.status == "scheduled":
color = "#10B981" # Grün für approved
elif job.status == "cancelled" or job.status == "failed":
color = "#EF4444" # Rot für abgebrochen/fehlgeschlagen
# Benutzerinformationen laden
user = db_session.query(User).filter_by(id=job.user_id).first()
user_name = user.name if user else "Unbekannt"
# Druckerinformationen laden
printer = db_session.query(Printer).filter_by(id=job.printer_id).first()
printer_name = printer.name if printer else "Unbekannt"
event = {
"id": job.id,
"title": job.name,
"start": job.start_at.isoformat(),
"end": job.end_at.isoformat(),
"color": color,
"extendedProps": {
"status": job.status,
"description": job.description,
"userName": user_name,
"printerId": job.printer_id,
"printerName": printer_name
}
}
events.append(event)
logger.info(f"📅 Kalender-Events abgerufen: {len(events)} Einträge für Zeitraum {start_date} bis {end_date}")
return jsonify(events)
except Exception as e:
logger.error(f"Fehler beim Abrufen der Kalendereinträge: {str(e)}")
return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500
# Zusätzliche Route für FullCalendar-Kompatibilität
@calendar_blueprint.route('/api/calendar/events', methods=['GET'])
@login_required
def api_get_calendar_events_alt():
"""Alternative Route für FullCalendar-Events - delegiert an api_get_calendar_events."""
return api_get_calendar_events()
@calendar_blueprint.route('/api/calendar/event', methods=['POST'])
@login_required
def api_create_calendar_event():
"""Neuen Kalendereintrag (Job) erstellen."""
# Nur Admins und Benutzer mit can_approve_jobs dürfen Einträge erstellen
if not can_edit_events(current_user):
return jsonify({"error": "Keine Berechtigung zum Erstellen von Kalendereinträgen"}), 403
try:
data = request.get_json()
if not data:
return jsonify({"error": "Keine Daten erhalten"}), 400
# Pflichtfelder prüfen
title = data.get('title')
start = data.get('start')
end = data.get('end')
printer_id = data.get('printerId') # Jetzt optional
priority = data.get('priority', 'normal')
if not all([title, start, end]):
return jsonify({"error": "Titel, Start und Ende sind erforderlich"}), 400
# Datumsfelder konvertieren
try:
start_date = datetime.fromisoformat(start)
end_date = datetime.fromisoformat(end)
except ValueError:
return jsonify({"error": "Ungültiges Datumsformat"}), 400
# Dauer in Minuten berechnen
duration_minutes = int((end_date - start_date).total_seconds() / 60)
with get_cached_session() as db_session:
# Intelligente Druckerzuweisung falls kein Drucker angegeben
if not printer_id:
logger.info(f"Automatische Druckerzuweisung wird verwendet für Job '{title}'")
printer_id = get_smart_printer_assignment(
start_date=start_date,
end_date=end_date,
priority=priority,
db_session=db_session
)
if not printer_id:
return jsonify({
"error": "Keine verfügbaren Drucker für den gewünschten Zeitraum gefunden. "
"Bitte wählen Sie einen spezifischen Drucker oder einen anderen Zeitraum."
}), 409
# Drucker prüfen/validieren
printer = db_session.query(Printer).filter_by(id=printer_id).first()
if not printer:
return jsonify({"error": "Drucker nicht gefunden"}), 404
if not printer.active:
return jsonify({"error": f"Drucker '{printer.name}' ist nicht aktiv"}), 400
# Nochmals Verfügbarkeit prüfen bei automatischer Zuweisung
if not data.get('printerId'): # War automatische Zuweisung
conflicting_jobs = db_session.query(Job).filter(
Job.printer_id == printer_id,
Job.status.in_(["scheduled", "running"]),
or_(
and_(Job.start_at >= start_date, Job.start_at < end_date),
and_(Job.end_at > start_date, Job.end_at <= end_date),
and_(Job.start_at <= start_date, Job.end_at >= end_date)
)
).first()
if conflicting_jobs:
return jsonify({
"error": f"Automatisch zugewiesener Drucker '{printer.name}' ist nicht mehr verfügbar. "
"Bitte wählen Sie einen anderen Zeitraum oder spezifischen Drucker."
}), 409
# Neuen Job erstellen
job = Job(
name=title,
description=data.get('description', ''),
user_id=current_user.id,
printer_id=printer_id,
start_at=start_date,
end_at=end_date,
status="scheduled",
duration_minutes=duration_minutes,
owner_id=current_user.id
)
db_session.add(job)
db_session.commit()
assignment_type = "automatisch" if not data.get('printerId') else "manuell"
logger.info(f"Neuer Kalendereintrag erstellt: ID {job.id}, Name: {title}, "
f"Drucker: {printer.name} ({assignment_type} zugewiesen)")
return jsonify({
"success": True,
"id": job.id,
"title": job.name,
"start": job.start_at.isoformat(),
"end": job.end_at.isoformat(),
"status": job.status,
"printer": {
"id": printer.id,
"name": printer.name,
"location": printer.location,
"assignment_type": assignment_type
},
"message": f"Auftrag erfolgreich erstellt und {assignment_type} dem Drucker '{printer.name}' zugewiesen."
})
except Exception as e:
logger.error(f"Fehler beim Erstellen des Kalendereintrags: {str(e)}")
return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500
@calendar_blueprint.route('/api/calendar/event/<int:event_id>', methods=['PUT'])
@login_required
def api_update_calendar_event(event_id):
"""Kalendereintrag (Job) aktualisieren."""
# Nur Admins und Benutzer mit can_approve_jobs dürfen Einträge bearbeiten
if not can_edit_events(current_user):
return jsonify({"error": "Keine Berechtigung zum Bearbeiten von Kalendereinträgen"}), 403
try:
data = request.get_json()
if not data:
return jsonify({"error": "Keine Daten erhalten"}), 400
with get_cached_session() as db_session:
job = db_session.query(Job).filter_by(id=event_id).first()
if not job:
return jsonify({"error": "Kalendereintrag nicht gefunden"}), 404
# Felder aktualisieren, die im Request enthalten sind
if 'title' in data:
job.name = data['title']
if 'description' in data:
job.description = data['description']
if 'start' in data and 'end' in data:
try:
start_date = datetime.fromisoformat(data['start'])
end_date = datetime.fromisoformat(data['end'])
job.start_at = start_date
job.end_at = end_date
job.duration_minutes = int((end_date - start_date).total_seconds() / 60)
except ValueError:
return jsonify({"error": "Ungültiges Datumsformat"}), 400
if 'printerId' in data:
printer = db_session.query(Printer).filter_by(id=data['printerId']).first()
if not printer:
return jsonify({"error": "Drucker nicht gefunden"}), 404
job.printer_id = data['printerId']
if 'status' in data:
# Status nur ändern, wenn er gültig ist
valid_statuses = ["scheduled", "running", "finished", "cancelled"]
if data['status'] in valid_statuses:
job.status = data['status']
db_session.commit()
logger.info(f"Kalendereintrag aktualisiert: ID {job.id}")
return jsonify({
"success": True,
"id": job.id,
"title": job.name,
"start": job.start_at.isoformat(),
"end": job.end_at.isoformat(),
"status": job.status
})
except Exception as e:
logger.error(f"Fehler beim Aktualisieren des Kalendereintrags: {str(e)}")
return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500
@calendar_blueprint.route('/api/calendar/event/<int:event_id>', methods=['DELETE'])
@login_required
def api_delete_calendar_event(event_id):
"""Kalendereintrag (Job) löschen."""
# Nur Admins und Benutzer mit can_approve_jobs dürfen Einträge löschen
if not can_edit_events(current_user):
return jsonify({"error": "Keine Berechtigung zum Löschen von Kalendereinträgen"}), 403
try:
with get_cached_session() as db_session:
job = db_session.query(Job).filter_by(id=event_id).first()
if not job:
return jsonify({"error": "Kalendereintrag nicht gefunden"}), 404
db_session.delete(job)
db_session.commit()
logger.info(f"Kalendereintrag gelöscht: ID {event_id}")
return jsonify({"success": True})
except Exception as e:
logger.error(f"Fehler beim Löschen des Kalendereintrags: {str(e)}")
return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500
@calendar_blueprint.route('/api/calendar/smart-recommendation', methods=['POST'])
@login_required
def api_get_smart_recommendation():
"""Intelligente Druckerempfehlung für gegebenes Zeitfenster abrufen."""
try:
data = request.get_json()
if not data:
return jsonify({"error": "Keine Daten erhalten"}), 400
start = data.get('start')
end = data.get('end')
priority = data.get('priority', 'normal')
if not all([start, end]):
return jsonify({"error": "Start und Ende sind erforderlich"}), 400
# Datumsfelder konvertieren
try:
start_date = datetime.fromisoformat(start)
end_date = datetime.fromisoformat(end)
except ValueError:
return jsonify({"error": "Ungültiges Datumsformat"}), 400
with get_cached_session() as db_session:
# Empfohlenen Drucker ermitteln
recommended_printer_id = get_smart_printer_assignment(
start_date=start_date,
end_date=end_date,
priority=priority,
db_session=db_session
)
if not recommended_printer_id:
return jsonify({
"success": False,
"message": "Keine verfügbaren Drucker für den gewünschten Zeitraum gefunden."
})
# Drucker-Details abrufen
printer = db_session.query(Printer).filter_by(id=recommended_printer_id).first()
if not printer:
return jsonify({
"success": False,
"message": "Empfohlener Drucker nicht mehr verfügbar."
})
# Zusätzliche Statistiken für die Empfehlung berechnen
last_24h = datetime.now() - timedelta(hours=24)
recent_jobs_count = db_session.query(Job).filter(
Job.printer_id == printer.id,
Job.start_at >= last_24h,
Job.status.in_(["scheduled", "running", "finished"])
).count()
# Verfügbarkeit als Prozentsatz berechnen
total_time_slots = 24 # Stunden pro Tag
availability_percent = max(0, 100 - (recent_jobs_count * 4)) # Grobe Schätzung
# Auslastung berechnen
utilization_percent = min(100, recent_jobs_count * 8) # Grobe Schätzung
# Eignung basierend auf Priorität und Zeitfenster bestimmen
suitability = "Gut"
if priority == "urgent" and ("express" in printer.name.lower() or "schnell" in printer.name.lower()):
suitability = "Perfekt"
elif priority == "high" and recent_jobs_count <= 2:
suitability = "Ausgezeichnet"
elif recent_jobs_count == 0:
suitability = "Optimal"
# Begründung generieren
reason = f"Optimale Verfügbarkeit und geringe Auslastung im gewählten Zeitraum"
job_duration_hours = (end_date - start_date).total_seconds() / 3600
start_hour = start_date.hour
if priority == "urgent":
reason = "Schnellster verfügbarer Drucker für dringende Aufträge"
elif start_hour >= 18 or start_hour <= 6:
reason = "Speziell für Nachtschichten optimiert"
elif job_duration_hours > 8:
reason = "Zuverlässig für lange Druckaufträge"
elif job_duration_hours <= 2:
reason = "Optimal für schnelle Druckaufträge"
return jsonify({
"success": True,
"recommendation": {
"printer_id": printer.id,
"printer_name": f"{printer.name}",
"location": printer.location or "Haupthalle",
"reason": reason,
"availability": f"{availability_percent}%",
"utilization": f"{utilization_percent}%",
"suitability": suitability,
"recent_jobs": recent_jobs_count,
"priority_optimized": priority in ["urgent", "high"] and suitability in ["Perfekt", "Ausgezeichnet"]
}
})
except Exception as e:
logger.error(f"Fehler beim Abrufen der intelligenten Empfehlung: {str(e)}")
return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500
@calendar_blueprint.route('/api/calendar/export', methods=['GET'])
@login_required
def api_export_calendar():
"""
Exportiert Kalenderdaten in verschiedenen Formaten (CSV, JSON, Excel).
URL-Parameter:
- format: csv, json, excel (Standard: csv)
- start_date: ISO-Format Start-Datum (Optional)
- end_date: ISO-Format End-Datum (Optional)
- printer_id: Spezifischer Drucker-Filter (Optional)
- status: Status-Filter (Optional)
"""
try:
# Parameter aus Request extrahieren
export_format = request.args.get('format', 'csv').lower()
start_str = request.args.get('start_date')
end_str = request.args.get('end_date')
printer_id = request.args.get('printer_id')
status_filter = request.args.get('status')
# Datumsbereich bestimmen (Standard: nächste 4 Wochen)
if start_str and end_str:
try:
start_date = datetime.fromisoformat(start_str)
end_date = datetime.fromisoformat(end_str)
except ValueError:
return jsonify({"error": "Ungültiges Datumsformat. Verwenden Sie ISO-Format."}), 400
else:
# Standard: Kommende 4 Wochen
start_date = datetime.now()
end_date = start_date + timedelta(days=28)
with get_cached_session() as db_session:
# Basis-Query für Jobs im Zeitraum
query = db_session.query(Job).filter(
or_(
# Jobs, die im Zeitraum beginnen
and_(Job.start_at >= start_date, Job.start_at <= end_date),
# Jobs, die im Zeitraum enden
and_(Job.end_at >= start_date, Job.end_at <= end_date),
# Jobs, die den Zeitraum komplett umfassen
and_(Job.start_at <= start_date, Job.end_at >= end_date)
)
)
# Filter anwenden
if printer_id:
try:
printer_id_int = int(printer_id)
query = query.filter(Job.printer_id == printer_id_int)
except ValueError:
return jsonify({"error": "Ungültige Drucker-ID"}), 400
if status_filter:
query = query.filter(Job.status == status_filter)
# Jobs abrufen und sortieren
jobs = query.order_by(Job.start_at.asc()).all()
if not jobs:
return jsonify({"message": "Keine Daten im angegebenen Zeitraum gefunden"}), 404
# Zusätzliche Daten für Export sammeln
export_data = []
for job in jobs:
# Benutzerinformationen
user = db_session.query(User).filter_by(id=job.user_id).first()
user_name = user.name if user else "Unbekannt"
user_email = user.email if user else ""
# Druckerinformationen
printer = db_session.query(Printer).filter_by(id=job.printer_id).first()
printer_name = printer.name if printer else "Unbekannt"
printer_location = printer.location if printer else ""
printer_model = printer.model if printer else ""
# Status-Übersetzung
status_mapping = {
"scheduled": "Geplant",
"running": "Läuft",
"finished": "Abgeschlossen",
"cancelled": "Abgebrochen",
"failed": "Fehlgeschlagen",
"pending": "Wartend"
}
status_german = status_mapping.get(job.status, job.status)
# Priorität-Übersetzung
priority_mapping = {
"urgent": "Dringend",
"high": "Hoch",
"normal": "Standard",
"low": "Niedrig"
}
priority_german = priority_mapping.get(job.priority if hasattr(job, 'priority') and job.priority else 'normal', 'Standard')
export_entry = {
"Job_ID": job.id,
"Auftragsname": job.name,
"Beschreibung": job.description or "",
"Status": status_german,
"Priorität": priority_german,
"Benutzer": user_name,
"Benutzer_E-Mail": user_email,
"Drucker": printer_name,
"Drucker_Standort": printer_location,
"Drucker_Modell": printer_model,
"Startzeit": job.start_at.strftime('%d.%m.%Y %H:%M:%S') if job.start_at else "",
"Endzeit": job.end_at.strftime('%d.%m.%Y %H:%M:%S') if job.end_at else "",
"Dauer_Minuten": job.duration_minutes or 0,
"Dauer_Stunden": round((job.duration_minutes or 0) / 60, 2),
"Erstellt_am": job.created_at.strftime('%d.%m.%Y %H:%M:%S') if job.created_at else "",
"Abgeschlossen_am": job.completed_at.strftime('%d.%m.%Y %H:%M:%S') if job.completed_at else "",
"Dateiname": job.filename if hasattr(job, 'filename') and job.filename else "",
"Materialverbrauch": getattr(job, 'material_used', "") or "",
"Kosten_EUR": getattr(job, 'cost', "") or ""
}
export_data.append(export_entry)
# Format-spezifischer Export
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
if export_format == 'csv':
import csv
import io
# CSV-Export implementierung
output = io.StringIO()
writer = csv.writer(output, delimiter=';', quoting=csv.QUOTE_MINIMAL)
# Header-Zeile
if export_data:
headers = list(export_data[0].keys())
writer.writerow(headers)
# Daten-Zeilen
for row in export_data:
formatted_row = []
for value in row.values():
if value is None:
formatted_row.append('')
else:
formatted_row.append(str(value))
writer.writerow(formatted_row)
# Response erstellen
csv_content = output.getvalue()
output.close()
# BOM für UTF-8 hinzufügen (Excel-Kompatibilität)
csv_content = '\ufeff' + csv_content
response = make_response(csv_content)
response.headers['Content-Type'] = 'text/csv; charset=utf-8'
response.headers['Content-Disposition'] = f'attachment; filename="schichtplan_export_{timestamp}.csv"'
logger.info(f"📊 CSV-Export erstellt: {len(export_data)} Einträge für Benutzer {current_user.username}")
return response
elif export_format == 'json':
# JSON-Export implementierung
export_metadata = {
"export_info": {
"erstellt_am": datetime.now().strftime('%d.%m.%Y %H:%M:%S'),
"exportiert_von": current_user.username,
"zeitraum_von": start_date.strftime('%d.%m.%Y'),
"zeitraum_bis": end_date.strftime('%d.%m.%Y'),
"anzahl_jobs": len(export_data),
"filter_drucker_id": printer_id,
"filter_status": status_filter
},
"produktionsplan": export_data
}
json_content = json.dumps(export_metadata, indent=2, ensure_ascii=False)
response = make_response(json_content)
response.headers['Content-Type'] = 'application/json; charset=utf-8'
response.headers['Content-Disposition'] = f'attachment; filename="schichtplan_export_{timestamp}.json"'
logger.info(f"📊 JSON-Export erstellt: {len(export_data)} Einträge für Benutzer {current_user.username}")
return response
elif export_format == 'excel':
# Excel-Export implementierung
try:
import pandas as pd
import io
# DataFrame erstellen
df = pd.DataFrame(export_data)
# Excel-Datei in Memory erstellen
output = io.BytesIO()
with pd.ExcelWriter(output, engine='openpyxl') as writer:
# Hauptdaten-Sheet
df.to_excel(writer, sheet_name='Produktionsplan', index=False)
# Zusammenfassung-Sheet
summary_data = {
'Metrik': [
'Gesamte Jobs',
'Geplante Jobs',
'Laufende Jobs',
'Abgeschlossene Jobs',
'Abgebrochene Jobs',
'Gesamte Produktionszeit (Stunden)',
'Durchschnittliche Job-Dauer (Stunden)',
'Anzahl verschiedener Drucker',
'Anzahl verschiedener Benutzer',
'Export erstellt am',
'Zeitraum von',
'Zeitraum bis'
],
'Wert': [
len(export_data),
len([j for j in export_data if j['Status'] == 'Geplant']),
len([j for j in export_data if j['Status'] == 'Läuft']),
len([j for j in export_data if j['Status'] == 'Abgeschlossen']),
len([j for j in export_data if j['Status'] in ['Abgebrochen', 'Fehlgeschlagen']]),
round(sum([j['Dauer_Stunden'] for j in export_data]), 2),
round(sum([j['Dauer_Stunden'] for j in export_data]) / len(export_data), 2) if export_data else 0,
len(set([j['Drucker'] for j in export_data])),
len(set([j['Benutzer'] for j in export_data])),
datetime.now().strftime('%d.%m.%Y %H:%M:%S'),
start_date.strftime('%d.%m.%Y'),
end_date.strftime('%d.%m.%Y')
]
}
summary_df = pd.DataFrame(summary_data)
summary_df.to_excel(writer, sheet_name='Zusammenfassung', index=False)
# Auto-fit Spaltenbreite
for sheet_name in writer.sheets:
worksheet = writer.sheets[sheet_name]
for column in worksheet.columns:
max_length = 0
column_letter = column[0].column_letter
for cell in column:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
except:
pass
adjusted_width = min(max_length + 2, 50)
worksheet.column_dimensions[column_letter].width = adjusted_width
output.seek(0)
response = make_response(output.getvalue())
response.headers['Content-Type'] = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
response.headers['Content-Disposition'] = f'attachment; filename="schichtplan_export_{timestamp}.xlsx"'
logger.info(f"📊 Excel-Export erstellt: {len(export_data)} Einträge für Benutzer {current_user.username}")
return response
except ImportError:
return jsonify({
"error": "Excel-Export nicht verfügbar. Pandas/openpyxl-Bibliotheken fehlen. "
"Verwenden Sie CSV oder JSON Export."
}), 501
except Exception as e:
logger.error(f"Fehler beim Excel-Export: {str(e)}")
return jsonify({"error": f"Fehler beim Excel-Export: {str(e)}"}), 500
else:
return jsonify({"error": f"Unbekanntes Export-Format: {export_format}. Unterstützt: csv, json, excel"}), 400
except Exception as e:
logger.error(f"Fehler beim Kalender-Export: {str(e)}")
return jsonify({"error": f"Fehler beim Export: {str(e)}"}), 500