File size: 6,044 Bytes
3b65839
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
"""Sprint A14-S7 — mesure de temps par étape.

Vérifie que ``StepResult.duration_seconds`` reflète le temps réel
d'exécution de l'adapter (pas zéro, pas négatif), et que la durée
totale est cohérente avec la somme des étapes.

Définition de done : pipeline mock en moins de 100 ms.
"""

from __future__ import annotations

import time

import pytest

from picarones.domain import Artifact, ArtifactType, DocumentRef
from picarones.pipeline import (
    PipelineExecutor,
    PipelineSpec,
    PipelineStep,
    RunContext,
)


class _SlowStub:
    """Test adapter that blocks for a fixed delay, then emits a RAW_TEXT artifact.

    Gives the ``slow`` step a measurable, non-zero execution time.
    """

    name = "slow"
    input_types = frozenset({ArtifactType.IMAGE})
    output_types = frozenset({ArtifactType.RAW_TEXT})
    execution_mode = "cpu"

    def __init__(self, sleep_seconds: float) -> None:
        # Delay, in seconds, applied on every ``execute`` call.
        self._sleep = sleep_seconds

    def execute(self, inputs, params, context):
        """Sleep for the configured delay, then return one RAW_TEXT artifact."""
        time.sleep(self._sleep)
        doc_id = context.document_id
        produced = Artifact(
            id=f"{doc_id}:slow:raw_text",
            document_id=doc_id,
            type=ArtifactType.RAW_TEXT,
            produced_by_step="slow",
        )
        return {ArtifactType.RAW_TEXT: produced}


class _InstantStub:
    """Test adapter that returns immediately with a CORRECTED_TEXT artifact.

    Used as the ``post_correction`` step whose own cost should be negligible.
    """

    name = "instant"
    input_types = frozenset({ArtifactType.RAW_TEXT})
    output_types = frozenset({ArtifactType.CORRECTED_TEXT})
    execution_mode = "io"

    def execute(self, inputs, params, context):
        """Return one CORRECTED_TEXT artifact without doing any work."""
        doc_id = context.document_id
        corrected = Artifact(
            id=f"{doc_id}:instant:corrected",
            document_id=doc_id,
            type=ArtifactType.CORRECTED_TEXT,
            produced_by_step="instant",
        )
        return {ArtifactType.CORRECTED_TEXT: corrected}


@pytest.fixture
def doc() -> DocumentRef:
    """Document reference (id ``d1``) shared by every timing test."""
    image_path = "/tmp/x.png"
    return DocumentRef(id="d1", image_uri=image_path)


@pytest.fixture
def ctx() -> RunContext:
    """Run context for document ``d1`` under the ``timing`` pipeline name."""
    context = RunContext(
        document_id="d1",
        code_version="1.0.0",
        pipeline_name="timing",
    )
    return context


@pytest.fixture
def image_artifact() -> Artifact:
    """Initial IMAGE artifact for document ``d1``, fed to the pipeline."""
    image_uri = "/tmp/x.png"
    return Artifact(
        id="d1:image",
        document_id="d1",
        type=ArtifactType.IMAGE,
        uri=image_uri,
    )


def _spec_two_steps() -> PipelineSpec:
    """Build the two-step ``timing`` pipeline: ``slow`` then ``instant``.

    ``slow`` (kind ``ocr``) turns the initial IMAGE into RAW_TEXT.
    ``instant`` (kind ``post_correction``) consumes the RAW_TEXT produced
    by ``slow`` and emits CORRECTED_TEXT.
    """
    ocr_step = PipelineStep(
        id="slow",
        kind="ocr",
        adapter_name="slow",
        input_types=(ArtifactType.IMAGE,),
        output_types=(ArtifactType.RAW_TEXT,),
    )
    correction_step = PipelineStep(
        id="instant",
        kind="post_correction",
        adapter_name="instant",
        input_types=(ArtifactType.RAW_TEXT,),
        output_types=(ArtifactType.CORRECTED_TEXT,),
        # Explicitly wire the RAW_TEXT input to the output of step "slow".
        inputs_from={ArtifactType.RAW_TEXT: "slow"},
    )
    return PipelineSpec(
        name="timing",
        initial_inputs=(ArtifactType.IMAGE,),
        steps=(ocr_step, correction_step),
    )


