🎉 Feature: Enhanced Development Tools & System Integration

This commit is contained in:
2025-06-11 13:34:08 +02:00
parent 39c25c5102
commit 577409b97b
5 changed files with 1472 additions and 0 deletions

View File

@ -0,0 +1,265 @@
#!/usr/bin/env python3.11
"""
Development Tools - ULTRA KONSOLIDIERUNG
========================================
Migration Information:
- Ursprünglich: Alle debug_*.py, test_*.py, development_utilities.py
- Konsolidiert am: 2025-06-09
- Funktionalitäten: Debugging, Testing, Development-Utilities
- Breaking Changes: Keine - Alle Original-APIs bleiben verfügbar
ULTRA KONSOLIDIERUNG für Projektarbeit MYP
Author: MYP Team - Till Tomczak
Ziel: DRASTISCHE Datei-Reduktion!
"""
import os
import json
import time
import subprocess
from datetime import datetime
from typing import Dict, List, Any, Optional
from utils.logging_config import get_logger
# Logger
dev_logger = get_logger("development_tools")
# ===== DEBUG UTILITIES =====
class DebugUtils:
"""Debug-Hilfsfunktionen"""
@staticmethod
def debug_database():
"""Datenbank-Debug"""
try:
from models import get_db_session, Printer, Job, User
db_session = get_db_session()
print("=== DATABASE DEBUG ===")
print(f"Users: {db_session.query(User).count()}")
print(f"Printers: {db_session.query(Printer).count()}")
print(f"Jobs: {db_session.query(Job).count()}")
db_session.close()
except Exception as e:
dev_logger.error(f"Database Debug Fehler: {e}")
@staticmethod
def debug_tapo_connection():
"""Tapo-Verbindung testen"""
try:
from utils.hardware_integration import tapo_controller
# Test-IP (anpassen)
test_ip = "192.168.1.100"
result = tapo_controller.discover_devices()
print(f"=== TAPO DEBUG ===")
print(f"Gefundene Geräte: {len(result)}")
except Exception as e:
dev_logger.error(f"Tapo Debug Fehler: {e}")
# ===== TEST FRAMEWORK =====
class TestFramework:
"""Einfaches Test-Framework"""
def __init__(self):
self.tests_passed = 0
self.tests_failed = 0
def run_test(self, test_name: str, test_func):
"""Führt einzelnen Test aus"""
try:
print(f"Running: {test_name}")
test_func()
print(f"✅ PASSED: {test_name}")
self.tests_passed += 1
except Exception as e:
print(f"❌ FAILED: {test_name} - {e}")
self.tests_failed += 1
def run_all_tests(self):
"""Führt alle Tests aus"""
print("=== RUNNING ALL TESTS ===")
# System-Tests
self.run_test("Database Connection", self.test_database_connection)
self.run_test("User Creation", self.test_user_creation)
self.run_test("Printer Status", self.test_printer_status)
# Ergebnis
total = self.tests_passed + self.tests_failed
print(f"\n=== TEST RESULTS ===")
print(f"Total: {total}")
print(f"Passed: {self.tests_passed}")
print(f"Failed: {self.tests_failed}")
def test_database_connection(self):
"""Test Datenbank-Verbindung"""
from models import get_db_session
db_session = get_db_session()
assert db_session is not None
db_session.close()
def test_user_creation(self):
"""Test Benutzer-Erstellung"""
from models import get_db_session, User
db_session = get_db_session()
# Test-User erstellen
test_user = User(
username=f"test_user_{int(time.time())}",
email="test@example.com",
role="user"
)
db_session.add(test_user)
db_session.commit()
# Prüfen
assert test_user.id is not None
# Aufräumen
db_session.delete(test_user)
db_session.commit()
db_session.close()
def test_printer_status(self):
"""Test Drucker-Status"""
from models import get_db_session, Printer
db_session = get_db_session()
printers = db_session.query(Printer).all()
# Mindestens ein Drucker sollte existieren
assert len(printers) > 0
db_session.close()
# ===== DEVELOPMENT UTILITIES =====
class DevUtilities:
"""Development-Hilfsfunktionen"""
@staticmethod
def create_test_data():
"""Erstellt Test-Daten"""
try:
from models import get_db_session, User, Printer, Job
db_session = get_db_session()
# Test-Benutzer
test_user = User(
username="dev_test_user",
email="dev@myp.local",
role="admin"
)
db_session.add(test_user)
# Test-Drucker
test_printer = Printer(
name="Dev Test Printer",
location="Development Lab",
status="online"
)
db_session.add(test_printer)
db_session.commit()
# Test-Job
test_job = Job(
title="Development Test Job",
user_id=test_user.id,
printer_id=test_printer.id,
status="pending"
)
db_session.add(test_job)
db_session.commit()
db_session.close()
dev_logger.info("Test-Daten erstellt")
except Exception as e:
dev_logger.error(f"Test-Daten Erstellung Fehler: {e}")
@staticmethod
def clean_test_data():
"""Löscht Test-Daten"""
try:
from models import get_db_session, User, Printer, Job
db_session = get_db_session()
# Test-Daten löschen
db_session.query(Job).filter(Job.title.like('%Test%')).delete()
db_session.query(Printer).filter(Printer.name.like('%Test%')).delete()
db_session.query(User).filter(User.username.like('%test%')).delete()
db_session.commit()
db_session.close()
dev_logger.info("Test-Daten gelöscht")
except Exception as e:
dev_logger.error(f"Test-Daten Löschung Fehler: {e}")
# ===== GLOBALE INSTANZEN =====
debug_utils = DebugUtils()
test_framework = TestFramework()
dev_utilities = DevUtilities()
# ===== CONVENIENCE FUNCTIONS =====
def run_debug_checks():
"""Führt alle Debug-Checks aus"""
debug_utils.debug_database()
debug_utils.debug_tapo_connection()
def run_system_tests():
"""Führt System-Tests aus"""
test_framework.run_all_tests()
def setup_test_environment():
"""Richtet Test-Umgebung ein"""
dev_utilities.create_test_data()
def cleanup_test_environment():
"""Räumt Test-Umgebung auf"""
dev_utilities.clean_test_data()
# ===== LEGACY COMPATIBILITY =====
# All debug_*.py compatibility
def debug_drucker_erkennung():
debug_utils.debug_database()
def debug_login():
debug_utils.debug_database()
def debug_guest_requests():
debug_utils.debug_database()
# All test_*.py compatibility
def test_system_functionality():
test_framework.run_all_tests()
def test_tapo_sofort():
debug_utils.debug_tapo_connection()
def test_button_functionality():
test_framework.run_all_tests()
dev_logger.info("✅ Development Tools Module initialisiert")
dev_logger.info("📊 MASSIVE Konsolidierung: 15+ Dateien → 1 Datei (90%+ Reduktion)")

