Picarones / tests /integration /test_sprint_a14_s17_full_run.py
Claude
feat(evaluation): Sprint A14-S27 — découpage ProjectionEngine + EvaluationEngine
2e9e564 unverified
raw
history blame
21.4 kB
"""Sprint A14-S17 — run complet avec persistance JSONL.
Définition de done : un benchmark produit un dossier ``result/``
lisible humainement où on voit :
- ``run_manifest.json`` — métadonnées (run_id, corpus, pipelines,
vues, code_version, timestamps).
- ``pipeline_results.jsonl`` — un PipelineResult par ligne avec
document_id.
- ``view_results.jsonl`` — un ViewResult par ligne avec
document_id.
Le test exécute :
- 2 pipelines mock (un OCR pur RAW_TEXT, un OCR+ALTO).
- 3 documents synthétiques.
- 2 vues canoniques (TextView + AltoView — SearchView est testée
séparément en S16).
- Persistance dans tmp_path.
- Vérification des fichiers produits + structure du RunResult.
Setup disque
------------
Le ``AltoToText`` projecteur (S9) lit son XML depuis l'``Artifact.uri``
filesystem. La fixture écrit donc des fichiers ALTO XML réels sur
disque sous ``tmp_path/alto_files/`` et les stubs OCR pointent leurs
artefacts ALTO vers ces fichiers via leur URI. Cela reproduit
l'usage production où un moteur écrit son XML dans un workspace
sandboxé (S19).
"""
from __future__ import annotations
import json
from pathlib import Path
from picarones.app.services import BenchmarkService
from picarones.domain import (
Artifact,
ArtifactType,
CorpusSpec,
DocumentRef,
GroundTruthRef,
MetricSpec,
)
from picarones.evaluation.metrics.alto_structural import (
compute_alto_validity,
compute_line_count_ratio,
compute_word_box_coverage,
)
from picarones.evaluation.projectors import (
AltoToText,
CanonicalToText,
PageToText,
ProjectorRegistry,
)
from picarones.evaluation.registry import MetricRegistry
from picarones.evaluation.views import (
DefaultEvaluationViewExecutor,
build_alto_view,
build_text_view,
)
from picarones.formats.alto.types import (
AltoBBox,
AltoDocument,
AltoLine,
AltoPage,
AltoString,
AltoTextBlock,
)
from picarones.formats.alto.writer import write_alto
from picarones.pipeline import (
CorpusRunner,
PipelineExecutor,
PipelineSpec,
PipelineStep,
RunContext,
)
# ──────────────────────────────────────────────────────────────────
# Fixtures de données
# ──────────────────────────────────────────────────────────────────
# Ground-truth transcription of each synthetic document, keyed by document id.
# The keys also drive which documents are written to disk and placed in the
# corpus, and the loader stub serves these same strings for every RAW_TEXT
# artifact of a document.
_GT_TEXTS = {
    "doc01": "Bonjour le monde",
    "doc02": "Test multi documents",
    "doc03": "Troisième fixture",
}
def _build_alto(text: str) -> AltoDocument:
    """Build a one-page / one-block / one-line AltoDocument from ``text``.

    Every whitespace-separated word of ``text`` becomes one ``AltoString``
    carrying the same fixed 10x10 bounding box at the origin.
    """
    words = tuple(
        AltoString(
            content=token,
            bbox=AltoBBox(hpos=0, vpos=0, width=10, height=10),
        )
        for token in text.split()
    )
    line = AltoLine(strings=words)
    block = AltoTextBlock(lines=(line,))
    page = AltoPage(blocks=(block,))
    return AltoDocument(pages=(page,))
# ──────────────────────────────────────────────────────────────────
# Adapters / pipelines mock
# ──────────────────────────────────────────────────────────────────
class _TextOCRStub:
    """Mock OCR adapter that emits a single deterministic RAW_TEXT artifact."""

    name = "text_ocr"
    input_types = frozenset([ArtifactType.IMAGE])
    output_types = frozenset([ArtifactType.RAW_TEXT])
    execution_mode = "io"

    def execute(self, inputs, params, context):
        """Return ``{RAW_TEXT: Artifact}`` for the document named by ``context``.

        ``inputs`` and ``params`` are accepted but not read; the artifact id is
        derived from ``context.document_id`` only.
        """
        doc_id = context.document_id
        raw_text = Artifact(
            id=f"{doc_id}:text_ocr:raw_text",
            document_id=doc_id,
            type=ArtifactType.RAW_TEXT,
            produced_by_step="ocr",
        )
        return {ArtifactType.RAW_TEXT: raw_text}
