1217 lines
41 KiB
Python
Executable File
1217 lines
41 KiB
Python
Executable File
from flask import Flask, request, jsonify, g, redirect, url_for, session as flask_session, render_template, flash
|
|
from flask_cors import CORS
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from functools import wraps
|
|
import jwt
|
|
import datetime
|
|
import os
|
|
import json
|
|
import logging
|
|
import uuid
|
|
import sqlite3
|
|
from logging.handlers import RotatingFileHandler
|
|
from datetime import timedelta
|
|
from PyP100 import PyP100
|
|
from dotenv import load_dotenv
|
|
|
|
# Load environment variables via python-dotenv
load_dotenv()

# Application setup
app = Flask(__name__)
CORS(app, supports_credentials=True)

# Configuration
# NOTE(review): SECRET_KEY falls back to the hard-coded 'dev_secret_key' when
# the environment variable is unset — confirm production always sets it.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev_secret_key')
app.config['DATABASE'] = os.environ.get('DATABASE_PATH', 'instance/myp.db')
app.config['SESSION_COOKIE_HTTPONLY'] = True
# The secure-cookie flag is only set when FLASK_ENV equals 'production'.
app.config['SESSION_COOKIE_SECURE'] = os.environ.get('FLASK_ENV') == 'production'
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=7)

# Smart-plug configuration
TAPO_USERNAME = os.environ.get('TAPO_USERNAME')
TAPO_PASSWORD = os.environ.get('TAPO_PASSWORD')
# SOCKET_DEVICES format: {"192.168.1.100": {"number": "1"}, "192.168.1.101": {"number": "2"}, ...}
# NOTE(review): json.loads raises on malformed JSON, which aborts the import.
SOCKET_DEVICES = json.loads(os.environ.get('SOCKET_DEVICES', '{}'))
|
|
|
|
# Logging
# makedirs(exist_ok=True) replaces the exists()/mkdir() pair, which could fail
# when two processes start at the same time and both try to create 'logs'.
os.makedirs('logs', exist_ok=True)
file_handler = RotatingFileHandler('logs/myp.log', maxBytes=10240, backupCount=10)
file_handler.setFormatter(logging.Formatter(
    '%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'
))
file_handler.setLevel(logging.INFO)
app.logger.addHandler(file_handler)
app.logger.setLevel(logging.INFO)
app.logger.info('MYP Backend starting up')
|
|
|
|
# Database functions
|
|
def get_db():
    """Return the SQLite connection for the current application context.

    The connection is opened on first use, stored on ``flask.g`` and reused
    afterwards. Rows are returned as ``sqlite3.Row`` objects.
    """
    if 'db' not in g:
        # Make sure the directory that holds the database file exists. A bare
        # filename has an empty dirname, which os.makedirs would reject.
        db_dir = os.path.dirname(app.config['DATABASE'])
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)
        g.db = sqlite3.connect(app.config['DATABASE'])
        g.db.row_factory = sqlite3.Row
        # SQLite enforces foreign keys per connection only, so it has to be
        # switched on for every new connection for the schema's
        # ON DELETE CASCADE clauses to take effect.
        g.db.execute('PRAGMA foreign_keys = ON')
    return g.db
|
|
|
|
def close_db(e=None):
    """Close the connection stored on ``g``, if one was opened.

    ``e`` is accepted so the function can be used as a teardown callback; it
    is not inspected.
    """
    connection = g.pop('db', None)
    if connection is None:
        return
    connection.close()
|
|
|
|
def init_db():
    """Create the user, session, socket and job tables if they are missing."""
    schema = '''
        CREATE TABLE IF NOT EXISTS user (
            id TEXT PRIMARY KEY,
            username TEXT UNIQUE NOT NULL,
            password_hash TEXT NOT NULL,
            display_name TEXT,
            email TEXT UNIQUE,
            role TEXT DEFAULT 'user'
        );

        CREATE TABLE IF NOT EXISTS session (
            id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            expires_at TIMESTAMP NOT NULL,
            FOREIGN KEY (user_id) REFERENCES user (id) ON DELETE CASCADE
        );

        CREATE TABLE IF NOT EXISTS socket (
            id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            description TEXT NOT NULL,
            status INTEGER DEFAULT 0,
            ip_address TEXT
        );

        CREATE TABLE IF NOT EXISTS job (
            id TEXT PRIMARY KEY,
            socket_id TEXT NOT NULL,
            user_id TEXT NOT NULL,
            start_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            duration_in_minutes INTEGER NOT NULL,
            comments TEXT,
            aborted INTEGER DEFAULT 0,
            abort_reason TEXT,
            FOREIGN KEY (socket_id) REFERENCES socket (id) ON DELETE CASCADE,
            FOREIGN KEY (user_id) REFERENCES user (id) ON DELETE CASCADE
        );
    '''
    connection = get_db()
    # Enable SQLite foreign-key support on this connection
    connection.execute('PRAGMA foreign_keys = ON')
    # Create the tables
    connection.executescript(schema)
    connection.commit()
|
|
|
|
# Initialisierung der Steckdosen
|
|
def init_sockets():
    """Create database entries for the plugs listed in SOCKET_DEVICES.

    Every configured IP address that has no row in the socket table yet gets
    one. Each configured plug is then asked to switch off so that all plugs
    start in the OFF state; failures are logged and do not stop the loop.
    """
    app.logger.info("Initialisiere Steckdosen aus Umgebungsvariablen")
    db = get_db()

    # IP addresses that already have a socket row
    rows = db.execute('SELECT id, ip_address FROM socket').fetchall()
    known_ips = {row['ip_address'] for row in rows if row['ip_address']}

    for ip_address, device_config in SOCKET_DEVICES.items():
        if ip_address in known_ips:
            # Socket already exists, nothing to create
            app.logger.info(f"Steckdose mit IP {ip_address} existiert bereits in der Datenbank")
        else:
            socket_number = device_config.get('number', '0')
            name = f"Printer {socket_number}"
            description = f"3D-Drucker mit SmartPlug (IP: {ip_address})"
            create_socket(name=name, description=description, ip_address=ip_address, status=0)
            app.logger.info(f"Neue Steckdose angelegt: {name} mit IP {ip_address}")

        # turn_off_socket reports failure through its return value, so the
        # success message must depend on it rather than on "no exception".
        try:
            switched_off = turn_off_socket(ip_address)
        except Exception as e:
            app.logger.error(f"Fehler beim Ausschalten der Steckdose {ip_address}: {e}")
        else:
            if switched_off:
                app.logger.info(f"Steckdose {ip_address} wurde beim Start ausgeschaltet")
            else:
                app.logger.error(f"Steckdose {ip_address} konnte beim Start nicht ausgeschaltet werden")
|
|
|
|
# Register the teardown hook first so the connection opened during the
# start-up initialisation below is closed when that app context exits.
app.teardown_appcontext(close_db)

