"""``PipelineStep`` et ``PipelineSpec`` — Sprint A14-S6. Description **purement déclarative** d'un DAG de transformation documentaire. Sérialisable en YAML, versionnable en git, valide sans avoir besoin d'instancier les modules concrets. Différence avec l'ancien ``picarones.core.pipeline`` (Sprint 63) ---------------------------------------------------------------- L'ancien ``PipelineStep`` portait un champ ``module: BaseModule`` — une **instance** d'objet exécutable. Conséquence : la spec n'était pas sérialisable en YAML, et un test qui voulait juste valider la cohérence des types devait instancier des stubs. Ici, ``PipelineStep`` ne porte qu'un ``adapter_name: str``. Le mapping ``nom → instance`` est maintenu par un service applicatif (``picarones.app.services.adapter_registry`` au S19) et résolu au moment de l'exécution, pas de la spec. Bénéfices : - Le YAML d'une pipeline composée est versionnable en git indépendamment de l'environnement Python (BnF peut commit ``ocr_llm_alto_remap.yaml`` sans imposer aux contributeurs d'avoir tous les SDK installés). - ``validate_spec`` peut s'exécuter sans instancier aucun module → tests rapides et déterministes. - Le rapport de reproductibilité peut citer le YAML exact, le commit du code et la version des adapters utilisés — séparation propre de la déclaration et de l'implémentation. Anti-sur-ingénierie ------------------- - Pas de typage des ``params`` par adapter ici (chaque adapter validera ses propres params au moment de l'exécution). - Pas de versioning de spec — un nouveau champ se traduit par un rebump pydantic. Si on veut migrer entre versions de schéma, on l'ajoutera quand le besoin sera concret. - Pas d'``outputs_preferred`` (mapping logique "preferred_text = step3.RAW_TEXT"). Reporté quand un caller en aura concrètement besoin. """ from __future__ import annotations import re from pydantic import BaseModel, ConfigDict, Field, field_validator from picarones.domain.artifacts import ArtifactType #: Identifiant d'étape — alphanum + ``_-``. Doit être un nom court #: lisible par un humain dans les logs et le rapport. _STEP_ID_RE = re.compile(r"^[A-Za-z0-9_\-]+$") #: Sentinel pour ``inputs_from`` qui désigne les artefacts initiaux #: fournis au runner (typiquement ``IMAGE``). INITIAL_STEP_ID = "__initial__" class PipelineStep(BaseModel): """Une étape déclarative dans un DAG de pipeline. Attributs --------- id: Identifiant unique de l'étape dans la pipeline (alphanum + ``_-``). Sert dans les logs, le rapport, et comme cible des références ``inputs_from`` des étapes en aval. kind: Catégorie informationnelle de l'étape (``"ocr"``, ``"post_correction"``, ``"alto_remapping"``, ``"alto_reconstruction"``, etc.). Pas de validation d'enum — c'est un label libre que les services et le rapport peuvent grouper. Par convention, en ``snake_case``. adapter_name: Nom de l'adapter dans le registre runtime (résolu par ``app/services`` au S19). Convention : ``":"`` (ex : ``"tesseract"``, ``"openai:gpt-4o"``, ``"mistral:large"``, ``":"``). params: Paramètres passés à l'adapter au moment de l'exécution. Format libre (chaque adapter valide les siens) — typage scalaire pour rester sérialisable en YAML. input_types: Types d'artefacts consommés par l'étape. Validés par ``validate_spec`` contre les outputs des étapes antérieures. output_types: Types d'artefacts produits. Validés au runtime par l'executor (qui vérifie que tous les types déclarés sont bien dans le dict retourné par l'adapter). inputs_from: DAG branchant (héritage du Sprint 66). Pour chaque type d'entrée, désigne explicitement l'étape source. La chaîne spéciale ``"__initial__"`` désigne les entrées initiales du runner. Si le dict est vide, l'executor prend la version la plus récente de chaque type dans le bag. """ model_config = ConfigDict(frozen=True, extra="forbid") id: str = Field(min_length=1, max_length=128) kind: str = Field(min_length=1, max_length=64) adapter_name: str = Field(min_length=1, max_length=256) params: dict[str, str | int | float | bool] = Field(default_factory=dict) input_types: tuple[ArtifactType, ...] = Field(default_factory=tuple) output_types: tuple[ArtifactType, ...] = Field(default_factory=tuple) inputs_from: dict[ArtifactType, str] = Field(default_factory=dict) @field_validator("id") @classmethod def _validate_step_id(cls, v: str) -> str: if not _STEP_ID_RE.match(v): from picarones.domain.errors import PicaronesError raise PicaronesError( f"step id invalide : {v!r}. " f"Doit matcher {_STEP_ID_RE.pattern!r} (alphanum + _-)." ) if v == INITIAL_STEP_ID: from picarones.domain.errors import PicaronesError raise PicaronesError( f"step id réservé : {INITIAL_STEP_ID!r} désigne " "les entrées initiales du runner." ) return v class PipelineSpec(BaseModel): """DAG déclaratif d'une pipeline composée. Sérialisable en YAML via ``model_dump()`` + ``yaml.safe_dump``, chargeable via ``model_validate(yaml.safe_load(text))``. Le round-trip est testé. Attributs --------- name: Nom court de la pipeline (utilisé dans les logs, le cache, le rapport). Convention ``snake_case``. description: Phrase courte d'introduction affichée dans le rapport. initial_inputs: Types d'artefacts qui doivent être fournis par le caller au moment de l'exécution. Convention : ``(IMAGE,)`` pour une pipeline OCR classique, ``(IMAGE, RAW_TEXT)`` pour une post-correction qui part d'un OCR pré-calculé. steps: Étapes du DAG, ordonnées par dépendance topologique d'exécution. Si une étape ``s2`` dépend de ``s1``, alors ``s1`` apparaît avant ``s2``. ``validate_spec`` détecte les violations. """ model_config = ConfigDict(frozen=True, extra="forbid") name: str = Field(min_length=1, max_length=128) description: str = "" initial_inputs: tuple[ArtifactType, ...] = Field(default_factory=tuple) steps: tuple[PipelineStep, ...] = Field(default_factory=tuple) def step_by_id(self, step_id: str) -> PipelineStep | None: for s in self.steps: if s.id == step_id: return s return None __all__ = ["PipelineStep", "PipelineSpec", "INITIAL_STEP_ID"]