Picarones / tests /core /test_sprint66_dag_branching.py
Claude
test: réorganiser les 110 fichiers tests/test_*.py par cercle architectural
d109222 unverified
Raw
History Blame
16.4 kB
"""Tests Sprint 66 — DAG branchant via ``inputs_from``.
Couvre :
1. ``PipelineStep.inputs_from`` accepté par défaut (vide).
2. ``PipelineSpec.validate`` :
- ``inputs_from`` vers une étape antérieure connue qui produit
le type → OK
- ``inputs_from`` vers une étape inconnue → erreur explicite
- ``inputs_from`` vers une étape qui ne produit pas ce type →
erreur explicite
- ``inputs_from`` pour un type que le module ne consomme pas →
erreur explicite
- ``inputs_from = {TYPE: "__initial__"}`` valide si ce type est
dans les entrées initiales
3. ``PipelineRunner.run`` :
- DAG fork : 2 corrections en parallèle d'un même OCR (chacune
démarre depuis OCR, pas l'une de l'autre) → métriques
indépendantes
- Rétrocompat : sans ``inputs_from``, comportement Sprint 63
préservé (chaîne)
- ``inputs_from`` vers une étape qui a échoué → entrée
manquante explicite avec marqueur ``@step``
4. ``PipelineResult.junction_metrics_for`` retourne la dernière
étape réussie ayant produit le type, indépendamment du DAG.
5. Philosophie inchangée : tous les modules sont des **mocks**.
"""
from __future__ import annotations
from typing import Any
from picarones.core.corpus import Document, GTLevel, TextGT
from picarones.core.modules import ArtifactType, BaseModule
from picarones.core.pipeline import (
PipelineRunner,
PipelineSpec,
PipelineStep,
)
# ──────────────────────────────────────────────────────────────────────────
# Mocks
# ──────────────────────────────────────────────────────────────────────────
class MockOCR(BaseModule):
input_types = (ArtifactType.IMAGE,)
output_types = (ArtifactType.TEXT,)
execution_mode: Any = "io"
def __init__(self, output: str) -> None:
self._out = output
@property
def name(self) -> str:
return "mock-ocr"
def process(self, inputs):
return {ArtifactType.TEXT: self._out}
class TextFixer(BaseModule):
"""Rewriter qui applique un dict de remplacements."""
input_types = (ArtifactType.TEXT,)
output_types = (ArtifactType.TEXT,)
execution_mode: Any = "cpu"
def __init__(self, name: str, replacements: dict[str, str]) -> None:
self._name = name
self._replacements = replacements
@property
def name(self) -> str:
return self._name
def process(self, inputs):
text = inputs[ArtifactType.TEXT]
for src, dst in self._replacements.items():
text = text.replace(src, dst)
return {ArtifactType.TEXT: text}
class TextDoubler(BaseModule):
"""Module qui consomme TEXT et produit TEXT (concatène 2 fois)."""
input_types = (ArtifactType.TEXT,)
output_types = (ArtifactType.TEXT,)
execution_mode: Any = "cpu"
@property
def name(self) -> str:
return "doubler"
def process(self, inputs):
return {ArtifactType.TEXT: inputs[ArtifactType.TEXT] * 2}
class AlwaysFails(BaseModule):
input_types = (ArtifactType.TEXT,)
output_types = (ArtifactType.TEXT,)
execution_mode: Any = "cpu"
@property
def name(self) -> str:
return "fail"
def process(self, inputs):
raise RuntimeError("boom")
def _make_doc(text: str = "hello world") -> Document:
return Document(
image_path="/tmp/x.png", ground_truth=text, doc_id="d1",
ground_truths={GTLevel.TEXT: TextGT(text=text)},
)
# ──────────────────────────────────────────────────────────────────────────
# 1. PipelineStep.inputs_from default
# ──────────────────────────────────────────────────────────────────────────
class TestStepDefaults:
def test_inputs_from_default_empty(self) -> None:
step = PipelineStep("ocr", MockOCR("x"))
assert step.inputs_from == {}
# ──────────────────────────────────────────────────────────────────────────
# 2. Validation étendue
# ──────────────────────────────────────────────────────────────────────────
class TestValidateInputsFrom:
def test_valid_reference_to_prior_step(self) -> None:
spec = PipelineSpec(
name="ok",
steps=[
PipelineStep("ocr", MockOCR("x")),
PipelineStep(
"fix",
TextFixer("fix", {}),
inputs_from={ArtifactType.TEXT: "ocr"},
),
],
)
problems = spec.validate((ArtifactType.IMAGE,))
assert problems == []
def test_reference_to_initial_input(self) -> None:
# Une pipeline démarrant par TEXT (factory custom) peut
# référencer "__initial__"
spec = PipelineSpec(
name="ok",
steps=[
PipelineStep(
"fix",
TextFixer("fix", {}),
inputs_from={ArtifactType.TEXT: "__initial__"},
),
],
)
problems = spec.validate((ArtifactType.TEXT,))
assert problems == []
def test_reference_to_unknown_step(self) -> None:
spec = PipelineSpec(
name="bad",
steps=[
PipelineStep("ocr", MockOCR("x")),
PipelineStep(
"fix",
TextFixer("fix", {}),
inputs_from={ArtifactType.TEXT: "non_existing"},
),
],
)
problems = spec.validate((ArtifactType.IMAGE,))
assert any("non_existing" in p for p in problems)
def test_reference_to_step_not_producing_type(self) -> None:
# Un step qui produit TEXT, on référence un type ALTO qu'il
# n'a pas — mais le module en aval ne consomme pas ALTO,
# donc on test directement avec un type que le module
# consomme bien. Pour ce test on simule en référençant
# un type que le module en aval consomme mais que l'étape
# source n'a pas produit.
spec = PipelineSpec(
name="bad",
steps=[
PipelineStep("ocr", MockOCR("x")), # produit TEXT
# Le step suivant consomme TEXT et inputs_from
# référence l'étape "ocr" mais via un type qu'elle
# ne produit pas. Pour faire ça il faut un module
# qui consomme un autre type. On ne couvre pas ce
# cas ici (il faudrait un mock multi-type) ;
# on valide via test_reference_type_not_consumed.
],
)
# Ce test est vide intentionnellement — couvert par le
# suivant.
assert spec.validate((ArtifactType.IMAGE,)) == []
def test_reference_type_not_consumed(self) -> None:
# Le module ne consomme pas IMAGE, mais on déclare
# inputs_from[IMAGE] = "ocr" — erreur.
spec = PipelineSpec(
name="bad",
steps=[
PipelineStep("ocr", MockOCR("x")),
PipelineStep(
"fix",
TextFixer("fix", {}),
inputs_from={
ArtifactType.IMAGE: "ocr", # IMAGE n'est pas dans input_types de TextFixer
},
),
],
)
problems = spec.validate((ArtifactType.IMAGE,))
assert any("ne consomme pas" in p for p in problems)
# ──────────────────────────────────────────────────────────────────────────
# 3. DAG branchant : fork explicite
# ──────────────────────────────────────────────────────────────────────────
class TestForkBranch:
def test_two_fixers_from_same_ocr(self) -> None:
"""OCR → fix_a (depuis OCR), OCR → fix_b (depuis OCR).
Sans inputs_from, fix_b consommerait la sortie de fix_a
(chaîne). Avec inputs_from explicite, chaque fixer part de
l'OCR original.
"""
doc = _make_doc("hello world")
# OCR produit du texte fautif corrigible de plusieurs
# façons :
# - fix_a corrige "hellb" → "hello"
# - fix_b corrige "wlrd" → "world"
# Si fix_b avait reçu la sortie de fix_a (qui n'a corrigé
# que "hellb"), il aurait pu corriger "wlrd" en "world"
# mais "hellb" reste incorrect. Avec le DAG branchant,
# fix_a et fix_b appliquent chacun leur correction sur
# l'OCR original, indépendamment.
spec = PipelineSpec(
name="fork",
steps=[
PipelineStep("ocr", MockOCR("hellb wlrd")),
PipelineStep(
"fix_a",
TextFixer("fix_a", {"hellb": "hello"}),
inputs_from={ArtifactType.TEXT: "ocr"},
),
PipelineStep(
"fix_b",
TextFixer("fix_b", {"wlrd": "world"}),
inputs_from={ArtifactType.TEXT: "ocr"},
),
],
)
result = PipelineRunner.run(
spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"},
)
assert result.succeeded
# fix_a a corrigé "hellb" → "hello wlrd" (CER élevé)
# fix_b a corrigé "wlrd" → "hellb world" (CER élevé)
# Aucun ne ramène à "hello world", mais on vérifie que
# chacun a bien démarré depuis l'OCR original.
cer_a = result.steps[1].junction_metrics["text"]["cer"]
cer_b = result.steps[2].junction_metrics["text"]["cer"]
# Les deux CER sont strictement > 0 (puisque chaque fixer
# ne corrige qu'une partie du texte fautif)
assert cer_a > 0.0
assert cer_b > 0.0
def test_fork_vs_chain_diverge(self) -> None:
"""Fork explicite vs chain implicite produisent des résultats
différents quand les transformations ne sont pas commutatives."""
doc = _make_doc("hello world")
# chain : ocr → doubler → fixer (le fixer voit le texte doublé)
chain_spec = PipelineSpec(
name="chain",
steps=[
PipelineStep("ocr", MockOCR("hello wrold")),
PipelineStep("doubler", TextDoubler()),
PipelineStep(
"fix",
TextFixer("fix", {"wrold": "world"}),
),
],
)
# fork : doubler depuis ocr ; fix DEPUIS ocr (pas depuis
# doubler) → fix corrige sans le doubling
fork_spec = PipelineSpec(
name="fork",
steps=[
PipelineStep("ocr", MockOCR("hello wrold")),
PipelineStep(
"doubler", TextDoubler(),
inputs_from={ArtifactType.TEXT: "ocr"},
),
PipelineStep(
"fix",
TextFixer("fix", {"wrold": "world"}),
inputs_from={ArtifactType.TEXT: "ocr"},
),
],
)
chain_result = PipelineRunner.run(
chain_spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"},
)
fork_result = PipelineRunner.run(
fork_spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"},
)
# En chain, le fixer voit le texte doublé "hello wroldhello wrold"
# → "hello worldhello world" — CER élevé vs GT "hello world".
# En fork, le fixer voit l'OCR original "hello wrold" →
# "hello world" — CER 0 vs GT "hello world".
chain_fix_cer = chain_result.steps[2].junction_metrics["text"]["cer"]
fork_fix_cer = fork_result.steps[2].junction_metrics["text"]["cer"]
assert fork_fix_cer == 0.0
assert chain_fix_cer > 0.0
# ──────────────────────────────────────────────────────────────────────────
# 4. Référence vers une étape qui a échoué
# ──────────────────────────────────────────────────────────────────────────
class TestReferenceToFailedStep:
def test_inputs_from_failed_step_propagates_missing(self) -> None:
doc = _make_doc("hello world")
spec = PipelineSpec(
name="fail_then_ref",
steps=[
PipelineStep("ocr", MockOCR("hello world")),
PipelineStep(
"fail", AlwaysFails(),
inputs_from={ArtifactType.TEXT: "ocr"},
),
# Cette étape référence "fail" qui a échoué
PipelineStep(
"after_fail",
TextFixer("after", {}),
inputs_from={ArtifactType.TEXT: "fail"},
),
],
)
result = PipelineRunner.run(
spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"},
)
# ocr OK, fail échoue, after_fail signale entrée manquante
assert result.steps[0].error is None
assert result.steps[1].error is not None
assert "RuntimeError" in result.steps[1].error
assert result.steps[2].error is not None
assert "@fail" in result.steps[2].error
# ──────────────────────────────────────────────────────────────────────────
# 5. Rétrocompat : sans inputs_from, comportement Sprint 63
# ──────────────────────────────────────────────────────────────────────────
class TestBackwardsCompat:
def test_chain_without_inputs_from_still_works(self) -> None:
doc = _make_doc("hello world")
spec = PipelineSpec(
name="legacy",
steps=[
PipelineStep("ocr", MockOCR("hello wrold")),
PipelineStep(
"fix",
TextFixer("fix", {"wrold": "world"}),
),
],
)
result = PipelineRunner.run(
spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"},
)
assert result.succeeded
cer = result.steps[1].junction_metrics["text"]["cer"]
assert cer == 0.0
def test_junction_metrics_for_returns_last_text(self) -> None:
doc = _make_doc("hello world")
spec = PipelineSpec(
name="fork",
steps=[
PipelineStep("ocr", MockOCR("hello world")),
PipelineStep(
"fix",
TextFixer("fix", {}),
inputs_from={ArtifactType.TEXT: "ocr"},
),
],
)
result = PipelineRunner.run(
spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"},
)
# La dernière étape réussie ayant produit TEXT est "fix"
final = result.junction_metrics_for(ArtifactType.TEXT)
assert final is not None
assert final["cer"] == 0.0