ein-dateien backend erstellt

This commit is contained in:
2025-03-07 20:58:34 +01:00
parent 68a1910bdc
commit 55936c81f0
45 changed files with 2070 additions and 0 deletions

View File

@ -1 +0,0 @@
PRINTER_IPS=192.168.0.10,192.168.0.11,192.168.0.12

View File

@ -1,106 +0,0 @@
# 🖨️ 3D-Drucker Status API 📊
Willkommen beim Blueprint der 3D-Drucker Status API! Diese API ermöglicht es Ihnen, den Status mehrerer über LAN verbundener 3D-Drucker zu überwachen und Druckaufträge an sie zu senden.
## 🌟 Funktionen
- 🔍 Abrufen des Status von 3D-Druckern, einschließlich ihres aktuellen Status, Fortschrittes und Temperatur.
- 📥 Senden von Druckaufträgen an verfügbare 3D-Drucker.
- 💾 Speichern und Aktualisieren des Status jedes Druckers in einer SQLite-Datenbank.
## 🛠Verwendete Technologien
- 🐍 Python
- 🌶️ Flask
- 🗄️ SQLite
- 🌐 HTTP-Anfragen
## 📋 Verordnungen
Bevor Sie die API starten, stellen Sie sicher, dass Sie folgendes haben:
- Python 3.x installiert
- Flask und python-dotenv-Bibliotheken installiert (`pip install flask python-dotenv`)
- Eine Liste von IP-Adressen der 3D-Drucker, die Sie überwachen möchten
## 🚀 Erste Schritte
1. Klonen Sie das Repository:
```
git clone https://git.i.mercedes-benz.com/TBA-Berlin-FI/MYP
```
2. Installieren Sie die erforderlichen Abhängigkeiten:
```
pip install -r requirements.txt
```
3. Erstellen Sie eine `.env`-Datei im Projektverzeichnis und geben Sie die IP-Adressen Ihrer 3D-Drucker an:
```
PRINTER_IPS=192.168.0.10,192.168.0.11,192.168.0.12
```
4. Starten Sie das Skript, um die SQLite-Datenbank zu erstellen:
```
python create_db.py
```
5. Starten Sie den API-Server:
```
python app.py
```
6. Die API ist unter `http://localhost:5000` erreichbar.
## 📡 API-Endpunkte
- `GET /printer_status`: Rufen Sie den Status aller 3D-Drucker ab.
- `POST /print_job`: Senden Sie einen Druckauftrag an einen bestimmten 3D-Drucker.
## 📝 API-Nutzung
### Druckerstatus abrufen
Senden Sie eine `GET`-Anfrage an `/printer_status`, um den Status aller 3D-Drucker abzurufen.
Antwort:
```json
[
{
"ip": "192.168.0.10",
"status": "frei",
"progress": 0,
"temperature": 25
},
{
"ip": "192.168.0.11",
"status": "besetzt",
"progress": 50,
"temperature": 180
},
...
]
```
### Druckauftrag senden
Senden Sie eine `POST`-Anfrage an `/print_job` mit der folgenden JSON-Last, um einen Druckauftrag an einen bestimmten 3D-Drucker zu senden:
```json
{
"printer_ip": "192.168.0.10",
"file_url": "http://example.com/print_file.gcode"
}
```
Antwort:
```json
{
"message": "Druckauftrag gestartet"
}
```
## 📄 Lizenz
- --> Noch nicht verfügbar

View File

@ -1,25 +0,0 @@
import sqlite3
from dotenv import load_dotenv
import os
load_dotenv()
printers = os.getenv('PRINTER_IPS').split(',')
def create_db():
conn = sqlite3.connect('printers.db')
c = conn.cursor()
# Tabelle 'printers' erstellen, falls sie nicht existiert
c.execute('''CREATE TABLE IF NOT EXISTS printers
(ip TEXT PRIMARY KEY, status TEXT)''')
# Drucker-IPs in die Tabelle einfügen, falls sie noch nicht vorhanden sind
for printer_ip in printers:
c.execute("INSERT OR IGNORE INTO printers (ip, status) VALUES (?, ?)", (printer_ip, "frei"))
conn.commit()
conn.close()
print("Datenbank 'printers.db' erfolgreich erstellt.")
if __name__ == '__main__':
create_db()

View File

@ -1,3 +0,0 @@
flask==2.1.0
requests==2.25.1
python-dotenv==0.20.0

View File

@ -1,94 +0,0 @@
from flask import Flask, jsonify, request
import requests
import sqlite3
from dotenv import load_dotenv
import os
load_dotenv()
printers = os.getenv('PRINTER_IPS').split(',')
app = Flask(__name__)
# SQLite-Datenbank initialisieren
def init_db():
conn = sqlite3.connect('printers.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS printers
(ip TEXT PRIMARY KEY, status TEXT)''')
for printer_ip in printers:
c.execute("INSERT OR IGNORE INTO printers (ip, status) VALUES (?, ?)", (printer_ip, "frei"))
conn.commit()
conn.close()
@app.route('/printer_status', methods=['GET'])
def get_printer_status():
printer_status = []
conn = sqlite3.connect('printers.db')
c = conn.cursor()
for printer_ip in printers:
c.execute("SELECT status FROM printers WHERE ip = ?", (printer_ip,))
status = c.fetchone()[0]
try:
response = requests.get(f"http://{printer_ip}/api/printer/status")
if response.status_code == 200:
status_data = response.json()
printer_status.append({
"ip": printer_ip,
"status": status,
"progress": status_data["progress"],
"temperature": status_data["temperature"]
})
else:
printer_status.append({
"ip": printer_ip,
"status": "Fehler bei der Abfrage",
"progress": None,
"temperature": None
})
except:
printer_status.append({
"ip": printer_ip,
"status": "Drucker nicht erreichbar",
"progress": None,
"temperature": None
})
conn.close()
return jsonify(printer_status)
@app.route('/print_job', methods=['POST'])
def submit_print_job():
print_job = request.json
printer_ip = print_job["printer_ip"]
file_url = print_job["file_url"]
conn = sqlite3.connect('printers.db')
c = conn.cursor()
c.execute("SELECT status FROM printers WHERE ip = ?", (printer_ip,))
status = c.fetchone()[0]
if status == "frei":
try:
response = requests.post(f"http://{printer_ip}/api/print_job", json={"file_url": file_url})
if response.status_code == 200:
c.execute("UPDATE printers SET status = 'besetzt' WHERE ip = ?", (printer_ip,))
conn.commit()
conn.close()
return jsonify({"message": "Druckauftrag gestartet"}), 200
else:
conn.close()
return jsonify({"message": "Fehler beim Starten des Druckauftrags"}), 500
except:
conn.close()
return jsonify({"message": "Drucker nicht erreichbar"}), 500
else:
conn.close()
return jsonify({"message": "Drucker ist nicht frei"}), 400
if __name__ == '__main__':
init_db()
app.run(host='0.0.0.0', port=5000)

View File

@ -1,38 +0,0 @@
# entwendet aus:
# https://github.com/ut-hnl-lab/ultimakerpy
# auch zum lesen:
# https://github.com/MartinBienz/SDPremote?tab=readme-ov-file
import time
from ultimakerpy import UMS3, JobState
def print_started(state):
if state == JobState.PRINTING:
time.sleep(6.0)
return True
return False
def layer_reached(pos, n):
if round(pos / 0.2) >= n: # set layer pitch: 0.2 mm
return True
return False
printer = UMS3(name='MyPrinterName')
targets = {
'job_state': printer.job_state,
'bed_pos': printer.bed.position,
}
printer.print_from_dialog() # select file to print
printer.peripherals.camera_streaming()
with printer.data_logger('output2.csv', targets) as dl:
timer = dl.get_timer()
# sleep until active leveling finishes
timer.wait_for_datalog('job_state', print_started)
for n in range(1, 101):
# sleep until the printing of specified layer to start
timer.wait_for_datalog('bed_pos', lambda v: layer_reached(v, n))
print('printing layer:', n)

View File

@ -1,148 +0,0 @@
from flask import Flask, render_template, request, redirect, url_for, jsonify, session
import sqlite3
import bcrypt
app = Flask(__name__)
app.secret_key = 'supersecretkey'
# Database setup
def init_db():
conn = sqlite3.connect('database.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, username TEXT, password TEXT)''')
c.execute('''CREATE TABLE IF NOT EXISTS printers (id INTEGER PRIMARY KEY, name TEXT, status TEXT)''')
c.execute('''CREATE TABLE IF NOT EXISTS jobs (id INTEGER PRIMARY KEY, printer_id INTEGER, user TEXT, date TEXT, status TEXT)''')
conn.commit()
conn.close()
init_db()
# User registration (Admin setup)
def add_admin():
conn = sqlite3.connect('database.db')
c = conn.cursor()
hashed_pw = bcrypt.hashpw('adminpassword'.encode('utf-8'), bcrypt.gensalt())
c.execute("INSERT INTO users (username, password) VALUES (?, ?)", ('admin', hashed_pw))
conn.commit()
conn.close()
# Comment the next line after the first run
# add_admin()
# API Endpoints
@app.route('/api/printers/status', methods=['GET'])
def get_printer_status():
conn = sqlite3.connect('database.db')
c = conn.cursor()
c.execute("SELECT * FROM printers")
printers = c.fetchall()
conn.close()
return jsonify(printers)
@app.route('/api/printers/job', methods=['POST'])
def create_job():
if not session.get('logged_in'):
return jsonify({'error': 'Unauthorized'}), 403
data = request.json
user = session['username']
printer_id = data['printer_id']
conn = sqlite3.connect('database.db')
c = conn.cursor()
c.execute("SELECT status FROM printers WHERE id=?", (printer_id,))
status = c.fetchone()[0]
if status == 'frei':
c.execute("INSERT INTO jobs (printer_id, user, date, status) VALUES (?, ?, datetime('now'), 'in progress')",
(printer_id, user))
c.execute("UPDATE printers SET status='belegt' WHERE id=?", (printer_id,))
conn.commit()
elif status == 'belegt':
return jsonify({'error': 'Printer already in use'}), 409
else:
return jsonify({'error': 'Invalid printer status'}), 400
conn.close()
return jsonify({'message': 'Job created and printer turned on'}), 200
@app.route('/api/printers/reserve', methods=['POST'])
def reserve_printer():
if not session.get('logged_in'):
return jsonify({'error': 'Unauthorized'}), 403
data = request.json
printer_id = data['printer_id']
conn = sqlite3.connect('database.db')
c = conn.cursor()
c.execute("SELECT status FROM printers WHERE id=?", (printer_id,))
status = c.fetchone()[0]
if status == 'frei':
c.execute("UPDATE printers SET status='reserviert' WHERE id=?", (printer_id,))
conn.commit()
message = 'Printer reserved'
else:
message = 'Printer cannot be reserved'
conn.close()
return jsonify({'message': message}), 200
@app.route('/api/printers/release', methods=['POST'])
def release_printer():
if not session.get('logged_in'):
return jsonify({'error': 'Unauthorized'}), 403
data = request.json
printer_id = data['printer_id']
conn = sqlite3.connect('database.db')
c = conn.cursor()
c.execute("UPDATE printers SET status='frei' WHERE id=?", (printer_id,))
conn.commit()
conn.close()
return jsonify({'message': 'Printer released'}), 200
# Authentication routes
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password'].encode('utf-8')
conn = sqlite3.connect('database.db')
c = conn.cursor()
c.execute("SELECT * FROM users WHERE username=?", (username,))
user = c.fetchone()
conn.close()
if user and bcrypt.checkpw(password, user[2].encode('utf-8')):
session['logged_in'] = True
session['username'] = username
return redirect(url_for('dashboard'))
else:
return render_template('login.html', error='Invalid Credentials')
return render_template('login.html')
@app.route('/dashboard')
def dashboard():
if not session.get('logged_in'):
return redirect(url_for('login'))
conn = sqlite3.connect('database.db')
c = conn.cursor()
c.execute("SELECT * FROM printers")
printers = c.fetchall()
conn.close()
return render_template('dashboard.html', printers=printers)
@app.route('/logout')
def logout():
session.clear()
return redirect(url_for('login'))
if __name__ == '__main__':
app.run(debug=True)

View File

@ -1,20 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>3D Printer Management</title>
<link href="https://cdn.jsdelivr.net/npm/tailwindcss@2.2.19/dist/tailwind.min.css" rel="stylesheet">
<link href="https://cdn.jsdelivr.net/npm/daisyui@1.14.0/dist/full.css" rel="stylesheet">
</head>
<body class="bg-black text-white">
<nav class="bg-gray-800 p-4">
<div class="container mx-auto">
<h1 class="text-xl">3D Printer Management Dashboard</h1>
</div>
</nav>
<div class="container mx-auto mt-5">
{% block content %}{% endblock %}
</div>
</body>
</html>

View File

@ -1,29 +0,0 @@
{% extends "base.html" %}
{% block content %}
<h2 class="text-2xl mb-4">Printer Status</h2>
<div class="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 gap-4">
{% for printer in printers %}
<div class="card bg-gray-900 shadow-xl">
<div class="card-body">
<h2 class="card-title">{{ printer[1] }}</h2>
<p>Status: {{ printer[2] }}</p>
{% if printer[2] == 'frei' %}
<form method="POST" action="/api/printers/job">
<input type="hidden" name="printer_id" value="{{ printer[0] }}">
<button class="btn btn-success mt-4 w-full">Start Job</button>
</form>
{% elif printer[2] == 'belegt' %}
<button class="btn btn-warning mt-4 w-full" disabled>In Use</button>
{% elif printer[2] == 'reserviert' %}
<form method="POST" action="/api/printers/release">
<input type="hidden" name="printer_id" value="{{ printer[0] }}">
<button class="btn btn-info mt-4 w-full">Release</button>
</form>
{% endif %}
</div>
</div>
{% endfor %}
</div>
<a href="/logout" class="btn btn-secondary mt-4">Logout</a>
{% endblock %}

View File

@ -1,33 +0,0 @@
{% extends "base.html" %}
{% block content %}
<div class="flex justify-center items-center h-screen">
<div class="card w-96 bg-gray-900 shadow-xl">
<div class="card-body">
<h2 class="card-title">Login</h2>
<form method="POST">
<div class="form-control">
<label class="label">
<span class="label-text">Username</span>
</label>
<input type="text" name="username" class="input input-bordered w-full" required>
</div>
<div class="form-control">
<label class="label">
<span class="label-text">Password</span>
</label>
<input type="password" name="password" class="input input-bordered w-full" required>
</div>
<div class="form-control mt-6">
<button class="btn btn-primary w-full">Login</button>
</div>
</form>
{% if error %}
<div class="mt-4 text-red-500">
{{ error }}
</div>
{% endif %}
</div>
</div>
</div>
{% endblock %}

View File

@ -1,3 +0,0 @@
SECRET_KEY=dev-secret-key-change-in-production
DATABASE_URL=sqlite:///app.db
JWT_SECRET=dev-jwt-secret-change-in-production

View File

@ -1,3 +0,0 @@
SECRET_KEY=change-me-to-a-real-secret-key
DATABASE_URL=sqlite:///app.db
JWT_SECRET=change-me-to-a-real-jwt-secret

View File

@ -1,20 +0,0 @@
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Run database migrations
RUN mkdir -p /app/instance
ENV FLASK_APP=wsgi.py
# Expose port
EXPOSE 5000
# Run the application
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "wsgi:app"]

View File

@ -1,96 +0,0 @@
# Reservation Platform Backend
This is the Flask backend for the 3D Printer Reservation Platform, providing a RESTful API for managing printers, reservations, and users.
## Features
- User authentication with email/password
- Role-based permission system (admin, user)
- Printer management
- Reservation system
- User management
## API Endpoints
### Authentication
- `POST /auth/register` - Register a new user
- `POST /auth/login` - Login with username/email and password
- `POST /auth/logout` - Log out a user by invalidating their session
### Printers
- `GET /api/printers` - Get all printers
- `GET /api/printers/<printer_id>` - Get a specific printer
- `POST /api/printers` - Create a new printer (admin only)
- `PUT /api/printers/<printer_id>` - Update a printer (admin only)
- `DELETE /api/printers/<printer_id>` - Delete a printer (admin only)
- `GET /api/printers/availability` - Get availability information for all printers
### Print Jobs
- `GET /api/jobs` - Get jobs for the current user or all jobs for admin
- `GET /api/jobs/<job_id>` - Get a specific job
- `POST /api/jobs` - Create a new print job (reserve a printer)
- `PUT /api/jobs/<job_id>` - Update a job
- `DELETE /api/jobs/<job_id>` - Delete a job (cancel reservation)
- `GET /api/jobs/<job_id>/remaining-time` - Get remaining time for a job (public endpoint)
### Users
- `GET /api/users` - Get all users (admin only)
- `GET /api/users/<user_id>` - Get a specific user (admin only)
- `PUT /api/users/<user_id>` - Update a user (admin only)
- `DELETE /api/users/<user_id>` - Delete a user (admin only)
- `GET /api/me` - Get the current user's profile
- `PUT /api/me` - Update the current user's profile
## Installation
### Prerequisites
- Python 3.11 or higher
- pip
### Setup
1. Clone the repository
```bash
git clone https://github.com/your-repo/reservation-platform.git
cd reservation-platform/packages/flask-backend
```
2. Install dependencies
```bash
pip install -r requirements.txt
```
3. Create a `.env` file with the following variables:
```
SECRET_KEY=your-secret-key
DATABASE_URL=sqlite:///app.db
JWT_SECRET=your-jwt-secret
```
4. Initialize the database
```bash
flask db upgrade
python scripts/init_db.py
```
5. Run the development server
```bash
python wsgi.py
```
## Docker Deployment
1. Build and run with Docker Compose
```bash
docker-compose up -d
```
## Development
### Running Migrations
To create a new migration after updating models:
```bash
flask db migrate -m "Description of changes"
flask db upgrade
```

