📝 MIGRATION_LOG.md: Renamed backend/utils/test_korrekturen.py to MIGRATION_LOG.md

This commit is contained in:
2025-06-11 12:49:58 +02:00
parent 6be0d6ee88
commit 6961732fc8
28 changed files with 933 additions and 1359 deletions

View File

@@ -1,37 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from models import get_db_session, Printer
def aktiviere_alle_drucker():
"""Aktiviert alle Drucker in der Datenbank."""
try:
session = get_db_session()
drucker = session.query(Printer).all()
if not drucker:
print("Keine Drucker in der Datenbank gefunden.")
session.close()
return
print(f"Anzahl Drucker: {len(drucker)}")
print("Aktiviere alle Drucker...")
for d in drucker:
d.active = True
print(f"Drucker {d.id}: {d.name} - IP: {d.plug_ip} - Aktiv: {d.active}")
session.commit()
print("Alle Drucker wurden erfolgreich aktiviert!")
session.close()
except Exception as e:
print(f"Fehler: {str(e)}")
try:
session.rollback()
session.close()
except:
pass
if __name__ == "__main__":
aktiviere_alle_drucker()

View File

@@ -1,336 +0,0 @@
#!/usr/bin/env python3
"""
Robuste Datenbank-Cleanup-Utilities
Verhindert "database is locked" Fehler durch intelligente Retry-Logik und Verbindungsmanagement
"""
import os
import time
import sqlite3
import threading
from datetime import datetime
from typing import Optional, Tuple, List
from contextlib import contextmanager
from sqlalchemy import text, create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.pool import StaticPool
from utils.settings import DATABASE_PATH
from utils.logging_config import get_logger
logger = get_logger("database_cleanup")
class DatabaseCleanupManager:
"""
Verwaltet sichere Datenbank-Cleanup-Operationen mit Retry-Logik
Verhindert "database is locked" Fehler durch intelligente Session-Verwaltung
"""
def __init__(self):
self._cleanup_lock = threading.Lock()
self._cleanup_completed = False
self._active_engines = []
def register_engine(self, engine: Engine):
"""Registriert eine Engine für das Cleanup"""
with self._cleanup_lock:
if engine not in self._active_engines:
self._active_engines.append(engine)
def force_close_all_connections(self, max_wait_seconds: int = 10) -> bool:
"""
Schließt alle aktiven Datenbankverbindungen forciert
Args:
max_wait_seconds: Maximale Wartezeit für graceful shutdown
Returns:
bool: True wenn erfolgreich
"""
try:
logger.info("🔄 Schließe alle aktiven Datenbankverbindungen...")
# Alle registrierten Engines disposen
with self._cleanup_lock:
for engine in self._active_engines:
try:
logger.debug(f"Disposing Engine: {engine}")
engine.dispose()
except Exception as e:
logger.warning(f"Fehler beim Engine Dispose: {e}")
self._active_engines.clear()
# Kurz warten damit alle Verbindungen sich schließen können
time.sleep(1)
# Prüfe ob noch WAL-Locks bestehen
wal_path = DATABASE_PATH + "-wal"
shm_path = DATABASE_PATH + "-shm"
start_time = time.time()
while time.time() - start_time < max_wait_seconds:
try:
# Teste kurze Verbindung
test_conn = sqlite3.connect(DATABASE_PATH, timeout=2)
test_conn.execute("BEGIN IMMEDIATE") # Teste exklusiven Zugriff
test_conn.rollback()
test_conn.close()
logger.info("✅ Alle Datenbankverbindungen erfolgreich geschlossen")
return True
except sqlite3.OperationalError as e:
if "database is locked" in str(e):
logger.debug(f"Warte auf Verbindungsschließung... ({time.time() - start_time:.1f}s)")
time.sleep(0.5)
continue
else:
raise
logger.warning(f"⚠️ Timeout beim Warten auf Verbindungsschließung ({max_wait_seconds}s)")
return False
except Exception as e:
logger.error(f"❌ Fehler beim Schließen der Verbindungen: {e}")
return False
def safe_wal_checkpoint(self, retry_attempts: int = 5) -> Tuple[bool, Optional[str]]:
"""
Führt sicheren WAL-Checkpoint mit Retry-Logik durch
Args:
retry_attempts: Anzahl der Wiederholungsversuche
Returns:
Tuple[bool, Optional[str]]: (Erfolg, Fehlermeldung)
"""
for attempt in range(retry_attempts):
try:
# Kurze, direkte SQLite-Verbindung für Checkpoint
conn = sqlite3.connect(DATABASE_PATH, timeout=10)
# WAL-Checkpoint mit verschiedenen Strategien
strategies = ["TRUNCATE", "RESTART", "FULL", "PASSIVE"]
for strategy in strategies:
try:
cursor = conn.execute(f"PRAGMA wal_checkpoint({strategy})")
result = cursor.fetchone()
if result and result[0] == 0: # Erfolg (0 = success)
pages_transferred = result[1] if len(result) > 1 else 0
pages_reset = result[2] if len(result) > 2 else 0
if pages_transferred > 0:
logger.info(f"✅ WAL-Checkpoint ({strategy}): {pages_transferred} Seiten übertragen, {pages_reset} Seiten zurückgesetzt")
else:
logger.debug(f"WAL-Checkpoint ({strategy}): Keine Seiten zu übertragen")
conn.close()
return True, None
else:
logger.warning(f"WAL-Checkpoint ({strategy}) unvollständig: {result}")
except Exception as strategy_error:
logger.warning(f"WAL-Checkpoint ({strategy}) fehlgeschlagen: {strategy_error}")
continue
conn.close()
# Wenn alle Strategien fehlschlagen, versuche VACUUM als Fallback
if attempt == 0: # Nur beim ersten Versuch
logger.info("Versuche VACUUM als Fallback...")
conn = sqlite3.connect(DATABASE_PATH, timeout=10)
conn.execute("VACUUM")
conn.close()
logger.info("✅ VACUUM erfolgreich")
return True, None
except sqlite3.OperationalError as e:
if "database is locked" in str(e):
wait_time = (2 ** attempt) * 0.5 # Exponential backoff
logger.warning(f"Database locked - Versuch {attempt + 1}/{retry_attempts}, warte {wait_time}s...")
time.sleep(wait_time)
continue
else:
return False, f"SQLite-Fehler: {e}"
except Exception as e:
return False, f"Unerwarteter Fehler: {e}"
return False, f"Database nach {retry_attempts} Versuchen immer noch gesperrt"
def safe_journal_mode_switch(self, target_mode: str = "DELETE", retry_attempts: int = 3) -> Tuple[bool, Optional[str]]:
"""
Führt sicheren Journal-Mode-Switch mit Retry-Logik durch
Args:
target_mode: Ziel-Journal-Mode (DELETE, WAL, etc.)
retry_attempts: Anzahl der Wiederholungsversuche
Returns:
Tuple[bool, Optional[str]]: (Erfolg, Fehlermeldung)
"""
for attempt in range(retry_attempts):
try:
conn = sqlite3.connect(DATABASE_PATH, timeout=15)
# Prüfe aktuellen Journal-Mode
current_mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
logger.debug(f"Aktueller Journal-Mode: {current_mode}")
if current_mode.upper() == target_mode.upper():
logger.info(f"Journal-Mode bereits auf {target_mode}")
conn.close()
return True, None
# Mode-Switch durchführen
result = conn.execute(f"PRAGMA journal_mode={target_mode}").fetchone()
new_mode = result[0] if result else None
conn.close()
if new_mode and new_mode.upper() == target_mode.upper():
logger.info(f"✅ Journal-Mode erfolgreich auf {new_mode} umgeschaltet")
return True, None
else:
logger.warning(f"Journal-Mode-Switch unvollständig: {new_mode} != {target_mode}")
except sqlite3.OperationalError as e:
if "database is locked" in str(e):
wait_time = (2 ** attempt) * 1.0 # Exponential backoff
logger.warning(f"Database locked bei Mode-Switch - Versuch {attempt + 1}/{retry_attempts}, warte {wait_time}s...")
time.sleep(wait_time)
continue
else:
return False, f"SQLite-Fehler: {e}"
except Exception as e:
return False, f"Unerwarteter Fehler: {e}"
return False, f"Journal-Mode-Switch nach {retry_attempts} Versuchen fehlgeschlagen"
def comprehensive_cleanup(self, force_mode_switch: bool = True) -> dict:
"""
Führt umfassendes, sicheres Datenbank-Cleanup durch
Args:
force_mode_switch: Ob Journal-Mode forciert umgeschaltet werden soll
Returns:
dict: Cleanup-Ergebnis mit Details
"""
with self._cleanup_lock:
if self._cleanup_completed:
logger.info("Datenbank-Cleanup bereits durchgeführt")
return {"success": True, "message": "Bereits durchgeführt", "operations": []}
logger.info("🧹 Starte umfassendes Datenbank-Cleanup...")
operations = []
errors = []
try:
# Schritt 1: Alle Verbindungen schließen
logger.info("📝 Schritt 1: Schließe alle Datenbankverbindungen...")
connection_success = self.force_close_all_connections(max_wait_seconds=15)
if connection_success:
operations.append("Alle Verbindungen geschlossen")
else:
errors.append("Timeout beim Verbindungsschließen")
# Schritt 2: WAL-Checkpoint
logger.info("📝 Schritt 2: Führe WAL-Checkpoint durch...")
checkpoint_success, checkpoint_error = self.safe_wal_checkpoint(retry_attempts=5)
if checkpoint_success:
operations.append("WAL-Checkpoint erfolgreich")
else:
errors.append(f"WAL-Checkpoint fehlgeschlagen: {checkpoint_error}")
# Schritt 3: Journal-Mode-Switch (nur wenn gewünscht und Checkpoint erfolgreich)
if force_mode_switch and checkpoint_success:
logger.info("📝 Schritt 3: Schalte Journal-Mode um...")
mode_success, mode_error = self.safe_journal_mode_switch("DELETE", retry_attempts=3)
if mode_success:
operations.append("Journal-Mode auf DELETE umgeschaltet")
else:
errors.append(f"Journal-Mode-Switch fehlgeschlagen: {mode_error}")
logger.warning(f"Journal-Mode-Switch fehlgeschlagen, aber WAL-Checkpoint war erfolgreich")
# Schritt 4: Finale Optimierungen (nur bei Erfolg)
if checkpoint_success:
logger.info("📝 Schritt 4: Finale Optimierungen...")
try:
conn = sqlite3.connect(DATABASE_PATH, timeout=5)
conn.execute("PRAGMA optimize")
conn.close()
operations.append("Datenbank optimiert")
except Exception as opt_error:
logger.warning(f"Optimierung fehlgeschlagen: {opt_error}")
# Schritt 5: Prüfe Ergebnis
wal_path = DATABASE_PATH + "-wal"
shm_path = DATABASE_PATH + "-shm"
wal_exists = os.path.exists(wal_path)
shm_exists = os.path.exists(shm_path)
if not wal_exists and not shm_exists:
operations.append("WAL/SHM-Dateien erfolgreich entfernt")
logger.info("✅ WAL- und SHM-Dateien erfolgreich entfernt")
elif force_mode_switch:
errors.append(f"WAL/SHM-Dateien bestehen noch (WAL: {wal_exists}, SHM: {shm_exists})")
else:
logger.info("WAL/SHM-Dateien bleiben bestehen (kein Mode-Switch angefordert)")
self._cleanup_completed = True
# Erfolgsstatus bestimmen
success = len(operations) > 0 and (not force_mode_switch or not wal_exists)
result = {
"success": success,
"operations": operations,
"errors": errors,
"timestamp": datetime.now().isoformat(),
"wal_files_removed": not wal_exists and not shm_exists
}
if success:
logger.info(f"✅ Datenbank-Cleanup erfolgreich: {', '.join(operations)}")
else:
logger.error(f"❌ Datenbank-Cleanup mit Fehlern: {', '.join(errors)}")
return result
except Exception as e:
error_msg = f"Kritischer Fehler beim Datenbank-Cleanup: {e}"
logger.error(f"{error_msg}")
return {
"success": False,
"operations": operations,
"errors": errors + [error_msg],
"timestamp": datetime.now().isoformat()
}
# Globale Instanz
cleanup_manager = DatabaseCleanupManager()
def get_cleanup_manager() -> DatabaseCleanupManager:
"""Gibt die globale Cleanup-Manager-Instanz zurück"""
return cleanup_manager
def safe_database_cleanup(force_mode_switch: bool = True) -> dict:
"""
Convenience-Funktion für sicheres Datenbank-Cleanup
Args:
force_mode_switch: Ob Journal-Mode forciert umgeschaltet werden soll
Returns:
dict: Cleanup-Ergebnis
"""
return cleanup_manager.comprehensive_cleanup(force_mode_switch=force_mode_switch)

