2025-06-04 10:03:22 +02:00

336 lines
14 KiB
Python

"""
Authentifizierungs-Blueprint für das 3D-Druck-Management-System
Dieses Modul enthält alle Routen und Funktionen für die Benutzerauthentifizierung,
einschließlich Login, Logout, OAuth-Callbacks und Passwort-Reset.
"""
import logging
from datetime import datetime
from flask import Blueprint, render_template, request, jsonify, redirect, url_for, flash, session
from flask_login import login_user, logout_user, login_required, current_user
from werkzeug.security import check_password_hash
from models import User, get_db_session
from utils.logging_config import get_logger
# Blueprint erstellen
auth_blueprint = Blueprint('auth', __name__, url_prefix='/auth')
# Logger initialisieren
auth_logger = get_logger("auth")
@auth_blueprint.route("/login", methods=["GET", "POST"])
def login():
"""Benutzeranmeldung mit E-Mail/Benutzername und Passwort"""
if current_user.is_authenticated:
return redirect(url_for("index"))
error = None
if request.method == "POST":
# Debug-Logging für Request-Details
auth_logger.debug(f"Login-Request: Content-Type={request.content_type}, Headers={dict(request.headers)}")
# Erweiterte Content-Type-Erkennung für AJAX-Anfragen
content_type = request.content_type or ""
is_json_request = (
request.is_json or
"application/json" in content_type or
request.headers.get('X-Requested-With') == 'XMLHttpRequest' or
request.headers.get('Accept', '').startswith('application/json')
)
# Robuste Datenextraktion
username = None
password = None
remember_me = False
try:
if is_json_request:
# JSON-Request verarbeiten
try:
data = request.get_json(force=True) or {}
username = data.get("username") or data.get("email")
password = data.get("password")
remember_me = data.get("remember_me", False)
except Exception as json_error:
auth_logger.warning(f"JSON-Parsing fehlgeschlagen: {str(json_error)}")
# Fallback zu Form-Daten
username = request.form.get("email")
password = request.form.get("password")
remember_me = request.form.get("remember_me") == "on"
else:
# Form-Request verarbeiten
username = request.form.get("email")
password = request.form.get("password")
remember_me = request.form.get("remember_me") == "on"
# Zusätzlicher Fallback für verschiedene Feldnamen
if not username:
username = request.form.get("username") or request.values.get("email") or request.values.get("username")
if not password:
password = request.form.get("password") or request.values.get("password")
except Exception as extract_error:
auth_logger.error(f"Fehler beim Extrahieren der Login-Daten: {str(extract_error)}")
error = "Fehler beim Verarbeiten der Anmeldedaten."
if is_json_request:
return jsonify({"error": error, "success": False}), 400
if not username or not password:
error = "E-Mail-Adresse und Passwort müssen angegeben werden."
auth_logger.warning(f"Unvollständige Login-Daten: username={bool(username)}, password={bool(password)}")
if is_json_request:
return jsonify({"error": error, "success": False}), 400
else:
db_session = None
try:
db_session = get_db_session()
# Suche nach Benutzer mit übereinstimmendem Benutzernamen oder E-Mail
user = db_session.query(User).filter(
(User.username == username) | (User.email == username)
).first()
if user and user.check_password(password):
# Update last login timestamp
user.update_last_login()
db_session.commit()
login_user(user, remember=remember_me)
auth_logger.info(f"Benutzer {username} hat sich erfolgreich angemeldet")
next_page = request.args.get("next")
if is_json_request:
return jsonify({
"success": True,
"message": "Anmeldung erfolgreich",
"redirect_url": next_page or url_for("index")
})
else:
if next_page:
return redirect(next_page)
return redirect(url_for("index"))
else:
error = "Ungültige E-Mail-Adresse oder Passwort."
auth_logger.warning(f"Fehlgeschlagener Login-Versuch für Benutzer {username}")
if is_json_request:
return jsonify({"error": error, "success": False}), 401
except Exception as e:
# Fehlerbehandlung für Datenbankprobleme
error = "Anmeldefehler. Bitte versuchen Sie es später erneut."
auth_logger.error(f"Fehler bei der Anmeldung: {str(e)}")
if is_json_request:
return jsonify({"error": error, "success": False}), 500
finally:
# Sicherstellen, dass die Datenbankverbindung geschlossen wird
if db_session:
try:
db_session.close()
except Exception as close_error:
auth_logger.error(f"Fehler beim Schließen der DB-Session: {str(close_error)}")
return render_template("login.html", error=error)
@auth_blueprint.route("/logout", methods=["GET", "POST"])
@login_required
def logout():
"""Meldet den Benutzer ab"""
auth_logger.info(f"Benutzer {current_user.email} hat sich abgemeldet")
logout_user()
flash("Sie wurden erfolgreich abgemeldet.", "info")
return redirect(url_for("auth.login"))
@auth_blueprint.route("/reset-password-request", methods=["GET", "POST"])
def reset_password_request():
"""Passwort-Reset anfordern (Placeholder)"""
# TODO: Implement password reset functionality
flash("Passwort-Reset-Funktionalität ist noch nicht implementiert.", "info")
return redirect(url_for("auth.login"))
@auth_blueprint.route("/api/login", methods=["POST"])
def api_login():
"""API-Login-Endpunkt für Frontend"""
try:
data = request.get_json()
if not data:
return jsonify({"error": "Keine Daten erhalten"}), 400
username = data.get("username")
password = data.get("password")
remember_me = data.get("remember_me", False)
if not username or not password:
return jsonify({"error": "Benutzername und Passwort müssen angegeben werden"}), 400
db_session = get_db_session()
user = db_session.query(User).filter(
(User.username == username) | (User.email == username)
).first()
if user and user.check_password(password):
# Update last login timestamp
user.update_last_login()
db_session.commit()
login_user(user, remember=remember_me)
auth_logger.info(f"API-Login erfolgreich für Benutzer {username}")
user_data = {
"id": user.id,
"username": user.username,
"name": user.name,
"email": user.email,
"is_admin": user.is_admin
}
db_session.close()
return jsonify({
"success": True,
"user": user_data,
"redirect_url": url_for("index")
})
else:
auth_logger.warning(f"Fehlgeschlagener API-Login für Benutzer {username}")
db_session.close()
return jsonify({"error": "Ungültiger Benutzername oder Passwort"}), 401
except Exception as e:
auth_logger.error(f"Fehler beim API-Login: {str(e)}")
return jsonify({"error": "Anmeldefehler. Bitte versuchen Sie es später erneut"}), 500
@auth_blueprint.route("/api/callback", methods=["GET", "POST"])
def api_callback():
"""OAuth-Callback-Endpunkt für externe Authentifizierung"""
try:
# OAuth-Provider bestimmen
provider = request.args.get('provider', 'github')
if request.method == "GET":
# Authorization Code aus URL-Parameter extrahieren
code = request.args.get('code')
state = request.args.get('state')
error = request.args.get('error')
if error:
auth_logger.warning(f"OAuth-Fehler von {provider}: {error}")
return jsonify({
"error": f"OAuth-Authentifizierung fehlgeschlagen: {error}",
"redirect_url": url_for("auth.login")
}), 400
if not code:
auth_logger.warning(f"Kein Authorization Code von {provider} erhalten")
return jsonify({
"error": "Kein Authorization Code erhalten",
"redirect_url": url_for("auth.login")
}), 400
# State-Parameter validieren (CSRF-Schutz)
session_state = session.get('oauth_state')
if not state or state != session_state:
auth_logger.warning(f"Ungültiger State-Parameter von {provider}")
return jsonify({
"error": "Ungültiger State-Parameter",
"redirect_url": url_for("auth.login")
}), 400
# OAuth-Token austauschen
if provider == 'github':
user_data = handle_github_callback(code)
else:
auth_logger.error(f"Unbekannter OAuth-Provider: {provider}")
return jsonify({
"error": "Unbekannter OAuth-Provider",
"redirect_url": url_for("auth.login")
}), 400
if not user_data:
return jsonify({
"error": "Fehler beim Abrufen der Benutzerdaten",
"redirect_url": url_for("auth.login")
}), 400
# Benutzer in Datenbank suchen oder erstellen
db_session = get_db_session()
try:
user = db_session.query(User).filter(
User.email == user_data['email']
).first()
if not user:
# Neuen Benutzer erstellen
user = User(
username=user_data['username'],
email=user_data['email'],
name=user_data['name'],
role="user",
oauth_provider=provider,
oauth_id=str(user_data['id'])
)
# Zufälliges Passwort setzen (wird nicht verwendet)
import secrets
user.set_password(secrets.token_urlsafe(32))
db_session.add(user)
db_session.commit()
auth_logger.info(f"Neuer OAuth-Benutzer erstellt: {user.username} via {provider}")
else:
# Bestehenden Benutzer aktualisieren
user.oauth_provider = provider
user.oauth_id = str(user_data['id'])
user.name = user_data['name']
user.updated_at = datetime.now()
db_session.commit()
auth_logger.info(f"OAuth-Benutzer aktualisiert: {user.username} via {provider}")
# Update last login timestamp
user.update_last_login()
db_session.commit()
login_user(user, remember=True)
# Session-State löschen
session.pop('oauth_state', None)
response_data = {
"success": True,
"user": {
"id": user.id,
"username": user.username,
"name": user.name,
"email": user.email,
"is_admin": user.is_admin
},
"redirect_url": url_for("index")
}
db_session.close()
return jsonify(response_data)
except Exception as e:
db_session.rollback()
db_session.close()
auth_logger.error(f"Datenbankfehler bei OAuth-Callback: {str(e)}")
return jsonify({
"error": "Datenbankfehler bei der Benutzeranmeldung",
"redirect_url": url_for("auth.login")
}), 500
except Exception as e:
auth_logger.error(f"Fehler im OAuth-Callback: {str(e)}")
return jsonify({
"error": "OAuth-Callback-Fehler",
"redirect_url": url_for("auth.login")
}), 500
def handle_github_callback(code):
"""Verarbeite GitHub OAuth Callback"""
# TODO: Implementiere GitHub OAuth Handling
auth_logger.warning("GitHub OAuth Callback noch nicht implementiert")
return None
def get_github_user_data(access_token):
"""Lade Benutzerdaten von GitHub API"""
# TODO: Implementiere GitHub API Abfrage
auth_logger.warning("GitHub User Data Abfrage noch nicht implementiert")
return None