# Initialise the database and the sockets when the application starts.
# NOTE(review): this runs at import time, before create_socket and
# turn_off_socket (both called by init_sockets) are defined further down in
# this module — consider moving this block below those definitions.
with app.app_context():
    init_db()
    # Only initialise sockets when some are configured
    if SOCKET_DEVICES:
        init_sockets()
|
|
|
|
# Benutzerverwaltung
|
|
def get_user_by_id(user_id):
    """Return the user row with the given id as a dict, or None if not found."""
    record = get_db().execute(
        'SELECT * FROM user WHERE id = ?', (user_id,)
    ).fetchone()
    return dict(record) if record else None
|
|
|
|
def get_user_by_username(username):
    """Return the user row with the given username as a dict, or None."""
    record = get_db().execute(
        'SELECT * FROM user WHERE username = ?', (username,)
    ).fetchone()
    return dict(record) if record else None
|
|
|
|
def get_all_users():
    """Return all rows of the user table as a list of dicts."""
    cursor = get_db().execute('SELECT * FROM user')
    return list(map(dict, cursor.fetchall()))
|
|
|
|
def create_user(username, password, display_name=None, email=None, role='user'):
    """Insert a new user and return the stored row as a dict.

    The id is a fresh UUID4 string, the password is stored as a hash, and the
    display name defaults to the username when none (or an empty one) is given.
    """
    new_id = str(uuid.uuid4())
    hashed = generate_password_hash(password)
    shown_name = display_name if display_name else username

    connection = get_db()
    insert_sql = 'INSERT INTO user (id, username, password_hash, display_name, email, role) VALUES (?, ?, ?, ?, ?, ?)'
    connection.execute(insert_sql, (new_id, username, hashed, shown_name, email, role))
    connection.commit()

    return get_user_by_id(new_id)
|
|
|
|
def update_user(user_id, username=None, password=None, display_name=None, email=None, role=None):
    """Update the given fields of a user and return the refreshed row.

    Only arguments with a truthy value are written. Returns None when the user
    does not exist and the unchanged row when there is nothing to update.
    """
    existing = get_user_by_id(user_id)
    if not existing:
        return None

    # (SET clause, value) pairs in the order they appear in the UPDATE.
    candidates = (
        ('username = ?', username),
        ('password_hash = ?', generate_password_hash(password) if password else None),
        ('display_name = ?', display_name),
        ('email = ?', email),
        ('role = ?', role),
    )
    changes = [(clause, value) for clause, value in candidates if value]
    if not changes:
        return existing

    set_clause = ", ".join(clause for clause, _ in changes)
    params = [value for _, value in changes]
    params.append(user_id)

    connection = get_db()
    connection.execute(f'UPDATE user SET {set_clause} WHERE id = ?', params)
    connection.commit()

    return get_user_by_id(user_id)
|
|
|
|
def delete_user(user_id):
    """Delete the user row with the given id, commit, and return True."""
    connection = get_db()
    connection.execute('DELETE FROM user WHERE id = ?', (user_id,))
    connection.commit()
    return True
|
|
|
|
def check_password(user_dict, password):
    """Return whether ``password`` matches the hash stored in ``user_dict``."""
    stored_hash = user_dict['password_hash']
    return check_password_hash(stored_hash, password)
|
|
|
|
def user_to_dict(user):
    """Convert a user row into the dict returned by the API.

    Returns None for a falsy ``user``. The password hash is not included.
    """
    if not user:
        return None
    # (API key, row column) pairs
    field_map = (
        ('id', 'id'),
        ('username', 'username'),
        ('displayName', 'display_name'),
        ('email', 'email'),
        ('role', 'role'),
    )
    return {api_key: user[column] for api_key, column in field_map}
|
|
|
|
# Session-Verwaltung
|
|
def get_session_by_id(session_id):
    """Return the session row with the given id as a dict, or None."""
    record = get_db().execute(
        'SELECT * FROM session WHERE id = ?', (session_id,)
    ).fetchone()
    return dict(record) if record else None
|
|
|
|
def delete_sessions_by_user(user_id):
    """Delete every session row that belongs to the given user and commit."""
    connection = get_db()
    connection.execute('DELETE FROM session WHERE user_id = ?', (user_id,))
    connection.commit()
|
|
|
|
def create_session(user_id):
    """Create a session row for the user and bind it to the Flask session.

    The row expires seven days from now (UTC). The new session id is stored in
    the Flask session cookie, which is marked permanent, and is returned.
    """
    new_session_id = str(uuid.uuid4())
    expiry = datetime.datetime.utcnow() + timedelta(days=7)

    connection = get_db()
    connection.execute(
        'INSERT INTO session (id, user_id, expires_at) VALUES (?, ?, ?)',
        (new_session_id, user_id, expiry.isoformat()),
    )
    connection.commit()

    flask_session.permanent = True
    flask_session['session_id'] = new_session_id

    return new_session_id
|
|
|
|
def delete_session(session_id):
    """Delete the session row with the given id and commit."""
    connection = get_db()
    connection.execute('DELETE FROM session WHERE id = ?', (session_id,))
    connection.commit()
|
|
|
|
# Steckdosen-Verwaltung
|
|
def get_socket_by_id(socket_id):
    """Return the socket row with the given id as a dict, or None."""
    record = get_db().execute(
        'SELECT * FROM socket WHERE id = ?', (socket_id,)
    ).fetchone()
    return dict(record) if record else None
|
|
|
|
def get_all_sockets():
    """Return all rows of the socket table as a list of dicts."""
    cursor = get_db().execute('SELECT * FROM socket')
    return list(map(dict, cursor.fetchall()))
|
|
|
|
def create_socket(name, description, ip_address=None, status=0):
    """Insert a new socket row with a fresh UUID4 id and return it as a dict."""
    new_id = str(uuid.uuid4())

    connection = get_db()
    insert_sql = 'INSERT INTO socket (id, name, description, status, ip_address) VALUES (?, ?, ?, ?, ?)'
    connection.execute(insert_sql, (new_id, name, description, status, ip_address))
    connection.commit()

    return get_socket_by_id(new_id)
|
|
|
|
def update_socket(socket_id, name=None, description=None, status=None, ip_address=None):
    """Update the given fields of a socket and return the refreshed row.

    ``name``, ``description`` and ``ip_address`` are written only when truthy;
    ``status`` is written whenever it is not None (so 0 is a valid value).
    Returns None when the socket does not exist and the unchanged row when
    there is nothing to update.
    """
    existing = get_socket_by_id(socket_id)
    if not existing:
        return None

    # (SET clause, value, should_write) triples in UPDATE order.
    candidates = (
        ('name = ?', name, bool(name)),
        ('description = ?', description, bool(description)),
        ('status = ?', status, status is not None),
        ('ip_address = ?', ip_address, bool(ip_address)),
    )
    changes = [(clause, value) for clause, value, wanted in candidates if wanted]
    if not changes:
        return existing

    set_clause = ", ".join(clause for clause, _ in changes)
    params = [value for _, value in changes]
    params.append(socket_id)

    connection = get_db()
    connection.execute(f'UPDATE socket SET {set_clause} WHERE id = ?', params)
    connection.commit()

    return get_socket_by_id(socket_id)
