244 lines
9.3 KiB
Python
244 lines
9.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Rate Limiting System für MYP Platform
|
|
Schutz vor API-Missbrauch und DDoS-Attacken
|
|
"""
|
|
|
|
import time
|
|
import redis
|
|
import hashlib
|
|
from functools import wraps
|
|
from flask import request, jsonify, g
|
|
from typing import Dict, Optional
|
|
from dataclasses import dataclass
|
|
from utils.logging_config import get_logger
|
|
|
|
logger = get_logger("security")
|
|
|
|
@dataclass
|
|
class RateLimit:
|
|
"""Konfiguration für Rate-Limiting-Regeln"""
|
|
requests: int # Anzahl erlaubter Anfragen
|
|
per: int # Zeitraum in Sekunden
|
|
message: str # Fehlermeldung bei Überschreitung
|
|
|
|
# Rate-Limiting-Konfiguration
|
|
RATE_LIMITS = {
|
|
# API-Endpunkte
|
|
'api_general': RateLimit(100, 300, "Zu viele API-Anfragen. Versuchen Sie es in 5 Minuten erneut."),
|
|
'api_auth': RateLimit(10, 300, "Zu viele Anmeldeversuche. Versuchen Sie es in 5 Minuten erneut."),
|
|
'api_upload': RateLimit(20, 3600, "Zu viele Upload-Anfragen. Versuchen Sie es in einer Stunde erneut."),
|
|
'api_admin': RateLimit(200, 300, "Zu viele Admin-Anfragen. Versuchen Sie es in 5 Minuten erneut."),
|
|
|
|
# Spezielle Endpunkte
|
|
'printer_status': RateLimit(300, 300, "Zu viele Drucker-Status-Anfragen."),
|
|
'job_creation': RateLimit(50, 3600, "Zu viele Job-Erstellungen. Versuchen Sie es in einer Stunde erneut."),
|
|
|
|
# Drucker-Monitor Rate-Limits (gelockert für Live-Updates)
|
|
'printer_monitor_live': RateLimit(30, 60, "Zu viele Live-Status-Anfragen. Versuchen Sie es in einer Minute erneut."),
|
|
'printer_monitor_summary': RateLimit(60, 60, "Zu viele Zusammenfassungs-Anfragen. Versuchen Sie es in einer Minute erneut."),
|
|
'printer_monitor_cache': RateLimit(10, 120, "Zu viele Cache-Lösch-Anfragen. Versuchen Sie es in 2 Minuten erneut."),
|
|
'printer_monitor_init': RateLimit(5, 300, "Zu viele Initialisierungs-Anfragen. Versuchen Sie es in 5 Minuten erneut."),
|
|
|
|
# Sicherheitskritische Endpunkte
|
|
'password_reset': RateLimit(3, 3600, "Zu viele Passwort-Reset-Anfragen. Versuchen Sie es in einer Stunde erneut."),
|
|
'user_creation': RateLimit(10, 3600, "Zu viele Benutzer-Erstellungen.")
|
|
}
|
|
|
|
class RateLimiter:
|
|
"""
|
|
In-Memory Rate Limiter mit optionaler Redis-Unterstützung
|
|
"""
|
|
|
|
def __init__(self, use_redis: bool = False, redis_url: str = None):
|
|
self.use_redis = use_redis
|
|
self.redis_client = None
|
|
self.memory_store: Dict[str, Dict] = {}
|
|
|
|
if use_redis and redis_url:
|
|
try:
|
|
import redis
|
|
self.redis_client = redis.from_url(redis_url, decode_responses=True)
|
|
logger.info("✅ Redis-basiertes Rate Limiting aktiviert")
|
|
except ImportError:
|
|
logger.warning("⚠️ Redis nicht verfügbar, verwende In-Memory Rate Limiting")
|
|
self.use_redis = False
|
|
except Exception as e:
|
|
logger.error(f"❌ Redis-Verbindung fehlgeschlagen: {e}")
|
|
self.use_redis = False
|
|
|
|
def _get_client_id(self) -> str:
|
|
"""
|
|
Generiert eine eindeutige Client-ID basierend auf IP und User-Agent
|
|
"""
|
|
ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.remote_addr)
|
|
user_agent = request.headers.get('User-Agent', '')
|
|
|
|
# Hash für Anonymisierung
|
|
client_string = f"{ip}:{user_agent}"
|
|
return hashlib.sha256(client_string.encode()).hexdigest()[:16]
|
|
|
|
def _get_key(self, limit_type: str, client_id: str) -> str:
|
|
"""Erstellt Redis/Memory-Key für Rate-Limiting"""
|
|
return f"rate_limit:{limit_type}:{client_id}"
|
|
|
|
def _get_current_requests(self, key: str, window_start: int) -> int:
|
|
"""Holt aktuelle Anfragen-Anzahl"""
|
|
if self.use_redis and self.redis_client:
|
|
try:
|
|
# Redis-basierte Implementierung
|
|
pipe = self.redis_client.pipeline()
|
|
pipe.zremrangebyscore(key, 0, window_start)
|
|
pipe.zcard(key)
|
|
_, count = pipe.execute()
|
|
return count
|
|
except Exception as e:
|
|
logger.error(f"Redis-Fehler: {e}, fallback zu Memory")
|
|
self.use_redis = False
|
|
|
|
# In-Memory Implementierung
|
|
if key not in self.memory_store:
|
|
self.memory_store[key] = {'requests': [], 'last_cleanup': time.time()}
|
|
|
|
# Alte Einträge bereinigen
|
|
current_time = time.time()
|
|
data = self.memory_store[key]
|
|
data['requests'] = [req_time for req_time in data['requests'] if req_time > window_start]
|
|
|
|
return len(data['requests'])
|
|
|
|
def _add_request(self, key: str, current_time: int, expire_time: int):
|
|
"""Fügt neue Anfrage hinzu"""
|
|
if self.use_redis and self.redis_client:
|
|
try:
|
|
pipe = self.redis_client.pipeline()
|
|
pipe.zadd(key, {str(current_time): current_time})
|
|
pipe.expire(key, expire_time)
|
|
pipe.execute()
|
|
return
|
|
except Exception as e:
|
|
logger.error(f"Redis-Fehler: {e}, fallback zu Memory")
|
|
self.use_redis = False
|
|
|
|
# In-Memory Implementierung
|
|
if key not in self.memory_store:
|
|
self.memory_store[key] = {'requests': [], 'last_cleanup': time.time()}
|
|
|
|
self.memory_store[key]['requests'].append(current_time)
|
|
|
|
def is_allowed(self, limit_type: str) -> tuple[bool, Dict]:
|
|
"""
|
|
Prüft ob eine Anfrage erlaubt ist
|
|
|
|
Returns:
|
|
(is_allowed, info_dict)
|
|
"""
|
|
if limit_type not in RATE_LIMITS:
|
|
return True, {}
|
|
|
|
rate_limit = RATE_LIMITS[limit_type]
|
|
client_id = self._get_client_id()
|
|
key = self._get_key(limit_type, client_id)
|
|
|
|
current_time = int(time.time())
|
|
window_start = current_time - rate_limit.per
|
|
|
|
# Aktuelle Anfragen zählen
|
|
current_requests = self._get_current_requests(key, window_start)
|
|
|
|
# Limite prüfen
|
|
if current_requests >= rate_limit.requests:
|
|
logger.warning(f"🚨 Rate limit exceeded: {limit_type} für Client {client_id[:8]}...")
|
|
return False, {
|
|
'limit': rate_limit.requests,
|
|
'remaining': 0,
|
|
'reset_time': current_time + rate_limit.per,
|
|
'message': rate_limit.message
|
|
}
|
|
|
|
# Anfrage hinzufügen
|
|
self._add_request(key, current_time, rate_limit.per)
|
|
|
|
return True, {
|
|
'limit': rate_limit.requests,
|
|
'remaining': rate_limit.requests - current_requests - 1,
|
|
'reset_time': current_time + rate_limit.per
|
|
}
|
|
|
|
def cleanup_memory(self):
|
|
"""Bereinigt alte In-Memory-Einträge"""
|
|
if self.use_redis:
|
|
return
|
|
|
|
current_time = time.time()
|
|
keys_to_delete = []
|
|
|
|
for key, data in self.memory_store.items():
|
|
# Bereinige alle Einträge älter als 24 Stunden
|
|
if current_time - data.get('last_cleanup', 0) > 86400:
|
|
keys_to_delete.append(key)
|
|
|
|
for key in keys_to_delete:
|
|
del self.memory_store[key]
|
|
|
|
# Globale Rate-Limiter-Instanz
|
|
rate_limiter = RateLimiter()
|
|
|
|
def limit_requests(limit_type: str):
|
|
"""
|
|
Decorator für Rate-Limiting von API-Endpunkten
|
|
|
|
Args:
|
|
limit_type: Art des Limits (siehe RATE_LIMITS)
|
|
"""
|
|
def decorator(f):
|
|
@wraps(f)
|
|
def wrapper(*args, **kwargs):
|
|
# Rate-Limiting prüfen
|
|
is_allowed, info = rate_limiter.is_allowed(limit_type)
|
|
|
|
if not is_allowed:
|
|
response = jsonify({
|
|
'error': 'Rate limit exceeded',
|
|
'message': info['message'],
|
|
'retry_after': info['reset_time'] - int(time.time())
|
|
})
|
|
response.status_code = 429
|
|
response.headers['Retry-After'] = str(info['reset_time'] - int(time.time()))
|
|
response.headers['X-RateLimit-Limit'] = str(info['limit'])
|
|
response.headers['X-RateLimit-Remaining'] = str(info['remaining'])
|
|
response.headers['X-RateLimit-Reset'] = str(info['reset_time'])
|
|
return response
|
|
|
|
# Rate-Limiting-Headers zu Response hinzufügen
|
|
response = f(*args, **kwargs)
|
|
|
|
if hasattr(response, 'headers'):
|
|
response.headers['X-RateLimit-Limit'] = str(info['limit'])
|
|
response.headers['X-RateLimit-Remaining'] = str(info['remaining'])
|
|
response.headers['X-RateLimit-Reset'] = str(info['reset_time'])
|
|
|
|
return response
|
|
|
|
return wrapper
|
|
return decorator
|
|
|
|
def get_client_info() -> Dict:
|
|
"""
|
|
Gibt Client-Informationen für Rate-Limiting zurück
|
|
"""
|
|
client_id = rate_limiter._get_client_id()
|
|
ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.remote_addr)
|
|
|
|
return {
|
|
'client_id': client_id,
|
|
'ip_address': ip,
|
|
'user_agent': request.headers.get('User-Agent', ''),
|
|
'timestamp': int(time.time())
|
|
}
|
|
|
|
# Maintenance-Task für Memory-Cleanup
|
|
def cleanup_rate_limiter():
|
|
"""Periodische Bereinigung des Rate-Limiters"""
|
|
rate_limiter.cleanup_memory()
|
|
logger.debug("🧹 Rate-Limiter Memory bereinigt") |