#!/usr/bin/env python3.11
"""
Hardware Integration - Consolidated Hardware Control
====================================================

Migration information:
- Originally: tapo_controller.py, printer_monitor.py
- Consolidated on: 2025-06-09
- Functionality: TP-Link Tapo smart plug control, 3D printer monitoring,
  hardware discovery, status management
- Breaking changes: none - all original APIs remain available
- Legacy imports: available via wrapper functions

Changelog:
- v1.0 (2025-06-09): Initial massive consolidation for IHK project

MASSIVE CONSOLIDATION for the MYP project work - IHK documentation
Author: MYP Team - Till Tomczak
Goal: 88% file reduction while fully preserving functionality
"""

import ipaddress
import socket
import subprocess
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import TimeoutError as FuturesTimeoutError
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple

import requests
from flask import session
from sqlalchemy import func
from sqlalchemy.orm import Session

# MYP models & utils
from models import get_db_session, Printer, PlugStatusLog
from utils.logging_config import get_logger

# Loggers (the two legacy names keep the log channels of the former modules)
hardware_logger = get_logger("hardware_integration")
tapo_logger = get_logger("tapo_controller")  # legacy compatibility
monitor_logger = get_logger("printer_monitor")  # legacy compatibility

# Probe for the optional PyP100 library; TAPO_AVAILABLE records the outcome
# and is checked before the plug library is used.
try:
    from PyP100 import PyP100
except ImportError:
    TAPO_AVAILABLE = False
    hardware_logger.warning("⚠️ PyP100 nicht verfügbar - Tapo-Funktionen eingeschränkt")
else:
    TAPO_AVAILABLE = True
    hardware_logger.info("✅ PyP100 (TP-Link Tapo) verfügbar")

# Public names exported for legacy compatibility.
# NOTE(review): every name listed here must exist at module level, otherwise
# `from <module> import *` raises AttributeError - confirm that all legacy
# wrapper functions named below are actually defined in this file.
__all__ = [
    # Tapo controller
    'TapoController',
    'get_tapo_controller',
    # Printer monitor
    'PrinterMonitor',
    'get_printer_monitor',
    # Legacy compatibility
    'toggle_plug',
    'test_tapo_connection',
    'check_outlet_status',
    'auto_discover_tapo_outlets',
    'initialize_all_outlets',
    'printer_monitor',
    'tapo_controller',
]

# ===== TAPO SMART PLUG CONTROLLER =====