|
|
|
|
def delete_socket(socket_id):
    """Delete the socket row with the given id, commit, and return True."""
    connection = get_db()
    connection.execute('DELETE FROM socket WHERE id = ?', (socket_id,))
    connection.commit()
    return True
|
|
|
|
def get_latest_job_for_socket(socket_id):
    """Return the job with the newest ``start_at`` for a socket, or None."""
    latest_sql = '''
        SELECT * FROM job
        WHERE socket_id = ?
        ORDER BY start_at DESC
        LIMIT 1
    '''
    record = get_db().execute(latest_sql, (socket_id,)).fetchone()
    return dict(record) if record else None
|
|
|
|
def socket_to_dict(socket):
    """Convert a socket row into the dict returned by the API.

    Returns None for a falsy ``socket``. The result carries the socket's most
    recent job under ``latestJob`` (None when it has none); the IP address is
    not included.
    """
    if not socket:
        return None

    newest_job = get_latest_job_for_socket(socket['id'])
    result = {key: socket[key] for key in ('id', 'name', 'description', 'status')}
    result['latestJob'] = job_to_dict(newest_job) if newest_job else None
    return result
|
|
|
|
# Job-Verwaltung
|
|
def get_job_by_id(job_id):
    """Return the job row with the given id as a dict, or None."""
    record = get_db().execute(
        'SELECT * FROM job WHERE id = ?', (job_id,)
    ).fetchone()
    return dict(record) if record else None
|
|
|
|
def get_jobs_by_user(user_id):
    """Return all jobs that belong to the given user as a list of dicts."""
    cursor = get_db().execute('SELECT * FROM job WHERE user_id = ?', (user_id,))
    return list(map(dict, cursor.fetchall()))
|
|
|
|
def get_all_jobs():
    """Return all rows of the job table as a list of dicts."""
    cursor = get_db().execute('SELECT * FROM job')
    return list(map(dict, cursor.fetchall()))
|
|
|
|
def get_expired_jobs():
    """Return non-aborted jobs whose start time plus duration has passed.

    The comparison is made in SQL against the current UTC time.
    """
    expired_sql = '''
        SELECT * FROM job
        WHERE aborted = 0
        AND datetime(start_at, '+' || duration_in_minutes || ' minutes') <= datetime(?)
    '''
    connection = get_db()
    current_time = datetime.datetime.utcnow().isoformat()
    found = connection.execute(expired_sql, (current_time,)).fetchall()
    return list(map(dict, found))
|
|
|
|
def create_job(socket_id, user_id, duration_in_minutes, comments=None):
    """Insert a new, non-aborted job starting now (UTC) and return its row."""
    new_id = str(uuid.uuid4())
    started = datetime.datetime.utcnow()

    insert_sql = '''INSERT INTO job
        (id, socket_id, user_id, start_at, duration_in_minutes, comments, aborted, abort_reason)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)'''
    row_values = (
        new_id, socket_id, user_id, started.isoformat(),
        duration_in_minutes, comments, 0, None,
    )

    connection = get_db()
    connection.execute(insert_sql, row_values)
    connection.commit()

    return get_job_by_id(new_id)
|
|
|
|
def update_job(job_id, socket_id=None, user_id=None, duration_in_minutes=None,
               comments=None, aborted=None, abort_reason=None):
    """Update the given fields of a job and return the refreshed row.

    ``socket_id`` and ``user_id`` are written only when truthy. All other
    fields are written whenever they are not None, so a duration of 0, an
    empty comment and ``aborted=False`` are valid values. ``aborted`` is
    stored as 1 or 0.

    Returns None when the job does not exist and the unchanged row when there
    is nothing to update.
    """
    job = get_job_by_id(job_id)
    if not job:
        return None

    values = []
    params = []

    if socket_id:
        values.append('socket_id = ?')
        params.append(socket_id)

    if user_id:
        values.append('user_id = ?')
        params.append(user_id)

    # "is not None" rather than truthiness: a job finished within its first
    # minute has an actual duration of 0, which must still be stored.
    if duration_in_minutes is not None:
        values.append('duration_in_minutes = ?')
        params.append(duration_in_minutes)

    if comments is not None:
        values.append('comments = ?')
        params.append(comments)

    if aborted is not None:
        values.append('aborted = ?')
        params.append(1 if aborted else 0)

    if abort_reason is not None:
        values.append('abort_reason = ?')
        params.append(abort_reason)

    if not values:
        return job

    query = f'UPDATE job SET {", ".join(values)} WHERE id = ?'
    params.append(job_id)

    db = get_db()
    db.execute(query, params)
    db.commit()

    return get_job_by_id(job_id)
|
|
|
|
def delete_job(job_id):
    """Delete the job row with the given id, commit, and return True."""
    connection = get_db()
    connection.execute('DELETE FROM job WHERE id = ?', (job_id,))
    connection.commit()
    return True
|
|
|
|
def calculate_remaining_time(job):
    """Return the whole minutes left until the job's planned end.

    Aborted jobs and jobs whose end time has passed yield 0. ``start_at`` is
    parsed with ``datetime.fromisoformat`` and compared against the current
    UTC time; partial minutes are truncated.
    """
    if job['aborted']:
        return 0

    started = datetime.datetime.fromisoformat(job['start_at'])
    planned_end = started + timedelta(minutes=job['duration_in_minutes'])
    current = datetime.datetime.utcnow()

    if current > planned_end:
        return 0

    seconds_left = (planned_end - current).total_seconds()
    return int(seconds_left / 60)
|
|
|
|
def job_to_dict(job):
    """Convert a job row into the dict returned by the API.

    Returns None for a falsy ``job``. ``aborted`` is exposed as a bool and
    ``remainingMinutes`` is computed at call time.
    """
    if not job:
        return None

    result = {
        'id': job['id'],
        'socketId': job['socket_id'],
        'userId': job['user_id'],
        'startAt': job['start_at'],
        'durationInMinutes': job['duration_in_minutes'],
        'comments': job['comments'],
        'aborted': bool(job['aborted']),
        'abortReason': job['abort_reason'],
    }
    result['remainingMinutes'] = calculate_remaining_time(job)
    return result
