Spaces:
Sleeping
Sleeping
Claude
feat: Sprint A14-S57 β Wave F clΓ΄ture audit (issues #15 #16 #21 #23 #24 #25 #26 #30)
"""``CorpusRunner`` — Sprint A14-S8.

Orchestre l'exécution d'une ``PipelineSpec`` sur un corpus complet
avec trois propriétés critiques que l'ancien
``measurements.runner`` ne garantissait pas correctement :

1. **Backpressure** — pas de "submit all upfront". L'orchestrateur
   ne soumet jamais plus de ``max_in_flight`` documents en
   parallèle. RAM bornée même sur des corpus de plusieurs milliers
   de documents.
2. **Timeout depuis le début d'exécution réelle** — l'ancien runner
   calculait le timeout depuis la submission au pool, donc un
   document pouvait être marqué timeout parce qu'il avait passé
   N secondes en queue, pas N secondes en train de tourner. Le
   nouveau runner mesure depuis le moment où le worker démarre
   réellement.
3. **Annulation propre** — un ``threading.Event`` partagé permet
   au caller (typiquement un service applicatif sur un endpoint
   FastAPI ``cancel``) de signaler l'arrêt. Le runner ne transmet
   pas lui-même l'event aux workers : les workers coopératifs
   doivent y avoir accès par ailleurs pour le vérifier. Les futures
   non démarrées sont sautées ; les futures déjà en cours se
   terminent (Python ne permet pas de tuer un thread en cours).

Limites assumées pour S8
------------------------
- **Mode threads uniquement.** Le mode process (``ProcessPoolExecutor``)
  sera ajouté au S11, quand on déplacera les adapters CPU-bound.
  Aujourd'hui, un adapter Tesseract local en thread fonctionne
  (le GIL est relâché pendant l'appel au sous-processus de pytesseract — OK).
- **Pas de kill-thread garanti.** Si un adapter ne coopère pas avec
  ``cancel_event`` et fait un appel C bloquant non-interruptible,
  son thread n'est pas interrompu : il continue jusqu'à sa fin
  naturelle en occupant un worker du pool (le runner, lui, cesse
  d'attendre son résultat une fois le timeout atteint). C'est documenté.
- **Pas de retry automatique.** Si un adapter échoue, le doc est
  marqué en échec et on passe au suivant.

Définition de done
------------------
``CorpusRunner.run(spec, 1000 docs synthétiques)`` se termine en
moins de 10 minutes sans dépasser 500 MB de RAM résidente. Le
test ``test_sprint_a14_s8_def_of_done`` valide ce critère
(échantillon paramétrable pour CI rapide).
"""
| from __future__ import annotations | |
| import concurrent.futures | |
| import logging | |
| import threading | |
| import time | |
| from collections.abc import Iterable | |
| from typing import Callable | |
| from pydantic import BaseModel, ConfigDict, Field | |
| from picarones.domain.artifacts import Artifact, ArtifactType | |
| from picarones.domain.documents import DocumentRef | |
| from picarones.domain.errors import PicaronesError | |
| from picarones.pipeline.executor import PipelineExecutor | |
| from picarones.domain.pipeline_spec import PipelineSpec | |
| from picarones.pipeline.types import PipelineResult, RunContext | |
logger = logging.getLogger(__name__)

# Factories injected by the caller so that the runner can adapt to
# its context (local corpus, IIIF, HF, ...).

#: Builds the initial artifacts of one document, keyed by artifact type.
InitialInputsFactory = Callable[[DocumentRef], dict[ArtifactType, Artifact]]

#: Builds the ``RunContext`` used to execute one document.
ContextFactory = Callable[[DocumentRef], RunContext]
class DocumentOutcome(BaseModel):
    """Result of running a pipeline on **one** document.

    Distinct from ``PipelineResult``: it carries a status
    (``"succeeded"`` / ``"failed"`` / ``"timed_out"`` /
    ``"cancelled"``) and keeps the ``PipelineResult`` when one
    exists. ``pipeline_result`` is ``None`` when no ``PipelineResult``
    was produced (e.g. the document was cancelled before it started).
    """

    # Instances are immutable and unknown fields are rejected.
    model_config = ConfigDict(frozen=True, extra="forbid")

    document_id: str
    # Validated against the four allowed status values.
    status: str = Field(pattern=r"^(succeeded|failed|timed_out|cancelled)$")
    # Must be non-negative.
    duration_seconds: float = Field(ge=0.0)
    # Human-readable diagnostic; the runner leaves it None on success.
    error: str | None = None
    pipeline_result: PipelineResult | None = None
class CorpusRunResult(BaseModel):
    """Aggregated result of one corpus run.

    Attributes
    ----------
    pipeline_name:
        Name of the executed pipeline.
    corpus_name:
        Corpus name (free-form, supplied by the caller).
    n_documents:
        Total number of documents in the corpus handed to the run.
    n_succeeded:
        Number of documents for which the pipeline fully succeeded
        (``PipelineResult.succeeded == True``).
    n_failed:
        Number of documents with at least one failed step, or for
        which an unexpected error was raised while running them.
    n_timed_out:
        Number of documents that exceeded the per-document timeout.
    n_cancelled:
        Number of documents that never started because cancellation
        was signalled before their turn.
    duration_seconds:
        Total wall-clock duration of the run.
    outcomes:
        Per-document details, in the order they were recorded
        (roughly completion order).
    """

    # Instances are immutable and unknown fields are rejected.
    model_config = ConfigDict(frozen=True, extra="forbid")

    pipeline_name: str
    corpus_name: str
    # All counters are validated as non-negative.
    n_documents: int = Field(ge=0)
    n_succeeded: int = Field(ge=0)
    n_failed: int = Field(ge=0)
    n_timed_out: int = Field(ge=0)
    n_cancelled: int = Field(ge=0)
    duration_seconds: float = Field(ge=0.0)
    outcomes: tuple[DocumentOutcome, ...] = Field(default_factory=tuple)
class CorpusRunner:
    """Run ``PipelineExecutor`` over a whole corpus with backpressure,
    a timeout measured from real execution start, and cancellation.

    An instance only stores configuration, so it can be reused across
    several runs.
    """

    def __init__(
        self,
        executor: PipelineExecutor,
        max_in_flight: int = 4,
        timeout_seconds_per_doc: float = 300.0,
        poll_interval_seconds: float = 0.05,
    ) -> None:
        """Validate and store the runner configuration.

        Parameters
        ----------
        executor:
            Executor used to plan the spec once per run and to run the
            plan on each document.
        max_in_flight:
            Maximum number of documents submitted at once; also the
            worker count of the thread pool.
        timeout_seconds_per_doc:
            Per-document budget, measured from the moment a worker
            actually starts the document (not from submission).
        poll_interval_seconds:
            How long the orchestrator waits for a completion before it
            re-checks timeouts and cancellation.

        Raises
        ------
        PicaronesError
            If any of the numeric parameters is out of range.
        """
        if max_in_flight < 1:
            raise PicaronesError(
                f"max_in_flight doit être >= 1 (reçu {max_in_flight})."
            )
        if timeout_seconds_per_doc <= 0:
            raise PicaronesError(
                f"timeout_seconds_per_doc doit être > 0 (reçu "
                f"{timeout_seconds_per_doc})."
            )
        if poll_interval_seconds <= 0:
            raise PicaronesError(
                "poll_interval_seconds doit être > 0."
            )
        self._executor = executor
        self._max_in_flight = max_in_flight
        self._timeout = timeout_seconds_per_doc
        self._poll = poll_interval_seconds

    def run(
        self,
        spec: PipelineSpec,
        documents: Iterable[DocumentRef],
        initial_inputs_factory: InitialInputsFactory,
        context_factory: ContextFactory,
        corpus_name: str = "corpus",
        cancel_event: threading.Event | None = None,
    ) -> CorpusRunResult:
        """Execute ``spec`` on every document of ``documents``.

        Parameters
        ----------
        spec:
            Pipeline to run. It is planned once, before any document.
        documents:
            Corpus documents. The iterable is materialized up front so
            that ``n_documents`` is known; at most ``max_in_flight`` of
            them are submitted to the pool at any time.
        initial_inputs_factory, context_factory:
            Called inside pool threads, once per started document, to
            build that document's inputs and ``RunContext``.
        corpus_name:
            Free-form label copied into the result.
        cancel_event:
            When set, no further document is submitted, queued futures
            are cancelled, and documents already running are left to
            finish or to time out. The event is only read here; it is
            not passed to the workers.

        Returns
        -------
        CorpusRunResult
            Aggregated result. When the call returns, every document
            has exactly one ``DocumentOutcome``. This includes documents
            never submitted because of cancellation; those are recorded
            as ``"cancelled"``.

        Raises
        ------
        Exception
            Whatever ``executor.plan(spec)`` raises for an invalid spec.
            Per-document errors never propagate: they are captured in
            the corresponding ``DocumentOutcome``.
        """
        documents_list = list(documents)
        run_started = time.perf_counter()

        # Fast path: no document -> empty result, no planning at all.
        if not documents_list:
            return _aggregate(
                pipeline_name=spec.name,
                corpus_name=corpus_name,
                n_documents=0,
                outcomes=[],
                duration_seconds=0.0,
            )

        # Shared between threads: started_at[doc_id] is the
        # time.monotonic() value at which a worker actually began that
        # document. The orchestrator reads it so the timeout counts real
        # execution time, not time spent queued in the pool.
        # NOTE(review): keyed by document id, so documents sharing an id
        # would share a start time; ids are presumably unique — confirm.
        started_at: dict[str, float] = {}
        started_at_lock = threading.Lock()
        outcomes: list[DocumentOutcome] = []

        # S28: plan once for the whole spec. An invalid spec raises here
        # rather than in every worker. Workers then reuse the plan
        # through executor.run_plan(), which saves N-1 validations.
        plan = self._executor.plan(spec)

        def _cancelled_outcome(doc: DocumentRef) -> DocumentOutcome:
            """Outcome for a document that never started running."""
            return DocumentOutcome(
                document_id=doc.id,
                status="cancelled",
                duration_seconds=0.0,
                error="cancelled before start",
            )

        # The pool is shut down with shutdown(wait=False,
        # cancel_futures=True) on exit. Queued futures are cancelled.
        # Threads still running (e.g. timed-out ones) are not waited
        # for and finish in the background, because Python cannot kill
        # a thread. The caller gets the result right after the last
        # timeout or cancellation instead of waiting for those threads.
        pool = concurrent.futures.ThreadPoolExecutor(
            max_workers=self._max_in_flight,
            thread_name_prefix=f"picarones-{spec.name}",
        )
        try:
            future_to_doc: dict[concurrent.futures.Future, DocumentRef] = {}
            doc_iter = iter(documents_list)

            def _submit_next() -> bool:
                """Submit the next document to the pool.

                Returns ``True`` if a document was submitted, ``False``
                when ``cancel_event`` is set or no document is left.
                """
                if cancel_event is not None and cancel_event.is_set():
                    return False
                try:
                    doc = next(doc_iter)
                except StopIteration:
                    return False
                fut = pool.submit(
                    self._run_one,
                    plan=plan,
                    document=doc,
                    initial_inputs_factory=initial_inputs_factory,
                    context_factory=context_factory,
                    started_at=started_at,
                    started_at_lock=started_at_lock,
                )
                future_to_doc[fut] = doc
                return True

            # 1. Prime the pool without exceeding max_in_flight.
            for _ in range(self._max_in_flight):
                if not _submit_next():
                    break

            # 2. Main loop: harvest results, watch timeouts, and submit a
            #    replacement each time a slot is released.
            # NOTE(review): a timed-out thread keeps occupying one of the
            # pool's max_in_flight workers. Documents submitted afterwards
            # may therefore wait in the queue, and their timeout clock
            # only starts once a worker picks them up. If every worker is
            # stuck in a non-cooperative call, this loop can wait
            # indefinitely.
            while future_to_doc:
                # Short poll so that timeouts and cancellation are checked
                # even when nothing completes.
                done_set, _ = concurrent.futures.wait(
                    future_to_doc.keys(),
                    timeout=self._poll,
                    return_when=concurrent.futures.FIRST_COMPLETED,
                )

                # 2a. Harvest finished futures.
                for fut in done_set:
                    doc = future_to_doc.pop(fut)
                    outcomes.append(_outcome_from_future(fut, doc))
                    # Refill the freed slot to keep the backpressure window full.
                    _submit_next()

                # 2b. Timeouts, measured from real execution start (not
                #     from submission).
                now = time.monotonic()
                with started_at_lock:
                    started_snapshot = dict(started_at)
                timed_out: list[tuple[concurrent.futures.Future, float]] = []
                for fut, doc in list(future_to_doc.items()):
                    started = started_snapshot.get(doc.id)
                    if started is None:
                        continue  # still queued: its clock has not started
                    if now - started > self._timeout:
                        timed_out.append((fut, started))
                for fut, started in timed_out:
                    doc = future_to_doc.pop(fut)
                    # Python cannot kill a running thread. The timeout is
                    # recorded now and the orchestrator stops waiting for
                    # this future. The thread keeps running in the
                    # background until it ends on its own. cancel_event
                    # is not touched: it is the caller's run-wide signal.
                    logger.warning(
                        "Document %s timed out after %.1fs of real execution",
                        doc.id,
                        self._timeout,
                    )
                    outcomes.append(DocumentOutcome(
                        document_id=doc.id,
                        status="timed_out",
                        duration_seconds=max(now - started, 0.0),
                        error=(
                            f"timeout: doc {doc.id} a dépassé "
                            f"{self._timeout:.1f}s d'exécution réelle"
                        ),
                    ))
                    _submit_next()

                # 2c. Explicit cancellation: cancel futures that have not
                #     started yet. Running ones finish or time out.
                if cancel_event is not None and cancel_event.is_set():
                    with started_at_lock:
                        started_snapshot = dict(started_at)
                    for fut, doc in list(future_to_doc.items()):
                        if doc.id in started_snapshot:
                            continue
                        # Still queued in the pool: it can be cancelled cleanly.
                        if fut.cancel():
                            future_to_doc.pop(fut, None)
                            outcomes.append(_cancelled_outcome(doc))

            # 3. Documents never pulled from the iterator. Without
            #    cancellation, every completion or timeout above triggers
            #    a new submission, so leftovers only exist after
            #    cancel_event was set. They never started; record them as
            #    cancelled so the counters add up to n_documents.
            leftovers = [_cancelled_outcome(doc) for doc in doc_iter]
            if leftovers:
                logger.info(
                    "Run %r cancelled: %d document(s) never submitted",
                    spec.name,
                    len(leftovers),
                )
            outcomes.extend(leftovers)
        finally:
            # Return immediately; do not block on running threads.
            pool.shutdown(wait=False, cancel_futures=True)

        # 4. Final aggregation.
        return _aggregate(
            pipeline_name=spec.name,
            corpus_name=corpus_name,
            n_documents=len(documents_list),
            outcomes=outcomes,
            duration_seconds=time.perf_counter() - run_started,
        )

    # ──────────────────────────────────────────────────────────────────
    # Worker
    # ──────────────────────────────────────────────────────────────────

    def _run_one(
        self,
        *,
        plan,  # ExecutionPlan; type omitted to avoid a top-level import
        document: DocumentRef,
        initial_inputs_factory: InitialInputsFactory,
        context_factory: ContextFactory,
        started_at: dict[str, float],
        started_at_lock: threading.Lock,
    ) -> PipelineResult:
        """Run the pre-computed plan on one document, inside a pool thread.

        ``started_at[document.id]`` is recorded first, so the
        orchestrator measures the timeout from real execution start.
        Exceptions raised by the factories or by ``run_plan`` propagate
        into the future. Unless the document already timed out, the
        orchestrator turns them into a ``"failed"`` outcome.
        """
        # 1. Mark the real start: this is the timeout's reference point.
        with started_at_lock:
            started_at[document.id] = time.monotonic()

        # 2. Build the inputs and the context for this document.
        initial_inputs = initial_inputs_factory(document)
        context = context_factory(document)

        # 3. Delegate to PipelineExecutor.run_plan (S28). The plan was
        #    already validated once by the runner; no per-document
        #    re-validation.
        return self._executor.run_plan(
            plan=plan,
            document=document,
            initial_inputs=initial_inputs,
            context=context,
        )
# ──────────────────────────────────────────────────────────────────────
# Helpers d'agrégation
# ──────────────────────────────────────────────────────────────────────
def _outcome_from_future(
    fut: concurrent.futures.Future,
    doc: DocumentRef,
) -> DocumentOutcome:
    """Turn a finished future into a ``DocumentOutcome``.

    Mapping:

    - cancelled future -> ``"cancelled"`` (no ``PipelineResult``);
    - the worker raised an ``Exception`` -> ``"failed"`` with a
      ``runner_internal_error: ...`` message (no ``PipelineResult``);
    - the worker returned a succeeded ``PipelineResult`` -> ``"succeeded"``;
    - the worker returned a non-succeeded ``PipelineResult`` ->
      ``"failed"``, with the failing steps' errors joined by ``"; "``.
    """
    try:
        pipeline_result = fut.result(timeout=0)  # the future is already done
    except concurrent.futures.CancelledError:
        return DocumentOutcome(
            document_id=doc.id,
            status="cancelled",
            duration_seconds=0.0,
            error="cancelled",
        )
    except Exception as err:  # noqa: BLE001
        # Step errors are expected to be captured inside the
        # PipelineResult, so reaching this branch means something outside
        # the steps raised: the input/context factories, or the executor
        # itself (e.g. an invalid spec).
        return DocumentOutcome(
            document_id=doc.id,
            status="failed",
            duration_seconds=0.0,
            error=f"runner_internal_error: {type(err).__name__}: {err}",
        )

    if not pipeline_result.succeeded:
        # Concatenate the failing steps' errors for diagnosis.
        failed_steps = "; ".join(
            f"{step.step_id}: {step.error}"
            for step in pipeline_result.step_results
            if not step.succeeded
        )
        return DocumentOutcome(
            document_id=doc.id,
            status="failed",
            duration_seconds=pipeline_result.duration_seconds,
            error=failed_steps or "unknown failure",
            pipeline_result=pipeline_result,
        )

    return DocumentOutcome(
        document_id=doc.id,
        status="succeeded",
        duration_seconds=pipeline_result.duration_seconds,
        error=None,
        pipeline_result=pipeline_result,
    )
def _aggregate(
    *,
    pipeline_name: str,
    corpus_name: str,
    n_documents: int,
    outcomes: list[DocumentOutcome],
    duration_seconds: float,
) -> CorpusRunResult:
    """Fold per-document outcomes into a ``CorpusRunResult``.

    The four counters are tallied from each outcome's ``status``.
    ``n_documents`` and ``duration_seconds`` are taken as given, and
    ``outcomes`` is stored as a tuple in its original order.
    """
    tally = dict.fromkeys(("succeeded", "failed", "timed_out", "cancelled"), 0)
    for outcome in outcomes:
        if outcome.status in tally:
            tally[outcome.status] += 1

    return CorpusRunResult(
        pipeline_name=pipeline_name,
        corpus_name=corpus_name,
        n_documents=n_documents,
        n_succeeded=tally["succeeded"],
        n_failed=tally["failed"],
        n_timed_out=tally["timed_out"],
        n_cancelled=tally["cancelled"],
        duration_seconds=duration_seconds,
        outcomes=tuple(outcomes),
    )
# Public API of this module (the names bound by ``from ... import *``).
__all__ = [
    "ContextFactory",
    "CorpusRunResult",
    "CorpusRunner",
    "DocumentOutcome",
    "InitialInputsFactory",
]