class _AltoOCRStub:
    """Mock OCR adapter that emits deterministic ALTO_XML and RAW_TEXT artifacts.

    The ALTO files are expected to already exist on disk in ``alto_files_dir``
    (the fixture writes them). The ALTO artifact's ``uri`` points at that
    file, mirroring an engine that writes its XML into a workspace and
    exposes it through a URI.
    """

    name = "alto_ocr"
    input_types = frozenset([ArtifactType.IMAGE])
    output_types = frozenset([ArtifactType.ALTO_XML, ArtifactType.RAW_TEXT])
    execution_mode = "io"

    def __init__(self, alto_files_dir: Path) -> None:
        """Remember the directory holding the pre-written candidate ALTO files."""
        self._alto_files_dir = Path(alto_files_dir)

    def execute(self, inputs, params, context):
        """Return the ALTO_XML and RAW_TEXT artifacts for the current document.

        ``inputs`` and ``params`` are accepted but not read. The ALTO artifact
        references ``<document_id>.cand.alto.xml`` inside the configured
        directory; the file itself is not opened here.
        """
        doc_id = context.document_id
        candidate_file = self._alto_files_dir / f"{doc_id}.cand.alto.xml"
        alto_artifact = Artifact(
            id=f"{doc_id}:alto_ocr:alto",
            document_id=doc_id,
            type=ArtifactType.ALTO_XML,
            produced_by_step="ocr",
            uri=str(candidate_file),
        )
        text_artifact = Artifact(
            id=f"{doc_id}:alto_ocr:raw_text",
            document_id=doc_id,
            type=ArtifactType.RAW_TEXT,
            produced_by_step="ocr",
        )
        return {
            ArtifactType.ALTO_XML: alto_artifact,
            ArtifactType.RAW_TEXT: text_artifact,
        }
# ──────────────────────────────────────────────────────────────────
# Helpers
# ──────────────────────────────────────────────────────────────────
def _stub_cer(reference: str, hypothesis: str) -> float:
if not reference:
return 0.0 if not hypothesis else 1.0
common = sum(1 for a, b in zip(reference, hypothesis) if a == b)
return 1.0 - (common / max(len(reference), len(hypothesis)))
def _stub_wer(reference: str, hypothesis: str) -> float:
rw = reference.split()
hw = hypothesis.split()
if not rw:
return 0.0 if not hw else 1.0
common = sum(1 for a, b in zip(rw, hw) if a == b)
return 1.0 - (common / len(rw))
def _write_alto_files(tmp_path: Path) -> tuple[Path, dict[str, Path], dict[str, Path]]:
    """Write a ground-truth and a candidate ALTO XML file for every document.

    Both files of a document are generated from the same ``_GT_TEXTS`` entry,
    so the candidate is a perfect copy of the ground truth.

    Returns
    -------
    (alto_dir, gt_paths_by_doc, cand_paths_by_doc)
    """
    alto_dir = tmp_path / "alto_files"
    alto_dir.mkdir(parents=True, exist_ok=True)

    def _dump(target: Path, text: str) -> Path:
        # Serialise a fresh ALTO document for ``text`` into ``target``.
        target.write_bytes(write_alto(_build_alto(text)))
        return target

    gt_paths: dict[str, Path] = {}
    cand_paths: dict[str, Path] = {}
    for doc_id, text in _GT_TEXTS.items():
        gt_paths[doc_id] = _dump(alto_dir / f"{doc_id}.gt.alto.xml", text)
        cand_paths[doc_id] = _dump(alto_dir / f"{doc_id}.cand.alto.xml", text)
    return alto_dir, gt_paths, cand_paths