View File

@@ -1,133 +0,0 @@
import os
import logging
from typing import List, Optional, Any
from datetime import datetime
from sqlalchemy import create_engine, func
from sqlalchemy.orm import sessionmaker, Session, joinedload
from models import User, Printer, Job, Stats, Base
from utils.settings import DATABASE_PATH, ensure_database_directory
logger = logging.getLogger(__name__)
class DatabaseManager:
"""Database manager class to handle database operations."""
def __init__(self):
"""Initialize the database manager."""
ensure_database_directory()
self.engine = create_engine(f"sqlite:///{DATABASE_PATH}")
self.Session = sessionmaker(bind=self.engine)
def get_session(self) -> Session:
"""Get a new database session.
Returns:
Session: A new SQLAlchemy session.
"""
return self.Session()
def test_connection(self) -> bool:
"""Test the database connection.
Returns:
bool: True if the connection is successful, False otherwise.
"""
try:
session = self.get_session()
session.execute("SELECT 1")
session.close()
return True
except Exception as e:
logger.error(f"Database connection test failed: {str(e)}")
return False
def get_all_jobs(self) -> List[Job]:
"""Get all jobs with eager loading of relationships.
Returns:
List[Job]: A list of all jobs.
"""
session = self.get_session()
try:
jobs = session.query(Job).options(
joinedload(Job.user),
joinedload(Job.printer)
).all()
return jobs
finally:
session.close()
def get_jobs_by_status(self, status: str) -> List[Job]:
"""Get jobs by status with eager loading of relationships.
Args:
status: The job status to filter by.
Returns:
List[Job]: A list of jobs with the specified status.
"""
session = self.get_session()
try:
jobs = session.query(Job).options(
joinedload(Job.user),
joinedload(Job.printer)
).filter(Job.status == status).all()
return jobs
finally:
session.close()
def get_job_by_id(self, job_id: int) -> Optional[Job]:
"""Get a job by ID with eager loading of relationships.
Args:
job_id: The job ID to find.
Returns:
Optional[Job]: The job if found, None otherwise.
"""
session = self.get_session()
try:
job = session.query(Job).options(
joinedload(Job.user),
joinedload(Job.printer)
).filter(Job.id == job_id).first()
return job
finally:
session.close()
def get_available_printers(self) -> List[Printer]:
"""Get all available printers.
Returns:
List[Printer]: A list of available printers.
"""
session = self.get_session()
try:
printers = session.query(Printer).filter(
Printer.active == True,
Printer.status != "busy"
).all()
return printers
finally:
session.close()
def get_jobs_since(self, since_date: datetime) -> List[Job]:
"""Get jobs created since a specific date.
Args:
since_date: The date to filter jobs from.
Returns:
List[Job]: A list of jobs created since the specified date.
"""
session = self.get_session()
try:
jobs = session.query(Job).options(
joinedload(Job.user),
joinedload(Job.printer)
).filter(Job.created_at >= since_date).all()
return jobs
finally:
session.close()

