2025-03-12 12:33:05 +01:00

156 lines
5.0 KiB
Python
Executable File

from flask import request, jsonify, current_app
from app import db
from app.auth import bp
from app.models import User, Session
from datetime import datetime, timedelta
import time
import functools
import re
@bp.route('/register', methods=['POST'])
def register():
"""Register a new user"""
data = request.get_json() or {}
# Validate required fields
required_fields = ['username', 'email', 'password']
for field in required_fields:
if field not in data:
return jsonify({'error': f'Missing required field: {field}'}), 400
# Validate email format
email_regex = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_regex, data['email']):
return jsonify({'error': 'Invalid email format'}), 400
# Validate password strength (at least 8 characters)
if len(data['password']) < 8:
return jsonify({'error': 'Password must be at least 8 characters long'}), 400
# Check if username already exists
if User.query.filter_by(username=data['username']).first():
return jsonify({'error': 'Username already exists'}), 400
# Check if email already exists
if User.query.filter_by(email=data['email']).first():
return jsonify({'error': 'Email already exists'}), 400
# Create new user
user = User(
username=data['username'],
email=data['email'],
display_name=data.get('display_name', data['username']),
role='user' # Default role
)
user.set_password(data['password'])
db.session.add(user)
db.session.commit()
return jsonify({
'id': user.id,
'username': user.username,
'email': user.email,
'display_name': user.display_name,
'role': user.role
}), 201
@bp.route('/login', methods=['POST'])
def login():
"""Login a user with username/email and password"""
data = request.get_json() or {}
# Validate required fields
if 'password' not in data:
return jsonify({'error': 'Password is required'}), 400
if 'username' not in data and 'email' not in data:
return jsonify({'error': 'Username or email is required'}), 400
# Find user by username or email
user = None
if 'username' in data:
user = User.query.filter_by(username=data['username']).first()
else:
user = User.query.filter_by(email=data['email']).first()
# Check if user exists and verify password
if not user or not user.check_password(data['password']):
return jsonify({'error': 'Invalid credentials'}), 401
# Create a session for the user
expires_at = int((datetime.utcnow() + timedelta(days=7)).timestamp())
session = Session(
user_id=user.id,
expires_at=expires_at
)
db.session.add(session)
db.session.commit()
# Generate JWT token
token = user.generate_token()
return jsonify({
'token': token,
'user': {
'id': user.id,
'username': user.username,
'email': user.email,
'display_name': user.display_name,
'role': user.role
}
})
@bp.route('/logout', methods=['POST'])
def logout():
"""Log out a user by invalidating their session"""
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return jsonify({'error': 'Authorization header required'}), 401
token = auth_header.split(' ')[1]
payload = User.verify_token(token)
if not payload:
return jsonify({'error': 'Invalid token'}), 401
# Delete all sessions for this user
Session.query.filter_by(user_id=payload['user_id']).delete()
db.session.commit()
return jsonify({'message': 'Successfully logged out'})
def token_required(f):
@functools.wraps(f)
def decorated(*args, **kwargs):
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return jsonify({'error': 'Authorization header required'}), 401
token = auth_header.split(' ')[1]
payload = User.verify_token(token)
if not payload:
return jsonify({'error': 'Invalid token'}), 401
# Check if user has an active session
user_id = payload['user_id']
current_time = int(time.time())
session = Session.query.filter_by(user_id=user_id).filter(Session.expires_at > current_time).first()
if not session:
return jsonify({'error': 'No active session found'}), 401
# Add user to request context
request.user_id = user_id
request.user_role = payload['role']
return f(*args, **kwargs)
return decorated
def admin_required(f):
@functools.wraps(f)
@token_required
def decorated(*args, **kwargs):
if request.user_role != 'admin':
return jsonify({'error': 'Admin privileges required'}), 403
return f(*args, **kwargs)
return decorated