490 lines
20 KiB
Python
490 lines
20 KiB
Python
"""
|
|
Live-Drucker-Monitor für MYP Platform
|
|
Überwacht Druckerstatus in Echtzeit mit Session-Caching und automatischer Steckdosen-Initialisierung.
|
|
"""
|
|
|
|
import time
|
|
import threading
|
|
import requests
|
|
import subprocess
|
|
import ipaddress
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, Tuple, List, Optional
|
|
from flask import session
|
|
from sqlalchemy import func
|
|
from sqlalchemy.orm import Session
|
|
|
|
from models import get_db_session, Printer
|
|
from utils.logging_config import get_logger
|
|
from config.settings import PRINTERS
|
|
|
|
# Logger initialisieren
|
|
monitor_logger = get_logger("printer_monitor")
|
|
|
|
class PrinterMonitor:
|
|
"""
|
|
Live-Drucker-Monitor mit Session-Caching und automatischer Initialisierung.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.session_cache = {} # Session-basierter Cache für schnelle Zugriffe
|
|
self.db_cache = {} # Datenbank-Cache für persistente Daten
|
|
self.cache_lock = threading.Lock()
|
|
self.last_db_sync = datetime.now()
|
|
self.monitoring_active = False
|
|
self.monitor_thread = None
|
|
self.startup_initialized = False
|
|
|
|
# Cache-Konfiguration
|
|
self.session_cache_ttl = 30 # 30 Sekunden für Session-Cache
|
|
self.db_cache_ttl = 300 # 5 Minuten für DB-Cache
|
|
|
|
monitor_logger.info("🖨️ Drucker-Monitor initialisiert")
|
|
|
|
def initialize_all_outlets_on_startup(self) -> Dict[str, bool]:
|
|
"""
|
|
Schaltet beim Programmstart alle gespeicherten Steckdosen aus (gleicher Startzustand).
|
|
|
|
Returns:
|
|
Dict[str, bool]: Ergebnis der Initialisierung pro Drucker
|
|
"""
|
|
if self.startup_initialized:
|
|
monitor_logger.info("🔄 Steckdosen bereits beim Start initialisiert")
|
|
return {}
|
|
|
|
monitor_logger.info("🚀 Starte Steckdosen-Initialisierung beim Programmstart...")
|
|
results = {}
|
|
|
|
try:
|
|
db_session = get_db_session()
|
|
printers = db_session.query(Printer).filter(Printer.active == True).all()
|
|
|
|
if not printers:
|
|
monitor_logger.warning("⚠️ Keine aktiven Drucker zur Initialisierung gefunden")
|
|
db_session.close()
|
|
self.startup_initialized = True
|
|
return results
|
|
|
|
# Alle Steckdosen ausschalten für einheitlichen Startzustand
|
|
for printer in printers:
|
|
try:
|
|
if printer.plug_ip and printer.plug_username and printer.plug_password:
|
|
success = self._turn_outlet_off(
|
|
printer.plug_ip,
|
|
printer.plug_username,
|
|
printer.plug_password
|
|
)
|
|
|
|
results[printer.name] = success
|
|
|
|
if success:
|
|
monitor_logger.info(f"✅ {printer.name}: Steckdose ausgeschaltet")
|
|
# Status in Datenbank aktualisieren
|
|
printer.status = "offline"
|
|
printer.last_checked = datetime.now()
|
|
else:
|
|
monitor_logger.warning(f"❌ {printer.name}: Steckdose konnte nicht ausgeschaltet werden")
|
|
else:
|
|
monitor_logger.warning(f"⚠️ {printer.name}: Unvollständige Steckdosen-Konfiguration")
|
|
results[printer.name] = False
|
|
|
|
except Exception as e:
|
|
monitor_logger.error(f"❌ Fehler bei Initialisierung von {printer.name}: {str(e)}")
|
|
results[printer.name] = False
|
|
|
|
# Änderungen speichern
|
|
db_session.commit()
|
|
db_session.close()
|
|
|
|
success_count = sum(1 for success in results.values() if success)
|
|
total_count = len(results)
|
|
|
|
monitor_logger.info(f"🎯 Steckdosen-Initialisierung abgeschlossen: {success_count}/{total_count} erfolgreich")
|
|
self.startup_initialized = True
|
|
|
|
except Exception as e:
|
|
monitor_logger.error(f"❌ Kritischer Fehler bei Steckdosen-Initialisierung: {str(e)}")
|
|
results = {}
|
|
|
|
return results
|
|
|
|
def _turn_outlet_off(self, ip_address: str, username: str, password: str, timeout: int = 5) -> bool:
|
|
"""
|
|
Schaltet eine Smart-Steckdose aus.
|
|
|
|
Args:
|
|
ip_address: IP-Adresse der Steckdose
|
|
username: Benutzername für die Steckdose
|
|
password: Passwort für die Steckdose
|
|
timeout: Timeout in Sekunden
|
|
|
|
Returns:
|
|
bool: True wenn erfolgreich ausgeschaltet
|
|
"""
|
|
try:
|
|
# Für TP-Link Tapo und ähnliche Smart Plugs
|
|
auth = (username, password)
|
|
|
|
# Verschiedene API-Endpunkte versuchen
|
|
endpoints = [
|
|
f"http://{ip_address}/relay/off",
|
|
f"http://{ip_address}/api/relay/off",
|
|
f"http://{ip_address}/set?relay=off",
|
|
f"http://{ip_address}/control?action=off"
|
|
]
|
|
|
|
for endpoint in endpoints:
|
|
try:
|
|
response = requests.post(endpoint, auth=auth, timeout=timeout)
|
|
if response.status_code in [200, 201, 204]:
|
|
monitor_logger.debug(f"✅ Steckdose {ip_address} ausgeschaltet via {endpoint}")
|
|
return True
|
|
except requests.RequestException:
|
|
continue
|
|
|
|
# Wenn spezifische Endpunkte fehlschlagen, versuche generische JSON-API
|
|
try:
|
|
payload = {"system": {"set_relay_state": {"state": 0}}}
|
|
response = requests.post(
|
|
f"http://{ip_address}/api",
|
|
json=payload,
|
|
auth=auth,
|
|
timeout=timeout
|
|
)
|
|
if response.status_code in [200, 201]:
|
|
monitor_logger.debug(f"✅ Steckdose {ip_address} ausgeschaltet via JSON-API")
|
|
return True
|
|
except requests.RequestException:
|
|
pass
|
|
|
|
except Exception as e:
|
|
monitor_logger.debug(f"⚠️ Fehler beim Ausschalten der Steckdose {ip_address}: {str(e)}")
|
|
|
|
return False
|
|
|
|
def get_live_printer_status(self, use_session_cache: bool = True) -> Dict[int, Dict]:
|
|
"""
|
|
Holt Live-Druckerstatus mit Session- und DB-Caching.
|
|
|
|
Args:
|
|
use_session_cache: Ob Session-Cache verwendet werden soll
|
|
|
|
Returns:
|
|
Dict[int, Dict]: Status-Dict mit Drucker-ID als Key
|
|
"""
|
|
current_time = datetime.now()
|
|
|
|
# Session-Cache prüfen (nur wenn aktiviert)
|
|
if use_session_cache and hasattr(session, 'get'):
|
|
session_key = "printer_status_cache"
|
|
session_timestamp_key = "printer_status_timestamp"
|
|
|
|
cached_data = session.get(session_key)
|
|
cached_timestamp = session.get(session_timestamp_key)
|
|
|
|
if cached_data and cached_timestamp:
|
|
cache_age = (current_time - datetime.fromisoformat(cached_timestamp)).total_seconds()
|
|
if cache_age < self.session_cache_ttl:
|
|
monitor_logger.debug("📋 Verwende Session-Cache für Druckerstatus")
|
|
return cached_data
|
|
|
|
# DB-Cache prüfen
|
|
with self.cache_lock:
|
|
if self.db_cache and (current_time - self.last_db_sync).total_seconds() < self.db_cache_ttl:
|
|
monitor_logger.debug("🗃️ Verwende DB-Cache für Druckerstatus")
|
|
|
|
# Session-Cache aktualisieren
|
|
if use_session_cache and hasattr(session, '__setitem__'):
|
|
session["printer_status_cache"] = self.db_cache
|
|
session["printer_status_timestamp"] = current_time.isoformat()
|
|
|
|
return self.db_cache
|
|
|
|
# Live-Status von Druckern abrufen
|
|
monitor_logger.info("🔄 Aktualisiere Live-Druckerstatus...")
|
|
status_dict = self._fetch_live_printer_status()
|
|
|
|
# Caches aktualisieren
|
|
with self.cache_lock:
|
|
self.db_cache = status_dict
|
|
self.last_db_sync = current_time
|
|
|
|
if use_session_cache and hasattr(session, '__setitem__'):
|
|
session["printer_status_cache"] = status_dict
|
|
session["printer_status_timestamp"] = current_time.isoformat()
|
|
|
|
return status_dict
|
|
|
|
def _fetch_live_printer_status(self) -> Dict[int, Dict]:
|
|
"""
|
|
Holt den aktuellen Status aller Drucker direkt von den Geräten.
|
|
|
|
Returns:
|
|
Dict[int, Dict]: Status-Dict mit umfassenden Informationen
|
|
"""
|
|
status_dict = {}
|
|
|
|
try:
|
|
db_session = get_db_session()
|
|
printers = db_session.query(Printer).filter(Printer.active == True).all()
|
|
|
|
# Parallel-Status-Prüfung mit ThreadPoolExecutor
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
with ThreadPoolExecutor(max_workers=min(len(printers), 8)) as executor:
|
|
future_to_printer = {
|
|
executor.submit(self._check_single_printer_status, printer): printer
|
|
for printer in printers
|
|
}
|
|
|
|
for future in as_completed(future_to_printer, timeout=15):
|
|
printer = future_to_printer[future]
|
|
try:
|
|
status_info = future.result()
|
|
status_dict[printer.id] = status_info
|
|
|
|
# Status in Datenbank aktualisieren
|
|
printer.status = status_info["status"]
|
|
printer.last_checked = datetime.now()
|
|
|
|
except Exception as e:
|
|
monitor_logger.error(f"❌ Fehler bei Status-Check für Drucker {printer.name}: {str(e)}")
|
|
status_dict[printer.id] = {
|
|
"id": printer.id,
|
|
"name": printer.name,
|
|
"status": "offline",
|
|
"active": False,
|
|
"ip_address": printer.ip_address,
|
|
"plug_ip": printer.plug_ip,
|
|
"location": printer.location,
|
|
"last_checked": datetime.now().isoformat(),
|
|
"error": str(e)
|
|
}
|
|
|
|
# Änderungen in Datenbank speichern
|
|
db_session.commit()
|
|
db_session.close()
|
|
|
|
except Exception as e:
|
|
monitor_logger.error(f"❌ Kritischer Fehler beim Abrufen des Live-Status: {str(e)}")
|
|
|
|
return status_dict
|
|
|
|
def _check_single_printer_status(self, printer: Printer, timeout: int = 7) -> Dict:
|
|
"""
|
|
Überprüft den Status eines einzelnen Druckers.
|
|
|
|
Args:
|
|
printer: Printer-Objekt aus der Datenbank
|
|
timeout: Timeout in Sekunden
|
|
|
|
Returns:
|
|
Dict: Umfassende Status-Informationen
|
|
"""
|
|
status_info = {
|
|
"id": printer.id,
|
|
"name": printer.name,
|
|
"status": "offline",
|
|
"active": False,
|
|
"ip_address": printer.ip_address,
|
|
"plug_ip": printer.plug_ip,
|
|
"location": printer.location,
|
|
"last_checked": datetime.now().isoformat(),
|
|
"ping_successful": False,
|
|
"outlet_reachable": False,
|
|
"outlet_state": "unknown"
|
|
}
|
|
|
|
try:
|
|
# 1. Ping-Test für Grundkonnektivität
|
|
if printer.plug_ip:
|
|
ping_success = self._ping_address(printer.plug_ip, timeout=3)
|
|
status_info["ping_successful"] = ping_success
|
|
|
|
if ping_success:
|
|
# 2. Smart Plug Status prüfen
|
|
outlet_reachable, outlet_state = self._check_outlet_status(
|
|
printer.plug_ip,
|
|
printer.plug_username,
|
|
printer.plug_password,
|
|
timeout
|
|
)
|
|
|
|
status_info["outlet_reachable"] = outlet_reachable
|
|
status_info["outlet_state"] = outlet_state
|
|
|
|
if outlet_reachable and outlet_state == "on":
|
|
status_info["status"] = "online"
|
|
status_info["active"] = True
|
|
elif outlet_reachable and outlet_state == "off":
|
|
status_info["status"] = "standby"
|
|
status_info["active"] = False
|
|
else:
|
|
status_info["status"] = "offline"
|
|
status_info["active"] = False
|
|
else:
|
|
status_info["status"] = "unreachable"
|
|
status_info["active"] = False
|
|
else:
|
|
status_info["status"] = "unconfigured"
|
|
status_info["active"] = False
|
|
|
|
except Exception as e:
|
|
monitor_logger.error(f"❌ Fehler bei Status-Check für {printer.name}: {str(e)}")
|
|
status_info["error"] = str(e)
|
|
|
|
return status_info
|
|
|
|
def _ping_address(self, ip_address: str, timeout: int = 3) -> bool:
|
|
"""
|
|
Führt einen Ping-Test zu einer IP-Adresse durch.
|
|
|
|
Args:
|
|
ip_address: Zu testende IP-Adresse
|
|
timeout: Timeout in Sekunden
|
|
|
|
Returns:
|
|
bool: True wenn Ping erfolgreich
|
|
"""
|
|
try:
|
|
# IP-Adresse validieren
|
|
ipaddress.ip_address(ip_address.strip())
|
|
|
|
# Platform-spezifische Ping-Befehle
|
|
import os
|
|
if os.name == 'nt': # Windows
|
|
cmd = ['ping', '-n', '1', '-w', str(timeout * 1000), ip_address.strip()]
|
|
else: # Unix/Linux/macOS
|
|
cmd = ['ping', '-c', '1', '-W', str(timeout), ip_address.strip()]
|
|
|
|
result = subprocess.run(
|
|
cmd,
|
|
capture_output=True,
|
|
text=True,
|
|
encoding='utf-8',
|
|
errors='ignore',
|
|
timeout=timeout + 1
|
|
)
|
|
|
|
return result.returncode == 0
|
|
|
|
except Exception:
|
|
return False
|
|
|
|
def _check_outlet_status(self, ip_address: str, username: str, password: str, timeout: int = 5) -> Tuple[bool, str]:
|
|
"""
|
|
Überprüft den Status einer Smart-Steckdose.
|
|
|
|
Args:
|
|
ip_address: IP-Adresse der Steckdose
|
|
username: Benutzername für die Steckdose
|
|
password: Passwort für die Steckdose
|
|
timeout: Timeout in Sekunden
|
|
|
|
Returns:
|
|
Tuple[bool, str]: (Erreichbar, Status) - Status: "on", "off", "unknown"
|
|
"""
|
|
try:
|
|
auth = (username, password)
|
|
|
|
# Verschiedene API-Endpunkte für Status-Abfrage versuchen
|
|
status_endpoints = [
|
|
f"http://{ip_address}/status",
|
|
f"http://{ip_address}/api/status",
|
|
f"http://{ip_address}/relay/status",
|
|
f"http://{ip_address}/state"
|
|
]
|
|
|
|
for endpoint in status_endpoints:
|
|
try:
|
|
response = requests.get(endpoint, auth=auth, timeout=timeout)
|
|
if response.status_code == 200:
|
|
try:
|
|
data = response.json()
|
|
|
|
# Verschiedene JSON-Strukturen handhaben
|
|
if isinstance(data, dict):
|
|
# TP-Link Format
|
|
if 'system' in data and 'get_sysinfo' in data['system']:
|
|
relay_state = data['system']['get_sysinfo'].get('relay_state')
|
|
return True, "on" if relay_state == 1 else "off"
|
|
|
|
# Direktes Format
|
|
if 'relay_state' in data:
|
|
return True, "on" if data['relay_state'] in [1, True, "on"] else "off"
|
|
|
|
if 'state' in data:
|
|
return True, "on" if data['state'] in [1, True, "on"] else "off"
|
|
|
|
if 'power' in data:
|
|
return True, "on" if data['power'] in [1, True, "on"] else "off"
|
|
|
|
# Wenn JSON-Parsing funktioniert, aber Format unbekannt
|
|
return True, "unknown"
|
|
|
|
except ValueError:
|
|
# Nicht-JSON Antwort - versuche Text-Parsing
|
|
text = response.text.lower()
|
|
if "on" in text or "1" in text or "true" in text:
|
|
return True, "on"
|
|
elif "off" in text or "0" in text or "false" in text:
|
|
return True, "off"
|
|
return True, "unknown"
|
|
|
|
except requests.RequestException:
|
|
continue
|
|
|
|
# Wenn spezifische Endpunkte fehlschlagen, einfache Konnektivitätsprüfung
|
|
try:
|
|
response = requests.get(f"http://{ip_address}", auth=auth, timeout=timeout)
|
|
if response.status_code in [200, 401, 403]: # Erreichbar, aber möglicherweise Authentifizierungsproblem
|
|
return True, "unknown"
|
|
except requests.RequestException:
|
|
pass
|
|
|
|
except Exception as e:
|
|
monitor_logger.debug(f"⚠️ Fehler bei Steckdosen-Status-Check {ip_address}: {str(e)}")
|
|
|
|
return False, "unknown"
|
|
|
|
def clear_all_caches(self):
|
|
"""Löscht alle Caches (Session und DB)."""
|
|
with self.cache_lock:
|
|
self.db_cache = {}
|
|
self.last_db_sync = datetime.now()
|
|
|
|
if hasattr(session, 'pop'):
|
|
session.pop("printer_status_cache", None)
|
|
session.pop("printer_status_timestamp", None)
|
|
|
|
monitor_logger.info("🧹 Alle Drucker-Caches gelöscht")
|
|
|
|
def get_printer_summary(self) -> Dict[str, int]:
|
|
"""
|
|
Gibt eine Zusammenfassung der Druckerstatus zurück.
|
|
|
|
Returns:
|
|
Dict[str, int]: Anzahl Drucker pro Status
|
|
"""
|
|
status_dict = self.get_live_printer_status()
|
|
|
|
summary = {
|
|
"total": len(status_dict),
|
|
"online": 0,
|
|
"offline": 0,
|
|
"standby": 0,
|
|
"unreachable": 0,
|
|
"unconfigured": 0
|
|
}
|
|
|
|
for printer_info in status_dict.values():
|
|
status = printer_info.get("status", "offline")
|
|
if status in summary:
|
|
summary[status] += 1
|
|
else:
|
|
summary["offline"] += 1
|
|
|
|
return summary
|
|
|
|
# Globale Instanz
|
|
printer_monitor = PrinterMonitor() |