Picarones / picarones /pipeline /executor.py
Claude
feat: audit S59 institutionnel — 2 BLOCKER + 4 HIGH + 3 MEDIUM corrigés
d4d4112 unverified
Raw
History Blame
21.6 kB
"""``PipelineExecutor`` mono-document — Sprints A14-S7 / S28.
Exécuteur séquentiel d'une pipeline composée sur un document.
Sprint S7 livrait ``run(spec, document, initial_inputs, context)``
qui validait la spec en interne et résolvait les bindings au
runtime via un bag versionné.
Sprint S28 introduit le ``PipelinePlanner`` qui transforme une
``PipelineSpec`` en ``ExecutionPlan`` immuable (validations +
bindings résolus + jonctions de métriques détectées). L'executor
consomme désormais soit :
- Un ``ExecutionPlan`` pré-calculé via ``run_plan(plan, ...)`` —
signature canonique, contrat explicite.
- Une ``PipelineSpec`` brute via ``run(spec, ...)`` — sucre
ergonomique qui appelle le planner en interne (planification
systématique, pas de cache implicite).
Contrat
-------
Le caller (typiquement ``BenchmarkService`` ou ``CorpusRunner``)
fournit :
- un ``ExecutionPlan`` (canonique) ou ``PipelineSpec`` (sucre),
- un ``DocumentRef`` du document à traiter,
- un dict ``{ArtifactType: Artifact}`` des entrées initiales
(typiquement ``{IMAGE: Artifact(...)}``),
- un ``RunContext`` (``document_id``, ``code_version``,
``pipeline_name``, éventuel ``workspace_uri``),
- un ``adapter_resolver: Callable[[str], StepExecutor]`` injecté
au constructeur.
L'executor garantit :
- Les étapes sont exécutées dans l'ordre du plan
(``resolved_steps``).
- Chaque entrée d'une étape est résolue depuis les
``StepInputBinding`` du plan — fini la résolution implicite
« dernier producteur » au runtime.
- Toute exception levée par un adapter est capturée — le step
est marqué ``succeeded=False`` avec ``error=str(exc)``, et le
pipeline continue (les étapes en aval pourront échouer si elles
dépendaient des outputs de ce step, ce qui est explicite).
- Les ``output_types`` déclarés par l'adapter sont validés au
retour : un type promis manquant marque le step en échec avec
``error="missing_output: <type>"``.
L'executor ne garantit PAS (reportés à des sprints suivants) :
- Cache d'artefacts inter-runs (S29 livre ``ArtifactStore``).
- Parallélisation inter-documents ou inter-étapes (cf. S8 pour
inter-doc via ``CorpusRunner``).
Compat S7
---------
La signature historique ``run(spec, document, ...)`` reste
exposée — elle planifie la spec systématiquement à chaque appel
et délègue à ``run_plan``. Aucune logique nouvelle n'y vit.
"""
from __future__ import annotations
import logging
import time
from typing import Callable
from picarones.domain.artifacts import Artifact, ArtifactType
from picarones.domain.documents import DocumentRef
from picarones.domain.errors import PicaronesError
from picarones.pipeline.cache_helpers import (
compute_step_artifact_key,
read_cached_outputs,
write_outputs_to_cache,
)
from picarones.pipeline.cache_protocol import ArtifactCachePort
from picarones.pipeline.planner import (
ExecutionPlan,
PipelinePlanner,
PlanningError,
ResolvedStep,
)
from picarones.pipeline.protocols import StepExecutor
from picarones.domain.pipeline_spec import INITIAL_STEP_ID, PipelineSpec
from picarones.pipeline.types import PipelineResult, RunContext, StepResult
logger = logging.getLogger(__name__)
class PipelineSpecInvalid(PicaronesError):
"""``PipelineSpec`` mal formée — l'executor refuse de démarrer.
Wrappe le ``PlanningError`` produit par ``PipelinePlanner`` pour
préserver la sémantique historique : un caller qui catchait
``PipelineSpecInvalid`` continue de fonctionner.
"""
#: Type alias pour le resolver d'adapters. Une fonction qui
#: prend un ``adapter_name`` (str) et retourne une instance
#: ``StepExecutor`` prête à l'emploi. Si le resolver lève
#: ``KeyError``, l'executor traduit en step en échec avec
#: ``error="adapter_not_found: ..."``.
AdapterResolver = Callable[[str], StepExecutor]
class PipelineExecutor:
"""Exécuteur séquentiel mono-document.
Une instance peut traiter plusieurs documents (l'état est
porté par les paramètres de ``run()``, pas par le constructeur).
L'instance est thread-safe en lecture (rien n'est muté après
construction).
Parameters
----------
adapter_resolver:
Callable qui résout un ``adapter_name`` en instance
``StepExecutor``. Typiquement
``lambda name: registry[name]`` en test, ou un service
applicatif qui injecte les bonnes dépendances en prod.
planner:
``PipelinePlanner`` injecté (S28). Si ``None``, un planner
par défaut sans ``MetricRegistry`` est instancié.
artifact_store:
``ArtifactStore`` optionnel (S29 + S47) pour la **reprise par
hash**. Si fourni, l'executor :
- **avant** chaque step, calcule la clé du step via
``compute_step_artifact_key`` et interroge le store ; si
toutes les sorties attendues sont présentes ET valides
(URIs accessibles), saute l'exécution et retourne les
artefacts cachés (``StepResult.duration_seconds=0.0``) ;
- **après** chaque step réussi, persiste les outputs dans
le store sous la clé dérivée.
Si ``None`` (défaut), aucun cache n'est consulté ni écrit.
Le comportement est strictement identique à l'avant-S47.
"""
def __init__(
self,
adapter_resolver: AdapterResolver,
planner: PipelinePlanner | None = None,
artifact_store: ArtifactCachePort | None = None,
) -> None:
if not callable(adapter_resolver):
raise PicaronesError(
"PipelineExecutor : adapter_resolver doit être callable."
)
if planner is not None and not isinstance(planner, PipelinePlanner):
raise PicaronesError(
"PipelineExecutor : planner doit être un PipelinePlanner ou None."
)
# ``isinstance(artifact_store, ArtifactCachePort)`` est un duck
# typing check (Protocol @runtime_checkable) — valide get/put/
# __contains__ par leur seule présence. Permet à un caller
# tiers (Redis, S3) de fournir un store custom satisfaisant
# le protocol sans hériter de la classe ABC ``ArtifactStore``.
if artifact_store is not None and not isinstance(
artifact_store, ArtifactCachePort,
):
raise PicaronesError(
"PipelineExecutor : artifact_store doit satisfaire le "
"protocole ArtifactCachePort (get / put / __contains__) "
"ou être None.",
)
self._resolver = adapter_resolver
# Si pas de planner injecté, on en fabrique un sans MetricRegistry —
# les jonctions seront vides mais la planification reste correcte.
self._planner = planner if planner is not None else PipelinePlanner()
self._artifact_store = artifact_store
def plan(self, spec: PipelineSpec) -> ExecutionPlan:
"""Planifie une ``PipelineSpec`` en ``ExecutionPlan``.
Sucre exposant le planner injecté. Permet aux callers
(typiquement ``CorpusRunner`` qui exécute la même spec sur
N documents) de planifier **une fois** puis appeler
``run_plan`` N fois — économisant N-1 validations.
Raises
------
PipelineSpecInvalid
Si la planification échoue (validations statiques).
"""
try:
return self._planner.plan(spec)
except PlanningError as exc:
messages = "; ".join(
f"{e.step_id or '<global>'}: {e.message}"
for e in exc.errors
)
raise PipelineSpecInvalid(
f"Spec {spec.name!r} invalide : {messages}"
) from exc
def run(
self,
spec: PipelineSpec,
document: DocumentRef,
initial_inputs: dict[ArtifactType, Artifact],
context: RunContext,
) -> PipelineResult:
"""Exécute une pipeline complète sur un document (sucre).
Sucre ergonomique sur ``run_plan`` : appelle
``self._planner.plan(spec)`` puis ``run_plan(plan, ...)``.
Aucune logique nouvelle n'y vit — l'API canonique est
``run_plan(plan, document, initial_inputs, context)`` qui
accepte un ``ExecutionPlan`` pré-calculé.
Returns
-------
PipelineResult
``succeeded`` global = True ssi toutes les étapes ont
réussi. Une étape en échec n'arrête PAS l'exécution —
les étapes suivantes peuvent quand même tourner si
leurs entrées ne dépendent pas du step en échec.
Raises
------
PipelineSpecInvalid
Si la planification échoue (validations statiques).
L'executor ne masque pas ce type d'erreur : c'est un
bug de programmation, pas un problème runtime.
"""
plan = self.plan(spec)
return self.run_plan(plan, document, initial_inputs, context)
def run_plan(
self,
plan: ExecutionPlan,
document: DocumentRef,
initial_inputs: dict[ArtifactType, Artifact],
context: RunContext,
) -> PipelineResult:
"""Exécute un ``ExecutionPlan`` pré-calculé sur un document.
Signature canonique du S28. Le caller a déjà appelé
``planner.plan(spec)`` (typiquement ``CorpusRunner`` qui
planifie une fois pour N documents). L'executor consomme
directement ``plan.resolved_steps`` sans re-valider la
spec ni re-résoudre les bindings.
Toute la logique d'exécution vit ici ; ``run`` n'est qu'un
sucre.
"""
if not isinstance(plan, ExecutionPlan):
raise PicaronesError(
f"run_plan : plan doit être un ExecutionPlan, "
f"reçu {type(plan).__name__}"
)
# 1. Bag versionné : map (type, step_id) → Artifact.
versioned: dict[tuple[ArtifactType, str], Artifact] = {}
for art_type, art in initial_inputs.items():
versioned[(art_type, INITIAL_STEP_ID)] = art
# 2. Exécution séquentielle des steps résolus.
step_results: list[StepResult] = []
all_artifacts: list[Artifact] = list(initial_inputs.values())
run_started = time.perf_counter()
for resolved_step in plan.resolved_steps:
result, produced = self._run_step(
resolved_step=resolved_step,
versioned=versioned,
context=context,
)
step_results.append(result)
for art_type, art in produced.items():
versioned[(art_type, resolved_step.id)] = art
all_artifacts.append(art)
run_duration = time.perf_counter() - run_started
succeeded = all(r.succeeded for r in step_results)
return PipelineResult(
pipeline_name=plan.spec.name,
document_id=document.id,
step_results=tuple(step_results),
succeeded=succeeded,
duration_seconds=run_duration,
artifacts=tuple(all_artifacts),
)
# ──────────────────────────────────────────────────────────────────
# Helpers internes
# ──────────────────────────────────────────────────────────────────
def _run_step(
self,
*,
resolved_step: ResolvedStep,
versioned: dict[tuple[ArtifactType, str], Artifact],
context: RunContext,
) -> tuple[StepResult, dict[ArtifactType, Artifact]]:
"""Exécute une étape résolue, retourne (result, artefacts produits).
Le tuple est important : si le step échoue, on retourne quand
même un dict vide pour les artefacts → le caller peut
continuer la boucle proprement.
"""
step = resolved_step.step
step_started = time.perf_counter()
# 1. Résoudre les inputs depuis le bag en suivant les bindings
# explicites du plan.
try:
inputs = self._inputs_from_bindings(
resolved_step=resolved_step,
versioned=versioned,
)
except _InputResolutionError as exc:
duration = time.perf_counter() - step_started
return (
StepResult(
step_id=step.id,
succeeded=False,
duration_seconds=duration,
error=str(exc),
),
{},
)
# 1bis. S47 — Reprise par hash via ArtifactStore.
# Si un store est injecté et que tous les inputs ont un
# ``content_hash``, on calcule la clé du step et on interroge
# le store. Hit complet → on saute l'exécution (durée 0,
# même artefacts que la dernière exécution réussie). Miss
# ou cache partiel → on tombe dans l'exécution normale.
if self._artifact_store is not None:
cached_outputs = self._try_resume_from_cache(
step=step, inputs=inputs, context=context,
)
if cached_outputs is not None:
logger.info(
"[pipeline:%s] step '%s' : hit cache "
"(reprise par hash, exécution sautée).",
context.pipeline_name, step.id,
)
return (
StepResult(
step_id=step.id,
succeeded=True,
duration_seconds=0.0,
produced_artifacts={
t.value: a.id
for t, a in cached_outputs.items()
},
),
cached_outputs,
)
# 2. Résoudre l'adapter.
try:
adapter = self._resolver(step.adapter_name)
except KeyError:
duration = time.perf_counter() - step_started
return (
StepResult(
step_id=step.id,
succeeded=False,
duration_seconds=duration,
error=f"adapter_not_found: {step.adapter_name}",
),
{},
)
except Exception as exc: # noqa: BLE001
duration = time.perf_counter() - step_started
return (
StepResult(
step_id=step.id,
succeeded=False,
duration_seconds=duration,
error=f"adapter_resolver_failed: {exc}",
),
{},
)
# 3. Exécuter. Toute exception est capturée → step en échec.
try:
outputs = adapter.execute(inputs, dict(step.params), context)
except Exception as exc: # noqa: BLE001
duration = time.perf_counter() - step_started
logger.warning(
"[pipeline:%s] step '%s' a levé : %s",
context.pipeline_name, step.id, exc,
)
return (
StepResult(
step_id=step.id,
succeeded=False,
duration_seconds=duration,
error=f"adapter_raised: {type(exc).__name__}: {exc}",
),
{},
)
# 4. Valider les outputs déclarés.
missing = [
t for t in step.output_types
if t not in outputs
]
duration = time.perf_counter() - step_started
if missing:
return (
StepResult(
step_id=step.id,
succeeded=False,
duration_seconds=duration,
error=(
"missing_output: "
f"{[t.value for t in missing]}"
),
),
# On garde quand même les outputs qui ont été produits,
# pour que les éventuels steps en aval puissent les
# utiliser si la pipeline est résiliente.
outputs,
)
# 5. Filtrage sur ``step.output_types``.
# Un adapter peut produire plus de types que le YAML n'en
# déclare (ex: Tesseract avec ``expose_confidences=True``
# mais le step ne déclare que ``[raw_text]``). Le contrat
# est que seuls les outputs déclarés en sortie de step
# passent en aval — sinon un DAG branchant pourrait recevoir
# des artefacts qui ne devaient pas exister à cette jonction.
declared = set(step.output_types)
outputs = {t: a for t, a in outputs.items() if t in declared}
# 6. Succès — persiste dans le store si fourni. La méthode
# interne sait gérer le cas content_hash manquant (skip
# silencieux) — on lui passe la responsabilité.
if self._artifact_store is not None:
self._persist_to_cache(
step=step, inputs=inputs, context=context, outputs=outputs,
)
produced_map = {
t.value: a.id for t, a in outputs.items()
}
return (
StepResult(
step_id=step.id,
succeeded=True,
duration_seconds=duration,
produced_artifacts=produced_map,
),
outputs,
)
# ──────────────────────────────────────────────────────────────────
# S47 — Reprise par hash via ArtifactStore
# ──────────────────────────────────────────────────────────────────
def _try_resume_from_cache(
self,
*,
step,
inputs: dict[ArtifactType, Artifact],
context: RunContext,
) -> dict[ArtifactType, Artifact] | None:
"""Tente de retrouver les outputs cachés du step.
Retourne ``None`` (cache miss) dans 3 cas :
1. Un input n'a pas de ``content_hash`` → la clé n'est pas
calculable (cf. ``ArtifactKey.hash_hex``).
2. Le store ne contient pas TOUS les ``output_types`` du step.
3. Une URI cachée pointe vers un fichier qui n'existe plus.
"""
# Nécessairement non-None ici (vérifié par le caller), mais on
# défend en profondeur.
if self._artifact_store is None:
return None
key = compute_step_artifact_key(step, inputs, context)
step_hash = key.hash_hex()
if step_hash is None:
return None
return read_cached_outputs(
store=self._artifact_store,
step=step,
step_hash=step_hash,
)
def _persist_to_cache(
self,
*,
step,
inputs: dict[ArtifactType, Artifact],
context: RunContext,
outputs: dict[ArtifactType, Artifact],
) -> None:
"""Persiste les outputs d'un step réussi dans le store.
Skip silencieux si la clé n'est pas calculable (un input sans
``content_hash``).
"""
if self._artifact_store is None:
return
key = compute_step_artifact_key(step, inputs, context)
step_hash = key.hash_hex()
if step_hash is None:
return
write_outputs_to_cache(
store=self._artifact_store,
step=step,
step_hash=step_hash,
outputs=outputs,
)
def _inputs_from_bindings(
self,
*,
resolved_step: ResolvedStep,
versioned: dict[tuple[ArtifactType, str], Artifact],
) -> dict[ArtifactType, Artifact]:
"""Construit le dict ``{ArtifactType: Artifact}`` à passer
à l'adapter à partir des bindings explicites du plan.
Le plan a déjà résolu chaque ``input_type`` à une
``source_step_id`` (soit ``INITIAL_STEP_ID``, soit l'ID
d'une étape antérieure). L'executor n'a plus qu'à indexer
le bag par ``(input_type, source_step_id)``.
Lève ``_InputResolutionError`` si l'artefact attendu
n'est pas dans le bag — typiquement parce qu'une étape
antérieure a échoué et n'a pas produit son output.
"""
inputs: dict[ArtifactType, Artifact] = {}
for binding in resolved_step.input_bindings:
key = (binding.input_type, binding.source_step_id)
if key not in versioned:
raise _InputResolutionError(
f"missing_input: {binding.input_type.value}"
f"@{binding.source_step_id}"
)
inputs[binding.input_type] = versioned[key]
return inputs
class _InputResolutionError(Exception):
"""Erreur interne signalant qu'un input n'a pas pu être résolu.
Capturée par ``_run_step`` qui la traduit en ``StepResult``
en échec avec ``error="missing_input: ..."``.
"""
__all__ = [
"AdapterResolver",
"PipelineExecutor",
"PipelineSpecInvalid",
]