""" Erweiterte Datenbank-Utilities für Backup, Monitoring und Wartung. """ import os import shutil import sqlite3 import threading import time import gzip from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple from pathlib import Path from sqlalchemy import text from sqlalchemy.engine import Engine from config.settings import DATABASE_PATH from utils.logging_config import get_logger from models import get_cached_session, create_optimized_engine logger = get_logger("database") # ===== BACKUP-SYSTEM ===== class DatabaseBackupManager: """ Verwaltet automatische Datenbank-Backups mit Rotation. """ def __init__(self, backup_dir: str = None): self.backup_dir = backup_dir or os.path.join(os.path.dirname(DATABASE_PATH), "backups") self.ensure_backup_directory() self._backup_lock = threading.Lock() def ensure_backup_directory(self): """Stellt sicher, dass das Backup-Verzeichnis existiert.""" Path(self.backup_dir).mkdir(parents=True, exist_ok=True) def create_backup(self, compress: bool = True) -> str: """ Erstellt ein Backup der Datenbank. Args: compress: Ob das Backup komprimiert werden soll Returns: str: Pfad zum erstellten Backup """ with self._backup_lock: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_filename = f"myp_backup_{timestamp}.db" if compress: backup_filename += ".gz" backup_path = os.path.join(self.backup_dir, backup_filename) try: if compress: # Komprimiertes Backup erstellen with open(DATABASE_PATH, 'rb') as f_in: with gzip.open(backup_path, 'wb') as f_out: shutil.copyfileobj(f_in, f_out) else: # Einfache Kopie shutil.copy2(DATABASE_PATH, backup_path) logger.info(f"Datenbank-Backup erstellt: {backup_path}") return backup_path except Exception as e: logger.error(f"Fehler beim Erstellen des Backups: {str(e)}") raise def restore_backup(self, backup_path: str) -> bool: """ Stellt ein Backup wieder her. Args: backup_path: Pfad zum Backup Returns: bool: True bei Erfolg """ with self._backup_lock: try: # Aktuelles Backup der bestehenden DB erstellen current_backup = self.create_backup() logger.info(f"Sicherheitsbackup erstellt: {current_backup}") if backup_path.endswith('.gz'): # Komprimiertes Backup wiederherstellen with gzip.open(backup_path, 'rb') as f_in: with open(DATABASE_PATH, 'wb') as f_out: shutil.copyfileobj(f_in, f_out) else: # Einfache Kopie shutil.copy2(backup_path, DATABASE_PATH) logger.info(f"Datenbank aus Backup wiederhergestellt: {backup_path}") return True except Exception as e: logger.error(f"Fehler beim Wiederherstellen des Backups: {str(e)}") return False def cleanup_old_backups(self, keep_days: int = 30): """ Löscht alte Backups. Args: keep_days: Anzahl Tage, die Backups aufbewahrt werden sollen """ cutoff_date = datetime.now() - timedelta(days=keep_days) deleted_count = 0 try: for filename in os.listdir(self.backup_dir): if filename.startswith("myp_backup_"): file_path = os.path.join(self.backup_dir, filename) file_time = datetime.fromtimestamp(os.path.getctime(file_path)) if file_time < cutoff_date: os.remove(file_path) deleted_count += 1 logger.info(f"Altes Backup gelöscht: {filename}") if deleted_count > 0: logger.info(f"{deleted_count} alte Backups gelöscht") except Exception as e: logger.error(f"Fehler beim Bereinigen alter Backups: {str(e)}") def get_backup_list(self) -> List[Dict]: """ Gibt eine Liste aller verfügbaren Backups zurück. Returns: List[Dict]: Liste mit Backup-Informationen """ backups = [] try: for filename in os.listdir(self.backup_dir): if filename.startswith("myp_backup_"): file_path = os.path.join(self.backup_dir, filename) file_stat = os.stat(file_path) backups.append({ "filename": filename, "path": file_path, "size": file_stat.st_size, "created": datetime.fromtimestamp(file_stat.st_ctime), "compressed": filename.endswith('.gz') }) # Nach Erstellungsdatum sortieren (neueste zuerst) backups.sort(key=lambda x: x['created'], reverse=True) except Exception as e: logger.error(f"Fehler beim Abrufen der Backup-Liste: {str(e)}") return backups # ===== DATENBANK-MONITORING ===== class DatabaseMonitor: """ Überwacht die Datenbank-Performance und -Gesundheit. """ def __init__(self): self.engine = create_optimized_engine() def get_database_stats(self) -> Dict: """ Sammelt Datenbank-Statistiken. Returns: Dict: Datenbank-Statistiken """ stats = {} try: with self.engine.connect() as conn: # Datenbankgröße result = conn.execute(text("SELECT page_count * page_size as size FROM pragma_page_count(), pragma_page_size()")) db_size = result.fetchone()[0] stats['database_size_bytes'] = db_size stats['database_size_mb'] = round(db_size / (1024 * 1024), 2) # WAL-Datei-Größe wal_path = DATABASE_PATH + "-wal" if os.path.exists(wal_path): wal_size = os.path.getsize(wal_path) stats['wal_size_bytes'] = wal_size stats['wal_size_mb'] = round(wal_size / (1024 * 1024), 2) else: stats['wal_size_bytes'] = 0 stats['wal_size_mb'] = 0 # Journal-Modus result = conn.execute(text("PRAGMA journal_mode")) stats['journal_mode'] = result.fetchone()[0] # Cache-Statistiken result = conn.execute(text("PRAGMA cache_size")) stats['cache_size'] = result.fetchone()[0] # Synchronous-Modus result = conn.execute(text("PRAGMA synchronous")) stats['synchronous_mode'] = result.fetchone()[0] # Tabellen-Statistiken result = conn.execute(text(""" SELECT name, (SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=m.name) as table_count FROM sqlite_master m WHERE type='table' """)) table_stats = {} for table_name, _ in result.fetchall(): if not table_name.startswith('sqlite_'): count_result = conn.execute(text(f"SELECT COUNT(*) FROM {table_name}")) table_stats[table_name] = count_result.fetchone()[0] stats['table_counts'] = table_stats # Letzte Wartung stats['last_analyze'] = self._get_last_analyze_time() stats['last_vacuum'] = self._get_last_vacuum_time() except Exception as e: logger.error(f"Fehler beim Sammeln der Datenbank-Statistiken: {str(e)}") stats['error'] = str(e) return stats def _get_last_analyze_time(self) -> Optional[str]: """Ermittelt den Zeitpunkt der letzten ANALYZE-Operation.""" try: # SQLite speichert keine direkten Timestamps für ANALYZE # Wir verwenden die Modifikationszeit der Statistik-Tabellen stat_path = DATABASE_PATH + "-stat" if os.path.exists(stat_path): return datetime.fromtimestamp(os.path.getmtime(stat_path)).isoformat() except: pass return None def _get_last_vacuum_time(self) -> Optional[str]: """Ermittelt den Zeitpunkt der letzten VACUUM-Operation.""" try: # Approximation über Datei-Modifikationszeit return datetime.fromtimestamp(os.path.getmtime(DATABASE_PATH)).isoformat() except: pass return None def check_database_health(self) -> Dict: """ Führt eine Gesundheitsprüfung der Datenbank durch. Returns: Dict: Gesundheitsstatus """ health = { "status": "healthy", "issues": [], "recommendations": [] } try: with self.engine.connect() as conn: # Integritätsprüfung result = conn.execute(text("PRAGMA integrity_check")) integrity_result = result.fetchone()[0] if integrity_result != "ok": health["status"] = "critical" health["issues"].append(f"Integritätsprüfung fehlgeschlagen: {integrity_result}") # WAL-Dateigröße prüfen wal_path = DATABASE_PATH + "-wal" if os.path.exists(wal_path): wal_size_mb = os.path.getsize(wal_path) / (1024 * 1024) if wal_size_mb > 100: # Über 100MB health["issues"].append(f"WAL-Datei sehr groß: {wal_size_mb:.1f}MB") health["recommendations"].append("WAL-Checkpoint durchführen") # Freier Speicherplatz prüfen db_dir = os.path.dirname(DATABASE_PATH) free_space = shutil.disk_usage(db_dir).free / (1024 * 1024 * 1024) # GB if free_space < 1: # Weniger als 1GB health["status"] = "warning" if health["status"] == "healthy" else health["status"] health["issues"].append(f"Wenig freier Speicherplatz: {free_space:.1f}GB") health["recommendations"].append("Speicherplatz freigeben oder alte Backups löschen") # Connection Pool Status (falls verfügbar) # Hier könnten weitere Checks hinzugefügt werden except Exception as e: health["status"] = "error" health["issues"].append(f"Fehler bei Gesundheitsprüfung: {str(e)}") logger.error(f"Fehler bei Datenbank-Gesundheitsprüfung: {str(e)}") return health def optimize_database(self) -> Dict: """ Führt Optimierungsoperationen auf der Datenbank durch. Returns: Dict: Ergebnis der Optimierung """ result = { "operations": [], "success": True, "errors": [] } try: with self.engine.connect() as conn: # ANALYZE für bessere Query-Planung conn.execute(text("ANALYZE")) result["operations"].append("ANALYZE ausgeführt") # WAL-Checkpoint checkpoint_result = conn.execute(text("PRAGMA wal_checkpoint(TRUNCATE)")) checkpoint_info = checkpoint_result.fetchone() result["operations"].append(f"WAL-Checkpoint: {checkpoint_info}") # Incremental Vacuum conn.execute(text("PRAGMA incremental_vacuum")) result["operations"].append("Incremental Vacuum ausgeführt") # Optimize Pragma conn.execute(text("PRAGMA optimize")) result["operations"].append("PRAGMA optimize ausgeführt") conn.commit() except Exception as e: result["success"] = False result["errors"].append(str(e)) logger.error(f"Fehler bei Datenbank-Optimierung: {str(e)}") return result # ===== AUTOMATISCHE WARTUNG ===== class DatabaseMaintenanceScheduler: """ Plant und führt automatische Wartungsaufgaben durch. """ def __init__(self): self.backup_manager = DatabaseBackupManager() self.monitor = DatabaseMonitor() self._running = False self._thread = None def start_maintenance_scheduler(self): """Startet den Wartungs-Scheduler.""" if self._running: return self._running = True self._thread = threading.Thread(target=self._maintenance_loop, daemon=True) self._thread.start() logger.info("Datenbank-Wartungs-Scheduler gestartet") def stop_maintenance_scheduler(self): """Stoppt den Wartungs-Scheduler.""" self._running = False if self._thread: self._thread.join(timeout=5) logger.info("Datenbank-Wartungs-Scheduler gestoppt") def _maintenance_loop(self): """Hauptschleife für Wartungsaufgaben.""" last_backup = datetime.now() last_cleanup = datetime.now() last_optimization = datetime.now() while self._running: try: now = datetime.now() # Tägliches Backup (alle 24 Stunden) if (now - last_backup).total_seconds() > 86400: # 24 Stunden self.backup_manager.create_backup() last_backup = now # Wöchentliche Bereinigung alter Backups (alle 7 Tage) if (now - last_cleanup).total_seconds() > 604800: # 7 Tage self.backup_manager.cleanup_old_backups() last_cleanup = now # Tägliche Optimierung (alle 24 Stunden) if (now - last_optimization).total_seconds() > 86400: # 24 Stunden self.monitor.optimize_database() last_optimization = now # 1 Stunde warten bis zum nächsten Check time.sleep(3600) except Exception as e: logger.error(f"Fehler im Wartungs-Scheduler: {str(e)}") time.sleep(300) # 5 Minuten warten bei Fehlern # ===== GLOBALE INSTANZEN ===== # Globale Instanzen für einfachen Zugriff backup_manager = DatabaseBackupManager() database_monitor = DatabaseMonitor() maintenance_scheduler = DatabaseMaintenanceScheduler() # Automatisch starten maintenance_scheduler.start_maintenance_scheduler()