Files
Projektarbeit-MYP/backend/utils/tapo_status_manager.py

817 lines
31 KiB
Python

"""
Tapo Status Manager - Verwaltung der 3 Steckdosen-Status
Dieser Manager stellt sicher, dass:
1. Alle 6 Drucker/Steckdosen immer angezeigt werden
2. Die 3 Status korrekt verwaltet werden: an, aus, nicht erreichbar
3. Der Status persistent gespeichert wird
4. Die automatische Steuerung basierend auf Jobs funktioniert
"""
from typing import Dict, Tuple, Optional, List
from datetime import datetime, timedelta
import asyncio
from concurrent.futures import ThreadPoolExecutor
import threading
from models import Printer, Job, PlugStatusLog, get_db_session
from utils.logging_config import get_logger
logger = get_logger("tapo_status_manager")
class TapoStatusManager:
"""
Zentraler Manager für Tapo-Steckdosen-Status
"""
# Die 3 möglichen Status-Zustände
STATUS_ON = "on"
STATUS_OFF = "off"
STATUS_UNREACHABLE = "unreachable"
# Status-Mapping für UI
STATUS_DISPLAY = {
STATUS_ON: {"text": "An", "color": "green", "icon": "power"},
STATUS_OFF: {"text": "Aus", "color": "gray", "icon": "power-off"},
STATUS_UNREACHABLE: {"text": "Nicht erreichbar", "color": "red", "icon": "exclamation-triangle"}
}
def __init__(self):
"""Initialisiert den Status-Manager"""
self._status_cache = {}
self._cache_lock = threading.RLock()
self._last_check = {}
self.check_interval = 30 # Sekunden zwischen Status-Checks
# Session-spezifischer Status-Cache für Benutzer-Sessions
self._session_cache = {}
self._session_cache_lock = threading.RLock()
self._session_cache_ttl = 300 # 5 Minuten für Session-Cache
# Thread-Pool für asynchrone Operationen
self._executor = ThreadPoolExecutor(max_workers=6)
logger.info("TapoStatusManager mit Session-Caching initialisiert")
def get_printer_status(self, printer_id: int, force_refresh: bool = False) -> Dict[str, any]:
"""
Gibt den aktuellen Status eines Druckers zurück
Args:
printer_id: ID des Druckers
force_refresh: True = Cache umgehen und echten Netzwerk-Test durchführen
Returns:
Dict mit Status-Informationen
"""
if not force_refresh:
with self._cache_lock:
# Aus Cache holen wenn vorhanden und aktuell
if printer_id in self._status_cache:
cache_data = self._status_cache[printer_id]
if self._is_cache_valid(printer_id):
return cache_data
# Neuen Status abrufen (mit Cache-Invalidierung bei force_refresh)
if force_refresh:
self.invalidate_cache(printer_id)
return self._fetch_printer_status(printer_id)
def get_all_printer_status(self, force_refresh: bool = False) -> List[Dict[str, any]]:
"""
Gibt den Status aller Drucker zurück
Args:
force_refresh: True = Cache für alle Drucker umgehen
Returns:
Liste mit Status-Informationen aller Drucker
"""
try:
db_session = get_db_session()
printers = db_session.query(Printer).all()
status_list = []
# Status für jeden Drucker abrufen
for printer in printers:
status = self.get_printer_status(printer.id, force_refresh=force_refresh)
status_list.append(status)
db_session.close()
return status_list
except Exception as e:
logger.error(f"Fehler beim Abrufen aller Drucker-Status: {str(e)}")
return []
def _fetch_printer_status(self, printer_id: int) -> Dict[str, any]:
"""
Holt den aktuellen Status eines Druckers
Args:
printer_id: ID des Druckers
Returns:
Dict mit Status-Informationen
"""
try:
db_session = get_db_session()
printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
if not printer:
logger.warning(f"Drucker {printer_id} nicht gefunden")
return self._create_error_status(printer_id, "Drucker nicht gefunden")
# Basis-Status erstellen
status_info = {
"id": printer.id,
"name": printer.name,
"model": printer.model,
"location": printer.location,
"ip_address": printer.ip_address,
"has_plug": bool(printer.plug_ip),
"plug_ip": printer.plug_ip,
"active": printer.active,
"last_checked": datetime.now()
}
# Wenn keine Steckdose konfiguriert
if not printer.plug_ip:
status_info.update({
"plug_status": "no_plug",
"plug_reachable": False,
"power_status": None,
"can_control": False
})
else:
# Tapo-Status abrufen
plug_status = self._check_tapo_status(printer)
status_info.update(plug_status)
# Aktuelle Jobs prüfen
active_job = self._get_active_job(printer_id, db_session)
if active_job:
status_info["current_job"] = {
"id": active_job.id,
"name": active_job.name,
"user": active_job.user.name,
"start_at": active_job.start_at.isoformat(),
"end_at": active_job.end_at.isoformat(),
"status": active_job.status
}
else:
status_info["current_job"] = None
# Nächster geplanter Job
next_job = self._get_next_job(printer_id, db_session)
if next_job:
status_info["next_job"] = {
"id": next_job.id,
"name": next_job.name,
"user": next_job.user.name,
"start_at": next_job.start_at.isoformat(),
"starts_in_minutes": int((next_job.start_at - datetime.now()).total_seconds() / 60)
}
else:
status_info["next_job"] = None
# Status in Cache speichern
with self._cache_lock:
self._status_cache[printer_id] = status_info
self._last_check[printer_id] = datetime.now()
# Status in Datenbank loggen
self._log_status(printer, status_info.get("plug_status", "unknown"))
db_session.close()
return status_info
except Exception as e:
logger.error(f"Fehler beim Abrufen des Status für Drucker {printer_id}: {str(e)}")
return self._create_error_status(printer_id, str(e))
def _check_tapo_status(self, printer: Printer) -> Dict[str, any]:
"""
Prüft den Tapo-Steckdosen-Status mit erweiterten Fallback-Mechanismen
Args:
printer: Printer-Objekt
Returns:
Dict mit Tapo-Status
"""
try:
# Tapo-Controller importieren
from utils.hardware_integration import tapo_controller
if not tapo_controller:
logger.warning(f"Tapo-Controller nicht verfügbar für {printer.name}")
return {
"plug_status": self.STATUS_UNREACHABLE,
"plug_reachable": False,
"power_status": None,
"can_control": False,
"error": "Tapo-Controller nicht verfügbar"
}
# Status abrufen mit Debug-Informationen
logger.debug(f"Prüfe Tapo-Status für {printer.name} ({printer.plug_ip})")
reachable, plug_status = tapo_controller.check_outlet_status(
printer.plug_ip,
printer_id=printer.id,
debug=False # Weniger Debug-Output für bessere Performance
)
if reachable:
# Erfolgreiche Verbindung
logger.debug(f"✅ Tapo-Steckdose {printer.plug_ip} erreichbar - Status: {plug_status}")
# Status normalisieren
if plug_status in ["on", "true", "1", True]:
normalized_status = self.STATUS_ON
power_status = "on"
elif plug_status in ["off", "false", "0", False]:
normalized_status = self.STATUS_OFF
power_status = "off"
else:
# Unbekannter Status, aber erreichbar
normalized_status = self.STATUS_UNREACHABLE
power_status = "unknown"
logger.warning(f"Unbekannter Tapo-Status '{plug_status}' für {printer.name}")
return {
"plug_status": normalized_status,
"plug_reachable": True,
"power_status": power_status,
"can_control": True,
"last_check": datetime.now().isoformat()
}
else:
# Steckdose nicht erreichbar
logger.warning(f"⚠️ Tapo-Steckdose {printer.plug_ip} nicht erreichbar für {printer.name}")
return {
"plug_status": self.STATUS_UNREACHABLE,
"plug_reachable": False,
"power_status": None,
"can_control": False,
"error": "Steckdose nicht erreichbar",
"last_check": datetime.now().isoformat()
}
except ImportError as e:
logger.error(f"Import-Fehler beim Tapo-Controller für {printer.name}: {str(e)}")
return {
"plug_status": self.STATUS_UNREACHABLE,
"plug_reachable": False,
"power_status": None,
"can_control": False,
"error": f"Import-Fehler: {str(e)}",
"fallback_used": True
}
except Exception as e:
logger.error(f"Unerwarteter Fehler beim Prüfen des Tapo-Status für {printer.name}: {str(e)}")
return {
"plug_status": self.STATUS_UNREACHABLE,
"plug_reachable": False,
"power_status": None,
"can_control": False,
"error": str(e),
"last_check": datetime.now().isoformat()
}
def control_plug(self, printer_id: int, action: str) -> Tuple[bool, str]:
"""
Steuert eine Tapo-Steckdose
Args:
printer_id: ID des Druckers
action: "on" oder "off"
Returns:
Tuple (Erfolg, Nachricht)
"""
try:
db_session = get_db_session()
printer = db_session.query(Printer).filter(Printer.id == printer_id).first()
if not printer:
return False, "Drucker nicht gefunden"
if not printer.plug_ip:
return False, "Keine Steckdose konfiguriert"
# Tapo-Controller verwenden
from utils.hardware_integration import tapo_controller
if not tapo_controller:
return False, "Tapo-Controller nicht verfügbar"
# Aktion ausführen
success = False
if action == "on":
success = tapo_controller.turn_on_outlet(printer.plug_ip, printer_id)
elif action == "off":
success = tapo_controller.turn_off_outlet(printer.plug_ip, printer_id)
else:
return False, f"Ungültige Aktion: {action}"
if success:
# Cache invalidieren
with self._cache_lock:
if printer_id in self._status_cache:
del self._status_cache[printer_id]
# Status loggen
self._log_status(printer, action, source="manual")
db_session.close()
return True, f"Steckdose erfolgreich {action}"
else:
db_session.close()
return False, "Steckdose konnte nicht gesteuert werden"
except Exception as e:
logger.error(f"Fehler beim Steuern der Steckdose für Drucker {printer_id}: {str(e)}")
return False, str(e)
def check_and_control_for_jobs(self):
"""
Prüft alle Jobs und steuert Steckdosen entsprechend
Diese Methode sollte regelmäßig vom Scheduler aufgerufen werden
"""
try:
db_session = get_db_session()
now = datetime.now()
# Jobs die starten sollten
jobs_to_start = db_session.query(Job).filter(
Job.status == "scheduled",
Job.start_at <= now
).all()
for job in jobs_to_start:
logger.info(f"Starte Job {job.id} für Drucker {job.printer_id}")
success, msg = self.control_plug(job.printer_id, "on")
if success:
job.status = "running"
logger.info(f"Steckdose für Job {job.id} eingeschaltet")
else:
logger.error(f"Fehler beim Einschalten für Job {job.id}: {msg}")
# Jobs die enden sollten
jobs_to_end = db_session.query(Job).filter(
Job.status == "running",
Job.end_at <= now
).all()
for job in jobs_to_end:
logger.info(f"Beende Job {job.id} für Drucker {job.printer_id}")
success, msg = self.control_plug(job.printer_id, "off")
if success:
job.status = "finished"
job.actual_end_time = now
logger.info(f"Steckdose für Job {job.id} ausgeschaltet")
else:
logger.error(f"Fehler beim Ausschalten für Job {job.id}: {msg}")
db_session.commit()
db_session.close()
except Exception as e:
logger.error(f"Fehler bei der automatischen Job-Steuerung: {str(e)}")
def _get_active_job(self, printer_id: int, db_session) -> Optional[Job]:
"""Gibt den aktuell aktiven Job für einen Drucker zurück"""
return db_session.query(Job).filter(
Job.printer_id == printer_id,
Job.status == "running"
).first()
def _get_next_job(self, printer_id: int, db_session) -> Optional[Job]:
"""Gibt den nächsten geplanten Job für einen Drucker zurück"""
return db_session.query(Job).filter(
Job.printer_id == printer_id,
Job.status == "scheduled",
Job.start_at > datetime.now()
).order_by(Job.start_at).first()
def _is_cache_valid(self, printer_id: int) -> bool:
"""Prüft ob der Cache noch gültig ist"""
if printer_id not in self._last_check:
return False
age = (datetime.now() - self._last_check[printer_id]).total_seconds()
return age < self.check_interval
def _create_error_status(self, printer_id: int, error: str) -> Dict[str, any]:
"""Erstellt einen Fehler-Status"""
return {
"id": printer_id,
"name": f"Drucker {printer_id}",
"plug_status": self.STATUS_UNREACHABLE,
"plug_reachable": False,
"error": error,
"last_checked": datetime.now()
}
def _log_status(self, printer: Printer, status: str, source: str = "system"):
"""Loggt einen Status in die Datenbank"""
try:
PlugStatusLog.log_status_change(
printer_id=printer.id,
status=status,
source=source,
ip_address=printer.plug_ip
)
except Exception as e:
logger.error(f"Fehler beim Loggen des Status: {str(e)}")
def get_status_for_calendar(self, start_date: datetime, end_date: datetime) -> List[Dict]:
"""
Gibt Status-Informationen für die Kalender-Ansicht zurück
Args:
start_date: Start-Datum
end_date: End-Datum
Returns:
Liste mit Status-Events für den Kalender
"""
try:
db_session = get_db_session()
# Jobs im Zeitraum abrufen
jobs = db_session.query(Job).filter(
Job.start_at <= end_date,
Job.end_at >= start_date
).all()
events = []
for job in jobs:
# Drucker-Status für Job
printer = job.printer
status = self.get_printer_status(printer.id)
event = {
"id": f"job_{job.id}",
"title": f"{printer.name}: {job.name}",
"start": job.start_at.isoformat(),
"end": job.end_at.isoformat(),
"backgroundColor": self._get_status_color(job.status),
"extendedProps": {
"job_id": job.id,
"printer_id": printer.id,
"printer_name": printer.name,
"printer_status": status.get("plug_status", "unknown"),
"job_status": job.status,
"user": job.user.name,
"plug_reachable": status.get("plug_reachable", False)
}
}
events.append(event)
db_session.close()
return events
except Exception as e:
logger.error(f"Fehler beim Abrufen der Kalender-Status: {str(e)}")
return []
def get_session_status(self, session_id: str, printer_ids: List[int] = None) -> Dict[str, any]:
"""
Holt den gecachten Status für eine Session
Args:
session_id: Session-ID
printer_ids: Optional - spezifische Drucker-IDs
Returns:
Dict mit Session-spezifischen Status-Daten
"""
try:
with self._session_cache_lock:
session_data = self._session_cache.get(session_id, {})
# Prüfe Cache-Gültigkeit
cache_time = session_data.get('timestamp', datetime.min)
if (datetime.now() - cache_time).total_seconds() > self._session_cache_ttl:
# Cache abgelaufen
self._session_cache.pop(session_id, None)
return self._create_fresh_session_status(session_id, printer_ids)
# Wenn spezifische Drucker angefragt, filtere diese
if printer_ids:
filtered_status = {}
for printer_id in printer_ids:
if str(printer_id) in session_data.get('printers', {}):
filtered_status[str(printer_id)] = session_data['printers'][str(printer_id)]
return {
'timestamp': session_data['timestamp'],
'session_id': session_id,
'printers': filtered_status,
'from_cache': True
}
return session_data
except Exception as e:
logger.error(f"Fehler beim Abrufen des Session-Status: {str(e)}")
return self._create_fresh_session_status(session_id, printer_ids)
def update_session_status(self, session_id: str, printer_id: int = None) -> bool:
"""
Aktualisiert den Session-Status-Cache
Args:
session_id: Session-ID
printer_id: Optional - spezifischer Drucker
Returns:
bool: True wenn erfolgreich
"""
try:
with self._session_cache_lock:
if printer_id:
# Einzelnen Drucker aktualisieren
printer_status = self.get_printer_status(printer_id)
if session_id not in self._session_cache:
self._session_cache[session_id] = {
'timestamp': datetime.now(),
'session_id': session_id,
'printers': {}
}
self._session_cache[session_id]['printers'][str(printer_id)] = printer_status
self._session_cache[session_id]['timestamp'] = datetime.now()
else:
# Alle Drucker aktualisieren
self._session_cache[session_id] = self._create_fresh_session_status(session_id)
logger.debug(f"Session-Status für {session_id} aktualisiert")
return True
except Exception as e:
logger.error(f"Fehler beim Aktualisieren des Session-Status: {str(e)}")
return False
def clear_session_cache(self, session_id: str = None) -> bool:
"""
Löscht Session-Cache
Args:
session_id: Optional - spezifische Session, sonst alle
Returns:
bool: True wenn erfolgreich
"""
try:
with self._session_cache_lock:
if session_id:
self._session_cache.pop(session_id, None)
logger.debug(f"Session-Cache für {session_id} gelöscht")
else:
self._session_cache.clear()
logger.debug("Kompletter Session-Cache gelöscht")
return True
except Exception as e:
logger.error(f"Fehler beim Löschen des Session-Cache: {str(e)}")
return False
def _create_fresh_session_status(self, session_id: str, printer_ids: List[int] = None) -> Dict[str, any]:
"""
Erstellt frischen Session-Status
Args:
session_id: Session-ID
printer_ids: Optional - spezifische Drucker-IDs
Returns:
Dict mit frischen Status-Daten
"""
try:
db_session = get_db_session()
# Alle oder spezifische Drucker laden
if printer_ids:
printers = db_session.query(Printer).filter(Printer.id.in_(printer_ids)).all()
else:
printers = db_session.query(Printer).all()
session_data = {
'timestamp': datetime.now(),
'session_id': session_id,
'printers': {},
'from_cache': False
}
# Status für jeden Drucker abrufen
for printer in printers:
printer_status = self.get_printer_status(printer.id)
session_data['printers'][str(printer.id)] = printer_status
# In Session-Cache speichern
with self._session_cache_lock:
self._session_cache[session_id] = session_data
db_session.close()
return session_data
except Exception as e:
logger.error(f"Fehler beim Erstellen frischen Session-Status: {str(e)}")
return {
'timestamp': datetime.now(),
'session_id': session_id,
'printers': {},
'error': str(e),
'from_cache': False
}
def get_session_cache_stats(self) -> Dict[str, any]:
"""
Gibt Session-Cache-Statistiken zurück
Returns:
Dict mit Cache-Statistiken
"""
try:
with self._session_cache_lock:
stats = {
'total_sessions': len(self._session_cache),
'cache_ttl_seconds': self._session_cache_ttl,
'cache_size_bytes': len(str(self._session_cache)),
'sessions': {}
}
for session_id, data in self._session_cache.items():
stats['sessions'][session_id] = {
'timestamp': data.get('timestamp', datetime.min).isoformat(),
'printer_count': len(data.get('printers', {})),
'age_seconds': (datetime.now() - data.get('timestamp', datetime.now())).total_seconds()
}
return stats
except Exception as e:
logger.error(f"Fehler beim Abrufen der Cache-Statistiken: {str(e)}")
return {'error': str(e)}
def cleanup_expired_session_cache(self) -> int:
"""
Bereinigt abgelaufene Session-Cache-Einträge
Returns:
int: Anzahl gelöschter Einträge
"""
try:
expired_count = 0
current_time = datetime.now()
with self._session_cache_lock:
expired_sessions = []
for session_id, data in self._session_cache.items():
cache_time = data.get('timestamp', datetime.min)
if (current_time - cache_time).total_seconds() > self._session_cache_ttl:
expired_sessions.append(session_id)
for session_id in expired_sessions:
self._session_cache.pop(session_id, None)
expired_count += 1
if expired_count > 0:
logger.info(f"Session-Cache bereinigt: {expired_count} abgelaufene Einträge entfernt")
return expired_count
except Exception as e:
logger.error(f"Fehler beim Bereinigen des Session-Cache: {str(e)}")
return 0
def invalidate_cache(self, printer_id: int = None) -> bool:
"""
Invalidiert Cache für spezifischen Drucker oder alle
Args:
printer_id: Optional - spezifischer Drucker, None = alle Drucker
Returns:
bool: True wenn erfolgreich
"""
try:
with self._cache_lock:
if printer_id is not None:
# Spezifischen Drucker-Cache löschen
self._status_cache.pop(printer_id, None)
self._last_check.pop(printer_id, None)
logger.debug(f"Cache für Drucker {printer_id} invalidiert")
else:
# Alle Caches löschen
self._status_cache.clear()
self._last_check.clear()
logger.info("Kompletter Status-Cache invalidiert")
return True
except Exception as e:
logger.error(f"Fehler beim Invalidieren des Cache: {str(e)}")
return False
def invalidate_all_caches(self) -> bool:
"""
Invalidiert alle Cache-Systeme (Status + Session)
Verwendet bei Netzwerkwechseln oder Force-Refresh
Returns:
bool: True wenn erfolgreich
"""
try:
# Status-Cache invalidieren
self.invalidate_cache()
# Session-Cache invalidieren
self.clear_session_cache()
logger.info("Alle Caches invalidiert (Status + Session)")
return True
except Exception as e:
logger.error(f"Fehler beim Invalidieren aller Caches: {str(e)}")
return False
def force_network_refresh(self) -> Dict[str, any]:
"""
Forciert komplette Netzwerk-Neuprüfung aller Drucker
Invalidiert alle Caches und führt echte Netzwerk-Tests durch
Returns:
Dict mit Refresh-Ergebnissen
"""
try:
logger.info("Starte Force-Network-Refresh für alle Drucker")
# Alle Caches invalidieren
self.invalidate_all_caches()
# Tapo-Controller Cache leeren falls vorhanden
try:
from utils.hardware_integration import tapo_controller
if tapo_controller and hasattr(tapo_controller, 'clear_cache'):
tapo_controller.clear_cache()
logger.debug("Tapo-Controller Cache geleert")
except Exception as e:
logger.warning(f"Tapo-Controller Cache konnte nicht geleert werden: {str(e)}")
# Frischen Status für alle Drucker abrufen
fresh_status = self.get_all_printer_status(force_refresh=True)
# Ergebnisse zusammenfassen
results = {
"success": True,
"timestamp": datetime.now().isoformat(),
"printers_refreshed": len(fresh_status),
"printers": fresh_status,
"message": f"Netzwerk-Status für {len(fresh_status)} Drucker erfolgreich aktualisiert"
}
logger.info(f"Force-Network-Refresh abgeschlossen: {len(fresh_status)} Drucker aktualisiert")
return results
except Exception as e:
logger.error(f"Fehler beim Force-Network-Refresh: {str(e)}")
return {
"success": False,
"error": str(e),
"timestamp": datetime.now().isoformat(),
"message": "Fehler beim Aktualisieren der Netzwerk-Status"
}
def _get_status_color(self, status: str) -> str:
"""Gibt die Farbe für einen Status zurück"""
colors = {
"scheduled": "#3788d8",
"running": "#28a745",
"finished": "#6c757d",
"aborted": "#dc3545"
}
return colors.get(status, "#6c757d")
# Globale Instanz
tapo_status_manager = TapoStatusManager()
def get_tapo_status_manager() -> TapoStatusManager:
"""Gibt die globale TapoStatusManager-Instanz zurück"""
return tapo_status_manager