|
|
|
|
# Steckdosen-Steuerung mit PyP100
|
|
def get_socket_device(ip_address):
    """Open an authenticated PyP100 connection to the plug at ``ip_address``.

    Returns the P100 device object, or None when any step fails (the error is
    logged).
    """
    try:
        plug = PyP100.P100(ip_address, TAPO_USERNAME, TAPO_PASSWORD)
        # Creates the required cookies
        plug.handshake()
        # Sends the credentials and creates the AES key
        plug.login()
        app.logger.info(f"PyP100 Verbindung zu {ip_address} hergestellt")
        return plug
    except Exception as e:
        app.logger.error(f"Fehler bei der Anmeldung an P100-Gerät {ip_address}: {e}")
        return None
|
|
|
|
def turn_on_socket(ip_address):
    """Switch the plug at ``ip_address`` on.

    Returns True on success and False when no connection could be made or an
    exception occurred (which is logged); it does not raise.
    """
    try:
        plug = get_socket_device(ip_address)
        if not plug:
            return False
        plug.turnOn()
        app.logger.info(f"P100-Steckdose {ip_address} eingeschaltet")
        return True
    except Exception as e:
        app.logger.error(f"Fehler beim Einschalten der P100-Steckdose {ip_address}: {e}")
        return False
|
|
|
|
def turn_off_socket(ip_address):
    """Switch the plug at ``ip_address`` off.

    Returns True on success and False when no connection could be made or an
    exception occurred (which is logged); it does not raise.
    """
    try:
        plug = get_socket_device(ip_address)
        if not plug:
            return False
        plug.turnOff()
        app.logger.info(f"P100-Steckdose {ip_address} ausgeschaltet")
        return True
    except Exception as e:
        app.logger.error(f"Fehler beim Ausschalten der P100-Steckdose {ip_address}: {e}")
        return False
|
|
|
|
# Authentifizierung und Autorisierung
|
|
def get_current_user():
    """Return the user bound to the current Flask session, or None.

    An unknown or expired session id is removed from the Flask session; an
    expired session row is deleted as well.
    """
    sid = flask_session.get('session_id')
    if not sid:
        return None

    record = get_session_by_id(sid)
    if record:
        expires_at = datetime.datetime.fromisoformat(record['expires_at'])
        if not expires_at < datetime.datetime.utcnow():
            return get_user_by_id(record['user_id'])
        # Expired: drop the stored session row
        delete_session(record['id'])

    flask_session.pop('session_id', None)
    return None
|
|
|
|
def login_required(f):
    """Decorator: reject the request with 401 unless a user is logged in.

    On success the user dict is stored in ``g.current_user`` before the
    wrapped view runs.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        current = get_current_user()
        if current:
            g.current_user = current
            return f(*args, **kwargs)
        return jsonify({'message': 'Authentifizierung erforderlich!'}), 401

    return decorated
|
|
|
|
def admin_required(f):
    """Decorator: reject the request with 403 unless the user is an admin.

    Reads ``g.current_user``, so it has to run after ``login_required``.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        current = g.get('current_user')
        if current and current['role'] == 'admin':
            return f(*args, **kwargs)
        return jsonify({'message': 'Admin-Rechte erforderlich!'}), 403

    return decorated
|
|
|
|
# Authentifizierungs-Routen
|
|
@app.route('/auth/register', methods=['POST'])
def register():
    """Register a new user and log them in.

    Expects a JSON object with ``username`` and ``password`` and optionally
    ``displayName`` and ``email``. The first user registered while no admin
    exists gets the admin role. Responds with 201 and the user on success and
    with 400 on missing fields or a username/e-mail conflict.
    """
    data = request.get_json()

    # A JSON array or scalar has no .get(); treat it like a missing body.
    if not isinstance(data, dict) or not data.get('username') or not data.get('password'):
        return jsonify({'message': 'Benutzername und Passwort sind erforderlich!'}), 400

    username = data.get('username')
    password = data.get('password')
    display_name = data.get('displayName', username)
    # The email column is UNIQUE: storing '' for every user without an e-mail
    # would make the second such registration fail, so store NULL instead.
    email = data.get('email') or None

    if get_user_by_username(username):
        return jsonify({'message': 'Benutzername bereits vergeben!'}), 400

    # Check whether an admin already exists
    db = get_db()
    admin_exists = db.execute("SELECT 1 FROM user WHERE role = 'admin' LIMIT 1").fetchone() is not None

    # Without an admin, the first user becomes one
    role = 'admin' if not admin_exists else 'user'

    try:
        user = create_user(username, password, display_name, email, role)
    except sqlite3.IntegrityError:
        # Duplicate e-mail, or a username taken between the check and the insert
        db.rollback()
        return jsonify({'message': 'Benutzername oder E-Mail bereits vergeben!'}), 400
    app.logger.info(f'Neuer Benutzer registriert: {username} (Rolle: {role})')

    # Create the session
    create_session(user['id'])

    return jsonify({
        'message': 'Registrierung erfolgreich!',
        'user': user_to_dict(user)
    }), 201
|
|
|
|
@app.route('/auth/login', methods=['POST'])
def login():
    """Check the submitted credentials and open a session.

    Responds with 400 when username or password is missing, 401 when they do
    not match, and otherwise with the user.
    """
    payload = request.get_json()

    if not payload or not payload.get('username') or not payload.get('password'):
        return jsonify({'message': 'Benutzername und Passwort sind erforderlich!'}), 400

    account = get_user_by_username(payload.get('username'))
    credentials_ok = bool(account) and check_password(account, payload.get('password'))
    if not credentials_ok:
        return jsonify({'message': 'Ungültiger Benutzername oder Passwort!'}), 401

    # Create the session
    create_session(account['id'])

    return jsonify({
        'message': 'Anmeldung erfolgreich!',
        'user': user_to_dict(account)
    })
|
|
|
|
@app.route('/auth/logout', methods=['POST'])
def logout():
    """Delete the current session, if any, and always answer with 200."""
    sid = flask_session.get('session_id')
    if sid:
        delete_session(sid)
        flask_session.pop('session_id', None)

    response_body = {'message': 'Erfolgreich abgemeldet!'}
    return jsonify(response_body), 200
|
|
|
|
# API-Routen
|
|
@app.route('/api/me', methods=['GET'])
def get_me():
    """Return the logged-in user, or 401 with ``authenticated: False``."""
    current = get_current_user()
    if current:
        return jsonify({
            'authenticated': True,
            'user': user_to_dict(current)
        })
    return jsonify({'authenticated': False}), 401
|
|
|
|
@app.route('/api/printers', methods=['GET'])
def get_printers():
    """Return all sockets (exposed as printers) as a JSON list."""
    serialized = [socket_to_dict(entry) for entry in get_all_sockets()]
    return jsonify(serialized)
|
|
|
|
@app.route('/api/printers', methods=['POST'])
@login_required
@admin_required
def create_printer():
    """Create a socket from the JSON body (admin only).

    ``name`` and ``description`` are required; ``status`` defaults to 0 and
    ``ipAddress`` is optional. Responds with 201 and the new socket.
    """
    payload = request.get_json()

    if not payload or not payload.get('name') or not payload.get('description'):
        return jsonify({'message': 'Name und Beschreibung sind erforderlich!'}), 400

    created = create_socket(
        name=payload.get('name'),
        description=payload.get('description'),
        status=payload.get('status', 0),
        ip_address=payload.get('ipAddress'),
    )
    return jsonify(socket_to_dict(created)), 201
