- Added SSL configuration to the backend, including self-signed certificate generation and management.
- Updated `setup_myp.sh` to create SSL certificates during installation.
- Enhanced `app.py` to support SSL context for secure communication.
- Introduced a new SSL management menu in the setup script for easier certificate handling.
- Updated frontend API calls to use HTTPS for secure data transmission.
- Implemented kiosk mode features, including automatic browser launch with SSL support.
- Improved documentation in `SUMMARY.md` to reflect new features and network topology changes.
1590 lines
56 KiB
Python
1590 lines
56 KiB
Python
import os
|
|
import threading
|
|
import time
|
|
import json
|
|
import secrets
|
|
import subprocess
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional, Tuple, Any, Union
|
|
from functools import wraps
|
|
|
|
from flask import Flask, request, jsonify, session, render_template, redirect, url_for, flash, Response
|
|
from flask_login import LoginManager, UserMixin, login_user, logout_user, current_user, login_required
|
|
import sqlalchemy.exc
|
|
import sqlalchemy
|
|
from PyP100 import PyP110
|
|
from flask_wtf.csrf import CSRFProtect
|
|
|
|
from config.settings import (
|
|
SECRET_KEY, TAPO_USERNAME, TAPO_PASSWORD, PRINTERS,
|
|
FLASK_HOST, FLASK_PORT, FLASK_DEBUG, SESSION_LIFETIME,
|
|
SCHEDULER_INTERVAL, SCHEDULER_ENABLED, get_ssl_context
|
|
)
|
|
from utils.logging_config import setup_logging, get_logger, log_startup_info
|
|
from models import User, Printer, Job, Stats, get_db_session, init_database, create_initial_admin
|
|
from utils.job_scheduler import scheduler
|
|
from utils.template_helpers import register_template_helpers
|
|
from blueprints.auth import auth_bp
|
|
from blueprints.user import user_bp
|
|
|
|
# Create and configure the Flask application.
app = Flask(__name__)
app.secret_key = SECRET_KEY
app.config.update(
    PERMANENT_SESSION_LIFETIME=SESSION_LIFETIME,
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
    WTF_CSRF_ENABLED=True,
)

# Enable CSRF protection for the application.
csrf = CSRFProtect(app)

# Set up the login manager; "auth.login" is the endpoint used as login view.
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = "auth.login"
login_manager.login_message = "Bitte melden Sie sich an, um auf diese Seite zuzugreifen."
login_manager.login_message_category = "info"
|
|
|
|
@login_manager.user_loader
def load_user(user_id):
    """Load a user for Flask-Login from the identifier stored in the session.

    Args:
        user_id: Identifier of the user to look up.

    Returns:
        The matching ``User`` instance, or ``None`` when no user has this id.
    """
    db_session = get_db_session()
    try:
        return db_session.query(User).filter(User.id == user_id).first()
    finally:
        # Release the session even when the query raises.
        db_session.close()
|
|
|
|
# Jinja2 context processors
@app.context_processor
def inject_now():
    """Make the current local datetime available to templates as ``now``."""
    current_time = datetime.now()
    return dict(now=current_time)
|
|
|
|
# Custom Jinja2 filter for date formatting
@app.template_filter('format_datetime')
def format_datetime_filter(value, format='%d.%m.%Y %H:%M'):
    """Render a datetime, or an ISO-format string, with a strftime pattern.

    ``None`` becomes an empty string. A string is parsed with
    ``datetime.fromisoformat``; if that fails the string is returned as is.
    The default pattern is the German-style ``DD.MM.YYYY HH:MM``.
    """
    if value is None:
        return ""

    moment = value
    if isinstance(moment, str):
        try:
            moment = datetime.fromisoformat(moment)
        except ValueError:
            # Not an ISO timestamp: hand the raw string back untouched.
            return value

    return moment.strftime(format)
|
|
|
|
# Register blueprints. The kiosk blueprint is optional: when its module
# cannot be imported the application simply starts without it.
try:
    from blueprints.kiosk_control import kiosk_bp
    app.register_blueprint(kiosk_bp)
    print("Kiosk-Kontrolle erfolgreich geladen")
except ImportError:
    print("Kiosk-Kontrolle nicht verfügbar (nur im Kiosk-Modus)")

# Register the auth blueprint.
app.register_blueprint(auth_bp)
print("Auth-Blueprint erfolgreich geladen")

# Register the user blueprint.
app.register_blueprint(user_bp)
print("User-Blueprint erfolgreich geladen")

# Initialise logging.
setup_logging()
log_startup_info()

# One logger per application component.
app_logger, auth_logger, jobs_logger, printers_logger = (
    get_logger(component) for component in ("app", "auth", "jobs", "printers")
)
|
|
|
|
# Custom decorator for the job-owner check
def job_owner_required(f):
    """Restrict a job view to the job's owner or an administrator.

    The wrapped view receives ``job_id`` as its first argument. The decorator
    looks the job up and answers with a JSON error instead of calling the
    view when the job does not exist (404) or when the current user is
    neither its user/owner nor an admin (403).

    Args:
        f: View function taking ``job_id`` as first positional argument.

    Returns:
        The wrapped view function.
    """
    @wraps(f)
    def decorated_function(job_id, *args, **kwargs):
        db_session = get_db_session()
        try:
            job = db_session.query(Job).filter(Job.id == job_id).first()
            if not job:
                return jsonify({"error": "Job nicht gefunden"}), 404

            is_owner = (
                job.user_id == int(current_user.id)
                or job.owner_id == int(current_user.id)
            )
            if not (is_owner or current_user.is_admin):
                return jsonify({"error": "Keine Berechtigung"}), 403
        finally:
            # Single cleanup point: also runs when the lookup raises.
            db_session.close()

        return f(job_id, *args, **kwargs)

    return decorated_function
|
|
|
|
# UI-Routen
|
|
@app.route("/")
def index():
    """Serve the start page to signed-in users; send others to the login view."""
    if not current_user.is_authenticated:
        return redirect(url_for("auth.login"))
    return render_template("index.html")
|
|
|
|
@app.route("/dashboard")
@login_required
def dashboard():
    """Render the dashboard page (login required)."""
    page = render_template("dashboard.html")
    return page
|
|
|
|
@app.route("/profile")
@login_required
def profile_redirect():
    """Redirect to the profile page served by the user blueprint."""
    target = url_for("user.profile")
    return redirect(target)
|
|
|
|
@app.route("/profil")
@login_required
def profil_redirect():
    """Redirect the German URL to the profile page of the user blueprint."""
    target = url_for("user.profile")
    return redirect(target)
|
|
|
|
@app.route("/settings")
@login_required
def settings_redirect():
    """Redirect to the settings page served by the user blueprint."""
    target = url_for("user.settings")
    return redirect(target)
|
|
|
|
@app.route("/einstellungen")
@login_required
def einstellungen_redirect():
    """Redirect the German URL to the settings page of the user blueprint."""
    target = url_for("user.settings")
    return redirect(target)
|
|
|
|
@app.route("/admin")
@login_required
def admin():
    """Forward administrators to the admin dashboard route.

    Non-admin users get an error flash message and are sent to the start page.
    """
    if current_user.is_admin:
        return redirect(url_for("admin_page"))

    flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
    return redirect(url_for("index"))
|
|
|
|
@app.route("/demo")
@login_required
def components_demo():
    """Render the demo page for UI components."""
    page = render_template("components_demo.html")
    return page
|
|
|
|
@app.route("/printers")
@login_required
def printers_page():
    """Render the printer overview page."""
    page = render_template("printers.html")
    return page
|
|
|
|
@app.route("/jobs")
@login_required
def jobs_page():
    """Render the print-job overview page."""
    page = render_template("jobs.html")
    return page
|
|
|
|
@app.route("/stats")
@login_required
def stats_page():
    """Render the statistics page."""
    page = render_template("stats.html")
    return page
|
|
|
|
def _parse_log_line(line, category):
    """Split one raw log line into the fields shown in the admin log table.

    Fields are separated by ``' - '``. With at least three fields the first
    is the timestamp, the second the level and the rest the message. With
    fewer fields the whole line is kept as message and the level is "INFO".
    """
    parts = line.split(' - ')
    has_all_fields = len(parts) > 2
    return {
        'timestamp': parts[0] if len(parts) > 1 else '',
        'level': parts[1] if has_all_fields else 'INFO',
        'category': category,
        'message': ' - '.join(parts[2:]) if has_all_fields else line,
    }


