Picarones / tests /pipeline /test_sprint_a14_s8_backpressure.py
Claude
feat(pipeline): Sprint A14-S8 — CorpusRunner backpressure + timeout réel + cancellation
823fb32 unverified
Raw
History Blame
5 kB
"""Sprint A14-S8 — backpressure du ``CorpusRunner``.

Checks that ``max_in_flight`` is honoured at every instant: there are
never more than N adapters running in parallel, even on corpora of
several hundred documents.

Strategy: a stub adapter increments a shared counter at the start of
``execute()``, decrements it at the end, and records the maximum
reached. At the end of the run, we check that ``max_observed
<= max_in_flight``.
"""
from __future__ import annotations
import threading
import time
import pytest
from picarones.domain import Artifact, ArtifactType, DocumentRef
from picarones.pipeline import (
CorpusRunner,
PipelineExecutor,
PipelineSpec,
PipelineStep,
RunContext,
)
class _ConcurrencyTrackingAdapter:
    """Stub adapter that records how many ``execute()`` calls overlap.

    Each call bumps a lock-protected counter on entry and releases it on
    exit; ``max_observed`` holds the highest counter value seen so far.
    """

    name = "tracking"
    input_types = frozenset({ArtifactType.IMAGE})
    output_types = frozenset({ArtifactType.RAW_TEXT})
    execution_mode = "io"

    def __init__(self, sleep_seconds: float = 0.01) -> None:
        """Store the per-call sleep duration and reset the counters."""
        self.max_observed = 0
        self._in_flight = 0
        self._guard = threading.Lock()
        self._pause = sleep_seconds

    def _enter(self) -> None:
        """Count one more running call and update the recorded peak."""
        with self._guard:
            self._in_flight += 1
            self.max_observed = max(self.max_observed, self._in_flight)

    def _leave(self) -> None:
        """Count one fewer running call."""
        with self._guard:
            self._in_flight -= 1

    def execute(self, inputs, params, context):
        """Sleep for the configured duration, then return a RAW_TEXT artifact.

        The returned dict maps ``ArtifactType.RAW_TEXT`` to an ``Artifact``
        built from ``context.document_id``. ``inputs`` and ``params`` are
        not read.
        """
        self._enter()
        try:
            # Hold the slot for a while so overlapping calls are observable.
            time.sleep(self._pause)
            raw_text = Artifact(
                id=f"{context.document_id}:raw_text",
                document_id=context.document_id,
                type=ArtifactType.RAW_TEXT,
            )
            return {ArtifactType.RAW_TEXT: raw_text}
        finally:
            # Always release the slot, even if the body raised.
            self._leave()
def _build(adapter, max_in_flight: int):
    """Assemble a ``CorpusRunner`` and a one-step ``PipelineSpec`` around *adapter*.

    The adapter is registered under the name ``"tracking"``; the runner is
    created with the given ``max_in_flight``, a 10-second per-document
    timeout and a 5 ms poll interval.

    Returns:
        A ``(runner, spec)`` tuple.
    """
    adapters_by_name = {"tracking": adapter}

    def resolve(n):
        # Unknown adapter names raise KeyError, exactly like a plain dict lookup.
        return adapters_by_name[n]

    executor = PipelineExecutor(adapter_resolver=resolve)
    runner = CorpusRunner(
        executor,
        max_in_flight=max_in_flight,
        timeout_seconds_per_doc=10.0,
        poll_interval_seconds=0.005,
    )

    only_step = PipelineStep(
        id="s",
        kind="ocr",
        adapter_name="tracking",
        input_types=(ArtifactType.IMAGE,),
        output_types=(ArtifactType.RAW_TEXT,),
    )
    spec = PipelineSpec(
        name="bp",
        initial_inputs=(ArtifactType.IMAGE,),
        steps=(only_step,),
    )
    return runner, spec
