📚 Reorganized and optimized utility files for improved code structure & maintainability. 🚀🔧💻

This commit is contained in:
2025-06-11 13:56:04 +02:00
parent e4a322b2b5
commit 744edb38b6
13 changed files with 0 additions and 2743 deletions

View File

@@ -1,107 +0,0 @@
#!/usr/bin/env python3
"""
Skript zum Hinzufügen der hardkodierten Drucker in die Datenbank.
"""
import sys
import os
sys.path.append('.')
from utils.settings import PRINTERS
from database.db_manager import DatabaseManager
from models import Printer
from datetime import datetime
def add_hardcoded_printers():
"""Fügt die hardkodierten Drucker in die Datenbank ein."""
print("=== Hardkodierte Drucker hinzufügen ===")
print(f"Zu erstellende Drucker: {len(PRINTERS)}")
try:
db = DatabaseManager()
session = db.get_session()
added_count = 0
for printer_name, config in PRINTERS.items():
# Prüfen, ob Drucker bereits existiert
existing = session.query(Printer).filter(Printer.name == printer_name).first()
if existing:
print(f"⚠️ {printer_name}: Bereits vorhanden (ID: {existing.id})")
continue
# Neuen Drucker erstellen
new_printer = Printer(
name=printer_name,
model="P115", # Standard-Modell
location="Werk 040 - Berlin - TBA", # Aktualisierter Standort
ip_address=config["ip"],
mac_address=f"98:25:4A:E1:{printer_name[-1]}0:0{printer_name[-1]}", # Dummy MAC
plug_ip=config["ip"],
plug_username="admin",
plug_password="admin",
status="available", # Verfügbar, da in Konfiguration
active=True,
created_at=datetime.now()
)
session.add(new_printer)
print(f"{printer_name}: Hinzugefügt (IP: {config['ip']})")
added_count += 1
# Änderungen speichern
session.commit()
session.close()
print(f"\n{added_count} neue Drucker hinzugefügt")
print("Drucker-Erstellung abgeschlossen!")
except Exception as e:
print(f"❌ Fehler beim Hinzufügen: {e}")
if 'session' in locals():
session.rollback()
session.close()
def list_all_printers():
"""Zeigt alle Drucker in der Datenbank an."""
print("\n=== Alle Drucker in der Datenbank ===")
try:
db = DatabaseManager()
session = db.get_session()
printers = session.query(Printer).all()
if not printers:
print("Keine Drucker in der Datenbank gefunden.")
return
print(f"{'ID':<5} {'Name':<15} {'Status':<12} {'IP-Adresse':<15} {'Aktiv':<8}")
print("-" * 60)
for printer in printers:
active_str = "" if printer.active else ""
print(f"{printer.id:<5} {printer.name:<15} {printer.status:<12} {printer.ip_address:<15} {active_str:<8}")
session.close()
except Exception as e:
print(f"❌ Fehler beim Abrufen: {e}")
if 'session' in locals():
session.close()
if __name__ == "__main__":
print("Hardkodierte Drucker-Erstellung")
print("=" * 35)
# Aktuelle Drucker anzeigen
list_all_printers()
# Hardkodierte Drucker hinzufügen
add_hardcoded_printers()
# Alle Drucker anzeigen
list_all_printers()

View File

@@ -1,110 +0,0 @@
#!/usr/bin/env python3
"""
Skript zur Bereinigung der Drucker-Datenbank und Hinzufügung der korrekten hardkodierten Drucker.
"""
import sys
import os
sys.path.append('.')
from utils.settings import PRINTERS
from database.db_manager import DatabaseManager
from models import Printer
from datetime import datetime
def clean_and_add_printers():
"""Bereinigt die Drucker-Datenbank und fügt die korrekten hardkodierten Drucker hinzu."""
print("=== Drucker-Datenbank bereinigen und neu erstellen ===")
print(f"Hardkodierte Drucker: {len(PRINTERS)}")
try:
db = DatabaseManager()
session = db.get_session()
# Alle existierenden Drucker löschen
existing_printers = session.query(Printer).all()
print(f"Lösche {len(existing_printers)} existierende Drucker...")
for printer in existing_printers:
session.delete(printer)
session.commit()
print("✅ Alle alten Drucker gelöscht")
# Neue Drucker hinzufügen
added_count = 0
for printer_name, config in PRINTERS.items():
# Neuen Drucker erstellen
new_printer = Printer(
name=printer_name,
model="P115", # Standard-Modell
location="Werk 040 - Berlin - TBA", # Aktualisierter Standort
ip_address=config["ip"],
mac_address=f"98:25:4A:E1:{printer_name[-1]}0:0{printer_name[-1]}", # Dummy MAC
plug_ip=config["ip"],
plug_username="admin",
plug_password="admin",
status="available", # Verfügbar, da in Konfiguration
active=True,
created_at=datetime.now()
)
session.add(new_printer)
print(f"{printer_name}: Hinzugefügt (IP: {config['ip']})")
added_count += 1
# Änderungen speichern
session.commit()
session.close()
print(f"\n{added_count} neue Drucker hinzugefügt")
print("Drucker-Datenbank erfolgreich bereinigt und neu erstellt!")
except Exception as e:
print(f"❌ Fehler beim Bereinigen: {e}")
if 'session' in locals():
session.rollback()
session.close()
def list_final_printers():
"""Zeigt die finalen Drucker in der Datenbank an."""
print("\n=== Finale Drucker-Liste ===")
try:
db = DatabaseManager()
session = db.get_session()
printers = session.query(Printer).all()
if not printers:
print("Keine Drucker in der Datenbank gefunden.")
return
print(f"{'ID':<5} {'Name':<15} {'Status':<12} {'IP-Adresse':<15} {'Aktiv':<8}")
print("-" * 60)
for printer in printers:
active_str = "" if printer.active else ""
print(f"{printer.id:<5} {printer.name:<15} {printer.status:<12} {printer.ip_address:<15} {active_str:<8}")
session.close()
print(f"\nGesamt: {len(printers)} Drucker")
except Exception as e:
print(f"❌ Fehler beim Abrufen: {e}")
if 'session' in locals():
session.close()
if __name__ == "__main__":
print("Drucker-Datenbank Bereinigung und Neuerstellung")
print("=" * 50)
# Datenbank bereinigen und neue Drucker hinzufügen
clean_and_add_printers()
# Finale Liste anzeigen
list_final_printers()

View File

