2401 lines
87 KiB
Python

import os
import sys
import logging
import threading
import time
import subprocess
import socket
import json
import secrets
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from functools import wraps
from typing import Optional, Dict, List, Tuple, Any, Union
from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, session, send_file, Response
from flask_login import LoginManager, login_user, logout_user, login_required, current_user, UserMixin
from werkzeug.security import check_password_hash, generate_password_hash
from werkzeug.utils import secure_filename
import sqlalchemy.exc
import sqlalchemy
from PyP100 import PyP110
from flask_wtf.csrf import CSRFProtect
from config.settings import (
SECRET_KEY, TAPO_USERNAME, TAPO_PASSWORD, PRINTERS,
FLASK_HOST, FLASK_PORT, FLASK_DEBUG, SESSION_LIFETIME,
SCHEDULER_INTERVAL, SCHEDULER_ENABLED, get_ssl_context, FLASK_FALLBACK_PORT,
SSL_ENABLED, SSL_CERT_PATH, SSL_KEY_PATH
)
from utils.logging_config import setup_logging, get_logger, log_startup_info
from models import User, Printer, Job, Stats, get_db_session, init_database, create_initial_admin
from utils.job_scheduler import scheduler
from utils.template_helpers import register_template_helpers
from blueprints.auth import auth_bp
from blueprints.user import user_bp
# Flask-App initialisieren
app = Flask(__name__)
app.secret_key = SECRET_KEY
app.config["PERMANENT_SESSION_LIFETIME"] = SESSION_LIFETIME
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config["WTF_CSRF_ENABLED"] = True
# CSRF-Schutz initialisieren
csrf = CSRFProtect(app)
# Login-Manager initialisieren
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = "auth.login"
login_manager.login_message = "Bitte melden Sie sich an, um auf diese Seite zuzugreifen."
login_manager.login_message_category = "info"
@login_manager.user_loader
def load_user(user_id):
db_session = get_db_session()
user = db_session.query(User).filter(User.id == user_id).first()
db_session.close()
return user
# Jinja2 Context Processors
@app.context_processor
def inject_now():
"""Inject the current datetime into templates."""
return {'now': datetime.now()}
# Custom Jinja2 filter für Datumsformatierung
@app.template_filter('format_datetime')
def format_datetime_filter(value, format='%d.%m.%Y %H:%M'):
"""Format a datetime object to a German-style date and time string"""
if value is None:
return ""
if isinstance(value, str):
try:
value = datetime.fromisoformat(value)
except ValueError:
return value
return value.strftime(format)
# Blueprints registrieren
try:
from blueprints.kiosk_control import kiosk_bp
app.register_blueprint(kiosk_bp)
print("Kiosk-Kontrolle erfolgreich geladen")
except ImportError:
print("Kiosk-Kontrolle nicht verfügbar (nur im Kiosk-Modus)")
# Auth-Blueprint registrieren
app.register_blueprint(auth_bp)
print("Auth-Blueprint erfolgreich geladen")
# User-Blueprint registrieren
app.register_blueprint(user_bp)
print("User-Blueprint erfolgreich geladen")
# Logging initialisieren
setup_logging()
log_startup_info()
# Logger für verschiedene Komponenten
app_logger = get_logger("app")
auth_logger = get_logger("auth")
jobs_logger = get_logger("jobs")
printers_logger = get_logger("printers")
# Custom decorator für Job-Besitzer-Check
def job_owner_required(f):
@wraps(f)
def decorated_function(job_id, *args, **kwargs):
db_session = get_db_session()
job = db_session.query(Job).filter(Job.id == job_id).first()
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
is_owner = job.user_id == int(current_user.id) or job.owner_id == int(current_user.id)
is_admin = current_user.is_admin
if not (is_owner or is_admin):
db_session.close()
return jsonify({"error": "Keine Berechtigung"}), 403
db_session.close()
return f(job_id, *args, **kwargs)
return decorated_function
def check_printer_status(ip_address: str, timeout: int = 7) -> Tuple[str, bool]:
"""
Überprüft den Status eines Druckers über Ping mit Timeout.
Args:
ip_address: IP-Adresse des Druckers
timeout: Timeout in Sekunden (Standard: 7)
Returns:
Tuple[str, bool]: (Status, Aktiv) - Status ist "online" oder "offline", Aktiv ist True/False
"""
if not ip_address:
return "offline", False
try:
# Windows-spezifischer Ping-Befehl mit Timeout
if os.name == 'nt': # Windows
cmd = ['ping', '-n', '1', '-w', str(timeout * 1000), ip_address]
else: # Unix/Linux/macOS
cmd = ['ping', '-c', '1', '-W', str(timeout), ip_address]
# Ping ausführen mit Timeout
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout + 1 # Zusätzlicher Timeout für subprocess
)
# Erfolgreicher Ping (Return Code 0)
if result.returncode == 0:
return "online", True
else:
return "offline", False
except subprocess.TimeoutExpired:
printers_logger.warning(f"Ping-Timeout für Drucker {ip_address} nach {timeout} Sekunden")
return "offline", False
except Exception as e:
printers_logger.error(f"Fehler beim Ping für Drucker {ip_address}: {str(e)}")
return "offline", False
def check_multiple_printers_status(printers: List[Dict], timeout: int = 7) -> Dict[int, Tuple[str, bool]]:
"""
Überprüft den Status mehrerer Drucker parallel mit Timeout.
Args:
printers: Liste von Drucker-Dictionaries mit 'id' und 'ip_address'
timeout: Timeout in Sekunden pro Drucker (Standard: 7)
Returns:
Dict[int, Tuple[str, bool]]: Dictionary mit Drucker-ID als Key und (Status, Aktiv) als Value
"""
results = {}
# Parallel-Ausführung mit ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=min(len(printers), 10)) as executor:
# Futures für alle Drucker erstellen
future_to_printer = {
executor.submit(check_printer_status, printer.get('ip_address'), timeout): printer
for printer in printers
}
# Ergebnisse sammeln
for future in as_completed(future_to_printer, timeout=timeout + 2):
printer = future_to_printer[future]
try:
status, active = future.result()
results[printer['id']] = (status, active)
printers_logger.info(f"Drucker {printer['name']} ({printer.get('ip_address')}): {status}")
except Exception as e:
printers_logger.error(f"Fehler bei Status-Check für Drucker {printer['name']}: {str(e)}")
results[printer['id']] = ("offline", False)
return results
# UI-Routen
@app.route("/")
def index():
if current_user.is_authenticated:
return render_template("index.html")
return redirect(url_for("auth.login"))
@app.route("/dashboard")
@login_required
def dashboard():
return render_template("dashboard.html")
@app.route("/profile")
@login_required
def profile_redirect():
"""Leitet zur neuen Profilseite im User-Blueprint weiter."""
return redirect(url_for("user.profile"))
@app.route("/profil")
@login_required
def profil_redirect():
"""Leitet zur neuen Profilseite im User-Blueprint weiter (deutsche URL)."""
return redirect(url_for("user.profile"))
@app.route("/settings")
@login_required
def settings_redirect():
"""Leitet zur neuen Einstellungsseite im User-Blueprint weiter."""
return redirect(url_for("user.settings"))
@app.route("/einstellungen")
@login_required
def einstellungen_redirect():
"""Leitet zur neuen Einstellungsseite im User-Blueprint weiter (deutsche URL)."""
return redirect(url_for("user.settings"))
@app.route("/admin")
@login_required
def admin():
"""Leitet zur neuen Admin-Dashboard-Route weiter."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
return redirect(url_for("admin_page"))
@app.route("/demo")
@login_required
def components_demo():
"""Demo-Seite für UI-Komponenten"""
return render_template("components_demo.html")
@app.route("/printers")
@login_required
def printers_page():
"""Zeigt die Übersichtsseite für Drucker an."""
return render_template("printers.html")
@app.route("/jobs")
@login_required
def jobs_page():
"""Zeigt die Übersichtsseite für Druckaufträge an."""
return render_template("jobs.html")
@app.route("/stats")
@login_required
def stats_page():
"""Zeigt die Statistik-Seite an."""
return render_template("stats.html")
@app.route("/admin-dashboard")
@login_required
def admin_page():
"""Zeigt die Administrationsseite an."""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
return redirect(url_for("index"))
# Aktives Tab aus der URL auslesen oder Default-Wert verwenden
active_tab = request.args.get('tab', 'users')
# Daten für das Admin-Panel direkt beim Laden vorbereiten
stats = {}
users = []
printers = []
scheduler_status = {"running": False, "message": "Nicht verfügbar"}
system_info = {"cpu": 0, "memory": 0, "disk": 0}
logs = []
db_session = get_db_session()
try:
# Statistiken laden
from sqlalchemy.orm import joinedload
# Benutzeranzahl
stats["total_users"] = db_session.query(User).count()
# Druckeranzahl und Online-Status
all_printers = db_session.query(Printer).all()
stats["total_printers"] = len(all_printers)
stats["online_printers"] = len([p for p in all_printers if p.status == "online"])
# Aktive Jobs und Warteschlange
stats["active_jobs"] = db_session.query(Job).filter(
Job.status.in_(["printing", "running"])
).count()
stats["queued_jobs"] = db_session.query(Job).filter(
Job.status == "scheduled"
).count()
# Erfolgsrate
total_jobs = db_session.query(Job).filter(
Job.status.in_(["completed", "failed", "cancelled"])
).count()
successful_jobs = db_session.query(Job).filter(
Job.status == "completed"
).count()
if total_jobs > 0:
stats["success_rate"] = int((successful_jobs / total_jobs) * 100)
else:
stats["success_rate"] = 0
# Benutzer laden
if active_tab == 'users':
users = db_session.query(User).all()
users = [user.to_dict() for user in users]
# Drucker laden
if active_tab == 'printers':
printers = db_session.query(Printer).all()
printers = [printer.to_dict() for printer in printers]
# Scheduler-Status laden
if active_tab == 'scheduler':
from utils.scheduler import scheduler_is_running
scheduler_status = {
"running": scheduler_is_running(),
"message": "Der Scheduler läuft" if scheduler_is_running() else "Der Scheduler ist gestoppt"
}
# System-Informationen laden
if active_tab == 'system':
import psutil
# CPU und Memory
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
# Uptime
boot_time = psutil.boot_time()
uptime_seconds = time.time() - boot_time
uptime_days = int(uptime_seconds // 86400)
uptime_hours = int((uptime_seconds % 86400) // 3600)
uptime_minutes = int((uptime_seconds % 3600) // 60)
# Datenbank-Status
db_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'instance', 'database.db')
db_size = 0
if os.path.exists(db_path):
db_size = os.path.getsize(db_path) / (1024 * 1024) # MB
# Scheduler-Status
scheduler_running = False
scheduler_jobs = 0
try:
from utils.job_scheduler import scheduler
scheduler_running = scheduler.running
if hasattr(scheduler, 'get_jobs'):
scheduler_jobs = len(scheduler.get_jobs())
except:
pass
# Nächster Job
next_job = db_session.query(Job).filter(
Job.status == "scheduled"
).order_by(Job.created_at.asc()).first()
next_job_time = "Keine geplanten Jobs"
if next_job:
next_job_time = next_job.created_at.strftime("%d.%m.%Y %H:%M")
system_info = {
"cpu_usage": round(cpu_percent, 1),
"memory_usage": round(memory.percent, 1),
"disk_usage": round((disk.used / disk.total) * 100, 1),
"uptime": f"{uptime_days}d {uptime_hours}h {uptime_minutes}m",
"db_size": f"{db_size:.1f} MB",
"db_connections": "Aktiv",
"scheduler_running": scheduler_running,
"scheduler_jobs": scheduler_jobs,
"next_job": next_job_time
}
# Logs laden
if active_tab == 'logs':
import os
log_level = request.args.get('log_level', 'all')
log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs')
# Logeinträge sammeln
app_logs = []
for category in ['app', 'auth', 'jobs', 'printers', 'scheduler', 'errors']:
log_file = os.path.join(log_dir, category, f'{category}.log')
if os.path.exists(log_file):
with open(log_file, 'r') as f:
for line in f.readlines()[-100:]: # Nur die letzten 100 Zeilen pro Datei
if log_level != 'all':
if log_level.upper() not in line:
continue
app_logs.append({
'timestamp': line.split(' - ')[0] if ' - ' in line else '',
'level': line.split(' - ')[1].split(' - ')[0] if ' - ' in line and len(line.split(' - ')) > 2 else 'INFO',
'category': category,
'message': ' - '.join(line.split(' - ')[2:]) if ' - ' in line and len(line.split(' - ')) > 2 else line
})
# Nach Zeitstempel sortieren (neueste zuerst)
logs = sorted(app_logs, key=lambda x: x['timestamp'] if x['timestamp'] else '', reverse=True)[:100]
except Exception as e:
app_logger.error(f"Fehler beim Laden der Admin-Daten: {str(e)}")
finally:
db_session.close()
return render_template(
"admin.html",
active_tab=active_tab,
stats=stats,
users=users,
printers=printers,
scheduler_status=scheduler_status,
system_info=system_info,
logs=logs
)
# Direkter Zugriff auf Logout-Route (für Fallback)
@app.route("/logout", methods=["GET", "POST"])
def logout_redirect():
"""Leitet zur Blueprint-Logout-Route weiter."""
return redirect(url_for("auth.logout"))
# Job-Routen
@app.route("/api/jobs", methods=["GET"])
@login_required
def get_jobs():
db_session = get_db_session()
try:
# Import joinedload for eager loading
from sqlalchemy.orm import joinedload
# Admin sieht alle Jobs, User nur eigene
if current_user.is_admin:
# Eagerly load the user and printer relationships to avoid detached instance errors
jobs = db_session.query(Job).options(joinedload(Job.user), joinedload(Job.printer)).all()
else:
jobs = db_session.query(Job).options(joinedload(Job.user), joinedload(Job.printer)).filter(Job.user_id == int(current_user.id)).all()
# Convert jobs to dictionaries before closing the session
job_dicts = [job.to_dict() for job in jobs]
db_session.close()
return jsonify({
"jobs": job_dicts
})
except Exception as e:
jobs_logger.error(f"Fehler beim Abrufen von Jobs: {str(e)}")
db_session.close()
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/jobs/<int:job_id>", methods=["GET"])
@login_required
@job_owner_required
def get_job(job_id):
db_session = get_db_session()
try:
from sqlalchemy.orm import joinedload
# Eagerly load the user and printer relationships
job = db_session.query(Job).options(joinedload(Job.user), joinedload(Job.printer)).filter(Job.id == job_id).first()
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
# Convert to dict before closing session
job_dict = job.to_dict()
db_session.close()
return jsonify(job_dict)
except Exception as e:
jobs_logger.error(f"Fehler beim Abrufen des Jobs {job_id}: {str(e)}")
db_session.close()
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route('/api/jobs/active', methods=['GET'])
@login_required
def get_active_jobs():
"""
Gibt alle aktiven Jobs zurück.
"""
try:
db_session = get_db_session()
from sqlalchemy.orm import joinedload
active_jobs = db_session.query(Job).options(
joinedload(Job.user),
joinedload(Job.printer)
).filter(
Job.status.in_(["scheduled", "running"])
).all()
result = []
for job in active_jobs:
job_dict = job.to_dict()
# Aktuelle Restzeit berechnen
if job.status == "running" and job.end_at:
remaining_time = job.end_at - datetime.now()
if remaining_time.total_seconds() > 0:
job_dict["remaining_minutes"] = int(remaining_time.total_seconds() / 60)
else:
job_dict["remaining_minutes"] = 0
result.append(job_dict)
db_session.close()
return jsonify({"jobs": result})
except Exception as e:
jobs_logger.error(f"Fehler beim Abrufen aktiver Jobs: {str(e)}")
return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
@app.route('/api/jobs', methods=['POST'])
@login_required
def create_job():
"""
Erstellt einen neuen Job mit dem Status "scheduled".
Body: {
"printer_id": int,
"start_iso": str, # ISO-Datum-String
"duration_minutes": int
}
"""
try:
data = request.json
# Pflichtfelder prüfen
required_fields = ["printer_id", "start_iso", "duration_minutes"]
for field in required_fields:
if field not in data:
return jsonify({"error": f"Feld '{field}' fehlt"}), 400
# Daten extrahieren und validieren
printer_id = int(data["printer_id"])
start_iso = data["start_iso"]
duration_minutes = int(data["duration_minutes"])
# Optional: Jobtitel und Dateipfad
name = data.get("name", f"Druckjob vom {datetime.now().strftime('%d.%m.%Y')}")
file_path = data.get("file_path")
# Start-Zeit parsen
try:
start_at = datetime.fromisoformat(start_iso)
except ValueError:
return jsonify({"error": "Ungültiges Startdatum"}), 400
# Dauer validieren
if duration_minutes <= 0:
return jsonify({"error": "Dauer muss größer als 0 sein"}), 400
# End-Zeit berechnen
end_at = start_at + timedelta(minutes=duration_minutes)
db_session = get_db_session()
# Prüfen, ob der Drucker existiert
printer = db_session.query(Printer).get(printer_id)
if not printer:
db_session.close()
return jsonify({"error": "Drucker nicht gefunden"}), 404
# Neuen Job erstellen
new_job = Job(
name=name,
printer_id=printer_id,
user_id=current_user.id,
owner_id=current_user.id,
start_at=start_at,
end_at=end_at,
status="scheduled",
file_path=file_path,
duration_minutes=duration_minutes
)
db_session.add(new_job)
db_session.commit()
# Job-Objekt für die Antwort serialisieren
job_dict = new_job.to_dict()
db_session.close()
jobs_logger.info(f"Neuer Job {new_job.id} erstellt für Drucker {printer_id}, Start: {start_at}, Dauer: {duration_minutes} Minuten")
return jsonify({"job": job_dict}), 201
except Exception as e:
jobs_logger.error(f"Fehler beim Erstellen eines Jobs: {str(e)}")
return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
@app.route('/api/jobs/<int:job_id>/extend', methods=['POST'])
@login_required
@job_owner_required
def extend_job(job_id):
"""
Verlängert die Endzeit eines Jobs.
Body: {
"extra_minutes": int
}
"""
try:
data = request.json
# Prüfen, ob die erforderlichen Daten vorhanden sind
if "extra_minutes" not in data:
return jsonify({"error": "Feld 'extra_minutes' fehlt"}), 400
extra_minutes = int(data["extra_minutes"])
# Validieren
if extra_minutes <= 0:
return jsonify({"error": "Zusätzliche Minuten müssen größer als 0 sein"}), 400
db_session = get_db_session()
job = db_session.query(Job).get(job_id)
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
# Prüfen, ob der Job verlängert werden kann
if job.status not in ["scheduled", "running"]:
db_session.close()
return jsonify({"error": f"Job kann im Status '{job.status}' nicht verlängert werden"}), 400
# Endzeit aktualisieren
job.end_at = job.end_at + timedelta(minutes=extra_minutes)
job.duration_minutes += extra_minutes
db_session.commit()
# Job-Objekt für die Antwort serialisieren
job_dict = job.to_dict()
db_session.close()
jobs_logger.info(f"Job {job_id} um {extra_minutes} Minuten verlängert, neue Endzeit: {job.end_at}")
return jsonify({"job": job_dict})
except Exception as e:
jobs_logger.error(f"Fehler beim Verlängern von Job {job_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
@app.route('/api/jobs/<int:job_id>/finish', methods=['POST'])
@login_required
def finish_job(job_id):
"""
Beendet einen Job manuell und schaltet die Steckdose aus.
Nur für Administratoren erlaubt.
"""
try:
# Prüfen, ob der Benutzer Administrator ist
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Jobs manuell beenden"}), 403
db_session = get_db_session()
job = db_session.query(Job).options(joinedload(Job.printer)).get(job_id)
if not job:
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
# Prüfen, ob der Job beendet werden kann
if job.status not in ["scheduled", "running"]:
db_session.close()
return jsonify({"error": f"Job kann im Status '{job.status}' nicht beendet werden"}), 400
# Steckdose ausschalten
from utils.job_scheduler import toggle_plug
if not toggle_plug(job.printer_id, False):
# Trotzdem weitermachen, aber Warnung loggen
jobs_logger.warning(f"Steckdose für Job {job_id} konnte nicht ausgeschaltet werden")
# Job als beendet markieren
job.status = "finished"
job.actual_end_time = datetime.now()
db_session.commit()
# Job-Objekt für die Antwort serialisieren
job_dict = job.to_dict()
db_session.close()
jobs_logger.info(f"Job {job_id} manuell beendet durch Admin {current_user.id}")
return jsonify({"job": job_dict})
except Exception as e:
jobs_logger.error(f"Fehler beim manuellen Beenden von Job {job_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
@app.route("/api/printers", methods=["GET"])
@login_required
def get_printers():
"""Gibt alle Drucker mit aktuellem Status zurück (mit Ping-Check und 7-Sekunden-Timeout)."""
db_session = get_db_session()
try:
printers = db_session.query(Printer).all()
# Drucker-Daten für Status-Check vorbereiten
printer_data = []
for printer in printers:
printer_data.append({
'id': printer.id,
'name': printer.name,
'ip_address': printer.ip_address,
'location': printer.location
})
# Status aller Drucker parallel überprüfen mit 7-Sekunden-Timeout
printers_logger.info(f"Starte Drucker-Status-Check für {len(printer_data)} Drucker")
status_results = check_multiple_printers_status(printer_data, timeout=7)
# Drucker-Liste mit aktuellem Status erstellen
printer_list = []
for printer in printers:
if printer.id in status_results:
status, active = status_results[printer.id]
# Mapping für Frontend-Kompatibilität
if status == "online":
frontend_status = "available"
else:
frontend_status = "offline"
else:
# Fallback falls kein Ergebnis vorliegt
frontend_status = "offline"
active = False
# Status in der Datenbank aktualisieren
printer.status = frontend_status
printer.active = active
printer_data = printer.to_dict()
printer_data["status"] = frontend_status
printer_data["active"] = active
printer_data["last_checked"] = datetime.now().isoformat()
printer_list.append(printer_data)
# Speichere Updates
db_session.commit()
db_session.close()
online_count = len([p for p in printer_list if p["status"] == "available"])
printers_logger.info(f"Drucker-Status-Check abgeschlossen: {online_count} von {len(printer_list)} Drucker verfügbar")
return jsonify({
"printers": printer_list
})
except Exception as e:
printers_logger.error(f"Fehler beim Abrufen der Drucker: {str(e)}")
db_session.rollback()
db_session.close()
return jsonify({"error": "Interner Serverfehler"}), 500
# API-Routen für Statistiken
@app.route("/api/stats/users", methods=["GET"])
@login_required
def get_stats_users():
"""Gibt die Anzahl der Benutzer zurück."""
db_session = get_db_session()
try:
user_count = db_session.query(User).count()
db_session.close()
return jsonify({"value": user_count})
except Exception as e:
db_session.close()
return jsonify({"error": str(e)}), 500
@app.route("/api/stats/uptime", methods=["GET"])
@login_required
def get_stats_uptime():
"""Gibt die Systemlaufzeit zurück."""
import os
with open('/proc/uptime', 'r') as f:
uptime_seconds = float(f.readline().split()[0])
uptime_days = int(uptime_seconds / 86400)
return jsonify({"value": f"{uptime_days} Tage"})
@app.route("/api/stats/active-jobs", methods=["GET"])
@login_required
def get_stats_active_jobs():
"""Gibt die Anzahl der aktiven Jobs zurück."""
db_session = get_db_session()
try:
active_jobs = db_session.query(Job).filter(Job.status.in_(["scheduled", "running"])).count()
db_session.close()
return jsonify({"value": active_jobs})
except Exception as e:
db_session.close()
return jsonify({"error": str(e)}), 500
@app.route("/api/stats/available-printers", methods=["GET"])
@login_required
def get_stats_available_printers():
"""Gibt die Anzahl der verfügbaren Drucker zurück."""
db_session = get_db_session()
try:
available_printers = db_session.query(Printer).filter(Printer.active == True).count()
db_session.close()
return jsonify({"value": available_printers})
except Exception as e:
db_session.close()
return jsonify({"error": str(e)}), 500
@app.route("/api/stats/success-rate", methods=["GET"])
@login_required
def get_stats_success_rate():
"""Gibt die Erfolgsrate der Druckaufträge zurück."""
db_session = get_db_session()
try:
total_jobs = db_session.query(Job).filter(Job.status == "finished").count()
if total_jobs == 0:
success_rate = 0
else:
success_jobs = db_session.query(Job).filter(
Job.status == "finished",
Job.actual_end_time != None
).count()
success_rate = int((success_jobs / total_jobs) * 100)
db_session.close()
return jsonify({"value": f"{success_rate}%"})
except Exception as e:
db_session.close()
return jsonify({"error": str(e)}), 500
@app.route("/api/stats/print-time", methods=["GET"])
@login_required
def get_stats_print_time():
"""Gibt die gesamte Druckzeit zurück."""
db_session = get_db_session()
try:
stats = db_session.query(Stats).first()
if stats and stats.total_print_time:
hours = stats.total_print_time // 3600
db_session.close()
return jsonify({"value": f"{hours}h"})
db_session.close()
return jsonify({"value": "0h"})
except Exception as e:
db_session.close()
return jsonify({"error": str(e)}), 500
@app.route("/api/activity", methods=["GET"])
@login_required
def get_activity():
"""Gibt die letzten Aktivitäten zurück."""
db_session = get_db_session()
try:
recent_jobs = db_session.query(Job).order_by(Job.created_at.desc()).limit(5).all()
activities = [
{
"type": "job",
"id": job.id,
"title": job.name,
"status": job.status,
"timestamp": job.created_at.isoformat() if job.created_at else None
}
for job in recent_jobs
]
db_session.close()
return jsonify(activities)
except Exception as e:
db_session.close()
return jsonify({"error": str(e)}), 500
@app.route("/api/printers/status", methods=["GET"])
@login_required
def get_printers_status():
"""Gibt den Status aller Drucker zurück mit echtem Ping-Check und 7-Sekunden-Timeout."""
db_session = get_db_session()
try:
printers = db_session.query(Printer).all()
# Drucker-Daten für Status-Check vorbereiten
printer_data = []
for printer in printers:
printer_data.append({
'id': printer.id,
'name': printer.name,
'ip_address': printer.ip_address,
'location': printer.location
})
# Status aller Drucker parallel überprüfen mit 7-Sekunden-Timeout
printers_logger.info(f"Starte Status-Check für {len(printer_data)} Drucker mit 7-Sekunden-Timeout")
status_results = check_multiple_printers_status(printer_data, timeout=7)
# Ergebnisse zusammenstellen und Datenbank aktualisieren
status_data = []
for printer in printers:
if printer.id in status_results:
status, active = status_results[printer.id]
else:
# Fallback falls kein Ergebnis vorliegt
status, active = "offline", False
# Status in der Datenbank aktualisieren
printer.status = status
printer.active = active
status_data.append({
"id": printer.id,
"name": printer.name,
"status": status,
"active": active,
"ip_address": printer.ip_address,
"location": printer.location,
"last_checked": datetime.now().isoformat()
})
# Speichere die aktualisierten Status
db_session.commit()
db_session.close()
printers_logger.info(f"Status-Check abgeschlossen: {len([s for s in status_data if s['status'] == 'online'])} von {len(status_data)} Drucker online")
return jsonify(status_data)
except Exception as e:
db_session.rollback()
db_session.close()
printers_logger.error(f"Fehler beim Abrufen des Drucker-Status: {str(e)}")
return jsonify({"error": str(e)}), 500
@app.route("/api/jobs/current", methods=["GET"])
@login_required
def get_current_job():
"""Gibt den aktuellen Job des Benutzers zurück."""
db_session = get_db_session()
try:
current_job = db_session.query(Job).filter(
Job.user_id == current_user.id,
Job.status.in_(["scheduled", "running"])
).order_by(Job.start_at).first()
if current_job:
job_data = current_job.to_dict()
else:
job_data = None
db_session.close()
return jsonify(job_data)
except Exception as e:
db_session.close()
return jsonify({"error": str(e)}), 500
# Admin API Endpoints
@app.route("/api/users", methods=["GET"])
def get_users():
"""Returns a list of all users (admin only)"""
if not current_user.is_admin:
return jsonify({"error": "Unauthorized"}), 403
db_session = get_db_session()
try:
users = db_session.query(User).all()
users_list = []
for user in users:
users_list.append({
"id": user.id,
"email": user.email,
"name": user.name,
"role": user.role,
"active": user.active,
"created_at": user.created_at.isoformat() if user.created_at else None
})
db_session.close()
return jsonify({"users": users_list})
except Exception as e:
db_session.close()
app_logger.error(f"Error fetching users: {str(e)}")
return jsonify({"error": "Failed to fetch users"}), 500
@app.route("/api/users", methods=["POST"])
@login_required
def create_user():
"""Create a new user (admin only)"""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung, um neue Benutzer anzulegen.", "error")
return redirect(url_for('admin_page', tab='users'))
db_session = get_db_session()
try:
# Statt JSON-Daten die Formulardaten aus dem POST-Request holen
email = request.form.get('email')
name = request.form.get('name')
password = request.form.get('password')
role = request.form.get('role', 'user')
if not email or not password:
db_session.close()
flash("E-Mail und Passwort sind Pflichtfelder.", "error")
return redirect(url_for('admin_page', tab='users'))
# Check if user with same email already exists
existing_user = db_session.query(User).filter(
User.email == email
).first()
if existing_user:
db_session.close()
flash("Ein Benutzer mit dieser E-Mail existiert bereits.", "error")
return redirect(url_for('admin_page', tab='users'))
# Create new user
new_user = User(
email=email,
name=name if name else "",
username=email.split("@")[0], # Default username from email
role=role,
active=True,
created_at=datetime.now()
)
# Set password
new_user.set_password(password)
db_session.add(new_user)
db_session.commit()
user_id = new_user.id
db_session.close()
app_logger.info(f"New user created: {new_user.email} (ID: {user_id})")
flash(f"Benutzer {email} wurde erfolgreich angelegt.", "success")
return redirect(url_for('admin_page', tab='users'))
except Exception as e:
db_session.rollback()
db_session.close()
app_logger.error(f"Error creating user: {str(e)}")
flash(f"Fehler beim Anlegen des Benutzers: {str(e)}", "error")
return redirect(url_for('admin_page', tab='users'))
@app.route("/api/users/<int:user_id>", methods=["DELETE"])
@login_required
def delete_user(user_id):
"""Delete a user (admin only)"""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung, um Benutzer zu löschen.", "error")
return redirect(url_for('admin_page', tab='users'))
# Prevent admin from deleting themselves
if user_id == current_user.id:
flash("Sie können Ihren eigenen Account nicht löschen.", "error")
return redirect(url_for('admin_page', tab='users'))
db_session = get_db_session()
try:
user = db_session.query(User).filter(User.id == user_id).first()
if not user:
db_session.close()
flash("Benutzer nicht gefunden.", "error")
return redirect(url_for('admin_page', tab='users'))
# Prevent deletion of admin users
if user.role == "admin":
db_session.close()
flash("Administratoren können nicht gelöscht werden.", "error")
return redirect(url_for('admin_page', tab='users'))
email = user.email # Save for later logging
db_session.delete(user)
db_session.commit()
db_session.close()
app_logger.info(f"User deleted: {email} (ID: {user_id})")
flash(f"Benutzer {email} wurde erfolgreich gelöscht.", "success")
return redirect(url_for('admin_page', tab='users'))
except Exception as e:
db_session.rollback()
db_session.close()
app_logger.error(f"Error deleting user: {str(e)}")
flash(f"Fehler beim Löschen des Benutzers: {str(e)}", "error")
return redirect(url_for('admin_page', tab='users'))
@app.route("/api/printers", methods=["POST"])
@login_required
def create_printer():
"""Create a new printer (admin only)"""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung, um neue Drucker anzulegen.", "error")
return redirect(url_for('admin_page', tab='printers'))
db_session = get_db_session()
try:
# Statt JSON-Daten die Formulardaten aus dem POST-Request holen
name = request.form.get('name')
model = request.form.get('model')
location = request.form.get('location')
mac_address = request.form.get('mac_address')
plug_ip = request.form.get('plug_ip')
plug_username = request.form.get('plug_username')
plug_password = request.form.get('plug_password')
# Check required fields
if not name or not mac_address:
db_session.close()
flash("Name und MAC-Adresse sind Pflichtfelder.", "error")
return redirect(url_for('admin_page', tab='printers'))
# Check if printer with same MAC already exists
existing_printer = db_session.query(Printer).filter(
Printer.mac_address == mac_address
).first()
if existing_printer:
db_session.close()
flash("Ein Drucker mit dieser MAC-Adresse existiert bereits.", "error")
return redirect(url_for('admin_page', tab='printers'))
# Create new printer
new_printer = Printer(
name=name,
model=model or "",
location=location or "",
mac_address=mac_address,
plug_ip=plug_ip or "",
plug_username=plug_username or "",
plug_password=plug_password or "",
active=True
)
db_session.add(new_printer)
db_session.commit()
printer_id = new_printer.id
db_session.close()
app_logger.info(f"New printer created: {new_printer.name} (ID: {printer_id})")
flash(f"Drucker {name} wurde erfolgreich angelegt.", "success")
return redirect(url_for('admin_page', tab='printers'))
except Exception as e:
db_session.rollback()
db_session.close()
app_logger.error(f"Error creating printer: {str(e)}")
flash(f"Fehler beim Anlegen des Druckers: {str(e)}", "error")
return redirect(url_for('admin_page', tab='printers'))
@app.route("/api/printers/<int:printer_id>", methods=["DELETE"])
@login_required
def delete_printer(printer_id):
"""Delete a printer (admin only)"""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung, um Drucker zu löschen.", "error")
return redirect(url_for('admin_page', tab='printers'))
db_session = get_db_session()
try:
printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
if not printer:
db_session.close()
flash("Drucker nicht gefunden.", "error")
return redirect(url_for('admin_page', tab='printers'))
printer_name = printer.name # Save for later logging
db_session.delete(printer)
db_session.commit()
db_session.close()
app_logger.info(f"Printer deleted: {printer_name} (ID: {printer_id})")
flash(f"Drucker {printer_name} wurde erfolgreich gelöscht.", "success")
return redirect(url_for('admin_page', tab='printers'))
except Exception as e:
db_session.rollback()
db_session.close()
app_logger.error(f"Error deleting printer: {str(e)}")
flash(f"Fehler beim Löschen des Druckers: {str(e)}", "error")
return redirect(url_for('admin_page', tab='printers'))
@app.route("/api/stats", methods=["GET"])
@login_required
def get_stats():
"""Get overall system statistics"""
if not current_user.is_admin:
return jsonify({"error": "Unauthorized"}), 403
db_session = get_db_session()
try:
# Get basic stats
stats = db_session.query(Stats).first()
if not stats:
# Create initial stats if none exist
stats = Stats()
db_session.add(stats)
db_session.commit()
# Count users, printers, active jobs
user_count = db_session.query(User).count()
printer_count = db_session.query(Printer).count()
active_jobs = db_session.query(Job).filter(Job.status.in_(["scheduled", "running"])).count()
result = {
"total_users": user_count,
"total_printers": printer_count,
"active_jobs": active_jobs,
"total_print_time_hours": round(stats.total_print_time / 3600, 1) if stats.total_print_time else 0,
"total_jobs_completed": stats.total_jobs_completed or 0,
"total_material_used": stats.total_material_used or 0,
"last_updated": stats.last_updated.isoformat() if stats.last_updated else None
}
db_session.close()
return jsonify(result)
except Exception as e:
db_session.close()
app_logger.error(f"Error fetching stats: {str(e)}")
return jsonify({"error": "Failed to fetch statistics"}), 500
@app.route("/api/scheduler/status", methods=["GET"])
@login_required
def get_scheduler_status():
"""Get the current status of the job scheduler"""
if not current_user.is_admin:
return jsonify({"error": "Unauthorized"}), 403
try:
is_running = scheduler.is_running()
tasks = []
# Add information about scheduler tasks
for task_id, task in scheduler.get_tasks().items():
tasks.append({
"id": task_id,
"interval": task.get("interval", 0),
"last_run": task.get("last_run"),
"enabled": task.get("enabled", False)
})
return jsonify({
"running": is_running,
"tasks": tasks,
"uptime": scheduler.get_uptime()
})
except Exception as e:
app_logger.error(f"Error fetching scheduler status: {str(e)}")
return jsonify({"error": "Failed to fetch scheduler status"}), 500
@app.route("/api/scheduler/start", methods=["POST"])
@login_required
def start_scheduler():
"""Start the job scheduler (admin only)"""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung, um den Scheduler zu starten.", "error")
return redirect(url_for('admin_page', tab='scheduler'))
try:
from utils.scheduler import start_scheduler as start_scheduler_func
result = start_scheduler_func()
if result:
app_logger.info(f"Scheduler started by admin user: {current_user.email}")
flash("Der Scheduler wurde erfolgreich gestartet.", "success")
else:
flash("Der Scheduler konnte nicht gestartet werden oder läuft bereits.", "warning")
return redirect(url_for('admin_page', tab='scheduler'))
except Exception as e:
app_logger.error(f"Error starting scheduler: {str(e)}")
flash(f"Fehler beim Starten des Schedulers: {str(e)}", "error")
return redirect(url_for('admin_page', tab='scheduler'))
@app.route("/api/scheduler/stop", methods=["POST"])
@login_required
def stop_scheduler():
"""Stop the job scheduler (admin only)"""
if not current_user.is_admin:
flash("Sie haben keine Berechtigung, um den Scheduler zu stoppen.", "error")
return redirect(url_for('admin_page', tab='scheduler'))
try:
from utils.scheduler import stop_scheduler as stop_scheduler_func
result = stop_scheduler_func()
if result:
app_logger.info(f"Scheduler stopped by admin user: {current_user.email}")
flash("Der Scheduler wurde erfolgreich gestoppt.", "success")
else:
flash("Der Scheduler konnte nicht gestoppt werden oder läuft nicht.", "warning")
return redirect(url_for('admin_page', tab='scheduler'))
except Exception as e:
app_logger.error(f"Error stopping scheduler: {str(e)}")
flash(f"Fehler beim Stoppen des Schedulers: {str(e)}", "error")
return redirect(url_for('admin_page', tab='scheduler'))
@app.route("/api/logs", methods=["GET"])
@login_required
def get_logs():
"""Get system logs (admin only)"""
if not current_user.is_admin:
return jsonify({"error": "Unauthorized"}), 403
try:
# Get log type from query params
log_type = request.args.get("type", "app")
limit = int(request.args.get("limit", 100))
log_mapping = {
"app": "logs/app/app.log",
"auth": "logs/auth/auth.log",
"errors": "logs/errors/errors.log",
"jobs": "logs/jobs/jobs.log",
"printers": "logs/printers/printers.log",
"scheduler": "logs/scheduler/scheduler.log"
}
if log_type not in log_mapping:
return jsonify({"error": "Invalid log type"}), 400
log_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), log_mapping[log_type])
if not os.path.exists(log_path):
return jsonify({"logs": [], "success": True})
logs = []
with open(log_path, "r") as f:
for line in f.readlines()[-limit:]:
try:
# Parse log entry (format: [LEVEL] TIMESTAMP - MESSAGE)
parts = line.strip().split(" - ", 1)
if len(parts) == 2:
header, message = parts
level_timestamp = header.strip("[]").split("] [", 1)
if len(level_timestamp) == 2:
level, timestamp = level_timestamp
logs.append({
"level": level.strip(),
"timestamp": timestamp.strip(),
"message": message.strip(),
"source": log_type
})
except Exception:
# If parsing fails, add the raw line
logs.append({
"level": "INFO",
"timestamp": datetime.now().isoformat(),
"message": line.strip(),
"source": f"myp.{log_type}"
})
# Sort logs by timestamp in descending order
logs.sort(key=lambda x: x["timestamp"], reverse=True)
return jsonify({"logs": logs, "success": True})
except Exception as e:
app_logger.error(f"Error fetching logs: {str(e)}")
return jsonify({"error": "Failed to fetch logs"}), 500
@app.route("/api/activity/recent", methods=["GET"])
@login_required
def get_recent_activity():
"""Get recent system activity"""
try:
# Create mock activity data (to be replaced with real data in future)
activities = [
{
"description": "Neuer Druckauftrag erstellt: 'Motor_Halterung_v2'",
"timestamp": (datetime.now() - timedelta(minutes=15)).isoformat(),
"user": "admin@example.com",
"type": "job_created"
},
{
"description": "Drucker 'Prusa i3 MK3S' wurde neu konfiguriert",
"timestamp": (datetime.now() - timedelta(hours=2)).isoformat(),
"user": "admin@example.com",
"type": "printer_updated"
},
{
"description": "Druckauftrag 'Getriebe_Prototyp' abgeschlossen",
"timestamp": (datetime.now() - timedelta(hours=5)).isoformat(),
"user": "user@example.com",
"type": "job_completed"
},
{
"description": "Neuer Benutzer registriert: 'user@example.com'",
"timestamp": (datetime.now() - timedelta(days=1)).isoformat(),
"user": "admin@example.com",
"type": "user_created"
},
{
"description": "Systemwartung durchgeführt",
"timestamp": (datetime.now() - timedelta(days=2)).isoformat(),
"user": "admin@example.com",
"type": "system_maintenance"
}
]
# Get limit from query params
limit = int(request.args.get("limit", 5))
activities = activities[:limit]
return jsonify({"activities": activities})
except Exception as e:
app_logger.error(f"Error fetching recent activity: {str(e)}")
return jsonify({"error": "Failed to fetch recent activity"}), 500
# Service Worker Route
@app.route('/sw.js')
def service_worker():
"""Serve the service worker script with proper headers"""
response = app.send_static_file('js/sw.js')
# Wichtig: Korrekte MIME-Type setzen
response.headers['Content-Type'] = 'application/javascript'
# Wichtig: Cache-Control Header setzen, um häufige Updates zu ermöglichen
response.headers['Cache-Control'] = 'no-cache'
# Service-Worker-Allowed Header setzen, um Scope-Probleme zu beheben
response.headers['Service-Worker-Allowed'] = '/'
return response
# Fehlerbehandlung
@app.errorhandler(404)
def page_not_found(e):
return render_template("404.html"), 404
@app.errorhandler(500)
def internal_server_error(e):
return render_template("500.html"), 500
# CLI-Befehle für Tailwind CSS
@app.cli.group()
def tailwind():
"""Tailwind CSS Kommandos."""
pass
@tailwind.command("build")
def tailwind_build():
"""Tailwind CSS für die Produktion kompilieren."""
print("Tailwind CSS wird kompiliert...")
try:
subprocess.run(["npx", "tailwindcss", "-i", "./static/css/input.css",
"-o", "./static/css/tailwind-dark-consolidated.min.css", "--minify"],
check=True)
print("Tailwind CSS erfolgreich kompiliert.")
except subprocess.CalledProcessError as e:
print(f"Fehler beim Kompilieren von Tailwind CSS: {e}")
raise
@tailwind.command("watch")
def tailwind_watch():
"""Tailwind CSS im Watch-Modus starten."""
print("Tailwind CSS Watch-Modus wird gestartet...")
try:
subprocess.Popen(["npx", "tailwindcss", "-i", "./static/css/input.css",
"-o", "./static/css/tailwind-dark-consolidated.min.css", "--watch"])
print("Tailwind CSS Watch-Modus gestartet. CSS wird bei Änderungen automatisch aktualisiert.")
except subprocess.CalledProcessError as e:
print(f"Fehler beim Starten des Tailwind CSS Watch-Modus: {e}")
raise
# Auto-Kompilierung beim Serverstart im Debug-Modus
def compile_tailwind_if_debug():
"""Kompiliert Tailwind CSS im Debug-Modus, falls notwendig."""
if FLASK_DEBUG:
try:
app_logger.info("Kompiliere Tailwind CSS...")
# Prüfen, ob npx und Node.js verfügbar sind
import platform
import shutil
import subprocess
# Auf Windows nur fortfahren, wenn die CSS-Datei bereits existiert
# oder npx verfügbar ist
css_file_exists = os.path.exists("static/css/tailwind.min.css")
# Prüfen, ob npx verfügbar ist
npx_available = shutil.which("npx") is not None
if platform.system() == "Windows" and not npx_available and not css_file_exists:
app_logger.warning("npx nicht gefunden und keine CSS-Datei vorhanden. Tailwind CSS wird nicht kompiliert.")
return
# Tailwind CSS kompilieren
if npx_available:
subprocess.run([
"npx", "tailwindcss", "-i", "static/css/input.css",
"-o", "static/css/tailwind.min.css", "--minify"
], check=True)
app_logger.info("Tailwind CSS erfolgreich kompiliert.")
elif css_file_exists:
app_logger.info("Verwende existierende Tailwind CSS-Datei.")
else:
app_logger.warning("Tailwind konnte nicht kompiliert werden und keine CSS-Datei vorhanden.")
except subprocess.CalledProcessError as e:
app_logger.warning(f"Tailwind konnte nicht kompiliert werden. Möglicherweise ist npx/Node.js nicht installiert. Fehler: {e}")
except Exception as e:
app_logger.error(f"Fehler beim Kompilieren von Tailwind CSS: {str(e)}")
# Tailwind CSS kompilieren, wenn im Debug-Modus
if FLASK_DEBUG:
compile_tailwind_if_debug()
# Initialisierung der Datenbank beim Start
def init_app():
"""Initialisiert die App-Komponenten und startet den Scheduler."""
# Datenbank initialisieren
try:
init_database()
create_initial_admin()
except Exception as e:
app_logger.error(f"Fehler bei der Datenbank-Initialisierung: {str(e)}")
# Jinja2-Helfer registrieren
register_template_helpers(app)
# Tailwind im Debug-Modus kompilieren
compile_tailwind_if_debug()
# Scheduler starten, wenn aktiviert
if SCHEDULER_ENABLED:
try:
# Scheduler-Task für Druckauftrags-Prüfung registrieren
scheduler.register_task(
"check_jobs",
check_jobs,
interval=SCHEDULER_INTERVAL
)
# Scheduler starten
scheduler.start()
app_logger.info(f"Scheduler gestartet mit Intervall {SCHEDULER_INTERVAL} Sekunden.")
except Exception as e:
app_logger.error(f"Fehler beim Starten des Schedulers: {str(e)}")
# SSL-Kontext protokollieren
ssl_context = get_ssl_context()
if ssl_context:
app_logger.info(f"SSL aktiviert mit Zertifikat {ssl_context[0]}")
else:
app_logger.warning("SSL ist deaktiviert. Die Verbindung ist unverschlüsselt!")
# Scheduler-Funktion zur Überprüfung der Druckaufträge
def check_jobs():
"""
Überprüft alle aktiven Druckaufträge und führt entsprechende Aktionen aus.
Diese Funktion wird vom Scheduler regelmäßig aufgerufen.
"""
app_logger.info("Überprüfe Druckaufträge...")
try:
db_session = get_db_session()
# Aktive Jobs abrufen
active_jobs = db_session.query(Job).filter(
Job.status.in_(["scheduled", "running"])
).all()
now = datetime.now()
for job in active_jobs:
# Prüfen, ob der Job gestartet werden soll
if job.status == "scheduled" and job.start_at <= now:
app_logger.info(f"Starte Job {job.id} für Drucker {job.printer_id}")
job.status = "running"
# Steckdose einschalten (implementieren Sie diese Funktion)
from utils.job_scheduler import toggle_plug
toggle_plug(job.printer_id, True)
# Prüfen, ob der Job beendet werden soll
elif job.status == "running" and job.end_at <= now:
app_logger.info(f"Beende Job {job.id} für Drucker {job.printer_id}")
job.status = "finished"
job.actual_end_time = now
# Steckdose ausschalten
from utils.job_scheduler import toggle_plug
toggle_plug(job.printer_id, False)
db_session.commit()
db_session.close()
except Exception as e:
app_logger.error(f"Fehler bei der Überprüfung von Druckaufträgen: {str(e)}")
if 'db_session' in locals():
db_session.close()
# App starten
if __name__ == "__main__":
import argparse
import threading
import ssl
import socket
import logging
# Kommandozeilenargumente parsen
parser = argparse.ArgumentParser(description='MYP Platform - 3D-Drucker Reservierungssystem')
parser.add_argument('--port', type=int, help='Port für den Server (überschreibt die Konfiguration)')
parser.add_argument('--no-ssl', action='store_true', help='Deaktiviert SSL/HTTPS')
parser.add_argument('--dual-protocol', action='store_true', help='Startet sowohl HTTP als auch HTTPS Server')
args = parser.parse_args()
# Initialisierung
init_app()
# Port aus Kommandozeilenargument verwenden, falls angegeben
port = args.port if args.port else FLASK_PORT
# SSL-Kontext abrufen
ssl_context = None
if SSL_ENABLED and not args.no_ssl:
try:
if SSL_CERT_PATH and SSL_KEY_PATH:
ssl_context = (SSL_CERT_PATH, SSL_KEY_PATH)
logging.info(f"SSL aktiviert mit Zertifikat: {SSL_CERT_PATH}")
else:
ssl_context = 'adhoc'
logging.info("SSL aktiviert mit selbstsigniertem Ad-hoc-Zertifikat")
except Exception as e:
logging.error(f"Fehler beim Laden des SSL-Kontexts: {e}")
ssl_context = None
# Dual-Protokoll-Modus: HTTP und HTTPS gleichzeitig
if args.dual_protocol:
# Funktion zum Starten des HTTP-Servers
def start_http_server():
try:
logging.info(f"Starte HTTP-Server auf Port 80...")
# Kopie der App erstellen
from werkzeug.serving import run_simple
run_simple('0.0.0.0', 80, app, threaded=True)
except socket.error as e:
logging.error(f"Konnte HTTP-Server nicht starten: {e}")
# Funktion zum Starten des HTTPS-Servers
def start_https_server():
try:
if ssl_context:
logging.info(f"Starte HTTPS-Server auf Port {port}...")
app.run(host='0.0.0.0', port=port, ssl_context=ssl_context, threaded=True)
else:
logging.warning("HTTPS deaktiviert aufgrund fehlender Zertifikate")
app.run(host='0.0.0.0', port=port, threaded=True)
except socket.error as e:
logging.error(f"Konnte HTTPS-Server nicht starten: {e}")
# Beide Server in separaten Threads starten
http_thread = threading.Thread(target=start_http_server)
https_thread = threading.Thread(target=start_https_server)
http_thread.daemon = True
https_thread.daemon = True
http_thread.start()
https_thread.start()
# Warten, bis beide Threads beendet sind (was sie normalerweise nicht sein sollten)
http_thread.join()
https_thread.join()
else:
# Normaler Modus - entweder HTTP oder HTTPS
if ssl_context:
logging.info(f"Starte HTTPS-Server auf Port {port}...")
app.run(host='0.0.0.0', port=port, ssl_context=ssl_context, threaded=True)
else:
logging.info(f"Starte HTTP-Server auf Port {port}...")
app.run(host='0.0.0.0', port=port, threaded=True)
# Content Security Policy anpassen
@app.after_request
def add_security_headers(response):
"""Fügt Sicherheitsheader zu allen Antworten hinzu"""
# Content Security Policy definieren
csp_directives = [
"default-src 'self'",
"script-src 'self' 'unsafe-inline' 'unsafe-eval'",
"script-src-elem 'self' 'unsafe-inline'",
"style-src 'self' 'unsafe-inline'",
"img-src 'self' data:",
"font-src 'self'",
"connect-src 'self'",
"worker-src 'self'", # Erlaubt Service Worker
"manifest-src 'self'"
]
# Setze CSP Header
response.headers['Content-Security-Policy'] = "; ".join(csp_directives)
# Weitere Sicherheitsheader
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'SAMEORIGIN'
response.headers['X-XSS-Protection'] = '1; mode=block'
return response
@app.route("/privacy")
def privacy_page():
"""Zeigt die Datenschutzseite an."""
return render_template("privacy.html")
@app.route("/terms")
def terms_page():
"""Zeigt die Nutzungsbedingungen an."""
return render_template("terms.html")
@app.route("/api/stats/export", methods=["GET"])
@login_required
def export_stats():
"""Exportiert Statistiken als JSON-Datei."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Statistiken exportieren"}), 403
try:
db_session = get_db_session()
# Grundlegende Statistiken sammeln
stats = db_session.query(Stats).first()
if not stats:
stats = Stats()
# Benutzerzahlen
user_count = db_session.query(User).count()
active_user_count = db_session.query(User).filter(User.active == True).count()
# Druckerzahlen
printer_count = db_session.query(Printer).count()
active_printer_count = db_session.query(Printer).filter(Printer.active == True).count()
# Jobstatistiken
total_jobs = db_session.query(Job).count()
completed_jobs = db_session.query(Job).filter(Job.status == "finished").count()
active_jobs = db_session.query(Job).filter(Job.status.in_(["scheduled", "running"])).count()
failed_jobs = db_session.query(Job).filter(Job.status == "failed").count()
# Berechne durchschnittliche Druckzeit
jobs_with_duration = db_session.query(Job).filter(
Job.start_at != None,
Job.actual_end_time != None
).all()
total_print_time = 0
avg_print_time = 0
if jobs_with_duration:
for job in jobs_with_duration:
duration = (job.actual_end_time - job.start_at).total_seconds()
total_print_time += duration
avg_print_time = total_print_time / len(jobs_with_duration) if len(jobs_with_duration) > 0 else 0
# Füge zusätzliche Statistiken für jede Druckerart hinzu
printer_stats = []
printers = db_session.query(Printer).all()
for printer in printers:
printer_jobs = db_session.query(Job).filter(Job.printer_id == printer.id).count()
printer_success_jobs = db_session.query(Job).filter(
Job.printer_id == printer.id,
Job.status == "finished"
).count()
success_rate = (printer_success_jobs / printer_jobs * 100) if printer_jobs > 0 else 0
printer_stats.append({
"id": printer.id,
"name": printer.name,
"model": printer.model,
"location": printer.location,
"total_jobs": printer_jobs,
"success_rate": round(success_rate, 2),
"active": printer.active
})
# Export-Daten zusammenstellen
export_data = {
"generated_at": datetime.now().isoformat(),
"users": {
"total": user_count,
"active": active_user_count
},
"printers": {
"total": printer_count,
"active": active_printer_count,
"details": printer_stats
},
"jobs": {
"total": total_jobs,
"completed": completed_jobs,
"active": active_jobs,
"failed": failed_jobs,
"success_rate": round((completed_jobs / total_jobs * 100), 2) if total_jobs > 0 else 0
},
"print_time": {
"total_seconds": total_print_time,
"total_hours": round(total_print_time / 3600, 2),
"average_seconds": avg_print_time,
"average_minutes": round(avg_print_time / 60, 2)
},
"system": {
"version": "3.0.0",
"uptime_days": get_system_uptime_days()
}
}
db_session.close()
# Als Datei zum Download anbieten
from flask import make_response
import json
response = make_response(json.dumps(export_data, indent=4))
response.headers["Content-Disposition"] = "attachment; filename=stats_export.json"
response.headers["Content-Type"] = "application/json"
return response
except Exception as e:
app_logger.error(f"Fehler beim Exportieren der Statistiken: {str(e)}")
return jsonify({"error": f"Fehler beim Exportieren: {str(e)}"}), 500
def get_system_uptime_days():
"""Gibt die Systemlaufzeit in Tagen zurück."""
try:
with open('/proc/uptime', 'r') as f:
uptime_seconds = float(f.readline().split()[0])
return round(uptime_seconds / 86400, 2) # Umrechnung in Tage
except Exception:
return 0
@app.route("/api/printers/add", methods=["POST"])
@login_required
def add_printer():
"""Fügt einen neuen Drucker hinzu."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Drucker hinzufügen"}), 403
try:
data = request.json
# Pflichtfelder prüfen
required_fields = ["name", "mac_address", "plug_ip", "plug_username", "plug_password"]
for field in required_fields:
if field not in data or not data[field]:
return jsonify({"error": f"Das Feld '{field}' ist ein Pflichtfeld"}), 400
# Druckerdaten extrahieren
name = data["name"]
model = data.get("model", "")
location = data.get("location", "")
mac_address = data["mac_address"]
plug_ip = data["plug_ip"]
plug_username = data["plug_username"]
plug_password = data["plug_password"]
db_session = get_db_session()
# Prüfen, ob ein Drucker mit dieser MAC-Adresse bereits existiert
existing_printer = db_session.query(Printer).filter(Printer.mac_address == mac_address).first()
if existing_printer:
db_session.close()
return jsonify({"error": "Ein Drucker mit dieser MAC-Adresse existiert bereits"}), 400
# Neuen Drucker erstellen
new_printer = Printer(
name=name,
model=model,
location=location,
mac_address=mac_address,
plug_ip=plug_ip,
plug_username=plug_username,
plug_password=plug_password,
status="offline",
active=True,
created_at=datetime.now()
)
db_session.add(new_printer)
db_session.commit()
# Drucker-ID für die Antwort speichern
printer_id = new_printer.id
# Drucker-Objekt für die Antwort serialisieren
printer_dict = new_printer.to_dict()
db_session.close()
printers_logger.info(f"Neuer Drucker {name} (ID: {printer_id}) wurde von {current_user.username} hinzugefügt")
return jsonify({"success": True, "message": "Drucker erfolgreich hinzugefügt", "printer": printer_dict}), 201
except Exception as e:
printers_logger.error(f"Fehler beim Hinzufügen eines Druckers: {str(e)}")
return jsonify({"error": f"Fehler beim Hinzufügen des Druckers: {str(e)}"}), 500
@app.route("/my/jobs")
@login_required
def my_jobs():
"""Zeigt die persönlichen Jobs des angemeldeten Benutzers an."""
# Weiterleitung zur Jobs-Seite mit Filter für den aktuellen Benutzer
return redirect(url_for("jobs_page", user_filter=current_user.id))
@app.route("/api/user/export", methods=["GET"])
@login_required
def api_user_export_redirect():
"""Leitet den alten API-Pfad zum neuen Benutzer-Export weiter."""
return redirect(url_for("user.export_user_data"))
@app.route("/api/user/profile", methods=["PUT"])
@login_required
def api_user_profile_update_redirect():
"""Leitet den alten API-Pfad zum neuen Benutzer-Profil-Update weiter."""
return redirect(url_for("user.update_profile_api"))
@app.route("/user/update-settings", methods=["POST"])
@login_required
def user_update_settings_redirect():
"""Weiterleitung zur Blueprint-Route für Settings-Updates."""
return redirect(url_for("user.api_update_settings"))
# SSL-Verwaltungsrouten
@app.route("/api/ssl/info", methods=["GET"])
@login_required
def get_ssl_info():
"""Gibt Informationen über das aktuelle SSL-Zertifikat zurück."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können SSL-Informationen abrufen"}), 403
try:
from utils.ssl_manager import ssl_manager
cert_info = ssl_manager.get_certificate_info()
if not cert_info:
return jsonify({
"exists": False,
"message": "Kein SSL-Zertifikat gefunden"
})
return jsonify({
"exists": True,
"certificate": cert_info,
"paths": {
"cert": ssl_manager.cert_path,
"key": ssl_manager.key_path
}
})
except Exception as e:
app_logger.error(f"Fehler beim Abrufen der SSL-Informationen: {e}")
return jsonify({"error": f"Fehler beim Abrufen der SSL-Informationen: {str(e)}"}), 500
@app.route("/api/ssl/generate", methods=["POST"])
@login_required
def generate_ssl_certificate():
"""Generiert ein neues SSL-Zertifikat."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können SSL-Zertifikate generieren"}), 403
try:
from utils.ssl_manager import ssl_manager
# Parameter aus Request extrahieren
data = request.json or {}
key_size = data.get("key_size", 4096)
validity_days = data.get("validity_days", 365)
# Zertifikat generieren
success = ssl_manager.generate_mercedes_certificate(key_size, validity_days)
if success:
cert_info = ssl_manager.get_certificate_info()
app_logger.info(f"SSL-Zertifikat von {current_user.username} generiert")
return jsonify({
"success": True,
"message": "SSL-Zertifikat erfolgreich generiert",
"certificate": cert_info
})
else:
return jsonify({
"success": False,
"error": "Fehler beim Generieren des SSL-Zertifikats"
}), 500
except Exception as e:
app_logger.error(f"Fehler beim Generieren des SSL-Zertifikats: {e}")
return jsonify({"error": f"Fehler beim Generieren: {str(e)}"}), 500
@app.route("/api/ssl/install", methods=["POST"])
@login_required
def install_ssl_certificate():
"""Installiert das SSL-Zertifikat im System."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können SSL-Zertifikate installieren"}), 403
try:
from utils.ssl_manager import ssl_manager
success = ssl_manager.install_system_certificate()
if success:
app_logger.info(f"SSL-Zertifikat von {current_user.username} im System installiert")
return jsonify({
"success": True,
"message": "SSL-Zertifikat erfolgreich im System installiert"
})
else:
return jsonify({
"success": False,
"error": "Fehler bei der Installation des SSL-Zertifikats im System"
}), 500
except Exception as e:
app_logger.error(f"Fehler bei der SSL-Installation: {e}")
return jsonify({"error": f"Fehler bei der Installation: {str(e)}"}), 500
@app.route("/api/ssl/copy-raspberry", methods=["POST"])
@login_required
def copy_ssl_to_raspberry():
"""Kopiert das SSL-Zertifikat auf den Raspberry Pi."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können SSL-Zertifikate kopieren"}), 403
try:
from utils.ssl_manager import ssl_manager
# Parameter aus Request extrahieren
data = request.json or {}
host = data.get("host", "raspberrypi")
user = data.get("user", "user")
dest = data.get("dest", "/home/user/Projektarbeit-MYP/backend/app/certs")
success = ssl_manager.copy_to_raspberry(host, user, dest)
if success:
app_logger.info(f"SSL-Zertifikat von {current_user.username} auf Raspberry Pi kopiert")
return jsonify({
"success": True,
"message": f"SSL-Zertifikat erfolgreich auf {host} kopiert"
})
else:
return jsonify({
"success": False,
"error": "Fehler beim Kopieren des SSL-Zertifikats auf den Raspberry Pi"
}), 500
except Exception as e:
app_logger.error(f"Fehler beim Kopieren auf Raspberry Pi: {e}")
return jsonify({"error": f"Fehler beim Kopieren: {str(e)}"}), 500
@app.route("/api/ssl/validate", methods=["GET"])
@login_required
def validate_ssl_certificate():
"""Validiert das aktuelle SSL-Zertifikat."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können SSL-Zertifikate validieren"}), 403
try:
from utils.ssl_manager import ssl_manager
is_valid = ssl_manager.is_certificate_valid()
cert_info = ssl_manager.get_certificate_info()
return jsonify({
"valid": is_valid,
"certificate": cert_info,
"message": "Zertifikat ist gültig" if is_valid else "Zertifikat ist ungültig oder läuft bald ab"
})
except Exception as e:
app_logger.error(f"Fehler bei der SSL-Validierung: {e}")
return jsonify({"error": f"Fehler bei der Validierung: {str(e)}"}), 500
# Neue Admin-System-Management-Routen
@app.route("/api/admin/cache/clear", methods=["POST"])
@login_required
def clear_cache():
"""Leert den System-Cache."""
if not current_user.is_admin:
return jsonify({"error": "Keine Berechtigung"}), 403
try:
import shutil
import tempfile
# Flask-Cache leeren (falls vorhanden)
cache_dir = os.path.join(tempfile.gettempdir(), 'flask_cache')
if os.path.exists(cache_dir):
shutil.rmtree(cache_dir)
# Python __pycache__ leeren
for root, dirs, files in os.walk('.'):
for dir_name in dirs:
if dir_name == '__pycache__':
pycache_path = os.path.join(root, dir_name)
shutil.rmtree(pycache_path)
app_logger.info(f"Cache wurde von Admin {current_user.username} geleert")
return jsonify({"success": True, "message": "Cache erfolgreich geleert"})
except Exception as e:
app_logger.error(f"Fehler beim Leeren des Cache: {str(e)}")
return jsonify({"error": f"Fehler beim Leeren des Cache: {str(e)}"}), 500
@app.route("/api/admin/database/optimize", methods=["POST"])
@login_required
def optimize_database():
"""Optimiert die Datenbank."""
if not current_user.is_admin:
return jsonify({"error": "Keine Berechtigung"}), 403
try:
db_session = get_db_session()
# VACUUM und ANALYZE für SQLite
db_session.execute(sqlalchemy.text("VACUUM"))
db_session.execute(sqlalchemy.text("ANALYZE"))
db_session.commit()
# Alte abgeschlossene Jobs löschen (älter als 30 Tage)
thirty_days_ago = datetime.now() - timedelta(days=30)
old_jobs = db_session.query(Job).filter(
Job.status.in_(["completed", "failed", "cancelled"]),
Job.created_at < thirty_days_ago
).count()
db_session.query(Job).filter(
Job.status.in_(["completed", "failed", "cancelled"]),
Job.created_at < thirty_days_ago
).delete()
db_session.commit()
db_session.close()
app_logger.info(f"Datenbank wurde von Admin {current_user.username} optimiert. {old_jobs} alte Jobs entfernt.")
return jsonify({
"success": True,
"message": f"Datenbank optimiert. {old_jobs} alte Jobs entfernt."
})
except Exception as e:
app_logger.error(f"Fehler bei der Datenbankoptimierung: {str(e)}")
return jsonify({"error": f"Fehler bei der Datenbankoptimierung: {str(e)}"}), 500
@app.route("/api/admin/backup/create", methods=["POST"])
@login_required
def create_backup():
"""Erstellt ein System-Backup."""
if not current_user.is_admin:
return jsonify({"error": "Keine Berechtigung"}), 403
try:
import shutil
from datetime import datetime
backup_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'backups')
os.makedirs(backup_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_name = f"backup_{timestamp}"
backup_path = os.path.join(backup_dir, backup_name)
# Datenbank-Backup
db_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'instance', 'database.db')
if os.path.exists(db_path):
shutil.copy2(db_path, os.path.join(backup_path, 'database.db'))
# Konfigurationsdateien
config_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config')
if os.path.exists(config_dir):
shutil.copytree(config_dir, os.path.join(backup_path, 'config'))
# Uploads-Verzeichnis
uploads_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'uploads')
if os.path.exists(uploads_dir):
shutil.copytree(uploads_dir, os.path.join(backup_path, 'uploads'))
# Backup komprimieren
shutil.make_archive(backup_path, 'zip', backup_path)
shutil.rmtree(backup_path) # Temporäres Verzeichnis löschen
app_logger.info(f"Backup wurde von Admin {current_user.username} erstellt: {backup_name}.zip")
return jsonify({
"success": True,
"message": f"Backup erfolgreich erstellt: {backup_name}.zip"
})
except Exception as e:
app_logger.error(f"Fehler beim Erstellen des Backups: {str(e)}")
return jsonify({"error": f"Fehler beim Erstellen des Backups: {str(e)}"}), 500
@app.route("/api/admin/printers/update", methods=["POST"])
@login_required
def update_printers():
"""Aktualisiert alle Drucker-Verbindungen."""
if not current_user.is_admin:
return jsonify({"error": "Keine Berechtigung"}), 403
try:
db_session = get_db_session()
printers = db_session.query(Printer).all()
updated_count = 0
error_count = 0
for printer in printers:
try:
# Drucker-Status prüfen
import requests
import socket
# Ping-Test
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
result = sock.connect_ex((printer.ip_address, 80))
sock.close()
if result == 0:
printer.status = "online"
printer.last_seen = datetime.now()
updated_count += 1
else:
printer.status = "offline"
error_count += 1
except Exception as e:
printer.status = "error"
error_count += 1
printers_logger.error(f"Fehler beim Aktualisieren von Drucker {printer.name}: {str(e)}")
db_session.commit()
db_session.close()
app_logger.info(f"Drucker wurden von Admin {current_user.username} aktualisiert. {updated_count} online, {error_count} Fehler.")
return jsonify({
"success": True,
"message": f"Drucker aktualisiert: {updated_count} online, {error_count} offline/Fehler"
})
except Exception as e:
app_logger.error(f"Fehler beim Aktualisieren der Drucker: {str(e)}")
return jsonify({"error": f"Fehler beim Aktualisieren der Drucker: {str(e)}"}), 500
@app.route("/api/admin/system/restart", methods=["POST"])
@login_required
def restart_system():
"""Startet das System neu."""
if not current_user.is_admin:
return jsonify({"error": "Keine Berechtigung"}), 403
try:
app_logger.warning(f"System-Neustart wurde von Admin {current_user.username} initiiert")
# Graceful shutdown
def shutdown_server():
import time
time.sleep(2) # Kurz warten, damit die Response gesendet wird
os._exit(0)
# Shutdown in separatem Thread
import threading
shutdown_thread = threading.Thread(target=shutdown_server)
shutdown_thread.start()
return jsonify({
"success": True,
"message": "System wird neugestartet..."
})
except Exception as e:
app_logger.error(f"Fehler beim Neustart des Systems: {str(e)}")
return jsonify({"error": f"Fehler beim Neustart des Systems: {str(e)}"}), 500
@app.route("/api/admin/system/status", methods=["GET"])
@login_required
def get_system_status():
"""Gibt den aktuellen Systemstatus zurück."""
if not current_user.is_admin:
return jsonify({"error": "Keine Berechtigung"}), 403
try:
import psutil
import sqlite3
# CPU und Memory
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
# Uptime
boot_time = psutil.boot_time()
uptime_seconds = time.time() - boot_time
uptime_days = int(uptime_seconds // 86400)
uptime_hours = int((uptime_seconds % 86400) // 3600)
uptime_minutes = int((uptime_seconds % 3600) // 60)
# Datenbank-Status
db_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'instance', 'database.db')
db_size = 0
db_connections = 0
if os.path.exists(db_path):
db_size = os.path.getsize(db_path) / (1024 * 1024) # MB
# Scheduler-Status
scheduler_running = False
try:
from utils.job_scheduler import scheduler
scheduler_running = scheduler.running
except:
pass
# Nächster Job
db_session = get_db_session()
next_job = db_session.query(Job).filter(
Job.status == "scheduled"
).order_by(Job.created_at.asc()).first()
next_job_time = "Keine geplanten Jobs"
if next_job:
next_job_time = next_job.created_at.strftime("%d.%m.%Y %H:%M")
db_session.close()
return jsonify({
"cpu_usage": round(cpu_percent, 1),
"memory_usage": round(memory.percent, 1),
"disk_usage": round((disk.used / disk.total) * 100, 1),
"uptime": f"{uptime_days}d {uptime_hours}h {uptime_minutes}m",
"db_size": f"{db_size:.1f} MB",
"db_connections": db_connections,
"scheduler_running": scheduler_running,
"next_job": next_job_time
})
except Exception as e:
app_logger.error(f"Fehler beim Abrufen des Systemstatus: {str(e)}")
return jsonify({"error": f"Fehler beim Abrufen des Systemstatus: {str(e)}"}), 500
@app.route("/api/admin/database/status", methods=["GET"])
@login_required
def get_database_status():
"""Gibt den Datenbankstatus zurück."""
if not current_user.is_admin:
return jsonify({"error": "Keine Berechtigung"}), 403
try:
db_session = get_db_session()
# Verbindungstest
db_session.execute(sqlalchemy.text("SELECT 1"))
# Datenbankgröße
db_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'instance', 'database.db')
db_size = 0
if os.path.exists(db_path):
db_size = os.path.getsize(db_path) / (1024 * 1024) # MB
# Tabellenstatistiken
user_count = db_session.query(User).count()
printer_count = db_session.query(Printer).count()
job_count = db_session.query(Job).count()
db_session.close()
return jsonify({
"connected": True,
"size": f"{db_size:.1f} MB",
"tables": {
"users": user_count,
"printers": printer_count,
"jobs": job_count
}
})
except Exception as e:
app_logger.error(f"Fehler beim Abrufen des Datenbankstatus: {str(e)}")
return jsonify({
"connected": False,
"error": str(e)
}), 500