Spaces:
Sleeping
Sleeping
"""Sprint A14-S28 — ``PipelinePlanner`` + ``ExecutionPlan``.

Tests du planner introduit par S28 pour transformer une
``PipelineSpec`` en plan d'exécution immuable consommé par
le ``PipelineExecutor.run_plan``.

Couvre :

1. ``PipelinePlanner.plan`` :
   - spec valide → ExecutionPlan avec resolved_steps + bindings ;
   - spec invalide → PlanningError avec liste d'erreurs ;
   - DAG branchant (inputs_from explicite) → bindings non implicites ;
   - validation d'adapters (set fourni) ;
   - validation d'adapters (None → skip).
2. Détection des jonctions de métriques :
   - sans MetricRegistry → metric_junctions = () ;
   - avec MetricRegistry → 1 junction par sortie de step ;
   - sortie sans métrique applicable → candidate_metrics = () ;
   - tri alphabétique déterministe des noms.
3. ``ExecutionPlan`` API :
   - frozen dataclass ;
   - step_by_id() ;
   - junctions_for_step().
4. Intégration avec ``PipelineExecutor`` :
   - run_plan(plan) consomme un plan pré-calculé ;
   - run(spec) planifie en interne puis exécute ;
   - executor.plan(spec) : sucre syntaxique.
"""
| from __future__ import annotations | |
| from dataclasses import FrozenInstanceError | |
| import pytest | |
| from picarones.domain.artifacts import Artifact, ArtifactType | |
| from picarones.domain.documents import DocumentRef | |
| from picarones.domain.evaluation_spec import MetricSpec | |
| from picarones.evaluation.registry import MetricRegistry | |
| from picarones.pipeline.executor import PipelineExecutor, PipelineSpecInvalid | |
| from picarones.pipeline.planner import ( | |
| ExecutionPlan, | |
| MetricJunction, | |
| PipelinePlanner, | |
| PlanningError, | |
| StepInputBinding, | |
| ) | |
| from picarones.domain.pipeline_spec import ( | |
| INITIAL_STEP_ID, | |
| PipelineSpec, | |
| PipelineStep, | |
| ) | |
| from picarones.pipeline.types import RunContext | |
# ──────────────────────────────────────────────────────────────────────
# Stub adapter
# ──────────────────────────────────────────────────────────────────────
| class _IdentityAdapter: | |
| """Adapter qui retourne directement ses inputs comme outputs.""" | |
| name = "identity" | |
| input_types = frozenset() # ne sert pas β l'executor lit step.input_types | |
| output_types = frozenset() | |
| execution_mode = "io" | |
| def execute(self, inputs, params, context): | |
| return { | |
| t: Artifact( | |
| id=f"{context.document_id}:{t.value}", | |
| document_id=context.document_id, | |
| type=t, | |
| ) | |
| for t in inputs | |
| } | |
class _OCRStub:
    """Stub OCR adapter declaring IMAGE -> RAW_TEXT.

    ``execute`` ignores its inputs and parameters and always emits a
    single RAW_TEXT artifact for the document of the run context.
    """

    name = "ocr_stub"
    input_types = frozenset({ArtifactType.IMAGE})
    output_types = frozenset({ArtifactType.RAW_TEXT})
    execution_mode = "io"

    def execute(self, inputs, params, context):
        """Return ``{RAW_TEXT: Artifact}`` with id ``"<document_id>:raw"``."""
        raw_text = Artifact(
            id=f"{context.document_id}:raw",
            document_id=context.document_id,
            type=ArtifactType.RAW_TEXT,
        )
        return {ArtifactType.RAW_TEXT: raw_text}
