🆗 🚀 📚 Removed unused utility files for code optimization. 🎉🔧📚💄

This commit is contained in:
2025-06-11 14:10:01 +02:00
parent 744edb38b6
commit c4bd6ff4dc
12 changed files with 0 additions and 3529 deletions

View File

@ -1,178 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Skript zum Hinzufügen von Testdruckern zur Datenbank
"""
import sys
import os
from datetime import datetime
# Füge das Anwendungsverzeichnis zum Python-Pfad hinzu
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from models import get_db_session, Printer
def add_test_printers():
"""Fügt Testdrucker zur Datenbank hinzu"""
test_printers = [
{
"name": "Prusa i3 MK3S+",
"model": "Prusa i3 MK3S+",
"location": "Labor A - Arbeitsplatz 1",
"mac_address": "AA:BB:CC:DD:EE:01",
"plug_ip": "192.168.1.101",
"status": "available",
"active": True
},
{
"name": "Ender 3 V2",
"model": "Creality Ender 3 V2",
"location": "Labor A - Arbeitsplatz 2",
"mac_address": "AA:BB:CC:DD:EE:02",
"plug_ip": "192.168.1.102",
"status": "available",
"active": True
},
{
"name": "Ultimaker S3",
"model": "Ultimaker S3",
"location": "Labor B - Arbeitsplatz 1",
"mac_address": "AA:BB:CC:DD:EE:03",
"plug_ip": "192.168.1.103",
"status": "available",
"active": True
},
{
"name": "Bambu Lab X1 Carbon",
"model": "Bambu Lab X1 Carbon",
"location": "Labor B - Arbeitsplatz 2",
"mac_address": "AA:BB:CC:DD:EE:04",
"plug_ip": "192.168.1.104",
"status": "available",
"active": True
},
{
"name": "Formlabs Form 3",
"model": "Formlabs Form 3",
"location": "Labor C - Harz-Bereich",
"mac_address": "AA:BB:CC:DD:EE:05",
"plug_ip": "192.168.1.105",
"status": "offline",
"active": False
}
]
db_session = get_db_session()
try:
added_count = 0
for printer_data in test_printers:
# Prüfen, ob Drucker bereits existiert
existing = db_session.query(Printer).filter(
Printer.name == printer_data["name"]
).first()
if existing:
print(f"⚠️ Drucker '{printer_data['name']}' existiert bereits - überspringe")
continue
# Neuen Drucker erstellen
new_printer = Printer(
name=printer_data["name"],
model=printer_data["model"],
location=printer_data["location"],
mac_address=printer_data["mac_address"],
plug_ip=printer_data["plug_ip"],
status=printer_data["status"],
active=printer_data["active"],
created_at=datetime.now()
)
db_session.add(new_printer)
added_count += 1
print(f"✅ Drucker '{printer_data['name']}' hinzugefügt")
if added_count > 0:
db_session.commit()
print(f"\n🎉 {added_count} Testdrucker erfolgreich zur Datenbank hinzugefügt!")
else:
print("\n📋 Alle Testdrucker existieren bereits in der Datenbank")
# Zeige alle Drucker in der Datenbank
all_printers = db_session.query(Printer).all()
print(f"\n📊 Gesamt {len(all_printers)} Drucker in der Datenbank:")
print("-" * 80)
print(f"{'ID':<4} {'Name':<20} {'Modell':<20} {'Status':<12} {'Aktiv':<6}")
print("-" * 80)
for printer in all_printers:
active_str = "" if printer.active else ""
print(f"{printer.id:<4} {printer.name[:19]:<20} {(printer.model or 'Unbekannt')[:19]:<20} {printer.status:<12} {active_str:<6}")
db_session.close()
except Exception as e:
db_session.rollback()
db_session.close()
print(f"❌ Fehler beim Hinzufügen der Testdrucker: {str(e)}")
return False
return True
def remove_test_printers():
"""Entfernt alle Testdrucker aus der Datenbank"""
test_printer_names = [
"Prusa i3 MK3S+",
"Ender 3 V2",
"Ultimaker S3",
"Bambu Lab X1 Carbon",
"Formlabs Form 3"
]
db_session = get_db_session()
try:
removed_count = 0
for name in test_printer_names:
printer = db_session.query(Printer).filter(Printer.name == name).first()
if printer:
db_session.delete(printer)
removed_count += 1
print(f"🗑️ Drucker '{name}' entfernt")
if removed_count > 0:
db_session.commit()
print(f"\n🧹 {removed_count} Testdrucker erfolgreich entfernt!")
else:
print("\n📋 Keine Testdrucker zum Entfernen gefunden")
db_session.close()
except Exception as e:
db_session.rollback()
db_session.close()
print(f"❌ Fehler beim Entfernen der Testdrucker: {str(e)}")
return False
return True
if __name__ == "__main__":
print("=== MYP Druckerverwaltung - Testdrucker-Verwaltung ===")
print()
if len(sys.argv) > 1 and sys.argv[1] == "--remove":
print("Entferne Testdrucker...")
remove_test_printers()
else:
print("Füge Testdrucker hinzu...")
print("(Verwende --remove um Testdrucker zu entfernen)")
print()
add_test_printers()
print("\nFertig! 🚀")

View File

@ -1,95 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
SSL-Zertifikat-Generator für die MYP-Plattform
Erstellt selbstsignierte SSL-Zertifikate für die lokale Entwicklung
"""
import os
import datetime
import sys
# Überprüfen, ob die notwendigen Pakete installiert sind
try:
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption
except ImportError:
print("Fehler: Paket 'cryptography' nicht gefunden.")
print("Bitte installieren Sie es mit: pip install cryptography")
sys.exit(1)
def create_self_signed_cert(cert_path, key_path, hostname="localhost"):
"""
Erstellt ein selbstsigniertes SSL-Zertifikat mit dem angegebenen Hostnamen.
Args:
cert_path: Pfad zur Zertifikatsdatei
key_path: Pfad zur privaten Schlüsseldatei
hostname: Hostname für das Zertifikat (Standard: localhost)
"""
# Verzeichnis erstellen, falls es nicht existiert
cert_dir = os.path.dirname(cert_path)
if cert_dir and not os.path.exists(cert_dir):
os.makedirs(cert_dir, exist_ok=True)
# Privaten Schlüssel generieren
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
# Schlüsseldatei schreiben
with open(key_path, "wb") as key_file:
key_file.write(private_key.private_bytes(
encoding=Encoding.PEM,
format=PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=NoEncryption()
))
# Name für das Zertifikat erstellen
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, hostname),
])
# Zertifikat erstellen
cert = x509.CertificateBuilder().subject_name(
subject
).issuer_name(
issuer
).public_key(
private_key.public_key()
).serial_number(
x509.random_serial_number()
).not_valid_before(
datetime.datetime.utcnow()
).not_valid_after(
datetime.datetime.utcnow() + datetime.timedelta(days=365)
).add_extension(
x509.SubjectAlternativeName([x509.DNSName(hostname)]),
critical=False,
).sign(private_key, hashes.SHA256())
# Zertifikatsdatei schreiben
with open(cert_path, "wb") as cert_file:
cert_file.write(cert.public_bytes(Encoding.PEM))
print(f"Selbstsigniertes SSL-Zertifikat für '{hostname}' erstellt:")
print(f"Zertifikat: {cert_path}")
print(f"Schlüssel: {key_path}")
print(f"Gültig für 1 Jahr.")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Erstellt selbstsignierte SSL-Zertifikate für die lokale Entwicklung")
parser.add_argument("-c", "--cert", default="/home/user/Projektarbeit-MYP/backend/app/certs/myp.crt", help="Pfad zur Zertifikatsdatei")
parser.add_argument("-k", "--key", default="/home/user/Projektarbeit-MYP/backend/app/certs/myp.key", help="Pfad zur Schlüsseldatei")
parser.add_argument("-n", "--hostname", default="localhost", help="Hostname für das Zertifikat")
args = parser.parse_args()
create_self_signed_cert(args.cert, args.key, args.hostname)

View File

@ -1,106 +0,0 @@
#!/usr/bin/env python3
"""
Script zum Erstellen von Test-Druckern für die MYP Plattform
"""
import sys
import os
sys.path.append('.')
from models import *
from datetime import datetime
def create_test_printers():
"""Erstellt Test-Drucker in der Datenbank."""
# Verbindung zur Datenbank
db_session = get_db_session()
# Test-Drucker Daten
test_printers = [
{
'name': 'Mercedes-Benz FDM Pro #01',
'model': 'Ultimaker S5 Pro',
'location': 'Werkhalle Sindelfingen',
'plug_ip': '192.168.10.101',
'status': 'available',
'active': True
},
{
'name': 'Mercedes-Benz FDM #02',
'model': 'Prusa MK3S+',
'location': 'Entwicklungszentrum Stuttgart',
'plug_ip': '192.168.10.102',
'status': 'printing',
'active': True
},
{
'name': 'Mercedes-Benz SLA #01',
'model': 'Formlabs Form 3+',
'location': 'Prototypenlabor',
'plug_ip': '192.168.10.103',
'status': 'available',
'active': True
},
{
'name': 'Mercedes-Benz Industrial #01',
'model': 'Stratasys F370',
'location': 'Industriehalle Bremen',
'plug_ip': '192.168.10.104',
'status': 'maintenance',
'active': False
},
{
'name': 'Mercedes-Benz Rapid #01',
'model': 'Bambu Lab X1 Carbon',
'location': 'Designabteilung',
'plug_ip': '192.168.10.105',
'status': 'offline',
'active': True
},
{
'name': 'Mercedes-Benz SLS #01',
'model': 'HP Jet Fusion 5200',
'location': 'Produktionszentrum Berlin',
'plug_ip': '192.168.10.106',
'status': 'available',
'active': True
}
]
try:
created_count = 0
for printer_data in test_printers:
# Prüfen ob Drucker bereits existiert
existing = db_session.query(Printer).filter_by(name=printer_data['name']).first()
if not existing:
printer = Printer(
name=printer_data['name'],
model=printer_data['model'],
location=printer_data['location'],
plug_ip=printer_data['plug_ip'],
status=printer_data['status'],
active=printer_data['active'],
created_at=datetime.now()
)
db_session.add(printer)
created_count += 1
print(f"✅ Drucker '{printer_data['name']}' erstellt")
else:
print(f" Drucker '{printer_data['name']}' existiert bereits")
db_session.commit()
total_count = db_session.query(Printer).count()
print(f"\n🎉 {created_count} neue Test-Drucker erstellt!")
print(f"📊 Insgesamt {total_count} Drucker in der Datenbank.")
except Exception as e:
print(f"❌ Fehler beim Erstellen der Test-Drucker: {str(e)}")
db_session.rollback()
finally:
db_session.close()
if __name__ == "__main__":
print("🚀 Erstelle Test-Drucker für MYP Plattform...")
create_test_printers()
print("✅ Fertig!")

View File