def _factories():
    """Return the ``(inputs, ctx)`` factory pair handed to ``runner.run``.

    ``inputs(doc)`` builds a dict mapping ``ArtifactType.IMAGE`` to an
    ``Artifact`` derived from ``doc.id`` and ``doc.image_uri``;
    ``ctx(doc)`` builds a ``RunContext`` for ``doc.id``.
    """

    def inputs(doc):
        image = Artifact(
            id=f"{doc.id}:image",
            document_id=doc.id,
            type=ArtifactType.IMAGE,
            uri=doc.image_uri,
        )
        return {ArtifactType.IMAGE: image}

    def ctx(doc):
        run_context = RunContext(
            document_id=doc.id,
            code_version="1.0.0",
            pipeline_name="bp",
        )
        return run_context

    return inputs, ctx
@pytest.mark.parametrize("max_in_flight", [1, 2, 4])
def test_max_in_flight_respected(max_in_flight: int) -> None:
    """Run 40 documents and check the observed concurrency peak.

    The peak recorded by the tracking adapter must never exceed
    ``max_in_flight`` and must actually reach it.
    """
    adapter = _ConcurrencyTrackingAdapter(sleep_seconds=0.02)
    runner, spec = _build(adapter, max_in_flight=max_in_flight)
    make_inputs, make_context = _factories()
    corpus = [
        DocumentRef(id=f"d{i}", image_uri=f"/tmp/{i}.png")
        for i in range(40)
    ]

    report = runner.run(spec, corpus, make_inputs, make_context, corpus_name="bp")

    assert report.n_documents == 40
    assert report.n_succeeded == 40

    # Backpressure guarantee: concurrency never went above the limit.
    assert adapter.max_observed <= max_in_flight, (
        f"max observed = {adapter.max_observed}, attendu <= {max_in_flight}"
    )

    # And the limit was actually reached, which shows the runner really
    # does execute documents in parallel.
    assert adapter.max_observed == max_in_flight, (
        f"on aurait dû saturer à {max_in_flight}, observed "
        f"{adapter.max_observed}"
    )
def test_max_in_flight_one_means_sequential() -> None:
    """With ``max_in_flight=1`` the adapter must never see overlapping calls."""
    adapter = _ConcurrencyTrackingAdapter(sleep_seconds=0.005)
    runner, spec = _build(adapter, max_in_flight=1)
    make_inputs, make_context = _factories()

    corpus = [DocumentRef(id=f"d{i}") for i in range(20)]
    runner.run(spec, corpus, make_inputs, make_context)

    peak = adapter.max_observed
    assert peak == 1
def test_empty_corpus_returns_zero_outcomes() -> None:
    """An empty document list yields an empty result and never calls the adapter."""
    adapter = _ConcurrencyTrackingAdapter()
    runner, spec = _build(adapter, max_in_flight=4)
    make_inputs, make_context = _factories()
    no_documents = []

    report = runner.run(spec, no_documents, make_inputs, make_context)

    assert report.n_documents == 0
    assert report.outcomes == ()
    # The counter was never incremented, so the peak is still zero.
    assert adapter.max_observed == 0
def test_max_in_flight_zero_rejected() -> None:
    """Constructing a ``CorpusRunner`` with ``max_in_flight=0`` must raise.

    The expected error is a ``PicaronesError`` whose message matches
    ``"max_in_flight"``.
    """
    from picarones.domain import PicaronesError

    def resolve_nothing(n):
        # The resolver is not expected to be used; construction should fail first.
        return None

    executor = PipelineExecutor(adapter_resolver=resolve_nothing)
    expected_failure = pytest.raises(PicaronesError, match="max_in_flight")
    with expected_failure:
        CorpusRunner(executor, max_in_flight=0)
def test_negative_timeout_rejected() -> None:
    """Constructing a ``CorpusRunner`` with an invalid timeout must raise.

    The expected error is a ``PicaronesError`` whose message matches
    ``"timeout"``.

    NOTE(review): despite the test name, the value exercised here is ``0``,
    not a negative number — confirm whether a negative case should be added.
    """
    from picarones.domain import PicaronesError

    def resolve_nothing(n):
        return None

    executor = PipelineExecutor(adapter_resolver=resolve_nothing)
    with pytest.raises(PicaronesError, match="timeout"):
        CorpusRunner(executor, timeout_seconds_per_doc=0)