manage-your-printer/utils/database_migration.py
2025-06-04 10:03:22 +02:00

252 lines
7.5 KiB
Python

#!/usr/bin/env python3
"""
Database Migration Utility für MYP Platform
Überprüft und aktualisiert die Datenbankschema automatisch.
"""
import sqlite3
import logging
from typing import List, Dict, Any
from datetime import datetime
from config.settings import DATABASE_PATH
from models import init_db
logger = logging.getLogger(__name__)
def get_table_columns(table_name: str) -> List[Dict[str, Any]]:
"""
Ruft die Spalten einer Tabelle ab.
Args:
table_name: Name der Tabelle
Returns:
List[Dict]: Liste der Spalten mit ihren Eigenschaften
"""
try:
conn = sqlite3.connect(DATABASE_PATH)
cursor = conn.cursor()
cursor.execute(f'PRAGMA table_info({table_name})')
columns = cursor.fetchall()
conn.close()
return [
{
'name': col[1],
'type': col[2],
'not_null': bool(col[3]),
'default': col[4],
'primary_key': bool(col[5])
}
for col in columns
]
except Exception as e:
logger.error(f"Fehler beim Abrufen der Spalten für Tabelle {table_name}: {e}")
return []
def table_exists(table_name: str) -> bool:
"""
Prüft, ob eine Tabelle existiert.
Args:
table_name: Name der Tabelle
Returns:
bool: True wenn die Tabelle existiert
"""
try:
conn = sqlite3.connect(DATABASE_PATH)
cursor = conn.cursor()
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name=?
""", (table_name,))
result = cursor.fetchone()
conn.close()
return result is not None
except Exception as e:
logger.error(f"Fehler beim Prüfen der Tabelle {table_name}: {e}")
return False
def column_exists(table_name: str, column_name: str) -> bool:
"""
Prüft, ob eine Spalte in einer Tabelle existiert.
Args:
table_name: Name der Tabelle
column_name: Name der Spalte
Returns:
bool: True wenn die Spalte existiert
"""
columns = get_table_columns(table_name)
return any(col['name'] == column_name for col in columns)
def add_column_if_missing(table_name: str, column_name: str, column_type: str, default_value: str = None) -> bool:
"""
Fügt eine Spalte hinzu, falls sie nicht existiert.
Args:
table_name: Name der Tabelle
column_name: Name der Spalte
column_type: Datentyp der Spalte
default_value: Optional - Standardwert
Returns:
bool: True wenn erfolgreich
"""
if column_exists(table_name, column_name):
logger.info(f"Spalte {column_name} existiert bereits in Tabelle {table_name}")
return True
try:
conn = sqlite3.connect(DATABASE_PATH)
cursor = conn.cursor()
sql = f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_type}"
if default_value:
sql += f" DEFAULT {default_value}"
cursor.execute(sql)
conn.commit()
conn.close()
logger.info(f"Spalte {column_name} erfolgreich zu Tabelle {table_name} hinzugefügt")
return True
except Exception as e:
logger.error(f"Fehler beim Hinzufügen der Spalte {column_name} zu Tabelle {table_name}: {e}")
return False
def migrate_database() -> bool:
"""
Führt alle notwendigen Datenbankmigrationen durch.
Returns:
bool: True wenn erfolgreich
"""
logger.info("Starte Datenbankmigration...")
try:
# Prüfe, ob grundlegende Tabellen existieren
required_tables = ['users', 'printers', 'jobs', 'stats']
missing_tables = [table for table in required_tables if not table_exists(table)]
if missing_tables:
logger.warning(f"Fehlende Tabellen gefunden: {missing_tables}")
logger.info("Erstelle alle Tabellen neu...")
init_db()
logger.info("Tabellen erfolgreich erstellt")
return True
# Prüfe spezifische Spalten, die möglicherweise fehlen
migrations = [
# Printers Tabelle
('printers', 'last_checked', 'DATETIME', 'NULL'),
('printers', 'active', 'BOOLEAN', '1'),
('printers', 'created_at', 'DATETIME', 'CURRENT_TIMESTAMP'),
# Jobs Tabelle
('jobs', 'duration_minutes', 'INTEGER', '60'),
('jobs', 'actual_end_time', 'DATETIME', 'NULL'),
('jobs', 'owner_id', 'INTEGER', 'NULL'),
('jobs', 'file_path', 'VARCHAR(500)', 'NULL'),
# Users Tabelle
('users', 'username', 'VARCHAR(100)', 'NULL'),
('users', 'active', 'BOOLEAN', '1'),
('users', 'created_at', 'DATETIME', 'CURRENT_TIMESTAMP'),
]
success = True
for table_name, column_name, column_type, default_value in migrations:
if not add_column_if_missing(table_name, column_name, column_type, default_value):
success = False
if success:
logger.info("Datenbankmigration erfolgreich abgeschlossen")
else:
logger.warning("Datenbankmigration mit Fehlern abgeschlossen")
return success
except Exception as e:
logger.error(f"Fehler bei der Datenbankmigration: {e}")
return False
def check_database_integrity() -> bool:
"""
Überprüft die Integrität der Datenbank.
Returns:
bool: True wenn die Datenbank integer ist
"""
try:
conn = sqlite3.connect(DATABASE_PATH)
cursor = conn.cursor()
cursor.execute('PRAGMA integrity_check')
result = cursor.fetchone()
conn.close()
if result and result[0] == 'ok':
logger.info("Datenbankintegrität: OK")
return True
else:
logger.error(f"Datenbankintegrität: FEHLER - {result}")
return False
except Exception as e:
logger.error(f"Fehler bei der Integritätsprüfung: {e}")
return False
def backup_database(backup_path: str = None) -> bool:
"""
Erstellt ein Backup der Datenbank.
Args:
backup_path: Optional - Pfad für das Backup
Returns:
bool: True wenn erfolgreich
"""
if not backup_path:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = f"database/myp_backup_{timestamp}.db"
try:
import shutil
shutil.copy2(DATABASE_PATH, backup_path)
logger.info(f"Datenbank-Backup erstellt: {backup_path}")
return True
except Exception as e:
logger.error(f"Fehler beim Erstellen des Backups: {e}")
return False
if __name__ == "__main__":
# Logging konfigurieren
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
print("=== MYP Platform - Datenbankmigration ===")
# Backup erstellen
if backup_database():
print("✅ Backup erstellt")
else:
print("⚠️ Backup-Erstellung fehlgeschlagen")
# Integrität prüfen
if check_database_integrity():
print("✅ Datenbankintegrität OK")
else:
print("❌ Datenbankintegrität FEHLER")
# Migration durchführen
if migrate_database():
print("✅ Migration erfolgreich")
else:
print("❌ Migration fehlgeschlagen")
print("\nMigration abgeschlossen!")