Claude commited on
Commit
facd994
·
unverified ·
1 Parent(s): d43c941

feat(7.D)!: supprime les 5 modules pipeline/legacy_*.py

Browse files

BREAKING CHANGE — fin de la phase 7 du retrait du legacy.

Modules supprimés (~2000 LOC)
-----------------------------
- ``picarones/pipeline/legacy_runner.py`` (487 LOC)
+ ``PipelineRunner.run()``
+ ``PipelineSpec`` (dataclass legacy)
+ ``PipelineStep`` (dataclass legacy)
+ ``PipelineResult`` (dataclass legacy)
+ ``StepResult`` (dataclass legacy)
- ``picarones/pipeline/legacy_pipeline_benchmark.py`` (522 LOC)
+ ``run_pipeline_benchmark()``
+ ``PipelineBenchmarkResult``
+ ``StepAggregate``
- ``picarones/pipeline/legacy_pipeline_comparison.py`` (307 LOC)
+ ``compare_pipelines()``
+ ``PipelineComparisonResult``
- ``picarones/pipeline/_legacy_translator.py`` (377 LOC)
+ helpers de traduction ``execute_legacy_spec_via_canonical``
- ``picarones/pipeline/_legacy_module_adapter.py`` (302 LOC)
+ ``_BaseModuleAdapter``, ``_PayloadRegistry``

Mise à jour ``picarones/__init__.py``
-------------------------------------
Retrait des re-exports :
- ``PipelineResult``
- ``PipelineRunner``
- ``PipelineSpec``
- ``PipelineStep``
- ``StepResult``

Pour l'API canonique, importer explicitement :

.. code-block:: python

# avant (legacy supprimé)
from picarones import PipelineSpec, PipelineStep, PipelineRunner

# après (canonique)
from picarones.domain.pipeline_spec import PipelineSpec, PipelineStep
from picarones.pipeline.executor import PipelineExecutor
from picarones.pipeline.types import PipelineResult, StepResult