# ──────────────────────────────────────────────────────────────────────
# PipelinePlanner — validation
# ──────────────────────────────────────────────────────────────────────
class TestPipelinePlannerConstructor:
    """Constructor contract of ``PipelinePlanner``."""

    def test_no_args(self) -> None:
        """The planner can be built without any argument."""
        assert PipelinePlanner() is not None

    def test_with_metric_registry(self) -> None:
        """A ``MetricRegistry`` instance is accepted as ``metric_registry``."""
        registry = MetricRegistry()
        assert PipelinePlanner(metric_registry=registry) is not None

    def test_rejects_non_metric_registry(self) -> None:
        """A non-registry ``metric_registry`` is rejected with ``TypeError``."""
        bogus_registry = "nope"
        with pytest.raises(TypeError, match="metric_registry"):
            PipelinePlanner(metric_registry=bogus_registry)  # type: ignore[arg-type]

    def test_with_available_adapters(self) -> None:
        """A set of adapter names is accepted as ``available_adapters``."""
        adapter_names = {"adapter_a", "adapter_b"}
        assert PipelinePlanner(available_adapters=adapter_names) is not None
class TestPipelinePlannerErrors:
    """Invalid specs must surface as ``PlanningError`` carrying error codes."""

    @staticmethod
    def _single_ocr_spec(adapter_name: str) -> PipelineSpec:
        """Build the ``unknown_adapter`` spec: one IMAGE -> RAW_TEXT step ``s1``."""
        ocr_step = PipelineStep(
            id="s1",
            kind="ocr",
            adapter_name=adapter_name,
            input_types=(ArtifactType.IMAGE,),
            output_types=(ArtifactType.RAW_TEXT,),
        )
        return PipelineSpec(
            name="unknown_adapter",
            initial_inputs=(ArtifactType.IMAGE,),
            steps=(ocr_step,),
        )

    def test_empty_spec_raises_planning_error(self) -> None:
        """A spec without steps fails with ``empty_pipeline`` as first error."""
        empty_spec = PipelineSpec(name="empty", steps=())
        planner = PipelinePlanner()
        with pytest.raises(PlanningError) as caught:
            planner.plan(empty_spec)
        errors = caught.value.errors
        assert errors
        assert errors[0].code == "empty_pipeline"

    def test_unknown_adapter_raises_when_set_provided(self) -> None:
        """An adapter name outside the provided set yields ``unknown_adapter``."""
        spec = self._single_ocr_spec("not_in_registry")
        planner = PipelinePlanner(available_adapters={"foo", "bar"})
        with pytest.raises(PlanningError) as caught:
            planner.plan(spec)
        reported_codes = [err.code for err in caught.value.errors]
        assert "unknown_adapter" in reported_codes

    def test_unknown_adapter_skipped_when_set_none(self) -> None:
        """Without a provided adapter set, adapter validation is skipped."""
        spec = self._single_ocr_spec("any_name")
        planner = PipelinePlanner()
        plan = planner.plan(spec)  # must not raise
        assert isinstance(plan, ExecutionPlan)

    def test_planning_error_carries_all_errors(self) -> None:
        """The planner does not short-circuit: it collects every error."""
        first_step = PipelineStep(
            id="s1",
            kind="ocr",
            adapter_name="bad_a",
            input_types=(ArtifactType.IMAGE,),
            output_types=(ArtifactType.RAW_TEXT,),
        )
        clashing_step = PipelineStep(
            id="s1",  # duplicated id on purpose
            kind="other",
            adapter_name="bad_b",
            input_types=(ArtifactType.RAW_TEXT,),
            output_types=(ArtifactType.CORRECTED_TEXT,),
        )
        spec = PipelineSpec(
            name="multi_err",
            initial_inputs=(ArtifactType.IMAGE,),
            steps=(first_step, clashing_step),
        )
        planner = PipelinePlanner(available_adapters={"only_one"})
        with pytest.raises(PlanningError) as caught:
            planner.plan(spec)
        reported_codes = {err.code for err in caught.value.errors}
        assert "duplicate_id" in reported_codes
        assert "unknown_adapter" in reported_codes
