Picarones / tests /pipeline /test_sprint_a14_s28_planner.py
Claude
test(audit): Γ©liminer tous les pytest.raises(Exception) rΓ©siduels
0d00572 unverified
Raw
History Blame
24.6 kB
"""Sprint A14-S28 β€” ``PipelinePlanner`` + ``ExecutionPlan``.
Tests du planner introduit par S28 pour transformer une
``PipelineSpec`` en plan d'exécution immuable consommé par
le ``PipelineExecutor.run_plan``.
Couvre :
1. ``PipelinePlanner.plan`` :
- spec valide → ExecutionPlan avec resolved_steps + bindings ;
- spec invalide → PlanningError avec liste d'erreurs ;
- DAG branchant (inputs_from explicite) → bindings non implicites ;
- validation d'adapters (set fourni) ;
- validation d'adapters (None → skip).
2. Détection des jonctions de métriques :
- sans MetricRegistry → metric_junctions = () ;
- avec MetricRegistry → 1 junction par sortie de step ;
- sortie sans métrique applicable → candidate_metrics = () ;
- tri alphabétique déterministe des noms.
3. ``ExecutionPlan`` API :
- frozen dataclass ;
- step_by_id() ;
- junctions_for_step().
4. Intégration avec ``PipelineExecutor`` :
- run_plan(plan) consomme un plan pré-calculé ;
- run(spec) planifie en interne puis exécute ;
- executor.plan(spec) : sucre syntaxique.
"""
from __future__ import annotations
from dataclasses import FrozenInstanceError
import pytest
from picarones.domain.artifacts import Artifact, ArtifactType
from picarones.domain.documents import DocumentRef
from picarones.domain.evaluation_spec import MetricSpec
from picarones.evaluation.registry import MetricRegistry
from picarones.pipeline.executor import PipelineExecutor, PipelineSpecInvalid
from picarones.pipeline.planner import (
ExecutionPlan,
MetricJunction,
PipelinePlanner,
PlanningError,
StepInputBinding,
)
from picarones.domain.pipeline_spec import (
INITIAL_STEP_ID,
PipelineSpec,
PipelineStep,
)
from picarones.pipeline.types import RunContext
# ──────────────────────────────────────────────────────────────────────
# Stub adapter
# ──────────────────────────────────────────────────────────────────────
class _IdentityAdapter:
"""Adapter qui retourne directement ses inputs comme outputs."""
name = "identity"
input_types = frozenset() # ne sert pas β€” l'executor lit step.input_types
output_types = frozenset()
execution_mode = "io"
def execute(self, inputs, params, context):
return {
t: Artifact(
id=f"{context.document_id}:{t.value}",
document_id=context.document_id,
type=t,
)
for t in inputs
}
class _OCRStub:
    """Stub OCR adapter: declares IMAGE -> RAW_TEXT and emits one artifact.

    ``execute`` ignores ``inputs`` and ``params`` and always returns a single
    RAW_TEXT ``Artifact`` whose id is ``"<document_id>:raw"``.
    """

    name = "ocr_stub"
    input_types = frozenset({ArtifactType.IMAGE})
    output_types = frozenset({ArtifactType.RAW_TEXT})
    execution_mode = "io"

    def execute(self, inputs, params, context):
        doc_id = context.document_id
        raw_text = Artifact(
            id=f"{doc_id}:raw",
            document_id=doc_id,
            type=ArtifactType.RAW_TEXT,
        )
        return {ArtifactType.RAW_TEXT: raw_text}
# ──────────────────────────────────────────────────────────────────────
# PipelinePlanner — validation
# ──────────────────────────────────────────────────────────────────────
class TestPipelinePlannerConstructor:
    """Construction of ``PipelinePlanner`` with and without optional arguments."""

    def test_no_args(self) -> None:
        assert PipelinePlanner() is not None

    def test_with_metric_registry(self) -> None:
        registry = MetricRegistry()
        assert PipelinePlanner(metric_registry=registry) is not None

    def test_rejects_non_metric_registry(self) -> None:
        # A plain string is not a MetricRegistry: construction must fail with
        # a TypeError whose message names the offending argument.
        bogus_registry = "nope"
        with pytest.raises(TypeError, match="metric_registry"):
            PipelinePlanner(metric_registry=bogus_registry)  # type: ignore[arg-type]

    def test_with_available_adapters(self) -> None:
        known_adapters = {"adapter_a", "adapter_b"}
        assert PipelinePlanner(available_adapters=known_adapters) is not None
