Claude commited on
Commit
fcef144
·
unverified ·
1 Parent(s): 8a4d05b

feat(migration): Phase B2.3 — partial_dir resume pivoté par pipeline

Browse files

Phase B2.3 du chantier Option B. Le RunOrchestrator supporte
maintenant la reprise sur interruption via spec.partial_dir.
Atteint le **Checkpoint C1** : toutes les 7 features de
run_benchmark_via_service sont portées dans RunOrchestrator.

Nouveau module picarones/app/services/_orchestrator_partial.py (256 LOC)
- compute_pipeline_fingerprint() : SHA-256 stable basé sur
structure pipeline + normalization + char_exclude + profile +
doc_ids (stables cross-workspace) + code_version. Utilise
compute_run_fingerprint du partial_store legacy comme base.
- partial_path_for_pipeline() : chemin canonique du JSONL.
- load_partial_pipeline_results() : reconstruit list[PipelineResult]
via PipelineResult.model_validate_json. Tolérance : lignes
corrompues sautées avec warning.
- append_pipeline_result() : append-only au format JSONL.
- delete_partial() : cleanup idempotent.
- filter_remaining_documents() : déduplique + filtre les docs déjà
persistés.

RunOrchestrator._execute_with_partial() (run_orchestrator.py)
- Pivot par pipeline : un sub-bench.run() par pipeline avec uniquement
les docs manquants.
- Append au partial au fil de l'eau (un crash mid-pipeline préserve
les docs déjà persistés).
- Cleanup du partial à la fin d'un pipeline complet.
- Reconstruction du RunResult final en mergeant loaded + new
PipelineResult par doc_id, dans l'ordre du corpus original.
- Synthèse d'un RunManifest cohérent (start/completed du 1er sub-run,
metadata.fully_resumed flag quand 100% rechargé).

Limites volontaires (scope B2.3) :
- ViewResult préservés uniquement pour les sub-runs courants
(pas pour les PipelineResult rechargés depuis partial).
- Le fingerprint utilise doc_id (stable) au lieu de mtime+size
(qui divergerait entre workspaces) — ne détecte pas une
modification du contenu d'un doc à id identique.

Tests : 4 cas dans TestParityPartialDir + 1 cas TestParityAllFeaturesCombined
- test_partial_dir_fresh_start_creates_no_orphan_files :
run complet → cleanup du partial.
- test_partial_dir_resume_after_complete_pipeline :
partial complet pré-existant → pipeline skippé, manifest porte
fully_resumed=true.
- test_partial_dir_fingerprint_isolation :
partial avec fingerprint divergent → ignoré, run propre.
- test_combined_features_produce_coherent_result :
GATE FINALE checkpoint C1 — toutes les 7 features actives
simultanément (progress + cancel + partial + entity_extractor +
char_exclude + normalization + profile + output_json).
Vérifie cohérence du BenchmarkResult legacy produit.

Total feature parity : 23 tests verts (tous les test_parity_*).
Aucune feature legacy n'est plus ignorée par le RunOrchestrator.

Budgets : run_orchestrator.py 1107 LOC (budget 1300).

Invariance : test_migration_invariance.py reste vert.

Prochaine phase : B3 (migrer les call sites publics CLI/Web vers
RunOrchestrator).

