Implementiere eine Weboberfläche zur Verwaltung von Druckern, Druckaufträgen und Benutzern mit folgenden Funktionen: - Login/Registrierungsseiten - Dashboard mit Überblick - Drucker-Verwaltung (Hinzufügen, Bearbeiten, Löschen) - Auftrags-Verwaltung (Erstellen, Abbrechen, Verlängern) - Benutzer-Verwaltung (nur Admin) - Statistik-Dashboard 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
790 lines
26 KiB
Python
Executable File
790 lines
26 KiB
Python
Executable File
from flask import Flask, request, jsonify, g, redirect, url_for, session as flask_session, render_template, flash
|
|
from flask_cors import CORS
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from flask_migrate import Migrate
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from functools import wraps
|
|
import jwt
|
|
import datetime
|
|
import os
|
|
import json
|
|
import logging
|
|
import uuid
|
|
import asyncio
|
|
from logging.handlers import RotatingFileHandler
|
|
from datetime import timedelta
|
|
from typing import Dict, Any, List, Optional, Union
|
|
from dataclasses import dataclass
|
|
import sqlite3
|
|
from tapo import ApiClient
|
|
from dotenv import load_dotenv
|
|
|
|
# Lade Umgebungsvariablen
|
|
load_dotenv()
|
|
|
|
# Initialisierung
|
|
app = Flask(__name__)
|
|
CORS(app, supports_credentials=True)
|
|
|
|
# Konfiguration
|
|
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev_secret_key')
|
|
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DATABASE_URL', 'sqlite:///myp.db')
|
|
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
|
|
app.config['SESSION_COOKIE_HTTPONLY'] = True
|
|
app.config['SESSION_COOKIE_SECURE'] = os.environ.get('FLASK_ENV') == 'production'
|
|
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
|
|
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=7)
|
|
|
|
# Tapo-Konfiguration
|
|
TAPO_USERNAME = os.environ.get('TAPO_USERNAME')
|
|
TAPO_PASSWORD = os.environ.get('TAPO_PASSWORD')
|
|
TAPO_DEVICES = json.loads(os.environ.get('TAPO_DEVICES', '{}'))
|
|
|
|
# Logging
|
|
if not os.path.exists('logs'):
|
|
os.mkdir('logs')
|
|
file_handler = RotatingFileHandler('logs/myp.log', maxBytes=10240, backupCount=10)
|
|
file_handler.setFormatter(logging.Formatter(
|
|
'%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'
|
|
))
|
|
file_handler.setLevel(logging.INFO)
|
|
app.logger.addHandler(file_handler)
|
|
app.logger.setLevel(logging.INFO)
|
|
app.logger.info('MYP Backend starting up')
|
|
|
|
# DB Setup
|
|
db = SQLAlchemy(app)
|
|
migrate = Migrate(app, db)
|
|
|
|
# Lokale Authentifizierung statt OAuth
|
|
|
|
# Models
|
|
class User(db.Model):
|
|
id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
|
|
username = db.Column(db.String(64), index=True, unique=True)
|
|
password_hash = db.Column(db.String(128))
|
|
display_name = db.Column(db.String(100))
|
|
email = db.Column(db.String(120), index=True, unique=True)
|
|
role = db.Column(db.String(20), default='user') # admin, user, guest
|
|
jobs = db.relationship('PrintJob', backref='user', lazy='dynamic')
|
|
|
|
def set_password(self, password):
|
|
self.password_hash = generate_password_hash(password)
|
|
|
|
def check_password(self, password):
|
|
return check_password_hash(self.password_hash, password)
|
|
|
|
def to_dict(self):
|
|
return {
|
|
'id': self.id,
|
|
'username': self.username,
|
|
'displayName': self.display_name,
|
|
'email': self.email,
|
|
'role': self.role
|
|
}
|
|
|
|
class Session(db.Model):
|
|
id = db.Column(db.String(36), primary_key=True)
|
|
user_id = db.Column(db.String(36), db.ForeignKey('user.id'), nullable=False)
|
|
expires_at = db.Column(db.DateTime, nullable=False)
|
|
user = db.relationship('User', backref=db.backref('sessions', lazy=True))
|
|
|
|
class Printer(db.Model):
|
|
id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
|
|
name = db.Column(db.String(64), index=True, nullable=False)
|
|
description = db.Column(db.Text, nullable=False)
|
|
status = db.Column(db.Integer, default=0) # 0=available, 1=busy, 2=maintenance
|
|
ip_address = db.Column(db.String(15), nullable=True) # IP-Adresse der Tapo-Steckdose
|
|
jobs = db.relationship('PrintJob', backref='printer', lazy='dynamic')
|
|
|
|
def get_latest_job(self):
|
|
return PrintJob.query.filter_by(printer_id=self.id).order_by(PrintJob.start_at.desc()).first()
|
|
|
|
def to_dict(self):
|
|
latest_job = self.get_latest_job()
|
|
return {
|
|
'id': self.id,
|
|
'name': self.name,
|
|
'description': self.description,
|
|
'status': self.status,
|
|
'latestJob': latest_job.to_dict() if latest_job else None
|
|
}
|
|
|
|
class PrintJob(db.Model):
|
|
id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
|
|
printer_id = db.Column(db.String(36), db.ForeignKey('printer.id'), nullable=False)
|
|
user_id = db.Column(db.String(36), db.ForeignKey('user.id'), nullable=False)
|
|
start_at = db.Column(db.DateTime, default=datetime.datetime.utcnow)
|
|
duration_in_minutes = db.Column(db.Integer, nullable=False)
|
|
comments = db.Column(db.Text, nullable=True)
|
|
aborted = db.Column(db.Boolean, default=False)
|
|
abort_reason = db.Column(db.Text, nullable=True)
|
|
|
|
@property
|
|
def end_at(self):
|
|
return self.start_at + timedelta(minutes=self.duration_in_minutes)
|
|
|
|
def remaining_time(self):
|
|
if self.aborted:
|
|
return 0
|
|
|
|
now = datetime.datetime.utcnow()
|
|
if now > self.end_at:
|
|
return 0
|
|
|
|
diff = self.end_at - now
|
|
return int(diff.total_seconds() / 60)
|
|
|
|
def to_dict(self):
|
|
return {
|
|
'id': self.id,
|
|
'printerId': self.printer_id,
|
|
'userId': self.user_id,
|
|
'startAt': self.start_at.isoformat(),
|
|
'durationInMinutes': self.duration_in_minutes,
|
|
'comments': self.comments,
|
|
'aborted': self.aborted,
|
|
'abortReason': self.abort_reason,
|
|
'remainingMinutes': self.remaining_time()
|
|
}
|
|
|
|
# Tapo Steckdosen-Steuerung
|
|
class TapoControl:
|
|
def __init__(self, username, password):
|
|
self.username = username
|
|
self.password = password
|
|
self.clients = {}
|
|
|
|
async def get_client(self, ip_address):
|
|
if ip_address not in self.clients:
|
|
try:
|
|
client = ApiClient(self.username, self.password)
|
|
await client.login()
|
|
self.clients[ip_address] = client
|
|
except Exception as e:
|
|
app.logger.error(f"Fehler bei der Anmeldung an Tapo-Gerät {ip_address}: {e}")
|
|
return None
|
|
return self.clients[ip_address]
|
|
|
|
async def turn_on(self, ip_address):
|
|
client = await self.get_client(ip_address)
|
|
if client:
|
|
try:
|
|
device = await client.p115(ip_address)
|
|
await device.on()
|
|
app.logger.info(f"Tapo-Steckdose {ip_address} eingeschaltet")
|
|
return True
|
|
except Exception as e:
|
|
app.logger.error(f"Fehler beim Einschalten der Tapo-Steckdose {ip_address}: {e}")
|
|
return False
|
|
|
|
async def turn_off(self, ip_address):
|
|
client = await self.get_client(ip_address)
|
|
if client:
|
|
try:
|
|
device = await client.p115(ip_address)
|
|
await device.off()
|
|
app.logger.info(f"Tapo-Steckdose {ip_address} ausgeschaltet")
|
|
return True
|
|
except Exception as e:
|
|
app.logger.error(f"Fehler beim Ausschalten der Tapo-Steckdose {ip_address}: {e}")
|
|
return False
|
|
|
|
tapo_control = TapoControl(TAPO_USERNAME, TAPO_PASSWORD)
|
|
|
|
# Hilfsfunktionen
|
|
def get_current_user():
|
|
session_id = flask_session.get('session_id')
|
|
if not session_id:
|
|
return None
|
|
|
|
session = Session.query.get(session_id)
|
|
if not session or session.expires_at < datetime.datetime.utcnow():
|
|
if session:
|
|
db.session.delete(session)
|
|
db.session.commit()
|
|
flask_session.pop('session_id', None)
|
|
return None
|
|
|
|
return session.user
|
|
|
|
def login_required(f):
|
|
@wraps(f)
|
|
def decorated(*args, **kwargs):
|
|
user = get_current_user()
|
|
if not user:
|
|
return jsonify({'message': 'Authentifizierung erforderlich!'}), 401
|
|
|
|
g.current_user = user
|
|
return f(*args, **kwargs)
|
|
|
|
return decorated
|
|
|
|
def admin_required(f):
|
|
@wraps(f)
|
|
def decorated(*args, **kwargs):
|
|
if not g.current_user or g.current_user.role != 'admin':
|
|
return jsonify({'message': 'Admin-Rechte erforderlich!'}), 403
|
|
return f(*args, **kwargs)
|
|
|
|
return decorated
|
|
|
|
def create_session(user):
|
|
session_id = str(uuid.uuid4())
|
|
expires_at = datetime.datetime.utcnow() + timedelta(days=7)
|
|
|
|
session = Session(id=session_id, user_id=user.id, expires_at=expires_at)
|
|
db.session.add(session)
|
|
db.session.commit()
|
|
|
|
flask_session['session_id'] = session_id
|
|
flask_session.permanent = True
|
|
|
|
return session_id
|
|
|
|
# Authentifizierungs-Routen
|
|
@app.route('/auth/register', methods=['POST'])
|
|
def register():
|
|
data = request.get_json()
|
|
|
|
if not data or not data.get('username') or not data.get('password'):
|
|
return jsonify({'message': 'Benutzername und Passwort sind erforderlich!'}), 400
|
|
|
|
username = data.get('username')
|
|
password = data.get('password')
|
|
display_name = data.get('displayName', username)
|
|
email = data.get('email', '')
|
|
|
|
if User.query.filter_by(username=username).first():
|
|
return jsonify({'message': 'Benutzername bereits vergeben!'}), 400
|
|
|
|
if email and User.query.filter_by(email=email).first():
|
|
return jsonify({'message': 'E-Mail-Adresse bereits registriert!'}), 400
|
|
|
|
# Prüfen, ob es bereits einen Admin gibt
|
|
admin_exists = User.query.filter_by(role='admin').first() is not None
|
|
|
|
# Falls kein Admin existiert, wird der erste Benutzer zum Admin
|
|
role = 'admin' if not admin_exists else 'user'
|
|
|
|
user = User(
|
|
username=username,
|
|
display_name=display_name,
|
|
email=email,
|
|
role=role
|
|
)
|
|
user.set_password(password)
|
|
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
app.logger.info(f'Neuer Benutzer registriert: {username} (Rolle: {role})')
|
|
|
|
# Session erstellen
|
|
create_session(user)
|
|
|
|
return jsonify({
|
|
'message': 'Registrierung erfolgreich!',
|
|
'user': user.to_dict()
|
|
}), 201
|
|
|
|
@app.route('/auth/login', methods=['POST'])
|
|
def login():
|
|
data = request.get_json()
|
|
|
|
if not data or not data.get('username') or not data.get('password'):
|
|
return jsonify({'message': 'Benutzername und Passwort sind erforderlich!'}), 400
|
|
|
|
username = data.get('username')
|
|
password = data.get('password')
|
|
|
|
user = User.query.filter_by(username=username).first()
|
|
|
|
if not user or not user.check_password(password):
|
|
return jsonify({'message': 'Ungültiger Benutzername oder Passwort!'}), 401
|
|
|
|
# Session erstellen
|
|
create_session(user)
|
|
|
|
return jsonify({
|
|
'message': 'Anmeldung erfolgreich!',
|
|
'user': user.to_dict()
|
|
})
|
|
|
|
@app.route('/auth/logout', methods=['POST'])
|
|
def logout():
|
|
session_id = flask_session.get('session_id')
|
|
if session_id:
|
|
session = Session.query.get(session_id)
|
|
if session:
|
|
db.session.delete(session)
|
|
db.session.commit()
|
|
|
|
flask_session.pop('session_id', None)
|
|
|
|
return jsonify({'message': 'Erfolgreich abgemeldet!'}), 200
|
|
|
|
# API-Routen
|
|
@app.route('/api/me', methods=['GET'])
|
|
def get_me():
|
|
user = get_current_user()
|
|
if not user:
|
|
return jsonify({'authenticated': False}), 401
|
|
|
|
return jsonify({
|
|
'authenticated': True,
|
|
'user': user.to_dict()
|
|
})
|
|
|
|
@app.route('/api/printers', methods=['GET'])
|
|
def get_printers():
|
|
printers = Printer.query.all()
|
|
return jsonify([printer.to_dict() for printer in printers])
|
|
|
|
@app.route('/api/printers', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def create_printer():
|
|
data = request.get_json()
|
|
|
|
if not data or not data.get('name') or not data.get('description'):
|
|
return jsonify({'message': 'Name und Beschreibung sind erforderlich!'}), 400
|
|
|
|
printer = Printer(
|
|
name=data.get('name'),
|
|
description=data.get('description'),
|
|
status=data.get('status', 0),
|
|
ip_address=data.get('ipAddress')
|
|
)
|
|
|
|
db.session.add(printer)
|
|
db.session.commit()
|
|
|
|
return jsonify(printer.to_dict()), 201
|
|
|
|
@app.route('/api/printers/<printer_id>', methods=['GET'])
|
|
def get_printer(printer_id):
|
|
printer = Printer.query.get_or_404(printer_id)
|
|
return jsonify(printer.to_dict())
|
|
|
|
@app.route('/api/printers/<printer_id>', methods=['PUT'])
|
|
@login_required
|
|
@admin_required
|
|
def update_printer(printer_id):
|
|
printer = Printer.query.get_or_404(printer_id)
|
|
data = request.get_json()
|
|
|
|
if data.get('name'):
|
|
printer.name = data['name']
|
|
if data.get('description'):
|
|
printer.description = data['description']
|
|
if 'status' in data:
|
|
printer.status = data['status']
|
|
if data.get('ipAddress'):
|
|
printer.ip_address = data['ipAddress']
|
|
|
|
db.session.commit()
|
|
return jsonify(printer.to_dict())
|
|
|
|
@app.route('/api/printers/<printer_id>', methods=['DELETE'])
|
|
@login_required
|
|
@admin_required
|
|
def delete_printer(printer_id):
|
|
printer = Printer.query.get_or_404(printer_id)
|
|
db.session.delete(printer)
|
|
db.session.commit()
|
|
return jsonify({'message': 'Drucker gelöscht!'})
|
|
|
|
@app.route('/api/jobs', methods=['GET'])
|
|
@login_required
|
|
def get_jobs():
|
|
# Admins sehen alle Jobs, normale Benutzer nur ihre eigenen
|
|
if g.current_user.role == 'admin':
|
|
jobs = PrintJob.query.all()
|
|
else:
|
|
jobs = PrintJob.query.filter_by(user_id=g.current_user.id).all()
|
|
|
|
return jsonify([job.to_dict() for job in jobs])
|
|
|
|
@app.route('/api/jobs', methods=['POST'])
|
|
@login_required
|
|
def create_job():
|
|
data = request.get_json()
|
|
|
|
if not data or not data.get('printerId') or not data.get('durationInMinutes'):
|
|
return jsonify({'message': 'Drucker-ID und Dauer sind erforderlich!'}), 400
|
|
|
|
printer = Printer.query.get_or_404(data['printerId'])
|
|
|
|
if printer.status != 0: # 0 = available
|
|
return jsonify({'message': 'Drucker ist nicht verfügbar!'}), 400
|
|
|
|
duration = int(data['durationInMinutes'])
|
|
|
|
job = PrintJob(
|
|
printer_id=printer.id,
|
|
user_id=g.current_user.id,
|
|
duration_in_minutes=duration,
|
|
comments=data.get('comments', '')
|
|
)
|
|
|
|
# Drucker als belegt markieren
|
|
printer.status = 1 # 1 = busy
|
|
|
|
db.session.add(job)
|
|
db.session.commit()
|
|
|
|
# Steckdose einschalten, falls IP-Adresse hinterlegt ist
|
|
if printer.ip_address:
|
|
try:
|
|
asyncio.run(tapo_control.turn_on(printer.ip_address))
|
|
except Exception as e:
|
|
app.logger.error(f"Fehler beim Einschalten der Steckdose: {e}")
|
|
|
|
return jsonify(job.to_dict()), 201
|
|
|
|
@app.route('/api/jobs/<job_id>', methods=['GET'])
|
|
@login_required
|
|
def get_job(job_id):
|
|
# Admins können alle Jobs sehen, Benutzer nur ihre eigenen
|
|
if g.current_user.role == 'admin':
|
|
job = PrintJob.query.get_or_404(job_id)
|
|
else:
|
|
job = PrintJob.query.filter_by(id=job_id, user_id=g.current_user.id).first_or_404()
|
|
|
|
return jsonify(job.to_dict())
|
|
|
|
@app.route('/api/jobs/<job_id>/abort', methods=['POST'])
|
|
@login_required
|
|
def abort_job(job_id):
|
|
# Admins können alle Jobs abbrechen, Benutzer nur ihre eigenen
|
|
if g.current_user.role == 'admin':
|
|
job = PrintJob.query.get_or_404(job_id)
|
|
else:
|
|
job = PrintJob.query.filter_by(id=job_id, user_id=g.current_user.id).first_or_404()
|
|
|
|
data = request.get_json()
|
|
|
|
job.aborted = True
|
|
job.abort_reason = data.get('reason', '')
|
|
|
|
# Drucker wieder verfügbar machen
|
|
printer = job.printer
|
|
printer.status = 0 # 0 = available
|
|
|
|
db.session.commit()
|
|
|
|
# Steckdose ausschalten, falls IP-Adresse hinterlegt ist
|
|
if printer.ip_address:
|
|
try:
|
|
asyncio.run(tapo_control.turn_off(printer.ip_address))
|
|
except Exception as e:
|
|
app.logger.error(f"Fehler beim Ausschalten der Steckdose: {e}")
|
|
|
|
return jsonify(job.to_dict())
|
|
|
|
@app.route('/api/jobs/<job_id>/finish', methods=['POST'])
|
|
@login_required
|
|
def finish_job(job_id):
|
|
# Admins können alle Jobs beenden, Benutzer nur ihre eigenen
|
|
if g.current_user.role == 'admin':
|
|
job = PrintJob.query.get_or_404(job_id)
|
|
else:
|
|
job = PrintJob.query.filter_by(id=job_id, user_id=g.current_user.id).first_or_404()
|
|
|
|
# Aktuelle Zeit als Ende setzen
|
|
now = datetime.datetime.utcnow()
|
|
actual_duration = int((now - job.start_at).total_seconds() / 60)
|
|
job.duration_in_minutes = actual_duration
|
|
|
|
# Drucker wieder verfügbar machen
|
|
printer = job.printer
|
|
printer.status = 0 # 0 = available
|
|
|
|
db.session.commit()
|
|
|
|
# Steckdose ausschalten, falls IP-Adresse hinterlegt ist
|
|
if printer.ip_address:
|
|
try:
|
|
asyncio.run(tapo_control.turn_off(printer.ip_address))
|
|
except Exception as e:
|
|
app.logger.error(f"Fehler beim Ausschalten der Steckdose: {e}")
|
|
|
|
return jsonify(job.to_dict())
|
|
|
|
@app.route('/api/jobs/<job_id>/extend', methods=['POST'])
|
|
@login_required
|
|
def extend_job(job_id):
|
|
# Admins können alle Jobs verlängern, Benutzer nur ihre eigenen
|
|
if g.current_user.role == 'admin':
|
|
job = PrintJob.query.get_or_404(job_id)
|
|
else:
|
|
job = PrintJob.query.filter_by(id=job_id, user_id=g.current_user.id).first_or_404()
|
|
|
|
data = request.get_json()
|
|
minutes = int(data.get('minutes', 0))
|
|
hours = int(data.get('hours', 0))
|
|
|
|
additional_minutes = minutes + (hours * 60)
|
|
if additional_minutes <= 0:
|
|
return jsonify({'message': 'Ungültige Verlängerungszeit!'}), 400
|
|
|
|
job.duration_in_minutes += additional_minutes
|
|
|
|
db.session.commit()
|
|
|
|
return jsonify(job.to_dict())
|
|
|
|
@app.route('/api/jobs/<job_id>/comments', methods=['PUT'])
|
|
@login_required
|
|
def update_job_comments(job_id):
|
|
# Admins können alle Jobs bearbeiten, Benutzer nur ihre eigenen
|
|
if g.current_user.role == 'admin':
|
|
job = PrintJob.query.get_or_404(job_id)
|
|
else:
|
|
job = PrintJob.query.filter_by(id=job_id, user_id=g.current_user.id).first_or_404()
|
|
|
|
data = request.get_json()
|
|
job.comments = data.get('comments', '')
|
|
|
|
db.session.commit()
|
|
|
|
return jsonify(job.to_dict())
|
|
|
|
@app.route('/api/job/<job_id>/remaining-time', methods=['GET'])
|
|
def job_remaining_time(job_id):
|
|
job = PrintJob.query.get_or_404(job_id)
|
|
|
|
remaining = job.remaining_time()
|
|
return jsonify({'remaining_minutes': remaining})
|
|
|
|
@app.route('/api/users', methods=['GET'])
|
|
@login_required
|
|
@admin_required
|
|
def get_users():
|
|
users = User.query.all()
|
|
return jsonify([user.to_dict() for user in users])
|
|
|
|
@app.route('/api/users/<user_id>', methods=['GET'])
|
|
@login_required
|
|
@admin_required
|
|
def get_user(user_id):
|
|
user = User.query.get_or_404(user_id)
|
|
return jsonify(user.to_dict())
|
|
|
|
@app.route('/api/users/<user_id>', methods=['PUT'])
|
|
@login_required
|
|
@admin_required
|
|
def update_user(user_id):
|
|
user = User.query.get_or_404(user_id)
|
|
data = request.get_json()
|
|
|
|
if data.get('username'):
|
|
user.username = data['username']
|
|
if data.get('displayName'):
|
|
user.display_name = data['displayName']
|
|
if data.get('email'):
|
|
user.email = data['email']
|
|
if data.get('role'):
|
|
user.role = data['role']
|
|
|
|
db.session.commit()
|
|
return jsonify(user.to_dict())
|
|
|
|
@app.route('/api/users/<user_id>', methods=['DELETE'])
|
|
@login_required
|
|
@admin_required
|
|
def delete_user(user_id):
|
|
user = User.query.get_or_404(user_id)
|
|
|
|
# Löschen aller Sessions des Benutzers
|
|
Session.query.filter_by(user_id=user.id).delete()
|
|
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
return jsonify({'message': 'Benutzer gelöscht!'})
|
|
|
|
@app.route('/api/stats', methods=['GET'])
|
|
@login_required
|
|
@admin_required
|
|
def stats():
|
|
# Drucker-Nutzungsstatistiken
|
|
total_printers = Printer.query.count()
|
|
available_printers = Printer.query.filter_by(status=0).count()
|
|
|
|
# Job-Statistiken
|
|
total_jobs = PrintJob.query.count()
|
|
active_jobs = PrintJob.query.filter(
|
|
PrintJob.aborted == False,
|
|
PrintJob.start_at + timedelta(minutes=PrintJob.duration_in_minutes) > datetime.datetime.utcnow()
|
|
).count()
|
|
completed_jobs = PrintJob.query.filter(
|
|
PrintJob.aborted == False,
|
|
PrintJob.start_at + timedelta(minutes=PrintJob.duration_in_minutes) <= datetime.datetime.utcnow()
|
|
).count()
|
|
|
|
# Benutzerstatistiken
|
|
total_users = User.query.count()
|
|
|
|
# Durchschnittliche Druckdauer
|
|
avg_duration_result = db.session.query(db.func.avg(PrintJob.duration_in_minutes)).first()
|
|
avg_duration = int(avg_duration_result[0]) if avg_duration_result[0] else 0
|
|
|
|
return jsonify({
|
|
'printers': {
|
|
'total': total_printers,
|
|
'available': available_printers,
|
|
'utilization_rate': (total_printers - available_printers) / total_printers if total_printers > 0 else 0
|
|
},
|
|
'jobs': {
|
|
'total': total_jobs,
|
|
'active': active_jobs,
|
|
'completed': completed_jobs,
|
|
'avg_duration': avg_duration
|
|
},
|
|
'users': {
|
|
'total': total_users
|
|
}
|
|
})
|
|
|
|
# Tägliche Überprüfung der Jobs und automatische Abschaltung der Steckdosen
|
|
@app.cli.command("check-jobs")
|
|
def check_jobs():
|
|
"""Überprüft abgelaufene Jobs und schaltet Steckdosen aus."""
|
|
now = datetime.datetime.utcnow()
|
|
|
|
# Abgelaufene Jobs finden
|
|
expired_jobs = PrintJob.query.filter(
|
|
PrintJob.aborted == False,
|
|
PrintJob.start_at + timedelta(minutes=PrintJob.duration_in_minutes) <= now
|
|
).all()
|
|
|
|
for job in expired_jobs:
|
|
printer = job.printer
|
|
|
|
# Drucker-Status auf verfügbar setzen
|
|
if printer.status == 1: # busy
|
|
printer.status = 0 # available
|
|
app.logger.info(f"Job {job.id} abgelaufen. Drucker {printer.id} auf verfügbar gesetzt.")
|
|
|
|
# Steckdose ausschalten, falls IP-Adresse hinterlegt ist
|
|
if printer.ip_address:
|
|
try:
|
|
asyncio.run(tapo_control.turn_off(printer.ip_address))
|
|
app.logger.info(f"Steckdose {printer.ip_address} für abgelaufenen Job {job.id} ausgeschaltet.")
|
|
except Exception as e:
|
|
app.logger.error(f"Fehler beim Ausschalten der Steckdose {printer.ip_address}: {e}")
|
|
|
|
db.session.commit()
|
|
app.logger.info(f"{len(expired_jobs)} abgelaufene Jobs überprüft und Steckdosen aktualisiert.")
|
|
|
|
@app.route('/api/test', methods=['GET'])
|
|
def test():
|
|
return jsonify({'message': 'MYP Backend API funktioniert!'})
|
|
|
|
@app.route('/api/create-initial-admin', methods=['POST'])
|
|
def create_initial_admin():
|
|
admin_exists = User.query.filter_by(role='admin').first() is not None
|
|
|
|
if admin_exists:
|
|
return jsonify({'message': 'Es existiert bereits ein Administrator!'}), 400
|
|
|
|
data = request.get_json()
|
|
|
|
if not data or not data.get('username') or not data.get('password'):
|
|
return jsonify({'message': 'Benutzername und Passwort sind erforderlich!'}), 400
|
|
|
|
username = data.get('username')
|
|
password = data.get('password')
|
|
display_name = data.get('displayName', username)
|
|
email = data.get('email', '')
|
|
|
|
user = User(
|
|
username=username,
|
|
display_name=display_name,
|
|
email=email,
|
|
role='admin'
|
|
)
|
|
user.set_password(password)
|
|
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
app.logger.info(f'Initialer Admin-Benutzer erstellt: {username}')
|
|
|
|
return jsonify({
|
|
'message': 'Administrator wurde erfolgreich erstellt!',
|
|
'user': user.to_dict()
|
|
}), 201
|
|
|
|
# Error Handler
|
|
@app.errorhandler(404)
|
|
def not_found(error):
|
|
return jsonify({'message': 'Nicht gefunden!'}), 404
|
|
|
|
@app.errorhandler(500)
|
|
def server_error(error):
|
|
app.logger.error(f'Serverfehler: {error}')
|
|
return jsonify({'message': 'Interner Serverfehler!'}), 500
|
|
|
|
# Web UI Routen
|
|
@app.route('/')
|
|
def index():
|
|
current_user = get_current_user()
|
|
if current_user:
|
|
return render_template('dashboard.html', current_user=current_user, active_page='home')
|
|
return redirect(url_for('login_page'))
|
|
|
|
@app.route('/login')
|
|
def login_page():
|
|
return render_template('login.html', active_page='login')
|
|
|
|
@app.route('/register')
|
|
def register_page():
|
|
return render_template('register.html', active_page='register')
|
|
|
|
@app.route('/logout')
|
|
def logout_page():
|
|
session_id = flask_session.get('session_id')
|
|
if session_id:
|
|
session = Session.query.get(session_id)
|
|
if session:
|
|
db.session.delete(session)
|
|
db.session.commit()
|
|
|
|
flask_session.pop('session_id', None)
|
|
|
|
flash('Sie wurden erfolgreich abgemeldet.', 'success')
|
|
return redirect(url_for('login_page'))
|
|
|
|
@app.route('/admin/printers')
|
|
def printers_page():
|
|
current_user = get_current_user()
|
|
if not current_user:
|
|
return redirect(url_for('login_page'))
|
|
return render_template('printers.html', current_user=current_user, active_page='printers')
|
|
|
|
@app.route('/admin/jobs')
|
|
def jobs_page():
|
|
current_user = get_current_user()
|
|
if not current_user:
|
|
return redirect(url_for('login_page'))
|
|
return render_template('jobs.html', current_user=current_user, active_page='jobs')
|
|
|
|
@app.route('/admin/users')
|
|
def users_page():
|
|
current_user = get_current_user()
|
|
if not current_user or current_user.role != 'admin':
|
|
flash('Sie haben keine Berechtigung, diese Seite zu besuchen.', 'danger')
|
|
return redirect(url_for('index'))
|
|
return render_template('users.html', current_user=current_user, active_page='users')
|
|
|
|
@app.route('/admin/stats')
|
|
def stats_page():
|
|
current_user = get_current_user()
|
|
if not current_user or current_user.role != 'admin':
|
|
flash('Sie haben keine Berechtigung, diese Seite zu besuchen.', 'danger')
|
|
return redirect(url_for('index'))
|
|
return render_template('stats.html', current_user=current_user, active_page='stats')
|
|
|
|
# Server starten
|
|
if __name__ == '__main__':
|
|
app.run(debug=True, host='0.0.0.0') |