"""Garde-fous sécurité pour l'interface web. Ce module centralise quatre durcissements pour rendre Picarones déployable sur un Space HuggingFace public ou un serveur d'institution sans donner les clefs du royaume au premier visiteur : 1. **Mode public** (``PICARONES_PUBLIC_MODE=1``) — désactive les pipelines OCR+LLM et les moteurs OCR cloud, dont les clefs API sont mutualisées côté serveur (OPENAI_API_KEY, ANTHROPIC_API_KEY, MISTRAL_API_KEY, etc.). Sans ce garde-fou, n'importe quel visiteur consomme le quota du mainteneur via 10 lignes de ``curl``. 2. **Browse roots restreints** — ``PICARONES_BROWSE_ROOTS`` (chemins séparés par ``:``) remplace la liste hardcodée. Par défaut, uniquement ``./uploads/`` est exposé en mode public ; en mode ``dev`` on conserve l'ancien comportement (cwd, ``/workspaces``, ``tempdir``). 3. **Validation des images uploadées** — appel à ``Image.verify()`` dans un ``try/except`` capturant ``DecompressionBombError``, ``UnidentifiedImageError`` et l'exception générique de Pillow. Limite de taille via ``PICARONES_MAX_UPLOAD_MB`` (défaut 100). 4. **Rate limiting + plafond de jobs concurrents** — limiteur en mémoire par IP (``PICARONES_RATE_LIMIT_PER_HOUR``) et sémaphore global (``PICARONES_MAX_CONCURRENT_JOBS``). Le tout est piloté par variables d'environnement pour ne pas obliger un mainteneur à patcher du code lors du passage à la prod. """ from __future__ import annotations import io import logging import os import threading import time from collections import deque from pathlib import Path from typing import Iterable logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Mode public # --------------------------------------------------------------------------- #: Identifiants de moteurs cloud dont les clefs API sont mutualisées côté #: serveur. En mode public on refuse toute requête qui les invoque. CLOUD_OCR_ENGINES: frozenset[str] = frozenset({ "mistral_ocr", "google_vision", "azure_doc_intel", }) #: Identifiants de fournisseurs LLM facturés à la clef serveur. CLOUD_LLM_PROVIDERS: frozenset[str] = frozenset({ "openai", "anthropic", "mistral", "ollama", # local mais quand même mutualisé }) def is_public_mode() -> bool: """Vrai si l'instance tourne en mode public (HuggingFace Space, etc.).""" return os.environ.get("PICARONES_PUBLIC_MODE", "").strip() in ("1", "true", "yes") def assert_engines_allowed(engines: Iterable[str]) -> None: """Lève ``PermissionError`` si la liste contient un moteur cloud bloqué. Réponse à utiliser côté FastAPI : ``HTTPException(403, str(exc))``. """ if not is_public_mode(): return banned = [e for e in engines if e in CLOUD_OCR_ENGINES] if banned: raise PermissionError( "Mode public actif (PICARONES_PUBLIC_MODE=1) — les moteurs OCR " f"cloud sont désactivés : {', '.join(banned)}. Faites tourner " "Picarones localement ou désactivez le mode public." ) def assert_llm_provider_allowed(llm_provider: str) -> None: """Lève ``PermissionError`` si un LLM mutualisé est sollicité en mode public.""" if not is_public_mode(): return if llm_provider and llm_provider.strip() in CLOUD_LLM_PROVIDERS: raise PermissionError( "Mode public actif — les pipelines OCR+LLM sont désactivés " f"(provider '{llm_provider}'). En production institutionnelle, " "exiger une clef API utilisateur via l'en-tête X-User-API-Key." ) # --------------------------------------------------------------------------- # Validation des chemins utilisateur (Sprint A14-S1, A.I.0 P0) # # Ré-importé depuis le foyer définitif ``picarones.app.services.path_security`` # (Sprint A14-S19). Pas de duplication — le code vit en un seul # endroit dans la couche app, accessible aussi par la CLI et les jobs # background. # --------------------------------------------------------------------------- from picarones.app.services.path_security import ( # noqa: F401 PathValidationError, safe_report_name, validated_path, validated_prompt_filename, ) from picarones.app.services.path_security import _is_within # noqa: F401 # --------------------------------------------------------------------------- # Browse roots # --------------------------------------------------------------------------- def compute_browse_roots(uploads_dir: Path) -> list[Path]: """Retourne la liste de répertoires autorisés pour ``/api/corpus/browse``. - Variable d'env ``PICARONES_BROWSE_ROOTS`` (séparateur ``os.pathsep``, ``:`` sur Linux/macOS, ``;`` sur Windows) : prioritaire si définie. - Sinon, mode public ⇒ uniquement ``uploads_dir``. - Sinon, mode dev (défaut) ⇒ cwd + uploads_dir + ``/workspaces`` (Codespaces) + ``tempdir`` (compatibilité ascendante). """ raw = os.environ.get("PICARONES_BROWSE_ROOTS") if raw: roots = [Path(p).resolve() for p in raw.split(os.pathsep) if p.strip()] return roots if is_public_mode(): return [uploads_dir.resolve()] import tempfile return [ Path(".").resolve(), uploads_dir.resolve(), Path("/workspaces").resolve(), Path(tempfile.gettempdir()).resolve(), ] def compute_workspace_roots(uploads_dir: Path) -> list[Path]: """Retourne les racines autorisées pour les opérations de benchmark. Sprint A14-S1 — A.I.0 P0 : utilisé par les endpoints ``/api/benchmark/start`` et ``/api/benchmark/run`` pour valider ``corpus_path`` et ``output_dir`` via :func:`validated_path`. Sémantique : - Si ``PICARONES_WORKSPACE_ROOTS`` est défini, prend précédence absolue (admin sait ce qu'il fait). - Sinon, en mode public : uniquement ``uploads_dir`` (lecture) et ``./rapports`` (écriture des rapports générés). - Sinon, mode dev : ``compute_browse_roots`` + ``./rapports`` + ``./corpus`` (corpus locaux des développeurs). En production institutionnelle, exporter ``PICARONES_WORKSPACE_ROOTS`` pour épingler explicitement les répertoires autorisés. """ raw = os.environ.get("PICARONES_WORKSPACE_ROOTS") if raw: return [Path(p).expanduser().resolve() for p in raw.split(os.pathsep) if p.strip()] base = compute_browse_roots(uploads_dir) extras = [ Path("./rapports").resolve(), Path("./corpus").resolve(), ] seen: set[Path] = set() out: list[Path] = [] for p in base + extras: if p not in seen: seen.add(p) out.append(p) return out # --------------------------------------------------------------------------- # Validation des images uploadées # --------------------------------------------------------------------------- def get_max_upload_mb() -> int: raw = os.environ.get("PICARONES_MAX_UPLOAD_MB", "100") try: return max(1, int(raw)) except ValueError: logger.warning( "[security] PICARONES_MAX_UPLOAD_MB invalide (%r) — défaut 100 Mo.", raw ) return 100 def validate_image_safe(data: bytes, filename: str = "") -> None: """Vérifie qu'un buffer décode comme une image valide sans bombe. Levée de ``ValueError`` (à mapper en HTTP 415/422) si : - taille > limite ; - Pillow rejette l'image (UnidentifiedImageError, DecompressionBombError) ; - le format ouvert ne correspond pas à ce que prétend l'extension. On ne bloque pas l'absence de Pillow (il est dépendance core), mais on log si l'import échoue pour aider au diagnostic. """ max_mb = get_max_upload_mb() if len(data) > max_mb * 1024 * 1024: raise ValueError( f"Image '{filename}' refusée : taille {len(data) / (1024 * 1024):.1f} Mo > " f"limite {max_mb} Mo (PICARONES_MAX_UPLOAD_MB)." ) try: from PIL import Image, UnidentifiedImageError except ImportError as exc: # pragma: no cover — Pillow est core logger.warning("[security] Pillow indisponible — validation image sautée : %s", exc) return try: with Image.open(io.BytesIO(data)) as im: im.verify() except UnidentifiedImageError as exc: raise ValueError( f"Image '{filename}' refusée : format non reconnu par Pillow ({exc})." ) from exc except Image.DecompressionBombError as exc: raise ValueError( f"Image '{filename}' refusée : bombe de décompression détectée ({exc})." ) from exc except Exception as exc: # Pillow lève un panel d'exceptions hétérogènes (SyntaxError sur les # GIF malformés, OSError sur les TIFF corrompus, ValueError divers). raise ValueError( f"Image '{filename}' refusée : erreur de décodage Pillow ({type(exc).__name__}: {exc})." ) from exc # --------------------------------------------------------------------------- # Rate limiting + concurrence # --------------------------------------------------------------------------- def get_max_concurrent_jobs() -> int: raw = os.environ.get("PICARONES_MAX_CONCURRENT_JOBS", "2") try: return max(1, int(raw)) except ValueError: logger.warning( "[security] PICARONES_MAX_CONCURRENT_JOBS invalide (%r) — défaut 2.", raw ) return 2 def get_rate_limit_per_hour() -> int: """Nombre maximal de jobs lancés par IP et par heure (mode public). En mode dev, on ne limite pas (retourne 0 = illimité). """ if not is_public_mode(): return 0 raw = os.environ.get("PICARONES_RATE_LIMIT_PER_HOUR", "5") try: return max(0, int(raw)) except ValueError: return 5 class RateLimiter: """Limiteur de débit en mémoire, fenêtre glissante par IP. Implémentation volontairement simple : un ``deque`` de timestamps par IP avec purge paresseuse. Suffisant pour un Space HF (RAM constante, ~1 Ko par IP active). Pour de l'institutionnel multi-replica, voir Sprint 26 (file SQLite partagée). """ def __init__(self, max_per_hour: int): self.max_per_hour = max_per_hour self._buckets: dict[str, deque[float]] = {} self._lock = threading.Lock() def check(self, ip: str) -> None: """Lève ``PermissionError`` si ``ip`` dépasse le quota horaire.""" if self.max_per_hour <= 0: return # désactivé now = time.monotonic() cutoff = now - 3600.0 with self._lock: bucket = self._buckets.setdefault(ip, deque()) while bucket and bucket[0] < cutoff: bucket.popleft() if len(bucket) >= self.max_per_hour: # Temps avant que le plus ancien hit ne sorte de la fenêtre retry_after = max(1, int(bucket[0] + 3600.0 - now)) raise PermissionError( f"Quota dépassé : {self.max_per_hour} jobs/heure max. " f"Réessayer dans {retry_after} s." ) bucket.append(now) def reset(self) -> None: """Vide complètement les buckets (utile aux tests).""" with self._lock: self._buckets.clear() # --------------------------------------------------------------------------- # CSP middleware # --------------------------------------------------------------------------- def is_huggingface_space() -> bool: """Vrai si l'instance tourne dans un HuggingFace Space. HuggingFace injecte ``SPACE_ID`` (au format ``user/space``) dans l'environnement du container — c'est le marqueur canonique documenté par HuggingFace, présent quel que soit le SDK (Docker, Streamlit, Gradio…). On l'utilise pour adapter automatiquement la CSP : un Space est servi via une ``