|
|
|
|
@app.route('/api/printers/<printer_id>', methods=['GET'])
def get_printer(printer_id):
    """Return a single socket, or 404 when the id is unknown."""
    found = get_socket_by_id(printer_id)
    if found:
        return jsonify(socket_to_dict(found))
    return jsonify({'message': 'Steckdose nicht gefunden!'}), 404
|
|
|
|
@app.route('/api/printers/<printer_id>', methods=['PUT'])
@login_required
@admin_required
def update_printer(printer_id):
    """Update a socket from the JSON body (admin only).

    Accepts ``name``, ``description``, ``status`` and ``ipAddress``. Responds
    with 404 for an unknown id and 400 when the body is not a JSON object.
    """
    socket = get_socket_by_id(printer_id)
    if not socket:
        return jsonify({'message': 'Steckdose nicht gefunden!'}), 404

    # silent=True yields None for a missing or unparsable body; a JSON
    # array/scalar has no .get() either, so reject everything but an object.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'message': 'Ungültige Anfrage!'}), 400

    updated_socket = update_socket(
        printer_id,
        name=data.get('name'),
        description=data.get('description'),
        status=data.get('status') if 'status' in data else None,
        ip_address=data.get('ipAddress')
    )

    return jsonify(socket_to_dict(updated_socket))
|
|
|
|
@app.route('/api/printers/<printer_id>', methods=['DELETE'])
@login_required
@admin_required
def delete_printer(printer_id):
    """Delete a socket (admin only); 404 when the id is unknown."""
    if not get_socket_by_id(printer_id):
        return jsonify({'message': 'Steckdose nicht gefunden!'}), 404

    delete_socket(printer_id)
    return jsonify({'message': 'Steckdose gelöscht!'})
|
|
|
|
@app.route('/api/jobs', methods=['GET'])
@login_required
def get_jobs():
    """List jobs: admins see all of them, other users only their own."""
    viewer = g.current_user
    is_admin = viewer['role'] == 'admin'
    visible_jobs = get_all_jobs() if is_admin else get_jobs_by_user(viewer['id'])
    return jsonify([job_to_dict(entry) for entry in visible_jobs])
|
|
|
|
@app.route('/api/jobs', methods=['POST'])
@login_required
def create_job_endpoint():
    """Start a job on an available socket for the logged-in user.

    Expects a JSON object with ``printerId`` and ``durationInMinutes`` (a
    positive whole number) and optionally ``comments``. Marks the socket busy
    and switches the plug on when it has an IP address. Responds with 201 and
    the job, 404 for an unknown socket and 400 for invalid input or a socket
    that is not available.
    """
    data = request.get_json()

    # A JSON array or scalar has no .get(); treat it like a missing body.
    if not isinstance(data, dict) or not data.get('printerId') or not data.get('durationInMinutes'):
        return jsonify({'message': 'Steckdosen-ID und Dauer sind erforderlich!'}), 400

    socket = get_socket_by_id(data['printerId'])
    if not socket:
        return jsonify({'message': 'Steckdose nicht gefunden!'}), 404

    if socket['status'] != 0:  # 0 = available
        return jsonify({'message': 'Steckdose ist nicht verfügbar!'}), 400

    # Reject non-numeric and non-positive durations instead of failing with a
    # server error or creating a job that has already expired.
    try:
        duration = int(data['durationInMinutes'])
    except (TypeError, ValueError):
        return jsonify({'message': 'Ungültige Dauer!'}), 400
    if duration <= 0:
        return jsonify({'message': 'Ungültige Dauer!'}), 400

    job = create_job(
        socket_id=socket['id'],
        user_id=g.current_user['id'],
        duration_in_minutes=duration,
        comments=data.get('comments', '')
    )

    # Mark the socket as occupied
    update_socket(socket['id'], status=1)  # 1 = busy

    # Switch the plug on when an IP address is stored
    if socket['ip_address']:
        try:
            turn_on_socket(socket['ip_address'])
        except Exception as e:
            app.logger.error(f"Fehler beim Einschalten der Steckdose: {e}")

    return jsonify(job_to_dict(job)), 201
|
|
|
|
@app.route('/api/jobs/<job_id>', methods=['GET'])
@login_required
def get_job_endpoint(job_id):
    """Return one job; admins may read any job, other users only their own."""
    found = get_job_by_id(job_id)
    if not found:
        return jsonify({'message': 'Job nicht gefunden!'}), 404

    viewer = g.current_user
    may_view = viewer['role'] == 'admin' or found['user_id'] == viewer['id']
    if not may_view:
        return jsonify({'message': 'Keine Berechtigung für diesen Job!'}), 403

    return jsonify(job_to_dict(found))
|
|
|
|
@app.route('/api/jobs/<job_id>/abort', methods=['POST'])
@login_required
def abort_job(job_id):
    """Abort a job and release its socket.

    Admins may abort any job, other users only their own. The optional JSON
    body may carry a ``reason``. The socket is set to available and switched
    off (up to three attempts, one second apart) only when this job is the
    socket's most recent one.
    """
    import time

    job = get_job_by_id(job_id)
    if not job:
        return jsonify({'message': 'Job nicht gefunden!'}), 404

    if g.current_user['role'] != 'admin' and job['user_id'] != g.current_user['id']:
        return jsonify({'message': 'Keine Berechtigung für diesen Job!'}), 403

    # The body is optional: a missing, unparsable or non-object body simply
    # means "no reason given" instead of crashing on data.get().
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    updated_job = update_job(job_id, aborted=True, abort_reason=data.get('reason', ''))

    # Release the socket — but only when this job is the newest one on it.
    # Otherwise aborting an old job would cut power to a newer running job.
    socket = get_socket_by_id(job['socket_id'])
    latest_job = get_latest_job_for_socket(job['socket_id']) if socket else None
    if socket and latest_job and latest_job['id'] == job['id']:
        update_socket(socket['id'], status=0)  # 0 = available

        # Switch the plug off when an IP address is stored
        if socket['ip_address']:
            # Try several times to make sure the plug really is off
            max_attempts = 3
            for attempt in range(1, max_attempts + 1):
                try:
                    success = turn_off_socket(socket['ip_address'])
                    if success:
                        app.logger.info(f"Steckdose {socket['ip_address']} für abgebrochenen Job {job['id']} ausgeschaltet (Versuch {attempt}).")
                        break
                    app.logger.warning(f"Konnte Steckdose {socket['ip_address']} nicht ausschalten (Versuch {attempt}/{max_attempts}).")
                except Exception as e:
                    app.logger.error(f"Fehler beim Ausschalten der Steckdose {socket['ip_address']}: {e} (Versuch {attempt}/{max_attempts})")

                # Wait briefly before retrying, except after the last attempt
                if attempt < max_attempts:
                    time.sleep(1)

    return jsonify(job_to_dict(updated_job))
