225 lines
6.9 KiB
Python
225 lines
6.9 KiB
Python
"""
|
|
Simple API endpoints for controlling Tapo smart plugs.

Minimal REST API for external access.
|
|
"""
|
|
|
|
from flask import Blueprint, jsonify, request
|
|
from flask_login import login_required, current_user
|
|
import ipaddress
|
|
import time
|
|
|
|
from utils.hardware_integration import tapo_controller
|
|
from utils.logging_config import get_logger
|
|
from utils.security_suite import require_permission, Permission
|
|
from models import get_db_session, Printer
|
|
|
|
# Blueprint that groups all endpoints of this module under the /api/v1 prefix
api_blueprint = Blueprint('api', __name__, url_prefix='/api/v1')

# Logger used by every endpoint in this module
api_logger = get_logger("api_simple")
|
|
|
|
@api_blueprint.route("/tapo/outlets", methods=["GET"])
@login_required
@require_permission(Permission.CONTROL_PRINTER)
def list_outlets():
    """List every Tapo outlet that belongs to an active printer.

    Queries all ``Printer`` rows that are active and have a ``plug_ip`` set.
    The plugs themselves are not contacted by this endpoint.

    Returns:
        A JSON response with ``success``, ``outlets`` (one entry per printer
        with ``id``, ``name``, ``ip``, ``location`` and ``model``), ``count``
        and ``timestamp``. If anything raises, a JSON error payload with the
        exception text and HTTP status 500 is returned instead.
    """
    try:
        db_session = get_db_session()
        try:
            printers_with_tapo = db_session.query(Printer).filter(
                Printer.active == True,  # noqa: E712 - SQLAlchemy column comparison
                Printer.plug_ip.isnot(None),
            ).all()

            # Read the printer attributes before the session is closed.
            outlets = [
                {
                    'id': printer.id,
                    'name': printer.name,
                    'ip': printer.plug_ip,
                    # Fall back to placeholder values when the columns are empty.
                    'location': printer.location or "Unbekannt",
                    'model': printer.model or "P110",
                }
                for printer in printers_with_tapo
            ]
        finally:
            # Release the session even when the query or serialization fails;
            # previously it was only closed on the success path.
            db_session.close()

        return jsonify({
            'success': True,
            'outlets': outlets,
            'count': len(outlets),
            'timestamp': time.time(),
        })

    except Exception as e:
        api_logger.error(f"Fehler beim Auflisten der Outlets: {e}")
        return jsonify({
            'success': False,
            'error': str(e),
        }), 500
|
|
|
|
@api_blueprint.route("/tapo/outlet/<ip>/status", methods=["GET"])
@login_required
@require_permission(Permission.CONTROL_PRINTER)
def get_outlet_status_api(ip):
    """Return the current status of the outlet addressed by ``ip``.

    Args:
        ip: Outlet address taken from the URL; must parse as an IP address.

    Returns:
        A JSON response with ``success``, ``ip``, ``status``, ``reachable``
        and ``timestamp``. An unparsable address yields HTTP 400; any other
        exception yields HTTP 500 with the exception text.
    """
    try:
        # Reject anything that does not parse as an IP address.
        try:
            ipaddress.ip_address(ip)
        except ValueError:
            invalid_ip_payload = {
                'success': False,
                'error': 'Ungültige IP-Adresse',
            }
            return jsonify(invalid_ip_payload), 400

        # The controller reports a (reachable, status) pair.
        is_reachable, plug_status = tapo_controller.check_outlet_status(ip)

        status_payload = {
            'success': True,
            'ip': ip,
            'status': plug_status,
            'reachable': is_reachable,
            'timestamp': time.time(),
        }
        return jsonify(status_payload)

    except Exception as exc:
        api_logger.error(f"API Fehler beim Status-Check für {ip}: {exc}")
        failure_payload = {
            'success': False,
            'error': str(exc),
        }
        return jsonify(failure_payload), 500
