"""Tests Sprint 63 — banc d'essai de pipelines composées (axe B). Couvre : 1. ``PipelineSpec.validate`` : pipeline vide, types qui s'enchaînent, manque d'entrée à une étape. 2. ``PipelineRunner.run`` : - 1 étape OCR mock + GT TEXT → métriques calculées à la jonction - 2 étapes OCR + rewriter LLM mock → 2 jonctions évaluées - Module qui lève → propagation gracieuse, étapes suivantes reçoivent une erreur explicite d'entrée manquante - Sortie déclarée mais non produite → erreur explicite - Aucune GT au type produit → pas de métriques (pas d'erreur) - Mesure du temps par étape > 0 3. Cas d'usage réaliste : OCR fautif + rewriter qui corrige → la métrique CER baisse à la jonction post-rewrite. 4. ``PipelineResult.junction_metrics_for`` retourne les métriques de la dernière étape ayant produit le type, ignorant les étapes qui ont échoué. 5. **Test philosophie** : Picarones ne fournit pas de modules métier — tous les modules utilisés ici sont des **mocks définis dans le test**, pas dans le code de production. """ from __future__ import annotations from typing import Any from picarones.core.corpus import Document, GTLevel, TextGT from picarones.core.modules import ArtifactType, BaseModule from picarones.core.pipeline import ( PipelineResult, PipelineRunner, PipelineSpec, PipelineStep, StepResult, ) # ────────────────────────────────────────────────────────────────────────── # Mocks — uniquement à but de test, jamais en production # ────────────────────────────────────────────────────────────────────────── class MockOCR(BaseModule): """Mock d'un OCR : produit un texte fixe à partir d'une image.""" input_types = (ArtifactType.IMAGE,) output_types = (ArtifactType.TEXT,) execution_mode: Any = "io" def __init__(self, fixed_output: str) -> None: self._out = fixed_output @property def name(self) -> str: return "mock-ocr" def process(self, inputs: dict[ArtifactType, Any]) -> dict[ArtifactType, Any]: return {ArtifactType.TEXT: self._out} class MockTextRewriter(BaseModule): """Mock d'un correcteur LLM TEXT→TEXT.""" input_types = (ArtifactType.TEXT,) output_types = (ArtifactType.TEXT,) execution_mode: Any = "cpu" def __init__(self, transform) -> None: self._transform = transform @property def name(self) -> str: return "mock-rewriter" def process(self, inputs: dict[ArtifactType, Any]) -> dict[ArtifactType, Any]: return {ArtifactType.TEXT: self._transform(inputs[ArtifactType.TEXT])} class MockCrasher(BaseModule): """Mock d'un module qui lève à chaque appel.""" input_types = (ArtifactType.TEXT,) output_types = (ArtifactType.TEXT,) execution_mode: Any = "cpu" @property def name(self) -> str: return "mock-crasher" def process(self, inputs: dict[ArtifactType, Any]) -> dict[ArtifactType, Any]: raise RuntimeError("module en panne") class MockSilentDropper(BaseModule): """Mock d'un module qui déclare produire TEXT mais ne le produit pas.""" input_types = (ArtifactType.TEXT,) output_types = (ArtifactType.TEXT,) execution_mode: Any = "cpu" @property def name(self) -> str: return "mock-silent-dropper" def process(self, inputs: dict[ArtifactType, Any]) -> dict[ArtifactType, Any]: return {} def _make_doc( text: str = "hello world", with_gt: bool = True, ) -> Document: gts: dict[GTLevel, Any] = {} if with_gt: gts[GTLevel.TEXT] = TextGT(text=text) return Document( image_path="/tmp/x.png", ground_truth=text if with_gt else "", doc_id="d1", ground_truths=gts, ) # ────────────────────────────────────────────────────────────────────────── # 1. PipelineSpec.validate # ────────────────────────────────────────────────────────────────────────── class TestSpecValidate: def test_empty_pipeline_invalid(self) -> None: spec = PipelineSpec(name="empty") problems = spec.validate(initial_inputs=(ArtifactType.IMAGE,)) assert problems assert "vide" in problems[0] def test_single_step_with_image_input_valid(self) -> None: spec = PipelineSpec( name="ocr", steps=[PipelineStep("ocr", MockOCR("x"))], ) assert spec.is_valid((ArtifactType.IMAGE,)) def test_chained_steps_valid(self) -> None: spec = PipelineSpec( name="ocr_then_rewrite", steps=[ PipelineStep("ocr", MockOCR("x")), PipelineStep("rewrite", MockTextRewriter(lambda t: t)), ], ) assert spec.is_valid((ArtifactType.IMAGE,)) def test_missing_input_invalid(self) -> None: # Rewriter demande TEXT mais aucun OCR n'a été placé avant spec = PipelineSpec( name="rewrite_only", steps=[PipelineStep("rewrite", MockTextRewriter(lambda t: t))], ) problems = spec.validate(initial_inputs=(ArtifactType.IMAGE,)) assert problems assert "rewrite" in problems[0] assert "text" in problems[0] # ────────────────────────────────────────────────────────────────────────── # 2. PipelineRunner.run — chemins nominaux # ────────────────────────────────────────────────────────────────────────── class TestRunSingleStep: def test_one_step_with_text_gt(self) -> None: doc = _make_doc("hello world") spec = PipelineSpec( name="ocr", steps=[PipelineStep("ocr", MockOCR("hello world"))], ) result = PipelineRunner.run( spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"}, ) assert result.succeeded assert len(result.steps) == 1 step = result.steps[0] assert step.error is None assert step.duration_seconds >= 0.0 # Métrique CER à 0 (hyp == GT) assert step.junction_metrics["text"]["cer"] == 0.0 def test_one_step_imperfect_ocr(self) -> None: doc = _make_doc("hello world") spec = PipelineSpec( name="ocr", steps=[PipelineStep("ocr", MockOCR("hellp wrld"))], ) result = PipelineRunner.run( spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"}, ) cer = result.steps[0].junction_metrics["text"]["cer"] assert 0.0 < cer < 1.0 class TestRunChained: def test_two_steps_evaluation_at_each_junction(self) -> None: doc = _make_doc("hello world") # OCR fautif + rewriter qui corrige spec = PipelineSpec( name="ocr_then_rewrite", steps=[ PipelineStep("ocr", MockOCR("hello wrold")), PipelineStep( "rewrite", MockTextRewriter(lambda t: t.replace("wrold", "world")), ), ], ) result = PipelineRunner.run( spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"}, ) assert result.succeeded assert len(result.steps) == 2 cer_after_ocr = result.steps[0].junction_metrics["text"]["cer"] cer_after_rewrite = result.steps[1].junction_metrics["text"]["cer"] # Le CER baisse après le rewriter assert cer_after_rewrite < cer_after_ocr assert cer_after_rewrite == 0.0 def test_junction_metrics_for_returns_last(self) -> None: doc = _make_doc("hello world") spec = PipelineSpec( name="ocr_then_rewrite", steps=[ PipelineStep("ocr", MockOCR("hello wrold")), PipelineStep( "rewrite", MockTextRewriter(lambda t: t.replace("wrold", "world")), ), ], ) result = PipelineRunner.run( spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"}, ) final = result.junction_metrics_for(ArtifactType.TEXT) assert final is not None assert final["cer"] == 0.0 # ────────────────────────────────────────────────────────────────────────── # 3. Erreurs gracieuses # ────────────────────────────────────────────────────────────────────────── class TestGracefulErrors: def test_module_raises_captured(self) -> None: doc = _make_doc("hello world") spec = PipelineSpec( name="crash", steps=[ PipelineStep("ocr", MockOCR("hello world")), PipelineStep("crash", MockCrasher()), ], ) result = PipelineRunner.run( spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"}, ) assert not result.succeeded assert result.steps[1].error is not None assert "RuntimeError" in result.steps[1].error assert "panne" in result.steps[1].error # L'étape précédente reste OK assert result.steps[0].error is None assert result.failing_steps == ["crash"] def test_silent_dropper_reported_as_missing_output(self) -> None: doc = _make_doc("hello world") spec = PipelineSpec( name="dropper", steps=[ PipelineStep("ocr", MockOCR("hello world")), PipelineStep("drop", MockSilentDropper()), ], ) result = PipelineRunner.run( spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"}, ) # L'étape drop signale une sortie manquante assert result.steps[1].error is not None assert "sortie manquante" in result.steps[1].error def test_invalid_spec_marked_as_error(self) -> None: doc = _make_doc() # Pipeline qui demande TEXT mais on ne fournit que IMAGE # et aucun OCR ne précède spec = PipelineSpec( name="bad", steps=[PipelineStep("rewrite", MockTextRewriter(lambda t: t))], ) result = PipelineRunner.run( spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"}, ) assert result.error is not None assert "text" in result.error # Aucune étape n'a été exécutée assert result.steps == [] # ────────────────────────────────────────────────────────────────────────── # 4. Pas de GT → pas de métriques mais pas d'erreur # ────────────────────────────────────────────────────────────────────────── class TestNoGroundTruth: def test_no_gt_no_metrics_no_error(self) -> None: doc = _make_doc(with_gt=False) spec = PipelineSpec( name="ocr", steps=[PipelineStep("ocr", MockOCR("anything"))], ) result = PipelineRunner.run( spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"}, ) # Pas d'erreur — la pipeline a tourné, simplement aucune # métrique calculable # (Document __post_init__ crée TextGT depuis ground_truth="" # donc une GT vide existe ; la métrique CER vaudra alors 1.0 # ce qui est un autre test ; pour ce test on retire la GT.) # On accepte donc soit absence soit présence du dict junction_metrics ; # le point clé est que ça ne plante pas. assert result.steps[0].error is None assert result.succeeded # ────────────────────────────────────────────────────────────────────────── # 5. Temps par étape # ────────────────────────────────────────────────────────────────────────── class TestTiming: def test_step_duration_recorded(self) -> None: doc = _make_doc() spec = PipelineSpec( name="ocr", steps=[PipelineStep("ocr", MockOCR("hello"))], ) result = PipelineRunner.run( spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"}, ) assert result.steps[0].duration_seconds >= 0.0 assert result.total_duration_seconds >= result.steps[0].duration_seconds # ────────────────────────────────────────────────────────────────────────── # 6. Dataclasses (StepResult / PipelineResult) # ────────────────────────────────────────────────────────────────────────── class TestDataclasses: def test_step_result_default(self) -> None: sr = StepResult( step_name="x", duration_seconds=0.1, output_types=(), ) assert sr.junction_metrics == {} assert sr.error is None def test_pipeline_result_succeeded_false_on_step_error(self) -> None: pr = PipelineResult( pipeline_name="p", doc_id="d", steps=[ StepResult(step_name="a", duration_seconds=0.1, output_types=(ArtifactType.TEXT,)), StepResult(step_name="b", duration_seconds=0.1, output_types=(), error="boom"), ], ) assert not pr.succeeded assert pr.failing_steps == ["b"] def test_junction_metrics_for_skips_failed_steps(self) -> None: # Étape 1 a échoué, étape 0 a produit TEXT avec une métrique pr = PipelineResult( pipeline_name="p", doc_id="d", steps=[ StepResult( step_name="ocr", duration_seconds=0.1, output_types=(ArtifactType.TEXT,), junction_metrics={"text": {"cer": 0.1}}, ), StepResult( step_name="rewrite", duration_seconds=0.1, output_types=(), error="boom", ), ], ) # On doit retomber sur l'étape OCR (la dernière qui a réussi # pour TEXT) assert pr.junction_metrics_for(ArtifactType.TEXT) == {"cer": 0.1}