Es scheint, dass es sich um eine Versionskontrolle handelt, möglicherweise mit einem Git-Repository. Hier sind die wichtigsten Dateien und Veränderungen, die in diesem Commit enthalten sein könnten:

This commit is contained in:
2025-06-12 08:34:12 +02:00
parent f2be2e65a8
commit 79f4682b20
418 changed files with 1312 additions and 875 deletions

View File

@@ -946,880 +946,98 @@ def api_get_printer_status():
"""API-Endpunkt für Drucker-Status"""
try:
from models import get_db_session, Printer
from utils.hardware_integration import tapo_controller
db_session = get_db_session()
# Alle Drucker für Status-Abfragen anzeigen (unabhängig von active-Status)
printers = db_session.query(Printer).all()
status_list = []
# Tapo-Controller nur importieren, wenn benötigt
tapo_controller = None
has_tapo_printers = any(printer.plug_ip for printer in printers)
if has_tapo_printers:
try:
from utils.hardware_integration import tapo_controller
app_logger.info(f"✅ Tapo-Controller erfolgreich importiert: {type(tapo_controller)}")
except Exception as import_error:
app_logger.warning(f"⚠️ Tapo-Controller konnte nicht importiert werden: {str(import_error)}")
tapo_controller = None
for printer in printers:
# Tapo-Steckdosen-Status prüfen
# Basis-Status-Informationen
status_dict = {
"id": printer.id,
"name": printer.name,
"status": printer.status or "offline",
"location": printer.location,
"model": printer.model,
"ip_address": printer.ip_address,
"active": getattr(printer, 'active', True)
}
# Tapo-Steckdosen-Status prüfen, wenn verfügbar
if printer.plug_ip:
try:
reachable, plug_status = tapo_controller.check_outlet_status(
printer.plug_ip,
printer_id=printer.id
)
status_dict = {
"id": printer.id,
"name": printer.name,
"status": printer.status,
"plug_status": plug_status,
"plug_reachable": reachable,
"plug_ip": printer.plug_ip,
"location": printer.location
}
except Exception as e:
app_logger.warning(f"⚠️ Fehler bei Steckdosen-Status für {printer.name}: {str(e)}")
status_dict = {
"id": printer.id,
"name": printer.name,
"status": "error",
"plug_status": "unknown",
if tapo_controller:
try:
reachable, plug_status = tapo_controller.check_outlet_status(
printer.plug_ip,
printer_id=printer.id
)
status_dict.update({
"plug_status": plug_status,
"plug_reachable": reachable,
"plug_ip": printer.plug_ip,
"has_plug": True
})
except Exception as e:
app_logger.warning(f"⚠️ Fehler bei Steckdosen-Status für {printer.name}: {str(e)}")
status_dict.update({
"plug_status": "error",
"plug_reachable": False,
"plug_ip": printer.plug_ip,
"has_plug": True,
"plug_error": str(e)
})
else:
# Tapo-Controller nicht verfügbar
status_dict.update({
"plug_status": "unavailable",
"plug_reachable": False,
"plug_ip": printer.plug_ip,
"location": printer.location,
"error": str(e)
}
"has_plug": True,
"plug_error": "Tapo-Controller nicht verfügbar"
})
else:
status_dict = {
"id": printer.id,
"name": printer.name,
"status": printer.status,
# Kein Smart-Plug konfiguriert
status_dict.update({
"plug_status": "no_plug",
"plug_reachable": False,
"plug_ip": None,
"location": printer.location
}
"has_plug": False
})
status_list.append(status_dict)
db_session.close()
app_logger.info(f"✅ API: Status für {len(status_list)} Drucker abgerufen")
return jsonify({"printers": status_list})
except Exception as e:
app_logger.error(f"❌ API-Fehler beim Abrufen des Drucker-Status: {str(e)}")
return jsonify({"error": "Fehler beim Laden des Drucker-Status", "details": str(e)}), 500
# ===== SESSION-API-ENDPUNKTE =====
@app.route("/api/session/status", methods=["GET"])
@login_required
def api_session_status():
"""API-Endpunkt für Session-Status"""
try:
from utils.utilities_collection import SESSION_LIFETIME
last_activity = session.get('last_activity')
if last_activity:
last_activity_time = datetime.fromisoformat(last_activity)
time_since_activity = (datetime.now() - last_activity_time).total_seconds()
time_left_seconds = max(0, SESSION_LIFETIME.total_seconds() - time_since_activity)
else:
time_left_seconds = SESSION_LIFETIME.total_seconds()
# Erfolgreiche Response mit konsistenter Struktur
return jsonify({
"success": True,
"user": {
"id": current_user.id,
"email": current_user.email,
"name": current_user.name,
"is_admin": current_user.is_admin
},
"session": {
"time_left_seconds": int(time_left_seconds),
"max_inactive_minutes": int(SESSION_LIFETIME.total_seconds() / 60),
"last_activity": last_activity or datetime.now().isoformat()
}
})
except Exception as e:
app_logger.error(f"❌ Session-Status-Fehler: {str(e)}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route("/api/session/heartbeat", methods=["POST"])
@login_required
def api_session_heartbeat():
"""API-Endpunkt für Session-Heartbeat"""
try:
from utils.utilities_collection import SESSION_LIFETIME
# Session-Aktivität NICHT in Cookie speichern - Cookie-Reduktion
# session['last_activity'] = datetime.now().isoformat() # ENTFERNT
session.permanent = True
# Verbleibende Zeit berechnen
time_left_seconds = SESSION_LIFETIME.total_seconds()
return jsonify({
"success": True,
"time_left_seconds": int(time_left_seconds),
"printers": status_list,
"count": len(status_list),
"timestamp": datetime.now().isoformat()
})
except Exception as e:
app_logger.error(f"❌ Session-Heartbeat-Fehler: {str(e)}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route("/api/session/extend", methods=["POST"])
@login_required
def api_session_extend():
"""API-Endpunkt für Session-Verlängerung"""
try:
from utils.utilities_collection import SESSION_LIFETIME
data = request.get_json() or {}
extend_minutes = data.get('extend_minutes', 30)
# Session verlängern - NICHT in Cookie speichern
# session['last_activity'] = datetime.now().isoformat() # ENTFERNT
session.permanent = True
return jsonify({
"success": True,
"extended_minutes": extend_minutes,
"new_expiry": (datetime.now() + SESSION_LIFETIME).isoformat()
})
except Exception as e:
app_logger.error(f"❌ Session-Extend-Fehler: {str(e)}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route("/api/jobs/recent", methods=["GET"])
@login_required
def api_get_recent_jobs():
"""API-Endpunkt für kürzlich erstellte Jobs"""
try:
from models import get_db_session, Job
db_session = get_db_session()
# Letzte 10 Jobs des Benutzers (oder alle für Admin)
query = db_session.query(Job).order_by(Job.created_at.desc())
if not current_user.is_admin:
query = query.filter(Job.user_id == current_user.id)
recent_jobs = query.limit(10).all()
job_list = []
for job in recent_jobs:
job_dict = {
"id": job.id,
"name": job.name,
"status": job.status,
"created_at": job.created_at.isoformat() if job.created_at else None,
"start_at": job.start_at.isoformat() if job.start_at else None,
"duration_minutes": job.duration_minutes,
"printer_name": job.printer.name if job.printer else "Unbekannt"
}
job_list.append(job_dict)
db_session.close()
app_logger.info(f"✅ API: {len(job_list)} kürzliche Jobs abgerufen")
return jsonify({"jobs": job_list})
except Exception as e:
app_logger.error(f"❌ API-Fehler beim Abrufen kürzlicher Jobs: {str(e)}")
return jsonify({"error": "Fehler beim Laden kürzlicher Jobs", "details": str(e)}), 500
@app.route("/api/stats", methods=["GET"])
@login_required
def api_get_stats():
"""API-Endpunkt für System-Statistiken"""
try:
from models import get_db_session, Job, Printer
db_session = get_db_session()
# Grundlegende Statistiken
total_jobs = db_session.query(Job).count()
active_jobs = db_session.query(Job).filter(Job.status.in_(["scheduled", "running"])).count()
completed_jobs = db_session.query(Job).filter(Job.status == "finished").count()
total_printers = db_session.query(Printer).filter(Printer.active == True).count()
# Benutzer-spezifische Statistiken
if not current_user.is_admin:
user_jobs = db_session.query(Job).filter(Job.user_id == current_user.id).count()
user_active_jobs = db_session.query(Job).filter(
Job.user_id == current_user.id,
Job.status.in_(["scheduled", "running"])
).count()
else:
user_jobs = total_jobs
user_active_jobs = active_jobs
db_session.close()
stats = {
"total_jobs": total_jobs,
"active_jobs": active_jobs,
"completed_jobs": completed_jobs,
"total_printers": total_printers,
"user_jobs": user_jobs,
"user_active_jobs": user_active_jobs,
"timestamp": datetime.now().isoformat()
}
app_logger.info(f"✅ API: Statistiken abgerufen")
return jsonify(stats)
except Exception as e:
app_logger.error(f"❌ API-Fehler beim Abrufen der Statistiken: {str(e)}")
return jsonify({"error": "Fehler beim Laden der Statistiken", "details": str(e)}), 500
# ===== ADMIN-API-ENDPUNKTE =====
def admin_required(f):
"""Decorator für Admin-only Funktionen"""
from functools import wraps
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
return jsonify({"error": "Anmeldung erforderlich"}), 401
if not current_user.is_admin:
return jsonify({"error": "Admin-Berechtigung erforderlich"}), 403
return f(*args, **kwargs)
return decorated_function
@app.route("/api/admin/users", methods=["GET"])
@login_required
@admin_required
def api_admin_get_users():
"""API-Endpunkt für alle Benutzer (Admin only)"""
try:
from models import get_db_session, User
db_session = get_db_session()
users = db_session.query(User).all()
user_list = []
for user in users:
user_dict = {
"id": user.id,
"username": user.username,
"email": user.email,
"name": user.name,
"role": user.role,
"active": user.active,
"is_admin": user.is_admin,
"created_at": user.created_at.isoformat() if user.created_at else None,
"last_login": user.last_login.isoformat() if user.last_login else None,
"department": getattr(user, 'department', None),
"position": getattr(user, 'position', None)
}
user_list.append(user_dict)
db_session.close()
app_logger.info(f"✅ Admin API: {len(user_list)} Benutzer abgerufen")
return jsonify({"users": user_list})
except Exception as e:
app_logger.error(f"❌ Admin API-Fehler beim Abrufen der Benutzer: {str(e)}")
return jsonify({"error": "Fehler beim Laden der Benutzer", "details": str(e)}), 500
@app.route("/api/admin/users", methods=["POST"])
@login_required
@admin_required
def api_admin_create_user():
"""API-Endpunkt für Benutzer-Erstellung (Admin only)"""
try:
data = request.get_json()
if not data:
return jsonify({"error": "Keine JSON-Daten empfangen"}), 400
# Pflichtfelder prüfen
required_fields = ["username", "email", "password"]
for field in required_fields:
if field not in data:
return jsonify({"error": f"Feld '{field}' fehlt"}), 400
from models import get_db_session, User
from werkzeug.security import generate_password_hash
db_session = get_db_session()
# Prüfen ob Email/Username bereits existiert
existing_user = db_session.query(User).filter(
(User.email == data["email"]) | (User.username == data["username"])
).first()
if existing_user:
db_session.close()
return jsonify({"error": "Benutzer mit dieser E-Mail oder diesem Benutzernamen existiert bereits"}), 409
# Neuen Benutzer erstellen
new_user = User(
username=data["username"],
email=data["email"],
password_hash=generate_password_hash(data["password"]),
name=data.get("name", ""),
role=data.get("role", "user"),
active=data.get("active", True),
department=data.get("department", ""),
position=data.get("position", "")
)
db_session.add(new_user)
db_session.commit()
user_dict = {
"id": new_user.id,
"username": new_user.username,
"email": new_user.email,
"name": new_user.name,
"role": new_user.role,
"active": new_user.active
}
db_session.close()
app_logger.info(f"✅ Admin API: Neuer Benutzer '{new_user.username}' erstellt")
return jsonify({"user": user_dict}), 201
except Exception as e:
app_logger.error(f"❌ Admin API-Fehler beim Erstellen des Benutzers: {str(e)}")
return jsonify({"error": "Fehler beim Erstellen des Benutzers", "details": str(e)}), 500
@app.route("/api/admin/users/<int:user_id>", methods=["GET"])
@login_required
@admin_required
def api_admin_get_user(user_id):
"""API-Endpunkt für einzelnen Benutzer (Admin only)"""
try:
from models import get_db_session, User
db_session = get_db_session()
user = db_session.query(User).filter(User.id == user_id).first()
if not user:
db_session.close()
return jsonify({"error": "Benutzer nicht gefunden"}), 404
user_dict = {
"id": user.id,
"username": user.username,
"email": user.email,
"name": user.name,
"role": user.role,
"active": user.active,
"is_admin": user.is_admin,
"created_at": user.created_at.isoformat() if user.created_at else None,
"last_login": user.last_login.isoformat() if user.last_login else None,
"department": getattr(user, 'department', None),
"position": getattr(user, 'position', None),
"phone": getattr(user, 'phone', None),
"bio": getattr(user, 'bio', None)
}
db_session.close()
app_logger.info(f"✅ Admin API: Benutzer {user_id} abgerufen")
return jsonify({"user": user_dict})
except Exception as e:
app_logger.error(f"❌ Admin API-Fehler beim Abrufen des Benutzers {user_id}: {str(e)}")
return jsonify({"error": "Fehler beim Laden des Benutzers", "details": str(e)}), 500
@app.route("/api/admin/users/<int:user_id>", methods=["PUT"])
@login_required
@admin_required
def api_admin_update_user(user_id):
"""API-Endpunkt für Benutzer-Update (Admin only)"""
try:
data = request.get_json()
if not data:
return jsonify({"error": "Keine JSON-Daten empfangen"}), 400
from models import get_db_session, User
from werkzeug.security import generate_password_hash
db_session = get_db_session()
user = db_session.query(User).filter(User.id == user_id).first()
if not user:
db_session.close()
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Felder aktualisieren
if "username" in data:
user.username = data["username"]
if "email" in data:
user.email = data["email"]
if "name" in data:
user.name = data["name"]
if "role" in data:
user.role = data["role"]
if "active" in data:
user.active = data["active"]
if "department" in data:
user.department = data["department"]
if "position" in data:
user.position = data["position"]
if "password" in data and data["password"]:
user.password_hash = generate_password_hash(data["password"])
user.updated_at = datetime.now()
db_session.commit()
user_dict = {
"id": user.id,
"username": user.username,
"email": user.email,
"name": user.name,
"role": user.role,
"active": user.active
}
db_session.close()
app_logger.info(f"✅ Admin API: Benutzer {user_id} aktualisiert")
return jsonify({"user": user_dict})
except Exception as e:
app_logger.error(f"❌ Admin API-Fehler beim Aktualisieren des Benutzers {user_id}: {str(e)}")
return jsonify({"error": "Fehler beim Aktualisieren des Benutzers", "details": str(e)}), 500
@app.route("/api/admin/users/<int:user_id>", methods=["DELETE"])
@login_required
@admin_required
def api_admin_delete_user(user_id):
"""API-Endpunkt für Benutzer-Löschung (Admin only)"""
try:
from models import get_db_session, User
db_session = get_db_session()
user = db_session.query(User).filter(User.id == user_id).first()
if not user:
db_session.close()
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Sich selbst nicht löschen
if user.id == current_user.id:
db_session.close()
return jsonify({"error": "Sie können sich nicht selbst löschen"}), 400
username = user.username
db_session.delete(user)
db_session.commit()
db_session.close()
app_logger.info(f"✅ Admin API: Benutzer '{username}' (ID: {user_id}) gelöscht")
return jsonify({"success": True, "message": "Benutzer erfolgreich gelöscht"})
except Exception as e:
app_logger.error(f"❌ Admin API-Fehler beim Löschen des Benutzers {user_id}: {str(e)}")
return jsonify({"error": "Fehler beim Löschen des Benutzers", "details": str(e)}), 500
@app.route("/api/admin/error-recovery/status", methods=["GET"])
@login_required
@admin_required
def api_admin_error_recovery_status():
"""API-Endpunkt für Error-Recovery-Status (Admin only)"""
try:
# Mock Error-Recovery-Status da das Modul möglicherweise nicht verfügbar ist
error_stats = {
"auto_recovery_enabled": True,
"monitoring_active": True,
"total_errors": 0,
"recovered_errors": 0,
"unrecovered_errors": 0,
"recovery_success_rate": 100.0,
"last_error": None,
"uptime_hours": 24,
"status": "healthy"
}
recent_errors = []
app_logger.info("✅ Admin API: Error-Recovery-Status abgerufen")
return jsonify({
"success": True,
"statistics": error_stats,
"recent_errors": recent_errors
})
except Exception as e:
app_logger.error(f"❌ Admin API-Fehler beim Error-Recovery-Status: {str(e)}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route("/api/admin/system-health", methods=["GET"])
@login_required
@admin_required
def api_admin_system_health():
"""API-Endpunkt für System-Health (Admin only)"""
try:
from models import get_db_session, Job, Printer, User
import psutil
import os
db_session = get_db_session()
# Datenbank-Statistiken
total_users = db_session.query(User).count()
active_users = db_session.query(User).filter(User.active == True).count()
total_printers = db_session.query(Printer).count()
active_printers = db_session.query(Printer).filter(Printer.active == True).count()
total_jobs = db_session.query(Job).count()
active_jobs = db_session.query(Job).filter(Job.status.in_(["scheduled", "running"])).count()
db_session.close()
# System-Ressourcen
try:
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
system_resources = {
"cpu_percent": cpu_percent,
"memory_total_gb": round(memory.total / (1024**3), 2),
"memory_used_gb": round(memory.used / (1024**3), 2),
"memory_percent": memory.percent,
"disk_total_gb": round(disk.total / (1024**3), 2),
"disk_used_gb": round(disk.used / (1024**3), 2),
"disk_percent": round((disk.used / disk.total) * 100, 1)
}
except:
system_resources = {
"cpu_percent": 0,
"memory_total_gb": 0,
"memory_used_gb": 0,
"memory_percent": 0,
"disk_total_gb": 0,
"disk_used_gb": 0,
"disk_percent": 0
}
# Health-Status bestimmen
health_status = "healthy"
health_issues = []
if system_resources["cpu_percent"] > 80:
health_status = "warning"
health_issues.append("Hohe CPU-Auslastung")
if system_resources["memory_percent"] > 85:
health_status = "warning"
health_issues.append("Hoher Speicherverbrauch")
if system_resources["disk_percent"] > 90:
health_status = "critical"
health_issues.append("Kritischer Speicherplatz")
health_data = {
"success": True,
"health_status": health_status,
"health_issues": health_issues,
"timestamp": datetime.now().isoformat(),
"database": {
"total_users": total_users,
"active_users": active_users,
"total_printers": total_printers,
"active_printers": active_printers,
"total_jobs": total_jobs,
"active_jobs": active_jobs
},
"system_resources": system_resources,
"services": {
"database": "online",
"tapo_controller": "online",
"job_scheduler": "online",
"session_manager": "online"
}
}
app_logger.info("✅ Admin API: System-Health abgerufen")
return jsonify(health_data)
except Exception as e:
app_logger.error(f"❌ Admin API-Fehler beim System-Health: {str(e)}")
app_logger.error(f"❌ API-Fehler beim Abrufen des Drucker-Status: {str(e)}", exc_info=True)
return jsonify({
"success": False,
"error": str(e),
"health_status": "error"
}), 500
# ===== WEITERE WICHTIGE API-ENDPUNKTE =====
@app.route("/api/admin/printers", methods=["POST"])
@login_required
@admin_required
def api_admin_create_printer():
"""API-Endpunkt für Drucker-Erstellung (Admin only)"""
try:
data = request.get_json()
if not data:
return jsonify({"error": "Keine JSON-Daten empfangen"}), 400
# Pflichtfelder prüfen
required_fields = ["name", "model"]
for field in required_fields:
if field not in data:
return jsonify({"error": f"Feld '{field}' fehlt"}), 400
from models import get_db_session, Printer
db_session = get_db_session()
# Prüfen ob Name bereits existiert
existing_printer = db_session.query(Printer).filter(Printer.name == data["name"]).first()
if existing_printer:
db_session.close()
return jsonify({"error": "Ein Drucker mit diesem Namen existiert bereits"}), 409
# MAC-Adresse generieren falls nicht vorhanden
mac_address = data.get("mac_address")
if not mac_address:
# Generiere eine eindeutige MAC-Adresse
mac_address = "00:50:56:" + ":".join([f"{uuid.uuid4().hex[:2]}" for _ in range(3)])
# Neuen Drucker erstellen
new_printer = Printer(
name=data["name"],
model=data["model"],
location=data.get("location", ""),
ip_address=data.get("ip_address"),
mac_address=mac_address,
plug_ip=data.get("plug_ip"),
plug_username=data.get("plug_username"),
plug_password=data.get("plug_password"),
status=data.get("status", "offline"),
active=data.get("active", True)
)
db_session.add(new_printer)
db_session.commit()
printer_dict = {
"id": new_printer.id,
"name": new_printer.name,
"model": new_printer.model,
"location": new_printer.location,
"ip_address": new_printer.ip_address,
"mac_address": new_printer.mac_address,
"plug_ip": new_printer.plug_ip,
"status": new_printer.status,
"active": new_printer.active,
"created_at": new_printer.created_at.isoformat() if new_printer.created_at else None
}
db_session.close()
app_logger.info(f"✅ Admin API: Neuer Drucker '{new_printer.name}' erstellt")
return jsonify({"success": True, "printer": printer_dict, "message": "Drucker erfolgreich erstellt"}), 201
except Exception as e:
app_logger.error(f"❌ Admin API-Fehler beim Erstellen des Druckers: {str(e)}")
return jsonify({"success": False, "error": "Fehler beim Erstellen des Druckers", "details": str(e)}), 500
@app.route("/api/admin/printers/<int:printer_id>", methods=["PUT"])
@login_required
@admin_required
def api_admin_update_printer(printer_id):
"""API-Endpunkt für Drucker-Update (Admin only)"""
try:
data = request.get_json()
if not data:
return jsonify({"error": "Keine JSON-Daten empfangen"}), 400
from models import get_db_session, Printer
db_session = get_db_session()
printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
if not printer:
db_session.close()
return jsonify({"error": "Drucker nicht gefunden"}), 404
# Prüfen ob neuer Name bereits existiert (falls Name geändert wird)
if "name" in data and data["name"] != printer.name:
existing_printer = db_session.query(Printer).filter(
Printer.name == data["name"],
Printer.id != printer_id
).first()
if existing_printer:
db_session.close()
return jsonify({"error": "Ein Drucker mit diesem Namen existiert bereits"}), 409
# Felder aktualisieren
if "name" in data:
printer.name = data["name"]
if "model" in data:
printer.model = data["model"]
if "location" in data:
printer.location = data["location"]
if "ip_address" in data:
printer.ip_address = data["ip_address"]
if "mac_address" in data:
printer.mac_address = data["mac_address"]
if "plug_ip" in data:
printer.plug_ip = data["plug_ip"]
if "plug_username" in data:
printer.plug_username = data["plug_username"]
if "plug_password" in data:
printer.plug_password = data["plug_password"]
if "status" in data:
printer.status = data["status"]
if "active" in data:
printer.active = data["active"]
printer.last_checked = datetime.now()
db_session.commit()
printer_dict = {
"id": printer.id,
"name": printer.name,
"model": printer.model,
"location": printer.location,
"ip_address": printer.ip_address,
"mac_address": printer.mac_address,
"plug_ip": printer.plug_ip,
"status": printer.status,
"active": printer.active,
"created_at": printer.created_at.isoformat() if printer.created_at else None,
"updated_at": printer.updated_at.isoformat() if printer.updated_at else None,
"last_checked": printer.last_checked.isoformat() if printer.last_checked else None
}
db_session.close()
app_logger.info(f"✅ Admin API: Drucker {printer_id} aktualisiert")
return jsonify({"success": True, "printer": printer_dict, "message": "Drucker erfolgreich aktualisiert"})
except Exception as e:
app_logger.error(f"❌ Admin API-Fehler beim Aktualisieren des Druckers {printer_id}: {str(e)}")
return jsonify({"success": False, "error": "Fehler beim Aktualisieren des Druckers", "details": str(e)}), 500
@app.route("/api/admin/printers/<int:printer_id>", methods=["DELETE"])
@login_required
@admin_required
def api_admin_delete_printer(printer_id):
"""API-Endpunkt für Drucker-Löschung (Admin only)"""
try:
from models import get_db_session, Printer, Job
db_session = get_db_session()
printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
if not printer:
db_session.close()
return jsonify({"error": "Drucker nicht gefunden"}), 404
# Prüfen ob der Drucker aktive Jobs hat
active_jobs = db_session.query(Job).filter(
Job.printer_id == printer_id,
Job.status.in_(["scheduled", "running"])
).count()
if active_jobs > 0:
db_session.close()
return jsonify({
"error": f"Drucker hat {active_jobs} aktive Jobs und kann nicht gelöscht werden",
"active_jobs": active_jobs
}), 409
printer_name = printer.name
db_session.delete(printer)
db_session.commit()
db_session.close()
app_logger.info(f"✅ Admin API: Drucker '{printer_name}' (ID: {printer_id}) gelöscht")
return jsonify({"success": True, "message": f"Drucker '{printer_name}' erfolgreich gelöscht"})
except Exception as e:
app_logger.error(f"❌ Admin API-Fehler beim Löschen des Druckers {printer_id}: {str(e)}")
return jsonify({"success": False, "error": "Fehler beim Löschen des Druckers", "details": str(e)}), 500
@app.route("/api/admin/printers/<int:printer_id>", methods=["GET"])
@login_required
@admin_required
def api_admin_get_printer(printer_id):
"""API-Endpunkt für einzelnen Drucker (Admin only)"""
try:
from models import get_db_session, Printer
db_session = get_db_session()
printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
if not printer:
db_session.close()
return jsonify({"error": "Drucker nicht gefunden"}), 404
printer_dict = {
"id": printer.id,
"name": printer.name,
"model": printer.model,
"location": printer.location,
"ip_address": printer.ip_address,
"mac_address": printer.mac_address,
"plug_ip": printer.plug_ip,
"plug_username": printer.plug_username, # Für Admin sichtbar
"status": printer.status,
"active": printer.active,
"created_at": printer.created_at.isoformat() if printer.created_at else None,
"updated_at": printer.updated_at.isoformat() if hasattr(printer, 'updated_at') and printer.updated_at else None,
"last_checked": printer.last_checked.isoformat() if printer.last_checked else None,
"has_plug": bool(printer.plug_ip), # Hilfreich für Frontend
"job_count": len(printer.jobs) if printer.jobs else 0
}
db_session.close()
app_logger.info(f"✅ Admin API: Drucker {printer_id} abgerufen")
return jsonify({"success": True, "printer": printer_dict})
except Exception as e:
app_logger.error(f"❌ Admin API-Fehler beim Abrufen des Druckers {printer_id}: {str(e)}")
return jsonify({"success": False, "error": "Fehler beim Laden des Druckers", "details": str(e)}), 500
@app.route("/api/admin/printers", methods=["GET"])
@login_required
@admin_required
def api_admin_get_printers():
"""API-Endpunkt für alle Drucker (Admin only)"""
try:
from models import get_db_session, Printer
db_session = get_db_session()
printers = db_session.query(Printer).all()
printer_list = []
for printer in printers:
printer_dict = {
"id": printer.id,
"name": printer.name,
"model": printer.model,
"location": printer.location,
"ip_address": printer.ip_address,
"mac_address": printer.mac_address,
"plug_ip": printer.plug_ip,
"status": printer.status,
"active": printer.active,
"created_at": printer.created_at.isoformat() if printer.created_at else None,
"last_checked": printer.last_checked.isoformat() if printer.last_checked else None,
"has_plug": bool(printer.plug_ip),
"job_count": len(printer.jobs) if printer.jobs else 0
}
printer_list.append(printer_dict)
db_session.close()
app_logger.info(f"✅ Admin API: {len(printer_list)} Drucker abgerufen")
return jsonify({
"success": True,
"printers": printer_list,
"count": len(printer_list)
})
except Exception as e:
app_logger.error(f"❌ Admin API-Fehler beim Abrufen der Drucker: {str(e)}")
return jsonify({
"success": False,
"error": "Fehler beim Laden der Drucker",
"error": "Fehler beim Laden des Drucker-Status",
"details": str(e),
"printers": [],
"count": 0