#!/usr/bin/env python3
"""
Extended analytics and statistics for the MYP Platform.

Comprehensive data analysis, reports and KPI tracking.
"""

import json
from contextlib import contextmanager
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, Iterator, List, Optional, Tuple

from sqlalchemy import and_, desc, extract, func, or_
from sqlalchemy.orm import Session

from utils.logging_config import get_logger

logger = get_logger("analytics")


# ===== ANALYTICS ENUMS =====

class MetricType(Enum):
    """Kinds of metrics."""
    COUNTER = "counter"      # counter (monotonically increasing)
    GAUGE = "gauge"          # instantaneous value
    HISTOGRAM = "histogram"  # distribution of values
    RATE = "rate"            # rate over time


class TimeRange(Enum):
    """Time ranges available for analyses."""
    HOUR = "hour"
    DAY = "day"
    WEEK = "week"
    MONTH = "month"
    QUARTER = "quarter"
    YEAR = "year"
    CUSTOM = "custom"


class ReportFormat(Enum):
    """Output formats for reports."""
    JSON = "json"
    CSV = "csv"
    PDF = "pdf"
    EXCEL = "excel"


# ===== DATA CLASSES =====

@dataclass
class Metric:
    """A single metric sample."""
    name: str
    value: float
    unit: str
    timestamp: datetime
    tags: Optional[Dict[str, str]] = None

    def to_dict(self) -> Dict:
        """Return the metric as a dict with an ISO-8601 timestamp string."""
        result = asdict(self)
        result['timestamp'] = self.timestamp.isoformat()
        return result


@dataclass
class AnalyticsData:
    """Container for a set of metrics over a time range."""
    metrics: List[Metric]
    timerange: TimeRange
    start_date: datetime
    end_date: datetime
    filters: Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict:
        """Return a plain-dict representation (dates as ISO-8601 strings)."""
        return {
            'metrics': [m.to_dict() for m in self.metrics],
            'timerange': self.timerange.value,
            'start_date': self.start_date.isoformat(),
            'end_date': self.end_date.isoformat(),
            'filters': self.filters or {}
        }


@dataclass
class KPI:
    """Key performance indicator with comparison to a previous period."""
    name: str
    current_value: float
    previous_value: float
    target_value: float
    unit: str
    trend: str  # "up", "down", "stable"
    change_percent: float

    def to_dict(self) -> Dict:
        """Return the KPI as a plain dict."""
        return asdict(self)


# ===== SESSION HELPER =====

@contextmanager
def _db_session_scope() -> Iterator[Session]:
    """Yield a database session and close it on exit, even when a query raises.

    The ``models`` import is local, as in the statistics methods, so an import
    failure surfaces inside the caller's ``try`` block.
    """
    from models import get_db_session

    db_session = get_db_session()
    try:
        yield db_session
    finally:
        db_session.close()


# ===== ANALYTICS ENGINE =====

