""" Sicherheitsmodule und Middleware für die MYP Flask-Anwendung. Implementiert CSRF-Schutz, Content Security Policy und weitere Sicherheitsmaßnahmen. """ from flask import request, jsonify, current_app from flask_talisman import Talisman from functools import wraps import time import hashlib import hmac from collections import defaultdict, deque from datetime import datetime, timedelta class SecurityMiddleware: """Zentrale Sicherheits-Middleware für die Anwendung.""" def __init__(self, app=None): self.app = app self.rate_limits = defaultdict(lambda: deque()) self.failed_attempts = defaultdict(int) self.blocked_ips = set() if app is not None: self.init_app(app) def init_app(self, app): """Initialisiert die Sicherheits-Middleware mit der Flask-App.""" self.app = app # Talisman für Content Security Policy und HTTPS-Enforcement if not app.debug: Talisman( app, force_https=False, # In Produktion auf True setzen, wenn HTTPS verfügbar strict_transport_security=True, content_security_policy={ 'default-src': "'self'", 'script-src': "'self' 'unsafe-inline'", 'style-src': "'self' 'unsafe-inline'", 'img-src': "'self' data:", 'font-src': "'self'", 'connect-src': "'self'", 'form-action': "'self'" } ) # Request-Hooks registrieren app.before_request(self.before_request_security_check) app.after_request(self.after_request_security_headers) def before_request_security_check(self): """Sicherheitsüberprüfungen vor jeder Anfrage.""" client_ip = self.get_client_ip() # Blocked IPs prüfen if client_ip in self.blocked_ips: current_app.logger.warning(f"Blockierte IP-Adresse versucht Zugriff: {client_ip}") return jsonify({'message': 'Zugriff verweigert'}), 403 # Rate Limiting if self.is_rate_limited(client_ip): current_app.logger.warning(f"Rate Limit überschritten für IP: {client_ip}") return jsonify({'message': 'Zu viele Anfragen'}), 429 # Content-Length prüfen (Schutz vor großen Payloads) if request.content_length and request.content_length > 10 * 1024 * 1024: # 10MB current_app.logger.warning(f"Payload zu groß von IP: {client_ip}") return jsonify({'message': 'Payload zu groß'}), 413 def after_request_security_headers(self, response): """Fügt Sicherheits-Header zu jeder Antwort hinzu.""" response.headers['X-Content-Type-Options'] = 'nosniff' response.headers['X-Frame-Options'] = 'DENY' response.headers['X-XSS-Protection'] = '1; mode=block' response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin' # Cache-Control für statische Ressourcen if request.endpoint and 'static' in request.endpoint: response.headers['Cache-Control'] = 'public, max-age=3600' else: response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate' response.headers['Pragma'] = 'no-cache' response.headers['Expires'] = '0' return response def get_client_ip(self): """Ermittelt die Client-IP-Adresse.""" if request.headers.get('X-Forwarded-For'): return request.headers.get('X-Forwarded-For').split(',')[0].strip() elif request.headers.get('X-Real-IP'): return request.headers.get('X-Real-IP') else: return request.remote_addr def is_rate_limited(self, ip, max_requests=100, window_minutes=15): """ Überprüft Rate Limiting für eine IP-Adresse. Args: ip: Client-IP-Adresse max_requests: Maximale Anzahl Requests pro Zeitfenster window_minutes: Zeitfenster in Minuten Returns: bool: True wenn Rate Limit überschritten """ now = datetime.now() window_start = now - timedelta(minutes=window_minutes) # Alte Einträge entfernen while self.rate_limits[ip] and self.rate_limits[ip][0] < window_start: self.rate_limits[ip].popleft() # Neue Anfrage hinzufügen self.rate_limits[ip].append(now) # Rate Limit prüfen if len(self.rate_limits[ip]) > max_requests: return True return False def record_failed_login(self, ip): """ Zeichnet fehlgeschlagene Login-Versuche auf. Args: ip: Client-IP-Adresse """ self.failed_attempts[ip] += 1 # Nach 5 fehlgeschlagenen Versuchen temporär blockieren if self.failed_attempts[ip] >= 5: self.blocked_ips.add(ip) current_app.logger.warning(f"IP-Adresse blockiert nach zu vielen fehlgeschlagenen Login-Versuchen: {ip}") # Automatisches Entsperren nach 1 Stunde def unblock_ip(): time.sleep(3600) # 1 Stunde if ip in self.blocked_ips: self.blocked_ips.remove(ip) self.failed_attempts[ip] = 0 current_app.logger.info(f"IP-Adresse automatisch entsperrt: {ip}") import threading threading.Thread(target=unblock_ip, daemon=True).start() def clear_failed_attempts(self, ip): """ Löscht fehlgeschlagene Login-Versuche für eine IP. Args: ip: Client-IP-Adresse """ if ip in self.failed_attempts: self.failed_attempts[ip] = 0 def require_api_key(f): """ Decorator für API-Endpunkte, die einen API-Key erfordern. Args: f: Zu schützende Funktion Returns: Geschützte Funktion """ @wraps(f) def decorated(*args, **kwargs): api_key = request.headers.get('X-API-Key') expected_key = current_app.config.get('API_KEY') if not expected_key: # Kein API-Key konfiguriert, Zugriff erlauben return f(*args, **kwargs) if not api_key: return jsonify({'message': 'API-Key erforderlich'}), 401 # Sichere Vergleichsfunktion verwenden if not hmac.compare_digest(api_key, expected_key): current_app.logger.warning(f"Ungültiger API-Key von IP: {request.remote_addr}") return jsonify({'message': 'Ungültiger API-Key'}), 401 return f(*args, **kwargs) return decorated def validate_csrf_token(): """ Validiert CSRF-Token für POST/PUT/DELETE-Requests. Returns: bool: True wenn Token gültig ist """ if request.method in ['GET', 'HEAD', 'OPTIONS']: return True token = request.headers.get('X-CSRF-Token') or request.form.get('csrf_token') session_token = request.cookies.get('csrf_token') if not token or not session_token: return False return hmac.compare_digest(token, session_token) def generate_csrf_token(): """ Generiert ein neues CSRF-Token. Returns: str: CSRF-Token """ import secrets return secrets.token_hex(32) # Globale Sicherheits-Middleware-Instanz security_middleware = SecurityMiddleware()