1707 lines
62 KiB
Python

import os
import threading
import time
import json
import secrets
import subprocess
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any, Union
from functools import wraps
from flask import Flask, request, jsonify, session, render_template, redirect, url_for, flash, Response
from flask_login import LoginManager, UserMixin, login_user, logout_user, current_user, login_required
import sqlalchemy.exc
import sqlalchemy
from PyP100 import PyP110
from flask_wtf.csrf import CSRFProtect
from config.settings import (
SECRET_KEY, TAPO_USERNAME, TAPO_PASSWORD, PRINTERS,
FLASK_HOST, FLASK_PORT, FLASK_DEBUG, SESSION_LIFETIME,
SCHEDULER_INTERVAL, SCHEDULER_ENABLED, get_ssl_context, FLASK_FALLBACK_PORT
)
from utils.logging_config import setup_logging, get_logger, log_startup_info
from models import User, Printer, Job, Stats, get_db_session, init_database, create_initial_admin
from utils.job_scheduler import scheduler
from utils.template_helpers import register_template_helpers
from blueprints.auth import auth_bp
from blueprints.user import user_bp
# Flask-App initialisieren
app = Flask(__name__)
app.secret_key = SECRET_KEY
app.config["PERMANENT_SESSION_LIFETIME"] = SESSION_LIFETIME
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config["WTF_CSRF_ENABLED"] = True
# CSRF-Schutz initialisieren
csrf = CSRFProtect(app)
# Login-Manager initialisieren
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = "auth.login"
login_manager.login_message = "Bitte melden Sie sich an, um auf diese Seite zuzugreifen."
login_manager.login_message_category = "info"
@login_manager.user_loader
def load_user(user_id):
db_session = get_db_session()
user = db_session.query(User).filter(User.id == user_id).first()
db_session.close()
return user
# Jinja2 Context Processors
@app.context_processor
def inject_now():
"""Inject the current datetime into templates."""
return {'now': datetime.now()}
# Custom Jinja2 filter für Datumsformatierung
@app.template_filter('format_datetime')
def format_datetime_filter(value, format='%d.%m.%Y %H:%M'):
"""Format a datetime object to a German-style date and time string"""
if value is None:
return ""
if isinstance(value, str):
try:
value = datetime.fromisoformat(value)
except ValueError:
return value
return value.strftime(format)
# Blueprints registrieren
try:
from blueprints.kiosk_control import kiosk_bp
app.register_blueprint(kiosk_bp)
print("Kiosk-Kontrolle erfolgreich geladen")
except ImportError:
print("Kiosk-Kontrolle nicht verfügbar (nur im Kiosk-Modus)")
# Auth-Blueprint registrieren
app.register_blueprint(auth_bp)
print("Auth-Blueprint erfolgreich geladen")
# User-Blueprint registrieren
app.register_blueprint(user_bp)
print("User-Blueprint erfolgreich geladen")
# Logging initialisieren
setup_logging()
log_startup_info()
# Logger für verschiedene Komponenten
app_logger = get_logger("app")
auth_logger = get_logger("auth")
jobs_logger = get_logger("jobs")
printers_logger = get_logger("printers")
# Custom decorator für Job-Besitzer-Check
def job_owner_required(f):
@wraps(f)
def decorated_function(job_id, *args, **kwargs):
db_session = get_db_session()
job = db_session.query(Job).filter(Job.id == job_id).first()
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
is_owner = job.user_id == int(current_user.id) or job.owner_id == int(current_user.id)
is_admin = current_user.is_admin
if not (is_owner or is_admin):
db_session.close()
return jsonify({"error": "Keine Berechtigung"}), 403
db_session.close()
return f(job_id, *args, **kwargs)
return decorated_function
# UI-Routen
@app.route("/")
def index():
if current_user.is_authenticated:
return render_template("index.html")
return redirect(url_for("auth.login"))
@app.route("/dashboard")
@login_required
def dashboard():
return render_template("dashboard.html")
@app.route("/profile")
@login_required
def profile_redirect():
"""Leitet zur neuen Profilseite im User-Blueprint weiter."""
return redirect(url_for("user.profile"))
@app.route("/profil")
@login_required
def profil_redirect():
"""Leitet zur neuen Profilseite im User-Blueprint weiter (deutsche URL)."""
return redirect(url_for("user.profile"))
@app.route("/settings")
@login_required
def settings_redirect():
"""Leitet zur neuen Einstellungsseite im User-Blueprint weiter."""
return redirect(url_for("user.settings"))
@app.route("/einstellungen")
@login_required
def einstellungen_redirect():
"""Leitet zur neuen Einstellungsseite im User-Blueprint weiter (deutsche URL)."""
return redirect(url_for("user.settings"))
@app.route("/admin")
@login_required
def admin():
"""Leitet zur neuen Admin-Dashboard-Route weiter."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
return redirect(url_for("admin_page"))
@app.route("/demo")
@login_required
def components_demo():
"""Demo-Seite für UI-Komponenten"""
return render_template("components_demo.html")
@app.route("/printers")
@login_required
def printers_page():
"""Zeigt die Übersichtsseite für Drucker an."""
return render_template("printers.html")
@app.route("/jobs")
@login_required
def jobs_page():
"""Zeigt die Übersichtsseite für Druckaufträge an."""
return render_template("jobs.html")
@app.route("/stats")
@login_required
def stats_page():
"""Zeigt die Statistik-Seite an."""
return render_template("stats.html")
@app.route("/admin-dashboard")
@login_required
def admin_page():
"""Zeigt die Administrationsseite an."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
# Aktives Tab aus der URL auslesen oder Default-Wert verwenden
active_tab = request.args.get('tab', 'users')
# Daten für das Admin-Panel direkt beim Laden vorbereiten
stats = {}
users = []
printers = []
scheduler_status = {"running": False, "message": "Nicht verfügbar"}
system_info = {"cpu": 0, "memory": 0, "disk": 0}
logs = []
db_session = get_db_session()
try:
# Statistiken laden
from sqlalchemy.orm import joinedload
# Benutzeranzahl
stats["total_users"] = db_session.query(User).count()
# Druckeranzahl
stats["total_printers"] = db_session.query(Printer).count()
# Aktive Jobs
stats["active_jobs"] = db_session.query(Job).filter(
Job.status.in_(["scheduled", "running"])
).count()
# Erfolgsrate
total_jobs = db_session.query(Job).filter(
Job.status.in_(["completed", "failed", "cancelled"])
).count()
successful_jobs = db_session.query(Job).filter(
Job.status == "completed"
).count()
if total_jobs > 0:
stats["success_rate"] = int((successful_jobs / total_jobs) * 100)
else:
stats["success_rate"] = 0
# Benutzer laden
if active_tab == 'users':
users = db_session.query(User).all()
users = [user.to_dict() for user in users]
# Drucker laden
if active_tab == 'printers':
printers = db_session.query(Printer).all()
printers = [printer.to_dict() for printer in printers]
# Scheduler-Status laden
if active_tab == 'scheduler':
from utils.scheduler import scheduler_is_running
scheduler_status = {
"running": scheduler_is_running(),
"message": "Der Scheduler läuft" if scheduler_is_running() else "Der Scheduler ist gestoppt"
}
# System-Informationen laden
if active_tab == 'system':
import psutil
system_info = {
"cpu": psutil.cpu_percent(),
"memory": psutil.virtual_memory().percent,
"disk": psutil.disk_usage('/').percent,
"uptime": get_system_uptime_days()
}
# Logs laden
if active_tab == 'logs':
import os
log_level = request.args.get('log_level', 'all')
log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs')
# Logeinträge sammeln
app_logs = []
for category in ['app', 'auth', 'jobs', 'printers', 'scheduler', 'errors']:
log_file = os.path.join(log_dir, category, f'{category}.log')
if os.path.exists(log_file):
with open(log_file, 'r') as f:
for line in f.readlines()[-100:]: # Nur die letzten 100 Zeilen pro Datei
if log_level != 'all':
if log_level.upper() not in line:
continue
app_logs.append({
'timestamp': line.split(' - ')[0] if ' - ' in line else '',
'level': line.split(' - ')[1].split(' - ')[0] if ' - ' in line and len(line.split(' - ')) > 2 else 'INFO',
'category': category,
'message': ' - '.join(line.split(' - ')[2:]) if ' - ' in line and len(line.split(' - ')) > 2 else line
})
# Nach Zeitstempel sortieren (neueste zuerst)
logs = sorted(app_logs, key=lambda x: x['timestamp'] if x['timestamp'] else '', reverse=True)[:100]
except Exception as e:
app_logger.error(f"Fehler beim Laden der Admin-Daten: {str(e)}")
finally:
db_session.close()
return render_template(
"admin.html",
active_tab=active_tab,
stats=stats,
users=users,
printers=printers,
scheduler_status=scheduler_status,
system_info=system_info,
logs=logs
)
# Direkter Zugriff auf Logout-Route (für Fallback)
@app.route("/logout", methods=["GET", "POST"])
def logout_redirect():
"""Leitet zur Blueprint-Logout-Route weiter."""
return redirect(url_for("auth.logout"))
# Job-Routen
@app.route("/api/jobs", methods=["GET"])
@login_required
def get_jobs():
db_session = get_db_session()
try:
# Import joinedload for eager loading
from sqlalchemy.orm import joinedload
# Admin sieht alle Jobs, User nur eigene
if current_user.is_admin:
# Eagerly load the user and printer relationships to avoid detached instance errors
jobs = db_session.query(Job).options(joinedload(Job.user), joinedload(Job.printer)).all()
else:
jobs = db_session.query(Job).options(joinedload(Job.user), joinedload(Job.printer)).filter(Job.user_id == int(current_user.id)).all()
# Convert jobs to dictionaries before closing the session
job_dicts = [job.to_dict() for job in jobs]
db_session.close()
return jsonify({
"jobs": job_dicts
})
except Exception as e:
jobs_logger.error(f"Fehler beim Abrufen von Jobs: {str(e)}")
db_session.close()
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/jobs/<int:job_id>", methods=["GET"])
@login_required
@job_owner_required
def get_job(job_id):
db_session = get_db_session()
try:
from sqlalchemy.orm import joinedload
# Eagerly load the user and printer relationships
job = db_session.query(Job).options(joinedload(Job.user), joinedload(Job.printer)).filter(Job.id == job_id).first()
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
# Convert to dict before closing session
job_dict = job.to_dict()
db_session.close()
return jsonify(job_dict)
except Exception as e:
jobs_logger.error(f"Fehler beim Abrufen des Jobs {job_id}: {str(e)}")
db_session.close()
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route('/api/jobs/active', methods=['GET'])
@login_required
def get_active_jobs():
"""
Gibt alle aktiven Jobs zurück.
"""
try:
db_session = get_db_session()
from sqlalchemy.orm import joinedload
active_jobs = db_session.query(Job).options(
joinedload(Job.user),
joinedload(Job.printer)
).filter(
Job.status.in_(["scheduled", "running"])
).all()
result = []
for job in active_jobs:
job_dict = job.to_dict()
# Aktuelle Restzeit berechnen
if job.status == "running" and job.end_at:
remaining_time = job.end_at - datetime.now()
if remaining_time.total_seconds() > 0:
job_dict["remaining_minutes"] = int(remaining_time.total_seconds() / 60)
else:
job_dict["remaining_minutes"] = 0
result.append(job_dict)
db_session.close()
return jsonify({"jobs": result})
except Exception as e:
jobs_logger.error(f"Fehler beim Abrufen aktiver Jobs: {str(e)}")
return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
@app.route('/api/jobs', methods=['POST'])
@login_required
def create_job():
"""
Erstellt einen neuen Job mit dem Status "scheduled".
Body: {
"printer_id": int,
"start_iso": str, # ISO-Datum-String
"duration_minutes": int
}
"""
try:
data = request.json
# Pflichtfelder prüfen
required_fields = ["printer_id", "start_iso", "duration_minutes"]
for field in required_fields:
if field not in data:
return jsonify({"error": f"Feld '{field}' fehlt"}), 400
# Daten extrahieren und validieren
printer_id = int(data["printer_id"])
start_iso = data["start_iso"]
duration_minutes = int(data["duration_minutes"])
# Optional: Jobtitel und Dateipfad
name = data.get("name", f"Druckjob vom {datetime.now().strftime('%d.%m.%Y')}")
file_path = data.get("file_path")
# Start-Zeit parsen
try:
start_at = datetime.fromisoformat(start_iso)
except ValueError:
return jsonify({"error": "Ungültiges Startdatum"}), 400
# Dauer validieren
if duration_minutes <= 0:
return jsonify({"error": "Dauer muss größer als 0 sein"}), 400
# End-Zeit berechnen
end_at = start_at + timedelta(minutes=duration_minutes)
db_session = get_db_session()
# Prüfen, ob der Drucker existiert
printer = db_session.query(Printer).get(printer_id)
if not printer:
db_session.close()
return jsonify({"error": "Drucker nicht gefunden"}), 404
# Neuen Job erstellen
new_job = Job(
name=name,
printer_id=printer_id,
user_id=current_user.id,
owner_id=current_user.id,
start_at=start_at,
end_at=end_at,
status="scheduled",
file_path=file_path,
duration_minutes=duration_minutes
)
db_session.add(new_job)
db_session.commit()
# Job-Objekt für die Antwort serialisieren
job_dict = new_job.to_dict()
db_session.close()
jobs_logger.info(f"Neuer Job {new_job.id} erstellt für Drucker {printer_id}, Start: {start_at}, Dauer: {duration_minutes} Minuten")
return jsonify({"job": job_dict}), 201
except Exception as e:
jobs_logger.error(f"Fehler beim Erstellen eines Jobs: {str(e)}")
return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
@app.route('/api/jobs/<int:job_id>/extend', methods=['POST'])
@login_required
@job_owner_required
def extend_job(job_id):
"""
Verlängert die Endzeit eines Jobs.
Body: {
"extra_minutes": int
}
"""
try:
data = request.json
# Prüfen, ob die erforderlichen Daten vorhanden sind
if "extra_minutes" not in data:
return jsonify({"error": "Feld 'extra_minutes' fehlt"}), 400
extra_minutes = int(data["extra_minutes"])
# Validieren
if extra_minutes <= 0:
return jsonify({"error": "Zusätzliche Minuten müssen größer als 0 sein"}), 400
db_session = get_db_session()
job = db_session.query(Job).get(job_id)
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
# Prüfen, ob der Job verlängert werden kann
if job.status not in ["scheduled", "running"]:
db_session.close()
return jsonify({"error": f"Job kann im Status '{job.status}' nicht verlängert werden"}), 400
# Endzeit aktualisieren
job.end_at = job.end_at + timedelta(minutes=extra_minutes)
job.duration_minutes += extra_minutes
db_session.commit()
# Job-Objekt für die Antwort serialisieren
job_dict = job.to_dict()
db_session.close()
jobs_logger.info(f"Job {job_id} um {extra_minutes} Minuten verlängert, neue Endzeit: {job.end_at}")
return jsonify({"job": job_dict})
except Exception as e:
jobs_logger.error(f"Fehler beim Verlängern von Job {job_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
@app.route('/api/jobs/<int:job_id>/finish', methods=['POST'])
@login_required
def finish_job(job_id):
"""
Beendet einen Job manuell und schaltet die Steckdose aus.
Nur für Administratoren erlaubt.
"""
try:
# Prüfen, ob der Benutzer Administrator ist
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Jobs manuell beenden"}), 403
db_session = get_db_session()
job = db_session.query(Job).options(joinedload(Job.printer)).get(job_id)
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
# Prüfen, ob der Job beendet werden kann
if job.status not in ["scheduled", "running"]:
db_session.close()
return jsonify({"error": f"Job kann im Status '{job.status}' nicht beendet werden"}), 400
# Steckdose ausschalten
from utils.job_scheduler import toggle_plug
if not toggle_plug(job.printer_id, False):
# Trotzdem weitermachen, aber Warnung loggen
jobs_logger.warning(f"Steckdose für Job {job_id} konnte nicht ausgeschaltet werden")
# Job als beendet markieren
job.status = "finished"
job.actual_end_time = datetime.now()
db_session.commit()
# Job-Objekt für die Antwort serialisieren
job_dict = job.to_dict()
db_session.close()
jobs_logger.info(f"Job {job_id} manuell beendet durch Admin {current_user.id}")
return jsonify({"job": job_dict})
except Exception as e:
jobs_logger.error(f"Fehler beim manuellen Beenden von Job {job_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
@app.route("/api/printers", methods=["GET"])
@login_required
def get_printers():
db_session = get_db_session()
try:
printers = db_session.query(Printer).all()
printer_list = [printer.to_dict() for printer in printers]
db_session.close()
return jsonify({
"printers": printer_list
})
except Exception as e:
printers_logger.error(f"Fehler beim Abrufen der Drucker: {str(e)}")
db_session.close()
return jsonify({"error": "Interner Serverfehler"}), 500
# API-Routen für Statistiken
@app.route("/api/stats/users", methods=["GET"])
@login_required
def get_stats_users():
"""Gibt die Anzahl der Benutzer zurück."""
db_session = get_db_session()
try:
user_count = db_session.query(User).count()
db_session.close()
return jsonify({"value": user_count})
except Exception as e:
db_session.close()
return jsonify({"error": str(e)}), 500
@app.route("/api/stats/uptime", methods=["GET"])
@login_required
def get_stats_uptime():
"""Gibt die Systemlaufzeit zurück."""
import os
with open('/proc/uptime', 'r') as f:
uptime_seconds = float(f.readline().split()[0])
uptime_days = int(uptime_seconds / 86400)
return jsonify({"value": f"{uptime_days} Tage"})
@app.route("/api/stats/active-jobs", methods=["GET"])
@login_required
def get_stats_active_jobs():
"""Gibt die Anzahl der aktiven Jobs zurück."""
db_session = get_db_session()
try:
active_jobs = db_session.query(Job).filter(Job.status.in_(["scheduled", "running"])).count()
db_session.close()
return jsonify({"value": active_jobs})
except Exception as e:
db_session.close()
return jsonify({"error": str(e)}), 500
@app.route("/api/stats/available-printers", methods=["GET"])
@login_required
def get_stats_available_printers():
"""Gibt die Anzahl der verfügbaren Drucker zurück."""
db_session = get_db_session()
try:
available_printers = db_session.query(Printer).filter(Printer.active == True).count()
db_session.close()
return jsonify({"value": available_printers})
except Exception as e:
db_session.close()
return jsonify({"error": str(e)}), 500
@app.route("/api/stats/success-rate", methods=["GET"])
@login_required
def get_stats_success_rate():
"""Gibt die Erfolgsrate der Druckaufträge zurück."""
db_session = get_db_session()
try:
total_jobs = db_session.query(Job).filter(Job.status == "finished").count()
if total_jobs == 0:
success_rate = 0
else:
success_jobs = db_session.query(Job).filter(
Job.status == "finished",
Job.actual_end_time != None
).count()
success_rate = int((success_jobs / total_jobs) * 100)
db_session.close()
return jsonify({"value": f"{success_rate}%"})
except Exception as e:
db_session.close()
return jsonify({"error": str(e)}), 500
@app.route("/api/stats/print-time", methods=["GET"])
@login_required
def get_stats_print_time():
"""Gibt die gesamte Druckzeit zurück."""
db_session = get_db_session()
try:
stats = db_session.query(Stats).first()
if stats and stats.total_print_time:
hours = stats.total_print_time // 3600
db_session.close()
return jsonify({"value": f"{hours}h"})
db_session.close()
return jsonify({"value": "0h"})
except Exception as e:
db_session.close()
return jsonify({"error": str(e)}), 500
@app.route("/api/activity", methods=["GET"])
@login_required
def get_activity():
"""Gibt die letzten Aktivitäten zurück."""
db_session = get_db_session()
try:
recent_jobs = db_session.query(Job).order_by(Job.created_at.desc()).limit(5).all()
activities = [
{
"type": "job",
"id": job.id,
"title": job.name,
"status": job.status,
"timestamp": job.created_at.isoformat() if job.created_at else None
}
for job in recent_jobs
]
db_session.close()
return jsonify(activities)
except Exception as e:
db_session.close()
return jsonify({"error": str(e)}), 500
@app.route("/api/printers/status", methods=["GET"])
@login_required
def get_printers_status():
"""Gibt den Status aller Drucker zurück."""
db_session = get_db_session()
try:
printers = db_session.query(Printer).all()
status_data = [
{
"id": printer.id,
"name": printer.name,
"status": printer.status,
"active": printer.active
}
for printer in printers
]
db_session.close()
return jsonify(status_data)
except Exception as e:
db_session.close()
return jsonify({"error": str(e)}), 500
@app.route("/api/jobs/current", methods=["GET"])
@login_required
def get_current_job():
"""Gibt den aktuellen Job des Benutzers zurück."""
db_session = get_db_session()
try:
current_job = db_session.query(Job).filter(
Job.user_id == current_user.id,
Job.status.in_(["scheduled", "running"])
).order_by(Job.start_at).first()
if current_job:
job_data = current_job.to_dict()
else:
job_data = None
db_session.close()
return jsonify(job_data)
except Exception as e:
db_session.close()
return jsonify({"error": str(e)}), 500
# Admin API Endpoints
@app.route("/api/users", methods=["GET"])
def get_users():
"""Returns a list of all users (admin only)"""
if not current_user.is_admin:
return jsonify({"error": "Unauthorized"}), 403
db_session = get_db_session()
try:
users = db_session.query(User).all()
users_list = []
for user in users:
users_list.append({
"id": user.id,
"email": user.email,
"name": user.name,
"role": user.role,
"active": user.active,
"created_at": user.created_at.isoformat() if user.created_at else None
})
db_session.close()
return jsonify({"users": users_list})
except Exception as e:
db_session.close()
app_logger.error(f"Error fetching users: {str(e)}")
return jsonify({"error": "Failed to fetch users"}), 500
@app.route("/api/users", methods=["POST"])
@login_required
def create_user():
"""Create a new user (admin only)"""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung, um neue Benutzer anzulegen.", "error")
return redirect(url_for('admin_page', tab='users'))
db_session = get_db_session()
try:
# Statt JSON-Daten die Formulardaten aus dem POST-Request holen
email = request.form.get('email')
name = request.form.get('name')
password = request.form.get('password')
role = request.form.get('role', 'user')
if not email or not password:
db_session.close()
flash("E-Mail und Passwort sind Pflichtfelder.", "error")
return redirect(url_for('admin_page', tab='users'))
# Check if user with same email already exists
existing_user = db_session.query(User).filter(
User.email == email
).first()
if existing_user:
db_session.close()
flash("Ein Benutzer mit dieser E-Mail existiert bereits.", "error")
return redirect(url_for('admin_page', tab='users'))
# Create new user
new_user = User(
email=email,
name=name if name else "",
username=email.split("@")[0], # Default username from email
role=role,
active=True,
created_at=datetime.now()
)
# Set password
new_user.set_password(password)
db_session.add(new_user)
db_session.commit()
user_id = new_user.id
db_session.close()
app_logger.info(f"New user created: {new_user.email} (ID: {user_id})")
flash(f"Benutzer {email} wurde erfolgreich angelegt.", "success")
return redirect(url_for('admin_page', tab='users'))
except Exception as e:
db_session.rollback()
db_session.close()
app_logger.error(f"Error creating user: {str(e)}")
flash(f"Fehler beim Anlegen des Benutzers: {str(e)}", "error")
return redirect(url_for('admin_page', tab='users'))
@app.route("/api/users/<int:user_id>", methods=["DELETE"])
@login_required
def delete_user(user_id):
"""Delete a user (admin only)"""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung, um Benutzer zu löschen.", "error")
return redirect(url_for('admin_page', tab='users'))
# Prevent admin from deleting themselves
if user_id == current_user.id:
flash("Sie können Ihren eigenen Account nicht löschen.", "error")
return redirect(url_for('admin_page', tab='users'))
db_session = get_db_session()
try:
user = db_session.query(User).filter(User.id == user_id).first()
if not user:
db_session.close()
flash("Benutzer nicht gefunden.", "error")
return redirect(url_for('admin_page', tab='users'))
# Prevent deletion of admin users
if user.role == "admin":
db_session.close()
flash("Administratoren können nicht gelöscht werden.", "error")
return redirect(url_for('admin_page', tab='users'))
email = user.email # Save for later logging
db_session.delete(user)
db_session.commit()
db_session.close()
app_logger.info(f"User deleted: {email} (ID: {user_id})")
flash(f"Benutzer {email} wurde erfolgreich gelöscht.", "success")
return redirect(url_for('admin_page', tab='users'))
except Exception as e:
db_session.rollback()
db_session.close()
app_logger.error(f"Error deleting user: {str(e)}")
flash(f"Fehler beim Löschen des Benutzers: {str(e)}", "error")
return redirect(url_for('admin_page', tab='users'))
@app.route("/api/printers", methods=["POST"])
@login_required
def create_printer():
"""Create a new printer (admin only)"""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung, um neue Drucker anzulegen.", "error")
return redirect(url_for('admin_page', tab='printers'))
db_session = get_db_session()
try:
# Statt JSON-Daten die Formulardaten aus dem POST-Request holen
name = request.form.get('name')
model = request.form.get('model')
location = request.form.get('location')
mac_address = request.form.get('mac_address')
plug_ip = request.form.get('plug_ip')
plug_username = request.form.get('plug_username')
plug_password = request.form.get('plug_password')
# Check required fields
if not name or not mac_address:
db_session.close()
flash("Name und MAC-Adresse sind Pflichtfelder.", "error")
return redirect(url_for('admin_page', tab='printers'))
# Check if printer with same MAC already exists
existing_printer = db_session.query(Printer).filter(
Printer.mac_address == mac_address
).first()
if existing_printer:
db_session.close()
flash("Ein Drucker mit dieser MAC-Adresse existiert bereits.", "error")
return redirect(url_for('admin_page', tab='printers'))
# Create new printer
new_printer = Printer(
name=name,
model=model or "",
location=location or "",
mac_address=mac_address,
plug_ip=plug_ip or "",
plug_username=plug_username or "",
plug_password=plug_password or "",
active=True
)
db_session.add(new_printer)
db_session.commit()
printer_id = new_printer.id
db_session.close()
app_logger.info(f"New printer created: {new_printer.name} (ID: {printer_id})")
flash(f"Drucker {name} wurde erfolgreich angelegt.", "success")
return redirect(url_for('admin_page', tab='printers'))
except Exception as e:
db_session.rollback()
db_session.close()
app_logger.error(f"Error creating printer: {str(e)}")
flash(f"Fehler beim Anlegen des Druckers: {str(e)}", "error")
return redirect(url_for('admin_page', tab='printers'))
@app.route("/api/printers/<int:printer_id>", methods=["DELETE"])
@login_required
def delete_printer(printer_id):
"""Delete a printer (admin only)"""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung, um Drucker zu löschen.", "error")
return redirect(url_for('admin_page', tab='printers'))
db_session = get_db_session()
try:
printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
if not printer:
db_session.close()
flash("Drucker nicht gefunden.", "error")
return redirect(url_for('admin_page', tab='printers'))
printer_name = printer.name # Save for later logging
db_session.delete(printer)
db_session.commit()
db_session.close()
app_logger.info(f"Printer deleted: {printer_name} (ID: {printer_id})")
flash(f"Drucker {printer_name} wurde erfolgreich gelöscht.", "success")
return redirect(url_for('admin_page', tab='printers'))
except Exception as e:
db_session.rollback()
db_session.close()
app_logger.error(f"Error deleting printer: {str(e)}")
flash(f"Fehler beim Löschen des Druckers: {str(e)}", "error")
return redirect(url_for('admin_page', tab='printers'))
@app.route("/api/stats", methods=["GET"])
@login_required
def get_stats():
"""Get overall system statistics"""
if not current_user.is_admin:
return jsonify({"error": "Unauthorized"}), 403
db_session = get_db_session()
try:
# Get basic stats
stats = db_session.query(Stats).first()
if not stats:
# Create initial stats if none exist
stats = Stats()
db_session.add(stats)
db_session.commit()
# Count users, printers, active jobs
user_count = db_session.query(User).count()
printer_count = db_session.query(Printer).count()
active_jobs = db_session.query(Job).filter(Job.status.in_(["scheduled", "running"])).count()
result = {
"total_users": user_count,
"total_printers": printer_count,
"active_jobs": active_jobs,
"total_print_time_hours": round(stats.total_print_time / 3600, 1) if stats.total_print_time else 0,
"total_jobs_completed": stats.total_jobs_completed or 0,
"total_material_used": stats.total_material_used or 0,
"last_updated": stats.last_updated.isoformat() if stats.last_updated else None
}
db_session.close()
return jsonify(result)
except Exception as e:
db_session.close()
app_logger.error(f"Error fetching stats: {str(e)}")
return jsonify({"error": "Failed to fetch statistics"}), 500
@app.route("/api/scheduler/status", methods=["GET"])
@login_required
def get_scheduler_status():
"""Get the current status of the job scheduler"""
if not current_user.is_admin:
return jsonify({"error": "Unauthorized"}), 403
try:
is_running = scheduler.is_running()
tasks = []
# Add information about scheduler tasks
for task_id, task in scheduler.get_tasks().items():
tasks.append({
"id": task_id,
"interval": task.get("interval", 0),
"last_run": task.get("last_run"),
"enabled": task.get("enabled", False)
})
return jsonify({
"running": is_running,
"tasks": tasks,
"uptime": scheduler.get_uptime()
})
except Exception as e:
app_logger.error(f"Error fetching scheduler status: {str(e)}")
return jsonify({"error": "Failed to fetch scheduler status"}), 500
@app.route("/api/scheduler/start", methods=["POST"])
@login_required
def start_scheduler():
"""Start the job scheduler (admin only)"""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung, um den Scheduler zu starten.", "error")
return redirect(url_for('admin_page', tab='scheduler'))
try:
from utils.scheduler import start_scheduler as start_scheduler_func
result = start_scheduler_func()
if result:
app_logger.info(f"Scheduler started by admin user: {current_user.email}")
flash("Der Scheduler wurde erfolgreich gestartet.", "success")
else:
flash("Der Scheduler konnte nicht gestartet werden oder läuft bereits.", "warning")
return redirect(url_for('admin_page', tab='scheduler'))
except Exception as e:
app_logger.error(f"Error starting scheduler: {str(e)}")
flash(f"Fehler beim Starten des Schedulers: {str(e)}", "error")
return redirect(url_for('admin_page', tab='scheduler'))
@app.route("/api/scheduler/stop", methods=["POST"])
@login_required
def stop_scheduler():
"""Stop the job scheduler (admin only)"""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung, um den Scheduler zu stoppen.", "error")
return redirect(url_for('admin_page', tab='scheduler'))
try:
from utils.scheduler import stop_scheduler as stop_scheduler_func
result = stop_scheduler_func()
if result:
app_logger.info(f"Scheduler stopped by admin user: {current_user.email}")
flash("Der Scheduler wurde erfolgreich gestoppt.", "success")
else:
flash("Der Scheduler konnte nicht gestoppt werden oder läuft nicht.", "warning")
return redirect(url_for('admin_page', tab='scheduler'))
except Exception as e:
app_logger.error(f"Error stopping scheduler: {str(e)}")
flash(f"Fehler beim Stoppen des Schedulers: {str(e)}", "error")
return redirect(url_for('admin_page', tab='scheduler'))
@app.route("/api/logs", methods=["GET"])
@login_required
def get_logs():
"""Get system logs (admin only)"""
if not current_user.is_admin:
return jsonify({"error": "Unauthorized"}), 403
try:
# Get log type from query params
log_type = request.args.get("type", "app")
limit = int(request.args.get("limit", 100))
log_mapping = {
"app": "logs/app/app.log",
"auth": "logs/auth/auth.log",
"errors": "logs/errors/errors.log",
"jobs": "logs/jobs/jobs.log",
"printers": "logs/printers/printers.log",
"scheduler": "logs/scheduler/scheduler.log"
}
if log_type not in log_mapping:
return jsonify({"error": "Invalid log type"}), 400
log_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), log_mapping[log_type])
if not os.path.exists(log_path):
return jsonify({"logs": [], "success": True})
logs = []
with open(log_path, "r") as f:
for line in f.readlines()[-limit:]:
try:
# Parse log entry (format: [LEVEL] TIMESTAMP - MESSAGE)
parts = line.strip().split(" - ", 1)
if len(parts) == 2:
header, message = parts
level_timestamp = header.strip("[]").split("] [", 1)
if len(level_timestamp) == 2:
level, timestamp = level_timestamp
logs.append({
"level": level.strip(),
"timestamp": timestamp.strip(),
"message": message.strip(),
"source": log_type
})
except Exception:
# If parsing fails, add the raw line
logs.append({
"level": "INFO",
"timestamp": datetime.now().isoformat(),
"message": line.strip(),
"source": f"myp.{log_type}"
})
# Sort logs by timestamp in descending order
logs.sort(key=lambda x: x["timestamp"], reverse=True)
return jsonify({"logs": logs, "success": True})
except Exception as e:
app_logger.error(f"Error fetching logs: {str(e)}")
return jsonify({"error": "Failed to fetch logs"}), 500
@app.route("/api/activity/recent", methods=["GET"])
@login_required
def get_recent_activity():
"""Get recent system activity"""
try:
# Create mock activity data (to be replaced with real data in future)
activities = [
{
"description": "Neuer Druckauftrag erstellt: 'Motor_Halterung_v2'",
"timestamp": (datetime.now() - timedelta(minutes=15)).isoformat(),
"user": "admin@example.com",
"type": "job_created"
},
{
"description": "Drucker 'Prusa i3 MK3S' wurde neu konfiguriert",
"timestamp": (datetime.now() - timedelta(hours=2)).isoformat(),
"user": "admin@example.com",
"type": "printer_updated"
},
{
"description": "Druckauftrag 'Getriebe_Prototyp' abgeschlossen",
"timestamp": (datetime.now() - timedelta(hours=5)).isoformat(),
"user": "user@example.com",
"type": "job_completed"
},
{
"description": "Neuer Benutzer registriert: 'user@example.com'",
"timestamp": (datetime.now() - timedelta(days=1)).isoformat(),
"user": "admin@example.com",
"type": "user_created"
},
{
"description": "Systemwartung durchgeführt",
"timestamp": (datetime.now() - timedelta(days=2)).isoformat(),
"user": "admin@example.com",
"type": "system_maintenance"
}
]
# Get limit from query params
limit = int(request.args.get("limit", 5))
activities = activities[:limit]
return jsonify({"activities": activities})
except Exception as e:
app_logger.error(f"Error fetching recent activity: {str(e)}")
return jsonify({"error": "Failed to fetch recent activity"}), 500
# Service Worker Route
@app.route('/sw.js')
def service_worker():
"""Serve the service worker script with proper headers"""
response = app.send_static_file('js/sw.js')
# Wichtig: Korrekte MIME-Type setzen
response.headers['Content-Type'] = 'application/javascript'
# Wichtig: Cache-Control Header setzen, um häufige Updates zu ermöglichen
response.headers['Cache-Control'] = 'no-cache'
# Service-Worker-Allowed Header setzen, um Scope-Probleme zu beheben
response.headers['Service-Worker-Allowed'] = '/'
return response
# Fehlerbehandlung
@app.errorhandler(404)
def page_not_found(e):
return render_template("404.html"), 404
@app.errorhandler(500)
def internal_server_error(e):
return render_template("500.html"), 500
# CLI-Befehle für Tailwind CSS
@app.cli.group()
def tailwind():
"""Tailwind CSS Kommandos."""
pass
@tailwind.command("build")
def tailwind_build():
"""Tailwind CSS für die Produktion kompilieren."""
print("Tailwind CSS wird kompiliert...")
try:
subprocess.run(["npx", "tailwindcss", "-i", "./static/css/input.css",
"-o", "./static/css/tailwind-dark-consolidated.min.css", "--minify"],
check=True)
print("Tailwind CSS erfolgreich kompiliert.")
except subprocess.CalledProcessError as e:
print(f"Fehler beim Kompilieren von Tailwind CSS: {e}")
raise
@tailwind.command("watch")
def tailwind_watch():
"""Tailwind CSS im Watch-Modus starten."""
print("Tailwind CSS Watch-Modus wird gestartet...")
try:
subprocess.Popen(["npx", "tailwindcss", "-i", "./static/css/input.css",
"-o", "./static/css/tailwind-dark-consolidated.min.css", "--watch"])
print("Tailwind CSS Watch-Modus gestartet. CSS wird bei Änderungen automatisch aktualisiert.")
except subprocess.CalledProcessError as e:
print(f"Fehler beim Starten des Tailwind CSS Watch-Modus: {e}")
raise
# Auto-Kompilierung beim Serverstart im Debug-Modus
def compile_tailwind_if_debug():
"""Kompiliert Tailwind CSS im Debug-Modus, falls notwendig."""
if FLASK_DEBUG:
try:
app_logger.info("Kompiliere Tailwind CSS...")
# Prüfen, ob npx und Node.js verfügbar sind
import platform
import shutil
import subprocess
# Auf Windows nur fortfahren, wenn die CSS-Datei bereits existiert
# oder npx verfügbar ist
css_file_exists = os.path.exists("static/css/tailwind.min.css")
# Prüfen, ob npx verfügbar ist
npx_available = shutil.which("npx") is not None
if platform.system() == "Windows" and not npx_available and not css_file_exists:
app_logger.warning("npx nicht gefunden und keine CSS-Datei vorhanden. Tailwind CSS wird nicht kompiliert.")
return
# Tailwind CSS kompilieren
if npx_available:
subprocess.run([
"npx", "tailwindcss", "-i", "static/css/input.css",
"-o", "static/css/tailwind.min.css", "--minify"
], check=True)
app_logger.info("Tailwind CSS erfolgreich kompiliert.")
elif css_file_exists:
app_logger.info("Verwende existierende Tailwind CSS-Datei.")
else:
app_logger.warning("Tailwind konnte nicht kompiliert werden und keine CSS-Datei vorhanden.")
except subprocess.CalledProcessError as e:
app_logger.warning(f"Tailwind konnte nicht kompiliert werden. Möglicherweise ist npx/Node.js nicht installiert. Fehler: {e}")
except Exception as e:
app_logger.error(f"Fehler beim Kompilieren von Tailwind CSS: {str(e)}")
# Tailwind CSS kompilieren, wenn im Debug-Modus
if FLASK_DEBUG:
compile_tailwind_if_debug()
# Initialisierung der Datenbank beim Start
def init_app():
"""Initialisiert die App-Komponenten und startet den Scheduler."""
# Datenbank initialisieren
try:
init_database()
create_initial_admin()
except Exception as e:
app_logger.error(f"Fehler bei der Datenbank-Initialisierung: {str(e)}")
# Jinja2-Helfer registrieren
register_template_helpers(app)
# Tailwind im Debug-Modus kompilieren
compile_tailwind_if_debug()
# Scheduler starten, wenn aktiviert
if SCHEDULER_ENABLED:
try:
# Scheduler-Task für Druckauftrags-Prüfung registrieren
scheduler.register_task(
"check_jobs",
check_jobs,
interval=SCHEDULER_INTERVAL
)
# Scheduler starten
scheduler.start()
app_logger.info(f"Scheduler gestartet mit Intervall {SCHEDULER_INTERVAL} Sekunden.")
except Exception as e:
app_logger.error(f"Fehler beim Starten des Schedulers: {str(e)}")
# SSL-Kontext protokollieren
ssl_context = get_ssl_context()
if ssl_context:
app_logger.info(f"SSL aktiviert mit Zertifikat {ssl_context[0]}")
else:
app_logger.warning("SSL ist deaktiviert. Die Verbindung ist unverschlüsselt!")
# Scheduler-Funktion zur Überprüfung der Druckaufträge
def check_jobs():
"""
Überprüft alle aktiven Druckaufträge und führt entsprechende Aktionen aus.
Diese Funktion wird vom Scheduler regelmäßig aufgerufen.
"""
app_logger.info("Überprüfe Druckaufträge...")
try:
db_session = get_db_session()
# Aktive Jobs abrufen
active_jobs = db_session.query(Job).filter(
Job.status.in_(["scheduled", "running"])
).all()
now = datetime.now()
for job in active_jobs:
# Prüfen, ob der Job gestartet werden soll
if job.status == "scheduled" and job.start_at <= now:
app_logger.info(f"Starte Job {job.id} für Drucker {job.printer_id}")
job.status = "running"
# Steckdose einschalten (implementieren Sie diese Funktion)
from utils.job_scheduler import toggle_plug
toggle_plug(job.printer_id, True)
# Prüfen, ob der Job beendet werden soll
elif job.status == "running" and job.end_at <= now:
app_logger.info(f"Beende Job {job.id} für Drucker {job.printer_id}")
job.status = "finished"
job.actual_end_time = now
# Steckdose ausschalten
from utils.job_scheduler import toggle_plug
toggle_plug(job.printer_id, False)
db_session.commit()
db_session.close()
except Exception as e:
app_logger.error(f"Fehler bei der Überprüfung von Druckaufträgen: {str(e)}")
if 'db_session' in locals():
db_session.close()
# App starten
if __name__ == "__main__":
import argparse
import threading
# Kommandozeilenargumente parsen
parser = argparse.ArgumentParser(description='MYP Platform - 3D-Drucker Reservierungssystem')
parser.add_argument('--port', type=int, help='Port für den Server (überschreibt die Konfiguration)')
parser.add_argument('--no-ssl', action='store_true', help='Deaktiviert SSL/HTTPS')
parser.add_argument('--dual-protocol', action='store_true', help='Startet sowohl HTTP als auch HTTPS Server')
args = parser.parse_args()
# Initialisierung
init_app()
# Port aus Kommandozeilenargument verwenden, falls angegeben
port = args.port if args.port else FLASK_PORT
# SSL-Kontext abrufen
ssl_context = None if args.no_ssl else get_ssl_context()
# Funktion zum Starten des Servers
def start_server(use_ssl=True, server_port=None):
if server_port is None:
server_port = port
try:
if use_ssl and ssl_context:
protocol = "HTTPS"
app_logger.info(f"{protocol}-Server wird auf Port {server_port} gestartet...")
app.run(host=FLASK_HOST, port=server_port, debug=FLASK_DEBUG, ssl_context=ssl_context)
else:
protocol = "HTTP"
app_logger.info(f"{protocol}-Server wird auf Port {server_port} gestartet...")
app.run(host=FLASK_HOST, port=server_port, debug=FLASK_DEBUG)
except Exception as e:
app_logger.error(f"Fehler beim Starten des {protocol}-Servers auf Port {server_port}: {str(e)}")
if server_port == FLASK_PORT:
fallback_port = FLASK_FALLBACK_PORT
app_logger.info(f"Versuche auf Fallback-Port {fallback_port} zu starten...")
try:
if use_ssl and ssl_context:
app.run(host=FLASK_HOST, port=fallback_port, debug=FLASK_DEBUG, ssl_context=ssl_context)
else:
app.run(host=FLASK_HOST, port=fallback_port, debug=FLASK_DEBUG)
except Exception as e2:
app_logger.error(f"Auch Fallback-Port fehlgeschlagen: {str(e2)}")
app_logger.info("Versuche auf Standard-Port 5000 zu starten...")
if use_ssl and ssl_context:
app.run(host=FLASK_HOST, port=5000, debug=FLASK_DEBUG, ssl_context=ssl_context)
else:
app.run(host=FLASK_HOST, port=5000, debug=FLASK_DEBUG)
# Dual-Protokoll-Modus: HTTP und HTTPS parallel starten
if args.dual_protocol:
app_logger.info("Starte Server im Dual-Protokoll-Modus (HTTP und HTTPS)...")
# HTTPS auf Hauptport (443)
https_thread = threading.Thread(target=start_server, kwargs={"use_ssl": True, "server_port": port})
https_thread.daemon = True
https_thread.start()
# HTTP auf Alternativport (80)
http_port = FLASK_FALLBACK_PORT
start_server(use_ssl=False, server_port=http_port)
else:
# Normaler Start mit einem Protokoll
app_logger.info(f"Server wird auf Port {port} mit {'HTTPS' if ssl_context else 'HTTP'} gestartet...")
start_server(use_ssl=bool(ssl_context))
# Content Security Policy anpassen
@app.after_request
def add_security_headers(response):
"""Fügt Sicherheitsheader zu allen Antworten hinzu"""
# Content Security Policy definieren
csp_directives = [
"default-src 'self'",
"script-src 'self' 'unsafe-inline' 'unsafe-eval'",
"script-src-elem 'self' 'unsafe-inline'",
"style-src 'self' 'unsafe-inline'",
"img-src 'self' data:",
"font-src 'self'",
"connect-src 'self'",
"worker-src 'self'", # Erlaubt Service Worker
"manifest-src 'self'"
]
# Setze CSP Header
response.headers['Content-Security-Policy'] = "; ".join(csp_directives)
# Weitere Sicherheitsheader
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'SAMEORIGIN'
response.headers['X-XSS-Protection'] = '1; mode=block'
return response
@app.route("/privacy")
def privacy_page():
"""Zeigt die Datenschutzseite an."""
return render_template("privacy.html")
@app.route("/terms")
def terms_page():
"""Zeigt die Nutzungsbedingungen an."""
return render_template("terms.html")
@app.route("/api/stats/export", methods=["GET"])
@login_required
def export_stats():
"""Exportiert Statistiken als JSON-Datei."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Statistiken exportieren"}), 403
try:
db_session = get_db_session()
# Grundlegende Statistiken sammeln
stats = db_session.query(Stats).first()
if not stats:
stats = Stats()
# Benutzerzahlen
user_count = db_session.query(User).count()
active_user_count = db_session.query(User).filter(User.active == True).count()
# Druckerzahlen
printer_count = db_session.query(Printer).count()
active_printer_count = db_session.query(Printer).filter(Printer.active == True).count()
# Jobstatistiken
total_jobs = db_session.query(Job).count()
completed_jobs = db_session.query(Job).filter(Job.status == "finished").count()
active_jobs = db_session.query(Job).filter(Job.status.in_(["scheduled", "running"])).count()
failed_jobs = db_session.query(Job).filter(Job.status == "failed").count()
# Berechne durchschnittliche Druckzeit
jobs_with_duration = db_session.query(Job).filter(
Job.start_at != None,
Job.actual_end_time != None
).all()
total_print_time = 0
avg_print_time = 0
if jobs_with_duration:
for job in jobs_with_duration:
duration = (job.actual_end_time - job.start_at).total_seconds()
total_print_time += duration
avg_print_time = total_print_time / len(jobs_with_duration) if len(jobs_with_duration) > 0 else 0
# Füge zusätzliche Statistiken für jede Druckerart hinzu
printer_stats = []
printers = db_session.query(Printer).all()
for printer in printers:
printer_jobs = db_session.query(Job).filter(Job.printer_id == printer.id).count()
printer_success_jobs = db_session.query(Job).filter(
Job.printer_id == printer.id,
Job.status == "finished"
).count()
success_rate = (printer_success_jobs / printer_jobs * 100) if printer_jobs > 0 else 0
printer_stats.append({
"id": printer.id,
"name": printer.name,
"model": printer.model,
"location": printer.location,
"total_jobs": printer_jobs,
"success_rate": round(success_rate, 2),
"active": printer.active
})
# Export-Daten zusammenstellen
export_data = {
"generated_at": datetime.now().isoformat(),
"users": {
"total": user_count,
"active": active_user_count
},
"printers": {
"total": printer_count,
"active": active_printer_count,
"details": printer_stats
},
"jobs": {
"total": total_jobs,
"completed": completed_jobs,
"active": active_jobs,
"failed": failed_jobs,
"success_rate": round((completed_jobs / total_jobs * 100), 2) if total_jobs > 0 else 0
},
"print_time": {
"total_seconds": total_print_time,
"total_hours": round(total_print_time / 3600, 2),
"average_seconds": avg_print_time,
"average_minutes": round(avg_print_time / 60, 2)
},
"system": {
"version": "3.0.0",
"uptime_days": get_system_uptime_days()
}
}
db_session.close()
# Als Datei zum Download anbieten
from flask import make_response
import json
response = make_response(json.dumps(export_data, indent=4))
response.headers["Content-Disposition"] = "attachment; filename=stats_export.json"
response.headers["Content-Type"] = "application/json"
return response
except Exception as e:
app_logger.error(f"Fehler beim Exportieren der Statistiken: {str(e)}")
return jsonify({"error": f"Fehler beim Exportieren: {str(e)}"}), 500
def get_system_uptime_days():
"""Gibt die Systemlaufzeit in Tagen zurück."""
try:
with open('/proc/uptime', 'r') as f:
uptime_seconds = float(f.readline().split()[0])
return round(uptime_seconds / 86400, 2) # Umrechnung in Tage
except Exception:
return 0
@app.route("/api/printers/add", methods=["POST"])
@login_required
def add_printer():
"""Fügt einen neuen Drucker hinzu."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Drucker hinzufügen"}), 403
try:
data = request.json
# Pflichtfelder prüfen
required_fields = ["name", "mac_address", "plug_ip", "plug_username", "plug_password"]
for field in required_fields:
if field not in data or not data[field]:
return jsonify({"error": f"Das Feld '{field}' ist ein Pflichtfeld"}), 400
# Druckerdaten extrahieren
name = data["name"]
model = data.get("model", "")
location = data.get("location", "")
mac_address = data["mac_address"]
plug_ip = data["plug_ip"]
plug_username = data["plug_username"]
plug_password = data["plug_password"]
db_session = get_db_session()
# Prüfen, ob ein Drucker mit dieser MAC-Adresse bereits existiert
existing_printer = db_session.query(Printer).filter(Printer.mac_address == mac_address).first()
if existing_printer:
db_session.close()
return jsonify({"error": "Ein Drucker mit dieser MAC-Adresse existiert bereits"}), 400
# Neuen Drucker erstellen
new_printer = Printer(
name=name,
model=model,
location=location,
mac_address=mac_address,
plug_ip=plug_ip,
plug_username=plug_username,
plug_password=plug_password,
status="offline",
active=True,
created_at=datetime.now()
)
db_session.add(new_printer)
db_session.commit()
# Drucker-ID für die Antwort speichern
printer_id = new_printer.id
# Drucker-Objekt für die Antwort serialisieren
printer_dict = new_printer.to_dict()
db_session.close()
printers_logger.info(f"Neuer Drucker {name} (ID: {printer_id}) wurde von {current_user.username} hinzugefügt")
return jsonify({"success": True, "message": "Drucker erfolgreich hinzugefügt", "printer": printer_dict}), 201
except Exception as e:
printers_logger.error(f"Fehler beim Hinzufügen eines Druckers: {str(e)}")
return jsonify({"error": f"Fehler beim Hinzufügen des Druckers: {str(e)}"}), 500
@app.route("/my/jobs")
@login_required
def my_jobs():
"""Zeigt die persönlichen Jobs des angemeldeten Benutzers an."""
# Weiterleitung zur Jobs-Seite mit Filter für den aktuellen Benutzer
return redirect(url_for("jobs_page", user_filter=current_user.id))
@app.route("/api/user/export", methods=["GET"])
@login_required
def api_user_export_redirect():
"""Leitet den alten API-Pfad zum neuen Benutzer-Export weiter."""
return redirect(url_for("user.export_user_data"))
@app.route("/api/user/profile", methods=["PUT"])
@login_required
def api_user_profile_update_redirect():
"""Leitet den alten API-Pfad zum neuen Benutzer-Profil-Update weiter."""
return redirect(url_for("user.update_profile_api"))