387 lines
16 KiB
Python
387 lines
16 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
SSL-Zertifikatsverwaltung für das Mercedes-Benz MYP-System
|
|
Konsolidiert die Funktionalität der SSL-Zertifikatsgenerierung
|
|
"""
|
|
|
|
import os
|
|
import datetime
|
|
import shutil
|
|
import platform
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple, Any
|
|
|
|
from cryptography import x509
|
|
from cryptography.x509.oid import NameOID
|
|
from cryptography.hazmat.primitives import hashes
|
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
|
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption
|
|
import ipaddress
|
|
|
|
from config.settings import SSL_CERT_PATH, SSL_KEY_PATH, SSL_HOSTNAME
|
|
from utils.logging_config import get_logger
|
|
|
|
ssl_logger = get_logger("ssl")
|
|
|
|
class SSLCertificateManager:
|
|
"""
|
|
Verwaltet SSL-Zertifikate für das MYP-System
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.cert_path = SSL_CERT_PATH
|
|
self.key_path = SSL_KEY_PATH
|
|
self.hostname = SSL_HOSTNAME
|
|
|
|
# Verzeichnisse definieren
|
|
self.certs_dir = os.path.dirname(self.cert_path)
|
|
self.frontend_ssl_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(self.certs_dir))), "frontend", "ssl")
|
|
|
|
# Mercedes-Benz spezifische Konfiguration
|
|
self.mercedes_config = {
|
|
"organization": "Mercedes-Benz AG",
|
|
"organizational_unit": "Werk 040 Berlin",
|
|
"locality": "Berlin",
|
|
"state": "Berlin",
|
|
"country": "DE",
|
|
"email": "admin@mercedes-benz.com"
|
|
}
|
|
|
|
# Erweiterte Hostnamen und IP-Adressen
|
|
self.hostnames = [
|
|
"localhost",
|
|
"raspberrypi",
|
|
"m040tbaraspi001",
|
|
"m040tbaraspi001.de040.corpintra.net",
|
|
"mbag.corpintra.net",
|
|
"mbag.mb.corpintra.net"
|
|
]
|
|
|
|
self.ip_addresses = [
|
|
"127.0.0.1",
|
|
"192.168.0.101",
|
|
"192.168.0.102",
|
|
"192.168.0.103",
|
|
"192.168.0.104",
|
|
"192.168.0.105",
|
|
"192.168.0.106"
|
|
]
|
|
|
|
def ensure_directories(self) -> None:
|
|
"""Erstellt notwendige Verzeichnisse"""
|
|
os.makedirs(self.certs_dir, exist_ok=True)
|
|
os.makedirs(self.frontend_ssl_dir, exist_ok=True)
|
|
ssl_logger.info(f"SSL-Verzeichnisse erstellt: {self.certs_dir}, {self.frontend_ssl_dir}")
|
|
|
|
def cleanup_old_certificates(self) -> None:
|
|
"""Entfernt alte Zertifikate und veraltete Verzeichnisse"""
|
|
# Alte SSL-Verzeichnisse löschen
|
|
old_ssl_dirs = [
|
|
os.path.join(os.path.dirname(os.path.dirname(self.certs_dir)), "app", "instance", "ssl"),
|
|
os.path.join(os.path.dirname(os.path.dirname(self.certs_dir)), "app", "certs")
|
|
]
|
|
|
|
for old_dir in old_ssl_dirs:
|
|
if os.path.exists(old_dir):
|
|
ssl_logger.info(f"Lösche alten SSL-Ordner: {old_dir}")
|
|
try:
|
|
shutil.rmtree(old_dir)
|
|
except Exception as e:
|
|
ssl_logger.warning(f"Konnte alten SSL-Ordner nicht löschen: {e}")
|
|
|
|
# Alte Zertifikate im aktuellen Verzeichnis entfernen
|
|
for path in [self.cert_path, self.key_path]:
|
|
if os.path.exists(path):
|
|
os.remove(path)
|
|
ssl_logger.info(f"Alte Zertifikatsdatei entfernt: {path}")
|
|
|
|
def generate_mercedes_certificate(self, key_size: int = 4096, validity_days: int = 365) -> bool:
|
|
"""
|
|
Generiert ein vollständiges Mercedes-Benz SSL-Zertifikat
|
|
|
|
Args:
|
|
key_size: Schlüsselgröße in Bits (Standard: 4096)
|
|
validity_days: Gültigkeitsdauer in Tagen (Standard: 365)
|
|
|
|
Returns:
|
|
bool: True bei Erfolg, False bei Fehler
|
|
"""
|
|
ssl_logger.info("Generiere Mercedes-Benz SSL-Zertifikat...")
|
|
|
|
try:
|
|
# Verzeichnisse vorbereiten
|
|
self.ensure_directories()
|
|
self.cleanup_old_certificates()
|
|
|
|
# Privaten Schlüssel generieren
|
|
private_key = rsa.generate_private_key(
|
|
public_exponent=65537,
|
|
key_size=key_size,
|
|
)
|
|
ssl_logger.info(f"Privater Schlüssel mit {key_size} Bit generiert")
|
|
|
|
# Zeitstempel
|
|
now = datetime.datetime.now()
|
|
valid_until = now + datetime.timedelta(days=validity_days)
|
|
|
|
# Zertifikatsattribute für Mercedes-Benz
|
|
subject = issuer = x509.Name([
|
|
x509.NameAttribute(NameOID.COMMON_NAME, self.hostname),
|
|
x509.NameAttribute(NameOID.ORGANIZATION_NAME, self.mercedes_config["organization"]),
|
|
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, self.mercedes_config["organizational_unit"]),
|
|
x509.NameAttribute(NameOID.LOCALITY_NAME, self.mercedes_config["locality"]),
|
|
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, self.mercedes_config["state"]),
|
|
x509.NameAttribute(NameOID.COUNTRY_NAME, self.mercedes_config["country"]),
|
|
x509.NameAttribute(NameOID.EMAIL_ADDRESS, self.mercedes_config["email"]),
|
|
])
|
|
|
|
# Subject Alternative Names (SAN) erstellen
|
|
san_list = []
|
|
for hostname in self.hostnames:
|
|
san_list.append(x509.DNSName(hostname))
|
|
|
|
for ip in self.ip_addresses:
|
|
try:
|
|
san_list.append(x509.IPAddress(ipaddress.IPv4Address(ip)))
|
|
except ipaddress.AddressValueError:
|
|
ssl_logger.warning(f"Ungültige IP-Adresse übersprungen: {ip}")
|
|
|
|
# Zertifikat erstellen
|
|
cert = x509.CertificateBuilder().subject_name(
|
|
subject
|
|
).issuer_name(
|
|
issuer
|
|
).public_key(
|
|
private_key.public_key()
|
|
).serial_number(
|
|
x509.random_serial_number()
|
|
).not_valid_before(
|
|
now
|
|
).not_valid_after(
|
|
valid_until
|
|
).add_extension(
|
|
x509.SubjectAlternativeName(san_list),
|
|
critical=False,
|
|
).add_extension(
|
|
x509.BasicConstraints(ca=True, path_length=None), critical=True
|
|
).add_extension(
|
|
x509.KeyUsage(
|
|
digital_signature=True,
|
|
content_commitment=False,
|
|
key_encipherment=True,
|
|
data_encipherment=False,
|
|
key_agreement=False,
|
|
key_cert_sign=True,
|
|
crl_sign=True,
|
|
encipher_only=False,
|
|
decipher_only=False
|
|
), critical=True
|
|
).add_extension(
|
|
x509.ExtendedKeyUsage([
|
|
x509.oid.ExtendedKeyUsageOID.SERVER_AUTH,
|
|
x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH,
|
|
x509.oid.ExtendedKeyUsageOID.CODE_SIGNING
|
|
]), critical=False
|
|
).sign(private_key, hashes.SHA256())
|
|
|
|
# Zertifikat und Schlüssel speichern
|
|
with open(self.key_path, "wb") as f:
|
|
f.write(private_key.private_bytes(
|
|
encoding=Encoding.PEM,
|
|
format=PrivateFormat.TraditionalOpenSSL,
|
|
encryption_algorithm=NoEncryption()
|
|
))
|
|
|
|
with open(self.cert_path, "wb") as f:
|
|
f.write(cert.public_bytes(Encoding.PEM))
|
|
|
|
# Berechtigungen setzen
|
|
os.chmod(self.key_path, 0o600)
|
|
os.chmod(self.cert_path, 0o644)
|
|
|
|
ssl_logger.info(f"Mercedes-Benz SSL-Zertifikat erfolgreich erstellt:")
|
|
ssl_logger.info(f"- Zertifikat: {os.path.abspath(self.cert_path)}")
|
|
ssl_logger.info(f"- Schlüssel: {os.path.abspath(self.key_path)}")
|
|
ssl_logger.info(f"- Gültig bis: {valid_until.strftime('%d.%m.%Y')}")
|
|
ssl_logger.info(f"- Hostnamen: {', '.join(self.hostnames)}")
|
|
ssl_logger.info(f"- IP-Adressen: {', '.join(self.ip_addresses)}")
|
|
|
|
# Zertifikate ins Frontend kopieren
|
|
self._copy_to_frontend()
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
ssl_logger.error(f"Fehler beim Erstellen des Mercedes-Benz SSL-Zertifikats: {e}")
|
|
return False
|
|
|
|
def _copy_to_frontend(self) -> bool:
|
|
"""Kopiert Zertifikate ins Frontend-Verzeichnis"""
|
|
try:
|
|
shutil.copy2(self.cert_path, os.path.join(self.frontend_ssl_dir, "myp.crt"))
|
|
shutil.copy2(self.key_path, os.path.join(self.frontend_ssl_dir, "myp.key"))
|
|
ssl_logger.info(f"Zertifikate ins Frontend kopiert: {os.path.abspath(self.frontend_ssl_dir)}")
|
|
return True
|
|
except Exception as e:
|
|
ssl_logger.error(f"Fehler beim Kopieren ins Frontend: {e}")
|
|
return False
|
|
|
|
def install_system_certificate(self) -> bool:
|
|
"""
|
|
Installiert das Zertifikat im System-Zertifikatsspeicher
|
|
Nur für Windows-Systeme
|
|
"""
|
|
if platform.system() != "Windows":
|
|
ssl_logger.warning("System-Zertifikatsinstallation nur unter Windows verfügbar")
|
|
return False
|
|
|
|
try:
|
|
if not os.path.exists(self.cert_path):
|
|
ssl_logger.error(f"Zertifikat nicht gefunden: {self.cert_path}")
|
|
return False
|
|
|
|
# Befehle zum Installieren des Zertifikats im Windows-Zertifikatsspeicher
|
|
commands = [
|
|
["certutil", "-addstore", "-f", "ROOT", self.cert_path],
|
|
["certutil", "-addstore", "-f", "CA", self.cert_path],
|
|
["certutil", "-addstore", "-f", "MY", self.cert_path]
|
|
]
|
|
|
|
for cmd in commands:
|
|
result = subprocess.run(cmd, check=True, capture_output=True, text=True)
|
|
ssl_logger.debug(f"Certutil-Befehl ausgeführt: {' '.join(cmd)}")
|
|
|
|
ssl_logger.info("Zertifikat erfolgreich im System-Zertifikatsspeicher installiert")
|
|
return True
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
ssl_logger.error(f"Fehler bei der Installation des Zertifikats im System: {e}")
|
|
return False
|
|
except Exception as e:
|
|
ssl_logger.error(f"Unerwarteter Fehler bei der Zertifikatsinstallation: {e}")
|
|
return False
|
|
|
|
def copy_to_raspberry(self, host: str = "raspberrypi", user: str = "pi", dest: str = "/home/pi/myp/ssl") -> bool:
|
|
"""
|
|
Kopiert das Zertifikat auf den Raspberry Pi
|
|
|
|
Args:
|
|
host: Hostname des Raspberry Pi
|
|
user: Benutzername für SSH
|
|
dest: Zielverzeichnis auf dem Raspberry Pi
|
|
|
|
Returns:
|
|
bool: True bei Erfolg, False bei Fehler
|
|
"""
|
|
try:
|
|
if not os.path.exists(self.cert_path) or not os.path.exists(self.key_path):
|
|
ssl_logger.error("Zertifikatsdateien nicht gefunden")
|
|
return False
|
|
|
|
# SSH-Befehl zum Erstellen des Verzeichnisses
|
|
ssh_command = ["ssh", f"{user}@{host}", f"mkdir -p {dest}"]
|
|
subprocess.run(ssh_command, check=True)
|
|
ssl_logger.info(f"Verzeichnis auf Raspberry Pi erstellt: {dest}")
|
|
|
|
# SCP-Befehle zum Kopieren der Dateien
|
|
scp_commands = [
|
|
["scp", self.cert_path, f"{user}@{host}:{dest}/myp.crt"],
|
|
["scp", self.key_path, f"{user}@{host}:{dest}/myp.key"]
|
|
]
|
|
|
|
for cmd in scp_commands:
|
|
subprocess.run(cmd, check=True)
|
|
ssl_logger.info(f"Datei kopiert: {cmd[1]} -> {cmd[2]}")
|
|
|
|
# Berechtigungen setzen
|
|
chmod_command = ["ssh", f"{user}@{host}", f"chmod 600 {dest}/myp.key"]
|
|
subprocess.run(chmod_command, check=True)
|
|
|
|
# Zertifikat im System registrieren
|
|
install_command = ["ssh", f"{user}@{host}",
|
|
f"sudo cp {dest}/myp.crt /usr/local/share/ca-certificates/ && sudo update-ca-certificates"]
|
|
subprocess.run(install_command, check=True)
|
|
|
|
ssl_logger.info(f"Zertifikate erfolgreich auf Raspberry Pi installiert: {host}:{dest}")
|
|
return True
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
ssl_logger.error(f"Fehler beim Kopieren auf Raspberry Pi: {e}")
|
|
return False
|
|
except Exception as e:
|
|
ssl_logger.error(f"Unerwarteter Fehler beim Raspberry Pi-Transfer: {e}")
|
|
return False
|
|
|
|
def get_certificate_info(self) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Gibt Informationen über das aktuelle Zertifikat zurück
|
|
|
|
Returns:
|
|
Dict mit Zertifikatsinformationen oder None bei Fehler
|
|
"""
|
|
try:
|
|
if not os.path.exists(self.cert_path):
|
|
return None
|
|
|
|
with open(self.cert_path, "rb") as f:
|
|
cert = x509.load_pem_x509_certificate(f.read())
|
|
|
|
return {
|
|
"subject": cert.subject.rfc4514_string(),
|
|
"issuer": cert.issuer.rfc4514_string(),
|
|
"serial_number": str(cert.serial_number),
|
|
"not_valid_before": cert.not_valid_before.strftime('%d.%m.%Y %H:%M:%S'),
|
|
"not_valid_after": cert.not_valid_after.strftime('%d.%m.%Y %H:%M:%S'),
|
|
"is_expired": cert.not_valid_after < datetime.datetime.now(),
|
|
"days_until_expiry": (cert.not_valid_after - datetime.datetime.now()).days,
|
|
"fingerprint": cert.fingerprint(hashes.SHA256()).hex(),
|
|
"key_size": cert.public_key().key_size if hasattr(cert.public_key(), 'key_size') else None
|
|
}
|
|
|
|
except Exception as e:
|
|
ssl_logger.error(f"Fehler beim Lesen der Zertifikatsinformationen: {e}")
|
|
return None
|
|
|
|
def is_certificate_valid(self) -> bool:
|
|
"""
|
|
Prüft, ob das aktuelle Zertifikat gültig ist
|
|
|
|
Returns:
|
|
bool: True wenn gültig, False wenn ungültig oder nicht vorhanden
|
|
"""
|
|
cert_info = self.get_certificate_info()
|
|
if not cert_info:
|
|
return False
|
|
|
|
return not cert_info["is_expired"] and cert_info["days_until_expiry"] > 30
|
|
|
|
def regenerate_if_needed(self) -> bool:
|
|
"""
|
|
Regeneriert das Zertifikat, falls es ungültig oder bald abgelaufen ist
|
|
|
|
Returns:
|
|
bool: True wenn regeneriert oder bereits gültig, False bei Fehler
|
|
"""
|
|
if self.is_certificate_valid():
|
|
ssl_logger.info("Zertifikat ist noch gültig, keine Regenerierung notwendig")
|
|
return True
|
|
|
|
ssl_logger.info("Zertifikat ist ungültig oder läuft bald ab, regeneriere...")
|
|
return self.generate_mercedes_certificate()
|
|
|
|
# Globale Instanz für einfachen Zugriff
|
|
ssl_manager = SSLCertificateManager()
|
|
|
|
def generate_ssl_certificate() -> bool:
|
|
"""Wrapper-Funktion für Rückwärtskompatibilität"""
|
|
return ssl_manager.generate_mercedes_certificate()
|
|
|
|
def get_ssl_certificate_info() -> Optional[Dict[str, Any]]:
|
|
"""Wrapper-Funktion für Zertifikatsinformationen"""
|
|
return ssl_manager.get_certificate_info()
|
|
|
|
def ensure_valid_ssl_certificate() -> bool:
|
|
"""Stellt sicher, dass ein gültiges SSL-Zertifikat vorhanden ist"""
|
|
return ssl_manager.regenerate_if_needed() |