3269 lines
121 KiB
Python
3269 lines
121 KiB
Python
import os
|
|
import sys
|
|
import logging
|
|
import threading
|
|
import time
|
|
import subprocess
|
|
import socket
|
|
import json
|
|
import secrets
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from datetime import datetime, timedelta
|
|
from functools import wraps
|
|
from typing import Optional, Dict, List, Tuple, Any, Union
|
|
|
|
from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, session, send_file, Response, make_response
|
|
from flask_login import LoginManager, login_user, logout_user, login_required, current_user, UserMixin
|
|
from werkzeug.security import check_password_hash, generate_password_hash
|
|
from werkzeug.utils import secure_filename
|
|
import sqlalchemy.exc
|
|
import sqlalchemy
|
|
from sqlalchemy.orm import joinedload
|
|
from sqlalchemy import func
|
|
from PyP100 import PyP110
|
|
from flask_wtf.csrf import CSRFProtect
|
|
|
|
from config.settings import (
|
|
SECRET_KEY, TAPO_USERNAME, TAPO_PASSWORD, PRINTERS,
|
|
FLASK_HOST, FLASK_PORT, FLASK_DEBUG, SESSION_LIFETIME,
|
|
SCHEDULER_INTERVAL, SCHEDULER_ENABLED, get_ssl_context, FLASK_FALLBACK_PORT,
|
|
SSL_ENABLED, SSL_CERT_PATH, SSL_KEY_PATH
|
|
)
|
|
from utils.logging_config import setup_logging, get_logger, log_startup_info
|
|
from models import User, Printer, Job, Stats, get_db_session, init_database, create_initial_admin
|
|
from utils.job_scheduler import scheduler
|
|
from utils.template_helpers import register_template_helpers
|
|
from utils.database_utils import backup_manager, database_monitor, maintenance_scheduler
|
|
|
|
# Create the Flask application and apply session/CSRF related configuration.
app = Flask(__name__)
app.secret_key = SECRET_KEY
app.config.update(
    PERMANENT_SESSION_LIFETIME=SESSION_LIFETIME,
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
    WTF_CSRF_ENABLED=True,
)

# Enable CSRF protection for the whole application.
csrf = CSRFProtect(app)

# Configure Flask-Login: unauthenticated users are sent to the "login" view
# and shown the message below with the "info" flash category.
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = "login"
login_manager.login_message = "Bitte melden Sie sich an, um auf diese Seite zuzugreifen."
login_manager.login_message_category = "info"
|
|
|
|
@login_manager.user_loader
def load_user(user_id):
    """Load the ``User`` row for the identifier handed over by Flask-Login.

    Args:
        user_id: Identifier of the user to load.

    Returns:
        The matching ``User`` instance, or ``None`` when no row matches.
    """
    db_session = get_db_session()
    try:
        return db_session.query(User).filter(User.id == user_id).first()
    finally:
        # Release the session even when the query raises.
        db_session.close()
|
|
|
|
# Jinja2 Context Processors
|
|
@app.context_processor
def inject_now():
    """Make the current local datetime available to templates as ``now``."""
    current_time = datetime.now()
    return dict(now=current_time)
|
|
|
|
# Custom Jinja2 filter für Datumsformatierung
|
|
@app.template_filter('format_datetime')
def format_datetime_filter(value, format='%d.%m.%Y %H:%M'):
    """Render *value* via ``strftime``; the default pattern is German-style.

    Args:
        value: ``None``, an ISO-format string, or an object with ``strftime``.
        format: ``strftime`` pattern to apply.

    Returns:
        An empty string for ``None``; the input string unchanged when it
        cannot be parsed by ``datetime.fromisoformat``; otherwise the
        formatted text.
    """
    if value is None:
        return ""

    moment = value
    if isinstance(value, str):
        try:
            moment = datetime.fromisoformat(value)
        except ValueError:
            # Not an ISO timestamp: hand the text back untouched.
            return value

    return moment.strftime(format)
|
|
|
|
# Initialise logging and emit the startup banner.
setup_logging()
log_startup_info()

# One logger per application component.
app_logger = get_logger("app")
auth_logger = get_logger("auth")
jobs_logger = get_logger("jobs")
printers_logger = get_logger("printers")
user_logger = get_logger("user")
kiosk_logger = get_logger("kiosk")

# Hash used to verify the kiosk deactivation password.
# The password can be supplied through the KIOSK_PASSWORD environment variable.
# NOTE(security): the literal below is only a backward-compatible fallback;
# a password committed to source control should be rotated and removed.
KIOSK_PASSWORD_HASH = generate_password_hash(
    os.environ.get("KIOSK_PASSWORD") or "744563017196A"
)

print("Alle Blueprints wurden in app.py integriert")
|
|
|
|
# Custom decorator für Job-Besitzer-Check
|
|
def job_owner_required(f):
    """Decorator that only lets the job's owner or an admin reach the view.

    The wrapped view must take ``job_id`` as its first positional argument.

    Returns (from the wrapper):
        A 404 JSON response when no job with ``job_id`` exists, a 403 JSON
        response when the current user is neither owner nor admin, otherwise
        whatever the wrapped view returns.
    """
    @wraps(f)
    def decorated_function(job_id, *args, **kwargs):
        # NOTE(review): reads current_user.id, so this presumably must sit
        # behind @login_required - confirm at the call sites.
        db_session = get_db_session()
        try:
            job = db_session.query(Job).filter(Job.id == job_id).first()
            if not job:
                return jsonify({"error": "Job nicht gefunden"}), 404

            user_id = int(current_user.id)
            is_owner = job.user_id == user_id or job.owner_id == user_id
            is_admin = current_user.is_admin

            if not (is_owner or is_admin):
                return jsonify({"error": "Keine Berechtigung"}), 403
        finally:
            # Close the session on every path, including unexpected errors.
            db_session.close()

        return f(job_id, *args, **kwargs)

    return decorated_function
|
|
|
|
# Custom decorator für Admin-Check
|
|
def admin_required(f):
    """Decorator: require a logged-in user whose ``is_admin`` flag is truthy.

    Non-admin users receive a 403 JSON error; the login check itself is
    delegated to Flask-Login's ``login_required``.
    """
    def _admin_gate(*args, **kwargs):
        if current_user.is_admin:
            return f(*args, **kwargs)
        return jsonify({"error": "Nur Administratoren haben Zugriff"}), 403

    # Same stacking as "@wraps(f)" over "@login_required" on the inner function.
    return wraps(f)(login_required(_admin_gate))
|
|
|
|
# ===== AUTHENTIFIZIERUNGS-ROUTEN (ehemals auth.py) =====
|
|
|
|
def _is_safe_redirect_target(target):
    """Return True when *target* resolves to a URL on this application's host."""
    from urllib.parse import urljoin, urlparse

    if not target or "\\" in target:
        # Browsers may treat backslashes like slashes, so reject them outright.
        return False
    own = urlparse(request.host_url)
    resolved = urlparse(urljoin(request.host_url, target))
    return resolved.scheme in ("http", "https") and resolved.netloc == own.netloc


@app.route("/auth/login", methods=["GET", "POST"])
def login():
    """Show the login page and authenticate form or JSON credentials.

    GET renders ``login.html``. POST accepts either form fields
    (``username``, ``password``, ``remember-me``) or a JSON object
    (``username``, ``password``, ``remember_me``). The ``username`` value is
    matched against both the username and the email column.

    Returns:
        A redirect (form) or JSON payload (JSON request) on success. On
        failure JSON requests get 400/401/500 with an ``error`` key, and form
        requests get the login page re-rendered with the error message.
    """
    if current_user.is_authenticated:
        return redirect(url_for("index"))

    if request.method != "POST":
        return render_template("login.html", error=None)

    # Distinguish JSON requests from regular form posts.
    is_json_request = request.is_json or request.headers.get('Content-Type') == 'application/json'

    if is_json_request:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            # Missing or malformed body: treat it as "no credentials supplied".
            data = {}
        username = data.get("username")
        password = data.get("password")
        remember_me = data.get("remember_me", False)
    else:
        username = request.form.get("username")
        password = request.form.get("password")
        remember_me = request.form.get("remember-me") == "on"

    if not username or not password:
        error = "Benutzername und Passwort müssen angegeben werden."
        if is_json_request:
            return jsonify({"error": error}), 400
        return render_template("login.html", error=error)

    db_session = None
    try:
        db_session = get_db_session()
        # Look the user up by username or email.
        user = db_session.query(User).filter(
            (User.username == username) | (User.email == username)
        ).first()

        if user and user.check_password(password):
            user.update_last_login()
            db_session.commit()

            login_user(user, remember=remember_me)
            auth_logger.info(f"Benutzer {username} hat sich angemeldet")

            # Only follow "next" when it stays on this host (no open redirect).
            next_page = request.args.get("next")
            if not _is_safe_redirect_target(next_page):
                next_page = None
            target = next_page or url_for("index")

            if is_json_request:
                return jsonify({"success": True, "redirect_url": target})
            return redirect(target)

        error = "Ungültiger Benutzername oder Passwort."
        auth_logger.warning(f"Fehlgeschlagener Login-Versuch für Benutzer {username}")
        if is_json_request:
            return jsonify({"error": error}), 401
    except Exception as e:
        # Boundary handler, e.g. for database problems.
        error = "Anmeldefehler. Bitte versuchen Sie es später erneut."
        auth_logger.error(f"Fehler bei der Anmeldung: {str(e)}")
        if is_json_request:
            return jsonify({"error": error}), 500
    finally:
        if db_session is not None:
            db_session.close()

    return render_template("login.html", error=error)
|
|
|
|
@app.route("/auth/logout", methods=["GET", "POST"])
@login_required
def auth_logout():
    """Log the current user out and point the client at the login page.

    JSON requests receive a JSON payload with the redirect URL; all other
    requests are redirected directly.
    """
    # Read the name before logout_user() replaces the current user.
    username = getattr(current_user, "username", "Unbekannt")
    logout_user()
    auth_logger.info(f"Benutzer {username} hat sich abgemeldet")

    wants_json = request.is_json or request.headers.get('Content-Type') == 'application/json'
    if not wants_json:
        return redirect(url_for("login"))
    return jsonify({"success": True, "redirect_url": url_for("login")})
|
|
|
|
@app.route("/auth/api/login", methods=["POST"])
def api_login():
    """JSON login endpoint for the frontend.

    Expects a JSON object with ``username`` (matched against username or
    email), ``password`` and optional ``remember_me``.

    Returns:
        200 with ``success``, ``user`` and ``redirect_url`` on success;
        400 for a missing/invalid body or missing credentials; 401 for wrong
        credentials; 500 on unexpected errors.
    """
    db_session = None
    try:
        # silent=True: a malformed or non-JSON body yields None (-> 400)
        # instead of an exception that would be reported as 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({"error": "Keine Daten erhalten"}), 400

        username = data.get("username")
        password = data.get("password")
        remember_me = data.get("remember_me", False)

        if not username or not password:
            return jsonify({"error": "Benutzername und Passwort müssen angegeben werden"}), 400

        db_session = get_db_session()
        user = db_session.query(User).filter(
            (User.username == username) | (User.email == username)
        ).first()

        if not (user and user.check_password(password)):
            auth_logger.warning(f"Fehlgeschlagener API-Login für Benutzer {username}")
            return jsonify({"error": "Ungültiger Benutzername oder Passwort"}), 401

        user.update_last_login()
        db_session.commit()

        login_user(user, remember=remember_me)
        auth_logger.info(f"API-Login erfolgreich für Benutzer {username}")

        # Read the attributes while the session is still open.
        user_data = {
            "id": user.id,
            "username": user.username,
            "name": user.name,
            "email": user.email,
            "is_admin": user.is_admin
        }
        return jsonify({
            "success": True,
            "user": user_data,
            "redirect_url": url_for("index")
        })

    except Exception as e:
        auth_logger.error(f"Fehler beim API-Login: {str(e)}")
        return jsonify({"error": "Anmeldefehler. Bitte versuchen Sie es später erneut"}), 500
    finally:
        if db_session is not None:
            db_session.close()
|
|
|
|
def _complete_oauth_login(provider, user_data, clear_state=False):
    """Find or create the local user for an OAuth identity and log them in.

    Args:
        provider: Name of the OAuth provider, stored on the user row.
        user_data: Mapping with the keys ``id``, ``username``, ``name`` and
            ``email``.
        clear_state: When true, remove ``oauth_state`` from the Flask session
            after a successful login.

    Returns:
        A JSON success response, or a ``(response, 500)`` tuple when the
        database work fails.
    """
    db_session = get_db_session()
    try:
        user = db_session.query(User).filter(
            User.email == user_data['email']
        ).first()

        if not user:
            user = User(
                username=user_data['username'],
                email=user_data['email'],
                name=user_data['name'],
                is_admin=False,
                oauth_provider=provider,
                oauth_id=str(user_data['id'])
            )
            # Random password that is never communicated to anyone.
            user.set_password(secrets.token_urlsafe(32))
            db_session.add(user)
            db_session.commit()
            auth_logger.info(f"Neuer OAuth-Benutzer erstellt: {user.username} via {provider}")
        else:
            user.oauth_provider = provider
            user.oauth_id = str(user_data['id'])
            user.name = user_data['name']
            user.updated_at = datetime.now()
            db_session.commit()
            auth_logger.info(f"OAuth-Benutzer aktualisiert: {user.username} via {provider}")

        user.update_last_login()
        db_session.commit()

        login_user(user, remember=True)

        if clear_state:
            session.pop('oauth_state', None)

        # Build the payload while the session is still open.
        return jsonify({
            "success": True,
            "user": {
                "id": user.id,
                "username": user.username,
                "name": user.name,
                "email": user.email,
                "is_admin": user.is_admin
            },
            "redirect_url": url_for("index")
        })

    except Exception as e:
        db_session.rollback()
        auth_logger.error(f"Datenbankfehler bei OAuth-Callback: {str(e)}")
        return jsonify({
            "error": "Datenbankfehler bei der Benutzeranmeldung",
            "redirect_url": url_for("login")
        }), 500
    finally:
        db_session.close()


@app.route("/auth/api/callback", methods=["GET", "POST"])
def api_callback():
    """OAuth callback endpoint for external authentication.

    GET handles the provider redirect: it validates ``error``, ``code`` and
    the ``state`` parameter against ``session['oauth_state']``, exchanges the
    code for user data and logs the user in. POST accepts a JSON object with
    ``access_token`` (and optional ``provider``) and logs the user in from
    the data fetched with that token. Only the ``github`` provider is handled.

    Returns:
        JSON responses: success payload, 400 for invalid input, 500 for
        unexpected failures.
    """
    try:
        provider = request.args.get('provider', 'github')

        if request.method == "POST":
            # Manual token hand-over.
            data = request.get_json(silent=True)
            if not data or not isinstance(data, dict):
                return jsonify({"error": "Keine Daten erhalten"}), 400

            access_token = data.get('access_token')
            provider = data.get('provider', 'github')

            if not access_token:
                return jsonify({"error": "Kein Access Token erhalten"}), 400

            if provider != 'github':
                return jsonify({"error": "Unbekannter OAuth-Provider"}), 400

            user_data = get_github_user_data(access_token)
            if not user_data:
                return jsonify({"error": "Fehler beim Abrufen der Benutzerdaten"}), 400

            return _complete_oauth_login(provider, user_data)

        # GET (and the implicit HEAD): authorization-code flow.
        code = request.args.get('code')
        state = request.args.get('state')
        error = request.args.get('error')

        if error:
            auth_logger.warning(f"OAuth-Fehler von {provider}: {error}")
            return jsonify({
                "error": f"OAuth-Authentifizierung fehlgeschlagen: {error}",
                "redirect_url": url_for("login")
            }), 400

        if not code:
            auth_logger.warning(f"Kein Authorization Code von {provider} erhalten")
            return jsonify({
                "error": "Kein Authorization Code erhalten",
                "redirect_url": url_for("login")
            }), 400

        # Validate the state parameter (CSRF protection).
        session_state = session.get('oauth_state')
        if not state or state != session_state:
            auth_logger.warning(f"Ungültiger State-Parameter von {provider}")
            return jsonify({
                "error": "Ungültiger State-Parameter",
                "redirect_url": url_for("login")
            }), 400

        if provider != 'github':
            auth_logger.error(f"Unbekannter OAuth-Provider: {provider}")
            return jsonify({
                "error": "Unbekannter OAuth-Provider",
                "redirect_url": url_for("login")
            }), 400

        user_data = handle_github_callback(code)
        if not user_data:
            return jsonify({
                "error": "Fehler beim Abrufen der Benutzerdaten",
                "redirect_url": url_for("login")
            }), 400

        return _complete_oauth_login(provider, user_data, clear_state=True)

    except Exception as e:
        auth_logger.error(f"Fehler im OAuth-Callback: {str(e)}")
        return jsonify({
            "error": "OAuth-Callback-Fehler",
            "redirect_url": url_for("login")
        }), 500
