📚 Improved backend structure & documentation, added new features, and refactored scripts. 🚀🔧📝💻🖥️

This commit is contained in:
2025-06-01 00:47:00 +02:00
parent 7f38f8a7e5
commit 070f4a6165
20 changed files with 3336 additions and 403 deletions

View File

@@ -0,0 +1,336 @@
#!/usr/bin/env python3
"""
Robuste Datenbank-Cleanup-Utilities
Verhindert "database is locked" Fehler durch intelligente Retry-Logik und Verbindungsmanagement
"""
import os
import time
import sqlite3
import threading
from datetime import datetime
from typing import Optional, Tuple, List
from contextlib import contextmanager
from sqlalchemy import text, create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.pool import StaticPool
from config.settings import DATABASE_PATH
from utils.logging_config import get_logger
logger = get_logger("database_cleanup")
class DatabaseCleanupManager:
"""
Verwaltet sichere Datenbank-Cleanup-Operationen mit Retry-Logik
Verhindert "database is locked" Fehler durch intelligente Session-Verwaltung
"""
def __init__(self):
self._cleanup_lock = threading.Lock()
self._cleanup_completed = False
self._active_engines = []
def register_engine(self, engine: Engine):
"""Registriert eine Engine für das Cleanup"""
with self._cleanup_lock:
if engine not in self._active_engines:
self._active_engines.append(engine)
def force_close_all_connections(self, max_wait_seconds: int = 10) -> bool:
"""
Schließt alle aktiven Datenbankverbindungen forciert
Args:
max_wait_seconds: Maximale Wartezeit für graceful shutdown
Returns:
bool: True wenn erfolgreich
"""
try:
logger.info("🔄 Schließe alle aktiven Datenbankverbindungen...")
# Alle registrierten Engines disposen
with self._cleanup_lock:
for engine in self._active_engines:
try:
logger.debug(f"Disposing Engine: {engine}")
engine.dispose()
except Exception as e:
logger.warning(f"Fehler beim Engine Dispose: {e}")
self._active_engines.clear()
# Kurz warten damit alle Verbindungen sich schließen können
time.sleep(1)
# Prüfe ob noch WAL-Locks bestehen
wal_path = DATABASE_PATH + "-wal"
shm_path = DATABASE_PATH + "-shm"
start_time = time.time()
while time.time() - start_time < max_wait_seconds:
try:
# Teste kurze Verbindung
test_conn = sqlite3.connect(DATABASE_PATH, timeout=2)
test_conn.execute("BEGIN IMMEDIATE") # Teste exklusiven Zugriff
test_conn.rollback()
test_conn.close()
logger.info("✅ Alle Datenbankverbindungen erfolgreich geschlossen")
return True
except sqlite3.OperationalError as e:
if "database is locked" in str(e):
logger.debug(f"Warte auf Verbindungsschließung... ({time.time() - start_time:.1f}s)")
time.sleep(0.5)
continue
else:
raise
logger.warning(f"⚠️ Timeout beim Warten auf Verbindungsschließung ({max_wait_seconds}s)")
return False
except Exception as e:
logger.error(f"❌ Fehler beim Schließen der Verbindungen: {e}")
return False
def safe_wal_checkpoint(self, retry_attempts: int = 5) -> Tuple[bool, Optional[str]]:
"""
Führt sicheren WAL-Checkpoint mit Retry-Logik durch
Args:
retry_attempts: Anzahl der Wiederholungsversuche
Returns:
Tuple[bool, Optional[str]]: (Erfolg, Fehlermeldung)
"""
for attempt in range(retry_attempts):
try:
# Kurze, direkte SQLite-Verbindung für Checkpoint
conn = sqlite3.connect(DATABASE_PATH, timeout=10)
# WAL-Checkpoint mit verschiedenen Strategien
strategies = ["TRUNCATE", "RESTART", "FULL", "PASSIVE"]
for strategy in strategies:
try:
cursor = conn.execute(f"PRAGMA wal_checkpoint({strategy})")
result = cursor.fetchone()
if result and result[0] == 0: # Erfolg (0 = success)
pages_transferred = result[1] if len(result) > 1 else 0
pages_reset = result[2] if len(result) > 2 else 0
if pages_transferred > 0:
logger.info(f"✅ WAL-Checkpoint ({strategy}): {pages_transferred} Seiten übertragen, {pages_reset} Seiten zurückgesetzt")
else:
logger.debug(f"WAL-Checkpoint ({strategy}): Keine Seiten zu übertragen")
conn.close()
return True, None
else:
logger.warning(f"WAL-Checkpoint ({strategy}) unvollständig: {result}")
except Exception as strategy_error:
logger.warning(f"WAL-Checkpoint ({strategy}) fehlgeschlagen: {strategy_error}")
continue
conn.close()
# Wenn alle Strategien fehlschlagen, versuche VACUUM als Fallback
if attempt == 0: # Nur beim ersten Versuch
logger.info("Versuche VACUUM als Fallback...")
conn = sqlite3.connect(DATABASE_PATH, timeout=10)
conn.execute("VACUUM")
conn.close()
logger.info("✅ VACUUM erfolgreich")
return True, None
except sqlite3.OperationalError as e:
if "database is locked" in str(e):
wait_time = (2 ** attempt) * 0.5 # Exponential backoff
logger.warning(f"Database locked - Versuch {attempt + 1}/{retry_attempts}, warte {wait_time}s...")
time.sleep(wait_time)
continue
else:
return False, f"SQLite-Fehler: {e}"
except Exception as e:
return False, f"Unerwarteter Fehler: {e}"
return False, f"Database nach {retry_attempts} Versuchen immer noch gesperrt"
def safe_journal_mode_switch(self, target_mode: str = "DELETE", retry_attempts: int = 3) -> Tuple[bool, Optional[str]]:
"""
Führt sicheren Journal-Mode-Switch mit Retry-Logik durch
Args:
target_mode: Ziel-Journal-Mode (DELETE, WAL, etc.)
retry_attempts: Anzahl der Wiederholungsversuche
Returns:
Tuple[bool, Optional[str]]: (Erfolg, Fehlermeldung)
"""
for attempt in range(retry_attempts):
try:
conn = sqlite3.connect(DATABASE_PATH, timeout=15)
# Prüfe aktuellen Journal-Mode
current_mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
logger.debug(f"Aktueller Journal-Mode: {current_mode}")
if current_mode.upper() == target_mode.upper():
logger.info(f"Journal-Mode bereits auf {target_mode}")
conn.close()
return True, None
# Mode-Switch durchführen
result = conn.execute(f"PRAGMA journal_mode={target_mode}").fetchone()
new_mode = result[0] if result else None
conn.close()
if new_mode and new_mode.upper() == target_mode.upper():
logger.info(f"✅ Journal-Mode erfolgreich auf {new_mode} umgeschaltet")
return True, None
else:
logger.warning(f"Journal-Mode-Switch unvollständig: {new_mode} != {target_mode}")
except sqlite3.OperationalError as e:
if "database is locked" in str(e):
wait_time = (2 ** attempt) * 1.0 # Exponential backoff
logger.warning(f"Database locked bei Mode-Switch - Versuch {attempt + 1}/{retry_attempts}, warte {wait_time}s...")
time.sleep(wait_time)
continue
else:
return False, f"SQLite-Fehler: {e}"
except Exception as e:
return False, f"Unerwarteter Fehler: {e}"
return False, f"Journal-Mode-Switch nach {retry_attempts} Versuchen fehlgeschlagen"
def comprehensive_cleanup(self, force_mode_switch: bool = True) -> dict:
"""
Führt umfassendes, sicheres Datenbank-Cleanup durch
Args:
force_mode_switch: Ob Journal-Mode forciert umgeschaltet werden soll
Returns:
dict: Cleanup-Ergebnis mit Details
"""
with self._cleanup_lock:
if self._cleanup_completed:
logger.info("Datenbank-Cleanup bereits durchgeführt")
return {"success": True, "message": "Bereits durchgeführt", "operations": []}
logger.info("🧹 Starte umfassendes Datenbank-Cleanup...")
operations = []
errors = []
try:
# Schritt 1: Alle Verbindungen schließen
logger.info("📝 Schritt 1: Schließe alle Datenbankverbindungen...")
connection_success = self.force_close_all_connections(max_wait_seconds=15)
if connection_success:
operations.append("Alle Verbindungen geschlossen")
else:
errors.append("Timeout beim Verbindungsschließen")
# Schritt 2: WAL-Checkpoint
logger.info("📝 Schritt 2: Führe WAL-Checkpoint durch...")
checkpoint_success, checkpoint_error = self.safe_wal_checkpoint(retry_attempts=5)
if checkpoint_success:
operations.append("WAL-Checkpoint erfolgreich")
else:
errors.append(f"WAL-Checkpoint fehlgeschlagen: {checkpoint_error}")
# Schritt 3: Journal-Mode-Switch (nur wenn gewünscht und Checkpoint erfolgreich)
if force_mode_switch and checkpoint_success:
logger.info("📝 Schritt 3: Schalte Journal-Mode um...")
mode_success, mode_error = self.safe_journal_mode_switch("DELETE", retry_attempts=3)
if mode_success:
operations.append("Journal-Mode auf DELETE umgeschaltet")
else:
errors.append(f"Journal-Mode-Switch fehlgeschlagen: {mode_error}")
logger.warning(f"Journal-Mode-Switch fehlgeschlagen, aber WAL-Checkpoint war erfolgreich")
# Schritt 4: Finale Optimierungen (nur bei Erfolg)
if checkpoint_success:
logger.info("📝 Schritt 4: Finale Optimierungen...")
try:
conn = sqlite3.connect(DATABASE_PATH, timeout=5)
conn.execute("PRAGMA optimize")
conn.close()
operations.append("Datenbank optimiert")
except Exception as opt_error:
logger.warning(f"Optimierung fehlgeschlagen: {opt_error}")
# Schritt 5: Prüfe Ergebnis
wal_path = DATABASE_PATH + "-wal"
shm_path = DATABASE_PATH + "-shm"
wal_exists = os.path.exists(wal_path)
shm_exists = os.path.exists(shm_path)
if not wal_exists and not shm_exists:
operations.append("WAL/SHM-Dateien erfolgreich entfernt")
logger.info("✅ WAL- und SHM-Dateien erfolgreich entfernt")
elif force_mode_switch:
errors.append(f"WAL/SHM-Dateien bestehen noch (WAL: {wal_exists}, SHM: {shm_exists})")
else:
logger.info("WAL/SHM-Dateien bleiben bestehen (kein Mode-Switch angefordert)")
self._cleanup_completed = True
# Erfolgsstatus bestimmen
success = len(operations) > 0 and (not force_mode_switch or not wal_exists)
result = {
"success": success,
"operations": operations,
"errors": errors,
"timestamp": datetime.now().isoformat(),
"wal_files_removed": not wal_exists and not shm_exists
}
if success:
logger.info(f"✅ Datenbank-Cleanup erfolgreich: {', '.join(operations)}")
else:
logger.error(f"❌ Datenbank-Cleanup mit Fehlern: {', '.join(errors)}")
return result
except Exception as e:
error_msg = f"Kritischer Fehler beim Datenbank-Cleanup: {e}"
logger.error(f"{error_msg}")
return {
"success": False,
"operations": operations,
"errors": errors + [error_msg],
"timestamp": datetime.now().isoformat()
}
# Globale Instanz
cleanup_manager = DatabaseCleanupManager()
def get_cleanup_manager() -> DatabaseCleanupManager:
"""Gibt die globale Cleanup-Manager-Instanz zurück"""
return cleanup_manager
def safe_database_cleanup(force_mode_switch: bool = True) -> dict:
"""
Convenience-Funktion für sicheres Datenbank-Cleanup
Args:
force_mode_switch: Ob Journal-Mode forciert umgeschaltet werden soll
Returns:
dict: Cleanup-Ergebnis
"""
return cleanup_manager.comprehensive_cleanup(force_mode_switch=force_mode_switch)

