🎉 Improved backend configuration and documentation 🖥️📚

This commit is contained in:
2025-06-02 14:16:23 +02:00
parent 3a0bd3b554
commit f2928b97fc
9 changed files with 2067 additions and 8 deletions

View File

@ -0,0 +1,625 @@
#!/usr/bin/env python3
"""
Erweiterte Druckerkonflikt-Management-Engine - MYP Platform
Dieses Modul behandelt alle Arten von Druckerkonflikten:
- Zeitüberschneidungen
- Ressourcenkonflikte
- Prioritätskonflikte
- Automatische Lösungsfindung
- Benutzerbenachrichtigungen
"""
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Tuple, Optional, Set
from dataclasses import dataclass
from enum import Enum
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_
from models import Job, Printer, User
from database.db_manager import get_cached_session
# Logging setup
logger = logging.getLogger(__name__)
class ConflictType(Enum):
"""Konflikttypen im System"""
TIME_OVERLAP = "zeitüberschneidung"
PRINTER_OFFLINE = "drucker_offline"
RESOURCE_UNAVAILABLE = "ressource_nicht_verfügbar"
PRIORITY_CONFLICT = "prioritätskonflikt"
MAINTENANCE_CONFLICT = "wartungskonflikt"
class ConflictSeverity(Enum):
"""Schweregrade von Konflikten"""
CRITICAL = "kritisch" # Verhindert Job-Ausführung komplett
HIGH = "hoch" # Beeinträchtigt Job-Qualität stark
MEDIUM = "mittel" # Beeinträchtigt Job-Effizienz
LOW = "niedrig" # Geringfügige Beeinträchtigung
INFO = "information" # Nur informativ
class ResolutionStrategy(Enum):
"""Lösungsstrategien für Konflikte"""
AUTO_REASSIGN = "automatische_neuzuweisung"
TIME_SHIFT = "zeitverschiebung"
PRIORITY_PREEMPTION = "prioritäts_verdrängung"
QUEUE_PLACEMENT = "warteschlange"
MANUAL_INTERVENTION = "manuelle_behandlung"
RESOURCE_SUBSTITUTION = "ressourcen_ersatz"
@dataclass
class ConflictDetails:
"""Detaillierte Konfliktinformationen"""
conflict_type: ConflictType
severity: ConflictSeverity
affected_job_id: int
conflicting_job_ids: List[int]
affected_printer_id: Optional[int]
conflict_start: datetime
conflict_end: datetime
description: str
suggested_solutions: List[Dict]
estimated_impact: str
auto_resolvable: bool
@dataclass
class ConflictResolution:
"""Ergebnis einer Konfliktlösung"""
success: bool
strategy_used: ResolutionStrategy
new_printer_id: Optional[int]
new_start_time: Optional[datetime]
new_end_time: Optional[datetime]
affected_jobs: List[int]
user_notification_required: bool
message: str
confidence_score: float
class ConflictManager:
"""Zentrale Konfliktmanagement-Engine"""
def __init__(self):
self.priority_weights = {
'urgent': 4,
'high': 3,
'normal': 2,
'low': 1
}
self.time_slot_preferences = {
'night_shift': {'start': 18, 'end': 6, 'bonus': 25},
'day_shift': {'start': 8, 'end': 17, 'bonus': 15},
'transition': {'start': 6, 'end': 8, 'bonus': 5}
}
self.conflict_resolution_timeout = 300 # 5 Minuten
def detect_conflicts(self, job_data: Dict, db_session: Session) -> List[ConflictDetails]:
"""
Erkennt alle möglichen Konflikte für einen geplanten Job
Args:
job_data: Job-Informationen (printer_id, start_time, end_time, priority)
db_session: Datenbankverbindung
Returns:
Liste aller erkannten Konflikte
"""
conflicts = []
# 1. Zeitüberschneidungs-Konflikte prüfen
time_conflicts = self._detect_time_conflicts(job_data, db_session)
conflicts.extend(time_conflicts)
# 2. Drucker-Verfügbarkeits-Konflikte prüfen
printer_conflicts = self._detect_printer_conflicts(job_data, db_session)
conflicts.extend(printer_conflicts)
# 3. Ressourcen-Konflikte prüfen
resource_conflicts = self._detect_resource_conflicts(job_data, db_session)
conflicts.extend(resource_conflicts)
# 4. Prioritäts-Konflikte prüfen
priority_conflicts = self._detect_priority_conflicts(job_data, db_session)
conflicts.extend(priority_conflicts)
logger.info(f"🔍 Konfliktanalyse abgeschlossen: {len(conflicts)} Konflikte erkannt")
return conflicts
def _detect_time_conflicts(self, job_data: Dict, db_session: Session) -> List[ConflictDetails]:
"""Erkennt Zeitüberschneidungs-Konflikte"""
conflicts = []
printer_id = job_data.get('printer_id')
start_time = job_data.get('start_time')
end_time = job_data.get('end_time')
if not all([printer_id, start_time, end_time]):
return conflicts
# Konflikthafte Jobs finden
conflicting_jobs = db_session.query(Job).filter(
Job.printer_id == printer_id,
Job.status.in_(["scheduled", "running"]),
or_(
and_(Job.start_at >= start_time, Job.start_at < end_time),
and_(Job.end_at > start_time, Job.end_at <= end_time),
and_(Job.start_at <= start_time, Job.end_at >= end_time)
)
).all()
for conflicting_job in conflicting_jobs:
# Konflikt-Schweregrad bestimmen
overlap_duration = self._calculate_overlap_duration(
start_time, end_time,
conflicting_job.start_at, conflicting_job.end_at
)
if overlap_duration.total_seconds() > 3600: # > 1 Stunde
severity = ConflictSeverity.CRITICAL
elif overlap_duration.total_seconds() > 1800: # > 30 Minuten
severity = ConflictSeverity.HIGH
else:
severity = ConflictSeverity.MEDIUM
# Lösungsvorschläge generieren
suggestions = self._generate_time_conflict_solutions(
job_data, conflicting_job, db_session
)
conflict = ConflictDetails(
conflict_type=ConflictType.TIME_OVERLAP,
severity=severity,
affected_job_id=job_data.get('job_id', 0),
conflicting_job_ids=[conflicting_job.id],
affected_printer_id=printer_id,
conflict_start=max(start_time, conflicting_job.start_at),
conflict_end=min(end_time, conflicting_job.end_at),
description=f"Zeitüberschneidung mit Job '{conflicting_job.name}' "
f"({overlap_duration.total_seconds()/60:.0f} Minuten)",
suggested_solutions=suggestions,
estimated_impact=f"Verzögerung von {overlap_duration.total_seconds()/60:.0f} Minuten",
auto_resolvable=len(suggestions) > 0
)
conflicts.append(conflict)
return conflicts
def _detect_printer_conflicts(self, job_data: Dict, db_session: Session) -> List[ConflictDetails]:
"""Erkennt Drucker-Verfügbarkeits-Konflikte"""
conflicts = []
printer_id = job_data.get('printer_id')
if not printer_id:
return conflicts
printer = db_session.query(Printer).filter_by(id=printer_id).first()
if not printer:
conflict = ConflictDetails(
conflict_type=ConflictType.PRINTER_OFFLINE,
severity=ConflictSeverity.CRITICAL,
affected_job_id=job_data.get('job_id', 0),
conflicting_job_ids=[],
affected_printer_id=printer_id,
conflict_start=job_data.get('start_time'),
conflict_end=job_data.get('end_time'),
description=f"Drucker ID {printer_id} existiert nicht",
suggested_solutions=[],
estimated_impact="Job kann nicht ausgeführt werden",
auto_resolvable=False
)
conflicts.append(conflict)
return conflicts
# Drucker-Status prüfen
if not printer.active:
suggestions = self._generate_printer_alternative_solutions(job_data, db_session)
conflict = ConflictDetails(
conflict_type=ConflictType.PRINTER_OFFLINE,
severity=ConflictSeverity.HIGH,
affected_job_id=job_data.get('job_id', 0),
conflicting_job_ids=[],
affected_printer_id=printer_id,
conflict_start=job_data.get('start_time'),
conflict_end=job_data.get('end_time'),
description=f"Drucker '{printer.name}' ist offline oder nicht aktiv",
suggested_solutions=suggestions,
estimated_impact="Automatische Neuzuweisung erforderlich",
auto_resolvable=len(suggestions) > 0
)
conflicts.append(conflict)
return conflicts
def _detect_resource_conflicts(self, job_data: Dict, db_session: Session) -> List[ConflictDetails]:
"""Erkennt Ressourcen-Verfügbarkeits-Konflikte"""
conflicts = []
# TODO: Implementierung für Material-, Personal- und andere Ressourcenkonflikte
# Aktuell Platzhalter für zukünftige Erweiterungen
return conflicts
def _detect_priority_conflicts(self, job_data: Dict, db_session: Session) -> List[ConflictDetails]:
"""Erkennt Prioritäts-basierte Konflikte"""
conflicts = []
job_priority = job_data.get('priority', 'normal')
if job_priority not in ['urgent', 'high']:
return conflicts # Nur hohe Prioritäten können andere verdrängen
printer_id = job_data.get('printer_id')
start_time = job_data.get('start_time')
end_time = job_data.get('end_time')
if not all([printer_id, start_time, end_time]):
return conflicts
# Niedrigerprioisierte Jobs im gleichen Zeitraum finden
lower_priority_jobs = db_session.query(Job).filter(
Job.printer_id == printer_id,
Job.status.in_(["scheduled"]),
or_(
and_(Job.start_at >= start_time, Job.start_at < end_time),
and_(Job.end_at > start_time, Job.end_at <= end_time),
and_(Job.start_at <= start_time, Job.end_at >= end_time)
)
).all()
for existing_job in lower_priority_jobs:
existing_priority = getattr(existing_job, 'priority', 'normal')
existing_weight = self.priority_weights.get(existing_priority, 2)
new_weight = self.priority_weights.get(job_priority, 2)
if new_weight > existing_weight:
suggestions = self._generate_priority_conflict_solutions(
job_data, existing_job, db_session
)
conflict = ConflictDetails(
conflict_type=ConflictType.PRIORITY_CONFLICT,
severity=ConflictSeverity.MEDIUM,
affected_job_id=job_data.get('job_id', 0),
conflicting_job_ids=[existing_job.id],
affected_printer_id=printer_id,
conflict_start=start_time,
conflict_end=end_time,
description=f"Höherpriorer Job verdrängt '{existing_job.name}' "
f"({job_priority} > {existing_priority})",
suggested_solutions=suggestions,
estimated_impact="Umplanung eines bestehenden Jobs erforderlich",
auto_resolvable=True
)
conflicts.append(conflict)
return conflicts
def resolve_conflicts(self, conflicts: List[ConflictDetails],
job_data: Dict, db_session: Session) -> List[ConflictResolution]:
"""
Löst alle erkannten Konflikte automatisch oder semi-automatisch
Args:
conflicts: Liste der zu lösenden Konflikte
job_data: Job-Informationen
db_session: Datenbankverbindung
Returns:
Liste der Konfliktlösungen
"""
resolutions = []
# Konflikte nach Schweregrad sortieren (kritische zuerst)
sorted_conflicts = sorted(conflicts,
key=lambda c: list(ConflictSeverity).index(c.severity))
for conflict in sorted_conflicts:
if conflict.auto_resolvable and conflict.suggested_solutions:
resolution = self._auto_resolve_conflict(conflict, job_data, db_session)
resolutions.append(resolution)
else:
# Manuelle Behandlung erforderlich
resolution = ConflictResolution(
success=False,
strategy_used=ResolutionStrategy.MANUAL_INTERVENTION,
new_printer_id=None,
new_start_time=None,
new_end_time=None,
affected_jobs=[conflict.affected_job_id],
user_notification_required=True,
message=f"Manueller Eingriff erforderlich: {conflict.description}",
confidence_score=0.0
)
resolutions.append(resolution)
logger.info(f"🔧 Konfliktlösung abgeschlossen: {len(resolutions)} Konflikte bearbeitet")
return resolutions
def _auto_resolve_conflict(self, conflict: ConflictDetails,
job_data: Dict, db_session: Session) -> ConflictResolution:
"""Automatische Konfliktlösung"""
# Beste Lösung aus Vorschlägen wählen
best_solution = max(conflict.suggested_solutions,
key=lambda s: s.get('confidence', 0))
strategy = ResolutionStrategy(best_solution['strategy'])
try:
if strategy == ResolutionStrategy.AUTO_REASSIGN:
return self._execute_auto_reassignment(conflict, best_solution, job_data, db_session)
elif strategy == ResolutionStrategy.TIME_SHIFT:
return self._execute_time_shift(conflict, best_solution, job_data, db_session)
elif strategy == ResolutionStrategy.PRIORITY_PREEMPTION:
return self._execute_priority_preemption(conflict, best_solution, job_data, db_session)
else:
raise ValueError(f"Unbekannte Strategie: {strategy}")
except Exception as e:
logger.error(f"❌ Fehler bei automatischer Konfliktlösung: {str(e)}")
return ConflictResolution(
success=False,
strategy_used=strategy,
new_printer_id=None,
new_start_time=None,
new_end_time=None,
affected_jobs=[conflict.affected_job_id],
user_notification_required=True,
message=f"Automatische Lösung fehlgeschlagen: {str(e)}",
confidence_score=0.0
)
def _execute_auto_reassignment(self, conflict: ConflictDetails, solution: Dict,
job_data: Dict, db_session: Session) -> ConflictResolution:
"""Führt automatische Druckerzuweisung durch"""
new_printer_id = solution['new_printer_id']
printer = db_session.query(Printer).filter_by(id=new_printer_id).first()
if not printer or not printer.active:
return ConflictResolution(
success=False,
strategy_used=ResolutionStrategy.AUTO_REASSIGN,
new_printer_id=None,
new_start_time=None,
new_end_time=None,
affected_jobs=[conflict.affected_job_id],
user_notification_required=True,
message="Alternativer Drucker nicht mehr verfügbar",
confidence_score=0.0
)
return ConflictResolution(
success=True,
strategy_used=ResolutionStrategy.AUTO_REASSIGN,
new_printer_id=new_printer_id,
new_start_time=job_data.get('start_time'),
new_end_time=job_data.get('end_time'),
affected_jobs=[conflict.affected_job_id],
user_notification_required=True,
message=f"Job automatisch zu Drucker '{printer.name}' verschoben",
confidence_score=solution.get('confidence', 0.8)
)
def _execute_time_shift(self, conflict: ConflictDetails, solution: Dict,
job_data: Dict, db_session: Session) -> ConflictResolution:
"""Führt Zeitverschiebung durch"""
new_start = solution['new_start_time']
new_end = solution['new_end_time']
return ConflictResolution(
success=True,
strategy_used=ResolutionStrategy.TIME_SHIFT,
new_printer_id=job_data.get('printer_id'),
new_start_time=new_start,
new_end_time=new_end,
affected_jobs=[conflict.affected_job_id],
user_notification_required=True,
message=f"Job zeitlich verschoben: {new_start.strftime('%H:%M')} - {new_end.strftime('%H:%M')}",
confidence_score=solution.get('confidence', 0.7)
)
def _execute_priority_preemption(self, conflict: ConflictDetails, solution: Dict,
job_data: Dict, db_session: Session) -> ConflictResolution:
"""Führt Prioritätsverdrängung durch"""
# Bestehenden Job umplanen
conflicting_job_id = conflict.conflicting_job_ids[0]
affected_jobs = [conflict.affected_job_id, conflicting_job_id]
return ConflictResolution(
success=True,
strategy_used=ResolutionStrategy.PRIORITY_PREEMPTION,
new_printer_id=job_data.get('printer_id'),
new_start_time=job_data.get('start_time'),
new_end_time=job_data.get('end_time'),
affected_jobs=affected_jobs,
user_notification_required=True,
message=f"Höherpriorer Job übernimmt Zeitslot, bestehender Job wird umgeplant",
confidence_score=solution.get('confidence', 0.9)
)
# Hilfsmethoden für Lösungsvorschläge
def _generate_time_conflict_solutions(self, job_data: Dict,
conflicting_job: Job, db_session: Session) -> List[Dict]:
"""Generiert Lösungsvorschläge für Zeitkonflikte"""
solutions = []
# 1. Alternative Drucker vorschlagen
alternative_printers = self._find_alternative_printers(job_data, db_session)
for printer_id, confidence in alternative_printers:
printer = db_session.query(Printer).filter_by(id=printer_id).first()
solutions.append({
'strategy': ResolutionStrategy.AUTO_REASSIGN.value,
'new_printer_id': printer_id,
'printer_name': printer.name if printer else f"Drucker {printer_id}",
'confidence': confidence,
'description': f"Automatische Umzuweisung zu {printer.name if printer else f'Drucker {printer_id}'}"
})
# 2. Zeitverschiebung vorschlagen
time_alternatives = self._find_alternative_time_slots(job_data, db_session)
for start_time, end_time, confidence in time_alternatives:
solutions.append({
'strategy': ResolutionStrategy.TIME_SHIFT.value,
'new_start_time': start_time,
'new_end_time': end_time,
'confidence': confidence,
'description': f"Zeitverschiebung: {start_time.strftime('%H:%M')} - {end_time.strftime('%H:%M')}"
})
return solutions
def _generate_printer_alternative_solutions(self, job_data: Dict, db_session: Session) -> List[Dict]:
"""Generiert Lösungsvorschläge für Drucker-Ausfälle"""
solutions = []
alternative_printers = self._find_alternative_printers(job_data, db_session)
for printer_id, confidence in alternative_printers:
printer = db_session.query(Printer).filter_by(id=printer_id).first()
solutions.append({
'strategy': ResolutionStrategy.AUTO_REASSIGN.value,
'new_printer_id': printer_id,
'printer_name': printer.name if printer else f"Drucker {printer_id}",
'confidence': confidence,
'description': f"Automatische Neuzuweisung zu {printer.name if printer else f'Drucker {printer_id}'}"
})
return solutions
def _generate_priority_conflict_solutions(self, job_data: Dict,
existing_job: Job, db_session: Session) -> List[Dict]:
"""Generiert Lösungsvorschläge für Prioritätskonflikte"""
solutions = []
# Bestehenden Job umplanen
alternative_slots = self._find_alternative_time_slots({
'printer_id': existing_job.printer_id,
'start_time': existing_job.start_at,
'end_time': existing_job.end_at,
'duration_minutes': existing_job.duration_minutes
}, db_session)
if alternative_slots:
start_time, end_time, confidence = alternative_slots[0]
solutions.append({
'strategy': ResolutionStrategy.PRIORITY_PREEMPTION.value,
'conflicting_job_new_start': start_time,
'conflicting_job_new_end': end_time,
'confidence': confidence,
'description': f"Bestehenden Job zu {start_time.strftime('%H:%M')} verschieben"
})
return solutions
def _find_alternative_printers(self, job_data: Dict, db_session: Session) -> List[Tuple[int, float]]:
"""Findet alternative Drucker mit Confidence-Score"""
from blueprints.calendar import get_smart_printer_assignment
alternatives = []
start_time = job_data.get('start_time')
end_time = job_data.get('end_time')
priority = job_data.get('priority', 'normal')
# Smart Assignment nutzen
recommended_printer_id = get_smart_printer_assignment(
start_date=start_time,
end_date=end_time,
priority=priority,
db_session=db_session
)
if recommended_printer_id:
alternatives.append((recommended_printer_id, 0.9))
# Weitere verfügbare Drucker mit niedrigerer Confidence
available_printers = db_session.query(Printer).filter(
Printer.active == True,
Printer.id != job_data.get('printer_id'),
Printer.id != recommended_printer_id
).all()
for printer in available_printers[:3]: # Top 3 Alternativen
# Einfache Verfügbarkeitsprüfung
conflicts = db_session.query(Job).filter(
Job.printer_id == printer.id,
Job.status.in_(["scheduled", "running"]),
or_(
and_(Job.start_at >= start_time, Job.start_at < end_time),
and_(Job.end_at > start_time, Job.end_at <= end_time),
and_(Job.start_at <= start_time, Job.end_at >= end_time)
)
).count()
if conflicts == 0:
alternatives.append((printer.id, 0.6)) # Niedrigere Confidence
return alternatives
def _find_alternative_time_slots(self, job_data: Dict, db_session: Session) -> List[Tuple[datetime, datetime, float]]:
"""Findet alternative Zeitfenster"""
alternatives = []
printer_id = job_data.get('printer_id')
original_start = job_data.get('start_time')
duration_minutes = job_data.get('duration_minutes')
if not all([printer_id, original_start, duration_minutes]):
return alternatives
duration = timedelta(minutes=duration_minutes)
# Zeitfenster um ursprünglichen Termin herum testen
test_intervals = [
timedelta(hours=1), # 1 Stunde später
timedelta(hours=2), # 2 Stunden später
timedelta(hours=-1), # 1 Stunde früher
timedelta(hours=3), # 3 Stunden später
timedelta(hours=-2), # 2 Stunden früher
]
for interval in test_intervals:
new_start = original_start + interval
new_end = new_start + duration
# Verfügbarkeit prüfen
conflicts = db_session.query(Job).filter(
Job.printer_id == printer_id,
Job.status.in_(["scheduled", "running"]),
or_(
and_(Job.start_at >= new_start, Job.start_at < new_end),
and_(Job.end_at > new_start, Job.end_at <= new_end),
and_(Job.start_at <= new_start, Job.end_at >= new_end)
)
).count()
if conflicts == 0:
# Confidence basierend auf Zeitnähe zum Original
time_diff_hours = abs(interval.total_seconds() / 3600)
confidence = max(0.3, 1.0 - (time_diff_hours * 0.1))
alternatives.append((new_start, new_end, confidence))
if len(alternatives) >= 3: # Maximal 3 Alternativen
break
return alternatives
def _calculate_overlap_duration(self, start1: datetime, end1: datetime,
start2: datetime, end2: datetime) -> timedelta:
"""Berechnet Überschneidungsdauer zwischen zwei Zeiträumen"""
overlap_start = max(start1, start2)
overlap_end = min(end1, end2)
if overlap_start < overlap_end:
return overlap_end - overlap_start
else:
return timedelta(0)
# Globale Instanz für einfache Nutzung
conflict_manager = ConflictManager()

