2025-06-04 10:03:22 +02:00

2033 lines
76 KiB
Python

import os
import logging
import threading
import time
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any
from contextlib import contextmanager
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, ForeignKey, Float, event, text, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker, Session, Mapped, mapped_column, scoped_session
from sqlalchemy.pool import StaticPool, QueuePool
from sqlalchemy.engine import Engine
from flask_login import UserMixin
import bcrypt
import secrets
from config.settings import DATABASE_PATH, ensure_database_directory
from utils.logging_config import get_logger
# ===== DATABASE CLEANUP INTEGRATION =====
# Importiere den neuen Cleanup-Manager
try:
from utils.database_cleanup import get_cleanup_manager
CLEANUP_MANAGER_AVAILABLE = True
except ImportError:
CLEANUP_MANAGER_AVAILABLE = False
logger = get_logger("app")
logger.warning("DatabaseCleanupManager nicht verfügbar - Fallback auf Legacy-Cleanup")
Base = declarative_base()
logger = get_logger("app")
# Thread-lokale Session-Factory für sichere Concurrent-Zugriffe
_session_factory = None
_scoped_session = None
_engine = None
_connection_pool_lock = threading.Lock()
# Cache für häufig abgerufene Daten
_cache = {}
_cache_lock = threading.Lock()
_cache_ttl = {} # Time-to-live für Cache-Einträge
# Alle exportierten Modelle
__all__ = ['User', 'Printer', 'Job', 'Stats', 'SystemLog', 'Base', 'GuestRequest', 'UserPermission', 'Notification', 'JobOrder', 'SystemTimer', 'PlugStatusLog', 'init_db', 'init_database', 'create_initial_admin', 'get_db_session', 'get_cached_session', 'clear_cache', 'engine']
# ===== DATENBANK-KONFIGURATION MIT WAL UND OPTIMIERUNGEN =====
def configure_sqlite_for_production(dbapi_connection, _connection_record):
"""
Konfiguriert SQLite für Produktionsumgebung mit WAL-Modus und Optimierungen.
"""
cursor = dbapi_connection.cursor()
# WAL-Modus aktivieren (Write-Ahead Logging)
cursor.execute("PRAGMA journal_mode=WAL")
# Synchronous-Modus für bessere Performance bei WAL
cursor.execute("PRAGMA synchronous=NORMAL")
# Cache-Größe erhöhen (in KB, negative Werte = KB)
cursor.execute("PRAGMA cache_size=-64000") # 64MB Cache
# Memory-mapped I/O aktivieren
cursor.execute("PRAGMA mmap_size=268435456") # 256MB
# Temp-Store im Memory
cursor.execute("PRAGMA temp_store=MEMORY")
# Optimierungen für bessere Performance
cursor.execute("PRAGMA optimize")
# Foreign Key Constraints aktivieren
cursor.execute("PRAGMA foreign_keys=ON")
# Auto-Vacuum für automatische Speicherbereinigung
cursor.execute("PRAGMA auto_vacuum=INCREMENTAL")
# Busy Timeout für Concurrent Access
cursor.execute("PRAGMA busy_timeout=30000") # 30 Sekunden
# Checkpoint-Intervall für WAL
cursor.execute("PRAGMA wal_autocheckpoint=1000")
# ===== RASPBERRY PI SPEZIFISCHE OPTIMIERUNGEN =====
# Reduzierte Cache-Größe für schwache Hardware
cursor.execute("PRAGMA cache_size=-32000") # 32MB statt 64MB für Pi
# Kleinere Memory-mapped I/O für SD-Karten
cursor.execute("PRAGMA mmap_size=134217728") # 128MB statt 256MB
# Weniger aggressive Vacuum-Einstellungen
cursor.execute("PRAGMA auto_vacuum=INCREMENTAL")
cursor.execute("PRAGMA incremental_vacuum(10)") # Nur 10 Seiten pro Mal
# Optimierungen für SD-Karten I/O
cursor.execute("PRAGMA page_size=4096") # Optimal für SD-Karten
cursor.execute("PRAGMA temp_store=MEMORY") # Temp im RAM
cursor.execute("PRAGMA locking_mode=NORMAL") # Normale Sperrung
# Query Planner Optimierung
cursor.execute("PRAGMA optimize=0x10002") # Aggressive Optimierung
# Reduzierte WAL-Datei-Größe für Pi
cursor.execute("PRAGMA journal_size_limit=32768000") # 32MB WAL-Limit
cursor.close()
logger.info("SQLite für Raspberry Pi optimiert (reduzierte Cache-Größe, SD-Karten I/O)")
def create_optimized_engine():
"""
Erstellt eine optimierte SQLite-Engine mit korrekten SQLite-spezifischen Parametern.
"""
global _engine
if _engine is not None:
return _engine
with _connection_pool_lock:
if _engine is not None:
return _engine
ensure_database_directory()
# Connection String mit optimierten Parametern
connection_string = f"sqlite:///{DATABASE_PATH}"
# Engine mit SQLite-spezifischen Parametern (ohne Server-DB Pool-Parameter)
_engine = create_engine(
connection_string,
# SQLite-spezifische Pool-Konfiguration
poolclass=StaticPool,
pool_pre_ping=True,
pool_recycle=7200, # Recycling-Zeit (für SQLite sinnvoll)
connect_args={
"check_same_thread": False,
"timeout": 45, # Längerer Timeout für SD-Karten
"isolation_level": None,
# Raspberry Pi spezifische SQLite-Einstellungen
"cached_statements": 100, # Reduzierte Statement-Cache
},
echo=False,
# Performance-optimierte Execution-Optionen für Pi
execution_options={
"autocommit": False,
"compiled_cache": {}, # Statement-Kompilierung cachen
}
# Entfernt: pool_size, max_overflow, pool_timeout (nicht für SQLite/StaticPool)
)
# Event-Listener für SQLite-Optimierungen
event.listen(_engine, "connect", configure_sqlite_for_production)
# Regelmäßige Wartungsaufgaben
event.listen(_engine, "connect", lambda conn, rec: schedule_maintenance())
# ===== CLEANUP MANAGER INTEGRATION =====
# Registriere Engine beim Cleanup-Manager für sicheres Shutdown
if CLEANUP_MANAGER_AVAILABLE:
try:
cleanup_manager = get_cleanup_manager()
cleanup_manager.register_engine(_engine)
logger.debug("Engine beim DatabaseCleanupManager registriert")
except Exception as e:
logger.warning(f"Fehler bei Cleanup-Manager-Registrierung: {e}")
logger.info(f"Optimierte SQLite-Engine erstellt: {DATABASE_PATH}")
return _engine
def schedule_maintenance():
"""
Plant regelmäßige Wartungsaufgaben für die Datenbank.
"""
def maintenance_worker():
time.sleep(300) # 5 Minuten warten
while True:
try:
with get_maintenance_session() as session:
# WAL-Checkpoint ausführen (aggressive Strategie)
checkpoint_result = session.execute(text("PRAGMA wal_checkpoint(TRUNCATE)")).fetchone()
# Nur loggen wenn tatsächlich Daten übertragen wurden
if checkpoint_result and checkpoint_result[1] > 0:
logger.info(f"WAL-Checkpoint: {checkpoint_result[1]} Seiten übertragen, {checkpoint_result[2]} Seiten zurückgesetzt")
# Statistiken aktualisieren (alle 30 Minuten)
session.execute(text("ANALYZE"))
# Incremental Vacuum (alle 60 Minuten)
session.execute(text("PRAGMA incremental_vacuum"))
session.commit()
except Exception as e:
logger.error(f"Fehler bei Datenbank-Wartung: {str(e)}")
# Warte 30 Minuten bis zur nächsten Wartung
time.sleep(1800)
# Wartung in separatem Thread ausführen
maintenance_thread = threading.Thread(target=maintenance_worker, daemon=True)
maintenance_thread.start()
def get_session_factory():
"""
Gibt die Thread-sichere Session-Factory zurück.
"""
global _session_factory, _scoped_session
if _session_factory is None:
with _connection_pool_lock:
if _session_factory is None:
engine = create_optimized_engine()
_session_factory = sessionmaker(
bind=engine,
autoflush=True,
autocommit=False,
expire_on_commit=False # Objekte nach Commit nicht expiren
)
_scoped_session = scoped_session(_session_factory)
return _scoped_session
@contextmanager
def get_maintenance_session():
"""
Context Manager für Wartungs-Sessions.
"""
engine = create_optimized_engine()
session = sessionmaker(bind=engine)()
try:
yield session
except Exception as e:
session.rollback()
raise e
finally:
session.close()
# ===== CACHING-SYSTEM =====
def get_cache_key(model_class: str, identifier: Any, extra: str = "") -> str:
"""
Generiert einen Cache-Schlüssel.
"""
return f"{model_class}:{identifier}:{extra}"
def set_cache(key: str, value: Any, ttl_seconds: int = 300):
"""
Setzt einen Wert im Cache mit TTL.
"""
with _cache_lock:
_cache[key] = value
_cache_ttl[key] = time.time() + ttl_seconds
def get_cache(key: str) -> Optional[Any]:
"""
Holt einen Wert aus dem Cache.
"""
with _cache_lock:
if key in _cache:
if key in _cache_ttl and time.time() > _cache_ttl[key]:
# Cache-Eintrag abgelaufen
del _cache[key]
del _cache_ttl[key]
return None
return _cache[key]
return None
def clear_cache(pattern: str = None):
"""
Löscht Cache-Einträge (optional mit Pattern).
"""
with _cache_lock:
if pattern is None:
_cache.clear()
_cache_ttl.clear()
else:
keys_to_delete = [k for k in _cache.keys() if pattern in k]
for key in keys_to_delete:
del _cache[key]
if key in _cache_ttl:
del _cache_ttl[key]
def invalidate_model_cache(model_class: str, identifier: Any = None):
"""
Invalidiert Cache-Einträge für ein bestimmtes Modell.
"""
if identifier is not None:
pattern = f"{model_class}:{identifier}"
else:
pattern = f"{model_class}:"
clear_cache(pattern)
# ===== ERWEITERTE SESSION-VERWALTUNG =====
@contextmanager
def get_cached_session():
"""
Context Manager für gecachte Sessions mit automatischem Rollback.
"""
session_factory = get_session_factory()
session = session_factory()
try:
yield session
session.commit()
except Exception as e:
session.rollback()
logger.error(f"Datenbank-Transaktion fehlgeschlagen: {str(e)}")
raise e
finally:
session.close()
def get_db_session() -> Session:
"""
Gibt eine neue Datenbank-Session zurück (Legacy-Kompatibilität).
"""
session_factory = get_session_factory()
return session_factory()
# ===== MODELL-DEFINITIONEN =====
class User(UserMixin, Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
email = Column(String(120), unique=True, nullable=False)
username = Column(String(100), unique=True, nullable=False) # Füge username hinzu für login
password_hash = Column(String(128), nullable=False)
name = Column(String(100), nullable=False)
role = Column(String(20), default="user") # "admin" oder "user"
active = Column(Boolean, default=True) # Für Flask-Login is_active
created_at = Column(DateTime, default=datetime.now)
last_login = Column(DateTime, nullable=True) # Letzter Login-Zeitstempel
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) # Automatische Aktualisierung
settings = Column(Text, nullable=True) # JSON-String für Benutzereinstellungen
last_activity = Column(DateTime, default=datetime.now)
# Zusätzliche Profil-Felder für bessere Benutzerverwaltung
department = Column(String(100), nullable=True) # Abteilung
position = Column(String(100), nullable=True) # Position/Rolle im Unternehmen
phone = Column(String(50), nullable=True) # Telefonnummer
bio = Column(Text, nullable=True) # Kurze Beschreibung/Bio
jobs = relationship("Job", back_populates="user", foreign_keys="Job.user_id", cascade="all, delete-orphan")
owned_jobs = relationship("Job", foreign_keys="Job.owner_id", overlaps="owner")
permissions = relationship("UserPermission", back_populates="user", uselist=False, cascade="all, delete-orphan")
notifications = relationship("Notification", back_populates="user", cascade="all, delete-orphan")
def set_password(self, password: str) -> None:
password_bytes = password.encode('utf-8')
salt = bcrypt.gensalt()
self.password_hash = bcrypt.hashpw(password_bytes, salt).decode('utf-8')
# Cache invalidieren
invalidate_model_cache("User", self.id)
def check_password(self, password: str) -> bool:
password_bytes = password.encode('utf-8')
hash_bytes = self.password_hash.encode('utf-8')
return bcrypt.checkpw(password_bytes, hash_bytes)
@property
def is_admin(self) -> bool:
return self.role == "admin"
@property
def is_active(self) -> bool:
"""Required for Flask-Login"""
return self.active
def get_id(self) -> str:
"""Required for Flask-Login - return user id as unicode string"""
return str(self.id)
def to_dict(self) -> dict:
# Cache-Key für User-Dict
cache_key = get_cache_key("User", self.id, "dict")
cached_result = get_cache(cache_key)
if cached_result is not None:
return cached_result
result = {
"id": self.id,
"email": self.email,
"username": self.username,
"name": self.name,
"role": self.role,
"active": self.active,
"created_at": self.created_at.isoformat() if self.created_at else None,
"last_login": self.last_login.isoformat() if self.last_login else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
"settings": self.settings,
"department": self.department,
"position": self.position,
"phone": self.phone,
"last_login": self.last_login.isoformat() if self.last_login else None
}
# Ergebnis cachen (5 Minuten)
set_cache(cache_key, result, 300)
return result
@classmethod
def get_by_username_or_email(cls, identifier: str) -> Optional['User']:
"""
Holt einen Benutzer anhand von Username oder E-Mail mit Caching.
"""
cache_key = get_cache_key("User", identifier, "login")
cached_user = get_cache(cache_key)
if cached_user is not None:
return cached_user
with get_cached_session() as session:
user = session.query(cls).filter(
(cls.username == identifier) | (cls.email == identifier)
).first()
if user:
# User für 10 Minuten cachen
set_cache(cache_key, user, 600)
return user
@classmethod
def get_by_id_cached(cls, user_id: int) -> Optional['User']:
"""
Holt einen Benutzer anhand der ID mit Caching.
"""
cache_key = get_cache_key("User", user_id, "id")
cached_user = get_cache(cache_key)
if cached_user is not None:
return cached_user
with get_cached_session() as session:
user = session.query(cls).filter(cls.id == user_id).first()
if user:
# User für 10 Minuten cachen
set_cache(cache_key, user, 600)
return user
def update_last_login(self):
"""
Aktualisiert den letzten Login-Zeitstempel.
"""
self.last_login = datetime.now()
# Cache invalidieren
invalidate_model_cache("User", self.id)
class Printer(Base):
__tablename__ = "printers"
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
model = Column(String(100)) # Drucker-Modell
location = Column(String(100))
ip_address = Column(String(50)) # IP-Adresse des Druckers
mac_address = Column(String(50), nullable=False, unique=True)
plug_ip = Column(String(50), nullable=False)
plug_username = Column(String(100), nullable=False)
plug_password = Column(String(100), nullable=False)
status = Column(String(20), default="offline") # online, offline, busy, idle
active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.now)
last_checked = Column(DateTime, nullable=True) # Zeitstempel der letzten Status-Überprüfung
jobs = relationship("Job", back_populates="printer", cascade="all, delete-orphan")
def to_dict(self) -> dict:
# Cache-Key für Printer-Dict
cache_key = get_cache_key("Printer", self.id, "dict")
cached_result = get_cache(cache_key)
if cached_result is not None:
return cached_result
result = {
"id": self.id,
"name": self.name,
"model": self.model,
"location": self.location,
"ip_address": self.ip_address,
"mac_address": self.mac_address,
"plug_ip": self.plug_ip,
"status": self.status,
"active": self.active,
"created_at": self.created_at.isoformat() if self.created_at else None,
"last_checked": self.last_checked.isoformat() if self.last_checked else None
}
# Ergebnis cachen (2 Minuten für Drucker-Status)
set_cache(cache_key, result, 120)
return result
def update_status(self, new_status: str, active: bool = None):
"""
Aktualisiert den Drucker-Status und invalidiert den Cache.
"""
self.status = new_status
self.last_checked = datetime.now()
if active is not None:
self.active = active
# Cache invalidieren
invalidate_model_cache("Printer", self.id)
@classmethod
def get_all_cached(cls) -> List['Printer']:
"""
Holt alle Drucker mit Caching.
"""
cache_key = get_cache_key("Printer", "all", "list")
cached_printers = get_cache(cache_key)
if cached_printers is not None:
return cached_printers
with get_cached_session() as session:
printers = session.query(cls).all()
# Drucker für 5 Minuten cachen
set_cache(cache_key, printers, 300)
return printers
@classmethod
def get_online_printers(cls) -> List['Printer']:
"""
Holt alle online Drucker mit Caching.
"""
cache_key = get_cache_key("Printer", "online", "list")
cached_printers = get_cache(cache_key)
if cached_printers is not None:
return cached_printers
with get_cached_session() as session:
printers = session.query(cls).filter(
cls.status.in_(["online", "available", "idle"])
).all()
# Online-Drucker für 1 Minute cachen (häufiger aktualisiert)
set_cache(cache_key, printers, 60)
return printers
class Job(Base):
__tablename__ = "jobs"
id = Column(Integer, primary_key=True)
name = Column(String(200), nullable=False)
description = Column(String(500)) # Beschreibung des Jobs
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
printer_id = Column(Integer, ForeignKey("printers.id"), nullable=False)
start_at = Column(DateTime)
end_at = Column(DateTime)
actual_end_time = Column(DateTime)
status = Column(String(20), default="scheduled") # scheduled|running|finished|aborted
created_at = Column(DateTime, default=datetime.now)
notes = Column(String(500))
material_used = Column(Float) # in Gramm
file_path = Column(String(500), nullable=True)
owner_id = Column(Integer, ForeignKey("users.id"), nullable=True)
duration_minutes = Column(Integer, nullable=False) # Dauer in Minuten
user = relationship("User", back_populates="jobs", foreign_keys=[user_id])
owner = relationship("User", foreign_keys=[owner_id], overlaps="owned_jobs")
printer = relationship("Printer", back_populates="jobs")
def to_dict(self) -> dict:
# Cache-Key für Job-Dict
cache_key = get_cache_key("Job", self.id, "dict")
cached_result = get_cache(cache_key)
if cached_result is not None:
return cached_result
result = {
"id": self.id,
"name": self.name,
"description": self.description,
"user_id": self.user_id,
"printer_id": self.printer_id,
"start_at": self.start_at.isoformat() if self.start_at else None,
"end_at": self.end_at.isoformat() if self.end_at else None,
"actual_end_time": self.actual_end_time.isoformat() if self.actual_end_time else None,
"status": self.status,
"created_at": self.created_at.isoformat() if self.created_at else None,
"notes": self.notes,
"material_used": self.material_used,
"file_path": self.file_path,
"owner_id": self.owner_id,
"duration_minutes": self.duration_minutes,
"user": self.user.to_dict() if self.user else None,
"printer": self.printer.to_dict() if self.printer else None
}
# Ergebnis cachen (3 Minuten für Jobs)
set_cache(cache_key, result, 180)
return result
def update_status(self, new_status: str):
"""
Aktualisiert den Job-Status und invalidiert den Cache.
"""
self.status = new_status
if new_status in ["finished", "failed", "cancelled"]:
self.actual_end_time = datetime.now()
# Cache invalidieren
invalidate_model_cache("Job", self.id)
# Auch User- und Printer-Caches invalidieren
invalidate_model_cache("User", self.user_id)
invalidate_model_cache("Printer", self.printer_id)
@classmethod
def get_active_jobs(cls) -> List['Job']:
"""
Holt alle aktiven Jobs mit Caching.
"""
cache_key = get_cache_key("Job", "active", "list")
cached_jobs = get_cache(cache_key)
if cached_jobs is not None:
return cached_jobs
with get_cached_session() as session:
jobs = session.query(cls).filter(
cls.status.in_(["scheduled", "running"])
).all()
# Aktive Jobs für 30 Sekunden cachen (häufig aktualisiert)
set_cache(cache_key, jobs, 30)
return jobs
@classmethod
def get_user_jobs(cls, user_id: int) -> List['Job']:
"""
Holt alle Jobs eines Benutzers mit Caching.
"""
cache_key = get_cache_key("Job", f"user_{user_id}", "list")
cached_jobs = get_cache(cache_key)
if cached_jobs is not None:
return cached_jobs
with get_cached_session() as session:
jobs = session.query(cls).filter(cls.user_id == user_id).all()
# Benutzer-Jobs für 5 Minuten cachen
set_cache(cache_key, jobs, 300)
return jobs
class Stats(Base):
__tablename__ = "stats"
id = Column(Integer, primary_key=True)
total_print_time = Column(Integer, default=0) # in Sekunden
total_jobs_completed = Column(Integer, default=0)
total_material_used = Column(Float, default=0.0) # in Gramm
last_updated = Column(DateTime, default=datetime.now)
def to_dict(self) -> dict:
# Cache-Key für Stats-Dict
cache_key = get_cache_key("Stats", self.id, "dict")
cached_result = get_cache(cache_key)
if cached_result is not None:
return cached_result
result = {
"id": self.id,
"total_print_time": self.total_print_time,
"total_jobs_completed": self.total_jobs_completed,
"total_material_used": self.total_material_used,
"last_updated": self.last_updated.isoformat() if self.last_updated else None
}
# Statistiken für 10 Minuten cachen
set_cache(cache_key, result, 600)
return result
class SystemLog(Base):
"""System-Log Modell für Logging von System-Events"""
__tablename__ = "system_logs"
id = Column(Integer, primary_key=True)
timestamp = Column(DateTime, default=datetime.now, nullable=False)
level = Column(String(20), nullable=False) # DEBUG, INFO, WARNING, ERROR, CRITICAL
message = Column(String(1000), nullable=False)
module = Column(String(100)) # Welches Modul/Blueprint den Log erstellt hat
user_id = Column(Integer, ForeignKey("users.id"), nullable=True) # Optional: welcher User
ip_address = Column(String(50)) # Optional: IP-Adresse
user_agent = Column(String(500)) # Optional: Browser/Client Info
user = relationship("User", foreign_keys=[user_id])
def to_dict(self) -> dict:
return {
"id": self.id,
"timestamp": self.timestamp.isoformat() if self.timestamp else None,
"level": self.level,
"message": self.message,
"module": self.module,
"user_id": self.user_id,
"ip_address": self.ip_address,
"user_agent": self.user_agent,
"user": self.user.to_dict() if self.user else None
}
@classmethod
def log_system_event(cls, level: str, message: str, module: str = None,
user_id: int = None, ip_address: str = None,
user_agent: str = None) -> 'SystemLog':
"""
Hilfsmethode zum Erstellen eines System-Log-Eintrags
Args:
level: Log-Level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
message: Log-Nachricht
module: Optional - Modul/Blueprint Name
user_id: Optional - Benutzer-ID
ip_address: Optional - IP-Adresse
user_agent: Optional - User-Agent String
Returns:
SystemLog: Das erstellte Log-Objekt
"""
return cls(
level=level.upper(),
message=message,
module=module,
user_id=user_id,
ip_address=ip_address,
user_agent=user_agent
)
class UserPermission(Base):
"""
Berechtigungen für Benutzer.
"""
__tablename__ = "user_permissions"
user_id = Column(Integer, ForeignKey("users.id"), primary_key=True)
can_start_jobs = Column(Boolean, default=False)
needs_approval = Column(Boolean, default=True)
can_approve_jobs = Column(Boolean, default=False)
user = relationship("User", back_populates="permissions")
def to_dict(self) -> dict:
"""
Konvertiert die Benutzerberechtigungen in ein Dictionary.
"""
return {
"user_id": self.user_id,
"can_start_jobs": self.can_start_jobs,
"needs_approval": self.needs_approval,
"can_approve_jobs": self.can_approve_jobs
}
class Notification(Base):
"""
Benachrichtigungen für Benutzer.
"""
__tablename__ = "notifications"
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
type = Column(String(50), nullable=False)
payload = Column(Text) # JSON-Daten als String
created_at = Column(DateTime, default=datetime.now)
read = Column(Boolean, default=False)
user = relationship("User", back_populates="notifications")
def to_dict(self) -> dict:
"""
Konvertiert die Benachrichtigung in ein Dictionary.
"""
return {
"id": self.id,
"user_id": self.user_id,
"type": self.type,
"payload": self.payload,
"created_at": self.created_at.isoformat() if self.created_at else None,
"read": self.read
}
@classmethod
def create_for_approvers(cls, notification_type: str, payload: dict):
"""
Erstellt Benachrichtigungen für alle Benutzer mit can_approve_jobs-Berechtigung.
Args:
notification_type: Art der Benachrichtigung
payload: Daten für die Benachrichtigung als Dictionary
"""
import json
payload_json = json.dumps(payload)
with get_cached_session() as session:
# Alle Benutzer mit can_approve_jobs-Berechtigung finden
approvers = session.query(User).join(UserPermission).filter(
UserPermission.can_approve_jobs == True
).all()
# Benachrichtigungen für alle Genehmiger erstellen
for approver in approvers:
notification = cls(
user_id=approver.id,
type=notification_type,
payload=payload_json
)
session.add(notification)
session.commit()
class GuestRequest(Base):
"""
Gastanfragen für Druckaufträge.
"""
__tablename__ = "guest_requests"
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
email = Column(String(120))
reason = Column(Text)
duration_min = Column(Integer) # Bestehend - wird für Backward-Kompatibilität beibehalten
duration_minutes = Column(Integer) # Neu hinzugefügt für API-Kompatibilität
created_at = Column(DateTime, default=datetime.now)
status = Column(String(20), default="pending") # pending|approved|denied
printer_id = Column(Integer, ForeignKey("printers.id"))
otp_code = Column(String(100), nullable=True) # Hash des OTP-Codes
job_id = Column(Integer, ForeignKey("jobs.id"), nullable=True)
author_ip = Column(String(50))
otp_used_at = Column(DateTime, nullable=True) # Zeitpunkt der OTP-Verwendung
# Erweiterte Attribute für Datei-Management
file_name = Column(String(255), nullable=True) # Name der hochgeladenen Datei
file_path = Column(String(500), nullable=True) # Pfad zur hochgeladenen Datei
copies = Column(Integer, default=1) # Anzahl der Kopien
# Neue Felder für Admin-Verwaltung
processed_by = Column(Integer, ForeignKey("users.id"), nullable=True) # Admin der die Anfrage bearbeitet hat
processed_at = Column(DateTime, nullable=True) # Zeitpunkt der Bearbeitung
approval_notes = Column(Text, nullable=True) # Notizen bei Genehmigung
rejection_reason = Column(Text, nullable=True) # Grund bei Ablehnung
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) # Automatische Aktualisierung
# Zusätzliche Zeitstempel für bessere Verwaltung
approved_at = Column(DateTime, nullable=True) # Zeitpunkt der Genehmigung
rejected_at = Column(DateTime, nullable=True) # Zeitpunkt der Ablehnung
approved_by = Column(Integer, ForeignKey("users.id"), nullable=True) # Admin der genehmigt hat
rejected_by = Column(Integer, ForeignKey("users.id"), nullable=True) # Admin der abgelehnt hat
# OTP-Verwaltung erweitert
otp_expires_at = Column(DateTime, nullable=True) # Ablaufzeit des OTP-Codes
assigned_printer_id = Column(Integer, ForeignKey("printers.id"), nullable=True) # Zugewiesener Drucker
# Beziehungen
printer = relationship("Printer", foreign_keys=[printer_id])
assigned_printer = relationship("Printer", foreign_keys=[assigned_printer_id])
job = relationship("Job")
processed_by_user = relationship("User", foreign_keys=[processed_by]) # Admin der bearbeitet hat
approved_by_user = relationship("User", foreign_keys=[approved_by]) # Admin der genehmigt hat
rejected_by_user = relationship("User", foreign_keys=[rejected_by]) # Admin der abgelehnt hat
def to_dict(self) -> dict:
# Cache-Key für GuestRequest-Dict
cache_key = get_cache_key("GuestRequest", self.id, "dict")
cached_result = get_cache(cache_key)
if cached_result is not None:
return cached_result
result = {
"id": self.id,
"name": self.name,
"email": self.email,
"reason": self.reason,
"duration_min": self.duration_min,
"duration_minutes": self.duration_minutes,
"created_at": self.created_at.isoformat() if self.created_at else None,
"status": self.status,
"printer_id": self.printer_id,
"job_id": self.job_id,
"author_ip": self.author_ip,
"otp_used_at": self.otp_used_at.isoformat() if self.otp_used_at else None,
"file_name": self.file_name,
"file_path": self.file_path,
"copies": self.copies,
"processed_by": self.processed_by,
"processed_at": self.processed_at.isoformat() if self.processed_at else None,
"approval_notes": self.approval_notes,
"rejection_reason": self.rejection_reason,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
"approved_at": self.approved_at.isoformat() if self.approved_at else None,
"rejected_at": self.rejected_at.isoformat() if self.rejected_at else None,
"approved_by": self.approved_by,
"rejected_by": self.rejected_by,
"otp_expires_at": self.otp_expires_at.isoformat() if self.otp_expires_at else None,
"assigned_printer_id": self.assigned_printer_id,
}
# Ergebnis cachen (5 Minuten)
set_cache(cache_key, result, 300)
return result
def generate_otp(self) -> str:
"""
Generiert einen neuen OTP-Code und speichert den Hash.
"""
otp_plain = secrets.token_hex(8) # 16-stelliger hexadezimaler Code
# Hash des OTP-Codes speichern
otp_bytes = otp_plain.encode('utf-8')
salt = bcrypt.gensalt()
self.otp_code = bcrypt.hashpw(otp_bytes, salt).decode('utf-8')
logger.info(f"OTP generiert für Guest Request {self.id}")
# Cache invalidieren
invalidate_model_cache("GuestRequest", self.id)
return otp_plain
def verify_otp(self, otp_plain: str) -> bool:
"""
Verifiziert einen OTP-Code.
"""
if not self.otp_code or not otp_plain:
return False
try:
otp_bytes = otp_plain.encode('utf-8')
hash_bytes = self.otp_code.encode('utf-8')
is_valid = bcrypt.checkpw(otp_bytes, hash_bytes)
if is_valid:
self.otp_used_at = datetime.now()
logger.info(f"OTP erfolgreich verifiziert für Guest Request {self.id}")
# Cache invalidieren
invalidate_model_cache("GuestRequest", self.id)
else:
logger.warning(f"Ungültiger OTP-Code für Guest Request {self.id}")
return is_valid
except Exception as e:
logger.error(f"Fehler bei OTP-Verifizierung: {str(e)}")
return False
class JobOrder(Base):
"""
Job-Reihenfolge für Drucker im Drag & Drop System.
Speichert die benutzerdefinierte Reihenfolge der Jobs pro Drucker.
"""
__tablename__ = "job_orders"
id = Column(Integer, primary_key=True)
printer_id = Column(Integer, ForeignKey("printers.id"), nullable=False)
job_id = Column(Integer, ForeignKey("jobs.id"), nullable=False)
order_position = Column(Integer, nullable=False) # Position in der Reihenfolge (0-basiert)
created_at = Column(DateTime, default=datetime.now)
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
last_modified_by = Column(Integer, ForeignKey("users.id"), nullable=True) # Wer die Reihenfolge geändert hat
# Beziehungen
printer = relationship("Printer", foreign_keys=[printer_id])
job = relationship("Job", foreign_keys=[job_id])
modified_by_user = relationship("User", foreign_keys=[last_modified_by])
# Eindeutige Kombination: Ein Job kann nur eine Position pro Drucker haben
__table_args__ = (
# Hier könnten Constraints definiert werden
)
def to_dict(self) -> dict:
return {
"id": self.id,
"printer_id": self.printer_id,
"job_id": self.job_id,
"order_position": self.order_position,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
"last_modified_by": self.last_modified_by,
"printer": self.printer.to_dict() if self.printer else None,
"job": self.job.to_dict() if self.job else None
}
@classmethod
def get_order_for_printer(cls, printer_id: int) -> List['JobOrder']:
"""
Holt die Job-Reihenfolge für einen bestimmten Drucker.
"""
cache_key = get_cache_key("JobOrder", printer_id, "printer_order")
cached_order = get_cache(cache_key)
if cached_order is not None:
return cached_order
with get_cached_session() as session:
order = session.query(cls).filter(
cls.printer_id == printer_id
).order_by(cls.order_position.asc()).all()
# Ergebnis für 5 Minuten cachen
set_cache(cache_key, order, 300)
return order
@classmethod
def update_printer_order(cls, printer_id: int, job_ids: List[int],
modified_by_user_id: int = None) -> bool:
"""
Aktualisiert die Job-Reihenfolge für einen Drucker.
"""
try:
with get_cached_session() as session:
# Bestehende Reihenfolge für diesen Drucker löschen
session.query(cls).filter(cls.printer_id == printer_id).delete()
# Neue Reihenfolge erstellen
for position, job_id in enumerate(job_ids):
order = cls(
printer_id=printer_id,
job_id=job_id,
order_position=position,
last_modified_by=modified_by_user_id
)
session.add(order)
session.commit()
# Cache invalidieren
invalidate_model_cache("JobOrder", printer_id)
return True
except Exception as e:
logger.error(f"Fehler beim Aktualisieren der Job-Reihenfolge: {str(e)}")
return False
@classmethod
def get_ordered_job_ids(cls, printer_id: int) -> List[int]:
"""
Holt die geordneten Job-IDs für einen bestimmten Drucker.
"""
cache_key = get_cache_key("JobOrder", printer_id, "ordered_ids")
cached_ids = get_cache(cache_key)
if cached_ids is not None:
return cached_ids
orders = cls.get_order_for_printer(printer_id)
job_ids = [order.job_id for order in orders]
# Ergebnis für 5 Minuten cachen
set_cache(cache_key, job_ids, 300)
return job_ids
@classmethod
def remove_job_from_orders(cls, job_id: int):
"""
Entfernt einen Job aus allen Reihenfolgen (wenn Job gelöscht wird).
"""
try:
with get_cached_session() as session:
# Job aus allen Reihenfolgen entfernen
affected_printers = session.query(cls.printer_id).filter(
cls.job_id == job_id
).distinct().all()
session.query(cls).filter(cls.job_id == job_id).delete()
# Positionen neu arrangieren für betroffene Drucker
for (printer_id,) in affected_printers:
remaining_orders = session.query(cls).filter(
cls.printer_id == printer_id
).order_by(cls.order_position.asc()).all()
# Positionen neu vergeben
for new_position, order in enumerate(remaining_orders):
order.order_position = new_position
# Cache für diesen Drucker invalidieren
invalidate_model_cache("JobOrder", printer_id)
session.commit()
except Exception as e:
logger.error(f"Fehler beim Entfernen des Jobs aus Reihenfolgen: {str(e)}")
@classmethod
def cleanup_invalid_orders(cls):
"""
Bereinigt ungültige Reihenfolgen-Einträge (Jobs/Drucker die nicht mehr existieren).
"""
try:
with get_cached_session() as session:
# Finde Reihenfolgen mit nicht-existierenden Jobs
invalid_job_orders = session.query(cls).outerjoin(
Job, cls.job_id == Job.id
).filter(Job.id.is_(None)).all()
# Finde Reihenfolgen mit nicht-existierenden Druckern
invalid_printer_orders = session.query(cls).outerjoin(
Printer, cls.printer_id == Printer.id
).filter(Printer.id.is_(None)).all()
# Alle ungültigen Einträge löschen
for order in invalid_job_orders + invalid_printer_orders:
session.delete(order)
session.commit()
# Kompletten Cache leeren für Cleanup
clear_cache()
logger.info(f"Bereinigung: {len(invalid_job_orders + invalid_printer_orders)} ungültige Reihenfolgen-Einträge entfernt")
except Exception as e:
logger.error(f"Fehler bei der Bereinigung der Job-Reihenfolgen: {str(e)}")
class SystemTimer(Base):
"""
System-Timer für Countdown-Zähler mit Force-Quit-Funktionalität.
Unterstützt verschiedene Timer-Typen für Kiosk, Sessions, Jobs, etc.
"""
__tablename__ = "system_timers"
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False) # Eindeutiger Name des Timers
timer_type = Column(String(50), nullable=False) # kiosk, session, job, system, maintenance
duration_seconds = Column(Integer, nullable=False) # Timer-Dauer in Sekunden
remaining_seconds = Column(Integer, nullable=False) # Verbleibende Sekunden
target_timestamp = Column(DateTime, nullable=False) # Ziel-Zeitstempel wann Timer abläuft
# Timer-Status und Kontrolle
status = Column(String(20), default="stopped") # stopped, running, paused, expired, force_quit
auto_start = Column(Boolean, default=False) # Automatischer Start nach Erstellung
auto_restart = Column(Boolean, default=False) # Automatischer Neustart nach Ablauf
# Force-Quit-Konfiguration
force_quit_enabled = Column(Boolean, default=True) # Force-Quit aktiviert
force_quit_action = Column(String(50), default="logout") # logout, restart, shutdown, custom
force_quit_warning_seconds = Column(Integer, default=30) # Warnung X Sekunden vor Force-Quit
# Zusätzliche Konfiguration
show_warning = Column(Boolean, default=True) # Warnung anzeigen
warning_message = Column(Text, nullable=True) # Benutzerdefinierte Warnung
custom_action_endpoint = Column(String(200), nullable=True) # Custom API-Endpoint für Force-Quit
# Verwaltung und Tracking
created_by = Column(Integer, ForeignKey("users.id"), nullable=True) # Ersteller (optional für System-Timer)
created_at = Column(DateTime, default=datetime.now)
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
last_activity = Column(DateTime, default=datetime.now) # Letzte Aktivität (für Session-Timer)
# Kontext-spezifische Felder
context_id = Column(Integer, nullable=True) # Job-ID, Session-ID, etc.
context_data = Column(Text, nullable=True) # JSON-String für zusätzliche Kontext-Daten
# Statistiken
start_count = Column(Integer, default=0) # Wie oft wurde der Timer gestartet
force_quit_count = Column(Integer, default=0) # Wie oft wurde Force-Quit ausgeführt
# Beziehungen
created_by_user = relationship("User", foreign_keys=[created_by])
def to_dict(self) -> dict:
"""
Konvertiert den Timer zu einem Dictionary für API-Responses.
"""
# Cache-Key für Timer-Dict
cache_key = get_cache_key("SystemTimer", self.id, "dict")
cached_result = get_cache(cache_key)
if cached_result is not None:
return cached_result
# Berechne aktuelle verbleibende Zeit
current_remaining = self.get_current_remaining_seconds()
result = {
"id": self.id,
"name": self.name,
"timer_type": self.timer_type,
"duration_seconds": self.duration_seconds,
"remaining_seconds": current_remaining,
"target_timestamp": self.target_timestamp.isoformat() if self.target_timestamp else None,
"status": self.status,
"auto_start": self.auto_start,
"auto_restart": self.auto_restart,
"force_quit_enabled": self.force_quit_enabled,
"force_quit_action": self.force_quit_action,
"force_quit_warning_seconds": self.force_quit_warning_seconds,
"show_warning": self.show_warning,
"warning_message": self.warning_message,
"custom_action_endpoint": self.custom_action_endpoint,
"created_by": self.created_by,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
"last_activity": self.last_activity.isoformat() if self.last_activity else None,
"context_id": self.context_id,
"context_data": self.context_data,
"start_count": self.start_count,
"force_quit_count": self.force_quit_count,
# Berechnete Felder
"is_running": self.is_running(),
"is_expired": self.is_expired(),
"should_show_warning": self.should_show_warning(),
"progress_percentage": self.get_progress_percentage()
}
# Ergebnis für 10 Sekunden cachen (kurz wegen sich ändernder Zeit)
set_cache(cache_key, result, 10)
return result
def get_current_remaining_seconds(self) -> int:
"""
Berechnet die aktuell verbleibenden Sekunden basierend auf dem Ziel-Zeitstempel.
"""
if self.status != "running":
return self.remaining_seconds
now = datetime.now()
if now >= self.target_timestamp:
return 0
remaining = int((self.target_timestamp - now).total_seconds())
return max(0, remaining)
def is_running(self) -> bool:
"""
Prüft ob der Timer aktuell läuft.
"""
return self.status == "running"
def is_expired(self) -> bool:
"""
Prüft ob der Timer abgelaufen ist.
"""
return self.status == "expired" or self.get_current_remaining_seconds() <= 0
def should_show_warning(self) -> bool:
"""
Prüft ob eine Warnung angezeigt werden soll.
"""
if not self.show_warning or not self.is_running():
return False
remaining = self.get_current_remaining_seconds()
return remaining <= self.force_quit_warning_seconds and remaining > 0
def get_progress_percentage(self) -> float:
"""
Berechnet den Fortschritt in Prozent (0.0 bis 100.0).
"""
if self.duration_seconds <= 0:
return 100.0
elapsed = self.duration_seconds - self.get_current_remaining_seconds()
return min(100.0, max(0.0, (elapsed / self.duration_seconds) * 100.0))
def start_timer(self) -> bool:
"""
Startet den Timer.
"""
try:
if self.status == "running":
return True # Bereits laufend
now = datetime.now()
self.target_timestamp = now + timedelta(seconds=self.remaining_seconds)
self.status = "running"
self.last_activity = now
self.start_count += 1
self.updated_at = now
# Cache invalidieren
invalidate_model_cache("SystemTimer", self.id)
logger.info(f"Timer '{self.name}' gestartet - läuft für {self.remaining_seconds} Sekunden")
return True
except Exception as e:
logger.error(f"Fehler beim Starten des Timers '{self.name}': {str(e)}")
return False
def pause_timer(self) -> bool:
"""
Pausiert den Timer.
"""
try:
if self.status != "running":
return False
# Verbleibende Zeit aktualisieren
self.remaining_seconds = self.get_current_remaining_seconds()
self.status = "paused"
self.updated_at = datetime.now()
# Cache invalidieren
invalidate_model_cache("SystemTimer", self.id)
logger.info(f"Timer '{self.name}' pausiert - {self.remaining_seconds} Sekunden verbleiben")
return True
except Exception as e:
logger.error(f"Fehler beim Pausieren des Timers '{self.name}': {str(e)}")
return False
def stop_timer(self) -> bool:
"""
Stoppt den Timer.
"""
try:
self.status = "stopped"
self.remaining_seconds = self.duration_seconds # Zurücksetzen
self.updated_at = datetime.now()
# Cache invalidieren
invalidate_model_cache("SystemTimer", self.id)
logger.info(f"Timer '{self.name}' gestoppt und zurückgesetzt")
return True
except Exception as e:
logger.error(f"Fehler beim Stoppen des Timers '{self.name}': {str(e)}")
return False
def reset_timer(self) -> bool:
"""
Setzt den Timer auf die ursprüngliche Dauer zurück.
"""
try:
self.remaining_seconds = self.duration_seconds
if self.status == "running":
# Neu berechnen wenn laufend
now = datetime.now()
self.target_timestamp = now + timedelta(seconds=self.duration_seconds)
self.updated_at = datetime.now()
# Cache invalidieren
invalidate_model_cache("SystemTimer", self.id)
logger.info(f"Timer '{self.name}' zurückgesetzt auf {self.duration_seconds} Sekunden")
return True
except Exception as e:
logger.error(f"Fehler beim Zurücksetzen des Timers '{self.name}': {str(e)}")
return False
def extend_timer(self, additional_seconds: int) -> bool:
"""
Verlängert den Timer um zusätzliche Sekunden.
"""
try:
if additional_seconds <= 0:
return False
self.duration_seconds += additional_seconds
self.remaining_seconds += additional_seconds
if self.status == "running":
# Ziel-Zeitstempel aktualisieren
self.target_timestamp = self.target_timestamp + timedelta(seconds=additional_seconds)
self.updated_at = datetime.now()
# Cache invalidieren
invalidate_model_cache("SystemTimer", self.id)
logger.info(f"Timer '{self.name}' um {additional_seconds} Sekunden verlängert")
return True
except Exception as e:
logger.error(f"Fehler beim Verlängern des Timers '{self.name}': {str(e)}")
return False
def force_quit_execute(self) -> bool:
"""
Führt die Force-Quit-Aktion aus.
"""
try:
if not self.force_quit_enabled:
logger.warning(f"Force-Quit für Timer '{self.name}' ist deaktiviert")
return False
self.status = "force_quit"
self.force_quit_count += 1
self.updated_at = datetime.now()
# Cache invalidieren
invalidate_model_cache("SystemTimer", self.id)
logger.warning(f"Force-Quit für Timer '{self.name}' ausgeführt - Aktion: {self.force_quit_action}")
return True
except Exception as e:
logger.error(f"Fehler beim Force-Quit des Timers '{self.name}': {str(e)}")
return False
def update_activity(self) -> bool:
"""
Aktualisiert die letzte Aktivität (für Session-Timer).
"""
try:
self.last_activity = datetime.now()
self.updated_at = datetime.now()
# Cache invalidieren
invalidate_model_cache("SystemTimer", self.id)
return True
except Exception as e:
logger.error(f"Fehler beim Aktualisieren der Aktivität für Timer '{self.name}': {str(e)}")
return False
@classmethod
def get_by_name(cls, name: str) -> Optional['SystemTimer']:
"""
Holt einen Timer anhand des Namens.
"""
cache_key = get_cache_key("SystemTimer", name, "by_name")
cached_timer = get_cache(cache_key)
if cached_timer is not None:
return cached_timer
with get_cached_session() as session:
timer = session.query(cls).filter(cls.name == name).first()
if timer:
# Timer für 5 Minuten cachen
set_cache(cache_key, timer, 300)
return timer
@classmethod
def get_by_type(cls, timer_type: str) -> List['SystemTimer']:
"""
Holt alle Timer eines bestimmten Typs.
"""
cache_key = get_cache_key("SystemTimer", timer_type, "by_type")
cached_timers = get_cache(cache_key)
if cached_timers is not None:
return cached_timers
with get_cached_session() as session:
timers = session.query(cls).filter(cls.timer_type == timer_type).all()
# Timer für 2 Minuten cachen
set_cache(cache_key, timers, 120)
return timers
@classmethod
def get_running_timers(cls) -> List['SystemTimer']:
"""
Holt alle aktuell laufenden Timer.
"""
cache_key = get_cache_key("SystemTimer", "all", "running")
cached_timers = get_cache(cache_key)
if cached_timers is not None:
return cached_timers
with get_cached_session() as session:
timers = session.query(cls).filter(cls.status == "running").all()
# Nur 30 Sekunden cachen wegen sich ändernder Zeiten
set_cache(cache_key, timers, 30)
return timers
@classmethod
def get_expired_timers(cls) -> List['SystemTimer']:
"""
Holt alle abgelaufenen Timer die Force-Quit-Aktionen benötigen.
"""
with get_cached_session() as session:
now = datetime.now()
# Timer die laufen aber abgelaufen sind
expired_timers = session.query(cls).filter(
cls.status == "running",
cls.target_timestamp <= now,
cls.force_quit_enabled == True
).all()
return expired_timers
@classmethod
def cleanup_expired_timers(cls) -> int:
"""
Bereinigt abgelaufene Timer und führt Force-Quit-Aktionen aus.
"""
try:
expired_timers = cls.get_expired_timers()
cleanup_count = 0
for timer in expired_timers:
if timer.force_quit_execute():
cleanup_count += 1
if cleanup_count > 0:
# Cache für alle Timer invalidieren
clear_cache("SystemTimer")
logger.info(f"Cleanup: {cleanup_count} abgelaufene Timer verarbeitet")
return cleanup_count
except Exception as e:
logger.error(f"Fehler beim Cleanup abgelaufener Timer: {str(e)}")
return 0
@classmethod
def create_kiosk_timer(cls, duration_minutes: int = 30, auto_start: bool = True) -> Optional['SystemTimer']:
"""
Erstellt einen Standard-Kiosk-Timer.
"""
try:
with get_cached_session() as session:
# Prüfe ob bereits ein Kiosk-Timer existiert
existing = session.query(cls).filter(
cls.timer_type == "kiosk",
cls.name == "kiosk_session"
).first()
if existing:
# Bestehenden Timer aktualisieren
existing.duration_seconds = duration_minutes * 60
existing.remaining_seconds = duration_minutes * 60
existing.auto_start = auto_start
existing.updated_at = datetime.now()
if auto_start and existing.status != "running":
existing.start_timer()
# Cache invalidieren
invalidate_model_cache("SystemTimer", existing.id)
session.commit()
return existing
# Neuen Timer erstellen
timer = cls(
name="kiosk_session",
timer_type="kiosk",
duration_seconds=duration_minutes * 60,
remaining_seconds=duration_minutes * 60,
auto_start=auto_start,
force_quit_enabled=True,
force_quit_action="logout",
force_quit_warning_seconds=30,
show_warning=True,
warning_message="Kiosk-Session läuft ab. Bitte speichern Sie Ihre Arbeit.",
target_timestamp=datetime.now() + timedelta(minutes=duration_minutes)
)
session.add(timer)
session.commit()
if auto_start:
timer.start_timer()
logger.info(f"Kiosk-Timer erstellt: {duration_minutes} Minuten")
return timer
except Exception as e:
logger.error(f"Fehler beim Erstellen des Kiosk-Timers: {str(e)}")
return None
class PlugStatusLog(Base):
"""
Logging-System für Steckdosen-Status Monitoring.
Protokolliert alle Zustandsänderungen der Smart Plugs (TAPO).
"""
__tablename__ = "plug_status_logs"
id = Column(Integer, primary_key=True)
printer_id = Column(Integer, ForeignKey("printers.id"), nullable=False)
status = Column(String(20), nullable=False) # 'connected', 'disconnected', 'on', 'off'
timestamp = Column(DateTime, default=datetime.now, nullable=False)
# Zusätzliche Monitoring-Daten
ip_address = Column(String(50), nullable=True) # IP der Steckdose/des Druckers
power_consumption = Column(Float, nullable=True) # Stromverbrauch in Watt (falls verfügbar)
voltage = Column(Float, nullable=True) # Spannung in Volt (falls verfügbar)
current = Column(Float, nullable=True) # Stromstärke in Ampere (falls verfügbar)
# Monitoring-Kontext
source = Column(String(50), default="system") # 'system', 'manual', 'api', 'scheduler'
user_id = Column(Integer, ForeignKey("users.id"), nullable=True) # Bei manueller Änderung
notes = Column(Text, nullable=True) # Zusätzliche Notizen oder Fehlerinfos
# Technische Details
response_time_ms = Column(Integer, nullable=True) # Antwortzeit der Steckdose in ms
error_message = Column(Text, nullable=True) # Fehlermeldung bei Verbindungsproblemen
firmware_version = Column(String(50), nullable=True) # Firmware-Version der Steckdose
# Beziehungen
printer = relationship("Printer", foreign_keys=[printer_id])
user = relationship("User", foreign_keys=[user_id])
def to_dict(self) -> dict:
"""
Konvertiert das PlugStatusLog-Objekt in ein Dictionary.
"""
cache_key = get_cache_key("PlugStatusLog", self.id, "dict")
cached_result = get_cache(cache_key)
if cached_result is not None:
return cached_result
result = {
"id": self.id,
"printer_id": self.printer_id,
"printer_name": self.printer.name if self.printer else None,
"status": self.status,
"timestamp": self.timestamp.isoformat() if self.timestamp else None,
"ip_address": self.ip_address,
"power_consumption": self.power_consumption,
"voltage": self.voltage,
"current": self.current,
"source": self.source,
"user_id": self.user_id,
"user_name": self.user.name if self.user else None,
"notes": self.notes,
"response_time_ms": self.response_time_ms,
"error_message": self.error_message,
"firmware_version": self.firmware_version
}
# Ergebnis cachen (5 Minuten)
set_cache(cache_key, result, 300)
return result
@classmethod
def log_status_change(cls, printer_id: int, status: str, source: str = "system",
user_id: int = None, ip_address: str = None,
power_consumption: float = None, voltage: float = None,
current: float = None, notes: str = None,
response_time_ms: int = None, error_message: str = None,
firmware_version: str = None) -> 'PlugStatusLog':
"""
Erstellt einen neuen Status-Log-Eintrag für eine Steckdose.
Args:
printer_id: ID des zugehörigen Druckers
status: Status der Steckdose ('connected', 'disconnected', 'on', 'off')
source: Quelle der Statusänderung ('system', 'manual', 'api', 'scheduler')
user_id: ID des Benutzers (bei manueller Änderung)
ip_address: IP-Adresse der Steckdose
power_consumption: Stromverbrauch in Watt
voltage: Spannung in Volt
current: Stromstärke in Ampere
notes: Zusätzliche Notizen
response_time_ms: Antwortzeit in Millisekunden
error_message: Fehlermeldung bei Problemen
firmware_version: Firmware-Version der Steckdose
Returns:
Das erstellte PlugStatusLog-Objekt
"""
try:
with get_cached_session() as session:
log_entry = cls(
printer_id=printer_id,
status=status,
ip_address=ip_address,
power_consumption=power_consumption,
voltage=voltage,
current=current,
source=source,
user_id=user_id,
notes=notes,
response_time_ms=response_time_ms,
error_message=error_message,
firmware_version=firmware_version
)
session.add(log_entry)
session.commit()
# Cache invalidieren
invalidate_model_cache("PlugStatusLog")
logger.info(f"Steckdosen-Status geloggt: Drucker {printer_id}, Status: {status}, Quelle: {source}")
return log_entry
except Exception as e:
logger.error(f"Fehler beim Loggen des Steckdosen-Status: {str(e)}")
raise e
@classmethod
def get_printer_history(cls, printer_id: int, hours: int = 24) -> List['PlugStatusLog']:
"""
Holt die Steckdosen-Historie für einen bestimmten Drucker.
Args:
printer_id: ID des Druckers
hours: Anzahl der Stunden zurück (Standard: 24)
Returns:
Liste der PlugStatusLog-Einträge
"""
cache_key = get_cache_key("PlugStatusLog", printer_id, f"history_{hours}h")
cached_result = get_cache(cache_key)
if cached_result is not None:
return cached_result
try:
with get_cached_session() as session:
cutoff_time = datetime.now() - timedelta(hours=hours)
logs = session.query(cls)\
.filter(cls.printer_id == printer_id)\
.filter(cls.timestamp >= cutoff_time)\
.order_by(cls.timestamp.desc())\
.all()
# Ergebnis cachen (10 Minuten)
set_cache(cache_key, logs, 600)
return logs
except Exception as e:
logger.error(f"Fehler beim Abrufen der Steckdosen-Historie: {str(e)}")
return []
@classmethod
def get_all_recent_logs(cls, hours: int = 24, limit: int = 1000) -> List['PlugStatusLog']:
"""
Holt alle aktuellen Steckdosen-Logs für die Administrator-Übersicht.
Args:
hours: Anzahl der Stunden zurück (Standard: 24)
limit: Maximale Anzahl der Einträge (Standard: 1000)
Returns:
Liste der PlugStatusLog-Einträge
"""
cache_key = get_cache_key("PlugStatusLog", "all", f"recent_{hours}h_{limit}")
cached_result = get_cache(cache_key)
if cached_result is not None:
return cached_result
try:
with get_cached_session() as session:
cutoff_time = datetime.now() - timedelta(hours=hours)
logs = session.query(cls)\
.filter(cls.timestamp >= cutoff_time)\
.order_by(cls.timestamp.desc())\
.limit(limit)\
.all()
# Ergebnis cachen (5 Minuten für Admin-Übersicht)
set_cache(cache_key, logs, 300)
return logs
except Exception as e:
logger.error(f"Fehler beim Abrufen der aktuellen Steckdosen-Logs: {str(e)}")
return []
@classmethod
def get_status_statistics(cls, hours: int = 24) -> Dict[str, Any]:
"""
Erstellt Statistiken über Steckdosen-Status für einen Zeitraum.
Args:
hours: Anzahl der Stunden zurück (Standard: 24)
Returns:
Dictionary mit Statistiken
"""
cache_key = get_cache_key("PlugStatusLog", "stats", f"{hours}h")
cached_result = get_cache(cache_key)
if cached_result is not None:
return cached_result
try:
with get_cached_session() as session:
cutoff_time = datetime.now() - timedelta(hours=hours)
# Gesamtanzahl der Logs
total_logs = session.query(cls)\
.filter(cls.timestamp >= cutoff_time)\
.count()
# Status-Verteilung
status_counts = session.query(cls.status, func.count(cls.id))\
.filter(cls.timestamp >= cutoff_time)\
.group_by(cls.status)\
.all()
# Drucker mit den meisten Statusänderungen
printer_counts = session.query(cls.printer_id, func.count(cls.id))\
.filter(cls.timestamp >= cutoff_time)\
.group_by(cls.printer_id)\
.order_by(func.count(cls.id).desc())\
.limit(10)\
.all()
# Durchschnittliche Antwortzeit
avg_response_time = session.query(func.avg(cls.response_time_ms))\
.filter(cls.timestamp >= cutoff_time)\
.filter(cls.response_time_ms.isnot(None))\
.scalar()
# Fehlerrate
error_count = session.query(cls)\
.filter(cls.timestamp >= cutoff_time)\
.filter(cls.error_message.isnot(None))\
.count()
stats = {
"total_logs": total_logs,
"status_distribution": dict(status_counts),
"top_printers": dict(printer_counts),
"average_response_time_ms": float(avg_response_time) if avg_response_time else None,
"error_count": error_count,
"error_rate": (error_count / total_logs * 100) if total_logs > 0 else 0,
"timeframe_hours": hours,
"generated_at": datetime.now().isoformat()
}
# Ergebnis cachen (10 Minuten)
set_cache(cache_key, stats, 600)
return stats
except Exception as e:
logger.error(f"Fehler beim Erstellen der Steckdosen-Statistiken: {str(e)}")
return {
"total_logs": 0,
"status_distribution": {},
"top_printers": {},
"average_response_time_ms": None,
"error_count": 0,
"error_rate": 0,
"timeframe_hours": hours,
"generated_at": datetime.now().isoformat(),
"error": str(e)
}
@classmethod
def cleanup_old_logs(cls, days: int = 30) -> int:
"""
Bereinigt alte Steckdosen-Logs (älter als X Tage).
Args:
days: Anzahl der Tage (Standard: 30)
Returns:
Anzahl der gelöschten Einträge
"""
try:
with get_cached_session() as session:
cutoff_date = datetime.now() - timedelta(days=days)
deleted_count = session.query(cls)\
.filter(cls.timestamp < cutoff_date)\
.delete()
session.commit()
# Cache invalidieren
invalidate_model_cache("PlugStatusLog")
logger.info(f"Steckdosen-Logs bereinigt: {deleted_count} Einträge gelöscht (älter als {days} Tage)")
return deleted_count
except Exception as e:
logger.error(f"Fehler beim Bereinigen der Steckdosen-Logs: {str(e)}")
return 0
# ===== DATENBANK-INITIALISIERUNG MIT OPTIMIERUNGEN =====
def init_db() -> None:
"""Initialisiert die Datenbank und erstellt alle Tabellen mit Optimierungen."""
ensure_database_directory()
engine = create_optimized_engine()
# Tabellen erstellen
Base.metadata.create_all(engine)
# Indizes für bessere Performance erstellen
with engine.connect() as conn:
# Index für User-Login
conn.execute(text("""
CREATE INDEX IF NOT EXISTS idx_users_username_email
ON users(username, email)
"""))
# Index für Job-Status und Zeiten
conn.execute(text("""
CREATE INDEX IF NOT EXISTS idx_jobs_status_times
ON jobs(status, start_at, end_at)
"""))
# Index für Printer-Status
conn.execute(text("""
CREATE INDEX IF NOT EXISTS idx_printers_status
ON printers(status, active)
"""))
# Index für System-Logs
conn.execute(text("""
CREATE INDEX IF NOT EXISTS idx_system_logs_timestamp
ON system_logs(timestamp, level)
"""))
conn.commit()
logger.info("Datenbank mit Optimierungen initialisiert")
def init_database() -> None:
"""Alias für init_db() - initialisiert die Datenbank und erstellt alle Tabellen."""
init_db()
def create_initial_admin(email: str = "admin@mercedes-benz.com", password: str = "744563017196A", name: str = "Administrator", username: str = "admin") -> bool:
"""
Erstellt einen initialen Admin-Benutzer, falls die Datenbank leer ist.
Args:
email: E-Mail-Adresse des Admins
password: Passwort des Admins
name: Name des Admins
username: Benutzername des Admins
Returns:
bool: True, wenn der Admin erstellt wurde, False sonst
"""
try:
with get_cached_session() as session:
# Prüfen, ob der Admin bereits existiert
admin = session.query(User).filter(User.email == email).first()
if admin:
# Admin existiert bereits, Passwort zurücksetzen
admin.set_password(password)
admin.role = "admin" # Sicherstellen, dass der Benutzer Admin-Rechte hat
admin.active = True # Sicherstellen, dass der Account aktiv ist
session.commit()
logger.info(f"Admin-Benutzer {username} ({email}) existiert bereits. Passwort wurde zurückgesetzt.")
return True
# Admin erstellen, wenn er nicht existiert
admin = User(
email=email,
username=username,
name=name,
role="admin",
active=True
)
admin.set_password(password)
session.add(admin)
session.commit()
# Statistik-Eintrag anlegen, falls noch nicht vorhanden
stats = session.query(Stats).first()
if not stats:
stats = Stats()
session.add(stats)
session.commit()
logger.info(f"Admin-Benutzer {username} ({email}) wurde angelegt.")
return True
except Exception as e:
logger.error(f"Fehler beim Erstellen des Admin-Benutzers: {str(e)}")
return False
# Engine für Export verfügbar machen
def get_engine():
"""Gibt die optimierte Datenbank-Engine zurück."""
return create_optimized_engine()
# Engine-Variable für direkten Import
engine = get_engine()
# ===== CACHE-VERWALTUNG =====
def clear_model_cache():
"""
Leert den Application-Level Cache für Modelle.
Diese Funktion kann erweitert werden, um verschiedene Cache-Mechanismen
zu unterstützen, wie z.B. SQLAlchemy Session Cache, Redis Cache, etc.
"""
try:
# SQLAlchemy Session Cache leeren
from sqlalchemy.orm import scoped_session
if _scoped_session:
_scoped_session.remove()
# Weitere Cache-Clearing-Operationen hier hinzufügen
# z.B. Redis Cache, Memcached, etc.
return True
except Exception as e:
print(f"Fehler beim Leeren des Model-Cache: {str(e)}")
return False