picarones/app/services/_orchestrator_partial.py ADDED
@@ -0,0 +1,256 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Phase B2.3 — reprise sur interruption pour ``RunOrchestrator``.
2
+
3
+ Pivote par **pipeline** (vs par **engine** dans le legacy
4
+ ``_benchmark_orchestration.run_benchmark_with_partial``). Cohérent
5
+ avec l'architecture du ``RunOrchestrator`` qui raisonne en
6
+ ``PipelineSpec``.
7
+
8
+ Format
9
+ ------
10
+ Pour chaque pipeline d'un run, un fichier JSONL séparé :
11
+
12
+ ::
13
+
14
+ {partial_dir}/picarones_{corpus_name}_{pipeline_name}_{fingerprint}.partial.jsonl
15
+
16
+ Chaque ligne = ``PipelineResult.model_dump_json()`` d'un document
17
+ traité. Append-only ; la sérialisation Pydantic garantit le
18
+ roundtrip ``model_validate_json`` propre.
19
+
20
+ Fingerprint
21
+ -----------
22
+ Le fingerprint SHA-256 mélange :
23
+
24
+ - Le nom + structure de la pipeline (steps + adapter_class).
25
+ - ``normalization_profile`` (string canonique).
26
+ - ``char_exclude`` (caractères triés).
27
+ - ``profile`` (hooks document-level).
28
+ - Les ``mtime``/``size`` de chaque fichier du corpus
29
+ (détection de modifs sans coût hash de contenu).
30
+ - ``code_version``.
31
+
32
+ Deux runs avec des configs divergentes → fingerprints différents →
33
+ fichiers de partial distincts → pas de réutilisation accidentelle
34
+ de résultats incompatibles.
35
+
36
+ Sémantique du resume
37
+ --------------------
38
+ 1. Au démarrage du run : pour chaque ``pipeline_spec``, on cherche un
39
+ partial existant matchant le fingerprint. S'il existe, on
40
+ charge les ``PipelineResult`` déjà calculés.
41
+ 2. On filtre le corpus pour ne soumettre au ``BenchmarkService`` que
42
+ les documents **manquants**.
43
+ 3. Chaque nouveau ``PipelineResult`` est appendé au partial.
44
+ 4. À la fin d'une pipeline traitée avec succès complet, le partial
45
+ est supprimé (cleanup). Une exception en cours préserve le
46
+ partial pour la prochaine reprise.
47
+
48
+ Tolérance
49
+ ---------
50
+ - Partial corrompu (JSON invalide) : on log un warning et on traite
51
+ le document comme s'il n'avait jamais été calculé (recalcul propre).
52
+ - Partial avec fingerprint divergent : ignoré, fichier laissé tel
53
+ quel (sera écrasé par le nouveau partial avec son propre
54
+ fingerprint).
55
+ """
56
+
57
+ from __future__ import annotations
58
+
59
+ import logging
60
+ from pathlib import Path
61
+ from typing import TYPE_CHECKING, Any, Iterable
62
+
63
+ from picarones.pipeline.types import PipelineResult
64
+
65
+ if TYPE_CHECKING:
66
+ from picarones.domain.corpus import CorpusSpec
67
+ from picarones.domain.pipeline_spec import PipelineSpec
68
+
69
+ logger = logging.getLogger(__name__)
70
+
71
+
72
+ # ──────────────────────────────────────────────────────────────────────
73
+ # Fingerprint
74
+ # ──────────────────────────────────────────────────────────────────────
75
+
76
+
77
+ def compute_pipeline_fingerprint(
78
+ *,
79
+ pipeline_spec: "PipelineSpec",
80
+ corpus_spec: "CorpusSpec",
81
+ normalization_profile: str | None,
82
+ char_exclude: str | None,
83
+ profile: str,
84
+ code_version: str,
85
+ ) -> str:
86
+ """Phase B2.3 — fingerprint SHA-256 d'un run pour une pipeline donnée.
87
+
88
+ Délègue à ``compute_run_fingerprint`` (helper legacy partagé
89
+ avec ``run_benchmark_with_partial``) en construisant un
90
+ ``engine_config`` qui matérialise la structure de la pipeline
91
+ (nom + steps + adapter classes). Deux pipelines avec le même
92
+ nom mais des steps différents → fingerprints différents.
93
+
94
+ Les chemins d'images du corpus sont aussi inclus pour détecter
95
+ les modifications de fichiers entre runs (mtime+size, sans coût
96
+ hash de contenu).
97
+ """
98
+ from picarones.app.services.partial_store import compute_run_fingerprint
99
+
100
+ pipeline_engine_config: dict[str, Any] = {
101
+ "pipeline_name": pipeline_spec.name,
102
+ "steps": [
103
+ {
104
+ "id": step.id,
105
+ "adapter": step.adapter_name,
106
+ "inputs": sorted(t.value for t in step.input_types),
107
+ "outputs": sorted(t.value for t in step.output_types),
108
+ }
109
+ for step in pipeline_spec.steps
110
+ ],
111
+ }
112
+ # Phase B2.3 — on utilise les ``doc.id`` (stables cross-workspace)
113
+ # plutôt que les ``image_uri`` (qui changent à chaque extraction du
114
+ # corpus_zip vers un workspace temporaire). Sinon le fingerprint
115
+ # divergerait entre runs successifs même avec le même corpus,
116
+ # rendant le resume inopérant.
117
+ #
118
+ # Limite : ce fingerprint ne détecte pas une modification du
119
+ # contenu d'un doc (même id mais image différente). Acceptable
120
+ # pour le scope B2.3 ; pour une vraie détection de modifs, hasher
121
+ # le contenu du corpus_zip d'origine (coûteux, scope futur).
122
+ doc_signatures = sorted(doc.id for doc in corpus_spec.documents)
123
+ return compute_run_fingerprint(
124
+ engine_config=pipeline_engine_config,
125
+ normalization_profile=normalization_profile,
126
+ char_exclude=char_exclude,
127
+ corpus_files=None,
128
+ code_version=code_version,
129
+ extra={
130
+ "profile": profile,
131
+ "doc_ids": ",".join(doc_signatures),
132
+ } if profile else {"doc_ids": ",".join(doc_signatures)},
133
+ )
134
+
135
+
136
+ def partial_path_for_pipeline(
137
+ *,
138
+ partial_dir: Path,
139
+ corpus_name: str,
140
+ pipeline_name: str,
141
+ fingerprint: str,
142
+ ) -> Path:
143
+ """Chemin du fichier JSONL partiel pour une pipeline donnée."""
144
+ from picarones.app.services.partial_store import _partial_path
145
+
146
+ return _partial_path(
147
+ corpus_name=corpus_name,
148
+ engine_name=pipeline_name,
149
+ partial_dir=partial_dir,
150
+ fingerprint=fingerprint,
151
+ )
152
+
153
+
154
+ # ──────────────────────────────────────────────────────────────────────
155
+ # I/O JSONL
156
+ # ──────────────────────────────────────────────────────────────────────
157
+
158
+
159
+ def load_partial_pipeline_results(
160
+ partial_path: Path,
161
+ ) -> list[PipelineResult]:
162
+ """Charge tous les ``PipelineResult`` déjà persistés dans un partial.
163
+
164
+ Retourne ``[]`` si le fichier n'existe pas ou est vide.
165
+
166
+ Tolérance : une ligne JSON corrompue est sautée avec un warning ;
167
+ les autres lignes valides sont conservées. Le caller peut
168
+ décider quoi faire des doc_id manquants (typiquement : les
169
+ recalculer).
170
+ """
171
+ if not partial_path.exists() or partial_path.stat().st_size == 0:
172
+ return []
173
+
174
+ results: list[PipelineResult] = []
175
+ with partial_path.open(encoding="utf-8") as f:
176
+ for line_no, line in enumerate(f, start=1):
177
+ line = line.strip()
178
+ if not line:
179
+ continue
180
+ try:
181
+ results.append(PipelineResult.model_validate_json(line))
182
+ except Exception as exc: # noqa: BLE001
183
+ logger.warning(
184
+ "[orchestrator_partial] ligne %d corrompue dans %s "
185
+ "— sautée : %s",
186
+ line_no, partial_path.name, exc,
187
+ )
188
+ return results
189
+
190
+
191
+ def append_pipeline_result(
192
+ partial_path: Path,
193
+ pipeline_result: PipelineResult,
194
+ ) -> None:
195
+ """Append un ``PipelineResult`` au fichier partiel (JSONL).
196
+
197
+ Crée les répertoires parents et le fichier si nécessaire.
198
+ Mode ``a`` : un crash mid-write peut laisser une ligne partielle
199
+ en queue qui sera ignorée au prochain ``load_partial_pipeline_results``
200
+ grâce au filet ``try/except`` autour de ``model_validate_json``.
201
+ """
202
+ partial_path.parent.mkdir(parents=True, exist_ok=True)
203
+ with partial_path.open("a", encoding="utf-8") as f:
204
+ f.write(pipeline_result.model_dump_json() + "\n")
205
+
206
+
207
+ def delete_partial(partial_path: Path) -> None:
208
+ """Supprime le fichier partiel (cleanup post-success).
209
+
210
+ Idempotent : pas d'erreur si le fichier n'existe pas (autre
211
+ pipeline a déjà nettoyé, fingerprint divergent, etc.).
212
+ """
213
+ try:
214
+ partial_path.unlink(missing_ok=True)
215
+ except OSError as exc:
216
+ logger.warning(
217
+ "[orchestrator_partial] échec suppression %s : %s",
218
+ partial_path, exc,
219
+ )
220
+
221
+
222
+ # ──────────────────────────────────────────────────────────────────────
223
+ # Filtrage des documents restants
224
+ # ──────────────────────────────────────────────────────────────────────
225
+
226
+
227
+ def filter_remaining_documents(
228
+ documents: Iterable[Any],
229
+ loaded_results: list[PipelineResult],
230
+ ) -> tuple[list[Any], list[PipelineResult]]:
231
+ """Retourne ``(docs_à_traiter, results_déjà_persistés_filtrés)``.
232
+
233
+ Filtre les documents dont le ``id`` est déjà dans
234
+ ``loaded_results``. Les doublons éventuels du partial (ne devrait
235
+ pas arriver vu le append-only) sont déduplitqués par ``document_id``
236
+ en gardant le premier.
237
+ """
238
+ seen_doc_ids: set[str] = set()
239
+ deduplicated: list[PipelineResult] = []
240
+ for pr in loaded_results:
241
+ if pr.document_id not in seen_doc_ids:
242
+ seen_doc_ids.add(pr.document_id)
243
+ deduplicated.append(pr)
244
+
245
+ remaining = [d for d in documents if d.id not in seen_doc_ids]
246
+ return remaining, deduplicated
247
+
248
+
249
+ __all__ = [
250
+ "append_pipeline_result",
251
+ "compute_pipeline_fingerprint",
252
+ "delete_partial",
253
+ "filter_remaining_documents",
254
+ "load_partial_pipeline_results",
255
+ "partial_path_for_pipeline",
256
+ ]
picarones/app/services/run_orchestrator.py CHANGED
@@ -224,21 +224,39 @@ class RunOrchestrator:
224
  deps_lock = capture_dependencies_lock()
225
  bin_lock = capture_system_binaries_lock()
226
 
227
- result = bench.run(
228
- corpus=corpus_spec,
229
- pipelines=pipeline_specs,
230
- views=views,
231
- ground_truth_factory=_default_gt_factory,
232
- pipeline_inputs_factory=_default_inputs_factory,
233
- context_factory=_make_context_factory(
234
- spec.code_version,
235
- progress_callback=self._progress_callback,
236
- ),
237
- adapter_kwargs=adapter_kwargs,
238
- dependencies_lock=deps_lock,
239
- system_binaries_lock=bin_lock,
240
- metadata={"orchestrator": "picarones.app.services.run_orchestrator"},
241
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
242
 
243
  # 6. Persistance JSONL.
244
  persist_dir = self._output_dir / "results"
@@ -396,6 +414,260 @@ class RunOrchestrator:
396
  }
397
  return pipeline_specs, resolver, adapter_kwargs_dump
398
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
399
  @staticmethod
400
  def _persist_legacy_benchmark_json(
401
  *,
@@ -624,7 +896,7 @@ class _PipelineEngineProxy:
624
  "pipeline_name": self._spec.name,
625
  "steps": [
626
  {
627
- "id": step.name,
628
  "input_types": sorted(t.value for t in step.input_types),
629
  "output_types": sorted(t.value for t in step.output_types),
630
  }
 
224
  deps_lock = capture_dependencies_lock()
225
  bin_lock = capture_system_binaries_lock()
226
 
227
+ # Phase B2.3 — si ``spec.partial_dir`` est fourni, on pivote
228
+ # par pipeline avec reprise sur interruption. Sinon, chemin
229
+ # rapide en un seul ``bench.run`` multi-pipeline.
230
+ if spec.partial_dir:
231
+ result = self._execute_with_partial(
232
+ spec=spec,
233
+ bench=bench,
234
+ corpus_spec=corpus_spec,
235
+ pipeline_specs=pipeline_specs,
236
+ views=views,
237
+ adapter_kwargs=adapter_kwargs,
238
+ deps_lock=deps_lock,
239
+ bin_lock=bin_lock,
240
+ )
241
+ else:
242
+ result = bench.run(
243
+ corpus=corpus_spec,
244
+ pipelines=pipeline_specs,
245
+ views=views,
246
+ ground_truth_factory=_default_gt_factory,
247
+ pipeline_inputs_factory=_default_inputs_factory,
248
+ context_factory=_make_context_factory(
249
+ spec.code_version,
250
+ progress_callback=self._progress_callback,
251
+ ),
252
+ adapter_kwargs=adapter_kwargs,
253
+ dependencies_lock=deps_lock,
254
+ system_binaries_lock=bin_lock,
255
+ metadata={
256
+ "orchestrator":
257
+ "picarones.app.services.run_orchestrator",
258
+ },
259
+ )
260
 
261
  # 6. Persistance JSONL.
262
  persist_dir = self._output_dir / "results"
 
414
  }
415
  return pipeline_specs, resolver, adapter_kwargs_dump
416
 
417
+ def _execute_with_partial(
418
+ self,
419
+ *,
420
+ spec: Any,
421
+ bench: Any,
422
+ corpus_spec: Any,
423
+ pipeline_specs: list[Any],
424
+ views: list[Any],
425
+ adapter_kwargs: dict[str, Any],
426
+ deps_lock: dict[str, Any],
427
+ bin_lock: dict[str, Any],
428
+ ) -> Any:
429
+ """Phase B2.3 — exécution pivotée par pipeline avec reprise.
430
+
431
+ Pour chaque ``pipeline_spec`` :
432
+
433
+ 1. Calcule un fingerprint SHA-256 du run (pipeline structure +
434
+ normalization + char_exclude + profile + corpus
435
+ mtime/size + code_version).
436
+ 2. Cherche un fichier partial existant matchant ce fingerprint.
437
+ 3. Charge les ``PipelineResult`` déjà calculés.
438
+ 4. Filtre le corpus pour ne soumettre au ``BenchmarkService``
439
+ que les documents manquants.
440
+ 5. Append chaque nouveau ``PipelineResult`` au fichier partial
441
+ au fil de l'eau (un crash mid-run préserve ce qui a été
442
+ calculé).
443
+ 6. À la fin d'une pipeline traitée intégralement, supprime
444
+ le partial (cleanup).
445
+
446
+ Le résultat final est un ``RunResult`` reconstruit à partir de
447
+ tous les ``PipelineResult`` (chargés + nouveaux), réorganisés
448
+ par document selon l'ordre du corpus original.
449
+
450
+ Limitations volontaires (scope B2.3) : les ``ViewResult`` ne
451
+ sont conservés que pour les ``PipelineResult`` calculés dans
452
+ le run courant (pas pour ceux rechargés depuis partial).
453
+ Pour relancer les vues sur l'ensemble, le caller doit relancer
454
+ sans ``partial_dir`` ou pré-supprimer les partials.
455
+ """
456
+ from picarones.app.results import RunResult
457
+ from picarones.app.services._orchestrator_partial import (
458
+ append_pipeline_result,
459
+ compute_pipeline_fingerprint,
460
+ delete_partial,
461
+ filter_remaining_documents,
462
+ load_partial_pipeline_results,
463
+ partial_path_for_pipeline,
464
+ )
465
+ from picarones.domain.corpus import CorpusSpec
466
+ from picarones.domain.run_manifest import RunManifest
467
+ from picarones.pipeline.run_result import RunDocumentResult
468
+
469
+ partial_dir = Path(spec.partial_dir)
470
+ partial_dir.mkdir(parents=True, exist_ok=True)
471
+
472
+ # Map : pipeline_name → (partial_path, list[PipelineResult])
473
+ per_pipeline_state: dict[str, tuple[Path, list[Any]]] = {}
474
+ for pipeline_spec in pipeline_specs:
475
+ fingerprint = compute_pipeline_fingerprint(
476
+ pipeline_spec=pipeline_spec,
477
+ corpus_spec=corpus_spec,
478
+ normalization_profile=spec.normalization_profile,
479
+ char_exclude=spec.char_exclude,
480
+ profile=spec.profile,
481
+ code_version=spec.code_version,
482
+ )
483
+ path = partial_path_for_pipeline(
484
+ partial_dir=partial_dir,
485
+ corpus_name=corpus_spec.name,
486
+ pipeline_name=pipeline_spec.name,
487
+ fingerprint=fingerprint,
488
+ )
489
+ loaded = load_partial_pipeline_results(path)
490
+ if loaded:
491
+ logger.info(
492
+ "[run_orchestrator] reprise pipeline %r : %d/%d "
493
+ "documents déjà persistés.",
494
+ pipeline_spec.name,
495
+ len(loaded), len(corpus_spec.documents),
496
+ )
497
+ per_pipeline_state[pipeline_spec.name] = (path, loaded)
498
+
499
+ # Lance un sub-run par pipeline avec uniquement les docs
500
+ # manquants. Sub-RunResult séparés ; on agrège ensuite.
501
+ sub_run_results: list[Any] = []
502
+ for pipeline_spec in pipeline_specs:
503
+ partial_path, loaded_results = per_pipeline_state[pipeline_spec.name]
504
+
505
+ remaining_docs, deduplicated_loaded = filter_remaining_documents(
506
+ corpus_spec.documents, loaded_results,
507
+ )
508
+ per_pipeline_state[pipeline_spec.name] = (
509
+ partial_path, deduplicated_loaded,
510
+ )
511
+
512
+ if not remaining_docs:
513
+ logger.info(
514
+ "[run_orchestrator] pipeline %r déjà complet — "
515
+ "skip exécution.", pipeline_spec.name,
516
+ )
517
+ # Cleanup du partial : le pipeline est entièrement
518
+ # rechargé, plus besoin de garder le fichier sur disque.
519
+ delete_partial(partial_path)
520
+ continue
521
+
522
+ sub_corpus_spec = CorpusSpec(
523
+ name=corpus_spec.name,
524
+ documents=tuple(remaining_docs),
525
+ metadata=dict(corpus_spec.metadata),
526
+ )
527
+
528
+ sub_result = bench.run(
529
+ corpus=sub_corpus_spec,
530
+ pipelines=[pipeline_spec],
531
+ views=views,
532
+ ground_truth_factory=_default_gt_factory,
533
+ pipeline_inputs_factory=_default_inputs_factory,
534
+ context_factory=_make_context_factory(
535
+ spec.code_version,
536
+ progress_callback=self._progress_callback,
537
+ ),
538
+ adapter_kwargs=adapter_kwargs,
539
+ dependencies_lock=deps_lock,
540
+ system_binaries_lock=bin_lock,
541
+ metadata={
542
+ "orchestrator":
543
+ "picarones.app.services.run_orchestrator",
544
+ "partial_pipeline": pipeline_spec.name,
545
+ },
546
+ )
547
+ sub_run_results.append(sub_result)
548
+
549
+ # Persiste chaque nouveau PipelineResult au partial.
550
+ new_count = 0
551
+ for doc_result in sub_result.document_results:
552
+ for pr in doc_result.pipeline_results:
553
+ if pr.pipeline_name == pipeline_spec.name:
554
+ append_pipeline_result(partial_path, pr)
555
+ new_count += 1
556
+
557
+ # Si tous les docs du corpus original ont été traités
558
+ # (loaded + new) → cleanup du partial.
559
+ loaded_doc_ids = {pr.document_id for pr in deduplicated_loaded}
560
+ new_doc_ids = {
561
+ pr.document_id
562
+ for doc_result in sub_result.document_results
563
+ for pr in doc_result.pipeline_results
564
+ if pr.pipeline_name == pipeline_spec.name
565
+ }
566
+ all_doc_ids = {d.id for d in corpus_spec.documents}
567
+ if (loaded_doc_ids | new_doc_ids) >= all_doc_ids:
568
+ delete_partial(partial_path)
569
+ logger.info(
570
+ "[run_orchestrator] pipeline %r complet (%d docs) "
571
+ "— partial supprimé.",
572
+ pipeline_spec.name, len(all_doc_ids),
573
+ )
574
+
575
+ # Reconstruit le RunResult final : pour chaque doc du corpus
576
+ # original, agrège les PipelineResult de tous les pipelines.
577
+ # Map (doc_id, pipeline_name) → PipelineResult
578
+ pr_index: dict[tuple[str, str], Any] = {}
579
+ # Map (doc_id, pipeline_name) → list[ViewResult]
580
+ vr_index: dict[tuple[str, str], list[Any]] = {}
581
+
582
+ # Charge les pipeline_results depuis les partials (rechargés).
583
+ for pipeline_name, (_, loaded_list) in per_pipeline_state.items():
584
+ for pr in loaded_list:
585
+ pr_index[(pr.document_id, pipeline_name)] = pr
586
+
587
+ # Charge les pipeline_results et view_results depuis les sub-runs.
588
+ for sub_result in sub_run_results:
589
+ for sub_doc in sub_result.document_results:
590
+ for pr in sub_doc.pipeline_results:
591
+ pr_index[(sub_doc.document_id, pr.pipeline_name)] = pr
592
+ for vr in sub_doc.view_results:
593
+ # ``ViewResult.pipeline_name`` n'existe pas ; on
594
+ # regroupe par doc seulement (pas suffisamment
595
+ # granulaire mais OK pour la sortie).
596
+ vr_index.setdefault(
597
+ (sub_doc.document_id, ""), [],
598
+ ).append(vr)
599
+
600
+ # Construit les RunDocumentResult dans l'ordre du corpus.
601
+ final_doc_results: list[Any] = []
602
+ for doc in corpus_spec.documents:
603
+ doc_pipeline_results = tuple(
604
+ pr_index[(doc.id, ps.name)]
605
+ for ps in pipeline_specs
606
+ if (doc.id, ps.name) in pr_index
607
+ )
608
+ doc_view_results = tuple(vr_index.get((doc.id, ""), []))
609
+ final_doc_results.append(RunDocumentResult(
610
+ document_id=doc.id,
611
+ pipeline_results=doc_pipeline_results,
612
+ view_results=doc_view_results,
613
+ ))
614
+
615
+ # Synthétise un RunManifest minimal (on prend celui d'un
616
+ # sub-run s'il y en a eu, sinon on synthétise from scratch).
617
+ if sub_run_results:
618
+ # Fusionne les pipeline_specs de tous les sub-runs.
619
+ base_manifest = sub_run_results[0].manifest
620
+ manifest = RunManifest(
621
+ run_id=base_manifest.run_id,
622
+ corpus_name=corpus_spec.name,
623
+ n_documents=len(corpus_spec.documents),
624
+ pipeline_specs=tuple(pipeline_specs),
625
+ adapter_kwargs=adapter_kwargs,
626
+ view_specs=tuple(views),
627
+ code_version=spec.code_version,
628
+ started_at=base_manifest.started_at,
629
+ completed_at=base_manifest.completed_at,
630
+ dependencies_lock=deps_lock,
631
+ system_binaries_lock=bin_lock,
632
+ metadata={
633
+ "orchestrator":
634
+ "picarones.app.services.run_orchestrator",
635
+ "partial_dir": str(partial_dir),
636
+ },
637
+ )
638
+ else:
639
+ # Tous les pipelines ont été chargés depuis partial — pas
640
+ # de sub-run. On synthétise un manifest from scratch.
641
+ from picarones.app.services.benchmark_service import (
642
+ _default_run_id,
643
+ )
644
+ from picarones.domain.run_manifest import utcnow
645
+ now = utcnow()
646
+ manifest = RunManifest(
647
+ run_id=_default_run_id(corpus_spec.name, now),
648
+ corpus_name=corpus_spec.name,
649
+ n_documents=len(corpus_spec.documents),
650
+ pipeline_specs=tuple(pipeline_specs),
651
+ adapter_kwargs=adapter_kwargs,
652
+ view_specs=tuple(views),
653
+ code_version=spec.code_version,
654
+ started_at=now,
655
+ completed_at=now,
656
+ dependencies_lock=deps_lock,
657
+ system_binaries_lock=bin_lock,
658
+ metadata={
659
+ "orchestrator":
660
+ "picarones.app.services.run_orchestrator",
661
+ "partial_dir": str(partial_dir),
662
+ "fully_resumed": "true",
663
+ },
664
+ )
665
+
666
+ return RunResult(
667
+ manifest=manifest,
668
+ document_results=tuple(final_doc_results),
669
+ )
670
+
671
  @staticmethod
672
  def _persist_legacy_benchmark_json(
673
  *,
 
896
  "pipeline_name": self._spec.name,
897
  "steps": [
898
  {
899
+ "id": step.id,
900
  "input_types": sorted(t.value for t in step.input_types),
901
  "output_types": sorted(t.value for t in step.output_types),
902
  }
tests/app/services/test_run_orchestrator_feature_parity.py CHANGED
@@ -226,47 +226,178 @@ class TestParityCancelEvent:
226
  # ──────────────────────────────────────────────────────────────────────
227
 
228
 
229
- @pytest.mark.skip(reason=f"{SKIP_REASON_PREFIX}3 — port partial_dir resume")
230
- def test_parity_partial_dir_resume_fresh_start(tmp_path: Path) -> None:
231
- """Premier run avec ``partial_dir`` non existant → comportement
232
- identique à un run sans ``partial_dir``.
233
-
234
- Spec
235
- ----
236
- - ``partial_dir`` = répertoire vide.
237
- - Lancer le bench.
238
- - À la fin, le fichier ``{partial_dir}/picarones_{corpus}_{engine}
239
- .partial.jsonl`` est supprimé (succès complet).
240
- - Le ``BenchmarkResult`` est identique au run sans ``partial_dir``.
241
  """
242
 
 
 
 
 
 
 
 
 
 
 
 
 
 
243
 
244
- @pytest.mark.skip(reason=f"{SKIP_REASON_PREFIX}3 — port partial_dir resume")
245
- def test_parity_partial_dir_resume_after_crash(tmp_path: Path) -> None:
246
- """Reprise après crash partiel : 3 docs sur 5 déjà persistés →
247
- seuls les 2 restants sont soumis au runner.
 
 
 
 
 
 
248
 
