#!/usr/bin/env python3
"""
Database analysis for the MYP backend
=====================================

Analyses the database layer for:
- unused models and fields
- inefficient queries
- missing indexes
- performance problems

All checks are regex heuristics over the source text; nothing is imported
or executed, so the results are hints that need manual confirmation.

Author: Till Tomczak
Date: 2025-06-19
"""

import os
import sys
import re
import ast
from pathlib import Path
from typing import Dict, List, Set, Tuple, Optional
from collections import defaultdict

# Paths inside the backend directory (the directory containing this script).
BACKEND_DIR = Path(__file__).parent
BLUEPRINTS_DIR = BACKEND_DIR / "blueprints"
UTILS_DIR = BACKEND_DIR / "utils"
MODELS_FILE = BACKEND_DIR / "models.py"


class DatabaseAnalyzer:
    """Analyse the model definitions and how the application code uses them.

    Attributes:
        models: model name -> dict with ``fields``, ``relationships`` and
            ``content`` (the raw text of the class body).
        model_fields: model name -> list of column attribute names.
        model_usage: model name -> set of file paths importing the model.
        field_usage: reserved; not populated by any method of this class.
        queries: one dict per query-like expression found, with the keys
            ``file``, ``query``, ``line`` and ``context``.
        potential_issues: reserved; not populated by any method of this class.
    """

    def __init__(self):
        self.models = {}
        self.model_fields = defaultdict(list)
        self.model_usage = defaultdict(set)
        self.field_usage = defaultdict(set)
        self.queries = []
        self.potential_issues = []

    def analyze_models(self):
        """Read ``MODELS_FILE`` and register every class deriving from ``Base``.

        Raises:
            OSError: if the models file cannot be opened.
        """
        print("📊 Analysiere Datenbankmodelle...")

        with open(MODELS_FILE, 'r', encoding='utf-8') as f:
            content = f.read()

        # A model is any class whose base list mentions "Base".
        model_pattern = r'class (\w+)\([^)]*Base[^)]*\):'
        for model in re.findall(model_pattern, content):
            self.models[model] = self._extract_model_details(content, model)

        print(f"✅ {len(self.models)} Modelle gefunden: {list(self.models.keys())}")

    def _extract_model_details(self, content: str, model_name: str) -> Dict:
        """Extract columns and relationships of one model class.

        Args:
            content: full text of the models module.
            model_name: name of the class to inspect.

        Returns:
            A dict with ``fields``, ``relationships`` and ``content``, or an
            empty dict when the class header is not found.
        """
        # The body ends at the next class statement that starts at column 0
        # (or at the end of the text).  Anchoring the lookahead to the start
        # of a line keeps identifiers such as "classification" or the word
        # "class" inside a comment from cutting the body short.
        pattern = (
            rf'^[ \t]*class {re.escape(model_name)}\([^)]*\):'
            r'(.*?)(?=^class\s|\Z)'
        )
        match = re.search(pattern, content, re.DOTALL | re.MULTILINE)
        if not match:
            return {}

        model_content = match.group(1)

        # Column attributes.
        field_pattern = r'(\w+)\s*=\s*Column\([^)]*\)'
        fields = [
            name
            for name in re.findall(field_pattern, model_content)
            if name != '__tablename__'
        ]
        # Assign instead of append so that analysing the same model twice
        # does not duplicate its fields.
        self.model_fields[model_name] = list(fields)

        # Relationship attributes.
        rel_pattern = r'(\w+)\s*=\s*relationship\([^)]*\)'
        relationships = re.findall(rel_pattern, model_content)

        return {
            'fields': fields,
            'relationships': relationships,
            'content': model_content
        }

    @staticmethod
    def _column_definition(model_content: str, field: str) -> str:
        """Return the text of ``field = Column(...)`` with balanced parentheses.

        Returns an empty string when the field has no ``Column(`` assignment
        in ``model_content``.  Parentheses inside string literals are not
        treated specially.
        """
        start = re.search(rf'\b{re.escape(field)}\s*=\s*Column\(', model_content)
        if not start:
            return ''

        depth = 1
        pos = start.end()
        while pos < len(model_content) and depth:
            char = model_content[pos]
            if char == '(':
                depth += 1
            elif char == ')':
                depth -= 1
            pos += 1
        return model_content[start.start():pos]

    @staticmethod
    def _load_sources() -> List[str]:
        """Return the text of every ``*.py`` file in the blueprint and utils dirs.

        Unreadable files are skipped.
        """
        sources = []
        for directory in (BLUEPRINTS_DIR, UTILS_DIR):
            for file_path in directory.glob("*.py"):
                try:
                    with open(file_path, 'r', encoding='utf-8') as f:
                        sources.append(f.read())
                except (OSError, UnicodeDecodeError):
                    continue
        return sources

    def analyze_usage(self):
        """Scan all blueprint and utility modules for model usage and queries."""
        print("🔍 Analysiere Modellnutzung in Blueprints...")

        py_files = list(BLUEPRINTS_DIR.glob("*.py")) + list(UTILS_DIR.glob("*.py"))
        for file_path in py_files:
            self._analyze_file_usage(file_path)

        print(f"✅ {len(py_files)} Dateien analysiert")

    def _analyze_file_usage(self, file_path: Path):
        """Record model imports and query-like expressions found in one file.

        Read errors are reported on stdout and the file is skipped.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
        except (OSError, UnicodeDecodeError) as e:
            print(f"⚠️ Fehler beim Analysieren von {file_path}: {e}")
            return

        # Model imports: either a single line or a parenthesised list that
        # may span several lines.
        import_pattern = r'from models import[ \t]*(?:\([^)]*\)|[^#\n]*)'
        for imp in re.findall(import_pattern, content):
            for model in re.findall(r'\b([A-Z]\w+)\b', imp):
                if model in self.models:
                    self.model_usage[model].add(str(file_path))

        # Query-like expressions.  One pattern covers "session", "db_session"
        # and any other "*session" name so that a call is recorded only once.
        query_patterns = [
            r'\b\w*session\.query\([^)]*\)',
            r'\.filter\([^)]*\)',
            r'\.filter_by\([^)]*\)',
            r'\.join\([^)]*\)',
            r'\.all\(\)',
            r'\.first\(\)',
            r'\.get\([^)]*\)'
        ]

        lines = content.split('\n')
        for pattern in query_patterns:
            for match in re.finditer(pattern, content):
                # Derive the line from the match offset; searching for the
                # matched text would always return its first occurrence.
                line_number = content.count('\n', 0, match.start()) + 1
                self.queries.append({
                    'file': str(file_path),
                    'query': match.group(0),
                    'line': line_number,
                    'context': lines[line_number - 1].strip()
                })

    def _find_line_number(self, content: str, search_text: str) -> int:
        """Return the 1-based number of the first line containing ``search_text``.

        Returns 0 when no single line contains the text.  Kept for callers
        outside this class; the analyser itself uses match offsets instead.
        """
        for number, line in enumerate(content.split('\n'), 1):
            if search_text in line:
                return number
        return 0

    def find_unused_models(self) -> List[str]:
        """Return the models that no scanned file imports."""
        return [model for model in self.models if not self.model_usage[model]]

    def find_unused_fields(self) -> Dict[str, List[str]]:
        """Return, per model, the fields never mentioned in the scanned files.

        This is a plain substring search for ``.field``, ``"field"`` and
        ``'field'``, so it is only a rough hint.
        """
        # Read every file once instead of once per field.
        sources = self._load_sources()
        standard_fields = ('id', 'created_at', 'updated_at')

        unused_fields = {}
        for model, fields in self.model_fields.items():
            model_unused = []
            for field in fields:
                if field in standard_fields:
                    continue  # assumed to be used implicitly
                needles = (f'.{field}', f'"{field}"', f"'{field}'")
                used = any(
                    needle in source
                    for source in sources
                    for needle in needles
                )
                if not used:
                    model_unused.append(field)

            if model_unused:
                unused_fields[model] = model_unused

        return unused_fields

    def find_missing_indexes(self) -> List[Dict]:
        """Return commonly filtered columns whose definition declares no index.

        A column counts as indexed when its own ``Column(...)`` call sets
        ``index``, ``unique`` or ``primary_key`` to ``True``.  Indexes that
        are declared elsewhere (for example in ``__table_args__``) are not
        detected.
        """
        # Column names that typically appear in WHERE/JOIN clauses.
        index_candidates = {
            'user_id', 'printer_id', 'job_id', 'created_at', 'updated_at',
            'start_at', 'end_at', 'status', 'email', 'username'
        }
        # NOTE(review): unique/primary_key are treated as indexed on the
        # assumption that the database backs those constraints with an index.
        indexed = re.compile(r'\b(?:index|unique|primary_key)\s*=\s*True\b')

        missing_indexes = []
        for model, details in self.models.items():
            model_content = details.get('content', '')
            for field in details.get('fields', []):
                if field not in index_candidates:
                    continue
                # Inspect this column only; another column's index=True must
                # not hide the missing one.
                definition = self._column_definition(model_content, field)
                if indexed.search(definition):
                    continue
                missing_indexes.append({
                    'model': model,
                    'field': field,
                    'reason': 'Häufig in WHERE/JOIN-Klauseln verwendet'
                })

        return missing_indexes

    def find_n_plus_one_queries(self) -> List[Dict]:
        """Return recorded queries that touch a relationship without a join."""
        # Match ".user", ".users", ".printer", ... as whole attribute names so
        # that plain columns such as ".user_id" or ".username" are ignored.
        relationship_access = re.compile(r'\.(?:user|printer|job)s?\b')

        n_plus_one = []
        for query in self.queries:
            query_text = query['query']
            if not relationship_access.search(query_text):
                continue
            if 'eager' in query_text or 'join' in query_text.lower():
                continue
            n_plus_one.append({
                'file': query['file'],
                'line': query['line'],
                'query': query_text,
                'issue': 'Potentielles N+1 Problem durch Relationship-Zugriff'
            })

        return n_plus_one

    def find_inefficient_queries(self) -> List[Dict]:
        """Return recorded queries that look inefficient.

        Flags ``.all()`` calls whose source line has no ``.limit(`` and
        ``query(Printer).count()`` calls.
        """
        inefficient = []

        for query in self.queries:
            query_text = query['query']
            # The recorded expression is only a fragment (".all()" alone, or
            # the query(...) call without what follows), so the decision is
            # made on the whole source line.  Entries without a context fall
            # back to the expression itself.
            context = query.get('context', query_text)

            if '.all()' in query_text and '.limit(' not in context.lower():
                inefficient.append({
                    'file': query['file'],
                    'line': query['line'],
                    'query': query_text,
                    'issue': 'Lädt alle Datensätze ohne LIMIT'
                })

            # Report the count once per line: only for the query(...) entry.
            if 'query(' in query_text and 'query(Printer).count()' in context:
                inefficient.append({
                    'file': query['file'],
                    'line': query['line'],
                    'query': query_text,
                    'issue': 'Doppelte COUNT-Query'
                })

        return inefficient

    def generate_report(self) -> str:
        """Build the Markdown report from the collected analysis data."""
        report = []
        report.append("# Datenbankanalyse für MYP-Backend")
        report.append("=" * 50)
        report.append("")

        # Model overview
        report.append("## 📊 Modell-Übersicht")
        report.append(f"Gefundene Modelle: {len(self.models)}")
        for model, details in self.models.items():
            usage_count = len(self.model_usage[model])
            report.append(f"- **{model}**: {len(details.get('fields', []))} Felder, {usage_count} Nutzungen")
        report.append("")

        # Unused models
        unused_models = self.find_unused_models()
        if unused_models:
            report.append("## ⚠️ Ungenutzte Modelle")
            for model in unused_models:
                report.append(f"- **{model}**: Wird nirgends importiert oder verwendet")
            report.append("")

        # Unused fields
        unused_fields = self.find_unused_fields()
        if unused_fields:
            report.append("## 🔍 Potentiell ungenutzte Felder")
            for model, fields in unused_fields.items():
                report.append(f"- **{model}**: {', '.join(fields)}")
            report.append("")

        # Missing indexes
        missing_indexes = self.find_missing_indexes()
        if missing_indexes:
            report.append("## 📈 Empfohlene Indizes")
            for index in missing_indexes:
                report.append(f"- **{index['model']}.{index['field']}**: {index['reason']}")
            report.append("")

        # N+1 problems
        n_plus_one = self.find_n_plus_one_queries()
        if n_plus_one:
            report.append("## 🐌 Potentielle N+1 Query-Probleme")
            for issue in n_plus_one[:10]:  # first ten only
                report.append(f"- **{issue['file']}:{issue['line']}**: {issue['query']}")
            report.append("")

        # Inefficient queries
        inefficient = self.find_inefficient_queries()
        if inefficient:
            report.append("## ⚡ Ineffiziente Queries")
            for issue in inefficient:
                report.append(f"- **{issue['file']}:{issue['line']}**: {issue['issue']}")
            report.append("")

        # Raspberry Pi recommendations (static text)
        report.append("## 🥧 Raspberry Pi Performance-Empfehlungen")
        report.append("- **SQLite WAL-Modus**: Bereits konfiguriert (aber deaktiviert für WSL2)")
        report.append("- **Cache-Größe**: Auf 32MB reduziert für Pi")
        report.append("- **Memory-Mapped I/O**: Auf 128MB reduziert")
        report.append("- **Eager Loading**: Verwende `joinedload()` für Relationships")
        report.append("- **Pagination**: Implementiere LIMIT/OFFSET für große Datensätze")
        report.append("- **Connection Pooling**: Bereits mit StaticPool konfiguriert")
        report.append("")

        return "\n".join(report)

    def run_analysis(self):
        """Run the full analysis, write the report file and return its text."""
        print("🚀 Starte Datenbankanalyse...")

        self.analyze_models()
        self.analyze_usage()

        print("📝 Generiere Bericht...")
        report = self.generate_report()

        # Persist the report next to this script.
        report_file = BACKEND_DIR / "database_analysis_report.md"
        with open(report_file, 'w', encoding='utf-8') as f:
            f.write(report)

        print(f"✅ Analyse abgeschlossen! Bericht gespeichert: {report_file}")

        return report


if __name__ == "__main__":
    analyzer = DatabaseAnalyzer()
    report = analyzer.run_analysis()
    print("\n" + "="*50)
    print(report)