View File

@ -15,7 +15,7 @@ from sqlalchemy import func
from sqlalchemy.orm import Session
import os
from models import get_db_session, Printer
from models import get_db_session, Printer, PlugStatusLog
from utils.logging_config import get_logger
from config.settings import PRINTERS, TAPO_USERNAME, TAPO_PASSWORD, DEFAULT_TAPO_IPS, TAPO_AUTO_DISCOVERY
@ -102,7 +102,8 @@ class PrinterMonitor:
success = self._turn_outlet_off(
printer.plug_ip,
printer.plug_username,
printer.plug_password
printer.plug_password,
printer_id=printer.id
)
results[printer.name] = success
@ -138,7 +139,7 @@ class PrinterMonitor:
return results
def _turn_outlet_off(self, ip_address: str, username: str, password: str, timeout: int = 5) -> bool:
def _turn_outlet_off(self, ip_address: str, username: str, password: str, timeout: int = 5, printer_id: int = None) -> bool:
"""
Schaltet eine TP-Link Tapo P110-Steckdose aus.
@ -147,19 +148,35 @@ class PrinterMonitor:
username: Benutzername für die Steckdose (wird überschrieben)
password: Passwort für die Steckdose (wird überschrieben)
timeout: Timeout in Sekunden (wird ignoriert, da PyP100 eigenes Timeout hat)
printer_id: ID des zugehörigen Druckers (für Logging)
Returns:
bool: True wenn erfolgreich ausgeschaltet
"""
if not TAPO_AVAILABLE:
monitor_logger.error("⚠️ PyP100-Modul nicht verfügbar - kann Tapo-Steckdose nicht schalten")
# Logging: Fehlgeschlagener Versuch
if printer_id:
try:
PlugStatusLog.log_status_change(
printer_id=printer_id,
status="disconnected",
source="system",
ip_address=ip_address,
error_message="PyP100-Modul nicht verfügbar",
notes="Startup-Initialisierung fehlgeschlagen"
)
except Exception as log_error:
monitor_logger.warning(f"Fehler beim Loggen des Steckdosen-Status: {log_error}")
return False
# IMMER globale Anmeldedaten verwenden (da diese funktionieren)
username = TAPO_USERNAME
password = TAPO_PASSWORD
monitor_logger.debug(f"🔧 Verwende globale Tapo-Anmeldedaten für {ip_address}")
start_time = time.time()
try:
# TP-Link Tapo P100 Verbindung herstellen (P100 statt P110)
from PyP100 import PyP100
@ -169,11 +186,45 @@ class PrinterMonitor:
# Steckdose ausschalten
p100.turnOff()
response_time = int((time.time() - start_time) * 1000) # in Millisekunden
monitor_logger.debug(f"✅ Tapo-Steckdose {ip_address} erfolgreich ausgeschaltet")
# Logging: Erfolgreich ausgeschaltet
if printer_id:
try:
PlugStatusLog.log_status_change(
printer_id=printer_id,
status="off",
source="system",
ip_address=ip_address,
response_time_ms=response_time,
notes="Startup-Initialisierung: Steckdose ausgeschaltet"
)
except Exception as log_error:
monitor_logger.warning(f"Fehler beim Loggen des Steckdosen-Status: {log_error}")
return True
except Exception as e:
response_time = int((time.time() - start_time) * 1000) # in Millisekunden
monitor_logger.debug(f"⚠️ Fehler beim Ausschalten der Tapo-Steckdose {ip_address}: {str(e)}")
# Logging: Fehlgeschlagener Versuch
if printer_id:
try:
PlugStatusLog.log_status_change(
printer_id=printer_id,
status="disconnected",
source="system",
ip_address=ip_address,
response_time_ms=response_time,
error_message=str(e),
notes="Startup-Initialisierung fehlgeschlagen"
)
except Exception as log_error:
monitor_logger.warning(f"Fehler beim Loggen des Steckdosen-Status: {log_error}")
return False
def get_live_printer_status(self, use_session_cache: bool = True) -> Dict[int, Dict]:
@ -337,7 +388,8 @@ class PrinterMonitor:
printer.plug_ip,
printer.plug_username,
printer.plug_password,
timeout
timeout,
printer_id=printer.id
)
status_info["outlet_reachable"] = outlet_reachable
@ -432,7 +484,7 @@ class PrinterMonitor:
monitor_logger.debug(f"❌ Fehler beim Verbindungstest zu {ip_address}: {str(e)}")
return False
def _check_outlet_status(self, ip_address: str, username: str, password: str, timeout: int = 5) -> Tuple[bool, str]:
def _check_outlet_status(self, ip_address: str, username: str, password: str, timeout: int = 5, printer_id: int = None) -> Tuple[bool, str]:
"""
Überprüft den Status einer TP-Link Tapo P110-Steckdose.
@ -441,19 +493,37 @@ class PrinterMonitor:
username: Benutzername für die Steckdose
password: Passwort für die Steckdose
timeout: Timeout in Sekunden (wird ignoriert, da PyP100 eigenes Timeout hat)
printer_id: ID des zugehörigen Druckers (für Logging)
Returns:
Tuple[bool, str]: (Erreichbar, Status) - Status: "on", "off", "unknown"
"""
if not TAPO_AVAILABLE:
monitor_logger.debug("⚠️ PyP100-Modul nicht verfügbar - kann Tapo-Steckdosen-Status nicht abfragen")
# Logging: Modul nicht verfügbar
if printer_id:
try:
PlugStatusLog.log_status_change(
printer_id=printer_id,
status="disconnected",
source="system",
ip_address=ip_address,
error_message="PyP100-Modul nicht verfügbar",
notes="Status-Check fehlgeschlagen"
)
except Exception as log_error:
monitor_logger.warning(f"Fehler beim Loggen des Steckdosen-Status: {log_error}")
return False, "unknown"
# IMMER globale Anmeldedaten verwenden (da diese funktionieren)
username = TAPO_USERNAME
password = TAPO_PASSWORD
monitor_logger.debug(f"🔧 Verwende globale Tapo-Anmeldedaten für {ip_address}")
start_time = time.time()
try:
# TP-Link Tapo P100 Verbindung herstellen (P100 statt P110)
from PyP100 import PyP100
@ -468,11 +538,70 @@ class PrinterMonitor:
device_on = device_info.get('device_on', False)
status = "on" if device_on else "off"
response_time = int((time.time() - start_time) * 1000) # in Millisekunden
monitor_logger.debug(f"✅ Tapo-Steckdose {ip_address}: Status = {status}")
# Logging: Erfolgreicher Status-Check
if printer_id:
try:
# Hole zusätzliche Geräteinformationen falls verfügbar
power_consumption = None
voltage = None
current = None
firmware_version = None
try:
# Versuche Energiedaten zu holen (P110 spezifisch)
energy_usage = p100.getEnergyUsage()
if energy_usage:
power_consumption = energy_usage.get('current_power', None)
voltage = energy_usage.get('voltage', None)
current = energy_usage.get('current', None)
except:
pass # P100 unterstützt keine Energiedaten
try:
firmware_version = device_info.get('fw_ver', None)
except:
pass
PlugStatusLog.log_status_change(
printer_id=printer_id,
status=status,
source="system",
ip_address=ip_address,
power_consumption=power_consumption,
voltage=voltage,
current=current,
response_time_ms=response_time,
firmware_version=firmware_version,
notes="Automatischer Status-Check"
)
except Exception as log_error:
monitor_logger.warning(f"Fehler beim Loggen des Steckdosen-Status: {log_error}")
return True, status
except Exception as e:
response_time = int((time.time() - start_time) * 1000) # in Millisekunden
monitor_logger.debug(f"⚠️ Fehler bei Tapo-Steckdosen-Status-Check {ip_address}: {str(e)}")
# Logging: Fehlgeschlagener Status-Check
if printer_id:
try:
PlugStatusLog.log_status_change(
printer_id=printer_id,
status="disconnected",
source="system",
ip_address=ip_address,
response_time_ms=response_time,
error_message=str(e),
notes="Status-Check fehlgeschlagen"
)
except Exception as log_error:
monitor_logger.warning(f"Fehler beim Loggen des Steckdosen-Status: {log_error}")
return False, "unknown"
def clear_all_caches(self):