File size: 7,494 Bytes
166b00b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5d3ba70
 
 
 
166b00b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
"""``PipelineStep`` et ``PipelineSpec`` — Sprints A14-S6 / S40.

Description **purement déclarative** d'un DAG de transformation
documentaire.  Sérialisable en YAML, versionnable en git, valide
sans avoir besoin d'instancier les modules concrets.

Sprint S40 — migration depuis ``picarones.pipeline.spec``
---------------------------------------------------------
Le module canonique est désormais en cercle 1 (``picarones/domain/``)
— c'est un type pur qui n'a aucune dépendance d'exécution
(``picarones/pipeline/`` qui contient le runtime n'est en fait pas
nécessaire pour décrire la spec).  ``picarones.pipeline.spec`` reste
exposé en re-export pour ne pas casser les callers existants — ce
n'est pas un shim au sens architectural (adaptation d'une API
incompatible) mais un alias de chemin.

Différence avec ``picarones.evaluation.pipeline`` (Sprint 63)
-------------------------------------------------------------
``PipelineStep`` legacy (relocalisé en ``picarones.evaluation.pipeline``)
porte un champ ``module: BaseModule``
— une **instance** d'objet exécutable.  Conséquence : la spec
n'était pas sérialisable en YAML, et un test qui voulait juste
valider la cohérence des types devait instancier des stubs.

Ici, ``PipelineStep`` ne porte qu'un ``adapter_name: str``.  Le
mapping ``nom → instance`` est maintenu par un service applicatif
(``picarones.app.services.adapter_registry`` au S19) et résolu au
moment de l'exécution, pas de la spec.

Bénéfices :

- Le YAML d'une pipeline composée est versionnable en git
  indépendamment de l'environnement Python (BnF peut commit
  ``ocr_llm_alto_remap.yaml`` sans imposer aux contributeurs
  d'avoir tous les SDK installés).
- ``validate_spec`` peut s'exécuter sans instancier aucun module
  → tests rapides et déterministes.
- Le rapport de reproductibilité peut citer le YAML exact, le
  commit du code et la version des adapters utilisés —
  séparation propre de la déclaration et de l'implémentation.

Anti-sur-ingénierie
-------------------
- Pas de typage des ``params`` par adapter ici (chaque adapter
  validera ses propres params au moment de l'exécution).
- Pas de versioning de spec — un nouveau champ se traduit par un
  rebump pydantic.  Si on veut migrer entre versions de schéma,
  on l'ajoutera quand le besoin sera concret.
- Pas d'``outputs_preferred`` (mapping logique "preferred_text =
  step3.RAW_TEXT").  Reporté quand un caller en aura concrètement
  besoin.
"""

from __future__ import annotations

import re

from pydantic import BaseModel, ConfigDict, Field, field_validator

from picarones.domain.artifacts import ArtifactType


#: Identifiant d'étape — alphanum + ``_-``.  Doit être un nom court
#: lisible par un humain dans les logs et le rapport.
_STEP_ID_RE = re.compile(r"^[A-Za-z0-9_\-]+$")

#: Sentinel pour ``inputs_from`` qui désigne les artefacts initiaux
#: fournis au runner (typiquement ``IMAGE``).
INITIAL_STEP_ID = "__initial__"


