"""
Centralized database operations for the MYP system.

Consolidated implementation of all database-related functionality:
- CRUD operations (originally db_manager.py)
- Backup management (originally database_utils.py)
- Cleanup operations (originally database_cleanup.py)
- Unified session management

Optimizations:
- Session factory selected by operation type
- Central engine registry for the different use cases
- Coordinated lock handling and retry logic
- Unified error-handling patterns

Author: MYP Team - consolidated for the IHK project work
Date: 2025-06-09
"""

import gzip
import os
import shutil
import sqlite3
import threading
import time
from contextlib import closing, contextmanager
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any, Union

from sqlalchemy import text, create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.exc import SQLAlchemyError, OperationalError
from sqlalchemy.orm import joinedload, sessionmaker, Session
from sqlalchemy.pool import QueuePool

from utils.settings import DATABASE_PATH
from utils.logging_config import get_logger
from models import get_cached_session, create_optimized_engine, User, Printer, Job

# ===== CENTRAL LOGGER =====
db_logger = get_logger("database_core")

# Journal modes accepted by SQLite's ``PRAGMA journal_mode``. Used to validate
# the mode before it is interpolated into the PRAGMA statement (PRAGMA values
# cannot be bound as parameters).
_VALID_JOURNAL_MODES = frozenset({"DELETE", "TRUNCATE", "PERSIST", "MEMORY", "WAL", "OFF"})

# Suffixes of the sidecar files SQLite keeps next to a database in WAL mode.
_SQLITE_SIDECAR_SUFFIXES = ("-wal", "-shm")