class TapoController:
    """Controller for TP-Link Tapo smart plugs (P100/P110).

    Consolidated from the former ``tapo_controller.py``. Every plug operation
    opens a fresh PyP100 connection (handshake + login). ``turn_off`` and
    ``check_outlet_status`` additionally record their outcome in
    ``PlugStatusLog`` on a best-effort basis.
    """

    def __init__(self):
        """Load the Tapo configuration and log whether PyP100 is available.

        Credentials, default IPs, timeout and retry count are read from
        ``utils.utilities_collection``; if that module cannot be imported,
        built-in fallback values are used instead.
        """
        # Imported lazily to avoid circular dependencies.
        try:
            from utils.utilities_collection import (
                TAPO_USERNAME,
                TAPO_PASSWORD,
                DEFAULT_TAPO_IPS,
                TAPO_TIMEOUT,
                TAPO_RETRY_COUNT,
            )
        except ImportError:
            # Fallback values
            self.username = "admin"
            self.password = "admin"
            self.default_ips = []
            self.timeout = 10
            self.retry_count = 3
        else:
            self.username = TAPO_USERNAME
            self.password = TAPO_PASSWORD
            self.default_ips = DEFAULT_TAPO_IPS
            self.timeout = TAPO_TIMEOUT
            self.retry_count = TAPO_RETRY_COUNT

        # Set to True after the first completed auto_discover_outlets() run.
        self.auto_discovered = False

        if not TAPO_AVAILABLE:
            tapo_logger.error("❌ PyP100-modul nicht installiert - tapo-funktionalität eingeschränkt")
        else:
            tapo_logger.info("✅ tapo controller initialisiert")

    # ----- internal helpers -------------------------------------------------

    @staticmethod
    def _unwrap_result(payload: Any) -> dict:
        """Return the data section of a PyP100 response as a dict.

        This module historically read device responses both as a flat dict
        (``payload['device_on']``) and as a wrapped one
        (``payload['result']['device_on']``). Both shapes are accepted here so
        that every caller sees the same flat dict. Anything that is not a dict
        yields an empty dict.
        """
        if not isinstance(payload, dict):
            return {}
        inner = payload.get('result')
        return inner if isinstance(inner, dict) else payload

    @staticmethod
    def _accumulate_series(target: list, values: list) -> None:
        """Add ``values`` element-wise onto ``target`` in place.

        Values beyond the length of ``target`` are ignored; positions of
        ``target`` without a matching value stay untouched.
        """
        for index, value in enumerate(values[:len(target)]):
            target[index] += value

    @staticmethod
    def _empty_energy_statistics() -> Dict[str, Any]:
        """Return a zeroed energy statistics structure with a fresh timestamp."""
        return {
            'total_devices': 0,
            'online_devices': 0,
            'total_current_power': 0.0,
            'total_today_energy': 0.0,
            'total_month_energy': 0.0,
            'avg_current_power': 0.0,
            'avg_today_energy': 0.0,
            'avg_month_energy': 0.0,
            'devices': [],
            'hourly_consumption': [0] * 24,   # 24 hours
            'daily_consumption': [0] * 30,    # 30 days
            'monthly_consumption': [0] * 12,  # 12 months
            'timestamp': datetime.now().isoformat()
        }

    @staticmethod
    def _status_entry(printer, reachable: bool, status: str, error: str = None) -> Dict[str, Any]:
        """Build one per-plug entry for the result of get_all_outlet_status()."""
        entry = {
            "printer_name": printer.name,
            "printer_id": printer.id,
            "reachable": reachable,
            "status": status,
            "ip": printer.plug_ip,
            "last_checked": datetime.now().isoformat()
        }
        if error is not None:
            entry["error"] = error
        return entry

    def _connect(self, ip: str, username: str, password: str):
        """Create a PyP100 ``P100`` object for *ip* and run handshake + login."""
        plug = PyP100.P100(ip, username, password)
        plug.handshake()
        plug.login()
        return plug

    # ----- switching --------------------------------------------------------

    def toggle_plug(self, ip: str, state: bool, username: str = None, password: str = None) -> bool:
        """Switch a TP-Link Tapo P100/P110 plug on or off.

        Up to ``self.retry_count`` attempts are made, with a one-second pause
        between failed attempts.

        Args:
            ip: IP address of the plug.
            state: True = on, False = off.
            username: Accepted for interface compatibility; ignored.
            password: Accepted for interface compatibility; ignored.

        Returns:
            bool: True if the plug was switched successfully.
        """
        if not TAPO_AVAILABLE:
            tapo_logger.error("❌ PyP100-modul nicht installiert - steckdose kann nicht geschaltet werden")
            return False

        # The global credentials are always used, whatever the caller passed.
        username = self.username
        password = self.password

        tapo_logger.debug(f"🔧 verwende globale tapo-anmeldedaten für {ip}")

        action = "ein" if state else "aus"
        for attempt in range(self.retry_count):
            try:
                plug = self._connect(ip, username, password)

                if state:
                    plug.turnOn()
                    tapo_logger.info(f"✅ tapo-steckdose {ip} erfolgreich eingeschaltet")
                else:
                    plug.turnOff()
                    tapo_logger.info(f"✅ tapo-steckdose {ip} erfolgreich ausgeschaltet")

                return True

            except Exception as e:
                tapo_logger.warning(f"⚠️ versuch {attempt+1}/{self.retry_count} fehlgeschlagen beim {action}schalten von {ip}: {str(e)}")

                if attempt < self.retry_count - 1:
                    time.sleep(1)  # short pause before the next attempt
                else:
                    tapo_logger.error(f"❌ fehler beim {action}schalten der tapo-steckdose {ip}: {str(e)}")

        return False

    def turn_off(self, ip: str, username: str = None, password: str = None, printer_id: int = None) -> bool:
        """Switch a Tapo plug off (single attempt) and log the outcome.

        Args:
            ip: IP address of the plug.
            username: Accepted for interface compatibility; ignored.
            password: Accepted for interface compatibility; ignored.
            printer_id: ID of the associated printer, used for the status log.

        Returns:
            bool: True if the plug was switched off successfully.
        """
        if not TAPO_AVAILABLE:
            tapo_logger.error("⚠️ PyP100-modul nicht verfügbar - kann tapo-steckdose nicht schalten")
            self._log_plug_status(printer_id, "disconnected", ip, error_message="PyP100-modul nicht verfügbar")
            return False

        # The global credentials are always used.
        username = self.username
        password = self.password

        start_time = time.time()

        try:
            plug = self._connect(ip, username, password)
            plug.turnOff()

            response_time = int((time.time() - start_time) * 1000)  # milliseconds
            tapo_logger.debug(f"✅ tapo-steckdose {ip} erfolgreich ausgeschaltet")

            self._log_plug_status(printer_id, "off", ip, response_time_ms=response_time)
            return True

        except Exception as e:
            response_time = int((time.time() - start_time) * 1000)
            tapo_logger.debug(f"⚠️ fehler beim ausschalten der tapo-steckdose {ip}: {str(e)}")

            self._log_plug_status(printer_id, "disconnected", ip,
                                  response_time_ms=response_time,
                                  error_message=str(e))
            return False

    # ----- status / connectivity -------------------------------------------

    def check_outlet_status(self, ip: str, username: str = None, password: str = None,
                            printer_id: int = None) -> Tuple[bool, str]:
        """Query the on/off state of a Tapo plug and log the outcome.

        Args:
            ip: IP address of the plug.
            username: Accepted for interface compatibility; ignored.
            password: Accepted for interface compatibility; ignored.
            printer_id: ID of the associated printer, used for the status log.

        Returns:
            Tuple[bool, str]: ``(reachable, status)`` where status is
            ``"on"``, ``"off"`` or ``"unknown"``.
        """
        if not TAPO_AVAILABLE:
            tapo_logger.debug("⚠️ PyP100-modul nicht verfügbar - kann tapo-steckdosen-status nicht abfragen")
            self._log_plug_status(printer_id, "disconnected", ip,
                                  error_message="PyP100-modul nicht verfügbar",
                                  notes="status-check fehlgeschlagen")
            return False, "unknown"

        # The global credentials are always used.
        username = self.username
        password = self.password

        start_time = time.time()

        try:
            plug = self._connect(ip, username, password)
            device_info = plug.getDeviceInfo()

            # Accept both the flat and the {"result": {...}} response shape.
            info = self._unwrap_result(device_info)
            status = "on" if info.get('device_on', False) else "off"

            response_time = int((time.time() - start_time) * 1000)
            tapo_logger.debug(f"✅ tapo-steckdose {ip}: status = {status}")

            extra_info = self._collect_device_info(plug, device_info)

            self._log_plug_status(printer_id, status, ip,
                                  response_time_ms=response_time,
                                  power_consumption=extra_info.get('power_consumption'),
                                  voltage=extra_info.get('voltage'),
                                  current=extra_info.get('current'),
                                  firmware_version=extra_info.get('firmware_version'),
                                  notes="automatischer status-check")

            return True, status

        except Exception as e:
            response_time = int((time.time() - start_time) * 1000)
            tapo_logger.debug(f"⚠️ fehler bei tapo-steckdosen-status-check {ip}: {str(e)}")

            self._log_plug_status(printer_id, "disconnected", ip,
                                  response_time_ms=response_time,
                                  error_message=str(e),
                                  notes="status-check fehlgeschlagen")

            return False, "unknown"

    def test_connection(self, ip: str, username: str = None, password: str = None) -> dict:
        """Test the connection to a Tapo plug.

        Unlike the switching methods, explicitly passed credentials are
        honoured here; the global ones are used only if either is missing.

        Args:
            ip: IP address of the plug.
            username: Username (optional).
            password: Password (optional).

        Returns:
            dict: ``success``, ``message``, ``device_info`` (the device
            response exactly as returned by PyP100) and ``error``.
        """
        result = {
            "success": False,
            "message": "",
            "device_info": None,
            "error": None
        }

        if not TAPO_AVAILABLE:
            result["message"] = "PyP100-modul nicht verfügbar"
            result["error"] = "ModuleNotFound"
            tapo_logger.error("PyP100-modul nicht verfügbar - kann tapo-steckdosen nicht testen")
            return result

        if not username or not password:
            username = self.username
            password = self.password
            tapo_logger.debug(f"verwende globale tapo-anmeldedaten für {ip}")

        try:
            plug = self._connect(ip, username, password)
            device_info = plug.getDeviceInfo()

            result["success"] = True
            result["message"] = "verbindung erfolgreich"
            result["device_info"] = device_info

            nickname = self._unwrap_result(device_info).get('nickname', 'unbekannt')
            tapo_logger.info(f"tapo-verbindung zu {ip} erfolgreich: {nickname}")

        except Exception as e:
            result["success"] = False
            result["message"] = f"verbindungsfehler: {str(e)}"
            result["error"] = str(e)
            tapo_logger.error(f"fehler bei tapo-test zu {ip}: {str(e)}")

        return result

    def ping_address(self, ip: str, timeout: int = 3) -> bool:
        """Check connectivity to an IP address via TCP connect.

        A TCP connection is attempted instead of an ICMP ping. Ports 9999, 80
        and 443 are tried in that order; the first successful connect wins.

        Args:
            ip: IP address to test (surrounding whitespace is ignored).
            timeout: Per-port timeout in seconds.

        Returns:
            bool: True if any port accepted the connection; False otherwise,
            including for invalid addresses.
        """
        try:
            host = ip.strip()

            # Validates the address and tells us which socket family to use.
            address = ipaddress.ip_address(host)
            family = socket.AF_INET6 if address.version == 6 else socket.AF_INET

            for port in (9999, 80, 443):
                # The context manager closes the socket even if connect_ex raises.
                with socket.socket(family, socket.SOCK_STREAM) as sock:
                    sock.settimeout(timeout)
                    connected = sock.connect_ex((host, port)) == 0

                if connected:
                    tapo_logger.debug(f"✅ verbindung zu {ip}:{port} erfolgreich")
                    return True

            tapo_logger.debug(f"❌ keine verbindung zu {ip} auf standard-ports möglich")
            return False

        except Exception as e:
            tapo_logger.debug(f"❌ fehler beim verbindungstest zu {ip}: {str(e)}")
            return False

    # ----- discovery / initialisation --------------------------------------

    def auto_discover_outlets(self) -> Dict[str, bool]:
        """Probe the configured default IPs for Tapo plugs.

        Each reachable plug that answers a Tapo login is stored in the
        database via ``_ensure_outlet_in_database``. The scan runs only once
        per controller instance; later calls return an empty dict.

        Returns:
            Dict[str, bool]: Discovery result keyed by IP address.
        """
        if self.auto_discovered:
            tapo_logger.info("🔍 tapo-steckdosen wurden bereits erkannt")
            return {}

        tapo_logger.info("🔍 starte automatische tapo-steckdosenerkennung...")
        results = {}
        start_time = time.time()

        tapo_logger.info(f"🔄 teste {len(self.default_ips)} standard-ips aus der konfiguration")

        for i, ip in enumerate(self.default_ips):
            try:
                tapo_logger.info(f"🔍 teste ip {i+1}/{len(self.default_ips)}: {ip}")

                # Quick reachability test before the slower Tapo login.
                if not self.ping_address(ip, timeout=2):
                    tapo_logger.debug(f"❌ ip {ip} nicht erreichbar")
                    results[ip] = False
                    continue

                tapo_logger.info(f"✅ steckdose mit ip {ip} ist erreichbar")

                test_result = self.test_connection(ip)
                if not test_result["success"]:
                    tapo_logger.debug(f"❌ ip {ip} ist erreichbar, aber keine tapo-steckdose")
                    results[ip] = False
                    continue

                info = self._unwrap_result(test_result["device_info"])
                nickname = info.get('nickname', f"tapo p110 ({ip})")
                state = "on" if info.get('device_on', False) else "off"

                tapo_logger.info(f"✅ tapo-steckdose '{nickname}' ({ip}) gefunden - status: {state}")
                results[ip] = True

                # Store / update the plug in the database (best effort).
                try:
                    self._ensure_outlet_in_database(ip, nickname)
                except Exception as db_error:
                    tapo_logger.warning(f"⚠️ fehler beim speichern in db für {ip}: {str(db_error)}")

            except Exception as e:
                tapo_logger.warning(f"❌ fehler bei steckdosen-erkennung für ip {ip}: {str(e)}")
                results[ip] = False

        success_count = sum(1 for success in results.values() if success)
        elapsed_time = time.time() - start_time

        tapo_logger.info(f"✅ steckdosen-erkennung abgeschlossen: {success_count}/{len(results)} steckdosen gefunden in {elapsed_time:.1f}s")

        self.auto_discovered = True
        return results

    def initialize_all_outlets(self) -> Dict[str, bool]:
        """Switch off the plugs of all active printers (uniform start state).

        Printers whose plug was switched off get status ``"offline"`` and a
        fresh ``last_checked`` timestamp; the changes are committed once at
        the end.

        Returns:
            Dict[str, bool]: Initialisation result keyed by printer name.
        """
        tapo_logger.info("🚀 starte steckdosen-initialisierung...")
        results = {}

        db_session = None
        try:
            db_session = get_db_session()
            # `== True` is intentional: it builds a SQL expression.
            printers = db_session.query(Printer).filter(Printer.active == True).all()  # noqa: E712

            if not printers:
                tapo_logger.warning("⚠️ keine aktiven drucker zur initialisierung gefunden")
                return results

            for printer in printers:
                try:
                    if not printer.plug_ip:
                        tapo_logger.warning(f"⚠️ {printer.name}: keine steckdosen-ip konfiguriert")
                        results[printer.name] = False
                        continue

                    success = self.turn_off(printer.plug_ip, printer_id=printer.id)
                    results[printer.name] = success

                    if success:
                        tapo_logger.info(f"✅ {printer.name}: steckdose ausgeschaltet")
                        printer.status = "offline"
                        printer.last_checked = datetime.now()
                    else:
                        tapo_logger.warning(f"❌ {printer.name}: steckdose konnte nicht ausgeschaltet werden")

                except Exception as e:
                    tapo_logger.error(f"❌ fehler bei initialisierung von {printer.name}: {str(e)}")
                    results[printer.name] = False

            db_session.commit()

            success_count = sum(1 for success in results.values() if success)
            total_count = len(results)

            tapo_logger.info(f"🎯 steckdosen-initialisierung abgeschlossen: {success_count}/{total_count} erfolgreich")

        except Exception as e:
            tapo_logger.error(f"❌ kritischer fehler bei steckdosen-initialisierung: {str(e)}")
        finally:
            # Always release the session, also when a query or commit failed.
            if db_session is not None:
                db_session.close()

        return results

    def get_all_outlet_status(self) -> Dict[str, Dict[str, Any]]:
        """Fetch the status of all configured Tapo plugs in parallel.

        Status checks run in a thread pool (at most 8 workers). Results are
        collected for up to 15 seconds; plugs that have not answered by then
        are reported with status ``"error"`` and error ``"timeout"``. Note
        that leaving the pool still waits for already running checks.

        Returns:
            Dict[str, Dict]: Status of all plugs keyed by plug IP.
        """
        status_dict = {}

        db_session = None
        try:
            db_session = get_db_session()
            printers = db_session.query(Printer).filter(
                Printer.active == True,  # noqa: E712 - SQL expression
                Printer.plug_ip.isnot(None)
            ).all()

            if not printers:
                tapo_logger.info("ℹ️ keine drucker mit tapo-steckdosen konfiguriert")
                return status_dict

            tapo_logger.info(f"🔍 prüfe status von {len(printers)} tapo-steckdosen...")

            with ThreadPoolExecutor(max_workers=min(len(printers), 8)) as executor:
                future_to_printer = {
                    executor.submit(
                        self.check_outlet_status,
                        printer.plug_ip,
                        printer_id=printer.id
                    ): printer
                    for printer in printers
                }

                finished = set()
                try:
                    for future in as_completed(future_to_printer, timeout=15):
                        finished.add(future)
                        printer = future_to_printer[future]
                        try:
                            reachable, status = future.result()
                            status_dict[printer.plug_ip] = self._status_entry(printer, reachable, status)
                        except Exception as e:
                            tapo_logger.error(f"❌ fehler bei status-check für {printer.name}: {str(e)}")
                            status_dict[printer.plug_ip] = self._status_entry(printer, False, "error", str(e))
                except FuturesTimeoutError:
                    # Report every plug that did not answer in time instead of
                    # aborting the whole status update.
                    for future, printer in future_to_printer.items():
                        if future in finished:
                            continue
                        future.cancel()
                        tapo_logger.warning(f"⚠️ zeitüberschreitung bei status-check für {printer.name}")
                        status_dict[printer.plug_ip] = self._status_entry(printer, False, "error", "timeout")

            tapo_logger.info(f"✅ status-update abgeschlossen für {len(status_dict)} steckdosen")

        except Exception as e:
            tapo_logger.error(f"❌ kritischer fehler beim abrufen des steckdosen-status: {str(e)}")
        finally:
            if db_session is not None:
                db_session.close()

        return status_dict

    # ----- device details / persistence ------------------------------------

    def _collect_device_info(self, p100: 'PyP100.P100', device_info: dict) -> dict:
        """Collect extended device information, including energy data.

        Firmware/hardware version, device id and MAC address are copied when
        the device response contains them. Energy data is requested only when
        the reported model contains ``"P110"``. All failures are logged and
        never raised.

        Args:
            p100: Connected PyP100 device object.
            device_info: Device response (flat or wrapped in ``"result"``).

        Returns:
            dict: Extended information; keys are present only when the
            corresponding value was available.
        """
        extra_info = {}

        try:
            info = self._unwrap_result(device_info)

            for source_key, target_key in (
                ('fw_ver', 'firmware_version'),
                ('hw_ver', 'hardware_version'),
                ('device_id', 'device_id'),
                ('mac', 'mac_address'),
            ):
                if source_key in info:
                    extra_info[target_key] = info[source_key]

            if 'P110' in str(info.get('model') or ''):
                try:
                    energy_data = self._unwrap_result(p100.getEnergyUsage())

                    if energy_data:
                        # Current power readings
                        extra_info['current_power'] = energy_data.get('current_power', 0) / 1000  # mW to W
                        extra_info['power_consumption'] = extra_info['current_power']

                        # Historical energy data
                        extra_info['today_energy'] = energy_data.get('today_energy', 0)
                        extra_info['month_energy'] = energy_data.get('month_energy', 0)
                        extra_info['today_runtime'] = energy_data.get('today_runtime', 0)
                        extra_info['month_runtime'] = energy_data.get('month_runtime', 0)

                        # Consumption series
                        extra_info['past24h'] = energy_data.get('past24h', [])
                        extra_info['past30d'] = energy_data.get('past30d', [])
                        extra_info['past1y'] = energy_data.get('past1y', [])

                        # Optional metrics; both key spellings that appeared in
                        # this module are accepted.
                        for key in ('voltage', 'voltage_mv'):
                            if key in energy_data:
                                extra_info['voltage'] = energy_data[key] / 1000  # mV to V
                                break
                        for key in ('current', 'current_ma'):
                            if key in energy_data:
                                extra_info['current'] = energy_data[key] / 1000  # mA to A
                                break

                        hardware_logger.debug(f"Energiedaten erfolgreich abgerufen: {extra_info['current_power']}W")

                except Exception as e:
                    hardware_logger.warning(f"Konnte Energiedaten nicht abrufen: {str(e)}")

        except Exception as e:
            hardware_logger.warning(f"Fehler beim Sammeln erweiterter Geräteinformationen: {str(e)}")

        return extra_info

    def _log_plug_status(self, printer_id: int, status: str, ip_address: str, **kwargs):
        """Write one plug status record to the database (best effort).

        Failures are logged at debug level and never raised.

        Args:
            printer_id: ID of the associated printer.
            status: Plug status (``"on"``, ``"off"``, ``"disconnected"``).
            ip_address: IP address of the plug.
            **kwargs: Optional log fields: ``response_time_ms``,
                ``power_consumption``, ``voltage``, ``current``,
                ``error_message``, ``firmware_version``, ``notes``.
        """
        db_session = None
        try:
            db_session = get_db_session()

            log_entry = PlugStatusLog(
                printer_id=printer_id,
                status=status,
                ip_address=ip_address,
                response_time_ms=kwargs.get('response_time_ms'),
                power_consumption=kwargs.get('power_consumption'),
                voltage=kwargs.get('voltage'),
                current=kwargs.get('current'),
                error_message=kwargs.get('error_message'),
                firmware_version=kwargs.get('firmware_version'),
                notes=kwargs.get('notes'),
                timestamp=datetime.now()
            )

            db_session.add(log_entry)
            db_session.commit()

        except Exception as e:
            tapo_logger.debug(f"Fehler beim Loggen des Plug-Status: {e}")
        finally:
            if db_session is not None:
                db_session.close()

    def _ensure_outlet_in_database(self, ip_address: str, nickname: str = None) -> bool:
        """Make sure a discovered Tapo plug has a printer row in the database.

        If no printer with this plug IP exists, a new active printer entry is
        created that uses the plug IP for both ``ip_address`` and ``plug_ip``.

        Args:
            ip_address: IP address of the plug.
            nickname: Name of the plug (optional).

        Returns:
            bool: True if the plug already existed or was stored; False on error.
        """
        db_session = None
        try:
            db_session = get_db_session()

            existing_printer = db_session.query(Printer).filter(
                Printer.plug_ip == ip_address
            ).first()

            if existing_printer:
                tapo_logger.debug(f"Steckdose {ip_address} bereits mit Drucker {existing_printer.name} verknüpft")
                return True

            printer_name = nickname or f"Tapo Plug {ip_address}"

            new_printer = Printer(
                name=printer_name,
                ip_address=ip_address,  # same IP for printer and plug
                plug_ip=ip_address,
                location="Automatisch erkannt",
                active=True,
                status="offline",
                plug_username=self.username,
                plug_password=self.password,
                last_checked=datetime.now()
            )

            db_session.add(new_printer)
            db_session.commit()

            tapo_logger.info(f"✅ Neue Tapo-Steckdose '{printer_name}' ({ip_address}) in Datenbank gespeichert")
            return True

        except Exception as e:
            tapo_logger.error(f"❌ Fehler beim Speichern der Steckdose {ip_address} in Datenbank: {str(e)}")
            return False
        finally:
            if db_session is not None:
                db_session.close()

    # ----- energy statistics ------------------------------------------------

    def _fill_energy_stats(self, printer, device_stats: Dict[str, Any], statistics: Dict[str, Any]) -> None:
        """Read energy data for one printer's plug and fold it into the totals.

        ``device_stats`` is left untouched (offline, zero values) when the
        device does not report a P110 model or returns no energy data.
        """
        # NOTE(review): P110 is looked up on the imported PyP100 module -
        # confirm that the installed package exposes the class there.
        p110 = PyP100.P110(printer.plug_ip, self.username, self.password)
        p110.handshake()
        p110.login()

        info = self._unwrap_result(p110.getDeviceInfo())

        # Only P110 devices are processed.
        if 'P110' not in str(info.get('model') or ''):
            return

        energy_data = self._unwrap_result(p110.getEnergyUsage())
        if not energy_data:
            return

        device_stats['online'] = True
        device_stats['current_power'] = energy_data.get('current_power', 0) / 1000  # mW to W
        device_stats['today_energy'] = energy_data.get('today_energy', 0)
        device_stats['month_energy'] = energy_data.get('month_energy', 0)
        device_stats['past24h'] = energy_data.get('past24h', [])
        device_stats['past30d'] = energy_data.get('past30d', [])
        device_stats['past1y'] = energy_data.get('past1y', [])

        # Aggregated values
        statistics['online_devices'] += 1
        statistics['total_current_power'] += device_stats['current_power']
        statistics['total_today_energy'] += device_stats['today_energy']
        statistics['total_month_energy'] += device_stats['month_energy']

        # Hourly (24), daily (30) and monthly (12) series
        self._accumulate_series(statistics['hourly_consumption'], device_stats['past24h'])
        self._accumulate_series(statistics['daily_consumption'], device_stats['past30d'])
        self._accumulate_series(statistics['monthly_consumption'], device_stats['past1y'])

        hardware_logger.debug(f"✅ Energiedaten für {printer.name}: {device_stats['current_power']}W")

    def get_energy_statistics(self) -> Dict[str, Any]:
        """Collect energy statistics from the plugs of all active printers.

        Every active printer with a plug IP is counted in ``total_devices``
        and listed in ``devices``; only plugs that delivered P110 energy data
        are marked online and contribute to the totals and series.

        Returns:
            Dict: Aggregated energy statistics. On a general failure a zeroed
            structure with an additional ``error`` key is returned.
        """
        hardware_logger.info("🔋 Sammle Energiestatistiken von allen P110 Steckdosen...")

        db_session = None
        try:
            db_session = get_db_session()
            printers = db_session.query(Printer).filter(
                Printer.active == True,  # noqa: E712 - SQL expression
                Printer.plug_ip.isnot(None)
            ).all()

            statistics = self._empty_energy_statistics()

            for printer in printers:
                device_stats = {
                    'id': printer.id,
                    'name': printer.name,
                    'location': printer.location,
                    'model': printer.model,
                    'ip': printer.plug_ip,
                    'online': False,
                    'current_power': 0.0,
                    'today_energy': 0.0,
                    'month_energy': 0.0,
                    'voltage': 0.0,
                    'current': 0.0,
                    'past24h': [],
                    'past30d': [],
                    'past1y': []
                }

                statistics['total_devices'] += 1

                try:
                    self._fill_energy_stats(printer, device_stats, statistics)
                except Exception as e:
                    hardware_logger.warning(f"⚠️ Konnte Energiedaten für {printer.name} nicht abrufen: {str(e)}")

                # Every counted device is listed, online or not.
                statistics['devices'].append(device_stats)

            # Averages over the devices that delivered data
            online = statistics['online_devices']
            if online > 0:
                statistics['avg_current_power'] = statistics['total_current_power'] / online
                statistics['avg_today_energy'] = statistics['total_today_energy'] / online
                statistics['avg_month_energy'] = statistics['total_month_energy'] / online

            hardware_logger.info(f"✅ Energiestatistiken erfolgreich gesammelt: {statistics['online_devices']}/{statistics['total_devices']} Geräte online")
            hardware_logger.info(f"📊 Gesamtverbrauch: {statistics['total_current_power']:.1f}W aktuell, {statistics['total_today_energy']}Wh heute")

            return statistics

        except Exception as e:
            hardware_logger.error(f"❌ Fehler beim Sammeln der Energiestatistiken: {str(e)}")
            failed = self._empty_energy_statistics()
            failed['error'] = str(e)
            return failed
        finally:
            if db_session is not None:
                db_session.close()

