root ea4b903d63 Ersetze tapo durch PyP100
- Ersetzt tapo==0.8.1 mit PyP100==0.0.19 in requirements.txt
- Ändert Import von ApiClient zu PyP100
- Implementiert TapoControl Klasse neu mit PyP100 API
- Entfernt async/await-Aufrufe, da PyP100 synchrone API benutzt

🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-03-12 10:48:23 +01:00

1095 lines
35 KiB
Python
Executable File

from flask import Flask, request, jsonify, g, redirect, url_for, session as flask_session, render_template, flash
from flask_cors import CORS
from werkzeug.security import generate_password_hash, check_password_hash
from functools import wraps
import jwt
import datetime
import os
import json
import logging
import uuid
import asyncio
import sqlite3
from logging.handlers import RotatingFileHandler
from datetime import timedelta
from typing import Dict, Any, List, Optional, Union
from dataclasses import dataclass
from PyP100 import PyP100
from dotenv import load_dotenv
# Load environment variables via python-dotenv
load_dotenv()

# Create the Flask application and allow credentialed cross-origin requests
app = Flask(__name__)
CORS(app, supports_credentials=True)

# Application configuration; environment variables override the defaults
app.config.update(
    SECRET_KEY=os.environ.get('SECRET_KEY', 'dev_secret_key'),
    DATABASE=os.environ.get('DATABASE_PATH', 'instance/myp.db'),
    SESSION_COOKIE_HTTPONLY=True,
    # Secure cookies are only switched on when FLASK_ENV is "production"
    SESSION_COOKIE_SECURE=os.environ.get('FLASK_ENV') == 'production',
    SESSION_COOKIE_SAMESITE='Lax',
    PERMANENT_SESSION_LIFETIME=timedelta(days=7),
)

# Tapo smart-plug configuration
TAPO_USERNAME = os.environ.get('TAPO_USERNAME')
TAPO_PASSWORD = os.environ.get('TAPO_PASSWORD')
# TAPO_DEVICES is parsed as JSON and defaults to an empty object
TAPO_DEVICES = json.loads(os.environ.get('TAPO_DEVICES', '{}'))
# Logging: rotating log file under ./logs (10 KiB per file, 10 backups).
# makedirs(exist_ok=True) replaces the exists()/mkdir() pair, which could raise
# FileExistsError when two processes start at the same time.
os.makedirs('logs', exist_ok=True)
file_handler = RotatingFileHandler('logs/myp.log', maxBytes=10240, backupCount=10)
file_handler.setFormatter(logging.Formatter(
    '%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'
))
file_handler.setLevel(logging.INFO)
app.logger.addHandler(file_handler)
app.logger.setLevel(logging.INFO)
app.logger.info('MYP Backend starting up')
# Database functions
def get_db():
    """Return the SQLite connection bound to the current application context.

    The connection is created on first use, stored on ``flask.g`` and reused
    for the rest of the context. Rows are returned as ``sqlite3.Row`` so
    columns can be accessed by name.
    """
    if 'db' not in g:
        # Make sure the directory of the database file exists. A bare file
        # name has an empty dirname and os.makedirs('') raises, so the
        # directory is only created when the path actually contains one.
        db_dir = os.path.dirname(app.config['DATABASE'])
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)
        g.db = sqlite3.connect(app.config['DATABASE'])
        g.db.row_factory = sqlite3.Row
        # SQLite foreign-key enforcement is a per-connection setting, so it
        # has to be enabled here for the ON DELETE CASCADE clauses in the
        # schema to take effect on request connections as well.
        g.db.execute('PRAGMA foreign_keys = ON')
    return g.db
def close_db(e=None):
    """Close the context's database connection, if one was opened.

    ``e`` is accepted so the function can be used as a teardown handler; it
    is not used.
    """
    connection = g.pop('db', None)
    if connection is None:
        return
    connection.close()
def init_db():
    """Create the application tables if they do not exist yet."""
    connection = get_db()
    # Enable SQLite foreign-key support on this connection
    connection.execute('PRAGMA foreign_keys = ON')
    # Table definitions; every statement is CREATE TABLE IF NOT EXISTS
    schema = '''
CREATE TABLE IF NOT EXISTS user (
id TEXT PRIMARY KEY,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
display_name TEXT,
email TEXT UNIQUE,
role TEXT DEFAULT 'user'
);
CREATE TABLE IF NOT EXISTS session (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
expires_at TIMESTAMP NOT NULL,
FOREIGN KEY (user_id) REFERENCES user (id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS printer (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT NOT NULL,
status INTEGER DEFAULT 0,
ip_address TEXT
);
CREATE TABLE IF NOT EXISTS print_job (
id TEXT PRIMARY KEY,
printer_id TEXT NOT NULL,
user_id TEXT NOT NULL,
start_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
duration_in_minutes INTEGER NOT NULL,
comments TEXT,
aborted INTEGER DEFAULT 0,
abort_reason TEXT,
FOREIGN KEY (printer_id) REFERENCES printer (id) ON DELETE CASCADE,
FOREIGN KEY (user_id) REFERENCES user (id) ON DELETE CASCADE
);
'''
    connection.executescript(schema)
    connection.commit()
# Register the teardown handler before the start-up context is used, so the
# connection opened by init_db() is closed when that context is popped.
app.teardown_appcontext(close_db)
# Initialise the database when the application starts
with app.app_context():
    init_db()