def _load_recent_logs(log_dir, log_level, per_file=100, limit=100):
    """Collect the newest log entries across all log categories.

    Args:
        log_dir: Directory containing one sub-directory per log category.
        log_level: ``'all'`` or a level name; other values keep only lines
            that contain the upper-cased level text.
        per_file: Number of trailing lines considered per log file.
        limit: Maximum number of entries returned.

    Returns:
        Parsed entries sorted by timestamp string, newest first.
    """
    from collections import deque

    needle = None if log_level == 'all' else log_level.upper()
    entries = []
    for category in ['app', 'auth', 'jobs', 'printers', 'scheduler', 'errors']:
        log_file = os.path.join(log_dir, category, f'{category}.log')
        if not os.path.exists(log_file):
            continue
        # errors='replace' keeps one undecodable byte from hiding all logs;
        # the bounded deque keeps only the tail instead of the whole file.
        with open(log_file, 'r', errors='replace') as handle:
            tail = deque(handle, maxlen=per_file)
        entries.extend(
            _parse_log_line(line, category)
            for line in tail
            if needle is None or needle in line
        )
    return sorted(entries, key=lambda entry: entry['timestamp'], reverse=True)[:limit]


@app.route("/admin-dashboard")
@login_required
def admin_page():
    """Render the administration page.

    Non-admin users get an error flash message and are redirected to the
    start page. Summary statistics are always loaded; data for the tab named
    by the ``tab`` query parameter (``users`` by default) is loaded on top.
    Any error while loading is logged and the page is rendered with whatever
    data was gathered up to that point.
    """
    if not current_user.is_admin:
        flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
        return redirect(url_for("index"))

    # Read the active tab from the URL, falling back to the default.
    active_tab = request.args.get('tab', 'users')

    # Defaults used when a section is not requested or fails to load.
    stats = {}
    users = []
    printers = []
    scheduler_status = {"running": False, "message": "Nicht verfügbar"}
    system_info = {"cpu": 0, "memory": 0, "disk": 0}
    logs = []

    db_session = get_db_session()
    try:
        # Summary counters shown on every tab.
        stats["total_users"] = db_session.query(User).count()
        stats["total_printers"] = db_session.query(Printer).count()
        stats["active_jobs"] = db_session.query(Job).filter(
            Job.status.in_(["scheduled", "running"])
        ).count()

        # Success rate: completed jobs relative to all ended jobs.
        total_jobs = db_session.query(Job).filter(
            Job.status.in_(["completed", "failed", "cancelled"])
        ).count()
        successful_jobs = db_session.query(Job).filter(
            Job.status == "completed"
        ).count()
        if total_jobs > 0:
            stats["success_rate"] = int((successful_jobs / total_jobs) * 100)
        else:
            stats["success_rate"] = 0

        if active_tab == 'users':
            users = [user.to_dict() for user in db_session.query(User).all()]

        if active_tab == 'printers':
            printers = [printer.to_dict() for printer in db_session.query(Printer).all()]

        if active_tab == 'scheduler':
            from utils.scheduler import scheduler_is_running
            # Query the state once so flag and message cannot disagree.
            running = scheduler_is_running()
            scheduler_status = {
                "running": running,
                "message": "Der Scheduler läuft" if running else "Der Scheduler ist gestoppt"
            }

        if active_tab == 'system':
            import psutil
            system_info = {
                "cpu": psutil.cpu_percent(),
                "memory": psutil.virtual_memory().percent,
                "disk": psutil.disk_usage('/').percent,
                "uptime": get_system_uptime_days()
            }

        if active_tab == 'logs':
            log_level = request.args.get('log_level', 'all')
            log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs')
            logs = _load_recent_logs(log_dir, log_level)

    except Exception as e:
        app_logger.error(f"Fehler beim Laden der Admin-Daten: {str(e)}")
    finally:
        db_session.close()

    return render_template(
        "admin.html",
        active_tab=active_tab,
        stats=stats,
        users=users,
        printers=printers,
        scheduler_status=scheduler_status,
        system_info=system_info,
        logs=logs
    )
|
|
|
|
# Direct access to the logout route (fallback)
@app.route("/logout", methods=["GET", "POST"])
def logout_redirect():
    """Redirect to the logout route of the auth blueprint."""
    target = url_for("auth.logout")
    return redirect(target)
|
|
|
|
# Job-Routen
|
|
@app.route("/api/jobs", methods=["GET"])
@login_required
def get_jobs():
    """Return jobs as JSON: admins get every job, other users only their own.

    Responds with ``{"jobs": [...]}``, or a JSON error with status 500 when
    loading fails.
    """
    db_session = get_db_session()

    try:
        from sqlalchemy.orm import joinedload

        # Load user and printer eagerly so to_dict() does not lazy-load.
        query = db_session.query(Job).options(
            joinedload(Job.user), joinedload(Job.printer)
        )
        if not current_user.is_admin:
            query = query.filter(Job.user_id == int(current_user.id))

        # Serialise while the session is still open.
        payload = {"jobs": [job.to_dict() for job in query.all()]}

        db_session.close()
        return jsonify(payload)
    except Exception as e:
        jobs_logger.error(f"Fehler beim Abrufen von Jobs: {str(e)}")
        db_session.close()
        return jsonify({"error": "Interner Serverfehler"}), 500
|
|
|
|
@app.route("/api/jobs/<int:job_id>", methods=["GET"])
@login_required
@job_owner_required
def get_job(job_id):
    """Return one job, with its user and printer loaded, as JSON.

    Responds with 404 when the job does not exist and 500 on errors.
    """
    db_session = get_db_session()

    try:
        from sqlalchemy.orm import joinedload

        eager_options = (joinedload(Job.user), joinedload(Job.printer))
        job = (
            db_session.query(Job)
            .options(*eager_options)
            .filter(Job.id == job_id)
            .first()
        )

        if job:
            # Serialise while the session is still open.
            serialized = job.to_dict()
            db_session.close()
            return jsonify(serialized)

        db_session.close()
        return jsonify({"error": "Job nicht gefunden"}), 404
    except Exception as e:
        jobs_logger.error(f"Fehler beim Abrufen des Jobs {job_id}: {str(e)}")
        db_session.close()
        return jsonify({"error": "Interner Serverfehler"}), 500
|
|
|
|
@app.route('/api/jobs/active', methods=['GET'])
@login_required
def get_active_jobs():
    """Return all active jobs (status "scheduled" or "running") as JSON.

    Running jobs that have an end time additionally carry
    ``remaining_minutes``, the whole minutes left until that end time
    (never negative).

    Returns:
        ``{"jobs": [...]}``, or a JSON error with status 500.
    """
    db_session = None
    try:
        db_session = get_db_session()
        from sqlalchemy.orm import joinedload

        active_jobs = db_session.query(Job).options(
            joinedload(Job.user),
            joinedload(Job.printer)
        ).filter(
            Job.status.in_(["scheduled", "running"])
        ).all()

        result = []
        for job in active_jobs:
            job_dict = job.to_dict()
            # Compute the time left for running jobs.
            if job.status == "running" and job.end_at:
                remaining_seconds = (job.end_at - datetime.now()).total_seconds()
                job_dict["remaining_minutes"] = max(int(remaining_seconds / 60), 0)
            result.append(job_dict)

        return jsonify({"jobs": result})
    except Exception as e:
        jobs_logger.error(f"Fehler beim Abrufen aktiver Jobs: {str(e)}")
        return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
    finally:
        # Previously the session stayed open whenever an exception occurred.
        if db_session is not None:
            db_session.close()
