"""SQLAlchemy models and database helpers for users, printers, jobs and stats."""

import os
import logging
from datetime import datetime
from typing import Optional, List

from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, ForeignKey, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker, Session, Mapped, mapped_column
from flask_login import UserMixin
import bcrypt

from config.settings import DATABASE_PATH, ensure_database_directory
from utils.logging_config import get_logger

Base = declarative_base()
logger = get_logger("app")

# Engines are cached per database URL so that every helper below shares one
# connection pool instead of building a fresh Engine on each call.
_ENGINES = {}


def _get_engine():
    """Return the shared engine for the SQLite database at DATABASE_PATH."""
    url = f"sqlite:///{DATABASE_PATH}"
    engine = _ENGINES.get(url)
    if engine is None:
        engine = create_engine(url)
        _ENGINES[url] = engine
    return engine


class User(UserMixin, Base):
    """Application account, usable as a Flask-Login user object."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    email = Column(String(120), unique=True, nullable=False)
    username = Column(String(100), unique=True, nullable=False)  # login name
    password_hash = Column(String(128), nullable=False)
    name = Column(String(100), nullable=False)
    role = Column(String(20), default="user")  # "admin" or "user"
    active = Column(Boolean, default=True)  # backs Flask-Login's is_active
    created_at = Column(DateTime, default=datetime.now)

    jobs = relationship("Job", back_populates="user", foreign_keys="Job.user_id", cascade="all, delete-orphan")
    owned_jobs = relationship("Job", foreign_keys="Job.owner_id", overlaps="owner")

    def set_password(self, password: str) -> None:
        """Hash *password* with a fresh bcrypt salt and store it on the user."""
        password_bytes = password.encode('utf-8')
        salt = bcrypt.gensalt()
        self.password_hash = bcrypt.hashpw(password_bytes, salt).decode('utf-8')

    def check_password(self, password: str) -> bool:
        """Return True if *password* matches the stored bcrypt hash.

        A missing or malformed stored hash counts as a failed check instead
        of raising, so a broken record cannot crash the login path.
        """
        if not self.password_hash:
            return False
        password_bytes = password.encode('utf-8')
        hash_bytes = self.password_hash.encode('utf-8')
        try:
            return bcrypt.checkpw(password_bytes, hash_bytes)
        except ValueError:
            # bcrypt rejects hashes it cannot parse (e.g. "Invalid salt").
            return False

    @property
    def is_admin(self) -> bool:
        """True when the user's role is "admin"."""
        return self.role == "admin"

    @property
    def is_active(self) -> bool:
        """Required for Flask-Login."""
        # `active` is None on an instance that was never flushed; coerce so
        # the annotated bool is always honoured.
        return bool(self.active)

    def get_id(self) -> str:
        """Required for Flask-Login - return the user id as a string."""
        return str(self.id)

    def to_dict(self) -> dict:
        """Return a JSON-friendly dict of the user; the password hash is omitted."""
        return {
            "id": self.id,
            "email": self.email,
            "username": self.username,
            "name": self.name,
            "role": self.role,
            "active": self.active,
            "created_at": self.created_at.isoformat() if self.created_at else None
        }


class Printer(Base):
    """Printer together with the connection data of its plug."""

    __tablename__ = "printers"

    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    model = Column(String(100))  # printer model
    location = Column(String(100))
    ip_address = Column(String(50))  # IP address of the printer
    mac_address = Column(String(50), nullable=False, unique=True)
    plug_ip = Column(String(50), nullable=False)
    plug_username = Column(String(100), nullable=False)
    plug_password = Column(String(100), nullable=False)
    status = Column(String(20), default="offline")  # online, offline, busy, idle
    active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.now)

    jobs = relationship("Job", back_populates="printer", cascade="all, delete-orphan")

    def to_dict(self) -> dict:
        """Return a JSON-friendly dict; plug_username/plug_password are omitted."""
        return {
            "id": self.id,
            "name": self.name,
            "model": self.model,
            "location": self.location,
            "ip_address": self.ip_address,
            "mac_address": self.mac_address,
            "plug_ip": self.plug_ip,
            "status": self.status,
            "active": self.active,
            "created_at": self.created_at.isoformat() if self.created_at else None
        }