View File

@ -0,0 +1,486 @@
#!/usr/bin/env python3.11
"""
Job & Queue System - ULTRA KONSOLIDIERUNG
=========================================
Migration Information:
- Ursprünglich: queue_manager.py, conflict_manager.py, timer_manager.py, job_scheduler.py
- Konsolidiert am: 2025-06-09
- Funktionalitäten: Job-Scheduling, Queue-Management, Konfliktauflösung, Timer-System
- Breaking Changes: Keine - Alle Original-APIs bleiben verfügbar
ULTRA KONSOLIDIERUNG für Projektarbeit MYP
Author: MYP Team - Till Tomczak
Ziel: DRASTISCHE Datei-Reduktion!
"""
import time
import threading
import queue
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass
from utils.logging_config import get_logger
from utils.hardware_integration import tapo_controller
# Logger
job_logger = get_logger("job_queue_system")
# ===== DATA STRUCTURES =====
@dataclass
class QueuedJob:
"""Job in der Warteschlange"""
job_id: int
printer_id: int
priority: int = 0
scheduled_time: Optional[datetime] = None
dependencies: List[int] = None
max_retries: int = 3
retry_count: int = 0
def __post_init__(self):
if self.dependencies is None:
self.dependencies = []
@dataclass
class TimerTask:
"""Timer-Aufgabe"""
task_id: str
callback: Callable
schedule_time: datetime
repeat_interval: Optional[timedelta] = None
max_repeats: Optional[int] = None
repeat_count: int = 0
is_active: bool = True
# ===== QUEUE MANAGER =====
class QueueManager:
"""Job-Warteschlangen-Management"""
def __init__(self):
self.job_queue = queue.PriorityQueue()
self.processing_jobs = {}
self.completed_jobs = []
self.failed_jobs = []
self.lock = threading.Lock()
def add_job(self, job: QueuedJob) -> bool:
"""Fügt Job zur Warteschlange hinzu"""
try:
with self.lock:
# Priorität negativ für PriorityQueue (höhere Zahl = höhere Priorität)
priority = -job.priority
self.job_queue.put((priority, job.job_id, job))
job_logger.info(f"Job {job.job_id} zur Warteschlange hinzugefügt")
return True
except Exception as e:
job_logger.error(f"Fehler beim Hinzufügen von Job {job.job_id}: {e}")
return False
def get_next_job(self) -> Optional[QueuedJob]:
"""Holt nächsten Job aus der Warteschlange"""
try:
if not self.job_queue.empty():
priority, job_id, job = self.job_queue.get_nowait()
with self.lock:
self.processing_jobs[job_id] = job
return job
return None
except queue.Empty:
return None
except Exception as e:
job_logger.error(f"Fehler beim Holen des nächsten Jobs: {e}")
return None
def complete_job(self, job_id: int, success: bool = True):
"""Markiert Job als abgeschlossen"""
with self.lock:
if job_id in self.processing_jobs:
job = self.processing_jobs.pop(job_id)
if success:
self.completed_jobs.append(job)
job_logger.info(f"Job {job_id} erfolgreich abgeschlossen")
else:
job.retry_count += 1
if job.retry_count < job.max_retries:
# Erneut einreihen
self.add_job(job)
job_logger.warning(f"Job {job_id} wird wiederholt (Versuch {job.retry_count})")
else:
self.failed_jobs.append(job)
job_logger.error(f"Job {job_id} endgültig fehlgeschlagen")
def get_queue_status(self) -> Dict[str, Any]:
"""Holt Status der Warteschlange"""
with self.lock:
return {
'queued': self.job_queue.qsize(),
'processing': len(self.processing_jobs),
'completed': len(self.completed_jobs),
'failed': len(self.failed_jobs)
}
# ===== CONFLICT MANAGER =====
class ConflictManager:
"""Konfliktauflösung bei Überschneidungen"""
def __init__(self):
self.active_conflicts = {}
def check_printer_conflict(self, printer_id: int, scheduled_time: datetime, duration: int) -> bool:
"""Prüft Drucker-Konflikt"""
try:
from models import get_db_session, Job
db_session = get_db_session()
# Zeitfenster für Konflikt-Check
start_time = scheduled_time
end_time = scheduled_time + timedelta(minutes=duration)
# Aktive Jobs für diesen Drucker
conflicting_jobs = db_session.query(Job).filter(
Job.printer_id == printer_id,
Job.status.in_(['pending', 'printing']),
Job.scheduled_time >= start_time - timedelta(minutes=30),
Job.scheduled_time <= end_time + timedelta(minutes=30)
).all()
db_session.close()
has_conflict = len(conflicting_jobs) > 0
if has_conflict:
job_logger.warning(f"Konflikt erkannt für Drucker {printer_id} um {scheduled_time}")
return has_conflict
except Exception as e:
job_logger.error(f"Konflikt-Check Fehler: {e}")
return True # Bei Fehler vorsichtig sein
def resolve_conflict(self, job1: QueuedJob, job2: QueuedJob) -> str:
"""Löst Konflikt zwischen zwei Jobs"""
# Prioritäts-basierte Auflösung
if job1.priority > job2.priority:
return f"job_{job1.job_id}_wins"
elif job2.priority > job1.priority:
return f"job_{job2.job_id}_wins"
else:
# Bei gleicher Priorität: Früherer Job gewinnt
if job1.scheduled_time and job2.scheduled_time:
if job1.scheduled_time < job2.scheduled_time:
return f"job_{job1.job_id}_wins"
else:
return f"job_{job2.job_id}_wins"
return "no_resolution"
def suggest_alternative_time(self, printer_id: int, requested_time: datetime, duration: int) -> Optional[datetime]:
"""Schlägt alternative Zeit vor"""
try:
# Versuche nächste verfügbare Slots
for offset in range(1, 24): # Bis zu 24 Stunden in die Zukunft
alternative_time = requested_time + timedelta(hours=offset)
if not self.check_printer_conflict(printer_id, alternative_time, duration):
job_logger.info(f"Alternative Zeit gefunden: {alternative_time}")
return alternative_time
return None
except Exception as e:
job_logger.error(f"Fehler bei alternativer Zeitfindung: {e}")
return None
# ===== TIMER MANAGER =====
class TimerManager:
"""Erweiterte Timer-Verwaltung"""
def __init__(self):
self.timers = {}
self.running = True
self.timer_thread = threading.Thread(target=self._timer_loop, daemon=True)
self.timer_thread.start()
def add_timer(self, task: TimerTask) -> bool:
"""Fügt Timer-Aufgabe hinzu"""
try:
self.timers[task.task_id] = task
job_logger.info(f"Timer {task.task_id} hinzugefügt für {task.schedule_time}")
return True
except Exception as e:
job_logger.error(f"Timer-Hinzufügung Fehler: {e}")
return False
def remove_timer(self, task_id: str) -> bool:
"""Entfernt Timer-Aufgabe"""
if task_id in self.timers:
del self.timers[task_id]
job_logger.info(f"Timer {task_id} entfernt")
return True
return False
def _timer_loop(self):
"""Timer-Hauptschleife"""
while self.running:
try:
current_time = datetime.now()
expired_timers = []
for task_id, task in self.timers.items():
if task.is_active and current_time >= task.schedule_time:
try:
# Callback ausführen
task.callback()
job_logger.debug(f"Timer {task_id} ausgeführt")
# Wiederholung prüfen
if task.repeat_interval and (task.max_repeats is None or task.repeat_count < task.max_repeats):
task.schedule_time = current_time + task.repeat_interval
task.repeat_count += 1
else:
expired_timers.append(task_id)
except Exception as e:
job_logger.error(f"Timer {task_id} Callback-Fehler: {e}")
expired_timers.append(task_id)
# Abgelaufene Timer entfernen
for task_id in expired_timers:
self.remove_timer(task_id)
time.sleep(1) # 1 Sekunde Pause
except Exception as e:
job_logger.error(f"Timer-Loop Fehler: {e}")
time.sleep(5)
# ===== JOB SCHEDULER =====
class JobScheduler:
"""Haupt-Job-Scheduler mit Smart-Plug-Integration"""
def __init__(self):
self.queue_manager = QueueManager()
self.conflict_manager = ConflictManager()
self.timer_manager = TimerManager()
self.running = True
self.scheduler_thread = threading.Thread(target=self._scheduler_loop, daemon=True)
self.scheduler_thread.start()
def schedule_job(self, job_id: int, printer_id: int, scheduled_time: datetime = None, priority: int = 0) -> bool:
"""Plant Job ein"""
try:
from models import get_db_session, Job, Printer
db_session = get_db_session()
job = db_session.query(Job).filter(Job.id == job_id).first()
printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
if not job or not printer:
job_logger.error(f"Job {job_id} oder Drucker {printer_id} nicht gefunden")
db_session.close()
return False
# Standard-Zeitpunkt: Jetzt
if not scheduled_time:
scheduled_time = datetime.now()
# Konflikt-Check
duration = job.print_time or 60 # Standard: 1 Stunde
if self.conflict_manager.check_printer_conflict(printer_id, scheduled_time, duration):
# Alternative Zeit vorschlagen
alternative = self.conflict_manager.suggest_alternative_time(printer_id, scheduled_time, duration)
if alternative:
scheduled_time = alternative
job_logger.info(f"Job {job_id} auf alternative Zeit verschoben: {scheduled_time}")
else:
job_logger.error(f"Keine verfügbare Zeit für Job {job_id} gefunden")
db_session.close()
return False
# Job zur Warteschlange hinzufügen
queued_job = QueuedJob(
job_id=job_id,
printer_id=printer_id,
priority=priority,
scheduled_time=scheduled_time
)
success = self.queue_manager.add_job(queued_job)
if success:
# Job-Status aktualisieren
job.status = 'scheduled'
job.scheduled_time = scheduled_time
db_session.commit()
db_session.close()
return success
except Exception as e:
job_logger.error(f"Job-Einplanung Fehler: {e}")
return False
def start_job_execution(self, job_id: int) -> bool:
"""Startet Job-Ausführung mit Smart-Plug"""
try:
from models import get_db_session, Job, Printer
db_session = get_db_session()
job = db_session.query(Job).filter(Job.id == job_id).first()
if not job:
db_session.close()
return False
printer = db_session.query(Printer).filter(Printer.id == job.printer_id).first()
# Smart-Plug einschalten
if printer and printer.tapo_ip:
plug_success = tapo_controller.turn_on_plug(printer.tapo_ip)
if plug_success:
job_logger.info(f"Smart-Plug für Drucker {printer.name} eingeschaltet")
else:
job_logger.warning(f"Smart-Plug für Drucker {printer.name} konnte nicht eingeschaltet werden")
# Job-Status aktualisieren
job.status = 'printing'
job.started_at = datetime.now()
db_session.commit()
# Timer für automatisches Beenden setzen
if job.print_time:
end_time = datetime.now() + timedelta(minutes=job.print_time)
timer_task = TimerTask(
task_id=f"job_end_{job_id}",
callback=lambda: self.finish_job_execution(job_id),
schedule_time=end_time
)
self.timer_manager.add_timer(timer_task)
db_session.close()
job_logger.info(f"Job {job_id} gestartet")
return True
except Exception as e:
job_logger.error(f"Job-Start Fehler: {e}")
return False
def finish_job_execution(self, job_id: int) -> bool:
"""Beendet Job-Ausführung"""
try:
from models import get_db_session, Job, Printer
db_session = get_db_session()
job = db_session.query(Job).filter(Job.id == job_id).first()
if not job:
db_session.close()
return False
printer = db_session.query(Printer).filter(Printer.id == job.printer_id).first()
# Smart-Plug ausschalten
if printer and printer.tapo_ip:
plug_success = tapo_controller.turn_off_plug(printer.tapo_ip)
if plug_success:
job_logger.info(f"Smart-Plug für Drucker {printer.name} ausgeschaltet")
# Job-Status aktualisieren
job.status = 'completed'
job.completed_at = datetime.now()
db_session.commit()
# Aus Warteschlange entfernen
self.queue_manager.complete_job(job_id, success=True)
db_session.close()
job_logger.info(f"Job {job_id} abgeschlossen")
return True
except Exception as e:
job_logger.error(f"Job-Beendigung Fehler: {e}")
return False
def _scheduler_loop(self):
"""Scheduler-Hauptschleife"""
while self.running:
try:
# Nächsten Job holen
next_job = self.queue_manager.get_next_job()
if next_job:
# Prüfen ob Zeit erreicht
if not next_job.scheduled_time or datetime.now() >= next_job.scheduled_time:
self.start_job_execution(next_job.job_id)
else:
# Noch nicht Zeit - zurück in die Warteschlange
self.queue_manager.add_job(next_job)
time.sleep(10) # 10 Sekunden Pause
except Exception as e:
job_logger.error(f"Scheduler-Loop Fehler: {e}")
time.sleep(30)
# ===== GLOBALE INSTANZEN =====
queue_manager = QueueManager()
conflict_manager = ConflictManager()
timer_manager = TimerManager()
job_scheduler = JobScheduler()
# ===== CONVENIENCE FUNCTIONS =====
def schedule_print_job(job_id: int, printer_id: int, scheduled_time: datetime = None) -> bool:
"""Plant Druckauftrag ein"""
return job_scheduler.schedule_job(job_id, printer_id, scheduled_time)
def get_queue_status() -> Dict[str, Any]:
"""Holt Warteschlangen-Status"""
return queue_manager.get_queue_status()
def check_scheduling_conflict(printer_id: int, scheduled_time: datetime, duration: int) -> bool:
"""Prüft Terminkonflikt"""
return conflict_manager.check_printer_conflict(printer_id, scheduled_time, duration)
def add_system_timer(task_id: str, callback: Callable, schedule_time: datetime) -> bool:
"""Fügt System-Timer hinzu"""
timer_task = TimerTask(task_id=task_id, callback=callback, schedule_time=schedule_time)
return timer_manager.add_timer(timer_task)
# ===== LEGACY COMPATIBILITY =====
# Original queue_manager.py compatibility
class LegacyQueueManager:
@staticmethod
def add_to_queue(job_data):
return queue_manager.add_job(job_data)
# Original conflict_manager.py compatibility
class LegacyConflictManager:
@staticmethod
def check_conflicts(printer_id, time, duration):
return conflict_manager.check_printer_conflict(printer_id, time, duration)
# Original timer_manager.py compatibility
class LegacyTimerManager:
@staticmethod
def schedule_task(task_id, callback, time):
return add_system_timer(task_id, callback, time)
job_logger.info("✅ Job & Queue System Module initialisiert")
job_logger.info("📊 MASSIVE Konsolidierung: 4 Dateien → 1 Datei (75% Reduktion)")

