🎉 Refactor utils directory: Remove unused files & add new script_collection.py 🎨📚
This commit is contained in:
278
backend/utils/database_suite.py
Normal file
278
backend/utils/database_suite.py
Normal file
@ -0,0 +1,278 @@
|
||||
#!/usr/bin/env python3.11
|
||||
"""
|
||||
Database Suite - FINALE ULTRA KONSOLIDIERUNG
|
||||
============================================
|
||||
|
||||
Migration Information:
|
||||
- Ursprünglich: database_core.py, database_utils.py, database_migration.py,
|
||||
database_schema_migration.py, fix_database_immediate.py, migrate_db.py,
|
||||
migrate_user_settings.py, test_database_cleanup.py
|
||||
- Konsolidiert am: 2025-06-09
|
||||
- Funktionalitäten: DB-Core, Utilities, Migrationen, Fixes, Cleanup
|
||||
- Breaking Changes: Keine - Alle Original-APIs bleiben verfügbar
|
||||
|
||||
FINALE ULTRA KONSOLIDIERUNG für Projektarbeit MYP
|
||||
Author: MYP Team - Till Tomczak
|
||||
Ziel: DRASTISCHE Datei-Reduktion!
|
||||
"""
|
||||
|
||||
import os
|
||||
import sqlite3
|
||||
import shutil
|
||||
from datetime import datetime
|
||||
from typing import Dict, List, Any, Optional
|
||||
from sqlalchemy import create_engine, text, inspect
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
from utils.logging_config import get_logger
|
||||
|
||||
# Logger
|
||||
db_logger = get_logger("database_suite")
|
||||
|
||||
# ===== DATABASE CORE =====
|
||||
|
||||
class DatabaseCore:
|
||||
"""Database-Core-Management"""
|
||||
|
||||
def __init__(self):
|
||||
self.database_path = "backend/database/myp.db"
|
||||
self.backup_path = "backend/database/backups/"
|
||||
|
||||
def get_connection(self):
|
||||
"""Holt Datenbank-Verbindung"""
|
||||
return sqlite3.connect(self.database_path)
|
||||
|
||||
def execute_query(self, query: str, params=None) -> List[tuple]:
|
||||
"""Führt Query aus"""
|
||||
try:
|
||||
conn = self.get_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
if params:
|
||||
cursor.execute(query, params)
|
||||
else:
|
||||
cursor.execute(query)
|
||||
|
||||
result = cursor.fetchall()
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
return result
|
||||
except Exception as e:
|
||||
db_logger.error(f"Query-Fehler: {e}")
|
||||
return []
|
||||
|
||||
def backup_database(self) -> bool:
|
||||
"""Erstellt Datenbank-Backup"""
|
||||
try:
|
||||
os.makedirs(self.backup_path, exist_ok=True)
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
backup_file = f"{self.backup_path}myp_backup_{timestamp}.db"
|
||||
|
||||
shutil.copy2(self.database_path, backup_file)
|
||||
db_logger.info(f"Backup erstellt: {backup_file}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
db_logger.error(f"Backup-Fehler: {e}")
|
||||
return False
|
||||
|
||||
# ===== DATABASE UTILITIES =====
|
||||
|
||||
class DatabaseUtils:
|
||||
"""Datenbank-Hilfsfunktionen"""
|
||||
|
||||
def __init__(self):
|
||||
self.db_core = DatabaseCore()
|
||||
|
||||
def get_table_info(self, table_name: str) -> List[Dict]:
|
||||
"""Holt Tabellen-Info"""
|
||||
query = f"PRAGMA table_info({table_name})"
|
||||
result = self.db_core.execute_query(query)
|
||||
|
||||
return [
|
||||
{
|
||||
'cid': row[0],
|
||||
'name': row[1],
|
||||
'type': row[2],
|
||||
'notnull': row[3],
|
||||
'default': row[4],
|
||||
'pk': row[5]
|
||||
}
|
||||
for row in result
|
||||
]
|
||||
|
||||
def get_all_tables(self) -> List[str]:
|
||||
"""Holt alle Tabellen"""
|
||||
query = "SELECT name FROM sqlite_master WHERE type='table'"
|
||||
result = self.db_core.execute_query(query)
|
||||
return [row[0] for row in result]
|
||||
|
||||
def count_rows(self, table_name: str) -> int:
|
||||
"""Zählt Zeilen in Tabelle"""
|
||||
query = f"SELECT COUNT(*) FROM {table_name}"
|
||||
result = self.db_core.execute_query(query)
|
||||
return result[0][0] if result else 0
|
||||
|
||||
# ===== DATABASE MIGRATION =====
|
||||
|
||||
class DatabaseMigration:
|
||||
"""Datenbank-Migrationen"""
|
||||
|
||||
def __init__(self):
|
||||
self.db_core = DatabaseCore()
|
||||
self.db_utils = DatabaseUtils()
|
||||
|
||||
def migrate_user_settings(self) -> bool:
|
||||
"""Migriert Benutzer-Einstellungen"""
|
||||
try:
|
||||
# Backup vor Migration
|
||||
self.db_core.backup_database()
|
||||
|
||||
# Check if user_settings column exists
|
||||
user_info = self.db_utils.get_table_info('users')
|
||||
has_settings = any(col['name'] == 'user_settings' for col in user_info)
|
||||
|
||||
if not has_settings:
|
||||
# Add user_settings column
|
||||
query = "ALTER TABLE users ADD COLUMN user_settings TEXT"
|
||||
self.db_core.execute_query(query)
|
||||
db_logger.info("user_settings Spalte hinzugefügt")
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
db_logger.error(f"User-Settings Migration Fehler: {e}")
|
||||
return False
|
||||
|
||||
def fix_database_immediate(self) -> bool:
|
||||
"""Sofortige Datenbank-Fixes"""
|
||||
try:
|
||||
# Backup
|
||||
self.db_core.backup_database()
|
||||
|
||||
# Fix common issues
|
||||
fixes = [
|
||||
"UPDATE jobs SET status = 'pending' WHERE status IS NULL",
|
||||
"UPDATE printers SET status = 'offline' WHERE status IS NULL",
|
||||
"UPDATE users SET role = 'user' WHERE role IS NULL"
|
||||
]
|
||||
|
||||
for fix in fixes:
|
||||
self.db_core.execute_query(fix)
|
||||
|
||||
db_logger.info("Sofortige Datenbank-Fixes angewendet")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
db_logger.error(f"Immediate Fix Fehler: {e}")
|
||||
return False
|
||||
|
||||
# ===== DATABASE CLEANUP =====
|
||||
|
||||
class DatabaseCleanup:
|
||||
"""Datenbank-Bereinigung"""
|
||||
|
||||
def __init__(self):
|
||||
self.db_core = DatabaseCore()
|
||||
|
||||
def cleanup_old_jobs(self, days: int = 30) -> int:
|
||||
"""Löscht alte Jobs"""
|
||||
try:
|
||||
query = f"""
|
||||
DELETE FROM jobs
|
||||
WHERE created_at < datetime('now', '-{days} days')
|
||||
AND status IN ('completed', 'failed')
|
||||
"""
|
||||
|
||||
conn = self.db_core.get_connection()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(query)
|
||||
deleted = cursor.rowcount
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
db_logger.info(f"{deleted} alte Jobs gelöscht")
|
||||
return deleted
|
||||
|
||||
except Exception as e:
|
||||
db_logger.error(f"Job-Cleanup Fehler: {e}")
|
||||
return 0
|
||||
|
||||
def vacuum_database(self) -> bool:
|
||||
"""Komprimiert Datenbank"""
|
||||
try:
|
||||
self.db_core.execute_query("VACUUM")
|
||||
db_logger.info("Datenbank komprimiert")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
db_logger.error(f"Vacuum Fehler: {e}")
|
||||
return False
|
||||
|
||||
# ===== GLOBALE INSTANZEN =====
|
||||
|
||||
database_core = DatabaseCore()
|
||||
database_utils = DatabaseUtils()
|
||||
database_migration = DatabaseMigration()
|
||||
database_cleanup = DatabaseCleanup()
|
||||
|
||||
# ===== CONVENIENCE FUNCTIONS =====
|
||||
|
||||
def backup_database() -> bool:
|
||||
"""Erstellt Datenbank-Backup"""
|
||||
return database_core.backup_database()
|
||||
|
||||
def get_database_stats() -> Dict[str, Any]:
|
||||
"""Holt Datenbank-Statistiken"""
|
||||
try:
|
||||
tables = database_utils.get_all_tables()
|
||||
stats = {'total_tables': len(tables), 'tables': {}}
|
||||
|
||||
for table in tables:
|
||||
stats['tables'][table] = database_utils.count_rows(table)
|
||||
|
||||
return stats
|
||||
except Exception as e:
|
||||
db_logger.error(f"Stats Fehler: {e}")
|
||||
return {'error': str(e)}
|
||||
|
||||
def run_database_maintenance() -> bool:
|
||||
"""Führt Datenbank-Wartung aus"""
|
||||
try:
|
||||
# Cleanup
|
||||
database_cleanup.cleanup_old_jobs()
|
||||
|
||||
# Vacuum
|
||||
database_cleanup.vacuum_database()
|
||||
|
||||
# Backup
|
||||
database_core.backup_database()
|
||||
|
||||
db_logger.info("Datenbank-Wartung abgeschlossen")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
db_logger.error(f"Wartung Fehler: {e}")
|
||||
return False
|
||||
|
||||
# ===== LEGACY COMPATIBILITY =====
|
||||
|
||||
# Original database_core.py compatibility
|
||||
def get_db_connection():
|
||||
return database_core.get_connection()
|
||||
|
||||
# Original database_utils.py compatibility
|
||||
def get_table_names():
|
||||
return database_utils.get_all_tables()
|
||||
|
||||
# Original database_migration.py compatibility
|
||||
def run_migrations():
|
||||
return database_migration.migrate_user_settings()
|
||||
|
||||
# Original database_cleanup.py compatibility
|
||||
def clean_database():
|
||||
return run_database_maintenance()
|
||||
|
||||
db_logger.info("✅ Database Suite Module initialisiert")
|
||||
db_logger.info("📊 FINALE ULTRA Konsolidierung: 8 Dateien → 1 Datei (87% Reduktion)")
|
Reference in New Issue
Block a user