249
- Spec
250
- ----
251
- - Pré-écrire un partial JSONL avec 3 ``DocumentResult`` valides.
252
- - Lancer le bench sur le corpus de 5 docs.
253
- - Le ``CorpusRunner.run`` est appelé sur **2 docs seulement**
254
- (vérifier via spy).
255
- - Le ``BenchmarkResult`` final agrège les 5 docs (3 réutilisés +
256
- 2 nouveaux).
257
- """
258
 
 
 
 
 
 
259
 
260
- @pytest.mark.skip(reason=f"{SKIP_REASON_PREFIX}3 — port partial_dir resume")
261
- def test_parity_partial_dir_fingerprint_invalidates(tmp_path: Path) -> None:
262
- """Fingerprint divergent invalide le partial (re-calcul depuis 0).
 
 
263
 
264
- Spec
265
- ----
266
- - Pré-écrire un partial avec un ``code_version`` différent.
267
- - Lancer le bench.
268
- - Le partial est ignoré, les 5 docs sont recalculés.
269
- """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
270
 
271
 
272
  # ──────────────────────────────────────────────────────────────────────
@@ -699,24 +830,95 @@ class TestParityOutputJsonLegacy:
699
  # ──────────────────────────────────────────────────────────────────────
700
 
701
 
702
- @pytest.mark.skip(reason=f"{SKIP_REASON_PREFIX}* — toutes features portées")
703
- def test_parity_all_features_combined(tmp_path: Path) -> None:
704
- """Lance les deux chemins avec toutes les features actives et
705
- vérifie l'égalité numérique du ``BenchmarkResult``.
706
-
707
- Spec
708
- ----
709
- - Construire un ``RunSpec`` avec : ``profile="standard"``,
710
- ``partial_dir=tmp_path/"partial"``, ``output_json=tmp_path/
711
- "bm.json"``, ``char_exclude="!."``,
712
- ``normalization_profile="caseless"``.
713
- - Lancer ``run_benchmark_via_service`` avec les mêmes paramètres.
714
- - Lancer ``RunOrchestrator().execute(spec)``.
715
- - Normaliser les 2 ``BenchmarkResult`` (cf.
716
- ``test_migration_invariance.py:_normalize_for_snapshot``).
717
- - Vérifier ``a == b``.
718
-
719
- Ce test est le **gate finale du Checkpoint C1**. Quand il passe,
720
- la Phase B2 est terminée et on peut commencer B3 (migration des
721
- call sites).
722
  """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
226
  # ──────────────────────────────────────────────────────────────────────
227
 
228
 
229
+ class TestParityPartialDir:
230
+ """Phase B2.3 reprise sur interruption pivotée par pipeline.
231
+
232
+ Le format JSONL est partagé entre tous les pipelines d'un run :
233
+ un fichier par pipeline, append-only, supprimé à la fin du
234
+ pipeline si traité intégralement.
 
 
 
 
 
 
235
  """
236
 
237
+ def _build_spec(
238
+ self, tmp_path: Path, *,
239
+ n_docs: int = 3,
240
+ partial_dir: Path | None,
241
+ ) -> "RunSpec":
242
+ tmp_path.mkdir(parents=True, exist_ok=True)
243
+ corpus_zip = tmp_path / "c.zip"
244
+ corpus_zip.write_bytes(_make_corpus_zip(n_docs=n_docs))
245
+ out_dir = tmp_path / "out"
246
+ yaml = _build_spec_yaml(corpus_zip, out_dir)
247
+ if partial_dir is not None:
248
+ yaml += f"partial_dir: {partial_dir}\n"
249
+ return load_run_spec_from_yaml(yaml)
250
 
251
+ def test_partial_dir_fresh_start_creates_no_orphan_files(
252
+ self, tmp_path: Path,
253
+ ) -> None:
254
+ """Fresh start : tous les docs traités → partial supprimé
255
+ à la fin (cleanup). Le répertoire partial_dir reste vide
256
+ après un run réussi complet."""
257
+ partial_dir = tmp_path / "partial"
258
+ spec = self._build_spec(
259
+ tmp_path, n_docs=2, partial_dir=partial_dir,
260
+ )
261
 
262
+ result = RunOrchestrator(tmp_path / "out").execute(spec)
263
+ assert result.run_result.n_documents == 2
 
 
 
 
 
 
 
264
 
265
+ # Aucun fichier .partial.jsonl résiduel après run complet.
266
+ residual = list(partial_dir.glob("*.partial.jsonl"))
267
+ assert residual == [], (
268
+ f"Fichiers partiels résiduels : {residual}"
269
+ )
270
 
271
+ def test_partial_dir_resume_after_complete_pipeline(
272
+ self, tmp_path: Path,
273
+ ) -> None:
274
+ """Si un partial existant contient déjà tous les docs d'un
275
+ pipeline, ce pipeline n'est pas relancé.
276
 