|
|
|
|
@app.route('/api/jobs/<job_id>/finish', methods=['POST'])
@login_required
def finish_job(job_id):
    """Finish a job now and release its socket.

    Admins may finish any job, other users only their own. The job's duration
    is set to the whole minutes elapsed since its start. The socket is set to
    available and switched off (up to three attempts, one second apart) only
    when this job is the socket's most recent one.
    """
    import time

    job = get_job_by_id(job_id)
    if not job:
        return jsonify({'message': 'Job nicht gefunden!'}), 404

    if g.current_user['role'] != 'admin' and job['user_id'] != g.current_user['id']:
        return jsonify({'message': 'Keine Berechtigung für diesen Job!'}), 403

    # Use the current time as the end of the job
    now = datetime.datetime.utcnow()
    start_at = datetime.datetime.fromisoformat(job['start_at'])
    actual_duration = int((now - start_at).total_seconds() / 60)

    updated_job = update_job(job_id, duration_in_minutes=actual_duration)

    # Release the socket — but only when this job is the newest one on it.
    # Otherwise finishing an old job would cut power to a newer running job.
    socket = get_socket_by_id(job['socket_id'])
    latest_job = get_latest_job_for_socket(job['socket_id']) if socket else None
    if socket and latest_job and latest_job['id'] == job['id']:
        update_socket(socket['id'], status=0)  # 0 = available

        # Switch the plug off when an IP address is stored
        if socket['ip_address']:
            # Try several times to make sure the plug really is off
            max_attempts = 3
            for attempt in range(1, max_attempts + 1):
                try:
                    success = turn_off_socket(socket['ip_address'])
                    if success:
                        app.logger.info(f"Steckdose {socket['ip_address']} für beendeten Job {job['id']} ausgeschaltet (Versuch {attempt}).")
                        break
                    app.logger.warning(f"Konnte Steckdose {socket['ip_address']} nicht ausschalten (Versuch {attempt}/{max_attempts}).")
                except Exception as e:
                    app.logger.error(f"Fehler beim Ausschalten der Steckdose {socket['ip_address']}: {e} (Versuch {attempt}/{max_attempts})")

                # Wait briefly before retrying, except after the last attempt
                if attempt < max_attempts:
                    time.sleep(1)

    return jsonify(job_to_dict(updated_job))
|
|
|
|
@app.route('/api/jobs/<job_id>/extend', methods=['POST'])
@login_required
def extend_job(job_id):
    """Extend a job by the ``minutes`` and/or ``hours`` given in the JSON body.

    Admins may extend any job, other users only their own. Responds with 400
    when the body is missing, the values are not whole numbers, or the total
    extension is not positive.
    """
    job = get_job_by_id(job_id)
    if not job:
        return jsonify({'message': 'Job nicht gefunden!'}), 404

    if g.current_user['role'] != 'admin' and job['user_id'] != g.current_user['id']:
        return jsonify({'message': 'Keine Berechtigung für diesen Job!'}), 403

    # A missing, unparsable or non-object body carries no extension at all.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'message': 'Ungültige Verlängerungszeit!'}), 400

    # Non-numeric input is a client error, not a server error.
    try:
        minutes = int(data.get('minutes', 0))
        hours = int(data.get('hours', 0))
    except (TypeError, ValueError):
        return jsonify({'message': 'Ungültige Verlängerungszeit!'}), 400

    additional_minutes = minutes + (hours * 60)
    if additional_minutes <= 0:
        return jsonify({'message': 'Ungültige Verlängerungszeit!'}), 400

    new_duration = job['duration_in_minutes'] + additional_minutes
    updated_job = update_job(job_id, duration_in_minutes=new_duration)

    return jsonify(job_to_dict(updated_job))
|
|
|
|
@app.route('/api/jobs/<job_id>/comments', methods=['PUT'])
@login_required
def update_job_comments(job_id):
    """Replace a job's comments with the ``comments`` value of the JSON body.

    Admins may edit any job, other users only their own. A missing
    ``comments`` key stores an empty string. Responds with 400 when the body
    is not a JSON object.
    """
    job = get_job_by_id(job_id)
    if not job:
        return jsonify({'message': 'Job nicht gefunden!'}), 404

    if g.current_user['role'] != 'admin' and job['user_id'] != g.current_user['id']:
        return jsonify({'message': 'Keine Berechtigung für diesen Job!'}), 403

    # Reject a missing, unparsable or non-object body instead of crashing on
    # data.get().
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'message': 'Ungültige Anfrage!'}), 400

    updated_job = update_job(job_id, comments=data.get('comments', ''))

    return jsonify(job_to_dict(updated_job))
|
|
|
|
@app.route('/api/job/<job_id>/remaining-time', methods=['GET'])
def job_remaining_time(job_id):
    """Report a job's remaining minutes and release its socket once expired.

    When the job has run out (and was not aborted), its socket is still marked
    busy, and this job is the socket's most recent one, the socket is set to
    available and switched off (up to three attempts, one second apart).
    """
    import time

    job = get_job_by_id(job_id)
    if not job:
        return jsonify({'message': 'Job nicht gefunden!'}), 404

    remaining = calculate_remaining_time(job)

    # If no time is left and the job was not aborted manually, switch the
    # plug off and update the status automatically.
    if remaining == 0 and not job['aborted']:
        socket = get_socket_by_id(job['socket_id'])
        # Only the socket's newest job may release it: polling an old, expired
        # job must not cut power to a newer job that is using the socket now.
        latest_job = get_latest_job_for_socket(job['socket_id']) if socket else None
        is_latest = latest_job is not None and latest_job['id'] == job['id']
        if socket and socket['status'] == 1 and is_latest:  # busy
            update_socket(socket['id'], status=0)  # available
            app.logger.info(f"Job {job['id']} abgelaufen. Steckdose {socket['id']} auf verfügbar gesetzt.")

            # Switch the plug off when an IP address is stored
            if socket['ip_address']:
                # Try several times to make sure the plug really is off
                max_attempts = 3
                for attempt in range(1, max_attempts + 1):
                    try:
                        success = turn_off_socket(socket['ip_address'])
                        if success:
                            app.logger.info(f"Steckdose {socket['ip_address']} für abgelaufenen Job {job['id']} automatisch ausgeschaltet (Versuch {attempt}).")
                            break
                        app.logger.warning(f"Konnte Steckdose {socket['ip_address']} nicht ausschalten (Versuch {attempt}/{max_attempts}).")
                    except Exception as e:
                        app.logger.error(f"Fehler beim Ausschalten der Steckdose {socket['ip_address']}: {e} (Versuch {attempt}/{max_attempts})")

                    # Wait briefly before retrying, except after the last attempt
                    if attempt < max_attempts:
                        time.sleep(1)

    return jsonify({
        'remaining_minutes': remaining,
        'job_status': 'completed' if remaining == 0 else 'active',
        'socket_status': 'available' if remaining == 0 else 'busy'
    })