@@ -1,368 +0,0 @@
"""
Zentrale Konfiguration für das 3D-Druck-Management-System
Diese Datei konsolidiert alle Konfigurationseinstellungen aus dem ehemaligen config-Ordner.
"""
import os
import json
from datetime import timedelta
# ===== BASIS-KONFIGURATION =====
def get_env_variable(name: str, default: str = None) -> str:
"""
Holt eine Umgebungsvariable oder gibt den Standardwert zurück.
Args:
name: Name der Umgebungsvariable
default: Standardwert, falls die Variable nicht gesetzt ist
Returns:
str: Wert der Umgebungsvariable oder Standardwert
"""
return os.environ.get(name, default)
# Geheimschlüssel für Flask-Sessions und CSRF-Schutz
SECRET_KEY = "7445630171969DFAC92C53CEC92E67A9CB2E00B3CB2F"
# Pfad-Konfiguration
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
PROJECT_ROOT = os.path.dirname(BASE_DIR)
DATABASE_PATH = os.path.join(BASE_DIR, "instance", "printer_manager.db")
# ===== SMART PLUG KONFIGURATION =====
# TP-Link Tapo P110 Standardkonfiguration
TAPO_USERNAME = "till.tomczak@mercedes-benz.com"
TAPO_PASSWORD = "744563017196A"
# Automatische Steckdosen-Erkennung
TAPO_AUTO_DISCOVERY = True
# Standard-Steckdosen-IPs
DEFAULT_TAPO_IPS = [
"192.168.0.103",
"192.168.0.104",
"192.168.0.100",
"192.168.0.101",
"192.168.0.102",
"192.168.0.105"
]
# Timeout-Konfiguration für Tapo-Verbindungen
TAPO_TIMEOUT = 10 # Sekunden
TAPO_RETRY_COUNT = 3 # Anzahl Wiederholungsversuche
# ===== DRUCKER-KONFIGURATION =====
PRINTERS = {
"Printer 1": {"ip": "192.168.0.100"},
"Printer 2": {"ip": "192.168.0.101"},
"Printer 3": {"ip": "192.168.0.102"},
"Printer 4": {"ip": "192.168.0.103"},
"Printer 5": {"ip": "192.168.0.104"},
"Printer 6": {"ip": "192.168.0.106"}
}
# ===== LOGGING-KONFIGURATION =====
LOG_DIR = os.path.join(BASE_DIR, "logs")
LOG_SUBDIRS = ["app", "scheduler", "auth", "jobs", "printers", "errors", "admin", "admin_api",
"user", "kiosk", "guest", "uploads", "sessions", "maintenance", "analytics",
"security", "database", "queue_manager", "printer_monitor"]
LOG_LEVEL = get_env_variable("LOG_LEVEL", "INFO")
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
LOG_FILE_MAX_BYTES = 10 * 1024 * 1024 # 10MB
LOG_BACKUP_COUNT = 5
# ===== FLASK-KONFIGURATION =====
FLASK_HOST = get_env_variable("FLASK_HOST", "0.0.0.0")
FLASK_PORT = int(get_env_variable("FLASK_PORT", "5000"))
FLASK_FALLBACK_PORT = 8080
FLASK_DEBUG = get_env_variable("FLASK_DEBUG", "False").lower() in ("true", "1", "yes")
SESSION_LIFETIME = timedelta(hours=2)
# ===== UPLOAD-KONFIGURATION =====
UPLOAD_FOLDER = os.path.join(BASE_DIR, "uploads")
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'gcode', '3mf', 'stl', 'obj', 'amf'}
MAX_CONTENT_LENGTH = 16 * 1024 * 1024 # 16MB Maximum-Dateigröße
MAX_FILE_SIZE = 16 * 1024 * 1024 # 16MB Maximum-Dateigröße für Drag & Drop System
# ===== UMGEBUNG =====
ENVIRONMENT = get_env_variable("MYP_ENVIRONMENT", "development")
# ===== SSL-KONFIGURATION =====
SSL_ENABLED = get_env_variable("MYP_SSL_ENABLED", "False").lower() in ("true", "1", "yes")
SSL_CERT_PATH = os.path.join(BASE_DIR, "certs", "myp.crt")
SSL_KEY_PATH = os.path.join(BASE_DIR, "certs", "myp.key")
SSL_HOSTNAME = get_env_variable("MYP_SSL_HOSTNAME", "localhost")
# ===== SCHEDULER-KONFIGURATION =====
SCHEDULER_INTERVAL = 60 # Sekunden
SCHEDULER_ENABLED = get_env_variable("SCHEDULER_ENABLED", "True").lower() in ("true", "1", "yes")
# ===== DATENBANK-KONFIGURATION =====
DATABASE_URL = get_env_variable("DATABASE_URL", f"sqlite:///{DATABASE_PATH}")
SQLALCHEMY_DATABASE_URI = DATABASE_URL
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_ENGINE_OPTIONS = {
'pool_pre_ping': True,
'pool_recycle': 300,
'connect_args': {
'check_same_thread': False
}
}
# ===== SICHERHEITS-KONFIGURATION =====
# CSRF-Schutz
WTF_CSRF_ENABLED = True
WTF_CSRF_TIME_LIMIT = 3600 # 1 Stunde
# Session-Sicherheit
SESSION_COOKIE_SECURE = SSL_ENABLED # Nur bei HTTPS
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Lax'
PERMANENT_SESSION_LIFETIME = SESSION_LIFETIME
# Sicherheits-Headers
SECURITY_HEADERS = {
'Content-Security-Policy': (
"default-src 'self'; "
"script-src 'self' 'unsafe-eval' 'unsafe-inline'; "
"script-src-elem 'self' 'unsafe-inline'; "
"style-src 'self' 'unsafe-inline'; "
"font-src 'self'; "
"img-src 'self' data:; "
"connect-src 'self'; "
"worker-src 'self' blob:; "
"frame-src 'none'; "
"object-src 'none'; "
"base-uri 'self'; "
"form-action 'self'; "
"frame-ancestors 'none';"
),
'X-Content-Type-Options': 'nosniff',
'X-Frame-Options': 'DENY',
'X-XSS-Protection': '1; mode=block',
'Referrer-Policy': 'strict-origin-when-cross-origin',
'Permissions-Policy': 'geolocation=(), microphone=(), camera=()'
}
# Nur HTTPS-Header wenn SSL aktiviert
if SSL_ENABLED:
SECURITY_HEADERS['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
# Rate Limiting
RATE_LIMITS = {
'default': "200 per day, 50 per hour",
'login': "5 per minute",
'api': "100 per hour",
'admin': "500 per hour"
}
# ===== MAIL-KONFIGURATION (Optional) =====
MAIL_SERVER = get_env_variable('MAIL_SERVER')
MAIL_PORT = int(get_env_variable('MAIL_PORT', '587'))
MAIL_USE_TLS = get_env_variable('MAIL_USE_TLS', 'true').lower() in ['true', 'on', '1']
MAIL_USERNAME = get_env_variable('MAIL_USERNAME')
MAIL_PASSWORD = get_env_variable('MAIL_PASSWORD')
# ===== NETZWERK-KONFIGURATION =====
# Host-Konfiguration für lokalen und Intranet-Zugang
FLASK_HOST = get_env_variable("FLASK_HOST", "0.0.0.0")
FLASK_PORT = int(get_env_variable("FLASK_PORT", "5000"))
FLASK_FALLBACK_PORT = 8080
# Hostname-Konfiguration für Intranet-Zugang
HOSTNAME = "m040tbaraspi001"
INTRANET_DOMAIN = "de040.corpintra.net"
FULL_HOSTNAME = f"{HOSTNAME}.{INTRANET_DOMAIN}"
# Erlaubte Hosts für CORS und Security
ALLOWED_HOSTS = [
"localhost",
"127.0.0.1",
HOSTNAME,
FULL_HOSTNAME,
"0.0.0.0"
]
# URL-Konfiguration
BASE_URL_LOCAL = f"http://localhost:{FLASK_PORT}"
BASE_URL_INTRANET = f"https://{FULL_HOSTNAME}"
# Automatische URL-Ermittlung basierend auf Request
def get_base_url(request=None):
"""
Ermittelt die korrekte Base-URL basierend auf dem Request
Args:
request: Flask Request Objekt (optional)
Returns:
str: Base URL für die Anwendung
"""
if request:
host = request.host
if FULL_HOSTNAME in host:
return BASE_URL_INTRANET
elif "localhost" in host or "127.0.0.1" in host:
return BASE_URL_LOCAL
else:
# Fallback basierend auf Request-Schema
scheme = "https" if request.is_secure else "http"
return f"{scheme}://{host}"
# Fallback ohne Request
return BASE_URL_LOCAL
# ===== HILFSFUNKTIONEN =====
def get_log_file(category: str) -> str:
"""
Gibt den Pfad zur Log-Datei für eine bestimmte Kategorie zurück.
Args:
category: Log-Kategorie
Returns:
str: Pfad zur Log-Datei
"""
if category not in LOG_SUBDIRS:
category = "app"
return os.path.join(LOG_DIR, category, f"{category}.log")
def ensure_log_directories():
"""Erstellt alle erforderlichen Log-Verzeichnisse."""
os.makedirs(LOG_DIR, exist_ok=True)
for subdir in LOG_SUBDIRS:
os.makedirs(os.path.join(LOG_DIR, subdir), exist_ok=True)
def ensure_database_directory():
"""Erstellt das Datenbank-Verzeichnis."""
db_dir = os.path.dirname(DATABASE_PATH)
if db_dir:
os.makedirs(db_dir, exist_ok=True)
def ensure_ssl_directory():
"""Erstellt das SSL-Verzeichnis, falls es nicht existiert."""
ssl_dir = os.path.dirname(SSL_CERT_PATH)
if ssl_dir and not os.path.exists(ssl_dir):
os.makedirs(ssl_dir, exist_ok=True)
def ensure_upload_directory():
"""Erstellt das Upload-Verzeichnis, falls es nicht existiert."""
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# Unterverzeichnisse erstellen
subdirs = ["jobs", "avatars", "assets", "logs", "backups", "temp", "guests"]
for subdir in subdirs:
os.makedirs(os.path.join(UPLOAD_FOLDER, subdir), exist_ok=True)
def get_security_headers():
"""Gibt die Sicherheits-Headers zurück"""
return SECURITY_HEADERS
def create_simple_ssl_cert():
"""
Erstellt ein Mercedes-Benz SSL-Zertifikat mit dem SSL-Manager.
"""
try:
from utils.ssl_manager import ssl_manager
success = ssl_manager.generate_mercedes_certificate()
if success:
print(f"Mercedes-Benz SSL-Zertifikat erfolgreich erstellt: {SSL_CERT_PATH}")
return True
else:
print("Fehler beim Erstellen des Mercedes-Benz SSL-Zertifikats")
return False
except ImportError as e:
print(f"SSL-Manager nicht verfügbar: {e}")
return False
except Exception as e:
print(f"Fehler beim Erstellen der SSL-Zertifikate: {e}")
return False
# ===== KONFIGURATIONSKLASSEN FÜR VERSCHIEDENE UMGEBUNGEN =====
class Config:
"""Basis-Konfigurationsklasse"""
# Alle Attribute aus den Konstanten übernehmen
SECRET_KEY = SECRET_KEY
DATABASE_URL = DATABASE_URL
SQLALCHEMY_DATABASE_URI = SQLALCHEMY_DATABASE_URI
SQLALCHEMY_TRACK_MODIFICATIONS = SQLALCHEMY_TRACK_MODIFICATIONS
SQLALCHEMY_ENGINE_OPTIONS = SQLALCHEMY_ENGINE_OPTIONS
UPLOAD_FOLDER = UPLOAD_FOLDER
MAX_CONTENT_LENGTH = MAX_CONTENT_LENGTH
ALLOWED_EXTENSIONS = ALLOWED_EXTENSIONS
WTF_CSRF_ENABLED = WTF_CSRF_ENABLED
WTF_CSRF_TIME_LIMIT = WTF_CSRF_TIME_LIMIT
SESSION_COOKIE_SECURE = SESSION_COOKIE_SECURE
SESSION_COOKIE_HTTPONLY = SESSION_COOKIE_HTTPONLY
SESSION_COOKIE_SAMESITE = SESSION_COOKIE_SAMESITE
PERMANENT_SESSION_LIFETIME = PERMANENT_SESSION_LIFETIME
LOG_LEVEL = LOG_LEVEL
LOG_FILE_MAX_BYTES = LOG_FILE_MAX_BYTES
LOG_BACKUP_COUNT = LOG_BACKUP_COUNT
SCHEDULER_ENABLED = SCHEDULER_ENABLED
SCHEDULER_INTERVAL = SCHEDULER_INTERVAL
SSL_ENABLED = SSL_ENABLED
SSL_CERT_PATH = SSL_CERT_PATH
SSL_KEY_PATH = SSL_KEY_PATH
DEFAULT_PORT = FLASK_PORT
DEFAULT_HOST = FLASK_HOST
@staticmethod
def init_app(app):
"""Initialisiert die Anwendung mit dieser Konfiguration."""
pass
class DevelopmentConfig(Config):
"""Entwicklungsumgebung-Konfiguration"""
DEBUG = True
TESTING = False
LOG_LEVEL = 'DEBUG'
SESSION_COOKIE_SECURE = False
WTF_CSRF_ENABLED = False # Für einfacheres API-Testing
class TestingConfig(Config):
"""Test-Umgebung-Konfiguration"""
TESTING = True
DEBUG = True
DATABASE_URL = 'sqlite:///:memory:'
SQLALCHEMY_DATABASE_URI = DATABASE_URL
WTF_CSRF_ENABLED = False
PERMANENT_SESSION_LIFETIME = timedelta(minutes=5)
class ProductionConfig(Config):
"""Produktionsumgebung-Konfiguration"""
DEBUG = False
TESTING = False
SESSION_COOKIE_SECURE = True
WTF_CSRF_ENABLED = True
LOG_LEVEL = 'WARNING'
SSL_ENABLED = True
# Konfigurationswörterbuch
config = {
'development': DevelopmentConfig,
'testing': TestingConfig,
'production': ProductionConfig,
'default': DevelopmentConfig
}
def get_config_by_name(config_name):
"""Gibt die Konfigurationsklasse nach Name zurück."""
return config.get(config_name, config['default'])

View File

@@ -1,253 +0,0 @@
#!/usr/bin/env python3
"""
Sofortige Datenbank-Reparatur für fehlende updated_at Spalte
"""
import os
import sys
import sqlite3
from datetime import datetime
# Pfad zur App hinzufügen
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from utils.settings import DATABASE_PATH
def fix_users_table_immediate():
"""Repariert die users Tabelle sofort."""
print(f"Repariere Datenbank: {DATABASE_PATH}")
if not os.path.exists(DATABASE_PATH):
print(f"Datenbankdatei nicht gefunden: {DATABASE_PATH}")
return False
try:
conn = sqlite3.connect(DATABASE_PATH)
cursor = conn.cursor()
# Prüfen, welche Spalten existieren
cursor.execute("PRAGMA table_info(users)")
existing_columns = [row[1] for row in cursor.fetchall()]
print(f"Vorhandene Spalten in users: {existing_columns}")
# Fehlende Spalten hinzufügen
required_columns = [
('updated_at', 'DATETIME'),
('settings', 'TEXT'),
('department', 'VARCHAR(100)'),
('position', 'VARCHAR(100)'),
('phone', 'VARCHAR(50)'),
('bio', 'TEXT')
]
for column_name, column_type in required_columns:
if column_name not in existing_columns:
try:
if column_name == 'updated_at':
# Einfacher Ansatz: NULL erlauben und später updaten
cursor.execute(f"ALTER TABLE users ADD COLUMN {column_name} {column_type}")
print(f"✓ Spalte '{column_name}' hinzugefügt")
# Alle vorhandenen Benutzer mit aktuellem Timestamp updaten
cursor.execute(f"UPDATE users SET {column_name} = CURRENT_TIMESTAMP WHERE {column_name} IS NULL")
print(f"✓ Vorhandene Benutzer mit {column_name} aktualisiert")
# Trigger für automatische Updates erstellen
cursor.execute("""
CREATE TRIGGER IF NOT EXISTS update_users_updated_at
AFTER UPDATE ON users
FOR EACH ROW
BEGIN
UPDATE users SET updated_at = CURRENT_TIMESTAMP WHERE id = NEW.id;
END
""")
print(f"✓ Auto-Update-Trigger für {column_name} erstellt")
else:
cursor.execute(f"ALTER TABLE users ADD COLUMN {column_name} {column_type}")
print(f"✓ Spalte '{column_name}' hinzugefügt")
except Exception as e:
print(f"✗ Fehler bei Spalte '{column_name}': {str(e)}")
else:
print(f"○ Spalte '{column_name}' bereits vorhanden")
# Weitere fehlende Tabellen prüfen und erstellen
create_missing_tables(cursor)
# Optimierungsindizes erstellen
create_performance_indexes(cursor)
conn.commit()
conn.close()
print("✓ Datenbank-Reparatur erfolgreich abgeschlossen")
return True
except Exception as e:
print(f"✗ Fehler bei der Datenbank-Reparatur: {str(e)}")
if 'conn' in locals():
conn.rollback()
conn.close()
return False
def create_missing_tables(cursor):
"""Erstellt fehlende Tabellen."""
# Prüfen, welche Tabellen existieren
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
existing_tables = [row[0] for row in cursor.fetchall()]
print(f"Vorhandene Tabellen: {existing_tables}")
# user_permissions Tabelle
if 'user_permissions' not in existing_tables:
cursor.execute("""
CREATE TABLE user_permissions (
user_id INTEGER PRIMARY KEY,
can_start_jobs BOOLEAN DEFAULT 0,
needs_approval BOOLEAN DEFAULT 1,
can_approve_jobs BOOLEAN DEFAULT 0,
FOREIGN KEY (user_id) REFERENCES users (id)
)
""")
print("✓ Tabelle 'user_permissions' erstellt")
# notifications Tabelle
if 'notifications' not in existing_tables:
cursor.execute("""
CREATE TABLE notifications (
id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL,
type VARCHAR(50) NOT NULL,
payload TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
read BOOLEAN DEFAULT 0,
FOREIGN KEY (user_id) REFERENCES users (id)
)
""")
print("✓ Tabelle 'notifications' erstellt")
# stats Tabelle
if 'stats' not in existing_tables:
cursor.execute("""
CREATE TABLE stats (
id INTEGER PRIMARY KEY,
total_print_time INTEGER DEFAULT 0,
total_jobs_completed INTEGER DEFAULT 0,
total_material_used REAL DEFAULT 0.0,
last_updated DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
print("✓ Tabelle 'stats' erstellt")
# Initial stats record erstellen
cursor.execute("""
INSERT INTO stats (total_print_time, total_jobs_completed, total_material_used, last_updated)
VALUES (0, 0, 0.0, CURRENT_TIMESTAMP)
""")
print("✓ Initial-Statistiken erstellt")
# system_logs Tabelle
if 'system_logs' not in existing_tables:
cursor.execute("""
CREATE TABLE system_logs (
id INTEGER PRIMARY KEY,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
level VARCHAR(20) NOT NULL,
message VARCHAR(1000) NOT NULL,
module VARCHAR(100),
user_id INTEGER,
ip_address VARCHAR(50),
user_agent VARCHAR(500),
FOREIGN KEY (user_id) REFERENCES users (id)
)
""")
print("✓ Tabelle 'system_logs' erstellt")
def create_performance_indexes(cursor):
"""Erstellt Performance-Indices."""
print("Erstelle Performance-Indices...")
indexes = [
("idx_users_email", "users(email)"),
("idx_users_username", "users(username)"),
("idx_users_role", "users(role)"),
("idx_jobs_user_id", "jobs(user_id)"),
("idx_jobs_printer_id", "jobs(printer_id)"),
("idx_jobs_status", "jobs(status)"),
("idx_jobs_start_at", "jobs(start_at)"),
("idx_notifications_user_id", "notifications(user_id)"),
("idx_notifications_read", "notifications(read)"),
("idx_system_logs_timestamp", "system_logs(timestamp)"),
("idx_system_logs_level", "system_logs(level)"),
("idx_guest_requests_status", "guest_requests(status)"),
("idx_printers_status", "printers(status)"),
("idx_printers_active", "printers(active)")
]
for index_name, index_def in indexes:
try:
cursor.execute(f"CREATE INDEX IF NOT EXISTS {index_name} ON {index_def}")
print(f"✓ Index '{index_name}' erstellt")
except Exception as e:
print(f"○ Index '{index_name}': {str(e)}")
def test_database_access():
"""Testet den Datenbankzugriff nach der Reparatur."""
print("\nTeste Datenbankzugriff...")
try:
# Models importieren und testen
from models import get_cached_session, User, Printer, Job
with get_cached_session() as session:
# Test User-Query
users = session.query(User).limit(5).all()
print(f"✓ User-Abfrage erfolgreich - {len(users)} Benutzer gefunden")
# Test Printer-Query
printers = session.query(Printer).limit(5).all()
print(f"✓ Printer-Abfrage erfolgreich - {len(printers)} Drucker gefunden")
# Test Job-Query
jobs = session.query(Job).limit(5).all()
print(f"✓ Job-Abfrage erfolgreich - {len(jobs)} Jobs gefunden")
print("✓ Alle Datenbank-Tests erfolgreich!")
return True
except Exception as e:
print(f"✗ Datenbank-Test fehlgeschlagen: {str(e)}")
return False
def main():
"""Hauptfunktion für die sofortige Datenbank-Reparatur."""
print("=== SOFORTIGE DATENBANK-REPARATUR ===")
print(f"Zeitstempel: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Datenbank: {DATABASE_PATH}")
print()
# Backup erstellen
if os.path.exists(DATABASE_PATH):
backup_path = f"{DATABASE_PATH}.backup_immediate_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
try:
import shutil
shutil.copy2(DATABASE_PATH, backup_path)
print(f"✓ Backup erstellt: {backup_path}")
except Exception as e:
print(f"⚠ Backup-Erstellung fehlgeschlagen: {str(e)}")
# Reparatur durchführen
if fix_users_table_immediate():
print("\n=== DATENBANK-TEST ===")
if test_database_access():
print("\n🎉 DATENBANK-REPARATUR ERFOLGREICH!")
print("Die Anwendung sollte jetzt funktionieren.")
else:
print("\n❌ DATENBANK-TEST FEHLGESCHLAGEN!")
print("Weitere Diagnose erforderlich.")
else:
print("\n❌ DATENBANK-REPARATUR FEHLGESCHLAGEN!")
print("Manuelle Intervention erforderlich.")
if __name__ == "__main__":
main()

View File

@@ -1,374 +0,0 @@
# -*- coding: utf-8 -*-
"""
Windows-sichere Logging-Konfiguration für MYP Platform
======================================================
Robuste Logging-Konfiguration mit Windows-spezifischen Fixes für File-Locking-Probleme.
"""
import os
import sys
import time
import logging
import threading
from datetime import datetime
from functools import wraps
from typing import Optional, Dict, Any
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
# ===== WINDOWS-SICHERE LOGGING-KLASSE =====
class WindowsSafeRotatingFileHandler(RotatingFileHandler):
"""
Windows-sichere Implementierung von RotatingFileHandler.
Behebt das WinError 32 Problem bei gleichzeitigen Log-Dateizugriffen.
"""
def __init__(self, filename, mode='a', maxBytes=0, backupCount=0, encoding=None, delay=False):
# Verwende UTF-8 Encoding standardmäßig
if encoding is None:
encoding = 'utf-8'
# Windows-spezifische Konfiguration
self._windows_safe_mode = os.name == 'nt'
self._rotation_lock = threading.Lock()
super().__init__(filename, mode, maxBytes, backupCount, encoding, delay)
def doRollover(self):
"""
Windows-sichere Log-Rotation mit verbessertem Error-Handling.
"""
if not self._windows_safe_mode:
# Normale Rotation für Unix-Systeme
return super().doRollover()
# Windows-spezifische sichere Rotation
with self._rotation_lock:
try:
if self.stream:
self.stream.close()
self.stream = None
# Warte kurz bevor Rotation versucht wird
time.sleep(0.1)
# Versuche Rotation mehrmals mit exponentialem Backoff
max_attempts = 5
for attempt in range(max_attempts):
try:
# Rotation durchführen
super().doRollover()
break
except (PermissionError, OSError) as e:
if attempt == max_attempts - 1:
# Bei letztem Versuch: Erstelle neue Log-Datei ohne Rotation
print(f"WARNUNG: Log-Rotation fehlgeschlagen - erstelle neue Datei: {e}")
self._create_new_log_file()
break
else:
# Warte exponentiell länger bei jedem Versuch
wait_time = 0.5 * (2 ** attempt)
time.sleep(wait_time)
except Exception as e:
print(f"KRITISCHER FEHLER bei Log-Rotation: {e}")
# Notfall: Erstelle neue Log-Datei
self._create_new_log_file()
def _create_new_log_file(self):
"""
Erstellt eine neue Log-Datei als Fallback wenn Rotation fehlschlägt.
"""
try:
# Füge Timestamp zum Dateinamen hinzu
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
base_name, ext = os.path.splitext(self.baseFilename)
new_filename = f"{base_name}_{timestamp}{ext}"
# Öffne neue Datei
self.baseFilename = new_filename
self.stream = self._open()
except Exception as e:
print(f"NOTFALL: Kann keine neue Log-Datei erstellen: {e}")
# Letzter Ausweg: Console-Logging
self.stream = sys.stderr
# ===== GLOBALE LOGGING-KONFIGURATION =====
# Logger-Registry für Singleton-Pattern
_logger_registry: Dict[str, logging.Logger] = {}
_logging_initialized = False
_init_lock = threading.Lock()
def setup_logging(log_level: str = "INFO", base_log_dir: str = None) -> None:
"""
Initialisiert das zentrale Logging-System mit Windows-sicherer Konfiguration.
Args:
log_level: Logging-Level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
base_log_dir: Basis-Verzeichnis für Log-Dateien
"""
global _logging_initialized
with _init_lock:
if _logging_initialized:
return
try:
# Bestimme Log-Verzeichnis
if base_log_dir is None:
current_dir = os.path.dirname(os.path.abspath(__file__))
base_log_dir = os.path.join(current_dir, '..', 'logs')
# Erstelle Log-Verzeichnisse
log_dirs = ['app', 'auth', 'jobs', 'printers', 'scheduler', 'errors']
for log_dir in log_dirs:
full_path = os.path.join(base_log_dir, log_dir)
os.makedirs(full_path, exist_ok=True)
# Konfiguriere Root-Logger
root_logger = logging.getLogger()
root_logger.setLevel(getattr(logging, log_level.upper(), logging.INFO))
# Entferne existierende Handler um Duplikate zu vermeiden
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)
# Console-Handler für kritische Meldungen
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.WARNING)
console_formatter = logging.Formatter(
'%(asctime)s - %(name)s - [%(levelname)s] %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
console_handler.setFormatter(console_formatter)
root_logger.addHandler(console_handler)
_logging_initialized = True
print(f"✅ Logging-System erfolgreich initialisiert (Level: {log_level})")
except Exception as e:
print(f"❌ KRITISCHER FEHLER bei Logging-Initialisierung: {e}")
# Notfall-Konfiguration
logging.basicConfig(
level=getattr(logging, log_level.upper(), logging.INFO),
format='%(asctime)s - %(name)s - [%(levelname)s] - %(message)s',
handlers=[logging.StreamHandler(sys.stdout)]
)
_logging_initialized = True
def get_logger(name: str, log_level: str = None) -> logging.Logger:
"""
Erstellt oder gibt einen konfigurierten Logger zurück.
Args:
name: Name des Loggers (z.B. 'app', 'auth', 'jobs')
log_level: Optionaler spezifischer Log-Level für diesen Logger
Returns:
Konfigurierter Logger
"""
global _logger_registry
# Stelle sicher, dass Logging initialisiert ist
if not _logging_initialized:
setup_logging()
# Prüfe Registry für existierenden Logger
if name in _logger_registry:
return _logger_registry[name]
try:
# Erstelle neuen Logger
logger = logging.getLogger(name)
# Setze spezifischen Level falls angegeben
if log_level:
logger.setLevel(getattr(logging, log_level.upper(), logging.INFO))
# Erstelle File-Handler mit Windows-sicherer Rotation
log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'logs', name)
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, f'{name}.log')
# Windows-sicherer RotatingFileHandler
file_handler = WindowsSafeRotatingFileHandler(
log_file,
maxBytes=10*1024*1024, # 10MB
backupCount=5,
encoding='utf-8'
)
# Detaillierter Formatter für File-Logs
file_formatter = logging.Formatter(
'%(asctime)s - [%(name)s] %(name)s - [%(levelname)s] %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
file_handler.setFormatter(file_formatter)
# Handler hinzufügen
logger.addHandler(file_handler)
# Verhindere Propagation zu Root-Logger um Duplikate zu vermeiden
logger.propagate = False
# In Registry speichern
_logger_registry[name] = logger
return logger
except Exception as e:
print(f"❌ Fehler beim Erstellen des Loggers '{name}': {e}")
# Fallback: Einfacher Logger ohne File-Handler
fallback_logger = logging.getLogger(name)
if name not in _logger_registry:
_logger_registry[name] = fallback_logger
return fallback_logger
# ===== PERFORMANCE-MEASUREMENT DECORATOR =====
def measure_execution_time(logger: logging.Logger = None, task_name: str = "Task"):
"""
Decorator zur Messung und Protokollierung der Ausführungszeit von Funktionen.
Args:
logger: Logger-Instanz für die Ausgabe
task_name: Bezeichnung der Aufgabe für die Logs
Returns:
Decorator-Funktion
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
# Verwende provided Logger oder erstelle Standard-Logger
log = logger or get_logger("performance")
try:
# Führe Funktion aus
result = func(*args, **kwargs)
# Berechne Ausführungszeit
execution_time = (time.time() - start_time) * 1000 # in Millisekunden
# Protokolliere Erfolg
log.info(f"✅ {task_name} '{func.__name__}' erfolgreich in {execution_time:.2f}ms")
return result
except Exception as e:
# Berechne Ausführungszeit auch bei Fehlern
execution_time = (time.time() - start_time) * 1000
# Protokolliere Fehler
log.error(f"❌ {task_name} '{func.__name__}' fehlgeschlagen nach {execution_time:.2f}ms: {str(e)}")
# Exception weiterleiten
raise
return wrapper
return decorator
# ===== STARTUP/DEBUG LOGGING =====
def log_startup_info():
"""
Protokolliert System-Startup-Informationen.
"""
startup_logger = get_logger("startup")
try:
startup_logger.info("=" * 50)
startup_logger.info("🚀 MYP Platform Backend wird gestartet...")
startup_logger.info(f"🐍 Python Version: {sys.version}")
startup_logger.info(f"💻 Betriebssystem: {os.name} ({sys.platform})")
startup_logger.info(f"📁 Arbeitsverzeichnis: {os.getcwd()}")
startup_logger.info(f"⏰ Startzeit: {datetime.now().isoformat()}")
# Windows-spezifische Informationen
if os.name == 'nt':
startup_logger.info("🪟 Windows-Modus: Aktiviert")
startup_logger.info("🔒 Windows-sichere Log-Rotation: Aktiviert")
startup_logger.info("=" * 50)
except Exception as e:
print(f"❌ Fehler beim Startup-Logging: {e}")
def debug_request(logger: logging.Logger, request) -> None:
"""
Detailliertes Request-Debugging.
Args:
logger: Logger für die Ausgabe
request: Flask Request-Objekt
"""
try:
logger.debug(f"📨 REQUEST: {request.method} {request.path}")
logger.debug(f"🌐 Remote-Adresse: {request.remote_addr}")
logger.debug(f"🔤 Content-Type: {request.content_type}")
if request.args:
logger.debug(f"❓ Query-Parameter: {dict(request.args)}")
if request.form and logger.level <= logging.DEBUG:
# Filtere sensible Daten aus Form-Daten
safe_form = {k: '***' if 'password' in k.lower() else v for k, v in request.form.items()}
logger.debug(f"📝 Form-Daten: {safe_form}")
except Exception as e:
logger.error(f"❌ Fehler beim Request-Debugging: {str(e)}")
def debug_response(logger: logging.Logger, response, duration_ms: Optional[float] = None) -> None:
"""
Detailliertes Response-Debugging.
Args:
logger: Logger für die Ausgabe
response: Flask Response-Objekt
duration_ms: Optionale Ausführungszeit in Millisekunden
"""
try:
status_emoji = "✅" if response.status_code < 400 else "❌" if response.status_code >= 500 else "⚠️"
log_msg = f"📤 RESPONSE: {status_emoji} {response.status_code}"
if duration_ms is not None:
log_msg += f" ({duration_ms:.2f}ms)"
logger.debug(log_msg)
logger.debug(f"📏 Content-Length: {response.content_length or 'Unbekannt'}")
except Exception as e:
logger.error(f"❌ Fehler beim Response-Debugging: {str(e)}")
# ===== NOTFALL-LOGGING =====
def emergency_log(message: str, level: str = "ERROR") -> None:
"""
Notfall-Logging das auch funktioniert wenn das Hauptsystem fehlschlägt.
Args:
message: Nachricht
level: Log-Level
"""
try:
# Versuche normales Logging
logger = get_logger("emergency")
getattr(logger, level.lower(), logger.error)(message)
except:
# Fallback zu Print
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"[NOTFALL {timestamp}] [{level}] {message}")
# Auto-Initialisierung beim Import
if __name__ != "__main__":
try:
setup_logging()
except Exception as e:
print(f"❌ Auto-Initialisierung des Logging-Systems fehlgeschlagen: {e}")

View File

@@ -1,153 +0,0 @@
#!/usr/bin/env python3
"""
Datenbank-Migrationsskript für Guest-Requests, UserPermissions und Notifications
"""
import os
import sys
import sqlite3
from datetime import datetime
# Pfad zur App hinzufügen
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from models import init_db, get_cached_session, GuestRequest, UserPermission, Notification, User
from utils.logging_config import get_logger
from utils.settings import DATABASE_PATH
logger = get_logger("migrate")
def column_exists(cursor, table_name, column_name):
"""Prüft, ob eine Spalte in einer Tabelle existiert."""
cursor.execute(f"PRAGMA table_info({table_name})")
columns = [row[1] for row in cursor.fetchall()]
return column_name in columns
def get_database_path():
"""Ermittelt den Pfad zur Datenbankdatei."""
# Verwende den korrekten Datenbankpfad aus der Konfiguration
if os.path.exists(DATABASE_PATH):
return DATABASE_PATH
# Fallback für alternative Pfade mit korrektem Dateinamen
alternative_paths = [
os.path.join('database', 'myp.db'),
'myp.db',
'../database/myp.db',
'./database/myp.db'
]
for path in alternative_paths:
if os.path.exists(path):
return path
# Falls keine Datei gefunden wird, verwende den konfigurierten Pfad
return DATABASE_PATH
def migrate_guest_requests_table():
"""Migriert die guest_requests Tabelle für neue Spalten."""
db_path = get_database_path()
if not os.path.exists(db_path):
logger.warning(f"Datenbankdatei nicht gefunden: {db_path}")
return False
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Prüfen ob otp_used_at Spalte bereits existiert
if not column_exists(cursor, 'guest_requests', 'otp_used_at'):
cursor.execute("""
ALTER TABLE guest_requests
ADD COLUMN otp_used_at DATETIME
""")
logger.info("Spalte 'otp_used_at' zur guest_requests Tabelle hinzugefügt")
else:
logger.info("Spalte 'otp_used_at' existiert bereits")
conn.commit()
conn.close()
return True
except Exception as e:
logger.error(f"Fehler bei der Migration der guest_requests Tabelle: {str(e)}")
if 'conn' in locals():
conn.rollback()
conn.close()
return False
def main():
"""Führt die Datenbank-Migration aus."""
try:
logger.info("Starte Datenbank-Migration...")
# Datenbank initialisieren (erstellt neue Tabellen)
init_db()
# Spezifische Spalten-Migrationen
logger.info("Führe spezifische Tabellen-Migrationen aus...")
migrate_guest_requests_table()
logger.info("Datenbank-Migration erfolgreich abgeschlossen")
# Testen, ob die neuen Tabellen funktionieren
test_new_tables()
except Exception as e:
logger.error(f"Fehler bei der Datenbank-Migration: {str(e)}")
sys.exit(1)
def test_new_tables():
"""Testet, ob die neuen Tabellen korrekt erstellt wurden."""
try:
with get_cached_session() as session:
# Test der GuestRequest-Tabelle
test_request = GuestRequest(
name="Test User",
email="test@example.com",
reason="Test migration",
duration_min=60
)
session.add(test_request)
session.flush()
# Test der UserPermission-Tabelle (mit Admin-User falls vorhanden)
admin_user = session.query(User).filter_by(role="admin").first()
if admin_user:
# Prüfen, ob bereits Permissions für diesen User existieren
existing_permission = session.query(UserPermission).filter_by(user_id=admin_user.id).first()
if not existing_permission:
permission = UserPermission(
user_id=admin_user.id,
can_start_jobs=True,
needs_approval=False,
can_approve_jobs=True
)
session.add(permission)
session.flush()
logger.info(f"UserPermission für Admin-User {admin_user.id} erstellt")
else:
logger.info(f"UserPermission für Admin-User {admin_user.id} existiert bereits")
# Test der Notification-Tabelle
notification = Notification(
user_id=admin_user.id,
type="test",
payload='{"message": "Test notification"}'
)
session.add(notification)
session.flush()
# Test-Daten wieder löschen
session.rollback()
logger.info("Alle neuen Tabellen wurden erfolgreich getestet")
except Exception as e:
logger.error(f"Fehler beim Testen der neuen Tabellen: {str(e)}")
raise
if __name__ == "__main__":
main()

View File

@@ -1,83 +0,0 @@
#!/usr/bin/env python3.11
"""
Migrations-Skript für Benutzereinstellungen
Fügt neue Spalten zur users-Tabelle hinzu für erweiterte Benutzereinstellungen.
Autor: MYP Team
Datum: 2025-06-09
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sqlalchemy import text, inspect
from models import get_db_session, engine
from utils.logging_config import get_logger
logger = get_logger("migration")
def check_column_exists(table_name: str, column_name: str) -> bool:
"""Prüft, ob eine Spalte in einer Tabelle existiert"""
try:
inspector = inspect(engine)
columns = [col['name'] for col in inspector.get_columns(table_name)]
return column_name in columns
except Exception as e:
logger.error(f"Fehler beim Prüfen der Spalte {column_name}: {e}")
return False
def add_user_settings_columns():
"""Fügt die neuen Benutzereinstellungs-Spalten hinzu"""
session = get_db_session()
# Neue Spalten definieren
new_columns = [
("theme_preference", "VARCHAR(20) DEFAULT 'auto'"),
("language_preference", "VARCHAR(10) DEFAULT 'de'"),
("email_notifications", "BOOLEAN DEFAULT 1"),
("browser_notifications", "BOOLEAN DEFAULT 1"),
("dashboard_layout", "VARCHAR(20) DEFAULT 'default'"),
("compact_mode", "BOOLEAN DEFAULT 0"),
("show_completed_jobs", "BOOLEAN DEFAULT 1"),
("auto_refresh_interval", "INTEGER DEFAULT 30"),
("auto_logout_timeout", "INTEGER DEFAULT 0")
]
try:
for column_name, column_definition in new_columns:
if not check_column_exists('users', column_name):
logger.info(f"Füge Spalte {column_name} zur users-Tabelle hinzu...")
# SQLite-kompatible ALTER TABLE Syntax
sql = f"ALTER TABLE users ADD COLUMN {column_name} {column_definition}"
session.execute(text(sql))
session.commit()
logger.info(f"Spalte {column_name} erfolgreich hinzugefügt")
else:
logger.info(f"Spalte {column_name} existiert bereits")
logger.info("Migration der Benutzereinstellungen erfolgreich abgeschlossen")
except Exception as e:
logger.error(f"Fehler bei der Migration: {e}")
session.rollback()
raise e
finally:
session.close()
def main():
"""Hauptfunktion für die Migration"""
try:
logger.info("Starte Migration der Benutzereinstellungen...")
add_user_settings_columns()
logger.info("Migration erfolgreich abgeschlossen")
return True
except Exception as e:
logger.error(f"Migration fehlgeschlagen: {e}")
return False
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)

View File

@@ -1,216 +0,0 @@
#!/usr/bin/env python3
"""
Frontend Optimization Script for MYP Platform
Optimizes JavaScript and CSS files for better performance
"""
import os
import gzip
import shutil
import hashlib
from pathlib import Path
def minify_file(content, file_type='js'):
"""Basic minification - removes comments and extra whitespace"""
if file_type == 'js':
# Remove single-line comments
lines = content.split('\n')
cleaned_lines = []
for line in lines:
# Skip lines that are only comments
stripped = line.strip()
if stripped.startswith('//'):
continue
# Remove inline comments
if '//' in line:
line = line.split('//')[0].rstrip()
cleaned_lines.append(line)
content = '\n'.join(cleaned_lines)
# Remove multi-line comments
import re
content = re.sub(r'/\*[\s\S]*?\*/', '', content)
# Remove extra whitespace
content = re.sub(r'\s+', ' ', content)
content = re.sub(r'\s*([{}();,:])\s*', r'\1', content)
elif file_type == 'css':
# Remove CSS comments
import re
content = re.sub(r'/\*[\s\S]*?\*/', '', content)
# Remove extra whitespace
content = re.sub(r'\s+', ' ', content)
content = re.sub(r'\s*([{}:;,])\s*', r'\1', content)
return content.strip()
def compress_file(file_path, force=False):
"""Compress file with gzip"""
gz_path = file_path + '.gz'
# Skip if already compressed and not forcing
if os.path.exists(gz_path) and not force:
return False
with open(file_path, 'rb') as f_in:
with gzip.open(gz_path, 'wb', compresslevel=9) as f_out:
shutil.copyfileobj(f_in, f_out)
return True
def optimize_js_files(js_dir):
"""Optimize JavaScript files"""
js_path = Path(js_dir)
optimized_count = 0
for js_file in js_path.glob('*.js'):
# Skip already minified files
if js_file.name.endswith('.min.js'):
continue
min_file = js_file.with_suffix('.min.js')
# Skip if minified version already exists
if min_file.exists():
continue
print(f"Optimizing {js_file.name}...")
# Read and minify
content = js_file.read_text(encoding='utf-8')
minified = minify_file(content, 'js')
# Write minified version
min_file.write_text(minified, encoding='utf-8')
# Compress both versions
compress_file(str(js_file))
compress_file(str(min_file))
optimized_count += 1
return optimized_count
def optimize_css_files(css_dir):
"""Optimize CSS files"""
css_path = Path(css_dir)
optimized_count = 0
for css_file in css_path.glob('*.css'):
# Skip already minified files
if css_file.name.endswith('.min.css'):
continue
min_file = css_file.with_suffix('.min.css')
# Skip if minified version already exists
if min_file.exists():
continue
print(f"Optimizing {css_file.name}...")
# Read and minify
content = css_file.read_text(encoding='utf-8')
minified = minify_file(content, 'css')
# Write minified version
min_file.write_text(minified, encoding='utf-8')
# Compress both versions
compress_file(str(css_file))
compress_file(str(min_file))
optimized_count += 1
return optimized_count
def create_bundle_js(js_dir):
"""Create bundled JavaScript file with core utilities"""
js_path = Path(js_dir)
# Core files to bundle in order
core_files = [
'core-utilities.js',
'dark-mode.js',
'user-dropdown.js'
]
bundle_content = []
for file_name in core_files:
file_path = js_path / file_name
if file_path.exists():
content = file_path.read_text(encoding='utf-8')
bundle_content.append(f"/* === {file_name} === */\n{content}\n")
if bundle_content:
bundle_path = js_path / 'core-bundle.min.js'
bundled = '\n'.join(bundle_content)
minified = minify_file(bundled, 'js')
bundle_path.write_text(minified, encoding='utf-8')
compress_file(str(bundle_path))
print(f"Created core bundle: {bundle_path.name}")
def main():
"""Main optimization function"""
base_dir = Path(__file__).parent.parent
static_dir = base_dir / 'static'
js_dir = static_dir / 'js'
css_dir = static_dir / 'css'
print("Starting frontend optimization...")
# Optimize JavaScript
js_count = optimize_js_files(js_dir)
print(f"Optimized {js_count} JavaScript files")
# Optimize CSS
css_count = optimize_css_files(css_dir)
print(f"Optimized {css_count} CSS files")
# Create JavaScript bundle
create_bundle_js(js_dir)
# Compress performance-optimized.css if not already done
perf_css = css_dir / 'performance-optimized.css'
if perf_css.exists():
compress_file(str(perf_css), force=True)
# Create minified version
min_perf_css = css_dir / 'performance-optimized.min.css'
if not min_perf_css.exists():
content = perf_css.read_text(encoding='utf-8')
minified = minify_file(content, 'css')
min_perf_css.write_text(minified, encoding='utf-8')
compress_file(str(min_perf_css))
# Compress core-utilities files
core_js = js_dir / 'core-utilities.js'
core_css = css_dir / 'core-utilities.css'
if core_js.exists():
compress_file(str(core_js), force=True)
# Create minified version
min_core_js = js_dir / 'core-utilities.min.js'
if not min_core_js.exists():
content = core_js.read_text(encoding='utf-8')
minified = minify_file(content, 'js')
min_core_js.write_text(minified, encoding='utf-8')
compress_file(str(min_core_js))
if core_css.exists():
compress_file(str(core_css), force=True)
# Create minified version
min_core_css = css_dir / 'core-utilities.min.css'
if not min_core_css.exists():
content = core_css.read_text(encoding='utf-8')
minified = minify_file(content, 'css')
min_core_css.write_text(minified, encoding='utf-8')
compress_file(str(min_core_css))
print("\nOptimization complete!")
print("Remember to update templates to use minified versions in production.")
if __name__ == "__main__":
main()

View File

@@ -1,233 +0,0 @@
#!/usr/bin/env python3
"""
Schnelle Datenbank-Reparatur für kritische Fehler
"""
import sqlite3
import os
import sys
from datetime import datetime
# Pfad zur App hinzufügen
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
try:
from utils.settings import DATABASE_PATH
except ImportError:
# Fallback falls Import fehlschlägt
DATABASE_PATH = "database/myp.db"
def quick_fix_database():
"""Behebt die kritischsten Datenbankprobleme sofort"""
print("🔧 Starte schnelle Datenbank-Reparatur...")
if not os.path.exists(DATABASE_PATH):
print(f"❌ Datenbankdatei nicht gefunden: {DATABASE_PATH}")
return False
try:
# Backup erstellen
backup_path = f"{DATABASE_PATH}.emergency_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
import shutil
shutil.copy2(DATABASE_PATH, backup_path)
print(f"✅ Emergency-Backup erstellt: {backup_path}")
# Verbindung zur Datenbank
conn = sqlite3.connect(DATABASE_PATH)
cursor = conn.cursor()
print("🔧 Repariere Datenbank-Schema...")
# 1. Fehlende Spalte duration_minutes zu guest_requests hinzufügen
try:
cursor.execute("ALTER TABLE guest_requests ADD COLUMN duration_minutes INTEGER")
print("✅ Spalte duration_minutes zu guest_requests hinzugefügt")
except sqlite3.OperationalError as e:
if "duplicate column name" in str(e).lower():
print(" Spalte duration_minutes bereits vorhanden")
else:
print(f"⚠️ Fehler bei duration_minutes: {e}")
# 2. Fehlende Spalten zu users hinzufügen
user_columns = [
("username", "VARCHAR(100) UNIQUE"),
("updated_at", "DATETIME DEFAULT CURRENT_TIMESTAMP"),
("department", "VARCHAR(100)"),
("position", "VARCHAR(100)"),
("phone", "VARCHAR(50)"),
("bio", "TEXT")
]
for column_name, column_def in user_columns:
try:
cursor.execute(f"ALTER TABLE users ADD COLUMN {column_name} {column_def}")
print(f"✅ Spalte {column_name} zu users hinzugefügt")
except sqlite3.OperationalError as e:
if "duplicate column name" in str(e).lower():
print(f" Spalte {column_name} bereits vorhanden")
else:
print(f"⚠️ Fehler bei {column_name}: {e}")
# 3. Fehlende Spalten zu printers hinzufügen
printer_columns = [
("plug_username", "VARCHAR(100) DEFAULT 'admin'"),
("plug_password", "VARCHAR(100) DEFAULT 'admin'"),
("last_checked", "DATETIME")
]
for column_name, column_def in printer_columns:
try:
cursor.execute(f"ALTER TABLE printers ADD COLUMN {column_name} {column_def}")
print(f"✅ Spalte {column_name} zu printers hinzugefügt")
except sqlite3.OperationalError as e:
if "duplicate column name" in str(e).lower():
print(f" Spalte {column_name} bereits vorhanden")
else:
print(f"⚠️ Fehler bei {column_name}: {e}")
# 4. Username für bestehende User setzen (falls NULL)
try:
cursor.execute("UPDATE users SET username = email WHERE username IS NULL")
updated_users = cursor.rowcount
if updated_users > 0:
print(f"✅ Username für {updated_users} Benutzer gesetzt")
except Exception as e:
print(f"⚠️ Fehler beim Setzen der Usernames: {e}")
# 5. Drucker-Daten nachtragen
print("🖨️ Trage Drucker nach...")
# Prüfen ob bereits Drucker vorhanden sind
cursor.execute("SELECT COUNT(*) FROM printers")
printer_count = cursor.fetchone()[0]
if printer_count == 0:
# Standard-Drucker hinzufügen
printers_to_add = [
{
'name': 'Printer 1',
'model': 'P115',
'location': 'Werk 040 - Berlin - TBA',
'ip_address': '192.168.0.100',
'mac_address': '98:254A:E1:2001',
'plug_ip': '192.168.0.100',
'plug_username': 'admin',
'plug_password': 'admin',
'status': 'offline',
'active': 1
},
{
'name': 'Printer 2',
'model': 'P115',
'location': 'Werk 040 - Berlin - TBA',
'ip_address': '192.168.0.101',
'mac_address': '98:254A:E1:2002',
'plug_ip': '192.168.0.101',
'plug_username': 'admin',
'plug_password': 'admin',
'status': 'offline',
'active': 1
},
{
'name': 'Printer 3',
'model': 'P115',
'location': 'Werk 040 - Berlin - TBA',
'ip_address': '192.168.0.102',
'mac_address': '98:254A:E1:2003',
'plug_ip': '192.168.0.102',
'plug_username': 'admin',
'plug_password': 'admin',
'status': 'offline',
'active': 1
},
{
'name': 'Printer 4',
'model': 'P115',
'location': 'Werk 040 - Berlin - TBA',
'ip_address': '192.168.0.103',
'mac_address': '98:254A:E1:2004',
'plug_ip': '192.168.0.103',
'plug_username': 'admin',
'plug_password': 'admin',
'status': 'offline',
'active': 1
},
{
'name': 'Printer 5',
'model': 'P115',
'location': 'Werk 040 - Berlin - TBA',
'ip_address': '192.168.0.104',
'mac_address': '98:254A:E1:2005',
'plug_ip': '192.168.0.104',
'plug_username': 'admin',
'plug_password': 'admin',
'status': 'offline',
'active': 1
},
{
'name': 'Printer 6',
'model': 'P115',
'location': 'Werk 040 - Berlin - TBA',
'ip_address': '192.168.0.106',
'mac_address': '98:254A:E1:2006',
'plug_ip': '192.168.0.106',
'plug_username': 'admin',
'plug_password': 'admin',
'status': 'offline',
'active': 1
}
]
for printer in printers_to_add:
try:
cursor.execute("""
INSERT INTO printers (name, model, location, ip_address, mac_address, plug_ip, plug_username, plug_password, status, active, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
printer['name'], printer['model'], printer['location'],
printer['ip_address'], printer['mac_address'], printer['plug_ip'],
printer['plug_username'], printer['plug_password'],
printer['status'], printer['active'], datetime.now()
))
print(f"✅ Drucker {printer['name']} hinzugefügt")
except Exception as e:
print(f"⚠️ Fehler beim Hinzufügen von {printer['name']}: {e}")
else:
print(f" {printer_count} Drucker bereits vorhanden")
# 6. Optimierungen
print("🔧 Führe Datenbankoptimierungen durch...")
try:
# Indizes erstellen
indices = [
"CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)",
"CREATE INDEX IF NOT EXISTS idx_users_username ON users(username)",
"CREATE INDEX IF NOT EXISTS idx_jobs_user_id ON jobs(user_id)",
"CREATE INDEX IF NOT EXISTS idx_jobs_printer_id ON jobs(printer_id)",
"CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status)",
"CREATE INDEX IF NOT EXISTS idx_guest_requests_status ON guest_requests(status)"
]
for index_sql in indices:
cursor.execute(index_sql)
# Statistiken aktualisieren
cursor.execute("ANALYZE")
print("✅ Datenbankoptimierungen abgeschlossen")
except Exception as e:
print(f"⚠️ Fehler bei Optimierungen: {e}")
# Änderungen speichern
conn.commit()
conn.close()
print("✅ Schnelle Datenbank-Reparatur erfolgreich abgeschlossen!")
return True
except Exception as e:
print(f"❌ Kritischer Fehler bei der Reparatur: {str(e)}")
return False
if __name__ == "__main__":
quick_fix_database()