class TestPipelinePlannerErrors:
    """``PipelinePlanner.plan`` must reject invalid specs with ``PlanningError``."""

    @staticmethod
    def _ocr_only_spec(adapter_name: str) -> "PipelineSpec":
        """Build a one-step IMAGE -> RAW_TEXT spec that uses *adapter_name*."""
        ocr_step = PipelineStep(
            id="s1",
            kind="ocr",
            adapter_name=adapter_name,
            input_types=(ArtifactType.IMAGE,),
            output_types=(ArtifactType.RAW_TEXT,),
        )
        return PipelineSpec(
            name="unknown_adapter",
            initial_inputs=(ArtifactType.IMAGE,),
            steps=(ocr_step,),
        )

    def test_empty_spec_raises_planning_error(self) -> None:
        empty_spec = PipelineSpec(name="empty", steps=())
        planner = PipelinePlanner()
        with pytest.raises(PlanningError) as exc_info:
            planner.plan(empty_spec)
        reported = exc_info.value.errors
        assert reported
        assert reported[0].code == "empty_pipeline"

    def test_unknown_adapter_raises_when_set_provided(self) -> None:
        spec = self._ocr_only_spec("not_in_registry")
        planner = PipelinePlanner(available_adapters={"foo", "bar"})
        with pytest.raises(PlanningError) as exc_info:
            planner.plan(spec)
        reported_codes = [err.code for err in exc_info.value.errors]
        assert "unknown_adapter" in reported_codes

    def test_unknown_adapter_skipped_when_set_none(self) -> None:
        """Without an ``available_adapters`` set, adapter names are not checked."""
        spec = self._ocr_only_spec("any_name")
        # Must not raise even though "any_name" is not a known adapter.
        plan = PipelinePlanner().plan(spec)
        assert isinstance(plan, ExecutionPlan)

    def test_planning_error_carries_all_errors(self) -> None:
        """The planner collects every error instead of stopping at the first."""
        first_step = PipelineStep(
            id="s1",
            kind="ocr",
            adapter_name="bad_a",
            input_types=(ArtifactType.IMAGE,),
            output_types=(ArtifactType.RAW_TEXT,),
        )
        # Reuses the id of ``first_step`` on purpose to trigger duplicate_id.
        clashing_step = PipelineStep(
            id="s1",
            kind="other",
            adapter_name="bad_b",
            input_types=(ArtifactType.RAW_TEXT,),
            output_types=(ArtifactType.CORRECTED_TEXT,),
        )
        spec = PipelineSpec(
            name="multi_err",
            initial_inputs=(ArtifactType.IMAGE,),
            steps=(first_step, clashing_step),
        )
        planner = PipelinePlanner(available_adapters={"only_one"})
        with pytest.raises(PlanningError) as exc_info:
            planner.plan(spec)
        reported_codes = {err.code for err in exc_info.value.errors}
        assert {"duplicate_id", "unknown_adapter"} <= reported_codes