@ -1,175 +0,0 @@
"""
Offline-kompatible E-Mail-Benachrichtigung für MYP-System
========================================================
Da das System im Produktionsbetrieb offline läuft, werden alle E-Mail-Benachrichtigungen
nur geloggt aber nicht tatsächlich versendet.
"""
import logging
from datetime import datetime
from typing import Optional, Dict, Any
from utils.logging_config import get_logger
logger = get_logger("email_notification")
class OfflineEmailNotification:
"""
Offline-E-Mail-Benachrichtigung die nur Logs erstellt.
Simuliert E-Mail-Versand für Offline-Betrieb.
"""
def __init__(self):
self.enabled = False # Immer deaktiviert im Offline-Modus
logger.info("📧 Offline-E-Mail-Benachrichtigung initialisiert (kein echter E-Mail-Versand)")
def send_email(self, to: str, subject: str, body: str, **kwargs) -> bool:
"""
Simuliert E-Mail-Versand durch Logging.
Args:
to: E-Mail-Empfänger
subject: E-Mail-Betreff
body: E-Mail-Inhalt
**kwargs: Zusätzliche Parameter
Returns:
bool: Immer True (Simulation erfolgreich)
"""
logger.info(f"📧 [OFFLINE-SIMULATION] E-Mail würde versendet werden:")
logger.info(f" 📮 An: {to}")
logger.info(f" 📋 Betreff: {subject}")
logger.info(f" 📝 Inhalt: {body[:100]}{'...' if len(body) > 100 else ''}")
logger.info(f" 🕒 Zeitpunkt: {datetime.now().strftime('%d.%m.%Y %H:%M:%S')}")
if kwargs:
logger.info(f" ⚙️ Zusätzliche Parameter: {kwargs}")
return True
def send_notification_email(self, recipient: str, notification_type: str,
data: Dict[str, Any]) -> bool:
"""
Sendet Benachrichtigungs-E-Mail (Offline-Simulation).
Args:
recipient: E-Mail-Empfänger
notification_type: Art der Benachrichtigung
data: Daten für die Benachrichtigung
Returns:
bool: Immer True (Simulation erfolgreich)
"""
subject = f"MYP-Benachrichtigung: {notification_type}"
body = f"Benachrichtigung vom MYP-System:\n\n{data}"
return self.send_email(recipient, subject, body, notification_type=notification_type)
def send_maintenance_notification(self, recipient: str, task_title: str,
task_description: str) -> bool:
"""
Sendet Wartungs-Benachrichtigung (Offline-Simulation).
Args:
recipient: E-Mail-Empfänger
task_title: Titel der Wartungsaufgabe
task_description: Beschreibung der Wartungsaufgabe
Returns:
bool: Immer True (Simulation erfolgreich)
"""
subject = f"MYP-Wartungsaufgabe: {task_title}"
body = f"""
Neue Wartungsaufgabe im MYP-System:
Titel: {task_title}
Beschreibung: {task_description}
Erstellt: {datetime.now().strftime('%d.%m.%Y %H:%M:%S')}
Bitte loggen Sie sich in das MYP-System ein, um weitere Details zu sehen.
"""
return self.send_email(recipient, subject, body, task_type="maintenance")
# Globale Instanz für einfache Verwendung
email_notifier = OfflineEmailNotification()
def send_email_notification(recipient: str, subject: str, body: str, **kwargs) -> bool:
"""
Haupt-Funktion für E-Mail-Versand (Offline-kompatibel).
Args:
recipient: E-Mail-Empfänger
subject: E-Mail-Betreff
body: E-Mail-Inhalt
**kwargs: Zusätzliche Parameter
Returns:
bool: True wenn "erfolgreich" (geloggt)
"""
return email_notifier.send_email(recipient, subject, body, **kwargs)
def send_maintenance_email(recipient: str, task_title: str, task_description: str) -> bool:
"""
Sendet Wartungs-E-Mail (Offline-kompatibel).
Args:
recipient: E-Mail-Empfänger
task_title: Titel der Wartungsaufgabe
task_description: Beschreibung der Wartungsaufgabe
Returns:
bool: True wenn "erfolgreich" (geloggt)
"""
return email_notifier.send_maintenance_notification(recipient, task_title, task_description)
def send_guest_approval_email(recipient: str, otp_code: str, expires_at: str) -> bool:
"""
Sendet Gastauftrags-Genehmigung-E-Mail (Offline-kompatibel).
Args:
recipient: E-Mail-Empfänger
otp_code: OTP-Code für den Gastauftrag
expires_at: Ablaufzeit des OTP-Codes
Returns:
bool: True wenn "erfolgreich" (geloggt)
"""
subject = "MYP-Gastauftrag genehmigt"
body = f"""
Ihr Gastauftrag wurde genehmigt!
OTP-Code: {otp_code}
Gültig bis: {expires_at}
Bitte verwenden Sie diesen Code am MYP-Terminal, um Ihren Druckauftrag zu starten.
"""
return email_notifier.send_email(recipient, subject, body,
otp_code=otp_code, expires_at=expires_at)
def send_guest_rejection_email(recipient: str, reason: str) -> bool:
"""
Sendet Gastauftrags-Ablehnungs-E-Mail (Offline-kompatibel).
Args:
recipient: E-Mail-Empfänger
reason: Grund für die Ablehnung
Returns:
bool: True wenn "erfolgreich" (geloggt)
"""
subject = "MYP-Gastauftrag abgelehnt"
body = f"""
Ihr Gastauftrag wurde leider abgelehnt.
Grund: {reason}
Bei Fragen wenden Sie sich bitte an das MYP-Team.
"""
return email_notifier.send_email(recipient, subject, body, rejection_reason=reason)
# Für Backward-Kompatibilität
send_notification = send_email_notification

View File