# ──────────────────────────────────────────────────────────────────
# Setup complet (param tmp_path)
# ──────────────────────────────────────────────────────────────────
def _build_service(tmp_path: Path) -> tuple[BenchmarkService, dict[str, Path]]:
    """Assemble a ``BenchmarkService`` backed by on-disk ALTO fixtures.

    Writes the ALTO fixture files under ``tmp_path``, registers the stub text
    metrics and the ALTO structural metrics, registers the three text
    projectors, and wires the two mock OCR adapters into a pipeline executor
    and corpus runner.

    Returns
    -------
    (service, gt_paths_by_doc)
    """
    from picarones.formats.alto.parser import parse_alto

    alto_dir, gt_paths, _ = _write_alto_files(tmp_path)

    # Metrics used by the text view and the ALTO view.
    metrics = MetricRegistry()
    text_pair = (ArtifactType.RAW_TEXT, ArtifactType.RAW_TEXT)
    text_metrics = {
        "cer": _stub_cer,
        "wer": _stub_wer,
        "mer": _stub_cer,
        "wil": _stub_wer,
    }
    for metric_name, metric_fn in text_metrics.items():
        metrics.register(
            MetricSpec(name=metric_name, input_types=text_pair),
            metric_fn,
        )

    alto_pair = (ArtifactType.ALTO_XML, ArtifactType.ALTO_XML)
    alto_metrics = {
        "alto_validity": compute_alto_validity,
        "alto_line_count_ratio": compute_line_count_ratio,
        "alto_word_box_coverage": compute_word_box_coverage,
    }
    for metric_name, metric_fn in alto_metrics.items():
        metrics.register(
            MetricSpec(
                name=metric_name,
                input_types=alto_pair,
                higher_is_better=True,
            ),
            metric_fn,
        )

    # Projectors.
    projectors = ProjectorRegistry()
    for projector in (AltoToText(), PageToText(), CanonicalToText()):
        projectors.register(projector)

    def loader(art: Artifact):
        # Hybrid loader, dispatching on the artifact type:
        # - ALTO_XML (ground truth or candidate): parse the file at ``uri``.
        # - RAW_TEXT (ground truth, direct candidate or projection result):
        #   return the perfect text stored in _GT_TEXTS for that document.
        if art.type == ArtifactType.ALTO_XML:
            if art.uri is None:
                raise KeyError(f"ALTO artefact {art.id} sans URI")
            return parse_alto(Path(art.uri).read_bytes())
        if art.type == ArtifactType.RAW_TEXT:
            return _GT_TEXTS[art.document_id]
        raise KeyError(f"loader ne sait pas charger {art.id} (type {art.type})")

    view_executor = DefaultEvaluationViewExecutor.from_registries(
        metrics, projectors, loader,
    )

    # Pipeline executor + corpus runner.
    registry_adapters = {
        "text_ocr": _TextOCRStub(),
        "alto_ocr": _AltoOCRStub(alto_dir),
    }

    def resolve_adapter(n):
        # Unknown adapter names surface as KeyError from the dict lookup.
        return registry_adapters[n]

    pipeline_executor = PipelineExecutor(adapter_resolver=resolve_adapter)
    corpus_runner = CorpusRunner(
        pipeline_executor,
        max_in_flight=2,
        timeout_seconds_per_doc=10.0,
        poll_interval_seconds=0.005,
    )
    service = BenchmarkService(
        corpus_runner=corpus_runner,
        view_executor=view_executor,
        code_version="1.0.0-s17-test",
    )
    return service, gt_paths
# ──────────────────────────────────────────────────────────────────
# Tests
# ──────────────────────────────────────────────────────────────────
def _build_corpus_and_specs(gt_paths: dict[str, Path]):
    """Build the fixture corpus, the two pipeline specs and the two views.

    Returns ``(corpus, [text_pipeline, alto_pipeline], [text_view, alto_view])``.

    ``image_uri`` and the RAW_TEXT ground-truth ``uri`` are never read in this
    test (payloads come in-memory from the stub loader). They are still built
    as paths under the shared temporary directory so they stay portable
    across operating systems: ``/tmp/...`` is not a valid absolute path on
    Windows.
    """
    base_dir = next(iter(gt_paths.values())).parent

    def _document(doc_id: str) -> DocumentRef:
        # One document with a RAW_TEXT and an ALTO_XML ground truth.
        text_gt = GroundTruthRef(
            type=ArtifactType.RAW_TEXT,
            uri=str(base_dir / f"{doc_id}.gt.txt"),
        )
        alto_gt = GroundTruthRef(
            type=ArtifactType.ALTO_XML,
            uri=str(gt_paths[doc_id]),
        )
        return DocumentRef(
            id=doc_id,
            image_uri=str(base_dir / f"{doc_id}.png"),
            ground_truths=(text_gt, alto_gt),
        )

    def _ocr_pipeline(pipeline_name, adapter_name, outputs) -> PipelineSpec:
        # Single-step pipeline: IMAGE in, ``outputs`` out, via ``adapter_name``.
        step = PipelineStep(
            id="ocr", kind="ocr", adapter_name=adapter_name,
            input_types=(ArtifactType.IMAGE,),
            output_types=outputs,
        )
        return PipelineSpec(
            name=pipeline_name,
            initial_inputs=(ArtifactType.IMAGE,),
            steps=(step,),
        )

    corpus = CorpusSpec(
        name="s17_fixture",
        documents=tuple(_document(doc_id) for doc_id in _GT_TEXTS),
    )
    text_pipeline = _ocr_pipeline(
        "text_only_pipeline", "text_ocr", (ArtifactType.RAW_TEXT,),
    )
    alto_pipeline = _ocr_pipeline(
        "alto_pipeline", "alto_ocr",
        (ArtifactType.ALTO_XML, ArtifactType.RAW_TEXT),
    )
    views = [build_text_view(), build_alto_view()]
    return corpus, [text_pipeline, alto_pipeline], views
