📝 'feat': Renamed and moved various documentation files to '/docs/' directory

This commit is contained in:
2025-05-29 15:46:25 +02:00
parent 1c466b199a
commit 0e3f316a88
25 changed files with 990 additions and 165 deletions

View File

@@ -0,0 +1,178 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Skript zum Hinzufügen von Testdruckern zur Datenbank
"""
import sys
import os
from datetime import datetime
# Füge das Anwendungsverzeichnis zum Python-Pfad hinzu
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from models import get_db_session, Printer
def add_test_printers():
"""Fügt Testdrucker zur Datenbank hinzu"""
test_printers = [
{
"name": "Prusa i3 MK3S+",
"model": "Prusa i3 MK3S+",
"location": "Labor A - Arbeitsplatz 1",
"mac_address": "AA:BB:CC:DD:EE:01",
"plug_ip": "192.168.1.101",
"status": "available",
"active": True
},
{
"name": "Ender 3 V2",
"model": "Creality Ender 3 V2",
"location": "Labor A - Arbeitsplatz 2",
"mac_address": "AA:BB:CC:DD:EE:02",
"plug_ip": "192.168.1.102",
"status": "available",
"active": True
},
{
"name": "Ultimaker S3",
"model": "Ultimaker S3",
"location": "Labor B - Arbeitsplatz 1",
"mac_address": "AA:BB:CC:DD:EE:03",
"plug_ip": "192.168.1.103",
"status": "available",
"active": True
},
{
"name": "Bambu Lab X1 Carbon",
"model": "Bambu Lab X1 Carbon",
"location": "Labor B - Arbeitsplatz 2",
"mac_address": "AA:BB:CC:DD:EE:04",
"plug_ip": "192.168.1.104",
"status": "available",
"active": True
},
{
"name": "Formlabs Form 3",
"model": "Formlabs Form 3",
"location": "Labor C - Harz-Bereich",
"mac_address": "AA:BB:CC:DD:EE:05",
"plug_ip": "192.168.1.105",
"status": "offline",
"active": False
}
]
db_session = get_db_session()
try:
added_count = 0
for printer_data in test_printers:
# Prüfen, ob Drucker bereits existiert
existing = db_session.query(Printer).filter(
Printer.name == printer_data["name"]
).first()
if existing:
print(f"⚠️ Drucker '{printer_data['name']}' existiert bereits - überspringe")
continue
# Neuen Drucker erstellen
new_printer = Printer(
name=printer_data["name"],
model=printer_data["model"],
location=printer_data["location"],
mac_address=printer_data["mac_address"],
plug_ip=printer_data["plug_ip"],
status=printer_data["status"],
active=printer_data["active"],
created_at=datetime.now()
)
db_session.add(new_printer)
added_count += 1
print(f"✅ Drucker '{printer_data['name']}' hinzugefügt")
if added_count > 0:
db_session.commit()
print(f"\n🎉 {added_count} Testdrucker erfolgreich zur Datenbank hinzugefügt!")
else:
print("\n📋 Alle Testdrucker existieren bereits in der Datenbank")
# Zeige alle Drucker in der Datenbank
all_printers = db_session.query(Printer).all()
print(f"\n📊 Gesamt {len(all_printers)} Drucker in der Datenbank:")
print("-" * 80)
print(f"{'ID':<4} {'Name':<20} {'Modell':<20} {'Status':<12} {'Aktiv':<6}")
print("-" * 80)
for printer in all_printers:
active_str = "" if printer.active else ""
print(f"{printer.id:<4} {printer.name[:19]:<20} {(printer.model or 'Unbekannt')[:19]:<20} {printer.status:<12} {active_str:<6}")
db_session.close()
except Exception as e:
db_session.rollback()
db_session.close()
print(f"❌ Fehler beim Hinzufügen der Testdrucker: {str(e)}")
return False
return True
def remove_test_printers():
"""Entfernt alle Testdrucker aus der Datenbank"""
test_printer_names = [
"Prusa i3 MK3S+",
"Ender 3 V2",
"Ultimaker S3",
"Bambu Lab X1 Carbon",
"Formlabs Form 3"
]
db_session = get_db_session()
try:
removed_count = 0
for name in test_printer_names:
printer = db_session.query(Printer).filter(Printer.name == name).first()
if printer:
db_session.delete(printer)
removed_count += 1
print(f"🗑️ Drucker '{name}' entfernt")
if removed_count > 0:
db_session.commit()
print(f"\n🧹 {removed_count} Testdrucker erfolgreich entfernt!")
else:
print("\n📋 Keine Testdrucker zum Entfernen gefunden")
db_session.close()
except Exception as e:
db_session.rollback()
db_session.close()
print(f"❌ Fehler beim Entfernen der Testdrucker: {str(e)}")
return False
return True
if __name__ == "__main__":
print("=== MYP Druckerverwaltung - Testdrucker-Verwaltung ===")
print()
if len(sys.argv) > 1 and sys.argv[1] == "--remove":
print("Entferne Testdrucker...")
remove_test_printers()
else:
print("Füge Testdrucker hinzu...")
print("(Verwende --remove um Testdrucker zu entfernen)")
print()
add_test_printers()
print("\nFertig! 🚀")

View File

@@ -0,0 +1,380 @@
#!/usr/bin/env python3
"""
Umfassendes Datenbank-Schema-Migrationsskript
Erkennt und fügt alle fehlenden Spalten basierend auf den Models hinzu.
"""
import os
import sys
import sqlite3
from datetime import datetime
import logging
# Pfad zur App hinzufügen
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from config.settings import DATABASE_PATH
from utils.logging_config import get_logger
logger = get_logger("schema_migration")
def get_table_columns(cursor, table_name):
"""Ermittelt alle Spalten einer Tabelle."""
cursor.execute(f"PRAGMA table_info({table_name})")
return {row[1]: row[2] for row in cursor.fetchall()} # {column_name: column_type}
def get_table_exists(cursor, table_name):
"""Prüft, ob eine Tabelle existiert."""
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,))
return cursor.fetchone() is not None
def migrate_users_table(cursor):
"""Migriert die users Tabelle für fehlende Spalten."""
logger.info("Migriere users Tabelle...")
if not get_table_exists(cursor, 'users'):
logger.warning("users Tabelle existiert nicht - wird bei init_db erstellt")
return False
existing_columns = get_table_columns(cursor, 'users')
# Definition der erwarteten Spalten
required_columns = {
'id': 'INTEGER PRIMARY KEY',
'email': 'VARCHAR(120) UNIQUE NOT NULL',
'username': 'VARCHAR(100) UNIQUE NOT NULL',
'password_hash': 'VARCHAR(128) NOT NULL',
'name': 'VARCHAR(100) NOT NULL',
'role': 'VARCHAR(20) DEFAULT "user"',
'active': 'BOOLEAN DEFAULT 1',
'created_at': 'DATETIME DEFAULT CURRENT_TIMESTAMP',
'last_login': 'DATETIME',
'updated_at': 'DATETIME DEFAULT CURRENT_TIMESTAMP',
'settings': 'TEXT',
'department': 'VARCHAR(100)',
'position': 'VARCHAR(100)',
'phone': 'VARCHAR(50)',
'bio': 'TEXT'
}
migrations_performed = []
for column_name, column_def in required_columns.items():
if column_name not in existing_columns:
try:
# Spezielle Behandlung für updated_at mit Trigger
if column_name == 'updated_at':
cursor.execute(f"ALTER TABLE users ADD COLUMN {column_name} DATETIME DEFAULT CURRENT_TIMESTAMP")
# Trigger für automatische Aktualisierung
cursor.execute("""
CREATE TRIGGER IF NOT EXISTS update_users_updated_at
AFTER UPDATE ON users
BEGIN
UPDATE users SET updated_at = CURRENT_TIMESTAMP WHERE id = NEW.id;
END
""")
logger.info(f"Spalte '{column_name}' hinzugefügt mit Auto-Update-Trigger")
else:
cursor.execute(f"ALTER TABLE users ADD COLUMN {column_name} {column_def}")
logger.info(f"Spalte '{column_name}' hinzugefügt")
migrations_performed.append(column_name)
except Exception as e:
logger.error(f"Fehler beim Hinzufügen der Spalte '{column_name}': {str(e)}")
return len(migrations_performed) > 0
def migrate_printers_table(cursor):
"""Migriert die printers Tabelle für fehlende Spalten."""
logger.info("Migriere printers Tabelle...")
if not get_table_exists(cursor, 'printers'):
logger.warning("printers Tabelle existiert nicht - wird bei init_db erstellt")
return False
existing_columns = get_table_columns(cursor, 'printers')
required_columns = {
'id': 'INTEGER PRIMARY KEY',
'name': 'VARCHAR(100) NOT NULL',
'model': 'VARCHAR(100)',
'location': 'VARCHAR(100)',
'ip_address': 'VARCHAR(50)',
'mac_address': 'VARCHAR(50) NOT NULL UNIQUE',
'plug_ip': 'VARCHAR(50) NOT NULL',
'plug_username': 'VARCHAR(100) NOT NULL',
'plug_password': 'VARCHAR(100) NOT NULL',
'status': 'VARCHAR(20) DEFAULT "offline"',
'active': 'BOOLEAN DEFAULT 1',
'created_at': 'DATETIME DEFAULT CURRENT_TIMESTAMP',
'last_checked': 'DATETIME'
}
migrations_performed = []
for column_name, column_def in required_columns.items():
if column_name not in existing_columns:
try:
cursor.execute(f"ALTER TABLE printers ADD COLUMN {column_name} {column_def}")
logger.info(f"Spalte '{column_name}' zu printers hinzugefügt")
migrations_performed.append(column_name)
except Exception as e:
logger.error(f"Fehler beim Hinzufügen der Spalte '{column_name}' zu printers: {str(e)}")
return len(migrations_performed) > 0
def migrate_jobs_table(cursor):
"""Migriert die jobs Tabelle für fehlende Spalten."""
logger.info("Migriere jobs Tabelle...")
if not get_table_exists(cursor, 'jobs'):
logger.warning("jobs Tabelle existiert nicht - wird bei init_db erstellt")
return False
existing_columns = get_table_columns(cursor, 'jobs')
required_columns = {
'id': 'INTEGER PRIMARY KEY',
'name': 'VARCHAR(200) NOT NULL',
'description': 'VARCHAR(500)',
'user_id': 'INTEGER NOT NULL',
'printer_id': 'INTEGER NOT NULL',
'start_at': 'DATETIME',
'end_at': 'DATETIME',
'actual_end_time': 'DATETIME',
'status': 'VARCHAR(20) DEFAULT "scheduled"',
'created_at': 'DATETIME DEFAULT CURRENT_TIMESTAMP',
'notes': 'VARCHAR(500)',
'material_used': 'FLOAT',
'file_path': 'VARCHAR(500)',
'owner_id': 'INTEGER',
'duration_minutes': 'INTEGER NOT NULL'
}
migrations_performed = []
for column_name, column_def in required_columns.items():
if column_name not in existing_columns:
try:
cursor.execute(f"ALTER TABLE jobs ADD COLUMN {column_name} {column_def}")
logger.info(f"Spalte '{column_name}' zu jobs hinzugefügt")
migrations_performed.append(column_name)
except Exception as e:
logger.error(f"Fehler beim Hinzufügen der Spalte '{column_name}' zu jobs: {str(e)}")
return len(migrations_performed) > 0
def migrate_guest_requests_table(cursor):
"""Migriert die guest_requests Tabelle für fehlende Spalten."""
logger.info("Migriere guest_requests Tabelle...")
if not get_table_exists(cursor, 'guest_requests'):
logger.warning("guest_requests Tabelle existiert nicht - wird bei init_db erstellt")
return False
existing_columns = get_table_columns(cursor, 'guest_requests')
required_columns = {
'processed_by': 'INTEGER',
'processed_at': 'DATETIME',
'approval_notes': 'TEXT',
'rejection_reason': 'TEXT',
'otp_used_at': 'DATETIME'
}
migrations_performed = []
for column_name, column_def in required_columns.items():
if column_name not in existing_columns:
try:
cursor.execute(f"ALTER TABLE guest_requests ADD COLUMN {column_name} {column_def}")
logger.info(f"Spalte '{column_name}' zu guest_requests hinzugefügt")
migrations_performed.append(column_name)
except Exception as e:
logger.error(f"Fehler beim Hinzufügen der Spalte '{column_name}' zu guest_requests: {str(e)}")
return len(migrations_performed) > 0
def create_missing_tables(cursor):
"""Erstellt fehlende Tabellen."""
logger.info("Prüfe auf fehlende Tabellen...")
# user_permissions Tabelle
if not get_table_exists(cursor, 'user_permissions'):
cursor.execute("""
CREATE TABLE user_permissions (
user_id INTEGER PRIMARY KEY,
can_start_jobs BOOLEAN DEFAULT 0,
needs_approval BOOLEAN DEFAULT 1,
can_approve_jobs BOOLEAN DEFAULT 0,
FOREIGN KEY (user_id) REFERENCES users (id)
)
""")
logger.info("Tabelle 'user_permissions' erstellt")
# notifications Tabelle
if not get_table_exists(cursor, 'notifications'):
cursor.execute("""
CREATE TABLE notifications (
id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL,
type VARCHAR(50) NOT NULL,
payload TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
read BOOLEAN DEFAULT 0,
FOREIGN KEY (user_id) REFERENCES users (id)
)
""")
logger.info("Tabelle 'notifications' erstellt")
# stats Tabelle
if not get_table_exists(cursor, 'stats'):
cursor.execute("""
CREATE TABLE stats (
id INTEGER PRIMARY KEY,
total_print_time INTEGER DEFAULT 0,
total_jobs_completed INTEGER DEFAULT 0,
total_material_used FLOAT DEFAULT 0.0,
last_updated DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
logger.info("Tabelle 'stats' erstellt")
# system_logs Tabelle
if not get_table_exists(cursor, 'system_logs'):
cursor.execute("""
CREATE TABLE system_logs (
id INTEGER PRIMARY KEY,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
level VARCHAR(20) NOT NULL,
message VARCHAR(1000) NOT NULL,
module VARCHAR(100),
user_id INTEGER,
ip_address VARCHAR(50),
user_agent VARCHAR(500),
FOREIGN KEY (user_id) REFERENCES users (id)
)
""")
logger.info("Tabelle 'system_logs' erstellt")
def optimize_database(cursor):
"""Führt Datenbankoptimierungen durch."""
logger.info("Führe Datenbankoptimierungen durch...")
try:
# Indices für bessere Performance
cursor.execute("CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_users_username ON users(username)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_jobs_user_id ON jobs(user_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_jobs_printer_id ON jobs(printer_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_notifications_user_id ON notifications(user_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_system_logs_timestamp ON system_logs(timestamp)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_guest_requests_status ON guest_requests(status)")
# Statistiken aktualisieren
cursor.execute("ANALYZE")
logger.info("Datenbankoptimierungen abgeschlossen")
except Exception as e:
logger.error(f"Fehler bei Datenbankoptimierungen: {str(e)}")
def main():
"""Führt die komplette Schema-Migration aus."""
try:
logger.info("Starte umfassende Datenbank-Schema-Migration...")
# Verbindung zur Datenbank
if not os.path.exists(DATABASE_PATH):
logger.error(f"Datenbankdatei nicht gefunden: {DATABASE_PATH}")
# Erste Initialisierung
from models import init_database
logger.info("Führe Erstinitialisierung durch...")
init_database()
logger.info("Erstinitialisierung abgeschlossen")
return
conn = sqlite3.connect(DATABASE_PATH)
cursor = conn.cursor()
# WAL-Modus aktivieren für bessere Concurrent-Performance
cursor.execute("PRAGMA journal_mode=WAL")
cursor.execute("PRAGMA foreign_keys=ON")
logger.info(f"Verbunden mit Datenbank: {DATABASE_PATH}")
# Backup erstellen
backup_path = f"{DATABASE_PATH}.backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
cursor.execute(f"VACUUM INTO '{backup_path}'")
logger.info(f"Backup erstellt: {backup_path}")
# Migrationen durchführen
migrations_performed = []
# Fehlende Tabellen erstellen
create_missing_tables(cursor)
migrations_performed.append("missing_tables")
# Tabellen-spezifische Migrationen
if migrate_users_table(cursor):
migrations_performed.append("users")
if migrate_printers_table(cursor):
migrations_performed.append("printers")
if migrate_jobs_table(cursor):
migrations_performed.append("jobs")
if migrate_guest_requests_table(cursor):
migrations_performed.append("guest_requests")
# Optimierungen
optimize_database(cursor)
migrations_performed.append("optimizations")
# Änderungen speichern
conn.commit()
conn.close()
logger.info(f"Schema-Migration erfolgreich abgeschlossen. Migrierte Bereiche: {', '.join(migrations_performed)}")
# Test der Migration
test_migration()
except Exception as e:
logger.error(f"Fehler bei der Schema-Migration: {str(e)}")
if 'conn' in locals():
conn.rollback()
conn.close()
sys.exit(1)
def test_migration():
"""Testet die Migration durch Laden der Models."""
try:
logger.info("Teste Migration durch Laden der Models...")
# Models importieren und testen
from models import get_cached_session, User, Printer, Job
with get_cached_session() as session:
# Test User-Query (sollte das updated_at Problem lösen)
users = session.query(User).limit(1).all()
logger.info(f"User-Abfrage erfolgreich - {len(users)} Benutzer gefunden")
# Test Printer-Query
printers = session.query(Printer).limit(1).all()
logger.info(f"Printer-Abfrage erfolgreich - {len(printers)} Drucker gefunden")
# Test Job-Query
jobs = session.query(Job).limit(1).all()
logger.info(f"Job-Abfrage erfolgreich - {len(jobs)} Jobs gefunden")
logger.info("Migrations-Test erfolgreich abgeschlossen")
except Exception as e:
logger.error(f"Fehler beim Migrations-Test: {str(e)}")
raise
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,742 @@
#!/usr/bin/env python3
"""
MYP Debug CLI
Kommandozeilen-Tool für Diagnose und Debugging der MYP-Anwendung
"""
import os
import sys
import argparse
import time
import json
import importlib
import logging
import sqlite3
from datetime import datetime
import traceback
from pprint import pprint
# Eigene Module importieren
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# Farbige Ausgabe für die Konsole
COLORS = {
'RESET': '\033[0m',
'BOLD': '\033[1m',
'RED': '\033[31m',
'GREEN': '\033[32m',
'YELLOW': '\033[33m',
'BLUE': '\033[34m',
'MAGENTA': '\033[35m',
'CYAN': '\033[36m',
}
# Emojis für verschiedene Log-Level und Kategorien
LOG_EMOJIS = {
'DEBUG': '🔍',
'INFO': '',
'WARNING': '⚠️',
'ERROR': '',
'CRITICAL': '🔥',
'SUCCESS': '',
'DATABASE': '💾',
'NETWORK': '🌐',
'SYSTEM': '💻',
'PRINTER': '🖨️',
'API': '📡',
'USER': '👤'
}
# Prüfen, ob das Terminal Farben unterstützt
def supports_color():
"""Prüft, ob das Terminal Farben unterstützt."""
if os.name == 'nt':
try:
import ctypes
kernel32 = ctypes.windll.kernel32
# Aktiviere VT100-Unterstützung unter Windows
kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7)
return True
except:
return False
else:
return sys.stdout.isatty()
USE_COLOR = supports_color()
def colorize(text, color):
"""Färbt den Text ein, wenn Farben unterstützt werden."""
if USE_COLOR and color in COLORS:
return f"{COLORS[color]}{text}{COLORS['RESET']}"
return text
def print_success(message):
print(f"{LOG_EMOJIS['SUCCESS']} {colorize(message, 'GREEN')}")
def print_error(message):
print(f"{LOG_EMOJIS['ERROR']} {colorize(message, 'RED')}")
def print_warning(message):
print(f"{LOG_EMOJIS['WARNING']} {colorize(message, 'YELLOW')}")
def print_info(message):
print(f"{LOG_EMOJIS['INFO']} {colorize(message, 'BLUE')}")
def print_debug(message):
print(f"{LOG_EMOJIS['DEBUG']} {colorize(message, 'CYAN')}")
def print_database(message):
print(f"{LOG_EMOJIS['DATABASE']} {colorize(message, 'MAGENTA')}")
def print_network(message):
print(f"{LOG_EMOJIS['NETWORK']} {colorize(message, 'CYAN')}")
def print_system(message):
print(f"{LOG_EMOJIS['SYSTEM']} {colorize(message, 'BLUE')}")
def print_printer(message):
print(f"{LOG_EMOJIS['PRINTER']} {colorize(message, 'GREEN')}")
def print_header(message):
print(f"\n{colorize('='*80, 'BOLD')}")
print(f"{colorize(message.center(80), 'BOLD')}")
print(f"{colorize('='*80, 'BOLD')}\n")
def print_section(message):
print(f"\n{colorize('-'*40, 'BOLD')}")
print(f"{colorize(message, 'BOLD')}")
print(f"{colorize('-'*40, 'BOLD')}\n")
# Hilfsfunktionen
def get_database_path():
"""Gibt den Pfad zur Datenbank zurück."""
try:
from config.settings import DATABASE_PATH
return DATABASE_PATH
except ImportError:
# Fallback auf Standard-Pfad
base_dir = os.path.dirname(os.path.abspath(__file__))
return os.path.join(base_dir, "database", "myp.db")
def check_database():
"""Prüft den Zustand der Datenbank."""
db_path = get_database_path()
if not os.path.exists(db_path):
print_error(f"Datenbank nicht gefunden: {db_path}")
return False
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Tabellen auflisten
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()
print_database(f"Datenbank gefunden: {db_path}")
print_database(f"Größe: {os.path.getsize(db_path) / (1024*1024):.2f} MB")
print_database(f"Tabellen ({len(tables)}):")
for table in tables:
# Anzahl der Datensätze pro Tabelle
cursor.execute(f"SELECT COUNT(*) FROM {table[0]}")
count = cursor.fetchone()[0]
print(f" 📋 {table[0]}: {count} Einträge")
conn.close()
return True
except sqlite3.Error as e:
print_error(f"Datenbankfehler: {e}")
return False
except Exception as e:
print_error(f"Fehler beim Prüfen der Datenbank: {e}")
return False
def check_log_files():
"""Prüft die Log-Dateien und zeigt die neuesten Einträge an."""
try:
from config.settings import LOG_DIR, LOG_SUBDIRS
if not os.path.exists(LOG_DIR):
print_error(f"Log-Verzeichnis nicht gefunden: {LOG_DIR}")
return False
print_info(f"Log-Verzeichnis: {LOG_DIR}")
for subdir in LOG_SUBDIRS:
log_path = os.path.join(LOG_DIR, subdir, f"{subdir}.log")
if not os.path.exists(log_path):
print_warning(f"Log-Datei nicht gefunden: {log_path}")
continue
size = os.path.getsize(log_path) / 1024 # KB
print_info(f"Log-Datei: {subdir}.log ({size:.1f} KB)")
# Letzte Zeilen anzeigen
try:
with open(log_path, 'r') as f:
lines = f.readlines()
last_lines = lines[-5:] # Letzte 5 Zeilen
print(" Letzte Einträge:")
for line in last_lines:
line = line.strip()
# Farbliche Hervorhebung je nach Log-Level
if "ERROR" in line:
print(f" {colorize(line, 'RED')}")
elif "WARNING" in line:
print(f" {colorize(line, 'YELLOW')}")
elif "INFO" in line:
print(f" {colorize(line, 'GREEN')}")
elif "DEBUG" in line:
print(f" {colorize(line, 'CYAN')}")
else:
print(f" {line}")
except Exception as e:
print_warning(f" Fehler beim Lesen der Log-Datei: {e}")
return True
except ImportError:
print_error("Konfiguration für Logs nicht gefunden")
return False
except Exception as e:
print_error(f"Fehler beim Prüfen der Log-Dateien: {e}")
return False
def check_environment():
"""Prüft die Umgebungsvariablen und System-Einstellungen."""
print_info("Umgebungsinformationen:")
print(f" Python-Version: {sys.version.split()[0]}")
print(f" Betriebssystem: {os.name} - {sys.platform}")
print(f" Arbeitsverzeichnis: {os.getcwd()}")
print_info("Wichtige Umgebungsvariablen:")
env_vars = [
"FLASK_ENV", "FLASK_DEBUG", "MYP_SSL_ENABLED",
"MYP_SSL_HOSTNAME", "PYTHONPATH"
]
for var in env_vars:
value = os.environ.get(var, "nicht gesetzt")
print(f" {var}: {value}")
try:
# Flask-Konfiguration prüfen
print_info("Flask-Konfiguration:")
from config.settings import FLASK_HOST, FLASK_PORT, FLASK_DEBUG, SSL_ENABLED
print(f" Host: {FLASK_HOST}")
print(f" Port: {FLASK_PORT}")
print(f" Debug-Modus: {FLASK_DEBUG}")
print(f" SSL aktiviert: {SSL_ENABLED}")
# Module prüfen
required_modules = [
'flask', 'sqlalchemy', 'flask_login', 'werkzeug'
]
print_info("Benötigte Module:")
for module in required_modules:
try:
mod = importlib.import_module(module)
version = getattr(mod, '__version__', 'unbekannt')
print(f" {module}: {colorize('OK', 'GREEN')} (Version {version})")
except ImportError:
print(f" {module}: {colorize('FEHLT', 'RED')}")
except ImportError:
print_warning("Flask-Konfiguration konnte nicht geladen werden")
except Exception as e:
print_error(f"Fehler beim Prüfen der Umgebung: {e}")
def scan_printer(ip_address, timeout=5):
"""Scannt einen Drucker und zeigt Informationen an."""
import socket
print_printer(f"Prüfe Drucker mit IP: {ip_address}")
# Ping testen
import subprocess
try:
if os.name == 'nt': # Windows
cmd = ['ping', '-n', '1', '-w', str(timeout * 1000), ip_address]
else: # Unix/Linux/macOS
cmd = ['ping', '-c', '1', '-W', str(timeout), ip_address]
print(f" 🏓 Ping-Test: ", end="")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print(colorize("Erreichbar", "GREEN"))
else:
print(colorize("Nicht erreichbar", "RED"))
print(f" 📄 Details: {result.stdout}")
return
except Exception as e:
print(colorize(f"Fehler bei Ping-Test: {e}", "RED"))
# Offene Ports prüfen
common_ports = [80, 443, 8080, 8443, 631, 9100, 9101, 9102]
open_ports = []
print(" 🔍 Port-Scan: ", end="")
for port in common_ports:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((ip_address, port))
if result == 0:
open_ports.append(port)
sock.close()
if open_ports:
print(colorize(f"Offene Ports: {', '.join(map(str, open_ports))}", "GREEN"))
else:
print(colorize("Keine offenen Ports gefunden", "YELLOW"))
# Drucker-Info über Tapo-API testen (wenn vorhanden)
try:
from PyP100 import PyP110
print(" 🔌 Smart Plug Test: ", end="")
try:
# Standardmäßig Anmeldeinformationen aus der Konfiguration verwenden
from config.settings import TAPO_USERNAME, TAPO_PASSWORD
p110 = PyP110.P110(ip_address, TAPO_USERNAME, TAPO_PASSWORD)
p110.handshake()
p110.login()
device_info = p110.getDeviceInfo()
print(colorize("Verbunden", "GREEN"))
print(f" 📛 Gerätename: {device_info.get('nickname', 'Unbekannt')}")
print(f" ⚡ Status: {'Ein' if device_info.get('device_on', False) else 'Aus'}")
if 'on_time' in device_info:
on_time = device_info['on_time']
print(f" ⏱️ Betriebszeit: {on_time // 60} Minuten, {on_time % 60} Sekunden")
except Exception as e:
print(colorize(f"Fehler: {e}", "RED"))
except ImportError:
print_warning(" PyP100-Modul nicht verfügbar - Smart Plug Test übersprungen")
def check_printers_from_db():
"""Prüft die in der Datenbank gespeicherten Drucker."""
db_path = get_database_path()
if not os.path.exists(db_path):
print_error(f"Datenbank nicht gefunden: {db_path}")
return
try:
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# Drucker-Tabelle prüfen
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='printer';")
if not cursor.fetchone():
print_error("Drucker-Tabelle nicht gefunden")
conn.close()
return
# Drucker auslesen
cursor.execute("SELECT * FROM printer;")
printers = cursor.fetchall()
if not printers:
print_warning("Keine Drucker in der Datenbank gefunden")
conn.close()
return
print_info(f"{len(printers)} Drucker gefunden:")
for printer in printers:
status_color = 'GREEN' if printer['status'] == 'online' else 'RED'
print(f" {printer['name']}: {colorize(printer['status'], status_color)}")
print(f" IP: {printer['ip_address']}")
print(f" Plug IP: {printer['plug_ip'] or 'Nicht konfiguriert'}")
# Detaillierteren Status prüfen
if printer['plug_ip']:
ask = input(f" Möchten Sie den Drucker {printer['name']} scannen? (j/n): ")
if ask.lower() in ('j', 'ja', 'y', 'yes'):
scan_printer(printer['plug_ip'])
conn.close()
except Exception as e:
print_error(f"Fehler beim Prüfen der Drucker: {e}")
traceback.print_exc()
def check_flask_routes():
"""Zeigt alle verfügbaren Flask-Routen an."""
try:
# Versuche, die Flask-App zu importieren
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
try:
from app import app as flask_app
except ImportError:
print_error("Flask-App konnte nicht importiert werden")
return
# Alle Routen auflisten
print_info("Verfügbare Flask-Routen:")
routes = []
for rule in flask_app.url_map.iter_rules():
routes.append({
'endpoint': rule.endpoint,
'methods': ', '.join(sorted(rule.methods - {'HEAD', 'OPTIONS'})),
'path': rule.rule
})
# Nach Pfad sortieren
routes = sorted(routes, key=lambda x: x['path'])
# Routen anzeigen
for route in routes:
method_color = 'GREEN' if 'GET' in route['methods'] else 'BLUE'
print(f" {colorize(route['methods'], method_color)} {route['path']}")
print(f"{route['endpoint']}")
print_info(f"Insgesamt {len(routes)} Routen gefunden")
except Exception as e:
print_error(f"Fehler beim Abrufen der Flask-Routen: {e}")
traceback.print_exc()
def print_system_info():
"""Zeigt detaillierte Systeminformationen an."""
print_header("Systeminformationen")
print_section("Basisinformationen")
import platform
print(f"Python-Version: {platform.python_version()}")
print(f"Betriebssystem: {platform.system()} {platform.release()}")
print(f"Architektur: {platform.machine()}")
print(f"Prozessor: {platform.processor()}")
print_section("Speicher")
try:
import psutil
vm = psutil.virtual_memory()
print(f"Gesamter Speicher: {vm.total / (1024**3):.1f} GB")
print(f"Verfügbarer Speicher: {vm.available / (1024**3):.1f} GB")
print(f"Speicherauslastung: {vm.percent}%")
disk = psutil.disk_usage('/')
print(f"Festplatte gesamt: {disk.total / (1024**3):.1f} GB")
print(f"Festplatte frei: {disk.free / (1024**3):.1f} GB")
print(f"Festplattenauslastung: {disk.percent}%")
except ImportError:
print_warning("psutil-Modul nicht verfügbar - eingeschränkte Informationen")
print_section("Netzwerk")
try:
import socket
hostname = socket.gethostname()
ip_address = socket.gethostbyname(hostname)
print(f"Hostname: {hostname}")
print(f"IP-Adresse: {ip_address}")
# Netzwerkschnittstellen
if 'psutil' in sys.modules:
print("Netzwerkschnittstellen:")
for name, addrs in psutil.net_if_addrs().items():
for addr in addrs:
if addr.family == socket.AF_INET:
print(f" {name}: {addr.address}")
except Exception as e:
print_warning(f"Fehler beim Abrufen der Netzwerkinformationen: {e}")
def test_logging_system():
"""Testet das verbesserte Logging-System mit allen Features."""
print_header("Logging-System Test")
try:
# Versuche die neuen Logging-Funktionen zu importieren
from utils.logging_config import get_logger, debug_request, debug_response, measure_execution_time
print_success("Neue Logging-Module erfolgreich importiert")
# Test verschiedener Logger
test_loggers = ['app', 'auth', 'jobs', 'printers', 'errors']
print_section("Logger-Tests")
for logger_name in test_loggers:
try:
logger = get_logger(logger_name)
# Test verschiedener Log-Level
logger.debug(f"🔍 Debug-Test für {logger_name}")
logger.info(f" Info-Test für {logger_name}")
logger.warning(f"⚠️ Warning-Test für {logger_name}")
print_success(f"Logger '{logger_name}' funktioniert korrekt")
except Exception as e:
print_error(f"Fehler beim Testen von Logger '{logger_name}': {e}")
# Test Performance-Monitoring
print_section("Performance-Monitoring Test")
@measure_execution_time(logger=get_logger("app"), task_name="Test-Funktion")
def test_function():
"""Eine Test-Funktion für das Performance-Monitoring."""
import time
time.sleep(0.1) # Simuliere etwas Arbeit
return "Test erfolgreich"
result = test_function()
print_success(f"Performance-Monitoring Test: {result}")
# Test der Debug-Utilities
print_section("Debug-Utilities Test")
try:
from utils.debug_utils import debug_dump, debug_trace, memory_usage
# Test debug_dump
test_data = {
"version": "1.0.0",
"features": ["emojis", "colors", "performance-monitoring"],
"status": "active"
}
debug_dump(test_data, "Test-Konfiguration")
# Test memory_usage
memory_info = memory_usage()
print_system(f"Aktueller Speicherverbrauch: {memory_info['rss']:.2f} MB")
print_success("Debug-Utilities funktionieren korrekt")
except ImportError as e:
print_warning(f"Debug-Utilities nicht verfügbar: {e}")
# Zusammenfassung
print_section("Test-Zusammenfassung")
print_success("🎉 Alle Logging-System-Tests erfolgreich abgeschlossen!")
print_info("Features verfügbar:")
print(" ✅ Farbige Log-Ausgaben mit ANSI-Codes")
print(" ✅ Emoji-Integration für bessere Lesbarkeit")
print(" ✅ HTTP-Request/Response-Logging")
print(" ✅ Performance-Monitoring mit Ausführungszeit")
print(" ✅ Cross-Platform-Unterstützung (Windows/Unix)")
print(" ✅ Strukturierte Debug-Informationen")
except ImportError as e:
print_error(f"Logging-Module nicht verfügbar: {e}")
print_warning("Stelle sicher, dass alle Module korrekt installiert sind")
except Exception as e:
print_error(f"Unerwarteter Fehler beim Logging-Test: {e}")
traceback.print_exc()
# Hauptfunktionen für die Befehlszeile
def diagnose():
"""Führt eine umfassende Diagnose durch."""
print_header("MYP Diagnose-Tool")
print_section("Systemprüfung")
check_environment()
print_section("Datenbankprüfung")
check_database()
print_section("Log-Dateien")
check_log_files()
print_success("Diagnose abgeschlossen!")
def scan_printers():
"""Scannt und prüft alle Drucker."""
print_header("Drucker-Scanner")
# Direkter Scan einer IP-Adresse
ip = input("IP-Adresse zum Scannen (leer lassen, um Drucker aus der Datenbank zu prüfen): ")
if ip:
scan_printer(ip)
else:
check_printers_from_db()
def show_routes():
"""Zeigt alle verfügbaren API-Routen an."""
print_header("API-Routen")
check_flask_routes()
def system_info():
"""Zeigt detaillierte Systeminformationen an."""
print_system_info()
def show_logs():
"""Zeigt und analysiert Log-Dateien."""
print_header("Log-Analyse")
try:
from config.settings import LOG_DIR, LOG_SUBDIRS
if not os.path.exists(LOG_DIR):
print_error(f"Log-Verzeichnis nicht gefunden: {LOG_DIR}")
return
print_info(f"Log-Verzeichnis: {LOG_DIR}")
print_info("Verfügbare Logs:")
for i, subdir in enumerate(LOG_SUBDIRS, 1):
log_path = os.path.join(LOG_DIR, subdir, f"{subdir}.log")
size = "Nicht gefunden"
if os.path.exists(log_path):
size = f"{os.path.getsize(log_path) / 1024:.1f} KB"
print(f" {i}. {subdir}.log ({size})")
choice = input("\nWelches Log möchten Sie anzeigen? (Nummer oder Name): ")
# Nummer in Namen umwandeln
try:
choice_num = int(choice) - 1
if 0 <= choice_num < len(LOG_SUBDIRS):
choice = LOG_SUBDIRS[choice_num]
except ValueError:
pass
# Prüfen, ob die Wahl gültig ist
if choice not in LOG_SUBDIRS:
print_error(f"Ungültige Auswahl: {choice}")
return
log_path = os.path.join(LOG_DIR, choice, f"{choice}.log")
if not os.path.exists(log_path):
print_error(f"Log-Datei nicht gefunden: {log_path}")
return
# Anzahl der anzuzeigenden Zeilen
lines_count = input("Anzahl der anzuzeigenden Zeilen (Standard: 20): ")
lines_count = int(lines_count) if lines_count.isdigit() else 20
# Filter für bestimmte Log-Level
level_filter = input("Nach Log-Level filtern (INFO, WARNING, ERROR oder leer für alle): ").upper()
# Log-Datei anzeigen
with open(log_path, 'r') as f:
lines = f.readlines()
# Filtern nach Log-Level
if level_filter:
lines = [line for line in lines if level_filter in line]
# Letzte n Zeilen auswählen
lines = lines[-lines_count:]
print_section(f"Log-Datei: {choice}.log (letzte {len(lines)} Einträge)")
for line in lines:
line = line.strip()
# Farbliche Hervorhebung je nach Log-Level
if "ERROR" in line:
print(colorize(line, 'RED'))
elif "WARNING" in line:
print(colorize(line, 'YELLOW'))
elif "INFO" in line:
print(colorize(line, 'GREEN'))
elif "DEBUG" in line:
print(colorize(line, 'CYAN'))
else:
print(line)
except ImportError:
print_error("Konfiguration für Logs nicht gefunden")
except Exception as e:
print_error(f"Fehler beim Anzeigen der Log-Dateien: {e}")
traceback.print_exc()
def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description="MYP Debug CLI")
subparsers = parser.add_subparsers(dest="command", help="Befehl")
# Diagnose
diag_parser = subparsers.add_parser("diagnose", help="Führt eine umfassende Diagnose durch")
# Drucker scannen
scan_parser = subparsers.add_parser("scan", help="Scannt und prüft alle Drucker")
# Routen anzeigen
routes_parser = subparsers.add_parser("routes", help="Zeigt alle verfügbaren API-Routen an")
# Systeminformationen
sysinfo_parser = subparsers.add_parser("sysinfo", help="Zeigt detaillierte Systeminformationen an")
# Logs anzeigen
logs_parser = subparsers.add_parser("logs", help="Zeigt und analysiert Log-Dateien")
# Logging-System testen
logging_test_parser = subparsers.add_parser("test-logging", help="Testet das verbesserte Logging-System")
return parser.parse_args()
def main():
"""Hauptfunktion."""
args = parse_args()
if args.command == "diagnose":
diagnose()
elif args.command == "scan":
scan_printers()
elif args.command == "routes":
show_routes()
elif args.command == "sysinfo":
system_info()
elif args.command == "logs":
show_logs()
elif args.command == "test-logging":
test_logging_system()
else:
# Interaktives Menü, wenn kein Befehl angegeben wurde
print_header("MYP Debug CLI")
print("Wählen Sie eine Option:")
print(" 1. Diagnose durchführen")
print(" 2. Drucker scannen")
print(" 3. API-Routen anzeigen")
print(" 4. Systeminformationen anzeigen")
print(" 5. Log-Dateien anzeigen")
print(" 6. Logging-System testen")
print(" 0. Beenden")
choice = input("\nIhre Wahl: ")
if choice == "1":
diagnose()
elif choice == "2":
scan_printers()
elif choice == "3":
show_routes()
elif choice == "4":
system_info()
elif choice == "5":
show_logs()
elif choice == "6":
test_logging_system()
elif choice == "0":
print("Auf Wiedersehen!")
sys.exit(0)
else:
print_error("Ungültige Auswahl")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print_info("\nProgramm wurde durch Benutzer abgebrochen")
except Exception as e:
print_error(f"Unerwarteter Fehler: {e}")
traceback.print_exc()

