Projektarbeit-MYP/backend/utils/printer_monitor.py

423 lines
17 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Live-Drucker-Monitor für MYP Platform
Überwacht Druckerstatus in Echtzeit mit Session-Caching und automatischer Steckdosen-Initialisierung.
"""
import time
import threading
import requests
import subprocess
import ipaddress
from datetime import datetime, timedelta
from typing import Dict, Tuple, List, Optional
from flask import session
from sqlalchemy import func
from sqlalchemy.orm import Session
import os
from models import get_db_session, Printer, PlugStatusLog
from utils.logging_config import get_logger
from utils.settings import PRINTERS, TAPO_USERNAME, TAPO_PASSWORD, DEFAULT_TAPO_IPS, TAPO_AUTO_DISCOVERY
from utils.tapo_controller import tapo_controller
# TP-Link Tapo P110 Unterstützung prüfen
try:
from PyP100 import PyP100
TAPO_AVAILABLE = True
except ImportError:
TAPO_AVAILABLE = False
# Logger initialisieren
monitor_logger = get_logger("printer_monitor")
class PrinterMonitor:
"""
Live-Drucker-Monitor mit Session-Caching und automatischer Initialisierung.
"""
def __init__(self):
self.session_cache = {} # Session-basierter Cache für schnelle Zugriffe
self.db_cache = {} # Datenbank-Cache für persistente Daten
self.cache_lock = threading.Lock()
self.last_db_sync = datetime.now()
self.monitoring_active = False
self.monitor_thread = None
self.startup_initialized = False
self.auto_discovered_tapo = False
# Cache-Konfiguration
self.session_cache_ttl = 30 # 30 Sekunden für Session-Cache
self.db_cache_ttl = 300 # 5 Minuten für DB-Cache
monitor_logger.info("🖨️ Drucker-Monitor initialisiert")
# Automatische Steckdosenerkennung in separatem Thread starten, falls aktiviert
if TAPO_AUTO_DISCOVERY:
discovery_thread = threading.Thread(
target=self._run_auto_discovery,
daemon=True,
name="TapoAutoDiscovery"
)
discovery_thread.start()
monitor_logger.info("🔍 Automatische Tapo-Erkennung in separatem Thread gestartet")
def _run_auto_discovery(self):
"""
Führt die automatische Tapo-Erkennung in einem separaten Thread aus.
"""
try:
# Kurze Verzögerung um sicherzustellen, dass die Hauptanwendung Zeit hat zu starten
time.sleep(2)
self.auto_discover_tapo_outlets()
except Exception as e:
monitor_logger.error(f"❌ Fehler bei automatischer Tapo-Erkennung: {str(e)}")
def initialize_all_outlets_on_startup(self) -> Dict[str, bool]:
"""
Schaltet beim Programmstart alle gespeicherten Steckdosen aus (gleicher Startzustand).
Returns:
Dict[str, bool]: Ergebnis der Initialisierung pro Drucker
"""
if self.startup_initialized:
monitor_logger.info("🔄 Steckdosen bereits beim Start initialisiert")
return {}
# Verwende zentrale tapo_controller Implementierung
results = tapo_controller.initialize_all_outlets()
self.startup_initialized = True
return results
def _turn_outlet_off(self, ip_address: str, username: str, password: str, timeout: int = 5, printer_id: int = None) -> bool:
"""
Schaltet eine TP-Link Tapo P110-Steckdose aus.
Args:
ip_address: IP-Adresse der Steckdose
username: Benutzername für die Steckdose (wird überschrieben)
password: Passwort für die Steckdose (wird überschrieben)
timeout: Timeout in Sekunden (wird ignoriert, da PyP100 eigenes Timeout hat)
printer_id: ID des zugehörigen Druckers (für Logging)
Returns:
bool: True wenn erfolgreich ausgeschaltet
"""
# Verwende zentrale tapo_controller Implementierung
return tapo_controller.turn_off(ip_address, username, password, printer_id)
def get_live_printer_status(self, use_session_cache: bool = True) -> Dict[int, Dict]:
"""
Holt Live-Druckerstatus mit Session- und DB-Caching.
Args:
use_session_cache: Ob Session-Cache verwendet werden soll
Returns:
Dict[int, Dict]: Status-Dict mit Drucker-ID als Key
"""
current_time = datetime.now()
# Session-Cache prüfen (nur wenn aktiviert)
if use_session_cache and hasattr(session, 'get'):
session_key = "printer_status_cache"
session_timestamp_key = "printer_status_timestamp"
cached_data = session.get(session_key)
cached_timestamp = session.get(session_timestamp_key)
if cached_data and cached_timestamp:
cache_age = (current_time - datetime.fromisoformat(cached_timestamp)).total_seconds()
if cache_age < self.session_cache_ttl:
monitor_logger.debug("📋 Verwende Session-Cache für Druckerstatus")
return cached_data
# DB-Cache prüfen
with self.cache_lock:
if self.db_cache and (current_time - self.last_db_sync).total_seconds() < self.db_cache_ttl:
monitor_logger.debug("🗃️ Verwende DB-Cache für Druckerstatus")
# Session-Cache aktualisieren
if use_session_cache and hasattr(session, '__setitem__'):
session["printer_status_cache"] = self.db_cache
session["printer_status_timestamp"] = current_time.isoformat()
return self.db_cache
# Live-Status von Druckern abrufen
monitor_logger.info("🔄 Aktualisiere Live-Druckerstatus...")
status_dict = self._fetch_live_printer_status()
# Caches aktualisieren
with self.cache_lock:
self.db_cache = status_dict
self.last_db_sync = current_time
if use_session_cache and hasattr(session, '__setitem__'):
session["printer_status_cache"] = status_dict
session["printer_status_timestamp"] = current_time.isoformat()
return status_dict
def _fetch_live_printer_status(self) -> Dict[int, Dict]:
"""
Holt den aktuellen Status aller Drucker direkt von den Geräten.
Returns:
Dict[int, Dict]: Status-Dict mit umfassenden Informationen
"""
status_dict = {}
try:
db_session = get_db_session()
printers = db_session.query(Printer).filter(Printer.active == True).all()
# Wenn keine aktiven Drucker vorhanden sind, gebe leeres Dict zurück
if not printers:
monitor_logger.info(" Keine aktiven Drucker gefunden")
db_session.close()
return status_dict
monitor_logger.info(f"🔍 Prüfe Status von {len(printers)} aktiven Druckern...")
# Parallel-Status-Prüfung mit ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor, as_completed
# Sicherstellen, dass max_workers mindestens 1 ist
max_workers = min(max(len(printers), 1), 8)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_printer = {
executor.submit(self._check_single_printer_status, printer): printer
for printer in printers
}
for future in as_completed(future_to_printer, timeout=15):
printer = future_to_printer[future]
try:
status_info = future.result()
status_dict[printer.id] = status_info
# Status in Datenbank aktualisieren
printer.status = status_info["status"]
printer.last_checked = datetime.now()
except Exception as e:
monitor_logger.error(f"❌ Fehler bei Status-Check für Drucker {printer.name}: {str(e)}")
status_dict[printer.id] = {
"id": printer.id,
"name": printer.name,
"status": "offline",
"active": False,
"ip_address": printer.ip_address,
"plug_ip": printer.plug_ip,
"location": printer.location,
"last_checked": datetime.now().isoformat(),
"error": str(e)
}
# Änderungen in Datenbank speichern
db_session.commit()
db_session.close()
monitor_logger.info(f"✅ Status-Update abgeschlossen für {len(status_dict)} Drucker")
except Exception as e:
monitor_logger.error(f"❌ Kritischer Fehler beim Abrufen des Live-Status: {str(e)}")
return status_dict
def _check_single_printer_status(self, printer: Printer, timeout: int = 7) -> Dict:
"""
Überprüft den Status eines einzelnen Druckers basierend auf der Steckdosen-Logik:
- Steckdose erreichbar aber AUS = Drucker ONLINE (bereit zum Drucken)
- Steckdose erreichbar und AN = Drucker PRINTING (druckt gerade)
- Steckdose nicht erreichbar = Drucker OFFLINE (kritischer Fehler)
Args:
printer: Printer-Objekt aus der Datenbank
timeout: Timeout in Sekunden
Returns:
Dict: Umfassende Status-Informationen
"""
status_info = {
"id": printer.id,
"name": printer.name,
"status": "offline",
"active": False,
"ip_address": printer.ip_address,
"plug_ip": printer.plug_ip,
"location": printer.location,
"last_checked": datetime.now().isoformat(),
"ping_successful": False,
"outlet_reachable": False,
"outlet_state": "unknown"
}
try:
# 1. Ping-Test für Grundkonnektivität
if printer.plug_ip:
ping_success = self._ping_address(printer.plug_ip, timeout=3)
status_info["ping_successful"] = ping_success
if ping_success:
# 2. Smart Plug Status prüfen
outlet_reachable, outlet_state = self._check_outlet_status(
printer.plug_ip,
printer.plug_username,
printer.plug_password,
timeout,
printer_id=printer.id
)
status_info["outlet_reachable"] = outlet_reachable
status_info["outlet_state"] = outlet_state
# 🎯 KORREKTE LOGIK: Steckdose erreichbar = Drucker funktionsfähig
if outlet_reachable:
if outlet_state == "off":
# Steckdose aus = Drucker ONLINE (bereit zum Drucken)
status_info["status"] = "online"
status_info["active"] = True
monitor_logger.debug(f"{printer.name}: ONLINE (Steckdose aus - bereit zum Drucken)")
elif outlet_state == "on":
# Steckdose an = Drucker PRINTING (druckt gerade)
status_info["status"] = "printing"
status_info["active"] = True
monitor_logger.debug(f"🖨️ {printer.name}: PRINTING (Steckdose an - druckt gerade)")
else:
# Unbekannter Steckdosen-Status
status_info["status"] = "error"
status_info["active"] = False
monitor_logger.warning(f"⚠️ {printer.name}: Unbekannter Steckdosen-Status '{outlet_state}'")
else:
# Steckdose nicht erreichbar = kritischer Fehler
status_info["status"] = "offline"
status_info["active"] = False
monitor_logger.warning(f"{printer.name}: OFFLINE (Steckdose nicht erreichbar)")
else:
# Ping fehlgeschlagen = Netzwerkproblem
status_info["status"] = "unreachable"
status_info["active"] = False
monitor_logger.warning(f"🔌 {printer.name}: UNREACHABLE (Ping fehlgeschlagen)")
else:
# Keine Steckdosen-IP konfiguriert
status_info["status"] = "unconfigured"
status_info["active"] = False
monitor_logger.info(f"⚙️ {printer.name}: UNCONFIGURED (keine Steckdosen-IP)")
except Exception as e:
monitor_logger.error(f"❌ Fehler bei Status-Check für {printer.name}: {str(e)}")
status_info["error"] = str(e)
status_info["status"] = "error"
status_info["active"] = False
return status_info
def _ping_address(self, ip_address: str, timeout: int = 3) -> bool:
"""
Führt einen Konnektivitätstest zu einer IP-Adresse durch.
Verwendet ausschließlich TCP-Verbindung statt Ping, um Encoding-Probleme zu vermeiden.
Args:
ip_address: Zu testende IP-Adresse
timeout: Timeout in Sekunden
Returns:
bool: True wenn Verbindung erfolgreich
"""
# Verwende zentrale tapo_controller Implementierung
return tapo_controller.ping_address(ip_address, timeout)
def _check_outlet_status(self, ip_address: str, username: str, password: str, timeout: int = 5, printer_id: int = None) -> Tuple[bool, str]:
"""
Überprüft den Status einer TP-Link Tapo P110-Steckdose.
Args:
ip_address: IP-Adresse der Steckdose
username: Benutzername für die Steckdose
password: Passwort für die Steckdose
timeout: Timeout in Sekunden (wird ignoriert, da PyP100 eigenes Timeout hat)
printer_id: ID des zugehörigen Druckers (für Logging)
Returns:
Tuple[bool, str]: (Erreichbar, Status) - Status: "on", "off", "unknown"
"""
# Verwende zentrale tapo_controller Implementierung
return tapo_controller.check_outlet_status(ip_address, username, password, printer_id)
def clear_all_caches(self):
"""Löscht alle Caches (Session und DB)."""
with self.cache_lock:
self.db_cache = {}
self.last_db_sync = datetime.now()
if hasattr(session, 'pop'):
session.pop("printer_status_cache", None)
session.pop("printer_status_timestamp", None)
monitor_logger.info("🧹 Alle Drucker-Caches gelöscht")
def get_printer_summary(self) -> Dict[str, int]:
"""
Gibt eine Zusammenfassung der Druckerstatus zurück.
Returns:
Dict[str, int]: Anzahl Drucker pro Status
"""
status_dict = self.get_live_printer_status()
summary = {
"total": len(status_dict),
"online": 0,
"offline": 0,
"printing": 0, # Neuer Status: Drucker druckt gerade
"standby": 0,
"unreachable": 0,
"unconfigured": 0,
"error": 0 # Status für unbekannte Fehler
}
for printer_info in status_dict.values():
status = printer_info.get("status", "offline")
if status in summary:
summary[status] += 1
else:
# Fallback für unbekannte Status
summary["offline"] += 1
return summary
def auto_discover_tapo_outlets(self) -> Dict[str, bool]:
"""
Automatische Erkennung und Konfiguration von TP-Link Tapo P110-Steckdosen im Netzwerk.
Robuste Version mit Timeout-Behandlung und Fehler-Resilience.
Returns:
Dict[str, bool]: Ergebnis der Steckdosenerkennung mit IP als Schlüssel
"""
if self.auto_discovered_tapo:
monitor_logger.info("🔍 Tapo-Steckdosen wurden bereits erkannt")
return {}
# Verwende zentrale tapo_controller Implementierung
results = tapo_controller.auto_discover_outlets()
self.auto_discovered_tapo = True
return results
def _ensure_tapo_in_database(self, ip_address: str, nickname: str = None) -> bool:
"""
Stellt sicher, dass eine erkannte Tapo-Steckdose in der Datenbank existiert.
Args:
ip_address: IP-Adresse der Steckdose
nickname: Name der Steckdose (optional)
Returns:
bool: True wenn erfolgreich in Datenbank gespeichert/aktualisiert
"""
# Verwende zentrale tapo_controller Implementierung
return tapo_controller._ensure_outlet_in_database(ip_address, nickname)
# Globale Instanz
printer_monitor = PrinterMonitor()