"feat: Integrate rate limiter

This commit is contained in:
2025-05-29 15:38:32 +02:00
parent d6a583d115
commit 9c89ad1397
5 changed files with 644 additions and 51 deletions

View File

@@ -0,0 +1,238 @@
#!/usr/bin/env python3
"""
Rate Limiting System für MYP Platform
Schutz vor API-Missbrauch und DDoS-Attacken
"""
import time
import redis
import hashlib
from functools import wraps
from flask import request, jsonify, g
from typing import Dict, Optional
from dataclasses import dataclass
from utils.logging_config import get_logger
logger = get_logger("security")
@dataclass
class RateLimit:
"""Konfiguration für Rate-Limiting-Regeln"""
requests: int # Anzahl erlaubter Anfragen
per: int # Zeitraum in Sekunden
message: str # Fehlermeldung bei Überschreitung
# Rate-Limiting-Konfiguration
RATE_LIMITS = {
# API-Endpunkte
'api_general': RateLimit(100, 300, "Zu viele API-Anfragen. Versuchen Sie es in 5 Minuten erneut."),
'api_auth': RateLimit(10, 300, "Zu viele Anmeldeversuche. Versuchen Sie es in 5 Minuten erneut."),
'api_upload': RateLimit(20, 3600, "Zu viele Upload-Anfragen. Versuchen Sie es in einer Stunde erneut."),
'api_admin': RateLimit(200, 300, "Zu viele Admin-Anfragen. Versuchen Sie es in 5 Minuten erneut."),
# Spezielle Endpunkte
'printer_status': RateLimit(300, 300, "Zu viele Drucker-Status-Anfragen."),
'job_creation': RateLimit(50, 3600, "Zu viele Job-Erstellungen. Versuchen Sie es in einer Stunde erneut."),
# Sicherheitskritische Endpunkte
'password_reset': RateLimit(3, 3600, "Zu viele Passwort-Reset-Anfragen. Versuchen Sie es in einer Stunde erneut."),
'user_creation': RateLimit(10, 3600, "Zu viele Benutzer-Erstellungen.")
}
class RateLimiter:
"""
In-Memory Rate Limiter mit optionaler Redis-Unterstützung
"""
def __init__(self, use_redis: bool = False, redis_url: str = None):
self.use_redis = use_redis
self.redis_client = None
self.memory_store: Dict[str, Dict] = {}
if use_redis and redis_url:
try:
import redis
self.redis_client = redis.from_url(redis_url, decode_responses=True)
logger.info("✅ Redis-basiertes Rate Limiting aktiviert")
except ImportError:
logger.warning("⚠️ Redis nicht verfügbar, verwende In-Memory Rate Limiting")
self.use_redis = False
except Exception as e:
logger.error(f"❌ Redis-Verbindung fehlgeschlagen: {e}")
self.use_redis = False
def _get_client_id(self) -> str:
"""
Generiert eine eindeutige Client-ID basierend auf IP und User-Agent
"""
ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.remote_addr)
user_agent = request.headers.get('User-Agent', '')
# Hash für Anonymisierung
client_string = f"{ip}:{user_agent}"
return hashlib.sha256(client_string.encode()).hexdigest()[:16]
def _get_key(self, limit_type: str, client_id: str) -> str:
"""Erstellt Redis/Memory-Key für Rate-Limiting"""
return f"rate_limit:{limit_type}:{client_id}"
def _get_current_requests(self, key: str, window_start: int) -> int:
"""Holt aktuelle Anfragen-Anzahl"""
if self.use_redis and self.redis_client:
try:
# Redis-basierte Implementierung
pipe = self.redis_client.pipeline()
pipe.zremrangebyscore(key, 0, window_start)
pipe.zcard(key)
_, count = pipe.execute()
return count
except Exception as e:
logger.error(f"Redis-Fehler: {e}, fallback zu Memory")
self.use_redis = False
# In-Memory Implementierung
if key not in self.memory_store:
self.memory_store[key] = {'requests': [], 'last_cleanup': time.time()}
# Alte Einträge bereinigen
current_time = time.time()
data = self.memory_store[key]
data['requests'] = [req_time for req_time in data['requests'] if req_time > window_start]
return len(data['requests'])
def _add_request(self, key: str, current_time: int, expire_time: int):
"""Fügt neue Anfrage hinzu"""
if self.use_redis and self.redis_client:
try:
pipe = self.redis_client.pipeline()
pipe.zadd(key, {str(current_time): current_time})
pipe.expire(key, expire_time)
pipe.execute()
return
except Exception as e:
logger.error(f"Redis-Fehler: {e}, fallback zu Memory")
self.use_redis = False
# In-Memory Implementierung
if key not in self.memory_store:
self.memory_store[key] = {'requests': [], 'last_cleanup': time.time()}
self.memory_store[key]['requests'].append(current_time)
def is_allowed(self, limit_type: str) -> tuple[bool, Dict]:
"""
Prüft ob eine Anfrage erlaubt ist
Returns:
(is_allowed, info_dict)
"""
if limit_type not in RATE_LIMITS:
return True, {}
rate_limit = RATE_LIMITS[limit_type]
client_id = self._get_client_id()
key = self._get_key(limit_type, client_id)
current_time = int(time.time())
window_start = current_time - rate_limit.per
# Aktuelle Anfragen zählen
current_requests = self._get_current_requests(key, window_start)
# Limite prüfen
if current_requests >= rate_limit.requests:
logger.warning(f"🚨 Rate limit exceeded: {limit_type} für Client {client_id[:8]}...")
return False, {
'limit': rate_limit.requests,
'remaining': 0,
'reset_time': current_time + rate_limit.per,
'message': rate_limit.message
}
# Anfrage hinzufügen
self._add_request(key, current_time, rate_limit.per)
return True, {
'limit': rate_limit.requests,
'remaining': rate_limit.requests - current_requests - 1,
'reset_time': current_time + rate_limit.per
}
def cleanup_memory(self):
"""Bereinigt alte In-Memory-Einträge"""
if self.use_redis:
return
current_time = time.time()
keys_to_delete = []
for key, data in self.memory_store.items():
# Bereinige alle Einträge älter als 24 Stunden
if current_time - data.get('last_cleanup', 0) > 86400:
keys_to_delete.append(key)
for key in keys_to_delete:
del self.memory_store[key]
# Globale Rate-Limiter-Instanz
rate_limiter = RateLimiter()
def limit_requests(limit_type: str):
"""
Decorator für Rate-Limiting von API-Endpunkten
Args:
limit_type: Art des Limits (siehe RATE_LIMITS)
"""
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
# Rate-Limiting prüfen
is_allowed, info = rate_limiter.is_allowed(limit_type)
if not is_allowed:
response = jsonify({
'error': 'Rate limit exceeded',
'message': info['message'],
'retry_after': info['reset_time'] - int(time.time())
})
response.status_code = 429
response.headers['Retry-After'] = str(info['reset_time'] - int(time.time()))
response.headers['X-RateLimit-Limit'] = str(info['limit'])
response.headers['X-RateLimit-Remaining'] = str(info['remaining'])
response.headers['X-RateLimit-Reset'] = str(info['reset_time'])
return response
# Rate-Limiting-Headers zu Response hinzufügen
response = f(*args, **kwargs)
if hasattr(response, 'headers'):
response.headers['X-RateLimit-Limit'] = str(info['limit'])
response.headers['X-RateLimit-Remaining'] = str(info['remaining'])
response.headers['X-RateLimit-Reset'] = str(info['reset_time'])
return response
return wrapper
return decorator
def get_client_info() -> Dict:
"""
Gibt Client-Informationen für Rate-Limiting zurück
"""
client_id = rate_limiter._get_client_id()
ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.remote_addr)
return {
'client_id': client_id,
'ip_address': ip,
'user_agent': request.headers.get('User-Agent', ''),
'timestamp': int(time.time())
}
# Maintenance-Task für Memory-Cleanup
def cleanup_rate_limiter():
"""Periodische Bereinigung des Rate-Limiters"""
rate_limiter.cleanup_memory()
logger.debug("🧹 Rate-Limiter Memory bereinigt")

