425 lines
16 KiB
Python
425 lines
16 KiB
Python
"""
|
|
Erweiterte Datenbank-Utilities für Backup, Monitoring und Wartung.
|
|
"""
|
|
|
|
import os
|
|
import shutil
|
|
import sqlite3
|
|
import threading
|
|
import time
|
|
import gzip
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional, Tuple
|
|
from pathlib import Path
|
|
|
|
from sqlalchemy import text
|
|
from sqlalchemy.engine import Engine
|
|
|
|
from config.settings import DATABASE_PATH
|
|
from utils.logging_config import get_logger
|
|
from models import get_cached_session, create_optimized_engine
|
|
|
|
logger = get_logger("database")
|
|
|
|
# ===== BACKUP-SYSTEM =====
|
|
|
|
class DatabaseBackupManager:
|
|
"""
|
|
Verwaltet automatische Datenbank-Backups mit Rotation.
|
|
"""
|
|
|
|
def __init__(self, backup_dir: str = None):
|
|
self.backup_dir = backup_dir or os.path.join(os.path.dirname(DATABASE_PATH), "backups")
|
|
self.ensure_backup_directory()
|
|
self._backup_lock = threading.Lock()
|
|
|
|
def ensure_backup_directory(self):
|
|
"""Stellt sicher, dass das Backup-Verzeichnis existiert."""
|
|
Path(self.backup_dir).mkdir(parents=True, exist_ok=True)
|
|
|
|
def create_backup(self, compress: bool = True) -> str:
|
|
"""
|
|
Erstellt ein Backup der Datenbank.
|
|
|
|
Args:
|
|
compress: Ob das Backup komprimiert werden soll
|
|
|
|
Returns:
|
|
str: Pfad zum erstellten Backup
|
|
"""
|
|
with self._backup_lock:
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
backup_filename = f"myp_backup_{timestamp}.db"
|
|
|
|
if compress:
|
|
backup_filename += ".gz"
|
|
|
|
backup_path = os.path.join(self.backup_dir, backup_filename)
|
|
|
|
try:
|
|
if compress:
|
|
# Komprimiertes Backup erstellen
|
|
with open(DATABASE_PATH, 'rb') as f_in:
|
|
with gzip.open(backup_path, 'wb') as f_out:
|
|
shutil.copyfileobj(f_in, f_out)
|
|
else:
|
|
# Einfache Kopie
|
|
shutil.copy2(DATABASE_PATH, backup_path)
|
|
|
|
logger.info(f"Datenbank-Backup erstellt: {backup_path}")
|
|
return backup_path
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Erstellen des Backups: {str(e)}")
|
|
raise
|
|
|
|
def restore_backup(self, backup_path: str) -> bool:
|
|
"""
|
|
Stellt ein Backup wieder her.
|
|
|
|
Args:
|
|
backup_path: Pfad zum Backup
|
|
|
|
Returns:
|
|
bool: True bei Erfolg
|
|
"""
|
|
with self._backup_lock:
|
|
try:
|
|
# Aktuelles Backup der bestehenden DB erstellen
|
|
current_backup = self.create_backup()
|
|
logger.info(f"Sicherheitsbackup erstellt: {current_backup}")
|
|
|
|
if backup_path.endswith('.gz'):
|
|
# Komprimiertes Backup wiederherstellen
|
|
with gzip.open(backup_path, 'rb') as f_in:
|
|
with open(DATABASE_PATH, 'wb') as f_out:
|
|
shutil.copyfileobj(f_in, f_out)
|
|
else:
|
|
# Einfache Kopie
|
|
shutil.copy2(backup_path, DATABASE_PATH)
|
|
|
|
logger.info(f"Datenbank aus Backup wiederhergestellt: {backup_path}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Wiederherstellen des Backups: {str(e)}")
|
|
return False
|
|
|
|
def cleanup_old_backups(self, keep_days: int = 30):
|
|
"""
|
|
Löscht alte Backups.
|
|
|
|
Args:
|
|
keep_days: Anzahl Tage, die Backups aufbewahrt werden sollen
|
|
"""
|
|
cutoff_date = datetime.now() - timedelta(days=keep_days)
|
|
deleted_count = 0
|
|
|
|
try:
|
|
for filename in os.listdir(self.backup_dir):
|
|
if filename.startswith("myp_backup_"):
|
|
file_path = os.path.join(self.backup_dir, filename)
|
|
file_time = datetime.fromtimestamp(os.path.getctime(file_path))
|
|
|
|
if file_time < cutoff_date:
|
|
os.remove(file_path)
|
|
deleted_count += 1
|
|
logger.info(f"Altes Backup gelöscht: {filename}")
|
|
|
|
if deleted_count > 0:
|
|
logger.info(f"{deleted_count} alte Backups gelöscht")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Bereinigen alter Backups: {str(e)}")
|
|
|
|
def get_backup_list(self) -> List[Dict]:
|
|
"""
|
|
Gibt eine Liste aller verfügbaren Backups zurück.
|
|
|
|
Returns:
|
|
List[Dict]: Liste mit Backup-Informationen
|
|
"""
|
|
backups = []
|
|
|
|
try:
|
|
for filename in os.listdir(self.backup_dir):
|
|
if filename.startswith("myp_backup_"):
|
|
file_path = os.path.join(self.backup_dir, filename)
|
|
file_stat = os.stat(file_path)
|
|
|
|
backups.append({
|
|
"filename": filename,
|
|
"path": file_path,
|
|
"size": file_stat.st_size,
|
|
"created": datetime.fromtimestamp(file_stat.st_ctime),
|
|
"compressed": filename.endswith('.gz')
|
|
})
|
|
|
|
# Nach Erstellungsdatum sortieren (neueste zuerst)
|
|
backups.sort(key=lambda x: x['created'], reverse=True)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Abrufen der Backup-Liste: {str(e)}")
|
|
|
|
return backups
|
|
|
|
|
|
# ===== DATENBANK-MONITORING =====
|
|
|
|
class DatabaseMonitor:
|
|
"""
|
|
Überwacht die Datenbank-Performance und -Gesundheit.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.engine = create_optimized_engine()
|
|
|
|
def get_database_stats(self) -> Dict:
|
|
"""
|
|
Sammelt Datenbank-Statistiken.
|
|
|
|
Returns:
|
|
Dict: Datenbank-Statistiken
|
|
"""
|
|
stats = {}
|
|
|
|
try:
|
|
with self.engine.connect() as conn:
|
|
# Datenbankgröße
|
|
result = conn.execute(text("SELECT page_count * page_size as size FROM pragma_page_count(), pragma_page_size()"))
|
|
db_size = result.fetchone()[0]
|
|
stats['database_size_bytes'] = db_size
|
|
stats['database_size_mb'] = round(db_size / (1024 * 1024), 2)
|
|
|
|
# WAL-Datei-Größe
|
|
wal_path = DATABASE_PATH + "-wal"
|
|
if os.path.exists(wal_path):
|
|
wal_size = os.path.getsize(wal_path)
|
|
stats['wal_size_bytes'] = wal_size
|
|
stats['wal_size_mb'] = round(wal_size / (1024 * 1024), 2)
|
|
else:
|
|
stats['wal_size_bytes'] = 0
|
|
stats['wal_size_mb'] = 0
|
|
|
|
# Journal-Modus
|
|
result = conn.execute(text("PRAGMA journal_mode"))
|
|
stats['journal_mode'] = result.fetchone()[0]
|
|
|
|
# Cache-Statistiken
|
|
result = conn.execute(text("PRAGMA cache_size"))
|
|
stats['cache_size'] = result.fetchone()[0]
|
|
|
|
# Synchronous-Modus
|
|
result = conn.execute(text("PRAGMA synchronous"))
|
|
stats['synchronous_mode'] = result.fetchone()[0]
|
|
|
|
# Tabellen-Statistiken
|
|
result = conn.execute(text("""
|
|
SELECT name,
|
|
(SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=m.name) as table_count
|
|
FROM sqlite_master m WHERE type='table'
|
|
"""))
|
|
|
|
table_stats = {}
|
|
for table_name, _ in result.fetchall():
|
|
if not table_name.startswith('sqlite_'):
|
|
count_result = conn.execute(text(f"SELECT COUNT(*) FROM {table_name}"))
|
|
table_stats[table_name] = count_result.fetchone()[0]
|
|
|
|
stats['table_counts'] = table_stats
|
|
|
|
# Letzte Wartung
|
|
stats['last_analyze'] = self._get_last_analyze_time()
|
|
stats['last_vacuum'] = self._get_last_vacuum_time()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Sammeln der Datenbank-Statistiken: {str(e)}")
|
|
stats['error'] = str(e)
|
|
|
|
return stats
|
|
|
|
def _get_last_analyze_time(self) -> Optional[str]:
|
|
"""Ermittelt den Zeitpunkt der letzten ANALYZE-Operation."""
|
|
try:
|
|
# SQLite speichert keine direkten Timestamps für ANALYZE
|
|
# Wir verwenden die Modifikationszeit der Statistik-Tabellen
|
|
stat_path = DATABASE_PATH + "-stat"
|
|
if os.path.exists(stat_path):
|
|
return datetime.fromtimestamp(os.path.getmtime(stat_path)).isoformat()
|
|
except:
|
|
pass
|
|
return None
|
|
|
|
def _get_last_vacuum_time(self) -> Optional[str]:
|
|
"""Ermittelt den Zeitpunkt der letzten VACUUM-Operation."""
|
|
try:
|
|
# Approximation über Datei-Modifikationszeit
|
|
return datetime.fromtimestamp(os.path.getmtime(DATABASE_PATH)).isoformat()
|
|
except:
|
|
pass
|
|
return None
|
|
|
|
def check_database_health(self) -> Dict:
|
|
"""
|
|
Führt eine Gesundheitsprüfung der Datenbank durch.
|
|
|
|
Returns:
|
|
Dict: Gesundheitsstatus
|
|
"""
|
|
health = {
|
|
"status": "healthy",
|
|
"issues": [],
|
|
"recommendations": []
|
|
}
|
|
|
|
try:
|
|
with self.engine.connect() as conn:
|
|
# Integritätsprüfung
|
|
result = conn.execute(text("PRAGMA integrity_check"))
|
|
integrity_result = result.fetchone()[0]
|
|
|
|
if integrity_result != "ok":
|
|
health["status"] = "critical"
|
|
health["issues"].append(f"Integritätsprüfung fehlgeschlagen: {integrity_result}")
|
|
|
|
# WAL-Dateigröße prüfen
|
|
wal_path = DATABASE_PATH + "-wal"
|
|
if os.path.exists(wal_path):
|
|
wal_size_mb = os.path.getsize(wal_path) / (1024 * 1024)
|
|
if wal_size_mb > 100: # Über 100MB
|
|
health["issues"].append(f"WAL-Datei sehr groß: {wal_size_mb:.1f}MB")
|
|
health["recommendations"].append("WAL-Checkpoint durchführen")
|
|
|
|
# Freier Speicherplatz prüfen
|
|
db_dir = os.path.dirname(DATABASE_PATH)
|
|
free_space = shutil.disk_usage(db_dir).free / (1024 * 1024 * 1024) # GB
|
|
|
|
if free_space < 1: # Weniger als 1GB
|
|
health["status"] = "warning" if health["status"] == "healthy" else health["status"]
|
|
health["issues"].append(f"Wenig freier Speicherplatz: {free_space:.1f}GB")
|
|
health["recommendations"].append("Speicherplatz freigeben oder alte Backups löschen")
|
|
|
|
# Connection Pool Status (falls verfügbar)
|
|
# Hier könnten weitere Checks hinzugefügt werden
|
|
|
|
except Exception as e:
|
|
health["status"] = "error"
|
|
health["issues"].append(f"Fehler bei Gesundheitsprüfung: {str(e)}")
|
|
logger.error(f"Fehler bei Datenbank-Gesundheitsprüfung: {str(e)}")
|
|
|
|
return health
|
|
|
|
def optimize_database(self) -> Dict:
|
|
"""
|
|
Führt Optimierungsoperationen auf der Datenbank durch.
|
|
|
|
Returns:
|
|
Dict: Ergebnis der Optimierung
|
|
"""
|
|
result = {
|
|
"operations": [],
|
|
"success": True,
|
|
"errors": []
|
|
}
|
|
|
|
try:
|
|
with self.engine.connect() as conn:
|
|
# ANALYZE für bessere Query-Planung
|
|
conn.execute(text("ANALYZE"))
|
|
result["operations"].append("ANALYZE ausgeführt")
|
|
|
|
# WAL-Checkpoint
|
|
checkpoint_result = conn.execute(text("PRAGMA wal_checkpoint(TRUNCATE)"))
|
|
checkpoint_info = checkpoint_result.fetchone()
|
|
result["operations"].append(f"WAL-Checkpoint: {checkpoint_info}")
|
|
|
|
# Incremental Vacuum
|
|
conn.execute(text("PRAGMA incremental_vacuum"))
|
|
result["operations"].append("Incremental Vacuum ausgeführt")
|
|
|
|
# Optimize Pragma
|
|
conn.execute(text("PRAGMA optimize"))
|
|
result["operations"].append("PRAGMA optimize ausgeführt")
|
|
|
|
conn.commit()
|
|
|
|
except Exception as e:
|
|
result["success"] = False
|
|
result["errors"].append(str(e))
|
|
logger.error(f"Fehler bei Datenbank-Optimierung: {str(e)}")
|
|
|
|
return result
|
|
|
|
|
|
# ===== AUTOMATISCHE WARTUNG =====
|
|
|
|
class DatabaseMaintenanceScheduler:
|
|
"""
|
|
Plant und führt automatische Wartungsaufgaben durch.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.backup_manager = DatabaseBackupManager()
|
|
self.monitor = DatabaseMonitor()
|
|
self._running = False
|
|
self._thread = None
|
|
|
|
def start_maintenance_scheduler(self):
|
|
"""Startet den Wartungs-Scheduler."""
|
|
if self._running:
|
|
return
|
|
|
|
self._running = True
|
|
self._thread = threading.Thread(target=self._maintenance_loop, daemon=True)
|
|
self._thread.start()
|
|
logger.info("Datenbank-Wartungs-Scheduler gestartet")
|
|
|
|
def stop_maintenance_scheduler(self):
|
|
"""Stoppt den Wartungs-Scheduler."""
|
|
self._running = False
|
|
if self._thread:
|
|
self._thread.join(timeout=5)
|
|
logger.info("Datenbank-Wartungs-Scheduler gestoppt")
|
|
|
|
def _maintenance_loop(self):
|
|
"""Hauptschleife für Wartungsaufgaben."""
|
|
last_backup = datetime.now()
|
|
last_cleanup = datetime.now()
|
|
last_optimization = datetime.now()
|
|
|
|
while self._running:
|
|
try:
|
|
now = datetime.now()
|
|
|
|
# Tägliches Backup (alle 24 Stunden)
|
|
if (now - last_backup).total_seconds() > 86400: # 24 Stunden
|
|
self.backup_manager.create_backup()
|
|
last_backup = now
|
|
|
|
# Wöchentliche Bereinigung alter Backups (alle 7 Tage)
|
|
if (now - last_cleanup).total_seconds() > 604800: # 7 Tage
|
|
self.backup_manager.cleanup_old_backups()
|
|
last_cleanup = now
|
|
|
|
# Tägliche Optimierung (alle 24 Stunden)
|
|
if (now - last_optimization).total_seconds() > 86400: # 24 Stunden
|
|
self.monitor.optimize_database()
|
|
last_optimization = now
|
|
|
|
# 1 Stunde warten bis zum nächsten Check
|
|
time.sleep(3600)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler im Wartungs-Scheduler: {str(e)}")
|
|
time.sleep(300) # 5 Minuten warten bei Fehlern
|
|
|
|
|
|
# ===== GLOBALE INSTANZEN =====
|
|
|
|
# Globale Instanzen für einfachen Zugriff
|
|
backup_manager = DatabaseBackupManager()
|
|
database_monitor = DatabaseMonitor()
|
|
maintenance_scheduler = DatabaseMaintenanceScheduler()
|
|
|
|
# Automatisch starten
|
|
maintenance_scheduler.start_maintenance_scheduler() |