2235 lines
80 KiB
Python

import os
import threading
import time
import json
import secrets
import subprocess
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any, Union
from functools import wraps
from flask import Flask, request, jsonify, session, render_template, redirect, url_for, flash, Response
from flask_login import LoginManager, UserMixin, login_user, logout_user, current_user, login_required
import sqlalchemy.exc
import sqlalchemy
from PyP100 import PyP110
from flask_wtf.csrf import CSRFProtect
from config.settings import (
SECRET_KEY, TAPO_USERNAME, TAPO_PASSWORD, PRINTERS,
FLASK_HOST, FLASK_PORT, FLASK_DEBUG, SESSION_LIFETIME,
SCHEDULER_INTERVAL, SCHEDULER_ENABLED, get_ssl_context, FLASK_FALLBACK_PORT,
SSL_ENABLED, SSL_CERT_PATH, SSL_KEY_PATH
)
from utils.logging_config import setup_logging, get_logger, log_startup_info
from models import User, Printer, Job, Stats, get_db_session, init_database, create_initial_admin
from utils.job_scheduler import scheduler
from utils.template_helpers import register_template_helpers
from blueprints.auth import auth_bp
from blueprints.user import user_bp
# Flask-App initialisieren
app = Flask(__name__)
app.secret_key = SECRET_KEY
app.config["PERMANENT_SESSION_LIFETIME"] = SESSION_LIFETIME
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config["WTF_CSRF_ENABLED"] = True
# CSRF-Schutz initialisieren
csrf = CSRFProtect(app)
# Login-Manager initialisieren
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = "auth.login"
login_manager.login_message = "Bitte melden Sie sich an, um auf diese Seite zuzugreifen."
login_manager.login_message_category = "info"
@login_manager.user_loader
def load_user(user_id):
db_session = get_db_session()
user = db_session.query(User).filter(User.id == user_id).first()
db_session.close()
return user
# Jinja2 Context Processors
@app.context_processor
def inject_now():
"""Inject the current datetime into templates."""
return {'now': datetime.now()}
# Custom Jinja2 filter für Datumsformatierung
@app.template_filter('format_datetime')
def format_datetime_filter(value, format='%d.%m.%Y %H:%M'):
"""Format a datetime object to a German-style date and time string"""
if value is None:
return ""
if isinstance(value, str):
try:
value = datetime.fromisoformat(value)
except ValueError:
return value
return value.strftime(format)
# Blueprints registrieren
try:
from blueprints.kiosk_control import kiosk_bp
app.register_blueprint(kiosk_bp)
print("Kiosk-Kontrolle erfolgreich geladen")
except ImportError:
print("Kiosk-Kontrolle nicht verfügbar (nur im Kiosk-Modus)")
# Auth-Blueprint registrieren
app.register_blueprint(auth_bp)
print("Auth-Blueprint erfolgreich geladen")
# User-Blueprint registrieren
app.register_blueprint(user_bp)
print("User-Blueprint erfolgreich geladen")
# Logging initialisieren
setup_logging()
log_startup_info()
# Logger für verschiedene Komponenten
app_logger = get_logger("app")
auth_logger = get_logger("auth")
jobs_logger = get_logger("jobs")
printers_logger = get_logger("printers")
# Custom decorator für Job-Besitzer-Check
def job_owner_required(f):
@wraps(f)
def decorated_function(job_id, *args, **kwargs):
db_session = get_db_session()
job = db_session.query(Job).filter(Job.id == job_id).first()
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
is_owner = job.user_id == int(current_user.id) or job.owner_id == int(current_user.id)
is_admin = current_user.is_admin
if not (is_owner or is_admin):
db_session.close()
return jsonify({"error": "Keine Berechtigung"}), 403
db_session.close()
return f(job_id, *args, **kwargs)
return decorated_function
# UI-Routen
@app.route("/")
def index():
if current_user.is_authenticated:
return render_template("index.html")
return redirect(url_for("auth.login"))
@app.route("/dashboard")
@login_required
def dashboard():
return render_template("dashboard.html")
@app.route("/profile")
@login_required
def profile_redirect():
"""Leitet zur neuen Profilseite im User-Blueprint weiter."""
return redirect(url_for("user.profile"))
@app.route("/profil")
@login_required
def profil_redirect():
"""Leitet zur neuen Profilseite im User-Blueprint weiter (deutsche URL)."""
return redirect(url_for("user.profile"))
@app.route("/settings")
@login_required
def settings_redirect():
"""Leitet zur neuen Einstellungsseite im User-Blueprint weiter."""
return redirect(url_for("user.settings"))
@app.route("/einstellungen")
@login_required
def einstellungen_redirect():
"""Leitet zur neuen Einstellungsseite im User-Blueprint weiter (deutsche URL)."""
return redirect(url_for("user.settings"))
@app.route("/admin")
@login_required
def admin():
"""Leitet zur neuen Admin-Dashboard-Route weiter."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
return redirect(url_for("admin_page"))
@app.route("/demo")
@login_required
def components_demo():
"""Demo-Seite für UI-Komponenten"""
return render_template("components_demo.html")
@app.route("/printers")
@login_required
def printers_page():
"""Zeigt die Übersichtsseite für Drucker an."""
return render_template("printers.html")
@app.route("/jobs")
@login_required
def jobs_page():
"""Zeigt die Übersichtsseite für Druckaufträge an."""
return render_template("jobs.html")
@app.route("/stats")
@login_required
def stats_page():
"""Zeigt die Statistik-Seite an."""
return render_template("stats.html")
@app.route("/admin-dashboard")
@login_required
def admin_page():
"""Zeigt die Administrationsseite an."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
# Aktives Tab aus der URL auslesen oder Default-Wert verwenden
active_tab = request.args.get('tab', 'users')
# Daten für das Admin-Panel direkt beim Laden vorbereiten
stats = {}
users = []
printers = []
scheduler_status = {"running": False, "message": "Nicht verfügbar"}
system_info = {"cpu": 0, "memory": 0, "disk": 0}
logs = []
db_session = get_db_session()
try:
# Statistiken laden
from sqlalchemy.orm import joinedload
# Benutzeranzahl
stats["total_users"] = db_session.query(User).count()
# Druckeranzahl
stats["total_printers"] = db_session.query(Printer).count()
# Aktive Jobs
stats["active_jobs"] = db_session.query(Job).filter(
Job.status.in_(["scheduled", "running"])
).count()
# Erfolgsrate
total_jobs = db_session.query(Job).filter(
Job.status.in_(["completed", "failed", "cancelled"])
).count()
successful_jobs = db_session.query(Job).filter(
Job.status == "completed"
).count()
if total_jobs > 0:
stats["success_rate"] = int((successful_jobs / total_jobs) * 100)
else:
stats["success_rate"] = 0
# Benutzer laden
if active_tab == 'users':
users = db_session.query(User).all()
users = [user.to_dict() for user in users]
# Drucker laden
if active_tab == 'printers':
printers = db_session.query(Printer).all()
printers = [printer.to_dict() for printer in printers]
# Scheduler-Status laden
if active_tab == 'scheduler':
from utils.scheduler import scheduler_is_running
scheduler_status = {
"running": scheduler_is_running(),
"message": "Der Scheduler läuft" if scheduler_is_running() else "Der Scheduler ist gestoppt"
}
# System-Informationen laden
if active_tab == 'system':
import psutil
system_info = {
"cpu": psutil.cpu_percent(),
"memory": psutil.virtual_memory().percent,
"disk": psutil.disk_usage('/').percent,
"uptime": get_system_uptime_days()
}
# Logs laden
if active_tab == 'logs':
import os
log_level = request.args.get('log_level', 'all')
log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs')
# Logeinträge sammeln
app_logs = []
for category in ['app', 'auth', 'jobs', 'printers', 'scheduler', 'errors']:
log_file = os.path.join(log_dir, category, f'{category}.log')
if os.path.exists(log_file):
with open(log_file, 'r') as f:
for line in f.readlines()[-100:]: # Nur die letzten 100 Zeilen pro Datei
if log_level != 'all':
if log_level.upper() not in line:
continue
app_logs.append({
'timestamp': line.split(' - ')[0] if ' - ' in line else '',
'level': line.split(' - ')[1].split(' - ')[0] if ' - ' in line and len(line.split(' - ')) > 2 else 'INFO',
'category': category,
'message': ' - '.join(line.split(' - ')[2:]) if ' - ' in line and len(line.split(' - ')) > 2 else line
})
# Nach Zeitstempel sortieren (neueste zuerst)
logs = sorted(app_logs, key=lambda x: x['timestamp'] if x['timestamp'] else '', reverse=True)[:100]
except Exception as e:
app_logger.error(f"Fehler beim Laden der Admin-Daten: {str(e)}")
finally:
db_session.close()
return render_template(
"admin.html",
active_tab=active_tab,
stats=stats,
users=users,
printers=printers,
scheduler_status=scheduler_status,
system_info=system_info,
logs=logs
)
# Direkter Zugriff auf Logout-Route (für Fallback)
@app.route("/logout", methods=["GET", "POST"])
def logout_redirect():
"""Leitet zur Blueprint-Logout-Route weiter."""
return redirect(url_for("auth.logout"))
# Job-Routen
@app.route("/api/jobs", methods=["GET"])
@login_required
def get_jobs():
db_session = get_db_session()
try:
# Import joinedload for eager loading
from sqlalchemy.orm import joinedload
# Admin sieht alle Jobs, User nur eigene
if current_user.is_admin:
# Eagerly load the user and printer relationships to avoid detached instance errors
jobs = db_session.query(Job).options(joinedload(Job.user), joinedload(Job.printer)).all()
else:
jobs = db_session.query(Job).options(joinedload(Job.user), joinedload(Job.printer)).filter(Job.user_id == int(current_user.id)).all()
# Convert jobs to dictionaries before closing the session
job_dicts = [job.to_dict() for job in jobs]
db_session.close()
return jsonify({
"jobs": job_dicts
})
except Exception as e:
jobs_logger.error(f"Fehler beim Abrufen von Jobs: {str(e)}")
db_session.close()
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/jobs/<int:job_id>", methods=["GET"])
@login_required
@job_owner_required
def get_job(job_id):
db_session = get_db_session()
try:
from sqlalchemy.orm import joinedload
# Eagerly load the user and printer relationships
job = db_session.query(Job).options(joinedload(Job.user), joinedload(Job.printer)).filter(Job.id == job_id).first()
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
# Convert to dict before closing session
job_dict = job.to_dict()
db_session.close()
return jsonify(job_dict)
except Exception as e:
jobs_logger.error(f"Fehler beim Abrufen des Jobs {job_id}: {str(e)}")
db_session.close()
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route('/api/jobs/active', methods=['GET'])
@login_required
def get_active_jobs():
"""
Gibt alle aktiven Jobs zurück.
"""
try:
db_session = get_db_session()
from sqlalchemy.orm import joinedload
active_jobs = db_session.query(Job).options(
joinedload(Job.user),
joinedload(Job.printer)
).filter(
Job.status.in_(["scheduled", "running"])
).all()
result = []
for job in active_jobs:
job_dict = job.to_dict()
# Aktuelle Restzeit berechnen
if job.status == "running" and job.end_at:
remaining_time = job.end_at - datetime.now()
if remaining_time.total_seconds() > 0:
job_dict["remaining_minutes"] = int(remaining_time.total_seconds() / 60)
else:
job_dict["remaining_minutes"] = 0
result.append(job_dict)
db_session.close()
return jsonify({"jobs": result})
except Exception as e:
jobs_logger.error(f"Fehler beim Abrufen aktiver Jobs: {str(e)}")
return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
@app.route('/api/jobs', methods=['POST'])
@login_required
def create_job():
"""
Erstellt einen neuen Job mit dem Status "scheduled".
Body: {
"printer_id": int,
"start_iso": str, # ISO-Datum-String
"duration_minutes": int
}
"""
try:
data = request.json
# Pflichtfelder prüfen
required_fields = ["printer_id", "start_iso", "duration_minutes"]
for field in required_fields:
if field not in data:
return jsonify({"error": f"Feld '{field}' fehlt"}), 400
# Daten extrahieren und validieren
printer_id = int(data["printer_id"])
start_iso = data["start_iso"]
duration_minutes = int(data["duration_minutes"])
# Optional: Jobtitel und Dateipfad
name = data.get("name", f"Druckjob vom {datetime.now().strftime('%d.%m.%Y')}")
file_path = data.get("file_path")
# Start-Zeit parsen
try:
start_at = datetime.fromisoformat(start_iso)
except ValueError:
return jsonify({"error": "Ungültiges Startdatum"}), 400
# Dauer validieren
if duration_minutes <= 0:
return jsonify({"error": "Dauer muss größer als 0 sein"}), 400
# End-Zeit berechnen
end_at = start_at + timedelta(minutes=duration_minutes)
db_session = get_db_session()
# Prüfen, ob der Drucker existiert
printer = db_session.query(Printer).get(printer_id)
if not printer:
db_session.close()
return jsonify({"error": "Drucker nicht gefunden"}), 404
# Neuen Job erstellen
new_job = Job(
name=name,
printer_id=printer_id,
user_id=current_user.id,
owner_id=current_user.id,
start_at=start_at,
end_at=end_at,
status="scheduled",
file_path=file_path,
duration_minutes=duration_minutes
)
db_session.add(new_job)
db_session.commit()
# Job-Objekt für die Antwort serialisieren
job_dict = new_job.to_dict()
db_session.close()
jobs_logger.info(f"Neuer Job {new_job.id} erstellt für Drucker {printer_id}, Start: {start_at}, Dauer: {duration_minutes} Minuten")
return jsonify({"job": job_dict}), 201
except Exception as e:
jobs_logger.error(f"Fehler beim Erstellen eines Jobs: {str(e)}")
return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
@app.route('/api/jobs/<int:job_id>/extend', methods=['POST'])
@login_required
@job_owner_required
def extend_job(job_id):
"""
Verlängert die Endzeit eines Jobs.
Body: {
"extra_minutes": int
}
"""
try:
data = request.json
# Prüfen, ob die erforderlichen Daten vorhanden sind
if "extra_minutes" not in data:
return jsonify({"error": "Feld 'extra_minutes' fehlt"}), 400
extra_minutes = int(data["extra_minutes"])
# Validieren
if extra_minutes <= 0:
return jsonify({"error": "Zusätzliche Minuten müssen größer als 0 sein"}), 400
db_session = get_db_session()
job = db_session.query(Job).get(job_id)
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
# Prüfen, ob der Job verlängert werden kann
if job.status not in ["scheduled", "running"]:
db_session.close()
return jsonify({"error": f"Job kann im Status '{job.status}' nicht verlängert werden"}), 400
# Endzeit aktualisieren
job.end_at = job.end_at + timedelta(minutes=extra_minutes)
job.duration_minutes += extra_minutes
db_session.commit()
# Job-Objekt für die Antwort serialisieren
job_dict = job.to_dict()
db_session.close()
jobs_logger.info(f"Job {job_id} um {extra_minutes} Minuten verlängert, neue Endzeit: {job.end_at}")
return jsonify({"job": job_dict})
except Exception as e:
jobs_logger.error(f"Fehler beim Verlängern von Job {job_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
@app.route('/api/jobs/<int:job_id>/finish', methods=['POST'])
@login_required
def finish_job(job_id):
"""
Beendet einen Job manuell und schaltet die Steckdose aus.
Nur für Administratoren erlaubt.
"""
try:
# Prüfen, ob der Benutzer Administrator ist
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Jobs manuell beenden"}), 403
db_session = get_db_session()
job = db_session.query(Job).options(joinedload(Job.printer)).get(job_id)
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
# Prüfen, ob der Job beendet werden kann
if job.status not in ["scheduled", "running"]:
db_session.close()
return jsonify({"error": f"Job kann im Status '{job.status}' nicht beendet werden"}), 400
# Steckdose ausschalten
from utils.job_scheduler import toggle_plug
if not toggle_plug(job.printer_id, False):
# Trotzdem weitermachen, aber Warnung loggen
jobs_logger.warning(f"Steckdose für Job {job_id} konnte nicht ausgeschaltet werden")
# Job als beendet markieren
job.status = "finished"
job.actual_end_time = datetime.now()
db_session.commit()
# Job-Objekt für die Antwort serialisieren
job_dict = job.to_dict()
db_session.close()
jobs_logger.info(f"Job {job_id} manuell beendet durch Admin {current_user.id}")
return jsonify({"job": job_dict})
except Exception as e:
jobs_logger.error(f"Fehler beim manuellen Beenden von Job {job_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
@app.route("/api/printers", methods=["GET"])
@login_required
def get_printers():
db_session = get_db_session()
try:
printers = db_session.query(Printer).all()
# Optimierte Drucker-Liste mit schneller Status-Bestimmung
printer_list = []
for printer in printers:
# Bestimme Status basierend auf hardkodierten Druckern
printer_config = PRINTERS.get(printer.name)
if printer_config:
status = "available" # Drucker verfügbar
active = True
else:
status = "offline"
active = False
# Aktualisiere Status in der Datenbank
printer.status = status
printer.active = active
printer_data = printer.to_dict()
printer_data["status"] = status
printer_data["active"] = active
printer_list.append(printer_data)
# Speichere Updates
db_session.commit()
db_session.close()
return jsonify({
"printers": printer_list
})
except Exception as e:
printers_logger.error(f"Fehler beim Abrufen der Drucker: {str(e)}")
db_session.rollback()
db_session.close()
return jsonify({"error": "Interner Serverfehler"}), 500
# API-Routen für Statistiken
@app.route("/api/stats/users", methods=["GET"])
@login_required
def get_stats_users():
"""Gibt die Anzahl der Benutzer zurück."""
db_session = get_db_session()
try:
user_count = db_session.query(User).count()
db_session.close()
return jsonify({"value": user_count})
except Exception as e:
db_session.close()
return jsonify({"error": str(e)}), 500
@app.route("/api/stats/uptime", methods=["GET"])
@login_required
def get_stats_uptime():
"""Gibt die Systemlaufzeit zurück."""
import os
with open('/proc/uptime', 'r') as f:
uptime_seconds = float(f.readline().split()[0])
uptime_days = int(uptime_seconds / 86400)
return jsonify({"value": f"{uptime_days} Tage"})
@app.route("/api/stats/active-jobs", methods=["GET"])
@login_required
def get_stats_active_jobs():
"""Gibt die Anzahl der aktiven Jobs zurück."""
db_session = get_db_session()
try:
active_jobs = db_session.query(Job).filter(Job.status.in_(["scheduled", "running"])).count()
db_session.close()
return jsonify({"value": active_jobs})
except Exception as e:
db_session.close()
return jsonify({"error": str(e)}), 500
@app.route("/api/stats/available-printers", methods=["GET"])
@login_required
def get_stats_available_printers():
"""Gibt die Anzahl der verfügbaren Drucker zurück."""
db_session = get_db_session()
try:
available_printers = db_session.query(Printer).filter(Printer.active == True).count()
db_session.close()
return jsonify({"value": available_printers})
except Exception as e:
db_session.close()
return jsonify({"error": str(e)}), 500
@app.route("/api/stats/success-rate", methods=["GET"])
@login_required
def get_stats_success_rate():
"""Gibt die Erfolgsrate der Druckaufträge zurück."""
db_session = get_db_session()
try:
total_jobs = db_session.query(Job).filter(Job.status == "finished").count()
if total_jobs == 0:
success_rate = 0
else:
success_jobs = db_session.query(Job).filter(
Job.status == "finished",
Job.actual_end_time != None
).count()
success_rate = int((success_jobs / total_jobs) * 100)
db_session.close()
return jsonify({"value": f"{success_rate}%"})
except Exception as e:
db_session.close()
return jsonify({"error": str(e)}), 500
@app.route("/api/stats/print-time", methods=["GET"])
@login_required
def get_stats_print_time():
"""Gibt die gesamte Druckzeit zurück."""
db_session = get_db_session()
try:
stats = db_session.query(Stats).first()
if stats and stats.total_print_time:
hours = stats.total_print_time // 3600
db_session.close()
return jsonify({"value": f"{hours}h"})
db_session.close()
return jsonify({"value": "0h"})
except Exception as e:
db_session.close()
return jsonify({"error": str(e)}), 500
@app.route("/api/activity", methods=["GET"])
@login_required
def get_activity():
"""Gibt die letzten Aktivitäten zurück."""
db_session = get_db_session()
try:
recent_jobs = db_session.query(Job).order_by(Job.created_at.desc()).limit(5).all()
activities = [
{
"type": "job",
"id": job.id,
"title": job.name,
"status": job.status,
"timestamp": job.created_at.isoformat() if job.created_at else None
}
for job in recent_jobs
]
db_session.close()
return jsonify(activities)
except Exception as e:
db_session.close()
return jsonify({"error": str(e)}), 500
@app.route("/api/printers/status", methods=["GET"])
@login_required
def get_printers_status():
"""Gibt den Status aller Drucker zurück - optimiert für schnelle Antwort."""
db_session = get_db_session()
try:
printers = db_session.query(Printer).all()
# Schnelle Status-Bestimmung basierend auf hardkodierten Druckern
status_data = []
for printer in printers:
# Bestimme Status basierend auf IP-Adresse aus der Konfiguration
printer_config = PRINTERS.get(printer.name)
if printer_config:
# Drucker ist in der Konfiguration -> als online betrachten
status = "online"
active = True
else:
# Drucker nicht in Konfiguration -> offline
status = "offline"
active = False
# Aktualisiere den Status in der Datenbank für Konsistenz
printer.status = status
printer.active = active
status_data.append({
"id": printer.id,
"name": printer.name,
"status": status,
"active": active,
"ip_address": printer.ip_address,
"location": printer.location
})
# Speichere die aktualisierten Status
db_session.commit()
db_session.close()
return jsonify(status_data)
except Exception as e:
db_session.rollback()
db_session.close()
printers_logger.error(f"Fehler beim Abrufen des Drucker-Status: {str(e)}")
return jsonify({"error": str(e)}), 500
@app.route("/api/jobs/current", methods=["GET"])
@login_required
def get_current_job():
"""Gibt den aktuellen Job des Benutzers zurück."""
db_session = get_db_session()
try:
current_job = db_session.query(Job).filter(
Job.user_id == current_user.id,
Job.status.in_(["scheduled", "running"])
).order_by(Job.start_at).first()
if current_job:
job_data = current_job.to_dict()
else:
job_data = None
db_session.close()
return jsonify(job_data)
except Exception as e:
db_session.close()
return jsonify({"error": str(e)}), 500
# Admin API Endpoints
@app.route("/api/users", methods=["GET"])
def get_users():
"""Returns a list of all users (admin only)"""
if not current_user.is_admin:
return jsonify({"error": "Unauthorized"}), 403
db_session = get_db_session()
try:
users = db_session.query(User).all()
users_list = []
for user in users:
users_list.append({
"id": user.id,
"email": user.email,
"name": user.name,
"role": user.role,
"active": user.active,
"created_at": user.created_at.isoformat() if user.created_at else None
})
db_session.close()
return jsonify({"users": users_list})
except Exception as e:
db_session.close()
app_logger.error(f"Error fetching users: {str(e)}")
return jsonify({"error": "Failed to fetch users"}), 500
@app.route("/api/users", methods=["POST"])
@login_required
def create_user():
"""Create a new user (admin only)"""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung, um neue Benutzer anzulegen.", "error")
return redirect(url_for('admin_page', tab='users'))
db_session = get_db_session()
try:
# Statt JSON-Daten die Formulardaten aus dem POST-Request holen
email = request.form.get('email')
name = request.form.get('name')
password = request.form.get('password')
role = request.form.get('role', 'user')
if not email or not password:
db_session.close()
flash("E-Mail und Passwort sind Pflichtfelder.", "error")
return redirect(url_for('admin_page', tab='users'))
# Check if user with same email already exists
existing_user = db_session.query(User).filter(
User.email == email
).first()
if existing_user:
db_session.close()
flash("Ein Benutzer mit dieser E-Mail existiert bereits.", "error")
return redirect(url_for('admin_page', tab='users'))
# Create new user
new_user = User(
email=email,
name=name if name else "",
username=email.split("@")[0], # Default username from email
role=role,
active=True,
created_at=datetime.now()
)
# Set password
new_user.set_password(password)
db_session.add(new_user)
db_session.commit()
user_id = new_user.id
db_session.close()
app_logger.info(f"New user created: {new_user.email} (ID: {user_id})")
flash(f"Benutzer {email} wurde erfolgreich angelegt.", "success")
return redirect(url_for('admin_page', tab='users'))
except Exception as e:
db_session.rollback()
db_session.close()
app_logger.error(f"Error creating user: {str(e)}")
flash(f"Fehler beim Anlegen des Benutzers: {str(e)}", "error")
return redirect(url_for('admin_page', tab='users'))
@app.route("/api/users/<int:user_id>", methods=["DELETE"])
@login_required
def delete_user(user_id):
"""Delete a user (admin only)"""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung, um Benutzer zu löschen.", "error")
return redirect(url_for('admin_page', tab='users'))
# Prevent admin from deleting themselves
if user_id == current_user.id:
flash("Sie können Ihren eigenen Account nicht löschen.", "error")
return redirect(url_for('admin_page', tab='users'))
db_session = get_db_session()
try:
user = db_session.query(User).filter(User.id == user_id).first()
if not user:
db_session.close()
flash("Benutzer nicht gefunden.", "error")
return redirect(url_for('admin_page', tab='users'))
# Prevent deletion of admin users
if user.role == "admin":
db_session.close()
flash("Administratoren können nicht gelöscht werden.", "error")
return redirect(url_for('admin_page', tab='users'))
email = user.email # Save for later logging
db_session.delete(user)
db_session.commit()
db_session.close()
app_logger.info(f"User deleted: {email} (ID: {user_id})")
flash(f"Benutzer {email} wurde erfolgreich gelöscht.", "success")
return redirect(url_for('admin_page', tab='users'))
except Exception as e:
db_session.rollback()
db_session.close()
app_logger.error(f"Error deleting user: {str(e)}")
flash(f"Fehler beim Löschen des Benutzers: {str(e)}", "error")
return redirect(url_for('admin_page', tab='users'))
@app.route("/api/printers", methods=["POST"])
@login_required
def create_printer():
"""Create a new printer (admin only)"""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung, um neue Drucker anzulegen.", "error")
return redirect(url_for('admin_page', tab='printers'))
db_session = get_db_session()
try:
# Statt JSON-Daten die Formulardaten aus dem POST-Request holen
name = request.form.get('name')
model = request.form.get('model')
location = request.form.get('location')
mac_address = request.form.get('mac_address')
plug_ip = request.form.get('plug_ip')
plug_username = request.form.get('plug_username')
plug_password = request.form.get('plug_password')
# Check required fields
if not name or not mac_address:
db_session.close()
flash("Name und MAC-Adresse sind Pflichtfelder.", "error")
return redirect(url_for('admin_page', tab='printers'))
# Check if printer with same MAC already exists
existing_printer = db_session.query(Printer).filter(
Printer.mac_address == mac_address
).first()
if existing_printer:
db_session.close()
flash("Ein Drucker mit dieser MAC-Adresse existiert bereits.", "error")
return redirect(url_for('admin_page', tab='printers'))
# Create new printer
new_printer = Printer(
name=name,
model=model or "",
location=location or "",
mac_address=mac_address,
plug_ip=plug_ip or "",
plug_username=plug_username or "",
plug_password=plug_password or "",
active=True
)
db_session.add(new_printer)
db_session.commit()
printer_id = new_printer.id
db_session.close()
app_logger.info(f"New printer created: {new_printer.name} (ID: {printer_id})")
flash(f"Drucker {name} wurde erfolgreich angelegt.", "success")
return redirect(url_for('admin_page', tab='printers'))
except Exception as e:
db_session.rollback()
db_session.close()
app_logger.error(f"Error creating printer: {str(e)}")
flash(f"Fehler beim Anlegen des Druckers: {str(e)}", "error")
return redirect(url_for('admin_page', tab='printers'))
@app.route("/api/printers/<int:printer_id>", methods=["DELETE"])
@login_required
def delete_printer(printer_id):
"""Delete a printer (admin only)"""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung, um Drucker zu löschen.", "error")
return redirect(url_for('admin_page', tab='printers'))
db_session = get_db_session()
try:
printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
if not printer:
db_session.close()
flash("Drucker nicht gefunden.", "error")
return redirect(url_for('admin_page', tab='printers'))
printer_name = printer.name # Save for later logging
db_session.delete(printer)
db_session.commit()
db_session.close()
app_logger.info(f"Printer deleted: {printer_name} (ID: {printer_id})")
flash(f"Drucker {printer_name} wurde erfolgreich gelöscht.", "success")
return redirect(url_for('admin_page', tab='printers'))
except Exception as e:
db_session.rollback()
db_session.close()
app_logger.error(f"Error deleting printer: {str(e)}")
flash(f"Fehler beim Löschen des Druckers: {str(e)}", "error")
return redirect(url_for('admin_page', tab='printers'))
@app.route("/api/stats", methods=["GET"])
@login_required
def get_stats():
"""Get overall system statistics"""
if not current_user.is_admin:
return jsonify({"error": "Unauthorized"}), 403
db_session = get_db_session()
try:
# Get basic stats
stats = db_session.query(Stats).first()
if not stats:
# Create initial stats if none exist
stats = Stats()
db_session.add(stats)
db_session.commit()
# Count users, printers, active jobs
user_count = db_session.query(User).count()
printer_count = db_session.query(Printer).count()
active_jobs = db_session.query(Job).filter(Job.status.in_(["scheduled", "running"])).count()
result = {
"total_users": user_count,
"total_printers": printer_count,
"active_jobs": active_jobs,
"total_print_time_hours": round(stats.total_print_time / 3600, 1) if stats.total_print_time else 0,
"total_jobs_completed": stats.total_jobs_completed or 0,
"total_material_used": stats.total_material_used or 0,
"last_updated": stats.last_updated.isoformat() if stats.last_updated else None
}
db_session.close()
return jsonify(result)
except Exception as e:
db_session.close()
app_logger.error(f"Error fetching stats: {str(e)}")
return jsonify({"error": "Failed to fetch statistics"}), 500
@app.route("/api/scheduler/status", methods=["GET"])
@login_required
def get_scheduler_status():
"""Get the current status of the job scheduler"""
if not current_user.is_admin:
return jsonify({"error": "Unauthorized"}), 403
try:
is_running = scheduler.is_running()
tasks = []
# Add information about scheduler tasks
for task_id, task in scheduler.get_tasks().items():
tasks.append({
"id": task_id,
"interval": task.get("interval", 0),
"last_run": task.get("last_run"),
"enabled": task.get("enabled", False)
})
return jsonify({
"running": is_running,
"tasks": tasks,
"uptime": scheduler.get_uptime()
})
except Exception as e:
app_logger.error(f"Error fetching scheduler status: {str(e)}")
return jsonify({"error": "Failed to fetch scheduler status"}), 500
@app.route("/api/scheduler/start", methods=["POST"])
@login_required
def start_scheduler():
"""Start the job scheduler (admin only)"""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung, um den Scheduler zu starten.", "error")
return redirect(url_for('admin_page', tab='scheduler'))
try:
from utils.scheduler import start_scheduler as start_scheduler_func
result = start_scheduler_func()
if result:
app_logger.info(f"Scheduler started by admin user: {current_user.email}")
flash("Der Scheduler wurde erfolgreich gestartet.", "success")
else:
flash("Der Scheduler konnte nicht gestartet werden oder läuft bereits.", "warning")
return redirect(url_for('admin_page', tab='scheduler'))
except Exception as e:
app_logger.error(f"Error starting scheduler: {str(e)}")
flash(f"Fehler beim Starten des Schedulers: {str(e)}", "error")
return redirect(url_for('admin_page', tab='scheduler'))
@app.route("/api/scheduler/stop", methods=["POST"])
@login_required
def stop_scheduler():
"""Stop the job scheduler (admin only)"""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung, um den Scheduler zu stoppen.", "error")
return redirect(url_for('admin_page', tab='scheduler'))
try:
from utils.scheduler import stop_scheduler as stop_scheduler_func
result = stop_scheduler_func()
if result:
app_logger.info(f"Scheduler stopped by admin user: {current_user.email}")
flash("Der Scheduler wurde erfolgreich gestoppt.", "success")
else:
flash("Der Scheduler konnte nicht gestoppt werden oder läuft nicht.", "warning")
return redirect(url_for('admin_page', tab='scheduler'))
except Exception as e:
app_logger.error(f"Error stopping scheduler: {str(e)}")
flash(f"Fehler beim Stoppen des Schedulers: {str(e)}", "error")
return redirect(url_for('admin_page', tab='scheduler'))
@app.route("/api/logs", methods=["GET"])
@login_required
def get_logs():
"""Get system logs (admin only)"""
if not current_user.is_admin:
return jsonify({"error": "Unauthorized"}), 403
try:
# Get log type from query params
log_type = request.args.get("type", "app")
limit = int(request.args.get("limit", 100))
log_mapping = {
"app": "logs/app/app.log",
"auth": "logs/auth/auth.log",
"errors": "logs/errors/errors.log",
"jobs": "logs/jobs/jobs.log",
"printers": "logs/printers/printers.log",
"scheduler": "logs/scheduler/scheduler.log"
}
if log_type not in log_mapping:
return jsonify({"error": "Invalid log type"}), 400
log_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), log_mapping[log_type])
if not os.path.exists(log_path):
return jsonify({"logs": [], "success": True})
logs = []
with open(log_path, "r") as f:
for line in f.readlines()[-limit:]:
try:
# Parse log entry (format: [LEVEL] TIMESTAMP - MESSAGE)
parts = line.strip().split(" - ", 1)
if len(parts) == 2:
header, message = parts
level_timestamp = header.strip("[]").split("] [", 1)
if len(level_timestamp) == 2:
level, timestamp = level_timestamp
logs.append({
"level": level.strip(),
"timestamp": timestamp.strip(),
"message": message.strip(),
"source": log_type
})
except Exception:
# If parsing fails, add the raw line
logs.append({
"level": "INFO",
"timestamp": datetime.now().isoformat(),
"message": line.strip(),
"source": f"myp.{log_type}"
})
# Sort logs by timestamp in descending order
logs.sort(key=lambda x: x["timestamp"], reverse=True)
return jsonify({"logs": logs, "success": True})
except Exception as e:
app_logger.error(f"Error fetching logs: {str(e)}")
return jsonify({"error": "Failed to fetch logs"}), 500
@app.route("/api/activity/recent", methods=["GET"])
@login_required
def get_recent_activity():
"""Get recent system activity"""
try:
# Create mock activity data (to be replaced with real data in future)
activities = [
{
"description": "Neuer Druckauftrag erstellt: 'Motor_Halterung_v2'",
"timestamp": (datetime.now() - timedelta(minutes=15)).isoformat(),
"user": "admin@example.com",
"type": "job_created"
},
{
"description": "Drucker 'Prusa i3 MK3S' wurde neu konfiguriert",
"timestamp": (datetime.now() - timedelta(hours=2)).isoformat(),
"user": "admin@example.com",
"type": "printer_updated"
},
{
"description": "Druckauftrag 'Getriebe_Prototyp' abgeschlossen",
"timestamp": (datetime.now() - timedelta(hours=5)).isoformat(),
"user": "user@example.com",
"type": "job_completed"
},
{
"description": "Neuer Benutzer registriert: 'user@example.com'",
"timestamp": (datetime.now() - timedelta(days=1)).isoformat(),
"user": "admin@example.com",
"type": "user_created"
},
{
"description": "Systemwartung durchgeführt",
"timestamp": (datetime.now() - timedelta(days=2)).isoformat(),
"user": "admin@example.com",
"type": "system_maintenance"
}
]
# Get limit from query params
limit = int(request.args.get("limit", 5))
activities = activities[:limit]
return jsonify({"activities": activities})
except Exception as e:
app_logger.error(f"Error fetching recent activity: {str(e)}")
return jsonify({"error": "Failed to fetch recent activity"}), 500
# Service Worker Route
@app.route('/sw.js')
def service_worker():
"""Serve the service worker script with proper headers"""
response = app.send_static_file('js/sw.js')
# Wichtig: Korrekte MIME-Type setzen
response.headers['Content-Type'] = 'application/javascript'
# Wichtig: Cache-Control Header setzen, um häufige Updates zu ermöglichen
response.headers['Cache-Control'] = 'no-cache'
# Service-Worker-Allowed Header setzen, um Scope-Probleme zu beheben
response.headers['Service-Worker-Allowed'] = '/'
return response
# Fehlerbehandlung
@app.errorhandler(404)
def page_not_found(e):
return render_template("404.html"), 404
@app.errorhandler(500)
def internal_server_error(e):
return render_template("500.html"), 500
# CLI-Befehle für Tailwind CSS
@app.cli.group()
def tailwind():
"""Tailwind CSS Kommandos."""
pass
@tailwind.command("build")
def tailwind_build():
"""Tailwind CSS für die Produktion kompilieren."""
print("Tailwind CSS wird kompiliert...")
try:
subprocess.run(["npx", "tailwindcss", "-i", "./static/css/input.css",
"-o", "./static/css/tailwind-dark-consolidated.min.css", "--minify"],
check=True)
print("Tailwind CSS erfolgreich kompiliert.")
except subprocess.CalledProcessError as e:
print(f"Fehler beim Kompilieren von Tailwind CSS: {e}")
raise
@tailwind.command("watch")
def tailwind_watch():
"""Tailwind CSS im Watch-Modus starten."""
print("Tailwind CSS Watch-Modus wird gestartet...")
try:
subprocess.Popen(["npx", "tailwindcss", "-i", "./static/css/input.css",
"-o", "./static/css/tailwind-dark-consolidated.min.css", "--watch"])
print("Tailwind CSS Watch-Modus gestartet. CSS wird bei Änderungen automatisch aktualisiert.")
except subprocess.CalledProcessError as e:
print(f"Fehler beim Starten des Tailwind CSS Watch-Modus: {e}")
raise
# Auto-Kompilierung beim Serverstart im Debug-Modus
def compile_tailwind_if_debug():
"""Kompiliert Tailwind CSS im Debug-Modus, falls notwendig."""
if FLASK_DEBUG:
try:
app_logger.info("Kompiliere Tailwind CSS...")
# Prüfen, ob npx und Node.js verfügbar sind
import platform
import shutil
import subprocess
# Auf Windows nur fortfahren, wenn die CSS-Datei bereits existiert
# oder npx verfügbar ist
css_file_exists = os.path.exists("static/css/tailwind.min.css")
# Prüfen, ob npx verfügbar ist
npx_available = shutil.which("npx") is not None
if platform.system() == "Windows" and not npx_available and not css_file_exists:
app_logger.warning("npx nicht gefunden und keine CSS-Datei vorhanden. Tailwind CSS wird nicht kompiliert.")
return
# Tailwind CSS kompilieren
if npx_available:
subprocess.run([
"npx", "tailwindcss", "-i", "static/css/input.css",
"-o", "static/css/tailwind.min.css", "--minify"
], check=True)
app_logger.info("Tailwind CSS erfolgreich kompiliert.")
elif css_file_exists:
app_logger.info("Verwende existierende Tailwind CSS-Datei.")
else:
app_logger.warning("Tailwind konnte nicht kompiliert werden und keine CSS-Datei vorhanden.")
except subprocess.CalledProcessError as e:
app_logger.warning(f"Tailwind konnte nicht kompiliert werden. Möglicherweise ist npx/Node.js nicht installiert. Fehler: {e}")
except Exception as e:
app_logger.error(f"Fehler beim Kompilieren von Tailwind CSS: {str(e)}")
# Tailwind CSS kompilieren, wenn im Debug-Modus
if FLASK_DEBUG:
compile_tailwind_if_debug()
# Initialisierung der Datenbank beim Start
def init_app():
"""Initialisiert die App-Komponenten und startet den Scheduler."""
# Datenbank initialisieren
try:
init_database()
create_initial_admin()
except Exception as e:
app_logger.error(f"Fehler bei der Datenbank-Initialisierung: {str(e)}")
# Jinja2-Helfer registrieren
register_template_helpers(app)
# Tailwind im Debug-Modus kompilieren
compile_tailwind_if_debug()
# Scheduler starten, wenn aktiviert
if SCHEDULER_ENABLED:
try:
# Scheduler-Task für Druckauftrags-Prüfung registrieren
scheduler.register_task(
"check_jobs",
check_jobs,
interval=SCHEDULER_INTERVAL
)
# Scheduler starten
scheduler.start()
app_logger.info(f"Scheduler gestartet mit Intervall {SCHEDULER_INTERVAL} Sekunden.")
except Exception as e:
app_logger.error(f"Fehler beim Starten des Schedulers: {str(e)}")
# SSL-Kontext protokollieren
ssl_context = get_ssl_context()
if ssl_context:
app_logger.info(f"SSL aktiviert mit Zertifikat {ssl_context[0]}")
else:
app_logger.warning("SSL ist deaktiviert. Die Verbindung ist unverschlüsselt!")
# Scheduler-Funktion zur Überprüfung der Druckaufträge
def check_jobs():
"""
Überprüft alle aktiven Druckaufträge und führt entsprechende Aktionen aus.
Diese Funktion wird vom Scheduler regelmäßig aufgerufen.
"""
app_logger.info("Überprüfe Druckaufträge...")
try:
db_session = get_db_session()
# Aktive Jobs abrufen
active_jobs = db_session.query(Job).filter(
Job.status.in_(["scheduled", "running"])
).all()
now = datetime.now()
for job in active_jobs:
# Prüfen, ob der Job gestartet werden soll
if job.status == "scheduled" and job.start_at <= now:
app_logger.info(f"Starte Job {job.id} für Drucker {job.printer_id}")
job.status = "running"
# Steckdose einschalten (implementieren Sie diese Funktion)
from utils.job_scheduler import toggle_plug
toggle_plug(job.printer_id, True)
# Prüfen, ob der Job beendet werden soll
elif job.status == "running" and job.end_at <= now:
app_logger.info(f"Beende Job {job.id} für Drucker {job.printer_id}")
job.status = "finished"
job.actual_end_time = now
# Steckdose ausschalten
from utils.job_scheduler import toggle_plug
toggle_plug(job.printer_id, False)
db_session.commit()
db_session.close()
except Exception as e:
app_logger.error(f"Fehler bei der Überprüfung von Druckaufträgen: {str(e)}")
if 'db_session' in locals():
db_session.close()
# App starten
if __name__ == "__main__":
import argparse
import threading
import ssl
import socket
import logging
# Kommandozeilenargumente parsen
parser = argparse.ArgumentParser(description='MYP Platform - 3D-Drucker Reservierungssystem')
parser.add_argument('--port', type=int, help='Port für den Server (überschreibt die Konfiguration)')
parser.add_argument('--no-ssl', action='store_true', help='Deaktiviert SSL/HTTPS')
parser.add_argument('--dual-protocol', action='store_true', help='Startet sowohl HTTP als auch HTTPS Server')
args = parser.parse_args()
# Initialisierung
init_app()
# Port aus Kommandozeilenargument verwenden, falls angegeben
port = args.port if args.port else FLASK_PORT
# SSL-Kontext abrufen
ssl_context = None
if SSL_ENABLED and not args.no_ssl:
try:
if SSL_CERT_PATH and SSL_KEY_PATH:
ssl_context = (SSL_CERT_PATH, SSL_KEY_PATH)
logging.info(f"SSL aktiviert mit Zertifikat: {SSL_CERT_PATH}")
else:
ssl_context = 'adhoc'
logging.info("SSL aktiviert mit selbstsigniertem Ad-hoc-Zertifikat")
except Exception as e:
logging.error(f"Fehler beim Laden des SSL-Kontexts: {e}")
ssl_context = None
# Dual-Protokoll-Modus: HTTP und HTTPS gleichzeitig
if args.dual_protocol:
# Funktion zum Starten des HTTP-Servers
def start_http_server():
try:
logging.info(f"Starte HTTP-Server auf Port 80...")
# Kopie der App erstellen
from werkzeug.serving import run_simple
run_simple('0.0.0.0', 80, app, threaded=True)
except socket.error as e:
logging.error(f"Konnte HTTP-Server nicht starten: {e}")
# Funktion zum Starten des HTTPS-Servers
def start_https_server():
try:
if ssl_context:
logging.info(f"Starte HTTPS-Server auf Port {port}...")
app.run(host='0.0.0.0', port=port, ssl_context=ssl_context, threaded=True)
else:
logging.warning("HTTPS deaktiviert aufgrund fehlender Zertifikate")
app.run(host='0.0.0.0', port=port, threaded=True)
except socket.error as e:
logging.error(f"Konnte HTTPS-Server nicht starten: {e}")
# Beide Server in separaten Threads starten
http_thread = threading.Thread(target=start_http_server)
https_thread = threading.Thread(target=start_https_server)
http_thread.daemon = True
https_thread.daemon = True
http_thread.start()
https_thread.start()
# Warten, bis beide Threads beendet sind (was sie normalerweise nicht sein sollten)
http_thread.join()
https_thread.join()
else:
# Normaler Modus - entweder HTTP oder HTTPS
if ssl_context:
logging.info(f"Starte HTTPS-Server auf Port {port}...")
app.run(host='0.0.0.0', port=port, ssl_context=ssl_context, threaded=True)
else:
logging.info(f"Starte HTTP-Server auf Port {port}...")
app.run(host='0.0.0.0', port=port, threaded=True)
# Content Security Policy anpassen
@app.after_request
def add_security_headers(response):
"""Fügt Sicherheitsheader zu allen Antworten hinzu"""
# Content Security Policy definieren
csp_directives = [
"default-src 'self'",
"script-src 'self' 'unsafe-inline' 'unsafe-eval'",
"script-src-elem 'self' 'unsafe-inline'",
"style-src 'self' 'unsafe-inline'",
"img-src 'self' data:",
"font-src 'self'",
"connect-src 'self'",
"worker-src 'self'", # Erlaubt Service Worker
"manifest-src 'self'"
]
# Setze CSP Header
response.headers['Content-Security-Policy'] = "; ".join(csp_directives)
# Weitere Sicherheitsheader
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'SAMEORIGIN'
response.headers['X-XSS-Protection'] = '1; mode=block'
return response
@app.route("/privacy")
def privacy_page():
"""Zeigt die Datenschutzseite an."""
return render_template("privacy.html")
@app.route("/terms")
def terms_page():
"""Zeigt die Nutzungsbedingungen an."""
return render_template("terms.html")
@app.route("/api/stats/export", methods=["GET"])
@login_required
def export_stats():
"""Exportiert Statistiken als JSON-Datei."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Statistiken exportieren"}), 403
try:
db_session = get_db_session()
# Grundlegende Statistiken sammeln
stats = db_session.query(Stats).first()
if not stats:
stats = Stats()
# Benutzerzahlen
user_count = db_session.query(User).count()
active_user_count = db_session.query(User).filter(User.active == True).count()
# Druckerzahlen
printer_count = db_session.query(Printer).count()
active_printer_count = db_session.query(Printer).filter(Printer.active == True).count()
# Jobstatistiken
total_jobs = db_session.query(Job).count()
completed_jobs = db_session.query(Job).filter(Job.status == "finished").count()
active_jobs = db_session.query(Job).filter(Job.status.in_(["scheduled", "running"])).count()
failed_jobs = db_session.query(Job).filter(Job.status == "failed").count()
# Berechne durchschnittliche Druckzeit
jobs_with_duration = db_session.query(Job).filter(
Job.start_at != None,
Job.actual_end_time != None
).all()
total_print_time = 0
avg_print_time = 0
if jobs_with_duration:
for job in jobs_with_duration:
duration = (job.actual_end_time - job.start_at).total_seconds()
total_print_time += duration
avg_print_time = total_print_time / len(jobs_with_duration) if len(jobs_with_duration) > 0 else 0
# Füge zusätzliche Statistiken für jede Druckerart hinzu
printer_stats = []
printers = db_session.query(Printer).all()
for printer in printers:
printer_jobs = db_session.query(Job).filter(Job.printer_id == printer.id).count()
printer_success_jobs = db_session.query(Job).filter(
Job.printer_id == printer.id,
Job.status == "finished"
).count()
success_rate = (printer_success_jobs / printer_jobs * 100) if printer_jobs > 0 else 0
printer_stats.append({
"id": printer.id,
"name": printer.name,
"model": printer.model,
"location": printer.location,
"total_jobs": printer_jobs,
"success_rate": round(success_rate, 2),
"active": printer.active
})
# Export-Daten zusammenstellen
export_data = {
"generated_at": datetime.now().isoformat(),
"users": {
"total": user_count,
"active": active_user_count
},
"printers": {
"total": printer_count,
"active": active_printer_count,
"details": printer_stats
},
"jobs": {
"total": total_jobs,
"completed": completed_jobs,
"active": active_jobs,
"failed": failed_jobs,
"success_rate": round((completed_jobs / total_jobs * 100), 2) if total_jobs > 0 else 0
},
"print_time": {
"total_seconds": total_print_time,
"total_hours": round(total_print_time / 3600, 2),
"average_seconds": avg_print_time,
"average_minutes": round(avg_print_time / 60, 2)
},
"system": {
"version": "3.0.0",
"uptime_days": get_system_uptime_days()
}
}
db_session.close()
# Als Datei zum Download anbieten
from flask import make_response
import json
response = make_response(json.dumps(export_data, indent=4))
response.headers["Content-Disposition"] = "attachment; filename=stats_export.json"
response.headers["Content-Type"] = "application/json"
return response
except Exception as e:
app_logger.error(f"Fehler beim Exportieren der Statistiken: {str(e)}")
return jsonify({"error": f"Fehler beim Exportieren: {str(e)}"}), 500
def get_system_uptime_days():
"""Gibt die Systemlaufzeit in Tagen zurück."""
try:
with open('/proc/uptime', 'r') as f:
uptime_seconds = float(f.readline().split()[0])
return round(uptime_seconds / 86400, 2) # Umrechnung in Tage
except Exception:
return 0
@app.route("/api/printers/add", methods=["POST"])
@login_required
def add_printer():
"""Fügt einen neuen Drucker hinzu."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Drucker hinzufügen"}), 403
try:
data = request.json
# Pflichtfelder prüfen
required_fields = ["name", "mac_address", "plug_ip", "plug_username", "plug_password"]
for field in required_fields:
if field not in data or not data[field]:
return jsonify({"error": f"Das Feld '{field}' ist ein Pflichtfeld"}), 400
# Druckerdaten extrahieren
name = data["name"]
model = data.get("model", "")
location = data.get("location", "")
mac_address = data["mac_address"]
plug_ip = data["plug_ip"]
plug_username = data["plug_username"]
plug_password = data["plug_password"]
db_session = get_db_session()
# Prüfen, ob ein Drucker mit dieser MAC-Adresse bereits existiert
existing_printer = db_session.query(Printer).filter(Printer.mac_address == mac_address).first()
if existing_printer:
db_session.close()
return jsonify({"error": "Ein Drucker mit dieser MAC-Adresse existiert bereits"}), 400
# Neuen Drucker erstellen
new_printer = Printer(
name=name,
model=model,
location=location,
mac_address=mac_address,
plug_ip=plug_ip,
plug_username=plug_username,
plug_password=plug_password,
status="offline",
active=True,
created_at=datetime.now()
)
db_session.add(new_printer)
db_session.commit()
# Drucker-ID für die Antwort speichern
printer_id = new_printer.id
# Drucker-Objekt für die Antwort serialisieren
printer_dict = new_printer.to_dict()
db_session.close()
printers_logger.info(f"Neuer Drucker {name} (ID: {printer_id}) wurde von {current_user.username} hinzugefügt")
return jsonify({"success": True, "message": "Drucker erfolgreich hinzugefügt", "printer": printer_dict}), 201
except Exception as e:
printers_logger.error(f"Fehler beim Hinzufügen eines Druckers: {str(e)}")
return jsonify({"error": f"Fehler beim Hinzufügen des Druckers: {str(e)}"}), 500
@app.route("/my/jobs")
@login_required
def my_jobs():
"""Zeigt die persönlichen Jobs des angemeldeten Benutzers an."""
# Weiterleitung zur Jobs-Seite mit Filter für den aktuellen Benutzer
return redirect(url_for("jobs_page", user_filter=current_user.id))
@app.route("/api/user/export", methods=["GET"])
@login_required
def api_user_export_redirect():
"""Leitet den alten API-Pfad zum neuen Benutzer-Export weiter."""
return redirect(url_for("user.export_user_data"))
@app.route("/api/user/profile", methods=["PUT"])
@login_required
def api_user_profile_update_redirect():
"""Leitet den alten API-Pfad zum neuen Benutzer-Profil-Update weiter."""
return redirect(url_for("user.update_profile_api"))
@app.route("/user/update-settings", methods=["POST"])
@login_required
def user_update_settings_redirect():
"""Weiterleitung zur Blueprint-Route für Settings-Updates."""
return redirect(url_for("user.api_update_settings"))
# SSL-Verwaltungsrouten
@app.route("/api/ssl/info", methods=["GET"])
@login_required
def get_ssl_info():
"""Gibt Informationen über das aktuelle SSL-Zertifikat zurück."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können SSL-Informationen abrufen"}), 403
try:
from utils.ssl_manager import ssl_manager
cert_info = ssl_manager.get_certificate_info()
if not cert_info:
return jsonify({
"exists": False,
"message": "Kein SSL-Zertifikat gefunden"
})
return jsonify({
"exists": True,
"certificate": cert_info,
"paths": {
"cert": ssl_manager.cert_path,
"key": ssl_manager.key_path
}
})
except Exception as e:
app_logger.error(f"Fehler beim Abrufen der SSL-Informationen: {e}")
return jsonify({"error": f"Fehler beim Abrufen der SSL-Informationen: {str(e)}"}), 500
@app.route("/api/ssl/generate", methods=["POST"])
@login_required
def generate_ssl_certificate():
"""Generiert ein neues SSL-Zertifikat."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können SSL-Zertifikate generieren"}), 403
try:
from utils.ssl_manager import ssl_manager
# Parameter aus Request extrahieren
data = request.json or {}
key_size = data.get("key_size", 4096)
validity_days = data.get("validity_days", 365)
# Zertifikat generieren
success = ssl_manager.generate_mercedes_certificate(key_size, validity_days)
if success:
cert_info = ssl_manager.get_certificate_info()
app_logger.info(f"SSL-Zertifikat von {current_user.username} generiert")
return jsonify({
"success": True,
"message": "SSL-Zertifikat erfolgreich generiert",
"certificate": cert_info
})
else:
return jsonify({
"success": False,
"error": "Fehler beim Generieren des SSL-Zertifikats"
}), 500
except Exception as e:
app_logger.error(f"Fehler beim Generieren des SSL-Zertifikats: {e}")
return jsonify({"error": f"Fehler beim Generieren: {str(e)}"}), 500
@app.route("/api/ssl/install", methods=["POST"])
@login_required
def install_ssl_certificate():
"""Installiert das SSL-Zertifikat im System."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können SSL-Zertifikate installieren"}), 403
try:
from utils.ssl_manager import ssl_manager
success = ssl_manager.install_system_certificate()
if success:
app_logger.info(f"SSL-Zertifikat von {current_user.username} im System installiert")
return jsonify({
"success": True,
"message": "SSL-Zertifikat erfolgreich im System installiert"
})
else:
return jsonify({
"success": False,
"error": "Fehler bei der Installation des SSL-Zertifikats im System"
}), 500
except Exception as e:
app_logger.error(f"Fehler bei der SSL-Installation: {e}")
return jsonify({"error": f"Fehler bei der Installation: {str(e)}"}), 500
@app.route("/api/ssl/copy-raspberry", methods=["POST"])
@login_required
def copy_ssl_to_raspberry():
"""Kopiert das SSL-Zertifikat auf den Raspberry Pi."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können SSL-Zertifikate kopieren"}), 403
try:
from utils.ssl_manager import ssl_manager
# Parameter aus Request extrahieren
data = request.json or {}
host = data.get("host", "raspberrypi")
user = data.get("user", "user")
dest = data.get("dest", "/home/user/Projektarbeit-MYP/backend/app/certs")
success = ssl_manager.copy_to_raspberry(host, user, dest)
if success:
app_logger.info(f"SSL-Zertifikat von {current_user.username} auf Raspberry Pi kopiert")
return jsonify({
"success": True,
"message": f"SSL-Zertifikat erfolgreich auf {host} kopiert"
})
else:
return jsonify({
"success": False,
"error": "Fehler beim Kopieren des SSL-Zertifikats auf den Raspberry Pi"
}), 500
except Exception as e:
app_logger.error(f"Fehler beim Kopieren auf Raspberry Pi: {e}")
return jsonify({"error": f"Fehler beim Kopieren: {str(e)}"}), 500
@app.route("/api/ssl/validate", methods=["GET"])
@login_required
def validate_ssl_certificate():
"""Validiert das aktuelle SSL-Zertifikat."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können SSL-Zertifikate validieren"}), 403
try:
from utils.ssl_manager import ssl_manager
is_valid = ssl_manager.is_certificate_valid()
cert_info = ssl_manager.get_certificate_info()
return jsonify({
"valid": is_valid,
"certificate": cert_info,
"message": "Zertifikat ist gültig" if is_valid else "Zertifikat ist ungültig oder läuft bald ab"
})
except Exception as e:
app_logger.error(f"Fehler bei der SSL-Validierung: {e}")
return jsonify({"error": f"Fehler bei der Validierung: {str(e)}"}), 500
# Neue Admin-System-Management-Routen
@app.route("/api/admin/cache/clear", methods=["POST"])
@login_required
def clear_cache():
"""Leert den System-Cache."""
if not current_user.is_admin:
return jsonify({"error": "Keine Berechtigung"}), 403
try:
import shutil
import tempfile
# Flask-Cache leeren (falls vorhanden)
cache_dir = os.path.join(tempfile.gettempdir(), 'flask_cache')
if os.path.exists(cache_dir):
shutil.rmtree(cache_dir)
# Python __pycache__ leeren
for root, dirs, files in os.walk('.'):
for dir_name in dirs:
if dir_name == '__pycache__':
pycache_path = os.path.join(root, dir_name)
shutil.rmtree(pycache_path)
app_logger.info(f"Cache wurde von Admin {current_user.username} geleert")
return jsonify({"success": True, "message": "Cache erfolgreich geleert"})
except Exception as e:
app_logger.error(f"Fehler beim Leeren des Cache: {str(e)}")
return jsonify({"error": f"Fehler beim Leeren des Cache: {str(e)}"}), 500
@app.route("/api/admin/database/optimize", methods=["POST"])
@login_required
def optimize_database():
"""Optimiert die Datenbank."""
if not current_user.is_admin:
return jsonify({"error": "Keine Berechtigung"}), 403
try:
db_session = get_db_session()
# VACUUM und ANALYZE für SQLite
db_session.execute(sqlalchemy.text("VACUUM"))
db_session.execute(sqlalchemy.text("ANALYZE"))
db_session.commit()
# Alte abgeschlossene Jobs löschen (älter als 30 Tage)
thirty_days_ago = datetime.now() - timedelta(days=30)
old_jobs = db_session.query(Job).filter(
Job.status.in_(["completed", "failed", "cancelled"]),
Job.created_at < thirty_days_ago
).count()
db_session.query(Job).filter(
Job.status.in_(["completed", "failed", "cancelled"]),
Job.created_at < thirty_days_ago
).delete()
db_session.commit()
db_session.close()
app_logger.info(f"Datenbank wurde von Admin {current_user.username} optimiert. {old_jobs} alte Jobs entfernt.")
return jsonify({
"success": True,
"message": f"Datenbank optimiert. {old_jobs} alte Jobs entfernt."
})
except Exception as e:
app_logger.error(f"Fehler bei der Datenbankoptimierung: {str(e)}")
return jsonify({"error": f"Fehler bei der Datenbankoptimierung: {str(e)}"}), 500
@app.route("/api/admin/backup/create", methods=["POST"])
@login_required
def create_backup():
"""Erstellt ein System-Backup."""
if not current_user.is_admin:
return jsonify({"error": "Keine Berechtigung"}), 403
try:
import shutil
from datetime import datetime
backup_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'backups')
os.makedirs(backup_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_name = f"backup_{timestamp}"
backup_path = os.path.join(backup_dir, backup_name)
# Datenbank-Backup
db_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'instance', 'database.db')
if os.path.exists(db_path):
shutil.copy2(db_path, os.path.join(backup_path, 'database.db'))
# Konfigurationsdateien
config_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config')
if os.path.exists(config_dir):
shutil.copytree(config_dir, os.path.join(backup_path, 'config'))
# Uploads-Verzeichnis
uploads_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'uploads')
if os.path.exists(uploads_dir):
shutil.copytree(uploads_dir, os.path.join(backup_path, 'uploads'))
# Backup komprimieren
shutil.make_archive(backup_path, 'zip', backup_path)
shutil.rmtree(backup_path) # Temporäres Verzeichnis löschen
app_logger.info(f"Backup wurde von Admin {current_user.username} erstellt: {backup_name}.zip")
return jsonify({
"success": True,
"message": f"Backup erfolgreich erstellt: {backup_name}.zip"
})
except Exception as e:
app_logger.error(f"Fehler beim Erstellen des Backups: {str(e)}")
return jsonify({"error": f"Fehler beim Erstellen des Backups: {str(e)}"}), 500
@app.route("/api/admin/printers/update", methods=["POST"])
@login_required
def update_printers():
"""Aktualisiert alle Drucker-Verbindungen."""
if not current_user.is_admin:
return jsonify({"error": "Keine Berechtigung"}), 403
try:
db_session = get_db_session()
printers = db_session.query(Printer).all()
updated_count = 0
error_count = 0
for printer in printers:
try:
# Drucker-Status prüfen
import requests
import socket
# Ping-Test
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
result = sock.connect_ex((printer.ip_address, 80))
sock.close()
if result == 0:
printer.status = "online"
printer.last_seen = datetime.now()
updated_count += 1
else:
printer.status = "offline"
error_count += 1
except Exception as e:
printer.status = "error"
error_count += 1
printers_logger.error(f"Fehler beim Aktualisieren von Drucker {printer.name}: {str(e)}")
db_session.commit()
db_session.close()
app_logger.info(f"Drucker wurden von Admin {current_user.username} aktualisiert. {updated_count} online, {error_count} Fehler.")
return jsonify({
"success": True,
"message": f"Drucker aktualisiert: {updated_count} online, {error_count} offline/Fehler"
})
except Exception as e:
app_logger.error(f"Fehler beim Aktualisieren der Drucker: {str(e)}")
return jsonify({"error": f"Fehler beim Aktualisieren der Drucker: {str(e)}"}), 500
@app.route("/api/admin/system/restart", methods=["POST"])
@login_required
def restart_system():
"""Startet das System neu."""
if not current_user.is_admin:
return jsonify({"error": "Keine Berechtigung"}), 403
try:
app_logger.warning(f"System-Neustart wurde von Admin {current_user.username} initiiert")
# Graceful shutdown
def shutdown_server():
import time
time.sleep(2) # Kurz warten, damit die Response gesendet wird
os._exit(0)
# Shutdown in separatem Thread
import threading
shutdown_thread = threading.Thread(target=shutdown_server)
shutdown_thread.start()
return jsonify({
"success": True,
"message": "System wird neugestartet..."
})
except Exception as e:
app_logger.error(f"Fehler beim Neustart des Systems: {str(e)}")
return jsonify({"error": f"Fehler beim Neustart des Systems: {str(e)}"}), 500
@app.route("/api/admin/system/status", methods=["GET"])
@login_required
def get_system_status():
"""Gibt den aktuellen Systemstatus zurück."""
if not current_user.is_admin:
return jsonify({"error": "Keine Berechtigung"}), 403
try:
import psutil
import sqlite3
# CPU und Memory
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
# Uptime
boot_time = psutil.boot_time()
uptime_seconds = time.time() - boot_time
uptime_days = int(uptime_seconds // 86400)
uptime_hours = int((uptime_seconds % 86400) // 3600)
uptime_minutes = int((uptime_seconds % 3600) // 60)
# Datenbank-Status
db_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'instance', 'database.db')
db_size = 0
db_connections = 0
if os.path.exists(db_path):
db_size = os.path.getsize(db_path) / (1024 * 1024) # MB
# Scheduler-Status
scheduler_running = False
try:
from utils.job_scheduler import scheduler
scheduler_running = scheduler.running
except:
pass
# Nächster Job
db_session = get_db_session()
next_job = db_session.query(Job).filter(
Job.status == "scheduled"
).order_by(Job.created_at.asc()).first()
next_job_time = "Keine geplanten Jobs"
if next_job:
next_job_time = next_job.created_at.strftime("%d.%m.%Y %H:%M")
db_session.close()
return jsonify({
"cpu_usage": round(cpu_percent, 1),
"memory_usage": round(memory.percent, 1),
"disk_usage": round((disk.used / disk.total) * 100, 1),
"uptime": f"{uptime_days}d {uptime_hours}h {uptime_minutes}m",
"db_size": f"{db_size:.1f} MB",
"db_connections": db_connections,
"scheduler_running": scheduler_running,
"next_job": next_job_time
})
except Exception as e:
app_logger.error(f"Fehler beim Abrufen des Systemstatus: {str(e)}")
return jsonify({"error": f"Fehler beim Abrufen des Systemstatus: {str(e)}"}), 500
@app.route("/api/admin/database/status", methods=["GET"])
@login_required
def get_database_status():
"""Gibt den Datenbankstatus zurück."""
if not current_user.is_admin:
return jsonify({"error": "Keine Berechtigung"}), 403
try:
db_session = get_db_session()
# Verbindungstest
db_session.execute(sqlalchemy.text("SELECT 1"))
# Datenbankgröße
db_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'instance', 'database.db')
db_size = 0
if os.path.exists(db_path):
db_size = os.path.getsize(db_path) / (1024 * 1024) # MB
# Tabellenstatistiken
user_count = db_session.query(User).count()
printer_count = db_session.query(Printer).count()
job_count = db_session.query(Job).count()
db_session.close()
return jsonify({
"connected": True,
"size": f"{db_size:.1f} MB",
"tables": {
"users": user_count,
"printers": printer_count,
"jobs": job_count
}
})
except Exception as e:
app_logger.error(f"Fehler beim Abrufen des Datenbankstatus: {str(e)}")
return jsonify({
"connected": False,
"error": str(e)
}), 500