#!/usr/bin/env python3 """ Database Migration Utility für MYP Platform Überprüft und aktualisiert die Datenbankschema automatisch. """ import sqlite3 import logging from typing import List, Dict, Any from datetime import datetime from config.settings import DATABASE_PATH from models import init_db logger = logging.getLogger(__name__) def get_table_columns(table_name: str) -> List[Dict[str, Any]]: """ Ruft die Spalten einer Tabelle ab. Args: table_name: Name der Tabelle Returns: List[Dict]: Liste der Spalten mit ihren Eigenschaften """ try: conn = sqlite3.connect(DATABASE_PATH) cursor = conn.cursor() cursor.execute(f'PRAGMA table_info({table_name})') columns = cursor.fetchall() conn.close() return [ { 'name': col[1], 'type': col[2], 'not_null': bool(col[3]), 'default': col[4], 'primary_key': bool(col[5]) } for col in columns ] except Exception as e: logger.error(f"Fehler beim Abrufen der Spalten für Tabelle {table_name}: {e}") return [] def table_exists(table_name: str) -> bool: """ Prüft, ob eine Tabelle existiert. Args: table_name: Name der Tabelle Returns: bool: True wenn die Tabelle existiert """ try: conn = sqlite3.connect(DATABASE_PATH) cursor = conn.cursor() cursor.execute(""" SELECT name FROM sqlite_master WHERE type='table' AND name=? """, (table_name,)) result = cursor.fetchone() conn.close() return result is not None except Exception as e: logger.error(f"Fehler beim Prüfen der Tabelle {table_name}: {e}") return False def column_exists(table_name: str, column_name: str) -> bool: """ Prüft, ob eine Spalte in einer Tabelle existiert. Args: table_name: Name der Tabelle column_name: Name der Spalte Returns: bool: True wenn die Spalte existiert """ columns = get_table_columns(table_name) return any(col['name'] == column_name for col in columns) def add_column_if_missing(table_name: str, column_name: str, column_type: str, default_value: str = None) -> bool: """ Fügt eine Spalte hinzu, falls sie nicht existiert. Args: table_name: Name der Tabelle column_name: Name der Spalte column_type: Datentyp der Spalte default_value: Optional - Standardwert Returns: bool: True wenn erfolgreich """ if column_exists(table_name, column_name): logger.info(f"Spalte {column_name} existiert bereits in Tabelle {table_name}") return True try: conn = sqlite3.connect(DATABASE_PATH) cursor = conn.cursor() sql = f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_type}" if default_value: sql += f" DEFAULT {default_value}" cursor.execute(sql) conn.commit() conn.close() logger.info(f"Spalte {column_name} erfolgreich zu Tabelle {table_name} hinzugefügt") return True except Exception as e: logger.error(f"Fehler beim Hinzufügen der Spalte {column_name} zu Tabelle {table_name}: {e}") return False def migrate_database() -> bool: """ Führt alle notwendigen Datenbankmigrationen durch. Returns: bool: True wenn erfolgreich """ logger.info("Starte Datenbankmigration...") try: # Prüfe, ob grundlegende Tabellen existieren required_tables = ['users', 'printers', 'jobs', 'stats'] missing_tables = [table for table in required_tables if not table_exists(table)] if missing_tables: logger.warning(f"Fehlende Tabellen gefunden: {missing_tables}") logger.info("Erstelle alle Tabellen neu...") init_db() logger.info("Tabellen erfolgreich erstellt") return True # Prüfe spezifische Spalten, die möglicherweise fehlen migrations = [ # Printers Tabelle ('printers', 'last_checked', 'DATETIME', 'NULL'), ('printers', 'active', 'BOOLEAN', '1'), ('printers', 'created_at', 'DATETIME', 'CURRENT_TIMESTAMP'), # Jobs Tabelle ('jobs', 'duration_minutes', 'INTEGER', '60'), ('jobs', 'actual_end_time', 'DATETIME', 'NULL'), ('jobs', 'owner_id', 'INTEGER', 'NULL'), ('jobs', 'file_path', 'VARCHAR(500)', 'NULL'), # Users Tabelle ('users', 'username', 'VARCHAR(100)', 'NULL'), ('users', 'active', 'BOOLEAN', '1'), ('users', 'created_at', 'DATETIME', 'CURRENT_TIMESTAMP'), ] success = True for table_name, column_name, column_type, default_value in migrations: if not add_column_if_missing(table_name, column_name, column_type, default_value): success = False if success: logger.info("Datenbankmigration erfolgreich abgeschlossen") else: logger.warning("Datenbankmigration mit Fehlern abgeschlossen") return success except Exception as e: logger.error(f"Fehler bei der Datenbankmigration: {e}") return False def check_database_integrity() -> bool: """ Überprüft die Integrität der Datenbank. Returns: bool: True wenn die Datenbank integer ist """ try: conn = sqlite3.connect(DATABASE_PATH) cursor = conn.cursor() cursor.execute('PRAGMA integrity_check') result = cursor.fetchone() conn.close() if result and result[0] == 'ok': logger.info("Datenbankintegrität: OK") return True else: logger.error(f"Datenbankintegrität: FEHLER - {result}") return False except Exception as e: logger.error(f"Fehler bei der Integritätsprüfung: {e}") return False def backup_database(backup_path: str = None) -> bool: """ Erstellt ein Backup der Datenbank. Args: backup_path: Optional - Pfad für das Backup Returns: bool: True wenn erfolgreich """ if not backup_path: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_path = f"database/myp_backup_{timestamp}.db" try: import shutil shutil.copy2(DATABASE_PATH, backup_path) logger.info(f"Datenbank-Backup erstellt: {backup_path}") return True except Exception as e: logger.error(f"Fehler beim Erstellen des Backups: {e}") return False if __name__ == "__main__": # Logging konfigurieren logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) print("=== MYP Platform - Datenbankmigration ===") # Backup erstellen if backup_database(): print("✅ Backup erstellt") else: print("⚠️ Backup-Erstellung fehlgeschlagen") # Integrität prüfen if check_database_integrity(): print("✅ Datenbankintegrität OK") else: print("❌ Datenbankintegrität FEHLER") # Migration durchführen if migrate_database(): print("✅ Migration erfolgreich") else: print("❌ Migration fehlgeschlagen") print("\nMigration abgeschlossen!")