|
|
|
|
@app.route('/api/jobs', methods=['POST'])
@login_required
def create_job():
    """Create a new job with status "scheduled".

    Expected JSON body::

        {
            "printer_id": int,
            "start_iso": str,          # ISO date string
            "duration_minutes": int,
            "name": str,               # optional job title
            "file_path": str           # optional
        }

    Returns:
        ``{"job": {...}}`` with status 201 on success; 400 for an invalid
        body or invalid values; 404 when the printer does not exist; 500 on
        unexpected errors.
    """
    db_session = None
    try:
        # silent=True yields None for a missing or malformed body instead of
        # raising, so the client gets a 400 rather than a generic 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Ungültiger oder fehlender JSON-Body"}), 400

        # Check required fields.
        for field in ("printer_id", "start_iso", "duration_minutes"):
            if field not in data:
                return jsonify({"error": f"Feld '{field}' fehlt"}), 400

        try:
            printer_id = int(data["printer_id"])
            duration_minutes = int(data["duration_minutes"])
        except (TypeError, ValueError):
            return jsonify({"error": "'printer_id' und 'duration_minutes' müssen Ganzzahlen sein"}), 400

        # Optional: job title and file path.
        name = data.get("name", f"Druckjob vom {datetime.now().strftime('%d.%m.%Y')}")
        file_path = data.get("file_path")

        # Parse the start time; a non-string value raises TypeError.
        try:
            start_at = datetime.fromisoformat(data["start_iso"])
        except (TypeError, ValueError):
            return jsonify({"error": "Ungültiges Startdatum"}), 400

        if duration_minutes <= 0:
            return jsonify({"error": "Dauer muss größer als 0 sein"}), 400

        end_at = start_at + timedelta(minutes=duration_minutes)

        db_session = get_db_session()

        # Make sure the printer exists.
        printer = db_session.query(Printer).get(printer_id)
        if not printer:
            return jsonify({"error": "Drucker nicht gefunden"}), 404

        new_job = Job(
            name=name,
            printer_id=printer_id,
            user_id=current_user.id,
            owner_id=current_user.id,
            start_at=start_at,
            end_at=end_at,
            status="scheduled",
            file_path=file_path,
            duration_minutes=duration_minutes
        )
        db_session.add(new_job)
        db_session.commit()

        # Read everything needed from the instance while the session is open.
        job_dict = new_job.to_dict()
        new_job_id = new_job.id

        jobs_logger.info(f"Neuer Job {new_job_id} erstellt für Drucker {printer_id}, Start: {start_at}, Dauer: {duration_minutes} Minuten")
        return jsonify({"job": job_dict}), 201

    except Exception as e:
        if db_session is not None:
            db_session.rollback()
        jobs_logger.error(f"Fehler beim Erstellen eines Jobs: {str(e)}")
        return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
    finally:
        if db_session is not None:
            db_session.close()
|
|
|
|
@app.route('/api/jobs/<int:job_id>/extend', methods=['POST'])
@login_required
@job_owner_required
def extend_job(job_id):
    """Extend the end time of a job.

    Expected JSON body::

        {"extra_minutes": int}

    Only jobs in status "scheduled" or "running" can be extended.

    Returns:
        ``{"job": {...}}`` on success; 400 for an invalid body, invalid
        values or a job in another status; 404 when the job does not exist;
        500 on unexpected errors.
    """
    db_session = None
    try:
        # silent=True yields None for a missing or malformed body instead of
        # raising, so the client gets a 400 rather than a generic 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Ungültiger oder fehlender JSON-Body"}), 400

        if "extra_minutes" not in data:
            return jsonify({"error": "Feld 'extra_minutes' fehlt"}), 400

        try:
            extra_minutes = int(data["extra_minutes"])
        except (TypeError, ValueError):
            return jsonify({"error": "'extra_minutes' muss eine Ganzzahl sein"}), 400

        if extra_minutes <= 0:
            return jsonify({"error": "Zusätzliche Minuten müssen größer als 0 sein"}), 400

        db_session = get_db_session()
        job = db_session.query(Job).get(job_id)

        if not job:
            return jsonify({"error": "Job nicht gefunden"}), 404

        # Only pending or running jobs may be extended.
        if job.status not in ["scheduled", "running"]:
            return jsonify({"error": f"Job kann im Status '{job.status}' nicht verlängert werden"}), 400

        # Update end time and total duration.
        job.end_at = job.end_at + timedelta(minutes=extra_minutes)
        # A missing duration counts as 0 instead of raising TypeError.
        job.duration_minutes = (job.duration_minutes or 0) + extra_minutes

        db_session.commit()

        # Read everything needed from the instance while the session is open.
        job_dict = job.to_dict()
        new_end_at = job.end_at

        jobs_logger.info(f"Job {job_id} um {extra_minutes} Minuten verlängert, neue Endzeit: {new_end_at}")
        return jsonify({"job": job_dict})

    except Exception as e:
        if db_session is not None:
            db_session.rollback()
        jobs_logger.error(f"Fehler beim Verlängern von Job {job_id}: {str(e)}")
        return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
    finally:
        if db_session is not None:
            db_session.close()
|
|
|
|
@app.route('/api/jobs/<int:job_id>/finish', methods=['POST'])
@login_required
def finish_job(job_id):
    """Finish a job manually and switch its smart plug off.

    Only administrators may do this. A failure to switch the plug off is
    logged as a warning and does not stop the job from being marked finished.

    Returns:
        ``{"job": {...}}`` on success; 403 for non-admins; 404 when the job
        does not exist; 400 when the job is neither "scheduled" nor
        "running"; 500 on unexpected errors.
    """
    db_session = None
    try:
        if not current_user.is_admin:
            return jsonify({"error": "Nur Administratoren können Jobs manuell beenden"}), 403

        # joinedload was used here without being imported, so every call
        # ended in a NameError and a 500 response.
        from sqlalchemy.orm import joinedload

        db_session = get_db_session()
        job = db_session.query(Job).options(joinedload(Job.printer)).get(job_id)

        if not job:
            return jsonify({"error": "Job nicht gefunden"}), 404

        # Only pending or running jobs may be finished.
        if job.status not in ["scheduled", "running"]:
            return jsonify({"error": f"Job kann im Status '{job.status}' nicht beendet werden"}), 400

        # Switch the plug off.
        from utils.job_scheduler import toggle_plug
        if not toggle_plug(job.printer_id, False):
            # Carry on regardless, but leave a warning in the log.
            jobs_logger.warning(f"Steckdose für Job {job_id} konnte nicht ausgeschaltet werden")

        # Mark the job as finished.
        job.status = "finished"
        job.actual_end_time = datetime.now()

        db_session.commit()

        # Serialise while the session is still open.
        job_dict = job.to_dict()

        jobs_logger.info(f"Job {job_id} manuell beendet durch Admin {current_user.id}")
        return jsonify({"job": job_dict})

    except Exception as e:
        if db_session is not None:
            db_session.rollback()
        jobs_logger.error(f"Fehler beim manuellen Beenden von Job {job_id}: {str(e)}")
        return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
    finally:
        if db_session is not None:
            db_session.close()
|
|
|
|
@app.route("/api/printers", methods=["GET"])
@login_required
def get_printers():
    """Return all printers as ``{"printers": [...]}``; 500 with JSON error on failure."""
    db_session = get_db_session()

    try:
        printer_list = []
        for printer in db_session.query(Printer).all():
            printer_list.append(printer.to_dict())

        db_session.close()
        return jsonify({"printers": printer_list})
    except Exception as e:
        printers_logger.error(f"Fehler beim Abrufen der Drucker: {str(e)}")
        db_session.close()
        return jsonify({"error": "Interner Serverfehler"}), 500
|
|
|
|
# API-Routen für Statistiken
|
|
@app.route("/api/stats/users", methods=["GET"])
@login_required
def get_stats_users():
    """Return the number of users as ``{"value": <count>}``."""
    db_session = get_db_session()
    try:
        payload = {"value": db_session.query(User).count()}
    except Exception as e:
        db_session.close()
        return jsonify({"error": str(e)}), 500
    else:
        db_session.close()
        return jsonify(payload)
|
|
|
|
@app.route("/api/stats/uptime", methods=["GET"])
@login_required
def get_stats_uptime():
    """Return the system uptime in whole days, read from ``/proc/uptime``.

    Returns:
        ``{"value": "<days> Tage"}``, or ``{"error": ...}`` with status 500
        when the file cannot be read or parsed (the same error shape the
        other statistics endpoints use).
    """
    try:
        with open('/proc/uptime', 'r') as f:
            # First field of the file is the uptime in seconds.
            uptime_seconds = float(f.readline().split()[0])
    except (OSError, ValueError, IndexError) as e:
        # /proc/uptime is missing on non-Linux hosts; answer with JSON
        # instead of letting the exception escape the view.
        app_logger.error(f"Fehler beim Lesen der Systemlaufzeit: {str(e)}")
        return jsonify({"error": str(e)}), 500

    uptime_days = int(uptime_seconds / 86400)
    return jsonify({"value": f"{uptime_days} Tage"})
