#!/usr/bin/env python3 """ MYP Debug CLI Kommandozeilen-Tool für Diagnose und Debugging der MYP-Anwendung """ import os import sys import argparse import time import json import importlib import logging import sqlite3 from datetime import datetime import traceback from pprint import pprint # Eigene Module importieren sys.path.append(os.path.dirname(os.path.abspath(__file__))) # Farbige Ausgabe für die Konsole COLORS = { 'RESET': '\033[0m', 'BOLD': '\033[1m', 'RED': '\033[31m', 'GREEN': '\033[32m', 'YELLOW': '\033[33m', 'BLUE': '\033[34m', 'MAGENTA': '\033[35m', 'CYAN': '\033[36m', } # Emojis für verschiedene Log-Level und Kategorien LOG_EMOJIS = { 'DEBUG': '🔍', 'INFO': 'ℹ️', 'WARNING': '⚠️', 'ERROR': '❌', 'CRITICAL': '🔥', 'SUCCESS': '✅', 'DATABASE': '💾', 'NETWORK': '🌐', 'SYSTEM': '💻', 'PRINTER': '🖨️', 'API': '📡', 'USER': '👤' } # Prüfen, ob das Terminal Farben unterstützt def supports_color(): """Prüft, ob das Terminal Farben unterstützt.""" if os.name == 'nt': try: import ctypes kernel32 = ctypes.windll.kernel32 # Aktiviere VT100-Unterstützung unter Windows kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7) return True except: return False else: return sys.stdout.isatty() USE_COLOR = supports_color() def colorize(text, color): """Färbt den Text ein, wenn Farben unterstützt werden.""" if USE_COLOR and color in COLORS: return f"{COLORS[color]}{text}{COLORS['RESET']}" return text def print_success(message): print(f"{LOG_EMOJIS['SUCCESS']} {colorize(message, 'GREEN')}") def print_error(message): print(f"{LOG_EMOJIS['ERROR']} {colorize(message, 'RED')}") def print_warning(message): print(f"{LOG_EMOJIS['WARNING']} {colorize(message, 'YELLOW')}") def print_info(message): print(f"{LOG_EMOJIS['INFO']} {colorize(message, 'BLUE')}") def print_debug(message): print(f"{LOG_EMOJIS['DEBUG']} {colorize(message, 'CYAN')}") def print_database(message): print(f"{LOG_EMOJIS['DATABASE']} {colorize(message, 'MAGENTA')}") def print_network(message): print(f"{LOG_EMOJIS['NETWORK']} {colorize(message, 'CYAN')}") def print_system(message): print(f"{LOG_EMOJIS['SYSTEM']} {colorize(message, 'BLUE')}") def print_printer(message): print(f"{LOG_EMOJIS['PRINTER']} {colorize(message, 'GREEN')}") def print_header(message): print(f"\n{colorize('='*80, 'BOLD')}") print(f"{colorize(message.center(80), 'BOLD')}") print(f"{colorize('='*80, 'BOLD')}\n") def print_section(message): print(f"\n{colorize('-'*40, 'BOLD')}") print(f"{colorize(message, 'BOLD')}") print(f"{colorize('-'*40, 'BOLD')}\n") # Hilfsfunktionen def get_database_path(): """Gibt den Pfad zur Datenbank zurück.""" try: from config.settings import DATABASE_PATH return DATABASE_PATH except ImportError: # Fallback auf Standard-Pfad base_dir = os.path.dirname(os.path.abspath(__file__)) return os.path.join(base_dir, "database", "myp.db") def check_database(): """Prüft den Zustand der Datenbank.""" db_path = get_database_path() if not os.path.exists(db_path): print_error(f"Datenbank nicht gefunden: {db_path}") return False try: conn = sqlite3.connect(db_path) cursor = conn.cursor() # Tabellen auflisten cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") tables = cursor.fetchall() print_database(f"Datenbank gefunden: {db_path}") print_database(f"Größe: {os.path.getsize(db_path) / (1024*1024):.2f} MB") print_database(f"Tabellen ({len(tables)}):") for table in tables: # Anzahl der Datensätze pro Tabelle cursor.execute(f"SELECT COUNT(*) FROM {table[0]}") count = cursor.fetchone()[0] print(f" 📋 {table[0]}: {count} Einträge") conn.close() return True except sqlite3.Error as e: print_error(f"Datenbankfehler: {e}") return False except Exception as e: print_error(f"Fehler beim Prüfen der Datenbank: {e}") return False def check_log_files(): """Prüft die Log-Dateien und zeigt die neuesten Einträge an.""" try: from config.settings import LOG_DIR, LOG_SUBDIRS if not os.path.exists(LOG_DIR): print_error(f"Log-Verzeichnis nicht gefunden: {LOG_DIR}") return False print_info(f"Log-Verzeichnis: {LOG_DIR}") for subdir in LOG_SUBDIRS: log_path = os.path.join(LOG_DIR, subdir, f"{subdir}.log") if not os.path.exists(log_path): print_warning(f"Log-Datei nicht gefunden: {log_path}") continue size = os.path.getsize(log_path) / 1024 # KB print_info(f"Log-Datei: {subdir}.log ({size:.1f} KB)") # Letzte Zeilen anzeigen try: with open(log_path, 'r') as f: lines = f.readlines() last_lines = lines[-5:] # Letzte 5 Zeilen print(" Letzte Einträge:") for line in last_lines: line = line.strip() # Farbliche Hervorhebung je nach Log-Level if "ERROR" in line: print(f" {colorize(line, 'RED')}") elif "WARNING" in line: print(f" {colorize(line, 'YELLOW')}") elif "INFO" in line: print(f" {colorize(line, 'GREEN')}") elif "DEBUG" in line: print(f" {colorize(line, 'CYAN')}") else: print(f" {line}") except Exception as e: print_warning(f" Fehler beim Lesen der Log-Datei: {e}") return True except ImportError: print_error("Konfiguration für Logs nicht gefunden") return False except Exception as e: print_error(f"Fehler beim Prüfen der Log-Dateien: {e}") return False def check_environment(): """Prüft die Umgebungsvariablen und System-Einstellungen.""" print_info("Umgebungsinformationen:") print(f" Python-Version: {sys.version.split()[0]}") print(f" Betriebssystem: {os.name} - {sys.platform}") print(f" Arbeitsverzeichnis: {os.getcwd()}") print_info("Wichtige Umgebungsvariablen:") env_vars = [ "FLASK_ENV", "FLASK_DEBUG", "MYP_SSL_ENABLED", "MYP_SSL_HOSTNAME", "PYTHONPATH" ] for var in env_vars: value = os.environ.get(var, "nicht gesetzt") print(f" {var}: {value}") try: # Flask-Konfiguration prüfen print_info("Flask-Konfiguration:") from config.settings import FLASK_HOST, FLASK_PORT, FLASK_DEBUG, SSL_ENABLED print(f" Host: {FLASK_HOST}") print(f" Port: {FLASK_PORT}") print(f" Debug-Modus: {FLASK_DEBUG}") print(f" SSL aktiviert: {SSL_ENABLED}") # Module prüfen required_modules = [ 'flask', 'sqlalchemy', 'flask_login', 'werkzeug' ] print_info("Benötigte Module:") for module in required_modules: try: mod = importlib.import_module(module) version = getattr(mod, '__version__', 'unbekannt') print(f" {module}: {colorize('OK', 'GREEN')} (Version {version})") except ImportError: print(f" {module}: {colorize('FEHLT', 'RED')}") except ImportError: print_warning("Flask-Konfiguration konnte nicht geladen werden") except Exception as e: print_error(f"Fehler beim Prüfen der Umgebung: {e}") def scan_printer(ip_address, timeout=5): """Scannt einen Drucker und zeigt Informationen an.""" import socket print_printer(f"Prüfe Drucker mit IP: {ip_address}") # Ping testen import subprocess try: if os.name == 'nt': # Windows cmd = ['ping', '-n', '1', '-w', str(timeout * 1000), ip_address] else: # Unix/Linux/macOS cmd = ['ping', '-c', '1', '-W', str(timeout), ip_address] print(f" 🏓 Ping-Test: ", end="") result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8', errors='replace') if result.returncode == 0: print(colorize("Erreichbar", "GREEN")) else: print(colorize("Nicht erreichbar", "RED")) print(f" 📄 Details: {result.stdout}") return except Exception as e: print(colorize(f"Fehler bei Ping-Test: {e}", "RED")) # Offene Ports prüfen common_ports = [80, 443, 8080, 8443, 631, 9100, 9101, 9102] open_ports = [] print(" 🔍 Port-Scan: ", end="") for port in common_ports: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) result = sock.connect_ex((ip_address, port)) if result == 0: open_ports.append(port) sock.close() if open_ports: print(colorize(f"Offene Ports: {', '.join(map(str, open_ports))}", "GREEN")) else: print(colorize("Keine offenen Ports gefunden", "YELLOW")) # Drucker-Info über Tapo-API testen (wenn vorhanden) try: from PyP100 import PyP110 print(" 🔌 Smart Plug Test: ", end="") try: # Standardmäßig Anmeldeinformationen aus der Konfiguration verwenden from config.settings import TAPO_USERNAME, TAPO_PASSWORD p110 = PyP110.P110(ip_address, TAPO_USERNAME, TAPO_PASSWORD) p110.handshake() p110.login() device_info = p110.getDeviceInfo() print(colorize("Verbunden", "GREEN")) print(f" 📛 Gerätename: {device_info.get('nickname', 'Unbekannt')}") print(f" ⚡ Status: {'Ein' if device_info.get('device_on', False) else 'Aus'}") if 'on_time' in device_info: on_time = device_info['on_time'] print(f" ⏱️ Betriebszeit: {on_time // 60} Minuten, {on_time % 60} Sekunden") except Exception as e: print(colorize(f"Fehler: {e}", "RED")) except ImportError: print_warning(" PyP100-Modul nicht verfügbar - Smart Plug Test übersprungen") def check_printers_from_db(): """Prüft die in der Datenbank gespeicherten Drucker.""" db_path = get_database_path() if not os.path.exists(db_path): print_error(f"Datenbank nicht gefunden: {db_path}") return try: conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() # Drucker-Tabelle prüfen cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='printer';") if not cursor.fetchone(): print_error("Drucker-Tabelle nicht gefunden") conn.close() return # Drucker auslesen cursor.execute("SELECT * FROM printer;") printers = cursor.fetchall() if not printers: print_warning("Keine Drucker in der Datenbank gefunden") conn.close() return print_info(f"{len(printers)} Drucker gefunden:") for printer in printers: status_color = 'GREEN' if printer['status'] == 'online' else 'RED' print(f" {printer['name']}: {colorize(printer['status'], status_color)}") print(f" IP: {printer['ip_address']}") print(f" Plug IP: {printer['plug_ip'] or 'Nicht konfiguriert'}") # Detaillierteren Status prüfen if printer['plug_ip']: ask = input(f" Möchten Sie den Drucker {printer['name']} scannen? (j/n): ") if ask.lower() in ('j', 'ja', 'y', 'yes'): scan_printer(printer['plug_ip']) conn.close() except Exception as e: print_error(f"Fehler beim Prüfen der Drucker: {e}") traceback.print_exc() def check_flask_routes(): """Zeigt alle verfügbaren Flask-Routen an.""" try: # Versuche, die Flask-App zu importieren sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) try: from app import app as flask_app except ImportError: print_error("Flask-App konnte nicht importiert werden") return # Alle Routen auflisten print_info("Verfügbare Flask-Routen:") routes = [] for rule in flask_app.url_map.iter_rules(): routes.append({ 'endpoint': rule.endpoint, 'methods': ', '.join(sorted(rule.methods - {'HEAD', 'OPTIONS'})), 'path': rule.rule }) # Nach Pfad sortieren routes = sorted(routes, key=lambda x: x['path']) # Routen anzeigen for route in routes: method_color = 'GREEN' if 'GET' in route['methods'] else 'BLUE' print(f" {colorize(route['methods'], method_color)} {route['path']}") print(f" → {route['endpoint']}") print_info(f"Insgesamt {len(routes)} Routen gefunden") except Exception as e: print_error(f"Fehler beim Abrufen der Flask-Routen: {e}") traceback.print_exc() def print_system_info(): """Zeigt detaillierte Systeminformationen an.""" print_header("Systeminformationen") print_section("Basisinformationen") import platform print(f"Python-Version: {platform.python_version()}") print(f"Betriebssystem: {platform.system()} {platform.release()}") print(f"Architektur: {platform.machine()}") print(f"Prozessor: {platform.processor()}") print_section("Speicher") try: import psutil vm = psutil.virtual_memory() print(f"Gesamter Speicher: {vm.total / (1024**3):.1f} GB") print(f"Verfügbarer Speicher: {vm.available / (1024**3):.1f} GB") print(f"Speicherauslastung: {vm.percent}%") disk = psutil.disk_usage('/') print(f"Festplatte gesamt: {disk.total / (1024**3):.1f} GB") print(f"Festplatte frei: {disk.free / (1024**3):.1f} GB") print(f"Festplattenauslastung: {disk.percent}%") except ImportError: print_warning("psutil-Modul nicht verfügbar - eingeschränkte Informationen") print_section("Netzwerk") try: import socket hostname = socket.gethostname() ip_address = socket.gethostbyname(hostname) print(f"Hostname: {hostname}") print(f"IP-Adresse: {ip_address}") # Netzwerkschnittstellen if 'psutil' in sys.modules: print("Netzwerkschnittstellen:") for name, addrs in psutil.net_if_addrs().items(): for addr in addrs: if addr.family == socket.AF_INET: print(f" {name}: {addr.address}") except Exception as e: print_warning(f"Fehler beim Abrufen der Netzwerkinformationen: {e}") def test_logging_system(): """Testet das verbesserte Logging-System mit allen Features.""" print_header("Logging-System Test") try: # Versuche die neuen Logging-Funktionen zu importieren from utils.logging_config import get_logger, debug_request, debug_response, measure_execution_time print_success("Neue Logging-Module erfolgreich importiert") # Test verschiedener Logger test_loggers = ['app', 'auth', 'jobs', 'printers', 'errors'] print_section("Logger-Tests") for logger_name in test_loggers: try: logger = get_logger(logger_name) # Test verschiedener Log-Level logger.debug(f"🔍 Debug-Test für {logger_name}") logger.info(f"ℹ️ Info-Test für {logger_name}") logger.warning(f"⚠️ Warning-Test für {logger_name}") print_success(f"Logger '{logger_name}' funktioniert korrekt") except Exception as e: print_error(f"Fehler beim Testen von Logger '{logger_name}': {e}") # Test Performance-Monitoring print_section("Performance-Monitoring Test") @measure_execution_time(logger=get_logger("app"), task_name="Test-Funktion") def test_function(): """Eine Test-Funktion für das Performance-Monitoring.""" import time time.sleep(0.1) # Simuliere etwas Arbeit return "Test erfolgreich" result = test_function() print_success(f"Performance-Monitoring Test: {result}") # Test der Debug-Utilities print_section("Debug-Utilities Test") try: from utils.debug_utils import debug_dump, debug_trace, memory_usage # Test debug_dump test_data = { "version": "1.0.0", "features": ["emojis", "colors", "performance-monitoring"], "status": "active" } debug_dump(test_data, "Test-Konfiguration") # Test memory_usage memory_info = memory_usage() print_system(f"Aktueller Speicherverbrauch: {memory_info['rss']:.2f} MB") print_success("Debug-Utilities funktionieren korrekt") except ImportError as e: print_warning(f"Debug-Utilities nicht verfügbar: {e}") # Zusammenfassung print_section("Test-Zusammenfassung") print_success("🎉 Alle Logging-System-Tests erfolgreich abgeschlossen!") print_info("Features verfügbar:") print(" ✅ Farbige Log-Ausgaben mit ANSI-Codes") print(" ✅ Emoji-Integration für bessere Lesbarkeit") print(" ✅ HTTP-Request/Response-Logging") print(" ✅ Performance-Monitoring mit Ausführungszeit") print(" ✅ Cross-Platform-Unterstützung (Windows/Unix)") print(" ✅ Strukturierte Debug-Informationen") except ImportError as e: print_error(f"Logging-Module nicht verfügbar: {e}") print_warning("Stelle sicher, dass alle Module korrekt installiert sind") except Exception as e: print_error(f"Unerwarteter Fehler beim Logging-Test: {e}") traceback.print_exc() # Hauptfunktionen für die Befehlszeile def diagnose(): """Führt eine umfassende Diagnose durch.""" print_header("MYP Diagnose-Tool") print_section("Systemprüfung") check_environment() print_section("Datenbankprüfung") check_database() print_section("Log-Dateien") check_log_files() print_success("Diagnose abgeschlossen!") def scan_printers(): """Scannt und prüft alle Drucker.""" print_header("Drucker-Scanner") # Direkter Scan einer IP-Adresse ip = input("IP-Adresse zum Scannen (leer lassen, um Drucker aus der Datenbank zu prüfen): ") if ip: scan_printer(ip) else: check_printers_from_db() def show_routes(): """Zeigt alle verfügbaren API-Routen an.""" print_header("API-Routen") check_flask_routes() def system_info(): """Zeigt detaillierte Systeminformationen an.""" print_system_info() def show_logs(): """Zeigt und analysiert Log-Dateien.""" print_header("Log-Analyse") try: from config.settings import LOG_DIR, LOG_SUBDIRS if not os.path.exists(LOG_DIR): print_error(f"Log-Verzeichnis nicht gefunden: {LOG_DIR}") return print_info(f"Log-Verzeichnis: {LOG_DIR}") print_info("Verfügbare Logs:") for i, subdir in enumerate(LOG_SUBDIRS, 1): log_path = os.path.join(LOG_DIR, subdir, f"{subdir}.log") size = "Nicht gefunden" if os.path.exists(log_path): size = f"{os.path.getsize(log_path) / 1024:.1f} KB" print(f" {i}. {subdir}.log ({size})") choice = input("\nWelches Log möchten Sie anzeigen? (Nummer oder Name): ") # Nummer in Namen umwandeln try: choice_num = int(choice) - 1 if 0 <= choice_num < len(LOG_SUBDIRS): choice = LOG_SUBDIRS[choice_num] except ValueError: pass # Prüfen, ob die Wahl gültig ist if choice not in LOG_SUBDIRS: print_error(f"Ungültige Auswahl: {choice}") return log_path = os.path.join(LOG_DIR, choice, f"{choice}.log") if not os.path.exists(log_path): print_error(f"Log-Datei nicht gefunden: {log_path}") return # Anzahl der anzuzeigenden Zeilen lines_count = input("Anzahl der anzuzeigenden Zeilen (Standard: 20): ") lines_count = int(lines_count) if lines_count.isdigit() else 20 # Filter für bestimmte Log-Level level_filter = input("Nach Log-Level filtern (INFO, WARNING, ERROR oder leer für alle): ").upper() # Log-Datei anzeigen with open(log_path, 'r') as f: lines = f.readlines() # Filtern nach Log-Level if level_filter: lines = [line for line in lines if level_filter in line] # Letzte n Zeilen auswählen lines = lines[-lines_count:] print_section(f"Log-Datei: {choice}.log (letzte {len(lines)} Einträge)") for line in lines: line = line.strip() # Farbliche Hervorhebung je nach Log-Level if "ERROR" in line: print(colorize(line, 'RED')) elif "WARNING" in line: print(colorize(line, 'YELLOW')) elif "INFO" in line: print(colorize(line, 'GREEN')) elif "DEBUG" in line: print(colorize(line, 'CYAN')) else: print(line) except ImportError: print_error("Konfiguration für Logs nicht gefunden") except Exception as e: print_error(f"Fehler beim Anzeigen der Log-Dateien: {e}") traceback.print_exc() def parse_args(): """Parse command line arguments.""" parser = argparse.ArgumentParser(description="MYP Debug CLI") subparsers = parser.add_subparsers(dest="command", help="Befehl") # Diagnose diag_parser = subparsers.add_parser("diagnose", help="Führt eine umfassende Diagnose durch") # Drucker scannen scan_parser = subparsers.add_parser("scan", help="Scannt und prüft alle Drucker") # Routen anzeigen routes_parser = subparsers.add_parser("routes", help="Zeigt alle verfügbaren API-Routen an") # Systeminformationen sysinfo_parser = subparsers.add_parser("sysinfo", help="Zeigt detaillierte Systeminformationen an") # Logs anzeigen logs_parser = subparsers.add_parser("logs", help="Zeigt und analysiert Log-Dateien") # Logging-System testen logging_test_parser = subparsers.add_parser("test-logging", help="Testet das verbesserte Logging-System") return parser.parse_args() def main(): """Hauptfunktion.""" args = parse_args() if args.command == "diagnose": diagnose() elif args.command == "scan": scan_printers() elif args.command == "routes": show_routes() elif args.command == "sysinfo": system_info() elif args.command == "logs": show_logs() elif args.command == "test-logging": test_logging_system() else: # Interaktives Menü, wenn kein Befehl angegeben wurde print_header("MYP Debug CLI") print("Wählen Sie eine Option:") print(" 1. Diagnose durchführen") print(" 2. Drucker scannen") print(" 3. API-Routen anzeigen") print(" 4. Systeminformationen anzeigen") print(" 5. Log-Dateien anzeigen") print(" 6. Logging-System testen") print(" 0. Beenden") choice = input("\nIhre Wahl: ") if choice == "1": diagnose() elif choice == "2": scan_printers() elif choice == "3": show_routes() elif choice == "4": system_info() elif choice == "5": show_logs() elif choice == "6": test_logging_system() elif choice == "0": print("Auf Wiedersehen!") sys.exit(0) else: print_error("Ungültige Auswahl") if __name__ == "__main__": try: main() except KeyboardInterrupt: print_info("\nProgramm wurde durch Benutzer abgebrochen") except Exception as e: print_error(f"Unerwarteter Fehler: {e}") traceback.print_exc()