1244 lines
47 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import locale
# Setze Standardkodierung auf UTF-8
if sys.platform.startswith('win'):
# Setze für Windows die richtige Codepage
try:
# Versuche zuerst, die Codepage direkt zu setzen
import os
os.system('chcp 65001 > nul')
except:
# Fallback-Methode, falls der direkte Aufruf fehlschlägt
try:
import subprocess
subprocess.run(["cmd.exe", "/c", "chcp", "65001"],
capture_output=True,
check=False)
except:
print("Konnte Codepage nicht auf UTF-8 setzen. Eventuell werden Unicode-Zeichen nicht korrekt angezeigt.")
# Setze das Locale auf die passende Einstellung
try:
locale.setlocale(locale.LC_ALL, 'C.UTF-8')
except:
try:
locale.setlocale(locale.LC_ALL, '') # Systemstandard
except:
print("Konnte Locale nicht setzen.")
else:
# Für Unix-Systeme
try:
locale.setlocale(locale.LC_ALL, 'C.UTF-8')
except:
try:
locale.setlocale(locale.LC_ALL, '') # Systemstandard
except:
print("Konnte Locale nicht setzen.")
from flask import Flask, render_template, jsonify, request, redirect, url_for
import socket
import platform
import os
import subprocess
import json
import datetime
import psutil
import netifaces
import requests
import re
import time
import logging
import matplotlib
matplotlib.use('Agg') # Verwende Agg-Backend für Server ohne Display
import matplotlib.pyplot as plt
import io
import base64
from flask_cors import CORS
import pandas as pd
import threading
import docker
import tempfile
from werkzeug.utils import secure_filename
import shutil
app = Flask(__name__)
CORS(app)
DEBUG_PORT = 5555 # Port für den Debug-Server
# Logging konfigurieren
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(os.path.join('..', '..', 'logs', 'debug-server.log')),
logging.StreamHandler()
]
)
logger = logging.getLogger('debug-server')
# Konfigurationsdatei
CONFIG_FILE = '../instance/network_config.json'
DEFAULT_CONFIG = {
'backend_hostname': '192.168.0.105',
'backend_port': 5000,
'frontend_hostname': '192.168.0.106',
'frontend_port': 3000
}
# Cache für Docker-Container-IDs und Namen
CONTAINER_CACHE = {}
# Port-Scanner-Thread-Flag
PORT_SCAN_RUNNING = False
# Docker-Client initialisieren
try:
docker_client = docker.from_env()
DOCKER_AVAILABLE = True
except:
DOCKER_AVAILABLE = False
logger.warning("Docker ist nicht verfügbar. Docker-bezogene Funktionen werden deaktiviert.")
def get_config():
"""Lädt die Netzwerkkonfiguration."""
if os.path.exists(CONFIG_FILE):
try:
with open(CONFIG_FILE, 'r') as f:
return json.load(f)
except Exception as e:
logger.error(f"Fehler beim Laden der Konfiguration: {e}")
return DEFAULT_CONFIG.copy()
def save_config(config):
"""Speichert die Netzwerkkonfiguration."""
os.makedirs(os.path.dirname(CONFIG_FILE), exist_ok=True)
try:
with open(CONFIG_FILE, 'w') as f:
json.dump(config, f, indent=4)
return True
except Exception as e:
logger.error(f"Fehler beim Speichern der Konfiguration: {e}")
return False
@app.route('/')
def index():
"""Zeigt die Debug-Oberfläche an."""
return redirect(url_for('dashboard'))
@app.route('/dashboard')
def dashboard():
"""Zeigt das neue Debug-Dashboard an."""
interfaces = get_network_interfaces()
config = get_config()
# Prüfe Verbindungen
backend_status = "Nicht überprüft"
frontend_status = "Nicht überprüft"
try:
backend_url = f"http://{config['backend_hostname']}:{config['backend_port']}/api/test"
response = requests.get(backend_url, timeout=3)
if response.status_code == 200:
backend_status = "Verbunden"
else:
backend_status = f"Fehler: HTTP {response.status_code}"
except requests.exceptions.RequestException as e:
backend_status = f"Nicht erreichbar"
if ping_host(config['backend_hostname']):
backend_status += " (Host antwortet auf Ping)"
try:
frontend_url = f"http://{config['frontend_hostname']}:{config['frontend_port']}"
response = requests.get(frontend_url, timeout=3)
if response.status_code == 200:
frontend_status = "Verbunden"
else:
frontend_status = f"Fehler: HTTP {response.status_code}"
except requests.exceptions.RequestException as e:
frontend_status = f"Nicht erreichbar"
if ping_host(config['frontend_hostname']):
frontend_status += " (Host antwortet auf Ping)"
return render_template('dashboard.html',
interfaces=interfaces,
config=config,
backend_status=backend_status,
frontend_status=frontend_status,
last_check=datetime.datetime.now().strftime("%d.%m.%Y %H:%M:%S"))
@app.route('/debug')
def debug():
"""Zeigt die alte Debug-Oberfläche an."""
interfaces = get_network_interfaces()
config = get_config()
# Prüfe Verbindungen
backend_status = "Nicht überprüft"
frontend_status = "Nicht überprüft"
try:
backend_url = f"http://{config['backend_hostname']}:{config['backend_port']}/api/test"
response = requests.get(backend_url, timeout=3)
if response.status_code == 200:
backend_status = "Verbunden"
else:
backend_status = f"Fehler: HTTP {response.status_code}"
except requests.exceptions.RequestException as e:
backend_status = f"Nicht erreichbar"
if ping_host(config['backend_hostname']):
backend_status += " (Host antwortet auf Ping)"
try:
frontend_url = f"http://{config['frontend_hostname']}:{config['frontend_port']}"
response = requests.get(frontend_url, timeout=3)
if response.status_code == 200:
frontend_status = "Verbunden"
else:
frontend_status = f"Fehler: HTTP {response.status_code}"
except requests.exceptions.RequestException as e:
frontend_status = f"Nicht erreichbar"
if ping_host(config['frontend_hostname']):
frontend_status += " (Host antwortet auf Ping)"
return render_template('debug.html',
interfaces=interfaces,
config=config,
backend_status=backend_status,
frontend_status=frontend_status,
last_check=datetime.datetime.now().strftime("%d.%m.%Y %H:%M:%S"))
# API-Endpunkte für das Dashboard
@app.route('/api/system/metrics')
def system_metrics():
"""Liefert aktuelle Systemmetriken."""
try:
# CPU-Auslastung
cpu_percent = psutil.cpu_percent(interval=0.5)
# Speicherauslastung
memory = psutil.virtual_memory()
memory_info = {
'total': memory.total,
'available': memory.available,
'used': memory.used,
'percent': memory.percent
}
# Festplattennutzung
disk_usage = []
for partition in psutil.disk_partitions():
if os.name == 'nt' and ('cdrom' in partition.opts or partition.fstype == ''):
# Unter Windows: CD-ROMs und spezielle Laufwerke überspringen
continue
usage = psutil.disk_usage(partition.mountpoint)
disk_usage.append({
'device': partition.device,
'mountpoint': partition.mountpoint,
'fstype': partition.fstype,
'total': usage.total,
'used': usage.used,
'free': usage.free,
'percent': usage.percent
})
# Netzwerk-I/O
net_io = psutil.net_io_counters()
network_io = {
'bytes_sent': net_io.bytes_sent,
'bytes_recv': net_io.bytes_recv,
'packets_sent': net_io.packets_sent,
'packets_recv': net_io.packets_recv,
'errin': net_io.errin,
'errout': net_io.errout,
'dropin': net_io.dropin,
'dropout': net_io.dropout
}
return jsonify({
'success': True,
'timestamp': datetime.datetime.now().isoformat(),
'cpu_percent': cpu_percent,
'memory': memory_info,
'disk_usage': disk_usage,
'network_io': network_io
})
except Exception as e:
logger.error(f"Fehler beim Abrufen der Systemmetriken: {str(e)}")
return jsonify({
'success': False,
'message': f"Fehler beim Abrufen der Systemmetriken: {str(e)}"
}), 500
@app.route('/api/docker/status')
def docker_status():
"""Liefert Statusinformationen zu Docker-Containern."""
try:
containers = []
docker_running = False
try:
# Prüfe, ob Docker läuft
result = subprocess.run(['docker', 'info'], capture_output=True, text=True, check=False)
docker_running = result.returncode == 0
if docker_running:
# Liste alle Container auf
result = subprocess.run(['docker', 'ps', '-a', '--format', '{{.ID}}|{{.Names}}|{{.Status}}|{{.Image}}'], capture_output=True, text=True, check=True)
lines = result.stdout.strip().split('\n')
for line in lines:
if not line:
continue
parts = line.split('|')
if len(parts) < 4:
continue
container_id, name, status, image = parts
is_running = status.startswith('Up')
# CPU- und Speichernutzung nur für laufende Container abrufen
cpu_percent = None
memory_usage = None
network_io = None
if is_running:
try:
# CPU-Nutzung
cpu_result = subprocess.run(['docker', 'stats', container_id, '--no-stream', '--format', '{{.CPUPerc}}'], capture_output=True, text=True, check=True)
cpu_text = cpu_result.stdout.strip()
cpu_match = re.search(r'([\d.]+)%', cpu_text)
if cpu_match:
cpu_percent = float(cpu_match.group(1))
# Speichernutzung
mem_result = subprocess.run(['docker', 'stats', container_id, '--no-stream', '--format', '{{.MemUsage}}'], capture_output=True, text=True, check=True)
mem_text = mem_result.stdout.strip()
mem_match = re.search(r'([\d.]+)([KMGiB]+)', mem_text)
if mem_match:
value = float(mem_match.group(1))
unit = mem_match.group(2)
# Konvertiere in Bytes
if unit.startswith('K'):
memory_usage = value * 1024
elif unit.startswith('M'):
memory_usage = value * 1024 * 1024
elif unit.startswith('G'):
memory_usage = value * 1024 * 1024 * 1024
else:
memory_usage = value
# Netzwerknutzung
net_result = subprocess.run(['docker', 'stats', container_id, '--no-stream', '--format', '{{.NetIO}}'], capture_output=True, text=True, check=True)
net_text = net_result.stdout.strip()
net_parts = net_text.split(' / ')
if len(net_parts) == 2:
rx_text, tx_text = net_parts
# Empfangen (RX)
rx_match = re.search(r'([\d.]+)([KMGiB]+)', rx_text)
if rx_match:
rx_value = float(rx_match.group(1))
rx_unit = rx_match.group(2)
# Konvertiere in Bytes
rx_bytes = rx_value
if rx_unit.startswith('K'):
rx_bytes = rx_value * 1024
elif rx_unit.startswith('M'):
rx_bytes = rx_value * 1024 * 1024
elif rx_unit.startswith('G'):
rx_bytes = rx_value * 1024 * 1024 * 1024
# Gesendet (TX)
tx_match = re.search(r'([\d.]+)([KMGiB]+)', tx_text)
if tx_match:
tx_value = float(tx_match.group(1))
tx_unit = tx_match.group(2)
# Konvertiere in Bytes
tx_bytes = tx_value
if tx_unit.startswith('K'):
tx_bytes = tx_value * 1024
elif tx_unit.startswith('M'):
tx_bytes = tx_value * 1024 * 1024
elif tx_unit.startswith('G'):
tx_bytes = tx_value * 1024 * 1024 * 1024
network_io = {
'rx_bytes': rx_bytes,
'tx_bytes': tx_bytes
}
except Exception as e:
logger.error(f"Fehler beim Abrufen der Container-Metriken für {name}: {str(e)}")
# In Cache speichern für spätere Verwendung
CONTAINER_CACHE[container_id] = name
containers.append({
'id': container_id,
'name': name,
'status': status,
'image': image,
'running': is_running,
'cpu_percent': cpu_percent,
'memory_usage': memory_usage,
'network_io': network_io
})
except Exception as docker_error:
logger.error(f"Docker-Fehler: {str(docker_error)}")
return jsonify({
'success': True,
'docker_running': docker_running,
'containers': containers
})
except Exception as e:
logger.error(f"Fehler beim Abrufen der Docker-Container-Informationen: {str(e)}")
return jsonify({
'success': False,
'message': f"Fehler beim Abrufen der Docker-Container-Informationen: {str(e)}"
}), 500
@app.route('/api/docker/info')
def docker_info():
"""Gibt detaillierte Docker-Informationen zurück."""
try:
info = {
'version': 'Unbekannt',
'api_version': 'Unbekannt',
'os': 'Unbekannt',
'status': 'Nicht verfügbar'
}
try:
result = subprocess.run(['docker', 'info', '--format', '{{json .}}'], capture_output=True, text=True, check=False)
if result.returncode == 0:
json_data = json.loads(result.stdout)
info['version'] = json_data.get('ServerVersion', 'Unbekannt')
info['api_version'] = json_data.get('ApiVersion', 'Unbekannt')
info['os'] = json_data.get('OperatingSystem', 'Unbekannt')
info['status'] = 'Läuft'
else:
info['status'] = 'Gestoppt oder nicht installiert'
except Exception as docker_error:
logger.error(f"Docker-Info-Fehler: {str(docker_error)}")
info['status'] = 'Fehler: ' + str(docker_error)
return jsonify({
'success': True,
'info': info
})
except Exception as e:
logger.error(f"Fehler beim Abrufen der Docker-Informationen: {str(e)}")
return jsonify({
'success': False,
'message': f"Fehler beim Abrufen der Docker-Informationen: {str(e)}"
}), 500
@app.route('/api/docker/restart/<container_id>', methods=['POST'])
def restart_container(container_id):
"""Startet einen Docker-Container neu."""
try:
result = subprocess.run(['docker', 'restart', container_id], capture_output=True, text=True, check=False)
if result.returncode == 0:
return jsonify({
'success': True,
'message': f"Container wurde erfolgreich neugestartet."
})
else:
error_message = result.stderr.strip()
logger.error(f"Fehler beim Neustarten des Containers: {error_message}")
return jsonify({
'success': False,
'message': f"Fehler beim Neustarten des Containers: {error_message}"
}), 500
except Exception as e:
logger.error(f"Fehler beim Neustarten des Containers: {str(e)}")
return jsonify({
'success': False,
'message': f"Fehler beim Neustarten des Containers: {str(e)}"
}), 500
@app.route('/api/docker/logs/<container_id>')
def container_logs(container_id):
"""Gibt die Logs eines Docker-Containers zurück."""
try:
# Benutze den Container-Namen aus dem Cache, wenn verfügbar
container_name = CONTAINER_CACHE.get(container_id, container_id)
# Limitiere die Anzahl der zurückgegebenen Zeilen
tail_option = request.args.get('tail', '100')
result = subprocess.run(['docker', 'logs', '--tail', tail_option, container_id], capture_output=True, text=True, check=False)
if result.returncode == 0:
return jsonify({
'success': True,
'container_id': container_id,
'container_name': container_name,
'logs': result.stdout
})
else:
error_message = result.stderr.strip()
logger.error(f"Fehler beim Abrufen der Container-Logs: {error_message}")
return jsonify({
'success': False,
'message': f"Fehler beim Abrufen der Container-Logs: {error_message}"
}), 500
except Exception as e:
logger.error(f"Fehler beim Abrufen der Container-Logs: {str(e)}")
return jsonify({
'success': False,
'message': f"Fehler beim Abrufen der Container-Logs: {str(e)}"
}), 500
@app.route('/api/logs/analyze')
def analyze_logs():
"""Analysiert Logdateien und gibt Fehler zurück."""
try:
log_type = request.args.get('type', 'backend')
filter_text = request.args.get('filter', '')
log_file = None
# Bestimme den Pfad zur Logdatei
if log_type == 'backend':
log_file = os.path.join('..', '..', 'logs', 'backend.log')
elif log_type == 'frontend':
log_file = os.path.join('..', '..', 'logs', 'frontend.log')
elif log_type == 'docker':
# Docker-Logs dynamisch abrufen
result = subprocess.run(['docker', 'logs', 'myp-backend'], capture_output=True, text=True, check=False)
if result.returncode == 0:
docker_logs = result.stdout
entries = parse_docker_logs(docker_logs, filter_text)
return jsonify({
'success': True,
'entries': entries
})
else:
return jsonify({
'success': False,
'message': f"Konnte Docker-Logs nicht abrufen: {result.stderr}"
})
else:
return jsonify({
'success': False,
'message': f"Unbekannter Log-Typ: {log_type}"
})
# Logdatei lesen und analysieren
if log_file and os.path.exists(log_file):
entries = []
with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
log_content = f.readlines()
# Logs parsen und nach Fehlern filtern
error_pattern = re.compile(r'(ERROR|CRITICAL|FEHLER|Exception|Error|Fehler)', re.IGNORECASE)
for line in log_content:
if error_pattern.search(line) and (not filter_text or filter_text.lower() in line.lower()):
# Versuche, Zeitstempel, Meldungstyp und Meldung zu extrahieren
timestamp_match = re.search(r'(\d{4}-\d{2}-\d{2}[T\s]\d{2}:\d{2}:\d{2})', line)
timestamp = timestamp_match.group(1) if timestamp_match else 'Unbekannt'
# Entferne Zeitstempel und andere Metadaten
message = re.sub(r'^\d{4}-\d{2}-\d{2}[T\s]\d{2}:\d{2}:\d{2}.*?\s-\s', '', line).strip()
entries.append({
'timestamp': timestamp,
'message': message,
'raw': line.strip()
})
return jsonify({
'success': True,
'entries': entries
})
else:
return jsonify({
'success': False,
'message': f"Logdatei nicht gefunden: {log_file}"
})
except Exception as e:
logger.error(f"Fehler bei der Log-Analyse: {str(e)}")
return jsonify({
'success': False,
'message': f"Fehler bei der Log-Analyse: {str(e)}"
}), 500
@app.route('/save-config', methods=['POST'])
def save_config_route():
"""Speichert die Netzwerkkonfiguration."""
try:
config = get_config()
# Aktualisiere Konfiguration
config['backend_hostname'] = request.form.get('backend_hostname', DEFAULT_CONFIG['backend_hostname'])
config['backend_port'] = int(request.form.get('backend_port', DEFAULT_CONFIG['backend_port']))
config['frontend_hostname'] = request.form.get('frontend_hostname', DEFAULT_CONFIG['frontend_hostname'])
config['frontend_port'] = int(request.form.get('frontend_port', DEFAULT_CONFIG['frontend_port']))
# Speichere Konfiguration
if save_config(config):
return jsonify({"success": True, "message": "Konfiguration erfolgreich gespeichert."})
else:
return jsonify({"success": False, "message": "Fehler beim Speichern der Konfiguration."})
except Exception as e:
return jsonify({"success": False, "message": f"Fehler: {str(e)}"})
@app.route('/sync-frontend', methods=['POST'])
def sync_frontend():
"""Synchronisiert die Frontend-Konfiguration mit der gespeicherten Backend-URL."""
try:
config = get_config()
backend_url = f"http://{config['backend_hostname']}:{config['backend_port']}"
# Erstelle notwendige Verzeichnisse
frontend_dir = "../../frontend"
env_file = os.path.join(frontend_dir, ".env.local")
if not os.path.exists(frontend_dir):
return jsonify({"success": False, "message": "Frontend-Verzeichnis nicht gefunden."})
# Bestimme den Hostnamen für OAuth
hostname = socket.gethostname()
if "corpintra" in hostname:
frontend_hostname = "m040tbaraspi001.de040.corpintra.net"
oauth_url = "http://m040tbaraspi001.de040.corpintra.net/auth/login/callback"
else:
frontend_hostname = hostname
oauth_url = f"http://{hostname}:3000/auth/login/callback"
# Erstelle .env.local-Datei
env_content = f"""# Backend API Konfiguration
NEXT_PUBLIC_API_URL={backend_url}
# Frontend-URL für OAuth Callback
NEXT_PUBLIC_FRONTEND_URL=http://{frontend_hostname}
# Explizite OAuth Callback URL für GitHub
NEXT_PUBLIC_OAUTH_CALLBACK_URL={oauth_url}
# OAuth Konfiguration
OAUTH_CLIENT_ID=client_id
OAUTH_CLIENT_SECRET=client_secret
"""
try:
with open(env_file, 'w') as f:
f.write(env_content)
os.chmod(env_file, 0o600)
return jsonify({"success": True, "message": "Frontend erfolgreich mit Backend-URL konfiguriert."})
except Exception as e:
return jsonify({"success": False, "message": f"Fehler beim Schreiben der Frontend-Konfiguration: {str(e)}"})
except Exception as e:
return jsonify({"success": False, "message": f"Fehler: {str(e)}"})
@app.route('/test-connection', methods=['POST'])
def test_connection_route():
"""Testet die Verbindung zum Backend und Frontend."""
try:
backend_hostname = request.form.get('backend_hostname')
backend_port = int(request.form.get('backend_port'))
frontend_hostname = request.form.get('frontend_hostname')
frontend_port = int(request.form.get('frontend_port'))
results = {
'backend': {
'ping': ping_host(backend_hostname),
'connection': check_connection(backend_hostname, backend_port)
},
'frontend': {
'ping': ping_host(frontend_hostname),
'connection': check_connection(frontend_hostname, frontend_port)
}
}
return jsonify({"success": True, "results": results})
except Exception as e:
return jsonify({"success": False, "message": f"Fehler: {str(e)}"})
@app.route('/systeminfo')
def systeminfo():
"""Gibt Systeminformationen zurück."""
info = {
"hostname": socket.gethostname(),
"platform": platform.platform(),
"architecture": platform.machine(),
"processor": platform.processor(),
"python_version": platform.python_version(),
"uptime": get_uptime(),
"memory": get_memory_info(),
"disk": get_disk_info(),
}
return jsonify(info)
@app.route('/network')
def network():
"""Gibt Netzwerkinformationen zurück."""
info = {
"interfaces": get_network_interfaces(),
"connections": get_active_connections(),
"dns_servers": get_dns_servers(),
"gateway": get_default_gateway(),
}
return jsonify(info)
@app.route('/docker')
def docker():
"""Gibt Docker-Informationen zurück."""
return jsonify(get_docker_info())
@app.route('/ping/<host>')
def ping_host(host):
"""Pingt einen Host an."""
param = '-n' if platform.system().lower() == 'windows' else '-c'
command = ['ping', param, '1', host]
try:
# Verwende errors='replace' um Dekodierungsfehler zu behandeln
result = subprocess.run(command,
capture_output=True,
text=True,
encoding='utf-8',
errors='replace',
timeout=5)
return result.returncode == 0
except subprocess.TimeoutExpired:
return False
except Exception as e:
print(f"Fehler beim Ping von {host}: {e}")
return False
@app.route('/traceroute/<host>')
def traceroute_host(host):
"""Führt einen Traceroute zum Zielhost durch und gibt das Ergebnis zurück."""
if is_valid_hostname(host):
try:
# Verwende errors='replace' um Dekodierungsfehler zu behandeln
result = subprocess.run(['tracert', host],
capture_output=True,
text=True,
encoding='utf-8',
errors='replace',
timeout=30)
return jsonify({
"success": True,
"output": result.stdout,
"error": result.stderr,
"return_code": result.returncode
})
except subprocess.TimeoutExpired:
return jsonify({"success": False, "error": "Zeitüberschreitung beim Traceroute-Befehl"})
except Exception as e:
return jsonify({"success": False, "error": str(e)})
else:
return jsonify({"success": False, "error": "Ungültiger Hostname oder IP-Adresse"})
@app.route('/nslookup/<host>')
def nslookup_host(host):
"""Führt eine DNS-Abfrage für den angegebenen Host durch."""
if is_valid_hostname(host):
try:
# Verwende errors='replace' um Dekodierungsfehler zu behandeln
result = subprocess.run(['nslookup', host],
capture_output=True,
text=True,
encoding='utf-8',
errors='replace',
timeout=10)
return jsonify({
"success": result.returncode == 0,
"output": result.stdout,
"error": result.stderr,
"return_code": result.returncode
})
except subprocess.TimeoutExpired:
return jsonify({"success": False, "error": "Zeitüberschreitung beim NSLookup-Befehl"})
except Exception as e:
return jsonify({"success": False, "error": str(e)})
else:
return jsonify({"success": False, "error": "Ungültiger Hostname oder IP-Adresse"})
@app.route('/backend-status')
def backend_status():
"""Überprüft den Status des Haupt-Backend-Servers."""
try:
# Benutze den Localhost und den Port des Haupt-Backends (Standard: 5000)
backend_port = 5000
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(2)
result = s.connect_ex(('localhost', backend_port))
s.close()
return jsonify({
"status": "online" if result == 0 else "offline",
"port": backend_port,
"error_code": result
})
except Exception as e:
return jsonify({
"status": "error",
"message": str(e)
})
@app.route('/api/docker/inspect/<container_id>')
def inspect_container(container_id):
"""Gibt detaillierte Informationen zu einem Docker-Container zurück."""
if not DOCKER_AVAILABLE:
return jsonify({'success': False, 'error': 'Docker ist nicht verfügbar'})
try:
# Container-Details abrufen
container = docker_client.containers.get(container_id)
inspect_data = container.attrs
# Vereinfachte Übersicht zusammenstellen
simplified_data = {
'id': inspect_data['Id'],
'name': inspect_data['Name'].lstrip('/'),
'image': inspect_data['Config']['Image'],
'created': inspect_data['Created'],
'state': inspect_data['State'],
'ports': inspect_data['NetworkSettings']['Ports'],
'volumes': inspect_data['Mounts'],
'networks': inspect_data['NetworkSettings']['Networks'],
'environment': inspect_data['Config']['Env'],
'labels': inspect_data['Config']['Labels'],
'cmd': inspect_data['Config']['Cmd'],
'entrypoint': inspect_data['Config']['Entrypoint'],
'host_config': {
'restart_policy': inspect_data['HostConfig']['RestartPolicy'],
'binds': inspect_data['HostConfig']['Binds'],
'port_bindings': inspect_data['HostConfig']['PortBindings'],
}
}
return jsonify({'success': True, 'data': simplified_data})
except docker.errors.NotFound:
return jsonify({'success': False, 'error': f'Container mit ID {container_id} nicht gefunden'})
except Exception as e:
logger.error(f"Fehler bei Container-Inspektion: {e}")
return jsonify({'success': False, 'error': str(e)})
@app.route('/api/docker/logs/download/<container_id>')
def download_container_logs(container_id):
"""Lädt die Logs eines Docker-Containers als Datei herunter."""
if not DOCKER_AVAILABLE:
return jsonify({'success': False, 'error': 'Docker ist nicht verfügbar'})
try:
# Container-Details abrufen
container = docker_client.containers.get(container_id)
logs = container.logs(timestamps=True).decode('utf-8', errors='replace')
# Temporäre Datei erstellen
temp_dir = tempfile.mkdtemp()
container_name = container.name.replace('/', '_')
file_name = f"{container_name}_logs_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
file_path = os.path.join(temp_dir, secure_filename(file_name))
# Logs in Datei schreiben
with open(file_path, 'w', encoding='utf-8') as f:
f.write(logs)
# Datei zum Download senden
response = send_file(file_path, as_attachment=True, download_name=file_name)
# Aufräumen nach Response
@response.call_on_close
def cleanup():
shutil.rmtree(temp_dir)
return response
except docker.errors.NotFound:
return jsonify({'success': False, 'error': f'Container mit ID {container_id} nicht gefunden'})
except Exception as e:
logger.error(f"Fehler beim Herunterladen der Container-Logs: {e}")
return jsonify({'success': False, 'error': str(e)})
@app.route('/api/network/scan-ports', methods=['POST'])
def scan_ports():
"""Führt einen Port-Scan auf dem angegebenen Host durch."""
global PORT_SCAN_RUNNING
if PORT_SCAN_RUNNING:
return jsonify({'success': False, 'error': 'Ein Port-Scan läuft bereits'})
data = request.json
target_host = data.get('host')
port_range = data.get('port_range', '1-1024')
if not target_host:
return jsonify({'success': False, 'error': 'Kein Ziel-Host angegeben'})
# Überprüfe, ob der Hostname/IP gültig ist
if not is_valid_hostname(target_host):
return jsonify({'success': False, 'error': 'Ungültiger Hostname oder IP-Adresse'})
# Parsen des Port-Bereichs
try:
if '-' in port_range:
start_port, end_port = map(int, port_range.split('-'))
else:
start_port = end_port = int(port_range)
if start_port < 1 or end_port > 65535 or start_port > end_port:
raise ValueError("Ungültiger Port-Bereich")
except Exception:
return jsonify({'success': False, 'error': 'Ungültiger Port-Bereich (Format: 1-1024)'})
# Port-Scan im Hintergrund starten
thread = threading.Thread(target=port_scan_thread, args=(target_host, start_port, end_port))
thread.daemon = True
thread.start()
PORT_SCAN_RUNNING = True
return jsonify({'success': True, 'message': f'Port-Scan auf {target_host} für Ports {start_port}-{end_port} gestartet'})
@app.route('/api/network/scan-status')
def scan_status():
"""Gibt den Status des laufenden Port-Scans zurück."""
global PORT_SCAN_RUNNING
global PORT_SCAN_RESULTS
if PORT_SCAN_RUNNING:
return jsonify({'success': True, 'status': 'running', 'message': 'Port-Scan läuft'})
else:
# Wenn kein Scan läuft, gib die Ergebnisse des letzten Scans zurück (falls vorhanden)
if 'PORT_SCAN_RESULTS' in globals():
return jsonify({'success': True, 'status': 'completed', 'results': PORT_SCAN_RESULTS})
else:
return jsonify({'success': True, 'status': 'idle', 'message': 'Kein Port-Scan aktiv'})
def port_scan_thread(host, start_port, end_port):
"""Führt einen Port-Scan im Hintergrund durch."""
global PORT_SCAN_RUNNING
global PORT_SCAN_RESULTS
try:
results = []
service_dict = get_common_services()
# Timeout für Socket-Verbindungen (in Sekunden)
timeout = 0.5
# Port-Scan durchführen
for port in range(start_port, end_port + 1):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((host, port))
if result == 0:
service = service_dict.get(port, "Unbekannter Dienst")
results.append({
'port': port,
'status': 'open',
'service': service
})
sock.close()
# Kurze Pause, um die CPU-Last zu reduzieren
time.sleep(0.01)
# Ergebnisse speichern
PORT_SCAN_RESULTS = {
'host': host,
'start_port': start_port,
'end_port': end_port,
'timestamp': datetime.datetime.now().isoformat(),
'open_ports': results
}
except Exception as e:
logger.error(f"Fehler bei Port-Scan: {e}")
PORT_SCAN_RESULTS = {
'host': host,
'error': str(e),
'timestamp': datetime.datetime.now().isoformat()
}
finally:
PORT_SCAN_RUNNING = False
def get_common_services():
"""Gibt ein Wörterbuch mit gängigen Port-Dienst-Zuordnungen zurück."""
return {
21: "FTP",
22: "SSH",
23: "Telnet",
25: "SMTP",
53: "DNS",
80: "HTTP",
110: "POP3",
143: "IMAP",
443: "HTTPS",
465: "SMTP (SSL)",
587: "SMTP (TLS)",
993: "IMAP (SSL)",
995: "POP3 (SSL)",
1433: "MS SQL Server",
3306: "MySQL",
3389: "RDP",
5000: "Flask/Python Web",
5432: "PostgreSQL",
6379: "Redis",
8080: "HTTP-Alternativ",
8443: "HTTPS-Alternativ",
9000: "PHP-FPM",
9090: "Prometheus",
9200: "Elasticsearch",
27017: "MongoDB"
}
@app.route('/api/network/interfaces')
def get_interfaces():
"""Gibt detaillierte Informationen zu allen Netzwerkschnittstellen zurück."""
interfaces = []
for iface_name in netifaces.interfaces():
iface_info = {}
iface_info['name'] = iface_name
# IP-Adressen abrufen
addresses = netifaces.ifaddresses(iface_name)
# IPv4-Adressen
if netifaces.AF_INET in addresses:
iface_info['ipv4'] = addresses[netifaces.AF_INET]
else:
iface_info['ipv4'] = []
# IPv6-Adressen
if netifaces.AF_INET6 in addresses:
iface_info['ipv6'] = addresses[netifaces.AF_INET6]
else:
iface_info['ipv6'] = []
# MAC-Adresse
if netifaces.AF_LINK in addresses:
iface_info['mac'] = addresses[netifaces.AF_LINK][0].get('addr', 'Nicht verfügbar')
else:
iface_info['mac'] = 'Nicht verfügbar'
# Statistiken
try:
stats = psutil.net_io_counters(pernic=True).get(iface_name)
if stats:
iface_info['stats'] = {
'bytes_sent': stats.bytes_sent,
'bytes_recv': stats.bytes_recv,
'packets_sent': stats.packets_sent,
'packets_recv': stats.packets_recv,
'errin': stats.errin,
'errout': stats.errout,
'dropin': stats.dropin,
'dropout': stats.dropout
}
except:
iface_info['stats'] = 'Nicht verfügbar'
interfaces.append(iface_info)
return jsonify({'success': True, 'interfaces': interfaces})
@app.route('/api/network/active-connections')
def active_connections():
"""Gibt alle aktiven Netzwerkverbindungen zurück."""
try:
connections = []
for conn in psutil.net_connections(kind='inet'):
if conn.status == 'ESTABLISHED':
connection_info = {
'local_address': f"{conn.laddr.ip}:{conn.laddr.port}",
'remote_address': f"{conn.raddr.ip}:{conn.raddr.port}" if conn.raddr else 'Keine',
'status': conn.status,
'pid': conn.pid,
'process': None
}
# Prozessinformationen hinzufügen, falls verfügbar
if conn.pid:
try:
process = psutil.Process(conn.pid)
connection_info['process'] = {
'name': process.name(),
'create_time': datetime.datetime.fromtimestamp(process.create_time()).strftime('%Y-%m-%d %H:%M:%S'),
'username': process.username()
}
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
connections.append(connection_info)
return jsonify({'success': True, 'connections': connections})
except Exception as e:
logger.error(f"Fehler beim Abrufen der aktiven Verbindungen: {e}")
return jsonify({'success': False, 'error': str(e)})
@app.route('/api/network/route-table')
def route_table():
"""Gibt die Routing-Tabelle zurück."""
try:
if sys.platform.startswith('win'):
output = subprocess.check_output(['route', 'print'], universal_newlines=True)
else:
output = subprocess.check_output(['netstat', '-rn'], universal_newlines=True)
return jsonify({'success': True, 'route_table': output})
except subprocess.SubprocessError as e:
logger.error(f"Fehler beim Abrufen der Routing-Tabelle: {e}")
return jsonify({'success': False, 'error': str(e)})
@app.route('/api/logs/tail/<log_type>')
def tail_logs(log_type):
"""Gibt die letzten Zeilen eines Logfiles zurück."""
log_files = {
'backend': os.path.join('..', '..', 'logs', 'backend.log'),
'frontend': os.path.join('..', '..', 'logs', 'frontend.log'),
'debug': os.path.join('..', '..', 'logs', 'debug-server.log')
}
if log_type not in log_files:
return jsonify({'success': False, 'error': f'Unbekannter Log-Typ: {log_type}'})
log_file = log_files[log_type]
lines = request.args.get('lines', 100, type=int)
if not os.path.exists(log_file):
return jsonify({'success': False, 'error': f'Log-Datei existiert nicht: {log_file}'})
try:
if sys.platform.startswith('win'):
# Windows-spezifischer Befehl
output = subprocess.check_output(
['powershell', '-Command', f'Get-Content -Path "{log_file}" -Tail {lines}'],
universal_newlines=True
)
else:
# Unix-Befehl
output = subprocess.check_output(
['tail', '-n', str(lines), log_file],
universal_newlines=True
)
log_entries = output.splitlines()
return jsonify({'success': True, 'entries': log_entries, 'count': len(log_entries)})
except subprocess.SubprocessError as e:
logger.error(f"Fehler beim Lesen der Log-Datei: {e}")
return jsonify({'success': False, 'error': str(e)})
@app.route('/healthcheck')
def healthcheck():
"""Gibt eine einfache Gesundheitsprüfung zurück."""
checks = {}
# Systemressourcen prüfen
cpu_percent = psutil.cpu_percent(interval=0.1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
checks['system'] = {
'cpu': {
'percent': cpu_percent,
'status': 'healthy' if cpu_percent < 90 else 'warning' if cpu_percent < 95 else 'critical'
},
'memory': {
'percent': memory.percent,
'status': 'healthy' if memory.percent < 90 else 'warning' if memory.percent < 95 else 'critical'
},
'disk': {
'percent': disk.percent,
'status': 'healthy' if disk.percent < 90 else 'warning' if disk.percent < 95 else 'critical'
}
}
# Docker-Status prüfen
if DOCKER_AVAILABLE:
try:
docker_info = docker_client.info()
docker_status = 'running'
docker_containers_running = docker_info.get('ContainersRunning', 0)
checks['docker'] = {
'status': docker_status,
'containers_running': docker_containers_running,
'overall': 'healthy'
}
except Exception:
checks['docker'] = {
'status': 'error',
'overall': 'critical'
}
else:
checks['docker'] = {
'status': 'not_available',
'overall': 'warning'
}
# Backend-Verbindung prüfen
config = get_config()
try:
backend_url = f"http://{config['backend_hostname']}:{config['backend_port']}/api/test"
response = requests.get(backend_url, timeout=2)
backend_status = {
'status': 'connected' if response.status_code == 200 else 'error',
'http_code': response.status_code,
'overall': 'healthy' if response.status_code == 200 else 'critical'
}
except requests.exceptions.RequestException:
backend_status = {
'status': 'unreachable',
'overall': 'critical'
}
checks['backend'] = backend_status
# Frontend-Verbindung prüfen
try:
frontend_url = f"http://{config['frontend_hostname']}:{config['frontend_port']}"
response = requests.get(frontend_url, timeout=2)
frontend_status = {
'status': 'connected' if response.status_code == 200 else 'error',
'http_code': response.status_code,
'overall': 'healthy' if response.status_code == 200 else 'critical'
}
except requests.exceptions.RequestException:
frontend_status = {
'status': 'unreachable',
'overall': 'critical'
}
checks['frontend'] = frontend_status
# Gesamtstatus bestimmen
overall_status = 'healthy'
for component, check in checks.items():
if check.get('overall') == 'critical':
overall_status = 'critical'
break
elif check.get('overall') == 'warning' and overall_status != 'critical':
overall_status = 'warning'
return jsonify({
'status': overall_status,
'timestamp': datetime.datetime.now().isoformat(),
'checks': checks
})
if __name__ == "__main__":
try:
os.makedirs("../../logs", exist_ok=True)
print(f"Debug-Server startet auf Port {DEBUG_PORT}")
app.run(host="0.0.0.0", port=DEBUG_PORT, debug=True)
except Exception as e:
print(f"Fehler beim Starten des Debug-Servers: {e}")
sys.exit(1)