🎉 Feature: Implement MASSIVE KONSOLIDIERUNG PLAN in backend utils
This commit is contained in:
509
backend/utils/hardware_integration.py
Normal file
509
backend/utils/hardware_integration.py
Normal file
@@ -0,0 +1,509 @@
|
||||
#!/usr/bin/env python3.11
|
||||
"""
|
||||
Hardware Integration - Konsolidierte Hardware-Steuerung
|
||||
====================================================
|
||||
|
||||
Konsolidiert alle Hardware-Integration-Funktionalitäten:
|
||||
- TP-Link Tapo Controller (tapo_controller.py)
|
||||
- Printer Monitor (printer_monitor.py)
|
||||
|
||||
Migration: 2 Dateien → 1 Datei
|
||||
Autor: MYP Team - Konsolidierung für IHK-Projektarbeit
|
||||
Datum: 2025-06-09
|
||||
"""
|
||||
|
||||
import threading
|
||||
import time
|
||||
import socket
|
||||
import subprocess
|
||||
import platform
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict, List, Any, Optional, Tuple
|
||||
from utils.logging_config import get_logger
|
||||
|
||||
# Logger
|
||||
hardware_logger = get_logger("hardware_integration")
|
||||
|
||||
# ===== TAPO SMART PLUG CONTROLLER =====
|
||||
|
||||
class TapoController:
|
||||
"""TP-Link Tapo Smart Plug Controller"""
|
||||
|
||||
def __init__(self):
|
||||
self.default_username = "till.tomczak@mercedes-benz.com"
|
||||
self.default_password = "744563017196A"
|
||||
hardware_logger.info("🔌 Tapo Controller initialisiert")
|
||||
|
||||
def toggle_plug(self, ip: str, state: bool, username: str = None, password: str = None) -> bool:
|
||||
"""Schaltet eine Tapo-Steckdose ein oder aus"""
|
||||
try:
|
||||
from PyP100 import PyP110
|
||||
|
||||
username = username or self.default_username
|
||||
password = password or self.default_password
|
||||
|
||||
p110 = PyP110.P110(ip, username, password)
|
||||
p110.handshake()
|
||||
p110.login()
|
||||
|
||||
if state:
|
||||
p110.turnOn()
|
||||
hardware_logger.info(f"✅ Tapo {ip} eingeschaltet")
|
||||
else:
|
||||
p110.turnOff()
|
||||
hardware_logger.info(f"❌ Tapo {ip} ausgeschaltet")
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
hardware_logger.error(f"Tapo-Steuerung fehlgeschlagen {ip}: {str(e)}")
|
||||
return False
|
||||
|
||||
def check_outlet_status(self, ip: str, username: str = None, password: str = None,
|
||||
printer_id: int = None) -> Tuple[bool, str]:
|
||||
"""Prüft den Status einer Tapo-Steckdose"""
|
||||
try:
|
||||
from PyP100 import PyP110
|
||||
|
||||
username = username or self.default_username
|
||||
password = password or self.default_password
|
||||
|
||||
p110 = PyP110.P110(ip, username, password)
|
||||
p110.handshake()
|
||||
p110.login()
|
||||
|
||||
device_info = p110.getDeviceInfo()
|
||||
is_on = device_info.get('device_on', False)
|
||||
status = "Ein" if is_on else "Aus"
|
||||
|
||||
return is_on, status
|
||||
|
||||
except Exception as e:
|
||||
hardware_logger.error(f"Tapo-Status-Prüfung fehlgeschlagen {ip}: {str(e)}")
|
||||
return False, "Fehler"
|
||||
|
||||
# ===== PRINTER MONITOR =====
|
||||
|
||||
class PrinterMonitor:
|
||||
"""3D-Drucker Monitoring System"""
|
||||
|
||||
def __init__(self):
|
||||
self.tapo_controller = TapoController()
|
||||
self.status_cache = {}
|
||||
self.cache_timeout = timedelta(minutes=2)
|
||||
self.last_cache_update = None
|
||||
hardware_logger.info("🖨️ Printer Monitor initialisiert")
|
||||
|
||||
def get_live_printer_status(self, use_session_cache: bool = True) -> Dict[int, Dict]:
|
||||
"""Holt Live-Status aller Drucker"""
|
||||
try:
|
||||
# Cache-Prüfung
|
||||
if (use_session_cache and self.last_cache_update and
|
||||
datetime.now() - self.last_cache_update < self.cache_timeout):
|
||||
return self.status_cache
|
||||
|
||||
# Fresh status fetch
|
||||
status = self._fetch_live_printer_status()
|
||||
|
||||
# Cache aktualisieren
|
||||
self.status_cache = status
|
||||
self.last_cache_update = datetime.now()
|
||||
|
||||
return status
|
||||
|
||||
except Exception as e:
|
||||
hardware_logger.error(f"Live-Status-Abfrage fehlgeschlagen: {str(e)}")
|
||||
return {}
|
||||
|
||||
def _fetch_live_printer_status(self) -> Dict[int, Dict]:
|
||||
"""Holt tatsächlichen Live-Status"""
|
||||
try:
|
||||
from models import get_cached_session, Printer
|
||||
|
||||
status_dict = {}
|
||||
|
||||
with get_cached_session() as session:
|
||||
printers = session.query(Printer).filter(Printer.active == True).all()
|
||||
|
||||
for printer in printers:
|
||||
status = self._check_single_printer_status(printer)
|
||||
status_dict[printer.id] = status
|
||||
|
||||
hardware_logger.debug(f"Status für {len(status_dict)} Drucker abgerufen")
|
||||
return status_dict
|
||||
|
||||
except Exception as e:
|
||||
hardware_logger.error(f"Drucker-Status-Abfrage fehlgeschlagen: {str(e)}")
|
||||
return {}
|
||||
|
||||
def _check_single_printer_status(self, printer) -> Dict:
|
||||
"""Prüft den Status eines einzelnen Druckers"""
|
||||
status = {
|
||||
'printer_id': printer.id,
|
||||
'name': printer.name,
|
||||
'ip_address': printer.ip_address,
|
||||
'plug_ip': printer.plug_ip,
|
||||
'ping_status': 'unknown',
|
||||
'outlet_status': 'unknown',
|
||||
'outlet_on': False,
|
||||
'last_checked': datetime.now(),
|
||||
'error_message': None
|
||||
}
|
||||
|
||||
try:
|
||||
# Ping-Test
|
||||
if printer.ip_address:
|
||||
ping_ok = self._ping_address(printer.ip_address)
|
||||
status['ping_status'] = 'online' if ping_ok else 'offline'
|
||||
|
||||
# Outlet-Status prüfen
|
||||
if printer.plug_ip:
|
||||
outlet_on, outlet_status = self.tapo_controller.check_outlet_status(
|
||||
printer.plug_ip, printer.plug_username, printer.plug_password, printer.id
|
||||
)
|
||||
status['outlet_on'] = outlet_on
|
||||
status['outlet_status'] = outlet_status
|
||||
|
||||
# Gesamtstatus bestimmen
|
||||
if status['ping_status'] == 'online' and status['outlet_on']:
|
||||
status['overall_status'] = 'online'
|
||||
elif status['outlet_on']:
|
||||
status['overall_status'] = 'booting'
|
||||
else:
|
||||
status['overall_status'] = 'offline'
|
||||
|
||||
except Exception as e:
|
||||
status['error_message'] = str(e)
|
||||
status['overall_status'] = 'error'
|
||||
hardware_logger.error(f"Drucker-Status-Prüfung fehlgeschlagen {printer.name}: {str(e)}")
|
||||
|
||||
return status
|
||||
|
||||
def _ping_address(self, ip_address: str, timeout: int = 3) -> bool:
|
||||
"""Pingt eine IP-Adresse an"""
|
||||
try:
|
||||
if platform.system().lower() == "windows":
|
||||
result = subprocess.run(
|
||||
['ping', '-n', '1', '-w', str(timeout * 1000), ip_address],
|
||||
capture_output=True, text=True, timeout=timeout + 2,
|
||||
encoding='utf-8', errors='replace'
|
||||
)
|
||||
else:
|
||||
result = subprocess.run(
|
||||
['ping', '-c', '1', '-W', str(timeout), ip_address],
|
||||
capture_output=True, text=True, timeout=timeout + 2,
|
||||
encoding='utf-8', errors='replace'
|
||||
)
|
||||
|
||||
return result.returncode == 0
|
||||
|
||||
except (subprocess.TimeoutExpired, FileNotFoundError):
|
||||
return False
|
||||
except Exception as e:
|
||||
hardware_logger.debug(f"Ping fehlgeschlagen {ip_address}: {str(e)}")
|
||||
return False
|
||||
|
||||
def initialize_all_outlets_on_startup(self) -> Dict[str, bool]:
|
||||
"""Initialisiert alle Outlets beim Systemstart"""
|
||||
try:
|
||||
from models import get_cached_session, Printer
|
||||
|
||||
results = {}
|
||||
|
||||
with get_cached_session() as session:
|
||||
printers = session.query(Printer).filter(
|
||||
Printer.active == True,
|
||||
Printer.plug_ip.isnot(None)
|
||||
).all()
|
||||
|
||||
hardware_logger.info(f"Initialisiere {len(printers)} Drucker-Outlets...")
|
||||
|
||||
for printer in printers:
|
||||
try:
|
||||
# Outlet ausschalten als Standardzustand
|
||||
success = self.tapo_controller.toggle_plug(
|
||||
printer.plug_ip, False,
|
||||
printer.plug_username, printer.plug_password
|
||||
)
|
||||
results[printer.name] = success
|
||||
|
||||
if success:
|
||||
hardware_logger.info(f"✅ {printer.name} Outlet initialisiert (AUS)")
|
||||
else:
|
||||
hardware_logger.warning(f"⚠️ {printer.name} Outlet-Initialisierung fehlgeschlagen")
|
||||
|
||||
except Exception as e:
|
||||
results[printer.name] = False
|
||||
hardware_logger.error(f"❌ {printer.name} Outlet-Fehler: {str(e)}")
|
||||
|
||||
success_count = sum(1 for success in results.values() if success)
|
||||
hardware_logger.info(f"Outlet-Initialisierung: {success_count}/{len(results)} erfolgreich")
|
||||
|
||||
return results
|
||||
|
||||
except Exception as e:
|
||||
hardware_logger.error(f"Outlet-Initialisierung fehlgeschlagen: {str(e)}")
|
||||
return {}
|
||||
|
||||
def get_printer_summary(self) -> Dict[str, int]:
|
||||
"""Gibt eine Zusammenfassung des Drucker-Status zurück"""
|
||||
try:
|
||||
status_dict = self.get_live_printer_status()
|
||||
|
||||
summary = {
|
||||
'total': len(status_dict),
|
||||
'online': 0,
|
||||
'offline': 0,
|
||||
'booting': 0,
|
||||
'error': 0,
|
||||
'outlets_on': 0,
|
||||
'outlets_off': 0
|
||||
}
|
||||
|
||||
for printer_status in status_dict.values():
|
||||
overall_status = printer_status.get('overall_status', 'unknown')
|
||||
|
||||
if overall_status in summary:
|
||||
summary[overall_status] += 1
|
||||
|
||||
if printer_status.get('outlet_on'):
|
||||
summary['outlets_on'] += 1
|
||||
else:
|
||||
summary['outlets_off'] += 1
|
||||
|
||||
return summary
|
||||
|
||||
except Exception as e:
|
||||
hardware_logger.error(f"Drucker-Zusammenfassung fehlgeschlagen: {str(e)}")
|
||||
return {'total': 0, 'online': 0, 'offline': 0, 'error': 1}
|
||||
|
||||
def auto_discover_tapo_outlets(self) -> Dict[str, bool]:
|
||||
"""Automatische Erkennung von Tapo-Outlets im Netzwerk"""
|
||||
try:
|
||||
discovered = {}
|
||||
|
||||
# Standard-IP-Bereiche scannen
|
||||
base_ips = ["192.168.0.", "192.168.1.", "192.168.100."]
|
||||
|
||||
for base_ip in base_ips:
|
||||
for i in range(100, 110): # Scan 100-109
|
||||
ip = f"{base_ip}{i}"
|
||||
|
||||
try:
|
||||
# Schneller TCP-Port-Test
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.settimeout(1)
|
||||
result = sock.connect_ex((ip, 80))
|
||||
sock.close()
|
||||
|
||||
if result == 0:
|
||||
# Versuche Tapo-Verbindung
|
||||
is_on, status = self.tapo_controller.check_outlet_status(ip)
|
||||
if status != "Fehler":
|
||||
discovered[ip] = True
|
||||
hardware_logger.info(f"🔍 Tapo-Outlet entdeckt: {ip}")
|
||||
|
||||
except Exception:
|
||||
pass # Ignoriere Fehler beim Scannen
|
||||
|
||||
hardware_logger.info(f"Auto-Discovery: {len(discovered)} Tapo-Outlets gefunden")
|
||||
return discovered
|
||||
|
||||
except Exception as e:
|
||||
hardware_logger.error(f"Auto-Discovery fehlgeschlagen: {str(e)}")
|
||||
return {}
|
||||
|
||||
def clear_all_caches(self):
|
||||
"""Löscht alle Caches"""
|
||||
self.status_cache.clear()
|
||||
self.last_cache_update = None
|
||||
hardware_logger.info("Alle Hardware-Caches geleert")
|
||||
|
||||
# ===== HARDWARE INTEGRATION MANAGER =====
|
||||
|
||||
class HardwareIntegrationManager:
|
||||
"""Zentraler Manager für alle Hardware-Integrationen"""
|
||||
|
||||
def __init__(self):
|
||||
self.tapo_controller = TapoController()
|
||||
self.printer_monitor = PrinterMonitor()
|
||||
self.monitoring_active = False
|
||||
self.monitoring_thread = None
|
||||
hardware_logger.info("🔧 Hardware Integration Manager initialisiert")
|
||||
|
||||
def start_monitoring(self):
|
||||
"""Startet Hardware-Monitoring"""
|
||||
if self.monitoring_active:
|
||||
return
|
||||
|
||||
self.monitoring_active = True
|
||||
self.monitoring_thread = threading.Thread(target=self._monitoring_loop, daemon=True)
|
||||
self.monitoring_thread.start()
|
||||
hardware_logger.info("Hardware-Monitoring gestartet")
|
||||
|
||||
def stop_monitoring(self):
|
||||
"""Stoppt Hardware-Monitoring"""
|
||||
self.monitoring_active = False
|
||||
if self.monitoring_thread:
|
||||
self.monitoring_thread.join(timeout=5)
|
||||
hardware_logger.info("Hardware-Monitoring gestoppt")
|
||||
|
||||
def _monitoring_loop(self):
|
||||
"""Hauptschleife für Hardware-Monitoring"""
|
||||
while self.monitoring_active:
|
||||
try:
|
||||
# Drucker-Status aktualisieren
|
||||
self.printer_monitor.get_live_printer_status(use_session_cache=False)
|
||||
|
||||
# 60 Sekunden warten
|
||||
for _ in range(60):
|
||||
if not self.monitoring_active:
|
||||
break
|
||||
time.sleep(1)
|
||||
|
||||
except Exception as e:
|
||||
hardware_logger.error(f"Hardware-Monitoring-Fehler: {str(e)}")
|
||||
time.sleep(30) # Bei Fehlern länger warten
|
||||
|
||||
def get_hardware_status(self) -> Dict[str, Any]:
|
||||
"""Gibt umfassenden Hardware-Status zurück"""
|
||||
try:
|
||||
printer_summary = self.printer_monitor.get_printer_summary()
|
||||
|
||||
return {
|
||||
'timestamp': datetime.now().isoformat(),
|
||||
'monitoring_active': self.monitoring_active,
|
||||
'printer_summary': printer_summary,
|
||||
'tapo_controller_active': True,
|
||||
'cache_status': {
|
||||
'last_update': self.printer_monitor.last_cache_update.isoformat()
|
||||
if self.printer_monitor.last_cache_update else None,
|
||||
'cached_printers': len(self.printer_monitor.status_cache)
|
||||
}
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
hardware_logger.error(f"Hardware-Status-Abfrage fehlgeschlagen: {str(e)}")
|
||||
return {'error': str(e), 'timestamp': datetime.now().isoformat()}
|
||||
|
||||
def initialize_system(self) -> Dict[str, Any]:
|
||||
"""Initialisiert das gesamte Hardware-System"""
|
||||
results = {
|
||||
'outlet_initialization': {},
|
||||
'auto_discovery': {},
|
||||
'monitoring_started': False
|
||||
}
|
||||
|
||||
try:
|
||||
# Outlets initialisieren
|
||||
results['outlet_initialization'] = self.printer_monitor.initialize_all_outlets_on_startup()
|
||||
|
||||
# Auto-Discovery
|
||||
results['auto_discovery'] = self.printer_monitor.auto_discover_tapo_outlets()
|
||||
|
||||
# Monitoring starten
|
||||
self.start_monitoring()
|
||||
results['monitoring_started'] = True
|
||||
|
||||
hardware_logger.info("Hardware-System erfolgreich initialisiert")
|
||||
|
||||
except Exception as e:
|
||||
hardware_logger.error(f"Hardware-System-Initialisierung fehlgeschlagen: {str(e)}")
|
||||
results['error'] = str(e)
|
||||
|
||||
return results
|
||||
|
||||
# ===== GLOBALE INSTANZ =====
|
||||
|
||||
# Singleton für Hardware Integration Manager
|
||||
_hardware_manager = None
|
||||
|
||||
def get_hardware_manager() -> HardwareIntegrationManager:
|
||||
"""Gibt die globale Hardware-Manager-Instanz zurück"""
|
||||
global _hardware_manager
|
||||
if _hardware_manager is None:
|
||||
_hardware_manager = HardwareIntegrationManager()
|
||||
return _hardware_manager
|
||||
|
||||
# ===== CONVENIENCE FUNCTIONS =====
|
||||
|
||||
def toggle_plug(ip: str, state: bool, username: str = None, password: str = None) -> bool:
|
||||
"""Convenience-Funktion für Tapo-Steuerung"""
|
||||
manager = get_hardware_manager()
|
||||
return manager.tapo_controller.toggle_plug(ip, state, username, password)
|
||||
|
||||
def get_printer_status(printer_id: int = None) -> Dict:
|
||||
"""Convenience-Funktion für Drucker-Status"""
|
||||
manager = get_hardware_manager()
|
||||
all_status = manager.printer_monitor.get_live_printer_status()
|
||||
|
||||
if printer_id:
|
||||
return all_status.get(printer_id, {})
|
||||
return all_status
|
||||
|
||||
def get_hardware_summary() -> Dict[str, Any]:
|
||||
"""Convenience-Funktion für Hardware-Zusammenfassung"""
|
||||
manager = get_hardware_manager()
|
||||
return manager.get_hardware_status()
|
||||
|
||||
def initialize_hardware_system() -> Dict[str, Any]:
|
||||
"""Convenience-Funktion für Hardware-Initialisierung"""
|
||||
manager = get_hardware_manager()
|
||||
return manager.initialize_system()
|
||||
|
||||
# Legacy-Kompatibilität
|
||||
def check_outlet_status(ip: str, username: str = None, password: str = None,
|
||||
printer_id: int = None) -> Tuple[bool, str]:
|
||||
"""Legacy-Kompatibilität für Outlet-Status"""
|
||||
manager = get_hardware_manager()
|
||||
return manager.tapo_controller.check_outlet_status(ip, username, password, printer_id)
|
||||
|
||||
def auto_discover_tapo_outlets() -> Dict[str, bool]:
|
||||
"""Legacy-Kompatibilität für Auto-Discovery"""
|
||||
manager = get_hardware_manager()
|
||||
return manager.printer_monitor.auto_discover_tapo_outlets()
|
||||
|
||||
def initialize_all_outlets() -> Dict[str, bool]:
|
||||
"""Legacy-Kompatibilität für Outlet-Initialisierung"""
|
||||
manager = get_hardware_manager()
|
||||
return manager.printer_monitor.initialize_all_outlets_on_startup()
|
||||
|
||||
# CLI Interface
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
|
||||
if len(sys.argv) > 1:
|
||||
command = sys.argv[1]
|
||||
manager = get_hardware_manager()
|
||||
|
||||
if command == "status":
|
||||
status = manager.get_hardware_status()
|
||||
print("=== Hardware Status ===")
|
||||
for key, value in status.items():
|
||||
print(f"{key}: {value}")
|
||||
|
||||
elif command == "discover":
|
||||
print("Starte Auto-Discovery...")
|
||||
discovered = manager.printer_monitor.auto_discover_tapo_outlets()
|
||||
print(f"Gefundene Outlets: {discovered}")
|
||||
|
||||
elif command == "init":
|
||||
print("Initialisiere Hardware-System...")
|
||||
results = manager.initialize_system()
|
||||
print(f"Initialisierung: {results}")
|
||||
|
||||
elif command == "monitor":
|
||||
print("Starte Hardware-Monitoring...")
|
||||
manager.start_monitoring()
|
||||
try:
|
||||
while True:
|
||||
time.sleep(10)
|
||||
status = manager.get_hardware_status()
|
||||
print(f"Status: {status['printer_summary']}")
|
||||
except KeyboardInterrupt:
|
||||
print("Stoppe Monitoring...")
|
||||
manager.stop_monitoring()
|
||||
else:
|
||||
print("Available commands: status, discover, init, monitor")
|
||||
else:
|
||||
print("Hardware Integration Manager - Available commands: status, discover, init, monitor")
|
Reference in New Issue
Block a user