"feat: Integrate new printer model functionality in backend/app/models.py and templates/printers.html"

This commit is contained in:
2025-05-29 09:46:58 +02:00
parent d81149229a
commit 8b9ae6b451
2 changed files with 332 additions and 3 deletions

View File

@@ -6,13 +6,14 @@ from datetime import datetime
from typing import Optional, List, Dict, Any
from contextlib import contextmanager
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, ForeignKey, Float, event, text
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, ForeignKey, Float, event, text, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker, Session, Mapped, mapped_column, scoped_session
from sqlalchemy.pool import StaticPool, QueuePool
from sqlalchemy.engine import Engine
from flask_login import UserMixin
import bcrypt
import secrets
from config.settings import DATABASE_PATH, ensure_database_directory
from utils.logging_config import get_logger
@@ -32,7 +33,7 @@ _cache_lock = threading.Lock()
_cache_ttl = {} # Time-to-live für Cache-Einträge
# Alle exportierten Modelle
__all__ = ['User', 'Printer', 'Job', 'Stats', 'SystemLog', 'Base', 'init_db', 'init_database', 'create_initial_admin', 'get_db_session', 'get_cached_session', 'clear_cache']
__all__ = ['User', 'Printer', 'Job', 'Stats', 'SystemLog', 'Base', 'GuestRequest', 'UserPermission', 'Notification', 'init_db', 'init_database', 'create_initial_admin', 'get_db_session', 'get_cached_session', 'clear_cache']
# ===== DATENBANK-KONFIGURATION MIT WAL UND OPTIMIERUNGEN =====
@@ -283,6 +284,8 @@ class User(UserMixin, Base):
jobs = relationship("Job", back_populates="user", foreign_keys="Job.user_id", cascade="all, delete-orphan")
owned_jobs = relationship("Job", foreign_keys="Job.owner_id", overlaps="owner")
permissions = relationship("UserPermission", back_populates="user", uselist=False, cascade="all, delete-orphan")
notifications = relationship("Notification", back_populates="user", cascade="all, delete-orphan")
def set_password(self, password: str) -> None:
password_bytes = password.encode('utf-8')
@@ -659,6 +662,167 @@ class SystemLog(Base):
)
class UserPermission(Base):
"""
Berechtigungen für Benutzer.
"""
__tablename__ = "user_permissions"
user_id = Column(Integer, ForeignKey("users.id"), primary_key=True)
can_start_jobs = Column(Boolean, default=False)
needs_approval = Column(Boolean, default=True)
can_approve_jobs = Column(Boolean, default=False)
user = relationship("User", back_populates="permissions")
def to_dict(self) -> dict:
"""
Konvertiert die Benutzerberechtigungen in ein Dictionary.
"""
return {
"user_id": self.user_id,
"can_start_jobs": self.can_start_jobs,
"needs_approval": self.needs_approval,
"can_approve_jobs": self.can_approve_jobs
}
class Notification(Base):
"""
Benachrichtigungen für Benutzer.
"""
__tablename__ = "notifications"
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
type = Column(String(50), nullable=False)
payload = Column(Text) # JSON-Daten als String
created_at = Column(DateTime, default=datetime.now)
read = Column(Boolean, default=False)
user = relationship("User", back_populates="notifications")
def to_dict(self) -> dict:
"""
Konvertiert die Benachrichtigung in ein Dictionary.
"""
return {
"id": self.id,
"user_id": self.user_id,
"type": self.type,
"payload": self.payload,
"created_at": self.created_at.isoformat() if self.created_at else None,
"read": self.read
}
@classmethod
def create_for_approvers(cls, notification_type: str, payload: dict):
"""
Erstellt Benachrichtigungen für alle Benutzer mit can_approve_jobs-Berechtigung.
Args:
notification_type: Art der Benachrichtigung
payload: Daten für die Benachrichtigung als Dictionary
"""
import json
payload_json = json.dumps(payload)
with get_cached_session() as session:
# Alle Benutzer mit can_approve_jobs-Berechtigung finden
approvers = session.query(User).join(UserPermission).filter(
UserPermission.can_approve_jobs == True
).all()
# Benachrichtigungen für alle Genehmiger erstellen
for approver in approvers:
notification = cls(
user_id=approver.id,
type=notification_type,
payload=payload_json
)
session.add(notification)
session.commit()
class GuestRequest(Base):
"""
Gastanfragen für Druckaufträge.
"""
__tablename__ = "guest_requests"
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
email = Column(String(120))
reason = Column(Text)
duration_min = Column(Integer)
created_at = Column(DateTime, default=datetime.now)
status = Column(String(20), default="pending") # pending|approved|denied
printer_id = Column(Integer, ForeignKey("printers.id"))
otp_code = Column(String(100), nullable=True) # Hash des OTP-Codes
job_id = Column(Integer, ForeignKey("jobs.id"), nullable=True)
author_ip = Column(String(50))
printer = relationship("Printer")
job = relationship("Job")
def to_dict(self) -> dict:
"""
Konvertiert die Gastanfrage in ein Dictionary.
"""
return {
"id": self.id,
"name": self.name,
"email": self.email,
"reason": self.reason,
"duration_min": self.duration_min,
"created_at": self.created_at.isoformat() if self.created_at else None,
"status": self.status,
"printer_id": self.printer_id,
"job_id": self.job_id,
"printer": self.printer.to_dict() if self.printer else None,
"job": self.job.to_dict() if self.job else None
}
def generate_otp(self) -> str:
"""
Generiert einen einmaligen OTP-Code und speichert den Hash in der Datenbank.
Returns:
str: Der generierte OTP-Code im Klartext
"""
# Generiere 6-stelligen Code (Großbuchstaben + Ziffern)
otp_plain = ''.join(secrets.choice('0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ') for _ in range(6))
# Hash für die Speicherung erstellen
otp_bytes = otp_plain.encode('utf-8')
salt = bcrypt.gensalt()
otp_hash = bcrypt.hashpw(otp_bytes, salt).decode('utf-8')
# Hash in der Datenbank speichern
self.otp_code = otp_hash
return otp_plain
def verify_otp(self, otp_plain: str) -> bool:
"""
Überprüft, ob der angegebene OTP-Code gültig ist.
Args:
otp_plain: Der zu überprüfende OTP-Code im Klartext
Returns:
bool: True, wenn der Code gültig ist, sonst False
"""
if not self.otp_code:
return False
otp_bytes = otp_plain.encode('utf-8')
hash_bytes = self.otp_code.encode('utf-8')
return bcrypt.checkpw(otp_bytes, hash_bytes)
# ===== DATENBANK-INITIALISIERUNG MIT OPTIMIERUNGEN =====
def init_db() -> None: