backend erstellt flask

This commit is contained in:
Till Tomczak 2025-03-06 22:52:35 +01:00
parent fb2d584874
commit b7fe9a036a
34 changed files with 1395 additions and 0 deletions

11
Dokumentation.md Normal file
View File

@ -0,0 +1,11 @@
# Dokumentation
Komplikationen:
- Netzwerkanbindung
- Ermitteln der Schnittstellen der Drucker
- Auswahl der Anbindung, Entwickeln eines Netzwerkkonzeptes
- Beschaffung der Hardware (beschränkte Auswahlmöglichkeiten)
- Welches Betriebssystem? OpenSuse, NixOS, Debian
- Frontend verstehen lernen
- Netzwerk einrichten, Frontend anbinden

View File

@ -0,0 +1,3 @@
SECRET_KEY=dev-secret-key-change-in-production
DATABASE_URL=sqlite:///app.db
JWT_SECRET=dev-jwt-secret-change-in-production

View File

@ -0,0 +1,3 @@
SECRET_KEY=change-me-to-a-real-secret-key
DATABASE_URL=sqlite:///app.db
JWT_SECRET=change-me-to-a-real-jwt-secret

View File

@ -0,0 +1,20 @@
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Run database migrations
RUN mkdir -p /app/instance
ENV FLASK_APP=wsgi.py
# Expose port
EXPOSE 5000
# Run the application
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "wsgi:app"]

View File

@ -0,0 +1,96 @@
# Reservation Platform Backend
This is the Flask backend for the 3D Printer Reservation Platform, providing a RESTful API for managing printers, reservations, and users.
## Features
- User authentication with email/password
- Role-based permission system (admin, user)
- Printer management
- Reservation system
- User management
## API Endpoints
### Authentication
- `POST /auth/register` - Register a new user
- `POST /auth/login` - Login with username/email and password
- `POST /auth/logout` - Log out a user by invalidating their session
### Printers
- `GET /api/printers` - Get all printers
- `GET /api/printers/<printer_id>` - Get a specific printer
- `POST /api/printers` - Create a new printer (admin only)
- `PUT /api/printers/<printer_id>` - Update a printer (admin only)
- `DELETE /api/printers/<printer_id>` - Delete a printer (admin only)
- `GET /api/printers/availability` - Get availability information for all printers
### Print Jobs
- `GET /api/jobs` - Get jobs for the current user or all jobs for admin
- `GET /api/jobs/<job_id>` - Get a specific job
- `POST /api/jobs` - Create a new print job (reserve a printer)
- `PUT /api/jobs/<job_id>` - Update a job
- `DELETE /api/jobs/<job_id>` - Delete a job (cancel reservation)
- `GET /api/jobs/<job_id>/remaining-time` - Get remaining time for a job (public endpoint)
### Users
- `GET /api/users` - Get all users (admin only)
- `GET /api/users/<user_id>` - Get a specific user (admin only)
- `PUT /api/users/<user_id>` - Update a user (admin only)
- `DELETE /api/users/<user_id>` - Delete a user (admin only)
- `GET /api/me` - Get the current user's profile
- `PUT /api/me` - Update the current user's profile
## Installation
### Prerequisites
- Python 3.11 or higher
- pip
### Setup
1. Clone the repository
```bash
git clone https://github.com/your-repo/reservation-platform.git
cd reservation-platform/packages/flask-backend
```
2. Install dependencies
```bash
pip install -r requirements.txt
```
3. Create a `.env` file with the following variables:
```
SECRET_KEY=your-secret-key
DATABASE_URL=sqlite:///app.db
JWT_SECRET=your-jwt-secret
```
4. Initialize the database
```bash
flask db upgrade
python scripts/init_db.py
```
5. Run the development server
```bash
python wsgi.py
```
## Docker Deployment
1. Build and run with Docker Compose
```bash
docker-compose up -d
```
## Development
### Running Migrations
To create a new migration after updating models:
```bash
flask db migrate -m "Description of changes"
flask db upgrade
```

View File

@ -0,0 +1,32 @@
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_cors import CORS
from config import Config
db = SQLAlchemy()
migrate = Migrate()
def create_app(config_class=Config):
app = Flask(__name__)
app.config.from_object(config_class)
# Initialize extensions
db.init_app(app)
migrate.init_app(app, db)
CORS(app)
# Register blueprints
from app.api import bp as api_bp
app.register_blueprint(api_bp, url_prefix='/api')
from app.auth import bp as auth_bp
app.register_blueprint(auth_bp, url_prefix='/auth')
@app.route('/health')
def health_check():
return {'status': 'ok'}
return app
from app import models

View File

@ -0,0 +1,5 @@
from flask import Blueprint
bp = Blueprint('api', __name__)
from app.api import printers, jobs, users

View File

@ -0,0 +1,219 @@
from flask import request, jsonify
from app import db
from app.api import bp
from app.models import PrintJob, Printer, User
from app.auth.routes import token_required, admin_required
from datetime import datetime, timedelta
@bp.route('/jobs', methods=['GET'])
@token_required
def get_jobs():
"""Get jobs for the current user or all jobs for admin"""
is_admin = request.user_role == 'admin'
user_id = request.user_id
# Parse query parameters
status = request.args.get('status') # active, upcoming, completed, aborted, all
printer_id = request.args.get('printer_id')
# Base query
query = PrintJob.query
# Filter by user unless admin
if not is_admin:
query = query.filter_by(user_id=user_id)
# Filter by printer if provided
if printer_id:
query = query.filter_by(printer_id=printer_id)
# Apply status filter
now = datetime.utcnow()
if status == 'active':
query = query.filter_by(aborted=False) \
.filter(PrintJob.start_at <= now) \
.filter(PrintJob.start_at.op('+')(PrintJob.duration_in_minutes * 60) > now)
elif status == 'upcoming':
query = query.filter_by(aborted=False) \
.filter(PrintJob.start_at > now)
elif status == 'completed':
query = query.filter_by(aborted=False) \
.filter(PrintJob.start_at.op('+')(PrintJob.duration_in_minutes * 60) <= now)
elif status == 'aborted':
query = query.filter_by(aborted=True)
# Order by start time, most recent first
query = query.order_by(PrintJob.start_at.desc())
# Execute query
jobs = query.all()
result = [job.to_dict() for job in jobs]
return jsonify(result)
@bp.route('/jobs/<job_id>', methods=['GET'])
@token_required
def get_job(job_id):
"""Get a specific job"""
job = PrintJob.query.get_or_404(job_id)
# Check permissions
is_admin = request.user_role == 'admin'
user_id = request.user_id
if not is_admin and job.user_id != user_id:
return jsonify({'error': 'Not authorized to view this job'}), 403
return jsonify(job.to_dict())
@bp.route('/jobs', methods=['POST'])
@token_required
def create_job():
"""Create a new print job (reserve a printer)"""
data = request.get_json() or {}
required_fields = ['printer_id', 'start_at', 'duration_in_minutes']
for field in required_fields:
if field not in data:
return jsonify({'error': f'Missing required field: {field}'}), 400
# Validate printer
printer = Printer.query.get(data['printer_id'])
if not printer:
return jsonify({'error': 'Printer not found'}), 404
if printer.status != 0: # Not operational
return jsonify({'error': 'Printer is not operational'}), 400
# Parse start time
try:
start_at = datetime.fromisoformat(data['start_at'].replace('Z', '+00:00'))
except ValueError:
return jsonify({'error': 'Invalid start_at format'}), 400
# Validate duration
try:
duration = int(data['duration_in_minutes'])
if duration <= 0 or duration > 480: # Max 8 hours
return jsonify({'error': 'Invalid duration (must be between 1 and 480 minutes)'}), 400
except ValueError:
return jsonify({'error': 'Duration must be a number'}), 400
end_at = start_at + timedelta(minutes=duration)
# Check if the printer is available during the requested time
conflicting_jobs = PrintJob.query.filter_by(printer_id=printer.id, aborted=False) \
.filter(
(PrintJob.start_at < end_at) &
(PrintJob.start_at.op('+')(PrintJob.duration_in_minutes * 60) > start_at)
) \
.all()
if conflicting_jobs:
return jsonify({'error': 'Printer is not available during the requested time'}), 409
# Create job
job = PrintJob(
printer_id=data['printer_id'],
user_id=request.user_id,
start_at=start_at,
duration_in_minutes=duration,
comments=data.get('comments', '')
)
db.session.add(job)
db.session.commit()
return jsonify(job.to_dict()), 201
@bp.route('/jobs/<job_id>', methods=['PUT'])
@token_required
def update_job(job_id):
"""Update a job"""
job = PrintJob.query.get_or_404(job_id)
# Check permissions
is_admin = request.user_role == 'admin'
user_id = request.user_id
if not is_admin and job.user_id != user_id:
return jsonify({'error': 'Not authorized to update this job'}), 403
data = request.get_json() or {}
# Only allow certain fields to be updated
if 'comments' in data:
job.comments = data['comments']
# Admin or owner can abort a job
if 'aborted' in data and data['aborted'] and not job.aborted:
job.aborted = True
job.abort_reason = data.get('abort_reason', '')
# Admin or owner can extend a job if it's active
now = datetime.utcnow()
is_active = (not job.aborted and
job.start_at <= now and
job.get_end_time() > now)
if 'extend_minutes' in data and is_active:
try:
extend_minutes = int(data['extend_minutes'])
if extend_minutes <= 0 or extend_minutes > 120: # Max extend 2 hours
return jsonify({'error': 'Invalid extension (must be between 1 and 120 minutes)'}), 400
new_end_time = job.get_end_time() + timedelta(minutes=extend_minutes)
# Check for conflicts with the extension
conflicting_jobs = PrintJob.query.filter_by(printer_id=job.printer_id, aborted=False) \
.filter(PrintJob.id != job.id) \
.filter(PrintJob.start_at < new_end_time) \
.filter(PrintJob.start_at > job.get_end_time()) \
.all()
if conflicting_jobs:
return jsonify({'error': 'Cannot extend job due to conflicts with other reservations'}), 409
job.duration_in_minutes += extend_minutes
except ValueError:
return jsonify({'error': 'Extend minutes must be a number'}), 400
db.session.commit()
return jsonify(job.to_dict())
@bp.route('/jobs/<job_id>', methods=['DELETE'])
@token_required
def delete_job(job_id):
"""Delete a job (cancel reservation)"""
job = PrintJob.query.get_or_404(job_id)
# Check permissions
is_admin = request.user_role == 'admin'
user_id = request.user_id
if not is_admin and job.user_id != user_id:
return jsonify({'error': 'Not authorized to delete this job'}), 403
# Only allow deletion of upcoming jobs
now = datetime.utcnow()
if job.start_at <= now and not is_admin:
return jsonify({'error': 'Cannot delete an active or completed job'}), 400
db.session.delete(job)
db.session.commit()
return jsonify({'message': 'Job deleted successfully'})
@bp.route('/jobs/<job_id>/remaining-time', methods=['GET'])
def get_remaining_time(job_id):
"""Get remaining time for a job (public endpoint)"""
job = PrintJob.query.get_or_404(job_id)
remaining_seconds = job.get_remaining_time()
return jsonify({
'job_id': job.id,
'remaining_seconds': remaining_seconds,
'is_active': job.is_active()
})

View File

@ -0,0 +1,177 @@
from flask import request, jsonify
from app import db
from app.api import bp
from app.models import Printer, PrintJob
from app.auth.routes import token_required, admin_required
from datetime import datetime
@bp.route('/printers', methods=['GET'])
def get_printers():
"""Get all printers"""
printers = Printer.query.all()
result = []
for printer in printers:
# Get active job for the printer if any
now = datetime.utcnow()
active_job = PrintJob.query.filter_by(printer_id=printer.id, aborted=False) \
.filter(PrintJob.start_at <= now) \
.filter(PrintJob.start_at.op('+')(PrintJob.duration_in_minutes * 60) > now) \
.first()
printer_data = {
'id': printer.id,
'name': printer.name,
'description': printer.description,
'status': printer.status,
'is_available': printer.status == 0 and active_job is None,
'active_job': active_job.to_dict() if active_job else None
}
result.append(printer_data)
return jsonify(result)
@bp.route('/printers/<printer_id>', methods=['GET'])
def get_printer(printer_id):
"""Get a specific printer"""
printer = Printer.query.get_or_404(printer_id)
# Get active job for the printer if any
now = datetime.utcnow()
active_job = PrintJob.query.filter_by(printer_id=printer.id, aborted=False) \
.filter(PrintJob.start_at <= now) \
.filter(PrintJob.start_at.op('+')(PrintJob.duration_in_minutes * 60) > now) \
.first()
# Get upcoming jobs
upcoming_jobs = PrintJob.query.filter_by(printer_id=printer.id, aborted=False) \
.filter(PrintJob.start_at > now) \
.order_by(PrintJob.start_at) \
.limit(5) \
.all()
result = {
'id': printer.id,
'name': printer.name,
'description': printer.description,
'status': printer.status,
'is_available': printer.status == 0 and active_job is None,
'active_job': active_job.to_dict() if active_job else None,
'upcoming_jobs': [job.to_dict() for job in upcoming_jobs]
}
return jsonify(result)
@bp.route('/printers', methods=['POST'])
@admin_required
def create_printer():
"""Create a new printer (admin only)"""
data = request.get_json() or {}
required_fields = ['name', 'description']
for field in required_fields:
if field not in data:
return jsonify({'error': f'Missing required field: {field}'}), 400
printer = Printer(
name=data['name'],
description=data['description'],
status=data.get('status', 0)
)
db.session.add(printer)
db.session.commit()
return jsonify({
'id': printer.id,
'name': printer.name,
'description': printer.description,
'status': printer.status
}), 201
@bp.route('/printers/<printer_id>', methods=['PUT'])
@admin_required
def update_printer(printer_id):
"""Update a printer (admin only)"""
printer = Printer.query.get_or_404(printer_id)
data = request.get_json() or {}
if 'name' in data:
printer.name = data['name']
if 'description' in data:
printer.description = data['description']
if 'status' in data:
printer.status = data['status']
db.session.commit()
return jsonify({
'id': printer.id,
'name': printer.name,
'description': printer.description,
'status': printer.status
})
@bp.route('/printers/<printer_id>', methods=['DELETE'])
@admin_required
def delete_printer(printer_id):
"""Delete a printer (admin only)"""
printer = Printer.query.get_or_404(printer_id)
# Check if the printer has active jobs
now = datetime.utcnow()
active_jobs = PrintJob.query.filter_by(printer_id=printer.id, aborted=False) \
.filter(PrintJob.start_at <= now) \
.filter(PrintJob.start_at.op('+')(PrintJob.duration_in_minutes * 60) > now) \
.all()
if active_jobs:
return jsonify({'error': 'Cannot delete printer with active jobs'}), 400
db.session.delete(printer)
db.session.commit()
return jsonify({'message': 'Printer deleted successfully'})
@bp.route('/printers/availability', methods=['GET'])
def get_availability():
"""Get availability information for all printers"""
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
if not start_date or not end_date:
return jsonify({'error': 'start_date and end_date are required'}), 400
try:
start = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
end = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
except ValueError:
return jsonify({'error': 'Invalid date format'}), 400
if start >= end:
return jsonify({'error': 'start_date must be before end_date'}), 400
printers = Printer.query.all()
result = []
for printer in printers:
# Get all jobs for this printer in the date range
jobs = PrintJob.query.filter_by(printer_id=printer.id, aborted=False) \
.filter(
(PrintJob.start_at <= end) &
(PrintJob.start_at.op('+')(PrintJob.duration_in_minutes * 60) >= start)
) \
.order_by(PrintJob.start_at) \
.all()
# Convert to availability slots
availability = {
'printer_id': printer.id,
'printer_name': printer.name,
'status': printer.status,
'jobs': [job.to_dict() for job in jobs]
}
result.append(availability)
return jsonify(result)