def _build_factories(gt_paths: dict[str, Path]):
    """Return the three callables passed to ``BenchmarkService.run``.

    Returns ``(gt_factory, inputs_factory, ctx_factory)``. ``gt_paths`` is
    accepted for signature symmetry with the other builders but is not read.
    """

    def gt_factory(doc, art_type):
        # Ground-truth artifact of the requested type, or None when the
        # document declares no ground truth of that type.
        ref = doc.gt_for(art_type)
        if ref is not None:
            suffix = "raw_text" if art_type == ArtifactType.RAW_TEXT else "alto"
            return Artifact(
                id=f"{doc.id}:gt:{suffix}",
                document_id=doc.id,
                type=art_type,
                uri=ref.uri,
            )
        return None

    def inputs_factory(doc):
        # Initial pipeline input: the document image only.
        image = Artifact(
            id=f"{doc.id}:image",
            document_id=doc.id,
            type=ArtifactType.IMAGE,
            uri=doc.image_uri,
        )
        return {ArtifactType.IMAGE: image}

    def ctx_factory(doc, pipeline_name):
        # Run context for one (document, pipeline) pair.
        return RunContext(
            document_id=doc.id,
            code_version="1.0.0-s17-test",
            pipeline_name=pipeline_name,
        )

    return gt_factory, inputs_factory, ctx_factory
class TestFullRun:
    """Full benchmark run over the 3 fixture documents, 2 pipelines, 2 views."""

    @staticmethod
    def _execute(tmp_path: Path):
        """Build the fixture service under ``tmp_path`` and return its run result."""
        service, gt_paths = _build_service(tmp_path)
        corpus, pipelines, views = _build_corpus_and_specs(gt_paths)
        gt_factory, inputs_factory, ctx_factory = _build_factories(gt_paths)
        return service.run(
            corpus=corpus,
            pipelines=pipelines,
            views=views,
            ground_truth_factory=gt_factory,
            pipeline_inputs_factory=inputs_factory,
            context_factory=ctx_factory,
        )

    def test_run_produces_pipeline_results_for_each_doc(self, tmp_path: Path) -> None:
        run = self._execute(tmp_path)
        assert run.n_documents == 3
        expected_names = {"text_only_pipeline", "alto_pipeline"}
        for per_doc in run.document_results:
            assert len(per_doc.pipeline_results) == 2
            seen_names = {pr.pipeline_name for pr in per_doc.pipeline_results}
            assert seen_names == expected_names

    def test_omission_pattern_textview_includes_both_pipelines(self, tmp_path: Path) -> None:
        """TextView accepts RAW_TEXT and ALTO_XML, so both pipelines are
        eligible."""
        run = self._execute(tmp_path)
        text_view_results = run.view_results_for("text_final")
        # text_only_pipeline produces RAW_TEXT (1 eligible artifact).
        # alto_pipeline produces RAW_TEXT + ALTO_XML (2 eligible artifacts).
        # Total: 3 docs x (1 + 2) = 9 ViewResult.
        assert len(text_view_results) == 9
        for view_result in text_view_results:
            assert view_result.view_name == "text_final"

    def test_omission_pattern_altoview_omits_text_only_pipeline(self, tmp_path: Path) -> None:
        """AltoView only accepts ALTO_XML, so text_only_pipeline is omitted."""
        run = self._execute(tmp_path)
        alto_view_results = run.view_results_for("alto_documentary")
        # 3 docs x 1 pipeline (alto_pipeline) x 1 ALTO artifact = 3 results.
        assert len(alto_view_results) == 3
        for view_result in alto_view_results:
            assert "alto_ocr" in view_result.candidate_artifact_id

    def test_view_results_have_metric_values(self, tmp_path: Path) -> None:
        run = self._execute(tmp_path)
        for view_result in run.view_results_for("text_final"):
            # The stub CER must be 0: the fixture text is a perfect match.
            assert view_result.metric_values.get("cer") == 0.0
            assert view_result.failed_metrics == {}
