""" Vereinheitlichter Admin-Blueprint für das MYP 3D-Druck-Management-System Konsolidierte Implementierung aller Admin-spezifischen Funktionen: - Benutzerverwaltung und Systemüberwachung (ursprünglich admin.py) - Erweiterte System-API-Funktionen (ursprünglich admin_api.py) - System-Backups, Datenbank-Optimierung, Cache-Verwaltung - Steckdosenschaltzeiten-Übersicht und -verwaltung Optimierungen: - Vereinheitlichter admin_required Decorator - Konsistente Fehlerbehandlung und Logging - Vollständige API-Kompatibilität zu beiden ursprünglichen Blueprints Autor: MYP Team - Konsolidiert für IHK-Projektarbeit Datum: 2025-06-09 """ import os import shutil import zipfile import sqlite3 import glob from datetime import datetime, timedelta from flask import Blueprint, render_template, request, jsonify, redirect, url_for, flash, current_app from flask_login import login_required, current_user from functools import wraps from models import User, Printer, Job, get_cached_session, Stats, SystemLog, PlugStatusLog from utils.logging_config import get_logger # ===== BLUEPRINT-KONFIGURATION ===== # Haupt-Blueprint für Admin-UI (Templates) admin_blueprint = Blueprint('admin', __name__, url_prefix='/admin') # API-Blueprint für erweiterte System-Funktionen admin_api_blueprint = Blueprint('admin_api', __name__, url_prefix='/api/admin') # Logger für beide Funktionsbereiche admin_logger = get_logger("admin") admin_api_logger = get_logger("admin_api") # ===== EINHEITLICHER ADMIN-DECORATOR ===== def admin_required(f): """ Vereinheitlichter Decorator für Admin-Berechtigung. Kombiniert die beste Praxis aus beiden ursprünglichen Implementierungen: - Umfassende Logging-Funktionalität von admin.py - Robuste Authentifizierungsprüfung von admin_api.py """ @wraps(f) @login_required def decorated_function(*args, **kwargs): # Detaillierte Authentifizierungsprüfung is_authenticated = current_user.is_authenticated user_id = current_user.id if is_authenticated else 'Anonymous' # Doppelte Admin-Prüfung für maximale Sicherheit is_admin = False if is_authenticated: # Methode 1: Property-basierte Prüfung (admin.py-Stil) is_admin = hasattr(current_user, 'is_admin') and current_user.is_admin # Methode 2: Role-basierte Prüfung (admin_api.py-Stil) als Fallback if not is_admin and hasattr(current_user, 'role'): is_admin = current_user.role == 'admin' # Umfassendes Logging admin_logger.info( f"Admin-Check für Funktion {f.__name__}: " f"User authenticated: {is_authenticated}, " f"User ID: {user_id}, " f"Is Admin: {is_admin}" ) if not is_admin: admin_logger.warning( f"Admin-Zugriff verweigert für User {user_id} auf Funktion {f.__name__}" ) return jsonify({ "error": "Nur Administratoren haben Zugriff", "message": "Admin-Berechtigung erforderlich" }), 403 return f(*args, **kwargs) return decorated_function # ===== ADMIN-UI ROUTEN (ursprünglich admin.py) ===== @admin_blueprint.route("/") @admin_required def admin_dashboard(): """Admin-Dashboard-Hauptseite mit Systemstatistiken""" try: with get_cached_session() as db_session: # Grundlegende Statistiken sammeln total_users = db_session.query(User).count() total_printers = db_session.query(Printer).count() total_jobs = db_session.query(Job).count() # Aktive Jobs zählen active_jobs = db_session.query(Job).filter( Job.status.in_(['pending', 'printing', 'paused']) ).count() stats = { 'total_users': total_users, 'total_printers': total_printers, 'total_jobs': total_jobs, 'active_jobs': active_jobs } admin_logger.info(f"Admin-Dashboard geladen von {current_user.username}") return render_template('admin.html', stats=stats, active_tab=None) except Exception as e: admin_logger.error(f"Fehler beim Laden des Admin-Dashboards: {str(e)}") flash("Fehler beim Laden der Dashboard-Daten", "error") return render_template('admin.html', stats={}, active_tab=None) @admin_blueprint.route("/plug-schedules") @admin_required def admin_plug_schedules(): """ Administrator-Übersicht für Steckdosenschaltzeiten. Zeigt detaillierte Historie aller Smart Plug Schaltzeiten mit Kalenderansicht. """ admin_logger.info(f"Admin {current_user.username} (ID: {current_user.id}) öffnet Steckdosenschaltzeiten") try: # Statistiken für die letzten 24 Stunden abrufen stats_24h = PlugStatusLog.get_status_statistics(hours=24) # Alle Drucker für Filter-Dropdown with get_cached_session() as db_session: # Alle Drucker für Auswahlfelder anzeigen (unabhängig von active-Status) printers = db_session.query(Printer).all() return render_template('admin_plug_schedules.html', stats=stats_24h, printers=printers, page_title="Steckdosenschaltzeiten", breadcrumb=[ {"name": "Admin-Dashboard", "url": url_for("admin.admin_dashboard")}, {"name": "Steckdosenschaltzeiten", "url": "#"} ]) except Exception as e: admin_logger.error(f"Fehler beim Laden der Steckdosenschaltzeiten-Seite: {str(e)}") flash("Fehler beim Laden der Steckdosenschaltzeiten-Daten.", "error") return redirect(url_for("admin.admin_dashboard")) @admin_blueprint.route("/users") @admin_required def users_overview(): """Benutzerübersicht für Administratoren""" try: with get_cached_session() as db_session: # Alle Benutzer laden users = db_session.query(User).order_by(User.created_at.desc()).all() # Grundlegende Statistiken sammeln total_users = len(users) total_printers = db_session.query(Printer).count() total_jobs = db_session.query(Job).count() # Aktive Jobs zählen active_jobs = db_session.query(Job).filter( Job.status.in_(['pending', 'printing', 'paused']) ).count() stats = { 'total_users': total_users, 'total_printers': total_printers, 'total_jobs': total_jobs, 'active_jobs': active_jobs } admin_logger.info(f"Benutzerübersicht geladen von {current_user.username}") return render_template('admin.html', stats=stats, users=users, active_tab='users') except Exception as e: admin_logger.error(f"Fehler beim Laden der Benutzerübersicht: {str(e)}") flash("Fehler beim Laden der Benutzerdaten", "error") return render_template('admin.html', stats={}, users=[], active_tab='users') @admin_blueprint.route("/users/add", methods=["GET"]) @admin_required def add_user_page(): """Seite zum Hinzufügen eines neuen Benutzers""" return render_template('admin_add_user.html') @admin_blueprint.route("/users//edit", methods=["GET"]) @admin_required def edit_user_page(user_id): """Seite zum Bearbeiten eines Benutzers""" try: with get_cached_session() as db_session: user = db_session.query(User).filter(User.id == user_id).first() if not user: flash("Benutzer nicht gefunden", "error") return redirect(url_for('admin.users_overview')) return render_template('admin_edit_user.html', user=user) except Exception as e: admin_logger.error(f"Fehler beim Laden der Benutzer-Bearbeitung: {str(e)}") flash("Fehler beim Laden der Benutzerdaten", "error") return redirect(url_for('admin.users_overview')) @admin_blueprint.route("/printers") @admin_required def printers_overview(): """Druckerübersicht für Administratoren""" try: with get_cached_session() as db_session: # Alle Drucker laden printers = db_session.query(Printer).order_by(Printer.created_at.desc()).all() # Grundlegende Statistiken sammeln total_users = db_session.query(User).count() total_printers = len(printers) total_jobs = db_session.query(Job).count() # Aktive Jobs zählen active_jobs = db_session.query(Job).filter( Job.status.in_(['pending', 'printing', 'paused']) ).count() # Online-Drucker zählen (vereinfacht, da wir keinen Live-Status haben) online_printers = len([p for p in printers if p.status == 'online']) stats = { 'total_users': total_users, 'total_printers': total_printers, 'total_jobs': total_jobs, 'active_jobs': active_jobs, 'online_printers': online_printers } admin_logger.info(f"Druckerübersicht geladen von {current_user.username}") return render_template('admin.html', stats=stats, printers=printers, active_tab='printers') except Exception as e: admin_logger.error(f"Fehler beim Laden der Druckerübersicht: {str(e)}") flash("Fehler beim Laden der Druckerdaten", "error") return render_template('admin.html', stats={}, printers=[], active_tab='printers') @admin_blueprint.route("/printers/add", methods=["GET"]) @admin_required def add_printer_page(): """Seite zum Hinzufügen eines neuen Druckers""" return render_template('admin_add_printer.html') @admin_blueprint.route("/printers//edit", methods=["GET"]) @admin_required def edit_printer_page(printer_id): """Seite zum Bearbeiten eines Druckers""" try: with get_cached_session() as db_session: printer = db_session.query(Printer).filter(Printer.id == printer_id).first() if not printer: flash("Drucker nicht gefunden", "error") return redirect(url_for('admin.printers_overview')) return render_template('admin_edit_printer.html', printer=printer) except Exception as e: admin_logger.error(f"Fehler beim Laden der Drucker-Bearbeitung: {str(e)}") flash("Fehler beim Laden der Druckerdaten", "error") return redirect(url_for('admin.printers_overview')) @admin_blueprint.route("/guest-requests") @admin_required def guest_requests(): """Gäste-Anfragen-Übersicht""" return render_template('admin_guest_requests.html') @admin_blueprint.route("/advanced-settings") @admin_required def advanced_settings(): """Erweiterte Systemeinstellungen""" return render_template('admin_advanced_settings.html') @admin_blueprint.route("/system-health") @admin_required def system_health(): """System-Gesundheitsstatus""" try: with get_cached_session() as db_session: # Grundlegende Statistiken sammeln total_users = db_session.query(User).count() total_printers = db_session.query(Printer).count() total_jobs = db_session.query(Job).count() # Aktive Jobs zählen active_jobs = db_session.query(Job).filter( Job.status.in_(['pending', 'printing', 'paused']) ).count() stats = { 'total_users': total_users, 'total_printers': total_printers, 'total_jobs': total_jobs, 'active_jobs': active_jobs } admin_logger.info(f"System-Health geladen von {current_user.username}") return render_template('admin.html', stats=stats, active_tab='system') except Exception as e: admin_logger.error(f"Fehler beim Laden des System-Health: {str(e)}") flash("Fehler beim Laden der System-Daten", "error") return render_template('admin.html', stats={}, active_tab='system') @admin_blueprint.route("/logs") @admin_required def logs_overview(): """System-Logs-Übersicht""" try: with get_cached_session() as db_session: # Grundlegende Statistiken sammeln total_users = db_session.query(User).count() total_printers = db_session.query(Printer).count() total_jobs = db_session.query(Job).count() # Aktive Jobs zählen active_jobs = db_session.query(Job).filter( Job.status.in_(['pending', 'printing', 'paused']) ).count() # Neueste Logs laden (falls SystemLog Model existiert) try: recent_logs = db_session.query(SystemLog).order_by(SystemLog.timestamp.desc()).limit(50).all() except Exception: recent_logs = [] stats = { 'total_users': total_users, 'total_printers': total_printers, 'total_jobs': total_jobs, 'active_jobs': active_jobs } admin_logger.info(f"Logs-Übersicht geladen von {current_user.username}") return render_template('admin.html', stats=stats, logs=recent_logs, active_tab='logs') except Exception as e: admin_logger.error(f"Fehler beim Laden der Logs-Übersicht: {str(e)}") flash("Fehler beim Laden der Log-Daten", "error") return render_template('admin.html', stats={}, logs=[], active_tab='logs') @admin_blueprint.route("/maintenance") @admin_required def maintenance(): """Wartungsseite""" try: with get_cached_session() as db_session: # Grundlegende Statistiken sammeln total_users = db_session.query(User).count() total_printers = db_session.query(Printer).count() total_jobs = db_session.query(Job).count() # Aktive Jobs zählen active_jobs = db_session.query(Job).filter( Job.status.in_(['pending', 'printing', 'paused']) ).count() stats = { 'total_users': total_users, 'total_printers': total_printers, 'total_jobs': total_jobs, 'active_jobs': active_jobs } admin_logger.info(f"Wartungsseite geladen von {current_user.username}") return render_template('admin.html', stats=stats, active_tab='maintenance') except Exception as e: admin_logger.error(f"Fehler beim Laden der Wartungsseite: {str(e)}") flash("Fehler beim Laden der Wartungsdaten", "error") return render_template('admin.html', stats={}, active_tab='maintenance') # ===== BENUTZER-CRUD-API (ursprünglich admin.py) ===== @admin_blueprint.route("/api/users", methods=["POST"]) @admin_required def create_user_api(): """API-Endpunkt zum Erstellen eines neuen Benutzers""" try: data = request.get_json() # Validierung der erforderlichen Felder required_fields = ['username', 'email', 'password', 'name'] for field in required_fields: if field not in data or not data[field]: return jsonify({"error": f"Feld '{field}' ist erforderlich"}), 400 with get_cached_session() as db_session: # Überprüfung auf bereits existierende Benutzer existing_user = db_session.query(User).filter( (User.username == data['username']) | (User.email == data['email']) ).first() if existing_user: return jsonify({"error": "Benutzername oder E-Mail bereits vergeben"}), 400 # Neuen Benutzer erstellen new_user = User( username=data['username'], email=data['email'], name=data['name'], role=data.get('role', 'user'), department=data.get('department'), position=data.get('position'), phone=data.get('phone'), bio=data.get('bio') ) new_user.set_password(data['password']) db_session.add(new_user) db_session.commit() admin_logger.info(f"Neuer Benutzer erstellt: {new_user.username} von Admin {current_user.username}") return jsonify({ "success": True, "message": "Benutzer erfolgreich erstellt", "user_id": new_user.id }) except Exception as e: admin_logger.error(f"Fehler beim Erstellen des Benutzers: {str(e)}") return jsonify({"error": "Fehler beim Erstellen des Benutzers"}), 500 @admin_blueprint.route("/api/users/", methods=["GET"]) @admin_required def get_user_api(user_id): """API-Endpunkt zum Abrufen von Benutzerdaten""" try: with get_cached_session() as db_session: user = db_session.query(User).filter(User.id == user_id).first() if not user: return jsonify({"error": "Benutzer nicht gefunden"}), 404 user_data = { "id": user.id, "username": user.username, "email": user.email, "name": user.name, "role": user.role, "active": user.active, "created_at": user.created_at.isoformat() if user.created_at else None, "last_login": user.last_login.isoformat() if user.last_login else None, "department": user.department, "position": user.position, "phone": user.phone, "bio": user.bio } return jsonify(user_data) except Exception as e: admin_logger.error(f"Fehler beim Abrufen der Benutzerdaten: {str(e)}") return jsonify({"error": "Fehler beim Abrufen der Benutzerdaten"}), 500 @admin_blueprint.route("/api/users/", methods=["PUT"]) @admin_required def update_user_api(user_id): """API-Endpunkt zum Aktualisieren von Benutzerdaten""" try: data = request.get_json() with get_cached_session() as db_session: user = db_session.query(User).filter(User.id == user_id).first() if not user: return jsonify({"error": "Benutzer nicht gefunden"}), 404 # Aktualisierbare Felder updatable_fields = ['username', 'email', 'name', 'role', 'active', 'department', 'position', 'phone', 'bio'] for field in updatable_fields: if field in data: setattr(user, field, data[field]) # Passwort separat behandeln if 'password' in data and data['password']: user.set_password(data['password']) user.updated_at = datetime.now() db_session.commit() admin_logger.info(f"Benutzer {user.username} aktualisiert von Admin {current_user.username}") return jsonify({ "success": True, "message": "Benutzer erfolgreich aktualisiert" }) except Exception as e: admin_logger.error(f"Fehler beim Aktualisieren des Benutzers: {str(e)}") return jsonify({"error": "Fehler beim Aktualisieren des Benutzers"}), 500 @admin_blueprint.route("/api/users/", methods=["DELETE"]) @admin_required def delete_user_api(user_id): """Löscht einen Benutzer über die API""" try: with get_cached_session() as db_session: user = db_session.query(User).filter(User.id == user_id).first() if not user: return jsonify({"error": "Benutzer nicht gefunden"}), 404 # Prüfen ob der Benutzer der einzige Admin ist if user.is_admin: admin_count = db_session.query(User).filter(User.is_admin == True).count() if admin_count <= 1: return jsonify({"error": "Der letzte Administrator kann nicht gelöscht werden"}), 400 username = user.username db_session.delete(user) db_session.commit() admin_logger.info(f"Benutzer {username} gelöscht von Admin {current_user.username}") return jsonify({ "success": True, "message": "Benutzer erfolgreich gelöscht" }) except Exception as e: admin_logger.error(f"Fehler beim Löschen des Benutzers: {str(e)}") return jsonify({"error": "Fehler beim Löschen des Benutzers"}), 500 # ===== DRUCKER-API-ROUTEN ===== # HINWEIS: Die Drucker-CRUD-Operationen wurden nach app.py verschoben # für konsistente API-Responses und bessere Wartbarkeit. # Verwenden Sie die folgenden Endpunkte: # - GET /api/admin/printers - Alle Drucker abrufen # - GET /api/admin/printers/ - Einzelnen Drucker abrufen # - POST /api/admin/printers - Neuen Drucker erstellen # - PUT /api/admin/printers/ - Drucker aktualisieren # - DELETE /api/admin/printers/ - Drucker löschen # Die alten Routen wurden entfernt, um Duplikate zu vermeiden # ===== ERWEITERTE SYSTEM-API (ursprünglich admin_api.py) ===== @admin_api_blueprint.route('/backup/create', methods=['POST']) @admin_required def create_backup(): """ Erstellt ein manuelles System-Backup. Erstellt eine Sicherung aller wichtigen Systemdaten einschließlich Datenbank, Konfigurationsdateien und Benutzer-Uploads. Returns: JSON: Erfolgs-Status und Backup-Informationen """ try: admin_api_logger.info(f"Backup-Erstellung angefordert von Admin {current_user.username}") # Backup-Verzeichnis sicherstellen backup_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'database', 'backups') os.makedirs(backup_dir, exist_ok=True) # Eindeutigen Backup-Namen erstellen timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') backup_name = f"system_backup_{timestamp}.zip" backup_path = os.path.join(backup_dir, backup_name) created_files = [] backup_size = 0 with zipfile.ZipFile(backup_path, 'w', zipfile.ZIP_DEFLATED) as zipf: # 1. Datenbank-Datei hinzufügen try: from utils.utilities_collection import DATABASE_PATH if os.path.exists(DATABASE_PATH): zipf.write(DATABASE_PATH, 'database/main.db') created_files.append('database/main.db') admin_api_logger.debug("✅ Hauptdatenbank zur Sicherung hinzugefügt") # WAL- und SHM-Dateien falls vorhanden wal_path = DATABASE_PATH + '-wal' shm_path = DATABASE_PATH + '-shm' if os.path.exists(wal_path): zipf.write(wal_path, 'database/main.db-wal') created_files.append('database/main.db-wal') if os.path.exists(shm_path): zipf.write(shm_path, 'database/main.db-shm') created_files.append('database/main.db-shm') except Exception as db_error: admin_api_logger.warning(f"Fehler beim Hinzufügen der Datenbank: {str(db_error)}") # 2. Konfigurationsdateien try: config_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'config') if os.path.exists(config_dir): for root, dirs, files in os.walk(config_dir): for file in files: if file.endswith(('.py', '.json', '.yaml', '.yml', '.toml')): file_path = os.path.join(root, file) arc_path = os.path.relpath(file_path, os.path.dirname(os.path.dirname(__file__))) zipf.write(file_path, arc_path) created_files.append(arc_path) admin_api_logger.debug("✅ Konfigurationsdateien zur Sicherung hinzugefügt") except Exception as config_error: admin_api_logger.warning(f"Fehler beim Hinzufügen der Konfiguration: {str(config_error)}") # 3. Wichtige User-Uploads (limitiert auf die letzten 1000 Dateien) try: uploads_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'uploads') if os.path.exists(uploads_dir): file_count = 0 max_files = 1000 # Limit für Performance for root, dirs, files in os.walk(uploads_dir): for file in files[:max_files - file_count]: if file_count >= max_files: break file_path = os.path.join(root, file) file_size = os.path.getsize(file_path) # Nur Dateien unter 50MB hinzufügen if file_size < 50 * 1024 * 1024: arc_path = os.path.relpath(file_path, os.path.dirname(os.path.dirname(__file__))) zipf.write(file_path, arc_path) created_files.append(arc_path) file_count += 1 if file_count >= max_files: break admin_api_logger.debug(f"✅ {file_count} Upload-Dateien zur Sicherung hinzugefügt") except Exception as uploads_error: admin_api_logger.warning(f"Fehler beim Hinzufügen der Uploads: {str(uploads_error)}") # 4. System-Logs (nur die letzten 100 Log-Dateien) try: logs_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'logs') if os.path.exists(logs_dir): log_files = [] for root, dirs, files in os.walk(logs_dir): for file in files: if file.endswith(('.log', '.txt')): file_path = os.path.join(root, file) log_files.append((file_path, os.path.getmtime(file_path))) # Sortiere nach Datum (neueste zuerst) und nimm nur die letzten 100 log_files.sort(key=lambda x: x[1], reverse=True) for file_path, _ in log_files[:100]: arc_path = os.path.relpath(file_path, os.path.dirname(os.path.dirname(__file__))) zipf.write(file_path, arc_path) created_files.append(arc_path) admin_api_logger.debug(f"✅ {len(log_files[:100])} Log-Dateien zur Sicherung hinzugefügt") except Exception as logs_error: admin_api_logger.warning(f"Fehler beim Hinzufügen der Logs: {str(logs_error)}") # Backup-Größe bestimmen if os.path.exists(backup_path): backup_size = os.path.getsize(backup_path) admin_api_logger.info(f"✅ System-Backup erfolgreich erstellt: {backup_name} ({backup_size / 1024 / 1024:.2f} MB)") return jsonify({ 'success': True, 'message': f'Backup erfolgreich erstellt: {backup_name}', 'backup_info': { 'filename': backup_name, 'size_bytes': backup_size, 'size_mb': round(backup_size / 1024 / 1024, 2), 'files_count': len(created_files), 'created_at': datetime.now().isoformat(), 'path': backup_path } }) except Exception as e: admin_api_logger.error(f"❌ Fehler beim Erstellen des Backups: {str(e)}") return jsonify({ 'success': False, 'message': f'Fehler beim Erstellen des Backups: {str(e)}' }), 500 @admin_api_blueprint.route('/database/optimize', methods=['POST']) @admin_required def optimize_database(): """ Führt Datenbank-Optimierung durch. Optimiert die SQLite-Datenbank durch VACUUM, ANALYZE und weitere Wartungsoperationen für bessere Performance. Returns: JSON: Erfolgs-Status und Optimierungs-Statistiken """ try: admin_api_logger.info(f"Datenbank-Optimierung angefordert von Admin {current_user.username}") from utils.utilities_collection import DATABASE_PATH optimization_results = { 'vacuum_completed': False, 'analyze_completed': False, 'integrity_check': False, 'wal_checkpoint': False, 'size_before': 0, 'size_after': 0, 'space_saved': 0 } # Datenbankgröße vor Optimierung if os.path.exists(DATABASE_PATH): optimization_results['size_before'] = os.path.getsize(DATABASE_PATH) # Verbindung zur Datenbank herstellen conn = sqlite3.connect(DATABASE_PATH, timeout=30.0) cursor = conn.cursor() try: # 1. Integritätsprüfung admin_api_logger.debug("🔍 Führe Integritätsprüfung durch...") cursor.execute("PRAGMA integrity_check") integrity_result = cursor.fetchone() optimization_results['integrity_check'] = integrity_result[0] == 'ok' if not optimization_results['integrity_check']: admin_api_logger.warning(f"⚠️ Integritätsprüfung ergab: {integrity_result[0]}") else: admin_api_logger.debug("✅ Integritätsprüfung erfolgreich") # 2. WAL-Checkpoint (falls WAL-Modus aktiv) try: admin_api_logger.debug("🔄 Führe WAL-Checkpoint durch...") cursor.execute("PRAGMA wal_checkpoint(TRUNCATE)") optimization_results['wal_checkpoint'] = True admin_api_logger.debug("✅ WAL-Checkpoint erfolgreich") except Exception as wal_error: admin_api_logger.debug(f"ℹ️ WAL-Checkpoint nicht möglich: {str(wal_error)}") # 3. ANALYZE - Statistiken aktualisieren admin_api_logger.debug("📊 Aktualisiere Datenbank-Statistiken...") cursor.execute("ANALYZE") optimization_results['analyze_completed'] = True admin_api_logger.debug("✅ ANALYZE erfolgreich") # 4. VACUUM - Datenbank komprimieren und reorganisieren admin_api_logger.debug("🗜️ Komprimiere und reorganisiere Datenbank...") cursor.execute("VACUUM") optimization_results['vacuum_completed'] = True admin_api_logger.debug("✅ VACUUM erfolgreich") # 5. Performance-Optimierungen try: # Cache-Größe optimieren cursor.execute("PRAGMA cache_size = 10000") # 10MB Cache # Journal-Modus auf WAL setzen für bessere Concurrent-Performance cursor.execute("PRAGMA journal_mode = WAL") # Synchronous auf NORMAL für Balance zwischen Performance und Sicherheit cursor.execute("PRAGMA synchronous = NORMAL") # Page-Größe optimieren (falls noch nicht gesetzt) cursor.execute("PRAGMA page_size = 4096") admin_api_logger.debug("✅ Performance-Optimierungen angewendet") except Exception as perf_error: admin_api_logger.warning(f"⚠️ Performance-Optimierungen teilweise fehlgeschlagen: {str(perf_error)}") finally: cursor.close() conn.close() # Datenbankgröße nach Optimierung if os.path.exists(DATABASE_PATH): optimization_results['size_after'] = os.path.getsize(DATABASE_PATH) optimization_results['space_saved'] = optimization_results['size_before'] - optimization_results['size_after'] # Ergebnisse loggen space_saved_mb = optimization_results['space_saved'] / 1024 / 1024 admin_api_logger.info(f"✅ Datenbank-Optimierung abgeschlossen - {space_saved_mb:.2f} MB Speicher gespart") return jsonify({ 'success': True, 'message': 'Datenbank erfolgreich optimiert', 'results': { 'vacuum_completed': optimization_results['vacuum_completed'], 'analyze_completed': optimization_results['analyze_completed'], 'integrity_check_passed': optimization_results['integrity_check'], 'wal_checkpoint_completed': optimization_results['wal_checkpoint'], 'size_before_mb': round(optimization_results['size_before'] / 1024 / 1024, 2), 'size_after_mb': round(optimization_results['size_after'] / 1024 / 1024, 2), 'space_saved_mb': round(space_saved_mb, 2), 'optimization_timestamp': datetime.now().isoformat() } }) except Exception as e: admin_api_logger.error(f"❌ Fehler bei Datenbank-Optimierung: {str(e)}") return jsonify({ 'success': False, 'message': f'Fehler bei Datenbank-Optimierung: {str(e)}' }), 500 @admin_api_blueprint.route('/cache/clear', methods=['POST']) @admin_required def clear_cache(): """ Leert den System-Cache. Entfernt alle temporären Dateien, Cache-Verzeichnisse und Python-Bytecode um Speicher freizugeben und Performance zu verbessern. Returns: JSON: Erfolgs-Status und Lösch-Statistiken """ try: admin_api_logger.info(f"Cache-Leerung angefordert von Admin {current_user.username}") cleared_stats = { 'files_deleted': 0, 'dirs_deleted': 0, 'space_freed': 0, 'categories': {} } app_root = os.path.dirname(os.path.dirname(__file__)) # 1. Python-Bytecode-Cache leeren (__pycache__) try: pycache_count = 0 pycache_size = 0 for root, dirs, files in os.walk(app_root): if '__pycache__' in root: for file in files: file_path = os.path.join(root, file) try: pycache_size += os.path.getsize(file_path) os.remove(file_path) pycache_count += 1 except Exception: pass # Versuche das __pycache__-Verzeichnis zu löschen try: os.rmdir(root) cleared_stats['dirs_deleted'] += 1 except Exception: pass cleared_stats['categories']['python_bytecode'] = { 'files': pycache_count, 'size_mb': round(pycache_size / 1024 / 1024, 2) } cleared_stats['files_deleted'] += pycache_count cleared_stats['space_freed'] += pycache_size admin_api_logger.debug(f"✅ Python-Bytecode-Cache: {pycache_count} Dateien, {pycache_size / 1024 / 1024:.2f} MB") except Exception as pycache_error: admin_api_logger.warning(f"⚠️ Fehler beim Leeren des Python-Cache: {str(pycache_error)}") # 2. Temporäre Dateien im uploads/temp Verzeichnis try: temp_count = 0 temp_size = 0 temp_dir = os.path.join(app_root, 'uploads', 'temp') if os.path.exists(temp_dir): for root, dirs, files in os.walk(temp_dir): for file in files: file_path = os.path.join(root, file) try: temp_size += os.path.getsize(file_path) os.remove(file_path) temp_count += 1 except Exception: pass cleared_stats['categories']['temp_uploads'] = { 'files': temp_count, 'size_mb': round(temp_size / 1024 / 1024, 2) } cleared_stats['files_deleted'] += temp_count cleared_stats['space_freed'] += temp_size admin_api_logger.debug(f"✅ Temporäre Upload-Dateien: {temp_count} Dateien, {temp_size / 1024 / 1024:.2f} MB") except Exception as temp_error: admin_api_logger.warning(f"⚠️ Fehler beim Leeren des Temp-Verzeichnisses: {str(temp_error)}") # 3. System-Cache-Verzeichnisse (falls vorhanden) try: cache_count = 0 cache_size = 0 cache_dirs = [ os.path.join(app_root, 'static', 'cache'), os.path.join(app_root, 'cache'), os.path.join(app_root, '.cache') ] for cache_dir in cache_dirs: if os.path.exists(cache_dir): for root, dirs, files in os.walk(cache_dir): for file in files: file_path = os.path.join(root, file) try: cache_size += os.path.getsize(file_path) os.remove(file_path) cache_count += 1 except Exception: pass cleared_stats['categories']['system_cache'] = { 'files': cache_count, 'size_mb': round(cache_size / 1024 / 1024, 2) } cleared_stats['files_deleted'] += cache_count cleared_stats['space_freed'] += cache_size admin_api_logger.debug(f"✅ System-Cache: {cache_count} Dateien, {cache_size / 1024 / 1024:.2f} MB") except Exception as cache_error: admin_api_logger.warning(f"⚠️ Fehler beim Leeren des System-Cache: {str(cache_error)}") # 4. Alte Log-Dateien (älter als 30 Tage) try: logs_count = 0 logs_size = 0 logs_dir = os.path.join(app_root, 'logs') cutoff_date = datetime.now().timestamp() - (30 * 24 * 60 * 60) # 30 Tage if os.path.exists(logs_dir): for root, dirs, files in os.walk(logs_dir): for file in files: if file.endswith(('.log', '.log.1', '.log.2', '.log.3')): file_path = os.path.join(root, file) try: if os.path.getmtime(file_path) < cutoff_date: logs_size += os.path.getsize(file_path) os.remove(file_path) logs_count += 1 except Exception: pass cleared_stats['categories']['old_logs'] = { 'files': logs_count, 'size_mb': round(logs_size / 1024 / 1024, 2) } cleared_stats['files_deleted'] += logs_count cleared_stats['space_freed'] += logs_size admin_api_logger.debug(f"✅ Alte Log-Dateien: {logs_count} Dateien, {logs_size / 1024 / 1024:.2f} MB") except Exception as logs_error: admin_api_logger.warning(f"⚠️ Fehler beim Leeren alter Log-Dateien: {str(logs_error)}") # 5. Application-Level Cache leeren (falls Models-Cache existiert) try: from models import clear_model_cache clear_model_cache() admin_api_logger.debug("✅ Application-Level Cache geleert") except (ImportError, AttributeError): admin_api_logger.debug("ℹ️ Kein Application-Level Cache verfügbar") # Ergebnisse zusammenfassen total_space_mb = cleared_stats['space_freed'] / 1024 / 1024 admin_api_logger.info(f"✅ Cache-Leerung abgeschlossen: {cleared_stats['files_deleted']} Dateien, {total_space_mb:.2f} MB freigegeben") return jsonify({ 'success': True, 'message': f'Cache erfolgreich geleert - {total_space_mb:.2f} MB freigegeben', 'statistics': { 'total_files_deleted': cleared_stats['files_deleted'], 'total_dirs_deleted': cleared_stats['dirs_deleted'], 'total_space_freed_mb': round(total_space_mb, 2), 'categories': cleared_stats['categories'], 'cleanup_timestamp': datetime.now().isoformat() } }) except Exception as e: admin_api_logger.error(f"❌ Fehler beim Leeren des Cache: {str(e)}") return jsonify({ 'success': False, 'message': f'Fehler beim Leeren des Cache: {str(e)}' }), 500 # ===== API-ENDPUNKTE FÜR LOGS ===== @admin_blueprint.route("/api/logs", methods=["GET"]) @admin_required def get_logs_api(): """API-Endpunkt zum Abrufen von System-Logs""" try: level = request.args.get('level', 'all') limit = min(int(request.args.get('limit', 100)), 1000) # Max 1000 Logs with get_cached_session() as db_session: query = db_session.query(SystemLog) # Filter nach Log-Level falls spezifiziert if level != 'all': query = query.filter(SystemLog.level == level.upper()) # Logs laden logs = query.order_by(SystemLog.timestamp.desc()).limit(limit).all() # In Dictionary konvertieren logs_data = [] for log in logs: logs_data.append({ 'id': log.id, 'level': log.level, 'message': log.message, 'timestamp': log.timestamp.isoformat() if log.timestamp else None, 'module': getattr(log, 'module', ''), 'user_id': getattr(log, 'user_id', None), 'ip_address': getattr(log, 'ip_address', '') }) admin_logger.info(f"Logs abgerufen: {len(logs_data)} Einträge, Level: {level}") return jsonify({ "success": True, "logs": logs_data, "count": len(logs_data), "level": level }) except Exception as e: admin_logger.error(f"Fehler beim Abrufen der Logs: {str(e)}") return jsonify({"error": "Fehler beim Laden der Logs"}), 500 @admin_blueprint.route("/api/logs/export", methods=["POST"]) @admin_required def export_logs_api(): """API-Endpunkt zum Exportieren von System-Logs""" try: data = request.get_json() or {} level = data.get('level', 'all') format_type = data.get('format', 'json') # json, csv, txt with get_cached_session() as db_session: query = db_session.query(SystemLog) # Filter nach Log-Level falls spezifiziert if level != 'all': query = query.filter(SystemLog.level == level.upper()) # Alle Logs für Export laden logs = query.order_by(SystemLog.timestamp.desc()).all() # Export-Format bestimmen if format_type == 'csv': import csv import io output = io.StringIO() writer = csv.writer(output) # Header schreiben writer.writerow(['Timestamp', 'Level', 'Module', 'Message', 'User ID', 'IP Address']) # Daten schreiben for log in logs: writer.writerow([ log.timestamp.isoformat() if log.timestamp else '', log.level, getattr(log, 'module', ''), log.message, getattr(log, 'user_id', ''), getattr(log, 'ip_address', '') ]) content = output.getvalue() output.close() return jsonify({ "success": True, "content": content, "filename": f"system_logs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv", "content_type": "text/csv" }) elif format_type == 'txt': lines = [] for log in logs: timestamp = log.timestamp.strftime('%Y-%m-%d %H:%M:%S') if log.timestamp else 'Unknown' lines.append(f"[{timestamp}] {log.level}: {log.message}") content = '\n'.join(lines) return jsonify({ "success": True, "content": content, "filename": f"system_logs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt", "content_type": "text/plain" }) else: # JSON format logs_data = [] for log in logs: logs_data.append({ 'id': log.id, 'level': log.level, 'message': log.message, 'timestamp': log.timestamp.isoformat() if log.timestamp else None, 'module': getattr(log, 'module', ''), 'user_id': getattr(log, 'user_id', None), 'ip_address': getattr(log, 'ip_address', '') }) import json content = json.dumps(logs_data, indent=2, ensure_ascii=False) return jsonify({ "success": True, "content": content, "filename": f"system_logs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", "content_type": "application/json" }) except Exception as e: admin_logger.error(f"Fehler beim Exportieren der Logs: {str(e)}") return jsonify({"error": "Fehler beim Exportieren der Logs"}), 500 # ===== API-ENDPUNKTE FÜR SYSTEM-INFORMATIONEN ===== @admin_blueprint.route("/api/system/status", methods=["GET"]) @admin_required def get_system_status_api(): """API-Endpunkt für System-Status-Informationen""" try: import psutil import platform # System-Informationen sammeln cpu_usage = psutil.cpu_percent(interval=1) memory = psutil.virtual_memory() disk = psutil.disk_usage('/') # Netzwerk-Informationen network = psutil.net_io_counters() # Python und Flask Informationen python_version = platform.python_version() platform_info = platform.platform() # Datenbank-Statistiken with get_cached_session() as db_session: total_users = db_session.query(User).count() total_printers = db_session.query(Printer).count() total_jobs = db_session.query(Job).count() # Aktive Jobs zählen active_jobs = db_session.query(Job).filter( Job.status.in_(['pending', 'printing', 'paused']) ).count() system_status = { "cpu": { "usage_percent": cpu_usage, "core_count": psutil.cpu_count() }, "memory": { "total": memory.total, "available": memory.available, "used": memory.used, "usage_percent": memory.percent }, "disk": { "total": disk.total, "used": disk.used, "free": disk.free, "usage_percent": (disk.used / disk.total) * 100 }, "network": { "bytes_sent": network.bytes_sent, "bytes_received": network.bytes_recv, "packets_sent": network.packets_sent, "packets_received": network.packets_recv }, "system": { "python_version": python_version, "platform": platform_info, "uptime": datetime.now().isoformat() }, "database": { "total_users": total_users, "total_printers": total_printers, "total_jobs": total_jobs, "active_jobs": active_jobs } } admin_logger.info(f"System-Status abgerufen von {current_user.username}") return jsonify({ "success": True, "status": system_status, "timestamp": datetime.now().isoformat() }) except Exception as e: admin_logger.error(f"Fehler beim Abrufen des System-Status: {str(e)}") return jsonify({"error": "Fehler beim Laden des System-Status"}), 500 # ===== TEST-ENDPUNKTE FÜR ENTWICKLUNG ===== @admin_blueprint.route("/api/test/create-sample-logs", methods=["POST"]) @admin_required def create_sample_logs_api(): """Test-Endpunkt zum Erstellen von Beispiel-Log-Einträgen""" try: with get_cached_session() as db_session: # Verschiedene Log-Level erstellen sample_logs = [ { 'level': 'INFO', 'message': 'System erfolgreich gestartet', 'module': 'admin', 'user_id': current_user.id, 'ip_address': request.remote_addr }, { 'level': 'WARNING', 'message': 'Drucker hat 5 Minuten nicht geantwortet', 'module': 'printer_monitor', 'user_id': None, 'ip_address': None }, { 'level': 'ERROR', 'message': 'Fehler beim Verbinden mit Drucker printer-001', 'module': 'printer', 'user_id': None, 'ip_address': None }, { 'level': 'DEBUG', 'message': 'API-Aufruf erfolgreich verarbeitet', 'module': 'api', 'user_id': current_user.id, 'ip_address': request.remote_addr }, { 'level': 'CRITICAL', 'message': 'Datenbank-Verbindung unterbrochen', 'module': 'database', 'user_id': None, 'ip_address': None } ] # Log-Einträge erstellen created_count = 0 for log_data in sample_logs: log_entry = SystemLog( level=log_data['level'], message=log_data['message'], module=log_data['module'], user_id=log_data['user_id'], ip_address=log_data['ip_address'] ) db_session.add(log_entry) created_count += 1 db_session.commit() admin_logger.info(f"Test-Logs erstellt: {created_count} Einträge von {current_user.username}") return jsonify({ "success": True, "message": f"{created_count} Test-Log-Einträge erfolgreich erstellt", "count": created_count }) except Exception as e: admin_logger.error(f"Fehler beim Erstellen der Test-Logs: {str(e)}") return jsonify({"error": "Fehler beim Erstellen der Test-Logs"}), 500 # ===== STECKDOSENSCHALTZEITEN API-ENDPUNKTE ===== @admin_api_blueprint.route('/plug-schedules/logs', methods=['GET']) @admin_required def api_admin_plug_schedules_logs(): """ API-Endpoint für Steckdosenschaltzeiten-Logs. Unterstützt Filterung nach Drucker, Zeitraum und Status. """ try: # Parameter aus Request printer_id = request.args.get('printer_id', type=int) hours = request.args.get('hours', default=24, type=int) status_filter = request.args.get('status') page = request.args.get('page', default=1, type=int) per_page = request.args.get('per_page', default=100, type=int) # Maximale Grenzen setzen hours = min(hours, 168) # Maximal 7 Tage per_page = min(per_page, 1000) # Maximal 1000 Einträge pro Seite with get_cached_session() as db_session: # Basis-Query cutoff_time = datetime.now() - timedelta(hours=hours) query = db_session.query(PlugStatusLog)\ .filter(PlugStatusLog.timestamp >= cutoff_time)\ .join(Printer) # Drucker-Filter if printer_id: query = query.filter(PlugStatusLog.printer_id == printer_id) # Status-Filter if status_filter: query = query.filter(PlugStatusLog.status == status_filter) # Gesamtanzahl für Paginierung total = query.count() # Sortierung und Paginierung logs = query.order_by(PlugStatusLog.timestamp.desc())\ .offset((page - 1) * per_page)\ .limit(per_page)\ .all() # Daten serialisieren log_data = [] for log in logs: log_dict = log.to_dict() # Zusätzliche berechnete Felder log_dict['timestamp_relative'] = get_relative_time(log.timestamp) log_dict['status_icon'] = get_status_icon(log.status) log_dict['status_color'] = get_status_color(log.status) log_data.append(log_dict) # Paginierungs-Metadaten has_next = (page * per_page) < total has_prev = page > 1 return jsonify({ "success": True, "logs": log_data, "pagination": { "page": page, "per_page": per_page, "total": total, "total_pages": (total + per_page - 1) // per_page, "has_next": has_next, "has_prev": has_prev }, "filters": { "printer_id": printer_id, "hours": hours, "status": status_filter }, "generated_at": datetime.now().isoformat() }) except Exception as e: admin_logger.error(f"Fehler beim Abrufen der Steckdosen-Logs: {str(e)}") return jsonify({ "success": False, "error": "Fehler beim Laden der Steckdosen-Logs", "details": str(e) if current_user.is_admin else None }), 500 @admin_api_blueprint.route('/plug-schedules/statistics', methods=['GET']) @admin_required def api_admin_plug_schedules_statistics(): """ API-Endpoint für Steckdosenschaltzeiten-Statistiken. """ try: hours = request.args.get('hours', default=24, type=int) hours = min(hours, 168) # Maximal 7 Tage # Statistiken abrufen stats = PlugStatusLog.get_status_statistics(hours=hours) # Drucker-Namen für die Top-Liste hinzufügen if stats.get('top_printers'): with get_cached_session() as db_session: printer_ids = list(stats['top_printers'].keys()) printers = db_session.query(Printer.id, Printer.name)\ .filter(Printer.id.in_(printer_ids))\ .all() printer_names = {p.id: p.name for p in printers} # Top-Drucker mit Namen anreichern top_printers_with_names = [] for printer_id, count in stats['top_printers'].items(): top_printers_with_names.append({ "printer_id": printer_id, "printer_name": printer_names.get(printer_id, f"Drucker {printer_id}"), "log_count": count }) stats['top_printers_detailed'] = top_printers_with_names return jsonify({ "success": True, "statistics": stats }) except Exception as e: admin_logger.error(f"Fehler beim Abrufen der Steckdosen-Statistiken: {str(e)}") return jsonify({ "success": False, "error": "Fehler beim Laden der Statistiken", "details": str(e) if current_user.is_admin else None }), 500 @admin_api_blueprint.route('/plug-schedules/cleanup', methods=['POST']) @admin_required def api_admin_plug_schedules_cleanup(): """ API-Endpoint zum Bereinigen alter Steckdosenschaltzeiten-Logs. """ try: data = request.get_json() or {} days = data.get('days', 30) days = max(1, min(days, 365)) # Zwischen 1 und 365 Tagen # Bereinigung durchführen deleted_count = PlugStatusLog.cleanup_old_logs(days=days) # Erfolg loggen SystemLog.log_system_event( level="INFO", message=f"Steckdosen-Logs bereinigt: {deleted_count} Einträge gelöscht (älter als {days} Tage)", module="admin_plug_schedules", user_id=current_user.id ) admin_logger.info(f"Admin {current_user.username} bereinigte {deleted_count} Steckdosen-Logs (älter als {days} Tage)") return jsonify({ "success": True, "deleted_count": deleted_count, "days": days, "message": f"Erfolgreich {deleted_count} alte Einträge gelöscht" }) except Exception as e: admin_logger.error(f"Fehler beim Bereinigen der Steckdosen-Logs: {str(e)}") return jsonify({ "success": False, "error": "Fehler beim Bereinigen der Logs", "details": str(e) if current_user.is_admin else None }), 500 @admin_api_blueprint.route('/plug-schedules/calendar', methods=['GET']) @admin_required def api_admin_plug_schedules_calendar(): """ API-Endpoint für Kalender-Daten der Steckdosenschaltzeiten. Liefert Events für FullCalendar im JSON-Format. """ try: # Parameter aus Request start_date = request.args.get('start') end_date = request.args.get('end') printer_id = request.args.get('printer_id', type=int) if not start_date or not end_date: return jsonify([]) # Leere Events bei fehlenden Daten # Datum-Strings zu datetime konvertieren start_dt = datetime.fromisoformat(start_date.replace('Z', '+00:00')) end_dt = datetime.fromisoformat(end_date.replace('Z', '+00:00')) with get_cached_session() as db_session: # Query für Logs im Zeitraum query = db_session.query(PlugStatusLog)\ .filter(PlugStatusLog.timestamp >= start_dt)\ .filter(PlugStatusLog.timestamp <= end_dt)\ .join(Printer) # Drucker-Filter if printer_id: query = query.filter(PlugStatusLog.printer_id == printer_id) # Logs abrufen und nach Drucker gruppieren logs = query.order_by(PlugStatusLog.timestamp.asc()).all() # Events für FullCalendar formatieren events = [] for log in logs: # Farbe und Titel basierend auf Status if log.status == 'on': color = '#10b981' # Grün title = f"🟢 {log.printer.name}: EIN" elif log.status == 'off': color = '#f59e0b' # Orange title = f"🔴 {log.printer.name}: AUS" elif log.status == 'connected': color = '#3b82f6' # Blau title = f"🔌 {log.printer.name}: Verbunden" elif log.status == 'disconnected': color = '#ef4444' # Rot title = f"⚠️ {log.printer.name}: Getrennt" else: color = '#6b7280' # Grau title = f"❓ {log.printer.name}: {log.status}" # Event-Objekt für FullCalendar event = { 'id': f"plug_{log.id}", 'title': title, 'start': log.timestamp.isoformat(), 'backgroundColor': color, 'borderColor': color, 'textColor': '#ffffff', 'allDay': False, 'extendedProps': { 'printer_id': log.printer_id, 'printer_name': log.printer.name, 'status': log.status, 'timestamp': log.timestamp.isoformat(), 'log_id': log.id } } events.append(event) return jsonify(events) except Exception as e: admin_logger.error(f"Fehler beim Laden der Kalender-Daten: {str(e)}") return jsonify([]) # ===== HELPER FUNCTIONS FOR PLUG SCHEDULES ===== def get_relative_time(timestamp): """Gibt eine relative Zeitangabe zurück (z.B. 'vor 2 Stunden')""" try: if not timestamp: return "Unbekannt" now = datetime.now() diff = now - timestamp if diff.days > 0: return f"vor {diff.days} Tag{'en' if diff.days > 1 else ''}" elif diff.seconds > 3600: hours = diff.seconds // 3600 return f"vor {hours} Stunde{'n' if hours > 1 else ''}" elif diff.seconds > 60: minutes = diff.seconds // 60 return f"vor {minutes} Minute{'n' if minutes > 1 else ''}" else: return "gerade eben" except Exception: return "Unbekannt" def get_status_icon(status): """Gibt ein Icon für den gegebenen Status zurück""" status_icons = { 'on': '🟢', 'off': '🔴', 'connected': '🔌', 'disconnected': '⚠️', 'unknown': '❓' } return status_icons.get(status, '❓') def get_status_color(status): """Gibt eine Farbe für den gegebenen Status zurück""" status_colors = { 'on': '#10b981', # Grün 'off': '#f59e0b', # Orange 'connected': '#3b82f6', # Blau 'disconnected': '#ef4444', # Rot 'unknown': '#6b7280' # Grau } return status_colors.get(status, '#6b7280')