"feat: Introduce test suite for printer functionality in backend"

This commit is contained in:
Till Tomczak 2025-05-29 17:07:42 +02:00
parent 6e8755775c
commit 2177975513
3 changed files with 1701 additions and 159 deletions

View File

@ -4226,7 +4226,301 @@ def force_update_all_printer_status():
@app.route('/api/admin/cache/clear', methods=['POST']) @app.route('/api/admin/cache/clear', methods=['POST'])
@admin_required @admin_required
def clear_admin_cache(): def clear_admin_cache():
logger.error(f"Fehler beim Ablehnen der Gastanfrage {request_id}: {str(e)}") """Löscht den Admin-Cache."""
try:
# Session-Cache für Admin löschen
admin_cache_keys = [
'admin_users_cache',
'admin_printers_cache',
'admin_stats_cache',
'admin_guest_requests_cache'
]
cleared = 0
for key in admin_cache_keys:
if key in session:
del session[key]
cleared += 1
return jsonify({
"success": True,
"message": f"Admin-Cache geleert ({cleared} Einträge)"
})
except Exception as e:
return jsonify({"error": f"Fehler beim Löschen des Admin-Cache: {str(e)}"}), 500
# ===== ADMIN GUEST REQUEST MANAGEMENT APIs =====
@app.route("/api/admin/requests", methods=["GET"])
@admin_required
def admin_get_guest_requests():
"""
Gibt alle Gastanfragen für Admin-Verwaltung zurück.
Unterstützt Filterung und Paginierung.
"""
try:
page = request.args.get('page', 1, type=int)
per_page = min(request.args.get('per_page', 20, type=int), 100)
status_filter = request.args.get('status', 'all')
db_session = get_db_session()
# Basis-Query für Gastanfragen
query = db_session.query(GuestRequest)
# Status-Filter anwenden
if status_filter == 'pending':
query = query.filter(GuestRequest.processed_at.is_(None))
elif status_filter == 'approved':
query = query.filter(
GuestRequest.processed_at.isnot(None),
GuestRequest.rejection_reason.is_(None)
)
elif status_filter == 'denied':
query = query.filter(GuestRequest.rejection_reason.isnot(None))
# Sortierung nach neuesten zuerst
query = query.order_by(desc(GuestRequest.created_at))
# Paginierung
paginated_requests = query.paginate(
page=page,
per_page=per_page,
error_out=False
)
requests_data = []
for req in paginated_requests.items:
request_data = {
"id": req.id,
"guest_name": req.guest_name,
"guest_email": req.guest_email,
"company": req.company,
"visit_purpose": req.visit_purpose,
"requested_printer_id": req.requested_printer_id,
"printer_name": req.printer.name if req.printer else "Beliebiger Drucker",
"urgency": req.urgency,
"additional_notes": req.additional_notes,
"created_at": req.created_at.isoformat(),
"processed_at": req.processed_at.isoformat() if req.processed_at else None,
"processed_by": req.processed_by,
"approval_notes": req.approval_notes,
"rejection_reason": req.rejection_reason,
"status": "approved" if req.processed_at and not req.rejection_reason else "denied" if req.rejection_reason else "pending",
"otp_code": req.otp_code,
"otp_used_at": req.otp_used_at.isoformat() if req.otp_used_at else None
}
requests_data.append(request_data)
db_session.close()
return jsonify({
"requests": requests_data,
"pagination": {
"page": page,
"per_page": per_page,
"total": paginated_requests.total,
"pages": paginated_requests.pages,
"has_next": paginated_requests.has_next,
"has_prev": paginated_requests.has_prev
},
"filters": {
"status": status_filter
}
})
except Exception as e:
app_logger.error(f"Fehler beim Abrufen der Gastanfragen: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500 return jsonify({"error": "Interner Serverfehler"}), 500
# ===== ADMIN API-ROUTEN FÜR BENUTZER UND DRUCKER ===== @app.route("/api/admin/requests/<int:request_id>", methods=["GET"])
@admin_required
def admin_get_guest_request_details(request_id):
"""Gibt detaillierte Informationen zu einer spezifischen Gastanfrage zurück."""
try:
db_session = get_db_session()
guest_request = db_session.query(GuestRequest).get(request_id)
if not guest_request:
db_session.close()
return jsonify({"error": "Gastanfrage nicht gefunden"}), 404
# Detaillierte Daten zusammenstellen
request_data = {
"id": guest_request.id,
"guest_name": guest_request.guest_name,
"guest_email": guest_request.guest_email,
"company": guest_request.company,
"visit_purpose": guest_request.visit_purpose,
"requested_printer_id": guest_request.requested_printer_id,
"printer": {
"id": guest_request.printer.id,
"name": guest_request.printer.name,
"location": guest_request.printer.location,
"model": guest_request.printer.model
} if guest_request.printer else None,
"urgency": guest_request.urgency,
"additional_notes": guest_request.additional_notes,
"created_at": guest_request.created_at.isoformat(),
"processed_at": guest_request.processed_at.isoformat() if guest_request.processed_at else None,
"processed_by": guest_request.processed_by,
"approval_notes": guest_request.approval_notes,
"rejection_reason": guest_request.rejection_reason,
"status": "approved" if guest_request.processed_at and not guest_request.rejection_reason else "denied" if guest_request.rejection_reason else "pending",
"otp_code": guest_request.otp_code,
"otp_used_at": guest_request.otp_used_at.isoformat() if guest_request.otp_used_at else None
}
db_session.close()
return jsonify({"request": request_data})
except Exception as e:
app_logger.error(f"Fehler beim Abrufen der Gastanfrage {request_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/requests/<int:request_id>/approve", methods=["POST"])
@admin_required
def admin_approve_guest_request(request_id):
"""Genehmigt eine Gastanfrage und weist optional einen Drucker zu."""
try:
data = request.json or {}
approval_notes = data.get('approval_notes', '')
assigned_printer_id = data.get('assigned_printer_id')
db_session = get_db_session()
guest_request = db_session.query(GuestRequest).get(request_id)
if not guest_request:
db_session.close()
return jsonify({"error": "Gastanfrage nicht gefunden"}), 404
if guest_request.processed_at:
db_session.close()
return jsonify({"error": "Gastanfrage wurde bereits bearbeitet"}), 400
# Drucker validieren (falls angegeben)
if assigned_printer_id:
printer = db_session.query(Printer).get(assigned_printer_id)
if not printer:
db_session.close()
return jsonify({"error": "Angegebener Drucker nicht gefunden"}), 400
guest_request.requested_printer_id = assigned_printer_id
# OTP-Code generieren
import random
import string
otp_code = ''.join(random.choices(string.digits, k=6))
# Gastanfrage genehmigen
guest_request.processed_at = datetime.now()
guest_request.processed_by = current_user.id
guest_request.approval_notes = approval_notes
guest_request.otp_code = otp_code
guest_request.rejection_reason = None # Sicherstellen, dass es nicht abgelehnt ist
db_session.commit()
# Benachrichtigung erstellen
notification = Notification(
user_id=current_user.id,
title="Gastanfrage genehmigt",
message=f"Gastanfrage von {guest_request.guest_name} wurde genehmigt. OTP: {otp_code}",
type="success",
created_at=datetime.now()
)
db_session.add(notification)
db_session.commit()
# E-Mail an Gast senden (falls implementiert)
try:
from utils.email_service import send_approval_email
send_approval_email(guest_request.guest_email, guest_request.guest_name, otp_code)
except ImportError:
app_logger.warning("E-Mail-Service nicht verfügbar")
except Exception as email_error:
app_logger.error(f"Fehler beim Senden der Genehmigungs-E-Mail: {str(email_error)}")
db_session.close()
app_logger.info(f"Gastanfrage {request_id} von Admin {current_user.id} genehmigt")
return jsonify({
"success": True,
"message": "Gastanfrage erfolgreich genehmigt",
"otp_code": otp_code,
"request_id": request_id
})
except Exception as e:
app_logger.error(f"Fehler beim Genehmigen der Gastanfrage {request_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
@app.route("/api/requests/<int:request_id>/deny", methods=["POST"])
@admin_required
def admin_deny_guest_request(request_id):
"""Lehnt eine Gastanfrage ab."""
try:
data = request.json or {}
rejection_reason = data.get('rejection_reason', '').strip()
if not rejection_reason:
return jsonify({"error": "Ablehnungsgrund ist erforderlich"}), 400
db_session = get_db_session()
guest_request = db_session.query(GuestRequest).get(request_id)
if not guest_request:
db_session.close()
return jsonify({"error": "Gastanfrage nicht gefunden"}), 404
if guest_request.processed_at:
db_session.close()
return jsonify({"error": "Gastanfrage wurde bereits bearbeitet"}), 400
# Gastanfrage ablehnen
guest_request.processed_at = datetime.now()
guest_request.processed_by = current_user.id
guest_request.rejection_reason = rejection_reason
guest_request.approval_notes = None
guest_request.otp_code = None
db_session.commit()
# Benachrichtigung erstellen
notification = Notification(
user_id=current_user.id,
title="Gastanfrage abgelehnt",
message=f"Gastanfrage von {guest_request.guest_name} wurde abgelehnt: {rejection_reason}",
type="warning",
created_at=datetime.now()
)
db_session.add(notification)
db_session.commit()
# E-Mail an Gast senden (falls implementiert)
try:
from utils.email_service import send_rejection_email
send_rejection_email(guest_request.guest_email, guest_request.guest_name, rejection_reason)
except ImportError:
app_logger.warning("E-Mail-Service nicht verfügbar")
except Exception as email_error:
app_logger.error(f"Fehler beim Senden der Ablehnungs-E-Mail: {str(email_error)}")
db_session.close()
app_logger.info(f"Gastanfrage {request_id} von Admin {current_user.id} abgelehnt")
return jsonify({
"success": True,
"message": "Gastanfrage erfolgreich abgelehnt",
"request_id": request_id
})
except Exception as e:
app_logger.error(f"Fehler beim Ablehnen der Gastanfrage {request_id}: {str(e)}")
return jsonify({"error": "Interner Serverfehler"}), 500
# ===== ENDE ADMIN GUEST REQUEST MANAGEMENT =====
if __name__ == "__main__":
app.run(debug=True, host="0.0.0.0", port=5000, ssl_context=get_ssl_context())

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,287 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Test-Skript für die MYP Platform Implementierung
Testet alle neu implementierten Funktionen:
1. Font Awesome lokale Installation
2. Drucker-Status-Management (Notfall-Abruf)
3. Admin-Gastaufträge-APIs
4. Datenbank-Integrität
"""
import os
import sys
import requests
import json
import sqlite3
from datetime import datetime
from pathlib import Path
# Farben für Ausgabe
class Colors:
GREEN = '\033[92m'
RED = '\033[91m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
BOLD = '\033[1m'
END = '\033[0m'
def print_status(message, status="INFO"):
color = Colors.GREEN if status == "SUCCESS" else Colors.RED if status == "ERROR" else Colors.YELLOW if status == "WARNING" else Colors.BLUE
print(f"{color}[{status}]{Colors.END} {message}")
def test_font_awesome_installation():
"""Test 1: Font Awesome lokale Installation"""
print_status("=== TESTE FONT AWESOME INSTALLATION ===", "INFO")
# Überprüfe ob Font Awesome Dateien existieren
base_path = Path(".")
fa_path = base_path / "static" / "fontawesome"
required_files = [
fa_path / "css" / "all.min.css",
fa_path / "webfonts" / "fa-solid-900.woff2",
fa_path / "webfonts" / "fa-regular-400.woff2",
fa_path / "webfonts" / "fa-brands-400.woff2"
]
all_exist = True
for file_path in required_files:
if file_path.exists():
print_status(f"{file_path.name} gefunden", "SUCCESS")
else:
print_status(f"{file_path.name} fehlt", "ERROR")
all_exist = False
# Überprüfe base.html Integration
base_html = base_path / "templates" / "base.html"
if base_html.exists():
with open(base_html, 'r', encoding='utf-8') as f:
content = f.read()
if 'fontawesome/css/all.min.css' in content:
print_status("✓ Font Awesome in base.html eingebunden", "SUCCESS")
else:
print_status("✗ Font Awesome nicht in base.html gefunden", "ERROR")
all_exist = False
else:
print_status("✗ base.html nicht gefunden", "ERROR")
all_exist = False
return all_exist
def test_database_schema():
"""Test 2: Datenbank-Schema für neue Funktionen"""
print_status("=== TESTE DATENBANK-SCHEMA ===", "INFO")
try:
# Verbindung zur Datenbank
db_path = "database.db"
if not os.path.exists(db_path):
print_status("✗ Datenbank nicht gefunden", "ERROR")
return False
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Teste GuestRequest Tabelle
cursor.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name='guest_requests'")
guest_table = cursor.fetchone()
if guest_table:
print_status("✓ GuestRequest Tabelle existiert", "SUCCESS")
# Überprüfe wichtige Spalten
required_columns = [
'processed_by', 'processed_at', 'approval_notes',
'rejection_reason', 'otp_code', 'otp_used_at'
]
table_sql = guest_table[0].lower()
missing_columns = []
for col in required_columns:
if col.lower() not in table_sql:
missing_columns.append(col)
if missing_columns:
print_status(f"✗ Fehlende Spalten: {', '.join(missing_columns)}", "ERROR")
conn.close()
return False
else:
print_status("✓ Alle erforderlichen Spalten vorhanden", "SUCCESS")
else:
print_status("✗ GuestRequest Tabelle nicht gefunden", "ERROR")
conn.close()
return False
# Teste Printer Tabelle
cursor.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name='printers'")
printer_table = cursor.fetchone()
if printer_table:
print_status("✓ Printer Tabelle existiert", "SUCCESS")
# Überprüfe Status-Spalten
table_sql = printer_table[0].lower()
if 'last_checked' in table_sql and 'status' in table_sql:
print_status("✓ Drucker-Status-Spalten vorhanden", "SUCCESS")
else:
print_status("✗ Drucker-Status-Spalten fehlen", "ERROR")
conn.close()
return False
else:
print_status("✗ Printer Tabelle nicht gefunden", "ERROR")
conn.close()
return False
conn.close()
return True
except Exception as e:
print_status(f"✗ Datenbankfehler: {str(e)}", "ERROR")
return False
def test_api_endpoints():
"""Test 3: Neue API-Endpunkte"""
print_status("=== TESTE API-ENDPUNKTE ===", "INFO")
base_url = "http://localhost:5000"
# Test-Endpunkte (ohne Authentifizierung)
test_endpoints = [
"/api/printers/status/emergency",
"/api/admin/requests",
"/api/printers/status/summary"
]
available_endpoints = []
for endpoint in test_endpoints:
try:
response = requests.get(f"{base_url}{endpoint}", timeout=5)
# 401/403 ist OK (Authentifizierung erforderlich), 404 ist nicht OK
if response.status_code in [200, 401, 403]:
print_status(f"{endpoint} - Endpunkt verfügbar (HTTP {response.status_code})", "SUCCESS")
available_endpoints.append(endpoint)
elif response.status_code == 404:
print_status(f"{endpoint} - Endpunkt nicht gefunden (HTTP 404)", "ERROR")
else:
print_status(f"{endpoint} - Unerwarteter Status (HTTP {response.status_code})", "WARNING")
available_endpoints.append(endpoint)
except requests.exceptions.ConnectionError:
print_status(f"{endpoint} - Server nicht erreichbar", "WARNING")
except Exception as e:
print_status(f"{endpoint} - Fehler: {str(e)}", "ERROR")
return len(available_endpoints) > 0
def test_template_integration():
"""Test 4: Template-Integration"""
print_status("=== TESTE TEMPLATE-INTEGRATION ===", "INFO")
templates_to_check = [
("templates/admin.html", "admin_guest_requests"),
("templates/admin_guest_requests.html", "fas fa-"),
("templates/base.html", "fontawesome/css/all.min.css")
]
all_ok = True
for template_path, search_term in templates_to_check:
if os.path.exists(template_path):
with open(template_path, 'r', encoding='utf-8') as f:
content = f.read()
if search_term in content:
print_status(f"{template_path} - {search_term} gefunden", "SUCCESS")
else:
print_status(f"{template_path} - {search_term} nicht gefunden", "ERROR")
all_ok = False
else:
print_status(f"{template_path} - Datei nicht gefunden", "ERROR")
all_ok = False
return all_ok
def test_app_imports():
"""Test 5: App.py Importe und Funktionen"""
print_status("=== TESTE APP.PY INTEGRATION ===", "INFO")
if not os.path.exists("app.py"):
print_status("✗ app.py nicht gefunden", "ERROR")
return False
with open("app.py", 'r', encoding='utf-8') as f:
content = f.read()
required_functions = [
"get_printers_status_emergency",
"force_update_all_printer_status",
"admin_get_guest_requests",
"admin_approve_guest_request",
"admin_deny_guest_request"
]
missing_functions = []
for func in required_functions:
if f"def {func}" in content:
print_status(f"✓ Funktion {func} gefunden", "SUCCESS")
else:
print_status(f"✗ Funktion {func} fehlt", "ERROR")
missing_functions.append(func)
# Teste Import-Statements
required_imports = ["desc", "GuestRequest", "Notification"]
missing_imports = []
for imp in required_imports:
if imp in content:
print_status(f"✓ Import {imp} gefunden", "SUCCESS")
else:
print_status(f"✗ Import {imp} fehlt", "ERROR")
missing_imports.append(imp)
return len(missing_functions) == 0 and len(missing_imports) == 0
def run_comprehensive_test():
"""Führt alle Tests aus"""
print_status("MYP PLATFORM - IMPLEMENTATION TEST", "INFO")
print_status("=" * 50, "INFO")
test_results = {
"Font Awesome Installation": test_font_awesome_installation(),
"Datenbank Schema": test_database_schema(),
"Template Integration": test_template_integration(),
"App.py Integration": test_app_imports(),
"API Endpunkte": test_api_endpoints()
}
print_status("\n=== ZUSAMMENFASSUNG ===", "INFO")
passed = 0
total = len(test_results)
for test_name, result in test_results.items():
status = "SUCCESS" if result else "ERROR"
print_status(f"{test_name}: {'BESTANDEN' if result else 'FEHLGESCHLAGEN'}", status)
if result:
passed += 1
print_status(f"\nErgebnis: {passed}/{total} Tests bestanden", "SUCCESS" if passed == total else "WARNING")
if passed == total:
print_status("\n🎉 ALLE TESTS BESTANDEN! Die Implementierung ist vollständig.", "SUCCESS")
print_status("Folgende Funktionen sind jetzt verfügbar:", "INFO")
print_status("• Font Awesome Icons (lokal installiert)", "INFO")
print_status("• Erweiterte Drucker-Status-Verwaltung", "INFO")
print_status("• Admin-Gastaufträge-Verwaltung", "INFO")
print_status("• Robuste Datenbank-Integration", "INFO")
print_status("• Notfall-Drucker-Status-Abruf", "INFO")
else:
print_status(f"\n⚠️ {total - passed} Test(s) fehlgeschlagen. Bitte überprüfen Sie die Implementierung.", "WARNING")
return passed == total
if __name__ == "__main__":
success = run_comprehensive_test()
sys.exit(0 if success else 1)