View File

@@ -0,0 +1,405 @@
#!/usr/bin/env python3.11
"""
Development Utilities - Konsolidierte Entwicklungs- und Test-Hilfsfunktionen
Zusammenfassung von Datenbank-Tests, Session-Fixes und anderen Entwicklungstools
"""
import re
import os
import sys
from utils.logging_config import get_logger
# Logger initialisieren
logger = get_logger("development_utilities")
# ===== DATENBANK-TESTS =====
def test_database_connectivity():
"""
Testet die grundlegende Datenbank-Konnektivität.
Returns:
dict: Test-Ergebnisse
"""
try:
from models import get_cached_session, User, Printer, Job
logger.info("=== DATENBANK-KONNEKTIVITÄTS-TEST ===")
results = {
'success': True,
'tests': {},
'errors': []
}
with get_cached_session() as session:
# Test User-Query
try:
users = session.query(User).limit(5).all()
results['tests']['users'] = {
'success': True,
'count': len(users),
'sample': users[0].username if users else None
}
logger.info(f"✓ User-Abfrage erfolgreich - {len(users)} Benutzer gefunden")
if users:
user = users[0]
logger.info(f"✓ Test-User: {user.username} ({user.email})")
logger.info(f"✓ updated_at-Feld: {user.updated_at}")
except Exception as e:
results['tests']['users'] = {'success': False, 'error': str(e)}
results['errors'].append(f"User-Test: {str(e)}")
logger.error(f"❌ User-Test fehlgeschlagen: {str(e)}")
# Test Printer-Query
try:
printers = session.query(Printer).limit(5).all()
results['tests']['printers'] = {
'success': True,
'count': len(printers),
'sample': printers[0].name if printers else None
}
logger.info(f"✓ Printer-Abfrage erfolgreich - {len(printers)} Drucker gefunden")
except Exception as e:
results['tests']['printers'] = {'success': False, 'error': str(e)}
results['errors'].append(f"Printer-Test: {str(e)}")
logger.error(f"❌ Printer-Test fehlgeschlagen: {str(e)}")
# Test Job-Query
try:
jobs = session.query(Job).limit(5).all()
results['tests']['jobs'] = {
'success': True,
'count': len(jobs),
'sample': jobs[0].title if jobs else None
}
logger.info(f"✓ Job-Abfrage erfolgreich - {len(jobs)} Jobs gefunden")
except Exception as e:
results['tests']['jobs'] = {'success': False, 'error': str(e)}
results['errors'].append(f"Job-Test: {str(e)}")
logger.error(f"❌ Job-Test fehlgeschlagen: {str(e)}")
if results['errors']:
results['success'] = False
logger.error("❌ DATENBANK-TESTS TEILWEISE FEHLGESCHLAGEN")
else:
logger.info("🎉 ALLE DATENBANK-TESTS ERFOLGREICH!")
logger.info("Die Anwendung sollte jetzt ohne Fehler starten.")
return results
except Exception as e:
logger.error(f"❌ KRITISCHER DATENBANK-TEST-FEHLER: {str(e)}")
return {
'success': False,
'tests': {},
'errors': [f"Kritischer Fehler: {str(e)}"]
}
def test_database_schema():
"""
Testet die Datenbank-Schema-Integrität.
Returns:
dict: Schema-Test-Ergebnisse
"""
try:
from models import get_cached_session, User, Printer, Job, GuestRequest
from sqlalchemy import inspect
logger.info("=== DATENBANK-SCHEMA-TEST ===")
results = {
'success': True,
'tables': {},
'errors': []
}
with get_cached_session() as session:
inspector = inspect(session.bind)
# Erwartete Tabellen
expected_tables = ['users', 'printers', 'jobs', 'guest_requests', 'system_logs']
for table_name in expected_tables:
try:
if inspector.has_table(table_name):
columns = inspector.get_columns(table_name)
results['tables'][table_name] = {
'exists': True,
'columns': len(columns),
'column_names': [col['name'] for col in columns]
}
logger.info(f"✓ Tabelle '{table_name}': {len(columns)} Spalten")
else:
results['tables'][table_name] = {'exists': False}
results['errors'].append(f"Tabelle '{table_name}' nicht gefunden")
logger.error(f"❌ Tabelle '{table_name}' nicht gefunden")
except Exception as e:
results['tables'][table_name] = {'exists': False, 'error': str(e)}
results['errors'].append(f"Tabelle '{table_name}': {str(e)}")
logger.error(f"❌ Fehler bei Tabelle '{table_name}': {str(e)}")
if results['errors']:
results['success'] = False
logger.error("❌ SCHEMA-TESTS TEILWEISE FEHLGESCHLAGEN")
else:
logger.info("🎉 ALLE SCHEMA-TESTS ERFOLGREICH!")
return results
except Exception as e:
logger.error(f"❌ KRITISCHER SCHEMA-TEST-FEHLER: {str(e)}")
return {
'success': False,
'tables': {},
'errors': [f"Kritischer Fehler: {str(e)}"]
}
# ===== SESSION-FIXES =====
def fix_session_usage_in_file(file_path):
"""
Behebt Session-Usage in einer Datei durch Konvertierung zu Context Manager Pattern.
Args:
file_path (str): Pfad zur zu reparierenden Datei
Returns:
dict: Reparatur-Ergebnisse
"""
try:
if not os.path.exists(file_path):
return {
'success': False,
'message': f'Datei nicht gefunden: {file_path}',
'changes_made': False
}
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# Pattern für direkte Session-Aufrufe
patterns = [
# session = get_cached_session() -> with get_cached_session() as session:
(r'(\s+)session = get_cached_session\(\)', r'\1with get_cached_session() as session:'),
# session.close() entfernen (wird automatisch durch Context Manager gemacht)
(r'\s+session\.close\(\)\s*\n', '\n'),
]
original_content = content
changes_count = 0
for pattern, replacement in patterns:
new_content = re.sub(pattern, replacement, content, flags=re.MULTILINE)
if new_content != content:
changes_count += 1
content = new_content
# Nur schreiben wenn sich etwas geändert hat
if content != original_content:
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
logger.info(f"{file_path} wurde aktualisiert ({changes_count} Änderungen)")
return {
'success': True,
'message': f'Datei erfolgreich aktualisiert',
'changes_made': True,
'changes_count': changes_count
}
else:
logger.info(f" {file_path} benötigt keine Änderungen")
return {
'success': True,
'message': 'Keine Änderungen erforderlich',
'changes_made': False,
'changes_count': 0
}
except Exception as e:
logger.error(f"❌ Fehler beim Reparieren von {file_path}: {str(e)}")
return {
'success': False,
'message': f'Fehler: {str(e)}',
'changes_made': False
}
def fix_session_usage_bulk(directory_path=None):
"""
Repariert Session-Usage in mehreren Dateien.
Args:
directory_path (str): Verzeichnis zum Durchsuchen (Standard: blueprints/)
Returns:
dict: Bulk-Reparatur-Ergebnisse
"""
if directory_path is None:
backend_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
directory_path = os.path.join(backend_dir, 'blueprints')
results = {
'success': True,
'files_processed': 0,
'files_changed': 0,
'total_changes': 0,
'errors': []
}
try:
for root, dirs, files in os.walk(directory_path):
for file in files:
if file.endswith('.py'):
file_path = os.path.join(root, file)
result = fix_session_usage_in_file(file_path)
results['files_processed'] += 1
if result['success']:
if result['changes_made']:
results['files_changed'] += 1
results['total_changes'] += result.get('changes_count', 0)
else:
results['errors'].append(f"{file_path}: {result['message']}")
results['success'] = False
logger.info(f"Bulk-Reparatur abgeschlossen: {results['files_processed']} Dateien verarbeitet, {results['files_changed']} geändert")
except Exception as e:
logger.error(f"❌ Fehler bei Bulk-Reparatur: {str(e)}")
results['success'] = False
results['errors'].append(f"Bulk-Fehler: {str(e)}")
return results
# ===== CODE-QUALITÄT =====
def analyze_code_quality(file_path):
"""
Analysiert die Code-Qualität einer Python-Datei.
Args:
file_path (str): Pfad zur zu analysierenden Datei
Returns:
dict: Code-Qualitäts-Analyse
"""
try:
if not os.path.exists(file_path):
return {
'success': False,
'message': f'Datei nicht gefunden: {file_path}'
}
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
lines = content.split('\n')
analysis = {
'success': True,
'file_path': file_path,
'metrics': {
'total_lines': len(lines),
'code_lines': len([line for line in lines if line.strip() and not line.strip().startswith('#')]),
'comment_lines': len([line for line in lines if line.strip().startswith('#')]),
'empty_lines': len([line for line in lines if not line.strip()]),
'functions': len(re.findall(r'^\s*def\s+\w+', content, re.MULTILINE)),
'classes': len(re.findall(r'^\s*class\s+\w+', content, re.MULTILINE)),
'imports': len(re.findall(r'^\s*(import|from)\s+', content, re.MULTILINE))
},
'issues': []
}
# Potentielle Probleme identifizieren
if analysis['metrics']['total_lines'] > 1000:
analysis['issues'].append('Datei sehr groß (>1000 Zeilen) - Aufteilung empfohlen')
if analysis['metrics']['functions'] > 50:
analysis['issues'].append('Viele Funktionen (>50) - Modularisierung empfohlen')
# TODO-Kommentare finden
todos = re.findall(r'#.*TODO.*', content, re.IGNORECASE)
if todos:
analysis['metrics']['todos'] = len(todos)
analysis['issues'].append(f'{len(todos)} TODO-Kommentare gefunden')
return analysis
except Exception as e:
logger.error(f"❌ Fehler bei Code-Qualitäts-Analyse von {file_path}: {str(e)}")
return {
'success': False,
'message': f'Fehler: {str(e)}'
}
# ===== CLI INTERFACE =====
if __name__ == "__main__":
if len(sys.argv) > 1:
command = sys.argv[1]
if command == "test-db":
result = test_database_connectivity()
if result['success']:
print("🎉 Datenbank-Tests erfolgreich!")
else:
print("❌ Datenbank-Tests fehlgeschlagen:")
for error in result['errors']:
print(f" - {error}")
elif command == "test-schema":
result = test_database_schema()
if result['success']:
print("🎉 Schema-Tests erfolgreich!")
else:
print("❌ Schema-Tests fehlgeschlagen:")
for error in result['errors']:
print(f" - {error}")
elif command == "fix-sessions":
if len(sys.argv) > 2:
file_path = sys.argv[2]
result = fix_session_usage_in_file(file_path)
print(f"{'' if result['success'] else ''} {result['message']}")
else:
result = fix_session_usage_bulk()
print(f"Bulk-Reparatur: {result['files_changed']}/{result['files_processed']} Dateien geändert")
elif command == "analyze":
if len(sys.argv) > 2:
file_path = sys.argv[2]
result = analyze_code_quality(file_path)
if result['success']:
metrics = result['metrics']
print(f"=== Code-Analyse: {os.path.basename(file_path)} ===")
print(f"Zeilen gesamt: {metrics['total_lines']}")
print(f"Code-Zeilen: {metrics['code_lines']}")
print(f"Funktionen: {metrics['functions']}")
print(f"Klassen: {metrics['classes']}")
if result['issues']:
print("Probleme:")
for issue in result['issues']:
print(f" ⚠️ {issue}")
else:
print(f"{result['message']}")
else:
print("Verwendung: python3.11 development_utilities.py analyze <datei>")
else:
print("Verfügbare Kommandos:")
print(" test-db - Testet Datenbank-Konnektivität")
print(" test-schema - Testet Datenbank-Schema")
print(" fix-sessions [datei] - Repariert Session-Usage")
print(" analyze <datei> - Analysiert Code-Qualität")
else:
print("Verwendung: python3.11 development_utilities.py <command>")
print("Verfügbare Kommandos: test-db, test-schema, fix-sessions, analyze")

