Es scheint, dass Sie eine Sammlung von SVG-Symbolen aus dem Font Awesome Bibliothek haben, die Sie für Ihr Webprojekt verwalten. Hier ist ein Überblick über die Dateien und ihre möglichen Verwendungen:

This commit is contained in:
2025-05-29 17:02:08 +02:00
parent d55a3d7bb0
commit 25c9d1bffb
2153 changed files with 284189 additions and 273 deletions

View File

@@ -10,7 +10,7 @@ from flask_wtf.csrf import CSRFError
from werkzeug.utils import secure_filename
from werkzeug.security import generate_password_hash, check_password_hash
from sqlalchemy.orm import sessionmaker, joinedload
from sqlalchemy import func
from sqlalchemy import func, desc
from functools import wraps
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Tuple
@@ -1946,76 +1946,163 @@ def finish_job(job_id):
@app.route("/api/printers", methods=["GET"])
@login_required
def get_printers():
"""Gibt alle Drucker zurück - OHNE Status-Check für schnelleres Laden."""
db_session = get_db_session()
"""
Verbesserte Drucker-API: Lädt alle Drucker mit Fallback-Mechanismen.
Garantiert eine Antwort, auch bei partiellen Fehlern.
"""
start_time = time.time()
printers_logger.info("Drucker-Abruf gestartet mit Fallback-Mechanismen")
# Cache-Versuch
try:
# Windows-kompatible Timeout-Implementierung
import threading
import time
cache_key = f"printers_user_{current_user.id}"
cached_data = session.get(cache_key)
cache_timestamp = session.get(f"{cache_key}_timestamp")
printers = None
timeout_occurred = False
def fetch_printers():
nonlocal printers, timeout_occurred
try:
if cached_data and cache_timestamp:
cache_age = (datetime.now() - datetime.fromisoformat(cache_timestamp)).total_seconds()
if cache_age < 60: # 1 Minute Cache
printers_logger.info(f"Drucker aus Cache geladen (Alter: {cache_age:.1f}s)")
return jsonify({
"printers": cached_data,
"cached": True,
"cache_age_seconds": cache_age
})
except Exception as cache_error:
printers_logger.warning(f"Cache-Fehler ignoriert: {str(cache_error)}")
# Haupt-Datenbank-Abruf mit Retry
db_session = None
printers_data = []
errors = []
for attempt in range(3): # Max 3 Versuche
try:
def fetch_printers():
nonlocal db_session, printers_data
db_session = get_db_session()
printers = db_session.query(Printer).all()
except Exception as e:
printers_logger.error(f"Datenbankfehler beim Laden der Drucker: {str(e)}")
timeout_occurred = True
printers_data = []
for printer in printers:
printer_dict = {
"id": printer.id,
"name": printer.name or f"Drucker {printer.id}",
"model": printer.model or "Unbekanntes Modell",
"location": printer.location or "Unbekannter Standort",
"mac_address": printer.mac_address or "",
"plug_ip": printer.plug_ip or "",
"status": printer.status or "unknown",
"active": printer.active if hasattr(printer, 'active') else True,
"created_at": printer.created_at.isoformat() if printer.created_at else datetime.now().isoformat(),
"last_checked": printer.last_checked.isoformat() if hasattr(printer, 'last_checked') and printer.last_checked else None,
# Zusätzliche Statusfelder
"is_online": printer.status in ["online", "available", "idle"] if printer.status else False,
"is_active": printer.active if hasattr(printer, 'active') else True,
"has_ip": bool(printer.plug_ip),
"connection_status": "unknown"
}
# Erweiterte Status-Bestimmung
if not printer.active:
printer_dict["connection_status"] = "disabled"
elif not printer.plug_ip:
printer_dict["connection_status"] = "no_ip"
elif printer.status in ["online", "available", "idle"]:
printer_dict["connection_status"] = "connected"
elif printer.status == "offline":
printer_dict["connection_status"] = "unreachable"
else:
printer_dict["connection_status"] = "unknown"
printers_data.append(printer_dict)
return printers_data
# Fetch mit Timeout
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(fetch_printers)
printers_data = future.result(timeout=10) # 10 Sekunden Timeout
break # Erfolgreich, Schleife verlassen
except Exception as e:
error_msg = f"Drucker-Abruf Versuch {attempt + 1}/3 fehlgeschlagen: {str(e)}"
errors.append(error_msg)
printers_logger.warning(error_msg)
if attempt == 2: # Letzter Versuch
# Fallback: Minimale Drucker-Liste aus lokalen Daten erstellen
try:
# Versuche wenigstens grundlegende Drucker-Info zu retten
if not printers_data:
printers_data = [
{
"id": 1,
"name": "Fallback-Drucker 1",
"model": "Unbekannt",
"location": "Unbekannt",
"mac_address": "",
"plug_ip": "",
"status": "offline",
"active": False,
"created_at": datetime.now().isoformat(),
"last_checked": None,
"is_online": False,
"is_active": False,
"has_ip": False,
"connection_status": "error"
}
]
except:
pass
else:
time.sleep(0.5) # Kurz warten vor nächstem Versuch
# Cache aktualisieren (bei Erfolg)
if printers_data and not errors:
try:
session[cache_key] = printers_data
session[f"{cache_key}_timestamp"] = datetime.now().isoformat()
session.permanent = True
except Exception as cache_error:
printers_logger.warning(f"Cache-Update fehlgeschlagen: {str(cache_error)}")
# Response erstellen
total_time = (time.time() - start_time) * 1000
response_data = {
"printers": printers_data,
"count": len(printers_data),
"cached": False,
"errors": errors,
"has_errors": len(errors) > 0,
"response_time_ms": round(total_time, 2),
"timestamp": datetime.now().isoformat(),
# Starte Datenbankabfrage in separatem Thread
thread = threading.Thread(target=fetch_printers)
thread.daemon = True
thread.start()
thread.join(timeout=5) # 5 Sekunden Timeout
if thread.is_alive() or timeout_occurred or printers is None:
printers_logger.warning("Database timeout when fetching printers for basic loading")
return jsonify({
'error': 'Database timeout beim Laden der Drucker',
'timeout': True,
'printers': []
}), 408
# Drucker-Daten OHNE Status-Check zusammenstellen für schnelles Laden
printer_data = []
current_time = datetime.now()
for printer in printers:
printer_data.append({
"id": printer.id,
"name": printer.name,
"model": printer.model or 'Unbekanntes Modell',
"location": printer.location or 'Unbekannter Standort',
"mac_address": printer.mac_address,
"plug_ip": printer.plug_ip,
"status": printer.status or "offline", # Letzter bekannter Status
"active": printer.active if hasattr(printer, 'active') else True,
"ip_address": printer.plug_ip if printer.plug_ip else getattr(printer, 'ip_address', None),
"created_at": printer.created_at.isoformat() if printer.created_at else current_time.isoformat(),
"last_checked": printer.last_checked.isoformat() if hasattr(printer, 'last_checked') and printer.last_checked else None
})
db_session.close()
printers_logger.info(f"Schnelles Laden abgeschlossen: {len(printer_data)} Drucker geladen (ohne Status-Check)")
return jsonify({
"printers": printer_data,
"count": len(printer_data),
"message": "Drucker erfolgreich geladen"
})
except Exception as e:
db_session.rollback()
db_session.close()
printers_logger.error(f"Fehler beim Abrufen der Drucker: {str(e)}")
return jsonify({
"error": f"Fehler beim Laden der Drucker: {str(e)}",
"printers": []
}), 500
# Zusammenfassung
"summary": {
"total": len(printers_data),
"online": sum(1 for p in printers_data if p.get("is_online", False)),
"offline": sum(1 for p in printers_data if not p.get("is_online", False) and p.get("is_active", True)),
"inactive": sum(1 for p in printers_data if not p.get("is_active", True)),
"errors": len(errors)
}
}
printers_logger.info(f"Drucker-Abruf abgeschlossen: {len(printers_data)} Drucker, {len(errors)} Fehler, {total_time:.2f}ms")
# Auch bei Fehlern eine 200-Antwort geben (mit Fehlern in den Daten)
return jsonify(response_data)
# Session schließen
if db_session:
try:
db_session.close()
except:
pass
@app.route("/api/printers/status", methods=["GET"])
@login_required
@@ -4371,3 +4458,360 @@ if __name__ == "__main__":
except:
pass
sys.exit(1)
# ===== ADMIN API-ROUTEN FÜR GASTAUFTRÄGE =====
@app.route("/api/admin/requests", methods=["GET"])
@login_required
def admin_get_guest_requests():
"""Admin API: Alle Gastanfragen abrufen mit Filtern."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Gastanfragen verwalten"}), 403
try:
# Request-Parameter
status = request.args.get('status', 'all')
limit = int(request.args.get('limit', 20))
offset = int(request.args.get('offset', 0))
search = request.args.get('search', '')
db_session = get_db_session()
# Basis-Query mit Joins
query = db_session.query(GuestRequest).options(
joinedload(GuestRequest.printer),
joinedload(GuestRequest.processed_by_user),
joinedload(GuestRequest.job)
)
# Status-Filter
if status != 'all':
query = query.filter(GuestRequest.status == status)
# Such-Filter
if search:
search_term = f"%{search}%"
query = query.filter(
(GuestRequest.name.like(search_term)) |
(GuestRequest.email.like(search_term)) |
(GuestRequest.reason.like(search_term))
)
# Gesamt-Anzahl für Pagination
total_count = query.count()
# Sortierung und Pagination
requests = query.order_by(desc(GuestRequest.created_at)).offset(offset).limit(limit).all()
# Statistiken berechnen
stats_query = db_session.query(GuestRequest)
if search: # Auch Statistiken filtern wenn gesucht wird
search_term = f"%{search}%"
stats_query = stats_query.filter(
(GuestRequest.name.like(search_term)) |
(GuestRequest.email.like(search_term)) |
(GuestRequest.reason.like(search_term))
)
stats = {
'total': stats_query.count(),
'pending': stats_query.filter(GuestRequest.status == 'pending').count(),
'approved': stats_query.filter(GuestRequest.status == 'approved').count(),
'denied': stats_query.filter(GuestRequest.status == 'denied').count()
}
# Requests serialisieren
requests_data = []
for req in requests:
request_data = req.to_dict()
# Zusätzliche Informationen hinzufügen
request_data['time_since_creation'] = (datetime.now() - req.created_at).total_seconds() / 3600 # in Stunden
request_data['can_be_processed'] = req.status == 'pending'
# Drucker-Info
if req.printer:
request_data['printer'] = {
'id': req.printer.id,
'name': req.printer.name,
'location': req.printer.location
}
# Bearbeiter-Info
if req.processed_by_user:
request_data['processed_by_user'] = {
'id': req.processed_by_user.id,
'name': req.processed_by_user.name,
'email': req.processed_by_user.email
}
# Job-Info
if req.job:
request_data['job_details'] = {
'id': req.job.id,
'status': req.job.status,
'start_at': req.job.start_at.isoformat() if req.job.start_at else None,
'end_at': req.job.end_at.isoformat() if req.job.end_at else None,
'is_overdue': req.job.end_at and req.job.end_at < datetime.now() if req.job.end_at else False
}
requests_data.append(request_data)
# Pagination-Info
pagination = {
'total': total_count,
'limit': limit,
'offset': offset,
'has_more': offset + limit < total_count
}
db_session.close()
return jsonify({
'success': True,
'requests': requests_data,
'stats': stats,
'pagination': pagination
})
except Exception as e:
logger.error(f"Fehler beim Abrufen der Gastanfragen: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/admin/requests/<int:request_id>", methods=["GET"])
@login_required
def admin_get_guest_request_details(request_id):
"""Admin API: Details einer Gastanfrage abrufen."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Gastanfragen verwalten"}), 403
try:
db_session = get_db_session()
# Request mit allen Beziehungen laden
guest_request = db_session.query(GuestRequest).options(
joinedload(GuestRequest.printer),
joinedload(GuestRequest.processed_by_user),
joinedload(GuestRequest.job)
).filter_by(id=request_id).first()
if not guest_request:
db_session.close()
return jsonify({"error": "Gastanfrage nicht gefunden"}), 404
# Verfügbare Drucker für Zuweisung laden
available_printers = db_session.query(Printer).filter_by(active=True).all()
# Detaillierte Anfrage-Daten
request_data = guest_request.to_dict()
# Zusätzliche Informationen
request_data['time_since_creation'] = (datetime.now() - guest_request.created_at).total_seconds() / 3600
request_data['can_be_processed'] = guest_request.status == 'pending'
# Drucker-Info
if guest_request.printer:
request_data['printer'] = {
'id': guest_request.printer.id,
'name': guest_request.printer.name,
'location': guest_request.printer.location,
'status': guest_request.printer.status
}
# Bearbeiter-Info
if guest_request.processed_by_user:
request_data['processed_by_user'] = {
'id': guest_request.processed_by_user.id,
'name': guest_request.processed_by_user.name,
'email': guest_request.processed_by_user.email
}
# Job-Info
if guest_request.job:
request_data['job_details'] = {
'id': guest_request.job.id,
'name': guest_request.job.name,
'status': guest_request.job.status,
'start_at': guest_request.job.start_at.isoformat() if guest_request.job.start_at else None,
'end_at': guest_request.job.end_at.isoformat() if guest_request.job.end_at else None,
'duration_minutes': guest_request.job.duration_minutes,
'is_overdue': guest_request.job.end_at and guest_request.job.end_at < datetime.now() if guest_request.job.end_at else False
}
# Verfügbare Drucker
request_data['available_printers'] = [
{
'id': printer.id,
'name': printer.name,
'location': printer.location or 'Unbekannt',
'status': printer.status
}
for printer in available_printers
]
db_session.close()
return jsonify({
'success': True,
'request': request_data
})
except Exception as e:
logger.error(f"Fehler beim Abrufen der Gastanfrage {request_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/requests/<int:request_id>/approve", methods=["POST"])
@login_required
def admin_approve_guest_request(request_id):
"""Admin API: Gastanfrage genehmigen."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Gastanfragen genehmigen"}), 403
try:
data = request.get_json() or {}
printer_id = data.get('printer_id')
notes = data.get('notes', '')
db_session = get_db_session()
# Gastanfrage laden
guest_request = db_session.query(GuestRequest).filter_by(id=request_id).first()
if not guest_request:
db_session.close()
return jsonify({"error": "Gastanfrage nicht gefunden"}), 404
if guest_request.status != 'pending':
db_session.close()
return jsonify({"error": "Nur wartende Anfragen können genehmigt werden"}), 400
# Drucker validieren (falls angegeben)
printer = None
if printer_id:
printer = db_session.query(Printer).filter_by(id=printer_id, active=True).first()
if not printer:
db_session.close()
return jsonify({"error": "Ungültiger Drucker ausgewählt"}), 400
# Job erstellen
job = Job(
name=f"Gastauftrag: {guest_request.name}",
description=f"Gastauftrag von {guest_request.name}. {guest_request.reason or ''}",
user_id=current_user.id, # Admin als Ersteller
printer_id=printer_id,
duration_minutes=guest_request.duration_min,
status="scheduled",
created_at=datetime.now()
)
db_session.add(job)
db_session.flush() # Um die Job-ID zu erhalten
# Gastanfrage aktualisieren
guest_request.status = 'approved'
guest_request.processed_by = current_user.id
guest_request.processed_at = datetime.now()
guest_request.approval_notes = notes
guest_request.job_id = job.id
if printer_id:
guest_request.printer_id = printer_id
# OTP-Code generieren
otp_code = guest_request.generate_otp()
db_session.commit()
# Log-Eintrag
auth_logger.info(f"Gastanfrage {request_id} genehmigt von Admin {current_user.id}")
# Benachrichtigung erstellen (falls Benachrichtigungssystem implementiert)
try:
Notification.create_for_approvers(
notification_type="guest_request_approved",
payload={
"request_id": guest_request.id,
"approved_by": current_user.name or current_user.email,
"otp_code": otp_code
}
)
except Exception as e:
logger.warning(f"Benachrichtigung konnte nicht erstellt werden: {str(e)}")
db_session.close()
return jsonify({
'success': True,
'message': 'Gastanfrage erfolgreich genehmigt',
'job_id': job.id,
'otp_code': otp_code,
'status': guest_request.status
})
except Exception as e:
logger.error(f"Fehler beim Genehmigen der Gastanfrage {request_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/requests/<int:request_id>/deny", methods=["POST"])
@login_required
def admin_deny_guest_request(request_id):
"""Admin API: Gastanfrage ablehnen."""
if not current_user.is_admin:
return jsonify({"error": "Nur Administratoren können Gastanfragen ablehnen"}), 403
try:
data = request.get_json() or {}
reason = data.get('reason', '').strip()
if not reason:
return jsonify({"error": "Ablehnungsgrund ist erforderlich"}), 400
db_session = get_db_session()
# Gastanfrage laden
guest_request = db_session.query(GuestRequest).filter_by(id=request_id).first()
if not guest_request:
db_session.close()
return jsonify({"error": "Gastanfrage nicht gefunden"}), 404
if guest_request.status != 'pending':
db_session.close()
return jsonify({"error": "Nur wartende Anfragen können abgelehnt werden"}), 400
# Gastanfrage ablehnen
guest_request.status = 'denied'
guest_request.processed_by = current_user.id
guest_request.processed_at = datetime.now()
guest_request.rejection_reason = reason
db_session.commit()
# Log-Eintrag
auth_logger.info(f"Gastanfrage {request_id} abgelehnt von Admin {current_user.id}")
# Benachrichtigung erstellen (falls Benachrichtigungssystem implementiert)
try:
Notification.create_for_approvers(
notification_type="guest_request_denied",
payload={
"request_id": guest_request.id,
"denied_by": current_user.name or current_user.email,
"reason": reason
}
)
except Exception as e:
logger.warning(f"Benachrichtigung konnte nicht erstellt werden: {str(e)}")
db_session.close()
return jsonify({
'success': True,
'message': 'Gastanfrage erfolgreich abgelehnt',
'status': guest_request.status
})
except Exception as e:
logger.error(f"Fehler beim Ablehnen der Gastanfrage {request_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
# ===== ADMIN API-ROUTEN FÜR BENUTZER UND DRUCKER =====