387 lines
16 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
SSL-Zertifikatsverwaltung für das Mercedes-Benz MYP-System
Konsolidiert die Funktionalität der SSL-Zertifikatsgenerierung
"""
import os
import datetime
import shutil
import platform
import subprocess
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption
import ipaddress
from config.settings import SSL_CERT_PATH, SSL_KEY_PATH, SSL_HOSTNAME
from utils.logging_config import get_logger
ssl_logger = get_logger("ssl")
class SSLCertificateManager:
"""
Verwaltet SSL-Zertifikate für das MYP-System
"""
def __init__(self):
self.cert_path = SSL_CERT_PATH
self.key_path = SSL_KEY_PATH
self.hostname = SSL_HOSTNAME
# Verzeichnisse definieren
self.certs_dir = os.path.dirname(self.cert_path)
self.frontend_ssl_dir = "/home/user/Projektarbeit-MYP/frontend/ssl"
# Mercedes-Benz spezifische Konfiguration
self.mercedes_config = {
"organization": "Mercedes-Benz AG",
"organizational_unit": "Werk 040 Berlin",
"locality": "Berlin",
"state": "Berlin",
"country": "DE",
"email": "admin@mercedes-benz.com"
}
# Erweiterte Hostnamen und IP-Adressen
self.hostnames = [
"localhost",
"raspberrypi",
"m040tbaraspi001",
"m040tbaraspi001.de040.corpintra.net",
"mbag.corpintra.net",
"mbag.mb.corpintra.net"
]
self.ip_addresses = [
"127.0.0.1",
"192.168.0.101",
"192.168.0.102",
"192.168.0.103",
"192.168.0.104",
"192.168.0.105",
"192.168.0.106"
]
def ensure_directories(self) -> None:
"""Erstellt notwendige Verzeichnisse"""
os.makedirs(self.certs_dir, exist_ok=True)
os.makedirs(self.frontend_ssl_dir, exist_ok=True)
ssl_logger.info(f"SSL-Verzeichnisse erstellt: {self.certs_dir}, {self.frontend_ssl_dir}")
def cleanup_old_certificates(self) -> None:
"""Entfernt alte Zertifikate und veraltete Verzeichnisse"""
# Alte SSL-Verzeichnisse löschen
old_ssl_dirs = [
os.path.join(os.path.dirname(os.path.dirname(self.certs_dir)), "app", "instance", "ssl"),
os.path.join(os.path.dirname(os.path.dirname(self.certs_dir)), "app", "certs")
]
for old_dir in old_ssl_dirs:
if os.path.exists(old_dir):
ssl_logger.info(f"Lösche alten SSL-Ordner: {old_dir}")
try:
shutil.rmtree(old_dir)
except Exception as e:
ssl_logger.warning(f"Konnte alten SSL-Ordner nicht löschen: {e}")
# Alte Zertifikate im aktuellen Verzeichnis entfernen
for path in [self.cert_path, self.key_path]:
if os.path.exists(path):
os.remove(path)
ssl_logger.info(f"Alte Zertifikatsdatei entfernt: {path}")
def generate_mercedes_certificate(self, key_size: int = 4096, validity_days: int = 365) -> bool:
"""
Generiert ein vollständiges Mercedes-Benz SSL-Zertifikat
Args:
key_size: Schlüsselgröße in Bits (Standard: 4096)
validity_days: Gültigkeitsdauer in Tagen (Standard: 365)
Returns:
bool: True bei Erfolg, False bei Fehler
"""
ssl_logger.info("Generiere Mercedes-Benz SSL-Zertifikat...")
try:
# Verzeichnisse vorbereiten
self.ensure_directories()
self.cleanup_old_certificates()
# Privaten Schlüssel generieren
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=key_size,
)
ssl_logger.info(f"Privater Schlüssel mit {key_size} Bit generiert")
# Zeitstempel
now = datetime.datetime.now()
valid_until = now + datetime.timedelta(days=validity_days)
# Zertifikatsattribute für Mercedes-Benz
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, self.hostname),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, self.mercedes_config["organization"]),
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, self.mercedes_config["organizational_unit"]),
x509.NameAttribute(NameOID.LOCALITY_NAME, self.mercedes_config["locality"]),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, self.mercedes_config["state"]),
x509.NameAttribute(NameOID.COUNTRY_NAME, self.mercedes_config["country"]),
x509.NameAttribute(NameOID.EMAIL_ADDRESS, self.mercedes_config["email"]),
])
# Subject Alternative Names (SAN) erstellen
san_list = []
for hostname in self.hostnames:
san_list.append(x509.DNSName(hostname))
for ip in self.ip_addresses:
try:
san_list.append(x509.IPAddress(ipaddress.IPv4Address(ip)))
except ipaddress.AddressValueError:
ssl_logger.warning(f"Ungültige IP-Adresse übersprungen: {ip}")
# Zertifikat erstellen
cert = x509.CertificateBuilder().subject_name(
subject
).issuer_name(
issuer
).public_key(
private_key.public_key()
).serial_number(
x509.random_serial_number()
).not_valid_before(
now
).not_valid_after(
valid_until
).add_extension(
x509.SubjectAlternativeName(san_list),
critical=False,
).add_extension(
x509.BasicConstraints(ca=True, path_length=None), critical=True
).add_extension(
x509.KeyUsage(
digital_signature=True,
content_commitment=False,
key_encipherment=True,
data_encipherment=False,
key_agreement=False,
key_cert_sign=True,
crl_sign=True,
encipher_only=False,
decipher_only=False
), critical=True
).add_extension(
x509.ExtendedKeyUsage([
x509.oid.ExtendedKeyUsageOID.SERVER_AUTH,
x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH,
x509.oid.ExtendedKeyUsageOID.CODE_SIGNING
]), critical=False
).sign(private_key, hashes.SHA256())
# Zertifikat und Schlüssel speichern
with open(self.key_path, "wb") as f:
f.write(private_key.private_bytes(
encoding=Encoding.PEM,
format=PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=NoEncryption()
))
with open(self.cert_path, "wb") as f:
f.write(cert.public_bytes(Encoding.PEM))
# Berechtigungen setzen
os.chmod(self.key_path, 0o600)
os.chmod(self.cert_path, 0o644)
ssl_logger.info(f"Mercedes-Benz SSL-Zertifikat erfolgreich erstellt:")
ssl_logger.info(f"- Zertifikat: {os.path.abspath(self.cert_path)}")
ssl_logger.info(f"- Schlüssel: {os.path.abspath(self.key_path)}")
ssl_logger.info(f"- Gültig bis: {valid_until.strftime('%d.%m.%Y')}")
ssl_logger.info(f"- Hostnamen: {', '.join(self.hostnames)}")
ssl_logger.info(f"- IP-Adressen: {', '.join(self.ip_addresses)}")
# Zertifikate ins Frontend kopieren
self._copy_to_frontend()
return True
except Exception as e:
ssl_logger.error(f"Fehler beim Erstellen des Mercedes-Benz SSL-Zertifikats: {e}")
return False
def _copy_to_frontend(self) -> bool:
"""Kopiert Zertifikate ins Frontend-Verzeichnis"""
try:
shutil.copy2(self.cert_path, os.path.join(self.frontend_ssl_dir, "myp.crt"))
shutil.copy2(self.key_path, os.path.join(self.frontend_ssl_dir, "myp.key"))
ssl_logger.info(f"Zertifikate ins Frontend kopiert: {os.path.abspath(self.frontend_ssl_dir)}")
return True
except Exception as e:
ssl_logger.error(f"Fehler beim Kopieren ins Frontend: {e}")
return False
def install_system_certificate(self) -> bool:
"""
Installiert das Zertifikat im System-Zertifikatsspeicher
Nur für Windows-Systeme
"""
if platform.system() != "Windows":
ssl_logger.warning("System-Zertifikatsinstallation nur unter Windows verfügbar")
return False
try:
if not os.path.exists(self.cert_path):
ssl_logger.error(f"Zertifikat nicht gefunden: {self.cert_path}")
return False
# Befehle zum Installieren des Zertifikats im Windows-Zertifikatsspeicher
commands = [
["certutil", "-addstore", "-f", "ROOT", self.cert_path],
["certutil", "-addstore", "-f", "CA", self.cert_path],
["certutil", "-addstore", "-f", "MY", self.cert_path]
]
for cmd in commands:
result = subprocess.run(cmd, check=True, capture_output=True, text=True)
ssl_logger.debug(f"Certutil-Befehl ausgeführt: {' '.join(cmd)}")
ssl_logger.info("Zertifikat erfolgreich im System-Zertifikatsspeicher installiert")
return True
except subprocess.CalledProcessError as e:
ssl_logger.error(f"Fehler bei der Installation des Zertifikats im System: {e}")
return False
except Exception as e:
ssl_logger.error(f"Unerwarteter Fehler bei der Zertifikatsinstallation: {e}")
return False
def copy_to_raspberry(self, host: str = "raspberrypi", user: str = "user", dest: str = "/home/user/Projektarbeit-MYP/backend/app/certs") -> bool:
"""
Kopiert das Zertifikat auf den Raspberry Pi
Args:
host: Hostname des Raspberry Pi
user: Benutzername für SSH
dest: Zielverzeichnis auf dem Raspberry Pi
Returns:
bool: True bei Erfolg, False bei Fehler
"""
try:
if not os.path.exists(self.cert_path) or not os.path.exists(self.key_path):
ssl_logger.error("Zertifikatsdateien nicht gefunden")
return False
# SSH-Befehl zum Erstellen des Verzeichnisses
ssh_command = ["ssh", f"{user}@{host}", f"mkdir -p {dest}"]
subprocess.run(ssh_command, check=True)
ssl_logger.info(f"Verzeichnis auf Raspberry Pi erstellt: {dest}")
# SCP-Befehle zum Kopieren der Dateien
scp_commands = [
["scp", self.cert_path, f"{user}@{host}:{dest}/myp.crt"],
["scp", self.key_path, f"{user}@{host}:{dest}/myp.key"]
]
for cmd in scp_commands:
subprocess.run(cmd, check=True)
ssl_logger.info(f"Datei kopiert: {cmd[1]} -> {cmd[2]}")
# Berechtigungen setzen
chmod_command = ["ssh", f"{user}@{host}", f"chmod 600 {dest}/myp.key"]
subprocess.run(chmod_command, check=True)
# Zertifikat im System registrieren
install_command = ["ssh", f"{user}@{host}",
f"sudo cp {dest}/myp.crt /usr/local/share/ca-certificates/ && sudo update-ca-certificates"]
subprocess.run(install_command, check=True)
ssl_logger.info(f"Zertifikate erfolgreich auf Raspberry Pi installiert: {host}:{dest}")
return True
except subprocess.CalledProcessError as e:
ssl_logger.error(f"Fehler beim Kopieren auf Raspberry Pi: {e}")
return False
except Exception as e:
ssl_logger.error(f"Unerwarteter Fehler beim Raspberry Pi-Transfer: {e}")
return False
def get_certificate_info(self) -> Optional[Dict[str, Any]]:
"""
Gibt Informationen über das aktuelle Zertifikat zurück
Returns:
Dict mit Zertifikatsinformationen oder None bei Fehler
"""
try:
if not os.path.exists(self.cert_path):
return None
with open(self.cert_path, "rb") as f:
cert = x509.load_pem_x509_certificate(f.read())
return {
"subject": cert.subject.rfc4514_string(),
"issuer": cert.issuer.rfc4514_string(),
"serial_number": str(cert.serial_number),
"not_valid_before": cert.not_valid_before.strftime('%d.%m.%Y %H:%M:%S'),
"not_valid_after": cert.not_valid_after.strftime('%d.%m.%Y %H:%M:%S'),
"is_expired": cert.not_valid_after < datetime.datetime.now(),
"days_until_expiry": (cert.not_valid_after - datetime.datetime.now()).days,
"fingerprint": cert.fingerprint(hashes.SHA256()).hex(),
"key_size": cert.public_key().key_size if hasattr(cert.public_key(), 'key_size') else None
}
except Exception as e:
ssl_logger.error(f"Fehler beim Lesen der Zertifikatsinformationen: {e}")
return None
def is_certificate_valid(self) -> bool:
"""
Prüft, ob das aktuelle Zertifikat gültig ist
Returns:
bool: True wenn gültig, False wenn ungültig oder nicht vorhanden
"""
cert_info = self.get_certificate_info()
if not cert_info:
return False
return not cert_info["is_expired"] and cert_info["days_until_expiry"] > 30
def regenerate_if_needed(self) -> bool:
"""
Regeneriert das Zertifikat, falls es ungültig oder bald abgelaufen ist
Returns:
bool: True wenn regeneriert oder bereits gültig, False bei Fehler
"""
if self.is_certificate_valid():
ssl_logger.info("Zertifikat ist noch gültig, keine Regenerierung notwendig")
return True
ssl_logger.info("Zertifikat ist ungültig oder läuft bald ab, regeneriere...")
return self.generate_mercedes_certificate()
# Globale Instanz für einfachen Zugriff
ssl_manager = SSLCertificateManager()
def generate_ssl_certificate() -> bool:
"""Wrapper-Funktion für Rückwärtskompatibilität"""
return ssl_manager.generate_mercedes_certificate()
def get_ssl_certificate_info() -> Optional[Dict[str, Any]]:
"""Wrapper-Funktion für Zertifikatsinformationen"""
return ssl_manager.get_certificate_info()
def ensure_valid_ssl_certificate() -> bool:
"""Stellt sicher, dass ein gültiges SSL-Zertifikat vorhanden ist"""
return ssl_manager.regenerate_if_needed()