class User:
    """Row wrapper for the ``user`` table with simple CRUD helpers."""

    def __init__(self, **kwargs):
        """Build a user from keyword arguments.

        Recognised keys: ``id`` (defaults to a fresh UUID4 string),
        ``username``, ``password_hash``, ``display_name``, ``email`` and
        ``role`` (defaults to ``'user'``).
        """
        self.id = kwargs.get('id', str(uuid.uuid4()))
        self.username = kwargs.get('username')
        self.password_hash = kwargs.get('password_hash')
        self.display_name = kwargs.get('display_name')
        self.email = kwargs.get('email')
        self.role = kwargs.get('role', 'user')

    @staticmethod
    def from_row(row):
        """Build a user from a mapping keyed by the ``user`` column names."""
        return User(
            id=row['id'],
            username=row['username'],
            password_hash=row['password_hash'],
            display_name=row['display_name'],
            email=row['email'],
            role=row['role'],
        )

    @staticmethod
    def get_by_id(user_id):
        """Return the user with the given id, or ``None`` if there is none."""
        record = get_db().execute('SELECT * FROM user WHERE id = ?', (user_id,)).fetchone()
        return User.from_row(record) if record else None

    @staticmethod
    def get_by_username(username):
        """Return the user with the given username, or ``None``."""
        record = get_db().execute('SELECT * FROM user WHERE username = ?', (username,)).fetchone()
        return User.from_row(record) if record else None

    @staticmethod
    def get_all():
        """Return all users as a list."""
        records = get_db().execute('SELECT * FROM user').fetchall()
        return list(map(User.from_row, records))

    def set_password(self, password):
        """Store a hash of ``password`` on this object (not persisted)."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return whether ``password`` matches the stored hash."""
        return check_password_hash(self.password_hash, password)

    def save(self):
        """Insert this user as a new row, commit, and return ``self``."""
        connection = get_db()
        values = (self.id, self.username, self.password_hash, self.display_name, self.email, self.role)
        connection.execute(
            'INSERT INTO user (id, username, password_hash, display_name, email, role) VALUES (?, ?, ?, ?, ?, ?)',
            values,
        )
        connection.commit()
        return self

    def update(self):
        """Write all fields back to the row with this id, commit, return ``self``."""
        connection = get_db()
        values = (self.username, self.password_hash, self.display_name, self.email, self.role, self.id)
        connection.execute(
            'UPDATE user SET username = ?, password_hash = ?, display_name = ?, email = ?, role = ? WHERE id = ?',
            values,
        )
        connection.commit()
        return self

    def delete(self):
        """Delete the row with this id and commit."""
        connection = get_db()
        connection.execute('DELETE FROM user WHERE id = ?', (self.id,))
        connection.commit()

    def to_dict(self):
        """Return the JSON-facing representation (the password hash is omitted)."""
        payload = {}
        payload['id'] = self.id
        payload['username'] = self.username
        payload['displayName'] = self.display_name
        payload['email'] = self.email
        payload['role'] = self.role
        return payload
class Session:
    """Row wrapper for the ``session`` table (server-side login sessions)."""

    def __init__(self, **kwargs):
        """Build a session from ``id`` (defaults to a new UUID4 string),
        ``user_id`` and ``expires_at``."""
        self.id = kwargs.get('id', str(uuid.uuid4()))
        self.user_id = kwargs.get('user_id')
        self.expires_at = kwargs.get('expires_at')

    @staticmethod
    def from_row(row):
        """Build a session from a mapping keyed by the ``session`` columns.

        ``expires_at`` is parsed from its ISO-8601 text form.
        """
        return Session(
            id=row['id'],
            user_id=row['user_id'],
            expires_at=datetime.datetime.fromisoformat(row['expires_at']),
        )

    @staticmethod
    def get_by_id(session_id):
        """Return the session with the given id, or ``None``."""
        record = get_db().execute('SELECT * FROM session WHERE id = ?', (session_id,)).fetchone()
        return Session.from_row(record) if record else None

    @staticmethod
    def delete_by_user_id(user_id):
        """Delete every session belonging to ``user_id`` and commit."""
        connection = get_db()
        connection.execute('DELETE FROM session WHERE user_id = ?', (user_id,))
        connection.commit()

    def save(self):
        """Insert this session, commit, and return ``self``.

        ``expires_at`` is stored in ISO-8601 text form.
        """
        connection = get_db()
        values = (self.id, self.user_id, self.expires_at.isoformat())
        connection.execute(
            'INSERT INTO session (id, user_id, expires_at) VALUES (?, ?, ?)',
            values,
        )
        connection.commit()
        return self

    def delete(self):
        """Delete the row with this id and commit."""
        connection = get_db()
        connection.execute('DELETE FROM session WHERE id = ?', (self.id,))
        connection.commit()

    @property
    def user(self):
        """The ``User`` this session belongs to (looked up on each access)."""
        return User.get_by_id(self.user_id)
class Printer:
    """Row wrapper for the ``printer`` table with simple CRUD helpers."""

    def __init__(self, **kwargs):
        """Build a printer from ``id`` (defaults to a new UUID4 string),
        ``name``, ``description``, ``status`` (defaults to 0) and
        ``ip_address``."""
        self.id = kwargs.get('id', str(uuid.uuid4()))
        self.name = kwargs.get('name')
        self.description = kwargs.get('description')
        self.status = kwargs.get('status', 0)
        self.ip_address = kwargs.get('ip_address')

    @staticmethod
    def from_row(row):
        """Build a printer from a mapping keyed by the ``printer`` columns."""
        return Printer(
            id=row['id'],
            name=row['name'],
            description=row['description'],
            status=row['status'],
            ip_address=row['ip_address'],
        )

    @staticmethod
    def get_by_id(printer_id):
        """Return the printer with the given id, or ``None``."""
        record = get_db().execute('SELECT * FROM printer WHERE id = ?', (printer_id,)).fetchone()
        return Printer.from_row(record) if record else None

    @staticmethod
    def get_all():
        """Return all printers as a list."""
        records = get_db().execute('SELECT * FROM printer').fetchall()
        return list(map(Printer.from_row, records))

    def save(self):
        """Insert this printer as a new row, commit, and return ``self``."""
        connection = get_db()
        values = (self.id, self.name, self.description, self.status, self.ip_address)
        connection.execute(
            'INSERT INTO printer (id, name, description, status, ip_address) VALUES (?, ?, ?, ?, ?)',
            values,
        )
        connection.commit()
        return self

    def update(self):
        """Write all fields back to the row with this id, commit, return ``self``."""
        connection = get_db()
        values = (self.name, self.description, self.status, self.ip_address, self.id)
        connection.execute(
            'UPDATE printer SET name = ?, description = ?, status = ?, ip_address = ? WHERE id = ?',
            values,
        )
        connection.commit()
        return self

    def delete(self):
        """Delete the row with this id and commit."""
        connection = get_db()
        connection.execute('DELETE FROM printer WHERE id = ?', (self.id,))
        connection.commit()

    def get_latest_job(self):
        """Return the job with the newest ``start_at`` for this printer, or ``None``."""
        record = get_db().execute('''
SELECT * FROM print_job
WHERE printer_id = ?
ORDER BY start_at DESC
LIMIT 1
''', (self.id,)).fetchone()
        if not record:
            return None
        return PrintJob.from_row(record)

    def to_dict(self):
        """Return the JSON-facing representation, including the latest job.

        The plug IP address is not part of the output.
        """
        newest = self.get_latest_job()
        newest_payload = None
        if newest:
            newest_payload = newest.to_dict()
        return {
            'id': self.id,
            'name': self.name,
            'description': self.description,
            'status': self.status,
            'latestJob': newest_payload
        }