État du legacy pipeline
-----------------------
- Phase 7.A : engines/ + modules/ → adapters/legacy_* (terminée)
- Phase 7.B : pont BaseModule → StepExecutor (terminée)
- Phase 7.C : suppression des tests axe B legacy (PR #60)
- Phase 7.D : **CETTE PHASE** — suppression des modules legacy

``BaseModule`` reste dans ``picarones.domain.module_protocol`` car
``BaseOCREngine`` (``adapters/legacy_engines/base.py``) en hérite
encore — sera traité dans une phase ultérieure quand les engines
legacy seront migrés vers ``StepExecutor`` Protocol.

https://claude.ai/code/session_011XQZNitg1rCgia8ZD1a2hP

picarones/__init__.py CHANGED
@@ -69,13 +69,6 @@ from picarones.domain.facts import (
69
  FactImportance,
70
  FactType,
71
  )
72
- from picarones.pipeline.legacy_runner import (
73
- PipelineResult,
74
- PipelineRunner,
75
- PipelineSpec,
76
- PipelineStep,
77
- StepResult,
78
- )
79
  from picarones.evaluation.metric_registry import (
80
  MetricSpec,
81
  compute_at_junction,
@@ -118,12 +111,6 @@ __all__ = [
118
  "Fact",
119
  "FactImportance",
120
  "FactType",
121
- # Pipelines composées (axe B)
122
- "PipelineResult",
123
- "PipelineRunner",
124
- "PipelineSpec",
125
- "PipelineStep",
126
- "StepResult",
127
  # Registre de métriques typées
128
  "MetricSpec",
129
  "compute_at_junction",
 
69
  FactImportance,
70
  FactType,
71
  )
 
 
 
 
 
 
 
72
  from picarones.evaluation.metric_registry import (
73
  MetricSpec,
74
  compute_at_junction,
 
111
  "Fact",
112
  "FactImportance",
113
  "FactType",
 
 
 
 
 
 
114
  # Registre de métriques typées
115
  "MetricSpec",
116
  "compute_at_junction",
picarones/pipeline/_legacy_module_adapter.py DELETED
@@ -1,302 +0,0 @@
1
- """Adaptateur ``BaseModule`` → ``StepExecutor`` (Phase 7.B).
2
-
3
- Pont entre le contrat module legacy
4
- (:class:`picarones.domain.module_protocol.BaseModule`,
5
- ``process(dict[ArtifactType, payload]) → dict[ArtifactType, payload]``)
6
- et le contrat canonique
7
- (:class:`picarones.pipeline.protocols.StepExecutor`,
8
- ``execute(dict[ArtifactType, Artifact], params, context)
9
- → dict[ArtifactType, Artifact]``).
10
-
11
- Pourquoi ce module
12
- ------------------
13
- Sub-phase 7.B du plan de convergence
14
- (``docs/migration/pipeline-convergence-plan.md``) : on fait
15
- consommer en interne le ``PipelineExecutor`` canonique par le
16
- ``PipelineRunner`` legacy. Cela élimine la duplication de
17
- moteur d'exécution (1 seul code path à maintenir) tout en
18
- préservant l'API legacy ``BaseModule`` pour les modules qui en
19
- hériteraient encore.
20
-
21
- Le wrapper est **interne au module** : aucun caller production
22
- ne devrait importer ``_BaseModuleAdapter``. Les modules tiers
23
- qui contribuent à un benchmark composé continuent d'écrire des
24
- sous-classes de ``BaseModule`` ; le wrapper fait l'adaptation
25
- au moment de l'exécution.
26
-
27
- Sémantique des payloads
28
- -----------------------
29
- Les modules ``BaseModule`` historiques travaillent avec des
30
- **payloads bruts** :
31
-
32
- - ``ArtifactType.IMAGE`` → ``str`` (chemin filesystem)
33
- - ``ArtifactType.RAW_TEXT`` / ``ArtifactType.CORRECTED_TEXT`` → ``str`` (texte inline)
34
- - ``ArtifactType.ALTO_XML`` / ``ArtifactType.PAGE_XML`` → ``str`` (XML inline)
35
- - ``ArtifactType.ENTITIES`` → ``list[dict]``
36
- - ``ArtifactType.READING_ORDER`` → ``list[str]``
37
-
38
- Le canonique ``Artifact`` Pydantic immutable n'a pas de champ
39
- ``content`` direct — le contenu se lit via ``uri``. Le wrapper
40
- résout cette incompatibilité via un **registre d'inline
41
- payloads** in-process : chaque ``Artifact`` produit a un ``id``
42
- unique, et le registre map ``id → payload`` pour la durée d'un
43
- run.
44
-
45
- Cela évite l'I/O disque pour chaque step (qui pollue le wall-
46
- clock du chronométrage et pose des problèmes de cleanup en
47
- test). Trade-off : le wrapper ne fonctionne qu'**en
48
- mono-process**. La parallélisation inter-document via
49
- ``ProcessPoolExecutor`` (encore inutilisée par
50
- ``PipelineRunner``) requerrait une autre stratégie (URI
51
- ``data:``, sérialisation Pickle des payloads, etc.).
52
-
53
- Anti-sur-ingénierie
54
- -------------------
55
- - Pas de cache d'artefacts (le registre est purement transient).
56
- - Pas de provenance détaillée (les ``Artifact`` produits ont
57
- ``provenance=None`` ; le legacy ``PipelineRunner`` ne portait
58
- pas cette info).
59
- - Pas de garantie inter-process (cf. trade-off ci-dessus).
60
- """
61
-
62
- from __future__ import annotations
63
-
64
- import logging
65
- from typing import Any
66
-
67
- from picarones.domain.artifacts import Artifact, ArtifactType
68
- from picarones.domain.module_protocol import BaseModule, ExecutionMode
69
- from picarones.pipeline.types import RunContext
70
-
71
- logger = logging.getLogger(__name__)
72
-
73
-
74
- class _PayloadRegistry:
75
- """Registre in-process ``Artifact.id → payload``.
76
-
77
- Utilisé par :class:`_BaseModuleAdapter` pour matérialiser
78
- inline-payload ↔ ``Artifact`` sans I/O disque.
79
-
80
- Une instance par run de pipeline mono-document. Le
81
- ``PipelineRunner`` qui consomme cet adapter est responsable
82
- d'instancier un registre par appel ``run()``.
83
- """
84
-
85
- def __init__(self) -> None:
86
- self._payloads: dict[str, Any] = {}
87
-
88
- def store(self, artifact_id: str, payload: Any) -> None:
89
- """Enregistre un payload inline sous ``artifact_id``."""
90
- self._payloads[artifact_id] = payload
91
-
92
- def get(self, artifact_id: str) -> Any:
93
- """Retourne le payload enregistré ou lève ``KeyError``."""
94
- if artifact_id not in self._payloads:
95
- raise KeyError(
96
- f"Payload introuvable pour artifact_id={artifact_id!r}. "
97
- "Le registre attend que tous les Artifacts produits par "
98
- "une étape soient enregistrés en parallèle.",
99
- )
100
- return self._payloads[artifact_id]
101
-
102
- def __contains__(self, artifact_id: str) -> bool:
103
- return artifact_id in self._payloads
104
-
105
- def clear(self) -> None:
106
- """Vide le registre. À appeler entre deux runs."""
107
- self._payloads.clear()
108
-
109
-
110
- class _BaseModuleAdapter:
111
- """Wrappe un :class:`BaseModule` pour satisfaire le Protocol
112
- :class:`StepExecutor`.
113
-
114
- Le wrapper expose les attributs du module legacy
115
- (``name``, ``input_types``, ``output_types``,
116
- ``execution_mode``) et implémente ``execute()`` qui :
117
-
118
- 1. Extrait les payloads des ``Artifact`` d'entrée via le
119
- registre (ou via ``artifact.uri`` pour les types
120
- file-based).
121
- 2. Invoque ``module.process(payloads)``.
122
- 3. Wrappe chaque payload de sortie dans un ``Artifact``
123
- (avec ``id`` dérivé de ``context.document_id`` + nom
124
- du module + type).
125
- 4. Enregistre le payload de sortie dans le registre pour
126
- qu'une étape downstream puisse le consommer.
127
- """
128
-
129
- #: Types pour lesquels ``Artifact.uri`` porte directement la
130
- #: valeur attendue par le ``BaseModule`` historique (chemin
131
- #: filesystem). Pour les autres types, on passe par le
132
- #: registre.
133
- _URI_BACKED_TYPES: frozenset[ArtifactType] = frozenset({
134
- ArtifactType.IMAGE,
135
- })
136
-
137
- def __init__(
138
- self,
139
- module: BaseModule,
140
- registry: _PayloadRegistry,
141
- ) -> None:
142
- self._module = module
143
- self._registry = registry
144
-
145
- @property
146
- def name(self) -> str:
147
- return self._module.name
148
-
149
- @property
150
- def input_types(self) -> frozenset[ArtifactType]:
151
- return frozenset(self._module.input_types)
152
-
153
- @property
154
- def output_types(self) -> frozenset[ArtifactType]:
155
- return frozenset(self._module.output_types)
156
-
157
- @property
158
- def execution_mode(self) -> ExecutionMode:
159
- # Mypy ne sait pas que le legacy ``BaseModule.execution_mode``
160
- # est typé ``Literal["io", "cpu"]`` — on coerce.
161
- return self._module.execution_mode # type: ignore[return-value]
162
-
163
- def execute(
164
- self,
165
- inputs: dict[ArtifactType, Artifact],
166
- params: dict[str, Any],
167
- context: RunContext,
168
- ) -> dict[ArtifactType, Artifact]:
169
- """Convertit ``inputs``/``outputs`` entre les deux contrats.
170
-
171
- Parameters
172
- ----------
173
- inputs:
174
- Map ``ArtifactType → Artifact`` fournie par le
175
- ``PipelineExecutor`` canonique.
176
- params:
177
- Paramètres du step. Le wrapper les ignore (le legacy
178
- ``BaseModule.process`` ne prend pas de params — ils
179
- sont configurés via le constructeur du module).
180
- context:
181
- ``RunContext`` du run en cours.
182
-
183
- Returns
184
- -------
185
- dict[ArtifactType, Artifact]
186
- Outputs sous forme ``Artifact`` typés. Les payloads
187
- inline sont enregistrés dans ``self._registry`` pour
188
- consommation par les étapes downstream.
189
- """
190
- # 1. Extraire les payloads des Artifacts d'entrée
191
- payloads: dict[ArtifactType, Any] = {}
192
- for at, artifact in inputs.items():
193
- if at in self._URI_BACKED_TYPES:
194
- # IMAGE : le module attend un chemin string
195
- payloads[at] = artifact.uri or ""
196
- else:
197
- # Autres types : payload inline via registre
198
- if artifact.id in self._registry:
199
- payloads[at] = self._registry.get(artifact.id)
200
- elif artifact.uri:
201
- # Fallback : artefact registré ailleurs avec uri
202
- # filesystem — on lit le contenu textuel.
203
- from pathlib import Path
204
- payloads[at] = Path(artifact.uri).read_text(
205
- encoding="utf-8",
206
- )
207
- else:
208
- raise KeyError(
209
- f"Artifact {artifact.id!r} (type={at.value}) sans "
210
- f"payload disponible : ni dans le registre, ni via uri."
211
- )
212
-
213
- # 2. Invoquer le module legacy
214
- outputs = self._module.process(payloads)
215
-
216
- # 3. Wrappe chaque output dans un Artifact + registre
217
- out_artifacts: dict[ArtifactType, Artifact] = {}
218
- for at, payload in outputs.items():
219
- artifact_id = self._build_artifact_id(context, at)
220
- self._registry.store(artifact_id, payload)
221
- artifact = Artifact(
222
- id=artifact_id,
223
- document_id=context.document_id,
224
- type=at,
225
- produced_by_step=self._module.name,
226
- # uri / content_hash / provenance sont None — le
227
- # legacy n'avait pas ces concepts.
228
- )
229
- out_artifacts[at] = artifact
230
- return out_artifacts
231
-
232
- def _build_artifact_id(
233
- self,
234
- context: RunContext,
235
- artifact_type: ArtifactType,
236
- ) -> str:
237
- """Construit un ``Artifact.id`` unique pour cette
238
- production.
239
-
240
- Format : ``<document_id>:<step_name>:<artifact_type>``.
241
- Cohérent avec la convention du wiring rewrite (cf.
242
- ``adapters/ocr/tesseract.py``).
243
- """
244
- return f"{context.document_id}:{self._module.name}:{artifact_type.value}"
245
-
246
-
247
- def wrap_initial_inputs(
248
- inputs: dict[ArtifactType, Any],
249
- registry: _PayloadRegistry,
250
- document_id: str,
251
- ) -> dict[ArtifactType, Artifact]:
252
- """Convertit les ``initial_inputs`` legacy en ``dict[ArtifactType, Artifact]``.
253
-
254
- Le ``PipelineRunner`` legacy accepte ``initial_inputs:
255
- dict[ArtifactType, Any]`` où chaque valeur est un payload
256
- brut (chemin pour IMAGE, texte inline pour TEXT, ...). Cette
257
- fonction les wrappe en ``Artifact`` typés et enregistre les
258
- payloads inline dans le registre.
259
-
260
- Parameters
261
- ----------
262
- inputs:
263
- Map legacy.
264
- registry:
265
- Registre de payloads (à utiliser dans le même run).
266
- document_id:
267
- ``DocumentRef.id`` du document. Sert à construire
268
- les ``Artifact.id`` initiaux.
269
-
270
- Returns
271
- -------
272
- dict[ArtifactType, Artifact]
273
- Inputs canoniques.
274
- """
275
- out: dict[ArtifactType, Artifact] = {}
276
- for at, payload in inputs.items():
277
- artifact_id = f"{document_id}:__initial__:{at.value}"
278
- if at == ArtifactType.IMAGE:
279
- # Chemin filesystem : ``uri`` direct
280
- artifact = Artifact(
281
- id=artifact_id,
282
- document_id=document_id,
283
- type=at,
284
- uri=str(payload) if payload else None,
285
- )
286
- else:
287
- # Payload inline : on enregistre + Artifact sans uri
288
- registry.store(artifact_id, payload)
289
- artifact = Artifact(
290
- id=artifact_id,
291
- document_id=document_id,
292
- type=at,
293
- )
294
- out[at] = artifact
295
- return out
296
-
297
-
298
- __all__ = [
299
- "_BaseModuleAdapter",
300
- "_PayloadRegistry",
301
- "wrap_initial_inputs",
302
- ]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
picarones/pipeline/_legacy_translator.py DELETED
@@ -1,377 +0,0 @@
1
- """Pont legacy ↔ canonique — Phase 7.B.3.
2
-
3
- Helpers partagés entre :mod:`legacy_runner` (mono-document) et
4
- :mod:`legacy_pipeline_benchmark` (corpus-wide) pour exécuter une
5
- ``PipelineSpec`` legacy via le ``PipelineExecutor`` canonique
6
- :mod:`picarones.pipeline.executor` et reconstruire les types de
7
- retour legacy (``PipelineResult``, ``StepResult``, dataclasses du
8
- Sprint 63) attendus par les ~440 tests existants.
9
-
10
- Pourquoi ce module
11
- ------------------
12
- La sub-phase 7.B.2 avait introduit ces helpers en privé dans
13
- :mod:`legacy_runner`. La 7.B.3 doit faire que
14
- :mod:`legacy_pipeline_benchmark` exécute lui-même les pipelines via
15
- ``PipelineExecutor.run_plan`` (au lieu de transiter par
16
- ``PipelineRunner.run`` du legacy_runner) — pour ça, les helpers
17
- de traduction doivent être partageables.
18
-
19
- L'API publique de ce module est strictement interne au package
20
- ``picarones.pipeline`` et sera supprimée en sub-phase 7.D, en même
21
- temps que le runner legacy lui-même.
22
-
23
- Anti-sur-ingénierie
24
- -------------------
25
- - Pas de cache de plan (le ``PipelinePlanner`` est instanciable et
26
- léger — chaque appel re-plan).
27
- - Pas d'instance partagée d'``_PayloadRegistry`` entre documents :
28
- un registre par exécution de pipeline mono-doc, conforme au
29
- contrat de :class:`_BaseModuleAdapter`.
30
- - Pas de provenance détaillée (``Artifact.provenance=None``) — le
31
- legacy ne portait pas cette info.
32
- """
33
-
34
- from __future__ import annotations
35
-
36
- import logging
37
- from typing import Any, Optional, TYPE_CHECKING
38
-
39
- from picarones.domain.artifacts import ArtifactType
40
- from picarones.domain.documents import DocumentRef
41
- from picarones.domain.pipeline_spec import (
42
- PipelineSpec as _DomainPipelineSpec,
43
- PipelineStep as _DomainPipelineStep,
44
- )
45
- from picarones.evaluation.corpus import Document, GTLevel
46
- from picarones.evaluation.metric_registry import compute_at_junction
47
- from picarones.pipeline._legacy_module_adapter import (
48
- _BaseModuleAdapter,
49
- _PayloadRegistry,
50
- wrap_initial_inputs,
51
- )
52
- from picarones.pipeline.executor import PipelineExecutor
53
- from picarones.pipeline.types import (
54
- PipelineResult as _CanonicalPipelineResult,
55
- RunContext,
56
- StepResult as _CanonicalStepResult,
57
- )
58
-
59
- if TYPE_CHECKING:
60
- # Import paresseux pour éviter la dépendance cyclique
61
- # (legacy_runner importe ce module via les helpers,
62
- # ce module connaît ``PipelineSpec``/``PipelineStep`` legacy).
63
- from picarones.pipeline.legacy_runner import (
64
- PipelineResult as _LegacyPipelineResult,
65
- PipelineSpec as _LegacyPipelineSpec,
66
- PipelineStep as _LegacyPipelineStep,
67
- StepResult as _LegacyStepResult,
68
- )
69
-
70
- logger = logging.getLogger(__name__)
71
-
72
-
73
- # ──────────────────────────────────────────────────────────────────────────
74
- # Conversion ArtifactType <-> GTLevel
75
- # ──────────────────────────────────────────────────────────────────────────
76
-
77
-
78
- _ARTIFACT_TO_GT_LEVEL: dict[ArtifactType, GTLevel] = {
79
- ArtifactType.RAW_TEXT: GTLevel.TEXT,
80
- ArtifactType.CORRECTED_TEXT: GTLevel.TEXT,
81
- ArtifactType.ALTO_XML: GTLevel.ALTO,
82
- ArtifactType.PAGE_XML: GTLevel.PAGE,
83
- ArtifactType.ENTITIES: GTLevel.ENTITIES,
84
- ArtifactType.READING_ORDER: GTLevel.READING_ORDER,
85
- }
86
-
87
-
88
- def artifact_type_to_gt_level(at: ArtifactType) -> Optional[GTLevel]:
89
- """Retourne le ``GTLevel`` correspondant à un ``ArtifactType``.
90
-
91
- ``IMAGE`` et les types pré-pipeline (``CONFIDENCES``, ``ALIGNMENT``,
92
- ``CANONICAL_DOCUMENT``) n'ont pas de niveau de GT direct.
93
- """
94
- return _ARTIFACT_TO_GT_LEVEL.get(at)
95
-
96
-
97
- def gt_payload_to_value(payload: Any) -> Any:
98
- """Extrait la valeur exploitable d'un ``GTPayload`` typé.
99
-
100
- Pour ``TextGT`` on veut juste la chaîne ; pour les autres
101
- payloads on retourne le payload entier (la métrique sait quoi
102
- en faire selon sa signature de types).
103
- """
104
- from picarones.evaluation.corpus import (
105
- AltoGT, EntitiesGT, PageGT, ReadingOrderGT, TextGT,
106
- )
107
- if isinstance(payload, TextGT):
108
- return payload.text
109
- if isinstance(payload, EntitiesGT):
110
- return payload.entities
111
- if isinstance(payload, ReadingOrderGT):
112
- return payload.region_order
113
- if isinstance(payload, (AltoGT, PageGT)):
114
- return payload
115
- return payload
116
-
117
-
118
- # ──────────────────────────────────────────────────────────────────────────
119
- # Conversion spec legacy → spec canonique
120
- # ─────────────────────────────────────────────────────────────���────────────
121
-
122
-
123
- def legacy_spec_to_canonical_spec(
124
- legacy_spec: "_LegacyPipelineSpec",
125
- initial_input_types: tuple[ArtifactType, ...],
126
- ) -> tuple[_DomainPipelineSpec, dict[str, _BaseModuleAdapter]]:
127
- """Convertit une ``PipelineSpec`` legacy en ``domain.PipelineSpec``.
128
-
129
- Retourne aussi un dict ``{step.name: _BaseModuleAdapter sans
130
- registry}`` — l'appelant doit injecter un ``_PayloadRegistry``
131
- par exécution mono-document avant d'utiliser les adapters.
132
- """
133
- canonical_steps: list[_DomainPipelineStep] = []
134
- adapter_factories: dict[str, _BaseModuleAdapter] = {}
135
- for step in legacy_spec.steps:
136
- canonical_steps.append(
137
- _DomainPipelineStep(
138
- id=step.name,
139
- kind="legacy_module",
140
- adapter_name=step.name,
141
- input_types=tuple(step.input_types),
142
- output_types=tuple(step.output_types),
143
- inputs_from=dict(step.inputs_from),
144
- ),
145
- )
146
- # Note : on construit l'adapter **sans** registry — l'appelant
147
- # devra créer le registry et le passer au moment de l'usage.
148
- # On stocke l'instance pour le mapping ; le registry lié à
149
- # cette instance reste à fournir.
150
- adapter_factories[step.name] = step.module # type: ignore[assignment]
151
- canonical_spec = _DomainPipelineSpec(
152
- name=legacy_spec.name,
153
- initial_inputs=initial_input_types,
154
- steps=tuple(canonical_steps),
155
- )
156
- return canonical_spec, adapter_factories
157
-
158
-
159
- def build_adapter_resolver(
160
- legacy_spec: "_LegacyPipelineSpec",
161
- registry: _PayloadRegistry,
162
- ):
163
- """Construit un ``adapter_resolver`` pour ``PipelineExecutor``.
164
-
165
- Pour chaque step legacy, fabrique un ``_BaseModuleAdapter``
166
- lié au registre fourni. Le résolveur retourne l'adapter via
167
- ``__getitem__`` (lève ``KeyError`` si nom inconnu — ce qui est
168
- le comportement attendu par ``PipelineExecutor``).
169
- """
170
- adapter_map: dict[str, _BaseModuleAdapter] = {
171
- step.name: _BaseModuleAdapter(step.module, registry)
172
- for step in legacy_spec.steps
173
- }
174
- return adapter_map.__getitem__
175
-
176
-
177
- # ──────────────────────────────────────────────────────────────────────────
178
- # Exécution mono-document via le canonique
179
- # ──────────────────────────────────────────────────────────────────────────
180
-
181
-
182
- def execute_legacy_spec_via_canonical(
183
- legacy_spec: "_LegacyPipelineSpec",
184
- document: Document,
185
- initial_inputs: dict[ArtifactType, Any],
186
- ) -> tuple[_CanonicalPipelineResult, _PayloadRegistry]:
187
- """Exécute ``legacy_spec`` via :class:`PipelineExecutor`.
188
-
189
- Construit la ``domain.PipelineSpec`` canonique équivalente, un
190
- ``adapter_resolver`` ad-hoc qui mappe ``step.name →
191
- _BaseModuleAdapter``, et délègue à l'executor. Retourne le
192
- ``PipelineResult`` canonique + le registre de payloads (dont le
193
- caller a besoin pour reconstruire les ``junction_metrics`` du
194
- contrat legacy).
195
-
196
- Mono-document. Le caller corpus-wide
197
- (``legacy_pipeline_benchmark.run_pipeline_benchmark``) n'utilise
198
- PAS cette fonction : il a son propre flow qui plan une fois pour
199
- tout le corpus.
200
- """
201
- registry = _PayloadRegistry()
202
- canonical_inputs = wrap_initial_inputs(
203
- initial_inputs, registry, document.doc_id,
204
- )
205
-
206
- canonical_spec, _ = legacy_spec_to_canonical_spec(
207
- legacy_spec, tuple(initial_inputs.keys()),
208
- )
209
- resolver = build_adapter_resolver(legacy_spec, registry)
210
-
211
- document_ref = DocumentRef(id=document.doc_id)
212
- context = RunContext(
213
- document_id=document.doc_id,
214
- code_version="legacy_runner",
215
- pipeline_name=legacy_spec.name,
216
- )
217
- executor = PipelineExecutor(adapter_resolver=resolver)
218
- canonical_result = executor.run(
219
- canonical_spec, document_ref, canonical_inputs, context,
220
- )
221
- return canonical_result, registry
222
-
223
-
224
- # ──────────────────────────────────────────────────────────────────────────
225
- # Reconstruction des types legacy depuis le canonique
226
- # ──────────────────────────────────────────────────────────────────────────
227
-
228
-
229
- def translate_canonical_error(canonical_error: str | None) -> Optional[str]:
230
- """Traduit un message d'erreur canonique vers le format legacy.
231
-
232
- Le ``PipelineExecutor`` produit des messages structurés avec un
233
- préfixe (``adapter_raised:``, ``missing_input:``, ``missing_output:``,
234
- ``adapter_not_found:``). Les tests legacy s'attendent à des
235
- messages français du Sprint 63 — on convertit pour préserver
236
- rétrocompat strict tant que la sub-phase 7.C n'a pas migré les
237
- tests.
238
- """
239
- if canonical_error is None:
240
- return None
241
- if canonical_error.startswith("adapter_raised: "):
242
- return canonical_error[len("adapter_raised: "):]
243
- if canonical_error.startswith("missing_input: "):
244
- miss = canonical_error[len("missing_input: "):]
245
- return f"entrée manquante : {miss}"
246
- if canonical_error.startswith("missing_output: "):
247
- miss_repr = canonical_error[len("missing_output: "):]
248
- miss = miss_repr.strip("[]").replace("'", "").replace(" ", "")
249
- return f"sortie manquante : {miss}"
250
- if canonical_error.startswith("adapter_not_found: "):
251
- adapter = canonical_error[len("adapter_not_found: "):]
252
- return f"adapter introuvable : {adapter}"
253
- if canonical_error.startswith("adapter_resolver_failed: "):
254
- msg = canonical_error[len("adapter_resolver_failed: "):]
255
- return f"résolution adapter échouée : {msg}"
256
- return canonical_error
257
-
258
-
259
- def compute_junction_metrics_for_step(
260
- produced_at: list[ArtifactType],
261
- canonical_sr: _CanonicalStepResult,
262
- registry: _PayloadRegistry,
263
- document: Document,
264
- ) -> dict[str, dict[str, Any]]:
265
- """Calcule ``junction_metrics`` en post-traitant les outputs.
266
-
267
- Pour chaque ``ArtifactType`` produit, retrouve le payload via
268
- ``registry`` puis appelle
269
- ``compute_at_junction(gt, payload, (T, T))`` exactement comme le
270
- Sprint 63. Les exceptions par jonction sont logguées et la
271
- jonction est silencieusement ignorée — comportement historique.
272
- """
273
- junction_metrics: dict[str, dict[str, Any]] = {}
274
- for at in produced_at:
275
- gt_level = artifact_type_to_gt_level(at)
276
- if gt_level is None:
277
- continue
278
- gt_payload = document.get_gt(gt_level)
279
- if gt_payload is None:
280
- continue
281
- artifact_id = canonical_sr.produced_artifacts.get(at.value)
282
- if artifact_id is None or artifact_id not in registry:
283
- continue
284
- payload = registry.get(artifact_id)
285
- try:
286
- metrics = compute_at_junction(
287
- gt_payload_to_value(gt_payload),
288
- payload,
289
- (at, at),
290
- )
291
- except Exception as exc: # noqa: BLE001
292
- logger.warning(
293
- "[legacy_translator] évaluation à la jonction %s "
294
- "a levé : %s",
295
- at.value, exc,
296
- )
297
- continue
298
- if metrics:
299
- junction_metrics[at.value] = metrics
300
-
301
- # Phase 4-bis : double-clé pour rétrocompat.
302
- from picarones.domain.artifacts import expand_legacy_keys
303
- expand_legacy_keys(junction_metrics)
304
- return junction_metrics
305
-
306
-
307
- def build_legacy_step_result(
308
- legacy_step: "_LegacyPipelineStep",
309
- canonical_sr: _CanonicalStepResult,
310
- registry: _PayloadRegistry,
311
- document: Document,
312
- ) -> "_LegacyStepResult":
313
- """Reconstruit un ``StepResult`` legacy depuis le canonique."""
314
- from picarones.pipeline.legacy_runner import StepResult as _LegacyStepResult
315
-
316
- error = translate_canonical_error(canonical_sr.error)
317
-
318
- produced_at: list[ArtifactType] = []
319
- for type_value in canonical_sr.produced_artifacts:
320
- try:
321
- produced_at.append(ArtifactType(type_value))
322
- except ValueError:
323
- continue
324
-
325
- junction_metrics = compute_junction_metrics_for_step(
326
- produced_at, canonical_sr, registry, document,
327
- )
328
-
329
- return _LegacyStepResult(
330
- step_name=legacy_step.name,
331
- duration_seconds=canonical_sr.duration_seconds,
332
- output_types=tuple(produced_at),
333
- junction_metrics=junction_metrics,
334
- error=error,
335
- )
336
-
337
-
338
- def build_legacy_pipeline_result(
339
- legacy_spec: "_LegacyPipelineSpec",
340
- document: Document,
341
- canonical_result: _CanonicalPipelineResult,
342
- registry: _PayloadRegistry,
343
- ) -> "_LegacyPipelineResult":
344
- """Reconstruit un ``PipelineResult`` legacy complet depuis le canonique.
345
-
346
- Itère sur les paires (step legacy, step result canonique) et
347
- délègue à :func:`build_legacy_step_result` pour chaque.
348
- """
349
- from picarones.pipeline.legacy_runner import PipelineResult as _LegacyPipelineResult
350
-
351
- result = _LegacyPipelineResult(
352
- pipeline_name=legacy_spec.name,
353
- doc_id=document.doc_id,
354
- total_duration_seconds=canonical_result.duration_seconds,
355
- )
356
- for legacy_step, canonical_sr in zip(
357
- legacy_spec.steps, canonical_result.step_results,
358
- ):
359
- result.steps.append(
360
- build_legacy_step_result(
361
- legacy_step, canonical_sr, registry, document,
362
- ),
363
- )
364
- return result
365
-
366
-
367
- __all__ = [
368
- "artifact_type_to_gt_level",
369
- "build_adapter_resolver",
370
- "build_legacy_pipeline_result",
371
- "build_legacy_step_result",
372
- "compute_junction_metrics_for_step",
373
- "execute_legacy_spec_via_canonical",
374
- "gt_payload_to_value",
375
- "legacy_spec_to_canonical_spec",
376
- "translate_canonical_error",
377
- ]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
picarones/pipeline/legacy_pipeline_benchmark.py DELETED
@@ -1,522 +0,0 @@
1
- """Orchestration corpus-wide d'une pipeline composée — Sprint 64
2
- (axe B).
3
-
4
- Phase 5.C.batch7 — module relocalisé depuis
5
- ``picarones.measurements.pipeline_benchmark`` vers
6
- ``picarones.evaluation.pipeline_benchmark``. Le chemin legacy
7
- reste disponible via un shim avec ``DeprecationWarning`` ;
8
- suppression prévue en 2.0.
9
-
10
- Phase 7.B.2 — module relocalisé une seconde fois
11
- ------------------------------------------------
12
- ``picarones.evaluation.pipeline_benchmark`` →
13
- ``picarones.pipeline.legacy_pipeline_benchmark``. Raison : ce module
14
- consomme le ``PipelineRunner`` legacy (et désormais directement le
15
- ``PipelineExecutor`` canonique en 7.B.3) — ces dépendances sortent
16
- de la couche ``evaluation/`` vers la couche ``pipeline/``, ce
17
- qu'interdit la règle d'architecture concentrique.
18
-
19
- Phase 7.B.3 — exécution via le canonique direct
20
- -----------------------------------------------
21
- Depuis 2026-05, ``run_pipeline_benchmark`` ne passe **plus** par
22
- ``PipelineRunner.run``. Il consomme directement
23
- :class:`picarones.pipeline.executor.PipelineExecutor` et reconstruit
24
- les ``PipelineResult`` legacy via les helpers de
25
- :mod:`picarones.pipeline._legacy_translator`. Bénéfice : la spec
26
- canonique est planifiée **une seule fois** pour tout le corpus
27
- (économie N-1 plans) et ce module n'a plus de dépendance d'API à
28
- ``PipelineRunner`` — débloque la suppression du runner legacy en
29
- sub-phase 7.D.
30
-
31
- Sprint 64 — Étape 4 / axe B du plan d'évolution 2026 : suite directe
32
- du Sprint 63. Le ``PipelineRunner`` exécute une pipeline sur **un**
33
- document ; ce module fournit l'orchestration sur un **corpus
34
- complet** et l'agrégation des résultats par étape.
35
-
36
- Philosophie inchangée
37
- ---------------------
38
- Picarones reste un **banc d'essai**. Aucun module métier n'est
39
- fourni — l'utilisateur amène ses propres ``BaseModule`` (Sprint 33).
40
- Cette infrastructure se contente d'orchestrer leur exécution sur un
41
- corpus, de mesurer le temps, de capturer les erreurs gracieusement,
42
- et d'agréger les métriques calculées aux jonctions GT-vs-sortie.
43
-
44
- Périmètre Sprint 64
45
- -------------------
46
- Inclus :
47
-
48
- - ``run_pipeline_benchmark(spec, corpus, initial_inputs_factory)``
49
- qui itère séquentiellement sur les documents.
50
- - Agrégation par étape : ``StepAggregate`` avec n_succeeded /
51
- n_failed, durées (total / mean / median), failing_doc_ids,
52
- métriques agrégées par type d'artefact (mean / median sur les
53
- métriques numériques uniquement), breakdown des types d'erreur.
54
- - ``PipelineBenchmarkResult`` : conteneur global avec liste des
55
- ``PipelineResult`` par doc + liste des ``StepAggregate``.
56
- - Helper ``default_initial_inputs`` qui couvre le cas standard
57
- ``IMAGE`` depuis ``Document.image_path``.
58
-
59
- Reporté à des sprints suivants :
60
-
61
- - Comparaison de N pipelines sur le même corpus (Sprint 65).
62
- - DAG branchant non séquentiel (Sprint 66).
63
- - Vue HTML dédiée aux pipelines composées (Sprint 67).
64
- - Parallélisation inter-documents (à arbitrer selon les besoins).
65
- """
66
-
67
- from __future__ import annotations
68
-
69
- import logging
70
- import statistics
71
- import time
72
- from dataclasses import dataclass, field
73
- from typing import Any, Callable, Optional
74
-
75
- from picarones.domain.artifacts import ArtifactType
76
- from picarones.domain.documents import DocumentRef
77
- from picarones.evaluation.corpus import Corpus, Document
78
- from picarones.pipeline._legacy_module_adapter import (
79
- _BaseModuleAdapter,
80
- _PayloadRegistry,
81
- wrap_initial_inputs,
82
- )
83
- from picarones.pipeline._legacy_translator import (
84
- build_legacy_pipeline_result,
85
- legacy_spec_to_canonical_spec,
86
- )
87
- from picarones.pipeline.executor import PipelineExecutor, PipelineSpecInvalid
88
- from picarones.pipeline.legacy_runner import PipelineResult, PipelineSpec
89
- from picarones.pipeline.planner import PipelinePlanner
90
- from picarones.pipeline.types import RunContext
91
-
92
- logger = logging.getLogger(__name__)
93
-
94
-
95
- # ──────────────────────────────────────────────────────────────────────────
96
- # Helpers : factory d'entrées initiales
97
- # ──────────────────────────────────────────────────────────────────────────
98
-
99
- InitialInputsFactory = Callable[[Document], dict[ArtifactType, Any]]
100
-
101
-
102
- def default_initial_inputs(document: Document) -> dict[ArtifactType, Any]:
103
- """Factory d'entrées initiales par défaut : couvre le cas
104
- « la pipeline démarre par un module qui consomme l'image ».
105
-
106
- Retourne ``{ArtifactType.IMAGE: document.image_path}`` si
107
- ``image_path`` est présent, sinon dict vide (la première étape
108
- devra alors signaler « entrée manquante »).
109
- """
110
- if document.image_path:
111
- return {ArtifactType.IMAGE: document.image_path}
112
- return {}
113
-
114
-
115
- # ──────────────────────────────────────────────────────────────────────────
116
- # Agrégats
117
- # ──────────────────────────────────────────────────────────────────────────
118
-
119
-
120
- @dataclass
121
- class StepAggregate:
122
- """Agrégat des résultats d'une étape sur tout le corpus.
123
-
124
- Champs
125
- ------
126
- step_name:
127
- Nom de l'étape (cf. ``PipelineStep.name``).
128
- n_docs:
129
- Nombre de documents pour lesquels l'étape a été tentée.
130
- n_succeeded:
131
- Nombre de documents pour lesquels l'étape s'est terminée
132
- sans erreur (``StepResult.error is None``).
133
- n_failed:
134
- Nombre de documents pour lesquels l'étape a renvoyé une
135
- erreur.
136
- duration_seconds_total / mean / median:
137
- Statistiques de durée sur les **étapes ayant réussi**
138
- uniquement (les étapes en erreur peuvent avoir une durée
139
- artificielle).
140
- failing_doc_ids:
141
- Liste des ``doc_id`` pour lesquels cette étape a échoué.
142
- junction_metrics:
143
- ``{artifact_type_value: {metric_name: {"mean": float,
144
- "median": float, "n": int}}}`` — agrégé sur les documents
145
- où la métrique a été calculée (n peut différer de
146
- ``n_succeeded`` si la GT du type n'est pas portée par tous
147
- les docs).
148
- error_breakdown:
149
- ``{type_d_erreur: count}`` où ``type_d_erreur`` est extrait
150
- en heuristique depuis le message (``"missing_input"``,
151
- ``"raised_exception"``, ``"missing_output"``,
152
- ``"other"``).
153
- """
154
-
155
- step_name: str
156
- n_docs: int = 0
157
- n_succeeded: int = 0
158
- n_failed: int = 0
159
- duration_seconds_total: float = 0.0
160
- duration_seconds_mean: float = 0.0
161
- duration_seconds_median: float = 0.0
162
- failing_doc_ids: list[str] = field(default_factory=list)
163
- junction_metrics: dict[str, dict[str, dict[str, float]]] = field(
164
- default_factory=dict,
165
- )
166
- error_breakdown: dict[str, int] = field(default_factory=dict)
167
-
168
- @property
169
- def success_rate(self) -> float:
170
- if self.n_docs == 0:
171
- return 0.0
172
- return self.n_succeeded / self.n_docs
173
-
174
-
175
- @dataclass
176
- class PipelineBenchmarkResult:
177
- """Résultat d'un benchmark de pipeline sur un corpus complet.
178
-
179
- On capture la durée totale, les résultats par document
180
- (utiles pour le rapport HTML par-doc des sprints suivants), et
181
- l'agrégation par étape.
182
- """
183
-
184
- pipeline_name: str
185
- corpus_name: str
186
- n_docs: int = 0
187
- per_doc_results: list[PipelineResult] = field(default_factory=list)
188
- per_step_aggregates: list[StepAggregate] = field(default_factory=list)
189
- total_duration_seconds: float = 0.0
190
-
191
- @property
192
- def n_pipelines_succeeded(self) -> int:
193
- return sum(1 for r in self.per_doc_results if r.succeeded)
194
-
195
- @property
196
- def n_pipelines_failed(self) -> int:
197
- return sum(1 for r in self.per_doc_results if not r.succeeded)
198
-
199
- def aggregate_for_step(self, step_name: str) -> Optional[StepAggregate]:
200
- for agg in self.per_step_aggregates:
201
- if agg.step_name == step_name:
202
- return agg
203
- return None
204
-
205
-
206
- # ──────────────────────────────────────────────────────────────────────────
207
- # Classification des erreurs
208
- # ──────────────────────────────────────────────────────────────────────────
209
-
210
-
211
- _ERROR_PATTERNS: tuple[tuple[str, str], ...] = (
212
- ("entrée manquante", "missing_input"),
213
- ("sortie manquante", "missing_output"),
214
- ("Error", "raised_exception"), # RuntimeError, ValueError…
215
- )
216
-
217
-
218
- def _classify_error(message: str) -> str:
219
- """Heuristique simple pour catégoriser une erreur d'étape.
220
-
221
- On regarde des marqueurs lexicaux dans le message (les messages
222
- sont produits par ``pipeline_runner._run_step`` qui les contrôle
223
- entièrement, donc cette heuristique est stable).
224
- """
225
- if not message:
226
- return "other"
227
- for pattern, label in _ERROR_PATTERNS:
228
- if pattern in message:
229
- return label
230
- return "other"
231
-
232
-
233
- # ──────────────────────────────────────────────────────────────────────────
234
- # Agrégation
235
- # ──────────────────────────────────────────────────────────────────────────
236
-
237
-
238
- def _aggregate_step(
239
- step_name: str, per_doc: list[tuple[str, Any]],
240
- ) -> StepAggregate:
241
- """Construit le ``StepAggregate`` pour une étape donnée.
242
-
243
- ``per_doc`` est une liste de tuples ``(doc_id, step_result)`` où
244
- ``step_result`` peut être ``None`` (cas où la pipeline a été
245
- arrêtée en amont avant cette étape) ou un ``StepResult``.
246
- """
247
- agg = StepAggregate(step_name=step_name)
248
- durations_succeeded: list[float] = []
249
- metrics_by_type: dict[str, dict[str, list[float]]] = {}
250
-
251
- for doc_id, sr in per_doc:
252
- if sr is None:
253
- # L'étape n'a même pas été exécutée (validation amont
254
- # invalide, ou exécutée n'a pas atteint l'index — ne se
255
- # produit pas en séquentiel mais peut arriver avec un
256
- # DAG plus tard). On compte ce cas comme échec
257
- # explicite avec un type dédié.
258
- agg.n_docs += 1
259
- agg.n_failed += 1
260
- agg.failing_doc_ids.append(doc_id)
261
- agg.error_breakdown["pipeline_aborted"] = (
262
- agg.error_breakdown.get("pipeline_aborted", 0) + 1
263
- )
264
- continue
265
- agg.n_docs += 1
266
- if sr.error is None:
267
- agg.n_succeeded += 1
268
- durations_succeeded.append(sr.duration_seconds)
269
- # Collecte des métriques pour agrégation moyenne/médiane
270
- for at_value, metrics in sr.junction_metrics.items():
271
- slot = metrics_by_type.setdefault(at_value, {})
272
- for mname, mvalue in metrics.items():
273
- if isinstance(mvalue, (int, float)) and not isinstance(
274
- mvalue, bool,
275
- ):
276
- slot.setdefault(mname, []).append(float(mvalue))
277
- else:
278
- agg.n_failed += 1
279
- agg.failing_doc_ids.append(doc_id)
280
- label = _classify_error(sr.error)
281
- agg.error_breakdown[label] = (
282
- agg.error_breakdown.get(label, 0) + 1
283
- )
284
-
285
- if durations_succeeded:
286
- agg.duration_seconds_total = sum(durations_succeeded)
287
- agg.duration_seconds_mean = statistics.fmean(durations_succeeded)
288
- agg.duration_seconds_median = statistics.median(durations_succeeded)
289
-
290
- for at_value, metrics in metrics_by_type.items():
291
- agg.junction_metrics[at_value] = {
292
- mname: {
293
- "mean": statistics.fmean(values),
294
- "median": statistics.median(values),
295
- "n": len(values),
296
- }
297
- for mname, values in metrics.items()
298
- }
299
- # Phase 4-bis : double-clé legacy/canonique pour rétrocompat.
300
- from picarones.domain.artifacts import expand_legacy_keys
301
- expand_legacy_keys(agg.junction_metrics)
302
- return agg
303
-
304
-
305
- # ──────────────────────────────────────────────────────────────────────────
306
- # Orchestrateur principal
307
- # ──────────────────────────────────────────────────────────────────────────
308
-
309
-
310
- def run_pipeline_benchmark(
311
- spec: PipelineSpec,
312
- corpus: Corpus,
313
- initial_inputs_factory: InitialInputsFactory = default_initial_inputs,
314
- ) -> PipelineBenchmarkResult:
315
- """Exécute ``spec`` sur tous les documents de ``corpus``.
316
-
317
- Parameters
318
- ----------
319
- spec:
320
- Spécification de la pipeline composée. Toutes les étapes
321
- sont des ``BaseModule`` fournis par l'utilisateur.
322
- corpus:
323
- Corpus chargé via ``Corpus.from_directory`` ou équivalent.
324
- initial_inputs_factory:
325
- Fonction qui produit, pour chaque document, les artefacts
326
- d'entrée de la pipeline. Par défaut : ``IMAGE`` depuis
327
- ``document.image_path``. L'utilisateur peut fournir une
328
- factory personnalisée pour brancher d'autres sources
329
- (par exemple ``ALTO`` pré-existant pour évaluer un
330
- pipeline qui démarre par un re-segmenteur).
331
-
332
- Returns
333
- -------
334
- PipelineBenchmarkResult
335
- Résultat global avec ``per_doc_results``,
336
- ``per_step_aggregates``, durée totale.
337
-
338
- Comportement
339
- ------------
340
- L'orchestration est **séquentielle** par document. Phase 7.B.3 :
341
- la spec canonique est planifiée une seule fois (économie N-1
342
- plans) puis ``PipelineExecutor.run_plan`` est appelé pour chaque
343
- document. Quel que soit le résultat (réussi, partiellement
344
- échoué, totalement invalide), le résultat est ajouté à
345
- ``per_doc_results`` et le benchmark continue avec le document
346
- suivant.
347
-
348
- Si la spec est statiquement invalide (cf. ``PipelineSpec.validate``
349
- ou ``PipelinePlanner.plan``), tous les documents auront un
350
- ``PipelineResult.error`` non vide et aucune étape ne sera
351
- exécutée — le résultat reste cohérent.
352
- """
353
- result = PipelineBenchmarkResult(
354
- pipeline_name=spec.name, corpus_name=corpus.name,
355
- )
356
- documents = list(corpus.documents)
357
- result.n_docs = len(documents)
358
-
359
- # Validation amont legacy : si la pipeline est statiquement
360
- # invalide, on n'exécute aucun document mais on remplit quand
361
- # même per_doc_results avec des PipelineResult.error pour
362
- # préserver l'invariant ``n_docs == len(per_doc_results)``.
363
- initial_input_types = _initial_input_types_for_corpus(
364
- documents, initial_inputs_factory,
365
- )
366
- problems = spec.validate(initial_input_types)
367
- if problems:
368
- error_msg = " ; ".join(problems)
369
- for doc in documents:
370
- result.per_doc_results.append(
371
- PipelineResult(
372
- pipeline_name=spec.name,
373
- doc_id=doc.doc_id,
374
- error=error_msg,
375
- ),
376
- )
377
- # Agrégation : aucune étape exécutée → tous les step_results
378
- # sont None.
379
- for step in spec.steps:
380
- per_doc_step = [(pr.doc_id, None) for pr in result.per_doc_results]
381
- result.per_step_aggregates.append(
382
- _aggregate_step(step.name, per_doc_step),
383
- )
384
- return result
385
-
386
- # Planification canonique unique pour tout le corpus.
387
- canonical_spec, _ = legacy_spec_to_canonical_spec(spec, initial_input_types)
388
- planner = PipelinePlanner()
389
- try:
390
- plan = planner.plan(canonical_spec)
391
- except Exception as exc: # noqa: BLE001
392
- # Cohérent avec le format legacy : tous les documents
393
- # remontent l'erreur planning.
394
- logger.warning(
395
- "[pipeline_benchmark] planning a levé sur %s : %s",
396
- spec.name, exc,
397
- )
398
- msg = f"planning_error: {type(exc).__name__}: {exc}"
399
- for doc in documents:
400
- result.per_doc_results.append(
401
- PipelineResult(
402
- pipeline_name=spec.name, doc_id=doc.doc_id, error=msg,
403
- ),
404
- )
405
- return result
406
-
407
- benchmark_t0 = time.monotonic()
408
- for doc in documents:
409
- try:
410
- initial = initial_inputs_factory(doc)
411
- except Exception as exc: # noqa: BLE001
412
- logger.warning(
413
- "[pipeline_benchmark] factory a levé sur %s : %s",
414
- doc.doc_id, exc,
415
- )
416
- failed = PipelineResult(
417
- pipeline_name=spec.name, doc_id=doc.doc_id,
418
- error=f"initial_inputs_factory: {type(exc).__name__}: {exc}",
419
- )
420
- result.per_doc_results.append(failed)
421
- continue
422
- per_doc = _run_one_document_via_canonical(spec, doc, initial, plan)
423
- result.per_doc_results.append(per_doc)
424
- result.total_duration_seconds = time.monotonic() - benchmark_t0
425
-
426
- # Agrégation par étape (logique inchangée).
427
- step_names = [step.name for step in spec.steps]
428
- for idx, step_name in enumerate(step_names):
429
- per_doc_step: list[tuple[str, Any]] = []
430
- for pr in result.per_doc_results:
431
- if idx < len(pr.steps):
432
- per_doc_step.append((pr.doc_id, pr.steps[idx]))
433
- else:
434
- per_doc_step.append((pr.doc_id, None))
435
- result.per_step_aggregates.append(
436
- _aggregate_step(step_name, per_doc_step),
437
- )
438
-
439
- return result
440
-
441
-
442
- # ──────────────────────────────────────────────────────────────────────────
443
- # Phase 7.B.3 — exécution mono-document via le canonique
444
- # ──────────────────────────────────────────────────────────────────────────
445
-
446
-
447
- def _initial_input_types_for_corpus(
448
- documents: list[Document],
449
- factory: InitialInputsFactory,
450
- ) -> tuple[ArtifactType, ...]:
451
- """Inspecte le premier document pour déduire les types initiaux.
452
-
453
- Sprint 64 : la factory peut produire des types différents par
454
- document (rare, mais possible). Pour la planification corpus-wide,
455
- on prend ceux du premier document avec une factory réussie. Si la
456
- factory lève sur tous les documents, on retourne ``()`` — la
457
- validation amont remontera les inputs manquants par document.
458
- """
459
- for doc in documents:
460
- try:
461
- initial = factory(doc)
462
- except Exception: # noqa: BLE001
463
- continue
464
- return tuple(initial.keys())
465
- return ()
466
-
467
-
468
- def _run_one_document_via_canonical(
469
- spec: PipelineSpec,
470
- document: Document,
471
- initial_inputs: dict[ArtifactType, Any],
472
- plan,
473
- ) -> PipelineResult:
474
- """Exécute ``spec`` sur ``document`` via le ``ExecutionPlan``
475
- pré-calculé du corpus.
476
-
477
- Le plan canonique est partagé entre tous les documents (économie
478
- de planification). L'``adapter_resolver`` et le registre de
479
- payloads sont créés par doc — exigence du contrat
480
- :class:`_BaseModuleAdapter`.
481
- """
482
- registry = _PayloadRegistry()
483
- canonical_inputs = wrap_initial_inputs(
484
- initial_inputs, registry, document.doc_id,
485
- )
486
- adapter_map = {
487
- step.name: _BaseModuleAdapter(step.module, registry)
488
- for step in spec.steps
489
- }
490
- document_ref = DocumentRef(id=document.doc_id)
491
- context = RunContext(
492
- document_id=document.doc_id,
493
- code_version="legacy_runner",
494
- pipeline_name=spec.name,
495
- )
496
- executor = PipelineExecutor(adapter_resolver=adapter_map.__getitem__)
497
- try:
498
- canonical_result = executor.run_plan(
499
- plan, document_ref, canonical_inputs, context,
500
- )
501
- except PipelineSpecInvalid as exc: # pragma: no cover
502
- # Branche défensive : ne devrait pas arriver puisque le plan
503
- # a déjà été validé par le planner en amont (avant la boucle
504
- # documents). L'executor ne peut pas re-lever
505
- # PipelineSpecInvalid à ``run_plan`` qui consomme un plan
506
- # déjà validé — mais on défend en profondeur.
507
- return PipelineResult(
508
- pipeline_name=spec.name, doc_id=document.doc_id,
509
- error=f"executor_run_failed: {exc}",
510
- )
511
- return build_legacy_pipeline_result(
512
- spec, document, canonical_result, registry,
513
- )
514
-
515
-
516
- __all__ = [
517
- "InitialInputsFactory",
518
- "PipelineBenchmarkResult",
519
- "StepAggregate",
520
- "default_initial_inputs",
521
- "run_pipeline_benchmark",
522
- ]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
picarones/pipeline/legacy_pipeline_comparison.py DELETED
@@ -1,307 +0,0 @@
1
- """Comparaison de N pipelines sur le même corpus — Sprint 65 (axe B).
2
-
3
- Phase 5.C.batch7 — module relocalisé depuis
4
- ``picarones.measurements.pipeline_comparison`` vers
5
- ``picarones.evaluation.pipeline_comparison``. Le chemin legacy
6
- reste disponible via un shim avec ``DeprecationWarning`` ;
7
- suppression prévue en 2.0.
8
-
9
- Sprint 65 — Étape 4 / axe B du plan d'évolution 2026 : suite directe
10
- des Sprints 63-64. Le runner mono-document (Sprint 63) et
11
- l'orchestration corpus-wide (Sprint 64) permettent d'évaluer **une**
12
- pipeline composée ; ce sprint répond à la question typique BnF :
13
-
14
- « OCR seul vs OCR+correcteur A vs OCR+correcteur B :
15
- laquelle est la meilleure sur mon corpus, et de combien ? »
16
-
17
- Philosophie inchangée
18
- ---------------------
19
- Picarones reste un **banc d'essai** — on juge des pipelines tierces
20
- sur le **même corpus** avec la **même GT**, en exposant des chiffres
21
- bruts comparatifs. Aucun verdict imposé : le chercheur lit le
22
- ranking et la table de gain et conclut selon ses critères.
23
-
24
- Périmètre Sprint 65
25
- -------------------
26
- Inclus :
27
-
28
- - ``compare_pipelines(specs, corpus, factories=None)`` qui exécute
29
- séquentiellement N pipelines sur le même corpus.
30
- - ``PipelineComparisonResult`` : conteneur avec
31
- ``per_pipeline: dict[name → PipelineBenchmarkResult]``,
32
- ``ranking_by_final_metric(artifact_type, metric_name,
33
- higher_is_better)`` qui retourne ``[(pipeline_name, score), ...]``
34
- trié, et ``gain_table(artifact_type, metric_name,
35
- baseline_pipeline)`` qui retourne pour chaque pipeline le
36
- ``{absolute, relative}`` vs baseline.
37
- - ``factories``: dict ``{pipeline_name: InitialInputsFactory}`` pour
38
- personnaliser les entrées initiales par pipeline (utile pour
39
- comparer une pipeline qui démarre par IMAGE et une qui démarre
40
- par TEXT).
41
- - Garde-fou : noms de pipelines uniques exigés.
42
-
43
- Reporté à des sprints suivants :
44
-
45
- - DAG branchant non séquentiel (Sprint 66).
46
- - Vue HTML dédiée à la comparaison de pipelines (Sprint 67+).
47
- - Tests statistiques (Wilcoxon, Friedman, Nemenyi) sur les
48
- pipelines composées — déjà disponibles côté OCR (Sprint 18) ;
49
- l'application au cadre pipeline arrive plus tard.
50
- """
51
-
52
- from __future__ import annotations
53
-
54
- import logging
55
- import time
56
- from dataclasses import dataclass, field
57
- from typing import Optional
58
-
59
- from picarones.evaluation.corpus import Corpus
60
- from picarones.domain.artifacts import ArtifactType
61
- from picarones.pipeline.legacy_pipeline_benchmark import (
62
- InitialInputsFactory,
63
- PipelineBenchmarkResult,
64
- default_initial_inputs,
65
- run_pipeline_benchmark,
66
- )
67
- from picarones.pipeline.legacy_runner import PipelineSpec
68
-
69
- logger = logging.getLogger(__name__)
70
-
71
-
72
- # ──────────────────────────────────────────────────────────────────────────
73
- # Conteneur de résultats
74
- # ──────────────────────────────────────────────────────────────────────────
75
-
76
-
77
- @dataclass
78
- class PipelineComparisonResult:
79
- """Résultat de la comparaison de N pipelines sur un corpus.
80
-
81
- Champs
82
- ------
83
- corpus_name:
84
- Nom du corpus (commun à toutes les pipelines comparées).
85
- n_docs:
86
- Nombre de documents du corpus.
87
- per_pipeline:
88
- Map ``{pipeline_name: PipelineBenchmarkResult}``. L'ordre
89
- d'insertion suit l'ordre des ``specs`` passées à
90
- ``compare_pipelines`` ; on s'appuie sur le ``dict`` ordonné
91
- de Python 3.7+.
92
- total_duration_seconds:
93
- Durée totale de la comparaison (sommes des durées par
94
- pipeline + petit overhead).
95
- """
96
-
97
- corpus_name: str
98
- n_docs: int = 0
99
- per_pipeline: dict[str, PipelineBenchmarkResult] = field(
100
- default_factory=dict,
101
- )
102
- total_duration_seconds: float = 0.0
103
-
104
- def pipeline_names(self) -> list[str]:
105
- """Retourne la liste des noms de pipelines dans leur ordre
106
- d'insertion (= ordre de la comparaison initiale)."""
107
- return list(self.per_pipeline.keys())
108
-
109
- def _final_metric_value(
110
- self,
111
- pipeline_name: str,
112
- artifact_type: ArtifactType,
113
- metric_name: str,
114
- ) -> Optional[float]:
115
- """Retourne le ``mean`` de la métrique demandée à la
116
- **dernière étape** de la pipeline qui a produit
117
- ``artifact_type`` (avec succès sur ≥ 1 doc), ou ``None``
118
- si la métrique n'est pas disponible.
119
-
120
- Cohérent avec ``PipelineResult.junction_metrics_for`` du
121
- Sprint 63 mais au niveau corpus-wide.
122
- """
123
- bench = self.per_pipeline.get(pipeline_name)
124
- if bench is None:
125
- return None
126
- from picarones.domain.artifacts import LEGACY_VALUE_ALIASES
127
- legacy_alias = LEGACY_VALUE_ALIASES.get(artifact_type.value)
128
- for agg in reversed(bench.per_step_aggregates):
129
- type_metrics = agg.junction_metrics.get(artifact_type.value)
130
- if not type_metrics and legacy_alias is not None:
131
- # Phase 4-bis : un caller (typiquement les tests
132
- # ou un agrégateur tiers) peut avoir construit le
133
- # dict avec la clé legacy ``"text"`` au lieu de la
134
- # canonique ``"raw_text"``. expand_legacy_keys
135
- # synchronise les deux côtés sur les sites
136
- # d'écriture du runner — ce fallback couvre le
137
- # reste.
138
- type_metrics = agg.junction_metrics.get(legacy_alias)
139
- if not type_metrics:
140
- continue
141
- stats = type_metrics.get(metric_name)
142
- if stats is None:
143
- continue
144
- return stats["mean"]
145
- return None
146
-
147
- def ranking_by_final_metric(
148
- self,
149
- artifact_type: ArtifactType,
150
- metric_name: str,
151
- higher_is_better: bool = False,
152
- ) -> list[tuple[str, Optional[float]]]:
153
- """Classe les pipelines par la valeur **finale** de
154
- ``metric_name`` à la jonction ``artifact_type``.
155
-
156
- Returns
157
- -------
158
- list[tuple[str, Optional[float]]]
159
- Liste ``[(pipeline_name, mean_value)]`` triée :
160
-
161
- - Les pipelines avec une valeur définie viennent en
162
- premier, triées selon ``higher_is_better``.
163
- - Les pipelines sans valeur (métrique absente) viennent
164
- en queue, dans leur ordre d'insertion.
165
- """
166
- with_value: list[tuple[str, float]] = []
167
- without_value: list[tuple[str, Optional[float]]] = []
168
- for name in self.pipeline_names():
169
- value = self._final_metric_value(name, artifact_type, metric_name)
170
- if value is None:
171
- without_value.append((name, None))
172
- else:
173
- with_value.append((name, value))
174
- with_value.sort(
175
- key=lambda pair: pair[1],
176
- reverse=higher_is_better,
177
- )
178
- return [*with_value, *without_value]
179
-
180
- def gain_table(
181
- self,
182
- artifact_type: ArtifactType,
183
- metric_name: str,
184
- baseline_pipeline: str,
185
- ) -> dict[str, dict[str, Optional[float]]]:
186
- """Calcule l'écart de chaque pipeline vs la baseline.
187
-
188
- Returns
189
- -------
190
- dict
191
- Map ``{pipeline_name: {"value", "absolute", "relative"}}``
192
- où :
193
-
194
- - ``value`` : valeur finale de la métrique pour cette
195
- pipeline (``None`` si absente).
196
- - ``absolute`` : ``value - baseline_value``
197
- (``None`` si l'une des deux est absente).
198
- - ``relative`` : ``(value - baseline_value) /
199
- baseline_value`` (``None`` si baseline absente ou
200
- égale à 0).
201
-
202
- La baseline elle-même apparaît avec ``absolute == 0`` et
203
- ``relative == 0``.
204
- """
205
- if baseline_pipeline not in self.per_pipeline:
206
- raise KeyError(
207
- f"baseline {baseline_pipeline!r} absente de la comparaison",
208
- )
209
- baseline_value = self._final_metric_value(
210
- baseline_pipeline, artifact_type, metric_name,
211
- )
212
- out: dict[str, dict[str, Optional[float]]] = {}
213
- for name in self.pipeline_names():
214
- value = self._final_metric_value(
215
- name, artifact_type, metric_name,
216
- )
217
- absolute: Optional[float]
218
- relative: Optional[float]
219
- if value is None or baseline_value is None:
220
- absolute = None
221
- relative = None
222
- else:
223
- absolute = value - baseline_value
224
- relative = (
225
- (value - baseline_value) / baseline_value
226
- if baseline_value != 0 else None
227
- )
228
- out[name] = {
229
- "value": value,
230
- "absolute": absolute,
231
- "relative": relative,
232
- }
233
- return out
234
-
235
-
236
- # ──────────────────────────────────────────────────────────────────────────
237
- # Orchestrateur
238
- # ──────────────────────────────────────────────────────────────────────────
239
-
240
-
241
- def compare_pipelines(
242
- specs: list[PipelineSpec],
243
- corpus: Corpus,
244
- factories: Optional[dict[str, InitialInputsFactory]] = None,
245
- ) -> PipelineComparisonResult:
246
- """Exécute N ``PipelineSpec`` sur le **même** ``corpus``.
247
-
248
- Parameters
249
- ----------
250
- specs:
251
- Liste de ``PipelineSpec``. Les noms de pipelines doivent
252
- être uniques (sinon ``ValueError``).
253
- corpus:
254
- Corpus partagé entre toutes les pipelines comparées —
255
- c'est le point fort du sprint : même corpus, même GT, on
256
- peut comparer apple-to-apple.
257
- factories:
258
- Optionnel. Si fourni, dict ``{pipeline_name:
259
- InitialInputsFactory}`` pour personnaliser les entrées
260
- initiales par pipeline. Les pipelines absentes du dict
261
- utilisent ``default_initial_inputs`` (cas standard
262
- ``IMAGE`` depuis ``Document.image_path``).
263
-
264
- Returns
265
- -------
266
- PipelineComparisonResult
267
- Conteneur avec ``per_pipeline`` indexé par nom et
268
- utilitaires comparatifs (``ranking_by_final_metric``,
269
- ``gain_table``).
270
-
271
- Raises
272
- ------
273
- ValueError
274
- Si deux ``PipelineSpec`` ont le même nom (impossible alors
275
- de les distinguer dans le résultat).
276
- """
277
- names = [s.name for s in specs]
278
- if len(set(names)) != len(names):
279
- seen: set[str] = set()
280
- duplicates: list[str] = []
281
- for n in names:
282
- if n in seen:
283
- duplicates.append(n)
284
- seen.add(n)
285
- raise ValueError(
286
- f"noms de pipelines non uniques : {sorted(set(duplicates))}",
287
- )
288
-
289
- factories = factories or {}
290
- result = PipelineComparisonResult(
291
- corpus_name=corpus.name,
292
- n_docs=len(list(corpus.documents)),
293
- )
294
-
295
- t0 = time.monotonic()
296
- for spec in specs:
297
- factory = factories.get(spec.name, default_initial_inputs)
298
- bench = run_pipeline_benchmark(spec, corpus, factory)
299
- result.per_pipeline[spec.name] = bench
300
- result.total_duration_seconds = time.monotonic() - t0
301
- return result
302
-
303
-
304
- __all__ = [
305
- "PipelineComparisonResult",
306
- "compare_pipelines",
307
- ]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
picarones/pipeline/legacy_runner.py DELETED
@@ -1,487 +0,0 @@
1
- """Banc d'essai de pipelines composées — Sprint 63 (axe B).
2
-
3
- Phase 5.C.batch7 — module relocalisé depuis
4
- ``picarones.core.pipeline`` vers ``picarones.evaluation.pipeline``.
5
- Shim ``picarones.core.pipeline`` retiré au Lot C (2026-05-07).
6
-
7
- Phase 7.B.2 — module relocalisé une seconde fois
8
- ------------------------------------------------
9
- ``picarones.evaluation.pipeline`` → ``picarones.pipeline.legacy_runner``.
10
- La délégation à :class:`PipelineExecutor` (ci-dessous) exige d'importer
11
- la couche ``pipeline/``, ce que la règle d'architecture concentrique
12
- interdit à ``evaluation/`` (whitelist externe restreinte, pas de
13
- dépendance sortante vers une couche plus externe — cf. CLAUDE.md
14
- § "architecture des couches"). Le module bridge legacy ↔ canonique
15
- vit donc dans la couche ``pipeline/``. ``picarones.evaluation.pipeline``
16
- reste exposé en re-export shim le temps que les callers historiques
17
- migrent.
18
-
19
- Phase 7.B.2 — délégation au ``PipelineExecutor`` canonique
20
- ----------------------------------------------------------
21
- Depuis 2026-05, ``PipelineRunner.run`` ne porte **plus** sa propre
22
- boucle d'exécution. Le corps de la méthode délègue intégralement à
23
- :class:`picarones.pipeline.executor.PipelineExecutor` via le wrapper
24
- :class:`picarones.pipeline._legacy_module_adapter._BaseModuleAdapter`
25
- (créé en 7.B.1). Le runner ne conserve que :
26
-
27
- 1. La validation amont legacy (préservation des messages d'erreur
28
- français du Sprint 63 — ``"étape N (X) demande Y qui n'est ni…"``).
29
- 2. La traduction des résultats canoniques (``pipeline.types.StepResult``
30
- Pydantic) vers les types legacy (``StepResult``, ``PipelineResult``
31
- dataclass) attendus par les ~440 tests existants.
32
- 3. Le calcul des ``junction_metrics`` aux jonctions GT-vs-sortie —
33
- le canonique laisse cette responsabilité au caller (`MetricRegistry`
34
- intégré au planner mais évaluation déférée).
35
-
36
- Cela élimine la duplication de moteur d'exécution (un seul code
37
- path) tout en préservant intégralement l'API publique du Sprint 63
38
- le temps que la sub-phase 7.C migre les tests vers le canonique
39
- direct, puis 7.D supprime le runner legacy.
40
-
41
- Sprint 63 — Étape 4 / axe B du plan d'évolution 2026 : démarrage du
42
- banc d'essai de pipelines.
43
-
44
- Philosophie
45
- -----------
46
- Picarones est un **banc d'essai**, pas un atelier de production.
47
- Cette infrastructure permet d'**évaluer des pipelines composées de
48
- modules tiers** que l'utilisateur amène — par exemple :
49
-
50
- - ``[OCR(image→texte)] → [reconstructeur ALTO tiers(texte→ALTO)]``
51
- - ``[VLM(image→ALTO)] → [post-processing tiers(ALTO→ALTO)]``
52
- - ``[OCR(image→texte)] → [LLM correcteur(texte→texte)]``
53
-
54
- Picarones **ne fournit aucun module métier** (pas de
55
- reconstructeur ALTO, pas de correcteur, pas de re-segmenteur).
56
- L'utilisateur branche ses propres ``BaseModule`` (Sprint 33), le
57
- runner orchestre l'exécution séquentielle, valide les types aux
58
- jonctions et **évalue automatiquement** chaque artefact produit
59
- contre la GT du même niveau (Sprint 32) en sélectionnant les
60
- métriques pertinentes du registre typé (Sprint 34).
61
-
62
- Périmètre Sprint 63
63
- -------------------
64
- Inclus :
65
-
66
- - Spécification déclarative d'une pipeline séquentielle.
67
- - Exécution sur un seul document avec passage typé d'artefacts.
68
- - Validation des types aux jonctions inter-modules.
69
- - Évaluation automatique aux jonctions GT-vs-sortie pour chaque
70
- niveau de GT disponible sur le document.
71
- - Mesure du temps par étape.
72
- - Capture gracieuse des erreurs (un module qui lève n'arrête pas
73
- les étapes suivantes — leur entrée manquante est rapportée
74
- comme erreur explicite).
75
-
76
- Reporté à des sprints dédiés :
77
-
78
- - DAG branchant non séquentiel (1 → {2, 3} → 4) — Sprint 64+.
79
- - Orchestration corpus-wide + agrégation par pipeline — Sprint 65+.
80
- - Vue HTML dédiée aux pipelines composées — Sprint 66+.
81
- - Cache d'artefacts intermédiaires — non prévu.
82
- - Parallélisation inter-étapes — non prévue (les modules
83
- ``execution_mode`` sont déjà respectés par le runner historique
84
- pour le bench OCR mono-étage).
85
- """
86
-
87
- from __future__ import annotations
88
-
89
- import logging
90
- from dataclasses import dataclass, field
91
- from typing import Any, Optional
92
-
93
- from picarones.evaluation.corpus import Document
94
- from picarones.domain.artifacts import ArtifactType
95
- from picarones.domain.module_protocol import BaseModule
96
- from picarones.pipeline._legacy_translator import ( # noqa: F401
97
- # ``_artifact_type_to_gt_level`` et ``_gt_payload_to_value`` sont
98
- # ré-exportés (alias avec préfixe ``_``) pour préserver l'API
99
- # privée historique consommée par quelques tests intégration.
100
- # Suppression en sub-phase 7.D avec le runner lui-même.
101
- artifact_type_to_gt_level as _artifact_type_to_gt_level,
102
- build_legacy_pipeline_result,
103
- execute_legacy_spec_via_canonical,
104
- gt_payload_to_value as _gt_payload_to_value,
105
- )
106
-
107
- # Sprint A3 (renforce la règle Cercle 1 → Cercle 1 uniquement) — la
108
- # cérémonie d'eager-load des métriques typées (Sprint 34) qui vivait
109
- # ici a été déplacée dans ``picarones/measurements/__init__.py``. Tout
110
- # consommateur de ``compute_at_junction`` (typiquement la classe
111
- # ``PipelineRunner`` ci-dessous) doit avoir importé
112
- # ``picarones.measurements`` au moins une fois — c'est le cas dans
113
- # l'API publique via ``picarones.__init__`` qui déclenche le trigger.
114
-
115
- logger = logging.getLogger(__name__)
116
-
117
-
118
- # Phase 7.B.3 : ``_artifact_type_to_gt_level`` et ``_gt_payload_to_value``
119
- # ont migré vers :mod:`picarones.pipeline._legacy_translator` et sont
120
- # ré-exportés via les imports en tête de module pour préserver l'API
121
- # ``from picarones.pipeline.legacy_runner import _artifact_type_to_gt_level``
122
- # qui est utilisée par les anciens tests intégration (sera supprimée
123
- # en 7.D avec le runner lui-même).
124
-
125
-
126
- # ──────────────────────────────────────────────────────────────────────────
127
- # PipelineStep + PipelineSpec
128
- # ──────────────────────────────────────────────────────────────────────────
129
-
130
-
131
- @dataclass
132
- class PipelineStep:
133
- """Une étape dans une pipeline composée.
134
-
135
- L'étape porte un nom lisible (utile pour le rapport et le
136
- diagnostic) et une instance de ``BaseModule`` fournie par
137
- l'utilisateur. Les types d'entrée et de sortie ne sont pas
138
- redéclarés ici : ils sont lus depuis le module lui-même
139
- (``module.input_types`` / ``module.output_types``).
140
-
141
- Sprint 66 — DAG branchant
142
- -------------------------
143
- ``inputs_from`` permet de désigner explicitement, pour chaque
144
- type d'entrée, l'étape source dont on veut consommer l'artefact.
145
- Utile quand plusieurs étapes antérieures produisent le même
146
- type et qu'on veut éviter l'écrasement implicite (par exemple
147
- deux correcteurs LLM en parallèle qui partent du même OCR).
148
-
149
- - ``inputs_from = {}`` (défaut) : pour chaque type d'entrée,
150
- le runner prend la version **la plus récente** disponible
151
- dans le bag (comportement Sprint 63, rétrocompat stricte).
152
- - ``inputs_from = {ArtifactType.TEXT: "ocr"}`` : exige la
153
- version du ``TEXT`` produite par l'étape nommée ``"ocr"``.
154
- Si cette étape n'existe pas ou n'a pas produit ce type,
155
- ``PipelineSpec.validate`` remonte un problème explicite et
156
- le runner remonte une erreur d'entrée manquante.
157
-
158
- La chaîne spéciale ``"__initial__"`` désigne les artefacts
159
- fournis dans ``initial_inputs`` (par exemple ``IMAGE``).
160
- """
161
-
162
- name: str
163
- module: BaseModule
164
- inputs_from: dict[ArtifactType, str] = field(default_factory=dict)
165
-
166
- @property
167
- def input_types(self) -> tuple[ArtifactType, ...]:
168
- return tuple(self.module.input_types)
169
-
170
- @property
171
- def output_types(self) -> tuple[ArtifactType, ...]:
172
- return tuple(self.module.output_types)
173
-
174
- def __repr__(self) -> str:
175
- ins = ",".join(t.value for t in self.input_types) or "·"
176
- outs = ",".join(t.value for t in self.output_types) or "·"
177
- if self.inputs_from:
178
- refs = ",".join(
179
- f"{t.value}@{src}" for t, src in self.inputs_from.items()
180
- )
181
- return f"PipelineStep({self.name}: [{refs}] → {outs})"
182
- return f"PipelineStep({self.name}: {ins} → {outs})"
183
-
184
-
185
- @dataclass
186
- class PipelineSpec:
187
- """DAG séquentiel de ``PipelineStep``.
188
-
189
- Sprint 63 — séquentiel uniquement : l'étape ``i+1`` consomme
190
- les artefacts produits par l'étape ``i`` (et tous les artefacts
191
- initiaux fournis au runner, par exemple l'image source).
192
-
193
- Le DAG branchant arrive dans un sprint dédié.
194
- """
195
-
196
- name: str
197
- steps: list[PipelineStep] = field(default_factory=list)
198
-
199
- def validate(self, initial_inputs: tuple[ArtifactType, ...]) -> list[str]:
200
- """Vérifie que les types s'enchaînent et retourne la liste
201
- des problèmes détectés (vide si la pipeline est valide).
202
-
203
- Une pipeline est valide si, pour chaque étape, tous les
204
- ``input_types`` sont disponibles : soit dans les
205
- ``initial_inputs`` (typiquement ``IMAGE``), soit produits
206
- par une étape antérieure.
207
-
208
- Sprint 66 — validation des références ``inputs_from`` :
209
- si une étape déclare ``inputs_from[type] = "foo"``,
210
- l'étape ``foo`` doit exister parmi les étapes antérieures
211
- et avoir ce type dans ses ``output_types``. La chaîne
212
- spéciale ``"__initial__"`` désigne les entrées initiales.
213
- """
214
- problems: list[str] = []
215
- if not self.steps:
216
- problems.append("pipeline vide : au moins une étape est requise")
217
- return problems
218
- # Map type → set des steps qui ont produit ce type
219
- # ("__initial__" pour les entrées initiales) — utilisé pour
220
- # valider les références ``inputs_from``.
221
- producers: dict[ArtifactType, set[str]] = {
222
- t: {"__initial__"} for t in initial_inputs
223
- }
224
- # Map step_name → set des types produits, pour la validation
225
- # des références.
226
- step_outputs: dict[str, set[ArtifactType]] = {
227
- "__initial__": set(initial_inputs),
228
- }
229
- # Set des types disponibles à un instant t (latest seulement).
230
- available: set[ArtifactType] = set(initial_inputs)
231
-
232
- for i, step in enumerate(self.steps):
233
- # 1. Toutes les entrées doivent être disponibles
234
- missing = [t for t in step.input_types if t not in available]
235
- if missing:
236
- miss_str = ",".join(t.value for t in missing)
237
- problems.append(
238
- f"étape {i} ({step.name}) demande {miss_str} "
239
- f"qui n'est ni dans les entrées initiales "
240
- f"ni produit par une étape antérieure"
241
- )
242
- # 2. Vérification des références ``inputs_from``
243
- for ref_type, ref_step in step.inputs_from.items():
244
- if ref_type not in step.input_types:
245
- problems.append(
246
- f"étape {i} ({step.name}) déclare "
247
- f"inputs_from[{ref_type.value}]={ref_step!r} "
248
- f"mais le module ne consomme pas ce type"
249
- )
250
- continue
251
- if ref_step not in step_outputs:
252
- problems.append(
253
- f"étape {i} ({step.name}) référence "
254
- f"inputs_from[{ref_type.value}]={ref_step!r} "
255
- f"qui n'est pas une étape antérieure connue"
256
- )
257
- continue
258
- if ref_type not in step_outputs[ref_step]:
259
- problems.append(
260
- f"étape {i} ({step.name}) référence "
261
- f"inputs_from[{ref_type.value}]={ref_step!r} "
262
- f"mais cette étape ne produit pas ce type"
263
- )
264
- # 3. Mise à jour pour les étapes suivantes
265
- available.update(step.output_types)
266
- step_outputs[step.name] = set(step.output_types)
267
- for out_type in step.output_types:
268
- producers.setdefault(out_type, set()).add(step.name)
269
- return problems
270
-
271
- def is_valid(self, initial_inputs: tuple[ArtifactType, ...]) -> bool:
272
- return not self.validate(initial_inputs)
273
-
274
- def __repr__(self) -> str:
275
- chain = " → ".join(str(s) for s in self.steps)
276
- return f"PipelineSpec({self.name}: {chain})"
277
-
278
-
279
- # ──────────────────────────────────────────────────────────────────────────
280
- # StepResult + PipelineResult
281
- # ──────────────────────────────────────────────────────────────────────────
282
-
283
-
284
- @dataclass
285
- class StepResult:
286
- """Résultat de l'exécution d'une étape sur un document.
287
-
288
- Champs
289
- ------
290
- step_name:
291
- Nom de l'étape (cf. ``PipelineStep.name``).
292
- duration_seconds:
293
- Temps d'exécution de ``module.process`` mesuré en wall-clock.
294
- output_types:
295
- Types effectivement présents dans la sortie (peut être un
296
- sous-ensemble de ``module.output_types`` si le module a
297
- omis un type — cas reporté ici comme info pour diagnostic).
298
- junction_metrics:
299
- Pour chaque type produit qui correspond à un ``GTLevel``
300
- dont le document porte une GT : dictionnaire ``{type: dict
301
- métriques}`` retourné par ``compute_at_junction``.
302
- error:
303
- ``None`` si l'étape s'est bien déroulée ; sinon message
304
- d'erreur (le module a levé, l'entrée est manquante, ou la
305
- validation des types a échoué).
306
- """
307
-
308
- step_name: str
309
- duration_seconds: float
310
- output_types: tuple[ArtifactType, ...]
311
- junction_metrics: dict[str, dict[str, Any]] = field(default_factory=dict)
312
- """Map ``{artifact_type_value: {metric_name: value}}``.
313
-
314
- La clé est la valeur string du ``ArtifactType`` (ex. ``"text"``,
315
- ``"alto"``) et non l'enum lui-même, pour faciliter la
316
- sérialisation JSON.
317
- """
318
- error: Optional[str] = None
319
-
320
-
321
- @dataclass
322
- class PipelineResult:
323
- """Résultat complet d'une exécution de pipeline sur un document.
324
-
325
- On capture la durée totale, la durée par étape et les
326
- métriques aux jonctions pour chaque artefact produit qui a une
327
- GT correspondante.
328
- """
329
-
330
- pipeline_name: str
331
- doc_id: str
332
- steps: list[StepResult] = field(default_factory=list)
333
- total_duration_seconds: float = 0.0
334
- error: Optional[str] = None
335
- """Erreur fatale au niveau pipeline (ex. validation des types
336
- en amont avant la première étape). ``None`` n'implique pas
337
- qu'aucune étape n'a échoué — voir ``StepResult.error`` pour le
338
- détail par étape."""
339
-
340
- @property
341
- def succeeded(self) -> bool:
342
- """Vrai si la pipeline s'est exécutée jusqu'au bout sans
343
- qu'aucune étape ne lève d'erreur."""
344
- if self.error is not None:
345
- return False
346
- return all(s.error is None for s in self.steps)
347
-
348
- @property
349
- def failing_steps(self) -> list[str]:
350
- """Noms des étapes ayant levé une erreur."""
351
- return [s.step_name for s in self.steps if s.error is not None]
352
-
353
- def junction_metrics_for(
354
- self, artifact_type: ArtifactType,
355
- ) -> Optional[dict[str, Any]]:
356
- """Retourne les métriques de la **dernière** étape qui a
357
- produit ``artifact_type``, ou ``None`` si aucune étape ne
358
- l'a produit avec succès.
359
-
360
- Utile pour comparer plusieurs pipelines qui produisent in
361
- fine le même type (ex. deux DAG aboutissant à du texte
362
- corrigé).
363
- """
364
- from picarones.domain.artifacts import LEGACY_VALUE_ALIASES
365
- legacy_alias = LEGACY_VALUE_ALIASES.get(artifact_type.value)
366
- for step in reversed(self.steps):
367
- if step.error is not None:
368
- continue
369
- metrics = step.junction_metrics.get(artifact_type.value)
370
- if metrics is None and legacy_alias is not None:
371
- # Phase 4-bis : un caller legacy peut avoir construit
372
- # le dict avec la clé pré-rewrite ("text" au lieu de
373
- # "raw_text"). expand_legacy_keys synchronise les deux
374
- # côtés sur les sites d'écriture du runner, mais des
375
- # StepResult construits à la main par les tests ou par
376
- # un caller externe peuvent encore avoir une seule
377
- # clé — on tolère.
378
- metrics = step.junction_metrics.get(legacy_alias)
379
- if metrics is not None:
380
- return metrics
381
- return None
382
-
383
-
384
- # ──────────────────────────────────────────────────────────────────────────
385
- # Exécuteur
386
- # ──────────────────────────────────────────────────────────────────────────
387
-
388
-
389
- class PipelineRunner:
390
- """Exécute une ``PipelineSpec`` sur un document.
391
-
392
- Sprint 63 — un seul document à la fois. L'orchestration
393
- corpus-wide et l'agrégation par pipeline sont reportées à un
394
- sprint dédié.
395
-
396
- Phase 7.B.2 — délégation au canonique
397
- --------------------------------------
398
- L'API publique (``run`` statique, types de retour ``PipelineResult``
399
- et ``StepResult`` legacy, format des messages d'erreur en français)
400
- est rigoureusement préservée pour rétrocompat. Le corps de
401
- ``run`` délègue à :class:`picarones.pipeline.executor.PipelineExecutor`
402
- via :class:`_BaseModuleAdapter` — il n'y a plus de code de
403
- boucle d'exécution dupliqué.
404
-
405
- Usage typique
406
- -------------
407
-
408
- >>> spec = PipelineSpec(
409
- ... name="ocr_then_rewrite",
410
- ... steps=[
411
- ... PipelineStep("ocr", my_ocr_module),
412
- ... PipelineStep("rewrite", my_llm_rewriter),
413
- ... ],
414
- ... )
415
- >>> runner = PipelineRunner()
416
- >>> result = runner.run(spec, document, {ArtifactType.IMAGE: "/path/img.png"})
417
- >>> result.succeeded
418
- True
419
- >>> result.junction_metrics_for(ArtifactType.TEXT)
420
- {'cer': 0.05, 'wer': 0.12, ...}
421
- """
422
-
423
- @staticmethod
424
- def run(
425
- spec: PipelineSpec,
426
- document: Document,
427
- initial_inputs: dict[ArtifactType, Any],
428
- ) -> PipelineResult:
429
- """Exécute ``spec`` sur ``document`` à partir de
430
- ``initial_inputs``.
431
-
432
- Parameters
433
- ----------
434
- spec:
435
- Spécification de la pipeline.
436
- document:
437
- Document du corpus, porteur de zéro ou plusieurs niveaux
438
- de GT (Sprint 32).
439
- initial_inputs:
440
- Artefacts initiaux par type — typiquement
441
- ``{ArtifactType.IMAGE: "/path/img.png"}`` pour une
442
- pipeline qui démarre par un OCR.
443
-
444
- Returns
445
- -------
446
- PipelineResult
447
- Résultat complet : durée totale, résultat par étape,
448
- métriques aux jonctions évaluées contre la GT.
449
- """
450
- result = PipelineResult(
451
- pipeline_name=spec.name, doc_id=document.doc_id,
452
- )
453
-
454
- # Validation amont legacy : si la pipeline est statiquement
455
- # invalide, on n'exécute aucune étape. Cette validation
456
- # produit des messages français spécifiques au Sprint 63
457
- # (cf. ``PipelineSpec.validate``) que les tests vérifient ;
458
- # le canonique a sa propre ``ValidationError`` au format
459
- # différent — d'où la double validation tant que les tests
460
- # legacy ne sont pas migrés (sub-phase 7.C).
461
- problems = spec.validate(tuple(initial_inputs.keys()))
462
- if problems:
463
- result.error = " ; ".join(problems)
464
- return result
465
-
466
- canonical_result, registry = execute_legacy_spec_via_canonical(
467
- spec, document, initial_inputs,
468
- )
469
- # ``build_legacy_pipeline_result`` reconstruit un PipelineResult
470
- # legacy complet (steps + total_duration) ; on transfère ses
471
- # champs sur l'instance ``result`` pour préserver le ``error``
472
- # déjà vide (pas de validation amont qui aurait court-circuité).
473
- rebuilt = build_legacy_pipeline_result(
474
- spec, document, canonical_result, registry,
475
- )
476
- result.steps = rebuilt.steps
477
- result.total_duration_seconds = rebuilt.total_duration_seconds
478
- return result
479
-
480
-
481
- __all__ = [
482
- "PipelineRunner",
483
- "PipelineResult",
484
- "PipelineSpec",
485
- "PipelineStep",
486
- "StepResult",
487
- ]