" feat: Integrate calendar functionality into blueprint system "

This commit is contained in:
Till Tomczak 2025-05-29 09:49:06 +02:00
parent 8b9ae6b451
commit f525bfc0a6
6 changed files with 2270 additions and 0 deletions

View File

@ -0,0 +1,280 @@
import json
from datetime import datetime, timedelta
from flask import Blueprint, render_template, request, jsonify, redirect, url_for, abort
from flask_login import current_user, login_required
from sqlalchemy import and_, or_
from models import Job, Printer, User, UserPermission, get_cached_session
from utils.logging_config import get_logger
calendar_blueprint = Blueprint('calendar', __name__)
logger = get_logger("calendar")
def can_edit_events(user):
"""Prüft, ob ein Benutzer Kalendereinträge bearbeiten darf."""
if user.is_admin:
return True
with get_cached_session() as db_session:
permission = db_session.query(UserPermission).filter_by(user_id=user.id).first()
if not permission:
return False
return permission.can_approve_jobs
@calendar_blueprint.route('/calendar', methods=['GET'])
@login_required
def calendar_view():
"""Kalender-Ansicht anzeigen."""
can_edit = can_edit_events(current_user)
with get_cached_session() as db_session:
printers = db_session.query(Printer).filter_by(active=True).all()
return render_template('calendar.html',
printers=printers,
can_edit=can_edit)
@calendar_blueprint.route('/api/calendar', methods=['GET'])
@login_required
def api_get_calendar_events():
"""Kalendereinträge als JSON für FullCalendar zurückgeben."""
try:
# Datumsbereich aus Anfrage
start_str = request.args.get('from')
end_str = request.args.get('to')
if not start_str or not end_str:
# Standardmäßig eine Woche anzeigen
start_date = datetime.now()
end_date = start_date + timedelta(days=7)
else:
try:
start_date = datetime.fromisoformat(start_str)
end_date = datetime.fromisoformat(end_str)
except ValueError:
return jsonify({"error": "Ungültiges Datumsformat"}), 400
# Optional: Filter nach Druckern
printer_id = request.args.get('printer_id')
with get_cached_session() as db_session:
# Jobs im angegebenen Zeitraum abfragen
query = db_session.query(Job).filter(
or_(
# Jobs, die im Zeitraum beginnen
and_(Job.start_at >= start_date, Job.start_at <= end_date),
# Jobs, die im Zeitraum enden
and_(Job.end_at >= start_date, Job.end_at <= end_date),
# Jobs, die den Zeitraum komplett umfassen
and_(Job.start_at <= start_date, Job.end_at >= end_date)
)
)
if printer_id:
query = query.filter(Job.printer_id == printer_id)
jobs = query.all()
# Jobs in FullCalendar-Event-Format umwandeln
events = []
for job in jobs:
# Farbe basierend auf Status bestimmen
color = "#6B7280" # Grau für pending
if job.status == "running":
color = "#3B82F6" # Blau für running
elif job.status == "finished":
color = "#10B981" # Grün für finished
elif job.status == "scheduled":
color = "#10B981" # Grün für approved
elif job.status == "cancelled" or job.status == "failed":
color = "#EF4444" # Rot für abgebrochen/fehlgeschlagen
# Benutzerinformationen laden
user = db_session.query(User).filter_by(id=job.user_id).first()
user_name = user.name if user else "Unbekannt"
# Druckerinformationen laden
printer = db_session.query(Printer).filter_by(id=job.printer_id).first()
printer_name = printer.name if printer else "Unbekannt"
event = {
"id": job.id,
"title": job.name,
"start": job.start_at.isoformat(),
"end": job.end_at.isoformat(),
"color": color,
"extendedProps": {
"status": job.status,
"description": job.description,
"userName": user_name,
"printerId": job.printer_id,
"printerName": printer_name
}
}
events.append(event)
return jsonify(events)
except Exception as e:
logger.error(f"Fehler beim Abrufen der Kalendereinträge: {str(e)}")
return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500
@calendar_blueprint.route('/api/calendar/event', methods=['POST'])
@login_required
def api_create_calendar_event():
"""Neuen Kalendereintrag (Job) erstellen."""
# Nur Admins und Benutzer mit can_approve_jobs dürfen Einträge erstellen
if not can_edit_events(current_user):
return jsonify({"error": "Keine Berechtigung zum Erstellen von Kalendereinträgen"}), 403
try:
data = request.get_json()
if not data:
return jsonify({"error": "Keine Daten erhalten"}), 400
# Pflichtfelder prüfen
title = data.get('title')
start = data.get('start')
end = data.get('end')
printer_id = data.get('printerId')
if not all([title, start, end, printer_id]):
return jsonify({"error": "Titel, Start, Ende und Drucker sind erforderlich"}), 400
# Datumsfelder konvertieren
try:
start_date = datetime.fromisoformat(start)
end_date = datetime.fromisoformat(end)
except ValueError:
return jsonify({"error": "Ungültiges Datumsformat"}), 400
# Dauer in Minuten berechnen
duration_minutes = int((end_date - start_date).total_seconds() / 60)
with get_cached_session() as db_session:
# Drucker prüfen
printer = db_session.query(Printer).filter_by(id=printer_id).first()
if not printer:
return jsonify({"error": "Drucker nicht gefunden"}), 404
# Neuen Job erstellen
job = Job(
name=title,
description=data.get('description', ''),
user_id=current_user.id,
printer_id=printer_id,
start_at=start_date,
end_at=end_date,
status="scheduled",
duration_minutes=duration_minutes,
owner_id=current_user.id
)
db_session.add(job)
db_session.commit()
logger.info(f"Neuer Kalendereintrag erstellt: ID {job.id}, Name: {title}")
return jsonify({
"success": True,
"id": job.id,
"title": job.name,
"start": job.start_at.isoformat(),
"end": job.end_at.isoformat(),
"status": job.status
})
except Exception as e:
logger.error(f"Fehler beim Erstellen des Kalendereintrags: {str(e)}")
return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500
@calendar_blueprint.route('/api/calendar/event/<int:event_id>', methods=['PUT'])
@login_required
def api_update_calendar_event(event_id):
"""Kalendereintrag (Job) aktualisieren."""
# Nur Admins und Benutzer mit can_approve_jobs dürfen Einträge bearbeiten
if not can_edit_events(current_user):
return jsonify({"error": "Keine Berechtigung zum Bearbeiten von Kalendereinträgen"}), 403
try:
data = request.get_json()
if not data:
return jsonify({"error": "Keine Daten erhalten"}), 400
with get_cached_session() as db_session:
job = db_session.query(Job).filter_by(id=event_id).first()
if not job:
return jsonify({"error": "Kalendereintrag nicht gefunden"}), 404
# Felder aktualisieren, die im Request enthalten sind
if 'title' in data:
job.name = data['title']
if 'description' in data:
job.description = data['description']
if 'start' in data and 'end' in data:
try:
start_date = datetime.fromisoformat(data['start'])
end_date = datetime.fromisoformat(data['end'])
job.start_at = start_date
job.end_at = end_date
job.duration_minutes = int((end_date - start_date).total_seconds() / 60)
except ValueError:
return jsonify({"error": "Ungültiges Datumsformat"}), 400
if 'printerId' in data:
printer = db_session.query(Printer).filter_by(id=data['printerId']).first()
if not printer:
return jsonify({"error": "Drucker nicht gefunden"}), 404
job.printer_id = data['printerId']
if 'status' in data:
# Status nur ändern, wenn er gültig ist
valid_statuses = ["scheduled", "running", "finished", "cancelled"]
if data['status'] in valid_statuses:
job.status = data['status']
db_session.commit()
logger.info(f"Kalendereintrag aktualisiert: ID {job.id}")
return jsonify({
"success": True,
"id": job.id,
"title": job.name,
"start": job.start_at.isoformat(),
"end": job.end_at.isoformat(),
"status": job.status
})
except Exception as e:
logger.error(f"Fehler beim Aktualisieren des Kalendereintrags: {str(e)}")
return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500
@calendar_blueprint.route('/api/calendar/event/<int:event_id>', methods=['DELETE'])
@login_required
def api_delete_calendar_event(event_id):
"""Kalendereintrag (Job) löschen."""
# Nur Admins und Benutzer mit can_approve_jobs dürfen Einträge löschen
if not can_edit_events(current_user):
return jsonify({"error": "Keine Berechtigung zum Löschen von Kalendereinträgen"}), 403
try:
with get_cached_session() as db_session:
job = db_session.query(Job).filter_by(id=event_id).first()
if not job:
return jsonify({"error": "Kalendereintrag nicht gefunden"}), 404
db_session.delete(job)
db_session.commit()
logger.info(f"Kalendereintrag gelöscht: ID {event_id}")
return jsonify({"success": True})
except Exception as e:
logger.error(f"Fehler beim Löschen des Kalendereintrags: {str(e)}")
return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500

View File

