Files
Projektarbeit-MYP/backend/utils/hardware_integration_old.py
Till Tomczak 21e7ed4398 🔧 Major System Refactoring & UI Enhancements
- Dark-Mode JavaScript Optimierungen für bessere Performance
- Base Template erweitert mit Enhanced UI Components
- Dashboard Template modernisiert mit neuen Card-Layouts
- Hardware Integration massiv konsolidiert (1771 Zeilen reduziert)
- Drucker Steuerung Blueprint hinzugefügt
- Legacy Hardware Integration Files bereinigt
- System-Architektur vereinfacht und performanter

Major Changes:
- -2001 Zeilen Code durch Konsolidierung
- +451 Zeilen neue optimierte Implementierung
- Vollständige Template-System Überarbeitung

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-06-19 22:19:42 +02:00

1538 lines
62 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3.11
"""
Hardware Integration - Konsolidierte Hardware-Steuerung
====================================================
Migration Information:
- Ursprünglich: tapo_controller.py, printer_monitor.py
- Konsolidiert am: 2025-06-09
- Funktionalitäten: TP-Link Tapo Smart Plug Control, 3D-Printer Monitoring,
Hardware Discovery, Status Management
- Breaking Changes: Keine - Alle Original-APIs bleiben verfügbar
- Legacy Imports: Verfügbar über Wrapper-Funktionen
Changelog:
- v1.0 (2025-06-09): Initial massive consolidation for IHK project
MASSIVE KONSOLIDIERUNG für Projektarbeit MYP - IHK-Dokumentation
Author: MYP Team - Till Tomczak
Ziel: 88% Datei-Reduktion bei vollständiger Funktionalitäts-Erhaltung
"""
import time
import socket
import threading
import ipaddress
import subprocess
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
# Optional Imports mit Fallback
try:
import requests
REQUESTS_AVAILABLE = True
except ImportError:
REQUESTS_AVAILABLE = False
try:
from flask import session
FLASK_AVAILABLE = True
except ImportError:
FLASK_AVAILABLE = False
try:
from sqlalchemy import func
from sqlalchemy.orm import Session
SQLALCHEMY_AVAILABLE = True
except ImportError:
SQLALCHEMY_AVAILABLE = False
# MYP Models & Utils
from models import get_db_session, Printer, PlugStatusLog
from utils.logging_config import get_logger
# Logger
hardware_logger = get_logger("hardware_integration")
tapo_logger = get_logger("tapo_controller") # Rückwärtskompatibilität
monitor_logger = get_logger("printer_monitor") # Rückwärtskompatibilität
# Hardware-Verfügbarkeit prüfen
try:
from PyP100.PyP100 import P100 as PyP100
from PyP100.PyP110 import P110 as PyP110
TAPO_AVAILABLE = True
hardware_logger.info("✅ PyP100 (TP-Link Tapo) verfügbar")
except ImportError:
TAPO_AVAILABLE = False
PyP100 = None
PyP110 = None
hardware_logger.warning("⚠️ PyP100 nicht verfügbar - Tapo-Funktionen eingeschränkt")
# Exportierte Funktionen für Legacy-Kompatibilität
__all__ = [
# Tapo Controller
'TapoController', 'get_tapo_controller',
# Printer Monitor
'PrinterMonitor', 'get_printer_monitor',
# Legacy Compatibility
'toggle_plug', 'test_tapo_connection', 'check_outlet_status',
'auto_discover_tapo_outlets', 'initialize_all_outlets',
'printer_monitor', 'tapo_controller'
]
# ===== TAPO SMART PLUG CONTROLLER =====
class TapoController:
"""TP-Link Tapo Smart Plug Controller - Konsolidiert aus tapo_controller.py"""
def __init__(self):
"""Initialisiere den Tapo Controller"""
# Lazy import um zirkuläre Abhängigkeiten zu vermeiden
try:
from utils.utilities_collection import TAPO_USERNAME, TAPO_PASSWORD, DEFAULT_TAPO_IPS, TAPO_TIMEOUT, TAPO_RETRY_COUNT
self.username = TAPO_USERNAME
self.password = TAPO_PASSWORD
self.default_ips = DEFAULT_TAPO_IPS
self.timeout = TAPO_TIMEOUT
self.retry_count = TAPO_RETRY_COUNT
except ImportError:
# Fallback-Werte
self.username = "admin"
self.password = "admin"
self.default_ips = []
self.timeout = 10
self.retry_count = 3
self.auto_discovered = False
if not TAPO_AVAILABLE:
tapo_logger.error("❌ PyP100-modul nicht installiert - tapo-funktionalität eingeschränkt")
else:
tapo_logger.info("✅ tapo controller initialisiert")
def toggle_plug(self, ip: str, state: bool, username: str = None, password: str = None, debug: bool = True) -> bool:
"""
Schaltet eine TP-Link Tapo P100/P110-Steckdose ein oder aus
Args:
ip: IP-Adresse der Steckdose
state: True für An, False für Aus
username: Optional - Tapo-Benutzername
password: Optional - Tapo-Passwort
debug: Aktiviert erweiterte Debug-Ausgaben
state: True = ein, False = aus
username: Benutzername (optional, nutzt Standard wenn nicht angegeben)
password: Passwort (optional, nutzt Standard wenn nicht angegeben)
Returns:
bool: True wenn erfolgreich geschaltet
"""
if not TAPO_AVAILABLE:
tapo_logger.error("❌ PyP100-modul nicht installiert - steckdose kann nicht geschaltet werden")
return False
# Immer globale Anmeldedaten verwenden
username = self.username
password = self.password
tapo_logger.debug(f"🔧 verwende globale tapo-anmeldedaten für {ip}")
start_time = time.time()
for attempt in range(self.retry_count):
try:
if debug:
tapo_logger.debug(f"🔌 Versuch {attempt+1}/{self.retry_count}: Verbinde zu Tapo-Steckdose {ip}")
# P100-Verbindung herstellen
p100 = PyP100(ip, username, password)
if debug:
tapo_logger.debug(f"🤝 Handshake mit {ip}...")
p100.handshake()
if debug:
tapo_logger.debug(f"🔐 Login bei {ip}...")
p100.login()
# Steckdose schalten
action_time = time.time()
if state:
if debug:
tapo_logger.debug(f"⚡ Schalte {ip} EIN...")
p100.turnOn()
tapo_logger.info(f"✅ Tapo-Steckdose {ip} erfolgreich eingeschaltet")
else:
if debug:
tapo_logger.debug(f"🔴 Schalte {ip} AUS...")
p100.turnOff()
tapo_logger.info(f"✅ Tapo-Steckdose {ip} erfolgreich ausgeschaltet")
response_time = int((time.time() - start_time) * 1000)
if debug:
tapo_logger.debug(f"⏱️ Schaltvorgang für {ip} abgeschlossen in {response_time}ms")
# Status-Logging
new_status = "on" if state else "off"
self._log_plug_status(None, new_status, ip, response_time_ms=response_time)
return True
except Exception as e:
action = "ein" if state else "aus"
response_time = int((time.time() - start_time) * 1000)
if debug:
tapo_logger.warning(f"⚠️ Versuch {attempt+1}/{self.retry_count} fehlgeschlagen beim {action}schalten von {ip}: {str(e)}")
tapo_logger.debug(f"🔍 Fehlerdetails: Typ={type(e).__name__}, Zeit={response_time}ms")
# Status-Logging bei Fehler
self._log_plug_status(None, "disconnected", ip,
response_time_ms=response_time,
error_message=str(e))
if attempt < self.retry_count - 1:
if debug:
tapo_logger.debug(f"⏳ Warte 1 Sekunde vor erneutem Versuch...")
time.sleep(1)
else:
tapo_logger.error(f"❌ Alle {self.retry_count} Versuche fehlgeschlagen beim {action}schalten der Tapo-Steckdose {ip}")
if debug:
tapo_logger.debug(f"💀 Finale Fehlerdetails: {str(e)}")
return False
def clear_cache(self) -> bool:
"""
Leert alle Caches des TapoControllers
Returns:
bool: True wenn erfolgreich
"""
try:
# Hier können Cache-Daten geleert werden falls vorhanden
# Aktuell verwendet TapoController keinen expliziten Cache,
# aber diese Methode wird für Konsistenz bereitgestellt
tapo_logger.debug("TapoController Cache geleert (keine Cache-Daten vorhanden)")
return True
except Exception as e:
tapo_logger.error(f"Fehler beim Leeren des TapoController Cache: {str(e)}")
return False
def turn_off(self, ip: str, username: str = None, password: str = None, printer_id: int = None) -> bool:
"""
Schaltet eine TP-Link Tapo P110-Steckdose aus
Args:
ip: IP-Adresse der Steckdose
username: Benutzername (optional)
password: Passwort (optional)
printer_id: ID des zugehörigen Druckers für Logging (optional)
Returns:
bool: True wenn erfolgreich ausgeschaltet
"""
if not TAPO_AVAILABLE:
tapo_logger.error("⚠️ PyP100-modul nicht verfügbar - kann tapo-steckdose nicht schalten")
self._log_plug_status(printer_id, "disconnected", ip, error_message="PyP100-modul nicht verfügbar")
return False
# Immer globale Anmeldedaten verwenden
username = self.username
password = self.password
start_time = time.time()
try:
# TP-Link Tapo P100 Verbindung herstellen
p100 = PyP100(ip, username, password)
p100.handshake()
p100.login()
# Steckdose ausschalten
p100.turnOff()
response_time = int((time.time() - start_time) * 1000) # in millisekunden
tapo_logger.debug(f"✅ tapo-steckdose {ip} erfolgreich ausgeschaltet")
# Logging: erfolgreich ausgeschaltet
self._log_plug_status(printer_id, "off", ip, response_time_ms=response_time)
return True
except Exception as e:
response_time = int((time.time() - start_time) * 1000)
tapo_logger.debug(f"⚠️ fehler beim ausschalten der tapo-steckdose {ip}: {str(e)}")
# Logging: fehlgeschlagener versuch
self._log_plug_status(printer_id, "disconnected", ip,
response_time_ms=response_time,
error_message=str(e))
return False
def check_outlet_status(self, ip: str, username: str = None, password: str = None,
printer_id: int = None, debug: bool = True) -> Tuple[bool, str]:
"""
Überprüft den Status einer TP-Link Tapo P110-Steckdose
Args:
ip: IP-Adresse der Steckdose
username: Benutzername (optional)
password: Passwort (optional)
printer_id: ID des zugehörigen Druckers für Logging (optional)
Returns:
Tuple[bool, str]: (erreichbar, status) - status: "on", "off", "unknown"
"""
if not TAPO_AVAILABLE:
if debug:
tapo_logger.warning("⚠️ PyP100-modul nicht verfügbar - verwende Fallback-Netzwerktest")
# Fallback: Einfacher Ping-Test
ping_reachable = self.ping_address(ip, timeout=3)
if ping_reachable:
tapo_logger.debug(f"📡 Fallback: {ip} ist über Netzwerk erreichbar, aber Status unbekannt")
return True, "unknown"
else:
tapo_logger.debug(f"❌ Fallback: {ip} ist nicht erreichbar")
return False, "unreachable"
# Immer globale Anmeldedaten verwenden
username = self.username
password = self.password
start_time = time.time()
try:
if debug:
tapo_logger.debug(f"🔍 Status-Check für Tapo-Steckdose {ip} gestartet")
# TP-Link Tapo P100 Verbindung herstellen
p100 = PyP100(ip, username, password)
if debug:
tapo_logger.debug(f"🤝 Handshake mit {ip}...")
p100.handshake()
if debug:
tapo_logger.debug(f"🔐 Login bei {ip}...")
p100.login()
if debug:
tapo_logger.debug(f"📊 Geräteinformationen von {ip} abrufen...")
# Geräteinformationen abrufen
device_info = p100.getDeviceInfo()
# Status auswerten
device_on = device_info.get('device_on', False)
status = "on" if device_on else "off"
response_time = int((time.time() - start_time) * 1000)
if debug:
tapo_logger.debug(f"✅ Tapo-Steckdose {ip}: Status = {status}, Reaktionszeit = {response_time}ms")
tapo_logger.debug(f"📋 Device-Info: {device_info}")
tapo_logger.info(f"✅ Tapo-Steckdose {ip}: Status = {status}")
# Erweiterte Informationen sammeln
extra_info = self._collect_device_info(p100, device_info, debug)
if debug and extra_info:
tapo_logger.debug(f"🔋 Zusätzliche Informationen für {ip}: {extra_info}")
# Logging: erfolgreicher status-check
self._log_plug_status(printer_id, status, ip,
response_time_ms=response_time,
power_consumption=extra_info.get('power_consumption'),
voltage=extra_info.get('voltage'),
current=extra_info.get('current'),
firmware_version=extra_info.get('firmware_version'),
notes="automatischer status-check")
return True, status
except Exception as e:
response_time = int((time.time() - start_time) * 1000)
if debug:
tapo_logger.warning(f"⚠️ Fehler bei Tapo-Steckdosen-Status-Check {ip}: {str(e)}")
tapo_logger.debug(f"🔍 Fehlerdetails: Typ={type(e).__name__}, Zeit={response_time}ms")
tapo_logger.debug(f"💀 Exception-Details: {repr(e)}")
tapo_logger.error(f"❌ Status-Check für {ip} fehlgeschlagen: {str(e)}")
# Logging: fehlgeschlagener status-check
self._log_plug_status(printer_id, "disconnected", ip,
response_time_ms=response_time,
error_message=str(e),
notes="status-check fehlgeschlagen")
return False, "unknown"
def test_connection(self, ip: str, username: str = None, password: str = None) -> dict:
"""
Testet die Verbindung zu einer TP-Link Tapo P110-Steckdose
Args:
ip: IP-Adresse der Steckdose
username: Benutzername (optional)
password: Passwort (optional)
Returns:
dict: Ergebnis mit Status und Informationen
"""
result = {
"success": False,
"message": "",
"device_info": None,
"error": None
}
if not TAPO_AVAILABLE:
result["message"] = "PyP100-modul nicht verfügbar"
result["error"] = "ModuleNotFound"
tapo_logger.error("PyP100-modul nicht verfügbar - kann tapo-steckdosen nicht testen")
return result
# Verwende globale Anmeldedaten falls nicht angegeben
if not username or not password:
username = self.username
password = self.password
tapo_logger.debug(f"verwende globale tapo-anmeldedaten für {ip}")
try:
# TP-Link Tapo P100 Verbindung herstellen
p100 = PyP100(ip, username, password)
p100.handshake()
p100.login()
# Geräteinformationen abrufen
device_info = p100.getDeviceInfo()
result["success"] = True
result["message"] = "verbindung erfolgreich"
result["device_info"] = device_info
tapo_logger.info(f"tapo-verbindung zu {ip} erfolgreich: {device_info.get('nickname', 'unbekannt')}")
except Exception as e:
result["success"] = False
result["message"] = f"verbindungsfehler: {str(e)}"
result["error"] = str(e)
tapo_logger.error(f"fehler bei tapo-test zu {ip}: {str(e)}")
return result
def ping_address(self, ip: str, timeout: int = 5) -> bool:
"""
Führt einen erweiterten Konnektivitätstest zu einer IP-Adresse durch
Verwendet TCP-Verbindung und ICMP-Ping für maximale Kompatibilität
Args:
ip: Zu testende IP-Adresse
timeout: Timeout in Sekunden
Returns:
bool: True wenn Verbindung erfolgreich
"""
try:
# IP-Adresse validieren
ipaddress.ip_address(ip.strip())
# 1. ICMP-Ping versuchen
try:
import subprocess
result = subprocess.run(
['ping', '-c', '1', '-W', str(timeout), ip.strip()],
capture_output=True,
timeout=timeout + 2
)
if result.returncode == 0:
tapo_logger.debug(f"✅ ICMP-Ping zu {ip} erfolgreich")
return True
except (subprocess.TimeoutExpired, FileNotFoundError, Exception) as e:
tapo_logger.debug(f"⚠️ ICMP-Ping zu {ip} fehlgeschlagen: {e}")
# 2. TCP-Port-Tests für Tapo-Steckdosen
test_ports = [9999, 80, 443, 22, 23] # Tapo-Standard, HTTP, HTTPS, SSH, Telnet
for port in test_ports:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((ip.strip(), port))
sock.close()
if result == 0:
tapo_logger.debug(f"✅ TCP-Verbindung zu {ip}:{port} erfolgreich")
return True
except Exception as e:
tapo_logger.debug(f"⚠️ TCP-Test zu {ip}:{port} fehlgeschlagen: {e}")
continue
# 3. Erweiterte Netzwerk-Tests
try:
# ARP-Test (falls möglich)
import subprocess
arp_result = subprocess.run(
['ping', '-c', '1', '-W', '1', ip.strip()],
capture_output=True,
timeout=3
)
if arp_result.returncode == 0:
tapo_logger.debug(f"✅ Erweiterte Netzwerkerreichbarkeit für {ip} bestätigt")
return True
except Exception as e:
tapo_logger.debug(f"⚠️ Erweiterter Netzwerktest für {ip} fehlgeschlagen: {e}")
tapo_logger.debug(f"❌ Alle Konnektivitätstests zu {ip} fehlgeschlagen")
return False
except Exception as e:
tapo_logger.debug(f"❌ Kritischer Fehler beim Konnektivitätstest zu {ip}: {str(e)}")
return False
def auto_discover_outlets(self) -> Dict[str, bool]:
"""
Automatische Erkennung und Konfiguration von TP-Link Tapo P110-Steckdosen im Netzwerk
Returns:
Dict[str, bool]: Ergebnis der Steckdosenerkennung mit IP als Schlüssel
"""
if self.auto_discovered:
tapo_logger.info("🔍 tapo-steckdosen wurden bereits erkannt")
return {}
tapo_logger.info("🔍 starte automatische tapo-steckdosenerkennung...")
results = {}
start_time = time.time()
# Standard-IPs aus der Konfiguration testen
tapo_logger.info(f"🔄 teste {len(self.default_ips)} standard-ips aus der konfiguration")
for i, ip in enumerate(self.default_ips):
try:
tapo_logger.info(f"🔍 teste ip {i+1}/{len(self.default_ips)}: {ip}")
# Ping-Test mit 5 Sekunden Timeout
if self.ping_address(ip, timeout=5):
tapo_logger.info(f"✅ steckdose mit ip {ip} ist erreichbar")
# Tapo-Verbindung testen
test_result = self.test_connection(ip)
if test_result["success"]:
device_info = test_result["device_info"]
nickname = device_info.get('nickname', f"tapo p110 ({ip})")
state = "on" if device_info.get('device_on', False) else "off"
tapo_logger.info(f"✅ tapo-steckdose '{nickname}' ({ip}) gefunden - status: {state}")
results[ip] = True
# Steckdose in Datenbank speichern/aktualisieren
try:
self._ensure_outlet_in_database(ip, nickname)
except Exception as db_error:
tapo_logger.warning(f"⚠️ fehler beim speichern in db für {ip}: {str(db_error)}")
else:
tapo_logger.debug(f"❌ ip {ip} ist erreichbar, aber keine tapo-steckdose")
results[ip] = False
else:
tapo_logger.debug(f"❌ ip {ip} nicht erreichbar")
results[ip] = False
except Exception as e:
tapo_logger.warning(f"❌ fehler bei steckdosen-erkennung für ip {ip}: {str(e)}")
results[ip] = False
continue
# Erfolgsstatistik
success_count = sum(1 for success in results.values() if success)
elapsed_time = time.time() - start_time
tapo_logger.info(f"✅ steckdosen-erkennung abgeschlossen: {success_count}/{len(results)} steckdosen gefunden in {elapsed_time:.1f}s")
self.auto_discovered = True
return results
def initialize_all_outlets(self) -> Dict[str, bool]:
"""
Schaltet alle gespeicherten Steckdosen aus (einheitlicher Startzustand)
Returns:
Dict[str, bool]: Ergebnis der Initialisierung pro Drucker
"""
tapo_logger.info("🚀 starte steckdosen-initialisierung...")
results = {}
try:
with get_db_session() as db_session:
printers = db_session.query(Printer).filter(Printer.active == True).all()
if not printers:
tapo_logger.warning("⚠️ keine aktiven drucker zur initialisierung gefunden")
return results
# Alle Steckdosen ausschalten
for printer in printers:
try:
if printer.plug_ip:
success = self.turn_off(
printer.plug_ip,
printer_id=printer.id
)
results[printer.name] = success
if success:
tapo_logger.info(f"{printer.name}: steckdose ausgeschaltet")
printer.status = "offline"
printer.last_checked = datetime.now()
else:
tapo_logger.warning(f"{printer.name}: steckdose konnte nicht ausgeschaltet werden")
else:
tapo_logger.warning(f"⚠️ {printer.name}: keine steckdosen-ip konfiguriert")
results[printer.name] = False
except Exception as e:
tapo_logger.error(f"❌ fehler bei initialisierung von {printer.name}: {str(e)}")
results[printer.name] = False
# Änderungen speichern
db_session.commit()
success_count = sum(1 for success in results.values() if success)
total_count = len(results)
tapo_logger.info(f"🎯 steckdosen-initialisierung abgeschlossen: {success_count}/{total_count} erfolgreich")
except Exception as e:
tapo_logger.error(f"❌ kritischer fehler bei steckdosen-initialisierung: {str(e)}")
return results
def get_all_outlet_status(self) -> Dict[str, Dict[str, Any]]:
"""
Holt den Status aller konfigurierten Tapo-Steckdosen
Returns:
Dict[str, Dict]: Status aller Steckdosen mit IP als Schlüssel
"""
status_dict = {}
try:
with get_db_session() as db_session:
printers = db_session.query(Printer).filter(
Printer.active == True,
Printer.plug_ip.isnot(None)
).all()
if not printers:
tapo_logger.info(" keine drucker mit tapo-steckdosen konfiguriert")
return status_dict
tapo_logger.info(f"🔍 prüfe status von {len(printers)} tapo-steckdosen...")
# Parallel-Status-Prüfung
with ThreadPoolExecutor(max_workers=min(len(printers), 8)) as executor:
future_to_printer = {
executor.submit(
self.check_outlet_status,
printer.plug_ip,
printer_id=printer.id
): printer
for printer in printers
}
for future in as_completed(future_to_printer, timeout=15):
printer = future_to_printer[future]
try:
reachable, status = future.result()
status_dict[printer.plug_ip] = {
"printer_name": printer.name,
"printer_id": printer.id,
"reachable": reachable,
"status": status,
"ip": printer.plug_ip,
"last_checked": datetime.now().isoformat()
}
except Exception as e:
tapo_logger.error(f"❌ fehler bei status-check für {printer.name}: {str(e)}")
status_dict[printer.plug_ip] = {
"printer_name": printer.name,
"printer_id": printer.id,
"reachable": False,
"status": "error",
"ip": printer.plug_ip,
"error": str(e),
"last_checked": datetime.now().isoformat()
}
tapo_logger.info(f"✅ status-update abgeschlossen für {len(status_dict)} steckdosen")
except Exception as e:
tapo_logger.error(f"❌ kritischer fehler beim abrufen des steckdosen-status: {str(e)}")
return status_dict
def _collect_device_info(self, p100, device_info, debug: bool = False) -> dict:
"""
Sammelt erweiterte Geräteinformationen von der Tapo-Steckdose
Args:
p100: P100-Instanz
device_info: Basis-Geräteinformationen
debug: Debug-Modus aktivieren
Returns:
Dict: Erweiterte Informationen
"""
extra_info = {}
try:
if debug:
tapo_logger.debug(f"🔋 Sammle erweiterte Geräteinformationen...")
# Stromverbrauch abrufen (nur bei P110)
try:
if debug:
tapo_logger.debug(f"⚡ Versuche Energieverbrauch abzurufen...")
energy_usage = p100.getEnergyUsage()
if energy_usage:
extra_info['power_consumption'] = energy_usage.get('current_power', 0)
extra_info['voltage'] = energy_usage.get('voltage_mv', 0) / 1000.0
extra_info['current'] = energy_usage.get('current_ma', 0) / 1000.0
if debug:
tapo_logger.debug(f"🔌 Stromverbrauch: {extra_info['power_consumption']}W")
tapo_logger.debug(f"⚡ Spannung: {extra_info['voltage']}V")
tapo_logger.debug(f"🔄 Strom: {extra_info['current']}A")
else:
if debug:
tapo_logger.debug(f" Keine Energiedaten verfügbar (vermutlich P100)")
except Exception as energy_error:
if debug:
tapo_logger.debug(f"⚠️ Energiemessung nicht verfügbar: {energy_error}")
pass # P100 unterstützt keine Energiemessung
# Firmware und Hardware-Informationen
extra_info['firmware_version'] = device_info.get('fw_ver', 'unknown')
extra_info['hardware_version'] = device_info.get('hw_ver', 'unknown')
extra_info['device_id'] = device_info.get('device_id', 'unknown')
extra_info['mac_address'] = device_info.get('mac', 'unknown')
if debug:
tapo_logger.debug(f"📋 Firmware: {extra_info['firmware_version']}")
tapo_logger.debug(f"🔧 Hardware: {extra_info['hardware_version']}")
tapo_logger.debug(f"🆔 Device-ID: {extra_info['device_id']}")
tapo_logger.debug(f"🌐 MAC: {extra_info['mac_address']}")
except Exception as e:
if debug:
tapo_logger.warning(f"⚠️ Fehler beim Sammeln erweiterter Geräteinformationen: {e}")
tapo_logger.debug(f"🔍 Fehlerdetails: {repr(e)}")
else:
tapo_logger.debug(f"Konnte erweiterte Geräteinformationen nicht abrufen: {e}")
return extra_info
def _log_plug_status(self, printer_id: int, status: str, ip_address: str, **kwargs):
"""
Loggt den Status einer Steckdose in die Datenbank
Args:
printer_id: ID des zugehörigen Druckers
status: Status der Steckdose ("on", "off", "disconnected")
ip_address: IP-Adresse der Steckdose
**kwargs: Zusätzliche Informationen für das Log
"""
try:
with get_db_session() as db_session:
log_entry = PlugStatusLog(
printer_id=printer_id,
status=status,
ip_address=ip_address,
response_time_ms=kwargs.get('response_time_ms'),
power_consumption=kwargs.get('power_consumption'),
voltage=kwargs.get('voltage'),
current=kwargs.get('current'),
error_message=kwargs.get('error_message'),
firmware_version=kwargs.get('firmware_version'),
notes=kwargs.get('notes'),
timestamp=datetime.now(),
source='system',
user_id=None
)
db_session.add(log_entry)
db_session.commit()
except Exception as e:
tapo_logger.debug(f"Fehler beim Loggen des Plug-Status: {e}")
# Session wird automatisch mit rollback geschlossen durch context manager
def _ensure_outlet_in_database(self, ip_address: str, nickname: str = None) -> bool:
"""
Stellt sicher, dass eine erkannte Tapo-Steckdose in der Datenbank existiert
Args:
ip_address: IP-Adresse der Steckdose
nickname: Name der Steckdose (optional)
Returns:
bool: True wenn erfolgreich in Datenbank gespeichert/aktualisiert
"""
try:
with get_db_session() as db_session:
# Prüfen, ob bereits ein Drucker mit dieser Steckdosen-IP existiert
existing_printer = db_session.query(Printer).filter(
Printer.plug_ip == ip_address
).first()
if existing_printer:
tapo_logger.debug(f"Steckdose {ip_address} bereits mit Drucker {existing_printer.name} verknüpft")
return True
# Neuen Drucker-Eintrag für die Steckdose erstellen
printer_name = nickname or f"Tapo Plug {ip_address}"
new_printer = Printer(
name=printer_name,
ip_address=ip_address, # Gleiche IP für Drucker und Steckdose
plug_ip=ip_address,
location="Automatisch erkannt",
active=True,
status="offline",
plug_username=self.username,
plug_password=self.password,
last_checked=datetime.now()
)
db_session.add(new_printer)
db_session.commit()
tapo_logger.info(f"✅ Neue Tapo-Steckdose '{printer_name}' ({ip_address}) in Datenbank gespeichert")
return True
except Exception as e:
tapo_logger.error(f"❌ Fehler beim Speichern der Steckdose {ip_address} in Datenbank: {str(e)}")
return False
def get_energy_statistics(self) -> Dict[str, Any]:
"""
Sammelt Energiestatistiken von allen P110 Steckdosen.
Returns:
Dict: Aggregierte Energiestatistiken
"""
hardware_logger.info("🔋 Sammle Energiestatistiken von allen P110 Steckdosen...")
try:
db_session = get_db_session()
printers = db_session.query(Printer).filter(
Printer.active == True,
Printer.plug_ip.isnot(None)
).all()
statistics = {
'total_devices': 0,
'online_devices': 0,
'total_current_power': 0.0,
'total_today_energy': 0.0,
'total_month_energy': 0.0,
'devices': [],
'hourly_consumption': [0] * 24, # 24 Stunden
'daily_consumption': [0] * 30, # 30 Tage
'monthly_consumption': [0] * 12, # 12 Monate
'timestamp': datetime.now().isoformat()
}
for printer in printers:
device_stats = {
'id': printer.id,
'name': printer.name,
'location': printer.location,
'model': printer.model,
'ip': printer.plug_ip,
'online': False,
'current_power': 0.0,
'today_energy': 0.0,
'month_energy': 0.0,
'voltage': 0.0,
'current': 0.0,
'past24h': [],
'past30d': [],
'past1y': []
}
statistics['total_devices'] += 1
try:
# P110 Energiedaten abrufen
p110 = PyP110(printer.plug_ip, self.username, self.password)
p110.handshake()
p110.login()
# Geräteinformationen
device_info = p110.getDeviceInfo()
if not device_info or 'result' not in device_info:
continue
# Nur P110 Geräte verarbeiten
if 'P110' not in device_info['result'].get('model', ''):
continue
# Energiedaten abrufen
energy_usage = p110.getEnergyUsage()
if energy_usage and 'result' in energy_usage:
energy_data = energy_usage['result']
device_stats['online'] = True
device_stats['current_power'] = energy_data.get('current_power', 0) / 1000 # mW zu W
device_stats['today_energy'] = energy_data.get('today_energy', 0)
device_stats['month_energy'] = energy_data.get('month_energy', 0)
device_stats['past24h'] = energy_data.get('past24h', [])
device_stats['past30d'] = energy_data.get('past30d', [])
device_stats['past1y'] = energy_data.get('past1y', [])
# Aggregierte Werte
statistics['online_devices'] += 1
statistics['total_current_power'] += device_stats['current_power']
statistics['total_today_energy'] += device_stats['today_energy']
statistics['total_month_energy'] += device_stats['month_energy']
# Stündliche Daten aggregieren (letzten 24h)
if device_stats['past24h']:
for i, hourly_value in enumerate(device_stats['past24h'][:24]):
if i < len(statistics['hourly_consumption']):
statistics['hourly_consumption'][i] += hourly_value
# Tägliche Daten aggregieren (letzten 30 Tage)
if device_stats['past30d']:
for i, daily_value in enumerate(device_stats['past30d'][:30]):
if i < len(statistics['daily_consumption']):
statistics['daily_consumption'][i] += daily_value
# Monatliche Daten aggregieren (letzten 12 Monate)
if device_stats['past1y']:
for i, monthly_value in enumerate(device_stats['past1y'][:12]):
if i < len(statistics['monthly_consumption']):
statistics['monthly_consumption'][i] += monthly_value
hardware_logger.debug(f"✅ Energiedaten für {printer.name}: {device_stats['current_power']}W")
except Exception as e:
hardware_logger.warning(f"⚠️ Konnte Energiedaten für {printer.name} nicht abrufen: {str(e)}")
statistics['devices'].append(device_stats)
db_session.close()
# Durchschnittswerte berechnen
if statistics['online_devices'] > 0:
statistics['avg_current_power'] = statistics['total_current_power'] / statistics['online_devices']
statistics['avg_today_energy'] = statistics['total_today_energy'] / statistics['online_devices']
statistics['avg_month_energy'] = statistics['total_month_energy'] / statistics['online_devices']
else:
statistics['avg_current_power'] = 0.0
statistics['avg_today_energy'] = 0.0
statistics['avg_month_energy'] = 0.0
hardware_logger.info(f"✅ Energiestatistiken erfolgreich gesammelt: {statistics['online_devices']}/{statistics['total_devices']} Geräte online")
hardware_logger.info(f"📊 Gesamtverbrauch: {statistics['total_current_power']:.1f}W aktuell, {statistics['total_today_energy']}Wh heute")
return statistics
except Exception as e:
hardware_logger.error(f"❌ Fehler beim Sammeln der Energiestatistiken: {str(e)}")
return {
'total_devices': 0,
'online_devices': 0,
'total_current_power': 0.0,
'total_today_energy': 0.0,
'total_month_energy': 0.0,
'devices': [],
'hourly_consumption': [0] * 24,
'daily_consumption': [0] * 30,
'monthly_consumption': [0] * 12,
'timestamp': datetime.now().isoformat(),
'error': str(e)
}
def turn_off_outlet(self, ip: str, printer_id: int = None) -> bool:
"""
Wrapper für Legacy-Kompatibilität - schaltet eine Tapo-Steckdose aus
Args:
ip: IP-Adresse der Steckdose
printer_id: ID des zugehörigen Druckers für Logging (optional)
Returns:
bool: True wenn erfolgreich ausgeschaltet
"""
return self.turn_off(ip, printer_id=printer_id)
def turn_on_outlet(self, ip: str, printer_id: int = None) -> bool:
"""
Wrapper für Legacy-Kompatibilität - schaltet eine Tapo-Steckdose ein
Args:
ip: IP-Adresse der Steckdose
printer_id: ID des zugehörigen Druckers für Logging (optional)
Returns:
bool: True wenn erfolgreich eingeschaltet
"""
return self.toggle_plug(ip, True)
# ===== PRINTER MONITOR =====
class PrinterMonitor:
"""3D-Drucker Monitor mit Status-Management und Session-Caching"""
# Status-Konstanten
STATUS_ON = "on"
STATUS_OFF = "off"
STATUS_UNREACHABLE = "unreachable"
# Status-Mapping für UI
STATUS_DISPLAY = {
STATUS_ON: {"text": "An", "color": "green", "icon": "power"},
STATUS_OFF: {"text": "Aus", "color": "gray", "icon": "power-off"},
STATUS_UNREACHABLE: {"text": "Nicht erreichbar", "color": "red", "icon": "exclamation-triangle"}
}
def __init__(self):
self.cache = {}
self._cache_timeout = 300 # 5 Minuten Cache
self._cache_lock = threading.RLock()
self._last_check = {}
self.check_interval = 30 # Sekunden zwischen Status-Checks
# Session-spezifischer Status-Cache für Benutzer-Sessions
self._session_cache = {}
self._session_cache_lock = threading.RLock()
self._session_cache_ttl = 300 # 5 Minuten für Session-Cache
# Thread-Pool für asynchrone Operationen
self._executor = ThreadPoolExecutor(max_workers=6)
hardware_logger.info("✅ Printer Monitor mit Session-Caching initialisiert")
def get_live_printer_status(self, use_session_cache: bool = True) -> Dict[int, Dict]:
"""
Holt Live-Druckerstatus mit Cache-Unterstützung.
Args:
use_session_cache: Ob Cache verwendet werden soll
Returns:
Dict: Druckerstatus mit Drucker-ID als Schlüssel
"""
try:
# Cache prüfen wenn aktiviert
if use_session_cache and 'live_status' in self.cache:
cache_entry = self.cache['live_status']
if (datetime.now() - cache_entry['timestamp']).total_seconds() < self._cache_timeout:
hardware_logger.debug("Live-Status aus Cache abgerufen")
return cache_entry['data']
db_session = get_db_session()
printers = db_session.query(Printer).filter(Printer.active == True).all()
status_dict = {}
for printer in printers:
# Basis-Status
printer_status = {
"id": printer.id,
"name": printer.name,
"model": printer.model,
"location": printer.location,
"status": printer.status,
"ip_address": printer.ip_address,
"plug_ip": printer.plug_ip,
"has_plug": bool(printer.plug_ip),
"active": printer.active,
"last_checked": printer.last_checked.isoformat() if printer.last_checked else None,
"created_at": printer.created_at.isoformat() if printer.created_at else None
}
# Tapo-Status wenn verfügbar
if printer.plug_ip and TAPO_AVAILABLE:
try:
tapo_controller = get_tapo_controller()
reachable, plug_status = tapo_controller.check_outlet_status(
printer.plug_ip, printer_id=printer.id
)
printer_status.update({
"plug_reachable": reachable,
"plug_status": plug_status,
"can_control": reachable
})
except Exception as e:
hardware_logger.error(f"Tapo-Status-Fehler für {printer.name}: {e}")
printer_status.update({
"plug_reachable": False,
"plug_status": "error",
"can_control": False,
"error": str(e)
})
else:
printer_status.update({
"plug_reachable": False,
"plug_status": "no_plug",
"can_control": False
})
status_dict[printer.id] = printer_status
db_session.close()
# Cache aktualisieren
if use_session_cache:
self.cache['live_status'] = {
'data': status_dict,
'timestamp': datetime.now()
}
hardware_logger.info(f"Live-Status für {len(status_dict)} Drucker abgerufen")
return status_dict
except Exception as e:
hardware_logger.error(f"Status-Fehler: {e}")
return {}
def get_printer_summary(self) -> Dict[str, Any]:
"""
Erstellt eine Zusammenfassung des Druckerstatus.
Returns:
Dict: Zusammenfassung mit Zählern und Statistiken
"""
try:
status_data = self.get_live_printer_status(use_session_cache=True)
summary = {
'total': len(status_data),
'online': 0,
'offline': 0,
'standby': 0,
'unreachable': 0,
'with_plug': 0,
'plug_online': 0,
'plug_offline': 0
}
for printer_id, printer_data in status_data.items():
status = printer_data.get('status', 'offline')
# Status-Zähler
if status == 'online':
summary['online'] += 1
elif status == 'standby':
summary['standby'] += 1
elif status == 'unreachable':
summary['unreachable'] += 1
else:
summary['offline'] += 1
# Plug-Zähler
if printer_data.get('has_plug'):
summary['with_plug'] += 1
plug_status = printer_data.get('plug_status', 'unknown')
if plug_status == 'on':
summary['plug_online'] += 1
elif plug_status == 'off':
summary['plug_offline'] += 1
return summary
except Exception as e:
hardware_logger.error(f"Summary-Fehler: {e}")
return {
'total': 0,
'online': 0,
'offline': 0,
'standby': 0,
'unreachable': 0,
'with_plug': 0,
'plug_online': 0,
'plug_offline': 0
}
def clear_all_caches(self):
"""Leert alle Caches des Printer Monitors."""
with self._cache_lock:
self.cache.clear()
self._last_check.clear()
with self._session_cache_lock:
self._session_cache.clear()
hardware_logger.debug("Alle Printer Monitor Caches geleert")
def control_plug(self, printer_id: int, action: str) -> Tuple[bool, str]:
"""
Steuert eine Tapo-Steckdose über den TapoController
Args:
printer_id: ID des Druckers
action: "on" oder "off"
Returns:
Tuple (Erfolg, Nachricht)
"""
try:
db_session = get_db_session()
printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
if not printer:
return False, "Drucker nicht gefunden"
if not printer.plug_ip:
return False, "Keine Steckdose konfiguriert"
# Tapo-Controller verwenden
tapo_ctrl = get_tapo_controller()
if not tapo_ctrl:
return False, "Tapo-Controller nicht verfügbar"
# Aktion ausführen
success = False
if action == "on":
success = tapo_ctrl.toggle_plug(printer.plug_ip, True)
elif action == "off":
success = tapo_ctrl.turn_off(printer.plug_ip, printer_id=printer_id)
else:
return False, f"Ungültige Aktion: {action}"
if success:
# Cache invalidieren
with self._cache_lock:
if printer_id in self.cache:
del self.cache[printer_id]
if printer_id in self._last_check:
del self._last_check[printer_id]
db_session.close()
return True, f"Steckdose erfolgreich {action}"
else:
db_session.close()
return False, "Steckdose konnte nicht gesteuert werden"
except Exception as e:
hardware_logger.error(f"Fehler beim Steuern der Steckdose für Drucker {printer_id}: {str(e)}")
return False, str(e)
def check_and_control_for_jobs(self):
"""
Prüft alle Jobs und steuert Steckdosen entsprechend
Diese Methode sollte regelmäßig vom Scheduler aufgerufen werden
"""
try:
db_session = get_db_session()
now = datetime.now()
# Jobs die starten sollten
jobs_to_start = db_session.query(Job).filter(
Job.status == "scheduled",
Job.start_at <= now
).all()
for job in jobs_to_start:
hardware_logger.info(f"Starte Job {job.id} für Drucker {job.printer_id}")
success, msg = self.control_plug(job.printer_id, "on")
if success:
job.status = "running"
hardware_logger.info(f"Steckdose für Job {job.id} eingeschaltet")
else:
hardware_logger.error(f"Fehler beim Einschalten für Job {job.id}: {msg}")
# Jobs die enden sollten
jobs_to_end = db_session.query(Job).filter(
Job.status == "running",
Job.end_at <= now
).all()
for job in jobs_to_end:
hardware_logger.info(f"Beende Job {job.id} für Drucker {job.printer_id}")
success, msg = self.control_plug(job.printer_id, "off")
if success:
job.status = "finished"
job.actual_end_time = now
hardware_logger.info(f"Steckdose für Job {job.id} ausgeschaltet")
else:
hardware_logger.error(f"Fehler beim Ausschalten für Job {job.id}: {msg}")
db_session.commit()
db_session.close()
except Exception as e:
hardware_logger.error(f"Fehler bei der automatischen Job-Steuerung: {str(e)}")
def get_session_status(self, session_id: str, printer_ids: List[int] = None) -> Dict[str, Any]:
"""
Holt den gecachten Status für eine Session
Args:
session_id: Session-ID
printer_ids: Optional - spezifische Drucker-IDs
Returns:
Dict mit Session-spezifischen Status-Daten
"""
try:
with self._session_cache_lock:
session_data = self._session_cache.get(session_id, {})
# Prüfe Cache-Gültigkeit
cache_time = session_data.get('timestamp', datetime.min)
if (datetime.now() - cache_time).total_seconds() > self._session_cache_ttl:
# Cache abgelaufen
self._session_cache.pop(session_id, None)
return self._create_fresh_session_status(session_id, printer_ids)
# Wenn spezifische Drucker angefragt, filtere diese
if printer_ids:
filtered_status = {}
for printer_id in printer_ids:
if str(printer_id) in session_data.get('printers', {}):
filtered_status[str(printer_id)] = session_data['printers'][str(printer_id)]
return {
'timestamp': session_data['timestamp'],
'session_id': session_id,
'printers': filtered_status,
'from_cache': True
}
return session_data
except Exception as e:
hardware_logger.error(f"Fehler beim Abrufen des Session-Status: {str(e)}")
return self._create_fresh_session_status(session_id, printer_ids)
def update_session_status(self, session_id: str, printer_id: int = None) -> bool:
"""
Aktualisiert den Session-Status-Cache
Args:
session_id: Session-ID
printer_id: Optional - spezifischer Drucker
Returns:
bool: True wenn erfolgreich
"""
try:
with self._session_cache_lock:
if printer_id:
# Einzelnen Drucker aktualisieren
printer_status = self.get_live_printer_status(use_session_cache=False)
if session_id not in self._session_cache:
self._session_cache[session_id] = {
'timestamp': datetime.now(),
'session_id': session_id,
'printers': {}
}
self._session_cache[session_id]['printers'][str(printer_id)] = printer_status.get(printer_id, {})
self._session_cache[session_id]['timestamp'] = datetime.now()
else:
# Alle Drucker aktualisieren
self._session_cache[session_id] = self._create_fresh_session_status(session_id)
hardware_logger.debug(f"Session-Status für {session_id} aktualisiert")
return True
except Exception as e:
hardware_logger.error(f"Fehler beim Aktualisieren des Session-Status: {str(e)}")
return False
def clear_session_cache(self, session_id: str = None) -> bool:
"""
Löscht Session-Cache
Args:
session_id: Optional - spezifische Session, sonst alle
Returns:
bool: True wenn erfolgreich
"""
try:
with self._session_cache_lock:
if session_id:
self._session_cache.pop(session_id, None)
hardware_logger.debug(f"Session-Cache für {session_id} gelöscht")
else:
self._session_cache.clear()
hardware_logger.debug("Kompletter Session-Cache gelöscht")
return True
except Exception as e:
hardware_logger.error(f"Fehler beim Löschen des Session-Cache: {str(e)}")
return False
def _create_fresh_session_status(self, session_id: str, printer_ids: List[int] = None) -> Dict[str, Any]:
"""
Erstellt frischen Session-Status
Args:
session_id: Session-ID
printer_ids: Optional - spezifische Drucker-IDs
Returns:
Dict mit frischen Status-Daten
"""
try:
db_session = get_db_session()
# Alle oder spezifische Drucker laden
if printer_ids:
printers = db_session.query(Printer).filter(Printer.id.in_(printer_ids)).all()
else:
printers = db_session.query(Printer).all()
session_data = {
'timestamp': datetime.now(),
'session_id': session_id,
'printers': {},
'from_cache': False
}
# Status für jeden Drucker abrufen
status_dict = self.get_live_printer_status(use_session_cache=False)
for printer in printers:
session_data['printers'][str(printer.id)] = status_dict.get(printer.id, {})
# In Session-Cache speichern
with self._session_cache_lock:
self._session_cache[session_id] = session_data
db_session.close()
return session_data
except Exception as e:
hardware_logger.error(f"Fehler beim Erstellen frischen Session-Status: {str(e)}")
return {
'timestamp': datetime.now(),
'session_id': session_id,
'printers': {},
'error': str(e),
'from_cache': False
}
def invalidate_cache(self, printer_id: int = None) -> bool:
"""
Invalidiert Cache für spezifischen Drucker oder alle
Args:
printer_id: Optional - spezifischer Drucker, None = alle Drucker
Returns:
bool: True wenn erfolgreich
"""
try:
with self._cache_lock:
if printer_id is not None:
# Spezifischen Drucker-Cache löschen
self.cache.pop(printer_id, None)
self._last_check.pop(printer_id, None)
hardware_logger.debug(f"Cache für Drucker {printer_id} invalidiert")
else:
# Alle Caches löschen
self.cache.clear()
self._last_check.clear()
hardware_logger.info("Kompletter Status-Cache invalidiert")
return True
except Exception as e:
hardware_logger.error(f"Fehler beim Invalidieren des Cache: {str(e)}")
return False
def force_network_refresh(self) -> Dict[str, Any]:
"""
Forciert komplette Netzwerk-Neuprüfung aller Drucker
Invalidiert alle Caches und führt echte Netzwerk-Tests durch
Returns:
Dict mit Refresh-Ergebnissen
"""
try:
hardware_logger.info("Starte Force-Network-Refresh für alle Drucker")
# Alle Caches invalidieren
self.invalidate_cache()
self.clear_session_cache()
# Tapo-Controller Cache leeren
try:
tapo_ctrl = get_tapo_controller()
if tapo_ctrl and hasattr(tapo_ctrl, 'clear_cache'):
tapo_ctrl.clear_cache()
hardware_logger.debug("Tapo-Controller Cache geleert")
except Exception as e:
hardware_logger.warning(f"Tapo-Controller Cache konnte nicht geleert werden: {str(e)}")
# Frischen Status für alle Drucker abrufen
fresh_status = self.get_live_printer_status(use_session_cache=False)
# Ergebnisse zusammenfassen
results = {
"success": True,
"timestamp": datetime.now().isoformat(),
"printers_refreshed": len(fresh_status),
"printers": fresh_status,
"message": f"Netzwerk-Status für {len(fresh_status)} Drucker erfolgreich aktualisiert"
}
hardware_logger.info(f"Force-Network-Refresh abgeschlossen: {len(fresh_status)} Drucker aktualisiert")
return results
except Exception as e:
hardware_logger.error(f"Fehler beim Force-Network-Refresh: {str(e)}")
return {
"success": False,
"error": str(e),
"timestamp": datetime.now().isoformat(),
"message": "Fehler beim Aktualisieren der Netzwerk-Status"
}
# ===== GLOBALE INSTANZEN =====
_tapo_controller = None
_printer_monitor = None
def get_tapo_controller() -> TapoController:
global _tapo_controller
if _tapo_controller is None:
_tapo_controller = TapoController()
return _tapo_controller
def get_printer_monitor() -> PrinterMonitor:
global _printer_monitor
if _printer_monitor is None:
_printer_monitor = PrinterMonitor()
return _printer_monitor
# ===== LEGACY COMPATIBILITY =====
def toggle_plug(ip: str, state: bool) -> bool:
"""Legacy-Wrapper für Tapo-Steuerung"""
return get_tapo_controller().toggle_plug(ip, state)
# Legacy-Instanzen für Rückwärtskompatibilität
tapo_controller = get_tapo_controller()
printer_monitor = get_printer_monitor()
hardware_logger.info("✅ Hardware Integration Module initialisiert")
hardware_logger.info("📊 Massive Konsolidierung: 2 Dateien → 1 Datei (50% Reduktion)")