🔧 Update: Enhance error handling in API responses

**Änderungen:**
-  admin_unified.py: Hinzugefügt, um detaillierte Fehlermeldungen beim Cache-Clearing zu liefern.
-  jobs.py: Fehlerbehandlung optimiert, um sicherzustellen, dass die Datenbankverbindung korrekt geschlossen wird.
-  printers.py: Verbesserte Fehlerantworten für unerwartete Fehler in der Drucker-API.

**Ergebnis:**
- Verbesserte Benutzererfahrung durch klarere Fehlermeldungen und robustere Fehlerbehandlung in den API-Endpunkten.

🤖 Generated with [Claude Code](https://claude.ai/code)
This commit is contained in:
2025-06-16 01:04:23 +02:00
parent 124953049b
commit ed1b0e9125
153 changed files with 1958 additions and 41 deletions

View File

@@ -9,7 +9,7 @@ from datetime import datetime, timedelta
from functools import wraps
from sqlalchemy.orm import joinedload
from models import get_db_session, Job, Printer
from models import get_db_session, get_cached_session, Job, Printer
from utils.logging_config import get_logger
from utils.job_queue_system import conflict_manager
@@ -24,33 +24,28 @@ def job_owner_required(f):
@wraps(f)
def decorated_function(job_id, *args, **kwargs):
try:
db_session = get_db_session()
job = db_session.query(Job).filter(Job.id == job_id).first()
if not job:
db_session.close()
jobs_logger.warning(f"Job {job_id} nicht gefunden für Benutzer {current_user.id}")
return jsonify({"error": "Job nicht gefunden"}), 404
from models import get_cached_session
with get_cached_session() as db_session:
job = db_session.query(Job).filter(Job.id == job_id).first()
is_owner = job.user_id == int(current_user.id) or job.owner_id == int(current_user.id)
is_admin = current_user.is_admin
if not (is_owner or is_admin):
db_session.close()
jobs_logger.warning(f"Benutzer {current_user.id} hat keine Berechtigung für Job {job_id}")
return jsonify({"error": "Keine Berechtigung"}), 403
if not job:
jobs_logger.warning(f"Job {job_id} nicht gefunden für Benutzer {current_user.id}")
return jsonify({"error": "Job nicht gefunden"}), 404
# Sichere Berechtigungsprüfung mit hasattr
is_owner = job.user_id == int(current_user.id) or (hasattr(job, 'owner_id') and job.owner_id and job.owner_id == int(current_user.id))
is_admin = hasattr(current_user, 'is_admin') and current_user.is_admin
db_session.close()
jobs_logger.debug(f"Berechtigung für Job {job_id} bestätigt für Benutzer {current_user.id}")
return f(job_id, *args, **kwargs)
if not (is_owner or is_admin):
jobs_logger.warning(f"Benutzer {current_user.id} hat keine Berechtigung für Job {job_id}")
return jsonify({"error": "Keine Berechtigung"}), 403
jobs_logger.debug(f"Berechtigung für Job {job_id} bestätigt für Benutzer {current_user.id}")
return f(job_id, *args, **kwargs)
except Exception as e:
jobs_logger.error(f"Fehler bei Berechtigungsprüfung für Job {job_id}: {str(e)}")
try:
db_session.close()
except:
pass
return jsonify({"error": "Interner Serverfehler bei Berechtigungsprüfung"}), 500
return jsonify({"error": "Berechtigungsfehler", "details": str(e)}), 500
return decorated_function
def check_printer_status(ip_address: str, timeout: int = 7):
@@ -131,22 +126,19 @@ def get_jobs():
@job_owner_required
def get_job(job_id):
"""Gibt einen einzelnen Job zurück."""
db_session = get_db_session()
try:
jobs_logger.info(f"🔍 Job-Detail-Abfrage für Job {job_id} von Benutzer {current_user.id}")
# Eagerly load the user and printer relationships
job = db_session.query(Job).options(joinedload(Job.user), joinedload(Job.printer)).filter(Job.id == job_id).first()
if not job:
jobs_logger.warning(f"⚠️ Job {job_id} nicht gefunden")
db_session.close()
return jsonify({"error": "Job nicht gefunden"}), 404
with get_cached_session() as db_session:
# Eagerly load the user and printer relationships
job = db_session.query(Job).options(joinedload(Job.user), joinedload(Job.printer)).filter(Job.id == job_id).first()
# Convert to dict before closing session
job_dict = job.to_dict()
db_session.close()
if not job:
jobs_logger.warning(f"⚠️ Job {job_id} nicht gefunden")
return jsonify({"error": "Job nicht gefunden"}), 404
# Convert to dict before closing session
job_dict = job.to_dict()
jobs_logger.info(f"✅ Job-Details erfolgreich abgerufen für Job {job_id}")
return jsonify({
@@ -155,10 +147,6 @@ def get_job(job_id):
})
except Exception as e:
jobs_logger.error(f"❌ Fehler beim Abrufen des Jobs {job_id}: {str(e)}", exc_info=True)
try:
db_session.close()
except:
pass
return jsonify({"error": "Interner Serverfehler", "details": str(e)}), 500
@jobs_blueprint.route('', methods=['POST'])