# ──────────────────────────────────────────────────────────────────────
# PipelinePlanner — résolution des bindings
# ──────────────────────────────────────────────────────────────────────
class TestPipelinePlannerBindings:
    """Resolution of every step input to the step that produces it."""

    def test_simple_chain_resolves_to_initial(self) -> None:
        ocr_step = PipelineStep(
            id="ocr",
            kind="ocr",
            adapter_name="ocr_stub",
            input_types=(ArtifactType.IMAGE,),
            output_types=(ArtifactType.RAW_TEXT,),
        )
        spec = PipelineSpec(
            name="simple",
            initial_inputs=(ArtifactType.IMAGE,),
            steps=(ocr_step,),
        )
        plan = PipelinePlanner().plan(spec)
        assert len(plan.resolved_steps) == 1
        resolved = plan.resolved_steps[0]
        assert resolved.id == "ocr"
        assert len(resolved.input_bindings) == 1
        only_binding = resolved.input_bindings[0]
        assert only_binding.input_type == ArtifactType.IMAGE
        assert only_binding.source_step_id == INITIAL_STEP_ID

    def test_two_step_chain_resolves_to_previous(self) -> None:
        ocr_step = PipelineStep(
            id="ocr",
            kind="ocr",
            adapter_name="ocr_stub",
            input_types=(ArtifactType.IMAGE,),
            output_types=(ArtifactType.RAW_TEXT,),
        )
        post_step = PipelineStep(
            id="post",
            kind="post_correction",
            adapter_name="llm_corrector",
            input_types=(ArtifactType.RAW_TEXT,),
            output_types=(ArtifactType.CORRECTED_TEXT,),
        )
        spec = PipelineSpec(
            name="two_step",
            initial_inputs=(ArtifactType.IMAGE,),
            steps=(ocr_step, post_step),
        )
        plan = PipelinePlanner().plan(spec)
        assert len(plan.resolved_steps) == 2
        first_sources = [
            rs.input_bindings[0].source_step_id for rs in plan.resolved_steps
        ]
        # Step 1 reads IMAGE from the initial inputs; step 2 reads RAW_TEXT
        # from the "ocr" step.
        assert first_sources == [INITIAL_STEP_ID, "ocr"]

    def test_inputs_from_explicit_overrides_latest(self) -> None:
        """An explicit ``inputs_from`` wins over the "latest producer" rule.

        Two OCR steps both produce RAW_TEXT; the post-correction step pins
        the earlier one (``ocr_a``), so its binding must point there rather
        than at ``ocr_b``.
        """
        ocr_steps = tuple(
            PipelineStep(
                id=step_id,
                kind="ocr",
                adapter_name=step_id,
                input_types=(ArtifactType.IMAGE,),
                output_types=(ArtifactType.RAW_TEXT,),
            )
            for step_id in ("ocr_a", "ocr_b")
        )
        post_from_a = PipelineStep(
            id="post_from_a",
            kind="post_correction",
            adapter_name="llm",
            input_types=(ArtifactType.RAW_TEXT,),
            output_types=(ArtifactType.CORRECTED_TEXT,),
            inputs_from={ArtifactType.RAW_TEXT: "ocr_a"},
        )
        spec = PipelineSpec(
            name="explicit_dag",
            initial_inputs=(ArtifactType.IMAGE,),
            steps=(*ocr_steps, post_from_a),
        )
        plan = PipelinePlanner().plan(spec)
        post_binding = plan.resolved_steps[2].input_bindings[0]
        assert post_binding.source_step_id == "ocr_a"

    def test_resolved_step_preserves_input_order(self) -> None:
        declared = (ArtifactType.IMAGE, ArtifactType.RAW_TEXT)
        merge_step = PipelineStep(
            id="merge",
            kind="merge",
            adapter_name="m",
            input_types=declared,
            output_types=(ArtifactType.CORRECTED_TEXT,),
        )
        spec = PipelineSpec(
            name="multi_input",
            initial_inputs=declared,
            steps=(merge_step,),
        )
        plan = PipelinePlanner().plan(spec)
        bound_types = [
            binding.input_type
            for binding in plan.resolved_steps[0].input_bindings
        ]
        assert bound_types == list(declared)
