Spaces:
Sleeping
Sleeping
| """Loader YAML pour spécifier des pipelines composées (Sprint 70). | |
| Sprint 70 — Étape 4 / axe B du plan d'évolution 2026 : permet de | |
| décrire une ``PipelineSpec`` (Sprint 63) ou une comparaison de N | |
| pipelines (Sprint 65) dans un fichier **YAML déclaratif**, sans | |
| écrire de code Python. | |
| Philosophie inchangée | |
| --------------------- | |
| Picarones reste un **banc d'essai**, pas un atelier de production. | |
| Le YAML ne crée pas de modules — il **référence** des classes | |
| ``BaseModule`` que l'utilisateur a installées dans son environnement | |
| (via ``pip install`` ou en plaçant le module dans le ``PYTHONPATH``). | |
| Format YAML — pipeline simple | |
| ----------------------------- | |
| .. code-block:: yaml | |
| name: ocr_then_correct | |
| steps: | |
| - name: ocr | |
| module: my_package.my_ocr.MyOCR | |
| args: | |
| tesseract_path: /usr/bin/tesseract | |
| - name: correct | |
| module: my_package.correctors.LLMCorrector | |
| args: | |
| model: gpt-4 | |
| inputs_from: | |
| text: ocr | |
| - ``name`` : nom de la pipeline (chaîne) | |
| - ``steps`` : liste d'étapes | |
| - ``steps[*].name`` : nom de l'étape (utilisé dans le rapport) | |
| - ``steps[*].module`` : **chemin Python pointé** vers la classe | |
| ``BaseModule`` à instancier | |
| - ``steps[*].args`` : kwargs du constructeur (optionnel) | |
| - ``steps[*].inputs_from`` : map ``{type: source_step}`` pour le | |
| DAG branchant Sprint 66 (optionnel) | |
| Format YAML — comparaison de N pipelines | |
| ----------------------------------------- | |
| .. code-block:: yaml | |
| name: comparaison | |
| pipelines: | |
| - name: baseline | |
| steps: [...] | |
| - name: with_correcteur_a | |
| steps: [...] | |
| Limites documentées | |
| ------------------- | |
| - Les valeurs ``args`` doivent être sérialisables en YAML (str, | |
| int, float, bool, list, dict). Pas de support pour des objets | |
| Python complexes en argument. | |
| - L'import dynamique repose sur ``importlib.import_module`` ; | |
| la classe doit être accessible depuis l'environnement Python. | |
| """ | |
| from __future__ import annotations | |
| import importlib | |
| import logging | |
| from pathlib import Path | |
| from typing import Any | |
| from picarones.domain.artifacts import ArtifactType | |
| from picarones.domain.module_protocol import BaseModule | |
| from picarones.core.pipeline import PipelineSpec, PipelineStep | |
| logger = logging.getLogger(__name__) | |
| class PipelineSpecLoadError(ValueError): | |
| """Erreur levée lors du chargement d'une spec YAML invalide.""" | |
| def _resolve_class(dotted_path: str) -> type: | |
| """Importe et retourne la classe désignée par ``dotted_path``. | |
| Format attendu : ``"package.module.ClassName"``. | |
| """ | |
| if not isinstance(dotted_path, str) or "." not in dotted_path: | |
| raise PipelineSpecLoadError( | |
| f"chemin Python invalide : {dotted_path!r} " | |
| f"(attendu : 'package.module.ClassName')" | |
| ) | |
| module_path, _sep, class_name = dotted_path.rpartition(".") | |
| try: | |
| module = importlib.import_module(module_path) | |
| except ImportError as exc: | |
| raise PipelineSpecLoadError( | |
| f"module {module_path!r} introuvable : {exc}" | |
| ) from exc | |
| if not hasattr(module, class_name): | |
| raise PipelineSpecLoadError( | |
| f"classe {class_name!r} introuvable dans {module_path!r}" | |
| ) | |
| cls = getattr(module, class_name) | |
| if not isinstance(cls, type): | |
| raise PipelineSpecLoadError( | |
| f"{dotted_path!r} n'est pas une classe (type : {type(cls).__name__})" | |
| ) | |
| return cls | |
| def _instantiate_module(dotted_path: str, args: dict[str, Any]) -> BaseModule: | |
| """Instancie un ``BaseModule`` depuis son dotted path + kwargs.""" | |
| cls = _resolve_class(dotted_path) | |
| if not issubclass(cls, BaseModule): | |
| raise PipelineSpecLoadError( | |
| f"{dotted_path!r} n'est pas une sous-classe de BaseModule" | |
| ) | |
| try: | |
| instance = cls(**args) | |
| except TypeError as exc: | |
| raise PipelineSpecLoadError( | |
| f"impossible d'instancier {dotted_path!r} avec args={args!r} : {exc}" | |
| ) from exc | |
| return instance | |
| def _parse_inputs_from( | |
| raw: Any, step_name: str, | |
| ) -> dict[ArtifactType, str]: | |
| """Parse le champ ``inputs_from`` d'un step YAML.""" | |
| if not raw: | |
| return {} | |
| if not isinstance(raw, dict): | |
| raise PipelineSpecLoadError( | |
| f"étape {step_name!r} : ``inputs_from`` doit être un dict, " | |
| f"pas {type(raw).__name__}" | |
| ) | |
| out: dict[ArtifactType, str] = {} | |
| for key, value in raw.items(): | |
| try: | |
| at = ArtifactType(key) | |
| except ValueError as exc: | |
| raise PipelineSpecLoadError( | |
| f"étape {step_name!r} : type d'artefact inconnu " | |
| f"dans inputs_from : {key!r}" | |
| ) from exc | |
| if not isinstance(value, str) or not value: | |
| raise PipelineSpecLoadError( | |
| f"étape {step_name!r} : inputs_from[{key!r}] doit " | |
| f"être un nom d'étape (str non vide)" | |
| ) | |
| out[at] = value | |
| return out | |
| def _build_step(raw: dict, index: int) -> PipelineStep: | |
| if not isinstance(raw, dict): | |
| raise PipelineSpecLoadError( | |
| f"étape {index} : entrée doit être un dict YAML, " | |
| f"pas {type(raw).__name__}" | |
| ) | |
| name = raw.get("name") | |
| if not name or not isinstance(name, str): | |
| raise PipelineSpecLoadError( | |
| f"étape {index} : champ ``name`` requis (str)" | |
| ) | |
| module_path = raw.get("module") | |
| if not module_path or not isinstance(module_path, str): | |
| raise PipelineSpecLoadError( | |
| f"étape {name!r} : champ ``module`` requis (dotted path Python)" | |
| ) | |
| args = raw.get("args") or {} | |
| if not isinstance(args, dict): | |
| raise PipelineSpecLoadError( | |
| f"étape {name!r} : ``args`` doit être un dict, " | |
| f"pas {type(args).__name__}" | |
| ) | |
| instance = _instantiate_module(module_path, args) | |
| inputs_from = _parse_inputs_from(raw.get("inputs_from"), name) | |
| return PipelineStep( | |
| name=name, module=instance, inputs_from=inputs_from, | |
| ) | |
| def load_pipeline_spec_from_dict(data: dict) -> PipelineSpec: | |
| """Construit une ``PipelineSpec`` depuis un dict (déjà parsé YAML). | |
| Utile pour les tests qui veulent sauter l'étape de parsing. | |
| """ | |
| if not isinstance(data, dict): | |
| raise PipelineSpecLoadError( | |
| f"document YAML doit être un mapping, pas {type(data).__name__}" | |
| ) | |
| name = data.get("name") | |
| if not name or not isinstance(name, str): | |
| raise PipelineSpecLoadError( | |
| "champ ``name`` requis au niveau racine" | |
| ) | |
| raw_steps = data.get("steps") | |
| if not raw_steps or not isinstance(raw_steps, list): | |
| raise PipelineSpecLoadError( | |
| "champ ``steps`` requis (liste non vide)" | |
| ) | |
| steps = [_build_step(s, i) for i, s in enumerate(raw_steps)] | |
| return PipelineSpec(name=name, steps=steps) | |
| def load_pipeline_spec_from_yaml(path: Path | str) -> PipelineSpec: | |
| """Charge un fichier YAML et construit la ``PipelineSpec``. | |
| Lève ``PipelineSpecLoadError`` si le fichier n'est pas trouvé, | |
| si le YAML est invalide, ou si la spec ne respecte pas le | |
| format attendu. | |
| """ | |
| try: | |
| import yaml | |
| except ImportError as exc: # pragma: no cover | |
| raise PipelineSpecLoadError( | |
| "PyYAML requis pour charger une spec YAML " | |
| "(pip install pyyaml)" | |
| ) from exc | |
| p = Path(path) | |
| if not p.exists(): | |
| raise PipelineSpecLoadError(f"fichier introuvable : {p}") | |
| try: | |
| data = yaml.safe_load(p.read_text(encoding="utf-8")) | |
| except yaml.YAMLError as exc: | |
| raise PipelineSpecLoadError(f"YAML invalide : {exc}") from exc | |
| return load_pipeline_spec_from_dict(data) | |
| def load_comparison_specs_from_dict(data: dict) -> list[PipelineSpec]: | |
| """Construit une liste de ``PipelineSpec`` depuis un dict | |
| contenant ``pipelines`` (comparaison Sprint 65).""" | |
| if not isinstance(data, dict): | |
| raise PipelineSpecLoadError( | |
| f"document YAML doit être un mapping, pas {type(data).__name__}" | |
| ) | |
| raw_pipelines = data.get("pipelines") | |
| if not raw_pipelines or not isinstance(raw_pipelines, list): | |
| raise PipelineSpecLoadError( | |
| "champ ``pipelines`` requis (liste non vide)" | |
| ) | |
| return [load_pipeline_spec_from_dict(p) for p in raw_pipelines] | |
| def load_comparison_specs_from_yaml( | |
| path: Path | str, | |
| ) -> tuple[list[PipelineSpec], dict]: | |
| """Charge un fichier YAML décrivant une comparaison. | |
| Retourne un tuple ``(specs, extras)`` où ``extras`` est le | |
| dict YAML brut (utile pour récupérer ``baseline``, | |
| ``rankings``, etc. au niveau du document). | |
| """ | |
| try: | |
| import yaml | |
| except ImportError as exc: # pragma: no cover | |
| raise PipelineSpecLoadError( | |
| "PyYAML requis pour charger une spec YAML" | |
| ) from exc | |
| p = Path(path) | |
| if not p.exists(): | |
| raise PipelineSpecLoadError(f"fichier introuvable : {p}") | |
| try: | |
| data = yaml.safe_load(p.read_text(encoding="utf-8")) | |
| except yaml.YAMLError as exc: | |
| raise PipelineSpecLoadError(f"YAML invalide : {exc}") from exc | |
| return load_comparison_specs_from_dict(data), data | |
| __all__ = [ | |
| "PipelineSpecLoadError", | |
| "load_pipeline_spec_from_dict", | |
| "load_pipeline_spec_from_yaml", | |
| "load_comparison_specs_from_dict", | |
| "load_comparison_specs_from_yaml", | |
| ] | |