Claude
feat: Sprint A14-S57 β€” Wave F clΓ΄ture audit (issues #15 #16 #21 #23 #24 #25 #26 #30)
7d68969 unverified
Raw
History Blame
19.1 kB
"""``CorpusRunner`` β€” Sprint A14-S8.
Orchestre l'exΓ©cution d'une ``PipelineSpec`` sur un corpus complet
avec trois propriΓ©tΓ©s critiques que l'ancien
``measurements.runner`` ne garantissait pas correctement :
1. **Backpressure** β€” pas de "submit all upfront". L'orchestrateur
ne soumet jamais plus de ``max_in_flight`` documents en
parallΓ¨le. RAM bornΓ©e mΓͺme sur des corpus de plusieurs milliers
de documents.
2. **Timeout depuis le dΓ©but d'exΓ©cution rΓ©elle** β€” l'ancien runner
calculait le timeout depuis la submission au pool, donc un
document pouvait Γͺtre marquΓ© timeout parce qu'il avait passΓ©
N secondes en queue, pas N secondes en train de tourner. Le
nouveau runner mesure depuis le moment oΓΉ le worker dΓ©marre
rΓ©ellement.
3. **Annulation propre** β€” un ``threading.Event`` partagΓ© permet
au caller (typiquement un service applicatif sur un endpoint
FastAPI ``cancel``) de signaler l'arrΓͺt. Les workers
coopΓ©ratifs vΓ©rifient l'event ; les futures non dΓ©marrΓ©es sont
sautΓ©es ; les futures dΓ©jΓ  en cours se terminent (Python ne
permet pas de tuer un thread en cours).
Limites assumΓ©es pour S8
------------------------
- **Mode threads uniquement.** Le mode process (``ProcessPoolExecutor``)
ajoutΓ© au S11 quand on dΓ©placera les adapters CPU-bound.
Aujourd'hui, un adapter Tesseract local en thread fonctionne
(le GIL est relΓ’chΓ© par le sous-processus pytesseract β†’ OK).
- **Pas de kill-thread garanti.** Si un adapter ne coopère pas avec
``cancel_event`` et fait un appel C bloquant non-interruptible,
le runner attend la fin naturelle. C'est documentΓ©.
- **Pas de retry automatique.** Si un adapter Γ©choue, le doc est
marquΓ© en Γ©chec et on passe au suivant.
DΓ©finition de done
------------------
``CorpusRunner.run(spec, 1000 docs synthΓ©tiques)`` se termine en
moins de 10 minutes sans dΓ©passer 500 MB de RAM rΓ©sidente. Le
test ``test_sprint_a14_s8_def_of_done`` valide ce critère
(Γ©chantillon paramΓ©trable pour CI rapide).
"""
from __future__ import annotations
import concurrent.futures
import logging
import threading
import time
from collections.abc import Iterable
from typing import Callable
from pydantic import BaseModel, ConfigDict, Field
from picarones.domain.artifacts import Artifact, ArtifactType
from picarones.domain.documents import DocumentRef
from picarones.domain.errors import PicaronesError
from picarones.pipeline.executor import PipelineExecutor
from picarones.domain.pipeline_spec import PipelineSpec
from picarones.pipeline.types import PipelineResult, RunContext
logger = logging.getLogger(__name__)
#: Factories injectΓ©es par le caller pour adapter le runner Γ 
#: son contexte (corpus local, IIIF, HF, etc.).
InitialInputsFactory = Callable[
[DocumentRef],
dict[ArtifactType, Artifact],
]
ContextFactory = Callable[[DocumentRef], RunContext]
class DocumentOutcome(BaseModel):
"""RΓ©sultat de l'exΓ©cution d'une pipeline sur **un** document.
Distinct de ``PipelineResult`` : porte un statut
(``"succeeded"`` / ``"failed"`` / ``"timed_out"`` /
``"cancelled"``) et conserve le ``PipelineResult`` quand il
existe (peut Γͺtre ``None`` si annulation avant dΓ©marrage).
"""
model_config = ConfigDict(frozen=True, extra="forbid")
document_id: str
status: str = Field(pattern=r"^(succeeded|failed|timed_out|cancelled)$")
duration_seconds: float = Field(ge=0.0)
error: str | None = None
pipeline_result: PipelineResult | None = None
class CorpusRunResult(BaseModel):
"""RΓ©sultat agrΓ©gΓ© d'un run de corpus.
Attributs
---------
pipeline_name:
Nom de la pipeline exΓ©cutΓ©e.
corpus_name:
Nom du corpus (libre, fourni par le caller).
n_documents:
Nombre total de documents tentΓ©s.
n_succeeded:
Nombre de documents pour lesquels la pipeline a complètement
rΓ©ussi (``PipelineResult.succeeded == True``).
n_failed:
Nombre de documents avec au moins une Γ©tape en Γ©chec.
n_timed_out:
Nombre de documents tuΓ©s par timeout.
n_cancelled:
Nombre de documents jamais dΓ©marrΓ©s (cancel_event signalΓ©
avant leur tour).
duration_seconds:
Wall-clock total du run.
outcomes:
Détail document par document, ordre d'achèvement.
"""
model_config = ConfigDict(frozen=True, extra="forbid")
pipeline_name: str
corpus_name: str
n_documents: int = Field(ge=0)
n_succeeded: int = Field(ge=0)
n_failed: int = Field(ge=0)
n_timed_out: int = Field(ge=0)
n_cancelled: int = Field(ge=0)
duration_seconds: float = Field(ge=0.0)
outcomes: tuple[DocumentOutcome, ...] = Field(default_factory=tuple)
class CorpusRunner:
"""Orchestre ``PipelineExecutor`` sur un corpus avec backpressure
+ timeout rΓ©el + cancellation.
Une instance est rΓ©utilisable Γ  travers plusieurs runs.
"""
def __init__(
self,
executor: PipelineExecutor,
max_in_flight: int = 4,
timeout_seconds_per_doc: float = 300.0,
poll_interval_seconds: float = 0.05,
) -> None:
if max_in_flight < 1:
raise PicaronesError(
f"max_in_flight doit Γͺtre >= 1 (reΓ§u {max_in_flight})."
)
if timeout_seconds_per_doc <= 0:
raise PicaronesError(
f"timeout_seconds_per_doc doit Γͺtre > 0 (reΓ§u "
f"{timeout_seconds_per_doc})."
)
if poll_interval_seconds <= 0:
raise PicaronesError(
"poll_interval_seconds doit Γͺtre > 0."
)
self._executor = executor
self._max_in_flight = max_in_flight
self._timeout = timeout_seconds_per_doc
self._poll = poll_interval_seconds
def run(
self,
spec: PipelineSpec,
documents: Iterable[DocumentRef],
initial_inputs_factory: InitialInputsFactory,
context_factory: ContextFactory,
corpus_name: str = "corpus",
cancel_event: threading.Event | None = None,
) -> CorpusRunResult:
"""ExΓ©cute ``spec`` sur tous les ``documents`` du corpus.
Returns
-------
CorpusRunResult
Résultat agrégé. Ne lève jamais — toute erreur d'un
document est capturΓ©e dans son ``DocumentOutcome``.
"""
documents_list = list(documents)
run_started = time.perf_counter()
# Γ‰tat partagΓ© entre threads : ``started_at[doc_id]`` =
# monotonic au moment oΓΉ le worker du doc a vraiment dΓ©marrΓ©
# ``execute()``. L'orchestrateur lit ce dict pour dΓ©cider
# d'un timeout depuis le dΓ©but d'exΓ©cution rΓ©elle.
started_at: dict[str, float] = {}
started_at_lock = threading.Lock()
outcomes: list[DocumentOutcome] = []
# Fast path : aucun document β†’ rΓ©sultat vide immΓ©diat.
if not documents_list:
return CorpusRunResult(
pipeline_name=spec.name,
corpus_name=corpus_name,
n_documents=0,
n_succeeded=0,
n_failed=0,
n_timed_out=0,
n_cancelled=0,
duration_seconds=0.0,
outcomes=(),
)
# S28 : on planifie une seule fois pour la spec. Si la spec
# est invalide, on lève maintenant — pas dans chaque worker.
# Les workers consomment ensuite ``executor.run_plan(plan, ...)``
# β†’ N-1 validations Γ©conomisΓ©es.
plan = self._executor.plan(spec)
# Pool instanciΓ© explicitement avec ``shutdown(wait=False,
# cancel_futures=True)`` Γ  la sortie : les futures en queue
# sont annulΓ©es, les threads en cours continuent en
# arrière-plan jusqu'à leur fin naturelle (Python ne permet
# pas de tuer un thread). Le caller récupère le résultat
# immédiatement après le timeout / la cancellation, sans
# attendre que les threads en cours se terminent β€” c'est
# critique pour la latence perΓ§ue du runner.
pool = concurrent.futures.ThreadPoolExecutor(
max_workers=self._max_in_flight,
thread_name_prefix=f"picarones-{spec.name}",
)
try:
future_to_doc: dict[concurrent.futures.Future, DocumentRef] = {}
doc_iter = iter(documents_list)
in_flight = 0
done_count = 0
def _submit_next() -> bool:
"""Tente de soumettre le prochain document au pool.
Retourne ``True`` si un doc a Γ©tΓ© soumis,
``False`` si l'itΓ©rateur est Γ©puisΓ© ou si
cancel_event est signalΓ©.
"""
nonlocal in_flight
if cancel_event is not None and cancel_event.is_set():
return False
try:
doc = next(doc_iter)
except StopIteration:
return False
fut = pool.submit(
self._run_one,
plan=plan,
document=doc,
initial_inputs_factory=initial_inputs_factory,
context_factory=context_factory,
started_at=started_at,
started_at_lock=started_at_lock,
)
future_to_doc[fut] = doc
in_flight += 1
return True
# 1. Amorcer le pool : ne pas dΓ©passer max_in_flight.
for _ in range(self._max_in_flight):
if not _submit_next():
break
# 2. Boucle principale : rΓ©colter les rΓ©sultats, surveiller
# les timeouts, soumettre le suivant Γ  chaque libΓ©ration.
while future_to_doc:
# Polling court pour pouvoir vΓ©rifier les timeouts en
# parallèle des completions naturelles.
done_set, _ = concurrent.futures.wait(
future_to_doc.keys(),
timeout=self._poll,
return_when=concurrent.futures.FIRST_COMPLETED,
)
# 2a. RΓ©colter les futures terminΓ©es.
for fut in done_set:
doc = future_to_doc.pop(fut)
in_flight -= 1
outcomes.append(_outcome_from_future(fut, doc))
done_count += 1
# Soumettre le suivant pour maintenir la backpressure.
_submit_next()
# 2b. VΓ©rifier les timeouts depuis le dΓ©but d'exΓ©cution
# rΓ©elle (pas depuis la submission).
now = time.monotonic()
timed_out_futures: list[concurrent.futures.Future] = []
with started_at_lock:
started_snapshot = dict(started_at)
for fut, doc in list(future_to_doc.items()):
started = started_snapshot.get(doc.id)
if started is None:
continue # pas encore dΓ©marrΓ© β†’ pas de timeout
if now - started > self._timeout:
timed_out_futures.append(fut)
for fut in timed_out_futures:
doc = future_to_doc.pop(fut)
in_flight -= 1
# On ne peut pas vraiment killer un thread en
# Python ; on signale via cancel_event si fourni
# ET on enregistre le timeout immΓ©diatement (le
# thread continuera en arrière-plan jusqu'à ce
# qu'il ait fini, mais le run principal n'attend
# plus son rΓ©sultat).
duration = (
now - started_snapshot.get(doc.id, now)
)
outcomes.append(DocumentOutcome(
document_id=doc.id,
status="timed_out",
duration_seconds=max(duration, 0.0),
error=(
f"timeout: doc {doc.id} a dΓ©passΓ© "
f"{self._timeout:.1f}s d'exΓ©cution rΓ©elle"
),
))
done_count += 1
_submit_next()
# 2c. Cancellation explicite : marquer toutes les
# futures non dΓ©marrΓ©es comme annulΓ©es.
if cancel_event is not None and cancel_event.is_set():
cancelled = []
with started_at_lock:
started_snapshot = dict(started_at)
for fut, doc in list(future_to_doc.items()):
if doc.id not in started_snapshot:
# Future encore en queue β†’ on peut la
# canceller proprement.
if fut.cancel():
cancelled.append(doc)
future_to_doc.pop(fut, None)
in_flight -= 1
for doc in cancelled:
outcomes.append(DocumentOutcome(
document_id=doc.id,
status="cancelled",
duration_seconds=0.0,
error="cancelled before start",
))
finally:
# Sortie immΓ©diate : on ne bloque pas sur les threads en
# cours. Les futures en queue sont annulΓ©es, les threads
# dΓ©jΓ  actifs continuent jusqu'Γ  leur fin naturelle (cf.
# commentaire Γ  l'instanciation du pool).
pool.shutdown(wait=False, cancel_futures=True)
# 3. AgrΓ©gation finale.
run_duration = time.perf_counter() - run_started
return _aggregate(
pipeline_name=spec.name,
corpus_name=corpus_name,
n_documents=len(documents_list),
outcomes=outcomes,
duration_seconds=run_duration,
)
# ──────────────────────────────────────────────────────────────────
# Worker
# ──────────────────────────────────────────────────────────────────
def _run_one(
self,
*,
plan, # ExecutionPlan ; type omis pour Γ©viter l'import top-level
document: DocumentRef,
initial_inputs_factory: InitialInputsFactory,
context_factory: ContextFactory,
started_at: dict[str, float],
started_at_lock: threading.Lock,
) -> PipelineResult:
"""ExΓ©cute le plan prΓ©-calculΓ© sur un document. AppelΓ© dans
un thread du pool.
Enregistre ``started_at[doc.id]`` au tout dΓ©but pour que
l'orchestrateur puisse mesurer le timeout depuis le dΓ©but
d'exΓ©cution rΓ©elle.
"""
# 1. Marquer le dΓ©marrage rΓ©el. Ce moment est ce qui sert de
# rΓ©fΓ©rence pour le timeout.
with started_at_lock:
started_at[document.id] = time.monotonic()
# 2. Construire les inputs et le contexte.
initial_inputs = initial_inputs_factory(document)
context = context_factory(document)
# 3. DΓ©lΓ©guer au PipelineExecutor.run_plan (S28). Le plan a
# dΓ©jΓ  Γ©tΓ© validΓ© une fois par le runner ; pas de re-validation
# par doc.
return self._executor.run_plan(
plan=plan,
document=document,
initial_inputs=initial_inputs,
context=context,
)
# ──────────────────────────────────────────────────────────────────────
# Helpers d'agrΓ©gation
# ──────────────────────────────────────────────────────────────────────
def _outcome_from_future(
fut: concurrent.futures.Future,
doc: DocumentRef,
) -> DocumentOutcome:
"""Convertit une future achevΓ©e en ``DocumentOutcome``.
- Future qui a levΓ© β†’ ``status="failed"``, ``error=str(exc)``.
- Future qui a renvoyΓ© un ``PipelineResult`` succeeded β†’ ``"succeeded"``.
- Future qui a renvoyΓ© un ``PipelineResult`` non-succeeded β†’
``"failed"`` (au moins une Γ©tape en erreur).
"""
try:
result = fut.result(timeout=0) # dΓ©jΓ  done
except concurrent.futures.CancelledError:
return DocumentOutcome(
document_id=doc.id,
status="cancelled",
duration_seconds=0.0,
error="cancelled",
)
except Exception as exc: # noqa: BLE001
# PipelineExecutor capture toutes les erreurs des steps,
# donc une exception ici signale un bug profond (typiquement
# un PipelineSpecInvalid levΓ© par l'executor).
return DocumentOutcome(
document_id=doc.id,
status="failed",
duration_seconds=0.0,
error=f"runner_internal_error: {type(exc).__name__}: {exc}",
)
if result.succeeded:
status = "succeeded"
error: str | None = None
else:
status = "failed"
# ConcatΓ©ner les erreurs de step pour le diagnostic.
step_errors = [
f"{r.step_id}: {r.error}"
for r in result.step_results
if not r.succeeded
]
error = "; ".join(step_errors) if step_errors else "unknown failure"
return DocumentOutcome(
document_id=doc.id,
status=status,
duration_seconds=result.duration_seconds,
error=error,
pipeline_result=result,
)
def _aggregate(
*,
pipeline_name: str,
corpus_name: str,
n_documents: int,
outcomes: list[DocumentOutcome],
duration_seconds: float,
) -> CorpusRunResult:
return CorpusRunResult(
pipeline_name=pipeline_name,
corpus_name=corpus_name,
n_documents=n_documents,
n_succeeded=sum(1 for o in outcomes if o.status == "succeeded"),
n_failed=sum(1 for o in outcomes if o.status == "failed"),
n_timed_out=sum(1 for o in outcomes if o.status == "timed_out"),
n_cancelled=sum(1 for o in outcomes if o.status == "cancelled"),
duration_seconds=duration_seconds,
outcomes=tuple(outcomes),
)
__all__ = [
"CorpusRunner",
"CorpusRunResult",
"DocumentOutcome",
"InitialInputsFactory",
"ContextFactory",
]