class AnalyticsEngine:
    """Main class for analytics and statistics."""

    # Look-back window per TimeRange; anything not listed (e.g. CUSTOM) uses
    # _DEFAULT_RANGE_DELTA.
    _RANGE_DELTAS = {
        TimeRange.HOUR: timedelta(hours=1),
        TimeRange.DAY: timedelta(days=1),
        TimeRange.WEEK: timedelta(weeks=1),
        TimeRange.MONTH: timedelta(days=30),
        TimeRange.QUARTER: timedelta(days=90),
        TimeRange.YEAR: timedelta(days=365),
    }
    _DEFAULT_RANGE_DELTA = timedelta(days=30)

    # report_type -> name of the generator method on this class.
    _REPORT_GENERATORS = {
        "comprehensive": "_generate_comprehensive_report",
        "printer_usage": "_generate_printer_usage_report",
        "user_activity": "_generate_user_activity_report",
        "efficiency": "_generate_efficiency_report",
    }

    def __init__(self):
        # NOTE(review): cache and cache_timeout are initialised but not used
        # by any method in this file.
        self.cache = {}
        self.cache_timeout = timedelta(minutes=10)

    def get_printer_statistics(self, time_range: TimeRange = TimeRange.MONTH,
                               start_date: Optional[datetime] = None,
                               end_date: Optional[datetime] = None) -> Dict:
        """
        Collect printer statistics.

        Args:
            time_range: Range used when start_date/end_date are not both given.
            start_date: Start of the period (optional).
            end_date: End of the period (optional).

        Returns:
            Dict: Printer statistics, or ``{'error': message}`` on failure.
        """
        try:
            from models import Printer, Job

            if not start_date or not end_date:
                start_date, end_date = self._get_date_range(time_range)

            with _db_session_scope() as db_session:
                # Base statistics
                total_printers = db_session.query(Printer).filter(
                    Printer.active == True  # noqa: E712 - SQL expression
                ).count()
                online_printers = db_session.query(Printer).filter(
                    and_(Printer.active == True,  # noqa: E712
                         Printer.status.in_(["online", "idle"]))
                ).count()

                # Usage per printer; the outer join keeps printers without jobs.
                printer_usage = db_session.query(
                    Printer.name,
                    func.count(Job.id).label('job_count'),
                    func.sum(Job.duration_minutes).label('total_duration')
                ).outerjoin(Job, and_(
                    Job.printer_id == Printer.id,
                    Job.created_at.between(start_date, end_date)
                )).group_by(Printer.id, Printer.name).all()

                # Status distribution
                status_distribution = db_session.query(
                    Printer.status,
                    func.count(Printer.id).label('count')
                ).filter(
                    Printer.active == True  # noqa: E712
                ).group_by(Printer.status).all()

                # Average availability
                availability_stats = self._calculate_printer_availability(
                    db_session, start_date, end_date
                )

            availability_rate = round(
                (online_printers / total_printers * 100) if total_printers > 0 else 0, 1
            )

            # Rows are unpacked positionally: on tuple-like result rows
            # (SQLAlchemy 1.4+) attribute access to a column labelled 'count'
            # can resolve to the tuple method count() instead of the value.
            return {
                'summary': {
                    'total_printers': total_printers,
                    'online_printers': online_printers,
                    'availability_rate': availability_rate
                },
                'usage_by_printer': [
                    {
                        'name': name,
                        'jobs': job_count or 0,
                        'total_hours': round((total_duration or 0) / 60, 1),
                        'utilization_rate': self._calculate_utilization_rate(
                            total_duration, start_date, end_date
                        )
                    }
                    for name, job_count, total_duration in printer_usage
                ],
                'status_distribution': [
                    {'status': status, 'count': count}
                    for status, count in status_distribution
                ],
                'availability': availability_stats,
                'time_range': {
                    'start': start_date.isoformat(),
                    'end': end_date.isoformat(),
                    'type': time_range.value
                }
            }

        except Exception as e:
            logger.error(f"Fehler beim Abrufen der Drucker-Statistiken: {e}")
            return {'error': str(e)}

    def get_job_statistics(self, time_range: TimeRange = TimeRange.MONTH,
                           start_date: Optional[datetime] = None,
                           end_date: Optional[datetime] = None) -> Dict:
        """
        Collect job statistics.

        Args:
            time_range: Range used when start_date/end_date are not both given.
            start_date: Start of the period (optional).
            end_date: End of the period (optional).

        Returns:
            Dict: Job statistics, or ``{'error': message}`` on failure.
        """
        try:
            from models import Job, User

            if not start_date or not end_date:
                start_date, end_date = self._get_date_range(time_range)

            in_period = Job.created_at.between(start_date, end_date)

            with _db_session_scope() as db_session:
                # Base statistics
                base_query = db_session.query(Job).filter(in_period)

                total_jobs = base_query.count()
                completed_jobs = base_query.filter(Job.status == 'completed').count()
                failed_jobs = base_query.filter(Job.status == 'failed').count()
                cancelled_jobs = base_query.filter(Job.status == 'cancelled').count()

                # Status distribution
                status_distribution = db_session.query(
                    Job.status,
                    func.count(Job.id).label('count')
                ).filter(in_period).group_by(Job.status).all()

                # Average duration of completed jobs (0 when there are none)
                avg_duration = db_session.query(
                    func.avg(Job.duration_minutes)
                ).filter(
                    and_(in_period, Job.status == 'completed')
                ).scalar() or 0

                # Top 10 users by number of jobs
                top_users = db_session.query(
                    User.username,
                    User.name,
                    func.count(Job.id).label('job_count'),
                    func.sum(Job.duration_minutes).label('total_duration')
                ).join(Job).filter(in_period).group_by(
                    User.id, User.username, User.name
                ).order_by(desc('job_count')).limit(10).all()

                # Jobs over time (daily)
                daily_jobs = self._get_daily_job_trend(db_session, start_date, end_date)

                # Material consumption (if recorded)
                material_usage = db_session.query(
                    func.sum(Job.material_used)
                ).filter(
                    and_(in_period, Job.material_used.isnot(None))
                ).scalar() or 0

            success_rate = round(
                (completed_jobs / total_jobs * 100) if total_jobs > 0 else 0, 1
            )

            return {
                'summary': {
                    'total_jobs': total_jobs,
                    'completed_jobs': completed_jobs,
                    'failed_jobs': failed_jobs,
                    'cancelled_jobs': cancelled_jobs,
                    'success_rate': success_rate,
                    'avg_duration_hours': round(avg_duration / 60, 1),
                    'total_material_g': round(material_usage, 1)
                },
                # Positional unpacking: see get_printer_statistics for why the
                # 'count' column is not read via attribute access.
                'status_distribution': [
                    {'status': status, 'count': count}
                    for status, count in status_distribution
                ],
                'top_users': [
                    {
                        'username': username,
                        'name': name,
                        'jobs': job_count,
                        'total_hours': round((total_duration or 0) / 60, 1)
                    }
                    for username, name, job_count, total_duration in top_users
                ],
                'daily_trend': daily_jobs,
                'time_range': {
                    'start': start_date.isoformat(),
                    'end': end_date.isoformat(),
                    'type': time_range.value
                }
            }

        except Exception as e:
            logger.error(f"Fehler beim Abrufen der Job-Statistiken: {e}")
            return {'error': str(e)}

    def get_user_statistics(self, time_range: TimeRange = TimeRange.MONTH,
                            start_date: Optional[datetime] = None,
                            end_date: Optional[datetime] = None) -> Dict:
        """
        Collect user statistics.

        Args:
            time_range: Range used when start_date/end_date are not both given.
            start_date: Start of the period (optional).
            end_date: End of the period (optional).

        Returns:
            Dict: User statistics, or ``{'error': message}`` on failure.
        """
        try:
            from models import User, Job

            if not start_date or not end_date:
                start_date, end_date = self._get_date_range(time_range)

            in_period = Job.created_at.between(start_date, end_date)

            with _db_session_scope() as db_session:
                # Base statistics
                total_users = db_session.query(User).filter(
                    User.active == True  # noqa: E712 - SQL expression
                ).count()
                # Users that created at least one job in the period
                active_users = db_session.query(
                    func.distinct(Job.user_id)
                ).filter(in_period).count()

                # New users in the period
                new_users = db_session.query(User).filter(
                    and_(
                        User.created_at.between(start_date, end_date),
                        User.active == True  # noqa: E712
                    )
                ).count()

                # Per-user activity; the outer join keeps users without jobs.
                user_activity = db_session.query(
                    User.username,
                    User.name,
                    func.count(Job.id).label('jobs'),
                    func.max(Job.created_at).label('last_activity'),
                    func.sum(Job.duration_minutes).label('total_duration')
                ).outerjoin(Job, and_(
                    Job.user_id == User.id,
                    in_period
                )).filter(
                    User.active == True  # noqa: E712
                ).group_by(User.id, User.username, User.name).all()

                # Role distribution
                role_distribution = db_session.query(
                    User.role,
                    func.count(User.id).label('count')
                ).filter(
                    User.active == True  # noqa: E712
                ).group_by(User.role).all()

            # Engagement rate
            engagement_rate = round(
                (active_users / total_users * 100) if total_users > 0 else 0, 1
            )

            return {
                'summary': {
                    'total_users': total_users,
                    'active_users': active_users,
                    'new_users': new_users,
                    'engagement_rate': engagement_rate
                },
                # Positional unpacking: see get_printer_statistics for why the
                # 'count' column is not read via attribute access.
                'role_distribution': [
                    {'role': role or 'user', 'count': count}
                    for role, count in role_distribution
                ],
                'user_activity': [
                    {
                        'username': username,
                        'name': name,
                        'jobs': jobs or 0,
                        'last_activity': last_activity.isoformat() if last_activity else None,
                        'total_hours': round((total_duration or 0) / 60, 1)
                    }
                    for username, name, jobs, last_activity, total_duration in user_activity
                ],
                'time_range': {
                    'start': start_date.isoformat(),
                    'end': end_date.isoformat(),
                    'type': time_range.value
                }
            }

        except Exception as e:
            logger.error(f"Fehler beim Abrufen der Benutzer-Statistiken: {e}")
            return {'error': str(e)}

    def get_system_kpis(self, time_range: TimeRange = TimeRange.MONTH,
                        start_date: Optional[datetime] = None,
                        end_date: Optional[datetime] = None) -> Dict:
        """
        Compute system KPIs for a period and compare them with the
        immediately preceding period of equal length.

        Args:
            time_range: Range used when start_date/end_date are not both given.
            start_date: Start of the current period (optional).
            end_date: End of the current period (optional).

        Returns:
            Dict: KPI data, or ``{'error': message}`` on failure.
        """
        try:
            if start_date and end_date:
                current_start, current_end = start_date, end_date
            else:
                current_start, current_end = self._get_date_range(time_range)
            previous_start, previous_end = self._get_previous_period(current_start, current_end)

            current = self._collect_summaries(current_start, current_end)
            previous = self._collect_summaries(previous_start, previous_end)

            # (name, summary section, summary key, target, unit)
            # NOTE: printer availability is computed from the printers'
            # present status, so its current and previous values coincide.
            kpi_specs = [
                ("Drucker-Verfügbarkeit", 'printers', 'availability_rate', 95.0, "%"),
                ("Job-Erfolgsrate", 'jobs', 'success_rate', 90.0, "%"),
                ("Aktive Benutzer", 'users', 'active_users', 50, "Benutzer"),
                ("Durchschnittliche Job-Dauer", 'jobs', 'avg_duration_hours', 4.0, "Stunden"),
                ("Material-Verbrauch", 'jobs', 'total_material_g', 10000, "g"),
            ]

            kpis = [
                self._create_kpi(
                    name=name,
                    current=current[section][key],
                    previous=previous[section][key],
                    target=target,
                    unit=unit
                )
                for name, section, key, target, unit in kpi_specs
            ]

            return {
                'kpis': [kpi.to_dict() for kpi in kpis],
                'period': {
                    'current': {
                        'start': current_start.isoformat(),
                        'end': current_end.isoformat()
                    },
                    'previous': {
                        'start': previous_start.isoformat(),
                        'end': previous_end.isoformat()
                    }
                }
            }

        except Exception as e:
            logger.error(f"Fehler beim Abrufen der System-KPIs: {e}")
            return {'error': str(e)}

    def generate_report(self, report_type: str, time_range: TimeRange = TimeRange.MONTH,
                        format: ReportFormat = ReportFormat.JSON, **kwargs) -> Dict:
        """
        Generate a report.

        Args:
            report_type: One of "comprehensive", "printer_usage",
                "user_activity", "efficiency".
            time_range: Range used when no explicit dates are passed.
            format: Output format.
            **kwargs: Optional ``start_date`` / ``end_date`` overrides.

        Returns:
            Dict: Report data, or ``{'error': message}`` on failure
            (including unknown or unavailable report types).
        """
        try:
            start_date = kwargs.get('start_date')
            end_date = kwargs.get('end_date')

            if not start_date or not end_date:
                start_date, end_date = self._get_date_range(time_range)

            method_name = self._REPORT_GENERATORS.get(report_type)
            if method_name is None:
                raise ValueError(f"Unbekannter Berichtstyp: {report_type}")

            # Only the comprehensive generator is defined in this module; the
            # others are looked up dynamically so a subclass may supply them,
            # and a missing one yields a clear message rather than a raw
            # AttributeError text.
            generator = getattr(self, method_name, None)
            if generator is None:
                raise NotImplementedError(
                    f"Berichtstyp {report_type} noch nicht implementiert"
                )

            return generator(start_date, end_date, format)

        except Exception as e:
            logger.error(f"Fehler beim Generieren des Berichts: {e}")
            return {'error': str(e)}

    # ===== HELPER METHODS =====

    @staticmethod
    def _summary_or_raise(stats: Dict, label: str) -> Dict:
        """Return ``stats['summary']``; raise with the original message if the
        statistics call reported an error instead."""
        if 'error' in stats:
            raise RuntimeError(f"{label}: {stats['error']}")
        return stats['summary']

    def _collect_summaries(self, start_date: datetime, end_date: datetime) -> Dict[str, Dict]:
        """Fetch the printer, job and user summaries for one period."""
        return {
            'printers': self._summary_or_raise(
                self.get_printer_statistics(TimeRange.CUSTOM, start_date, end_date),
                "Drucker-Statistiken"
            ),
            'jobs': self._summary_or_raise(
                self.get_job_statistics(TimeRange.CUSTOM, start_date, end_date),
                "Job-Statistiken"
            ),
            'users': self._summary_or_raise(
                self.get_user_statistics(TimeRange.CUSTOM, start_date, end_date),
                "Benutzer-Statistiken"
            ),
        }

    def _get_date_range(self, time_range: TimeRange) -> Tuple[datetime, datetime]:
        """Return ``(start, end)`` ending now for the given TimeRange.

        Ranges without a fixed length (e.g. CUSTOM) default to 30 days.
        """
        end_date = datetime.now()
        delta = self._RANGE_DELTAS.get(time_range, self._DEFAULT_RANGE_DELTA)
        return end_date - delta, end_date

    def _get_previous_period(self, start_date: datetime,
                             end_date: datetime) -> Tuple[datetime, datetime]:
        """Return the period of equal length that ends where this one starts."""
        duration = end_date - start_date
        return start_date - duration, start_date

    def _create_kpi(self, name: str, current: float, previous: float,
                    target: float, unit: str) -> KPI:
        """Build a KPI with percentage change and trend.

        Without a positive baseline the percentage change is undefined and
        reported as 0.0; the trend then follows the direction of the raw
        change so that a rise from zero is not labelled "stable".
        """
        if previous > 0:
            change_percent = round(((current - previous) / previous) * 100, 1)
            if abs(change_percent) < 1:
                trend = "stable"
            elif change_percent > 0:
                trend = "up"
            else:
                trend = "down"
        else:
            change_percent = 0.0
            if current > previous:
                trend = "up"
            elif current < previous:
                trend = "down"
            else:
                trend = "stable"

        return KPI(
            name=name,
            current_value=current,
            previous_value=previous,
            target_value=target,
            unit=unit,
            trend=trend,
            change_percent=change_percent
        )

    def _calculate_printer_availability(self, db_session: Session,
                                        start_date: datetime, end_date: datetime) -> Dict:
        """Compute printer availability.

        Simplified calculation based on the printers' present status;
        start_date and end_date are accepted but not used yet.
        """
        from models import Printer

        total_printers = db_session.query(Printer).filter(
            Printer.active == True  # noqa: E712 - SQL expression
        ).count()
        online_printers = db_session.query(Printer).filter(
            and_(Printer.active == True,  # noqa: E712
                 Printer.status.in_(["online", "idle"]))
        ).count()

        availability_rate = round(
            (online_printers / total_printers * 100) if total_printers > 0 else 0, 1
        )

        return {
            'total_printers': total_printers,
            'online_printers': online_printers,
            'availability_rate': availability_rate,
            'downtime_hours': 0  # Placeholder - could be derived from detailed logging
        }

    def _calculate_utilization_rate(self, total_minutes: int,
                                    start_date: datetime, end_date: datetime) -> float:
        """Return the share of the period spent printing, in percent (max 100)."""
        if not total_minutes:
            return 0.0

        total_hours = (end_date - start_date).total_seconds() / 3600
        if total_hours <= 0:
            # Empty or inverted period: nothing to relate the usage to.
            return 0.0

        # float() because an aggregate may come back as Decimal, which cannot
        # be divided by a float.
        utilization_rate = (float(total_minutes) / 60) / total_hours * 100
        return round(min(utilization_rate, 100), 1)

    def _get_daily_job_trend(self, db_session: Session,
                             start_date: datetime, end_date: datetime) -> List[Dict]:
        """Return the number of jobs created per day within the period."""
        from models import Job

        daily_jobs = db_session.query(
            func.date(Job.created_at).label('date'),
            func.count(Job.id).label('count')
        ).filter(
            Job.created_at.between(start_date, end_date)
        ).group_by(
            func.date(Job.created_at)
        ).order_by('date').all()

        # Depending on the database backend the date() result may arrive as a
        # date object or as a plain string (e.g. SQLite); handle both.
        return [
            {
                'date': day.isoformat() if hasattr(day, 'isoformat') else str(day),
                'jobs': count
            }
            for day, count in daily_jobs
        ]

    def _generate_comprehensive_report(self, start_date: datetime, end_date: datetime,
                                       format: ReportFormat) -> Dict:
        """Generate the comprehensive report for the given period.

        Raises:
            RuntimeError: If one of the underlying statistics calls failed.
        """
        if format != ReportFormat.JSON:
            # Conversion to other formats would happen here. Checked first so
            # that no queries run for an unsupported format.
            return {'error': f'Format {format.value} noch nicht implementiert'}

        printer_stats = self.get_printer_statistics(TimeRange.CUSTOM, start_date, end_date)
        job_stats = self.get_job_statistics(TimeRange.CUSTOM, start_date, end_date)
        user_stats = self.get_user_statistics(TimeRange.CUSTOM, start_date, end_date)
        # Pass the report period explicitly; TimeRange.CUSTOM alone would fall
        # back to the default 30-day window.
        kpis = self.get_system_kpis(TimeRange.CUSTOM, start_date, end_date)

        printer_summary = self._summary_or_raise(printer_stats, "Drucker-Statistiken")
        job_summary = self._summary_or_raise(job_stats, "Job-Statistiken")
        user_summary = self._summary_or_raise(user_stats, "Benutzer-Statistiken")

        return {
            'title': 'Umfassender System-Bericht',
            'generated_at': datetime.now().isoformat(),
            'period': {
                'start': start_date.isoformat(),
                'end': end_date.isoformat()
            },
            'summary': {
                'total_jobs': job_summary['total_jobs'],
                'success_rate': job_summary['success_rate'],
                'active_users': user_summary['active_users'],
                'printer_availability': printer_summary['availability_rate']
            },
            'sections': {
                'printers': printer_stats,
                'jobs': job_stats,
                'users': user_stats,
                'kpis': kpis
            }
        }


# ===== GLOBAL INSTANCE =====

analytics_engine = AnalyticsEngine()


# ===== UTILITY FUNCTIONS =====

def get_dashboard_stats() -> Dict:
    """Quick dashboard statistics: system KPIs for the last day."""
    return analytics_engine.get_system_kpis(TimeRange.DAY)


def export_statistics(report_type: str, time_range: TimeRange,
                      format: ReportFormat = ReportFormat.JSON) -> Dict:
    """Export statistics as a report in the requested format."""
    return analytics_engine.generate_report(report_type, time_range, format)


def track_event(event_name: str, properties: Optional[Dict] = None):
    """Record an analytics event (currently only written to the log)."""
    try:
        event_properties = properties or {}
        logger.info(f"📊 Event tracked: {event_name} - {event_properties}")
        # Actual event tracking could be implemented here
    except Exception as e:
        logger.error(f"Fehler beim Event-Tracking: {e}")


# Log that the analytics system is ready
logger.info("📈 Analytics Engine initialisiert")