# ──────────────────────────────────────────────────────────────────────
# PipelinePlanner — résolution des bindings
# ──────────────────────────────────────────────────────────────────────
class TestPipelinePlannerBindings:
    """Resolution of each step input to the step that produces it."""

    def test_simple_chain_resolves_to_initial(self) -> None:
        """A single step consuming IMAGE is bound to the initial inputs."""
        ocr_step = PipelineStep(
            id="ocr",
            kind="ocr",
            adapter_name="ocr_stub",
            input_types=(ArtifactType.IMAGE,),
            output_types=(ArtifactType.RAW_TEXT,),
        )
        spec = PipelineSpec(
            name="simple",
            initial_inputs=(ArtifactType.IMAGE,),
            steps=(ocr_step,),
        )
        plan = PipelinePlanner().plan(spec)

        assert len(plan.resolved_steps) == 1
        resolved = plan.resolved_steps[0]
        assert resolved.id == "ocr"
        assert len(resolved.input_bindings) == 1
        only_binding = resolved.input_bindings[0]
        assert only_binding.input_type == ArtifactType.IMAGE
        assert only_binding.source_step_id == INITIAL_STEP_ID

    def test_two_step_chain_resolves_to_previous(self) -> None:
        """In a linear chain the second step is bound to the first one."""
        ocr_step = PipelineStep(
            id="ocr",
            kind="ocr",
            adapter_name="ocr_stub",
            input_types=(ArtifactType.IMAGE,),
            output_types=(ArtifactType.RAW_TEXT,),
        )
        post_step = PipelineStep(
            id="post",
            kind="post_correction",
            adapter_name="llm_corrector",
            input_types=(ArtifactType.RAW_TEXT,),
            output_types=(ArtifactType.CORRECTED_TEXT,),
        )
        spec = PipelineSpec(
            name="two_step",
            initial_inputs=(ArtifactType.IMAGE,),
            steps=(ocr_step, post_step),
        )
        plan = PipelinePlanner().plan(spec)

        assert len(plan.resolved_steps) == 2
        first, second = plan.resolved_steps[0], plan.resolved_steps[1]
        # First step: IMAGE comes from the initial inputs.
        assert first.input_bindings[0].source_step_id == INITIAL_STEP_ID
        # Second step: RAW_TEXT comes from the "ocr" step.
        assert second.input_bindings[0].source_step_id == "ocr"

    def test_inputs_from_explicit_overrides_latest(self) -> None:
        """An explicit ``inputs_from`` wins over the latest producer.

        When ``inputs_from`` names an earlier, non-latest step, the
        binding must point to that step rather than to the last
        producer of the type.
        """
        ocr_a = PipelineStep(
            id="ocr_a",
            kind="ocr",
            adapter_name="ocr_a",
            input_types=(ArtifactType.IMAGE,),
            output_types=(ArtifactType.RAW_TEXT,),
        )
        ocr_b = PipelineStep(
            id="ocr_b",
            kind="ocr",
            adapter_name="ocr_b",
            input_types=(ArtifactType.IMAGE,),
            output_types=(ArtifactType.RAW_TEXT,),
        )
        post_from_a = PipelineStep(
            id="post_from_a",
            kind="post_correction",
            adapter_name="llm",
            input_types=(ArtifactType.RAW_TEXT,),
            output_types=(ArtifactType.CORRECTED_TEXT,),
            # Explicitly request the RAW_TEXT of ocr_a, not the one of
            # ocr_b, which would be the "latest producer".
            inputs_from={ArtifactType.RAW_TEXT: "ocr_a"},
        )
        spec = PipelineSpec(
            name="explicit_dag",
            initial_inputs=(ArtifactType.IMAGE,),
            steps=(ocr_a, ocr_b, post_from_a),
        )
        plan = PipelinePlanner().plan(spec)

        third = plan.resolved_steps[2]
        assert third.input_bindings[0].source_step_id == "ocr_a"

    def test_resolved_step_preserves_input_order(self) -> None:
        """Bindings are listed in the order of the step's ``input_types``."""
        merge_step = PipelineStep(
            id="merge",
            kind="merge",
            adapter_name="m",
            input_types=(ArtifactType.IMAGE, ArtifactType.RAW_TEXT),
            output_types=(ArtifactType.CORRECTED_TEXT,),
        )
        spec = PipelineSpec(
            name="multi_input",
            initial_inputs=(ArtifactType.IMAGE, ArtifactType.RAW_TEXT),
            steps=(merge_step,),
        )
        plan = PipelinePlanner().plan(spec)

        bound_types = [
            binding.input_type
            for binding in plan.resolved_steps[0].input_bindings
        ]
        assert bound_types == [ArtifactType.IMAGE, ArtifactType.RAW_TEXT]
