2025-06-04 10:03:22 +02:00

338 lines
11 KiB
Python

#!/usr/bin/env python3
"""
Security Utilities für MYP Platform
Content Security Policy (CSP), Security Headers und weitere Sicherheitsmaßnahmen
"""
import secrets
import hashlib
from flask import request, g, session
from functools import wraps
from typing import Dict, List, Optional
from utils.logging_config import get_logger
logger = get_logger("security")
# Content Security Policy Konfiguration
CSP_POLICY = {
'default-src': ["'self'"],
'script-src': [
"'self'",
"'unsafe-inline'", # Für inline Scripts (wird nur verwendet wenn keine Nonce vorhanden)
"https://cdn.jsdelivr.net", # Für externe Libraries
"https://unpkg.com" # Für Fallback-Libraries
],
'style-src': [
"'self'",
"'unsafe-inline'", # Für Tailwind und Dynamic Styles
"https://fonts.googleapis.com"
],
'img-src': [
"'self'",
"data:", # Für SVG Data URLs
"blob:", # Für dynamisch generierte Bilder
"https:" # HTTPS-Bilder erlauben
],
'font-src': [
"'self'",
"https://fonts.gstatic.com",
"data:" # Für eingebettete Fonts
],
'connect-src': [
"'self'",
"ws:", # WebSocket für lokale Entwicklung
"wss:", # Sichere WebSockets
"http://localhost:*", # Lokale Entwicklung
"http://127.0.0.1:*", # Lokale Entwicklung
"https://localhost:*", # Lokale Entwicklung HTTPS
"https://127.0.0.1:*" # Lokale Entwicklung HTTPS
],
'media-src': ["'self'"],
'object-src': ["'none'"], # Flash und andere Plugins blockieren
'base-uri': ["'self'"],
'form-action': ["'self'"],
'frame-ancestors': ["'none'"], # Clickjacking-Schutz
'upgrade-insecure-requests': False, # Für lokale Entwicklung deaktiviert
'block-all-mixed-content': False # Für lokale Entwicklung deaktiviert
}
# Security Headers Konfiguration
SECURITY_HEADERS = {
'X-Content-Type-Options': 'nosniff',
'X-Frame-Options': 'DENY',
'X-XSS-Protection': '1; mode=block',
'Referrer-Policy': 'strict-origin-when-cross-origin',
'Permissions-Policy': (
'geolocation=(), '
'microphone=(), '
'camera=(), '
'payment=(), '
'usb=(), '
'accelerometer=(), '
'gyroscope=(), '
'magnetometer=()'
),
'Cross-Origin-Embedder-Policy': 'require-corp',
'Cross-Origin-Opener-Policy': 'same-origin',
'Cross-Origin-Resource-Policy': 'same-origin'
}
class SecurityManager:
"""
Zentrale Sicherheitsverwaltung für MYP Platform
"""
def __init__(self):
self.nonce_store: Dict[str, str] = {}
def generate_nonce(self) -> str:
"""Generiert eine sichere Nonce für CSP"""
nonce = secrets.token_urlsafe(32)
# Nonce in Session speichern für Validierung
if 'security_nonces' not in session:
session['security_nonces'] = []
session['security_nonces'].append(nonce)
# Maximal 10 Nonces pro Session
if len(session['security_nonces']) > 10:
session['security_nonces'] = session['security_nonces'][-10:]
return nonce
def validate_nonce(self, nonce: str) -> bool:
"""Validiert eine Nonce"""
if 'security_nonces' not in session:
return False
return nonce in session['security_nonces']
def build_csp_header(self, nonce: Optional[str] = None, use_nonce: bool = False) -> str:
"""
Erstellt den Content-Security-Policy Header
Args:
nonce: Optional CSP nonce für inline scripts
use_nonce: Ob Nonces verwendet werden sollen (deaktiviert dann 'unsafe-inline')
Returns:
CSP Header String
"""
csp_parts = []
for directive, values in CSP_POLICY.items():
if directive in ['upgrade-insecure-requests', 'block-all-mixed-content']:
if values:
csp_parts.append(directive.replace('_', '-'))
continue
if isinstance(values, list):
directive_values = values.copy()
# Nonce für script-src hinzufügen nur wenn explizit gewünscht
if directive == 'script-src' and nonce and use_nonce:
directive_values.append(f"'nonce-{nonce}'")
# 'unsafe-inline' entfernen wenn Nonce verwendet wird
if "'unsafe-inline'" in directive_values:
directive_values.remove("'unsafe-inline'")
csp_parts.append(f"{directive.replace('_', '-')} {' '.join(directive_values)}")
return "; ".join(csp_parts)
def get_client_fingerprint(self) -> str:
"""
Erstellt einen Client-Fingerprint für erweiterte Sicherheit
"""
components = [
request.environ.get('HTTP_X_FORWARDED_FOR', request.remote_addr),
request.headers.get('User-Agent', ''),
request.headers.get('Accept-Language', ''),
request.headers.get('Accept-Encoding', '')
]
fingerprint_string = '|'.join(components)
return hashlib.sha256(fingerprint_string.encode()).hexdigest()[:32]
def check_suspicious_activity(self) -> bool:
"""
Prüft auf verdächtige Aktivitäten
"""
# SQL Injection Patterns
sql_patterns = [
'union select', 'drop table', 'insert into', 'delete from',
'script>', '<iframe', 'javascript:', 'vbscript:',
'onload=', 'onerror=', 'onclick='
]
# Request-Daten prüfen
request_data = str(request.args) + str(request.form) + str(request.json or {})
request_data_lower = request_data.lower()
for pattern in sql_patterns:
if pattern in request_data_lower:
logger.warning(f"🚨 Verdächtige Aktivität erkannt: {pattern} von {request.remote_addr}")
return True
# Übermäßig große Requests
if len(request_data) > 50000: # 50KB Limit
logger.warning(f"🚨 Übermäßig große Anfrage von {request.remote_addr}: {len(request_data)} bytes")
return True
return False
def log_security_event(self, event_type: str, details: Dict):
"""
Protokolliert Sicherheitsereignisse
"""
security_data = {
'event_type': event_type,
'ip_address': request.environ.get('HTTP_X_FORWARDED_FOR', request.remote_addr),
'user_agent': request.headers.get('User-Agent', ''),
'timestamp': request.environ.get('REQUEST_START_TIME'),
'fingerprint': self.get_client_fingerprint(),
**details
}
logger.warning(f"🔒 Sicherheitsereignis: {event_type} - {security_data}")
# Globale Security Manager Instanz
security_manager = SecurityManager()
def apply_security_headers(response):
"""
Wendet Sicherheits-Headers auf Response an
"""
# Standard Security Headers
for header, value in SECURITY_HEADERS.items():
response.headers[header] = value
# Content Security Policy - für Entwicklung weniger restriktiv
nonce = getattr(g, 'csp_nonce', None)
# In der Entwicklung verwenden wir keine Nonces, um 'unsafe-inline' zu erhalten
use_nonce = False # In Produktion auf True setzen für bessere Sicherheit
csp_header = security_manager.build_csp_header(nonce, use_nonce)
response.headers['Content-Security-Policy'] = csp_header
# HSTS nur für HTTPS und Produktion
if request.is_secure and not request.environ.get('FLASK_ENV') == 'development':
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains; preload'
return response
def security_check(check_suspicious: bool = True):
"""
Decorator für Sicherheitsprüfungen
Args:
check_suspicious: Ob auf verdächtige Aktivitäten geprüft werden soll
"""
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
# Verdächtige Aktivitäten prüfen
if check_suspicious and security_manager.check_suspicious_activity():
security_manager.log_security_event('suspicious_request', {
'endpoint': request.endpoint,
'method': request.method,
'args': dict(request.args),
'form': dict(request.form)
})
from flask import jsonify
return jsonify({
'error': 'Verdächtige Anfrage erkannt',
'message': 'Ihre Anfrage wurde aus Sicherheitsgründen blockiert.'
}), 400
return f(*args, **kwargs)
return wrapper
return decorator
def require_secure_headers(f):
"""
Decorator der sicherstellt, dass Security Headers gesetzt werden
"""
@wraps(f)
def wrapper(*args, **kwargs):
# CSP Nonce generieren
g.csp_nonce = security_manager.generate_nonce()
response = f(*args, **kwargs)
# Security Headers anwenden
if hasattr(response, 'headers'):
response = apply_security_headers(response)
return response
return wrapper
def get_csp_nonce() -> str:
"""
Holt die aktuelle CSP Nonce für Templates
"""
return getattr(g, 'csp_nonce', '')
def validate_origin():
"""
Validiert die Origin des Requests
"""
origin = request.headers.get('Origin')
referer = request.headers.get('Referer')
host = request.headers.get('Host')
# Für API-Requests Origin prüfen
if request.path.startswith('/api/') and origin:
allowed_origins = [
f"http://{host}",
f"https://{host}",
"http://localhost:5000",
"http://127.0.0.1:5000"
]
if origin not in allowed_origins:
logger.warning(f"🚨 Ungültige Origin: {origin} für {request.path}")
return False
return True
# Template Helper für CSP Nonce
def csp_nonce():
"""Template Helper für CSP Nonce"""
return get_csp_nonce()
# Security Middleware für Flask App
def init_security(app):
"""
Initialisiert Sicherheitsfeatures für Flask App
"""
@app.before_request
def before_request_security():
"""Security Checks vor jedem Request"""
# Origin validieren
if not validate_origin():
from flask import jsonify
return jsonify({
'error': 'Invalid origin',
'message': 'Anfrage von ungültiger Quelle'
}), 403
# CSP Nonce generieren
g.csp_nonce = security_manager.generate_nonce()
@app.after_request
def after_request_security(response):
"""Security Headers nach jedem Request anwenden"""
return apply_security_headers(response)
# Template Helper registrieren
app.jinja_env.globals['csp_nonce'] = csp_nonce
logger.info("🔒 Security System initialisiert")
return app