View File

@@ -1,22 +0,0 @@
#!/usr/bin/env python3
"""Entferne problematischen CSRF-Error-Handler aus app.py"""
import re
# Lese die Backup-Datei
with open('app_backup.py', 'r', encoding='utf-8') as f:
content = f.read()
# Entferne den CSRF-Error-Handler-Block
# Suche nach @csrf.error_handler bis zum ersten leeren Zeilen-Block
pattern = r'@csrf\.error_handler.*?(?=\n\n|\n# [A-Z])'
content = re.sub(pattern, '', content, flags=re.DOTALL)
# Entferne auch mögliche doppelte Leerzeilen
content = re.sub(r'\n\n\n+', '\n\n', content)
# Schreibe die bereinigte Version
with open('app.py', 'w', encoding='utf-8') as f:
f.write(content)
print("CSRF-Error-Handler erfolgreich entfernt!")

View File

@@ -1,29 +0,0 @@
#!/usr/bin/env python3.11
"""
Skript zur Behebung von Einrückungsproblemen in user_management.py
"""
def fix_indentation():
file_path = 'blueprints/user_management.py'
with open(file_path, 'r') as f:
content = f.read()
lines = content.split('\n')
fixed_lines = []
for line in lines:
# Behebe die falsche Einrückung nach 'with get_cached_session() as session:'
if line.startswith(' ') and not line.strip().startswith('#'):
# 7 Leerzeichen entfernen (von 15 auf 8)
fixed_lines.append(' ' + line[15:])
else:
fixed_lines.append(line)
with open(file_path, 'w') as f:
f.write('\n'.join(fixed_lines))
print('✅ Einrückung behoben')
if __name__ == "__main__":
fix_indentation()

