import os import logging import threading import time from datetime import datetime, timedelta from typing import Optional, List, Dict, Any from contextlib import contextmanager from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, ForeignKey, Float, event, text, Text from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import relationship, sessionmaker, Session, Mapped, mapped_column, scoped_session from sqlalchemy.pool import StaticPool, QueuePool from sqlalchemy.engine import Engine from flask_login import UserMixin import bcrypt import secrets from config.settings import DATABASE_PATH, ensure_database_directory from utils.logging_config import get_logger # ===== DATABASE CLEANUP INTEGRATION ===== # Importiere den neuen Cleanup-Manager try: from utils.database_cleanup import get_cleanup_manager CLEANUP_MANAGER_AVAILABLE = True except ImportError: CLEANUP_MANAGER_AVAILABLE = False logger = get_logger("app") logger.warning("DatabaseCleanupManager nicht verfügbar - Fallback auf Legacy-Cleanup") Base = declarative_base() logger = get_logger("app") # Thread-lokale Session-Factory für sichere Concurrent-Zugriffe _session_factory = None _scoped_session = None _engine = None _connection_pool_lock = threading.Lock() # Cache für häufig abgerufene Daten _cache = {} _cache_lock = threading.Lock() _cache_ttl = {} # Time-to-live für Cache-Einträge # Alle exportierten Modelle __all__ = ['User', 'Printer', 'Job', 'Stats', 'SystemLog', 'Base', 'GuestRequest', 'UserPermission', 'Notification', 'JobOrder', 'SystemTimer', 'PlugStatusLog', 'init_db', 'init_database', 'create_initial_admin', 'get_db_session', 'get_cached_session', 'clear_cache', 'engine'] # ===== DATENBANK-KONFIGURATION MIT WAL UND OPTIMIERUNGEN ===== def configure_sqlite_for_production(dbapi_connection, _connection_record): """ Konfiguriert SQLite für Produktionsumgebung mit WAL-Modus und Optimierungen. """ cursor = dbapi_connection.cursor() # WAL-Modus aktivieren (Write-Ahead Logging) cursor.execute("PRAGMA journal_mode=WAL") # Synchronous-Modus für bessere Performance bei WAL cursor.execute("PRAGMA synchronous=NORMAL") # Cache-Größe erhöhen (in KB, negative Werte = KB) cursor.execute("PRAGMA cache_size=-64000") # 64MB Cache # Memory-mapped I/O aktivieren cursor.execute("PRAGMA mmap_size=268435456") # 256MB # Temp-Store im Memory cursor.execute("PRAGMA temp_store=MEMORY") # Optimierungen für bessere Performance cursor.execute("PRAGMA optimize") # Foreign Key Constraints aktivieren cursor.execute("PRAGMA foreign_keys=ON") # Auto-Vacuum für automatische Speicherbereinigung cursor.execute("PRAGMA auto_vacuum=INCREMENTAL") # Busy Timeout für Concurrent Access cursor.execute("PRAGMA busy_timeout=30000") # 30 Sekunden # Checkpoint-Intervall für WAL cursor.execute("PRAGMA wal_autocheckpoint=1000") # ===== RASPBERRY PI SPEZIFISCHE OPTIMIERUNGEN ===== # Reduzierte Cache-Größe für schwache Hardware cursor.execute("PRAGMA cache_size=-32000") # 32MB statt 64MB für Pi # Kleinere Memory-mapped I/O für SD-Karten cursor.execute("PRAGMA mmap_size=134217728") # 128MB statt 256MB # Weniger aggressive Vacuum-Einstellungen cursor.execute("PRAGMA auto_vacuum=INCREMENTAL") cursor.execute("PRAGMA incremental_vacuum(10)") # Nur 10 Seiten pro Mal # Optimierungen für SD-Karten I/O cursor.execute("PRAGMA page_size=4096") # Optimal für SD-Karten cursor.execute("PRAGMA temp_store=MEMORY") # Temp im RAM cursor.execute("PRAGMA locking_mode=NORMAL") # Normale Sperrung # Query Planner Optimierung cursor.execute("PRAGMA optimize=0x10002") # Aggressive Optimierung # Reduzierte WAL-Datei-Größe für Pi cursor.execute("PRAGMA journal_size_limit=32768000") # 32MB WAL-Limit cursor.close() logger.info("SQLite für Raspberry Pi optimiert (reduzierte Cache-Größe, SD-Karten I/O)") def create_optimized_engine(): """ Erstellt eine optimierte SQLite-Engine mit korrekten SQLite-spezifischen Parametern. """ global _engine if _engine is not None: return _engine with _connection_pool_lock: if _engine is not None: return _engine ensure_database_directory() # Connection String mit optimierten Parametern connection_string = f"sqlite:///{DATABASE_PATH}" # Engine mit SQLite-spezifischen Parametern (ohne Server-DB Pool-Parameter) _engine = create_engine( connection_string, # SQLite-spezifische Pool-Konfiguration poolclass=StaticPool, pool_pre_ping=True, pool_recycle=7200, # Recycling-Zeit (für SQLite sinnvoll) connect_args={ "check_same_thread": False, "timeout": 45, # Längerer Timeout für SD-Karten "isolation_level": None, # Raspberry Pi spezifische SQLite-Einstellungen "cached_statements": 100, # Reduzierte Statement-Cache }, echo=False, # Performance-optimierte Execution-Optionen für Pi execution_options={ "autocommit": False, "compiled_cache": {}, # Statement-Kompilierung cachen } # Entfernt: pool_size, max_overflow, pool_timeout (nicht für SQLite/StaticPool) ) # Event-Listener für SQLite-Optimierungen event.listen(_engine, "connect", configure_sqlite_for_production) # Regelmäßige Wartungsaufgaben event.listen(_engine, "connect", lambda conn, rec: schedule_maintenance()) # ===== CLEANUP MANAGER INTEGRATION ===== # Registriere Engine beim Cleanup-Manager für sicheres Shutdown if CLEANUP_MANAGER_AVAILABLE: try: cleanup_manager = get_cleanup_manager() cleanup_manager.register_engine(_engine) logger.debug("Engine beim DatabaseCleanupManager registriert") except Exception as e: logger.warning(f"Fehler bei Cleanup-Manager-Registrierung: {e}") logger.info(f"Optimierte SQLite-Engine erstellt: {DATABASE_PATH}") return _engine def schedule_maintenance(): """ Plant regelmäßige Wartungsaufgaben für die Datenbank. """ def maintenance_worker(): time.sleep(300) # 5 Minuten warten while True: try: with get_maintenance_session() as session: # WAL-Checkpoint ausführen (aggressive Strategie) checkpoint_result = session.execute(text("PRAGMA wal_checkpoint(TRUNCATE)")).fetchone() # Nur loggen wenn tatsächlich Daten übertragen wurden if checkpoint_result and checkpoint_result[1] > 0: logger.info(f"WAL-Checkpoint: {checkpoint_result[1]} Seiten übertragen, {checkpoint_result[2]} Seiten zurückgesetzt") # Statistiken aktualisieren (alle 30 Minuten) session.execute(text("ANALYZE")) # Incremental Vacuum (alle 60 Minuten) session.execute(text("PRAGMA incremental_vacuum")) session.commit() except Exception as e: logger.error(f"Fehler bei Datenbank-Wartung: {str(e)}") # Warte 30 Minuten bis zur nächsten Wartung time.sleep(1800) # Wartung in separatem Thread ausführen maintenance_thread = threading.Thread(target=maintenance_worker, daemon=True) maintenance_thread.start() def get_session_factory(): """ Gibt die Thread-sichere Session-Factory zurück. """ global _session_factory, _scoped_session if _session_factory is None: with _connection_pool_lock: if _session_factory is None: engine = create_optimized_engine() _session_factory = sessionmaker( bind=engine, autoflush=True, autocommit=False, expire_on_commit=False # Objekte nach Commit nicht expiren ) _scoped_session = scoped_session(_session_factory) return _scoped_session @contextmanager def get_maintenance_session(): """ Context Manager für Wartungs-Sessions. """ engine = create_optimized_engine() session = sessionmaker(bind=engine)() try: yield session except Exception as e: session.rollback() raise e finally: session.close() # ===== CACHING-SYSTEM ===== def get_cache_key(model_class: str, identifier: Any, extra: str = "") -> str: """ Generiert einen Cache-Schlüssel. """ return f"{model_class}:{identifier}:{extra}" def set_cache(key: str, value: Any, ttl_seconds: int = 300): """ Setzt einen Wert im Cache mit TTL. """ with _cache_lock: _cache[key] = value _cache_ttl[key] = time.time() + ttl_seconds def get_cache(key: str) -> Optional[Any]: """ Holt einen Wert aus dem Cache. """ with _cache_lock: if key in _cache: if key in _cache_ttl and time.time() > _cache_ttl[key]: # Cache-Eintrag abgelaufen del _cache[key] del _cache_ttl[key] return None return _cache[key] return None def clear_cache(pattern: str = None): """ Löscht Cache-Einträge (optional mit Pattern). """ with _cache_lock: if pattern is None: _cache.clear() _cache_ttl.clear() else: keys_to_delete = [k for k in _cache.keys() if pattern in k] for key in keys_to_delete: del _cache[key] if key in _cache_ttl: del _cache_ttl[key] def invalidate_model_cache(model_class: str, identifier: Any = None): """ Invalidiert Cache-Einträge für ein bestimmtes Modell. """ if identifier is not None: pattern = f"{model_class}:{identifier}" else: pattern = f"{model_class}:" clear_cache(pattern) # ===== ERWEITERTE SESSION-VERWALTUNG ===== @contextmanager def get_cached_session(): """ Context Manager für gecachte Sessions mit automatischem Rollback. """ session_factory = get_session_factory() session = session_factory() try: yield session session.commit() except Exception as e: session.rollback() logger.error(f"Datenbank-Transaktion fehlgeschlagen: {str(e)}") raise e finally: session.close() def get_db_session() -> Session: """ Gibt eine neue Datenbank-Session zurück (Legacy-Kompatibilität). """ session_factory = get_session_factory() return session_factory() # ===== MODELL-DEFINITIONEN ===== class User(UserMixin, Base): __tablename__ = "users" id = Column(Integer, primary_key=True) email = Column(String(120), unique=True, nullable=False) username = Column(String(100), unique=True, nullable=False) # Füge username hinzu für login password_hash = Column(String(128), nullable=False) name = Column(String(100), nullable=False) role = Column(String(20), default="user") # "admin" oder "user" active = Column(Boolean, default=True) # Für Flask-Login is_active created_at = Column(DateTime, default=datetime.now) last_login = Column(DateTime, nullable=True) # Letzter Login-Zeitstempel updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) # Automatische Aktualisierung settings = Column(Text, nullable=True) # JSON-String für Benutzereinstellungen last_activity = Column(DateTime, default=datetime.now) # Zusätzliche Profil-Felder für bessere Benutzerverwaltung department = Column(String(100), nullable=True) # Abteilung position = Column(String(100), nullable=True) # Position/Rolle im Unternehmen phone = Column(String(50), nullable=True) # Telefonnummer bio = Column(Text, nullable=True) # Kurze Beschreibung/Bio jobs = relationship("Job", back_populates="user", foreign_keys="Job.user_id", cascade="all, delete-orphan") owned_jobs = relationship("Job", foreign_keys="Job.owner_id", overlaps="owner") permissions = relationship("UserPermission", back_populates="user", uselist=False, cascade="all, delete-orphan") notifications = relationship("Notification", back_populates="user", cascade="all, delete-orphan") def set_password(self, password: str) -> None: password_bytes = password.encode('utf-8') salt = bcrypt.gensalt() self.password_hash = bcrypt.hashpw(password_bytes, salt).decode('utf-8') # Cache invalidieren invalidate_model_cache("User", self.id) def check_password(self, password: str) -> bool: password_bytes = password.encode('utf-8') hash_bytes = self.password_hash.encode('utf-8') return bcrypt.checkpw(password_bytes, hash_bytes) @property def is_admin(self) -> bool: return self.role == "admin" @property def is_active(self) -> bool: """Required for Flask-Login""" return self.active def get_id(self) -> str: """Required for Flask-Login - return user id as unicode string""" return str(self.id) def to_dict(self) -> dict: # Cache-Key für User-Dict cache_key = get_cache_key("User", self.id, "dict") cached_result = get_cache(cache_key) if cached_result is not None: return cached_result result = { "id": self.id, "email": self.email, "username": self.username, "name": self.name, "role": self.role, "active": self.active, "created_at": self.created_at.isoformat() if self.created_at else None, "last_login": self.last_login.isoformat() if self.last_login else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None, "settings": self.settings, "department": self.department, "position": self.position, "phone": self.phone, "last_login": self.last_login.isoformat() if self.last_login else None } # Ergebnis cachen (5 Minuten) set_cache(cache_key, result, 300) return result @classmethod def get_by_username_or_email(cls, identifier: str) -> Optional['User']: """ Holt einen Benutzer anhand von Username oder E-Mail mit Caching. """ cache_key = get_cache_key("User", identifier, "login") cached_user = get_cache(cache_key) if cached_user is not None: return cached_user with get_cached_session() as session: user = session.query(cls).filter( (cls.username == identifier) | (cls.email == identifier) ).first() if user: # User für 10 Minuten cachen set_cache(cache_key, user, 600) return user @classmethod def get_by_id_cached(cls, user_id: int) -> Optional['User']: """ Holt einen Benutzer anhand der ID mit Caching. """ cache_key = get_cache_key("User", user_id, "id") cached_user = get_cache(cache_key) if cached_user is not None: return cached_user with get_cached_session() as session: user = session.query(cls).filter(cls.id == user_id).first() if user: # User für 10 Minuten cachen set_cache(cache_key, user, 600) return user def update_last_login(self): """ Aktualisiert den letzten Login-Zeitstempel. """ self.last_login = datetime.now() # Cache invalidieren invalidate_model_cache("User", self.id) class Printer(Base): __tablename__ = "printers" id = Column(Integer, primary_key=True) name = Column(String(100), nullable=False) model = Column(String(100)) # Drucker-Modell location = Column(String(100)) ip_address = Column(String(50)) # IP-Adresse des Druckers mac_address = Column(String(50), nullable=False, unique=True) plug_ip = Column(String(50), nullable=False) plug_username = Column(String(100), nullable=False) plug_password = Column(String(100), nullable=False) status = Column(String(20), default="offline") # online, offline, busy, idle active = Column(Boolean, default=True) created_at = Column(DateTime, default=datetime.now) last_checked = Column(DateTime, nullable=True) # Zeitstempel der letzten Status-Überprüfung jobs = relationship("Job", back_populates="printer", cascade="all, delete-orphan") def to_dict(self) -> dict: # Cache-Key für Printer-Dict cache_key = get_cache_key("Printer", self.id, "dict") cached_result = get_cache(cache_key) if cached_result is not None: return cached_result result = { "id": self.id, "name": self.name, "model": self.model, "location": self.location, "ip_address": self.ip_address, "mac_address": self.mac_address, "plug_ip": self.plug_ip, "status": self.status, "active": self.active, "created_at": self.created_at.isoformat() if self.created_at else None, "last_checked": self.last_checked.isoformat() if self.last_checked else None } # Ergebnis cachen (2 Minuten für Drucker-Status) set_cache(cache_key, result, 120) return result def update_status(self, new_status: str, active: bool = None): """ Aktualisiert den Drucker-Status und invalidiert den Cache. """ self.status = new_status self.last_checked = datetime.now() if active is not None: self.active = active # Cache invalidieren invalidate_model_cache("Printer", self.id) @classmethod def get_all_cached(cls) -> List['Printer']: """ Holt alle Drucker mit Caching. """ cache_key = get_cache_key("Printer", "all", "list") cached_printers = get_cache(cache_key) if cached_printers is not None: return cached_printers with get_cached_session() as session: printers = session.query(cls).all() # Drucker für 5 Minuten cachen set_cache(cache_key, printers, 300) return printers @classmethod def get_online_printers(cls) -> List['Printer']: """ Holt alle online Drucker mit Caching. """ cache_key = get_cache_key("Printer", "online", "list") cached_printers = get_cache(cache_key) if cached_printers is not None: return cached_printers with get_cached_session() as session: printers = session.query(cls).filter( cls.status.in_(["online", "available", "idle"]) ).all() # Online-Drucker für 1 Minute cachen (häufiger aktualisiert) set_cache(cache_key, printers, 60) return printers class Job(Base): __tablename__ = "jobs" id = Column(Integer, primary_key=True) name = Column(String(200), nullable=False) description = Column(String(500)) # Beschreibung des Jobs user_id = Column(Integer, ForeignKey("users.id"), nullable=False) printer_id = Column(Integer, ForeignKey("printers.id"), nullable=False) start_at = Column(DateTime) end_at = Column(DateTime) actual_end_time = Column(DateTime) status = Column(String(20), default="scheduled") # scheduled|running|finished|aborted created_at = Column(DateTime, default=datetime.now) notes = Column(String(500)) material_used = Column(Float) # in Gramm file_path = Column(String(500), nullable=True) owner_id = Column(Integer, ForeignKey("users.id"), nullable=True) duration_minutes = Column(Integer, nullable=False) # Dauer in Minuten user = relationship("User", back_populates="jobs", foreign_keys=[user_id]) owner = relationship("User", foreign_keys=[owner_id], overlaps="owned_jobs") printer = relationship("Printer", back_populates="jobs") def to_dict(self) -> dict: # Cache-Key für Job-Dict cache_key = get_cache_key("Job", self.id, "dict") cached_result = get_cache(cache_key) if cached_result is not None: return cached_result result = { "id": self.id, "name": self.name, "description": self.description, "user_id": self.user_id, "printer_id": self.printer_id, "start_at": self.start_at.isoformat() if self.start_at else None, "end_at": self.end_at.isoformat() if self.end_at else None, "actual_end_time": self.actual_end_time.isoformat() if self.actual_end_time else None, "status": self.status, "created_at": self.created_at.isoformat() if self.created_at else None, "notes": self.notes, "material_used": self.material_used, "file_path": self.file_path, "owner_id": self.owner_id, "duration_minutes": self.duration_minutes, "user": self.user.to_dict() if self.user else None, "printer": self.printer.to_dict() if self.printer else None } # Ergebnis cachen (3 Minuten für Jobs) set_cache(cache_key, result, 180) return result def update_status(self, new_status: str): """ Aktualisiert den Job-Status und invalidiert den Cache. """ self.status = new_status if new_status in ["finished", "failed", "cancelled"]: self.actual_end_time = datetime.now() # Cache invalidieren invalidate_model_cache("Job", self.id) # Auch User- und Printer-Caches invalidieren invalidate_model_cache("User", self.user_id) invalidate_model_cache("Printer", self.printer_id) @classmethod def get_active_jobs(cls) -> List['Job']: """ Holt alle aktiven Jobs mit Caching. """ cache_key = get_cache_key("Job", "active", "list") cached_jobs = get_cache(cache_key) if cached_jobs is not None: return cached_jobs with get_cached_session() as session: jobs = session.query(cls).filter( cls.status.in_(["scheduled", "running"]) ).all() # Aktive Jobs für 30 Sekunden cachen (häufig aktualisiert) set_cache(cache_key, jobs, 30) return jobs @classmethod def get_user_jobs(cls, user_id: int) -> List['Job']: """ Holt alle Jobs eines Benutzers mit Caching. """ cache_key = get_cache_key("Job", f"user_{user_id}", "list") cached_jobs = get_cache(cache_key) if cached_jobs is not None: return cached_jobs with get_cached_session() as session: jobs = session.query(cls).filter(cls.user_id == user_id).all() # Benutzer-Jobs für 5 Minuten cachen set_cache(cache_key, jobs, 300) return jobs class Stats(Base): __tablename__ = "stats" id = Column(Integer, primary_key=True) total_print_time = Column(Integer, default=0) # in Sekunden total_jobs_completed = Column(Integer, default=0) total_material_used = Column(Float, default=0.0) # in Gramm last_updated = Column(DateTime, default=datetime.now) def to_dict(self) -> dict: # Cache-Key für Stats-Dict cache_key = get_cache_key("Stats", self.id, "dict") cached_result = get_cache(cache_key) if cached_result is not None: return cached_result result = { "id": self.id, "total_print_time": self.total_print_time, "total_jobs_completed": self.total_jobs_completed, "total_material_used": self.total_material_used, "last_updated": self.last_updated.isoformat() if self.last_updated else None } # Statistiken für 10 Minuten cachen set_cache(cache_key, result, 600) return result class SystemLog(Base): """System-Log Modell für Logging von System-Events""" __tablename__ = "system_logs" id = Column(Integer, primary_key=True) timestamp = Column(DateTime, default=datetime.now, nullable=False) level = Column(String(20), nullable=False) # DEBUG, INFO, WARNING, ERROR, CRITICAL message = Column(String(1000), nullable=False) module = Column(String(100)) # Welches Modul/Blueprint den Log erstellt hat user_id = Column(Integer, ForeignKey("users.id"), nullable=True) # Optional: welcher User ip_address = Column(String(50)) # Optional: IP-Adresse user_agent = Column(String(500)) # Optional: Browser/Client Info user = relationship("User", foreign_keys=[user_id]) def to_dict(self) -> dict: return { "id": self.id, "timestamp": self.timestamp.isoformat() if self.timestamp else None, "level": self.level, "message": self.message, "module": self.module, "user_id": self.user_id, "ip_address": self.ip_address, "user_agent": self.user_agent, "user": self.user.to_dict() if self.user else None } @classmethod def log_system_event(cls, level: str, message: str, module: str = None, user_id: int = None, ip_address: str = None, user_agent: str = None) -> 'SystemLog': """ Hilfsmethode zum Erstellen eines System-Log-Eintrags Args: level: Log-Level (DEBUG, INFO, WARNING, ERROR, CRITICAL) message: Log-Nachricht module: Optional - Modul/Blueprint Name user_id: Optional - Benutzer-ID ip_address: Optional - IP-Adresse user_agent: Optional - User-Agent String Returns: SystemLog: Das erstellte Log-Objekt """ return cls( level=level.upper(), message=message, module=module, user_id=user_id, ip_address=ip_address, user_agent=user_agent ) class UserPermission(Base): """ Berechtigungen für Benutzer. """ __tablename__ = "user_permissions" user_id = Column(Integer, ForeignKey("users.id"), primary_key=True) can_start_jobs = Column(Boolean, default=False) needs_approval = Column(Boolean, default=True) can_approve_jobs = Column(Boolean, default=False) user = relationship("User", back_populates="permissions") def to_dict(self) -> dict: """ Konvertiert die Benutzerberechtigungen in ein Dictionary. """ return { "user_id": self.user_id, "can_start_jobs": self.can_start_jobs, "needs_approval": self.needs_approval, "can_approve_jobs": self.can_approve_jobs } class Notification(Base): """ Benachrichtigungen für Benutzer. """ __tablename__ = "notifications" id = Column(Integer, primary_key=True) user_id = Column(Integer, ForeignKey("users.id"), nullable=False) type = Column(String(50), nullable=False) payload = Column(Text) # JSON-Daten als String created_at = Column(DateTime, default=datetime.now) read = Column(Boolean, default=False) user = relationship("User", back_populates="notifications") def to_dict(self) -> dict: """ Konvertiert die Benachrichtigung in ein Dictionary. """ return { "id": self.id, "user_id": self.user_id, "type": self.type, "payload": self.payload, "created_at": self.created_at.isoformat() if self.created_at else None, "read": self.read } @classmethod def create_for_approvers(cls, notification_type: str, payload: dict): """ Erstellt Benachrichtigungen für alle Benutzer mit can_approve_jobs-Berechtigung. Args: notification_type: Art der Benachrichtigung payload: Daten für die Benachrichtigung als Dictionary """ import json payload_json = json.dumps(payload) with get_cached_session() as session: # Alle Benutzer mit can_approve_jobs-Berechtigung finden approvers = session.query(User).join(UserPermission).filter( UserPermission.can_approve_jobs == True ).all() # Benachrichtigungen für alle Genehmiger erstellen for approver in approvers: notification = cls( user_id=approver.id, type=notification_type, payload=payload_json ) session.add(notification) session.commit() class GuestRequest(Base): """ Gastanfragen für Druckaufträge. """ __tablename__ = "guest_requests" id = Column(Integer, primary_key=True) name = Column(String(100), nullable=False) email = Column(String(120)) reason = Column(Text) duration_min = Column(Integer) # Bestehend - wird für Backward-Kompatibilität beibehalten duration_minutes = Column(Integer) # Neu hinzugefügt für API-Kompatibilität created_at = Column(DateTime, default=datetime.now) status = Column(String(20), default="pending") # pending|approved|denied printer_id = Column(Integer, ForeignKey("printers.id")) otp_code = Column(String(100), nullable=True) # Hash des OTP-Codes job_id = Column(Integer, ForeignKey("jobs.id"), nullable=True) author_ip = Column(String(50)) otp_used_at = Column(DateTime, nullable=True) # Zeitpunkt der OTP-Verwendung # Erweiterte Attribute für Datei-Management file_name = Column(String(255), nullable=True) # Name der hochgeladenen Datei file_path = Column(String(500), nullable=True) # Pfad zur hochgeladenen Datei copies = Column(Integer, default=1) # Anzahl der Kopien # Neue Felder für Admin-Verwaltung processed_by = Column(Integer, ForeignKey("users.id"), nullable=True) # Admin der die Anfrage bearbeitet hat processed_at = Column(DateTime, nullable=True) # Zeitpunkt der Bearbeitung approval_notes = Column(Text, nullable=True) # Notizen bei Genehmigung rejection_reason = Column(Text, nullable=True) # Grund bei Ablehnung updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) # Automatische Aktualisierung # Zusätzliche Zeitstempel für bessere Verwaltung approved_at = Column(DateTime, nullable=True) # Zeitpunkt der Genehmigung rejected_at = Column(DateTime, nullable=True) # Zeitpunkt der Ablehnung approved_by = Column(Integer, ForeignKey("users.id"), nullable=True) # Admin der genehmigt hat rejected_by = Column(Integer, ForeignKey("users.id"), nullable=True) # Admin der abgelehnt hat # OTP-Verwaltung erweitert otp_expires_at = Column(DateTime, nullable=True) # Ablaufzeit des OTP-Codes assigned_printer_id = Column(Integer, ForeignKey("printers.id"), nullable=True) # Zugewiesener Drucker # Beziehungen printer = relationship("Printer", foreign_keys=[printer_id]) assigned_printer = relationship("Printer", foreign_keys=[assigned_printer_id]) job = relationship("Job") processed_by_user = relationship("User", foreign_keys=[processed_by]) # Admin der bearbeitet hat approved_by_user = relationship("User", foreign_keys=[approved_by]) # Admin der genehmigt hat rejected_by_user = relationship("User", foreign_keys=[rejected_by]) # Admin der abgelehnt hat def to_dict(self) -> dict: # Cache-Key für GuestRequest-Dict cache_key = get_cache_key("GuestRequest", self.id, "dict") cached_result = get_cache(cache_key) if cached_result is not None: return cached_result result = { "id": self.id, "name": self.name, "email": self.email, "reason": self.reason, "duration_min": self.duration_min, "duration_minutes": self.duration_minutes, "created_at": self.created_at.isoformat() if self.created_at else None, "status": self.status, "printer_id": self.printer_id, "job_id": self.job_id, "author_ip": self.author_ip, "otp_used_at": self.otp_used_at.isoformat() if self.otp_used_at else None, "file_name": self.file_name, "file_path": self.file_path, "copies": self.copies, "processed_by": self.processed_by, "processed_at": self.processed_at.isoformat() if self.processed_at else None, "approval_notes": self.approval_notes, "rejection_reason": self.rejection_reason, "updated_at": self.updated_at.isoformat() if self.updated_at else None, "approved_at": self.approved_at.isoformat() if self.approved_at else None, "rejected_at": self.rejected_at.isoformat() if self.rejected_at else None, "approved_by": self.approved_by, "rejected_by": self.rejected_by, "otp_expires_at": self.otp_expires_at.isoformat() if self.otp_expires_at else None, "assigned_printer_id": self.assigned_printer_id, } # Ergebnis cachen (5 Minuten) set_cache(cache_key, result, 300) return result def generate_otp(self) -> str: """ Generiert einen neuen OTP-Code und speichert den Hash. """ otp_plain = secrets.token_hex(8) # 16-stelliger hexadezimaler Code # Hash des OTP-Codes speichern otp_bytes = otp_plain.encode('utf-8') salt = bcrypt.gensalt() self.otp_code = bcrypt.hashpw(otp_bytes, salt).decode('utf-8') logger.info(f"OTP generiert für Guest Request {self.id}") # Cache invalidieren invalidate_model_cache("GuestRequest", self.id) return otp_plain def verify_otp(self, otp_plain: str) -> bool: """ Verifiziert einen OTP-Code. """ if not self.otp_code or not otp_plain: return False try: otp_bytes = otp_plain.encode('utf-8') hash_bytes = self.otp_code.encode('utf-8') is_valid = bcrypt.checkpw(otp_bytes, hash_bytes) if is_valid: self.otp_used_at = datetime.now() logger.info(f"OTP erfolgreich verifiziert für Guest Request {self.id}") # Cache invalidieren invalidate_model_cache("GuestRequest", self.id) else: logger.warning(f"Ungültiger OTP-Code für Guest Request {self.id}") return is_valid except Exception as e: logger.error(f"Fehler bei OTP-Verifizierung: {str(e)}") return False class JobOrder(Base): """ Job-Reihenfolge für Drucker im Drag & Drop System. Speichert die benutzerdefinierte Reihenfolge der Jobs pro Drucker. """ __tablename__ = "job_orders" id = Column(Integer, primary_key=True) printer_id = Column(Integer, ForeignKey("printers.id"), nullable=False) job_id = Column(Integer, ForeignKey("jobs.id"), nullable=False) order_position = Column(Integer, nullable=False) # Position in der Reihenfolge (0-basiert) created_at = Column(DateTime, default=datetime.now) updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) last_modified_by = Column(Integer, ForeignKey("users.id"), nullable=True) # Wer die Reihenfolge geändert hat # Beziehungen printer = relationship("Printer", foreign_keys=[printer_id]) job = relationship("Job", foreign_keys=[job_id]) modified_by_user = relationship("User", foreign_keys=[last_modified_by]) # Eindeutige Kombination: Ein Job kann nur eine Position pro Drucker haben __table_args__ = ( # Hier könnten Constraints definiert werden ) def to_dict(self) -> dict: return { "id": self.id, "printer_id": self.printer_id, "job_id": self.job_id, "order_position": self.order_position, "created_at": self.created_at.isoformat() if self.created_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None, "last_modified_by": self.last_modified_by, "printer": self.printer.to_dict() if self.printer else None, "job": self.job.to_dict() if self.job else None } @classmethod def get_order_for_printer(cls, printer_id: int) -> List['JobOrder']: """ Holt die Job-Reihenfolge für einen bestimmten Drucker. """ cache_key = get_cache_key("JobOrder", printer_id, "printer_order") cached_order = get_cache(cache_key) if cached_order is not None: return cached_order with get_cached_session() as session: order = session.query(cls).filter( cls.printer_id == printer_id ).order_by(cls.order_position.asc()).all() # Ergebnis für 5 Minuten cachen set_cache(cache_key, order, 300) return order @classmethod def update_printer_order(cls, printer_id: int, job_ids: List[int], modified_by_user_id: int = None) -> bool: """ Aktualisiert die Job-Reihenfolge für einen Drucker. """ try: with get_cached_session() as session: # Bestehende Reihenfolge für diesen Drucker löschen session.query(cls).filter(cls.printer_id == printer_id).delete() # Neue Reihenfolge erstellen for position, job_id in enumerate(job_ids): order = cls( printer_id=printer_id, job_id=job_id, order_position=position, last_modified_by=modified_by_user_id ) session.add(order) session.commit() # Cache invalidieren invalidate_model_cache("JobOrder", printer_id) return True except Exception as e: logger.error(f"Fehler beim Aktualisieren der Job-Reihenfolge: {str(e)}") return False @classmethod def get_ordered_job_ids(cls, printer_id: int) -> List[int]: """ Holt die geordneten Job-IDs für einen bestimmten Drucker. """ cache_key = get_cache_key("JobOrder", printer_id, "ordered_ids") cached_ids = get_cache(cache_key) if cached_ids is not None: return cached_ids orders = cls.get_order_for_printer(printer_id) job_ids = [order.job_id for order in orders] # Ergebnis für 5 Minuten cachen set_cache(cache_key, job_ids, 300) return job_ids @classmethod def remove_job_from_orders(cls, job_id: int): """ Entfernt einen Job aus allen Reihenfolgen (wenn Job gelöscht wird). """ try: with get_cached_session() as session: # Job aus allen Reihenfolgen entfernen affected_printers = session.query(cls.printer_id).filter( cls.job_id == job_id ).distinct().all() session.query(cls).filter(cls.job_id == job_id).delete() # Positionen neu arrangieren für betroffene Drucker for (printer_id,) in affected_printers: remaining_orders = session.query(cls).filter( cls.printer_id == printer_id ).order_by(cls.order_position.asc()).all() # Positionen neu vergeben for new_position, order in enumerate(remaining_orders): order.order_position = new_position # Cache für diesen Drucker invalidieren invalidate_model_cache("JobOrder", printer_id) session.commit() except Exception as e: logger.error(f"Fehler beim Entfernen des Jobs aus Reihenfolgen: {str(e)}") @classmethod def cleanup_invalid_orders(cls): """ Bereinigt ungültige Reihenfolgen-Einträge (Jobs/Drucker die nicht mehr existieren). """ try: with get_cached_session() as session: # Finde Reihenfolgen mit nicht-existierenden Jobs invalid_job_orders = session.query(cls).outerjoin( Job, cls.job_id == Job.id ).filter(Job.id.is_(None)).all() # Finde Reihenfolgen mit nicht-existierenden Druckern invalid_printer_orders = session.query(cls).outerjoin( Printer, cls.printer_id == Printer.id ).filter(Printer.id.is_(None)).all() # Alle ungültigen Einträge löschen for order in invalid_job_orders + invalid_printer_orders: session.delete(order) session.commit() # Kompletten Cache leeren für Cleanup clear_cache() logger.info(f"Bereinigung: {len(invalid_job_orders + invalid_printer_orders)} ungültige Reihenfolgen-Einträge entfernt") except Exception as e: logger.error(f"Fehler bei der Bereinigung der Job-Reihenfolgen: {str(e)}") class SystemTimer(Base): """ System-Timer für Countdown-Zähler mit Force-Quit-Funktionalität. Unterstützt verschiedene Timer-Typen für Kiosk, Sessions, Jobs, etc. """ __tablename__ = "system_timers" id = Column(Integer, primary_key=True) name = Column(String(100), nullable=False) # Eindeutiger Name des Timers timer_type = Column(String(50), nullable=False) # kiosk, session, job, system, maintenance duration_seconds = Column(Integer, nullable=False) # Timer-Dauer in Sekunden remaining_seconds = Column(Integer, nullable=False) # Verbleibende Sekunden target_timestamp = Column(DateTime, nullable=False) # Ziel-Zeitstempel wann Timer abläuft # Timer-Status und Kontrolle status = Column(String(20), default="stopped") # stopped, running, paused, expired, force_quit auto_start = Column(Boolean, default=False) # Automatischer Start nach Erstellung auto_restart = Column(Boolean, default=False) # Automatischer Neustart nach Ablauf # Force-Quit-Konfiguration force_quit_enabled = Column(Boolean, default=True) # Force-Quit aktiviert force_quit_action = Column(String(50), default="logout") # logout, restart, shutdown, custom force_quit_warning_seconds = Column(Integer, default=30) # Warnung X Sekunden vor Force-Quit # Zusätzliche Konfiguration show_warning = Column(Boolean, default=True) # Warnung anzeigen warning_message = Column(Text, nullable=True) # Benutzerdefinierte Warnung custom_action_endpoint = Column(String(200), nullable=True) # Custom API-Endpoint für Force-Quit # Verwaltung und Tracking created_by = Column(Integer, ForeignKey("users.id"), nullable=True) # Ersteller (optional für System-Timer) created_at = Column(DateTime, default=datetime.now) updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) last_activity = Column(DateTime, default=datetime.now) # Letzte Aktivität (für Session-Timer) # Kontext-spezifische Felder context_id = Column(Integer, nullable=True) # Job-ID, Session-ID, etc. context_data = Column(Text, nullable=True) # JSON-String für zusätzliche Kontext-Daten # Statistiken start_count = Column(Integer, default=0) # Wie oft wurde der Timer gestartet force_quit_count = Column(Integer, default=0) # Wie oft wurde Force-Quit ausgeführt # Beziehungen created_by_user = relationship("User", foreign_keys=[created_by]) def to_dict(self) -> dict: """ Konvertiert den Timer zu einem Dictionary für API-Responses. """ # Cache-Key für Timer-Dict cache_key = get_cache_key("SystemTimer", self.id, "dict") cached_result = get_cache(cache_key) if cached_result is not None: return cached_result # Berechne aktuelle verbleibende Zeit current_remaining = self.get_current_remaining_seconds() result = { "id": self.id, "name": self.name, "timer_type": self.timer_type, "duration_seconds": self.duration_seconds, "remaining_seconds": current_remaining, "target_timestamp": self.target_timestamp.isoformat() if self.target_timestamp else None, "status": self.status, "auto_start": self.auto_start, "auto_restart": self.auto_restart, "force_quit_enabled": self.force_quit_enabled, "force_quit_action": self.force_quit_action, "force_quit_warning_seconds": self.force_quit_warning_seconds, "show_warning": self.show_warning, "warning_message": self.warning_message, "custom_action_endpoint": self.custom_action_endpoint, "created_by": self.created_by, "created_at": self.created_at.isoformat() if self.created_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None, "last_activity": self.last_activity.isoformat() if self.last_activity else None, "context_id": self.context_id, "context_data": self.context_data, "start_count": self.start_count, "force_quit_count": self.force_quit_count, # Berechnete Felder "is_running": self.is_running(), "is_expired": self.is_expired(), "should_show_warning": self.should_show_warning(), "progress_percentage": self.get_progress_percentage() } # Ergebnis für 10 Sekunden cachen (kurz wegen sich ändernder Zeit) set_cache(cache_key, result, 10) return result def get_current_remaining_seconds(self) -> int: """ Berechnet die aktuell verbleibenden Sekunden basierend auf dem Ziel-Zeitstempel. """ if self.status != "running": return self.remaining_seconds now = datetime.now() if now >= self.target_timestamp: return 0 remaining = int((self.target_timestamp - now).total_seconds()) return max(0, remaining) def is_running(self) -> bool: """ Prüft ob der Timer aktuell läuft. """ return self.status == "running" def is_expired(self) -> bool: """ Prüft ob der Timer abgelaufen ist. """ return self.status == "expired" or self.get_current_remaining_seconds() <= 0 def should_show_warning(self) -> bool: """ Prüft ob eine Warnung angezeigt werden soll. """ if not self.show_warning or not self.is_running(): return False remaining = self.get_current_remaining_seconds() return remaining <= self.force_quit_warning_seconds and remaining > 0 def get_progress_percentage(self) -> float: """ Berechnet den Fortschritt in Prozent (0.0 bis 100.0). """ if self.duration_seconds <= 0: return 100.0 elapsed = self.duration_seconds - self.get_current_remaining_seconds() return min(100.0, max(0.0, (elapsed / self.duration_seconds) * 100.0)) def start_timer(self) -> bool: """ Startet den Timer. """ try: if self.status == "running": return True # Bereits laufend now = datetime.now() self.target_timestamp = now + timedelta(seconds=self.remaining_seconds) self.status = "running" self.last_activity = now self.start_count += 1 self.updated_at = now # Cache invalidieren invalidate_model_cache("SystemTimer", self.id) logger.info(f"Timer '{self.name}' gestartet - läuft für {self.remaining_seconds} Sekunden") return True except Exception as e: logger.error(f"Fehler beim Starten des Timers '{self.name}': {str(e)}") return False def pause_timer(self) -> bool: """ Pausiert den Timer. """ try: if self.status != "running": return False # Verbleibende Zeit aktualisieren self.remaining_seconds = self.get_current_remaining_seconds() self.status = "paused" self.updated_at = datetime.now() # Cache invalidieren invalidate_model_cache("SystemTimer", self.id) logger.info(f"Timer '{self.name}' pausiert - {self.remaining_seconds} Sekunden verbleiben") return True except Exception as e: logger.error(f"Fehler beim Pausieren des Timers '{self.name}': {str(e)}") return False def stop_timer(self) -> bool: """ Stoppt den Timer. """ try: self.status = "stopped" self.remaining_seconds = self.duration_seconds # Zurücksetzen self.updated_at = datetime.now() # Cache invalidieren invalidate_model_cache("SystemTimer", self.id) logger.info(f"Timer '{self.name}' gestoppt und zurückgesetzt") return True except Exception as e: logger.error(f"Fehler beim Stoppen des Timers '{self.name}': {str(e)}") return False def reset_timer(self) -> bool: """ Setzt den Timer auf die ursprüngliche Dauer zurück. """ try: self.remaining_seconds = self.duration_seconds if self.status == "running": # Neu berechnen wenn laufend now = datetime.now() self.target_timestamp = now + timedelta(seconds=self.duration_seconds) self.updated_at = datetime.now() # Cache invalidieren invalidate_model_cache("SystemTimer", self.id) logger.info(f"Timer '{self.name}' zurückgesetzt auf {self.duration_seconds} Sekunden") return True except Exception as e: logger.error(f"Fehler beim Zurücksetzen des Timers '{self.name}': {str(e)}") return False def extend_timer(self, additional_seconds: int) -> bool: """ Verlängert den Timer um zusätzliche Sekunden. """ try: if additional_seconds <= 0: return False self.duration_seconds += additional_seconds self.remaining_seconds += additional_seconds if self.status == "running": # Ziel-Zeitstempel aktualisieren self.target_timestamp = self.target_timestamp + timedelta(seconds=additional_seconds) self.updated_at = datetime.now() # Cache invalidieren invalidate_model_cache("SystemTimer", self.id) logger.info(f"Timer '{self.name}' um {additional_seconds} Sekunden verlängert") return True except Exception as e: logger.error(f"Fehler beim Verlängern des Timers '{self.name}': {str(e)}") return False def force_quit_execute(self) -> bool: """ Führt die Force-Quit-Aktion aus. """ try: if not self.force_quit_enabled: logger.warning(f"Force-Quit für Timer '{self.name}' ist deaktiviert") return False self.status = "force_quit" self.force_quit_count += 1 self.updated_at = datetime.now() # Cache invalidieren invalidate_model_cache("SystemTimer", self.id) logger.warning(f"Force-Quit für Timer '{self.name}' ausgeführt - Aktion: {self.force_quit_action}") return True except Exception as e: logger.error(f"Fehler beim Force-Quit des Timers '{self.name}': {str(e)}") return False def update_activity(self) -> bool: """ Aktualisiert die letzte Aktivität (für Session-Timer). """ try: self.last_activity = datetime.now() self.updated_at = datetime.now() # Cache invalidieren invalidate_model_cache("SystemTimer", self.id) return True except Exception as e: logger.error(f"Fehler beim Aktualisieren der Aktivität für Timer '{self.name}': {str(e)}") return False @classmethod def get_by_name(cls, name: str) -> Optional['SystemTimer']: """ Holt einen Timer anhand des Namens. """ cache_key = get_cache_key("SystemTimer", name, "by_name") cached_timer = get_cache(cache_key) if cached_timer is not None: return cached_timer with get_cached_session() as session: timer = session.query(cls).filter(cls.name == name).first() if timer: # Timer für 5 Minuten cachen set_cache(cache_key, timer, 300) return timer @classmethod def get_by_type(cls, timer_type: str) -> List['SystemTimer']: """ Holt alle Timer eines bestimmten Typs. """ cache_key = get_cache_key("SystemTimer", timer_type, "by_type") cached_timers = get_cache(cache_key) if cached_timers is not None: return cached_timers with get_cached_session() as session: timers = session.query(cls).filter(cls.timer_type == timer_type).all() # Timer für 2 Minuten cachen set_cache(cache_key, timers, 120) return timers @classmethod def get_running_timers(cls) -> List['SystemTimer']: """ Holt alle aktuell laufenden Timer. """ cache_key = get_cache_key("SystemTimer", "all", "running") cached_timers = get_cache(cache_key) if cached_timers is not None: return cached_timers with get_cached_session() as session: timers = session.query(cls).filter(cls.status == "running").all() # Nur 30 Sekunden cachen wegen sich ändernder Zeiten set_cache(cache_key, timers, 30) return timers @classmethod def get_expired_timers(cls) -> List['SystemTimer']: """ Holt alle abgelaufenen Timer die Force-Quit-Aktionen benötigen. """ with get_cached_session() as session: now = datetime.now() # Timer die laufen aber abgelaufen sind expired_timers = session.query(cls).filter( cls.status == "running", cls.target_timestamp <= now, cls.force_quit_enabled == True ).all() return expired_timers @classmethod def cleanup_expired_timers(cls) -> int: """ Bereinigt abgelaufene Timer und führt Force-Quit-Aktionen aus. """ try: expired_timers = cls.get_expired_timers() cleanup_count = 0 for timer in expired_timers: if timer.force_quit_execute(): cleanup_count += 1 if cleanup_count > 0: # Cache für alle Timer invalidieren clear_cache("SystemTimer") logger.info(f"Cleanup: {cleanup_count} abgelaufene Timer verarbeitet") return cleanup_count except Exception as e: logger.error(f"Fehler beim Cleanup abgelaufener Timer: {str(e)}") return 0 @classmethod def create_kiosk_timer(cls, duration_minutes: int = 30, auto_start: bool = True) -> Optional['SystemTimer']: """ Erstellt einen Standard-Kiosk-Timer. """ try: with get_cached_session() as session: # Prüfe ob bereits ein Kiosk-Timer existiert existing = session.query(cls).filter( cls.timer_type == "kiosk", cls.name == "kiosk_session" ).first() if existing: # Bestehenden Timer aktualisieren existing.duration_seconds = duration_minutes * 60 existing.remaining_seconds = duration_minutes * 60 existing.auto_start = auto_start existing.updated_at = datetime.now() if auto_start and existing.status != "running": existing.start_timer() # Cache invalidieren invalidate_model_cache("SystemTimer", existing.id) session.commit() return existing # Neuen Timer erstellen timer = cls( name="kiosk_session", timer_type="kiosk", duration_seconds=duration_minutes * 60, remaining_seconds=duration_minutes * 60, auto_start=auto_start, force_quit_enabled=True, force_quit_action="logout", force_quit_warning_seconds=30, show_warning=True, warning_message="Kiosk-Session läuft ab. Bitte speichern Sie Ihre Arbeit.", target_timestamp=datetime.now() + timedelta(minutes=duration_minutes) ) session.add(timer) session.commit() if auto_start: timer.start_timer() logger.info(f"Kiosk-Timer erstellt: {duration_minutes} Minuten") return timer except Exception as e: logger.error(f"Fehler beim Erstellen des Kiosk-Timers: {str(e)}") return None class PlugStatusLog(Base): """ Logging-System für Steckdosen-Status Monitoring. Protokolliert alle Zustandsänderungen der Smart Plugs (TAPO). """ __tablename__ = "plug_status_logs" id = Column(Integer, primary_key=True) printer_id = Column(Integer, ForeignKey("printers.id"), nullable=False) status = Column(String(20), nullable=False) # 'connected', 'disconnected', 'on', 'off' timestamp = Column(DateTime, default=datetime.now, nullable=False) # Zusätzliche Monitoring-Daten ip_address = Column(String(50), nullable=True) # IP der Steckdose/des Druckers power_consumption = Column(Float, nullable=True) # Stromverbrauch in Watt (falls verfügbar) voltage = Column(Float, nullable=True) # Spannung in Volt (falls verfügbar) current = Column(Float, nullable=True) # Stromstärke in Ampere (falls verfügbar) # Monitoring-Kontext source = Column(String(50), default="system") # 'system', 'manual', 'api', 'scheduler' user_id = Column(Integer, ForeignKey("users.id"), nullable=True) # Bei manueller Änderung notes = Column(Text, nullable=True) # Zusätzliche Notizen oder Fehlerinfos # Technische Details response_time_ms = Column(Integer, nullable=True) # Antwortzeit der Steckdose in ms error_message = Column(Text, nullable=True) # Fehlermeldung bei Verbindungsproblemen firmware_version = Column(String(50), nullable=True) # Firmware-Version der Steckdose # Beziehungen printer = relationship("Printer", foreign_keys=[printer_id]) user = relationship("User", foreign_keys=[user_id]) def to_dict(self) -> dict: """ Konvertiert das PlugStatusLog-Objekt in ein Dictionary. """ cache_key = get_cache_key("PlugStatusLog", self.id, "dict") cached_result = get_cache(cache_key) if cached_result is not None: return cached_result result = { "id": self.id, "printer_id": self.printer_id, "printer_name": self.printer.name if self.printer else None, "status": self.status, "timestamp": self.timestamp.isoformat() if self.timestamp else None, "ip_address": self.ip_address, "power_consumption": self.power_consumption, "voltage": self.voltage, "current": self.current, "source": self.source, "user_id": self.user_id, "user_name": self.user.name if self.user else None, "notes": self.notes, "response_time_ms": self.response_time_ms, "error_message": self.error_message, "firmware_version": self.firmware_version } # Ergebnis cachen (5 Minuten) set_cache(cache_key, result, 300) return result @classmethod def log_status_change(cls, printer_id: int, status: str, source: str = "system", user_id: int = None, ip_address: str = None, power_consumption: float = None, voltage: float = None, current: float = None, notes: str = None, response_time_ms: int = None, error_message: str = None, firmware_version: str = None) -> 'PlugStatusLog': """ Erstellt einen neuen Status-Log-Eintrag für eine Steckdose. Args: printer_id: ID des zugehörigen Druckers status: Status der Steckdose ('connected', 'disconnected', 'on', 'off') source: Quelle der Statusänderung ('system', 'manual', 'api', 'scheduler') user_id: ID des Benutzers (bei manueller Änderung) ip_address: IP-Adresse der Steckdose power_consumption: Stromverbrauch in Watt voltage: Spannung in Volt current: Stromstärke in Ampere notes: Zusätzliche Notizen response_time_ms: Antwortzeit in Millisekunden error_message: Fehlermeldung bei Problemen firmware_version: Firmware-Version der Steckdose Returns: Das erstellte PlugStatusLog-Objekt """ try: with get_cached_session() as session: log_entry = cls( printer_id=printer_id, status=status, ip_address=ip_address, power_consumption=power_consumption, voltage=voltage, current=current, source=source, user_id=user_id, notes=notes, response_time_ms=response_time_ms, error_message=error_message, firmware_version=firmware_version ) session.add(log_entry) session.commit() # Cache invalidieren invalidate_model_cache("PlugStatusLog") logger.info(f"Steckdosen-Status geloggt: Drucker {printer_id}, Status: {status}, Quelle: {source}") return log_entry except Exception as e: logger.error(f"Fehler beim Loggen des Steckdosen-Status: {str(e)}") raise e @classmethod def get_printer_history(cls, printer_id: int, hours: int = 24) -> List['PlugStatusLog']: """ Holt die Steckdosen-Historie für einen bestimmten Drucker. Args: printer_id: ID des Druckers hours: Anzahl der Stunden zurück (Standard: 24) Returns: Liste der PlugStatusLog-Einträge """ cache_key = get_cache_key("PlugStatusLog", printer_id, f"history_{hours}h") cached_result = get_cache(cache_key) if cached_result is not None: return cached_result try: with get_cached_session() as session: cutoff_time = datetime.now() - timedelta(hours=hours) logs = session.query(cls)\ .filter(cls.printer_id == printer_id)\ .filter(cls.timestamp >= cutoff_time)\ .order_by(cls.timestamp.desc())\ .all() # Ergebnis cachen (10 Minuten) set_cache(cache_key, logs, 600) return logs except Exception as e: logger.error(f"Fehler beim Abrufen der Steckdosen-Historie: {str(e)}") return [] @classmethod def get_all_recent_logs(cls, hours: int = 24, limit: int = 1000) -> List['PlugStatusLog']: """ Holt alle aktuellen Steckdosen-Logs für die Administrator-Übersicht. Args: hours: Anzahl der Stunden zurück (Standard: 24) limit: Maximale Anzahl der Einträge (Standard: 1000) Returns: Liste der PlugStatusLog-Einträge """ cache_key = get_cache_key("PlugStatusLog", "all", f"recent_{hours}h_{limit}") cached_result = get_cache(cache_key) if cached_result is not None: return cached_result try: with get_cached_session() as session: cutoff_time = datetime.now() - timedelta(hours=hours) logs = session.query(cls)\ .filter(cls.timestamp >= cutoff_time)\ .order_by(cls.timestamp.desc())\ .limit(limit)\ .all() # Ergebnis cachen (5 Minuten für Admin-Übersicht) set_cache(cache_key, logs, 300) return logs except Exception as e: logger.error(f"Fehler beim Abrufen der aktuellen Steckdosen-Logs: {str(e)}") return [] @classmethod def get_status_statistics(cls, hours: int = 24) -> Dict[str, Any]: """ Erstellt Statistiken über Steckdosen-Status für einen Zeitraum. Args: hours: Anzahl der Stunden zurück (Standard: 24) Returns: Dictionary mit Statistiken """ cache_key = get_cache_key("PlugStatusLog", "stats", f"{hours}h") cached_result = get_cache(cache_key) if cached_result is not None: return cached_result try: with get_cached_session() as session: cutoff_time = datetime.now() - timedelta(hours=hours) # Gesamtanzahl der Logs total_logs = session.query(cls)\ .filter(cls.timestamp >= cutoff_time)\ .count() # Status-Verteilung status_counts = session.query(cls.status, func.count(cls.id))\ .filter(cls.timestamp >= cutoff_time)\ .group_by(cls.status)\ .all() # Drucker mit den meisten Statusänderungen printer_counts = session.query(cls.printer_id, func.count(cls.id))\ .filter(cls.timestamp >= cutoff_time)\ .group_by(cls.printer_id)\ .order_by(func.count(cls.id).desc())\ .limit(10)\ .all() # Durchschnittliche Antwortzeit avg_response_time = session.query(func.avg(cls.response_time_ms))\ .filter(cls.timestamp >= cutoff_time)\ .filter(cls.response_time_ms.isnot(None))\ .scalar() # Fehlerrate error_count = session.query(cls)\ .filter(cls.timestamp >= cutoff_time)\ .filter(cls.error_message.isnot(None))\ .count() stats = { "total_logs": total_logs, "status_distribution": dict(status_counts), "top_printers": dict(printer_counts), "average_response_time_ms": float(avg_response_time) if avg_response_time else None, "error_count": error_count, "error_rate": (error_count / total_logs * 100) if total_logs > 0 else 0, "timeframe_hours": hours, "generated_at": datetime.now().isoformat() } # Ergebnis cachen (10 Minuten) set_cache(cache_key, stats, 600) return stats except Exception as e: logger.error(f"Fehler beim Erstellen der Steckdosen-Statistiken: {str(e)}") return { "total_logs": 0, "status_distribution": {}, "top_printers": {}, "average_response_time_ms": None, "error_count": 0, "error_rate": 0, "timeframe_hours": hours, "generated_at": datetime.now().isoformat(), "error": str(e) } @classmethod def cleanup_old_logs(cls, days: int = 30) -> int: """ Bereinigt alte Steckdosen-Logs (älter als X Tage). Args: days: Anzahl der Tage (Standard: 30) Returns: Anzahl der gelöschten Einträge """ try: with get_cached_session() as session: cutoff_date = datetime.now() - timedelta(days=days) deleted_count = session.query(cls)\ .filter(cls.timestamp < cutoff_date)\ .delete() session.commit() # Cache invalidieren invalidate_model_cache("PlugStatusLog") logger.info(f"Steckdosen-Logs bereinigt: {deleted_count} Einträge gelöscht (älter als {days} Tage)") return deleted_count except Exception as e: logger.error(f"Fehler beim Bereinigen der Steckdosen-Logs: {str(e)}") return 0 # ===== DATENBANK-INITIALISIERUNG MIT OPTIMIERUNGEN ===== def init_db() -> None: """Initialisiert die Datenbank und erstellt alle Tabellen mit Optimierungen.""" ensure_database_directory() engine = create_optimized_engine() # Tabellen erstellen Base.metadata.create_all(engine) # Indizes für bessere Performance erstellen with engine.connect() as conn: # Index für User-Login conn.execute(text(""" CREATE INDEX IF NOT EXISTS idx_users_username_email ON users(username, email) """)) # Index für Job-Status und Zeiten conn.execute(text(""" CREATE INDEX IF NOT EXISTS idx_jobs_status_times ON jobs(status, start_at, end_at) """)) # Index für Printer-Status conn.execute(text(""" CREATE INDEX IF NOT EXISTS idx_printers_status ON printers(status, active) """)) # Index für System-Logs conn.execute(text(""" CREATE INDEX IF NOT EXISTS idx_system_logs_timestamp ON system_logs(timestamp, level) """)) conn.commit() logger.info("Datenbank mit Optimierungen initialisiert") def init_database() -> None: """Alias für init_db() - initialisiert die Datenbank und erstellt alle Tabellen.""" init_db() def create_initial_admin(email: str = "admin@mercedes-benz.com", password: str = "744563017196A", name: str = "Administrator", username: str = "admin") -> bool: """ Erstellt einen initialen Admin-Benutzer, falls die Datenbank leer ist. Args: email: E-Mail-Adresse des Admins password: Passwort des Admins name: Name des Admins username: Benutzername des Admins Returns: bool: True, wenn der Admin erstellt wurde, False sonst """ try: with get_cached_session() as session: # Prüfen, ob der Admin bereits existiert admin = session.query(User).filter(User.email == email).first() if admin: # Admin existiert bereits, Passwort zurücksetzen admin.set_password(password) admin.role = "admin" # Sicherstellen, dass der Benutzer Admin-Rechte hat admin.active = True # Sicherstellen, dass der Account aktiv ist session.commit() logger.info(f"Admin-Benutzer {username} ({email}) existiert bereits. Passwort wurde zurückgesetzt.") return True # Admin erstellen, wenn er nicht existiert admin = User( email=email, username=username, name=name, role="admin", active=True ) admin.set_password(password) session.add(admin) session.commit() # Statistik-Eintrag anlegen, falls noch nicht vorhanden stats = session.query(Stats).first() if not stats: stats = Stats() session.add(stats) session.commit() logger.info(f"Admin-Benutzer {username} ({email}) wurde angelegt.") return True except Exception as e: logger.error(f"Fehler beim Erstellen des Admin-Benutzers: {str(e)}") return False # Engine für Export verfügbar machen def get_engine(): """Gibt die optimierte Datenbank-Engine zurück.""" return create_optimized_engine() # Engine-Variable für direkten Import engine = get_engine() # ===== CACHE-VERWALTUNG ===== def clear_model_cache(): """ Leert den Application-Level Cache für Modelle. Diese Funktion kann erweitert werden, um verschiedene Cache-Mechanismen zu unterstützen, wie z.B. SQLAlchemy Session Cache, Redis Cache, etc. """ try: # SQLAlchemy Session Cache leeren from sqlalchemy.orm import scoped_session if _scoped_session: _scoped_session.remove() # Weitere Cache-Clearing-Operationen hier hinzufügen # z.B. Redis Cache, Memcached, etc. return True except Exception as e: print(f"Fehler beim Leeren des Model-Cache: {str(e)}") return False