View File

@@ -0,0 +1,314 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Debug-Skript für Druckererkennung
Testet die Druckererkennung und identifiziert Probleme
"""
import sys
import os
import requests
import json
import time
import threading
from datetime import datetime
import sqlite3
import subprocess
import platform
# Füge das Anwendungsverzeichnis zum Python-Pfad hinzu
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
def log_message(message, level="INFO"):
"""Logge eine Nachricht mit Zeitstempel"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] [{level}] {message}")
def test_database_connection():
"""Teste die Datenbankverbindung"""
log_message("Teste Datenbankverbindung...")
try:
# Versuche SQLite-Datenbank zu öffnen
db_files = ['database.db', 'app.db', 'myp.db']
for db_file in db_files:
if os.path.exists(db_file):
log_message(f"Gefundene Datenbankdatei: {db_file}")
conn = sqlite3.connect(db_file)
cursor = conn.cursor()
# Prüfe ob Printer-Tabelle existiert
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='printer';")
if cursor.fetchone():
log_message("✅ Printer-Tabelle gefunden")
# Zähle Drucker
cursor.execute("SELECT COUNT(*) FROM printer;")
count = cursor.fetchone()[0]
log_message(f"📊 Anzahl Drucker in Datenbank: {count}")
# Zeige Drucker-Details
cursor.execute("SELECT id, name, plug_ip, status FROM printer;")
printers = cursor.fetchall()
for printer in printers:
log_message(f" Drucker {printer[0]}: {printer[1]} ({printer[2]}) - Status: {printer[3]}")
conn.close()
return True
else:
log_message("❌ Printer-Tabelle nicht gefunden")
conn.close()
log_message("❌ Keine gültige Datenbank gefunden")
return False
except Exception as e:
log_message(f"❌ Datenbankfehler: {str(e)}", "ERROR")
return False
def test_api_endpoints():
"""Teste die API-Endpunkte"""
log_message("Teste API-Endpunkte...")
base_url = "http://localhost:5000"
endpoints = [
"/api/printers",
"/api/printers/status"
]
for endpoint in endpoints:
try:
log_message(f"Teste {endpoint}...")
response = requests.get(f"{base_url}{endpoint}", timeout=10)
log_message(f" Status Code: {response.status_code}")
if response.status_code == 200:
try:
data = response.json()
if endpoint == "/api/printers":
if 'printers' in data:
log_message(f"{len(data['printers'])} Drucker geladen")
else:
log_message(f" ⚠️ Unerwartete Antwortstruktur: {list(data.keys())}")
else:
if isinstance(data, list):
log_message(f"{len(data)} Drucker mit Status geladen")
else:
log_message(f" ⚠️ Unerwartete Antwortstruktur: {type(data)}")
except json.JSONDecodeError:
log_message(f" ❌ Ungültige JSON-Antwort", "ERROR")
else:
log_message(f" ❌ HTTP-Fehler: {response.status_code}", "ERROR")
try:
error_data = response.json()
log_message(f" Fehlermeldung: {error_data.get('error', 'Unbekannt')}", "ERROR")
except:
log_message(f" Antwort: {response.text[:200]}", "ERROR")
except requests.exceptions.ConnectionError:
log_message(f" ❌ Verbindung zu {base_url} fehlgeschlagen", "ERROR")
log_message(" Ist die Flask-Anwendung gestartet?", "ERROR")
except requests.exceptions.Timeout:
log_message(f" ❌ Timeout bei {endpoint}", "ERROR")
except Exception as e:
log_message(f" ❌ Fehler: {str(e)}", "ERROR")
def test_network_connectivity():
"""Teste Netzwerkverbindung zu Druckern"""
log_message("Teste Netzwerkverbindung zu Druckern...")
# Lade Drucker aus Datenbank
try:
db_files = ['database.db', 'app.db', 'myp.db']
printers = []
for db_file in db_files:
if os.path.exists(db_file):
conn = sqlite3.connect(db_file)
cursor = conn.cursor()
cursor.execute("SELECT name, plug_ip FROM printer WHERE plug_ip IS NOT NULL;")
printers = cursor.fetchall()
conn.close()
break
if not printers:
log_message("❌ Keine Drucker mit IP-Adressen gefunden")
return
for name, ip in printers:
log_message(f"Teste Verbindung zu {name} ({ip})...")
# Ping-Test
try:
if platform.system().lower() == "windows":
result = subprocess.run(['ping', '-n', '1', '-w', '3000', ip],
capture_output=True, text=True, timeout=5)
else:
result = subprocess.run(['ping', '-c', '1', '-W', '3', ip],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
log_message(f" ✅ Ping erfolgreich")
else:
log_message(f" ❌ Ping fehlgeschlagen")
except subprocess.TimeoutExpired:
log_message(f" ❌ Ping-Timeout")
except Exception as e:
log_message(f" ❌ Ping-Fehler: {str(e)}")
# HTTP-Test (falls Drucker Webinterface hat)
try:
response = requests.get(f"http://{ip}", timeout=3)
log_message(f" ✅ HTTP-Verbindung erfolgreich (Status: {response.status_code})")
except requests.exceptions.Timeout:
log_message(f" ⚠️ HTTP-Timeout (normal für Drucker ohne Webinterface)")
except requests.exceptions.ConnectionError:
log_message(f" ⚠️ HTTP-Verbindung fehlgeschlagen (normal für Drucker ohne Webinterface)")
except Exception as e:
log_message(f" ⚠️ HTTP-Fehler: {str(e)}")
except Exception as e:
log_message(f"❌ Fehler beim Testen der Netzwerkverbindung: {str(e)}", "ERROR")
def test_flask_app_status():
"""Teste den Status der Flask-Anwendung"""
log_message("Teste Flask-Anwendung...")
try:
# Teste Hauptseite
response = requests.get("http://localhost:5000", timeout=5)
if response.status_code == 200:
log_message("✅ Flask-Anwendung läuft")
else:
log_message(f"⚠️ Flask-Anwendung antwortet mit Status {response.status_code}")
except requests.exceptions.ConnectionError:
log_message("❌ Flask-Anwendung nicht erreichbar", "ERROR")
log_message(" Starte die Anwendung mit: python app.py", "INFO")
except Exception as e:
log_message(f"❌ Fehler beim Testen der Flask-Anwendung: {str(e)}", "ERROR")
def test_threading_timeout():
"""Teste die Threading-basierte Timeout-Implementierung"""
log_message("Teste Threading-Timeout-Implementierung...")
def test_function():
"""Simuliere eine langsame Datenbankabfrage"""
time.sleep(2)
return "Erfolgreich"
try:
result = None
timeout_occurred = False
def run_test():
nonlocal result, timeout_occurred
try:
result = test_function()
except Exception as e:
log_message(f"Fehler in Test-Thread: {str(e)}", "ERROR")
timeout_occurred = True
# Starte Test in separatem Thread
thread = threading.Thread(target=run_test)
thread.daemon = True
thread.start()
thread.join(timeout=3) # 3 Sekunden Timeout
if thread.is_alive() or timeout_occurred or result is None:
log_message("❌ Threading-Timeout-Test fehlgeschlagen", "ERROR")
else:
log_message("✅ Threading-Timeout-Implementierung funktioniert")
except Exception as e:
log_message(f"❌ Fehler beim Threading-Test: {str(e)}", "ERROR")
def check_system_requirements():
"""Prüfe Systemanforderungen"""
log_message("Prüfe Systemanforderungen...")
# Python-Version
python_version = sys.version_info
log_message(f"Python-Version: {python_version.major}.{python_version.minor}.{python_version.micro}")
if python_version.major >= 3 and python_version.minor >= 7:
log_message("✅ Python-Version ist kompatibel")
else:
log_message("❌ Python 3.7+ erforderlich", "ERROR")
# Erforderliche Module
required_modules = ['flask', 'requests', 'sqlite3', 'threading']
for module in required_modules:
try:
__import__(module)
log_message(f"✅ Modul {module} verfügbar")
except ImportError:
log_message(f"❌ Modul {module} nicht verfügbar", "ERROR")
# Betriebssystem
os_name = platform.system()
log_message(f"Betriebssystem: {os_name}")
if os_name == "Windows":
log_message("✅ Windows-spezifische Fixes wurden angewendet")
else:
log_message(" Unix-basiertes System erkannt")
def run_comprehensive_test():
"""Führe alle Tests aus"""
log_message("=== MYP Druckerverwaltung - Diagnose-Tool ===")
log_message("Starte umfassende Systemdiagnose...")
print()
# Systemanforderungen prüfen
check_system_requirements()
print()
# Threading-Test
test_threading_timeout()
print()
# Datenbanktest
test_database_connection()
print()
# Flask-App-Test
test_flask_app_status()
print()
# API-Tests
test_api_endpoints()
print()
# Netzwerk-Tests
test_network_connectivity()
print()
log_message("=== Diagnose abgeschlossen ===")
print()
# Empfehlungen
log_message("📋 Empfehlungen:")
log_message("1. Stelle sicher, dass die Flask-Anwendung läuft: python app.py")
log_message("2. Prüfe die Datenbankverbindung und Drucker-Konfiguration")
log_message("3. Teste die Netzwerkverbindung zu den Druckern")
log_message("4. Bei Windows: Threading-basierte Timeouts wurden implementiert")
log_message("5. Überprüfe die Logs in logs/app/ für weitere Details")
if __name__ == "__main__":
try:
run_comprehensive_test()
except KeyboardInterrupt:
log_message("Diagnose durch Benutzer abgebrochen", "INFO")
except Exception as e:
log_message(f"Unerwarteter Fehler: {str(e)}", "ERROR")
import traceback
traceback.print_exc()

View File

@@ -0,0 +1,22 @@
#!/usr/bin/env python3
"""Entferne problematischen CSRF-Error-Handler aus app.py"""
import re
# Lese die Backup-Datei
with open('app_backup.py', 'r', encoding='utf-8') as f:
content = f.read()
# Entferne den CSRF-Error-Handler-Block
# Suche nach @csrf.error_handler bis zum ersten leeren Zeilen-Block
pattern = r'@csrf\.error_handler.*?(?=\n\n|\n# [A-Z])'
content = re.sub(pattern, '', content, flags=re.DOTALL)
# Entferne auch mögliche doppelte Leerzeilen
content = re.sub(r'\n\n\n+', '\n\n', content)
# Schreibe die bereinigte Version
with open('app.py', 'w', encoding='utf-8') as f:
f.write(content)
print("CSRF-Error-Handler erfolgreich entfernt!")

View File

@@ -0,0 +1,253 @@
#!/usr/bin/env python3
"""
Sofortige Datenbank-Reparatur für fehlende updated_at Spalte
"""
import os
import sys
import sqlite3
from datetime import datetime
# Pfad zur App hinzufügen
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from config.settings import DATABASE_PATH
def fix_users_table_immediate():
"""Repariert die users Tabelle sofort."""
print(f"Repariere Datenbank: {DATABASE_PATH}")
if not os.path.exists(DATABASE_PATH):
print(f"Datenbankdatei nicht gefunden: {DATABASE_PATH}")
return False
try:
conn = sqlite3.connect(DATABASE_PATH)
cursor = conn.cursor()
# Prüfen, welche Spalten existieren
cursor.execute("PRAGMA table_info(users)")
existing_columns = [row[1] for row in cursor.fetchall()]
print(f"Vorhandene Spalten in users: {existing_columns}")
# Fehlende Spalten hinzufügen
required_columns = [
('updated_at', 'DATETIME'),
('settings', 'TEXT'),
('department', 'VARCHAR(100)'),
('position', 'VARCHAR(100)'),
('phone', 'VARCHAR(50)'),
('bio', 'TEXT')
]
for column_name, column_type in required_columns:
if column_name not in existing_columns:
try:
if column_name == 'updated_at':
# Einfacher Ansatz: NULL erlauben und später updaten
cursor.execute(f"ALTER TABLE users ADD COLUMN {column_name} {column_type}")
print(f"✓ Spalte '{column_name}' hinzugefügt")
# Alle vorhandenen Benutzer mit aktuellem Timestamp updaten
cursor.execute(f"UPDATE users SET {column_name} = CURRENT_TIMESTAMP WHERE {column_name} IS NULL")
print(f"✓ Vorhandene Benutzer mit {column_name} aktualisiert")
# Trigger für automatische Updates erstellen
cursor.execute("""
CREATE TRIGGER IF NOT EXISTS update_users_updated_at
AFTER UPDATE ON users
FOR EACH ROW
BEGIN
UPDATE users SET updated_at = CURRENT_TIMESTAMP WHERE id = NEW.id;
END
""")
print(f"✓ Auto-Update-Trigger für {column_name} erstellt")
else:
cursor.execute(f"ALTER TABLE users ADD COLUMN {column_name} {column_type}")
print(f"✓ Spalte '{column_name}' hinzugefügt")
except Exception as e:
print(f"✗ Fehler bei Spalte '{column_name}': {str(e)}")
else:
print(f"○ Spalte '{column_name}' bereits vorhanden")
# Weitere fehlende Tabellen prüfen und erstellen
create_missing_tables(cursor)
# Optimierungsindizes erstellen
create_performance_indexes(cursor)
conn.commit()
conn.close()
print("✓ Datenbank-Reparatur erfolgreich abgeschlossen")
return True
except Exception as e:
print(f"✗ Fehler bei der Datenbank-Reparatur: {str(e)}")
if 'conn' in locals():
conn.rollback()
conn.close()
return False
def create_missing_tables(cursor):
"""Erstellt fehlende Tabellen."""
# Prüfen, welche Tabellen existieren
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
existing_tables = [row[0] for row in cursor.fetchall()]
print(f"Vorhandene Tabellen: {existing_tables}")
# user_permissions Tabelle
if 'user_permissions' not in existing_tables:
cursor.execute("""
CREATE TABLE user_permissions (
user_id INTEGER PRIMARY KEY,
can_start_jobs BOOLEAN DEFAULT 0,
needs_approval BOOLEAN DEFAULT 1,
can_approve_jobs BOOLEAN DEFAULT 0,
FOREIGN KEY (user_id) REFERENCES users (id)
)
""")
print("✓ Tabelle 'user_permissions' erstellt")
# notifications Tabelle
if 'notifications' not in existing_tables:
cursor.execute("""
CREATE TABLE notifications (
id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL,
type VARCHAR(50) NOT NULL,
payload TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
read BOOLEAN DEFAULT 0,
FOREIGN KEY (user_id) REFERENCES users (id)
)
""")
print("✓ Tabelle 'notifications' erstellt")
# stats Tabelle
if 'stats' not in existing_tables:
cursor.execute("""
CREATE TABLE stats (
id INTEGER PRIMARY KEY,
total_print_time INTEGER DEFAULT 0,
total_jobs_completed INTEGER DEFAULT 0,
total_material_used REAL DEFAULT 0.0,
last_updated DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
print("✓ Tabelle 'stats' erstellt")
# Initial stats record erstellen
cursor.execute("""
INSERT INTO stats (total_print_time, total_jobs_completed, total_material_used, last_updated)
VALUES (0, 0, 0.0, CURRENT_TIMESTAMP)
""")
print("✓ Initial-Statistiken erstellt")
# system_logs Tabelle
if 'system_logs' not in existing_tables:
cursor.execute("""
CREATE TABLE system_logs (
id INTEGER PRIMARY KEY,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
level VARCHAR(20) NOT NULL,
message VARCHAR(1000) NOT NULL,
module VARCHAR(100),
user_id INTEGER,
ip_address VARCHAR(50),
user_agent VARCHAR(500),
FOREIGN KEY (user_id) REFERENCES users (id)
)
""")
print("✓ Tabelle 'system_logs' erstellt")
def create_performance_indexes(cursor):
"""Erstellt Performance-Indices."""
print("Erstelle Performance-Indices...")
indexes = [
("idx_users_email", "users(email)"),
("idx_users_username", "users(username)"),
("idx_users_role", "users(role)"),
("idx_jobs_user_id", "jobs(user_id)"),
("idx_jobs_printer_id", "jobs(printer_id)"),
("idx_jobs_status", "jobs(status)"),
("idx_jobs_start_at", "jobs(start_at)"),
("idx_notifications_user_id", "notifications(user_id)"),
("idx_notifications_read", "notifications(read)"),
("idx_system_logs_timestamp", "system_logs(timestamp)"),
("idx_system_logs_level", "system_logs(level)"),
("idx_guest_requests_status", "guest_requests(status)"),
("idx_printers_status", "printers(status)"),
("idx_printers_active", "printers(active)")
]
for index_name, index_def in indexes:
try:
cursor.execute(f"CREATE INDEX IF NOT EXISTS {index_name} ON {index_def}")
print(f"✓ Index '{index_name}' erstellt")
except Exception as e:
print(f"○ Index '{index_name}': {str(e)}")
def test_database_access():
"""Testet den Datenbankzugriff nach der Reparatur."""
print("\nTeste Datenbankzugriff...")
try:
# Models importieren und testen
from models import get_cached_session, User, Printer, Job
with get_cached_session() as session:
# Test User-Query
users = session.query(User).limit(5).all()
print(f"✓ User-Abfrage erfolgreich - {len(users)} Benutzer gefunden")
# Test Printer-Query
printers = session.query(Printer).limit(5).all()
print(f"✓ Printer-Abfrage erfolgreich - {len(printers)} Drucker gefunden")
# Test Job-Query
jobs = session.query(Job).limit(5).all()
print(f"✓ Job-Abfrage erfolgreich - {len(jobs)} Jobs gefunden")
print("✓ Alle Datenbank-Tests erfolgreich!")
return True
except Exception as e:
print(f"✗ Datenbank-Test fehlgeschlagen: {str(e)}")
return False
def main():
"""Hauptfunktion für die sofortige Datenbank-Reparatur."""
print("=== SOFORTIGE DATENBANK-REPARATUR ===")
print(f"Zeitstempel: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Datenbank: {DATABASE_PATH}")
print()
# Backup erstellen
if os.path.exists(DATABASE_PATH):
backup_path = f"{DATABASE_PATH}.backup_immediate_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
try:
import shutil
shutil.copy2(DATABASE_PATH, backup_path)
print(f"✓ Backup erstellt: {backup_path}")
except Exception as e:
print(f"⚠ Backup-Erstellung fehlgeschlagen: {str(e)}")
# Reparatur durchführen
if fix_users_table_immediate():
print("\n=== DATENBANK-TEST ===")
if test_database_access():
print("\n🎉 DATENBANK-REPARATUR ERFOLGREICH!")
print("Die Anwendung sollte jetzt funktionieren.")
else:
print("\n❌ DATENBANK-TEST FEHLGESCHLAGEN!")
print("Weitere Diagnose erforderlich.")
else:
print("\n❌ DATENBANK-REPARATUR FEHLGESCHLAGEN!")
print("Manuelle Intervention erforderlich.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,25 @@
#!/usr/bin/env python3.11
from models import init_database, create_initial_admin
if __name__ == "__main__":
print("Initialisiere Datenbank...")
init_database()
print("Erstelle initialen Admin-Benutzer...")
success = create_initial_admin(
email="admin@mercedes-benz.com",
password="744563017196A",
name="System Administrator",
username="admin"
)
if success:
print("Admin-Benutzer erfolgreich erstellt.")
print("Login-Daten:")
print(" Benutzername: admin")
print(" Passwort: 744563017196A")
else:
print("Admin-Benutzer konnte nicht erstellt werden (existiert bereits?).")
print("Datenbank-Initialisierung abgeschlossen.")

View File

@@ -0,0 +1,158 @@
#!/usr/bin/env python3
"""
Datenbank-Migrationsskript für Guest-Requests, UserPermissions und Notifications
"""
import os
import sys
import sqlite3
from datetime import datetime
# Pfad zur App hinzufügen
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from models import init_db, get_cached_session, GuestRequest, UserPermission, Notification, User
from utils.logging_config import get_logger
from config.settings import DATABASE_PATH
logger = get_logger("migrate")
def column_exists(cursor, table_name, column_name):
"""Prüft, ob eine Spalte in einer Tabelle existiert."""
cursor.execute(f"PRAGMA table_info({table_name})")
columns = [row[1] for row in cursor.fetchall()]
return column_name in columns
def get_database_path():
"""Ermittelt den Pfad zur Datenbankdatei."""
# Verwende den korrekten Datenbankpfad aus der Konfiguration
if os.path.exists(DATABASE_PATH):
return DATABASE_PATH
# Fallback für alternative Pfade mit korrektem Dateinamen
alternative_paths = [
os.path.join('database', 'myp.db'),
'myp.db',
'../database/myp.db',
'./database/myp.db',
# Legacy-Pfade für Rückwärtskompatibilität
os.path.join('database', 'app.db'),
'app.db',
'../database/app.db',
'./database/app.db'
]
for path in alternative_paths:
if os.path.exists(path):
return path
# Falls keine Datei gefunden wird, verwende den konfigurierten Pfad
return DATABASE_PATH
def migrate_guest_requests_table():
"""Migriert die guest_requests Tabelle für neue Spalten."""
db_path = get_database_path()
if not os.path.exists(db_path):
logger.warning(f"Datenbankdatei nicht gefunden: {db_path}")
return False
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Prüfen ob otp_used_at Spalte bereits existiert
if not column_exists(cursor, 'guest_requests', 'otp_used_at'):
cursor.execute("""
ALTER TABLE guest_requests
ADD COLUMN otp_used_at DATETIME
""")
logger.info("Spalte 'otp_used_at' zur guest_requests Tabelle hinzugefügt")
else:
logger.info("Spalte 'otp_used_at' existiert bereits")
conn.commit()
conn.close()
return True
except Exception as e:
logger.error(f"Fehler bei der Migration der guest_requests Tabelle: {str(e)}")
if 'conn' in locals():
conn.rollback()
conn.close()
return False
def main():
"""Führt die Datenbank-Migration aus."""
try:
logger.info("Starte Datenbank-Migration...")
# Datenbank initialisieren (erstellt neue Tabellen)
init_db()
# Spezifische Spalten-Migrationen
logger.info("Führe spezifische Tabellen-Migrationen aus...")
migrate_guest_requests_table()
logger.info("Datenbank-Migration erfolgreich abgeschlossen")
# Testen, ob die neuen Tabellen funktionieren
test_new_tables()
except Exception as e:
logger.error(f"Fehler bei der Datenbank-Migration: {str(e)}")
sys.exit(1)
def test_new_tables():
"""Testet, ob die neuen Tabellen korrekt erstellt wurden."""
try:
with get_cached_session() as session:
# Test der GuestRequest-Tabelle
test_request = GuestRequest(
name="Test User",
email="test@example.com",
reason="Test migration",
duration_min=60
)
session.add(test_request)
session.flush()
# Test der UserPermission-Tabelle (mit Admin-User falls vorhanden)
admin_user = session.query(User).filter_by(role="admin").first()
if admin_user:
# Prüfen, ob bereits Permissions für diesen User existieren
existing_permission = session.query(UserPermission).filter_by(user_id=admin_user.id).first()
if not existing_permission:
permission = UserPermission(
user_id=admin_user.id,
can_start_jobs=True,
needs_approval=False,
can_approve_jobs=True
)
session.add(permission)
session.flush()
logger.info(f"UserPermission für Admin-User {admin_user.id} erstellt")
else:
logger.info(f"UserPermission für Admin-User {admin_user.id} existiert bereits")
# Test der Notification-Tabelle
notification = Notification(
user_id=admin_user.id,
type="test",
payload='{"message": "Test notification"}'
)
session.add(notification)
session.flush()
# Test-Daten wieder löschen
session.rollback()
logger.info("Alle neuen Tabellen wurden erfolgreich getestet")
except Exception as e:
logger.error(f"Fehler beim Testen der neuen Tabellen: {str(e)}")
raise
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,47 @@
#!/usr/bin/env python3
"""
Test-Script für die Datenbank-Reparatur
"""
import sys
import os
# Pfad zur App hinzufügen
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
def test_database_fix():
"""Testet ob die Datenbank-Reparatur erfolgreich war."""
try:
from models import get_cached_session, User, Printer, Job
print("=== DATENBANK-TEST NACH REPARATUR ===")
with get_cached_session() as session:
# Test User-Query (das war das ursprüngliche Problem)
users = session.query(User).limit(5).all()
print(f"✓ User-Abfrage erfolgreich - {len(users)} Benutzer gefunden")
# Details des ersten Users anzeigen (falls vorhanden)
if users:
user = users[0]
print(f"✓ Test-User: {user.username} ({user.email})")
print(f"✓ updated_at-Feld: {user.updated_at}")
# Test Printer-Query
printers = session.query(Printer).limit(5).all()
print(f"✓ Printer-Abfrage erfolgreich - {len(printers)} Drucker gefunden")
# Test Job-Query
jobs = session.query(Job).limit(5).all()
print(f"✓ Job-Abfrage erfolgreich - {len(jobs)} Jobs gefunden")
print("\n🎉 ALLE DATENBANK-TESTS ERFOLGREICH!")
print("Die Anwendung sollte jetzt ohne Fehler starten.")
return True
except Exception as e:
print(f"\n❌ DATENBANK-TEST FEHLGESCHLAGEN: {str(e)}")
return False
if __name__ == "__main__":
test_database_fix()

View File

@@ -0,0 +1,437 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Umfassender Systemfunktionalitätstest für MYP Platform
Prüft alle kritischen Komponenten und Features
"""
import sys
import os
import json
import requests
import time
from datetime import datetime
from typing import Dict, List, Any
# Füge das aktuelle Verzeichnis zum Python-Pfad hinzu
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# Tests für interne Komponenten
def test_internal_components():
"""Testet interne Systemkomponenten"""
results = {}
print("🔍 Teste interne Systemkomponenten...")
# Test 1: Importiere kritische Module
try:
from models import User, Printer, Job, get_db_session, init_database
from config.settings import SECRET_KEY, DATABASE_PATH
from utils.logging_config import get_logger
results["module_imports"] = {"status": "SUCCESS", "message": "Alle kritischen Module importiert"}
except Exception as e:
results["module_imports"] = {"status": "FAILED", "message": f"Import-Fehler: {str(e)}"}
return results
# Test 2: Datenbankverbindung
try:
db_session = get_db_session()
user_count = db_session.query(User).count()
printer_count = db_session.query(Printer).count()
job_count = db_session.query(Job).count()
db_session.close()
results["database_connection"] = {
"status": "SUCCESS",
"message": f"Datenbank verbunden - {user_count} Benutzer, {printer_count} Drucker, {job_count} Jobs"
}
except Exception as e:
results["database_connection"] = {"status": "FAILED", "message": f"DB-Fehler: {str(e)}"}
# Test 3: Admin-Benutzer vorhanden
try:
db_session = get_db_session()
admin_user = db_session.query(User).filter(User.role == "admin").first()
db_session.close()
if admin_user:
results["admin_user"] = {
"status": "SUCCESS",
"message": f"Admin-Benutzer gefunden: {admin_user.username} ({admin_user.email})"
}
else:
results["admin_user"] = {"status": "FAILED", "message": "Kein Admin-Benutzer gefunden"}
except Exception as e:
results["admin_user"] = {"status": "FAILED", "message": f"Admin-Check-Fehler: {str(e)}"}
# Test 4: Windows-Fixes
try:
if os.name == 'nt':
from utils.windows_fixes import get_windows_thread_manager
thread_manager = get_windows_thread_manager()
if thread_manager:
results["windows_fixes"] = {"status": "SUCCESS", "message": "Windows-Fixes geladen"}
else:
results["windows_fixes"] = {"status": "WARNING", "message": "Windows-Fixes verfügbar aber nicht aktiv"}
else:
results["windows_fixes"] = {"status": "SKIPPED", "message": "Nicht Windows-System"}
except Exception as e:
results["windows_fixes"] = {"status": "WARNING", "message": f"Windows-Fixes-Fehler: {str(e)}"}
# Test 5: Logging-System
try:
logger = get_logger("test")
logger.info("Test-Log-Nachricht")
results["logging_system"] = {"status": "SUCCESS", "message": "Logging-System funktional"}
except Exception as e:
results["logging_system"] = {"status": "FAILED", "message": f"Logging-Fehler: {str(e)}"}
# Test 6: Queue Manager
try:
from utils.queue_manager import get_queue_manager
queue_manager = get_queue_manager()
if queue_manager:
status = queue_manager.get_queue_status()
results["queue_manager"] = {
"status": "SUCCESS",
"message": f"Queue Manager aktiv - Status: {len(status)} Warteschlangen"
}
else:
results["queue_manager"] = {"status": "WARNING", "message": "Queue Manager nicht initialisiert"}
except Exception as e:
results["queue_manager"] = {"status": "WARNING", "message": f"Queue Manager-Fehler: {str(e)}"}
# Test 7: Job Scheduler
try:
from utils.job_scheduler import get_job_scheduler
scheduler = get_job_scheduler()
if scheduler:
results["job_scheduler"] = {"status": "SUCCESS", "message": "Job Scheduler verfügbar"}
else:
results["job_scheduler"] = {"status": "WARNING", "message": "Job Scheduler nicht verfügbar"}
except Exception as e:
results["job_scheduler"] = {"status": "WARNING", "message": f"Job Scheduler-Fehler: {str(e)}"}
return results
def test_api_endpoints():
"""Testet kritische API-Endpunkte"""
results = {}
base_url = "http://localhost:5000"
print("🌐 Teste API-Endpunkte...")
# Test 1: Root-Endpunkt
try:
response = requests.get(f"{base_url}/", timeout=5)
if response.status_code == 200:
results["root_endpoint"] = {"status": "SUCCESS", "message": "Root-Endpunkt erreichbar"}
else:
results["root_endpoint"] = {"status": "FAILED", "message": f"HTTP {response.status_code}"}
except Exception as e:
results["root_endpoint"] = {"status": "FAILED", "message": f"Verbindungsfehler: {str(e)}"}
# Test 2: Login-Seite
try:
response = requests.get(f"{base_url}/auth/login", timeout=5)
if response.status_code == 200:
results["login_page"] = {"status": "SUCCESS", "message": "Login-Seite verfügbar"}
else:
results["login_page"] = {"status": "FAILED", "message": f"HTTP {response.status_code}"}
except Exception as e:
results["login_page"] = {"status": "FAILED", "message": f"Login-Seite-Fehler: {str(e)}"}
# Test 3: API Status (ohne Authentifizierung)
try:
response = requests.get(f"{base_url}/api/kiosk/status", timeout=5)
if response.status_code in [200, 401, 403]: # Diese sind alle erwartete Responses
results["api_status"] = {"status": "SUCCESS", "message": "API grundsätzlich erreichbar"}
else:
results["api_status"] = {"status": "WARNING", "message": f"Unerwarteter HTTP {response.status_code}"}
except Exception as e:
results["api_status"] = {"status": "FAILED", "message": f"API-Status-Fehler: {str(e)}"}
return results
def test_file_structure():
"""Testet die Datei- und Verzeichnisstruktur"""
results = {}
print("📁 Teste Datei- und Verzeichnisstruktur...")
# Kritische Dateien
critical_files = [
"app.py",
"models.py",
"config/settings.py",
"templates/base.html",
"templates/login.html",
"templates/dashboard.html",
"static/css",
"static/js",
"utils/logging_config.py",
"utils/queue_manager.py",
"blueprints/guest.py",
"blueprints/users.py",
"blueprints/calendar.py"
]
missing_files = []
present_files = []
for file_path in critical_files:
if os.path.exists(file_path):
present_files.append(file_path)
else:
missing_files.append(file_path)
if missing_files:
results["file_structure"] = {
"status": "WARNING",
"message": f"Fehlende Dateien: {', '.join(missing_files)}"
}
else:
results["file_structure"] = {
"status": "SUCCESS",
"message": f"Alle {len(present_files)} kritischen Dateien vorhanden"
}
# Verzeichnisse
critical_dirs = ["logs", "database", "uploads", "static", "templates", "utils", "config", "blueprints"]
missing_dirs = []
present_dirs = []
for dir_path in critical_dirs:
if os.path.exists(dir_path) and os.path.isdir(dir_path):
present_dirs.append(dir_path)
else:
missing_dirs.append(dir_path)
if missing_dirs:
results["directory_structure"] = {
"status": "WARNING",
"message": f"Fehlende Verzeichnisse: {', '.join(missing_dirs)}"
}
else:
results["directory_structure"] = {
"status": "SUCCESS",
"message": f"Alle {len(present_dirs)} kritischen Verzeichnisse vorhanden"
}
return results
def test_database_integrity():
"""Testet die Datenbankintegrität"""
results = {}
print("🗄️ Teste Datenbankintegrität...")
try:
from models import User, Printer, Job, Stats, SystemLog, GuestRequest, UserPermission, Notification, get_db_session
db_session = get_db_session()
# Test Tabellen-Existenz
tables_test = {}
models_to_test = [User, Printer, Job, Stats, SystemLog, GuestRequest, UserPermission, Notification]
for model in models_to_test:
try:
count = db_session.query(model).count()
tables_test[model.__tablename__] = {"exists": True, "count": count}
except Exception as e:
tables_test[model.__tablename__] = {"exists": False, "error": str(e)}
existing_tables = sum(1 for t in tables_test.values() if t.get("exists"))
total_tables = len(tables_test)
if existing_tables == total_tables:
results["table_integrity"] = {
"status": "SUCCESS",
"message": f"Alle {total_tables} Tabellen existieren und sind zugänglich"
}
else:
results["table_integrity"] = {
"status": "FAILED",
"message": f"Nur {existing_tables}/{total_tables} Tabellen zugänglich"
}
# Test Datenbank-Constraints
try:
# Teste Foreign Key Constraints
db_session.execute("PRAGMA foreign_key_check")
results["database_constraints"] = {"status": "SUCCESS", "message": "Foreign Key Constraints OK"}
except Exception as e:
results["database_constraints"] = {"status": "WARNING", "message": f"Constraint-Check-Fehler: {str(e)}"}
db_session.close()
except Exception as e:
results["database_integrity"] = {"status": "FAILED", "message": f"DB-Integritätstest fehlgeschlagen: {str(e)}"}
return results
def create_test_data():
"""Erstellt Testdaten falls nötig"""
results = {}
print("🧪 Erstelle Testdaten...")
try:
from models import User, Printer, Job, get_db_session
db_session = get_db_session()
# Teste ob Testdrucker existieren
test_printer = db_session.query(Printer).filter(Printer.name.like("Test%")).first()
if not test_printer:
# Erstelle Test-Drucker
test_printer = Printer(
name="Test Drucker 1",
model="Test Model",
location="Test Labor",
ip_address="192.168.1.100",
mac_address="00:11:22:33:44:55",
plug_ip="192.168.1.101",
plug_username="test_user",
plug_password="test_pass",
status="offline"
)
db_session.add(test_printer)
db_session.commit()
results["test_printer"] = {"status": "SUCCESS", "message": "Test-Drucker erstellt"}
else:
results["test_printer"] = {"status": "SUCCESS", "message": "Test-Drucker bereits vorhanden"}
# Teste ob Testbenutzer existiert
test_user = db_session.query(User).filter(User.username == "testuser").first()
if not test_user:
# Erstelle Test-Benutzer
test_user = User(
username="testuser",
email="test@test.com",
name="Test Benutzer",
role="user"
)
test_user.set_password("testpass")
db_session.add(test_user)
db_session.commit()
results["test_user"] = {"status": "SUCCESS", "message": "Test-Benutzer erstellt"}
else:
results["test_user"] = {"status": "SUCCESS", "message": "Test-Benutzer bereits vorhanden"}
db_session.close()
except Exception as e:
results["test_data_creation"] = {"status": "FAILED", "message": f"Test-Daten-Erstellung fehlgeschlagen: {str(e)}"}
return results
def run_comprehensive_test():
"""Führt alle Tests aus und zeigt Ergebnisse an"""
print("🚀 Starte umfassenden Systemfunktionalitätstest für MYP Platform\n")
print("=" * 70)
all_results = {}
# Interne Komponenten
all_results.update(test_internal_components())
print()
# Datei-/Verzeichnisstruktur
all_results.update(test_file_structure())
print()
# Datenbankintegrität
all_results.update(test_database_integrity())
print()
# Testdaten erstellen
all_results.update(create_test_data())
print()
# API-Endpunkte (nur wenn Server läuft)
all_results.update(test_api_endpoints())
print()
# Ergebnisse zusammenfassen
print("=" * 70)
print("📊 TESTERGEBNISSE ZUSAMMENFASSUNG")
print("=" * 70)
success_count = 0
warning_count = 0
failed_count = 0
skipped_count = 0
for test_name, result in all_results.items():
status = result["status"]
message = result["message"]
if status == "SUCCESS":
print(f"{test_name}: {message}")
success_count += 1
elif status == "WARNING":
print(f"⚠️ {test_name}: {message}")
warning_count += 1
elif status == "FAILED":
print(f"{test_name}: {message}")
failed_count += 1
elif status == "SKIPPED":
print(f"⏭️ {test_name}: {message}")
skipped_count += 1
total_tests = len(all_results)
print("\n" + "=" * 70)
print("📈 STATISTIKEN")
print("=" * 70)
print(f"Gesamt: {total_tests} Tests")
print(f"✅ Erfolgreich: {success_count}")
print(f"⚠️ Warnungen: {warning_count}")
print(f"❌ Fehlgeschlagen: {failed_count}")
print(f"⏭️ Übersprungen: {skipped_count}")
# Empfehlungen
print("\n" + "=" * 70)
print("💡 EMPFEHLUNGEN")
print("=" * 70)
if failed_count == 0 and warning_count <= 2:
print("🎉 System ist voll funktionsfähig!")
print(" Alle kritischen Komponenten arbeiten ordnungsgemäß.")
elif failed_count == 0:
print("✅ System ist grundsätzlich funktionsfähig.")
print(" Einige Warnungen sollten beachtet werden.")
else:
print("⚠️ System hat kritische Probleme.")
print(" Fehlgeschlagene Tests müssen behoben werden.")
# Speichere Ergebnisse in JSON-Datei
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
results_file = f"test_results_{timestamp}.json"
with open(results_file, "w", encoding="utf-8") as f:
json.dump({
"timestamp": datetime.now().isoformat(),
"summary": {
"total": total_tests,
"success": success_count,
"warnings": warning_count,
"failed": failed_count,
"skipped": skipped_count
},
"detailed_results": all_results
}, f, indent=2, ensure_ascii=False)
print(f"\n📄 Detaillierte Ergebnisse gespeichert in: {results_file}")
return failed_count == 0
if __name__ == "__main__":
success = run_comprehensive_test()
sys.exit(0 if success else 1)