feat: Implementiere Scheduler zur Überprüfung von Druckaufträgen und verbessere SSL-Zertifikat-Generierung unter Windows

This commit is contained in:
Till Tomczak 2025-05-26 09:12:14 +02:00
parent 327b0a6ea4
commit e3ebd219dd
4 changed files with 149 additions and 13 deletions

View File

@ -1322,6 +1322,52 @@ def init_app():
else:
app_logger.warning("SSL ist deaktiviert. Die Verbindung ist unverschlüsselt!")
# Scheduler-Funktion zur Überprüfung der Druckaufträge
def check_jobs():
"""
Überprüft alle aktiven Druckaufträge und führt entsprechende Aktionen aus.
Diese Funktion wird vom Scheduler regelmäßig aufgerufen.
"""
app_logger.info("Überprüfe Druckaufträge...")
try:
db_session = get_db_session()
# Aktive Jobs abrufen
active_jobs = db_session.query(Job).filter(
Job.status.in_(["scheduled", "running"])
).all()
now = datetime.now()
for job in active_jobs:
# Prüfen, ob der Job gestartet werden soll
if job.status == "scheduled" and job.start_at <= now:
app_logger.info(f"Starte Job {job.id} für Drucker {job.printer_id}")
job.status = "running"
# Steckdose einschalten (implementieren Sie diese Funktion)
from utils.job_scheduler import toggle_plug
toggle_plug(job.printer_id, True)
# Prüfen, ob der Job beendet werden soll
elif job.status == "running" and job.end_at <= now:
app_logger.info(f"Beende Job {job.id} für Drucker {job.printer_id}")
job.status = "finished"
job.actual_end_time = now
# Steckdose ausschalten
from utils.job_scheduler import toggle_plug
toggle_plug(job.printer_id, False)
db_session.commit()
db_session.close()
except Exception as e:
app_logger.error(f"Fehler bei der Überprüfung von Druckaufträgen: {str(e)}")
if 'db_session' in locals():
db_session.close()
# App starten
if __name__ == "__main__":
import argparse

View File

@ -96,21 +96,110 @@ def get_ssl_context():
if FLASK_DEBUG:
print("SSL-Zertifikate nicht gefunden. Erstelle selbstsignierte Zertifikate...")
# Pfad zum create_ssl_cert.sh-Skript ermitteln
script_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
"install", "create_ssl_cert.sh")
# Ausführungsrechte setzen
if os.path.exists(script_path):
os.system(f"chmod +x {script_path}")
# Zertifikate erstellen mit spezifischem Hostnamen
os.system(f"{script_path} -c {SSL_CERT_PATH} -k {SSL_KEY_PATH} -h {SSL_HOSTNAME}")
# Auf Windows prüfen
import platform
if platform.system() == "Windows":
# Unter Windows verwenden wir OpenSSL direkt über subprocess
import subprocess
try:
# Erstelle ein selbstsigniertes Zertifikat mit OpenSSL
ssl_cmd = [
"openssl", "req", "-new", "-x509", "-newkey", "rsa:2048",
"-nodes", "-keyout", SSL_KEY_PATH, "-out", SSL_CERT_PATH,
"-days", "365", "-subj", f"/CN={SSL_HOSTNAME}"
]
result = subprocess.run(ssl_cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"Fehler beim Erstellen der SSL-Zertifikate: {result.stderr}")
# Fallback: Generiere einfache Schlüssel mit Python
create_simple_ssl_cert()
except Exception as e:
print(f"Fehler beim Ausführen von OpenSSL: {e}")
# Fallback: Generiere einfache Schlüssel mit Python
create_simple_ssl_cert()
else:
print(f"WARNUNG: SSL-Zertifikat-Generator nicht gefunden: {script_path}")
return None
# Unter Linux das vorhandene Skript verwenden
script_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
"install", "create_ssl_cert.sh")
# Ausführungsrechte setzen
if os.path.exists(script_path):
os.system(f"chmod +x {script_path}")
# Zertifikate erstellen mit spezifischem Hostnamen
os.system(f"{script_path} -c {SSL_CERT_PATH} -k {SSL_KEY_PATH} -h {SSL_HOSTNAME}")
else:
print(f"WARNUNG: SSL-Zertifikat-Generator nicht gefunden: {script_path}")
return None
else:
print("WARNUNG: SSL-Zertifikate nicht gefunden und Nicht-Debug-Modus. SSL wird deaktiviert.")
return None
return (SSL_CERT_PATH, SSL_KEY_PATH)
return (SSL_CERT_PATH, SSL_KEY_PATH)
def create_simple_ssl_cert():
"""
Erstellt ein einfaches selbstsigniertes Zertifikat mit Python.
Dies ist ein Fallback, falls OpenSSL nicht verfügbar ist.
"""
try:
import datetime
import socket
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption
# Generiere privaten Schlüssel
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
# Schreibe privaten Schlüssel
with open(SSL_KEY_PATH, "wb") as f:
f.write(private_key.private_bytes(
encoding=Encoding.PEM,
format=PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=NoEncryption()
))
# Erstelle Zertifikat
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, SSL_HOSTNAME),
])
cert = x509.CertificateBuilder().subject_name(
subject
).issuer_name(
issuer
).public_key(
private_key.public_key()
).serial_number(
x509.random_serial_number()
).not_valid_before(
datetime.datetime.utcnow()
).not_valid_after(
datetime.datetime.utcnow() + datetime.timedelta(days=365)
).add_extension(
x509.SubjectAlternativeName([x509.DNSName(SSL_HOSTNAME)]),
critical=False,
).sign(private_key, hashes.SHA256())
# Schreibe Zertifikat
with open(SSL_CERT_PATH, "wb") as f:
f.write(cert.public_bytes(Encoding.PEM))
print(f"Einfaches selbstsigniertes Zertifikat erstellt: {SSL_CERT_PATH}")
except ImportError:
print("Konnte keine SSL-Zertifikate erstellen: cryptography-Paket nicht installiert")
return None
except Exception as e:
print(f"Fehler beim Erstellen der SSL-Zertifikate: {e}")
return None

Binary file not shown.

View File

@ -19,6 +19,7 @@ bcrypt==4.1.2
# Sicherheit und Rate Limiting
redis==5.0.1
cryptography==42.0.8
# Entwicklung und Testing (optional)
pytest==7.4.3