# ──────────────────────────────────────────────────────────────────────
# PipelinePlanner — détection des jonctions de métriques
# ──────────────────────────────────────────────────────────────────────
def _registry_with_text_metric() -> MetricRegistry:
    """Return a ``MetricRegistry`` holding two dummy RAW_TEXT metrics.

    ``cer`` and ``wer`` are registered in that order. Each takes a
    (RAW_TEXT, RAW_TEXT) pair and its callable always returns ``0.0``.
    """
    registry = MetricRegistry()
    text_pair = (ArtifactType.RAW_TEXT, ArtifactType.RAW_TEXT)
    for metric_name in ("cer", "wer"):
        registry.register(
            MetricSpec(name=metric_name, input_types=text_pair),
            lambda r, h: 0.0,
        )
    return registry
class TestPipelinePlannerJunctions:
    """Metric junctions: one per step output, listing the applicable metrics."""

    def test_no_registry_means_empty_junctions(self) -> None:
        spec = PipelineSpec(
            name="x",
            initial_inputs=(ArtifactType.IMAGE,),
            steps=(PipelineStep(
                id="ocr", kind="ocr", adapter_name="ocr_stub",
                input_types=(ArtifactType.IMAGE,),
                output_types=(ArtifactType.RAW_TEXT,),
            ),),
        )
        plan = PipelinePlanner().plan(spec)
        assert plan.metric_junctions == ()

    def test_registry_yields_junctions_per_output(self) -> None:
        spec = PipelineSpec(
            name="x",
            initial_inputs=(ArtifactType.IMAGE,),
            steps=(PipelineStep(
                id="ocr", kind="ocr", adapter_name="ocr_stub",
                input_types=(ArtifactType.IMAGE,),
                output_types=(ArtifactType.RAW_TEXT,),
            ),),
        )
        plan = PipelinePlanner(
            metric_registry=_registry_with_text_metric(),
        ).plan(spec)
        assert len(plan.metric_junctions) == 1
        j = plan.metric_junctions[0]
        assert j.step_id == "ocr"
        assert j.artifact_type == ArtifactType.RAW_TEXT
        # The helper registers cer before wer (already alphabetical), so this
        # alone cannot prove sorting; see the reverse-order test below.
        assert j.candidate_metrics == ("cer", "wer")

    def test_candidate_metrics_sorted_regardless_of_registration_order(
        self,
    ) -> None:
        """Metrics registered out of order still come back sorted by name.

        ``_registry_with_text_metric`` registers ``cer`` then ``wer``, which is
        already alphabetical and therefore indistinguishable from "keep the
        registration order". Registering ``wer`` first makes the
        deterministic alphabetical sort actually observable.
        """
        registry = MetricRegistry()
        for metric_name in ("wer", "cer"):
            registry.register(
                MetricSpec(
                    name=metric_name,
                    input_types=(ArtifactType.RAW_TEXT, ArtifactType.RAW_TEXT),
                ),
                lambda r, h: 0.0,
            )
        spec = PipelineSpec(
            name="x",
            initial_inputs=(ArtifactType.IMAGE,),
            steps=(PipelineStep(
                id="ocr", kind="ocr", adapter_name="ocr_stub",
                input_types=(ArtifactType.IMAGE,),
                output_types=(ArtifactType.RAW_TEXT,),
            ),),
        )
        plan = PipelinePlanner(metric_registry=registry).plan(spec)
        assert len(plan.metric_junctions) == 1
        assert plan.metric_junctions[0].candidate_metrics == ("cer", "wer")

    def test_output_without_metric_yields_empty_candidates(self) -> None:
        """An output type with no registered metric still gets a junction.

        The junction is kept (useful for diagnostics) with an empty
        ``candidate_metrics`` tuple.
        """
        spec = PipelineSpec(
            name="x",
            initial_inputs=(ArtifactType.IMAGE,),
            steps=(PipelineStep(
                id="alto",
                kind="alto",
                adapter_name="alto_stub",
                input_types=(ArtifactType.IMAGE,),
                output_types=(ArtifactType.ALTO_XML,),
            ),),
        )
        plan = PipelinePlanner(
            metric_registry=_registry_with_text_metric(),
        ).plan(spec)
        assert len(plan.metric_junctions) == 1
        j = plan.metric_junctions[0]
        assert j.step_id == "alto"
        assert j.artifact_type == ArtifactType.ALTO_XML
        assert j.candidate_metrics == ()
