Spaces:
Sleeping
Sleeping
File size: 5,106 Bytes
"""Sprint A14-S8 — annulation propre du ``CorpusRunner``.
Vérifie qu'un ``threading.Event`` partagé permet au caller
(typiquement un endpoint FastAPI ``cancel``) de signaler l'arrêt.
Les futures non démarrées sont annulées proprement, les futures
en cours se terminent (Python ne permet pas de tuer un thread).
"""
from __future__ import annotations
import threading
import time
from picarones.domain import Artifact, ArtifactType, DocumentRef
from picarones.pipeline import (
CorpusRunner,
PipelineExecutor,
PipelineSpec,
PipelineStep,
RunContext,
)
class _EventAwareAdapter:
    """Test adapter that signals when it starts, then blocks for a fixed delay.

    It maps an ``IMAGE`` input to a ``RAW_TEXT`` artifact with no content.
    Despite its name, it does NOT observe any cancellation event: once
    ``execute`` is entered, it always blocks for the full ``sleep_seconds``
    in a single ``time.sleep`` call. This lets the tests model an in-flight
    document that cannot be interrupted and must be left to finish.
    """

    name = "event"
    input_types = frozenset({ArtifactType.IMAGE})
    output_types = frozenset({ArtifactType.RAW_TEXT})
    # NOTE(review): presumably tells the runner this adapter is I/O-bound
    # (thread-friendly) — confirm against the pipeline's adapter contract.
    execution_mode = "io"

    def __init__(
        self,
        sleep_seconds: float,
        started_event: threading.Event | None = None,
    ) -> None:
        """Store the blocking delay and the optional "started" signal.

        Args:
            sleep_seconds: duration each ``execute`` call blocks for.
            started_event: if given, it is set at the start of every
                ``execute`` call so a test can wait until a document runs.
        """
        self._sleep = sleep_seconds
        self._started = started_event

    def execute(self, inputs, params, context):
        """Signal the start, block for ``sleep_seconds``, return a RAW_TEXT artifact.

        ``inputs`` and ``params`` are ignored. The artifact id is derived
        from ``context.document_id``.
        """
        if self._started is not None:
            # Set before sleeping so observers learn a doc is in flight.
            self._started.set()
        time.sleep(self._sleep)
        return {
            ArtifactType.RAW_TEXT: Artifact(
                id=f"{context.document_id}:raw_text",
                document_id=context.document_id,
                type=ArtifactType.RAW_TEXT,
            ),
        }
def _build(adapter, max_in_flight: int = 1):
    """Wire ``adapter`` into a one-step OCR pipeline and a ``CorpusRunner``.

    The executor resolves adapter names through a mapping that only knows
    ``"event"``. The runner uses a 10 s per-document timeout and a 10 ms
    poll interval.

    Returns:
        A ``(runner, spec)`` pair.
    """
    adapters_by_name = {"event": adapter}

    def resolve(adapter_name):
        return adapters_by_name[adapter_name]

    executor = PipelineExecutor(adapter_resolver=resolve)
    corpus_runner = CorpusRunner(
        executor,
        max_in_flight=max_in_flight,
        timeout_seconds_per_doc=10.0,
        poll_interval_seconds=0.01,
    )

    ocr_step = PipelineStep(
        id="s",
        kind="ocr",
        adapter_name="event",
        input_types=(ArtifactType.IMAGE,),
        output_types=(ArtifactType.RAW_TEXT,),
    )
    pipeline_spec = PipelineSpec(
        name="c",
        initial_inputs=(ArtifactType.IMAGE,),
        steps=(ocr_step,),
    )
    return corpus_runner, pipeline_spec
