Remove deprecated backend files and documentation, including Docker configurations, environment variables, and various scripts, to streamline the project structure and eliminate unused components.

2025-05-24 17:47:05 +02:00
parent d2f23d589a
commit ead75ae451
98 changed files with 3917 additions and 35610 deletions

backend/app/app.py Normal file

File diff suppressed because it is too large


@@ -0,0 +1,66 @@
import os
import json
from datetime import timedelta

# Hard-coded configuration
SECRET_KEY = "7445630171969DFAC92C53CEC92E67A9CB2E00B3CB2F"
DATABASE_PATH = "database/myp.db"

TAPO_USERNAME = "till.tomczak@mercedes-benz.com"
TAPO_PASSWORD = "744563017196A"

# Printer configuration
PRINTERS = {
    "Printer 1": {"ip": "192.168.0.100"},
    "Printer 2": {"ip": "192.168.0.101"},
    "Printer 3": {"ip": "192.168.0.102"},
    "Printer 4": {"ip": "192.168.0.103"},
    "Printer 5": {"ip": "192.168.0.104"},
    "Printer 6": {"ip": "192.168.0.106"}
}

# Logging configuration
LOG_DIR = "logs"
LOG_SUBDIRS = ["app", "scheduler", "auth", "jobs", "printers", "errors"]
LOG_LEVEL = "INFO"
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

# Flask configuration
FLASK_HOST = "0.0.0.0"
FLASK_PORT = 5000
FLASK_DEBUG = True
SESSION_LIFETIME = timedelta(days=7)

# Scheduler configuration
SCHEDULER_INTERVAL = 60  # seconds
SCHEDULER_ENABLED = True

# Database configuration
DB_ENGINE = f"sqlite:///{DATABASE_PATH}"

def get_log_file(category: str) -> str:
    """
    Returns the path to the log file for a given category.

    Args:
        category: Log category (app, scheduler, auth, jobs, printers, errors)

    Returns:
        str: Path to the log file
    """
    if category not in LOG_SUBDIRS:
        category = "app"
    return os.path.join(LOG_DIR, category, f"{category}.log")

def ensure_log_directories():
    """Creates all required log directories."""
    os.makedirs(LOG_DIR, exist_ok=True)
    for subdir in LOG_SUBDIRS:
        os.makedirs(os.path.join(LOG_DIR, subdir), exist_ok=True)

def ensure_database_directory():
    """Creates the database directory."""
    db_dir = os.path.dirname(DATABASE_PATH)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)
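The constants above are consumed across the backend via the config.settings import path (inferred from the imports in models.py below; the file name itself is not shown in this diff). A minimal usage sketch under that assumption:

# Hypothetical usage sketch; assumes this module is importable as config.settings,
# as the import in models.py suggests.
from config.settings import (
    DB_ENGINE, PRINTERS, get_log_file,
    ensure_log_directories, ensure_database_directory
)

ensure_log_directories()        # creates logs/ and its per-category subdirectories
ensure_database_directory()     # creates database/ for the SQLite file

print(DB_ENGINE)                # sqlite:///database/myp.db
print(get_log_file("jobs"))     # logs/jobs/jobs.log
print(get_log_file("unknown"))  # unknown categories fall back to logs/app/app.log
for name, cfg in PRINTERS.items():
    print(name, cfg["ip"])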

backend/app/models.py Normal file

@@ -0,0 +1,180 @@
import os
import logging
from datetime import datetime
from typing import Optional, List

from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, ForeignKey, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker, Session
import bcrypt

from config.settings import DATABASE_PATH, ensure_database_directory
from utils.logging_config import get_logger

Base = declarative_base()
logger = get_logger("app")

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    email = Column(String(120), unique=True, nullable=False)
    password_hash = Column(String(128), nullable=False)
    name = Column(String(100), nullable=False)
    role = Column(String(20), default="user")  # "admin" or "user"
    created_at = Column(DateTime, default=datetime.now)

    jobs = relationship("Job", back_populates="user", cascade="all, delete-orphan")

    def set_password(self, password: str) -> None:
        password_bytes = password.encode('utf-8')
        salt = bcrypt.gensalt()
        self.password_hash = bcrypt.hashpw(password_bytes, salt).decode('utf-8')

    def check_password(self, password: str) -> bool:
        password_bytes = password.encode('utf-8')
        hash_bytes = self.password_hash.encode('utf-8')
        return bcrypt.checkpw(password_bytes, hash_bytes)

    def is_admin(self) -> bool:
        return self.role == "admin"

    def to_dict(self) -> dict:
        return {
            "id": self.id,
            "email": self.email,
            "name": self.name,
            "role": self.role,
            "created_at": self.created_at.isoformat() if self.created_at else None
        }

class Printer(Base):
    __tablename__ = "printers"

    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    location = Column(String(100))
    mac_address = Column(String(50), nullable=False, unique=True)
    plug_ip = Column(String(50), nullable=False)
    plug_username = Column(String(100), nullable=False)
    plug_password = Column(String(100), nullable=False)
    active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.now)

    jobs = relationship("Job", back_populates="printer", cascade="all, delete-orphan")

    def to_dict(self) -> dict:
        return {
            "id": self.id,
            "name": self.name,
            "location": self.location,
            "mac_address": self.mac_address,
            "plug_ip": self.plug_ip,
            "active": self.active,
            "created_at": self.created_at.isoformat() if self.created_at else None
        }

class Job(Base):
    __tablename__ = "jobs"

    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    printer_id = Column(Integer, ForeignKey("printers.id"), nullable=False)
    start_time = Column(DateTime, nullable=False)
    end_time = Column(DateTime, nullable=False)
    actual_end_time = Column(DateTime)
    status = Column(String(20), default="scheduled")  # scheduled, active, completed, aborted
    created_at = Column(DateTime, default=datetime.now)
    notes = Column(String(500))
    material_used = Column(Float)  # in grams

    user = relationship("User", back_populates="jobs")
    printer = relationship("Printer", back_populates="jobs")

    def to_dict(self) -> dict:
        return {
            "id": self.id,
            "title": self.title,
            "user_id": self.user_id,
            "printer_id": self.printer_id,
            "start_time": self.start_time.isoformat() if self.start_time else None,
            "end_time": self.end_time.isoformat() if self.end_time else None,
            "actual_end_time": self.actual_end_time.isoformat() if self.actual_end_time else None,
            "status": self.status,
            "created_at": self.created_at.isoformat() if self.created_at else None,
            "notes": self.notes,
            "material_used": self.material_used,
            "user": self.user.to_dict() if self.user else None,
            "printer": self.printer.to_dict() if self.printer else None
        }

class Stats(Base):
    __tablename__ = "stats"

    id = Column(Integer, primary_key=True)
    total_print_time = Column(Integer, default=0)  # in seconds
    total_jobs_completed = Column(Integer, default=0)
    total_material_used = Column(Float, default=0.0)  # in grams
    last_updated = Column(DateTime, default=datetime.now)

def init_db() -> None:
    """Initializes the database and creates all tables."""
    ensure_database_directory()
    engine = create_engine(f"sqlite:///{DATABASE_PATH}")
    Base.metadata.create_all(engine)
    logger.info("Database initialized.")

def create_initial_admin(email: str, password: str, name: str) -> bool:
    """
    Creates an initial admin user if the database is empty.

    Args:
        email: Admin's e-mail address
        password: Admin's password
        name: Admin's name

    Returns:
        bool: True if the admin was created, False otherwise
    """
    engine = create_engine(f"sqlite:///{DATABASE_PATH}")
    Session_class = sessionmaker(bind=engine)
    session = Session_class()

    # Check whether users already exist
    user_count = session.query(User).count()
    if user_count > 0:
        session.close()
        return False

    # Create the first admin
    admin = User(
        email=email,
        name=name,
        role="admin"
    )
    admin.set_password(password)
    session.add(admin)
    session.commit()

    # Create the stats entry
    stats = Stats()
    session.add(stats)
    session.commit()
    session.close()

    logger.info(f"Initial admin user {email} was created.")
    return True

def get_db_session() -> Session:
    """Returns a new database session."""
    engine = create_engine(f"sqlite:///{DATABASE_PATH}")
    Session_class = sessionmaker(bind=engine)
    return Session_class()
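A short bootstrap sketch for the helpers above, assuming the module is importable as models and that the backend/app directory is on the import path; the admin credentials below are placeholders, not values from this commit:

# Hypothetical bootstrap sketch for the models module.
from models import init_db, create_initial_admin, get_db_session, User

init_db()                                    # create tables if they are missing
created = create_initial_admin(
    email="admin@example.com",               # placeholder credentials
    password="change-me",
    name="Admin",
)
print("admin created:", created)

session = get_db_session()
try:
    admins = session.query(User).filter_by(role="admin").all()
    print([u.to_dict() for u in admins])
finally:
    session.close()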


@@ -0,0 +1 @@
# Utils package for MYP


@@ -0,0 +1,230 @@
import threading
import time
import logging
from typing import Dict, Callable, Any, List, Optional, Union
from datetime import datetime, timedelta

from utils.logging_config import get_logger

logger = get_logger("scheduler")

class BackgroundTaskScheduler:
    """
    An advanced background task scheduler that supports registrable worker functions.
    Tasks can be registered as placeholders and configured later.
    """

    def __init__(self):
        self._tasks: Dict[str, Dict[str, Any]] = {}
        self._thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        self._running = False

    def register_task(self,
                      task_id: str,
                      func: Callable,
                      interval: int = 60,
                      args: List = None,
                      kwargs: Dict = None,
                      enabled: bool = True) -> bool:
        """
        Registers a new background task.

        Args:
            task_id: Unique ID for the task
            func: The function to execute
            interval: Interval in seconds between executions
            args: Positional arguments for the function
            kwargs: Keyword arguments for the function
            enabled: Whether the task should be enabled

        Returns:
            bool: True on success, False if the ID already exists
        """
        if task_id in self._tasks:
            logger.error(f"Task with ID {task_id} already exists")
            return False

        self._tasks[task_id] = {
            "func": func,
            "interval": interval,
            "args": args or [],
            "kwargs": kwargs or {},
            "enabled": enabled,
            "last_run": None,
            "next_run": datetime.now() if enabled else None
        }

        logger.info(f"Task {task_id} registered: interval {interval}s, enabled: {enabled}")
        return True

    def update_task(self,
                    task_id: str,
                    interval: Optional[int] = None,
                    args: Optional[List] = None,
                    kwargs: Optional[Dict] = None,
                    enabled: Optional[bool] = None) -> bool:
        """
        Updates the configuration of an existing task.

        Args:
            task_id: ID of the task to update
            interval: New interval in seconds
            args: New positional arguments
            kwargs: New keyword arguments
            enabled: New enabled state

        Returns:
            bool: True on success, False if the ID does not exist
        """
        if task_id not in self._tasks:
            logger.error(f"Task with ID {task_id} does not exist")
            return False

        task = self._tasks[task_id]

        if interval is not None:
            task["interval"] = interval
        if args is not None:
            task["args"] = args
        if kwargs is not None:
            task["kwargs"] = kwargs
        if enabled is not None and enabled != task["enabled"]:
            task["enabled"] = enabled
            if enabled:
                task["next_run"] = datetime.now()
            else:
                task["next_run"] = None

        logger.info(f"Task {task_id} updated: interval {task['interval']}s, enabled: {task['enabled']}")
        return True

    def remove_task(self, task_id: str) -> bool:
        """
        Removes a task from the scheduler.

        Args:
            task_id: ID of the task to remove

        Returns:
            bool: True on success, False if the ID does not exist
        """
        if task_id not in self._tasks:
            logger.error(f"Task with ID {task_id} does not exist")
            return False

        del self._tasks[task_id]
        logger.info(f"Task {task_id} removed")
        return True

    def get_task_info(self, task_id: Optional[str] = None) -> Union[Dict, List[Dict]]:
        """
        Returns information about one task or all tasks.

        Args:
            task_id: ID of the task, or None for all tasks

        Returns:
            Dict or List: Task information
        """
        if task_id is not None:
            if task_id not in self._tasks:
                return {}
            task = self._tasks[task_id]
            return {
                "id": task_id,
                "interval": task["interval"],
                "enabled": task["enabled"],
                "last_run": task["last_run"].isoformat() if task["last_run"] else None,
                "next_run": task["next_run"].isoformat() if task["next_run"] else None
            }

        return [
            {
                "id": tid,
                "interval": task["interval"],
                "enabled": task["enabled"],
                "last_run": task["last_run"].isoformat() if task["last_run"] else None,
                "next_run": task["next_run"].isoformat() if task["next_run"] else None
            }
            for tid, task in self._tasks.items()
        ]

    def start(self) -> bool:
        """
        Starts the scheduler.

        Returns:
            bool: True if started successfully, False if already running
        """
        if self._running:
            logger.warning("Scheduler is already running")
            return False

        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run)
        self._thread.daemon = True
        self._thread.start()
        self._running = True

        logger.info("Scheduler started")
        return True

    def stop(self) -> bool:
        """
        Stops the scheduler.

        Returns:
            bool: True if stopped successfully, False if not running
        """
        if not self._running:
            logger.warning("Scheduler is not running")
            return False

        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=5.0)
        self._running = False

        logger.info("Scheduler stopped")
        return True

    def is_running(self) -> bool:
        """
        Checks whether the scheduler is running.

        Returns:
            bool: True if the scheduler is running, otherwise False
        """
        return self._running

    def _run(self) -> None:
        """Internal method that runs the scheduler loop."""
        while not self._stop_event.is_set():
            now = datetime.now()

            for task_id, task in self._tasks.items():
                if not task["enabled"] or not task["next_run"]:
                    continue

                if now >= task["next_run"]:
                    logger.info(f"Running task {task_id}")
                    try:
                        task["func"](*task["args"], **task["kwargs"])
                        logger.info(f"Task {task_id} executed successfully")
                    except Exception as e:
                        logger.error(f"Error while executing task {task_id}: {str(e)}")

                    task["last_run"] = now
                    task["next_run"] = now + timedelta(seconds=task["interval"])

            # Wait one second and check again
            self._stop_event.wait(1)

# Singleton instance
scheduler = BackgroundTaskScheduler()
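A brief sketch of the intended register/start/stop cycle for the scheduler singleton above; the utils.scheduler import path and the worker function are assumptions made for illustration:

# Hypothetical usage sketch for the BackgroundTaskScheduler singleton.
from utils.scheduler import scheduler

def check_printers(timeout: int = 5) -> None:
    # Placeholder worker; a real task would poll the smart plugs here.
    print(f"checking printers (timeout={timeout}s)")

scheduler.register_task("printer_check", check_printers,
                        interval=60, kwargs={"timeout": 5})
scheduler.start()

# Later: slow the task down or inspect it without restarting the scheduler.
scheduler.update_task("printer_check", interval=300)
print(scheduler.get_task_info("printer_check"))
scheduler.stop()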


@@ -0,0 +1,101 @@
import logging
import logging.handlers
import os
from typing import Dict

from config.settings import (
    LOG_DIR, LOG_SUBDIRS, LOG_LEVEL, LOG_FORMAT, LOG_DATE_FORMAT,
    get_log_file, ensure_log_directories
)

# Dictionary that stores the configured loggers
_loggers: Dict[str, logging.Logger] = {}

def setup_logging():
    """Initializes the logging system and creates all required directories."""
    ensure_log_directories()

    # Configure the root logger
    root_logger = logging.getLogger()
    root_logger.setLevel(getattr(logging, LOG_LEVEL))

    # Remove all existing handlers
    for handler in root_logger.handlers[:]:
        root_logger.removeHandler(handler)

    # Create the formatter
    formatter = logging.Formatter(LOG_FORMAT, LOG_DATE_FORMAT)

    # Console handler for all logs
    console_handler = logging.StreamHandler()
    console_handler.setLevel(getattr(logging, LOG_LEVEL))
    console_handler.setFormatter(formatter)
    root_logger.addHandler(console_handler)

    # File handler for general app logs
    app_log_file = get_log_file("app")
    app_handler = logging.handlers.RotatingFileHandler(
        app_log_file, maxBytes=10*1024*1024, backupCount=5
    )
    app_handler.setLevel(getattr(logging, LOG_LEVEL))
    app_handler.setFormatter(formatter)
    root_logger.addHandler(app_handler)

def get_logger(category: str) -> logging.Logger:
    """
    Returns a configured logger for a given category.

    Args:
        category: Log category (app, scheduler, auth, jobs, printers, errors)

    Returns:
        logging.Logger: Configured logger
    """
    if category in _loggers:
        return _loggers[category]

    # Create the logger
    logger = logging.getLogger(f"myp.{category}")
    logger.setLevel(getattr(logging, LOG_LEVEL))

    # Prevent duplicate logs via parent loggers
    logger.propagate = False

    # Create the formatter
    formatter = logging.Formatter(LOG_FORMAT, LOG_DATE_FORMAT)

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(getattr(logging, LOG_LEVEL))
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    # File handler for the specific category
    log_file = get_log_file(category)
    file_handler = logging.handlers.RotatingFileHandler(
        log_file, maxBytes=10*1024*1024, backupCount=5
    )
    file_handler.setLevel(getattr(logging, LOG_LEVEL))
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    # Additionally write error logs to errors.log
    if category != "errors":
        error_log_file = get_log_file("errors")
        error_handler = logging.handlers.RotatingFileHandler(
            error_log_file, maxBytes=10*1024*1024, backupCount=5
        )
        error_handler.setLevel(logging.ERROR)
        error_handler.setFormatter(formatter)
        logger.addHandler(error_handler)

    _loggers[category] = logger
    return logger

def log_startup_info():
    """Logs startup information."""
    app_logger = get_logger("app")
    app_logger.info("=" * 50)
    app_logger.info("MYP (Manage Your Printers) is starting...")
    app_logger.info(f"Log directory: {LOG_DIR}")
    app_logger.info(f"Log level: {LOG_LEVEL}")
    app_logger.info("=" * 50)