View File

@ -1,32 +0,0 @@
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_cors import CORS
from config import Config
db = SQLAlchemy()
migrate = Migrate()
def create_app(config_class=Config):
app = Flask(__name__)
app.config.from_object(config_class)
# Initialize extensions
db.init_app(app)
migrate.init_app(app, db)
CORS(app)
# Register blueprints
from app.api import bp as api_bp
app.register_blueprint(api_bp, url_prefix='/api')
from app.auth import bp as auth_bp
app.register_blueprint(auth_bp, url_prefix='/auth')
@app.route('/health')
def health_check():
return {'status': 'ok'}
return app
from app import models

View File

@ -1,5 +0,0 @@
from flask import Blueprint
bp = Blueprint('api', __name__)
from app.api import printers, jobs, users

View File

@ -1,219 +0,0 @@
from flask import request, jsonify
from app import db
from app.api import bp
from app.models import PrintJob, Printer, User
from app.auth.routes import token_required, admin_required
from datetime import datetime, timedelta
@bp.route('/jobs', methods=['GET'])
@token_required
def get_jobs():
"""Get jobs for the current user or all jobs for admin"""
is_admin = request.user_role == 'admin'
user_id = request.user_id
# Parse query parameters
status = request.args.get('status') # active, upcoming, completed, aborted, all
printer_id = request.args.get('printer_id')
# Base query
query = PrintJob.query
# Filter by user unless admin
if not is_admin:
query = query.filter_by(user_id=user_id)
# Filter by printer if provided
if printer_id:
query = query.filter_by(printer_id=printer_id)
# Apply status filter
now = datetime.utcnow()
if status == 'active':
query = query.filter_by(aborted=False) \
.filter(PrintJob.start_at <= now) \
.filter(PrintJob.start_at.op('+')(PrintJob.duration_in_minutes * 60) > now)
elif status == 'upcoming':
query = query.filter_by(aborted=False) \
.filter(PrintJob.start_at > now)
elif status == 'completed':
query = query.filter_by(aborted=False) \
.filter(PrintJob.start_at.op('+')(PrintJob.duration_in_minutes * 60) <= now)
elif status == 'aborted':
query = query.filter_by(aborted=True)
# Order by start time, most recent first
query = query.order_by(PrintJob.start_at.desc())
# Execute query
jobs = query.all()
result = [job.to_dict() for job in jobs]
return jsonify(result)
@bp.route('/jobs/<job_id>', methods=['GET'])
@token_required
def get_job(job_id):
"""Get a specific job"""
job = PrintJob.query.get_or_404(job_id)
# Check permissions
is_admin = request.user_role == 'admin'
user_id = request.user_id
if not is_admin and job.user_id != user_id:
return jsonify({'error': 'Not authorized to view this job'}), 403
return jsonify(job.to_dict())
@bp.route('/jobs', methods=['POST'])
@token_required
def create_job():
"""Create a new print job (reserve a printer)"""
data = request.get_json() or {}
required_fields = ['printer_id', 'start_at', 'duration_in_minutes']
for field in required_fields:
if field not in data:
return jsonify({'error': f'Missing required field: {field}'}), 400
# Validate printer
printer = Printer.query.get(data['printer_id'])
if not printer:
return jsonify({'error': 'Printer not found'}), 404
if printer.status != 0: # Not operational
return jsonify({'error': 'Printer is not operational'}), 400
# Parse start time
try:
start_at = datetime.fromisoformat(data['start_at'].replace('Z', '+00:00'))
except ValueError:
return jsonify({'error': 'Invalid start_at format'}), 400
# Validate duration
try:
duration = int(data['duration_in_minutes'])
if duration <= 0 or duration > 480: # Max 8 hours
return jsonify({'error': 'Invalid duration (must be between 1 and 480 minutes)'}), 400
except ValueError:
return jsonify({'error': 'Duration must be a number'}), 400
end_at = start_at + timedelta(minutes=duration)
# Check if the printer is available during the requested time
conflicting_jobs = PrintJob.query.filter_by(printer_id=printer.id, aborted=False) \
.filter(
(PrintJob.start_at < end_at) &
(PrintJob.start_at.op('+')(PrintJob.duration_in_minutes * 60) > start_at)
) \
.all()
if conflicting_jobs:
return jsonify({'error': 'Printer is not available during the requested time'}), 409
# Create job
job = PrintJob(
printer_id=data['printer_id'],
user_id=request.user_id,
start_at=start_at,
duration_in_minutes=duration,
comments=data.get('comments', '')
)
db.session.add(job)
db.session.commit()
return jsonify(job.to_dict()), 201
@bp.route('/jobs/<job_id>', methods=['PUT'])
@token_required
def update_job(job_id):
"""Update a job"""
job = PrintJob.query.get_or_404(job_id)
# Check permissions
is_admin = request.user_role == 'admin'
user_id = request.user_id
if not is_admin and job.user_id != user_id:
return jsonify({'error': 'Not authorized to update this job'}), 403
data = request.get_json() or {}
# Only allow certain fields to be updated
if 'comments' in data:
job.comments = data['comments']
# Admin or owner can abort a job
if 'aborted' in data and data['aborted'] and not job.aborted:
job.aborted = True
job.abort_reason = data.get('abort_reason', '')
# Admin or owner can extend a job if it's active
now = datetime.utcnow()
is_active = (not job.aborted and
job.start_at <= now and
job.get_end_time() > now)
if 'extend_minutes' in data and is_active:
try:
extend_minutes = int(data['extend_minutes'])
if extend_minutes <= 0 or extend_minutes > 120: # Max extend 2 hours
return jsonify({'error': 'Invalid extension (must be between 1 and 120 minutes)'}), 400
new_end_time = job.get_end_time() + timedelta(minutes=extend_minutes)
# Check for conflicts with the extension
conflicting_jobs = PrintJob.query.filter_by(printer_id=job.printer_id, aborted=False) \
.filter(PrintJob.id != job.id) \
.filter(PrintJob.start_at < new_end_time) \
.filter(PrintJob.start_at > job.get_end_time()) \
.all()
if conflicting_jobs:
return jsonify({'error': 'Cannot extend job due to conflicts with other reservations'}), 409
job.duration_in_minutes += extend_minutes
except ValueError:
return jsonify({'error': 'Extend minutes must be a number'}), 400
db.session.commit()
return jsonify(job.to_dict())
@bp.route('/jobs/<job_id>', methods=['DELETE'])
@token_required
def delete_job(job_id):
"""Delete a job (cancel reservation)"""
job = PrintJob.query.get_or_404(job_id)
# Check permissions
is_admin = request.user_role == 'admin'
user_id = request.user_id
if not is_admin and job.user_id != user_id:
return jsonify({'error': 'Not authorized to delete this job'}), 403
# Only allow deletion of upcoming jobs
now = datetime.utcnow()
if job.start_at <= now and not is_admin:
return jsonify({'error': 'Cannot delete an active or completed job'}), 400
db.session.delete(job)
db.session.commit()
return jsonify({'message': 'Job deleted successfully'})
@bp.route('/jobs/<job_id>/remaining-time', methods=['GET'])
def get_remaining_time(job_id):
"""Get remaining time for a job (public endpoint)"""
job = PrintJob.query.get_or_404(job_id)
remaining_seconds = job.get_remaining_time()
return jsonify({
'job_id': job.id,
'remaining_seconds': remaining_seconds,
'is_active': job.is_active()
})