@ -1,790 +0,0 @@
"""
Wartungsplanungs- und Tracking-System für das MYP-System
========================================================
Dieses Modul stellt umfassende Wartungsfunktionalität bereit:
- Geplante und ungeplante Wartungen
- Wartungsintervalle und Erinnerungen
- Wartungshistorie und Berichte
- Automatische Wartungsprüfungen
- Ersatzteil-Management
- Techniker-Zuweisungen
"""
import asyncio
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, asdict
from enum import Enum
import threading
import schedule
import time
from utils.logging_config import get_logger
from models import Printer, get_db_session
from utils.email_notification import send_email_notification
from utils.realtime_dashboard import emit_system_alert
logger = get_logger("maintenance")
class MaintenanceType(Enum):
"""Arten von Wartungen"""
PREVENTIVE = "preventive" # Vorbeugende Wartung
CORRECTIVE = "corrective" # Reparatur/Korrektur
EMERGENCY = "emergency" # Notfall-Wartung
SCHEDULED = "scheduled" # Geplante Wartung
INSPECTION = "inspection" # Inspektion
class MaintenanceStatus(Enum):
"""Status einer Wartung"""
PLANNED = "planned" # Geplant
SCHEDULED = "scheduled" # Terminiert
IN_PROGRESS = "in_progress" # In Bearbeitung
COMPLETED = "completed" # Abgeschlossen
CANCELLED = "cancelled" # Abgebrochen
OVERDUE = "overdue" # Überfällig
class MaintenancePriority(Enum):
"""Priorität einer Wartung"""
LOW = "low" # Niedrig
NORMAL = "normal" # Normal
HIGH = "high" # Hoch
CRITICAL = "critical" # Kritisch
EMERGENCY = "emergency" # Notfall
@dataclass
class MaintenanceTask:
"""Wartungsaufgabe"""
id: Optional[int] = None
printer_id: int = None
title: str = ""
description: str = ""
maintenance_type: MaintenanceType = MaintenanceType.PREVENTIVE
priority: MaintenancePriority = MaintenancePriority.NORMAL
status: MaintenanceStatus = MaintenanceStatus.PLANNED
scheduled_date: Optional[datetime] = None
due_date: Optional[datetime] = None
estimated_duration: int = 60 # Minuten
actual_duration: Optional[int] = None
assigned_technician: Optional[str] = None
created_at: datetime = None
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
notes: str = ""
required_parts: List[str] = None
actual_parts_used: List[str] = None
cost: Optional[float] = None
checklist: List[Dict[str, Any]] = None
photos: List[str] = None
created_by: Optional[int] = None
@dataclass
class MaintenanceSchedule:
"""Wartungsplan"""
printer_id: int
maintenance_type: MaintenanceType
interval_days: int
next_due: datetime
last_completed: Optional[datetime] = None
is_active: bool = True
description: str = ""
checklist_template: List[str] = None
@dataclass
class MaintenanceMetrics:
"""Wartungsmetriken"""
total_tasks: int = 0
completed_tasks: int = 0
overdue_tasks: int = 0
average_completion_time: float = 0.0
total_cost: float = 0.0
mtbf: float = 0.0 # Mean Time Between Failures
mttr: float = 0.0 # Mean Time To Repair
uptime_percentage: float = 0.0
class MaintenanceManager:
"""Manager für Wartungsplanung und -tracking"""
def __init__(self):
self.tasks: Dict[int, MaintenanceTask] = {}
self.schedules: Dict[int, List[MaintenanceSchedule]] = {}
self.maintenance_history: List[MaintenanceTask] = []
self.next_task_id = 1
self.is_running = False
self._setup_scheduler()
def _setup_scheduler(self):
"""Richtet automatische Wartungsplanung ein"""
schedule.every().day.at("06:00").do(self._check_scheduled_maintenance)
schedule.every().hour.do(self._check_overdue_tasks)
schedule.every().monday.at("08:00").do(self._generate_weekly_report)
# Scheduler in separatem Thread
def run_scheduler():
while self.is_running:
schedule.run_pending()
time.sleep(60) # Check every minute
self.is_running = True
scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
scheduler_thread.start()
logger.info("Wartungs-Scheduler gestartet")
def create_task(self, task: MaintenanceTask) -> int:
"""Erstellt eine neue Wartungsaufgabe"""
task.id = self.next_task_id
self.next_task_id += 1
task.created_at = datetime.now()
self.tasks[task.id] = task
# Automatische Terminierung für vorbeugende Wartungen
if task.maintenance_type == MaintenanceType.PREVENTIVE and not task.scheduled_date:
task.scheduled_date = self._calculate_next_maintenance_date(task.printer_id)
# Benachrichtigungen senden
self._send_task_notifications(task, "created")
logger.info(f"Wartungsaufgabe erstellt: {task.title} für Drucker {task.printer_id}")
return task.id
def update_task_status(self, task_id: int, new_status: MaintenanceStatus, notes: str = "") -> bool:
"""Aktualisiert den Status einer Wartungsaufgabe"""
if task_id not in self.tasks:
return False
task = self.tasks[task_id]
old_status = task.status
task.status = new_status
# Zeitstempel setzen
if new_status == MaintenanceStatus.IN_PROGRESS:
task.started_at = datetime.now()
elif new_status == MaintenanceStatus.COMPLETED:
task.completed_at = datetime.now()
if task.started_at:
task.actual_duration = int((task.completed_at - task.started_at).total_seconds() / 60)
# Zur Historie hinzufügen
self.maintenance_history.append(task)
# Nächste Wartung planen
self._schedule_next_maintenance(task)
if notes:
task.notes += f"\n{datetime.now().strftime('%d.%m.%Y %H:%M')}: {notes}"
# Benachrichtigungen senden
if old_status != new_status:
self._send_task_notifications(task, "status_changed")
logger.info(f"Wartungsaufgabe {task_id} Status: {old_status.value}{new_status.value}")
return True
def schedule_maintenance(self, printer_id: int, maintenance_type: MaintenanceType,
interval_days: int, description: str = "") -> MaintenanceSchedule:
"""Plant regelmäßige Wartungen"""
schedule_item = MaintenanceSchedule(
printer_id=printer_id,
maintenance_type=maintenance_type,
interval_days=interval_days,
next_due=datetime.now() + timedelta(days=interval_days),
description=description
)
if printer_id not in self.schedules:
self.schedules[printer_id] = []
self.schedules[printer_id].append(schedule_item)
logger.info(f"Wartungsplan erstellt: {maintenance_type.value} alle {interval_days} Tage für Drucker {printer_id}")
return schedule_item
def get_upcoming_maintenance(self, days_ahead: int = 7) -> List[MaintenanceTask]:
"""Holt anstehende Wartungen"""
cutoff_date = datetime.now() + timedelta(days=days_ahead)
upcoming = []
for task in self.tasks.values():
if (task.status in [MaintenanceStatus.PLANNED, MaintenanceStatus.SCHEDULED] and
task.due_date and task.due_date <= cutoff_date):
upcoming.append(task)
return sorted(upcoming, key=lambda t: t.due_date or datetime.max)
def get_overdue_tasks(self) -> List[MaintenanceTask]:
"""Holt überfällige Wartungen"""
now = datetime.now()
overdue = []
for task in self.tasks.values():
if (task.status in [MaintenanceStatus.PLANNED, MaintenanceStatus.SCHEDULED] and
task.due_date and task.due_date < now):
task.status = MaintenanceStatus.OVERDUE
overdue.append(task)
return overdue
def get_maintenance_metrics(self, printer_id: Optional[int] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None) -> MaintenanceMetrics:
"""Berechnet Wartungsmetriken"""
# Filter tasks
tasks = self.maintenance_history.copy()
if printer_id:
tasks = [t for t in tasks if t.printer_id == printer_id]
if start_date:
tasks = [t for t in tasks if t.completed_at and t.completed_at >= start_date]
if end_date:
tasks = [t for t in tasks if t.completed_at and t.completed_at <= end_date]
if not tasks:
return MaintenanceMetrics()
completed_tasks = [t for t in tasks if t.status == MaintenanceStatus.COMPLETED]
# Grundmetriken
total_tasks = len(tasks)
completed_count = len(completed_tasks)
# Durchschnittliche Bearbeitungszeit
completion_times = [t.actual_duration for t in completed_tasks if t.actual_duration]
avg_completion_time = sum(completion_times) / len(completion_times) if completion_times else 0
# Gesamtkosten
total_cost = sum(t.cost for t in completed_tasks if t.cost)
# MTBF und MTTR berechnen
mtbf = self._calculate_mtbf(tasks, printer_id)
mttr = avg_completion_time / 60 # Konvertiere zu Stunden
# Verfügbarkeit berechnen
uptime_percentage = self._calculate_uptime(printer_id, start_date, end_date)
return MaintenanceMetrics(
total_tasks=total_tasks,
completed_tasks=completed_count,
overdue_tasks=len(self.get_overdue_tasks()),
average_completion_time=avg_completion_time,
total_cost=total_cost,
mtbf=mtbf,
mttr=mttr,
uptime_percentage=uptime_percentage
)
def create_maintenance_checklist(self, maintenance_type: MaintenanceType) -> List[Dict[str, Any]]:
"""Erstellt eine Wartungs-Checkliste"""
checklists = {
MaintenanceType.PREVENTIVE: [
{"task": "Drucker äußerlich reinigen", "completed": False, "required": True},
{"task": "Druckbett-Level prüfen", "completed": False, "required": True},
{"task": "Extruder-Düse reinigen", "completed": False, "required": True},
{"task": "Riemen-Spannung prüfen", "completed": False, "required": True},
{"task": "Filament-Führung prüfen", "completed": False, "required": False},
{"task": "Software-Updates prüfen", "completed": False, "required": False},
{"task": "Lüfter reinigen", "completed": False, "required": True},
{"task": "Schrauben nachziehen", "completed": False, "required": False}
],
MaintenanceType.CORRECTIVE: [
{"task": "Problem-Diagnose durchführen", "completed": False, "required": True},
{"task": "Defekte Teile identifizieren", "completed": False, "required": True},
{"task": "Ersatzteile bestellen/bereitstellen", "completed": False, "required": True},
{"task": "Reparatur durchführen", "completed": False, "required": True},
{"task": "Funktionstest durchführen", "completed": False, "required": True},
{"task": "Kalibrierung prüfen", "completed": False, "required": True}
],
MaintenanceType.INSPECTION: [
{"task": "Sichtprüfung der Mechanik", "completed": False, "required": True},
{"task": "Druckqualität testen", "completed": False, "required": True},
{"task": "Temperaturen prüfen", "completed": False, "required": True},
{"task": "Bewegungen testen", "completed": False, "required": True},
{"task": "Verschleiß bewerten", "completed": False, "required": True}
]
}
return checklists.get(maintenance_type, [])
def _check_scheduled_maintenance(self):
"""Prüft täglich auf fällige Wartungen"""
logger.info("Prüfe fällige Wartungen...")
today = datetime.now()
for printer_id, schedules in self.schedules.items():
for schedule_item in schedules:
if not schedule_item.is_active:
continue
if schedule_item.next_due <= today:
# Erstelle Wartungsaufgabe
task = MaintenanceTask(
printer_id=printer_id,
title=f"{schedule_item.maintenance_type.value.title()} Wartung",
description=schedule_item.description,
maintenance_type=schedule_item.maintenance_type,
priority=MaintenancePriority.NORMAL,
due_date=schedule_item.next_due,
checklist=self.create_maintenance_checklist(schedule_item.maintenance_type)
)
task_id = self.create_task(task)
# Nächsten Termin berechnen
schedule_item.next_due = today + timedelta(days=schedule_item.interval_days)
logger.info(f"Automatische Wartungsaufgabe erstellt: {task_id}")
def _check_overdue_tasks(self):
"""Prüft stündlich auf überfällige Aufgaben"""
overdue = self.get_overdue_tasks()
if overdue:
logger.warning(f"{len(overdue)} überfällige Wartungsaufgaben gefunden")
for task in overdue:
emit_system_alert(
f"Wartung überfällig: {task.title} (Drucker {task.printer_id})",
"warning",
"high"
)
def _generate_weekly_report(self):
"""Generiert wöchentlichen Wartungsbericht"""
logger.info("Generiere wöchentlichen Wartungsbericht...")
# Sammle Daten der letzten Woche
last_week = datetime.now() - timedelta(days=7)
metrics = self.get_maintenance_metrics(start_date=last_week)
# Sende Report (Implementation abhängig von verfügbaren Services)
# send_maintenance_report(metrics)
def _calculate_next_maintenance_date(self, printer_id: int) -> datetime:
"""Berechnet nächstes Wartungsdatum basierend auf Nutzung"""
# Vereinfachte Implementierung - kann erweitert werden
base_interval = 30 # Tage
# Hier könnte man Nutzungsstatistiken einbeziehen
with get_db_session() as db_session:
printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
if printer:
# Berücksichtige letzten Check
if printer.last_checked:
days_since_check = (datetime.now() - printer.last_checked).days
if days_since_check < 15: # Kürzlich gecheckt
base_interval += 15
return datetime.now() + timedelta(days=base_interval)
def _schedule_next_maintenance(self, completed_task: MaintenanceTask):
"""Plant nächste Wartung nach Abschluss einer Aufgabe"""
if completed_task.maintenance_type == MaintenanceType.PREVENTIVE:
# Finde entsprechenden Schedule
printer_schedules = self.schedules.get(completed_task.printer_id, [])
for schedule_item in printer_schedules:
if schedule_item.maintenance_type == completed_task.maintenance_type:
schedule_item.last_completed = completed_task.completed_at
schedule_item.next_due = datetime.now() + timedelta(days=schedule_item.interval_days)
break
def _calculate_mtbf(self, tasks: List[MaintenanceTask], printer_id: Optional[int]) -> float:
"""Berechnet Mean Time Between Failures"""
# Vereinfachte MTBF-Berechnung
failure_tasks = [t for t in tasks if t.maintenance_type == MaintenanceType.CORRECTIVE]
if len(failure_tasks) < 2:
return 0.0
# Zeitspanne zwischen ersten und letzten Ausfall
first_failure = min(failure_tasks, key=lambda t: t.created_at)
last_failure = max(failure_tasks, key=lambda t: t.created_at)
total_time = (last_failure.created_at - first_failure.created_at).total_seconds() / 3600 # Stunden
failure_count = len(failure_tasks) - 1
return total_time / failure_count if failure_count > 0 else 0.0
def _calculate_uptime(self, printer_id: Optional[int], start_date: Optional[datetime],
end_date: Optional[datetime]) -> float:
"""Berechnet Verfügbarkeit in Prozent"""
# Vereinfachte Uptime-Berechnung
if not start_date:
start_date = datetime.now() - timedelta(days=30)
if not end_date:
end_date = datetime.now()
total_time = (end_date - start_date).total_seconds()
# Berechne Downtime aus Wartungszeiten
downtime = 0
for task in self.maintenance_history:
if printer_id and task.printer_id != printer_id:
continue
if (task.status == MaintenanceStatus.COMPLETED and
task.started_at and task.completed_at and
task.started_at >= start_date and task.completed_at <= end_date):
downtime += (task.completed_at - task.started_at).total_seconds()
uptime = ((total_time - downtime) / total_time) * 100 if total_time > 0 else 0
return max(0, min(100, uptime))
def _send_task_notifications(self, task: MaintenanceTask, event_type: str):
"""Sendet Benachrichtigungen für Wartungsaufgaben"""
try:
if event_type == "created":
emit_system_alert(
f"Neue Wartungsaufgabe: {task.title} (Drucker {task.printer_id})",
"info",
"normal"
)
elif event_type == "status_changed":
emit_system_alert(
f"Wartungsstatus geändert: {task.title}{task.status.value}",
"info",
"normal"
)
except Exception as e:
logger.error(f"Fehler beim Senden der Wartungsbenachrichtigung: {str(e)}")
# Globale Instanz
maintenance_manager = MaintenanceManager()
def get_maintenance_dashboard_data() -> Dict[str, Any]:
"""Holt Dashboard-Daten für Wartungen"""
upcoming = maintenance_manager.get_upcoming_maintenance()
overdue = maintenance_manager.get_overdue_tasks()
metrics = maintenance_manager.get_maintenance_metrics()
return {
'upcoming_count': len(upcoming),
'overdue_count': len(overdue),
'upcoming_tasks': [asdict(task) for task in upcoming[:5]],
'overdue_tasks': [asdict(task) for task in overdue],
'metrics': asdict(metrics),
'next_scheduled': upcoming[0] if upcoming else None
}
def create_emergency_maintenance(printer_id: int, description: str,
priority: MaintenancePriority = MaintenancePriority.CRITICAL) -> int:
"""Erstellt eine Notfall-Wartung"""
task = MaintenanceTask(
printer_id=printer_id,
title="Notfall-Wartung",
description=description,
maintenance_type=MaintenanceType.EMERGENCY,
priority=priority,
due_date=datetime.now(), # Sofort fällig
checklist=maintenance_manager.create_maintenance_checklist(MaintenanceType.CORRECTIVE)
)
return maintenance_manager.create_task(task)
def schedule_preventive_maintenance(printer_id: int, interval_days: int = 30) -> MaintenanceSchedule:
"""Plant vorbeugende Wartung"""
return maintenance_manager.schedule_maintenance(
printer_id=printer_id,
maintenance_type=MaintenanceType.PREVENTIVE,
interval_days=interval_days,
description="Regelmäßige vorbeugende Wartung"
)
# JavaScript für Wartungs-Frontend
def get_maintenance_javascript() -> str:
"""JavaScript für Wartungsmanagement"""
return """
class MaintenanceManager {
constructor() {
this.currentTasks = [];
this.selectedTask = null;
this.init();
}
init() {
this.loadTasks();
this.setupEventListeners();
this.startAutoRefresh();
}
setupEventListeners() {
// Task status updates
document.addEventListener('click', (e) => {
if (e.target.matches('.maintenance-status-btn')) {
const taskId = e.target.dataset.taskId;
const newStatus = e.target.dataset.status;
this.updateTaskStatus(taskId, newStatus);
}
if (e.target.matches('.maintenance-details-btn')) {
const taskId = e.target.dataset.taskId;
this.showTaskDetails(taskId);
}
});
// Create maintenance form
const createForm = document.getElementById('create-maintenance-form');
createForm?.addEventListener('submit', (e) => {
e.preventDefault();
this.createTask(new FormData(createForm));
});
}
async loadTasks() {
try {
const response = await fetch('/api/maintenance/tasks');
const data = await response.json();
if (data.success) {
this.currentTasks = data.tasks;
this.renderTasks();
}
} catch (error) {
console.error('Fehler beim Laden der Wartungsaufgaben:', error);
}
}
async updateTaskStatus(taskId, newStatus) {
try {
const response = await fetch(`/api/maintenance/tasks/${taskId}/status`, {
method: 'PUT',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ status: newStatus })
});
const result = await response.json();
if (result.success) {
this.loadTasks(); // Refresh
this.showNotification('Wartungsstatus aktualisiert', 'success');
} else {
this.showNotification('Fehler beim Aktualisieren', 'error');
}
} catch (error) {
console.error('Status-Update fehlgeschlagen:', error);
}
}
renderTasks() {
const container = document.getElementById('maintenance-tasks-container');
if (!container) return;
container.innerHTML = this.currentTasks.map(task => `
<div class="maintenance-task-card ${task.status} priority-${task.priority}">
<div class="task-header">
<h3>${task.title}</h3>
<span class="task-priority">${task.priority}</span>
</div>
<div class="task-info">
<p><strong>Drucker:</strong> ${task.printer_id}</p>
<p><strong>Typ:</strong> ${task.maintenance_type}</p>
<p><strong>Fällig:</strong> ${this.formatDate(task.due_date)}</p>
<p><strong>Status:</strong> ${task.status}</p>
</div>
<div class="task-actions">
<button class="maintenance-status-btn" data-task-id="${task.id}" data-status="in_progress">
Starten
</button>
<button class="maintenance-status-btn" data-task-id="${task.id}" data-status="completed">
Abschließen
</button>
<button class="maintenance-details-btn" data-task-id="${task.id}">
Details
</button>
</div>
</div>
`).join('');
}
showTaskDetails(taskId) {
const task = this.currentTasks.find(t => t.id == taskId);
if (!task) return;
// Create modal with task details
const modal = document.createElement('div');
modal.className = 'maintenance-modal';
modal.innerHTML = `
<div class="modal-content">
<div class="modal-header">
<h2>${task.title}</h2>
<button class="close-modal">&times;</button>
</div>
<div class="modal-body">
<div class="task-details">
<p><strong>Beschreibung:</strong> ${task.description}</p>
<p><strong>Techniker:</strong> ${task.assigned_technician || 'Nicht zugewiesen'}</p>
<p><strong>Geschätzte Dauer:</strong> ${task.estimated_duration} Minuten</p>
${task.checklist ? this.renderChecklist(task.checklist) : ''}
<div class="task-notes">
<h4>Notizen:</h4>
<textarea id="task-notes-${taskId}" rows="4" cols="50">${task.notes || ''}</textarea>
<button onclick="maintenanceManager.saveNotes(${taskId})">Notizen speichern</button>
</div>
</div>
</div>
</div>
`;
document.body.appendChild(modal);
// Close modal handlers
modal.querySelector('.close-modal').onclick = () => modal.remove();
modal.onclick = (e) => {
if (e.target === modal) modal.remove();
};
}
renderChecklist(checklist) {
return `
<div class="maintenance-checklist">
<h4>Checkliste:</h4>
${checklist.map((item, index) => `
<label class="checklist-item">
<input type="checkbox" ${item.completed ? 'checked' : ''}
onchange="maintenanceManager.updateChecklistItem(${index}, this.checked)">
${item.task}
${item.required ? '<span class="required">*</span>' : ''}
</label>
`).join('')}
</div>
`;
}
formatDate(dateString) {
if (!dateString) return 'Nicht gesetzt';
const date = new Date(dateString);
return date.toLocaleDateString('de-DE') + ' ' + date.toLocaleTimeString('de-DE', {hour: '2-digit', minute: '2-digit'});
}
showNotification(message, type = 'info') {
const notification = document.createElement('div');
notification.className = `notification notification-${type}`;
notification.textContent = message;
document.body.appendChild(notification);
setTimeout(() => {
notification.remove();
}, 3000);
}
startAutoRefresh() {
setInterval(() => {
this.loadTasks();
}, 30000); // Refresh every 30 seconds
}
}
// Initialize when DOM is ready
document.addEventListener('DOMContentLoaded', function() {
window.maintenanceManager = new MaintenanceManager();
});
"""
def create_maintenance_task(printer_id: int, title: str, description: str = "",
maintenance_type: MaintenanceType = MaintenanceType.PREVENTIVE,
priority: MaintenancePriority = MaintenancePriority.NORMAL) -> int:
"""
Erstellt eine neue Wartungsaufgabe.
Args:
printer_id: ID des Druckers
title: Titel der Wartungsaufgabe
description: Beschreibung der Aufgabe
maintenance_type: Art der Wartung
priority: Priorität der Aufgabe
Returns:
int: ID der erstellten Aufgabe
"""
task = MaintenanceTask(
printer_id=printer_id,
title=title,
description=description,
maintenance_type=maintenance_type,
priority=priority,
checklist=maintenance_manager.create_maintenance_checklist(maintenance_type)
)
return maintenance_manager.create_task(task)
def schedule_maintenance(printer_id: int, maintenance_type: MaintenanceType,
interval_days: int, description: str = "") -> MaintenanceSchedule:
"""
Plant regelmäßige Wartungen (Alias für maintenance_manager.schedule_maintenance).
Args:
printer_id: ID des Druckers
maintenance_type: Art der Wartung
interval_days: Intervall in Tagen
description: Beschreibung
Returns:
MaintenanceSchedule: Erstellter Wartungsplan
"""
return maintenance_manager.schedule_maintenance(
printer_id=printer_id,
maintenance_type=maintenance_type,
interval_days=interval_days,
description=description
)
def get_maintenance_overview() -> Dict[str, Any]:
"""
Holt eine Übersicht aller Wartungsaktivitäten.
Returns:
Dict: Wartungsübersicht mit Statistiken und anstehenden Aufgaben
"""
upcoming = maintenance_manager.get_upcoming_maintenance()
overdue = maintenance_manager.get_overdue_tasks()
metrics = maintenance_manager.get_maintenance_metrics()
# Aktive Tasks
active_tasks = [task for task in maintenance_manager.tasks.values()
if task.status == MaintenanceStatus.IN_PROGRESS]
# Completed tasks in last 30 days
thirty_days_ago = datetime.now() - timedelta(days=30)
recent_completed = [task for task in maintenance_manager.maintenance_history
if task.completed_at and task.completed_at >= thirty_days_ago]
return {
'summary': {
'total_tasks': len(maintenance_manager.tasks),
'active_tasks': len(active_tasks),
'upcoming_tasks': len(upcoming),
'overdue_tasks': len(overdue),
'completed_this_month': len(recent_completed)
},
'upcoming_tasks': [asdict(task) for task in upcoming[:10]],
'overdue_tasks': [asdict(task) for task in overdue],
'active_tasks': [asdict(task) for task in active_tasks],
'recent_completed': [asdict(task) for task in recent_completed[:5]],
'metrics': asdict(metrics),
'schedules': {
printer_id: [asdict(schedule) for schedule in schedules]
for printer_id, schedules in maintenance_manager.schedules.items()
}
}
def update_maintenance_status(task_id: int, new_status: MaintenanceStatus,
notes: str = "") -> bool:
"""
Aktualisiert den Status einer Wartungsaufgabe (Alias für maintenance_manager.update_task_status).
Args:
task_id: ID der Wartungsaufgabe
new_status: Neuer Status
notes: Optionale Notizen
Returns:
bool: True wenn erfolgreich aktualisiert
"""
return maintenance_manager.update_task_status(task_id, new_status, notes)

