#!/usr/bin/env python3.11
"""
SSL Suite - ULTRA CONSOLIDATION
===============================

Migration information:
- Originally: ssl_fix.py, ssl_config.py, ssl_manager.py
- Consolidated on: 2025-06-09
- Functionality: SSL fixes, SSL configuration, certificate management
- Breaking changes: none - all original APIs remain available

ULTRA CONSOLIDATION for the MYP project work
Author: MYP Team - Till Tomczak
Goal: DRASTIC file reduction!
"""

import ipaddress
import os
import socket
import ssl
import subprocess
from datetime import datetime, timedelta, timezone
from typing import Any, Dict

from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa

from utils.logging_config import get_logger

# Logger
ssl_logger = get_logger("ssl_suite")


def _utcnow_naive() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Drop-in replacement for the deprecated ``datetime.utcnow()``. The result
    stays naive so it can be compared with the naive values returned by
    ``Certificate.not_valid_before`` / ``not_valid_after``.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _subject_alt_names(hostname: str) -> x509.SubjectAlternativeName:
    """Build the SubjectAlternativeName extension for a self-signed cert.

    The extension always contains ``hostname`` and ``localhost`` as DNS names
    (without listing ``localhost`` twice) and ``127.0.0.1`` as an IP address.
    If ``hostname`` is itself an IP literal it is additionally listed as an
    IP address entry.
    """
    dns_names = [hostname]
    if hostname != "localhost":
        dns_names.append("localhost")

    # x509.IPAddress requires ``ipaddress`` objects, not packed bytes.
    ip_addresses = [ipaddress.ip_address("127.0.0.1")]
    try:
        host_ip = ipaddress.ip_address(hostname)
    except ValueError:
        pass  # ordinary DNS name, nothing to add
    else:
        if host_ip not in ip_addresses:
            ip_addresses.append(host_ip)

    return x509.SubjectAlternativeName(
        [x509.DNSName(name) for name in dns_names]
        + [x509.IPAddress(address) for address in ip_addresses]
    )


# ===== SSL CONFIGURATION =====

class SSLConfig:
    """Locations of the server's SSL files and SSL context creation."""

    def __init__(self):
        # Directory (relative to the working directory) holding the files.
        self.cert_path = "backend/ssl/"
        self.key_file = "server.key"
        self.cert_file = "server.crt"
        self.ca_file = "ca.crt"

    def get_ssl_context(self):
        """Create a server-side SSL context with the configured cert/key.

        Returns:
            An ``ssl.SSLContext`` created for ``Purpose.CLIENT_AUTH`` with the
            certificate chain loaded.

        Raises:
            Whatever ``SSLContext.load_cert_chain`` raises when the files are
            missing or unusable; nothing is caught here.
        """
        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        context.load_cert_chain(
            certfile=os.path.join(self.cert_path, self.cert_file),
            keyfile=os.path.join(self.cert_path, self.key_file),
        )
        return context

    def verify_ssl_files(self) -> bool:
        """Check whether both the certificate and the key file exist.

        Only existence is checked (and logged); the contents are not parsed.
        """
        cert_exists = os.path.exists(os.path.join(self.cert_path, self.cert_file))
        key_exists = os.path.exists(os.path.join(self.cert_path, self.key_file))

        ssl_logger.info(f"SSL Cert exists: {cert_exists}")
        ssl_logger.info(f"SSL Key exists: {key_exists}")

        return cert_exists and key_exists


# ===== SSL CERTIFICATE MANAGER =====

