220 lines
7.5 KiB
Python

"""
Sicherheitsmodule und Middleware für die MYP Flask-Anwendung.
Implementiert CSRF-Schutz, Content Security Policy und weitere Sicherheitsmaßnahmen.
"""
from flask import request, jsonify, current_app
from flask_talisman import Talisman
from functools import wraps
import time
import hashlib
import hmac
from collections import defaultdict, deque
from datetime import datetime, timedelta
class SecurityMiddleware:
"""Zentrale Sicherheits-Middleware für die Anwendung."""
def __init__(self, app=None):
self.app = app
self.rate_limits = defaultdict(lambda: deque())
self.failed_attempts = defaultdict(int)
self.blocked_ips = set()
if app is not None:
self.init_app(app)
def init_app(self, app):
"""Initialisiert die Sicherheits-Middleware mit der Flask-App."""
self.app = app
# Talisman für Content Security Policy und HTTPS-Enforcement
if not app.debug:
Talisman(
app,
force_https=False, # In Produktion auf True setzen, wenn HTTPS verfügbar
strict_transport_security=True,
content_security_policy={
'default-src': "'self'",
'script-src': "'self' 'unsafe-inline'",
'style-src': "'self' 'unsafe-inline'",
'img-src': "'self' data:",
'font-src': "'self'",
'connect-src': "'self'",
'form-action': "'self'"
}
)
# Request-Hooks registrieren
app.before_request(self.before_request_security_check)
app.after_request(self.after_request_security_headers)
def before_request_security_check(self):
"""Sicherheitsüberprüfungen vor jeder Anfrage."""
client_ip = self.get_client_ip()
# Blocked IPs prüfen
if client_ip in self.blocked_ips:
current_app.logger.warning(f"Blockierte IP-Adresse versucht Zugriff: {client_ip}")
return jsonify({'message': 'Zugriff verweigert'}), 403
# Rate Limiting
if self.is_rate_limited(client_ip):
current_app.logger.warning(f"Rate Limit überschritten für IP: {client_ip}")
return jsonify({'message': 'Zu viele Anfragen'}), 429
# Content-Length prüfen (Schutz vor großen Payloads)
if request.content_length and request.content_length > 10 * 1024 * 1024: # 10MB
current_app.logger.warning(f"Payload zu groß von IP: {client_ip}")
return jsonify({'message': 'Payload zu groß'}), 413
def after_request_security_headers(self, response):
"""Fügt Sicherheits-Header zu jeder Antwort hinzu."""
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'DENY'
response.headers['X-XSS-Protection'] = '1; mode=block'
response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'
# Cache-Control für statische Ressourcen
if request.endpoint and 'static' in request.endpoint:
response.headers['Cache-Control'] = 'public, max-age=3600'
else:
response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
response.headers['Pragma'] = 'no-cache'
response.headers['Expires'] = '0'
return response
def get_client_ip(self):
"""Ermittelt die Client-IP-Adresse."""
if request.headers.get('X-Forwarded-For'):
return request.headers.get('X-Forwarded-For').split(',')[0].strip()
elif request.headers.get('X-Real-IP'):
return request.headers.get('X-Real-IP')
else:
return request.remote_addr
def is_rate_limited(self, ip, max_requests=100, window_minutes=15):
"""
Überprüft Rate Limiting für eine IP-Adresse.
Args:
ip: Client-IP-Adresse
max_requests: Maximale Anzahl Requests pro Zeitfenster
window_minutes: Zeitfenster in Minuten
Returns:
bool: True wenn Rate Limit überschritten
"""
now = datetime.now()
window_start = now - timedelta(minutes=window_minutes)
# Alte Einträge entfernen
while self.rate_limits[ip] and self.rate_limits[ip][0] < window_start:
self.rate_limits[ip].popleft()
# Neue Anfrage hinzufügen
self.rate_limits[ip].append(now)
# Rate Limit prüfen
if len(self.rate_limits[ip]) > max_requests:
return True
return False
def record_failed_login(self, ip):
"""
Zeichnet fehlgeschlagene Login-Versuche auf.
Args:
ip: Client-IP-Adresse
"""
self.failed_attempts[ip] += 1
# Nach 5 fehlgeschlagenen Versuchen temporär blockieren
if self.failed_attempts[ip] >= 5:
self.blocked_ips.add(ip)
current_app.logger.warning(f"IP-Adresse blockiert nach zu vielen fehlgeschlagenen Login-Versuchen: {ip}")
# Automatisches Entsperren nach 1 Stunde
def unblock_ip():
time.sleep(3600) # 1 Stunde
if ip in self.blocked_ips:
self.blocked_ips.remove(ip)
self.failed_attempts[ip] = 0
current_app.logger.info(f"IP-Adresse automatisch entsperrt: {ip}")
import threading
threading.Thread(target=unblock_ip, daemon=True).start()
def clear_failed_attempts(self, ip):
"""
Löscht fehlgeschlagene Login-Versuche für eine IP.
Args:
ip: Client-IP-Adresse
"""
if ip in self.failed_attempts:
self.failed_attempts[ip] = 0
def require_api_key(f):
"""
Decorator für API-Endpunkte, die einen API-Key erfordern.
Args:
f: Zu schützende Funktion
Returns:
Geschützte Funktion
"""
@wraps(f)
def decorated(*args, **kwargs):
api_key = request.headers.get('X-API-Key')
expected_key = current_app.config.get('API_KEY')
if not expected_key:
# Kein API-Key konfiguriert, Zugriff erlauben
return f(*args, **kwargs)
if not api_key:
return jsonify({'message': 'API-Key erforderlich'}), 401
# Sichere Vergleichsfunktion verwenden
if not hmac.compare_digest(api_key, expected_key):
current_app.logger.warning(f"Ungültiger API-Key von IP: {request.remote_addr}")
return jsonify({'message': 'Ungültiger API-Key'}), 401
return f(*args, **kwargs)
return decorated
def validate_csrf_token():
"""
Validiert CSRF-Token für POST/PUT/DELETE-Requests.
Returns:
bool: True wenn Token gültig ist
"""
if request.method in ['GET', 'HEAD', 'OPTIONS']:
return True
token = request.headers.get('X-CSRF-Token') or request.form.get('csrf_token')
session_token = request.cookies.get('csrf_token')
if not token or not session_token:
return False
return hmac.compare_digest(token, session_token)
def generate_csrf_token():
"""
Generiert ein neues CSRF-Token.
Returns:
str: CSRF-Token
"""
import secrets
return secrets.token_hex(32)
# Globale Sicherheits-Middleware-Instanz
security_middleware = SecurityMiddleware()