View File

@@ -1,62 +0,0 @@
#!/usr/bin/env python3.11
"""
Skript zur automatischen Behebung von get_cached_session() Aufrufen
Konvertiert direkte Session-Aufrufe zu Context Manager Pattern.
Autor: MYP Team
Datum: 2025-06-09
"""
import re
import os
def fix_session_usage(file_path):
"""Behebt Session-Usage in einer Datei"""
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# Pattern für direkte Session-Aufrufe
patterns = [
# session = get_cached_session() -> with get_cached_session() as session:
(r'(\s+)session = get_cached_session\(\)', r'\1with get_cached_session() as session:'),
# session.close() entfernen (wird automatisch durch Context Manager gemacht)
(r'\s+session\.close\(\)\s*\n', '\n'),
# Einrückung nach with-Statement anpassen
(r'(with get_cached_session\(\) as session:\s*\n)(\s+)([^\s])',
lambda m: m.group(1) + m.group(2) + ' ' + m.group(3))
]
original_content = content
for pattern, replacement in patterns:
if callable(replacement):
content = re.sub(pattern, replacement, content, flags=re.MULTILINE)
else:
content = re.sub(pattern, replacement, content, flags=re.MULTILINE)
# Nur schreiben wenn sich etwas geändert hat
if content != original_content:
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
print(f"{file_path} wurde aktualisiert")
return True
else:
print(f" {file_path} benötigt keine Änderungen")
return False
def main():
"""Hauptfunktion"""
backend_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
user_mgmt_file = os.path.join(backend_dir, 'blueprints', 'user_management.py')
if os.path.exists(user_mgmt_file):
print(f"Bearbeite {user_mgmt_file}...")
fix_session_usage(user_mgmt_file)
else:
print(f"❌ Datei nicht gefunden: {user_mgmt_file}")
if __name__ == "__main__":
main()

View File

@@ -1,25 +0,0 @@
#!/usr/bin/env python3.11
from models import init_database, create_initial_admin
if __name__ == "__main__":
print("Initialisiere Datenbank...")
init_database()
print("Erstelle initialen Admin-Benutzer...")
success = create_initial_admin(
email="admin@mercedes-benz.com",
password="744563017196A",
name="System Administrator",
username="admin"
)
if success:
print("Admin-Benutzer erfolgreich erstellt.")
print("Login-Daten:")
print(" Benutzername: admin")
print(" Passwort: 744563017196A")
else:
print("Admin-Benutzer konnte nicht erstellt werden (existiert bereits?).")
print("Datenbank-Initialisierung abgeschlossen.")

