273 lines
9.2 KiB
Python
273 lines
9.2 KiB
Python
#!/usr/bin/env python3.11
|
|
"""
|
|
SSL Suite - ULTRA KONSOLIDIERUNG
|
|
===============================
|
|
|
|
Migration Information:
|
|
- Ursprünglich: ssl_fix.py, ssl_config.py, ssl_manager.py
|
|
- Konsolidiert am: 2025-06-09
|
|
- Funktionalitäten: SSL-Fixes, SSL-Konfiguration, Zertifikat-Management
|
|
- Breaking Changes: Keine - Alle Original-APIs bleiben verfügbar
|
|
|
|
ULTRA KONSOLIDIERUNG für Projektarbeit MYP
|
|
Author: MYP Team - Till Tomczak
|
|
Ziel: DRASTISCHE Datei-Reduktion!
|
|
"""
|
|
|
|
import os
|
|
import ssl
|
|
import socket
|
|
import subprocess
|
|
from datetime import datetime, timedelta
|
|
from cryptography import x509
|
|
from cryptography.x509.oid import NameOID
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
|
|
|
from utils.logging_config import get_logger
|
|
|
|
# Logger
|
|
ssl_logger = get_logger("ssl_suite")
|
|
|
|
# ===== SSL CONFIGURATION =====
|
|
|
|
class SSLConfig:
|
|
"""SSL-Konfiguration"""
|
|
|
|
def __init__(self):
|
|
self.cert_path = "backend/ssl/"
|
|
self.key_file = "server.key"
|
|
self.cert_file = "server.crt"
|
|
self.ca_file = "ca.crt"
|
|
|
|
def get_ssl_context(self):
|
|
"""Erstellt SSL-Context"""
|
|
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
|
|
context.load_cert_chain(
|
|
certfile=os.path.join(self.cert_path, self.cert_file),
|
|
keyfile=os.path.join(self.cert_path, self.key_file)
|
|
)
|
|
return context
|
|
|
|
def verify_ssl_files(self) -> bool:
|
|
"""Prüft SSL-Dateien"""
|
|
cert_exists = os.path.exists(os.path.join(self.cert_path, self.cert_file))
|
|
key_exists = os.path.exists(os.path.join(self.cert_path, self.key_file))
|
|
|
|
ssl_logger.info(f"SSL Cert exists: {cert_exists}")
|
|
ssl_logger.info(f"SSL Key exists: {key_exists}")
|
|
|
|
return cert_exists and key_exists
|
|
|
|
# ===== SSL CERTIFICATE MANAGER =====
|
|
|
|
class SSLCertificateManager:
|
|
"""SSL-Zertifikat-Management"""
|
|
|
|
def __init__(self):
|
|
self.ssl_config = SSLConfig()
|
|
|
|
def generate_self_signed_cert(self, hostname: str = "localhost") -> bool:
|
|
"""Generiert selbstsigniertes Zertifikat"""
|
|
try:
|
|
# Private Key generieren
|
|
private_key = rsa.generate_private_key(
|
|
public_exponent=65537,
|
|
key_size=2048,
|
|
)
|
|
|
|
# Zertifikat erstellen
|
|
subject = issuer = x509.Name([
|
|
x509.NameAttribute(NameOID.COUNTRY_NAME, "DE"),
|
|
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Berlin"),
|
|
x509.NameAttribute(NameOID.LOCALITY_NAME, "Berlin"),
|
|
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Mercedes-Benz MYP"),
|
|
x509.NameAttribute(NameOID.COMMON_NAME, hostname),
|
|
])
|
|
|
|
cert = x509.CertificateBuilder().subject_name(
|
|
subject
|
|
).issuer_name(
|
|
issuer
|
|
).public_key(
|
|
private_key.public_key()
|
|
).serial_number(
|
|
x509.random_serial_number()
|
|
).not_valid_before(
|
|
datetime.utcnow()
|
|
).not_valid_after(
|
|
datetime.utcnow() + timedelta(days=365)
|
|
).add_extension(
|
|
x509.SubjectAlternativeName([
|
|
x509.DNSName(hostname),
|
|
x509.DNSName("localhost"),
|
|
x509.IPAddress(socket.inet_aton("127.0.0.1")),
|
|
]),
|
|
critical=False,
|
|
).sign(private_key, hashes.SHA256())
|
|
|
|
# Dateien schreiben
|
|
os.makedirs(self.ssl_config.cert_path, exist_ok=True)
|
|
|
|
# Private Key
|
|
with open(os.path.join(self.ssl_config.cert_path, self.ssl_config.key_file), "wb") as f:
|
|
f.write(private_key.private_bytes(
|
|
encoding=serialization.Encoding.PEM,
|
|
format=serialization.PrivateFormat.PKCS8,
|
|
encryption_algorithm=serialization.NoEncryption()
|
|
))
|
|
|
|
# Zertifikat
|
|
with open(os.path.join(self.ssl_config.cert_path, self.ssl_config.cert_file), "wb") as f:
|
|
f.write(cert.public_bytes(serialization.Encoding.PEM))
|
|
|
|
ssl_logger.info("Selbstsigniertes Zertifikat erstellt")
|
|
return True
|
|
|
|
except Exception as e:
|
|
ssl_logger.error(f"Zertifikat-Generierung Fehler: {e}")
|
|
return False
|
|
|
|
def check_certificate_validity(self) -> Dict[str, Any]:
|
|
"""Prüft Zertifikat-Gültigkeit"""
|
|
try:
|
|
cert_path = os.path.join(self.ssl_config.cert_path, self.ssl_config.cert_file)
|
|
|
|
if not os.path.exists(cert_path):
|
|
return {'valid': False, 'reason': 'Zertifikat nicht gefunden'}
|
|
|
|
with open(cert_path, 'rb') as f:
|
|
cert_data = f.read()
|
|
cert = x509.load_pem_x509_certificate(cert_data)
|
|
|
|
now = datetime.utcnow()
|
|
|
|
if now < cert.not_valid_before:
|
|
return {'valid': False, 'reason': 'Zertifikat noch nicht gültig'}
|
|
|
|
if now > cert.not_valid_after:
|
|
return {'valid': False, 'reason': 'Zertifikat abgelaufen'}
|
|
|
|
# Gültigkeitsdauer prüfen
|
|
days_until_expiry = (cert.not_valid_after - now).days
|
|
|
|
return {
|
|
'valid': True,
|
|
'expires_at': cert.not_valid_after,
|
|
'days_until_expiry': days_until_expiry,
|
|
'subject': cert.subject.rfc4514_string()
|
|
}
|
|
|
|
except Exception as e:
|
|
ssl_logger.error(f"Zertifikat-Prüfung Fehler: {e}")
|
|
return {'valid': False, 'reason': str(e)}
|
|
|
|
# ===== SSL FIXES =====
|
|
|
|
class SSLFixes:
|
|
"""SSL-Problem-Fixes für verschiedene Plattformen"""
|
|
|
|
@staticmethod
|
|
def fix_windows_ssl():
|
|
"""Windows-spezifische SSL-Fixes"""
|
|
try:
|
|
# Windows SSL-Kontext anpassen
|
|
import ssl
|
|
ssl._create_default_https_context = ssl._create_unverified_context
|
|
ssl_logger.info("Windows SSL-Fix angewendet")
|
|
return True
|
|
|
|
except Exception as e:
|
|
ssl_logger.error(f"Windows SSL-Fix Fehler: {e}")
|
|
return False
|
|
|
|
@staticmethod
|
|
def fix_certificate_verification():
|
|
"""Zertifikat-Verifikation anpassen"""
|
|
try:
|
|
import ssl
|
|
import certifi
|
|
|
|
# CA-Bundle verwenden
|
|
ssl.get_default_verify_paths = lambda: ssl.DefaultVerifyPaths(
|
|
certifi.where(), certifi.where()
|
|
)
|
|
|
|
ssl_logger.info("Zertifikat-Verifikation Fix angewendet")
|
|
return True
|
|
|
|
except Exception as e:
|
|
ssl_logger.error(f"Zertifikat-Verifikation Fix Fehler: {e}")
|
|
return False
|
|
|
|
@staticmethod
|
|
def disable_ssl_warnings():
|
|
"""SSL-Warnungen unterdrücken"""
|
|
try:
|
|
import urllib3
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
ssl_logger.info("SSL-Warnungen deaktiviert")
|
|
return True
|
|
|
|
except Exception as e:
|
|
ssl_logger.error(f"SSL-Warnungen Deaktivierung Fehler: {e}")
|
|
return False
|
|
|
|
# ===== GLOBALE INSTANZEN =====
|
|
|
|
ssl_config = SSLConfig()
|
|
ssl_cert_manager = SSLCertificateManager()
|
|
ssl_fixes = SSLFixes()
|
|
|
|
# ===== CONVENIENCE FUNCTIONS =====
|
|
|
|
def setup_ssl_environment() -> bool:
|
|
"""Richtet SSL-Umgebung ein"""
|
|
try:
|
|
# SSL-Fixes anwenden
|
|
ssl_fixes.fix_windows_ssl()
|
|
ssl_fixes.fix_certificate_verification()
|
|
ssl_fixes.disable_ssl_warnings()
|
|
|
|
# Zertifikate prüfen/erstellen
|
|
if not ssl_config.verify_ssl_files():
|
|
ssl_logger.info("SSL-Dateien fehlen - erstelle neue Zertifikate")
|
|
return ssl_cert_manager.generate_self_signed_cert()
|
|
|
|
# Gültigkeit prüfen
|
|
validity = ssl_cert_manager.check_certificate_validity()
|
|
if not validity['valid']:
|
|
ssl_logger.warning(f"Zertifikat ungültig: {validity['reason']}")
|
|
return ssl_cert_manager.generate_self_signed_cert()
|
|
|
|
ssl_logger.info("SSL-Umgebung erfolgreich eingerichtet")
|
|
return True
|
|
|
|
except Exception as e:
|
|
ssl_logger.error(f"SSL-Setup Fehler: {e}")
|
|
return False
|
|
|
|
def get_ssl_status() -> Dict[str, Any]:
|
|
"""Holt SSL-Status"""
|
|
return {
|
|
'files_exist': ssl_config.verify_ssl_files(),
|
|
'certificate_validity': ssl_cert_manager.check_certificate_validity(),
|
|
'ssl_context_available': True
|
|
}
|
|
|
|
# ===== LEGACY COMPATIBILITY =====
|
|
|
|
# Original ssl_fix.py compatibility
|
|
def apply_ssl_fixes():
|
|
return setup_ssl_environment()
|
|
|
|
# Original ssl_config.py compatibility
|
|
def get_ssl_config():
|
|
return ssl_config.get_ssl_context()
|
|
|
|
# Original ssl_manager.py compatibility
|
|
def manage_ssl_certificates():
|
|
return ssl_cert_manager.check_certificate_validity()
|
|
|
|
ssl_logger.info("✅ SSL Suite Module initialisiert")
|
|
ssl_logger.info("📊 MASSIVE Konsolidierung: 3 Dateien → 1 Datei (67% Reduktion)") |