|
|
|
|
@app.route("/api/stats/active-jobs", methods=["GET"])
@login_required
def get_stats_active_jobs():
    """Return the number of jobs in status "scheduled" or "running"."""
    db_session = get_db_session()
    try:
        active_filter = Job.status.in_(["scheduled", "running"])
        payload = {"value": db_session.query(Job).filter(active_filter).count()}
    except Exception as e:
        db_session.close()
        return jsonify({"error": str(e)}), 500
    else:
        db_session.close()
        return jsonify(payload)
|
|
|
|
@app.route("/api/stats/available-printers", methods=["GET"])
@login_required
def get_stats_available_printers():
    """Return the number of printers whose ``active`` flag is set."""
    db_session = get_db_session()
    try:
        # "== True" builds a SQL expression here; it is not a Python comparison.
        only_active = Printer.active == True  # noqa: E712
        payload = {"value": db_session.query(Printer).filter(only_active).count()}
    except Exception as e:
        db_session.close()
        return jsonify({"error": str(e)}), 500
    else:
        db_session.close()
        return jsonify(payload)
|
|
|
|
@app.route("/api/stats/success-rate", methods=["GET"])
@login_required
def get_stats_success_rate():
    """Return the success rate of print jobs as a percentage string.

    The rate is the share of "finished" jobs that have an actual end time;
    it is "0%" when there are no finished jobs.
    """
    db_session = get_db_session()
    try:
        finished_jobs = db_session.query(Job).filter(Job.status == "finished")
        total_jobs = finished_jobs.count()

        success_rate = 0
        if total_jobs != 0:
            # "!= None" builds a SQL IS NOT NULL test, not a Python comparison.
            success_jobs = finished_jobs.filter(
                Job.actual_end_time != None  # noqa: E711
            ).count()
            success_rate = int((success_jobs / total_jobs) * 100)

        db_session.close()
        return jsonify({"value": f"{success_rate}%"})
    except Exception as e:
        db_session.close()
        return jsonify({"error": str(e)}), 500
|
|
|
|
@app.route("/api/stats/print-time", methods=["GET"])
@login_required
def get_stats_print_time():
    """Return the total print time in whole hours, e.g. ``{"value": "12h"}``.

    Falls back to "0h" when no statistics row or no print time exists.
    """
    db_session = get_db_session()
    try:
        stats = db_session.query(Stats).first()

        value = "0h"
        if stats and stats.total_print_time:
            # total_print_time is divided by 3600 to obtain hours.
            value = f"{stats.total_print_time // 3600}h"

        db_session.close()
        return jsonify({"value": value})
    except Exception as e:
        db_session.close()
        return jsonify({"error": str(e)}), 500
|
|
|
|
@app.route("/api/activity", methods=["GET"])
@login_required
def get_activity():
    """Return the five most recently created jobs as a list of activity entries."""
    db_session = get_db_session()
    try:
        newest_first = db_session.query(Job).order_by(Job.created_at.desc())

        activities = []
        for job in newest_first.limit(5).all():
            created = job.created_at
            activities.append({
                "type": "job",
                "id": job.id,
                "title": job.name,
                "status": job.status,
                "timestamp": created.isoformat() if created else None
            })

        db_session.close()
        return jsonify(activities)
    except Exception as e:
        db_session.close()
        return jsonify({"error": str(e)}), 500
|
|
|
|
@app.route("/api/printers/status", methods=["GET"])
@login_required
def get_printers_status():
    """Return id, name, status and active flag of every printer as a JSON list."""
    db_session = get_db_session()
    try:
        status_data = []
        for printer in db_session.query(Printer).all():
            status_data.append({
                "id": printer.id,
                "name": printer.name,
                "status": printer.status,
                "active": printer.active
            })

        db_session.close()
        return jsonify(status_data)
    except Exception as e:
        db_session.close()
        return jsonify({"error": str(e)}), 500
|
|
|
|
@app.route("/api/jobs/current", methods=["GET"])
@login_required
def get_current_job():
    """Return the current user's earliest scheduled or running job.

    Responds with the job as JSON, or JSON ``null`` when there is none.
    """
    db_session = get_db_session()
    try:
        own_active_jobs = db_session.query(Job).filter(
            Job.user_id == current_user.id,
            Job.status.in_(["scheduled", "running"])
        )
        current_job = own_active_jobs.order_by(Job.start_at).first()

        job_data = current_job.to_dict() if current_job else None

        db_session.close()
        return jsonify(job_data)
    except Exception as e:
        db_session.close()
        return jsonify({"error": str(e)}), 500
|
|
|
|
# Admin API Endpoints
|
|
@app.route("/api/users", methods=["GET"])
@login_required
def get_users():
    """Return a list of all users (admin only).

    ``@login_required`` was missing: an anonymous request reached the
    ``is_admin`` check on the anonymous user object, which does not define
    that attribute in Flask-Login, and ended in an unhandled error.

    Returns:
        ``{"users": [...]}`` on success; 403 for non-admins; 500 with a JSON
        error when loading fails.
    """
    if not current_user.is_admin:
        return jsonify({"error": "Unauthorized"}), 403

    db_session = get_db_session()
    try:
        users_list = [
            {
                "id": user.id,
                "email": user.email,
                "name": user.name,
                "role": user.role,
                "active": user.active,
                "created_at": user.created_at.isoformat() if user.created_at else None
            }
            for user in db_session.query(User).all()
        ]
        return jsonify({"users": users_list})
    except Exception as e:
        app_logger.error(f"Error fetching users: {str(e)}")
        return jsonify({"error": "Failed to fetch users"}), 500
    finally:
        db_session.close()
|
|
|
|
@app.route("/api/users", methods=["POST"])
@login_required
def create_user():
    """Create a new user from submitted form data (admin only).

    Reads ``email``, ``name``, ``password`` and ``role`` (default "user")
    from the form. ``email`` and ``password`` are required and the e-mail
    must not be in use yet. Every outcome is reported through a flash
    message followed by a redirect to the users tab of the admin page.
    """
    if not current_user.is_admin:
        flash("Sie haben keine Berechtigung, um neue Benutzer anzulegen.", "error")
        return redirect(url_for('admin_page', tab='users'))

    db_session = get_db_session()
    try:
        # The data arrives as form fields, not as JSON.
        email = request.form.get('email')
        name = request.form.get('name')
        password = request.form.get('password')
        role = request.form.get('role', 'user')

        if not email or not password:
            flash("E-Mail und Passwort sind Pflichtfelder.", "error")
            return redirect(url_for('admin_page', tab='users'))

        # Reject duplicates by e-mail address.
        existing_user = db_session.query(User).filter(
            User.email == email
        ).first()
        if existing_user:
            flash("Ein Benutzer mit dieser E-Mail existiert bereits.", "error")
            return redirect(url_for('admin_page', tab='users'))

        new_user = User(
            email=email,
            name=name if name else "",
            username=email.split("@")[0],  # Default username from email
            role=role,
            active=True,
            created_at=datetime.now()
        )
        new_user.set_password(password)

        db_session.add(new_user)
        db_session.commit()

        # Read the generated id while the session is still open, and log the
        # submitted e-mail rather than an attribute of a detached instance.
        user_id = new_user.id
        app_logger.info(f"New user created: {email} (ID: {user_id})")
        flash(f"Benutzer {email} wurde erfolgreich angelegt.", "success")
        return redirect(url_for('admin_page', tab='users'))
    except Exception as e:
        db_session.rollback()
        app_logger.error(f"Error creating user: {str(e)}")
        flash(f"Fehler beim Anlegen des Benutzers: {str(e)}", "error")
        return redirect(url_for('admin_page', tab='users'))
    finally:
        # Single cleanup point for every exit path.
        db_session.close()
