#!/usr/bin/env python3 """ Rate Limiting System für MYP Platform Schutz vor API-Missbrauch und DDoS-Attacken """ import time import redis import hashlib from functools import wraps from flask import request, jsonify, g from typing import Dict, Optional from dataclasses import dataclass from utils.logging_config import get_logger logger = get_logger("security") @dataclass class RateLimit: """Konfiguration für Rate-Limiting-Regeln""" requests: int # Anzahl erlaubter Anfragen per: int # Zeitraum in Sekunden message: str # Fehlermeldung bei Überschreitung # Rate-Limiting-Konfiguration RATE_LIMITS = { # API-Endpunkte 'api_general': RateLimit(100, 300, "Zu viele API-Anfragen. Versuchen Sie es in 5 Minuten erneut."), 'api_auth': RateLimit(10, 300, "Zu viele Anmeldeversuche. Versuchen Sie es in 5 Minuten erneut."), 'api_upload': RateLimit(20, 3600, "Zu viele Upload-Anfragen. Versuchen Sie es in einer Stunde erneut."), 'api_admin': RateLimit(200, 300, "Zu viele Admin-Anfragen. Versuchen Sie es in 5 Minuten erneut."), # Spezielle Endpunkte 'printer_status': RateLimit(300, 300, "Zu viele Drucker-Status-Anfragen."), 'job_creation': RateLimit(50, 3600, "Zu viele Job-Erstellungen. Versuchen Sie es in einer Stunde erneut."), # Drucker-Monitor Rate-Limits 'printer_monitor_live': RateLimit(5, 60, "Zu viele Live-Status-Anfragen. Versuchen Sie es in einer Minute erneut."), 'printer_monitor_summary': RateLimit(10, 30, "Zu viele Zusammenfassungs-Anfragen. Versuchen Sie es in 30 Sekunden erneut."), 'printer_monitor_cache': RateLimit(3, 120, "Zu viele Cache-Lösch-Anfragen. Versuchen Sie es in 2 Minuten erneut."), 'printer_monitor_init': RateLimit(2, 300, "Zu viele Initialisierungs-Anfragen. Versuchen Sie es in 5 Minuten erneut."), # Sicherheitskritische Endpunkte 'password_reset': RateLimit(3, 3600, "Zu viele Passwort-Reset-Anfragen. Versuchen Sie es in einer Stunde erneut."), 'user_creation': RateLimit(10, 3600, "Zu viele Benutzer-Erstellungen.") } class RateLimiter: """ In-Memory Rate Limiter mit optionaler Redis-Unterstützung """ def __init__(self, use_redis: bool = False, redis_url: str = None): self.use_redis = use_redis self.redis_client = None self.memory_store: Dict[str, Dict] = {} if use_redis and redis_url: try: import redis self.redis_client = redis.from_url(redis_url, decode_responses=True) logger.info("✅ Redis-basiertes Rate Limiting aktiviert") except ImportError: logger.warning("⚠️ Redis nicht verfügbar, verwende In-Memory Rate Limiting") self.use_redis = False except Exception as e: logger.error(f"❌ Redis-Verbindung fehlgeschlagen: {e}") self.use_redis = False def _get_client_id(self) -> str: """ Generiert eine eindeutige Client-ID basierend auf IP und User-Agent """ ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.remote_addr) user_agent = request.headers.get('User-Agent', '') # Hash für Anonymisierung client_string = f"{ip}:{user_agent}" return hashlib.sha256(client_string.encode()).hexdigest()[:16] def _get_key(self, limit_type: str, client_id: str) -> str: """Erstellt Redis/Memory-Key für Rate-Limiting""" return f"rate_limit:{limit_type}:{client_id}" def _get_current_requests(self, key: str, window_start: int) -> int: """Holt aktuelle Anfragen-Anzahl""" if self.use_redis and self.redis_client: try: # Redis-basierte Implementierung pipe = self.redis_client.pipeline() pipe.zremrangebyscore(key, 0, window_start) pipe.zcard(key) _, count = pipe.execute() return count except Exception as e: logger.error(f"Redis-Fehler: {e}, fallback zu Memory") self.use_redis = False # In-Memory Implementierung if key not in self.memory_store: self.memory_store[key] = {'requests': [], 'last_cleanup': time.time()} # Alte Einträge bereinigen current_time = time.time() data = self.memory_store[key] data['requests'] = [req_time for req_time in data['requests'] if req_time > window_start] return len(data['requests']) def _add_request(self, key: str, current_time: int, expire_time: int): """Fügt neue Anfrage hinzu""" if self.use_redis and self.redis_client: try: pipe = self.redis_client.pipeline() pipe.zadd(key, {str(current_time): current_time}) pipe.expire(key, expire_time) pipe.execute() return except Exception as e: logger.error(f"Redis-Fehler: {e}, fallback zu Memory") self.use_redis = False # In-Memory Implementierung if key not in self.memory_store: self.memory_store[key] = {'requests': [], 'last_cleanup': time.time()} self.memory_store[key]['requests'].append(current_time) def is_allowed(self, limit_type: str) -> tuple[bool, Dict]: """ Prüft ob eine Anfrage erlaubt ist Returns: (is_allowed, info_dict) """ if limit_type not in RATE_LIMITS: return True, {} rate_limit = RATE_LIMITS[limit_type] client_id = self._get_client_id() key = self._get_key(limit_type, client_id) current_time = int(time.time()) window_start = current_time - rate_limit.per # Aktuelle Anfragen zählen current_requests = self._get_current_requests(key, window_start) # Limite prüfen if current_requests >= rate_limit.requests: logger.warning(f"🚨 Rate limit exceeded: {limit_type} für Client {client_id[:8]}...") return False, { 'limit': rate_limit.requests, 'remaining': 0, 'reset_time': current_time + rate_limit.per, 'message': rate_limit.message } # Anfrage hinzufügen self._add_request(key, current_time, rate_limit.per) return True, { 'limit': rate_limit.requests, 'remaining': rate_limit.requests - current_requests - 1, 'reset_time': current_time + rate_limit.per } def cleanup_memory(self): """Bereinigt alte In-Memory-Einträge""" if self.use_redis: return current_time = time.time() keys_to_delete = [] for key, data in self.memory_store.items(): # Bereinige alle Einträge älter als 24 Stunden if current_time - data.get('last_cleanup', 0) > 86400: keys_to_delete.append(key) for key in keys_to_delete: del self.memory_store[key] # Globale Rate-Limiter-Instanz rate_limiter = RateLimiter() def limit_requests(limit_type: str): """ Decorator für Rate-Limiting von API-Endpunkten Args: limit_type: Art des Limits (siehe RATE_LIMITS) """ def decorator(f): @wraps(f) def wrapper(*args, **kwargs): # Rate-Limiting prüfen is_allowed, info = rate_limiter.is_allowed(limit_type) if not is_allowed: response = jsonify({ 'error': 'Rate limit exceeded', 'message': info['message'], 'retry_after': info['reset_time'] - int(time.time()) }) response.status_code = 429 response.headers['Retry-After'] = str(info['reset_time'] - int(time.time())) response.headers['X-RateLimit-Limit'] = str(info['limit']) response.headers['X-RateLimit-Remaining'] = str(info['remaining']) response.headers['X-RateLimit-Reset'] = str(info['reset_time']) return response # Rate-Limiting-Headers zu Response hinzufügen response = f(*args, **kwargs) if hasattr(response, 'headers'): response.headers['X-RateLimit-Limit'] = str(info['limit']) response.headers['X-RateLimit-Remaining'] = str(info['remaining']) response.headers['X-RateLimit-Reset'] = str(info['reset_time']) return response return wrapper return decorator def get_client_info() -> Dict: """ Gibt Client-Informationen für Rate-Limiting zurück """ client_id = rate_limiter._get_client_id() ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.remote_addr) return { 'client_id': client_id, 'ip_address': ip, 'user_agent': request.headers.get('User-Agent', ''), 'timestamp': int(time.time()) } # Maintenance-Task für Memory-Cleanup def cleanup_rate_limiter(): """Periodische Bereinigung des Rate-Limiters""" rate_limiter.cleanup_memory() logger.debug("🧹 Rate-Limiter Memory bereinigt")