import os import sys import logging import threading import time import subprocess import socket import json import secrets from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timedelta from functools import wraps from typing import Optional, Dict, List, Tuple, Any, Union from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, session, send_file, Response from flask_login import LoginManager, login_user, logout_user, login_required, current_user, UserMixin from werkzeug.security import check_password_hash, generate_password_hash from werkzeug.utils import secure_filename import sqlalchemy.exc import sqlalchemy from PyP100 import PyP110 from flask_wtf.csrf import CSRFProtect from config.settings import ( SECRET_KEY, TAPO_USERNAME, TAPO_PASSWORD, PRINTERS, FLASK_HOST, FLASK_PORT, FLASK_DEBUG, SESSION_LIFETIME, SCHEDULER_INTERVAL, SCHEDULER_ENABLED, get_ssl_context, FLASK_FALLBACK_PORT, SSL_ENABLED, SSL_CERT_PATH, SSL_KEY_PATH ) from utils.logging_config import setup_logging, get_logger, log_startup_info from models import User, Printer, Job, Stats, get_db_session, init_database, create_initial_admin from utils.job_scheduler import scheduler from utils.template_helpers import register_template_helpers from blueprints.auth import auth_bp from blueprints.user import user_bp # Flask-App initialisieren app = Flask(__name__) app.secret_key = SECRET_KEY app.config["PERMANENT_SESSION_LIFETIME"] = SESSION_LIFETIME app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False app.config["WTF_CSRF_ENABLED"] = True # CSRF-Schutz initialisieren csrf = CSRFProtect(app) # Login-Manager initialisieren login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = "auth.login" login_manager.login_message = "Bitte melden Sie sich an, um auf diese Seite zuzugreifen." login_manager.login_message_category = "info" @login_manager.user_loader def load_user(user_id): db_session = get_db_session() user = db_session.query(User).filter(User.id == user_id).first() db_session.close() return user # Jinja2 Context Processors @app.context_processor def inject_now(): """Inject the current datetime into templates.""" return {'now': datetime.now()} # Custom Jinja2 filter für Datumsformatierung @app.template_filter('format_datetime') def format_datetime_filter(value, format='%d.%m.%Y %H:%M'): """Format a datetime object to a German-style date and time string""" if value is None: return "" if isinstance(value, str): try: value = datetime.fromisoformat(value) except ValueError: return value return value.strftime(format) # Blueprints registrieren try: from blueprints.kiosk_control import kiosk_bp app.register_blueprint(kiosk_bp) print("Kiosk-Kontrolle erfolgreich geladen") except ImportError: print("Kiosk-Kontrolle nicht verfügbar (nur im Kiosk-Modus)") # Auth-Blueprint registrieren app.register_blueprint(auth_bp) print("Auth-Blueprint erfolgreich geladen") # User-Blueprint registrieren app.register_blueprint(user_bp) print("User-Blueprint erfolgreich geladen") # Logging initialisieren setup_logging() log_startup_info() # Logger für verschiedene Komponenten app_logger = get_logger("app") auth_logger = get_logger("auth") jobs_logger = get_logger("jobs") printers_logger = get_logger("printers") # Custom decorator für Job-Besitzer-Check def job_owner_required(f): @wraps(f) def decorated_function(job_id, *args, **kwargs): db_session = get_db_session() job = db_session.query(Job).filter(Job.id == job_id).first() if not job: db_session.close() return jsonify({"error": "Job nicht gefunden"}), 404 is_owner = job.user_id == int(current_user.id) or job.owner_id == int(current_user.id) is_admin = current_user.is_admin if not (is_owner or is_admin): db_session.close() return jsonify({"error": "Keine Berechtigung"}), 403 db_session.close() return f(job_id, *args, **kwargs) return decorated_function def check_printer_status(ip_address: str, timeout: int = 7) -> Tuple[str, bool]: """ Überprüft den Status eines Druckers über Ping mit Timeout. Args: ip_address: IP-Adresse des Druckers timeout: Timeout in Sekunden (Standard: 7) Returns: Tuple[str, bool]: (Status, Aktiv) - Status ist "online" oder "offline", Aktiv ist True/False """ if not ip_address or ip_address.strip() == "": printers_logger.debug(f"Keine IP-Adresse angegeben") return "offline", False try: # IP-Adresse validieren import ipaddress try: ipaddress.ip_address(ip_address.strip()) except ValueError: printers_logger.warning(f"Ungültige IP-Adresse: {ip_address}") return "offline", False # Windows-spezifischer Ping-Befehl mit Timeout if os.name == 'nt': # Windows cmd = ['ping', '-n', '1', '-w', str(timeout * 1000), ip_address.strip()] else: # Unix/Linux/macOS cmd = ['ping', '-c', '1', '-W', str(timeout), ip_address.strip()] printers_logger.debug(f"Ping-Befehl für {ip_address}: {' '.join(cmd)}") # Ping ausführen mit Timeout result = subprocess.run( cmd, capture_output=True, text=True, encoding='utf-8', errors='ignore', # Ignoriere Unicode-Fehler timeout=timeout + 2 # Zusätzlicher Timeout für subprocess ) # Erfolgreicher Ping (Return Code 0) if result.returncode == 0: printers_logger.debug(f"Ping erfolgreich für {ip_address}") return "online", True else: printers_logger.debug(f"Ping fehlgeschlagen für {ip_address} (Return Code: {result.returncode})") return "offline", False except subprocess.TimeoutExpired: printers_logger.warning(f"Ping-Timeout für Drucker {ip_address} nach {timeout} Sekunden") return "offline", False except Exception as e: printers_logger.error(f"Fehler beim Ping für Drucker {ip_address}: {str(e)}") return "offline", False def check_multiple_printers_status(printers: List[Dict], timeout: int = 7) -> Dict[int, Tuple[str, bool]]: """ Überprüft den Status mehrerer Drucker parallel mit Timeout. Args: printers: Liste von Drucker-Dictionaries mit 'id' und 'ip_address' timeout: Timeout in Sekunden pro Drucker (Standard: 7) Returns: Dict[int, Tuple[str, bool]]: Dictionary mit Drucker-ID als Key und (Status, Aktiv) als Value """ results = {} # Parallel-Ausführung mit ThreadPoolExecutor with ThreadPoolExecutor(max_workers=min(len(printers), 10)) as executor: # Futures für alle Drucker erstellen future_to_printer = { executor.submit(check_printer_status, printer.get('ip_address'), timeout): printer for printer in printers } # Ergebnisse sammeln for future in as_completed(future_to_printer, timeout=timeout + 2): printer = future_to_printer[future] try: status, active = future.result() results[printer['id']] = (status, active) printers_logger.info(f"Drucker {printer['name']} ({printer.get('ip_address')}): {status}") except Exception as e: printers_logger.error(f"Fehler bei Status-Check für Drucker {printer['name']}: {str(e)}") results[printer['id']] = ("offline", False) return results # UI-Routen @app.route("/") def index(): if current_user.is_authenticated: return render_template("index.html") return redirect(url_for("auth.login")) @app.route("/dashboard") @login_required def dashboard(): return render_template("dashboard.html") @app.route("/profile") @login_required def profile_redirect(): """Leitet zur neuen Profilseite im User-Blueprint weiter.""" return redirect(url_for("user.profile")) @app.route("/profil") @login_required def profil_redirect(): """Leitet zur neuen Profilseite im User-Blueprint weiter (deutsche URL).""" return redirect(url_for("user.profile")) @app.route("/settings") @login_required def settings_redirect(): """Leitet zur neuen Einstellungsseite im User-Blueprint weiter.""" return redirect(url_for("user.settings")) @app.route("/einstellungen") @login_required def einstellungen_redirect(): """Leitet zur neuen Einstellungsseite im User-Blueprint weiter (deutsche URL).""" return redirect(url_for("user.settings")) @app.route("/admin") @login_required def admin(): """Leitet zur neuen Admin-Dashboard-Route weiter.""" if not current_user.is_admin: flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error") return redirect(url_for("index")) return redirect(url_for("admin_page")) @app.route("/demo") @login_required def components_demo(): """Demo-Seite für UI-Komponenten""" return render_template("components_demo.html") @app.route("/printers") @login_required def printers_page(): """Zeigt die Übersichtsseite für Drucker an.""" return render_template("printers.html") @app.route("/jobs") @login_required def jobs_page(): """Zeigt die Übersichtsseite für Druckaufträge an.""" return render_template("jobs.html") @app.route("/stats") @login_required def stats_page(): """Zeigt die Statistik-Seite an.""" return render_template("stats.html") @app.route("/admin-dashboard") @login_required def admin_page(): """Zeigt die Administrationsseite an.""" if not current_user.is_admin: flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error") return redirect(url_for("index")) # Aktives Tab aus der URL auslesen oder Default-Wert verwenden active_tab = request.args.get('tab', 'users') # Daten für das Admin-Panel direkt beim Laden vorbereiten stats = {} users = [] printers = [] scheduler_status = {"running": False, "message": "Nicht verfügbar"} system_info = {"cpu": 0, "memory": 0, "disk": 0} logs = [] db_session = get_db_session() try: # Statistiken laden from sqlalchemy.orm import joinedload # Benutzeranzahl stats["total_users"] = db_session.query(User).count() # Druckeranzahl und Online-Status all_printers = db_session.query(Printer).all() stats["total_printers"] = len(all_printers) stats["online_printers"] = len([p for p in all_printers if p.status == "online"]) # Aktive Jobs und Warteschlange stats["active_jobs"] = db_session.query(Job).filter( Job.status.in_(["printing", "running"]) ).count() stats["queued_jobs"] = db_session.query(Job).filter( Job.status == "scheduled" ).count() # Erfolgsrate total_jobs = db_session.query(Job).filter( Job.status.in_(["completed", "failed", "cancelled"]) ).count() successful_jobs = db_session.query(Job).filter( Job.status == "completed" ).count() if total_jobs > 0: stats["success_rate"] = int((successful_jobs / total_jobs) * 100) else: stats["success_rate"] = 0 # Benutzer laden if active_tab == 'users': users = db_session.query(User).all() users = [user.to_dict() for user in users] # Drucker laden if active_tab == 'printers': printers = db_session.query(Printer).all() printers = [printer.to_dict() for printer in printers] # Scheduler-Status laden if active_tab == 'scheduler': from utils.scheduler import scheduler_is_running scheduler_status = { "running": scheduler_is_running(), "message": "Der Scheduler läuft" if scheduler_is_running() else "Der Scheduler ist gestoppt" } # System-Informationen laden if active_tab == 'system': import psutil # CPU und Memory cpu_percent = psutil.cpu_percent(interval=1) memory = psutil.virtual_memory() disk = psutil.disk_usage('/') # Uptime boot_time = psutil.boot_time() uptime_seconds = time.time() - boot_time uptime_days = int(uptime_seconds // 86400) uptime_hours = int((uptime_seconds % 86400) // 3600) uptime_minutes = int((uptime_seconds % 3600) // 60) # Datenbank-Status db_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'instance', 'database.db') db_size = 0 if os.path.exists(db_path): db_size = os.path.getsize(db_path) / (1024 * 1024) # MB # Scheduler-Status scheduler_running = False scheduler_jobs = 0 try: from utils.job_scheduler import scheduler scheduler_running = scheduler.running if hasattr(scheduler, 'get_jobs'): scheduler_jobs = len(scheduler.get_jobs()) except: pass # Nächster Job next_job = db_session.query(Job).filter( Job.status == "scheduled" ).order_by(Job.created_at.asc()).first() next_job_time = "Keine geplanten Jobs" if next_job: next_job_time = next_job.created_at.strftime("%d.%m.%Y %H:%M") system_info = { "cpu_usage": round(cpu_percent, 1), "memory_usage": round(memory.percent, 1), "disk_usage": round((disk.used / disk.total) * 100, 1), "uptime": f"{uptime_days}d {uptime_hours}h {uptime_minutes}m", "db_size": f"{db_size:.1f} MB", "db_connections": "Aktiv", "scheduler_running": scheduler_running, "scheduler_jobs": scheduler_jobs, "next_job": next_job_time } # Logs laden if active_tab == 'logs': import os log_level = request.args.get('log_level', 'all') log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs') # Logeinträge sammeln app_logs = [] for category in ['app', 'auth', 'jobs', 'printers', 'scheduler', 'errors']: log_file = os.path.join(log_dir, category, f'{category}.log') if os.path.exists(log_file): with open(log_file, 'r') as f: for line in f.readlines()[-100:]: # Nur die letzten 100 Zeilen pro Datei if log_level != 'all': if log_level.upper() not in line: continue app_logs.append({ 'timestamp': line.split(' - ')[0] if ' - ' in line else '', 'level': line.split(' - ')[1].split(' - ')[0] if ' - ' in line and len(line.split(' - ')) > 2 else 'INFO', 'category': category, 'message': ' - '.join(line.split(' - ')[2:]) if ' - ' in line and len(line.split(' - ')) > 2 else line }) # Nach Zeitstempel sortieren (neueste zuerst) logs = sorted(app_logs, key=lambda x: x['timestamp'] if x['timestamp'] else '', reverse=True)[:100] except Exception as e: app_logger.error(f"Fehler beim Laden der Admin-Daten: {str(e)}") finally: db_session.close() return render_template( "admin.html", active_tab=active_tab, stats=stats, users=users, printers=printers, scheduler_status=scheduler_status, system_info=system_info, logs=logs ) # Direkter Zugriff auf Logout-Route (für Fallback) @app.route("/logout", methods=["GET", "POST"]) def logout_redirect(): """Leitet zur Blueprint-Logout-Route weiter.""" return redirect(url_for("auth.logout")) # Job-Routen @app.route("/api/jobs", methods=["GET"]) @login_required def get_jobs(): db_session = get_db_session() try: # Import joinedload for eager loading from sqlalchemy.orm import joinedload # Admin sieht alle Jobs, User nur eigene if current_user.is_admin: # Eagerly load the user and printer relationships to avoid detached instance errors jobs = db_session.query(Job).options(joinedload(Job.user), joinedload(Job.printer)).all() else: jobs = db_session.query(Job).options(joinedload(Job.user), joinedload(Job.printer)).filter(Job.user_id == int(current_user.id)).all() # Convert jobs to dictionaries before closing the session job_dicts = [job.to_dict() for job in jobs] db_session.close() return jsonify({ "jobs": job_dicts }) except Exception as e: jobs_logger.error(f"Fehler beim Abrufen von Jobs: {str(e)}") db_session.close() return jsonify({"error": "Interner Serverfehler"}), 500 @app.route("/api/jobs/", methods=["GET"]) @login_required @job_owner_required def get_job(job_id): db_session = get_db_session() try: from sqlalchemy.orm import joinedload # Eagerly load the user and printer relationships job = db_session.query(Job).options(joinedload(Job.user), joinedload(Job.printer)).filter(Job.id == job_id).first() if not job: db_session.close() return jsonify({"error": "Job nicht gefunden"}), 404 # Convert to dict before closing session job_dict = job.to_dict() db_session.close() return jsonify(job_dict) except Exception as e: jobs_logger.error(f"Fehler beim Abrufen des Jobs {job_id}: {str(e)}") db_session.close() return jsonify({"error": "Interner Serverfehler"}), 500 @app.route('/api/jobs/active', methods=['GET']) @login_required def get_active_jobs(): """ Gibt alle aktiven Jobs zurück. """ try: db_session = get_db_session() from sqlalchemy.orm import joinedload active_jobs = db_session.query(Job).options( joinedload(Job.user), joinedload(Job.printer) ).filter( Job.status.in_(["scheduled", "running"]) ).all() result = [] for job in active_jobs: job_dict = job.to_dict() # Aktuelle Restzeit berechnen if job.status == "running" and job.end_at: remaining_time = job.end_at - datetime.now() if remaining_time.total_seconds() > 0: job_dict["remaining_minutes"] = int(remaining_time.total_seconds() / 60) else: job_dict["remaining_minutes"] = 0 result.append(job_dict) db_session.close() return jsonify({"jobs": result}) except Exception as e: jobs_logger.error(f"Fehler beim Abrufen aktiver Jobs: {str(e)}") return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500 @app.route('/api/jobs', methods=['POST']) @login_required def create_job(): """ Erstellt einen neuen Job mit dem Status "scheduled". Body: { "printer_id": int, "start_iso": str, # ISO-Datum-String "duration_minutes": int } """ try: data = request.json # Pflichtfelder prüfen required_fields = ["printer_id", "start_iso", "duration_minutes"] for field in required_fields: if field not in data: return jsonify({"error": f"Feld '{field}' fehlt"}), 400 # Daten extrahieren und validieren printer_id = int(data["printer_id"]) start_iso = data["start_iso"] duration_minutes = int(data["duration_minutes"]) # Optional: Jobtitel und Dateipfad name = data.get("name", f"Druckjob vom {datetime.now().strftime('%d.%m.%Y')}") file_path = data.get("file_path") # Start-Zeit parsen try: start_at = datetime.fromisoformat(start_iso) except ValueError: return jsonify({"error": "Ungültiges Startdatum"}), 400 # Dauer validieren if duration_minutes <= 0: return jsonify({"error": "Dauer muss größer als 0 sein"}), 400 # End-Zeit berechnen end_at = start_at + timedelta(minutes=duration_minutes) db_session = get_db_session() # Prüfen, ob der Drucker existiert printer = db_session.query(Printer).get(printer_id) if not printer: db_session.close() return jsonify({"error": "Drucker nicht gefunden"}), 404 # Neuen Job erstellen new_job = Job( name=name, printer_id=printer_id, user_id=current_user.id, owner_id=current_user.id, start_at=start_at, end_at=end_at, status="scheduled", file_path=file_path, duration_minutes=duration_minutes ) db_session.add(new_job) db_session.commit() # Job-Objekt für die Antwort serialisieren job_dict = new_job.to_dict() db_session.close() jobs_logger.info(f"Neuer Job {new_job.id} erstellt für Drucker {printer_id}, Start: {start_at}, Dauer: {duration_minutes} Minuten") return jsonify({"job": job_dict}), 201 except Exception as e: jobs_logger.error(f"Fehler beim Erstellen eines Jobs: {str(e)}") return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500 @app.route('/api/jobs//extend', methods=['POST']) @login_required @job_owner_required def extend_job(job_id): """ Verlängert die Endzeit eines Jobs. Body: { "extra_minutes": int } """ try: data = request.json # Prüfen, ob die erforderlichen Daten vorhanden sind if "extra_minutes" not in data: return jsonify({"error": "Feld 'extra_minutes' fehlt"}), 400 extra_minutes = int(data["extra_minutes"]) # Validieren if extra_minutes <= 0: return jsonify({"error": "Zusätzliche Minuten müssen größer als 0 sein"}), 400 db_session = get_db_session() job = db_session.query(Job).get(job_id) if not job: db_session.close() return jsonify({"error": "Job nicht gefunden"}), 404 # Prüfen, ob der Job verlängert werden kann if job.status not in ["scheduled", "running"]: db_session.close() return jsonify({"error": f"Job kann im Status '{job.status}' nicht verlängert werden"}), 400 # Endzeit aktualisieren job.end_at = job.end_at + timedelta(minutes=extra_minutes) job.duration_minutes += extra_minutes db_session.commit() # Job-Objekt für die Antwort serialisieren job_dict = job.to_dict() db_session.close() jobs_logger.info(f"Job {job_id} um {extra_minutes} Minuten verlängert, neue Endzeit: {job.end_at}") return jsonify({"job": job_dict}) except Exception as e: jobs_logger.error(f"Fehler beim Verlängern von Job {job_id}: {str(e)}") return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500 @app.route('/api/jobs//finish', methods=['POST']) @login_required def finish_job(job_id): """ Beendet einen Job manuell und schaltet die Steckdose aus. Nur für Administratoren erlaubt. """ try: # Prüfen, ob der Benutzer Administrator ist if not current_user.is_admin: return jsonify({"error": "Nur Administratoren können Jobs manuell beenden"}), 403 db_session = get_db_session() job = db_session.query(Job).options(joinedload(Job.printer)).get(job_id) if not job: db_session.close() return jsonify({"error": "Job nicht gefunden"}), 404 # Prüfen, ob der Job beendet werden kann if job.status not in ["scheduled", "running"]: db_session.close() return jsonify({"error": f"Job kann im Status '{job.status}' nicht beendet werden"}), 400 # Steckdose ausschalten from utils.job_scheduler import toggle_plug if not toggle_plug(job.printer_id, False): # Trotzdem weitermachen, aber Warnung loggen jobs_logger.warning(f"Steckdose für Job {job_id} konnte nicht ausgeschaltet werden") # Job als beendet markieren job.status = "finished" job.actual_end_time = datetime.now() db_session.commit() # Job-Objekt für die Antwort serialisieren job_dict = job.to_dict() db_session.close() jobs_logger.info(f"Job {job_id} manuell beendet durch Admin {current_user.id}") return jsonify({"job": job_dict}) except Exception as e: jobs_logger.error(f"Fehler beim manuellen Beenden von Job {job_id}: {str(e)}") return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500 @app.route("/api/printers", methods=["GET"]) @login_required def get_printers(): """Gibt alle Drucker zurück - ohne Status-Check für schnelleres Laden.""" db_session = get_db_session() try: printers = db_session.query(Printer).all() # Drucker-Liste ohne Status-Check erstellen (für schnelleres Laden) printer_list = [] for printer in printers: printer_data = printer.to_dict() # Verwende den gespeicherten Status oder setze auf "offline" als Standard printer_data["status"] = printer.status if printer.status else "offline" printer_data["active"] = printer.active if printer.active is not None else False printer_data["last_checked"] = None # Wird beim Status-Check gesetzt printer_list.append(printer_data) db_session.close() printers_logger.info(f"Drucker-Liste geladen: {len(printer_list)} Drucker") return jsonify({ "printers": printer_list }) except Exception as e: printers_logger.error(f"Fehler beim Abrufen der Drucker: {str(e)}") db_session.rollback() db_session.close() return jsonify({"error": "Interner Serverfehler"}), 500 # API-Routen für Statistiken @app.route("/api/stats/users", methods=["GET"]) @login_required def get_stats_users(): """Gibt die Anzahl der Benutzer zurück.""" db_session = get_db_session() try: user_count = db_session.query(User).count() db_session.close() return jsonify({"value": user_count}) except Exception as e: db_session.close() return jsonify({"error": str(e)}), 500 @app.route("/api/stats/uptime", methods=["GET"]) @login_required def get_stats_uptime(): """Gibt die Systemlaufzeit zurück.""" import os with open('/proc/uptime', 'r') as f: uptime_seconds = float(f.readline().split()[0]) uptime_days = int(uptime_seconds / 86400) return jsonify({"value": f"{uptime_days} Tage"}) @app.route("/api/stats/active-jobs", methods=["GET"]) @login_required def get_stats_active_jobs(): """Gibt die Anzahl der aktiven Jobs zurück.""" db_session = get_db_session() try: active_jobs = db_session.query(Job).filter(Job.status.in_(["scheduled", "running"])).count() db_session.close() return jsonify({"value": active_jobs}) except Exception as e: db_session.close() return jsonify({"error": str(e)}), 500 @app.route("/api/stats/available-printers", methods=["GET"]) @login_required def get_stats_available_printers(): """Gibt die Anzahl der verfügbaren Drucker zurück.""" db_session = get_db_session() try: available_printers = db_session.query(Printer).filter(Printer.active == True).count() db_session.close() return jsonify({"value": available_printers}) except Exception as e: db_session.close() return jsonify({"error": str(e)}), 500 @app.route("/api/stats/success-rate", methods=["GET"]) @login_required def get_stats_success_rate(): """Gibt die Erfolgsrate der Druckaufträge zurück.""" db_session = get_db_session() try: total_jobs = db_session.query(Job).filter(Job.status == "finished").count() if total_jobs == 0: success_rate = 0 else: success_jobs = db_session.query(Job).filter( Job.status == "finished", Job.actual_end_time != None ).count() success_rate = int((success_jobs / total_jobs) * 100) db_session.close() return jsonify({"value": f"{success_rate}%"}) except Exception as e: db_session.close() return jsonify({"error": str(e)}), 500 @app.route("/api/stats/print-time", methods=["GET"]) @login_required def get_stats_print_time(): """Gibt die gesamte Druckzeit zurück.""" db_session = get_db_session() try: stats = db_session.query(Stats).first() if stats and stats.total_print_time: hours = stats.total_print_time // 3600 db_session.close() return jsonify({"value": f"{hours}h"}) db_session.close() return jsonify({"value": "0h"}) except Exception as e: db_session.close() return jsonify({"error": str(e)}), 500 @app.route("/api/activity", methods=["GET"]) @login_required def get_activity(): """Gibt die letzten Aktivitäten zurück.""" db_session = get_db_session() try: recent_jobs = db_session.query(Job).order_by(Job.created_at.desc()).limit(5).all() activities = [ { "type": "job", "id": job.id, "title": job.name, "status": job.status, "timestamp": job.created_at.isoformat() if job.created_at else None } for job in recent_jobs ] db_session.close() return jsonify(activities) except Exception as e: db_session.close() return jsonify({"error": str(e)}), 500 @app.route("/api/printers/status", methods=["GET"]) @login_required def get_printers_status(): """Gibt den Status aller Drucker zurück mit echtem Ping-Check und 7-Sekunden-Timeout.""" db_session = get_db_session() try: printers = db_session.query(Printer).all() # Drucker-Daten für Status-Check vorbereiten printer_data = [] for printer in printers: # Verwende plug_ip als primäre IP-Adresse, fallback auf ip_address ip_to_check = printer.plug_ip if printer.plug_ip else printer.ip_address printer_data.append({ 'id': printer.id, 'name': printer.name, 'ip_address': ip_to_check, 'location': printer.location }) # Status aller Drucker parallel überprüfen mit 7-Sekunden-Timeout printers_logger.info(f"Starte Status-Check für {len(printer_data)} Drucker mit 7-Sekunden-Timeout") # Fallback: Wenn keine IP-Adressen vorhanden sind, alle als offline markieren if not any(p['ip_address'] for p in printer_data): printers_logger.warning("Keine IP-Adressen für Drucker gefunden - alle als offline markiert") status_results = {p['id']: ("offline", False) for p in printer_data} else: status_results = check_multiple_printers_status(printer_data, timeout=7) # Ergebnisse zusammenstellen und Datenbank aktualisieren status_data = [] for printer in printers: if printer.id in status_results: status, active = status_results[printer.id] # Mapping für Frontend-Kompatibilität if status == "online": frontend_status = "available" else: frontend_status = "offline" else: # Fallback falls kein Ergebnis vorliegt frontend_status = "offline" active = False # Status in der Datenbank aktualisieren printer.status = frontend_status printer.active = active status_data.append({ "id": printer.id, "name": printer.name, "status": frontend_status, "active": active, "ip_address": printer.plug_ip if printer.plug_ip else printer.ip_address, "location": printer.location, "last_checked": datetime.now().isoformat() }) # Speichere die aktualisierten Status db_session.commit() db_session.close() online_count = len([s for s in status_data if s['status'] == 'available']) printers_logger.info(f"Status-Check abgeschlossen: {online_count} von {len(status_data)} Drucker online") return jsonify(status_data) except Exception as e: db_session.rollback() db_session.close() printers_logger.error(f"Fehler beim Abrufen des Drucker-Status: {str(e)}") return jsonify({"error": str(e)}), 500 @app.route("/api/jobs/current", methods=["GET"]) @login_required def get_current_job(): """Gibt den aktuellen Job des Benutzers zurück.""" db_session = get_db_session() try: current_job = db_session.query(Job).filter( Job.user_id == current_user.id, Job.status.in_(["scheduled", "running"]) ).order_by(Job.start_at).first() if current_job: job_data = current_job.to_dict() else: job_data = None db_session.close() return jsonify(job_data) except Exception as e: db_session.close() return jsonify({"error": str(e)}), 500 # Admin API Endpoints @app.route("/api/users", methods=["GET"]) def get_users(): """Returns a list of all users (admin only)""" if not current_user.is_admin: return jsonify({"error": "Unauthorized"}), 403 db_session = get_db_session() try: users = db_session.query(User).all() users_list = [] for user in users: users_list.append({ "id": user.id, "email": user.email, "name": user.name, "role": user.role, "active": user.active, "created_at": user.created_at.isoformat() if user.created_at else None }) db_session.close() return jsonify({"users": users_list}) except Exception as e: db_session.close() app_logger.error(f"Error fetching users: {str(e)}") return jsonify({"error": "Failed to fetch users"}), 500 @app.route("/api/users", methods=["POST"]) @login_required def create_user(): """Create a new user (admin only)""" if not current_user.is_admin: flash("Sie haben keine Berechtigung, um neue Benutzer anzulegen.", "error") return redirect(url_for('admin_page', tab='users')) db_session = get_db_session() try: # Statt JSON-Daten die Formulardaten aus dem POST-Request holen email = request.form.get('email') name = request.form.get('name') password = request.form.get('password') role = request.form.get('role', 'user') if not email or not password: db_session.close() flash("E-Mail und Passwort sind Pflichtfelder.", "error") return redirect(url_for('admin_page', tab='users')) # Check if user with same email already exists existing_user = db_session.query(User).filter( User.email == email ).first() if existing_user: db_session.close() flash("Ein Benutzer mit dieser E-Mail existiert bereits.", "error") return redirect(url_for('admin_page', tab='users')) # Create new user new_user = User( email=email, name=name if name else "", username=email.split("@")[0], # Default username from email role=role, active=True, created_at=datetime.now() ) # Set password new_user.set_password(password) db_session.add(new_user) db_session.commit() user_id = new_user.id db_session.close() app_logger.info(f"New user created: {new_user.email} (ID: {user_id})") flash(f"Benutzer {email} wurde erfolgreich angelegt.", "success") return redirect(url_for('admin_page', tab='users')) except Exception as e: db_session.rollback() db_session.close() app_logger.error(f"Error creating user: {str(e)}") flash(f"Fehler beim Anlegen des Benutzers: {str(e)}", "error") return redirect(url_for('admin_page', tab='users')) @app.route("/api/users/", methods=["DELETE"]) @login_required def delete_user(user_id): """Delete a user (admin only)""" if not current_user.is_admin: flash("Sie haben keine Berechtigung, um Benutzer zu löschen.", "error") return redirect(url_for('admin_page', tab='users')) # Prevent admin from deleting themselves if user_id == current_user.id: flash("Sie können Ihren eigenen Account nicht löschen.", "error") return redirect(url_for('admin_page', tab='users')) db_session = get_db_session() try: user = db_session.query(User).filter(User.id == user_id).first() if not user: db_session.close() flash("Benutzer nicht gefunden.", "error") return redirect(url_for('admin_page', tab='users')) # Prevent deletion of admin users if user.role == "admin": db_session.close() flash("Administratoren können nicht gelöscht werden.", "error") return redirect(url_for('admin_page', tab='users')) email = user.email # Save for later logging db_session.delete(user) db_session.commit() db_session.close() app_logger.info(f"User deleted: {email} (ID: {user_id})") flash(f"Benutzer {email} wurde erfolgreich gelöscht.", "success") return redirect(url_for('admin_page', tab='users')) except Exception as e: db_session.rollback() db_session.close() app_logger.error(f"Error deleting user: {str(e)}") flash(f"Fehler beim Löschen des Benutzers: {str(e)}", "error") return redirect(url_for('admin_page', tab='users')) # Diese Route wurde entfernt - verwende stattdessen /api/printers/add für JSON-API @app.route("/api/printers/", methods=["DELETE"]) @login_required def delete_printer(printer_id): """Löscht einen Drucker (nur für Administratoren)""" if not current_user.is_admin: return jsonify({"error": "Nur Administratoren können Drucker löschen"}), 403 db_session = get_db_session() try: printer = db_session.query(Printer).filter(Printer.id == printer_id).first() if not printer: db_session.close() return jsonify({"error": "Drucker nicht gefunden"}), 404 printer_name = printer.name # Für Logging speichern db_session.delete(printer) db_session.commit() db_session.close() printers_logger.info(f"Drucker {printer_name} (ID: {printer_id}) wurde von {current_user.username} gelöscht") return jsonify({"success": True, "message": f"Drucker {printer_name} wurde erfolgreich gelöscht"}), 200 except Exception as e: db_session.rollback() db_session.close() printers_logger.error(f"Fehler beim Löschen des Druckers {printer_id}: {str(e)}") return jsonify({"error": f"Fehler beim Löschen des Druckers: {str(e)}"}), 500 @app.route("/api/stats", methods=["GET"]) @login_required def get_stats(): """Get overall system statistics""" if not current_user.is_admin: return jsonify({"error": "Unauthorized"}), 403 db_session = get_db_session() try: # Get basic stats stats = db_session.query(Stats).first() if not stats: # Create initial stats if none exist stats = Stats() db_session.add(stats) db_session.commit() # Count users, printers, active jobs user_count = db_session.query(User).count() printer_count = db_session.query(Printer).count() active_jobs = db_session.query(Job).filter(Job.status.in_(["scheduled", "running"])).count() result = { "total_users": user_count, "total_printers": printer_count, "active_jobs": active_jobs, "total_print_time_hours": round(stats.total_print_time / 3600, 1) if stats.total_print_time else 0, "total_jobs_completed": stats.total_jobs_completed or 0, "total_material_used": stats.total_material_used or 0, "last_updated": stats.last_updated.isoformat() if stats.last_updated else None } db_session.close() return jsonify(result) except Exception as e: db_session.close() app_logger.error(f"Error fetching stats: {str(e)}") return jsonify({"error": "Failed to fetch statistics"}), 500 @app.route("/api/scheduler/status", methods=["GET"]) @login_required def get_scheduler_status(): """Get the current status of the job scheduler""" if not current_user.is_admin: return jsonify({"error": "Unauthorized"}), 403 try: is_running = scheduler.is_running() tasks = [] # Add information about scheduler tasks for task_id, task in scheduler.get_tasks().items(): tasks.append({ "id": task_id, "interval": task.get("interval", 0), "last_run": task.get("last_run"), "enabled": task.get("enabled", False) }) return jsonify({ "running": is_running, "tasks": tasks, "uptime": scheduler.get_uptime() }) except Exception as e: app_logger.error(f"Error fetching scheduler status: {str(e)}") return jsonify({"error": "Failed to fetch scheduler status"}), 500 @app.route("/api/scheduler/start", methods=["POST"]) @login_required def start_scheduler(): """Start the job scheduler (admin only)""" if not current_user.is_admin: flash("Sie haben keine Berechtigung, um den Scheduler zu starten.", "error") return redirect(url_for('admin_page', tab='scheduler')) try: from utils.scheduler import start_scheduler as start_scheduler_func result = start_scheduler_func() if result: app_logger.info(f"Scheduler started by admin user: {current_user.email}") flash("Der Scheduler wurde erfolgreich gestartet.", "success") else: flash("Der Scheduler konnte nicht gestartet werden oder läuft bereits.", "warning") return redirect(url_for('admin_page', tab='scheduler')) except Exception as e: app_logger.error(f"Error starting scheduler: {str(e)}") flash(f"Fehler beim Starten des Schedulers: {str(e)}", "error") return redirect(url_for('admin_page', tab='scheduler')) @app.route("/api/scheduler/stop", methods=["POST"]) @login_required def stop_scheduler(): """Stop the job scheduler (admin only)""" if not current_user.is_admin: flash("Sie haben keine Berechtigung, um den Scheduler zu stoppen.", "error") return redirect(url_for('admin_page', tab='scheduler')) try: from utils.scheduler import stop_scheduler as stop_scheduler_func result = stop_scheduler_func() if result: app_logger.info(f"Scheduler stopped by admin user: {current_user.email}") flash("Der Scheduler wurde erfolgreich gestoppt.", "success") else: flash("Der Scheduler konnte nicht gestoppt werden oder läuft nicht.", "warning") return redirect(url_for('admin_page', tab='scheduler')) except Exception as e: app_logger.error(f"Error stopping scheduler: {str(e)}") flash(f"Fehler beim Stoppen des Schedulers: {str(e)}", "error") return redirect(url_for('admin_page', tab='scheduler')) @app.route("/api/logs", methods=["GET"]) @login_required def get_logs(): """Get system logs (admin only)""" if not current_user.is_admin: return jsonify({"error": "Unauthorized"}), 403 try: # Get log type from query params log_type = request.args.get("type", "app") limit = int(request.args.get("limit", 100)) log_mapping = { "app": "logs/app/app.log", "auth": "logs/auth/auth.log", "errors": "logs/errors/errors.log", "jobs": "logs/jobs/jobs.log", "printers": "logs/printers/printers.log", "scheduler": "logs/scheduler/scheduler.log" } if log_type not in log_mapping: return jsonify({"error": "Invalid log type"}), 400 log_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), log_mapping[log_type]) if not os.path.exists(log_path): return jsonify({"logs": [], "success": True}) logs = [] with open(log_path, "r") as f: for line in f.readlines()[-limit:]: try: # Parse log entry (format: [LEVEL] TIMESTAMP - MESSAGE) parts = line.strip().split(" - ", 1) if len(parts) == 2: header, message = parts level_timestamp = header.strip("[]").split("] [", 1) if len(level_timestamp) == 2: level, timestamp = level_timestamp logs.append({ "level": level.strip(), "timestamp": timestamp.strip(), "message": message.strip(), "source": log_type }) except Exception: # If parsing fails, add the raw line logs.append({ "level": "INFO", "timestamp": datetime.now().isoformat(), "message": line.strip(), "source": f"myp.{log_type}" }) # Sort logs by timestamp in descending order logs.sort(key=lambda x: x["timestamp"], reverse=True) return jsonify({"logs": logs, "success": True}) except Exception as e: app_logger.error(f"Error fetching logs: {str(e)}") return jsonify({"error": "Failed to fetch logs"}), 500 @app.route("/api/activity/recent", methods=["GET"]) @login_required def get_recent_activity(): """Get recent system activity""" try: # Create mock activity data (to be replaced with real data in future) activities = [ { "description": "Neuer Druckauftrag erstellt: 'Motor_Halterung_v2'", "timestamp": (datetime.now() - timedelta(minutes=15)).isoformat(), "user": "admin@example.com", "type": "job_created" }, { "description": "Drucker 'Prusa i3 MK3S' wurde neu konfiguriert", "timestamp": (datetime.now() - timedelta(hours=2)).isoformat(), "user": "admin@example.com", "type": "printer_updated" }, { "description": "Druckauftrag 'Getriebe_Prototyp' abgeschlossen", "timestamp": (datetime.now() - timedelta(hours=5)).isoformat(), "user": "user@example.com", "type": "job_completed" }, { "description": "Neuer Benutzer registriert: 'user@example.com'", "timestamp": (datetime.now() - timedelta(days=1)).isoformat(), "user": "admin@example.com", "type": "user_created" }, { "description": "Systemwartung durchgeführt", "timestamp": (datetime.now() - timedelta(days=2)).isoformat(), "user": "admin@example.com", "type": "system_maintenance" } ] # Get limit from query params limit = int(request.args.get("limit", 5)) activities = activities[:limit] return jsonify({"activities": activities}) except Exception as e: app_logger.error(f"Error fetching recent activity: {str(e)}") return jsonify({"error": "Failed to fetch recent activity"}), 500 # Service Worker Route @app.route('/sw.js') def service_worker(): """Serve the service worker script with proper headers""" response = app.send_static_file('js/sw.js') # Wichtig: Korrekte MIME-Type setzen response.headers['Content-Type'] = 'application/javascript' # Wichtig: Cache-Control Header setzen, um häufige Updates zu ermöglichen response.headers['Cache-Control'] = 'no-cache' # Service-Worker-Allowed Header setzen, um Scope-Probleme zu beheben response.headers['Service-Worker-Allowed'] = '/' return response # Fehlerbehandlung @app.errorhandler(404) def page_not_found(e): return render_template("404.html"), 404 @app.errorhandler(500) def internal_server_error(e): return render_template("500.html"), 500 # CLI-Befehle für Tailwind CSS @app.cli.group() def tailwind(): """Tailwind CSS Kommandos.""" pass @tailwind.command("build") def tailwind_build(): """Tailwind CSS für die Produktion kompilieren.""" print("Tailwind CSS wird kompiliert...") try: subprocess.run(["npx", "tailwindcss", "-i", "./static/css/input.css", "-o", "./static/css/tailwind-dark-consolidated.min.css", "--minify"], check=True) print("Tailwind CSS erfolgreich kompiliert.") except subprocess.CalledProcessError as e: print(f"Fehler beim Kompilieren von Tailwind CSS: {e}") raise @tailwind.command("watch") def tailwind_watch(): """Tailwind CSS im Watch-Modus starten.""" print("Tailwind CSS Watch-Modus wird gestartet...") try: subprocess.Popen(["npx", "tailwindcss", "-i", "./static/css/input.css", "-o", "./static/css/tailwind-dark-consolidated.min.css", "--watch"]) print("Tailwind CSS Watch-Modus gestartet. CSS wird bei Änderungen automatisch aktualisiert.") except subprocess.CalledProcessError as e: print(f"Fehler beim Starten des Tailwind CSS Watch-Modus: {e}") raise # Auto-Kompilierung beim Serverstart im Debug-Modus def compile_tailwind_if_debug(): """Kompiliert Tailwind CSS im Debug-Modus, falls notwendig.""" if FLASK_DEBUG: try: app_logger.info("Kompiliere Tailwind CSS...") # Prüfen, ob npx und Node.js verfügbar sind import platform import shutil import subprocess # Auf Windows nur fortfahren, wenn die CSS-Datei bereits existiert # oder npx verfügbar ist css_file_exists = os.path.exists("static/css/tailwind.min.css") # Prüfen, ob npx verfügbar ist npx_available = shutil.which("npx") is not None if platform.system() == "Windows" and not npx_available and not css_file_exists: app_logger.warning("npx nicht gefunden und keine CSS-Datei vorhanden. Tailwind CSS wird nicht kompiliert.") return # Tailwind CSS kompilieren if npx_available: subprocess.run([ "npx", "tailwindcss", "-i", "static/css/input.css", "-o", "static/css/tailwind.min.css", "--minify" ], check=True) app_logger.info("Tailwind CSS erfolgreich kompiliert.") elif css_file_exists: app_logger.info("Verwende existierende Tailwind CSS-Datei.") else: app_logger.warning("Tailwind konnte nicht kompiliert werden und keine CSS-Datei vorhanden.") except subprocess.CalledProcessError as e: app_logger.warning(f"Tailwind konnte nicht kompiliert werden. Möglicherweise ist npx/Node.js nicht installiert. Fehler: {e}") except Exception as e: app_logger.error(f"Fehler beim Kompilieren von Tailwind CSS: {str(e)}") # Tailwind CSS kompilieren, wenn im Debug-Modus if FLASK_DEBUG: compile_tailwind_if_debug() # Initialisierung der Datenbank beim Start def init_app(): """Initialisiert die App-Komponenten und startet den Scheduler.""" # Datenbank initialisieren try: init_database() create_initial_admin() except Exception as e: app_logger.error(f"Fehler bei der Datenbank-Initialisierung: {str(e)}") # Jinja2-Helfer registrieren register_template_helpers(app) # Tailwind im Debug-Modus kompilieren compile_tailwind_if_debug() # Scheduler starten, wenn aktiviert if SCHEDULER_ENABLED: try: # Scheduler-Task für Druckauftrags-Prüfung registrieren scheduler.register_task( "check_jobs", check_jobs, interval=SCHEDULER_INTERVAL ) # Scheduler starten scheduler.start() app_logger.info(f"Scheduler gestartet mit Intervall {SCHEDULER_INTERVAL} Sekunden.") except Exception as e: app_logger.error(f"Fehler beim Starten des Schedulers: {str(e)}") # SSL-Kontext protokollieren ssl_context = get_ssl_context() if ssl_context: app_logger.info(f"SSL aktiviert mit Zertifikat {ssl_context[0]}") else: app_logger.warning("SSL ist deaktiviert. Die Verbindung ist unverschlüsselt!") # Scheduler-Funktion zur Überprüfung der Druckaufträge def check_jobs(): """ Überprüft alle aktiven Druckaufträge und führt entsprechende Aktionen aus. Diese Funktion wird vom Scheduler regelmäßig aufgerufen. """ app_logger.info("Überprüfe Druckaufträge...") try: db_session = get_db_session() # Aktive Jobs abrufen active_jobs = db_session.query(Job).filter( Job.status.in_(["scheduled", "running"]) ).all() now = datetime.now() for job in active_jobs: # Prüfen, ob der Job gestartet werden soll if job.status == "scheduled" and job.start_at <= now: app_logger.info(f"Starte Job {job.id} für Drucker {job.printer_id}") job.status = "running" # Steckdose einschalten (implementieren Sie diese Funktion) from utils.job_scheduler import toggle_plug toggle_plug(job.printer_id, True) # Prüfen, ob der Job beendet werden soll elif job.status == "running" and job.end_at <= now: app_logger.info(f"Beende Job {job.id} für Drucker {job.printer_id}") job.status = "finished" job.actual_end_time = now # Steckdose ausschalten from utils.job_scheduler import toggle_plug toggle_plug(job.printer_id, False) db_session.commit() db_session.close() except Exception as e: app_logger.error(f"Fehler bei der Überprüfung von Druckaufträgen: {str(e)}") if 'db_session' in locals(): db_session.close() # App starten if __name__ == "__main__": import argparse import threading import ssl import socket import logging # Kommandozeilenargumente parsen parser = argparse.ArgumentParser(description='MYP Platform - 3D-Drucker Reservierungssystem') parser.add_argument('--port', type=int, help='Port für den Server (überschreibt die Konfiguration)') parser.add_argument('--no-ssl', action='store_true', help='Deaktiviert SSL/HTTPS') parser.add_argument('--dual-protocol', action='store_true', help='Startet sowohl HTTP als auch HTTPS Server') args = parser.parse_args() # Initialisierung init_app() # Port aus Kommandozeilenargument verwenden, falls angegeben port = args.port if args.port else FLASK_PORT # SSL-Kontext abrufen ssl_context = None if SSL_ENABLED and not args.no_ssl: try: if SSL_CERT_PATH and SSL_KEY_PATH: ssl_context = (SSL_CERT_PATH, SSL_KEY_PATH) logging.info(f"SSL aktiviert mit Zertifikat: {SSL_CERT_PATH}") else: ssl_context = 'adhoc' logging.info("SSL aktiviert mit selbstsigniertem Ad-hoc-Zertifikat") except Exception as e: logging.error(f"Fehler beim Laden des SSL-Kontexts: {e}") ssl_context = None # Dual-Protokoll-Modus: HTTP und HTTPS gleichzeitig if args.dual_protocol: # Funktion zum Starten des HTTP-Servers def start_http_server(): try: logging.info(f"Starte HTTP-Server auf Port 80...") # Kopie der App erstellen from werkzeug.serving import run_simple run_simple('0.0.0.0', 80, app, threaded=True) except socket.error as e: logging.error(f"Konnte HTTP-Server nicht starten: {e}") # Funktion zum Starten des HTTPS-Servers def start_https_server(): try: if ssl_context: logging.info(f"Starte HTTPS-Server auf Port {port}...") app.run(host='0.0.0.0', port=port, ssl_context=ssl_context, threaded=True) else: logging.warning("HTTPS deaktiviert aufgrund fehlender Zertifikate") app.run(host='0.0.0.0', port=port, threaded=True) except socket.error as e: logging.error(f"Konnte HTTPS-Server nicht starten: {e}") # Beide Server in separaten Threads starten http_thread = threading.Thread(target=start_http_server) https_thread = threading.Thread(target=start_https_server) http_thread.daemon = True https_thread.daemon = True http_thread.start() https_thread.start() # Warten, bis beide Threads beendet sind (was sie normalerweise nicht sein sollten) http_thread.join() https_thread.join() else: # Normaler Modus - entweder HTTP oder HTTPS if ssl_context: logging.info(f"Starte HTTPS-Server auf Port {port}...") app.run(host='0.0.0.0', port=port, ssl_context=ssl_context, threaded=True) else: logging.info(f"Starte HTTP-Server auf Port {port}...") app.run(host='0.0.0.0', port=port, threaded=True) # Content Security Policy anpassen @app.after_request def add_security_headers(response): """Fügt Sicherheitsheader zu allen Antworten hinzu""" # Content Security Policy definieren csp_directives = [ "default-src 'self'", "script-src 'self' 'unsafe-inline' 'unsafe-eval'", "script-src-elem 'self' 'unsafe-inline'", "style-src 'self' 'unsafe-inline'", "img-src 'self' data:", "font-src 'self'", "connect-src 'self'", "worker-src 'self'", # Erlaubt Service Worker "manifest-src 'self'" ] # Setze CSP Header response.headers['Content-Security-Policy'] = "; ".join(csp_directives) # Weitere Sicherheitsheader response.headers['X-Content-Type-Options'] = 'nosniff' response.headers['X-Frame-Options'] = 'SAMEORIGIN' response.headers['X-XSS-Protection'] = '1; mode=block' return response @app.route("/privacy") def privacy_page(): """Zeigt die Datenschutzseite an.""" return render_template("privacy.html") @app.route("/terms") def terms_page(): """Zeigt die Nutzungsbedingungen an.""" return render_template("terms.html") @app.route("/api/stats/export", methods=["GET"]) @login_required def export_stats(): """Exportiert Statistiken als JSON-Datei.""" if not current_user.is_admin: return jsonify({"error": "Nur Administratoren können Statistiken exportieren"}), 403 try: db_session = get_db_session() # Grundlegende Statistiken sammeln stats = db_session.query(Stats).first() if not stats: stats = Stats() # Benutzerzahlen user_count = db_session.query(User).count() active_user_count = db_session.query(User).filter(User.active == True).count() # Druckerzahlen printer_count = db_session.query(Printer).count() active_printer_count = db_session.query(Printer).filter(Printer.active == True).count() # Jobstatistiken total_jobs = db_session.query(Job).count() completed_jobs = db_session.query(Job).filter(Job.status == "finished").count() active_jobs = db_session.query(Job).filter(Job.status.in_(["scheduled", "running"])).count() failed_jobs = db_session.query(Job).filter(Job.status == "failed").count() # Berechne durchschnittliche Druckzeit jobs_with_duration = db_session.query(Job).filter( Job.start_at != None, Job.actual_end_time != None ).all() total_print_time = 0 avg_print_time = 0 if jobs_with_duration: for job in jobs_with_duration: duration = (job.actual_end_time - job.start_at).total_seconds() total_print_time += duration avg_print_time = total_print_time / len(jobs_with_duration) if len(jobs_with_duration) > 0 else 0 # Füge zusätzliche Statistiken für jede Druckerart hinzu printer_stats = [] printers = db_session.query(Printer).all() for printer in printers: printer_jobs = db_session.query(Job).filter(Job.printer_id == printer.id).count() printer_success_jobs = db_session.query(Job).filter( Job.printer_id == printer.id, Job.status == "finished" ).count() success_rate = (printer_success_jobs / printer_jobs * 100) if printer_jobs > 0 else 0 printer_stats.append({ "id": printer.id, "name": printer.name, "model": printer.model, "location": printer.location, "total_jobs": printer_jobs, "success_rate": round(success_rate, 2), "active": printer.active }) # Export-Daten zusammenstellen export_data = { "generated_at": datetime.now().isoformat(), "users": { "total": user_count, "active": active_user_count }, "printers": { "total": printer_count, "active": active_printer_count, "details": printer_stats }, "jobs": { "total": total_jobs, "completed": completed_jobs, "active": active_jobs, "failed": failed_jobs, "success_rate": round((completed_jobs / total_jobs * 100), 2) if total_jobs > 0 else 0 }, "print_time": { "total_seconds": total_print_time, "total_hours": round(total_print_time / 3600, 2), "average_seconds": avg_print_time, "average_minutes": round(avg_print_time / 60, 2) }, "system": { "version": "3.0.0", "uptime_days": get_system_uptime_days() } } db_session.close() # Als Datei zum Download anbieten from flask import make_response import json response = make_response(json.dumps(export_data, indent=4)) response.headers["Content-Disposition"] = "attachment; filename=stats_export.json" response.headers["Content-Type"] = "application/json" return response except Exception as e: app_logger.error(f"Fehler beim Exportieren der Statistiken: {str(e)}") return jsonify({"error": f"Fehler beim Exportieren: {str(e)}"}), 500 def get_system_uptime_days(): """Gibt die Systemlaufzeit in Tagen zurück.""" try: with open('/proc/uptime', 'r') as f: uptime_seconds = float(f.readline().split()[0]) return round(uptime_seconds / 86400, 2) # Umrechnung in Tage except Exception: return 0 @app.route("/api/printers/add", methods=["POST"]) @login_required def add_printer(): """Fügt einen neuen Drucker hinzu.""" if not current_user.is_admin: return jsonify({"error": "Nur Administratoren können Drucker hinzufügen"}), 403 try: data = request.json # Pflichtfelder prüfen required_fields = ["name", "mac_address", "plug_ip", "plug_username", "plug_password"] for field in required_fields: if field not in data or not data[field]: return jsonify({"error": f"Das Feld '{field}' ist ein Pflichtfeld"}), 400 # Druckerdaten extrahieren name = data["name"] model = data.get("model", "") location = data.get("location", "") mac_address = data["mac_address"] plug_ip = data["plug_ip"] plug_username = data["plug_username"] plug_password = data["plug_password"] db_session = get_db_session() # Prüfen, ob ein Drucker mit dieser MAC-Adresse bereits existiert existing_printer = db_session.query(Printer).filter(Printer.mac_address == mac_address).first() if existing_printer: db_session.close() return jsonify({"error": "Ein Drucker mit dieser MAC-Adresse existiert bereits"}), 400 # Neuen Drucker erstellen new_printer = Printer( name=name, model=model, location=location, mac_address=mac_address, plug_ip=plug_ip, plug_username=plug_username, plug_password=plug_password, status="offline", active=True, created_at=datetime.now() ) db_session.add(new_printer) db_session.commit() # Drucker-ID für die Antwort speichern printer_id = new_printer.id # Drucker-Objekt für die Antwort serialisieren printer_dict = new_printer.to_dict() db_session.close() printers_logger.info(f"Neuer Drucker {name} (ID: {printer_id}) wurde von {current_user.username} hinzugefügt") return jsonify({"success": True, "message": "Drucker erfolgreich hinzugefügt", "printer": printer_dict}), 201 except Exception as e: printers_logger.error(f"Fehler beim Hinzufügen eines Druckers: {str(e)}") return jsonify({"error": f"Fehler beim Hinzufügen des Druckers: {str(e)}"}), 500 @app.route("/my/jobs") @login_required def my_jobs(): """Zeigt die persönlichen Jobs des angemeldeten Benutzers an.""" # Weiterleitung zur Jobs-Seite mit Filter für den aktuellen Benutzer return redirect(url_for("jobs_page", user_filter=current_user.id)) @app.route("/api/user/export", methods=["GET"]) @login_required def api_user_export_redirect(): """Leitet den alten API-Pfad zum neuen Benutzer-Export weiter.""" return redirect(url_for("user.export_user_data")) @app.route("/api/user/profile", methods=["PUT"]) @login_required def api_user_profile_update_redirect(): """Leitet den alten API-Pfad zum neuen Benutzer-Profil-Update weiter.""" return redirect(url_for("user.update_profile_api")) @app.route("/user/update-settings", methods=["POST"]) @login_required def user_update_settings_redirect(): """Weiterleitung zur Blueprint-Route für Settings-Updates.""" return redirect(url_for("user.api_update_settings")) # SSL-Verwaltungsrouten @app.route("/api/ssl/info", methods=["GET"]) @login_required def get_ssl_info(): """Gibt Informationen über das aktuelle SSL-Zertifikat zurück.""" if not current_user.is_admin: return jsonify({"error": "Nur Administratoren können SSL-Informationen abrufen"}), 403 try: from utils.ssl_manager import ssl_manager cert_info = ssl_manager.get_certificate_info() if not cert_info: return jsonify({ "exists": False, "message": "Kein SSL-Zertifikat gefunden" }) return jsonify({ "exists": True, "certificate": cert_info, "paths": { "cert": ssl_manager.cert_path, "key": ssl_manager.key_path } }) except Exception as e: app_logger.error(f"Fehler beim Abrufen der SSL-Informationen: {e}") return jsonify({"error": f"Fehler beim Abrufen der SSL-Informationen: {str(e)}"}), 500 @app.route("/api/ssl/generate", methods=["POST"]) @login_required def generate_ssl_certificate(): """Generiert ein neues SSL-Zertifikat.""" if not current_user.is_admin: return jsonify({"error": "Nur Administratoren können SSL-Zertifikate generieren"}), 403 try: from utils.ssl_manager import ssl_manager # Parameter aus Request extrahieren data = request.json or {} key_size = data.get("key_size", 4096) validity_days = data.get("validity_days", 365) # Zertifikat generieren success = ssl_manager.generate_mercedes_certificate(key_size, validity_days) if success: cert_info = ssl_manager.get_certificate_info() app_logger.info(f"SSL-Zertifikat von {current_user.username} generiert") return jsonify({ "success": True, "message": "SSL-Zertifikat erfolgreich generiert", "certificate": cert_info }) else: return jsonify({ "success": False, "error": "Fehler beim Generieren des SSL-Zertifikats" }), 500 except Exception as e: app_logger.error(f"Fehler beim Generieren des SSL-Zertifikats: {e}") return jsonify({"error": f"Fehler beim Generieren: {str(e)}"}), 500 @app.route("/api/ssl/install", methods=["POST"]) @login_required def install_ssl_certificate(): """Installiert das SSL-Zertifikat im System.""" if not current_user.is_admin: return jsonify({"error": "Nur Administratoren können SSL-Zertifikate installieren"}), 403 try: from utils.ssl_manager import ssl_manager success = ssl_manager.install_system_certificate() if success: app_logger.info(f"SSL-Zertifikat von {current_user.username} im System installiert") return jsonify({ "success": True, "message": "SSL-Zertifikat erfolgreich im System installiert" }) else: return jsonify({ "success": False, "error": "Fehler bei der Installation des SSL-Zertifikats im System" }), 500 except Exception as e: app_logger.error(f"Fehler bei der SSL-Installation: {e}") return jsonify({"error": f"Fehler bei der Installation: {str(e)}"}), 500 @app.route("/api/ssl/copy-raspberry", methods=["POST"]) @login_required def copy_ssl_to_raspberry(): """Kopiert das SSL-Zertifikat auf den Raspberry Pi.""" if not current_user.is_admin: return jsonify({"error": "Nur Administratoren können SSL-Zertifikate kopieren"}), 403 try: from utils.ssl_manager import ssl_manager # Parameter aus Request extrahieren data = request.json or {} host = data.get("host", "raspberrypi") user = data.get("user", "user") dest = data.get("dest", "/home/user/Projektarbeit-MYP/backend/app/certs") success = ssl_manager.copy_to_raspberry(host, user, dest) if success: app_logger.info(f"SSL-Zertifikat von {current_user.username} auf Raspberry Pi kopiert") return jsonify({ "success": True, "message": f"SSL-Zertifikat erfolgreich auf {host} kopiert" }) else: return jsonify({ "success": False, "error": "Fehler beim Kopieren des SSL-Zertifikats auf den Raspberry Pi" }), 500 except Exception as e: app_logger.error(f"Fehler beim Kopieren auf Raspberry Pi: {e}") return jsonify({"error": f"Fehler beim Kopieren: {str(e)}"}), 500 @app.route("/api/ssl/validate", methods=["GET"]) @login_required def validate_ssl_certificate(): """Validiert das aktuelle SSL-Zertifikat.""" if not current_user.is_admin: return jsonify({"error": "Nur Administratoren können SSL-Zertifikate validieren"}), 403 try: from utils.ssl_manager import ssl_manager is_valid = ssl_manager.is_certificate_valid() cert_info = ssl_manager.get_certificate_info() return jsonify({ "valid": is_valid, "certificate": cert_info, "message": "Zertifikat ist gültig" if is_valid else "Zertifikat ist ungültig oder läuft bald ab" }) except Exception as e: app_logger.error(f"Fehler bei der SSL-Validierung: {e}") return jsonify({"error": f"Fehler bei der Validierung: {str(e)}"}), 500 # Neue Admin-System-Management-Routen @app.route("/api/admin/cache/clear", methods=["POST"]) @login_required def clear_cache(): """Leert den System-Cache.""" if not current_user.is_admin: return jsonify({"error": "Keine Berechtigung"}), 403 try: import shutil import tempfile # Flask-Cache leeren (falls vorhanden) cache_dir = os.path.join(tempfile.gettempdir(), 'flask_cache') if os.path.exists(cache_dir): shutil.rmtree(cache_dir) # Python __pycache__ leeren for root, dirs, files in os.walk('.'): for dir_name in dirs: if dir_name == '__pycache__': pycache_path = os.path.join(root, dir_name) shutil.rmtree(pycache_path) app_logger.info(f"Cache wurde von Admin {current_user.username} geleert") return jsonify({"success": True, "message": "Cache erfolgreich geleert"}) except Exception as e: app_logger.error(f"Fehler beim Leeren des Cache: {str(e)}") return jsonify({"error": f"Fehler beim Leeren des Cache: {str(e)}"}), 500 @app.route("/api/admin/database/optimize", methods=["POST"]) @login_required def optimize_database(): """Optimiert die Datenbank.""" if not current_user.is_admin: return jsonify({"error": "Keine Berechtigung"}), 403 try: db_session = get_db_session() # VACUUM und ANALYZE für SQLite db_session.execute(sqlalchemy.text("VACUUM")) db_session.execute(sqlalchemy.text("ANALYZE")) db_session.commit() # Alte abgeschlossene Jobs löschen (älter als 30 Tage) thirty_days_ago = datetime.now() - timedelta(days=30) old_jobs = db_session.query(Job).filter( Job.status.in_(["completed", "failed", "cancelled"]), Job.created_at < thirty_days_ago ).count() db_session.query(Job).filter( Job.status.in_(["completed", "failed", "cancelled"]), Job.created_at < thirty_days_ago ).delete() db_session.commit() db_session.close() app_logger.info(f"Datenbank wurde von Admin {current_user.username} optimiert. {old_jobs} alte Jobs entfernt.") return jsonify({ "success": True, "message": f"Datenbank optimiert. {old_jobs} alte Jobs entfernt." }) except Exception as e: app_logger.error(f"Fehler bei der Datenbankoptimierung: {str(e)}") return jsonify({"error": f"Fehler bei der Datenbankoptimierung: {str(e)}"}), 500 @app.route("/api/admin/backup/create", methods=["POST"]) @login_required def create_backup(): """Erstellt ein System-Backup.""" if not current_user.is_admin: return jsonify({"error": "Keine Berechtigung"}), 403 try: import shutil from datetime import datetime backup_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'backups') os.makedirs(backup_dir, exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_name = f"backup_{timestamp}" backup_path = os.path.join(backup_dir, backup_name) # Datenbank-Backup db_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'instance', 'database.db') if os.path.exists(db_path): shutil.copy2(db_path, os.path.join(backup_path, 'database.db')) # Konfigurationsdateien config_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config') if os.path.exists(config_dir): shutil.copytree(config_dir, os.path.join(backup_path, 'config')) # Uploads-Verzeichnis uploads_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'uploads') if os.path.exists(uploads_dir): shutil.copytree(uploads_dir, os.path.join(backup_path, 'uploads')) # Backup komprimieren shutil.make_archive(backup_path, 'zip', backup_path) shutil.rmtree(backup_path) # Temporäres Verzeichnis löschen app_logger.info(f"Backup wurde von Admin {current_user.username} erstellt: {backup_name}.zip") return jsonify({ "success": True, "message": f"Backup erfolgreich erstellt: {backup_name}.zip" }) except Exception as e: app_logger.error(f"Fehler beim Erstellen des Backups: {str(e)}") return jsonify({"error": f"Fehler beim Erstellen des Backups: {str(e)}"}), 500 @app.route("/api/admin/printers/update", methods=["POST"]) @login_required def update_printers(): """Aktualisiert alle Drucker-Verbindungen.""" if not current_user.is_admin: return jsonify({"error": "Keine Berechtigung"}), 403 try: db_session = get_db_session() printers = db_session.query(Printer).all() updated_count = 0 error_count = 0 for printer in printers: try: # Drucker-Status prüfen import requests import socket # Ping-Test sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5) result = sock.connect_ex((printer.ip_address, 80)) sock.close() if result == 0: printer.status = "online" printer.last_seen = datetime.now() updated_count += 1 else: printer.status = "offline" error_count += 1 except Exception as e: printer.status = "error" error_count += 1 printers_logger.error(f"Fehler beim Aktualisieren von Drucker {printer.name}: {str(e)}") db_session.commit() db_session.close() app_logger.info(f"Drucker wurden von Admin {current_user.username} aktualisiert. {updated_count} online, {error_count} Fehler.") return jsonify({ "success": True, "message": f"Drucker aktualisiert: {updated_count} online, {error_count} offline/Fehler" }) except Exception as e: app_logger.error(f"Fehler beim Aktualisieren der Drucker: {str(e)}") return jsonify({"error": f"Fehler beim Aktualisieren der Drucker: {str(e)}"}), 500 @app.route("/api/admin/system/restart", methods=["POST"]) @login_required def restart_system(): """Startet das System neu.""" if not current_user.is_admin: return jsonify({"error": "Keine Berechtigung"}), 403 try: app_logger.warning(f"System-Neustart wurde von Admin {current_user.username} initiiert") # Graceful shutdown def shutdown_server(): import time time.sleep(2) # Kurz warten, damit die Response gesendet wird os._exit(0) # Shutdown in separatem Thread import threading shutdown_thread = threading.Thread(target=shutdown_server) shutdown_thread.start() return jsonify({ "success": True, "message": "System wird neugestartet..." }) except Exception as e: app_logger.error(f"Fehler beim Neustart des Systems: {str(e)}") return jsonify({"error": f"Fehler beim Neustart des Systems: {str(e)}"}), 500 @app.route("/api/admin/system/status", methods=["GET"]) @login_required def get_system_status(): """Gibt den aktuellen Systemstatus zurück.""" if not current_user.is_admin: return jsonify({"error": "Keine Berechtigung"}), 403 try: import psutil import sqlite3 # CPU und Memory cpu_percent = psutil.cpu_percent(interval=1) memory = psutil.virtual_memory() disk = psutil.disk_usage('/') # Uptime boot_time = psutil.boot_time() uptime_seconds = time.time() - boot_time uptime_days = int(uptime_seconds // 86400) uptime_hours = int((uptime_seconds % 86400) // 3600) uptime_minutes = int((uptime_seconds % 3600) // 60) # Datenbank-Status db_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'instance', 'database.db') db_size = 0 db_connections = 0 if os.path.exists(db_path): db_size = os.path.getsize(db_path) / (1024 * 1024) # MB # Scheduler-Status scheduler_running = False try: from utils.job_scheduler import scheduler scheduler_running = scheduler.running except: pass # Nächster Job db_session = get_db_session() next_job = db_session.query(Job).filter( Job.status == "scheduled" ).order_by(Job.created_at.asc()).first() next_job_time = "Keine geplanten Jobs" if next_job: next_job_time = next_job.created_at.strftime("%d.%m.%Y %H:%M") db_session.close() return jsonify({ "cpu_usage": round(cpu_percent, 1), "memory_usage": round(memory.percent, 1), "disk_usage": round((disk.used / disk.total) * 100, 1), "uptime": f"{uptime_days}d {uptime_hours}h {uptime_minutes}m", "db_size": f"{db_size:.1f} MB", "db_connections": db_connections, "scheduler_running": scheduler_running, "next_job": next_job_time }) except Exception as e: app_logger.error(f"Fehler beim Abrufen des Systemstatus: {str(e)}") return jsonify({"error": f"Fehler beim Abrufen des Systemstatus: {str(e)}"}), 500 @app.route("/api/admin/database/status", methods=["GET"]) @login_required def get_database_status(): """Gibt den Datenbankstatus zurück.""" if not current_user.is_admin: return jsonify({"error": "Keine Berechtigung"}), 403 try: db_session = get_db_session() # Verbindungstest db_session.execute(sqlalchemy.text("SELECT 1")) # Datenbankgröße db_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'instance', 'database.db') db_size = 0 if os.path.exists(db_path): db_size = os.path.getsize(db_path) / (1024 * 1024) # MB # Tabellenstatistiken user_count = db_session.query(User).count() printer_count = db_session.query(Printer).count() job_count = db_session.query(Job).count() db_session.close() return jsonify({ "connected": True, "size": f"{db_size:.1f} MB", "tables": { "users": user_count, "printers": printer_count, "jobs": job_count } }) except Exception as e: app_logger.error(f"Fehler beim Abrufen des Datenbankstatus: {str(e)}") return jsonify({ "connected": False, "error": str(e) }), 500 @app.route("/admin/users/add") @login_required def admin_add_user(): """Zeigt das Formular zum Hinzufügen eines neuen Benutzers an.""" if not current_user.is_admin: flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error") return redirect(url_for("index")) return render_template("admin_add_user.html") @app.route("/admin/users//edit") @login_required def admin_edit_user(user_id): """Zeigt das Formular zum Bearbeiten eines Benutzers an.""" if not current_user.is_admin: flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error") return redirect(url_for("index")) db_session = get_db_session() try: user = db_session.query(User).filter(User.id == user_id).first() if not user: flash("Benutzer nicht gefunden.", "error") return redirect(url_for("admin_page", tab="users")) return render_template("admin_edit_user.html", user=user) finally: db_session.close() @app.route("/admin/printers/add") @login_required def admin_add_printer(): """Zeigt das Formular zum Hinzufügen eines neuen Druckers an.""" if not current_user.is_admin: flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error") return redirect(url_for("index")) return render_template("admin_add_printer.html") @app.route("/admin/printers//manage") @login_required def admin_manage_printer(printer_id): """Zeigt die Drucker-Verwaltungsseite an.""" if not current_user.is_admin: flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error") return redirect(url_for("index")) db_session = get_db_session() try: printer = db_session.query(Printer).filter(Printer.id == printer_id).first() if not printer: flash("Drucker nicht gefunden.", "error") return redirect(url_for("admin_page", tab="printers")) return render_template("admin_manage_printer.html", printer=printer) finally: db_session.close() @app.route("/admin/printers//settings") @login_required def admin_printer_settings(printer_id): """Zeigt die Drucker-Einstellungsseite an.""" if not current_user.is_admin: flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error") return redirect(url_for("index")) db_session = get_db_session() try: printer = db_session.query(Printer).filter(Printer.id == printer_id).first() if not printer: flash("Drucker nicht gefunden.", "error") return redirect(url_for("admin_page", tab="printers")) return render_template("admin_printer_settings.html", printer=printer) finally: db_session.close() @app.route("/admin/settings") @login_required def admin_settings(): """Zeigt die Admin-Einstellungsseite an.""" if not current_user.is_admin: flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error") return redirect(url_for("index")) return render_template("admin_settings.html")