Till Tomczak 62e131c02f """
feat: Update frontend and backend configurations for development environment

- Downgrade PyP100 version in requirements.txt for compatibility.
- Add new frontend routes for index, login, dashboard, printers, jobs, and profile pages.
- Modify docker-compose files for development setup, including environment variables and service names.
- Update Caddyfile for local development with Raspberry Pi backend.
- Adjust health check route to use updated backend URL.
- Enhance setup-backend-url.sh for development environment configuration.
"""
2025-05-24 18:58:17 +02:00

1076 lines
34 KiB
Python

import os
import threading
import time
import json
import secrets
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any, Union
from functools import wraps
from flask import Flask, request, jsonify, session, render_template, redirect, url_for
from flask_login import LoginManager, UserMixin, login_user, logout_user, current_user, login_required
import sqlalchemy.exc
from PyP100 import PyP110
from config.settings import (
SECRET_KEY, TAPO_USERNAME, TAPO_PASSWORD, PRINTERS,
FLASK_HOST, FLASK_PORT, FLASK_DEBUG, SESSION_LIFETIME,
SCHEDULER_INTERVAL, SCHEDULER_ENABLED
)
from utils.logging_config import setup_logging, get_logger, log_startup_info
from models import User, Printer, Job, Stats, init_db, get_db_session, create_initial_admin
from utils.job_scheduler import scheduler
# Logging initialisieren
setup_logging()
log_startup_info()
# Logger für verschiedene Komponenten
app_logger = get_logger("app")
auth_logger = get_logger("auth")
jobs_logger = get_logger("jobs")
printers_logger = get_logger("printers")
# Flask-App initialisieren
app = Flask(__name__)
app.secret_key = SECRET_KEY
app.config["PERMANENT_SESSION_LIFETIME"] = SESSION_LIFETIME
# Flask-Login initialisieren
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = "/login"
# Datenbank initialisieren
init_db()
# Flask-Login User-Loader
@login_manager.user_loader
def load_user(user_id: str) -> Optional[UserMixin]:
db_session = get_db_session()
user = db_session.query(User).filter(User.id == int(user_id)).first()
db_session.close()
if not user:
return None
# UserMixin-Objekt erstellen
user_mixin = UserMixin()
user_mixin.id = str(user.id)
user_mixin.is_admin = user.is_admin()
user_mixin.email = user.email
user_mixin.user_obj = user
return user_mixin
# Dekorator für Admin-Zugriffskontrolle
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated or not current_user.is_admin:
auth_logger.warning(f"Unbefugter Admin-Zugriff versucht von User {getattr(current_user, 'id', 'anonym')}")
return jsonify({"error": "Nur Administratoren haben Zugriff auf diese Ressource"}), 403
return f(*args, **kwargs)
return decorated_function
# Dekorator für Owner-Zugriffskontrolle (nur Job-Eigentümer oder Admin)
def job_owner_required(f):
@wraps(f)
def decorated_function(job_id, *args, **kwargs):
db_session = get_db_session()
job = db_session.query(Job).filter(Job.id == job_id).first()
db_session.close()
if not job:
return jsonify({"error": "Job nicht gefunden"}), 404
if not current_user.is_authenticated:
auth_logger.warning(f"Nicht authentifizierter Zugriff auf Job {job_id}")
return jsonify({"error": "Nicht authentifiziert"}), 401
if not current_user.is_admin and int(current_user.id) != job.user_id:
auth_logger.warning(f"Unbefugter Zugriff auf Job {job_id} von User {current_user.id}")
return jsonify({"error": "Keine Berechtigung für diesen Job"}), 403
return f(job_id, *args, **kwargs)
return decorated_function
# Smart Plug Steuerung
def set_plug_state(mac_address: str, state: bool) -> bool:
"""
Steuert einen TP-Link Tapo P110 Smart Plug.
Args:
mac_address: MAC-Adresse des Plugs zur Identifikation
state: True für Ein, False für Aus
Returns:
bool: True bei Erfolg, False bei Fehler
"""
try:
db_session = get_db_session()
printer = db_session.query(Printer).filter(Printer.mac_address == mac_address).first()
db_session.close()
if not printer:
printers_logger.error(f"Drucker mit MAC {mac_address} nicht gefunden")
return False
p110 = PyP110.P110(printer.plug_ip, TAPO_USERNAME, TAPO_PASSWORD)
p110.handshake()
p110.login()
if state:
p110.turnOn()
printers_logger.info(f"Plug {mac_address} eingeschaltet")
else:
p110.turnOff()
printers_logger.info(f"Plug {mac_address} ausgeschaltet")
return True
except Exception as e:
printers_logger.error(f"Fehler beim Schalten des Plugs {mac_address}: {str(e)}")
return False
# Job-Monitor-Funktion für den Scheduler
def job_monitor() -> None:
"""
Überwacht aktive Jobs und schaltet Plugs ein/aus basierend auf den Start- und Endzeiten.
"""
jobs_logger.info("Job-Monitor-Check läuft...")
now = datetime.now()
db_session = get_db_session()
try:
# Jobs finden, die jetzt starten sollten
jobs_to_start = db_session.query(Job).filter(
Job.status == "scheduled",
Job.start_time <= now,
Job.end_time > now
).all()
for job in jobs_to_start:
jobs_logger.info(f"Job {job.id} ({job.title}) wird gestartet")
# Drucker holen
printer = db_session.query(Printer).filter(Printer.id == job.printer_id).first()
if printer:
# Plug einschalten
success = set_plug_state(printer.mac_address, True)
if success:
job.status = "active"
jobs_logger.info(f"Job {job.id} aktiviert, Plug eingeschaltet")
else:
jobs_logger.error(f"Konnte Plug für Job {job.id} nicht einschalten")
# Jobs finden, die jetzt enden sollten
jobs_to_end = db_session.query(Job).filter(
Job.status == "active",
Job.end_time <= now
).all()
for job in jobs_to_end:
jobs_logger.info(f"Job {job.id} ({job.title}) wird beendet")
# Drucker holen
printer = db_session.query(Printer).filter(Printer.id == job.printer_id).first()
if printer:
# Plug ausschalten
success = set_plug_state(printer.mac_address, False)
if success:
job.status = "completed"
job.actual_end_time = now
# Statistik aktualisieren
stats = db_session.query(Stats).first()
if stats:
# Druckzeit in Sekunden berechnen
print_time_seconds = int((now - job.start_time).total_seconds())
stats.total_print_time += print_time_seconds
stats.total_jobs_completed += 1
if job.material_used:
stats.total_material_used += job.material_used
stats.last_updated = now
jobs_logger.info(f"Job {job.id} beendet, Plug ausgeschaltet")
else:
jobs_logger.error(f"Konnte Plug für Job {job.id} nicht ausschalten")
db_session.commit()
except Exception as e:
jobs_logger.error(f"Fehler im Job-Monitor: {str(e)}")
db_session.rollback()
finally:
db_session.close()
# Authentifizierungs-Routen
@app.route("/auth/register", methods=["POST"])
def register():
data = request.json
if not data or not all(k in data for k in ["email", "password", "name"]):
auth_logger.warning("Registrierung mit unvollständigen Daten versucht")
return jsonify({"error": "Unvollständige Daten"}), 400
db_session = get_db_session()
# Prüfen, ob der erste Benutzer angelegt wird (wird automatisch Admin)
is_first_user = db_session.query(User).count() == 0
try:
new_user = User(
email=data["email"],
name=data["name"],
role="admin" if is_first_user else "user"
)
new_user.set_password(data["password"])
db_session.add(new_user)
db_session.commit()
# Falls erster Benutzer, Stats anlegen
if is_first_user:
stats = Stats()
db_session.add(stats)
db_session.commit()
auth_logger.info(f"Neuer Benutzer registriert: {data['email']} (Admin: {is_first_user})")
result = {"success": True, "user_id": new_user.id, "is_admin": is_first_user}
except sqlalchemy.exc.IntegrityError:
db_session.rollback()
auth_logger.warning(f"Registrierung fehlgeschlagen - E-Mail bereits vorhanden: {data['email']}")
result = {"error": "E-Mail-Adresse bereits registriert"}, 400
except Exception as e:
db_session.rollback()
auth_logger.error(f"Fehler bei der Registrierung: {str(e)}")
result = {"error": "Interner Serverfehler"}, 500
finally:
db_session.close()
return jsonify(result)
@app.route("/auth/login", methods=["POST"])
def login():
data = request.json
if not data or not all(k in data for k in ["email", "password"]):
auth_logger.warning("Login mit unvollständigen Daten versucht")
return jsonify({"error": "Unvollständige Daten"}), 400
db_session = get_db_session()
user = db_session.query(User).filter(User.email == data["email"]).first()
db_session.close()
if not user or not user.check_password(data["password"]):
auth_logger.warning(f"Fehlgeschlagener Login-Versuch für: {data['email']}")
return jsonify({"error": "Ungültige Anmeldedaten"}), 401
# UserMixin-Objekt erstellen
user_mixin = UserMixin()
user_mixin.id = str(user.id)
user_mixin.is_admin = user.is_admin()
user_mixin.email = user.email
# Benutzer anmelden und persistente Session erstellen
login_user(user_mixin, remember=True)
session.permanent = True
auth_logger.info(f"Erfolgreicher Login: {user.email}")
return jsonify({
"success": True,
"user": {
"id": user.id,
"email": user.email,
"name": user.name,
"role": user.role
}
})
@app.route("/auth/logout", methods=["POST"])
@login_required
def logout():
user_email = getattr(current_user, 'email', 'unbekannt')
logout_user()
auth_logger.info(f"Benutzer abgemeldet: {user_email}")
return jsonify({"success": True, "message": "Erfolgreich abgemeldet"})
@app.route("/api/create-initial-admin", methods=["POST"])
def api_create_initial_admin():
data = request.json
if not data or not all(k in data for k in ["email", "password", "name"]):
return jsonify({"error": "Unvollständige Daten"}), 400
success = create_initial_admin(
email=data["email"],
password=data["password"],
name=data["name"]
)
if success:
auth_logger.info(f"Initialer Admin erstellt: {data['email']}")
return jsonify({"success": True, "message": "Admin-Benutzer erfolgreich angelegt"})
else:
return jsonify({"error": "Es existieren bereits Benutzer"}), 400
# Drucker-Routen
@app.route("/api/printers", methods=["GET"])
@login_required
def get_printers():
db_session = get_db_session()
printers = db_session.query(Printer).all()
db_session.close()
return jsonify({
"printers": [printer.to_dict() for printer in printers]
})
@app.route("/api/printers/<int:printer_id>", methods=["GET"])
@login_required
def get_printer(printer_id):
db_session = get_db_session()
printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
db_session.close()
if not printer:
return jsonify({"error": "Drucker nicht gefunden"}), 404
return jsonify(printer.to_dict())
@app.route("/api/printers", methods=["POST"])
@login_required
@admin_required
def create_printer():
data = request.json
if not data or not all(k in data for k in ["name", "mac_address"]):
return jsonify({"error": "Unvollständige Daten"}), 400
# IP-Adresse aus der PRINTERS-Konfiguration holen
printer_name = data["name"]
if printer_name not in PRINTERS:
printers_logger.error(f"Drucker {printer_name} nicht in Konfiguration gefunden")
return jsonify({"error": "Drucker nicht in Konfiguration gefunden"}), 400
plug_ip = PRINTERS[printer_name]["ip"]
db_session = get_db_session()
try:
# Prüfen, ob MAC bereits existiert
existing = db_session.query(Printer).filter(Printer.mac_address == data["mac_address"]).first()
if existing:
db_session.close()
return jsonify({"error": "MAC-Adresse bereits registriert"}), 400
new_printer = Printer(
name=data["name"],
location=data.get("location", ""),
mac_address=data["mac_address"],
plug_ip=plug_ip,
plug_username=TAPO_USERNAME,
plug_password=TAPO_PASSWORD,
active=data.get("active", True)
)
db_session.add(new_printer)
db_session.commit()
printers_logger.info(f"Neuer Drucker erstellt: {data['name']} ({plug_ip})")
result = {"success": True, "printer": new_printer.to_dict()}
except Exception as e:
db_session.rollback()
printers_logger.error(f"Fehler beim Anlegen des Druckers: {str(e)}")
result = {"error": "Interner Serverfehler"}, 500
finally:
db_session.close()
return jsonify(result)
@app.route("/api/printers/<int:printer_id>", methods=["DELETE"])
@login_required
@admin_required
def delete_printer(printer_id):
db_session = get_db_session()
try:
printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
if not printer:
db_session.close()
return jsonify({"error": "Drucker nicht gefunden"}), 404
# Prüfen, ob aktive Jobs existieren
active_jobs = db_session.query(Job).filter(
Job.printer_id == printer_id,
Job.status.in_(["scheduled", "active"])
).count()
if active_jobs > 0:
db_session.close()
return jsonify({"error": "Es existieren aktive Jobs für diesen Drucker"}), 400
printer_name = printer.name
db_session.delete(printer)
db_session.commit()
printers_logger.info(f"Drucker gelöscht: {printer_name}")
result = {"success": True, "message": "Drucker erfolgreich gelöscht"}
except Exception as e:
db_session.rollback()
printers_logger.error(f"Fehler beim Löschen des Druckers: {str(e)}")
result = {"error": "Interner Serverfehler"}, 500
finally:
db_session.close()
return jsonify(result)
# Job-Routen
@app.route("/api/jobs", methods=["GET"])
@login_required
def get_jobs():
db_session = get_db_session()
# Admin sieht alle Jobs, User nur eigene
if hasattr(current_user, "is_admin") and current_user.is_admin:
jobs = db_session.query(Job).all()
else:
jobs = db_session.query(Job).filter(Job.user_id == int(current_user.id)).all()
db_session.close()
return jsonify({
"jobs": [job.to_dict() for job in jobs]
})
@app.route("/api/jobs/<int:job_id>", methods=["GET"])
@login_required
@job_owner_required
def get_job(job_id):
db_session = get_db_session()
job = db_session.query(Job).filter(Job.id == job_id).first()
db_session.close()
return jsonify(job.to_dict())
@app.route("/api/jobs", methods=["POST"])
@login_required
def create_job():
data = request.json
if not data or not all(k in data for k in ["title", "printer_id", "start_time", "end_time"]):
return jsonify({"error": "Unvollständige Daten"}), 400
db_session = get_db_session()
try:
# Drucker prüfen
printer = db_session.query(Printer).filter(Printer.id == data["printer_id"]).first()
if not printer:
db_session.close()
return jsonify({"error": "Drucker nicht gefunden"}), 404
# Datumsformate parsen
start_time = datetime.fromisoformat(data["start_time"].replace("Z", "+00:00"))
end_time = datetime.fromisoformat(data["end_time"].replace("Z", "+00:00"))
# Zeitvalidierung
if start_time < datetime.now():
db_session.close()
return jsonify({"error": "Startzeit liegt in der Vergangenheit"}), 400
if end_time <= start_time:
db_session.close()
return jsonify({"error": "Endzeit muss nach der Startzeit liegen"}), 400
# Überlappende Jobs prüfen
overlapping_jobs = db_session.query(Job).filter(
Job.printer_id == data["printer_id"],
Job.status.in_(["scheduled", "active"]),
((Job.start_time <= start_time) & (Job.end_time > start_time)) |
((Job.start_time < end_time) & (Job.end_time >= end_time)) |
((Job.start_time >= start_time) & (Job.end_time <= end_time))
).count()
if overlapping_jobs > 0:
db_session.close()
return jsonify({"error": "Es existieren bereits überlappende Jobs"}), 400
new_job = Job(
title=data["title"],
user_id=int(current_user.id),
printer_id=data["printer_id"],
start_time=start_time,
end_time=end_time,
notes=data.get("notes", ""),
status="scheduled"
)
db_session.add(new_job)
db_session.commit()
jobs_logger.info(f"Neuer Job erstellt: {data['title']} von User {current_user.id}")
result = {"success": True, "job": new_job.to_dict()}
except ValueError as e:
db_session.rollback()
jobs_logger.error(f"Fehler beim Datum-Parsing: {str(e)}")
result = {"error": "Ungültiges Datumsformat"}, 400
except Exception as e:
db_session.rollback()
jobs_logger.error(f"Fehler beim Anlegen des Jobs: {str(e)}")
result = {"error": "Interner Serverfehler"}, 500
finally:
db_session.close()
return jsonify(result)
@app.route("/api/jobs/<int:job_id>", methods=["DELETE"])
@login_required
@job_owner_required
def delete_job(job_id):
db_session = get_db_session()
try:
job = db_session.query(Job).filter(Job.id == job_id).first()
job_title = job.title
# Job löschen
db_session.delete(job)
db_session.commit()
# Wenn Job aktiv war, Plug ausschalten
if job.status == "active":
printer = db_session.query(Printer).filter(Printer.id == job.printer_id).first()
if printer:
set_plug_state(printer.mac_address, False)
jobs_logger.info(f"Job gelöscht: {job_title} (ID: {job_id})")
result = {"success": True, "message": "Job erfolgreich gelöscht"}
except Exception as e:
db_session.rollback()
jobs_logger.error(f"Fehler beim Löschen des Jobs: {str(e)}")
result = {"error": "Interner Serverfehler"}, 500
finally:
db_session.close()
return jsonify(result)
@app.route("/api/jobs/<int:job_id>/finish", methods=["POST"])
@login_required
@job_owner_required
def finish_job(job_id):
db_session = get_db_session()
try:
job = db_session.query(Job).filter(Job.id == job_id).first()
if job.status not in ["scheduled", "active"]:
db_session.close()
return jsonify({"error": "Nur geplante oder aktive Jobs können beendet werden"}), 400
# Drucker holen und Plug ausschalten
printer = db_session.query(Printer).filter(Printer.id == job.printer_id).first()
if not printer:
db_session.close()
return jsonify({"error": "Drucker nicht gefunden"}), 404
set_plug_state(printer.mac_address, False)
# Job als beendet markieren
job.status = "completed"
job.actual_end_time = datetime.now()
# Statistik aktualisieren
stats = db_session.query(Stats).first()
if stats:
# Druckzeit in Sekunden berechnen
if job.start_time and job.actual_end_time:
print_time_seconds = int((job.actual_end_time - job.start_time).total_seconds())
stats.total_print_time += print_time_seconds
stats.total_jobs_completed += 1
if job.material_used:
stats.total_material_used += job.material_used
stats.last_updated = datetime.now()
db_session.commit()
jobs_logger.info(f"Job manuell beendet: {job.title} (ID: {job_id})")
result = {"success": True, "job": job.to_dict()}
except Exception as e:
db_session.rollback()
jobs_logger.error(f"Fehler beim Beenden des Jobs: {str(e)}")
result = {"error": "Interner Serverfehler"}, 500
finally:
db_session.close()
return jsonify(result)
@app.route("/api/jobs/<int:job_id>/abort", methods=["POST"])
@login_required
@job_owner_required
def abort_job(job_id):
db_session = get_db_session()
try:
job = db_session.query(Job).filter(Job.id == job_id).first()
if job.status not in ["scheduled", "active"]:
db_session.close()
return jsonify({"error": "Nur geplante oder aktive Jobs können abgebrochen werden"}), 400
# Drucker holen und Plug ausschalten
printer = db_session.query(Printer).filter(Printer.id == job.printer_id).first()
if not printer:
db_session.close()
return jsonify({"error": "Drucker nicht gefunden"}), 404
set_plug_state(printer.mac_address, False)
# Job als abgebrochen markieren
job.status = "aborted"
job.actual_end_time = datetime.now()
db_session.commit()
jobs_logger.info(f"Job abgebrochen: {job.title} (ID: {job_id})")
result = {"success": True, "job": job.to_dict()}
except Exception as e:
db_session.rollback()
jobs_logger.error(f"Fehler beim Abbrechen des Jobs: {str(e)}")
result = {"error": "Interner Serverfehler"}, 500
finally:
db_session.close()
return jsonify(result)
@app.route("/api/jobs/<int:job_id>/extend", methods=["POST"])
@login_required
@job_owner_required
def extend_job(job_id):
data = request.json
if not data or "minutes" not in data:
return jsonify({"error": "Anzahl der Minuten fehlt"}), 400
minutes = int(data["minutes"])
if minutes <= 0 or minutes > 180: # Max. 3 Stunden Verlängerung
return jsonify({"error": "Ungültige Minutenanzahl (1-180)"}), 400
db_session = get_db_session()
try:
job = db_session.query(Job).filter(Job.id == job_id).first()
if job.status not in ["scheduled", "active"]:
db_session.close()
return jsonify({"error": "Nur geplante oder aktive Jobs können verlängert werden"}), 400
# Neue Endzeit berechnen
new_end_time = job.end_time + timedelta(minutes=minutes)
# Überlappende Jobs prüfen
overlapping_jobs = db_session.query(Job).filter(
Job.printer_id == job.printer_id,
Job.id != job_id,
Job.status.in_(["scheduled", "active"]),
Job.start_time < new_end_time,
Job.start_time > job.end_time
).count()
if overlapping_jobs > 0:
db_session.close()
return jsonify({"error": "Verlängerung überschneidet sich mit nachfolgenden Jobs"}), 400
# Endzeit aktualisieren
job.end_time = new_end_time
db_session.commit()
jobs_logger.info(f"Job verlängert: {job.title} (ID: {job_id}) um {minutes} Minuten")
result = {"success": True, "job": job.to_dict()}
except Exception as e:
db_session.rollback()
jobs_logger.error(f"Fehler beim Verlängern des Jobs: {str(e)}")
result = {"error": "Interner Serverfehler"}, 500
finally:
db_session.close()
return jsonify(result)
@app.route("/api/jobs/<int:job_id>/status", methods=["GET"])
@login_required
@job_owner_required
def get_job_status(job_id):
db_session = get_db_session()
try:
job = db_session.query(Job).filter(Job.id == job_id).first()
printer = db_session.query(Printer).filter(Printer.id == job.printer_id).first()
if not printer:
db_session.close()
return jsonify({"error": "Drucker nicht gefunden"}), 404
# Plug-Status abfragen
try:
p110 = PyP110.P110(printer.plug_ip, TAPO_USERNAME, TAPO_PASSWORD)
p110.handshake()
p110.login()
plug_info = p110.getDeviceInfo()
plug_state = plug_info.get("device_on", False)
power_consumption = plug_info.get("current_power", 0)
except Exception as e:
printers_logger.error(f"Fehler beim Abfragen des Plug-Status: {str(e)}")
plug_state = None
power_consumption = None
result = {
"job_id": job.id,
"status": job.status,
"plug_state": plug_state,
"power_consumption": power_consumption
}
except Exception as e:
jobs_logger.error(f"Fehler beim Abfragen des Job-Status: {str(e)}")
result = {"error": "Interner Serverfehler"}, 500
finally:
db_session.close()
return jsonify(result)
@app.route("/api/jobs/<int:job_id>/remaining-time", methods=["GET"])
@login_required
@job_owner_required
def get_remaining_time(job_id):
db_session = get_db_session()
try:
job = db_session.query(Job).filter(Job.id == job_id).first()
if job.status != "active":
db_session.close()
return jsonify({"error": "Nur aktive Jobs haben eine Restzeit"}), 400
now = datetime.now()
if now > job.end_time:
remaining_seconds = 0
else:
remaining_seconds = int((job.end_time - now).total_seconds())
result = {
"job_id": job.id,
"remaining_seconds": remaining_seconds,
"end_time": job.end_time.isoformat()
}
except Exception as e:
jobs_logger.error(f"Fehler beim Berechnen der Restzeit: {str(e)}")
result = {"error": "Interner Serverfehler"}, 500
finally:
db_session.close()
return jsonify(result)
# Benutzer-Routen
@app.route("/api/users", methods=["GET"])
@login_required
@admin_required
def get_users():
db_session = get_db_session()
users = db_session.query(User).all()
db_session.close()
return jsonify({
"users": [user.to_dict() for user in users]
})
@app.route("/api/users/<int:user_id>", methods=["GET"])
@login_required
def get_user(user_id):
# Normale Benutzer dürfen nur sich selbst sehen
if not current_user.is_admin and int(current_user.id) != user_id:
return jsonify({"error": "Keine Berechtigung"}), 403
db_session = get_db_session()
user = db_session.query(User).filter(User.id == user_id).first()
db_session.close()
if not user:
return jsonify({"error": "Benutzer nicht gefunden"}), 404
return jsonify(user.to_dict())
@app.route("/api/users/<int:user_id>", methods=["DELETE"])
@login_required
@admin_required
def delete_user(user_id):
# Admin kann sich nicht selbst löschen
if int(current_user.id) == user_id:
return jsonify({"error": "Admin kann sich nicht selbst löschen"}), 400
db_session = get_db_session()
try:
user = db_session.query(User).filter(User.id == user_id).first()
if not user:
db_session.close()
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Prüfen, ob aktive Jobs existieren
active_jobs = db_session.query(Job).filter(
Job.user_id == user_id,
Job.status.in_(["scheduled", "active"])
).count()
if active_jobs > 0:
db_session.close()
return jsonify({"error": "Es existieren aktive Jobs für diesen Benutzer"}), 400
user_email = user.email
db_session.delete(user)
db_session.commit()
auth_logger.info(f"Benutzer gelöscht: {user_email}")
result = {"success": True, "message": "Benutzer erfolgreich gelöscht"}
except Exception as e:
db_session.rollback()
auth_logger.error(f"Fehler beim Löschen des Benutzers: {str(e)}")
result = {"error": "Interner Serverfehler"}, 500
finally:
db_session.close()
return jsonify(result)
# Statistik-Route
@app.route("/api/stats", methods=["GET"])
@login_required
def get_stats():
db_session = get_db_session()
try:
stats = db_session.query(Stats).first()
if not stats:
stats = Stats()
db_session.add(stats)
db_session.commit()
# Zusätzliche Statistikdaten berechnen
user_count = db_session.query(User).count()
printer_count = db_session.query(Printer).count()
active_jobs = db_session.query(Job).filter(Job.status.in_(["scheduled", "active"])).count()
completed_jobs = db_session.query(Job).filter(Job.status == "completed").count()
result = {
"total_print_time_seconds": stats.total_print_time,
"total_print_time_hours": round(stats.total_print_time / 3600, 2),
"total_jobs_completed": stats.total_jobs_completed,
"total_material_used": stats.total_material_used,
"user_count": user_count,
"printer_count": printer_count,
"active_jobs": active_jobs,
"completed_jobs": completed_jobs,
"last_updated": stats.last_updated.isoformat() if stats.last_updated else None
}
except Exception as e:
app_logger.error(f"Fehler beim Abrufen der Statistik: {str(e)}")
result = {"error": "Interner Serverfehler"}, 500
finally:
db_session.close()
return jsonify(result)
# Health-Check-Route
@app.route("/api/test", methods=["GET"])
def test():
return jsonify({
"status": "ok",
"time": datetime.now().isoformat(),
"version": "1.0.0"
})
# API-Endpoint zur Scheduler-Steuerung
@app.route("/api/scheduler/status", methods=["GET"])
@login_required
@admin_required
def get_scheduler_status():
"""Gibt den aktuellen Status des Schedulers zurück."""
return jsonify({
"running": scheduler.is_running(),
"tasks": scheduler.get_task_info()
})
@app.route("/api/scheduler/start", methods=["POST"])
@login_required
@admin_required
def start_scheduler_api():
"""Startet den Scheduler."""
success = scheduler.start()
return jsonify({
"success": success,
"running": scheduler.is_running()
})
@app.route("/api/scheduler/stop", methods=["POST"])
@login_required
@admin_required
def stop_scheduler_api():
"""Stoppt den Scheduler."""
success = scheduler.stop()
return jsonify({
"success": success,
"running": scheduler.is_running()
})
# Frontend-Routen
@app.route("/")
def index():
"""Hauptseite - Weiterleitung zum Dashboard oder Login."""
if current_user.is_authenticated:
return redirect(url_for('dashboard'))
return redirect(url_for('login_page'))
@app.route("/login")
def login_page():
"""Login-Seite."""
if current_user.is_authenticated:
return redirect(url_for('dashboard'))
return render_template('login.html')
@app.route("/dashboard")
@login_required
def dashboard():
"""Dashboard-Seite."""
return render_template('dashboard.html')
@app.route("/printers")
@login_required
def printers_page():
"""Drucker-Übersichtsseite."""
return render_template('printers.html')
@app.route("/jobs")
@login_required
def jobs_page():
"""Jobs-Übersichtsseite."""
return render_template('jobs.html')
@app.route("/jobs/new")
@login_required
def new_job_page():
"""Neuen Job erstellen."""
return render_template('job_create.html')
@app.route("/jobs/<int:job_id>")
@login_required
@job_owner_required
def job_detail_page(job_id):
"""Job-Detailseite."""
return render_template('job_detail.html', job_id=job_id)
@app.route("/stats")
@login_required
def stats_page():
"""Statistiken-Seite."""
return render_template('stats.html')
@app.route("/admin")
@login_required
@admin_required
def admin_page():
"""Admin-Panel."""
return render_template('admin.html')
@app.route("/profile")
@login_required
def profile_page():
"""Benutzerprofil."""
return render_template('profile.html')
# Scheduler starten
def start_scheduler():
"""Initialisiert und startet den Scheduler mit den erforderlichen Tasks."""
if not SCHEDULER_ENABLED:
app_logger.info("Scheduler ist deaktiviert")
return
# Registriere Job-Monitor-Task
scheduler.register_task(
task_id="job_monitor",
func=job_monitor,
interval=SCHEDULER_INTERVAL,
enabled=True
)
# Scheduler starten
scheduler.start()
app_logger.info("Job-Scheduler gestartet")
if __name__ == "__main__":
# Scheduler starten
start_scheduler()
# Flask-App starten
app_logger.info(f"Flask-App wird gestartet auf {FLASK_HOST}:{FLASK_PORT}")
app.run(host=FLASK_HOST, port=FLASK_PORT, debug=FLASK_DEBUG)