View File

@ -1,177 +0,0 @@
from flask import request, jsonify
from app import db
from app.api import bp
from app.models import Printer, PrintJob
from app.auth.routes import token_required, admin_required
from datetime import datetime
@bp.route('/printers', methods=['GET'])
def get_printers():
"""Get all printers"""
printers = Printer.query.all()
result = []
for printer in printers:
# Get active job for the printer if any
now = datetime.utcnow()
active_job = PrintJob.query.filter_by(printer_id=printer.id, aborted=False) \
.filter(PrintJob.start_at <= now) \
.filter(PrintJob.start_at.op('+')(PrintJob.duration_in_minutes * 60) > now) \
.first()
printer_data = {
'id': printer.id,
'name': printer.name,
'description': printer.description,
'status': printer.status,
'is_available': printer.status == 0 and active_job is None,
'active_job': active_job.to_dict() if active_job else None
}
result.append(printer_data)
return jsonify(result)
@bp.route('/printers/<printer_id>', methods=['GET'])
def get_printer(printer_id):
"""Get a specific printer"""
printer = Printer.query.get_or_404(printer_id)
# Get active job for the printer if any
now = datetime.utcnow()
active_job = PrintJob.query.filter_by(printer_id=printer.id, aborted=False) \
.filter(PrintJob.start_at <= now) \
.filter(PrintJob.start_at.op('+')(PrintJob.duration_in_minutes * 60) > now) \
.first()
# Get upcoming jobs
upcoming_jobs = PrintJob.query.filter_by(printer_id=printer.id, aborted=False) \
.filter(PrintJob.start_at > now) \
.order_by(PrintJob.start_at) \
.limit(5) \
.all()
result = {
'id': printer.id,
'name': printer.name,
'description': printer.description,
'status': printer.status,
'is_available': printer.status == 0 and active_job is None,
'active_job': active_job.to_dict() if active_job else None,
'upcoming_jobs': [job.to_dict() for job in upcoming_jobs]
}
return jsonify(result)
@bp.route('/printers', methods=['POST'])
@admin_required
def create_printer():
"""Create a new printer (admin only)"""
data = request.get_json() or {}
required_fields = ['name', 'description']
for field in required_fields:
if field not in data:
return jsonify({'error': f'Missing required field: {field}'}), 400
printer = Printer(
name=data['name'],
description=data['description'],
status=data.get('status', 0)
)
db.session.add(printer)
db.session.commit()
return jsonify({
'id': printer.id,
'name': printer.name,
'description': printer.description,
'status': printer.status
}), 201
@bp.route('/printers/<printer_id>', methods=['PUT'])
@admin_required
def update_printer(printer_id):
"""Update a printer (admin only)"""
printer = Printer.query.get_or_404(printer_id)
data = request.get_json() or {}
if 'name' in data:
printer.name = data['name']
if 'description' in data:
printer.description = data['description']
if 'status' in data:
printer.status = data['status']
db.session.commit()
return jsonify({
'id': printer.id,
'name': printer.name,
'description': printer.description,
'status': printer.status
})
@bp.route('/printers/<printer_id>', methods=['DELETE'])
@admin_required
def delete_printer(printer_id):
"""Delete a printer (admin only)"""
printer = Printer.query.get_or_404(printer_id)
# Check if the printer has active jobs
now = datetime.utcnow()
active_jobs = PrintJob.query.filter_by(printer_id=printer.id, aborted=False) \
.filter(PrintJob.start_at <= now) \
.filter(PrintJob.start_at.op('+')(PrintJob.duration_in_minutes * 60) > now) \
.all()
if active_jobs:
return jsonify({'error': 'Cannot delete printer with active jobs'}), 400
db.session.delete(printer)
db.session.commit()
return jsonify({'message': 'Printer deleted successfully'})
@bp.route('/printers/availability', methods=['GET'])
def get_availability():
"""Get availability information for all printers"""
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
if not start_date or not end_date:
return jsonify({'error': 'start_date and end_date are required'}), 400
try:
start = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
end = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
except ValueError:
return jsonify({'error': 'Invalid date format'}), 400
if start >= end:
return jsonify({'error': 'start_date must be before end_date'}), 400
printers = Printer.query.all()
result = []
for printer in printers:
# Get all jobs for this printer in the date range
jobs = PrintJob.query.filter_by(printer_id=printer.id, aborted=False) \
.filter(
(PrintJob.start_at <= end) &
(PrintJob.start_at.op('+')(PrintJob.duration_in_minutes * 60) >= start)
) \
.order_by(PrintJob.start_at) \
.all()
# Convert to availability slots
availability = {
'printer_id': printer.id,
'printer_name': printer.name,
'status': printer.status,
'jobs': [job.to_dict() for job in jobs]
}
result.append(availability)
return jsonify(result)