class TestExecutorTiming:
    """Per-step and total duration measurements of ``PipelineExecutor``."""

    @staticmethod
    def _executor(registry: dict) -> PipelineExecutor:
        """Build an executor that resolves adapter names through *registry*."""
        return PipelineExecutor(adapter_resolver=lambda n: registry[n])

    def test_step_duration_reflects_sleep(
        self, doc, ctx, image_artifact,
    ) -> None:
        """The ``slow`` step's recorded duration reflects its 50 ms sleep."""
        registry = {"slow": _SlowStub(0.05), "instant": _InstantStub()}
        executor = self._executor(registry)

        result = executor.run(
            _spec_two_steps(), doc,
            {ArtifactType.IMAGE: image_artifact}, ctx,
        )
        assert result.succeeded
        slow_result = result.step_result_by_id("slow")
        # step_result_by_id may return None; fail with a clear message
        # instead of an AttributeError on the attribute access below.
        assert slow_result is not None, "no StepResult recorded for 'slow'"
        slow_dur = slow_result.duration_seconds
        # Wide bounds to absorb OS scheduling noise.
        assert 0.04 < slow_dur < 0.5

    def test_total_duration_at_least_sum_of_steps(
        self, doc, ctx, image_artifact,
    ) -> None:
        """The run's total duration is consistent with the sum of its steps."""
        registry = {"slow": _SlowStub(0.02), "instant": _InstantStub()}
        executor = self._executor(registry)

        result = executor.run(
            _spec_two_steps(), doc,
            {ArtifactType.IMAGE: image_artifact}, ctx,
        )
        # Without this, a run that aborted early could still satisfy the
        # timing bounds below and the test would pass vacuously.
        assert result.succeeded
        sum_steps = sum(r.duration_seconds for r in result.step_results)
        # The total includes orchestration overhead, so it should be at least
        # the sum of the steps (small tolerance for clock granularity).
        assert result.duration_seconds >= sum_steps - 0.01
        # Generous upper bound so timing jitter does not make the test flaky.
        assert result.duration_seconds < sum_steps + 0.5

    def test_duration_is_non_negative_even_on_failure(
        self, doc, ctx, image_artifact,
    ) -> None:
        """A step whose adapter raises still records a duration >= 0."""

        class _Crasher:
            name = "crash"
            input_types = frozenset({ArtifactType.IMAGE})
            output_types = frozenset({ArtifactType.RAW_TEXT})
            execution_mode = "cpu"

            def execute(self, *a, **kw):
                raise RuntimeError("boom")

        registry = {"crash": _Crasher()}
        executor = self._executor(registry)
        spec = PipelineSpec(
            name="crashing",
            initial_inputs=(ArtifactType.IMAGE,),
            steps=(
                PipelineStep(
                    id="bad", kind="ocr", adapter_name="crash",
                    input_types=(ArtifactType.IMAGE,),
                    output_types=(ArtifactType.RAW_TEXT,),
                ),
            ),
        )
        result = executor.run(
            spec, doc, {ArtifactType.IMAGE: image_artifact}, ctx,
        )
        assert not result.succeeded
        assert result.step_results[0].duration_seconds >= 0.0

    def test_def_of_done_under_100ms(
        self, doc, ctx, image_artifact,
    ) -> None:
        """S7 definition of done: the mock pipeline runs in < 100 ms."""
        registry = {
            "slow": _SlowStub(0.0),  # no sleep
            "instant": _InstantStub(),
        }
        executor = self._executor(registry)

        t0 = time.perf_counter()
        result = executor.run(
            _spec_two_steps(), doc,
            {ArtifactType.IMAGE: image_artifact}, ctx,
        )
        elapsed = time.perf_counter() - t0

        assert result.succeeded
        # Hard 100 ms budget from the definition of done (no extra margin).
        # With no sleeping adapters the run is expected to be far below it.
        assert elapsed < 0.1, f"trop lent : {elapsed * 1000:.2f}ms"