""" Authentifizierungs-Blueprint für das 3D-Druck-Management-System Dieses Modul enthält alle Routen und Funktionen für die Benutzerauthentifizierung, einschließlich Login, Logout, OAuth-Callbacks und Passwort-Reset. """ import logging from datetime import datetime from flask import Blueprint, render_template, request, jsonify, redirect, url_for, flash, session from flask_login import login_user, logout_user, login_required, current_user from werkzeug.security import check_password_hash from models import User, get_db_session from utils.logging_config import get_logger # Blueprint erstellen auth_blueprint = Blueprint('auth', __name__, url_prefix='/auth') # Logger initialisieren auth_logger = get_logger("auth") @auth_blueprint.route("/login", methods=["GET", "POST"]) def login(): """Benutzeranmeldung mit E-Mail/Benutzername und Passwort""" if current_user.is_authenticated: return redirect(url_for("index")) error = None if request.method == "POST": # Debug-Logging für Request-Details auth_logger.debug(f"Login-Request: Content-Type={request.content_type}, Headers={dict(request.headers)}") # Erweiterte Content-Type-Erkennung für AJAX-Anfragen content_type = request.content_type or "" is_json_request = ( request.is_json or "application/json" in content_type or request.headers.get('X-Requested-With') == 'XMLHttpRequest' or request.headers.get('Accept', '').startswith('application/json') ) # Robuste Datenextraktion username = None password = None remember_me = False try: if is_json_request: # JSON-Request verarbeiten try: data = request.get_json(force=True) or {} username = data.get("username") or data.get("email") password = data.get("password") remember_me = data.get("remember_me", False) except Exception as json_error: auth_logger.warning(f"JSON-Parsing fehlgeschlagen: {str(json_error)}") # Fallback zu Form-Daten username = request.form.get("email") password = request.form.get("password") remember_me = request.form.get("remember_me") == "on" else: # Form-Request verarbeiten username = request.form.get("email") password = request.form.get("password") remember_me = request.form.get("remember_me") == "on" # Zusätzlicher Fallback für verschiedene Feldnamen if not username: username = request.form.get("username") or request.values.get("email") or request.values.get("username") if not password: password = request.form.get("password") or request.values.get("password") except Exception as extract_error: auth_logger.error(f"Fehler beim Extrahieren der Login-Daten: {str(extract_error)}") error = "Fehler beim Verarbeiten der Anmeldedaten." if is_json_request: return jsonify({"error": error, "success": False}), 400 if not username or not password: error = "E-Mail-Adresse und Passwort müssen angegeben werden." auth_logger.warning(f"Unvollständige Login-Daten: username={bool(username)}, password={bool(password)}") if is_json_request: return jsonify({"error": error, "success": False}), 400 else: db_session = None try: db_session = get_db_session() # Suche nach Benutzer mit übereinstimmendem Benutzernamen oder E-Mail user = db_session.query(User).filter( (User.username == username) | (User.email == username) ).first() if user and user.check_password(password): # Update last login timestamp user.update_last_login() db_session.commit() login_user(user, remember=remember_me) auth_logger.info(f"Benutzer {username} hat sich erfolgreich angemeldet") next_page = request.args.get("next") if is_json_request: return jsonify({ "success": True, "message": "Anmeldung erfolgreich", "redirect_url": next_page or url_for("index") }) else: if next_page: return redirect(next_page) return redirect(url_for("index")) else: error = "Ungültige E-Mail-Adresse oder Passwort." auth_logger.warning(f"Fehlgeschlagener Login-Versuch für Benutzer {username}") if is_json_request: return jsonify({"error": error, "success": False}), 401 except Exception as e: # Fehlerbehandlung für Datenbankprobleme error = "Anmeldefehler. Bitte versuchen Sie es später erneut." auth_logger.error(f"Fehler bei der Anmeldung: {str(e)}") if is_json_request: return jsonify({"error": error, "success": False}), 500 finally: # Sicherstellen, dass die Datenbankverbindung geschlossen wird if db_session: try: db_session.close() except Exception as close_error: auth_logger.error(f"Fehler beim Schließen der DB-Session: {str(close_error)}") return render_template("login.html", error=error) @auth_blueprint.route("/logout", methods=["GET", "POST"]) @login_required def logout(): """Meldet den Benutzer ab""" auth_logger.info(f"Benutzer {current_user.email} hat sich abgemeldet") logout_user() flash("Sie wurden erfolgreich abgemeldet.", "info") return redirect(url_for("auth.login")) @auth_blueprint.route("/reset-password-request", methods=["GET", "POST"]) def reset_password_request(): """Passwort-Reset anfordern (Placeholder)""" # TODO: Implement password reset functionality flash("Passwort-Reset-Funktionalität ist noch nicht implementiert.", "info") return redirect(url_for("auth.login")) @auth_blueprint.route("/api/login", methods=["POST"]) def api_login(): """API-Login-Endpunkt für Frontend""" try: data = request.get_json() if not data: return jsonify({"error": "Keine Daten erhalten"}), 400 username = data.get("username") password = data.get("password") remember_me = data.get("remember_me", False) if not username or not password: return jsonify({"error": "Benutzername und Passwort müssen angegeben werden"}), 400 db_session = get_db_session() user = db_session.query(User).filter( (User.username == username) | (User.email == username) ).first() if user and user.check_password(password): # Update last login timestamp user.update_last_login() db_session.commit() login_user(user, remember=remember_me) auth_logger.info(f"API-Login erfolgreich für Benutzer {username}") user_data = { "id": user.id, "username": user.username, "name": user.name, "email": user.email, "is_admin": user.is_admin } db_session.close() return jsonify({ "success": True, "user": user_data, "redirect_url": url_for("index") }) else: auth_logger.warning(f"Fehlgeschlagener API-Login für Benutzer {username}") db_session.close() return jsonify({"error": "Ungültiger Benutzername oder Passwort"}), 401 except Exception as e: auth_logger.error(f"Fehler beim API-Login: {str(e)}") return jsonify({"error": "Anmeldefehler. Bitte versuchen Sie es später erneut"}), 500 @auth_blueprint.route("/api/callback", methods=["GET", "POST"]) def api_callback(): """OAuth-Callback-Endpunkt für externe Authentifizierung""" try: # OAuth-Provider bestimmen provider = request.args.get('provider', 'github') if request.method == "GET": # Authorization Code aus URL-Parameter extrahieren code = request.args.get('code') state = request.args.get('state') error = request.args.get('error') if error: auth_logger.warning(f"OAuth-Fehler von {provider}: {error}") return jsonify({ "error": f"OAuth-Authentifizierung fehlgeschlagen: {error}", "redirect_url": url_for("auth.login") }), 400 if not code: auth_logger.warning(f"Kein Authorization Code von {provider} erhalten") return jsonify({ "error": "Kein Authorization Code erhalten", "redirect_url": url_for("auth.login") }), 400 # State-Parameter validieren (CSRF-Schutz) session_state = session.get('oauth_state') if not state or state != session_state: auth_logger.warning(f"Ungültiger State-Parameter von {provider}") return jsonify({ "error": "Ungültiger State-Parameter", "redirect_url": url_for("auth.login") }), 400 # OAuth-Token austauschen if provider == 'github': user_data = handle_github_callback(code) else: auth_logger.error(f"Unbekannter OAuth-Provider: {provider}") return jsonify({ "error": "Unbekannter OAuth-Provider", "redirect_url": url_for("auth.login") }), 400 if not user_data: return jsonify({ "error": "Fehler beim Abrufen der Benutzerdaten", "redirect_url": url_for("auth.login") }), 400 # Benutzer in Datenbank suchen oder erstellen db_session = get_db_session() try: user = db_session.query(User).filter( User.email == user_data['email'] ).first() if not user: # Neuen Benutzer erstellen user = User( username=user_data['username'], email=user_data['email'], name=user_data['name'], role="user", oauth_provider=provider, oauth_id=str(user_data['id']) ) # Zufälliges Passwort setzen (wird nicht verwendet) import secrets user.set_password(secrets.token_urlsafe(32)) db_session.add(user) db_session.commit() auth_logger.info(f"Neuer OAuth-Benutzer erstellt: {user.username} via {provider}") else: # Bestehenden Benutzer aktualisieren user.oauth_provider = provider user.oauth_id = str(user_data['id']) user.name = user_data['name'] user.updated_at = datetime.now() db_session.commit() auth_logger.info(f"OAuth-Benutzer aktualisiert: {user.username} via {provider}") # Update last login timestamp user.update_last_login() db_session.commit() login_user(user, remember=True) # Session-State löschen session.pop('oauth_state', None) response_data = { "success": True, "user": { "id": user.id, "username": user.username, "name": user.name, "email": user.email, "is_admin": user.is_admin }, "redirect_url": url_for("index") } db_session.close() return jsonify(response_data) except Exception as e: db_session.rollback() db_session.close() auth_logger.error(f"Datenbankfehler bei OAuth-Callback: {str(e)}") return jsonify({ "error": "Datenbankfehler bei der Benutzeranmeldung", "redirect_url": url_for("auth.login") }), 500 except Exception as e: auth_logger.error(f"Fehler im OAuth-Callback: {str(e)}") return jsonify({ "error": "OAuth-Callback-Fehler", "redirect_url": url_for("auth.login") }), 500 def handle_github_callback(code): """Verarbeite GitHub OAuth Callback""" # TODO: Implementiere GitHub OAuth Handling auth_logger.warning("GitHub OAuth Callback noch nicht implementiert") return None def get_github_user_data(access_token): """Lade Benutzerdaten von GitHub API""" # TODO: Implementiere GitHub API Abfrage auth_logger.warning("GitHub User Data Abfrage noch nicht implementiert") return None