View File

@ -1,139 +0,0 @@
from flask import request, jsonify
from app import db
from app.api import bp
from app.models import User, PrintJob
from app.auth.routes import admin_required, token_required
@bp.route('/users', methods=['GET'])
@admin_required
def get_users():
"""Get all users (admin only)"""
users = User.query.all()
result = []
for user in users:
# Count jobs
total_jobs = PrintJob.query.filter_by(user_id=user.id).count()
active_jobs = PrintJob.query.filter_by(user_id=user.id, aborted=False).count()
user_data = {
'id': user.id,
'github_id': user.github_id,
'username': user.username,
'display_name': user.display_name,
'email': user.email,
'role': user.role,
'job_count': total_jobs,
'active_job_count': active_jobs
}
result.append(user_data)
return jsonify(result)
@bp.route('/users/<user_id>', methods=['GET'])
@admin_required
def get_user(user_id):
"""Get a specific user (admin only)"""
user = User.query.get_or_404(user_id)
# Count jobs
total_jobs = PrintJob.query.filter_by(user_id=user.id).count()
active_jobs = PrintJob.query.filter_by(user_id=user.id, aborted=False).count()
result = {
'id': user.id,
'github_id': user.github_id,
'username': user.username,
'display_name': user.display_name,
'email': user.email,
'role': user.role,
'job_count': total_jobs,
'active_job_count': active_jobs
}
return jsonify(result)
@bp.route('/users/<user_id>', methods=['PUT'])
@admin_required
def update_user(user_id):
"""Update a user (admin only)"""
user = User.query.get_or_404(user_id)
data = request.get_json() or {}
if 'role' in data and data['role'] in ['admin', 'user', 'guest']:
user.role = data['role']
if 'display_name' in data:
user.display_name = data['display_name']
db.session.commit()
return jsonify({
'id': user.id,
'github_id': user.github_id,
'username': user.username,
'display_name': user.display_name,
'email': user.email,
'role': user.role
})
@bp.route('/users/<user_id>', methods=['DELETE'])
@admin_required
def delete_user(user_id):
"""Delete a user (admin only)"""
user = User.query.get_or_404(user_id)
# Check if user has active jobs
active_jobs = PrintJob.query.filter_by(user_id=user.id, aborted=False).first()
if active_jobs:
return jsonify({'error': 'Cannot delete user with active jobs'}), 400
db.session.delete(user)
db.session.commit()
return jsonify({'message': 'User deleted successfully'})
@bp.route('/me', methods=['GET'])
@token_required
def get_current_user():
"""Get the current user's profile"""
user = User.query.get(request.user_id)
if not user:
return jsonify({'error': 'User not found'}), 404
result = {
'id': user.id,
'github_id': user.github_id,
'username': user.username,
'display_name': user.display_name,
'email': user.email,
'role': user.role
}
return jsonify(result)
@bp.route('/me', methods=['PUT'])
@token_required
def update_current_user():
"""Update the current user's profile"""
user = User.query.get(request.user_id)
if not user:
return jsonify({'error': 'User not found'}), 404
data = request.get_json() or {}
if 'display_name' in data:
user.display_name = data['display_name']
db.session.commit()
result = {
'id': user.id,
'github_id': user.github_id,
'username': user.username,
'display_name': user.display_name,
'email': user.email,
'role': user.role
}
return jsonify(result)

View File

@ -1,5 +0,0 @@
from flask import Blueprint
bp = Blueprint('auth', __name__)
from app.auth import routes

View File

@ -1,156 +0,0 @@
from flask import request, jsonify, current_app
from app import db
from app.auth import bp
from app.models import User, Session
from datetime import datetime, timedelta
import time
import functools
import re
@bp.route('/register', methods=['POST'])
def register():
"""Register a new user"""
data = request.get_json() or {}
# Validate required fields
required_fields = ['username', 'email', 'password']
for field in required_fields:
if field not in data:
return jsonify({'error': f'Missing required field: {field}'}), 400
# Validate email format
email_regex = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_regex, data['email']):
return jsonify({'error': 'Invalid email format'}), 400
# Validate password strength (at least 8 characters)
if len(data['password']) < 8:
return jsonify({'error': 'Password must be at least 8 characters long'}), 400
# Check if username already exists
if User.query.filter_by(username=data['username']).first():
return jsonify({'error': 'Username already exists'}), 400
# Check if email already exists
if User.query.filter_by(email=data['email']).first():
return jsonify({'error': 'Email already exists'}), 400
# Create new user
user = User(
username=data['username'],
email=data['email'],
display_name=data.get('display_name', data['username']),
role='user' # Default role
)
user.set_password(data['password'])
db.session.add(user)
db.session.commit()
return jsonify({
'id': user.id,
'username': user.username,
'email': user.email,
'display_name': user.display_name,
'role': user.role
}), 201
@bp.route('/login', methods=['POST'])
def login():
"""Login a user with username/email and password"""
data = request.get_json() or {}
# Validate required fields
if 'password' not in data:
return jsonify({'error': 'Password is required'}), 400
if 'username' not in data and 'email' not in data:
return jsonify({'error': 'Username or email is required'}), 400
# Find user by username or email
user = None
if 'username' in data:
user = User.query.filter_by(username=data['username']).first()
else:
user = User.query.filter_by(email=data['email']).first()
# Check if user exists and verify password
if not user or not user.check_password(data['password']):
return jsonify({'error': 'Invalid credentials'}), 401
# Create a session for the user
expires_at = int((datetime.utcnow() + timedelta(days=7)).timestamp())
session = Session(
user_id=user.id,
expires_at=expires_at
)
db.session.add(session)
db.session.commit()
# Generate JWT token
token = user.generate_token()
return jsonify({
'token': token,
'user': {
'id': user.id,
'username': user.username,
'email': user.email,
'display_name': user.display_name,
'role': user.role
}
})
@bp.route('/logout', methods=['POST'])
def logout():
"""Log out a user by invalidating their session"""
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return jsonify({'error': 'Authorization header required'}), 401
token = auth_header.split(' ')[1]
payload = User.verify_token(token)
if not payload:
return jsonify({'error': 'Invalid token'}), 401
# Delete all sessions for this user
Session.query.filter_by(user_id=payload['user_id']).delete()
db.session.commit()
return jsonify({'message': 'Successfully logged out'})
def token_required(f):
@functools.wraps(f)
def decorated(*args, **kwargs):
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return jsonify({'error': 'Authorization header required'}), 401
token = auth_header.split(' ')[1]
payload = User.verify_token(token)
if not payload:
return jsonify({'error': 'Invalid token'}), 401
# Check if user has an active session
user_id = payload['user_id']
current_time = int(time.time())
session = Session.query.filter_by(user_id=user_id).filter(Session.expires_at > current_time).first()
if not session:
return jsonify({'error': 'No active session found'}), 401
# Add user to request context
request.user_id = user_id
request.user_role = payload['role']
return f(*args, **kwargs)
return decorated
def admin_required(f):
@functools.wraps(f)
@token_required
def decorated(*args, **kwargs):
if request.user_role != 'admin':
return jsonify({'error': 'Admin privileges required'}), 403
return f(*args, **kwargs)
return decorated

