Claude
feat(pipeline): Sprint A14-S6 — PipelineSpec déclaratif + validation + YAML round-trip
b9ff8de unverified
Raw
History Blame
5.18 kB
"""``RunContext``, ``StepResult``, ``PipelineResult`` — Sprint A14-S6.
Types runtime du pipeline executor (à implémenter au Sprint S7).
Distincts des specs déclaratives (``picarones.pipeline.spec``) —
ces types portent les **résultats** de l'exécution, pas la
description du DAG.
Aucune logique métier ici : juste des dataclasses pydantic qu'un
service applicatif peut sérialiser dans le manifest d'un run.
"""
from __future__ import annotations
from typing import Any
from pydantic import BaseModel, ConfigDict, Field
from picarones.domain.artifacts import Artifact
class RunContext(BaseModel):
"""Contexte d'exécution passé à chaque ``StepExecutor.execute()``.
Le caller (typiquement ``app/services/benchmark_service`` au
S19) construit un ``RunContext`` par document et le passe à
l'executor pour chaque étape.
Attributs
---------
document_id:
``DocumentRef.id`` du document en cours de traitement.
code_version:
Version du code (``picarones.__version__``) au moment du
run. Sert à étiqueter la ``ProvenanceRecord`` de chaque
artefact produit.
pipeline_name:
Nom de la pipeline en cours. Permet à un adapter de
loguer ``[pipeline_x] step_y : ...`` plutôt que
``[unknown] ...``.
workspace_uri:
URI/chemin du workspace dans lequel l'adapter peut écrire
ses artefacts intermédiaires. ``None`` autorisé pour les
adapters qui n'écrivent rien sur disque (mode in-memory).
Anti-sur-ingénierie : pas de logger injecté, pas d'horloge
abstraite, pas de cancellation token. Ces extras viendront
quand un caller en aura concrètement besoin (probablement S7
pour la cancellation, S8 pour le timeout réel).
"""
model_config = ConfigDict(frozen=True, extra="forbid")
document_id: str = Field(min_length=1, max_length=256)
code_version: str = Field(min_length=1, max_length=128)
pipeline_name: str = Field(min_length=1, max_length=128)
workspace_uri: str | None = Field(default=None, max_length=2048)
class StepResult(BaseModel):
"""Résultat de l'exécution d'une étape sur un document.
Sérialisable JSON pour persistance dans le manifest du run.
Attributs
---------
step_id:
Identifiant de l'étape (cf. ``PipelineStep.id``).
succeeded:
``True`` si l'étape s'est exécutée sans lever d'exception
et a produit tous les types déclarés dans
``output_types``. ``False`` sinon.
duration_seconds:
Wall-clock time de ``execute()`` (du début effectif à la
fin). L'executor du S8 garantira que ce temps est mesuré
depuis le démarrage réel (pas depuis la submission au pool).
produced_artifacts:
Map ``{ArtifactType: artifact_id}`` des artefacts produits.
Vide en cas d'échec.
error:
``None`` en cas de succès ; sinon message d'erreur. Format
libre (le caller décide de la structure dans son rapport).
"""
model_config = ConfigDict(frozen=True, extra="forbid")
step_id: str = Field(min_length=1, max_length=128)
succeeded: bool
duration_seconds: float = Field(ge=0.0)
produced_artifacts: dict[str, str] = Field(default_factory=dict)
"""Map ``{ArtifactType.value: Artifact.id}``.
Sérialisée avec la valeur string de l'enum (``"raw_text"``,
``"alto_xml"``) pour faciliter la lecture humaine du JSON.
"""
error: str | None = None
class PipelineResult(BaseModel):
"""Résultat complet d'une exécution de pipeline sur un document.
Attributs
---------
pipeline_name:
Nom de la pipeline qui a produit ce résultat.
document_id:
Document traité.
step_results:
Résultats de chaque étape, dans l'ordre d'exécution.
succeeded:
``True`` ssi tous les ``step_results`` sont des succès.
Si ``False``, un ou plusieurs ``StepResult.error`` sont
non-None.
duration_seconds:
Wall-clock total (somme des étapes + overhead orchestration).
artifacts:
Liste **plate** de tous les artefacts produits par la
pipeline. Permet à un consommateur (rapport, vue
d'évaluation) d'accéder directement à un artefact par son
id sans parcourir l'arborescence des étapes.
"""
model_config = ConfigDict(frozen=True, extra="forbid")
pipeline_name: str
document_id: str
step_results: tuple[StepResult, ...] = Field(default_factory=tuple)
succeeded: bool = False
duration_seconds: float = Field(default=0.0, ge=0.0)
artifacts: tuple[Artifact, ...] = Field(default_factory=tuple)
def step_result_by_id(self, step_id: str) -> StepResult | None:
for r in self.step_results:
if r.step_id == step_id:
return r
return None
def artifacts_of_type(self, artifact_type: Any) -> tuple[Artifact, ...]:
"""Retourne tous les artefacts du type donné dans l'ordre
de production."""
return tuple(a for a in self.artifacts if a.type == artifact_type)
__all__ = ["RunContext", "StepResult", "PipelineResult"]