743 lines
25 KiB
Python
743 lines
25 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
MYP Debug CLI
|
||
Kommandozeilen-Tool für Diagnose und Debugging der MYP-Anwendung
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import argparse
|
||
import time
|
||
import json
|
||
import importlib
|
||
import logging
|
||
import sqlite3
|
||
from datetime import datetime
|
||
import traceback
|
||
from pprint import pprint
|
||
|
||
# Eigene Module importieren
|
||
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
||
|
||
# Farbige Ausgabe für die Konsole
|
||
COLORS = {
|
||
'RESET': '\033[0m',
|
||
'BOLD': '\033[1m',
|
||
'RED': '\033[31m',
|
||
'GREEN': '\033[32m',
|
||
'YELLOW': '\033[33m',
|
||
'BLUE': '\033[34m',
|
||
'MAGENTA': '\033[35m',
|
||
'CYAN': '\033[36m',
|
||
}
|
||
|
||
# Emojis für verschiedene Log-Level und Kategorien
|
||
LOG_EMOJIS = {
|
||
'DEBUG': '🔍',
|
||
'INFO': 'ℹ️',
|
||
'WARNING': '⚠️',
|
||
'ERROR': '❌',
|
||
'CRITICAL': '🔥',
|
||
'SUCCESS': '✅',
|
||
'DATABASE': '💾',
|
||
'NETWORK': '🌐',
|
||
'SYSTEM': '💻',
|
||
'PRINTER': '🖨️',
|
||
'API': '📡',
|
||
'USER': '👤'
|
||
}
|
||
|
||
# Prüfen, ob das Terminal Farben unterstützt
|
||
def supports_color():
|
||
"""Prüft, ob das Terminal Farben unterstützt."""
|
||
if os.name == 'nt':
|
||
try:
|
||
import ctypes
|
||
kernel32 = ctypes.windll.kernel32
|
||
# Aktiviere VT100-Unterstützung unter Windows
|
||
kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7)
|
||
return True
|
||
except:
|
||
return False
|
||
else:
|
||
return sys.stdout.isatty()
|
||
|
||
USE_COLOR = supports_color()
|
||
|
||
def colorize(text, color):
|
||
"""Färbt den Text ein, wenn Farben unterstützt werden."""
|
||
if USE_COLOR and color in COLORS:
|
||
return f"{COLORS[color]}{text}{COLORS['RESET']}"
|
||
return text
|
||
|
||
def print_success(message):
|
||
print(f"{LOG_EMOJIS['SUCCESS']} {colorize(message, 'GREEN')}")
|
||
|
||
def print_error(message):
|
||
print(f"{LOG_EMOJIS['ERROR']} {colorize(message, 'RED')}")
|
||
|
||
def print_warning(message):
|
||
print(f"{LOG_EMOJIS['WARNING']} {colorize(message, 'YELLOW')}")
|
||
|
||
def print_info(message):
|
||
print(f"{LOG_EMOJIS['INFO']} {colorize(message, 'BLUE')}")
|
||
|
||
def print_debug(message):
|
||
print(f"{LOG_EMOJIS['DEBUG']} {colorize(message, 'CYAN')}")
|
||
|
||
def print_database(message):
|
||
print(f"{LOG_EMOJIS['DATABASE']} {colorize(message, 'MAGENTA')}")
|
||
|
||
def print_network(message):
|
||
print(f"{LOG_EMOJIS['NETWORK']} {colorize(message, 'CYAN')}")
|
||
|
||
def print_system(message):
|
||
print(f"{LOG_EMOJIS['SYSTEM']} {colorize(message, 'BLUE')}")
|
||
|
||
def print_printer(message):
|
||
print(f"{LOG_EMOJIS['PRINTER']} {colorize(message, 'GREEN')}")
|
||
|
||
def print_header(message):
|
||
print(f"\n{colorize('='*80, 'BOLD')}")
|
||
print(f"{colorize(message.center(80), 'BOLD')}")
|
||
print(f"{colorize('='*80, 'BOLD')}\n")
|
||
|
||
def print_section(message):
|
||
print(f"\n{colorize('-'*40, 'BOLD')}")
|
||
print(f"{colorize(message, 'BOLD')}")
|
||
print(f"{colorize('-'*40, 'BOLD')}\n")
|
||
|
||
# Hilfsfunktionen
|
||
|
||
def get_database_path():
|
||
"""Gibt den Pfad zur Datenbank zurück."""
|
||
try:
|
||
from config.settings import DATABASE_PATH
|
||
return DATABASE_PATH
|
||
except ImportError:
|
||
# Fallback auf Standard-Pfad
|
||
base_dir = os.path.dirname(os.path.abspath(__file__))
|
||
return os.path.join(base_dir, "database", "myp.db")
|
||
|
||
def check_database():
|
||
"""Prüft den Zustand der Datenbank."""
|
||
db_path = get_database_path()
|
||
|
||
if not os.path.exists(db_path):
|
||
print_error(f"Datenbank nicht gefunden: {db_path}")
|
||
return False
|
||
|
||
try:
|
||
conn = sqlite3.connect(db_path)
|
||
cursor = conn.cursor()
|
||
|
||
# Tabellen auflisten
|
||
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
|
||
tables = cursor.fetchall()
|
||
|
||
print_database(f"Datenbank gefunden: {db_path}")
|
||
print_database(f"Größe: {os.path.getsize(db_path) / (1024*1024):.2f} MB")
|
||
print_database(f"Tabellen ({len(tables)}):")
|
||
|
||
for table in tables:
|
||
# Anzahl der Datensätze pro Tabelle
|
||
cursor.execute(f"SELECT COUNT(*) FROM {table[0]}")
|
||
count = cursor.fetchone()[0]
|
||
print(f" 📋 {table[0]}: {count} Einträge")
|
||
|
||
conn.close()
|
||
return True
|
||
except sqlite3.Error as e:
|
||
print_error(f"Datenbankfehler: {e}")
|
||
return False
|
||
except Exception as e:
|
||
print_error(f"Fehler beim Prüfen der Datenbank: {e}")
|
||
return False
|
||
|
||
def check_log_files():
|
||
"""Prüft die Log-Dateien und zeigt die neuesten Einträge an."""
|
||
try:
|
||
from config.settings import LOG_DIR, LOG_SUBDIRS
|
||
|
||
if not os.path.exists(LOG_DIR):
|
||
print_error(f"Log-Verzeichnis nicht gefunden: {LOG_DIR}")
|
||
return False
|
||
|
||
print_info(f"Log-Verzeichnis: {LOG_DIR}")
|
||
|
||
for subdir in LOG_SUBDIRS:
|
||
log_path = os.path.join(LOG_DIR, subdir, f"{subdir}.log")
|
||
|
||
if not os.path.exists(log_path):
|
||
print_warning(f"Log-Datei nicht gefunden: {log_path}")
|
||
continue
|
||
|
||
size = os.path.getsize(log_path) / 1024 # KB
|
||
print_info(f"Log-Datei: {subdir}.log ({size:.1f} KB)")
|
||
|
||
# Letzte Zeilen anzeigen
|
||
try:
|
||
with open(log_path, 'r') as f:
|
||
lines = f.readlines()
|
||
last_lines = lines[-5:] # Letzte 5 Zeilen
|
||
|
||
print(" Letzte Einträge:")
|
||
for line in last_lines:
|
||
line = line.strip()
|
||
|
||
# Farbliche Hervorhebung je nach Log-Level
|
||
if "ERROR" in line:
|
||
print(f" {colorize(line, 'RED')}")
|
||
elif "WARNING" in line:
|
||
print(f" {colorize(line, 'YELLOW')}")
|
||
elif "INFO" in line:
|
||
print(f" {colorize(line, 'GREEN')}")
|
||
elif "DEBUG" in line:
|
||
print(f" {colorize(line, 'CYAN')}")
|
||
else:
|
||
print(f" {line}")
|
||
except Exception as e:
|
||
print_warning(f" Fehler beim Lesen der Log-Datei: {e}")
|
||
|
||
return True
|
||
except ImportError:
|
||
print_error("Konfiguration für Logs nicht gefunden")
|
||
return False
|
||
except Exception as e:
|
||
print_error(f"Fehler beim Prüfen der Log-Dateien: {e}")
|
||
return False
|
||
|
||
def check_environment():
|
||
"""Prüft die Umgebungsvariablen und System-Einstellungen."""
|
||
print_info("Umgebungsinformationen:")
|
||
print(f" Python-Version: {sys.version.split()[0]}")
|
||
print(f" Betriebssystem: {os.name} - {sys.platform}")
|
||
print(f" Arbeitsverzeichnis: {os.getcwd()}")
|
||
|
||
print_info("Wichtige Umgebungsvariablen:")
|
||
env_vars = [
|
||
"FLASK_ENV", "FLASK_DEBUG", "MYP_SSL_ENABLED",
|
||
"MYP_SSL_HOSTNAME", "PYTHONPATH"
|
||
]
|
||
|
||
for var in env_vars:
|
||
value = os.environ.get(var, "nicht gesetzt")
|
||
print(f" {var}: {value}")
|
||
|
||
try:
|
||
# Flask-Konfiguration prüfen
|
||
print_info("Flask-Konfiguration:")
|
||
from config.settings import FLASK_HOST, FLASK_PORT, FLASK_DEBUG, SSL_ENABLED
|
||
|
||
print(f" Host: {FLASK_HOST}")
|
||
print(f" Port: {FLASK_PORT}")
|
||
print(f" Debug-Modus: {FLASK_DEBUG}")
|
||
print(f" SSL aktiviert: {SSL_ENABLED}")
|
||
|
||
# Module prüfen
|
||
required_modules = [
|
||
'flask', 'sqlalchemy', 'flask_login', 'werkzeug'
|
||
]
|
||
|
||
print_info("Benötigte Module:")
|
||
for module in required_modules:
|
||
try:
|
||
mod = importlib.import_module(module)
|
||
version = getattr(mod, '__version__', 'unbekannt')
|
||
print(f" {module}: {colorize('OK', 'GREEN')} (Version {version})")
|
||
except ImportError:
|
||
print(f" {module}: {colorize('FEHLT', 'RED')}")
|
||
|
||
except ImportError:
|
||
print_warning("Flask-Konfiguration konnte nicht geladen werden")
|
||
except Exception as e:
|
||
print_error(f"Fehler beim Prüfen der Umgebung: {e}")
|
||
|
||
def scan_printer(ip_address, timeout=5):
|
||
"""Scannt einen Drucker und zeigt Informationen an."""
|
||
import socket
|
||
|
||
print_printer(f"Prüfe Drucker mit IP: {ip_address}")
|
||
|
||
# Ping testen
|
||
import subprocess
|
||
try:
|
||
if os.name == 'nt': # Windows
|
||
cmd = ['ping', '-n', '1', '-w', str(timeout * 1000), ip_address]
|
||
else: # Unix/Linux/macOS
|
||
cmd = ['ping', '-c', '1', '-W', str(timeout), ip_address]
|
||
|
||
print(f" 🏓 Ping-Test: ", end="")
|
||
result = subprocess.run(cmd, capture_output=True, text=True,
|
||
encoding='utf-8', errors='replace')
|
||
|
||
if result.returncode == 0:
|
||
print(colorize("Erreichbar", "GREEN"))
|
||
else:
|
||
print(colorize("Nicht erreichbar", "RED"))
|
||
print(f" 📄 Details: {result.stdout}")
|
||
return
|
||
except Exception as e:
|
||
print(colorize(f"Fehler bei Ping-Test: {e}", "RED"))
|
||
|
||
# Offene Ports prüfen
|
||
common_ports = [80, 443, 8080, 8443, 631, 9100, 9101, 9102]
|
||
open_ports = []
|
||
|
||
print(" 🔍 Port-Scan: ", end="")
|
||
for port in common_ports:
|
||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
sock.settimeout(timeout)
|
||
result = sock.connect_ex((ip_address, port))
|
||
if result == 0:
|
||
open_ports.append(port)
|
||
sock.close()
|
||
|
||
if open_ports:
|
||
print(colorize(f"Offene Ports: {', '.join(map(str, open_ports))}", "GREEN"))
|
||
else:
|
||
print(colorize("Keine offenen Ports gefunden", "YELLOW"))
|
||
|
||
# Drucker-Info über Tapo-API testen (wenn vorhanden)
|
||
try:
|
||
from PyP100 import PyP110
|
||
|
||
print(" 🔌 Smart Plug Test: ", end="")
|
||
try:
|
||
# Standardmäßig Anmeldeinformationen aus der Konfiguration verwenden
|
||
from config.settings import TAPO_USERNAME, TAPO_PASSWORD
|
||
|
||
p110 = PyP110.P110(ip_address, TAPO_USERNAME, TAPO_PASSWORD)
|
||
p110.handshake()
|
||
p110.login()
|
||
|
||
device_info = p110.getDeviceInfo()
|
||
print(colorize("Verbunden", "GREEN"))
|
||
print(f" 📛 Gerätename: {device_info.get('nickname', 'Unbekannt')}")
|
||
print(f" ⚡ Status: {'Ein' if device_info.get('device_on', False) else 'Aus'}")
|
||
|
||
if 'on_time' in device_info:
|
||
on_time = device_info['on_time']
|
||
print(f" ⏱️ Betriebszeit: {on_time // 60} Minuten, {on_time % 60} Sekunden")
|
||
|
||
except Exception as e:
|
||
print(colorize(f"Fehler: {e}", "RED"))
|
||
except ImportError:
|
||
print_warning(" PyP100-Modul nicht verfügbar - Smart Plug Test übersprungen")
|
||
|
||
def check_printers_from_db():
|
||
"""Prüft die in der Datenbank gespeicherten Drucker."""
|
||
db_path = get_database_path()
|
||
|
||
if not os.path.exists(db_path):
|
||
print_error(f"Datenbank nicht gefunden: {db_path}")
|
||
return
|
||
|
||
try:
|
||
conn = sqlite3.connect(db_path)
|
||
conn.row_factory = sqlite3.Row
|
||
cursor = conn.cursor()
|
||
|
||
# Drucker-Tabelle prüfen
|
||
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='printer';")
|
||
if not cursor.fetchone():
|
||
print_error("Drucker-Tabelle nicht gefunden")
|
||
conn.close()
|
||
return
|
||
|
||
# Drucker auslesen
|
||
cursor.execute("SELECT * FROM printer;")
|
||
printers = cursor.fetchall()
|
||
|
||
if not printers:
|
||
print_warning("Keine Drucker in der Datenbank gefunden")
|
||
conn.close()
|
||
return
|
||
|
||
print_info(f"{len(printers)} Drucker gefunden:")
|
||
|
||
for printer in printers:
|
||
status_color = 'GREEN' if printer['status'] == 'online' else 'RED'
|
||
print(f" {printer['name']}: {colorize(printer['status'], status_color)}")
|
||
print(f" IP: {printer['ip_address']}")
|
||
print(f" Plug IP: {printer['plug_ip'] or 'Nicht konfiguriert'}")
|
||
|
||
# Detaillierteren Status prüfen
|
||
if printer['plug_ip']:
|
||
ask = input(f" Möchten Sie den Drucker {printer['name']} scannen? (j/n): ")
|
||
if ask.lower() in ('j', 'ja', 'y', 'yes'):
|
||
scan_printer(printer['plug_ip'])
|
||
|
||
conn.close()
|
||
except Exception as e:
|
||
print_error(f"Fehler beim Prüfen der Drucker: {e}")
|
||
traceback.print_exc()
|
||
|
||
def check_flask_routes():
|
||
"""Zeigt alle verfügbaren Flask-Routen an."""
|
||
try:
|
||
# Versuche, die Flask-App zu importieren
|
||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||
|
||
try:
|
||
from app import app as flask_app
|
||
except ImportError:
|
||
print_error("Flask-App konnte nicht importiert werden")
|
||
return
|
||
|
||
# Alle Routen auflisten
|
||
print_info("Verfügbare Flask-Routen:")
|
||
|
||
routes = []
|
||
for rule in flask_app.url_map.iter_rules():
|
||
routes.append({
|
||
'endpoint': rule.endpoint,
|
||
'methods': ', '.join(sorted(rule.methods - {'HEAD', 'OPTIONS'})),
|
||
'path': rule.rule
|
||
})
|
||
|
||
# Nach Pfad sortieren
|
||
routes = sorted(routes, key=lambda x: x['path'])
|
||
|
||
# Routen anzeigen
|
||
for route in routes:
|
||
method_color = 'GREEN' if 'GET' in route['methods'] else 'BLUE'
|
||
print(f" {colorize(route['methods'], method_color)} {route['path']}")
|
||
print(f" → {route['endpoint']}")
|
||
|
||
print_info(f"Insgesamt {len(routes)} Routen gefunden")
|
||
|
||
except Exception as e:
|
||
print_error(f"Fehler beim Abrufen der Flask-Routen: {e}")
|
||
traceback.print_exc()
|
||
|
||
def print_system_info():
|
||
"""Zeigt detaillierte Systeminformationen an."""
|
||
print_header("Systeminformationen")
|
||
|
||
print_section("Basisinformationen")
|
||
import platform
|
||
print(f"Python-Version: {platform.python_version()}")
|
||
print(f"Betriebssystem: {platform.system()} {platform.release()}")
|
||
print(f"Architektur: {platform.machine()}")
|
||
print(f"Prozessor: {platform.processor()}")
|
||
|
||
print_section("Speicher")
|
||
try:
|
||
import psutil
|
||
vm = psutil.virtual_memory()
|
||
print(f"Gesamter Speicher: {vm.total / (1024**3):.1f} GB")
|
||
print(f"Verfügbarer Speicher: {vm.available / (1024**3):.1f} GB")
|
||
print(f"Speicherauslastung: {vm.percent}%")
|
||
|
||
disk = psutil.disk_usage('/')
|
||
print(f"Festplatte gesamt: {disk.total / (1024**3):.1f} GB")
|
||
print(f"Festplatte frei: {disk.free / (1024**3):.1f} GB")
|
||
print(f"Festplattenauslastung: {disk.percent}%")
|
||
except ImportError:
|
||
print_warning("psutil-Modul nicht verfügbar - eingeschränkte Informationen")
|
||
|
||
print_section("Netzwerk")
|
||
try:
|
||
import socket
|
||
hostname = socket.gethostname()
|
||
ip_address = socket.gethostbyname(hostname)
|
||
print(f"Hostname: {hostname}")
|
||
print(f"IP-Adresse: {ip_address}")
|
||
|
||
# Netzwerkschnittstellen
|
||
if 'psutil' in sys.modules:
|
||
print("Netzwerkschnittstellen:")
|
||
for name, addrs in psutil.net_if_addrs().items():
|
||
for addr in addrs:
|
||
if addr.family == socket.AF_INET:
|
||
print(f" {name}: {addr.address}")
|
||
except Exception as e:
|
||
print_warning(f"Fehler beim Abrufen der Netzwerkinformationen: {e}")
|
||
|
||
def test_logging_system():
|
||
"""Testet das verbesserte Logging-System mit allen Features."""
|
||
print_header("Logging-System Test")
|
||
|
||
try:
|
||
# Versuche die neuen Logging-Funktionen zu importieren
|
||
from utils.logging_config import get_logger, debug_request, debug_response, measure_execution_time
|
||
|
||
print_success("Neue Logging-Module erfolgreich importiert")
|
||
|
||
# Test verschiedener Logger
|
||
test_loggers = ['app', 'auth', 'jobs', 'printers', 'errors']
|
||
|
||
print_section("Logger-Tests")
|
||
for logger_name in test_loggers:
|
||
try:
|
||
logger = get_logger(logger_name)
|
||
|
||
# Test verschiedener Log-Level
|
||
logger.debug(f"🔍 Debug-Test für {logger_name}")
|
||
logger.info(f"ℹ️ Info-Test für {logger_name}")
|
||
logger.warning(f"⚠️ Warning-Test für {logger_name}")
|
||
|
||
print_success(f"Logger '{logger_name}' funktioniert korrekt")
|
||
except Exception as e:
|
||
print_error(f"Fehler beim Testen von Logger '{logger_name}': {e}")
|
||
|
||
# Test Performance-Monitoring
|
||
print_section("Performance-Monitoring Test")
|
||
|
||
@measure_execution_time(logger=get_logger("app"), task_name="Test-Funktion")
|
||
def test_function():
|
||
"""Eine Test-Funktion für das Performance-Monitoring."""
|
||
import time
|
||
time.sleep(0.1) # Simuliere etwas Arbeit
|
||
return "Test erfolgreich"
|
||
|
||
result = test_function()
|
||
print_success(f"Performance-Monitoring Test: {result}")
|
||
|
||
# Test der Debug-Utilities
|
||
print_section("Debug-Utilities Test")
|
||
|
||
try:
|
||
from utils.debug_utils import debug_dump, debug_trace, memory_usage
|
||
|
||
# Test debug_dump
|
||
test_data = {
|
||
"version": "1.0.0",
|
||
"features": ["emojis", "colors", "performance-monitoring"],
|
||
"status": "active"
|
||
}
|
||
debug_dump(test_data, "Test-Konfiguration")
|
||
|
||
# Test memory_usage
|
||
memory_info = memory_usage()
|
||
print_system(f"Aktueller Speicherverbrauch: {memory_info['rss']:.2f} MB")
|
||
|
||
print_success("Debug-Utilities funktionieren korrekt")
|
||
|
||
except ImportError as e:
|
||
print_warning(f"Debug-Utilities nicht verfügbar: {e}")
|
||
|
||
# Zusammenfassung
|
||
print_section("Test-Zusammenfassung")
|
||
print_success("🎉 Alle Logging-System-Tests erfolgreich abgeschlossen!")
|
||
print_info("Features verfügbar:")
|
||
print(" ✅ Farbige Log-Ausgaben mit ANSI-Codes")
|
||
print(" ✅ Emoji-Integration für bessere Lesbarkeit")
|
||
print(" ✅ HTTP-Request/Response-Logging")
|
||
print(" ✅ Performance-Monitoring mit Ausführungszeit")
|
||
print(" ✅ Cross-Platform-Unterstützung (Windows/Unix)")
|
||
print(" ✅ Strukturierte Debug-Informationen")
|
||
|
||
except ImportError as e:
|
||
print_error(f"Logging-Module nicht verfügbar: {e}")
|
||
print_warning("Stelle sicher, dass alle Module korrekt installiert sind")
|
||
except Exception as e:
|
||
print_error(f"Unerwarteter Fehler beim Logging-Test: {e}")
|
||
traceback.print_exc()
|
||
|
||
# Hauptfunktionen für die Befehlszeile
|
||
|
||
def diagnose():
|
||
"""Führt eine umfassende Diagnose durch."""
|
||
print_header("MYP Diagnose-Tool")
|
||
|
||
print_section("Systemprüfung")
|
||
check_environment()
|
||
|
||
print_section("Datenbankprüfung")
|
||
check_database()
|
||
|
||
print_section("Log-Dateien")
|
||
check_log_files()
|
||
|
||
print_success("Diagnose abgeschlossen!")
|
||
|
||
def scan_printers():
|
||
"""Scannt und prüft alle Drucker."""
|
||
print_header("Drucker-Scanner")
|
||
|
||
# Direkter Scan einer IP-Adresse
|
||
ip = input("IP-Adresse zum Scannen (leer lassen, um Drucker aus der Datenbank zu prüfen): ")
|
||
|
||
if ip:
|
||
scan_printer(ip)
|
||
else:
|
||
check_printers_from_db()
|
||
|
||
def show_routes():
|
||
"""Zeigt alle verfügbaren API-Routen an."""
|
||
print_header("API-Routen")
|
||
check_flask_routes()
|
||
|
||
def system_info():
|
||
"""Zeigt detaillierte Systeminformationen an."""
|
||
print_system_info()
|
||
|
||
def show_logs():
|
||
"""Zeigt und analysiert Log-Dateien."""
|
||
print_header("Log-Analyse")
|
||
|
||
try:
|
||
from config.settings import LOG_DIR, LOG_SUBDIRS
|
||
|
||
if not os.path.exists(LOG_DIR):
|
||
print_error(f"Log-Verzeichnis nicht gefunden: {LOG_DIR}")
|
||
return
|
||
|
||
print_info(f"Log-Verzeichnis: {LOG_DIR}")
|
||
print_info("Verfügbare Logs:")
|
||
|
||
for i, subdir in enumerate(LOG_SUBDIRS, 1):
|
||
log_path = os.path.join(LOG_DIR, subdir, f"{subdir}.log")
|
||
size = "Nicht gefunden"
|
||
|
||
if os.path.exists(log_path):
|
||
size = f"{os.path.getsize(log_path) / 1024:.1f} KB"
|
||
|
||
print(f" {i}. {subdir}.log ({size})")
|
||
|
||
choice = input("\nWelches Log möchten Sie anzeigen? (Nummer oder Name): ")
|
||
|
||
# Nummer in Namen umwandeln
|
||
try:
|
||
choice_num = int(choice) - 1
|
||
if 0 <= choice_num < len(LOG_SUBDIRS):
|
||
choice = LOG_SUBDIRS[choice_num]
|
||
except ValueError:
|
||
pass
|
||
|
||
# Prüfen, ob die Wahl gültig ist
|
||
if choice not in LOG_SUBDIRS:
|
||
print_error(f"Ungültige Auswahl: {choice}")
|
||
return
|
||
|
||
log_path = os.path.join(LOG_DIR, choice, f"{choice}.log")
|
||
|
||
if not os.path.exists(log_path):
|
||
print_error(f"Log-Datei nicht gefunden: {log_path}")
|
||
return
|
||
|
||
# Anzahl der anzuzeigenden Zeilen
|
||
lines_count = input("Anzahl der anzuzeigenden Zeilen (Standard: 20): ")
|
||
lines_count = int(lines_count) if lines_count.isdigit() else 20
|
||
|
||
# Filter für bestimmte Log-Level
|
||
level_filter = input("Nach Log-Level filtern (INFO, WARNING, ERROR oder leer für alle): ").upper()
|
||
|
||
# Log-Datei anzeigen
|
||
with open(log_path, 'r') as f:
|
||
lines = f.readlines()
|
||
|
||
# Filtern nach Log-Level
|
||
if level_filter:
|
||
lines = [line for line in lines if level_filter in line]
|
||
|
||
# Letzte n Zeilen auswählen
|
||
lines = lines[-lines_count:]
|
||
|
||
print_section(f"Log-Datei: {choice}.log (letzte {len(lines)} Einträge)")
|
||
|
||
for line in lines:
|
||
line = line.strip()
|
||
|
||
# Farbliche Hervorhebung je nach Log-Level
|
||
if "ERROR" in line:
|
||
print(colorize(line, 'RED'))
|
||
elif "WARNING" in line:
|
||
print(colorize(line, 'YELLOW'))
|
||
elif "INFO" in line:
|
||
print(colorize(line, 'GREEN'))
|
||
elif "DEBUG" in line:
|
||
print(colorize(line, 'CYAN'))
|
||
else:
|
||
print(line)
|
||
|
||
except ImportError:
|
||
print_error("Konfiguration für Logs nicht gefunden")
|
||
except Exception as e:
|
||
print_error(f"Fehler beim Anzeigen der Log-Dateien: {e}")
|
||
traceback.print_exc()
|
||
|
||
def parse_args():
|
||
"""Parse command line arguments."""
|
||
parser = argparse.ArgumentParser(description="MYP Debug CLI")
|
||
|
||
subparsers = parser.add_subparsers(dest="command", help="Befehl")
|
||
|
||
# Diagnose
|
||
diag_parser = subparsers.add_parser("diagnose", help="Führt eine umfassende Diagnose durch")
|
||
|
||
# Drucker scannen
|
||
scan_parser = subparsers.add_parser("scan", help="Scannt und prüft alle Drucker")
|
||
|
||
# Routen anzeigen
|
||
routes_parser = subparsers.add_parser("routes", help="Zeigt alle verfügbaren API-Routen an")
|
||
|
||
# Systeminformationen
|
||
sysinfo_parser = subparsers.add_parser("sysinfo", help="Zeigt detaillierte Systeminformationen an")
|
||
|
||
# Logs anzeigen
|
||
logs_parser = subparsers.add_parser("logs", help="Zeigt und analysiert Log-Dateien")
|
||
|
||
# Logging-System testen
|
||
logging_test_parser = subparsers.add_parser("test-logging", help="Testet das verbesserte Logging-System")
|
||
|
||
return parser.parse_args()
|
||
|
||
def main():
|
||
"""Hauptfunktion."""
|
||
args = parse_args()
|
||
|
||
if args.command == "diagnose":
|
||
diagnose()
|
||
elif args.command == "scan":
|
||
scan_printers()
|
||
elif args.command == "routes":
|
||
show_routes()
|
||
elif args.command == "sysinfo":
|
||
system_info()
|
||
elif args.command == "logs":
|
||
show_logs()
|
||
elif args.command == "test-logging":
|
||
test_logging_system()
|
||
else:
|
||
# Interaktives Menü, wenn kein Befehl angegeben wurde
|
||
print_header("MYP Debug CLI")
|
||
print("Wählen Sie eine Option:")
|
||
print(" 1. Diagnose durchführen")
|
||
print(" 2. Drucker scannen")
|
||
print(" 3. API-Routen anzeigen")
|
||
print(" 4. Systeminformationen anzeigen")
|
||
print(" 5. Log-Dateien anzeigen")
|
||
print(" 6. Logging-System testen")
|
||
print(" 0. Beenden")
|
||
|
||
choice = input("\nIhre Wahl: ")
|
||
|
||
if choice == "1":
|
||
diagnose()
|
||
elif choice == "2":
|
||
scan_printers()
|
||
elif choice == "3":
|
||
show_routes()
|
||
elif choice == "4":
|
||
system_info()
|
||
elif choice == "5":
|
||
show_logs()
|
||
elif choice == "6":
|
||
test_logging_system()
|
||
elif choice == "0":
|
||
print("Auf Wiedersehen!")
|
||
sys.exit(0)
|
||
else:
|
||
print_error("Ungültige Auswahl")
|
||
|
||
if __name__ == "__main__":
|
||
try:
|
||
main()
|
||
except KeyboardInterrupt:
|
||
print_info("\nProgramm wurde durch Benutzer abgebrochen")
|
||
except Exception as e:
|
||
print_error(f"Unerwarteter Fehler: {e}")
|
||
traceback.print_exc() |