View File

@@ -0,0 +1,328 @@
#!/usr/bin/env python3
"""
Security Utilities für MYP Platform
Content Security Policy (CSP), Security Headers und weitere Sicherheitsmaßnahmen
"""
import secrets
import hashlib
from flask import request, g, session
from functools import wraps
from typing import Dict, List, Optional
from utils.logging_config import get_logger
logger = get_logger("security")
# Content Security Policy Konfiguration
CSP_POLICY = {
'default-src': ["'self'"],
'script-src': [
"'self'",
"'unsafe-inline'", # Temporär für Dark Mode Script
"https://cdn.jsdelivr.net", # Für externe Libraries
"https://unpkg.com" # Für Fallback-Libraries
],
'style-src': [
"'self'",
"'unsafe-inline'", # Für Tailwind und Dynamic Styles
"https://fonts.googleapis.com"
],
'img-src': [
"'self'",
"data:", # Für SVG Data URLs
"blob:", # Für dynamisch generierte Bilder
"https:" # HTTPS-Bilder erlauben
],
'font-src': [
"'self'",
"https://fonts.gstatic.com",
"data:" # Für eingebettete Fonts
],
'connect-src': [
"'self'",
"ws:", # WebSocket für lokale Entwicklung
"wss:" # Sichere WebSockets
],
'media-src': ["'self'"],
'object-src': ["'none'"], # Flash und andere Plugins blockieren
'base-uri': ["'self'"],
'form-action': ["'self'"],
'frame-ancestors': ["'none'"], # Clickjacking-Schutz
'upgrade-insecure-requests': True,
'block-all-mixed-content': True
}
# Security Headers Konfiguration
SECURITY_HEADERS = {
'X-Content-Type-Options': 'nosniff',
'X-Frame-Options': 'DENY',
'X-XSS-Protection': '1; mode=block',
'Referrer-Policy': 'strict-origin-when-cross-origin',
'Permissions-Policy': (
'geolocation=(), '
'microphone=(), '
'camera=(), '
'payment=(), '
'usb=(), '
'accelerometer=(), '
'gyroscope=(), '
'magnetometer=()'
),
'Cross-Origin-Embedder-Policy': 'require-corp',
'Cross-Origin-Opener-Policy': 'same-origin',
'Cross-Origin-Resource-Policy': 'same-origin'
}
class SecurityManager:
"""
Zentrale Sicherheitsverwaltung für MYP Platform
"""
def __init__(self):
self.nonce_store: Dict[str, str] = {}
def generate_nonce(self) -> str:
"""Generiert eine sichere Nonce für CSP"""
nonce = secrets.token_urlsafe(32)
# Nonce in Session speichern für Validierung
if 'security_nonces' not in session:
session['security_nonces'] = []
session['security_nonces'].append(nonce)
# Maximal 10 Nonces pro Session
if len(session['security_nonces']) > 10:
session['security_nonces'] = session['security_nonces'][-10:]
return nonce
def validate_nonce(self, nonce: str) -> bool:
"""Validiert eine Nonce"""
if 'security_nonces' not in session:
return False
return nonce in session['security_nonces']
def build_csp_header(self, nonce: Optional[str] = None) -> str:
"""
Erstellt den Content-Security-Policy Header
Args:
nonce: Optional CSP nonce für inline scripts
Returns:
CSP Header String
"""
csp_parts = []
for directive, values in CSP_POLICY.items():
if directive in ['upgrade-insecure-requests', 'block-all-mixed-content']:
if values:
csp_parts.append(directive.replace('_', '-'))
continue
if isinstance(values, list):
directive_values = values.copy()
# Nonce für script-src hinzufügen
if directive == 'script-src' and nonce:
directive_values.append(f"'nonce-{nonce}'")
csp_parts.append(f"{directive.replace('_', '-')} {' '.join(directive_values)}")
return "; ".join(csp_parts)
def get_client_fingerprint(self) -> str:
"""
Erstellt einen Client-Fingerprint für erweiterte Sicherheit
"""
components = [
request.environ.get('HTTP_X_FORWARDED_FOR', request.remote_addr),
request.headers.get('User-Agent', ''),
request.headers.get('Accept-Language', ''),
request.headers.get('Accept-Encoding', '')
]
fingerprint_string = '|'.join(components)
return hashlib.sha256(fingerprint_string.encode()).hexdigest()[:32]
def check_suspicious_activity(self) -> bool:
"""
Prüft auf verdächtige Aktivitäten
"""
# SQL Injection Patterns
sql_patterns = [
'union select', 'drop table', 'insert into', 'delete from',
'script>', '<iframe', 'javascript:', 'vbscript:',
'onload=', 'onerror=', 'onclick='
]
# Request-Daten prüfen
request_data = str(request.args) + str(request.form) + str(request.json or {})
request_data_lower = request_data.lower()
for pattern in sql_patterns:
if pattern in request_data_lower:
logger.warning(f"🚨 Verdächtige Aktivität erkannt: {pattern} von {request.remote_addr}")
return True
# Übermäßig große Requests
if len(request_data) > 50000: # 50KB Limit
logger.warning(f"🚨 Übermäßig große Anfrage von {request.remote_addr}: {len(request_data)} bytes")
return True
return False
def log_security_event(self, event_type: str, details: Dict):
"""
Protokolliert Sicherheitsereignisse
"""
security_data = {
'event_type': event_type,
'ip_address': request.environ.get('HTTP_X_FORWARDED_FOR', request.remote_addr),
'user_agent': request.headers.get('User-Agent', ''),
'timestamp': request.environ.get('REQUEST_START_TIME'),
'fingerprint': self.get_client_fingerprint(),
**details
}
logger.warning(f"🔒 Sicherheitsereignis: {event_type} - {security_data}")
# Globale Security Manager Instanz
security_manager = SecurityManager()
def apply_security_headers(response):
"""
Wendet Sicherheits-Headers auf Response an
"""
# Standard Security Headers
for header, value in SECURITY_HEADERS.items():
response.headers[header] = value
# Content Security Policy
nonce = getattr(g, 'csp_nonce', None)
csp_header = security_manager.build_csp_header(nonce)
response.headers['Content-Security-Policy'] = csp_header
# HSTS für HTTPS
if request.is_secure:
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains; preload'
return response
def security_check(check_suspicious: bool = True):
"""
Decorator für Sicherheitsprüfungen
Args:
check_suspicious: Ob auf verdächtige Aktivitäten geprüft werden soll
"""
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
# Verdächtige Aktivitäten prüfen
if check_suspicious and security_manager.check_suspicious_activity():
security_manager.log_security_event('suspicious_request', {
'endpoint': request.endpoint,
'method': request.method,
'args': dict(request.args),
'form': dict(request.form)
})
from flask import jsonify
return jsonify({
'error': 'Verdächtige Anfrage erkannt',
'message': 'Ihre Anfrage wurde aus Sicherheitsgründen blockiert.'
}), 400
return f(*args, **kwargs)
return wrapper
return decorator
def require_secure_headers(f):
"""
Decorator der sicherstellt, dass Security Headers gesetzt werden
"""
@wraps(f)
def wrapper(*args, **kwargs):
# CSP Nonce generieren
g.csp_nonce = security_manager.generate_nonce()
response = f(*args, **kwargs)
# Security Headers anwenden
if hasattr(response, 'headers'):
response = apply_security_headers(response)
return response
return wrapper
def get_csp_nonce() -> str:
"""
Holt die aktuelle CSP Nonce für Templates
"""
return getattr(g, 'csp_nonce', '')
def validate_origin():
"""
Validiert die Origin des Requests
"""
origin = request.headers.get('Origin')
referer = request.headers.get('Referer')
host = request.headers.get('Host')
# Für API-Requests Origin prüfen
if request.path.startswith('/api/') and origin:
allowed_origins = [
f"http://{host}",
f"https://{host}",
"http://localhost:5000",
"http://127.0.0.1:5000"
]
if origin not in allowed_origins:
logger.warning(f"🚨 Ungültige Origin: {origin} für {request.path}")
return False
return True
# Template Helper für CSP Nonce
def csp_nonce():
"""Template Helper für CSP Nonce"""
return get_csp_nonce()
# Security Middleware für Flask App
def init_security(app):
"""
Initialisiert Sicherheitsfeatures für Flask App
"""
@app.before_request
def before_request_security():
"""Security Checks vor jedem Request"""
# Origin validieren
if not validate_origin():
from flask import jsonify
return jsonify({
'error': 'Invalid origin',
'message': 'Anfrage von ungültiger Quelle'
}), 403
# CSP Nonce generieren
g.csp_nonce = security_manager.generate_nonce()
@app.after_request
def after_request_security(response):
"""Security Headers nach jedem Request anwenden"""
return apply_security_headers(response)
# Template Helper registrieren
app.jinja_env.globals['csp_nonce'] = csp_nonce
logger.info("🔒 Security System initialisiert")
return app