1297 lines
48 KiB
Python

import os
import logging
import threading
import time
from datetime import datetime
from typing import Optional, List, Dict, Any
from contextlib import contextmanager
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, ForeignKey, Float, event, text, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker, Session, Mapped, mapped_column, scoped_session
from sqlalchemy.pool import StaticPool, QueuePool
from sqlalchemy.engine import Engine
from flask_login import UserMixin
import bcrypt
import secrets
from config.settings import DATABASE_PATH, ensure_database_directory
from utils.logging_config import get_logger
# ===== DATABASE CLEANUP INTEGRATION =====
# Importiere den neuen Cleanup-Manager
try:
from utils.database_cleanup import get_cleanup_manager
CLEANUP_MANAGER_AVAILABLE = True
except ImportError:
CLEANUP_MANAGER_AVAILABLE = False
logger = get_logger("app")
logger.warning("DatabaseCleanupManager nicht verfügbar - Fallback auf Legacy-Cleanup")
Base = declarative_base()
logger = get_logger("app")
# Thread-lokale Session-Factory für sichere Concurrent-Zugriffe
_session_factory = None
_scoped_session = None
_engine = None
_connection_pool_lock = threading.Lock()
# Cache für häufig abgerufene Daten
_cache = {}
_cache_lock = threading.Lock()
_cache_ttl = {} # Time-to-live für Cache-Einträge
# Alle exportierten Modelle
__all__ = ['User', 'Printer', 'Job', 'Stats', 'SystemLog', 'Base', 'GuestRequest', 'UserPermission', 'Notification', 'JobOrder', 'init_db', 'init_database', 'create_initial_admin', 'get_db_session', 'get_cached_session', 'clear_cache', 'engine']
# ===== DATENBANK-KONFIGURATION MIT WAL UND OPTIMIERUNGEN =====
def configure_sqlite_for_production(dbapi_connection, _connection_record):
"""
Konfiguriert SQLite für Produktionsumgebung mit WAL-Modus und Optimierungen.
"""
cursor = dbapi_connection.cursor()
# WAL-Modus aktivieren (Write-Ahead Logging)
cursor.execute("PRAGMA journal_mode=WAL")
# Synchronous-Modus für bessere Performance bei WAL
cursor.execute("PRAGMA synchronous=NORMAL")
# Cache-Größe erhöhen (in KB, negative Werte = KB)
cursor.execute("PRAGMA cache_size=-64000") # 64MB Cache
# Memory-mapped I/O aktivieren
cursor.execute("PRAGMA mmap_size=268435456") # 256MB
# Temp-Store im Memory
cursor.execute("PRAGMA temp_store=MEMORY")
# Optimierungen für bessere Performance
cursor.execute("PRAGMA optimize")
# Foreign Key Constraints aktivieren
cursor.execute("PRAGMA foreign_keys=ON")
# Auto-Vacuum für automatische Speicherbereinigung
cursor.execute("PRAGMA auto_vacuum=INCREMENTAL")
# Busy Timeout für Concurrent Access
cursor.execute("PRAGMA busy_timeout=30000") # 30 Sekunden
# Checkpoint-Intervall für WAL
cursor.execute("PRAGMA wal_autocheckpoint=1000")
cursor.close()
logger.info("SQLite für Produktionsumgebung konfiguriert (WAL-Modus, Cache, Optimierungen)")
def create_optimized_engine():
"""
Erstellt eine optimierte SQLite-Engine mit Connection Pooling und WAL-Modus.
"""
global _engine
if _engine is not None:
return _engine
with _connection_pool_lock:
if _engine is not None:
return _engine
ensure_database_directory()
# Connection String mit optimierten Parametern
connection_string = f"sqlite:///{DATABASE_PATH}"
# Engine mit Connection Pooling erstellen
_engine = create_engine(
connection_string,
# Connection Pool Konfiguration
poolclass=StaticPool,
pool_pre_ping=True, # Verbindungen vor Nutzung testen
pool_recycle=3600, # Verbindungen nach 1 Stunde erneuern
connect_args={
"check_same_thread": False, # Für Multi-Threading
"timeout": 30, # Connection Timeout
"isolation_level": None # Autocommit-Modus für bessere Kontrolle
},
# Echo für Debugging (in Produktion ausschalten)
echo=False,
# Weitere Optimierungen
execution_options={
"autocommit": False
}
)
# Event-Listener für SQLite-Optimierungen
event.listen(_engine, "connect", configure_sqlite_for_production)
# Regelmäßige Wartungsaufgaben
event.listen(_engine, "connect", lambda conn, rec: schedule_maintenance())
# ===== CLEANUP MANAGER INTEGRATION =====
# Registriere Engine beim Cleanup-Manager für sicheres Shutdown
if CLEANUP_MANAGER_AVAILABLE:
try:
cleanup_manager = get_cleanup_manager()
cleanup_manager.register_engine(_engine)
logger.debug("Engine beim DatabaseCleanupManager registriert")
except Exception as e:
logger.warning(f"Fehler bei Cleanup-Manager-Registrierung: {e}")
logger.info(f"Optimierte SQLite-Engine erstellt: {DATABASE_PATH}")
return _engine
def schedule_maintenance():
"""
Plant regelmäßige Wartungsaufgaben für die Datenbank.
"""
def maintenance_worker():
time.sleep(300) # 5 Minuten warten
while True:
try:
with get_maintenance_session() as session:
# WAL-Checkpoint ausführen (aggressive Strategie)
checkpoint_result = session.execute(text("PRAGMA wal_checkpoint(TRUNCATE)")).fetchone()
# Nur loggen wenn tatsächlich Daten übertragen wurden
if checkpoint_result and checkpoint_result[1] > 0:
logger.info(f"WAL-Checkpoint: {checkpoint_result[1]} Seiten übertragen, {checkpoint_result[2]} Seiten zurückgesetzt")
# Statistiken aktualisieren (alle 30 Minuten)
session.execute(text("ANALYZE"))
# Incremental Vacuum (alle 60 Minuten)
session.execute(text("PRAGMA incremental_vacuum"))
session.commit()
except Exception as e:
logger.error(f"Fehler bei Datenbank-Wartung: {str(e)}")
# Warte 30 Minuten bis zur nächsten Wartung
time.sleep(1800)
# Wartung in separatem Thread ausführen
maintenance_thread = threading.Thread(target=maintenance_worker, daemon=True)
maintenance_thread.start()
def get_session_factory():
"""
Gibt die Thread-sichere Session-Factory zurück.
"""
global _session_factory, _scoped_session
if _session_factory is None:
with _connection_pool_lock:
if _session_factory is None:
engine = create_optimized_engine()
_session_factory = sessionmaker(
bind=engine,
autoflush=True,
autocommit=False,
expire_on_commit=False # Objekte nach Commit nicht expiren
)
_scoped_session = scoped_session(_session_factory)
return _scoped_session
@contextmanager
def get_maintenance_session():
"""
Context Manager für Wartungs-Sessions.
"""
engine = create_optimized_engine()
session = sessionmaker(bind=engine)()
try:
yield session
except Exception as e:
session.rollback()
raise e
finally:
session.close()
# ===== CACHING-SYSTEM =====
def get_cache_key(model_class: str, identifier: Any, extra: str = "") -> str:
"""
Generiert einen Cache-Schlüssel.
"""
return f"{model_class}:{identifier}:{extra}"
def set_cache(key: str, value: Any, ttl_seconds: int = 300):
"""
Setzt einen Wert im Cache mit TTL.
"""
with _cache_lock:
_cache[key] = value
_cache_ttl[key] = time.time() + ttl_seconds
def get_cache(key: str) -> Optional[Any]:
"""
Holt einen Wert aus dem Cache.
"""
with _cache_lock:
if key in _cache:
if key in _cache_ttl and time.time() > _cache_ttl[key]:
# Cache-Eintrag abgelaufen
del _cache[key]
del _cache_ttl[key]
return None
return _cache[key]
return None
def clear_cache(pattern: str = None):
"""
Löscht Cache-Einträge (optional mit Pattern).
"""
with _cache_lock:
if pattern is None:
_cache.clear()
_cache_ttl.clear()
else:
keys_to_delete = [k for k in _cache.keys() if pattern in k]
for key in keys_to_delete:
del _cache[key]
if key in _cache_ttl:
del _cache_ttl[key]
def invalidate_model_cache(model_class: str, identifier: Any = None):
"""
Invalidiert Cache-Einträge für ein bestimmtes Modell.
"""
if identifier is not None:
pattern = f"{model_class}:{identifier}"
else:
pattern = f"{model_class}:"
clear_cache(pattern)
# ===== ERWEITERTE SESSION-VERWALTUNG =====
@contextmanager
def get_cached_session():
"""
Context Manager für gecachte Sessions mit automatischem Rollback.
"""
session_factory = get_session_factory()
session = session_factory()
try:
yield session
session.commit()
except Exception as e:
session.rollback()
logger.error(f"Datenbank-Transaktion fehlgeschlagen: {str(e)}")
raise e
finally:
session.close()
def get_db_session() -> Session:
"""
Gibt eine neue Datenbank-Session zurück (Legacy-Kompatibilität).
"""
session_factory = get_session_factory()
return session_factory()
# ===== MODELL-DEFINITIONEN =====
class User(UserMixin, Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
email = Column(String(120), unique=True, nullable=False)
username = Column(String(100), unique=True, nullable=False) # Füge username hinzu für login
password_hash = Column(String(128), nullable=False)
name = Column(String(100), nullable=False)
role = Column(String(20), default="user") # "admin" oder "user"
active = Column(Boolean, default=True) # Für Flask-Login is_active
created_at = Column(DateTime, default=datetime.now)
last_login = Column(DateTime, nullable=True) # Letzter Login-Zeitstempel
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) # Automatische Aktualisierung
settings = Column(Text, nullable=True) # JSON-String für Benutzereinstellungen
last_activity = Column(DateTime, default=datetime.now)
# Zusätzliche Profil-Felder für bessere Benutzerverwaltung
department = Column(String(100), nullable=True) # Abteilung
position = Column(String(100), nullable=True) # Position/Rolle im Unternehmen
phone = Column(String(50), nullable=True) # Telefonnummer
bio = Column(Text, nullable=True) # Kurze Beschreibung/Bio
jobs = relationship("Job", back_populates="user", foreign_keys="Job.user_id", cascade="all, delete-orphan")
owned_jobs = relationship("Job", foreign_keys="Job.owner_id", overlaps="owner")
permissions = relationship("UserPermission", back_populates="user", uselist=False, cascade="all, delete-orphan")
notifications = relationship("Notification", back_populates="user", cascade="all, delete-orphan")
def set_password(self, password: str) -> None:
password_bytes = password.encode('utf-8')
salt = bcrypt.gensalt()
self.password_hash = bcrypt.hashpw(password_bytes, salt).decode('utf-8')
# Cache invalidieren
invalidate_model_cache("User", self.id)
def check_password(self, password: str) -> bool:
password_bytes = password.encode('utf-8')
hash_bytes = self.password_hash.encode('utf-8')
return bcrypt.checkpw(password_bytes, hash_bytes)
@property
def is_admin(self) -> bool:
return self.role == "admin"
@property
def is_active(self) -> bool:
"""Required for Flask-Login"""
return self.active
def get_id(self) -> str:
"""Required for Flask-Login - return user id as unicode string"""
return str(self.id)
def to_dict(self) -> dict:
# Cache-Key für User-Dict
cache_key = get_cache_key("User", self.id, "dict")
cached_result = get_cache(cache_key)
if cached_result is not None:
return cached_result
result = {
"id": self.id,
"email": self.email,
"username": self.username,
"name": self.name,
"role": self.role,
"active": self.active,
"created_at": self.created_at.isoformat() if self.created_at else None,
"last_login": self.last_login.isoformat() if self.last_login else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
"settings": self.settings,
"department": self.department,
"position": self.position,
"phone": self.phone,
"last_login": self.last_login.isoformat() if self.last_login else None
}
# Ergebnis cachen (5 Minuten)
set_cache(cache_key, result, 300)
return result
@classmethod
def get_by_username_or_email(cls, identifier: str) -> Optional['User']:
"""
Holt einen Benutzer anhand von Username oder E-Mail mit Caching.
"""
cache_key = get_cache_key("User", identifier, "login")
cached_user = get_cache(cache_key)
if cached_user is not None:
return cached_user
with get_cached_session() as session:
user = session.query(cls).filter(
(cls.username == identifier) | (cls.email == identifier)
).first()
if user:
# User für 10 Minuten cachen
set_cache(cache_key, user, 600)
return user
@classmethod
def get_by_id_cached(cls, user_id: int) -> Optional['User']:
"""
Holt einen Benutzer anhand der ID mit Caching.
"""
cache_key = get_cache_key("User", user_id, "id")
cached_user = get_cache(cache_key)
if cached_user is not None:
return cached_user
with get_cached_session() as session:
user = session.query(cls).filter(cls.id == user_id).first()
if user:
# User für 10 Minuten cachen
set_cache(cache_key, user, 600)
return user
def update_last_login(self):
"""
Aktualisiert den letzten Login-Zeitstempel.
"""
self.last_login = datetime.now()
# Cache invalidieren
invalidate_model_cache("User", self.id)
class Printer(Base):
__tablename__ = "printers"
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
model = Column(String(100)) # Drucker-Modell
location = Column(String(100))
ip_address = Column(String(50)) # IP-Adresse des Druckers
mac_address = Column(String(50), nullable=False, unique=True)
plug_ip = Column(String(50), nullable=False)
plug_username = Column(String(100), nullable=False)
plug_password = Column(String(100), nullable=False)
status = Column(String(20), default="offline") # online, offline, busy, idle
active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.now)
last_checked = Column(DateTime, nullable=True) # Zeitstempel der letzten Status-Überprüfung
jobs = relationship("Job", back_populates="printer", cascade="all, delete-orphan")
def to_dict(self) -> dict:
# Cache-Key für Printer-Dict
cache_key = get_cache_key("Printer", self.id, "dict")
cached_result = get_cache(cache_key)
if cached_result is not None:
return cached_result
result = {
"id": self.id,
"name": self.name,
"model": self.model,
"location": self.location,
"ip_address": self.ip_address,
"mac_address": self.mac_address,
"plug_ip": self.plug_ip,
"status": self.status,
"active": self.active,
"created_at": self.created_at.isoformat() if self.created_at else None,
"last_checked": self.last_checked.isoformat() if self.last_checked else None
}
# Ergebnis cachen (2 Minuten für Drucker-Status)
set_cache(cache_key, result, 120)
return result
def update_status(self, new_status: str, active: bool = None):
"""
Aktualisiert den Drucker-Status und invalidiert den Cache.
"""
self.status = new_status
self.last_checked = datetime.now()
if active is not None:
self.active = active
# Cache invalidieren
invalidate_model_cache("Printer", self.id)
@classmethod
def get_all_cached(cls) -> List['Printer']:
"""
Holt alle Drucker mit Caching.
"""
cache_key = get_cache_key("Printer", "all", "list")
cached_printers = get_cache(cache_key)
if cached_printers is not None:
return cached_printers
with get_cached_session() as session:
printers = session.query(cls).all()
# Drucker für 5 Minuten cachen
set_cache(cache_key, printers, 300)
return printers
@classmethod
def get_online_printers(cls) -> List['Printer']:
"""
Holt alle online Drucker mit Caching.
"""
cache_key = get_cache_key("Printer", "online", "list")
cached_printers = get_cache(cache_key)
if cached_printers is not None:
return cached_printers
with get_cached_session() as session:
printers = session.query(cls).filter(
cls.status.in_(["online", "available", "idle"])
).all()
# Online-Drucker für 1 Minute cachen (häufiger aktualisiert)
set_cache(cache_key, printers, 60)
return printers
class Job(Base):
__tablename__ = "jobs"
id = Column(Integer, primary_key=True)
name = Column(String(200), nullable=False)
description = Column(String(500)) # Beschreibung des Jobs
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
printer_id = Column(Integer, ForeignKey("printers.id"), nullable=False)
start_at = Column(DateTime)
end_at = Column(DateTime)
actual_end_time = Column(DateTime)
status = Column(String(20), default="scheduled") # scheduled|running|finished|aborted
created_at = Column(DateTime, default=datetime.now)
notes = Column(String(500))
material_used = Column(Float) # in Gramm
file_path = Column(String(500), nullable=True)
owner_id = Column(Integer, ForeignKey("users.id"), nullable=True)
duration_minutes = Column(Integer, nullable=False) # Dauer in Minuten
user = relationship("User", back_populates="jobs", foreign_keys=[user_id])
owner = relationship("User", foreign_keys=[owner_id], overlaps="owned_jobs")
printer = relationship("Printer", back_populates="jobs")
def to_dict(self) -> dict:
# Cache-Key für Job-Dict
cache_key = get_cache_key("Job", self.id, "dict")
cached_result = get_cache(cache_key)
if cached_result is not None:
return cached_result
result = {
"id": self.id,
"name": self.name,
"description": self.description,
"user_id": self.user_id,
"printer_id": self.printer_id,
"start_at": self.start_at.isoformat() if self.start_at else None,
"end_at": self.end_at.isoformat() if self.end_at else None,
"actual_end_time": self.actual_end_time.isoformat() if self.actual_end_time else None,
"status": self.status,
"created_at": self.created_at.isoformat() if self.created_at else None,
"notes": self.notes,
"material_used": self.material_used,
"file_path": self.file_path,
"owner_id": self.owner_id,
"duration_minutes": self.duration_minutes,
"user": self.user.to_dict() if self.user else None,
"printer": self.printer.to_dict() if self.printer else None
}
# Ergebnis cachen (3 Minuten für Jobs)
set_cache(cache_key, result, 180)
return result
def update_status(self, new_status: str):
"""
Aktualisiert den Job-Status und invalidiert den Cache.
"""
self.status = new_status
if new_status in ["finished", "failed", "cancelled"]:
self.actual_end_time = datetime.now()
# Cache invalidieren
invalidate_model_cache("Job", self.id)
# Auch User- und Printer-Caches invalidieren
invalidate_model_cache("User", self.user_id)
invalidate_model_cache("Printer", self.printer_id)
@classmethod
def get_active_jobs(cls) -> List['Job']:
"""
Holt alle aktiven Jobs mit Caching.
"""
cache_key = get_cache_key("Job", "active", "list")
cached_jobs = get_cache(cache_key)
if cached_jobs is not None:
return cached_jobs
with get_cached_session() as session:
jobs = session.query(cls).filter(
cls.status.in_(["scheduled", "running"])
).all()
# Aktive Jobs für 30 Sekunden cachen (häufig aktualisiert)
set_cache(cache_key, jobs, 30)
return jobs
@classmethod
def get_user_jobs(cls, user_id: int) -> List['Job']:
"""
Holt alle Jobs eines Benutzers mit Caching.
"""
cache_key = get_cache_key("Job", f"user_{user_id}", "list")
cached_jobs = get_cache(cache_key)
if cached_jobs is not None:
return cached_jobs
with get_cached_session() as session:
jobs = session.query(cls).filter(cls.user_id == user_id).all()
# Benutzer-Jobs für 5 Minuten cachen
set_cache(cache_key, jobs, 300)
return jobs
class Stats(Base):
__tablename__ = "stats"
id = Column(Integer, primary_key=True)
total_print_time = Column(Integer, default=0) # in Sekunden
total_jobs_completed = Column(Integer, default=0)
total_material_used = Column(Float, default=0.0) # in Gramm
last_updated = Column(DateTime, default=datetime.now)
def to_dict(self) -> dict:
# Cache-Key für Stats-Dict
cache_key = get_cache_key("Stats", self.id, "dict")
cached_result = get_cache(cache_key)
if cached_result is not None:
return cached_result
result = {
"id": self.id,
"total_print_time": self.total_print_time,
"total_jobs_completed": self.total_jobs_completed,
"total_material_used": self.total_material_used,
"last_updated": self.last_updated.isoformat() if self.last_updated else None
}
# Statistiken für 10 Minuten cachen
set_cache(cache_key, result, 600)
return result
class SystemLog(Base):
"""System-Log Modell für Logging von System-Events"""
__tablename__ = "system_logs"
id = Column(Integer, primary_key=True)
timestamp = Column(DateTime, default=datetime.now, nullable=False)
level = Column(String(20), nullable=False) # DEBUG, INFO, WARNING, ERROR, CRITICAL
message = Column(String(1000), nullable=False)
module = Column(String(100)) # Welches Modul/Blueprint den Log erstellt hat
user_id = Column(Integer, ForeignKey("users.id"), nullable=True) # Optional: welcher User
ip_address = Column(String(50)) # Optional: IP-Adresse
user_agent = Column(String(500)) # Optional: Browser/Client Info
user = relationship("User", foreign_keys=[user_id])
def to_dict(self) -> dict:
return {
"id": self.id,
"timestamp": self.timestamp.isoformat() if self.timestamp else None,
"level": self.level,
"message": self.message,
"module": self.module,
"user_id": self.user_id,
"ip_address": self.ip_address,
"user_agent": self.user_agent,
"user": self.user.to_dict() if self.user else None
}
@classmethod
def log_system_event(cls, level: str, message: str, module: str = None,
user_id: int = None, ip_address: str = None,
user_agent: str = None) -> 'SystemLog':
"""
Hilfsmethode zum Erstellen eines System-Log-Eintrags
Args:
level: Log-Level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
message: Log-Nachricht
module: Optional - Modul/Blueprint Name
user_id: Optional - Benutzer-ID
ip_address: Optional - IP-Adresse
user_agent: Optional - User-Agent String
Returns:
SystemLog: Das erstellte Log-Objekt
"""
return cls(
level=level.upper(),
message=message,
module=module,
user_id=user_id,
ip_address=ip_address,
user_agent=user_agent
)
class UserPermission(Base):
"""
Berechtigungen für Benutzer.
"""
__tablename__ = "user_permissions"
user_id = Column(Integer, ForeignKey("users.id"), primary_key=True)
can_start_jobs = Column(Boolean, default=False)
needs_approval = Column(Boolean, default=True)
can_approve_jobs = Column(Boolean, default=False)
user = relationship("User", back_populates="permissions")
def to_dict(self) -> dict:
"""
Konvertiert die Benutzerberechtigungen in ein Dictionary.
"""
return {
"user_id": self.user_id,
"can_start_jobs": self.can_start_jobs,
"needs_approval": self.needs_approval,
"can_approve_jobs": self.can_approve_jobs
}
class Notification(Base):
"""
Benachrichtigungen für Benutzer.
"""
__tablename__ = "notifications"
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
type = Column(String(50), nullable=False)
payload = Column(Text) # JSON-Daten als String
created_at = Column(DateTime, default=datetime.now)
read = Column(Boolean, default=False)
user = relationship("User", back_populates="notifications")
def to_dict(self) -> dict:
"""
Konvertiert die Benachrichtigung in ein Dictionary.
"""
return {
"id": self.id,
"user_id": self.user_id,
"type": self.type,
"payload": self.payload,
"created_at": self.created_at.isoformat() if self.created_at else None,
"read": self.read
}
@classmethod
def create_for_approvers(cls, notification_type: str, payload: dict):
"""
Erstellt Benachrichtigungen für alle Benutzer mit can_approve_jobs-Berechtigung.
Args:
notification_type: Art der Benachrichtigung
payload: Daten für die Benachrichtigung als Dictionary
"""
import json
payload_json = json.dumps(payload)
with get_cached_session() as session:
# Alle Benutzer mit can_approve_jobs-Berechtigung finden
approvers = session.query(User).join(UserPermission).filter(
UserPermission.can_approve_jobs == True
).all()
# Benachrichtigungen für alle Genehmiger erstellen
for approver in approvers:
notification = cls(
user_id=approver.id,
type=notification_type,
payload=payload_json
)
session.add(notification)
session.commit()
class GuestRequest(Base):
"""
Gastanfragen für Druckaufträge.
"""
__tablename__ = "guest_requests"
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
email = Column(String(120))
reason = Column(Text)
duration_min = Column(Integer) # Bestehend - wird für Backward-Kompatibilität beibehalten
duration_minutes = Column(Integer) # Neu hinzugefügt für API-Kompatibilität
created_at = Column(DateTime, default=datetime.now)
status = Column(String(20), default="pending") # pending|approved|denied
printer_id = Column(Integer, ForeignKey("printers.id"))
otp_code = Column(String(100), nullable=True) # Hash des OTP-Codes
job_id = Column(Integer, ForeignKey("jobs.id"), nullable=True)
author_ip = Column(String(50))
otp_used_at = Column(DateTime, nullable=True) # Zeitpunkt der OTP-Verwendung
# Erweiterte Attribute für Datei-Management
file_name = Column(String(255), nullable=True) # Name der hochgeladenen Datei
file_path = Column(String(500), nullable=True) # Pfad zur hochgeladenen Datei
copies = Column(Integer, default=1) # Anzahl der Kopien
# Neue Felder für Admin-Verwaltung
processed_by = Column(Integer, ForeignKey("users.id"), nullable=True) # Admin der die Anfrage bearbeitet hat
processed_at = Column(DateTime, nullable=True) # Zeitpunkt der Bearbeitung
approval_notes = Column(Text, nullable=True) # Notizen bei Genehmigung
rejection_reason = Column(Text, nullable=True) # Grund bei Ablehnung
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) # Automatische Aktualisierung
# Zusätzliche Zeitstempel für bessere Verwaltung
approved_at = Column(DateTime, nullable=True) # Zeitpunkt der Genehmigung
rejected_at = Column(DateTime, nullable=True) # Zeitpunkt der Ablehnung
approved_by = Column(Integer, ForeignKey("users.id"), nullable=True) # Admin der genehmigt hat
rejected_by = Column(Integer, ForeignKey("users.id"), nullable=True) # Admin der abgelehnt hat
# OTP-Verwaltung erweitert
otp_expires_at = Column(DateTime, nullable=True) # Ablaufzeit des OTP-Codes
assigned_printer_id = Column(Integer, ForeignKey("printers.id"), nullable=True) # Zugewiesener Drucker
# Beziehungen
printer = relationship("Printer", foreign_keys=[printer_id])
assigned_printer = relationship("Printer", foreign_keys=[assigned_printer_id])
job = relationship("Job")
processed_by_user = relationship("User", foreign_keys=[processed_by]) # Admin der bearbeitet hat
approved_by_user = relationship("User", foreign_keys=[approved_by]) # Admin der genehmigt hat
rejected_by_user = relationship("User", foreign_keys=[rejected_by]) # Admin der abgelehnt hat
def to_dict(self) -> dict:
# Cache-Key für GuestRequest-Dict
cache_key = get_cache_key("GuestRequest", self.id, "dict")
cached_result = get_cache(cache_key)
if cached_result is not None:
return cached_result
result = {
"id": self.id,
"name": self.name,
"email": self.email,
"reason": self.reason,
"duration_min": self.duration_min,
"duration_minutes": self.duration_minutes,
"created_at": self.created_at.isoformat() if self.created_at else None,
"status": self.status,
"printer_id": self.printer_id,
"job_id": self.job_id,
"author_ip": self.author_ip,
"otp_used_at": self.otp_used_at.isoformat() if self.otp_used_at else None,
"file_name": self.file_name,
"file_path": self.file_path,
"copies": self.copies,
"processed_by": self.processed_by,
"processed_at": self.processed_at.isoformat() if self.processed_at else None,
"approval_notes": self.approval_notes,
"rejection_reason": self.rejection_reason,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
"approved_at": self.approved_at.isoformat() if self.approved_at else None,
"rejected_at": self.rejected_at.isoformat() if self.rejected_at else None,
"approved_by": self.approved_by,
"rejected_by": self.rejected_by,
"otp_expires_at": self.otp_expires_at.isoformat() if self.otp_expires_at else None,
"assigned_printer_id": self.assigned_printer_id,
}
# Ergebnis cachen (5 Minuten)
set_cache(cache_key, result, 300)
return result
def generate_otp(self) -> str:
"""
Generiert einen neuen OTP-Code und speichert den Hash.
"""
otp_plain = secrets.token_hex(8) # 16-stelliger hexadezimaler Code
# Hash des OTP-Codes speichern
otp_bytes = otp_plain.encode('utf-8')
salt = bcrypt.gensalt()
self.otp_code = bcrypt.hashpw(otp_bytes, salt).decode('utf-8')
logger.info(f"OTP generiert für Guest Request {self.id}")
# Cache invalidieren
invalidate_model_cache("GuestRequest", self.id)
return otp_plain
def verify_otp(self, otp_plain: str) -> bool:
"""
Verifiziert einen OTP-Code.
"""
if not self.otp_code or not otp_plain:
return False
try:
otp_bytes = otp_plain.encode('utf-8')
hash_bytes = self.otp_code.encode('utf-8')
is_valid = bcrypt.checkpw(otp_bytes, hash_bytes)
if is_valid:
self.otp_used_at = datetime.now()
logger.info(f"OTP erfolgreich verifiziert für Guest Request {self.id}")
# Cache invalidieren
invalidate_model_cache("GuestRequest", self.id)
else:
logger.warning(f"Ungültiger OTP-Code für Guest Request {self.id}")
return is_valid
except Exception as e:
logger.error(f"Fehler bei OTP-Verifizierung: {str(e)}")
return False
class JobOrder(Base):
"""
Job-Reihenfolge für Drucker im Drag & Drop System.
Speichert die benutzerdefinierte Reihenfolge der Jobs pro Drucker.
"""
__tablename__ = "job_orders"
id = Column(Integer, primary_key=True)
printer_id = Column(Integer, ForeignKey("printers.id"), nullable=False)
job_id = Column(Integer, ForeignKey("jobs.id"), nullable=False)
order_position = Column(Integer, nullable=False) # Position in der Reihenfolge (0-basiert)
created_at = Column(DateTime, default=datetime.now)
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
last_modified_by = Column(Integer, ForeignKey("users.id"), nullable=True) # Wer die Reihenfolge geändert hat
# Beziehungen
printer = relationship("Printer", foreign_keys=[printer_id])
job = relationship("Job", foreign_keys=[job_id])
modified_by_user = relationship("User", foreign_keys=[last_modified_by])
# Eindeutige Kombination: Ein Job kann nur eine Position pro Drucker haben
__table_args__ = (
# Sicherstellen, dass jeder Job nur einmal pro Drucker existiert
# und jede Position pro Drucker nur einmal vergeben wird
)
def to_dict(self) -> dict:
"""
Konvertiert JobOrder zu Dictionary.
"""
cache_key = get_cache_key("JobOrder", f"{self.printer_id}_{self.job_id}", "dict")
cached_result = get_cache(cache_key)
if cached_result is not None:
return cached_result
result = {
"id": self.id,
"printer_id": self.printer_id,
"job_id": self.job_id,
"order_position": self.order_position,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
"last_modified_by": self.last_modified_by
}
# Ergebnis cachen (2 Minuten)
set_cache(cache_key, result, 120)
return result
@classmethod
def get_order_for_printer(cls, printer_id: int) -> List['JobOrder']:
"""
Holt die Job-Reihenfolge für einen bestimmten Drucker.
"""
cache_key = get_cache_key("JobOrder", printer_id, "printer_order")
cached_orders = get_cache(cache_key)
if cached_orders is not None:
return cached_orders
with get_cached_session() as session:
orders = session.query(cls).filter(
cls.printer_id == printer_id
).order_by(cls.order_position).all()
# Ergebnis cachen (1 Minute für häufige Abfragen)
set_cache(cache_key, orders, 60)
return orders
@classmethod
def update_printer_order(cls, printer_id: int, job_ids: List[int],
modified_by_user_id: int = None) -> bool:
"""
Aktualisiert die komplette Job-Reihenfolge für einen Drucker.
Args:
printer_id: ID des Druckers
job_ids: Liste der Job-IDs in der gewünschten Reihenfolge
modified_by_user_id: ID des Users der die Änderung durchführt
Returns:
bool: True wenn erfolgreich, False bei Fehler
"""
try:
with get_cached_session() as session:
# Validiere dass alle Jobs existieren und zum Drucker gehören
valid_jobs = session.query(Job).filter(
Job.id.in_(job_ids),
Job.printer_id == printer_id,
Job.status.in_(['scheduled', 'paused'])
).all()
if len(valid_jobs) != len(job_ids):
logger.warning(f"Nicht alle Jobs gültig für Drucker {printer_id}. "
f"Erwartet: {len(job_ids)}, Gefunden: {len(valid_jobs)}")
return False
# Alte Reihenfolge-Einträge für diesen Drucker löschen
session.query(cls).filter(cls.printer_id == printer_id).delete()
# Neue Reihenfolge-Einträge erstellen
for position, job_id in enumerate(job_ids):
order_entry = cls(
printer_id=printer_id,
job_id=job_id,
order_position=position,
last_modified_by=modified_by_user_id
)
session.add(order_entry)
session.commit()
# Cache invalidieren
clear_cache(f"JobOrder:{printer_id}")
logger.info(f"Job-Reihenfolge für Drucker {printer_id} erfolgreich aktualisiert. "
f"Jobs: {job_ids}, Benutzer: {modified_by_user_id}")
return True
except Exception as e:
logger.error(f"Fehler beim Aktualisieren der Job-Reihenfolge für Drucker {printer_id}: {str(e)}")
return False
@classmethod
def get_ordered_job_ids(cls, printer_id: int) -> List[int]:
"""
Holt die Job-IDs in der korrekten Reihenfolge für einen Drucker.
Args:
printer_id: ID des Druckers
Returns:
List[int]: Liste der Job-IDs in der richtigen Reihenfolge
"""
cache_key = get_cache_key("JobOrder", printer_id, "job_ids")
cached_ids = get_cache(cache_key)
if cached_ids is not None:
return cached_ids
try:
with get_cached_session() as session:
orders = session.query(cls).filter(
cls.printer_id == printer_id
).order_by(cls.order_position).all()
job_ids = [order.job_id for order in orders]
# Ergebnis cachen (1 Minute)
set_cache(cache_key, job_ids, 60)
return job_ids
except Exception as e:
logger.error(f"Fehler beim Laden der Job-Reihenfolge für Drucker {printer_id}: {str(e)}")
return []
@classmethod
def remove_job_from_orders(cls, job_id: int):
"""
Entfernt einen Job aus allen Drucker-Reihenfolgen (z.B. wenn Job gelöscht wird).
Args:
job_id: ID des zu entfernenden Jobs
"""
try:
with get_cached_session() as session:
# Alle Order-Einträge für diesen Job finden
orders_to_remove = session.query(cls).filter(cls.job_id == job_id).all()
printer_ids = {order.printer_id for order in orders_to_remove}
# Order-Einträge löschen
session.query(cls).filter(cls.job_id == job_id).delete()
# Positionen neu ordnen für betroffene Drucker
for printer_id in printer_ids:
remaining_orders = session.query(cls).filter(
cls.printer_id == printer_id
).order_by(cls.order_position).all()
# Positionen neu setzen (lückenlos)
for new_position, order in enumerate(remaining_orders):
order.order_position = new_position
order.updated_at = datetime.now()
session.commit()
# Cache für betroffene Drucker invalidieren
for printer_id in printer_ids:
clear_cache(f"JobOrder:{printer_id}")
logger.info(f"Job {job_id} aus allen Drucker-Reihenfolgen entfernt. "
f"Betroffene Drucker: {list(printer_ids)}")
except Exception as e:
logger.error(f"Fehler beim Entfernen des Jobs {job_id} aus Reihenfolgen: {str(e)}")
@classmethod
def cleanup_invalid_orders(cls):
"""
Bereinigt ungültige Order-Einträge (Jobs die nicht mehr existieren oder abgeschlossen sind).
"""
try:
with get_cached_session() as session:
# Finde Order-Einträge mit nicht existierenden oder abgeschlossenen Jobs
invalid_orders = session.query(cls).join(Job).filter(
Job.status.in_(['finished', 'aborted', 'cancelled'])
).all()
printer_ids = {order.printer_id for order in invalid_orders}
# Ungültige Einträge löschen
session.query(cls).join(Job).filter(
Job.status.in_(['finished', 'aborted', 'cancelled'])
).delete(synchronize_session='fetch')
# Positionen für betroffene Drucker neu ordnen
for printer_id in printer_ids:
remaining_orders = session.query(cls).filter(
cls.printer_id == printer_id
).order_by(cls.order_position).all()
for new_position, order in enumerate(remaining_orders):
order.order_position = new_position
order.updated_at = datetime.now()
session.commit()
# Cache für betroffene Drucker invalidieren
for printer_id in printer_ids:
clear_cache(f"JobOrder:{printer_id}")
logger.info(f"Bereinigung der Job-Reihenfolgen abgeschlossen. "
f"Entfernte Einträge: {len(invalid_orders)}, "
f"Betroffene Drucker: {list(printer_ids)}")
except Exception as e:
logger.error(f"Fehler bei der Bereinigung der Job-Reihenfolgen: {str(e)}")
# ===== DATENBANK-INITIALISIERUNG MIT OPTIMIERUNGEN =====
def init_db() -> None:
"""Initialisiert die Datenbank und erstellt alle Tabellen mit Optimierungen."""
ensure_database_directory()
engine = create_optimized_engine()
# Tabellen erstellen
Base.metadata.create_all(engine)
# Indizes für bessere Performance erstellen
with engine.connect() as conn:
# Index für User-Login
conn.execute(text("""
CREATE INDEX IF NOT EXISTS idx_users_username_email
ON users(username, email)
"""))
# Index für Job-Status und Zeiten
conn.execute(text("""
CREATE INDEX IF NOT EXISTS idx_jobs_status_times
ON jobs(status, start_at, end_at)
"""))
# Index für Printer-Status
conn.execute(text("""
CREATE INDEX IF NOT EXISTS idx_printers_status
ON printers(status, active)
"""))
# Index für System-Logs
conn.execute(text("""
CREATE INDEX IF NOT EXISTS idx_system_logs_timestamp
ON system_logs(timestamp, level)
"""))
conn.commit()
logger.info("Datenbank mit Optimierungen initialisiert")
def init_database() -> None:
"""Alias für init_db() - initialisiert die Datenbank und erstellt alle Tabellen."""
init_db()
def create_initial_admin(email: str = "admin@mercedes-benz.com", password: str = "744563017196A", name: str = "Administrator", username: str = "admin") -> bool:
"""
Erstellt einen initialen Admin-Benutzer, falls die Datenbank leer ist.
Args:
email: E-Mail-Adresse des Admins
password: Passwort des Admins
name: Name des Admins
username: Benutzername des Admins
Returns:
bool: True, wenn der Admin erstellt wurde, False sonst
"""
try:
with get_cached_session() as session:
# Prüfen, ob der Admin bereits existiert
admin = session.query(User).filter(User.email == email).first()
if admin:
# Admin existiert bereits, Passwort zurücksetzen
admin.set_password(password)
admin.role = "admin" # Sicherstellen, dass der Benutzer Admin-Rechte hat
admin.active = True # Sicherstellen, dass der Account aktiv ist
session.commit()
logger.info(f"Admin-Benutzer {username} ({email}) existiert bereits. Passwort wurde zurückgesetzt.")
return True
# Admin erstellen, wenn er nicht existiert
admin = User(
email=email,
username=username,
name=name,
role="admin",
active=True
)
admin.set_password(password)
session.add(admin)
session.commit()
# Statistik-Eintrag anlegen, falls noch nicht vorhanden
stats = session.query(Stats).first()
if not stats:
stats = Stats()
session.add(stats)
session.commit()
logger.info(f"Admin-Benutzer {username} ({email}) wurde angelegt.")
return True
except Exception as e:
logger.error(f"Fehler beim Erstellen des Admin-Benutzers: {str(e)}")
return False
# Engine für Export verfügbar machen
def get_engine():
"""Gibt die optimierte Datenbank-Engine zurück."""
return create_optimized_engine()
# Engine-Variable für direkten Import
engine = get_engine()