View File

@ -0,0 +1,139 @@
from flask import request, jsonify
from app import db
from app.api import bp
from app.models import User, PrintJob
from app.auth.routes import admin_required, token_required
@bp.route('/users', methods=['GET'])
@admin_required
def get_users():
"""Get all users (admin only)"""
users = User.query.all()
result = []
for user in users:
# Count jobs
total_jobs = PrintJob.query.filter_by(user_id=user.id).count()
active_jobs = PrintJob.query.filter_by(user_id=user.id, aborted=False).count()
user_data = {
'id': user.id,
'github_id': user.github_id,
'username': user.username,
'display_name': user.display_name,
'email': user.email,
'role': user.role,
'job_count': total_jobs,
'active_job_count': active_jobs
}
result.append(user_data)
return jsonify(result)
@bp.route('/users/<user_id>', methods=['GET'])
@admin_required
def get_user(user_id):
"""Get a specific user (admin only)"""
user = User.query.get_or_404(user_id)
# Count jobs
total_jobs = PrintJob.query.filter_by(user_id=user.id).count()
active_jobs = PrintJob.query.filter_by(user_id=user.id, aborted=False).count()
result = {
'id': user.id,
'github_id': user.github_id,
'username': user.username,
'display_name': user.display_name,
'email': user.email,
'role': user.role,
'job_count': total_jobs,
'active_job_count': active_jobs
}
return jsonify(result)
@bp.route('/users/<user_id>', methods=['PUT'])
@admin_required
def update_user(user_id):
"""Update a user (admin only)"""
user = User.query.get_or_404(user_id)
data = request.get_json() or {}
if 'role' in data and data['role'] in ['admin', 'user', 'guest']:
user.role = data['role']
if 'display_name' in data:
user.display_name = data['display_name']
db.session.commit()
return jsonify({
'id': user.id,
'github_id': user.github_id,
'username': user.username,
'display_name': user.display_name,
'email': user.email,
'role': user.role
})
@bp.route('/users/<user_id>', methods=['DELETE'])
@admin_required
def delete_user(user_id):
"""Delete a user (admin only)"""
user = User.query.get_or_404(user_id)
# Check if user has active jobs
active_jobs = PrintJob.query.filter_by(user_id=user.id, aborted=False).first()
if active_jobs:
return jsonify({'error': 'Cannot delete user with active jobs'}), 400
db.session.delete(user)
db.session.commit()
return jsonify({'message': 'User deleted successfully'})
@bp.route('/me', methods=['GET'])
@token_required
def get_current_user():
"""Get the current user's profile"""
user = User.query.get(request.user_id)
if not user:
return jsonify({'error': 'User not found'}), 404
result = {
'id': user.id,
'github_id': user.github_id,
'username': user.username,
'display_name': user.display_name,
'email': user.email,
'role': user.role
}
return jsonify(result)
@bp.route('/me', methods=['PUT'])
@token_required
def update_current_user():
"""Update the current user's profile"""
user = User.query.get(request.user_id)
if not user:
return jsonify({'error': 'User not found'}), 404
data = request.get_json() or {}
if 'display_name' in data:
user.display_name = data['display_name']
db.session.commit()
result = {
'id': user.id,
'github_id': user.github_id,
'username': user.username,
'display_name': user.display_name,
'email': user.email,
'role': user.role
}
return jsonify(result)