@ -0,0 +1,349 @@
import json
from datetime import datetime, timedelta
from flask import Blueprint, render_template, request, jsonify, redirect, url_for, abort, session, flash
from flask_login import current_user, login_required
from functools import wraps
from sqlalchemy import desc
from models import GuestRequest, Job, Printer, User, UserPermission, Notification, get_cached_session
from utils.logging_config import get_logger
guest_blueprint = Blueprint('guest', __name__)
logger = get_logger("guest")
# Hilfsfunktionen
def can_approve_jobs(user_id):
"""Prüft, ob ein Benutzer Anfragen genehmigen darf."""
with get_cached_session() as db_session:
permission = db_session.query(UserPermission).filter_by(user_id=user_id).first()
if not permission:
return False
return permission.can_approve_jobs
def approver_required(f):
"""Decorator zur Prüfung der Genehmigungsberechtigung."""
@wraps(f)
@login_required
def decorated_function(*args, **kwargs):
if not can_approve_jobs(current_user.id):
abort(403, "Keine Berechtigung zum Genehmigen von Anfragen")
return f(*args, **kwargs)
return decorated_function
# Gast-Routen
@guest_blueprint.route('/request', methods=['GET'])
def guest_request_form():
"""Formular für Gastanfragen anzeigen."""
with get_cached_session() as db_session:
printers = db_session.query(Printer).filter_by(active=True).all()
return render_template('guest_request.html', printers=printers)
@guest_blueprint.route('/request/<int:request_id>', methods=['GET'])
def guest_request_status(request_id):
"""Status einer Gastanfrage anzeigen."""
with get_cached_session() as db_session:
guest_request = db_session.query(GuestRequest).filter_by(id=request_id).first()
if not guest_request:
abort(404, "Anfrage nicht gefunden")
# Nur wenn Status "approved" ist, OTP generieren und anzeigen
otp_code = None
if guest_request.status == "approved" and not guest_request.otp_code:
otp_code = guest_request.generate_otp()
db_session.commit()
# Zugehörigen Job laden, falls vorhanden
job = None
if guest_request.job_id:
job = db_session.query(Job).filter_by(id=guest_request.job_id).first()
return render_template('guest_status.html',
request=guest_request,
job=job,
otp_code=otp_code)
# API-Endpunkte
@guest_blueprint.route('/api/guest/requests', methods=['POST'])
def api_create_guest_request():
"""Neue Gastanfrage erstellen."""
data = request.get_json()
if not data:
return jsonify({"error": "Keine Daten erhalten"}), 400
# Pflichtfelder prüfen
name = data.get('name')
if not name:
return jsonify({"error": "Name ist erforderlich"}), 400
# Optionale Felder
email = data.get('email')
reason = data.get('reason')
duration_min = data.get('duration_min', 60) # Standard: 1 Stunde
printer_id = data.get('printer_id')
# IP-Adresse erfassen
author_ip = request.remote_addr
try:
with get_cached_session() as db_session:
# Drucker prüfen
if printer_id:
printer = db_session.query(Printer).filter_by(id=printer_id, active=True).first()
if not printer:
return jsonify({"error": "Ungültiger Drucker ausgewählt"}), 400
# Neue Anfrage erstellen
guest_request = GuestRequest(
name=name,
email=email,
reason=reason,
duration_min=duration_min,
printer_id=printer_id,
author_ip=author_ip
)
db_session.add(guest_request)
db_session.commit()
# Benachrichtigung für Genehmiger erstellen
Notification.create_for_approvers(
notification_type="guest_request",
payload={
"request_id": guest_request.id,
"name": guest_request.name,
"created_at": guest_request.created_at.isoformat(),
"status": guest_request.status
}
)
logger.info(f"Neue Gastanfrage erstellt: ID {guest_request.id}, Name: {name}")
return jsonify({
"success": True,
"request_id": guest_request.id,
"status": guest_request.status,
"redirect_url": url_for('guest.guest_request_status', request_id=guest_request.id)
})
except Exception as e:
logger.error(f"Fehler beim Erstellen der Gastanfrage: {str(e)}")
return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500
@guest_blueprint.route('/api/guest/requests/<int:request_id>', methods=['GET'])
def api_get_guest_request(request_id):
"""Status einer Gastanfrage abrufen."""
try:
with get_cached_session() as db_session:
guest_request = db_session.query(GuestRequest).filter_by(id=request_id).first()
if not guest_request:
return jsonify({"error": "Anfrage nicht gefunden"}), 404
# OTP wird nie über die API zurückgegeben
response_data = guest_request.to_dict()
response_data.pop("otp_code", None)
return jsonify(response_data)
except Exception as e:
logger.error(f"Fehler beim Abrufen der Gastanfrage: {str(e)}")
return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500
@guest_blueprint.route('/api/requests/<int:request_id>/approve', methods=['POST'])
@approver_required
def api_approve_request(request_id):
"""Gastanfrage genehmigen."""
try:
with get_cached_session() as db_session:
guest_request = db_session.query(GuestRequest).filter_by(id=request_id).first()
if not guest_request:
return jsonify({"error": "Anfrage nicht gefunden"}), 404
if guest_request.status != "pending":
return jsonify({"error": "Anfrage wurde bereits bearbeitet"}), 400
# Anfrage genehmigen
guest_request.status = "approved"
# OTP generieren
otp_plain = guest_request.generate_otp()
# Zugehörigen Job erstellen
start_time = datetime.now() + timedelta(minutes=5) # Start in 5 Minuten
end_time = start_time + timedelta(minutes=guest_request.duration_min)
# Admin-Benutzer als Eigentümer verwenden
admin_user = db_session.query(User).filter_by(role="admin").first()
job = Job(
name=f"Gastauftrag: {guest_request.name}",
description=guest_request.reason or "Gastauftrag",
user_id=admin_user.id,
printer_id=guest_request.printer_id,
start_at=start_time,
end_at=end_time,
status="scheduled",
duration_minutes=guest_request.duration_min,
owner_id=admin_user.id
)
db_session.add(job)
db_session.flush() # ID generieren
# Job-ID in Gastanfrage speichern
guest_request.job_id = job.id
db_session.commit()
logger.info(f"Gastanfrage {request_id} genehmigt von Benutzer {current_user.id}")
return jsonify({
"success": True,
"status": "approved",
"job_id": job.id,
"otp": otp_plain # Nur in dieser Antwort wird der OTP-Klartext zurückgegeben
})
except Exception as e:
logger.error(f"Fehler beim Genehmigen der Gastanfrage: {str(e)}")
return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500
@guest_blueprint.route('/api/requests/<int:request_id>/deny', methods=['POST'])
@approver_required
def api_deny_request(request_id):
"""Gastanfrage ablehnen."""
try:
data = request.get_json() or {}
reason = data.get('reason', '')
with get_cached_session() as db_session:
guest_request = db_session.query(GuestRequest).filter_by(id=request_id).first()
if not guest_request:
return jsonify({"error": "Anfrage nicht gefunden"}), 404
if guest_request.status != "pending":
return jsonify({"error": "Anfrage wurde bereits bearbeitet"}), 400
# Anfrage ablehnen
guest_request.status = "denied"
db_session.commit()
logger.info(f"Gastanfrage {request_id} abgelehnt von Benutzer {current_user.id}")
return jsonify({
"success": True,
"status": "denied"
})
except Exception as e:
logger.error(f"Fehler beim Ablehnen der Gastanfrage: {str(e)}")
return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500
@guest_blueprint.route('/api/jobs/start/<string:otp>', methods=['POST'])
def api_start_job_with_otp(otp):
"""Job mit OTP-Code starten."""
if not otp:
return jsonify({"error": "Kein OTP-Code angegeben"}), 400
try:
with get_cached_session() as db_session:
# Alle Gastanfragen mit approved-Status durchsuchen
guest_requests = db_session.query(GuestRequest).filter_by(status="approved").all()
valid_request = None
for req in guest_requests:
if req.verify_otp(otp):
valid_request = req
break
if not valid_request:
return jsonify({"error": "Ungültiger OTP-Code"}), 400
# Zugehörigen Job laden
job = db_session.query(Job).filter_by(id=valid_request.job_id).first()
if not job:
return jsonify({"error": "Kein Job für diese Anfrage gefunden"}), 404
# Grace-Period prüfen (5 Minuten nach geplantem Start)
now = datetime.now()
grace_end = job.start_at + timedelta(minutes=5)
if now > job.end_at:
return jsonify({"error": "Der Job ist bereits abgelaufen"}), 400
# Job starten
job.status = "running"
# OTP-Code nach Verwendung löschen
valid_request.otp_code = None
db_session.commit()
logger.info(f"Job {job.id} mit OTP-Code gestartet")
return jsonify({
"success": True,
"job_id": job.id,
"status": "running"
})
except Exception as e:
logger.error(f"Fehler beim Starten des Jobs mit OTP: {str(e)}")
return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500
@guest_blueprint.route('/api/notifications', methods=['GET'])
@login_required
def api_get_notifications():
"""Benachrichtigungen für den aktuellen Benutzer abrufen."""
try:
# Zeitstempel für Filter (nur neue Benachrichtigungen)
since = request.args.get('since')
if since:
try:
since_date = datetime.fromisoformat(since)
except ValueError:
return jsonify({"error": "Ungültiges Datumsformat"}), 400
else:
since_date = None
with get_cached_session() as db_session:
query = db_session.query(Notification).filter_by(
user_id=current_user.id,
read=False
)
if since_date:
query = query.filter(Notification.created_at > since_date)
notifications = query.order_by(desc(Notification.created_at)).all()
return jsonify({
"count": len(notifications),
"notifications": [n.to_dict() for n in notifications]
})
except Exception as e:
logger.error(f"Fehler beim Abrufen der Benachrichtigungen: {str(e)}")
return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500
@guest_blueprint.route('/api/notifications/<int:notification_id>/read', methods=['POST'])
@login_required
def api_mark_notification_read(notification_id):
"""Benachrichtigung als gelesen markieren."""
try:
with get_cached_session() as db_session:
notification = db_session.query(Notification).filter_by(
id=notification_id,
user_id=current_user.id
).first()
if not notification:
return jsonify({"error": "Benachrichtigung nicht gefunden"}), 404
notification.read = True
db_session.commit()
return jsonify({"success": True})
except Exception as e:
logger.error(f"Fehler beim Markieren der Benachrichtigung als gelesen: {str(e)}")
return jsonify({"error": "Fehler beim Verarbeiten der Anfrage"}), 500