# ──────────────────────────────────────────────────────────────────────
# ExecutionPlan API
# ──────────────────────────────────────────────────────────────────────
class TestExecutionPlanAPI:
    """Lookup helpers and immutability of ``ExecutionPlan`` and its parts."""

    def test_step_by_id(self) -> None:
        step_a = PipelineStep(
            id="a", kind="ocr", adapter_name="x",
            input_types=(ArtifactType.IMAGE,),
            output_types=(ArtifactType.RAW_TEXT,),
        )
        step_b = PipelineStep(
            id="b", kind="post", adapter_name="y",
            input_types=(ArtifactType.RAW_TEXT,),
            output_types=(ArtifactType.CORRECTED_TEXT,),
        )
        spec = PipelineSpec(
            name="x",
            initial_inputs=(ArtifactType.IMAGE,),
            steps=(step_a, step_b),
        )
        plan = PipelinePlanner().plan(spec)
        found = plan.step_by_id("a")
        assert found is not None
        assert found.id == "a"
        # An unknown id yields None rather than raising.
        assert plan.step_by_id("missing") is None

    def test_junctions_for_step(self) -> None:
        ocr_step = PipelineStep(
            id="ocr", kind="ocr", adapter_name="o",
            input_types=(ArtifactType.IMAGE,),
            output_types=(ArtifactType.RAW_TEXT,),
        )
        post_step = PipelineStep(
            id="post", kind="post", adapter_name="p",
            input_types=(ArtifactType.RAW_TEXT,),
            output_types=(ArtifactType.CORRECTED_TEXT,),
        )
        spec = PipelineSpec(
            name="x",
            initial_inputs=(ArtifactType.IMAGE,),
            steps=(ocr_step, post_step),
        )
        planner = PipelinePlanner(metric_registry=_registry_with_text_metric())
        plan = planner.plan(spec)
        for_ocr = plan.junctions_for_step("ocr")
        assert len(for_ocr) == 1
        assert for_ocr[0].artifact_type == ArtifactType.RAW_TEXT
        # An unknown step id yields an empty tuple.
        assert plan.junctions_for_step("missing") == ()

    def test_dataclass_frozen(self) -> None:
        only_step = PipelineStep(
            id="ocr", kind="ocr", adapter_name="o",
            input_types=(ArtifactType.IMAGE,),
            output_types=(ArtifactType.RAW_TEXT,),
        )
        spec = PipelineSpec(
            name="x",
            initial_inputs=(ArtifactType.IMAGE,),
            steps=(only_step,),
        )
        plan = PipelinePlanner().plan(spec)
        with pytest.raises(FrozenInstanceError):
            plan.spec = None  # type: ignore[misc]

    def test_step_input_binding_frozen(self) -> None:
        binding = StepInputBinding(
            input_type=ArtifactType.IMAGE,
            source_step_id="x",
        )
        with pytest.raises(FrozenInstanceError):
            binding.source_step_id = "y"  # type: ignore[misc]

    def test_resolved_step_frozen(self) -> None:
        only_step = PipelineStep(
            id="s", kind="k", adapter_name="a",
            input_types=(ArtifactType.IMAGE,),
            output_types=(ArtifactType.RAW_TEXT,),
        )
        spec = PipelineSpec(
            name="x",
            initial_inputs=(ArtifactType.IMAGE,),
            steps=(only_step,),
        )
        resolved = PipelinePlanner().plan(spec).resolved_steps[0]
        with pytest.raises(FrozenInstanceError):
            resolved.step = None  # type: ignore[misc]

    def test_metric_junction_frozen(self) -> None:
        junction = MetricJunction(
            step_id="x",
            artifact_type=ArtifactType.RAW_TEXT,
            candidate_metrics=("cer",),
        )
        with pytest.raises(FrozenInstanceError):
            junction.candidate_metrics = ()  # type: ignore[misc]
