220 lines
7.5 KiB
Python
220 lines
7.5 KiB
Python
"""
|
|
Sicherheitsmodule und Middleware für die MYP Flask-Anwendung.
|
|
Implementiert CSRF-Schutz, Content Security Policy und weitere Sicherheitsmaßnahmen.
|
|
"""
|
|
|
|
from flask import request, jsonify, current_app
|
|
from flask_talisman import Talisman
|
|
from functools import wraps
|
|
import time
|
|
import hashlib
|
|
import hmac
|
|
from collections import defaultdict, deque
|
|
from datetime import datetime, timedelta
|
|
|
|
class SecurityMiddleware:
    """Central security middleware for the application.

    Keeps three pieces of in-memory, per-process state:

    * ``rate_limits``     -- per-IP deque of request timestamps
    * ``failed_attempts`` -- per-IP counter of failed logins
    * ``blocked_ips``     -- set of IPs that are currently rejected with 403
    """

    # Failed logins after which an IP is blocked temporarily.
    MAX_FAILED_LOGINS = 5
    # How long a temporary block lasts before it is lifted automatically.
    BLOCK_DURATION_SECONDS = 3600
    # Largest declared request body that is accepted (10 MB).
    MAX_CONTENT_LENGTH = 10 * 1024 * 1024

    def __init__(self, app=None):
        """Create the middleware and, if ``app`` is given, attach it right away.

        Args:
            app: Optional Flask application; when omitted, call ``init_app``
                later.
        """
        self.app = app
        self.rate_limits = defaultdict(deque)
        self.failed_attempts = defaultdict(int)
        self.blocked_ips = set()

        if app is not None:
            self.init_app(app)

    def init_app(self, app):
        """Initialise the security middleware with the Flask app.

        Installs Talisman (only when the app is not in debug mode) and
        registers the before/after request hooks.

        Args:
            app: The Flask application to protect.
        """
        self.app = app

        # Talisman for Content Security Policy and HTTPS enforcement
        if not app.debug:
            Talisman(
                app,
                force_https=False,  # Set to True in production once HTTPS is available
                strict_transport_security=True,
                content_security_policy={
                    'default-src': "'self'",
                    'script-src': "'self' 'unsafe-inline'",
                    'style-src': "'self' 'unsafe-inline'",
                    'img-src': "'self' data:",
                    'font-src': "'self'",
                    'connect-src': "'self'",
                    'form-action': "'self'"
                }
            )

        # Register request hooks
        app.before_request(self.before_request_security_check)
        app.after_request(self.after_request_security_headers)

    def before_request_security_check(self):
        """Run the security checks that precede every request.

        Returns:
            A ``(json_response, status)`` tuple when the request is rejected
            (403 blocked IP, 429 rate limit, 413 payload too large), otherwise
            ``None`` so that request handling continues.
        """
        client_ip = self.get_client_ip()

        # Blocked IPs are rejected before anything else is evaluated.
        if client_ip in self.blocked_ips:
            current_app.logger.warning(f"Blockierte IP-Adresse versucht Zugriff: {client_ip}")
            return jsonify({'message': 'Zugriff verweigert'}), 403

        # Rate limiting (this call also records the current request).
        if self.is_rate_limited(client_ip):
            current_app.logger.warning(f"Rate Limit überschritten für IP: {client_ip}")
            return jsonify({'message': 'Zu viele Anfragen'}), 429

        # Check the declared Content-Length (protection against huge payloads).
        declared_length = request.content_length
        if declared_length and declared_length > self.MAX_CONTENT_LENGTH:
            current_app.logger.warning(f"Payload zu groß von IP: {client_ip}")
            return jsonify({'message': 'Payload zu groß'}), 413

        return None

    def after_request_security_headers(self, response):
        """Add security headers to every response.

        Args:
            response: The outgoing response object.

        Returns:
            The same response object with the additional headers set.
        """
        headers = response.headers
        headers['X-Content-Type-Options'] = 'nosniff'
        headers['X-Frame-Options'] = 'DENY'
        headers['X-XSS-Protection'] = '1; mode=block'
        headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'

        # Static resources may be cached; everything else must not be.
        if request.endpoint and 'static' in request.endpoint:
            headers['Cache-Control'] = 'public, max-age=3600'
        else:
            headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
            headers['Pragma'] = 'no-cache'
            headers['Expires'] = '0'

        return response

    def get_client_ip(self):
        """Determine the client IP address for the current request.

        Preference order: first entry of ``X-Forwarded-For``, then
        ``X-Real-IP``, then ``request.remote_addr``.

        Returns:
            The client IP address as reported by those sources.
        """
        # NOTE(review): both headers are client-controlled unless a trusted
        # reverse proxy overwrites them. If the app is reachable directly, a
        # client can spoof them to evade rate limits and IP blocks -- confirm
        # the deployment strips or sets these headers.
        forwarded_for = request.headers.get('X-Forwarded-For')
        if forwarded_for:
            first_hop = forwarded_for.split(',')[0].strip()
            # A blank first entry (e.g. ", 10.0.0.1") must not become the
            # empty-string "IP"; fall through to the next source instead.
            if first_hop:
                return first_hop

        real_ip = request.headers.get('X-Real-IP')
        if real_ip:
            return real_ip

        return request.remote_addr

    def is_rate_limited(self, ip, max_requests=100, window_minutes=15):
        """Record a request for ``ip`` and check it against the rate limit.

        Args:
            ip: Client IP address.
            max_requests: Maximum number of requests per time window.
            window_minutes: Length of the time window in minutes.

        Returns:
            bool: True if the rate limit has been exceeded.
        """
        now = datetime.now()
        window_start = now - timedelta(minutes=window_minutes)
        history = self.rate_limits[ip]

        # Drop timestamps that have fallen out of the window.
        while history and history[0] < window_start:
            history.popleft()

        # The current request always counts, including rejected ones.
        # NOTE(review): the history therefore keeps growing while a client
        # floods the service; nothing here bounds its length.
        history.append(now)

        return len(history) > max_requests

    def record_failed_login(self, ip):
        """Record a failed login attempt and block the IP after too many.

        Once ``MAX_FAILED_LOGINS`` attempts are reached the IP is added to
        ``blocked_ips`` and a daemon timer is scheduled that lifts the block
        after ``BLOCK_DURATION_SECONDS``.

        Args:
            ip: Client IP address.
        """
        self.failed_attempts[ip] += 1

        if self.failed_attempts[ip] < self.MAX_FAILED_LOGINS:
            return

        # Already blocked: an unblock timer is pending (or the block was set
        # by other means). Scheduling another one would only pile up timers
        # that could lift a later block too early.
        if ip in self.blocked_ips:
            return

        self.blocked_ips.add(ip)

        # Resolve the logger now, while a request/application context exists.
        # The timer callback runs in another thread without such a context,
        # where the ``current_app`` proxy would raise RuntimeError.
        logger = current_app.logger
        logger.warning(f"IP-Adresse blockiert nach zu vielen fehlgeschlagenen Login-Versuchen: {ip}")

        import threading

        # Automatic unblocking after the block duration; the daemon flag keeps
        # the pending timer from delaying interpreter shutdown.
        unblock_timer = threading.Timer(
            self.BLOCK_DURATION_SECONDS, self._unblock_ip, args=(ip, logger)
        )
        unblock_timer.daemon = True
        unblock_timer.start()

    def _unblock_ip(self, ip, logger):
        """Lift a temporary block for ``ip`` and reset its failure counter.

        Args:
            ip: Client IP address to unblock.
            logger: Logger used to report the unblock; passed in explicitly so
                that no Flask application context is required.
        """
        if ip in self.blocked_ips:
            self.blocked_ips.discard(ip)
            self.failed_attempts[ip] = 0
            logger.info(f"IP-Adresse automatisch entsperrt: {ip}")

    def clear_failed_attempts(self, ip):
        """Reset the failed-login counter for an IP.

        Does not touch ``blocked_ips``; an existing block stays in place.

        Args:
            ip: Client IP address.
        """
        # The membership test avoids creating defaultdict entries for
        # addresses that never failed a login.
        if ip in self.failed_attempts:
            self.failed_attempts[ip] = 0
