📝 "Refactor app logic using Conventional Commits format (feat)"

This commit is contained in:
Till Tomczak 2025-05-29 17:48:49 +02:00
parent 3218328dd7
commit e338c5835e

View File

@ -4659,35 +4659,33 @@ def get_dashboard_printers():
@app.route('/api/dashboard/activities', methods=['GET'])
@login_required
def get_dashboard_activities():
"""Liefert letzte Aktivitäten für Dashboard-Updates"""
"""Liefert die neuesten Aktivitäten für das Dashboard"""
try:
# Hier würden normalerweise echte Aktivitäten aus der Datenbank geladen
# Für Demo-Zwecke geben wir Beispieldaten zurück
activities_data = [
{
"description": "Druckauftrag #123 erfolgreich abgeschlossen",
"time": "vor 5 Minuten"
},
{
"description": "Drucker 'Prusa i3 MK3S' ist jetzt online",
"time": "vor 12 Minuten"
},
{
"description": "Neuer Benutzer registriert: max.mustermann",
"time": "vor 1 Stunde"
}
]
db_session = get_db_session()
# Neueste Jobs abrufen
activities = []
recent_jobs = db_session.query(Job).order_by(Job.created_at.desc()).limit(10).all()
for job in recent_jobs:
activities.append({
'description': f"Job '{job.name}' wurde {job.status}",
'time': job.created_at.strftime('%H:%M'),
'type': 'job',
'status': job.status
})
db_session.close()
return jsonify({
"success": True,
"activities": activities_data
'success': True,
'activities': activities
})
except Exception as e:
app_logger.error(f"Fehler beim Laden der Aktivitäten: {str(e)}")
app_logger.error(f"Fehler beim Abrufen der Dashboard-Aktivitäten: {str(e)}")
return jsonify({
"success": False,
"error": "Fehler beim Laden der Aktivitäten"
'success': False,
'error': 'Fehler beim Laden der Aktivitäten'
}), 500
@app.route('/admin/settings', methods=['GET'])
@ -4713,23 +4711,395 @@ def analytics_page():
flash("Fehler beim Laden der Analytics", "error")
return redirect(url_for('dashboard'))
"success": True,
"status": {
"cpu_usage": 15.2,
"memory_usage": 42.8,
"disk_usage": 67.3,
@admin_required
def get_admin_system_status():
"""Liefert detaillierte System-Status-Informationen"""
@app.route('/api/optimization/auto-optimize', methods=['POST'])
@login_required
def auto_optimize_jobs():
"""Automatische Optimierung der Druckaufträge durchführen"""
try:
import psutil
import os
from datetime import datetime, timedelta
"active_jobs": active_jobs,
"queued_jobs": queued_jobs,
"success_rate": success_rate
}
data = request.get_json()
settings = data.get('settings', {})
enabled = data.get('enabled', False)
db_session = get_db_session()
# Aktuelle Jobs in der Warteschlange abrufen
pending_jobs = db_session.query(Job).filter(
Job.status.in_(['queued', 'pending'])
).all()
if not pending_jobs:
db_session.close()
return jsonify({
'success': True,
'message': 'Keine Jobs zur Optimierung verfügbar',
'optimized_jobs': 0
})
# Verfügbare Drucker abrufen
available_printers = db_session.query(Printer).filter(Printer.active == True).all()
if not available_printers:
db_session.close()
return jsonify({
'success': False,
'error': 'Keine verfügbaren Drucker für Optimierung'
})
# Optimierungs-Algorithmus anwenden
algorithm = settings.get('algorithm', 'round_robin')
optimized_count = 0
if algorithm == 'round_robin':
optimized_count = apply_round_robin_optimization(pending_jobs, available_printers, db_session)
elif algorithm == 'load_balance':
optimized_count = apply_load_balance_optimization(pending_jobs, available_printers, db_session)
elif algorithm == 'priority_based':
optimized_count = apply_priority_optimization(pending_jobs, available_printers, db_session)
db_session.commit()
jobs_logger.info(f"Auto-Optimierung durchgeführt: {optimized_count} Jobs optimiert mit Algorithmus {algorithm}")
# System-Log erstellen
log_entry = SystemLog(
level='INFO',
component='optimization',
message=f'Auto-Optimierung durchgeführt: {optimized_count} Jobs optimiert',
user_id=current_user.id if current_user.is_authenticated else None,
details=json.dumps({
'algorithm': algorithm,
'optimized_jobs': optimized_count,
'settings': settings
})
)
db_session.add(log_entry)
db_session.commit()
db_session.close()
return jsonify({
'success': True,
'optimized_jobs': optimized_count,
'algorithm': algorithm,
'message': f'Optimierung erfolgreich: {optimized_count} Jobs wurden optimiert'
})
except Exception as e:
app_logger.error(f"Fehler beim Laden der Live-Admin-Statistiken: {str(e)}")
app_logger.error(f"Fehler bei der Auto-Optimierung: {str(e)}")
return jsonify({
'success': False,
'error': f'Optimierung fehlgeschlagen: {str(e)}'
}), 500
@app.route('/api/jobs/batch-operation', methods=['POST'])
@login_required
def perform_batch_operation():
"""Batch-Operationen auf mehrere Jobs anwenden"""
try:
data = request.get_json()
job_ids = data.get('job_ids', [])
operation = data.get('operation', '')
if not job_ids:
return jsonify({
'success': False,
'error': 'Keine Job-IDs für Batch-Operation angegeben'
}), 400
if not operation:
return jsonify({
'success': False,
'error': 'Keine Operation für Batch-Verarbeitung angegeben'
}), 400
db_session = get_db_session()
# Jobs abrufen (nur eigene oder Admin-Rechte prüfen)
if current_user.is_admin:
jobs = db_session.query(Job).filter(Job.id.in_(job_ids)).all()
else:
jobs = db_session.query(Job).filter(
Job.id.in_(job_ids),
Job.user_id == current_user.id
).all()
if not jobs:
db_session.close()
return jsonify({
'success': False,
'error': 'Keine berechtigten Jobs für Batch-Operation gefunden'
}), 403
processed_count = 0
error_count = 0
# Batch-Operation durchführen
for job in jobs:
try:
if operation == 'start':
if job.status in ['queued', 'pending']:
job.status = 'running'
job.start_time = datetime.now()
processed_count += 1
elif operation == 'pause':
if job.status == 'running':
job.status = 'paused'
processed_count += 1
elif operation == 'cancel':
if job.status in ['queued', 'pending', 'running', 'paused']:
job.status = 'cancelled'
job.end_time = datetime.now()
processed_count += 1
elif operation == 'delete':
if job.status in ['completed', 'cancelled', 'failed']:
db_session.delete(job)
processed_count += 1
elif operation == 'restart':
if job.status in ['failed', 'cancelled']:
job.status = 'queued'
job.start_time = None
job.end_time = None
processed_count += 1
elif operation == 'priority_high':
job.priority = 'high'
processed_count += 1
elif operation == 'priority_normal':
job.priority = 'normal'
processed_count += 1
else:
jobs_logger.warning(f"Unbekannte Batch-Operation: {operation}")
error_count += 1
except Exception as job_error:
jobs_logger.error(f"Fehler bei Job {job.id} in Batch-Operation {operation}: {str(job_error)}")
error_count += 1
db_session.commit()
# System-Log erstellen
log_entry = SystemLog(
level='INFO',
component='batch_operations',
message=f'Batch-Operation "{operation}" durchgeführt: {processed_count} Jobs verarbeitet',
user_id=current_user.id,
details=json.dumps({
'operation': operation,
'processed_jobs': processed_count,
'error_count': error_count,
'job_ids': job_ids
})
)
db_session.add(log_entry)
db_session.commit()
db_session.close()
jobs_logger.info(f"Batch-Operation {operation} durchgeführt: {processed_count} Jobs verarbeitet, {error_count} Fehler")
return jsonify({
'success': True,
'processed_jobs': processed_count,
'error_count': error_count,
'operation': operation,
'message': f'Batch-Operation erfolgreich: {processed_count} Jobs verarbeitet'
})
except Exception as e:
app_logger.error(f"Fehler bei Batch-Operation: {str(e)}")
return jsonify({
'success': False,
'error': f'Batch-Operation fehlgeschlagen: {str(e)}'
}), 500
@app.route('/api/optimization/settings', methods=['GET', 'POST'])
@login_required
def optimization_settings():
"""Optimierungs-Einstellungen abrufen und speichern"""
if request.method == 'GET':
try:
# Standard-Einstellungen oder benutzerdefinierte laden
default_settings = {
'algorithm': 'round_robin',
'consider_distance': True,
'minimize_changeover': True,
'max_batch_size': 10,
'time_window': 24,
'auto_optimization_enabled': False
}
# Benutzerspezifische Einstellungen aus der Datenbank laden (falls implementiert)
# Hier könnten zukünftig individuelle Benutzereinstellungen gespeichert werden
return jsonify({
'success': True,
'settings': default_settings
})
except Exception as e:
app_logger.error(f"Fehler beim Abrufen der Optimierungs-Einstellungen: {str(e)}")
return jsonify({
'success': False,
'error': 'Fehler beim Laden der Einstellungen'
}), 500
elif request.method == 'POST':
try:
settings = request.get_json()
# Validierung der Einstellungen
if not validate_optimization_settings(settings):
return jsonify({
'success': False,
'error': 'Ungültige Optimierungs-Einstellungen'
}), 400
# Hier könnten die Einstellungen in der Datenbank gespeichert werden
# Momentan werden sie nur im Browser-localStorage gespeichert
app_logger.info(f"Optimierungs-Einstellungen für Benutzer {current_user.id} aktualisiert")
return jsonify({
'success': True,
'message': 'Optimierungs-Einstellungen erfolgreich gespeichert'
})
except Exception as e:
app_logger.error(f"Fehler beim Speichern der Optimierungs-Einstellungen: {str(e)}")
return jsonify({
'success': False,
'error': 'Fehler beim Speichern der Einstellungen'
}), 500
# ===== OPTIMIERUNGS-ALGORITHMUS-FUNKTIONEN =====
def apply_round_robin_optimization(jobs, printers, db_session):
"""Round-Robin-Optimierung: Gleichmäßige Verteilung der Jobs auf Drucker"""
optimized_count = 0
printer_index = 0
for job in jobs:
if printer_index >= len(printers):
printer_index = 0
# Job dem nächsten Drucker zuweisen
job.printer_id = printers[printer_index].id
job.assigned_at = datetime.now()
optimized_count += 1
printer_index += 1
return optimized_count
def apply_load_balance_optimization(jobs, printers, db_session):
"""Load-Balancing-Optimierung: Jobs basierend auf aktueller Auslastung verteilen"""
optimized_count = 0
# Aktuelle Drucker-Auslastung berechnen
printer_loads = {}
for printer in printers:
current_jobs = db_session.query(Job).filter(
Job.printer_id == printer.id,
Job.status.in_(['running', 'queued'])
).count()
printer_loads[printer.id] = current_jobs
for job in jobs:
# Drucker mit geringster Auslastung finden
min_load_printer_id = min(printer_loads, key=printer_loads.get)
job.printer_id = min_load_printer_id
job.assigned_at = datetime.now()
# Auslastung für nächste Iteration aktualisieren
printer_loads[min_load_printer_id] += 1
optimized_count += 1
return optimized_count
def apply_priority_optimization(jobs, printers, db_session):
"""Prioritätsbasierte Optimierung: Jobs nach Priorität und verfügbaren Druckern verteilen"""
optimized_count = 0
# Jobs nach Priorität sortieren
priority_order = {'urgent': 1, 'high': 2, 'normal': 3, 'low': 4}
sorted_jobs = sorted(jobs, key=lambda j: priority_order.get(getattr(j, 'priority', 'normal'), 3))
# Hochpriorisierte Jobs den besten verfügbaren Druckern zuweisen
printer_assignments = {printer.id: 0 for printer in printers}
for job in sorted_jobs:
# Drucker mit geringster Anzahl zugewiesener Jobs finden
best_printer_id = min(printer_assignments, key=printer_assignments.get)
job.printer_id = best_printer_id
job.assigned_at = datetime.now()
printer_assignments[best_printer_id] += 1
optimized_count += 1
return optimized_count
def validate_optimization_settings(settings):
"""Validiert die Optimierungs-Einstellungen"""
try:
# Algorithmus validieren
valid_algorithms = ['round_robin', 'load_balance', 'priority_based']
if settings.get('algorithm') not in valid_algorithms:
return False
# Numerische Werte validieren
max_batch_size = settings.get('max_batch_size', 10)
if not isinstance(max_batch_size, int) or max_batch_size < 1 or max_batch_size > 50:
return False
time_window = settings.get('time_window', 24)
if not isinstance(time_window, int) or time_window < 1 or time_window > 168:
return False
return True
except Exception:
return False
# ===== ERWEITERTE REFRESH-FUNKTIONEN =====
@app.route('/api/dashboard/refresh', methods=['POST'])
@login_required
def refresh_dashboard():
"""Aktualisiert Dashboard-Daten und gibt aktuelle Statistiken zurück"""
try:
db_session = get_db_session()
# Aktuelle Statistiken abrufen
stats = {
'active_jobs': db_session.query(Job).filter(Job.status == 'running').count(),
'available_printers': db_session.query(Printer).filter(Printer.active == True).count(),
'total_jobs': db_session.query(Job).count(),
'pending_jobs': db_session.query(Job).filter(Job.status == 'queued').count()
}
# Erfolgsrate berechnen
total_jobs = stats['total_jobs']
if total_jobs > 0:
completed_jobs = db_session.query(Job).filter(Job.status == 'completed').count()
stats['success_rate'] = round((completed_jobs / total_jobs) * 100, 1)
else:
stats['success_rate'] = 0
db_session.close()
return jsonify({
'success': True,
'stats': stats,
'timestamp': datetime.now().isoformat()
})
except Exception as e:
app_logger.error(f"Fehler beim Dashboard-Refresh: {str(e)}")
return jsonify({
'success': False,
'error': 'Fehler beim Aktualisieren der Dashboard-Daten'
}), 500