"""Banc d'essai de pipelines composées — Sprint 63 (axe B). Sprint 63 — Étape 4 / axe B du plan d'évolution 2026 : démarrage du banc d'essai de pipelines. Philosophie ----------- Picarones est un **banc d'essai**, pas un atelier de production. Cette infrastructure permet d'**évaluer des pipelines composées de modules tiers** que l'utilisateur amène — par exemple : - ``[OCR(image→texte)] → [reconstructeur ALTO tiers(texte→ALTO)]`` - ``[VLM(image→ALTO)] → [post-processing tiers(ALTO→ALTO)]`` - ``[OCR(image→texte)] → [LLM correcteur(texte→texte)]`` Picarones **ne fournit aucun module métier** (pas de reconstructeur ALTO, pas de correcteur, pas de re-segmenteur). L'utilisateur branche ses propres ``BaseModule`` (Sprint 33), le runner orchestre l'exécution séquentielle, valide les types aux jonctions et **évalue automatiquement** chaque artefact produit contre la GT du même niveau (Sprint 32) en sélectionnant les métriques pertinentes du registre typé (Sprint 34). Périmètre Sprint 63 ------------------- Inclus : - Spécification déclarative d'une pipeline séquentielle. - Exécution sur un seul document avec passage typé d'artefacts. - Validation des types aux jonctions inter-modules. - Évaluation automatique aux jonctions GT-vs-sortie pour chaque niveau de GT disponible sur le document. - Mesure du temps par étape. - Capture gracieuse des erreurs (un module qui lève n'arrête pas les étapes suivantes — leur entrée manquante est rapportée comme erreur explicite). Reporté à des sprints dédiés : - DAG branchant non séquentiel (1 → {2, 3} → 4) — Sprint 64+. - Orchestration corpus-wide + agrégation par pipeline — Sprint 65+. - Vue HTML dédiée aux pipelines composées — Sprint 66+. - Cache d'artefacts intermédiaires — non prévu. - Parallélisation inter-étapes — non prévue (les modules ``execution_mode`` sont déjà respectés par le runner historique pour le bench OCR mono-étage). """ from __future__ import annotations import logging import time from dataclasses import dataclass, field from typing import Any, Optional from picarones.core.corpus import Document, GTLevel from picarones.core.metric_registry import compute_at_junction from picarones.core.modules import ArtifactType, BaseModule # Eager-load des modules qui enregistrent des métriques dans le # registre typé (Sprint 34) — sans ces imports, ``compute_at_junction`` # trouverait un registre vide et ne calculerait rien aux jonctions. # Sprint 34 : cer / wer / mer / wil + stub TEXT→ALTO import picarones.core.builtin_metrics # noqa: F401 # Sprints 55-60 : métriques philologiques. import picarones.core.unicode_blocks # noqa: F401 import picarones.core.abbreviations # noqa: F401 import picarones.core.mufi # noqa: F401 import picarones.core.early_modern_typography # noqa: F401 import picarones.core.modern_archives # noqa: F401 import picarones.core.roman_numerals # noqa: F401 # Sprint 53 : reading order F1. Sprints 38, 52 : NER, readability. import picarones.core.reading_order # noqa: F401 import picarones.core.readability # noqa: F401 import picarones.core.ner # noqa: F401 # Chantier 1 (post-Sprint 97) : métriques (ALTO, ALTO) pour évaluer # les reconstructeurs ALTO contre une GT ALTO du document. import picarones.core.alto_metrics # noqa: F401 logger = logging.getLogger(__name__) # ────────────────────────────────────────────────────────────────────────── # Conversion ArtifactType <-> GTLevel # ────────────────────────────────────────────────────────────────────────── def _artifact_type_to_gt_level(at: ArtifactType) -> Optional[GTLevel]: """Retourne le ``GTLevel`` correspondant à un ``ArtifactType``. ``IMAGE`` n'a pas de correspondance GT (on n'évalue pas une image en sortie d'un module — c'est typiquement une entrée). """ if at == ArtifactType.IMAGE: return None try: return GTLevel(at.value) except ValueError: return None # ────────────────────────────────────────────────────────────────────────── # PipelineStep + PipelineSpec # ────────────────────────────────────────────────────────────────────────── @dataclass class PipelineStep: """Une étape dans une pipeline composée. L'étape porte un nom lisible (utile pour le rapport et le diagnostic) et une instance de ``BaseModule`` fournie par l'utilisateur. Les types d'entrée et de sortie ne sont pas redéclarés ici : ils sont lus depuis le module lui-même (``module.input_types`` / ``module.output_types``). Sprint 66 — DAG branchant ------------------------- ``inputs_from`` permet de désigner explicitement, pour chaque type d'entrée, l'étape source dont on veut consommer l'artefact. Utile quand plusieurs étapes antérieures produisent le même type et qu'on veut éviter l'écrasement implicite (par exemple deux correcteurs LLM en parallèle qui partent du même OCR). - ``inputs_from = {}`` (défaut) : pour chaque type d'entrée, le runner prend la version **la plus récente** disponible dans le bag (comportement Sprint 63, rétrocompat stricte). - ``inputs_from = {ArtifactType.TEXT: "ocr"}`` : exige la version du ``TEXT`` produite par l'étape nommée ``"ocr"``. Si cette étape n'existe pas ou n'a pas produit ce type, ``PipelineSpec.validate`` remonte un problème explicite et le runner remonte une erreur d'entrée manquante. La chaîne spéciale ``"__initial__"`` désigne les artefacts fournis dans ``initial_inputs`` (par exemple ``IMAGE``). """ name: str module: BaseModule inputs_from: dict[ArtifactType, str] = field(default_factory=dict) @property def input_types(self) -> tuple[ArtifactType, ...]: return tuple(self.module.input_types) @property def output_types(self) -> tuple[ArtifactType, ...]: return tuple(self.module.output_types) def __repr__(self) -> str: ins = ",".join(t.value for t in self.input_types) or "·" outs = ",".join(t.value for t in self.output_types) or "·" if self.inputs_from: refs = ",".join( f"{t.value}@{src}" for t, src in self.inputs_from.items() ) return f"PipelineStep({self.name}: [{refs}] → {outs})" return f"PipelineStep({self.name}: {ins} → {outs})" @dataclass class PipelineSpec: """DAG séquentiel de ``PipelineStep``. Sprint 63 — séquentiel uniquement : l'étape ``i+1`` consomme les artefacts produits par l'étape ``i`` (et tous les artefacts initiaux fournis au runner, par exemple l'image source). Le DAG branchant arrive dans un sprint dédié. """ name: str steps: list[PipelineStep] = field(default_factory=list) def validate(self, initial_inputs: tuple[ArtifactType, ...]) -> list[str]: """Vérifie que les types s'enchaînent et retourne la liste des problèmes détectés (vide si la pipeline est valide). Une pipeline est valide si, pour chaque étape, tous les ``input_types`` sont disponibles : soit dans les ``initial_inputs`` (typiquement ``IMAGE``), soit produits par une étape antérieure. Sprint 66 — validation des références ``inputs_from`` : si une étape déclare ``inputs_from[type] = "foo"``, l'étape ``foo`` doit exister parmi les étapes antérieures et avoir ce type dans ses ``output_types``. La chaîne spéciale ``"__initial__"`` désigne les entrées initiales. """ problems: list[str] = [] if not self.steps: problems.append("pipeline vide : au moins une étape est requise") return problems # Map type → set des steps qui ont produit ce type # ("__initial__" pour les entrées initiales) — utilisé pour # valider les références ``inputs_from``. producers: dict[ArtifactType, set[str]] = { t: {"__initial__"} for t in initial_inputs } # Map step_name → set des types produits, pour la validation # des références. step_outputs: dict[str, set[ArtifactType]] = { "__initial__": set(initial_inputs), } # Set des types disponibles à un instant t (latest seulement). available: set[ArtifactType] = set(initial_inputs) for i, step in enumerate(self.steps): # 1. Toutes les entrées doivent être disponibles missing = [t for t in step.input_types if t not in available] if missing: miss_str = ",".join(t.value for t in missing) problems.append( f"étape {i} ({step.name}) demande {miss_str} " f"qui n'est ni dans les entrées initiales " f"ni produit par une étape antérieure" ) # 2. Vérification des références ``inputs_from`` for ref_type, ref_step in step.inputs_from.items(): if ref_type not in step.input_types: problems.append( f"étape {i} ({step.name}) déclare " f"inputs_from[{ref_type.value}]={ref_step!r} " f"mais le module ne consomme pas ce type" ) continue if ref_step not in step_outputs: problems.append( f"étape {i} ({step.name}) référence " f"inputs_from[{ref_type.value}]={ref_step!r} " f"qui n'est pas une étape antérieure connue" ) continue if ref_type not in step_outputs[ref_step]: problems.append( f"étape {i} ({step.name}) référence " f"inputs_from[{ref_type.value}]={ref_step!r} " f"mais cette étape ne produit pas ce type" ) # 3. Mise à jour pour les étapes suivantes available.update(step.output_types) step_outputs[step.name] = set(step.output_types) for out_type in step.output_types: producers.setdefault(out_type, set()).add(step.name) return problems def is_valid(self, initial_inputs: tuple[ArtifactType, ...]) -> bool: return not self.validate(initial_inputs) def __repr__(self) -> str: chain = " → ".join(str(s) for s in self.steps) return f"PipelineSpec({self.name}: {chain})" # ────────────────────────────────────────────────────────────────────────── # StepResult + PipelineResult # ────────────────────────────────────────────────────────────────────────── @dataclass class StepResult: """Résultat de l'exécution d'une étape sur un document. Champs ------ step_name: Nom de l'étape (cf. ``PipelineStep.name``). duration_seconds: Temps d'exécution de ``module.process`` mesuré en wall-clock. output_types: Types effectivement présents dans la sortie (peut être un sous-ensemble de ``module.output_types`` si le module a omis un type — cas reporté ici comme info pour diagnostic). junction_metrics: Pour chaque type produit qui correspond à un ``GTLevel`` dont le document porte une GT : dictionnaire ``{type: dict métriques}`` retourné par ``compute_at_junction``. error: ``None`` si l'étape s'est bien déroulée ; sinon message d'erreur (le module a levé, l'entrée est manquante, ou la validation des types a échoué). """ step_name: str duration_seconds: float output_types: tuple[ArtifactType, ...] junction_metrics: dict[str, dict[str, Any]] = field(default_factory=dict) """Map ``{artifact_type_value: {metric_name: value}}``. La clé est la valeur string du ``ArtifactType`` (ex. ``"text"``, ``"alto"``) et non l'enum lui-même, pour faciliter la sérialisation JSON. """ error: Optional[str] = None @dataclass class PipelineResult: """Résultat complet d'une exécution de pipeline sur un document. On capture la durée totale, la durée par étape et les métriques aux jonctions pour chaque artefact produit qui a une GT correspondante. """ pipeline_name: str doc_id: str steps: list[StepResult] = field(default_factory=list) total_duration_seconds: float = 0.0 error: Optional[str] = None """Erreur fatale au niveau pipeline (ex. validation des types en amont avant la première étape). ``None`` n'implique pas qu'aucune étape n'a échoué — voir ``StepResult.error`` pour le détail par étape.""" @property def succeeded(self) -> bool: """Vrai si la pipeline s'est exécutée jusqu'au bout sans qu'aucune étape ne lève d'erreur.""" if self.error is not None: return False return all(s.error is None for s in self.steps) @property def failing_steps(self) -> list[str]: """Noms des étapes ayant levé une erreur.""" return [s.step_name for s in self.steps if s.error is not None] def junction_metrics_for( self, artifact_type: ArtifactType, ) -> Optional[dict[str, Any]]: """Retourne les métriques de la **dernière** étape qui a produit ``artifact_type``, ou ``None`` si aucune étape ne l'a produit avec succès. Utile pour comparer plusieurs pipelines qui produisent in fine le même type (ex. deux DAG aboutissant à du texte corrigé). """ for step in reversed(self.steps): if step.error is not None: continue metrics = step.junction_metrics.get(artifact_type.value) if metrics is not None: return metrics return None # ────────────────────────────────────────────────────────────────────────── # Exécuteur # ────────────────────────────────────────────────────────────────────────── class PipelineRunner: """Exécute une ``PipelineSpec`` sur un document. Sprint 63 — un seul document à la fois. L'orchestration corpus-wide et l'agrégation par pipeline sont reportées à un sprint dédié. Usage typique ------------- >>> spec = PipelineSpec( ... name="ocr_then_rewrite", ... steps=[ ... PipelineStep("ocr", my_ocr_module), ... PipelineStep("rewrite", my_llm_rewriter), ... ], ... ) >>> runner = PipelineRunner() >>> result = runner.run(spec, document, {ArtifactType.IMAGE: "/path/img.png"}) >>> result.succeeded True >>> result.junction_metrics_for(ArtifactType.TEXT) {'cer': 0.05, 'wer': 0.12, ...} """ @staticmethod def run( spec: PipelineSpec, document: Document, initial_inputs: dict[ArtifactType, Any], ) -> PipelineResult: """Exécute ``spec`` sur ``document`` à partir de ``initial_inputs``. Parameters ---------- spec: Spécification de la pipeline. document: Document du corpus, porteur de zéro ou plusieurs niveaux de GT (Sprint 32). initial_inputs: Artefacts initiaux par type — typiquement ``{ArtifactType.IMAGE: "/path/img.png"}`` pour une pipeline qui démarre par un OCR. Returns ------- PipelineResult Résultat complet : durée totale, résultat par étape, métriques aux jonctions évaluées contre la GT. """ result = PipelineResult( pipeline_name=spec.name, doc_id=document.doc_id, ) # Validation amont : si la pipeline est statiquement # invalide, on n'exécute aucune étape. problems = spec.validate(tuple(initial_inputs.keys())) if problems: result.error = " ; ".join(problems) return result # Sprint 66 — bag versionné : ``versioned[(type, src_step)]`` # contient l'artefact produit par ``src_step`` pour ``type``. # ``src_step`` vaut ``"__initial__"`` pour les entrées # initiales fournies par l'utilisateur. ``latest[type]`` # désigne le nom de l'étape qui a produit la version la plus # récente du type — utilisé en l'absence d'``inputs_from`` # explicite (rétrocompat Sprint 63). versioned: dict[tuple[ArtifactType, str], Any] = { (t, "__initial__"): v for t, v in initial_inputs.items() } latest: dict[ArtifactType, str] = { t: "__initial__" for t in initial_inputs } pipeline_t0 = time.monotonic() for step in spec.steps: step_result = PipelineRunner._run_step( step, versioned, latest, document, ) result.steps.append(step_result) result.total_duration_seconds = time.monotonic() - pipeline_t0 return result @staticmethod def _run_step( step: PipelineStep, versioned: dict[tuple[ArtifactType, str], Any], latest: dict[ArtifactType, str], document: Document, ) -> StepResult: # Sprint 66 — résolution des entrées : pour chaque type # demandé, on consulte ``inputs_from`` ; sinon on prend la # dernière version disponible (rétrocompat Sprint 63). resolved: dict[ArtifactType, Any] = {} missing: list[str] = [] for t in step.input_types: src = step.inputs_from.get(t, latest.get(t)) if src is None: missing.append(t.value) continue key = (t, src) if key not in versioned: # Référence explicite vers une étape qui n'a pas # produit cet artefact (ex. l'étape source a échoué). missing.append(f"{t.value}@{src}") continue resolved[t] = versioned[key] if missing: miss_str = ",".join(missing) return StepResult( step_name=step.name, duration_seconds=0.0, output_types=(), error=f"entrée manquante : {miss_str}", ) inputs_for_module = resolved # Exécution chronométrée t0 = time.monotonic() try: outputs = step.module.process(inputs_for_module) except Exception as exc: # noqa: BLE001 duration = time.monotonic() - t0 logger.warning( "[pipeline_runner] étape '%s' a levé : %s", step.name, exc, ) return StepResult( step_name=step.name, duration_seconds=duration, output_types=(), error=f"{type(exc).__name__}: {exc}", ) duration = time.monotonic() - t0 # Validation des sorties : le module est censé déclarer ses # output_types, on vérifie qu'il les a tous produits. Si # ce n'est pas le cas, on remonte une erreur explicite mais # on conserve les sorties effectivement présentes (utile # pour le diagnostic). if not isinstance(outputs, dict): return StepResult( step_name=step.name, duration_seconds=duration, output_types=(), error=( f"le module a retourné {type(outputs).__name__}, " f"un dict[ArtifactType, Any] est attendu" ), ) produced = tuple(t for t in step.output_types if t in outputs) missing_outputs = [t for t in step.output_types if t not in outputs] error: Optional[str] = None if missing_outputs: miss_str = ",".join(t.value for t in missing_outputs) error = f"sortie manquante : {miss_str}" # Mise à jour du bag versionné : on stocke la sortie sous # une clé (type, step.name) ET on met à jour ``latest`` pour # que les étapes suivantes la récupèrent par défaut. for t in produced: versioned[(t, step.name)] = outputs[t] latest[t] = step.name # Évaluation aux jonctions : pour chaque type produit, si # la GT du même niveau existe, on calcule les métriques. junction_metrics: dict[str, dict[str, Any]] = {} for at in produced: gt_level = _artifact_type_to_gt_level(at) if gt_level is None: continue gt_payload = document.get_gt(gt_level) if gt_payload is None: continue try: metrics = compute_at_junction( _gt_payload_to_value(gt_payload), outputs[at], (at, at), ) except Exception as exc: # noqa: BLE001 logger.warning( "[pipeline_runner] évaluation à la jonction %s " "a levé : %s", at.value, exc, ) continue if metrics: junction_metrics[at.value] = metrics return StepResult( step_name=step.name, duration_seconds=duration, output_types=produced, junction_metrics=junction_metrics, error=error, ) def _gt_payload_to_value(payload: Any) -> Any: """Extrait la valeur exploitable d'un ``GTPayload`` typé. Pour ``TextGT`` on veut juste la chaîne ; pour les autres payloads on retourne le payload entier (la métrique sait quoi en faire selon sa signature de types). """ # Import paresseux pour éviter une dépendance cyclique from picarones.core.corpus import ( AltoGT, EntitiesGT, PageGT, ReadingOrderGT, TextGT, ) if isinstance(payload, TextGT): return payload.text if isinstance(payload, EntitiesGT): return payload.entities if isinstance(payload, ReadingOrderGT): return payload.region_order if isinstance(payload, (AltoGT, PageGT)): return payload return payload __all__ = [ "PipelineRunner", "PipelineResult", "PipelineSpec", "PipelineStep", "StepResult", ]