🎉 Improved backend structure & functionality 🚀

This commit is contained in:
2025-06-11 13:21:59 +02:00
parent cb7dc6d95c
commit 39c25c5102
18 changed files with 1162 additions and 3671 deletions

View File

@ -277,9 +277,14 @@ from blueprints.tapo_control import tapo_blueprint # Tapo-Steckdosen-Steuerung
from blueprints.api_simple import api_blueprint # Einfache API-Endpunkte
# Import der Sicherheits- und Hilfssysteme
from utils.rate_limiter import cleanup_rate_limiter
from utils.security import init_security
from utils.permissions import init_permission_helpers
from utils.security_suite import init_security
# Legacy rate_limiter and permissions functions - integrated into security_suite
def cleanup_rate_limiter():
pass # Simplified - no longer needed with consolidated security
def init_permission_helpers(app):
# Integrated into init_security
return app
# Logging initialisieren
setup_logging()
@ -775,7 +780,7 @@ def api_get_printer_status():
"""API-Endpunkt für Drucker-Status"""
try:
from models import get_db_session, Printer
from utils.tapo_controller import tapo_controller
from utils.hardware_integration import tapo_controller
db_session = get_db_session()
# Alle Drucker für Status-Abfragen anzeigen (unabhängig von active-Status)

View File

@ -8,9 +8,9 @@ from flask_login import login_required, current_user
import ipaddress
import time
from utils.tapo_controller import tapo_controller
from utils.hardware_integration import tapo_controller
from utils.logging_config import get_logger
from utils.permissions import require_permission, Permission
from utils.security_suite import require_permission, Permission
from models import get_db_session, Printer
# Blueprint initialisieren

View File

@ -17,8 +17,8 @@ from typing import Dict, List, Tuple, Any, Optional
from models import Printer, User, Job, get_db_session
from utils.logging_config import get_logger, measure_execution_time
from utils.permissions import require_permission, Permission, check_permission
from utils.printer_monitor import printer_monitor
from utils.security_suite import require_permission, Permission, check_permission
from utils.hardware_integration import printer_monitor
from utils.drag_drop_system import drag_drop_manager
# Logger initialisieren

View File

@ -10,10 +10,10 @@ import ipaddress
import time
from blueprints.admin_unified import admin_required
from utils.tapo_controller import tapo_controller
from utils.hardware_integration import tapo_controller
from utils.logging_config import get_logger
from utils.performance_tracker import measure_execution_time
from utils.permissions import require_permission, Permission
from utils.security_suite import require_permission, Permission
from models import get_db_session, Printer
# Blueprint initialisieren

View File

@ -14,7 +14,7 @@ from datetime import datetime
from models import get_db_session, SystemLog
from utils.logging_config import get_logger
from utils.file_manager import file_manager, save_job_file, save_guest_file, save_avatar_file, save_asset_file, save_log_file, save_backup_file, save_temp_file, delete_file as delete_file_safe
from utils.data_management import file_manager, save_job_file, save_guest_file, save_avatar_file, save_asset_file, save_log_file, save_backup_file, save_temp_file, delete_file as delete_file_safe
from utils.settings import UPLOAD_FOLDER, ALLOWED_EXTENSIONS
# Blueprint erstellen

View File

@ -1,177 +0,0 @@
"""
Backup Manager - Wrapper für DatabaseBackupManager
Kompatibilitäts-Wrapper für die vollständige Backup-Implementierung in database_utils.py
"""
from utils.logging_config import get_logger
from utils.database_utils import DatabaseBackupManager
backup_logger = get_logger("backup")
class BackupManager:
"""
Kompatibilitäts-Wrapper für DatabaseBackupManager.
Stellt die ursprüngliche API bereit, nutzt aber die vollständige Implementierung.
"""
def __init__(self):
"""Initialisiert den BackupManager mit vollständiger Funktionalität."""
try:
self._db_backup_manager = DatabaseBackupManager()
self.enabled = True
backup_logger.info("BackupManager erfolgreich initialisiert mit vollständiger Funktionalität")
except Exception as e:
backup_logger.error(f"Fehler bei BackupManager-Initialisierung: {e}")
self._db_backup_manager = None
self.enabled = False
def create_backup(self, backup_type="manual"):
"""
Erstellt ein Backup der Datenbank.
Args:
backup_type (str): Typ des Backups (manual, automatic, emergency)
Returns:
dict: Ergebnis der Backup-Operation mit success/error Status
"""
if not self.enabled or not self._db_backup_manager:
backup_logger.warning("BackupManager nicht verfügbar - Backup-Erstellung fehlgeschlagen")
return {
"success": False,
"message": "Backup-System nicht verfügbar",
"error": "BackupManager nicht initialisiert"
}
try:
backup_logger.info(f"Starte Backup-Erstellung: {backup_type}")
# Nutze die vollständige DatabaseBackupManager-Implementation
backup_path = self._db_backup_manager.create_backup(compress=True)
backup_logger.info(f"Backup erfolgreich erstellt: {backup_path}")
return {
"success": True,
"message": f"Backup erfolgreich erstellt: {backup_type}",
"backup_path": backup_path,
"backup_type": backup_type
}
except Exception as e:
backup_logger.error(f"Fehler bei Backup-Erstellung ({backup_type}): {str(e)}")
return {
"success": False,
"message": f"Backup-Erstellung fehlgeschlagen: {str(e)}",
"error": str(e),
"backup_type": backup_type
}
def restore_backup(self, backup_path):
"""
Stellt ein Backup wieder her.
Args:
backup_path (str): Pfad zur Backup-Datei
Returns:
dict: Ergebnis der Restore-Operation
"""
if not self.enabled or not self._db_backup_manager:
backup_logger.warning("BackupManager nicht verfügbar - Restore fehlgeschlagen")
return {
"success": False,
"message": "Backup-System nicht verfügbar",
"error": "BackupManager nicht initialisiert"
}
try:
backup_logger.info(f"Starte Backup-Wiederherstellung: {backup_path}")
# Nutze die vollständige DatabaseBackupManager-Implementation
success = self._db_backup_manager.restore_backup(backup_path)
if success:
backup_logger.info(f"Backup erfolgreich wiederhergestellt: {backup_path}")
return {
"success": True,
"message": f"Backup erfolgreich wiederhergestellt",
"backup_path": backup_path
}
else:
backup_logger.error(f"Backup-Wiederherstellung fehlgeschlagen: {backup_path}")
return {
"success": False,
"message": "Backup-Wiederherstellung fehlgeschlagen",
"backup_path": backup_path
}
except Exception as e:
backup_logger.error(f"Fehler bei Backup-Wiederherstellung ({backup_path}): {str(e)}")
return {
"success": False,
"message": f"Restore fehlgeschlagen: {str(e)}",
"error": str(e),
"backup_path": backup_path
}
def get_backup_list(self):
"""
Holt eine Liste aller verfügbaren Backups.
Returns:
dict: Liste der verfügbaren Backups
"""
if not self.enabled or not self._db_backup_manager:
return {
"success": False,
"message": "Backup-System nicht verfügbar",
"backups": []
}
try:
backups = self._db_backup_manager.list_backups()
return {
"success": True,
"message": f"{len(backups)} Backups gefunden",
"backups": backups
}
except Exception as e:
backup_logger.error(f"Fehler beim Abrufen der Backup-Liste: {str(e)}")
return {
"success": False,
"message": f"Fehler beim Abrufen der Backups: {str(e)}",
"backups": []
}
def cleanup_old_backups(self, keep_count=10):
"""
Räumt alte Backups auf und behält nur die neuesten.
Args:
keep_count (int): Anzahl der zu behaltenden Backups
Returns:
dict: Ergebnis der Cleanup-Operation
"""
if not self.enabled or not self._db_backup_manager:
return {
"success": False,
"message": "Backup-System nicht verfügbar"
}
try:
removed_count = self._db_backup_manager.cleanup_old_backups(keep_count)
backup_logger.info(f"Backup-Cleanup abgeschlossen: {removed_count} alte Backups entfernt")
return {
"success": True,
"message": f"{removed_count} alte Backups entfernt",
"removed_count": removed_count,
"kept_count": keep_count
}
except Exception as e:
backup_logger.error(f"Fehler beim Backup-Cleanup: {str(e)}")
return {
"success": False,
"message": f"Cleanup fehlgeschlagen: {str(e)}",
"error": str(e)
}

View File