View File

@@ -1,344 +0,0 @@
"""
Zentrale Konfigurationsdatei für das 3D-Druck-Management-System
Diese Datei enthält alle Konfigurationseinstellungen, die zuvor im config-Ordner waren.
"""
import os
import json
from datetime import timedelta
def get_env_variable(name: str, default: str = None) -> str:
"""
Holt eine Umgebungsvariable oder gibt den Standardwert zurück.
Args:
name: Name der Umgebungsvariable
default: Standardwert, falls die Variable nicht gesetzt ist
Returns:
str: Wert der Umgebungsvariable oder Standardwert
"""
return os.environ.get(name, default)
# ===== GRUNDLEGENDE KONFIGURATION =====
# Hardcodierte Konfiguration
SECRET_KEY = "7445630171969DFAC92C53CEC92E67A9CB2E00B3CB2F"
# Dynamische Pfade basierend auf dem aktuellen Arbeitsverzeichnis
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # Projekt-Wurzel
DATABASE_PATH = os.path.join(BASE_DIR, "instance", "printer_manager.db")
# ===== SMART PLUG KONFIGURATION =====
# TP-Link Tapo P110 Standardkonfiguration
TAPO_USERNAME = "till.tomczak@mercedes-benz.com"
TAPO_PASSWORD = "744563017196A"
# Automatische Steckdosen-Erkennung aktivieren
TAPO_AUTO_DISCOVERY = True
# Standard-Steckdosen-IPs (diese können später in der Datenbank überschrieben werden)
DEFAULT_TAPO_IPS = [
"192.168.0.103", # Erreichbare Steckdose laut Test
"192.168.0.104", # Erreichbare Steckdose laut Test
"192.168.0.100",
"192.168.0.101",
"192.168.0.102",
"192.168.0.105"
]
# Timeout-Konfiguration für Tapo-Verbindungen
TAPO_TIMEOUT = 10 # Sekunden
TAPO_RETRY_COUNT = 3 # Anzahl Wiederholungsversuche
# ===== DRUCKER-KONFIGURATION =====
PRINTERS = {
"Printer 1": {"ip": "192.168.0.100"},
"Printer 2": {"ip": "192.168.0.101"},
"Printer 3": {"ip": "192.168.0.102"},
"Printer 4": {"ip": "192.168.0.103"},
"Printer 5": {"ip": "192.168.0.104"},
"Printer 6": {"ip": "192.168.0.106"}
}
# ===== LOGGING-KONFIGURATION =====
LOG_DIR = os.path.join(BASE_DIR, "logs")
LOG_SUBDIRS = ["app", "scheduler", "auth", "jobs", "printers", "errors", "user", "kiosk",
"admin", "admin_api", "guest", "analytics", "uploads", "sessions"]
LOG_LEVEL = "INFO"
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
LOG_FILE_MAX_BYTES = 10 * 1024 * 1024 # 10MB
LOG_BACKUP_COUNT = 5
# ===== FLASK-KONFIGURATION =====
FLASK_HOST = "0.0.0.0"
FLASK_PORT = 443 # Kann auf 8443 geändert werden für nicht-privilegierte Ports
FLASK_FALLBACK_PORT = 8080
FLASK_DEBUG = False # In Produktion auf False setzen!
SESSION_LIFETIME = timedelta(hours=2) # Session-Dauer
# ===== UPLOAD-KONFIGURATION =====
UPLOAD_FOLDER = os.path.join(BASE_DIR, "uploads")
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'gcode', '3mf', 'stl', 'obj', 'amf'}
MAX_CONTENT_LENGTH = 16 * 1024 * 1024 # 16MB Maximum-Dateigröße
MAX_FILE_SIZE = 16 * 1024 * 1024 # 16MB Maximum-Dateigröße für Drag & Drop System
# ===== UMGEBUNGSKONFIGURATION =====
ENVIRONMENT = get_env_variable("MYP_ENVIRONMENT", "development")
# ===== SSL-KONFIGURATION =====
SSL_ENABLED = get_env_variable("MYP_SSL_ENABLED", "True").lower() in ("true", "1", "yes")
SSL_CERT_PATH = os.path.join(BASE_DIR, "certs", "myp.crt")
SSL_KEY_PATH = os.path.join(BASE_DIR, "certs", "myp.key")
SSL_HOSTNAME = get_env_variable("MYP_SSL_HOSTNAME", "localhost")
# ===== SCHEDULER-KONFIGURATION =====
SCHEDULER_INTERVAL = 60 # Sekunden
SCHEDULER_ENABLED = True
# ===== DATENBANK-KONFIGURATION =====
DB_ENGINE = f"sqlite:///{DATABASE_PATH}"
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_ENGINE_OPTIONS = {
'pool_pre_ping': True,
'pool_recycle': 300,
}
# ===== SICHERHEITSKONFIGURATION =====
WTF_CSRF_ENABLED = True
WTF_CSRF_TIME_LIMIT = 3600 # 1 Stunde
SESSION_COOKIE_SECURE = SSL_ENABLED # Nur bei HTTPS
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Lax'
# ===== E-MAIL-KONFIGURATION (Optional) =====
MAIL_SERVER = get_env_variable('MAIL_SERVER')
MAIL_PORT = int(get_env_variable('MAIL_PORT', '587'))
MAIL_USE_TLS = get_env_variable('MAIL_USE_TLS', 'true').lower() in ['true', 'on', '1']
MAIL_USERNAME = get_env_variable('MAIL_USERNAME')
MAIL_PASSWORD = get_env_variable('MAIL_PASSWORD')
# ===== HILFSFUNKTIONEN =====
def get_log_file(category: str) -> str:
"""
Gibt den Pfad zur Log-Datei für eine bestimmte Kategorie zurück.
Args:
category: Log-Kategorie (app, scheduler, auth, jobs, printers, errors, etc.)
Returns:
str: Pfad zur Log-Datei
"""
if category not in LOG_SUBDIRS:
category = "app"
return os.path.join(LOG_DIR, category, f"{category}.log")
def ensure_log_directories():
"""Erstellt alle erforderlichen Log-Verzeichnisse."""
os.makedirs(LOG_DIR, exist_ok=True)
for subdir in LOG_SUBDIRS:
os.makedirs(os.path.join(LOG_DIR, subdir), exist_ok=True)
def ensure_database_directory():
"""Erstellt das Datenbank-Verzeichnis."""
db_dir = os.path.dirname(DATABASE_PATH)
if db_dir:
os.makedirs(db_dir, exist_ok=True)
def ensure_ssl_directory():
"""Erstellt das SSL-Verzeichnis, falls es nicht existiert."""
ssl_dir = os.path.dirname(SSL_CERT_PATH)
if ssl_dir and not os.path.exists(ssl_dir):
os.makedirs(ssl_dir, exist_ok=True)
def ensure_upload_directory():
"""Erstellt das Upload-Verzeichnis, falls es nicht existiert."""
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# Erstelle Unterordner für verschiedene Upload-Typen
subdirs = ['jobs', 'guests', 'avatars', 'assets', 'logs', 'backups', 'temp']
for subdir in subdirs:
os.makedirs(os.path.join(UPLOAD_FOLDER, subdir), exist_ok=True)
def get_ssl_context():
"""
Gibt den SSL-Kontext für Flask zurück, wenn SSL aktiviert ist.
Returns:
tuple oder None: Tuple mit Zertifikat- und Schlüsselpfad, wenn SSL aktiviert ist, sonst None
"""
if not SSL_ENABLED:
return None
# Wenn Zertifikate nicht existieren, diese automatisch erstellen
if not os.path.exists(SSL_CERT_PATH) or not os.path.exists(SSL_KEY_PATH):
ensure_ssl_directory()
# Im Entwicklungsmodus versuchen wir, einfache Zertifikate zu erstellen
if FLASK_DEBUG:
print("SSL-Zertifikate nicht gefunden. Erstelle einfache selbstsignierte Zertifikate...")
try:
# Einfache Zertifikate mit Python erstellen
create_simple_ssl_cert()
# Prüfen, ob die Zertifikate erfolgreich erstellt wurden
if not os.path.exists(SSL_CERT_PATH) or not os.path.exists(SSL_KEY_PATH):
print("Konnte keine SSL-Zertifikate erstellen.")
return None
except Exception as e:
print(f"Fehler beim Erstellen der SSL-Zertifikate: {e}")
return None
else:
print("WARNUNG: SSL-Zertifikate nicht gefunden und Nicht-Debug-Modus. SSL wird deaktiviert.")
return None
return (SSL_CERT_PATH, SSL_KEY_PATH)
def create_simple_ssl_cert():
"""
Erstellt ein Mercedes-Benz SSL-Zertifikat mit dem SSL-Manager.
"""
try:
# Verwende den SSL-Manager
from utils.ssl_manager import ssl_manager
success = ssl_manager.generate_mercedes_certificate()
if success:
print(f"Mercedes-Benz SSL-Zertifikat erfolgreich erstellt: {SSL_CERT_PATH}")
return True
else:
print("Fehler beim Erstellen des Mercedes-Benz SSL-Zertifikats")
return None
except ImportError as e:
print(f"SSL-Manager nicht verfügbar: {e}")
return None
except Exception as e:
print(f"Fehler beim Erstellen der SSL-Zertifikate: {e}")
return None
# ===== KONFIGURATIONSKLASSEN FÜR VERSCHIEDENE UMGEBUNGEN =====
class Config:
"""Basis-Konfigurationsklasse mit gemeinsamen Einstellungen."""
SECRET_KEY = SECRET_KEY
PERMANENT_SESSION_LIFETIME = SESSION_LIFETIME
SESSION_COOKIE_SECURE = SESSION_COOKIE_SECURE
SESSION_COOKIE_HTTPONLY = SESSION_COOKIE_HTTPONLY
SESSION_COOKIE_SAMESITE = SESSION_COOKIE_SAMESITE
SQLALCHEMY_DATABASE_URI = DB_ENGINE
SQLALCHEMY_TRACK_MODIFICATIONS = SQLALCHEMY_TRACK_MODIFICATIONS
SQLALCHEMY_ENGINE_OPTIONS = SQLALCHEMY_ENGINE_OPTIONS
UPLOAD_FOLDER = UPLOAD_FOLDER
MAX_CONTENT_LENGTH = MAX_CONTENT_LENGTH
ALLOWED_EXTENSIONS = ALLOWED_EXTENSIONS
WTF_CSRF_ENABLED = WTF_CSRF_ENABLED
WTF_CSRF_TIME_LIMIT = WTF_CSRF_TIME_LIMIT
LOG_LEVEL = LOG_LEVEL
LOG_FILE_MAX_BYTES = LOG_FILE_MAX_BYTES
LOG_BACKUP_COUNT = LOG_BACKUP_COUNT
SCHEDULER_ENABLED = SCHEDULER_ENABLED
SCHEDULER_INTERVAL = SCHEDULER_INTERVAL
SSL_ENABLED = SSL_ENABLED
SSL_CERT_PATH = SSL_CERT_PATH
SSL_KEY_PATH = SSL_KEY_PATH
DEFAULT_PORT = FLASK_PORT
DEFAULT_HOST = FLASK_HOST
@staticmethod
def init_app(app):
"""Initialisiere Anwendung mit dieser Konfiguration."""
pass
class DevelopmentConfig(Config):
"""Entwicklungsumgebung-Konfiguration."""
DEBUG = True
TESTING = False
LOG_LEVEL = 'DEBUG'
SESSION_COOKIE_SECURE = False
WTF_CSRF_ENABLED = False # Für einfacheres API-Testing
@staticmethod
def init_app(app):
Config.init_app(app)
import logging
logging.basicConfig(level=logging.DEBUG)
class TestingConfig(Config):
"""Test-Umgebung-Konfiguration."""
TESTING = True
DEBUG = True
SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:'
WTF_CSRF_ENABLED = False
PERMANENT_SESSION_LIFETIME = timedelta(minutes=5)
@staticmethod
def init_app(app):
Config.init_app(app)
class ProductionConfig(Config):
"""Produktionsumgebung-Konfiguration."""
DEBUG = False
TESTING = False
SESSION_COOKIE_SECURE = True # Erfordert HTTPS
WTF_CSRF_ENABLED = True
LOG_LEVEL = 'WARNING'
SSL_ENABLED = True
@staticmethod
def init_app(app):
Config.init_app(app)
# Produktions-spezifische Initialisierung
import logging
from logging.handlers import RotatingFileHandler
# Log-Verzeichnis sicherstellen
ensure_log_directories()
# Datei-Logging für Produktion einrichten
file_handler = RotatingFileHandler(
get_log_file('app'),
maxBytes=Config.LOG_FILE_MAX_BYTES,
backupCount=Config.LOG_BACKUP_COUNT
)
file_handler.setFormatter(logging.Formatter(LOG_FORMAT))
file_handler.setLevel(logging.WARNING)
app.logger.addHandler(file_handler)
app.logger.setLevel(logging.WARNING)
# Konfigurations-Dictionary für einfachen Zugriff
config = {
'development': DevelopmentConfig,
'testing': TestingConfig,
'production': ProductionConfig,
'default': DevelopmentConfig
}
def get_config_by_name(config_name):
"""
Hole Konfigurationsklasse nach Name.
Args:
config_name (str): Name der Konfiguration ('development', 'testing', 'production')
Returns:
Config: Konfigurationsklasse
"""
return config.get(config_name, config['default'])