|
|
|
|
@app.route("/api/users/<int:user_id>", methods=["DELETE"])
@login_required
def delete_user(user_id):
    """Delete a user (admin only).

    Refuses to delete the caller's own account and any user with role
    "admin". Every outcome is reported through a flash message followed by a
    redirect to the users tab of the admin page.
    """
    def back_to_users_tab():
        # All exits of this view lead to the same page.
        return redirect(url_for('admin_page', tab='users'))

    if not current_user.is_admin:
        flash("Sie haben keine Berechtigung, um Benutzer zu löschen.", "error")
        return back_to_users_tab()

    # An admin must not remove their own account.
    if user_id == current_user.id:
        flash("Sie können Ihren eigenen Account nicht löschen.", "error")
        return back_to_users_tab()

    db_session = get_db_session()
    try:
        target = db_session.query(User).filter(User.id == user_id).first()

        if not target:
            db_session.close()
            flash("Benutzer nicht gefunden.", "error")
            return back_to_users_tab()

        # Admin accounts are protected from deletion.
        if target.role == "admin":
            db_session.close()
            flash("Administratoren können nicht gelöscht werden.", "error")
            return back_to_users_tab()

        email = target.email  # kept for the messages after deletion
        db_session.delete(target)
        db_session.commit()
        db_session.close()

        app_logger.info(f"User deleted: {email} (ID: {user_id})")
        flash(f"Benutzer {email} wurde erfolgreich gelöscht.", "success")
        return back_to_users_tab()
    except Exception as e:
        db_session.rollback()
        db_session.close()
        app_logger.error(f"Error deleting user: {str(e)}")
        flash(f"Fehler beim Löschen des Benutzers: {str(e)}", "error")
        return back_to_users_tab()
|
|
|
|
@app.route("/api/printers", methods=["POST"])
@login_required
def create_printer():
    """Create a new printer from submitted form data (admin only).

    Reads ``name``, ``model``, ``location``, ``mac_address``, ``plug_ip``,
    ``plug_username`` and ``plug_password`` from the form. ``name`` and
    ``mac_address`` are required and the MAC address must not be in use yet.
    Every outcome is reported through a flash message followed by a redirect
    to the printers tab of the admin page.
    """
    if not current_user.is_admin:
        flash("Sie haben keine Berechtigung, um neue Drucker anzulegen.", "error")
        return redirect(url_for('admin_page', tab='printers'))

    db_session = get_db_session()
    try:
        # The data arrives as form fields, not as JSON.
        name = request.form.get('name')
        model = request.form.get('model')
        location = request.form.get('location')
        mac_address = request.form.get('mac_address')
        plug_ip = request.form.get('plug_ip')
        plug_username = request.form.get('plug_username')
        plug_password = request.form.get('plug_password')

        if not name or not mac_address:
            flash("Name und MAC-Adresse sind Pflichtfelder.", "error")
            return redirect(url_for('admin_page', tab='printers'))

        # Reject duplicates by MAC address.
        existing_printer = db_session.query(Printer).filter(
            Printer.mac_address == mac_address
        ).first()
        if existing_printer:
            flash("Ein Drucker mit dieser MAC-Adresse existiert bereits.", "error")
            return redirect(url_for('admin_page', tab='printers'))

        new_printer = Printer(
            name=name,
            model=model or "",
            location=location or "",
            mac_address=mac_address,
            plug_ip=plug_ip or "",
            plug_username=plug_username or "",
            plug_password=plug_password or "",
            active=True
        )

        db_session.add(new_printer)
        db_session.commit()

        # Read the generated id while the session is still open, and log the
        # submitted name rather than an attribute of a detached instance.
        printer_id = new_printer.id
        app_logger.info(f"New printer created: {name} (ID: {printer_id})")
        flash(f"Drucker {name} wurde erfolgreich angelegt.", "success")
        return redirect(url_for('admin_page', tab='printers'))
    except Exception as e:
        db_session.rollback()
        app_logger.error(f"Error creating printer: {str(e)}")
        flash(f"Fehler beim Anlegen des Druckers: {str(e)}", "error")
        return redirect(url_for('admin_page', tab='printers'))
    finally:
        # Single cleanup point for every exit path.
        db_session.close()
|
|
|
|
@app.route("/api/printers/<int:printer_id>", methods=["DELETE"])
@login_required
def delete_printer(printer_id):
    """Delete a printer (admin only).

    Every outcome is reported through a flash message followed by a redirect
    to the printers tab of the admin page.
    """
    def back_to_printers_tab():
        # All exits of this view lead to the same page.
        return redirect(url_for('admin_page', tab='printers'))

    if not current_user.is_admin:
        flash("Sie haben keine Berechtigung, um Drucker zu löschen.", "error")
        return back_to_printers_tab()

    db_session = get_db_session()
    try:
        target = db_session.query(Printer).filter(Printer.id == printer_id).first()

        if not target:
            db_session.close()
            flash("Drucker nicht gefunden.", "error")
            return back_to_printers_tab()

        printer_name = target.name  # kept for the messages after deletion
        db_session.delete(target)
        db_session.commit()
        db_session.close()

        app_logger.info(f"Printer deleted: {printer_name} (ID: {printer_id})")
        flash(f"Drucker {printer_name} wurde erfolgreich gelöscht.", "success")
        return back_to_printers_tab()
    except Exception as e:
        db_session.rollback()
        db_session.close()
        app_logger.error(f"Error deleting printer: {str(e)}")
        flash(f"Fehler beim Löschen des Druckers: {str(e)}", "error")
        return back_to_printers_tab()
|
|
|
|
@app.route("/api/stats", methods=["GET"])
@login_required
def get_stats():
    """Return system-wide statistics as JSON (administrators only).

    Reads the first ``Stats`` row, creating and committing an empty one when
    none exists, and combines it with live counts of users, printers and
    jobs whose status is "scheduled" or "running". Non-admins receive a 403
    response; any exception results in a 500 response.
    """
    if not current_user.is_admin:
        return jsonify({"error": "Unauthorized"}), 403

    db_session = get_db_session()
    try:
        stats_row = db_session.query(Stats).first()
        if not stats_row:
            # First call on a fresh database: persist an empty stats row.
            stats_row = Stats()
            db_session.add(stats_row)
            db_session.commit()

        payload = {
            "total_users": db_session.query(User).count(),
            "total_printers": db_session.query(Printer).count(),
            "active_jobs": db_session.query(Job)
            .filter(Job.status.in_(["scheduled", "running"]))
            .count(),
        }

        # total_print_time is divided by 3600 to report hours.
        print_seconds = stats_row.total_print_time
        payload["total_print_time_hours"] = (
            round(print_seconds / 3600, 1) if print_seconds else 0
        )
        payload["total_jobs_completed"] = stats_row.total_jobs_completed or 0
        payload["total_material_used"] = stats_row.total_material_used or 0

        updated_at = stats_row.last_updated
        payload["last_updated"] = updated_at.isoformat() if updated_at else None

        db_session.close()
        return jsonify(payload)
    except Exception as exc:
        db_session.close()
        app_logger.error(f"Error fetching stats: {str(exc)}")
        return jsonify({"error": "Failed to fetch statistics"}), 500
|
|
|
|
@app.route("/api/scheduler/status", methods=["GET"])
@login_required
def get_scheduler_status():
    """Report the job scheduler's state as JSON (administrators only).

    The response contains whether the scheduler is running, one entry per
    registered task (id, interval, last run, enabled flag) and the value of
    ``scheduler.get_uptime()``. Non-admins receive 403, failures 500.
    """
    if not current_user.is_admin:
        return jsonify({"error": "Unauthorized"}), 403

    try:
        running = scheduler.is_running()

        # One summary dict per task registered with the scheduler.
        task_summaries = [
            {
                "id": task_id,
                "interval": info.get("interval", 0),
                "last_run": info.get("last_run"),
                "enabled": info.get("enabled", False),
            }
            for task_id, info in scheduler.get_tasks().items()
        ]

        status = {
            "running": running,
            "tasks": task_summaries,
            "uptime": scheduler.get_uptime(),
        }
        return jsonify(status)
    except Exception as exc:
        app_logger.error(f"Error fetching scheduler status: {str(exc)}")
        return jsonify({"error": "Failed to fetch scheduler status"}), 500