# ──────────────────────────────────────────────────────────────────────
# PipelinePlanner — détection des jonctions de métriques
# ──────────────────────────────────────────────────────────────────────
def _registry_with_text_metric() -> MetricRegistry:
    """Build a ``MetricRegistry`` holding two text metrics, ``cer`` and ``wer``.

    Both metrics declare ``(RAW_TEXT, RAW_TEXT)`` as input types and use
    a stub callable that always returns ``0.0``.

    The metrics are deliberately registered in reverse alphabetical
    order (``wer`` before ``cer``). Tests that expect candidate metric
    names in alphabetical order therefore cannot pass merely because
    registration order happens to match sorted order.

    Returns:
        A freshly created registry containing ``wer`` and ``cer``.
    """
    registry = MetricRegistry()
    text_pair = (ArtifactType.RAW_TEXT, ArtifactType.RAW_TEXT)
    for metric_name in ("wer", "cer"):
        registry.register(
            MetricSpec(name=metric_name, input_types=text_pair),
            # Stub metric; presumably called as (reference, hypothesis).
            lambda r, h: 0.0,
        )
    return registry
class TestPipelinePlannerJunctions:
    """Detection of metric junctions on step outputs."""

    @staticmethod
    def _image_step_spec(step_id, kind, adapter_name, output_type) -> PipelineSpec:
        """Build spec ``x``: a single step turning IMAGE into ``output_type``."""
        step = PipelineStep(
            id=step_id,
            kind=kind,
            adapter_name=adapter_name,
            input_types=(ArtifactType.IMAGE,),
            output_types=(output_type,),
        )
        return PipelineSpec(
            name="x",
            initial_inputs=(ArtifactType.IMAGE,),
            steps=(step,),
        )

    def test_no_registry_means_empty_junctions(self) -> None:
        """Without a ``MetricRegistry`` the plan has no junction at all."""
        spec = self._image_step_spec(
            "ocr", "ocr", "ocr_stub", ArtifactType.RAW_TEXT,
        )
        plan = PipelinePlanner().plan(spec)
        assert plan.metric_junctions == ()

    def test_registry_yields_junctions_per_output(self) -> None:
        """With a registry, each step output gets exactly one junction."""
        spec = self._image_step_spec(
            "ocr", "ocr", "ocr_stub", ArtifactType.RAW_TEXT,
        )
        planner = PipelinePlanner(metric_registry=_registry_with_text_metric())
        plan = planner.plan(spec)

        assert len(plan.metric_junctions) == 1
        junction = plan.metric_junctions[0]
        assert junction.step_id == "ocr"
        assert junction.artifact_type == ArtifactType.RAW_TEXT
        # Candidate names are expected in deterministic alphabetical order.
        assert junction.candidate_metrics == ("cer", "wer")

    def test_output_without_metric_yields_empty_candidates(self) -> None:
        """An output type without a registered metric still gets a junction.

        The junction is kept (useful for diagnostics) with an empty
        ``candidate_metrics``.
        """
        spec = self._image_step_spec(
            "alto", "alto", "alto_stub", ArtifactType.ALTO_XML,
        )
        planner = PipelinePlanner(metric_registry=_registry_with_text_metric())
        plan = planner.plan(spec)

        assert len(plan.metric_junctions) == 1
        junction = plan.metric_junctions[0]
        assert junction.step_id == "alto"
        assert junction.artifact_type == ArtifactType.ALTO_XML
        assert junction.candidate_metrics == ()