|
|
|
|
|
|
def handle_github_callback(code):
    """Exchange a GitHub authorization code for the user's profile data.

    The OAuth client credentials are read from the environment variables
    ``GITHUB_CLIENT_ID`` and ``GITHUB_CLIENT_SECRET``.

    Args:
        code: Authorization code received on the OAuth callback.

    Returns:
        The result of ``get_github_user_data`` for the obtained access token,
        or ``None`` when the token request fails or any error occurs.
    """
    try:
        import requests

        # NOTE(security): the literals are only a backward-compatible fallback.
        # Credentials committed to source control should be rotated and then
        # supplied exclusively through the environment.
        client_id = os.environ.get("GITHUB_CLIENT_ID") or "7c5d8bef1a5519ec1fdc"
        client_secret = (
            os.environ.get("GITHUB_CLIENT_SECRET")
            or "5f1e586204358fbd53cf5fb7d418b3f06ccab8fd"
        )

        if not client_id or not client_secret:
            auth_logger.error("GitHub OAuth-Konfiguration fehlt")
            return None

        # Request the access token.
        token_response = requests.post(
            "https://github.com/login/oauth/access_token",
            data={
                'client_id': client_id,
                'client_secret': client_secret,
                'code': code
            },
            headers={'Accept': 'application/json'},
            timeout=10
        )

        if token_response.status_code != 200:
            auth_logger.error(f"GitHub Token-Anfrage fehlgeschlagen: {token_response.status_code}")
            return None

        access_token = token_response.json().get('access_token')
        if not access_token:
            auth_logger.error("Kein Access Token von GitHub erhalten")
            return None

        return get_github_user_data(access_token)

    except Exception as e:
        auth_logger.error(f"Fehler bei GitHub OAuth-Callback: {str(e)}")
        return None
|
|
|
|
|
|
def get_github_user_data(access_token):
    """Fetch the GitHub profile belonging to *access_token*.

    When the profile exposes no email, the ``/user/emails`` endpoint is
    queried. Only addresses GitHub reports as verified are accepted there
    (primary preferred), because callers match local accounts by email and an
    unverified address must not be able to claim an existing account.

    Args:
        access_token: GitHub OAuth access token.

    Returns:
        A dict with ``id``, ``username``, ``name`` and ``email``, or ``None``
        when a request fails, no usable email exists, or any error occurs.
    """
    try:
        import requests

        headers = {
            'Authorization': f'token {access_token}',
            'Accept': 'application/vnd.github.v3+json'
        }

        user_response = requests.get("https://api.github.com/user", headers=headers, timeout=10)
        if user_response.status_code != 200:
            auth_logger.error(f"GitHub User-API-Anfrage fehlgeschlagen: {user_response.status_code}")
            return None

        user_data = user_response.json()

        # The profile email is absent when the user keeps it private.
        email = user_data.get('email')
        if not email:
            email_response = requests.get(
                "https://api.github.com/user/emails", headers=headers, timeout=10
            )
            if email_response.status_code == 200:
                verified = [
                    entry for entry in email_response.json()
                    if isinstance(entry, dict)
                    and entry.get('verified', False)
                    and entry.get('email')
                ]
                # Prefer the primary address, otherwise the first verified one.
                chosen = next(
                    (entry for entry in verified if entry.get('primary', False)),
                    verified[0] if verified else None,
                )
                if chosen is not None:
                    email = chosen.get('email')

        if not email:
            auth_logger.error("Keine E-Mail-Adresse von GitHub erhalten")
            return None

        return {
            'id': user_data.get('id'),
            'username': user_data.get('login'),
            'name': user_data.get('name') or user_data.get('login'),
            'email': email
        }

    except Exception as e:
        auth_logger.error(f"Fehler beim Abrufen der GitHub-Benutzerdaten: {str(e)}")
        return None
|
|
|
|
# ===== BENUTZER-ROUTEN (ehemals user.py) =====
|
|
|
|
@app.route("/user/profile", methods=["GET"])
@login_required
def user_profile():
    """Render the profile page for the logged-in user and log the visit."""
    viewer = current_user
    user_logger.info(f"Benutzer {viewer.username} hat seine Profilseite aufgerufen")
    page = render_template("profile.html", user=viewer)
    return page
|
|
|
|
@app.route("/user/settings", methods=["GET"])
@login_required
def user_settings():
    """Render the settings page for the logged-in user and log the visit."""
    viewer = current_user
    user_logger.info(f"Benutzer {viewer.username} hat seine Einstellungsseite aufgerufen")
    page = render_template("settings.html", user=viewer)
    return page
|
|
|
|
@app.route("/user/update-profile", methods=["POST"])
@login_required
def user_update_profile():
    """Update profile fields of the logged-in user.

    Accepts form data or a JSON object with ``name``, ``email``,
    ``department``, ``position`` and ``phone``; only non-empty values are
    written.

    Returns:
        JSON requests get a JSON result (200, 404 or 500); form requests get a
        flash message and a redirect to the profile page.
    """
    # Computed before the try block so that every handler can rely on it.
    is_json_request = request.is_json or request.headers.get('Content-Type') == 'application/json'

    def _fail(message, status):
        """Report *message* as JSON or as flash + redirect."""
        if is_json_request:
            return jsonify({"error": message}), status
        flash(message, "error")
        return redirect(url_for("user_profile"))

    # Start as None so the cleanup below is safe when an error occurs
    # before the session has been created.
    db_session = None
    try:
        if is_json_request:
            source = request.get_json(silent=True)
            if not isinstance(source, dict):
                source = {}
        else:
            source = request.form

        fields = ("name", "email", "department", "position", "phone")
        updates = {field: source.get(field) for field in fields}

        db_session = get_db_session()
        user = db_session.query(User).filter(User.id == current_user.id).first()

        if not user:
            return _fail("Benutzer nicht gefunden.", 404)

        # Only overwrite attributes for which a non-empty value was sent.
        for field, value in updates.items():
            if value:
                setattr(user, field, value)

        user.updated_at = datetime.now()
        db_session.commit()
        user_logger.info(f"Benutzer {current_user.username} hat sein Profil aktualisiert")

        if is_json_request:
            return jsonify({
                "success": True,
                "message": "Profil erfolgreich aktualisiert"
            })
        flash("Profil erfolgreich aktualisiert", "success")
        return redirect(url_for("user_profile"))

    except Exception as e:
        if db_session is not None:
            # Discard partial changes, e.g. after a failed commit.
            db_session.rollback()
        error = f"Fehler beim Aktualisieren des Profils: {str(e)}"
        user_logger.error(error)
        return _fail(error, 500)
    finally:
        if db_session is not None:
            db_session.close()
|
|
|
|
@app.route("/user/api/update-settings", methods=["POST"])
@login_required
def user_api_update_settings():
    """API alias for settings updates; delegates to ``user_update_settings``."""
    response = user_update_settings()
    return response
|
|
|
|
@app.route("/user/update-settings", methods=["POST"])
@login_required
def user_update_settings():
    """Update the settings of the logged-in user.

    Accepts a JSON object (``theme``, ``reduced_motion``, ``contrast``,
    ``notifications``, ``privacy``) or the equivalent form fields. Unknown
    theme/contrast values fall back to ``system``/``normal`` and
    ``auto_logout`` is clamped to 5-480 minutes. The settings are stored as
    JSON on ``user.settings`` when that attribute exists, otherwise in the
    Flask session.

    Returns:
        JSON requests get a JSON result (200, 400, 404 or 500); form requests
        get a flash message and a redirect to the settings page.
    """
    # Computed before the try block so that every handler can rely on it.
    is_json_request = request.is_json or request.headers.get('Content-Type') == 'application/json'
    db_session = get_db_session()
    try:
        if is_json_request:
            # silent=True: malformed JSON yields None and is answered with 400.
            data = request.get_json(silent=True)
            if not data:
                return jsonify({"error": "Keine Daten empfangen"}), 400
            if not isinstance(data, dict):
                raise ValueError("JSON-Objekt erwartet")

            theme = data.get("theme", "system")
            reduced_motion = bool(data.get("reduced_motion", False))
            contrast = data.get("contrast", "normal")
            notifications = data.get("notifications", {})
            privacy = data.get("privacy", {})
            if not isinstance(notifications, dict) or not isinstance(privacy, dict):
                raise ValueError("notifications und privacy müssen Objekte sein")
        else:
            form = request.form
            theme = form.get("theme", "system")
            reduced_motion = form.get("reduced_motion") == "on"
            contrast = form.get("contrast", "normal")
            notifications = {
                "new_jobs": form.get("notify_new_jobs") == "on",
                "job_updates": form.get("notify_job_updates") == "on",
                "system": form.get("notify_system") == "on",
                "email": form.get("notify_email") == "on"
            }
            privacy = {
                "activity_logs": form.get("activity_logs") == "on",
                "two_factor": form.get("two_factor") == "on",
                "auto_logout": int(form.get("auto_logout", "60"))
            }

        # Fall back to the defaults for unknown values.
        if theme not in ("light", "dark", "system"):
            theme = "system"
        if contrast not in ("normal", "high"):
            contrast = "normal"

        user = db_session.query(User).filter(User.id == current_user.id).first()

        if not user:
            error = "Benutzer nicht gefunden."
            if is_json_request:
                return jsonify({"error": error}), 404
            flash(error, "error")
            return redirect(url_for("user_settings"))

        settings = {
            "theme": theme,
            "reduced_motion": reduced_motion,
            "contrast": contrast,
            "notifications": {
                "new_jobs": bool(notifications.get("new_jobs", True)),
                "job_updates": bool(notifications.get("job_updates", True)),
                "system": bool(notifications.get("system", True)),
                "email": bool(notifications.get("email", False))
            },
            "privacy": {
                "activity_logs": bool(privacy.get("activity_logs", True)),
                "two_factor": bool(privacy.get("two_factor", False)),
                # Clamp to 5-480 minutes.
                "auto_logout": max(5, min(480, int(privacy.get("auto_logout", 60))))
            },
            "last_updated": datetime.now().isoformat()
        }

        if hasattr(user, 'settings'):
            # Persist on the user row.
            user.settings = json.dumps(settings)
        else:
            # Fallback: keep the settings in the session (temporary).
            session['user_settings'] = settings

        user.updated_at = datetime.now()
        db_session.commit()

        user_logger.info(f"Benutzer {current_user.username} hat seine Einstellungen aktualisiert")

        if is_json_request:
            return jsonify({
                "success": True,
                "message": "Einstellungen erfolgreich aktualisiert",
                "settings": settings
            })
        flash("Einstellungen erfolgreich aktualisiert", "success")
        return redirect(url_for("user_settings"))

    except (ValueError, TypeError) as e:
        # TypeError covers e.g. int(None) for a null "auto_logout" value.
        error = f"Ungültige Eingabedaten: {str(e)}"
        user_logger.warning(f"Ungültige Einstellungsdaten von Benutzer {current_user.username}: {str(e)}")
        if is_json_request:
            return jsonify({"error": error}), 400
        flash(error, "error")
        return redirect(url_for("user_settings"))
    except Exception as e:
        db_session.rollback()
        user_logger.error(f"Fehler beim Aktualisieren der Einstellungen für Benutzer {current_user.username}: {str(e)}")
        if is_json_request:
            return jsonify({"error": "Interner Serverfehler"}), 500
        flash("Fehler beim Speichern der Einstellungen", "error")
        return redirect(url_for("user_settings"))
    finally:
        db_session.close()
|
|
|
|
@app.route("/user/change-password", methods=["POST"])
@login_required
def user_change_password():
    """Change the password of the logged-in user.

    Accepts form data or a JSON object with ``current_password``,
    ``new_password`` and ``confirm_password``.

    Returns:
        JSON requests get a JSON result (200, 400, 401 or 500); form requests
        get a flash message and a redirect to the profile page.
    """
    is_json_request = request.is_json or request.headers.get('Content-Type') == 'application/json'

    def _fail(message, status):
        """Report *message* as JSON or as flash + redirect."""
        if is_json_request:
            return jsonify({"error": message}), status
        flash(message, "error")
        return redirect(url_for("user_profile"))

    # Must exist before the try block: the validation errors below return
    # before a session is created, and the finally clause still runs.
    db_session = None
    try:
        if is_json_request:
            source = request.get_json(silent=True)
            if not isinstance(source, dict):
                source = {}
        else:
            source = request.form

        current_password = source.get("current_password")
        new_password = source.get("new_password")
        confirm_password = source.get("confirm_password")

        if not current_password or not new_password or not confirm_password:
            return _fail("Alle Passwortfelder müssen ausgefüllt sein.", 400)

        if new_password != confirm_password:
            return _fail("Das neue Passwort und die Bestätigung stimmen nicht überein.", 400)

        db_session = get_db_session()
        user = db_session.query(User).filter(User.id == current_user.id).first()

        if not (user and user.check_password(current_password)):
            return _fail("Das aktuelle Passwort ist nicht korrekt.", 401)

        user.set_password(new_password)
        user.updated_at = datetime.now()
        db_session.commit()

        user_logger.info(f"Benutzer {current_user.username} hat sein Passwort geändert")

        if is_json_request:
            return jsonify({
                "success": True,
                "message": "Passwort erfolgreich geändert"
            })
        flash("Passwort erfolgreich geändert", "success")
        return redirect(url_for("user_profile"))

    except Exception as e:
        if db_session is not None:
            db_session.rollback()
        error = f"Fehler beim Ändern des Passworts: {str(e)}"
        user_logger.error(error)
        return _fail(error, 500)
    finally:
        if db_session is not None:
            db_session.close()
|
|
|
|
@app.route("/user/export", methods=["GET"])
@login_required
def user_export_data():
    """Export the logged-in user's data as a downloadable JSON file.

    The export holds ``user.to_dict()``, the user's jobs, the stored settings
    and a few job/account statistics.

    Returns:
        A JSON attachment response, 404 JSON when the user row is missing, or
        500 JSON on unexpected errors.
    """
    db_session = None
    try:
        db_session = get_db_session()
        user = db_session.query(User).filter(User.id == current_user.id).first()

        if not user:
            return jsonify({"error": "Benutzer nicht gefunden"}), 404

        user_data = user.to_dict()

        jobs = db_session.query(Job).filter(Job.user_id == user.id).all()
        user_data["jobs"] = [job.to_dict() for job in jobs]

        # Prefer the settings persisted on the user row (written by
        # user_update_settings); fall back to the session copy.
        settings = session.get('user_settings', {})
        stored_settings = getattr(user, 'settings', None)
        if isinstance(stored_settings, dict):
            settings = stored_settings
        elif stored_settings:
            try:
                settings = json.loads(stored_settings)
            except (TypeError, ValueError):
                pass  # unreadable value: keep the session fallback
        user_data["settings"] = settings

        user_data["statistics"] = {
            "total_jobs": len(jobs),
            "completed_jobs": sum(1 for j in jobs if j.status == "finished"),
            "failed_jobs": sum(1 for j in jobs if j.status == "failed"),
            "account_created": user.created_at.isoformat() if user.created_at else None,
            "last_login": user.last_login.isoformat() if user.last_login else None
        }

        # Sanitise the name before it is placed into a response header.
        safe_name = secure_filename(str(user.username)) or "export"

        response = make_response(json.dumps(user_data, indent=4))
        response.headers["Content-Disposition"] = f"attachment; filename=user_data_{safe_name}.json"
        response.headers["Content-Type"] = "application/json"

        user_logger.info(f"Benutzer {current_user.username} hat seine Daten exportiert")
        return response

    except Exception as e:
        error = f"Fehler beim Exportieren der Benutzerdaten: {str(e)}"
        user_logger.error(error)
        return jsonify({"error": error}), 500
    finally:
        if db_session is not None:
            db_session.close()
