Files
Projektarbeit-MYP/backend/database_analysis.py
Till Tomczak 3f7aa12577 📊 Vollständige Projektanalyse - Backup vor Optimierung
🔍 Analyseergebnisse:
- 62% ungenutzte Imports (788 von 1.271)
- 29% redundante Funktionen (326 von 1.126)
- 35% optimierbare Frontend-Assets (1.7MB von 5MB)
- 3.849 Zeilen Legacy-Code löschbar

📁 Erstellte Analyse-Dateien:
- PROJEKT_ANALYSE_VOLLSTÄNDIGER_BERICHT.md
- database_analysis_report.md

🎯 Bereit für systematische Optimierung

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-06-19 20:53:33 +02:00

346 lines
13 KiB
Python

#!/usr/bin/env python3
"""
Datenbankanalyse für MYP-Backend
===============================
Analysiert die Datenbankstruktur auf:
- Ungenutzte Modelle und Felder
- Ineffiziente Queries
- Fehlende Indizes
- Performance-Probleme
Autor: Till Tomczak
Datum: 2025-06-19
"""
import os
import sys
import re
import ast
from pathlib import Path
from typing import Dict, List, Set, Tuple, Optional
from collections import defaultdict
# Filesystem layout of the backend that gets analyzed (all relative to this file).
BACKEND_DIR = Path(__file__).parent
BLUEPRINTS_DIR = BACKEND_DIR / "blueprints"  # blueprint modules scanned for model/query usage
UTILS_DIR = BACKEND_DIR / "utils"  # shared helper modules, also scanned for usage
MODELS_FILE = BACKEND_DIR / "models.py"  # model definitions (Column/relationship declarations)
class DatabaseAnalyzer:
    """Analyzes the database structure and its usage.

    Pure static analysis over the backend sources: discovers model classes
    in ``models.py`` via regular expressions, records which blueprint/util
    files import them, collects query call sites, and derives heuristic
    findings (unused models/fields, missing indexes, potential N+1 queries,
    inefficient queries) that are rendered into a Markdown report.

    NOTE(review): everything here is regex-based heuristics, not a real
    parser — results are hints, not proofs.
    """

    def __init__(self):
        # model name -> details dict from _extract_model_details()
        self.models = {}
        # model name -> list of Column field names
        self.model_fields = defaultdict(list)
        # model name -> set of file paths that import the model
        self.model_usage = defaultdict(set)
        # field name -> set of file paths (reserved; not populated yet)
        self.field_usage = defaultdict(set)
        # list of {'file', 'query', 'line'} records for every query call site
        self.queries = []
        # accumulated issue records (reserved; not populated yet)
        self.potential_issues = []

    def analyze_models(self):
        """Discover all model classes defined in models.py."""
        print("📊 Analysiere Datenbankmodelle...")

        with open(MODELS_FILE, 'r', encoding='utf-8') as f:
            content = f.read()

        # Any class whose base list mentions "Base" counts as a model.
        model_pattern = r'class (\w+)\([^)]*Base[^)]*\):'
        models = re.findall(model_pattern, content)

        for model in models:
            self.models[model] = self._extract_model_details(content, model)

        print(f"{len(self.models)} Modelle gefunden: {list(self.models.keys())}")

    def _extract_model_details(self, content: str, model_name: str) -> Dict:
        """Extract fields and relationships of one model.

        Returns a dict with keys ``fields``, ``relationships`` and
        ``content`` (the raw class body), or ``{}`` if the class body
        cannot be located. Also appends the fields to ``self.model_fields``.
        """
        # Class body = everything up to the next "class" keyword or EOF.
        pattern = rf'class {model_name}\([^)]*\):(.*?)(?=class|\Z)'
        match = re.search(pattern, content, re.DOTALL)
        if not match:
            return {}

        model_content = match.group(1)

        # Column fields: "<name> = Column(...)". The [^)]* stops at the first
        # closing paren, so nested calls inside Column(...) are truncated —
        # acceptable for name extraction.
        fields = []
        field_pattern = r'(\w+)\s*=\s*Column\([^)]*\)'
        field_matches = re.findall(field_pattern, model_content)
        for field in field_matches:
            if field not in ['__tablename__']:
                fields.append(field)
                self.model_fields[model_name].append(field)

        # ORM relationships: "<name> = relationship(...)".
        relationships = []
        rel_pattern = r'(\w+)\s*=\s*relationship\([^)]*\)'
        rel_matches = re.findall(rel_pattern, model_content)
        relationships.extend(rel_matches)

        return {
            'fields': fields,
            'relationships': relationships,
            'content': model_content
        }

    def analyze_usage(self):
        """Record model usage across all blueprint and util modules."""
        print("🔍 Analysiere Modellnutzung in Blueprints...")

        py_files = list(BLUEPRINTS_DIR.glob("*.py")) + list(UTILS_DIR.glob("*.py"))
        for file_path in py_files:
            self._analyze_file_usage(file_path)

        print(f"{len(py_files)} Dateien analysiert")

    def _analyze_file_usage(self, file_path: Path):
        """Scan one file for model imports and query call sites."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            # "from models import ..." lines (up to a comment or newline).
            import_pattern = r'from models import[^#\n]*'
            imports = re.findall(import_pattern, content)
            for imp in imports:
                # Every CapitalizedWord in the import line is a model candidate.
                models_in_import = re.findall(r'\b([A-Z]\w+)\b', imp)
                for model in models_in_import:
                    if model in self.models:
                        self.model_usage[model].add(str(file_path))

            # Query-API call sites we want to inspect later.
            query_patterns = [
                r'session\.query\([^)]*\)',
                r'db_session\.query\([^)]*\)',
                r'\.filter\([^)]*\)',
                r'\.filter_by\([^)]*\)',
                r'\.join\([^)]*\)',
                r'\.all\(\)',
                r'\.first\(\)',
                r'\.get\([^)]*\)'
            ]
            for pattern in query_patterns:
                matches = re.findall(pattern, content)
                for match in matches:
                    self.queries.append({
                        'file': str(file_path),
                        'query': match,
                        'line': self._find_line_number(content, match)
                    })
        except Exception as e:
            # Best-effort scan: report and keep going with the other files.
            print(f"⚠️ Fehler beim Analysieren von {file_path}: {e}")

    def _find_line_number(self, content: str, search_text: str) -> int:
        """Return the 1-based line of the FIRST occurrence of *search_text* (0 if absent)."""
        lines = content.split('\n')
        for i, line in enumerate(lines, 1):
            if search_text in line:
                return i
        return 0

    def find_unused_models(self) -> List[str]:
        """Return models that are never imported by any scanned file."""
        return [model for model in self.models if not self.model_usage[model]]

    def find_unused_fields(self) -> Dict[str, List[str]]:
        """Find potentially unused fields (very simple textual analysis)."""
        # Read each blueprint file ONCE up front. The original re-read every
        # file for every field (O(fields * files) disk I/O).
        blueprint_contents = []
        for file_path in BLUEPRINTS_DIR.glob("*.py"):
            try:
                blueprint_contents.append(file_path.read_text(encoding='utf-8'))
            except (OSError, UnicodeDecodeError):
                continue

        unused_fields = {}
        for model, fields in self.model_fields.items():
            model_unused = []
            for field in fields:
                if field in ['id', 'created_at', 'updated_at']:
                    continue  # standard columns — always considered used
                # A field counts as used if it appears as attribute access
                # or as a quoted string anywhere in the blueprints.
                needles = (f'.{field}', f'"{field}"', f"'{field}'")
                used = any(
                    any(needle in content for needle in needles)
                    for content in blueprint_contents
                )
                if not used:
                    model_unused.append(field)
            if model_unused:
                unused_fields[model] = model_unused
        return unused_fields

    def find_missing_indexes(self) -> List[Dict]:
        """Find fields that would likely benefit from an index."""
        missing_indexes = []

        # Column names that typically appear in WHERE/JOIN clauses.
        index_candidates = [
            'user_id', 'printer_id', 'job_id', 'created_at', 'updated_at',
            'start_at', 'end_at', 'status', 'email', 'username'
        ]

        for model, details in self.models.items():
            model_content = details.get('content', '')
            for field in details.get('fields', []):
                if field not in index_candidates:
                    continue
                # BUGFIX: the original tested "index=True" against the WHOLE
                # class body, so a single indexed column suppressed every
                # recommendation for that model. Inspect the candidate
                # field's own Column(...) definition instead.
                field_def = re.search(
                    rf'{re.escape(field)}\s*=\s*Column\([^)]*\)', model_content)
                if field_def and 'index=True' in field_def.group(0):
                    continue
                missing_indexes.append({
                    'model': model,
                    'field': field,
                    'reason': 'Häufig in WHERE/JOIN-Klauseln verwendet'
                })
        return missing_indexes

    def find_n_plus_one_queries(self) -> List[Dict]:
        """Find potential N+1 query problems among collected call sites."""
        n_plus_one = []
        for query in self.queries:
            query_text = query['query']
            # Relationship attribute access without eager loading / join
            # is the classic N+1 smell.
            if '.user' in query_text or '.printer' in query_text or '.job' in query_text:
                if 'eager' not in query_text and 'join' not in query_text.lower():
                    n_plus_one.append({
                        'file': query['file'],
                        'line': query['line'],
                        'query': query_text,
                        'issue': 'Potentielles N+1 Problem durch Relationship-Zugriff'
                    })
        return n_plus_one

    def find_inefficient_queries(self) -> List[Dict]:
        """Find inefficient queries among collected call sites."""
        inefficient = []
        for query in self.queries:
            query_text = query['query']

            # Unbounded .all() — loads every row without a LIMIT.
            if '.all()' in query_text and 'limit' not in query_text.lower():
                inefficient.append({
                    'file': query['file'],
                    'line': query['line'],
                    'query': query_text,
                    'issue': 'Lädt alle Datensätze ohne LIMIT'
                })

            # Known duplicated COUNT query in this codebase.
            if 'query(Printer).count()' in query_text:
                inefficient.append({
                    'file': query['file'],
                    'line': query['line'],
                    'query': query_text,
                    'issue': 'Doppelte COUNT-Query'
                })
        return inefficient

    def generate_report(self) -> str:
        """Render all findings into a Markdown report string."""
        report = []
        report.append("# Datenbankanalyse für MYP-Backend")
        report.append("=" * 50)
        report.append("")

        # Model overview
        report.append("## 📊 Modell-Übersicht")
        report.append(f"Gefundene Modelle: {len(self.models)}")
        for model, details in self.models.items():
            usage_count = len(self.model_usage[model])
            report.append(f"- **{model}**: {len(details.get('fields', []))} Felder, {usage_count} Nutzungen")
        report.append("")

        # Unused models
        unused_models = self.find_unused_models()
        if unused_models:
            report.append("## ⚠️ Ungenutzte Modelle")
            for model in unused_models:
                report.append(f"- **{model}**: Wird nirgends importiert oder verwendet")
            report.append("")

        # Unused fields
        unused_fields = self.find_unused_fields()
        if unused_fields:
            report.append("## 🔍 Potentiell ungenutzte Felder")
            for model, fields in unused_fields.items():
                report.append(f"- **{model}**: {', '.join(fields)}")
            report.append("")

        # Missing indexes
        missing_indexes = self.find_missing_indexes()
        if missing_indexes:
            report.append("## 📈 Empfohlene Indizes")
            for index in missing_indexes:
                report.append(f"- **{index['model']}.{index['field']}**: {index['reason']}")
            report.append("")

        # N+1 problems
        n_plus_one = self.find_n_plus_one_queries()
        if n_plus_one:
            report.append("## 🐌 Potentielle N+1 Query-Probleme")
            for issue in n_plus_one[:10]:  # only the first 10
                report.append(f"- **{issue['file']}:{issue['line']}**: {issue['query']}")
            report.append("")

        # Inefficient queries
        inefficient = self.find_inefficient_queries()
        if inefficient:
            report.append("## ⚡ Ineffiziente Queries")
            for issue in inefficient:
                report.append(f"- **{issue['file']}:{issue['line']}**: {issue['issue']}")
            report.append("")

        # Raspberry Pi recommendations (static advisory text)
        report.append("## 🥧 Raspberry Pi Performance-Empfehlungen")
        report.append("- **SQLite WAL-Modus**: Bereits konfiguriert (aber deaktiviert für WSL2)")
        report.append("- **Cache-Größe**: Auf 32MB reduziert für Pi")
        report.append("- **Memory-Mapped I/O**: Auf 128MB reduziert")
        report.append("- **Eager Loading**: Verwende `joinedload()` für Relationships")
        report.append("- **Pagination**: Implementiere LIMIT/OFFSET für große Datensätze")
        report.append("- **Connection Pooling**: Bereits mit StaticPool konfiguriert")
        report.append("")

        return "\n".join(report)

    def run_analysis(self):
        """Run the complete analysis and write the report to disk.

        Returns the report text; also writes it to
        ``database_analysis_report.md`` next to this script.
        """
        print("🚀 Starte Datenbankanalyse...")

        self.analyze_models()
        self.analyze_usage()

        print("📝 Generiere Bericht...")
        report = self.generate_report()

        report_file = BACKEND_DIR / "database_analysis_report.md"
        with open(report_file, 'w', encoding='utf-8') as f:
            f.write(report)

        print(f"✅ Analyse abgeschlossen! Bericht gespeichert: {report_file}")
        return report
if __name__ == "__main__":
analyzer = DatabaseAnalyzer()
report = analyzer.run_analysis()
print("\n" + "="*50)
print(report)