|
|
|
|
@app.route('/api/users', methods=['GET'])
@login_required
@admin_required
def get_users():
    """Return all users as a JSON list (admin only)."""
    serialized = [user_to_dict(entry) for entry in get_all_users()]
    return jsonify(serialized)
|
|
|
|
@app.route('/api/users/<user_id>', methods=['GET'])
@login_required
@admin_required
def get_user(user_id):
    """Return a single user (admin only); 404 when the id is unknown."""
    found = get_user_by_id(user_id)
    if found:
        return jsonify(user_to_dict(found))
    return jsonify({'message': 'Benutzer nicht gefunden!'}), 404
|
|
|
|
@app.route('/api/users/<user_id>', methods=['PUT'])
@login_required
@admin_required
def update_user_endpoint(user_id):
    """Update a user from the JSON body (admin only).

    Accepts ``username``, ``password``, ``displayName``, ``email`` and
    ``role``. Responds with 404 for an unknown id and 400 when the body is not
    a JSON object or the new username/e-mail is already taken.
    """
    user = get_user_by_id(user_id)
    if not user:
        return jsonify({'message': 'Benutzer nicht gefunden!'}), 404

    # Reject a missing, unparsable or non-object body instead of crashing on
    # data.get().
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'message': 'Ungültige Anfrage!'}), 400

    try:
        updated_user = update_user(
            user_id,
            username=data.get('username'),
            password=data.get('password'),
            display_name=data.get('displayName'),
            email=data.get('email'),
            role=data.get('role')
        )
    except sqlite3.IntegrityError:
        # username and email are UNIQUE columns
        get_db().rollback()
        return jsonify({'message': 'Benutzername oder E-Mail bereits vergeben!'}), 400

    return jsonify(user_to_dict(updated_user))
|
|
|
|
@app.route('/api/users/<user_id>', methods=['DELETE'])
@login_required
@admin_required
def delete_user_endpoint(user_id):
    """Delete a user together with their sessions (admin only)."""
    if not get_user_by_id(user_id):
        return jsonify({'message': 'Benutzer nicht gefunden!'}), 404

    # Remove all of the user's sessions first
    delete_sessions_by_user(user_id)
    delete_user(user_id)

    return jsonify({'message': 'Benutzer gelöscht!'})
|
|
|
|
@app.route('/api/stats', methods=['GET'])
@login_required
@admin_required
def stats():
    """Return usage statistics about sockets, jobs and users (admin only)."""
    db = get_db()

    def count(sql, params=()):
        # Run a "SELECT COUNT(*) as count ..." query and return the number.
        return db.execute(sql, params).fetchone()['count']

    # Socket usage
    total_sockets = count('SELECT COUNT(*) as count FROM socket')
    available_sockets = count('SELECT COUNT(*) as count FROM socket WHERE status = 0')

    # Jobs
    total_jobs = count('SELECT COUNT(*) as count FROM job')

    now = datetime.datetime.utcnow().isoformat()
    active_jobs = count('''
        SELECT COUNT(*) as count FROM job
        WHERE aborted = 0
        AND datetime(start_at, '+' || duration_in_minutes || ' minutes') > datetime(?)
    ''', (now,))
    completed_jobs = count('''
        SELECT COUNT(*) as count FROM job
        WHERE aborted = 0
        AND datetime(start_at, '+' || duration_in_minutes || ' minutes') <= datetime(?)
    ''', (now,))

    # Users
    total_users = count('SELECT COUNT(*) as count FROM user')

    # Average job duration (0 when there is no average)
    avg_row = db.execute('SELECT AVG(duration_in_minutes) as avg FROM job').fetchone()
    avg_duration = int(avg_row['avg']) if avg_row['avg'] else 0

    printer_stats = {
        'total': total_sockets,
        'available': available_sockets,
        'utilization_rate': (total_sockets - available_sockets) / total_sockets if total_sockets > 0 else 0
    }
    job_stats = {
        'total': total_jobs,
        'active': active_jobs,
        'completed': completed_jobs,
        'avg_duration': avg_duration
    }

    return jsonify({
        'printers': printer_stats,
        'jobs': job_stats,
        'users': {
            'total': total_users
        }
    })
|
|
|
|
# Tägliche Überprüfung der Jobs und automatische Abschaltung der Steckdosen
|
|
@app.cli.command("check-jobs")
def check_jobs():
    """Check expired jobs, mark their sockets available and switch them off."""
    with app.app_context():
        expired_jobs = get_expired_jobs()

        for job in expired_jobs:
            socket = get_socket_by_id(job['socket_id'])

            # Only sockets that exist and are still flagged busy (status 1)
            # need any work.
            if not socket or socket['status'] != 1:
                continue

            update_socket(socket['id'], status=0)  # 0 = available
            app.logger.info(f"Job {job['id']} abgelaufen. Steckdose {socket['id']} auf verfügbar gesetzt.")

            # Without a stored IP address there is nothing to switch off.
            if not socket['ip_address']:
                continue

            # Try several times so the socket is really off afterwards.
            max_attempts = 3
            for attempt in range(1, max_attempts + 1):
                try:
                    success = turn_off_socket(socket['ip_address'])
                    if success:
                        app.logger.info(f"Steckdose {socket['ip_address']} für abgelaufenen Job {job['id']} ausgeschaltet (Versuch {attempt}).")
                        break
                    app.logger.warning(f"Konnte Steckdose {socket['ip_address']} nicht ausschalten (Versuch {attempt}/{max_attempts}).")
                except Exception as e:
                    app.logger.error(f"Fehler beim Ausschalten der Steckdose {socket['ip_address']}: {e} (Versuch {attempt}/{max_attempts})")

                # Pause before retrying, except after the final attempt.
                if attempt < max_attempts:
                    import time
                    time.sleep(1)

        app.logger.info(f"{len(expired_jobs)} abgelaufene Jobs überprüft und Steckdosen aktualisiert.")
