2025-05-31 22:40:29 +02:00

667 lines
25 KiB
Python

#!/usr/bin/env python3
"""
Erweiterte Analytik und Statistiken für MYP Platform
Umfassende Datenanalyse, Berichte und KPI-Tracking
"""
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any
from sqlalchemy import func, desc, and_, or_, extract
from sqlalchemy.orm import Session
from dataclasses import dataclass, asdict
from enum import Enum
from utils.logging_config import get_logger
logger = get_logger("analytics")
# ===== ANALYTICS ENUMS =====
class MetricType(Enum):
    """Kinds of metrics."""

    COUNTER = "counter"      # counter (increases)
    GAUGE = "gauge"          # instantaneous value
    HISTOGRAM = "histogram"  # distribution of values
    RATE = "rate"            # rate over time
class TimeRange(Enum):
    """Time spans that an analysis can cover."""

    HOUR = "hour"
    DAY = "day"
    WEEK = "week"
    MONTH = "month"
    QUARTER = "quarter"
    YEAR = "year"
    CUSTOM = "custom"
class ReportFormat(Enum):
    """Output formats for reports."""

    JSON = "json"
    CSV = "csv"
    PDF = "pdf"
    EXCEL = "excel"
# ===== DATA CLASSES =====
@dataclass
class Metric:
    """A single metric value together with its unit and timestamp."""

    name: str
    value: float
    unit: str
    timestamp: datetime
    tags: Dict[str, str] = None  # optional; None when no tags are given

    def to_dict(self) -> Dict:
        """Return all fields as a dict, with ``timestamp`` as an ISO string."""
        # Later keys win in a dict merge, so the datetime produced by
        # asdict() is replaced by its ISO representation.
        return {**asdict(self), 'timestamp': self.timestamp.isoformat()}
@dataclass
class AnalyticsData:
    """Container for analytics data: metrics plus their period and filters."""

    metrics: List[Metric]
    timerange: TimeRange
    start_date: datetime
    end_date: datetime
    filters: Dict[str, Any] = None  # optional; serialised as {} when unset

    def to_dict(self) -> Dict:
        """Return a dict of plain values.

        Metrics are converted with ``Metric.to_dict()``, the time range is
        reduced to its enum value, both dates become ISO strings and a
        missing or empty ``filters`` becomes ``{}``.
        """
        serialized_metrics = [metric.to_dict() for metric in self.metrics]
        range_value = self.timerange.value
        period_start = self.start_date.isoformat()
        period_end = self.end_date.isoformat()
        active_filters = self.filters if self.filters else {}
        return dict(
            metrics=serialized_metrics,
            timerange=range_value,
            start_date=period_start,
            end_date=period_end,
            filters=active_filters,
        )
@dataclass
class KPI:
    """Key performance indicator with current, previous and target value."""

    name: str
    current_value: float
    previous_value: float
    target_value: float
    unit: str
    trend: str  # "up", "down", "stable"
    change_percent: float

    def to_dict(self) -> Dict:
        """Return all fields as a plain dict."""
        serialized = asdict(self)
        return serialized