View File

@ -0,0 +1,5 @@
from flask import Blueprint
bp = Blueprint('auth', __name__)
from app.auth import routes

View File

@ -0,0 +1,156 @@
from flask import request, jsonify, current_app
from app import db
from app.auth import bp
from app.models import User, Session
from datetime import datetime, timedelta
import time
import functools
import re
@bp.route('/register', methods=['POST'])
def register():
"""Register a new user"""
data = request.get_json() or {}
# Validate required fields
required_fields = ['username', 'email', 'password']
for field in required_fields:
if field not in data:
return jsonify({'error': f'Missing required field: {field}'}), 400
# Validate email format
email_regex = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_regex, data['email']):
return jsonify({'error': 'Invalid email format'}), 400
# Validate password strength (at least 8 characters)
if len(data['password']) < 8:
return jsonify({'error': 'Password must be at least 8 characters long'}), 400
# Check if username already exists
if User.query.filter_by(username=data['username']).first():
return jsonify({'error': 'Username already exists'}), 400
# Check if email already exists
if User.query.filter_by(email=data['email']).first():
return jsonify({'error': 'Email already exists'}), 400
# Create new user
user = User(
username=data['username'],
email=data['email'],
display_name=data.get('display_name', data['username']),
role='user' # Default role
)
user.set_password(data['password'])
db.session.add(user)
db.session.commit()
return jsonify({
'id': user.id,
'username': user.username,
'email': user.email,
'display_name': user.display_name,
'role': user.role
}), 201
@bp.route('/login', methods=['POST'])
def login():
"""Login a user with username/email and password"""
data = request.get_json() or {}
# Validate required fields
if 'password' not in data:
return jsonify({'error': 'Password is required'}), 400
if 'username' not in data and 'email' not in data:
return jsonify({'error': 'Username or email is required'}), 400
# Find user by username or email
user = None
if 'username' in data:
user = User.query.filter_by(username=data['username']).first()
else:
user = User.query.filter_by(email=data['email']).first()
# Check if user exists and verify password
if not user or not user.check_password(data['password']):
return jsonify({'error': 'Invalid credentials'}), 401
# Create a session for the user
expires_at = int((datetime.utcnow() + timedelta(days=7)).timestamp())
session = Session(
user_id=user.id,
expires_at=expires_at
)
db.session.add(session)
db.session.commit()
# Generate JWT token
token = user.generate_token()
return jsonify({
'token': token,
'user': {
'id': user.id,
'username': user.username,
'email': user.email,
'display_name': user.display_name,
'role': user.role
}
})
@bp.route('/logout', methods=['POST'])
def logout():
"""Log out a user by invalidating their session"""
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return jsonify({'error': 'Authorization header required'}), 401
token = auth_header.split(' ')[1]
payload = User.verify_token(token)
if not payload:
return jsonify({'error': 'Invalid token'}), 401
# Delete all sessions for this user
Session.query.filter_by(user_id=payload['user_id']).delete()
db.session.commit()
return jsonify({'message': 'Successfully logged out'})
def token_required(f):
@functools.wraps(f)
def decorated(*args, **kwargs):
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return jsonify({'error': 'Authorization header required'}), 401
token = auth_header.split(' ')[1]
payload = User.verify_token(token)
if not payload:
return jsonify({'error': 'Invalid token'}), 401
# Check if user has an active session
user_id = payload['user_id']
current_time = int(time.time())
session = Session.query.filter_by(user_id=user_id).filter(Session.expires_at > current_time).first()
if not session:
return jsonify({'error': 'No active session found'}), 401
# Add user to request context
request.user_id = user_id
request.user_role = payload['role']
return f(*args, **kwargs)
return decorated
def admin_required(f):
@functools.wraps(f)
@token_required
def decorated(*args, **kwargs):
if request.user_role != 'admin':
return jsonify({'error': 'Admin privileges required'}), 403
return f(*args, **kwargs)
return decorated