View File

@ -1,899 +0,0 @@
"""
Multi-Standort-Unterstützungssystem für das MYP-System
======================================================
Dieses Modul stellt umfassende Multi-Location-Funktionalität bereit:
- Standort-Management und Hierarchien
- Standort-spezifische Konfigurationen
- Zentrale und dezentrale Verwaltung
- Standort-übergreifende Berichte
- Ressourcen-Sharing zwischen Standorten
- Benutzer-Standort-Zuweisungen
"""
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
import geocoder
import requests
from utils.logging_config import get_logger
from models import User, Printer, Job, get_db_session
logger = get_logger("multi_location")
class LocationType(Enum):
"""Arten von Standorten"""
HEADQUARTERS = "headquarters" # Hauptsitz
BRANCH = "branch" # Niederlassung
DEPARTMENT = "department" # Abteilung
FLOOR = "floor" # Stockwerk
ROOM = "room" # Raum
AREA = "area" # Bereich
class AccessLevel(Enum):
"""Zugriffslevel für Standorte"""
FULL = "full" # Vollzugriff
READ_WRITE = "read_write" # Lesen und Schreiben
READ_ONLY = "read_only" # Nur Lesen
NO_ACCESS = "no_access" # Kein Zugriff
@dataclass
class LocationConfig:
"""Standort-spezifische Konfiguration"""
timezone: str = "Europe/Berlin"
business_hours: Dict[str, str] = None
maintenance_window: Dict[str, str] = None
auto_approval_enabled: bool = False
max_job_duration: int = 480 # Minuten
contact_info: Dict[str, str] = None
notification_settings: Dict[str, Any] = None
@dataclass
class Location:
"""Standort-Definition"""
id: Optional[int] = None
name: str = ""
code: str = "" # Kurzer Code für den Standort
location_type: LocationType = LocationType.BRANCH
parent_id: Optional[int] = None
address: str = ""
city: str = ""
country: str = ""
postal_code: str = ""
latitude: Optional[float] = None
longitude: Optional[float] = None
description: str = ""
config: LocationConfig = None
is_active: bool = True
created_at: datetime = None
manager_id: Optional[int] = None
def __post_init__(self):
if self.config is None:
self.config = LocationConfig()
if self.created_at is None:
self.created_at = datetime.now()
@dataclass
class UserLocationAccess:
"""Benutzer-Standort-Zugriff"""
user_id: int
location_id: int
access_level: AccessLevel
granted_by: int
granted_at: datetime
expires_at: Optional[datetime] = None
is_primary: bool = False
class MultiLocationManager:
"""Manager für Multi-Standort-Funktionalität"""
def __init__(self):
self.locations: Dict[int, Location] = {}
self.user_access: Dict[int, List[UserLocationAccess]] = {}
self.next_location_id = 1
# Standard-Standort erstellen
self._create_default_location()
def _create_default_location(self):
"""Erstellt Standard-Standort falls keiner existiert"""
default_location = Location(
id=1,
name="Hauptstandort",
code="HQ",
location_type=LocationType.HEADQUARTERS,
address="Mercedes-Benz Platz",
city="Stuttgart",
country="Deutschland",
description="Hauptstandort des MYP-Systems"
)
self.locations[1] = default_location
self.next_location_id = 2
logger.info("Standard-Standort erstellt")
def create_location(self, location: Location) -> int:
"""Erstellt einen neuen Standort"""
location.id = self.next_location_id
self.next_location_id += 1
# Koordinaten automatisch ermitteln
if not location.latitude or not location.longitude:
self._geocode_location(location)
self.locations[location.id] = location
logger.info(f"Standort erstellt: {location.name} ({location.code})")
return location.id
def update_location(self, location_id: int, updates: Dict[str, Any]) -> bool:
"""Aktualisiert einen Standort"""
if location_id not in self.locations:
return False
location = self.locations[location_id]
for key, value in updates.items():
if hasattr(location, key):
setattr(location, key, value)
# Koordinaten neu ermitteln bei Adressänderung
if 'address' in updates or 'city' in updates:
self._geocode_location(location)
logger.info(f"Standort aktualisiert: {location.name}")
return True
def delete_location(self, location_id: int) -> bool:
"""Löscht einen Standort (Soft Delete)"""
if location_id not in self.locations:
return False
location = self.locations[location_id]
# Prüfe ob Standort Kinder hat
children = self.get_child_locations(location_id)
if children:
logger.warning(f"Standort {location.name} kann nicht gelöscht werden: hat Unterstandorte")
return False
# Prüfe auf aktive Ressourcen
if self._has_active_resources(location_id):
logger.warning(f"Standort {location.name} kann nicht gelöscht werden: hat aktive Ressourcen")
return False
location.is_active = False
logger.info(f"Standort deaktiviert: {location.name}")
return True
def get_location_hierarchy(self, location_id: Optional[int] = None) -> Dict[str, Any]:
"""Holt Standort-Hierarchie"""
if location_id:
# Spezifische Hierarchie ab einem Standort
location = self.locations.get(location_id)
if not location:
return {}
return self._build_hierarchy_node(location)
else:
# Komplette Hierarchie
root_locations = [loc for loc in self.locations.values()
if loc.parent_id is None and loc.is_active]
return {
'locations': [self._build_hierarchy_node(loc) for loc in root_locations]
}
def _build_hierarchy_node(self, location: Location) -> Dict[str, Any]:
"""Erstellt einen Hierarchie-Knoten"""
children = self.get_child_locations(location.id)
return {
'id': location.id,
'name': location.name,
'code': location.code,
'type': location.location_type.value,
'children': [self._build_hierarchy_node(child) for child in children],
'resource_count': self._count_location_resources(location.id)
}
def get_child_locations(self, parent_id: int) -> List[Location]:
"""Holt alle Kinder-Standorte"""
return [loc for loc in self.locations.values()
if loc.parent_id == parent_id and loc.is_active]
def get_location_path(self, location_id: int) -> List[Location]:
"""Holt den Pfad vom Root zum Standort"""
path = []
current_id = location_id
while current_id:
location = self.locations.get(current_id)
if not location:
break
path.insert(0, location)
current_id = location.parent_id
return path
def grant_location_access(self, user_id: int, location_id: int,
access_level: AccessLevel, granted_by: int,
expires_at: Optional[datetime] = None,
is_primary: bool = False) -> bool:
"""Gewährt Benutzer-Zugriff auf einen Standort"""
if location_id not in self.locations:
return False
access = UserLocationAccess(
user_id=user_id,
location_id=location_id,
access_level=access_level,
granted_by=granted_by,
granted_at=datetime.now(),
expires_at=expires_at,
is_primary=is_primary
)
if user_id not in self.user_access:
self.user_access[user_id] = []
# Entferne vorherigen Zugriff für diesen Standort
self.user_access[user_id] = [
acc for acc in self.user_access[user_id]
if acc.location_id != location_id
]
# Setze anderen primary-Zugriff zurück falls nötig
if is_primary:
for access_item in self.user_access[user_id]:
access_item.is_primary = False
self.user_access[user_id].append(access)
logger.info(f"Standort-Zugriff gewährt: User {user_id} → Location {location_id} ({access_level.value})")
return True
def revoke_location_access(self, user_id: int, location_id: int) -> bool:
"""Entzieht Benutzer-Zugriff auf einen Standort"""
if user_id not in self.user_access:
return False
original_count = len(self.user_access[user_id])
self.user_access[user_id] = [
acc for acc in self.user_access[user_id]
if acc.location_id != location_id
]
success = len(self.user_access[user_id]) < original_count
if success:
logger.info(f"Standort-Zugriff entzogen: User {user_id} → Location {location_id}")
return success
def get_user_locations(self, user_id: int, access_level: Optional[AccessLevel] = None) -> List[Location]:
"""Holt alle Standorte eines Benutzers"""
if user_id not in self.user_access:
return []
accessible_locations = []
now = datetime.now()
for access in self.user_access[user_id]:
# Prüfe Ablaufzeit
if access.expires_at and access.expires_at < now:
continue
# Prüfe Access Level
if access_level and access.access_level != access_level:
continue
location = self.locations.get(access.location_id)
if location and location.is_active:
accessible_locations.append(location)
return accessible_locations
def get_user_primary_location(self, user_id: int) -> Optional[Location]:
"""Holt den primären Standort eines Benutzers"""
if user_id not in self.user_access:
return None
for access in self.user_access[user_id]:
if access.is_primary:
return self.locations.get(access.location_id)
# Fallback: ersten verfügbaren Standort nehmen
user_locations = self.get_user_locations(user_id)
return user_locations[0] if user_locations else None
def check_user_access(self, user_id: int, location_id: int,
required_level: AccessLevel = AccessLevel.READ_ONLY) -> bool:
"""Prüft ob Benutzer Zugriff auf Standort hat"""
if user_id not in self.user_access:
return False
access_levels = {
AccessLevel.NO_ACCESS: 0,
AccessLevel.READ_ONLY: 1,
AccessLevel.READ_WRITE: 2,
AccessLevel.FULL: 3
}
required_level_value = access_levels[required_level]
now = datetime.now()
for access in self.user_access[user_id]:
if access.location_id != location_id:
continue
# Prüfe Ablaufzeit
if access.expires_at and access.expires_at < now:
continue
user_level_value = access_levels[access.access_level]
if user_level_value >= required_level_value:
return True
return False
def get_location_resources(self, location_id: int) -> Dict[str, Any]:
"""Holt alle Ressourcen eines Standorts"""
if location_id not in self.locations:
return {}
# Simuliere Datenbankabfrage für Drucker und Jobs
resources = {
'printers': [],
'active_jobs': [],
'users': [],
'pending_maintenance': 0
}
# In echter Implementierung würde hier die Datenbank abgefragt
with get_db_session() as db_session:
# Drucker des Standorts (vereinfacht - benötigt location_id in Printer-Model)
# printers = db_session.query(Printer).filter(Printer.location_id == location_id).all()
# resources['printers'] = [p.to_dict() for p in printers]
pass
return resources
def get_location_statistics(self, location_id: int,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None) -> Dict[str, Any]:
"""Holt Statistiken für einen Standort"""
if not start_date:
start_date = datetime.now() - timedelta(days=30)
if not end_date:
end_date = datetime.now()
# Sammle Statistiken
stats = {
'location': self.locations.get(location_id, {}).name if location_id in self.locations else 'Unbekannt',
'period': {
'start': start_date.isoformat(),
'end': end_date.isoformat()
},
'totals': {
'printers': 0,
'jobs_completed': 0,
'jobs_failed': 0,
'print_time_hours': 0,
'material_used_kg': 0,
'users_active': 0
},
'averages': {
'jobs_per_day': 0,
'job_duration_minutes': 0,
'printer_utilization': 0
},
'trends': {
'daily_jobs': [],
'printer_usage': []
}
}
# In echter Implementierung würden hier Datenbankabfragen stehen
return stats
def get_multi_location_report(self, location_ids: List[int] = None) -> Dict[str, Any]:
"""Erstellt standortübergreifenden Bericht"""
if not location_ids:
location_ids = list(self.locations.keys())
report = {
'generated_at': datetime.now().isoformat(),
'locations': [],
'summary': {
'total_locations': len(location_ids),
'total_printers': 0,
'total_users': 0,
'total_jobs': 0,
'cross_location_sharing': []
}
}
for location_id in location_ids:
location = self.locations.get(location_id)
if not location:
continue
location_stats = self.get_location_statistics(location_id)
location_data = {
'id': location.id,
'name': location.name,
'code': location.code,
'type': location.location_type.value,
'statistics': location_stats
}
report['locations'].append(location_data)
# Summiere für Gesamtübersicht
totals = location_stats.get('totals', {})
report['summary']['total_printers'] += totals.get('printers', 0)
report['summary']['total_users'] += totals.get('users_active', 0)
report['summary']['total_jobs'] += totals.get('jobs_completed', 0)
return report
def find_nearest_locations(self, latitude: float, longitude: float,
radius_km: float = 50, limit: int = 5) -> List[Tuple[Location, float]]:
"""Findet nächstgelegene Standorte"""
from math import radians, sin, cos, sqrt, atan2
def calculate_distance(lat1, lon1, lat2, lon2):
"""Berechnet Entfernung zwischen zwei Koordinaten (Haversine)"""
R = 6371 # Erdradius in km
lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
dlat = lat2 - lat1
dlon = lon2 - lon1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
c = 2 * atan2(sqrt(a), sqrt(1-a))
return R * c
nearby_locations = []
for location in self.locations.values():
if not location.is_active or not location.latitude or not location.longitude:
continue
distance = calculate_distance(
latitude, longitude,
location.latitude, location.longitude
)
if distance <= radius_km:
nearby_locations.append((location, distance))
# Sortiere nach Entfernung
nearby_locations.sort(key=lambda x: x[1])
return nearby_locations[:limit]
def _geocode_location(self, location: Location):
"""Ermittelt Koordinaten für einen Standort"""
try:
address_parts = [location.address, location.city, location.country]
full_address = ', '.join(filter(None, address_parts))
if not full_address:
return
# Verwende geocoder library
result = geocoder.osm(full_address)
if result.ok:
location.latitude = result.lat
location.longitude = result.lng
logger.info(f"Koordinaten ermittelt für {location.name}: {location.latitude}, {location.longitude}")
else:
logger.warning(f"Koordinaten konnten nicht ermittelt werden für {location.name}")
except Exception as e:
logger.error(f"Fehler bei Geocoding für {location.name}: {str(e)}")
def _has_active_resources(self, location_id: int) -> bool:
"""Prüft ob Standort aktive Ressourcen hat"""
# Vereinfachte Implementierung
# In echter Implementation würde hier die Datenbank geprüft
return False
def _count_location_resources(self, location_id: int) -> Dict[str, int]:
"""Zählt Ressourcen eines Standorts"""
# Vereinfachte Implementierung
return {
'printers': 0,
'users': 0,
'jobs': 0
}
# Globale Instanz
location_manager = MultiLocationManager()
# Alias für Import-Kompatibilität
LocationManager = MultiLocationManager
def create_location(name: str, code: str, location_type: LocationType = LocationType.BRANCH,
address: str = "", city: str = "", country: str = "",
parent_id: Optional[int] = None) -> int:
"""
Erstellt einen neuen Standort (globale Funktion).
Args:
name: Name des Standorts
code: Kurzer Code für den Standort
location_type: Art des Standorts
address: Adresse
city: Stadt
country: Land
parent_id: Parent-Standort ID
Returns:
int: ID des erstellten Standorts
"""
location = Location(
name=name,
code=code,
location_type=location_type,
address=address,
city=city,
country=country,
parent_id=parent_id
)
return location_manager.create_location(location)
def assign_user_to_location(user_id: int, location_id: int,
access_level: AccessLevel = AccessLevel.READ_WRITE,
granted_by: int = 1, is_primary: bool = False) -> bool:
"""
Weist einen Benutzer einem Standort zu.
Args:
user_id: ID des Benutzers
location_id: ID des Standorts
access_level: Zugriffslevel
granted_by: ID des gewährenden Benutzers
is_primary: Ob dies der primäre Standort ist
Returns:
bool: True wenn erfolgreich
"""
return location_manager.grant_location_access(
user_id=user_id,
location_id=location_id,
access_level=access_level,
granted_by=granted_by,
is_primary=is_primary
)
def get_user_locations(user_id: int) -> List[Location]:
"""
Holt alle Standorte eines Benutzers (globale Funktion).
Args:
user_id: ID des Benutzers
Returns:
List[Location]: Liste der zugänglichen Standorte
"""
return location_manager.get_user_locations(user_id)
def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""
Berechnet die Entfernung zwischen zwei Koordinaten (Haversine-Formel).
Args:
lat1, lon1: Koordinaten des ersten Punkts
lat2, lon2: Koordinaten des zweiten Punkts
Returns:
float: Entfernung in Kilometern
"""
from math import radians, sin, cos, sqrt, atan2
R = 6371 # Erdradius in km
lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
dlat = lat2 - lat1
dlon = lon2 - lon1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
c = 2 * atan2(sqrt(a), sqrt(1-a))
return R * c
def find_nearest_location(latitude: float, longitude: float,
radius_km: float = 50) -> Optional[Location]:
"""
Findet den nächstgelegenen Standort.
Args:
latitude: Breitengrad
longitude: Längengrad
radius_km: Suchradius in Kilometern
Returns:
Optional[Location]: Nächstgelegener Standort oder None
"""
nearest_locations = location_manager.find_nearest_locations(
latitude=latitude,
longitude=longitude,
radius_km=radius_km,
limit=1
)
return nearest_locations[0][0] if nearest_locations else None
def get_location_dashboard_data(user_id: int) -> Dict[str, Any]:
"""Holt Dashboard-Daten für Standorte eines Benutzers"""
user_locations = location_manager.get_user_locations(user_id)
primary_location = location_manager.get_user_primary_location(user_id)
dashboard_data = {
'user_locations': [asdict(loc) for loc in user_locations],
'primary_location': asdict(primary_location) if primary_location else None,
'location_count': len(user_locations),
'hierarchy': location_manager.get_location_hierarchy()
}
# Füge Statistiken für jeden Standort hinzu
for location in user_locations:
location_stats = location_manager.get_location_statistics(location.id)
dashboard_data[f'stats_{location.id}'] = location_stats
return dashboard_data
def create_location_from_address(name: str, address: str, city: str,
country: str, location_type: LocationType = LocationType.BRANCH) -> int:
"""Erstellt Standort aus Adresse mit automatischer Geocodierung"""
location = Location(
name=name,
code=name[:3].upper(),
location_type=location_type,
address=address,
city=city,
country=country
)
return location_manager.create_location(location)
# JavaScript für Multi-Location Frontend
def get_multi_location_javascript() -> str:
"""JavaScript für Multi-Location Management"""
return """
class MultiLocationManager {
constructor() {
this.currentLocation = null;
this.userLocations = [];
this.locationHierarchy = {};
this.init();
}
init() {
this.loadUserLocations();
this.setupEventListeners();
}
setupEventListeners() {
// Location switcher
document.addEventListener('change', (e) => {
if (e.target.matches('.location-selector')) {
const locationId = parseInt(e.target.value);
this.switchLocation(locationId);
}
});
// Location management buttons
document.addEventListener('click', (e) => {
if (e.target.matches('.manage-locations-btn')) {
this.showLocationManager();
}
if (e.target.matches('.location-hierarchy-btn')) {
this.showLocationHierarchy();
}
});
}
async loadUserLocations() {
try {
const response = await fetch('/api/locations/user');
const data = await response.json();
if (data.success) {
this.userLocations = data.locations;
this.currentLocation = data.primary_location;
this.locationHierarchy = data.hierarchy;
this.updateLocationSelector();
this.updateLocationDisplay();
}
} catch (error) {
console.error('Fehler beim Laden der Standorte:', error);
}
}
updateLocationSelector() {
const selectors = document.querySelectorAll('.location-selector');
selectors.forEach(selector => {
selector.innerHTML = this.userLocations.map(location =>
`<option value="${location.id}" ${location.id === this.currentLocation?.id ? 'selected' : ''}>
${location.name} (${location.code})
</option>`
).join('');
});
}
updateLocationDisplay() {
const displays = document.querySelectorAll('.current-location-display');
displays.forEach(display => {
if (this.currentLocation) {
display.innerHTML = `
<div class="location-info">
<strong>${this.currentLocation.name}</strong>
<span class="location-type">${this.currentLocation.type}</span>
${this.currentLocation.city ? `<span class="location-city">${this.currentLocation.city}</span>` : ''}
</div>
`;
} else {
display.innerHTML = '<span class="no-location">Kein Standort ausgewählt</span>';
}
});
}
async switchLocation(locationId) {
try {
const response = await fetch('/api/locations/switch', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ location_id: locationId })
});
const result = await response.json();
if (result.success) {
this.currentLocation = this.userLocations.find(loc => loc.id === locationId);
this.updateLocationDisplay();
// Seite neu laden um location-spezifische Daten zu aktualisieren
window.location.reload();
} else {
this.showNotification('Fehler beim Wechseln des Standorts', 'error');
}
} catch (error) {
console.error('Standort-Wechsel fehlgeschlagen:', error);
}
}
showLocationManager() {
const modal = document.createElement('div');
modal.className = 'location-manager-modal';
modal.innerHTML = `
<div class="modal-content">
<div class="modal-header">
<h2>Standort-Verwaltung</h2>
<button class="close-modal">&times;</button>
</div>
<div class="modal-body">
<div class="location-list">
${this.renderLocationList()}
</div>
<div class="location-actions">
<button class="btn-create-location">Neuen Standort erstellen</button>
</div>
</div>
</div>
`;
document.body.appendChild(modal);
// Event handlers
modal.querySelector('.close-modal').onclick = () => modal.remove();
modal.onclick = (e) => {
if (e.target === modal) modal.remove();
};
}
renderLocationList() {
return this.userLocations.map(location => `
<div class="location-item">
<div class="location-details">
<h4>${location.name} (${location.code})</h4>
<p><strong>Typ:</strong> ${location.type}</p>
<p><strong>Adresse:</strong> ${location.address || 'Nicht angegeben'}</p>
<p><strong>Stadt:</strong> ${location.city || 'Nicht angegeben'}</p>
</div>
<div class="location-actions">
<button class="btn-edit-location" data-location-id="${location.id}">Bearbeiten</button>
<button class="btn-view-stats" data-location-id="${location.id}">Statistiken</button>
</div>
</div>
`).join('');
}
showLocationHierarchy() {
const modal = document.createElement('div');
modal.className = 'hierarchy-modal';
modal.innerHTML = `
<div class="modal-content">
<div class="modal-header">
<h2>Standort-Hierarchie</h2>
<button class="close-modal">&times;</button>
</div>
<div class="modal-body">
<div class="hierarchy-tree">
${this.renderHierarchyTree(this.locationHierarchy.locations || [])}
</div>
</div>
</div>
`;
document.body.appendChild(modal);
modal.querySelector('.close-modal').onclick = () => modal.remove();
modal.onclick = (e) => {
if (e.target === modal) modal.remove();
};
}
renderHierarchyTree(locations, level = 0) {
return locations.map(location => `
<div class="hierarchy-node" style="margin-left: ${level * 20}px;">
<div class="node-content">
<span class="node-icon">${this.getLocationTypeIcon(location.type)}</span>
<span class="node-name">${location.name}</span>
<span class="node-code">(${location.code})</span>
<span class="resource-count">${location.resource_count.printers || 0} Drucker</span>
</div>
${location.children && location.children.length > 0 ?
this.renderHierarchyTree(location.children, level + 1) : ''}
</div>
`).join('');
}
getLocationTypeIcon(type) {
const icons = {
'headquarters': '🏢',
'branch': '🏪',
'department': '🏬',
'floor': '🏢',
'room': '🚪',
'area': '📍'
};
return icons[type] || '📍';
}
showNotification(message, type = 'info') {
const notification = document.createElement('div');
notification.className = `notification notification-${type}`;
notification.textContent = message;
document.body.appendChild(notification);
setTimeout(() => {
notification.remove();
}, 3000);
}
}
// Initialize when DOM is ready
document.addEventListener('DOMContentLoaded', function() {
window.multiLocationManager = new MultiLocationManager();
});
"""

View File

@ -1,229 +0,0 @@
"""
Offline-Konfiguration für MYP-System
===================================
Konfiguriert das System für den Offline-Betrieb ohne Internetverbindung.
Stellt Fallback-Lösungen für internetabhängige Funktionen bereit.
"""
import os
import logging
from typing import Dict, List, Optional
from utils.logging_config import get_logger
logger = get_logger("offline_config")
# ===== OFFLINE-MODUS KONFIGURATION =====
OFFLINE_MODE = True # Produktionseinstellung - System läuft offline
# ===== OFFLINE-KOMPATIBILITÄT PRÜFUNGEN =====
def check_internet_connectivity() -> bool:
"""
Prüft ob eine Internetverbindung verfügbar ist.
Im Offline-Modus gibt immer False zurück.
Returns:
bool: True wenn Internet verfügbar, False im Offline-Modus
"""
if OFFLINE_MODE:
return False
# In einem echten Online-Modus könnte hier eine echte Prüfung stehen
try:
import socket
socket.create_connection(("8.8.8.8", 53), timeout=3)
return True
except OSError:
return False
def is_oauth_available() -> bool:
"""
Prüft ob OAuth-Funktionalität verfügbar ist.
Returns:
bool: False im Offline-Modus
"""
return not OFFLINE_MODE and check_internet_connectivity()
def is_email_sending_available() -> bool:
"""
Prüft ob E-Mail-Versand verfügbar ist.
Returns:
bool: False im Offline-Modus (nur Logging)
"""
return not OFFLINE_MODE and check_internet_connectivity()
def is_cdn_available() -> bool:
"""
Prüft ob CDN-Links verfügbar sind.
Returns:
bool: False im Offline-Modus (lokale Fallbacks verwenden)
"""
return not OFFLINE_MODE and check_internet_connectivity()
# ===== CDN FALLBACK-KONFIGURATION =====
CDN_FALLBACKS = {
# Chart.js CDN -> Lokale Datei
"https://cdnjs.cloudflare.com/ajax/libs/Chart.js/4.4.0/chart.min.js": "/static/js/charts/chart.min.js",
"https://cdn.jsdelivr.net/npm/chart.js": "/static/js/charts/chart.min.js",
# FontAwesome (bereits lokal verfügbar)
"https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css": "/static/fontawesome/css/all.min.css",
# Weitere CDN-Fallbacks können hier hinzugefügt werden
}
def get_local_asset_path(cdn_url: str) -> Optional[str]:
"""
Gibt den lokalen Pfad für eine CDN-URL zurück.
Args:
cdn_url: URL des CDN-Assets
Returns:
str: Lokaler Pfad oder None wenn kein Fallback verfügbar
"""
return CDN_FALLBACKS.get(cdn_url)
def replace_cdn_links(html_content: str) -> str:
"""
Ersetzt CDN-Links durch lokale Fallbacks im HTML-Inhalt.
Args:
html_content: HTML-Inhalt mit CDN-Links
Returns:
str: HTML-Inhalt mit lokalen Links
"""
if not OFFLINE_MODE:
return html_content
modified_content = html_content
for cdn_url, local_path in CDN_FALLBACKS.items():
if cdn_url in modified_content:
modified_content = modified_content.replace(cdn_url, local_path)
logger.info(f"🔄 CDN-Link ersetzt: {cdn_url} -> {local_path}")
return modified_content
# ===== SECURITY POLICY ANPASSUNGEN =====
def get_offline_csp_policy() -> Dict[str, List[str]]:
"""
Gibt CSP-Policy für Offline-Modus zurück.
Entfernt externe CDN-Domains aus der Policy.
Returns:
Dict: CSP-Policy ohne externe Domains
"""
if not OFFLINE_MODE:
# Online-Modus: Originale Policy mit CDNs
return {
"script-src": [
"'self'",
"'unsafe-inline'",
"'unsafe-eval'",
"https://cdn.jsdelivr.net",
"https://unpkg.com",
"https://cdnjs.cloudflare.com"
],
"style-src": [
"'self'",
"'unsafe-inline'",
"https://fonts.googleapis.com",
"https://cdn.jsdelivr.net"
],
"font-src": [
"'self'",
"https://fonts.gstatic.com"
]
}
else:
# Offline-Modus: Nur lokale Ressourcen
return {
"script-src": [
"'self'",
"'unsafe-inline'",
"'unsafe-eval'"
],
"style-src": [
"'self'",
"'unsafe-inline'"
],
"font-src": [
"'self'"
],
"img-src": [
"'self'",
"data:"
]
}
# ===== OFFLINE-MODUS HILFSFUNKTIONEN =====
def log_offline_mode_status():
"""Loggt den aktuellen Offline-Modus Status."""
if OFFLINE_MODE:
logger.info("🌐 System läuft im OFFLINE-MODUS")
logger.info(" ❌ OAuth deaktiviert")
logger.info(" ❌ E-Mail-Versand deaktiviert (nur Logging)")
logger.info(" ❌ CDN-Links werden durch lokale Dateien ersetzt")
logger.info(" ✅ Alle Kernfunktionen verfügbar")
else:
logger.info("🌐 System läuft im ONLINE-MODUS")
logger.info(" ✅ OAuth verfügbar")
logger.info(" ✅ E-Mail-Versand verfügbar")
logger.info(" ✅ CDN-Links verfügbar")
def get_feature_availability() -> Dict[str, bool]:
"""
Gibt die Verfügbarkeit verschiedener Features zurück.
Returns:
Dict: Feature-Verfügbarkeit
"""
return {
"oauth": is_oauth_available(),
"email_sending": is_email_sending_available(),
"cdn_resources": is_cdn_available(),
"offline_mode": OFFLINE_MODE,
"core_functionality": True, # Kernfunktionen immer verfügbar
"printer_control": True, # Drucker-Steuerung immer verfügbar
"job_management": True, # Job-Verwaltung immer verfügbar
"user_management": True # Benutzer-Verwaltung immer verfügbar
}
# ===== STARTUP-FUNKTIONEN =====
def initialize_offline_mode():
"""Initialisiert den Offline-Modus beim System-Start."""
log_offline_mode_status()
if OFFLINE_MODE:
logger.info("🔧 Initialisiere Offline-Modus-Anpassungen...")
# Prüfe ob lokale Chart.js verfügbar ist
chart_js_path = "static/js/charts/chart.min.js"
if not os.path.exists(chart_js_path):
logger.warning(f"⚠️ Lokale Chart.js nicht gefunden: {chart_js_path}")
logger.warning(" Diagramme könnten nicht funktionieren")
else:
logger.info(f"✅ Lokale Chart.js gefunden: {chart_js_path}")
# Prüfe weitere lokale Assets
fontawesome_path = "static/fontawesome/css/all.min.css"
if not os.path.exists(fontawesome_path):
logger.warning(f"⚠️ Lokale FontAwesome nicht gefunden: {fontawesome_path}")
else:
logger.info(f"✅ Lokale FontAwesome gefunden: {fontawesome_path}")
logger.info("✅ Offline-Modus erfolgreich initialisiert")
# Beim Import automatisch initialisieren
initialize_offline_mode()

View File

@ -1,243 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Test-Skript für Button-Funktionalitäten
Testet alle Buttons aus dem Selenium-Test auf echte Funktionalität
"""
import requests
import time
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
class ButtonFunctionalityTester:
def __init__(self, base_url="http://127.0.0.1:5000"):
self.base_url = base_url
self.session = requests.Session()
self.driver = None
def setup_driver(self):
"""Selenium WebDriver einrichten"""
try:
self.driver = webdriver.Chrome()
self.driver.set_window_size(1696, 1066)
print("✅ WebDriver erfolgreich initialisiert")
except Exception as e:
print(f"❌ Fehler beim Initialisieren des WebDrivers: {e}")
def login(self, username="admin", password="admin"):
"""Anmeldung durchführen"""
try:
self.driver.get(f"{self.base_url}/auth/login")
# Warten bis Login-Formular geladen ist
username_field = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.NAME, "username"))
)
username_field.send_keys(username)
self.driver.find_element(By.NAME, "password").send_keys(password)
self.driver.find_element(By.XPATH, "//button[@type='submit']").click()
# Warten bis Dashboard geladen ist
WebDriverWait(self.driver, 10).until(
EC.url_contains("/dashboard")
)
print("✅ Erfolgreich angemeldet")
return True
except Exception as e:
print(f"❌ Fehler bei der Anmeldung: {e}")
return False
def test_button_functionality(self, button_selector, button_name, expected_action=""):
"""Teste einen einzelnen Button auf Funktionalität"""
try:
print(f"\n🔍 Teste Button: {button_name} ({button_selector})")
# Button finden
button = WebDriverWait(self.driver, 5).until(
EC.element_to_be_clickable((By.CSS_SELECTOR, button_selector))
)
# Ursprünglichen Zustand erfassen
original_url = self.driver.current_url
original_text = button.text if button.text else "Kein Text"
print(f" 📍 Button gefunden: '{original_text}'")
# Button klicken
button.click()
print(f" 👆 Button geklickt")
# Kurz warten für Reaktion
time.sleep(1)
# Reaktion prüfen
reactions = []
# URL-Änderung prüfen
if self.driver.current_url != original_url:
reactions.append(f"URL-Änderung: {self.driver.current_url}")
# Modal-Fenster prüfen
try:
modal = self.driver.find_element(By.CSS_SELECTOR, ".fixed.inset-0")
if modal.is_displayed():
reactions.append("Modal-Fenster geöffnet")
except:
pass
# Loading-Spinner prüfen
try:
spinner = self.driver.find_element(By.CSS_SELECTOR, ".animate-spin")
if spinner.is_displayed():
reactions.append("Loading-Animation aktiv")
except:
pass
# Toast-Benachrichtigung prüfen
try:
toast = self.driver.find_element(By.CSS_SELECTOR, ".fixed.top-4.right-4")
if toast.is_displayed():
reactions.append(f"Toast-Nachricht: {toast.text}")
except:
pass
# Button-Text-Änderung prüfen
new_text = button.text if button.text else "Kein Text"
if new_text != original_text:
reactions.append(f"Text-Änderung: '{original_text}''{new_text}'")
# Ergebnis ausgeben
if reactions:
print(f" ✅ Reaktionen gefunden:")
for reaction in reactions:
print(f" - {reaction}")
return True
else:
print(f" ⚠️ Keine sichtbare Reaktion erkannt")
return False
except Exception as e:
print(f" ❌ Fehler beim Testen: {e}")
return False
def test_all_buttons(self):
"""Teste alle Buttons aus dem Selenium-Test"""
if not self.setup_driver():
return
if not self.login():
return
# Button-Test-Plan basierend auf Selenium-Test
button_tests = [
# Dashboard-Seite (Startseite)
{
"page": f"{self.base_url}/",
"buttons": [
(".mb-8 > .btn-primary", "Haupt-CTA Button"),
(".btn-primary > span", "CTA Button Span")
]
},
# Dashboard-Seite
{
"page": f"{self.base_url}/dashboard",
"buttons": [
("#refreshDashboard > span", "Dashboard Aktualisieren")
]
},
# Drucker-Seite
{
"page": f"{self.base_url}/printers",
"buttons": [
("#refresh-button > span", "Drucker Aktualisieren"),
("#maintenance-toggle > span", "Wartungsmodus Toggle")
]
},
# Jobs-Seite
{
"page": f"{self.base_url}/jobs",
"buttons": [
("#batch-toggle > span", "Mehrfachauswahl Toggle"),
("#refresh-button > span", "Jobs Aktualisieren")
]
},
# Admin-Seite
{
"page": f"{self.base_url}/admin",
"buttons": [
("#analytics-btn", "Analytics Button"),
("#maintenance-btn", "Wartung Button"),
("#system-status-btn", "System Status Button"),
("#add-user-btn", "Benutzer hinzufügen")
]
}
]
results = {"total": 0, "working": 0, "broken": 0}
print("🚀 Starte umfassenden Button-Funktionalitäts-Test...\n")
for test_group in button_tests:
print(f"📄 Navigiere zu Seite: {test_group['page']}")
try:
self.driver.get(test_group["page"])
time.sleep(2) # Seite laden lassen
for selector, name in test_group["buttons"]:
results["total"] += 1
if self.test_button_functionality(selector, name):
results["working"] += 1
else:
results["broken"] += 1
except Exception as e:
print(f"❌ Fehler beim Laden der Seite {test_group['page']}: {e}")
# Zusammenfassung
print(f"\n{'='*60}")
print(f"📊 TEST-ZUSAMMENFASSUNG")
print(f"{'='*60}")
print(f"Getestete Buttons gesamt: {results['total']}")
print(f"✅ Funktional: {results['working']}")
print(f"❌ Nicht funktional: {results['broken']}")
success_rate = (results['working'] / results['total']) * 100 if results['total'] > 0 else 0
print(f"📈 Erfolgsrate: {success_rate:.1f}%")
if success_rate >= 90:
print("🎉 AUSGEZEICHNET! Fast alle Buttons funktionieren korrekt.")
elif success_rate >= 75:
print("✅ GUT! Die meisten Buttons funktionieren korrekt.")
elif success_rate >= 50:
print("⚠️ BEFRIEDIGEND! Einige Buttons benötigen noch Verbesserungen.")
else:
print("❌ VERBESSERUNG ERFORDERLICH! Viele Buttons haben keine Funktionalität.")
def cleanup(self):
"""Aufräumen"""
if self.driver:
self.driver.quit()
print("🧹 WebDriver beendet")
def main():
"""Hauptfunktion"""
tester = ButtonFunctionalityTester()
try:
tester.test_all_buttons()
finally:
tester.cleanup()
if __name__ == "__main__":
main()