|
|
|
|
@app.route("/api/scheduler/start", methods=["POST"])
@login_required
def start_scheduler():
    """Start the job scheduler (administrators only).

    The outcome is reported as a flash message and the client is redirected
    to the "scheduler" tab of the admin page in every case.
    """
    def _back_to_scheduler_tab():
        return redirect(url_for('admin_page', tab='scheduler'))

    if not current_user.is_admin:
        flash("Sie haben keine Berechtigung, um den Scheduler zu starten.", "error")
        return _back_to_scheduler_tab()

    try:
        # NOTE(review): the module-level ``scheduler`` object used elsewhere in
        # this file is imported from utils.job_scheduler -- confirm that
        # utils.scheduler exists; otherwise this import always fails and the
        # error branch below is taken.
        from utils.scheduler import start_scheduler as start_scheduler_func

        started = start_scheduler_func()
        if not started:
            flash("Der Scheduler konnte nicht gestartet werden oder läuft bereits.", "warning")
        else:
            app_logger.info(f"Scheduler started by admin user: {current_user.email}")
            flash("Der Scheduler wurde erfolgreich gestartet.", "success")

        return _back_to_scheduler_tab()
    except Exception as exc:
        app_logger.error(f"Error starting scheduler: {str(exc)}")
        flash(f"Fehler beim Starten des Schedulers: {str(exc)}", "error")
        return _back_to_scheduler_tab()
|
|
|
|
@app.route("/api/scheduler/stop", methods=["POST"])
@login_required
def stop_scheduler():
    """Stop the job scheduler (administrators only).

    The outcome is reported as a flash message and the client is redirected
    to the "scheduler" tab of the admin page in every case.
    """
    def _back_to_scheduler_tab():
        return redirect(url_for('admin_page', tab='scheduler'))

    if not current_user.is_admin:
        flash("Sie haben keine Berechtigung, um den Scheduler zu stoppen.", "error")
        return _back_to_scheduler_tab()

    try:
        # NOTE(review): the module-level ``scheduler`` object used elsewhere in
        # this file is imported from utils.job_scheduler -- confirm that
        # utils.scheduler exists; otherwise this import always fails and the
        # error branch below is taken.
        from utils.scheduler import stop_scheduler as stop_scheduler_func

        stopped = stop_scheduler_func()
        if not stopped:
            flash("Der Scheduler konnte nicht gestoppt werden oder läuft nicht.", "warning")
        else:
            app_logger.info(f"Scheduler stopped by admin user: {current_user.email}")
            flash("Der Scheduler wurde erfolgreich gestoppt.", "success")

        return _back_to_scheduler_tab()
    except Exception as exc:
        app_logger.error(f"Error stopping scheduler: {str(exc)}")
        flash(f"Fehler beim Stoppen des Schedulers: {str(exc)}", "error")
        return _back_to_scheduler_tab()
|
|
|
|
def _parse_log_line(line, source):
    """Turn one ``[LEVEL] [TIMESTAMP] - MESSAGE`` line into a log entry dict.

    Args:
        line: Raw line read from a log file.
        source: Value stored under the entry's "source" key.

    Returns:
        A dict with "level", "timestamp", "message" and "source", or None
        when the line does not have the expected shape.
    """
    header, separator, message = line.strip().partition(" - ")
    if not separator:
        return None

    # Drop the outer brackets, then split "LEVEL] [TIMESTAMP" at the inner pair.
    header_parts = header.strip("[]").split("] [", 1)
    if len(header_parts) != 2:
        return None

    level, timestamp = header_parts
    return {
        "level": level.strip(),
        "timestamp": timestamp.strip(),
        "message": message.strip(),
        "source": source,
    }


@app.route("/api/logs", methods=["GET"])
@login_required
def get_logs():
    """Return the most recent entries of one system log file (admin only).

    Query parameters:
        type: Which log to read; must be a key of the mapping below
            (default "app").
        limit: Maximum number of trailing lines to read (default 100).
            A non-numeric value falls back to the default; zero or a
            negative value yields an empty list.

    Returns:
        JSON ``{"logs": [...], "success": True}`` with the entries sorted by
        their timestamp string, newest first. Lines that do not match the
        expected format are skipped. Responds with 400 for an unknown log
        type, 403 for non-admins and 500 on unexpected errors.
    """
    # Local import keeps this block self-contained.
    from collections import deque

    if not current_user.is_admin:
        return jsonify({"error": "Unauthorized"}), 403

    try:
        log_type = request.args.get("type", "app")
        # type=int makes an unparsable value fall back to the default instead
        # of raising; clamping avoids the "[-0:] is the whole file" slice trap
        # and the odd semantics of negative limits.
        limit = max(request.args.get("limit", 100, type=int), 0)

        log_mapping = {
            "app": "logs/app/app.log",
            "auth": "logs/auth/auth.log",
            "errors": "logs/errors/errors.log",
            "jobs": "logs/jobs/jobs.log",
            "printers": "logs/printers/printers.log",
            "scheduler": "logs/scheduler/scheduler.log",
        }

        if log_type not in log_mapping:
            return jsonify({"error": "Invalid log type"}), 400

        # Log paths are resolved relative to the directory of this file.
        base_dir = os.path.dirname(os.path.abspath(__file__))
        log_path = os.path.join(base_dir, log_mapping[log_type])

        if not os.path.exists(log_path):
            return jsonify({"logs": [], "success": True})

        # A bounded deque keeps only the last `limit` lines while streaming
        # the file, so large logs are never held in memory as a whole.
        # errors="replace" prevents a stray byte from failing the request.
        with open(log_path, "r", encoding="utf-8", errors="replace") as log_file:
            recent_lines = deque(log_file, maxlen=limit)

        logs = []
        for raw_line in recent_lines:
            entry = _parse_log_line(raw_line, log_type)
            if entry is not None:
                logs.append(entry)

        # Newest first; timestamps are compared as plain strings.
        logs.sort(key=lambda entry: entry["timestamp"], reverse=True)

        return jsonify({"logs": logs, "success": True})
    except Exception as e:
        app_logger.error(f"Error fetching logs: {str(e)}")
        return jsonify({"error": "Failed to fetch logs"}), 500
|
|
|
|
@app.route("/api/activity/recent", methods=["GET"])
@login_required
def get_recent_activity():
    """Return recent system activity as JSON.

    The entries are currently hard-coded mock data with timestamps relative
    to the time of the request.

    Query parameters:
        limit: Maximum number of entries to return (default 5). A
            non-numeric value falls back to the default; zero or a negative
            value yields an empty list.

    Returns:
        JSON ``{"activities": [...]}``, or a 500 response on unexpected
        errors.
    """
    try:
        # Take the reference time once so all mock timestamps are consistent.
        now = datetime.now()

        # Mock activity data (to be replaced with real data in the future).
        activities = [
            {
                "description": "Neuer Druckauftrag erstellt: 'Motor_Halterung_v2'",
                "timestamp": (now - timedelta(minutes=15)).isoformat(),
                "user": "admin@example.com",
                "type": "job_created",
            },
            {
                "description": "Drucker 'Prusa i3 MK3S' wurde neu konfiguriert",
                "timestamp": (now - timedelta(hours=2)).isoformat(),
                "user": "admin@example.com",
                "type": "printer_updated",
            },
            {
                "description": "Druckauftrag 'Getriebe_Prototyp' abgeschlossen",
                "timestamp": (now - timedelta(hours=5)).isoformat(),
                "user": "user@example.com",
                "type": "job_completed",
            },
            {
                "description": "Neuer Benutzer registriert: 'user@example.com'",
                "timestamp": (now - timedelta(days=1)).isoformat(),
                "user": "admin@example.com",
                "type": "user_created",
            },
            {
                "description": "Systemwartung durchgeführt",
                "timestamp": (now - timedelta(days=2)).isoformat(),
                "user": "admin@example.com",
                "type": "system_maintenance",
            },
        ]

        # type=int falls back to the default for unparsable values; clamping
        # keeps a negative limit from slicing entries off the end instead.
        limit = max(request.args.get("limit", 5, type=int), 0)

        return jsonify({"activities": activities[:limit]})
    except Exception as e:
        app_logger.error(f"Error fetching recent activity: {str(e)}")
        return jsonify({"error": "Failed to fetch recent activity"}), 500
|
|
|
|
# Service Worker Route
|
|
@app.route('/sw.js')
def service_worker():
    """Serve the service worker script from the site root with its headers."""
    sw_response = app.send_static_file('js/sw.js')

    header_overrides = (
        # Explicit JavaScript MIME type for the script.
        ('Content-Type', 'application/javascript'),
        # Ask clients to revalidate so updates are picked up quickly.
        ('Cache-Control', 'no-cache'),
        # Allow the worker to claim the whole site as its scope.
        ('Service-Worker-Allowed', '/'),
    )
    for header_name, header_value in header_overrides:
        sw_response.headers[header_name] = header_value

    return sw_response
