#!/usr/bin/env python3 """ Robuste Datenbank-Cleanup-Utilities Verhindert "database is locked" Fehler durch intelligente Retry-Logik und Verbindungsmanagement """ import os import time import sqlite3 import threading from datetime import datetime from typing import Optional, Tuple, List from contextlib import contextmanager from sqlalchemy import text, create_engine from sqlalchemy.engine import Engine from sqlalchemy.pool import StaticPool from config.settings import DATABASE_PATH from utils.logging_config import get_logger logger = get_logger("database_cleanup") class DatabaseCleanupManager: """ Verwaltet sichere Datenbank-Cleanup-Operationen mit Retry-Logik Verhindert "database is locked" Fehler durch intelligente Session-Verwaltung """ def __init__(self): self._cleanup_lock = threading.Lock() self._cleanup_completed = False self._active_engines = [] def register_engine(self, engine: Engine): """Registriert eine Engine für das Cleanup""" with self._cleanup_lock: if engine not in self._active_engines: self._active_engines.append(engine) def force_close_all_connections(self, max_wait_seconds: int = 10) -> bool: """ Schließt alle aktiven Datenbankverbindungen forciert Args: max_wait_seconds: Maximale Wartezeit für graceful shutdown Returns: bool: True wenn erfolgreich """ try: logger.info("🔄 Schließe alle aktiven Datenbankverbindungen...") # Alle registrierten Engines disposen with self._cleanup_lock: for engine in self._active_engines: try: logger.debug(f"Disposing Engine: {engine}") engine.dispose() except Exception as e: logger.warning(f"Fehler beim Engine Dispose: {e}") self._active_engines.clear() # Kurz warten damit alle Verbindungen sich schließen können time.sleep(1) # Prüfe ob noch WAL-Locks bestehen wal_path = DATABASE_PATH + "-wal" shm_path = DATABASE_PATH + "-shm" start_time = time.time() while time.time() - start_time < max_wait_seconds: try: # Teste kurze Verbindung test_conn = sqlite3.connect(DATABASE_PATH, timeout=2) test_conn.execute("BEGIN IMMEDIATE") # Teste exklusiven Zugriff test_conn.rollback() test_conn.close() logger.info("✅ Alle Datenbankverbindungen erfolgreich geschlossen") return True except sqlite3.OperationalError as e: if "database is locked" in str(e): logger.debug(f"Warte auf Verbindungsschließung... ({time.time() - start_time:.1f}s)") time.sleep(0.5) continue else: raise logger.warning(f"⚠️ Timeout beim Warten auf Verbindungsschließung ({max_wait_seconds}s)") return False except Exception as e: logger.error(f"❌ Fehler beim Schließen der Verbindungen: {e}") return False def safe_wal_checkpoint(self, retry_attempts: int = 5) -> Tuple[bool, Optional[str]]: """ Führt sicheren WAL-Checkpoint mit Retry-Logik durch Args: retry_attempts: Anzahl der Wiederholungsversuche Returns: Tuple[bool, Optional[str]]: (Erfolg, Fehlermeldung) """ for attempt in range(retry_attempts): try: # Kurze, direkte SQLite-Verbindung für Checkpoint conn = sqlite3.connect(DATABASE_PATH, timeout=10) # WAL-Checkpoint mit verschiedenen Strategien strategies = ["TRUNCATE", "RESTART", "FULL", "PASSIVE"] for strategy in strategies: try: cursor = conn.execute(f"PRAGMA wal_checkpoint({strategy})") result = cursor.fetchone() if result and result[0] == 0: # Erfolg (0 = success) pages_transferred = result[1] if len(result) > 1 else 0 pages_reset = result[2] if len(result) > 2 else 0 if pages_transferred > 0: logger.info(f"✅ WAL-Checkpoint ({strategy}): {pages_transferred} Seiten übertragen, {pages_reset} Seiten zurückgesetzt") else: logger.debug(f"WAL-Checkpoint ({strategy}): Keine Seiten zu übertragen") conn.close() return True, None else: logger.warning(f"WAL-Checkpoint ({strategy}) unvollständig: {result}") except Exception as strategy_error: logger.warning(f"WAL-Checkpoint ({strategy}) fehlgeschlagen: {strategy_error}") continue conn.close() # Wenn alle Strategien fehlschlagen, versuche VACUUM als Fallback if attempt == 0: # Nur beim ersten Versuch logger.info("Versuche VACUUM als Fallback...") conn = sqlite3.connect(DATABASE_PATH, timeout=10) conn.execute("VACUUM") conn.close() logger.info("✅ VACUUM erfolgreich") return True, None except sqlite3.OperationalError as e: if "database is locked" in str(e): wait_time = (2 ** attempt) * 0.5 # Exponential backoff logger.warning(f"Database locked - Versuch {attempt + 1}/{retry_attempts}, warte {wait_time}s...") time.sleep(wait_time) continue else: return False, f"SQLite-Fehler: {e}" except Exception as e: return False, f"Unerwarteter Fehler: {e}" return False, f"Database nach {retry_attempts} Versuchen immer noch gesperrt" def safe_journal_mode_switch(self, target_mode: str = "DELETE", retry_attempts: int = 3) -> Tuple[bool, Optional[str]]: """ Führt sicheren Journal-Mode-Switch mit Retry-Logik durch Args: target_mode: Ziel-Journal-Mode (DELETE, WAL, etc.) retry_attempts: Anzahl der Wiederholungsversuche Returns: Tuple[bool, Optional[str]]: (Erfolg, Fehlermeldung) """ for attempt in range(retry_attempts): try: conn = sqlite3.connect(DATABASE_PATH, timeout=15) # Prüfe aktuellen Journal-Mode current_mode = conn.execute("PRAGMA journal_mode").fetchone()[0] logger.debug(f"Aktueller Journal-Mode: {current_mode}") if current_mode.upper() == target_mode.upper(): logger.info(f"Journal-Mode bereits auf {target_mode}") conn.close() return True, None # Mode-Switch durchführen result = conn.execute(f"PRAGMA journal_mode={target_mode}").fetchone() new_mode = result[0] if result else None conn.close() if new_mode and new_mode.upper() == target_mode.upper(): logger.info(f"✅ Journal-Mode erfolgreich auf {new_mode} umgeschaltet") return True, None else: logger.warning(f"Journal-Mode-Switch unvollständig: {new_mode} != {target_mode}") except sqlite3.OperationalError as e: if "database is locked" in str(e): wait_time = (2 ** attempt) * 1.0 # Exponential backoff logger.warning(f"Database locked bei Mode-Switch - Versuch {attempt + 1}/{retry_attempts}, warte {wait_time}s...") time.sleep(wait_time) continue else: return False, f"SQLite-Fehler: {e}" except Exception as e: return False, f"Unerwarteter Fehler: {e}" return False, f"Journal-Mode-Switch nach {retry_attempts} Versuchen fehlgeschlagen" def comprehensive_cleanup(self, force_mode_switch: bool = True) -> dict: """ Führt umfassendes, sicheres Datenbank-Cleanup durch Args: force_mode_switch: Ob Journal-Mode forciert umgeschaltet werden soll Returns: dict: Cleanup-Ergebnis mit Details """ with self._cleanup_lock: if self._cleanup_completed: logger.info("Datenbank-Cleanup bereits durchgeführt") return {"success": True, "message": "Bereits durchgeführt", "operations": []} logger.info("🧹 Starte umfassendes Datenbank-Cleanup...") operations = [] errors = [] try: # Schritt 1: Alle Verbindungen schließen logger.info("📝 Schritt 1: Schließe alle Datenbankverbindungen...") connection_success = self.force_close_all_connections(max_wait_seconds=15) if connection_success: operations.append("Alle Verbindungen geschlossen") else: errors.append("Timeout beim Verbindungsschließen") # Schritt 2: WAL-Checkpoint logger.info("📝 Schritt 2: Führe WAL-Checkpoint durch...") checkpoint_success, checkpoint_error = self.safe_wal_checkpoint(retry_attempts=5) if checkpoint_success: operations.append("WAL-Checkpoint erfolgreich") else: errors.append(f"WAL-Checkpoint fehlgeschlagen: {checkpoint_error}") # Schritt 3: Journal-Mode-Switch (nur wenn gewünscht und Checkpoint erfolgreich) if force_mode_switch and checkpoint_success: logger.info("📝 Schritt 3: Schalte Journal-Mode um...") mode_success, mode_error = self.safe_journal_mode_switch("DELETE", retry_attempts=3) if mode_success: operations.append("Journal-Mode auf DELETE umgeschaltet") else: errors.append(f"Journal-Mode-Switch fehlgeschlagen: {mode_error}") logger.warning(f"Journal-Mode-Switch fehlgeschlagen, aber WAL-Checkpoint war erfolgreich") # Schritt 4: Finale Optimierungen (nur bei Erfolg) if checkpoint_success: logger.info("📝 Schritt 4: Finale Optimierungen...") try: conn = sqlite3.connect(DATABASE_PATH, timeout=5) conn.execute("PRAGMA optimize") conn.close() operations.append("Datenbank optimiert") except Exception as opt_error: logger.warning(f"Optimierung fehlgeschlagen: {opt_error}") # Schritt 5: Prüfe Ergebnis wal_path = DATABASE_PATH + "-wal" shm_path = DATABASE_PATH + "-shm" wal_exists = os.path.exists(wal_path) shm_exists = os.path.exists(shm_path) if not wal_exists and not shm_exists: operations.append("WAL/SHM-Dateien erfolgreich entfernt") logger.info("✅ WAL- und SHM-Dateien erfolgreich entfernt") elif force_mode_switch: errors.append(f"WAL/SHM-Dateien bestehen noch (WAL: {wal_exists}, SHM: {shm_exists})") else: logger.info("WAL/SHM-Dateien bleiben bestehen (kein Mode-Switch angefordert)") self._cleanup_completed = True # Erfolgsstatus bestimmen success = len(operations) > 0 and (not force_mode_switch or not wal_exists) result = { "success": success, "operations": operations, "errors": errors, "timestamp": datetime.now().isoformat(), "wal_files_removed": not wal_exists and not shm_exists } if success: logger.info(f"✅ Datenbank-Cleanup erfolgreich: {', '.join(operations)}") else: logger.error(f"❌ Datenbank-Cleanup mit Fehlern: {', '.join(errors)}") return result except Exception as e: error_msg = f"Kritischer Fehler beim Datenbank-Cleanup: {e}" logger.error(f"❌ {error_msg}") return { "success": False, "operations": operations, "errors": errors + [error_msg], "timestamp": datetime.now().isoformat() } # Globale Instanz cleanup_manager = DatabaseCleanupManager() def get_cleanup_manager() -> DatabaseCleanupManager: """Gibt die globale Cleanup-Manager-Instanz zurück""" return cleanup_manager def safe_database_cleanup(force_mode_switch: bool = True) -> dict: """ Convenience-Funktion für sicheres Datenbank-Cleanup Args: force_mode_switch: Ob Journal-Mode forciert umgeschaltet werden soll Returns: dict: Cleanup-Ergebnis """ return cleanup_manager.comprehensive_cleanup(force_mode_switch=force_mode_switch)