Picarones / tests /core /test_sprint63_pipeline_runner.py
Claude
test: réorganiser les 110 fichiers tests/test_*.py par cercle architectural
d109222 unverified
Raw
History Blame
15.8 kB
"""Tests Sprint 63 — banc d'essai de pipelines composées (axe B).
Couvre :
1. ``PipelineSpec.validate`` : pipeline vide, types qui s'enchaînent,
manque d'entrée à une étape.
2. ``PipelineRunner.run`` :
- 1 étape OCR mock + GT TEXT → métriques calculées à la jonction
- 2 étapes OCR + rewriter LLM mock → 2 jonctions évaluées
- Module qui lève → propagation gracieuse, étapes suivantes
reçoivent une erreur explicite d'entrée manquante
- Sortie déclarée mais non produite → erreur explicite
- Aucune GT au type produit → pas de métriques (pas d'erreur)
- Mesure du temps par étape > 0
3. Cas d'usage réaliste : OCR fautif + rewriter qui corrige → la
métrique CER baisse à la jonction post-rewrite.
4. ``PipelineResult.junction_metrics_for`` retourne les métriques
de la dernière étape ayant produit le type, ignorant les étapes
qui ont échoué.
5. **Test philosophie** : Picarones ne fournit pas de modules
métier — tous les modules utilisés ici sont des **mocks définis
dans le test**, pas dans le code de production.
"""
from __future__ import annotations
from typing import Any
from picarones.core.corpus import Document, GTLevel, TextGT
from picarones.core.modules import ArtifactType, BaseModule
from picarones.core.pipeline import (
PipelineResult,
PipelineRunner,
PipelineSpec,
PipelineStep,
StepResult,
)
# ──────────────────────────────────────────────────────────────────────────
# Mocks — uniquement à but de test, jamais en production
# ──────────────────────────────────────────────────────────────────────────
class MockOCR(BaseModule):
"""Mock d'un OCR : produit un texte fixe à partir d'une image."""
input_types = (ArtifactType.IMAGE,)
output_types = (ArtifactType.TEXT,)
execution_mode: Any = "io"
def __init__(self, fixed_output: str) -> None:
self._out = fixed_output
@property
def name(self) -> str:
return "mock-ocr"
def process(self, inputs: dict[ArtifactType, Any]) -> dict[ArtifactType, Any]:
return {ArtifactType.TEXT: self._out}
class MockTextRewriter(BaseModule):
"""Mock d'un correcteur LLM TEXT→TEXT."""
input_types = (ArtifactType.TEXT,)
output_types = (ArtifactType.TEXT,)
execution_mode: Any = "cpu"
def __init__(self, transform) -> None:
self._transform = transform
@property
def name(self) -> str:
return "mock-rewriter"
def process(self, inputs: dict[ArtifactType, Any]) -> dict[ArtifactType, Any]:
return {ArtifactType.TEXT: self._transform(inputs[ArtifactType.TEXT])}
class MockCrasher(BaseModule):
"""Mock d'un module qui lève à chaque appel."""
input_types = (ArtifactType.TEXT,)
output_types = (ArtifactType.TEXT,)
execution_mode: Any = "cpu"
@property
def name(self) -> str:
return "mock-crasher"
def process(self, inputs: dict[ArtifactType, Any]) -> dict[ArtifactType, Any]:
raise RuntimeError("module en panne")
class MockSilentDropper(BaseModule):
"""Mock d'un module qui déclare produire TEXT mais ne le produit pas."""
input_types = (ArtifactType.TEXT,)
output_types = (ArtifactType.TEXT,)
execution_mode: Any = "cpu"
@property
def name(self) -> str:
return "mock-silent-dropper"
def process(self, inputs: dict[ArtifactType, Any]) -> dict[ArtifactType, Any]:
return {}
def _make_doc(
text: str = "hello world", with_gt: bool = True,
) -> Document:
gts: dict[GTLevel, Any] = {}
if with_gt:
gts[GTLevel.TEXT] = TextGT(text=text)
return Document(
image_path="/tmp/x.png",
ground_truth=text if with_gt else "",
doc_id="d1",
ground_truths=gts,
)
# ──────────────────────────────────────────────────────────────────────────
# 1. PipelineSpec.validate
# ──────────────────────────────────────────────────────────────────────────
class TestSpecValidate:
def test_empty_pipeline_invalid(self) -> None:
spec = PipelineSpec(name="empty")
problems = spec.validate(initial_inputs=(ArtifactType.IMAGE,))
assert problems
assert "vide" in problems[0]
def test_single_step_with_image_input_valid(self) -> None:
spec = PipelineSpec(
name="ocr",
steps=[PipelineStep("ocr", MockOCR("x"))],
)
assert spec.is_valid((ArtifactType.IMAGE,))
def test_chained_steps_valid(self) -> None:
spec = PipelineSpec(
name="ocr_then_rewrite",
steps=[
PipelineStep("ocr", MockOCR("x")),
PipelineStep("rewrite", MockTextRewriter(lambda t: t)),
],
)
assert spec.is_valid((ArtifactType.IMAGE,))
def test_missing_input_invalid(self) -> None:
# Rewriter demande TEXT mais aucun OCR n'a été placé avant
spec = PipelineSpec(
name="rewrite_only",
steps=[PipelineStep("rewrite", MockTextRewriter(lambda t: t))],
)
problems = spec.validate(initial_inputs=(ArtifactType.IMAGE,))
assert problems
assert "rewrite" in problems[0]
assert "text" in problems[0]
# ──────────────────────────────────────────────────────────────────────────
# 2. PipelineRunner.run — chemins nominaux
# ──────────────────────────────────────────────────────────────────────────
class TestRunSingleStep:
def test_one_step_with_text_gt(self) -> None:
doc = _make_doc("hello world")
spec = PipelineSpec(
name="ocr",
steps=[PipelineStep("ocr", MockOCR("hello world"))],
)
result = PipelineRunner.run(
spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"},
)
assert result.succeeded
assert len(result.steps) == 1
step = result.steps[0]
assert step.error is None
assert step.duration_seconds >= 0.0
# Métrique CER à 0 (hyp == GT)
assert step.junction_metrics["text"]["cer"] == 0.0
def test_one_step_imperfect_ocr(self) -> None:
doc = _make_doc("hello world")
spec = PipelineSpec(
name="ocr",
steps=[PipelineStep("ocr", MockOCR("hellp wrld"))],
)
result = PipelineRunner.run(
spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"},
)
cer = result.steps[0].junction_metrics["text"]["cer"]
assert 0.0 < cer < 1.0
class TestRunChained:
def test_two_steps_evaluation_at_each_junction(self) -> None:
doc = _make_doc("hello world")
# OCR fautif + rewriter qui corrige
spec = PipelineSpec(
name="ocr_then_rewrite",
steps=[
PipelineStep("ocr", MockOCR("hello wrold")),
PipelineStep(
"rewrite",
MockTextRewriter(lambda t: t.replace("wrold", "world")),
),
],
)
result = PipelineRunner.run(
spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"},
)
assert result.succeeded
assert len(result.steps) == 2
cer_after_ocr = result.steps[0].junction_metrics["text"]["cer"]
cer_after_rewrite = result.steps[1].junction_metrics["text"]["cer"]
# Le CER baisse après le rewriter
assert cer_after_rewrite < cer_after_ocr
assert cer_after_rewrite == 0.0
def test_junction_metrics_for_returns_last(self) -> None:
doc = _make_doc("hello world")
spec = PipelineSpec(
name="ocr_then_rewrite",
steps=[
PipelineStep("ocr", MockOCR("hello wrold")),
PipelineStep(
"rewrite",
MockTextRewriter(lambda t: t.replace("wrold", "world")),
),
],
)
result = PipelineRunner.run(
spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"},
)
final = result.junction_metrics_for(ArtifactType.TEXT)
assert final is not None
assert final["cer"] == 0.0
# ──────────────────────────────────────────────────────────────────────────
# 3. Erreurs gracieuses
# ──────────────────────────────────────────────────────────────────────────
class TestGracefulErrors:
def test_module_raises_captured(self) -> None:
doc = _make_doc("hello world")
spec = PipelineSpec(
name="crash",
steps=[
PipelineStep("ocr", MockOCR("hello world")),
PipelineStep("crash", MockCrasher()),
],
)
result = PipelineRunner.run(
spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"},
)
assert not result.succeeded
assert result.steps[1].error is not None
assert "RuntimeError" in result.steps[1].error
assert "panne" in result.steps[1].error
# L'étape précédente reste OK
assert result.steps[0].error is None
assert result.failing_steps == ["crash"]
def test_silent_dropper_reported_as_missing_output(self) -> None:
doc = _make_doc("hello world")
spec = PipelineSpec(
name="dropper",
steps=[
PipelineStep("ocr", MockOCR("hello world")),
PipelineStep("drop", MockSilentDropper()),
],
)
result = PipelineRunner.run(
spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"},
)
# L'étape drop signale une sortie manquante
assert result.steps[1].error is not None
assert "sortie manquante" in result.steps[1].error
def test_invalid_spec_marked_as_error(self) -> None:
doc = _make_doc()
# Pipeline qui demande TEXT mais on ne fournit que IMAGE
# et aucun OCR ne précède
spec = PipelineSpec(
name="bad",
steps=[PipelineStep("rewrite", MockTextRewriter(lambda t: t))],
)
result = PipelineRunner.run(
spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"},
)
assert result.error is not None
assert "text" in result.error
# Aucune étape n'a été exécutée
assert result.steps == []
# ──────────────────────────────────────────────────────────────────────────
# 4. Pas de GT → pas de métriques mais pas d'erreur
# ──────────────────────────────────────────────────────────────────────────
class TestNoGroundTruth:
def test_no_gt_no_metrics_no_error(self) -> None:
doc = _make_doc(with_gt=False)
spec = PipelineSpec(
name="ocr",
steps=[PipelineStep("ocr", MockOCR("anything"))],
)
result = PipelineRunner.run(
spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"},
)
# Pas d'erreur — la pipeline a tourné, simplement aucune
# métrique calculable
# (Document __post_init__ crée TextGT depuis ground_truth=""
# donc une GT vide existe ; la métrique CER vaudra alors 1.0
# ce qui est un autre test ; pour ce test on retire la GT.)
# On accepte donc soit absence soit présence du dict junction_metrics ;
# le point clé est que ça ne plante pas.
assert result.steps[0].error is None
assert result.succeeded
# ──────────────────────────────────────────────────────────────────────────
# 5. Temps par étape
# ──────────────────────────────────────────────────────────────────────────
class TestTiming:
def test_step_duration_recorded(self) -> None:
doc = _make_doc()
spec = PipelineSpec(
name="ocr",
steps=[PipelineStep("ocr", MockOCR("hello"))],
)
result = PipelineRunner.run(
spec, doc, {ArtifactType.IMAGE: "/tmp/x.png"},
)
assert result.steps[0].duration_seconds >= 0.0
assert result.total_duration_seconds >= result.steps[0].duration_seconds
# ──────────────────────────────────────────────────────────────────────────
# 6. Dataclasses (StepResult / PipelineResult)
# ──────────────────────────────────────────────────────────────────────────
class TestDataclasses:
def test_step_result_default(self) -> None:
sr = StepResult(
step_name="x", duration_seconds=0.1, output_types=(),
)
assert sr.junction_metrics == {}
assert sr.error is None
def test_pipeline_result_succeeded_false_on_step_error(self) -> None:
pr = PipelineResult(
pipeline_name="p", doc_id="d",
steps=[
StepResult(step_name="a", duration_seconds=0.1,
output_types=(ArtifactType.TEXT,)),
StepResult(step_name="b", duration_seconds=0.1,
output_types=(), error="boom"),
],
)
assert not pr.succeeded
assert pr.failing_steps == ["b"]
def test_junction_metrics_for_skips_failed_steps(self) -> None:
# Étape 1 a échoué, étape 0 a produit TEXT avec une métrique
pr = PipelineResult(
pipeline_name="p", doc_id="d",
steps=[
StepResult(
step_name="ocr", duration_seconds=0.1,
output_types=(ArtifactType.TEXT,),
junction_metrics={"text": {"cer": 0.1}},
),
StepResult(
step_name="rewrite", duration_seconds=0.1,
output_types=(), error="boom",
),
],
)
# On doit retomber sur l'étape OCR (la dernière qui a réussi
# pour TEXT)
assert pr.junction_metrics_for(ArtifactType.TEXT) == {"cer": 0.1}