View File

@ -0,0 +1 @@

625
backend/app/debug_cli.py Normal file
View File

@ -0,0 +1,625 @@
#!/usr/bin/env python3
"""
MYP Debug CLI
Kommandozeilen-Tool für Diagnose und Debugging der MYP-Anwendung
"""
import os
import sys
import argparse
import time
import json
import importlib
import logging
import sqlite3
from datetime import datetime
import traceback
from pprint import pprint
# Eigene Module importieren
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# Farbige Ausgabe für die Konsole
COLORS = {
'RESET': '\033[0m',
'BOLD': '\033[1m',
'RED': '\033[31m',
'GREEN': '\033[32m',
'YELLOW': '\033[33m',
'BLUE': '\033[34m',
'MAGENTA': '\033[35m',
'CYAN': '\033[36m',
}
# Prüfen, ob das Terminal Farben unterstützt
def supports_color():
"""Prüft, ob das Terminal Farben unterstützt."""
if os.name == 'nt':
try:
import ctypes
kernel32 = ctypes.windll.kernel32
# Aktiviere VT100-Unterstützung unter Windows
kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7)
return True
except:
return False
else:
return sys.stdout.isatty()
USE_COLOR = supports_color()
def colorize(text, color):
"""Färbt den Text ein, wenn Farben unterstützt werden."""
if USE_COLOR and color in COLORS:
return f"{COLORS[color]}{text}{COLORS['RESET']}"
return text
def print_success(message):
print(f"{colorize(message, 'GREEN')}")
def print_error(message):
print(f"{colorize(message, 'RED')}")
def print_warning(message):
print(f"⚠️ {colorize(message, 'YELLOW')}")
def print_info(message):
print(f" {colorize(message, 'BLUE')}")
def print_debug(message):
print(f"🔍 {colorize(message, 'CYAN')}")
def print_header(message):
print(f"\n{colorize('='*80, 'BOLD')}")
print(f"{colorize(message.center(80), 'BOLD')}")
print(f"{colorize('='*80, 'BOLD')}\n")
def print_section(message):
print(f"\n{colorize('-'*40, 'BOLD')}")
print(f"{colorize(message, 'BOLD')}")
print(f"{colorize('-'*40, 'BOLD')}\n")
# Hilfsfunktionen
def get_database_path():
"""Gibt den Pfad zur Datenbank zurück."""
try:
from config.settings import DATABASE_PATH
return DATABASE_PATH
except ImportError:
# Fallback auf Standard-Pfad
base_dir = os.path.dirname(os.path.abspath(__file__))
return os.path.join(base_dir, "database", "myp.db")
def check_database():
"""Prüft den Zustand der Datenbank."""
db_path = get_database_path()
if not os.path.exists(db_path):
print_error(f"Datenbank nicht gefunden: {db_path}")
return False
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Tabellen auflisten
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()
print_info(f"Datenbank gefunden: {db_path}")
print_info(f"Größe: {os.path.getsize(db_path) / (1024*1024):.2f} MB")
print_info(f"Tabellen ({len(tables)}):")
for table in tables:
# Anzahl der Datensätze pro Tabelle
cursor.execute(f"SELECT COUNT(*) FROM {table[0]}")
count = cursor.fetchone()[0]
print(f" - {table[0]}: {count} Einträge")
conn.close()
return True
except sqlite3.Error as e:
print_error(f"Datenbankfehler: {e}")
return False
except Exception as e:
print_error(f"Fehler beim Prüfen der Datenbank: {e}")
return False
def check_log_files():
"""Prüft die Log-Dateien und zeigt die neuesten Einträge an."""
try:
from config.settings import LOG_DIR, LOG_SUBDIRS
if not os.path.exists(LOG_DIR):
print_error(f"Log-Verzeichnis nicht gefunden: {LOG_DIR}")
return False
print_info(f"Log-Verzeichnis: {LOG_DIR}")
for subdir in LOG_SUBDIRS:
log_path = os.path.join(LOG_DIR, subdir, f"{subdir}.log")
if not os.path.exists(log_path):
print_warning(f"Log-Datei nicht gefunden: {log_path}")
continue
size = os.path.getsize(log_path) / 1024 # KB
print_info(f"Log-Datei: {subdir}.log ({size:.1f} KB)")
# Letzte Zeilen anzeigen
try:
with open(log_path, 'r') as f:
lines = f.readlines()
last_lines = lines[-5:] # Letzte 5 Zeilen
print(" Letzte Einträge:")
for line in last_lines:
line = line.strip()
# Farbliche Hervorhebung je nach Log-Level
if "ERROR" in line:
print(f" {colorize(line, 'RED')}")
elif "WARNING" in line:
print(f" {colorize(line, 'YELLOW')}")
elif "INFO" in line:
print(f" {colorize(line, 'GREEN')}")
elif "DEBUG" in line:
print(f" {colorize(line, 'CYAN')}")
else:
print(f" {line}")
except Exception as e:
print_warning(f" Fehler beim Lesen der Log-Datei: {e}")
return True
except ImportError:
print_error("Konfiguration für Logs nicht gefunden")
return False
except Exception as e:
print_error(f"Fehler beim Prüfen der Log-Dateien: {e}")
return False
def check_environment():
"""Prüft die Umgebungsvariablen und System-Einstellungen."""
print_info("Umgebungsinformationen:")
print(f" Python-Version: {sys.version.split()[0]}")
print(f" Betriebssystem: {os.name} - {sys.platform}")
print(f" Arbeitsverzeichnis: {os.getcwd()}")
print_info("Wichtige Umgebungsvariablen:")
env_vars = [
"FLASK_ENV", "FLASK_DEBUG", "MYP_SSL_ENABLED",
"MYP_SSL_HOSTNAME", "PYTHONPATH"
]
for var in env_vars:
value = os.environ.get(var, "nicht gesetzt")
print(f" {var}: {value}")
try:
# Flask-Konfiguration prüfen
print_info("Flask-Konfiguration:")
from config.settings import FLASK_HOST, FLASK_PORT, FLASK_DEBUG, SSL_ENABLED
print(f" Host: {FLASK_HOST}")
print(f" Port: {FLASK_PORT}")
print(f" Debug-Modus: {FLASK_DEBUG}")
print(f" SSL aktiviert: {SSL_ENABLED}")
# Module prüfen
required_modules = [
'flask', 'sqlalchemy', 'flask_login', 'werkzeug'
]
print_info("Benötigte Module:")
for module in required_modules:
try:
mod = importlib.import_module(module)
version = getattr(mod, '__version__', 'unbekannt')
print(f" {module}: {colorize('OK', 'GREEN')} (Version {version})")
except ImportError:
print(f" {module}: {colorize('FEHLT', 'RED')}")
except ImportError:
print_warning("Flask-Konfiguration konnte nicht geladen werden")
except Exception as e:
print_error(f"Fehler beim Prüfen der Umgebung: {e}")
def scan_printer(ip_address, timeout=5):
"""Scannt einen Drucker und zeigt Informationen an."""
import socket
print_info(f"Prüfe Drucker mit IP: {ip_address}")
# Ping testen
import subprocess
try:
if os.name == 'nt': # Windows
cmd = ['ping', '-n', '1', '-w', str(timeout * 1000), ip_address]
else: # Unix/Linux/macOS
cmd = ['ping', '-c', '1', '-W', str(timeout), ip_address]
print(f" Ping-Test: ", end="")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print(colorize("Erreichbar", "GREEN"))
else:
print(colorize("Nicht erreichbar", "RED"))
print(f" Details: {result.stdout}")
return
except Exception as e:
print(colorize(f"Fehler bei Ping-Test: {e}", "RED"))
# Offene Ports prüfen
common_ports = [80, 443, 8080, 8443, 631, 9100, 9101, 9102]
open_ports = []
print(" Port-Scan: ", end="")
for port in common_ports:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((ip_address, port))
if result == 0:
open_ports.append(port)
sock.close()
if open_ports:
print(colorize(f"Offene Ports: {', '.join(map(str, open_ports))}", "GREEN"))
else:
print(colorize("Keine offenen Ports gefunden", "YELLOW"))
# Drucker-Info über Tapo-API testen (wenn vorhanden)
try:
from PyP100 import PyP110
print(" Smart Plug Test: ", end="")
try:
# Standardmäßig Anmeldeinformationen aus der Konfiguration verwenden
from config.settings import TAPO_USERNAME, TAPO_PASSWORD
p110 = PyP110.P110(ip_address, TAPO_USERNAME, TAPO_PASSWORD)
p110.handshake()
p110.login()
device_info = p110.getDeviceInfo()
print(colorize("Verbunden", "GREEN"))
print(f" Gerätename: {device_info.get('nickname', 'Unbekannt')}")
print(f" Status: {'Ein' if device_info.get('device_on', False) else 'Aus'}")
if 'on_time' in device_info:
on_time = device_info['on_time']
print(f" Betriebszeit: {on_time // 60} Minuten, {on_time % 60} Sekunden")
except Exception as e:
print(colorize(f"Fehler: {e}", "RED"))
except ImportError:
print_warning(" PyP100-Modul nicht verfügbar - Smart Plug Test übersprungen")
def check_printers_from_db():
"""Prüft die in der Datenbank gespeicherten Drucker."""
db_path = get_database_path()
if not os.path.exists(db_path):
print_error(f"Datenbank nicht gefunden: {db_path}")
return
try:
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# Drucker-Tabelle prüfen
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='printer';")
if not cursor.fetchone():
print_error("Drucker-Tabelle nicht gefunden")
conn.close()
return
# Drucker auslesen
cursor.execute("SELECT * FROM printer;")
printers = cursor.fetchall()
if not printers:
print_warning("Keine Drucker in der Datenbank gefunden")
conn.close()
return
print_info(f"{len(printers)} Drucker gefunden:")
for printer in printers:
status_color = 'GREEN' if printer['status'] == 'online' else 'RED'
print(f" {printer['name']}: {colorize(printer['status'], status_color)}")
print(f" IP: {printer['ip_address']}")
print(f" Plug IP: {printer['plug_ip'] or 'Nicht konfiguriert'}")
# Detaillierteren Status prüfen
if printer['plug_ip']:
ask = input(f" Möchten Sie den Drucker {printer['name']} scannen? (j/n): ")
if ask.lower() in ('j', 'ja', 'y', 'yes'):
scan_printer(printer['plug_ip'])
conn.close()
except Exception as e:
print_error(f"Fehler beim Prüfen der Drucker: {e}")
traceback.print_exc()
def check_flask_routes():
"""Zeigt alle verfügbaren Flask-Routen an."""
try:
# Versuche, die Flask-App zu importieren
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
try:
from app import app as flask_app
except ImportError:
print_error("Flask-App konnte nicht importiert werden")
return
# Alle Routen auflisten
print_info("Verfügbare Flask-Routen:")
routes = []
for rule in flask_app.url_map.iter_rules():
routes.append({
'endpoint': rule.endpoint,
'methods': ', '.join(sorted(rule.methods - {'HEAD', 'OPTIONS'})),
'path': rule.rule
})
# Nach Pfad sortieren
routes = sorted(routes, key=lambda x: x['path'])
# Routen anzeigen
for route in routes:
method_color = 'GREEN' if 'GET' in route['methods'] else 'BLUE'
print(f" {colorize(route['methods'], method_color)} {route['path']}")
print(f"{route['endpoint']}")
print_info(f"Insgesamt {len(routes)} Routen gefunden")
except Exception as e:
print_error(f"Fehler beim Abrufen der Flask-Routen: {e}")
traceback.print_exc()
def print_system_info():
"""Zeigt detaillierte Systeminformationen an."""
print_header("Systeminformationen")
print_section("Basisinformationen")
import platform
print(f"Python-Version: {platform.python_version()}")
print(f"Betriebssystem: {platform.system()} {platform.release()}")
print(f"Architektur: {platform.machine()}")
print(f"Prozessor: {platform.processor()}")
print_section("Speicher")
try:
import psutil
vm = psutil.virtual_memory()
print(f"Gesamter Speicher: {vm.total / (1024**3):.1f} GB")
print(f"Verfügbarer Speicher: {vm.available / (1024**3):.1f} GB")
print(f"Speicherauslastung: {vm.percent}%")
disk = psutil.disk_usage('/')
print(f"Festplatte gesamt: {disk.total / (1024**3):.1f} GB")
print(f"Festplatte frei: {disk.free / (1024**3):.1f} GB")
print(f"Festplattenauslastung: {disk.percent}%")
except ImportError:
print_warning("psutil-Modul nicht verfügbar - eingeschränkte Informationen")
print_section("Netzwerk")
try:
import socket
hostname = socket.gethostname()
ip_address = socket.gethostbyname(hostname)
print(f"Hostname: {hostname}")
print(f"IP-Adresse: {ip_address}")
# Netzwerkschnittstellen
if 'psutil' in sys.modules:
print("Netzwerkschnittstellen:")
for name, addrs in psutil.net_if_addrs().items():
for addr in addrs:
if addr.family == socket.AF_INET:
print(f" {name}: {addr.address}")
except Exception as e:
print_warning(f"Fehler beim Abrufen der Netzwerkinformationen: {e}")
# Hauptfunktionen für die Befehlszeile
def diagnose():
"""Führt eine umfassende Diagnose durch."""
print_header("MYP Diagnose-Tool")
print_section("Systemprüfung")
check_environment()
print_section("Datenbankprüfung")
check_database()
print_section("Log-Dateien")
check_log_files()
print_success("Diagnose abgeschlossen!")
def scan_printers():
"""Scannt und prüft alle Drucker."""
print_header("Drucker-Scanner")
# Direkter Scan einer IP-Adresse
ip = input("IP-Adresse zum Scannen (leer lassen, um Drucker aus der Datenbank zu prüfen): ")
if ip:
scan_printer(ip)
else:
check_printers_from_db()
def show_routes():
"""Zeigt alle verfügbaren API-Routen an."""
print_header("API-Routen")
check_flask_routes()
def system_info():
"""Zeigt detaillierte Systeminformationen an."""
print_system_info()
def show_logs():
"""Zeigt und analysiert Log-Dateien."""
print_header("Log-Analyse")
try:
from config.settings import LOG_DIR, LOG_SUBDIRS
if not os.path.exists(LOG_DIR):
print_error(f"Log-Verzeichnis nicht gefunden: {LOG_DIR}")
return
print_info(f"Log-Verzeichnis: {LOG_DIR}")
print_info("Verfügbare Logs:")
for i, subdir in enumerate(LOG_SUBDIRS, 1):
log_path = os.path.join(LOG_DIR, subdir, f"{subdir}.log")
size = "Nicht gefunden"
if os.path.exists(log_path):
size = f"{os.path.getsize(log_path) / 1024:.1f} KB"
print(f" {i}. {subdir}.log ({size})")
choice = input("\nWelches Log möchten Sie anzeigen? (Nummer oder Name): ")
# Nummer in Namen umwandeln
try:
choice_num = int(choice) - 1
if 0 <= choice_num < len(LOG_SUBDIRS):
choice = LOG_SUBDIRS[choice_num]
except ValueError:
pass
# Prüfen, ob die Wahl gültig ist
if choice not in LOG_SUBDIRS:
print_error(f"Ungültige Auswahl: {choice}")
return
log_path = os.path.join(LOG_DIR, choice, f"{choice}.log")
if not os.path.exists(log_path):
print_error(f"Log-Datei nicht gefunden: {log_path}")
return
# Anzahl der anzuzeigenden Zeilen
lines_count = input("Anzahl der anzuzeigenden Zeilen (Standard: 20): ")
lines_count = int(lines_count) if lines_count.isdigit() else 20
# Filter für bestimmte Log-Level
level_filter = input("Nach Log-Level filtern (INFO, WARNING, ERROR oder leer für alle): ").upper()
# Log-Datei anzeigen
with open(log_path, 'r') as f:
lines = f.readlines()
# Filtern nach Log-Level
if level_filter:
lines = [line for line in lines if level_filter in line]
# Letzte n Zeilen auswählen
lines = lines[-lines_count:]
print_section(f"Log-Datei: {choice}.log (letzte {len(lines)} Einträge)")
for line in lines:
line = line.strip()
# Farbliche Hervorhebung je nach Log-Level
if "ERROR" in line:
print(colorize(line, 'RED'))
elif "WARNING" in line:
print(colorize(line, 'YELLOW'))
elif "INFO" in line:
print(colorize(line, 'GREEN'))
elif "DEBUG" in line:
print(colorize(line, 'CYAN'))
else:
print(line)
except ImportError:
print_error("Konfiguration für Logs nicht gefunden")
except Exception as e:
print_error(f"Fehler beim Anzeigen der Log-Dateien: {e}")
traceback.print_exc()
def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description="MYP Debug CLI")
subparsers = parser.add_subparsers(dest="command", help="Befehl")
# Diagnose
diag_parser = subparsers.add_parser("diagnose", help="Führt eine umfassende Diagnose durch")
# Drucker scannen
scan_parser = subparsers.add_parser("scan", help="Scannt und prüft alle Drucker")
# Routen anzeigen
routes_parser = subparsers.add_parser("routes", help="Zeigt alle verfügbaren API-Routen an")
# Systeminformationen
sysinfo_parser = subparsers.add_parser("sysinfo", help="Zeigt detaillierte Systeminformationen an")
# Logs anzeigen
logs_parser = subparsers.add_parser("logs", help="Zeigt und analysiert Log-Dateien")
return parser.parse_args()
def main():
"""Hauptfunktion."""
args = parse_args()
if args.command == "diagnose":
diagnose()
elif args.command == "scan":
scan_printers()
elif args.command == "routes":
show_routes()
elif args.command == "sysinfo":
system_info()
elif args.command == "logs":
show_logs()
else:
# Interaktives Menü, wenn kein Befehl angegeben wurde
print_header("MYP Debug CLI")
print("Wählen Sie eine Option:")
print(" 1. Diagnose durchführen")
print(" 2. Drucker scannen")
print(" 3. API-Routen anzeigen")
print(" 4. Systeminformationen anzeigen")
print(" 5. Log-Dateien anzeigen")
print(" 0. Beenden")
choice = input("\nIhre Wahl: ")
if choice == "1":
diagnose()
elif choice == "2":
scan_printers()
elif choice == "3":
show_routes()
elif choice == "4":
system_info()
elif choice == "5":
show_logs()
elif choice == "0":
print("Auf Wiedersehen!")
sys.exit(0)
else:
print_error("Ungültige Auswahl")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print_info("\nProgramm wurde durch Benutzer abgebrochen")
except Exception as e:
print_error(f"Unerwarteter Fehler: {e}")
traceback.print_exc()

