Spaces:
Sleeping
Sleeping
Claude
feat(pipeline): Sprint A14-S6 — PipelineSpec déclaratif + validation + YAML round-trip
b9ff8de unverified | """``RunContext``, ``StepResult``, ``PipelineResult`` — Sprint A14-S6. | |
| Types runtime du pipeline executor (à implémenter au Sprint S7). | |
| Distincts des specs déclaratives (``picarones.pipeline.spec``) — | |
| ces types portent les **résultats** de l'exécution, pas la | |
| description du DAG. | |
| Aucune logique métier ici : juste des dataclasses pydantic qu'un | |
| service applicatif peut sérialiser dans le manifest d'un run. | |
| """ | |
| from __future__ import annotations | |
| from typing import Any | |
| from pydantic import BaseModel, ConfigDict, Field | |
| from picarones.domain.artifacts import Artifact | |
| class RunContext(BaseModel): | |
| """Contexte d'exécution passé à chaque ``StepExecutor.execute()``. | |
| Le caller (typiquement ``app/services/benchmark_service`` au | |
| S19) construit un ``RunContext`` par document et le passe à | |
| l'executor pour chaque étape. | |
| Attributs | |
| --------- | |
| document_id: | |
| ``DocumentRef.id`` du document en cours de traitement. | |
| code_version: | |
| Version du code (``picarones.__version__``) au moment du | |
| run. Sert à étiqueter la ``ProvenanceRecord`` de chaque | |
| artefact produit. | |
| pipeline_name: | |
| Nom de la pipeline en cours. Permet à un adapter de | |
| loguer ``[pipeline_x] step_y : ...`` plutôt que | |
| ``[unknown] ...``. | |
| workspace_uri: | |
| URI/chemin du workspace dans lequel l'adapter peut écrire | |
| ses artefacts intermédiaires. ``None`` autorisé pour les | |
| adapters qui n'écrivent rien sur disque (mode in-memory). | |
| Anti-sur-ingénierie : pas de logger injecté, pas d'horloge | |
| abstraite, pas de cancellation token. Ces extras viendront | |
| quand un caller en aura concrètement besoin (probablement S7 | |
| pour la cancellation, S8 pour le timeout réel). | |
| """ | |
| model_config = ConfigDict(frozen=True, extra="forbid") | |
| document_id: str = Field(min_length=1, max_length=256) | |
| code_version: str = Field(min_length=1, max_length=128) | |
| pipeline_name: str = Field(min_length=1, max_length=128) | |
| workspace_uri: str | None = Field(default=None, max_length=2048) | |
| class StepResult(BaseModel): | |
| """Résultat de l'exécution d'une étape sur un document. | |
| Sérialisable JSON pour persistance dans le manifest du run. | |
| Attributs | |
| --------- | |
| step_id: | |
| Identifiant de l'étape (cf. ``PipelineStep.id``). | |
| succeeded: | |
| ``True`` si l'étape s'est exécutée sans lever d'exception | |
| et a produit tous les types déclarés dans | |
| ``output_types``. ``False`` sinon. | |
| duration_seconds: | |
| Wall-clock time de ``execute()`` (du début effectif à la | |
| fin). L'executor du S8 garantira que ce temps est mesuré | |
| depuis le démarrage réel (pas depuis la submission au pool). | |
| produced_artifacts: | |
| Map ``{ArtifactType: artifact_id}`` des artefacts produits. | |
| Vide en cas d'échec. | |
| error: | |
| ``None`` en cas de succès ; sinon message d'erreur. Format | |
| libre (le caller décide de la structure dans son rapport). | |
| """ | |
| model_config = ConfigDict(frozen=True, extra="forbid") | |
| step_id: str = Field(min_length=1, max_length=128) | |
| succeeded: bool | |
| duration_seconds: float = Field(ge=0.0) | |
| produced_artifacts: dict[str, str] = Field(default_factory=dict) | |
| """Map ``{ArtifactType.value: Artifact.id}``. | |
| Sérialisée avec la valeur string de l'enum (``"raw_text"``, | |
| ``"alto_xml"``) pour faciliter la lecture humaine du JSON. | |
| """ | |
| error: str | None = None | |
| class PipelineResult(BaseModel): | |
| """Résultat complet d'une exécution de pipeline sur un document. | |
| Attributs | |
| --------- | |
| pipeline_name: | |
| Nom de la pipeline qui a produit ce résultat. | |
| document_id: | |
| Document traité. | |
| step_results: | |
| Résultats de chaque étape, dans l'ordre d'exécution. | |
| succeeded: | |
| ``True`` ssi tous les ``step_results`` sont des succès. | |
| Si ``False``, un ou plusieurs ``StepResult.error`` sont | |
| non-None. | |
| duration_seconds: | |
| Wall-clock total (somme des étapes + overhead orchestration). | |
| artifacts: | |
| Liste **plate** de tous les artefacts produits par la | |
| pipeline. Permet à un consommateur (rapport, vue | |
| d'évaluation) d'accéder directement à un artefact par son | |
| id sans parcourir l'arborescence des étapes. | |
| """ | |
| model_config = ConfigDict(frozen=True, extra="forbid") | |
| pipeline_name: str | |
| document_id: str | |
| step_results: tuple[StepResult, ...] = Field(default_factory=tuple) | |
| succeeded: bool = False | |
| duration_seconds: float = Field(default=0.0, ge=0.0) | |
| artifacts: tuple[Artifact, ...] = Field(default_factory=tuple) | |
| def step_result_by_id(self, step_id: str) -> StepResult | None: | |
| for r in self.step_results: | |
| if r.step_id == step_id: | |
| return r | |
| return None | |
| def artifacts_of_type(self, artifact_type: Any) -> tuple[Artifact, ...]: | |
| """Retourne tous les artefacts du type donné dans l'ordre | |
| de production.""" | |
| return tuple(a for a in self.artifacts if a.type == artifact_type) | |
| __all__ = ["RunContext", "StepResult", "PipelineResult"] | |