View File

@@ -1,117 +0,0 @@
#!/usr/bin/env python3
"""
Drucker-Datenbank Setup für MYP Platform
Trägt die hardkodierten Drucker in die Datenbank ein.
"""
import os
import sys
sys.path.append('.')
from utils.settings import PRINTERS
from database.db_manager import DatabaseManager
from models import Printer
from datetime import datetime
def setup_drucker():
"""Trägt die hardkodierten Drucker in die Datenbank ein."""
print("=== MYP Platform - Drucker-Setup ===")
print(f"Hardkodierte Drucker: {len(PRINTERS)}")
try:
db = DatabaseManager()
session = db.get_session()
# Alle existierenden Drucker löschen
existing_printers = session.query(Printer).all()
if existing_printers:
print(f"Lösche {len(existing_printers)} existierende Drucker...")
for printer in existing_printers:
session.delete(printer)
session.commit()
print("✅ Alle alten Drucker gelöscht")
else:
print("Keine existierenden Drucker gefunden")
# Neue Drucker hinzufügen
added_count = 0
for printer_name, config in PRINTERS.items():
# Neuen Drucker erstellen
new_printer = Printer(
name=printer_name,
model="P115", # Standard-Modell
location="Werk 040 - Berlin - TBA", # Aktualisierter Standort
ip_address=config["ip"],
mac_address=f"98:25:4A:E1:{printer_name[-1]}0:0{printer_name[-1]}", # Dummy MAC
plug_ip=config["ip"],
plug_username="admin",
plug_password="admin",
status="available", # Verfügbar, da in Konfiguration
active=True,
created_at=datetime.now()
)
session.add(new_printer)
print(f"{printer_name}: Hinzugefügt (IP: {config['ip']})")
added_count += 1
# Änderungen speichern
session.commit()
session.close()
print(f"\n{added_count} neue Drucker erfolgreich hinzugefügt")
return True
except Exception as e:
print(f"❌ Fehler beim Setup der Drucker: {e}")
import traceback
traceback.print_exc()
if 'session' in locals():
session.rollback()
session.close()
return False
def list_drucker():
"""Zeigt alle Drucker in der Datenbank an."""
print("\n=== Drucker in der Datenbank ===")
try:
db = DatabaseManager()
session = db.get_session()
printers = session.query(Printer).all()
if not printers:
print("Keine Drucker in der Datenbank gefunden.")
return True
print(f"{'ID':<5} {'Name':<15} {'Status':<12} {'IP-Adresse':<15} {'Aktiv':<8}")
print("-" * 60)
for printer in printers:
active_str = "" if printer.active else ""
print(f"{printer.id:<5} {printer.name:<15} {printer.status:<12} {printer.ip_address:<15} {active_str:<8}")
session.close()
print(f"\nGesamt: {len(printers)} Drucker")
return True
except Exception as e:
print(f"❌ Fehler beim Abrufen der Drucker: {e}")
if 'session' in locals():
session.close()
return False
if __name__ == "__main__":
print("MYP Platform - Drucker-Datenbank Setup")
print("=" * 40)
success = setup_drucker()
if success:
list_drucker()
print("\n✅ Drucker-Setup erfolgreich abgeschlossen!")
else:
print("\n❌ Drucker-Setup fehlgeschlagen!")
sys.exit(1)

