336 lines
14 KiB
Python
336 lines
14 KiB
Python
"""
|
|
Authentifizierungs-Blueprint für das 3D-Druck-Management-System
|
|
|
|
Dieses Modul enthält alle Routen und Funktionen für die Benutzerauthentifizierung,
|
|
einschließlich Login, Logout, OAuth-Callbacks und Passwort-Reset.
|
|
"""
|
|
|
|
import logging
|
|
from datetime import datetime
|
|
from flask import Blueprint, render_template, request, jsonify, redirect, url_for, flash, session
|
|
from flask_login import login_user, logout_user, login_required, current_user
|
|
from werkzeug.security import check_password_hash
|
|
from models import User, get_db_session
|
|
from utils.logging_config import get_logger
|
|
|
|
# Blueprint erstellen
|
|
auth_blueprint = Blueprint('auth', __name__, url_prefix='/auth')
|
|
|
|
# Logger initialisieren
|
|
auth_logger = get_logger("auth")
|
|
|
|
@auth_blueprint.route("/login", methods=["GET", "POST"])
|
|
def login():
|
|
"""Benutzeranmeldung mit E-Mail/Benutzername und Passwort"""
|
|
if current_user.is_authenticated:
|
|
return redirect(url_for("index"))
|
|
|
|
error = None
|
|
if request.method == "POST":
|
|
# Debug-Logging für Request-Details
|
|
auth_logger.debug(f"Login-Request: Content-Type={request.content_type}, Headers={dict(request.headers)}")
|
|
|
|
# Erweiterte Content-Type-Erkennung für AJAX-Anfragen
|
|
content_type = request.content_type or ""
|
|
is_json_request = (
|
|
request.is_json or
|
|
"application/json" in content_type or
|
|
request.headers.get('X-Requested-With') == 'XMLHttpRequest' or
|
|
request.headers.get('Accept', '').startswith('application/json')
|
|
)
|
|
|
|
# Robuste Datenextraktion
|
|
username = None
|
|
password = None
|
|
remember_me = False
|
|
|
|
try:
|
|
if is_json_request:
|
|
# JSON-Request verarbeiten
|
|
try:
|
|
data = request.get_json(force=True) or {}
|
|
username = data.get("username") or data.get("email")
|
|
password = data.get("password")
|
|
remember_me = data.get("remember_me", False)
|
|
except Exception as json_error:
|
|
auth_logger.warning(f"JSON-Parsing fehlgeschlagen: {str(json_error)}")
|
|
# Fallback zu Form-Daten
|
|
username = request.form.get("email")
|
|
password = request.form.get("password")
|
|
remember_me = request.form.get("remember_me") == "on"
|
|
else:
|
|
# Form-Request verarbeiten
|
|
username = request.form.get("email")
|
|
password = request.form.get("password")
|
|
remember_me = request.form.get("remember_me") == "on"
|
|
|
|
# Zusätzlicher Fallback für verschiedene Feldnamen
|
|
if not username:
|
|
username = request.form.get("username") or request.values.get("email") or request.values.get("username")
|
|
if not password:
|
|
password = request.form.get("password") or request.values.get("password")
|
|
|
|
except Exception as extract_error:
|
|
auth_logger.error(f"Fehler beim Extrahieren der Login-Daten: {str(extract_error)}")
|
|
error = "Fehler beim Verarbeiten der Anmeldedaten."
|
|
if is_json_request:
|
|
return jsonify({"error": error, "success": False}), 400
|
|
|
|
if not username or not password:
|
|
error = "E-Mail-Adresse und Passwort müssen angegeben werden."
|
|
auth_logger.warning(f"Unvollständige Login-Daten: username={bool(username)}, password={bool(password)}")
|
|
if is_json_request:
|
|
return jsonify({"error": error, "success": False}), 400
|
|
else:
|
|
db_session = None
|
|
try:
|
|
db_session = get_db_session()
|
|
# Suche nach Benutzer mit übereinstimmendem Benutzernamen oder E-Mail
|
|
user = db_session.query(User).filter(
|
|
(User.username == username) | (User.email == username)
|
|
).first()
|
|
|
|
if user and user.check_password(password):
|
|
# Update last login timestamp
|
|
user.update_last_login()
|
|
db_session.commit()
|
|
|
|
login_user(user, remember=remember_me)
|
|
auth_logger.info(f"Benutzer {username} hat sich erfolgreich angemeldet")
|
|
|
|
next_page = request.args.get("next")
|
|
|
|
if is_json_request:
|
|
return jsonify({
|
|
"success": True,
|
|
"message": "Anmeldung erfolgreich",
|
|
"redirect_url": next_page or url_for("index")
|
|
})
|
|
else:
|
|
if next_page:
|
|
return redirect(next_page)
|
|
return redirect(url_for("index"))
|
|
else:
|
|
error = "Ungültige E-Mail-Adresse oder Passwort."
|
|
auth_logger.warning(f"Fehlgeschlagener Login-Versuch für Benutzer {username}")
|
|
|
|
if is_json_request:
|
|
return jsonify({"error": error, "success": False}), 401
|
|
except Exception as e:
|
|
# Fehlerbehandlung für Datenbankprobleme
|
|
error = "Anmeldefehler. Bitte versuchen Sie es später erneut."
|
|
auth_logger.error(f"Fehler bei der Anmeldung: {str(e)}")
|
|
if is_json_request:
|
|
return jsonify({"error": error, "success": False}), 500
|
|
finally:
|
|
# Sicherstellen, dass die Datenbankverbindung geschlossen wird
|
|
if db_session:
|
|
try:
|
|
db_session.close()
|
|
except Exception as close_error:
|
|
auth_logger.error(f"Fehler beim Schließen der DB-Session: {str(close_error)}")
|
|
|
|
return render_template("login.html", error=error)
|
|
|
|
@auth_blueprint.route("/logout", methods=["GET", "POST"])
|
|
@login_required
|
|
def logout():
|
|
"""Meldet den Benutzer ab"""
|
|
auth_logger.info(f"Benutzer {current_user.email} hat sich abgemeldet")
|
|
logout_user()
|
|
flash("Sie wurden erfolgreich abgemeldet.", "info")
|
|
return redirect(url_for("auth.login"))
|
|
|
|
@auth_blueprint.route("/reset-password-request", methods=["GET", "POST"])
|
|
def reset_password_request():
|
|
"""Passwort-Reset anfordern (Placeholder)"""
|
|
# TODO: Implement password reset functionality
|
|
flash("Passwort-Reset-Funktionalität ist noch nicht implementiert.", "info")
|
|
return redirect(url_for("auth.login"))
|
|
|
|
@auth_blueprint.route("/api/login", methods=["POST"])
|
|
def api_login():
|
|
"""API-Login-Endpunkt für Frontend"""
|
|
try:
|
|
data = request.get_json()
|
|
if not data:
|
|
return jsonify({"error": "Keine Daten erhalten"}), 400
|
|
|
|
username = data.get("username")
|
|
password = data.get("password")
|
|
remember_me = data.get("remember_me", False)
|
|
|
|
if not username or not password:
|
|
return jsonify({"error": "Benutzername und Passwort müssen angegeben werden"}), 400
|
|
|
|
db_session = get_db_session()
|
|
user = db_session.query(User).filter(
|
|
(User.username == username) | (User.email == username)
|
|
).first()
|
|
|
|
if user and user.check_password(password):
|
|
# Update last login timestamp
|
|
user.update_last_login()
|
|
db_session.commit()
|
|
|
|
login_user(user, remember=remember_me)
|
|
auth_logger.info(f"API-Login erfolgreich für Benutzer {username}")
|
|
|
|
user_data = {
|
|
"id": user.id,
|
|
"username": user.username,
|
|
"name": user.name,
|
|
"email": user.email,
|
|
"is_admin": user.is_admin
|
|
}
|
|
|
|
db_session.close()
|
|
return jsonify({
|
|
"success": True,
|
|
"user": user_data,
|
|
"redirect_url": url_for("index")
|
|
})
|
|
else:
|
|
auth_logger.warning(f"Fehlgeschlagener API-Login für Benutzer {username}")
|
|
db_session.close()
|
|
return jsonify({"error": "Ungültiger Benutzername oder Passwort"}), 401
|
|
|
|
except Exception as e:
|
|
auth_logger.error(f"Fehler beim API-Login: {str(e)}")
|
|
return jsonify({"error": "Anmeldefehler. Bitte versuchen Sie es später erneut"}), 500
|
|
|
|
@auth_blueprint.route("/api/callback", methods=["GET", "POST"])
|
|
def api_callback():
|
|
"""OAuth-Callback-Endpunkt für externe Authentifizierung"""
|
|
try:
|
|
# OAuth-Provider bestimmen
|
|
provider = request.args.get('provider', 'github')
|
|
|
|
if request.method == "GET":
|
|
# Authorization Code aus URL-Parameter extrahieren
|
|
code = request.args.get('code')
|
|
state = request.args.get('state')
|
|
error = request.args.get('error')
|
|
|
|
if error:
|
|
auth_logger.warning(f"OAuth-Fehler von {provider}: {error}")
|
|
return jsonify({
|
|
"error": f"OAuth-Authentifizierung fehlgeschlagen: {error}",
|
|
"redirect_url": url_for("auth.login")
|
|
}), 400
|
|
|
|
if not code:
|
|
auth_logger.warning(f"Kein Authorization Code von {provider} erhalten")
|
|
return jsonify({
|
|
"error": "Kein Authorization Code erhalten",
|
|
"redirect_url": url_for("auth.login")
|
|
}), 400
|
|
|
|
# State-Parameter validieren (CSRF-Schutz)
|
|
session_state = session.get('oauth_state')
|
|
if not state or state != session_state:
|
|
auth_logger.warning(f"Ungültiger State-Parameter von {provider}")
|
|
return jsonify({
|
|
"error": "Ungültiger State-Parameter",
|
|
"redirect_url": url_for("auth.login")
|
|
}), 400
|
|
|
|
# OAuth-Token austauschen
|
|
if provider == 'github':
|
|
user_data = handle_github_callback(code)
|
|
else:
|
|
auth_logger.error(f"Unbekannter OAuth-Provider: {provider}")
|
|
return jsonify({
|
|
"error": "Unbekannter OAuth-Provider",
|
|
"redirect_url": url_for("auth.login")
|
|
}), 400
|
|
|
|
if not user_data:
|
|
return jsonify({
|
|
"error": "Fehler beim Abrufen der Benutzerdaten",
|
|
"redirect_url": url_for("auth.login")
|
|
}), 400
|
|
|
|
# Benutzer in Datenbank suchen oder erstellen
|
|
db_session = get_db_session()
|
|
try:
|
|
user = db_session.query(User).filter(
|
|
User.email == user_data['email']
|
|
).first()
|
|
|
|
if not user:
|
|
# Neuen Benutzer erstellen
|
|
user = User(
|
|
username=user_data['username'],
|
|
email=user_data['email'],
|
|
name=user_data['name'],
|
|
role="user",
|
|
oauth_provider=provider,
|
|
oauth_id=str(user_data['id'])
|
|
)
|
|
# Zufälliges Passwort setzen (wird nicht verwendet)
|
|
import secrets
|
|
user.set_password(secrets.token_urlsafe(32))
|
|
db_session.add(user)
|
|
db_session.commit()
|
|
auth_logger.info(f"Neuer OAuth-Benutzer erstellt: {user.username} via {provider}")
|
|
else:
|
|
# Bestehenden Benutzer aktualisieren
|
|
user.oauth_provider = provider
|
|
user.oauth_id = str(user_data['id'])
|
|
user.name = user_data['name']
|
|
user.updated_at = datetime.now()
|
|
db_session.commit()
|
|
auth_logger.info(f"OAuth-Benutzer aktualisiert: {user.username} via {provider}")
|
|
|
|
# Update last login timestamp
|
|
user.update_last_login()
|
|
db_session.commit()
|
|
|
|
login_user(user, remember=True)
|
|
|
|
# Session-State löschen
|
|
session.pop('oauth_state', None)
|
|
|
|
response_data = {
|
|
"success": True,
|
|
"user": {
|
|
"id": user.id,
|
|
"username": user.username,
|
|
"name": user.name,
|
|
"email": user.email,
|
|
"is_admin": user.is_admin
|
|
},
|
|
"redirect_url": url_for("index")
|
|
}
|
|
|
|
db_session.close()
|
|
return jsonify(response_data)
|
|
|
|
except Exception as e:
|
|
db_session.rollback()
|
|
db_session.close()
|
|
auth_logger.error(f"Datenbankfehler bei OAuth-Callback: {str(e)}")
|
|
return jsonify({
|
|
"error": "Datenbankfehler bei der Benutzeranmeldung",
|
|
"redirect_url": url_for("auth.login")
|
|
}), 500
|
|
|
|
except Exception as e:
|
|
auth_logger.error(f"Fehler im OAuth-Callback: {str(e)}")
|
|
return jsonify({
|
|
"error": "OAuth-Callback-Fehler",
|
|
"redirect_url": url_for("auth.login")
|
|
}), 500
|
|
|
|
def handle_github_callback(code):
|
|
"""Verarbeite GitHub OAuth Callback"""
|
|
# TODO: Implementiere GitHub OAuth Handling
|
|
auth_logger.warning("GitHub OAuth Callback noch nicht implementiert")
|
|
return None
|
|
|
|
def get_github_user_data(access_token):
|
|
"""Lade Benutzerdaten von GitHub API"""
|
|
# TODO: Implementiere GitHub API Abfrage
|
|
auth_logger.warning("GitHub User Data Abfrage noch nicht implementiert")
|
|
return None |