|
|
|
|
# Fehlerbehandlung
|
|
@app.errorhandler(404)
def page_not_found(e):
    """Render the custom 404 page; the error object itself is not used."""
    body = render_template("404.html")
    return body, 404
|
|
|
|
@app.errorhandler(500)
def internal_server_error(e):
    """Render the custom 500 page; the error object itself is not used."""
    body = render_template("500.html")
    return body, 500
|
|
|
|
# CLI-Befehle für Tailwind CSS
|
|
@app.cli.group()
def tailwind():
    """Tailwind CSS Kommandos."""
    # CLI group for the Tailwind CSS commands registered below it. The
    # docstring is left untranslated because the CLI displays it as the
    # group's help text. The group has no behavior of its own.
|
|
|
|
@tailwind.command("build")
def tailwind_build():
    """Tailwind CSS für die Produktion kompilieren."""
    # Compiles and minifies the Tailwind stylesheet via npx. The docstring is
    # left untranslated because the CLI displays it as the command's help
    # text. A non-zero exit status is reported and then re-raised.
    build_cmd = [
        "npx", "tailwindcss",
        "-i", "./static/css/input.css",
        "-o", "./static/css/tailwind-dark-consolidated.min.css",
        "--minify",
    ]

    print("Tailwind CSS wird kompiliert...")
    try:
        subprocess.run(build_cmd, check=True)
    except subprocess.CalledProcessError as build_error:
        print(f"Fehler beim Kompilieren von Tailwind CSS: {build_error}")
        raise
    else:
        print("Tailwind CSS erfolgreich kompiliert.")
|
|
|
|
@tailwind.command("watch")
def tailwind_watch():
    """Tailwind CSS im Watch-Modus starten."""
    # Launches the Tailwind watcher via npx as a child process and returns
    # without waiting for it. The docstring is left untranslated because the
    # CLI displays it as the command's help text.
    watch_cmd = [
        "npx", "tailwindcss",
        "-i", "./static/css/input.css",
        "-o", "./static/css/tailwind-dark-consolidated.min.css",
        "--watch",
    ]

    print("Tailwind CSS Watch-Modus wird gestartet...")
    try:
        subprocess.Popen(watch_cmd)
    except (OSError, subprocess.SubprocessError) as e:
        # Popen never raises CalledProcessError on its own; a failed launch
        # (for example npx not being installed) surfaces as OSError, so that
        # is what has to be caught for the message to ever be shown.
        print(f"Fehler beim Starten des Tailwind CSS Watch-Modus: {e}")
        raise
    print("Tailwind CSS Watch-Modus gestartet. CSS wird bei Änderungen automatisch aktualisiert.")
|
|
|
|
# Auto-Kompilierung beim Serverstart im Debug-Modus
|
|
def compile_tailwind_if_debug():
    """Compile Tailwind CSS when the application runs in debug mode.

    Does nothing unless ``FLASK_DEBUG`` is truthy. Compilation problems are
    logged and never raised, so a missing Node.js toolchain cannot keep the
    application from starting.
    """
    if not FLASK_DEBUG:
        return

    # NOTE(review): this writes static/css/tailwind.min.css, whereas the
    # "tailwind build" CLI command writes tailwind-dark-consolidated.min.css
    # -- confirm which file the templates actually load.
    compile_cmd = [
        "npx", "tailwindcss", "-i", "static/css/input.css",
        "-o", "static/css/tailwind.min.css", "--minify",
    ]

    try:
        app_logger.info("Kompiliere Tailwind CSS...")
        subprocess.run(compile_cmd, check=True)
        app_logger.info("Tailwind CSS erfolgreich kompiliert.")
    except (subprocess.CalledProcessError, FileNotFoundError):
        # A missing npx executable raises FileNotFoundError rather than
        # CalledProcessError, so both belong to the "tooling problem" warning.
        app_logger.warning("Tailwind konnte nicht kompiliert werden. Möglicherweise ist npx/Node.js nicht installiert.")
    except Exception as e:
        app_logger.error(f"Fehler beim Kompilieren von Tailwind CSS: {str(e)}")
|
|
|
|
# Tailwind CSS kompilieren, wenn im Debug-Modus
|
|
# Import-time Tailwind build. compile_tailwind_if_debug() checks FLASK_DEBUG
# itself and returns immediately when debug mode is off, so no extra guard is
# needed around the call.
compile_tailwind_if_debug()
|
|
|
|
# Initialisierung der Datenbank beim Start
|
|
def init_app():
    """Initialize the application components and start the scheduler.

    Steps, in order: database setup and initial admin creation (failures are
    logged, not raised), registration of the template helpers, the
    debug-mode Tailwind compilation, the optional scheduler start, and a log
    entry stating whether SSL is active.
    """
    # Database
    try:
        init_database()
        create_initial_admin()
    except Exception as db_error:
        app_logger.error(f"Fehler bei der Datenbank-Initialisierung: {str(db_error)}")

    # Jinja2 helpers
    register_template_helpers(app)

    # Tailwind (only does work in debug mode)
    compile_tailwind_if_debug()

    # Scheduler, when enabled in the settings
    if SCHEDULER_ENABLED:
        try:
            # Register the periodic print-job check, then start the scheduler.
            scheduler.register_task("check_jobs", check_jobs, interval=SCHEDULER_INTERVAL)
            scheduler.start()
            app_logger.info(f"Scheduler gestartet mit Intervall {SCHEDULER_INTERVAL} Sekunden.")
        except Exception as scheduler_error:
            app_logger.error(f"Fehler beim Starten des Schedulers: {str(scheduler_error)}")

    # Log whether the server will use SSL
    ssl_ctx = get_ssl_context()
    if not ssl_ctx:
        app_logger.warning("SSL ist deaktiviert. Die Verbindung ist unverschlüsselt!")
    else:
        # The first element of the context is logged as the certificate.
        app_logger.info(f"SSL aktiviert mit Zertifikat {ssl_ctx[0]}")
|
|
|
|
# App starten
|
|
if __name__ == "__main__":
    # Script entry point: initialize the app, then serve it (over HTTPS when
    # an SSL context is available).
    #
    # NOTE(review): app.run() blocks until the server stops, and this block
    # sits in the middle of the module. The routes and the after_request hook
    # defined further down are therefore not registered when the file is run
    # as a script -- consider moving this block to the end of the file.
    try:
        init_app()

        ssl_context = get_ssl_context()

        # Protocol name used only for the startup log line.
        if ssl_context:
            protocol = "HTTPS"
        else:
            protocol = "HTTP"
        app_logger.info(f"MYP startet auf {protocol}://{FLASK_HOST}:{FLASK_PORT} (Debug: {FLASK_DEBUG})")

        app.run(host=FLASK_HOST, port=FLASK_PORT, debug=FLASK_DEBUG, ssl_context=ssl_context)
    except Exception as e:
        # Startup failures are logged as critical and not re-raised.
        app_logger.critical(f"Kritischer Fehler beim Starten der Anwendung: {str(e)}")
|
|
|
|
# Content Security Policy anpassen
|
|
@app.after_request
def add_security_headers(response):
    """Attach the Content-Security-Policy and related headers to a response.

    Registered as an after-request hook; returns the same response object
    with the headers set.
    """
    # (directive, allowed sources) pairs of the Content Security Policy.
    csp_policy = (
        ("default-src", "'self'"),
        ("script-src", "'self' 'unsafe-inline' 'unsafe-eval'"),
        ("script-src-elem", "'self' 'unsafe-inline'"),
        ("style-src", "'self' 'unsafe-inline'"),
        ("img-src", "'self' data:"),
        ("font-src", "'self'"),
        ("connect-src", "'self'"),
        ("worker-src", "'self'"),  # permits the service worker
        ("manifest-src", "'self'"),
    )
    csp_value = "; ".join(f"{directive} {sources}" for directive, sources in csp_policy)

    security_headers = {
        'Content-Security-Policy': csp_value,
        'X-Content-Type-Options': 'nosniff',
        'X-Frame-Options': 'SAMEORIGIN',
        'X-XSS-Protection': '1; mode=block',
    }
    for header_name, header_value in security_headers.items():
        response.headers[header_name] = header_value

    return response
|
|
|
|
@app.route("/privacy")
def privacy_page():
    """Render the privacy policy page."""
    page = render_template("privacy.html")
    return page