View File

@ -626,5 +626,628 @@
});
});
}
// Helper functions
function getPrinterStatusColor(status) {
switch (status) {
case 'available': return 'bg-green-100 text-green-800 dark:bg-green-900/30 dark:text-green-300';
case 'busy': return 'bg-yellow-100 text-yellow-800 dark:bg-yellow-900/30 dark:text-yellow-300';
case 'offline': return 'bg-red-100 text-red-800 dark:bg-red-900/30 dark:text-red-300';
case 'maintenance': return 'bg-slate-200 text-slate-800 dark:bg-slate-700 dark:text-slate-300';
default: return 'bg-slate-200 text-slate-800 dark:bg-slate-700 dark:text-slate-300';
}
}
function getPrinterStatusText(status) {
switch (status) {
case 'available': return 'Steckdose AN';
case 'busy': return 'Beschäftigt';
case 'offline': return 'Steckdose AUS';
case 'maintenance': return 'Wartung';
default: return 'Unbekannt';
}
}
// Format date helper
function formatDate(dateString) {
if (!dateString) return 'Unbekannt';
try {
const date = new Date(dateString);
return date.toLocaleString('de-DE');
} catch (e) {
console.error('Fehler beim Formatieren des Datums:', e);
return 'Ungültiges Datum';
}
}
// Format time helper für Status-Zeitstempel
function formatTime(dateString) {
if (!dateString) return 'Unbekannt';
try {
const date = new Date(dateString);
const now = new Date();
const diffMs = now - date;
const diffSecs = Math.floor(diffMs / 1000);
const diffMins = Math.floor(diffSecs / 60);
if (diffSecs < 60) {
return 'gerade eben';
} else if (diffMins < 60) {
return `vor ${diffMins} Min`;
} else {
return date.toLocaleTimeString('de-DE', { hour: '2-digit', minute: '2-digit' });
}
} catch (e) {
console.error('Fehler beim Formatieren der Zeit:', e);
return 'Ungültige Zeit';
}
}
// Refresh printers mit Status-Check - Make it globally available
function refreshPrinters() {
console.log('refreshPrinters function called');
const grid = document.getElementById('printers-grid');
const refreshBtn = document.querySelector('button[onclick="refreshPrinters()"]');
// Button deaktivieren und Loading-State anzeigen
if (refreshBtn) {
refreshBtn.disabled = true;
refreshBtn.innerHTML = `
<svg class="animate-spin h-4 w-4 sm:h-5 sm:w-5 mr-1 sm:mr-2" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<circle cx="12" cy="12" r="10" stroke="currentColor" stroke-width="4" class="opacity-25"></circle>
<path fill="currentColor" d="M4 12a8 8 0 018-8V0C5.373 0 0 5.373 0 12h4zm2 5.291A7.962 7.962 0 014 12H0c0 3.042 1.135 5.824 3 7.938l3-2.647z" class="opacity-75"></path>
</svg>
Überprüfe Status...
`;
}
// Loading-State im Grid anzeigen
grid.innerHTML = `
<div class="col-span-full text-center py-6 sm:py-12">
<div class="animate-spin rounded-full h-8 w-8 sm:h-12 sm:w-12 border-b-2 border-indigo-600 dark:border-indigo-400 mx-auto"></div>
<p class="mt-3 sm:mt-4 text-sm sm:text-base text-slate-600 dark:text-slate-400">Überprüfe Drucker-Status...</p>
<p class="mt-1 text-xs text-slate-500 dark:text-slate-500">Dies kann bis zu 7 Sekunden dauern</p>
</div>
`;
// Drucker laden mit Live-Status-Check
loadPrintersWithLiveStatus().finally(() => {
// Button wieder aktivieren
if (refreshBtn) {
refreshBtn.disabled = false;
refreshBtn.innerHTML = `
<svg class="h-4 w-4 sm:h-5 sm:w-5 mr-1 sm:mr-2" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M4 4v5h.582m15.356 2A8.001 8.001 0 004.582 9m0 0H9m11 11v-5h-.581m0 0a8.003 8.003 0 01-15.357-2m15.357 2H15" />
</svg>
Jetzt aktualisieren
`;
}
});
}
// Show error message
function showError(message) {
const grid = document.getElementById('printers-grid');
if (!grid) return;
grid.innerHTML = `
<div class="col-span-full text-center py-6 sm:py-12">
<svg class="h-12 w-12 sm:h-16 sm:w-16 text-red-500 dark:text-red-400 mx-auto mb-3 sm:mb-4" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M12 8v4m0 4h.01M21 12a9 9 0 11-18 0 9 9 0 0118 0z" />
</svg>
<h3 class="text-lg font-medium text-slate-900 dark:text-white mb-2">Drucker konnten nicht geladen werden</h3>
<p class="text-slate-700 dark:text-slate-300 text-sm sm:text-base mb-4 max-w-md mx-auto">${message}</p>
<div class="flex flex-col sm:flex-row gap-3 justify-center items-center">
<button onclick="loadPrinters()" class="bg-indigo-600 hover:bg-indigo-700 dark:bg-indigo-600 dark:hover:bg-indigo-500 text-white px-4 sm:px-6 py-2 rounded-lg transition-all duration-200 text-sm sm:text-base flex items-center">
<svg class="h-4 w-4 mr-2" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M4 4v5h.582m15.356 2A8.001 8.001 0 004.582 9m0 0H9m11 11v-5h-.581m0 0a8.003 8.003 0 01-15.357-2m15.357 2H15" />
</svg>
Erneut versuchen
</button>
<button onclick="refreshPrinters()" class="bg-green-600 hover:bg-green-700 dark:bg-green-600 dark:hover:bg-green-500 text-white px-4 sm:px-6 py-2 rounded-lg transition-all duration-200 text-sm sm:text-base flex items-center">
<svg class="h-4 w-4 mr-2" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M9 12l2 2 4-4m6 2a9 9 0 11-18 0 9 9 0 0118 0z" />
</svg>
Mit Status-Check
</button>
</div>
<p class="text-xs text-slate-500 dark:text-slate-400 mt-3">
Tipp: "Mit Status-Check" dauert länger, überprüft aber die Verfügbarkeit aller Drucker
</p>
</div>
`;
}
// Show status message (success, info, warning, error)
function showStatusMessage(message, type = 'info') {
// Vorzeitig beenden, wenn der Ladevorgang noch nicht abgeschlossen ist
if (document.readyState !== 'complete') {
console.log('Status-Nachricht unterdrückt während des Ladens:', message);
return;
}
// Entferne vorherige Status-Nachrichten
const existingMessage = document.getElementById('status-message');
if (existingMessage) {
existingMessage.remove();
}
// Bestimme Farben basierend auf Typ
let bgColor, textColor, iconPath;
switch (type) {
case 'success':
bgColor = 'bg-green-100 dark:bg-green-900/30';
textColor = 'text-green-800 dark:text-green-200';
iconPath = 'M9 12l2 2 4-4m6 2a9 9 0 11-18 0 9 9 0 0118 0z';
break;
case 'warning':
bgColor = 'bg-yellow-100 dark:bg-yellow-900/30';
textColor = 'text-yellow-800 dark:text-yellow-200';
iconPath = 'M12 9v2m0 4h.01m-6.938 4h13.856c1.54 0 2.502-1.667 1.732-2.5L13.732 4c-.77-.833-1.964-.833-2.732 0L3.732 16.5c-.77.833.192 2.5 1.732 2.5z';
break;
case 'error':
bgColor = 'bg-red-100 dark:bg-red-900/30';
textColor = 'text-red-800 dark:text-red-200';
iconPath = 'M12 8v4m0 4h.01M21 12a9 9 0 11-18 0 9 9 0 0118 0z';
break;
default: // info
bgColor = 'bg-blue-100 dark:bg-blue-900/30';
textColor = 'text-blue-800 dark:text-blue-200';
iconPath = 'M13 16h-1v-4h-1m1-4h.01M21 12a9 9 0 11-18 0 9 9 0 0118 0z';
}
// Erstelle Status-Nachricht
const statusMessage = document.createElement('div');
statusMessage.id = 'status-message';
statusMessage.className = `fixed top-4 right-4 z-50 ${bgColor} ${textColor} px-4 py-3 rounded-lg shadow-lg flex items-center space-x-3 max-w-md`;
statusMessage.innerHTML = `
<svg class="h-5 w-5 flex-shrink-0" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="${iconPath}" />
</svg>
<span class="text-sm font-medium">${message}</span>
<button onclick="this.parentElement.remove()" class="ml-2 text-current hover:opacity-75">
<svg class="h-4 w-4" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M6 18L18 6M6 6l12 12"></path>
</svg>
</button>
`;
// Füge zur Seite hinzu
document.body.appendChild(statusMessage);
// Automatisch nach 5 Sekunden entfernen
setTimeout(() => {
if (statusMessage && statusMessage.parentElement) {
statusMessage.remove();
}
}, 5000);
}
// Load printers (schnelles Laden ohne Status-Check)
async function loadPrinters() {
const grid = document.getElementById('printers-grid');
if (!grid) return;
// Loading-State anzeigen
grid.innerHTML = `
<div class="col-span-full text-center py-6 sm:py-12">
<div class="animate-spin rounded-full h-8 w-8 sm:h-12 sm:w-12 border-b-2 border-indigo-600 dark:border-indigo-400 mx-auto"></div>
<p class="mt-3 sm:mt-4 text-sm sm:text-base text-slate-600 dark:text-slate-400">Lade Drucker...</p>
<p class="mt-1 text-xs text-slate-500 dark:text-slate-500">Dies sollte nur wenige Sekunden dauern</p>
</div>
`;
try {
// Erstelle einen AbortController für Timeout
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), 10000); // 10 Sekunden Timeout
const response = await fetch('/api/printers', {
signal: controller.signal,
headers: {
'Accept': 'application/json',
'Content-Type': 'application/json'
}
});
clearTimeout(timeoutId);
if (!response.ok) {
if (response.status === 408) {
throw new Error('Timeout beim Laden der Drucker. Bitte versuchen Sie es erneut.');
}
throw new Error(`Server-Fehler: ${response.status} ${response.statusText}`);
}
const data = await response.json();
// Prüfe auf Server-seitige Fehler
if (data.error) {
throw new Error(data.error);
}
// Verwende die korrekten Daten aus der neuen API-Antwort
printers = data.printers || [];
console.log(`Successfully loaded ${printers.length} printers (ohne Status-Check)`);
renderPrinters();
// Zeige Erfolgsmeldung nur wenn Drucker vorhanden sind
if (printers.length > 0) {
showStatusMessage(`${printers.length} Drucker erfolgreich geladen (ohne aktuellen Status)`, 'success');
}
return printers;
} catch (error) {
console.error('Error loading printers:', error);
// Spezielle Behandlung für verschiedene Fehlertypen
let errorMessage = 'Fehler beim Laden der Drucker';
if (error.name === 'AbortError') {
errorMessage = 'Timeout beim Laden der Drucker. Die Anfrage dauerte zu lange.';
} else if (error.message) {
errorMessage = error.message;
}
showError(errorMessage);
return [];
}
}
// Status-Übersicht aktualisieren
function updateStatusOverview(onlineCount, offlineCount, totalCount) {
const onlineElement = document.getElementById('online-count');
const offlineElement = document.getElementById('offline-count');
const totalElement = document.getElementById('total-count');
if (onlineElement) onlineElement.textContent = onlineCount || 0;
if (offlineElement) offlineElement.textContent = offlineCount || 0;
if (totalElement) totalElement.textContent = totalCount || 0;
// Animiere die Online-Anzeige bei Änderungen
if (onlineElement && onlineElement.dataset.lastValue !== String(onlineCount)) {
onlineElement.classList.add('animate-pulse');
setTimeout(() => onlineElement.classList.remove('animate-pulse'), 1000);
onlineElement.dataset.lastValue = String(onlineCount);
}
}
// Erweiterte Funktion zum Laden der Drucker mit Live-Status
async function loadPrintersWithLiveStatus() {
try {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), 20000); // 20 Sekunden Timeout für Live-Status
const response = await fetch('/api/printers/status/live', {
signal: controller.signal,
headers: {
'Accept': 'application/json',
'Content-Type': 'application/json'
}
});
clearTimeout(timeoutId);
if (!response.ok) {
if (response.status === 408) {
throw new Error('Timeout beim Live-Status-Check der Drucker.');
}
throw new Error(`Fehler beim Laden des Live-Status: ${response.status} ${response.statusText}`);
}
const data = await response.json();
if (data.error) {
throw new Error(data.error);
}
// Drucker-Daten aktualisieren
printers = data.printers || [];
// Status-Übersicht aktualisieren
updateStatusOverview(data.online_count, data.offline_count, data.count);
// Drucker rendern
renderPrinters();
// Auto-Update-Timer aktualisieren
if (data.next_update) {
nextUpdateTime = data.next_update;
updateNextUpdateDisplay();
}
// Erfolgs-Nachricht nur bei manueller Aktualisierung
if (!autoRefreshEnabled) {
showStatusMessage(
`Live-Status aktualisiert: ${data.online_count} von ${data.count} Drucker online`,
data.online_count > 0 ? 'success' : 'warning'
);
}
return true; // Erfolg signalisieren
} catch (error) {
console.error('Error loading live printer status:', error);
if (!autoRefreshEnabled) {
showStatusMessage('Fehler beim Live-Status-Check: ' + error.message, 'error');
}
// Fallback: Lade normale Drucker-Liste ohne Status-Check
try {
await loadPrinters();
return true; // Fallback war erfolgreich
} catch (fallbackError) {
console.error('Fallback error:', fallbackError);
showError('Drucker konnten nicht geladen werden. Bitte versuchen Sie es später erneut.');
return false; // Auch Fallback fehlgeschlagen
}
}
}
// Auto-Refresh-Funktionalität
function toggleAutoRefresh() {
autoRefreshEnabled = !autoRefreshEnabled;
const btn = document.getElementById('auto-refresh-btn');
const icon = document.getElementById('auto-refresh-icon');
if (autoRefreshEnabled) {
if (btn) {
btn.classList.remove('bg-blue-600', 'hover:bg-blue-700');
btn.classList.add('bg-green-600', 'hover:bg-green-700');
btn.innerHTML = `
<svg class="h-4 w-4 sm:h-5 sm:w-5 mr-1 sm:mr-2 animate-spin" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M4 4v5h.582m15.356 2A8.001 8.001 0 004.582 9m0 0H9m11 11v-5h-.581m0 0a8.003 8.003 0 01-15.357-2m15.357 2H15" />
</svg>
Auto-Update AN
`;
}
// Starte Auto-Refresh
startAutoRefresh();
showStatusMessage('Auto-Update aktiviert - Drucker werden alle 30 Sekunden aktualisiert', 'info');
} else {
if (btn) {
btn.classList.remove('bg-green-600', 'hover:bg-green-700');
btn.classList.add('bg-blue-600', 'hover:bg-blue-700');
btn.innerHTML = `
<svg class="h-4 w-4 sm:h-5 sm:w-5 mr-1 sm:mr-2" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M12 8v4l3 3m6-3a9 9 0 11-18 0 9 9 0 0118 0z" />
</svg>
Auto-Update
`;
}
// Stoppe Auto-Refresh
stopAutoRefresh();
showStatusMessage('Auto-Update deaktiviert', 'info');
}
}
function startAutoRefresh() {
stopAutoRefresh(); // Stoppe vorherige Intervalle
nextUpdateTime = 30;
updateNextUpdateDisplay();
// Countdown-Timer
nextUpdateCountdown = setInterval(() => {
nextUpdateTime--;
updateNextUpdateDisplay();
if (nextUpdateTime <= 0) {
loadPrintersWithLiveStatus();
nextUpdateTime = 30;
}
}, 1000);
// Auto-Refresh-Interval
autoRefreshInterval = setInterval(() => {
loadPrintersWithLiveStatus();
}, 30000); // Alle 30 Sekunden
}
function stopAutoRefresh() {
if (autoRefreshInterval) {
clearInterval(autoRefreshInterval);
autoRefreshInterval = null;
}
if (nextUpdateCountdown) {
clearInterval(nextUpdateCountdown);
nextUpdateCountdown = null;
}
const element = document.getElementById('next-update-time');
if (element) element.textContent = '-';
}
function updateNextUpdateDisplay() {
const element = document.getElementById('next-update-time');
if (!element) return;
if (autoRefreshEnabled && nextUpdateTime > 0) {
element.textContent = nextUpdateTime;
element.parentElement.style.opacity = '1';
} else {
element.textContent = '-';
element.parentElement.style.opacity = '0.5';
}
}
// Filter-Funktionalität
function setupFilters() {
const filterButtons = document.querySelectorAll('.filter-btn-new');
filterButtons.forEach(btn => {
btn.addEventListener('click', function() {
// Entferne active-Klasse von allen Buttons
filterButtons.forEach(b => {
b.classList.remove('active');
});
// Füge active-Klasse zum geklickten Button hinzu
this.classList.add('active');
// Setze aktuellen Filter
currentFilter = this.id.replace('filter-', '');
// Rendere Drucker mit Filter
renderPrinters();
});
});
}
// Delete printer
async function deletePrinter(printerId) {
if (!confirm('Sind Sie sicher, dass Sie diesen Drucker löschen möchten?')) {
return;
}
try {
const response = await fetch(`/api/printers/${printerId}`, {
method: 'DELETE',
headers: {
'Content-Type': 'application/json'
}
});
if (!response.ok) {
const errorData = await response.json();
throw new Error(errorData.error || 'Fehler beim Löschen des Druckers');
}
const result = await response.json();
hidePrinterDetailModal();
showStatusMessage(result.message || 'Drucker erfolgreich gelöscht', 'success');
loadPrinters();
} catch (error) {
console.error('Error deleting printer:', error);
showStatusMessage('Fehler beim Löschen des Druckers: ' + error.message, 'error');
}
}
// Add printer
async function handleAddPrinter(event) {
event.preventDefault();
const form = document.getElementById('addPrinterForm');
const submitBtn = form.querySelector('button[type="submit"]');
const formData = new FormData(form);
// Deaktiviere Submit Button während der Verarbeitung
const originalBtnText = submitBtn.innerHTML;
submitBtn.disabled = true;
submitBtn.innerHTML = `
<svg class="animate-spin h-4 w-4 mr-2" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<circle cx="12" cy="12" r="10" stroke="currentColor" stroke-width="4" class="opacity-25"></circle>
<path fill="currentColor" d="M4 12a8 8 0 018-8V0C5.373 0 0 5.373 0 12h4zm2 5.291A7.962 7.962 0 014 12H0c0 3.042 1.135 5.824 3 7.938l3-2.647z" class="opacity-75"></path>
</svg>
Wird hinzugefügt...
`;
// Sammle und validiere Formulardaten
const printerData = {
name: formData.get('name')?.trim(),
model: formData.get('model')?.trim(),
location: formData.get('location')?.trim(),
mac_address: formData.get('mac_address')?.trim().toUpperCase(),
plug_ip: formData.get('plug_ip')?.trim(),
plug_username: formData.get('plug_username')?.trim(),
plug_password: formData.get('plug_password') // Passwort nicht trimmen
};
// Client-seitige Validierung
const errors = [];
if (!printerData.name) errors.push('Name ist erforderlich');
if (!printerData.model) errors.push('Modell ist erforderlich');
if (!printerData.location) errors.push('Standort ist erforderlich');
if (!printerData.mac_address) errors.push('MAC-Adresse ist erforderlich');
if (!printerData.plug_ip) errors.push('Plug IP ist erforderlich');
if (!printerData.plug_username) errors.push('Plug Benutzername ist erforderlich');
if (!printerData.plug_password) errors.push('Plug Passwort ist erforderlich');
// MAC-Adresse Format validieren
if (printerData.mac_address && !/^([0-9A-F]{2}:){5}[0-9A-F]{2}$/.test(printerData.mac_address)) {
// Versuche automatische Formatierung
const cleanMac = printerData.mac_address.replace(/[^0-9A-F]/g, '');
if (cleanMac.length === 12) {
printerData.mac_address = cleanMac.match(/.{2}/g).join(':');
} else {
errors.push('MAC-Adresse muss im Format AA:BB:CC:DD:EE:FF sein');
}
}
// IP-Adresse Format validieren
if (printerData.plug_ip && !/^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$/.test(printerData.plug_ip)) {
errors.push('IP-Adresse muss im Format 192.168.1.100 sein');
}
if (errors.length > 0) {
showStatusMessage(`Bitte korrigieren Sie folgende Fehler:\n${errors.join('\n')}`, 'error');
// Button wieder aktivieren
submitBtn.disabled = false;
submitBtn.innerHTML = originalBtnText;
return;
}
try {
// Erstelle AbortController für Timeout
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), 15000); // 15 Sekunden Timeout
const response = await fetch('/api/printers/add', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'application/json'
},
body: JSON.stringify(printerData),
signal: controller.signal
});
clearTimeout(timeoutId);
const result = await response.json();
if (!response.ok) {
throw new Error(result.error || `Server-Fehler: ${response.status}`);
}
// Erfolg!
hideAddPrinterModal();
showStatusMessage(result.message || 'Drucker erfolgreich hinzugefügt', 'success');
// Drucker-Liste neu laden
await loadPrinters();
console.log('Printer added successfully:', result.printer);
} catch (error) {
console.error('Error adding printer:', error);
let errorMessage = 'Fehler beim Hinzufügen des Druckers';
if (error.name === 'AbortError') {
errorMessage = 'Timeout beim Hinzufügen des Druckers. Bitte versuchen Sie es erneut.';
} else if (error.message) {
errorMessage = error.message;
}
showStatusMessage(errorMessage, 'error');
} finally {
// Button wieder aktivieren
submitBtn.disabled = false;
submitBtn.innerHTML = originalBtnText;
}
}
// Make all functions globally available for onclick handlers
window.showPrinterDetail = showPrinterDetail;
window.hidePrinterDetailModal = hidePrinterDetailModal;
window.deletePrinter = deletePrinter;
window.loadPrinters = loadPrinters;
window.handleAddPrinter = handleAddPrinter;
window.toggleAutoRefresh = toggleAutoRefresh;
window.loadPrintersWithLiveStatus = loadPrintersWithLiveStatus;
window.refreshPrinters = refreshPrinters;
window.hideAddPrinterModal = hideAddPrinterModal;
window.showAddPrinterModal = showAddPrinterModal;
</script>
{% endblock %}