def _remove_if_exists(path) -> None:
    """Delete ``path`` and treat an already-missing file as success."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


# ===== ENGINE REGISTRY =====

class EngineRegistry:
    """
    Central registry for the different database engine configurations.

    Avoids duplicated engine setup and hands out one engine per use case.
    Engines are created lazily on first request and cached by type name.
    """

    def __init__(self):
        self.engines: Dict[str, Engine] = {}
        # Re-entrant so that nested registry calls on one thread cannot deadlock.
        self._lock = threading.RLock()

    def get_engine(self, engine_type: str = 'default') -> Engine:
        """
        Return the engine for ``engine_type``, creating it on first use.

        Args:
            engine_type: Kind of engine ('default', 'cleanup', 'monitoring', 'backup')

        Returns:
            Engine: Configured SQLAlchemy engine
        """
        with self._lock:
            if engine_type not in self.engines:
                self.engines[engine_type] = self._create_engine(engine_type)
            return self.engines[engine_type]

    def _create_engine(self, engine_type: str) -> Engine:
        """
        Build an engine tuned for the given use case.

        Unknown types fall back to the optimized default engine (a warning is
        logged). The special-purpose engines pin ``poolclass=QueuePool``
        because ``pool_timeout`` is only accepted by queue-based pools and the
        default pool for file-based SQLite differs between SQLAlchemy versions.
        """
        base_url = f"sqlite:///{DATABASE_PATH}"

        if engine_type == 'default':
            # Standard engine for CRUD operations.
            return create_optimized_engine()

        if engine_type == 'cleanup':
            # Cleanup engine with aggressive timeouts.
            return create_engine(
                base_url,
                poolclass=QueuePool,
                pool_timeout=1.0,
                pool_recycle=300,
                pool_pre_ping=True,
                connect_args={
                    'timeout': 5,
                    'check_same_thread': False,
                    'isolation_level': None  # autocommit for cleanup
                }
            )

        if engine_type == 'monitoring':
            # Monitoring engine that blocks as briefly as possible.
            return create_engine(
                base_url,
                poolclass=QueuePool,
                pool_timeout=0.5,
                pool_recycle=60,
                connect_args={
                    'timeout': 2,
                    'check_same_thread': False
                }
            )

        if engine_type == 'backup':
            # Backup engine with longer timeouts.
            return create_engine(
                base_url,
                poolclass=QueuePool,
                pool_timeout=30.0,
                pool_recycle=3600,
                connect_args={
                    'timeout': 30,
                    'check_same_thread': False
                }
            )

        db_logger.warning(f"Unknown engine type '{engine_type}', using default")
        return create_optimized_engine()

    def dispose_all(self):
        """Dispose every registered engine and empty the registry."""
        with self._lock:
            for engine_type, engine in self.engines.items():
                try:
                    engine.dispose()
                    db_logger.debug(f"Engine '{engine_type}' disposed successfully")
                except Exception as e:
                    # Best effort: one failing engine must not block the others.
                    db_logger.warning(f"Error disposing engine '{engine_type}': {e}")
            self.engines.clear()


# Global engine registry
engine_registry = EngineRegistry()


# ===== SESSION MANAGEMENT =====

@contextmanager
def get_database_session(operation_type: str = 'default'):
    """
    Yield a session appropriate for the given operation type.

    For 'cached' the session from ``get_cached_session()`` is yielded and is
    neither rolled back nor closed here. For every other type a fresh session
    bound to the matching registry engine is created; it is rolled back if
    the ``with`` body raises and is always closed afterwards. The session is
    never committed here - committing is left to the caller.

    Args:
        operation_type: Kind of operation
            ('default', 'cleanup', 'monitoring', 'backup', 'cached')

    Yields:
        Session: Configured SQLAlchemy session
    """
    if operation_type == 'cached':
        # Reuse the existing cached-session system for standard CRUD; its
        # lifecycle is handled outside of this context manager.
        yield get_cached_session()
        return

    # Dedicated session for the special-purpose engines.
    engine = engine_registry.get_engine(operation_type)
    session = sessionmaker(bind=engine)()
    try:
        yield session
    except Exception as e:
        try:
            session.rollback()
            db_logger.error(f"Session rollback for {operation_type}: {e}")
        except Exception as rollback_error:
            db_logger.error(f"Session rollback failed for {operation_type}: {rollback_error}")
        raise
    finally:
        try:
            session.close()
        except Exception as close_error:
            db_logger.warning(f"Session close failed for {operation_type}: {close_error}")


# ===== CLEANUP OPERATIONS =====

class DatabaseCleanupManager:
    """
    Cleanup operations with retry logic.

    Consolidates functionality from database_cleanup.py.
    """

    def __init__(self):
        self.cleanup_logger = get_logger("database_cleanup")
        self._registered_engines = set()

    def register_engine_for_cleanup(self, engine: Engine):
        """Register an engine to be disposed when connections are force-closed."""
        self._registered_engines.add(engine)

    def force_close_all_connections(self):
        """
        Dispose all engines known to the registry and to this manager.

        Errors are logged and never raised. Note that the set of registered
        engines is cleared afterwards, so engines must be re-registered if
        they should be covered by a later call.
        """
        try:
            # Dispose everything held by the global registry.
            engine_registry.dispose_all()

            # Dispose engines registered directly with this manager.
            for engine in self._registered_engines:
                try:
                    engine.dispose()
                except Exception as e:
                    self.cleanup_logger.warning(f"Failed to dispose registered engine: {e}")

            self._registered_engines.clear()

            # Short grace period after disposing the connections.
            time.sleep(0.5)

            self.cleanup_logger.info("All database connections forcefully closed")

        except Exception as e:
            self.cleanup_logger.error(f"Error during connection cleanup: {e}")

    def perform_wal_checkpoint(self, retries: int = 3) -> bool:
        """
        Run ``PRAGMA wal_checkpoint(TRUNCATE)`` with retries.

        From the second attempt on, all pooled connections are force-closed
        first and the wait grows linearly with the attempt number. A
        checkpoint that SQLite reports as blocked (first result column
        non-zero) counts as a failed attempt.

        Args:
            retries: Maximum number of attempts

        Returns:
            bool: True if a checkpoint completed, False otherwise
        """
        for attempt in range(retries):
            try:
                if attempt > 0:
                    self.force_close_all_connections()
                    time.sleep(1.0 * attempt)  # linear backoff

                # Direct sqlite3 connection for maximum control; closing()
                # guarantees the handle is released on every path.
                with closing(sqlite3.connect(DATABASE_PATH, timeout=10.0)) as conn:
                    result = conn.execute("PRAGMA wal_checkpoint(TRUNCATE)").fetchone()
                    conn.commit()

            except Exception as e:
                locked = (
                    isinstance(e, sqlite3.OperationalError)
                    and "database is locked" in str(e).lower()
                )
                if locked and attempt < retries - 1:
                    self.cleanup_logger.warning(f"Database locked on attempt {attempt + 1}, retrying...")
                else:
                    self.cleanup_logger.error(f"WAL checkpoint attempt {attempt + 1} failed: {e}")
                continue

            # Result row is (busy, log_frames, checkpointed_frames); busy != 0
            # means the checkpoint could not run to completion.
            if result and result[0]:
                self.cleanup_logger.warning(
                    f"WAL checkpoint blocked on attempt {attempt + 1}: {result}"
                )
                continue

            self.cleanup_logger.info(f"WAL checkpoint successful on attempt {attempt + 1}: {result}")
            return True

        return False

    def switch_journal_mode(self, mode: str = "WAL") -> bool:
        """
        Switch the journal mode of the database.

        The mode is validated against the journal modes SQLite knows before
        it is placed into the PRAGMA statement. SQLite answers with the mode
        that is actually in effect; if that differs from the requested mode
        the switch is reported as failed.

        Args:
            mode: Journal mode ('WAL', 'DELETE', 'TRUNCATE', etc.),
                case-insensitive

        Returns:
            bool: True if the database now uses the requested mode
        """
        requested = str(mode).strip().upper()
        if requested not in _VALID_JOURNAL_MODES:
            self.cleanup_logger.error(
                f"Failed to switch journal mode to {mode}: unsupported journal mode"
            )
            return False

        try:
            self.force_close_all_connections()
            time.sleep(1.0)

            with closing(sqlite3.connect(DATABASE_PATH, timeout=15.0)) as conn:
                try:
                    result = conn.execute(f"PRAGMA journal_mode = {requested}").fetchone()
                    conn.commit()
                except Exception as e:
                    self.cleanup_logger.error(f"Failed to switch journal mode to {mode}: {e}")
                    return False

            actual = str(result[0]).upper() if result else None
            if actual != requested:
                self.cleanup_logger.error(
                    f"Failed to switch journal mode to {mode}: database reports {result}"
                )
                return False

            self.cleanup_logger.info(f"Journal mode switched to {mode}: {result}")
            return True

        except Exception as e:
            self.cleanup_logger.error(f"Error during journal mode switch: {e}")
            return False


# ===== BACKUP OPERATIONS =====

class DatabaseBackupManager:
    """
    Backup management with rotation.

    Consolidates functionality from database_utils.py.
    """

    def __init__(self, backup_dir: Optional[str] = None):
        # Default location: a "backups" directory next to the database file.
        self.backup_dir = backup_dir or os.path.join(os.path.dirname(DATABASE_PATH), "backups")
        self.backup_logger = get_logger("database_backup")
        self.ensure_backup_directory()
        # Serializes create_backup() calls. Not re-entrant.
        self._backup_lock = threading.Lock()

    def ensure_backup_directory(self):
        """Create the backup directory (including parents) if it is missing."""
        Path(self.backup_dir).mkdir(parents=True, exist_ok=True)

    def create_backup(self, compress: bool = True) -> str:
        """
        Create a backup of the database file.

        A WAL checkpoint is attempted first (its result is not enforced),
        then the database file is copied - gzip-compressed to
        ``myp_backup_<timestamp>.db.gz`` or verbatim to
        ``myp_backup_<timestamp>.db``. If the copy fails, the partially
        written backup file is removed again.

        Args:
            compress: Whether the backup should be gzip-compressed

        Returns:
            str: Path of the created backup

        Raises:
            Exception: Any error raised while creating the backup is logged
                and re-raised.
        """
        with self._backup_lock:
            backup_path = None
            try:
                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                suffix = '.db.gz' if compress else '.db'
                backup_filename = f"myp_backup_{timestamp}{suffix}"
                backup_path = os.path.join(self.backup_dir, backup_filename)

                # Checkpoint before the backup so the main file is up to date.
                DatabaseCleanupManager().perform_wal_checkpoint()

                try:
                    if compress:
                        # Compressed backup
                        with open(DATABASE_PATH, 'rb') as f_in, gzip.open(backup_path, 'wb') as f_out:
                            shutil.copyfileobj(f_in, f_out)
                    else:
                        # Plain copy
                        shutil.copy2(DATABASE_PATH, backup_path)
                except Exception:
                    # Never leave a truncated file behind: it would be listed
                    # and rotated like a valid backup.
                    try:
                        _remove_if_exists(backup_path)
                    except OSError:
                        pass
                    raise

                backup_size = os.path.getsize(backup_path)
                self.backup_logger.info(f"Backup created: {backup_filename} ({backup_size / 1024 / 1024:.2f} MB)")

                return backup_path

            except Exception as e:
                self.backup_logger.error(f"Backup creation failed: {e}")
                raise

    def list_backups(self) -> List[Dict[str, Any]]:
        """
        List all available backups, newest first.

        Returns:
            List[Dict]: One dict per backup with the keys 'filename', 'path',
                'size_bytes', 'size_mb', 'created_at' (from the file's
                ``st_ctime``) and 'compressed'. An empty list is returned if
                listing fails.
        """
        try:
            backups = []
            for backup_file in Path(self.backup_dir).glob("myp_backup_*.db*"):
                stat = backup_file.stat()
                backups.append({
                    'filename': backup_file.name,
                    'path': str(backup_file),
                    'size_bytes': stat.st_size,
                    'size_mb': round(stat.st_size / 1024 / 1024, 2),
                    'created_at': datetime.fromtimestamp(stat.st_ctime),
                    'compressed': backup_file.suffix == '.gz'
                })

            # Sort by date (newest first).
            backups.sort(key=lambda entry: entry['created_at'], reverse=True)
            return backups

        except Exception as e:
            self.backup_logger.error(f"Error listing backups: {e}")
            return []

    def cleanup_old_backups(self, keep_count: int = 10) -> int:
        """
        Delete old backups and keep only the newest ones.

        Args:
            keep_count: Number of backups to keep; negative values are
                treated as 0

        Returns:
            int: Number of deleted backups (0 if the cleanup itself failed)
        """
        try:
            # A negative count would slice from the end of the list and
            # delete an arbitrary subset.
            keep_count = max(keep_count, 0)

            backups = self.list_backups()
            if len(backups) <= keep_count:
                return 0

            deleted_count = 0
            for backup in backups[keep_count:]:
                try:
                    os.remove(backup['path'])
                    deleted_count += 1
                    self.backup_logger.debug(f"Deleted old backup: {backup['filename']}")
                except Exception as e:
                    self.backup_logger.warning(f"Failed to delete backup {backup['filename']}: {e}")

            self.backup_logger.info(f"Cleaned up {deleted_count} old backups, kept {keep_count}")
            return deleted_count

        except Exception as e:
            self.backup_logger.error(f"Error during backup cleanup: {e}")
            return 0

    def restore_backup(self, backup_path: str) -> bool:
        """
        Restore the database from a backup file.

        All pooled connections are closed and the current database is backed
        up first. The backup is then unpacked/copied to a temporary file next
        to the database and moved into place with ``os.replace``, so a
        corrupt or unreadable backup cannot leave a half-written database
        behind. Finally the ``-wal``/``-shm`` sidecar files of the previous
        database are removed so they cannot be applied to the restored file.

        Args:
            backup_path: Path to the backup file ('.gz' files are decompressed)

        Returns:
            bool: True if the restore succeeded
        """
        try:
            if not os.path.exists(backup_path):
                self.backup_logger.error(f"Backup file not found: {backup_path}")
                return False

            # Close connections.
            DatabaseCleanupManager().force_close_all_connections()
            time.sleep(2.0)

            # Safety copy of the current database.
            current_backup = self.create_backup(compress=True)
            self.backup_logger.info(f"Current database backed up to: {current_backup}")

            # Stage the restored content next to the database so the final
            # os.replace stays on one filesystem.
            staging_path = f"{DATABASE_PATH}.restore_tmp"
            try:
                if backup_path.endswith('.gz'):
                    # Unpack compressed backup
                    with gzip.open(backup_path, 'rb') as f_in, open(staging_path, 'wb') as f_out:
                        shutil.copyfileobj(f_in, f_out)
                else:
                    # Plain copy
                    shutil.copy2(backup_path, staging_path)

                os.replace(staging_path, DATABASE_PATH)
            except Exception:
                try:
                    _remove_if_exists(staging_path)
                except OSError:
                    pass
                raise

            # Stale WAL/SHM files belong to the replaced database.
            for sidecar_suffix in _SQLITE_SIDECAR_SUFFIXES:
                _remove_if_exists(f"{DATABASE_PATH}{sidecar_suffix}")

            self.backup_logger.info(f"Database restored from: {backup_path}")
            return True

        except Exception as e:
            self.backup_logger.error(f"Backup restoration failed: {e}")
            return False


# ===== CRUD OPERATIONS =====

class DatabaseCRUDManager:
    """
    Business-logic oriented CRUD operations.

    Consolidates functionality from db_manager.py.
    """

    def __init__(self):
        self.crud_logger = get_logger("database_crud")

    def get_active_jobs(self, limit: int = None) -> List[Job]:
        """
        Fetch active jobs (status 'pending', 'printing' or 'paused'),
        newest first.

        Args:
            limit: Maximum number of jobs; ``None`` or 0 means no limit

        Returns:
            List[Job]: Active jobs, or an empty list if the query fails
        """
        try:
            with get_database_session('cached') as session:
                query = session.query(Job).filter(
                    Job.status.in_(['pending', 'printing', 'paused'])
                ).order_by(Job.created_at.desc())

                if limit:
                    query = query.limit(limit)

                jobs = query.all()
                self.crud_logger.debug(f"Retrieved {len(jobs)} active jobs")
                return jobs

        except Exception as e:
            self.crud_logger.error(f"Error retrieving active jobs: {e}")
            return []

    def get_printer_with_jobs(self, printer_id: int) -> Optional[Printer]:
        """
        Fetch a printer together with its jobs (eager loading).

        Args:
            printer_id: ID of the printer

        Returns:
            Optional[Printer]: The printer with its jobs loaded, or None if
                it does not exist or the query fails
        """
        try:
            with get_database_session('cached') as session:
                printer = session.query(Printer).options(
                    joinedload(Printer.jobs)
                ).filter(Printer.id == printer_id).first()

                if printer:
                    self.crud_logger.debug(f"Retrieved printer {printer.name} with {len(printer.jobs)} jobs")

                return printer

        except Exception as e:
            self.crud_logger.error(f"Error retrieving printer with jobs: {e}")
            return None

    def get_user_job_statistics(self, user_id: int) -> Dict[str, Any]:
        """
        Compute job statistics for a user.

        Args:
            user_id: ID of the user

        Returns:
            Dict: Keys 'user_id', 'username', 'total_jobs', 'completed_jobs',
                'active_jobs' and 'success_rate' (percent, two decimals).
                Empty if the user does not exist or the query fails.
        """
        try:
            with get_database_session('cached') as session:
                user = session.query(User).filter(User.id == user_id).first()
                if not user:
                    return {}

                # Compute the job counts.
                total_jobs = session.query(Job).filter(Job.user_id == user_id).count()
                completed_jobs = session.query(Job).filter(
                    Job.user_id == user_id,
                    Job.status == 'completed'
                ).count()
                active_jobs = session.query(Job).filter(
                    Job.user_id == user_id,
                    Job.status.in_(['pending', 'printing', 'paused'])
                ).count()

                stats = {
                    'user_id': user_id,
                    'username': user.username,
                    'total_jobs': total_jobs,
                    'completed_jobs': completed_jobs,
                    'active_jobs': active_jobs,
                    # Guard against division by zero for users without jobs.
                    'success_rate': round((completed_jobs / total_jobs * 100), 2) if total_jobs > 0 else 0
                }

                self.crud_logger.debug(f"Generated statistics for user {user.username}")
                return stats

        except Exception as e:
            self.crud_logger.error(f"Error generating user statistics: {e}")
            return {}


# ===== MONITORING OPERATIONS =====

class DatabaseMonitor:
    """
    Performance monitoring and health checks.

    Extends functionality from database_utils.py.
    """

    def __init__(self):
        self.monitor_logger = get_logger("database_monitor")

    def get_database_health_check(self) -> Dict[str, Any]:
        """
        Run a health check of the database.

        Performs a connection test, ``PRAGMA integrity_check``, a journal
        mode query and a file size lookup. Each check records its own
        status, so one failing check does not stop the others.

        Returns:
            Dict: 'timestamp', 'overall_status' ('ok', 'warning' or 'error'),
                'checks' (one entry per check) and, if the check as a whole
                failed, 'error'.
        """
        health_status = {
            'timestamp': datetime.now().isoformat(),
            'overall_status': 'unknown',
            'checks': {}
        }
        checks = health_status['checks']

        try:
            with get_database_session('monitoring') as session:
                # 1. Connection test
                try:
                    session.execute(text("SELECT 1"))
                    checks['connection'] = {'status': 'ok', 'message': 'Database connection successful'}
                except Exception as e:
                    checks['connection'] = {'status': 'error', 'message': str(e)}

                # 2. Integrity check
                try:
                    result = session.execute(text("PRAGMA integrity_check")).fetchone()
                    integrity_ok = result and result[0] == 'ok'
                    checks['integrity'] = {
                        'status': 'ok' if integrity_ok else 'warning',
                        'message': result[0] if result else 'No integrity result'
                    }
                except Exception as e:
                    checks['integrity'] = {'status': 'error', 'message': str(e)}

                # 3. WAL status
                try:
                    wal_result = session.execute(text("PRAGMA journal_mode")).fetchone()
                    wal_mode = wal_result[0] if wal_result else 'unknown'
                    checks['wal_mode'] = {
                        'status': 'ok' if wal_mode == 'wal' else 'info',
                        'message': f'Journal mode: {wal_mode}'
                    }
                except Exception as e:
                    checks['wal_mode'] = {'status': 'error', 'message': str(e)}

                # 4. Database size (no entry is recorded if the file is missing)
                try:
                    if os.path.exists(DATABASE_PATH):
                        db_size = os.path.getsize(DATABASE_PATH)
                        checks['database_size'] = {
                            'status': 'ok',
                            'message': f'Database size: {db_size / 1024 / 1024:.2f} MB',
                            'size_bytes': db_size
                        }
                except Exception as e:
                    checks['database_size'] = {'status': 'error', 'message': str(e)}

            # Derive the overall status: the worst individual status wins.
            statuses = [check['status'] for check in checks.values()]
            if 'error' in statuses:
                health_status['overall_status'] = 'error'
            elif 'warning' in statuses:
                health_status['overall_status'] = 'warning'
            else:
                health_status['overall_status'] = 'ok'

        except Exception as e:
            health_status['overall_status'] = 'error'
            health_status['error'] = str(e)
            self.monitor_logger.error(f"Database health check failed: {e}")

        return health_status


# ===== UNIFIED DATABASE SERVICE =====

class UnifiedDatabaseService:
    """
    Central interface for all database operations.

    Combines CRUD, maintenance, cleanup and monitoring.
    """

    def __init__(self):
        self.logger = get_logger("unified_database")
        self.crud = DatabaseCRUDManager()
        self.backup = DatabaseBackupManager()
        self.cleanup = DatabaseCleanupManager()
        self.monitor = DatabaseMonitor()

        # Register engines for cleanup. This creates the listed engines
        # eagerly if they do not exist yet.
        for engine_type in ['default', 'monitoring', 'backup']:
            engine = engine_registry.get_engine(engine_type)
            self.cleanup.register_engine_for_cleanup(engine)

    def get_service_status(self) -> Dict[str, Any]:
        """
        Collect the status of all database services.

        Returns:
            Dict: 'timestamp', 'health', 'backups' (count and latest entry)
                and 'engines' (count and type names); on failure only
                'error' and 'timestamp'.
        """
        try:
            health_check = self.monitor.get_database_health_check()
            backups = self.backup.list_backups()

            return {
                'timestamp': datetime.now().isoformat(),
                'health': health_check,
                'backups': {
                    'count': len(backups),
                    'latest': backups[0] if backups else None
                },
                'engines': {
                    'registered_count': len(engine_registry.engines),
                    'types': list(engine_registry.engines.keys())
                }
            }

        except Exception as e:
            self.logger.error(f"Error getting service status: {e}")
            return {'error': str(e), 'timestamp': datetime.now().isoformat()}

    def perform_maintenance(self) -> Dict[str, Any]:
        """
        Run the full database maintenance sequence.

        Steps: WAL checkpoint, compressed backup, rotation of old backups
        (keeping 10) and a health check. Each step records its own result.

        Returns:
            Dict: 'timestamp', 'operations' (per-step 'success'/'message'),
                'health_check', 'overall_success' and, if the sequence itself
                failed, 'error'.
        """
        maintenance_results = {
            'timestamp': datetime.now().isoformat(),
            'operations': {}
        }
        operations = maintenance_results['operations']

        try:
            # 1. WAL checkpoint
            self.logger.info("Starting WAL checkpoint...")
            checkpoint_success = self.cleanup.perform_wal_checkpoint()
            operations['wal_checkpoint'] = {
                'success': checkpoint_success,
                'message': 'WAL checkpoint completed' if checkpoint_success else 'WAL checkpoint failed'
            }

            # 2. Create backup
            self.logger.info("Creating maintenance backup...")
            try:
                backup_path = self.backup.create_backup(compress=True)
                operations['backup'] = {
                    'success': True,
                    'message': f'Backup created: {os.path.basename(backup_path)}',
                    'path': backup_path
                }
            except Exception as e:
                operations['backup'] = {
                    'success': False,
                    'message': f'Backup failed: {str(e)}'
                }

            # 3. Clean up old backups
            self.logger.info("Cleaning up old backups...")
            try:
                deleted_count = self.backup.cleanup_old_backups(keep_count=10)
                operations['backup_cleanup'] = {
                    'success': True,
                    'message': f'Cleaned up {deleted_count} old backups'
                }
            except Exception as e:
                operations['backup_cleanup'] = {
                    'success': False,
                    'message': f'Backup cleanup failed: {str(e)}'
                }

            # 4. Health check
            self.logger.info("Performing health check...")
            health_check = self.monitor.get_database_health_check()
            maintenance_results['health_check'] = health_check

            # Overall result: every recorded operation must have succeeded.
            maintenance_results['overall_success'] = all(
                op['success'] for op in operations.values()
            )

            self.logger.info(f"Maintenance completed with overall success: {maintenance_results['overall_success']}")

        except Exception as e:
            self.logger.error(f"Maintenance operation failed: {e}")
            maintenance_results['error'] = str(e)
            maintenance_results['overall_success'] = False

        return maintenance_results


# ===== GLOBAL INSTANCE =====

# Central database service instance
database_service = UnifiedDatabaseService()

# Cleanup manager for legacy compatibility
cleanup_manager = database_service.cleanup

# Backup manager for legacy compatibility
backup_manager = database_service.backup