#!/usr/bin/env python3 """ Optimiertes Datenbank-Schema-Migrationsskript Mit WAL-Checkpoint und ordnungsgemäßer Ressourcenverwaltung """ import os import sys import sqlite3 import signal import time from datetime import datetime import logging from contextlib import contextmanager # Pfad zur App hinzufügen - KORRIGIERT app_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, app_dir) # Alternative Datenbankpfad-Definition falls Import fehlschlägt DATABASE_PATH = None try: from config.settings import DATABASE_PATH except ImportError: # Fallback: Datenbankpfad manuell setzen DATABASE_PATH = os.path.join(app_dir, "database", "myp.db") print(f"⚠️ Fallback: Verwende Datenbankpfad: {DATABASE_PATH}") # Logging-Setup mit Fallback try: from utils.logging_config import get_logger logger = get_logger("schema_migration") except ImportError: # Fallback: Standard-Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger("schema_migration") # Globale Variable für sauberes Shutdown _migration_running = False _current_connection = None def signal_handler(signum, frame): """Signal-Handler für ordnungsgemäßes Shutdown""" global _migration_running, _current_connection print(f"\n🛑 Signal {signum} empfangen - beende Migration sauber...") _migration_running = False if _current_connection: try: print("🔄 Führe WAL-Checkpoint durch...") _current_connection.execute("PRAGMA wal_checkpoint(TRUNCATE)") _current_connection.commit() _current_connection.close() print("✅ Datenbank ordnungsgemäß geschlossen") except Exception as e: print(f"⚠️ Fehler beim Schließen: {e}") print("🏁 Migration beendet") sys.exit(0) # Signal-Handler registrieren signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) @contextmanager def get_database_connection(timeout=30): """Context Manager für sichere Datenbankverbindung mit WAL-Optimierung""" global _current_connection conn = None try: # Verbindung mit optimierten Einstellungen conn = sqlite3.connect( DATABASE_PATH, timeout=timeout, isolation_level=None # Autocommit aus für manuelle Transaktionen ) _current_connection = conn # WAL-Modus und Optimierungen conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA synchronous=NORMAL") # Bessere Performance mit WAL conn.execute("PRAGMA foreign_keys=ON") conn.execute("PRAGMA busy_timeout=30000") # 30 Sekunden Timeout conn.execute("PRAGMA wal_autocheckpoint=1000") # Automatischer Checkpoint alle 1000 Seiten logger.info("Datenbankverbindung mit WAL-Optimierungen hergestellt") yield conn except Exception as e: logger.error(f"Datenbankverbindungsfehler: {e}") if conn: conn.rollback() raise finally: if conn: try: # Kritisch: WAL-Checkpoint vor dem Schließen logger.info("Führe finalen WAL-Checkpoint durch...") conn.execute("PRAGMA wal_checkpoint(TRUNCATE)") conn.commit() # Prüfe WAL-Status wal_info = conn.execute("PRAGMA wal_checkpoint").fetchone() if wal_info: logger.info(f"WAL-Checkpoint: {wal_info[0]} Seiten übertragen, {wal_info[1]} Seiten zurückgesetzt") conn.close() logger.info("Datenbankverbindung ordnungsgemäß geschlossen") except Exception as e: logger.error(f"Fehler beim Schließen der Datenbankverbindung: {e}") finally: _current_connection = None def force_wal_checkpoint(): """Erzwingt WAL-Checkpoint um alle Daten in die Hauptdatei zu schreiben""" try: with get_database_connection(timeout=10) as conn: # Aggressive WAL-Checkpoint-Strategien strategies = [ ("TRUNCATE", "Vollständiger Checkpoint mit WAL-Truncate"), ("RESTART", "Checkpoint mit WAL-Restart"), ("FULL", "Vollständiger Checkpoint") ] for strategy, description in strategies: try: result = conn.execute(f"PRAGMA wal_checkpoint({strategy})").fetchone() if result and result[0] == 0: # Erfolg logger.info(f"✅ {description} erfolgreich: {result}") return True else: logger.warning(f"⚠️ {description} teilweise erfolgreich: {result}") except Exception as e: logger.warning(f"⚠️ {description} fehlgeschlagen: {e}") continue # Fallback: VACUUM für komplette Reorganisation logger.info("Führe VACUUM als Fallback durch...") conn.execute("VACUUM") logger.info("✅ VACUUM erfolgreich") return True except Exception as e: logger.error(f"Kritischer Fehler bei WAL-Checkpoint: {e}") return False def optimize_migration_performance(): """Optimiert die Datenbank für die Migration""" try: with get_database_connection(timeout=5) as conn: # Performance-Optimierungen für Migration optimizations = [ ("PRAGMA cache_size = -64000", "Cache-Größe auf 64MB erhöht"), ("PRAGMA temp_store = MEMORY", "Temp-Store in Memory"), ("PRAGMA mmap_size = 268435456", "Memory-Mapped I/O aktiviert"), ("PRAGMA optimize", "Automatische Optimierungen") ] for pragma, description in optimizations: try: conn.execute(pragma) logger.info(f"✅ {description}") except Exception as e: logger.warning(f"⚠️ Optimierung fehlgeschlagen ({description}): {e}") except Exception as e: logger.warning(f"Fehler bei Performance-Optimierung: {e}") def main(): """Führt die optimierte Schema-Migration aus.""" global _migration_running _migration_running = True try: logger.info("🚀 Starte optimierte Datenbank-Schema-Migration...") # Überprüfe Datenbankdatei if not os.path.exists(DATABASE_PATH): logger.error(f"❌ Datenbankdatei nicht gefunden: {DATABASE_PATH}") return False # Initial WAL-Checkpoint um sauberen Zustand sicherzustellen logger.info("🔄 Führe initialen WAL-Checkpoint durch...") force_wal_checkpoint() # Performance-Optimierungen optimize_migration_performance() # Eigentliche Migration mit optimierter Verbindung with get_database_connection(timeout=60) as conn: cursor = conn.cursor() # Backup erstellen (mit Timeout) backup_path = f"{DATABASE_PATH}.backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}" try: logger.info(f"📦 Erstelle Backup: {backup_path}") cursor.execute(f"VACUUM INTO '{backup_path}'") logger.info("✅ Backup erfolgreich erstellt") except Exception as e: logger.warning(f"⚠️ Backup-Erstellung fehlgeschlagen: {e}") # Migrationen durchführen (verkürzt für bessere Performance) migrations_performed = [] if not _migration_running: return False # Schnelle Schema-Checks try: # Test der kritischen Abfrage cursor.execute("SELECT COUNT(*) FROM guest_requests WHERE duration_minutes IS NOT NULL") logger.info("✅ Schema-Integritätstest bestanden") except Exception: logger.info("🔧 Führe kritische Schema-Reparaturen durch...") # Nur die wichtigsten Reparaturen critical_fixes = [ ("ALTER TABLE guest_requests ADD COLUMN duration_minutes INTEGER", "duration_minutes zu guest_requests"), ("ALTER TABLE users ADD COLUMN username VARCHAR(100)", "username zu users"), ("UPDATE users SET username = email WHERE username IS NULL", "Username-Fallback") ] for sql, description in critical_fixes: if not _migration_running: break try: cursor.execute(sql) logger.info(f"✅ {description}") migrations_performed.append(description) except sqlite3.OperationalError as e: if "duplicate column" not in str(e).lower(): logger.warning(f"⚠️ {description}: {e}") # Commit und WAL-Checkpoint zwischen Operationen if migrations_performed: conn.commit() cursor.execute("PRAGMA wal_checkpoint(PASSIVE)") # Finale Optimierungen (reduziert) if _migration_running: essential_indices = [ "CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)", "CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status)", "CREATE INDEX IF NOT EXISTS idx_guest_requests_status ON guest_requests(status)" ] for index_sql in essential_indices: try: cursor.execute(index_sql) except Exception: pass # Indices sind nicht kritisch # Finale Statistiken cursor.execute("ANALYZE") migrations_performed.append("optimizations") # Finale Commit conn.commit() logger.info(f"✅ Migration abgeschlossen. Bereiche: {', '.join(migrations_performed)}") # Abschließender WAL-Checkpoint logger.info("🔄 Führe abschließenden WAL-Checkpoint durch...") force_wal_checkpoint() # Kurze Pause um sicherzustellen, dass alle I/O-Operationen abgeschlossen sind time.sleep(1) logger.info("🎉 Optimierte Schema-Migration erfolgreich abgeschlossen!") return True except KeyboardInterrupt: logger.info("🔄 Migration durch Benutzer unterbrochen") return False except Exception as e: logger.error(f"❌ Kritischer Fehler bei der Migration: {str(e)}") return False finally: _migration_running = False # Finale WAL-Bereinigung try: force_wal_checkpoint() except Exception: pass if __name__ == "__main__": success = main() if not success: sys.exit(1)