def _factories():
    """Return the ``(inputs, ctx)`` per-document factories used by the runner.

    ``inputs(doc)`` builds a mapping holding one ``IMAGE`` artifact whose
    id is ``"<doc.id>:image"``. ``ctx(doc)`` builds a ``RunContext`` for
    pipeline ``"c"`` at code version ``"1.0.0"``.
    """

    def make_inputs(doc):
        image = Artifact(
            id=f"{doc.id}:image",
            document_id=doc.id,
            type=ArtifactType.IMAGE,
        )
        return {ArtifactType.IMAGE: image}

    def make_context(doc):
        return RunContext(
            document_id=doc.id,
            code_version="1.0.0",
            pipeline_name="c",
        )

    return make_inputs, make_context
def test_cancel_before_run_yields_zero_progress() -> None:
    """A cancel signalled before the run starts means no document succeeds."""
    slow_adapter = _EventAwareAdapter(sleep_seconds=1.0)
    runner, spec = _build(slow_adapter, max_in_flight=1)
    inputs, ctx = _factories()
    documents = [DocumentRef(id=f"d{index}") for index in range(10)]

    # Signal cancellation up front, before the runner sees anything.
    already_cancelled = threading.Event()
    already_cancelled.set()

    outcome = runner.run(
        spec, documents, inputs, ctx, cancel_event=already_cancelled,
    )

    # All docs should end up cancelled (or only some of them, if a few were
    # primed before the runner's first loop iteration); in any case none of
    # them may have succeeded.
    assert outcome.n_succeeded == 0
def test_cancel_during_run_stops_pending_docs() -> None:
    """A cancel signalled mid-run cancels pending docs; running ones finish.

    The cancel fires only after the adapter reports that a first document
    has started, so the run is interrupted while work is genuinely in
    flight rather than before or after it.
    """
    started = threading.Event()
    adapter = _EventAwareAdapter(sleep_seconds=0.1, started_event=started)
    runner, spec = _build(adapter, max_in_flight=1)
    inputs, ctx = _factories()
    docs = [DocumentRef(id=f"d{i}") for i in range(20)]
    cancel_event = threading.Event()

    def _trigger_cancel():
        # Wait for the first doc to start, then cancel. The timeout is only
        # a safety net so this thread cannot hang if nothing ever starts.
        started.wait(timeout=2.0)
        cancel_event.set()

    canceller = threading.Thread(target=_trigger_cancel, daemon=True)
    canceller.start()
    t0 = time.perf_counter()
    result = runner.run(
        spec, docs, inputs, ctx, cancel_event=cancel_event,
    )
    elapsed = time.perf_counter() - t0
    canceller.join(timeout=1.0)

    # Guard against a vacuous pass: if no document ever started, the two
    # assertions below would hold without any cancellation being exercised.
    assert started.is_set(), "no document ever started; cancel not exercised"
    # At most a few docs succeeded (those started before the cancel); the
    # rest were cancelled, so not every doc succeeded.
    assert result.n_succeeded < len(docs)
    # The run must not take the full 20 * 0.1 = 2 s; cancellation stops it
    # much earlier.
    assert elapsed < 1.5, f"cancellation trop lente : {elapsed:.2f}s"
def test_cancel_returns_well_formed_result() -> None:
    """Even when cancelled, the ``CorpusRunResult`` stays consistent.

    The four outcome counters sum to at most ``n_documents``, and
    ``outcomes`` holds exactly one entry per counted document.
    """
    slow_adapter = _EventAwareAdapter(sleep_seconds=0.5)
    runner, spec = _build(slow_adapter, max_in_flight=2)
    inputs, ctx = _factories()
    documents = [DocumentRef(id=f"d{index}") for index in range(10)]

    pre_cancelled = threading.Event()
    pre_cancelled.set()

    result = runner.run(
        spec, documents, inputs, ctx, cancel_event=pre_cancelled,
    )

    counters = (
        result.n_succeeded,
        result.n_failed,
        result.n_timed_out,
        result.n_cancelled,
    )
    counted = sum(counters)
    assert counted <= result.n_documents
    assert len(result.outcomes) == counted
|