Spaces:
Sleeping
Sleeping
File size: 5,004 Bytes
823fb32 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 | """Sprint A14-S8 — backpressure du ``CorpusRunner``.
Vérifie que ``max_in_flight`` est respecté à tout instant : il n'y
a jamais plus de N adapters qui tournent en parallèle, même sur
des corpus de plusieurs centaines de documents.
Stratégie : un stub d'adapter incrémente un compteur partagé au
début de ``execute()``, le décrémente à la fin, et capture le
maximum atteint. À la fin du run, on vérifie ``max_observed
<= max_in_flight``.
"""
from __future__ import annotations
import threading
import time
import pytest
from picarones.domain import Artifact, ArtifactType, DocumentRef
from picarones.pipeline import (
CorpusRunner,
PipelineExecutor,
PipelineSpec,
PipelineStep,
RunContext,
)
class _ConcurrencyTrackingAdapter:
"""Adapter qui mesure la concurrence observée pendant son exécution."""
name = "tracking"
input_types = frozenset({ArtifactType.IMAGE})
output_types = frozenset({ArtifactType.RAW_TEXT})
execution_mode = "io"
def __init__(self, sleep_seconds: float = 0.01) -> None:
self._sleep = sleep_seconds
self._lock = threading.Lock()
self._current = 0
self.max_observed = 0
def execute(self, inputs, params, context):
with self._lock:
self._current += 1
if self._current > self.max_observed:
self.max_observed = self._current
try:
time.sleep(self._sleep)
return {
ArtifactType.RAW_TEXT: Artifact(
id=f"{context.document_id}:raw_text",
document_id=context.document_id,
type=ArtifactType.RAW_TEXT,
),
}
finally:
with self._lock:
self._current -= 1
def _build(adapter, max_in_flight: int):
registry = {"tracking": adapter}
exe = PipelineExecutor(adapter_resolver=lambda n: registry[n])
runner = CorpusRunner(
exe,
max_in_flight=max_in_flight,
timeout_seconds_per_doc=10.0,
poll_interval_seconds=0.005,
)
spec = PipelineSpec(
name="bp", initial_inputs=(ArtifactType.IMAGE,),
steps=(PipelineStep(
id="s", kind="ocr", adapter_name="tracking",
input_types=(ArtifactType.IMAGE,),
output_types=(ArtifactType.RAW_TEXT,),
),),
)
return runner, spec
def _factories():
def inputs(doc):
return {ArtifactType.IMAGE: Artifact(
id=f"{doc.id}:image",
document_id=doc.id,
type=ArtifactType.IMAGE,
uri=doc.image_uri,
)}
def ctx(doc):
return RunContext(
document_id=doc.id,
code_version="1.0.0",
pipeline_name="bp",
)
return inputs, ctx
@pytest.mark.parametrize("max_in_flight", [1, 2, 4])
def test_max_in_flight_respected(max_in_flight: int) -> None:
adapter = _ConcurrencyTrackingAdapter(sleep_seconds=0.02)
runner, spec = _build(adapter, max_in_flight=max_in_flight)
inputs, ctx = _factories()
docs = [DocumentRef(id=f"d{i}", image_uri=f"/tmp/{i}.png") for i in range(40)]
result = runner.run(spec, docs, inputs, ctx, corpus_name="bp")
assert result.n_documents == 40
assert result.n_succeeded == 40
# Garantie de backpressure : la concurrence n'a jamais excédé max.
assert adapter.max_observed <= max_in_flight, (
f"max observed = {adapter.max_observed}, attendu <= {max_in_flight}"
)
# Et la backpressure a effectivement saturé : on a bien atteint le
# plafond (preuve qu'on parallélise vraiment).
assert adapter.max_observed == max_in_flight, (
f"on aurait dû saturer à {max_in_flight}, observed "
f"{adapter.max_observed}"
)
def test_max_in_flight_one_means_sequential() -> None:
adapter = _ConcurrencyTrackingAdapter(sleep_seconds=0.005)
runner, spec = _build(adapter, max_in_flight=1)
inputs, ctx = _factories()
docs = [DocumentRef(id=f"d{i}") for i in range(20)]
runner.run(spec, docs, inputs, ctx)
assert adapter.max_observed == 1
def test_empty_corpus_returns_zero_outcomes() -> None:
adapter = _ConcurrencyTrackingAdapter()
runner, spec = _build(adapter, max_in_flight=4)
inputs, ctx = _factories()
result = runner.run(spec, [], inputs, ctx)
assert result.n_documents == 0
assert result.outcomes == ()
assert adapter.max_observed == 0
def test_max_in_flight_zero_rejected() -> None:
from picarones.domain import PicaronesError
exe = PipelineExecutor(adapter_resolver=lambda n: None)
with pytest.raises(PicaronesError, match="max_in_flight"):
CorpusRunner(exe, max_in_flight=0)
def test_negative_timeout_rejected() -> None:
from picarones.domain import PicaronesError
exe = PipelineExecutor(adapter_resolver=lambda n: None)
with pytest.raises(PicaronesError, match="timeout"):
CorpusRunner(exe, timeout_seconds_per_doc=0)
|