""" Erweiterte Formular-Validierung für das MYP-System ================================================== Dieses Modul stellt umfassende Client- und serverseitige Validierung mit benutzerfreundlichem UI-Feedback bereit. Funktionen: - Multi-Level-Validierung (Client/Server) - Echtzeitvalidierung mit JavaScript - Barrierefreie Fehlermeldungen - Custom Validators für spezielle Anforderungen - Automatische Sanitization von Eingaben """ import re import html import json import logging from typing import Dict, List, Any, Optional, Callable, Union from datetime import datetime, timedelta from flask import request, jsonify, session from functools import wraps from werkzeug.datastructures import FileStorage from utils.logging_config import get_logger from config.settings import ALLOWED_EXTENSIONS, MAX_FILE_SIZE logger = get_logger("validation") class ValidationError(Exception): """Custom Exception für Validierungsfehler""" def __init__(self, message: str, field: str = None, code: str = None): self.message = message self.field = field self.code = code super().__init__(self.message) class ValidationResult: """Ergebnis einer Validierung""" def __init__(self): self.is_valid = True self.errors: Dict[str, List[str]] = {} self.warnings: Dict[str, List[str]] = {} self.cleaned_data: Dict[str, Any] = {} def add_error(self, field: str, message: str): """Fügt einen Validierungsfehler hinzu""" if field not in self.errors: self.errors[field] = [] self.errors[field].append(message) self.is_valid = False def add_warning(self, field: str, message: str): """Fügt eine Warnung hinzu""" if field not in self.warnings: self.warnings[field] = [] self.warnings[field].append(message) def to_dict(self) -> Dict[str, Any]: """Konvertiert das Ergebnis zu einem Dictionary""" return { "is_valid": self.is_valid, "errors": self.errors, "warnings": self.warnings, "cleaned_data": self.cleaned_data } class BaseValidator: """Basis-Klasse für alle Validatoren""" def __init__(self, required: bool = False, allow_empty: bool = True): self.required = required self.allow_empty = allow_empty def validate(self, value: Any, field_name: str = None) -> ValidationResult: """Führt die Validierung durch""" result = ValidationResult() # Prüfung auf erforderliche Felder if self.required and (value is None or value == ""): result.add_error(field_name or "field", "Dieses Feld ist erforderlich.") return result # Wenn Wert leer und erlaubt, keine weitere Validierung if not value and self.allow_empty: result.cleaned_data[field_name or "field"] = value return result return self._validate_value(value, field_name, result) def _validate_value(self, value: Any, field_name: str, result: ValidationResult) -> ValidationResult: """Überschreibbar für spezifische Validierungslogik""" result.cleaned_data[field_name or "field"] = value return result class StringValidator(BaseValidator): """Validator für String-Werte""" def __init__(self, min_length: int = None, max_length: int = None, pattern: str = None, trim: bool = True, **kwargs): super().__init__(**kwargs) self.min_length = min_length self.max_length = max_length self.pattern = re.compile(pattern) if pattern else None self.trim = trim def _validate_value(self, value: Any, field_name: str, result: ValidationResult) -> ValidationResult: # String konvertieren und trimmen str_value = str(value) if self.trim: str_value = str_value.strip() # Längenprüfung if self.min_length is not None and len(str_value) < self.min_length: result.add_error(field_name, f"Mindestlänge: {self.min_length} Zeichen") if self.max_length is not None and len(str_value) > self.max_length: result.add_error(field_name, f"Maximallänge: {self.max_length} Zeichen") # Pattern-Prüfung if self.pattern and not self.pattern.match(str_value): result.add_error(field_name, "Format ist ungültig") # HTML-Sanitization cleaned_value = html.escape(str_value) result.cleaned_data[field_name] = cleaned_value return result class EmailValidator(StringValidator): """Validator für E-Mail-Adressen""" EMAIL_PATTERN = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' def __init__(self, **kwargs): super().__init__(pattern=self.EMAIL_PATTERN, **kwargs) def _validate_value(self, value: Any, field_name: str, result: ValidationResult) -> ValidationResult: result = super()._validate_value(value, field_name, result) if result.is_valid: # Normalisierung der E-Mail email = str(value).lower().strip() result.cleaned_data[field_name] = email return result class IntegerValidator(BaseValidator): """Validator für Integer-Werte""" def __init__(self, min_value: int = None, max_value: int = None, **kwargs): super().__init__(**kwargs) self.min_value = min_value self.max_value = max_value def _validate_value(self, value: Any, field_name: str, result: ValidationResult) -> ValidationResult: try: int_value = int(value) except (ValueError, TypeError): result.add_error(field_name, "Muss eine ganze Zahl sein") return result if self.min_value is not None and int_value < self.min_value: result.add_error(field_name, f"Mindestwert: {self.min_value}") if self.max_value is not None and int_value > self.max_value: result.add_error(field_name, f"Maximalwert: {self.max_value}") result.cleaned_data[field_name] = int_value return result class FloatValidator(BaseValidator): """Validator für Float-Werte""" def __init__(self, min_value: float = None, max_value: float = None, decimal_places: int = None, **kwargs): super().__init__(**kwargs) self.min_value = min_value self.max_value = max_value self.decimal_places = decimal_places def _validate_value(self, value: Any, field_name: str, result: ValidationResult) -> ValidationResult: try: float_value = float(value) except (ValueError, TypeError): result.add_error(field_name, "Muss eine Dezimalzahl sein") return result if self.min_value is not None and float_value < self.min_value: result.add_error(field_name, f"Mindestwert: {self.min_value}") if self.max_value is not None and float_value > self.max_value: result.add_error(field_name, f"Maximalwert: {self.max_value}") # Rundung auf bestimmte Dezimalstellen if self.decimal_places is not None: float_value = round(float_value, self.decimal_places) result.cleaned_data[field_name] = float_value return result class DateTimeValidator(BaseValidator): """Validator für DateTime-Werte""" def __init__(self, format_string: str = "%Y-%m-%d %H:%M", min_date: datetime = None, max_date: datetime = None, **kwargs): super().__init__(**kwargs) self.format_string = format_string self.min_date = min_date self.max_date = max_date def _validate_value(self, value: Any, field_name: str, result: ValidationResult) -> ValidationResult: if isinstance(value, datetime): dt_value = value else: try: dt_value = datetime.strptime(str(value), self.format_string) except ValueError: result.add_error(field_name, f"Ungültiges Datumsformat. Erwartet: {self.format_string}") return result if self.min_date and dt_value < self.min_date: result.add_error(field_name, f"Datum muss nach {self.min_date.strftime('%d.%m.%Y')} liegen") if self.max_date and dt_value > self.max_date: result.add_error(field_name, f"Datum muss vor {self.max_date.strftime('%d.%m.%Y')} liegen") result.cleaned_data[field_name] = dt_value return result class FileValidator(BaseValidator): """Validator für Datei-Uploads""" def __init__(self, allowed_extensions: List[str] = None, max_size_mb: int = None, min_size_kb: int = None, **kwargs): super().__init__(**kwargs) self.allowed_extensions = allowed_extensions or ALLOWED_EXTENSIONS self.max_size_mb = max_size_mb or (MAX_FILE_SIZE / (1024 * 1024)) self.min_size_kb = min_size_kb def _validate_value(self, value: Any, field_name: str, result: ValidationResult) -> ValidationResult: if not isinstance(value, FileStorage): result.add_error(field_name, "Muss eine gültige Datei sein") return result # Dateiname prüfen if not value.filename: result.add_error(field_name, "Dateiname ist erforderlich") return result # Dateierweiterung prüfen extension = value.filename.rsplit('.', 1)[-1].lower() if '.' in value.filename else '' if extension not in self.allowed_extensions: result.add_error(field_name, f"Nur folgende Dateiformate sind erlaubt: {', '.join(self.allowed_extensions)}") # Dateigröße prüfen value.seek(0, 2) # Zum Ende der Datei file_size = value.tell() value.seek(0) # Zurück zum Anfang if self.max_size_mb and file_size > (self.max_size_mb * 1024 * 1024): result.add_error(field_name, f"Datei zu groß. Maximum: {self.max_size_mb} MB") if self.min_size_kb and file_size < (self.min_size_kb * 1024): result.add_error(field_name, f"Datei zu klein. Minimum: {self.min_size_kb} KB") result.cleaned_data[field_name] = value return result class FormValidator: """Haupt-Formular-Validator""" def __init__(self): self.fields: Dict[str, BaseValidator] = {} self.custom_validators: List[Callable] = [] self.rate_limit_key = None self.csrf_check = True def add_field(self, name: str, validator: BaseValidator): """Fügt ein Feld mit Validator hinzu""" self.fields[name] = validator return self def add_custom_validator(self, validator_func: Callable): """Fügt einen benutzerdefinierten Validator hinzu""" self.custom_validators.append(validator_func) return self def set_rate_limit(self, key: str): """Setzt einen Rate-Limiting-Schlüssel""" self.rate_limit_key = key return self def disable_csrf(self): """Deaktiviert CSRF-Prüfung für dieses Formular""" self.csrf_check = False return self def validate(self, data: Dict[str, Any]) -> ValidationResult: """Validiert die gesamten Formulardaten""" result = ValidationResult() # Einzelfeldvalidierung for field_name, validator in self.fields.items(): field_value = data.get(field_name) field_result = validator.validate(field_value, field_name) if not field_result.is_valid: result.errors.update(field_result.errors) result.is_valid = False result.warnings.update(field_result.warnings) result.cleaned_data.update(field_result.cleaned_data) # Benutzerdefinierte Validierung if result.is_valid: for custom_validator in self.custom_validators: try: custom_result = custom_validator(result.cleaned_data) if isinstance(custom_result, ValidationResult): if not custom_result.is_valid: result.errors.update(custom_result.errors) result.is_valid = False result.warnings.update(custom_result.warnings) except Exception as e: logger.error(f"Fehler bei benutzerdefinierter Validierung: {str(e)}") result.add_error("form", "Unerwarteter Validierungsfehler") return result # Vordefinierte Formular-Validatoren def get_user_registration_validator() -> FormValidator: """Validator für Benutzerregistrierung""" return FormValidator() \ .add_field("username", StringValidator(min_length=3, max_length=50, required=True)) \ .add_field("email", EmailValidator(required=True)) \ .add_field("password", StringValidator(min_length=8, required=True)) \ .add_field("password_confirm", StringValidator(min_length=8, required=True)) \ .add_field("name", StringValidator(min_length=2, max_length=100, required=True)) \ .add_custom_validator(lambda data: _validate_password_match(data)) def get_job_creation_validator() -> FormValidator: """Validator für Job-Erstellung""" return FormValidator() \ .add_field("name", StringValidator(min_length=1, max_length=200, required=True)) \ .add_field("description", StringValidator(max_length=500)) \ .add_field("printer_id", IntegerValidator(min_value=1, required=True)) \ .add_field("duration_minutes", IntegerValidator(min_value=1, max_value=1440, required=True)) \ .add_field("start_at", DateTimeValidator(min_date=datetime.now())) \ .add_field("file", FileValidator(required=True)) def get_printer_creation_validator() -> FormValidator: """Validator für Drucker-Erstellung""" return FormValidator() \ .add_field("name", StringValidator(min_length=1, max_length=100, required=True)) \ .add_field("model", StringValidator(max_length=100)) \ .add_field("location", StringValidator(max_length=100)) \ .add_field("ip_address", StringValidator(pattern=r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$')) \ .add_field("mac_address", StringValidator(pattern=r'^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$', required=True)) \ .add_field("plug_ip", StringValidator(pattern=r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', required=True)) \ .add_field("plug_username", StringValidator(min_length=1, required=True)) \ .add_field("plug_password", StringValidator(min_length=1, required=True)) def get_guest_request_validator() -> FormValidator: """Validator für Gastanfragen""" return FormValidator() \ .add_field("name", StringValidator(min_length=2, max_length=100, required=True)) \ .add_field("email", EmailValidator()) \ .add_field("reason", StringValidator(min_length=10, max_length=500, required=True)) \ .add_field("duration_minutes", IntegerValidator(min_value=5, max_value=480, required=True)) \ .add_field("copies", IntegerValidator(min_value=1, max_value=10)) \ .add_field("file", FileValidator(required=True)) \ .set_rate_limit("guest_request") def _validate_password_match(data: Dict[str, Any]) -> ValidationResult: """Validiert, ob Passwörter übereinstimmen""" result = ValidationResult() password = data.get("password") password_confirm = data.get("password_confirm") if password != password_confirm: result.add_error("password_confirm", "Passwörter stimmen nicht überein") return result # Decorator für automatische Formularvalidierung def validate_form(validator_func: Callable[[], FormValidator]): """Decorator für automatische Formularvalidierung""" def decorator(f): @wraps(f) def decorated_function(*args, **kwargs): try: # Validator erstellen validator = validator_func() # Daten aus Request extrahieren if request.is_json: data = request.get_json() or {} else: data = dict(request.form) # Dateien hinzufügen for key, file in request.files.items(): data[key] = file # Validierung durchführen validation_result = validator.validate(data) # Bei Fehlern JSON-Response zurückgeben if not validation_result.is_valid: logger.warning(f"Validierungsfehler für {request.endpoint}: {validation_result.errors}") return jsonify({ "success": False, "errors": validation_result.errors, "warnings": validation_result.warnings }), 400 # Gereinigte Daten an die Request anhängen request.validated_data = validation_result.cleaned_data request.validation_warnings = validation_result.warnings return f(*args, **kwargs) except Exception as e: logger.error(f"Fehler bei Formularvalidierung: {str(e)}") return jsonify({ "success": False, "errors": {"form": ["Unerwarteter Validierungsfehler"]} }), 500 return decorated_function return decorator # JavaScript für Client-seitige Validierung def get_client_validation_js() -> str: """Generiert JavaScript für Client-seitige Validierung""" return """ class FormValidator { constructor(formId, validationRules = {}) { this.form = document.getElementById(formId); this.rules = validationRules; this.errors = {}; this.setupEventListeners(); } setupEventListeners() { if (!this.form) return; // Echtzeit-Validierung bei Eingabe this.form.addEventListener('input', (e) => { this.validateField(e.target); }); // Formular-Submission this.form.addEventListener('submit', (e) => { if (!this.validateForm()) { e.preventDefault(); } }); } validateField(field) { const fieldName = field.name; const value = field.value; const rule = this.rules[fieldName]; if (!rule) return true; this.clearFieldError(field); // Required-Prüfung if (rule.required && (!value || value.trim() === '')) { this.addFieldError(field, 'Dieses Feld ist erforderlich.'); return false; } // Längenprüfung if (rule.minLength && value.length < rule.minLength) { this.addFieldError(field, `Mindestlänge: ${rule.minLength} Zeichen`); return false; } if (rule.maxLength && value.length > rule.maxLength) { this.addFieldError(field, `Maximallänge: ${rule.maxLength} Zeichen`); return false; } // Pattern-Prüfung if (rule.pattern && !new RegExp(rule.pattern).test(value)) { this.addFieldError(field, rule.patternMessage || 'Format ist ungültig'); return false; } // Email-Prüfung if (rule.type === 'email' && value) { const emailPattern = /^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$/; if (!emailPattern.test(value)) { this.addFieldError(field, 'Bitte geben Sie eine gültige E-Mail-Adresse ein'); return false; } } // Custom Validierung if (rule.customValidator) { const customResult = rule.customValidator(value, field); if (customResult !== true) { this.addFieldError(field, customResult); return false; } } return true; } validateForm() { let isValid = true; this.errors = {}; // Alle Felder validieren const fields = this.form.querySelectorAll('input, textarea, select'); fields.forEach(field => { if (!this.validateField(field)) { isValid = false; } }); // Custom Form-Validierung if (this.rules._formValidator) { const formData = new FormData(this.form); const customResult = this.rules._formValidator(formData, this.form); if (customResult !== true) { this.addFormError(customResult); isValid = false; } } return isValid; } addFieldError(field, message) { const fieldName = field.name; // Error-Container finden oder erstellen let errorContainer = field.parentNode.querySelector('.field-error'); if (!errorContainer) { errorContainer = document.createElement('div'); errorContainer.className = 'field-error text-red-600 text-sm mt-1'; errorContainer.setAttribute('role', 'alert'); errorContainer.setAttribute('aria-live', 'polite'); field.parentNode.appendChild(errorContainer); } errorContainer.textContent = message; field.classList.add('border-red-500'); field.setAttribute('aria-invalid', 'true'); // Für Screen Reader if (!field.getAttribute('aria-describedby')) { const errorId = `error-${fieldName}-${Date.now()}`; errorContainer.id = errorId; field.setAttribute('aria-describedby', errorId); } this.errors[fieldName] = message; } clearFieldError(field) { const errorContainer = field.parentNode.querySelector('.field-error'); if (errorContainer) { errorContainer.remove(); } field.classList.remove('border-red-500'); field.removeAttribute('aria-invalid'); field.removeAttribute('aria-describedby'); delete this.errors[field.name]; } addFormError(message) { let formErrorContainer = this.form.querySelector('.form-error'); if (!formErrorContainer) { formErrorContainer = document.createElement('div'); formErrorContainer.className = 'form-error bg-red-100 border border-red-400 text-red-700 px-4 py-3 rounded mb-4'; formErrorContainer.setAttribute('role', 'alert'); this.form.insertBefore(formErrorContainer, this.form.firstChild); } formErrorContainer.textContent = message; } clearFormErrors() { const formErrorContainer = this.form.querySelector('.form-error'); if (formErrorContainer) { formErrorContainer.remove(); } } showServerErrors(errors) { // Server-Fehler anzeigen for (const [fieldName, messages] of Object.entries(errors)) { const field = this.form.querySelector(`[name="${fieldName}"]`); if (field && messages.length > 0) { this.addFieldError(field, messages[0]); } } } } // Utility-Funktionen window.FormValidationUtils = { // Passwort-Stärke prüfen validatePasswordStrength: (password) => { if (password.length < 8) return 'Passwort muss mindestens 8 Zeichen lang sein'; if (!/[A-Z]/.test(password)) return 'Passwort muss mindestens einen Großbuchstaben enthalten'; if (!/[a-z]/.test(password)) return 'Passwort muss mindestens einen Kleinbuchstaben enthalten'; if (!/[0-9]/.test(password)) return 'Passwort muss mindestens eine Zahl enthalten'; return true; }, // Passwort-Bestätigung prüfen validatePasswordConfirm: (password, confirm) => { return password === confirm ? true : 'Passwörter stimmen nicht überein'; }, // Datei-Validierung validateFile: (file, allowedTypes = [], maxSizeMB = 10) => { if (!file) return 'Bitte wählen Sie eine Datei aus'; const fileType = file.name.split('.').pop().toLowerCase(); if (allowedTypes.length > 0 && !allowedTypes.includes(fileType)) { return `Nur folgende Dateiformate sind erlaubt: ${allowedTypes.join(', ')}`; } if (file.size > maxSizeMB * 1024 * 1024) { return `Datei ist zu groß. Maximum: ${maxSizeMB} MB`; } return true; } }; """ def render_validation_errors(errors: Dict[str, List[str]]) -> str: """Rendert Validierungsfehler als HTML""" if not errors: return "" html_parts = ['