from flask import Flask, request, jsonify, g, redirect, url_for, session as flask_session from flask_cors import CORS from flask_sqlalchemy import SQLAlchemy from flask_migrate import Migrate from werkzeug.security import generate_password_hash, check_password_hash from functools import wraps import jwt import datetime import os import json import logging import uuid import asyncio from logging.handlers import RotatingFileHandler from datetime import timedelta from typing import Dict, Any, List, Optional, Union from dataclasses import dataclass import sqlite3 from tapo import ApiClient from dotenv import load_dotenv # Lade Umgebungsvariablen load_dotenv() # Initialisierung app = Flask(__name__) CORS(app, supports_credentials=True) # Konfiguration app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev_secret_key') app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DATABASE_URL', 'sqlite:///myp.db') app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['SESSION_COOKIE_HTTPONLY'] = True app.config['SESSION_COOKIE_SECURE'] = os.environ.get('FLASK_ENV') == 'production' app.config['SESSION_COOKIE_SAMESITE'] = 'Lax' app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=7) # Tapo-Konfiguration TAPO_USERNAME = os.environ.get('TAPO_USERNAME') TAPO_PASSWORD = os.environ.get('TAPO_PASSWORD') TAPO_DEVICES = json.loads(os.environ.get('TAPO_DEVICES', '{}')) # Logging if not os.path.exists('logs'): os.mkdir('logs') file_handler = RotatingFileHandler('logs/myp.log', maxBytes=10240, backupCount=10) file_handler.setFormatter(logging.Formatter( '%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]' )) file_handler.setLevel(logging.INFO) app.logger.addHandler(file_handler) app.logger.setLevel(logging.INFO) app.logger.info('MYP Backend starting up') # DB Setup db = SQLAlchemy(app) migrate = Migrate(app, db) # Lokale Authentifizierung statt OAuth # Models class User(db.Model): id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4())) username = db.Column(db.String(64), index=True, unique=True) password_hash = db.Column(db.String(128)) display_name = db.Column(db.String(100)) email = db.Column(db.String(120), index=True, unique=True) role = db.Column(db.String(20), default='user') # admin, user, guest jobs = db.relationship('PrintJob', backref='user', lazy='dynamic') def set_password(self, password): self.password_hash = generate_password_hash(password) def check_password(self, password): return check_password_hash(self.password_hash, password) def to_dict(self): return { 'id': self.id, 'username': self.username, 'displayName': self.display_name, 'email': self.email, 'role': self.role } class Session(db.Model): id = db.Column(db.String(36), primary_key=True) user_id = db.Column(db.String(36), db.ForeignKey('user.id'), nullable=False) expires_at = db.Column(db.DateTime, nullable=False) user = db.relationship('User', backref=db.backref('sessions', lazy=True)) class Printer(db.Model): id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4())) name = db.Column(db.String(64), index=True, nullable=False) description = db.Column(db.Text, nullable=False) status = db.Column(db.Integer, default=0) # 0=available, 1=busy, 2=maintenance ip_address = db.Column(db.String(15), nullable=True) # IP-Adresse der Tapo-Steckdose jobs = db.relationship('PrintJob', backref='printer', lazy='dynamic') def get_latest_job(self): return PrintJob.query.filter_by(printer_id=self.id).order_by(PrintJob.start_at.desc()).first() def to_dict(self): latest_job = self.get_latest_job() return { 'id': self.id, 'name': self.name, 'description': self.description, 'status': self.status, 'latestJob': latest_job.to_dict() if latest_job else None } class PrintJob(db.Model): id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4())) printer_id = db.Column(db.String(36), db.ForeignKey('printer.id'), nullable=False) user_id = db.Column(db.String(36), db.ForeignKey('user.id'), nullable=False) start_at = db.Column(db.DateTime, default=datetime.datetime.utcnow) duration_in_minutes = db.Column(db.Integer, nullable=False) comments = db.Column(db.Text, nullable=True) aborted = db.Column(db.Boolean, default=False) abort_reason = db.Column(db.Text, nullable=True) @property def end_at(self): return self.start_at + timedelta(minutes=self.duration_in_minutes) def remaining_time(self): if self.aborted: return 0 now = datetime.datetime.utcnow() if now > self.end_at: return 0 diff = self.end_at - now return int(diff.total_seconds() / 60) def to_dict(self): return { 'id': self.id, 'printerId': self.printer_id, 'userId': self.user_id, 'startAt': self.start_at.isoformat(), 'durationInMinutes': self.duration_in_minutes, 'comments': self.comments, 'aborted': self.aborted, 'abortReason': self.abort_reason, 'remainingMinutes': self.remaining_time() } # Tapo Steckdosen-Steuerung class TapoControl: def __init__(self, username, password): self.username = username self.password = password self.clients = {} async def get_client(self, ip_address): if ip_address not in self.clients: try: client = ApiClient(self.username, self.password) await client.login() self.clients[ip_address] = client except Exception as e: app.logger.error(f"Fehler bei der Anmeldung an Tapo-Gerät {ip_address}: {e}") return None return self.clients[ip_address] async def turn_on(self, ip_address): client = await self.get_client(ip_address) if client: try: device = await client.p115(ip_address) await device.on() app.logger.info(f"Tapo-Steckdose {ip_address} eingeschaltet") return True except Exception as e: app.logger.error(f"Fehler beim Einschalten der Tapo-Steckdose {ip_address}: {e}") return False async def turn_off(self, ip_address): client = await self.get_client(ip_address) if client: try: device = await client.p115(ip_address) await device.off() app.logger.info(f"Tapo-Steckdose {ip_address} ausgeschaltet") return True except Exception as e: app.logger.error(f"Fehler beim Ausschalten der Tapo-Steckdose {ip_address}: {e}") return False tapo_control = TapoControl(TAPO_USERNAME, TAPO_PASSWORD) # Hilfsfunktionen def get_current_user(): session_id = flask_session.get('session_id') if not session_id: return None session = Session.query.get(session_id) if not session or session.expires_at < datetime.datetime.utcnow(): if session: db.session.delete(session) db.session.commit() flask_session.pop('session_id', None) return None return session.user def login_required(f): @wraps(f) def decorated(*args, **kwargs): user = get_current_user() if not user: return jsonify({'message': 'Authentifizierung erforderlich!'}), 401 g.current_user = user return f(*args, **kwargs) return decorated def admin_required(f): @wraps(f) def decorated(*args, **kwargs): if not g.current_user or g.current_user.role != 'admin': return jsonify({'message': 'Admin-Rechte erforderlich!'}), 403 return f(*args, **kwargs) return decorated def create_session(user): session_id = str(uuid.uuid4()) expires_at = datetime.datetime.utcnow() + timedelta(days=7) session = Session(id=session_id, user_id=user.id, expires_at=expires_at) db.session.add(session) db.session.commit() flask_session['session_id'] = session_id flask_session.permanent = True return session_id # Authentifizierungs-Routen @app.route('/auth/register', methods=['POST']) def register(): data = request.get_json() if not data or not data.get('username') or not data.get('password'): return jsonify({'message': 'Benutzername und Passwort sind erforderlich!'}), 400 username = data.get('username') password = data.get('password') display_name = data.get('displayName', username) email = data.get('email', '') if User.query.filter_by(username=username).first(): return jsonify({'message': 'Benutzername bereits vergeben!'}), 400 if email and User.query.filter_by(email=email).first(): return jsonify({'message': 'E-Mail-Adresse bereits registriert!'}), 400 # Prüfen, ob es bereits einen Admin gibt admin_exists = User.query.filter_by(role='admin').first() is not None # Falls kein Admin existiert, wird der erste Benutzer zum Admin role = 'admin' if not admin_exists else 'user' user = User( username=username, display_name=display_name, email=email, role=role ) user.set_password(password) db.session.add(user) db.session.commit() app.logger.info(f'Neuer Benutzer registriert: {username} (Rolle: {role})') # Session erstellen create_session(user) return jsonify({ 'message': 'Registrierung erfolgreich!', 'user': user.to_dict() }), 201 @app.route('/auth/login', methods=['POST']) def login(): data = request.get_json() if not data or not data.get('username') or not data.get('password'): return jsonify({'message': 'Benutzername und Passwort sind erforderlich!'}), 400 username = data.get('username') password = data.get('password') user = User.query.filter_by(username=username).first() if not user or not user.check_password(password): return jsonify({'message': 'Ungültiger Benutzername oder Passwort!'}), 401 # Session erstellen create_session(user) return jsonify({ 'message': 'Anmeldung erfolgreich!', 'user': user.to_dict() }) @app.route('/auth/logout', methods=['POST']) def logout(): session_id = flask_session.get('session_id') if session_id: session = Session.query.get(session_id) if session: db.session.delete(session) db.session.commit() flask_session.pop('session_id', None) return jsonify({'message': 'Erfolgreich abgemeldet!'}), 200 # API-Routen @app.route('/api/me', methods=['GET']) def get_me(): user = get_current_user() if not user: return jsonify({'authenticated': False}), 401 return jsonify({ 'authenticated': True, 'user': user.to_dict() }) @app.route('/api/printers', methods=['GET']) def get_printers(): printers = Printer.query.all() return jsonify([printer.to_dict() for printer in printers]) @app.route('/api/printers', methods=['POST']) @login_required @admin_required def create_printer(): data = request.get_json() if not data or not data.get('name') or not data.get('description'): return jsonify({'message': 'Name und Beschreibung sind erforderlich!'}), 400 printer = Printer( name=data.get('name'), description=data.get('description'), status=data.get('status', 0), ip_address=data.get('ipAddress') ) db.session.add(printer) db.session.commit() return jsonify(printer.to_dict()), 201 @app.route('/api/printers/', methods=['GET']) def get_printer(printer_id): printer = Printer.query.get_or_404(printer_id) return jsonify(printer.to_dict()) @app.route('/api/printers/', methods=['PUT']) @login_required @admin_required def update_printer(printer_id): printer = Printer.query.get_or_404(printer_id) data = request.get_json() if data.get('name'): printer.name = data['name'] if data.get('description'): printer.description = data['description'] if 'status' in data: printer.status = data['status'] if data.get('ipAddress'): printer.ip_address = data['ipAddress'] db.session.commit() return jsonify(printer.to_dict()) @app.route('/api/printers/', methods=['DELETE']) @login_required @admin_required def delete_printer(printer_id): printer = Printer.query.get_or_404(printer_id) db.session.delete(printer) db.session.commit() return jsonify({'message': 'Drucker gelöscht!'}) @app.route('/api/jobs', methods=['GET']) @login_required def get_jobs(): # Admins sehen alle Jobs, normale Benutzer nur ihre eigenen if g.current_user.role == 'admin': jobs = PrintJob.query.all() else: jobs = PrintJob.query.filter_by(user_id=g.current_user.id).all() return jsonify([job.to_dict() for job in jobs]) @app.route('/api/jobs', methods=['POST']) @login_required def create_job(): data = request.get_json() if not data or not data.get('printerId') or not data.get('durationInMinutes'): return jsonify({'message': 'Drucker-ID und Dauer sind erforderlich!'}), 400 printer = Printer.query.get_or_404(data['printerId']) if printer.status != 0: # 0 = available return jsonify({'message': 'Drucker ist nicht verfügbar!'}), 400 duration = int(data['durationInMinutes']) job = PrintJob( printer_id=printer.id, user_id=g.current_user.id, duration_in_minutes=duration, comments=data.get('comments', '') ) # Drucker als belegt markieren printer.status = 1 # 1 = busy db.session.add(job) db.session.commit() # Steckdose einschalten, falls IP-Adresse hinterlegt ist if printer.ip_address: try: asyncio.run(tapo_control.turn_on(printer.ip_address)) except Exception as e: app.logger.error(f"Fehler beim Einschalten der Steckdose: {e}") return jsonify(job.to_dict()), 201 @app.route('/api/jobs/', methods=['GET']) @login_required def get_job(job_id): # Admins können alle Jobs sehen, Benutzer nur ihre eigenen if g.current_user.role == 'admin': job = PrintJob.query.get_or_404(job_id) else: job = PrintJob.query.filter_by(id=job_id, user_id=g.current_user.id).first_or_404() return jsonify(job.to_dict()) @app.route('/api/jobs//abort', methods=['POST']) @login_required def abort_job(job_id): # Admins können alle Jobs abbrechen, Benutzer nur ihre eigenen if g.current_user.role == 'admin': job = PrintJob.query.get_or_404(job_id) else: job = PrintJob.query.filter_by(id=job_id, user_id=g.current_user.id).first_or_404() data = request.get_json() job.aborted = True job.abort_reason = data.get('reason', '') # Drucker wieder verfügbar machen printer = job.printer printer.status = 0 # 0 = available db.session.commit() # Steckdose ausschalten, falls IP-Adresse hinterlegt ist if printer.ip_address: try: asyncio.run(tapo_control.turn_off(printer.ip_address)) except Exception as e: app.logger.error(f"Fehler beim Ausschalten der Steckdose: {e}") return jsonify(job.to_dict()) @app.route('/api/jobs//finish', methods=['POST']) @login_required def finish_job(job_id): # Admins können alle Jobs beenden, Benutzer nur ihre eigenen if g.current_user.role == 'admin': job = PrintJob.query.get_or_404(job_id) else: job = PrintJob.query.filter_by(id=job_id, user_id=g.current_user.id).first_or_404() # Aktuelle Zeit als Ende setzen now = datetime.datetime.utcnow() actual_duration = int((now - job.start_at).total_seconds() / 60) job.duration_in_minutes = actual_duration # Drucker wieder verfügbar machen printer = job.printer printer.status = 0 # 0 = available db.session.commit() # Steckdose ausschalten, falls IP-Adresse hinterlegt ist if printer.ip_address: try: asyncio.run(tapo_control.turn_off(printer.ip_address)) except Exception as e: app.logger.error(f"Fehler beim Ausschalten der Steckdose: {e}") return jsonify(job.to_dict()) @app.route('/api/jobs//extend', methods=['POST']) @login_required def extend_job(job_id): # Admins können alle Jobs verlängern, Benutzer nur ihre eigenen if g.current_user.role == 'admin': job = PrintJob.query.get_or_404(job_id) else: job = PrintJob.query.filter_by(id=job_id, user_id=g.current_user.id).first_or_404() data = request.get_json() minutes = int(data.get('minutes', 0)) hours = int(data.get('hours', 0)) additional_minutes = minutes + (hours * 60) if additional_minutes <= 0: return jsonify({'message': 'Ungültige Verlängerungszeit!'}), 400 job.duration_in_minutes += additional_minutes db.session.commit() return jsonify(job.to_dict()) @app.route('/api/jobs//comments', methods=['PUT']) @login_required def update_job_comments(job_id): # Admins können alle Jobs bearbeiten, Benutzer nur ihre eigenen if g.current_user.role == 'admin': job = PrintJob.query.get_or_404(job_id) else: job = PrintJob.query.filter_by(id=job_id, user_id=g.current_user.id).first_or_404() data = request.get_json() job.comments = data.get('comments', '') db.session.commit() return jsonify(job.to_dict()) @app.route('/api/job//remaining-time', methods=['GET']) def job_remaining_time(job_id): job = PrintJob.query.get_or_404(job_id) remaining = job.remaining_time() return jsonify({'remaining_minutes': remaining}) @app.route('/api/users', methods=['GET']) @login_required @admin_required def get_users(): users = User.query.all() return jsonify([user.to_dict() for user in users]) @app.route('/api/users/', methods=['GET']) @login_required @admin_required def get_user(user_id): user = User.query.get_or_404(user_id) return jsonify(user.to_dict()) @app.route('/api/users/', methods=['PUT']) @login_required @admin_required def update_user(user_id): user = User.query.get_or_404(user_id) data = request.get_json() if data.get('username'): user.username = data['username'] if data.get('displayName'): user.display_name = data['displayName'] if data.get('email'): user.email = data['email'] if data.get('role'): user.role = data['role'] db.session.commit() return jsonify(user.to_dict()) @app.route('/api/users/', methods=['DELETE']) @login_required @admin_required def delete_user(user_id): user = User.query.get_or_404(user_id) # Löschen aller Sessions des Benutzers Session.query.filter_by(user_id=user.id).delete() db.session.delete(user) db.session.commit() return jsonify({'message': 'Benutzer gelöscht!'}) @app.route('/api/stats', methods=['GET']) @login_required @admin_required def stats(): # Drucker-Nutzungsstatistiken total_printers = Printer.query.count() available_printers = Printer.query.filter_by(status=0).count() # Job-Statistiken total_jobs = PrintJob.query.count() active_jobs = PrintJob.query.filter( PrintJob.aborted == False, PrintJob.start_at + timedelta(minutes=PrintJob.duration_in_minutes) > datetime.datetime.utcnow() ).count() completed_jobs = PrintJob.query.filter( PrintJob.aborted == False, PrintJob.start_at + timedelta(minutes=PrintJob.duration_in_minutes) <= datetime.datetime.utcnow() ).count() # Benutzerstatistiken total_users = User.query.count() # Durchschnittliche Druckdauer avg_duration_result = db.session.query(db.func.avg(PrintJob.duration_in_minutes)).first() avg_duration = int(avg_duration_result[0]) if avg_duration_result[0] else 0 return jsonify({ 'printers': { 'total': total_printers, 'available': available_printers, 'utilization_rate': (total_printers - available_printers) / total_printers if total_printers > 0 else 0 }, 'jobs': { 'total': total_jobs, 'active': active_jobs, 'completed': completed_jobs, 'avg_duration': avg_duration }, 'users': { 'total': total_users } }) # Tägliche Überprüfung der Jobs und automatische Abschaltung der Steckdosen @app.cli.command("check-jobs") def check_jobs(): """Überprüft abgelaufene Jobs und schaltet Steckdosen aus.""" now = datetime.datetime.utcnow() # Abgelaufene Jobs finden expired_jobs = PrintJob.query.filter( PrintJob.aborted == False, PrintJob.start_at + timedelta(minutes=PrintJob.duration_in_minutes) <= now ).all() for job in expired_jobs: printer = job.printer # Drucker-Status auf verfügbar setzen if printer.status == 1: # busy printer.status = 0 # available app.logger.info(f"Job {job.id} abgelaufen. Drucker {printer.id} auf verfügbar gesetzt.") # Steckdose ausschalten, falls IP-Adresse hinterlegt ist if printer.ip_address: try: asyncio.run(tapo_control.turn_off(printer.ip_address)) app.logger.info(f"Steckdose {printer.ip_address} für abgelaufenen Job {job.id} ausgeschaltet.") except Exception as e: app.logger.error(f"Fehler beim Ausschalten der Steckdose {printer.ip_address}: {e}") db.session.commit() app.logger.info(f"{len(expired_jobs)} abgelaufene Jobs überprüft und Steckdosen aktualisiert.") @app.route('/api/test', methods=['GET']) def test(): return jsonify({'message': 'MYP Backend API funktioniert!'}) @app.route('/api/create-initial-admin', methods=['POST']) def create_initial_admin(): admin_exists = User.query.filter_by(role='admin').first() is not None if admin_exists: return jsonify({'message': 'Es existiert bereits ein Administrator!'}), 400 data = request.get_json() if not data or not data.get('username') or not data.get('password'): return jsonify({'message': 'Benutzername und Passwort sind erforderlich!'}), 400 username = data.get('username') password = data.get('password') display_name = data.get('displayName', username) email = data.get('email', '') user = User( username=username, display_name=display_name, email=email, role='admin' ) user.set_password(password) db.session.add(user) db.session.commit() app.logger.info(f'Initialer Admin-Benutzer erstellt: {username}') return jsonify({ 'message': 'Administrator wurde erfolgreich erstellt!', 'user': user.to_dict() }), 201 # Error Handler @app.errorhandler(404) def not_found(error): return jsonify({'message': 'Nicht gefunden!'}), 404 @app.errorhandler(500) def server_error(error): app.logger.error(f'Serverfehler: {error}') return jsonify({'message': 'Interner Serverfehler!'}), 500 # Server starten if __name__ == '__main__': app.run(debug=True, host='0.0.0.0')