# ===== ANALYTICS ENGINE =====
class AnalyticsEngine:
    """Main class for analytics and statistics.

    The public ``get_*`` methods and ``generate_report`` do not raise: any
    exception is logged and reported to the caller as
    ``{'error': <message>}``.
    """

    def __init__(self):
        # Cache storage and its lifetime. No method of this class reads or
        # writes them.
        self.cache = {}
        self.cache_timeout = timedelta(minutes=10)

    def get_printer_statistics(self, time_range: TimeRange = TimeRange.MONTH,
                               start_date: Optional[datetime] = None,
                               end_date: Optional[datetime] = None) -> Dict:
        """Collect printer statistics for a period.

        Args:
            time_range: Period used when ``start_date`` and ``end_date`` are
                not both given; its value is also echoed in the result.
            start_date: Start of the period (optional).
            end_date: End of the period (optional).

        Returns:
            Dict with the keys ``summary``, ``usage_by_printer``,
            ``status_distribution``, ``availability`` and ``time_range``,
            or ``{'error': <message>}`` if anything fails.
        """
        try:
            from models import get_db_session, Printer, Job

            if not start_date or not end_date:
                start_date, end_date = self._get_date_range(time_range)

            db_session = get_db_session()
            try:
                # Basic counts
                total_printers = db_session.query(Printer).filter(
                    Printer.active == True  # noqa: E712 - SQL expression
                ).count()
                online_printers = db_session.query(Printer).filter(
                    and_(Printer.active == True, Printer.status.in_(["online", "idle"]))
                ).count()

                # Usage per printer. The date filter is part of the join
                # condition, so printers without jobs in the period are
                # still listed (with zero jobs).
                printer_usage = db_session.query(
                    Printer.name,
                    func.count(Job.id).label('job_count'),
                    func.sum(Job.duration_minutes).label('total_duration')
                ).outerjoin(Job, and_(
                    Job.printer_id == Printer.id,
                    Job.created_at.between(start_date, end_date)
                )).group_by(Printer.id, Printer.name).all()

                # Status distribution of active printers
                status_distribution = db_session.query(
                    Printer.status,
                    func.count(Printer.id).label('count')
                ).filter(Printer.active == True).group_by(Printer.status).all()

                # Average availability
                availability_stats = self._calculate_printer_availability(
                    db_session, start_date, end_date
                )
            finally:
                # Release the session even when one of the queries fails.
                db_session.close()

            return {
                'summary': {
                    'total_printers': total_printers,
                    'online_printers': online_printers,
                    'availability_rate': round(
                        (online_printers / total_printers * 100) if total_printers > 0 else 0, 1
                    )
                },
                'usage_by_printer': [
                    {
                        'name': usage.name,
                        'jobs': usage.job_count or 0,
                        'total_hours': round((usage.total_duration or 0) / 60, 1),
                        'utilization_rate': self._calculate_utilization_rate(
                            usage.total_duration, start_date, end_date
                        )
                    }
                    for usage in printer_usage
                ],
                'status_distribution': [
                    {'status': status.status, 'count': status.count}
                    for status in status_distribution
                ],
                'availability': availability_stats,
                'time_range': {
                    'start': start_date.isoformat(),
                    'end': end_date.isoformat(),
                    'type': time_range.value
                }
            }
        except Exception as e:
            logger.error(f"Fehler beim Abrufen der Drucker-Statistiken: {e}")
            return {'error': str(e)}

    def get_job_statistics(self, time_range: TimeRange = TimeRange.MONTH,
                           start_date: Optional[datetime] = None,
                           end_date: Optional[datetime] = None) -> Dict:
        """Collect job statistics for a period.

        Args:
            time_range: Period used when ``start_date`` and ``end_date`` are
                not both given; its value is also echoed in the result.
            start_date: Start of the period (optional).
            end_date: End of the period (optional).

        Returns:
            Dict with the keys ``summary``, ``status_distribution``,
            ``top_users``, ``daily_trend`` and ``time_range``, or
            ``{'error': <message>}`` if anything fails.
        """
        try:
            from models import get_db_session, Job, User

            if not start_date or not end_date:
                start_date, end_date = self._get_date_range(time_range)

            db_session = get_db_session()
            try:
                in_period = Job.created_at.between(start_date, end_date)

                # Basic counts
                base_query = db_session.query(Job).filter(in_period)
                total_jobs = base_query.count()
                completed_jobs = base_query.filter(Job.status == 'completed').count()
                failed_jobs = base_query.filter(Job.status == 'failed').count()
                cancelled_jobs = base_query.filter(Job.status == 'cancelled').count()

                # Status distribution
                status_distribution = db_session.query(
                    Job.status,
                    func.count(Job.id).label('count')
                ).filter(in_period).group_by(Job.status).all()

                # Average duration of completed jobs
                avg_duration = db_session.query(
                    func.avg(Job.duration_minutes)
                ).filter(
                    and_(in_period, Job.status == 'completed')
                ).scalar() or 0

                # Ten users with the most jobs in the period
                top_users = db_session.query(
                    User.username,
                    User.name,
                    func.count(Job.id).label('job_count'),
                    func.sum(Job.duration_minutes).label('total_duration')
                ).join(Job).filter(in_period).group_by(
                    User.id, User.username, User.name
                ).order_by(desc('job_count')).limit(10).all()

                # Jobs over time (daily)
                daily_jobs = self._get_daily_job_trend(db_session, start_date, end_date)

                # Material consumption (if recorded)
                material_usage = db_session.query(
                    func.sum(Job.material_used)
                ).filter(
                    and_(in_period, Job.material_used.isnot(None))
                ).scalar() or 0
            finally:
                # Release the session even when one of the queries fails.
                db_session.close()

            success_rate = round((completed_jobs / total_jobs * 100) if total_jobs > 0 else 0, 1)

            return {
                'summary': {
                    'total_jobs': total_jobs,
                    'completed_jobs': completed_jobs,
                    'failed_jobs': failed_jobs,
                    'cancelled_jobs': cancelled_jobs,
                    'success_rate': success_rate,
                    'avg_duration_hours': round(avg_duration / 60, 1),
                    'total_material_g': round(material_usage, 1)
                },
                'status_distribution': [
                    {'status': status.status, 'count': status.count}
                    for status in status_distribution
                ],
                'top_users': [
                    {
                        'username': user.username,
                        'name': user.name,
                        'jobs': user.job_count,
                        'total_hours': round((user.total_duration or 0) / 60, 1)
                    }
                    for user in top_users
                ],
                'daily_trend': daily_jobs,
                'time_range': {
                    'start': start_date.isoformat(),
                    'end': end_date.isoformat(),
                    'type': time_range.value
                }
            }
        except Exception as e:
            logger.error(f"Fehler beim Abrufen der Job-Statistiken: {e}")
            return {'error': str(e)}

    def get_user_statistics(self, time_range: TimeRange = TimeRange.MONTH,
                            start_date: Optional[datetime] = None,
                            end_date: Optional[datetime] = None) -> Dict:
        """Collect user statistics for a period.

        Args:
            time_range: Period used when ``start_date`` and ``end_date`` are
                not both given; its value is also echoed in the result.
            start_date: Start of the period (optional).
            end_date: End of the period (optional).

        Returns:
            Dict with the keys ``summary``, ``role_distribution``,
            ``user_activity`` and ``time_range``, or
            ``{'error': <message>}`` if anything fails.
        """
        try:
            from models import get_db_session, User, Job

            if not start_date or not end_date:
                start_date, end_date = self._get_date_range(time_range)

            db_session = get_db_session()
            try:
                in_period = Job.created_at.between(start_date, end_date)

                # Basic counts. "Active users" are the distinct user ids
                # that own at least one job created in the period.
                total_users = db_session.query(User).filter(
                    User.active == True  # noqa: E712 - SQL expression
                ).count()
                active_users = db_session.query(
                    func.distinct(Job.user_id)
                ).filter(in_period).count()

                # New users in the period
                new_users = db_session.query(User).filter(
                    and_(
                        User.created_at.between(start_date, end_date),
                        User.active == True
                    )
                ).count()

                # Per-user activity. The date filter is part of the join
                # condition, so users without jobs are still listed.
                user_activity = db_session.query(
                    User.username,
                    User.name,
                    func.count(Job.id).label('jobs'),
                    func.max(Job.created_at).label('last_activity'),
                    func.sum(Job.duration_minutes).label('total_duration')
                ).outerjoin(Job, and_(
                    Job.user_id == User.id,
                    in_period
                )).filter(User.active == True).group_by(
                    User.id, User.username, User.name
                ).all()

                # Role distribution
                role_distribution = db_session.query(
                    User.role,
                    func.count(User.id).label('count')
                ).filter(User.active == True).group_by(User.role).all()
            finally:
                # Release the session even when one of the queries fails.
                db_session.close()

            # Share of active users among all users
            engagement_rate = round((active_users / total_users * 100) if total_users > 0 else 0, 1)

            return {
                'summary': {
                    'total_users': total_users,
                    'active_users': active_users,
                    'new_users': new_users,
                    'engagement_rate': engagement_rate
                },
                'role_distribution': [
                    {'role': role.role or 'user', 'count': role.count}
                    for role in role_distribution
                ],
                'user_activity': [
                    {
                        'username': user.username,
                        'name': user.name,
                        'jobs': user.jobs or 0,
                        'last_activity': user.last_activity.isoformat() if user.last_activity else None,
                        'total_hours': round((user.total_duration or 0) / 60, 1)
                    }
                    for user in user_activity
                ],
                'time_range': {
                    'start': start_date.isoformat(),
                    'end': end_date.isoformat(),
                    'type': time_range.value
                }
            }
        except Exception as e:
            logger.error(f"Fehler beim Abrufen der Benutzer-Statistiken: {e}")
            return {'error': str(e)}

    def get_system_kpis(self, time_range: TimeRange = TimeRange.MONTH,
                        start_date: Optional[datetime] = None,
                        end_date: Optional[datetime] = None) -> Dict:
        """Compute system KPIs for a period and the period right before it.

        Args:
            time_range: Period to evaluate when no explicit dates are given.
            start_date: Explicit start of the current period (optional).
            end_date: Explicit end of the current period (optional).

        Returns:
            Dict with ``kpis`` (list of KPI dicts) and ``period`` (ISO start
            and end of the current and previous period), or
            ``{'error': <message>}`` if anything fails.
        """
        try:
            if start_date and end_date:
                current_start, current_end = start_date, end_date
            else:
                current_start, current_end = self._get_date_range(time_range)
            previous_start, previous_end = self._get_previous_period(current_start, current_end)

            printers, jobs, users = self._collect_period_stats(current_start, current_end)
            prev_printers, prev_jobs, prev_users = self._collect_period_stats(
                previous_start, previous_end
            )

            # (name, current stats, previous stats, summary key, target, unit)
            # Note: 'availability_rate' is computed from the printers' present
            # status without a date filter, so both periods yield the same value.
            kpi_specs = [
                ("Drucker-Verfügbarkeit", printers, prev_printers, 'availability_rate', 95.0, "%"),
                ("Job-Erfolgsrate", jobs, prev_jobs, 'success_rate', 90.0, "%"),
                ("Aktive Benutzer", users, prev_users, 'active_users', 50, "Benutzer"),
                ("Durchschnittliche Job-Dauer", jobs, prev_jobs, 'avg_duration_hours', 4.0, "Stunden"),
                ("Material-Verbrauch", jobs, prev_jobs, 'total_material_g', 10000, "g"),
            ]
            kpis = [
                self._create_kpi(
                    name=name,
                    current=current_stats['summary'][key],
                    previous=previous_stats['summary'][key],
                    target=target,
                    unit=unit
                )
                for name, current_stats, previous_stats, key, target, unit in kpi_specs
            ]

            return {
                'kpis': [kpi.to_dict() for kpi in kpis],
                'period': {
                    'current': {
                        'start': current_start.isoformat(),
                        'end': current_end.isoformat()
                    },
                    'previous': {
                        'start': previous_start.isoformat(),
                        'end': previous_end.isoformat()
                    }
                }
            }
        except Exception as e:
            logger.error(f"Fehler beim Abrufen der System-KPIs: {e}")
            return {'error': str(e)}

    def generate_report(self, report_type: str, time_range: TimeRange = TimeRange.MONTH,
                        format: ReportFormat = ReportFormat.JSON, **kwargs) -> Dict:
        """Generate a report.

        Args:
            report_type: One of ``"comprehensive"``, ``"printer_usage"``,
                ``"user_activity"`` or ``"efficiency"``.
            time_range: Period used when no explicit dates are passed.
            format: Output format.
            **kwargs: May carry ``start_date`` and ``end_date``.

        Returns:
            The report dict, or ``{'error': <message>}`` for an unknown or
            unavailable report type or any other failure.
        """
        try:
            start_date = kwargs.get('start_date')
            end_date = kwargs.get('end_date')
            if not start_date or not end_date:
                start_date, end_date = self._get_date_range(time_range)

            generator_names = {
                "comprehensive": "_generate_comprehensive_report",
                "printer_usage": "_generate_printer_usage_report",
                "user_activity": "_generate_user_activity_report",
                "efficiency": "_generate_efficiency_report",
            }
            if report_type not in generator_names:
                raise ValueError(f"Unbekannter Berichtstyp: {report_type}")

            # Looked up by name so that a report type whose generator method
            # does not exist yields a clear message instead of the text of
            # an AttributeError.
            generator = getattr(self, generator_names[report_type], None)
            if generator is None:
                raise NotImplementedError(
                    f"Berichtstyp {report_type} noch nicht implementiert"
                )
            return generator(start_date, end_date, format)
        except Exception as e:
            logger.error(f"Fehler beim Generieren des Berichts: {e}")
            return {'error': str(e)}

    # ===== HELPER METHODS =====

    def _get_date_range(self, time_range: TimeRange) -> Tuple[datetime, datetime]:
        """Return ``(start, end)`` for ``time_range``, ending at the current time.

        Month, quarter and year are approximated as 30, 90 and 365 days.
        ``TimeRange.CUSTOM`` and any other unlisted value fall back to
        30 days.
        """
        lookback = {
            TimeRange.HOUR: timedelta(hours=1),
            TimeRange.DAY: timedelta(days=1),
            TimeRange.WEEK: timedelta(weeks=1),
            TimeRange.MONTH: timedelta(days=30),
            TimeRange.QUARTER: timedelta(days=90),
            TimeRange.YEAR: timedelta(days=365),
        }
        end_date = datetime.now()
        start_date = end_date - lookback.get(time_range, timedelta(days=30))
        return start_date, end_date

    def _get_previous_period(self, start_date: datetime,
                             end_date: datetime) -> Tuple[datetime, datetime]:
        """Return the period of equal length that ends where this one starts."""
        duration = end_date - start_date
        return start_date - duration, start_date

    def _collect_period_stats(self, start_date: datetime,
                              end_date: datetime) -> Tuple[Dict, Dict, Dict]:
        """Fetch printer, job and user statistics for one period.

        Returns:
            Tuple ``(printer_stats, job_stats, user_stats)``.

        Raises:
            RuntimeError: If one of the statistics calls returned an error
                dict; the message is that call's error text.
        """
        collected = (
            self.get_printer_statistics(TimeRange.CUSTOM, start_date, end_date),
            self.get_job_statistics(TimeRange.CUSTOM, start_date, end_date),
            self.get_user_statistics(TimeRange.CUSTOM, start_date, end_date),
        )
        for stats in collected:
            if 'error' in stats:
                raise RuntimeError(stats['error'])
        return collected

    def _create_kpi(self, name: str, current: float, previous: float,
                    target: float, unit: str) -> KPI:
        """Build a KPI with its percentage change and trend.

        With a positive ``previous`` value the change is given in percent
        and changes below 1 % count as "stable". Without a positive
        baseline no percentage can be computed, so ``change_percent`` stays
        0.0 and the trend follows the sign of ``current - previous``.
        """
        if previous > 0:
            change_percent = round(((current - previous) / previous) * 100, 1)
            if abs(change_percent) < 1:
                trend = "stable"
            elif change_percent > 0:
                trend = "up"
            else:
                trend = "down"
        else:
            change_percent = 0.0
            difference = current - previous
            if difference > 0:
                trend = "up"
            elif difference < 0:
                trend = "down"
            else:
                trend = "stable"

        return KPI(
            name=name,
            current_value=current,
            previous_value=previous,
            target_value=target,
            unit=unit,
            trend=trend,
            change_percent=change_percent
        )

    def _calculate_printer_availability(self, db_session: Session,
                                        start_date: datetime, end_date: datetime) -> Dict:
        """Compute printer availability from the printers' present status.

        Simplified calculation: ``start_date`` and ``end_date`` are accepted
        but not used, and ``downtime_hours`` is a placeholder that is
        always 0.
        """
        from models import Printer

        total_printers = db_session.query(Printer).filter(
            Printer.active == True  # noqa: E712 - SQL expression
        ).count()
        online_printers = db_session.query(Printer).filter(
            and_(Printer.active == True, Printer.status.in_(["online", "idle"]))
        ).count()
        availability_rate = round(
            (online_printers / total_printers * 100) if total_printers > 0 else 0, 1
        )
        return {
            'total_printers': total_printers,
            'online_printers': online_printers,
            'availability_rate': availability_rate,
            'downtime_hours': 0  # placeholder - needs more detailed logging
        }

    def _calculate_utilization_rate(self, total_minutes: int,
                                    start_date: datetime, end_date: datetime) -> float:
        """Return the busy time as a percentage of the period, capped at 100.

        Returns 0.0 when there is no busy time or the period is empty or
        reversed.
        """
        if not total_minutes:
            return 0.0
        period_hours = (end_date - start_date).total_seconds() / 3600
        if period_hours <= 0:
            # Avoids a division by zero for start_date == end_date.
            return 0.0
        # float() because SQL SUM may be returned as Decimal, which cannot
        # be divided by a float.
        utilization_rate = (float(total_minutes) / 60) / period_hours * 100
        return round(min(utilization_rate, 100.0), 1)

    def _get_daily_job_trend(self, db_session: Session,
                             start_date: datetime, end_date: datetime) -> List[Dict]:
        """Return the number of jobs created per day, ordered by date.

        Each entry has the keys ``date`` (string) and ``jobs`` (count).
        """
        from models import Job

        daily_jobs = db_session.query(
            func.date(Job.created_at).label('date'),
            func.count(Job.id).label('count')
        ).filter(
            Job.created_at.between(start_date, end_date)
        ).group_by(
            func.date(Job.created_at)
        ).order_by('date').all()

        trend = []
        for job in daily_jobs:
            day = job.date
            # The value type depends on the database driver: a date object
            # on some backends, a plain 'YYYY-MM-DD' string on SQLite.
            day_text = day.isoformat() if hasattr(day, 'isoformat') else str(day)
            trend.append({'date': day_text, 'jobs': job.count})
        return trend

    def _generate_comprehensive_report(self, start_date: datetime,
                                       end_date: datetime, format: ReportFormat) -> Dict:
        """Generate the comprehensive report for the given period.

        Returns:
            The report dict for ``ReportFormat.JSON``; for every other
            format an ``{'error': ...}`` dict, because no conversion is
            implemented.

        Raises:
            RuntimeError: If printer, job or user statistics could not be
                collected.
        """
        printer_stats, job_stats, user_stats = self._collect_period_stats(start_date, end_date)
        # Pass the report period explicitly; TimeRange.CUSTOM alone would
        # make the KPIs cover the default last 30 days instead.
        kpis = self.get_system_kpis(TimeRange.CUSTOM, start_date, end_date)

        report = {
            'title': 'Umfassender System-Bericht',
            'generated_at': datetime.now().isoformat(),
            'period': {
                'start': start_date.isoformat(),
                'end': end_date.isoformat()
            },
            'summary': {
                'total_jobs': job_stats['summary']['total_jobs'],
                'success_rate': job_stats['summary']['success_rate'],
                'active_users': user_stats['summary']['active_users'],
                'printer_availability': printer_stats['summary']['availability_rate']
            },
            'sections': {
                'printers': printer_stats,
                'jobs': job_stats,
                'users': user_stats,
                'kpis': kpis
            }
        }

        if format == ReportFormat.JSON:
            return report
        # Conversion to other formats would take place here.
        return {'error': f'Format {format.value} noch nicht implementiert'}