# ──────────────────────────────────────────────────────────────────────
# ExecutionPlan API
# ──────────────────────────────────────────────────────────────────────
class TestExecutionPlanAPI:
    """Lookup helpers and immutability of the plan dataclasses."""

    @staticmethod
    def _chain_spec(first_id, first_adapter, second_id, second_adapter) -> PipelineSpec:
        """Build spec ``x``: IMAGE -> RAW_TEXT (``ocr``) -> CORRECTED_TEXT (``post``)."""
        first = PipelineStep(
            id=first_id,
            kind="ocr",
            adapter_name=first_adapter,
            input_types=(ArtifactType.IMAGE,),
            output_types=(ArtifactType.RAW_TEXT,),
        )
        second = PipelineStep(
            id=second_id,
            kind="post",
            adapter_name=second_adapter,
            input_types=(ArtifactType.RAW_TEXT,),
            output_types=(ArtifactType.CORRECTED_TEXT,),
        )
        return PipelineSpec(
            name="x",
            initial_inputs=(ArtifactType.IMAGE,),
            steps=(first, second),
        )

    @staticmethod
    def _single_step_plan(step_id, kind, adapter_name) -> ExecutionPlan:
        """Plan spec ``x`` made of one IMAGE -> RAW_TEXT step, with a default planner."""
        step = PipelineStep(
            id=step_id,
            kind=kind,
            adapter_name=adapter_name,
            input_types=(ArtifactType.IMAGE,),
            output_types=(ArtifactType.RAW_TEXT,),
        )
        spec = PipelineSpec(
            name="x",
            initial_inputs=(ArtifactType.IMAGE,),
            steps=(step,),
        )
        return PipelinePlanner().plan(spec)

    def test_step_by_id(self) -> None:
        """``step_by_id`` returns the matching step, or ``None`` if unknown."""
        plan = PipelinePlanner().plan(self._chain_spec("a", "x", "b", "y"))
        found = plan.step_by_id("a")
        assert found is not None
        assert found.id == "a"
        assert plan.step_by_id("missing") is None

    def test_junctions_for_step(self) -> None:
        """``junctions_for_step`` filters junctions by step id."""
        planner = PipelinePlanner(metric_registry=_registry_with_text_metric())
        plan = planner.plan(self._chain_spec("ocr", "o", "post", "p"))

        for_ocr = plan.junctions_for_step("ocr")
        assert len(for_ocr) == 1
        assert for_ocr[0].artifact_type == ArtifactType.RAW_TEXT
        assert plan.junctions_for_step("missing") == ()

    def test_dataclass_frozen(self) -> None:
        """Assigning an attribute of ``ExecutionPlan`` is refused."""
        plan = self._single_step_plan("ocr", "ocr", "o")
        with pytest.raises(FrozenInstanceError):
            plan.spec = None  # type: ignore[misc]

    def test_step_input_binding_frozen(self) -> None:
        """Assigning an attribute of ``StepInputBinding`` is refused."""
        binding = StepInputBinding(
            input_type=ArtifactType.IMAGE,
            source_step_id="x",
        )
        with pytest.raises(FrozenInstanceError):
            binding.source_step_id = "y"  # type: ignore[misc]

    def test_resolved_step_frozen(self) -> None:
        """Assigning an attribute of a resolved step is refused."""
        plan = self._single_step_plan("s", "k", "a")
        resolved = plan.resolved_steps[0]
        with pytest.raises(FrozenInstanceError):
            resolved.step = None  # type: ignore[misc]

    def test_metric_junction_frozen(self) -> None:
        """Assigning an attribute of ``MetricJunction`` is refused."""
        junction = MetricJunction(
            step_id="x",
            artifact_type=ArtifactType.RAW_TEXT,
            candidate_metrics=("cer",),
        )
        with pytest.raises(FrozenInstanceError):
            junction.candidate_metrics = ()  # type: ignore[misc]
