"""Garde-fous sécurité pour l'interface web. Ce module centralise quatre durcissements pour rendre Picarones déployable sur un Space HuggingFace public ou un serveur d'institution sans donner les clefs du royaume au premier visiteur : 1. **Mode public** (``PICARONES_PUBLIC_MODE=1``) — désactive les pipelines OCR+LLM et les moteurs OCR cloud, dont les clefs API sont mutualisées côté serveur (OPENAI_API_KEY, ANTHROPIC_API_KEY, MISTRAL_API_KEY, etc.). Sans ce garde-fou, n'importe quel visiteur consomme le quota du mainteneur via 10 lignes de ``curl``. 2. **Browse roots restreints** — ``PICARONES_BROWSE_ROOTS`` (chemins séparés par ``:``) remplace la liste hardcodée. Par défaut, uniquement ``./uploads/`` est exposé en mode public ; en mode ``dev`` on conserve l'ancien comportement (cwd, ``/workspaces``, ``tempdir``). 3. **Validation des images uploadées** — appel à ``Image.verify()`` dans un ``try/except`` capturant ``DecompressionBombError``, ``UnidentifiedImageError`` et l'exception générique de Pillow. Limite de taille via ``PICARONES_MAX_UPLOAD_MB`` (défaut 100). 4. **Rate limiting + plafond de jobs concurrents** — limiteur en mémoire par IP (``PICARONES_RATE_LIMIT_PER_HOUR``) et sémaphore global (``PICARONES_MAX_CONCURRENT_JOBS``). Le tout est piloté par variables d'environnement pour ne pas obliger un mainteneur à patcher du code lors du passage à la prod. """ from __future__ import annotations import io import logging import os import threading import time from collections import deque from pathlib import Path from typing import Iterable logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Mode public # --------------------------------------------------------------------------- #: Identifiants de moteurs cloud dont les clefs API sont mutualisées côté #: serveur. En mode public on refuse toute requête qui les invoque. CLOUD_OCR_ENGINES: frozenset[str] = frozenset({ "mistral_ocr", "google_vision", "azure_doc_intel", }) #: Identifiants de fournisseurs LLM facturés à la clef serveur. CLOUD_LLM_PROVIDERS: frozenset[str] = frozenset({ "openai", "anthropic", "mistral", "ollama", # local mais quand même mutualisé }) def is_public_mode() -> bool: """Vrai si l'instance tourne en mode public (HuggingFace Space, etc.).""" return os.environ.get("PICARONES_PUBLIC_MODE", "").strip() in ("1", "true", "yes") def assert_engines_allowed(engines: Iterable[str]) -> None: """Lève ``PermissionError`` si la liste contient un moteur cloud bloqué. Réponse à utiliser côté FastAPI : ``HTTPException(403, str(exc))``. """ if not is_public_mode(): return banned = [e for e in engines if e in CLOUD_OCR_ENGINES] if banned: raise PermissionError( "Mode public actif (PICARONES_PUBLIC_MODE=1) — les moteurs OCR " f"cloud sont désactivés : {', '.join(banned)}. Faites tourner " "Picarones localement ou désactivez le mode public." ) def assert_llm_provider_allowed(llm_provider: str) -> None: """Lève ``PermissionError`` si un LLM mutualisé est sollicité en mode public.""" if not is_public_mode(): return if llm_provider and llm_provider.strip() in CLOUD_LLM_PROVIDERS: raise PermissionError( "Mode public actif — les pipelines OCR+LLM sont désactivés " f"(provider '{llm_provider}'). En production institutionnelle, " "exiger une clef API utilisateur via l'en-tête X-User-API-Key." ) # --------------------------------------------------------------------------- # Validation des chemins utilisateur (Sprint A14-S1, A.I.0 P0) # --------------------------------------------------------------------------- class PathValidationError(ValueError): """Levée quand un chemin utilisateur sort de la zone autorisée.""" def validated_path( user_path: str, allowed_roots: list[Path], must_exist: bool = False, must_be_dir: bool = False, ) -> Path: """Résout un chemin utilisateur et vérifie qu'il reste dans une racine autorisée. Garde-fou central contre la traversée de répertoires (path traversal) et l'écriture/lecture arbitraire dans le système de fichiers du serveur. Avant ce sprint, les endpoints ``/api/benchmark/*`` acceptaient n'importe quel ``corpus_path`` ou ``output_dir`` validé uniquement par ``Path.exists()`` — ce qui permettait à un client de pousser le serveur à lire/écrire en dehors de ses propres workspaces, dans la limite des permissions du process. Algorithme : 1. Refuse les chemins vides ou contenant des octets nuls. 2. Résout le chemin de manière absolue (``Path.resolve()``) — ça écrase ``..``, les liens symboliques et les chemins relatifs. 3. Vérifie que le résultat est ``.is_relative_to(root)`` pour au moins une des ``allowed_roots`` (elles aussi pré-résolues). 4. Optionnellement : vérifie l'existence et le type (dir). Parameters ---------- user_path: Chemin tel que reçu de l'utilisateur (str). Peut être absolu ou relatif. allowed_roots: Liste de répertoires racines (``Path``) au sein desquels le chemin résolu doit se trouver. Liste vide = tout refuser. must_exist: Si ``True``, exige que le chemin résolu existe sur le disque après validation. must_be_dir: Si ``True``, exige que le chemin résolu existe ET soit un répertoire. Implique ``must_exist=True``. Returns ------- Path Chemin résolu absolu, garanti dans une des racines autorisées. Raises ------ PathValidationError Si le chemin est vide, contient un octet nul, sort des racines autorisées, ou ne satisfait pas ``must_exist`` / ``must_be_dir``. """ if not user_path or not user_path.strip(): raise PathValidationError("Chemin vide.") if "\x00" in user_path: raise PathValidationError("Chemin contient un octet nul.") if not allowed_roots: raise PathValidationError( "Aucune racine autorisée — refus de toute requête de chemin." ) try: resolved = Path(user_path).expanduser().resolve() except (OSError, RuntimeError) as exc: raise PathValidationError(f"Chemin invalide : {exc}") from exc resolved_roots = [Path(r).expanduser().resolve() for r in allowed_roots] if not any(_is_within(resolved, root) for root in resolved_roots): raise PathValidationError( f"Chemin hors zone autorisée : {user_path!r}. " f"Racines acceptées : {[str(r) for r in resolved_roots]}." ) if must_be_dir or must_exist: if not resolved.exists(): raise PathValidationError(f"Chemin inexistant : {user_path!r}.") if must_be_dir and not resolved.is_dir(): raise PathValidationError(f"Chemin n'est pas un répertoire : {user_path!r}.") return resolved def _is_within(child: Path, parent: Path) -> bool: """Vrai si ``child`` est ``parent`` ou un descendant. ``Path.is_relative_to`` n'apparaît qu'en Python 3.9 — on l'utilise via try/except pour rester explicite sur l'intention sans dépendre du comportement exact de la stdlib selon la version. """ try: child.relative_to(parent) return True except ValueError: return False def validated_prompt_filename(name: str) -> str: """Valide qu'un ``prompt_file`` web est un simple nom de fichier sûr. Sprint A14-S1 — A.I.0 P0 : le pipeline OCR+LLM lit un prompt depuis le disque via ``picarones.pipelines.base._load_prompt``, qui acceptait n'importe quel chemin absolu existant. En contexte web, ça permettait à un utilisateur d'API de pousser le serveur à lire un fichier arbitraire (``/etc/passwd``, ``.env``, etc.) puis à l'envoyer comme prompt à un LLM externe — vecteur classique d'exfiltration via tokens. Cette fonction restreint la valeur reçue à un simple nom de fichier de la **bibliothèque de prompts intégrée** (``picarones/prompts/``). Pas de ``/``, pas de ``\\``, pas de ``..``, pas d'absolu. Le caller (web layer) est responsable d'appeler cette fonction AVANT de transmettre la valeur au pipeline. Returns ------- str Nom de fichier validé (basename uniquement). Raises ------ PathValidationError Si la valeur contient un séparateur de chemin, un caractère de contrôle, ou ressemble à un chemin absolu/relatif suspect. """ if not name: raise PathValidationError("Nom de prompt vide.") if "\x00" in name: raise PathValidationError("Nom de prompt contient un octet nul.") if any(c in name for c in ("/", "\\")): raise PathValidationError( f"Nom de prompt invalide (séparateur de chemin) : {name!r}. " "Le web n'accepte que les prompts de la bibliothèque intégrée " "(``picarones/prompts/``) — fournir le simple nom de fichier." ) if name.startswith(".") or ".." in name: raise PathValidationError( f"Nom de prompt suspect : {name!r}. " "Refus des préfixes ``.`` et des séquences ``..``." ) if any(ord(c) < 0x20 for c in name): raise PathValidationError("Nom de prompt contient un caractère de contrôle.") return name def safe_report_name(name: str, max_length: int = 128) -> str: """Sanitize un nom de rapport utilisateur en composant de chemin sûr. Refuse les séparateurs de chemin (``/``, ``\\``), les caractères de contrôle, les octets nuls. Tronque à ``max_length``. Si la chaîne devient vide après nettoyage, lève ``PathValidationError``. Cette fonction NE produit PAS un chemin — elle produit un nom qu'un caller peut concaténer à un répertoire qu'il a déjà validé avec ``validated_path``. """ if not name: raise PathValidationError("Nom de rapport vide.") if "\x00" in name: raise PathValidationError("Nom de rapport contient un octet nul.") # Refus explicite de tout séparateur de chemin et de caractères de contrôle. bad = set("/\\") cleaned = "".join( c for c in name if c not in bad and ord(c) >= 0x20 ) cleaned = cleaned.strip().strip(".") # pas de "." en début/fin (caché Unix, extension forçée) if not cleaned: raise PathValidationError(f"Nom de rapport invalide après nettoyage : {name!r}.") if cleaned in (".", "..", ""): raise PathValidationError(f"Nom de rapport réservé : {name!r}.") return cleaned[:max_length] # --------------------------------------------------------------------------- # Browse roots # --------------------------------------------------------------------------- def compute_browse_roots(uploads_dir: Path) -> list[Path]: """Retourne la liste de répertoires autorisés pour ``/api/corpus/browse``. - Variable d'env ``PICARONES_BROWSE_ROOTS`` (séparateur ``os.pathsep``, ``:`` sur Linux/macOS, ``;`` sur Windows) : prioritaire si définie. - Sinon, mode public ⇒ uniquement ``uploads_dir``. - Sinon, mode dev (défaut) ⇒ cwd + uploads_dir + ``/workspaces`` (Codespaces) + ``tempdir`` (compatibilité ascendante). """ raw = os.environ.get("PICARONES_BROWSE_ROOTS") if raw: roots = [Path(p).resolve() for p in raw.split(os.pathsep) if p.strip()] return roots if is_public_mode(): return [uploads_dir.resolve()] import tempfile return [ Path(".").resolve(), uploads_dir.resolve(), Path("/workspaces").resolve(), Path(tempfile.gettempdir()).resolve(), ] def compute_workspace_roots(uploads_dir: Path) -> list[Path]: """Retourne les racines autorisées pour les opérations de benchmark. Sprint A14-S1 — A.I.0 P0 : utilisé par les endpoints ``/api/benchmark/start`` et ``/api/benchmark/run`` pour valider ``corpus_path`` et ``output_dir`` via :func:`validated_path`. Sémantique : - Si ``PICARONES_WORKSPACE_ROOTS`` est défini, prend précédence absolue (admin sait ce qu'il fait). - Sinon, en mode public : uniquement ``uploads_dir`` (lecture) et ``./rapports`` (écriture des rapports générés). - Sinon, mode dev : ``compute_browse_roots`` + ``./rapports`` + ``./corpus`` (corpus locaux des développeurs). En production institutionnelle, exporter ``PICARONES_WORKSPACE_ROOTS`` pour épingler explicitement les répertoires autorisés. """ raw = os.environ.get("PICARONES_WORKSPACE_ROOTS") if raw: return [Path(p).expanduser().resolve() for p in raw.split(os.pathsep) if p.strip()] base = compute_browse_roots(uploads_dir) extras = [ Path("./rapports").resolve(), Path("./corpus").resolve(), ] seen: set[Path] = set() out: list[Path] = [] for p in base + extras: if p not in seen: seen.add(p) out.append(p) return out # --------------------------------------------------------------------------- # Validation des images uploadées # --------------------------------------------------------------------------- def get_max_upload_mb() -> int: raw = os.environ.get("PICARONES_MAX_UPLOAD_MB", "100") try: return max(1, int(raw)) except ValueError: logger.warning( "[security] PICARONES_MAX_UPLOAD_MB invalide (%r) — défaut 100 Mo.", raw ) return 100 def validate_image_safe(data: bytes, filename: str = "") -> None: """Vérifie qu'un buffer décode comme une image valide sans bombe. Levée de ``ValueError`` (à mapper en HTTP 415/422) si : - taille > limite ; - Pillow rejette l'image (UnidentifiedImageError, DecompressionBombError) ; - le format ouvert ne correspond pas à ce que prétend l'extension. On ne bloque pas l'absence de Pillow (il est dépendance core), mais on log si l'import échoue pour aider au diagnostic. """ max_mb = get_max_upload_mb() if len(data) > max_mb * 1024 * 1024: raise ValueError( f"Image '{filename}' refusée : taille {len(data) / (1024 * 1024):.1f} Mo > " f"limite {max_mb} Mo (PICARONES_MAX_UPLOAD_MB)." ) try: from PIL import Image, UnidentifiedImageError except ImportError as exc: # pragma: no cover — Pillow est core logger.warning("[security] Pillow indisponible — validation image sautée : %s", exc) return try: with Image.open(io.BytesIO(data)) as im: im.verify() except UnidentifiedImageError as exc: raise ValueError( f"Image '{filename}' refusée : format non reconnu par Pillow ({exc})." ) from exc except Image.DecompressionBombError as exc: raise ValueError( f"Image '{filename}' refusée : bombe de décompression détectée ({exc})." ) from exc except Exception as exc: # Pillow lève un panel d'exceptions hétérogènes (SyntaxError sur les # GIF malformés, OSError sur les TIFF corrompus, ValueError divers). raise ValueError( f"Image '{filename}' refusée : erreur de décodage Pillow ({type(exc).__name__}: {exc})." ) from exc # --------------------------------------------------------------------------- # Rate limiting + concurrence # --------------------------------------------------------------------------- def get_max_concurrent_jobs() -> int: raw = os.environ.get("PICARONES_MAX_CONCURRENT_JOBS", "2") try: return max(1, int(raw)) except ValueError: logger.warning( "[security] PICARONES_MAX_CONCURRENT_JOBS invalide (%r) — défaut 2.", raw ) return 2 def get_rate_limit_per_hour() -> int: """Nombre maximal de jobs lancés par IP et par heure (mode public). En mode dev, on ne limite pas (retourne 0 = illimité). """ if not is_public_mode(): return 0 raw = os.environ.get("PICARONES_RATE_LIMIT_PER_HOUR", "5") try: return max(0, int(raw)) except ValueError: return 5 class RateLimiter: """Limiteur de débit en mémoire, fenêtre glissante par IP. Implémentation volontairement simple : un ``deque`` de timestamps par IP avec purge paresseuse. Suffisant pour un Space HF (RAM constante, ~1 Ko par IP active). Pour de l'institutionnel multi-replica, voir Sprint 26 (file SQLite partagée). """ def __init__(self, max_per_hour: int): self.max_per_hour = max_per_hour self._buckets: dict[str, deque[float]] = {} self._lock = threading.Lock() def check(self, ip: str) -> None: """Lève ``PermissionError`` si ``ip`` dépasse le quota horaire.""" if self.max_per_hour <= 0: return # désactivé now = time.monotonic() cutoff = now - 3600.0 with self._lock: bucket = self._buckets.setdefault(ip, deque()) while bucket and bucket[0] < cutoff: bucket.popleft() if len(bucket) >= self.max_per_hour: # Temps avant que le plus ancien hit ne sorte de la fenêtre retry_after = max(1, int(bucket[0] + 3600.0 - now)) raise PermissionError( f"Quota dépassé : {self.max_per_hour} jobs/heure max. " f"Réessayer dans {retry_after} s." ) bucket.append(now) def reset(self) -> None: """Vide complètement les buckets (utile aux tests).""" with self._lock: self._buckets.clear() # --------------------------------------------------------------------------- # CSP middleware # --------------------------------------------------------------------------- def is_huggingface_space() -> bool: """Vrai si l'instance tourne dans un HuggingFace Space. HuggingFace injecte ``SPACE_ID`` (au format ``user/space``) dans l'environnement du container — c'est le marqueur canonique documenté par HuggingFace, présent quel que soit le SDK (Docker, Streamlit, Gradio…). On l'utilise pour adapter automatiquement la CSP : un Space est servi via une ``