class Job(Base):
    """Job assigned to a printer and linked to a user and an optional owner."""

    __tablename__ = "jobs"

    id = Column(Integer, primary_key=True)
    name = Column(String(200), nullable=False)
    description = Column(String(500))  # description of the job
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    printer_id = Column(Integer, ForeignKey("printers.id"), nullable=False)
    start_at = Column(DateTime)
    end_at = Column(DateTime)
    actual_end_time = Column(DateTime)
    status = Column(String(20), default="scheduled")  # scheduled|running|finished|aborted
    created_at = Column(DateTime, default=datetime.now)
    notes = Column(String(500))
    material_used = Column(Float)  # in grams
    file_path = Column(String(500), nullable=True)
    owner_id = Column(Integer, ForeignKey("users.id"), nullable=True)
    duration_minutes = Column(Integer, nullable=False)  # duration in minutes

    user = relationship("User", back_populates="jobs", foreign_keys=[user_id])
    owner = relationship("User", foreign_keys=[owner_id], overlaps="owned_jobs")
    printer = relationship("Printer", back_populates="jobs")

    def to_dict(self) -> dict:
        """Return a JSON-friendly dict, embedding the related user and printer."""
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "user_id": self.user_id,
            "printer_id": self.printer_id,
            "start_at": self.start_at.isoformat() if self.start_at else None,
            "end_at": self.end_at.isoformat() if self.end_at else None,
            "actual_end_time": self.actual_end_time.isoformat() if self.actual_end_time else None,
            "status": self.status,
            "created_at": self.created_at.isoformat() if self.created_at else None,
            "notes": self.notes,
            "material_used": self.material_used,
            "file_path": self.file_path,
            "owner_id": self.owner_id,
            "duration_minutes": self.duration_minutes,
            "user": self.user.to_dict() if self.user else None,
            "printer": self.printer.to_dict() if self.printer else None
        }


class Stats(Base):
    """Aggregated usage counters."""

    __tablename__ = "stats"

    id = Column(Integer, primary_key=True)
    total_print_time = Column(Integer, default=0)  # in seconds
    total_jobs_completed = Column(Integer, default=0)
    total_material_used = Column(Float, default=0.0)  # in grams
    last_updated = Column(DateTime, default=datetime.now)


def init_db() -> None:
    """Initialise the database and create all tables."""
    ensure_database_directory()
    Base.metadata.create_all(_get_engine())
    logger.info("Datenbank initialisiert.")


def init_database() -> None:
    """Alias for init_db() - initialise the database and create all tables."""
    init_db()


# NOTE(security): the default password below is a hard-coded credential kept
# only for backward compatibility. Callers should pass `password` explicitly
# (e.g. from configuration) and the default should be rotated or removed.
def create_initial_admin(
    email: str = "admin@mercedes-benz.com",
    password: str = "744563017196A",
    name: str = "Administrator",
    username: str = "admin",
) -> bool:
    """
    Ensure an admin user with the given e-mail address exists.

    If a user with *email* already exists, its password is reset to
    *password*, its role is set to "admin" and the account is activated.
    Otherwise a new admin is created, and a Stats row is added if the stats
    table is still empty.

    Args:
        email: e-mail address of the admin
        password: password of the admin
        name: display name of the admin
        username: login name of the admin

    Returns:
        bool: True once the admin has been created or updated.

    Raises:
        Exception: any error raised while querying or committing is
            propagated after the session has been rolled back and closed.
    """
    session = sessionmaker(bind=_get_engine())()
    try:
        # Check whether the admin already exists.
        admin = session.query(User).filter(User.email == email).first()
        if admin:
            # Existing account: reset the password and make sure it is an
            # active admin.
            admin.set_password(password)
            admin.role = "admin"
            admin.active = True
            session.commit()
            logger.info(f"Admin-Benutzer {username} ({email}) existiert bereits. Passwort wurde zurückgesetzt.")
            return True

        # Create the admin because it does not exist yet.
        admin = User(
            email=email,
            username=username,
            name=name,
            role="admin",
            active=True
        )
        admin.set_password(password)
        session.add(admin)
        session.commit()

        # Create the statistics row if there is none yet.
        stats = session.query(Stats).first()
        if not stats:
            session.add(Stats())
            session.commit()

        logger.info(f"Admin-Benutzer {username} ({email}) wurde angelegt.")
        return True
    except Exception:
        # Discard the failed transaction; the caller still sees the error.
        session.rollback()
        raise
    finally:
        # Always release the connection, including on failure.
        session.close()


def get_db_session() -> Session:
    """Return a new database session bound to the shared engine.

    The caller is responsible for closing the returned session.
    """
    Session_class = sessionmaker(bind=_get_engine())
    return Session_class()