Spaces:
Sleeping
Sleeping
File size: 11,396 Bytes
7c4ecda 3300273 75b91fd 979f3c3 7c4ecda 979f3c3 7c4ecda 75b91fd 7c4ecda 75b91fd 7c4ecda | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 | """Comparaison de N pipelines sur le même corpus — Sprint 65 (axe B).
Sprint 65 — Étape 4 / axe B du plan d'évolution 2026 : suite directe
des Sprints 63-64. Le runner mono-document (Sprint 63) et
l'orchestration corpus-wide (Sprint 64) permettent d'évaluer **une**
pipeline composée ; ce sprint répond à la question typique BnF :
« OCR seul vs OCR+correcteur A vs OCR+correcteur B :
laquelle est la meilleure sur mon corpus, et de combien ? »
Philosophie inchangée
---------------------
Picarones reste un **banc d'essai** — on juge des pipelines tierces
sur le **même corpus** avec la **même GT**, en exposant des chiffres
bruts comparatifs. Aucun verdict imposé : le chercheur lit le
ranking et la table de gain et conclut selon ses critères.
Périmètre Sprint 65
-------------------
Inclus :
- ``compare_pipelines(specs, corpus, factories=None)`` qui exécute
séquentiellement N pipelines sur le même corpus.
- ``PipelineComparisonResult`` : conteneur avec
``per_pipeline: dict[name → PipelineBenchmarkResult]``,
``ranking_by_final_metric(artifact_type, metric_name,
higher_is_better)`` qui retourne ``[(pipeline_name, score), ...]``
trié, et ``gain_table(artifact_type, metric_name,
baseline_pipeline)`` qui retourne pour chaque pipeline le
``{absolute, relative}`` vs baseline.
- ``factories``: dict ``{pipeline_name: InitialInputsFactory}`` pour
personnaliser les entrées initiales par pipeline (utile pour
comparer une pipeline qui démarre par IMAGE et une qui démarre
par TEXT).
- Garde-fou : noms de pipelines uniques exigés.
Reporté à des sprints suivants :
- DAG branchant non séquentiel (Sprint 66).
- Vue HTML dédiée à la comparaison de pipelines (Sprint 67+).
- Tests statistiques (Wilcoxon, Friedman, Nemenyi) sur les
pipelines composées — déjà disponibles côté OCR (Sprint 18) ;
l'application au cadre pipeline arrive plus tard.
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass, field
from typing import Optional
from picarones.evaluation.corpus import Corpus
from picarones.domain.artifacts import ArtifactType
from picarones.measurements.pipeline_benchmark import (
InitialInputsFactory,
PipelineBenchmarkResult,
default_initial_inputs,
run_pipeline_benchmark,
)
from picarones.core.pipeline import PipelineSpec
logger = logging.getLogger(__name__)
# ──────────────────────────────────────────────────────────────────────────
# Conteneur de résultats
# ──────────────────────────────────────────────────────────────────────────
@dataclass
class PipelineComparisonResult:
"""Résultat de la comparaison de N pipelines sur un corpus.
Champs
------
corpus_name:
Nom du corpus (commun à toutes les pipelines comparées).
n_docs:
Nombre de documents du corpus.
per_pipeline:
Map ``{pipeline_name: PipelineBenchmarkResult}``. L'ordre
d'insertion suit l'ordre des ``specs`` passées à
``compare_pipelines`` ; on s'appuie sur le ``dict`` ordonné
de Python 3.7+.
total_duration_seconds:
Durée totale de la comparaison (sommes des durées par
pipeline + petit overhead).
"""
corpus_name: str
n_docs: int = 0
per_pipeline: dict[str, PipelineBenchmarkResult] = field(
default_factory=dict,
)
total_duration_seconds: float = 0.0
def pipeline_names(self) -> list[str]:
"""Retourne la liste des noms de pipelines dans leur ordre
d'insertion (= ordre de la comparaison initiale)."""
return list(self.per_pipeline.keys())
def _final_metric_value(
self,
pipeline_name: str,
artifact_type: ArtifactType,
metric_name: str,
) -> Optional[float]:
"""Retourne le ``mean`` de la métrique demandée à la
**dernière étape** de la pipeline qui a produit
``artifact_type`` (avec succès sur ≥ 1 doc), ou ``None``
si la métrique n'est pas disponible.
Cohérent avec ``PipelineResult.junction_metrics_for`` du
Sprint 63 mais au niveau corpus-wide.
"""
bench = self.per_pipeline.get(pipeline_name)
if bench is None:
return None
from picarones.domain.artifacts import LEGACY_VALUE_ALIASES
legacy_alias = LEGACY_VALUE_ALIASES.get(artifact_type.value)
for agg in reversed(bench.per_step_aggregates):
type_metrics = agg.junction_metrics.get(artifact_type.value)
if not type_metrics and legacy_alias is not None:
# Phase 4-bis : un caller (typiquement les tests
# ou un agrégateur tiers) peut avoir construit le
# dict avec la clé legacy ``"text"`` au lieu de la
# canonique ``"raw_text"``. expand_legacy_keys
# synchronise les deux côtés sur les sites
# d'écriture du runner — ce fallback couvre le
# reste.
type_metrics = agg.junction_metrics.get(legacy_alias)
if not type_metrics:
continue
stats = type_metrics.get(metric_name)
if stats is None:
continue
return stats["mean"]
return None
def ranking_by_final_metric(
self,
artifact_type: ArtifactType,
metric_name: str,
higher_is_better: bool = False,
) -> list[tuple[str, Optional[float]]]:
"""Classe les pipelines par la valeur **finale** de
``metric_name`` à la jonction ``artifact_type``.
Returns
-------
list[tuple[str, Optional[float]]]
Liste ``[(pipeline_name, mean_value)]`` triée :
- Les pipelines avec une valeur définie viennent en
premier, triées selon ``higher_is_better``.
- Les pipelines sans valeur (métrique absente) viennent
en queue, dans leur ordre d'insertion.
"""
with_value: list[tuple[str, float]] = []
without_value: list[tuple[str, Optional[float]]] = []
for name in self.pipeline_names():
value = self._final_metric_value(name, artifact_type, metric_name)
if value is None:
without_value.append((name, None))
else:
with_value.append((name, value))
with_value.sort(
key=lambda pair: pair[1],
reverse=higher_is_better,
)
return [*with_value, *without_value]
def gain_table(
self,
artifact_type: ArtifactType,
metric_name: str,
baseline_pipeline: str,
) -> dict[str, dict[str, Optional[float]]]:
"""Calcule l'écart de chaque pipeline vs la baseline.
Returns
-------
dict
Map ``{pipeline_name: {"value", "absolute", "relative"}}``
où :
- ``value`` : valeur finale de la métrique pour cette
pipeline (``None`` si absente).
- ``absolute`` : ``value - baseline_value``
(``None`` si l'une des deux est absente).
- ``relative`` : ``(value - baseline_value) /
baseline_value`` (``None`` si baseline absente ou
égale à 0).
La baseline elle-même apparaît avec ``absolute == 0`` et
``relative == 0``.
"""
if baseline_pipeline not in self.per_pipeline:
raise KeyError(
f"baseline {baseline_pipeline!r} absente de la comparaison",
)
baseline_value = self._final_metric_value(
baseline_pipeline, artifact_type, metric_name,
)
out: dict[str, dict[str, Optional[float]]] = {}
for name in self.pipeline_names():
value = self._final_metric_value(
name, artifact_type, metric_name,
)
absolute: Optional[float]
relative: Optional[float]
if value is None or baseline_value is None:
absolute = None
relative = None
else:
absolute = value - baseline_value
relative = (
(value - baseline_value) / baseline_value
if baseline_value != 0 else None
)
out[name] = {
"value": value,
"absolute": absolute,
"relative": relative,
}
return out
# ──────────────────────────────────────────────────────────────────────────
# Orchestrateur
# ──────────────────────────────────────────────────────────────────────────
def compare_pipelines(
specs: list[PipelineSpec],
corpus: Corpus,
factories: Optional[dict[str, InitialInputsFactory]] = None,
) -> PipelineComparisonResult:
"""Exécute N ``PipelineSpec`` sur le **même** ``corpus``.
Parameters
----------
specs:
Liste de ``PipelineSpec``. Les noms de pipelines doivent
être uniques (sinon ``ValueError``).
corpus:
Corpus partagé entre toutes les pipelines comparées —
c'est le point fort du sprint : même corpus, même GT, on
peut comparer apple-to-apple.
factories:
Optionnel. Si fourni, dict ``{pipeline_name:
InitialInputsFactory}`` pour personnaliser les entrées
initiales par pipeline. Les pipelines absentes du dict
utilisent ``default_initial_inputs`` (cas standard
``IMAGE`` depuis ``Document.image_path``).
Returns
-------
PipelineComparisonResult
Conteneur avec ``per_pipeline`` indexé par nom et
utilitaires comparatifs (``ranking_by_final_metric``,
``gain_table``).
Raises
------
ValueError
Si deux ``PipelineSpec`` ont le même nom (impossible alors
de les distinguer dans le résultat).
"""
names = [s.name for s in specs]
if len(set(names)) != len(names):
seen: set[str] = set()
duplicates: list[str] = []
for n in names:
if n in seen:
duplicates.append(n)
seen.add(n)
raise ValueError(
f"noms de pipelines non uniques : {sorted(set(duplicates))}",
)
factories = factories or {}
result = PipelineComparisonResult(
corpus_name=corpus.name,
n_docs=len(list(corpus.documents)),
)
t0 = time.monotonic()
for spec in specs:
factory = factories.get(spec.name, default_initial_inputs)
bench = run_pipeline_benchmark(spec, corpus, factory)
result.per_pipeline[spec.name] = bench
result.total_duration_seconds = time.monotonic() - t0
return result
__all__ = [
"PipelineComparisonResult",
"compare_pipelines",
]
|