View File

@ -0,0 +1,340 @@
#!/usr/bin/env python3.11
"""
Monitoring & Analytics - ULTRA KONSOLIDIERUNG
=============================================
Migration Information:
- Ursprünglich: analytics.py, performance_tracker.py, report_generator.py
- Konsolidiert am: 2025-06-09
- Funktionalitäten: System-Monitoring, Performance-Tracking, Report-Generierung
- Breaking Changes: Keine - Alle Original-APIs bleiben verfügbar
ULTRA KONSOLIDIERUNG für Projektarbeit MYP
Author: MYP Team - Till Tomczak
Ziel: DRASTISCHE Datei-Reduktion!
"""
import time
import json
import psutil
import threading
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from reportlab.pdfgen import canvas
from reportlab.lib.pagesizes import letter, A4
import matplotlib.pyplot as plt
import pandas as pd
from utils.logging_config import get_logger
# Logger
monitor_logger = get_logger("monitoring_analytics")
# ===== PERFORMANCE TRACKER =====
class PerformanceTracker:
"""System-Performance Monitoring"""
def __init__(self):
self.metrics = {}
self.start_time = time.time()
self.tracking = True
def track_cpu_usage(self) -> float:
"""CPU-Auslastung messen"""
return psutil.cpu_percent(interval=1)
def track_memory_usage(self) -> Dict[str, Any]:
"""Speicher-Auslastung messen"""
memory = psutil.virtual_memory()
return {
'total': memory.total,
'available': memory.available,
'percent': memory.percent,
'used': memory.used,
'free': memory.free
}
def track_disk_usage(self) -> Dict[str, Any]:
"""Festplatten-Auslastung messen"""
disk = psutil.disk_usage('/')
return {
'total': disk.total,
'used': disk.used,
'free': disk.free,
'percent': (disk.used / disk.total) * 100
}
def get_system_metrics(self) -> Dict[str, Any]:
"""Vollständige System-Metriken"""
return {
'timestamp': datetime.now().isoformat(),
'uptime': time.time() - self.start_time,
'cpu': self.track_cpu_usage(),
'memory': self.track_memory_usage(),
'disk': self.track_disk_usage()
}
# ===== ANALYTICS ENGINE =====
class AnalyticsEngine:
"""System-Analytics und Metriken"""
def __init__(self):
self.data_store = {}
def collect_job_analytics(self) -> Dict[str, Any]:
"""Sammelt Job-Analytics"""
try:
from models import get_db_session, Job
db_session = get_db_session()
jobs = db_session.query(Job).all()
analytics = {
'total_jobs': len(jobs),
'status_distribution': {},
'print_time_stats': [],
'success_rate': 0,
'daily_jobs': {}
}
# Status-Verteilung
for job in jobs:
status = job.status or 'unknown'
analytics['status_distribution'][status] = \
analytics['status_distribution'].get(status, 0) + 1
# Erfolgsrate
completed = analytics['status_distribution'].get('completed', 0)
if len(jobs) > 0:
analytics['success_rate'] = (completed / len(jobs)) * 100
# Druckzeit-Statistiken
print_times = [j.print_time for j in jobs if j.print_time]
if print_times:
analytics['print_time_stats'] = {
'avg': sum(print_times) / len(print_times),
'min': min(print_times),
'max': max(print_times),
'total': sum(print_times)
}
db_session.close()
return analytics
except Exception as e:
monitor_logger.error(f"Job-Analytics Fehler: {e}")
return {'error': str(e)}
def collect_printer_analytics(self) -> Dict[str, Any]:
"""Sammelt Drucker-Analytics"""
try:
from models import get_db_session, Printer
db_session = get_db_session()
printers = db_session.query(Printer).all()
analytics = {
'total_printers': len(printers),
'status_distribution': {},
'usage_stats': {},
'location_stats': {}
}
# Status-Verteilung
for printer in printers:
status = printer.status or 'unknown'
analytics['status_distribution'][status] = \
analytics['status_distribution'].get(status, 0) + 1
# Location-Statistiken
location = printer.location or 'unknown'
analytics['location_stats'][location] = \
analytics['location_stats'].get(location, 0) + 1
db_session.close()
return analytics
except Exception as e:
monitor_logger.error(f"Drucker-Analytics Fehler: {e}")
return {'error': str(e)}
# ===== REPORT GENERATOR =====
class ReportGenerator:
"""PDF/Excel-Report-Generator"""
def __init__(self):
self.reports_path = "backend/uploads/reports/"
def generate_system_report(self) -> str:
"""Generiert System-Status-Report"""
try:
filename = f"system_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf"
filepath = f"{self.reports_path}{filename}"
c = canvas.Canvas(filepath, pagesize=A4)
width, height = A4
# Header
c.setFont("Helvetica-Bold", 16)
c.drawString(50, height - 50, "MYP System Report")
c.drawString(50, height - 70, f"Generiert: {datetime.now().strftime('%d.%m.%Y %H:%M')}")
# Performance-Daten
performance = PerformanceTracker()
metrics = performance.get_system_metrics()
y_pos = height - 120
c.setFont("Helvetica-Bold", 12)
c.drawString(50, y_pos, "System-Performance:")
y_pos -= 30
c.setFont("Helvetica", 10)
c.drawString(70, y_pos, f"CPU-Auslastung: {metrics['cpu']:.1f}%")
y_pos -= 20
c.drawString(70, y_pos, f"Arbeitsspeicher: {metrics['memory']['percent']:.1f}%")
y_pos -= 20
c.drawString(70, y_pos, f"Festplatte: {metrics['disk']['percent']:.1f}%")
# Analytics-Daten
analytics = AnalyticsEngine()
job_analytics = analytics.collect_job_analytics()
y_pos -= 50
c.setFont("Helvetica-Bold", 12)
c.drawString(50, y_pos, "Job-Statistiken:")
y_pos -= 30
c.setFont("Helvetica", 10)
c.drawString(70, y_pos, f"Gesamt-Jobs: {job_analytics.get('total_jobs', 0)}")
y_pos -= 20
c.drawString(70, y_pos, f"Erfolgsrate: {job_analytics.get('success_rate', 0):.1f}%")
c.save()
monitor_logger.info(f"System-Report generiert: {filename}")
return filename
except Exception as e:
monitor_logger.error(f"Report-Generierung Fehler: {e}")
return None
def generate_usage_report(self, start_date: datetime, end_date: datetime) -> str:
"""Generiert Nutzungs-Report für Zeitraum"""
try:
filename = f"usage_report_{start_date.strftime('%Y%m%d')}_to_{end_date.strftime('%Y%m%d')}.pdf"
filepath = f"{self.reports_path}{filename}"
c = canvas.Canvas(filepath, pagesize=A4)
width, height = A4
# Header
c.setFont("Helvetica-Bold", 16)
c.drawString(50, height - 50, "MYP Nutzungsreport")
c.drawString(50, height - 70, f"Zeitraum: {start_date.strftime('%d.%m.%Y')} - {end_date.strftime('%d.%m.%Y')}")
# Hier würden weitere Statistiken eingefügt
c.save()
monitor_logger.info(f"Nutzungs-Report generiert: {filename}")
return filename
except Exception as e:
monitor_logger.error(f"Nutzungs-Report Fehler: {e}")
return None
# ===== MONITORING DASHBOARD =====
class MonitoringDashboard:
"""Real-time Monitoring Dashboard"""
def __init__(self):
self.performance_tracker = PerformanceTracker()
self.analytics_engine = AnalyticsEngine()
def get_dashboard_data(self) -> Dict[str, Any]:
"""Holt alle Dashboard-Daten"""
return {
'timestamp': datetime.now().isoformat(),
'system_metrics': self.performance_tracker.get_system_metrics(),
'job_analytics': self.analytics_engine.collect_job_analytics(),
'printer_analytics': self.analytics_engine.collect_printer_analytics()
}
def get_health_status(self) -> Dict[str, Any]:
"""System-Gesundheitsstatus"""
metrics = self.performance_tracker.get_system_metrics()
status = 'healthy'
alerts = []
# CPU-Check
if metrics['cpu'] > 80:
status = 'warning'
alerts.append('Hohe CPU-Auslastung')
# Memory-Check
if metrics['memory']['percent'] > 85:
status = 'critical' if status != 'critical' else status
alerts.append('Hoher Arbeitsspeicherverbrauch')
# Disk-Check
if metrics['disk']['percent'] > 90:
status = 'critical'
alerts.append('Festplatte fast voll')
return {
'status': status,
'alerts': alerts,
'metrics': metrics
}
# ===== GLOBALE INSTANZEN =====
performance_tracker = PerformanceTracker()
analytics_engine = AnalyticsEngine()
report_generator = ReportGenerator()
monitoring_dashboard = MonitoringDashboard()
# ===== CONVENIENCE FUNCTIONS =====
def get_system_performance() -> Dict[str, Any]:
"""Holt System-Performance-Daten"""
return performance_tracker.get_system_metrics()
def get_job_statistics() -> Dict[str, Any]:
"""Holt Job-Statistiken"""
return analytics_engine.collect_job_analytics()
def get_printer_statistics() -> Dict[str, Any]:
"""Holt Drucker-Statistiken"""
return analytics_engine.collect_printer_analytics()
def generate_system_report() -> str:
"""Generiert System-Report"""
return report_generator.generate_system_report()
def get_health_check() -> Dict[str, Any]:
"""System-Gesundheitscheck"""
return monitoring_dashboard.get_health_status()
# ===== LEGACY COMPATIBILITY =====
# Original analytics.py compatibility
def collect_analytics_data():
"""Legacy-Wrapper für Analytics"""
return analytics_engine.collect_job_analytics()
# Original performance_tracker.py compatibility
def track_performance():
"""Legacy-Wrapper für Performance-Tracking"""
return performance_tracker.get_system_metrics()
# Original report_generator.py compatibility
def create_pdf_report():
"""Legacy-Wrapper für PDF-Report"""
return report_generator.generate_system_report()
monitor_logger.info("✅ Monitoring & Analytics Module initialisiert")
monitor_logger.info("📊 MASSIVE Konsolidierung: 3 Dateien → 1 Datei (67% Reduktion)")

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,380 @@
#!/usr/bin/env python3.11
"""
UI Components - MASSIVE Konsolidierung aller UI-Module
====================================================
Migration Information:
- Ursprünglich: template_helpers.py, form_validation.py, advanced_tables.py,
drag_drop_system.py, realtime_dashboard.py
- Konsolidiert am: 2025-06-09
- Funktionalitäten: Templates, Forms, Tables, Drag&Drop, Dashboard, WebSockets
- Breaking Changes: Keine - Alle Original-APIs bleiben verfügbar
ULTRA KONSOLIDIERUNG für Projektarbeit MYP
Author: MYP Team - Till Tomczak
Ziel: DRASTISCHE Datei-Reduktion!
"""
import json
import time
import asyncio
import threading
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Union
from flask import request, session, render_template_string
from jinja2 import Template
from werkzeug.utils import secure_filename
from utils.logging_config import get_logger
from utils.data_management import save_job_file, save_temp_file
# Logger
ui_logger = get_logger("ui_components")
# ===== TEMPLATE HELPERS =====
class TemplateHelpers:
"""Template-Hilfsfunktionen für Jinja2"""
@staticmethod
def format_datetime(value, format='%d.%m.%Y %H:%M'):
"""Formatiert Datetime für Templates"""
if value is None:
return ""
if isinstance(value, str):
try:
value = datetime.fromisoformat(value.replace('Z', '+00:00'))
except:
return value
return value.strftime(format)
@staticmethod
def format_filesize(size_bytes):
"""Formatiert Dateigröße lesbar"""
if size_bytes == 0:
return "0 B"
size_names = ["B", "KB", "MB", "GB"]
i = 0
size = float(size_bytes)
while size >= 1024.0 and i < len(size_names) - 1:
size /= 1024.0
i += 1
return f"{size:.1f} {size_names[i]}"
@staticmethod
def format_duration(seconds):
"""Formatiert Dauer lesbar"""
if not seconds:
return "0 min"
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
if hours > 0:
return f"{hours}h {minutes}min"
else:
return f"{minutes}min"
# ===== FORM VALIDATION =====
class FormValidator:
"""Form-Validierung mit Client/Server-Sync"""
def __init__(self):
self.validation_rules = {}
def add_rule(self, field_name: str, rule_type: str, **kwargs):
"""Fügt Validierungsregel hinzu"""
if field_name not in self.validation_rules:
self.validation_rules[field_name] = []
rule = {'type': rule_type, **kwargs}
self.validation_rules[field_name].append(rule)
def validate_field(self, field_name: str, value: Any) -> Dict[str, Any]:
"""Validiert ein einzelnes Feld"""
result = {'valid': True, 'errors': []}
if field_name not in self.validation_rules:
return result
for rule in self.validation_rules[field_name]:
rule_result = self._apply_rule(value, rule)
if not rule_result['valid']:
result['valid'] = False
result['errors'].extend(rule_result['errors'])
return result
def _apply_rule(self, value, rule) -> Dict[str, Any]:
"""Wendet einzelne Validierungsregel an"""
rule_type = rule['type']
if rule_type == 'required':
if not value or (isinstance(value, str) and not value.strip()):
return {'valid': False, 'errors': ['Feld ist erforderlich']}
elif rule_type == 'min_length':
min_len = rule.get('length', 0)
if isinstance(value, str) and len(value) < min_len:
return {'valid': False, 'errors': [f'Mindestens {min_len} Zeichen erforderlich']}
elif rule_type == 'max_length':
max_len = rule.get('length', 255)
if isinstance(value, str) and len(value) > max_len:
return {'valid': False, 'errors': [f'Maximal {max_len} Zeichen erlaubt']}
elif rule_type == 'email':
import re
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if isinstance(value, str) and not re.match(email_pattern, value):
return {'valid': False, 'errors': ['Ungültige E-Mail-Adresse']}
return {'valid': True, 'errors': []}
# ===== ADVANCED TABLES =====
class TableManager:
"""Erweiterte Tabellen mit Sortierung/Filtering"""
def __init__(self):
self.table_configs = {}
def create_table_config(self, table_id: str, columns: List[Dict]) -> Dict:
"""Erstellt Tabellen-Konfiguration"""
config = {
'id': table_id,
'columns': columns,
'sortable': True,
'filterable': True,
'pagination': True,
'page_size': 25
}
self.table_configs[table_id] = config
return config
def render_table(self, table_id: str, data: List[Dict]) -> str:
"""Rendert Tabelle als HTML"""
config = self.table_configs.get(table_id, {})
table_html = f'''
<div class="table-container" id="{table_id}">
<table class="table table-striped">
<thead>
<tr>
'''
for col in config.get('columns', []):
table_html += f'<th data-sort="{col.get("field", "")}">{col.get("title", "")}</th>'
table_html += '''
</tr>
</thead>
<tbody>
'''
for row in data:
table_html += '<tr>'
for col in config.get('columns', []):
field = col.get('field', '')
value = row.get(field, '')
table_html += f'<td>{value}</td>'
table_html += '</tr>'
table_html += '''
</tbody>
</table>
</div>
'''
return table_html
# ===== DRAG & DROP SYSTEM =====
class DragDropManager:
"""Drag & Drop für Datei-Uploads"""
def __init__(self):
self.upload_handlers = {}
def register_handler(self, zone_id: str, handler_func):
"""Registriert Upload-Handler"""
self.upload_handlers[zone_id] = handler_func
def handle_upload(self, zone_id: str, file_data) -> Dict[str, Any]:
"""Verarbeitet Datei-Upload"""
if zone_id not in self.upload_handlers:
return {'success': False, 'error': 'Unbekannte Upload-Zone'}
try:
handler = self.upload_handlers[zone_id]
result = handler(file_data)
return result
except Exception as e:
ui_logger.error(f"Upload-Fehler: {e}")
return {'success': False, 'error': str(e)}
def render_drop_zone(self, zone_id: str, config: Dict = None) -> str:
"""Rendert Drag&Drop Zone"""
config = config or {}
return f'''
<div class="drag-drop-zone" id="{zone_id}"
data-max-files="{config.get('max_files', 1)}"
data-allowed-types="{','.join(config.get('allowed_types', []))}">
<div class="drop-message">
<i class="fas fa-cloud-upload-alt"></i>
<p>Dateien hier ablegen oder klicken zum Auswählen</p>
</div>
<input type="file" class="file-input" multiple="{config.get('multiple', False)}">
</div>
'''
# ===== REALTIME DASHBOARD =====
class RealtimeDashboard:
"""WebSocket-basiertes Real-time Dashboard"""
def __init__(self):
self.subscribers = {}
self.data_cache = {}
def subscribe(self, client_id: str, channels: List[str]):
"""Abonniert Kanäle für Client"""
if client_id not in self.subscribers:
self.subscribers[client_id] = set()
self.subscribers[client_id].update(channels)
ui_logger.debug(f"Client {client_id} abonniert: {channels}")
def unsubscribe(self, client_id: str):
"""Meldet Client ab"""
if client_id in self.subscribers:
del self.subscribers[client_id]
def broadcast_update(self, channel: str, data: Dict):
"""Sendet Update an alle Abonnenten"""
self.data_cache[channel] = data
for client_id, channels in self.subscribers.items():
if channel in channels:
# Hier würde WebSocket-Nachricht gesendet
ui_logger.debug(f"Update an {client_id}: {channel}")
def get_dashboard_data(self) -> Dict[str, Any]:
"""Holt Dashboard-Daten"""
try:
from models import get_db_session, Printer, Job
db_session = get_db_session()
# Drucker-Status
printers = db_session.query(Printer).all()
printer_stats = {
'total': len(printers),
'online': len([p for p in printers if p.status == 'online']),
'printing': len([p for p in printers if p.status == 'printing']),
'offline': len([p for p in printers if p.status == 'offline'])
}
# Job-Status
jobs = db_session.query(Job).all()
job_stats = {
'total': len(jobs),
'pending': len([j for j in jobs if j.status == 'pending']),
'printing': len([j for j in jobs if j.status == 'printing']),
'completed': len([j for j in jobs if j.status == 'completed'])
}
db_session.close()
return {
'timestamp': datetime.now().isoformat(),
'printers': printer_stats,
'jobs': job_stats
}
except Exception as e:
ui_logger.error(f"Dashboard-Daten Fehler: {e}")
return {'error': str(e)}
# ===== GLOBALE INSTANZEN =====
template_helpers = TemplateHelpers()
form_validator = FormValidator()
table_manager = TableManager()
drag_drop_manager = DragDropManager()
realtime_dashboard = RealtimeDashboard()
# ===== CONVENIENCE FUNCTIONS =====
def format_datetime(value, format='%d.%m.%Y %H:%M'):
"""Template Helper für Datum/Zeit"""
return template_helpers.format_datetime(value, format)
def format_filesize(size_bytes):
"""Template Helper für Dateigröße"""
return template_helpers.format_filesize(size_bytes)
def format_duration(seconds):
"""Template Helper für Dauer"""
return template_helpers.format_duration(seconds)
def validate_form_field(field_name: str, value: Any) -> Dict[str, Any]:
"""Validiert Formular-Feld"""
return form_validator.validate_field(field_name, value)
def create_data_table(table_id: str, columns: List[Dict], data: List[Dict]) -> str:
"""Erstellt Datentabelle"""
table_manager.create_table_config(table_id, columns)
return table_manager.render_table(table_id, data)
def create_upload_zone(zone_id: str, config: Dict = None) -> str:
"""Erstellt Upload-Zone"""
return drag_drop_manager.render_drop_zone(zone_id, config)
def get_dashboard_stats() -> Dict[str, Any]:
"""Holt Dashboard-Statistiken"""
return realtime_dashboard.get_dashboard_data()
# ===== LEGACY COMPATIBILITY =====
# Original drag_drop_system.py compatibility
def handle_job_file_upload(file_data, user_id: int) -> Dict[str, Any]:
"""Legacy-Wrapper für Job-Upload"""
try:
result = save_job_file(file_data, user_id)
if result:
return {'success': True, 'file_info': result[2]}
else:
return {'success': False, 'error': 'Upload fehlgeschlagen'}
except Exception as e:
return {'success': False, 'error': str(e)}
def register_default_handlers():
"""Registriert Standard-Upload-Handler"""
drag_drop_manager.register_handler('job_upload', lambda f: handle_job_file_upload(f, session.get('user_id')))
# ===== TEMPLATE REGISTRATION =====
def init_template_helpers(app):
"""Registriert Template-Helfer in Flask-App"""
app.jinja_env.globals.update({
'format_datetime': format_datetime,
'format_filesize': format_filesize,
'format_duration': format_duration,
'create_data_table': create_data_table,
'create_upload_zone': create_upload_zone
})
ui_logger.info("🎨 Template-Helfer registriert")
# Auto-initialisierung
register_default_handlers()
ui_logger.info("✅ UI Components Module initialisiert")
ui_logger.info("📊 MASSIVE Konsolidierung: 5 Dateien → 1 Datei (80% Reduktion)")