Spaces:
Sleeping
Sleeping
File size: 6,044 Bytes
3b65839 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 | """Sprint A14-S7 — mesure de temps par étape.
Vérifie que ``StepResult.duration_seconds`` reflète le temps réel
d'exécution de l'adapter (pas zéro, pas négatif), et que la durée
totale est cohérente avec la somme des étapes.
Définition de done : pipeline mock en moins de 100 ms.
"""
from __future__ import annotations
import time
import pytest
from picarones.domain import Artifact, ArtifactType, DocumentRef
from picarones.pipeline import (
PipelineExecutor,
PipelineSpec,
PipelineStep,
RunContext,
)
class _SlowStub:
"""Adapter qui dort un certain temps avant de retourner."""
def __init__(self, sleep_seconds: float) -> None:
self._sleep = sleep_seconds
name = "slow"
input_types = frozenset({ArtifactType.IMAGE})
output_types = frozenset({ArtifactType.RAW_TEXT})
execution_mode = "cpu"
def execute(self, inputs, params, context):
time.sleep(self._sleep)
return {
ArtifactType.RAW_TEXT: Artifact(
id=f"{context.document_id}:slow:raw_text",
document_id=context.document_id,
type=ArtifactType.RAW_TEXT,
produced_by_step="slow",
),
}
class _InstantStub:
name = "instant"
input_types = frozenset({ArtifactType.RAW_TEXT})
output_types = frozenset({ArtifactType.CORRECTED_TEXT})
execution_mode = "io"
def execute(self, inputs, params, context):
return {
ArtifactType.CORRECTED_TEXT: Artifact(
id=f"{context.document_id}:instant:corrected",
document_id=context.document_id,
type=ArtifactType.CORRECTED_TEXT,
produced_by_step="instant",
),
}
@pytest.fixture
def doc() -> DocumentRef:
return DocumentRef(id="d1", image_uri="/tmp/x.png")
@pytest.fixture
def ctx() -> RunContext:
return RunContext(
document_id="d1", code_version="1.0.0", pipeline_name="timing",
)
@pytest.fixture
def image_artifact() -> Artifact:
return Artifact(
id="d1:image", document_id="d1", type=ArtifactType.IMAGE,
uri="/tmp/x.png",
)
def _spec_two_steps() -> PipelineSpec:
return PipelineSpec(
name="timing",
initial_inputs=(ArtifactType.IMAGE,),
steps=(
PipelineStep(
id="slow", kind="ocr", adapter_name="slow",
input_types=(ArtifactType.IMAGE,),
output_types=(ArtifactType.RAW_TEXT,),
),
PipelineStep(
id="instant", kind="post_correction",
adapter_name="instant",
input_types=(ArtifactType.RAW_TEXT,),
output_types=(ArtifactType.CORRECTED_TEXT,),
inputs_from={ArtifactType.RAW_TEXT: "slow"},
),
),
)
class TestExecutorTiming:
def test_step_duration_reflects_sleep(
self, doc, ctx, image_artifact,
) -> None:
registry = {"slow": _SlowStub(0.05), "instant": _InstantStub()}
executor = PipelineExecutor(adapter_resolver=lambda n: registry[n])
result = executor.run(
_spec_two_steps(), doc,
{ArtifactType.IMAGE: image_artifact}, ctx,
)
assert result.succeeded
slow_dur = result.step_result_by_id("slow").duration_seconds # type: ignore[union-attr]
# Marges larges pour absorber le bruit OS.
assert 0.04 < slow_dur < 0.5
def test_total_duration_at_least_sum_of_steps(
self, doc, ctx, image_artifact,
) -> None:
registry = {"slow": _SlowStub(0.02), "instant": _InstantStub()}
executor = PipelineExecutor(adapter_resolver=lambda n: registry[n])
result = executor.run(
_spec_two_steps(), doc,
{ArtifactType.IMAGE: image_artifact}, ctx,
)
sum_steps = sum(r.duration_seconds for r in result.step_results)
# Le total inclut l'overhead orchestration → légèrement >.
assert result.duration_seconds >= sum_steps - 0.01
# Marge raisonnable pour ne pas exploser à cause du timing.
assert result.duration_seconds < sum_steps + 0.5
def test_duration_is_non_negative_even_on_failure(
self, doc, ctx, image_artifact,
) -> None:
class _Crasher:
name = "crash"
input_types = frozenset({ArtifactType.IMAGE})
output_types = frozenset({ArtifactType.RAW_TEXT})
execution_mode = "cpu"
def execute(self, *a, **kw):
raise RuntimeError("boom")
registry = {"crash": _Crasher()}
executor = PipelineExecutor(adapter_resolver=lambda n: registry[n])
spec = PipelineSpec(
name="crashing",
initial_inputs=(ArtifactType.IMAGE,),
steps=(
PipelineStep(
id="bad", kind="ocr", adapter_name="crash",
input_types=(ArtifactType.IMAGE,),
output_types=(ArtifactType.RAW_TEXT,),
),
),
)
result = executor.run(
spec, doc, {ArtifactType.IMAGE: image_artifact}, ctx,
)
assert not result.succeeded
assert result.step_results[0].duration_seconds >= 0.0
def test_def_of_done_under_100ms(
self, doc, ctx, image_artifact,
) -> None:
"""Définition de done du S7 : pipeline mock en < 100ms."""
registry = {
"slow": _SlowStub(0.0), # pas de sleep
"instant": _InstantStub(),
}
executor = PipelineExecutor(adapter_resolver=lambda n: registry[n])
t0 = time.perf_counter()
result = executor.run(
_spec_two_steps(), doc,
{ArtifactType.IMAGE: image_artifact}, ctx,
)
elapsed = time.perf_counter() - t0
assert result.succeeded
# Marge généreuse pour la CI : 100ms est largement atteignable.
assert elapsed < 0.1, f"trop lent : {elapsed * 1000:.2f}ms"
|