class PrintJob:
    """Row wrapper for the ``print_job`` table with simple CRUD helpers.

    Timestamps are handled as naive ``datetime`` objects taken from
    ``datetime.utcnow()`` and stored in ISO-8601 text form.
    """

    def __init__(self, **kwargs):
        """Build a job from keyword arguments.

        Recognised keys: ``id`` (defaults to a new UUID4 string),
        ``printer_id``, ``user_id``, ``start_at`` (defaults to the current
        UTC time), ``duration_in_minutes``, ``comments``, ``aborted``
        (defaults to ``False``) and ``abort_reason``.
        """
        self.id = kwargs.get('id', str(uuid.uuid4()))
        self.printer_id = kwargs.get('printer_id')
        self.user_id = kwargs.get('user_id')
        self.start_at = kwargs.get('start_at', datetime.datetime.utcnow())
        self.duration_in_minutes = kwargs.get('duration_in_minutes')
        self.comments = kwargs.get('comments')
        self.aborted = kwargs.get('aborted', False)
        self.abort_reason = kwargs.get('abort_reason')

    @staticmethod
    def from_row(row):
        """Build a job from a mapping keyed by the ``print_job`` columns.

        ``start_at`` is parsed from ISO-8601 text and ``aborted`` is
        converted to ``bool``.
        """
        return PrintJob(
            id=row['id'],
            printer_id=row['printer_id'],
            user_id=row['user_id'],
            start_at=datetime.datetime.fromisoformat(row['start_at']),
            duration_in_minutes=row['duration_in_minutes'],
            comments=row['comments'],
            aborted=bool(row['aborted']),
            abort_reason=row['abort_reason'],
        )

    @staticmethod
    def get_by_id(job_id):
        """Return the job with the given id, or ``None``."""
        record = get_db().execute('SELECT * FROM print_job WHERE id = ?', (job_id,)).fetchone()
        return PrintJob.from_row(record) if record else None

    @staticmethod
    def get_by_user_id(user_id):
        """Return all jobs created by ``user_id``."""
        records = get_db().execute('SELECT * FROM print_job WHERE user_id = ?', (user_id,)).fetchall()
        return list(map(PrintJob.from_row, records))

    @staticmethod
    def get_all():
        """Return all jobs as a list."""
        records = get_db().execute('SELECT * FROM print_job').fetchall()
        return list(map(PrintJob.from_row, records))

    @staticmethod
    def get_expired_jobs():
        """Return every non-aborted job whose start time plus duration is
        at or before the current UTC time."""
        cutoff = datetime.datetime.utcnow().isoformat()
        records = get_db().execute('''
SELECT * FROM print_job
WHERE aborted = 0
AND datetime(start_at, '+' || duration_in_minutes || ' minutes') <= datetime(?)
''', (cutoff,)).fetchall()
        return list(map(PrintJob.from_row, records))

    def save(self):
        """Insert this job as a new row, commit, and return ``self``."""
        connection = get_db()
        values = (
            self.id, self.printer_id, self.user_id, self.start_at.isoformat(),
            self.duration_in_minutes, self.comments, int(self.aborted), self.abort_reason,
        )
        connection.execute(
            '''INSERT INTO print_job
(id, printer_id, user_id, start_at, duration_in_minutes, comments, aborted, abort_reason)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)''',
            values,
        )
        connection.commit()
        return self

    def update(self):
        """Write all fields back to the row with this id, commit, return ``self``."""
        connection = get_db()
        values = (
            self.printer_id, self.user_id, self.start_at.isoformat(), self.duration_in_minutes,
            self.comments, int(self.aborted), self.abort_reason, self.id,
        )
        connection.execute(
            '''UPDATE print_job SET
printer_id = ?, user_id = ?, start_at = ?, duration_in_minutes = ?,
comments = ?, aborted = ?, abort_reason = ?
WHERE id = ?''',
            values,
        )
        connection.commit()
        return self

    def delete(self):
        """Delete the row with this id and commit."""
        connection = get_db()
        connection.execute('DELETE FROM print_job WHERE id = ?', (self.id,))
        connection.commit()

    @property
    def end_at(self):
        """Scheduled end: ``start_at`` plus ``duration_in_minutes``."""
        return self.start_at + timedelta(minutes=self.duration_in_minutes)

    def remaining_time(self):
        """Return the whole minutes left until ``end_at``.

        Aborted jobs and jobs whose end has passed report 0.
        """
        if self.aborted:
            return 0
        left = self.end_at - datetime.datetime.utcnow()
        if left < timedelta(0):
            return 0
        return int(left.total_seconds() / 60)

    def to_dict(self):
        """Return the JSON-facing representation, including remaining minutes."""
        payload = {
            'id': self.id,
            'printerId': self.printer_id,
            'userId': self.user_id,
            'startAt': self.start_at.isoformat(),
            'durationInMinutes': self.duration_in_minutes,
            'comments': self.comments,
            'aborted': self.aborted,
            'abortReason': self.abort_reason,
        }
        payload['remainingMinutes'] = self.remaining_time()
        return payload
