759 lines
26 KiB
Python

import os
import logging
import threading
import time
from datetime import datetime
from typing import Optional, List, Dict, Any
from contextlib import contextmanager
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, ForeignKey, Float, event, text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker, Session, Mapped, mapped_column, scoped_session
from sqlalchemy.pool import StaticPool, QueuePool
from sqlalchemy.engine import Engine
from flask_login import UserMixin
import bcrypt
from config.settings import DATABASE_PATH, ensure_database_directory
from utils.logging_config import get_logger
Base = declarative_base()
logger = get_logger("app")
# Thread-lokale Session-Factory für sichere Concurrent-Zugriffe
_session_factory = None
_scoped_session = None
_engine = None
_connection_pool_lock = threading.Lock()
# Cache für häufig abgerufene Daten
_cache = {}
_cache_lock = threading.Lock()
_cache_ttl = {} # Time-to-live für Cache-Einträge
# Alle exportierten Modelle
__all__ = ['User', 'Printer', 'Job', 'Stats', 'SystemLog', 'Base', 'init_db', 'init_database', 'create_initial_admin', 'get_db_session', 'get_cached_session', 'clear_cache']
# ===== DATENBANK-KONFIGURATION MIT WAL UND OPTIMIERUNGEN =====
def configure_sqlite_for_production(dbapi_connection, connection_record):
"""
Konfiguriert SQLite für Produktionsumgebung mit WAL-Modus und Optimierungen.
"""
cursor = dbapi_connection.cursor()
# WAL-Modus aktivieren (Write-Ahead Logging)
cursor.execute("PRAGMA journal_mode=WAL")
# Synchronous-Modus für bessere Performance bei WAL
cursor.execute("PRAGMA synchronous=NORMAL")
# Cache-Größe erhöhen (in KB, negative Werte = KB)
cursor.execute("PRAGMA cache_size=-64000") # 64MB Cache
# Memory-mapped I/O aktivieren
cursor.execute("PRAGMA mmap_size=268435456") # 256MB
# Temp-Store im Memory
cursor.execute("PRAGMA temp_store=MEMORY")
# Optimierungen für bessere Performance
cursor.execute("PRAGMA optimize")
# Foreign Key Constraints aktivieren
cursor.execute("PRAGMA foreign_keys=ON")
# Auto-Vacuum für automatische Speicherbereinigung
cursor.execute("PRAGMA auto_vacuum=INCREMENTAL")
# Busy Timeout für Concurrent Access
cursor.execute("PRAGMA busy_timeout=30000") # 30 Sekunden
# Checkpoint-Intervall für WAL
cursor.execute("PRAGMA wal_autocheckpoint=1000")
cursor.close()
logger.info("SQLite für Produktionsumgebung konfiguriert (WAL-Modus, Cache, Optimierungen)")
def create_optimized_engine():
"""
Erstellt eine optimierte SQLite-Engine mit Connection Pooling und WAL-Modus.
"""
global _engine
if _engine is not None:
return _engine
with _connection_pool_lock:
if _engine is not None:
return _engine
ensure_database_directory()
# Connection String mit optimierten Parametern
connection_string = f"sqlite:///{DATABASE_PATH}"
# Engine mit Connection Pooling erstellen
_engine = create_engine(
connection_string,
# Connection Pool Konfiguration
poolclass=StaticPool,
pool_pre_ping=True, # Verbindungen vor Nutzung testen
pool_recycle=3600, # Verbindungen nach 1 Stunde erneuern
connect_args={
"check_same_thread": False, # Für Multi-Threading
"timeout": 30, # Connection Timeout
"isolation_level": None # Autocommit-Modus für bessere Kontrolle
},
# Echo für Debugging (in Produktion ausschalten)
echo=False,
# Weitere Optimierungen
execution_options={
"autocommit": False
}
)
# Event-Listener für SQLite-Optimierungen
event.listen(_engine, "connect", configure_sqlite_for_production)
# Regelmäßige Wartungsaufgaben
event.listen(_engine, "connect", lambda conn, rec: schedule_maintenance())
logger.info(f"Optimierte SQLite-Engine erstellt: {DATABASE_PATH}")
return _engine
def schedule_maintenance():
"""
Plant regelmäßige Wartungsaufgaben für die Datenbank.
"""
def maintenance_worker():
time.sleep(300) # 5 Minuten warten
try:
with get_maintenance_session() as session:
# WAL-Checkpoint ausführen
session.execute(text("PRAGMA wal_checkpoint(TRUNCATE)"))
# Statistiken aktualisieren
session.execute(text("ANALYZE"))
# Incremental Vacuum
session.execute(text("PRAGMA incremental_vacuum"))
session.commit()
logger.info("Datenbank-Wartung erfolgreich durchgeführt")
except Exception as e:
logger.error(f"Fehler bei Datenbank-Wartung: {str(e)}")
# Wartung in separatem Thread ausführen
maintenance_thread = threading.Thread(target=maintenance_worker, daemon=True)
maintenance_thread.start()
def get_session_factory():
"""
Gibt die Thread-sichere Session-Factory zurück.
"""
global _session_factory, _scoped_session
if _session_factory is None:
with _connection_pool_lock:
if _session_factory is None:
engine = create_optimized_engine()
_session_factory = sessionmaker(
bind=engine,
autoflush=True,
autocommit=False,
expire_on_commit=False # Objekte nach Commit nicht expiren
)
_scoped_session = scoped_session(_session_factory)
return _scoped_session
@contextmanager
def get_maintenance_session():
"""
Context Manager für Wartungs-Sessions.
"""
engine = create_optimized_engine()
session = sessionmaker(bind=engine)()
try:
yield session
except Exception as e:
session.rollback()
raise e
finally:
session.close()
# ===== CACHING-SYSTEM =====
def get_cache_key(model_class: str, identifier: Any, extra: str = "") -> str:
"""
Generiert einen Cache-Schlüssel.
"""
return f"{model_class}:{identifier}:{extra}"
def set_cache(key: str, value: Any, ttl_seconds: int = 300):
"""
Setzt einen Wert im Cache mit TTL.
"""
with _cache_lock:
_cache[key] = value
_cache_ttl[key] = time.time() + ttl_seconds
def get_cache(key: str) -> Optional[Any]:
"""
Holt einen Wert aus dem Cache.
"""
with _cache_lock:
if key in _cache:
if key in _cache_ttl and time.time() > _cache_ttl[key]:
# Cache-Eintrag abgelaufen
del _cache[key]
del _cache_ttl[key]
return None
return _cache[key]
return None
def clear_cache(pattern: str = None):
"""
Löscht Cache-Einträge (optional mit Pattern).
"""
with _cache_lock:
if pattern is None:
_cache.clear()
_cache_ttl.clear()
else:
keys_to_delete = [k for k in _cache.keys() if pattern in k]
for key in keys_to_delete:
del _cache[key]
if key in _cache_ttl:
del _cache_ttl[key]
def invalidate_model_cache(model_class: str, identifier: Any = None):
"""
Invalidiert Cache-Einträge für ein bestimmtes Modell.
"""
if identifier is not None:
pattern = f"{model_class}:{identifier}"
else:
pattern = f"{model_class}:"
clear_cache(pattern)
# ===== ERWEITERTE SESSION-VERWALTUNG =====
@contextmanager
def get_cached_session():
"""
Context Manager für gecachte Sessions mit automatischem Rollback.
"""
session_factory = get_session_factory()
session = session_factory()
try:
yield session
session.commit()
except Exception as e:
session.rollback()
logger.error(f"Datenbank-Transaktion fehlgeschlagen: {str(e)}")
raise e
finally:
session.close()
def get_db_session() -> Session:
"""
Gibt eine neue Datenbank-Session zurück (Legacy-Kompatibilität).
"""
session_factory = get_session_factory()
return session_factory()
# ===== MODELL-DEFINITIONEN =====
class User(UserMixin, Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
email = Column(String(120), unique=True, nullable=False)
username = Column(String(100), unique=True, nullable=False) # Füge username hinzu für login
password_hash = Column(String(128), nullable=False)
name = Column(String(100), nullable=False)
role = Column(String(20), default="user") # "admin" oder "user"
active = Column(Boolean, default=True) # Für Flask-Login is_active
created_at = Column(DateTime, default=datetime.now)
last_login = Column(DateTime, nullable=True) # Letzter Login-Zeitstempel
jobs = relationship("Job", back_populates="user", foreign_keys="Job.user_id", cascade="all, delete-orphan")
owned_jobs = relationship("Job", foreign_keys="Job.owner_id", overlaps="owner")
def set_password(self, password: str) -> None:
password_bytes = password.encode('utf-8')
salt = bcrypt.gensalt()
self.password_hash = bcrypt.hashpw(password_bytes, salt).decode('utf-8')
# Cache invalidieren
invalidate_model_cache("User", self.id)
def check_password(self, password: str) -> bool:
password_bytes = password.encode('utf-8')
hash_bytes = self.password_hash.encode('utf-8')
return bcrypt.checkpw(password_bytes, hash_bytes)
@property
def is_admin(self) -> bool:
return self.role == "admin"
@property
def is_active(self) -> bool:
"""Required for Flask-Login"""
return self.active
def get_id(self) -> str:
"""Required for Flask-Login - return user id as unicode string"""
return str(self.id)
def to_dict(self) -> dict:
# Cache-Key für User-Dict
cache_key = get_cache_key("User", self.id, "dict")
cached_result = get_cache(cache_key)
if cached_result is not None:
return cached_result
result = {
"id": self.id,
"email": self.email,
"username": self.username,
"name": self.name,
"role": self.role,
"active": self.active,
"created_at": self.created_at.isoformat() if self.created_at else None,
"last_login": self.last_login.isoformat() if self.last_login else None
}
# Ergebnis cachen (5 Minuten)
set_cache(cache_key, result, 300)
return result
@classmethod
def get_by_username_or_email(cls, identifier: str) -> Optional['User']:
"""
Holt einen Benutzer anhand von Username oder E-Mail mit Caching.
"""
cache_key = get_cache_key("User", identifier, "login")
cached_user = get_cache(cache_key)
if cached_user is not None:
return cached_user
with get_cached_session() as session:
user = session.query(cls).filter(
(cls.username == identifier) | (cls.email == identifier)
).first()
if user:
# User für 10 Minuten cachen
set_cache(cache_key, user, 600)
return user
def update_last_login(self):
"""
Aktualisiert den letzten Login-Zeitstempel.
"""
self.last_login = datetime.now()
# Cache invalidieren
invalidate_model_cache("User", self.id)
class Printer(Base):
__tablename__ = "printers"
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
model = Column(String(100)) # Drucker-Modell
location = Column(String(100))
ip_address = Column(String(50)) # IP-Adresse des Druckers
mac_address = Column(String(50), nullable=False, unique=True)
plug_ip = Column(String(50), nullable=False)
plug_username = Column(String(100), nullable=False)
plug_password = Column(String(100), nullable=False)
status = Column(String(20), default="offline") # online, offline, busy, idle
active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.now)
last_checked = Column(DateTime, nullable=True) # Zeitstempel der letzten Status-Überprüfung
jobs = relationship("Job", back_populates="printer", cascade="all, delete-orphan")
def to_dict(self) -> dict:
# Cache-Key für Printer-Dict
cache_key = get_cache_key("Printer", self.id, "dict")
cached_result = get_cache(cache_key)
if cached_result is not None:
return cached_result
result = {
"id": self.id,
"name": self.name,
"model": self.model,
"location": self.location,
"ip_address": self.ip_address,
"mac_address": self.mac_address,
"plug_ip": self.plug_ip,
"status": self.status,
"active": self.active,
"created_at": self.created_at.isoformat() if self.created_at else None,
"last_checked": self.last_checked.isoformat() if self.last_checked else None
}
# Ergebnis cachen (2 Minuten für Drucker-Status)
set_cache(cache_key, result, 120)
return result
def update_status(self, new_status: str, active: bool = None):
"""
Aktualisiert den Drucker-Status und invalidiert den Cache.
"""
self.status = new_status
self.last_checked = datetime.now()
if active is not None:
self.active = active
# Cache invalidieren
invalidate_model_cache("Printer", self.id)
@classmethod
def get_all_cached(cls) -> List['Printer']:
"""
Holt alle Drucker mit Caching.
"""
cache_key = get_cache_key("Printer", "all", "list")
cached_printers = get_cache(cache_key)
if cached_printers is not None:
return cached_printers
with get_cached_session() as session:
printers = session.query(cls).all()
# Drucker für 5 Minuten cachen
set_cache(cache_key, printers, 300)
return printers
@classmethod
def get_online_printers(cls) -> List['Printer']:
"""
Holt alle online Drucker mit Caching.
"""
cache_key = get_cache_key("Printer", "online", "list")
cached_printers = get_cache(cache_key)
if cached_printers is not None:
return cached_printers
with get_cached_session() as session:
printers = session.query(cls).filter(
cls.status.in_(["online", "available", "idle"])
).all()
# Online-Drucker für 1 Minute cachen (häufiger aktualisiert)
set_cache(cache_key, printers, 60)
return printers
class Job(Base):
__tablename__ = "jobs"
id = Column(Integer, primary_key=True)
name = Column(String(200), nullable=False)
description = Column(String(500)) # Beschreibung des Jobs
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
printer_id = Column(Integer, ForeignKey("printers.id"), nullable=False)
start_at = Column(DateTime)
end_at = Column(DateTime)
actual_end_time = Column(DateTime)
status = Column(String(20), default="scheduled") # scheduled|running|finished|aborted
created_at = Column(DateTime, default=datetime.now)
notes = Column(String(500))
material_used = Column(Float) # in Gramm
file_path = Column(String(500), nullable=True)
owner_id = Column(Integer, ForeignKey("users.id"), nullable=True)
duration_minutes = Column(Integer, nullable=False) # Dauer in Minuten
user = relationship("User", back_populates="jobs", foreign_keys=[user_id])
owner = relationship("User", foreign_keys=[owner_id], overlaps="owned_jobs")
printer = relationship("Printer", back_populates="jobs")
def to_dict(self) -> dict:
# Cache-Key für Job-Dict
cache_key = get_cache_key("Job", self.id, "dict")
cached_result = get_cache(cache_key)
if cached_result is not None:
return cached_result
result = {
"id": self.id,
"name": self.name,
"description": self.description,
"user_id": self.user_id,
"printer_id": self.printer_id,
"start_at": self.start_at.isoformat() if self.start_at else None,
"end_at": self.end_at.isoformat() if self.end_at else None,
"actual_end_time": self.actual_end_time.isoformat() if self.actual_end_time else None,
"status": self.status,
"created_at": self.created_at.isoformat() if self.created_at else None,
"notes": self.notes,
"material_used": self.material_used,
"file_path": self.file_path,
"owner_id": self.owner_id,
"duration_minutes": self.duration_minutes,
"user": self.user.to_dict() if self.user else None,
"printer": self.printer.to_dict() if self.printer else None
}
# Ergebnis cachen (3 Minuten für Jobs)
set_cache(cache_key, result, 180)
return result
def update_status(self, new_status: str):
"""
Aktualisiert den Job-Status und invalidiert den Cache.
"""
self.status = new_status
if new_status in ["finished", "failed", "cancelled"]:
self.actual_end_time = datetime.now()
# Cache invalidieren
invalidate_model_cache("Job", self.id)
# Auch User- und Printer-Caches invalidieren
invalidate_model_cache("User", self.user_id)
invalidate_model_cache("Printer", self.printer_id)
@classmethod
def get_active_jobs(cls) -> List['Job']:
"""
Holt alle aktiven Jobs mit Caching.
"""
cache_key = get_cache_key("Job", "active", "list")
cached_jobs = get_cache(cache_key)
if cached_jobs is not None:
return cached_jobs
with get_cached_session() as session:
jobs = session.query(cls).filter(
cls.status.in_(["scheduled", "running"])
).all()
# Aktive Jobs für 30 Sekunden cachen (häufig aktualisiert)
set_cache(cache_key, jobs, 30)
return jobs
@classmethod
def get_user_jobs(cls, user_id: int) -> List['Job']:
"""
Holt alle Jobs eines Benutzers mit Caching.
"""
cache_key = get_cache_key("Job", f"user_{user_id}", "list")
cached_jobs = get_cache(cache_key)
if cached_jobs is not None:
return cached_jobs
with get_cached_session() as session:
jobs = session.query(cls).filter(cls.user_id == user_id).all()
# Benutzer-Jobs für 5 Minuten cachen
set_cache(cache_key, jobs, 300)
return jobs
class Stats(Base):
__tablename__ = "stats"
id = Column(Integer, primary_key=True)
total_print_time = Column(Integer, default=0) # in Sekunden
total_jobs_completed = Column(Integer, default=0)
total_material_used = Column(Float, default=0.0) # in Gramm
last_updated = Column(DateTime, default=datetime.now)
def to_dict(self) -> dict:
# Cache-Key für Stats-Dict
cache_key = get_cache_key("Stats", self.id, "dict")
cached_result = get_cache(cache_key)
if cached_result is not None:
return cached_result
result = {
"id": self.id,
"total_print_time": self.total_print_time,
"total_jobs_completed": self.total_jobs_completed,
"total_material_used": self.total_material_used,
"last_updated": self.last_updated.isoformat() if self.last_updated else None
}
# Statistiken für 10 Minuten cachen
set_cache(cache_key, result, 600)
return result
class SystemLog(Base):
"""System-Log Modell für Logging von System-Events"""
__tablename__ = "system_logs"
id = Column(Integer, primary_key=True)
timestamp = Column(DateTime, default=datetime.now, nullable=False)
level = Column(String(20), nullable=False) # DEBUG, INFO, WARNING, ERROR, CRITICAL
message = Column(String(1000), nullable=False)
module = Column(String(100)) # Welches Modul/Blueprint den Log erstellt hat
user_id = Column(Integer, ForeignKey("users.id"), nullable=True) # Optional: welcher User
ip_address = Column(String(50)) # Optional: IP-Adresse
user_agent = Column(String(500)) # Optional: Browser/Client Info
user = relationship("User", foreign_keys=[user_id])
def to_dict(self) -> dict:
return {
"id": self.id,
"timestamp": self.timestamp.isoformat() if self.timestamp else None,
"level": self.level,
"message": self.message,
"module": self.module,
"user_id": self.user_id,
"ip_address": self.ip_address,
"user_agent": self.user_agent,
"user": self.user.to_dict() if self.user else None
}
@classmethod
def log_system_event(cls, level: str, message: str, module: str = None,
user_id: int = None, ip_address: str = None,
user_agent: str = None) -> 'SystemLog':
"""
Hilfsmethode zum Erstellen eines System-Log-Eintrags
Args:
level: Log-Level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
message: Log-Nachricht
module: Optional - Modul/Blueprint Name
user_id: Optional - Benutzer-ID
ip_address: Optional - IP-Adresse
user_agent: Optional - User-Agent String
Returns:
SystemLog: Das erstellte Log-Objekt
"""
return cls(
level=level.upper(),
message=message,
module=module,
user_id=user_id,
ip_address=ip_address,
user_agent=user_agent
)
# ===== DATENBANK-INITIALISIERUNG MIT OPTIMIERUNGEN =====
def init_db() -> None:
"""Initialisiert die Datenbank und erstellt alle Tabellen mit Optimierungen."""
ensure_database_directory()
engine = create_optimized_engine()
# Tabellen erstellen
Base.metadata.create_all(engine)
# Indizes für bessere Performance erstellen
with engine.connect() as conn:
# Index für User-Login
conn.execute(text("""
CREATE INDEX IF NOT EXISTS idx_users_username_email
ON users(username, email)
"""))
# Index für Job-Status und Zeiten
conn.execute(text("""
CREATE INDEX IF NOT EXISTS idx_jobs_status_times
ON jobs(status, start_at, end_at)
"""))
# Index für Printer-Status
conn.execute(text("""
CREATE INDEX IF NOT EXISTS idx_printers_status
ON printers(status, active)
"""))
# Index für System-Logs
conn.execute(text("""
CREATE INDEX IF NOT EXISTS idx_system_logs_timestamp
ON system_logs(timestamp, level)
"""))
conn.commit()
logger.info("Datenbank mit Optimierungen initialisiert")
def init_database() -> None:
"""Alias für init_db() - initialisiert die Datenbank und erstellt alle Tabellen."""
init_db()
def create_initial_admin(email: str = "admin@mercedes-benz.com", password: str = "744563017196A", name: str = "Administrator", username: str = "admin") -> bool:
"""
Erstellt einen initialen Admin-Benutzer, falls die Datenbank leer ist.
Args:
email: E-Mail-Adresse des Admins
password: Passwort des Admins
name: Name des Admins
username: Benutzername des Admins
Returns:
bool: True, wenn der Admin erstellt wurde, False sonst
"""
try:
with get_cached_session() as session:
# Prüfen, ob der Admin bereits existiert
admin = session.query(User).filter(User.email == email).first()
if admin:
# Admin existiert bereits, Passwort zurücksetzen
admin.set_password(password)
admin.role = "admin" # Sicherstellen, dass der Benutzer Admin-Rechte hat
admin.active = True # Sicherstellen, dass der Account aktiv ist
session.commit()
logger.info(f"Admin-Benutzer {username} ({email}) existiert bereits. Passwort wurde zurückgesetzt.")
return True
# Admin erstellen, wenn er nicht existiert
admin = User(
email=email,
username=username,
name=name,
role="admin",
active=True
)
admin.set_password(password)
session.add(admin)
session.commit()
# Statistik-Eintrag anlegen, falls noch nicht vorhanden
stats = session.query(Stats).first()
if not stats:
stats = Stats()
session.add(stats)
session.commit()
logger.info(f"Admin-Benutzer {username} ({email}) wurde angelegt.")
return True
except Exception as e:
logger.error(f"Fehler beim Erstellen des Admin-Benutzers: {str(e)}")
return False