View File

@@ -1,30 +0,0 @@
"""
Performance Monitor für Production Environment
Minimale Implementierung für Mercedes-Benz TBA Marienfelde
"""
from utils.logging_config import get_logger
logger = get_logger("performance_monitor")
def init_performance_monitoring(app):
"""
Initialisiert Performance-Monitoring für die Flask-App
Args:
app: Flask-App-Instanz
"""
try:
# Basic Performance-Monitoring Setup
logger.info("[PERF] Performance-Monitoring wird initialisiert...")
# Optional: Hier könnten weitere Performance-Monitoring-Tools integriert werden
# Für Air-Gapped Environment halten wir es minimal
app.config['PERFORMANCE_MONITORING_ENABLED'] = True
logger.info("[PERF] ✅ Performance-Monitoring erfolgreich initialisiert")
except Exception as e:
logger.error(f"[PERF] ❌ Fehler bei Performance-Monitoring-Initialisierung: {str(e)}")
app.config['PERFORMANCE_MONITORING_ENABLED'] = False

View File

@@ -0,0 +1,360 @@
#!/usr/bin/env python3.11
"""
Printer Utilities - Konsolidierte Drucker-Management-Hilfsfunktionen
Zusammenfassung von Drucker-Aktivierung und Standort-Updates
"""
from models import get_db_session, Printer
from utils.logging_config import get_logger
from datetime import datetime
# Logger initialisieren
logger = get_logger("printer_utilities")
# ===== DRUCKER AKTIVIERUNG =====
def aktiviere_alle_drucker():
"""
Aktiviert alle Drucker in der Datenbank.
Returns:
dict: Ergebnis der Aktivierung mit Statistiken
"""
try:
session = get_db_session()
drucker = session.query(Printer).all()
if not drucker:
logger.warning("Keine Drucker in der Datenbank gefunden.")
session.close()
return {
'success': False,
'message': 'Keine Drucker gefunden',
'activated_count': 0
}
logger.info(f"Anzahl Drucker: {len(drucker)}")
logger.info("Aktiviere alle Drucker...")
activated_count = 0
for d in drucker:
if not d.active:
d.active = True
activated_count += 1
logger.info(f"Drucker {d.id}: {d.name} - IP: {d.plug_ip} - Aktiviert")
else:
logger.debug(f"Drucker {d.id}: {d.name} - Bereits aktiv")
session.commit()
session.close()
logger.info(f"{activated_count} Drucker wurden erfolgreich aktiviert!")
return {
'success': True,
'message': f'{activated_count} Drucker aktiviert',
'activated_count': activated_count,
'total_count': len(drucker)
}
except Exception as e:
logger.error(f"Fehler bei Drucker-Aktivierung: {str(e)}")
try:
session.rollback()
session.close()
except:
pass
return {
'success': False,
'message': f'Fehler: {str(e)}',
'activated_count': 0
}
def deaktiviere_alle_drucker():
"""
Deaktiviert alle Drucker in der Datenbank.
Returns:
dict: Ergebnis der Deaktivierung mit Statistiken
"""
try:
session = get_db_session()
drucker = session.query(Printer).all()
if not drucker:
logger.warning("Keine Drucker in der Datenbank gefunden.")
session.close()
return {
'success': False,
'message': 'Keine Drucker gefunden',
'deactivated_count': 0
}
logger.info(f"Anzahl Drucker: {len(drucker)}")
logger.info("Deaktiviere alle Drucker...")
deactivated_count = 0
for d in drucker:
if d.active:
d.active = False
deactivated_count += 1
logger.info(f"Drucker {d.id}: {d.name} - IP: {d.plug_ip} - Deaktiviert")
else:
logger.debug(f"Drucker {d.id}: {d.name} - Bereits inaktiv")
session.commit()
session.close()
logger.info(f"{deactivated_count} Drucker wurden erfolgreich deaktiviert!")
return {
'success': True,
'message': f'{deactivated_count} Drucker deaktiviert',
'deactivated_count': deactivated_count,
'total_count': len(drucker)
}
except Exception as e:
logger.error(f"Fehler bei Drucker-Deaktivierung: {str(e)}")
try:
session.rollback()
session.close()
except:
pass
return {
'success': False,
'message': f'Fehler: {str(e)}',
'deactivated_count': 0
}
# ===== STANDORT-MANAGEMENT =====
def update_printer_locations(new_location="Werk 040 - Berlin - TBA"):
"""
Aktualisiert alle Drucker-Standorte zu einem neuen Standort.
Args:
new_location (str): Neuer Standort für alle Drucker
Returns:
dict: Ergebnis der Standort-Aktualisierung mit Statistiken
"""
try:
session = get_db_session()
# Alle Drucker abrufen
all_printers = session.query(Printer).all()
logger.info(f"Gefundene Drucker: {len(all_printers)}")
if not all_printers:
logger.warning("Keine Drucker in der Datenbank gefunden.")
session.close()
return {
'success': False,
'message': 'Keine Drucker gefunden',
'updated_count': 0
}
updated_count = 0
location_changes = []
# Alle Drucker durchgehen und Standort aktualisieren
for printer in all_printers:
old_location = printer.location
if old_location != new_location:
printer.location = new_location
location_changes.append({
'printer_id': printer.id,
'printer_name': printer.name,
'old_location': old_location,
'new_location': new_location
})
logger.info(f"{printer.name}: '{old_location}''{new_location}'")
updated_count += 1
else:
logger.debug(f"Drucker {printer.name}: Standort bereits korrekt")
# Änderungen speichern
session.commit()
session.close()
logger.info(f"{updated_count} Drucker-Standorte erfolgreich aktualisiert")
logger.info(f"Neuer Standort: {new_location}")
return {
'success': True,
'message': f'{updated_count} Standorte aktualisiert',
'updated_count': updated_count,
'total_count': len(all_printers),
'new_location': new_location,
'changes': location_changes
}
except Exception as e:
logger.error(f"❌ Fehler bei der Standort-Aktualisierung: {e}")
try:
session.rollback()
session.close()
except:
pass
return {
'success': False,
'message': f'Fehler: {str(e)}',
'updated_count': 0
}
def get_printer_locations():
"""
Gibt eine Übersicht aller Drucker-Standorte zurück.
Returns:
dict: Standort-Statistiken
"""
try:
session = get_db_session()
all_printers = session.query(Printer).all()
session.close()
if not all_printers:
return {
'success': False,
'message': 'Keine Drucker gefunden',
'locations': {}
}
# Standorte gruppieren
locations = {}
for printer in all_printers:
location = printer.location or 'Unbekannt'
if location not in locations:
locations[location] = []
locations[location].append({
'id': printer.id,
'name': printer.name,
'active': printer.active,
'plug_ip': printer.plug_ip
})
return {
'success': True,
'total_printers': len(all_printers),
'locations': locations,
'location_count': len(locations)
}
except Exception as e:
logger.error(f"Fehler beim Abrufen der Standorte: {str(e)}")
return {
'success': False,
'message': f'Fehler: {str(e)}',
'locations': {}
}
# ===== STATUS UND STATISTIKEN =====
def get_printer_status_summary():
"""
Gibt eine Zusammenfassung des Drucker-Status zurück.
Returns:
dict: Status-Zusammenfassung
"""
try:
session = get_db_session()
all_printers = session.query(Printer).all()
session.close()
if not all_printers:
return {
'success': False,
'message': 'Keine Drucker gefunden',
'summary': {}
}
active_count = sum(1 for p in all_printers if p.active)
inactive_count = len(all_printers) - active_count
# Standort-Verteilung
location_distribution = {}
for printer in all_printers:
location = printer.location or 'Unbekannt'
location_distribution[location] = location_distribution.get(location, 0) + 1
return {
'success': True,
'summary': {
'total_printers': len(all_printers),
'active_printers': active_count,
'inactive_printers': inactive_count,
'locations': location_distribution,
'last_updated': datetime.now().isoformat()
}
}
except Exception as e:
logger.error(f"Fehler beim Abrufen der Status-Zusammenfassung: {str(e)}")
return {
'success': False,
'message': f'Fehler: {str(e)}',
'summary': {}
}
# ===== CLI INTERFACE =====
if __name__ == "__main__":
import sys
if len(sys.argv) > 1:
command = sys.argv[1]
if command == "activate-all":
result = aktiviere_alle_drucker()
print(f"{result['message']}")
elif command == "deactivate-all":
result = deaktiviere_alle_drucker()
print(f"{result['message']}")
elif command == "update-locations":
new_location = sys.argv[2] if len(sys.argv) > 2 else "Werk 040 - Berlin - TBA"
result = update_printer_locations(new_location)
print(f"{result['message']}")
elif command == "locations":
result = get_printer_locations()
if result['success']:
print("=== Drucker-Standorte ===")
for location, printers in result['locations'].items():
print(f"\n📍 {location} ({len(printers)} Drucker):")
for printer in printers:
status = "🟢" if printer['active'] else "🔴"
print(f" {status} {printer['name']} (ID: {printer['id']}, IP: {printer['plug_ip']})")
else:
print(f"{result['message']}")
elif command == "status":
result = get_printer_status_summary()
if result['success']:
summary = result['summary']
print("=== Drucker-Status ===")
print(f"Gesamt: {summary['total_printers']}")
print(f"Aktiv: {summary['active_printers']} 🟢")
print(f"Inaktiv: {summary['inactive_printers']} 🔴")
print(f"Standorte: {len(summary['locations'])}")
print(f"Letzte Aktualisierung: {summary['last_updated']}")
else:
print(f"{result['message']}")
else:
print("Verfügbare Kommandos:")
print(" activate-all - Aktiviert alle Drucker")
print(" deactivate-all - Deaktiviert alle Drucker")
print(" update-locations [STANDORT] - Aktualisiert alle Standorte")
print(" locations - Zeigt Standort-Übersicht")
print(" status - Zeigt Status-Zusammenfassung")
else:
print("Verwendung: python3.11 printer_utilities.py <command>")
print("Verfügbare Kommandos: activate-all, deactivate-all, update-locations, locations, status")