# TP-Link smart-plug control via PyP100
class TapoControl:
    """Switch Tapo P100 plugs on and off, caching one logged-in handle per IP.

    All methods swallow device errors, log them, and report failure through
    their return value, so callers never see an exception from the plug.
    """

    def __init__(self, username, password):
        """Remember the account credentials; no connection is made yet."""
        self.username = username
        self.password = password
        # ip_address -> logged-in PyP100.P100 handle
        self.devices = {}

    def get_device(self, ip_address):
        """Return a logged-in handle for ``ip_address``, or ``None`` on failure.

        A handle is created (handshake + login) on first use and cached.
        """
        device = self.devices.get(ip_address)
        if device is None:
            try:
                device = PyP100.P100(ip_address, self.username, self.password)
                device.handshake()  # creates the required cookies
                device.login()  # sends the credentials and sets up the AES key
            except Exception as e:
                app.logger.error(f"Fehler bei der Anmeldung an P100-Gerät {ip_address}: {e}")
                return None
            self.devices[ip_address] = device
            app.logger.info(f"PyP100 Verbindung zu {ip_address} hergestellt")
        return device

    def _switch(self, ip_address, method_name, action_noun, state_word):
        """Call ``method_name`` on the plug and return whether it succeeded.

        A handle that fails is removed from the cache; otherwise a handle
        that has become unusable would be reused forever and the plug would
        stay unreachable until the process restarts. If the failing handle
        came from the cache, the call is retried once with a fresh login.
        """
        attempts = 2 if ip_address in self.devices else 1
        for remaining in range(attempts - 1, -1, -1):
            device = self.get_device(ip_address)
            if device is None:
                return False
            try:
                getattr(device, method_name)()
            except Exception as e:
                self.devices.pop(ip_address, None)
                if remaining == 0:
                    app.logger.error(f"Fehler beim {action_noun} der P100-Steckdose {ip_address}: {e}")
                    return False
            else:
                app.logger.info(f"P100-Steckdose {ip_address} {state_word}")
                return True
        return False

    def turn_on(self, ip_address):
        """Switch the plug on; return ``True`` on success, ``False`` otherwise."""
        return self._switch(ip_address, 'turnOn', 'Einschalten', 'eingeschaltet')

    def turn_off(self, ip_address):
        """Switch the plug off; return ``True`` on success, ``False`` otherwise."""
        return self._switch(ip_address, 'turnOff', 'Ausschalten', 'ausgeschaltet')


tapo_control = TapoControl(TAPO_USERNAME, TAPO_PASSWORD)
# Helper functions
def get_current_user():
    """Return the user of the caller's session, or ``None``.

    The session id is read from the Flask session cookie. A missing or
    expired server-side session clears the cookie entry; an expired one is
    also deleted from the database.
    """
    session_id = flask_session.get('session_id')
    if not session_id:
        return None
    stored = Session.get_by_id(session_id)
    if stored and stored.expires_at >= datetime.datetime.utcnow():
        return stored.user
    # Unknown or expired session: clean up and treat the caller as anonymous
    if stored:
        stored.delete()
    flask_session.pop('session_id', None)
    return None
def login_required(f):
    """Decorator: answer 401 unless the caller has a valid session.

    On success the user is stored as ``g.current_user`` before ``f`` runs.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        current = get_current_user()
        if current:
            g.current_user = current
            return f(*args, **kwargs)
        return jsonify({'message': 'Authentifizierung erforderlich!'}), 401
    return wrapper
def admin_required(f):
    """Decorator: answer 403 unless ``g.current_user`` has the admin role.

    Intended to be applied underneath ``login_required``, which is what sets
    ``g.current_user``. If it is used on its own the attribute is missing;
    that case is treated like a non-admin caller (403) instead of raising
    ``AttributeError``.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        user = g.get('current_user')
        if not user or user.role != 'admin':
            return jsonify({'message': 'Admin-Rechte erforderlich!'}), 403
        return f(*args, **kwargs)
    return decorated
def create_session(user):
    """Create a 7-day server-side session for ``user`` and bind it to the
    Flask session cookie. Returns the new session id."""
    new_id = str(uuid.uuid4())
    valid_until = datetime.datetime.utcnow() + timedelta(days=7)
    Session(id=new_id, user_id=user.id, expires_at=valid_until).save()
    # Make the cookie permanent so PERMANENT_SESSION_LIFETIME applies
    flask_session['session_id'] = new_id
    flask_session.permanent = True
    return new_id
# Authentication routes
@app.route('/auth/register', methods=['POST'])
def register():
    """Register a new user and log them in.

    Expects a JSON object with ``username`` and ``password`` and optionally
    ``displayName`` and ``email``. The first user registered while no admin
    exists becomes admin. Returns 201 with the user, or 400 when required
    fields are missing or the username/email is already in use.
    """
    data = request.get_json()
    if not isinstance(data, dict) or not data.get('username') or not data.get('password'):
        return jsonify({'message': 'Benutzername und Passwort sind erforderlich!'}), 400
    username = data.get('username')
    password = data.get('password')
    display_name = data.get('displayName', username)
    # The email column is UNIQUE. Storing '' for a missing address would make
    # the second user without an email fail, so "no email" is stored as NULL.
    email = data.get('email') or None
    if User.get_by_username(username):
        return jsonify({'message': 'Benutzername bereits vergeben!'}), 400
    # Check whether an admin already exists
    db = get_db()
    admin_exists = db.execute("SELECT 1 FROM user WHERE role = 'admin' LIMIT 1").fetchone() is not None
    # If there is no admin yet, the first user becomes admin
    role = 'admin' if not admin_exists else 'user'
    user = User(
        username=username,
        display_name=display_name,
        email=email,
        role=role
    )
    user.set_password(password)
    try:
        user.save()
    except sqlite3.IntegrityError:
        # UNIQUE violation: the email is already in use, or the username was
        # taken between the check above and the insert.
        db.rollback()
        return jsonify({'message': 'Benutzername oder E-Mail bereits vergeben!'}), 400
    app.logger.info(f'Neuer Benutzer registriert: {username} (Rolle: {role})')
    # Create the login session
    create_session(user)
    return jsonify({
        'message': 'Registrierung erfolgreich!',
        'user': user.to_dict()
    }), 201
