From c4bd6ff4dc0a8703857eae1968be528ba16c3f98 Mon Sep 17 00:00:00 2001 From: Till Tomczak Date: Wed, 11 Jun 2025 14:10:01 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=86=97=20=F0=9F=9A=80=20=F0=9F=93=9A=20Re?= =?UTF-8?q?moved=20unused=20utility=20files=20for=20code=20optimization.?= =?UTF-8?q?=20=F0=9F=8E=89=F0=9F=94=A7=F0=9F=93=9A=F0=9F=92=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/utils/add_test_printers.py | 178 ---- backend/utils/create_ssl_cert.py | 95 --- backend/utils/create_test_printers.py | 106 --- backend/utils/email_notification.py | 175 ---- backend/utils/maintenance_system.py | 790 ------------------ backend/utils/multi_location_system.py | 899 --------------------- backend/utils/offline_config.py | 229 ------ backend/utils/test_button_functionality.py | 243 ------ backend/utils/test_p110.py | 175 ---- backend/utils/test_tapo_direkt.py | 212 ----- backend/utils/test_tapo_sofort.py | 132 --- backend/utils/update_requirements.py | 295 ------- 12 files changed, 3529 deletions(-) delete mode 100644 backend/utils/add_test_printers.py delete mode 100644 backend/utils/create_ssl_cert.py delete mode 100644 backend/utils/create_test_printers.py delete mode 100644 backend/utils/email_notification.py delete mode 100644 backend/utils/maintenance_system.py delete mode 100644 backend/utils/multi_location_system.py delete mode 100644 backend/utils/offline_config.py delete mode 100644 backend/utils/test_button_functionality.py delete mode 100644 backend/utils/test_p110.py delete mode 100644 backend/utils/test_tapo_direkt.py delete mode 100644 backend/utils/test_tapo_sofort.py delete mode 100644 backend/utils/update_requirements.py diff --git a/backend/utils/add_test_printers.py b/backend/utils/add_test_printers.py deleted file mode 100644 index d7232e25b..000000000 --- a/backend/utils/add_test_printers.py +++ /dev/null @@ -1,178 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -Skript zum Hinzufügen von Testdruckern zur Datenbank -""" - -import sys -import os -from datetime import datetime - -# Füge das Anwendungsverzeichnis zum Python-Pfad hinzu -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - -from models import get_db_session, Printer - -def add_test_printers(): - """Fügt Testdrucker zur Datenbank hinzu""" - - test_printers = [ - { - "name": "Prusa i3 MK3S+", - "model": "Prusa i3 MK3S+", - "location": "Labor A - Arbeitsplatz 1", - "mac_address": "AA:BB:CC:DD:EE:01", - "plug_ip": "192.168.1.101", - "status": "available", - "active": True - }, - { - "name": "Ender 3 V2", - "model": "Creality Ender 3 V2", - "location": "Labor A - Arbeitsplatz 2", - "mac_address": "AA:BB:CC:DD:EE:02", - "plug_ip": "192.168.1.102", - "status": "available", - "active": True - }, - { - "name": "Ultimaker S3", - "model": "Ultimaker S3", - "location": "Labor B - Arbeitsplatz 1", - "mac_address": "AA:BB:CC:DD:EE:03", - "plug_ip": "192.168.1.103", - "status": "available", - "active": True - }, - { - "name": "Bambu Lab X1 Carbon", - "model": "Bambu Lab X1 Carbon", - "location": "Labor B - Arbeitsplatz 2", - "mac_address": "AA:BB:CC:DD:EE:04", - "plug_ip": "192.168.1.104", - "status": "available", - "active": True - }, - { - "name": "Formlabs Form 3", - "model": "Formlabs Form 3", - "location": "Labor C - Harz-Bereich", - "mac_address": "AA:BB:CC:DD:EE:05", - "plug_ip": "192.168.1.105", - "status": "offline", - "active": False - } - ] - - db_session = get_db_session() - - try: - added_count = 0 - - for printer_data in test_printers: - # Prüfen, ob Drucker bereits existiert - existing = db_session.query(Printer).filter( - Printer.name == printer_data["name"] - ).first() - - if existing: - print(f"⚠️ Drucker '{printer_data['name']}' existiert bereits - überspringe") - continue - - # Neuen Drucker erstellen - new_printer = Printer( - name=printer_data["name"], - model=printer_data["model"], - location=printer_data["location"], - mac_address=printer_data["mac_address"], - plug_ip=printer_data["plug_ip"], - status=printer_data["status"], - active=printer_data["active"], - created_at=datetime.now() - ) - - db_session.add(new_printer) - added_count += 1 - print(f"✅ Drucker '{printer_data['name']}' hinzugefügt") - - if added_count > 0: - db_session.commit() - print(f"\n🎉 {added_count} Testdrucker erfolgreich zur Datenbank hinzugefügt!") - else: - print("\n📋 Alle Testdrucker existieren bereits in der Datenbank") - - # Zeige alle Drucker in der Datenbank - all_printers = db_session.query(Printer).all() - print(f"\n📊 Gesamt {len(all_printers)} Drucker in der Datenbank:") - print("-" * 80) - print(f"{'ID':<4} {'Name':<20} {'Modell':<20} {'Status':<12} {'Aktiv':<6}") - print("-" * 80) - - for printer in all_printers: - active_str = "✅" if printer.active else "❌" - print(f"{printer.id:<4} {printer.name[:19]:<20} {(printer.model or 'Unbekannt')[:19]:<20} {printer.status:<12} {active_str:<6}") - - db_session.close() - - except Exception as e: - db_session.rollback() - db_session.close() - print(f"❌ Fehler beim Hinzufügen der Testdrucker: {str(e)}") - return False - - return True - -def remove_test_printers(): - """Entfernt alle Testdrucker aus der Datenbank""" - - test_printer_names = [ - "Prusa i3 MK3S+", - "Ender 3 V2", - "Ultimaker S3", - "Bambu Lab X1 Carbon", - "Formlabs Form 3" - ] - - db_session = get_db_session() - - try: - removed_count = 0 - - for name in test_printer_names: - printer = db_session.query(Printer).filter(Printer.name == name).first() - if printer: - db_session.delete(printer) - removed_count += 1 - print(f"🗑️ Drucker '{name}' entfernt") - - if removed_count > 0: - db_session.commit() - print(f"\n🧹 {removed_count} Testdrucker erfolgreich entfernt!") - else: - print("\n📋 Keine Testdrucker zum Entfernen gefunden") - - db_session.close() - - except Exception as e: - db_session.rollback() - db_session.close() - print(f"❌ Fehler beim Entfernen der Testdrucker: {str(e)}") - return False - - return True - -if __name__ == "__main__": - print("=== MYP Druckerverwaltung - Testdrucker-Verwaltung ===") - print() - - if len(sys.argv) > 1 and sys.argv[1] == "--remove": - print("Entferne Testdrucker...") - remove_test_printers() - else: - print("Füge Testdrucker hinzu...") - print("(Verwende --remove um Testdrucker zu entfernen)") - print() - add_test_printers() - - print("\nFertig! 🚀") \ No newline at end of file diff --git a/backend/utils/create_ssl_cert.py b/backend/utils/create_ssl_cert.py deleted file mode 100644 index 4426ae4e9..000000000 --- a/backend/utils/create_ssl_cert.py +++ /dev/null @@ -1,95 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -""" -SSL-Zertifikat-Generator für die MYP-Plattform -Erstellt selbstsignierte SSL-Zertifikate für die lokale Entwicklung -""" - -import os -import datetime -import sys - -# Überprüfen, ob die notwendigen Pakete installiert sind -try: - from cryptography import x509 - from cryptography.x509.oid import NameOID - from cryptography.hazmat.primitives import hashes - from cryptography.hazmat.primitives.asymmetric import rsa - from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption -except ImportError: - print("Fehler: Paket 'cryptography' nicht gefunden.") - print("Bitte installieren Sie es mit: pip install cryptography") - sys.exit(1) - -def create_self_signed_cert(cert_path, key_path, hostname="localhost"): - """ - Erstellt ein selbstsigniertes SSL-Zertifikat mit dem angegebenen Hostnamen. - - Args: - cert_path: Pfad zur Zertifikatsdatei - key_path: Pfad zur privaten Schlüsseldatei - hostname: Hostname für das Zertifikat (Standard: localhost) - """ - # Verzeichnis erstellen, falls es nicht existiert - cert_dir = os.path.dirname(cert_path) - if cert_dir and not os.path.exists(cert_dir): - os.makedirs(cert_dir, exist_ok=True) - - # Privaten Schlüssel generieren - private_key = rsa.generate_private_key( - public_exponent=65537, - key_size=2048, - ) - - # Schlüsseldatei schreiben - with open(key_path, "wb") as key_file: - key_file.write(private_key.private_bytes( - encoding=Encoding.PEM, - format=PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=NoEncryption() - )) - - # Name für das Zertifikat erstellen - subject = issuer = x509.Name([ - x509.NameAttribute(NameOID.COMMON_NAME, hostname), - ]) - - # Zertifikat erstellen - cert = x509.CertificateBuilder().subject_name( - subject - ).issuer_name( - issuer - ).public_key( - private_key.public_key() - ).serial_number( - x509.random_serial_number() - ).not_valid_before( - datetime.datetime.utcnow() - ).not_valid_after( - datetime.datetime.utcnow() + datetime.timedelta(days=365) - ).add_extension( - x509.SubjectAlternativeName([x509.DNSName(hostname)]), - critical=False, - ).sign(private_key, hashes.SHA256()) - - # Zertifikatsdatei schreiben - with open(cert_path, "wb") as cert_file: - cert_file.write(cert.public_bytes(Encoding.PEM)) - - print(f"Selbstsigniertes SSL-Zertifikat für '{hostname}' erstellt:") - print(f"Zertifikat: {cert_path}") - print(f"Schlüssel: {key_path}") - print(f"Gültig für 1 Jahr.") - -if __name__ == "__main__": - import argparse - - parser = argparse.ArgumentParser(description="Erstellt selbstsignierte SSL-Zertifikate für die lokale Entwicklung") - parser.add_argument("-c", "--cert", default="/home/user/Projektarbeit-MYP/backend/app/certs/myp.crt", help="Pfad zur Zertifikatsdatei") - parser.add_argument("-k", "--key", default="/home/user/Projektarbeit-MYP/backend/app/certs/myp.key", help="Pfad zur Schlüsseldatei") - parser.add_argument("-n", "--hostname", default="localhost", help="Hostname für das Zertifikat") - - args = parser.parse_args() - - create_self_signed_cert(args.cert, args.key, args.hostname) \ No newline at end of file diff --git a/backend/utils/create_test_printers.py b/backend/utils/create_test_printers.py deleted file mode 100644 index 759645601..000000000 --- a/backend/utils/create_test_printers.py +++ /dev/null @@ -1,106 +0,0 @@ -#!/usr/bin/env python3 -""" -Script zum Erstellen von Test-Druckern für die MYP Plattform -""" - -import sys -import os -sys.path.append('.') - -from models import * -from datetime import datetime - -def create_test_printers(): - """Erstellt Test-Drucker in der Datenbank.""" - - # Verbindung zur Datenbank - db_session = get_db_session() - - # Test-Drucker Daten - test_printers = [ - { - 'name': 'Mercedes-Benz FDM Pro #01', - 'model': 'Ultimaker S5 Pro', - 'location': 'Werkhalle Sindelfingen', - 'plug_ip': '192.168.10.101', - 'status': 'available', - 'active': True - }, - { - 'name': 'Mercedes-Benz FDM #02', - 'model': 'Prusa MK3S+', - 'location': 'Entwicklungszentrum Stuttgart', - 'plug_ip': '192.168.10.102', - 'status': 'printing', - 'active': True - }, - { - 'name': 'Mercedes-Benz SLA #01', - 'model': 'Formlabs Form 3+', - 'location': 'Prototypenlabor', - 'plug_ip': '192.168.10.103', - 'status': 'available', - 'active': True - }, - { - 'name': 'Mercedes-Benz Industrial #01', - 'model': 'Stratasys F370', - 'location': 'Industriehalle Bremen', - 'plug_ip': '192.168.10.104', - 'status': 'maintenance', - 'active': False - }, - { - 'name': 'Mercedes-Benz Rapid #01', - 'model': 'Bambu Lab X1 Carbon', - 'location': 'Designabteilung', - 'plug_ip': '192.168.10.105', - 'status': 'offline', - 'active': True - }, - { - 'name': 'Mercedes-Benz SLS #01', - 'model': 'HP Jet Fusion 5200', - 'location': 'Produktionszentrum Berlin', - 'plug_ip': '192.168.10.106', - 'status': 'available', - 'active': True - } - ] - try: - created_count = 0 - for printer_data in test_printers: - # Prüfen ob Drucker bereits existiert - existing = db_session.query(Printer).filter_by(name=printer_data['name']).first() - if not existing: - printer = Printer( - name=printer_data['name'], - model=printer_data['model'], - location=printer_data['location'], - plug_ip=printer_data['plug_ip'], - status=printer_data['status'], - active=printer_data['active'], - created_at=datetime.now() - ) - db_session.add(printer) - created_count += 1 - print(f"✅ Drucker '{printer_data['name']}' erstellt") - else: - print(f"ℹ️ Drucker '{printer_data['name']}' existiert bereits") - - db_session.commit() - - total_count = db_session.query(Printer).count() - print(f"\n🎉 {created_count} neue Test-Drucker erstellt!") - print(f"📊 Insgesamt {total_count} Drucker in der Datenbank.") - - except Exception as e: - print(f"❌ Fehler beim Erstellen der Test-Drucker: {str(e)}") - db_session.rollback() - finally: - db_session.close() - -if __name__ == "__main__": - print("🚀 Erstelle Test-Drucker für MYP Plattform...") - create_test_printers() - print("✅ Fertig!") \ No newline at end of file diff --git a/backend/utils/email_notification.py b/backend/utils/email_notification.py deleted file mode 100644 index fefab3504..000000000 --- a/backend/utils/email_notification.py +++ /dev/null @@ -1,175 +0,0 @@ -""" -Offline-kompatible E-Mail-Benachrichtigung für MYP-System -======================================================== - -Da das System im Produktionsbetrieb offline läuft, werden alle E-Mail-Benachrichtigungen -nur geloggt aber nicht tatsächlich versendet. -""" - -import logging -from datetime import datetime -from typing import Optional, Dict, Any - -from utils.logging_config import get_logger - -logger = get_logger("email_notification") - -class OfflineEmailNotification: - """ - Offline-E-Mail-Benachrichtigung die nur Logs erstellt. - Simuliert E-Mail-Versand für Offline-Betrieb. - """ - - def __init__(self): - self.enabled = False # Immer deaktiviert im Offline-Modus - logger.info("📧 Offline-E-Mail-Benachrichtigung initialisiert (kein echter E-Mail-Versand)") - - def send_email(self, to: str, subject: str, body: str, **kwargs) -> bool: - """ - Simuliert E-Mail-Versand durch Logging. - - Args: - to: E-Mail-Empfänger - subject: E-Mail-Betreff - body: E-Mail-Inhalt - **kwargs: Zusätzliche Parameter - - Returns: - bool: Immer True (Simulation erfolgreich) - """ - logger.info(f"📧 [OFFLINE-SIMULATION] E-Mail würde versendet werden:") - logger.info(f" 📮 An: {to}") - logger.info(f" 📋 Betreff: {subject}") - logger.info(f" 📝 Inhalt: {body[:100]}{'...' if len(body) > 100 else ''}") - logger.info(f" 🕒 Zeitpunkt: {datetime.now().strftime('%d.%m.%Y %H:%M:%S')}") - - if kwargs: - logger.info(f" ⚙️ Zusätzliche Parameter: {kwargs}") - - return True - - def send_notification_email(self, recipient: str, notification_type: str, - data: Dict[str, Any]) -> bool: - """ - Sendet Benachrichtigungs-E-Mail (Offline-Simulation). - - Args: - recipient: E-Mail-Empfänger - notification_type: Art der Benachrichtigung - data: Daten für die Benachrichtigung - - Returns: - bool: Immer True (Simulation erfolgreich) - """ - subject = f"MYP-Benachrichtigung: {notification_type}" - body = f"Benachrichtigung vom MYP-System:\n\n{data}" - - return self.send_email(recipient, subject, body, notification_type=notification_type) - - def send_maintenance_notification(self, recipient: str, task_title: str, - task_description: str) -> bool: - """ - Sendet Wartungs-Benachrichtigung (Offline-Simulation). - - Args: - recipient: E-Mail-Empfänger - task_title: Titel der Wartungsaufgabe - task_description: Beschreibung der Wartungsaufgabe - - Returns: - bool: Immer True (Simulation erfolgreich) - """ - subject = f"MYP-Wartungsaufgabe: {task_title}" - body = f""" -Neue Wartungsaufgabe im MYP-System: - -Titel: {task_title} -Beschreibung: {task_description} -Erstellt: {datetime.now().strftime('%d.%m.%Y %H:%M:%S')} - -Bitte loggen Sie sich in das MYP-System ein, um weitere Details zu sehen. - """ - - return self.send_email(recipient, subject, body, task_type="maintenance") - -# Globale Instanz für einfache Verwendung -email_notifier = OfflineEmailNotification() - -def send_email_notification(recipient: str, subject: str, body: str, **kwargs) -> bool: - """ - Haupt-Funktion für E-Mail-Versand (Offline-kompatibel). - - Args: - recipient: E-Mail-Empfänger - subject: E-Mail-Betreff - body: E-Mail-Inhalt - **kwargs: Zusätzliche Parameter - - Returns: - bool: True wenn "erfolgreich" (geloggt) - """ - return email_notifier.send_email(recipient, subject, body, **kwargs) - -def send_maintenance_email(recipient: str, task_title: str, task_description: str) -> bool: - """ - Sendet Wartungs-E-Mail (Offline-kompatibel). - - Args: - recipient: E-Mail-Empfänger - task_title: Titel der Wartungsaufgabe - task_description: Beschreibung der Wartungsaufgabe - - Returns: - bool: True wenn "erfolgreich" (geloggt) - """ - return email_notifier.send_maintenance_notification(recipient, task_title, task_description) - -def send_guest_approval_email(recipient: str, otp_code: str, expires_at: str) -> bool: - """ - Sendet Gastauftrags-Genehmigung-E-Mail (Offline-kompatibel). - - Args: - recipient: E-Mail-Empfänger - otp_code: OTP-Code für den Gastauftrag - expires_at: Ablaufzeit des OTP-Codes - - Returns: - bool: True wenn "erfolgreich" (geloggt) - """ - subject = "MYP-Gastauftrag genehmigt" - body = f""" -Ihr Gastauftrag wurde genehmigt! - -OTP-Code: {otp_code} -Gültig bis: {expires_at} - -Bitte verwenden Sie diesen Code am MYP-Terminal, um Ihren Druckauftrag zu starten. - """ - - return email_notifier.send_email(recipient, subject, body, - otp_code=otp_code, expires_at=expires_at) - -def send_guest_rejection_email(recipient: str, reason: str) -> bool: - """ - Sendet Gastauftrags-Ablehnungs-E-Mail (Offline-kompatibel). - - Args: - recipient: E-Mail-Empfänger - reason: Grund für die Ablehnung - - Returns: - bool: True wenn "erfolgreich" (geloggt) - """ - subject = "MYP-Gastauftrag abgelehnt" - body = f""" -Ihr Gastauftrag wurde leider abgelehnt. - -Grund: {reason} - -Bei Fragen wenden Sie sich bitte an das MYP-Team. - """ - - return email_notifier.send_email(recipient, subject, body, rejection_reason=reason) - -# Für Backward-Kompatibilität -send_notification = send_email_notification \ No newline at end of file diff --git a/backend/utils/maintenance_system.py b/backend/utils/maintenance_system.py deleted file mode 100644 index e9bc752ab..000000000 --- a/backend/utils/maintenance_system.py +++ /dev/null @@ -1,790 +0,0 @@ -""" -Wartungsplanungs- und Tracking-System für das MYP-System -======================================================== - -Dieses Modul stellt umfassende Wartungsfunktionalität bereit: -- Geplante und ungeplante Wartungen -- Wartungsintervalle und Erinnerungen -- Wartungshistorie und Berichte -- Automatische Wartungsprüfungen -- Ersatzteil-Management -- Techniker-Zuweisungen -""" - -import asyncio -import json -import logging -from datetime import datetime, timedelta -from typing import Dict, List, Any, Optional, Callable -from dataclasses import dataclass, asdict -from enum import Enum -import threading -import schedule -import time - -from utils.logging_config import get_logger -from models import Printer, get_db_session -from utils.email_notification import send_email_notification -from utils.realtime_dashboard import emit_system_alert - -logger = get_logger("maintenance") - -class MaintenanceType(Enum): - """Arten von Wartungen""" - PREVENTIVE = "preventive" # Vorbeugende Wartung - CORRECTIVE = "corrective" # Reparatur/Korrektur - EMERGENCY = "emergency" # Notfall-Wartung - SCHEDULED = "scheduled" # Geplante Wartung - INSPECTION = "inspection" # Inspektion - -class MaintenanceStatus(Enum): - """Status einer Wartung""" - PLANNED = "planned" # Geplant - SCHEDULED = "scheduled" # Terminiert - IN_PROGRESS = "in_progress" # In Bearbeitung - COMPLETED = "completed" # Abgeschlossen - CANCELLED = "cancelled" # Abgebrochen - OVERDUE = "overdue" # Überfällig - -class MaintenancePriority(Enum): - """Priorität einer Wartung""" - LOW = "low" # Niedrig - NORMAL = "normal" # Normal - HIGH = "high" # Hoch - CRITICAL = "critical" # Kritisch - EMERGENCY = "emergency" # Notfall - -@dataclass -class MaintenanceTask: - """Wartungsaufgabe""" - id: Optional[int] = None - printer_id: int = None - title: str = "" - description: str = "" - maintenance_type: MaintenanceType = MaintenanceType.PREVENTIVE - priority: MaintenancePriority = MaintenancePriority.NORMAL - status: MaintenanceStatus = MaintenanceStatus.PLANNED - scheduled_date: Optional[datetime] = None - due_date: Optional[datetime] = None - estimated_duration: int = 60 # Minuten - actual_duration: Optional[int] = None - assigned_technician: Optional[str] = None - created_at: datetime = None - started_at: Optional[datetime] = None - completed_at: Optional[datetime] = None - notes: str = "" - required_parts: List[str] = None - actual_parts_used: List[str] = None - cost: Optional[float] = None - checklist: List[Dict[str, Any]] = None - photos: List[str] = None - created_by: Optional[int] = None - -@dataclass -class MaintenanceSchedule: - """Wartungsplan""" - printer_id: int - maintenance_type: MaintenanceType - interval_days: int - next_due: datetime - last_completed: Optional[datetime] = None - is_active: bool = True - description: str = "" - checklist_template: List[str] = None - -@dataclass -class MaintenanceMetrics: - """Wartungsmetriken""" - total_tasks: int = 0 - completed_tasks: int = 0 - overdue_tasks: int = 0 - average_completion_time: float = 0.0 - total_cost: float = 0.0 - mtbf: float = 0.0 # Mean Time Between Failures - mttr: float = 0.0 # Mean Time To Repair - uptime_percentage: float = 0.0 - -class MaintenanceManager: - """Manager für Wartungsplanung und -tracking""" - - def __init__(self): - self.tasks: Dict[int, MaintenanceTask] = {} - self.schedules: Dict[int, List[MaintenanceSchedule]] = {} - self.maintenance_history: List[MaintenanceTask] = [] - self.next_task_id = 1 - self.is_running = False - - self._setup_scheduler() - - def _setup_scheduler(self): - """Richtet automatische Wartungsplanung ein""" - schedule.every().day.at("06:00").do(self._check_scheduled_maintenance) - schedule.every().hour.do(self._check_overdue_tasks) - schedule.every().monday.at("08:00").do(self._generate_weekly_report) - - # Scheduler in separatem Thread - def run_scheduler(): - while self.is_running: - schedule.run_pending() - time.sleep(60) # Check every minute - - self.is_running = True - scheduler_thread = threading.Thread(target=run_scheduler, daemon=True) - scheduler_thread.start() - - logger.info("Wartungs-Scheduler gestartet") - - def create_task(self, task: MaintenanceTask) -> int: - """Erstellt eine neue Wartungsaufgabe""" - task.id = self.next_task_id - self.next_task_id += 1 - task.created_at = datetime.now() - - self.tasks[task.id] = task - - # Automatische Terminierung für vorbeugende Wartungen - if task.maintenance_type == MaintenanceType.PREVENTIVE and not task.scheduled_date: - task.scheduled_date = self._calculate_next_maintenance_date(task.printer_id) - - # Benachrichtigungen senden - self._send_task_notifications(task, "created") - - logger.info(f"Wartungsaufgabe erstellt: {task.title} für Drucker {task.printer_id}") - return task.id - - def update_task_status(self, task_id: int, new_status: MaintenanceStatus, notes: str = "") -> bool: - """Aktualisiert den Status einer Wartungsaufgabe""" - if task_id not in self.tasks: - return False - - task = self.tasks[task_id] - old_status = task.status - task.status = new_status - - # Zeitstempel setzen - if new_status == MaintenanceStatus.IN_PROGRESS: - task.started_at = datetime.now() - elif new_status == MaintenanceStatus.COMPLETED: - task.completed_at = datetime.now() - if task.started_at: - task.actual_duration = int((task.completed_at - task.started_at).total_seconds() / 60) - - # Zur Historie hinzufügen - self.maintenance_history.append(task) - - # Nächste Wartung planen - self._schedule_next_maintenance(task) - - if notes: - task.notes += f"\n{datetime.now().strftime('%d.%m.%Y %H:%M')}: {notes}" - - # Benachrichtigungen senden - if old_status != new_status: - self._send_task_notifications(task, "status_changed") - - logger.info(f"Wartungsaufgabe {task_id} Status: {old_status.value} → {new_status.value}") - return True - - def schedule_maintenance(self, printer_id: int, maintenance_type: MaintenanceType, - interval_days: int, description: str = "") -> MaintenanceSchedule: - """Plant regelmäßige Wartungen""" - schedule_item = MaintenanceSchedule( - printer_id=printer_id, - maintenance_type=maintenance_type, - interval_days=interval_days, - next_due=datetime.now() + timedelta(days=interval_days), - description=description - ) - - if printer_id not in self.schedules: - self.schedules[printer_id] = [] - - self.schedules[printer_id].append(schedule_item) - - logger.info(f"Wartungsplan erstellt: {maintenance_type.value} alle {interval_days} Tage für Drucker {printer_id}") - return schedule_item - - def get_upcoming_maintenance(self, days_ahead: int = 7) -> List[MaintenanceTask]: - """Holt anstehende Wartungen""" - cutoff_date = datetime.now() + timedelta(days=days_ahead) - - upcoming = [] - for task in self.tasks.values(): - if (task.status in [MaintenanceStatus.PLANNED, MaintenanceStatus.SCHEDULED] and - task.due_date and task.due_date <= cutoff_date): - upcoming.append(task) - - return sorted(upcoming, key=lambda t: t.due_date or datetime.max) - - def get_overdue_tasks(self) -> List[MaintenanceTask]: - """Holt überfällige Wartungen""" - now = datetime.now() - overdue = [] - - for task in self.tasks.values(): - if (task.status in [MaintenanceStatus.PLANNED, MaintenanceStatus.SCHEDULED] and - task.due_date and task.due_date < now): - task.status = MaintenanceStatus.OVERDUE - overdue.append(task) - - return overdue - - def get_maintenance_metrics(self, printer_id: Optional[int] = None, - start_date: Optional[datetime] = None, - end_date: Optional[datetime] = None) -> MaintenanceMetrics: - """Berechnet Wartungsmetriken""" - # Filter tasks - tasks = self.maintenance_history.copy() - if printer_id: - tasks = [t for t in tasks if t.printer_id == printer_id] - if start_date: - tasks = [t for t in tasks if t.completed_at and t.completed_at >= start_date] - if end_date: - tasks = [t for t in tasks if t.completed_at and t.completed_at <= end_date] - - if not tasks: - return MaintenanceMetrics() - - completed_tasks = [t for t in tasks if t.status == MaintenanceStatus.COMPLETED] - - # Grundmetriken - total_tasks = len(tasks) - completed_count = len(completed_tasks) - - # Durchschnittliche Bearbeitungszeit - completion_times = [t.actual_duration for t in completed_tasks if t.actual_duration] - avg_completion_time = sum(completion_times) / len(completion_times) if completion_times else 0 - - # Gesamtkosten - total_cost = sum(t.cost for t in completed_tasks if t.cost) - - # MTBF und MTTR berechnen - mtbf = self._calculate_mtbf(tasks, printer_id) - mttr = avg_completion_time / 60 # Konvertiere zu Stunden - - # Verfügbarkeit berechnen - uptime_percentage = self._calculate_uptime(printer_id, start_date, end_date) - - return MaintenanceMetrics( - total_tasks=total_tasks, - completed_tasks=completed_count, - overdue_tasks=len(self.get_overdue_tasks()), - average_completion_time=avg_completion_time, - total_cost=total_cost, - mtbf=mtbf, - mttr=mttr, - uptime_percentage=uptime_percentage - ) - - def create_maintenance_checklist(self, maintenance_type: MaintenanceType) -> List[Dict[str, Any]]: - """Erstellt eine Wartungs-Checkliste""" - checklists = { - MaintenanceType.PREVENTIVE: [ - {"task": "Drucker äußerlich reinigen", "completed": False, "required": True}, - {"task": "Druckbett-Level prüfen", "completed": False, "required": True}, - {"task": "Extruder-Düse reinigen", "completed": False, "required": True}, - {"task": "Riemen-Spannung prüfen", "completed": False, "required": True}, - {"task": "Filament-Führung prüfen", "completed": False, "required": False}, - {"task": "Software-Updates prüfen", "completed": False, "required": False}, - {"task": "Lüfter reinigen", "completed": False, "required": True}, - {"task": "Schrauben nachziehen", "completed": False, "required": False} - ], - MaintenanceType.CORRECTIVE: [ - {"task": "Problem-Diagnose durchführen", "completed": False, "required": True}, - {"task": "Defekte Teile identifizieren", "completed": False, "required": True}, - {"task": "Ersatzteile bestellen/bereitstellen", "completed": False, "required": True}, - {"task": "Reparatur durchführen", "completed": False, "required": True}, - {"task": "Funktionstest durchführen", "completed": False, "required": True}, - {"task": "Kalibrierung prüfen", "completed": False, "required": True} - ], - MaintenanceType.INSPECTION: [ - {"task": "Sichtprüfung der Mechanik", "completed": False, "required": True}, - {"task": "Druckqualität testen", "completed": False, "required": True}, - {"task": "Temperaturen prüfen", "completed": False, "required": True}, - {"task": "Bewegungen testen", "completed": False, "required": True}, - {"task": "Verschleiß bewerten", "completed": False, "required": True} - ] - } - - return checklists.get(maintenance_type, []) - - def _check_scheduled_maintenance(self): - """Prüft täglich auf fällige Wartungen""" - logger.info("Prüfe fällige Wartungen...") - - today = datetime.now() - - for printer_id, schedules in self.schedules.items(): - for schedule_item in schedules: - if not schedule_item.is_active: - continue - - if schedule_item.next_due <= today: - # Erstelle Wartungsaufgabe - task = MaintenanceTask( - printer_id=printer_id, - title=f"{schedule_item.maintenance_type.value.title()} Wartung", - description=schedule_item.description, - maintenance_type=schedule_item.maintenance_type, - priority=MaintenancePriority.NORMAL, - due_date=schedule_item.next_due, - checklist=self.create_maintenance_checklist(schedule_item.maintenance_type) - ) - - task_id = self.create_task(task) - - # Nächsten Termin berechnen - schedule_item.next_due = today + timedelta(days=schedule_item.interval_days) - - logger.info(f"Automatische Wartungsaufgabe erstellt: {task_id}") - - def _check_overdue_tasks(self): - """Prüft stündlich auf überfällige Aufgaben""" - overdue = self.get_overdue_tasks() - - if overdue: - logger.warning(f"{len(overdue)} überfällige Wartungsaufgaben gefunden") - - for task in overdue: - emit_system_alert( - f"Wartung überfällig: {task.title} (Drucker {task.printer_id})", - "warning", - "high" - ) - - def _generate_weekly_report(self): - """Generiert wöchentlichen Wartungsbericht""" - logger.info("Generiere wöchentlichen Wartungsbericht...") - - # Sammle Daten der letzten Woche - last_week = datetime.now() - timedelta(days=7) - metrics = self.get_maintenance_metrics(start_date=last_week) - - # Sende Report (Implementation abhängig von verfügbaren Services) - # send_maintenance_report(metrics) - - def _calculate_next_maintenance_date(self, printer_id: int) -> datetime: - """Berechnet nächstes Wartungsdatum basierend auf Nutzung""" - # Vereinfachte Implementierung - kann erweitert werden - base_interval = 30 # Tage - - # Hier könnte man Nutzungsstatistiken einbeziehen - with get_db_session() as db_session: - printer = db_session.query(Printer).filter(Printer.id == printer_id).first() - if printer: - # Berücksichtige letzten Check - if printer.last_checked: - days_since_check = (datetime.now() - printer.last_checked).days - if days_since_check < 15: # Kürzlich gecheckt - base_interval += 15 - - return datetime.now() + timedelta(days=base_interval) - - def _schedule_next_maintenance(self, completed_task: MaintenanceTask): - """Plant nächste Wartung nach Abschluss einer Aufgabe""" - if completed_task.maintenance_type == MaintenanceType.PREVENTIVE: - # Finde entsprechenden Schedule - printer_schedules = self.schedules.get(completed_task.printer_id, []) - for schedule_item in printer_schedules: - if schedule_item.maintenance_type == completed_task.maintenance_type: - schedule_item.last_completed = completed_task.completed_at - schedule_item.next_due = datetime.now() + timedelta(days=schedule_item.interval_days) - break - - def _calculate_mtbf(self, tasks: List[MaintenanceTask], printer_id: Optional[int]) -> float: - """Berechnet Mean Time Between Failures""" - # Vereinfachte MTBF-Berechnung - failure_tasks = [t for t in tasks if t.maintenance_type == MaintenanceType.CORRECTIVE] - - if len(failure_tasks) < 2: - return 0.0 - - # Zeitspanne zwischen ersten und letzten Ausfall - first_failure = min(failure_tasks, key=lambda t: t.created_at) - last_failure = max(failure_tasks, key=lambda t: t.created_at) - - total_time = (last_failure.created_at - first_failure.created_at).total_seconds() / 3600 # Stunden - failure_count = len(failure_tasks) - 1 - - return total_time / failure_count if failure_count > 0 else 0.0 - - def _calculate_uptime(self, printer_id: Optional[int], start_date: Optional[datetime], - end_date: Optional[datetime]) -> float: - """Berechnet Verfügbarkeit in Prozent""" - # Vereinfachte Uptime-Berechnung - if not start_date: - start_date = datetime.now() - timedelta(days=30) - if not end_date: - end_date = datetime.now() - - total_time = (end_date - start_date).total_seconds() - - # Berechne Downtime aus Wartungszeiten - downtime = 0 - for task in self.maintenance_history: - if printer_id and task.printer_id != printer_id: - continue - - if (task.status == MaintenanceStatus.COMPLETED and - task.started_at and task.completed_at and - task.started_at >= start_date and task.completed_at <= end_date): - downtime += (task.completed_at - task.started_at).total_seconds() - - uptime = ((total_time - downtime) / total_time) * 100 if total_time > 0 else 0 - return max(0, min(100, uptime)) - - def _send_task_notifications(self, task: MaintenanceTask, event_type: str): - """Sendet Benachrichtigungen für Wartungsaufgaben""" - try: - if event_type == "created": - emit_system_alert( - f"Neue Wartungsaufgabe: {task.title} (Drucker {task.printer_id})", - "info", - "normal" - ) - elif event_type == "status_changed": - emit_system_alert( - f"Wartungsstatus geändert: {task.title} → {task.status.value}", - "info", - "normal" - ) - except Exception as e: - logger.error(f"Fehler beim Senden der Wartungsbenachrichtigung: {str(e)}") - -# Globale Instanz -maintenance_manager = MaintenanceManager() - -def get_maintenance_dashboard_data() -> Dict[str, Any]: - """Holt Dashboard-Daten für Wartungen""" - upcoming = maintenance_manager.get_upcoming_maintenance() - overdue = maintenance_manager.get_overdue_tasks() - metrics = maintenance_manager.get_maintenance_metrics() - - return { - 'upcoming_count': len(upcoming), - 'overdue_count': len(overdue), - 'upcoming_tasks': [asdict(task) for task in upcoming[:5]], - 'overdue_tasks': [asdict(task) for task in overdue], - 'metrics': asdict(metrics), - 'next_scheduled': upcoming[0] if upcoming else None - } - -def create_emergency_maintenance(printer_id: int, description: str, - priority: MaintenancePriority = MaintenancePriority.CRITICAL) -> int: - """Erstellt eine Notfall-Wartung""" - task = MaintenanceTask( - printer_id=printer_id, - title="Notfall-Wartung", - description=description, - maintenance_type=MaintenanceType.EMERGENCY, - priority=priority, - due_date=datetime.now(), # Sofort fällig - checklist=maintenance_manager.create_maintenance_checklist(MaintenanceType.CORRECTIVE) - ) - - return maintenance_manager.create_task(task) - -def schedule_preventive_maintenance(printer_id: int, interval_days: int = 30) -> MaintenanceSchedule: - """Plant vorbeugende Wartung""" - return maintenance_manager.schedule_maintenance( - printer_id=printer_id, - maintenance_type=MaintenanceType.PREVENTIVE, - interval_days=interval_days, - description="Regelmäßige vorbeugende Wartung" - ) - -# JavaScript für Wartungs-Frontend -def get_maintenance_javascript() -> str: - """JavaScript für Wartungsmanagement""" - return """ - class MaintenanceManager { - constructor() { - this.currentTasks = []; - this.selectedTask = null; - - this.init(); - } - - init() { - this.loadTasks(); - this.setupEventListeners(); - this.startAutoRefresh(); - } - - setupEventListeners() { - // Task status updates - document.addEventListener('click', (e) => { - if (e.target.matches('.maintenance-status-btn')) { - const taskId = e.target.dataset.taskId; - const newStatus = e.target.dataset.status; - this.updateTaskStatus(taskId, newStatus); - } - - if (e.target.matches('.maintenance-details-btn')) { - const taskId = e.target.dataset.taskId; - this.showTaskDetails(taskId); - } - }); - - // Create maintenance form - const createForm = document.getElementById('create-maintenance-form'); - createForm?.addEventListener('submit', (e) => { - e.preventDefault(); - this.createTask(new FormData(createForm)); - }); - } - - async loadTasks() { - try { - const response = await fetch('/api/maintenance/tasks'); - const data = await response.json(); - - if (data.success) { - this.currentTasks = data.tasks; - this.renderTasks(); - } - } catch (error) { - console.error('Fehler beim Laden der Wartungsaufgaben:', error); - } - } - - async updateTaskStatus(taskId, newStatus) { - try { - const response = await fetch(`/api/maintenance/tasks/${taskId}/status`, { - method: 'PUT', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ status: newStatus }) - }); - - const result = await response.json(); - - if (result.success) { - this.loadTasks(); // Refresh - this.showNotification('Wartungsstatus aktualisiert', 'success'); - } else { - this.showNotification('Fehler beim Aktualisieren', 'error'); - } - } catch (error) { - console.error('Status-Update fehlgeschlagen:', error); - } - } - - renderTasks() { - const container = document.getElementById('maintenance-tasks-container'); - if (!container) return; - - container.innerHTML = this.currentTasks.map(task => ` -
-
-

${task.title}

- ${task.priority} -
-
-

Drucker: ${task.printer_id}

-

Typ: ${task.maintenance_type}

-

Fällig: ${this.formatDate(task.due_date)}

-

Status: ${task.status}

-
-
- - - -
-
- `).join(''); - } - - showTaskDetails(taskId) { - const task = this.currentTasks.find(t => t.id == taskId); - if (!task) return; - - // Create modal with task details - const modal = document.createElement('div'); - modal.className = 'maintenance-modal'; - modal.innerHTML = ` - - `; - - document.body.appendChild(modal); - - // Close modal handlers - modal.querySelector('.close-modal').onclick = () => modal.remove(); - modal.onclick = (e) => { - if (e.target === modal) modal.remove(); - }; - } - - renderChecklist(checklist) { - return ` -
-

Checkliste:

- ${checklist.map((item, index) => ` - - `).join('')} -
- `; - } - - formatDate(dateString) { - if (!dateString) return 'Nicht gesetzt'; - const date = new Date(dateString); - return date.toLocaleDateString('de-DE') + ' ' + date.toLocaleTimeString('de-DE', {hour: '2-digit', minute: '2-digit'}); - } - - showNotification(message, type = 'info') { - const notification = document.createElement('div'); - notification.className = `notification notification-${type}`; - notification.textContent = message; - - document.body.appendChild(notification); - - setTimeout(() => { - notification.remove(); - }, 3000); - } - - startAutoRefresh() { - setInterval(() => { - this.loadTasks(); - }, 30000); // Refresh every 30 seconds - } - } - - // Initialize when DOM is ready - document.addEventListener('DOMContentLoaded', function() { - window.maintenanceManager = new MaintenanceManager(); - }); - """ - -def create_maintenance_task(printer_id: int, title: str, description: str = "", - maintenance_type: MaintenanceType = MaintenanceType.PREVENTIVE, - priority: MaintenancePriority = MaintenancePriority.NORMAL) -> int: - """ - Erstellt eine neue Wartungsaufgabe. - - Args: - printer_id: ID des Druckers - title: Titel der Wartungsaufgabe - description: Beschreibung der Aufgabe - maintenance_type: Art der Wartung - priority: Priorität der Aufgabe - - Returns: - int: ID der erstellten Aufgabe - """ - task = MaintenanceTask( - printer_id=printer_id, - title=title, - description=description, - maintenance_type=maintenance_type, - priority=priority, - checklist=maintenance_manager.create_maintenance_checklist(maintenance_type) - ) - - return maintenance_manager.create_task(task) - -def schedule_maintenance(printer_id: int, maintenance_type: MaintenanceType, - interval_days: int, description: str = "") -> MaintenanceSchedule: - """ - Plant regelmäßige Wartungen (Alias für maintenance_manager.schedule_maintenance). - - Args: - printer_id: ID des Druckers - maintenance_type: Art der Wartung - interval_days: Intervall in Tagen - description: Beschreibung - - Returns: - MaintenanceSchedule: Erstellter Wartungsplan - """ - return maintenance_manager.schedule_maintenance( - printer_id=printer_id, - maintenance_type=maintenance_type, - interval_days=interval_days, - description=description - ) - -def get_maintenance_overview() -> Dict[str, Any]: - """ - Holt eine Übersicht aller Wartungsaktivitäten. - - Returns: - Dict: Wartungsübersicht mit Statistiken und anstehenden Aufgaben - """ - upcoming = maintenance_manager.get_upcoming_maintenance() - overdue = maintenance_manager.get_overdue_tasks() - metrics = maintenance_manager.get_maintenance_metrics() - - # Aktive Tasks - active_tasks = [task for task in maintenance_manager.tasks.values() - if task.status == MaintenanceStatus.IN_PROGRESS] - - # Completed tasks in last 30 days - thirty_days_ago = datetime.now() - timedelta(days=30) - recent_completed = [task for task in maintenance_manager.maintenance_history - if task.completed_at and task.completed_at >= thirty_days_ago] - - return { - 'summary': { - 'total_tasks': len(maintenance_manager.tasks), - 'active_tasks': len(active_tasks), - 'upcoming_tasks': len(upcoming), - 'overdue_tasks': len(overdue), - 'completed_this_month': len(recent_completed) - }, - 'upcoming_tasks': [asdict(task) for task in upcoming[:10]], - 'overdue_tasks': [asdict(task) for task in overdue], - 'active_tasks': [asdict(task) for task in active_tasks], - 'recent_completed': [asdict(task) for task in recent_completed[:5]], - 'metrics': asdict(metrics), - 'schedules': { - printer_id: [asdict(schedule) for schedule in schedules] - for printer_id, schedules in maintenance_manager.schedules.items() - } - } - -def update_maintenance_status(task_id: int, new_status: MaintenanceStatus, - notes: str = "") -> bool: - """ - Aktualisiert den Status einer Wartungsaufgabe (Alias für maintenance_manager.update_task_status). - - Args: - task_id: ID der Wartungsaufgabe - new_status: Neuer Status - notes: Optionale Notizen - - Returns: - bool: True wenn erfolgreich aktualisiert - """ - return maintenance_manager.update_task_status(task_id, new_status, notes) \ No newline at end of file diff --git a/backend/utils/multi_location_system.py b/backend/utils/multi_location_system.py deleted file mode 100644 index 7d9e629fd..000000000 --- a/backend/utils/multi_location_system.py +++ /dev/null @@ -1,899 +0,0 @@ -""" -Multi-Standort-Unterstützungssystem für das MYP-System -====================================================== - -Dieses Modul stellt umfassende Multi-Location-Funktionalität bereit: -- Standort-Management und Hierarchien -- Standort-spezifische Konfigurationen -- Zentrale und dezentrale Verwaltung -- Standort-übergreifende Berichte -- Ressourcen-Sharing zwischen Standorten -- Benutzer-Standort-Zuweisungen -""" - -import json -import logging -from datetime import datetime, timedelta -from typing import Dict, List, Any, Optional, Tuple -from dataclasses import dataclass, asdict -from enum import Enum -import geocoder -import requests - -from utils.logging_config import get_logger -from models import User, Printer, Job, get_db_session - -logger = get_logger("multi_location") - -class LocationType(Enum): - """Arten von Standorten""" - HEADQUARTERS = "headquarters" # Hauptsitz - BRANCH = "branch" # Niederlassung - DEPARTMENT = "department" # Abteilung - FLOOR = "floor" # Stockwerk - ROOM = "room" # Raum - AREA = "area" # Bereich - -class AccessLevel(Enum): - """Zugriffslevel für Standorte""" - FULL = "full" # Vollzugriff - READ_WRITE = "read_write" # Lesen und Schreiben - READ_ONLY = "read_only" # Nur Lesen - NO_ACCESS = "no_access" # Kein Zugriff - -@dataclass -class LocationConfig: - """Standort-spezifische Konfiguration""" - timezone: str = "Europe/Berlin" - business_hours: Dict[str, str] = None - maintenance_window: Dict[str, str] = None - auto_approval_enabled: bool = False - max_job_duration: int = 480 # Minuten - contact_info: Dict[str, str] = None - notification_settings: Dict[str, Any] = None - -@dataclass -class Location: - """Standort-Definition""" - id: Optional[int] = None - name: str = "" - code: str = "" # Kurzer Code für den Standort - location_type: LocationType = LocationType.BRANCH - parent_id: Optional[int] = None - address: str = "" - city: str = "" - country: str = "" - postal_code: str = "" - latitude: Optional[float] = None - longitude: Optional[float] = None - description: str = "" - config: LocationConfig = None - is_active: bool = True - created_at: datetime = None - manager_id: Optional[int] = None - - def __post_init__(self): - if self.config is None: - self.config = LocationConfig() - if self.created_at is None: - self.created_at = datetime.now() - -@dataclass -class UserLocationAccess: - """Benutzer-Standort-Zugriff""" - user_id: int - location_id: int - access_level: AccessLevel - granted_by: int - granted_at: datetime - expires_at: Optional[datetime] = None - is_primary: bool = False - -class MultiLocationManager: - """Manager für Multi-Standort-Funktionalität""" - - def __init__(self): - self.locations: Dict[int, Location] = {} - self.user_access: Dict[int, List[UserLocationAccess]] = {} - self.next_location_id = 1 - - # Standard-Standort erstellen - self._create_default_location() - - def _create_default_location(self): - """Erstellt Standard-Standort falls keiner existiert""" - default_location = Location( - id=1, - name="Hauptstandort", - code="HQ", - location_type=LocationType.HEADQUARTERS, - address="Mercedes-Benz Platz", - city="Stuttgart", - country="Deutschland", - description="Hauptstandort des MYP-Systems" - ) - - self.locations[1] = default_location - self.next_location_id = 2 - - logger.info("Standard-Standort erstellt") - - def create_location(self, location: Location) -> int: - """Erstellt einen neuen Standort""" - location.id = self.next_location_id - self.next_location_id += 1 - - # Koordinaten automatisch ermitteln - if not location.latitude or not location.longitude: - self._geocode_location(location) - - self.locations[location.id] = location - - logger.info(f"Standort erstellt: {location.name} ({location.code})") - return location.id - - def update_location(self, location_id: int, updates: Dict[str, Any]) -> bool: - """Aktualisiert einen Standort""" - if location_id not in self.locations: - return False - - location = self.locations[location_id] - - for key, value in updates.items(): - if hasattr(location, key): - setattr(location, key, value) - - # Koordinaten neu ermitteln bei Adressänderung - if 'address' in updates or 'city' in updates: - self._geocode_location(location) - - logger.info(f"Standort aktualisiert: {location.name}") - return True - - def delete_location(self, location_id: int) -> bool: - """Löscht einen Standort (Soft Delete)""" - if location_id not in self.locations: - return False - - location = self.locations[location_id] - - # Prüfe ob Standort Kinder hat - children = self.get_child_locations(location_id) - if children: - logger.warning(f"Standort {location.name} kann nicht gelöscht werden: hat Unterstandorte") - return False - - # Prüfe auf aktive Ressourcen - if self._has_active_resources(location_id): - logger.warning(f"Standort {location.name} kann nicht gelöscht werden: hat aktive Ressourcen") - return False - - location.is_active = False - logger.info(f"Standort deaktiviert: {location.name}") - return True - - def get_location_hierarchy(self, location_id: Optional[int] = None) -> Dict[str, Any]: - """Holt Standort-Hierarchie""" - if location_id: - # Spezifische Hierarchie ab einem Standort - location = self.locations.get(location_id) - if not location: - return {} - - return self._build_hierarchy_node(location) - else: - # Komplette Hierarchie - root_locations = [loc for loc in self.locations.values() - if loc.parent_id is None and loc.is_active] - - return { - 'locations': [self._build_hierarchy_node(loc) for loc in root_locations] - } - - def _build_hierarchy_node(self, location: Location) -> Dict[str, Any]: - """Erstellt einen Hierarchie-Knoten""" - children = self.get_child_locations(location.id) - - return { - 'id': location.id, - 'name': location.name, - 'code': location.code, - 'type': location.location_type.value, - 'children': [self._build_hierarchy_node(child) for child in children], - 'resource_count': self._count_location_resources(location.id) - } - - def get_child_locations(self, parent_id: int) -> List[Location]: - """Holt alle Kinder-Standorte""" - return [loc for loc in self.locations.values() - if loc.parent_id == parent_id and loc.is_active] - - def get_location_path(self, location_id: int) -> List[Location]: - """Holt den Pfad vom Root zum Standort""" - path = [] - current_id = location_id - - while current_id: - location = self.locations.get(current_id) - if not location: - break - - path.insert(0, location) - current_id = location.parent_id - - return path - - def grant_location_access(self, user_id: int, location_id: int, - access_level: AccessLevel, granted_by: int, - expires_at: Optional[datetime] = None, - is_primary: bool = False) -> bool: - """Gewährt Benutzer-Zugriff auf einen Standort""" - if location_id not in self.locations: - return False - - access = UserLocationAccess( - user_id=user_id, - location_id=location_id, - access_level=access_level, - granted_by=granted_by, - granted_at=datetime.now(), - expires_at=expires_at, - is_primary=is_primary - ) - - if user_id not in self.user_access: - self.user_access[user_id] = [] - - # Entferne vorherigen Zugriff für diesen Standort - self.user_access[user_id] = [ - acc for acc in self.user_access[user_id] - if acc.location_id != location_id - ] - - # Setze anderen primary-Zugriff zurück falls nötig - if is_primary: - for access_item in self.user_access[user_id]: - access_item.is_primary = False - - self.user_access[user_id].append(access) - - logger.info(f"Standort-Zugriff gewährt: User {user_id} → Location {location_id} ({access_level.value})") - return True - - def revoke_location_access(self, user_id: int, location_id: int) -> bool: - """Entzieht Benutzer-Zugriff auf einen Standort""" - if user_id not in self.user_access: - return False - - original_count = len(self.user_access[user_id]) - self.user_access[user_id] = [ - acc for acc in self.user_access[user_id] - if acc.location_id != location_id - ] - - success = len(self.user_access[user_id]) < original_count - if success: - logger.info(f"Standort-Zugriff entzogen: User {user_id} → Location {location_id}") - - return success - - def get_user_locations(self, user_id: int, access_level: Optional[AccessLevel] = None) -> List[Location]: - """Holt alle Standorte eines Benutzers""" - if user_id not in self.user_access: - return [] - - accessible_locations = [] - now = datetime.now() - - for access in self.user_access[user_id]: - # Prüfe Ablaufzeit - if access.expires_at and access.expires_at < now: - continue - - # Prüfe Access Level - if access_level and access.access_level != access_level: - continue - - location = self.locations.get(access.location_id) - if location and location.is_active: - accessible_locations.append(location) - - return accessible_locations - - def get_user_primary_location(self, user_id: int) -> Optional[Location]: - """Holt den primären Standort eines Benutzers""" - if user_id not in self.user_access: - return None - - for access in self.user_access[user_id]: - if access.is_primary: - return self.locations.get(access.location_id) - - # Fallback: ersten verfügbaren Standort nehmen - user_locations = self.get_user_locations(user_id) - return user_locations[0] if user_locations else None - - def check_user_access(self, user_id: int, location_id: int, - required_level: AccessLevel = AccessLevel.READ_ONLY) -> bool: - """Prüft ob Benutzer Zugriff auf Standort hat""" - if user_id not in self.user_access: - return False - - access_levels = { - AccessLevel.NO_ACCESS: 0, - AccessLevel.READ_ONLY: 1, - AccessLevel.READ_WRITE: 2, - AccessLevel.FULL: 3 - } - - required_level_value = access_levels[required_level] - now = datetime.now() - - for access in self.user_access[user_id]: - if access.location_id != location_id: - continue - - # Prüfe Ablaufzeit - if access.expires_at and access.expires_at < now: - continue - - user_level_value = access_levels[access.access_level] - if user_level_value >= required_level_value: - return True - - return False - - def get_location_resources(self, location_id: int) -> Dict[str, Any]: - """Holt alle Ressourcen eines Standorts""" - if location_id not in self.locations: - return {} - - # Simuliere Datenbankabfrage für Drucker und Jobs - resources = { - 'printers': [], - 'active_jobs': [], - 'users': [], - 'pending_maintenance': 0 - } - - # In echter Implementierung würde hier die Datenbank abgefragt - with get_db_session() as db_session: - # Drucker des Standorts (vereinfacht - benötigt location_id in Printer-Model) - # printers = db_session.query(Printer).filter(Printer.location_id == location_id).all() - # resources['printers'] = [p.to_dict() for p in printers] - pass - - return resources - - def get_location_statistics(self, location_id: int, - start_date: Optional[datetime] = None, - end_date: Optional[datetime] = None) -> Dict[str, Any]: - """Holt Statistiken für einen Standort""" - if not start_date: - start_date = datetime.now() - timedelta(days=30) - if not end_date: - end_date = datetime.now() - - # Sammle Statistiken - stats = { - 'location': self.locations.get(location_id, {}).name if location_id in self.locations else 'Unbekannt', - 'period': { - 'start': start_date.isoformat(), - 'end': end_date.isoformat() - }, - 'totals': { - 'printers': 0, - 'jobs_completed': 0, - 'jobs_failed': 0, - 'print_time_hours': 0, - 'material_used_kg': 0, - 'users_active': 0 - }, - 'averages': { - 'jobs_per_day': 0, - 'job_duration_minutes': 0, - 'printer_utilization': 0 - }, - 'trends': { - 'daily_jobs': [], - 'printer_usage': [] - } - } - - # In echter Implementierung würden hier Datenbankabfragen stehen - - return stats - - def get_multi_location_report(self, location_ids: List[int] = None) -> Dict[str, Any]: - """Erstellt standortübergreifenden Bericht""" - if not location_ids: - location_ids = list(self.locations.keys()) - - report = { - 'generated_at': datetime.now().isoformat(), - 'locations': [], - 'summary': { - 'total_locations': len(location_ids), - 'total_printers': 0, - 'total_users': 0, - 'total_jobs': 0, - 'cross_location_sharing': [] - } - } - - for location_id in location_ids: - location = self.locations.get(location_id) - if not location: - continue - - location_stats = self.get_location_statistics(location_id) - location_data = { - 'id': location.id, - 'name': location.name, - 'code': location.code, - 'type': location.location_type.value, - 'statistics': location_stats - } - - report['locations'].append(location_data) - - # Summiere für Gesamtübersicht - totals = location_stats.get('totals', {}) - report['summary']['total_printers'] += totals.get('printers', 0) - report['summary']['total_users'] += totals.get('users_active', 0) - report['summary']['total_jobs'] += totals.get('jobs_completed', 0) - - return report - - def find_nearest_locations(self, latitude: float, longitude: float, - radius_km: float = 50, limit: int = 5) -> List[Tuple[Location, float]]: - """Findet nächstgelegene Standorte""" - from math import radians, sin, cos, sqrt, atan2 - - def calculate_distance(lat1, lon1, lat2, lon2): - """Berechnet Entfernung zwischen zwei Koordinaten (Haversine)""" - R = 6371 # Erdradius in km - - lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2]) - dlat = lat2 - lat1 - dlon = lon2 - lon1 - - a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2 - c = 2 * atan2(sqrt(a), sqrt(1-a)) - - return R * c - - nearby_locations = [] - - for location in self.locations.values(): - if not location.is_active or not location.latitude or not location.longitude: - continue - - distance = calculate_distance( - latitude, longitude, - location.latitude, location.longitude - ) - - if distance <= radius_km: - nearby_locations.append((location, distance)) - - # Sortiere nach Entfernung - nearby_locations.sort(key=lambda x: x[1]) - - return nearby_locations[:limit] - - def _geocode_location(self, location: Location): - """Ermittelt Koordinaten für einen Standort""" - try: - address_parts = [location.address, location.city, location.country] - full_address = ', '.join(filter(None, address_parts)) - - if not full_address: - return - - # Verwende geocoder library - result = geocoder.osm(full_address) - - if result.ok: - location.latitude = result.lat - location.longitude = result.lng - logger.info(f"Koordinaten ermittelt für {location.name}: {location.latitude}, {location.longitude}") - else: - logger.warning(f"Koordinaten konnten nicht ermittelt werden für {location.name}") - - except Exception as e: - logger.error(f"Fehler bei Geocoding für {location.name}: {str(e)}") - - def _has_active_resources(self, location_id: int) -> bool: - """Prüft ob Standort aktive Ressourcen hat""" - # Vereinfachte Implementierung - # In echter Implementation würde hier die Datenbank geprüft - return False - - def _count_location_resources(self, location_id: int) -> Dict[str, int]: - """Zählt Ressourcen eines Standorts""" - # Vereinfachte Implementierung - return { - 'printers': 0, - 'users': 0, - 'jobs': 0 - } - -# Globale Instanz -location_manager = MultiLocationManager() - -# Alias für Import-Kompatibilität -LocationManager = MultiLocationManager - -def create_location(name: str, code: str, location_type: LocationType = LocationType.BRANCH, - address: str = "", city: str = "", country: str = "", - parent_id: Optional[int] = None) -> int: - """ - Erstellt einen neuen Standort (globale Funktion). - - Args: - name: Name des Standorts - code: Kurzer Code für den Standort - location_type: Art des Standorts - address: Adresse - city: Stadt - country: Land - parent_id: Parent-Standort ID - - Returns: - int: ID des erstellten Standorts - """ - location = Location( - name=name, - code=code, - location_type=location_type, - address=address, - city=city, - country=country, - parent_id=parent_id - ) - - return location_manager.create_location(location) - -def assign_user_to_location(user_id: int, location_id: int, - access_level: AccessLevel = AccessLevel.READ_WRITE, - granted_by: int = 1, is_primary: bool = False) -> bool: - """ - Weist einen Benutzer einem Standort zu. - - Args: - user_id: ID des Benutzers - location_id: ID des Standorts - access_level: Zugriffslevel - granted_by: ID des gewährenden Benutzers - is_primary: Ob dies der primäre Standort ist - - Returns: - bool: True wenn erfolgreich - """ - return location_manager.grant_location_access( - user_id=user_id, - location_id=location_id, - access_level=access_level, - granted_by=granted_by, - is_primary=is_primary - ) - -def get_user_locations(user_id: int) -> List[Location]: - """ - Holt alle Standorte eines Benutzers (globale Funktion). - - Args: - user_id: ID des Benutzers - - Returns: - List[Location]: Liste der zugänglichen Standorte - """ - return location_manager.get_user_locations(user_id) - -def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float: - """ - Berechnet die Entfernung zwischen zwei Koordinaten (Haversine-Formel). - - Args: - lat1, lon1: Koordinaten des ersten Punkts - lat2, lon2: Koordinaten des zweiten Punkts - - Returns: - float: Entfernung in Kilometern - """ - from math import radians, sin, cos, sqrt, atan2 - - R = 6371 # Erdradius in km - - lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2]) - dlat = lat2 - lat1 - dlon = lon2 - lon1 - - a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2 - c = 2 * atan2(sqrt(a), sqrt(1-a)) - - return R * c - -def find_nearest_location(latitude: float, longitude: float, - radius_km: float = 50) -> Optional[Location]: - """ - Findet den nächstgelegenen Standort. - - Args: - latitude: Breitengrad - longitude: Längengrad - radius_km: Suchradius in Kilometern - - Returns: - Optional[Location]: Nächstgelegener Standort oder None - """ - nearest_locations = location_manager.find_nearest_locations( - latitude=latitude, - longitude=longitude, - radius_km=radius_km, - limit=1 - ) - - return nearest_locations[0][0] if nearest_locations else None - -def get_location_dashboard_data(user_id: int) -> Dict[str, Any]: - """Holt Dashboard-Daten für Standorte eines Benutzers""" - user_locations = location_manager.get_user_locations(user_id) - primary_location = location_manager.get_user_primary_location(user_id) - - dashboard_data = { - 'user_locations': [asdict(loc) for loc in user_locations], - 'primary_location': asdict(primary_location) if primary_location else None, - 'location_count': len(user_locations), - 'hierarchy': location_manager.get_location_hierarchy() - } - - # Füge Statistiken für jeden Standort hinzu - for location in user_locations: - location_stats = location_manager.get_location_statistics(location.id) - dashboard_data[f'stats_{location.id}'] = location_stats - - return dashboard_data - -def create_location_from_address(name: str, address: str, city: str, - country: str, location_type: LocationType = LocationType.BRANCH) -> int: - """Erstellt Standort aus Adresse mit automatischer Geocodierung""" - location = Location( - name=name, - code=name[:3].upper(), - location_type=location_type, - address=address, - city=city, - country=country - ) - - return location_manager.create_location(location) - -# JavaScript für Multi-Location Frontend -def get_multi_location_javascript() -> str: - """JavaScript für Multi-Location Management""" - return """ - class MultiLocationManager { - constructor() { - this.currentLocation = null; - this.userLocations = []; - this.locationHierarchy = {}; - - this.init(); - } - - init() { - this.loadUserLocations(); - this.setupEventListeners(); - } - - setupEventListeners() { - // Location switcher - document.addEventListener('change', (e) => { - if (e.target.matches('.location-selector')) { - const locationId = parseInt(e.target.value); - this.switchLocation(locationId); - } - }); - - // Location management buttons - document.addEventListener('click', (e) => { - if (e.target.matches('.manage-locations-btn')) { - this.showLocationManager(); - } - - if (e.target.matches('.location-hierarchy-btn')) { - this.showLocationHierarchy(); - } - }); - } - - async loadUserLocations() { - try { - const response = await fetch('/api/locations/user'); - const data = await response.json(); - - if (data.success) { - this.userLocations = data.locations; - this.currentLocation = data.primary_location; - this.locationHierarchy = data.hierarchy; - - this.updateLocationSelector(); - this.updateLocationDisplay(); - } - } catch (error) { - console.error('Fehler beim Laden der Standorte:', error); - } - } - - updateLocationSelector() { - const selectors = document.querySelectorAll('.location-selector'); - - selectors.forEach(selector => { - selector.innerHTML = this.userLocations.map(location => - `` - ).join(''); - }); - } - - updateLocationDisplay() { - const displays = document.querySelectorAll('.current-location-display'); - - displays.forEach(display => { - if (this.currentLocation) { - display.innerHTML = ` -
- ${this.currentLocation.name} - ${this.currentLocation.type} - ${this.currentLocation.city ? `${this.currentLocation.city}` : ''} -
- `; - } else { - display.innerHTML = 'Kein Standort ausgewählt'; - } - }); - } - - async switchLocation(locationId) { - try { - const response = await fetch('/api/locations/switch', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ location_id: locationId }) - }); - - const result = await response.json(); - - if (result.success) { - this.currentLocation = this.userLocations.find(loc => loc.id === locationId); - this.updateLocationDisplay(); - - // Seite neu laden um location-spezifische Daten zu aktualisieren - window.location.reload(); - } else { - this.showNotification('Fehler beim Wechseln des Standorts', 'error'); - } - } catch (error) { - console.error('Standort-Wechsel fehlgeschlagen:', error); - } - } - - showLocationManager() { - const modal = document.createElement('div'); - modal.className = 'location-manager-modal'; - modal.innerHTML = ` - - `; - - document.body.appendChild(modal); - - // Event handlers - modal.querySelector('.close-modal').onclick = () => modal.remove(); - modal.onclick = (e) => { - if (e.target === modal) modal.remove(); - }; - } - - renderLocationList() { - return this.userLocations.map(location => ` -
-
-

${location.name} (${location.code})

-

Typ: ${location.type}

-

Adresse: ${location.address || 'Nicht angegeben'}

-

Stadt: ${location.city || 'Nicht angegeben'}

-
-
- - -
-
- `).join(''); - } - - showLocationHierarchy() { - const modal = document.createElement('div'); - modal.className = 'hierarchy-modal'; - modal.innerHTML = ` - - `; - - document.body.appendChild(modal); - - modal.querySelector('.close-modal').onclick = () => modal.remove(); - modal.onclick = (e) => { - if (e.target === modal) modal.remove(); - }; - } - - renderHierarchyTree(locations, level = 0) { - return locations.map(location => ` -
-
- ${this.getLocationTypeIcon(location.type)} - ${location.name} - (${location.code}) - ${location.resource_count.printers || 0} Drucker -
- ${location.children && location.children.length > 0 ? - this.renderHierarchyTree(location.children, level + 1) : ''} -
- `).join(''); - } - - getLocationTypeIcon(type) { - const icons = { - 'headquarters': '🏢', - 'branch': '🏪', - 'department': '🏬', - 'floor': '🏢', - 'room': '🚪', - 'area': '📍' - }; - return icons[type] || '📍'; - } - - showNotification(message, type = 'info') { - const notification = document.createElement('div'); - notification.className = `notification notification-${type}`; - notification.textContent = message; - - document.body.appendChild(notification); - - setTimeout(() => { - notification.remove(); - }, 3000); - } - } - - // Initialize when DOM is ready - document.addEventListener('DOMContentLoaded', function() { - window.multiLocationManager = new MultiLocationManager(); - }); - """ \ No newline at end of file diff --git a/backend/utils/offline_config.py b/backend/utils/offline_config.py deleted file mode 100644 index 2866f0c93..000000000 --- a/backend/utils/offline_config.py +++ /dev/null @@ -1,229 +0,0 @@ -""" -Offline-Konfiguration für MYP-System -=================================== - -Konfiguriert das System für den Offline-Betrieb ohne Internetverbindung. -Stellt Fallback-Lösungen für internetabhängige Funktionen bereit. -""" - -import os -import logging -from typing import Dict, List, Optional - -from utils.logging_config import get_logger - -logger = get_logger("offline_config") - -# ===== OFFLINE-MODUS KONFIGURATION ===== -OFFLINE_MODE = True # Produktionseinstellung - System läuft offline - -# ===== OFFLINE-KOMPATIBILITÄT PRÜFUNGEN ===== - -def check_internet_connectivity() -> bool: - """ - Prüft ob eine Internetverbindung verfügbar ist. - Im Offline-Modus gibt immer False zurück. - - Returns: - bool: True wenn Internet verfügbar, False im Offline-Modus - """ - if OFFLINE_MODE: - return False - - # In einem echten Online-Modus könnte hier eine echte Prüfung stehen - try: - import socket - socket.create_connection(("8.8.8.8", 53), timeout=3) - return True - except OSError: - return False - -def is_oauth_available() -> bool: - """ - Prüft ob OAuth-Funktionalität verfügbar ist. - - Returns: - bool: False im Offline-Modus - """ - return not OFFLINE_MODE and check_internet_connectivity() - -def is_email_sending_available() -> bool: - """ - Prüft ob E-Mail-Versand verfügbar ist. - - Returns: - bool: False im Offline-Modus (nur Logging) - """ - return not OFFLINE_MODE and check_internet_connectivity() - -def is_cdn_available() -> bool: - """ - Prüft ob CDN-Links verfügbar sind. - - Returns: - bool: False im Offline-Modus (lokale Fallbacks verwenden) - """ - return not OFFLINE_MODE and check_internet_connectivity() - -# ===== CDN FALLBACK-KONFIGURATION ===== - -CDN_FALLBACKS = { - # Chart.js CDN -> Lokale Datei - "https://cdnjs.cloudflare.com/ajax/libs/Chart.js/4.4.0/chart.min.js": "/static/js/charts/chart.min.js", - "https://cdn.jsdelivr.net/npm/chart.js": "/static/js/charts/chart.min.js", - - # FontAwesome (bereits lokal verfügbar) - "https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css": "/static/fontawesome/css/all.min.css", - - # Weitere CDN-Fallbacks können hier hinzugefügt werden -} - -def get_local_asset_path(cdn_url: str) -> Optional[str]: - """ - Gibt den lokalen Pfad für eine CDN-URL zurück. - - Args: - cdn_url: URL des CDN-Assets - - Returns: - str: Lokaler Pfad oder None wenn kein Fallback verfügbar - """ - return CDN_FALLBACKS.get(cdn_url) - -def replace_cdn_links(html_content: str) -> str: - """ - Ersetzt CDN-Links durch lokale Fallbacks im HTML-Inhalt. - - Args: - html_content: HTML-Inhalt mit CDN-Links - - Returns: - str: HTML-Inhalt mit lokalen Links - """ - if not OFFLINE_MODE: - return html_content - - modified_content = html_content - - for cdn_url, local_path in CDN_FALLBACKS.items(): - if cdn_url in modified_content: - modified_content = modified_content.replace(cdn_url, local_path) - logger.info(f"🔄 CDN-Link ersetzt: {cdn_url} -> {local_path}") - - return modified_content - -# ===== SECURITY POLICY ANPASSUNGEN ===== - -def get_offline_csp_policy() -> Dict[str, List[str]]: - """ - Gibt CSP-Policy für Offline-Modus zurück. - Entfernt externe CDN-Domains aus der Policy. - - Returns: - Dict: CSP-Policy ohne externe Domains - """ - if not OFFLINE_MODE: - # Online-Modus: Originale Policy mit CDNs - return { - "script-src": [ - "'self'", - "'unsafe-inline'", - "'unsafe-eval'", - "https://cdn.jsdelivr.net", - "https://unpkg.com", - "https://cdnjs.cloudflare.com" - ], - "style-src": [ - "'self'", - "'unsafe-inline'", - "https://fonts.googleapis.com", - "https://cdn.jsdelivr.net" - ], - "font-src": [ - "'self'", - "https://fonts.gstatic.com" - ] - } - else: - # Offline-Modus: Nur lokale Ressourcen - return { - "script-src": [ - "'self'", - "'unsafe-inline'", - "'unsafe-eval'" - ], - "style-src": [ - "'self'", - "'unsafe-inline'" - ], - "font-src": [ - "'self'" - ], - "img-src": [ - "'self'", - "data:" - ] - } - -# ===== OFFLINE-MODUS HILFSFUNKTIONEN ===== - -def log_offline_mode_status(): - """Loggt den aktuellen Offline-Modus Status.""" - if OFFLINE_MODE: - logger.info("🌐 System läuft im OFFLINE-MODUS") - logger.info(" ❌ OAuth deaktiviert") - logger.info(" ❌ E-Mail-Versand deaktiviert (nur Logging)") - logger.info(" ❌ CDN-Links werden durch lokale Dateien ersetzt") - logger.info(" ✅ Alle Kernfunktionen verfügbar") - else: - logger.info("🌐 System läuft im ONLINE-MODUS") - logger.info(" ✅ OAuth verfügbar") - logger.info(" ✅ E-Mail-Versand verfügbar") - logger.info(" ✅ CDN-Links verfügbar") - -def get_feature_availability() -> Dict[str, bool]: - """ - Gibt die Verfügbarkeit verschiedener Features zurück. - - Returns: - Dict: Feature-Verfügbarkeit - """ - return { - "oauth": is_oauth_available(), - "email_sending": is_email_sending_available(), - "cdn_resources": is_cdn_available(), - "offline_mode": OFFLINE_MODE, - "core_functionality": True, # Kernfunktionen immer verfügbar - "printer_control": True, # Drucker-Steuerung immer verfügbar - "job_management": True, # Job-Verwaltung immer verfügbar - "user_management": True # Benutzer-Verwaltung immer verfügbar - } - -# ===== STARTUP-FUNKTIONEN ===== - -def initialize_offline_mode(): - """Initialisiert den Offline-Modus beim System-Start.""" - log_offline_mode_status() - - if OFFLINE_MODE: - logger.info("🔧 Initialisiere Offline-Modus-Anpassungen...") - - # Prüfe ob lokale Chart.js verfügbar ist - chart_js_path = "static/js/charts/chart.min.js" - if not os.path.exists(chart_js_path): - logger.warning(f"⚠️ Lokale Chart.js nicht gefunden: {chart_js_path}") - logger.warning(" Diagramme könnten nicht funktionieren") - else: - logger.info(f"✅ Lokale Chart.js gefunden: {chart_js_path}") - - # Prüfe weitere lokale Assets - fontawesome_path = "static/fontawesome/css/all.min.css" - if not os.path.exists(fontawesome_path): - logger.warning(f"⚠️ Lokale FontAwesome nicht gefunden: {fontawesome_path}") - else: - logger.info(f"✅ Lokale FontAwesome gefunden: {fontawesome_path}") - - logger.info("✅ Offline-Modus erfolgreich initialisiert") - -# Beim Import automatisch initialisieren -initialize_offline_mode() \ No newline at end of file diff --git a/backend/utils/test_button_functionality.py b/backend/utils/test_button_functionality.py deleted file mode 100644 index 4b57d6750..000000000 --- a/backend/utils/test_button_functionality.py +++ /dev/null @@ -1,243 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -Test-Skript für Button-Funktionalitäten -Testet alle Buttons aus dem Selenium-Test auf echte Funktionalität -""" - -import requests -import time -import json -from selenium import webdriver -from selenium.webdriver.common.by import By -from selenium.webdriver.support.ui import WebDriverWait -from selenium.webdriver.support import expected_conditions as EC - -class ButtonFunctionalityTester: - def __init__(self, base_url="http://127.0.0.1:5000"): - self.base_url = base_url - self.session = requests.Session() - self.driver = None - - def setup_driver(self): - """Selenium WebDriver einrichten""" - try: - self.driver = webdriver.Chrome() - self.driver.set_window_size(1696, 1066) - print("✅ WebDriver erfolgreich initialisiert") - except Exception as e: - print(f"❌ Fehler beim Initialisieren des WebDrivers: {e}") - - def login(self, username="admin", password="admin"): - """Anmeldung durchführen""" - try: - self.driver.get(f"{self.base_url}/auth/login") - - # Warten bis Login-Formular geladen ist - username_field = WebDriverWait(self.driver, 10).until( - EC.presence_of_element_located((By.NAME, "username")) - ) - - username_field.send_keys(username) - self.driver.find_element(By.NAME, "password").send_keys(password) - self.driver.find_element(By.XPATH, "//button[@type='submit']").click() - - # Warten bis Dashboard geladen ist - WebDriverWait(self.driver, 10).until( - EC.url_contains("/dashboard") - ) - - print("✅ Erfolgreich angemeldet") - return True - except Exception as e: - print(f"❌ Fehler bei der Anmeldung: {e}") - return False - - def test_button_functionality(self, button_selector, button_name, expected_action=""): - """Teste einen einzelnen Button auf Funktionalität""" - try: - print(f"\n🔍 Teste Button: {button_name} ({button_selector})") - - # Button finden - button = WebDriverWait(self.driver, 5).until( - EC.element_to_be_clickable((By.CSS_SELECTOR, button_selector)) - ) - - # Ursprünglichen Zustand erfassen - original_url = self.driver.current_url - original_text = button.text if button.text else "Kein Text" - - print(f" 📍 Button gefunden: '{original_text}'") - - # Button klicken - button.click() - print(f" 👆 Button geklickt") - - # Kurz warten für Reaktion - time.sleep(1) - - # Reaktion prüfen - reactions = [] - - # URL-Änderung prüfen - if self.driver.current_url != original_url: - reactions.append(f"URL-Änderung: {self.driver.current_url}") - - # Modal-Fenster prüfen - try: - modal = self.driver.find_element(By.CSS_SELECTOR, ".fixed.inset-0") - if modal.is_displayed(): - reactions.append("Modal-Fenster geöffnet") - except: - pass - - # Loading-Spinner prüfen - try: - spinner = self.driver.find_element(By.CSS_SELECTOR, ".animate-spin") - if spinner.is_displayed(): - reactions.append("Loading-Animation aktiv") - except: - pass - - # Toast-Benachrichtigung prüfen - try: - toast = self.driver.find_element(By.CSS_SELECTOR, ".fixed.top-4.right-4") - if toast.is_displayed(): - reactions.append(f"Toast-Nachricht: {toast.text}") - except: - pass - - # Button-Text-Änderung prüfen - new_text = button.text if button.text else "Kein Text" - if new_text != original_text: - reactions.append(f"Text-Änderung: '{original_text}' → '{new_text}'") - - # Ergebnis ausgeben - if reactions: - print(f" ✅ Reaktionen gefunden:") - for reaction in reactions: - print(f" - {reaction}") - return True - else: - print(f" ⚠️ Keine sichtbare Reaktion erkannt") - return False - - except Exception as e: - print(f" ❌ Fehler beim Testen: {e}") - return False - - def test_all_buttons(self): - """Teste alle Buttons aus dem Selenium-Test""" - if not self.setup_driver(): - return - - if not self.login(): - return - - # Button-Test-Plan basierend auf Selenium-Test - button_tests = [ - # Dashboard-Seite (Startseite) - { - "page": f"{self.base_url}/", - "buttons": [ - (".mb-8 > .btn-primary", "Haupt-CTA Button"), - (".btn-primary > span", "CTA Button Span") - ] - }, - - # Dashboard-Seite - { - "page": f"{self.base_url}/dashboard", - "buttons": [ - ("#refreshDashboard > span", "Dashboard Aktualisieren") - ] - }, - - # Drucker-Seite - { - "page": f"{self.base_url}/printers", - "buttons": [ - ("#refresh-button > span", "Drucker Aktualisieren"), - ("#maintenance-toggle > span", "Wartungsmodus Toggle") - ] - }, - - # Jobs-Seite - { - "page": f"{self.base_url}/jobs", - "buttons": [ - ("#batch-toggle > span", "Mehrfachauswahl Toggle"), - ("#refresh-button > span", "Jobs Aktualisieren") - ] - }, - - # Admin-Seite - { - "page": f"{self.base_url}/admin", - "buttons": [ - ("#analytics-btn", "Analytics Button"), - ("#maintenance-btn", "Wartung Button"), - ("#system-status-btn", "System Status Button"), - ("#add-user-btn", "Benutzer hinzufügen") - ] - } - ] - - results = {"total": 0, "working": 0, "broken": 0} - - print("🚀 Starte umfassenden Button-Funktionalitäts-Test...\n") - - for test_group in button_tests: - print(f"📄 Navigiere zu Seite: {test_group['page']}") - - try: - self.driver.get(test_group["page"]) - time.sleep(2) # Seite laden lassen - - for selector, name in test_group["buttons"]: - results["total"] += 1 - if self.test_button_functionality(selector, name): - results["working"] += 1 - else: - results["broken"] += 1 - - except Exception as e: - print(f"❌ Fehler beim Laden der Seite {test_group['page']}: {e}") - - # Zusammenfassung - print(f"\n{'='*60}") - print(f"📊 TEST-ZUSAMMENFASSUNG") - print(f"{'='*60}") - print(f"Getestete Buttons gesamt: {results['total']}") - print(f"✅ Funktional: {results['working']}") - print(f"❌ Nicht funktional: {results['broken']}") - - success_rate = (results['working'] / results['total']) * 100 if results['total'] > 0 else 0 - print(f"📈 Erfolgsrate: {success_rate:.1f}%") - - if success_rate >= 90: - print("🎉 AUSGEZEICHNET! Fast alle Buttons funktionieren korrekt.") - elif success_rate >= 75: - print("✅ GUT! Die meisten Buttons funktionieren korrekt.") - elif success_rate >= 50: - print("⚠️ BEFRIEDIGEND! Einige Buttons benötigen noch Verbesserungen.") - else: - print("❌ VERBESSERUNG ERFORDERLICH! Viele Buttons haben keine Funktionalität.") - - def cleanup(self): - """Aufräumen""" - if self.driver: - self.driver.quit() - print("🧹 WebDriver beendet") - -def main(): - """Hauptfunktion""" - tester = ButtonFunctionalityTester() - - try: - tester.test_all_buttons() - finally: - tester.cleanup() - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/backend/utils/test_p110.py b/backend/utils/test_p110.py deleted file mode 100644 index 06fc27980..000000000 --- a/backend/utils/test_p110.py +++ /dev/null @@ -1,175 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -P110-TAPO-TEST - Speziell für TP-Link Tapo P110-Steckdosen -Testet verschiedene Versionen des PyP100-Moduls -""" - -import sys -import time -import socket -import subprocess -from datetime import datetime - -# Anmeldedaten -TAPO_USERNAME = "till.tomczak@mercedes-benz.com" -TAPO_PASSWORD = "744563017196" - -# Standard-IP-Adressen zum Testen (anpassen an tatsächliche IPs) -TEST_IPS = [ - "192.168.0.103", # Diese IPs waren erreichbar im vorherigen Test - "192.168.0.104" -] - -def log(message): - """Logge eine Nachricht mit Zeitstempel""" - timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - print(f"[{timestamp}] {message}") - -def check_connection(ip, port=80, timeout=1): - """Prüft eine TCP-Verbindung""" - try: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(timeout) - result = sock.connect_ex((ip, port)) - sock.close() - return result == 0 - except: - return False - -def install_package(package): - """Installiert ein Python-Paket""" - try: - log(f"Installiere {package}...") - subprocess.run([sys.executable, "-m", "pip", "install", package, "--force-reinstall"], check=True) - log(f"✅ {package} erfolgreich installiert") - return True - except Exception as e: - log(f"❌ Fehler bei Installation von {package}: {e}") - return False - -def test_p110_connection(): - """Testet verschiedene Möglichkeiten, um mit P110-Steckdosen zu kommunizieren""" - - log("🚀 TAPO P110 TEST - STARTER") - log(f"👤 Benutzername: {TAPO_USERNAME}") - log(f"🔑 Passwort: {TAPO_PASSWORD}") - - # Verfügbare Module testen - log("\n1️⃣ SCHRITT 1: Teste verfügbare Module") - - try: - from PyP100 import PyP110 - log("✅ PyP100 Modul gefunden") - except ImportError: - log("❌ PyP100 Modul nicht gefunden") - install_package("PyP100==0.1.2") - - try: - from PyP100 import PyP110 - log("✅ PyP100 Modul jetzt installiert") - except ImportError: - log("❌ Konnte PyP100 nicht importieren") - return - - # Erreichbare Steckdosen finden - log("\n2️⃣ SCHRITT 2: Suche erreichbare IPs") - - available_ips = [] - for ip in TEST_IPS: - if check_connection(ip): - log(f"✅ IP {ip} ist erreichbar") - available_ips.append(ip) - else: - log(f"❌ IP {ip} nicht erreichbar") - - if not available_ips: - log("❌ Keine erreichbaren IPs gefunden!") - return - - # P110-Verbindung testen - log("\n3️⃣ SCHRITT 3: Teste PyP100 Bibliothek") - - for ip in available_ips: - try: - log(f"🔄 Verbinde zu Steckdose {ip} mit PyP100.PyP110...") - - # Neue Instanz erstellen - from PyP100 import PyP110 - p110 = PyP110.P110(ip, TAPO_USERNAME, TAPO_PASSWORD) - - # Handshake und Login - log(" Handshake...") - p110.handshake() - log(" Login...") - p110.login() - - # Geräteinformationen abrufen - log(" Geräteinformationen abrufen...") - device_info = p110.getDeviceInfo() - - # Erfolg! - log(f"✅ ERFOLG! Steckdose {ip} gefunden") - log(f" Name: {device_info.get('nickname', 'Unbekannt')}") - log(f" Status: {'Eingeschaltet' if device_info.get('device_on', False) else 'Ausgeschaltet'}") - - # Ein-/Ausschalten testen - if "--toggle" in sys.argv: - current_state = device_info.get('device_on', False) - - if current_state: - log(" Schalte AUS...") - p110.turnOff() - else: - log(" Schalte EIN...") - p110.turnOn() - - time.sleep(1) - - # Status prüfen - device_info = p110.getDeviceInfo() - new_state = device_info.get('device_on', False) - log(f" Neuer Status: {'Eingeschaltet' if new_state else 'Ausgeschaltet'}") - - return True - - except Exception as e: - log(f"❌ Fehler bei Verbindung zu {ip}: {e}") - - # Alternative Bibliothek testen - log("\n4️⃣ SCHRITT 4: Teste PyP100 mit alternativer Version") - - if install_package("pytapo==1.1.2"): - try: - import pytapo - from pytapo.tapo import Tapo - - for ip in available_ips: - try: - log(f"🔄 Verbinde zu Steckdose {ip} mit pytapo...") - - # Neue Verbindung - tapo = Tapo(ip, TAPO_USERNAME, TAPO_PASSWORD) - - # Geräteinformationen abrufen - device_info = tapo.get_device_info() - - # Erfolg! - log(f"✅ ERFOLG mit pytapo! Steckdose {ip} gefunden") - log(f" Name: {device_info.get('nickname', 'Unbekannt')}") - - return True - - except Exception as e: - log(f"❌ Fehler bei pytapo-Verbindung zu {ip}: {e}") - except Exception as e: - log(f"❌ Fehler beim Import von pytapo: {e}") - - log("\n❌ Keine funktionierenden Tapo-Steckdosen gefunden!") - log("Bitte überprüfen Sie die Anmeldedaten und IP-Adressen") - -if __name__ == "__main__": - print("\n======= TAPO P110 TEST =======\n") - test_p110_connection() - print("\n======= TEST BEENDET =======\n") \ No newline at end of file diff --git a/backend/utils/test_tapo_direkt.py b/backend/utils/test_tapo_direkt.py deleted file mode 100644 index 2aadf1242..000000000 --- a/backend/utils/test_tapo_direkt.py +++ /dev/null @@ -1,212 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -DIREKT-TEST für TP-Link Tapo P110-Steckdosen -Umgeht Ping-Befehle und testet direkte TCP-Verbindung -""" - -import sys -import os -import socket -import time -from datetime import datetime - -# Anmeldedaten für Tapo-Steckdosen -TAPO_USERNAME = "till.tomczak@mercedes-benz.com" -TAPO_PASSWORD = "744563017196A" - -# Standard-IPs für Tapo-Steckdosen -# (falls nicht verfügbar, passen Sie diese an die tatsächlichen IPs in Ihrem Netzwerk an) -TAPO_IPS = [ - # Typische IP-Bereiche - "192.168.1.100", - "192.168.1.101", - "192.168.1.102", - "192.168.1.103", - "192.168.1.104", - "192.168.1.105", - "192.168.0.100", - "192.168.0.101", - "192.168.0.102", - "192.168.0.103", - "192.168.0.104", - "192.168.0.105", - - # Mercedes-Benz Netzwerk spezifisch - "10.0.0.100", - "10.0.0.101", - "10.0.0.102", - "10.0.0.103", - "10.0.0.104", - "10.0.0.105", - - # Zusätzliche mögliche IPs - "192.168.178.100", - "192.168.178.101", - "192.168.178.102", - "192.168.178.103", - "192.168.178.104", - "192.168.178.105", -] - -def log_message(message, level="INFO"): - """Logge eine Nachricht mit Zeitstempel""" - timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - print(f"[{timestamp}] [{level}] {message}") - -def check_tcp_connection(host, port=80, timeout=1): - """ - Prüft ob eine TCP-Verbindung zu einem Host und Port möglich ist. - Vermeidet Ping und charmap-Probleme. - - Args: - host: Hostname oder IP-Adresse - port: TCP-Port (Standard: 80) - timeout: Timeout in Sekunden - - Returns: - bool: True wenn Verbindung erfolgreich - """ - try: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(timeout) - result = sock.connect_ex((host, port)) - sock.close() - return result == 0 - except: - return False - -def test_tapo_connection(): - """ - Testet die Verbindung zu TP-Link Tapo P110-Steckdosen. - """ - log_message("🔄 Überprüfe ob PyP100-Modul installiert ist...") - - try: - from PyP100 import PyP110 - log_message("✅ PyP100-Modul erfolgreich importiert") - except ImportError: - log_message("❌ PyP100-Modul nicht installiert", "ERROR") - log_message(" Installiere jetzt mit: pip install PyP100==0.1.2") - - try: - import subprocess - subprocess.run([sys.executable, "-m", "pip", "install", "PyP100==0.1.2"], check=True) - log_message("✅ PyP100-Modul erfolgreich installiert") - - # Erneut importieren - from PyP100 import PyP110 - except Exception as e: - log_message(f"❌ Fehler bei Installation von PyP100: {str(e)}", "ERROR") - log_message(" Bitte installieren Sie manuell: pip install PyP100==0.1.2") - return - - log_message("🔍 Starte Test für Tapo-Steckdosen...") - log_message(f"🔐 Anmeldedaten: {TAPO_USERNAME} / {TAPO_PASSWORD}") - - successful_connections = 0 - found_ips = [] - - for ip in TAPO_IPS: - log_message(f"🔄 Teste IP-Adresse: {ip}") - - # TCP-Verbindungstest für Grundkonnektivität (Port 80 für HTTP) - conn_success = check_tcp_connection(ip, port=80) - - if not conn_success: - # Alternativ Port 443 testen für HTTPS - conn_success = check_tcp_connection(ip, port=443) - - if not conn_success: - log_message(f" ❌ IP {ip} nicht erreichbar (Verbindung fehlgeschlagen)") - continue - - log_message(f" ✅ IP {ip} ist erreichbar (TCP-Verbindung erfolgreich)") - - # Tapo-Verbindung testen - try: - log_message(f" 🔄 Verbinde zu Tapo-Steckdose {ip}...") - p110 = PyP110.P110(ip, TAPO_USERNAME, TAPO_PASSWORD) - p110.handshake() # Authentifizierung - p110.login() # Login - - # Geräteinformationen abrufen - device_info = p110.getDeviceInfo() - - # Status abrufen - is_on = device_info.get('device_on', False) - nickname = device_info.get('nickname', "Unbekannt") - - log_message(f" ✅ Verbindung zu Tapo-Steckdose '{nickname}' ({ip}) erfolgreich") - log_message(f" 📱 Gerätename: {nickname}") - log_message(f" ⚡ Status: {'Eingeschaltet' if is_on else 'Ausgeschaltet'}") - - if 'on_time' in device_info: - on_time = device_info.get('on_time', 0) - hours, minutes = divmod(on_time // 60, 60) - log_message(f" ⏱️ Betriebszeit: {hours}h {minutes}m") - - successful_connections += 1 - found_ips.append(ip) - - # Steckdose testen: EIN/AUS - if len(sys.argv) > 1 and sys.argv[1] == '--toggle': - if is_on: - log_message(f" 🔄 Schalte Steckdose {nickname} AUS...") - p110.turnOff() - log_message(f" ✅ Steckdose ausgeschaltet") - else: - log_message(f" 🔄 Schalte Steckdose {nickname} EIN...") - p110.turnOn() - log_message(f" ✅ Steckdose eingeschaltet") - - # Kurze Pause - time.sleep(1) - - # Status erneut abrufen - device_info = p110.getDeviceInfo() - is_on = device_info.get('device_on', False) - log_message(f" ⚡ Neuer Status: {'Eingeschaltet' if is_on else 'Ausgeschaltet'}") - - except Exception as e: - log_message(f" ❌ Verbindung zu Tapo-Steckdose {ip} fehlgeschlagen: {str(e)}", "ERROR") - - # Zusammenfassung - log_message("\n📊 Zusammenfassung:") - log_message(f" Getestete IPs: {len(TAPO_IPS)}") - log_message(f" Gefundene Tapo-Steckdosen: {successful_connections}") - - if successful_connections > 0: - log_message("✅ Tapo-Steckdosen erfolgreich gefunden und getestet!") - log_message(f"📝 Gefundene IPs: {found_ips}") - - # Ausgabe für Konfiguration - log_message("\n🔧 Konfiguration für settings.py:") - log_message(f""" -# TP-Link Tapo Standard-Anmeldedaten -TAPO_USERNAME = "{TAPO_USERNAME}" -TAPO_PASSWORD = "{TAPO_PASSWORD}" - -# Automatische Steckdosen-Erkennung aktivieren -TAPO_AUTO_DISCOVERY = True - -# Standard-Steckdosen-IPs -DEFAULT_TAPO_IPS = {found_ips} -""") - else: - log_message("❌ Keine Tapo-Steckdosen gefunden!", "ERROR") - log_message(" Bitte überprüfen Sie die IP-Adressen und Anmeldedaten") - - # Fehlerbehebungs-Tipps - log_message("\n🔧 Fehlerbehebungs-Tipps:") - log_message("1. Stellen Sie sicher, dass die Steckdosen mit dem WLAN verbunden sind") - log_message("2. Prüfen Sie die IP-Adressen in der Tapo-App oder im Router") - log_message("3. Stellen Sie sicher, dass die Anmeldedaten korrekt sind") - log_message("4. Prüfen Sie ob die Steckdosen über die Tapo-App erreichbar sind") - log_message("5. Führen Sie einen Neustart der Steckdosen durch (aus- und wieder einstecken)") - -if __name__ == "__main__": - print("\n====== TAPO P110 DIREKT-TEST (OHNE PING) ======\n") - test_tapo_connection() - print("\n====== TEST ABGESCHLOSSEN ======\n") \ No newline at end of file diff --git a/backend/utils/test_tapo_sofort.py b/backend/utils/test_tapo_sofort.py deleted file mode 100644 index c8030a457..000000000 --- a/backend/utils/test_tapo_sofort.py +++ /dev/null @@ -1,132 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -SOFORT-TEST für TP-Link Tapo P110-Steckdosen -Nutzt direkt PyP100 mit hardkodierten Anmeldedaten -""" - -import os -import sys -import time -from datetime import datetime - -# TAPO Anmeldedaten direkt hardkodiert (wie in den funktionierenden Versionen) -os.environ["TAPO_USERNAME"] = "till.tomczak@mercedes-benz.com" -os.environ["TAPO_PASSWORD"] = "744563017196A" # Das 'A' am Ende ist wichtig - -# IPs der Steckdosen -TAPO_IPS = [ - "192.168.0.103", - "192.168.0.104", - "192.168.0.100", - "192.168.0.101", - "192.168.0.102" -] - -def log(msg): - """Protokolliert eine Nachricht mit Zeitstempel""" - timestamp = datetime.now().strftime("%H:%M:%S") - print(f"[{timestamp}] {msg}") - -def test_connection(): - """Teste Verbindung zu den Steckdosen""" - log("🔄 Teste PyP100-Import...") - - try: - from PyP100 import PyP100 - log("✅ PyP100-Modul erfolgreich importiert") - except ImportError: - log("❌ PyP100-Modul nicht gefunden. Installiere es...") - try: - import subprocess - subprocess.run([sys.executable, "-m", "pip", "install", "PyP100==0.0.12"], check=True) - from PyP100 import PyP100 - log("✅ PyP100-Modul installiert") - except Exception as e: - log(f"❌ Fehler bei Installation: {str(e)}") - return False - - # Anmeldedaten aus Umgebungsvariablen lesen - username = os.environ.get("TAPO_USERNAME") - password = os.environ.get("TAPO_PASSWORD") - - log(f"👤 Benutzername: {username}") - log(f"🔑 Passwort: {password}") - - # Teste jede IP - success = False - - for ip in TAPO_IPS: - log(f"🔄 Teste Steckdose mit IP: {ip}") - - try: - # Wichtig: Verwende PyP100 (nicht PyP110) wie in den funktionierenden Versionen - p100 = PyP100.P100(ip, username, password) - - # Handshake und Login - log(" 🔄 Handshake...") - p100.handshake() - - log(" 🔄 Login...") - p100.login() - - # Status abfragen - log(" 🔄 Status abfragen...") - device_info = p100.getDeviceInfo() - - # Erfolg! - state = "Eingeschaltet" if device_info.get("device_on", False) else "Ausgeschaltet" - log(f"✅ ERFOLG! Steckdose {ip} erfolgreich verbunden") - log(f" 📱 Name: {device_info.get('nickname', 'Unbekannt')}") - log(f" ⚡ Status: {state}") - - # Steckdose ein-/ausschalten wenn gewünscht - if "--toggle" in sys.argv: - if device_info.get("device_on", False): - log(" 🔄 Schalte Steckdose AUS...") - p100.turnOff() - else: - log(" 🔄 Schalte Steckdose EIN...") - p100.turnOn() - - time.sleep(1) - - # Neuen Status abrufen - device_info = p100.getDeviceInfo() - state = "Eingeschaltet" if device_info.get("device_on", False) else "Ausgeschaltet" - log(f" ⚡ Neuer Status: {state}") - - success = True - - # Konfiguration für settings.py ausgeben - log("\n✅ KONFIGURATION FÜR SETTINGS.PY:") - log(f""" -# TP-Link Tapo Standard-Anmeldedaten -TAPO_USERNAME = "{username}" -TAPO_PASSWORD = "{password}" - -# Standard-Steckdosen-IPs -DEFAULT_TAPO_IPS = ["{ip}"] -""") - - # Nur die erste erfolgreiche Steckdose testen - break - - except Exception as e: - log(f"❌ Fehler bei Steckdose {ip}: {str(e)}") - - if not success: - log("\n❌ Keine Tapo-Steckdose konnte verbunden werden!") - log("Prüfen Sie folgende mögliche Ursachen:") - log("1. Steckdosen sind nicht eingesteckt oder mit dem WLAN verbunden") - log("2. IP-Adressen sind falsch") - log("3. Anmeldedaten sind falsch (prüfen Sie das 'A' am Ende des Passworts)") - log("4. Netzwerkprobleme verhindern den Zugriff") - - return success - -if __name__ == "__main__": - print("\n====== TAPO P110 SOFORT-TEST ======\n") - test_connection() - print("\n====== TEST BEENDET ======\n") \ No newline at end of file diff --git a/backend/utils/update_requirements.py b/backend/utils/update_requirements.py deleted file mode 100644 index 84d50ec1d..000000000 --- a/backend/utils/update_requirements.py +++ /dev/null @@ -1,295 +0,0 @@ -#!/usr/bin/env python3 -""" -MYP Platform - Requirements Update Script -Aktualisiert die Requirements basierend auf tatsächlich verwendeten Imports -""" - -import os -import sys -import subprocess -import ast -import importlib.util -from pathlib import Path -from typing import Set, List, Dict - -def get_imports_from_file(file_path: Path) -> Set[str]: - """Extrahiert alle Import-Statements aus einer Python-Datei.""" - imports = set() - - try: - with open(file_path, 'r', encoding='utf-8') as f: - content = f.read() - - tree = ast.parse(content) - - for node in ast.walk(tree): - if isinstance(node, ast.Import): - for alias in node.names: - imports.add(alias.name.split('.')[0]) - elif isinstance(node, ast.ImportFrom): - if node.module: - imports.add(node.module.split('.')[0]) - - except Exception as e: - print(f"Fehler beim Parsen von {file_path}: {e}") - - return imports - -def get_all_imports(project_root: Path) -> Set[str]: - """Sammelt alle Imports aus dem Projekt.""" - all_imports = set() - - # Wichtige Dateien analysieren - important_files = [ - 'app.py', - 'models.py', - 'utils/rate_limiter.py', - 'utils/job_scheduler.py', - 'utils/queue_manager.py', - 'utils/ssl_manager.py', - 'utils/security.py', - 'utils/permissions.py', - 'utils/analytics.py', - 'utils/template_helpers.py', - 'utils/logging_config.py' - ] - - for file_path in important_files: - full_path = project_root / file_path - if full_path.exists(): - imports = get_imports_from_file(full_path) - all_imports.update(imports) - print(f"✓ Analysiert: {file_path} ({len(imports)} Imports)") - - return all_imports - -def get_package_mapping() -> Dict[str, str]: - """Mapping von Import-Namen zu PyPI-Paketnamen.""" - return { - 'flask': 'Flask', - 'flask_login': 'Flask-Login', - 'flask_wtf': 'Flask-WTF', - 'sqlalchemy': 'SQLAlchemy', - 'werkzeug': 'Werkzeug', - 'bcrypt': 'bcrypt', - 'cryptography': 'cryptography', - 'PyP100': 'PyP100', - 'redis': 'redis', - 'requests': 'requests', - 'jinja2': 'Jinja2', - 'markupsafe': 'MarkupSafe', - 'itsdangerous': 'itsdangerous', - 'psutil': 'psutil', - 'click': 'click', - 'blinker': 'blinker', - 'pywin32': 'pywin32', - 'pytest': 'pytest', - 'gunicorn': 'gunicorn' - } - -def get_current_versions() -> Dict[str, str]: - """Holt die aktuell installierten Versionen.""" - versions = {} - - try: - result = subprocess.run(['pip', 'list', '--format=freeze'], - capture_output=True, text=True, - encoding='utf-8', errors='replace') - - for line in result.stdout.strip().split('\n'): - if '==' in line: - package, version = line.split('==', 1) - versions[package.lower()] = version - - except Exception as e: - print(f"Fehler beim Abrufen der Versionen: {e}") - - return versions - -def check_package_availability(package: str, version: str = None) -> bool: - """Prüft, ob ein Paket in der angegebenen Version verfügbar ist.""" - try: - if version: - cmd = ['pip', 'index', 'versions', package] - else: - cmd = ['pip', 'show', package] - - result = subprocess.run(cmd, capture_output=True, text=True, - encoding='utf-8', errors='replace') - return result.returncode == 0 - - except Exception: - return False - -def generate_requirements(imports: Set[str], versions: Dict[str, str]) -> List[str]: - """Generiert die Requirements-Liste.""" - package_mapping = get_package_mapping() - requirements = [] - - # Standard-Bibliotheken, die nicht installiert werden müssen - stdlib_modules = { - 'os', 'sys', 'logging', 'atexit', 'datetime', 'time', 'subprocess', - 'json', 'signal', 'threading', 'functools', 'typing', 'contextlib', - 'secrets', 'hashlib', 'calendar', 'random', 'socket', 'ipaddress', - 'enum', 'dataclasses', 'concurrent', 'collections' - } - - # Lokale Module ausschließen - local_modules = { - 'models', 'utils', 'config', 'blueprints' - } - - # Nur externe Pakete berücksichtigen - external_imports = imports - stdlib_modules - local_modules - - for import_name in sorted(external_imports): - package_name = package_mapping.get(import_name, import_name) - - # Version aus installierten Paketen holen - version = versions.get(package_name.lower()) - - if version and check_package_availability(package_name, version): - if package_name == 'pywin32': - requirements.append(f"{package_name}=={version}; sys_platform == \"win32\"") - else: - requirements.append(f"{package_name}=={version}") - else: - print(f"⚠️ Paket {package_name} nicht gefunden oder Version unbekannt") - - return requirements - -def write_requirements_file(requirements: List[str], output_file: Path): - """Schreibt die Requirements in eine Datei.""" - header = """# MYP Platform - Python Dependencies -# Basierend auf tatsächlich verwendeten Imports in app.py -# Automatisch generiert am: {date} -# Installiere mit: pip install -r requirements.txt - -# ===== CORE FLASK FRAMEWORK ===== -# Direkt in app.py verwendet -{flask_requirements} - -# ===== DATENBANK ===== -# SQLAlchemy für Datenbankoperationen (models.py, app.py) -{db_requirements} - -# ===== SICHERHEIT UND AUTHENTIFIZIERUNG ===== -# Werkzeug für Passwort-Hashing und Utilities (app.py) -{security_requirements} - -# ===== SMART PLUG STEUERUNG ===== -# PyP100 für TP-Link Tapo Smart Plugs (utils/job_scheduler.py) -{smartplug_requirements} - -# ===== RATE LIMITING UND CACHING ===== -# Redis für Rate Limiting (utils/rate_limiter.py) - optional -{cache_requirements} - -# ===== HTTP REQUESTS ===== -# Requests für HTTP-Anfragen (utils/queue_manager.py, utils/debug_drucker_erkennung.py) -{http_requirements} - -# ===== TEMPLATE ENGINE ===== -# Jinja2 und MarkupSafe (automatisch mit Flask installiert, aber explizit für utils/template_helpers.py) -{template_requirements} - -# ===== SYSTEM MONITORING ===== -# psutil für System-Monitoring (utils/debug_utils.py, utils/debug_cli.py) -{monitoring_requirements} - -# ===== ZUSÄTZLICHE CORE ABHÄNGIGKEITEN ===== -# Click für CLI-Kommandos (automatisch mit Flask) -{core_requirements} - -# ===== WINDOWS-SPEZIFISCHE ABHÄNGIGKEITEN ===== -# Nur für Windows-Systeme erforderlich -{windows_requirements} - -# ===== OPTIONAL: ENTWICKLUNG UND TESTING ===== -# Nur für Entwicklungsumgebung -{dev_requirements} - -# ===== OPTIONAL: PRODUKTIONS-SERVER ===== -# Gunicorn für Produktionsumgebung -{prod_requirements} -""" - - # Requirements kategorisieren - flask_reqs = [r for r in requirements if any(x in r.lower() for x in ['flask'])] - db_reqs = [r for r in requirements if 'SQLAlchemy' in r] - security_reqs = [r for r in requirements if any(x in r for x in ['Werkzeug', 'bcrypt', 'cryptography'])] - smartplug_reqs = [r for r in requirements if 'PyP100' in r] - cache_reqs = [r for r in requirements if 'redis' in r] - http_reqs = [r for r in requirements if 'requests' in r] - template_reqs = [r for r in requirements if any(x in r for x in ['Jinja2', 'MarkupSafe', 'itsdangerous'])] - monitoring_reqs = [r for r in requirements if 'psutil' in r] - core_reqs = [r for r in requirements if any(x in r for x in ['click', 'blinker'])] - windows_reqs = [r for r in requirements if 'sys_platform' in r] - - from datetime import datetime - - content = header.format( - date=datetime.now().strftime('%Y-%m-%d %H:%M:%S'), - flask_requirements='\n'.join(flask_reqs) or '# Keine Flask-spezifischen Requirements', - db_requirements='\n'.join(db_reqs) or '# Keine Datenbank-Requirements', - security_requirements='\n'.join(security_reqs) or '# Keine Sicherheits-Requirements', - smartplug_requirements='\n'.join(smartplug_reqs) or '# Keine Smart Plug Requirements', - cache_requirements='\n'.join(cache_reqs) or '# Keine Cache-Requirements', - http_requirements='\n'.join(http_reqs) or '# Keine HTTP-Requirements', - template_requirements='\n'.join(template_reqs) or '# Keine Template-Requirements', - monitoring_requirements='\n'.join(monitoring_reqs) or '# Keine Monitoring-Requirements', - core_requirements='\n'.join(core_reqs) or '# Keine Core-Requirements', - windows_requirements='\n'.join(windows_reqs) or '# Keine Windows-Requirements', - dev_requirements='pytest==8.3.4; extra == "dev"\npytest-cov==6.0.0; extra == "dev"', - prod_requirements='gunicorn==23.0.0; extra == "prod"' - ) - - with open(output_file, 'w', encoding='utf-8') as f: - f.write(content) - -def main(): - """Hauptfunktion.""" - print("🔄 MYP Platform Requirements Update") - print("=" * 50) - - # Projekt-Root ermitteln - project_root = Path(__file__).parent - - print(f"📁 Projekt-Verzeichnis: {project_root}") - - # Imports sammeln - print("\n📋 Sammle Imports aus wichtigen Dateien...") - imports = get_all_imports(project_root) - - print(f"\n📦 Gefundene externe Imports: {len(imports)}") - for imp in sorted(imports): - print(f" - {imp}") - - # Aktuelle Versionen abrufen - print("\n🔍 Prüfe installierte Versionen...") - versions = get_current_versions() - - # Requirements generieren - print("\n⚙️ Generiere Requirements...") - requirements = generate_requirements(imports, versions) - - # Requirements-Datei schreiben - output_file = project_root / 'requirements.txt' - write_requirements_file(requirements, output_file) - - print(f"\n✅ Requirements aktualisiert: {output_file}") - print(f"📊 {len(requirements)} Pakete in requirements.txt") - - # Zusammenfassung - print("\n📋 Generierte Requirements:") - for req in requirements: - print(f" - {req}") - - print("\n🎉 Requirements-Update abgeschlossen!") - print("\nNächste Schritte:") - print("1. pip install -r requirements.txt") - print("2. Anwendung testen") - print("3. requirements-dev.txt und requirements-prod.txt bei Bedarf anpassen") - -if __name__ == "__main__": - main() \ No newline at end of file