277
+ Pré-condition : on lance le bench une 1re fois pour créer
278
+ le partial via les helpers (puisque le partial est supprimé
279
+ en cleanup post-success). On simule un crash en pré-écrivant
280
+ directement le partial JSONL.
281
+ """
282
+ from picarones.app.services._orchestrator_partial import (
283
+ append_pipeline_result,
284
+ compute_pipeline_fingerprint,
285
+ partial_path_for_pipeline,
286
+ )
287
+
288
+ partial_dir = tmp_path / "partial"
289
+ partial_dir.mkdir()
290
+ spec = self._build_spec(
291
+ tmp_path, n_docs=2, partial_dir=partial_dir,
292
+ )
293
+
294
+ # Construire à la main les pipeline_specs (même logique que
295
+ # RunOrchestrator._build_pipelines) pour pouvoir calculer le
296
+ # fingerprint et pré-écrire le partial.
297
+ orchestrator = RunOrchestrator(tmp_path / "out")
298
+ orchestrator._output_dir.mkdir(parents=True, exist_ok=True)
299
+
300
+ # On lance un premier run sans partial pour récupérer les
301
+ # PipelineResult — puis on les rejoue via le partial.
302
+ spec_no_partial = self._build_spec(
303
+ tmp_path / "first", n_docs=2, partial_dir=None,
304
+ )
305
+ first_result = RunOrchestrator(
306
+ tmp_path / "first" / "out",
307
+ ).execute(spec_no_partial)
308
+
309
+ # Pré-écrire le partial avec les 2 PipelineResult du 1er run.
310
+ # On a besoin du fingerprint cohérent → on construit la spec
311
+ # via orchestrator._build_pipelines et la corpus_spec via
312
+ # _load_corpus.
313
+ from picarones.app.services.path_security import WorkspaceManager
314
+ workspace = WorkspaceManager(orchestrator._output_dir)
315
+ corpus_spec, _ = orchestrator._load_corpus(spec, workspace)
316
+ pipeline_specs, _, _ = orchestrator._build_pipelines(spec)
317
+
318
+ for ps in pipeline_specs:
319
+ fingerprint = compute_pipeline_fingerprint(
320
+ pipeline_spec=ps,
321
+ corpus_spec=corpus_spec,
322
+ normalization_profile=spec.normalization_profile,
323
+ char_exclude=spec.char_exclude,
324
+ profile=spec.profile,
325
+ code_version=spec.code_version,
326
+ )
327
+ partial_path = partial_path_for_pipeline(
328
+ partial_dir=partial_dir,
329
+ corpus_name=corpus_spec.name,
330
+ pipeline_name=ps.name,
331
+ fingerprint=fingerprint,
332
+ )
333
+ # Persister tous les PipelineResult du 1er run dans le partial.
334
+ for first_doc in first_result.run_result.document_results:
335
+ for pr in first_doc.pipeline_results:
336
+ if pr.pipeline_name == ps.name:
337
+ append_pipeline_result(partial_path, pr)
338
+
339
+ # 2e run sur le même spec : le partial est complet, aucun
340
+ # nouveau calcul n'est requis.
341
+ second_result = RunOrchestrator(
342
+ tmp_path / "out",
343
+ ).execute(spec)
344
+
345
+ # Tous les docs sont présents dans le résultat final.
346
+ assert second_result.run_result.n_documents == 2
347
+ # ``fully_resumed`` flag dans la metadata du manifest signale
348
+ # qu'aucun sub-run n'a été nécessaire.
349
+ assert second_result.run_result.manifest.metadata.get(
350
+ "fully_resumed",
351
+ ) == "true"
352
+ # Cleanup : le partial est supprimé même en mode fully resumed.
353
+ assert list(partial_dir.glob("*.partial.jsonl")) == []
354
+
355
+ def test_partial_dir_fingerprint_isolation(
356
+ self, tmp_path: Path,
357
+ ) -> None:
358
+ """Deux runs avec des configs différentes ont des fingerprints
359
+ différents → fichiers partiels distincts → pas de réutilisation
360
+ croisée.
361
+
362
+ Test : crée un partial avec un fingerprint forgé (différent),
363
+ puis lance le bench. Le bench doit ignorer ce partial et
364
+ produire un résultat propre.
365
+ """
366
+ from picarones.app.services._orchestrator_partial import (
367
+ partial_path_for_pipeline,
368
+ )
369
+
370
+ partial_dir = tmp_path / "partial"
371
+ partial_dir.mkdir()
372
+
373
+ # Pré-écrire un partial avec un fingerprint forgé qui ne
374
+ # matchera pas le fingerprint calculé par le bench.
375
+ fake_path = partial_path_for_pipeline(
376
+ partial_dir=partial_dir,
377
+ corpus_name="feature_parity",
378
+ pipeline_name="tess_only",
379
+ fingerprint="0" * 64, # fingerprint forgé
380
+ )
381
+ fake_path.write_text(
382
+ '{"document_id": "ghost_doc",'
383
+ ' "pipeline_name": "tess_only",'
384
+ ' "step_results": [],'
385
+ ' "succeeded": false,'
386
+ ' "duration_seconds": 0.0,'
387
+ ' "artifacts": []}\n',
388
+ encoding="utf-8",
389
+ )
390
+
391
+ spec = self._build_spec(
392
+ tmp_path, n_docs=2, partial_dir=partial_dir,
393
+ )
394
+ result = RunOrchestrator(tmp_path / "out").execute(spec)
395
+
396
+ # Le run produit ses 2 docs propres (ne charge pas le fake).
397
+ assert result.run_result.n_documents == 2
398
+ doc_ids = {dr.document_id for dr in result.run_result.document_results}
399
+ assert "ghost_doc" not in doc_ids
400
+ assert doc_ids == {"doc01", "doc02"}
401
 
402
 
403
  # ──────────────────────────────────────────────────────────────────────
 
830
  # ──────────────────────────────────────────────────────────────────────
831
 
832
 
833
+ class TestParityAllFeaturesCombined:
834
+ """Phase B2 / Checkpoint C1 — gate finale.
835
+
836
+ Lance ``RunOrchestrator.execute`` avec **toutes** les features
837
+ actives simultanément et vérifie que le ``BenchmarkResult`` legacy
838
+ persisté via ``output_json`` est cohérent (toutes les métriques,
839
+ NER, hooks, char_exclude appliqués, etc.).
840
+
841
+ Ce test certifie que les 7 features sont câblées ensemble sans
842
+ conflit ni régression croisée. C'est le gate du checkpoint C1 :
843
+ quand il passe, le ``RunOrchestrator`` est feature-complete vis-à-vis
844
+ de ``run_benchmark_via_service``.
 
 
 
 
 
 
 
 
845
  """
846
+
847
+ def test_combined_features_produce_coherent_result(
848
+ self, tmp_path: Path,
849
+ ) -> None:
850
+ import json
851
+
852
+ # Corpus avec GT TEXT + GT ENTITIES (pour NER).
853
+ buf = io.BytesIO()
854
+ with zipfile.ZipFile(buf, mode="w") as zf:
855
+ zf.writestr("doc01.png", _png_bytes())
856
+ zf.writestr("doc01.gt.txt", "Jean habite Paris!")
857
+ zf.writestr("doc01.tess.txt", "Jean habite Paris.")
858
+ zf.writestr("doc01.gt.entities.json", json.dumps({
859
+ "entities": [
860
+ {"label": "PER", "start": 0, "end": 4, "text": "Jean"},
861
+ {"label": "LOC", "start": 12, "end": 17, "text": "Paris"},
862
+ ],
863
+ }))
864
+ corpus_zip = tmp_path / "c.zip"
865
+ corpus_zip.write_bytes(buf.getvalue())
866
+
867
+ out_dir = tmp_path / "out"
868
+ partial_dir = tmp_path / "partial"
869
+ output_json = tmp_path / "bm.json"
870
+
871
+ # YAML avec TOUTES les features activées simultanément.
872
+ yaml = _build_spec_yaml(corpus_zip, out_dir)
873
+ yaml += f"partial_dir: {partial_dir}\n"
874
+ yaml += f"output_json: {output_json}\n"
875
+ yaml += 'char_exclude: "!."\n'
876
+ yaml += "normalization_profile: caseless\n"
877
+ yaml += "profile: standard\n"
878
+ yaml += (
879
+ "entity_extractor: 'tests.app.services."
880
+ "test_run_orchestrator_feature_parity:_mock_entity_extractor'\n"
881
+ )
882
+ spec = load_run_spec_from_yaml(yaml)
883
+
884
+ # Callback + cancel_event passés en kwargs d'exécution.
885
+ invocations: list[tuple[str, int, str]] = []
886
+
887
+ def cb(engine: str, idx: int, doc_id: str) -> None:
888
+ invocations.append((engine, idx, doc_id))
889
+
890
+ ev = threading.Event() # jamais set : run normal
891
+
892
+ result = RunOrchestrator(out_dir).execute(
893
+ spec, progress_callback=cb, cancel_event=ev,
894
+ )
895
+
896
+ # Le run a tourné : 1 doc, 1 callback invoqué.
897
+ assert result.run_result.n_documents == 1
898
+ assert len(invocations) == 1
899
+
900
+ # JSON legacy écrit avec TOUTES les features intégrées.
901
+ loaded = json.loads(output_json.read_text(encoding="utf-8"))
902
+ doc_result = loaded["engine_reports"][0]["document_results"][0]
903
+
904
+ # char_exclude appliqué : "!." filtré → ground_truth +
905
+ # hypothesis matchent exactement → CER = 0.
906
+ assert doc_result["metrics"]["cer"] == 0.0
907
+
908
+ # normalization_profile=caseless propagé → cer_diplomatic = 0.
909
+ assert doc_result["metrics"]["cer_diplomatic"] == 0.0
910
+
911
+ # entity_extractor invoqué → ner_metrics présent.
912
+ assert doc_result.get("ner_metrics") is not None
913
+
914
+ # profile=standard appliqué → hypothesis_length présent.
915
+ assert doc_result["metrics"]["hypothesis_length"] > 0
916
+
917
+ # Cohabitation : 4 fichiers JSONL natifs + 1 JSON legacy.
918
+ assert output_json.exists()
919
+ assert set(result.persisted_files) == {
920
+ "manifest", "pipeline_results", "artifacts_index", "view_results",
921
+ }
922
+
923
+ # partial_dir : pipeline complet → fichier nettoyé.
924
+ assert list(partial_dir.glob("*.partial.jsonl")) == []
tests/architecture/test_file_budgets.py CHANGED
@@ -124,7 +124,7 @@ FILE_BUDGETS: dict[str, int] = {
124
  # --- Services applicatifs (couche 6). Budgets ``current + 15 %``.
125
  "picarones/app/services/corpus_service.py": 625, # actuel 541
126
  "picarones/app/services/path_security.py": 470, # actuel 410
127
- "picarones/app/services/run_orchestrator.py": 1000, # actuel 835 — Phase B2.1-B2.7 migration Option B (+339 LOC : progress/cancel/output_json/normalization/entity_extractor)
128
  "picarones/app/schemas/run_spec.py": 620, # actuel 530 — Phase B1 migration Option B (+90 LOC : 7 nouveaux champs + 2 validators)
129
  "picarones/reports/html/render.py": 700, # actuel 615
130
  }
 
124
  # --- Services applicatifs (couche 6). Budgets ``current + 15 %``.
125
  "picarones/app/services/corpus_service.py": 625, # actuel 541
126
  "picarones/app/services/path_security.py": 470, # actuel 410
127
+ "picarones/app/services/run_orchestrator.py": 1300, # actuel 1107 — Phase B2 complète migration Option B (+611 LOC : 7 features)
128
  "picarones/app/schemas/run_spec.py": 620, # actuel 530 — Phase B1 migration Option B (+90 LOC : 7 nouveaux champs + 2 validators)
129
  "picarones/reports/html/render.py": 700, # actuel 615
130
  }