# ──────────────────────────────────────────────────────────────────────
# Intégration Planner + Executor
# ──────────────────────────────────────────────────────────────────────
class TestPipelineExecutorWithPlanner:
    """``PipelineExecutor`` wired to a ``PipelinePlanner``: plan, run_plan, run."""

    @staticmethod
    def _ocr_spec():
        """Single-step spec: ``ocr_stub`` turns the initial IMAGE into RAW_TEXT."""
        return PipelineSpec(
            name="x",
            initial_inputs=(ArtifactType.IMAGE,),
            steps=(PipelineStep(
                id="ocr", kind="ocr", adapter_name="ocr_stub",
                input_types=(ArtifactType.IMAGE,),
                output_types=(ArtifactType.RAW_TEXT,),
            ),),
        )

    @staticmethod
    def _executor(**extra):
        """Executor whose resolver returns a fresh ``_OCRStub`` for any name."""
        return PipelineExecutor(adapter_resolver=lambda n: _OCRStub(), **extra)

    @staticmethod
    def _context(code_version="1.0.0"):
        """Run context for document ``d1`` of pipeline ``x``."""
        return RunContext(
            document_id="d1",
            code_version=code_version,
            pipeline_name="x",
        )

    @staticmethod
    def _image_inputs():
        """Initial inputs: a single IMAGE artifact for document ``d1``."""
        return {
            ArtifactType.IMAGE: Artifact(
                id="d1:img", document_id="d1", type=ArtifactType.IMAGE,
            ),
        }

    def test_executor_plan_returns_execution_plan(self) -> None:
        plan = self._executor().plan(self._ocr_spec())
        assert isinstance(plan, ExecutionPlan)
        assert len(plan.resolved_steps) == 1

    def test_executor_plan_raises_pipeline_spec_invalid_on_bad_spec(self) -> None:
        bad_spec = PipelineSpec(name="bad", steps=())
        executor = self._executor()
        with pytest.raises(PipelineSpecInvalid, match="invalide"):
            executor.plan(bad_spec)

    def test_run_plan_executes_pre_planned(self) -> None:
        executor = self._executor()
        plan = executor.plan(self._ocr_spec())
        result = executor.run_plan(
            plan=plan,
            document=DocumentRef(id="d1", image_uri="/tmp/img.png"),
            initial_inputs=self._image_inputs(),
            context=self._context(),
        )
        assert result.succeeded
        assert [sr.step_id for sr in result.step_results] == ["ocr"]

    def test_run_plan_rejects_non_plan(self) -> None:
        # run_plan raises PicaronesError (message mentioning "ExecutionPlan")
        # when ``plan`` is not an ExecutionPlan instance.
        from picarones.domain.errors import PicaronesError

        executor = self._executor()
        with pytest.raises(PicaronesError, match="ExecutionPlan"):
            executor.run_plan(
                plan="not a plan",  # type: ignore[arg-type]
                document=DocumentRef(id="d1"),
                initial_inputs={},
                context=self._context("1.0"),
            )

    def test_run_spec_still_works_via_planning(self) -> None:
        """``run(spec)`` plans internally, then executes the resulting plan."""
        result = self._executor().run(
            spec=self._ocr_spec(),
            document=DocumentRef(id="d1", image_uri="/tmp/img.png"),
            initial_inputs=self._image_inputs(),
            context=self._context(),
        )
        assert result.succeeded

    def test_planner_injection(self) -> None:
        """A caller-supplied planner is the one ``executor.plan`` uses.

        Here the injected planner carries a MetricRegistry so that the plan
        exposes metric junctions.
        """
        injected = PipelinePlanner(metric_registry=_registry_with_text_metric())
        plan = self._executor(planner=injected).plan(self._ocr_spec())
        # Non-empty thanks to the injected registry.
        assert plan.metric_junctions

    def test_planner_must_be_pipeline_planner(self) -> None:
        from picarones.domain.errors import PicaronesError

        with pytest.raises(PicaronesError, match="PipelinePlanner"):
            self._executor(planner="not a planner")  # type: ignore[arg-type]