Spaces:
Sleeping
Sleeping
Claude
feat(pipeline): Sprint A14-S6 — PipelineSpec déclaratif + validation + YAML round-trip
b9ff8de unverified | """``validate_spec`` — Sprint A14-S6. | |
| Validation statique d'une ``PipelineSpec`` : vérifier que les | |
| types s'enchaînent, qu'il n'y a pas d'IDs dupliqués, que les | |
| références ``inputs_from`` pointent bien vers des étapes | |
| antérieures qui produisent le bon type, et (optionnellement) que | |
| les ``adapter_name`` existent dans un registre fourni. | |
| S'exécute **sans instancier aucun adapter** — c'est le bénéfice | |
| clé de la séparation déclaratif/runtime du S6. | |
| API : | |
| >>> errors = validate_spec(spec) | |
| >>> if errors: | |
| ... for e in errors: | |
| ... print(f"{e.step_id}: {e.message}") | |
| Le caller décide de la suite — typiquement un service applicatif | |
| refuse de démarrer un run si la spec a des erreurs. | |
| Anti-sur-ingénierie | |
| ------------------- | |
| Pas de détection de cycles graphes complexe (le DAG est exprimé | |
| par ordre des steps, donc impossible de référencer une étape | |
| postérieure : si tu as une boucle, c'est qu'une référence pointe | |
| vers un nom inconnu, déjà détecté). | |
| Pas de validation des params (chaque adapter validera les siens | |
| au moment de l'exécution — le format libre des params est un | |
| choix assumé). | |
| """ | |
| from __future__ import annotations | |
| from pydantic import BaseModel, ConfigDict | |
| from picarones.domain.artifacts import ArtifactType | |
| from picarones.pipeline.spec import INITIAL_STEP_ID, PipelineSpec, PipelineStep | |
| class ValidationError(BaseModel): | |
| """Une erreur de validation d'une ``PipelineSpec``. | |
| Format structuré pour faciliter le rendu (CLI, rapport, JSON). | |
| Volontairement plat — pas de hiérarchie d'erreurs ; on ajoute | |
| un ``code`` discriminant si un caller en a besoin. | |
| """ | |
| model_config = ConfigDict(frozen=True, extra="forbid") | |
| step_id: str | None | |
| """Step concerné, ou ``None`` pour les erreurs globales (DAG vide, | |
| ID dupliqué détecté entre deux steps...).""" | |
| code: str | |
| """Identifiant court (``"duplicate_id"``, ``"unknown_adapter"``, | |
| ``"missing_input"``, ``"unknown_input_source"``, ...). Permet | |
| à un test d'asserter sur le code plutôt que sur le message | |
| français. | |
| """ | |
| message: str | |
| """Description humainement lisible (français).""" | |
| def validate_spec( | |
| spec: PipelineSpec, | |
| available_adapters: set[str] | None = None, | |
| ) -> list[ValidationError]: | |
| """Vérifie une ``PipelineSpec`` et retourne la liste des erreurs. | |
| Parameters | |
| ---------- | |
| spec: | |
| La spec à valider. | |
| available_adapters: | |
| Set des noms d'adapters connus. Si fourni, chaque | |
| ``adapter_name`` du DAG est vérifié. Si ``None`` (défaut), | |
| cette validation est sautée — utile pour les tests qui | |
| valident la cohérence d'un YAML sans avoir le runtime | |
| chargé. | |
| Returns | |
| ------- | |
| list[ValidationError] | |
| Liste vide si la spec est valide ; sinon un ou plusieurs | |
| problèmes (ne s'arrête pas à la première erreur — le | |
| caller veut tout voir d'un coup). | |
| """ | |
| errors: list[ValidationError] = [] | |
| # -- 0. Steps absents | |
| if not spec.steps: | |
| errors.append(ValidationError( | |
| step_id=None, | |
| code="empty_pipeline", | |
| message="pipeline vide : au moins une étape est requise", | |
| )) | |
| return errors # impossible de continuer | |
| # -- 1. IDs dupliqués | |
| seen_ids: dict[str, int] = {} | |
| for i, step in enumerate(spec.steps): | |
| if step.id in seen_ids: | |
| errors.append(ValidationError( | |
| step_id=step.id, | |
| code="duplicate_id", | |
| message=( | |
| f"id dupliqué : '{step.id}' apparaît à l'étape {i} " | |
| f"et précédemment à {seen_ids[step.id]}" | |
| ), | |
| )) | |
| else: | |
| seen_ids[step.id] = i | |
| # -- 2. Adapter inconnu (si registre fourni) | |
| if available_adapters is not None: | |
| for step in spec.steps: | |
| if step.adapter_name not in available_adapters: | |
| errors.append(ValidationError( | |
| step_id=step.id, | |
| code="unknown_adapter", | |
| message=( | |
| f"adapter '{step.adapter_name}' non disponible. " | |
| f"Adapters connus : {sorted(available_adapters)}" | |
| ), | |
| )) | |
| # -- 3. Cohérence des types et des références inputs_from | |
| # On simule un parcours topologique en ordre de spec.steps. | |
| # À chaque étape : | |
| # a) Tout type de input_types doit être disponible (soit | |
| # initial, soit produit par une étape antérieure). | |
| # b) Si inputs_from[type] = "src", "src" doit être une étape | |
| # antérieure connue (ou "__initial__") qui produit ce type. | |
| # Map { step_id (ou "__initial__") -> set(types qu'elle produit) }. | |
| step_outputs: dict[str, set[ArtifactType]] = { | |
| INITIAL_STEP_ID: set(spec.initial_inputs), | |
| } | |
| # Set des types disponibles à un instant t (latest seulement). | |
| available: set[ArtifactType] = set(spec.initial_inputs) | |
| for step in spec.steps: | |
| errors.extend(_validate_step_against_state( | |
| step=step, | |
| step_outputs=step_outputs, | |
| available=available, | |
| )) | |
| # Mise à jour de l'état pour les étapes suivantes. | |
| step_outputs[step.id] = set(step.output_types) | |
| available.update(step.output_types) | |
| return errors | |
| def _validate_step_against_state( | |
| *, | |
| step: PipelineStep, | |
| step_outputs: dict[str, set[ArtifactType]], | |
| available: set[ArtifactType], | |
| ) -> list[ValidationError]: | |
| """Valide une étape donnée contre l'état des types | |
| disponibles et des outputs des étapes antérieures.""" | |
| errors: list[ValidationError] = [] | |
| # 3.a — entrées disponibles | |
| missing = [t for t in step.input_types if t not in available] | |
| if missing: | |
| errors.append(ValidationError( | |
| step_id=step.id, | |
| code="missing_input", | |
| message=( | |
| f"types d'entrée non disponibles : " | |
| f"{[t.value for t in missing]}. " | |
| f"Disponibles : {sorted(t.value for t in available)}" | |
| ), | |
| )) | |
| # 3.b — références inputs_from | |
| for ref_type, ref_step in step.inputs_from.items(): | |
| if ref_type not in step.input_types: | |
| errors.append(ValidationError( | |
| step_id=step.id, | |
| code="inputs_from_unused", | |
| message=( | |
| f"inputs_from[{ref_type.value}]={ref_step!r} " | |
| "mais l'étape ne consomme pas ce type " | |
| f"(input_types = {[t.value for t in step.input_types]})" | |
| ), | |
| )) | |
| continue | |
| if ref_step not in step_outputs: | |
| errors.append(ValidationError( | |
| step_id=step.id, | |
| code="unknown_input_source", | |
| message=( | |
| f"inputs_from[{ref_type.value}]={ref_step!r} " | |
| "ne désigne pas une étape antérieure connue " | |
| f"({INITIAL_STEP_ID!r} pour les entrées initiales)" | |
| ), | |
| )) | |
| continue | |
| if ref_type not in step_outputs[ref_step]: | |
| errors.append(ValidationError( | |
| step_id=step.id, | |
| code="source_does_not_produce_type", | |
| message=( | |
| f"inputs_from[{ref_type.value}]={ref_step!r} " | |
| f"mais '{ref_step}' ne produit pas {ref_type.value!r}" | |
| ), | |
| )) | |
| return errors | |
| __all__ = ["validate_spec", "ValidationError"] | |