# ===== GLOBAL INSTANCE =====
# Module-level engine instance used by the utility functions in this module.
analytics_engine = AnalyticsEngine()
# ===== UTILITY FUNCTIONS =====
def get_dashboard_stats() -> Dict:
    """Return quick dashboard statistics: the system KPIs for ``TimeRange.DAY``."""
    dashboard_range = TimeRange.DAY
    return analytics_engine.get_system_kpis(dashboard_range)
def export_statistics(report_type: str, time_range: TimeRange, format: ReportFormat = ReportFormat.JSON) -> Dict:
    """Export statistics by generating a report in the requested format.

    Args:
        report_type: Report type understood by the analytics engine.
        time_range: Period the report should cover.
        format: Output format (JSON by default).

    Returns:
        Whatever ``analytics_engine.generate_report`` returns.
    """
    report = analytics_engine.generate_report(
        report_type=report_type,
        time_range=time_range,
        format=format,
    )
    return report
def track_event(event_name: str, properties: Dict = None):
    """Track an analytics event by writing it to the log.

    Args:
        event_name: Name of the event.
        properties: Optional event details; logged as ``{}`` when missing
            or empty.

    An exception raised while logging is caught and logged as an error.
    """
    try:
        details = properties or {}
        logger.info(f"📊 Event tracked: {event_name} - {details}")
        # Real event tracking could be implemented here.
    except Exception as e:
        logger.error(f"Fehler beim Event-Tracking: {e}")
# Log that the analytics module has been loaded (runs at import time).
logger.info("📈 Analytics Engine initialisiert")