Picarones / picarones /measurements /pipeline_comparison.py
Claude
feat(migration): Phase 4-quater โ€” relocate core/corpus.py vers evaluation/
3300273 unverified
Raw
History Blame
11.4 kB
"""Comparaison de N pipelines sur le mรชme corpus โ€” Sprint 65 (axe B).
Sprint 65 โ€” ร‰tape 4 / axe B du plan d'รฉvolution 2026 : suite directe
des Sprints 63-64. Le runner mono-document (Sprint 63) et
l'orchestration corpus-wide (Sprint 64) permettent d'รฉvaluer **une**
pipeline composรฉe ; ce sprint rรฉpond ร  la question typique BnF :
ยซ OCR seul vs OCR+correcteur A vs OCR+correcteur B :
laquelle est la meilleure sur mon corpus, et de combien ? ยป
Philosophie inchangรฉe
---------------------
Picarones reste un **banc d'essai** โ€” on juge des pipelines tierces
sur le **mรชme corpus** avec la **mรชme GT**, en exposant des chiffres
bruts comparatifs. Aucun verdict imposรฉ : le chercheur lit le
ranking et la table de gain et conclut selon ses critรจres.
Pรฉrimรจtre Sprint 65
-------------------
Inclus :
- ``compare_pipelines(specs, corpus, factories=None)`` qui exรฉcute
sรฉquentiellement N pipelines sur le mรชme corpus.
- ``PipelineComparisonResult`` : conteneur avec
``per_pipeline: dict[name โ†’ PipelineBenchmarkResult]``,
``ranking_by_final_metric(artifact_type, metric_name,
higher_is_better)`` qui retourne ``[(pipeline_name, score), ...]``
triรฉ, et ``gain_table(artifact_type, metric_name,
baseline_pipeline)`` qui retourne pour chaque pipeline le
``{absolute, relative}`` vs baseline.
- ``factories``: dict ``{pipeline_name: InitialInputsFactory}`` pour
personnaliser les entrรฉes initiales par pipeline (utile pour
comparer une pipeline qui dรฉmarre par IMAGE et une qui dรฉmarre
par TEXT).
- Garde-fou : noms de pipelines uniques exigรฉs.
Reportรฉ ร  des sprints suivants :
- DAG branchant non sรฉquentiel (Sprint 66).
- Vue HTML dรฉdiรฉe ร  la comparaison de pipelines (Sprint 67+).
- Tests statistiques (Wilcoxon, Friedman, Nemenyi) sur les
pipelines composรฉes โ€” dรฉjร  disponibles cรดtรฉ OCR (Sprint 18) ;
l'application au cadre pipeline arrive plus tard.
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass, field
from typing import Optional
from picarones.evaluation.corpus import Corpus
from picarones.domain.artifacts import ArtifactType
from picarones.measurements.pipeline_benchmark import (
InitialInputsFactory,
PipelineBenchmarkResult,
default_initial_inputs,
run_pipeline_benchmark,
)
from picarones.core.pipeline import PipelineSpec
logger = logging.getLogger(__name__)
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# Conteneur de rรฉsultats
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
@dataclass
class PipelineComparisonResult:
"""Rรฉsultat de la comparaison de N pipelines sur un corpus.
Champs
------
corpus_name:
Nom du corpus (commun ร  toutes les pipelines comparรฉes).
n_docs:
Nombre de documents du corpus.
per_pipeline:
Map ``{pipeline_name: PipelineBenchmarkResult}``. L'ordre
d'insertion suit l'ordre des ``specs`` passรฉes ร 
``compare_pipelines`` ; on s'appuie sur le ``dict`` ordonnรฉ
de Python 3.7+.
total_duration_seconds:
Durรฉe totale de la comparaison (sommes des durรฉes par
pipeline + petit overhead).
"""
corpus_name: str
n_docs: int = 0
per_pipeline: dict[str, PipelineBenchmarkResult] = field(
default_factory=dict,
)
total_duration_seconds: float = 0.0
def pipeline_names(self) -> list[str]:
"""Retourne la liste des noms de pipelines dans leur ordre
d'insertion (= ordre de la comparaison initiale)."""
return list(self.per_pipeline.keys())
def _final_metric_value(
self,
pipeline_name: str,
artifact_type: ArtifactType,
metric_name: str,
) -> Optional[float]:
"""Retourne le ``mean`` de la mรฉtrique demandรฉe ร  la
**derniรจre รฉtape** de la pipeline qui a produit
``artifact_type`` (avec succรจs sur โ‰ฅ 1 doc), ou ``None``
si la mรฉtrique n'est pas disponible.
Cohรฉrent avec ``PipelineResult.junction_metrics_for`` du
Sprint 63 mais au niveau corpus-wide.
"""
bench = self.per_pipeline.get(pipeline_name)
if bench is None:
return None
from picarones.domain.artifacts import LEGACY_VALUE_ALIASES
legacy_alias = LEGACY_VALUE_ALIASES.get(artifact_type.value)
for agg in reversed(bench.per_step_aggregates):
type_metrics = agg.junction_metrics.get(artifact_type.value)
if not type_metrics and legacy_alias is not None:
# Phase 4-bis : un caller (typiquement les tests
# ou un agrรฉgateur tiers) peut avoir construit le
# dict avec la clรฉ legacy ``"text"`` au lieu de la
# canonique ``"raw_text"``. expand_legacy_keys
# synchronise les deux cรดtรฉs sur les sites
# d'รฉcriture du runner โ€” ce fallback couvre le
# reste.
type_metrics = agg.junction_metrics.get(legacy_alias)
if not type_metrics:
continue
stats = type_metrics.get(metric_name)
if stats is None:
continue
return stats["mean"]
return None
def ranking_by_final_metric(
self,
artifact_type: ArtifactType,
metric_name: str,
higher_is_better: bool = False,
) -> list[tuple[str, Optional[float]]]:
"""Classe les pipelines par la valeur **finale** de
``metric_name`` ร  la jonction ``artifact_type``.
Returns
-------
list[tuple[str, Optional[float]]]
Liste ``[(pipeline_name, mean_value)]`` triรฉe :
- Les pipelines avec une valeur dรฉfinie viennent en
premier, triรฉes selon ``higher_is_better``.
- Les pipelines sans valeur (mรฉtrique absente) viennent
en queue, dans leur ordre d'insertion.
"""
with_value: list[tuple[str, float]] = []
without_value: list[tuple[str, Optional[float]]] = []
for name in self.pipeline_names():
value = self._final_metric_value(name, artifact_type, metric_name)
if value is None:
without_value.append((name, None))
else:
with_value.append((name, value))
with_value.sort(
key=lambda pair: pair[1],
reverse=higher_is_better,
)
return [*with_value, *without_value]
def gain_table(
self,
artifact_type: ArtifactType,
metric_name: str,
baseline_pipeline: str,
) -> dict[str, dict[str, Optional[float]]]:
"""Calcule l'รฉcart de chaque pipeline vs la baseline.
Returns
-------
dict
Map ``{pipeline_name: {"value", "absolute", "relative"}}``
oรน :
- ``value`` : valeur finale de la mรฉtrique pour cette
pipeline (``None`` si absente).
- ``absolute`` : ``value - baseline_value``
(``None`` si l'une des deux est absente).
- ``relative`` : ``(value - baseline_value) /
baseline_value`` (``None`` si baseline absente ou
รฉgale ร  0).
La baseline elle-mรชme apparaรฎt avec ``absolute == 0`` et
``relative == 0``.
"""
if baseline_pipeline not in self.per_pipeline:
raise KeyError(
f"baseline {baseline_pipeline!r} absente de la comparaison",
)
baseline_value = self._final_metric_value(
baseline_pipeline, artifact_type, metric_name,
)
out: dict[str, dict[str, Optional[float]]] = {}
for name in self.pipeline_names():
value = self._final_metric_value(
name, artifact_type, metric_name,
)
absolute: Optional[float]
relative: Optional[float]
if value is None or baseline_value is None:
absolute = None
relative = None
else:
absolute = value - baseline_value
relative = (
(value - baseline_value) / baseline_value
if baseline_value != 0 else None
)
out[name] = {
"value": value,
"absolute": absolute,
"relative": relative,
}
return out
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# Orchestrateur
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def compare_pipelines(
specs: list[PipelineSpec],
corpus: Corpus,
factories: Optional[dict[str, InitialInputsFactory]] = None,
) -> PipelineComparisonResult:
"""Exรฉcute N ``PipelineSpec`` sur le **mรชme** ``corpus``.
Parameters
----------
specs:
Liste de ``PipelineSpec``. Les noms de pipelines doivent
รชtre uniques (sinon ``ValueError``).
corpus:
Corpus partagรฉ entre toutes les pipelines comparรฉes โ€”
c'est le point fort du sprint : mรชme corpus, mรชme GT, on
peut comparer apple-to-apple.
factories:
Optionnel. Si fourni, dict ``{pipeline_name:
InitialInputsFactory}`` pour personnaliser les entrรฉes
initiales par pipeline. Les pipelines absentes du dict
utilisent ``default_initial_inputs`` (cas standard
``IMAGE`` depuis ``Document.image_path``).
Returns
-------
PipelineComparisonResult
Conteneur avec ``per_pipeline`` indexรฉ par nom et
utilitaires comparatifs (``ranking_by_final_metric``,
``gain_table``).
Raises
------
ValueError
Si deux ``PipelineSpec`` ont le mรชme nom (impossible alors
de les distinguer dans le rรฉsultat).
"""
names = [s.name for s in specs]
if len(set(names)) != len(names):
seen: set[str] = set()
duplicates: list[str] = []
for n in names:
if n in seen:
duplicates.append(n)
seen.add(n)
raise ValueError(
f"noms de pipelines non uniques : {sorted(set(duplicates))}",
)
factories = factories or {}
result = PipelineComparisonResult(
corpus_name=corpus.name,
n_docs=len(list(corpus.documents)),
)
t0 = time.monotonic()
for spec in specs:
factory = factories.get(spec.name, default_initial_inputs)
bench = run_pipeline_benchmark(spec, corpus, factory)
result.per_pipeline[spec.name] = bench
result.total_duration_seconds = time.monotonic() - t0
return result
__all__ = [
"PipelineComparisonResult",
"compare_pipelines",
]