View File

@@ -1,275 +0,0 @@
#!/usr/bin/env python3
"""
Test-Script für den DatabaseCleanupManager
Validiert die robuste Datenbank-Cleanup-Funktionalität
"""
import os
import sys
import time
import sqlite3
import threading
from datetime import datetime
# Pfad zur App hinzufügen
app_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, app_dir)
from utils.database_cleanup import DatabaseCleanupManager, safe_database_cleanup
from utils.settings import DATABASE_PATH
from utils.logging_config import get_logger
logger = get_logger("database_cleanup_test")
def test_basic_cleanup():
"""Test der grundlegenden Cleanup-Funktionalität"""
print("🧪 Test 1: Grundlegende Cleanup-Funktionalität")
try:
# Erstelle Test-DatabaseCleanupManager
cleanup_manager = DatabaseCleanupManager()
# Teste WAL-Checkpoint
checkpoint_success, checkpoint_error = cleanup_manager.safe_wal_checkpoint(retry_attempts=3)
if checkpoint_success:
print("✅ WAL-Checkpoint erfolgreich")
else:
print(f"❌ WAL-Checkpoint fehlgeschlagen: {checkpoint_error}")
# Teste umfassendes Cleanup
cleanup_result = cleanup_manager.comprehensive_cleanup(force_mode_switch=False) # Kein Mode-Switch für Test
if cleanup_result["success"]:
print(f"✅ Umfassendes Cleanup erfolgreich: {', '.join(cleanup_result['operations'])}")
else:
print(f"❌ Umfassendes Cleanup fehlgeschlagen: {', '.join(cleanup_result['errors'])}")
return cleanup_result["success"]
except Exception as e:
print(f"❌ Test 1 fehlgeschlagen: {e}")
return False
def test_concurrent_access():
"""Test des Cleanup-Verhaltens bei gleichzeitigen Datenbankzugriffen"""
print("\n🧪 Test 2: Cleanup bei gleichzeitigen Datenbankzugriffen")
try:
# Worker-Thread der Datenbankoperationen ausführt
def database_worker():
try:
for i in range(5):
conn = sqlite3.connect(DATABASE_PATH, timeout=2)
conn.execute("SELECT COUNT(*) FROM users")
time.sleep(0.5)
conn.close()
print(f" Worker: Datenbankoperation {i+1} abgeschlossen")
except Exception as e:
print(f" Worker-Fehler: {e}")
# Starte Worker-Thread
worker_thread = threading.Thread(target=database_worker, daemon=True)
worker_thread.start()
# Kurz warten damit Worker startet
time.sleep(1)
# Teste Cleanup während Worker läuft
cleanup_manager = DatabaseCleanupManager()
cleanup_result = cleanup_manager.comprehensive_cleanup(force_mode_switch=False)
if cleanup_result["success"]:
print("✅ Cleanup erfolgreich trotz gleichzeitiger Datenbankzugriffe")
else:
print(f"❌ Cleanup fehlgeschlagen: {', '.join(cleanup_result['errors'])}")
# Warte auf Worker
worker_thread.join(timeout=10)
return cleanup_result["success"]
except Exception as e:
print(f"❌ Test 2 fehlgeschlagen: {e}")
return False
def test_error_recovery():
"""Test der Fehlerbehandlung und Recovery-Mechanismen"""
print("\n🧪 Test 3: Fehlerbehandlung und Recovery")
try:
cleanup_manager = DatabaseCleanupManager()
# Teste mit verschiedenen Retry-Parametern
for retry_attempts in [1, 3, 5]:
print(f" Teste mit {retry_attempts} Retry-Versuchen...")
checkpoint_success, checkpoint_error = cleanup_manager.safe_wal_checkpoint(retry_attempts=retry_attempts)
if checkpoint_success:
print(f" ✅ WAL-Checkpoint mit {retry_attempts} Versuchen erfolgreich")
else:
print(f" ⚠️ WAL-Checkpoint mit {retry_attempts} Versuchen: {checkpoint_error}")
return True
except Exception as e:
print(f"❌ Test 3 fehlgeschlagen: {e}")
return False
def test_journal_mode_operations():
"""Test der Journal-Mode-Operationen"""
print("\n🧪 Test 4: Journal-Mode-Operationen")
try:
cleanup_manager = DatabaseCleanupManager()
# Teste aktuellen Journal-Mode
conn = sqlite3.connect(DATABASE_PATH, timeout=5)
current_mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
print(f" Aktueller Journal-Mode: {current_mode}")
conn.close()
# Teste Journal-Mode-Switch (nur wenn bereits WAL-Mode)
if current_mode.upper() == "WAL":
print(" Teste Journal-Mode-Switch...")
# Teste Switch zu WAL (sollte bereits WAL sein)
mode_success, mode_error = cleanup_manager.safe_journal_mode_switch("WAL", retry_attempts=2)
if mode_success:
print(" ✅ Journal-Mode-Switch zu WAL erfolgreich")
else:
print(f" ❌ Journal-Mode-Switch fehlgeschlagen: {mode_error}")
return mode_success
else:
print(f" Database bereits im {current_mode}-Mode, kein Switch-Test nötig")
return True
except Exception as e:
print(f"❌ Test 4 fehlgeschlagen: {e}")
return False
def test_convenience_function():
"""Test der Convenience-Funktion safe_database_cleanup"""
print("\n🧪 Test 5: Convenience-Funktion safe_database_cleanup")
try:
# Teste die einfache Convenience-Funktion
cleanup_result = safe_database_cleanup(force_mode_switch=False)
if cleanup_result["success"]:
print(f"✅ safe_database_cleanup erfolgreich: {', '.join(cleanup_result['operations'])}")
# Prüfe Cleanup-Details
if "timestamp" in cleanup_result:
print(f" Zeitstempel: {cleanup_result['timestamp']}")
if "wal_files_removed" in cleanup_result:
print(f" WAL-Dateien entfernt: {cleanup_result['wal_files_removed']}")
else:
print(f"❌ safe_database_cleanup fehlgeschlagen: {', '.join(cleanup_result['errors'])}")
return cleanup_result["success"]
except Exception as e:
print(f"❌ Test 5 fehlgeschlagen: {e}")
return False
def test_performance():
"""Test der Performance von Cleanup-Operationen"""
print("\n🧪 Test 6: Performance-Test")
try:
cleanup_manager = DatabaseCleanupManager()
# Messe Zeit für verschiedene Operationen
operations = [
("WAL-Checkpoint", lambda: cleanup_manager.safe_wal_checkpoint(retry_attempts=1)),
("Verbindungsschließung", lambda: cleanup_manager.force_close_all_connections(max_wait_seconds=5)),
("Umfassendes Cleanup", lambda: cleanup_manager.comprehensive_cleanup(force_mode_switch=False))
]
for operation_name, operation_func in operations:
start_time = time.time()
try:
result = operation_func()
duration = time.time() - start_time
success = result if isinstance(result, bool) else result[0] if isinstance(result, tuple) else result.get("success", False)
if success:
print(f"{operation_name}: {duration:.3f}s")
else:
print(f" ⚠️ {operation_name}: {duration:.3f}s (mit Problemen)")
except Exception as e:
duration = time.time() - start_time
print(f"{operation_name}: {duration:.3f}s (Fehler: {e})")
return True
except Exception as e:
print(f"❌ Test 6 fehlgeschlagen: {e}")
return False
def main():
"""Hauptfunktion für alle Tests"""
print("🚀 Starte DatabaseCleanupManager Tests")
print(f"Database-Pfad: {DATABASE_PATH}")
print(f"Zeitstempel: {datetime.now().isoformat()}")
print("=" * 60)
# Prüfe ob Datenbankdatei existiert
if not os.path.exists(DATABASE_PATH):
print(f"❌ Datenbankdatei nicht gefunden: {DATABASE_PATH}")
return False
# Führe alle Tests aus
tests = [
("Grundlegende Cleanup-Funktionalität", test_basic_cleanup),
("Cleanup bei gleichzeitigen Zugriffen", test_concurrent_access),
("Fehlerbehandlung und Recovery", test_error_recovery),
("Journal-Mode-Operationen", test_journal_mode_operations),
("Convenience-Funktion", test_convenience_function),
("Performance-Test", test_performance)
]
passed_tests = 0
failed_tests = 0
for test_name, test_func in tests:
try:
if test_func():
passed_tests += 1
print(f"{test_name}: BESTANDEN")
else:
failed_tests += 1
print(f"{test_name}: FEHLGESCHLAGEN")
except Exception as e:
failed_tests += 1
print(f"{test_name}: EXCEPTION - {e}")
print("\n" + "=" * 60)
print(f"📊 Test-Ergebnis: {passed_tests} bestanden, {failed_tests} fehlgeschlagen")
if failed_tests == 0:
print("🎉 Alle Tests bestanden! DatabaseCleanupManager funktioniert korrekt.")
return True
else:
print(f"⚠️ {failed_tests} Test(s) fehlgeschlagen. Überprüfung erforderlich.")
return False
if __name__ == "__main__":
try:
success = main()
sys.exit(0 if success else 1)
except KeyboardInterrupt:
print("\n🔄 Test durch Benutzer unterbrochen")
sys.exit(130)
except Exception as e:
print(f"💥 Kritischer Fehler beim Testen: {e}")
sys.exit(1)

