manage-your-printer/utils/database_cleanup.py
2025-06-04 10:03:22 +02:00

336 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Robuste Datenbank-Cleanup-Utilities
Verhindert "database is locked" Fehler durch intelligente Retry-Logik und Verbindungsmanagement
"""
import os
import time
import sqlite3
import threading
from datetime import datetime
from typing import Optional, Tuple, List
from contextlib import contextmanager
from sqlalchemy import text, create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.pool import StaticPool
from config.settings import DATABASE_PATH
from utils.logging_config import get_logger
logger = get_logger("database_cleanup")
class DatabaseCleanupManager:
"""
Verwaltet sichere Datenbank-Cleanup-Operationen mit Retry-Logik
Verhindert "database is locked" Fehler durch intelligente Session-Verwaltung
"""
def __init__(self):
self._cleanup_lock = threading.Lock()
self._cleanup_completed = False
self._active_engines = []
def register_engine(self, engine: Engine):
"""Registriert eine Engine für das Cleanup"""
with self._cleanup_lock:
if engine not in self._active_engines:
self._active_engines.append(engine)
def force_close_all_connections(self, max_wait_seconds: int = 10) -> bool:
"""
Schließt alle aktiven Datenbankverbindungen forciert
Args:
max_wait_seconds: Maximale Wartezeit für graceful shutdown
Returns:
bool: True wenn erfolgreich
"""
try:
logger.info("🔄 Schließe alle aktiven Datenbankverbindungen...")
# Alle registrierten Engines disposen
with self._cleanup_lock:
for engine in self._active_engines:
try:
logger.debug(f"Disposing Engine: {engine}")
engine.dispose()
except Exception as e:
logger.warning(f"Fehler beim Engine Dispose: {e}")
self._active_engines.clear()
# Kurz warten damit alle Verbindungen sich schließen können
time.sleep(1)
# Prüfe ob noch WAL-Locks bestehen
wal_path = DATABASE_PATH + "-wal"
shm_path = DATABASE_PATH + "-shm"
start_time = time.time()
while time.time() - start_time < max_wait_seconds:
try:
# Teste kurze Verbindung
test_conn = sqlite3.connect(DATABASE_PATH, timeout=2)
test_conn.execute("BEGIN IMMEDIATE") # Teste exklusiven Zugriff
test_conn.rollback()
test_conn.close()
logger.info("✅ Alle Datenbankverbindungen erfolgreich geschlossen")
return True
except sqlite3.OperationalError as e:
if "database is locked" in str(e):
logger.debug(f"Warte auf Verbindungsschließung... ({time.time() - start_time:.1f}s)")
time.sleep(0.5)
continue
else:
raise
logger.warning(f"⚠️ Timeout beim Warten auf Verbindungsschließung ({max_wait_seconds}s)")
return False
except Exception as e:
logger.error(f"❌ Fehler beim Schließen der Verbindungen: {e}")
return False
def safe_wal_checkpoint(self, retry_attempts: int = 5) -> Tuple[bool, Optional[str]]:
"""
Führt sicheren WAL-Checkpoint mit Retry-Logik durch
Args:
retry_attempts: Anzahl der Wiederholungsversuche
Returns:
Tuple[bool, Optional[str]]: (Erfolg, Fehlermeldung)
"""
for attempt in range(retry_attempts):
try:
# Kurze, direkte SQLite-Verbindung für Checkpoint
conn = sqlite3.connect(DATABASE_PATH, timeout=10)
# WAL-Checkpoint mit verschiedenen Strategien
strategies = ["TRUNCATE", "RESTART", "FULL", "PASSIVE"]
for strategy in strategies:
try:
cursor = conn.execute(f"PRAGMA wal_checkpoint({strategy})")
result = cursor.fetchone()
if result and result[0] == 0: # Erfolg (0 = success)
pages_transferred = result[1] if len(result) > 1 else 0
pages_reset = result[2] if len(result) > 2 else 0
if pages_transferred > 0:
logger.info(f"✅ WAL-Checkpoint ({strategy}): {pages_transferred} Seiten übertragen, {pages_reset} Seiten zurückgesetzt")
else:
logger.debug(f"WAL-Checkpoint ({strategy}): Keine Seiten zu übertragen")
conn.close()
return True, None
else:
logger.warning(f"WAL-Checkpoint ({strategy}) unvollständig: {result}")
except Exception as strategy_error:
logger.warning(f"WAL-Checkpoint ({strategy}) fehlgeschlagen: {strategy_error}")
continue
conn.close()
# Wenn alle Strategien fehlschlagen, versuche VACUUM als Fallback
if attempt == 0: # Nur beim ersten Versuch
logger.info("Versuche VACUUM als Fallback...")
conn = sqlite3.connect(DATABASE_PATH, timeout=10)
conn.execute("VACUUM")
conn.close()
logger.info("✅ VACUUM erfolgreich")
return True, None
except sqlite3.OperationalError as e:
if "database is locked" in str(e):
wait_time = (2 ** attempt) * 0.5 # Exponential backoff
logger.warning(f"Database locked - Versuch {attempt + 1}/{retry_attempts}, warte {wait_time}s...")
time.sleep(wait_time)
continue
else:
return False, f"SQLite-Fehler: {e}"
except Exception as e:
return False, f"Unerwarteter Fehler: {e}"
return False, f"Database nach {retry_attempts} Versuchen immer noch gesperrt"
def safe_journal_mode_switch(self, target_mode: str = "DELETE", retry_attempts: int = 3) -> Tuple[bool, Optional[str]]:
"""
Führt sicheren Journal-Mode-Switch mit Retry-Logik durch
Args:
target_mode: Ziel-Journal-Mode (DELETE, WAL, etc.)
retry_attempts: Anzahl der Wiederholungsversuche
Returns:
Tuple[bool, Optional[str]]: (Erfolg, Fehlermeldung)
"""
for attempt in range(retry_attempts):
try:
conn = sqlite3.connect(DATABASE_PATH, timeout=15)
# Prüfe aktuellen Journal-Mode
current_mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
logger.debug(f"Aktueller Journal-Mode: {current_mode}")
if current_mode.upper() == target_mode.upper():
logger.info(f"Journal-Mode bereits auf {target_mode}")
conn.close()
return True, None
# Mode-Switch durchführen
result = conn.execute(f"PRAGMA journal_mode={target_mode}").fetchone()
new_mode = result[0] if result else None
conn.close()
if new_mode and new_mode.upper() == target_mode.upper():
logger.info(f"✅ Journal-Mode erfolgreich auf {new_mode} umgeschaltet")
return True, None
else:
logger.warning(f"Journal-Mode-Switch unvollständig: {new_mode} != {target_mode}")
except sqlite3.OperationalError as e:
if "database is locked" in str(e):
wait_time = (2 ** attempt) * 1.0 # Exponential backoff
logger.warning(f"Database locked bei Mode-Switch - Versuch {attempt + 1}/{retry_attempts}, warte {wait_time}s...")
time.sleep(wait_time)
continue
else:
return False, f"SQLite-Fehler: {e}"
except Exception as e:
return False, f"Unerwarteter Fehler: {e}"
return False, f"Journal-Mode-Switch nach {retry_attempts} Versuchen fehlgeschlagen"
def comprehensive_cleanup(self, force_mode_switch: bool = True) -> dict:
"""
Führt umfassendes, sicheres Datenbank-Cleanup durch
Args:
force_mode_switch: Ob Journal-Mode forciert umgeschaltet werden soll
Returns:
dict: Cleanup-Ergebnis mit Details
"""
with self._cleanup_lock:
if self._cleanup_completed:
logger.info("Datenbank-Cleanup bereits durchgeführt")
return {"success": True, "message": "Bereits durchgeführt", "operations": []}
logger.info("🧹 Starte umfassendes Datenbank-Cleanup...")
operations = []
errors = []
try:
# Schritt 1: Alle Verbindungen schließen
logger.info("📝 Schritt 1: Schließe alle Datenbankverbindungen...")
connection_success = self.force_close_all_connections(max_wait_seconds=15)
if connection_success:
operations.append("Alle Verbindungen geschlossen")
else:
errors.append("Timeout beim Verbindungsschließen")
# Schritt 2: WAL-Checkpoint
logger.info("📝 Schritt 2: Führe WAL-Checkpoint durch...")
checkpoint_success, checkpoint_error = self.safe_wal_checkpoint(retry_attempts=5)
if checkpoint_success:
operations.append("WAL-Checkpoint erfolgreich")
else:
errors.append(f"WAL-Checkpoint fehlgeschlagen: {checkpoint_error}")
# Schritt 3: Journal-Mode-Switch (nur wenn gewünscht und Checkpoint erfolgreich)
if force_mode_switch and checkpoint_success:
logger.info("📝 Schritt 3: Schalte Journal-Mode um...")
mode_success, mode_error = self.safe_journal_mode_switch("DELETE", retry_attempts=3)
if mode_success:
operations.append("Journal-Mode auf DELETE umgeschaltet")
else:
errors.append(f"Journal-Mode-Switch fehlgeschlagen: {mode_error}")
logger.warning(f"Journal-Mode-Switch fehlgeschlagen, aber WAL-Checkpoint war erfolgreich")
# Schritt 4: Finale Optimierungen (nur bei Erfolg)
if checkpoint_success:
logger.info("📝 Schritt 4: Finale Optimierungen...")
try:
conn = sqlite3.connect(DATABASE_PATH, timeout=5)
conn.execute("PRAGMA optimize")
conn.close()
operations.append("Datenbank optimiert")
except Exception as opt_error:
logger.warning(f"Optimierung fehlgeschlagen: {opt_error}")
# Schritt 5: Prüfe Ergebnis
wal_path = DATABASE_PATH + "-wal"
shm_path = DATABASE_PATH + "-shm"
wal_exists = os.path.exists(wal_path)
shm_exists = os.path.exists(shm_path)
if not wal_exists and not shm_exists:
operations.append("WAL/SHM-Dateien erfolgreich entfernt")
logger.info("✅ WAL- und SHM-Dateien erfolgreich entfernt")
elif force_mode_switch:
errors.append(f"WAL/SHM-Dateien bestehen noch (WAL: {wal_exists}, SHM: {shm_exists})")
else:
logger.info("WAL/SHM-Dateien bleiben bestehen (kein Mode-Switch angefordert)")
self._cleanup_completed = True
# Erfolgsstatus bestimmen
success = len(operations) > 0 and (not force_mode_switch or not wal_exists)
result = {
"success": success,
"operations": operations,
"errors": errors,
"timestamp": datetime.now().isoformat(),
"wal_files_removed": not wal_exists and not shm_exists
}
if success:
logger.info(f"✅ Datenbank-Cleanup erfolgreich: {', '.join(operations)}")
else:
logger.error(f"❌ Datenbank-Cleanup mit Fehlern: {', '.join(errors)}")
return result
except Exception as e:
error_msg = f"Kritischer Fehler beim Datenbank-Cleanup: {e}"
logger.error(f"{error_msg}")
return {
"success": False,
"operations": operations,
"errors": errors + [error_msg],
"timestamp": datetime.now().isoformat()
}
# Globale Instanz
cleanup_manager = DatabaseCleanupManager()
def get_cleanup_manager() -> DatabaseCleanupManager:
"""Gibt die globale Cleanup-Manager-Instanz zurück"""
return cleanup_manager
def safe_database_cleanup(force_mode_switch: bool = True) -> dict:
"""
Convenience-Funktion für sicheres Datenbank-Cleanup
Args:
force_mode_switch: Ob Journal-Mode forciert umgeschaltet werden soll
Returns:
dict: Cleanup-Ergebnis
"""
return cleanup_manager.comprehensive_cleanup(force_mode_switch=force_mode_switch)