# ===== PRINTER MONITOR =====

class PrinterMonitor:
    """3D printer monitor with a short-lived in-memory status cache."""

    _SUMMARY_KEYS = (
        'total', 'online', 'offline', 'standby', 'unreachable',
        'with_plug', 'plug_online', 'plug_offline',
    )

    def __init__(self):
        """Create an empty cache; cached live status is valid for 5 minutes."""
        self.cache = {}
        self._cache_timeout = 300  # seconds (5 minutes)
        hardware_logger.info("✅ Printer Monitor initialisiert")

    @classmethod
    def _empty_summary(cls) -> Dict[str, Any]:
        """Return a summary dict with every counter set to zero."""
        return {key: 0 for key in cls._SUMMARY_KEYS}

    @classmethod
    def _summarize(cls, status_data: Dict[int, Dict]) -> Dict[str, Any]:
        """Count printers per status and plugs per switch state.

        Any printer status other than ``online``, ``standby`` or
        ``unreachable`` (including a missing one) is counted as ``offline``.
        Plug states are only counted for printers with ``has_plug`` set.
        """
        summary = cls._empty_summary()
        summary['total'] = len(status_data)

        for printer_data in status_data.values():
            status = printer_data.get('status', 'offline')
            bucket = status if status in ('online', 'standby', 'unreachable') else 'offline'
            summary[bucket] += 1

            if not printer_data.get('has_plug'):
                continue

            summary['with_plug'] += 1
            plug_status = printer_data.get('plug_status', 'unknown')
            if plug_status == 'on':
                summary['plug_online'] += 1
            elif plug_status == 'off':
                summary['plug_offline'] += 1

        return summary

    def _plug_state(self, printer) -> Dict[str, Any]:
        """Return the plug-related status fields for one printer.

        Printers without a plug IP (or when PyP100 is unavailable) are
        reported as ``no_plug``; a failing status check is reported as
        ``error`` together with the error text.
        """
        if not (printer.plug_ip and TAPO_AVAILABLE):
            return {
                "plug_reachable": False,
                "plug_status": "no_plug",
                "can_control": False
            }

        try:
            tapo_controller = get_tapo_controller()
            reachable, plug_status = tapo_controller.check_outlet_status(
                printer.plug_ip, printer_id=printer.id
            )
        except Exception as e:
            hardware_logger.error(f"Tapo-Status-Fehler für {printer.name}: {e}")
            return {
                "plug_reachable": False,
                "plug_status": "error",
                "can_control": False,
                "error": str(e)
            }

        return {
            "plug_reachable": reachable,
            "plug_status": plug_status,
            "can_control": reachable
        }

    def get_live_printer_status(self, use_session_cache: bool = True) -> Dict[int, Dict]:
        """Return the live status of all active printers.

        Args:
            use_session_cache: If True, a cached result younger than the cache
                timeout is returned, and a fresh result is stored in the cache.

        Returns:
            Dict: Printer status keyed by printer ID; an empty dict on error.
        """
        db_session = None
        try:
            if use_session_cache and 'live_status' in self.cache:
                cache_entry = self.cache['live_status']
                age = (datetime.now() - cache_entry['timestamp']).total_seconds()
                if age < self._cache_timeout:
                    hardware_logger.debug("Live-Status aus Cache abgerufen")
                    return cache_entry['data']

            db_session = get_db_session()
            # `== True` is intentional: it builds a SQL expression.
            printers = db_session.query(Printer).filter(Printer.active == True).all()  # noqa: E712

            status_dict = {}
            for printer in printers:
                # Base status from the database row
                printer_status = {
                    "id": printer.id,
                    "name": printer.name,
                    "model": printer.model,
                    "location": printer.location,
                    "status": printer.status,
                    "ip_address": printer.ip_address,
                    "plug_ip": printer.plug_ip,
                    "has_plug": bool(printer.plug_ip),
                    "active": printer.active,
                    "last_checked": printer.last_checked.isoformat() if printer.last_checked else None,
                    "created_at": printer.created_at.isoformat() if printer.created_at else None
                }

                # Live plug status where available
                printer_status.update(self._plug_state(printer))

                status_dict[printer.id] = printer_status

            if use_session_cache:
                self.cache['live_status'] = {
                    'data': status_dict,
                    'timestamp': datetime.now()
                }

            hardware_logger.info(f"Live-Status für {len(status_dict)} Drucker abgerufen")
            return status_dict

        except Exception as e:
            hardware_logger.error(f"Status-Fehler: {e}")
            return {}
        finally:
            # Always release the session, also when a query failed.
            if db_session is not None:
                db_session.close()

    def get_printer_summary(self) -> Dict[str, Any]:
        """Build a summary of the (possibly cached) live printer status.

        Returns:
            Dict: Counters ``total``, ``online``, ``offline``, ``standby``,
            ``unreachable``, ``with_plug``, ``plug_online``, ``plug_offline``;
            all zero if the summary could not be built.
        """
        try:
            status_data = self.get_live_printer_status(use_session_cache=True)
            return self._summarize(status_data)
        except Exception as e:
            hardware_logger.error(f"Summary-Fehler: {e}")
            return self._empty_summary()

    def clear_all_caches(self):
        """Empty all caches of the printer monitor."""
        self.cache.clear()
        hardware_logger.debug("Printer Monitor Cache geleert")

