Spaces:
Sleeping
Sleeping
File size: 7,494 Bytes
166b00b 5d3ba70 166b00b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 | """``PipelineStep`` et ``PipelineSpec`` — Sprints A14-S6 / S40.
Description **purement déclarative** d'un DAG de transformation
documentaire. Sérialisable en YAML, versionnable en git, valide
sans avoir besoin d'instancier les modules concrets.
Sprint S40 — migration depuis ``picarones.pipeline.spec``
---------------------------------------------------------
Le module canonique est désormais en cercle 1 (``picarones/domain/``)
— c'est un type pur qui n'a aucune dépendance d'exécution
(``picarones/pipeline/`` qui contient le runtime n'est en fait pas
nécessaire pour décrire la spec). ``picarones.pipeline.spec`` reste
exposé en re-export pour ne pas casser les callers existants — ce
n'est pas un shim au sens architectural (adaptation d'une API
incompatible) mais un alias de chemin.
Différence avec ``picarones.evaluation.pipeline`` (Sprint 63)
-------------------------------------------------------------
``PipelineStep`` legacy (relocalisé en ``picarones.evaluation.pipeline``)
porte un champ ``module: BaseModule``
— une **instance** d'objet exécutable. Conséquence : la spec
n'était pas sérialisable en YAML, et un test qui voulait juste
valider la cohérence des types devait instancier des stubs.
Ici, ``PipelineStep`` ne porte qu'un ``adapter_name: str``. Le
mapping ``nom → instance`` est maintenu par un service applicatif
(``picarones.app.services.adapter_registry`` au S19) et résolu au
moment de l'exécution, pas de la spec.
Bénéfices :
- Le YAML d'une pipeline composée est versionnable en git
indépendamment de l'environnement Python (BnF peut commit
``ocr_llm_alto_remap.yaml`` sans imposer aux contributeurs
d'avoir tous les SDK installés).
- ``validate_spec`` peut s'exécuter sans instancier aucun module
→ tests rapides et déterministes.
- Le rapport de reproductibilité peut citer le YAML exact, le
commit du code et la version des adapters utilisés —
séparation propre de la déclaration et de l'implémentation.
Anti-sur-ingénierie
-------------------
- Pas de typage des ``params`` par adapter ici (chaque adapter
validera ses propres params au moment de l'exécution).
- Pas de versioning de spec — un nouveau champ se traduit par un
rebump pydantic. Si on veut migrer entre versions de schéma,
on l'ajoutera quand le besoin sera concret.
- Pas d'``outputs_preferred`` (mapping logique "preferred_text =
step3.RAW_TEXT"). Reporté quand un caller en aura concrètement
besoin.
"""
from __future__ import annotations
import re
from pydantic import BaseModel, ConfigDict, Field, field_validator
from picarones.domain.artifacts import ArtifactType
#: Identifiant d'étape — alphanum + ``_-``. Doit être un nom court
#: lisible par un humain dans les logs et le rapport.
_STEP_ID_RE = re.compile(r"^[A-Za-z0-9_\-]+$")
#: Sentinel pour ``inputs_from`` qui désigne les artefacts initiaux
#: fournis au runner (typiquement ``IMAGE``).
INITIAL_STEP_ID = "__initial__"
class PipelineStep(BaseModel):
"""Une étape déclarative dans un DAG de pipeline.
Attributs
---------
id:
Identifiant unique de l'étape dans la pipeline (alphanum +
``_-``). Sert dans les logs, le rapport, et comme cible
des références ``inputs_from`` des étapes en aval.
kind:
Catégorie informationnelle de l'étape (``"ocr"``,
``"post_correction"``, ``"alto_remapping"``,
``"alto_reconstruction"``, etc.). Pas de validation
d'enum — c'est un label libre que les services et le
rapport peuvent grouper. Par convention, en
``snake_case``.
adapter_name:
Nom de l'adapter dans le registre runtime (résolu par
``app/services`` au S19). Convention :
``"<provider>:<engine_or_model>"`` (ex : ``"tesseract"``,
``"openai:gpt-4o"``, ``"mistral:large"``,
``"<vendor>:<custom_module>"``).
params:
Paramètres passés à l'adapter au moment de l'exécution.
Format libre (chaque adapter valide les siens) — typage
scalaire pour rester sérialisable en YAML.
input_types:
Types d'artefacts consommés par l'étape. Validés par
``validate_spec`` contre les outputs des étapes antérieures.
output_types:
Types d'artefacts produits. Validés au runtime par
l'executor (qui vérifie que tous les types déclarés sont
bien dans le dict retourné par l'adapter).
inputs_from:
DAG branchant (héritage du Sprint 66). Pour chaque type
d'entrée, désigne explicitement l'étape source. La chaîne
spéciale ``"__initial__"`` désigne les entrées initiales
du runner. Si le dict est vide, l'executor prend la
version la plus récente de chaque type dans le bag.
"""
model_config = ConfigDict(frozen=True, extra="forbid")
id: str = Field(min_length=1, max_length=128)
kind: str = Field(min_length=1, max_length=64)
adapter_name: str = Field(min_length=1, max_length=256)
params: dict[str, str | int | float | bool] = Field(default_factory=dict)
input_types: tuple[ArtifactType, ...] = Field(default_factory=tuple)
output_types: tuple[ArtifactType, ...] = Field(default_factory=tuple)
inputs_from: dict[ArtifactType, str] = Field(default_factory=dict)
@field_validator("id")
@classmethod
def _validate_step_id(cls, v: str) -> str:
if not _STEP_ID_RE.match(v):
from picarones.domain.errors import PicaronesError
raise PicaronesError(
f"step id invalide : {v!r}. "
f"Doit matcher {_STEP_ID_RE.pattern!r} (alphanum + _-)."
)
if v == INITIAL_STEP_ID:
from picarones.domain.errors import PicaronesError
raise PicaronesError(
f"step id réservé : {INITIAL_STEP_ID!r} désigne "
"les entrées initiales du runner."
)
return v
class PipelineSpec(BaseModel):
"""DAG déclaratif d'une pipeline composée.
Sérialisable en YAML via ``model_dump()`` + ``yaml.safe_dump``,
chargeable via ``model_validate(yaml.safe_load(text))``. Le
round-trip est testé.
Attributs
---------
name:
Nom court de la pipeline (utilisé dans les logs, le cache,
le rapport). Convention ``snake_case``.
description:
Phrase courte d'introduction affichée dans le rapport.
initial_inputs:
Types d'artefacts qui doivent être fournis par le caller
au moment de l'exécution. Convention : ``(IMAGE,)`` pour
une pipeline OCR classique, ``(IMAGE, RAW_TEXT)`` pour
une post-correction qui part d'un OCR pré-calculé.
steps:
Étapes du DAG, ordonnées par dépendance topologique
d'exécution. Si une étape ``s2`` dépend de ``s1``, alors
``s1`` apparaît avant ``s2``. ``validate_spec`` détecte
les violations.
"""
model_config = ConfigDict(frozen=True, extra="forbid")
name: str = Field(min_length=1, max_length=128)
description: str = ""
initial_inputs: tuple[ArtifactType, ...] = Field(default_factory=tuple)
steps: tuple[PipelineStep, ...] = Field(default_factory=tuple)
def step_by_id(self, step_id: str) -> PipelineStep | None:
for s in self.steps:
if s.id == step_id:
return s
return None
__all__ = ["PipelineStep", "PipelineSpec", "INITIAL_STEP_ID"]
|