View File

@@ -0,0 +1,275 @@
#!/usr/bin/env python3
"""
Test-Script für den DatabaseCleanupManager
Validiert die robuste Datenbank-Cleanup-Funktionalität
"""
import os
import sys
import time
import sqlite3
import threading
from datetime import datetime
# Pfad zur App hinzufügen
app_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, app_dir)
from utils.database_cleanup import DatabaseCleanupManager, safe_database_cleanup
from config.settings import DATABASE_PATH
from utils.logging_config import get_logger
logger = get_logger("database_cleanup_test")
def test_basic_cleanup():
"""Test der grundlegenden Cleanup-Funktionalität"""
print("🧪 Test 1: Grundlegende Cleanup-Funktionalität")
try:
# Erstelle Test-DatabaseCleanupManager
cleanup_manager = DatabaseCleanupManager()
# Teste WAL-Checkpoint
checkpoint_success, checkpoint_error = cleanup_manager.safe_wal_checkpoint(retry_attempts=3)
if checkpoint_success:
print("✅ WAL-Checkpoint erfolgreich")
else:
print(f"❌ WAL-Checkpoint fehlgeschlagen: {checkpoint_error}")
# Teste umfassendes Cleanup
cleanup_result = cleanup_manager.comprehensive_cleanup(force_mode_switch=False) # Kein Mode-Switch für Test
if cleanup_result["success"]:
print(f"✅ Umfassendes Cleanup erfolgreich: {', '.join(cleanup_result['operations'])}")
else:
print(f"❌ Umfassendes Cleanup fehlgeschlagen: {', '.join(cleanup_result['errors'])}")
return cleanup_result["success"]
except Exception as e:
print(f"❌ Test 1 fehlgeschlagen: {e}")
return False
def test_concurrent_access():
"""Test des Cleanup-Verhaltens bei gleichzeitigen Datenbankzugriffen"""
print("\n🧪 Test 2: Cleanup bei gleichzeitigen Datenbankzugriffen")
try:
# Worker-Thread der Datenbankoperationen ausführt
def database_worker():
try:
for i in range(5):
conn = sqlite3.connect(DATABASE_PATH, timeout=2)
conn.execute("SELECT COUNT(*) FROM users")
time.sleep(0.5)
conn.close()
print(f" Worker: Datenbankoperation {i+1} abgeschlossen")
except Exception as e:
print(f" Worker-Fehler: {e}")
# Starte Worker-Thread
worker_thread = threading.Thread(target=database_worker, daemon=True)
worker_thread.start()
# Kurz warten damit Worker startet
time.sleep(1)
# Teste Cleanup während Worker läuft
cleanup_manager = DatabaseCleanupManager()
cleanup_result = cleanup_manager.comprehensive_cleanup(force_mode_switch=False)
if cleanup_result["success"]:
print("✅ Cleanup erfolgreich trotz gleichzeitiger Datenbankzugriffe")
else:
print(f"❌ Cleanup fehlgeschlagen: {', '.join(cleanup_result['errors'])}")
# Warte auf Worker
worker_thread.join(timeout=10)
return cleanup_result["success"]
except Exception as e:
print(f"❌ Test 2 fehlgeschlagen: {e}")
return False
def test_error_recovery():
"""Test der Fehlerbehandlung und Recovery-Mechanismen"""
print("\n🧪 Test 3: Fehlerbehandlung und Recovery")
try:
cleanup_manager = DatabaseCleanupManager()
# Teste mit verschiedenen Retry-Parametern
for retry_attempts in [1, 3, 5]:
print(f" Teste mit {retry_attempts} Retry-Versuchen...")
checkpoint_success, checkpoint_error = cleanup_manager.safe_wal_checkpoint(retry_attempts=retry_attempts)
if checkpoint_success:
print(f" ✅ WAL-Checkpoint mit {retry_attempts} Versuchen erfolgreich")
else:
print(f" ⚠️ WAL-Checkpoint mit {retry_attempts} Versuchen: {checkpoint_error}")
return True
except Exception as e:
print(f"❌ Test 3 fehlgeschlagen: {e}")
return False
def test_journal_mode_operations():
"""Test der Journal-Mode-Operationen"""
print("\n🧪 Test 4: Journal-Mode-Operationen")
try:
cleanup_manager = DatabaseCleanupManager()
# Teste aktuellen Journal-Mode
conn = sqlite3.connect(DATABASE_PATH, timeout=5)
current_mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
print(f" Aktueller Journal-Mode: {current_mode}")
conn.close()
# Teste Journal-Mode-Switch (nur wenn bereits WAL-Mode)
if current_mode.upper() == "WAL":
print(" Teste Journal-Mode-Switch...")
# Teste Switch zu WAL (sollte bereits WAL sein)
mode_success, mode_error = cleanup_manager.safe_journal_mode_switch("WAL", retry_attempts=2)
if mode_success:
print(" ✅ Journal-Mode-Switch zu WAL erfolgreich")
else:
print(f" ❌ Journal-Mode-Switch fehlgeschlagen: {mode_error}")
return mode_success
else:
print(f" Database bereits im {current_mode}-Mode, kein Switch-Test nötig")
return True
except Exception as e:
print(f"❌ Test 4 fehlgeschlagen: {e}")
return False
def test_convenience_function():
"""Test der Convenience-Funktion safe_database_cleanup"""
print("\n🧪 Test 5: Convenience-Funktion safe_database_cleanup")
try:
# Teste die einfache Convenience-Funktion
cleanup_result = safe_database_cleanup(force_mode_switch=False)
if cleanup_result["success"]:
print(f"✅ safe_database_cleanup erfolgreich: {', '.join(cleanup_result['operations'])}")
# Prüfe Cleanup-Details
if "timestamp" in cleanup_result:
print(f" Zeitstempel: {cleanup_result['timestamp']}")
if "wal_files_removed" in cleanup_result:
print(f" WAL-Dateien entfernt: {cleanup_result['wal_files_removed']}")
else:
print(f"❌ safe_database_cleanup fehlgeschlagen: {', '.join(cleanup_result['errors'])}")
return cleanup_result["success"]
except Exception as e:
print(f"❌ Test 5 fehlgeschlagen: {e}")
return False
def test_performance():
"""Test der Performance von Cleanup-Operationen"""
print("\n🧪 Test 6: Performance-Test")
try:
cleanup_manager = DatabaseCleanupManager()
# Messe Zeit für verschiedene Operationen
operations = [
("WAL-Checkpoint", lambda: cleanup_manager.safe_wal_checkpoint(retry_attempts=1)),
("Verbindungsschließung", lambda: cleanup_manager.force_close_all_connections(max_wait_seconds=5)),
("Umfassendes Cleanup", lambda: cleanup_manager.comprehensive_cleanup(force_mode_switch=False))
]
for operation_name, operation_func in operations:
start_time = time.time()
try:
result = operation_func()
duration = time.time() - start_time
success = result if isinstance(result, bool) else result[0] if isinstance(result, tuple) else result.get("success", False)
if success:
print(f"{operation_name}: {duration:.3f}s")
else:
print(f" ⚠️ {operation_name}: {duration:.3f}s (mit Problemen)")
except Exception as e:
duration = time.time() - start_time
print(f"{operation_name}: {duration:.3f}s (Fehler: {e})")
return True
except Exception as e:
print(f"❌ Test 6 fehlgeschlagen: {e}")
return False
def main():
"""Hauptfunktion für alle Tests"""
print("🚀 Starte DatabaseCleanupManager Tests")
print(f"Database-Pfad: {DATABASE_PATH}")
print(f"Zeitstempel: {datetime.now().isoformat()}")
print("=" * 60)
# Prüfe ob Datenbankdatei existiert
if not os.path.exists(DATABASE_PATH):
print(f"❌ Datenbankdatei nicht gefunden: {DATABASE_PATH}")
return False
# Führe alle Tests aus
tests = [
("Grundlegende Cleanup-Funktionalität", test_basic_cleanup),
("Cleanup bei gleichzeitigen Zugriffen", test_concurrent_access),
("Fehlerbehandlung und Recovery", test_error_recovery),
("Journal-Mode-Operationen", test_journal_mode_operations),
("Convenience-Funktion", test_convenience_function),
("Performance-Test", test_performance)
]
passed_tests = 0
failed_tests = 0
for test_name, test_func in tests:
try:
if test_func():
passed_tests += 1
print(f"{test_name}: BESTANDEN")
else:
failed_tests += 1
print(f"{test_name}: FEHLGESCHLAGEN")
except Exception as e:
failed_tests += 1
print(f"{test_name}: EXCEPTION - {e}")
print("\n" + "=" * 60)
print(f"📊 Test-Ergebnis: {passed_tests} bestanden, {failed_tests} fehlgeschlagen")
if failed_tests == 0:
print("🎉 Alle Tests bestanden! DatabaseCleanupManager funktioniert korrekt.")
return True
else:
print(f"⚠️ {failed_tests} Test(s) fehlgeschlagen. Überprüfung erforderlich.")
return False
if __name__ == "__main__":
try:
success = main()
sys.exit(0 if success else 1)
except KeyboardInterrupt:
print("\n🔄 Test durch Benutzer unterbrochen")
sys.exit(130)
except Exception as e:
print(f"💥 Kritischer Fehler beim Testen: {e}")
sys.exit(1)