@app.route('/auth/login', methods=['POST'])
def login():
    """Check username and password from the JSON body and open a session.

    Returns 400 when a field is missing and 401 when the credentials do not
    match.
    """
    payload = request.get_json()
    if not (payload and payload.get('username') and payload.get('password')):
        return jsonify({'message': 'Benutzername und Passwort sind erforderlich!'}), 400
    account = User.get_by_username(payload.get('username'))
    if not account or not account.check_password(payload.get('password')):
        return jsonify({'message': 'Ungültiger Benutzername oder Passwort!'}), 401
    # Create the login session
    create_session(account)
    body = {
        'message': 'Anmeldung erfolgreich!',
        'user': account.to_dict()
    }
    return jsonify(body)
@app.route('/auth/logout', methods=['POST'])
def logout():
    """Delete the caller's server-side session, if any, and clear the cookie entry."""
    current_id = flask_session.get('session_id')
    stored = Session.get_by_id(current_id) if current_id else None
    if stored:
        stored.delete()
    if current_id:
        flask_session.pop('session_id', None)
    return jsonify({'message': 'Erfolgreich abgemeldet!'}), 200
# API routes
@app.route('/api/me', methods=['GET'])
def get_me():
    """Report whether the caller is logged in and, if so, as whom."""
    current = get_current_user()
    if current:
        return jsonify({
            'authenticated': True,
            'user': current.to_dict()
        })
    return jsonify({'authenticated': False}), 401
@app.route('/api/printers', methods=['GET'])
def get_printers():
    """Return all printers as a JSON list (no login required)."""
    listing = []
    for entry in Printer.get_all():
        listing.append(entry.to_dict())
    return jsonify(listing)
@app.route('/api/printers', methods=['POST'])
@login_required
@admin_required
def create_printer():
    """Create a printer from the JSON body (admin only).

    ``name`` and ``description`` are required; ``status`` defaults to 0 and
    ``ipAddress`` is optional. Returns 201 with the new printer.
    """
    payload = request.get_json()
    if not (payload and payload.get('name') and payload.get('description')):
        return jsonify({'message': 'Name und Beschreibung sind erforderlich!'}), 400
    fields = {
        'name': payload.get('name'),
        'description': payload.get('description'),
        'status': payload.get('status', 0),
        'ip_address': payload.get('ipAddress'),
    }
    created = Printer(**fields).save()
    return jsonify(created.to_dict()), 201
@app.route('/api/printers/<printer_id>', methods=['GET'])
def get_printer(printer_id):
    """Return one printer as JSON, or 404 if the id is unknown."""
    found = Printer.get_by_id(printer_id)
    if found:
        return jsonify(found.to_dict())
    return jsonify({'message': 'Drucker nicht gefunden!'}), 404
@app.route('/api/printers/<printer_id>', methods=['PUT'])
@login_required
@admin_required
def update_printer(printer_id):
    """Update selected fields of a printer (admin only).

    ``name``, ``description`` and ``ipAddress`` are applied only when they
    are non-empty; ``status`` is applied whenever the key is present.
    Returns 404 for an unknown printer and 400 when the body is not a JSON
    object.
    """
    printer = Printer.get_by_id(printer_id)
    if not printer:
        return jsonify({'message': 'Drucker nicht gefunden!'}), 404
    data = request.get_json()
    # A body such as `null` or a list has no .get(); reject it instead of
    # failing with an AttributeError (HTTP 500).
    if not isinstance(data, dict):
        return jsonify({'message': 'Ungültige Anfragedaten!'}), 400
    if data.get('name'):
        printer.name = data['name']
    if data.get('description'):
        printer.description = data['description']
    if 'status' in data:
        printer.status = data['status']
    if data.get('ipAddress'):
        printer.ip_address = data['ipAddress']
    printer.update()
    return jsonify(printer.to_dict())
@app.route('/api/printers/<printer_id>', methods=['DELETE'])
@login_required
@admin_required
def delete_printer(printer_id):
    """Delete a printer (admin only); 404 if the id is unknown."""
    target = Printer.get_by_id(printer_id)
    if target:
        target.delete()
        return jsonify({'message': 'Drucker gelöscht!'})
    return jsonify({'message': 'Drucker nicht gefunden!'}), 404
@app.route('/api/jobs', methods=['GET'])
@login_required
def get_jobs():
    """Return jobs as a JSON list: all of them for admins, otherwise only
    the caller's own."""
    viewer = g.current_user
    visible = PrintJob.get_all() if viewer.role == 'admin' else PrintJob.get_by_user_id(viewer.id)
    return jsonify([entry.to_dict() for entry in visible])
@app.route('/api/jobs', methods=['POST'])
@login_required
def create_job():
    """Start a print job on an available printer.

    Expects a JSON object with ``printerId`` and ``durationInMinutes`` and
    optionally ``comments``. Marks the printer busy (status 1), stores the
    job and switches the printer's plug on when an IP address is configured.
    Returns 201 with the job, 404 for an unknown printer and 400 for missing
    fields, an unavailable printer or an invalid duration.
    """
    data = request.get_json()
    if not isinstance(data, dict) or not data.get('printerId') or not data.get('durationInMinutes'):
        return jsonify({'message': 'Drucker-ID und Dauer sind erforderlich!'}), 400
    printer = Printer.get_by_id(data['printerId'])
    if not printer:
        return jsonify({'message': 'Drucker nicht gefunden!'}), 404
    if printer.status != 0:  # 0 = available
        return jsonify({'message': 'Drucker ist nicht verfügbar!'}), 400
    # A non-numeric duration used to raise ValueError (HTTP 500) and a
    # negative one produced a job that was already over; both are rejected.
    try:
        duration = int(data['durationInMinutes'])
    except (TypeError, ValueError):
        duration = 0
    if duration <= 0:
        return jsonify({'message': 'Ungültige Dauer!'}), 400
    job = PrintJob(
        printer_id=printer.id,
        user_id=g.current_user.id,
        duration_in_minutes=duration,
        comments=data.get('comments', '')
    )
    # Mark the printer as busy
    printer.status = 1  # 1 = busy
    printer.update()
    job.save()
    # Switch the plug on if an IP address is configured
    if printer.ip_address:
        try:
            tapo_control.turn_on(printer.ip_address)
        except Exception as e:
            app.logger.error(f"Fehler beim Einschalten der Steckdose: {e}")
    return jsonify(job.to_dict()), 201