View File

@ -1,124 +0,0 @@
from app import db
import uuid
from datetime import datetime, timedelta
import jwt
from config import Config
import bcrypt
class User(db.Model):
__tablename__ = 'user'
id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
username = db.Column(db.String(64), index=True, unique=True, nullable=False)
display_name = db.Column(db.String(120))
email = db.Column(db.String(120), index=True, unique=True, nullable=False)
password_hash = db.Column(db.String(128), nullable=False)
role = db.Column(db.String(20), default='user')
print_jobs = db.relationship('PrintJob', backref='user', lazy='dynamic', cascade='all, delete-orphan')
sessions = db.relationship('Session', backref='user', lazy='dynamic', cascade='all, delete-orphan')
def set_password(self, password):
"""Hash and set the user's password"""
password_bytes = password.encode('utf-8')
salt = bcrypt.gensalt()
self.password_hash = bcrypt.hashpw(password_bytes, salt).decode('utf-8')
def check_password(self, password):
"""Check if the provided password matches the stored hash"""
password_bytes = password.encode('utf-8')
stored_hash = self.password_hash.encode('utf-8')
return bcrypt.checkpw(password_bytes, stored_hash)
def generate_token(self):
"""Generate a JWT token for this user"""
payload = {
'user_id': self.id,
'username': self.username,
'email': self.email,
'role': self.role,
'exp': datetime.utcnow() + timedelta(seconds=Config.JWT_ACCESS_TOKEN_EXPIRES)
}
return jwt.encode(payload, Config.JWT_SECRET, algorithm='HS256')
@staticmethod
def verify_token(token):
"""Verify and decode a JWT token"""
try:
payload = jwt.decode(token, Config.JWT_SECRET, algorithms=['HS256'])
return payload
except:
return None
class Session(db.Model):
__tablename__ = 'session'
id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
user_id = db.Column(db.String(36), db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)
expires_at = db.Column(db.Integer, nullable=False)
class Printer(db.Model):
__tablename__ = 'printer'
id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
name = db.Column(db.String(120), nullable=False)
description = db.Column(db.Text, nullable=False)
status = db.Column(db.Integer, nullable=False, default=0) # 0: OPERATIONAL, 1: OUT_OF_ORDER
print_jobs = db.relationship('PrintJob', backref='printer', lazy='dynamic', cascade='all, delete-orphan')
class PrintJob(db.Model):
__tablename__ = 'printJob'
id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
printer_id = db.Column(db.String(36), db.ForeignKey('printer.id', ondelete='CASCADE'), nullable=False)
user_id = db.Column(db.String(36), db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)
start_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
duration_in_minutes = db.Column(db.Integer, nullable=False)
comments = db.Column(db.Text)
aborted = db.Column(db.Boolean, nullable=False, default=False)
abort_reason = db.Column(db.Text)
def get_end_time(self):
return self.start_at + timedelta(minutes=self.duration_in_minutes)
def is_active(self):
now = datetime.utcnow()
return (not self.aborted and
self.start_at <= now and
now < self.get_end_time())
def get_remaining_time(self):
if self.aborted:
return 0
now = datetime.utcnow()
if now < self.start_at:
# Job hasn't started yet
return self.duration_in_minutes * 60
end_time = self.get_end_time()
if now >= end_time:
# Job has ended
return 0
# Job is ongoing
remaining_seconds = (end_time - now).total_seconds()
return int(remaining_seconds)
def to_dict(self):
return {
'id': self.id,
'printer_id': self.printer_id,
'user_id': self.user_id,
'start_at': self.start_at.isoformat(),
'duration_in_minutes': self.duration_in_minutes,
'comments': self.comments,
'aborted': self.aborted,
'abort_reason': self.abort_reason,
'remaining_time': self.get_remaining_time(),
'is_active': self.is_active()
}

View File

@ -1,13 +0,0 @@
import os
from dotenv import load_dotenv
basedir = os.path.abspath(os.path.dirname(__file__))
load_dotenv(os.path.join(basedir, '.env'))
class Config:
SECRET_KEY = os.environ.get('SECRET_KEY') or 'you-will-never-guess'
SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \
'sqlite:///' + os.path.join(basedir, 'app.db')
SQLALCHEMY_TRACK_MODIFICATIONS = False
JWT_SECRET = os.environ.get('JWT_SECRET') or 'jwt-secret-key'
JWT_ACCESS_TOKEN_EXPIRES = 3600 # 1 hour in seconds