View File

@@ -1,110 +0,0 @@
#!/usr/bin/env python3
"""
Skript zur Synchronisation der Drucker in der Datenbank mit den hardkodierten Druckern.
Setzt den Status basierend auf der Konfiguration.
"""
import sys
import os
sys.path.append('.')
from utils.settings import PRINTERS
from database.db_manager import DatabaseManager
from models import Printer
from datetime import datetime
def update_printer_status():
"""Aktualisiert den Status aller Drucker basierend auf der hardkodierten Konfiguration."""
print("=== Drucker-Status-Update ===")
print(f"Hardkodierte Drucker: {len(PRINTERS)}")
try:
db = DatabaseManager()
session = db.get_session()
# Alle Drucker aus der Datenbank abrufen
printers = session.query(Printer).all()
print(f"Drucker in Datenbank: {len(printers)}")
updated_count = 0
for printer in printers:
# Prüfen, ob Drucker in der hardkodierten Konfiguration existiert
if printer.name in PRINTERS:
# Drucker ist konfiguriert -> online/verfügbar
old_status = printer.status
printer.status = "available"
printer.active = True
# IP-Adresse aus Konfiguration aktualisieren
config_ip = PRINTERS[printer.name]["ip"]
if printer.ip_address != config_ip:
printer.ip_address = config_ip
print(f"{printer.name}: {old_status} -> available (IP: {config_ip})")
updated_count += 1
else:
# Drucker nicht konfiguriert -> offline
old_status = printer.status
printer.status = "offline"
printer.active = False
print(f"{printer.name}: {old_status} -> offline")
updated_count += 1
# Änderungen speichern
session.commit()
session.close()
print(f"\n{updated_count} Drucker aktualisiert")
print("Status-Update abgeschlossen!")
except Exception as e:
print(f"❌ Fehler beim Update: {e}")
if 'session' in locals():
session.rollback()
session.close()
def list_printer_status():
"""Zeigt den aktuellen Status aller Drucker an."""
print("\n=== Aktueller Drucker-Status ===")
try:
db = DatabaseManager()
session = db.get_session()
printers = session.query(Printer).all()
if not printers:
print("Keine Drucker in der Datenbank gefunden.")
return
print(f"{'Name':<15} {'Status':<12} {'Aktiv':<8} {'IP-Adresse':<15} {'Konfiguriert':<12}")
print("-" * 70)
for printer in printers:
configured = "" if printer.name in PRINTERS else ""
active_str = "" if printer.active else ""
print(f"{printer.name:<15} {printer.status:<12} {active_str:<8} {printer.ip_address or 'N/A':<15} {configured:<12}")
session.close()
except Exception as e:
print(f"❌ Fehler beim Abrufen: {e}")
if 'session' in locals():
session.close()
if __name__ == "__main__":
print("Drucker-Status-Management")
print("=" * 30)
# Aktuellen Status anzeigen
list_printer_status()
# Status aktualisieren
update_printer_status()
# Neuen Status anzeigen
list_printer_status()