|
|
|
|
@app.route('/api/job/<job_id>/status', methods=['GET'])
def job_status(job_id):
    """Report a job's status for frontend polling.

    Responds with a 404 message when the job does not exist. When the job has
    run out of time, was not aborted and its socket is still marked busy, the
    socket is set to available and (if it has an IP address) switched off
    with up to three attempts before the response is built.
    """
    job = get_job_by_id(job_id)
    if not job:
        return jsonify({'message': 'Job nicht gefunden!'}), 404

    remaining = calculate_remaining_time(job)
    socket = get_socket_by_id(job['socket_id'])
    socket_status = None
    if socket:
        socket_status = socket['status']

    # Time is up, the job was not cancelled manually and the socket is still
    # flagged busy (status 1): release it now.
    needs_release = bool(
        remaining == 0
        and not job['aborted']
        and socket
        and socket['status'] == 1
    )

    if needs_release:
        update_socket(socket['id'], status=0)
        socket_status = 0
        app.logger.info(f"Job {job['id']} abgelaufen. Steckdose {socket['id']} auf verfügbar gesetzt.")

        # Switch the socket off if an IP address is stored for it.
        if socket['ip_address']:
            # Try several times so the socket is really off afterwards.
            max_attempts = 3
            for attempt in range(1, max_attempts + 1):
                try:
                    success = turn_off_socket(socket['ip_address'])
                    if success:
                        app.logger.info(f"Steckdose {socket['ip_address']} für abgelaufenen Job {job['id']} automatisch ausgeschaltet (Versuch {attempt}).")
                        break
                    app.logger.warning(f"Konnte Steckdose {socket['ip_address']} nicht ausschalten (Versuch {attempt}/{max_attempts}).")
                except Exception as e:
                    app.logger.error(f"Fehler beim Ausschalten der Steckdose {socket['ip_address']}: {e} (Versuch {attempt}/{max_attempts})")

                # Pause before retrying, except after the final attempt.
                if attempt < max_attempts:
                    import time
                    time.sleep(1)

    if job['aborted']:
        state = 'aborted'
    elif remaining == 0:
        state = 'completed'
    else:
        state = 'active'

    # Any socket status other than 0 (including a missing socket) is
    # reported as busy.
    if socket_status == 0:
        socket_label = 'available'
    else:
        socket_label = 'busy'

    return jsonify({
        'job': job_to_dict(job),
        'status': state,
        'socketStatus': socket_label,
        'remainingMinutes': remaining,
    })
|
|
|
|
@app.route('/api/test', methods=['GET'])
def test():
    """Return a fixed JSON message confirming that the API responds."""
    payload = {'message': 'MYP Backend API funktioniert!'}
    return jsonify(payload)
|
|
|
|
@app.route('/api/create-initial-admin', methods=['POST'])
def create_initial_admin():
    """Create the first administrator account.

    Responds with 400 when a user with role 'admin' already exists or when
    the JSON body lacks a username or password; otherwise creates the admin
    and responds with 201 and the serialized user.
    """
    db = get_db()
    existing_admin = db.execute("SELECT 1 FROM user WHERE role = 'admin' LIMIT 1").fetchone()
    if existing_admin is not None:
        return jsonify({'message': 'Es existiert bereits ein Administrator!'}), 400

    data = request.get_json()

    has_credentials = data and data.get('username') and data.get('password')
    if not has_credentials:
        return jsonify({'message': 'Benutzername und Passwort sind erforderlich!'}), 400

    username = data.get('username')
    # The display name falls back to the username, the e-mail to an empty string.
    user = create_user(
        username,
        data.get('password'),
        data.get('displayName', username),
        data.get('email', ''),
        'admin',
    )
    app.logger.info(f'Initialer Admin-Benutzer erstellt: {username}')

    body = {
        'message': 'Administrator wurde erfolgreich erstellt!',
        'user': user_to_dict(user),
    }
    return jsonify(body), 201
|
|
|
|
# Error Handler
|
|
@app.errorhandler(404)
def not_found(error):
    """Answer 404 errors with a JSON message instead of the default page."""
    body = jsonify({'message': 'Nicht gefunden!'})
    return body, 404
|
|
|
|
@app.errorhandler(500)
def server_error(error):
    """Log a 500 error and answer with a JSON message."""
    app.logger.error(f'Serverfehler: {error}')
    body = jsonify({'message': 'Interner Serverfehler!'})
    return body, 500
|
|
|
|
# Web UI Routen
|
|
@app.route('/')
def index():
    """Render the dashboard for a logged-in user, else redirect to the login page."""
    user = get_current_user()
    if not user:
        return redirect(url_for('login_page'))
    return render_template('dashboard.html', current_user=user, active_page='home')
|
|
|
|
@app.route('/login')
def login_page():
    """Render the login page."""
    page = render_template('login.html', active_page='login')
    return page
|
|
|
|
@app.route('/register')
def register_page():
    """Render the registration page."""
    page = render_template('register.html', active_page='register')
    return page
|
|
|
|
@app.route('/logout')
def logout_page():
    """Log the visitor out and redirect to the login page.

    If the Flask session holds a session id, the stored session is deleted
    and the id is removed from the Flask session.
    """
    sid = flask_session.get('session_id')
    if sid:
        delete_session(sid)
        flask_session.pop('session_id', None)

    flash('Sie wurden erfolgreich abgemeldet.', 'success')
    target = url_for('login_page')
    return redirect(target)
|
|
|
|
@app.route('/admin/printers')
def printers_page():
    """Render the printers page for a logged-in user, else redirect to login."""
    user = get_current_user()
    if user:
        return render_template('printers.html', current_user=user, active_page='printers')
    return redirect(url_for('login_page'))
|
|
|
|
@app.route('/admin/jobs')
def jobs_page():
    """Render the jobs page for a logged-in user, else redirect to login."""
    user = get_current_user()
    if user:
        return render_template('jobs.html', current_user=user, active_page='jobs')
    return redirect(url_for('login_page'))
|
|
|
|
@app.route('/admin/users')
def users_page():
    """Render the user administration page for admins only.

    Anyone who is not logged in or whose role is not 'admin' gets a flash
    message and is redirected to the index page.
    """
    user = get_current_user()
    if user and user['role'] == 'admin':
        return render_template('users.html', current_user=user, active_page='users')

    flash('Sie haben keine Berechtigung, diese Seite zu besuchen.', 'danger')
    return redirect(url_for('index'))
|
|
|
|
@app.route('/admin/stats')
def stats_page():
    """Render the statistics page for admins only.

    Anyone who is not logged in or whose role is not 'admin' gets a flash
    message and is redirected to the index page.
    """
    user = get_current_user()
    if user and user['role'] == 'admin':
        return render_template('stats.html', current_user=user, active_page='stats')

    flash('Sie haben keine Berechtigung, diese Seite zu besuchen.', 'danger')
    return redirect(url_for('index'))
|
|
|
|
# Server starten
|
|
if __name__ == '__main__':
    # The server listens on all interfaces, and the Werkzeug debugger lets
    # anyone who can reach it execute arbitrary code. Debug mode is therefore
    # switched off when FLASK_ENV is 'production' (the same switch the
    # session-cookie configuration uses); other environments keep the
    # previous debug behaviour.
    debug_enabled = os.environ.get('FLASK_ENV') != 'production'
    app.run(debug=debug_enabled, host='0.0.0.0')