manage-your-printer/utils/rate_limiter.py
2025-06-04 10:03:22 +02:00

244 lines
9.3 KiB
Python

#!/usr/bin/env python3
"""
Rate Limiting System für MYP Platform
Schutz vor API-Missbrauch und DDoS-Attacken
"""
import time
import redis
import hashlib
from functools import wraps
from flask import request, jsonify, g
from typing import Dict, Optional
from dataclasses import dataclass
from utils.logging_config import get_logger
logger = get_logger("security")
@dataclass
class RateLimit:
"""Konfiguration für Rate-Limiting-Regeln"""
requests: int # Anzahl erlaubter Anfragen
per: int # Zeitraum in Sekunden
message: str # Fehlermeldung bei Überschreitung
# Rate-Limiting-Konfiguration
RATE_LIMITS = {
# API-Endpunkte
'api_general': RateLimit(100, 300, "Zu viele API-Anfragen. Versuchen Sie es in 5 Minuten erneut."),
'api_auth': RateLimit(10, 300, "Zu viele Anmeldeversuche. Versuchen Sie es in 5 Minuten erneut."),
'api_upload': RateLimit(20, 3600, "Zu viele Upload-Anfragen. Versuchen Sie es in einer Stunde erneut."),
'api_admin': RateLimit(200, 300, "Zu viele Admin-Anfragen. Versuchen Sie es in 5 Minuten erneut."),
# Spezielle Endpunkte
'printer_status': RateLimit(300, 300, "Zu viele Drucker-Status-Anfragen."),
'job_creation': RateLimit(50, 3600, "Zu viele Job-Erstellungen. Versuchen Sie es in einer Stunde erneut."),
# Drucker-Monitor Rate-Limits (gelockert für Live-Updates)
'printer_monitor_live': RateLimit(30, 60, "Zu viele Live-Status-Anfragen. Versuchen Sie es in einer Minute erneut."),
'printer_monitor_summary': RateLimit(60, 60, "Zu viele Zusammenfassungs-Anfragen. Versuchen Sie es in einer Minute erneut."),
'printer_monitor_cache': RateLimit(10, 120, "Zu viele Cache-Lösch-Anfragen. Versuchen Sie es in 2 Minuten erneut."),
'printer_monitor_init': RateLimit(5, 300, "Zu viele Initialisierungs-Anfragen. Versuchen Sie es in 5 Minuten erneut."),
# Sicherheitskritische Endpunkte
'password_reset': RateLimit(3, 3600, "Zu viele Passwort-Reset-Anfragen. Versuchen Sie es in einer Stunde erneut."),
'user_creation': RateLimit(10, 3600, "Zu viele Benutzer-Erstellungen.")
}
class RateLimiter:
"""
In-Memory Rate Limiter mit optionaler Redis-Unterstützung
"""
def __init__(self, use_redis: bool = False, redis_url: str = None):
self.use_redis = use_redis
self.redis_client = None
self.memory_store: Dict[str, Dict] = {}
if use_redis and redis_url:
try:
import redis
self.redis_client = redis.from_url(redis_url, decode_responses=True)
logger.info("✅ Redis-basiertes Rate Limiting aktiviert")
except ImportError:
logger.warning("⚠️ Redis nicht verfügbar, verwende In-Memory Rate Limiting")
self.use_redis = False
except Exception as e:
logger.error(f"❌ Redis-Verbindung fehlgeschlagen: {e}")
self.use_redis = False
def _get_client_id(self) -> str:
"""
Generiert eine eindeutige Client-ID basierend auf IP und User-Agent
"""
ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.remote_addr)
user_agent = request.headers.get('User-Agent', '')
# Hash für Anonymisierung
client_string = f"{ip}:{user_agent}"
return hashlib.sha256(client_string.encode()).hexdigest()[:16]
def _get_key(self, limit_type: str, client_id: str) -> str:
"""Erstellt Redis/Memory-Key für Rate-Limiting"""
return f"rate_limit:{limit_type}:{client_id}"
def _get_current_requests(self, key: str, window_start: int) -> int:
"""Holt aktuelle Anfragen-Anzahl"""
if self.use_redis and self.redis_client:
try:
# Redis-basierte Implementierung
pipe = self.redis_client.pipeline()
pipe.zremrangebyscore(key, 0, window_start)
pipe.zcard(key)
_, count = pipe.execute()
return count
except Exception as e:
logger.error(f"Redis-Fehler: {e}, fallback zu Memory")
self.use_redis = False
# In-Memory Implementierung
if key not in self.memory_store:
self.memory_store[key] = {'requests': [], 'last_cleanup': time.time()}
# Alte Einträge bereinigen
current_time = time.time()
data = self.memory_store[key]
data['requests'] = [req_time for req_time in data['requests'] if req_time > window_start]
return len(data['requests'])
def _add_request(self, key: str, current_time: int, expire_time: int):
"""Fügt neue Anfrage hinzu"""
if self.use_redis and self.redis_client:
try:
pipe = self.redis_client.pipeline()
pipe.zadd(key, {str(current_time): current_time})
pipe.expire(key, expire_time)
pipe.execute()
return
except Exception as e:
logger.error(f"Redis-Fehler: {e}, fallback zu Memory")
self.use_redis = False
# In-Memory Implementierung
if key not in self.memory_store:
self.memory_store[key] = {'requests': [], 'last_cleanup': time.time()}
self.memory_store[key]['requests'].append(current_time)
def is_allowed(self, limit_type: str) -> tuple[bool, Dict]:
"""
Prüft ob eine Anfrage erlaubt ist
Returns:
(is_allowed, info_dict)
"""
if limit_type not in RATE_LIMITS:
return True, {}
rate_limit = RATE_LIMITS[limit_type]
client_id = self._get_client_id()
key = self._get_key(limit_type, client_id)
current_time = int(time.time())
window_start = current_time - rate_limit.per
# Aktuelle Anfragen zählen
current_requests = self._get_current_requests(key, window_start)
# Limite prüfen
if current_requests >= rate_limit.requests:
logger.warning(f"🚨 Rate limit exceeded: {limit_type} für Client {client_id[:8]}...")
return False, {
'limit': rate_limit.requests,
'remaining': 0,
'reset_time': current_time + rate_limit.per,
'message': rate_limit.message
}
# Anfrage hinzufügen
self._add_request(key, current_time, rate_limit.per)
return True, {
'limit': rate_limit.requests,
'remaining': rate_limit.requests - current_requests - 1,
'reset_time': current_time + rate_limit.per
}
def cleanup_memory(self):
"""Bereinigt alte In-Memory-Einträge"""
if self.use_redis:
return
current_time = time.time()
keys_to_delete = []
for key, data in self.memory_store.items():
# Bereinige alle Einträge älter als 24 Stunden
if current_time - data.get('last_cleanup', 0) > 86400:
keys_to_delete.append(key)
for key in keys_to_delete:
del self.memory_store[key]
# Globale Rate-Limiter-Instanz
rate_limiter = RateLimiter()
def limit_requests(limit_type: str):
"""
Decorator für Rate-Limiting von API-Endpunkten
Args:
limit_type: Art des Limits (siehe RATE_LIMITS)
"""
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
# Rate-Limiting prüfen
is_allowed, info = rate_limiter.is_allowed(limit_type)
if not is_allowed:
response = jsonify({
'error': 'Rate limit exceeded',
'message': info['message'],
'retry_after': info['reset_time'] - int(time.time())
})
response.status_code = 429
response.headers['Retry-After'] = str(info['reset_time'] - int(time.time()))
response.headers['X-RateLimit-Limit'] = str(info['limit'])
response.headers['X-RateLimit-Remaining'] = str(info['remaining'])
response.headers['X-RateLimit-Reset'] = str(info['reset_time'])
return response
# Rate-Limiting-Headers zu Response hinzufügen
response = f(*args, **kwargs)
if hasattr(response, 'headers'):
response.headers['X-RateLimit-Limit'] = str(info['limit'])
response.headers['X-RateLimit-Remaining'] = str(info['remaining'])
response.headers['X-RateLimit-Reset'] = str(info['reset_time'])
return response
return wrapper
return decorator
def get_client_info() -> Dict:
"""
Gibt Client-Informationen für Rate-Limiting zurück
"""
client_id = rate_limiter._get_client_id()
ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.remote_addr)
return {
'client_id': client_id,
'ip_address': ip,
'user_agent': request.headers.get('User-Agent', ''),
'timestamp': int(time.time())
}
# Maintenance-Task für Memory-Cleanup
def cleanup_rate_limiter():
"""Periodische Bereinigung des Rate-Limiters"""
rate_limiter.cleanup_memory()
logger.debug("🧹 Rate-Limiter Memory bereinigt")