# ──────────────────────────────────────────────────────────────────────
# Intégration Planner + Executor
# ──────────────────────────────────────────────────────────────────────
class TestPipelineExecutorWithPlanner:
    """Integration of ``PipelinePlanner`` with ``PipelineExecutor``."""

    @staticmethod
    def _stub_executor(**extra) -> PipelineExecutor:
        """Build an executor whose resolver always returns a new ``_OCRStub``."""
        return PipelineExecutor(adapter_resolver=lambda n: _OCRStub(), **extra)

    @staticmethod
    def _ocr_spec() -> PipelineSpec:
        """Build spec ``x``: a single ``ocr`` step turning IMAGE into RAW_TEXT."""
        ocr_step = PipelineStep(
            id="ocr",
            kind="ocr",
            adapter_name="ocr_stub",
            input_types=(ArtifactType.IMAGE,),
            output_types=(ArtifactType.RAW_TEXT,),
        )
        return PipelineSpec(
            name="x",
            initial_inputs=(ArtifactType.IMAGE,),
            steps=(ocr_step,),
        )

    @staticmethod
    def _run_arguments() -> dict[str, object]:
        """Return the document, initial IMAGE artifact and context for ``d1``."""
        image = Artifact(
            id="d1:img", document_id="d1", type=ArtifactType.IMAGE,
        )
        return {
            "document": DocumentRef(id="d1", image_uri="/tmp/img.png"),
            "initial_inputs": {ArtifactType.IMAGE: image},
            "context": RunContext(
                document_id="d1",
                code_version="1.0.0",
                pipeline_name="x",
            ),
        }

    def test_executor_plan_returns_execution_plan(self) -> None:
        """``executor.plan(spec)`` returns an ``ExecutionPlan``."""
        plan = self._stub_executor().plan(self._ocr_spec())
        assert isinstance(plan, ExecutionPlan)
        assert len(plan.resolved_steps) == 1

    def test_executor_plan_raises_pipeline_spec_invalid_on_bad_spec(self) -> None:
        """An empty spec makes ``executor.plan`` raise ``PipelineSpecInvalid``."""
        bad_spec = PipelineSpec(name="bad", steps=())
        executor = self._stub_executor()
        with pytest.raises(PipelineSpecInvalid, match="invalide"):
            executor.plan(bad_spec)

    def test_run_plan_executes_pre_planned(self) -> None:
        """``run_plan`` executes a plan computed beforehand."""
        executor = self._stub_executor()
        plan = executor.plan(self._ocr_spec())

        result = executor.run_plan(plan=plan, **self._run_arguments())

        assert result.succeeded
        assert len(result.step_results) == 1
        assert result.step_results[0].step_id == "ocr"

    def test_run_plan_rejects_non_plan(self) -> None:
        """``run_plan`` raises ``PicaronesError`` when ``plan`` is not an ``ExecutionPlan``."""
        executor = self._stub_executor()
        from picarones.domain.errors import PicaronesError

        with pytest.raises(PicaronesError, match="ExecutionPlan"):
            executor.run_plan(
                plan="not a plan",  # type: ignore[arg-type]
                document=DocumentRef(id="d1"),
                initial_inputs={},
                context=RunContext(
                    document_id="d1",
                    code_version="1.0",
                    pipeline_name="x",
                ),
            )

    def test_run_spec_still_works_via_planning(self) -> None:
        """Shortcut ``run(spec)`` plans internally and then executes."""
        executor = self._stub_executor()
        result = executor.run(spec=self._ocr_spec(), **self._run_arguments())
        assert result.succeeded

    def test_planner_injection(self) -> None:
        """The caller may inject a planner, e.g. one with a ``MetricRegistry``."""
        custom_planner = PipelinePlanner(
            metric_registry=_registry_with_text_metric(),
        )
        executor = self._stub_executor(planner=custom_planner)
        plan = executor.plan(self._ocr_spec())
        # Non-empty thanks to the registry of the injected planner.
        assert plan.metric_junctions

    def test_planner_must_be_pipeline_planner(self) -> None:
        """A ``planner`` that is not a ``PipelinePlanner`` is rejected."""
        from picarones.domain.errors import PicaronesError

        with pytest.raises(PicaronesError, match="PipelinePlanner"):
            self._stub_executor(
                planner="not a planner",  # type: ignore[arg-type]
            )