View File

@@ -1,50 +0,0 @@
"""
Scheduler utility functions for the admin panel.
"""
from utils.job_scheduler import scheduler
def scheduler_is_running():
"""
Überprüft, ob der Job-Scheduler läuft.
Returns:
bool: True wenn der Scheduler aktiv ist, sonst False
"""
return scheduler.is_running()
def start_scheduler():
"""
Startet den Job-Scheduler.
Returns:
bool: True wenn erfolgreich gestartet, False wenn bereits läuft
"""
return scheduler.start()
def stop_scheduler():
"""
Stoppt den Job-Scheduler.
Returns:
bool: True wenn erfolgreich gestoppt, False wenn nicht läuft
"""
return scheduler.stop()
def get_scheduler_uptime():
"""
Gibt die Laufzeit des Schedulers zurück.
Returns:
str: Formatierte Laufzeit oder None, wenn der Scheduler nicht läuft
"""
return scheduler.get_uptime()
def get_scheduler_tasks():
"""
Gibt alle registrierten Tasks im Scheduler zurück.
Returns:
dict: Dictionary mit Task-IDs als Schlüssel und Task-Konfigurationen als Werte
"""
return scheduler.get_tasks()

View File

@@ -0,0 +1,158 @@
#!/usr/bin/env python3.11
"""
System Utilities - Konsolidierte System-Hilfsfunktionen
Zusammenfassung von performance_monitor, scheduler und init_db Funktionalitäten
"""
from utils.logging_config import get_logger
from utils.job_scheduler import scheduler
from models import init_database, create_initial_admin
# Logger initialisieren
logger = get_logger("system_utilities")
# ===== PERFORMANCE MONITORING =====
def init_performance_monitoring(app):
"""
Initialisiert Performance-Monitoring für die Flask-App
Args:
app: Flask-App-Instanz
"""
try:
# Basic Performance-Monitoring Setup
logger.info("[PERF] Performance-Monitoring wird initialisiert...")
# Optional: Hier könnten weitere Performance-Monitoring-Tools integriert werden
# Für Air-Gapped Environment halten wir es minimal
app.config['PERFORMANCE_MONITORING_ENABLED'] = True
logger.info("[PERF] ✅ Performance-Monitoring erfolgreich initialisiert")
except Exception as e:
logger.error(f"[PERF] ❌ Fehler bei Performance-Monitoring-Initialisierung: {str(e)}")
app.config['PERFORMANCE_MONITORING_ENABLED'] = False
# ===== SCHEDULER UTILITIES =====
def scheduler_is_running():
"""
Überprüft, ob der Job-Scheduler läuft.
Returns:
bool: True wenn der Scheduler aktiv ist, sonst False
"""
return scheduler.is_running()
def start_scheduler():
"""
Startet den Job-Scheduler.
Returns:
bool: True wenn erfolgreich gestartet, False wenn bereits läuft
"""
return scheduler.start()
def stop_scheduler():
"""
Stoppt den Job-Scheduler.
Returns:
bool: True wenn erfolgreich gestoppt, False wenn nicht läuft
"""
return scheduler.stop()
def get_scheduler_uptime():
"""
Gibt die Laufzeit des Schedulers zurück.
Returns:
str: Formatierte Laufzeit oder None, wenn der Scheduler nicht läuft
"""
return scheduler.get_uptime()
def get_scheduler_tasks():
"""
Gibt alle registrierten Tasks im Scheduler zurück.
Returns:
dict: Dictionary mit Task-IDs als Schlüssel und Task-Konfigurationen als Werte
"""
return scheduler.get_tasks()
# ===== DATABASE INITIALIZATION =====
def initialize_database_with_admin():
"""
Initialisiert die Datenbank und erstellt einen initialen Admin-Benutzer.
Returns:
bool: True wenn erfolgreich, False bei Fehlern
"""
try:
logger.info("Initialisiere Datenbank...")
init_database()
logger.info("Erstelle initialen Admin-Benutzer...")
success = create_initial_admin(
email="admin@mercedes-benz.com",
password="744563017196A",
name="System Administrator",
username="admin"
)
if success:
logger.info("Admin-Benutzer erfolgreich erstellt.")
logger.info("Login-Daten: Benutzername: admin, Passwort: 744563017196A")
else:
logger.warning("Admin-Benutzer konnte nicht erstellt werden (existiert bereits?).")
logger.info("Datenbank-Initialisierung abgeschlossen.")
return True
except Exception as e:
logger.error(f"Fehler bei Datenbank-Initialisierung: {str(e)}")
return False
# ===== SYSTEM STATUS =====
def get_system_status():
"""
Gibt den aktuellen System-Status zurück.
Returns:
dict: System-Status-Informationen
"""
return {
'scheduler_running': scheduler_is_running(),
'scheduler_uptime': get_scheduler_uptime(),
'scheduler_tasks': len(get_scheduler_tasks()) if get_scheduler_tasks() else 0,
'performance_monitoring': True # Immer aktiviert in dieser Version
}
# ===== CLI INTERFACE =====
if __name__ == "__main__":
import sys
if len(sys.argv) > 1:
command = sys.argv[1]
if command == "init-db":
initialize_database_with_admin()
elif command == "status":
status = get_system_status()
print("=== System Status ===")
print(f"Scheduler läuft: {'' if status['scheduler_running'] else ''}")
print(f"Scheduler Uptime: {status['scheduler_uptime'] or 'N/A'}")
print(f"Scheduler Tasks: {status['scheduler_tasks']}")
print(f"Performance Monitoring: {'' if status['performance_monitoring'] else ''}")
else:
print("Verfügbare Kommandos:")
print(" init-db - Initialisiert Datenbank mit Admin-Benutzer")
print(" status - Zeigt System-Status an")
else:
print("Verwendung: python3.11 system_utilities.py <command>")
print("Verfügbare Kommandos: init-db, status")

