# Projektarbeit-MYP/backend/app/utils/printer_monitor.py
#
# 503 lines
# 20 KiB
# Python
# Raw Blame History
#
# This file contains invisible Unicode characters
#
# This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Live-Drucker-Monitor für MYP Platform
Überwacht Druckerstatus in Echtzeit mit Session-Caching und automatischer Steckdosen-Initialisierung.
"""
import time
import threading
import requests
import subprocess
import ipaddress
from datetime import datetime, timedelta
from typing import Dict, Tuple, List, Optional
from flask import session
from sqlalchemy import func
from sqlalchemy.orm import Session
from models import get_db_session, Printer
from utils.logging_config import get_logger
from config.settings import PRINTERS
# Initialise the module-level logger used by everything in this file
monitor_logger = get_logger("printer_monitor")
class PrinterMonitor:
    """
    Live printer monitor with session caching and automatic outlet initialisation.

    Printer state is derived from the smart plug each printer is attached to:
    the plug is pinged, then queried over HTTP. Results are cached on the
    instance (``db_cache``) and, when a Flask request is active, in the
    Flask ``session``.
    """

    # Whole-word tokens recognised when a plug answers with plain text.
    _ON_TOKENS = frozenset({"on", "1", "true"})
    _OFF_TOKENS = frozenset({"off", "0", "false"})

    # Keys under which the status snapshot is stored in the Flask session.
    _SESSION_DATA_KEY = "printer_status_cache"
    _SESSION_TIMESTAMP_KEY = "printer_status_timestamp"

    def __init__(self):
        """Set up empty caches and the cache TTL configuration."""
        # NOTE(review): session_cache is never read by the methods in this
        # class; kept because other modules may access it.
        self.session_cache = {}
        self.db_cache = {}  # last status snapshot shared by all callers
        self.cache_lock = threading.Lock()  # guards db_cache / last_db_sync
        self.last_db_sync = datetime.now()
        self.monitoring_active = False
        self.monitor_thread = None
        self.startup_initialized = False
        # Cache configuration (seconds)
        self.session_cache_ttl = 30
        self.db_cache_ttl = 300
        monitor_logger.info("🖨️ Drucker-Monitor initialisiert")

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _session_available() -> bool:
        """
        Report whether the Flask ``session`` proxy can be used right now.

        ``hasattr(session, ...)`` is not a safe probe: outside a request
        context the proxy raises ``RuntimeError``, which ``hasattr`` does
        not swallow. ``has_request_context`` is the supported check.
        """
        from flask import has_request_context

        return has_request_context()

    @staticmethod
    def _close_session(db_session) -> None:
        """Close *db_session* (if any) without letting a close error escape."""
        if db_session is None:
            return
        try:
            db_session.close()
        except Exception as e:
            monitor_logger.warning(f"DB-Session konnte nicht geschlossen werden: {e}")

    @staticmethod
    def _normalize_cached_status(cached) -> Optional[Dict[int, Dict]]:
        """
        Convert a status dict read back from the session to int keys.

        A JSON round-trip turns integer dict keys into strings, so a
        snapshot loaded from the session no longer matches the
        ``Dict[int, Dict]`` contract of ``get_live_printer_status``.

        Returns:
            The dict with int keys, or None if *cached* is not a dict or a
            key cannot be converted (the cache is then treated as a miss).
        """
        if not isinstance(cached, dict):
            return None
        try:
            return {int(key): value for key, value in cached.items()}
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _relay_value_to_state(value) -> str:
        """
        Map a relay value from a JSON reply to "on" or "off".

        ``1``/``True`` and the strings "on", "1", "true" (case-insensitive,
        surrounding whitespace ignored) mean "on"; everything else is "off".
        """
        if isinstance(value, str):
            return "on" if value.strip().lower() in ("on", "1", "true") else "off"
        return "on" if value in (1, True) else "off"

    @classmethod
    def _state_from_json(cls, data) -> str:
        """
        Extract the relay state from a decoded JSON status reply.

        Supports the nested ``system.get_sysinfo.relay_state`` layout and
        the flat keys ``relay_state``, ``state`` and ``power``.

        Returns:
            "on", "off", or "unknown" when no known key is present.
        """
        if not isinstance(data, dict):
            return "unknown"
        system = data.get("system")
        if isinstance(system, dict) and isinstance(system.get("get_sysinfo"), dict):
            return cls._relay_value_to_state(system["get_sysinfo"].get("relay_state"))
        for key in ("relay_state", "state", "power"):
            if key in data:
                return cls._relay_value_to_state(data[key])
        return "unknown"

    @classmethod
    def _parse_outlet_text(cls, text: str) -> str:
        """
        Guess the relay state from a non-JSON reply body.

        Only whole alphanumeric tokens are matched, so words such as
        "connection" or numbers such as "1200" no longer count as "on".

        Returns:
            "on" or "off" when exactly one kind of token is present,
            otherwise "unknown" (nothing found, or contradictory tokens).
        """
        import re

        tokens = set(re.findall(r"[a-z0-9]+", (text or "").lower()))
        has_on = bool(tokens & cls._ON_TOKENS)
        has_off = bool(tokens & cls._OFF_TOKENS)
        if has_on and not has_off:
            return "on"
        if has_off and not has_on:
            return "off"
        return "unknown"

    @staticmethod
    def _error_status(printer, error: str) -> Dict:
        """Build the fallback status entry used when a printer check failed."""
        return {
            "id": printer.id,
            "name": printer.name,
            "status": "offline",
            "active": False,
            "ip_address": printer.ip_address,
            "plug_ip": printer.plug_ip,
            "location": printer.location,
            "last_checked": datetime.now().isoformat(),
            "error": error,
        }

    def _read_session_cache(self, now: datetime) -> Optional[Dict[int, Dict]]:
        """
        Return the session-cached snapshot if present and younger than the TTL.

        Must only be called while a request context is active. A missing,
        malformed or expired entry yields None.
        """
        cached_data = session.get(self._SESSION_DATA_KEY)
        cached_timestamp = session.get(self._SESSION_TIMESTAMP_KEY)
        if not cached_data or not cached_timestamp:
            return None
        try:
            cache_age = (now - datetime.fromisoformat(cached_timestamp)).total_seconds()
        except (TypeError, ValueError):
            # Corrupt timestamp: ignore the cache instead of failing the request.
            return None
        if cache_age >= self.session_cache_ttl:
            return None
        return self._normalize_cached_status(cached_data)

    def _write_session_cache(self, status: Dict[int, Dict], now: datetime) -> None:
        """Store *status* and its timestamp in the Flask session."""
        session[self._SESSION_DATA_KEY] = status
        session[self._SESSION_TIMESTAMP_KEY] = now.isoformat()

    # ------------------------------------------------------------------
    # Startup initialisation
    # ------------------------------------------------------------------
    def initialize_all_outlets_on_startup(self) -> Dict[str, bool]:
        """
        Switch off every stored outlet once at program start (uniform start state).

        Runs at most once per instance; later calls return an empty dict.
        The DB session is always closed, also when an error occurs.

        Returns:
            Dict[str, bool]: Result per printer name; empty if already
            initialised, if no active printers exist, or on a critical error.
        """
        if self.startup_initialized:
            monitor_logger.info("🔄 Steckdosen bereits beim Start initialisiert")
            return {}
        monitor_logger.info("🚀 Starte Steckdosen-Initialisierung beim Programmstart...")
        results: Dict[str, bool] = {}
        db_session = None
        try:
            db_session = get_db_session()
            printers = db_session.query(Printer).filter(Printer.active == True).all()  # noqa: E712
            if not printers:
                monitor_logger.warning("⚠️ Keine aktiven Drucker zur Initialisierung gefunden")
                self.startup_initialized = True
                return results
            # Switch every outlet off so all printers start from the same state.
            for printer in printers:
                try:
                    if printer.plug_ip and printer.plug_username and printer.plug_password:
                        success = self._turn_outlet_off(
                            printer.plug_ip,
                            printer.plug_username,
                            printer.plug_password,
                        )
                        results[printer.name] = success
                        if success:
                            monitor_logger.info(f"{printer.name}: Steckdose ausgeschaltet")
                            # Mirror the new state in the database row.
                            printer.status = "offline"
                            printer.last_checked = datetime.now()
                        else:
                            monitor_logger.warning(f"{printer.name}: Steckdose konnte nicht ausgeschaltet werden")
                    else:
                        monitor_logger.warning(f"⚠️ {printer.name}: Unvollständige Steckdosen-Konfiguration")
                        results[printer.name] = False
                except Exception as e:
                    monitor_logger.error(f"❌ Fehler bei Initialisierung von {printer.name}: {e}")
                    results[printer.name] = False
            db_session.commit()
            success_count = sum(1 for ok in results.values() if ok)
            total_count = len(results)
            monitor_logger.info(f"🎯 Steckdosen-Initialisierung abgeschlossen: {success_count}/{total_count} erfolgreich")
            self.startup_initialized = True
        except Exception as e:
            monitor_logger.error(f"❌ Kritischer Fehler bei Steckdosen-Initialisierung: {e}")
            results = {}
        finally:
            self._close_session(db_session)
        return results

    def _turn_outlet_off(self, ip_address: str, username: str, password: str, timeout: int = 5) -> bool:
        """
        Switch a smart plug off.

        Tries several HTTP POST endpoints with basic auth, then a generic
        JSON request to ``/api``. Request errors are swallowed on purpose:
        each endpoint is only an attempt.

        Args:
            ip_address: IP address of the plug
            username: User name for the plug
            password: Password for the plug
            timeout: Per-request timeout in seconds

        Returns:
            bool: True if any request was answered with a success status.
        """
        try:
            auth = (username, password)
            endpoints = [
                f"http://{ip_address}/relay/off",
                f"http://{ip_address}/api/relay/off",
                f"http://{ip_address}/set?relay=off",
                f"http://{ip_address}/control?action=off",
            ]
            for endpoint in endpoints:
                try:
                    response = requests.post(endpoint, auth=auth, timeout=timeout)
                except requests.RequestException:
                    continue
                if response.status_code in [200, 201, 204]:
                    monitor_logger.debug(f"✅ Steckdose {ip_address} ausgeschaltet via {endpoint}")
                    return True
            # Fall back to a generic JSON API if no specific endpoint worked.
            try:
                payload = {"system": {"set_relay_state": {"state": 0}}}
                response = requests.post(
                    f"http://{ip_address}/api",
                    json=payload,
                    auth=auth,
                    timeout=timeout,
                )
                if response.status_code in [200, 201]:
                    monitor_logger.debug(f"✅ Steckdose {ip_address} ausgeschaltet via JSON-API")
                    return True
            except requests.RequestException:
                pass
        except Exception as e:
            monitor_logger.debug(f"⚠️ Fehler beim Ausschalten der Steckdose {ip_address}: {e}")
        return False

    # ------------------------------------------------------------------
    # Status retrieval
    # ------------------------------------------------------------------
    def get_live_printer_status(self, use_session_cache: bool = True) -> Dict[int, Dict]:
        """
        Return the live printer status, using the session and instance caches.

        Lookup order: Flask session cache (only inside a request context),
        then the instance cache, then a fresh check of all printers. The
        method is safe to call outside a request (e.g. from a background
        thread); the session cache is simply skipped there.

        Args:
            use_session_cache: Whether the Flask session cache may be used

        Returns:
            Dict[int, Dict]: Status dict keyed by printer ID
        """
        current_time = datetime.now()
        session_usable = use_session_cache and self._session_available()

        if session_usable:
            cached = self._read_session_cache(current_time)
            if cached is not None:
                monitor_logger.debug("📋 Verwende Session-Cache für Druckerstatus")
                return cached

        with self.cache_lock:
            db_cache_fresh = bool(self.db_cache) and (
                (current_time - self.last_db_sync).total_seconds() < self.db_cache_ttl
            )
            cached = self.db_cache if db_cache_fresh else None
        if cached is not None:
            monitor_logger.debug("🗃️ Verwende DB-Cache für Druckerstatus")
            if session_usable:
                self._write_session_cache(cached, current_time)
            return cached

        monitor_logger.info("🔄 Aktualisiere Live-Druckerstatus...")
        status_dict = self._fetch_live_printer_status()
        with self.cache_lock:
            self.db_cache = status_dict
            self.last_db_sync = current_time
        if session_usable:
            self._write_session_cache(status_dict, current_time)
        return status_dict

    def _fetch_live_printer_status(self) -> Dict[int, Dict]:
        """
        Check every active printer directly and persist the result.

        Printers are checked in parallel (at most 8 workers). Printers whose
        check raised, or did not finish within 15 seconds, get a fallback
        "offline" entry with an ``error`` field. The DB session is always
        closed.

        Returns:
            Dict[int, Dict]: Status dict keyed by printer ID; empty if no
            active printers exist or a critical error occurred early.
        """
        from concurrent.futures import ThreadPoolExecutor, as_completed
        from concurrent.futures import TimeoutError as FuturesTimeoutError

        status_dict: Dict[int, Dict] = {}
        db_session = None
        try:
            db_session = get_db_session()
            printers = db_session.query(Printer).filter(Printer.active == True).all()  # noqa: E712
            if not printers:
                monitor_logger.info(" Keine aktiven Drucker gefunden")
                return status_dict
            monitor_logger.info(f"🔍 Prüfe Status von {len(printers)} aktiven Druckern...")
            # printers is non-empty here, so max_workers is at least 1.
            with ThreadPoolExecutor(max_workers=min(len(printers), 8)) as executor:
                future_to_printer = {
                    executor.submit(self._check_single_printer_status, printer): printer
                    for printer in printers
                }
                try:
                    for future in as_completed(future_to_printer, timeout=15):
                        printer = future_to_printer[future]
                        try:
                            status_info = future.result()
                            status_dict[printer.id] = status_info
                            # Mirror the result in the database row.
                            printer.status = status_info["status"]
                            printer.last_checked = datetime.now()
                        except Exception as e:
                            monitor_logger.error(f"❌ Fehler bei Status-Check für Drucker {printer.name}: {e}")
                            status_dict[printer.id] = self._error_status(printer, str(e))
                except FuturesTimeoutError:
                    # as_completed gave up: report the stragglers instead of
                    # dropping them and aborting the whole update. Leaving the
                    # executor block below still waits for running workers.
                    for future, printer in future_to_printer.items():
                        if printer.id in status_dict:
                            continue
                        future.cancel()
                        monitor_logger.warning(f"Zeitüberschreitung bei Status-Check für Drucker {printer.name}")
                        status_dict[printer.id] = self._error_status(
                            printer, "Zeitüberschreitung beim Status-Check"
                        )
            db_session.commit()
            monitor_logger.info(f"✅ Status-Update abgeschlossen für {len(status_dict)} Drucker")
        except Exception as e:
            monitor_logger.error(f"❌ Kritischer Fehler beim Abrufen des Live-Status: {e}")
        finally:
            self._close_session(db_session)
        return status_dict

    def _check_single_printer_status(self, printer: "Printer", timeout: int = 7) -> Dict:
        """
        Check the status of a single printer via its smart plug.

        Status values: "unconfigured" (no plug IP), "unreachable" (ping
        failed), "online" (plug on), "standby" (plug off), "offline"
        (plug state unknown or an error occurred).

        Args:
            printer: Printer object from the database
            timeout: Timeout in seconds for the plug's HTTP requests

        Returns:
            Dict: Status information for the printer
        """
        status_info = {
            "id": printer.id,
            "name": printer.name,
            "status": "offline",
            "active": False,
            "ip_address": printer.ip_address,
            "plug_ip": printer.plug_ip,
            "location": printer.location,
            "last_checked": datetime.now().isoformat(),
            "ping_successful": False,
            "outlet_reachable": False,
            "outlet_state": "unknown",
        }
        try:
            if not printer.plug_ip:
                status_info["status"] = "unconfigured"
                return status_info

            # 1. Ping test for basic connectivity
            ping_success = self._ping_address(printer.plug_ip, timeout=3)
            status_info["ping_successful"] = ping_success
            if not ping_success:
                status_info["status"] = "unreachable"
                return status_info

            # 2. Query the smart plug
            outlet_reachable, outlet_state = self._check_outlet_status(
                printer.plug_ip,
                printer.plug_username,
                printer.plug_password,
                timeout,
            )
            status_info["outlet_reachable"] = outlet_reachable
            status_info["outlet_state"] = outlet_state
            if outlet_reachable and outlet_state == "on":
                status_info["status"] = "online"
                status_info["active"] = True
            elif outlet_reachable and outlet_state == "off":
                status_info["status"] = "standby"
            # Any other combination keeps the "offline" default.
        except Exception as e:
            monitor_logger.error(f"❌ Fehler bei Status-Check für {printer.name}: {e}")
            status_info["error"] = str(e)
        return status_info

    def _ping_address(self, ip_address: str, timeout: int = 3) -> bool:
        """
        Ping an IP address once.

        The address is validated first, so only a literal IP address is
        ever passed to the ``ping`` command.

        Args:
            ip_address: IP address to test
            timeout: Timeout in seconds

        Returns:
            bool: True if the ping succeeded; False on any failure
            (invalid address, timeout, missing ``ping`` binary, ...).
        """
        import os
        import sys

        try:
            target = ip_address.strip()
            ipaddress.ip_address(target)
            if os.name == "nt":  # Windows: -w takes milliseconds
                cmd = ["ping", "-n", "1", "-w", str(timeout * 1000), target]
            elif sys.platform.startswith(("darwin", "freebsd")):
                # BSD-style ping: -W takes milliseconds, not seconds
                cmd = ["ping", "-c", "1", "-W", str(timeout * 1000), target]
            else:  # Linux: -W takes seconds
                cmd = ["ping", "-c", "1", "-W", str(timeout), target]
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="ignore",
                timeout=timeout + 1,
            )
            return result.returncode == 0
        except (AttributeError, TypeError, ValueError, OSError, subprocess.SubprocessError):
            return False

    def _check_outlet_status(self, ip_address: str, username: str, password: str, timeout: int = 5) -> Tuple[bool, str]:
        """
        Query the status of a smart plug.

        Tries several status endpoints; the first one answering with HTTP
        200 decides the result. If none does, a plain GET on the root URL
        is used as a reachability check.

        Args:
            ip_address: IP address of the plug
            username: User name for the plug
            password: Password for the plug
            timeout: Per-request timeout in seconds

        Returns:
            Tuple[bool, str]: (reachable, state) - state is "on", "off" or "unknown"
        """
        try:
            auth = (username, password)
            status_endpoints = [
                f"http://{ip_address}/status",
                f"http://{ip_address}/api/status",
                f"http://{ip_address}/relay/status",
                f"http://{ip_address}/state",
            ]
            for endpoint in status_endpoints:
                try:
                    response = requests.get(endpoint, auth=auth, timeout=timeout)
                except requests.RequestException:
                    continue
                if response.status_code != 200:
                    continue
                try:
                    data = response.json()
                except ValueError:
                    # Non-JSON reply: fall back to text parsing.
                    return True, self._parse_outlet_text(response.text)
                return True, self._state_from_json(data)
            # No status endpoint answered: plain connectivity check.
            try:
                response = requests.get(f"http://{ip_address}", auth=auth, timeout=timeout)
                # 401/403 still prove the device is there (auth problem only).
                if response.status_code in [200, 401, 403]:
                    return True, "unknown"
            except requests.RequestException:
                pass
        except Exception as e:
            monitor_logger.debug(f"⚠️ Fehler bei Steckdosen-Status-Check {ip_address}: {e}")
        return False, "unknown"

    # ------------------------------------------------------------------
    # Cache management and summary
    # ------------------------------------------------------------------
    def clear_all_caches(self):
        """Clear the instance cache and, inside a request, the session cache."""
        with self.cache_lock:
            self.db_cache = {}
            self.last_db_sync = datetime.now()
        if self._session_available():
            session.pop(self._SESSION_DATA_KEY, None)
            session.pop(self._SESSION_TIMESTAMP_KEY, None)
        monitor_logger.info("🧹 Alle Drucker-Caches gelöscht")

    def get_printer_summary(self) -> Dict[str, int]:
        """
        Return the number of printers per status.

        Statuses not listed in the summary are counted as "offline".

        Returns:
            Dict[str, int]: Counts for "total", "online", "offline",
            "standby", "unreachable" and "unconfigured"
        """
        status_dict = self.get_live_printer_status()
        summary = {
            "total": len(status_dict),
            "online": 0,
            "offline": 0,
            "standby": 0,
            "unreachable": 0,
            "unconfigured": 0,
        }
        for printer_info in status_dict.values():
            status = printer_info.get("status", "offline")
            # "total" is a summary key but never a valid status bucket.
            bucket = status if status in summary and status != "total" else "offline"
            summary[bucket] += 1
        return summary
# Global instance, created when this module is imported (runs PrinterMonitor.__init__)
printer_monitor = PrinterMonitor()