@app.route('/api/jobs/<job_id>', methods=['GET'])
@login_required
def get_job(job_id):
    """Return one job as JSON.

    Admins may read any job, other users only their own (403 otherwise);
    404 if the id is unknown.
    """
    job = PrintJob.get_by_id(job_id)
    if not job:
        return jsonify({'message': 'Job nicht gefunden!'}), 404
    viewer = g.current_user
    allowed = viewer.role == 'admin' or job.user_id == viewer.id
    if not allowed:
        return jsonify({'message': 'Keine Berechtigung für diesen Job!'}), 403
    return jsonify(job.to_dict())
@app.route('/api/jobs/<job_id>/abort', methods=['POST'])
@login_required
def abort_job(job_id):
    """Abort a job and release its printer.

    Admins may abort any job, other users only their own (403 otherwise);
    404 if the id is unknown. The JSON body is optional and may carry a
    ``reason``.
    """
    job = PrintJob.get_by_id(job_id)
    if not job:
        return jsonify({'message': 'Job nicht gefunden!'}), 404
    if g.current_user.role != 'admin' and job.user_id != g.current_user.id:
        return jsonify({'message': 'Keine Berechtigung für diesen Job!'}), 403
    # The reason is optional, so a missing or non-object body is treated as
    # "no reason given" rather than as an error.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    job.aborted = True
    job.abort_reason = data.get('reason', '')
    job.update()
    printer = Printer.get_by_id(job.printer_id)
    if printer:
        # Only release the printer when this job is its most recent one.
        # Otherwise aborting an old job would free the printer and cut the
        # power while a newer job is running on it.
        latest_job = printer.get_latest_job()
        if latest_job is None or latest_job.id == job.id:
            # Make the printer available again
            printer.status = 0  # 0 = available
            printer.update()
            # Switch the plug off if an IP address is configured
            if printer.ip_address:
                try:
                    tapo_control.turn_off(printer.ip_address)
                except Exception as e:
                    app.logger.error(f"Fehler beim Ausschalten der Steckdose: {e}")
    return jsonify(job.to_dict())
@app.route('/api/jobs/<job_id>/finish', methods=['POST'])
@login_required
def finish_job(job_id):
    """Finish a job now and release its printer.

    The job's duration is replaced by the whole minutes elapsed since its
    start. Admins may finish any job, other users only their own (403
    otherwise); 404 if the id is unknown.
    """
    job = PrintJob.get_by_id(job_id)
    if not job:
        return jsonify({'message': 'Job nicht gefunden!'}), 404
    if g.current_user.role != 'admin' and job.user_id != g.current_user.id:
        return jsonify({'message': 'Keine Berechtigung für diesen Job!'}), 403
    # Use the current time as the end of the job
    now = datetime.datetime.utcnow()
    actual_duration = int((now - job.start_at).total_seconds() / 60)
    job.duration_in_minutes = actual_duration
    job.update()
    printer = Printer.get_by_id(job.printer_id)
    if printer:
        # Only release the printer when this job is its most recent one.
        # Otherwise finishing an old job would free the printer and cut the
        # power while a newer job is running on it.
        latest_job = printer.get_latest_job()
        if latest_job is None or latest_job.id == job.id:
            # Make the printer available again
            printer.status = 0  # 0 = available
            printer.update()
            # Switch the plug off if an IP address is configured
            if printer.ip_address:
                try:
                    tapo_control.turn_off(printer.ip_address)
                except Exception as e:
                    app.logger.error(f"Fehler beim Ausschalten der Steckdose: {e}")
    return jsonify(job.to_dict())
@app.route('/api/jobs/<job_id>/extend', methods=['POST'])
@login_required
def extend_job(job_id):
    """Extend a job by ``minutes`` and/or ``hours`` from the JSON body.

    Admins may extend any job, other users only their own (403 otherwise);
    404 if the id is unknown. Returns 400 when the body is not a JSON
    object, a value is not a whole number, or the total is not positive.
    """
    job = PrintJob.get_by_id(job_id)
    if not job:
        return jsonify({'message': 'Job nicht gefunden!'}), 404
    if g.current_user.role != 'admin' and job.user_id != g.current_user.id:
        return jsonify({'message': 'Keine Berechtigung für diesen Job!'}), 403
    data = request.get_json()
    if not isinstance(data, dict):
        return jsonify({'message': 'Ungültige Verlängerungszeit!'}), 400
    # Non-numeric values used to raise ValueError/TypeError (HTTP 500)
    try:
        minutes = int(data.get('minutes', 0))
        hours = int(data.get('hours', 0))
    except (TypeError, ValueError):
        return jsonify({'message': 'Ungültige Verlängerungszeit!'}), 400
    additional_minutes = minutes + (hours * 60)
    if additional_minutes <= 0:
        return jsonify({'message': 'Ungültige Verlängerungszeit!'}), 400
    job.duration_in_minutes += additional_minutes
    job.update()
    return jsonify(job.to_dict())
