Spaces:
Sleeping
Sleeping
Claude
test: réorganiser les 110 fichiers tests/test_*.py par cercle architectural
d109222 unverified | """Tests Sprint 66 — DAG branchant via ``inputs_from``. | |
| Couvre : | |
| 1. ``PipelineStep.inputs_from`` accepté par défaut (vide). | |
| 2. ``PipelineSpec.validate`` : | |
| - ``inputs_from`` vers une étape antérieure connue qui produit | |
| le type → OK | |
| - ``inputs_from`` vers une étape inconnue → erreur explicite | |
| - ``inputs_from`` vers une étape qui ne produit pas ce type → | |
| erreur explicite | |
| - ``inputs_from`` pour un type que le module ne consomme pas → | |
| erreur explicite | |
| - ``inputs_from = {TYPE: "__initial__"}`` valide si ce type est | |
| dans les entrées initiales | |
| 3. ``PipelineRunner.run`` : | |
| - DAG fork : 2 corrections en parallèle d'un même OCR (chacune | |
| démarre depuis OCR, pas l'une de l'autre) → métriques | |
| indépendantes | |
| - Rétrocompat : sans ``inputs_from``, comportement Sprint 63 | |
| préservé (chaîne) | |
| - ``inputs_from`` vers une étape qui a échoué → entrée | |
| manquante explicite avec marqueur ``@step`` | |
| 4. ``PipelineResult.junction_metrics_for`` retourne la dernière | |
| étape réussie ayant produit le type, indépendamment du DAG. | |
| 5. Philosophie inchangée : tous les modules sont des **mocks**. | |
| """ | |
| from __future__ import annotations | |
| from typing import Any | |
| from picarones.core.corpus import Document, GTLevel, TextGT | |
| from picarones.core.modules import ArtifactType, BaseModule | |
| from picarones.core.pipeline import ( | |
| PipelineRunner, | |
| PipelineSpec, | |
| PipelineStep, | |
| ) | |
| # ────────────────────────────────────────────────────────────────────────── | |
| # Mocks | |
| # ────────────────────────────────────────────────────────────────────────── | |
| class MockOCR(BaseModule): | |
| input_types = (ArtifactType.IMAGE,) | |
| output_types = (ArtifactType.TEXT,) | |
| execution_mode: Any = "io" | |
| def __init__(self, output: str) -> None: | |
| self._out = output | |
| def name(self) -> str: | |
| return "mock-ocr" | |
| def process(self, inputs): | |
| return {ArtifactType.TEXT: self._out} | |
| class TextFixer(BaseModule): | |
| """Rewriter qui applique un dict de remplacements.""" | |
| input_types = (ArtifactType.TEXT,) | |
| output_types = (ArtifactType.TEXT,) | |
| execution_mode: Any = "cpu" | |
| def __init__(self, name: str, replacements: dict[str, str]) -> None: | |
| self._name = name | |
| self._replacements = replacements | |
| def name(self) -> str: | |
| return self._name | |
| def process(self, inputs): | |
| text = inputs[ArtifactType.TEXT] | |
| for src, dst in self._replacements.items(): | |
| text = text.replace(src, dst) | |
| return {ArtifactType.TEXT: text} | |
| class TextDoubler(BaseModule): | |
| """Module qui consomme TEXT et produit TEXT (concatène 2 fois).""" | |
| input_types = (ArtifactType.TEXT,) | |
| output_types = (ArtifactType.TEXT,) | |
| execution_mode: Any = "cpu" | |
| def name(self) -> str: | |
| return "doubler" | |
| def process(self, inputs): | |
| return {ArtifactType.TEXT: inputs[ArtifactType.TEXT] * 2} | |
| class AlwaysFails(BaseModule): | |
| input_types = (ArtifactType.TEXT,) | |
| output_types = (ArtifactType.TEXT,) | |
| execution_mode: Any = "cpu" | |
| def name(self) -> str: | |
| return "fail" | |
| def process(self, inputs): | |
| raise RuntimeError("boom") | |
| def _make_doc(text: str = "hello world") -> Document: | |
| return Document( | |
| image_path="/tmp/x.png", ground_truth=text, doc_id="d1", | |
| ground_truths={GTLevel.TEXT: TextGT(text=text)}, | |
| ) | |
| # ────────────────────────────────────────────────────────────────────────── | |
| # 1. PipelineStep.inputs_from default | |
| # ────────────────────────────────────────────────────────────────────────── | |
| class TestStepDefaults: | |
| def test_inputs_from_default_empty(self) -> None: | |
| step = PipelineStep("ocr", MockOCR("x")) | |
| assert step.inputs_from == {} | |
| # ────────────────────────────────────────────────────────────────────────── | |
| # 2. Validation étendue | |
| # ────────────────────────────────────────────────────────────────────────── | |
| class TestValidateInputsFrom: | |
| def test_valid_reference_to_prior_step(self) -> None: | |
| spec = PipelineSpec( | |
| name="ok", | |
| steps=[ | |
| PipelineStep("ocr", MockOCR("x")), | |
| PipelineStep( | |
| "fix", | |
| TextFixer("fix", {}), | |
| inputs_from={ArtifactType.TEXT: "ocr"}, | |
| ), | |
| ], | |
| ) | |
| problems = spec.validate((ArtifactType.IMAGE,)) | |
| assert problems == [] | |
| def test_reference_to_initial_input(self) -> None: | |
| # Une pipeline démarrant par TEXT (factory custom) peut | |
| # référencer "__initial__" | |
| spec = PipelineSpec( | |
| name="ok", | |
| steps=[ | |
| PipelineStep( | |
| "fix", | |
| TextFixer("fix", {}), | |
| inputs_from={ArtifactType.TEXT: "__initial__"}, | |
| ), | |
| ], | |
| ) | |
| problems = spec.validate((ArtifactType.TEXT,)) | |
| assert problems == [] | |
| def test_reference_to_unknown_step(self) -> None: | |
| spec = PipelineSpec( | |
| name="bad", | |
| steps=[ | |
| PipelineStep("ocr", MockOCR("x")), | |
| PipelineStep( | |
| "fix", | |
| TextFixer("fix", {}), | |
| inputs_from={ArtifactType.TEXT: "non_existing"}, | |
| ), | |
| ], | |
| ) | |
| problems = spec.validate((ArtifactType.IMAGE,)) | |
| assert any("non_existing" in p for p in problems) | |
| def test_reference_to_step_not_producing_type(self) -> None: | |
| # Un step qui produit TEXT, on référence un type ALTO qu'il | |
| # n'a pas — mais le module en aval ne consomme pas ALTO, | |
| # donc on test directement avec un type que le module | |
| # consomme bien. Pour ce test on simule en référençant | |
| # un type que le module en aval consomme mais que l'étape | |
| # source n'a pas produit. | |
| spec = PipelineSpec( | |
| name="bad", | |
| steps=[ | |
| PipelineStep("ocr", MockOCR("x")), # produit TEXT | |
| # Le step suivant consomme TEXT et inputs_from | |
| # référence l'étape "ocr" mais via un type qu'elle | |
| # ne produit pas. Pour faire ça il faut un module | |
| # qui consomme un autre type. On ne couvre pas ce | |
| # cas ici (il faudrait un mock multi-type) ; | |
| # on valide via test_reference_type_not_consumed. | |
| ], | |
| ) | |
| # Ce test est vide intentionnellement — couvert par le | |
| # suivant. | |
| assert spec.validate((ArtifactType.IMAGE,)) == [] | |
| def test_reference_type_not_consumed(self) -> None: | |
| # Le module ne consomme pas IMAGE, mais on déclare | |
| # inputs_from[IMAGE] = "ocr" — erreur. | |
| spec = PipelineSpec( | |
| name="bad", | |
| steps=[ | |
| PipelineStep("ocr", MockOCR("x")), | |
| PipelineStep( | |
| "fix", | |
| TextFixer("fix", {}), | |
| inputs_from={ | |
| ArtifactType.IMAGE: "ocr", # IMAGE n'est pas dans input_types de TextFixer | |
| }, | |
| ), | |
| ], | |
| ) | |
| problems = spec.validate((ArtifactType.IMAGE,)) | |
| assert any("ne consomme pas" in p for p in problems) | |
| # ────────────────────────────────────────────────────────────────────────── | |
| # 3. DAG branchant : fork explicite | |
| # ────────────────────────────────────────────────────────────────────────── | |
| class TestForkBranch: | |
| def test_two_fixers_from_same_ocr(self) -> None: | |
| """OCR → fix_a (depuis OCR), OCR → fix_b (depuis OCR). | |
| Sans inputs_from, fix_b consommerait la sortie de fix_a | |
| (chaîne). Avec inputs_from explicite, chaque fixer part de | |
| l'OCR original. | |
| """ | |
| doc = _make_doc("hello world") | |
| # OCR produit du texte fautif corrigible de plusieurs | |
| # façons : | |
| # - fix_a corrige "hellb" → "hello" | |
| # - fix_b corrige "wlrd" → "world" | |
| # Si fix_b avait reçu la sortie de fix_a (qui n'a corrigé | |
| # que "hellb"), il aurait pu corriger "wlrd" en "world" | |
| # mais "hellb" reste incorrect. Avec le DAG branchant, | |
| # fix_a et fix_b appliquent chacun leur correction sur | |
| # l'OCR original, indépendamment. | |
| spec = PipelineSpec( | |
| name="fork", | |
| steps=[ | |
| PipelineStep("ocr", MockOCR("hellb wlrd")), | |
| PipelineStep( | |
| "fix_a", | |
| TextFixer("fix_a", {"hellb": "hello"}), | |
| inputs_from={ArtifactType.TEXT: "ocr"}, | |
| ), | |
| PipelineStep( | |
| "fix_b", | |
| TextFixer("fix_b", {"wlrd": "world"}), | |
| inputs_from={ArtifactType.TEXT: "ocr"}, | |
| ), | |
| ], | |
| ) | |
| result = PipelineRunner.run( | |
| spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"}, | |
| ) | |
| assert result.succeeded | |
| # fix_a a corrigé "hellb" → "hello wlrd" (CER élevé) | |
| # fix_b a corrigé "wlrd" → "hellb world" (CER élevé) | |
| # Aucun ne ramène à "hello world", mais on vérifie que | |
| # chacun a bien démarré depuis l'OCR original. | |
| cer_a = result.steps[1].junction_metrics["text"]["cer"] | |
| cer_b = result.steps[2].junction_metrics["text"]["cer"] | |
| # Les deux CER sont strictement > 0 (puisque chaque fixer | |
| # ne corrige qu'une partie du texte fautif) | |
| assert cer_a > 0.0 | |
| assert cer_b > 0.0 | |
| def test_fork_vs_chain_diverge(self) -> None: | |
| """Fork explicite vs chain implicite produisent des résultats | |
| différents quand les transformations ne sont pas commutatives.""" | |
| doc = _make_doc("hello world") | |
| # chain : ocr → doubler → fixer (le fixer voit le texte doublé) | |
| chain_spec = PipelineSpec( | |
| name="chain", | |
| steps=[ | |
| PipelineStep("ocr", MockOCR("hello wrold")), | |
| PipelineStep("doubler", TextDoubler()), | |
| PipelineStep( | |
| "fix", | |
| TextFixer("fix", {"wrold": "world"}), | |
| ), | |
| ], | |
| ) | |
| # fork : doubler depuis ocr ; fix DEPUIS ocr (pas depuis | |
| # doubler) → fix corrige sans le doubling | |
| fork_spec = PipelineSpec( | |
| name="fork", | |
| steps=[ | |
| PipelineStep("ocr", MockOCR("hello wrold")), | |
| PipelineStep( | |
| "doubler", TextDoubler(), | |
| inputs_from={ArtifactType.TEXT: "ocr"}, | |
| ), | |
| PipelineStep( | |
| "fix", | |
| TextFixer("fix", {"wrold": "world"}), | |
| inputs_from={ArtifactType.TEXT: "ocr"}, | |
| ), | |
| ], | |
| ) | |
| chain_result = PipelineRunner.run( | |
| chain_spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"}, | |
| ) | |
| fork_result = PipelineRunner.run( | |
| fork_spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"}, | |
| ) | |
| # En chain, le fixer voit le texte doublé "hello wroldhello wrold" | |
| # → "hello worldhello world" — CER élevé vs GT "hello world". | |
| # En fork, le fixer voit l'OCR original "hello wrold" → | |
| # "hello world" — CER 0 vs GT "hello world". | |
| chain_fix_cer = chain_result.steps[2].junction_metrics["text"]["cer"] | |
| fork_fix_cer = fork_result.steps[2].junction_metrics["text"]["cer"] | |
| assert fork_fix_cer == 0.0 | |
| assert chain_fix_cer > 0.0 | |
| # ────────────────────────────────────────────────────────────────────────── | |
| # 4. Référence vers une étape qui a échoué | |
| # ────────────────────────────────────────────────────────────────────────── | |
| class TestReferenceToFailedStep: | |
| def test_inputs_from_failed_step_propagates_missing(self) -> None: | |
| doc = _make_doc("hello world") | |
| spec = PipelineSpec( | |
| name="fail_then_ref", | |
| steps=[ | |
| PipelineStep("ocr", MockOCR("hello world")), | |
| PipelineStep( | |
| "fail", AlwaysFails(), | |
| inputs_from={ArtifactType.TEXT: "ocr"}, | |
| ), | |
| # Cette étape référence "fail" qui a échoué | |
| PipelineStep( | |
| "after_fail", | |
| TextFixer("after", {}), | |
| inputs_from={ArtifactType.TEXT: "fail"}, | |
| ), | |
| ], | |
| ) | |
| result = PipelineRunner.run( | |
| spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"}, | |
| ) | |
| # ocr OK, fail échoue, after_fail signale entrée manquante | |
| assert result.steps[0].error is None | |
| assert result.steps[1].error is not None | |
| assert "RuntimeError" in result.steps[1].error | |
| assert result.steps[2].error is not None | |
| assert "@fail" in result.steps[2].error | |
| # ────────────────────────────────────────────────────────────────────────── | |
| # 5. Rétrocompat : sans inputs_from, comportement Sprint 63 | |
| # ────────────────────────────────────────────────────────────────────────── | |
| class TestBackwardsCompat: | |
| def test_chain_without_inputs_from_still_works(self) -> None: | |
| doc = _make_doc("hello world") | |
| spec = PipelineSpec( | |
| name="legacy", | |
| steps=[ | |
| PipelineStep("ocr", MockOCR("hello wrold")), | |
| PipelineStep( | |
| "fix", | |
| TextFixer("fix", {"wrold": "world"}), | |
| ), | |
| ], | |
| ) | |
| result = PipelineRunner.run( | |
| spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"}, | |
| ) | |
| assert result.succeeded | |
| cer = result.steps[1].junction_metrics["text"]["cer"] | |
| assert cer == 0.0 | |
| def test_junction_metrics_for_returns_last_text(self) -> None: | |
| doc = _make_doc("hello world") | |
| spec = PipelineSpec( | |
| name="fork", | |
| steps=[ | |
| PipelineStep("ocr", MockOCR("hello world")), | |
| PipelineStep( | |
| "fix", | |
| TextFixer("fix", {}), | |
| inputs_from={ArtifactType.TEXT: "ocr"}, | |
| ), | |
| ], | |
| ) | |
| result = PipelineRunner.run( | |
| spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"}, | |
| ) | |
| # La dernière étape réussie ayant produit TEXT est "fix" | |
| final = result.junction_metrics_for(ArtifactType.TEXT) | |
| assert final is not None | |
| assert final["cer"] == 0.0 | |