|
|
|
|
@app.route("/user/profile", methods=["PUT"])
@login_required
def user_update_profile_api():
    """API endpoint that updates the logged-in user's profile from JSON.

    Only the keys present in the JSON object are written; supported keys are
    ``name``, ``email``, ``department``, ``position``, ``phone`` and ``bio``.

    Returns:
        200 JSON with the updated ``user.to_dict()``; 400 when the request is
        not a JSON object; 404 when the user row is missing; 500 on
        unexpected errors.
    """
    db_session = None
    try:
        if not request.is_json:
            return jsonify({"error": "Anfrage muss im JSON-Format sein"}), 400

        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            # Membership tests on a non-dict body would fail or misbehave.
            return jsonify({"error": "Anfrage muss im JSON-Format sein"}), 400

        db_session = get_db_session()
        user = db_session.query(User).filter(User.id == current_user.id).first()

        if not user:
            return jsonify({"error": "Benutzer nicht gefunden"}), 404

        # Update only the fields that were provided.
        for field in ("name", "email", "department", "position", "phone", "bio"):
            if field in data:
                setattr(user, field, data[field])

        user.updated_at = datetime.now()
        db_session.commit()

        # Serialise while the session is still open.
        user_data = user.to_dict()

        user_logger.info(f"Benutzer {current_user.username} hat sein Profil über die API aktualisiert")
        return jsonify({
            "success": True,
            "message": "Profil erfolgreich aktualisiert",
            "user": user_data
        })

    except Exception as e:
        if db_session is not None:
            db_session.rollback()
        error = f"Fehler beim Aktualisieren des Profils: {str(e)}"
        user_logger.error(error)
        return jsonify({"error": error}), 500
    finally:
        if db_session is not None:
            db_session.close()
|
|
|
|
# ===== KIOSK-KONTROLL-ROUTEN (ehemals kiosk_control.py) =====
|
|
|
|
@app.route('/api/kiosk/status', methods=['GET'])
def kiosk_get_status():
    """Report whether the kiosk marker file ``/tmp/kiosk_active`` exists."""
    try:
        marker_present = os.path.exists('/tmp/kiosk_active')
        payload = {
            "active": marker_present,
            "message": "Kiosk-Status erfolgreich abgerufen"
        }
        return jsonify(payload)
    except Exception as e:
        kiosk_logger.error(f"Fehler beim Abrufen des Kiosk-Status: {str(e)}")
        failure = {"error": "Fehler beim Abrufen des Status"}
        return jsonify(failure), 500
|
|
|
|
@app.route('/api/kiosk/deactivate', methods=['POST'])
def kiosk_deactivate():
    """Deactivate kiosk mode after verifying the kiosk password.

    Expects a JSON object with a ``password`` key. On success the
    ``myp-kiosk`` service is stopped and disabled via ``sudo systemctl``, the
    marker file ``/tmp/kiosk_active`` is removed and the default systemd
    target is set to ``graphical.target``.

    Returns:
        200 JSON on success; 400 when no password is supplied; 401 for a
        wrong password; 500 when a command fails or on unexpected errors.
    """
    try:
        # silent=True: a malformed body is answered with 400 instead of 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'password' not in data:
            return jsonify({"error": "Passwort erforderlich"}), 400

        password = data['password']

        # A non-string value can never match; reject it like a wrong password.
        if not isinstance(password, str) or not check_password_hash(KIOSK_PASSWORD_HASH, password):
            kiosk_logger.warning(f"Fehlgeschlagener Kiosk-Deaktivierungsversuch von IP: {request.remote_addr}")
            return jsonify({"error": "Ungültiges Passwort"}), 401

        try:
            # Stop and disable the kiosk service.
            subprocess.run(['sudo', 'systemctl', 'stop', 'myp-kiosk'], check=True)
            subprocess.run(['sudo', 'systemctl', 'disable', 'myp-kiosk'], check=True)

            # Remove the marker; a file that is already gone is not an error.
            try:
                os.remove('/tmp/kiosk_active')
            except FileNotFoundError:
                pass

            # Restore the regular desktop target.
            subprocess.run(['sudo', 'systemctl', 'set-default', 'graphical.target'], check=True)

            kiosk_logger.info(f"Kiosk-Modus erfolgreich deaktiviert von IP: {request.remote_addr}")

            return jsonify({
                "success": True,
                "message": "Kiosk-Modus erfolgreich deaktiviert. System wird neu gestartet."
            })

        except subprocess.CalledProcessError as e:
            kiosk_logger.error(f"Fehler beim Deaktivieren des Kiosk-Modus: {str(e)}")
            return jsonify({"error": "Fehler beim Deaktivieren des Kiosk-Modus"}), 500

    except Exception as e:
        kiosk_logger.error(f"Unerwarteter Fehler bei Kiosk-Deaktivierung: {str(e)}")
        return jsonify({"error": "Unerwarteter Fehler"}), 500
|
|
@app.route('/api/kiosk/activate', methods=['POST'])
@login_required
def kiosk_activate():
    """Activate kiosk mode (administrators only).

    Writes the marker file ``/tmp/kiosk_active`` and then enables and starts
    the ``myp-kiosk`` systemd service.

    Returns:
        200 with ``success``/``message`` on success,
        403 if the current user is not an administrator,
        500 if a system command fails or another error occurs.
    """
    marker_path = '/tmp/kiosk_active'
    try:
        if not current_user.is_admin:
            kiosk_logger.warning(f"Nicht-Admin-Benutzer {current_user.username} versuchte Kiosk-Aktivierung")
            return jsonify({"error": "Nur Administratoren können den Kiosk-Modus aktivieren"}), 403

        # Remember whether the marker was already there so a failed activation
        # only removes a marker that this request created.
        marker_preexisting = os.path.exists(marker_path)

        try:
            # Set the kiosk marker.
            with open(marker_path, 'w') as f:
                f.write('1')

            # Enable and start the kiosk service.
            subprocess.run(['sudo', 'systemctl', 'enable', 'myp-kiosk'], check=True)
            subprocess.run(['sudo', 'systemctl', 'start', 'myp-kiosk'], check=True)

            kiosk_logger.info(f"Kiosk-Modus erfolgreich aktiviert von Admin {current_user.username} (IP: {request.remote_addr})")

            return jsonify({
                "success": True,
                "message": "Kiosk-Modus erfolgreich aktiviert"
            })

        except subprocess.CalledProcessError as e:
            # Roll the marker back; otherwise the status endpoint would keep
            # reporting kiosk mode as active although the service failed.
            if not marker_preexisting:
                try:
                    os.remove(marker_path)
                except OSError as cleanup_error:
                    kiosk_logger.warning(f"Kiosk-Marker konnte nicht entfernt werden: {str(cleanup_error)}")
            kiosk_logger.error(f"Fehler beim Aktivieren des Kiosk-Modus: {str(e)}")
            return jsonify({"error": "Fehler beim Aktivieren des Kiosk-Modus"}), 500

    except Exception as e:
        kiosk_logger.error(f"Unerwarteter Fehler bei Kiosk-Aktivierung: {str(e)}")
        return jsonify({"error": "Unerwarteter Fehler"}), 500
|
|
|
|
@app.route('/api/kiosk/restart', methods=['POST'])
def kiosk_restart_system():
    """Schedule a system reboot after verifying the kiosk password.

    Expects a JSON object containing ``password``. The route has no
    ``@login_required`` decorator; the kiosk password is the only check.
    The reboot is scheduled with ``shutdown -r +1`` (one minute delay) and is
    started without waiting for the command to finish.

    Returns:
        200 with ``success``/``message`` once the reboot command was started,
        400 if the body is not a JSON object or lacks ``password``,
        401 if the password is wrong,
        500 on any other error.
    """
    try:
        # silent=True: a missing or malformed JSON body yields None instead of
        # raising, so the client gets the 400 below rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'password' not in data:
            return jsonify({"error": "Passwort erforderlich"}), 400

        password = data['password']

        # A non-string password can never match; reject it before hashing so
        # check_password_hash() is not handed an unsupported type.
        if not isinstance(password, str) or not check_password_hash(KIOSK_PASSWORD_HASH, password):
            kiosk_logger.warning(f"Fehlgeschlagener Neustart-Versuch von IP: {request.remote_addr}")
            return jsonify({"error": "Ungültiges Passwort"}), 401

        kiosk_logger.info(f"System-Neustart initiiert von IP: {request.remote_addr}")

        # Popen (not run): the response is sent without waiting for shutdown.
        subprocess.Popen(['sudo', 'shutdown', '-r', '+1'])

        return jsonify({
            "success": True,
            "message": "System wird in 1 Minute neu gestartet"
        })

    except Exception as e:
        kiosk_logger.error(f"Fehler beim System-Neustart: {str(e)}")
        return jsonify({"error": "Fehler beim Neustart"}), 500
|
|
|
|
# ===== HILFSFUNKTIONEN =====
|
|
|
|
def check_printer_status(ip_address: str, timeout: int = 7) -> Tuple[str, bool]:
    """Check whether a printer is reachable by sending it a single ping.

    Args:
        ip_address: IP address of the printer; surrounding whitespace is ignored.
        timeout: Time in seconds to wait for the ping reply (default: 7).

    Returns:
        ``("online", True)`` if the ping command exits with code 0, otherwise
        ``("offline", False)``. Empty or invalid addresses, timeouts and
        unexpected errors are all reported as offline; this function does not
        raise for them.
    """
    if not ip_address or ip_address.strip() == "":
        printers_logger.debug("Keine IP-Adresse angegeben")
        return "offline", False

    target = ip_address.strip()

    try:
        # Validate the address so nothing but a literal IP reaches the command.
        import ipaddress
        try:
            ipaddress.ip_address(target)
        except ValueError:
            printers_logger.warning(f"Ungültige IP-Adresse: {ip_address}")
            return "offline", False

        # The reply-timeout flag differs per platform, and so does its unit.
        if os.name == 'nt':
            # Windows: -w expects milliseconds.
            cmd = ['ping', '-n', '1', '-w', str(timeout * 1000), target]
        elif sys.platform == 'darwin' or sys.platform.startswith('freebsd'):
            # macOS/FreeBSD: -W expects milliseconds; passing seconds would
            # wait only a few milliseconds and report false "offline" results.
            cmd = ['ping', '-c', '1', '-W', str(timeout * 1000), target]
        else:
            # Linux (iputils): -W expects seconds.
            cmd = ['ping', '-c', '1', '-W', str(timeout), target]

        printers_logger.debug(f"Ping-Befehl für {ip_address}: {' '.join(cmd)}")

        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            encoding='utf-8',
            errors='ignore',  # ping output may not be valid UTF-8
            timeout=timeout + 2  # safety net in case ping itself hangs
        )

        if result.returncode == 0:
            printers_logger.debug(f"Ping erfolgreich für {ip_address}")
            return "online", True

        printers_logger.debug(f"Ping fehlgeschlagen für {ip_address} (Return Code: {result.returncode})")
        return "offline", False

    except subprocess.TimeoutExpired:
        printers_logger.warning(f"Ping-Timeout für Drucker {ip_address} nach {timeout} Sekunden")
        return "offline", False
    except Exception as e:
        printers_logger.error(f"Fehler beim Ping für Drucker {ip_address}: {str(e)}")
        return "offline", False
