manage-your-printer/utils/test_system_functionality.py
2025-06-04 10:03:22 +02:00

437 lines
15 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Umfassender Systemfunktionalitätstest für MYP Platform
Prüft alle kritischen Komponenten und Features
"""
import sys
import os
import json
import requests
import time
from datetime import datetime
from typing import Dict, List, Any
# Put the directory containing this file at the front of the module search path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# Tests für interne Komponenten
def test_internal_components() -> Dict[str, Dict[str, str]]:
    """Run smoke checks against the platform's internal components.

    The checks run in this order: critical module imports, database access,
    presence of an admin user, Windows-specific fixes, logging, queue manager
    and job scheduler. If the critical imports fail, the remaining checks are
    skipped because they depend on the imported names.

    Returns:
        A dict mapping each check name to ``{"status": ..., "message": ...}``.
        ``status`` is one of ``"SUCCESS"``, ``"WARNING"``, ``"FAILED"`` or
        ``"SKIPPED"``. The function itself never raises for a failing check.
    """
    from contextlib import closing

    results = {}
    print("🔍 Teste interne Systemkomponenten...")

    # Test 1: import critical modules. Several names are imported only to
    # prove that they can be imported; they are not used afterwards.
    try:
        from models import User, Printer, Job, get_db_session, init_database  # noqa: F401
        from config.settings import SECRET_KEY, DATABASE_PATH  # noqa: F401
        from utils.logging_config import get_logger
        results["module_imports"] = {"status": "SUCCESS", "message": "Alle kritischen Module importiert"}
    except Exception as e:
        results["module_imports"] = {"status": "FAILED", "message": f"Import-Fehler: {str(e)}"}
        return results

    # Test 2: database connection. closing() guarantees the session is
    # released even when one of the queries raises.
    try:
        with closing(get_db_session()) as db_session:
            user_count = db_session.query(User).count()
            printer_count = db_session.query(Printer).count()
            job_count = db_session.query(Job).count()
        results["database_connection"] = {
            "status": "SUCCESS",
            "message": f"Datenbank verbunden - {user_count} Benutzer, {printer_count} Drucker, {job_count} Jobs"
        }
    except Exception as e:
        results["database_connection"] = {"status": "FAILED", "message": f"DB-Fehler: {str(e)}"}

    # Test 3: an admin user exists
    try:
        admin_description = None
        with closing(get_db_session()) as db_session:
            admin_user = db_session.query(User).filter(User.role == "admin").first()
            if admin_user is not None:
                # Read the attributes while the session is still open so the
                # check does not depend on detached-instance behaviour.
                admin_description = f"{admin_user.username} ({admin_user.email})"
        if admin_description is not None:
            results["admin_user"] = {
                "status": "SUCCESS",
                "message": f"Admin-Benutzer gefunden: {admin_description}"
            }
        else:
            results["admin_user"] = {"status": "FAILED", "message": "Kein Admin-Benutzer gefunden"}
    except Exception as e:
        results["admin_user"] = {"status": "FAILED", "message": f"Admin-Check-Fehler: {str(e)}"}

    # Test 4: Windows fixes (only meaningful on Windows)
    try:
        if os.name == 'nt':
            from utils.windows_fixes import get_windows_thread_manager
            thread_manager = get_windows_thread_manager()
            if thread_manager:
                results["windows_fixes"] = {"status": "SUCCESS", "message": "Windows-Fixes geladen"}
            else:
                results["windows_fixes"] = {"status": "WARNING", "message": "Windows-Fixes verfügbar aber nicht aktiv"}
        else:
            results["windows_fixes"] = {"status": "SKIPPED", "message": "Nicht Windows-System"}
    except Exception as e:
        results["windows_fixes"] = {"status": "WARNING", "message": f"Windows-Fixes-Fehler: {str(e)}"}

    # Test 5: logging system
    try:
        logger = get_logger("test")
        logger.info("Test-Log-Nachricht")
        results["logging_system"] = {"status": "SUCCESS", "message": "Logging-System funktional"}
    except Exception as e:
        results["logging_system"] = {"status": "FAILED", "message": f"Logging-Fehler: {str(e)}"}

    # Test 6: queue manager (problems are reported as warnings, not failures)
    try:
        from utils.queue_manager import get_queue_manager
        queue_manager = get_queue_manager()
        if queue_manager:
            status = queue_manager.get_queue_status()
            results["queue_manager"] = {
                "status": "SUCCESS",
                "message": f"Queue Manager aktiv - Status: {len(status)} Warteschlangen"
            }
        else:
            results["queue_manager"] = {"status": "WARNING", "message": "Queue Manager nicht initialisiert"}
    except Exception as e:
        results["queue_manager"] = {"status": "WARNING", "message": f"Queue Manager-Fehler: {str(e)}"}

    # Test 7: job scheduler (problems are reported as warnings, not failures)
    try:
        from utils.job_scheduler import get_job_scheduler
        scheduler = get_job_scheduler()
        if scheduler:
            results["job_scheduler"] = {"status": "SUCCESS", "message": "Job Scheduler verfügbar"}
        else:
            results["job_scheduler"] = {"status": "WARNING", "message": "Job Scheduler nicht verfügbar"}
    except Exception as e:
        results["job_scheduler"] = {"status": "WARNING", "message": f"Job Scheduler-Fehler: {str(e)}"}

    return results
def _probe_endpoint(url, timeout, expected_codes, ok_message,
                    unexpected_status, unexpected_label, error_label):
    """Issue one GET request and translate the outcome into a result entry."""
    try:
        response = requests.get(url, timeout=timeout)
        if response.status_code in expected_codes:
            return {"status": "SUCCESS", "message": ok_message}
        return {"status": unexpected_status, "message": f"{unexpected_label} {response.status_code}"}
    except Exception as e:
        # Best effort: any error (typically "server not running") is reported
        # as a failed check instead of aborting the whole test run.
        return {"status": "FAILED", "message": f"{error_label}: {str(e)}"}


def test_api_endpoints(base_url: str = "http://localhost:5000",
                       timeout: float = 5) -> Dict[str, Dict[str, str]]:
    """Probe the critical HTTP endpoints of a running server.

    Three endpoints are requested with GET: the root page, the login page
    and the kiosk status API. For the status API the codes 200, 401 and 403
    all count as "reachable", because the request is sent without
    authentication.

    Args:
        base_url: Scheme, host and port of the server under test. A trailing
            slash is ignored.
        timeout: Per-request timeout in seconds, passed to ``requests.get``.

    Returns:
        A dict with the keys ``root_endpoint``, ``login_page`` and
        ``api_status``, each mapping to ``{"status": ..., "message": ...}``.
    """
    print("🌐 Teste API-Endpunkte...")
    base_url = base_url.rstrip("/")

    # (result key, path, accepted status codes, success message,
    #  status for an unexpected code, label for an unexpected code,
    #  label for a raised exception)
    checks = [
        ("root_endpoint", "/", (200,), "Root-Endpunkt erreichbar",
         "FAILED", "HTTP", "Verbindungsfehler"),
        ("login_page", "/auth/login", (200,), "Login-Seite verfügbar",
         "FAILED", "HTTP", "Login-Seite-Fehler"),
        ("api_status", "/api/kiosk/status", (200, 401, 403), "API grundsätzlich erreichbar",
         "WARNING", "Unerwarteter HTTP", "API-Status-Fehler"),
    ]

    results = {}
    for key, path, expected_codes, ok_message, bad_status, bad_label, error_label in checks:
        results[key] = _probe_endpoint(
            f"{base_url}{path}", timeout, expected_codes, ok_message,
            bad_status, bad_label, error_label,
        )
    return results
def test_file_structure(base_dir: str = ".") -> Dict[str, Dict[str, str]]:
    """Check that the expected project files and directories exist.

    Args:
        base_dir: Directory the expected paths are resolved against. The
            default ``"."`` checks relative to the current working directory.

    Returns:
        A dict with the keys ``file_structure`` and ``directory_structure``.
        Each maps to ``{"status": ..., "message": ...}`` where status is
        ``"SUCCESS"`` if nothing is missing and ``"WARNING"`` otherwise. The
        warning message lists the missing paths relative to ``base_dir``.
    """
    results = {}
    print("📁 Teste Datei- und Verzeichnisstruktur...")

    # Critical paths; "static/css" and "static/js" are directories, so this
    # check only requires that each path exists.
    critical_files = [
        "app.py",
        "models.py",
        "config/settings.py",
        "templates/base.html",
        "templates/login.html",
        "templates/dashboard.html",
        "static/css",
        "static/js",
        "utils/logging_config.py",
        "utils/queue_manager.py",
        "blueprints/guest.py",
        "blueprints/users.py",
        "blueprints/calendar.py",
    ]
    missing_files = [
        path for path in critical_files
        if not os.path.exists(os.path.join(base_dir, path))
    ]
    if missing_files:
        results["file_structure"] = {
            "status": "WARNING",
            "message": f"Fehlende Dateien: {', '.join(missing_files)}"
        }
    else:
        results["file_structure"] = {
            "status": "SUCCESS",
            "message": f"Alle {len(critical_files)} kritischen Dateien vorhanden"
        }

    # Directories: each path must exist and be a directory
    critical_dirs = ["logs", "database", "uploads", "static", "templates", "utils", "config", "blueprints"]
    missing_dirs = [
        path for path in critical_dirs
        if not os.path.isdir(os.path.join(base_dir, path))
    ]
    if missing_dirs:
        results["directory_structure"] = {
            "status": "WARNING",
            "message": f"Fehlende Verzeichnisse: {', '.join(missing_dirs)}"
        }
    else:
        results["directory_structure"] = {
            "status": "SUCCESS",
            "message": f"Alle {len(critical_dirs)} kritischen Verzeichnisse vorhanden"
        }

    return results
def test_database_integrity() -> Dict[str, Dict[str, str]]:
    """Check that all model tables are queryable and foreign keys are intact.

    Every model is counted through a session obtained from
    ``models.get_db_session``; a model whose query raises is treated as an
    inaccessible table. Afterwards ``PRAGMA foreign_key_check`` is executed
    and its result rows are inspected.

    Returns:
        A dict of ``{"status": ..., "message": ...}`` entries. On a normal
        run the keys are ``table_integrity`` and ``database_constraints``.
        If the imports, the session or any other step fails unexpectedly, a
        ``database_integrity`` entry with status ``"FAILED"`` is added.
    """
    from contextlib import closing

    results = {}
    print("🗄️ Teste Datenbankintegrität...")
    try:
        from models import (
            User, Printer, Job, Stats, SystemLog, GuestRequest,
            UserPermission, Notification, get_db_session,
        )

        # closing() releases the session even if a step below raises.
        with closing(get_db_session()) as db_session:
            # Table existence: a model counts as accessible when a COUNT
            # query against it succeeds.
            models_to_test = [User, Printer, Job, Stats, SystemLog, GuestRequest, UserPermission, Notification]
            tables_test = {}
            for model in models_to_test:
                try:
                    count = db_session.query(model).count()
                    tables_test[model.__tablename__] = {"exists": True, "count": count}
                except Exception as e:
                    tables_test[model.__tablename__] = {"exists": False, "error": str(e)}

            existing_tables = sum(1 for t in tables_test.values() if t.get("exists"))
            total_tables = len(tables_test)
            if existing_tables == total_tables:
                results["table_integrity"] = {
                    "status": "SUCCESS",
                    "message": f"Alle {total_tables} Tabellen existieren und sind zugänglich"
                }
            else:
                results["table_integrity"] = {
                    "status": "FAILED",
                    "message": f"Nur {existing_tables}/{total_tables} Tabellen zugänglich"
                }

            # Foreign key constraints
            try:
                # NOTE(review): the session API used here looks like
                # SQLAlchemy. Its 2.x series rejects plain SQL strings, so the
                # statement is wrapped in text() when available - confirm the
                # installed version.
                try:
                    from sqlalchemy import text
                except ImportError:
                    fk_statement = "PRAGMA foreign_key_check"
                else:
                    fk_statement = text("PRAGMA foreign_key_check")

                # The pragma does not raise on violations; it returns one row
                # per violating record, so the rows must be inspected.
                violations = db_session.execute(fk_statement).fetchall()
                if violations:
                    results["database_constraints"] = {
                        "status": "FAILED",
                        "message": f"{len(violations)} Foreign-Key-Verletzung(en) gefunden"
                    }
                else:
                    results["database_constraints"] = {"status": "SUCCESS", "message": "Foreign Key Constraints OK"}
            except Exception as e:
                results["database_constraints"] = {"status": "WARNING", "message": f"Constraint-Check-Fehler: {str(e)}"}
    except Exception as e:
        results["database_integrity"] = {"status": "FAILED", "message": f"DB-Integritätstest fehlgeschlagen: {str(e)}"}

    return results
def create_test_data() -> Dict[str, Dict[str, str]]:
    """Create a test printer and a test user unless they already exist.

    A printer is created when no printer whose name starts with ``"Test"``
    exists. A user is created when no user with the username ``"testuser"``
    exists. Each creation is committed on its own.

    Returns:
        A dict of ``{"status": ..., "message": ...}`` entries with the keys
        ``test_printer`` and ``test_user``. If any step raises, a
        ``test_data_creation`` entry with status ``"FAILED"`` is added and
        the remaining steps are skipped.
    """
    from contextlib import closing

    results = {}
    print("🧪 Erstelle Testdaten...")
    try:
        from models import User, Printer, Job, get_db_session  # noqa: F401

        # closing() releases the session even when add/commit raises, so a
        # failed insert no longer leaves the session open.
        with closing(get_db_session()) as db_session:
            # Test printer
            test_printer = db_session.query(Printer).filter(Printer.name.like("Test%")).first()
            if not test_printer:
                test_printer = Printer(
                    name="Test Drucker 1",
                    model="Test Model",
                    location="Test Labor",
                    ip_address="192.168.1.100",
                    mac_address="00:11:22:33:44:55",
                    plug_ip="192.168.1.101",
                    plug_username="test_user",
                    plug_password="test_pass",
                    status="offline"
                )
                db_session.add(test_printer)
                db_session.commit()
                results["test_printer"] = {"status": "SUCCESS", "message": "Test-Drucker erstellt"}
            else:
                results["test_printer"] = {"status": "SUCCESS", "message": "Test-Drucker bereits vorhanden"}

            # Test user
            test_user = db_session.query(User).filter(User.username == "testuser").first()
            if not test_user:
                test_user = User(
                    username="testuser",
                    email="test@test.com",
                    name="Test Benutzer",
                    role="user"
                )
                test_user.set_password("testpass")
                db_session.add(test_user)
                db_session.commit()
                results["test_user"] = {"status": "SUCCESS", "message": "Test-Benutzer erstellt"}
            else:
                results["test_user"] = {"status": "SUCCESS", "message": "Test-Benutzer bereits vorhanden"}
    except Exception as e:
        results["test_data_creation"] = {"status": "FAILED", "message": f"Test-Daten-Erstellung fehlgeschlagen: {str(e)}"}

    return results
def run_comprehensive_test() -> bool:
    """Run every check group, print a report and save it as JSON.

    The groups run in this order: internal components, file structure,
    database integrity, test data creation and API endpoints. Their result
    dicts are merged into one mapping, printed with a per-status marker,
    summarised, and written to ``test_results_<timestamp>.json`` in the
    current working directory.

    Returns:
        ``True`` if no check reported the status ``"FAILED"``, else ``False``.
    """
    from collections import Counter

    separator = "=" * 70
    print("🚀 Starte umfassenden Systemfunktionalitätstest für MYP Platform\n")
    print(separator)

    # The API checks run last; they only succeed while the server is running.
    test_groups = (
        test_internal_components,
        test_file_structure,
        test_database_integrity,
        create_test_data,
        test_api_endpoints,
    )
    all_results = {}
    for run_group in test_groups:
        all_results.update(run_group())
        print()

    print(separator)
    print("📊 TESTERGEBNISSE ZUSAMMENFASSUNG")
    print(separator)

    # Every known status gets its own marker so that successful and failed
    # checks can be told apart in the console output.
    status_icons = {
        "SUCCESS": "✅",
        "WARNING": "⚠️",
        "FAILED": "❌",
        "SKIPPED": "⏭️",
    }
    counts = Counter()
    for test_name, result in all_results.items():
        status = result["status"]
        message = result["message"]
        icon = status_icons.get(status)
        if icon is None:
            # Unknown statuses are neither printed nor counted; they are
            # still included in the total and in the JSON report.
            continue
        print(f"{icon} {test_name}: {message}")
        counts[status] += 1

    success_count = counts["SUCCESS"]
    warning_count = counts["WARNING"]
    failed_count = counts["FAILED"]
    skipped_count = counts["SKIPPED"]
    total_tests = len(all_results)

    print("\n" + separator)
    print("📈 STATISTIKEN")
    print(separator)
    print(f"Gesamt: {total_tests} Tests")
    print(f"✅ Erfolgreich: {success_count}")
    print(f"⚠️ Warnungen: {warning_count}")
    print(f"❌ Fehlgeschlagen: {failed_count}")
    print(f"⏭️ Übersprungen: {skipped_count}")

    # Recommendations
    print("\n" + separator)
    print("💡 EMPFEHLUNGEN")
    print(separator)
    if failed_count == 0 and warning_count <= 2:
        print("🎉 System ist voll funktionsfähig!")
        print(" Alle kritischen Komponenten arbeiten ordnungsgemäß.")
    elif failed_count == 0:
        print("✅ System ist grundsätzlich funktionsfähig.")
        print(" Einige Warnungen sollten beachtet werden.")
    else:
        print("⚠️ System hat kritische Probleme.")
        print(" Fehlgeschlagene Tests müssen behoben werden.")

    # Save the results as JSON. A single timestamp is used so the file name
    # and the embedded timestamp always agree.
    now = datetime.now()
    results_file = f"test_results_{now.strftime('%Y%m%d_%H%M%S')}.json"
    report = {
        "timestamp": now.isoformat(),
        "summary": {
            "total": total_tests,
            "success": success_count,
            "warnings": warning_count,
            "failed": failed_count,
            "skipped": skipped_count
        },
        "detailed_results": all_results
    }
    try:
        with open(results_file, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
    except OSError as e:
        # An unwritable report must not discard the outcome of the test run.
        print(f"\n⚠️ Ergebnisse konnten nicht gespeichert werden: {e}")
    else:
        print(f"\n📄 Detaillierte Ergebnisse gespeichert in: {results_file}")

    return failed_count == 0
if __name__ == "__main__":
    # Exit code 0 when no check failed, 1 otherwise.
    exit_code = 0 if run_comprehensive_test() else 1
    sys.exit(exit_code)