View File

@ -1,175 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
P110-TAPO-TEST - Speziell für TP-Link Tapo P110-Steckdosen
Testet verschiedene Versionen des PyP100-Moduls
"""
import sys
import time
import socket
import subprocess
from datetime import datetime
# Anmeldedaten
TAPO_USERNAME = "till.tomczak@mercedes-benz.com"
TAPO_PASSWORD = "744563017196"
# Standard-IP-Adressen zum Testen (anpassen an tatsächliche IPs)
TEST_IPS = [
"192.168.0.103", # Diese IPs waren erreichbar im vorherigen Test
"192.168.0.104"
]
def log(message):
"""Logge eine Nachricht mit Zeitstempel"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] {message}")
def check_connection(ip, port=80, timeout=1):
"""Prüft eine TCP-Verbindung"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((ip, port))
sock.close()
return result == 0
except:
return False
def install_package(package):
"""Installiert ein Python-Paket"""
try:
log(f"Installiere {package}...")
subprocess.run([sys.executable, "-m", "pip", "install", package, "--force-reinstall"], check=True)
log(f"{package} erfolgreich installiert")
return True
except Exception as e:
log(f"❌ Fehler bei Installation von {package}: {e}")
return False
def test_p110_connection():
"""Testet verschiedene Möglichkeiten, um mit P110-Steckdosen zu kommunizieren"""
log("🚀 TAPO P110 TEST - STARTER")
log(f"👤 Benutzername: {TAPO_USERNAME}")
log(f"🔑 Passwort: {TAPO_PASSWORD}")
# Verfügbare Module testen
log("\n1⃣ SCHRITT 1: Teste verfügbare Module")
try:
from PyP100 import PyP110
log("✅ PyP100 Modul gefunden")
except ImportError:
log("❌ PyP100 Modul nicht gefunden")
install_package("PyP100==0.1.2")
try:
from PyP100 import PyP110
log("✅ PyP100 Modul jetzt installiert")
except ImportError:
log("❌ Konnte PyP100 nicht importieren")
return
# Erreichbare Steckdosen finden
log("\n2⃣ SCHRITT 2: Suche erreichbare IPs")
available_ips = []
for ip in TEST_IPS:
if check_connection(ip):
log(f"✅ IP {ip} ist erreichbar")
available_ips.append(ip)
else:
log(f"❌ IP {ip} nicht erreichbar")
if not available_ips:
log("❌ Keine erreichbaren IPs gefunden!")
return
# P110-Verbindung testen
log("\n3⃣ SCHRITT 3: Teste PyP100 Bibliothek")
for ip in available_ips:
try:
log(f"🔄 Verbinde zu Steckdose {ip} mit PyP100.PyP110...")
# Neue Instanz erstellen
from PyP100 import PyP110
p110 = PyP110.P110(ip, TAPO_USERNAME, TAPO_PASSWORD)
# Handshake und Login
log(" Handshake...")
p110.handshake()
log(" Login...")
p110.login()
# Geräteinformationen abrufen
log(" Geräteinformationen abrufen...")
device_info = p110.getDeviceInfo()
# Erfolg!
log(f"✅ ERFOLG! Steckdose {ip} gefunden")
log(f" Name: {device_info.get('nickname', 'Unbekannt')}")
log(f" Status: {'Eingeschaltet' if device_info.get('device_on', False) else 'Ausgeschaltet'}")
# Ein-/Ausschalten testen
if "--toggle" in sys.argv:
current_state = device_info.get('device_on', False)
if current_state:
log(" Schalte AUS...")
p110.turnOff()
else:
log(" Schalte EIN...")
p110.turnOn()
time.sleep(1)
# Status prüfen
device_info = p110.getDeviceInfo()
new_state = device_info.get('device_on', False)
log(f" Neuer Status: {'Eingeschaltet' if new_state else 'Ausgeschaltet'}")
return True
except Exception as e:
log(f"❌ Fehler bei Verbindung zu {ip}: {e}")
# Alternative Bibliothek testen
log("\n4⃣ SCHRITT 4: Teste PyP100 mit alternativer Version")
if install_package("pytapo==1.1.2"):
try:
import pytapo
from pytapo.tapo import Tapo
for ip in available_ips:
try:
log(f"🔄 Verbinde zu Steckdose {ip} mit pytapo...")
# Neue Verbindung
tapo = Tapo(ip, TAPO_USERNAME, TAPO_PASSWORD)
# Geräteinformationen abrufen
device_info = tapo.get_device_info()
# Erfolg!
log(f"✅ ERFOLG mit pytapo! Steckdose {ip} gefunden")
log(f" Name: {device_info.get('nickname', 'Unbekannt')}")
return True
except Exception as e:
log(f"❌ Fehler bei pytapo-Verbindung zu {ip}: {e}")
except Exception as e:
log(f"❌ Fehler beim Import von pytapo: {e}")
log("\n❌ Keine funktionierenden Tapo-Steckdosen gefunden!")
log("Bitte überprüfen Sie die Anmeldedaten und IP-Adressen")
if __name__ == "__main__":
print("\n======= TAPO P110 TEST =======\n")
test_p110_connection()
print("\n======= TEST BEENDET =======\n")

View File

@ -1,212 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
DIREKT-TEST für TP-Link Tapo P110-Steckdosen
Umgeht Ping-Befehle und testet direkte TCP-Verbindung
"""
import sys
import os
import socket
import time
from datetime import datetime
# Anmeldedaten für Tapo-Steckdosen
TAPO_USERNAME = "till.tomczak@mercedes-benz.com"
TAPO_PASSWORD = "744563017196A"
# Standard-IPs für Tapo-Steckdosen
# (falls nicht verfügbar, passen Sie diese an die tatsächlichen IPs in Ihrem Netzwerk an)
TAPO_IPS = [
# Typische IP-Bereiche
"192.168.1.100",
"192.168.1.101",
"192.168.1.102",
"192.168.1.103",
"192.168.1.104",
"192.168.1.105",
"192.168.0.100",
"192.168.0.101",
"192.168.0.102",
"192.168.0.103",
"192.168.0.104",
"192.168.0.105",
# Mercedes-Benz Netzwerk spezifisch
"10.0.0.100",
"10.0.0.101",
"10.0.0.102",
"10.0.0.103",
"10.0.0.104",
"10.0.0.105",
# Zusätzliche mögliche IPs
"192.168.178.100",
"192.168.178.101",
"192.168.178.102",
"192.168.178.103",
"192.168.178.104",
"192.168.178.105",
]
def log_message(message, level="INFO"):
"""Logge eine Nachricht mit Zeitstempel"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] [{level}] {message}")
def check_tcp_connection(host, port=80, timeout=1):
"""
Prüft ob eine TCP-Verbindung zu einem Host und Port möglich ist.
Vermeidet Ping und charmap-Probleme.
Args:
host: Hostname oder IP-Adresse
port: TCP-Port (Standard: 80)
timeout: Timeout in Sekunden
Returns:
bool: True wenn Verbindung erfolgreich
"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((host, port))
sock.close()
return result == 0
except:
return False
def test_tapo_connection():
"""
Testet die Verbindung zu TP-Link Tapo P110-Steckdosen.
"""
log_message("🔄 Überprüfe ob PyP100-Modul installiert ist...")
try:
from PyP100 import PyP110
log_message("✅ PyP100-Modul erfolgreich importiert")
except ImportError:
log_message("❌ PyP100-Modul nicht installiert", "ERROR")
log_message(" Installiere jetzt mit: pip install PyP100==0.1.2")
try:
import subprocess
subprocess.run([sys.executable, "-m", "pip", "install", "PyP100==0.1.2"], check=True)
log_message("✅ PyP100-Modul erfolgreich installiert")
# Erneut importieren
from PyP100 import PyP110
except Exception as e:
log_message(f"❌ Fehler bei Installation von PyP100: {str(e)}", "ERROR")
log_message(" Bitte installieren Sie manuell: pip install PyP100==0.1.2")
return
log_message("🔍 Starte Test für Tapo-Steckdosen...")
log_message(f"🔐 Anmeldedaten: {TAPO_USERNAME} / {TAPO_PASSWORD}")
successful_connections = 0
found_ips = []
for ip in TAPO_IPS:
log_message(f"🔄 Teste IP-Adresse: {ip}")
# TCP-Verbindungstest für Grundkonnektivität (Port 80 für HTTP)
conn_success = check_tcp_connection(ip, port=80)
if not conn_success:
# Alternativ Port 443 testen für HTTPS
conn_success = check_tcp_connection(ip, port=443)
if not conn_success:
log_message(f" ❌ IP {ip} nicht erreichbar (Verbindung fehlgeschlagen)")
continue
log_message(f" ✅ IP {ip} ist erreichbar (TCP-Verbindung erfolgreich)")
# Tapo-Verbindung testen
try:
log_message(f" 🔄 Verbinde zu Tapo-Steckdose {ip}...")
p110 = PyP110.P110(ip, TAPO_USERNAME, TAPO_PASSWORD)
p110.handshake() # Authentifizierung
p110.login() # Login
# Geräteinformationen abrufen
device_info = p110.getDeviceInfo()
# Status abrufen
is_on = device_info.get('device_on', False)
nickname = device_info.get('nickname', "Unbekannt")
log_message(f" ✅ Verbindung zu Tapo-Steckdose '{nickname}' ({ip}) erfolgreich")
log_message(f" 📱 Gerätename: {nickname}")
log_message(f" ⚡ Status: {'Eingeschaltet' if is_on else 'Ausgeschaltet'}")
if 'on_time' in device_info:
on_time = device_info.get('on_time', 0)
hours, minutes = divmod(on_time // 60, 60)
log_message(f" ⏱️ Betriebszeit: {hours}h {minutes}m")
successful_connections += 1
found_ips.append(ip)
# Steckdose testen: EIN/AUS
if len(sys.argv) > 1 and sys.argv[1] == '--toggle':
if is_on:
log_message(f" 🔄 Schalte Steckdose {nickname} AUS...")
p110.turnOff()
log_message(f" ✅ Steckdose ausgeschaltet")
else:
log_message(f" 🔄 Schalte Steckdose {nickname} EIN...")
p110.turnOn()
log_message(f" ✅ Steckdose eingeschaltet")
# Kurze Pause
time.sleep(1)
# Status erneut abrufen
device_info = p110.getDeviceInfo()
is_on = device_info.get('device_on', False)
log_message(f" ⚡ Neuer Status: {'Eingeschaltet' if is_on else 'Ausgeschaltet'}")
except Exception as e:
log_message(f" ❌ Verbindung zu Tapo-Steckdose {ip} fehlgeschlagen: {str(e)}", "ERROR")
# Zusammenfassung
log_message("\n📊 Zusammenfassung:")
log_message(f" Getestete IPs: {len(TAPO_IPS)}")
log_message(f" Gefundene Tapo-Steckdosen: {successful_connections}")
if successful_connections > 0:
log_message("✅ Tapo-Steckdosen erfolgreich gefunden und getestet!")
log_message(f"📝 Gefundene IPs: {found_ips}")
# Ausgabe für Konfiguration
log_message("\n🔧 Konfiguration für settings.py:")
log_message(f"""
# TP-Link Tapo Standard-Anmeldedaten
TAPO_USERNAME = "{TAPO_USERNAME}"
TAPO_PASSWORD = "{TAPO_PASSWORD}"
# Automatische Steckdosen-Erkennung aktivieren
TAPO_AUTO_DISCOVERY = True
# Standard-Steckdosen-IPs
DEFAULT_TAPO_IPS = {found_ips}
""")
else:
log_message("❌ Keine Tapo-Steckdosen gefunden!", "ERROR")
log_message(" Bitte überprüfen Sie die IP-Adressen und Anmeldedaten")
# Fehlerbehebungs-Tipps
log_message("\n🔧 Fehlerbehebungs-Tipps:")
log_message("1. Stellen Sie sicher, dass die Steckdosen mit dem WLAN verbunden sind")
log_message("2. Prüfen Sie die IP-Adressen in der Tapo-App oder im Router")
log_message("3. Stellen Sie sicher, dass die Anmeldedaten korrekt sind")
log_message("4. Prüfen Sie ob die Steckdosen über die Tapo-App erreichbar sind")
log_message("5. Führen Sie einen Neustart der Steckdosen durch (aus- und wieder einstecken)")
if __name__ == "__main__":
print("\n====== TAPO P110 DIREKT-TEST (OHNE PING) ======\n")
test_tapo_connection()
print("\n====== TEST ABGESCHLOSSEN ======\n")

View File

@ -1,132 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
SOFORT-TEST für TP-Link Tapo P110-Steckdosen
Nutzt direkt PyP100 mit hardkodierten Anmeldedaten
"""
import os
import sys
import time
from datetime import datetime
# TAPO Anmeldedaten direkt hardkodiert (wie in den funktionierenden Versionen)
os.environ["TAPO_USERNAME"] = "till.tomczak@mercedes-benz.com"
os.environ["TAPO_PASSWORD"] = "744563017196A" # Das 'A' am Ende ist wichtig
# IPs der Steckdosen
TAPO_IPS = [
"192.168.0.103",
"192.168.0.104",
"192.168.0.100",
"192.168.0.101",
"192.168.0.102"
]
def log(msg):
"""Protokolliert eine Nachricht mit Zeitstempel"""
timestamp = datetime.now().strftime("%H:%M:%S")
print(f"[{timestamp}] {msg}")
def test_connection():
"""Teste Verbindung zu den Steckdosen"""
log("🔄 Teste PyP100-Import...")
try:
from PyP100 import PyP100
log("✅ PyP100-Modul erfolgreich importiert")
except ImportError:
log("❌ PyP100-Modul nicht gefunden. Installiere es...")
try:
import subprocess
subprocess.run([sys.executable, "-m", "pip", "install", "PyP100==0.0.12"], check=True)
from PyP100 import PyP100
log("✅ PyP100-Modul installiert")
except Exception as e:
log(f"❌ Fehler bei Installation: {str(e)}")
return False
# Anmeldedaten aus Umgebungsvariablen lesen
username = os.environ.get("TAPO_USERNAME")
password = os.environ.get("TAPO_PASSWORD")
log(f"👤 Benutzername: {username}")
log(f"🔑 Passwort: {password}")
# Teste jede IP
success = False
for ip in TAPO_IPS:
log(f"🔄 Teste Steckdose mit IP: {ip}")
try:
# Wichtig: Verwende PyP100 (nicht PyP110) wie in den funktionierenden Versionen
p100 = PyP100.P100(ip, username, password)
# Handshake und Login
log(" 🔄 Handshake...")
p100.handshake()
log(" 🔄 Login...")
p100.login()
# Status abfragen
log(" 🔄 Status abfragen...")
device_info = p100.getDeviceInfo()
# Erfolg!
state = "Eingeschaltet" if device_info.get("device_on", False) else "Ausgeschaltet"
log(f"✅ ERFOLG! Steckdose {ip} erfolgreich verbunden")
log(f" 📱 Name: {device_info.get('nickname', 'Unbekannt')}")
log(f" ⚡ Status: {state}")
# Steckdose ein-/ausschalten wenn gewünscht
if "--toggle" in sys.argv:
if device_info.get("device_on", False):
log(" 🔄 Schalte Steckdose AUS...")
p100.turnOff()
else:
log(" 🔄 Schalte Steckdose EIN...")
p100.turnOn()
time.sleep(1)
# Neuen Status abrufen
device_info = p100.getDeviceInfo()
state = "Eingeschaltet" if device_info.get("device_on", False) else "Ausgeschaltet"
log(f" ⚡ Neuer Status: {state}")
success = True
# Konfiguration für settings.py ausgeben
log("\n✅ KONFIGURATION FÜR SETTINGS.PY:")
log(f"""
# TP-Link Tapo Standard-Anmeldedaten
TAPO_USERNAME = "{username}"
TAPO_PASSWORD = "{password}"
# Standard-Steckdosen-IPs
DEFAULT_TAPO_IPS = ["{ip}"]
""")
# Nur die erste erfolgreiche Steckdose testen
break
except Exception as e:
log(f"❌ Fehler bei Steckdose {ip}: {str(e)}")
if not success:
log("\n❌ Keine Tapo-Steckdose konnte verbunden werden!")
log("Prüfen Sie folgende mögliche Ursachen:")
log("1. Steckdosen sind nicht eingesteckt oder mit dem WLAN verbunden")
log("2. IP-Adressen sind falsch")
log("3. Anmeldedaten sind falsch (prüfen Sie das 'A' am Ende des Passworts)")
log("4. Netzwerkprobleme verhindern den Zugriff")
return success
if __name__ == "__main__":
print("\n====== TAPO P110 SOFORT-TEST ======\n")
test_connection()
print("\n====== TEST BEENDET ======\n")

View File

@ -1,295 +0,0 @@
#!/usr/bin/env python3
"""
MYP Platform - Requirements Update Script
Aktualisiert die Requirements basierend auf tatsächlich verwendeten Imports
"""
import os
import sys
import subprocess
import ast
import importlib.util
from pathlib import Path
from typing import Set, List, Dict
def get_imports_from_file(file_path: Path) -> Set[str]:
"""Extrahiert alle Import-Statements aus einer Python-Datei."""
imports = set()
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
tree = ast.parse(content)
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
imports.add(alias.name.split('.')[0])
elif isinstance(node, ast.ImportFrom):
if node.module:
imports.add(node.module.split('.')[0])
except Exception as e:
print(f"Fehler beim Parsen von {file_path}: {e}")
return imports
def get_all_imports(project_root: Path) -> Set[str]:
"""Sammelt alle Imports aus dem Projekt."""
all_imports = set()
# Wichtige Dateien analysieren
important_files = [
'app.py',
'models.py',
'utils/rate_limiter.py',
'utils/job_scheduler.py',
'utils/queue_manager.py',
'utils/ssl_manager.py',
'utils/security.py',
'utils/permissions.py',
'utils/analytics.py',
'utils/template_helpers.py',
'utils/logging_config.py'
]
for file_path in important_files:
full_path = project_root / file_path
if full_path.exists():
imports = get_imports_from_file(full_path)
all_imports.update(imports)
print(f"✓ Analysiert: {file_path} ({len(imports)} Imports)")
return all_imports
def get_package_mapping() -> Dict[str, str]:
"""Mapping von Import-Namen zu PyPI-Paketnamen."""
return {
'flask': 'Flask',
'flask_login': 'Flask-Login',
'flask_wtf': 'Flask-WTF',
'sqlalchemy': 'SQLAlchemy',
'werkzeug': 'Werkzeug',
'bcrypt': 'bcrypt',
'cryptography': 'cryptography',
'PyP100': 'PyP100',
'redis': 'redis',
'requests': 'requests',
'jinja2': 'Jinja2',
'markupsafe': 'MarkupSafe',
'itsdangerous': 'itsdangerous',
'psutil': 'psutil',
'click': 'click',
'blinker': 'blinker',
'pywin32': 'pywin32',
'pytest': 'pytest',
'gunicorn': 'gunicorn'
}
def get_current_versions() -> Dict[str, str]:
"""Holt die aktuell installierten Versionen."""
versions = {}
try:
result = subprocess.run(['pip', 'list', '--format=freeze'],
capture_output=True, text=True,
encoding='utf-8', errors='replace')
for line in result.stdout.strip().split('\n'):
if '==' in line:
package, version = line.split('==', 1)
versions[package.lower()] = version
except Exception as e:
print(f"Fehler beim Abrufen der Versionen: {e}")
return versions
def check_package_availability(package: str, version: str = None) -> bool:
"""Prüft, ob ein Paket in der angegebenen Version verfügbar ist."""
try:
if version:
cmd = ['pip', 'index', 'versions', package]
else:
cmd = ['pip', 'show', package]
result = subprocess.run(cmd, capture_output=True, text=True,
encoding='utf-8', errors='replace')
return result.returncode == 0
except Exception:
return False
def generate_requirements(imports: Set[str], versions: Dict[str, str]) -> List[str]:
"""Generiert die Requirements-Liste."""
package_mapping = get_package_mapping()
requirements = []
# Standard-Bibliotheken, die nicht installiert werden müssen
stdlib_modules = {
'os', 'sys', 'logging', 'atexit', 'datetime', 'time', 'subprocess',
'json', 'signal', 'threading', 'functools', 'typing', 'contextlib',
'secrets', 'hashlib', 'calendar', 'random', 'socket', 'ipaddress',
'enum', 'dataclasses', 'concurrent', 'collections'
}
# Lokale Module ausschließen
local_modules = {
'models', 'utils', 'config', 'blueprints'
}
# Nur externe Pakete berücksichtigen
external_imports = imports - stdlib_modules - local_modules
for import_name in sorted(external_imports):
package_name = package_mapping.get(import_name, import_name)
# Version aus installierten Paketen holen
version = versions.get(package_name.lower())
if version and check_package_availability(package_name, version):
if package_name == 'pywin32':
requirements.append(f"{package_name}=={version}; sys_platform == \"win32\"")
else:
requirements.append(f"{package_name}=={version}")
else:
print(f"⚠️ Paket {package_name} nicht gefunden oder Version unbekannt")
return requirements
def write_requirements_file(requirements: List[str], output_file: Path):
"""Schreibt die Requirements in eine Datei."""
header = """# MYP Platform - Python Dependencies
# Basierend auf tatsächlich verwendeten Imports in app.py
# Automatisch generiert am: {date}
# Installiere mit: pip install -r requirements.txt
# ===== CORE FLASK FRAMEWORK =====
# Direkt in app.py verwendet
{flask_requirements}
# ===== DATENBANK =====
# SQLAlchemy für Datenbankoperationen (models.py, app.py)
{db_requirements}
# ===== SICHERHEIT UND AUTHENTIFIZIERUNG =====
# Werkzeug für Passwort-Hashing und Utilities (app.py)
{security_requirements}
# ===== SMART PLUG STEUERUNG =====
# PyP100 für TP-Link Tapo Smart Plugs (utils/job_scheduler.py)
{smartplug_requirements}
# ===== RATE LIMITING UND CACHING =====
# Redis für Rate Limiting (utils/rate_limiter.py) - optional
{cache_requirements}
# ===== HTTP REQUESTS =====
# Requests für HTTP-Anfragen (utils/queue_manager.py, utils/debug_drucker_erkennung.py)
{http_requirements}
# ===== TEMPLATE ENGINE =====
# Jinja2 und MarkupSafe (automatisch mit Flask installiert, aber explizit für utils/template_helpers.py)
{template_requirements}
# ===== SYSTEM MONITORING =====
# psutil für System-Monitoring (utils/debug_utils.py, utils/debug_cli.py)
{monitoring_requirements}
# ===== ZUSÄTZLICHE CORE ABHÄNGIGKEITEN =====
# Click für CLI-Kommandos (automatisch mit Flask)
{core_requirements}
# ===== WINDOWS-SPEZIFISCHE ABHÄNGIGKEITEN =====
# Nur für Windows-Systeme erforderlich
{windows_requirements}
# ===== OPTIONAL: ENTWICKLUNG UND TESTING =====
# Nur für Entwicklungsumgebung
{dev_requirements}
# ===== OPTIONAL: PRODUKTIONS-SERVER =====
# Gunicorn für Produktionsumgebung
{prod_requirements}
"""
# Requirements kategorisieren
flask_reqs = [r for r in requirements if any(x in r.lower() for x in ['flask'])]
db_reqs = [r for r in requirements if 'SQLAlchemy' in r]
security_reqs = [r for r in requirements if any(x in r for x in ['Werkzeug', 'bcrypt', 'cryptography'])]
smartplug_reqs = [r for r in requirements if 'PyP100' in r]
cache_reqs = [r for r in requirements if 'redis' in r]
http_reqs = [r for r in requirements if 'requests' in r]
template_reqs = [r for r in requirements if any(x in r for x in ['Jinja2', 'MarkupSafe', 'itsdangerous'])]
monitoring_reqs = [r for r in requirements if 'psutil' in r]
core_reqs = [r for r in requirements if any(x in r for x in ['click', 'blinker'])]
windows_reqs = [r for r in requirements if 'sys_platform' in r]
from datetime import datetime
content = header.format(
date=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
flask_requirements='\n'.join(flask_reqs) or '# Keine Flask-spezifischen Requirements',
db_requirements='\n'.join(db_reqs) or '# Keine Datenbank-Requirements',
security_requirements='\n'.join(security_reqs) or '# Keine Sicherheits-Requirements',
smartplug_requirements='\n'.join(smartplug_reqs) or '# Keine Smart Plug Requirements',
cache_requirements='\n'.join(cache_reqs) or '# Keine Cache-Requirements',
http_requirements='\n'.join(http_reqs) or '# Keine HTTP-Requirements',
template_requirements='\n'.join(template_reqs) or '# Keine Template-Requirements',
monitoring_requirements='\n'.join(monitoring_reqs) or '# Keine Monitoring-Requirements',
core_requirements='\n'.join(core_reqs) or '# Keine Core-Requirements',
windows_requirements='\n'.join(windows_reqs) or '# Keine Windows-Requirements',
dev_requirements='pytest==8.3.4; extra == "dev"\npytest-cov==6.0.0; extra == "dev"',
prod_requirements='gunicorn==23.0.0; extra == "prod"'
)
with open(output_file, 'w', encoding='utf-8') as f:
f.write(content)
def main():
"""Hauptfunktion."""
print("🔄 MYP Platform Requirements Update")
print("=" * 50)
# Projekt-Root ermitteln
project_root = Path(__file__).parent
print(f"📁 Projekt-Verzeichnis: {project_root}")
# Imports sammeln
print("\n📋 Sammle Imports aus wichtigen Dateien...")
imports = get_all_imports(project_root)
print(f"\n📦 Gefundene externe Imports: {len(imports)}")
for imp in sorted(imports):
print(f" - {imp}")
# Aktuelle Versionen abrufen
print("\n🔍 Prüfe installierte Versionen...")
versions = get_current_versions()
# Requirements generieren
print("\n⚙️ Generiere Requirements...")
requirements = generate_requirements(imports, versions)
# Requirements-Datei schreiben
output_file = project_root / 'requirements.txt'
write_requirements_file(requirements, output_file)
print(f"\n✅ Requirements aktualisiert: {output_file}")
print(f"📊 {len(requirements)} Pakete in requirements.txt")
# Zusammenfassung
print("\n📋 Generierte Requirements:")
for req in requirements:
print(f" - {req}")
print("\n🎉 Requirements-Update abgeschlossen!")
print("\nNächste Schritte:")
print("1. pip install -r requirements.txt")
print("2. Anwendung testen")
print("3. requirements-dev.txt und requirements-prod.txt bei Bedarf anpassen")
if __name__ == "__main__":
main()