View File

@ -1,20 +0,0 @@
version: '3.8'
services:
flask-backend:
build:
context: .
dockerfile: Dockerfile
restart: always
ports:
- "5000:5000"
environment:
- SECRET_KEY=your-secret-key
- DATABASE_URL=sqlite:///app.db
- JWT_SECRET=your-jwt-secret
volumes:
- ./instance:/app/instance
command: >
bash -c "python -m flask db upgrade &&
python scripts/init_db.py &&
gunicorn --bind 0.0.0.0:5000 wsgi:app"

View File

@ -1,89 +0,0 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = migrations
# template used to generate migration files
file_template = %%(year)d%%(month).2d%%(day).2d_%%(hour).2d%%(minute).2d%%(second).2d_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat migrations/versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
sqlalchemy.url = driver://user:pass@localhost/dbname
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

View File

@ -1,91 +0,0 @@
from __future__ import with_statement
import logging
from logging.config import fileConfig
from flask import current_app
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
logger = logging.getLogger('alembic.env')
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
config.set_main_option(
'sqlalchemy.url',
str(current_app.extensions['migrate'].db.get_engine().url).replace(
'%', '%%'))
target_metadata = current_app.extensions['migrate'].db.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url, target_metadata=target_metadata, literal_binds=True
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
# this callback is used to prevent an auto-migration from being generated
# when there are no changes to the schema
# reference: http://alembic.zzzcomputing.com/en/latest/cookbook.html
def process_revision_directives(context, revision, directives):
if getattr(config.cmd_opts, 'autogenerate', False):
script = directives[0]
if script.upgrade_ops.is_empty():
directives[:] = []
logger.info('No changes in schema detected.')
connectable = current_app.extensions['migrate'].db.get_engine()
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
process_revision_directives=process_revision_directives,
**current_app.extensions['migrate'].configure_args
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@ -1,24 +0,0 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@ -1,75 +0,0 @@
"""Initial migration
Revision ID: initial_migration
Revises:
Create Date: 2025-03-06 12:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'initial_migration'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# Create user table
op.create_table('user',
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('username', sa.String(length=64), nullable=False),
sa.Column('display_name', sa.String(length=120), nullable=True),
sa.Column('email', sa.String(length=120), nullable=False),
sa.Column('password_hash', sa.String(length=128), nullable=False),
sa.Column('role', sa.String(length=20), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('email'),
sa.UniqueConstraint('username')
)
op.create_index(op.f('ix_user_email'), 'user', ['email'], unique=True)
op.create_index(op.f('ix_user_username'), 'user', ['username'], unique=True)
# Create session table
op.create_table('session',
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('user_id', sa.String(length=36), nullable=False),
sa.Column('expires_at', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
# Create printer table
op.create_table('printer',
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('name', sa.String(length=120), nullable=False),
sa.Column('description', sa.Text(), nullable=False),
sa.Column('status', sa.Integer(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
# Create printJob table
op.create_table('printJob',
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('printer_id', sa.String(length=36), nullable=False),
sa.Column('user_id', sa.String(length=36), nullable=False),
sa.Column('start_at', sa.DateTime(), nullable=False),
sa.Column('duration_in_minutes', sa.Integer(), nullable=False),
sa.Column('comments', sa.Text(), nullable=True),
sa.Column('aborted', sa.Boolean(), nullable=False),
sa.Column('abort_reason', sa.Text(), nullable=True),
sa.ForeignKeyConstraint(['printer_id'], ['printer.id'], ondelete='CASCADE'),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
def downgrade():
op.drop_table('printJob')
op.drop_table('printer')
op.drop_table('session')
op.drop_index(op.f('ix_user_username'), table_name='user')
op.drop_index(op.f('ix_user_email'), table_name='user')
op.drop_table('user')

View File

@ -1,9 +0,0 @@
Flask==2.3.3
Flask-SQLAlchemy==3.1.1
Flask-Migrate==4.0.5
Flask-CORS==4.0.0
python-dotenv==1.0.0
SQLAlchemy==2.0.25
pyjwt==2.8.0
bcrypt==4.1.2
gunicorn==21.2.0

View File

@ -1,23 +0,0 @@
#!/bin/bash
# Initialize virtual environment if it doesn't exist
if [ ! -d "venv" ]; then
echo "Creating virtual environment..."
python3 -m venv venv
fi
# Activate virtual environment
source venv/bin/activate
# Install dependencies
echo "Installing dependencies..."
pip install -r requirements.txt
# Initialize database
echo "Initializing database..."
flask db upgrade
python scripts/init_db.py
# Run the application
echo "Starting Flask application..."
python wsgi.py

View File

@ -1,55 +0,0 @@
#!/usr/bin/env python
from app import create_app, db
from app.models import User, Printer
import uuid
def init_db():
app = create_app()
with app.app_context():
# Create tables
db.create_all()
# Check if we already have an admin user
admin = User.query.filter_by(role='admin').first()
if not admin:
# Create admin user
admin = User(
id=str(uuid.uuid4()),
username='admin',
display_name='Administrator',
email='admin@example.com',
role='admin'
)
admin.set_password('admin123') # Default password, change in production!
db.session.add(admin)
print("Created admin user with username 'admin' and password 'admin123'")
# Check if we have any printers
printer_count = Printer.query.count()
if printer_count == 0:
# Create sample printers
printers = [
Printer(
name='Printer 1',
description='3D Printer for general use',
status=0 # OPERATIONAL
),
Printer(
name='Printer 2',
description='High resolution printer for detailed work',
status=0 # OPERATIONAL
),
Printer(
name='Printer 3',
description='Large format printer for big projects',
status=0 # OPERATIONAL
)
]
db.session.add_all(printers)
print("Created sample printers")
db.session.commit()
print("Database initialized successfully!")
if __name__ == '__main__':
init_db()

View File

@ -1,6 +0,0 @@
from app import create_app
app = create_app()
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)