@app.route('/api/jobs/<job_id>/comments', methods=['PUT'])
@login_required
def update_job_comments(job_id):
    """Replace a job's comments with ``comments`` from the JSON body.

    A missing ``comments`` key clears the text. Admins may edit any job,
    other users only their own (403 otherwise); 404 if the id is unknown;
    400 when the body is not a JSON object.
    """
    job = PrintJob.get_by_id(job_id)
    if not job:
        return jsonify({'message': 'Job nicht gefunden!'}), 404
    if g.current_user.role != 'admin' and job.user_id != g.current_user.id:
        return jsonify({'message': 'Keine Berechtigung für diesen Job!'}), 403
    data = request.get_json()
    # A body such as `null` or a list has no .get(); reject it instead of
    # failing with an AttributeError (HTTP 500).
    if not isinstance(data, dict):
        return jsonify({'message': 'Ungültige Anfragedaten!'}), 400
    job.comments = data.get('comments', '')
    job.update()
    return jsonify(job.to_dict())
@app.route('/api/job/<job_id>/remaining-time', methods=['GET'])
def job_remaining_time(job_id):
    """Return the remaining minutes of a job (no login required); 404 if unknown."""
    job = PrintJob.get_by_id(job_id)
    if job:
        return jsonify({'remaining_minutes': job.remaining_time()})
    return jsonify({'message': 'Job nicht gefunden!'}), 404
@app.route('/api/users', methods=['GET'])
@login_required
@admin_required
def get_users():
    """Return all users as a JSON list (admin only)."""
    listing = []
    for account in User.get_all():
        listing.append(account.to_dict())
    return jsonify(listing)
@app.route('/api/users/<user_id>', methods=['GET'])
@login_required
@admin_required
def get_user(user_id):
    """Return one user as JSON (admin only); 404 if the id is unknown."""
    account = User.get_by_id(user_id)
    if account:
        return jsonify(account.to_dict())
    return jsonify({'message': 'Benutzer nicht gefunden!'}), 404
@app.route('/api/users/<user_id>', methods=['PUT'])
@login_required
@admin_required
def update_user(user_id):
    """Update selected fields of a user (admin only).

    ``username``, ``displayName``, ``email`` and ``role`` are applied only
    when they are non-empty. Returns 404 for an unknown user and 400 when
    the body is not a JSON object or the new username/email is already in
    use.
    """
    user = User.get_by_id(user_id)
    if not user:
        return jsonify({'message': 'Benutzer nicht gefunden!'}), 404
    data = request.get_json()
    # A body such as `null` or a list has no .get(); reject it instead of
    # failing with an AttributeError (HTTP 500).
    if not isinstance(data, dict):
        return jsonify({'message': 'Ungültige Anfragedaten!'}), 400
    if data.get('username'):
        user.username = data['username']
    if data.get('displayName'):
        user.display_name = data['displayName']
    if data.get('email'):
        user.email = data['email']
    if data.get('role'):
        user.role = data['role']
    try:
        user.update()
    except sqlite3.IntegrityError:
        # username and email are UNIQUE columns; a clash is a client error
        get_db().rollback()
        return jsonify({'message': 'Benutzername oder E-Mail bereits vergeben!'}), 400
    return jsonify(user.to_dict())
@app.route('/api/users/<user_id>', methods=['DELETE'])
@login_required
@admin_required
def delete_user(user_id):
    """Delete a user and all of their sessions (admin only); 404 if unknown."""
    target = User.get_by_id(user_id)
    if target:
        # Remove all sessions of the user first
        Session.delete_by_user_id(target.id)
        target.delete()
        return jsonify({'message': 'Benutzer gelöscht!'})
    return jsonify({'message': 'Benutzer nicht gefunden!'}), 404
@app.route('/api/stats', methods=['GET'])
@login_required
@admin_required
def stats():
    """Return usage statistics for printers, jobs and users (admin only)."""
    db = get_db()

    def count(sql, params=()):
        # Run a COUNT(*) query whose result column is aliased "count"
        return db.execute(sql, params).fetchone()['count']

    # Printer usage
    total_printers = count('SELECT COUNT(*) as count FROM printer')
    available_printers = count('SELECT COUNT(*) as count FROM printer WHERE status = 0')
    if total_printers > 0:
        utilization_rate = (total_printers - available_printers) / total_printers
    else:
        utilization_rate = 0

    # Jobs: "active" ends after now, "completed" ended at or before now;
    # aborted jobs are in neither group
    total_jobs = count('SELECT COUNT(*) as count FROM print_job')
    now = datetime.datetime.utcnow().isoformat()
    active_jobs = count('''
SELECT COUNT(*) as count FROM print_job
WHERE aborted = 0
AND datetime(start_at, '+' || duration_in_minutes || ' minutes') > datetime(?)
''', (now,))
    completed_jobs = count('''
SELECT COUNT(*) as count FROM print_job
WHERE aborted = 0
AND datetime(start_at, '+' || duration_in_minutes || ' minutes') <= datetime(?)
''', (now,))

    # Users
    total_users = count('SELECT COUNT(*) as count FROM user')

    # Average job duration in whole minutes (0 when there is no value)
    average = db.execute('SELECT AVG(duration_in_minutes) as avg FROM print_job').fetchone()['avg']
    avg_duration = int(average) if average else 0

    report = {
        'printers': {
            'total': total_printers,
            'available': available_printers,
            'utilization_rate': utilization_rate
        },
        'jobs': {
            'total': total_jobs,
            'active': active_jobs,
            'completed': completed_jobs,
            'avg_duration': avg_duration
        },
        'users': {
            'total': total_users
        }
    }
    return jsonify(report)