|
|
|
|
@api_blueprint.route("/tapo/outlet/<ip>/control", methods=["POST"])
@login_required
@require_permission(Permission.CONTROL_PRINTER)
def control_outlet_api(ip):
    """Switch the outlet addressed by ``ip`` on or off.

    The request body must be a JSON object with an ``action`` key whose value
    is ``"on"`` or ``"off"``.

    Args:
        ip: Outlet address taken from the URL; must parse as an IP address.

    Returns:
        On success a JSON response with ``success``, ``ip``, ``action``,
        ``message`` and ``timestamp``. HTTP 400 is returned for an unparsable
        address, a missing/invalid/non-object JSON body, or an unknown action.
        HTTP 500 is returned when the controller reports failure or when an
        unexpected exception occurs.
    """
    try:
        # Reject anything that does not parse as an IP address.
        try:
            ipaddress.ip_address(ip)
        except ValueError:
            return jsonify({
                'success': False,
                'error': 'Ungültige IP-Adresse',
            }), 400

        # silent=True makes a missing or malformed JSON body return None
        # instead of raising, so it is reported as a 400 below rather than
        # being turned into a 500 by the generic handler.
        data = request.get_json(silent=True)

        # Only a JSON object can carry the action; a list or string body
        # would otherwise pass the membership test and fail on indexing.
        if not isinstance(data, dict) or 'action' not in data:
            return jsonify({
                'success': False,
                'error': 'Aktion erforderlich (on/off)',
            }), 400

        action = data['action']
        if action not in ('on', 'off'):
            return jsonify({
                'success': False,
                'error': 'Ungültige Aktion. Nur "on" oder "off" erlaubt.',
            }), 400

        # Translate the action into the boolean state passed to the controller.
        state = action == 'on'
        success = tapo_controller.toggle_plug(ip, state)

        if not success:
            return jsonify({
                'success': False,
                'error': 'Fehler beim Schalten der Steckdose',
            }), 500

        action_text = "eingeschaltet" if state else "ausgeschaltet"
        api_logger.info(f"✅ API: Steckdose {ip} {action_text} durch {current_user.name}")

        return jsonify({
            'success': True,
            'ip': ip,
            'action': action,
            'message': f'Steckdose {action_text}',
            'timestamp': time.time(),
        })

    except Exception as e:
        api_logger.error(f"API Fehler beim Schalten von {ip}: {e}")
        return jsonify({
            'success': False,
            'error': str(e),
        }), 500
|
|
|
|
@api_blueprint.route("/tapo/status/all", methods=["GET"])
@login_required
@require_permission(Permission.CONTROL_PRINTER)
def get_all_status_api():
    """Return the status of every outlet that belongs to an active printer.

    Each active printer with a ``plug_ip`` is checked one after another via
    ``tapo_controller.check_outlet_status``. A failure for a single outlet is
    logged and reported as an ``'error'`` entry; it does not abort the
    remaining checks.

    Returns:
        A JSON response with ``success``, ``outlets`` (per-printer entries
        with ``id``, ``name``, ``ip``, ``location``, ``status``, ``reachable``
        and, on failure, ``error``), a ``summary`` of total/online/offline
        counts, and ``timestamp``. If anything outside the per-outlet check
        raises, a JSON error payload with HTTP status 500 is returned.
    """
    try:
        db_session = get_db_session()
        try:
            printers_with_tapo = db_session.query(Printer).filter(
                Printer.active == True,  # noqa: E712 - SQLAlchemy column comparison
                Printer.plug_ip.isnot(None),
            ).all()

            outlets = []
            online_count = 0

            for printer in printers_with_tapo:
                try:
                    reachable, status = tapo_controller.check_outlet_status(
                        printer.plug_ip,
                        printer_id=printer.id,
                    )

                    if reachable:
                        online_count += 1

                    outlets.append({
                        'id': printer.id,
                        'name': printer.name,
                        'ip': printer.plug_ip,
                        'location': printer.location or "Unbekannt",
                        'status': status,
                        'reachable': reachable,
                    })

                except Exception as e:
                    # Keep going: one failing outlet must not hide the others.
                    api_logger.warning(f"API Status-Check für {printer.plug_ip} fehlgeschlagen: {e}")
                    outlets.append({
                        'id': printer.id,
                        'name': printer.name,
                        'ip': printer.plug_ip,
                        'location': printer.location or "Unbekannt",
                        'status': 'error',
                        'reachable': False,
                        'error': str(e),
                    })
        finally:
            # Release the session even when the query or the loop fails;
            # previously it was only closed on the success path.
            db_session.close()

        return jsonify({
            'success': True,
            'outlets': outlets,
            'summary': {
                'total': len(outlets),
                'online': online_count,
                # Entries that errored count as offline.
                'offline': len(outlets) - online_count,
            },
            'timestamp': time.time(),
        })

    except Exception as e:
        api_logger.error(f"API Fehler beim Abrufen aller Status: {e}")
        return jsonify({
            'success': False,
            'error': str(e),
        }), 500
|
|
|
|
@api_blueprint.route("/health", methods=["GET"])
def health_check():
    """Report the service name, version and current time as a health probe.

    This route carries no login or permission decorator.

    Returns:
        A JSON response with ``success``, ``service``, ``version`` and
        ``timestamp``.
    """
    health_info = {
        'success': True,
        'service': 'MYP Platform Tapo API',
        'version': '1.0',
        'timestamp': time.time(),
    }
    return jsonify(health_info)