import json from datetime import datetime, timedelta from flask import Blueprint, render_template, request, jsonify, redirect, url_for, abort, make_response from flask_login import current_user, login_required from sqlalchemy import and_, or_, func from models import Job, Printer, User, UserPermission, get_cached_session from utils.logging_config import get_logger from utils.conflict_manager import conflict_manager, ConflictType, ConflictSeverity calendar_blueprint = Blueprint('calendar', __name__) logger = get_logger("calendar") def can_edit_events(user): """Prüft, ob ein Benutzer Kalendereinträge bearbeiten darf.""" if user.is_admin: return True with get_cached_session() as db_session: permission = db_session.query(UserPermission).filter_by(user_id=user.id).first() if not permission: return False return permission.can_approve_jobs def get_smart_printer_assignment(start_date, end_date, priority="normal", db_session=None): """ Intelligente Druckerzuweisung basierend auf verschiedenen Faktoren. Args: start_date: Startzeit des Jobs end_date: Endzeit des Jobs priority: Prioritätsstufe ('urgent', 'high', 'normal', 'low') db_session: Datenbankverbindung Returns: printer_id: ID des empfohlenen Druckers oder None """ if not db_session: return None try: # Verfügbare Drucker ermitteln available_printers = db_session.query(Printer).filter( Printer.active == True ).all() if not available_printers: logger.warning("Keine aktiven Drucker für automatische Zuweisung gefunden") return None printer_scores = [] for printer in available_printers: score = 0 # 1. Verfügbarkeit prüfen - Jobs im gleichen Zeitraum conflicting_jobs = db_session.query(Job).filter( Job.printer_id == printer.id, Job.status.in_(["scheduled", "running"]), or_( and_(Job.start_at >= start_date, Job.start_at < end_date), and_(Job.end_at > start_date, Job.end_at <= end_date), and_(Job.start_at <= start_date, Job.end_at >= end_date) ) ).count() if conflicting_jobs > 0: continue # Drucker ist nicht verfügbar score += 100 # Grundpunkte für Verfügbarkeit # 2. Auslastung in den letzten 24 Stunden bewerten last_24h = datetime.now() - timedelta(hours=24) recent_jobs = db_session.query(Job).filter( Job.printer_id == printer.id, Job.start_at >= last_24h, Job.status.in_(["scheduled", "running", "finished"]) ).count() # Weniger Auslastung = höhere Punktzahl score += max(0, 50 - (recent_jobs * 10)) # 3. Prioritätsbasierte Zuweisung if priority == "urgent": # Für dringende Jobs: Express-Drucker bevorzugen if "express" in printer.name.lower() or "schnell" in printer.name.lower(): score += 30 elif priority == "high": # Für hohe Priorität: Weniger belastete Drucker if recent_jobs <= 2: score += 20 # 4. Zeitfenster-basierte Zuweisung start_hour = start_date.hour if start_hour >= 18 or start_hour <= 6: # Nachtschicht if "nacht" in printer.name.lower() or printer.location and "c" in printer.location.lower(): score += 25 elif start_hour >= 8 and start_hour <= 17: # Tagschicht if "tag" in printer.name.lower() or printer.location and "a" in printer.location.lower(): score += 15 # 5. Standort-basierte Bewertung (Round-Robin ähnlich) if printer.location: location_penalty = hash(printer.location) % 10 # Verteilung basierend auf Standort score += location_penalty # 6. Druckerdauer-Eignung job_duration_hours = (end_date - start_date).total_seconds() / 3600 if job_duration_hours > 8: # Lange Jobs if "langzeit" in printer.name.lower() or "marathon" in printer.name.lower(): score += 20 elif job_duration_hours <= 2: # Kurze Jobs if "express" in printer.name.lower() or "schnell" in printer.name.lower(): score += 15 # 7. Wartungszyklen berücksichtigen # Neuere Drucker (falls last_maintenance_date verfügbar) bevorzugen # TODO: Implementierung abhängig von Printer-Model-Erweiterungen printer_scores.append({ 'printer': printer, 'score': score, 'conflicts': conflicting_jobs, 'recent_load': recent_jobs }) # Nach Punktzahl sortieren und besten Drucker auswählen if not printer_scores: logger.warning("Keine verfügbaren Drucker für den gewünschten Zeitraum gefunden") return None printer_scores.sort(key=lambda x: x['score'], reverse=True) best_printer = printer_scores[0] logger.info(f"Automatische Druckerzuweisung: {best_printer['printer'].name} " f"(Score: {best_printer['score']}, Load: {best_printer['recent_load']})") return best_printer['printer'].id except Exception as e: logger.error(f"Fehler bei automatischer Druckerzuweisung: {str(e)}") return None @calendar_blueprint.route('/calendar', methods=['GET']) @login_required def calendar_view(): """Kalender-Ansicht anzeigen.""" can_edit = can_edit_events(current_user) with get_cached_session() as db_session: printers = db_session.query(Printer).filter_by(active=True).all() return render_template('calendar.html', printers=printers, can_edit=can_edit) @calendar_blueprint.route('/api/calendar', methods=['GET']) @login_required def api_get_calendar_events(): """Kalendereinträge als JSON für FullCalendar zurückgeben.""" try: # Datumsbereich aus Anfrage - FullCalendar verwendet 'start' und 'end' start_str = request.args.get('start') or request.args.get('from') end_str = request.args.get('end') or request.args.get('to') if not start_str or not end_str: # Standardmäßig eine Woche anzeigen start_date = datetime.now() end_date = start_date + timedelta(days=7) else: try: # FullCalendar sendet ISO-Format mit Zeitzone, das muss geparst werden if start_str and start_str.endswith('+02:00'): start_str = start_str[:-6] # Zeitzone entfernen if end_str and end_str.endswith('+02:00'): end_str = end_str[:-6] # Zeitzone entfernen start_date = datetime.fromisoformat(start_str) end_date = datetime.fromisoformat(end_str) except ValueError: return jsonify({"error": "Ungültiges Datumsformat"}), 400 # Optional: Filter nach Druckern printer_id = request.args.get('printer_id') with get_cached_session() as db_session: # Jobs im angegebenen Zeitraum abfragen query = db_session.query(Job).filter( or_( # Jobs, die im Zeitraum beginnen and_(Job.start_at >= start_date, Job.start_at <= end_date), # Jobs, die im Zeitraum enden and_(Job.end_at >= start_date, Job.end_at <= end_date), # Jobs, die den Zeitraum komplett umfassen and_(Job.start_at <= start_date, Job.end_at >= end_date) ) ) if printer_id: query = query.filter(Job.printer_id == printer_id) jobs = query.all() # Jobs in FullCalendar-Event-Format umwandeln events = [] for job in jobs: # Farbe basierend auf Status bestimmen color = "#6B7280" # Grau für pending if job.status == "running": color = "#3B82F6" # Blau für running elif job.status == "finished": color = "#10B981" # Grün für finished elif job.status == "scheduled": color = "#10B981" # Grün für approved elif job.status == "cancelled" or job.status == "failed": color = "#EF4444" # Rot für abgebrochen/fehlgeschlagen # Benutzerinformationen laden user = db_session.query(User).filter_by(id=job.user_id).first() user_name = user.name if user else "Unbekannt" # Druckerinformationen laden printer = db_session.query(Printer).filter_by(id=job.printer_id).first() printer_name = printer.name if printer else "Unbekannt" event = { "id": job.id, "title": job.name, "start": job.start_at.isoformat(), "end": job.end_at.isoformat(), "color": color, "extendedProps": { "status": job.status, "description": job.description, "userName": user_name, "printerId": job.printer_id, "printerName": printer_name } } events.append(event) logger.info(f"📅 Kalender-Events abgerufen: {len(events)} Einträge für Zeitraum {start_date} bis {end_date}") return jsonify(events) except Exception as e: logger.error(f"Fehler beim Abrufen der Kalendereinträge: {str(e)}") return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500 # Zusätzliche Route für FullCalendar-Kompatibilität @calendar_blueprint.route('/api/calendar/events', methods=['GET']) @login_required def api_get_calendar_events_alt(): """Alternative Route für FullCalendar-Events - delegiert an api_get_calendar_events.""" return api_get_calendar_events() @calendar_blueprint.route('/api/calendar/event', methods=['POST']) @login_required def api_create_calendar_event(): """Neuen Kalendereintrag (Job) erstellen.""" # Nur Admins und Benutzer mit can_approve_jobs dürfen Einträge erstellen if not can_edit_events(current_user): return jsonify({"error": "Keine Berechtigung zum Erstellen von Kalendereinträgen"}), 403 try: data = request.get_json() if not data: return jsonify({"error": "Keine Daten erhalten"}), 400 # Pflichtfelder prüfen title = data.get('title') start = data.get('start') end = data.get('end') printer_id = data.get('printerId') # Jetzt optional priority = data.get('priority', 'normal') if not all([title, start, end]): return jsonify({"error": "Titel, Start und Ende sind erforderlich"}), 400 # Datumsfelder konvertieren try: start_date = datetime.fromisoformat(start) end_date = datetime.fromisoformat(end) except ValueError: return jsonify({"error": "Ungültiges Datumsformat"}), 400 # Dauer in Minuten berechnen duration_minutes = int((end_date - start_date).total_seconds() / 60) with get_cached_session() as db_session: # Intelligente Druckerzuweisung falls kein Drucker angegeben if not printer_id: logger.info(f"Automatische Druckerzuweisung wird verwendet für Job '{title}'") printer_id = get_smart_printer_assignment( start_date=start_date, end_date=end_date, priority=priority, db_session=db_session ) if not printer_id: return jsonify({ "error": "Keine verfügbaren Drucker für den gewünschten Zeitraum gefunden. " "Bitte wählen Sie einen spezifischen Drucker oder einen anderen Zeitraum." }), 409 # Drucker prüfen/validieren printer = db_session.query(Printer).filter_by(id=printer_id).first() if not printer: return jsonify({"error": "Drucker nicht gefunden"}), 404 if not printer.active: return jsonify({"error": f"Drucker '{printer.name}' ist nicht aktiv"}), 400 # Nochmals Verfügbarkeit prüfen bei automatischer Zuweisung if not data.get('printerId'): # War automatische Zuweisung conflicting_jobs = db_session.query(Job).filter( Job.printer_id == printer_id, Job.status.in_(["scheduled", "running"]), or_( and_(Job.start_at >= start_date, Job.start_at < end_date), and_(Job.end_at > start_date, Job.end_at <= end_date), and_(Job.start_at <= start_date, Job.end_at >= end_date) ) ).first() if conflicting_jobs: return jsonify({ "error": f"Automatisch zugewiesener Drucker '{printer.name}' ist nicht mehr verfügbar. " "Bitte wählen Sie einen anderen Zeitraum oder spezifischen Drucker." }), 409 # Neuen Job erstellen job = Job( name=title, description=data.get('description', ''), user_id=current_user.id, printer_id=printer_id, start_at=start_date, end_at=end_date, status="scheduled", duration_minutes=duration_minutes, owner_id=current_user.id ) db_session.add(job) db_session.commit() assignment_type = "automatisch" if not data.get('printerId') else "manuell" logger.info(f"Neuer Kalendereintrag erstellt: ID {job.id}, Name: {title}, " f"Drucker: {printer.name} ({assignment_type} zugewiesen)") return jsonify({ "success": True, "id": job.id, "title": job.name, "start": job.start_at.isoformat(), "end": job.end_at.isoformat(), "status": job.status, "printer": { "id": printer.id, "name": printer.name, "location": printer.location, "assignment_type": assignment_type }, "message": f"Auftrag erfolgreich erstellt und {assignment_type} dem Drucker '{printer.name}' zugewiesen." }) except Exception as e: logger.error(f"Fehler beim Erstellen des Kalendereintrags: {str(e)}") return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500 @calendar_blueprint.route('/api/calendar/event/', methods=['PUT']) @login_required def api_update_calendar_event(event_id): """Kalendereintrag (Job) aktualisieren.""" # Nur Admins und Benutzer mit can_approve_jobs dürfen Einträge bearbeiten if not can_edit_events(current_user): return jsonify({"error": "Keine Berechtigung zum Bearbeiten von Kalendereinträgen"}), 403 try: data = request.get_json() if not data: return jsonify({"error": "Keine Daten erhalten"}), 400 with get_cached_session() as db_session: job = db_session.query(Job).filter_by(id=event_id).first() if not job: return jsonify({"error": "Kalendereintrag nicht gefunden"}), 404 # Felder aktualisieren, die im Request enthalten sind if 'title' in data: job.name = data['title'] if 'description' in data: job.description = data['description'] if 'start' in data and 'end' in data: try: start_date = datetime.fromisoformat(data['start']) end_date = datetime.fromisoformat(data['end']) job.start_at = start_date job.end_at = end_date job.duration_minutes = int((end_date - start_date).total_seconds() / 60) except ValueError: return jsonify({"error": "Ungültiges Datumsformat"}), 400 if 'printerId' in data: printer = db_session.query(Printer).filter_by(id=data['printerId']).first() if not printer: return jsonify({"error": "Drucker nicht gefunden"}), 404 job.printer_id = data['printerId'] if 'status' in data: # Status nur ändern, wenn er gültig ist valid_statuses = ["scheduled", "running", "finished", "cancelled"] if data['status'] in valid_statuses: job.status = data['status'] db_session.commit() logger.info(f"Kalendereintrag aktualisiert: ID {job.id}") return jsonify({ "success": True, "id": job.id, "title": job.name, "start": job.start_at.isoformat(), "end": job.end_at.isoformat(), "status": job.status }) except Exception as e: logger.error(f"Fehler beim Aktualisieren des Kalendereintrags: {str(e)}") return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500 @calendar_blueprint.route('/api/calendar/event/', methods=['DELETE']) @login_required def api_delete_calendar_event(event_id): """Kalendereintrag (Job) löschen.""" # Nur Admins und Benutzer mit can_approve_jobs dürfen Einträge löschen if not can_edit_events(current_user): return jsonify({"error": "Keine Berechtigung zum Löschen von Kalendereinträgen"}), 403 try: with get_cached_session() as db_session: job = db_session.query(Job).filter_by(id=event_id).first() if not job: return jsonify({"error": "Kalendereintrag nicht gefunden"}), 404 db_session.delete(job) db_session.commit() logger.info(f"Kalendereintrag gelöscht: ID {event_id}") return jsonify({"success": True}) except Exception as e: logger.error(f"Fehler beim Löschen des Kalendereintrags: {str(e)}") return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500 @calendar_blueprint.route('/api/calendar/smart-recommendation', methods=['POST']) @login_required def api_get_smart_recommendation(): """Intelligente Druckerempfehlung für gegebenes Zeitfenster abrufen.""" try: data = request.get_json() if not data: return jsonify({"error": "Keine Daten erhalten"}), 400 start = data.get('start') end = data.get('end') priority = data.get('priority', 'normal') if not all([start, end]): return jsonify({"error": "Start und Ende sind erforderlich"}), 400 # Datumsfelder konvertieren try: start_date = datetime.fromisoformat(start) end_date = datetime.fromisoformat(end) except ValueError: return jsonify({"error": "Ungültiges Datumsformat"}), 400 with get_cached_session() as db_session: # Empfohlenen Drucker ermitteln recommended_printer_id = get_smart_printer_assignment( start_date=start_date, end_date=end_date, priority=priority, db_session=db_session ) if not recommended_printer_id: return jsonify({ "success": False, "message": "Keine verfügbaren Drucker für den gewünschten Zeitraum gefunden." }) # Drucker-Details abrufen printer = db_session.query(Printer).filter_by(id=recommended_printer_id).first() if not printer: return jsonify({ "success": False, "message": "Empfohlener Drucker nicht mehr verfügbar." }) # Zusätzliche Statistiken für die Empfehlung berechnen last_24h = datetime.now() - timedelta(hours=24) recent_jobs_count = db_session.query(Job).filter( Job.printer_id == printer.id, Job.start_at >= last_24h, Job.status.in_(["scheduled", "running", "finished"]) ).count() # Verfügbarkeit als Prozentsatz berechnen total_time_slots = 24 # Stunden pro Tag availability_percent = max(0, 100 - (recent_jobs_count * 4)) # Grobe Schätzung # Auslastung berechnen utilization_percent = min(100, recent_jobs_count * 8) # Grobe Schätzung # Eignung basierend auf Priorität und Zeitfenster bestimmen suitability = "Gut" if priority == "urgent" and ("express" in printer.name.lower() or "schnell" in printer.name.lower()): suitability = "Perfekt" elif priority == "high" and recent_jobs_count <= 2: suitability = "Ausgezeichnet" elif recent_jobs_count == 0: suitability = "Optimal" # Begründung generieren reason = f"Optimale Verfügbarkeit und geringe Auslastung im gewählten Zeitraum" job_duration_hours = (end_date - start_date).total_seconds() / 3600 start_hour = start_date.hour if priority == "urgent": reason = "Schnellster verfügbarer Drucker für dringende Aufträge" elif start_hour >= 18 or start_hour <= 6: reason = "Speziell für Nachtschichten optimiert" elif job_duration_hours > 8: reason = "Zuverlässig für lange Druckaufträge" elif job_duration_hours <= 2: reason = "Optimal für schnelle Druckaufträge" return jsonify({ "success": True, "recommendation": { "printer_id": printer.id, "printer_name": f"{printer.name}", "location": printer.location or "Haupthalle", "reason": reason, "availability": f"{availability_percent}%", "utilization": f"{utilization_percent}%", "suitability": suitability, "recent_jobs": recent_jobs_count, "priority_optimized": priority in ["urgent", "high"] and suitability in ["Perfekt", "Ausgezeichnet"] } }) except Exception as e: logger.error(f"Fehler beim Abrufen der intelligenten Empfehlung: {str(e)}") return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500 @calendar_blueprint.route('/api/calendar/export', methods=['GET']) @login_required def api_export_calendar(): """ Exportiert Kalenderdaten in verschiedenen Formaten (CSV, JSON, Excel). URL-Parameter: - format: csv, json, excel (Standard: csv) - start_date: ISO-Format Start-Datum (Optional) - end_date: ISO-Format End-Datum (Optional) - printer_id: Spezifischer Drucker-Filter (Optional) - status: Status-Filter (Optional) """ try: # Parameter aus Request extrahieren export_format = request.args.get('format', 'csv').lower() start_str = request.args.get('start_date') end_str = request.args.get('end_date') printer_id = request.args.get('printer_id') status_filter = request.args.get('status') # Datumsbereich bestimmen (Standard: nächste 4 Wochen) if start_str and end_str: try: start_date = datetime.fromisoformat(start_str) end_date = datetime.fromisoformat(end_str) except ValueError: return jsonify({"error": "Ungültiges Datumsformat. Verwenden Sie ISO-Format."}), 400 else: # Standard: Kommende 4 Wochen start_date = datetime.now() end_date = start_date + timedelta(days=28) with get_cached_session() as db_session: # Basis-Query für Jobs im Zeitraum query = db_session.query(Job).filter( or_( # Jobs, die im Zeitraum beginnen and_(Job.start_at >= start_date, Job.start_at <= end_date), # Jobs, die im Zeitraum enden and_(Job.end_at >= start_date, Job.end_at <= end_date), # Jobs, die den Zeitraum komplett umfassen and_(Job.start_at <= start_date, Job.end_at >= end_date) ) ) # Filter anwenden if printer_id: try: printer_id_int = int(printer_id) query = query.filter(Job.printer_id == printer_id_int) except ValueError: return jsonify({"error": "Ungültige Drucker-ID"}), 400 if status_filter: query = query.filter(Job.status == status_filter) # Jobs abrufen und sortieren jobs = query.order_by(Job.start_at.asc()).all() if not jobs: return jsonify({"message": "Keine Daten im angegebenen Zeitraum gefunden"}), 404 # Zusätzliche Daten für Export sammeln export_data = [] for job in jobs: # Benutzerinformationen user = db_session.query(User).filter_by(id=job.user_id).first() user_name = user.name if user else "Unbekannt" user_email = user.email if user else "" # Druckerinformationen printer = db_session.query(Printer).filter_by(id=job.printer_id).first() printer_name = printer.name if printer else "Unbekannt" printer_location = printer.location if printer else "" printer_model = printer.model if printer else "" # Status-Übersetzung status_mapping = { "scheduled": "Geplant", "running": "Läuft", "finished": "Abgeschlossen", "cancelled": "Abgebrochen", "failed": "Fehlgeschlagen", "pending": "Wartend" } status_german = status_mapping.get(job.status, job.status) # Priorität-Übersetzung priority_mapping = { "urgent": "Dringend", "high": "Hoch", "normal": "Standard", "low": "Niedrig" } priority_german = priority_mapping.get(job.priority if hasattr(job, 'priority') and job.priority else 'normal', 'Standard') export_entry = { "Job_ID": job.id, "Auftragsname": job.name, "Beschreibung": job.description or "", "Status": status_german, "Priorität": priority_german, "Benutzer": user_name, "Benutzer_E-Mail": user_email, "Drucker": printer_name, "Drucker_Standort": printer_location, "Drucker_Modell": printer_model, "Startzeit": job.start_at.strftime('%d.%m.%Y %H:%M:%S') if job.start_at else "", "Endzeit": job.end_at.strftime('%d.%m.%Y %H:%M:%S') if job.end_at else "", "Dauer_Minuten": job.duration_minutes or 0, "Dauer_Stunden": round((job.duration_minutes or 0) / 60, 2), "Erstellt_am": job.created_at.strftime('%d.%m.%Y %H:%M:%S') if job.created_at else "", "Abgeschlossen_am": job.completed_at.strftime('%d.%m.%Y %H:%M:%S') if job.completed_at else "", "Dateiname": job.filename if hasattr(job, 'filename') and job.filename else "", "Materialverbrauch": getattr(job, 'material_used', "") or "", "Kosten_EUR": getattr(job, 'cost', "") or "" } export_data.append(export_entry) # Format-spezifischer Export timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') if export_format == 'csv': import csv import io # CSV-Export implementierung output = io.StringIO() writer = csv.writer(output, delimiter=';', quoting=csv.QUOTE_MINIMAL) # Header-Zeile if export_data: headers = list(export_data[0].keys()) writer.writerow(headers) # Daten-Zeilen for row in export_data: formatted_row = [] for value in row.values(): if value is None: formatted_row.append('') else: formatted_row.append(str(value)) writer.writerow(formatted_row) # Response erstellen csv_content = output.getvalue() output.close() # BOM für UTF-8 hinzufügen (Excel-Kompatibilität) csv_content = '\ufeff' + csv_content response = make_response(csv_content) response.headers['Content-Type'] = 'text/csv; charset=utf-8' response.headers['Content-Disposition'] = f'attachment; filename="schichtplan_export_{timestamp}.csv"' logger.info(f"📊 CSV-Export erstellt: {len(export_data)} Einträge für Benutzer {current_user.username}") return response elif export_format == 'json': # JSON-Export implementierung export_metadata = { "export_info": { "erstellt_am": datetime.now().strftime('%d.%m.%Y %H:%M:%S'), "exportiert_von": current_user.username, "zeitraum_von": start_date.strftime('%d.%m.%Y'), "zeitraum_bis": end_date.strftime('%d.%m.%Y'), "anzahl_jobs": len(export_data), "filter_drucker_id": printer_id, "filter_status": status_filter }, "produktionsplan": export_data } json_content = json.dumps(export_metadata, indent=2, ensure_ascii=False) response = make_response(json_content) response.headers['Content-Type'] = 'application/json; charset=utf-8' response.headers['Content-Disposition'] = f'attachment; filename="schichtplan_export_{timestamp}.json"' logger.info(f"📊 JSON-Export erstellt: {len(export_data)} Einträge für Benutzer {current_user.username}") return response elif export_format == 'excel': # Excel-Export implementierung try: import pandas as pd import io # DataFrame erstellen df = pd.DataFrame(export_data) # Excel-Datei in Memory erstellen output = io.BytesIO() with pd.ExcelWriter(output, engine='openpyxl') as writer: # Hauptdaten-Sheet df.to_excel(writer, sheet_name='Produktionsplan', index=False) # Zusammenfassung-Sheet summary_data = { 'Metrik': [ 'Gesamte Jobs', 'Geplante Jobs', 'Laufende Jobs', 'Abgeschlossene Jobs', 'Abgebrochene Jobs', 'Gesamte Produktionszeit (Stunden)', 'Durchschnittliche Job-Dauer (Stunden)', 'Anzahl verschiedener Drucker', 'Anzahl verschiedener Benutzer', 'Export erstellt am', 'Zeitraum von', 'Zeitraum bis' ], 'Wert': [ len(export_data), len([j for j in export_data if j['Status'] == 'Geplant']), len([j for j in export_data if j['Status'] == 'Läuft']), len([j for j in export_data if j['Status'] == 'Abgeschlossen']), len([j for j in export_data if j['Status'] in ['Abgebrochen', 'Fehlgeschlagen']]), round(sum([j['Dauer_Stunden'] for j in export_data]), 2), round(sum([j['Dauer_Stunden'] for j in export_data]) / len(export_data), 2) if export_data else 0, len(set([j['Drucker'] for j in export_data])), len(set([j['Benutzer'] for j in export_data])), datetime.now().strftime('%d.%m.%Y %H:%M:%S'), start_date.strftime('%d.%m.%Y'), end_date.strftime('%d.%m.%Y') ] } summary_df = pd.DataFrame(summary_data) summary_df.to_excel(writer, sheet_name='Zusammenfassung', index=False) # Auto-fit Spaltenbreite for sheet_name in writer.sheets: worksheet = writer.sheets[sheet_name] for column in worksheet.columns: max_length = 0 column_letter = column[0].column_letter for cell in column: try: if len(str(cell.value)) > max_length: max_length = len(str(cell.value)) except: pass adjusted_width = min(max_length + 2, 50) worksheet.column_dimensions[column_letter].width = adjusted_width output.seek(0) response = make_response(output.getvalue()) response.headers['Content-Type'] = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' response.headers['Content-Disposition'] = f'attachment; filename="schichtplan_export_{timestamp}.xlsx"' logger.info(f"📊 Excel-Export erstellt: {len(export_data)} Einträge für Benutzer {current_user.username}") return response except ImportError: return jsonify({ "error": "Excel-Export nicht verfügbar. Pandas/openpyxl-Bibliotheken fehlen. " "Verwenden Sie CSV oder JSON Export." }), 501 except Exception as e: logger.error(f"Fehler beim Excel-Export: {str(e)}") return jsonify({"error": f"Fehler beim Excel-Export: {str(e)}"}), 500 else: return jsonify({"error": f"Unbekanntes Export-Format: {export_format}. Unterstützt: csv, json, excel"}), 400 except Exception as e: logger.error(f"Fehler beim Kalender-Export: {str(e)}") return jsonify({"error": f"Fehler beim Export: {str(e)}"}), 500 @calendar_blueprint.route('/api/calendar/check-conflicts', methods=['POST']) @login_required def api_check_conflicts(): """ Prüft explizit auf Konflikte für eine geplante Reservierung und gibt detaillierte Konfliktinformationen zurück. """ try: data = request.get_json() if not data: return jsonify({"error": "Keine Daten erhalten"}), 400 # Pflichtfelder prüfen required_fields = ['start_time', 'end_time'] for field in required_fields: if field not in data: return jsonify({"error": f"Feld '{field}' fehlt"}), 400 # Daten extrahieren try: start_time = datetime.fromisoformat(data['start_time']) end_time = datetime.fromisoformat(data['end_time']) printer_id = data.get('printer_id') priority = data.get('priority', 'normal') duration_minutes = int((end_time - start_time).total_seconds() / 60) except (ValueError, TypeError) as e: return jsonify({"error": f"Ungültige Datenformate: {str(e)}"}), 400 with get_cached_session() as db_session: # Job-Daten für Konfliktanalyse vorbereiten job_data = { 'printer_id': printer_id, 'start_time': start_time, 'end_time': end_time, 'priority': priority, 'duration_minutes': duration_minutes } # Konfliktanalyse durchführen conflicts = conflict_manager.detect_conflicts(job_data, db_session) # Konfliktinformationen für Response aufbereiten conflict_info = [] total_severity_score = 0 for conflict in conflicts: severity_scores = { ConflictSeverity.CRITICAL: 4, ConflictSeverity.HIGH: 3, ConflictSeverity.MEDIUM: 2, ConflictSeverity.LOW: 1, ConflictSeverity.INFO: 0 } total_severity_score += severity_scores.get(conflict.severity, 0) # Konflikthafte Jobs laden für Details conflicting_job_details = [] for job_id in conflict.conflicting_job_ids: job = db_session.query(Job).filter_by(id=job_id).first() if job: conflicting_job_details.append({ 'id': job.id, 'name': job.name, 'start_time': job.start_at.isoformat() if job.start_at else None, 'end_time': job.end_at.isoformat() if job.end_at else None, 'user_name': job.user.name if job.user else "Unbekannt" }) # Drucker-Details laden printer_info = None if conflict.affected_printer_id: printer = db_session.query(Printer).filter_by(id=conflict.affected_printer_id).first() if printer: printer_info = { 'id': printer.id, 'name': printer.name, 'location': printer.location, 'status': printer.status, 'active': printer.active } conflict_detail = { 'type': conflict.conflict_type.value, 'severity': conflict.severity.value, 'description': conflict.description, 'estimated_impact': conflict.estimated_impact, 'auto_resolvable': conflict.auto_resolvable, 'conflict_start': conflict.conflict_start.isoformat() if conflict.conflict_start else None, 'conflict_end': conflict.conflict_end.isoformat() if conflict.conflict_end else None, 'affected_printer': printer_info, 'conflicting_jobs': conflicting_job_details, 'suggested_solutions': conflict.suggested_solutions } conflict_info.append(conflict_detail) # Gesamtbewertung has_conflicts = len(conflicts) > 0 can_proceed = all(c.auto_resolvable for c in conflicts) or len(conflicts) == 0 # Empfehlungen basierend auf Konflikten recommendations = [] if has_conflicts: if can_proceed: recommendations.append({ 'type': 'auto_resolve', 'message': 'Konflikte können automatisch gelöst werden', 'action': 'Automatische Lösung anwenden' }) else: recommendations.append({ 'type': 'manual_intervention', 'message': 'Manuelle Anpassung erforderlich', 'action': 'Zeitraum oder Drucker ändern' }) # Alternative Zeitfenster vorschlagen if printer_id: alternative_slots = conflict_manager._find_alternative_time_slots(job_data, db_session) if alternative_slots: slot_suggestions = [] for start, end, confidence in alternative_slots[:3]: slot_suggestions.append({ 'start_time': start.isoformat(), 'end_time': end.isoformat(), 'confidence': confidence, 'description': f"{start.strftime('%H:%M')} - {end.strftime('%H:%M')}" }) recommendations.append({ 'type': 'alternative_times', 'message': 'Alternative Zeitfenster verfügbar', 'suggestions': slot_suggestions }) # Alternative Drucker vorschlagen alternative_printers = conflict_manager._find_alternative_printers(job_data, db_session) if alternative_printers: printer_suggestions = [] for printer_id_alt, confidence in alternative_printers[:3]: printer = db_session.query(Printer).filter_by(id=printer_id_alt).first() if printer: printer_suggestions.append({ 'printer_id': printer.id, 'printer_name': printer.name, 'location': printer.location, 'confidence': confidence }) recommendations.append({ 'type': 'alternative_printers', 'message': 'Alternative Drucker verfügbar', 'suggestions': printer_suggestions }) logger.info(f"🔍 Konfliktprüfung abgeschlossen: {len(conflicts)} Konflikte, " f"Schweregrad: {total_severity_score}, Automatisch lösbar: {can_proceed}") return jsonify({ 'has_conflicts': has_conflicts, 'can_proceed': can_proceed, 'severity_score': total_severity_score, 'conflict_count': len(conflicts), 'conflicts': conflict_info, 'recommendations': recommendations, 'summary': { 'critical_conflicts': len([c for c in conflicts if c.severity == ConflictSeverity.CRITICAL]), 'high_conflicts': len([c for c in conflicts if c.severity == ConflictSeverity.HIGH]), 'medium_conflicts': len([c for c in conflicts if c.severity == ConflictSeverity.MEDIUM]), 'low_conflicts': len([c for c in conflicts if c.severity == ConflictSeverity.LOW]) } }) except Exception as e: logger.error(f"❌ Fehler bei Konfliktprüfung: {str(e)}", exc_info=True) return jsonify({"error": "Fehler bei der Konfliktanalyse"}), 500 @calendar_blueprint.route('/api/calendar/resolve-conflicts', methods=['POST']) @login_required def api_resolve_conflicts(): """ Löst erkannte Konflikte automatisch und erstellt den Job mit den optimalen Parametern. """ try: data = request.get_json() if not data: return jsonify({"error": "Keine Daten erhalten"}), 400 # Pflichtfelder prüfen required_fields = ['start_time', 'end_time', 'title'] for field in required_fields: if field not in data: return jsonify({"error": f"Feld '{field}' fehlt"}), 400 # Daten extrahieren try: start_time = datetime.fromisoformat(data['start_time']) end_time = datetime.fromisoformat(data['end_time']) title = data['title'] description = data.get('description', '') printer_id = data.get('printer_id') priority = data.get('priority', 'normal') duration_minutes = int((end_time - start_time).total_seconds() / 60) auto_resolve = data.get('auto_resolve', True) except (ValueError, TypeError) as e: return jsonify({"error": f"Ungültige Datenformate: {str(e)}"}), 400 with get_cached_session() as db_session: # Job-Daten für Konfliktanalyse vorbereiten job_data = { 'printer_id': printer_id, 'start_time': start_time, 'end_time': end_time, 'priority': priority, 'duration_minutes': duration_minutes, 'title': title, 'description': description } # Konfliktanalyse durchführen conflicts = conflict_manager.detect_conflicts(job_data, db_session) final_job_data = job_data.copy() resolution_messages = [] if conflicts and auto_resolve: # Konflikte automatisch lösen resolutions = conflict_manager.resolve_conflicts(conflicts, job_data, db_session) # Erste erfolgreiche Lösung anwenden successful_resolution = next((r for r in resolutions if r.success), None) if successful_resolution: # Job-Parameter basierend auf Lösung anpassen if successful_resolution.new_printer_id: final_job_data['printer_id'] = successful_resolution.new_printer_id if successful_resolution.new_start_time: final_job_data['start_time'] = successful_resolution.new_start_time if successful_resolution.new_end_time: final_job_data['end_time'] = successful_resolution.new_end_time resolution_messages.append(successful_resolution.message) logger.info(f"🔧 Konflikt automatisch gelöst: {successful_resolution.strategy_used.value}") else: return jsonify({ "error": "Konflikte können nicht automatisch gelöst werden", "conflicts": [c.description for c in conflicts], "requires_manual_intervention": True }), 409 elif conflicts and not auto_resolve: return jsonify({ "error": "Konflikte erkannt - automatische Lösung deaktiviert", "conflicts": [c.description for c in conflicts], "suggestions": [s for c in conflicts for s in c.suggested_solutions] }), 409 # Finalen Drucker ermitteln if not final_job_data.get('printer_id'): # Intelligente Druckerzuweisung printer_id = get_smart_printer_assignment( start_date=final_job_data['start_time'], end_date=final_job_data['end_time'], priority=priority, db_session=db_session ) if not printer_id: return jsonify({ "error": "Keine verfügbaren Drucker für den gewünschten Zeitraum gefunden" }), 409 final_job_data['printer_id'] = printer_id resolution_messages.append("Drucker automatisch zugewiesen") # Drucker validieren printer = db_session.query(Printer).filter_by(id=final_job_data['printer_id']).first() if not printer: return jsonify({"error": "Zugewiesener Drucker nicht gefunden"}), 404 if not printer.active: return jsonify({"error": f"Drucker '{printer.name}' ist nicht aktiv"}), 400 # Job erstellen job = Job( name=title, description=description, user_id=current_user.id, printer_id=final_job_data['printer_id'], start_at=final_job_data['start_time'], end_at=final_job_data['end_time'], status="scheduled", duration_minutes=duration_minutes, owner_id=current_user.id ) db_session.add(job) db_session.commit() assignment_type = "automatisch mit Konfliktlösung" if conflicts else "automatisch" logger.info(f"✅ Job mit Konfliktlösung erstellt: ID {job.id}, " f"Drucker: {printer.name} ({assignment_type})") return jsonify({ "success": True, "job": { "id": job.id, "title": job.name, "start": job.start_at.isoformat(), "end": job.end_at.isoformat(), "status": job.status }, "printer": { "id": printer.id, "name": printer.name, "location": printer.location }, "conflict_resolution": { "conflicts_detected": len(conflicts), "conflicts_resolved": len([r for r in (resolutions if conflicts else []) if r.success]), "messages": resolution_messages, "assignment_type": assignment_type } }) except Exception as e: logger.error(f"❌ Fehler bei Konfliktlösung und Job-Erstellung: {str(e)}", exc_info=True) return jsonify({"error": "Fehler bei der Verarbeitung"}), 500 @calendar_blueprint.route('/api/calendar/printer-availability', methods=['GET']) @login_required def api_printer_availability(): """ Zeigt detaillierte Verfügbarkeit aller Drucker für einen Zeitraum an. """ try: # Parameter extrahieren start_str = request.args.get('start') end_str = request.args.get('end') if not start_str or not end_str: return jsonify({"error": "Start- und Endzeit erforderlich"}), 400 try: start_time = datetime.fromisoformat(start_str) end_time = datetime.fromisoformat(end_str) except ValueError: return jsonify({"error": "Ungültiges Datumsformat"}), 400 with get_cached_session() as db_session: # Alle aktiven Drucker laden printers = db_session.query(Printer).filter_by(active=True).all() availability_info = [] for printer in printers: # Jobs im Zeitraum finden jobs_in_period = db_session.query(Job).filter( Job.printer_id == printer.id, Job.status.in_(["scheduled", "running"]), or_( and_(Job.start_at >= start_time, Job.start_at < end_time), and_(Job.end_at > start_time, Job.end_at <= end_time), and_(Job.start_at <= start_time, Job.end_at >= end_time) ) ).all() # Auslastung der letzten 24 Stunden last_24h = datetime.now() - timedelta(hours=24) recent_jobs = db_session.query(Job).filter( Job.printer_id == printer.id, Job.start_at >= last_24h, Job.status.in_(["scheduled", "running", "finished"]) ).count() # Verfügbarkeits-Status bestimmen is_available = len(jobs_in_period) == 0 if recent_jobs == 0: availability_status = "optimal" availability_icon = "🟢" elif recent_jobs <= 2: availability_status = "gut" availability_icon = "🟡" elif recent_jobs <= 5: availability_status = "mäßig" availability_icon = "🟠" else: availability_status = "hoch_belegt" availability_icon = "🔴" if not printer.active: availability_status = "offline" availability_icon = "⚫" # Nächste freie Zeitfenster finden next_free_slots = [] if not is_available: # Einfache Implementierung: Zeitfenster nach bestehenden Jobs last_job_end = max([job.end_at for job in jobs_in_period]) next_free_slots.append({ 'start': last_job_end.isoformat(), 'description': f"Frei ab {last_job_end.strftime('%H:%M')}" }) # Belegte Zeitfenster occupied_slots = [] for job in jobs_in_period: occupied_slots.append({ 'job_id': job.id, 'job_name': job.name, 'start': job.start_at.isoformat() if job.start_at else None, 'end': job.end_at.isoformat() if job.end_at else None, 'user_name': job.user.name if job.user else "Unbekannt", 'status': job.status }) printer_info = { 'printer_id': printer.id, 'printer_name': printer.name, 'location': printer.location, 'model': printer.model, 'is_available': is_available, 'availability_status': availability_status, 'availability_icon': availability_icon, 'recent_jobs_24h': recent_jobs, 'jobs_in_period': len(jobs_in_period), 'occupied_slots': occupied_slots, 'next_free_slots': next_free_slots, 'status_description': { 'optimal': 'Keine kürzlichen Jobs, sofort verfügbar', 'gut': 'Wenige kürzliche Jobs, gute Verfügbarkeit', 'mäßig': 'Moderate Auslastung', 'hoch_belegt': 'Hohe Auslastung, möglicherweise Wartezeit', 'offline': 'Drucker offline oder nicht aktiv' }.get(availability_status, availability_status) } availability_info.append(printer_info) # Nach Verfügbarkeit sortieren (beste zuerst) availability_info.sort(key=lambda x: ( not x['is_available'], # Verfügbare zuerst x['recent_jobs_24h'], # Dann nach geringster Auslastung x['printer_name'] # Dann alphabetisch )) # Zusammenfassung erstellen total_printers = len(availability_info) available_printers = len([p for p in availability_info if p['is_available']]) optimal_printers = len([p for p in availability_info if p['availability_status'] == 'optimal']) summary = { 'total_printers': total_printers, 'available_printers': available_printers, 'optimal_printers': optimal_printers, 'availability_rate': round((available_printers / total_printers * 100) if total_printers > 0 else 0, 1), 'period': { 'start': start_time.isoformat(), 'end': end_time.isoformat(), 'duration_hours': round((end_time - start_time).total_seconds() / 3600, 1) } } logger.info(f"📊 Verfügbarkeitsabfrage: {available_printers}/{total_printers} Drucker verfügbar") return jsonify({ 'summary': summary, 'printers': availability_info }) except Exception as e: logger.error(f"❌ Fehler bei Verfügbarkeitsabfrage: {str(e)}", exc_info=True) return jsonify({"error": "Fehler bei der Verfügbarkeitsanalyse"}), 500