class PipelineStep(BaseModel):
    """Une étape déclarative dans un DAG de pipeline.

    Attributs
    ---------
    id:
        Identifiant unique de l'étape dans la pipeline (alphanum +
        ``_-``).  Sert dans les logs, le rapport, et comme cible
        des références ``inputs_from`` des étapes en aval.
    kind:
        Catégorie informationnelle de l'étape (``"ocr"``,
        ``"post_correction"``, ``"alto_remapping"``,
        ``"alto_reconstruction"``, etc.).  Pas de validation
        d'enum — c'est un label libre que les services et le
        rapport peuvent grouper.  Par convention, en
        ``snake_case``.
    adapter_name:
        Nom de l'adapter dans le registre runtime (résolu par
        ``app/services`` au S19).  Convention :
        ``"<provider>:<engine_or_model>"`` (ex : ``"tesseract"``,
        ``"openai:gpt-4o"``, ``"mistral:large"``,
        ``"<vendor>:<custom_module>"``).
    params:
        Paramètres passés à l'adapter au moment de l'exécution.
        Format libre (chaque adapter valide les siens) — typage
        scalaire pour rester sérialisable en YAML.
    input_types:
        Types d'artefacts consommés par l'étape.  Validés par
        ``validate_spec`` contre les outputs des étapes antérieures.
    output_types:
        Types d'artefacts produits.  Validés au runtime par
        l'executor (qui vérifie que tous les types déclarés sont
        bien dans le dict retourné par l'adapter).
    inputs_from:
        DAG branchant (héritage du Sprint 66).  Pour chaque type
        d'entrée, désigne explicitement l'étape source.  La chaîne
        spéciale ``"__initial__"`` désigne les entrées initiales
        du runner.  Si le dict est vide, l'executor prend la
        version la plus récente de chaque type dans le bag.
    """

    model_config = ConfigDict(frozen=True, extra="forbid")

    id: str = Field(min_length=1, max_length=128)
    kind: str = Field(min_length=1, max_length=64)
    adapter_name: str = Field(min_length=1, max_length=256)
    params: dict[str, str | int | float | bool] = Field(default_factory=dict)
    input_types: tuple[ArtifactType, ...] = Field(default_factory=tuple)
    output_types: tuple[ArtifactType, ...] = Field(default_factory=tuple)
    inputs_from: dict[ArtifactType, str] = Field(default_factory=dict)

    @field_validator("id")
    @classmethod
    def _validate_step_id(cls, v: str) -> str:
        if not _STEP_ID_RE.match(v):
            from picarones.domain.errors import PicaronesError
            raise PicaronesError(
                f"step id invalide : {v!r}.  "
                f"Doit matcher {_STEP_ID_RE.pattern!r} (alphanum + _-)."
            )
        if v == INITIAL_STEP_ID:
            from picarones.domain.errors import PicaronesError
            raise PicaronesError(
                f"step id réservé : {INITIAL_STEP_ID!r} désigne "
                "les entrées initiales du runner."
            )
        return v


class PipelineSpec(BaseModel):
    """DAG déclaratif d'une pipeline composée.

    Sérialisable en YAML via ``model_dump()`` + ``yaml.safe_dump``,
    chargeable via ``model_validate(yaml.safe_load(text))``.  Le
    round-trip est testé.

    Attributs
    ---------
    name:
        Nom court de la pipeline (utilisé dans les logs, le cache,
        le rapport).  Convention ``snake_case``.
    description:
        Phrase courte d'introduction affichée dans le rapport.
    initial_inputs:
        Types d'artefacts qui doivent être fournis par le caller
        au moment de l'exécution.  Convention : ``(IMAGE,)`` pour
        une pipeline OCR classique, ``(IMAGE, RAW_TEXT)`` pour
        une post-correction qui part d'un OCR pré-calculé.
    steps:
        Étapes du DAG, ordonnées par dépendance topologique
        d'exécution.  Si une étape ``s2`` dépend de ``s1``, alors
        ``s1`` apparaît avant ``s2``.  ``validate_spec`` détecte
        les violations.
    """

    model_config = ConfigDict(frozen=True, extra="forbid")

    name: str = Field(min_length=1, max_length=128)
    description: str = ""
    initial_inputs: tuple[ArtifactType, ...] = Field(default_factory=tuple)
    steps: tuple[PipelineStep, ...] = Field(default_factory=tuple)

    def step_by_id(self, step_id: str) -> PipelineStep | None:
        for s in self.steps:
            if s.id == step_id:
                return s
        return None


__all__ = ["PipelineStep", "PipelineSpec", "INITIAL_STEP_ID"]