File size: 5,004 Bytes
823fb32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""Sprint A14-S8 — backpressure du ``CorpusRunner``.

Vérifie que ``max_in_flight`` est respecté à tout instant : il n'y
a jamais plus de N adapters qui tournent en parallèle, y compris sur
un corpus de plusieurs dizaines de documents (40 dans le test principal).

Stratégie : un stub d'adapter incrémente un compteur partagé au
début de ``execute()``, le décrémente à la fin, et capture le
maximum atteint.  À la fin du run, on vérifie ``max_observed
<= max_in_flight``.
"""

from __future__ import annotations

import threading
import time

import pytest

from picarones.domain import Artifact, ArtifactType, DocumentRef
from picarones.pipeline import (
    CorpusRunner,
    PipelineExecutor,
    PipelineSpec,
    PipelineStep,
    RunContext,
)


class _ConcurrencyTrackingAdapter:
    """Stub adapter recording the peak number of overlapping ``execute`` calls.

    Each call bumps a lock-protected counter on entry and drops it again in a
    ``finally`` clause on exit; ``max_observed`` keeps the highest value the
    counter ever reached.
    """

    name = "tracking"
    input_types = frozenset({ArtifactType.IMAGE})
    output_types = frozenset({ArtifactType.RAW_TEXT})
    execution_mode = "io"

    def __init__(self, sleep_seconds: float = 0.01) -> None:
        # How long each call holds its slot, so concurrent calls overlap.
        self._sleep = sleep_seconds
        self._lock = threading.Lock()
        # Number of execute() calls currently in progress.
        self._current = 0
        # Highest value ``_current`` has reached so far.
        self.max_observed = 0

    def execute(self, inputs, params, context):
        """Sleep, then return a RAW_TEXT artifact keyed on the document id."""
        with self._lock:
            self._current += 1
            self.max_observed = max(self.max_observed, self._current)
        try:
            time.sleep(self._sleep)
            doc_id = context.document_id
            produced = Artifact(
                id=f"{doc_id}:raw_text",
                document_id=doc_id,
                type=ArtifactType.RAW_TEXT,
            )
            return {ArtifactType.RAW_TEXT: produced}
        finally:
            # Always release the slot, even if building the artifact raised.
            with self._lock:
                self._current -= 1


def _build(adapter, max_in_flight: int):
    """Wire *adapter* into a one-step pipeline run by a ``CorpusRunner``.

    Returns ``(runner, spec)``. The runner caps concurrency at
    *max_in_flight*, with a 10 s per-document timeout and a 5 ms poll
    interval. The spec has a single ``"ocr"`` step using the ``"tracking"``
    adapter to turn an IMAGE into RAW_TEXT.
    """
    adapters_by_name = {"tracking": adapter}
    executor = PipelineExecutor(adapter_resolver=lambda n: adapters_by_name[n])
    runner = CorpusRunner(
        executor,
        max_in_flight=max_in_flight,
        timeout_seconds_per_doc=10.0,
        poll_interval_seconds=0.005,
    )
    only_step = PipelineStep(
        id="s",
        kind="ocr",
        adapter_name="tracking",
        input_types=(ArtifactType.IMAGE,),
        output_types=(ArtifactType.RAW_TEXT,),
    )
    spec = PipelineSpec(
        name="bp",
        initial_inputs=(ArtifactType.IMAGE,),
        steps=(only_step,),
    )
    return runner, spec


def _factories():
    def inputs(doc):
        return {ArtifactType.IMAGE: Artifact(
            id=f"{doc.id}:image",
            document_id=doc.id,
            type=ArtifactType.IMAGE,
            uri=doc.image_uri,
        )}

    def ctx(doc):
        return RunContext(
            document_id=doc.id,
            code_version="1.0.0",
            pipeline_name="bp",
        )
    return inputs, ctx


@pytest.mark.parametrize("max_in_flight", [1, 2, 4])
def test_max_in_flight_respected(max_in_flight: int) -> None:
    """Concurrency never exceeds ``max_in_flight`` and actually reaches it."""
    n_docs = 40
    tracker = _ConcurrencyTrackingAdapter(sleep_seconds=0.02)
    runner, spec = _build(tracker, max_in_flight=max_in_flight)
    make_inputs, make_ctx = _factories()
    corpus = [
        DocumentRef(id=f"d{idx}", image_uri=f"/tmp/{idx}.png")
        for idx in range(n_docs)
    ]

    result = runner.run(spec, corpus, make_inputs, make_ctx, corpus_name="bp")

    assert result.n_documents == n_docs
    assert result.n_succeeded == n_docs
    peak = tracker.max_observed
    # Backpressure guarantee: concurrency never went above the cap.
    assert peak <= max_in_flight, (
        f"max observed = {peak}, attendu <= {max_in_flight}"
    )
    # ...and the cap was actually hit, proving work really ran in parallel.
    assert peak == max_in_flight, (
        f"on aurait dû saturer à {max_in_flight}, observed "
        f"{peak}"
    )


def test_max_in_flight_one_means_sequential() -> None:
    """With ``max_in_flight=1`` documents are processed strictly one at a time.

    Also checks that every document was actually processed. Otherwise a run
    where most documents failed could still report a peak of 1 and pass.
    """
    adapter = _ConcurrencyTrackingAdapter(sleep_seconds=0.005)
    runner, spec = _build(adapter, max_in_flight=1)
    inputs, ctx = _factories()
    docs = [DocumentRef(id=f"d{i}") for i in range(20)]

    result = runner.run(spec, docs, inputs, ctx)

    assert result.n_documents == len(docs)
    assert result.n_succeeded == len(docs)
    # Sequential: at most one execute() call was ever in flight.
    assert adapter.max_observed == 1


def test_empty_corpus_returns_zero_outcomes() -> None:
    """An empty corpus yields an empty result and never reaches the adapter."""
    tracker = _ConcurrencyTrackingAdapter()
    runner, spec = _build(tracker, max_in_flight=4)
    make_inputs, make_ctx = _factories()

    empty_result = runner.run(spec, [], make_inputs, make_ctx)

    assert empty_result.n_documents == 0
    assert empty_result.outcomes == ()
    # No document means execute() was never entered.
    assert tracker.max_observed == 0


def test_max_in_flight_zero_rejected() -> None:
    """``CorpusRunner`` refuses a concurrency cap of zero."""
    from picarones.domain import PicaronesError

    def resolve_nothing(n):
        # The resolver is never consulted: construction must fail first.
        return None

    executor = PipelineExecutor(adapter_resolver=resolve_nothing)
    with pytest.raises(PicaronesError, match="max_in_flight"):
        CorpusRunner(executor, max_in_flight=0)


def test_negative_timeout_rejected() -> None:
    """``CorpusRunner`` rejects a non-positive per-document timeout.

    Both zero and a strictly negative value must be refused with a
    ``PicaronesError`` whose message mentions the timeout.
    """
    from picarones.domain import PicaronesError
    exe = PipelineExecutor(adapter_resolver=lambda n: None)
    for bad_timeout in (0, -1.0):
        with pytest.raises(PicaronesError, match="timeout"):
            CorpusRunner(exe, timeout_seconds_per_doc=bad_timeout)