View File

@ -0,0 +1,124 @@
from app import db
import uuid
from datetime import datetime, timedelta
import jwt
from config import Config
import bcrypt
class User(db.Model):
__tablename__ = 'user'
id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
username = db.Column(db.String(64), index=True, unique=True, nullable=False)
display_name = db.Column(db.String(120))
email = db.Column(db.String(120), index=True, unique=True, nullable=False)
password_hash = db.Column(db.String(128), nullable=False)
role = db.Column(db.String(20), default='user')
print_jobs = db.relationship('PrintJob', backref='user', lazy='dynamic', cascade='all, delete-orphan')
sessions = db.relationship('Session', backref='user', lazy='dynamic', cascade='all, delete-orphan')
def set_password(self, password):
"""Hash and set the user's password"""
password_bytes = password.encode('utf-8')
salt = bcrypt.gensalt()
self.password_hash = bcrypt.hashpw(password_bytes, salt).decode('utf-8')
def check_password(self, password):
"""Check if the provided password matches the stored hash"""
password_bytes = password.encode('utf-8')
stored_hash = self.password_hash.encode('utf-8')
return bcrypt.checkpw(password_bytes, stored_hash)
def generate_token(self):
"""Generate a JWT token for this user"""
payload = {
'user_id': self.id,
'username': self.username,
'email': self.email,
'role': self.role,
'exp': datetime.utcnow() + timedelta(seconds=Config.JWT_ACCESS_TOKEN_EXPIRES)
}
return jwt.encode(payload, Config.JWT_SECRET, algorithm='HS256')
@staticmethod
def verify_token(token):
"""Verify and decode a JWT token"""
try:
payload = jwt.decode(token, Config.JWT_SECRET, algorithms=['HS256'])
return payload
except:
return None
class Session(db.Model):
__tablename__ = 'session'
id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
user_id = db.Column(db.String(36), db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)
expires_at = db.Column(db.Integer, nullable=False)
class Printer(db.Model):
__tablename__ = 'printer'
id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
name = db.Column(db.String(120), nullable=False)
description = db.Column(db.Text, nullable=False)
status = db.Column(db.Integer, nullable=False, default=0) # 0: OPERATIONAL, 1: OUT_OF_ORDER
print_jobs = db.relationship('PrintJob', backref='printer', lazy='dynamic', cascade='all, delete-orphan')
class PrintJob(db.Model):
__tablename__ = 'printJob'
id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
printer_id = db.Column(db.String(36), db.ForeignKey('printer.id', ondelete='CASCADE'), nullable=False)
user_id = db.Column(db.String(36), db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)
start_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
duration_in_minutes = db.Column(db.Integer, nullable=False)
comments = db.Column(db.Text)
aborted = db.Column(db.Boolean, nullable=False, default=False)
abort_reason = db.Column(db.Text)
def get_end_time(self):
return self.start_at + timedelta(minutes=self.duration_in_minutes)
def is_active(self):
now = datetime.utcnow()
return (not self.aborted and
self.start_at <= now and
now < self.get_end_time())
def get_remaining_time(self):
if self.aborted:
return 0
now = datetime.utcnow()
if now < self.start_at:
# Job hasn't started yet
return self.duration_in_minutes * 60
end_time = self.get_end_time()
if now >= end_time:
# Job has ended
return 0
# Job is ongoing
remaining_seconds = (end_time - now).total_seconds()
return int(remaining_seconds)
def to_dict(self):
return {
'id': self.id,
'printer_id': self.printer_id,
'user_id': self.user_id,
'start_at': self.start_at.isoformat(),
'duration_in_minutes': self.duration_in_minutes,
'comments': self.comments,
'aborted': self.aborted,
'abort_reason': self.abort_reason,
'remaining_time': self.get_remaining_time(),
'is_active': self.is_active()
}