View File

@ -0,0 +1,392 @@
"""
Debug-Utilities für die MYP-Anwendung
Hilft bei der Diagnose und Behebung von Problemen in der Anwendung
"""
import os
import sys
import time
import json
import traceback
import inspect
from datetime import datetime
from functools import wraps
import logging
from typing import Any, Dict, List, Optional, Tuple, Union, Callable
from utils.logging_config import get_logger
# Logger für dieses Modul erstellen
debug_logger = get_logger("app")
# Konstanten für Formatierung
DEBUG_SEPARATOR = "=" * 60
DEBUG_SUBSEPARATOR = "-" * 60
class DebugLevel:
"""Enum für Debug-Level"""
MINIMAL = 0 # Nur kritische Fehler
NORMAL = 1 # Standardfehler und wichtige Informationen
VERBOSE = 2 # Ausführliche Informationen
TRACE = 3 # Vollständige Trace-Informationen
# Aktuelles Debug-Level (kann zur Laufzeit geändert werden)
CURRENT_DEBUG_LEVEL = DebugLevel.NORMAL
def set_debug_level(level: int):
"""Setzt das aktuelle Debug-Level für die Anwendung"""
global CURRENT_DEBUG_LEVEL
CURRENT_DEBUG_LEVEL = level
debug_logger.info(f"🔧 Debug-Level gesetzt auf: {level}")
def debug_print(message: str, level: int = DebugLevel.NORMAL):
"""
Gibt eine Debug-Nachricht aus, wenn das aktuelle Debug-Level mindestens dem angegebenen entspricht.
Args:
message: Die auszugebende Nachricht
level: Das erforderliche Debug-Level
"""
if level <= CURRENT_DEBUG_LEVEL:
# Aktuelle Funktion und Zeilennummer ermitteln
frame = inspect.currentframe().f_back
func_name = frame.f_code.co_name
file_name = os.path.basename(frame.f_code.co_filename)
line_no = frame.f_lineno
# Debug-Ausgabe formatieren
timestamp = datetime.now().strftime('%H:%M:%S.%f')[:-3]
debug_prefix = f"[DEBUG {timestamp} {file_name}:{func_name}:{line_no}]"
# Verschiedene Levels mit unterschiedlichen Emojis markieren
level_emoji = "🐞" if level >= DebugLevel.VERBOSE else "🔍"
# Ausgabe
print(f"{level_emoji} {debug_prefix} {message}")
def debug_dump(obj: Any, name: str = "Object", level: int = DebugLevel.VERBOSE):
"""
Gibt den Inhalt eines Objekts für Debug-Zwecke aus.
Args:
obj: Das zu untersuchende Objekt
name: Name des Objekts für die Ausgabe
level: Das erforderliche Debug-Level
"""
if level > CURRENT_DEBUG_LEVEL:
return
debug_print(f"📦 Debug-Dump von {name}:", level)
try:
# Für dict-ähnliche Objekte
if hasattr(obj, 'items'):
for k, v in obj.items():
debug_print(f" {k}: {v}", level)
# Für list/tuple-ähnliche Objekte
elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes)):
for i, item in enumerate(obj):
debug_print(f" [{i}]: {item}", level)
# Für einfache Objekte
else:
# Versuche als JSON zu formatieren
try:
json_str = json.dumps(obj, indent=2, default=str)
debug_print(f" {json_str}", level)
except:
# Fallback auf einfache String-Darstellung
debug_print(f" {obj}", level)
except Exception as e:
debug_print(f" Fehler beim Dump: {e}", level)
def debug_trace(message: str = "Execution trace"):
"""
Gibt einen vollständigen Stack-Trace für Debug-Zwecke aus.
Args:
message: Begleitende Nachricht für den Trace
"""
if CURRENT_DEBUG_LEVEL < DebugLevel.TRACE:
return
debug_print(f"🔬 TRACE: {message}", DebugLevel.TRACE)
debug_print(DEBUG_SUBSEPARATOR, DebugLevel.TRACE)
# Stack-Trace sammeln
stack = traceback.extract_stack()
# Letzten Frame (diese Funktion) entfernen
stack = stack[:-1]
for frame in stack:
file_name = os.path.basename(frame.filename)
debug_print(f" {file_name}:{frame.lineno} - {frame.name}", DebugLevel.TRACE)
debug_print(DEBUG_SUBSEPARATOR, DebugLevel.TRACE)
def debug_function(func=None, level: int = DebugLevel.NORMAL):
"""
Dekorator, der Eingang und Ausgang einer Funktion sowie die Ausführungszeit loggt.
Args:
func: Die zu dekorierende Funktion
level: Das erforderliche Debug-Level
Returns:
Dekorierte Funktion
"""
def decorator(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
if CURRENT_DEBUG_LEVEL < level:
return fn(*args, **kwargs)
# Funktionsaufruf loggen
arg_str = ", ".join([
*[str(arg) for arg in args],
*[f"{k}={v}" for k, v in kwargs.items()]
])
if len(arg_str) > 100:
arg_str = arg_str[:97] + "..."
debug_print(f"▶️ Starte {fn.__name__}({arg_str})", level)
# Ausführungszeit messen
start_time = time.time()
try:
# Funktion ausführen
result = fn(*args, **kwargs)
# Ausführungszeit und Ergebnis loggen
end_time = time.time()
duration = (end_time - start_time) * 1000
result_str = str(result)
if len(result_str) > 100:
result_str = result_str[:97] + "..."
duration_emoji = "⏱️" if duration < 1000 else ""
debug_print(f"{duration_emoji} {fn.__name__} beendet in {duration:.2f} ms", level)
debug_print(f"📤 Ergebnis: {result_str}", level)
return result
except Exception as e:
# Fehler loggen
end_time = time.time()
duration = (end_time - start_time) * 1000
debug_print(f"{fn.__name__} fehlgeschlagen nach {duration:.2f} ms: {str(e)}", level)
# Stack-Trace nur bei hohem Debug-Level
if CURRENT_DEBUG_LEVEL >= DebugLevel.VERBOSE:
debug_print(f"🔬 Stack-Trace für {fn.__name__}:", DebugLevel.VERBOSE)
traceback_str = traceback.format_exc()
for line in traceback_str.split('\n'):
debug_print(f" {line}", DebugLevel.VERBOSE)
# Exception weiterleiten
raise
return wrapper
if func:
return decorator(func)
return decorator
def debug_timer(name: str = None, level: int = DebugLevel.NORMAL):
"""
Kontext-Manager, der die Ausführungszeit eines Code-Blocks misst.
Args:
name: Name des Code-Blocks für die Ausgabe
level: Das erforderliche Debug-Level
Beispiel:
with debug_timer("Datenbankabfrage"):
result = db.execute_query()
"""
class Timer:
def __init__(self, block_name, debug_level):
self.block_name = block_name
self.debug_level = debug_level
self.start_time = None
def __enter__(self):
if CURRENT_DEBUG_LEVEL >= self.debug_level:
self.start_time = time.time()
block_name = self.block_name or "Code-Block"
debug_print(f"⏱️ Starte Timer für: {block_name}", self.debug_level)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if CURRENT_DEBUG_LEVEL >= self.debug_level and self.start_time:
end_time = time.time()
duration = (end_time - self.start_time) * 1000
block_name = self.block_name or "Code-Block"
if exc_type:
debug_print(f"{block_name} fehlgeschlagen nach {duration:.2f} ms: {exc_val}", self.debug_level)
else:
duration_emoji = "⏱️" if duration < 1000 else ""
debug_print(f"{duration_emoji} {block_name} beendet in {duration:.2f} ms", self.debug_level)
return Timer(name, level)
def debug_exception_handler(logger: Optional[logging.Logger] = None):
"""
Dekorator, der Ausnahmen abfängt und Details loggt.
Args:
logger: Logger-Instanz für die Protokollierung (optional)
Returns:
Dekorierte Funktion
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
# Logger verwenden oder Fallback auf Standardausgabe
log = logger or debug_logger
# Ausnahmedetails loggen
log.error(f"❌ Ausnahme in {func.__name__}: {str(e)}")
# Stack-Trace bei hohem Debug-Level
if CURRENT_DEBUG_LEVEL >= DebugLevel.VERBOSE:
log.error("🔬 Stack-Trace:")
traceback_str = traceback.format_exc()
for line in traceback_str.split('\n'):
if line.strip():
log.error(f" {line}")
# Ausnahme weiterleiten
raise
return wrapper
return decorator
# Konsolen-Befehle für interaktives Debugging
def dump_all_loggers():
"""Gibt Informationen über alle konfigurierten Logger aus."""
import logging
debug_print("📋 Konfigurierte Logger:", DebugLevel.VERBOSE)
for name, logger in logging.Logger.manager.loggerDict.items():
if isinstance(logger, logging.Logger):
level_name = logging.getLevelName(logger.level)
handlers = len(logger.handlers)
debug_print(f" {name}: Level={level_name}, Handlers={handlers}", DebugLevel.VERBOSE)
def dump_environment():
"""Gibt Umgebungsvariablen und Systeminformationen aus."""
debug_print("🌐 Umgebungsinformationen:", DebugLevel.VERBOSE)
debug_print(f" Python: {sys.version}", DebugLevel.VERBOSE)
debug_print(f" Plattform: {sys.platform}", DebugLevel.VERBOSE)
debug_print(f" Arbeitsverzeichnis: {os.getcwd()}", DebugLevel.VERBOSE)
debug_print("🔑 Umgebungsvariablen:", DebugLevel.VERBOSE)
for key, value in sorted(os.environ.items()):
# Passwörter und Secrets ausblenden
if any(secret_key in key.lower() for secret_key in ['key', 'pass', 'secret', 'token', 'pwd']):
value = "********"
debug_print(f" {key}={value}", DebugLevel.VERBOSE)
def memory_usage(obj: Any = None) -> Dict[str, Any]:
"""
Gibt Informationen über den Speicherverbrauch zurück.
Args:
obj: Optional ein Objekt, dessen Größe gemessen werden soll
Returns:
Dict mit Speicherverbrauchsinformationen
"""
import psutil
import sys
process = psutil.Process(os.getpid())
memory_info = process.memory_info()
result = {
"rss": memory_info.rss / (1024 * 1024), # MB
"vms": memory_info.vms / (1024 * 1024), # MB
"percent": process.memory_percent(),
}
if obj is not None:
try:
import sys
result["object_size"] = sys.getsizeof(obj) / 1024 # KB
except:
result["object_size"] = "Nicht messbar"
return result
def log_memory_usage(obj_name: str = "Anwendung", obj: Any = None, logger: Optional[logging.Logger] = None):
"""
Loggt den aktuellen Speicherverbrauch.
Args:
obj_name: Name des Objekts oder der Anwendung
obj: Optional ein Objekt, dessen Größe gemessen werden soll
logger: Logger-Instanz für die Protokollierung (optional)
"""
log = logger or debug_logger
memory = memory_usage(obj)
log.info(f"📊 Speicherverbrauch von {obj_name}:")
log.info(f" RSS: {memory['rss']:.2f} MB")
log.info(f" VMS: {memory['vms']:.2f} MB")
log.info(f" Prozent: {memory['percent']:.2f}%")
if 'object_size' in memory:
if isinstance(memory['object_size'], (int, float)):
log.info(f" Objektgröße: {memory['object_size']:.2f} KB")
else:
log.info(f" Objektgröße: {memory['object_size']}")
def profile_function(func):
"""
Dekorator, der eine Funktion profiliert und Statistiken ausgibt.
Args:
func: Die zu profilierende Funktion
Returns:
Dekorierte Funktion
"""
@wraps(func)
def wrapper(*args, **kwargs):
try:
import cProfile
import pstats
import io
# Profiler erstellen und Funktion ausführen
profiler = cProfile.Profile()
profiler.enable()
result = func(*args, **kwargs)
profiler.disable()
# Statistiken sammeln
s = io.StringIO()
ps = pstats.Stats(profiler, stream=s).sort_stats('cumulative')
ps.print_stats(20) # Top 20 Zeilen
# Statistiken ausgeben
debug_print(f"📊 Profiling-Ergebnis für {func.__name__}:", DebugLevel.VERBOSE)
for line in s.getvalue().split('\n'):
if line.strip():
debug_print(f" {line}", DebugLevel.VERBOSE)
return result
except ImportError:
debug_print(f"⚠️ cProfile nicht verfügbar, Funktion wird ohne Profiling ausgeführt", DebugLevel.NORMAL)
return func(*args, **kwargs)
return wrapper