View File

@@ -1,47 +0,0 @@
#!/usr/bin/env python3
"""
Test-Script für die Datenbank-Reparatur
"""
import sys
import os
# Pfad zur App hinzufügen
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
def test_database_fix():
"""Testet ob die Datenbank-Reparatur erfolgreich war."""
try:
from models import get_cached_session, User, Printer, Job
print("=== DATENBANK-TEST NACH REPARATUR ===")
with get_cached_session() as session:
# Test User-Query (das war das ursprüngliche Problem)
users = session.query(User).limit(5).all()
print(f"✓ User-Abfrage erfolgreich - {len(users)} Benutzer gefunden")
# Details des ersten Users anzeigen (falls vorhanden)
if users:
user = users[0]
print(f"✓ Test-User: {user.username} ({user.email})")
print(f"✓ updated_at-Feld: {user.updated_at}")
# Test Printer-Query
printers = session.query(Printer).limit(5).all()
print(f"✓ Printer-Abfrage erfolgreich - {len(printers)} Drucker gefunden")
# Test Job-Query
jobs = session.query(Job).limit(5).all()
print(f"✓ Job-Abfrage erfolgreich - {len(jobs)} Jobs gefunden")
print("\n🎉 ALLE DATENBANK-TESTS ERFOLGREICH!")
print("Die Anwendung sollte jetzt ohne Fehler starten.")
return True
except Exception as e:
print(f"\n❌ DATENBANK-TEST FEHLGESCHLAGEN: {str(e)}")
return False
if __name__ == "__main__":
test_database_fix()

View File

@@ -1 +0,0 @@

View File

@@ -1,60 +0,0 @@
#!/usr/bin/env python3.11
"""
Skript zur Aktualisierung der Drucker-Standorte in der Datenbank.
Ändert alle Standorte von "Labor" zu "Werk 040 - Berlin - TBA".
"""
import sys
import os
sys.path.append('.')
from database.db_manager import DatabaseManager
from models import Printer
from datetime import datetime
def update_printer_locations():
"""Aktualisiert alle Drucker-Standorte zu 'Werk 040 - Berlin - TBA'."""
print("=== Drucker-Standorte aktualisieren ===")
try:
db = DatabaseManager()
session = db.get_session()
# Alle Drucker abrufen
all_printers = session.query(Printer).all()
print(f"Gefundene Drucker: {len(all_printers)}")
if not all_printers:
print("Keine Drucker in der Datenbank gefunden.")
session.close()
return
# Neue Standort-Bezeichnung
new_location = "Werk 040 - Berlin - TBA"
updated_count = 0
# Alle Drucker durchgehen und Standort aktualisieren
for printer in all_printers:
old_location = printer.location
printer.location = new_location
print(f"{printer.name}: '{old_location}''{new_location}'")
updated_count += 1
# Änderungen speichern
session.commit()
session.close()
print(f"\n{updated_count} Drucker-Standorte erfolgreich aktualisiert")
print(f"Neuer Standort: {new_location}")
print("Standort-Aktualisierung abgeschlossen!")
except Exception as e:
print(f"❌ Fehler bei der Standort-Aktualisierung: {e}")
if 'session' in locals():
session.rollback()
session.close()
if __name__ == "__main__":
update_printer_locations()