|
|
|
|
@app.route("/terms")
def terms_page():
    """Render the terms of use page."""
    page = render_template("terms.html")
    return page
|
|
|
|
@app.route("/api/stats/export", methods=["GET"])
@login_required
def export_stats():
    """Export system statistics as a downloadable JSON file (admin only).

    Collects user, printer and job counts, per-printer success rates, total
    and average print time (from jobs that have both ``start_at`` and
    ``actual_end_time``) and the system uptime.

    Returns:
        A response carrying the indented JSON document as the attachment
        ``stats_export.json``; a 403 JSON error for non-admins; a 500 JSON
        error when anything fails. The database session is closed on every
        path.
    """
    from flask import make_response

    if not current_user.is_admin:
        return jsonify({"error": "Nur Administratoren können Statistiken exportieren"}), 403

    db_session = None
    try:
        db_session = get_db_session()

        # User counts
        user_count = db_session.query(User).count()
        active_user_count = db_session.query(User).filter(User.active == True).count()  # noqa: E712

        # Printer counts
        printer_count = db_session.query(Printer).count()
        active_printer_count = db_session.query(Printer).filter(Printer.active == True).count()  # noqa: E712

        # Job counts by status
        total_jobs = db_session.query(Job).count()
        completed_jobs = db_session.query(Job).filter(Job.status == "finished").count()
        active_jobs = db_session.query(Job).filter(Job.status.in_(["scheduled", "running"])).count()
        failed_jobs = db_session.query(Job).filter(Job.status == "failed").count()

        # Print durations of all jobs that have both a start and an end time
        jobs_with_duration = db_session.query(Job).filter(
            Job.start_at != None,  # noqa: E711 -- SQLAlchemy column comparison
            Job.actual_end_time != None  # noqa: E711
        ).all()

        durations = [
            (job.actual_end_time - job.start_at).total_seconds()
            for job in jobs_with_duration
        ]
        total_print_time = sum(durations)
        avg_print_time = total_print_time / len(durations) if durations else 0

        # Per-printer job totals and success rates
        printer_stats = []
        for printer in db_session.query(Printer).all():
            printer_jobs = db_session.query(Job).filter(Job.printer_id == printer.id).count()
            printer_success_jobs = db_session.query(Job).filter(
                Job.printer_id == printer.id,
                Job.status == "finished"
            ).count()

            success_rate = (printer_success_jobs / printer_jobs * 100) if printer_jobs > 0 else 0

            printer_stats.append({
                "id": printer.id,
                "name": printer.name,
                "model": printer.model,
                "location": printer.location,
                "total_jobs": printer_jobs,
                "success_rate": round(success_rate, 2),
                "active": printer.active
            })

        # Assemble the export document
        export_data = {
            "generated_at": datetime.now().isoformat(),
            "users": {
                "total": user_count,
                "active": active_user_count
            },
            "printers": {
                "total": printer_count,
                "active": active_printer_count,
                "details": printer_stats
            },
            "jobs": {
                "total": total_jobs,
                "completed": completed_jobs,
                "active": active_jobs,
                "failed": failed_jobs,
                "success_rate": round((completed_jobs / total_jobs * 100), 2) if total_jobs > 0 else 0
            },
            "print_time": {
                "total_seconds": total_print_time,
                "total_hours": round(total_print_time / 3600, 2),
                "average_seconds": avg_print_time,
                "average_minutes": round(avg_print_time / 60, 2)
            },
            "system": {
                "version": "3.0.0",
                "uptime_days": get_system_uptime_days()
            }
        }

        # Offer the document as a file download
        response = make_response(json.dumps(export_data, indent=4))
        response.headers["Content-Disposition"] = "attachment; filename=stats_export.json"
        response.headers["Content-Type"] = "application/json"

        return response

    except Exception as e:
        app_logger.error(f"Fehler beim Exportieren der Statistiken: {str(e)}")
        return jsonify({"error": f"Fehler beim Exportieren: {str(e)}"}), 500
    finally:
        # Release the session on success and on failure alike.
        if db_session is not None:
            db_session.close()
|
|
|
|
def get_system_uptime_days():
    """Return the system uptime in days, rounded to two decimal places.

    The value is taken from the first field of ``/proc/uptime``, which holds
    the uptime in seconds.

    Returns:
        The uptime in days, or 0 when the file cannot be read or parsed
        (for example on systems without ``/proc/uptime``).
    """
    try:
        with open('/proc/uptime', 'r') as f:
            uptime_seconds = float(f.readline().split()[0])
    except (OSError, ValueError, IndexError):
        # OSError: file missing or unreadable; ValueError/IndexError: the
        # first line is empty or does not start with a number.
        return 0
    return round(uptime_seconds / 86400, 2)  # 86400 seconds per day
|
|
|
|
@app.route("/api/printers/add", methods=["POST"])
@login_required
def add_printer():
    """Create a new printer from a JSON request body (administrators only).

    Required JSON fields: ``name``, ``mac_address``, ``plug_ip``,
    ``plug_username`` and ``plug_password``. Optional: ``model`` and
    ``location`` (both default to an empty string).

    Returns:
        201 with the serialized printer on success; 400 when the body is
        not a JSON object, a required field is missing or empty, or a
        printer with the same MAC address already exists; 403 for
        non-admins; 500 on unexpected errors. The database session is
        rolled back on errors and closed on every path.
    """
    if not current_user.is_admin:
        return jsonify({"error": "Nur Administratoren können Drucker hinzufügen"}), 403

    # silent=True yields None instead of raising for a missing or malformed
    # body, so bad input is answered with 400 rather than a generic 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Ungültige oder fehlende JSON-Daten"}), 400

    # Validate the required fields
    required_fields = ["name", "mac_address", "plug_ip", "plug_username", "plug_password"]
    for field in required_fields:
        if not data.get(field):
            return jsonify({"error": f"Das Feld '{field}' ist ein Pflichtfeld"}), 400

    name = data["name"]
    mac_address = data["mac_address"]

    db_session = None
    try:
        db_session = get_db_session()

        # Reject duplicates: the MAC address identifies a printer
        existing_printer = db_session.query(Printer).filter(Printer.mac_address == mac_address).first()
        if existing_printer:
            return jsonify({"error": "Ein Drucker mit dieser MAC-Adresse existiert bereits"}), 400

        new_printer = Printer(
            name=name,
            model=data.get("model", ""),
            location=data.get("location", ""),
            mac_address=mac_address,
            plug_ip=data["plug_ip"],
            plug_username=data["plug_username"],
            plug_password=data["plug_password"],
            status="offline",
            active=True,
            created_at=datetime.now()
        )

        db_session.add(new_printer)
        db_session.commit()

        # Read everything needed for the response while the session is open
        printer_id = new_printer.id
        printer_dict = new_printer.to_dict()

        printers_logger.info(f"Neuer Drucker {name} (ID: {printer_id}) wurde von {current_user.username} hinzugefügt")
        return jsonify({"success": True, "message": "Drucker erfolgreich hinzugefügt", "printer": printer_dict}), 201

    except Exception as e:
        # Discard any partially applied changes before reporting the error.
        if db_session is not None:
            db_session.rollback()
        printers_logger.error(f"Fehler beim Hinzufügen eines Druckers: {str(e)}")
        return jsonify({"error": f"Fehler beim Hinzufügen des Druckers: {str(e)}"}), 500
    finally:
        if db_session is not None:
            db_session.close()
|
|
|
|
@app.route("/my/jobs")
@login_required
def my_jobs():
    """Redirect to the jobs page, filtered to the logged-in user's jobs."""
    own_jobs_url = url_for("jobs_page", user_filter=current_user.id)
    return redirect(own_jobs_url)
|
|
|
|
@app.route("/api/user/export", methods=["GET"])
@login_required
def api_user_export_redirect():
    """Redirect the legacy API path to the user blueprint's data export."""
    export_url = url_for("user.export_user_data")
    return redirect(export_url)
|
|
|
|
@app.route("/api/user/profile", methods=["PUT"])
@login_required
def api_user_profile_update_redirect():
    """Redirect the legacy API path to the user blueprint's profile update.

    Uses status 307 so that clients repeat the request with the same method
    and body; with the default 302 some clients follow the redirect with a
    GET, which would drop the PUT payload.
    """
    return redirect(url_for("user.update_profile_api"), code=307)