diff --git a/backend/utils/add_test_printers.py b/backend/utils/add_test_printers.py
deleted file mode 100644
index d7232e25b..000000000
--- a/backend/utils/add_test_printers.py
+++ /dev/null
@@ -1,178 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-
-"""
-Skript zum Hinzufügen von Testdruckern zur Datenbank
-"""
-
-import sys
-import os
-from datetime import datetime
-
-# Füge das Anwendungsverzeichnis zum Python-Pfad hinzu
-sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
-
-from models import get_db_session, Printer
-
-def add_test_printers():
- """Fügt Testdrucker zur Datenbank hinzu"""
-
- test_printers = [
- {
- "name": "Prusa i3 MK3S+",
- "model": "Prusa i3 MK3S+",
- "location": "Labor A - Arbeitsplatz 1",
- "mac_address": "AA:BB:CC:DD:EE:01",
- "plug_ip": "192.168.1.101",
- "status": "available",
- "active": True
- },
- {
- "name": "Ender 3 V2",
- "model": "Creality Ender 3 V2",
- "location": "Labor A - Arbeitsplatz 2",
- "mac_address": "AA:BB:CC:DD:EE:02",
- "plug_ip": "192.168.1.102",
- "status": "available",
- "active": True
- },
- {
- "name": "Ultimaker S3",
- "model": "Ultimaker S3",
- "location": "Labor B - Arbeitsplatz 1",
- "mac_address": "AA:BB:CC:DD:EE:03",
- "plug_ip": "192.168.1.103",
- "status": "available",
- "active": True
- },
- {
- "name": "Bambu Lab X1 Carbon",
- "model": "Bambu Lab X1 Carbon",
- "location": "Labor B - Arbeitsplatz 2",
- "mac_address": "AA:BB:CC:DD:EE:04",
- "plug_ip": "192.168.1.104",
- "status": "available",
- "active": True
- },
- {
- "name": "Formlabs Form 3",
- "model": "Formlabs Form 3",
- "location": "Labor C - Harz-Bereich",
- "mac_address": "AA:BB:CC:DD:EE:05",
- "plug_ip": "192.168.1.105",
- "status": "offline",
- "active": False
- }
- ]
-
- db_session = get_db_session()
-
- try:
- added_count = 0
-
- for printer_data in test_printers:
- # Prüfen, ob Drucker bereits existiert
- existing = db_session.query(Printer).filter(
- Printer.name == printer_data["name"]
- ).first()
-
- if existing:
- print(f"⚠️ Drucker '{printer_data['name']}' existiert bereits - überspringe")
- continue
-
- # Neuen Drucker erstellen
- new_printer = Printer(
- name=printer_data["name"],
- model=printer_data["model"],
- location=printer_data["location"],
- mac_address=printer_data["mac_address"],
- plug_ip=printer_data["plug_ip"],
- status=printer_data["status"],
- active=printer_data["active"],
- created_at=datetime.now()
- )
-
- db_session.add(new_printer)
- added_count += 1
- print(f"✅ Drucker '{printer_data['name']}' hinzugefügt")
-
- if added_count > 0:
- db_session.commit()
- print(f"\n🎉 {added_count} Testdrucker erfolgreich zur Datenbank hinzugefügt!")
- else:
- print("\n📋 Alle Testdrucker existieren bereits in der Datenbank")
-
- # Zeige alle Drucker in der Datenbank
- all_printers = db_session.query(Printer).all()
- print(f"\n📊 Gesamt {len(all_printers)} Drucker in der Datenbank:")
- print("-" * 80)
- print(f"{'ID':<4} {'Name':<20} {'Modell':<20} {'Status':<12} {'Aktiv':<6}")
- print("-" * 80)
-
- for printer in all_printers:
- active_str = "✅" if printer.active else "❌"
- print(f"{printer.id:<4} {printer.name[:19]:<20} {(printer.model or 'Unbekannt')[:19]:<20} {printer.status:<12} {active_str:<6}")
-
- db_session.close()
-
- except Exception as e:
- db_session.rollback()
- db_session.close()
- print(f"❌ Fehler beim Hinzufügen der Testdrucker: {str(e)}")
- return False
-
- return True
-
-def remove_test_printers():
- """Entfernt alle Testdrucker aus der Datenbank"""
-
- test_printer_names = [
- "Prusa i3 MK3S+",
- "Ender 3 V2",
- "Ultimaker S3",
- "Bambu Lab X1 Carbon",
- "Formlabs Form 3"
- ]
-
- db_session = get_db_session()
-
- try:
- removed_count = 0
-
- for name in test_printer_names:
- printer = db_session.query(Printer).filter(Printer.name == name).first()
- if printer:
- db_session.delete(printer)
- removed_count += 1
- print(f"🗑️ Drucker '{name}' entfernt")
-
- if removed_count > 0:
- db_session.commit()
- print(f"\n🧹 {removed_count} Testdrucker erfolgreich entfernt!")
- else:
- print("\n📋 Keine Testdrucker zum Entfernen gefunden")
-
- db_session.close()
-
- except Exception as e:
- db_session.rollback()
- db_session.close()
- print(f"❌ Fehler beim Entfernen der Testdrucker: {str(e)}")
- return False
-
- return True
-
-if __name__ == "__main__":
- print("=== MYP Druckerverwaltung - Testdrucker-Verwaltung ===")
- print()
-
- if len(sys.argv) > 1 and sys.argv[1] == "--remove":
- print("Entferne Testdrucker...")
- remove_test_printers()
- else:
- print("Füge Testdrucker hinzu...")
- print("(Verwende --remove um Testdrucker zu entfernen)")
- print()
- add_test_printers()
-
- print("\nFertig! 🚀")
\ No newline at end of file
diff --git a/backend/utils/create_ssl_cert.py b/backend/utils/create_ssl_cert.py
deleted file mode 100644
index 4426ae4e9..000000000
--- a/backend/utils/create_ssl_cert.py
+++ /dev/null
@@ -1,95 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-"""
-SSL-Zertifikat-Generator für die MYP-Plattform
-Erstellt selbstsignierte SSL-Zertifikate für die lokale Entwicklung
-"""
-
-import os
-import datetime
-import sys
-
-# Überprüfen, ob die notwendigen Pakete installiert sind
-try:
- from cryptography import x509
- from cryptography.x509.oid import NameOID
- from cryptography.hazmat.primitives import hashes
- from cryptography.hazmat.primitives.asymmetric import rsa
- from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption
-except ImportError:
- print("Fehler: Paket 'cryptography' nicht gefunden.")
- print("Bitte installieren Sie es mit: pip install cryptography")
- sys.exit(1)
-
-def create_self_signed_cert(cert_path, key_path, hostname="localhost"):
- """
- Erstellt ein selbstsigniertes SSL-Zertifikat mit dem angegebenen Hostnamen.
-
- Args:
- cert_path: Pfad zur Zertifikatsdatei
- key_path: Pfad zur privaten Schlüsseldatei
- hostname: Hostname für das Zertifikat (Standard: localhost)
- """
- # Verzeichnis erstellen, falls es nicht existiert
- cert_dir = os.path.dirname(cert_path)
- if cert_dir and not os.path.exists(cert_dir):
- os.makedirs(cert_dir, exist_ok=True)
-
- # Privaten Schlüssel generieren
- private_key = rsa.generate_private_key(
- public_exponent=65537,
- key_size=2048,
- )
-
- # Schlüsseldatei schreiben
- with open(key_path, "wb") as key_file:
- key_file.write(private_key.private_bytes(
- encoding=Encoding.PEM,
- format=PrivateFormat.TraditionalOpenSSL,
- encryption_algorithm=NoEncryption()
- ))
-
- # Name für das Zertifikat erstellen
- subject = issuer = x509.Name([
- x509.NameAttribute(NameOID.COMMON_NAME, hostname),
- ])
-
- # Zertifikat erstellen
- cert = x509.CertificateBuilder().subject_name(
- subject
- ).issuer_name(
- issuer
- ).public_key(
- private_key.public_key()
- ).serial_number(
- x509.random_serial_number()
- ).not_valid_before(
- datetime.datetime.utcnow()
- ).not_valid_after(
- datetime.datetime.utcnow() + datetime.timedelta(days=365)
- ).add_extension(
- x509.SubjectAlternativeName([x509.DNSName(hostname)]),
- critical=False,
- ).sign(private_key, hashes.SHA256())
-
- # Zertifikatsdatei schreiben
- with open(cert_path, "wb") as cert_file:
- cert_file.write(cert.public_bytes(Encoding.PEM))
-
- print(f"Selbstsigniertes SSL-Zertifikat für '{hostname}' erstellt:")
- print(f"Zertifikat: {cert_path}")
- print(f"Schlüssel: {key_path}")
- print(f"Gültig für 1 Jahr.")
-
-if __name__ == "__main__":
- import argparse
-
- parser = argparse.ArgumentParser(description="Erstellt selbstsignierte SSL-Zertifikate für die lokale Entwicklung")
- parser.add_argument("-c", "--cert", default="/home/user/Projektarbeit-MYP/backend/app/certs/myp.crt", help="Pfad zur Zertifikatsdatei")
- parser.add_argument("-k", "--key", default="/home/user/Projektarbeit-MYP/backend/app/certs/myp.key", help="Pfad zur Schlüsseldatei")
- parser.add_argument("-n", "--hostname", default="localhost", help="Hostname für das Zertifikat")
-
- args = parser.parse_args()
-
- create_self_signed_cert(args.cert, args.key, args.hostname)
\ No newline at end of file
diff --git a/backend/utils/create_test_printers.py b/backend/utils/create_test_printers.py
deleted file mode 100644
index 759645601..000000000
--- a/backend/utils/create_test_printers.py
+++ /dev/null
@@ -1,106 +0,0 @@
-#!/usr/bin/env python3
-"""
-Script zum Erstellen von Test-Druckern für die MYP Plattform
-"""
-
-import sys
-import os
-sys.path.append('.')
-
-from models import *
-from datetime import datetime
-
-def create_test_printers():
- """Erstellt Test-Drucker in der Datenbank."""
-
- # Verbindung zur Datenbank
- db_session = get_db_session()
-
- # Test-Drucker Daten
- test_printers = [
- {
- 'name': 'Mercedes-Benz FDM Pro #01',
- 'model': 'Ultimaker S5 Pro',
- 'location': 'Werkhalle Sindelfingen',
- 'plug_ip': '192.168.10.101',
- 'status': 'available',
- 'active': True
- },
- {
- 'name': 'Mercedes-Benz FDM #02',
- 'model': 'Prusa MK3S+',
- 'location': 'Entwicklungszentrum Stuttgart',
- 'plug_ip': '192.168.10.102',
- 'status': 'printing',
- 'active': True
- },
- {
- 'name': 'Mercedes-Benz SLA #01',
- 'model': 'Formlabs Form 3+',
- 'location': 'Prototypenlabor',
- 'plug_ip': '192.168.10.103',
- 'status': 'available',
- 'active': True
- },
- {
- 'name': 'Mercedes-Benz Industrial #01',
- 'model': 'Stratasys F370',
- 'location': 'Industriehalle Bremen',
- 'plug_ip': '192.168.10.104',
- 'status': 'maintenance',
- 'active': False
- },
- {
- 'name': 'Mercedes-Benz Rapid #01',
- 'model': 'Bambu Lab X1 Carbon',
- 'location': 'Designabteilung',
- 'plug_ip': '192.168.10.105',
- 'status': 'offline',
- 'active': True
- },
- {
- 'name': 'Mercedes-Benz SLS #01',
- 'model': 'HP Jet Fusion 5200',
- 'location': 'Produktionszentrum Berlin',
- 'plug_ip': '192.168.10.106',
- 'status': 'available',
- 'active': True
- }
- ]
- try:
- created_count = 0
- for printer_data in test_printers:
- # Prüfen ob Drucker bereits existiert
- existing = db_session.query(Printer).filter_by(name=printer_data['name']).first()
- if not existing:
- printer = Printer(
- name=printer_data['name'],
- model=printer_data['model'],
- location=printer_data['location'],
- plug_ip=printer_data['plug_ip'],
- status=printer_data['status'],
- active=printer_data['active'],
- created_at=datetime.now()
- )
- db_session.add(printer)
- created_count += 1
- print(f"✅ Drucker '{printer_data['name']}' erstellt")
- else:
- print(f"ℹ️ Drucker '{printer_data['name']}' existiert bereits")
-
- db_session.commit()
-
- total_count = db_session.query(Printer).count()
- print(f"\n🎉 {created_count} neue Test-Drucker erstellt!")
- print(f"📊 Insgesamt {total_count} Drucker in der Datenbank.")
-
- except Exception as e:
- print(f"❌ Fehler beim Erstellen der Test-Drucker: {str(e)}")
- db_session.rollback()
- finally:
- db_session.close()
-
-if __name__ == "__main__":
- print("🚀 Erstelle Test-Drucker für MYP Plattform...")
- create_test_printers()
- print("✅ Fertig!")
\ No newline at end of file
diff --git a/backend/utils/email_notification.py b/backend/utils/email_notification.py
deleted file mode 100644
index fefab3504..000000000
--- a/backend/utils/email_notification.py
+++ /dev/null
@@ -1,175 +0,0 @@
-"""
-Offline-kompatible E-Mail-Benachrichtigung für MYP-System
-========================================================
-
-Da das System im Produktionsbetrieb offline läuft, werden alle E-Mail-Benachrichtigungen
-nur geloggt aber nicht tatsächlich versendet.
-"""
-
-import logging
-from datetime import datetime
-from typing import Optional, Dict, Any
-
-from utils.logging_config import get_logger
-
-logger = get_logger("email_notification")
-
-class OfflineEmailNotification:
- """
- Offline-E-Mail-Benachrichtigung die nur Logs erstellt.
- Simuliert E-Mail-Versand für Offline-Betrieb.
- """
-
- def __init__(self):
- self.enabled = False # Immer deaktiviert im Offline-Modus
- logger.info("📧 Offline-E-Mail-Benachrichtigung initialisiert (kein echter E-Mail-Versand)")
-
- def send_email(self, to: str, subject: str, body: str, **kwargs) -> bool:
- """
- Simuliert E-Mail-Versand durch Logging.
-
- Args:
- to: E-Mail-Empfänger
- subject: E-Mail-Betreff
- body: E-Mail-Inhalt
- **kwargs: Zusätzliche Parameter
-
- Returns:
- bool: Immer True (Simulation erfolgreich)
- """
- logger.info(f"📧 [OFFLINE-SIMULATION] E-Mail würde versendet werden:")
- logger.info(f" 📮 An: {to}")
- logger.info(f" 📋 Betreff: {subject}")
- logger.info(f" 📝 Inhalt: {body[:100]}{'...' if len(body) > 100 else ''}")
- logger.info(f" 🕒 Zeitpunkt: {datetime.now().strftime('%d.%m.%Y %H:%M:%S')}")
-
- if kwargs:
- logger.info(f" ⚙️ Zusätzliche Parameter: {kwargs}")
-
- return True
-
- def send_notification_email(self, recipient: str, notification_type: str,
- data: Dict[str, Any]) -> bool:
- """
- Sendet Benachrichtigungs-E-Mail (Offline-Simulation).
-
- Args:
- recipient: E-Mail-Empfänger
- notification_type: Art der Benachrichtigung
- data: Daten für die Benachrichtigung
-
- Returns:
- bool: Immer True (Simulation erfolgreich)
- """
- subject = f"MYP-Benachrichtigung: {notification_type}"
- body = f"Benachrichtigung vom MYP-System:\n\n{data}"
-
- return self.send_email(recipient, subject, body, notification_type=notification_type)
-
- def send_maintenance_notification(self, recipient: str, task_title: str,
- task_description: str) -> bool:
- """
- Sendet Wartungs-Benachrichtigung (Offline-Simulation).
-
- Args:
- recipient: E-Mail-Empfänger
- task_title: Titel der Wartungsaufgabe
- task_description: Beschreibung der Wartungsaufgabe
-
- Returns:
- bool: Immer True (Simulation erfolgreich)
- """
- subject = f"MYP-Wartungsaufgabe: {task_title}"
- body = f"""
-Neue Wartungsaufgabe im MYP-System:
-
-Titel: {task_title}
-Beschreibung: {task_description}
-Erstellt: {datetime.now().strftime('%d.%m.%Y %H:%M:%S')}
-
-Bitte loggen Sie sich in das MYP-System ein, um weitere Details zu sehen.
- """
-
- return self.send_email(recipient, subject, body, task_type="maintenance")
-
-# Globale Instanz für einfache Verwendung
-email_notifier = OfflineEmailNotification()
-
-def send_email_notification(recipient: str, subject: str, body: str, **kwargs) -> bool:
- """
- Haupt-Funktion für E-Mail-Versand (Offline-kompatibel).
-
- Args:
- recipient: E-Mail-Empfänger
- subject: E-Mail-Betreff
- body: E-Mail-Inhalt
- **kwargs: Zusätzliche Parameter
-
- Returns:
- bool: True wenn "erfolgreich" (geloggt)
- """
- return email_notifier.send_email(recipient, subject, body, **kwargs)
-
-def send_maintenance_email(recipient: str, task_title: str, task_description: str) -> bool:
- """
- Sendet Wartungs-E-Mail (Offline-kompatibel).
-
- Args:
- recipient: E-Mail-Empfänger
- task_title: Titel der Wartungsaufgabe
- task_description: Beschreibung der Wartungsaufgabe
-
- Returns:
- bool: True wenn "erfolgreich" (geloggt)
- """
- return email_notifier.send_maintenance_notification(recipient, task_title, task_description)
-
-def send_guest_approval_email(recipient: str, otp_code: str, expires_at: str) -> bool:
- """
- Sendet Gastauftrags-Genehmigung-E-Mail (Offline-kompatibel).
-
- Args:
- recipient: E-Mail-Empfänger
- otp_code: OTP-Code für den Gastauftrag
- expires_at: Ablaufzeit des OTP-Codes
-
- Returns:
- bool: True wenn "erfolgreich" (geloggt)
- """
- subject = "MYP-Gastauftrag genehmigt"
- body = f"""
-Ihr Gastauftrag wurde genehmigt!
-
-OTP-Code: {otp_code}
-Gültig bis: {expires_at}
-
-Bitte verwenden Sie diesen Code am MYP-Terminal, um Ihren Druckauftrag zu starten.
- """
-
- return email_notifier.send_email(recipient, subject, body,
- otp_code=otp_code, expires_at=expires_at)
-
-def send_guest_rejection_email(recipient: str, reason: str) -> bool:
- """
- Sendet Gastauftrags-Ablehnungs-E-Mail (Offline-kompatibel).
-
- Args:
- recipient: E-Mail-Empfänger
- reason: Grund für die Ablehnung
-
- Returns:
- bool: True wenn "erfolgreich" (geloggt)
- """
- subject = "MYP-Gastauftrag abgelehnt"
- body = f"""
-Ihr Gastauftrag wurde leider abgelehnt.
-
-Grund: {reason}
-
-Bei Fragen wenden Sie sich bitte an das MYP-Team.
- """
-
- return email_notifier.send_email(recipient, subject, body, rejection_reason=reason)
-
-# Für Backward-Kompatibilität
-send_notification = send_email_notification
\ No newline at end of file
diff --git a/backend/utils/maintenance_system.py b/backend/utils/maintenance_system.py
deleted file mode 100644
index e9bc752ab..000000000
--- a/backend/utils/maintenance_system.py
+++ /dev/null
@@ -1,790 +0,0 @@
-"""
-Wartungsplanungs- und Tracking-System für das MYP-System
-========================================================
-
-Dieses Modul stellt umfassende Wartungsfunktionalität bereit:
-- Geplante und ungeplante Wartungen
-- Wartungsintervalle und Erinnerungen
-- Wartungshistorie und Berichte
-- Automatische Wartungsprüfungen
-- Ersatzteil-Management
-- Techniker-Zuweisungen
-"""
-
-import asyncio
-import json
-import logging
-from datetime import datetime, timedelta
-from typing import Dict, List, Any, Optional, Callable
-from dataclasses import dataclass, asdict
-from enum import Enum
-import threading
-import schedule
-import time
-
-from utils.logging_config import get_logger
-from models import Printer, get_db_session
-from utils.email_notification import send_email_notification
-from utils.realtime_dashboard import emit_system_alert
-
-logger = get_logger("maintenance")
-
-class MaintenanceType(Enum):
- """Arten von Wartungen"""
- PREVENTIVE = "preventive" # Vorbeugende Wartung
- CORRECTIVE = "corrective" # Reparatur/Korrektur
- EMERGENCY = "emergency" # Notfall-Wartung
- SCHEDULED = "scheduled" # Geplante Wartung
- INSPECTION = "inspection" # Inspektion
-
-class MaintenanceStatus(Enum):
- """Status einer Wartung"""
- PLANNED = "planned" # Geplant
- SCHEDULED = "scheduled" # Terminiert
- IN_PROGRESS = "in_progress" # In Bearbeitung
- COMPLETED = "completed" # Abgeschlossen
- CANCELLED = "cancelled" # Abgebrochen
- OVERDUE = "overdue" # Überfällig
-
-class MaintenancePriority(Enum):
- """Priorität einer Wartung"""
- LOW = "low" # Niedrig
- NORMAL = "normal" # Normal
- HIGH = "high" # Hoch
- CRITICAL = "critical" # Kritisch
- EMERGENCY = "emergency" # Notfall
-
-@dataclass
-class MaintenanceTask:
- """Wartungsaufgabe"""
- id: Optional[int] = None
- printer_id: int = None
- title: str = ""
- description: str = ""
- maintenance_type: MaintenanceType = MaintenanceType.PREVENTIVE
- priority: MaintenancePriority = MaintenancePriority.NORMAL
- status: MaintenanceStatus = MaintenanceStatus.PLANNED
- scheduled_date: Optional[datetime] = None
- due_date: Optional[datetime] = None
- estimated_duration: int = 60 # Minuten
- actual_duration: Optional[int] = None
- assigned_technician: Optional[str] = None
- created_at: datetime = None
- started_at: Optional[datetime] = None
- completed_at: Optional[datetime] = None
- notes: str = ""
- required_parts: List[str] = None
- actual_parts_used: List[str] = None
- cost: Optional[float] = None
- checklist: List[Dict[str, Any]] = None
- photos: List[str] = None
- created_by: Optional[int] = None
-
-@dataclass
-class MaintenanceSchedule:
- """Wartungsplan"""
- printer_id: int
- maintenance_type: MaintenanceType
- interval_days: int
- next_due: datetime
- last_completed: Optional[datetime] = None
- is_active: bool = True
- description: str = ""
- checklist_template: List[str] = None
-
-@dataclass
-class MaintenanceMetrics:
- """Wartungsmetriken"""
- total_tasks: int = 0
- completed_tasks: int = 0
- overdue_tasks: int = 0
- average_completion_time: float = 0.0
- total_cost: float = 0.0
- mtbf: float = 0.0 # Mean Time Between Failures
- mttr: float = 0.0 # Mean Time To Repair
- uptime_percentage: float = 0.0
-
-class MaintenanceManager:
- """Manager für Wartungsplanung und -tracking"""
-
- def __init__(self):
- self.tasks: Dict[int, MaintenanceTask] = {}
- self.schedules: Dict[int, List[MaintenanceSchedule]] = {}
- self.maintenance_history: List[MaintenanceTask] = []
- self.next_task_id = 1
- self.is_running = False
-
- self._setup_scheduler()
-
- def _setup_scheduler(self):
- """Richtet automatische Wartungsplanung ein"""
- schedule.every().day.at("06:00").do(self._check_scheduled_maintenance)
- schedule.every().hour.do(self._check_overdue_tasks)
- schedule.every().monday.at("08:00").do(self._generate_weekly_report)
-
- # Scheduler in separatem Thread
- def run_scheduler():
- while self.is_running:
- schedule.run_pending()
- time.sleep(60) # Check every minute
-
- self.is_running = True
- scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
- scheduler_thread.start()
-
- logger.info("Wartungs-Scheduler gestartet")
-
- def create_task(self, task: MaintenanceTask) -> int:
- """Erstellt eine neue Wartungsaufgabe"""
- task.id = self.next_task_id
- self.next_task_id += 1
- task.created_at = datetime.now()
-
- self.tasks[task.id] = task
-
- # Automatische Terminierung für vorbeugende Wartungen
- if task.maintenance_type == MaintenanceType.PREVENTIVE and not task.scheduled_date:
- task.scheduled_date = self._calculate_next_maintenance_date(task.printer_id)
-
- # Benachrichtigungen senden
- self._send_task_notifications(task, "created")
-
- logger.info(f"Wartungsaufgabe erstellt: {task.title} für Drucker {task.printer_id}")
- return task.id
-
- def update_task_status(self, task_id: int, new_status: MaintenanceStatus, notes: str = "") -> bool:
- """Aktualisiert den Status einer Wartungsaufgabe"""
- if task_id not in self.tasks:
- return False
-
- task = self.tasks[task_id]
- old_status = task.status
- task.status = new_status
-
- # Zeitstempel setzen
- if new_status == MaintenanceStatus.IN_PROGRESS:
- task.started_at = datetime.now()
- elif new_status == MaintenanceStatus.COMPLETED:
- task.completed_at = datetime.now()
- if task.started_at:
- task.actual_duration = int((task.completed_at - task.started_at).total_seconds() / 60)
-
- # Zur Historie hinzufügen
- self.maintenance_history.append(task)
-
- # Nächste Wartung planen
- self._schedule_next_maintenance(task)
-
- if notes:
- task.notes += f"\n{datetime.now().strftime('%d.%m.%Y %H:%M')}: {notes}"
-
- # Benachrichtigungen senden
- if old_status != new_status:
- self._send_task_notifications(task, "status_changed")
-
- logger.info(f"Wartungsaufgabe {task_id} Status: {old_status.value} → {new_status.value}")
- return True
-
- def schedule_maintenance(self, printer_id: int, maintenance_type: MaintenanceType,
- interval_days: int, description: str = "") -> MaintenanceSchedule:
- """Plant regelmäßige Wartungen"""
- schedule_item = MaintenanceSchedule(
- printer_id=printer_id,
- maintenance_type=maintenance_type,
- interval_days=interval_days,
- next_due=datetime.now() + timedelta(days=interval_days),
- description=description
- )
-
- if printer_id not in self.schedules:
- self.schedules[printer_id] = []
-
- self.schedules[printer_id].append(schedule_item)
-
- logger.info(f"Wartungsplan erstellt: {maintenance_type.value} alle {interval_days} Tage für Drucker {printer_id}")
- return schedule_item
-
- def get_upcoming_maintenance(self, days_ahead: int = 7) -> List[MaintenanceTask]:
- """Holt anstehende Wartungen"""
- cutoff_date = datetime.now() + timedelta(days=days_ahead)
-
- upcoming = []
- for task in self.tasks.values():
- if (task.status in [MaintenanceStatus.PLANNED, MaintenanceStatus.SCHEDULED] and
- task.due_date and task.due_date <= cutoff_date):
- upcoming.append(task)
-
- return sorted(upcoming, key=lambda t: t.due_date or datetime.max)
-
- def get_overdue_tasks(self) -> List[MaintenanceTask]:
- """Holt überfällige Wartungen"""
- now = datetime.now()
- overdue = []
-
- for task in self.tasks.values():
- if (task.status in [MaintenanceStatus.PLANNED, MaintenanceStatus.SCHEDULED] and
- task.due_date and task.due_date < now):
- task.status = MaintenanceStatus.OVERDUE
- overdue.append(task)
-
- return overdue
-
- def get_maintenance_metrics(self, printer_id: Optional[int] = None,
- start_date: Optional[datetime] = None,
- end_date: Optional[datetime] = None) -> MaintenanceMetrics:
- """Berechnet Wartungsmetriken"""
- # Filter tasks
- tasks = self.maintenance_history.copy()
- if printer_id:
- tasks = [t for t in tasks if t.printer_id == printer_id]
- if start_date:
- tasks = [t for t in tasks if t.completed_at and t.completed_at >= start_date]
- if end_date:
- tasks = [t for t in tasks if t.completed_at and t.completed_at <= end_date]
-
- if not tasks:
- return MaintenanceMetrics()
-
- completed_tasks = [t for t in tasks if t.status == MaintenanceStatus.COMPLETED]
-
- # Grundmetriken
- total_tasks = len(tasks)
- completed_count = len(completed_tasks)
-
- # Durchschnittliche Bearbeitungszeit
- completion_times = [t.actual_duration for t in completed_tasks if t.actual_duration]
- avg_completion_time = sum(completion_times) / len(completion_times) if completion_times else 0
-
- # Gesamtkosten
- total_cost = sum(t.cost for t in completed_tasks if t.cost)
-
- # MTBF und MTTR berechnen
- mtbf = self._calculate_mtbf(tasks, printer_id)
- mttr = avg_completion_time / 60 # Konvertiere zu Stunden
-
- # Verfügbarkeit berechnen
- uptime_percentage = self._calculate_uptime(printer_id, start_date, end_date)
-
- return MaintenanceMetrics(
- total_tasks=total_tasks,
- completed_tasks=completed_count,
- overdue_tasks=len(self.get_overdue_tasks()),
- average_completion_time=avg_completion_time,
- total_cost=total_cost,
- mtbf=mtbf,
- mttr=mttr,
- uptime_percentage=uptime_percentage
- )
-
- def create_maintenance_checklist(self, maintenance_type: MaintenanceType) -> List[Dict[str, Any]]:
- """Erstellt eine Wartungs-Checkliste"""
- checklists = {
- MaintenanceType.PREVENTIVE: [
- {"task": "Drucker äußerlich reinigen", "completed": False, "required": True},
- {"task": "Druckbett-Level prüfen", "completed": False, "required": True},
- {"task": "Extruder-Düse reinigen", "completed": False, "required": True},
- {"task": "Riemen-Spannung prüfen", "completed": False, "required": True},
- {"task": "Filament-Führung prüfen", "completed": False, "required": False},
- {"task": "Software-Updates prüfen", "completed": False, "required": False},
- {"task": "Lüfter reinigen", "completed": False, "required": True},
- {"task": "Schrauben nachziehen", "completed": False, "required": False}
- ],
- MaintenanceType.CORRECTIVE: [
- {"task": "Problem-Diagnose durchführen", "completed": False, "required": True},
- {"task": "Defekte Teile identifizieren", "completed": False, "required": True},
- {"task": "Ersatzteile bestellen/bereitstellen", "completed": False, "required": True},
- {"task": "Reparatur durchführen", "completed": False, "required": True},
- {"task": "Funktionstest durchführen", "completed": False, "required": True},
- {"task": "Kalibrierung prüfen", "completed": False, "required": True}
- ],
- MaintenanceType.INSPECTION: [
- {"task": "Sichtprüfung der Mechanik", "completed": False, "required": True},
- {"task": "Druckqualität testen", "completed": False, "required": True},
- {"task": "Temperaturen prüfen", "completed": False, "required": True},
- {"task": "Bewegungen testen", "completed": False, "required": True},
- {"task": "Verschleiß bewerten", "completed": False, "required": True}
- ]
- }
-
- return checklists.get(maintenance_type, [])
-
- def _check_scheduled_maintenance(self):
- """Prüft täglich auf fällige Wartungen"""
- logger.info("Prüfe fällige Wartungen...")
-
- today = datetime.now()
-
- for printer_id, schedules in self.schedules.items():
- for schedule_item in schedules:
- if not schedule_item.is_active:
- continue
-
- if schedule_item.next_due <= today:
- # Erstelle Wartungsaufgabe
- task = MaintenanceTask(
- printer_id=printer_id,
- title=f"{schedule_item.maintenance_type.value.title()} Wartung",
- description=schedule_item.description,
- maintenance_type=schedule_item.maintenance_type,
- priority=MaintenancePriority.NORMAL,
- due_date=schedule_item.next_due,
- checklist=self.create_maintenance_checklist(schedule_item.maintenance_type)
- )
-
- task_id = self.create_task(task)
-
- # Nächsten Termin berechnen
- schedule_item.next_due = today + timedelta(days=schedule_item.interval_days)
-
- logger.info(f"Automatische Wartungsaufgabe erstellt: {task_id}")
-
- def _check_overdue_tasks(self):
- """Prüft stündlich auf überfällige Aufgaben"""
- overdue = self.get_overdue_tasks()
-
- if overdue:
- logger.warning(f"{len(overdue)} überfällige Wartungsaufgaben gefunden")
-
- for task in overdue:
- emit_system_alert(
- f"Wartung überfällig: {task.title} (Drucker {task.printer_id})",
- "warning",
- "high"
- )
-
- def _generate_weekly_report(self):
- """Generiert wöchentlichen Wartungsbericht"""
- logger.info("Generiere wöchentlichen Wartungsbericht...")
-
- # Sammle Daten der letzten Woche
- last_week = datetime.now() - timedelta(days=7)
- metrics = self.get_maintenance_metrics(start_date=last_week)
-
- # Sende Report (Implementation abhängig von verfügbaren Services)
- # send_maintenance_report(metrics)
-
- def _calculate_next_maintenance_date(self, printer_id: int) -> datetime:
- """Berechnet nächstes Wartungsdatum basierend auf Nutzung"""
- # Vereinfachte Implementierung - kann erweitert werden
- base_interval = 30 # Tage
-
- # Hier könnte man Nutzungsstatistiken einbeziehen
- with get_db_session() as db_session:
- printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
- if printer:
- # Berücksichtige letzten Check
- if printer.last_checked:
- days_since_check = (datetime.now() - printer.last_checked).days
- if days_since_check < 15: # Kürzlich gecheckt
- base_interval += 15
-
- return datetime.now() + timedelta(days=base_interval)
-
- def _schedule_next_maintenance(self, completed_task: MaintenanceTask):
- """Plant nächste Wartung nach Abschluss einer Aufgabe"""
- if completed_task.maintenance_type == MaintenanceType.PREVENTIVE:
- # Finde entsprechenden Schedule
- printer_schedules = self.schedules.get(completed_task.printer_id, [])
- for schedule_item in printer_schedules:
- if schedule_item.maintenance_type == completed_task.maintenance_type:
- schedule_item.last_completed = completed_task.completed_at
- schedule_item.next_due = datetime.now() + timedelta(days=schedule_item.interval_days)
- break
-
- def _calculate_mtbf(self, tasks: List[MaintenanceTask], printer_id: Optional[int]) -> float:
- """Berechnet Mean Time Between Failures"""
- # Vereinfachte MTBF-Berechnung
- failure_tasks = [t for t in tasks if t.maintenance_type == MaintenanceType.CORRECTIVE]
-
- if len(failure_tasks) < 2:
- return 0.0
-
- # Zeitspanne zwischen ersten und letzten Ausfall
- first_failure = min(failure_tasks, key=lambda t: t.created_at)
- last_failure = max(failure_tasks, key=lambda t: t.created_at)
-
- total_time = (last_failure.created_at - first_failure.created_at).total_seconds() / 3600 # Stunden
- failure_count = len(failure_tasks) - 1
-
- return total_time / failure_count if failure_count > 0 else 0.0
-
- def _calculate_uptime(self, printer_id: Optional[int], start_date: Optional[datetime],
- end_date: Optional[datetime]) -> float:
- """Berechnet Verfügbarkeit in Prozent"""
- # Vereinfachte Uptime-Berechnung
- if not start_date:
- start_date = datetime.now() - timedelta(days=30)
- if not end_date:
- end_date = datetime.now()
-
- total_time = (end_date - start_date).total_seconds()
-
- # Berechne Downtime aus Wartungszeiten
- downtime = 0
- for task in self.maintenance_history:
- if printer_id and task.printer_id != printer_id:
- continue
-
- if (task.status == MaintenanceStatus.COMPLETED and
- task.started_at and task.completed_at and
- task.started_at >= start_date and task.completed_at <= end_date):
- downtime += (task.completed_at - task.started_at).total_seconds()
-
- uptime = ((total_time - downtime) / total_time) * 100 if total_time > 0 else 0
- return max(0, min(100, uptime))
-
- def _send_task_notifications(self, task: MaintenanceTask, event_type: str):
- """Sendet Benachrichtigungen für Wartungsaufgaben"""
- try:
- if event_type == "created":
- emit_system_alert(
- f"Neue Wartungsaufgabe: {task.title} (Drucker {task.printer_id})",
- "info",
- "normal"
- )
- elif event_type == "status_changed":
- emit_system_alert(
- f"Wartungsstatus geändert: {task.title} → {task.status.value}",
- "info",
- "normal"
- )
- except Exception as e:
- logger.error(f"Fehler beim Senden der Wartungsbenachrichtigung: {str(e)}")
-
-# Globale Instanz
-maintenance_manager = MaintenanceManager()
-
-def get_maintenance_dashboard_data() -> Dict[str, Any]:
- """Holt Dashboard-Daten für Wartungen"""
- upcoming = maintenance_manager.get_upcoming_maintenance()
- overdue = maintenance_manager.get_overdue_tasks()
- metrics = maintenance_manager.get_maintenance_metrics()
-
- return {
- 'upcoming_count': len(upcoming),
- 'overdue_count': len(overdue),
- 'upcoming_tasks': [asdict(task) for task in upcoming[:5]],
- 'overdue_tasks': [asdict(task) for task in overdue],
- 'metrics': asdict(metrics),
- 'next_scheduled': upcoming[0] if upcoming else None
- }
-
-def create_emergency_maintenance(printer_id: int, description: str,
- priority: MaintenancePriority = MaintenancePriority.CRITICAL) -> int:
- """Erstellt eine Notfall-Wartung"""
- task = MaintenanceTask(
- printer_id=printer_id,
- title="Notfall-Wartung",
- description=description,
- maintenance_type=MaintenanceType.EMERGENCY,
- priority=priority,
- due_date=datetime.now(), # Sofort fällig
- checklist=maintenance_manager.create_maintenance_checklist(MaintenanceType.CORRECTIVE)
- )
-
- return maintenance_manager.create_task(task)
-
-def schedule_preventive_maintenance(printer_id: int, interval_days: int = 30) -> MaintenanceSchedule:
- """Plant vorbeugende Wartung"""
- return maintenance_manager.schedule_maintenance(
- printer_id=printer_id,
- maintenance_type=MaintenanceType.PREVENTIVE,
- interval_days=interval_days,
- description="Regelmäßige vorbeugende Wartung"
- )
-
-# JavaScript für Wartungs-Frontend
-def get_maintenance_javascript() -> str:
- """JavaScript für Wartungsmanagement"""
- return """
- class MaintenanceManager {
- constructor() {
- this.currentTasks = [];
- this.selectedTask = null;
-
- this.init();
- }
-
- init() {
- this.loadTasks();
- this.setupEventListeners();
- this.startAutoRefresh();
- }
-
- setupEventListeners() {
- // Task status updates
- document.addEventListener('click', (e) => {
- if (e.target.matches('.maintenance-status-btn')) {
- const taskId = e.target.dataset.taskId;
- const newStatus = e.target.dataset.status;
- this.updateTaskStatus(taskId, newStatus);
- }
-
- if (e.target.matches('.maintenance-details-btn')) {
- const taskId = e.target.dataset.taskId;
- this.showTaskDetails(taskId);
- }
- });
-
- // Create maintenance form
- const createForm = document.getElementById('create-maintenance-form');
- createForm?.addEventListener('submit', (e) => {
- e.preventDefault();
- this.createTask(new FormData(createForm));
- });
- }
-
- async loadTasks() {
- try {
- const response = await fetch('/api/maintenance/tasks');
- const data = await response.json();
-
- if (data.success) {
- this.currentTasks = data.tasks;
- this.renderTasks();
- }
- } catch (error) {
- console.error('Fehler beim Laden der Wartungsaufgaben:', error);
- }
- }
-
- async updateTaskStatus(taskId, newStatus) {
- try {
- const response = await fetch(`/api/maintenance/tasks/${taskId}/status`, {
- method: 'PUT',
- headers: { 'Content-Type': 'application/json' },
- body: JSON.stringify({ status: newStatus })
- });
-
- const result = await response.json();
-
- if (result.success) {
- this.loadTasks(); // Refresh
- this.showNotification('Wartungsstatus aktualisiert', 'success');
- } else {
- this.showNotification('Fehler beim Aktualisieren', 'error');
- }
- } catch (error) {
- console.error('Status-Update fehlgeschlagen:', error);
- }
- }
-
- renderTasks() {
- const container = document.getElementById('maintenance-tasks-container');
- if (!container) return;
-
- container.innerHTML = this.currentTasks.map(task => `
-
-
-
-
Drucker: ${task.printer_id}
-
Typ: ${task.maintenance_type}
-
Fällig: ${this.formatDate(task.due_date)}
-
Status: ${task.status}
-
-
-
-
-
-
-
- `).join('');
- }
-
- showTaskDetails(taskId) {
- const task = this.currentTasks.find(t => t.id == taskId);
- if (!task) return;
-
- // Create modal with task details
- const modal = document.createElement('div');
- modal.className = 'maintenance-modal';
- modal.innerHTML = `
-
-
-
-
-
Beschreibung: ${task.description}
-
Techniker: ${task.assigned_technician || 'Nicht zugewiesen'}
-
Geschätzte Dauer: ${task.estimated_duration} Minuten
-
- ${task.checklist ? this.renderChecklist(task.checklist) : ''}
-
-
-
Notizen:
-
-
-
-
-
-
- `;
-
- document.body.appendChild(modal);
-
- // Close modal handlers
- modal.querySelector('.close-modal').onclick = () => modal.remove();
- modal.onclick = (e) => {
- if (e.target === modal) modal.remove();
- };
- }
-
- renderChecklist(checklist) {
- return `
-
-
Checkliste:
- ${checklist.map((item, index) => `
-
- `).join('')}
-
- `;
- }
-
- formatDate(dateString) {
- if (!dateString) return 'Nicht gesetzt';
- const date = new Date(dateString);
- return date.toLocaleDateString('de-DE') + ' ' + date.toLocaleTimeString('de-DE', {hour: '2-digit', minute: '2-digit'});
- }
-
- showNotification(message, type = 'info') {
- const notification = document.createElement('div');
- notification.className = `notification notification-${type}`;
- notification.textContent = message;
-
- document.body.appendChild(notification);
-
- setTimeout(() => {
- notification.remove();
- }, 3000);
- }
-
- startAutoRefresh() {
- setInterval(() => {
- this.loadTasks();
- }, 30000); // Refresh every 30 seconds
- }
- }
-
- // Initialize when DOM is ready
- document.addEventListener('DOMContentLoaded', function() {
- window.maintenanceManager = new MaintenanceManager();
- });
- """
-
-def create_maintenance_task(printer_id: int, title: str, description: str = "",
- maintenance_type: MaintenanceType = MaintenanceType.PREVENTIVE,
- priority: MaintenancePriority = MaintenancePriority.NORMAL) -> int:
- """
- Erstellt eine neue Wartungsaufgabe.
-
- Args:
- printer_id: ID des Druckers
- title: Titel der Wartungsaufgabe
- description: Beschreibung der Aufgabe
- maintenance_type: Art der Wartung
- priority: Priorität der Aufgabe
-
- Returns:
- int: ID der erstellten Aufgabe
- """
- task = MaintenanceTask(
- printer_id=printer_id,
- title=title,
- description=description,
- maintenance_type=maintenance_type,
- priority=priority,
- checklist=maintenance_manager.create_maintenance_checklist(maintenance_type)
- )
-
- return maintenance_manager.create_task(task)
-
-def schedule_maintenance(printer_id: int, maintenance_type: MaintenanceType,
- interval_days: int, description: str = "") -> MaintenanceSchedule:
- """
- Plant regelmäßige Wartungen (Alias für maintenance_manager.schedule_maintenance).
-
- Args:
- printer_id: ID des Druckers
- maintenance_type: Art der Wartung
- interval_days: Intervall in Tagen
- description: Beschreibung
-
- Returns:
- MaintenanceSchedule: Erstellter Wartungsplan
- """
- return maintenance_manager.schedule_maintenance(
- printer_id=printer_id,
- maintenance_type=maintenance_type,
- interval_days=interval_days,
- description=description
- )
-
-def get_maintenance_overview() -> Dict[str, Any]:
- """
- Holt eine Übersicht aller Wartungsaktivitäten.
-
- Returns:
- Dict: Wartungsübersicht mit Statistiken und anstehenden Aufgaben
- """
- upcoming = maintenance_manager.get_upcoming_maintenance()
- overdue = maintenance_manager.get_overdue_tasks()
- metrics = maintenance_manager.get_maintenance_metrics()
-
- # Aktive Tasks
- active_tasks = [task for task in maintenance_manager.tasks.values()
- if task.status == MaintenanceStatus.IN_PROGRESS]
-
- # Completed tasks in last 30 days
- thirty_days_ago = datetime.now() - timedelta(days=30)
- recent_completed = [task for task in maintenance_manager.maintenance_history
- if task.completed_at and task.completed_at >= thirty_days_ago]
-
- return {
- 'summary': {
- 'total_tasks': len(maintenance_manager.tasks),
- 'active_tasks': len(active_tasks),
- 'upcoming_tasks': len(upcoming),
- 'overdue_tasks': len(overdue),
- 'completed_this_month': len(recent_completed)
- },
- 'upcoming_tasks': [asdict(task) for task in upcoming[:10]],
- 'overdue_tasks': [asdict(task) for task in overdue],
- 'active_tasks': [asdict(task) for task in active_tasks],
- 'recent_completed': [asdict(task) for task in recent_completed[:5]],
- 'metrics': asdict(metrics),
- 'schedules': {
- printer_id: [asdict(schedule) for schedule in schedules]
- for printer_id, schedules in maintenance_manager.schedules.items()
- }
- }
-
-def update_maintenance_status(task_id: int, new_status: MaintenanceStatus,
- notes: str = "") -> bool:
- """
- Aktualisiert den Status einer Wartungsaufgabe (Alias für maintenance_manager.update_task_status).
-
- Args:
- task_id: ID der Wartungsaufgabe
- new_status: Neuer Status
- notes: Optionale Notizen
-
- Returns:
- bool: True wenn erfolgreich aktualisiert
- """
- return maintenance_manager.update_task_status(task_id, new_status, notes)
\ No newline at end of file
diff --git a/backend/utils/multi_location_system.py b/backend/utils/multi_location_system.py
deleted file mode 100644
index 7d9e629fd..000000000
--- a/backend/utils/multi_location_system.py
+++ /dev/null
@@ -1,899 +0,0 @@
-"""
-Multi-Standort-Unterstützungssystem für das MYP-System
-======================================================
-
-Dieses Modul stellt umfassende Multi-Location-Funktionalität bereit:
-- Standort-Management und Hierarchien
-- Standort-spezifische Konfigurationen
-- Zentrale und dezentrale Verwaltung
-- Standort-übergreifende Berichte
-- Ressourcen-Sharing zwischen Standorten
-- Benutzer-Standort-Zuweisungen
-"""
-
-import json
-import logging
-from datetime import datetime, timedelta
-from typing import Dict, List, Any, Optional, Tuple
-from dataclasses import dataclass, asdict
-from enum import Enum
-import geocoder
-import requests
-
-from utils.logging_config import get_logger
-from models import User, Printer, Job, get_db_session
-
-logger = get_logger("multi_location")
-
-class LocationType(Enum):
- """Arten von Standorten"""
- HEADQUARTERS = "headquarters" # Hauptsitz
- BRANCH = "branch" # Niederlassung
- DEPARTMENT = "department" # Abteilung
- FLOOR = "floor" # Stockwerk
- ROOM = "room" # Raum
- AREA = "area" # Bereich
-
-class AccessLevel(Enum):
- """Zugriffslevel für Standorte"""
- FULL = "full" # Vollzugriff
- READ_WRITE = "read_write" # Lesen und Schreiben
- READ_ONLY = "read_only" # Nur Lesen
- NO_ACCESS = "no_access" # Kein Zugriff
-
-@dataclass
-class LocationConfig:
- """Standort-spezifische Konfiguration"""
- timezone: str = "Europe/Berlin"
- business_hours: Dict[str, str] = None
- maintenance_window: Dict[str, str] = None
- auto_approval_enabled: bool = False
- max_job_duration: int = 480 # Minuten
- contact_info: Dict[str, str] = None
- notification_settings: Dict[str, Any] = None
-
-@dataclass
-class Location:
- """Standort-Definition"""
- id: Optional[int] = None
- name: str = ""
- code: str = "" # Kurzer Code für den Standort
- location_type: LocationType = LocationType.BRANCH
- parent_id: Optional[int] = None
- address: str = ""
- city: str = ""
- country: str = ""
- postal_code: str = ""
- latitude: Optional[float] = None
- longitude: Optional[float] = None
- description: str = ""
- config: LocationConfig = None
- is_active: bool = True
- created_at: datetime = None
- manager_id: Optional[int] = None
-
- def __post_init__(self):
- if self.config is None:
- self.config = LocationConfig()
- if self.created_at is None:
- self.created_at = datetime.now()
-
-@dataclass
-class UserLocationAccess:
- """Benutzer-Standort-Zugriff"""
- user_id: int
- location_id: int
- access_level: AccessLevel
- granted_by: int
- granted_at: datetime
- expires_at: Optional[datetime] = None
- is_primary: bool = False
-
-class MultiLocationManager:
- """Manager für Multi-Standort-Funktionalität"""
-
- def __init__(self):
- self.locations: Dict[int, Location] = {}
- self.user_access: Dict[int, List[UserLocationAccess]] = {}
- self.next_location_id = 1
-
- # Standard-Standort erstellen
- self._create_default_location()
-
- def _create_default_location(self):
- """Erstellt Standard-Standort falls keiner existiert"""
- default_location = Location(
- id=1,
- name="Hauptstandort",
- code="HQ",
- location_type=LocationType.HEADQUARTERS,
- address="Mercedes-Benz Platz",
- city="Stuttgart",
- country="Deutschland",
- description="Hauptstandort des MYP-Systems"
- )
-
- self.locations[1] = default_location
- self.next_location_id = 2
-
- logger.info("Standard-Standort erstellt")
-
- def create_location(self, location: Location) -> int:
- """Erstellt einen neuen Standort"""
- location.id = self.next_location_id
- self.next_location_id += 1
-
- # Koordinaten automatisch ermitteln
- if not location.latitude or not location.longitude:
- self._geocode_location(location)
-
- self.locations[location.id] = location
-
- logger.info(f"Standort erstellt: {location.name} ({location.code})")
- return location.id
-
- def update_location(self, location_id: int, updates: Dict[str, Any]) -> bool:
- """Aktualisiert einen Standort"""
- if location_id not in self.locations:
- return False
-
- location = self.locations[location_id]
-
- for key, value in updates.items():
- if hasattr(location, key):
- setattr(location, key, value)
-
- # Koordinaten neu ermitteln bei Adressänderung
- if 'address' in updates or 'city' in updates:
- self._geocode_location(location)
-
- logger.info(f"Standort aktualisiert: {location.name}")
- return True
-
- def delete_location(self, location_id: int) -> bool:
- """Löscht einen Standort (Soft Delete)"""
- if location_id not in self.locations:
- return False
-
- location = self.locations[location_id]
-
- # Prüfe ob Standort Kinder hat
- children = self.get_child_locations(location_id)
- if children:
- logger.warning(f"Standort {location.name} kann nicht gelöscht werden: hat Unterstandorte")
- return False
-
- # Prüfe auf aktive Ressourcen
- if self._has_active_resources(location_id):
- logger.warning(f"Standort {location.name} kann nicht gelöscht werden: hat aktive Ressourcen")
- return False
-
- location.is_active = False
- logger.info(f"Standort deaktiviert: {location.name}")
- return True
-
- def get_location_hierarchy(self, location_id: Optional[int] = None) -> Dict[str, Any]:
- """Holt Standort-Hierarchie"""
- if location_id:
- # Spezifische Hierarchie ab einem Standort
- location = self.locations.get(location_id)
- if not location:
- return {}
-
- return self._build_hierarchy_node(location)
- else:
- # Komplette Hierarchie
- root_locations = [loc for loc in self.locations.values()
- if loc.parent_id is None and loc.is_active]
-
- return {
- 'locations': [self._build_hierarchy_node(loc) for loc in root_locations]
- }
-
- def _build_hierarchy_node(self, location: Location) -> Dict[str, Any]:
- """Erstellt einen Hierarchie-Knoten"""
- children = self.get_child_locations(location.id)
-
- return {
- 'id': location.id,
- 'name': location.name,
- 'code': location.code,
- 'type': location.location_type.value,
- 'children': [self._build_hierarchy_node(child) for child in children],
- 'resource_count': self._count_location_resources(location.id)
- }
-
- def get_child_locations(self, parent_id: int) -> List[Location]:
- """Holt alle Kinder-Standorte"""
- return [loc for loc in self.locations.values()
- if loc.parent_id == parent_id and loc.is_active]
-
- def get_location_path(self, location_id: int) -> List[Location]:
- """Holt den Pfad vom Root zum Standort"""
- path = []
- current_id = location_id
-
- while current_id:
- location = self.locations.get(current_id)
- if not location:
- break
-
- path.insert(0, location)
- current_id = location.parent_id
-
- return path
-
- def grant_location_access(self, user_id: int, location_id: int,
- access_level: AccessLevel, granted_by: int,
- expires_at: Optional[datetime] = None,
- is_primary: bool = False) -> bool:
- """Gewährt Benutzer-Zugriff auf einen Standort"""
- if location_id not in self.locations:
- return False
-
- access = UserLocationAccess(
- user_id=user_id,
- location_id=location_id,
- access_level=access_level,
- granted_by=granted_by,
- granted_at=datetime.now(),
- expires_at=expires_at,
- is_primary=is_primary
- )
-
- if user_id not in self.user_access:
- self.user_access[user_id] = []
-
- # Entferne vorherigen Zugriff für diesen Standort
- self.user_access[user_id] = [
- acc for acc in self.user_access[user_id]
- if acc.location_id != location_id
- ]
-
- # Setze anderen primary-Zugriff zurück falls nötig
- if is_primary:
- for access_item in self.user_access[user_id]:
- access_item.is_primary = False
-
- self.user_access[user_id].append(access)
-
- logger.info(f"Standort-Zugriff gewährt: User {user_id} → Location {location_id} ({access_level.value})")
- return True
-
- def revoke_location_access(self, user_id: int, location_id: int) -> bool:
- """Entzieht Benutzer-Zugriff auf einen Standort"""
- if user_id not in self.user_access:
- return False
-
- original_count = len(self.user_access[user_id])
- self.user_access[user_id] = [
- acc for acc in self.user_access[user_id]
- if acc.location_id != location_id
- ]
-
- success = len(self.user_access[user_id]) < original_count
- if success:
- logger.info(f"Standort-Zugriff entzogen: User {user_id} → Location {location_id}")
-
- return success
-
- def get_user_locations(self, user_id: int, access_level: Optional[AccessLevel] = None) -> List[Location]:
- """Holt alle Standorte eines Benutzers"""
- if user_id not in self.user_access:
- return []
-
- accessible_locations = []
- now = datetime.now()
-
- for access in self.user_access[user_id]:
- # Prüfe Ablaufzeit
- if access.expires_at and access.expires_at < now:
- continue
-
- # Prüfe Access Level
- if access_level and access.access_level != access_level:
- continue
-
- location = self.locations.get(access.location_id)
- if location and location.is_active:
- accessible_locations.append(location)
-
- return accessible_locations
-
- def get_user_primary_location(self, user_id: int) -> Optional[Location]:
- """Holt den primären Standort eines Benutzers"""
- if user_id not in self.user_access:
- return None
-
- for access in self.user_access[user_id]:
- if access.is_primary:
- return self.locations.get(access.location_id)
-
- # Fallback: ersten verfügbaren Standort nehmen
- user_locations = self.get_user_locations(user_id)
- return user_locations[0] if user_locations else None
-
- def check_user_access(self, user_id: int, location_id: int,
- required_level: AccessLevel = AccessLevel.READ_ONLY) -> bool:
- """Prüft ob Benutzer Zugriff auf Standort hat"""
- if user_id not in self.user_access:
- return False
-
- access_levels = {
- AccessLevel.NO_ACCESS: 0,
- AccessLevel.READ_ONLY: 1,
- AccessLevel.READ_WRITE: 2,
- AccessLevel.FULL: 3
- }
-
- required_level_value = access_levels[required_level]
- now = datetime.now()
-
- for access in self.user_access[user_id]:
- if access.location_id != location_id:
- continue
-
- # Prüfe Ablaufzeit
- if access.expires_at and access.expires_at < now:
- continue
-
- user_level_value = access_levels[access.access_level]
- if user_level_value >= required_level_value:
- return True
-
- return False
-
- def get_location_resources(self, location_id: int) -> Dict[str, Any]:
- """Holt alle Ressourcen eines Standorts"""
- if location_id not in self.locations:
- return {}
-
- # Simuliere Datenbankabfrage für Drucker und Jobs
- resources = {
- 'printers': [],
- 'active_jobs': [],
- 'users': [],
- 'pending_maintenance': 0
- }
-
- # In echter Implementierung würde hier die Datenbank abgefragt
- with get_db_session() as db_session:
- # Drucker des Standorts (vereinfacht - benötigt location_id in Printer-Model)
- # printers = db_session.query(Printer).filter(Printer.location_id == location_id).all()
- # resources['printers'] = [p.to_dict() for p in printers]
- pass
-
- return resources
-
- def get_location_statistics(self, location_id: int,
- start_date: Optional[datetime] = None,
- end_date: Optional[datetime] = None) -> Dict[str, Any]:
- """Holt Statistiken für einen Standort"""
- if not start_date:
- start_date = datetime.now() - timedelta(days=30)
- if not end_date:
- end_date = datetime.now()
-
- # Sammle Statistiken
- stats = {
- 'location': self.locations.get(location_id, {}).name if location_id in self.locations else 'Unbekannt',
- 'period': {
- 'start': start_date.isoformat(),
- 'end': end_date.isoformat()
- },
- 'totals': {
- 'printers': 0,
- 'jobs_completed': 0,
- 'jobs_failed': 0,
- 'print_time_hours': 0,
- 'material_used_kg': 0,
- 'users_active': 0
- },
- 'averages': {
- 'jobs_per_day': 0,
- 'job_duration_minutes': 0,
- 'printer_utilization': 0
- },
- 'trends': {
- 'daily_jobs': [],
- 'printer_usage': []
- }
- }
-
- # In echter Implementierung würden hier Datenbankabfragen stehen
-
- return stats
-
- def get_multi_location_report(self, location_ids: List[int] = None) -> Dict[str, Any]:
- """Erstellt standortübergreifenden Bericht"""
- if not location_ids:
- location_ids = list(self.locations.keys())
-
- report = {
- 'generated_at': datetime.now().isoformat(),
- 'locations': [],
- 'summary': {
- 'total_locations': len(location_ids),
- 'total_printers': 0,
- 'total_users': 0,
- 'total_jobs': 0,
- 'cross_location_sharing': []
- }
- }
-
- for location_id in location_ids:
- location = self.locations.get(location_id)
- if not location:
- continue
-
- location_stats = self.get_location_statistics(location_id)
- location_data = {
- 'id': location.id,
- 'name': location.name,
- 'code': location.code,
- 'type': location.location_type.value,
- 'statistics': location_stats
- }
-
- report['locations'].append(location_data)
-
- # Summiere für Gesamtübersicht
- totals = location_stats.get('totals', {})
- report['summary']['total_printers'] += totals.get('printers', 0)
- report['summary']['total_users'] += totals.get('users_active', 0)
- report['summary']['total_jobs'] += totals.get('jobs_completed', 0)
-
- return report
-
- def find_nearest_locations(self, latitude: float, longitude: float,
- radius_km: float = 50, limit: int = 5) -> List[Tuple[Location, float]]:
- """Findet nächstgelegene Standorte"""
- from math import radians, sin, cos, sqrt, atan2
-
- def calculate_distance(lat1, lon1, lat2, lon2):
- """Berechnet Entfernung zwischen zwei Koordinaten (Haversine)"""
- R = 6371 # Erdradius in km
-
- lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
- dlat = lat2 - lat1
- dlon = lon2 - lon1
-
- a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
- c = 2 * atan2(sqrt(a), sqrt(1-a))
-
- return R * c
-
- nearby_locations = []
-
- for location in self.locations.values():
- if not location.is_active or not location.latitude or not location.longitude:
- continue
-
- distance = calculate_distance(
- latitude, longitude,
- location.latitude, location.longitude
- )
-
- if distance <= radius_km:
- nearby_locations.append((location, distance))
-
- # Sortiere nach Entfernung
- nearby_locations.sort(key=lambda x: x[1])
-
- return nearby_locations[:limit]
-
- def _geocode_location(self, location: Location):
- """Ermittelt Koordinaten für einen Standort"""
- try:
- address_parts = [location.address, location.city, location.country]
- full_address = ', '.join(filter(None, address_parts))
-
- if not full_address:
- return
-
- # Verwende geocoder library
- result = geocoder.osm(full_address)
-
- if result.ok:
- location.latitude = result.lat
- location.longitude = result.lng
- logger.info(f"Koordinaten ermittelt für {location.name}: {location.latitude}, {location.longitude}")
- else:
- logger.warning(f"Koordinaten konnten nicht ermittelt werden für {location.name}")
-
- except Exception as e:
- logger.error(f"Fehler bei Geocoding für {location.name}: {str(e)}")
-
- def _has_active_resources(self, location_id: int) -> bool:
- """Prüft ob Standort aktive Ressourcen hat"""
- # Vereinfachte Implementierung
- # In echter Implementation würde hier die Datenbank geprüft
- return False
-
- def _count_location_resources(self, location_id: int) -> Dict[str, int]:
- """Zählt Ressourcen eines Standorts"""
- # Vereinfachte Implementierung
- return {
- 'printers': 0,
- 'users': 0,
- 'jobs': 0
- }
-
-# Globale Instanz
-location_manager = MultiLocationManager()
-
-# Alias für Import-Kompatibilität
-LocationManager = MultiLocationManager
-
-def create_location(name: str, code: str, location_type: LocationType = LocationType.BRANCH,
- address: str = "", city: str = "", country: str = "",
- parent_id: Optional[int] = None) -> int:
- """
- Erstellt einen neuen Standort (globale Funktion).
-
- Args:
- name: Name des Standorts
- code: Kurzer Code für den Standort
- location_type: Art des Standorts
- address: Adresse
- city: Stadt
- country: Land
- parent_id: Parent-Standort ID
-
- Returns:
- int: ID des erstellten Standorts
- """
- location = Location(
- name=name,
- code=code,
- location_type=location_type,
- address=address,
- city=city,
- country=country,
- parent_id=parent_id
- )
-
- return location_manager.create_location(location)
-
-def assign_user_to_location(user_id: int, location_id: int,
- access_level: AccessLevel = AccessLevel.READ_WRITE,
- granted_by: int = 1, is_primary: bool = False) -> bool:
- """
- Weist einen Benutzer einem Standort zu.
-
- Args:
- user_id: ID des Benutzers
- location_id: ID des Standorts
- access_level: Zugriffslevel
- granted_by: ID des gewährenden Benutzers
- is_primary: Ob dies der primäre Standort ist
-
- Returns:
- bool: True wenn erfolgreich
- """
- return location_manager.grant_location_access(
- user_id=user_id,
- location_id=location_id,
- access_level=access_level,
- granted_by=granted_by,
- is_primary=is_primary
- )
-
-def get_user_locations(user_id: int) -> List[Location]:
- """
- Holt alle Standorte eines Benutzers (globale Funktion).
-
- Args:
- user_id: ID des Benutzers
-
- Returns:
- List[Location]: Liste der zugänglichen Standorte
- """
- return location_manager.get_user_locations(user_id)
-
-def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
- """
- Berechnet die Entfernung zwischen zwei Koordinaten (Haversine-Formel).
-
- Args:
- lat1, lon1: Koordinaten des ersten Punkts
- lat2, lon2: Koordinaten des zweiten Punkts
-
- Returns:
- float: Entfernung in Kilometern
- """
- from math import radians, sin, cos, sqrt, atan2
-
- R = 6371 # Erdradius in km
-
- lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
- dlat = lat2 - lat1
- dlon = lon2 - lon1
-
- a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
- c = 2 * atan2(sqrt(a), sqrt(1-a))
-
- return R * c
-
-def find_nearest_location(latitude: float, longitude: float,
- radius_km: float = 50) -> Optional[Location]:
- """
- Findet den nächstgelegenen Standort.
-
- Args:
- latitude: Breitengrad
- longitude: Längengrad
- radius_km: Suchradius in Kilometern
-
- Returns:
- Optional[Location]: Nächstgelegener Standort oder None
- """
- nearest_locations = location_manager.find_nearest_locations(
- latitude=latitude,
- longitude=longitude,
- radius_km=radius_km,
- limit=1
- )
-
- return nearest_locations[0][0] if nearest_locations else None
-
-def get_location_dashboard_data(user_id: int) -> Dict[str, Any]:
- """Holt Dashboard-Daten für Standorte eines Benutzers"""
- user_locations = location_manager.get_user_locations(user_id)
- primary_location = location_manager.get_user_primary_location(user_id)
-
- dashboard_data = {
- 'user_locations': [asdict(loc) for loc in user_locations],
- 'primary_location': asdict(primary_location) if primary_location else None,
- 'location_count': len(user_locations),
- 'hierarchy': location_manager.get_location_hierarchy()
- }
-
- # Füge Statistiken für jeden Standort hinzu
- for location in user_locations:
- location_stats = location_manager.get_location_statistics(location.id)
- dashboard_data[f'stats_{location.id}'] = location_stats
-
- return dashboard_data
-
-def create_location_from_address(name: str, address: str, city: str,
- country: str, location_type: LocationType = LocationType.BRANCH) -> int:
- """Erstellt Standort aus Adresse mit automatischer Geocodierung"""
- location = Location(
- name=name,
- code=name[:3].upper(),
- location_type=location_type,
- address=address,
- city=city,
- country=country
- )
-
- return location_manager.create_location(location)
-
-# JavaScript für Multi-Location Frontend
-def get_multi_location_javascript() -> str:
- """JavaScript für Multi-Location Management"""
- return """
- class MultiLocationManager {
- constructor() {
- this.currentLocation = null;
- this.userLocations = [];
- this.locationHierarchy = {};
-
- this.init();
- }
-
- init() {
- this.loadUserLocations();
- this.setupEventListeners();
- }
-
- setupEventListeners() {
- // Location switcher
- document.addEventListener('change', (e) => {
- if (e.target.matches('.location-selector')) {
- const locationId = parseInt(e.target.value);
- this.switchLocation(locationId);
- }
- });
-
- // Location management buttons
- document.addEventListener('click', (e) => {
- if (e.target.matches('.manage-locations-btn')) {
- this.showLocationManager();
- }
-
- if (e.target.matches('.location-hierarchy-btn')) {
- this.showLocationHierarchy();
- }
- });
- }
-
- async loadUserLocations() {
- try {
- const response = await fetch('/api/locations/user');
- const data = await response.json();
-
- if (data.success) {
- this.userLocations = data.locations;
- this.currentLocation = data.primary_location;
- this.locationHierarchy = data.hierarchy;
-
- this.updateLocationSelector();
- this.updateLocationDisplay();
- }
- } catch (error) {
- console.error('Fehler beim Laden der Standorte:', error);
- }
- }
-
- updateLocationSelector() {
- const selectors = document.querySelectorAll('.location-selector');
-
- selectors.forEach(selector => {
- selector.innerHTML = this.userLocations.map(location =>
- ``
- ).join('');
- });
- }
-
- updateLocationDisplay() {
- const displays = document.querySelectorAll('.current-location-display');
-
- displays.forEach(display => {
- if (this.currentLocation) {
- display.innerHTML = `
-
- ${this.currentLocation.name}
- ${this.currentLocation.type}
- ${this.currentLocation.city ? `${this.currentLocation.city}` : ''}
-
- `;
- } else {
- display.innerHTML = 'Kein Standort ausgewählt';
- }
- });
- }
-
- async switchLocation(locationId) {
- try {
- const response = await fetch('/api/locations/switch', {
- method: 'POST',
- headers: { 'Content-Type': 'application/json' },
- body: JSON.stringify({ location_id: locationId })
- });
-
- const result = await response.json();
-
- if (result.success) {
- this.currentLocation = this.userLocations.find(loc => loc.id === locationId);
- this.updateLocationDisplay();
-
- // Seite neu laden um location-spezifische Daten zu aktualisieren
- window.location.reload();
- } else {
- this.showNotification('Fehler beim Wechseln des Standorts', 'error');
- }
- } catch (error) {
- console.error('Standort-Wechsel fehlgeschlagen:', error);
- }
- }
-
- showLocationManager() {
- const modal = document.createElement('div');
- modal.className = 'location-manager-modal';
- modal.innerHTML = `
-
-
-
-
- ${this.renderLocationList()}
-
-
-
-
-
-
- `;
-
- document.body.appendChild(modal);
-
- // Event handlers
- modal.querySelector('.close-modal').onclick = () => modal.remove();
- modal.onclick = (e) => {
- if (e.target === modal) modal.remove();
- };
- }
-
- renderLocationList() {
- return this.userLocations.map(location => `
-
-
-
${location.name} (${location.code})
-
Typ: ${location.type}
-
Adresse: ${location.address || 'Nicht angegeben'}
-
Stadt: ${location.city || 'Nicht angegeben'}
-
-
-
-
-
-
- `).join('');
- }
-
- showLocationHierarchy() {
- const modal = document.createElement('div');
- modal.className = 'hierarchy-modal';
- modal.innerHTML = `
-
-
-
-
- ${this.renderHierarchyTree(this.locationHierarchy.locations || [])}
-
-
-
- `;
-
- document.body.appendChild(modal);
-
- modal.querySelector('.close-modal').onclick = () => modal.remove();
- modal.onclick = (e) => {
- if (e.target === modal) modal.remove();
- };
- }
-
- renderHierarchyTree(locations, level = 0) {
- return locations.map(location => `
-
-
- ${this.getLocationTypeIcon(location.type)}
- ${location.name}
- (${location.code})
- ${location.resource_count.printers || 0} Drucker
-
- ${location.children && location.children.length > 0 ?
- this.renderHierarchyTree(location.children, level + 1) : ''}
-
- `).join('');
- }
-
- getLocationTypeIcon(type) {
- const icons = {
- 'headquarters': '🏢',
- 'branch': '🏪',
- 'department': '🏬',
- 'floor': '🏢',
- 'room': '🚪',
- 'area': '📍'
- };
- return icons[type] || '📍';
- }
-
- showNotification(message, type = 'info') {
- const notification = document.createElement('div');
- notification.className = `notification notification-${type}`;
- notification.textContent = message;
-
- document.body.appendChild(notification);
-
- setTimeout(() => {
- notification.remove();
- }, 3000);
- }
- }
-
- // Initialize when DOM is ready
- document.addEventListener('DOMContentLoaded', function() {
- window.multiLocationManager = new MultiLocationManager();
- });
- """
\ No newline at end of file
diff --git a/backend/utils/offline_config.py b/backend/utils/offline_config.py
deleted file mode 100644
index 2866f0c93..000000000
--- a/backend/utils/offline_config.py
+++ /dev/null
@@ -1,229 +0,0 @@
-"""
-Offline-Konfiguration für MYP-System
-===================================
-
-Konfiguriert das System für den Offline-Betrieb ohne Internetverbindung.
-Stellt Fallback-Lösungen für internetabhängige Funktionen bereit.
-"""
-
-import os
-import logging
-from typing import Dict, List, Optional
-
-from utils.logging_config import get_logger
-
-logger = get_logger("offline_config")
-
-# ===== OFFLINE-MODUS KONFIGURATION =====
-OFFLINE_MODE = True # Produktionseinstellung - System läuft offline
-
-# ===== OFFLINE-KOMPATIBILITÄT PRÜFUNGEN =====
-
-def check_internet_connectivity() -> bool:
- """
- Prüft ob eine Internetverbindung verfügbar ist.
- Im Offline-Modus gibt immer False zurück.
-
- Returns:
- bool: True wenn Internet verfügbar, False im Offline-Modus
- """
- if OFFLINE_MODE:
- return False
-
- # In einem echten Online-Modus könnte hier eine echte Prüfung stehen
- try:
- import socket
- socket.create_connection(("8.8.8.8", 53), timeout=3)
- return True
- except OSError:
- return False
-
-def is_oauth_available() -> bool:
- """
- Prüft ob OAuth-Funktionalität verfügbar ist.
-
- Returns:
- bool: False im Offline-Modus
- """
- return not OFFLINE_MODE and check_internet_connectivity()
-
-def is_email_sending_available() -> bool:
- """
- Prüft ob E-Mail-Versand verfügbar ist.
-
- Returns:
- bool: False im Offline-Modus (nur Logging)
- """
- return not OFFLINE_MODE and check_internet_connectivity()
-
-def is_cdn_available() -> bool:
- """
- Prüft ob CDN-Links verfügbar sind.
-
- Returns:
- bool: False im Offline-Modus (lokale Fallbacks verwenden)
- """
- return not OFFLINE_MODE and check_internet_connectivity()
-
-# ===== CDN FALLBACK-KONFIGURATION =====
-
-CDN_FALLBACKS = {
- # Chart.js CDN -> Lokale Datei
- "https://cdnjs.cloudflare.com/ajax/libs/Chart.js/4.4.0/chart.min.js": "/static/js/charts/chart.min.js",
- "https://cdn.jsdelivr.net/npm/chart.js": "/static/js/charts/chart.min.js",
-
- # FontAwesome (bereits lokal verfügbar)
- "https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css": "/static/fontawesome/css/all.min.css",
-
- # Weitere CDN-Fallbacks können hier hinzugefügt werden
-}
-
-def get_local_asset_path(cdn_url: str) -> Optional[str]:
- """
- Gibt den lokalen Pfad für eine CDN-URL zurück.
-
- Args:
- cdn_url: URL des CDN-Assets
-
- Returns:
- str: Lokaler Pfad oder None wenn kein Fallback verfügbar
- """
- return CDN_FALLBACKS.get(cdn_url)
-
-def replace_cdn_links(html_content: str) -> str:
- """
- Ersetzt CDN-Links durch lokale Fallbacks im HTML-Inhalt.
-
- Args:
- html_content: HTML-Inhalt mit CDN-Links
-
- Returns:
- str: HTML-Inhalt mit lokalen Links
- """
- if not OFFLINE_MODE:
- return html_content
-
- modified_content = html_content
-
- for cdn_url, local_path in CDN_FALLBACKS.items():
- if cdn_url in modified_content:
- modified_content = modified_content.replace(cdn_url, local_path)
- logger.info(f"🔄 CDN-Link ersetzt: {cdn_url} -> {local_path}")
-
- return modified_content
-
-# ===== SECURITY POLICY ANPASSUNGEN =====
-
-def get_offline_csp_policy() -> Dict[str, List[str]]:
- """
- Gibt CSP-Policy für Offline-Modus zurück.
- Entfernt externe CDN-Domains aus der Policy.
-
- Returns:
- Dict: CSP-Policy ohne externe Domains
- """
- if not OFFLINE_MODE:
- # Online-Modus: Originale Policy mit CDNs
- return {
- "script-src": [
- "'self'",
- "'unsafe-inline'",
- "'unsafe-eval'",
- "https://cdn.jsdelivr.net",
- "https://unpkg.com",
- "https://cdnjs.cloudflare.com"
- ],
- "style-src": [
- "'self'",
- "'unsafe-inline'",
- "https://fonts.googleapis.com",
- "https://cdn.jsdelivr.net"
- ],
- "font-src": [
- "'self'",
- "https://fonts.gstatic.com"
- ]
- }
- else:
- # Offline-Modus: Nur lokale Ressourcen
- return {
- "script-src": [
- "'self'",
- "'unsafe-inline'",
- "'unsafe-eval'"
- ],
- "style-src": [
- "'self'",
- "'unsafe-inline'"
- ],
- "font-src": [
- "'self'"
- ],
- "img-src": [
- "'self'",
- "data:"
- ]
- }
-
-# ===== OFFLINE-MODUS HILFSFUNKTIONEN =====
-
-def log_offline_mode_status():
- """Loggt den aktuellen Offline-Modus Status."""
- if OFFLINE_MODE:
- logger.info("🌐 System läuft im OFFLINE-MODUS")
- logger.info(" ❌ OAuth deaktiviert")
- logger.info(" ❌ E-Mail-Versand deaktiviert (nur Logging)")
- logger.info(" ❌ CDN-Links werden durch lokale Dateien ersetzt")
- logger.info(" ✅ Alle Kernfunktionen verfügbar")
- else:
- logger.info("🌐 System läuft im ONLINE-MODUS")
- logger.info(" ✅ OAuth verfügbar")
- logger.info(" ✅ E-Mail-Versand verfügbar")
- logger.info(" ✅ CDN-Links verfügbar")
-
-def get_feature_availability() -> Dict[str, bool]:
- """
- Gibt die Verfügbarkeit verschiedener Features zurück.
-
- Returns:
- Dict: Feature-Verfügbarkeit
- """
- return {
- "oauth": is_oauth_available(),
- "email_sending": is_email_sending_available(),
- "cdn_resources": is_cdn_available(),
- "offline_mode": OFFLINE_MODE,
- "core_functionality": True, # Kernfunktionen immer verfügbar
- "printer_control": True, # Drucker-Steuerung immer verfügbar
- "job_management": True, # Job-Verwaltung immer verfügbar
- "user_management": True # Benutzer-Verwaltung immer verfügbar
- }
-
-# ===== STARTUP-FUNKTIONEN =====
-
-def initialize_offline_mode():
- """Initialisiert den Offline-Modus beim System-Start."""
- log_offline_mode_status()
-
- if OFFLINE_MODE:
- logger.info("🔧 Initialisiere Offline-Modus-Anpassungen...")
-
- # Prüfe ob lokale Chart.js verfügbar ist
- chart_js_path = "static/js/charts/chart.min.js"
- if not os.path.exists(chart_js_path):
- logger.warning(f"⚠️ Lokale Chart.js nicht gefunden: {chart_js_path}")
- logger.warning(" Diagramme könnten nicht funktionieren")
- else:
- logger.info(f"✅ Lokale Chart.js gefunden: {chart_js_path}")
-
- # Prüfe weitere lokale Assets
- fontawesome_path = "static/fontawesome/css/all.min.css"
- if not os.path.exists(fontawesome_path):
- logger.warning(f"⚠️ Lokale FontAwesome nicht gefunden: {fontawesome_path}")
- else:
- logger.info(f"✅ Lokale FontAwesome gefunden: {fontawesome_path}")
-
- logger.info("✅ Offline-Modus erfolgreich initialisiert")
-
-# Beim Import automatisch initialisieren
-initialize_offline_mode()
\ No newline at end of file
diff --git a/backend/utils/test_button_functionality.py b/backend/utils/test_button_functionality.py
deleted file mode 100644
index 4b57d6750..000000000
--- a/backend/utils/test_button_functionality.py
+++ /dev/null
@@ -1,243 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-"""
-Test-Skript für Button-Funktionalitäten
-Testet alle Buttons aus dem Selenium-Test auf echte Funktionalität
-"""
-
-import requests
-import time
-import json
-from selenium import webdriver
-from selenium.webdriver.common.by import By
-from selenium.webdriver.support.ui import WebDriverWait
-from selenium.webdriver.support import expected_conditions as EC
-
-class ButtonFunctionalityTester:
- def __init__(self, base_url="http://127.0.0.1:5000"):
- self.base_url = base_url
- self.session = requests.Session()
- self.driver = None
-
- def setup_driver(self):
- """Selenium WebDriver einrichten"""
- try:
- self.driver = webdriver.Chrome()
- self.driver.set_window_size(1696, 1066)
- print("✅ WebDriver erfolgreich initialisiert")
- except Exception as e:
- print(f"❌ Fehler beim Initialisieren des WebDrivers: {e}")
-
- def login(self, username="admin", password="admin"):
- """Anmeldung durchführen"""
- try:
- self.driver.get(f"{self.base_url}/auth/login")
-
- # Warten bis Login-Formular geladen ist
- username_field = WebDriverWait(self.driver, 10).until(
- EC.presence_of_element_located((By.NAME, "username"))
- )
-
- username_field.send_keys(username)
- self.driver.find_element(By.NAME, "password").send_keys(password)
- self.driver.find_element(By.XPATH, "//button[@type='submit']").click()
-
- # Warten bis Dashboard geladen ist
- WebDriverWait(self.driver, 10).until(
- EC.url_contains("/dashboard")
- )
-
- print("✅ Erfolgreich angemeldet")
- return True
- except Exception as e:
- print(f"❌ Fehler bei der Anmeldung: {e}")
- return False
-
- def test_button_functionality(self, button_selector, button_name, expected_action=""):
- """Teste einen einzelnen Button auf Funktionalität"""
- try:
- print(f"\n🔍 Teste Button: {button_name} ({button_selector})")
-
- # Button finden
- button = WebDriverWait(self.driver, 5).until(
- EC.element_to_be_clickable((By.CSS_SELECTOR, button_selector))
- )
-
- # Ursprünglichen Zustand erfassen
- original_url = self.driver.current_url
- original_text = button.text if button.text else "Kein Text"
-
- print(f" 📍 Button gefunden: '{original_text}'")
-
- # Button klicken
- button.click()
- print(f" 👆 Button geklickt")
-
- # Kurz warten für Reaktion
- time.sleep(1)
-
- # Reaktion prüfen
- reactions = []
-
- # URL-Änderung prüfen
- if self.driver.current_url != original_url:
- reactions.append(f"URL-Änderung: {self.driver.current_url}")
-
- # Modal-Fenster prüfen
- try:
- modal = self.driver.find_element(By.CSS_SELECTOR, ".fixed.inset-0")
- if modal.is_displayed():
- reactions.append("Modal-Fenster geöffnet")
- except:
- pass
-
- # Loading-Spinner prüfen
- try:
- spinner = self.driver.find_element(By.CSS_SELECTOR, ".animate-spin")
- if spinner.is_displayed():
- reactions.append("Loading-Animation aktiv")
- except:
- pass
-
- # Toast-Benachrichtigung prüfen
- try:
- toast = self.driver.find_element(By.CSS_SELECTOR, ".fixed.top-4.right-4")
- if toast.is_displayed():
- reactions.append(f"Toast-Nachricht: {toast.text}")
- except:
- pass
-
- # Button-Text-Änderung prüfen
- new_text = button.text if button.text else "Kein Text"
- if new_text != original_text:
- reactions.append(f"Text-Änderung: '{original_text}' → '{new_text}'")
-
- # Ergebnis ausgeben
- if reactions:
- print(f" ✅ Reaktionen gefunden:")
- for reaction in reactions:
- print(f" - {reaction}")
- return True
- else:
- print(f" ⚠️ Keine sichtbare Reaktion erkannt")
- return False
-
- except Exception as e:
- print(f" ❌ Fehler beim Testen: {e}")
- return False
-
- def test_all_buttons(self):
- """Teste alle Buttons aus dem Selenium-Test"""
- if not self.setup_driver():
- return
-
- if not self.login():
- return
-
- # Button-Test-Plan basierend auf Selenium-Test
- button_tests = [
- # Dashboard-Seite (Startseite)
- {
- "page": f"{self.base_url}/",
- "buttons": [
- (".mb-8 > .btn-primary", "Haupt-CTA Button"),
- (".btn-primary > span", "CTA Button Span")
- ]
- },
-
- # Dashboard-Seite
- {
- "page": f"{self.base_url}/dashboard",
- "buttons": [
- ("#refreshDashboard > span", "Dashboard Aktualisieren")
- ]
- },
-
- # Drucker-Seite
- {
- "page": f"{self.base_url}/printers",
- "buttons": [
- ("#refresh-button > span", "Drucker Aktualisieren"),
- ("#maintenance-toggle > span", "Wartungsmodus Toggle")
- ]
- },
-
- # Jobs-Seite
- {
- "page": f"{self.base_url}/jobs",
- "buttons": [
- ("#batch-toggle > span", "Mehrfachauswahl Toggle"),
- ("#refresh-button > span", "Jobs Aktualisieren")
- ]
- },
-
- # Admin-Seite
- {
- "page": f"{self.base_url}/admin",
- "buttons": [
- ("#analytics-btn", "Analytics Button"),
- ("#maintenance-btn", "Wartung Button"),
- ("#system-status-btn", "System Status Button"),
- ("#add-user-btn", "Benutzer hinzufügen")
- ]
- }
- ]
-
- results = {"total": 0, "working": 0, "broken": 0}
-
- print("🚀 Starte umfassenden Button-Funktionalitäts-Test...\n")
-
- for test_group in button_tests:
- print(f"📄 Navigiere zu Seite: {test_group['page']}")
-
- try:
- self.driver.get(test_group["page"])
- time.sleep(2) # Seite laden lassen
-
- for selector, name in test_group["buttons"]:
- results["total"] += 1
- if self.test_button_functionality(selector, name):
- results["working"] += 1
- else:
- results["broken"] += 1
-
- except Exception as e:
- print(f"❌ Fehler beim Laden der Seite {test_group['page']}: {e}")
-
- # Zusammenfassung
- print(f"\n{'='*60}")
- print(f"📊 TEST-ZUSAMMENFASSUNG")
- print(f"{'='*60}")
- print(f"Getestete Buttons gesamt: {results['total']}")
- print(f"✅ Funktional: {results['working']}")
- print(f"❌ Nicht funktional: {results['broken']}")
-
- success_rate = (results['working'] / results['total']) * 100 if results['total'] > 0 else 0
- print(f"📈 Erfolgsrate: {success_rate:.1f}%")
-
- if success_rate >= 90:
- print("🎉 AUSGEZEICHNET! Fast alle Buttons funktionieren korrekt.")
- elif success_rate >= 75:
- print("✅ GUT! Die meisten Buttons funktionieren korrekt.")
- elif success_rate >= 50:
- print("⚠️ BEFRIEDIGEND! Einige Buttons benötigen noch Verbesserungen.")
- else:
- print("❌ VERBESSERUNG ERFORDERLICH! Viele Buttons haben keine Funktionalität.")
-
- def cleanup(self):
- """Aufräumen"""
- if self.driver:
- self.driver.quit()
- print("🧹 WebDriver beendet")
-
-def main():
- """Hauptfunktion"""
- tester = ButtonFunctionalityTester()
-
- try:
- tester.test_all_buttons()
- finally:
- tester.cleanup()
-
-if __name__ == "__main__":
- main()
\ No newline at end of file
diff --git a/backend/utils/test_p110.py b/backend/utils/test_p110.py
deleted file mode 100644
index 06fc27980..000000000
--- a/backend/utils/test_p110.py
+++ /dev/null
@@ -1,175 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-
-"""
-P110-TAPO-TEST - Speziell für TP-Link Tapo P110-Steckdosen
-Testet verschiedene Versionen des PyP100-Moduls
-"""
-
-import sys
-import time
-import socket
-import subprocess
-from datetime import datetime
-
-# Anmeldedaten
-TAPO_USERNAME = "till.tomczak@mercedes-benz.com"
-TAPO_PASSWORD = "744563017196"
-
-# Standard-IP-Adressen zum Testen (anpassen an tatsächliche IPs)
-TEST_IPS = [
- "192.168.0.103", # Diese IPs waren erreichbar im vorherigen Test
- "192.168.0.104"
-]
-
-def log(message):
- """Logge eine Nachricht mit Zeitstempel"""
- timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- print(f"[{timestamp}] {message}")
-
-def check_connection(ip, port=80, timeout=1):
- """Prüft eine TCP-Verbindung"""
- try:
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.settimeout(timeout)
- result = sock.connect_ex((ip, port))
- sock.close()
- return result == 0
- except:
- return False
-
-def install_package(package):
- """Installiert ein Python-Paket"""
- try:
- log(f"Installiere {package}...")
- subprocess.run([sys.executable, "-m", "pip", "install", package, "--force-reinstall"], check=True)
- log(f"✅ {package} erfolgreich installiert")
- return True
- except Exception as e:
- log(f"❌ Fehler bei Installation von {package}: {e}")
- return False
-
-def test_p110_connection():
- """Testet verschiedene Möglichkeiten, um mit P110-Steckdosen zu kommunizieren"""
-
- log("🚀 TAPO P110 TEST - STARTER")
- log(f"👤 Benutzername: {TAPO_USERNAME}")
- log(f"🔑 Passwort: {TAPO_PASSWORD}")
-
- # Verfügbare Module testen
- log("\n1️⃣ SCHRITT 1: Teste verfügbare Module")
-
- try:
- from PyP100 import PyP110
- log("✅ PyP100 Modul gefunden")
- except ImportError:
- log("❌ PyP100 Modul nicht gefunden")
- install_package("PyP100==0.1.2")
-
- try:
- from PyP100 import PyP110
- log("✅ PyP100 Modul jetzt installiert")
- except ImportError:
- log("❌ Konnte PyP100 nicht importieren")
- return
-
- # Erreichbare Steckdosen finden
- log("\n2️⃣ SCHRITT 2: Suche erreichbare IPs")
-
- available_ips = []
- for ip in TEST_IPS:
- if check_connection(ip):
- log(f"✅ IP {ip} ist erreichbar")
- available_ips.append(ip)
- else:
- log(f"❌ IP {ip} nicht erreichbar")
-
- if not available_ips:
- log("❌ Keine erreichbaren IPs gefunden!")
- return
-
- # P110-Verbindung testen
- log("\n3️⃣ SCHRITT 3: Teste PyP100 Bibliothek")
-
- for ip in available_ips:
- try:
- log(f"🔄 Verbinde zu Steckdose {ip} mit PyP100.PyP110...")
-
- # Neue Instanz erstellen
- from PyP100 import PyP110
- p110 = PyP110.P110(ip, TAPO_USERNAME, TAPO_PASSWORD)
-
- # Handshake und Login
- log(" Handshake...")
- p110.handshake()
- log(" Login...")
- p110.login()
-
- # Geräteinformationen abrufen
- log(" Geräteinformationen abrufen...")
- device_info = p110.getDeviceInfo()
-
- # Erfolg!
- log(f"✅ ERFOLG! Steckdose {ip} gefunden")
- log(f" Name: {device_info.get('nickname', 'Unbekannt')}")
- log(f" Status: {'Eingeschaltet' if device_info.get('device_on', False) else 'Ausgeschaltet'}")
-
- # Ein-/Ausschalten testen
- if "--toggle" in sys.argv:
- current_state = device_info.get('device_on', False)
-
- if current_state:
- log(" Schalte AUS...")
- p110.turnOff()
- else:
- log(" Schalte EIN...")
- p110.turnOn()
-
- time.sleep(1)
-
- # Status prüfen
- device_info = p110.getDeviceInfo()
- new_state = device_info.get('device_on', False)
- log(f" Neuer Status: {'Eingeschaltet' if new_state else 'Ausgeschaltet'}")
-
- return True
-
- except Exception as e:
- log(f"❌ Fehler bei Verbindung zu {ip}: {e}")
-
- # Alternative Bibliothek testen
- log("\n4️⃣ SCHRITT 4: Teste PyP100 mit alternativer Version")
-
- if install_package("pytapo==1.1.2"):
- try:
- import pytapo
- from pytapo.tapo import Tapo
-
- for ip in available_ips:
- try:
- log(f"🔄 Verbinde zu Steckdose {ip} mit pytapo...")
-
- # Neue Verbindung
- tapo = Tapo(ip, TAPO_USERNAME, TAPO_PASSWORD)
-
- # Geräteinformationen abrufen
- device_info = tapo.get_device_info()
-
- # Erfolg!
- log(f"✅ ERFOLG mit pytapo! Steckdose {ip} gefunden")
- log(f" Name: {device_info.get('nickname', 'Unbekannt')}")
-
- return True
-
- except Exception as e:
- log(f"❌ Fehler bei pytapo-Verbindung zu {ip}: {e}")
- except Exception as e:
- log(f"❌ Fehler beim Import von pytapo: {e}")
-
- log("\n❌ Keine funktionierenden Tapo-Steckdosen gefunden!")
- log("Bitte überprüfen Sie die Anmeldedaten und IP-Adressen")
-
-if __name__ == "__main__":
- print("\n======= TAPO P110 TEST =======\n")
- test_p110_connection()
- print("\n======= TEST BEENDET =======\n")
\ No newline at end of file
diff --git a/backend/utils/test_tapo_direkt.py b/backend/utils/test_tapo_direkt.py
deleted file mode 100644
index 2aadf1242..000000000
--- a/backend/utils/test_tapo_direkt.py
+++ /dev/null
@@ -1,212 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-
-"""
-DIREKT-TEST für TP-Link Tapo P110-Steckdosen
-Umgeht Ping-Befehle und testet direkte TCP-Verbindung
-"""
-
-import sys
-import os
-import socket
-import time
-from datetime import datetime
-
-# Anmeldedaten für Tapo-Steckdosen
-TAPO_USERNAME = "till.tomczak@mercedes-benz.com"
-TAPO_PASSWORD = "744563017196A"
-
-# Standard-IPs für Tapo-Steckdosen
-# (falls nicht verfügbar, passen Sie diese an die tatsächlichen IPs in Ihrem Netzwerk an)
-TAPO_IPS = [
- # Typische IP-Bereiche
- "192.168.1.100",
- "192.168.1.101",
- "192.168.1.102",
- "192.168.1.103",
- "192.168.1.104",
- "192.168.1.105",
- "192.168.0.100",
- "192.168.0.101",
- "192.168.0.102",
- "192.168.0.103",
- "192.168.0.104",
- "192.168.0.105",
-
- # Mercedes-Benz Netzwerk spezifisch
- "10.0.0.100",
- "10.0.0.101",
- "10.0.0.102",
- "10.0.0.103",
- "10.0.0.104",
- "10.0.0.105",
-
- # Zusätzliche mögliche IPs
- "192.168.178.100",
- "192.168.178.101",
- "192.168.178.102",
- "192.168.178.103",
- "192.168.178.104",
- "192.168.178.105",
-]
-
-def log_message(message, level="INFO"):
- """Logge eine Nachricht mit Zeitstempel"""
- timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- print(f"[{timestamp}] [{level}] {message}")
-
-def check_tcp_connection(host, port=80, timeout=1):
- """
- Prüft ob eine TCP-Verbindung zu einem Host und Port möglich ist.
- Vermeidet Ping und charmap-Probleme.
-
- Args:
- host: Hostname oder IP-Adresse
- port: TCP-Port (Standard: 80)
- timeout: Timeout in Sekunden
-
- Returns:
- bool: True wenn Verbindung erfolgreich
- """
- try:
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.settimeout(timeout)
- result = sock.connect_ex((host, port))
- sock.close()
- return result == 0
- except:
- return False
-
-def test_tapo_connection():
- """
- Testet die Verbindung zu TP-Link Tapo P110-Steckdosen.
- """
- log_message("🔄 Überprüfe ob PyP100-Modul installiert ist...")
-
- try:
- from PyP100 import PyP110
- log_message("✅ PyP100-Modul erfolgreich importiert")
- except ImportError:
- log_message("❌ PyP100-Modul nicht installiert", "ERROR")
- log_message(" Installiere jetzt mit: pip install PyP100==0.1.2")
-
- try:
- import subprocess
- subprocess.run([sys.executable, "-m", "pip", "install", "PyP100==0.1.2"], check=True)
- log_message("✅ PyP100-Modul erfolgreich installiert")
-
- # Erneut importieren
- from PyP100 import PyP110
- except Exception as e:
- log_message(f"❌ Fehler bei Installation von PyP100: {str(e)}", "ERROR")
- log_message(" Bitte installieren Sie manuell: pip install PyP100==0.1.2")
- return
-
- log_message("🔍 Starte Test für Tapo-Steckdosen...")
- log_message(f"🔐 Anmeldedaten: {TAPO_USERNAME} / {TAPO_PASSWORD}")
-
- successful_connections = 0
- found_ips = []
-
- for ip in TAPO_IPS:
- log_message(f"🔄 Teste IP-Adresse: {ip}")
-
- # TCP-Verbindungstest für Grundkonnektivität (Port 80 für HTTP)
- conn_success = check_tcp_connection(ip, port=80)
-
- if not conn_success:
- # Alternativ Port 443 testen für HTTPS
- conn_success = check_tcp_connection(ip, port=443)
-
- if not conn_success:
- log_message(f" ❌ IP {ip} nicht erreichbar (Verbindung fehlgeschlagen)")
- continue
-
- log_message(f" ✅ IP {ip} ist erreichbar (TCP-Verbindung erfolgreich)")
-
- # Tapo-Verbindung testen
- try:
- log_message(f" 🔄 Verbinde zu Tapo-Steckdose {ip}...")
- p110 = PyP110.P110(ip, TAPO_USERNAME, TAPO_PASSWORD)
- p110.handshake() # Authentifizierung
- p110.login() # Login
-
- # Geräteinformationen abrufen
- device_info = p110.getDeviceInfo()
-
- # Status abrufen
- is_on = device_info.get('device_on', False)
- nickname = device_info.get('nickname', "Unbekannt")
-
- log_message(f" ✅ Verbindung zu Tapo-Steckdose '{nickname}' ({ip}) erfolgreich")
- log_message(f" 📱 Gerätename: {nickname}")
- log_message(f" ⚡ Status: {'Eingeschaltet' if is_on else 'Ausgeschaltet'}")
-
- if 'on_time' in device_info:
- on_time = device_info.get('on_time', 0)
- hours, minutes = divmod(on_time // 60, 60)
- log_message(f" ⏱️ Betriebszeit: {hours}h {minutes}m")
-
- successful_connections += 1
- found_ips.append(ip)
-
- # Steckdose testen: EIN/AUS
- if len(sys.argv) > 1 and sys.argv[1] == '--toggle':
- if is_on:
- log_message(f" 🔄 Schalte Steckdose {nickname} AUS...")
- p110.turnOff()
- log_message(f" ✅ Steckdose ausgeschaltet")
- else:
- log_message(f" 🔄 Schalte Steckdose {nickname} EIN...")
- p110.turnOn()
- log_message(f" ✅ Steckdose eingeschaltet")
-
- # Kurze Pause
- time.sleep(1)
-
- # Status erneut abrufen
- device_info = p110.getDeviceInfo()
- is_on = device_info.get('device_on', False)
- log_message(f" ⚡ Neuer Status: {'Eingeschaltet' if is_on else 'Ausgeschaltet'}")
-
- except Exception as e:
- log_message(f" ❌ Verbindung zu Tapo-Steckdose {ip} fehlgeschlagen: {str(e)}", "ERROR")
-
- # Zusammenfassung
- log_message("\n📊 Zusammenfassung:")
- log_message(f" Getestete IPs: {len(TAPO_IPS)}")
- log_message(f" Gefundene Tapo-Steckdosen: {successful_connections}")
-
- if successful_connections > 0:
- log_message("✅ Tapo-Steckdosen erfolgreich gefunden und getestet!")
- log_message(f"📝 Gefundene IPs: {found_ips}")
-
- # Ausgabe für Konfiguration
- log_message("\n🔧 Konfiguration für settings.py:")
- log_message(f"""
-# TP-Link Tapo Standard-Anmeldedaten
-TAPO_USERNAME = "{TAPO_USERNAME}"
-TAPO_PASSWORD = "{TAPO_PASSWORD}"
-
-# Automatische Steckdosen-Erkennung aktivieren
-TAPO_AUTO_DISCOVERY = True
-
-# Standard-Steckdosen-IPs
-DEFAULT_TAPO_IPS = {found_ips}
-""")
- else:
- log_message("❌ Keine Tapo-Steckdosen gefunden!", "ERROR")
- log_message(" Bitte überprüfen Sie die IP-Adressen und Anmeldedaten")
-
- # Fehlerbehebungs-Tipps
- log_message("\n🔧 Fehlerbehebungs-Tipps:")
- log_message("1. Stellen Sie sicher, dass die Steckdosen mit dem WLAN verbunden sind")
- log_message("2. Prüfen Sie die IP-Adressen in der Tapo-App oder im Router")
- log_message("3. Stellen Sie sicher, dass die Anmeldedaten korrekt sind")
- log_message("4. Prüfen Sie ob die Steckdosen über die Tapo-App erreichbar sind")
- log_message("5. Führen Sie einen Neustart der Steckdosen durch (aus- und wieder einstecken)")
-
-if __name__ == "__main__":
- print("\n====== TAPO P110 DIREKT-TEST (OHNE PING) ======\n")
- test_tapo_connection()
- print("\n====== TEST ABGESCHLOSSEN ======\n")
\ No newline at end of file
diff --git a/backend/utils/test_tapo_sofort.py b/backend/utils/test_tapo_sofort.py
deleted file mode 100644
index c8030a457..000000000
--- a/backend/utils/test_tapo_sofort.py
+++ /dev/null
@@ -1,132 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-
-"""
-SOFORT-TEST für TP-Link Tapo P110-Steckdosen
-Nutzt direkt PyP100 mit hardkodierten Anmeldedaten
-"""
-
-import os
-import sys
-import time
-from datetime import datetime
-
-# TAPO Anmeldedaten direkt hardkodiert (wie in den funktionierenden Versionen)
-os.environ["TAPO_USERNAME"] = "till.tomczak@mercedes-benz.com"
-os.environ["TAPO_PASSWORD"] = "744563017196A" # Das 'A' am Ende ist wichtig
-
-# IPs der Steckdosen
-TAPO_IPS = [
- "192.168.0.103",
- "192.168.0.104",
- "192.168.0.100",
- "192.168.0.101",
- "192.168.0.102"
-]
-
-def log(msg):
- """Protokolliert eine Nachricht mit Zeitstempel"""
- timestamp = datetime.now().strftime("%H:%M:%S")
- print(f"[{timestamp}] {msg}")
-
-def test_connection():
- """Teste Verbindung zu den Steckdosen"""
- log("🔄 Teste PyP100-Import...")
-
- try:
- from PyP100 import PyP100
- log("✅ PyP100-Modul erfolgreich importiert")
- except ImportError:
- log("❌ PyP100-Modul nicht gefunden. Installiere es...")
- try:
- import subprocess
- subprocess.run([sys.executable, "-m", "pip", "install", "PyP100==0.0.12"], check=True)
- from PyP100 import PyP100
- log("✅ PyP100-Modul installiert")
- except Exception as e:
- log(f"❌ Fehler bei Installation: {str(e)}")
- return False
-
- # Anmeldedaten aus Umgebungsvariablen lesen
- username = os.environ.get("TAPO_USERNAME")
- password = os.environ.get("TAPO_PASSWORD")
-
- log(f"👤 Benutzername: {username}")
- log(f"🔑 Passwort: {password}")
-
- # Teste jede IP
- success = False
-
- for ip in TAPO_IPS:
- log(f"🔄 Teste Steckdose mit IP: {ip}")
-
- try:
- # Wichtig: Verwende PyP100 (nicht PyP110) wie in den funktionierenden Versionen
- p100 = PyP100.P100(ip, username, password)
-
- # Handshake und Login
- log(" 🔄 Handshake...")
- p100.handshake()
-
- log(" 🔄 Login...")
- p100.login()
-
- # Status abfragen
- log(" 🔄 Status abfragen...")
- device_info = p100.getDeviceInfo()
-
- # Erfolg!
- state = "Eingeschaltet" if device_info.get("device_on", False) else "Ausgeschaltet"
- log(f"✅ ERFOLG! Steckdose {ip} erfolgreich verbunden")
- log(f" 📱 Name: {device_info.get('nickname', 'Unbekannt')}")
- log(f" ⚡ Status: {state}")
-
- # Steckdose ein-/ausschalten wenn gewünscht
- if "--toggle" in sys.argv:
- if device_info.get("device_on", False):
- log(" 🔄 Schalte Steckdose AUS...")
- p100.turnOff()
- else:
- log(" 🔄 Schalte Steckdose EIN...")
- p100.turnOn()
-
- time.sleep(1)
-
- # Neuen Status abrufen
- device_info = p100.getDeviceInfo()
- state = "Eingeschaltet" if device_info.get("device_on", False) else "Ausgeschaltet"
- log(f" ⚡ Neuer Status: {state}")
-
- success = True
-
- # Konfiguration für settings.py ausgeben
- log("\n✅ KONFIGURATION FÜR SETTINGS.PY:")
- log(f"""
-# TP-Link Tapo Standard-Anmeldedaten
-TAPO_USERNAME = "{username}"
-TAPO_PASSWORD = "{password}"
-
-# Standard-Steckdosen-IPs
-DEFAULT_TAPO_IPS = ["{ip}"]
-""")
-
- # Nur die erste erfolgreiche Steckdose testen
- break
-
- except Exception as e:
- log(f"❌ Fehler bei Steckdose {ip}: {str(e)}")
-
- if not success:
- log("\n❌ Keine Tapo-Steckdose konnte verbunden werden!")
- log("Prüfen Sie folgende mögliche Ursachen:")
- log("1. Steckdosen sind nicht eingesteckt oder mit dem WLAN verbunden")
- log("2. IP-Adressen sind falsch")
- log("3. Anmeldedaten sind falsch (prüfen Sie das 'A' am Ende des Passworts)")
- log("4. Netzwerkprobleme verhindern den Zugriff")
-
- return success
-
-if __name__ == "__main__":
- print("\n====== TAPO P110 SOFORT-TEST ======\n")
- test_connection()
- print("\n====== TEST BEENDET ======\n")
\ No newline at end of file
diff --git a/backend/utils/update_requirements.py b/backend/utils/update_requirements.py
deleted file mode 100644
index 84d50ec1d..000000000
--- a/backend/utils/update_requirements.py
+++ /dev/null
@@ -1,295 +0,0 @@
-#!/usr/bin/env python3
-"""
-MYP Platform - Requirements Update Script
-Aktualisiert die Requirements basierend auf tatsächlich verwendeten Imports
-"""
-
-import os
-import sys
-import subprocess
-import ast
-import importlib.util
-from pathlib import Path
-from typing import Set, List, Dict
-
-def get_imports_from_file(file_path: Path) -> Set[str]:
- """Extrahiert alle Import-Statements aus einer Python-Datei."""
- imports = set()
-
- try:
- with open(file_path, 'r', encoding='utf-8') as f:
- content = f.read()
-
- tree = ast.parse(content)
-
- for node in ast.walk(tree):
- if isinstance(node, ast.Import):
- for alias in node.names:
- imports.add(alias.name.split('.')[0])
- elif isinstance(node, ast.ImportFrom):
- if node.module:
- imports.add(node.module.split('.')[0])
-
- except Exception as e:
- print(f"Fehler beim Parsen von {file_path}: {e}")
-
- return imports
-
-def get_all_imports(project_root: Path) -> Set[str]:
- """Sammelt alle Imports aus dem Projekt."""
- all_imports = set()
-
- # Wichtige Dateien analysieren
- important_files = [
- 'app.py',
- 'models.py',
- 'utils/rate_limiter.py',
- 'utils/job_scheduler.py',
- 'utils/queue_manager.py',
- 'utils/ssl_manager.py',
- 'utils/security.py',
- 'utils/permissions.py',
- 'utils/analytics.py',
- 'utils/template_helpers.py',
- 'utils/logging_config.py'
- ]
-
- for file_path in important_files:
- full_path = project_root / file_path
- if full_path.exists():
- imports = get_imports_from_file(full_path)
- all_imports.update(imports)
- print(f"✓ Analysiert: {file_path} ({len(imports)} Imports)")
-
- return all_imports
-
-def get_package_mapping() -> Dict[str, str]:
- """Mapping von Import-Namen zu PyPI-Paketnamen."""
- return {
- 'flask': 'Flask',
- 'flask_login': 'Flask-Login',
- 'flask_wtf': 'Flask-WTF',
- 'sqlalchemy': 'SQLAlchemy',
- 'werkzeug': 'Werkzeug',
- 'bcrypt': 'bcrypt',
- 'cryptography': 'cryptography',
- 'PyP100': 'PyP100',
- 'redis': 'redis',
- 'requests': 'requests',
- 'jinja2': 'Jinja2',
- 'markupsafe': 'MarkupSafe',
- 'itsdangerous': 'itsdangerous',
- 'psutil': 'psutil',
- 'click': 'click',
- 'blinker': 'blinker',
- 'pywin32': 'pywin32',
- 'pytest': 'pytest',
- 'gunicorn': 'gunicorn'
- }
-
-def get_current_versions() -> Dict[str, str]:
- """Holt die aktuell installierten Versionen."""
- versions = {}
-
- try:
- result = subprocess.run(['pip', 'list', '--format=freeze'],
- capture_output=True, text=True,
- encoding='utf-8', errors='replace')
-
- for line in result.stdout.strip().split('\n'):
- if '==' in line:
- package, version = line.split('==', 1)
- versions[package.lower()] = version
-
- except Exception as e:
- print(f"Fehler beim Abrufen der Versionen: {e}")
-
- return versions
-
-def check_package_availability(package: str, version: str = None) -> bool:
- """Prüft, ob ein Paket in der angegebenen Version verfügbar ist."""
- try:
- if version:
- cmd = ['pip', 'index', 'versions', package]
- else:
- cmd = ['pip', 'show', package]
-
- result = subprocess.run(cmd, capture_output=True, text=True,
- encoding='utf-8', errors='replace')
- return result.returncode == 0
-
- except Exception:
- return False
-
-def generate_requirements(imports: Set[str], versions: Dict[str, str]) -> List[str]:
- """Generiert die Requirements-Liste."""
- package_mapping = get_package_mapping()
- requirements = []
-
- # Standard-Bibliotheken, die nicht installiert werden müssen
- stdlib_modules = {
- 'os', 'sys', 'logging', 'atexit', 'datetime', 'time', 'subprocess',
- 'json', 'signal', 'threading', 'functools', 'typing', 'contextlib',
- 'secrets', 'hashlib', 'calendar', 'random', 'socket', 'ipaddress',
- 'enum', 'dataclasses', 'concurrent', 'collections'
- }
-
- # Lokale Module ausschließen
- local_modules = {
- 'models', 'utils', 'config', 'blueprints'
- }
-
- # Nur externe Pakete berücksichtigen
- external_imports = imports - stdlib_modules - local_modules
-
- for import_name in sorted(external_imports):
- package_name = package_mapping.get(import_name, import_name)
-
- # Version aus installierten Paketen holen
- version = versions.get(package_name.lower())
-
- if version and check_package_availability(package_name, version):
- if package_name == 'pywin32':
- requirements.append(f"{package_name}=={version}; sys_platform == \"win32\"")
- else:
- requirements.append(f"{package_name}=={version}")
- else:
- print(f"⚠️ Paket {package_name} nicht gefunden oder Version unbekannt")
-
- return requirements
-
-def write_requirements_file(requirements: List[str], output_file: Path):
- """Schreibt die Requirements in eine Datei."""
- header = """# MYP Platform - Python Dependencies
-# Basierend auf tatsächlich verwendeten Imports in app.py
-# Automatisch generiert am: {date}
-# Installiere mit: pip install -r requirements.txt
-
-# ===== CORE FLASK FRAMEWORK =====
-# Direkt in app.py verwendet
-{flask_requirements}
-
-# ===== DATENBANK =====
-# SQLAlchemy für Datenbankoperationen (models.py, app.py)
-{db_requirements}
-
-# ===== SICHERHEIT UND AUTHENTIFIZIERUNG =====
-# Werkzeug für Passwort-Hashing und Utilities (app.py)
-{security_requirements}
-
-# ===== SMART PLUG STEUERUNG =====
-# PyP100 für TP-Link Tapo Smart Plugs (utils/job_scheduler.py)
-{smartplug_requirements}
-
-# ===== RATE LIMITING UND CACHING =====
-# Redis für Rate Limiting (utils/rate_limiter.py) - optional
-{cache_requirements}
-
-# ===== HTTP REQUESTS =====
-# Requests für HTTP-Anfragen (utils/queue_manager.py, utils/debug_drucker_erkennung.py)
-{http_requirements}
-
-# ===== TEMPLATE ENGINE =====
-# Jinja2 und MarkupSafe (automatisch mit Flask installiert, aber explizit für utils/template_helpers.py)
-{template_requirements}
-
-# ===== SYSTEM MONITORING =====
-# psutil für System-Monitoring (utils/debug_utils.py, utils/debug_cli.py)
-{monitoring_requirements}
-
-# ===== ZUSÄTZLICHE CORE ABHÄNGIGKEITEN =====
-# Click für CLI-Kommandos (automatisch mit Flask)
-{core_requirements}
-
-# ===== WINDOWS-SPEZIFISCHE ABHÄNGIGKEITEN =====
-# Nur für Windows-Systeme erforderlich
-{windows_requirements}
-
-# ===== OPTIONAL: ENTWICKLUNG UND TESTING =====
-# Nur für Entwicklungsumgebung
-{dev_requirements}
-
-# ===== OPTIONAL: PRODUKTIONS-SERVER =====
-# Gunicorn für Produktionsumgebung
-{prod_requirements}
-"""
-
- # Requirements kategorisieren
- flask_reqs = [r for r in requirements if any(x in r.lower() for x in ['flask'])]
- db_reqs = [r for r in requirements if 'SQLAlchemy' in r]
- security_reqs = [r for r in requirements if any(x in r for x in ['Werkzeug', 'bcrypt', 'cryptography'])]
- smartplug_reqs = [r for r in requirements if 'PyP100' in r]
- cache_reqs = [r for r in requirements if 'redis' in r]
- http_reqs = [r for r in requirements if 'requests' in r]
- template_reqs = [r for r in requirements if any(x in r for x in ['Jinja2', 'MarkupSafe', 'itsdangerous'])]
- monitoring_reqs = [r for r in requirements if 'psutil' in r]
- core_reqs = [r for r in requirements if any(x in r for x in ['click', 'blinker'])]
- windows_reqs = [r for r in requirements if 'sys_platform' in r]
-
- from datetime import datetime
-
- content = header.format(
- date=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
- flask_requirements='\n'.join(flask_reqs) or '# Keine Flask-spezifischen Requirements',
- db_requirements='\n'.join(db_reqs) or '# Keine Datenbank-Requirements',
- security_requirements='\n'.join(security_reqs) or '# Keine Sicherheits-Requirements',
- smartplug_requirements='\n'.join(smartplug_reqs) or '# Keine Smart Plug Requirements',
- cache_requirements='\n'.join(cache_reqs) or '# Keine Cache-Requirements',
- http_requirements='\n'.join(http_reqs) or '# Keine HTTP-Requirements',
- template_requirements='\n'.join(template_reqs) or '# Keine Template-Requirements',
- monitoring_requirements='\n'.join(monitoring_reqs) or '# Keine Monitoring-Requirements',
- core_requirements='\n'.join(core_reqs) or '# Keine Core-Requirements',
- windows_requirements='\n'.join(windows_reqs) or '# Keine Windows-Requirements',
- dev_requirements='pytest==8.3.4; extra == "dev"\npytest-cov==6.0.0; extra == "dev"',
- prod_requirements='gunicorn==23.0.0; extra == "prod"'
- )
-
- with open(output_file, 'w', encoding='utf-8') as f:
- f.write(content)
-
-def main():
- """Hauptfunktion."""
- print("🔄 MYP Platform Requirements Update")
- print("=" * 50)
-
- # Projekt-Root ermitteln
- project_root = Path(__file__).parent
-
- print(f"📁 Projekt-Verzeichnis: {project_root}")
-
- # Imports sammeln
- print("\n📋 Sammle Imports aus wichtigen Dateien...")
- imports = get_all_imports(project_root)
-
- print(f"\n📦 Gefundene externe Imports: {len(imports)}")
- for imp in sorted(imports):
- print(f" - {imp}")
-
- # Aktuelle Versionen abrufen
- print("\n🔍 Prüfe installierte Versionen...")
- versions = get_current_versions()
-
- # Requirements generieren
- print("\n⚙️ Generiere Requirements...")
- requirements = generate_requirements(imports, versions)
-
- # Requirements-Datei schreiben
- output_file = project_root / 'requirements.txt'
- write_requirements_file(requirements, output_file)
-
- print(f"\n✅ Requirements aktualisiert: {output_file}")
- print(f"📊 {len(requirements)} Pakete in requirements.txt")
-
- # Zusammenfassung
- print("\n📋 Generierte Requirements:")
- for req in requirements:
- print(f" - {req}")
-
- print("\n🎉 Requirements-Update abgeschlossen!")
- print("\nNächste Schritte:")
- print("1. pip install -r requirements.txt")
- print("2. Anwendung testen")
- print("3. requirements-dev.txt und requirements-prod.txt bei Bedarf anpassen")
-
-if __name__ == "__main__":
- main()
\ No newline at end of file