Spaces:
Running
Running
| """Tests Sprint 64 β orchestration corpus-wide d'une pipeline composΓ©e. | |
| Couvre : | |
| 1. ``default_initial_inputs`` : ``IMAGE`` extrait de | |
| ``Document.image_path`` ; absence d'image β dict vide. | |
| 2. ``run_pipeline_benchmark`` : | |
| - corpus vide β rΓ©sultat avec ``n_docs == 0`` | |
| - 1 doc, pipeline OK β succeeded == 1, agrΓ©gat cohΓ©rent | |
| - N docs, mix succΓ¨s/Γ©checs β comptes corrects, failing_doc_ids | |
| listΓ©s | |
| - MΓ©triques agrΓ©gΓ©es (mean/median CER) sur N docs | |
| - Factory personnalisΓ©e respectΓ©e | |
| - Factory qui lΓ¨ve sur un doc β erreur capturΓ©e, autres docs | |
| continuent | |
| - Spec invalide β tous les docs Γ©chouent en amont | |
| 3. ``StepAggregate`` : | |
| - success_rate | |
| - error_breakdown (missing_input, raised_exception, | |
| missing_output, other) | |
| - junction_metrics agrΓ©gΓ©s correctement par type d'artefact | |
| 4. ``PipelineBenchmarkResult.aggregate_for_step`` retourne le bon | |
| agrΓ©gat ou ``None``. | |
| 5. Philosophie : tous les modules utilisΓ©s sont des **mocks** | |
| dΓ©finis dans le test β Picarones n'expose aucun module mΓ©tier. | |
| """ | |
| from __future__ import annotations | |
| from typing import Any | |
| from picarones.core.corpus import Corpus, Document, GTLevel, TextGT | |
| from picarones.core.modules import ArtifactType, BaseModule | |
| from picarones.measurements.pipeline_benchmark import ( | |
| PipelineBenchmarkResult, | |
| StepAggregate, | |
| default_initial_inputs, | |
| run_pipeline_benchmark, | |
| ) | |
| from picarones.core.pipeline import PipelineSpec, PipelineStep | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Mocks (inchangΓ©s vs Sprint 63 β uniquement Γ but de test) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class MockOCR(BaseModule): | |
| input_types = (ArtifactType.IMAGE,) | |
| output_types = (ArtifactType.TEXT,) | |
| execution_mode: Any = "io" | |
| def __init__(self, fn) -> None: | |
| self._fn = fn | |
| def name(self) -> str: | |
| return "mock-ocr" | |
| def process(self, inputs): | |
| return {ArtifactType.TEXT: self._fn(inputs[ArtifactType.IMAGE])} | |
| class MockCrasherSometimes(BaseModule): | |
| """OCR qui lève sur les documents dont le path contient un | |
| marqueur, et qui produit du texte sinon.""" | |
| input_types = (ArtifactType.IMAGE,) | |
| output_types = (ArtifactType.TEXT,) | |
| execution_mode: Any = "io" | |
| def __init__(self, crash_on: str) -> None: | |
| self._crash_on = crash_on | |
| def name(self) -> str: | |
| return "mock-flaky" | |
| def process(self, inputs): | |
| path = inputs[ArtifactType.IMAGE] | |
| if self._crash_on in path: | |
| raise RuntimeError(f"refusΓ© : {path}") | |
| return {ArtifactType.TEXT: "ok"} | |
| class MockTextRewriter(BaseModule): | |
| input_types = (ArtifactType.TEXT,) | |
| output_types = (ArtifactType.TEXT,) | |
| execution_mode: Any = "cpu" | |
| def name(self) -> str: | |
| return "mock-rewriter" | |
| def process(self, inputs): | |
| return {ArtifactType.TEXT: inputs[ArtifactType.TEXT].upper()} | |
| def _make_corpus(n: int = 3, name: str = "demo") -> Corpus: | |
| docs = [] | |
| for i in range(n): | |
| gt = f"texte {i}" | |
| docs.append(Document( | |
| image_path=f"/tmp/d{i}.png", | |
| ground_truth=gt, | |
| doc_id=f"d{i}", | |
| ground_truths={GTLevel.TEXT: TextGT(text=gt)}, | |
| )) | |
| return Corpus(name=name, documents=docs) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 1. default_initial_inputs | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class TestDefaultInitialInputs: | |
| def test_returns_image_path(self) -> None: | |
| doc = Document( | |
| image_path="/tmp/x.png", ground_truth="t", doc_id="d", | |
| ) | |
| assert default_initial_inputs(doc) == { | |
| ArtifactType.IMAGE: "/tmp/x.png", | |
| } | |
| def test_empty_when_no_image_path(self) -> None: | |
| doc = Document(image_path="", ground_truth="t", doc_id="d") | |
| assert default_initial_inputs(doc) == {} | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 2. run_pipeline_benchmark β chemins nominaux | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class TestRunBenchmarkBasic: | |
| def test_empty_corpus(self) -> None: | |
| corpus = Corpus(name="empty", documents=[]) | |
| spec = PipelineSpec( | |
| name="ocr", | |
| steps=[PipelineStep("ocr", MockOCR(lambda p: "x"))], | |
| ) | |
| result = run_pipeline_benchmark(spec, corpus) | |
| assert result.n_docs == 0 | |
| assert result.per_doc_results == [] | |
| # L'agrΓ©gation existe toujours (1 entrΓ©e par Γ©tape) | |
| assert len(result.per_step_aggregates) == 1 | |
| assert result.per_step_aggregates[0].n_docs == 0 | |
| assert result.per_step_aggregates[0].n_succeeded == 0 | |
| def test_single_doc_success(self) -> None: | |
| corpus = _make_corpus(1) | |
| spec = PipelineSpec( | |
| name="ocr", | |
| steps=[PipelineStep("ocr", MockOCR(lambda p: "texte 0"))], | |
| ) | |
| result = run_pipeline_benchmark(spec, corpus) | |
| assert result.n_docs == 1 | |
| assert result.n_pipelines_succeeded == 1 | |
| assert result.n_pipelines_failed == 0 | |
| agg = result.aggregate_for_step("ocr") | |
| assert agg.n_succeeded == 1 | |
| assert agg.success_rate == 1.0 | |
| def test_metrics_aggregation(self) -> None: | |
| corpus = _make_corpus(3) | |
| # OCR parfait : produit le ground_truth depuis le path | |
| # (path == /tmp/d0.png β "texte 0", etc.) | |
| def ocr_perfect(path: str) -> str: | |
| idx = path.replace("/tmp/d", "").replace(".png", "") | |
| return f"texte {idx}" | |
| spec = PipelineSpec( | |
| name="ocr", | |
| steps=[PipelineStep("ocr", MockOCR(ocr_perfect))], | |
| ) | |
| result = run_pipeline_benchmark(spec, corpus) | |
| agg = result.aggregate_for_step("ocr") | |
| cer_stats = agg.junction_metrics["text"]["cer"] | |
| assert cer_stats["mean"] == 0.0 | |
| assert cer_stats["median"] == 0.0 | |
| assert cer_stats["n"] == 3 | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 3. Mix succès / échecs | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class TestMixedResults: | |
| def test_some_docs_fail(self) -> None: | |
| corpus = _make_corpus(4) | |
| # Crash sur les docs dont le path contient "d2" | |
| spec = PipelineSpec( | |
| name="flaky", | |
| steps=[PipelineStep("ocr", MockCrasherSometimes("d2"))], | |
| ) | |
| result = run_pipeline_benchmark(spec, corpus) | |
| assert result.n_pipelines_succeeded == 3 | |
| assert result.n_pipelines_failed == 1 | |
| agg = result.aggregate_for_step("ocr") | |
| assert agg.n_failed == 1 | |
| assert "d2" in agg.failing_doc_ids | |
| # Type d'erreur catΓ©gorisΓ© | |
| assert agg.error_breakdown.get("raised_exception", 0) == 1 | |
| def test_two_steps_second_fails_for_subset(self) -> None: | |
| """Γtape 1 rΓ©ussit toujours, Γ©tape 2 plante pour 1 doc. | |
| Le rewriter est OK ici, on simule un Γ©chec en faisant lever | |
| l'OCR sur d1 puis on laisse rewriter produire Γ partir des | |
| OCR qui ont survΓ©cu. | |
| """ | |
| corpus = _make_corpus(3) | |
| spec = PipelineSpec( | |
| name="ocr_then_rewrite", | |
| steps=[ | |
| PipelineStep("ocr", MockCrasherSometimes("d1")), | |
| PipelineStep("rewrite", MockTextRewriter()), | |
| ], | |
| ) | |
| result = run_pipeline_benchmark(spec, corpus) | |
| # OCR : 2 OK, 1 KO | |
| ocr_agg = result.aggregate_for_step("ocr") | |
| assert ocr_agg.n_succeeded == 2 | |
| assert ocr_agg.n_failed == 1 | |
| # Rewriter : 2 OK (ceux dont l'OCR a rΓ©ussi), 1 Β« entrΓ©e | |
| # manquante Β» (pour d1 dont l'OCR a plantΓ©) | |
| rw_agg = result.aggregate_for_step("rewrite") | |
| assert rw_agg.n_succeeded == 2 | |
| assert rw_agg.n_failed == 1 | |
| assert rw_agg.error_breakdown.get("missing_input", 0) == 1 | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 4. Spec invalide β tous les docs Γ©chouent en amont | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class TestInvalidSpec: | |
| def test_invalid_spec_propagates_to_all_docs(self) -> None: | |
| corpus = _make_corpus(2) | |
| # Pipeline qui demande TEXT mais on ne fournit que IMAGE | |
| # et aucun OCR ne précède | |
| spec = PipelineSpec( | |
| name="bad", | |
| steps=[PipelineStep("rewrite", MockTextRewriter())], | |
| ) | |
| result = run_pipeline_benchmark(spec, corpus) | |
| assert all(pr.error is not None for pr in result.per_doc_results) | |
| # Aucune Γ©tape n'a Γ©tΓ© exΓ©cutΓ©e β tous les docs ont 0 step | |
| for pr in result.per_doc_results: | |
| assert pr.steps == [] | |
| # L'agrégat de l'étape "rewrite" est entièrement en | |
| # pipeline_aborted | |
| agg = result.aggregate_for_step("rewrite") | |
| assert agg.n_docs == 2 | |
| assert agg.n_failed == 2 | |
| assert agg.error_breakdown.get("pipeline_aborted", 0) == 2 | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 5. Factory personnalisΓ©e | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class TestCustomFactory: | |
| def test_custom_factory_used(self) -> None: | |
| corpus = _make_corpus(2) | |
| # Factory qui injecte directement du TEXT β pas besoin d'OCR | |
| def factory(doc): | |
| return {ArtifactType.TEXT: doc.ground_truth} | |
| spec = PipelineSpec( | |
| name="rewrite_only", | |
| steps=[PipelineStep("rewrite", MockTextRewriter())], | |
| ) | |
| result = run_pipeline_benchmark(spec, corpus, factory) | |
| assert result.n_pipelines_succeeded == 2 | |
| agg = result.aggregate_for_step("rewrite") | |
| assert agg.n_succeeded == 2 | |
| def test_factory_raises_for_one_doc(self) -> None: | |
| corpus = _make_corpus(3) | |
| def flaky_factory(doc): | |
| if doc.doc_id == "d1": | |
| raise ValueError("factory cassΓ©e pour d1") | |
| return {ArtifactType.IMAGE: doc.image_path} | |
| spec = PipelineSpec( | |
| name="ocr", | |
| steps=[PipelineStep("ocr", MockOCR(lambda p: "ok"))], | |
| ) | |
| result = run_pipeline_benchmark(spec, corpus, flaky_factory) | |
| # 2 OK, 1 KO en amont | |
| assert result.n_pipelines_succeeded == 2 | |
| assert result.n_pipelines_failed == 1 | |
| # Le doc d1 a un PipelineResult avec erreur factory et | |
| # aucune Γ©tape exΓ©cutΓ©e | |
| d1 = next(pr for pr in result.per_doc_results if pr.doc_id == "d1") | |
| assert "ValueError" in d1.error | |
| assert d1.steps == [] | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 6. Dataclasses | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class TestDataclasses: | |
| def test_step_aggregate_default(self) -> None: | |
| agg = StepAggregate(step_name="x") | |
| assert agg.success_rate == 0.0 | |
| assert agg.junction_metrics == {} | |
| assert agg.error_breakdown == {} | |
| def test_step_aggregate_success_rate(self) -> None: | |
| agg = StepAggregate( | |
| step_name="x", n_docs=10, n_succeeded=7, n_failed=3, | |
| ) | |
| assert agg.success_rate == 0.7 | |
| def test_pipeline_benchmark_result_aggregate_for_step(self) -> None: | |
| result = PipelineBenchmarkResult( | |
| pipeline_name="p", corpus_name="c", | |
| per_step_aggregates=[ | |
| StepAggregate(step_name="a", n_docs=1), | |
| StepAggregate(step_name="b", n_docs=2), | |
| ], | |
| ) | |
| assert result.aggregate_for_step("a").n_docs == 1 | |
| assert result.aggregate_for_step("b").n_docs == 2 | |
| assert result.aggregate_for_step("c") is None | |