class SSLCertificateManager:
    """Generation and validation of the server certificate."""

    def __init__(self):
        self.ssl_config = SSLConfig()

    def generate_self_signed_cert(self, hostname: str = "localhost") -> bool:
        """Generate a self-signed certificate and key and write them to disk.

        A 2048-bit RSA key and a certificate valid for 365 days are written
        as PEM files to the paths configured in ``self.ssl_config``.

        Args:
            hostname: Value for the certificate's common name; also added to
                the subject alternative names.

        Returns:
            True on success, False if any step failed (the error is logged).
        """
        try:
            # Generate the private key
            private_key = rsa.generate_private_key(
                public_exponent=65537,
                key_size=2048,
            )

            # Self-signed: subject and issuer are the same name
            subject = issuer = x509.Name([
                x509.NameAttribute(NameOID.COUNTRY_NAME, "DE"),
                x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Berlin"),
                x509.NameAttribute(NameOID.LOCALITY_NAME, "Berlin"),
                x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Mercedes-Benz MYP"),
                x509.NameAttribute(NameOID.COMMON_NAME, hostname),
            ])

            valid_from = _utcnow_naive()
            cert = (
                x509.CertificateBuilder()
                .subject_name(subject)
                .issuer_name(issuer)
                .public_key(private_key.public_key())
                .serial_number(x509.random_serial_number())
                .not_valid_before(valid_from)
                .not_valid_after(valid_from + timedelta(days=365))
                .add_extension(_subject_alt_names(hostname), critical=False)
                .sign(private_key, hashes.SHA256())
            )

            # Write the files
            os.makedirs(self.ssl_config.cert_path, exist_ok=True)

            # Private key: it is stored unencrypted, so create the file with
            # owner-only permissions instead of relying on the process umask.
            # (The mode only applies when the file is newly created.)
            key_path = os.path.join(self.ssl_config.cert_path, self.ssl_config.key_file)
            key_flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
            key_fd = os.open(key_path, key_flags, 0o600)
            with os.fdopen(key_fd, "wb") as f:
                f.write(private_key.private_bytes(
                    encoding=serialization.Encoding.PEM,
                    format=serialization.PrivateFormat.PKCS8,
                    encryption_algorithm=serialization.NoEncryption(),
                ))

            # Certificate
            cert_path = os.path.join(self.ssl_config.cert_path, self.ssl_config.cert_file)
            with open(cert_path, "wb") as f:
                f.write(cert.public_bytes(serialization.Encoding.PEM))

            ssl_logger.info("Selbstsigniertes Zertifikat erstellt")
            return True

        except Exception as e:
            ssl_logger.error(f"Zertifikat-Generierung Fehler: {e}")
            return False

    def check_certificate_validity(self) -> Dict[str, Any]:
        """Check whether the configured certificate is currently valid.

        Returns:
            A dict with ``'valid'`` (bool). When invalid, ``'reason'`` holds a
            message. When valid, ``'expires_at'``, ``'days_until_expiry'`` and
            ``'subject'`` are included. Errors are logged and reported as
            ``{'valid': False, 'reason': <error text>}``.
        """
        try:
            cert_path = os.path.join(self.ssl_config.cert_path, self.ssl_config.cert_file)

            if not os.path.exists(cert_path):
                return {'valid': False, 'reason': 'Zertifikat nicht gefunden'}

            with open(cert_path, 'rb') as f:
                cert_data = f.read()

            cert = x509.load_pem_x509_certificate(cert_data)
            now = _utcnow_naive()

            # NOTE(review): newer cryptography releases deprecate these naive
            # attributes in favour of not_valid_before_utc/not_valid_after_utc;
            # they are kept so 'expires_at' stays a naive datetime for callers.
            if now < cert.not_valid_before:
                return {'valid': False, 'reason': 'Zertifikat noch nicht gültig'}

            if now > cert.not_valid_after:
                return {'valid': False, 'reason': 'Zertifikat abgelaufen'}

            # Remaining validity period
            days_until_expiry = (cert.not_valid_after - now).days

            return {
                'valid': True,
                'expires_at': cert.not_valid_after,
                'days_until_expiry': days_until_expiry,
                'subject': cert.subject.rfc4514_string(),
            }

        except Exception as e:
            ssl_logger.error(f"Zertifikat-Prüfung Fehler: {e}")
            return {'valid': False, 'reason': str(e)}


# ===== SSL FIXES =====