View File

@ -0,0 +1,13 @@
import os
from dotenv import load_dotenv
basedir = os.path.abspath(os.path.dirname(__file__))
load_dotenv(os.path.join(basedir, '.env'))
class Config:
SECRET_KEY = os.environ.get('SECRET_KEY') or 'you-will-never-guess'
SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \
'sqlite:///' + os.path.join(basedir, 'app.db')
SQLALCHEMY_TRACK_MODIFICATIONS = False
JWT_SECRET = os.environ.get('JWT_SECRET') or 'jwt-secret-key'
JWT_ACCESS_TOKEN_EXPIRES = 3600 # 1 hour in seconds

View File

@ -0,0 +1,20 @@
version: '3.8'
services:
flask-backend:
build:
context: .
dockerfile: Dockerfile
restart: always
ports:
- "5000:5000"
environment:
- SECRET_KEY=your-secret-key
- DATABASE_URL=sqlite:///app.db
- JWT_SECRET=your-jwt-secret
volumes:
- ./instance:/app/instance
command: >
bash -c "python -m flask db upgrade &&
python scripts/init_db.py &&
gunicorn --bind 0.0.0.0:5000 wsgi:app"

View File

@ -0,0 +1,89 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = migrations
# template used to generate migration files
file_template = %%(year)d%%(month).2d%%(day).2d_%%(hour).2d%%(minute).2d%%(second).2d_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat migrations/versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
sqlalchemy.url = driver://user:pass@localhost/dbname
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

View File