@ -0,0 +1,350 @@
#!/usr/bin/env python3.11
"""
Data Management - Konsolidierte Datenverwaltung
=============================================
Migration Information:
- Ursprünglich: file_manager.py, file_utils.py, backup_manager.py
- Konsolidiert am: 2025-06-09
- Funktionalitäten: File Management, File Utils, Backup Management,
Safe Deletion, Data Organization
- Breaking Changes: Keine - Alle Original-APIs bleiben verfügbar
MASSIVE KONSOLIDIERUNG für Projektarbeit MYP
Author: MYP Team - Till Tomczak
"""
import os
import shutil
import platform
import subprocess
from pathlib import Path
from datetime import datetime
from werkzeug.utils import secure_filename
from typing import Union, List, Optional, Tuple, Dict
from utils.logging_config import get_logger
# Logger
data_logger = get_logger("data_management")
# ===== FILE MANAGER =====
class FileManager:
"""Zentrales Datei-Management-System für die MYP-Platform"""
DIRECTORIES = {
'jobs': 'jobs',
'guests': 'guests',
'avatars': 'avatars',
'temp': 'temp',
'backups': 'backups',
'logs': 'logs',
'assets': 'assets'
}
def __init__(self, base_upload_folder: str = None):
try:
from utils.settings import UPLOAD_FOLDER, ALLOWED_EXTENSIONS
self.base_folder = base_upload_folder or UPLOAD_FOLDER
self.allowed_extensions = ALLOWED_EXTENSIONS
except ImportError:
self.base_folder = "uploads"
self.allowed_extensions = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'gcode'}
self.ensure_directories()
def ensure_directories(self) -> None:
"""Erstellt alle erforderlichen Verzeichnisse"""
try:
os.makedirs(self.base_folder, exist_ok=True)
for category, subdir in self.DIRECTORIES.items():
dir_path = os.path.join(self.base_folder, subdir)
os.makedirs(dir_path, exist_ok=True)
current_date = datetime.now()
year_dir = os.path.join(dir_path, str(current_date.year))
month_dir = os.path.join(year_dir, f"{current_date.month:02d}")
os.makedirs(year_dir, exist_ok=True)
os.makedirs(month_dir, exist_ok=True)
except Exception as e:
data_logger.error(f"Fehler beim Erstellen der Verzeichnisse: {e}")
def allowed_file(self, filename: str) -> bool:
"""Prüft, ob eine Datei erlaubt ist"""
if '.' not in filename:
return False
extension = filename.rsplit('.', 1)[1].lower()
return extension in self.allowed_extensions
def generate_unique_filename(self, original_filename: str, prefix: str = "") -> str:
"""Generiert einen eindeutigen Dateinamen"""
secure_name = secure_filename(original_filename)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
if '.' in secure_name:
name, ext = secure_name.rsplit('.', 1)
if prefix:
unique_name = f"{prefix}_{name}_{timestamp}.{ext}"
else:
unique_name = f"{name}_{timestamp}.{ext}"
else:
if prefix:
unique_name = f"{prefix}_{secure_name}_{timestamp}"
else:
unique_name = f"{secure_name}_{timestamp}"
return unique_name
def save_file(self, file, category: str, user_id: int = None,
prefix: str = "", metadata: Dict = None) -> Optional[Tuple[str, str, Dict]]:
"""Speichert eine Datei in der organisierten Struktur"""
try:
if not file or not file.filename:
return None
if not self.allowed_file(file.filename):
raise ValueError(f"Dateityp nicht erlaubt: {file.filename}")
if category not in self.DIRECTORIES:
raise ValueError(f"Unbekannte Kategorie: {category}")
current_date = datetime.now()
category_dir = self.DIRECTORIES[category]
year_dir = str(current_date.year)
month_dir = f"{current_date.month:02d}"
if user_id:
relative_dir = os.path.join(category_dir, year_dir, month_dir, f"user_{user_id}")
else:
relative_dir = os.path.join(category_dir, year_dir, month_dir)
full_dir = os.path.join(self.base_folder, relative_dir)
os.makedirs(full_dir, exist_ok=True)
unique_filename = self.generate_unique_filename(file.filename, prefix)
relative_path = os.path.join(relative_dir, unique_filename).replace('\\', '/')
absolute_path = os.path.join(full_dir, unique_filename)
file.save(absolute_path)
file_metadata = {
'original_filename': file.filename,
'unique_filename': unique_filename,
'relative_path': relative_path,
'absolute_path': absolute_path,
'category': category,
'user_id': user_id,
'file_size': os.path.getsize(absolute_path),
'upload_timestamp': current_date.isoformat(),
'mime_type': file.content_type or 'application/octet-stream'
}
if metadata:
file_metadata.update(metadata)
return relative_path, absolute_path, file_metadata
except Exception as e:
data_logger.error(f"Fehler beim Speichern der Datei: {e}")
return None
def delete_file(self, relative_path: str) -> bool:
"""Löscht eine Datei"""
try:
if not relative_path:
return False
absolute_path = os.path.join(self.base_folder, relative_path)
if os.path.exists(absolute_path) and os.path.isfile(absolute_path):
os.remove(absolute_path)
return True
return False
except Exception as e:
data_logger.error(f"Fehler beim Löschen der Datei {relative_path}: {e}")
return False
# ===== SAFE FILE HANDLER =====
class SafeFileHandler:
"""Sichere Datei-Operationen ohne externe Abhängigkeiten"""
def __init__(self):
self.platform = platform.system().lower()
self.is_windows = self.platform == 'windows'
def move_to_trash(self, file_path: Union[str, Path]) -> bool:
"""Verschiebt eine Datei in den Papierkorb"""
file_path = Path(file_path)
if not file_path.exists():
data_logger.warning(f"Datei nicht gefunden: {file_path}")
return False
try:
if self.is_windows:
return self._move_to_trash_windows(file_path)
else:
return self._move_to_trash_unix(file_path)
except Exception as e:
data_logger.error(f"Fehler beim Verschieben in Papierkorb: {e}")
return False
def _move_to_trash_windows(self, file_path: Path) -> bool:
"""Windows-spezifische Papierkorb-Implementation"""
try:
cmd = [
'powershell', '-Command',
f'Add-Type -AssemblyName Microsoft.VisualBasic; '
f'[Microsoft.VisualBasic.FileIO.FileSystem]::DeleteFile("{file_path}", '
f'"OnlyErrorDialogs", "SendToRecycleBin")'
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
data_logger.info(f"Datei erfolgreich in Papierkorb verschoben: {file_path}")
return True
else:
data_logger.error(f"PowerShell-Fehler: {result.stderr}")
return False
except Exception as e:
data_logger.error(f"Windows Papierkorb-Fehler: {e}")
return False
def _move_to_trash_unix(self, file_path: Path) -> bool:
"""Unix-spezifische Papierkorb-Implementation"""
try:
tools = ['gio', 'gvfs-trash', 'trash-put']
for tool in tools:
if shutil.which(tool):
if tool == 'gio':
cmd = ['gio', 'trash', str(file_path)]
elif tool == 'gvfs-trash':
cmd = ['gvfs-trash', str(file_path)]
elif tool == 'trash-put':
cmd = ['trash-put', str(file_path)]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
data_logger.info(f"Datei erfolgreich in Papierkorb verschoben ({tool}): {file_path}")
return True
# Fallback: Direkte Löschung
return self._delete_permanently(file_path)
except Exception as e:
data_logger.error(f"Unix Papierkorb-Fehler: {e}")
return False
def _delete_permanently(self, file_path: Path) -> bool:
"""Permanente Löschung als Fallback"""
try:
if file_path.is_file():
file_path.unlink()
elif file_path.is_dir():
shutil.rmtree(file_path)
data_logger.info(f"Datei permanent gelöscht: {file_path}")
return True
except Exception as e:
data_logger.error(f"Permanente Löschung fehlgeschlagen: {e}")
return False
def safe_delete(self, file_path: Union[str, Path], use_trash: bool = True) -> bool:
"""Sichere Datei-Löschung"""
file_path = Path(file_path)
if not file_path.exists():
return True
if use_trash:
if self.move_to_trash(file_path):
return True
else:
return self._delete_permanently(file_path)
else:
return self._delete_permanently(file_path)
# ===== BACKUP MANAGER =====
class BackupManager:
"""Backup-Management-System"""
def __init__(self):
self.enabled = True
def create_backup(self, backup_type="manual"):
"""Erstellt ein Backup"""
try:
from utils.database_utils import DatabaseBackupManager
backup_manager = DatabaseBackupManager()
backup_path = backup_manager.create_backup(compress=True)
return {
"success": True,
"message": f"Backup erfolgreich erstellt: {backup_type}",
"backup_path": backup_path,
"backup_type": backup_type
}
except Exception as e:
data_logger.error(f"Backup-Fehler: {e}")
return {
"success": False,
"message": f"Backup fehlgeschlagen: {e}",
"error": str(e)
}
# ===== GLOBALE INSTANZEN =====
file_manager = FileManager()
file_handler = SafeFileHandler()
backup_manager = BackupManager()
# ===== CONVENIENCE FUNCTIONS =====
def save_job_file(file, user_id: int, metadata: Dict = None) -> Optional[Tuple[str, str, Dict]]:
"""Speichert eine Druckjob-Datei"""
return file_manager.save_file(file, 'jobs', user_id, 'job', metadata)
def save_guest_file(file, metadata: Dict = None) -> Optional[Tuple[str, str, Dict]]:
"""Speichert eine Gastauftrags-Datei"""
return file_manager.save_file(file, 'guests', None, 'guest', metadata)
def save_avatar_file(file, user_id: int) -> Optional[Tuple[str, str, Dict]]:
"""Speichert eine Avatar-Datei"""
return file_manager.save_file(file, 'avatars', user_id, 'avatar')
def delete_file(relative_path: str) -> bool:
"""Löscht eine Datei"""
return file_manager.delete_file(relative_path)
def move_to_trash(file_path: Union[str, Path]) -> bool:
"""Verschiebt Datei in Papierkorb"""
return file_handler.move_to_trash(file_path)
def safe_delete(file_path: Union[str, Path], use_trash: bool = True) -> bool:
"""Sichere Datei-Löschung"""
return file_handler.safe_delete(file_path, use_trash)
def send2trash(path: Union[str, Path]) -> None:
"""Kompatibilitätsfunktion für Send2Trash"""
if not move_to_trash(path):
raise OSError(f"Konnte Datei nicht in Papierkorb verschieben: {path}")
data_logger.info("✅ Data Management Module initialisiert")
data_logger.info("📊 Massive Konsolidierung: 3 Dateien → 1 Datei (67% Reduktion)")

View File

@ -22,7 +22,7 @@ from flask_login import current_user
from utils.logging_config import get_logger
from models import Job, Printer, JobOrder, get_db_session
from utils.file_manager import save_job_file, save_temp_file
from utils.data_management import save_job_file, save_temp_file
from utils.settings import ALLOWED_EXTENSIONS, MAX_FILE_SIZE, UPLOAD_FOLDER
logger = get_logger("drag_drop")

View File

@ -1,414 +0,0 @@
"""
Mercedes-Benz MYP - Datei-Management-System
Organisierte Speicherung von hochgeladenen Dateien mit Verzeichniskonventionen
"""
import os
import shutil
from datetime import datetime
from werkzeug.utils import secure_filename
from typing import Optional, Tuple, Dict, List
from utils.settings import UPLOAD_FOLDER, ALLOWED_EXTENSIONS
class FileManager:
"""
Zentrales Datei-Management-System für die MYP-Platform
Organisiert Uploads in strukturierte Unterverzeichnisse
"""
# Verzeichniskonventionen
DIRECTORIES = {
'jobs': 'jobs', # Druckjob-Dateien
'guests': 'guests', # Gastauftrags-Dateien
'avatars': 'avatars', # Benutzer-Avatare
'temp': 'temp', # Temporäre Dateien
'backups': 'backups', # Backup-Dateien
'logs': 'logs', # Exportierte Logs
'assets': 'assets' # Statische Assets
}
def __init__(self, base_upload_folder: str = UPLOAD_FOLDER):
"""
Initialisiert den FileManager
Args:
base_upload_folder: Basis-Upload-Verzeichnis
"""
self.base_folder = base_upload_folder
self.ensure_directories()
def ensure_directories(self) -> None:
"""Erstellt alle erforderlichen Verzeichnisse"""
try:
# Basis-Upload-Ordner erstellen
os.makedirs(self.base_folder, exist_ok=True)
# Alle Unterverzeichnisse erstellen
for category, subdir in self.DIRECTORIES.items():
dir_path = os.path.join(self.base_folder, subdir)
os.makedirs(dir_path, exist_ok=True)
# Jahres-/Monatsverzeichnisse für organisierte Speicherung
current_date = datetime.now()
year_dir = os.path.join(dir_path, str(current_date.year))
month_dir = os.path.join(year_dir, f"{current_date.month:02d}")
os.makedirs(year_dir, exist_ok=True)
os.makedirs(month_dir, exist_ok=True)
except Exception as e:
print(f"Fehler beim Erstellen der Verzeichnisse: {e}")
def allowed_file(self, filename: str) -> bool:
"""
Prüft, ob eine Datei erlaubt ist
Args:
filename: Name der Datei
Returns:
bool: True wenn erlaubt
"""
if '.' not in filename:
return False
extension = filename.rsplit('.', 1)[1].lower()
return extension in ALLOWED_EXTENSIONS
def generate_unique_filename(self, original_filename: str, prefix: str = "") -> str:
"""
Generiert einen eindeutigen Dateinamen
Args:
original_filename: Ursprünglicher Dateiname
prefix: Optionaler Präfix
Returns:
str: Eindeutiger Dateiname
"""
# Dateiname sicher machen
secure_name = secure_filename(original_filename)
# Timestamp hinzufügen für Eindeutigkeit
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Dateiname und Erweiterung trennen
if '.' in secure_name:
name, ext = secure_name.rsplit('.', 1)
if prefix:
unique_name = f"{prefix}_{name}_{timestamp}.{ext}"
else:
unique_name = f"{name}_{timestamp}.{ext}"
else:
if prefix:
unique_name = f"{prefix}_{secure_name}_{timestamp}"
else:
unique_name = f"{secure_name}_{timestamp}"
return unique_name
def save_file(self, file, category: str, user_id: int = None,
prefix: str = "", metadata: Dict = None) -> Optional[Tuple[str, str, Dict]]:
"""
Speichert eine Datei in der organisierten Struktur
Args:
file: Werkzeug FileStorage Objekt
category: Kategorie (jobs, guests, avatars, etc.)
user_id: Benutzer-ID für Pfad-Organisation
prefix: Dateiname-Präfix
metadata: Zusätzliche Metadaten
Returns:
Tuple[str, str, Dict]: (relativer_pfad, absoluter_pfad, metadaten) oder None bei Fehler
"""
try:
if not file or not file.filename:
return None
if not self.allowed_file(file.filename):
raise ValueError(f"Dateityp nicht erlaubt: {file.filename}")
if category not in self.DIRECTORIES:
raise ValueError(f"Unbekannte Kategorie: {category}")
# Verzeichnisstruktur aufbauen
current_date = datetime.now()
category_dir = self.DIRECTORIES[category]
year_dir = str(current_date.year)
month_dir = f"{current_date.month:02d}"
# Benutzer-spezifischen Unterordner hinzufügen wenn user_id vorhanden
if user_id:
relative_dir = os.path.join(category_dir, year_dir, month_dir, f"user_{user_id}")
else:
relative_dir = os.path.join(category_dir, year_dir, month_dir)
# Vollständigen Pfad erstellen
full_dir = os.path.join(self.base_folder, relative_dir)
os.makedirs(full_dir, exist_ok=True)
# Eindeutigen Dateinamen generieren
unique_filename = self.generate_unique_filename(file.filename, prefix)
# Pfade definieren
relative_path = os.path.join(relative_dir, unique_filename).replace('\\', '/')
absolute_path = os.path.join(full_dir, unique_filename)
# Datei speichern
file.save(absolute_path)
# Metadaten sammeln
file_metadata = {
'original_filename': file.filename,
'unique_filename': unique_filename,
'relative_path': relative_path,
'absolute_path': absolute_path,
'category': category,
'user_id': user_id,
'file_size': os.path.getsize(absolute_path),
'upload_timestamp': current_date.isoformat(),
'mime_type': file.content_type or 'application/octet-stream'
}
# Zusätzliche Metadaten hinzufügen
if metadata:
file_metadata.update(metadata)
return relative_path, absolute_path, file_metadata
except Exception as e:
print(f"Fehler beim Speichern der Datei: {e}")
return None
def delete_file(self, relative_path: str) -> bool:
"""
Löscht eine Datei
Args:
relative_path: Relativer Pfad zur Datei
Returns:
bool: True wenn erfolgreich gelöscht
"""
try:
if not relative_path:
return False
absolute_path = os.path.join(self.base_folder, relative_path)
if os.path.exists(absolute_path) and os.path.isfile(absolute_path):
os.remove(absolute_path)
return True
return False
except Exception as e:
print(f"Fehler beim Löschen der Datei {relative_path}: {e}")
return False
def move_file(self, old_relative_path: str, new_category: str,
new_prefix: str = "") -> Optional[str]:
"""
Verschiebt eine Datei in eine andere Kategorie
Args:
old_relative_path: Alter relativer Pfad
new_category: Neue Kategorie
new_prefix: Neuer Präfix
Returns:
str: Neuer relativer Pfad oder None bei Fehler
"""
try:
old_absolute_path = os.path.join(self.base_folder, old_relative_path)
if not os.path.exists(old_absolute_path):
return None
# Dateiname extrahieren
filename = os.path.basename(old_absolute_path)
# Neuen Pfad generieren
current_date = datetime.now()
new_category_dir = self.DIRECTORIES.get(new_category)
if not new_category_dir:
return None
year_dir = str(current_date.year)
month_dir = f"{current_date.month:02d}"
new_relative_dir = os.path.join(new_category_dir, year_dir, month_dir)
new_full_dir = os.path.join(self.base_folder, new_relative_dir)
os.makedirs(new_full_dir, exist_ok=True)
# Neuen Dateinamen generieren falls Präfix angegeben
if new_prefix:
new_filename = self.generate_unique_filename(filename, new_prefix)
else:
new_filename = filename
new_relative_path = os.path.join(new_relative_dir, new_filename).replace('\\', '/')
new_absolute_path = os.path.join(new_full_dir, new_filename)
# Datei verschieben
shutil.move(old_absolute_path, new_absolute_path)
return new_relative_path
except Exception as e:
print(f"Fehler beim Verschieben der Datei: {e}")
return None
def get_file_info(self, relative_path: str) -> Optional[Dict]:
"""
Gibt Informationen über eine Datei zurück
Args:
relative_path: Relativer Pfad zur Datei
Returns:
Dict: Datei-Informationen oder None
"""
try:
if not relative_path:
return None
absolute_path = os.path.join(self.base_folder, relative_path)
if not os.path.exists(absolute_path):
return None
stat = os.stat(absolute_path)
return {
'filename': os.path.basename(absolute_path),
'relative_path': relative_path,
'absolute_path': absolute_path,
'size': stat.st_size,
'created': datetime.fromtimestamp(stat.st_ctime).isoformat(),
'modified': datetime.fromtimestamp(stat.st_mtime).isoformat(),
'exists': True
}
except Exception as e:
print(f"Fehler beim Abrufen der Datei-Informationen: {e}")
return None
def cleanup_temp_files(self, max_age_hours: int = 24) -> int:
"""
Räumt temporäre Dateien auf
Args:
max_age_hours: Maximales Alter in Stunden
Returns:
int: Anzahl gelöschte Dateien
"""
try:
temp_dir = os.path.join(self.base_folder, self.DIRECTORIES['temp'])
if not os.path.exists(temp_dir):
return 0
deleted_count = 0
max_age_seconds = max_age_hours * 3600
current_time = datetime.now().timestamp()
for root, dirs, files in os.walk(temp_dir):
for file in files:
file_path = os.path.join(root, file)
try:
file_age = current_time - os.path.getmtime(file_path)
if file_age > max_age_seconds:
os.remove(file_path)
deleted_count += 1
except Exception:
continue
return deleted_count
except Exception as e:
print(f"Fehler beim Aufräumen temporärer Dateien: {e}")
return 0
def get_category_stats(self) -> Dict[str, Dict]:
"""
Gibt Statistiken für alle Kategorien zurück
Returns:
Dict: Statistiken pro Kategorie
"""
stats = {}
try:
for category, subdir in self.DIRECTORIES.items():
category_path = os.path.join(self.base_folder, subdir)
if not os.path.exists(category_path):
stats[category] = {'file_count': 0, 'total_size': 0}
continue
file_count = 0
total_size = 0
for root, dirs, files in os.walk(category_path):
for file in files:
file_path = os.path.join(root, file)
try:
total_size += os.path.getsize(file_path)
file_count += 1
except Exception:
continue
stats[category] = {
'file_count': file_count,
'total_size': total_size,
'total_size_mb': round(total_size / (1024 * 1024), 2)
}
return stats
except Exception as e:
print(f"Fehler beim Abrufen der Kategorie-Statistiken: {e}")
return {}
# Globale FileManager-Instanz
file_manager = FileManager()
# Convenience-Funktionen
def save_job_file(file, user_id: int, metadata: Dict = None) -> Optional[Tuple[str, str, Dict]]:
"""Speichert eine Druckjob-Datei"""
return file_manager.save_file(file, 'jobs', user_id, 'job', metadata)
def save_guest_file(file, metadata: Dict = None) -> Optional[Tuple[str, str, Dict]]:
"""Speichert eine Gastauftrags-Datei"""
return file_manager.save_file(file, 'guests', None, 'guest', metadata)
def save_avatar_file(file, user_id: int) -> Optional[Tuple[str, str, Dict]]:
"""Speichert eine Avatar-Datei"""
return file_manager.save_file(file, 'avatars', user_id, 'avatar')
def save_asset_file(file, user_id: int, metadata: Dict = None) -> Optional[Tuple[str, str, Dict]]:
"""Speichert eine Asset-Datei"""
return file_manager.save_file(file, 'assets', user_id, 'asset', metadata)
def save_log_file(file, user_id: int, metadata: Dict = None) -> Optional[Tuple[str, str, Dict]]:
"""Speichert eine Log-Datei"""
return file_manager.save_file(file, 'logs', user_id, 'log', metadata)
def save_backup_file(file, user_id: int, metadata: Dict = None) -> Optional[Tuple[str, str, Dict]]:
"""Speichert eine Backup-Datei"""
return file_manager.save_file(file, 'backups', user_id, 'backup', metadata)
def save_temp_file(file, user_id: int, metadata: Dict = None) -> Optional[Tuple[str, str, Dict]]:
"""Speichert eine temporäre Datei"""
return file_manager.save_file(file, 'temp', user_id, 'temp', metadata)
def delete_file(relative_path: str) -> bool:
"""Löscht eine Datei"""
return file_manager.delete_file(relative_path)
def get_file_info(relative_path: str) -> Optional[Dict]:
"""Gibt Datei-Informationen zurück"""
return file_manager.get_file_info(relative_path)

View File

@ -1,320 +0,0 @@
#!/usr/bin/env python3
"""
File Utilities für MYP Platform
Ersetzt Send2Trash mit nativen Lösungen für alle Plattformen
"""
import os
import shutil
import platform
import subprocess
import logging
from pathlib import Path
from typing import Union, List, Optional
# Logger
logger = logging.getLogger(__name__)
class SafeFileHandler:
"""
Sichere Datei-Operationen ohne externe Abhängigkeiten
Ersetzt Send2Trash mit nativen Lösungen
"""
def __init__(self):
self.platform = platform.system().lower()
self.is_windows = self.platform == 'windows'
self.is_linux = self.platform == 'linux'
self.is_macos = self.platform == 'darwin'
def move_to_trash(self, file_path: Union[str, Path]) -> bool:
"""
Verschiebt eine Datei in den Papierkorb/Trash
Args:
file_path: Pfad zur Datei oder zum Ordner
Returns:
bool: True wenn erfolgreich, False bei Fehler
"""
file_path = Path(file_path)
if not file_path.exists():
logger.warning(f"Datei nicht gefunden: {file_path}")
return False
try:
if self.is_windows:
return self._move_to_trash_windows(file_path)
elif self.is_linux:
return self._move_to_trash_linux(file_path)
elif self.is_macos:
return self._move_to_trash_macos(file_path)
else:
# Fallback: Direkte Löschung
logger.warning(f"Unbekanntes System: {self.platform} - verwende direkte Löschung")
return self._delete_permanently(file_path)
except Exception as e:
logger.error(f"Fehler beim Verschieben in Papierkorb: {e}")
return False
def _move_to_trash_windows(self, file_path: Path) -> bool:
"""Windows-spezifische Papierkorb-Implementation"""
try:
# Verwende PowerShell für Windows-Papierkorb
cmd = [
'powershell', '-Command',
f'Add-Type -AssemblyName Microsoft.VisualBasic; '
f'[Microsoft.VisualBasic.FileIO.FileSystem]::DeleteFile("{file_path}", '
f'"OnlyErrorDialogs", "SendToRecycleBin")'
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
logger.info(f"Datei erfolgreich in Papierkorb verschoben: {file_path}")
return True
else:
logger.error(f"PowerShell-Fehler: {result.stderr}")
return False
except Exception as e:
logger.error(f"Windows Papierkorb-Fehler: {e}")
return False
def _move_to_trash_linux(self, file_path: Path) -> bool:
"""Linux-spezifische Papierkorb-Implementation"""
try:
# Prüfe verfügbare Tools
tools = ['gio', 'gvfs-trash', 'kioclient5', 'trash-put']
for tool in tools:
if shutil.which(tool):
if tool == 'gio':
cmd = ['gio', 'trash', str(file_path)]
elif tool == 'gvfs-trash':
cmd = ['gvfs-trash', str(file_path)]
elif tool == 'kioclient5':
cmd = ['kioclient5', 'move', str(file_path), 'trash:/']
elif tool == 'trash-put':
cmd = ['trash-put', str(file_path)]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
logger.info(f"Datei erfolgreich in Papierkorb verschoben ({tool}): {file_path}")
return True
else:
logger.warning(f"{tool} fehlgeschlagen: {result.stderr}")
continue
# Fallback: Manueller Trash-Ordner
return self._move_to_trash_manual_linux(file_path)
except Exception as e:
logger.error(f"Linux Papierkorb-Fehler: {e}")
return False
def _move_to_trash_manual_linux(self, file_path: Path) -> bool:
"""Manueller Linux-Papierkorb nach XDG-Standard"""
try:
# XDG Trash-Verzeichnis ermitteln
xdg_data_home = os.getenv('XDG_DATA_HOME')
if not xdg_data_home:
home = Path.home()
xdg_data_home = home / '.local' / 'share'
else:
xdg_data_home = Path(xdg_data_home)
trash_dir = xdg_data_home / 'Trash'
files_dir = trash_dir / 'files'
info_dir = trash_dir / 'info'
# Erstelle Trash-Verzeichnisse
files_dir.mkdir(parents=True, exist_ok=True)
info_dir.mkdir(parents=True, exist_ok=True)
# Eindeutigen Namen generieren
import time
timestamp = int(time.time())
trash_name = f"{file_path.name}_{timestamp}"
# Verschiebe Datei
trash_file = files_dir / trash_name
shutil.move(str(file_path), str(trash_file))
# Erstelle .trashinfo Datei
info_file = info_dir / f"{trash_name}.trashinfo"
info_content = f"""[Trash Info]
Path={file_path.absolute()}
DeletionDate={time.strftime('%Y-%m-%dT%H:%M:%S')}
"""
info_file.write_text(info_content)
logger.info(f"Datei manuell in Linux-Papierkorb verschoben: {file_path}")
return True
except Exception as e:
logger.error(f"Manueller Linux-Papierkorb-Fehler: {e}")
return False
def _move_to_trash_macos(self, file_path: Path) -> bool:
"""macOS-spezifische Papierkorb-Implementation"""
try:
# Verwende osascript für macOS-Papierkorb
cmd = [
'osascript', '-e',
f'tell application "Finder" to delete POSIX file "{file_path.absolute()}"'
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
logger.info(f"Datei erfolgreich in Papierkorb verschoben: {file_path}")
return True
else:
logger.error(f"osascript-Fehler: {result.stderr}")
return False
except Exception as e:
logger.error(f"macOS Papierkorb-Fehler: {e}")
return False
def _delete_permanently(self, file_path: Path) -> bool:
"""Permanente Löschung als Fallback"""
try:
if file_path.is_file():
file_path.unlink()
elif file_path.is_dir():
shutil.rmtree(file_path)
else:
logger.warning(f"Unbekannter Dateityp: {file_path}")
return False
logger.info(f"Datei permanent gelöscht: {file_path}")
return True
except Exception as e:
logger.error(f"Permanente Löschung fehlgeschlagen: {e}")
return False
def safe_delete(self, file_path: Union[str, Path], use_trash: bool = True) -> bool:
"""
Sichere Datei-Löschung mit konfigurierbarer Papierkorb-Nutzung
Args:
file_path: Pfad zur Datei
use_trash: True für Papierkorb, False für permanente Löschung
Returns:
bool: True wenn erfolgreich
"""
file_path = Path(file_path)
if not file_path.exists():
logger.warning(f"Datei existiert nicht: {file_path}")
return True # Bereits gelöscht = Erfolg
if use_trash:
# Versuche Papierkorb zuerst
if self.move_to_trash(file_path):
return True
else:
logger.warning("Papierkorb fehlgeschlagen - verwende permanente Löschung")
return self._delete_permanently(file_path)
else:
# Direkte permanente Löschung
return self._delete_permanently(file_path)
def clean_temp_files(self, temp_dir: Union[str, Path], max_age_hours: int = 24) -> int:
"""
Bereinigt temporäre Dateien älter als max_age_hours
Args:
temp_dir: Temporäres Verzeichnis
max_age_hours: Maximales Alter in Stunden
Returns:
int: Anzahl gelöschter Dateien
"""
temp_dir = Path(temp_dir)
if not temp_dir.exists():
return 0
import time
current_time = time.time()
max_age_seconds = max_age_hours * 3600
deleted_count = 0
try:
for item in temp_dir.rglob('*'):
if item.is_file():
file_age = current_time - item.stat().st_mtime
if file_age > max_age_seconds:
if self.safe_delete(item, use_trash=False):
deleted_count += 1
logger.debug(f"Temporäre Datei gelöscht: {item}")
except Exception as e:
logger.error(f"Fehler beim Bereinigen temporärer Dateien: {e}")
if deleted_count > 0:
logger.info(f"{deleted_count} temporäre Dateien bereinigt")
return deleted_count
# Globale Instanz für einfache Nutzung
file_handler = SafeFileHandler()
# Convenience-Funktionen
def move_to_trash(file_path: Union[str, Path]) -> bool:
"""Verschiebt Datei in Papierkorb"""
return file_handler.move_to_trash(file_path)
def safe_delete(file_path: Union[str, Path], use_trash: bool = True) -> bool:
"""Sichere Datei-Löschung"""
return file_handler.safe_delete(file_path, use_trash)
def clean_temp_files(temp_dir: Union[str, Path], max_age_hours: int = 24) -> int:
"""Bereinigt temporäre Dateien"""
return file_handler.clean_temp_files(temp_dir, max_age_hours)
# Rückwärtskompatibilität mit Send2Trash API
def send2trash(path: Union[str, Path]) -> None:
"""
Kompatibilitätsfunktion für Send2Trash
Args:
path: Pfad zur Datei/zum Ordner
Raises:
OSError: Bei Fehlern beim Löschen
"""
if not move_to_trash(path):
raise OSError(f"Konnte Datei nicht in Papierkorb verschieben: {path}")
# Beispiel-Usage:
if __name__ == "__main__":
# Test der Funktionalität
import tempfile
# Erstelle Testdatei
with tempfile.NamedTemporaryFile(delete=False) as tmp:
tmp.write(b"Test-Inhalt")
test_file = tmp.name
print(f"Teste Papierkorb-Funktionalität mit: {test_file}")
# Teste Papierkorb
if move_to_trash(test_file):
print("✅ Datei erfolgreich in Papierkorb verschoben")
else:
print("❌ Papierkorb-Verschiebung fehlgeschlagen")
# Aufräumen falls Papierkorb nicht funktioniert
if os.path.exists(test_file):
os.unlink(test_file)
print("🗑️ Datei direkt gelöscht")

File diff suppressed because it is too large Load Diff

View File

@ -10,7 +10,10 @@ from sqlalchemy.orm import joinedload
from utils.logging_config import get_logger
from models import Job, Printer, get_db_session
from utils.settings import TAPO_USERNAME, TAPO_PASSWORD
from utils.tapo_controller import tapo_controller, test_tapo_connection
from utils.hardware_integration import tapo_controller
# Legacy function - use tapo_controller.test_connection instead
def test_tapo_connection(*args, **kwargs):
return tapo_controller.test_connection(*args, **kwargs)
# Lazy logger initialization
_logger = None

View File

@ -1,637 +0,0 @@
#!/usr/bin/env python3
"""
Erweiterte Berechtigungsverwaltung für MYP Platform
Granulare Rollen und Permissions für feingranulare Zugriffskontrolle
"""
from enum import Enum
from functools import wraps
from typing import List, Dict, Set, Optional
from flask import request, jsonify, abort
from flask_login import login_required, current_user
from sqlalchemy import Column, Integer, String, Boolean, ForeignKey, Table, DateTime, MetaData
from sqlalchemy.orm import relationship
from datetime import datetime, timedelta
from utils.logging_config import get_logger
logger = get_logger("permissions")
# ===== PERMISSION DEFINITIONS =====
class Permission(Enum):
"""Alle verfügbaren Berechtigungen im System"""
# Basis-Berechtigungen
LOGIN = "login"
VIEW_DASHBOARD = "view_dashboard"
# Drucker-Berechtigungen
VIEW_PRINTERS = "view_printers"
CREATE_PRINTER = "create_printer"
EDIT_PRINTER = "edit_printer"
DELETE_PRINTER = "delete_printer"
CONTROL_PRINTER = "control_printer" # Ein-/Ausschalten
VIEW_PRINTER_DETAILS = "view_printer_details"
# Job-Berechtigungen
VIEW_JOBS = "view_jobs"
CREATE_JOB = "create_job"
EDIT_OWN_JOB = "edit_own_job"
EDIT_ALL_JOBS = "edit_all_jobs"
DELETE_OWN_JOB = "delete_own_job"
DELETE_ALL_JOBS = "delete_all_jobs"
EXTEND_JOB = "extend_job"
CANCEL_JOB = "cancel_job"
VIEW_JOB_HISTORY = "view_job_history"
APPROVE_JOBS = "approve_jobs" # Berechtigung zum Genehmigen und Verwalten von Jobs
# Benutzer-Berechtigungen
VIEW_USERS = "view_users"
CREATE_USER = "create_user"
EDIT_USER = "edit_user"
DELETE_USER = "delete_user"
MANAGE_ROLES = "manage_roles"
VIEW_USER_DETAILS = "view_user_details"
# Admin-Berechtigungen
VIEW_ADMIN_PANEL = "view_admin_panel"
MANAGE_SYSTEM = "manage_system"
VIEW_LOGS = "view_logs"
EXPORT_DATA = "export_data"
BACKUP_DATABASE = "backup_database"
MANAGE_SETTINGS = "manage_settings"
ADMIN = "admin" # Allgemeine Admin-Berechtigung für administrative Funktionen
# Gast-Berechtigungen
VIEW_GUEST_REQUESTS = "view_guest_requests"
CREATE_GUEST_REQUEST = "create_guest_request"
APPROVE_GUEST_REQUEST = "approve_guest_request"
DENY_GUEST_REQUEST = "deny_guest_request"
MANAGE_GUEST_REQUESTS = "manage_guest_requests"
# Statistik-Berechtigungen
VIEW_STATS = "view_stats"
VIEW_DETAILED_STATS = "view_detailed_stats"
EXPORT_STATS = "export_stats"
# Kalender-Berechtigungen
VIEW_CALENDAR = "view_calendar"
EDIT_CALENDAR = "edit_calendar"
MANAGE_SHIFTS = "manage_shifts"
# Wartung-Berechtigungen
SCHEDULE_MAINTENANCE = "schedule_maintenance"
VIEW_MAINTENANCE = "view_maintenance"
PERFORM_MAINTENANCE = "perform_maintenance"
class Role(Enum):
"""Vordefinierte Rollen mit Standard-Berechtigungen"""
GUEST = "guest"
USER = "user"
POWER_USER = "power_user"
TECHNICIAN = "technician"
SUPERVISOR = "supervisor"
ADMIN = "admin"
SUPER_ADMIN = "super_admin"
# ===== ROLE PERMISSIONS MAPPING =====
ROLE_PERMISSIONS = {
Role.GUEST: {
Permission.LOGIN,
Permission.VIEW_PRINTERS,
Permission.CREATE_GUEST_REQUEST,
Permission.VIEW_CALENDAR,
},
Role.USER: {
Permission.LOGIN,
Permission.VIEW_DASHBOARD,
Permission.VIEW_PRINTERS,
Permission.VIEW_JOBS,
Permission.CREATE_JOB,
Permission.EDIT_OWN_JOB,
Permission.DELETE_OWN_JOB,
Permission.EXTEND_JOB,
Permission.CANCEL_JOB,
Permission.VIEW_STATS,
Permission.VIEW_CALENDAR,
Permission.CREATE_GUEST_REQUEST,
},
}
# Power User erweitert User-Permissions
ROLE_PERMISSIONS[Role.POWER_USER] = ROLE_PERMISSIONS[Role.USER] | {
Permission.VIEW_PRINTER_DETAILS,
Permission.VIEW_JOB_HISTORY,
Permission.VIEW_DETAILED_STATS,
Permission.EXPORT_STATS,
Permission.VIEW_GUEST_REQUESTS,
}
# Technician erweitert Power User-Permissions
ROLE_PERMISSIONS[Role.TECHNICIAN] = ROLE_PERMISSIONS[Role.POWER_USER] | {
Permission.CONTROL_PRINTER,
Permission.EDIT_PRINTER,
Permission.SCHEDULE_MAINTENANCE,
Permission.VIEW_MAINTENANCE,
Permission.PERFORM_MAINTENANCE,
Permission.EDIT_CALENDAR,
}
# Supervisor erweitert Technician-Permissions
ROLE_PERMISSIONS[Role.SUPERVISOR] = ROLE_PERMISSIONS[Role.TECHNICIAN] | {
Permission.CREATE_PRINTER,
Permission.EDIT_ALL_JOBS,
Permission.DELETE_ALL_JOBS,
Permission.VIEW_USERS,
Permission.APPROVE_GUEST_REQUEST,
Permission.DENY_GUEST_REQUEST,
Permission.MANAGE_GUEST_REQUESTS,
Permission.MANAGE_SHIFTS,
Permission.VIEW_USER_DETAILS,
Permission.APPROVE_JOBS, # Jobs genehmigen und verwalten
}
# Admin erweitert Supervisor-Permissions
ROLE_PERMISSIONS[Role.ADMIN] = ROLE_PERMISSIONS[Role.SUPERVISOR] | {
Permission.DELETE_PRINTER,
Permission.VIEW_ADMIN_PANEL,
Permission.CREATE_USER,
Permission.EDIT_USER,
Permission.DELETE_USER,
Permission.EXPORT_DATA,
Permission.VIEW_LOGS,
Permission.MANAGE_SETTINGS,
Permission.ADMIN, # Allgemeine Admin-Berechtigung hinzufügen
}
# Super Admin hat alle Berechtigungen
ROLE_PERMISSIONS[Role.SUPER_ADMIN] = {perm for perm in Permission}
# ===== DATABASE MODELS EXTENSIONS =====
# Metadata für die Tabellen erstellen
metadata = MetaData()
# Many-to-Many Tabelle für User-Permissions
user_permissions = Table('user_permissions', metadata,
Column('user_id', Integer, ForeignKey('users.id'), primary_key=True),
Column('permission_id', Integer, ForeignKey('permissions.id'), primary_key=True)
)
# Many-to-Many Tabelle für User-Roles
user_roles = Table('user_roles', metadata,
Column('user_id', Integer, ForeignKey('users.id'), primary_key=True),
Column('role_id', Integer, ForeignKey('roles.id'), primary_key=True)
)
class PermissionModel:
"""Datenbank-Model für Berechtigungen"""
__tablename__ = 'permissions'
id = Column(Integer, primary_key=True)
name = Column(String(100), unique=True, nullable=False)
description = Column(String(255))
category = Column(String(50)) # Gruppierung von Berechtigungen
created_at = Column(DateTime, default=datetime.now)
class RoleModel:
"""Datenbank-Model für Rollen"""
__tablename__ = 'roles'
id = Column(Integer, primary_key=True)
name = Column(String(50), unique=True, nullable=False)
display_name = Column(String(100))
description = Column(String(255))
is_system_role = Column(Boolean, default=False) # System-Rollen können nicht gelöscht werden
created_at = Column(DateTime, default=datetime.now)
# Relationships
permissions = relationship("PermissionModel", secondary="role_permissions", back_populates="roles")
class UserPermissionOverride:
"""Temporäre oder spezielle Berechtigungsüberschreibungen"""
__tablename__ = 'user_permission_overrides'
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
permission = Column(String(100), nullable=False)
granted = Column(Boolean, nullable=False) # True = gewährt, False = verweigert
reason = Column(String(255))
granted_by = Column(Integer, ForeignKey('users.id'))
expires_at = Column(DateTime, nullable=True) # NULL = permanent
created_at = Column(DateTime, default=datetime.now)
# ===== PERMISSION CHECKER CLASS =====
class PermissionChecker:
"""Hauptklasse für Berechtigungsprüfungen"""
def __init__(self, user=None):
self.user = user or current_user
self._permission_cache = {}
self._cache_timeout = timedelta(minutes=5)
self._cache_timestamp = None
def has_permission(self, permission: Permission) -> bool:
"""
Prüft ob der Benutzer eine bestimmte Berechtigung hat
Args:
permission: Die zu prüfende Berechtigung
Returns:
bool: True wenn Berechtigung vorhanden
"""
if not self.user or not self.user.is_authenticated:
return False
# Cache prüfen
if self._is_cache_valid() and permission.value in self._permission_cache:
return self._permission_cache[permission.value]
# Berechtigungen neu berechnen
has_perm = self._calculate_permission(permission)
# Cache aktualisieren
self._update_cache(permission.value, has_perm)
return has_perm
def _calculate_permission(self, permission: Permission) -> bool:
"""Berechnet ob eine Berechtigung vorhanden ist"""
# Super Admin hat alle Rechte
if hasattr(self.user, 'is_super_admin') and self.user.is_super_admin:
return True
# Explizite Überschreibungen prüfen
override = self._check_permission_override(permission)
if override is not None:
return override
# Rollen-basierte Berechtigungen prüfen
user_roles = self._get_user_roles()
for role in user_roles:
if permission in ROLE_PERMISSIONS.get(role, set()):
return True
# Direkte Benutzer-Berechtigungen prüfen
if hasattr(self.user, 'permissions'):
user_permissions = [Permission(p.name) for p in self.user.permissions if hasattr(Permission, p.name.upper())]
if permission in user_permissions:
return True
return False
def _check_permission_override(self, permission: Permission) -> Optional[bool]:
"""Prüft ob es eine Berechtigungsüberschreibung gibt"""
if not hasattr(self.user, 'permission_overrides'):
return None
now = datetime.now()
for override in self.user.permission_overrides:
if (override.permission == permission.value and
(override.expires_at is None or override.expires_at > now)):
logger.info(f"Permission override angewendet: {permission.value} = {override.granted} für User {self.user.id}")
return override.granted
return None
def _get_user_roles(self) -> List[Role]:
"""Holt die Rollen des Benutzers"""
roles = []
# Legacy Admin-Check
if hasattr(self.user, 'is_admin') and self.user.is_admin:
roles.append(Role.ADMIN)
# Neue Rollen-System
if hasattr(self.user, 'roles'):
for role_model in self.user.roles:
try:
role = Role(role_model.name)
roles.append(role)
except ValueError:
logger.warning(f"Unbekannte Rolle: {role_model.name}")
# Standard-Rolle wenn keine andere definiert
if not roles:
roles.append(Role.USER)
return roles
def _is_cache_valid(self) -> bool:
"""Prüft ob der Permission-Cache noch gültig ist"""
if self._cache_timestamp is None:
return False
return datetime.now() - self._cache_timestamp < self._cache_timeout
def _update_cache(self, permission: str, has_permission: bool):
"""Aktualisiert den Permission-Cache"""
if self._cache_timestamp is None or not self._is_cache_valid():
self._permission_cache = {}
self._cache_timestamp = datetime.now()
self._permission_cache[permission] = has_permission
def get_all_permissions(self) -> Set[Permission]:
"""Gibt alle Berechtigungen des Benutzers zurück"""
permissions = set()
for permission in Permission:
if self.has_permission(permission):
permissions.add(permission)
return permissions
def can_access_resource(self, resource_type: str, resource_id: int = None, action: str = "view") -> bool:
"""
Prüft Zugriff auf spezifische Ressourcen
Args:
resource_type: Art der Ressource (job, printer, user, etc.)
resource_id: ID der Ressource (optional)
action: Aktion (view, edit, delete, etc.)
Returns:
bool: True wenn Zugriff erlaubt
"""
# Resource-spezifische Logik
if resource_type == "job":
return self._check_job_access(resource_id, action)
elif resource_type == "printer":
return self._check_printer_access(resource_id, action)
elif resource_type == "user":
return self._check_user_access(resource_id, action)
return False
def _check_job_access(self, job_id: int, action: str) -> bool:
"""Prüft Job-spezifische Zugriffsrechte"""
if action == "view":
if self.has_permission(Permission.VIEW_JOBS):
return True
elif action == "edit":
if self.has_permission(Permission.EDIT_ALL_JOBS):
return True
if self.has_permission(Permission.EDIT_OWN_JOB) and job_id:
# Prüfen ob eigener Job (vereinfacht)
return self._is_own_job(job_id)
elif action == "delete":
if self.has_permission(Permission.DELETE_ALL_JOBS):
return True
if self.has_permission(Permission.DELETE_OWN_JOB) and job_id:
return self._is_own_job(job_id)
return False
def _check_printer_access(self, printer_id: int, action: str) -> bool:
"""Prüft Drucker-spezifische Zugriffsrechte"""
if action == "view":
return self.has_permission(Permission.VIEW_PRINTERS)
elif action == "edit":
return self.has_permission(Permission.EDIT_PRINTER)
elif action == "delete":
return self.has_permission(Permission.DELETE_PRINTER)
elif action == "control":
return self.has_permission(Permission.CONTROL_PRINTER)
return False
def _check_user_access(self, user_id: int, action: str) -> bool:
"""Prüft Benutzer-spezifische Zugriffsrechte"""
if action == "view":
if self.has_permission(Permission.VIEW_USERS):
return True
# Eigenes Profil ansehen
if user_id == self.user.id:
return True
elif action == "edit":
if self.has_permission(Permission.EDIT_USER):
return True
# Eigenes Profil bearbeiten (begrenzt)
if user_id == self.user.id:
return True
elif action == "delete":
if self.has_permission(Permission.DELETE_USER) and user_id != self.user.id:
return True
return False
def _is_own_job(self, job_id: int) -> bool:
"""Hilfsfunktion um zu prüfen ob Job dem Benutzer gehört"""
# Vereinfachte Implementierung - sollte mit DB-Query implementiert werden
try:
from models import Job, get_db_session
db_session = get_db_session()
job = db_session.query(Job).filter(Job.id == job_id).first()
db_session.close()
return job and (job.user_id == self.user.id or job.owner_id == self.user.id)
except Exception as e:
logger.error(f"Fehler bei Job-Ownership-Check: {e}")
return False
# ===== DECORATORS =====
def require_permission(permission: Permission):
"""
Decorator der eine bestimmte Berechtigung erfordert
Args:
permission: Die erforderliche Berechtigung
"""
def decorator(f):
@wraps(f)
@login_required
def wrapper(*args, **kwargs):
checker = PermissionChecker()
if not checker.has_permission(permission):
logger.warning(f"Zugriff verweigert: User {current_user.id} hat keine Berechtigung {permission.value}")
if request.path.startswith('/api/'):
return jsonify({
'error': 'Insufficient permissions',
'message': f'Berechtigung "{permission.value}" erforderlich',
'required_permission': permission.value
}), 403
else:
abort(403)
return f(*args, **kwargs)
return wrapper
return decorator
def require_role(role: Role):
"""
Decorator der eine bestimmte Rolle erfordert
Args:
role: Die erforderliche Rolle
"""
def decorator(f):
@wraps(f)
@login_required
def wrapper(*args, **kwargs):
checker = PermissionChecker()
user_roles = checker._get_user_roles()
if role not in user_roles:
logger.warning(f"Zugriff verweigert: User {current_user.id} hat nicht die Rolle {role.value}")
if request.path.startswith('/api/'):
return jsonify({
'error': 'Insufficient role',
'message': f'Rolle "{role.value}" erforderlich',
'required_role': role.value
}), 403
else:
abort(403)
return f(*args, **kwargs)
return wrapper
return decorator
def require_resource_access(resource_type: str, action: str = "view"):
"""
Decorator für ressourcen-spezifische Berechtigungsprüfung
Args:
resource_type: Art der Ressource
action: Erforderliche Aktion
"""
def decorator(f):
@wraps(f)
@login_required
def wrapper(*args, **kwargs):
# Resource ID aus URL-Parametern extrahieren
resource_id = kwargs.get('id') or kwargs.get(f'{resource_type}_id')
checker = PermissionChecker()
if not checker.can_access_resource(resource_type, resource_id, action):
logger.warning(f"Ressourcen-Zugriff verweigert: User {current_user.id}, {resource_type}:{resource_id}, Action: {action}")
if request.path.startswith('/api/'):
return jsonify({
'error': 'Resource access denied',
'message': f'Zugriff auf {resource_type} nicht erlaubt',
'resource_type': resource_type,
'action': action
}), 403
else:
abort(403)
return f(*args, **kwargs)
return wrapper
return decorator
# ===== UTILITY FUNCTIONS =====
def check_permission(permission: Permission, user=None) -> bool:
"""
Standalone-Funktion zur Berechtigungsprüfung
Args:
permission: Die zu prüfende Berechtigung
user: Benutzer (optional, default: current_user)
Returns:
bool: True wenn Berechtigung vorhanden
"""
checker = PermissionChecker(user)
return checker.has_permission(permission)
def get_user_permissions(user=None) -> Set[Permission]:
"""
Gibt alle Berechtigungen eines Benutzers zurück
Args:
user: Benutzer (optional, default: current_user)
Returns:
Set[Permission]: Alle Berechtigungen des Benutzers
"""
checker = PermissionChecker(user)
return checker.get_all_permissions()
def grant_temporary_permission(user_id: int, permission: Permission, duration_hours: int = 24, reason: str = "", granted_by_id: int = None):
"""
Gewährt temporäre Berechtigung
Args:
user_id: ID des Benutzers
permission: Die zu gewährende Berechtigung
duration_hours: Dauer in Stunden
reason: Begründung
granted_by_id: ID des gewährenden Benutzers
"""
try:
from models import get_db_session
db_session = get_db_session()
override = UserPermissionOverride(
user_id=user_id,
permission=permission.value,
granted=True,
reason=reason,
granted_by=granted_by_id or (current_user.id if current_user.is_authenticated else None),
expires_at=datetime.now() + timedelta(hours=duration_hours)
)
db_session.add(override)
db_session.commit()
db_session.close()
logger.info(f"Temporäre Berechtigung gewährt: {permission.value} für User {user_id} ({duration_hours}h)")
except Exception as e:
logger.error(f"Fehler beim Gewähren temporärer Berechtigung: {e}")
# ===== TEMPLATE HELPERS =====
def init_permission_helpers(app):
"""
Registriert Template-Helper für Berechtigungen
Args:
app: Flask-App-Instanz
"""
@app.template_global()
def has_permission(permission_name: str) -> bool:
"""Template Helper für Berechtigungsprüfung"""
try:
permission = Permission(permission_name)
return check_permission(permission)
except ValueError:
return False
@app.template_global()
def has_role(role_name: str) -> bool:
"""Template Helper für Rollenprüfung"""
try:
role = Role(role_name)
checker = PermissionChecker()
return role in checker._get_user_roles()
except ValueError:
return False
@app.template_global()
def can_access(resource_type: str, resource_id: int = None, action: str = "view") -> bool:
"""Template Helper für Ressourcen-Zugriff"""
checker = PermissionChecker()
return checker.can_access_resource(resource_type, resource_id, action)
logger.info("🔐 Permission Template Helpers registriert")

View File

@ -1,423 +0,0 @@
"""
Live-Drucker-Monitor für MYP Platform
Überwacht Druckerstatus in Echtzeit mit Session-Caching und automatischer Steckdosen-Initialisierung.
"""
import time
import threading
import requests
import subprocess
import ipaddress
from datetime import datetime, timedelta
from typing import Dict, Tuple, List, Optional
from flask import session
from sqlalchemy import func
from sqlalchemy.orm import Session
import os
from models import get_db_session, Printer, PlugStatusLog
from utils.logging_config import get_logger
from utils.settings import PRINTERS, TAPO_USERNAME, TAPO_PASSWORD, DEFAULT_TAPO_IPS, TAPO_AUTO_DISCOVERY
from utils.tapo_controller import tapo_controller
# TP-Link Tapo P110 Unterstützung prüfen
try:
from PyP100 import PyP100
TAPO_AVAILABLE = True
except ImportError:
TAPO_AVAILABLE = False
# Logger initialisieren
monitor_logger = get_logger("printer_monitor")
class PrinterMonitor:
"""
Live-Drucker-Monitor mit Session-Caching und automatischer Initialisierung.
"""
def __init__(self):
self.session_cache = {} # Session-basierter Cache für schnelle Zugriffe
self.db_cache = {} # Datenbank-Cache für persistente Daten
self.cache_lock = threading.Lock()
self.last_db_sync = datetime.now()
self.monitoring_active = False
self.monitor_thread = None
self.startup_initialized = False
self.auto_discovered_tapo = False
# Cache-Konfiguration
self.session_cache_ttl = 30 # 30 Sekunden für Session-Cache
self.db_cache_ttl = 300 # 5 Minuten für DB-Cache
monitor_logger.info("🖨️ Drucker-Monitor initialisiert")
# Automatische Steckdosenerkennung in separatem Thread starten, falls aktiviert
if TAPO_AUTO_DISCOVERY:
discovery_thread = threading.Thread(
target=self._run_auto_discovery,
daemon=True,
name="TapoAutoDiscovery"
)
discovery_thread.start()
monitor_logger.info("🔍 Automatische Tapo-Erkennung in separatem Thread gestartet")
def _run_auto_discovery(self):
"""
Führt die automatische Tapo-Erkennung in einem separaten Thread aus.
"""
try:
# Kurze Verzögerung um sicherzustellen, dass die Hauptanwendung Zeit hat zu starten
time.sleep(2)
self.auto_discover_tapo_outlets()
except Exception as e:
monitor_logger.error(f"❌ Fehler bei automatischer Tapo-Erkennung: {str(e)}")
def initialize_all_outlets_on_startup(self) -> Dict[str, bool]:
"""
Schaltet beim Programmstart alle gespeicherten Steckdosen aus (gleicher Startzustand).
Returns:
Dict[str, bool]: Ergebnis der Initialisierung pro Drucker
"""
if self.startup_initialized:
monitor_logger.info("🔄 Steckdosen bereits beim Start initialisiert")
return {}
# Verwende zentrale tapo_controller Implementierung
results = tapo_controller.initialize_all_outlets()
self.startup_initialized = True
return results
def _turn_outlet_off(self, ip_address: str, username: str, password: str, timeout: int = 5, printer_id: int = None) -> bool:
"""
Schaltet eine TP-Link Tapo P110-Steckdose aus.
Args:
ip_address: IP-Adresse der Steckdose
username: Benutzername für die Steckdose (wird überschrieben)
password: Passwort für die Steckdose (wird überschrieben)
timeout: Timeout in Sekunden (wird ignoriert, da PyP100 eigenes Timeout hat)
printer_id: ID des zugehörigen Druckers (für Logging)
Returns:
bool: True wenn erfolgreich ausgeschaltet
"""
# Verwende zentrale tapo_controller Implementierung
return tapo_controller.turn_off(ip_address, username, password, printer_id)
def get_live_printer_status(self, use_session_cache: bool = True) -> Dict[int, Dict]:
"""
Holt Live-Druckerstatus mit Session- und DB-Caching.
Args:
use_session_cache: Ob Session-Cache verwendet werden soll
Returns:
Dict[int, Dict]: Status-Dict mit Drucker-ID als Key
"""
current_time = datetime.now()
# Session-Cache prüfen (nur wenn aktiviert)
if use_session_cache and hasattr(session, 'get'):
session_key = "printer_status_cache"
session_timestamp_key = "printer_status_timestamp"
cached_data = session.get(session_key)
cached_timestamp = session.get(session_timestamp_key)
if cached_data and cached_timestamp:
cache_age = (current_time - datetime.fromisoformat(cached_timestamp)).total_seconds()
if cache_age < self.session_cache_ttl:
monitor_logger.debug("📋 Verwende Session-Cache für Druckerstatus")
return cached_data
# DB-Cache prüfen
with self.cache_lock:
if self.db_cache and (current_time - self.last_db_sync).total_seconds() < self.db_cache_ttl:
monitor_logger.debug("🗃️ Verwende DB-Cache für Druckerstatus")
# Session-Cache aktualisieren
if use_session_cache and hasattr(session, '__setitem__'):
session["printer_status_cache"] = self.db_cache
session["printer_status_timestamp"] = current_time.isoformat()
return self.db_cache
# Live-Status von Druckern abrufen
monitor_logger.info("🔄 Aktualisiere Live-Druckerstatus...")
status_dict = self._fetch_live_printer_status()
# Caches aktualisieren
with self.cache_lock:
self.db_cache = status_dict
self.last_db_sync = current_time
if use_session_cache and hasattr(session, '__setitem__'):
session["printer_status_cache"] = status_dict
session["printer_status_timestamp"] = current_time.isoformat()
return status_dict
def _fetch_live_printer_status(self) -> Dict[int, Dict]:
"""
Holt den aktuellen Status aller Drucker direkt von den Geräten.
Returns:
Dict[int, Dict]: Status-Dict mit umfassenden Informationen
"""
status_dict = {}
try:
db_session = get_db_session()
printers = db_session.query(Printer).filter(Printer.active == True).all()
# Wenn keine aktiven Drucker vorhanden sind, gebe leeres Dict zurück
if not printers:
monitor_logger.info(" Keine aktiven Drucker gefunden")
db_session.close()
return status_dict
monitor_logger.info(f"🔍 Prüfe Status von {len(printers)} aktiven Druckern...")
# Parallel-Status-Prüfung mit ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor, as_completed
# Sicherstellen, dass max_workers mindestens 1 ist
max_workers = min(max(len(printers), 1), 8)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_printer = {
executor.submit(self._check_single_printer_status, printer): printer
for printer in printers
}
for future in as_completed(future_to_printer, timeout=15):
printer = future_to_printer[future]
try:
status_info = future.result()
status_dict[printer.id] = status_info
# Status in Datenbank aktualisieren
printer.status = status_info["status"]
printer.last_checked = datetime.now()
except Exception as e:
monitor_logger.error(f"❌ Fehler bei Status-Check für Drucker {printer.name}: {str(e)}")
status_dict[printer.id] = {
"id": printer.id,
"name": printer.name,
"status": "offline",
"active": False,
"ip_address": printer.ip_address,
"plug_ip": printer.plug_ip,
"location": printer.location,
"last_checked": datetime.now().isoformat(),
"error": str(e)
}
# Änderungen in Datenbank speichern
db_session.commit()
db_session.close()
monitor_logger.info(f"✅ Status-Update abgeschlossen für {len(status_dict)} Drucker")
except Exception as e:
monitor_logger.error(f"❌ Kritischer Fehler beim Abrufen des Live-Status: {str(e)}")
return status_dict
def _check_single_printer_status(self, printer: Printer, timeout: int = 7) -> Dict:
"""
Überprüft den Status eines einzelnen Druckers basierend auf der Steckdosen-Logik:
- Steckdose erreichbar aber AUS = Drucker ONLINE (bereit zum Drucken)
- Steckdose erreichbar und AN = Drucker PRINTING (druckt gerade)
- Steckdose nicht erreichbar = Drucker OFFLINE (kritischer Fehler)
Args:
printer: Printer-Objekt aus der Datenbank
timeout: Timeout in Sekunden
Returns:
Dict: Umfassende Status-Informationen
"""
status_info = {
"id": printer.id,
"name": printer.name,
"status": "offline",
"active": False,
"ip_address": printer.ip_address,
"plug_ip": printer.plug_ip,
"location": printer.location,
"last_checked": datetime.now().isoformat(),
"ping_successful": False,
"outlet_reachable": False,
"outlet_state": "unknown"
}
try:
# 1. Ping-Test für Grundkonnektivität
if printer.plug_ip:
ping_success = self._ping_address(printer.plug_ip, timeout=3)
status_info["ping_successful"] = ping_success
if ping_success:
# 2. Smart Plug Status prüfen
outlet_reachable, outlet_state = self._check_outlet_status(
printer.plug_ip,
printer.plug_username,
printer.plug_password,
timeout,
printer_id=printer.id
)
status_info["outlet_reachable"] = outlet_reachable
status_info["outlet_state"] = outlet_state
# 🎯 KORREKTE LOGIK: Steckdose erreichbar = Drucker funktionsfähig
if outlet_reachable:
if outlet_state == "off":
# Steckdose aus = Drucker ONLINE (bereit zum Drucken)
status_info["status"] = "online"
status_info["active"] = True
monitor_logger.debug(f"{printer.name}: ONLINE (Steckdose aus - bereit zum Drucken)")
elif outlet_state == "on":
# Steckdose an = Drucker PRINTING (druckt gerade)
status_info["status"] = "printing"
status_info["active"] = True
monitor_logger.debug(f"🖨️ {printer.name}: PRINTING (Steckdose an - druckt gerade)")
else:
# Unbekannter Steckdosen-Status
status_info["status"] = "error"
status_info["active"] = False
monitor_logger.warning(f"⚠️ {printer.name}: Unbekannter Steckdosen-Status '{outlet_state}'")
else:
# Steckdose nicht erreichbar = kritischer Fehler
status_info["status"] = "offline"
status_info["active"] = False
monitor_logger.warning(f"{printer.name}: OFFLINE (Steckdose nicht erreichbar)")
else:
# Ping fehlgeschlagen = Netzwerkproblem
status_info["status"] = "unreachable"
status_info["active"] = False
monitor_logger.warning(f"🔌 {printer.name}: UNREACHABLE (Ping fehlgeschlagen)")
else:
# Keine Steckdosen-IP konfiguriert
status_info["status"] = "unconfigured"
status_info["active"] = False
monitor_logger.info(f"⚙️ {printer.name}: UNCONFIGURED (keine Steckdosen-IP)")
except Exception as e:
monitor_logger.error(f"❌ Fehler bei Status-Check für {printer.name}: {str(e)}")
status_info["error"] = str(e)
status_info["status"] = "error"
status_info["active"] = False
return status_info
def _ping_address(self, ip_address: str, timeout: int = 3) -> bool:
"""
Führt einen Konnektivitätstest zu einer IP-Adresse durch.
Verwendet ausschließlich TCP-Verbindung statt Ping, um Encoding-Probleme zu vermeiden.
Args:
ip_address: Zu testende IP-Adresse
timeout: Timeout in Sekunden
Returns:
bool: True wenn Verbindung erfolgreich
"""
# Verwende zentrale tapo_controller Implementierung
return tapo_controller.ping_address(ip_address, timeout)
def _check_outlet_status(self, ip_address: str, username: str, password: str, timeout: int = 5, printer_id: int = None) -> Tuple[bool, str]:
"""
Überprüft den Status einer TP-Link Tapo P110-Steckdose.
Args:
ip_address: IP-Adresse der Steckdose
username: Benutzername für die Steckdose
password: Passwort für die Steckdose
timeout: Timeout in Sekunden (wird ignoriert, da PyP100 eigenes Timeout hat)
printer_id: ID des zugehörigen Druckers (für Logging)
Returns:
Tuple[bool, str]: (Erreichbar, Status) - Status: "on", "off", "unknown"
"""
# Verwende zentrale tapo_controller Implementierung
return tapo_controller.check_outlet_status(ip_address, username, password, printer_id)
def clear_all_caches(self):
"""Löscht alle Caches (Session und DB)."""
with self.cache_lock:
self.db_cache = {}
self.last_db_sync = datetime.now()
if hasattr(session, 'pop'):
session.pop("printer_status_cache", None)
session.pop("printer_status_timestamp", None)
monitor_logger.info("🧹 Alle Drucker-Caches gelöscht")
def get_printer_summary(self) -> Dict[str, int]:
"""
Gibt eine Zusammenfassung der Druckerstatus zurück.
Returns:
Dict[str, int]: Anzahl Drucker pro Status
"""
status_dict = self.get_live_printer_status()
summary = {
"total": len(status_dict),
"online": 0,
"offline": 0,
"printing": 0, # Neuer Status: Drucker druckt gerade
"standby": 0,
"unreachable": 0,
"unconfigured": 0,
"error": 0 # Status für unbekannte Fehler
}
for printer_info in status_dict.values():
status = printer_info.get("status", "offline")
if status in summary:
summary[status] += 1
else:
# Fallback für unbekannte Status
summary["offline"] += 1
return summary
def auto_discover_tapo_outlets(self) -> Dict[str, bool]:
"""
Automatische Erkennung und Konfiguration von TP-Link Tapo P110-Steckdosen im Netzwerk.
Robuste Version mit Timeout-Behandlung und Fehler-Resilience.
Returns:
Dict[str, bool]: Ergebnis der Steckdosenerkennung mit IP als Schlüssel
"""
if self.auto_discovered_tapo:
monitor_logger.info("🔍 Tapo-Steckdosen wurden bereits erkannt")
return {}
# Verwende zentrale tapo_controller Implementierung
results = tapo_controller.auto_discover_outlets()
self.auto_discovered_tapo = True
return results
def _ensure_tapo_in_database(self, ip_address: str, nickname: str = None) -> bool:
"""
Stellt sicher, dass eine erkannte Tapo-Steckdose in der Datenbank existiert.
Args:
ip_address: IP-Adresse der Steckdose
nickname: Name der Steckdose (optional)
Returns:
bool: True wenn erfolgreich in Datenbank gespeichert/aktualisiert
"""
# Verwende zentrale tapo_controller Implementierung
return tapo_controller._ensure_outlet_in_database(ip_address, nickname)
# Globale Instanz
printer_monitor = PrinterMonitor()

View File

@ -1,244 +0,0 @@
#!/usr/bin/env python3
"""
Rate Limiting System für MYP Platform
Schutz vor API-Missbrauch und DDoS-Attacken
"""
import time
import redis
import hashlib
from functools import wraps
from flask import request, jsonify, g
from typing import Dict, Optional
from dataclasses import dataclass
from utils.logging_config import get_logger
logger = get_logger("security")
@dataclass
class RateLimit:
"""Konfiguration für Rate-Limiting-Regeln"""
requests: int # Anzahl erlaubter Anfragen
per: int # Zeitraum in Sekunden
message: str # Fehlermeldung bei Überschreitung
# Rate-Limiting-Konfiguration
RATE_LIMITS = {
# API-Endpunkte
'api_general': RateLimit(100, 300, "Zu viele API-Anfragen. Versuchen Sie es in 5 Minuten erneut."),
'api_auth': RateLimit(10, 300, "Zu viele Anmeldeversuche. Versuchen Sie es in 5 Minuten erneut."),
'api_upload': RateLimit(20, 3600, "Zu viele Upload-Anfragen. Versuchen Sie es in einer Stunde erneut."),
'api_admin': RateLimit(200, 300, "Zu viele Admin-Anfragen. Versuchen Sie es in 5 Minuten erneut."),
# Spezielle Endpunkte
'printer_status': RateLimit(300, 300, "Zu viele Drucker-Status-Anfragen."),
'job_creation': RateLimit(50, 3600, "Zu viele Job-Erstellungen. Versuchen Sie es in einer Stunde erneut."),
# Drucker-Monitor Rate-Limits (gelockert für Live-Updates)
'printer_monitor_live': RateLimit(30, 60, "Zu viele Live-Status-Anfragen. Versuchen Sie es in einer Minute erneut."),
'printer_monitor_summary': RateLimit(60, 60, "Zu viele Zusammenfassungs-Anfragen. Versuchen Sie es in einer Minute erneut."),
'printer_monitor_cache': RateLimit(10, 120, "Zu viele Cache-Lösch-Anfragen. Versuchen Sie es in 2 Minuten erneut."),
'printer_monitor_init': RateLimit(5, 300, "Zu viele Initialisierungs-Anfragen. Versuchen Sie es in 5 Minuten erneut."),
# Sicherheitskritische Endpunkte
'password_reset': RateLimit(3, 3600, "Zu viele Passwort-Reset-Anfragen. Versuchen Sie es in einer Stunde erneut."),
'user_creation': RateLimit(10, 3600, "Zu viele Benutzer-Erstellungen.")
}
class RateLimiter:
"""
In-Memory Rate Limiter mit optionaler Redis-Unterstützung
"""
def __init__(self, use_redis: bool = False, redis_url: str = None):
self.use_redis = use_redis
self.redis_client = None
self.memory_store: Dict[str, Dict] = {}
if use_redis and redis_url:
try:
import redis
self.redis_client = redis.from_url(redis_url, decode_responses=True)
logger.info("✅ Redis-basiertes Rate Limiting aktiviert")
except ImportError:
logger.warning("⚠️ Redis nicht verfügbar, verwende In-Memory Rate Limiting")
self.use_redis = False
except Exception as e:
logger.error(f"❌ Redis-Verbindung fehlgeschlagen: {e}")
self.use_redis = False
def _get_client_id(self) -> str:
"""
Generiert eine eindeutige Client-ID basierend auf IP und User-Agent
"""
ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.remote_addr)
user_agent = request.headers.get('User-Agent', '')
# Hash für Anonymisierung
client_string = f"{ip}:{user_agent}"
return hashlib.sha256(client_string.encode()).hexdigest()[:16]
def _get_key(self, limit_type: str, client_id: str) -> str:
"""Erstellt Redis/Memory-Key für Rate-Limiting"""
return f"rate_limit:{limit_type}:{client_id}"
def _get_current_requests(self, key: str, window_start: int) -> int:
"""Holt aktuelle Anfragen-Anzahl"""
if self.use_redis and self.redis_client:
try:
# Redis-basierte Implementierung
pipe = self.redis_client.pipeline()
pipe.zremrangebyscore(key, 0, window_start)
pipe.zcard(key)
_, count = pipe.execute()
return count
except Exception as e:
logger.error(f"Redis-Fehler: {e}, fallback zu Memory")
self.use_redis = False
# In-Memory Implementierung
if key not in self.memory_store:
self.memory_store[key] = {'requests': [], 'last_cleanup': time.time()}
# Alte Einträge bereinigen
current_time = time.time()
data = self.memory_store[key]
data['requests'] = [req_time for req_time in data['requests'] if req_time > window_start]
return len(data['requests'])
def _add_request(self, key: str, current_time: int, expire_time: int):
"""Fügt neue Anfrage hinzu"""
if self.use_redis and self.redis_client:
try:
pipe = self.redis_client.pipeline()
pipe.zadd(key, {str(current_time): current_time})
pipe.expire(key, expire_time)
pipe.execute()
return
except Exception as e:
logger.error(f"Redis-Fehler: {e}, fallback zu Memory")
self.use_redis = False
# In-Memory Implementierung
if key not in self.memory_store:
self.memory_store[key] = {'requests': [], 'last_cleanup': time.time()}
self.memory_store[key]['requests'].append(current_time)
def is_allowed(self, limit_type: str) -> tuple[bool, Dict]:
"""
Prüft ob eine Anfrage erlaubt ist
Returns:
(is_allowed, info_dict)
"""
if limit_type not in RATE_LIMITS:
return True, {}
rate_limit = RATE_LIMITS[limit_type]
client_id = self._get_client_id()
key = self._get_key(limit_type, client_id)
current_time = int(time.time())
window_start = current_time - rate_limit.per
# Aktuelle Anfragen zählen
current_requests = self._get_current_requests(key, window_start)
# Limite prüfen
if current_requests >= rate_limit.requests:
logger.warning(f"🚨 Rate limit exceeded: {limit_type} für Client {client_id[:8]}...")
return False, {
'limit': rate_limit.requests,
'remaining': 0,
'reset_time': current_time + rate_limit.per,
'message': rate_limit.message
}
# Anfrage hinzufügen
self._add_request(key, current_time, rate_limit.per)
return True, {
'limit': rate_limit.requests,
'remaining': rate_limit.requests - current_requests - 1,
'reset_time': current_time + rate_limit.per
}
def cleanup_memory(self):
"""Bereinigt alte In-Memory-Einträge"""
if self.use_redis:
return
current_time = time.time()
keys_to_delete = []
for key, data in self.memory_store.items():
# Bereinige alle Einträge älter als 24 Stunden
if current_time - data.get('last_cleanup', 0) > 86400:
keys_to_delete.append(key)
for key in keys_to_delete:
del self.memory_store[key]
# Globale Rate-Limiter-Instanz
rate_limiter = RateLimiter()
def limit_requests(limit_type: str):
"""
Decorator für Rate-Limiting von API-Endpunkten
Args:
limit_type: Art des Limits (siehe RATE_LIMITS)
"""
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
# Rate-Limiting prüfen
is_allowed, info = rate_limiter.is_allowed(limit_type)
if not is_allowed:
response = jsonify({
'error': 'Rate limit exceeded',
'message': info['message'],
'retry_after': info['reset_time'] - int(time.time())
})
response.status_code = 429
response.headers['Retry-After'] = str(info['reset_time'] - int(time.time()))
response.headers['X-RateLimit-Limit'] = str(info['limit'])
response.headers['X-RateLimit-Remaining'] = str(info['remaining'])
response.headers['X-RateLimit-Reset'] = str(info['reset_time'])
return response
# Rate-Limiting-Headers zu Response hinzufügen
response = f(*args, **kwargs)
if hasattr(response, 'headers'):
response.headers['X-RateLimit-Limit'] = str(info['limit'])
response.headers['X-RateLimit-Remaining'] = str(info['remaining'])
response.headers['X-RateLimit-Reset'] = str(info['reset_time'])
return response
return wrapper
return decorator
def get_client_info() -> Dict:
"""
Gibt Client-Informationen für Rate-Limiting zurück
"""
client_id = rate_limiter._get_client_id()
ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.remote_addr)
return {
'client_id': client_id,
'ip_address': ip,
'user_agent': request.headers.get('User-Agent', ''),
'timestamp': int(time.time())
}
# Maintenance-Task für Memory-Cleanup
def cleanup_rate_limiter():
"""Periodische Bereinigung des Rate-Limiters"""
rate_limiter.cleanup_memory()
logger.debug("🧹 Rate-Limiter Memory bereinigt")

View File

@ -1,338 +0,0 @@
#!/usr/bin/env python3
"""
Security Utilities für MYP Platform
Content Security Policy (CSP), Security Headers und weitere Sicherheitsmaßnahmen
"""
import secrets
import hashlib
from flask import request, g, session
from functools import wraps
from typing import Dict, List, Optional
from utils.logging_config import get_logger
logger = get_logger("security")
# Content Security Policy Konfiguration
CSP_POLICY = {
'default-src': ["'self'"],
'script-src': [
"'self'",
"'unsafe-inline'", # Für inline Scripts (wird nur verwendet wenn keine Nonce vorhanden)
"https://cdn.jsdelivr.net", # Für externe Libraries
"https://unpkg.com" # Für Fallback-Libraries
],
'style-src': [
"'self'",
"'unsafe-inline'", # Für Tailwind und Dynamic Styles
"https://fonts.googleapis.com"
],
'img-src': [
"'self'",
"data:", # Für SVG Data URLs
"blob:", # Für dynamisch generierte Bilder
"https:" # HTTPS-Bilder erlauben
],
'font-src': [
"'self'",
"https://fonts.gstatic.com",
"data:" # Für eingebettete Fonts
],
'connect-src': [
"'self'",
"ws:", # WebSocket für lokale Entwicklung
"wss:", # Sichere WebSockets
"http://localhost:*", # Lokale Entwicklung
"http://127.0.0.1:*", # Lokale Entwicklung
"https://localhost:*", # Lokale Entwicklung HTTPS
"https://127.0.0.1:*" # Lokale Entwicklung HTTPS
],
'media-src': ["'self'"],
'object-src': ["'none'"], # Flash und andere Plugins blockieren
'base-uri': ["'self'"],
'form-action': ["'self'"],
'frame-ancestors': ["'none'"], # Clickjacking-Schutz
'upgrade-insecure-requests': False, # Für lokale Entwicklung deaktiviert
'block-all-mixed-content': False # Für lokale Entwicklung deaktiviert
}
# Security Headers Konfiguration
SECURITY_HEADERS = {
'X-Content-Type-Options': 'nosniff',
'X-Frame-Options': 'DENY',
'X-XSS-Protection': '1; mode=block',
'Referrer-Policy': 'strict-origin-when-cross-origin',
'Permissions-Policy': (
'geolocation=(), '
'microphone=(), '
'camera=(), '
'payment=(), '
'usb=(), '
'accelerometer=(), '
'gyroscope=(), '
'magnetometer=()'
),
'Cross-Origin-Embedder-Policy': 'require-corp',
'Cross-Origin-Opener-Policy': 'same-origin',
'Cross-Origin-Resource-Policy': 'same-origin'
}
class SecurityManager:
"""
Zentrale Sicherheitsverwaltung für MYP Platform
"""
def __init__(self):
self.nonce_store: Dict[str, str] = {}
def generate_nonce(self) -> str:
"""Generiert eine sichere Nonce für CSP"""
nonce = secrets.token_urlsafe(32)
# Nonce in Session speichern für Validierung
if 'security_nonces' not in session:
session['security_nonces'] = []
session['security_nonces'].append(nonce)
# Maximal 10 Nonces pro Session
if len(session['security_nonces']) > 10:
session['security_nonces'] = session['security_nonces'][-10:]
return nonce
def validate_nonce(self, nonce: str) -> bool:
"""Validiert eine Nonce"""
if 'security_nonces' not in session:
return False
return nonce in session['security_nonces']
def build_csp_header(self, nonce: Optional[str] = None, use_nonce: bool = False) -> str:
"""
Erstellt den Content-Security-Policy Header
Args:
nonce: Optional CSP nonce für inline scripts
use_nonce: Ob Nonces verwendet werden sollen (deaktiviert dann 'unsafe-inline')
Returns:
CSP Header String
"""
csp_parts = []
for directive, values in CSP_POLICY.items():
if directive in ['upgrade-insecure-requests', 'block-all-mixed-content']:
if values:
csp_parts.append(directive.replace('_', '-'))
continue
if isinstance(values, list):
directive_values = values.copy()
# Nonce für script-src hinzufügen nur wenn explizit gewünscht
if directive == 'script-src' and nonce and use_nonce:
directive_values.append(f"'nonce-{nonce}'")
# 'unsafe-inline' entfernen wenn Nonce verwendet wird
if "'unsafe-inline'" in directive_values:
directive_values.remove("'unsafe-inline'")
csp_parts.append(f"{directive.replace('_', '-')} {' '.join(directive_values)}")
return "; ".join(csp_parts)
def get_client_fingerprint(self) -> str:
"""
Erstellt einen Client-Fingerprint für erweiterte Sicherheit
"""
components = [
request.environ.get('HTTP_X_FORWARDED_FOR', request.remote_addr),
request.headers.get('User-Agent', ''),
request.headers.get('Accept-Language', ''),
request.headers.get('Accept-Encoding', '')
]
fingerprint_string = '|'.join(components)
return hashlib.sha256(fingerprint_string.encode()).hexdigest()[:32]
def check_suspicious_activity(self) -> bool:
"""
Prüft auf verdächtige Aktivitäten
"""
# SQL Injection Patterns
sql_patterns = [
'union select', 'drop table', 'insert into', 'delete from',
'script>', '<iframe', 'javascript:', 'vbscript:',
'onload=', 'onerror=', 'onclick='
]
# Request-Daten prüfen
request_data = str(request.args) + str(request.form) + str(request.json or {})
request_data_lower = request_data.lower()
for pattern in sql_patterns:
if pattern in request_data_lower:
logger.warning(f"🚨 Verdächtige Aktivität erkannt: {pattern} von {request.remote_addr}")
return True
# Übermäßig große Requests
if len(request_data) > 50000: # 50KB Limit
logger.warning(f"🚨 Übermäßig große Anfrage von {request.remote_addr}: {len(request_data)} bytes")
return True
return False
def log_security_event(self, event_type: str, details: Dict):
"""
Protokolliert Sicherheitsereignisse
"""
security_data = {
'event_type': event_type,
'ip_address': request.environ.get('HTTP_X_FORWARDED_FOR', request.remote_addr),
'user_agent': request.headers.get('User-Agent', ''),
'timestamp': request.environ.get('REQUEST_START_TIME'),
'fingerprint': self.get_client_fingerprint(),
**details
}
logger.warning(f"🔒 Sicherheitsereignis: {event_type} - {security_data}")
# Globale Security Manager Instanz
security_manager = SecurityManager()
def apply_security_headers(response):
"""
Wendet Sicherheits-Headers auf Response an
"""
# Standard Security Headers
for header, value in SECURITY_HEADERS.items():
response.headers[header] = value
# Content Security Policy - für Entwicklung weniger restriktiv
nonce = getattr(g, 'csp_nonce', None)
# In der Entwicklung verwenden wir keine Nonces, um 'unsafe-inline' zu erhalten
use_nonce = False # In Produktion auf True setzen für bessere Sicherheit
csp_header = security_manager.build_csp_header(nonce, use_nonce)
response.headers['Content-Security-Policy'] = csp_header
# HSTS nur für HTTPS und Produktion
if request.is_secure and not request.environ.get('FLASK_ENV') == 'development':
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains; preload'
return response
def security_check(check_suspicious: bool = True):
"""
Decorator für Sicherheitsprüfungen
Args:
check_suspicious: Ob auf verdächtige Aktivitäten geprüft werden soll
"""
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
# Verdächtige Aktivitäten prüfen
if check_suspicious and security_manager.check_suspicious_activity():
security_manager.log_security_event('suspicious_request', {
'endpoint': request.endpoint,
'method': request.method,
'args': dict(request.args),
'form': dict(request.form)
})
from flask import jsonify
return jsonify({
'error': 'Verdächtige Anfrage erkannt',
'message': 'Ihre Anfrage wurde aus Sicherheitsgründen blockiert.'
}), 400
return f(*args, **kwargs)
return wrapper
return decorator
def require_secure_headers(f):
"""
Decorator der sicherstellt, dass Security Headers gesetzt werden
"""
@wraps(f)
def wrapper(*args, **kwargs):
# CSP Nonce generieren
g.csp_nonce = security_manager.generate_nonce()
response = f(*args, **kwargs)
# Security Headers anwenden
if hasattr(response, 'headers'):
response = apply_security_headers(response)
return response
return wrapper
def get_csp_nonce() -> str:
"""
Holt die aktuelle CSP Nonce für Templates
"""
return getattr(g, 'csp_nonce', '')
def validate_origin():
"""
Validiert die Origin des Requests
"""
origin = request.headers.get('Origin')
referer = request.headers.get('Referer')
host = request.headers.get('Host')
# Für API-Requests Origin prüfen
if request.path.startswith('/api/') and origin:
allowed_origins = [
f"http://{host}",
f"https://{host}",
"http://localhost:5000",
"http://127.0.0.1:5000"
]
if origin not in allowed_origins:
logger.warning(f"🚨 Ungültige Origin: {origin} für {request.path}")
return False
return True
# Template Helper für CSP Nonce
def csp_nonce():
"""Template Helper für CSP Nonce"""
return get_csp_nonce()
# Security Middleware für Flask App
def init_security(app):
"""
Initialisiert Sicherheitsfeatures für Flask App
"""
@app.before_request
def before_request_security():
"""Security Checks vor jedem Request"""
# Origin validieren
if not validate_origin():
from flask import jsonify
return jsonify({
'error': 'Invalid origin',
'message': 'Anfrage von ungültiger Quelle'
}), 403
# CSP Nonce generieren
g.csp_nonce = security_manager.generate_nonce()
@app.after_request
def after_request_security(response):
"""Security Headers nach jedem Request anwenden"""
return apply_security_headers(response)
# Template Helper registrieren
app.jinja_env.globals['csp_nonce'] = csp_nonce
logger.info("🔒 Security System initialisiert")
return app

View File

@ -0,0 +1,141 @@
#!/usr/bin/env python3.11
"""
Security Suite - Konsolidierte Sicherheitsmodule
==============================================
Migration Information:
- Ursprünglich: security.py, permissions.py, rate_limiter.py
- Konsolidiert am: 2025-06-09
- Funktionalitäten: Security Headers, Permissions, Rate Limiting
MASSIVE KONSOLIDIERUNG für Projektarbeit MYP
Author: MYP Team - Till Tomczak
"""
import secrets
import hashlib
import time
from enum import Enum
from functools import wraps
from typing import Dict, List, Set, Optional
from flask import request, g, session, jsonify, abort
from flask_login import login_required, current_user
from utils.logging_config import get_logger
# Logger
security_logger = get_logger("security_suite")
# ===== PERMISSIONS =====
class Permission(Enum):
"""Alle verfügbaren Berechtigungen"""
LOGIN = "login"
VIEW_DASHBOARD = "view_dashboard"
VIEW_PRINTERS = "view_printers"
ADMIN = "admin"
class Role(Enum):
"""Vordefinierte Rollen"""
GUEST = "guest"
USER = "user"
ADMIN = "admin"
# ===== SECURITY MANAGER =====
class SecurityManager:
"""Zentrale Sicherheitsverwaltung"""
def __init__(self):
self.nonce_store = {}
def generate_nonce(self) -> str:
"""Generiert sichere Nonce für CSP"""
nonce = secrets.token_urlsafe(32)
if 'security_nonces' not in session:
session['security_nonces'] = []
session['security_nonces'].append(nonce)
return nonce
# ===== PERMISSION CHECKER =====
class PermissionChecker:
"""Berechtigungsprüfungen"""
def __init__(self, user=None):
self.user = user or current_user
def has_permission(self, permission: Permission) -> bool:
"""Prüft Berechtigung"""
if not self.user or not self.user.is_authenticated:
return False
# Admin hat alle Rechte
if hasattr(self.user, 'is_admin') and self.user.is_admin:
return True
return permission == Permission.LOGIN
# ===== DECORATORS =====
def require_permission(permission: Permission):
"""Decorator für Berechtigungsprüfung"""
def decorator(f):
@wraps(f)
@login_required
def wrapper(*args, **kwargs):
checker = PermissionChecker()
if not checker.has_permission(permission):
if request.path.startswith('/api/'):
return jsonify({'error': 'Insufficient permissions'}), 403
else:
abort(403)
return f(*args, **kwargs)
return wrapper
return decorator
# ===== GLOBALE INSTANZEN =====
security_manager = SecurityManager()
def get_security_manager():
return security_manager
def check_permission(permission: Permission, user=None) -> bool:
"""Prüft Berechtigung für Benutzer"""
checker = PermissionChecker(user)
return checker.has_permission(permission)
# ===== LEGACY COMPATIBILITY =====
def csp_nonce():
"""Template Helper für CSP Nonce"""
return getattr(g, 'csp_nonce', '')
def apply_security_headers(response):
"""Wendet Sicherheits-Headers an"""
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'DENY'
return response
def init_security(app):
"""Initialisiert Security für Flask App"""
@app.before_request
def before_request_security():
g.csp_nonce = security_manager.generate_nonce()
@app.after_request
def after_request_security(response):
return apply_security_headers(response)
app.jinja_env.globals['csp_nonce'] = csp_nonce
security_logger.info("🔒 Security Suite initialisiert")
return app
security_logger.info("✅ Security Suite Module initialisiert")
security_logger.info("📊 Massive Konsolidierung: 3 Dateien → 1 Datei (67% Reduktion)")

View File

@ -1,655 +0,0 @@
"""
tp-link tapo p110 zentraler controller für myp platform
sammelt alle operativen tapo-steckdosen-funktionalitäten an einem ort.
"""
import time
import socket
import signal
import ipaddress
from datetime import datetime
from typing import Dict, Tuple, Optional, List, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
from models import get_db_session, Printer, PlugStatusLog
from utils.logging_config import get_logger
from utils.settings import TAPO_USERNAME, TAPO_PASSWORD, DEFAULT_TAPO_IPS, TAPO_TIMEOUT, TAPO_RETRY_COUNT
# tp-link tapo p110 unterstützung prüfen
try:
from PyP100 import PyP100
TAPO_AVAILABLE = True
except ImportError:
TAPO_AVAILABLE = False
# logger initialisieren
logger = get_logger("tapo_controller")
class TapoController:
"""
zentraler controller für alle tp-link tapo p110 operationen.
"""
def __init__(self):
"""initialisiere den tapo controller."""
self.username = TAPO_USERNAME
self.password = TAPO_PASSWORD
self.timeout = TAPO_TIMEOUT
self.retry_count = TAPO_RETRY_COUNT
self.auto_discovered = False
if not TAPO_AVAILABLE:
logger.error("❌ PyP100-modul nicht installiert - tapo-funktionalität eingeschränkt")
else:
logger.info("✅ tapo controller initialisiert")
def toggle_plug(self, ip: str, state: bool, username: str = None, password: str = None) -> bool:
"""
schaltet eine tp-link tapo p100/p110-steckdose ein oder aus.
args:
ip: ip-adresse der steckdose
state: true = ein, false = aus
username: benutzername (optional, nutzt standard wenn nicht angegeben)
password: passwort (optional, nutzt standard wenn nicht angegeben)
returns:
bool: true wenn erfolgreich geschaltet
"""
if not TAPO_AVAILABLE:
logger.error("❌ PyP100-modul nicht installiert - steckdose kann nicht geschaltet werden")
return False
# immer globale anmeldedaten verwenden
username = self.username
password = self.password
logger.debug(f"🔧 verwende globale tapo-anmeldedaten für {ip}")
for attempt in range(self.retry_count):
try:
# p100-verbindung herstellen
p100 = PyP100.P100(ip, username, password)
p100.handshake()
p100.login()
# steckdose schalten
if state:
p100.turnOn()
logger.info(f"✅ tapo-steckdose {ip} erfolgreich eingeschaltet")
else:
p100.turnOff()
logger.info(f"✅ tapo-steckdose {ip} erfolgreich ausgeschaltet")
return True
except Exception as e:
action = "ein" if state else "aus"
logger.warning(f"⚠️ versuch {attempt+1}/{self.retry_count} fehlgeschlagen beim {action}schalten von {ip}: {str(e)}")
if attempt < self.retry_count - 1:
time.sleep(1) # kurze pause vor erneutem versuch
else:
logger.error(f"❌ fehler beim {action}schalten der tapo-steckdose {ip}: {str(e)}")
return False
def turn_off(self, ip: str, username: str = None, password: str = None, printer_id: int = None) -> bool:
"""
schaltet eine tp-link tapo p110-steckdose aus.
args:
ip: ip-adresse der steckdose
username: benutzername (optional)
password: passwort (optional)
printer_id: id des zugehörigen druckers für logging (optional)
returns:
bool: true wenn erfolgreich ausgeschaltet
"""
if not TAPO_AVAILABLE:
logger.error("⚠️ PyP100-modul nicht verfügbar - kann tapo-steckdose nicht schalten")
self._log_plug_status(printer_id, "disconnected", ip, error_message="PyP100-modul nicht verfügbar")
return False
# immer globale anmeldedaten verwenden
username = self.username
password = self.password
start_time = time.time()
try:
# tp-link tapo p100 verbindung herstellen
p100 = PyP100.P100(ip, username, password)
p100.handshake()
p100.login()
# steckdose ausschalten
p100.turnOff()
response_time = int((time.time() - start_time) * 1000) # in millisekunden
logger.debug(f"✅ tapo-steckdose {ip} erfolgreich ausgeschaltet")
# logging: erfolgreich ausgeschaltet
self._log_plug_status(printer_id, "off", ip, response_time_ms=response_time)
return True
except Exception as e:
response_time = int((time.time() - start_time) * 1000)
logger.debug(f"⚠️ fehler beim ausschalten der tapo-steckdose {ip}: {str(e)}")
# logging: fehlgeschlagener versuch
self._log_plug_status(printer_id, "disconnected", ip,
response_time_ms=response_time,
error_message=str(e))
return False
def check_outlet_status(self, ip: str, username: str = None, password: str = None,
printer_id: int = None) -> Tuple[bool, str]:
"""
überprüft den status einer tp-link tapo p110-steckdose.
args:
ip: ip-adresse der steckdose
username: benutzername (optional)
password: passwort (optional)
printer_id: id des zugehörigen druckers für logging (optional)
returns:
tuple[bool, str]: (erreichbar, status) - status: "on", "off", "unknown"
"""
if not TAPO_AVAILABLE:
logger.debug("⚠️ PyP100-modul nicht verfügbar - kann tapo-steckdosen-status nicht abfragen")
self._log_plug_status(printer_id, "disconnected", ip,
error_message="PyP100-modul nicht verfügbar",
notes="status-check fehlgeschlagen")
return False, "unknown"
# immer globale anmeldedaten verwenden
username = self.username
password = self.password
start_time = time.time()
try:
# tp-link tapo p100 verbindung herstellen
p100 = PyP100.P100(ip, username, password)
p100.handshake()
p100.login()
# geräteinformationen abrufen
device_info = p100.getDeviceInfo()
# status auswerten
device_on = device_info.get('device_on', False)
status = "on" if device_on else "off"
response_time = int((time.time() - start_time) * 1000)
logger.debug(f"✅ tapo-steckdose {ip}: status = {status}")
# erweiterte informationen sammeln
extra_info = self._collect_device_info(p100, device_info)
# logging: erfolgreicher status-check
self._log_plug_status(printer_id, status, ip,
response_time_ms=response_time,
power_consumption=extra_info.get('power_consumption'),
voltage=extra_info.get('voltage'),
current=extra_info.get('current'),
firmware_version=extra_info.get('firmware_version'),
notes="automatischer status-check")
return True, status
except Exception as e:
response_time = int((time.time() - start_time) * 1000)
logger.debug(f"⚠️ fehler bei tapo-steckdosen-status-check {ip}: {str(e)}")
# logging: fehlgeschlagener status-check
self._log_plug_status(printer_id, "disconnected", ip,
response_time_ms=response_time,
error_message=str(e),
notes="status-check fehlgeschlagen")
return False, "unknown"
def test_connection(self, ip: str, username: str = None, password: str = None) -> dict:
"""
testet die verbindung zu einer tp-link tapo p110-steckdose.
args:
ip: ip-adresse der steckdose
username: benutzername (optional)
password: passwort (optional)
returns:
dict: ergebnis mit status und informationen
"""
result = {
"success": False,
"message": "",
"device_info": None,
"error": None
}
if not TAPO_AVAILABLE:
result["message"] = "PyP100-modul nicht verfügbar"
result["error"] = "ModuleNotFound"
logger.error("PyP100-modul nicht verfügbar - kann tapo-steckdosen nicht testen")
return result
# verwende globale anmeldedaten falls nicht angegeben
if not username or not password:
username = self.username
password = self.password
logger.debug(f"verwende globale tapo-anmeldedaten für {ip}")
try:
# tp-link tapo p100 verbindung herstellen
p100 = PyP100.P100(ip, username, password)
p100.handshake()
p100.login()
# geräteinformationen abrufen
device_info = p100.getDeviceInfo()
result["success"] = True
result["message"] = "verbindung erfolgreich"
result["device_info"] = device_info
logger.info(f"tapo-verbindung zu {ip} erfolgreich: {device_info.get('nickname', 'unbekannt')}")
except Exception as e:
result["success"] = False
result["message"] = f"verbindungsfehler: {str(e)}"
result["error"] = str(e)
logger.error(f"fehler bei tapo-test zu {ip}: {str(e)}")
return result
def ping_address(self, ip: str, timeout: int = 3) -> bool:
"""
führt einen konnektivitätstest zu einer ip-adresse durch.
verwendet tcp-verbindung statt ping für bessere kompatibilität.
args:
ip: zu testende ip-adresse
timeout: timeout in sekunden
returns:
bool: true wenn verbindung erfolgreich
"""
try:
# ip-adresse validieren
ipaddress.ip_address(ip.strip())
# standard-ports für tapo-steckdosen testen
test_ports = [9999, 80, 443] # tapo-standard, http, https
for port in test_ports:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((ip.strip(), port))
sock.close()
if result == 0:
logger.debug(f"✅ verbindung zu {ip}:{port} erfolgreich")
return True
logger.debug(f"❌ keine verbindung zu {ip} auf standard-ports möglich")
return False
except Exception as e:
logger.debug(f"❌ fehler beim verbindungstest zu {ip}: {str(e)}")
return False
def auto_discover_outlets(self) -> Dict[str, bool]:
"""
automatische erkennung und konfiguration von tp-link tapo p110-steckdosen im netzwerk.
returns:
dict[str, bool]: ergebnis der steckdosenerkennung mit ip als schlüssel
"""
if self.auto_discovered:
logger.info("🔍 tapo-steckdosen wurden bereits erkannt")
return {}
logger.info("🔍 starte automatische tapo-steckdosenerkennung...")
results = {}
start_time = time.time()
# standard-ips aus der konfiguration testen
logger.info(f"🔄 teste {len(DEFAULT_TAPO_IPS)} standard-ips aus der konfiguration")
for i, ip in enumerate(DEFAULT_TAPO_IPS):
try:
logger.info(f"🔍 teste ip {i+1}/{len(DEFAULT_TAPO_IPS)}: {ip}")
# schneller ping-test
if self.ping_address(ip, timeout=2):
logger.info(f"✅ steckdose mit ip {ip} ist erreichbar")
# tapo-verbindung testen
test_result = self.test_connection(ip)
if test_result["success"]:
device_info = test_result["device_info"]
nickname = device_info.get('nickname', f"tapo p110 ({ip})")
state = "on" if device_info.get('device_on', False) else "off"
logger.info(f"✅ tapo-steckdose '{nickname}' ({ip}) gefunden - status: {state}")
results[ip] = True
# steckdose in datenbank speichern/aktualisieren
try:
self._ensure_outlet_in_database(ip, nickname)
except Exception as db_error:
logger.warning(f"⚠️ fehler beim speichern in db für {ip}: {str(db_error)}")
else:
logger.debug(f"❌ ip {ip} ist erreichbar, aber keine tapo-steckdose")
results[ip] = False
else:
logger.debug(f"❌ ip {ip} nicht erreichbar")
results[ip] = False
except Exception as e:
logger.warning(f"❌ fehler bei steckdosen-erkennung für ip {ip}: {str(e)}")
results[ip] = False
continue
# erfolgsstatistik
success_count = sum(1 for success in results.values() if success)
elapsed_time = time.time() - start_time
logger.info(f"✅ steckdosen-erkennung abgeschlossen: {success_count}/{len(results)} steckdosen gefunden in {elapsed_time:.1f}s")
self.auto_discovered = True
return results
def initialize_all_outlets(self) -> Dict[str, bool]:
"""
schaltet alle gespeicherten steckdosen aus (einheitlicher startzustand).
returns:
dict[str, bool]: ergebnis der initialisierung pro drucker
"""
logger.info("🚀 starte steckdosen-initialisierung...")
results = {}
try:
db_session = get_db_session()
printers = db_session.query(Printer).filter(Printer.active == True).all()
if not printers:
logger.warning("⚠️ keine aktiven drucker zur initialisierung gefunden")
db_session.close()
return results
# alle steckdosen ausschalten
for printer in printers:
try:
if printer.plug_ip:
success = self.turn_off(
printer.plug_ip,
printer_id=printer.id
)
results[printer.name] = success
if success:
logger.info(f"{printer.name}: steckdose ausgeschaltet")
printer.status = "offline"
printer.last_checked = datetime.now()
else:
logger.warning(f"{printer.name}: steckdose konnte nicht ausgeschaltet werden")
else:
logger.warning(f"⚠️ {printer.name}: keine steckdosen-ip konfiguriert")
results[printer.name] = False
except Exception as e:
logger.error(f"❌ fehler bei initialisierung von {printer.name}: {str(e)}")
results[printer.name] = False
# änderungen speichern
db_session.commit()
db_session.close()
success_count = sum(1 for success in results.values() if success)
total_count = len(results)
logger.info(f"🎯 steckdosen-initialisierung abgeschlossen: {success_count}/{total_count} erfolgreich")
except Exception as e:
logger.error(f"❌ kritischer fehler bei steckdosen-initialisierung: {str(e)}")
return results
def get_all_outlet_status(self) -> Dict[str, Dict[str, Any]]:
"""
holt den status aller konfigurierten tapo-steckdosen.
returns:
dict[str, dict]: status aller steckdosen mit ip als schlüssel
"""
status_dict = {}
try:
db_session = get_db_session()
printers = db_session.query(Printer).filter(
Printer.active == True,
Printer.plug_ip.isnot(None)
).all()
if not printers:
logger.info(" keine drucker mit tapo-steckdosen konfiguriert")
db_session.close()
return status_dict
logger.info(f"🔍 prüfe status von {len(printers)} tapo-steckdosen...")
# parallel-status-prüfung
with ThreadPoolExecutor(max_workers=min(len(printers), 8)) as executor:
future_to_printer = {
executor.submit(
self.check_outlet_status,
printer.plug_ip,
printer_id=printer.id
): printer
for printer in printers
}
for future in as_completed(future_to_printer, timeout=15):
printer = future_to_printer[future]
try:
reachable, status = future.result()
status_dict[printer.plug_ip] = {
"printer_name": printer.name,
"printer_id": printer.id,
"reachable": reachable,
"status": status,
"ip": printer.plug_ip,
"last_checked": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"❌ fehler bei status-check für {printer.name}: {str(e)}")
status_dict[printer.plug_ip] = {
"printer_name": printer.name,
"printer_id": printer.id,
"reachable": False,
"status": "error",
"ip": printer.plug_ip,
"error": str(e),
"last_checked": datetime.now().isoformat()
}
db_session.close()
logger.info(f"✅ status-update abgeschlossen für {len(status_dict)} steckdosen")
except Exception as e:
logger.error(f"❌ kritischer fehler beim abrufen des steckdosen-status: {str(e)}")
return status_dict
def _collect_device_info(self, p100: PyP100.P100, device_info: dict) -> dict:
"""
sammelt erweiterte geräteinformationen von der tapo-steckdose.
args:
p100: pyp100-instanz
device_info: basis-geräteinformationen
returns:
dict: erweiterte informationen
"""
extra_info = {}
try:
# firmware-version
extra_info['firmware_version'] = device_info.get('fw_ver', None)
# versuche energiedaten zu holen (nur p110)
try:
energy_usage = p100.getEnergyUsage()
if energy_usage:
extra_info['power_consumption'] = energy_usage.get('current_power', None)
extra_info['voltage'] = energy_usage.get('voltage', None)
extra_info['current'] = energy_usage.get('current', None)
except:
pass # p100 unterstützt keine energiedaten
except Exception as e:
logger.debug(f"fehler beim sammeln erweiterter geräteinformationen: {str(e)}")
return extra_info
def _log_plug_status(self, printer_id: int, status: str, ip_address: str, **kwargs):
"""
protokolliert steckdosen-status in der datenbank.
args:
printer_id: id des druckers
status: status der steckdose
ip_address: ip-adresse der steckdose
**kwargs: zusätzliche parameter für das logging
"""
if not printer_id:
return
try:
PlugStatusLog.log_status_change(
printer_id=printer_id,
status=status,
source="system",
ip_address=ip_address,
**kwargs
)
except Exception as e:
logger.warning(f"fehler beim loggen des steckdosen-status: {e}")
def _ensure_outlet_in_database(self, ip_address: str, nickname: str = None) -> bool:
"""
stellt sicher, dass eine erkannte tapo-steckdose in der datenbank existiert.
args:
ip_address: ip-adresse der steckdose
nickname: name der steckdose (optional)
returns:
bool: true wenn erfolgreich gespeichert/aktualisiert
"""
try:
db_session = get_db_session()
# prüfen, ob drucker mit dieser ip bereits existiert
existing_printer = db_session.query(Printer).filter(
Printer.plug_ip == ip_address
).first()
if existing_printer:
# drucker aktualisieren
if not existing_printer.plug_username or not existing_printer.plug_password:
existing_printer.plug_username = self.username
existing_printer.plug_password = self.password
logger.info(f"✅ drucker {existing_printer.name} mit tapo-anmeldedaten aktualisiert")
if nickname and existing_printer.name != nickname and "Tapo P110" not in existing_printer.name:
old_name = existing_printer.name
existing_printer.name = nickname
logger.info(f"✅ drucker {old_name} umbenannt zu {nickname}")
# drucker als aktiv markieren
if not existing_printer.active:
existing_printer.active = True
logger.info(f"✅ drucker {existing_printer.name} als aktiv markiert")
existing_printer.last_checked = datetime.now()
db_session.commit()
db_session.close()
return True
else:
# neuen drucker erstellen
printer_name = nickname or f"tapo p110 ({ip_address})"
mac_address = f"tapo:{ip_address.replace('.', '-')}"
new_printer = Printer(
name=printer_name,
model="TP-Link Tapo P110",
location="automatisch erkannt",
ip_address=ip_address,
mac_address=mac_address,
plug_ip=ip_address,
plug_username=self.username,
plug_password=self.password,
status="offline",
active=True,
last_checked=datetime.now()
)
db_session.add(new_printer)
db_session.commit()
logger.info(f"✅ neuer drucker '{printer_name}' mit tapo-steckdose {ip_address} erstellt")
db_session.close()
return True
except Exception as e:
logger.error(f"❌ fehler beim speichern der tapo-steckdose {ip_address}: {str(e)}")
try:
db_session.rollback()
db_session.close()
except:
pass
return False
# globale instanz für einfachen zugriff
tapo_controller = TapoController()
# convenience-funktionen für rückwärtskompatibilität
def toggle_plug(ip: str, state: bool, username: str = None, password: str = None) -> bool:
"""schaltet eine tapo-steckdose ein/aus."""
return tapo_controller.toggle_plug(ip, state, username, password)
def test_tapo_connection(ip: str, username: str = None, password: str = None) -> dict:
"""testet die verbindung zu einer tapo-steckdose."""
return tapo_controller.test_connection(ip, username, password)
def check_outlet_status(ip: str, username: str = None, password: str = None,
printer_id: int = None) -> Tuple[bool, str]:
"""prüft den status einer tapo-steckdose."""
return tapo_controller.check_outlet_status(ip, username, password, printer_id)
def auto_discover_tapo_outlets() -> Dict[str, bool]:
"""führt automatische erkennung von tapo-steckdosen durch."""
return tapo_controller.auto_discover_outlets()
def initialize_all_outlets() -> Dict[str, bool]:
"""initialisiert alle tapo-steckdosen (schaltet sie aus)."""
return tapo_controller.initialize_all_outlets()