class SSLFixes:
    """Process-wide SSL workarounds for various platforms."""

    @staticmethod
    def fix_windows_ssl():
        """Make the default HTTPS context an unverified one.

        SECURITY: this disables certificate verification for everything that
        uses ``ssl._create_default_https_context`` (e.g. ``urllib``), and it
        is applied on every platform, not only on Windows. Behaviour is kept
        for backward compatibility - review before using in production.

        Returns:
            True if the patch was applied, False on error (logged).
        """
        try:
            ssl._create_default_https_context = ssl._create_unverified_context
            ssl_logger.info("Windows SSL-Fix angewendet")
            return True
        except Exception as e:
            ssl_logger.error(f"Windows SSL-Fix Fehler: {e}")
            return False

    @staticmethod
    def fix_certificate_verification():
        """Point ``ssl.get_default_verify_paths`` at the certifi CA bundle.

        Returns:
            True if the patch was applied, False on error, e.g. when
            ``certifi`` is not installed (logged).
        """
        try:
            import certifi

            # DefaultVerifyPaths is a six-field namedtuple, so start from the
            # real defaults and only swap in the certifi bundle as ``cafile``.
            patched_paths = ssl.get_default_verify_paths()._replace(
                cafile=certifi.where()
            )
            ssl.get_default_verify_paths = lambda: patched_paths

            ssl_logger.info("Zertifikat-Verifikation Fix angewendet")
            return True
        except Exception as e:
            ssl_logger.error(f"Zertifikat-Verifikation Fix Fehler: {e}")
            return False

    @staticmethod
    def disable_ssl_warnings():
        """Suppress urllib3's ``InsecureRequestWarning``.

        Returns:
            True if the warnings were disabled, False on error, e.g. when
            ``urllib3`` is not installed (logged).
        """
        try:
            import urllib3

            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
            ssl_logger.info("SSL-Warnungen deaktiviert")
            return True
        except Exception as e:
            ssl_logger.error(f"SSL-Warnungen Deaktivierung Fehler: {e}")
            return False


# ===== GLOBAL INSTANCES =====

ssl_config = SSLConfig()
ssl_cert_manager = SSLCertificateManager()
ssl_fixes = SSLFixes()


# ===== CONVENIENCE FUNCTIONS =====

def setup_ssl_environment() -> bool:
    """Apply the SSL fixes and make sure a usable certificate exists.

    A new self-signed certificate is generated when the SSL files are missing
    or the existing certificate is not valid.

    Returns:
        True when the environment is ready (or a certificate was generated
        successfully), False otherwise.
    """
    try:
        # Apply the SSL fixes (each one logs its own failure)
        ssl_fixes.fix_windows_ssl()
        ssl_fixes.fix_certificate_verification()
        ssl_fixes.disable_ssl_warnings()

        # Check for / create certificates
        if not ssl_config.verify_ssl_files():
            ssl_logger.info("SSL-Dateien fehlen - erstelle neue Zertifikate")
            return ssl_cert_manager.generate_self_signed_cert()

        # Check validity
        validity = ssl_cert_manager.check_certificate_validity()
        if not validity['valid']:
            ssl_logger.warning(f"Zertifikat ungültig: {validity['reason']}")
            return ssl_cert_manager.generate_self_signed_cert()

        ssl_logger.info("SSL-Umgebung erfolgreich eingerichtet")
        return True

    except Exception as e:
        ssl_logger.error(f"SSL-Setup Fehler: {e}")
        return False


def get_ssl_status() -> Dict[str, Any]:
    """Return a summary of the current SSL state.

    Returns:
        A dict with ``'files_exist'``, ``'certificate_validity'`` and
        ``'ssl_context_available'``.
    """
    return {
        'files_exist': ssl_config.verify_ssl_files(),
        'certificate_validity': ssl_cert_manager.check_certificate_validity(),
        # NOTE(review): hard-coded; no context is actually built here.
        'ssl_context_available': True,
    }


# ===== LEGACY COMPATIBILITY =====

# Original ssl_fix.py compatibility
def apply_ssl_fixes():
    """Legacy alias for ``setup_ssl_environment()``."""
    return setup_ssl_environment()


# Original ssl_config.py compatibility
def get_ssl_config():
    """Legacy alias returning ``ssl_config.get_ssl_context()``."""
    return ssl_config.get_ssl_context()


# Original ssl_manager.py compatibility
def manage_ssl_certificates():
    """Legacy alias for ``ssl_cert_manager.check_certificate_validity()``."""
    return ssl_cert_manager.check_certificate_validity()


ssl_logger.info("✅ SSL Suite Module initialisiert")
ssl_logger.info("📊 MASSIVE Konsolidierung: 3 Dateien → 1 Datei (67% Reduktion)")