🎉 Refactor backend files & add documentation 📚, remove legacy installer scripts. #123

This commit is contained in:
2025-06-01 00:36:48 +02:00
parent 9e1719df4d
commit 7f38f8a7e5
10 changed files with 2604 additions and 172 deletions

View File

@@ -1595,7 +1595,44 @@ def admin():
flash("Nur Administratoren haben Zugriff auf diesen Bereich.", "error")
return redirect(url_for("index"))
return render_template("admin.html")
# Daten für das Template sammeln (gleiche Logik wie admin-dashboard)
db_session = get_db_session()
try:
# Statistiken sammeln
stats = {
'total_users': db_session.query(User).count(),
'total_printers': db_session.query(Printer).count(),
'online_printers': db_session.query(Printer).filter(Printer.status == 'online').count(),
'active_jobs': db_session.query(Job).filter(Job.status.in_(['running', 'queued'])).count(),
'queued_jobs': db_session.query(Job).filter(Job.status == 'queued').count(),
'success_rate': 85 # Placeholder - könnte aus echten Daten berechnet werden
}
# Tab-Parameter
active_tab = request.args.get('tab', 'users')
# Benutzer laden (für users tab)
users = []
if active_tab == 'users':
users = db_session.query(User).all()
# Drucker laden (für printers tab)
printers = []
if active_tab == 'printers':
printers = db_session.query(Printer).all()
db_session.close()
return render_template("admin.html",
stats=stats,
active_tab=active_tab,
users=users,
printers=printers)
except Exception as e:
app_logger.error(f"Fehler beim Laden der Admin-Daten: {str(e)}")
db_session.close()
flash("Fehler beim Laden des Admin-Bereichs.", "error")
return redirect(url_for("index"))
@app.route("/socket-test")
@login_required
@@ -1643,7 +1680,68 @@ def admin_page():
"""Erweiterte Admin-Dashboard-Seite mit Live-Funktionen"""
if not current_user.is_admin:
return redirect(url_for("index"))
return render_template("admin_dashboard.html", title="Admin Dashboard")
# Daten für das Template sammeln
db_session = get_db_session()
try:
# Statistiken sammeln
stats = {
'total_users': db_session.query(User).count(),
'total_printers': db_session.query(Printer).count(),
'online_printers': db_session.query(Printer).filter(Printer.status == 'online').count(),
'active_jobs': db_session.query(Job).filter(Job.status.in_(['running', 'queued'])).count(),
'queued_jobs': db_session.query(Job).filter(Job.status == 'queued').count(),
'success_rate': 85 # Placeholder - könnte aus echten Daten berechnet werden
}
# Tab-Parameter
active_tab = request.args.get('tab', 'users')
# Benutzer laden (für users tab)
users = []
if active_tab == 'users':
users = db_session.query(User).all()
# Drucker laden (für printers tab)
printers = []
if active_tab == 'printers':
printers = db_session.query(Printer).all()
db_session.close()
return render_template("admin.html",
title="Admin Dashboard",
stats=stats,
active_tab=active_tab,
users=users,
printers=printers)
except Exception as e:
app_logger.error(f"Fehler beim Laden der Admin-Dashboard-Daten: {str(e)}")
db_session.close()
flash("Fehler beim Laden des Admin-Dashboards.", "error")
return redirect(url_for("index"))
# ===== RECHTLICHE SEITEN =====
@app.route("/privacy")
def privacy():
"""Datenschutzerklärung-Seite"""
return render_template("privacy.html", title="Datenschutzerklärung")
@app.route("/terms")
def terms():
"""Nutzungsbedingungen-Seite"""
return render_template("terms.html", title="Nutzungsbedingungen")
@app.route("/imprint")
def imprint():
"""Impressum-Seite"""
return render_template("imprint.html", title="Impressum")
@app.route("/legal")
def legal():
"""Rechtliche Hinweise-Übersichtsseite"""
return render_template("legal.html", title="Rechtliche Hinweise")
# ===== NEUE SYSTEM UI-ROUTEN =====
@@ -5060,190 +5158,282 @@ def setup_database_with_migrations():
app_logger.error(f"❌ Fehler bei Datenbank-Setup: {str(e)}")
raise e
# ===== PRIVACY UND TERMS ROUTEN =====
# ===== LOG-MANAGEMENT API =====
@app.route("/privacy")
def privacy_policy():
"""Datenschutzerklärung anzeigen"""
try:
return render_template("privacy_policy.html", title="Datenschutzerklärung")
except Exception as e:
app_logger.error(f"Fehler beim Laden der Datenschutzerklärung: {str(e)}")
flash("Fehler beim Laden der Datenschutzerklärung", "error")
return redirect(url_for("index"))
@app.route("/terms")
def terms_of_service():
"""Nutzungsbedingungen anzeigen"""
try:
return render_template("terms_of_service.html", title="Nutzungsbedingungen")
except Exception as e:
app_logger.error(f"Fehler beim Laden der Nutzungsbedingungen: {str(e)}")
flash("Fehler beim Laden der Nutzungsbedingungen", "error")
return redirect(url_for("index"))
@app.route("/legal")
def legal_notice():
"""Impressum anzeigen"""
try:
return render_template("legal_notice.html", title="Impressum")
except Exception as e:
app_logger.error(f"Fehler beim Laden des Impressums: {str(e)}")
flash("Fehler beim Laden des Impressums", "error")
return redirect(url_for("index"))
@app.route("/api/privacy/accept", methods=["POST"])
@app.route("/api/logs", methods=['GET'])
@login_required
def accept_privacy_policy():
"""API-Endpunkt für Akzeptierung der Datenschutzerklärung"""
db_session = get_db_session()
@admin_required
def api_logs():
"""
API-Endpunkt für Log-Daten-Abruf
Query Parameter:
level: Log-Level Filter (DEBUG, INFO, WARNING, ERROR, CRITICAL)
limit: Anzahl der Einträge (Standard: 100, Max: 1000)
offset: Offset für Paginierung (Standard: 0)
search: Suchbegriff für Log-Nachrichten
start_date: Start-Datum (ISO-Format)
end_date: End-Datum (ISO-Format)
"""
try:
data = request.get_json() or {}
version = data.get("version", "1.0")
# Parameter aus Query-String extrahieren
level = request.args.get('level', '').upper()
limit = min(int(request.args.get('limit', 100)), 1000)
offset = int(request.args.get('offset', 0))
search = request.args.get('search', '').strip()
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
# Benutzer aus der Datenbank laden
user = db_session.query(User).filter(User.id == int(current_user.id)).first()
# Log-Dateien aus dem logs-Verzeichnis lesen
import os
import glob
from datetime import datetime, timedelta
if not user:
return jsonify({"error": "Benutzer nicht gefunden"}), 404
logs_dir = os.path.join(os.path.dirname(__file__), 'logs')
log_entries = []
# Privacy-Akzeptierung in Benutzer-Einstellungen speichern
if hasattr(user, 'settings'):
import json
settings = json.loads(user.settings) if user.settings else {}
else:
settings = session.get('user_settings', {})
if os.path.exists(logs_dir):
# Alle .log Dateien finden
log_files = glob.glob(os.path.join(logs_dir, '*.log'))
log_files.sort(key=os.path.getmtime, reverse=True) # Neueste zuerst
# Datum-Filter vorbereiten
start_dt = None
end_dt = None
if start_date:
try:
start_dt = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
except:
pass
if end_date:
try:
end_dt = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
except:
pass
# Log-Dateien durchgehen (maximal die letzten 5 Dateien)
for log_file in log_files[:5]:
try:
with open(log_file, 'r', encoding='utf-8') as f:
lines = f.readlines()
# Zeilen rückwärts durchgehen (neueste zuerst)
for line in reversed(lines):
line = line.strip()
if not line:
continue
# Log-Zeile parsen
try:
# Format: 2025-06-01 00:34:08 - logger_name - [LEVEL] MESSAGE
parts = line.split(' - ', 3)
if len(parts) >= 4:
timestamp_str = parts[0]
logger_name = parts[1]
level_part = parts[2]
message = parts[3]
# Level extrahieren
if level_part.startswith('[') and ']' in level_part:
log_level = level_part.split(']')[0][1:]
else:
log_level = 'INFO'
# Timestamp parsen
try:
log_timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S')
except:
continue
# Filter anwenden
if level and log_level != level:
continue
if start_dt and log_timestamp < start_dt:
continue
if end_dt and log_timestamp > end_dt:
continue
if search and search.lower() not in message.lower():
continue
log_entries.append({
'timestamp': log_timestamp.isoformat(),
'level': log_level,
'logger': logger_name,
'message': message,
'file': os.path.basename(log_file)
})
except Exception as parse_error:
# Fehlerhafte Zeile überspringen
continue
except Exception as file_error:
app_logger.error(f"Fehler beim Lesen der Log-Datei {log_file}: {str(file_error)}")
continue
# Privacy-Akzeptierung hinzufügen
if 'privacy_acceptance' not in settings:
settings['privacy_acceptance'] = {}
# Sortieren nach Timestamp (neueste zuerst)
log_entries.sort(key=lambda x: x['timestamp'], reverse=True)
settings['privacy_acceptance'] = {
'accepted': True,
'version': version,
'timestamp': datetime.now().isoformat(),
'ip_address': request.remote_addr
}
# Einstellungen speichern
if hasattr(user, 'settings'):
user.settings = json.dumps(settings)
user.updated_at = datetime.now()
db_session.commit()
else:
session['user_settings'] = settings
user_logger.info(f"Benutzer {current_user.username} hat Datenschutzerklärung v{version} akzeptiert")
# Paginierung anwenden
total_count = len(log_entries)
paginated_entries = log_entries[offset:offset + limit]
return jsonify({
"success": True,
"message": "Datenschutzerklärung erfolgreich akzeptiert",
"version": version,
"timestamp": datetime.now().isoformat()
})
except Exception as e:
db_session.rollback()
app_logger.error(f"Fehler bei Privacy-Akzeptierung: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
finally:
db_session.close()
@app.route("/api/terms/accept", methods=["POST"])
@login_required
def accept_terms_of_service():
"""API-Endpunkt für Akzeptierung der Nutzungsbedingungen"""
db_session = get_db_session()
try:
data = request.get_json() or {}
version = data.get("version", "1.0")
# Benutzer aus der Datenbank laden
user = db_session.query(User).filter(User.id == int(current_user.id)).first()
if not user:
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Terms-Akzeptierung in Benutzer-Einstellungen speichern
if hasattr(user, 'settings'):
import json
settings = json.loads(user.settings) if user.settings else {}
else:
settings = session.get('user_settings', {})
# Terms-Akzeptierung hinzufügen
if 'terms_acceptance' not in settings:
settings['terms_acceptance'] = {}
settings['terms_acceptance'] = {
'accepted': True,
'version': version,
'timestamp': datetime.now().isoformat(),
'ip_address': request.remote_addr
}
# Einstellungen speichern
if hasattr(user, 'settings'):
user.settings = json.dumps(settings)
user.updated_at = datetime.now()
db_session.commit()
else:
session['user_settings'] = settings
user_logger.info(f"Benutzer {current_user.username} hat Nutzungsbedingungen v{version} akzeptiert")
return jsonify({
"success": True,
"message": "Nutzungsbedingungen erfolgreich akzeptiert",
"version": version,
"timestamp": datetime.now().isoformat()
})
except Exception as e:
db_session.rollback()
app_logger.error(f"Fehler bei Terms-Akzeptierung: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
finally:
db_session.close()
@app.route("/api/legal/status", methods=["GET"])
@login_required
def get_legal_status():
"""API-Endpunkt für Abfrage des rechtlichen Status (Privacy/Terms Akzeptierung)"""
try:
# Benutzer-Einstellungen laden
if hasattr(current_user, 'settings') and current_user.settings:
import json
settings = json.loads(current_user.settings)
else:
settings = session.get('user_settings', {})
privacy_acceptance = settings.get('privacy_acceptance', {})
terms_acceptance = settings.get('terms_acceptance', {})
return jsonify({
"success": True,
"legal_status": {
"privacy_policy": {
"accepted": privacy_acceptance.get('accepted', False),
"version": privacy_acceptance.get('version'),
"timestamp": privacy_acceptance.get('timestamp')
},
"terms_of_service": {
"accepted": terms_acceptance.get('accepted', False),
"version": terms_acceptance.get('version'),
"timestamp": terms_acceptance.get('timestamp')
},
"compliance_required": not (
privacy_acceptance.get('accepted', False) and
terms_acceptance.get('accepted', False)
)
'success': True,
'logs': paginated_entries,
'pagination': {
'total': total_count,
'limit': limit,
'offset': offset,
'has_more': offset + limit < total_count
},
'filters': {
'level': level or None,
'search': search or None,
'start_date': start_date,
'end_date': end_date
}
})
except Exception as e:
app_logger.error(f"Fehler bei Legal-Status-Abfrage: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
app_logger.error(f"Fehler beim Abrufen der Log-Daten: {str(e)}")
return jsonify({
'error': f'Fehler beim Abrufen der Log-Daten: {str(e)}'
}), 500
# ===== LIVE ADMIN STATISTIKEN API =====
@app.route("/api/admin/stats/live", methods=['GET'])
@login_required
@admin_required
def api_admin_stats_live():
"""
API-Endpunkt für Live-Statistiken im Admin-Dashboard
Liefert aktuelle System-Statistiken für Echtzeit-Updates
"""
try:
db_session = get_db_session()
# Basis-Statistiken sammeln
stats = {
'timestamp': datetime.now().isoformat(),
'users': {
'total': db_session.query(User).count(),
'active_today': db_session.query(User).filter(
User.last_login >= datetime.now() - timedelta(days=1)
).count() if hasattr(User, 'last_login') else 0,
'new_this_week': db_session.query(User).filter(
User.created_at >= datetime.now() - timedelta(days=7)
).count() if hasattr(User, 'created_at') else 0
},
'printers': {
'total': db_session.query(Printer).count(),
'online': db_session.query(Printer).filter(Printer.status == 'online').count(),
'offline': db_session.query(Printer).filter(Printer.status == 'offline').count(),
'maintenance': db_session.query(Printer).filter(Printer.status == 'maintenance').count()
},
'jobs': {
'total': db_session.query(Job).count(),
'running': db_session.query(Job).filter(Job.status == 'running').count(),
'queued': db_session.query(Job).filter(Job.status == 'queued').count(),
'completed_today': db_session.query(Job).filter(
Job.status == 'completed',
Job.updated_at >= datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
).count() if hasattr(Job, 'updated_at') else 0,
'failed_today': db_session.query(Job).filter(
Job.status == 'failed',
Job.updated_at >= datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
).count() if hasattr(Job, 'updated_at') else 0
}
}
# System-Performance-Metriken
import psutil
import os
# CPU und Memory
stats['system'] = {
'cpu_percent': psutil.cpu_percent(interval=1),
'memory_percent': psutil.virtual_memory().percent,
'disk_percent': psutil.disk_usage('/').percent if os.name != 'nt' else psutil.disk_usage('C:\\').percent,
'uptime_seconds': int((datetime.now() - datetime.fromtimestamp(psutil.boot_time())).total_seconds())
}
# Erfolgsrate berechnen (letzte 24 Stunden)
try:
completed_jobs = db_session.query(Job).filter(
Job.status == 'completed',
Job.updated_at >= datetime.now() - timedelta(days=1)
).count() if hasattr(Job, 'updated_at') else 0
failed_jobs = db_session.query(Job).filter(
Job.status == 'failed',
Job.updated_at >= datetime.now() - timedelta(days=1)
).count() if hasattr(Job, 'updated_at') else 0
total_finished = completed_jobs + failed_jobs
success_rate = (completed_jobs / total_finished * 100) if total_finished > 0 else 100
stats['performance'] = {
'success_rate': round(success_rate, 1),
'completed_24h': completed_jobs,
'failed_24h': failed_jobs,
'total_finished_24h': total_finished
}
except Exception as perf_error:
app_logger.warning(f"Fehler bei Performance-Berechnung: {str(perf_error)}")
stats['performance'] = {
'success_rate': 0,
'completed_24h': 0,
'failed_24h': 0,
'total_finished_24h': 0
}
# Queue-Status (falls Queue Manager läuft)
try:
from queue_manager import get_queue_status
queue_status = get_queue_status()
stats['queue'] = queue_status
except Exception as queue_error:
stats['queue'] = {
'status': 'unknown',
'pending_jobs': 0,
'active_workers': 0
}
# Letzte Aktivitäten (Top 5)
try:
recent_jobs = db_session.query(Job).order_by(Job.id.desc()).limit(5).all()
stats['recent_activity'] = [
{
'id': job.id,
'filename': job.filename,
'status': job.status,
'user': job.user.username if job.user else 'Unbekannt',
'created_at': job.created_at.isoformat() if hasattr(job, 'created_at') and job.created_at else None
}
for job in recent_jobs
]
except Exception as activity_error:
app_logger.warning(f"Fehler bei Recent Activity: {str(activity_error)}")
stats['recent_activity'] = []
db_session.close()
return jsonify({
'success': True,
'stats': stats
})
except Exception as e:
app_logger.error(f"Fehler beim Abrufen der Live-Statistiken: {str(e)}")
return jsonify({
'error': f'Fehler beim Abrufen der Live-Statistiken: {str(e)}'
}), 500
# ===== STARTUP UND MAIN =====