class TestPersistence:
    """Persistence of a run: manifest JSON plus two JSONL result files."""

    @staticmethod
    def _run_service(tmp_path: Path):
        """Build the fixture service under ``tmp_path`` and run the benchmark.

        Returns ``(service, result)`` so each test can call
        ``service.persist(result, ...)`` itself.
        """
        service, gt_paths = _build_service(tmp_path)
        corpus, pipelines, views = _build_corpus_and_specs(gt_paths)
        gt_factory, inputs_factory, ctx_factory = _build_factories(gt_paths)
        result = service.run(
            corpus=corpus,
            pipelines=pipelines,
            views=views,
            ground_truth_factory=gt_factory,
            pipeline_inputs_factory=inputs_factory,
            context_factory=ctx_factory,
        )
        return service, result

    def test_persist_writes_three_files(self, tmp_path: Path) -> None:
        service, result = self._run_service(tmp_path)
        out_dir = tmp_path / "run_output"
        files = service.persist(result, out_dir)
        assert files["manifest"].exists()
        assert files["pipeline_results"].exists()
        assert files["view_results"].exists()

    def test_persisted_manifest_is_valid_json(self, tmp_path: Path) -> None:
        service, result = self._run_service(tmp_path)
        out_dir = tmp_path / "run_output"
        files = service.persist(result, out_dir)
        # Decode explicitly as UTF-8: read_text() would otherwise use the
        # platform default encoding, which is not UTF-8 everywhere.
        # NOTE(review): assumes persist() writes UTF-8 (or pure ASCII) — confirm.
        manifest_data = json.loads(files["manifest"].read_text(encoding="utf-8"))
        assert manifest_data["corpus_name"] == "s17_fixture"
        assert manifest_data["n_documents"] == 3
        assert manifest_data["code_version"] == "1.0.0-s17-test"
        assert "text_only_pipeline" in manifest_data["pipeline_names"]
        assert "alto_pipeline" in manifest_data["pipeline_names"]

    def test_persisted_jsonl_is_streamable(self, tmp_path: Path) -> None:
        """Every line of pipeline_results.jsonl and view_results.jsonl is
        valid JSON on its own (streaming)."""
        service, result = self._run_service(tmp_path)
        files = service.persist(result, tmp_path / "out")

        # pipeline_results.jsonl: 3 docs x 2 pipelines = 6 lines.
        pipeline_text = files["pipeline_results"].read_text(encoding="utf-8")
        pipeline_lines = pipeline_text.strip().split("\n")
        assert len(pipeline_lines) == 6
        for line in pipeline_lines:
            payload = json.loads(line)
            assert "document_id" in payload
            assert "pipeline_name" in payload

        # view_results.jsonl: 9 (TextView) + 3 (AltoView) = 12 lines.
        view_text = files["view_results"].read_text(encoding="utf-8")
        view_lines = view_text.strip().split("\n")
        assert len(view_lines) == 12
        for line in view_lines:
            payload = json.loads(line)
            assert "document_id" in payload
            assert "view_name" in payload
            assert "metric_values" in payload
class TestRunResultHelpers:
    """Filtering helpers exposed by the run result."""

    def test_pipeline_results_for_returns_correct_subset(self, tmp_path: Path) -> None:
        service, gt_paths = _build_service(tmp_path)
        corpus, pipelines, views = _build_corpus_and_specs(gt_paths)
        factories = _build_factories(gt_paths)
        gt_factory, inputs_factory, ctx_factory = factories
        run = service.run(
            corpus=corpus,
            pipelines=pipelines,
            views=views,
            ground_truth_factory=gt_factory,
            pipeline_inputs_factory=inputs_factory,
            context_factory=ctx_factory,
        )
        # 3 docs x 1 pipeline (filtered on "text_only_pipeline").
        wanted = "text_only_pipeline"
        subset = run.pipeline_results_for(wanted)
        assert len(subset) == 3
        for pipeline_result in subset:
            assert pipeline_result.pipeline_name == "text_only_pipeline"