Spaces:
Sleeping
Sleeping
Claude
feat(pipeline): Sprint A14-S8 — CorpusRunner backpressure + timeout réel + cancellation
823fb32 unverified | """Sprint A14-S8 — backpressure du ``CorpusRunner``. | |
| Vérifie que ``max_in_flight`` est respecté à tout instant : il n'y | |
| a jamais plus de N adapters qui tournent en parallèle, même sur | |
| des corpus de plusieurs centaines de documents. | |
| Stratégie : un stub d'adapter incrémente un compteur partagé au | |
| début de ``execute()``, le décrémente à la fin, et capture le | |
| maximum atteint. À la fin du run, on vérifie ``max_observed | |
| <= max_in_flight``. | |
| """ | |
| from __future__ import annotations | |
| import threading | |
| import time | |
| import pytest | |
| from picarones.domain import Artifact, ArtifactType, DocumentRef | |
| from picarones.pipeline import ( | |
| CorpusRunner, | |
| PipelineExecutor, | |
| PipelineSpec, | |
| PipelineStep, | |
| RunContext, | |
| ) | |
| class _ConcurrencyTrackingAdapter: | |
| """Adapter qui mesure la concurrence observée pendant son exécution.""" | |
| name = "tracking" | |
| input_types = frozenset({ArtifactType.IMAGE}) | |
| output_types = frozenset({ArtifactType.RAW_TEXT}) | |
| execution_mode = "io" | |
| def __init__(self, sleep_seconds: float = 0.01) -> None: | |
| self._sleep = sleep_seconds | |
| self._lock = threading.Lock() | |
| self._current = 0 | |
| self.max_observed = 0 | |
| def execute(self, inputs, params, context): | |
| with self._lock: | |
| self._current += 1 | |
| if self._current > self.max_observed: | |
| self.max_observed = self._current | |
| try: | |
| time.sleep(self._sleep) | |
| return { | |
| ArtifactType.RAW_TEXT: Artifact( | |
| id=f"{context.document_id}:raw_text", | |
| document_id=context.document_id, | |
| type=ArtifactType.RAW_TEXT, | |
| ), | |
| } | |
| finally: | |
| with self._lock: | |
| self._current -= 1 | |
| def _build(adapter, max_in_flight: int): | |
| registry = {"tracking": adapter} | |
| exe = PipelineExecutor(adapter_resolver=lambda n: registry[n]) | |
| runner = CorpusRunner( | |
| exe, | |
| max_in_flight=max_in_flight, | |
| timeout_seconds_per_doc=10.0, | |
| poll_interval_seconds=0.005, | |
| ) | |
| spec = PipelineSpec( | |
| name="bp", initial_inputs=(ArtifactType.IMAGE,), | |
| steps=(PipelineStep( | |
| id="s", kind="ocr", adapter_name="tracking", | |
| input_types=(ArtifactType.IMAGE,), | |
| output_types=(ArtifactType.RAW_TEXT,), | |
| ),), | |
| ) | |
| return runner, spec | |
| def _factories(): | |
| def inputs(doc): | |
| return {ArtifactType.IMAGE: Artifact( | |
| id=f"{doc.id}:image", | |
| document_id=doc.id, | |
| type=ArtifactType.IMAGE, | |
| uri=doc.image_uri, | |
| )} | |
| def ctx(doc): | |
| return RunContext( | |
| document_id=doc.id, | |
| code_version="1.0.0", | |
| pipeline_name="bp", | |
| ) | |
| return inputs, ctx | |
| def test_max_in_flight_respected(max_in_flight: int) -> None: | |
| adapter = _ConcurrencyTrackingAdapter(sleep_seconds=0.02) | |
| runner, spec = _build(adapter, max_in_flight=max_in_flight) | |
| inputs, ctx = _factories() | |
| docs = [DocumentRef(id=f"d{i}", image_uri=f"/tmp/{i}.png") for i in range(40)] | |
| result = runner.run(spec, docs, inputs, ctx, corpus_name="bp") | |
| assert result.n_documents == 40 | |
| assert result.n_succeeded == 40 | |
| # Garantie de backpressure : la concurrence n'a jamais excédé max. | |
| assert adapter.max_observed <= max_in_flight, ( | |
| f"max observed = {adapter.max_observed}, attendu <= {max_in_flight}" | |
| ) | |
| # Et la backpressure a effectivement saturé : on a bien atteint le | |
| # plafond (preuve qu'on parallélise vraiment). | |
| assert adapter.max_observed == max_in_flight, ( | |
| f"on aurait dû saturer à {max_in_flight}, observed " | |
| f"{adapter.max_observed}" | |
| ) | |
| def test_max_in_flight_one_means_sequential() -> None: | |
| adapter = _ConcurrencyTrackingAdapter(sleep_seconds=0.005) | |
| runner, spec = _build(adapter, max_in_flight=1) | |
| inputs, ctx = _factories() | |
| docs = [DocumentRef(id=f"d{i}") for i in range(20)] | |
| runner.run(spec, docs, inputs, ctx) | |
| assert adapter.max_observed == 1 | |
| def test_empty_corpus_returns_zero_outcomes() -> None: | |
| adapter = _ConcurrencyTrackingAdapter() | |
| runner, spec = _build(adapter, max_in_flight=4) | |
| inputs, ctx = _factories() | |
| result = runner.run(spec, [], inputs, ctx) | |
| assert result.n_documents == 0 | |
| assert result.outcomes == () | |
| assert adapter.max_observed == 0 | |
| def test_max_in_flight_zero_rejected() -> None: | |
| from picarones.domain import PicaronesError | |
| exe = PipelineExecutor(adapter_resolver=lambda n: None) | |
| with pytest.raises(PicaronesError, match="max_in_flight"): | |
| CorpusRunner(exe, max_in_flight=0) | |
| def test_negative_timeout_rejected() -> None: | |
| from picarones.domain import PicaronesError | |
| exe = PipelineExecutor(adapter_resolver=lambda n: None) | |
| with pytest.raises(PicaronesError, match="timeout"): | |
| CorpusRunner(exe, timeout_seconds_per_doc=0) | |