Ersetze Flask-SQLAlchemy durch direktes SQLite

- Direkte Nutzung von SQLite3 statt Flask-SQLAlchemy/Flask-Migrate
- Vereinfachung der Datenbankinitialisierung
- Automatische Erstellung der Datenbank beim Serverstart, falls nicht vorhanden
- ORM-Klassen mit direkten SQLite-Methoden implementiert
- Entfernung nicht benötigter Abhängigkeiten
This commit is contained in:
root 2025-03-11 11:29:29 +01:00
parent e31c4036d7
commit 2adafb149a
4 changed files with 533 additions and 187 deletions

View File

@ -1,7 +1,5 @@
from flask import Flask, request, jsonify, g, redirect, url_for, session as flask_session, render_template, flash
from flask_cors import CORS
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from werkzeug.security import generate_password_hash, check_password_hash
from functools import wraps
import jwt
@ -11,11 +9,11 @@ import json
import logging
import uuid
import asyncio
import sqlite3
from logging.handlers import RotatingFileHandler
from datetime import timedelta
from typing import Dict, Any, List, Optional, Union
from dataclasses import dataclass
import sqlite3
from tapo import ApiClient
from dotenv import load_dotenv
@ -28,8 +26,7 @@ CORS(app, supports_credentials=True)
# Konfiguration
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev_secret_key')
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DATABASE_URL', 'sqlite:///myp.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['DATABASE'] = os.environ.get('DATABASE_PATH', 'instance/myp.db')
app.config['SESSION_COOKIE_HTTPONLY'] = True
app.config['SESSION_COOKIE_SECURE'] = os.environ.get('FLASK_ENV') == 'production'
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
@ -52,28 +49,144 @@ app.logger.addHandler(file_handler)
app.logger.setLevel(logging.INFO)
app.logger.info('MYP Backend starting up')
# DB Setup
db = SQLAlchemy(app)
migrate = Migrate(app, db)
# Database functions
def get_db():
if 'db' not in g:
# Stelle sicher, dass das instance-Verzeichnis existiert
os.makedirs(os.path.dirname(app.config['DATABASE']), exist_ok=True)
g.db = sqlite3.connect(app.config['DATABASE'])
g.db.row_factory = sqlite3.Row
return g.db
# Lokale Authentifizierung statt OAuth
def close_db(e=None):
db = g.pop('db', None)
if db is not None:
db.close()
# Models
class User(db.Model):
id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
username = db.Column(db.String(64), index=True, unique=True)
password_hash = db.Column(db.String(128))
display_name = db.Column(db.String(100))
email = db.Column(db.String(120), index=True, unique=True)
role = db.Column(db.String(20), default='user') # admin, user, guest
jobs = db.relationship('PrintJob', backref='user', lazy='dynamic')
def init_db():
"""Initialisiere die Datenbank, falls sie noch nicht existiert."""
db = get_db()
db.execute('PRAGMA foreign_keys = ON') # SQLite-Fremdschlüsselunterstützung aktivieren
# Tabellen erstellen
db.executescript('''
CREATE TABLE IF NOT EXISTS user (
id TEXT PRIMARY KEY,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
display_name TEXT,
email TEXT UNIQUE,
role TEXT DEFAULT 'user'
);
CREATE TABLE IF NOT EXISTS session (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
expires_at TIMESTAMP NOT NULL,
FOREIGN KEY (user_id) REFERENCES user (id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS printer (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT NOT NULL,
status INTEGER DEFAULT 0,
ip_address TEXT
);
CREATE TABLE IF NOT EXISTS print_job (
id TEXT PRIMARY KEY,
printer_id TEXT NOT NULL,
user_id TEXT NOT NULL,
start_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
duration_in_minutes INTEGER NOT NULL,
comments TEXT,
aborted INTEGER DEFAULT 0,
abort_reason TEXT,
FOREIGN KEY (printer_id) REFERENCES printer (id) ON DELETE CASCADE,
FOREIGN KEY (user_id) REFERENCES user (id) ON DELETE CASCADE
);
''')
db.commit()
# Initialisiere die Datenbank beim Starten der Anwendung
with app.app_context():
init_db()
app.teardown_appcontext(close_db)
class User:
@staticmethod
def get_by_id(user_id):
db = get_db()
row = db.execute('SELECT * FROM user WHERE id = ?', (user_id,)).fetchone()
if row:
return User.from_row(row)
return None
@staticmethod
def get_by_username(username):
db = get_db()
row = db.execute('SELECT * FROM user WHERE username = ?', (username,)).fetchone()
if row:
return User.from_row(row)
return None
@staticmethod
def get_all():
db = get_db()
rows = db.execute('SELECT * FROM user').fetchall()
return [User.from_row(row) for row in rows]
@staticmethod
def from_row(row):
user = User()
user.id = row['id']
user.username = row['username']
user.password_hash = row['password_hash']
user.display_name = row['display_name']
user.email = row['email']
user.role = row['role']
return user
def __init__(self, **kwargs):
self.id = kwargs.get('id', str(uuid.uuid4()))
self.username = kwargs.get('username')
self.password_hash = kwargs.get('password_hash')
self.display_name = kwargs.get('display_name')
self.email = kwargs.get('email')
self.role = kwargs.get('role', 'user')
def set_password(self, password):
self.password_hash = generate_password_hash(password)
def check_password(self, password):
return check_password_hash(self.password_hash, password)
def save(self):
db = get_db()
db.execute(
'INSERT INTO user (id, username, password_hash, display_name, email, role) VALUES (?, ?, ?, ?, ?, ?)',
(self.id, self.username, self.password_hash, self.display_name, self.email, self.role)
)
db.commit()
return self
def update(self):
db = get_db()
db.execute(
'UPDATE user SET username = ?, password_hash = ?, display_name = ?, email = ?, role = ? WHERE id = ?',
(self.username, self.password_hash, self.display_name, self.email, self.role, self.id)
)
db.commit()
return self
def delete(self):
db = get_db()
db.execute('DELETE FROM user WHERE id = ?', (self.id,))
db.commit()
def to_dict(self):
return {
'id': self.id,
@ -83,22 +196,119 @@ class User(db.Model):
'role': self.role
}
class Session(db.Model):
id = db.Column(db.String(36), primary_key=True)
user_id = db.Column(db.String(36), db.ForeignKey('user.id'), nullable=False)
expires_at = db.Column(db.DateTime, nullable=False)
user = db.relationship('User', backref=db.backref('sessions', lazy=True))
class Session:
@staticmethod
def get_by_id(session_id):
db = get_db()
row = db.execute('SELECT * FROM session WHERE id = ?', (session_id,)).fetchone()
if row:
return Session.from_row(row)
return None
@staticmethod
def delete_by_user_id(user_id):
db = get_db()
db.execute('DELETE FROM session WHERE user_id = ?', (user_id,))
db.commit()
@staticmethod
def from_row(row):
session = Session()
session.id = row['id']
session.user_id = row['user_id']
session.expires_at = datetime.datetime.fromisoformat(row['expires_at'])
return session
def __init__(self, **kwargs):
self.id = kwargs.get('id', str(uuid.uuid4()))
self.user_id = kwargs.get('user_id')
self.expires_at = kwargs.get('expires_at')
def save(self):
db = get_db()
db.execute(
'INSERT INTO session (id, user_id, expires_at) VALUES (?, ?, ?)',
(self.id, self.user_id, self.expires_at.isoformat())
)
db.commit()
return self
def delete(self):
db = get_db()
db.execute('DELETE FROM session WHERE id = ?', (self.id,))
db.commit()
@property
def user(self):
return User.get_by_id(self.user_id)
class Printer(db.Model):
id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
name = db.Column(db.String(64), index=True, nullable=False)
description = db.Column(db.Text, nullable=False)
status = db.Column(db.Integer, default=0) # 0=available, 1=busy, 2=maintenance
ip_address = db.Column(db.String(15), nullable=True) # IP-Adresse der Tapo-Steckdose
jobs = db.relationship('PrintJob', backref='printer', lazy='dynamic')
class Printer:
@staticmethod
def get_by_id(printer_id):
db = get_db()
row = db.execute('SELECT * FROM printer WHERE id = ?', (printer_id,)).fetchone()
if row:
return Printer.from_row(row)
return None
@staticmethod
def get_all():
db = get_db()
rows = db.execute('SELECT * FROM printer').fetchall()
return [Printer.from_row(row) for row in rows]
@staticmethod
def from_row(row):
printer = Printer()
printer.id = row['id']
printer.name = row['name']
printer.description = row['description']
printer.status = row['status']
printer.ip_address = row['ip_address']
return printer
def __init__(self, **kwargs):
self.id = kwargs.get('id', str(uuid.uuid4()))
self.name = kwargs.get('name')
self.description = kwargs.get('description')
self.status = kwargs.get('status', 0)
self.ip_address = kwargs.get('ip_address')
def save(self):
db = get_db()
db.execute(
'INSERT INTO printer (id, name, description, status, ip_address) VALUES (?, ?, ?, ?, ?)',
(self.id, self.name, self.description, self.status, self.ip_address)
)
db.commit()
return self
def update(self):
db = get_db()
db.execute(
'UPDATE printer SET name = ?, description = ?, status = ?, ip_address = ? WHERE id = ?',
(self.name, self.description, self.status, self.ip_address, self.id)
)
db.commit()
return self
def delete(self):
db = get_db()
db.execute('DELETE FROM printer WHERE id = ?', (self.id,))
db.commit()
def get_latest_job(self):
return PrintJob.query.filter_by(printer_id=self.id).order_by(PrintJob.start_at.desc()).first()
db = get_db()
row = db.execute('''
SELECT * FROM print_job
WHERE printer_id = ?
ORDER BY start_at DESC
LIMIT 1
''', (self.id,)).fetchone()
if row:
return PrintJob.from_row(row)
return None
def to_dict(self):
latest_job = self.get_latest_job()
@ -110,15 +320,90 @@ class Printer(db.Model):
'latestJob': latest_job.to_dict() if latest_job else None
}
class PrintJob(db.Model):
id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
printer_id = db.Column(db.String(36), db.ForeignKey('printer.id'), nullable=False)
user_id = db.Column(db.String(36), db.ForeignKey('user.id'), nullable=False)
start_at = db.Column(db.DateTime, default=datetime.datetime.utcnow)
duration_in_minutes = db.Column(db.Integer, nullable=False)
comments = db.Column(db.Text, nullable=True)
aborted = db.Column(db.Boolean, default=False)
abort_reason = db.Column(db.Text, nullable=True)
class PrintJob:
@staticmethod
def get_by_id(job_id):
db = get_db()
row = db.execute('SELECT * FROM print_job WHERE id = ?', (job_id,)).fetchone()
if row:
return PrintJob.from_row(row)
return None
@staticmethod
def get_by_user_id(user_id):
db = get_db()
rows = db.execute('SELECT * FROM print_job WHERE user_id = ?', (user_id,)).fetchall()
return [PrintJob.from_row(row) for row in rows]
@staticmethod
def get_all():
db = get_db()
rows = db.execute('SELECT * FROM print_job').fetchall()
return [PrintJob.from_row(row) for row in rows]
@staticmethod
def get_expired_jobs():
db = get_db()
now = datetime.datetime.utcnow().isoformat()
rows = db.execute('''
SELECT * FROM print_job
WHERE aborted = 0
AND datetime(start_at, '+' || duration_in_minutes || ' minutes') <= datetime(?)
''', (now,)).fetchall()
return [PrintJob.from_row(row) for row in rows]
@staticmethod
def from_row(row):
job = PrintJob()
job.id = row['id']
job.printer_id = row['printer_id']
job.user_id = row['user_id']
job.start_at = datetime.datetime.fromisoformat(row['start_at'])
job.duration_in_minutes = row['duration_in_minutes']
job.comments = row['comments']
job.aborted = bool(row['aborted'])
job.abort_reason = row['abort_reason']
return job
def __init__(self, **kwargs):
self.id = kwargs.get('id', str(uuid.uuid4()))
self.printer_id = kwargs.get('printer_id')
self.user_id = kwargs.get('user_id')
self.start_at = kwargs.get('start_at', datetime.datetime.utcnow())
self.duration_in_minutes = kwargs.get('duration_in_minutes')
self.comments = kwargs.get('comments')
self.aborted = kwargs.get('aborted', False)
self.abort_reason = kwargs.get('abort_reason')
def save(self):
db = get_db()
db.execute(
'''INSERT INTO print_job
(id, printer_id, user_id, start_at, duration_in_minutes, comments, aborted, abort_reason)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)''',
(self.id, self.printer_id, self.user_id, self.start_at.isoformat(),
self.duration_in_minutes, self.comments, int(self.aborted), self.abort_reason)
)
db.commit()
return self
def update(self):
db = get_db()
db.execute(
'''UPDATE print_job SET
printer_id = ?, user_id = ?, start_at = ?, duration_in_minutes = ?,
comments = ?, aborted = ?, abort_reason = ?
WHERE id = ?''',
(self.printer_id, self.user_id, self.start_at.isoformat(), self.duration_in_minutes,
self.comments, int(self.aborted), self.abort_reason, self.id)
)
db.commit()
return self
def delete(self):
db = get_db()
db.execute('DELETE FROM print_job WHERE id = ?', (self.id,))
db.commit()
@property
def end_at(self):
@ -198,11 +483,10 @@ def get_current_user():
if not session_id:
return None
session = Session.query.get(session_id)
session = Session.get_by_id(session_id)
if not session or session.expires_at < datetime.datetime.utcnow():
if session:
db.session.delete(session)
db.session.commit()
session.delete()
flask_session.pop('session_id', None)
return None
@ -234,8 +518,7 @@ def create_session(user):
expires_at = datetime.datetime.utcnow() + timedelta(days=7)
session = Session(id=session_id, user_id=user.id, expires_at=expires_at)
db.session.add(session)
db.session.commit()
session.save()
flask_session['session_id'] = session_id
flask_session.permanent = True
@ -255,14 +538,12 @@ def register():
display_name = data.get('displayName', username)
email = data.get('email', '')
if User.query.filter_by(username=username).first():
if User.get_by_username(username):
return jsonify({'message': 'Benutzername bereits vergeben!'}), 400
if email and User.query.filter_by(email=email).first():
return jsonify({'message': 'E-Mail-Adresse bereits registriert!'}), 400
# Prüfen, ob es bereits einen Admin gibt
admin_exists = User.query.filter_by(role='admin').first() is not None
db = get_db()
admin_exists = db.execute("SELECT 1 FROM user WHERE role = 'admin' LIMIT 1").fetchone() is not None
# Falls kein Admin existiert, wird der erste Benutzer zum Admin
role = 'admin' if not admin_exists else 'user'
@ -274,9 +555,8 @@ def register():
role=role
)
user.set_password(password)
user.save()
db.session.add(user)
db.session.commit()
app.logger.info(f'Neuer Benutzer registriert: {username} (Rolle: {role})')
# Session erstellen
@ -297,7 +577,7 @@ def login():
username = data.get('username')
password = data.get('password')
user = User.query.filter_by(username=username).first()
user = User.get_by_username(username)
if not user or not user.check_password(password):
return jsonify({'message': 'Ungültiger Benutzername oder Passwort!'}), 401
@ -314,10 +594,9 @@ def login():
def logout():
session_id = flask_session.get('session_id')
if session_id:
session = Session.query.get(session_id)
session = Session.get_by_id(session_id)
if session:
db.session.delete(session)
db.session.commit()
session.delete()
flask_session.pop('session_id', None)
@ -337,7 +616,7 @@ def get_me():
@app.route('/api/printers', methods=['GET'])
def get_printers():
printers = Printer.query.all()
printers = Printer.get_all()
return jsonify([printer.to_dict() for printer in printers])
@app.route('/api/printers', methods=['POST'])
@ -356,21 +635,25 @@ def create_printer():
ip_address=data.get('ipAddress')
)
db.session.add(printer)
db.session.commit()
printer.save()
return jsonify(printer.to_dict()), 201
@app.route('/api/printers/<printer_id>', methods=['GET'])
def get_printer(printer_id):
printer = Printer.query.get_or_404(printer_id)
printer = Printer.get_by_id(printer_id)
if not printer:
return jsonify({'message': 'Drucker nicht gefunden!'}), 404
return jsonify(printer.to_dict())
@app.route('/api/printers/<printer_id>', methods=['PUT'])
@login_required
@admin_required
def update_printer(printer_id):
printer = Printer.query.get_or_404(printer_id)
printer = Printer.get_by_id(printer_id)
if not printer:
return jsonify({'message': 'Drucker nicht gefunden!'}), 404
data = request.get_json()
if data.get('name'):
@ -382,16 +665,18 @@ def update_printer(printer_id):
if data.get('ipAddress'):
printer.ip_address = data['ipAddress']
db.session.commit()
printer.update()
return jsonify(printer.to_dict())
@app.route('/api/printers/<printer_id>', methods=['DELETE'])
@login_required
@admin_required
def delete_printer(printer_id):
printer = Printer.query.get_or_404(printer_id)
db.session.delete(printer)
db.session.commit()
printer = Printer.get_by_id(printer_id)
if not printer:
return jsonify({'message': 'Drucker nicht gefunden!'}), 404
printer.delete()
return jsonify({'message': 'Drucker gelöscht!'})
@app.route('/api/jobs', methods=['GET'])
@ -399,9 +684,9 @@ def delete_printer(printer_id):
def get_jobs():
# Admins sehen alle Jobs, normale Benutzer nur ihre eigenen
if g.current_user.role == 'admin':
jobs = PrintJob.query.all()
jobs = PrintJob.get_all()
else:
jobs = PrintJob.query.filter_by(user_id=g.current_user.id).all()
jobs = PrintJob.get_by_user_id(g.current_user.id)
return jsonify([job.to_dict() for job in jobs])
@ -413,7 +698,9 @@ def create_job():
if not data or not data.get('printerId') or not data.get('durationInMinutes'):
return jsonify({'message': 'Drucker-ID und Dauer sind erforderlich!'}), 400
printer = Printer.query.get_or_404(data['printerId'])
printer = Printer.get_by_id(data['printerId'])
if not printer:
return jsonify({'message': 'Drucker nicht gefunden!'}), 404
if printer.status != 0: # 0 = available
return jsonify({'message': 'Drucker ist nicht verfügbar!'}), 400
@ -429,9 +716,9 @@ def create_job():
# Drucker als belegt markieren
printer.status = 1 # 1 = busy
printer.update()
db.session.add(job)
db.session.commit()
job.save()
# Steckdose einschalten, falls IP-Adresse hinterlegt ist
if printer.ip_address:
@ -446,10 +733,12 @@ def create_job():
@login_required
def get_job(job_id):
# Admins können alle Jobs sehen, Benutzer nur ihre eigenen
if g.current_user.role == 'admin':
job = PrintJob.query.get_or_404(job_id)
else:
job = PrintJob.query.filter_by(id=job_id, user_id=g.current_user.id).first_or_404()
job = PrintJob.get_by_id(job_id)
if not job:
return jsonify({'message': 'Job nicht gefunden!'}), 404
if g.current_user.role != 'admin' and job.user_id != g.current_user.id:
return jsonify({'message': 'Keine Berechtigung für diesen Job!'}), 403
return jsonify(job.to_dict())
@ -457,24 +746,27 @@ def get_job(job_id):
@login_required
def abort_job(job_id):
# Admins können alle Jobs abbrechen, Benutzer nur ihre eigenen
if g.current_user.role == 'admin':
job = PrintJob.query.get_or_404(job_id)
else:
job = PrintJob.query.filter_by(id=job_id, user_id=g.current_user.id).first_or_404()
job = PrintJob.get_by_id(job_id)
if not job:
return jsonify({'message': 'Job nicht gefunden!'}), 404
if g.current_user.role != 'admin' and job.user_id != g.current_user.id:
return jsonify({'message': 'Keine Berechtigung für diesen Job!'}), 403
data = request.get_json()
job.aborted = True
job.abort_reason = data.get('reason', '')
job.update()
# Drucker wieder verfügbar machen
printer = job.printer
printer.status = 0 # 0 = available
db.session.commit()
printer = Printer.get_by_id(job.printer_id)
if printer:
printer.status = 0 # 0 = available
printer.update()
# Steckdose ausschalten, falls IP-Adresse hinterlegt ist
if printer.ip_address:
if printer and printer.ip_address:
try:
asyncio.run(tapo_control.turn_off(printer.ip_address))
except Exception as e:
@ -486,24 +778,27 @@ def abort_job(job_id):
@login_required
def finish_job(job_id):
# Admins können alle Jobs beenden, Benutzer nur ihre eigenen
if g.current_user.role == 'admin':
job = PrintJob.query.get_or_404(job_id)
else:
job = PrintJob.query.filter_by(id=job_id, user_id=g.current_user.id).first_or_404()
job = PrintJob.get_by_id(job_id)
if not job:
return jsonify({'message': 'Job nicht gefunden!'}), 404
if g.current_user.role != 'admin' and job.user_id != g.current_user.id:
return jsonify({'message': 'Keine Berechtigung für diesen Job!'}), 403
# Aktuelle Zeit als Ende setzen
now = datetime.datetime.utcnow()
actual_duration = int((now - job.start_at).total_seconds() / 60)
job.duration_in_minutes = actual_duration
job.update()
# Drucker wieder verfügbar machen
printer = job.printer
printer.status = 0 # 0 = available
db.session.commit()
printer = Printer.get_by_id(job.printer_id)
if printer:
printer.status = 0 # 0 = available
printer.update()
# Steckdose ausschalten, falls IP-Adresse hinterlegt ist
if printer.ip_address:
if printer and printer.ip_address:
try:
asyncio.run(tapo_control.turn_off(printer.ip_address))
except Exception as e:
@ -515,10 +810,12 @@ def finish_job(job_id):
@login_required
def extend_job(job_id):
# Admins können alle Jobs verlängern, Benutzer nur ihre eigenen
if g.current_user.role == 'admin':
job = PrintJob.query.get_or_404(job_id)
else:
job = PrintJob.query.filter_by(id=job_id, user_id=g.current_user.id).first_or_404()
job = PrintJob.get_by_id(job_id)
if not job:
return jsonify({'message': 'Job nicht gefunden!'}), 404
if g.current_user.role != 'admin' and job.user_id != g.current_user.id:
return jsonify({'message': 'Keine Berechtigung für diesen Job!'}), 403
data = request.get_json()
minutes = int(data.get('minutes', 0))
@ -529,8 +826,7 @@ def extend_job(job_id):
return jsonify({'message': 'Ungültige Verlängerungszeit!'}), 400
job.duration_in_minutes += additional_minutes
db.session.commit()
job.update()
return jsonify(job.to_dict())
@ -538,21 +834,24 @@ def extend_job(job_id):
@login_required
def update_job_comments(job_id):
# Admins können alle Jobs bearbeiten, Benutzer nur ihre eigenen
if g.current_user.role == 'admin':
job = PrintJob.query.get_or_404(job_id)
else:
job = PrintJob.query.filter_by(id=job_id, user_id=g.current_user.id).first_or_404()
job = PrintJob.get_by_id(job_id)
if not job:
return jsonify({'message': 'Job nicht gefunden!'}), 404
if g.current_user.role != 'admin' and job.user_id != g.current_user.id:
return jsonify({'message': 'Keine Berechtigung für diesen Job!'}), 403
data = request.get_json()
job.comments = data.get('comments', '')
db.session.commit()
job.update()
return jsonify(job.to_dict())
@app.route('/api/job/<job_id>/remaining-time', methods=['GET'])
def job_remaining_time(job_id):
job = PrintJob.query.get_or_404(job_id)
job = PrintJob.get_by_id(job_id)
if not job:
return jsonify({'message': 'Job nicht gefunden!'}), 404
remaining = job.remaining_time()
return jsonify({'remaining_minutes': remaining})
@ -561,21 +860,26 @@ def job_remaining_time(job_id):
@login_required
@admin_required
def get_users():
users = User.query.all()
users = User.get_all()
return jsonify([user.to_dict() for user in users])
@app.route('/api/users/<user_id>', methods=['GET'])
@login_required
@admin_required
def get_user(user_id):
user = User.query.get_or_404(user_id)
user = User.get_by_id(user_id)
if not user:
return jsonify({'message': 'Benutzer nicht gefunden!'}), 404
return jsonify(user.to_dict())
@app.route('/api/users/<user_id>', methods=['PUT'])
@login_required
@admin_required
def update_user(user_id):
user = User.query.get_or_404(user_id)
user = User.get_by_id(user_id)
if not user:
return jsonify({'message': 'Benutzer nicht gefunden!'}), 404
data = request.get_json()
if data.get('username'):
@ -587,47 +891,55 @@ def update_user(user_id):
if data.get('role'):
user.role = data['role']
db.session.commit()
user.update()
return jsonify(user.to_dict())
@app.route('/api/users/<user_id>', methods=['DELETE'])
@login_required
@admin_required
def delete_user(user_id):
user = User.query.get_or_404(user_id)
user = User.get_by_id(user_id)
if not user:
return jsonify({'message': 'Benutzer nicht gefunden!'}), 404
# Löschen aller Sessions des Benutzers
Session.query.filter_by(user_id=user.id).delete()
Session.delete_by_user_id(user.id)
db.session.delete(user)
db.session.commit()
user.delete()
return jsonify({'message': 'Benutzer gelöscht!'})
@app.route('/api/stats', methods=['GET'])
@login_required
@admin_required
def stats():
db = get_db()
# Drucker-Nutzungsstatistiken
total_printers = Printer.query.count()
available_printers = Printer.query.filter_by(status=0).count()
total_printers = db.execute('SELECT COUNT(*) as count FROM printer').fetchone()['count']
available_printers = db.execute('SELECT COUNT(*) as count FROM printer WHERE status = 0').fetchone()['count']
# Job-Statistiken
total_jobs = PrintJob.query.count()
active_jobs = PrintJob.query.filter(
PrintJob.aborted == False,
PrintJob.start_at + timedelta(minutes=PrintJob.duration_in_minutes) > datetime.datetime.utcnow()
).count()
completed_jobs = PrintJob.query.filter(
PrintJob.aborted == False,
PrintJob.start_at + timedelta(minutes=PrintJob.duration_in_minutes) <= datetime.datetime.utcnow()
).count()
total_jobs = db.execute('SELECT COUNT(*) as count FROM print_job').fetchone()['count']
now = datetime.datetime.utcnow().isoformat()
active_jobs = db.execute('''
SELECT COUNT(*) as count FROM print_job
WHERE aborted = 0
AND datetime(start_at, '+' || duration_in_minutes || ' minutes') > datetime(?)
''', (now,)).fetchone()['count']
completed_jobs = db.execute('''
SELECT COUNT(*) as count FROM print_job
WHERE aborted = 0
AND datetime(start_at, '+' || duration_in_minutes || ' minutes') <= datetime(?)
''', (now,)).fetchone()['count']
# Benutzerstatistiken
total_users = User.query.count()
total_users = db.execute('SELECT COUNT(*) as count FROM user').fetchone()['count']
# Durchschnittliche Druckdauer
avg_duration_result = db.session.query(db.func.avg(PrintJob.duration_in_minutes)).first()
avg_duration = int(avg_duration_result[0]) if avg_duration_result[0] else 0
avg_duration_result = db.execute('SELECT AVG(duration_in_minutes) as avg FROM print_job').fetchone()
avg_duration = int(avg_duration_result['avg']) if avg_duration_result['avg'] else 0
return jsonify({
'printers': {
@ -650,32 +962,26 @@ def stats():
@app.cli.command("check-jobs")
def check_jobs():
"""Überprüft abgelaufene Jobs und schaltet Steckdosen aus."""
now = datetime.datetime.utcnow()
# Abgelaufene Jobs finden
expired_jobs = PrintJob.query.filter(
PrintJob.aborted == False,
PrintJob.start_at + timedelta(minutes=PrintJob.duration_in_minutes) <= now
).all()
for job in expired_jobs:
printer = job.printer
with app.app_context():
expired_jobs = PrintJob.get_expired_jobs()
# Drucker-Status auf verfügbar setzen
if printer.status == 1: # busy
printer.status = 0 # available
app.logger.info(f"Job {job.id} abgelaufen. Drucker {printer.id} auf verfügbar gesetzt.")
for job in expired_jobs:
printer = Printer.get_by_id(job.printer_id)
if printer and printer.status == 1: # busy
printer.status = 0 # available
printer.update()
app.logger.info(f"Job {job.id} abgelaufen. Drucker {printer.id} auf verfügbar gesetzt.")
# Steckdose ausschalten, falls IP-Adresse hinterlegt ist
if printer and printer.ip_address:
try:
asyncio.run(tapo_control.turn_off(printer.ip_address))
app.logger.info(f"Steckdose {printer.ip_address} für abgelaufenen Job {job.id} ausgeschaltet.")
except Exception as e:
app.logger.error(f"Fehler beim Ausschalten der Steckdose {printer.ip_address}: {e}")
# Steckdose ausschalten, falls IP-Adresse hinterlegt ist
if printer.ip_address:
try:
asyncio.run(tapo_control.turn_off(printer.ip_address))
app.logger.info(f"Steckdose {printer.ip_address} für abgelaufenen Job {job.id} ausgeschaltet.")
except Exception as e:
app.logger.error(f"Fehler beim Ausschalten der Steckdose {printer.ip_address}: {e}")
db.session.commit()
app.logger.info(f"{len(expired_jobs)} abgelaufene Jobs überprüft und Steckdosen aktualisiert.")
app.logger.info(f"{len(expired_jobs)} abgelaufene Jobs überprüft und Steckdosen aktualisiert.")
@app.route('/api/test', methods=['GET'])
def test():
@ -683,7 +989,8 @@ def test():
@app.route('/api/create-initial-admin', methods=['POST'])
def create_initial_admin():
admin_exists = User.query.filter_by(role='admin').first() is not None
db = get_db()
admin_exists = db.execute("SELECT 1 FROM user WHERE role = 'admin' LIMIT 1").fetchone() is not None
if admin_exists:
return jsonify({'message': 'Es existiert bereits ein Administrator!'}), 400
@ -705,9 +1012,8 @@ def create_initial_admin():
role='admin'
)
user.set_password(password)
user.save()
db.session.add(user)
db.session.commit()
app.logger.info(f'Initialer Admin-Benutzer erstellt: {username}')
return jsonify({
@ -745,10 +1051,9 @@ def register_page():
def logout_page():
session_id = flask_session.get('session_id')
if session_id:
session = Session.query.get(session_id)
session = Session.get_by_id(session_id)
if session:
db.session.delete(session)
db.session.commit()
session.delete()
flask_session.pop('session_id', None)

View File

@ -6,16 +6,10 @@
echo "=== MYP Datenbank Initialisierung ==="
echo ""
# Aktiviere virtuelle Umgebung, falls vorhanden
if [ -d "venv" ]; then
echo "Aktiviere virtuelle Python-Umgebung..."
source venv/bin/activate
fi
# Prüfe, ob Flask installiert ist
if ! python -c "import flask" &> /dev/null; then
echo "FEHLER: Flask ist nicht installiert."
echo "Bitte führe zuerst 'pip install -r requirements.txt' aus."
# Prüfe, ob sqlite3 installiert ist
if ! command -v sqlite3 &> /dev/null; then
echo "FEHLER: sqlite3 ist nicht installiert."
echo "Bitte installiere sqlite3 mit deinem Paketmanager, z.B. 'apt install sqlite3'"
exit 1
fi
@ -23,12 +17,58 @@ fi
echo "Erstelle instance-Ordner, falls nicht vorhanden..."
mkdir -p instance/backups
# Initialisiere die Datenbank
echo ""
echo "Initialisiere die Datenbank..."
FLASK_APP=app.py flask db init
FLASK_APP=app.py flask db migrate -m "Initiale Datenbank-Erstellung"
FLASK_APP=app.py flask db upgrade
# Prüfen, ob die Datenbank bereits existiert
if [ -f "instance/myp.db" ]; then
echo "Datenbank existiert bereits."
echo "Erstelle Backup in instance/backups..."
cp instance/myp.db "instance/backups/myp_$(date '+%Y%m%d_%H%M%S').db"
fi
# Erstelle die Datenbank und ihre Tabellen
echo "Erstelle neue Datenbank..."
sqlite3 instance/myp.db <<EOF
PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS user (
id TEXT PRIMARY KEY,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
display_name TEXT,
email TEXT UNIQUE,
role TEXT DEFAULT 'user'
);
CREATE TABLE IF NOT EXISTS session (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
expires_at TIMESTAMP NOT NULL,
FOREIGN KEY (user_id) REFERENCES user (id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS printer (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT NOT NULL,
status INTEGER DEFAULT 0,
ip_address TEXT
);
CREATE TABLE IF NOT EXISTS print_job (
id TEXT PRIMARY KEY,
printer_id TEXT NOT NULL,
user_id TEXT NOT NULL,
start_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
duration_in_minutes INTEGER NOT NULL,
comments TEXT,
aborted INTEGER DEFAULT 0,
abort_reason TEXT,
FOREIGN KEY (printer_id) REFERENCES printer (id) ON DELETE CASCADE,
FOREIGN KEY (user_id) REFERENCES user (id) ON DELETE CASCADE
);
EOF
# Setze Berechtigungen für die Datenbankdatei
chmod 644 instance/myp.db
echo ""
echo "=== Datenbank-Initialisierung abgeschlossen ==="
@ -38,4 +78,7 @@ echo "Der erste registrierte Benutzer wird automatisch zum Admin."
echo ""
echo "Starte den Server mit:"
echo "python app.py"
echo ""
echo "Alternativ kannst du einen Admin-Benutzer über die API erstellen mit:"
echo "curl -X POST http://localhost:5000/api/create-initial-admin -H \"Content-Type: application/json\" -d '{\"username\":\"admin\",\"password\":\"password\",\"displayName\":\"Administrator\"}'"
echo ""

View File

@ -16,6 +16,13 @@ if [[ "$(printf '%s\n' "$required_version" "$python_version" | sort -V | head -n
exit 1
fi
# Prüfe, ob sqlite3 installiert ist
if ! command -v sqlite3 &> /dev/null; then
echo "FEHLER: sqlite3 ist nicht installiert."
echo "Bitte installiere sqlite3 mit deinem Paketmanager, z.B. 'apt install sqlite3'"
exit 1
fi
# Erstelle virtuelle Umgebung
echo ""
echo "Erstelle virtuelle Python-Umgebung..."
@ -34,7 +41,7 @@ echo "Erstelle .env-Datei..."
if [ ! -f .env ]; then
cp .env.example .env
echo "Die .env-Datei wurde aus der Beispieldatei erstellt."
echo "Bitte passe die Konfiguration für GitHub OAuth und Tapo-Steckdosen an."
echo "Bitte passe die Konfiguration an, falls nötig."
else
echo ".env-Datei existiert bereits."
fi
@ -44,24 +51,17 @@ echo ""
echo "Erstelle logs-Ordner..."
mkdir -p logs
# Erstelle Instance und Backup Ordner
echo ""
echo "Erstelle instance-Ordner..."
mkdir -p instance/backups
# Initialisiere die Datenbank
echo ""
echo "Initialisiere die Datenbank..."
FLASK_APP=app.py flask db init
FLASK_APP=app.py flask db migrate -m "Initiale Datenbank-Erstellung"
FLASK_APP=app.py flask db upgrade
bash initialize_myp_database.sh
echo ""
echo "=== Installation abgeschlossen ==="
echo ""
echo "Wichtige Schritte vor dem Start:"
echo "1. Konfiguriere die .env-Datei mit deinen GitHub OAuth-Credentials"
echo "2. Konfiguriere die Tapo-Steckdosen-Zugangsdaten in der .env-Datei"
echo "1. Passe die Konfigurationen in der .env-Datei an"
echo "2. Konfiguriere die Tapo-Steckdosen-Zugangsdaten in der .env-Datei (optional)"
echo "3. Passe die crontab-example an und installiere den Cron-Job (optional)"
echo ""
echo "Starte den Server mit:"

View File

@ -1,7 +1,5 @@
flask==2.3.3
flask-cors==4.0.0
flask-sqlalchemy==3.1.1
flask-migrate==4.0.5
pyjwt==2.8.0
python-dotenv==1.0.0
werkzeug==2.3.7