Spaces:
Sleeping
Sleeping
Claude
chantier1: reconstructeur ALTO de référence + refonte BaseOCREngine via hooks
ceb4ba7 unverified | """Banc d'essai de pipelines composées — Sprint 63 (axe B). | |
| Sprint 63 — Étape 4 / axe B du plan d'évolution 2026 : démarrage du | |
| banc d'essai de pipelines. | |
| Philosophie | |
| ----------- | |
| Picarones est un **banc d'essai**, pas un atelier de production. | |
| Cette infrastructure permet d'**évaluer des pipelines composées de | |
| modules tiers** que l'utilisateur amène — par exemple : | |
| - ``[OCR(image→texte)] → [reconstructeur ALTO tiers(texte→ALTO)]`` | |
| - ``[VLM(image→ALTO)] → [post-processing tiers(ALTO→ALTO)]`` | |
| - ``[OCR(image→texte)] → [LLM correcteur(texte→texte)]`` | |
| Picarones **ne fournit aucun module métier** (pas de | |
| reconstructeur ALTO, pas de correcteur, pas de re-segmenteur). | |
| L'utilisateur branche ses propres ``BaseModule`` (Sprint 33), le | |
| runner orchestre l'exécution séquentielle, valide les types aux | |
| jonctions et **évalue automatiquement** chaque artefact produit | |
| contre la GT du même niveau (Sprint 32) en sélectionnant les | |
| métriques pertinentes du registre typé (Sprint 34). | |
| Périmètre Sprint 63 | |
| ------------------- | |
| Inclus : | |
| - Spécification déclarative d'une pipeline séquentielle. | |
| - Exécution sur un seul document avec passage typé d'artefacts. | |
| - Validation des types aux jonctions inter-modules. | |
| - Évaluation automatique aux jonctions GT-vs-sortie pour chaque | |
| niveau de GT disponible sur le document. | |
| - Mesure du temps par étape. | |
| - Capture gracieuse des erreurs (un module qui lève n'arrête pas | |
| les étapes suivantes — leur entrée manquante est rapportée | |
| comme erreur explicite). | |
| Reporté à des sprints dédiés : | |
| - DAG branchant non séquentiel (1 → {2, 3} → 4) — Sprint 64+. | |
| - Orchestration corpus-wide + agrégation par pipeline — Sprint 65+. | |
| - Vue HTML dédiée aux pipelines composées — Sprint 66+. | |
| - Cache d'artefacts intermédiaires — non prévu. | |
| - Parallélisation inter-étapes — non prévue (les modules | |
| ``execution_mode`` sont déjà respectés par le runner historique | |
| pour le bench OCR mono-étage). | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import time | |
| from dataclasses import dataclass, field | |
| from typing import Any, Optional | |
| from picarones.core.corpus import Document, GTLevel | |
| from picarones.core.metric_registry import compute_at_junction | |
| from picarones.core.modules import ArtifactType, BaseModule | |
| # Eager-load des modules qui enregistrent des métriques dans le | |
| # registre typé (Sprint 34) — sans ces imports, ``compute_at_junction`` | |
| # trouverait un registre vide et ne calculerait rien aux jonctions. | |
| # Sprint 34 : cer / wer / mer / wil + stub TEXT→ALTO | |
| import picarones.core.builtin_metrics # noqa: F401 | |
| # Sprints 55-60 : métriques philologiques. | |
| import picarones.core.unicode_blocks # noqa: F401 | |
| import picarones.core.abbreviations # noqa: F401 | |
| import picarones.core.mufi # noqa: F401 | |
| import picarones.core.early_modern_typography # noqa: F401 | |
| import picarones.core.modern_archives # noqa: F401 | |
| import picarones.core.roman_numerals # noqa: F401 | |
| # Sprint 53 : reading order F1. Sprints 38, 52 : NER, readability. | |
| import picarones.core.reading_order # noqa: F401 | |
| import picarones.core.readability # noqa: F401 | |
| import picarones.core.ner # noqa: F401 | |
| # Chantier 1 (post-Sprint 97) : métriques (ALTO, ALTO) pour évaluer | |
| # les reconstructeurs ALTO contre une GT ALTO du document. | |
| import picarones.core.alto_metrics # noqa: F401 | |
| logger = logging.getLogger(__name__) | |
| # ────────────────────────────────────────────────────────────────────────── | |
| # Conversion ArtifactType <-> GTLevel | |
| # ────────────────────────────────────────────────────────────────────────── | |
| def _artifact_type_to_gt_level(at: ArtifactType) -> Optional[GTLevel]: | |
| """Retourne le ``GTLevel`` correspondant à un ``ArtifactType``. | |
| ``IMAGE`` n'a pas de correspondance GT (on n'évalue pas une | |
| image en sortie d'un module — c'est typiquement une entrée). | |
| """ | |
| if at == ArtifactType.IMAGE: | |
| return None | |
| try: | |
| return GTLevel(at.value) | |
| except ValueError: | |
| return None | |
| # ────────────────────────────────────────────────────────────────────────── | |
| # PipelineStep + PipelineSpec | |
| # ────────────────────────────────────────────────────────────────────────── | |
| class PipelineStep: | |
| """Une étape dans une pipeline composée. | |
| L'étape porte un nom lisible (utile pour le rapport et le | |
| diagnostic) et une instance de ``BaseModule`` fournie par | |
| l'utilisateur. Les types d'entrée et de sortie ne sont pas | |
| redéclarés ici : ils sont lus depuis le module lui-même | |
| (``module.input_types`` / ``module.output_types``). | |
| Sprint 66 — DAG branchant | |
| ------------------------- | |
| ``inputs_from`` permet de désigner explicitement, pour chaque | |
| type d'entrée, l'étape source dont on veut consommer l'artefact. | |
| Utile quand plusieurs étapes antérieures produisent le même | |
| type et qu'on veut éviter l'écrasement implicite (par exemple | |
| deux correcteurs LLM en parallèle qui partent du même OCR). | |
| - ``inputs_from = {}`` (défaut) : pour chaque type d'entrée, | |
| le runner prend la version **la plus récente** disponible | |
| dans le bag (comportement Sprint 63, rétrocompat stricte). | |
| - ``inputs_from = {ArtifactType.TEXT: "ocr"}`` : exige la | |
| version du ``TEXT`` produite par l'étape nommée ``"ocr"``. | |
| Si cette étape n'existe pas ou n'a pas produit ce type, | |
| ``PipelineSpec.validate`` remonte un problème explicite et | |
| le runner remonte une erreur d'entrée manquante. | |
| La chaîne spéciale ``"__initial__"`` désigne les artefacts | |
| fournis dans ``initial_inputs`` (par exemple ``IMAGE``). | |
| """ | |
| name: str | |
| module: BaseModule | |
| inputs_from: dict[ArtifactType, str] = field(default_factory=dict) | |
| def input_types(self) -> tuple[ArtifactType, ...]: | |
| return tuple(self.module.input_types) | |
| def output_types(self) -> tuple[ArtifactType, ...]: | |
| return tuple(self.module.output_types) | |
| def __repr__(self) -> str: | |
| ins = ",".join(t.value for t in self.input_types) or "·" | |
| outs = ",".join(t.value for t in self.output_types) or "·" | |
| if self.inputs_from: | |
| refs = ",".join( | |
| f"{t.value}@{src}" for t, src in self.inputs_from.items() | |
| ) | |
| return f"PipelineStep({self.name}: [{refs}] → {outs})" | |
| return f"PipelineStep({self.name}: {ins} → {outs})" | |
| class PipelineSpec: | |
| """DAG séquentiel de ``PipelineStep``. | |
| Sprint 63 — séquentiel uniquement : l'étape ``i+1`` consomme | |
| les artefacts produits par l'étape ``i`` (et tous les artefacts | |
| initiaux fournis au runner, par exemple l'image source). | |
| Le DAG branchant arrive dans un sprint dédié. | |
| """ | |
| name: str | |
| steps: list[PipelineStep] = field(default_factory=list) | |
| def validate(self, initial_inputs: tuple[ArtifactType, ...]) -> list[str]: | |
| """Vérifie que les types s'enchaînent et retourne la liste | |
| des problèmes détectés (vide si la pipeline est valide). | |
| Une pipeline est valide si, pour chaque étape, tous les | |
| ``input_types`` sont disponibles : soit dans les | |
| ``initial_inputs`` (typiquement ``IMAGE``), soit produits | |
| par une étape antérieure. | |
| Sprint 66 — validation des références ``inputs_from`` : | |
| si une étape déclare ``inputs_from[type] = "foo"``, | |
| l'étape ``foo`` doit exister parmi les étapes antérieures | |
| et avoir ce type dans ses ``output_types``. La chaîne | |
| spéciale ``"__initial__"`` désigne les entrées initiales. | |
| """ | |
| problems: list[str] = [] | |
| if not self.steps: | |
| problems.append("pipeline vide : au moins une étape est requise") | |
| return problems | |
| # Map type → set des steps qui ont produit ce type | |
| # ("__initial__" pour les entrées initiales) — utilisé pour | |
| # valider les références ``inputs_from``. | |
| producers: dict[ArtifactType, set[str]] = { | |
| t: {"__initial__"} for t in initial_inputs | |
| } | |
| # Map step_name → set des types produits, pour la validation | |
| # des références. | |
| step_outputs: dict[str, set[ArtifactType]] = { | |
| "__initial__": set(initial_inputs), | |
| } | |
| # Set des types disponibles à un instant t (latest seulement). | |
| available: set[ArtifactType] = set(initial_inputs) | |
| for i, step in enumerate(self.steps): | |
| # 1. Toutes les entrées doivent être disponibles | |
| missing = [t for t in step.input_types if t not in available] | |
| if missing: | |
| miss_str = ",".join(t.value for t in missing) | |
| problems.append( | |
| f"étape {i} ({step.name}) demande {miss_str} " | |
| f"qui n'est ni dans les entrées initiales " | |
| f"ni produit par une étape antérieure" | |
| ) | |
| # 2. Vérification des références ``inputs_from`` | |
| for ref_type, ref_step in step.inputs_from.items(): | |
| if ref_type not in step.input_types: | |
| problems.append( | |
| f"étape {i} ({step.name}) déclare " | |
| f"inputs_from[{ref_type.value}]={ref_step!r} " | |
| f"mais le module ne consomme pas ce type" | |
| ) | |
| continue | |
| if ref_step not in step_outputs: | |
| problems.append( | |
| f"étape {i} ({step.name}) référence " | |
| f"inputs_from[{ref_type.value}]={ref_step!r} " | |
| f"qui n'est pas une étape antérieure connue" | |
| ) | |
| continue | |
| if ref_type not in step_outputs[ref_step]: | |
| problems.append( | |
| f"étape {i} ({step.name}) référence " | |
| f"inputs_from[{ref_type.value}]={ref_step!r} " | |
| f"mais cette étape ne produit pas ce type" | |
| ) | |
| # 3. Mise à jour pour les étapes suivantes | |
| available.update(step.output_types) | |
| step_outputs[step.name] = set(step.output_types) | |
| for out_type in step.output_types: | |
| producers.setdefault(out_type, set()).add(step.name) | |
| return problems | |
| def is_valid(self, initial_inputs: tuple[ArtifactType, ...]) -> bool: | |
| return not self.validate(initial_inputs) | |
| def __repr__(self) -> str: | |
| chain = " → ".join(str(s) for s in self.steps) | |
| return f"PipelineSpec({self.name}: {chain})" | |
| # ────────────────────────────────────────────────────────────────────────── | |
| # StepResult + PipelineResult | |
| # ────────────────────────────────────────────────────────────────────────── | |
| class StepResult: | |
| """Résultat de l'exécution d'une étape sur un document. | |
| Champs | |
| ------ | |
| step_name: | |
| Nom de l'étape (cf. ``PipelineStep.name``). | |
| duration_seconds: | |
| Temps d'exécution de ``module.process`` mesuré en wall-clock. | |
| output_types: | |
| Types effectivement présents dans la sortie (peut être un | |
| sous-ensemble de ``module.output_types`` si le module a | |
| omis un type — cas reporté ici comme info pour diagnostic). | |
| junction_metrics: | |
| Pour chaque type produit qui correspond à un ``GTLevel`` | |
| dont le document porte une GT : dictionnaire ``{type: dict | |
| métriques}`` retourné par ``compute_at_junction``. | |
| error: | |
| ``None`` si l'étape s'est bien déroulée ; sinon message | |
| d'erreur (le module a levé, l'entrée est manquante, ou la | |
| validation des types a échoué). | |
| """ | |
| step_name: str | |
| duration_seconds: float | |
| output_types: tuple[ArtifactType, ...] | |
| junction_metrics: dict[str, dict[str, Any]] = field(default_factory=dict) | |
| """Map ``{artifact_type_value: {metric_name: value}}``. | |
| La clé est la valeur string du ``ArtifactType`` (ex. ``"text"``, | |
| ``"alto"``) et non l'enum lui-même, pour faciliter la | |
| sérialisation JSON. | |
| """ | |
| error: Optional[str] = None | |
| class PipelineResult: | |
| """Résultat complet d'une exécution de pipeline sur un document. | |
| On capture la durée totale, la durée par étape et les | |
| métriques aux jonctions pour chaque artefact produit qui a une | |
| GT correspondante. | |
| """ | |
| pipeline_name: str | |
| doc_id: str | |
| steps: list[StepResult] = field(default_factory=list) | |
| total_duration_seconds: float = 0.0 | |
| error: Optional[str] = None | |
| """Erreur fatale au niveau pipeline (ex. validation des types | |
| en amont avant la première étape). ``None`` n'implique pas | |
| qu'aucune étape n'a échoué — voir ``StepResult.error`` pour le | |
| détail par étape.""" | |
| def succeeded(self) -> bool: | |
| """Vrai si la pipeline s'est exécutée jusqu'au bout sans | |
| qu'aucune étape ne lève d'erreur.""" | |
| if self.error is not None: | |
| return False | |
| return all(s.error is None for s in self.steps) | |
| def failing_steps(self) -> list[str]: | |
| """Noms des étapes ayant levé une erreur.""" | |
| return [s.step_name for s in self.steps if s.error is not None] | |
| def junction_metrics_for( | |
| self, artifact_type: ArtifactType, | |
| ) -> Optional[dict[str, Any]]: | |
| """Retourne les métriques de la **dernière** étape qui a | |
| produit ``artifact_type``, ou ``None`` si aucune étape ne | |
| l'a produit avec succès. | |
| Utile pour comparer plusieurs pipelines qui produisent in | |
| fine le même type (ex. deux DAG aboutissant à du texte | |
| corrigé). | |
| """ | |
| for step in reversed(self.steps): | |
| if step.error is not None: | |
| continue | |
| metrics = step.junction_metrics.get(artifact_type.value) | |
| if metrics is not None: | |
| return metrics | |
| return None | |
| # ────────────────────────────────────────────────────────────────────────── | |
| # Exécuteur | |
| # ────────────────────────────────────────────────────────────────────────── | |
| class PipelineRunner: | |
| """Exécute une ``PipelineSpec`` sur un document. | |
| Sprint 63 — un seul document à la fois. L'orchestration | |
| corpus-wide et l'agrégation par pipeline sont reportées à un | |
| sprint dédié. | |
| Usage typique | |
| ------------- | |
| >>> spec = PipelineSpec( | |
| ... name="ocr_then_rewrite", | |
| ... steps=[ | |
| ... PipelineStep("ocr", my_ocr_module), | |
| ... PipelineStep("rewrite", my_llm_rewriter), | |
| ... ], | |
| ... ) | |
| >>> runner = PipelineRunner() | |
| >>> result = runner.run(spec, document, {ArtifactType.IMAGE: "/path/img.png"}) | |
| >>> result.succeeded | |
| True | |
| >>> result.junction_metrics_for(ArtifactType.TEXT) | |
| {'cer': 0.05, 'wer': 0.12, ...} | |
| """ | |
| def run( | |
| spec: PipelineSpec, | |
| document: Document, | |
| initial_inputs: dict[ArtifactType, Any], | |
| ) -> PipelineResult: | |
| """Exécute ``spec`` sur ``document`` à partir de | |
| ``initial_inputs``. | |
| Parameters | |
| ---------- | |
| spec: | |
| Spécification de la pipeline. | |
| document: | |
| Document du corpus, porteur de zéro ou plusieurs niveaux | |
| de GT (Sprint 32). | |
| initial_inputs: | |
| Artefacts initiaux par type — typiquement | |
| ``{ArtifactType.IMAGE: "/path/img.png"}`` pour une | |
| pipeline qui démarre par un OCR. | |
| Returns | |
| ------- | |
| PipelineResult | |
| Résultat complet : durée totale, résultat par étape, | |
| métriques aux jonctions évaluées contre la GT. | |
| """ | |
| result = PipelineResult( | |
| pipeline_name=spec.name, doc_id=document.doc_id, | |
| ) | |
| # Validation amont : si la pipeline est statiquement | |
| # invalide, on n'exécute aucune étape. | |
| problems = spec.validate(tuple(initial_inputs.keys())) | |
| if problems: | |
| result.error = " ; ".join(problems) | |
| return result | |
| # Sprint 66 — bag versionné : ``versioned[(type, src_step)]`` | |
| # contient l'artefact produit par ``src_step`` pour ``type``. | |
| # ``src_step`` vaut ``"__initial__"`` pour les entrées | |
| # initiales fournies par l'utilisateur. ``latest[type]`` | |
| # désigne le nom de l'étape qui a produit la version la plus | |
| # récente du type — utilisé en l'absence d'``inputs_from`` | |
| # explicite (rétrocompat Sprint 63). | |
| versioned: dict[tuple[ArtifactType, str], Any] = { | |
| (t, "__initial__"): v for t, v in initial_inputs.items() | |
| } | |
| latest: dict[ArtifactType, str] = { | |
| t: "__initial__" for t in initial_inputs | |
| } | |
| pipeline_t0 = time.monotonic() | |
| for step in spec.steps: | |
| step_result = PipelineRunner._run_step( | |
| step, versioned, latest, document, | |
| ) | |
| result.steps.append(step_result) | |
| result.total_duration_seconds = time.monotonic() - pipeline_t0 | |
| return result | |
| def _run_step( | |
| step: PipelineStep, | |
| versioned: dict[tuple[ArtifactType, str], Any], | |
| latest: dict[ArtifactType, str], | |
| document: Document, | |
| ) -> StepResult: | |
| # Sprint 66 — résolution des entrées : pour chaque type | |
| # demandé, on consulte ``inputs_from`` ; sinon on prend la | |
| # dernière version disponible (rétrocompat Sprint 63). | |
| resolved: dict[ArtifactType, Any] = {} | |
| missing: list[str] = [] | |
| for t in step.input_types: | |
| src = step.inputs_from.get(t, latest.get(t)) | |
| if src is None: | |
| missing.append(t.value) | |
| continue | |
| key = (t, src) | |
| if key not in versioned: | |
| # Référence explicite vers une étape qui n'a pas | |
| # produit cet artefact (ex. l'étape source a échoué). | |
| missing.append(f"{t.value}@{src}") | |
| continue | |
| resolved[t] = versioned[key] | |
| if missing: | |
| miss_str = ",".join(missing) | |
| return StepResult( | |
| step_name=step.name, | |
| duration_seconds=0.0, | |
| output_types=(), | |
| error=f"entrée manquante : {miss_str}", | |
| ) | |
| inputs_for_module = resolved | |
| # Exécution chronométrée | |
| t0 = time.monotonic() | |
| try: | |
| outputs = step.module.process(inputs_for_module) | |
| except Exception as exc: # noqa: BLE001 | |
| duration = time.monotonic() - t0 | |
| logger.warning( | |
| "[pipeline_runner] étape '%s' a levé : %s", | |
| step.name, exc, | |
| ) | |
| return StepResult( | |
| step_name=step.name, | |
| duration_seconds=duration, | |
| output_types=(), | |
| error=f"{type(exc).__name__}: {exc}", | |
| ) | |
| duration = time.monotonic() - t0 | |
| # Validation des sorties : le module est censé déclarer ses | |
| # output_types, on vérifie qu'il les a tous produits. Si | |
| # ce n'est pas le cas, on remonte une erreur explicite mais | |
| # on conserve les sorties effectivement présentes (utile | |
| # pour le diagnostic). | |
| if not isinstance(outputs, dict): | |
| return StepResult( | |
| step_name=step.name, | |
| duration_seconds=duration, | |
| output_types=(), | |
| error=( | |
| f"le module a retourné {type(outputs).__name__}, " | |
| f"un dict[ArtifactType, Any] est attendu" | |
| ), | |
| ) | |
| produced = tuple(t for t in step.output_types if t in outputs) | |
| missing_outputs = [t for t in step.output_types if t not in outputs] | |
| error: Optional[str] = None | |
| if missing_outputs: | |
| miss_str = ",".join(t.value for t in missing_outputs) | |
| error = f"sortie manquante : {miss_str}" | |
| # Mise à jour du bag versionné : on stocke la sortie sous | |
| # une clé (type, step.name) ET on met à jour ``latest`` pour | |
| # que les étapes suivantes la récupèrent par défaut. | |
| for t in produced: | |
| versioned[(t, step.name)] = outputs[t] | |
| latest[t] = step.name | |
| # Évaluation aux jonctions : pour chaque type produit, si | |
| # la GT du même niveau existe, on calcule les métriques. | |
| junction_metrics: dict[str, dict[str, Any]] = {} | |
| for at in produced: | |
| gt_level = _artifact_type_to_gt_level(at) | |
| if gt_level is None: | |
| continue | |
| gt_payload = document.get_gt(gt_level) | |
| if gt_payload is None: | |
| continue | |
| try: | |
| metrics = compute_at_junction( | |
| _gt_payload_to_value(gt_payload), | |
| outputs[at], | |
| (at, at), | |
| ) | |
| except Exception as exc: # noqa: BLE001 | |
| logger.warning( | |
| "[pipeline_runner] évaluation à la jonction %s " | |
| "a levé : %s", | |
| at.value, exc, | |
| ) | |
| continue | |
| if metrics: | |
| junction_metrics[at.value] = metrics | |
| return StepResult( | |
| step_name=step.name, | |
| duration_seconds=duration, | |
| output_types=produced, | |
| junction_metrics=junction_metrics, | |
| error=error, | |
| ) | |
| def _gt_payload_to_value(payload: Any) -> Any: | |
| """Extrait la valeur exploitable d'un ``GTPayload`` typé. | |
| Pour ``TextGT`` on veut juste la chaîne ; pour les autres | |
| payloads on retourne le payload entier (la métrique sait quoi | |
| en faire selon sa signature de types). | |
| """ | |
| # Import paresseux pour éviter une dépendance cyclique | |
| from picarones.core.corpus import ( | |
| AltoGT, EntitiesGT, PageGT, ReadingOrderGT, TextGT, | |
| ) | |
| if isinstance(payload, TextGT): | |
| return payload.text | |
| if isinstance(payload, EntitiesGT): | |
| return payload.entities | |
| if isinstance(payload, ReadingOrderGT): | |
| return payload.region_order | |
| if isinstance(payload, (AltoGT, PageGT)): | |
| return payload | |
| return payload | |
| __all__ = [ | |
| "PipelineRunner", | |
| "PipelineResult", | |
| "PipelineSpec", | |
| "PipelineStep", | |
| "StepResult", | |
| ] | |