@ -0,0 +1,91 @@
from __future__ import with_statement
import logging
from logging.config import fileConfig
from flask import current_app
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
logger = logging.getLogger('alembic.env')
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
config.set_main_option(
'sqlalchemy.url',
str(current_app.extensions['migrate'].db.get_engine().url).replace(
'%', '%%'))
target_metadata = current_app.extensions['migrate'].db.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url, target_metadata=target_metadata, literal_binds=True
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
# this callback is used to prevent an auto-migration from being generated
# when there are no changes to the schema
# reference: http://alembic.zzzcomputing.com/en/latest/cookbook.html
def process_revision_directives(context, revision, directives):
if getattr(config.cmd_opts, 'autogenerate', False):
script = directives[0]
if script.upgrade_ops.is_empty():
directives[:] = []
logger.info('No changes in schema detected.')
connectable = current_app.extensions['migrate'].db.get_engine()
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
process_revision_directives=process_revision_directives,
**current_app.extensions['migrate'].configure_args
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,75 @@
"""Initial migration
Revision ID: initial_migration
Revises:
Create Date: 2025-03-06 12:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'initial_migration'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# Create user table
op.create_table('user',
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('username', sa.String(length=64), nullable=False),
sa.Column('display_name', sa.String(length=120), nullable=True),
sa.Column('email', sa.String(length=120), nullable=False),
sa.Column('password_hash', sa.String(length=128), nullable=False),
sa.Column('role', sa.String(length=20), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('email'),
sa.UniqueConstraint('username')
)
op.create_index(op.f('ix_user_email'), 'user', ['email'], unique=True)
op.create_index(op.f('ix_user_username'), 'user', ['username'], unique=True)
# Create session table
op.create_table('session',
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('user_id', sa.String(length=36), nullable=False),
sa.Column('expires_at', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
# Create printer table
op.create_table('printer',
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('name', sa.String(length=120), nullable=False),
sa.Column('description', sa.Text(), nullable=False),
sa.Column('status', sa.Integer(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
# Create printJob table
op.create_table('printJob',
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('printer_id', sa.String(length=36), nullable=False),
sa.Column('user_id', sa.String(length=36), nullable=False),
sa.Column('start_at', sa.DateTime(), nullable=False),
sa.Column('duration_in_minutes', sa.Integer(), nullable=False),
sa.Column('comments', sa.Text(), nullable=True),
sa.Column('aborted', sa.Boolean(), nullable=False),
sa.Column('abort_reason', sa.Text(), nullable=True),
sa.ForeignKeyConstraint(['printer_id'], ['printer.id'], ondelete='CASCADE'),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
def downgrade():
op.drop_table('printJob')
op.drop_table('printer')
op.drop_table('session')
op.drop_index(op.f('ix_user_username'), table_name='user')
op.drop_index(op.f('ix_user_email'), table_name='user')
op.drop_table('user')

View File

@ -0,0 +1,9 @@
Flask==2.3.3
Flask-SQLAlchemy==3.1.1
Flask-Migrate==4.0.5
Flask-CORS==4.0.0
python-dotenv==1.0.0
SQLAlchemy==2.0.25
pyjwt==2.8.0
bcrypt==4.1.2
gunicorn==21.2.0

23
packages/flask-backend/run.sh Executable file
View File

@ -0,0 +1,23 @@
#!/bin/bash
# Initialize virtual environment if it doesn't exist
if [ ! -d "venv" ]; then
echo "Creating virtual environment..."
python3 -m venv venv
fi
# Activate virtual environment
source venv/bin/activate
# Install dependencies
echo "Installing dependencies..."
pip install -r requirements.txt
# Initialize database
echo "Initializing database..."
flask db upgrade
python scripts/init_db.py
# Run the application
echo "Starting Flask application..."
python wsgi.py

View File

@ -0,0 +1,55 @@
#!/usr/bin/env python
from app import create_app, db
from app.models import User, Printer
import uuid
def init_db():
app = create_app()
with app.app_context():
# Create tables
db.create_all()
# Check if we already have an admin user
admin = User.query.filter_by(role='admin').first()
if not admin:
# Create admin user
admin = User(
id=str(uuid.uuid4()),
username='admin',
display_name='Administrator',
email='admin@example.com',
role='admin'
)
admin.set_password('admin123') # Default password, change in production!
db.session.add(admin)
print("Created admin user with username 'admin' and password 'admin123'")
# Check if we have any printers
printer_count = Printer.query.count()
if printer_count == 0:
# Create sample printers
printers = [
Printer(
name='Printer 1',
description='3D Printer for general use',
status=0 # OPERATIONAL
),
Printer(
name='Printer 2',
description='High resolution printer for detailed work',
status=0 # OPERATIONAL
),
Printer(
name='Printer 3',
description='Large format printer for big projects',
status=0 # OPERATIONAL
)
]
db.session.add_all(printers)
print("Created sample printers")
db.session.commit()
print("Database initialized successfully!")
if __name__ == '__main__':
init_db()

View File

@ -0,0 +1,6 @@
from app import create_app
app = create_app()
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)