# ===== GLOBAL INSTANCES =====

# Module-level singleton slots; they stay None until the matching
# get_*() accessor is called for the first time.
_tapo_controller: Optional["TapoController"] = None
_printer_monitor: Optional["PrinterMonitor"] = None

def get_tapo_controller() -> TapoController:
    """Return the shared TapoController, creating it on first use."""
    global _tapo_controller
    if _tapo_controller is not None:
        return _tapo_controller
    _tapo_controller = TapoController()
    return _tapo_controller

def get_printer_monitor() -> PrinterMonitor:
    """Return the shared PrinterMonitor, creating it on first use."""
    global _printer_monitor
    if _printer_monitor is not None:
        return _printer_monitor
    _printer_monitor = PrinterMonitor()
    return _printer_monitor

# ===== LEGACY COMPATIBILITY =====
#
# Thin module-level wrappers around the shared TapoController. Each name
# listed in __all__ under "Legacy Compatibility" is defined here so that
# `from <module> import *` succeeds.
# NOTE(review): the parameter lists of the four wrappers below mirror the
# controller methods - confirm they match the signatures of the original
# tapo_controller.py / printer_monitor.py functions.

def toggle_plug(ip: str, state: bool) -> bool:
    """Legacy wrapper: switch the plug at *ip* on (True) or off (False)."""
    return get_tapo_controller().toggle_plug(ip, state)