|
|
|
|
def require_api_key(f):
    """Decorator for API endpoints that require an API key.

    The key is read from the ``X-API-Key`` request header and compared with
    the ``API_KEY`` entry of the application config.

    Args:
        f: The view function to protect.

    Returns:
        The wrapped function. It answers with a 401 JSON response when the
        key is missing or wrong, and otherwise calls ``f``.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        expected_key = current_app.config.get('API_KEY')

        if not expected_key:
            # No API key configured: access is allowed.
            # NOTE(review): this fails open -- a missing or empty API_KEY
            # disables the protection entirely. Confirm this is intended.
            return f(*args, **kwargs)

        api_key = request.headers.get('X-API-Key')
        if not api_key:
            return jsonify({'message': 'API-Key erforderlich'}), 401

        # hmac.compare_digest() raises TypeError for str arguments that
        # contain non-ASCII characters, and the header is attacker-controlled.
        # Comparing UTF-8 bytes keeps the check constant-time and turns such
        # input into an ordinary 401 instead of an unhandled exception.
        provided = api_key.encode('utf-8')
        if isinstance(expected_key, bytes):
            expected = expected_key
        else:
            expected = str(expected_key).encode('utf-8')

        if not hmac.compare_digest(provided, expected):
            current_app.logger.warning(f"Ungültiger API-Key von IP: {request.remote_addr}")
            return jsonify({'message': 'Ungültiger API-Key'}), 401

        return f(*args, **kwargs)

    return decorated
|
|
|
|
def validate_csrf_token():
    """Validate the CSRF token for state-changing requests.

    GET, HEAD and OPTIONS requests are always accepted. For every other
    method the token from the ``X-CSRF-Token`` header (or, failing that, the
    ``csrf_token`` form field) must equal the ``csrf_token`` cookie.

    Returns:
        bool: True if the request needs no token or the token is valid.
    """
    if request.method in ('GET', 'HEAD', 'OPTIONS'):
        return True

    token = request.headers.get('X-CSRF-Token') or request.form.get('csrf_token')
    session_token = request.cookies.get('csrf_token')

    if not token or not session_token:
        return False

    # hmac.compare_digest() raises TypeError for str arguments containing
    # non-ASCII characters; all three sources are client-controlled, so the
    # comparison is done on UTF-8 bytes to reject such input cleanly.
    return hmac.compare_digest(token.encode('utf-8'), session_token.encode('utf-8'))
|
|
|
|
def generate_csrf_token(nbytes=32):
    """Generate a new CSRF token.

    Args:
        nbytes: Number of random bytes in the token. The default of 32 yields
            the 64-character token this function has always produced.

    Returns:
        str: Hex-encoded token of ``2 * nbytes`` characters.

    Raises:
        ValueError: If ``nbytes`` is below 16, which would make the token
            too short to be a meaningful secret.
    """
    import secrets

    if nbytes < 16:
        raise ValueError("nbytes must be at least 16")

    return secrets.token_hex(nbytes)
|
|
|
|
# Global security middleware instance. It is created without an app, so no
# hooks are registered until init_app(app) is called on it.
security_middleware = SecurityMiddleware()