|
|
|
|
def check_multiple_printers_status(printers: List[Dict], timeout: int = 7) -> Dict[int, Tuple[str, bool]]:
    """Check the status of several printers in parallel.

    Args:
        printers: Printer dictionaries with at least ``id`` and
            ``ip_address``; ``name`` is used for log messages when present.
        timeout: Ping timeout in seconds per printer (default: 7).

    Returns:
        A dictionary mapping each printer id to ``(status, active)`` as
        returned by ``check_printer_status``. Printers whose check fails or
        does not finish in time are reported as ``("offline", False)``.
        An empty input yields an empty dictionary.
    """
    # Local import: the module imports only ThreadPoolExecutor/as_completed.
    from concurrent.futures import TimeoutError as FuturesTimeoutError

    results = {}

    # ThreadPoolExecutor rejects max_workers=0, so handle the empty case first.
    if not printers:
        return results

    max_workers = min(len(printers), 10)

    # With more printers than workers the checks run in several rounds. Each
    # check is bounded by timeout + 2 seconds, so scale the overall deadline by
    # the number of rounds (ceiling division) plus a small margin. Leaving the
    # executor's "with" block waits for running checks anyway, so a shorter
    # deadline would only discard results, not save time.
    rounds = -(-len(printers) // max_workers)
    overall_timeout = (timeout + 2) * rounds + 1

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_printer = {
            executor.submit(check_printer_status, printer.get('ip_address'), timeout): printer
            for printer in printers
        }

        try:
            for future in as_completed(future_to_printer, timeout=overall_timeout):
                printer = future_to_printer[future]
                # Fall back to the id so a missing name cannot void a result.
                label = printer.get('name', printer['id'])
                try:
                    status, active = future.result()
                except Exception as e:
                    printers_logger.error(f"Fehler bei Status-Check für Drucker {label}: {str(e)}")
                    status, active = "offline", False
                else:
                    printers_logger.info(f"Drucker {label} ({printer.get('ip_address')}): {status}")
                results[printer['id']] = (status, active)
        except FuturesTimeoutError:
            # Keep what was collected and mark the remaining printers offline.
            for future, printer in future_to_printer.items():
                if printer['id'] not in results:
                    future.cancel()
                    results[printer['id']] = ("offline", False)
            printers_logger.warning(f"Status-Check nach {overall_timeout} Sekunden abgebrochen - verbleibende Drucker als offline markiert")

    return results
|
|
|
|
# ===== UI-ROUTEN =====
|
|
|
|
@app.route("/")
def index():
    """Render the start page for signed-in users; redirect everyone else to the login page."""
    if not current_user.is_authenticated:
        return redirect(url_for("login"))
    return render_template("index.html")
|
|
|
|
@app.route("/dashboard")
@login_required
def dashboard():
    """Render the dashboard page (login required)."""
    template_name = "dashboard.html"
    return render_template(template_name)
|
|
|
|
@app.route("/profile")
@login_required
def profile_redirect():
    """Redirect to the profile page registered under the ``user_profile`` endpoint."""
    target_url = url_for("user_profile")
    return redirect(target_url)
|
|
|
|
@app.route("/profil")
@login_required
def profil_redirect():
    """Redirect the German URL ``/profil`` to the ``user_profile`` endpoint."""
    target_url = url_for("user_profile")
    return redirect(target_url)
|
|
|
|
@app.route("/settings")
@login_required
def settings_redirect():
    """Redirect to the settings page registered under the ``user_settings`` endpoint."""
    target_url = url_for("user_settings")
    return redirect(target_url)
|
|
|
|
@app.route("/einstellungen")
@login_required
def einstellungen_redirect():
    """Redirect the German URL ``/einstellungen`` to the ``user_settings`` endpoint."""
    target_url = url_for("user_settings")
    return redirect(target_url)
|
|
|
|
@app.route("/admin")
@login_required
def admin():
    """Redirect administrators to the ``admin_page`` route.

    Non-administrators get an error flash message and are redirected to the
    start page instead.
    """
    if current_user.is_admin:
        return redirect(url_for("admin_page"))

    flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
    return redirect(url_for("index"))
|
|
|
|
@app.route("/demo")
@login_required
def components_demo():
    """Render the demo page for UI components."""
    template_name = "components_demo.html"
    return render_template(template_name)
|
|
|
|
@app.route("/printers")
@login_required
def printers_page():
    """Render the printer overview page."""
    template_name = "printers.html"
    return render_template(template_name)
|
|
|
|
@app.route("/jobs")
@login_required
def jobs_page():
    """Render the print job overview page."""
    template_name = "jobs.html"
    return render_template(template_name)
|
|
|
|
@app.route("/stats")
@login_required
def stats_page():
    """Render the statistics page."""
    template_name = "stats.html"
    return render_template(template_name)
|
|
|
|
def _admin_page_fill_stats(db_session, stats: Dict[str, Any]) -> None:
    """Fill ``stats`` with the headline counters shown on the admin page.

    The dictionary is updated in place so that counters collected before a
    failing query are still available to the caller.
    """
    stats["total_users"] = db_session.query(User).count()

    all_printers = db_session.query(Printer).all()
    stats["total_printers"] = len(all_printers)
    # The printer status routes in this module store "available" for reachable
    # printers, so count that value as online as well.
    stats["online_printers"] = sum(
        1 for p in all_printers if p.status in ("online", "available")
    )

    stats["active_jobs"] = db_session.query(Job).filter(
        Job.status.in_(["printing", "running"])
    ).count()

    stats["queued_jobs"] = db_session.query(Job).filter(
        Job.status == "scheduled"
    ).count()

    # Success rate: completed jobs as a share of all jobs in a final state.
    total_jobs = db_session.query(Job).filter(
        Job.status.in_(["completed", "failed", "cancelled"])
    ).count()
    successful_jobs = db_session.query(Job).filter(
        Job.status == "completed"
    ).count()

    if total_jobs > 0:
        stats["success_rate"] = int((successful_jobs / total_jobs) * 100)
    else:
        stats["success_rate"] = 0


def _admin_page_system_info(db_session) -> Dict[str, Any]:
    """Gather host, database-file and scheduler information for the system tab."""
    import psutil  # imported lazily so that only the system tab needs it

    cpu_percent = psutil.cpu_percent(interval=1)  # samples for one second
    memory = psutil.virtual_memory()
    disk = psutil.disk_usage('/')

    uptime_seconds = time.time() - psutil.boot_time()
    uptime_days = int(uptime_seconds // 86400)
    uptime_hours = int((uptime_seconds % 86400) // 3600)
    uptime_minutes = int((uptime_seconds % 3600) // 60)

    # Size of the database file in MB (0 if the file does not exist).
    db_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'database', 'myp.db')
    db_size = 0
    if os.path.exists(db_path):
        db_size = os.path.getsize(db_path) / (1024 * 1024)

    # Scheduler details are best effort: a failure keeps the defaults, but it
    # is logged instead of being swallowed silently.
    scheduler_running = False
    scheduler_jobs = 0
    try:
        from utils.job_scheduler import scheduler
        scheduler_running = scheduler.running
        if hasattr(scheduler, 'get_jobs'):
            scheduler_jobs = len(scheduler.get_jobs())
    except Exception as e:
        app_logger.warning(f"Scheduler-Status konnte nicht ermittelt werden: {str(e)}")

    # "Next job" is the scheduled job with the oldest creation time.
    next_job = db_session.query(Job).filter(
        Job.status == "scheduled"
    ).order_by(Job.created_at.asc()).first()

    next_job_time = "Keine geplanten Jobs"
    if next_job:
        next_job_time = next_job.created_at.strftime("%d.%m.%Y %H:%M")

    return {
        "cpu_usage": round(cpu_percent, 1),
        "memory_usage": round(memory.percent, 1),
        "disk_usage": round((disk.used / disk.total) * 100, 1),
        "uptime": f"{uptime_days}d {uptime_hours}h {uptime_minutes}m",
        "db_size": f"{db_size:.1f} MB",
        "db_connections": "Aktiv",
        "scheduler_running": scheduler_running,
        "scheduler_jobs": scheduler_jobs,
        "next_job": next_job_time
    }


def _admin_page_logs(log_level: str) -> List[Dict[str, str]]:
    """Read the newest entries of the per-category log files.

    Takes the last 100 lines of each category file, keeps only lines that
    contain the upper-cased ``log_level`` (unless it is ``'all'``) and returns
    at most 100 entries, sorted by timestamp string in descending order.
    """
    log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs')
    wanted_level = None if log_level == 'all' else log_level.upper()

    entries = []
    for category in ['app', 'auth', 'jobs', 'printers', 'scheduler', 'errors']:
        log_file = os.path.join(log_dir, category, f'{category}.log')
        if not os.path.exists(log_file):
            continue

        # errors='replace' keeps one undecodable byte from aborting the load.
        # NOTE(review): assumes the log files are written as UTF-8 - confirm
        # against the logging configuration.
        with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
            recent_lines = f.readlines()[-100:]

        for line in recent_lines:
            if wanted_level is not None and wanted_level not in line:
                continue

            # Lines are split on " - " into timestamp, level and message;
            # lines with fewer fields are kept whole as the message.
            parts = line.split(' - ')
            if len(parts) > 2:
                timestamp = parts[0]
                level = parts[1]
                message = ' - '.join(parts[2:])
            else:
                timestamp = parts[0] if len(parts) > 1 else ''
                level = 'INFO'
                message = line

            entries.append({
                'timestamp': timestamp,
                'level': level,
                'category': category,
                'message': message
            })

    return sorted(entries, key=lambda entry: entry['timestamp'], reverse=True)[:100]


@app.route("/admin-dashboard")
@login_required
def admin_page():
    """Render the administration page.

    Non-administrators get an error flash message and are redirected to the
    start page. The ``tab`` query parameter (default ``users``) selects which
    additional data is loaded: users, printers, scheduler status, system
    information or logs (the latter filtered by the ``log_level`` query
    parameter). Errors while loading are logged, and the page is rendered
    with whatever was collected up to that point.
    """
    if not current_user.is_admin:
        flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
        return redirect(url_for("index"))

    active_tab = request.args.get('tab', 'users')

    # Defaults that are rendered when loading fails or a tab is not selected.
    stats = {}
    users = []
    printers = []
    scheduler_status = {"running": False, "message": "Nicht verfügbar"}
    system_info = {"cpu": 0, "memory": 0, "disk": 0}
    logs = []

    db_session = get_db_session()

    try:
        _admin_page_fill_stats(db_session, stats)

        if active_tab == 'users':
            users = [user.to_dict() for user in db_session.query(User).all()]

        if active_tab == 'printers':
            printers = [printer.to_dict() for printer in db_session.query(Printer).all()]

        if active_tab == 'scheduler':
            from utils.scheduler import scheduler_is_running
            # Query once so the flag and the message cannot disagree.
            running = scheduler_is_running()
            scheduler_status = {
                "running": running,
                "message": "Der Scheduler läuft" if running else "Der Scheduler ist gestoppt"
            }

        if active_tab == 'system':
            system_info = _admin_page_system_info(db_session)

        if active_tab == 'logs':
            logs = _admin_page_logs(request.args.get('log_level', 'all'))

    except Exception as e:
        app_logger.error(f"Fehler beim Laden der Admin-Daten: {str(e)}")
    finally:
        db_session.close()

    return render_template(
        "admin.html",
        active_tab=active_tab,
        stats=stats,
        users=users,
        printers=printers,
        scheduler_status=scheduler_status,
        system_info=system_info,
        logs=logs
    )
|
|
|
|
# Direkter Zugriff auf Logout-Route (für Fallback)
|
|
@app.route("/logout", methods=["GET", "POST"])
def logout_redirect():
    """Redirect to the ``auth_logout`` endpoint (fallback logout URL)."""
    target_url = url_for("auth_logout")
    return redirect(target_url)
|
|
|
|
# ===== JOB-ROUTEN =====
|
|
|
|
@app.route("/api/jobs", methods=["GET"])
@login_required
def get_jobs():
    """Return print jobs as JSON.

    Administrators receive all jobs; other users only the jobs whose
    ``user_id`` matches their own id.

    Returns:
        ``{"jobs": [...]}`` on success, or a JSON error body with HTTP status
        500 if loading fails.
    """
    db_session = get_db_session()

    try:
        from sqlalchemy.orm import joinedload

        # Load user and printer eagerly so to_dict() does not trigger lazy
        # loads on detached instances.
        query = db_session.query(Job).options(joinedload(Job.user), joinedload(Job.printer))
        if not current_user.is_admin:
            query = query.filter(Job.user_id == int(current_user.id))

        # Serialize while the session is still open.
        job_dicts = [job.to_dict() for job in query.all()]

        return jsonify({
            "jobs": job_dicts
        })
    except Exception as exc:
        jobs_logger.error(f"Fehler beim Abrufen von Jobs: {str(exc)}")
        return jsonify({"error": "Interner Serverfehler"}), 500
    finally:
        db_session.close()
|
|
|
|
@app.route("/api/jobs/<int:job_id>", methods=["GET"])
@login_required
@job_owner_required
def get_job(job_id):
    """Return a single print job as JSON.

    Args:
        job_id: Id of the job taken from the URL.

    Returns:
        The serialized job on success, 404 if no job has this id, or 500 if
        loading fails.
    """
    db_session = get_db_session()

    try:
        from sqlalchemy.orm import joinedload

        # Load user and printer eagerly together with the job.
        job = (
            db_session.query(Job)
            .options(joinedload(Job.user), joinedload(Job.printer))
            .filter(Job.id == job_id)
            .first()
        )

        if not job:
            return jsonify({"error": "Job nicht gefunden"}), 404

        # Serialize while the session is still open.
        return jsonify(job.to_dict())
    except Exception as exc:
        jobs_logger.error(f"Fehler beim Abrufen des Jobs {job_id}: {str(exc)}")
        return jsonify({"error": "Interner Serverfehler"}), 500
    finally:
        db_session.close()
|
|
|
|
@app.route('/api/jobs/active', methods=['GET'])
@login_required
def get_active_jobs():
    """Return all jobs with status ``scheduled`` or ``running`` as JSON.

    Running jobs that have an end time additionally carry
    ``remaining_minutes`` (whole minutes until ``end_at``, never negative).

    Returns:
        ``{"jobs": [...]}`` on success, or a JSON error body with HTTP status
        500 if loading fails.
    """
    db_session = None
    try:
        db_session = get_db_session()

        active_jobs = db_session.query(Job).options(
            joinedload(Job.user),
            joinedload(Job.printer)
        ).filter(
            Job.status.in_(["scheduled", "running"])
        ).all()

        result = []
        for job in active_jobs:
            job_dict = job.to_dict()

            # Compute the remaining time for running jobs.
            if job.status == "running" and job.end_at:
                remaining_seconds = (job.end_at - datetime.now()).total_seconds()
                if remaining_seconds > 0:
                    job_dict["remaining_minutes"] = int(remaining_seconds / 60)
                else:
                    job_dict["remaining_minutes"] = 0

            result.append(job_dict)

        return jsonify({"jobs": result})
    except Exception as e:
        jobs_logger.error(f"Fehler beim Abrufen aktiver Jobs: {str(e)}")
        return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
    finally:
        # Close the session on every path; it used to leak when an exception
        # was raised after it had been opened.
        if db_session is not None:
            db_session.close()
|
|
|
|
@app.route('/api/jobs', methods=['POST'])
@login_required
def create_job():
    """Create a new job with status ``scheduled``.

    Expected JSON body::

        {
            "printer_id": int,
            "start_iso": str,        # ISO date string
            "duration_minutes": int,
            "name": str,             # optional job title
            "file_path": str         # optional
        }

    Returns:
        201 with ``{"job": {...}}`` on success,
        400 for a missing/invalid JSON body or invalid field values,
        404 if the printer does not exist,
        500 on any other error.
    """
    db_session = None
    try:
        # silent=True: a missing or malformed JSON body yields None instead of
        # raising, so it can be answered with a 400 rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Anfrage muss im JSON-Format sein"}), 400

        # Check required fields.
        for field in ("printer_id", "start_iso", "duration_minutes"):
            if field not in data:
                return jsonify({"error": f"Feld '{field}' fehlt"}), 400

        # Non-numeric values are a client error, not a server error.
        try:
            printer_id = int(data["printer_id"])
            duration_minutes = int(data["duration_minutes"])
        except (TypeError, ValueError):
            return jsonify({"error": "printer_id und duration_minutes müssen ganze Zahlen sein"}), 400

        start_iso = data["start_iso"]

        # Optional: job title and file path.
        name = data.get("name", f"Druckjob vom {datetime.now().strftime('%d.%m.%Y')}")
        file_path = data.get("file_path")

        # fromisoformat() raises TypeError for non-string input.
        try:
            start_at = datetime.fromisoformat(start_iso)
        except (TypeError, ValueError):
            return jsonify({"error": "Ungültiges Startdatum"}), 400

        if duration_minutes <= 0:
            return jsonify({"error": "Dauer muss größer als 0 sein"}), 400

        end_at = start_at + timedelta(minutes=duration_minutes)

        db_session = get_db_session()

        # Make sure the printer exists.
        printer = db_session.query(Printer).get(printer_id)
        if not printer:
            return jsonify({"error": "Drucker nicht gefunden"}), 404

        new_job = Job(
            name=name,
            printer_id=printer_id,
            user_id=current_user.id,
            owner_id=current_user.id,
            start_at=start_at,
            end_at=end_at,
            status="scheduled",
            file_path=file_path,
            duration_minutes=duration_minutes
        )

        db_session.add(new_job)
        db_session.commit()

        # Serialize and read the id while the session is still open.
        job_dict = new_job.to_dict()
        new_job_id = new_job.id

        jobs_logger.info(f"Neuer Job {new_job_id} erstellt für Drucker {printer_id}, Start: {start_at}, Dauer: {duration_minutes} Minuten")
        return jsonify({"job": job_dict}), 201

    except Exception as e:
        jobs_logger.error(f"Fehler beim Erstellen eines Jobs: {str(e)}")
        return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
    finally:
        # Close the session on every path; it used to leak when an exception
        # was raised after it had been opened.
        if db_session is not None:
            db_session.close()
|
|
|
|
@app.route('/api/jobs/<int:job_id>/extend', methods=['POST'])
@login_required
@job_owner_required
def extend_job(job_id):
    """Extend the end time of a job.

    Expected JSON body::

        {
            "extra_minutes": int
        }

    Only jobs with status ``scheduled`` or ``running`` can be extended.

    Args:
        job_id: Id of the job taken from the URL.

    Returns:
        200 with ``{"job": {...}}`` on success,
        400 for a missing/invalid JSON body, invalid minutes or a job status
        that cannot be extended,
        404 if the job does not exist,
        500 on any other error.
    """
    db_session = None
    try:
        # silent=True: a missing or malformed JSON body yields None instead of
        # raising, so it can be answered with a 400 rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Anfrage muss im JSON-Format sein"}), 400

        if "extra_minutes" not in data:
            return jsonify({"error": "Feld 'extra_minutes' fehlt"}), 400

        # A non-numeric value is a client error, not a server error.
        try:
            extra_minutes = int(data["extra_minutes"])
        except (TypeError, ValueError):
            return jsonify({"error": "Feld 'extra_minutes' muss eine ganze Zahl sein"}), 400

        if extra_minutes <= 0:
            return jsonify({"error": "Zusätzliche Minuten müssen größer als 0 sein"}), 400

        db_session = get_db_session()
        job = db_session.query(Job).get(job_id)

        if not job:
            return jsonify({"error": "Job nicht gefunden"}), 404

        # Only jobs that have not ended yet can be extended.
        if job.status not in ["scheduled", "running"]:
            return jsonify({"error": f"Job kann im Status '{job.status}' nicht verlängert werden"}), 400

        # Move the end time and keep the stored duration in sync.
        job.end_at = job.end_at + timedelta(minutes=extra_minutes)
        job.duration_minutes += extra_minutes

        db_session.commit()

        # Serialize and read the new end time while the session is still open.
        job_dict = job.to_dict()
        new_end_at = job.end_at

        jobs_logger.info(f"Job {job_id} um {extra_minutes} Minuten verlängert, neue Endzeit: {new_end_at}")
        return jsonify({"job": job_dict})

    except Exception as e:
        jobs_logger.error(f"Fehler beim Verlängern von Job {job_id}: {str(e)}")
        return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
    finally:
        # Close the session on every path; it used to leak when an exception
        # was raised after it had been opened.
        if db_session is not None:
            db_session.close()
|
|
|
|
@app.route('/api/jobs/<int:job_id>/finish', methods=['POST'])
@login_required
def finish_job(job_id):
    """Finish a job manually and switch off the printer's plug.

    Only administrators may call this. Only jobs with status ``scheduled`` or
    ``running`` can be finished. If the plug cannot be switched off, a warning
    is logged and the job is still marked as finished.

    Args:
        job_id: Id of the job taken from the URL.

    Returns:
        200 with ``{"job": {...}}`` on success,
        403 if the current user is not an administrator,
        404 if the job does not exist,
        400 if the job status does not allow finishing,
        500 on any other error.
    """
    db_session = None
    try:
        if not current_user.is_admin:
            return jsonify({"error": "Nur Administratoren können Jobs manuell beenden"}), 403

        db_session = get_db_session()
        job = db_session.query(Job).options(joinedload(Job.printer)).get(job_id)

        if not job:
            return jsonify({"error": "Job nicht gefunden"}), 404

        if job.status not in ["scheduled", "running"]:
            return jsonify({"error": f"Job kann im Status '{job.status}' nicht beendet werden"}), 400

        # Switch the plug off; a failure is logged but does not abort.
        from utils.job_scheduler import toggle_plug
        if not toggle_plug(job.printer_id, False):
            jobs_logger.warning(f"Steckdose für Job {job_id} konnte nicht ausgeschaltet werden")

        # Mark the job as finished.
        job.status = "finished"
        job.actual_end_time = datetime.now()

        db_session.commit()

        # Serialize while the session is still open.
        job_dict = job.to_dict()

        jobs_logger.info(f"Job {job_id} manuell beendet durch Admin {current_user.id}")
        return jsonify({"job": job_dict})

    except Exception as e:
        jobs_logger.error(f"Fehler beim manuellen Beenden von Job {job_id}: {str(e)}")
        return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
    finally:
        # Close the session on every path; it used to leak when an exception
        # was raised after it had been opened.
        if db_session is not None:
            db_session.close()
|
|
|
|
# ===== DRUCKER-ROUTEN =====
|
|
|
|
@app.route("/api/printers", methods=["GET"])
@login_required
def get_printers():
    """Return all printers WITHOUT a live status check, for fast loading.

    The database query runs in a helper thread so the request can give up
    after 5 seconds. The reported ``status`` is the last value stored in the
    database.

    Returns:
        200 with ``printers``, ``count`` and ``message`` on success,
        408 with ``timeout: True`` if the query failed or did not finish in
        time,
        500 on any other error.
    """
    db_session = get_db_session()
    worker = None

    try:
        printers = None
        query_failed = False

        def fetch_printers():
            nonlocal printers, query_failed
            try:
                printers = db_session.query(Printer).all()
            except Exception as e:
                printers_logger.error(f"Datenbankfehler beim Laden der Drucker: {str(e)}")
                query_failed = True

        # Run the query in a separate thread to enforce a timeout in a way
        # that also works on Windows.
        worker = threading.Thread(target=fetch_printers, daemon=True)
        worker.start()
        worker.join(timeout=5)

        if worker.is_alive() or query_failed or printers is None:
            printers_logger.warning("Database timeout when fetching printers for basic loading")
            return jsonify({
                'error': 'Database timeout beim Laden der Drucker',
                'timeout': True,
                'printers': []
            }), 408

        # Assemble the printer data without pinging any device.
        current_time = datetime.now()
        printer_data = [
            {
                "id": printer.id,
                "name": printer.name,
                "model": printer.model or 'Unbekanntes Modell',
                "location": printer.location or 'Unbekannter Standort',
                "mac_address": printer.mac_address,
                "plug_ip": printer.plug_ip,
                "status": printer.status or "offline",  # last known status
                "active": printer.active if hasattr(printer, 'active') else True,
                "ip_address": printer.plug_ip if printer.plug_ip else getattr(printer, 'ip_address', None),
                "created_at": printer.created_at.isoformat() if printer.created_at else current_time.isoformat(),
                "last_checked": printer.last_checked.isoformat() if hasattr(printer, 'last_checked') and printer.last_checked else None
            }
            for printer in printers
        ]

        printers_logger.info(f"Schnelles Laden abgeschlossen: {len(printer_data)} Drucker geladen (ohne Status-Check)")

        return jsonify({
            "printers": printer_data,
            "count": len(printer_data),
            "message": "Drucker erfolgreich geladen"
        })

    except Exception as e:
        printers_logger.error(f"Fehler beim Abrufen der Drucker: {str(e)}")
        return jsonify({
            "error": f"Fehler beim Laden der Drucker: {str(e)}",
            "printers": []
        }), 500
    finally:
        # Close the session on every path, including the 408 path that used to
        # leak it. While the helper thread is still running the query the
        # session is left alone, because closing it from here would race with
        # that thread.
        if worker is None or not worker.is_alive():
            db_session.close()
|
|
|
|
@app.route("/api/printers/status", methods=["GET"])
@login_required
def get_printers_with_status():
    """Return all printers WITH a fresh status check.

    The database query runs in a helper thread so the request can give up
    after 8 seconds. All printers are then pinged in parallel (7 seconds per
    printer) and the result is written back to the database as
    ``available``/``offline``.

    Returns:
        200 with a JSON list of printer dictionaries on success,
        408 with ``timeout: True`` if the query failed or did not finish in
        time,
        500 on any other error.
    """
    db_session = get_db_session()
    worker = None

    try:
        printers = None
        query_failed = False

        def fetch_printers():
            nonlocal printers, query_failed
            try:
                printers = db_session.query(Printer).all()
            except Exception as e:
                printers_logger.error(f"Datenbankfehler beim Status-Check: {str(e)}")
                query_failed = True

        # Run the query in a separate thread to enforce a timeout in a way
        # that also works on Windows.
        worker = threading.Thread(target=fetch_printers, daemon=True)
        worker.start()
        worker.join(timeout=8)

        if worker.is_alive() or query_failed or printers is None:
            printers_logger.warning("Database timeout when fetching printers for status check")
            return jsonify({
                'error': 'Database timeout beim Status-Check der Drucker',
                'timeout': True
            }), 408

        # Prepare plain dictionaries for the status check; plug_ip is the
        # primary address, ip_address the fallback.
        printer_data = [
            {
                'id': printer.id,
                'name': printer.name,
                'ip_address': printer.plug_ip if printer.plug_ip else getattr(printer, 'ip_address', None),
                'location': printer.location,
                'model': printer.model
            }
            for printer in printers
        ]

        printers_logger.info(f"Starte Status-Check für {len(printer_data)} Drucker mit 7-Sekunden-Timeout")

        if not any(p['ip_address'] for p in printer_data):
            # Without any address there is nothing to ping.
            printers_logger.warning("Keine IP-Adressen für Drucker gefunden - alle als offline markiert")
            status_results = {p['id']: ("offline", False) for p in printer_data}
        else:
            try:
                status_results = check_multiple_printers_status(printer_data, timeout=7)
            except Exception as e:
                printers_logger.error(f"Fehler beim Status-Check: {str(e)}")
                # Fallback: mark every printer as offline.
                status_results = {p['id']: ("offline", False) for p in printer_data}

        # Build the response and update the database objects.
        status_data = []
        current_time = datetime.now()

        for printer in printers:
            # Printers without a result count as offline.
            status, active = status_results.get(printer.id, ("offline", False))
            if printer.id not in status_results:
                active = False
            # The frontend expects "available" instead of "online".
            frontend_status = "available" if status == "online" else "offline"

            printer.status = frontend_status
            printer.active = active

            # Set last_checked only if the model has that field.
            if hasattr(printer, 'last_checked'):
                printer.last_checked = current_time

            status_data.append({
                "id": printer.id,
                "name": printer.name,
                "model": printer.model or 'Unbekanntes Modell',
                "location": printer.location or 'Unbekannter Standort',
                "mac_address": printer.mac_address,
                "plug_ip": printer.plug_ip,
                "status": frontend_status,
                "active": active,
                "ip_address": printer.plug_ip if printer.plug_ip else getattr(printer, 'ip_address', None),
                "created_at": printer.created_at.isoformat() if printer.created_at else current_time.isoformat(),
                "last_checked": current_time.isoformat()
            })

        # Persisting is best effort: the check result is returned either way.
        try:
            db_session.commit()
            printers_logger.info("Drucker-Status erfolgreich in Datenbank aktualisiert")
        except Exception as e:
            printers_logger.warning(f"Fehler beim Speichern der Status-Updates: {str(e)}")

        online_count = sum(1 for s in status_data if s['status'] == 'available')
        printers_logger.info(f"Status-Check abgeschlossen: {online_count} von {len(status_data)} Drucker online")

        return jsonify(status_data)

    except Exception as e:
        printers_logger.error(f"Fehler beim Status-Check der Drucker: {str(e)}")
        return jsonify({
            "error": f"Fehler beim Status-Check: {str(e)}",
            "printers": []
        }), 500
    finally:
        # Close the session on every path, including the 408 path that used to
        # leak it. While the helper thread is still running the query the
        # session is left alone, because closing it from here would race with
        # that thread.
        if worker is None or not worker.is_alive():
            db_session.close()
|
|
|
|
@app.route("/api/jobs/current", methods=["GET"])
@login_required
def get_current_job():
    """Return the current job of the logged-in user as JSON.

    The current job is the user's job with status ``scheduled`` or
    ``running`` that has the earliest ``start_at``.

    Returns:
        The serialized job, JSON ``null`` if there is none, or a JSON error
        body with HTTP status 500 if loading fails.
    """
    db_session = get_db_session()
    try:
        open_jobs = db_session.query(Job).filter(
            Job.user_id == current_user.id,
            Job.status.in_(["scheduled", "running"])
        )
        current_job = open_jobs.order_by(Job.start_at).first()

        # Serialize while the session is still open.
        job_data = current_job.to_dict() if current_job else None
        return jsonify(job_data)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
    finally:
        db_session.close()
|
|
|
|
# ===== WEITERE API-ROUTEN =====
|
|
|
|
@app.route("/api/printers/<int:printer_id>", methods=["GET"])
@login_required
def get_printer(printer_id):
    """Return a single printer as JSON, refreshing its status first.

    If the printer has an address (``plug_ip``, falling back to
    ``ip_address``), it is pinged and the result is stored in the database as
    ``available``/``offline`` before the response is built.

    Args:
        printer_id: Id of the printer taken from the URL.

    Returns:
        The printer data on success, 404 if no printer has this id, or 500 if
        an error occurs.
    """
    db_session = get_db_session()

    try:
        printer = db_session.query(Printer).get(printer_id)

        if not printer:
            return jsonify({"error": "Drucker nicht gefunden"}), 404

        # Status check for this printer; plug_ip is the primary address.
        ip_to_check = printer.plug_ip or getattr(printer, 'ip_address', None)
        if ip_to_check:
            status, active = check_printer_status(ip_to_check)
            printer.status = "available" if status == "online" else "offline"
            printer.active = active
            db_session.commit()

        has_active_flag = hasattr(printer, 'active')
        created_at = printer.created_at.isoformat() if printer.created_at else datetime.now().isoformat()

        printer_data = {
            "id": printer.id,
            "name": printer.name,
            "model": printer.model or 'Unbekanntes Modell',
            "location": printer.location or 'Unbekannter Standort',
            "mac_address": printer.mac_address,
            "plug_ip": printer.plug_ip,
            "status": printer.status or "offline",
            "active": printer.active if has_active_flag else True,
            "ip_address": ip_to_check,
            "created_at": created_at
        }

        return jsonify(printer_data)

    except Exception as exc:
        printers_logger.error(f"Fehler beim Abrufen des Druckers {printer_id}: {str(exc)}")
        return jsonify({"error": "Interner Serverfehler"}), 500
    finally:
        db_session.close()
|
|
|
|
@app.route("/api/printers", methods=["POST"])
@login_required
def create_printer():
    """Create a new printer from a JSON request body (admins only).

    The body must be a JSON object containing ``name`` and ``plug_ip``;
    ``model``, ``location`` and ``mac_address`` are optional and default to
    an empty string. New printers start in status ``offline``.

    Returns:
        201 with ``{"printer": {...}}`` on success; 400 for a missing or
        malformed body, a missing required field, or a duplicate name; 403
        for non-admins; 500 on unexpected errors.
    """
    if not current_user.is_admin:
        return jsonify({"error": "Nur Administratoren können Drucker erstellen"}), 403

    db_session = None
    try:
        # silent=True yields None instead of raising on a missing/invalid body,
        # so a bad request is answered with 400 rather than a generic 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Ungültige oder fehlende JSON-Daten"}), 400

        # Check required fields
        for field in ("name", "plug_ip"):
            if field not in data:
                return jsonify({"error": f"Feld '{field}' fehlt"}), 400

        db_session = get_db_session()

        # Printer names must be unique
        existing_printer = db_session.query(Printer).filter(Printer.name == data["name"]).first()
        if existing_printer:
            return jsonify({"error": "Ein Drucker mit diesem Namen existiert bereits"}), 400

        new_printer = Printer(
            name=data["name"],
            model=data.get("model", ""),
            location=data.get("location", ""),
            mac_address=data.get("mac_address", ""),
            plug_ip=data["plug_ip"],
            status="offline",
            created_at=datetime.now()
        )

        db_session.add(new_printer)
        db_session.commit()

        # Build the response while the session is still open.
        printer_data = {
            "id": new_printer.id,
            "name": new_printer.name,
            "model": new_printer.model,
            "location": new_printer.location,
            "mac_address": new_printer.mac_address,
            "plug_ip": new_printer.plug_ip,
            "status": new_printer.status,
            "created_at": new_printer.created_at.isoformat()
        }

        printers_logger.info(f"Neuer Drucker '{printer_data['name']}' erstellt von Admin {current_user.id}")
        return jsonify({"printer": printer_data}), 201

    except Exception as e:
        printers_logger.error(f"Fehler beim Erstellen eines Druckers: {str(e)}")
        return jsonify({"error": "Interner Serverfehler"}), 500

    finally:
        # Always release the session; closing also discards uncommitted state.
        if db_session is not None:
            db_session.close()
|
|
|
|
@app.route("/api/printers/<int:printer_id>", methods=["PUT"])
@login_required
def update_printer(printer_id):
    """Update an existing printer from a JSON request body (admins only).

    Only the keys ``name``, ``model``, ``location``, ``mac_address`` and
    ``plug_ip`` are applied; other keys in the body are ignored.

    Returns:
        200 with ``{"printer": {...}}`` on success; 400 for a missing or
        malformed body; 403 for non-admins; 404 if the printer does not
        exist; 500 on unexpected errors.
    """
    if not current_user.is_admin:
        return jsonify({"error": "Nur Administratoren können Drucker bearbeiten"}), 403

    db_session = None
    try:
        # Reject non-object bodies up front instead of failing with a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Ungültige oder fehlende JSON-Daten"}), 400

        db_session = get_db_session()

        printer = db_session.query(Printer).get(printer_id)
        if not printer:
            return jsonify({"error": "Drucker nicht gefunden"}), 404

        # Whitelist of fields a client may change
        updatable_fields = ["name", "model", "location", "mac_address", "plug_ip"]
        for field in updatable_fields:
            if field in data:
                setattr(printer, field, data[field])

        db_session.commit()

        printer_data = {
            "id": printer.id,
            "name": printer.name,
            "model": printer.model,
            "location": printer.location,
            "mac_address": printer.mac_address,
            "plug_ip": printer.plug_ip,
            "status": printer.status,
            "created_at": printer.created_at.isoformat() if printer.created_at else datetime.now().isoformat()
        }

        printers_logger.info(f"Drucker {printer_id} aktualisiert von Admin {current_user.id}")
        return jsonify({"printer": printer_data})

    except Exception as e:
        printers_logger.error(f"Fehler beim Aktualisieren des Druckers {printer_id}: {str(e)}")
        return jsonify({"error": "Interner Serverfehler"}), 500

    finally:
        # Always release the session; closing also discards uncommitted state.
        if db_session is not None:
            db_session.close()
|
|
|
|
@app.route("/api/printers/<int:printer_id>", methods=["DELETE"])
@login_required
def delete_printer(printer_id):
    """Delete a printer (admins only).

    Deletion is refused while the printer still has jobs in status
    ``scheduled`` or ``running``.

    Returns:
        200 with a confirmation message on success; 400 if active jobs
        exist; 403 for non-admins; 404 if the printer does not exist; 500 on
        unexpected errors.
    """
    if not current_user.is_admin:
        return jsonify({"error": "Nur Administratoren können Drucker löschen"}), 403

    db_session = None
    try:
        db_session = get_db_session()

        printer = db_session.query(Printer).get(printer_id)
        if not printer:
            return jsonify({"error": "Drucker nicht gefunden"}), 404

        # Refuse to delete while jobs are still pending or running on it
        active_jobs = db_session.query(Job).filter(
            Job.printer_id == printer_id,
            Job.status.in_(["scheduled", "running"])
        ).count()

        if active_jobs > 0:
            return jsonify({"error": f"Drucker kann nicht gelöscht werden: {active_jobs} aktive Jobs vorhanden"}), 400

        # Remember the name for the log line; the row is gone after commit.
        printer_name = printer.name
        db_session.delete(printer)
        db_session.commit()

        printers_logger.info(f"Drucker '{printer_name}' (ID: {printer_id}) gelöscht von Admin {current_user.id}")
        return jsonify({"message": "Drucker erfolgreich gelöscht"})

    except Exception as e:
        printers_logger.error(f"Fehler beim Löschen des Druckers {printer_id}: {str(e)}")
        return jsonify({"error": "Interner Serverfehler"}), 500

    finally:
        # Always release the session; closing also discards uncommitted state.
        if db_session is not None:
            db_session.close()
|
|
|
|
@app.route("/api/jobs/<int:job_id>", methods=["DELETE"])
@login_required
@job_owner_required
def delete_job(job_id):
    """Delete a job unless it is currently running.

    Returns:
        200 with a confirmation message on success; 400 if the job has
        status ``running``; 404 if the job does not exist; 500 on unexpected
        errors.
    """
    db_session = None
    try:
        db_session = get_db_session()
        job = db_session.query(Job).get(job_id)

        if not job:
            return jsonify({"error": "Job nicht gefunden"}), 404

        # Running jobs must be cancelled first, not deleted
        if job.status == "running":
            return jsonify({"error": "Laufende Jobs können nicht gelöscht werden"}), 400

        # Remember the name for the log line; the row is gone after commit.
        job_name = job.name
        db_session.delete(job)
        db_session.commit()

        jobs_logger.info(f"Job '{job_name}' (ID: {job_id}) gelöscht von Benutzer {current_user.id}")
        return jsonify({"message": "Job erfolgreich gelöscht"})

    except Exception as e:
        jobs_logger.error(f"Fehler beim Löschen des Jobs {job_id}: {str(e)}")
        return jsonify({"error": "Interner Serverfehler"}), 500

    finally:
        # Always release the session; closing also discards uncommitted state.
        if db_session is not None:
            db_session.close()
|
|
|
|
@app.route("/api/jobs/<int:job_id>/cancel", methods=["POST"])
@login_required
@job_owner_required
def cancel_job(job_id):
    """Cancel a scheduled or running job.

    The job is marked ``cancelled`` and its ``actual_end_time`` is set to
    now. If the job was running at the time of the request, the printer's
    plug is switched off via ``toggle_plug``.

    Returns:
        200 with ``{"job": {...}}`` on success; 400 if the job is in a
        status that cannot be cancelled; 404 if the job does not exist; 500
        on unexpected errors.
    """
    db_session = None
    try:
        db_session = get_db_session()
        job = db_session.query(Job).get(job_id)

        if not job:
            return jsonify({"error": "Job nicht gefunden"}), 404

        # Only pending or running jobs can be cancelled
        if job.status not in ["scheduled", "running"]:
            return jsonify({"error": f"Job kann im Status '{job.status}' nicht abgebrochen werden"}), 400

        # Capture the state BEFORE overwriting it; checking job.status after
        # the assignment below would never see "running" and the plug would
        # stay powered.
        was_running = job.status == "running"

        job.status = "cancelled"
        job.actual_end_time = datetime.now()

        # A running job has its plug switched on, so power it down
        if was_running:
            from utils.job_scheduler import toggle_plug
            toggle_plug(job.printer_id, False)

        db_session.commit()

        job_dict = job.to_dict()

        jobs_logger.info(f"Job {job_id} abgebrochen von Benutzer {current_user.id}")
        return jsonify({"job": job_dict})

    except Exception as e:
        jobs_logger.error(f"Fehler beim Abbrechen des Jobs {job_id}: {str(e)}")
        return jsonify({"error": "Interner Serverfehler"}), 500

    finally:
        # Always release the session; closing also discards uncommitted state.
        if db_session is not None:
            db_session.close()
|
|
|
|
@app.route("/api/stats", methods=["GET"])
@login_required
def get_stats():
    """Return global usage statistics as JSON.

    The payload holds total counts of users, printers and jobs, job counts
    per status, the number of printers with status ``available`` and the
    overall success rate (completed / (completed + failed + cancelled), in
    percent, rounded to one decimal). For non-admin users ``user_stats``
    additionally holds that user's own job totals; for admins it is empty.

    Returns:
        200 with the statistics object, or 500 on unexpected errors.
    """
    db_session = None
    try:
        db_session = get_db_session()

        # Basic totals
        total_users = db_session.query(User).count()
        total_printers = db_session.query(Printer).count()
        total_jobs = db_session.query(Job).count()

        # Jobs by status
        completed_jobs = db_session.query(Job).filter(Job.status == "completed").count()
        failed_jobs = db_session.query(Job).filter(Job.status == "failed").count()
        cancelled_jobs = db_session.query(Job).filter(Job.status == "cancelled").count()
        active_jobs = db_session.query(Job).filter(Job.status.in_(["scheduled", "running"])).count()

        # Printers currently reported as available
        online_printers = db_session.query(Printer).filter(Printer.status == "available").count()

        # Success rate over all finished jobs; guarded against division by zero
        finished_jobs = completed_jobs + failed_jobs + cancelled_jobs
        success_rate = (completed_jobs / finished_jobs * 100) if finished_jobs > 0 else 0

        # Per-user statistics (only for non-admins)
        user_stats = {}
        if not current_user.is_admin:
            user_jobs = db_session.query(Job).filter(Job.user_id == current_user.id).count()
            user_completed = db_session.query(Job).filter(
                Job.user_id == current_user.id,
                Job.status == "completed"
            ).count()
            user_stats = {
                "total_jobs": user_jobs,
                "completed_jobs": user_completed,
                "success_rate": (user_completed / user_jobs * 100) if user_jobs > 0 else 0
            }

        stats = {
            "total_users": total_users,
            "total_printers": total_printers,
            "online_printers": online_printers,
            "total_jobs": total_jobs,
            "completed_jobs": completed_jobs,
            "failed_jobs": failed_jobs,
            "cancelled_jobs": cancelled_jobs,
            "active_jobs": active_jobs,
            "success_rate": round(success_rate, 1),
            "user_stats": user_stats
        }

        return jsonify(stats)

    except Exception as e:
        app_logger.error(f"Fehler beim Abrufen der Statistiken: {str(e)}")
        return jsonify({"error": "Interner Serverfehler"}), 500

    finally:
        # Release the session on every path, including failed queries.
        if db_session is not None:
            db_session.close()
|
|
|
|
@app.route("/api/admin/users", methods=["GET"])
@login_required
def get_users():
    """Return all users as JSON (admins only).

    Returns:
        200 with ``{"users": [...]}``; 403 for non-admins; 500 on unexpected
        errors. Timestamps are ISO-8601 strings, or ``null`` when unset.
    """
    if not current_user.is_admin:
        return jsonify({"error": "Nur Administratoren können Benutzer anzeigen"}), 403

    db_session = None
    try:
        db_session = get_db_session()
        users = db_session.query(User).all()

        user_data = [
            {
                "id": user.id,
                "username": user.username,
                "email": user.email,
                "first_name": user.first_name,
                "last_name": user.last_name,
                "is_admin": user.is_admin,
                "created_at": user.created_at.isoformat() if user.created_at else None,
                # last_login may not exist on the model; treat that as "never"
                "last_login": user.last_login.isoformat() if hasattr(user, 'last_login') and user.last_login else None
            }
            for user in users
        ]

        return jsonify({"users": user_data})

    except Exception as e:
        app_logger.error(f"Fehler beim Abrufen der Benutzer: {str(e)}")
        return jsonify({"error": "Interner Serverfehler"}), 500

    finally:
        # Release the session on every path, including failed queries.
        if db_session is not None:
            db_session.close()
|
|
|
|
@app.route("/api/admin/users/<int:user_id>", methods=["PUT"])
@login_required
def update_user(user_id):
    """Update a user from a JSON request body (admins only).

    The keys ``username``, ``email``, ``first_name``, ``last_name`` and
    ``is_admin`` are applied when present. A non-empty ``password`` is set
    through ``user.set_password``.

    Returns:
        200 with ``{"user": {...}}`` on success; 400 for a missing or
        malformed body; 403 for non-admins; 404 if the user does not exist;
        500 on unexpected errors.
    """
    if not current_user.is_admin:
        return jsonify({"error": "Nur Administratoren können Benutzer bearbeiten"}), 403

    db_session = None
    try:
        # Reject non-object bodies up front instead of failing with a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Ungültige oder fehlende JSON-Daten"}), 400

        db_session = get_db_session()

        user = db_session.get(User, user_id)
        if not user:
            return jsonify({"error": "Benutzer nicht gefunden"}), 404

        # Whitelist of fields a client may change
        updatable_fields = ["username", "email", "first_name", "last_name", "is_admin"]
        for field in updatable_fields:
            if field in data:
                setattr(user, field, data[field])

        # The password is never assigned directly; it goes through set_password
        if "password" in data and data["password"]:
            user.set_password(data["password"])

        db_session.commit()

        user_data = {
            "id": user.id,
            "username": user.username,
            "email": user.email,
            "first_name": user.first_name,
            "last_name": user.last_name,
            "is_admin": user.is_admin,
            "created_at": user.created_at.isoformat() if user.created_at else None
        }

        user_logger.info(f"Benutzer {user_id} aktualisiert von Admin {current_user.id}")
        return jsonify({"user": user_data})

    except Exception as e:
        user_logger.error(f"Fehler beim Aktualisieren des Benutzers {user_id}: {str(e)}")
        return jsonify({"error": "Interner Serverfehler"}), 500

    finally:
        # Always release the session; closing also discards uncommitted state.
        if db_session is not None:
            db_session.close()
|
|
|
|
@app.route("/api/admin/users/<int:user_id>", methods=["DELETE"])
@login_required
def delete_user(user_id):
    """Delete a user (admins only).

    An admin cannot delete their own account, and a user with jobs in status
    ``scheduled`` or ``running`` cannot be deleted.

    Returns:
        200 with a confirmation message on success; 400 for self-deletion or
        when active jobs exist; 403 for non-admins; 404 if the user does not
        exist; 500 on unexpected errors.
    """
    if not current_user.is_admin:
        return jsonify({"error": "Nur Administratoren können Benutzer löschen"}), 403

    # Prevent the admin from deleting their own account
    if user_id == current_user.id:
        return jsonify({"error": "Sie können sich nicht selbst löschen"}), 400

    db_session = None
    try:
        db_session = get_db_session()

        user = db_session.get(User, user_id)
        if not user:
            return jsonify({"error": "Benutzer nicht gefunden"}), 404

        # Refuse to delete while the user still has pending or running jobs
        active_jobs = db_session.query(Job).filter(
            Job.user_id == user_id,
            Job.status.in_(["scheduled", "running"])
        ).count()

        if active_jobs > 0:
            return jsonify({"error": f"Benutzer kann nicht gelöscht werden: {active_jobs} aktive Jobs vorhanden"}), 400

        # Remember the name for the log line; the row is gone after commit.
        username = user.username
        db_session.delete(user)
        db_session.commit()

        user_logger.info(f"Benutzer '{username}' (ID: {user_id}) gelöscht von Admin {current_user.id}")
        return jsonify({"message": "Benutzer erfolgreich gelöscht"})

    except Exception as e:
        user_logger.error(f"Fehler beim Löschen des Benutzers {user_id}: {str(e)}")
        return jsonify({"error": "Interner Serverfehler"}), 500

    finally:
        # Always release the session; closing also discards uncommitted state.
        if db_session is not None:
            db_session.close()
|
|
|
|
# ===== FEHLERBEHANDLUNG =====
|
|
|
|
@app.errorhandler(404)
def not_found_error(error):
    """Render the custom error page for HTTP 404 responses."""
    page = render_template('errors/404.html')
    return page, 404
|
|
|
|
@app.errorhandler(500)
def internal_error(error):
    """Render the custom error page for HTTP 500 responses."""
    page = render_template('errors/500.html')
    return page, 500
|
|
|
|
@app.errorhandler(403)
def forbidden_error(error):
    """Render the custom error page for HTTP 403 responses."""
    page = render_template('errors/403.html')
    return page, 403
|
|
|
|
|
|
|
|
# ===== ADMIN - DATENBANK-VERWALTUNG =====
|
|
|
|
@app.route('/api/admin/database/stats', methods=['GET'])
@admin_required
def get_database_stats():
    """Return the statistics reported by ``database_monitor`` as JSON.

    Responds with ``{"success": true, "stats": ...}`` on success, or with
    status 500 and ``{"success": false, "error": ...}`` if collecting or
    serialising the statistics fails.
    """
    try:
        collected = database_monitor.get_database_stats()
        payload = {"success": True, "stats": collected}
        return jsonify(payload)
    except Exception as exc:
        app_logger.error(f"Fehler beim Abrufen der Datenbank-Statistiken: {str(exc)}")
        failure = {"success": False, "error": str(exc)}
        return jsonify(failure), 500
|
|
|
|
@app.route('/api/admin/database/health', methods=['GET'])
@admin_required
def check_database_health():
    """Run the ``database_monitor`` health check and return its result as JSON.

    Responds with ``{"success": true, "health": ...}`` on success, or with
    status 500 and ``{"success": false, "error": ...}`` on failure.
    """
    try:
        report = database_monitor.check_database_health()
        payload = {"success": True, "health": report}
        return jsonify(payload)
    except Exception as exc:
        app_logger.error(f"Fehler bei Datenbank-Gesundheitsprüfung: {str(exc)}")
        failure = {"success": False, "error": str(exc)}
        return jsonify(failure), 500
|
|
|
|
@app.route('/api/admin/database/optimize', methods=['POST'])
@admin_required
def optimize_database():
    """Trigger ``database_monitor.optimize_database`` and report the outcome.

    The response mirrors the ``"success"`` entry of the optimisation result
    and includes the full result under ``"result"``. Any exception yields a
    500 response with ``{"success": false, "error": ...}``.
    """
    try:
        outcome = database_monitor.optimize_database()
        payload = {"success": outcome["success"], "result": outcome}
        return jsonify(payload)
    except Exception as exc:
        app_logger.error(f"Fehler bei Datenbank-Optimierung: {str(exc)}")
        failure = {"success": False, "error": str(exc)}
        return jsonify(failure), 500
|
|
|
|
@app.route('/api/admin/database/backup', methods=['POST'])
@admin_required
def create_database_backup():
    """Create a manual database backup via ``backup_manager``.

    The optional JSON key ``compress`` (default ``True``) is forwarded to
    ``backup_manager.create_backup``. Responds with the backup path on
    success, or with status 500 and the error text on failure.
    """
    try:
        options = request.get_json() or {}
        use_compression = options.get('compress', True)

        created_path = backup_manager.create_backup(compress=use_compression)

        payload = {
            "success": True,
            "backup_path": created_path,
            "message": "Backup erfolgreich erstellt"
        }
        return jsonify(payload)
    except Exception as exc:
        app_logger.error(f"Fehler beim Erstellen des Backups: {str(exc)}")
        failure = {"success": False, "error": str(exc)}
        return jsonify(failure), 500
|
|
|
|
@app.route('/api/admin/database/backups', methods=['GET'])
@admin_required
def list_database_backups():
    """List the backups known to ``backup_manager`` as JSON.

    Each entry's ``created`` value is replaced in place by its ISO-8601
    string so the list can be serialised. Responds with status 500 and the
    error text if listing or conversion fails.
    """
    try:
        available = backup_manager.get_backup_list()

        # datetime objects are not JSON-serialisable; convert them in place
        for entry in available:
            created_at = entry['created']
            entry['created'] = created_at.isoformat()

        payload = {"success": True, "backups": available}
        return jsonify(payload)
    except Exception as exc:
        app_logger.error(f"Fehler beim Abrufen der Backup-Liste: {str(exc)}")
        failure = {"success": False, "error": str(exc)}
        return jsonify(failure), 500
|
|
|
|
@app.route('/api/admin/database/backup/restore', methods=['POST'])
@admin_required
def restore_database_backup():
    """Restore the database from a backup file inside the backup directory.

    The JSON body must contain ``backup_path``. The path is resolved and
    must lie strictly inside ``backup_manager.backup_dir``; anything else is
    rejected with 400 before ``backup_manager.restore_backup`` is called.

    Returns:
        200 on success; 400 for a missing, non-string or out-of-directory
        path; 500 if the restore reports failure or raises.
    """
    try:
        data = request.get_json()
        if not data or 'backup_path' not in data:
            return jsonify({
                "success": False,
                "error": "Backup-Pfad erforderlich"
            }), 400

        backup_path = data['backup_path']

        # Security check: only allow files inside the backup directory.
        # A plain startswith() test is not enough: "<dir>/../secret" and
        # "<dir>_other/file" both share the prefix. Compare resolved paths.
        path_is_valid = False
        if isinstance(backup_path, str) and backup_path:
            backup_root = os.path.realpath(backup_manager.backup_dir)
            resolved_path = os.path.realpath(backup_path)
            try:
                path_is_valid = (
                    resolved_path != backup_root
                    and os.path.commonpath([backup_root, resolved_path]) == backup_root
                )
            except ValueError:
                # commonpath raises e.g. for paths on different drives
                path_is_valid = False

        if not path_is_valid:
            return jsonify({
                "success": False,
                "error": "Ungültiger Backup-Pfad"
            }), 400

        success = backup_manager.restore_backup(backup_path)

        if success:
            return jsonify({
                "success": True,
                "message": "Backup erfolgreich wiederhergestellt"
            })

        return jsonify({
            "success": False,
            "error": "Fehler beim Wiederherstellen des Backups"
        }), 500

    except Exception as e:
        app_logger.error(f"Fehler beim Wiederherstellen des Backups: {str(e)}")
        return jsonify({
            "success": False,
            "error": str(e)
        }), 500
|
|
|
|
@app.route('/api/admin/database/backup/cleanup', methods=['POST'])
@admin_required
def cleanup_old_backups():
    """Delete ``.sql`` backup files older than 30 days.

    Scans the ``database/backups`` directory next to this module and removes
    every file ending in ``.sql`` whose modification time is more than 30
    days in the past. Responds with the number of deleted files, 404 if the
    directory is missing, or 500 on unexpected errors.
    """
    try:
        module_dir = os.path.dirname(__file__)
        backup_dir = os.path.join(module_dir, 'database', 'backups')
        if not os.path.exists(backup_dir):
            return jsonify({"error": "Backup-Verzeichnis nicht gefunden"}), 404

        # Everything modified before this instant counts as stale
        max_age = timedelta(days=30)
        cutoff_date = datetime.now() - max_age
        deleted_count = 0

        for entry_name in os.listdir(backup_dir):
            if not entry_name.endswith('.sql'):
                continue

            candidate = os.path.join(backup_dir, entry_name)
            modified_at = datetime.fromtimestamp(os.path.getmtime(candidate))
            if modified_at >= cutoff_date:
                continue

            os.remove(candidate)
            deleted_count += 1

        summary = {
            "message": f"{deleted_count} alte Backups gelöscht",
            "deleted_count": deleted_count
        }
        return jsonify(summary)

    except Exception as exc:
        app_logger.error(f"Fehler beim Löschen alter Backups: {str(exc)}")
        return jsonify({"error": "Interner Serverfehler"}), 500
|
|
|
|
@app.route('/api/admin/stats/live', methods=['GET'])
@admin_required
def get_admin_live_stats():
    """Return live statistics for the admin dashboard as JSON.

    The payload groups counts under ``users``, ``printers`` (total and by
    status ``available``/``offline``/``maintenance``) and ``jobs`` (total,
    active, created today, created today and completed), plus a
    ``timestamp`` of when the response was built.

    Returns:
        200 with the statistics object, or 500 on unexpected errors.
    """
    db_session = None
    try:
        db_session = get_db_session()

        # Overall totals
        total_users = db_session.query(User).count()
        total_printers = db_session.query(Printer).count()
        total_jobs = db_session.query(Job).count()
        active_jobs = db_session.query(Job).filter(Job.status.in_(["scheduled", "running"])).count()

        # Printers by status
        available_printers = db_session.query(Printer).filter(Printer.status == "available").count()
        offline_printers = db_session.query(Printer).filter(Printer.status == "offline").count()
        maintenance_printers = db_session.query(Printer).filter(Printer.status == "maintenance").count()

        # Jobs created today
        today = datetime.now().date()
        jobs_today = db_session.query(Job).filter(
            func.date(Job.created_at) == today
        ).count()

        # Jobs created today that are already completed
        completed_jobs_today = db_session.query(Job).filter(
            func.date(Job.created_at) == today,
            Job.status == "completed"
        ).count()

        stats = {
            "users": {
                "total": total_users
            },
            "printers": {
                "total": total_printers,
                "available": available_printers,
                "offline": offline_printers,
                "maintenance": maintenance_printers
            },
            "jobs": {
                "total": total_jobs,
                "active": active_jobs,
                "today": jobs_today,
                "completed_today": completed_jobs_today
            },
            "timestamp": datetime.now().isoformat()
        }

        return jsonify(stats)

    except Exception as e:
        app_logger.error(f"Fehler beim Abrufen der Live-Statistiken: {str(e)}")
        return jsonify({"error": "Interner Serverfehler"}), 500

    finally:
        # Release the session on every path, including failed queries.
        if db_session is not None:
            db_session.close()
|
|
|
|
@app.route('/api/admin/system/status', methods=['GET'])
@admin_required
def get_system_status():
    """Return host system metrics as JSON.

    Uses ``psutil`` for CPU load (sampled over one second), memory, usage of
    the ``/`` filesystem and network byte counters, and ``platform`` for
    host identification. Responds with 500 and a dedicated message when
    ``psutil`` is not installed, and with a generic 500 on other errors.
    """
    try:
        import psutil
        import platform

        # Sample the metrics first
        cpu_load = psutil.cpu_percent(interval=1)
        ram = psutil.virtual_memory()
        root_disk = psutil.disk_usage('/')
        net_counters = psutil.net_io_counters()

        # Assemble the report key by key
        system_info = {}
        system_info["platform"] = platform.system()
        system_info["platform_release"] = platform.release()
        system_info["platform_version"] = platform.version()
        system_info["machine"] = platform.machine()
        system_info["processor"] = platform.processor()
        system_info["cpu"] = {
            "percent": cpu_load,
            "count": psutil.cpu_count()
        }
        system_info["memory"] = {
            "total": ram.total,
            "available": ram.available,
            "percent": ram.percent,
            "used": ram.used
        }
        system_info["disk"] = {
            "total": root_disk.total,
            "used": root_disk.used,
            "free": root_disk.free,
            "percent": (root_disk.used / root_disk.total) * 100
        }
        system_info["network"] = {
            "bytes_sent": net_counters.bytes_sent,
            "bytes_recv": net_counters.bytes_recv
        }
        system_info["timestamp"] = datetime.now().isoformat()

        return jsonify(system_info)

    except ImportError:
        unavailable = {
            "error": "psutil nicht installiert",
            "message": "Systemstatus kann nicht abgerufen werden"
        }
        return jsonify(unavailable), 500
    except Exception as exc:
        app_logger.error(f"Fehler beim Abrufen des Systemstatus: {str(exc)}")
        return jsonify({"error": "Interner Serverfehler"}), 500
|
|
|
|
@app.route('/api/admin/database/status', methods=['GET'])
@admin_required
def get_database_status():
    """Return row counts, newest-record timestamps and file size of the database.

    For the user, printer and job tables the payload holds the row count and
    the ``created_at`` of the most recently created row (``null`` when the
    table is empty or that row has no timestamp). ``database_size`` is the
    size in bytes of ``database/app.db`` next to this module, or ``null`` if
    the file is absent or cannot be inspected.

    Returns:
        200 with the status object, or 500 with
        ``connection_status: "error"`` when the queries fail.
    """
    db_session = None
    try:
        db_session = get_db_session()

        # Collect count and newest timestamp per table
        table_stats = {}
        for table_key, model in (("users", User), ("printers", Printer), ("jobs", Job)):
            row_count = db_session.query(model).count()
            newest = db_session.query(model).order_by(model.created_at.desc()).first()
            # The newest row may still lack a timestamp; report null then
            newest_created = newest.created_at if newest else None
            table_stats[table_key] = {
                "count": row_count,
                "latest": newest_created.isoformat() if newest_created else None
            }

        # Database file size (only meaningful for SQLite); best effort
        db_file_size = None
        try:
            db_path = os.path.join(os.path.dirname(__file__), 'database', 'app.db')
            if os.path.exists(db_path):
                db_file_size = os.path.getsize(db_path)
        except OSError as size_error:
            # Only filesystem errors are expected here; keep the size unknown
            app_logger.warning(f"Datenbankgröße konnte nicht ermittelt werden: {str(size_error)}")

        status = {
            "tables": table_stats,
            "database_size": db_file_size,
            "timestamp": datetime.now().isoformat(),
            "connection_status": "connected"
        }

        return jsonify(status)

    except Exception as e:
        app_logger.error(f"Fehler beim Abrufen des Datenbankstatus: {str(e)}")
        return jsonify({
            "error": "Datenbankfehler",
            "connection_status": "error",
            "timestamp": datetime.now().isoformat()
        }), 500

    finally:
        # Release the session on every path, including failed queries.
        if db_session is not None:
            db_session.close()
|
|
|
|
# ===== WEITERE UI-ROUTEN =====
|
|
|
|
@app.route("/terms")
def terms():
    """Render the terms-of-use page."""
    page = render_template("terms.html")
    return page
|
|
|
|
@app.route("/privacy")
def privacy():
    """Render the privacy-policy page."""
    page = render_template("privacy.html")
    return page
|
|
|
|
@app.route("/admin/users/add")
@login_required
def admin_add_user_page():
    """Render the "add user" page; non-admins are redirected to the index."""
    if current_user.is_admin:
        return render_template("admin_add_user.html")

    flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
    return redirect(url_for("index"))
|
|
|
|
@app.route("/admin/printers/add")
@login_required
def admin_add_printer_page():
    """Render the "add printer" page; non-admins are redirected to the index."""
    if current_user.is_admin:
        return render_template("admin_add_printer.html")

    flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
    return redirect(url_for("index"))
|
|
|
|
@app.route("/admin/printers/<int:printer_id>/manage")
@login_required
def admin_manage_printer_page(printer_id):
    """Render the printer management page for one printer (admins only).

    Non-admins are redirected to the index. If the printer does not exist or
    loading fails, an error is flashed and the user is redirected to the
    admin page.
    """
    if not current_user.is_admin:
        flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
        return redirect(url_for("index"))

    db_session = get_db_session()
    try:
        printer = db_session.get(Printer, printer_id)
        if not printer:
            flash("Drucker nicht gefunden.", "error")
            return redirect(url_for("admin_page"))

        # Copy into a plain dict so the template does not need a live session
        printer_data = {
            "id": printer.id,
            "name": printer.name,
            "model": printer.model or 'Unbekanntes Modell',
            "location": printer.location or 'Unbekannter Standort',
            "mac_address": printer.mac_address,
            "plug_ip": printer.plug_ip,
            "status": printer.status or "offline",
            "active": printer.active if hasattr(printer, 'active') else True,
            "created_at": printer.created_at.isoformat() if printer.created_at else datetime.now().isoformat()
        }

        return render_template("admin_manage_printer.html", printer=printer_data)

    except Exception as e:
        app_logger.error(f"Fehler beim Laden der Drucker-Verwaltung: {str(e)}")
        flash("Fehler beim Laden der Drucker-Daten.", "error")
        return redirect(url_for("admin_page"))

    finally:
        # Close on every path, including the "not found" redirect above.
        db_session.close()
|
|
|
|
@app.route("/admin/printers/<int:printer_id>/settings")
@login_required
def admin_printer_settings_page(printer_id):
    """Render the printer settings page for one printer (admins only).

    Non-admins are redirected to the index. If the printer does not exist or
    loading fails, an error is flashed and the user is redirected to the
    admin page.
    """
    if not current_user.is_admin:
        flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
        return redirect(url_for("index"))

    db_session = get_db_session()
    try:
        printer = db_session.get(Printer, printer_id)
        if not printer:
            flash("Drucker nicht gefunden.", "error")
            return redirect(url_for("admin_page"))

        # Copy into a plain dict so the template does not need a live session
        printer_data = {
            "id": printer.id,
            "name": printer.name,
            "model": printer.model or 'Unbekanntes Modell',
            "location": printer.location or 'Unbekannter Standort',
            "mac_address": printer.mac_address,
            "plug_ip": printer.plug_ip,
            "status": printer.status or "offline",
            "active": printer.active if hasattr(printer, 'active') else True,
            "created_at": printer.created_at.isoformat() if printer.created_at else datetime.now().isoformat()
        }

        return render_template("admin_printer_settings.html", printer=printer_data)

    except Exception as e:
        app_logger.error(f"Fehler beim Laden der Drucker-Einstellungen: {str(e)}")
        flash("Fehler beim Laden der Drucker-Daten.", "error")
        return redirect(url_for("admin_page"))

    finally:
        # Close on every path, including the "not found" redirect above.
        db_session.close()
|
|
|
|
# ===== ADMIN API-ROUTEN FÜR BENUTZER UND DRUCKER =====
|
|
|
|
@app.route("/api/admin/users", methods=["POST"])
@login_required
def create_user_api():
    """Create a new user from a JSON request body (admins only).

    The body must be a JSON object with non-empty ``username``, ``email``
    and ``password``. ``first_name`` and ``last_name`` default to an empty
    string and ``is_admin`` to ``False``. The password is stored through
    ``User.set_password``.

    Returns:
        201 with ``{"user": {...}}`` on success; 400 for a missing or
        malformed body, a missing required field, or an already used
        username/e-mail; 403 for non-admins; 500 on unexpected errors.
    """
    if not current_user.is_admin:
        return jsonify({"error": "Nur Administratoren können Benutzer erstellen"}), 403

    db_session = None
    try:
        # Reject non-object bodies up front instead of failing with a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Ungültige oder fehlende JSON-Daten"}), 400

        # Required fields must be present and non-empty
        for field in ("username", "email", "password"):
            if field not in data or not data[field]:
                return jsonify({"error": f"Feld '{field}' ist erforderlich"}), 400

        db_session = get_db_session()

        # Username and e-mail must both be unique
        existing_user = db_session.query(User).filter(
            (User.username == data["username"]) | (User.email == data["email"])
        ).first()

        if existing_user:
            return jsonify({"error": "Ein Benutzer mit diesem Benutzernamen oder E-Mail existiert bereits"}), 400

        new_user = User(
            username=data["username"],
            email=data["email"],
            first_name=data.get("first_name", ""),
            last_name=data.get("last_name", ""),
            is_admin=data.get("is_admin", False),
            created_at=datetime.now()
        )

        # The password is never assigned directly; it goes through set_password
        new_user.set_password(data["password"])

        db_session.add(new_user)
        db_session.commit()

        # Build the response while the session is still open.
        user_data = {
            "id": new_user.id,
            "username": new_user.username,
            "email": new_user.email,
            "first_name": new_user.first_name,
            "last_name": new_user.last_name,
            "is_admin": new_user.is_admin,
            "created_at": new_user.created_at.isoformat()
        }

        user_logger.info(f"Neuer Benutzer '{user_data['username']}' erstellt von Admin {current_user.id}")
        return jsonify({"user": user_data}), 201

    except Exception as e:
        user_logger.error(f"Fehler beim Erstellen eines Benutzers: {str(e)}")
        return jsonify({"error": "Interner Serverfehler"}), 500

    finally:
        # Always release the session; closing also discards uncommitted state.
        if db_session is not None:
            db_session.close()
|
|
|
|
@app.route("/api/admin/printers/<int:printer_id>/toggle", methods=["POST"])
@login_required
def toggle_printer_power(printer_id):
    """Switch a printer's plug on or off (admins only).

    The JSON body may contain ``power_on`` (default ``True``). When
    ``toggle_plug`` reports success, the printer's ``status`` is set to
    ``available`` or ``offline`` and ``active`` to ``power_on``.

    Returns:
        200 with ``success``, ``message`` and the new ``status``; 400 for a
        missing or malformed body; 403 for non-admins; 404 if the printer
        does not exist; 500 if switching fails or an error occurs.
    """
    if not current_user.is_admin:
        return jsonify({"error": "Nur Administratoren können Drucker steuern"}), 403

    db_session = None
    try:
        # Reject non-object bodies up front instead of failing with a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Ungültige oder fehlende JSON-Daten"}), 400

        power_on = data.get("power_on", True)

        db_session = get_db_session()
        printer = db_session.query(Printer).get(printer_id)

        if not printer:
            return jsonify({"error": "Drucker nicht gefunden"}), 404

        # Switch the plug; imported here as in the rest of this module
        from utils.job_scheduler import toggle_plug
        if not toggle_plug(printer_id, power_on):
            return jsonify({"error": "Fehler beim Schalten der Steckdose"}), 500

        # Persist the new state; keep the value in a local for the response
        new_status = "available" if power_on else "offline"
        printer.status = new_status
        printer.active = power_on
        db_session.commit()

        action = "eingeschaltet" if power_on else "ausgeschaltet"
        printers_logger.info(f"Drucker {printer.name} {action} von Admin {current_user.id}")

        return jsonify({
            "success": True,
            "message": f"Drucker erfolgreich {action}",
            "status": new_status
        })

    except Exception as e:
        printers_logger.error(f"Fehler beim Schalten des Druckers {printer_id}: {str(e)}")
        return jsonify({"error": "Interner Serverfehler"}), 500

    finally:
        # Always release the session; closing also discards uncommitted state.
        if db_session is not None:
            db_session.close()
|
|
|
|
# ===== ADMIN FORM ENDPOINTS =====
|
|
|
|
@app.route("/admin/users/create", methods=["POST"])
@login_required
def admin_create_user_form():
    """Create a new user from the admin HTML form (admins only).

    Reads ``email``, ``name``, ``password`` and ``role`` from the submitted
    form, validates the required fields and the e-mail format, derives a
    username from the local part of the e-mail address (appending a counter
    until it is unused) and stores the new user.

    Returns:
        A redirect: to ``index`` for non-admins, back to the add-user page
        on validation or storage errors, and to the ``users`` tab of the
        admin page on success. The outcome is reported via ``flash``.
    """
    if not current_user.is_admin:
        flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
        return redirect(url_for("index"))

    db_session = None
    try:
        # Read form data
        email = request.form.get("email", "").strip()
        name = request.form.get("name", "").strip()
        password = request.form.get("password", "").strip()
        role = request.form.get("role", "user").strip()

        # Check required fields
        if not email or not password:
            flash("E-Mail und Passwort sind erforderlich.", "error")
            return redirect(url_for("admin_add_user_page"))

        # Validate e-mail format
        import re
        email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        if not re.match(email_pattern, email):
            flash("Ungültige E-Mail-Adresse.", "error")
            return redirect(url_for("admin_add_user_page"))

        db_session = get_db_session()

        # Reject the request if the e-mail address is already in use
        existing_user = db_session.query(User).filter(User.email == email).first()
        if existing_user:
            flash("Ein Benutzer mit dieser E-Mail existiert bereits.", "error")
            return redirect(url_for("admin_add_user_page"))

        # Use the local part of the e-mail as username; append 1, 2, ...
        # until the name is not taken yet.
        original_username = email.split('@')[0]
        username = original_username
        counter = 1
        while db_session.query(User).filter(User.username == username).first():
            username = f"{original_username}{counter}"
            counter += 1

        # Everything before the first space is the first name, the rest is
        # the last name (both empty when no name was submitted).
        first_name, _, last_name = name.partition(' ')

        new_user = User(
            username=username,
            email=email,
            first_name=first_name,
            last_name=last_name,
            is_admin=(role == "admin"),
            created_at=datetime.now()
        )
        new_user.set_password(password)

        db_session.add(new_user)
        db_session.commit()

        # Report with the local values instead of new_user.<attr>: if the
        # session expires objects on commit (SQLAlchemy's default), reading
        # attributes of the detached instance after close() would raise and
        # a successful creation would be reported as a failure.
        user_logger.info(f"Neuer Benutzer '{username}' erstellt von Admin {current_user.id}")
        flash(f"Benutzer '{email}' erfolgreich erstellt.", "success")
        return redirect(url_for("admin_page", tab="users"))

    except Exception as e:
        user_logger.error(f"Fehler beim Erstellen eines Benutzers über Form: {str(e)}")
        flash("Fehler beim Erstellen des Benutzers.", "error")
        return redirect(url_for("admin_add_user_page"))

    finally:
        # Release the session on every path, including unexpected errors.
        if db_session is not None:
            db_session.close()
|
|
|
|
@app.route("/admin/printers/create", methods=["POST"])
@login_required
def admin_create_printer_form():
    """Create a new printer from the admin HTML form (admins only).

    Reads ``name``, ``ip_address``, ``model``, ``location``, ``description``
    and ``status`` from the submitted form, validates the required fields
    and the IPv4 format, and stores the printer with the submitted address
    as its ``plug_ip``.

    Returns:
        A redirect: to ``index`` for non-admins, back to the add-printer
        page on validation or storage errors, and to the ``printers`` tab of
        the admin page on success. The outcome is reported via ``flash``.
    """
    if not current_user.is_admin:
        flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
        return redirect(url_for("index"))

    db_session = None
    try:
        # Read form data
        name = request.form.get("name", "").strip()
        ip_address = request.form.get("ip_address", "").strip()
        model = request.form.get("model", "").strip()
        location = request.form.get("location", "").strip()
        description = request.form.get("description", "").strip()
        status = request.form.get("status", "available").strip()

        # Check required fields
        if not name or not ip_address:
            flash("Name und IP-Adresse sind erforderlich.", "error")
            return redirect(url_for("admin_add_printer_page"))

        # Validate the IP address (dotted IPv4, each octet 0-255)
        import re
        ip_pattern = r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
        if not re.match(ip_pattern, ip_address):
            flash("Ungültige IP-Adresse.", "error")
            return redirect(url_for("admin_add_printer_page"))

        db_session = get_db_session()

        # Reject the request if a printer with this name already exists
        existing_printer = db_session.query(Printer).filter(Printer.name == name).first()
        if existing_printer:
            flash("Ein Drucker mit diesem Namen existiert bereits.", "error")
            return redirect(url_for("admin_add_printer_page"))

        new_printer = Printer(
            name=name,
            model=model,
            location=location,
            description=description,
            mac_address="",  # filled in later
            plug_ip=ip_address,
            status=status,
            created_at=datetime.now()
        )

        db_session.add(new_printer)
        db_session.commit()

        # Report with the local ``name`` instead of new_printer.name: if the
        # session expires objects on commit (SQLAlchemy's default), reading
        # attributes of the detached instance after close() would raise and
        # a successful creation would be reported as a failure.
        printers_logger.info(f"Neuer Drucker '{name}' erstellt von Admin {current_user.id}")
        flash(f"Drucker '{name}' erfolgreich erstellt.", "success")
        return redirect(url_for("admin_page", tab="printers"))

    except Exception as e:
        printers_logger.error(f"Fehler beim Erstellen eines Druckers über Form: {str(e)}")
        flash("Fehler beim Erstellen des Druckers.", "error")
        return redirect(url_for("admin_add_printer_page"))

    finally:
        # Release the session on every path, including unexpected errors.
        if db_session is not None:
            db_session.close()
|
|
|
|
@app.route("/admin/users/<int:user_id>/edit", methods=["GET"])
@login_required
def admin_edit_user_page(user_id):
    """Render the edit page for one user (admins only).

    Loads the user and passes a plain dict with its ``id``, ``username``,
    ``email``, ``name``, ``is_admin``, ``active`` and ``created_at`` (ISO
    format; the current time is used when no creation date is stored) to
    the ``admin_edit_user.html`` template.

    Args:
        user_id: ID of the user to edit, taken from the URL.

    Returns:
        The rendered template, or a redirect: to ``index`` for non-admins,
        and to the ``users`` tab of the admin page when the user does not
        exist or loading fails.
    """
    if not current_user.is_admin:
        flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
        return redirect(url_for("index"))

    db_session = get_db_session()
    try:
        user = db_session.get(User, user_id)
        if not user:
            flash("Benutzer nicht gefunden.", "error")
            return redirect(url_for("admin_page", tab="users"))

        # Copy the values into a plain dict so the template does not depend
        # on the ORM object or the session.
        user_data = {
            "id": user.id,
            "username": user.username,
            "email": user.email,
            "name": user.name or "",
            "is_admin": user.is_admin,
            "active": user.active,
            "created_at": user.created_at.isoformat() if user.created_at else datetime.now().isoformat()
        }

        return render_template("admin_edit_user.html", user=user_data)

    except Exception as e:
        app_logger.error(f"Fehler beim Laden der Benutzer-Daten: {str(e)}")
        flash("Fehler beim Laden der Benutzer-Daten.", "error")
        return redirect(url_for("admin_page", tab="users"))

    finally:
        # Close on every path; previously the "user not found" return left
        # the session open.
        db_session.close()
|
|
|
|
@app.route("/admin/users/<int:user_id>/update", methods=["POST"])
@login_required
def admin_update_user_form(user_id):
    """Update a user from the admin HTML form (admins only).

    Reads ``email``, ``name``, ``password``, ``role`` and ``is_active`` from
    the submitted form. The e-mail address is required and must not belong
    to another user; name and password are only changed when a non-empty
    value was submitted.

    Args:
        user_id: ID of the user to update, taken from the URL.

    Returns:
        A redirect: to ``index`` for non-admins, back to the edit page on
        validation or storage errors, and to the ``users`` tab of the admin
        page on success or when the user does not exist. The outcome is
        reported via ``flash``.
    """
    if not current_user.is_admin:
        flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
        return redirect(url_for("index"))

    db_session = None
    try:
        # Read form data
        email = request.form.get("email", "").strip()
        name = request.form.get("name", "").strip()
        password = request.form.get("password", "").strip()
        role = request.form.get("role", "user").strip()
        # Only the literal string "true" counts as active; a missing field
        # defaults to active.
        is_active = request.form.get("is_active", "true").strip() == "true"

        # Check required fields
        if not email:
            flash("E-Mail-Adresse ist erforderlich.", "error")
            return redirect(url_for("admin_edit_user_page", user_id=user_id))

        # Validate e-mail format
        import re
        email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        if not re.match(email_pattern, email):
            flash("Ungültige E-Mail-Adresse.", "error")
            return redirect(url_for("admin_edit_user_page", user_id=user_id))

        db_session = get_db_session()

        user = db_session.get(User, user_id)
        if not user:
            flash("Benutzer nicht gefunden.", "error")
            return redirect(url_for("admin_page", tab="users"))

        # Reject the request if another user already has this e-mail address
        existing_user = db_session.query(User).filter(
            User.email == email,
            User.id != user_id
        ).first()
        if existing_user:
            flash("Ein Benutzer mit dieser E-Mail-Adresse existiert bereits.", "error")
            return redirect(url_for("admin_edit_user_page", user_id=user_id))

        # Apply the changes
        # NOTE(review): user creation in this file sets first_name/last_name
        # and is_admin, while this update writes ``name`` and ``role`` -
        # confirm the User model exposes both sets of attributes.
        user.email = email
        if name:
            user.name = name

        # Only change the password when one was submitted. Uses the model's
        # own set_password(), the same way the create form stores passwords,
        # so both paths hash identically.
        if password:
            user.set_password(password)

        user.role = "admin" if role == "admin" else "user"
        user.active = is_active

        db_session.commit()

        # Report with the local ``email`` instead of user.email: if the
        # session expires objects on commit (SQLAlchemy's default), reading
        # attributes of the detached instance after close() would raise and
        # a successful update would be reported as a failure.
        auth_logger.info(f"Benutzer '{email}' (ID: {user_id}) aktualisiert von Admin {current_user.id}")
        flash(f"Benutzer '{email}' erfolgreich aktualisiert.", "success")
        return redirect(url_for("admin_page", tab="users"))

    except Exception as e:
        auth_logger.error(f"Fehler beim Aktualisieren eines Benutzers über Form: {str(e)}")
        flash("Fehler beim Aktualisieren des Benutzers.", "error")
        return redirect(url_for("admin_edit_user_page", user_id=user_id))

    finally:
        # Release the session on every path, including unexpected errors.
        if db_session is not None:
            db_session.close()
|
|
|
|
@app.route("/admin/printers/<int:printer_id>/update", methods=["POST"])
@login_required
def admin_update_printer_form(printer_id):
    """Update a printer from the admin HTML form (admins only).

    Reads ``name``, ``ip_address``, ``model``, ``location``, ``description``
    and ``status`` from the submitted form, validates the required fields
    and the IPv4 format, makes sure no other printer uses the same name and
    writes the values to the printer record (the address is stored as
    ``plug_ip``).

    Args:
        printer_id: ID of the printer to update, taken from the URL.

    Returns:
        A redirect: to ``index`` for non-admins, back to the printer
        settings page on validation or storage errors, to the ``printers``
        tab of the admin page when the printer does not exist, and to the
        printer management page on success. The outcome is reported via
        ``flash``.
    """
    if not current_user.is_admin:
        flash("Sie haben keine Berechtigung für den Admin-Bereich.", "error")
        return redirect(url_for("index"))

    db_session = None
    try:
        # Read form data
        name = request.form.get("name", "").strip()
        ip_address = request.form.get("ip_address", "").strip()
        model = request.form.get("model", "").strip()
        location = request.form.get("location", "").strip()
        description = request.form.get("description", "").strip()
        status = request.form.get("status", "available").strip()

        # Check required fields
        if not name or not ip_address:
            flash("Name und IP-Adresse sind erforderlich.", "error")
            return redirect(url_for("admin_printer_settings_page", printer_id=printer_id))

        # Validate the IP address (dotted IPv4, each octet 0-255)
        import re
        ip_pattern = r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
        if not re.match(ip_pattern, ip_address):
            flash("Ungültige IP-Adresse.", "error")
            return redirect(url_for("admin_printer_settings_page", printer_id=printer_id))

        db_session = get_db_session()

        printer = db_session.get(Printer, printer_id)
        if not printer:
            flash("Drucker nicht gefunden.", "error")
            return redirect(url_for("admin_page", tab="printers"))

        # Reject the request if another printer already uses this name
        existing_printer = db_session.query(Printer).filter(
            Printer.name == name,
            Printer.id != printer_id
        ).first()
        if existing_printer:
            flash("Ein Drucker mit diesem Namen existiert bereits.", "error")
            return redirect(url_for("admin_printer_settings_page", printer_id=printer_id))

        # Apply the changes
        printer.name = name
        printer.model = model
        printer.location = location
        printer.description = description
        printer.plug_ip = ip_address
        printer.status = status

        db_session.commit()

        # Report with the local ``name`` instead of printer.name: if the
        # session expires objects on commit (SQLAlchemy's default), reading
        # attributes of the detached instance after close() would raise and
        # a successful update would be reported as a failure.
        printers_logger.info(f"Drucker '{name}' (ID: {printer_id}) aktualisiert von Admin {current_user.id}")
        flash(f"Drucker '{name}' erfolgreich aktualisiert.", "success")
        return redirect(url_for("admin_manage_printer_page", printer_id=printer_id))

    except Exception as e:
        printers_logger.error(f"Fehler beim Aktualisieren eines Druckers über Form: {str(e)}")
        flash("Fehler beim Aktualisieren des Druckers.", "error")
        return redirect(url_for("admin_printer_settings_page", printer_id=printer_id))

    finally:
        # Release the session on every path, including unexpected errors.
        if db_session is not None:
            db_session.close()
|
|
|
|
# ===== STARTUP UND MAIN =====
|
|
if __name__ == "__main__":
    import sys

    # Debug mode is requested by passing "--debug" as the first argument.
    debug_mode = sys.argv[1:2] == ["--debug"]

    try:
        # Prepare the database and make sure an initial admin exists.
        init_database()
        create_initial_admin()

        # Register the template helper functions on the app.
        register_template_helpers(app)

        # Start the job scheduler if enabled; a failure here is only logged
        # and does not stop the web server from starting.
        if SCHEDULER_ENABLED:
            try:
                scheduler.start()
                app_logger.info("Job-Scheduler gestartet")
            except Exception as scheduler_error:
                app_logger.error(f"Fehler beim Starten des Schedulers: {str(scheduler_error)}")

        # Options shared by every server mode; the mode-specific ones are
        # added below before the single app.run() call.
        run_options = {"host": "0.0.0.0", "threaded": True}

        if debug_mode:
            # Debug mode: plain HTTP on port 5000.
            app_logger.info("Starte Debug-Server auf 0.0.0.0:5000 (HTTP)")
            run_options.update(port=5000, debug=True)
        else:
            # Production mode: HTTPS on port 443 when an SSL context is
            # available, otherwise plain HTTP on port 80.
            ssl_context = get_ssl_context()
            if ssl_context:
                app_logger.info("Starte HTTPS-Server auf 0.0.0.0:443")
                run_options.update(port=443, debug=False, ssl_context=ssl_context)
            else:
                app_logger.info("Starte HTTP-Server auf 0.0.0.0:80")
                run_options.update(port=80, debug=False)

        app.run(**run_options)

    except Exception as startup_error:
        app_logger.error(f"Fehler beim Starten der Anwendung: {str(startup_error)}")
        sys.exit(1)