"""
Zentralisierte Datenbank-Operationen für das MYP System
Konsolidierte Implementierung aller datenbankbezogenen Funktionen:
- CRUD-Operationen (ursprünglich db_manager.py)
- Backup-Verwaltung (ursprünglich database_utils.py)
- Cleanup-Operationen (ursprünglich database_cleanup.py)
- Einheitliches Session-Management
Optimierungen:
- Intelligente Session-Factory basierend auf Operationstyp
- Zentrale Engine-Registry für verschiedene Anwendungsfälle
- Koordinierte Lock-Behandlung und Retry-Logik
- Vereinheitlichte Error-Handling-Patterns
Autor: MYP Team - Konsolidiert für IHK-Projektarbeit
Datum: 2025-06-09
"""
import os
import shutil
import sqlite3
import threading
import time
import gzip
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any, Union
from pathlib import Path
from contextlib import contextmanager
from sqlalchemy import text, create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.exc import SQLAlchemyError, OperationalError
from utils.settings import DATABASE_PATH
from utils.logging_config import get_logger
from models import get_cached_session, create_optimized_engine, User, Printer, Job
# ===== CENTRAL LOGGER =====
db_logger = get_logger("database_core")
# ===== ENGINE REGISTRY =====

class EngineRegistry:
    """
    Central registry for the various database engine configurations.

    Avoids duplication and provides engines tuned for specific use cases.
    """

    def __init__(self):
        self.engines: Dict[str, Engine] = {}
        self._lock = threading.RLock()

    def get_engine(self, engine_type: str = 'default') -> Engine:
        """
        Returns (and lazily creates) an engine for the given type.

        Args:
            engine_type: Kind of engine ('default', 'cleanup', 'monitoring', 'backup')

        Returns:
            Engine: Configured SQLAlchemy engine
        """
        with self._lock:
            if engine_type not in self.engines:
                self.engines[engine_type] = self._create_engine(engine_type)
            return self.engines[engine_type]

    def _create_engine(self, engine_type: str) -> Engine:
        """Creates an engine optimized for the given use case."""
        base_url = f"sqlite:///{DATABASE_PATH}"

        if engine_type == 'default':
            # Standard engine for CRUD operations
            return create_optimized_engine()

        elif engine_type == 'cleanup':
            # Engine for cleanup operations with aggressive timeouts
            return create_engine(
                base_url,
                pool_timeout=1.0,
                pool_recycle=300,
                pool_pre_ping=True,
                connect_args={
                    'timeout': 5,
                    'check_same_thread': False,
                    'isolation_level': None  # autocommit for cleanup
                }
            )

        elif engine_type == 'monitoring':
            # Engine for monitoring with minimal blocking
            return create_engine(
                base_url,
                pool_timeout=0.5,
                pool_recycle=60,
                connect_args={
                    'timeout': 2,
                    'check_same_thread': False
                }
            )

        elif engine_type == 'backup':
            # Engine for backup operations with generous timeouts
            return create_engine(
                base_url,
                pool_timeout=30.0,
                pool_recycle=3600,
                connect_args={
                    'timeout': 30,
                    'check_same_thread': False
                }
            )

        else:
            db_logger.warning(f"Unknown engine type '{engine_type}', using default")
            return create_optimized_engine()

    def dispose_all(self):
        """Disposes all registered engines."""
        with self._lock:
            for engine_type, engine in self.engines.items():
                try:
                    engine.dispose()
                    db_logger.debug(f"Engine '{engine_type}' disposed successfully")
                except Exception as e:
                    db_logger.warning(f"Error disposing engine '{engine_type}': {e}")
            self.engines.clear()


# Global engine registry
engine_registry = EngineRegistry()
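
# Example (sketch): how a caller might use the registry. The per-type pool
# timeouts configured above mean a 'monitoring' engine fails fast instead of
# queueing behind a long-running writer. Illustrative helper only; it is
# never called at import time.
def _example_registry_usage() -> None:
    engine = engine_registry.get_engine('monitoring')
    with engine.connect() as connection:
        connection.execute(text("SELECT 1"))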

# ===== SESSION MANAGEMENT =====

@contextmanager
def get_database_session(operation_type: str = 'default'):
    """
    Intelligent session manager that picks a session based on the operation type.

    Args:
        operation_type: Kind of operation ('default', 'cleanup', 'monitoring', 'backup', 'cached')

    Yields:
        Session: Configured SQLAlchemy session
    """
    if operation_type == 'cached':
        # Use the existing cached-session system for standard CRUD
        session = get_cached_session()
        try:
            yield session
        finally:
            # Cached sessions are managed automatically
            pass
    else:
        # Create a fresh session for special operations
        engine = engine_registry.get_engine(operation_type)
        SessionClass = sessionmaker(bind=engine)
        session = SessionClass()
        try:
            yield session
        except Exception as e:
            try:
                session.rollback()
                db_logger.error(f"Session rollback for {operation_type}: {e}")
            except Exception as rollback_error:
                db_logger.error(f"Session rollback failed for {operation_type}: {rollback_error}")
            raise
        finally:
            try:
                session.close()
            except Exception as close_error:
                db_logger.warning(f"Session close failed for {operation_type}: {close_error}")

# ===== CLEANUP OPERATIONS =====

class DatabaseCleanupManager:
    """
    Robust cleanup operations with intelligent retry logic.

    Consolidates functionality from database_cleanup.py.
    """

    def __init__(self):
        self.cleanup_logger = get_logger("database_cleanup")
        self._registered_engines = set()

    def register_engine_for_cleanup(self, engine: Engine):
        """Registers an engine so it gets disposed before WAL operations."""
        self._registered_engines.add(engine)

    def force_close_all_connections(self):
        """Forcibly closes all open database connections."""
        try:
            # Dispose the standard engine registry
            engine_registry.dispose_all()

            # Dispose individually registered engines
            for engine in self._registered_engines:
                try:
                    engine.dispose()
                except Exception as e:
                    self.cleanup_logger.warning(f"Failed to dispose registered engine: {e}")
            self._registered_engines.clear()

            # Brief pause to let the connections close
            time.sleep(0.5)
            self.cleanup_logger.info("All database connections forcefully closed")
        except Exception as e:
            self.cleanup_logger.error(f"Error during connection cleanup: {e}")

    def perform_wal_checkpoint(self, retries: int = 3) -> bool:
        """
        Performs a WAL checkpoint with retry logic.

        Args:
            retries: Number of retry attempts

        Returns:
            bool: True on success
        """
        for attempt in range(retries):
            try:
                if attempt > 0:
                    self.force_close_all_connections()
                    time.sleep(1.0 * attempt)  # backoff grows linearly with each attempt

                # Direct sqlite3 connection for maximum control
                conn = sqlite3.connect(DATABASE_PATH, timeout=10.0)
                cursor = conn.cursor()

                try:
                    # Run the WAL checkpoint
                    cursor.execute("PRAGMA wal_checkpoint(TRUNCATE)")
                    result = cursor.fetchone()
                    conn.commit()
                    conn.close()

                    self.cleanup_logger.info(f"WAL checkpoint successful on attempt {attempt + 1}: {result}")
                    return True
                except sqlite3.OperationalError as e:
                    conn.close()
                    if "database is locked" in str(e).lower() and attempt < retries - 1:
                        self.cleanup_logger.warning(f"Database locked on attempt {attempt + 1}, retrying...")
                        continue
                    else:
                        raise
            except Exception as e:
                self.cleanup_logger.error(f"WAL checkpoint attempt {attempt + 1} failed: {e}")
                if attempt == retries - 1:
                    return False

        return False

    def switch_journal_mode(self, mode: str = "WAL") -> bool:
        """
        Switches the database journal mode.

        Args:
            mode: Journal mode ('WAL', 'DELETE', 'TRUNCATE', etc.)

        Returns:
            bool: True on success
        """
        try:
            self.force_close_all_connections()
            time.sleep(1.0)

            conn = sqlite3.connect(DATABASE_PATH, timeout=15.0)
            cursor = conn.cursor()

            try:
                cursor.execute(f"PRAGMA journal_mode = {mode}")
                result = cursor.fetchone()
                conn.commit()
                conn.close()

                self.cleanup_logger.info(f"Journal mode switched to {mode}: {result}")
                return True
            except Exception as e:
                conn.close()
                self.cleanup_logger.error(f"Failed to switch journal mode to {mode}: {e}")
                return False
        except Exception as e:
            self.cleanup_logger.error(f"Error during journal mode switch: {e}")
            return False
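
# Example (sketch): a pre-shutdown sequence built on the cleanup manager.
# PRAGMA wal_checkpoint(TRUNCATE) folds the write-ahead log back into the
# main database file, so a subsequent file-level copy captures a consistent
# state; switching to DELETE journaling removes leftover -wal/-shm files.
# Illustrative only, never called at import time.
def _example_cleanup_before_shutdown() -> None:
    manager = DatabaseCleanupManager()
    if not manager.perform_wal_checkpoint(retries=3):
        # Last resort if checkpointing keeps failing under lock contention
        manager.switch_journal_mode("DELETE")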

# ===== BACKUP OPERATIONS =====

class DatabaseBackupManager:
    """
    Extended backup management with automatic rotation.

    Consolidates functionality from database_utils.py.
    """

    def __init__(self, backup_dir: str = None):
        self.backup_dir = backup_dir or os.path.join(os.path.dirname(DATABASE_PATH), "backups")
        self.backup_logger = get_logger("database_backup")
        self.ensure_backup_directory()
        self._backup_lock = threading.Lock()

    def ensure_backup_directory(self):
        """Makes sure the backup directory exists."""
        Path(self.backup_dir).mkdir(parents=True, exist_ok=True)

    def create_backup(self, compress: bool = True) -> str:
        """
        Creates a backup of the database.

        Args:
            compress: Whether the backup should be gzip-compressed

        Returns:
            str: Path of the created backup
        """
        with self._backup_lock:
            try:
                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                extension = '.db.gz' if compress else '.db'
                backup_filename = f"myp_backup_{timestamp}{extension}"
                backup_path = os.path.join(self.backup_dir, backup_filename)

                # Checkpoint before the backup so the WAL is folded into the main file
                cleanup_manager = DatabaseCleanupManager()
                cleanup_manager.perform_wal_checkpoint()

                if compress:
                    # Compressed backup
                    with open(DATABASE_PATH, 'rb') as f_in:
                        with gzip.open(backup_path, 'wb') as f_out:
                            shutil.copyfileobj(f_in, f_out)
                else:
                    # Plain copy
                    shutil.copy2(DATABASE_PATH, backup_path)

                backup_size = os.path.getsize(backup_path)
                self.backup_logger.info(f"Backup created: {backup_filename} ({backup_size / 1024 / 1024:.2f} MB)")
                return backup_path
            except Exception as e:
                self.backup_logger.error(f"Backup creation failed: {e}")
                raise

    def list_backups(self) -> List[Dict[str, Any]]:
        """
        Lists all available backups.

        Returns:
            List[Dict]: Backup metadata, newest first
        """
        try:
            backups = []
            backup_pattern = "myp_backup_*.db*"

            for backup_file in Path(self.backup_dir).glob(backup_pattern):
                stat = backup_file.stat()
                backups.append({
                    'filename': backup_file.name,
                    'path': str(backup_file),
                    'size_bytes': stat.st_size,
                    'size_mb': round(stat.st_size / 1024 / 1024, 2),
                    # st_ctime is metadata-change time on Unix; backups are
                    # written once, so it is close enough to creation time.
                    'created_at': datetime.fromtimestamp(stat.st_ctime),
                    'compressed': backup_file.suffix == '.gz'
                })

            # Sort by date, newest first
            backups.sort(key=lambda x: x['created_at'], reverse=True)
            return backups
        except Exception as e:
            self.backup_logger.error(f"Error listing backups: {e}")
            return []

    def cleanup_old_backups(self, keep_count: int = 10) -> int:
        """
        Removes old backups, keeping only the most recent ones.

        Args:
            keep_count: Number of backups to keep

        Returns:
            int: Number of deleted backups
        """
        try:
            backups = self.list_backups()
            if len(backups) <= keep_count:
                return 0

            backups_to_delete = backups[keep_count:]
            deleted_count = 0

            for backup in backups_to_delete:
                try:
                    os.remove(backup['path'])
                    deleted_count += 1
                    self.backup_logger.debug(f"Deleted old backup: {backup['filename']}")
                except Exception as e:
                    self.backup_logger.warning(f"Failed to delete backup {backup['filename']}: {e}")

            self.backup_logger.info(f"Cleaned up {deleted_count} old backups, kept {keep_count}")
            return deleted_count
        except Exception as e:
            self.backup_logger.error(f"Error during backup cleanup: {e}")
            return 0

    def restore_backup(self, backup_path: str) -> bool:
        """
        Restores a backup.

        Args:
            backup_path: Path of the backup file

        Returns:
            bool: True on success
        """
        try:
            if not os.path.exists(backup_path):
                self.backup_logger.error(f"Backup file not found: {backup_path}")
                return False

            # Close all connections first
            cleanup_manager = DatabaseCleanupManager()
            cleanup_manager.force_close_all_connections()
            time.sleep(2.0)

            # Back up the current database before overwriting it
            current_backup = self.create_backup(compress=True)
            self.backup_logger.info(f"Current database backed up to: {current_backup}")

            # Restore the backup
            if backup_path.endswith('.gz'):
                # Decompress the backup
                with gzip.open(backup_path, 'rb') as f_in:
                    with open(DATABASE_PATH, 'wb') as f_out:
                        shutil.copyfileobj(f_in, f_out)
            else:
                # Plain copy
                shutil.copy2(backup_path, DATABASE_PATH)

            self.backup_logger.info(f"Database restored from: {backup_path}")
            return True
        except Exception as e:
            self.backup_logger.error(f"Backup restoration failed: {e}")
            return False
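
# Example (sketch): a nightly backup routine combining creation and rotation.
# The keep_count of 10 matches the default used by perform_maintenance()
# below; illustrative only, never called at import time.
def _example_nightly_backup() -> None:
    manager = DatabaseBackupManager()
    backup_path = manager.create_backup(compress=True)
    deleted = manager.cleanup_old_backups(keep_count=10)
    db_logger.info(f"Nightly backup {backup_path} written, {deleted} old backups removed")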

# ===== CRUD OPERATIONS =====

class DatabaseCRUDManager:
    """
    Business-logic-oriented CRUD operations.

    Consolidates functionality from db_manager.py.
    """

    def __init__(self):
        self.crud_logger = get_logger("database_crud")

    def get_active_jobs(self, limit: int = None) -> List[Job]:
        """
        Fetches active jobs with optimized loading.

        Args:
            limit: Maximum number of jobs

        Returns:
            List[Job]: List of active jobs
        """
        try:
            with get_database_session('cached') as session:
                query = session.query(Job).filter(
                    Job.status.in_(['pending', 'printing', 'paused'])
                ).order_by(Job.created_at.desc())

                if limit:
                    query = query.limit(limit)

                jobs = query.all()
                self.crud_logger.debug(f"Retrieved {len(jobs)} active jobs")
                return jobs
        except Exception as e:
            self.crud_logger.error(f"Error retrieving active jobs: {e}")
            return []

    def get_printer_with_jobs(self, printer_id: int) -> Optional[Printer]:
        """
        Fetches a printer together with its jobs (eager loading).

        Args:
            printer_id: ID of the printer

        Returns:
            Optional[Printer]: Printer with jobs, or None
        """
        try:
            with get_database_session('cached') as session:
                from sqlalchemy.orm import joinedload

                printer = session.query(Printer).options(
                    joinedload(Printer.jobs)
                ).filter(Printer.id == printer_id).first()

                if printer:
                    self.crud_logger.debug(f"Retrieved printer {printer.name} with {len(printer.jobs)} jobs")
                return printer
        except Exception as e:
            self.crud_logger.error(f"Error retrieving printer with jobs: {e}")
            return None

    def get_user_job_statistics(self, user_id: int) -> Dict[str, Any]:
        """
        Fetches job statistics for a user.

        Args:
            user_id: ID of the user

        Returns:
            Dict: Statistics for the user
        """
        try:
            with get_database_session('cached') as session:
                user = session.query(User).filter(User.id == user_id).first()
                if not user:
                    return {}

                # Compute job statistics
                total_jobs = session.query(Job).filter(Job.user_id == user_id).count()
                completed_jobs = session.query(Job).filter(
                    Job.user_id == user_id, Job.status == 'completed'
                ).count()
                active_jobs = session.query(Job).filter(
                    Job.user_id == user_id, Job.status.in_(['pending', 'printing', 'paused'])
                ).count()

                stats = {
                    'user_id': user_id,
                    'username': user.username,
                    'total_jobs': total_jobs,
                    'completed_jobs': completed_jobs,
                    'active_jobs': active_jobs,
                    'success_rate': round(completed_jobs / total_jobs * 100, 2) if total_jobs > 0 else 0
                }

                self.crud_logger.debug(f"Generated statistics for user {user.username}")
                return stats
        except Exception as e:
            self.crud_logger.error(f"Error generating user statistics: {e}")
            return {}
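
# Example (sketch): consuming the per-user statistics dict built above. The
# keys mirror the stats dictionary returned by get_user_job_statistics();
# illustrative only, never called at import time.
def _example_user_statistics(user_id: int = 1) -> None:
    crud = DatabaseCRUDManager()
    stats = crud.get_user_job_statistics(user_id)
    if stats:
        print(f"{stats['username']}: {stats['completed_jobs']}/{stats['total_jobs']} "
              f"jobs completed ({stats['success_rate']}%)")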

# ===== MONITORING OPERATIONS =====

class DatabaseMonitor:
    """
    Performance monitoring and health checks.

    Extends functionality from database_utils.py.
    """

    def __init__(self):
        self.monitor_logger = get_logger("database_monitor")

    def get_database_health_check(self) -> Dict[str, Any]:
        """
        Comprehensive database health check.

        Returns:
            Dict: Health status of the database
        """
        health_status = {
            'timestamp': datetime.now().isoformat(),
            'overall_status': 'unknown',
            'checks': {}
        }

        try:
            with get_database_session('monitoring') as session:
                # 1. Connection test
                try:
                    session.execute(text("SELECT 1"))
                    health_status['checks']['connection'] = {'status': 'ok', 'message': 'Database connection successful'}
                except Exception as e:
                    health_status['checks']['connection'] = {'status': 'error', 'message': str(e)}

                # 2. Integrity check
                try:
                    result = session.execute(text("PRAGMA integrity_check")).fetchone()
                    integrity_ok = result and result[0] == 'ok'
                    health_status['checks']['integrity'] = {
                        'status': 'ok' if integrity_ok else 'warning',
                        'message': result[0] if result else 'No integrity result'
                    }
                except Exception as e:
                    health_status['checks']['integrity'] = {'status': 'error', 'message': str(e)}

                # 3. WAL status
                try:
                    wal_result = session.execute(text("PRAGMA journal_mode")).fetchone()
                    wal_mode = wal_result[0] if wal_result else 'unknown'
                    health_status['checks']['wal_mode'] = {
                        'status': 'ok' if wal_mode == 'wal' else 'info',
                        'message': f'Journal mode: {wal_mode}'
                    }
                except Exception as e:
                    health_status['checks']['wal_mode'] = {'status': 'error', 'message': str(e)}

                # 4. Database size
                try:
                    if os.path.exists(DATABASE_PATH):
                        db_size = os.path.getsize(DATABASE_PATH)
                        health_status['checks']['database_size'] = {
                            'status': 'ok',
                            'message': f'Database size: {db_size / 1024 / 1024:.2f} MB',
                            'size_bytes': db_size
                        }
                except Exception as e:
                    health_status['checks']['database_size'] = {'status': 'error', 'message': str(e)}

            # Derive the overall status from the individual checks
            statuses = [check['status'] for check in health_status['checks'].values()]
            if 'error' in statuses:
                health_status['overall_status'] = 'error'
            elif 'warning' in statuses:
                health_status['overall_status'] = 'warning'
            else:
                health_status['overall_status'] = 'ok'
        except Exception as e:
            health_status['overall_status'] = 'error'
            health_status['error'] = str(e)
            self.monitor_logger.error(f"Database health check failed: {e}")

        return health_status
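
# Example (sketch): wiring the health check into an alerting hook. The
# overall_status field aggregates the individual checks ('error' beats
# 'warning' beats 'ok'), so a caller needs only one comparison; illustrative
# only, never called at import time.
def _example_health_alert() -> None:
    monitor = DatabaseMonitor()
    health = monitor.get_database_health_check()
    if health['overall_status'] != 'ok':
        for name, check in health['checks'].items():
            if check['status'] in ('warning', 'error'):
                db_logger.warning(f"Health check '{name}': {check['message']}")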

# ===== UNIFIED DATABASE SERVICE =====

class UnifiedDatabaseService:
    """
    Central interface for all database operations.

    Combines CRUD, maintenance, cleanup, and monitoring.
    """

    def __init__(self):
        self.logger = get_logger("unified_database")
        self.crud = DatabaseCRUDManager()
        self.backup = DatabaseBackupManager()
        self.cleanup = DatabaseCleanupManager()
        self.monitor = DatabaseMonitor()

        # Register engines for cleanup
        for engine_type in ['default', 'monitoring', 'backup']:
            engine = engine_registry.get_engine(engine_type)
            self.cleanup.register_engine_for_cleanup(engine)

    def get_service_status(self) -> Dict[str, Any]:
        """
        Returns the status of all database services.

        Returns:
            Dict: Comprehensive service status
        """
        try:
            health_check = self.monitor.get_database_health_check()
            backups = self.backup.list_backups()

            return {
                'timestamp': datetime.now().isoformat(),
                'health': health_check,
                'backups': {
                    'count': len(backups),
                    'latest': backups[0] if backups else None
                },
                'engines': {
                    'registered_count': len(engine_registry.engines),
                    'types': list(engine_registry.engines.keys())
                }
            }
        except Exception as e:
            self.logger.error(f"Error getting service status: {e}")
            return {'error': str(e), 'timestamp': datetime.now().isoformat()}

    def perform_maintenance(self) -> Dict[str, Any]:
        """
        Performs comprehensive database maintenance.

        Returns:
            Dict: Maintenance results
        """
        maintenance_results = {
            'timestamp': datetime.now().isoformat(),
            'operations': {}
        }

        try:
            # 1. WAL checkpoint
            self.logger.info("Starting WAL checkpoint...")
            checkpoint_success = self.cleanup.perform_wal_checkpoint()
            maintenance_results['operations']['wal_checkpoint'] = {
                'success': checkpoint_success,
                'message': 'WAL checkpoint completed' if checkpoint_success else 'WAL checkpoint failed'
            }

            # 2. Create a backup
            self.logger.info("Creating maintenance backup...")
            try:
                backup_path = self.backup.create_backup(compress=True)
                maintenance_results['operations']['backup'] = {
                    'success': True,
                    'message': f'Backup created: {os.path.basename(backup_path)}',
                    'path': backup_path
                }
            except Exception as e:
                maintenance_results['operations']['backup'] = {
                    'success': False,
                    'message': f'Backup failed: {str(e)}'
                }

            # 3. Clean up old backups
            self.logger.info("Cleaning up old backups...")
            try:
                deleted_count = self.backup.cleanup_old_backups(keep_count=10)
                maintenance_results['operations']['backup_cleanup'] = {
                    'success': True,
                    'message': f'Cleaned up {deleted_count} old backups'
                }
            except Exception as e:
                maintenance_results['operations']['backup_cleanup'] = {
                    'success': False,
                    'message': f'Backup cleanup failed: {str(e)}'
                }

            # 4. Health check
            self.logger.info("Performing health check...")
            health_check = self.monitor.get_database_health_check()
            maintenance_results['health_check'] = health_check

            # Overall result
            operation_results = [op['success'] for op in maintenance_results['operations'].values()]
            maintenance_results['overall_success'] = all(operation_results)

            self.logger.info(f"Maintenance completed with overall success: {maintenance_results['overall_success']}")
        except Exception as e:
            self.logger.error(f"Maintenance operation failed: {e}")
            maintenance_results['error'] = str(e)
            maintenance_results['overall_success'] = False

        return maintenance_results


# ===== GLOBAL INSTANCES =====

# Central database service instance
database_service = UnifiedDatabaseService()

# Cleanup manager for legacy compatibility
cleanup_manager = database_service.cleanup

# Backup manager for legacy compatibility
backup_manager = database_service.backup
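
# Example (sketch): running the consolidated maintenance from a script or a
# scheduled task, assuming this module is importable as utils.database_core.
# Illustrative entry point; it runs only when the module is executed
# directly, never on import.
if __name__ == "__main__":
    results = database_service.perform_maintenance()
    for name, op in results['operations'].items():
        print(f"{name}: {'ok' if op['success'] else 'FAILED'} - {op['message']}")
    print(f"Overall success: {results['overall_success']}")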