# Daily check of the jobs and automatic switch-off of the plugs
@app.cli.command("check-jobs")
def check_jobs():
    """Release printers whose latest job has expired and switch their plugs off."""
    with app.app_context():
        expired_jobs = PrintJob.get_expired_jobs()
        for job in expired_jobs:
            printer = Printer.get_by_id(job.printer_id)
            if not printer:
                continue
            # get_expired_jobs() returns every non-aborted job that has ever
            # run out, including old ones. Only the printer's most recent job
            # says anything about its current state; acting on an older one
            # would free the printer and cut the power under a running job.
            latest_job = printer.get_latest_job()
            if latest_job is None or latest_job.id != job.id:
                continue
            if printer.status == 1:  # busy
                printer.status = 0  # available
                printer.update()
                app.logger.info(f"Job {job.id} abgelaufen. Drucker {printer.id} auf verfügbar gesetzt.")
            # Switch the plug off if an IP address is configured
            if printer.ip_address:
                try:
                    tapo_control.turn_off(printer.ip_address)
                    app.logger.info(f"Steckdose {printer.ip_address} für abgelaufenen Job {job.id} ausgeschaltet.")
                except Exception as e:
                    app.logger.error(f"Fehler beim Ausschalten der Steckdose {printer.ip_address}: {e}")
        app.logger.info(f"{len(expired_jobs)} abgelaufene Jobs überprüft und Steckdosen aktualisiert.")
@app.route('/api/test', methods=['GET'])
def test():
    """Smoke-test endpoint: always answers with a fixed JSON message."""
    reply = {'message': 'MYP Backend API funktioniert!'}
    return jsonify(reply)
@app.route('/api/create-initial-admin', methods=['POST'])
def create_initial_admin():
    """Create the first administrator account.

    Only works while no user with the admin role exists. Expects a JSON
    object with ``username`` and ``password`` and optionally ``displayName``
    and ``email``. Returns 201 with the user, or 400 when an admin already
    exists, required fields are missing, or the username/email is already
    in use.
    """
    db = get_db()
    admin_exists = db.execute("SELECT 1 FROM user WHERE role = 'admin' LIMIT 1").fetchone() is not None
    if admin_exists:
        return jsonify({'message': 'Es existiert bereits ein Administrator!'}), 400
    data = request.get_json()
    if not isinstance(data, dict) or not data.get('username') or not data.get('password'):
        return jsonify({'message': 'Benutzername und Passwort sind erforderlich!'}), 400
    username = data.get('username')
    password = data.get('password')
    display_name = data.get('displayName', username)
    # The email column is UNIQUE; store "no email" as NULL rather than '' so
    # it cannot clash with another account that has no address.
    email = data.get('email') or None
    # A non-admin account may already own this username
    if User.get_by_username(username):
        return jsonify({'message': 'Benutzername bereits vergeben!'}), 400
    user = User(
        username=username,
        display_name=display_name,
        email=email,
        role='admin'
    )
    user.set_password(password)
    try:
        user.save()
    except sqlite3.IntegrityError:
        # UNIQUE violation on username or email
        db.rollback()
        return jsonify({'message': 'Benutzername oder E-Mail bereits vergeben!'}), 400
    app.logger.info(f'Initialer Admin-Benutzer erstellt: {username}')
    return jsonify({
        'message': 'Administrator wurde erfolgreich erstellt!',
        'user': user.to_dict()
    }), 201
# Error handlers
@app.errorhandler(404)
def not_found(error):
    """Answer 404 errors with a JSON message."""
    body = jsonify({'message': 'Nicht gefunden!'})
    return body, 404
@app.errorhandler(500)
def server_error(error):
    """Log 500 errors and answer with a JSON message."""
    app.logger.error(f'Serverfehler: {error}')
    body = jsonify({'message': 'Interner Serverfehler!'})
    return body, 500
# Web UI routes
@app.route('/')
def index():
    """Show the dashboard to logged-in users; send everyone else to the login page."""
    viewer = get_current_user()
    if not viewer:
        return redirect(url_for('login_page'))
    return render_template('dashboard.html', current_user=viewer, active_page='home')
@app.route('/login')
def login_page():
    """Render the login form."""
    context = {'active_page': 'login'}
    return render_template('login.html', **context)
@app.route('/register')
def register_page():
    """Render the registration form."""
    context = {'active_page': 'register'}
    return render_template('register.html', **context)
@app.route('/logout')
def logout_page():
    """Log the caller out, flash a confirmation and redirect to the login page."""
    current_id = flask_session.get('session_id')
    stored = Session.get_by_id(current_id) if current_id else None
    if stored:
        stored.delete()
    if current_id:
        flask_session.pop('session_id', None)
    flash('Sie wurden erfolgreich abgemeldet.', 'success')
    return redirect(url_for('login_page'))
@app.route('/admin/printers')
def printers_page():
    """Render the printers page for any logged-in user; otherwise redirect to login."""
    viewer = get_current_user()
    if viewer:
        return render_template('printers.html', current_user=viewer, active_page='printers')
    return redirect(url_for('login_page'))
@app.route('/admin/jobs')
def jobs_page():
    """Render the jobs page for any logged-in user; otherwise redirect to login."""
    viewer = get_current_user()
    if viewer:
        return render_template('jobs.html', current_user=viewer, active_page='jobs')
    return redirect(url_for('login_page'))
@app.route('/admin/users')
def users_page():
    """Render the user administration page for admins.

    Anyone else gets a flash message and is redirected to the index page.
    """
    viewer = get_current_user()
    if viewer and viewer.role == 'admin':
        return render_template('users.html', current_user=viewer, active_page='users')
    flash('Sie haben keine Berechtigung, diese Seite zu besuchen.', 'danger')
    return redirect(url_for('index'))
@app.route('/admin/stats')
def stats_page():
    """Render the statistics page for admins.

    Anyone else gets a flash message and is redirected to the index page.
    """
    viewer = get_current_user()
    if viewer and viewer.role == 'admin':
        return render_template('stats.html', current_user=viewer, active_page='stats')
    flash('Sie haben keine Berechtigung, diese Seite zu besuchen.', 'danger')
    return redirect(url_for('index'))
# Start the server
if __name__ == '__main__':
    # The server listens on all interfaces. Debug mode enables the Werkzeug
    # interactive debugger, which must not be reachable in production, so it
    # is tied to the same FLASK_ENV switch that controls the secure cookie.
    app.run(debug=os.environ.get('FLASK_ENV') != 'production', host='0.0.0.0')