Spaces:
Sleeping
Sleeping
File size: 9,553 Bytes
70584c1 75b91fd 979f3c3 70584c1 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 | """Loader YAML pour spécifier des pipelines composées (Sprint 70).
Sprint 70 — Étape 4 / axe B du plan d'évolution 2026 : permet de
décrire une ``PipelineSpec`` (Sprint 63) ou une comparaison de N
pipelines (Sprint 65) dans un fichier **YAML déclaratif**, sans
écrire de code Python.
Philosophie inchangée
---------------------
Picarones reste un **banc d'essai**, pas un atelier de production.
Le YAML ne crée pas de modules — il **référence** des classes
``BaseModule`` que l'utilisateur a installées dans son environnement
(via ``pip install`` ou en plaçant le module dans le ``PYTHONPATH``).
Format YAML — pipeline simple
-----------------------------
.. code-block:: yaml
name: ocr_then_correct
steps:
- name: ocr
module: my_package.my_ocr.MyOCR
args:
tesseract_path: /usr/bin/tesseract
- name: correct
module: my_package.correctors.LLMCorrector
args:
model: gpt-4
inputs_from:
text: ocr
- ``name`` : nom de la pipeline (chaîne)
- ``steps`` : liste d'étapes
- ``steps[*].name`` : nom de l'étape (utilisé dans le rapport)
- ``steps[*].module`` : **chemin Python pointé** vers la classe
``BaseModule`` à instancier
- ``steps[*].args`` : kwargs du constructeur (optionnel)
- ``steps[*].inputs_from`` : map ``{type: source_step}`` pour le
DAG branchant Sprint 66 (optionnel)
Format YAML — comparaison de N pipelines
-----------------------------------------
.. code-block:: yaml
name: comparaison
pipelines:
- name: baseline
steps: [...]
- name: with_correcteur_a
steps: [...]
Limites documentées
-------------------
- Les valeurs ``args`` doivent être sérialisables en YAML (str,
int, float, bool, list, dict). Pas de support pour des objets
Python complexes en argument.
- L'import dynamique repose sur ``importlib.import_module`` ;
la classe doit être accessible depuis l'environnement Python.
"""
from __future__ import annotations
import importlib
import logging
from pathlib import Path
from typing import Any
from picarones.domain.artifacts import ArtifactType
from picarones.domain.module_protocol import BaseModule
from picarones.core.pipeline import PipelineSpec, PipelineStep
logger = logging.getLogger(__name__)
class PipelineSpecLoadError(ValueError):
"""Erreur levée lors du chargement d'une spec YAML invalide."""
def _resolve_class(dotted_path: str) -> type:
"""Importe et retourne la classe désignée par ``dotted_path``.
Format attendu : ``"package.module.ClassName"``.
"""
if not isinstance(dotted_path, str) or "." not in dotted_path:
raise PipelineSpecLoadError(
f"chemin Python invalide : {dotted_path!r} "
f"(attendu : 'package.module.ClassName')"
)
module_path, _sep, class_name = dotted_path.rpartition(".")
try:
module = importlib.import_module(module_path)
except ImportError as exc:
raise PipelineSpecLoadError(
f"module {module_path!r} introuvable : {exc}"
) from exc
if not hasattr(module, class_name):
raise PipelineSpecLoadError(
f"classe {class_name!r} introuvable dans {module_path!r}"
)
cls = getattr(module, class_name)
if not isinstance(cls, type):
raise PipelineSpecLoadError(
f"{dotted_path!r} n'est pas une classe (type : {type(cls).__name__})"
)
return cls
def _instantiate_module(dotted_path: str, args: dict[str, Any]) -> BaseModule:
"""Instancie un ``BaseModule`` depuis son dotted path + kwargs."""
cls = _resolve_class(dotted_path)
if not issubclass(cls, BaseModule):
raise PipelineSpecLoadError(
f"{dotted_path!r} n'est pas une sous-classe de BaseModule"
)
try:
instance = cls(**args)
except TypeError as exc:
raise PipelineSpecLoadError(
f"impossible d'instancier {dotted_path!r} avec args={args!r} : {exc}"
) from exc
return instance
def _parse_inputs_from(
raw: Any, step_name: str,
) -> dict[ArtifactType, str]:
"""Parse le champ ``inputs_from`` d'un step YAML."""
if not raw:
return {}
if not isinstance(raw, dict):
raise PipelineSpecLoadError(
f"étape {step_name!r} : ``inputs_from`` doit être un dict, "
f"pas {type(raw).__name__}"
)
out: dict[ArtifactType, str] = {}
for key, value in raw.items():
try:
at = ArtifactType(key)
except ValueError as exc:
raise PipelineSpecLoadError(
f"étape {step_name!r} : type d'artefact inconnu "
f"dans inputs_from : {key!r}"
) from exc
if not isinstance(value, str) or not value:
raise PipelineSpecLoadError(
f"étape {step_name!r} : inputs_from[{key!r}] doit "
f"être un nom d'étape (str non vide)"
)
out[at] = value
return out
def _build_step(raw: dict, index: int) -> PipelineStep:
if not isinstance(raw, dict):
raise PipelineSpecLoadError(
f"étape {index} : entrée doit être un dict YAML, "
f"pas {type(raw).__name__}"
)
name = raw.get("name")
if not name or not isinstance(name, str):
raise PipelineSpecLoadError(
f"étape {index} : champ ``name`` requis (str)"
)
module_path = raw.get("module")
if not module_path or not isinstance(module_path, str):
raise PipelineSpecLoadError(
f"étape {name!r} : champ ``module`` requis (dotted path Python)"
)
args = raw.get("args") or {}
if not isinstance(args, dict):
raise PipelineSpecLoadError(
f"étape {name!r} : ``args`` doit être un dict, "
f"pas {type(args).__name__}"
)
instance = _instantiate_module(module_path, args)
inputs_from = _parse_inputs_from(raw.get("inputs_from"), name)
return PipelineStep(
name=name, module=instance, inputs_from=inputs_from,
)
def load_pipeline_spec_from_dict(data: dict) -> PipelineSpec:
"""Construit une ``PipelineSpec`` depuis un dict (déjà parsé YAML).
Utile pour les tests qui veulent sauter l'étape de parsing.
"""
if not isinstance(data, dict):
raise PipelineSpecLoadError(
f"document YAML doit être un mapping, pas {type(data).__name__}"
)
name = data.get("name")
if not name or not isinstance(name, str):
raise PipelineSpecLoadError(
"champ ``name`` requis au niveau racine"
)
raw_steps = data.get("steps")
if not raw_steps or not isinstance(raw_steps, list):
raise PipelineSpecLoadError(
"champ ``steps`` requis (liste non vide)"
)
steps = [_build_step(s, i) for i, s in enumerate(raw_steps)]
return PipelineSpec(name=name, steps=steps)
def load_pipeline_spec_from_yaml(path: Path | str) -> PipelineSpec:
"""Charge un fichier YAML et construit la ``PipelineSpec``.
Lève ``PipelineSpecLoadError`` si le fichier n'est pas trouvé,
si le YAML est invalide, ou si la spec ne respecte pas le
format attendu.
"""
try:
import yaml
except ImportError as exc: # pragma: no cover
raise PipelineSpecLoadError(
"PyYAML requis pour charger une spec YAML "
"(pip install pyyaml)"
) from exc
p = Path(path)
if not p.exists():
raise PipelineSpecLoadError(f"fichier introuvable : {p}")
try:
data = yaml.safe_load(p.read_text(encoding="utf-8"))
except yaml.YAMLError as exc:
raise PipelineSpecLoadError(f"YAML invalide : {exc}") from exc
return load_pipeline_spec_from_dict(data)
def load_comparison_specs_from_dict(data: dict) -> list[PipelineSpec]:
"""Construit une liste de ``PipelineSpec`` depuis un dict
contenant ``pipelines`` (comparaison Sprint 65)."""
if not isinstance(data, dict):
raise PipelineSpecLoadError(
f"document YAML doit être un mapping, pas {type(data).__name__}"
)
raw_pipelines = data.get("pipelines")
if not raw_pipelines or not isinstance(raw_pipelines, list):
raise PipelineSpecLoadError(
"champ ``pipelines`` requis (liste non vide)"
)
return [load_pipeline_spec_from_dict(p) for p in raw_pipelines]
def load_comparison_specs_from_yaml(
path: Path | str,
) -> tuple[list[PipelineSpec], dict]:
"""Charge un fichier YAML décrivant une comparaison.
Retourne un tuple ``(specs, extras)`` où ``extras`` est le
dict YAML brut (utile pour récupérer ``baseline``,
``rankings``, etc. au niveau du document).
"""
try:
import yaml
except ImportError as exc: # pragma: no cover
raise PipelineSpecLoadError(
"PyYAML requis pour charger une spec YAML"
) from exc
p = Path(path)
if not p.exists():
raise PipelineSpecLoadError(f"fichier introuvable : {p}")
try:
data = yaml.safe_load(p.read_text(encoding="utf-8"))
except yaml.YAMLError as exc:
raise PipelineSpecLoadError(f"YAML invalide : {exc}") from exc
return load_comparison_specs_from_dict(data), data
__all__ = [
"PipelineSpecLoadError",
"load_pipeline_spec_from_dict",
"load_pipeline_spec_from_yaml",
"load_comparison_specs_from_dict",
"load_comparison_specs_from_yaml",
]
|