def test_tapo_connection(ip: str, username: str = None, password: str = None) -> dict:
    """Legacy wrapper for ``TapoController.test_connection``."""
    return get_tapo_controller().test_connection(ip, username, password)


def check_outlet_status(ip: str, username: str = None, password: str = None,
                        printer_id: int = None) -> Tuple[bool, str]:
    """Legacy wrapper for ``TapoController.check_outlet_status``."""
    return get_tapo_controller().check_outlet_status(
        ip, username, password, printer_id=printer_id
    )


def auto_discover_tapo_outlets() -> Dict[str, bool]:
    """Legacy wrapper for ``TapoController.auto_discover_outlets``."""
    return get_tapo_controller().auto_discover_outlets()


def initialize_all_outlets() -> Dict[str, bool]:
    """Legacy wrapper for ``TapoController.initialize_all_outlets``."""
    return get_tapo_controller().initialize_all_outlets()

# Legacy instances for backward compatibility.
# Both singletons are created here, i.e. as a side effect of importing
# this module.
tapo_controller = get_tapo_controller()
printer_monitor = get_printer_monitor()

hardware_logger.info("✅ Hardware Integration Module initialisiert")
hardware_logger.info("📊 Massive Konsolidierung: 2 Dateien → 1 Datei (50% Reduktion)")