Claude commited on
Commit
19e1a5d
·
unverified ·
1 Parent(s): 27d155d

feat(app/services,interfaces/web): Sprint A14-S48 — JobRunner + POST /api/jobs (fix audit #2)

Browse files

L'audit avait identifié que JobStore (S37) était à moitié branché :
- POST /api/jobs manquant — impossible de créer un job via l'API.
- mark_orphaned_jobs_interrupted() documenté mais jamais appelé au boot.
- Pas d'orchestrateur async qui pousse les jobs.

Ce sprint ferme les 3 chantiers.

picarones/app/services/job_runner.py (nouveau, 247 lignes)
----------------------------------------------------------
Service applicatif qui pont entre l'API web et RunOrchestrator :

- JobRunner(job_store, orchestrator_factory, report_renderer=None).
- submit(run_spec, output_dir, job_id=None, payload=None) -> job_id :
· crée un JobRecord (pending) ;
· spawn un threading.Thread(daemon=True) ;
· retourne immédiatement.
- _run worker thread :
· vérifie statut pré-démarrage (skip si cancelled avant start) ;
· mark_running, exécute orchestrator.execute() ;
· capture toutes les exceptions → mark_error avec
"{type}: {msg}" ;
· vérifie statut post-exécution (cancelled pendant le run →
résultat discardé, statut reste cancelled) ;
· sinon mark_complete avec output_path = manifest persisté.
- wait(job_id, timeout) helper pour les tests.

Cancellation coopérative best-effort : DELETE /api/jobs/{id} marque
cancelled, le worker observe à 2 checkpoints (pré et post execute).
Pendant l'exécution longue de orchestrator.execute, le worker ne
peut pas l'interrompre (pas de cancel_event natif sur l'orchestrator —
amélioration future).

picarones/interfaces/web/app.py (modifié)
-----------------------------------------
- WebAppState : nouveau champ optionnel job_runner: JobRunner | None.
- Lifespan hook @asynccontextmanager : au boot, appelle
state.job_store.mark_orphaned_jobs_interrupted() si store
configuré, log le compte ; tolère les exceptions sqlite (log error,
l'app continue à démarrer).

picarones/interfaces/web/routers/jobs.py (modifié)
--------------------------------------------------
- Nouveau endpoint POST /api/jobs avec status_code=202 :
· accepte le YAML d'un RunSpec en raw body (Body(media_type="text/plain")) ;
· rejette body vide → 400 ;
· parse + valide via load_run_spec_from_yaml → 400 si invalide ;
· output_dir = workspace.root / "runs" / {job_id} ;
· délègue à state.job_runner.submit ;
· retourne JobSubmitResponse avec job_id + status="pending" ;
· 503 si job_runner non configuré.
- Helper _require_job_runner(state) symétrique à _require_job_store.

Tests S48 dédiés (13 nouveaux)
------------------------------
- TestJobRunnerConstructor : rejet non-JobStore, non-callable factory,
non-callable renderer.
- TestJobRunnerHappyPath : submit → mark_complete avec output_path,
UUID4 unique sans job_id explicite, job_id explicite respecté.
- TestJobRunnerErrorPath : exception orchestrator → mark_error avec
type+message.
- TestJobRunnerCancellation : cancel pendant l'exécution → résultat
discardé, statut reste cancelled.
- TestLifespanHook : 2 jobs zombie (pending + running) au démarrage
→ marked interrupted ; jobs déjà complete intacts ; /health
répond.
- TestPostJobsEndpoint : YAML valide → 202 avec job_id, YAML invalide
→ 400, body vide → 400 ou 422 (Pydantic), pas de runner → 503.

Tests : 4933 passed, 11 skipped (vs 4920 avant : +13 S48).
Lint : ruff check picarones/ tests/ → All checks passed.

Pourquoi ce fix maintenant
--------------------------
La directive *« sans dette technique »* exigeait que tout code livré
soit utilisable bout-en-bout. Pour les jobs, S37 livrait la
persistance + lecture mais pas la création. L'utilisateur ne pouvait
PAS soumettre un benchmark via l'API, alors que le legacy
picarones/web/jobs.py l'autorisait.

Le branchement S48 réalise enfin la promesse implicite d'une API
de jobs complète : POST → 202, GET → status, DELETE → cancel.

https://claude.ai/code/session_011XQZNitg1rCgia8ZD1a2hP

README.md CHANGED
@@ -396,7 +396,7 @@ ruff check picarones/ tests/
396
  python -m mypy picarones/core/
397
  ```
398
 
399
- **Test suite**: ~4940 tests, ~3 min on a modern laptop. Coverage
400
  floor at 85% (currently ~87%). The `network` marker excludes tests
401
  requiring live HTTP. A handful of tests depend on optional engines
402
  (`pero-ocr`, `pytesseract`) and are skipped/fail gracefully when
 
396
  python -m mypy picarones/core/
397
  ```
398
 
399
+ **Test suite**: ~4950 tests, ~3 min on a modern laptop. Coverage
400
  floor at 85% (currently ~87%). The `network` marker excludes tests
401
  requiring live HTTP. A handful of tests depend on optional engines
402
  (`pero-ocr`, `pytesseract`) and are skipped/fail gracefully when
picarones/app/services/__init__.py CHANGED
@@ -34,6 +34,7 @@ from picarones.app.services.corpus_service import (
34
  CorpusImportReport,
35
  CorpusService,
36
  )
 
37
  from picarones.app.services.path_security import (
38
  PathValidationError,
39
  WorkspaceManager,
@@ -63,6 +64,7 @@ __all__ = [
63
  "CorpusImportReport",
64
  "CorpusService",
65
  "GroundTruthFactory",
 
66
  "OrchestrationResult",
67
  "PathValidationError",
68
  "PipelineInputsFactory",
 
34
  CorpusImportReport,
35
  CorpusService,
36
  )
37
+ from picarones.app.services.job_runner import JobRunner
38
  from picarones.app.services.path_security import (
39
  PathValidationError,
40
  WorkspaceManager,
 
64
  "CorpusImportReport",
65
  "CorpusService",
66
  "GroundTruthFactory",
67
+ "JobRunner",
68
  "OrchestrationResult",
69
  "PathValidationError",
70
  "PipelineInputsFactory",
picarones/app/services/job_runner.py ADDED
@@ -0,0 +1,256 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """``JobRunner`` — Sprint A14-S48.
2
+
3
+ Fix audit #2 : avant ce sprint, ``JobStore`` (S37) existait avec ses
4
+ endpoints ``GET / DELETE /api/jobs``, mais aucun moyen de **créer**
5
+ un job via l'API — pas de ``POST /api/jobs``, pas d'orchestrateur
6
+ async qui pousse les jobs. ``mark_orphaned_jobs_interrupted()`` était
7
+ documenté mais jamais appelé au boot.
8
+
9
+ ``JobRunner`` est le pont manquant entre l'API web et
10
+ ``RunOrchestrator``. Il :
11
+
12
+ 1. Crée un ``JobRecord`` dans le ``JobStore`` (status ``pending``).
13
+ 2. Lance un **thread daemon** qui exécute l'orchestrator de façon
14
+ synchrone.
15
+ 3. Met à jour le statut au fur et à mesure : ``running`` au démarrage,
16
+ ``complete`` ou ``error`` à la fin.
17
+ 4. Si le caller annule via ``DELETE /api/jobs/{id}`` (qui appelle
18
+ ``store.mark_cancelled``), le thread l'observe au prochain check
19
+ et abandonne — le résultat partiel est discardé.
20
+
21
+ Pourquoi un thread, pas asyncio
22
+ -------------------------------
23
+ ``RunOrchestrator.execute`` est **synchrone** et utilise un
24
+ ``ThreadPoolExecutor`` interne (``CorpusRunner``). Le wrapper avec
25
+ asyncio créerait du complexité gratuite (mix sync/async, GIL).
26
+ Un ``threading.Thread(daemon=True)`` est l'outil correct ici.
27
+
28
+ Cancellation coopérative
29
+ ------------------------
30
+ Pour S48, la cancellation est **best-effort** : le thread vérifie
31
+ ``store.get(job_id).status == "cancelled"`` AVANT et APRÈS l'appel
32
+ à ``orchestrator.execute``. Pendant l'exécution (potentiellement
33
+ plusieurs minutes), le thread ne peut pas interrompre l'orchestrator
34
+ sans support natif (cf. ``CorpusRunner.run(cancel_event=...)`` —
35
+ non encore propagé jusqu'à ``RunOrchestrator``).
36
+
37
+ Conséquence : ``DELETE /api/jobs/{id}`` pendant que le thread tourne
38
+ marque le statut comme ``cancelled``, mais le benchmark continue et
39
+ son résultat est discardé à la fin. Une amélioration future
40
+ propagerait le ``cancel_event`` jusqu'au runner.
41
+
42
+ Anti-sur-ingénierie
43
+ -------------------
44
+ - Pas de queue de jobs avec backpressure : un thread par submit.
45
+ Pour 100+ jobs simultanés, ajouter un ``ThreadPoolExecutor`` au
46
+ niveau du runner.
47
+ - Pas de retry automatique sur échec.
48
+ - Pas de notification SSE des changements de statut (le caller
49
+ poll ``GET /api/jobs/{id}``).
50
+ """
51
+
52
+ from __future__ import annotations
53
+
54
+ import logging
55
+ import threading
56
+ import uuid
57
+ from pathlib import Path
58
+ from typing import Any, Callable
59
+
60
+ from picarones.adapters.storage import JobStore
61
+
62
+ logger = logging.getLogger(__name__)
63
+
64
+
65
+ # Factory : un caller fournit un callable qui construit un
66
+ # ``RunOrchestrator`` lié à un ``output_dir`` donné. L'inversion
67
+ # évite à ce module d'importer ``RunOrchestrator`` directement
68
+ # (cycles potentiels) et permet aux tests d'injecter un mock.
69
+ OrchestratorFactory = Callable[[Path], Any]
70
+ ReportRenderer = Callable[[Any, Path, str], Path]
71
+
72
+
73
+ class JobRunner:
74
+ """Lance des jobs de benchmark en arrière-plan.
75
+
76
+ Parameters
77
+ ----------
78
+ job_store:
79
+ ``JobStore`` partagé avec les endpoints de lecture
80
+ (``GET /api/jobs``, ``DELETE /api/jobs/{id}``).
81
+ orchestrator_factory:
82
+ Callable ``(output_dir: Path) -> RunOrchestrator`` qui
83
+ construit un orchestrator par job. Permet à chaque job
84
+ d'avoir son propre output_dir isolé.
85
+ report_renderer:
86
+ Optionnel — passé à ``orchestrator.execute()`` pour rendre
87
+ le rapport HTML. Si ``None``, pas de rapport produit.
88
+
89
+ Notes
90
+ -----
91
+ L'instance est thread-safe : ``submit`` est appelé depuis le
92
+ thread FastAPI, le thread daemon écrit dans ``JobStore`` qui
93
+ sérialise ses opérations SQLite.
94
+ """
95
+
96
+ def __init__(
97
+ self,
98
+ job_store: JobStore,
99
+ orchestrator_factory: OrchestratorFactory,
100
+ report_renderer: ReportRenderer | None = None,
101
+ ) -> None:
102
+ if not isinstance(job_store, JobStore):
103
+ raise TypeError("job_store doit être un JobStore.")
104
+ if not callable(orchestrator_factory):
105
+ raise TypeError("orchestrator_factory doit être callable.")
106
+ if report_renderer is not None and not callable(report_renderer):
107
+ raise TypeError("report_renderer doit être callable ou None.")
108
+ self._store = job_store
109
+ self._factory = orchestrator_factory
110
+ self._report_renderer = report_renderer
111
+ # Tracking des threads actifs — utile pour les tests qui
112
+ # attendent la fin d'un job soumis.
113
+ self._threads: dict[str, threading.Thread] = {}
114
+
115
+ # ──────────────────────────────────────────────────────────────────
116
+ # API publique
117
+ # ──────────────────────────────────────────────────────────────────
118
+
119
+ def submit(
120
+ self,
121
+ run_spec: Any,
122
+ output_dir: Path | str,
123
+ *,
124
+ job_id: str | None = None,
125
+ payload: dict | None = None,
126
+ ) -> str:
127
+ """Crée un job et lance son exécution en thread arrière-plan.
128
+
129
+ Returns
130
+ -------
131
+ str
132
+ ``job_id`` (généré si non fourni). Utilisable pour
133
+ interroger ``GET /api/jobs/{job_id}``.
134
+
135
+ Notes
136
+ -----
137
+ Idempotent uniquement si ``job_id`` est fourni explicitement
138
+ (sinon UUID4 garantit l'unicité). Si le ``job_id`` existe
139
+ déjà, ``JobStore.create`` lève ``JobStoreError``.
140
+ """
141
+ job_id = job_id or uuid.uuid4().hex
142
+ out_path = Path(output_dir)
143
+ # ``payload`` est sérialisé en JSON dans le store — on stocke
144
+ # la version du run_spec pour traçabilité.
145
+ record_payload = dict(payload or {})
146
+ record_payload.setdefault("output_dir", str(out_path))
147
+ self._store.create(job_id, payload=record_payload)
148
+
149
+ thread = threading.Thread(
150
+ target=self._run,
151
+ args=(job_id, run_spec, out_path),
152
+ daemon=True,
153
+ name=f"picarones-job-{job_id[:8]}",
154
+ )
155
+ self._threads[job_id] = thread
156
+ thread.start()
157
+ logger.info("[job_runner] job %s soumis (thread démarré).", job_id)
158
+ return job_id
159
+
160
+ def wait(self, job_id: str, timeout: float | None = None) -> bool:
161
+ """Attend la fin du thread d'un job (utile aux tests).
162
+
163
+ Returns
164
+ -------
165
+ bool
166
+ ``True`` si le thread est terminé, ``False`` si timeout.
167
+ """
168
+ thread = self._threads.get(job_id)
169
+ if thread is None:
170
+ return True # job inconnu = considéré fini
171
+ thread.join(timeout=timeout)
172
+ return not thread.is_alive()
173
+
174
+ # ──────────────────────────────────────────────────────────────────
175
+ # Worker thread
176
+ # ──────────────────────────────────────────────────────────────────
177
+
178
+ def _run(
179
+ self,
180
+ job_id: str,
181
+ run_spec: Any,
182
+ output_dir: Path,
183
+ ) -> None:
184
+ """Logique exécutée dans le thread daemon. Capture toutes les
185
+ exceptions et les transcrit en statut ``error`` du store.
186
+
187
+ Hooks de cancellation coopérative :
188
+
189
+ - **Avant** ``orchestrator.execute()`` : si le statut a été
190
+ basculé en ``cancelled`` entre le ``submit`` et le démarrage
191
+ du thread, on saute l'exécution.
192
+ - **Après** ``orchestrator.execute()`` : si le statut a été
193
+ basculé en ``cancelled`` pendant l'exécution, on discarde
194
+ le résultat (le statut reste ``cancelled``).
195
+
196
+ Sinon, statut final = ``complete`` ou ``error``.
197
+ """
198
+ # 1. Check pré-démarrage : annulé avant que le thread n'ait
199
+ # pris la main ?
200
+ rec = self._store.get(job_id)
201
+ if rec is None:
202
+ logger.warning(
203
+ "[job_runner] job %s introuvable au démarrage du "
204
+ "thread — abandon.", job_id,
205
+ )
206
+ return
207
+ if rec.status == "cancelled":
208
+ logger.info(
209
+ "[job_runner] job %s annulé avant démarrage — skip.",
210
+ job_id,
211
+ )
212
+ return
213
+
214
+ # 2. Marquer en cours.
215
+ try:
216
+ self._store.mark_running(job_id)
217
+ except Exception as exc: # noqa: BLE001
218
+ logger.error(
219
+ "[job_runner] échec mark_running sur %s : %s — abandon.",
220
+ job_id, exc,
221
+ )
222
+ return
223
+
224
+ # 3. Exécution effective.
225
+ try:
226
+ orchestrator = self._factory(output_dir)
227
+ result = orchestrator.execute(
228
+ run_spec,
229
+ report_renderer=self._report_renderer,
230
+ )
231
+ except Exception as exc: # noqa: BLE001
232
+ error_msg = f"{type(exc).__name__}: {exc}"
233
+ logger.error(
234
+ "[job_runner] job %s en échec : %s",
235
+ job_id, error_msg,
236
+ )
237
+ self._store.mark_error(job_id, error_msg)
238
+ return
239
+
240
+ # 4. Check post-exécution : annulé pendant que le run tournait ?
241
+ rec_after = self._store.get(job_id)
242
+ if rec_after is not None and rec_after.status == "cancelled":
243
+ logger.info(
244
+ "[job_runner] job %s annulé pendant l'exécution — "
245
+ "résultat discardé.", job_id,
246
+ )
247
+ return
248
+
249
+ # 5. Succès — output_path = chemin du manifest persisté.
250
+ manifest_path = result.persisted_files.get("manifest")
251
+ output_path_str = str(manifest_path) if manifest_path else ""
252
+ self._store.mark_complete(job_id, output_path=output_path_str)
253
+ logger.info("[job_runner] job %s terminé avec succès.", job_id)
254
+
255
+
256
+ __all__ = ["JobRunner", "OrchestratorFactory", "ReportRenderer"]
picarones/interfaces/web/app.py CHANGED
@@ -36,6 +36,8 @@ mount des fichiers statiques.
36
 
37
  from __future__ import annotations
38
 
 
 
39
  from dataclasses import dataclass
40
  from pathlib import Path
41
 
@@ -45,10 +47,13 @@ from fastapi.staticfiles import StaticFiles
45
  from fastapi.templating import Jinja2Templates
46
  from pydantic import BaseModel
47
 
 
 
48
  from picarones.adapters.storage import JobStore
49
  from picarones.app.services import (
50
  BenchmarkService,
51
  CorpusService,
 
52
  RegistryService,
53
  RunOrchestrator,
54
  WorkspaceManager,
@@ -98,6 +103,7 @@ class WebAppState:
98
  benchmark: BenchmarkService
99
  orchestrator: RunOrchestrator
100
  job_store: JobStore | None = None
 
101
  version: str = "1.0.0"
102
 
103
 
@@ -141,15 +147,38 @@ def create_app(state: WebAppState) -> FastAPI:
141
  f"reçu {type(state).__name__}.",
142
  )
143
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
144
  app = FastAPI(
145
  title="Picarones",
146
  description=(
147
  "Plateforme de benchmark OCR/HTR pour documents patrimoniaux. "
148
- "API du nouveau monde (Sprint A14-S35)."
149
  ),
150
  version=state.version,
151
  docs_url="/api/docs",
152
  redoc_url="/api/redoc",
 
153
  )
154
 
155
  # On stocke l'état dans app.state.picarones pour permettre aux
 
36
 
37
  from __future__ import annotations
38
 
39
+ import logging
40
+ from contextlib import asynccontextmanager
41
  from dataclasses import dataclass
42
  from pathlib import Path
43
 
 
47
  from fastapi.templating import Jinja2Templates
48
  from pydantic import BaseModel
49
 
50
+ _logger = logging.getLogger(__name__)
51
+
52
  from picarones.adapters.storage import JobStore
53
  from picarones.app.services import (
54
  BenchmarkService,
55
  CorpusService,
56
+ JobRunner,
57
  RegistryService,
58
  RunOrchestrator,
59
  WorkspaceManager,
 
103
  benchmark: BenchmarkService
104
  orchestrator: RunOrchestrator
105
  job_store: JobStore | None = None
106
+ job_runner: JobRunner | None = None
107
  version: str = "1.0.0"
108
 
109
 
 
147
  f"reçu {type(state).__name__}.",
148
  )
149
 
150
+ # Lifespan hook (S48) : nettoyage des jobs zombies au boot.
151
+ # Tout job en statut ``pending`` ou ``running`` au démarrage du
152
+ # process est forcément orphelin (le process précédent est mort
153
+ # sans le finir). On les bascule en ``interrupted`` pour ne pas
154
+ # laisser d'état mensonger sur le tableau de bord.
155
+ @asynccontextmanager
156
+ async def _lifespan(_app: FastAPI):
157
+ if state.job_store is not None:
158
+ try:
159
+ n = state.job_store.mark_orphaned_jobs_interrupted()
160
+ if n > 0:
161
+ _logger.info(
162
+ "[lifespan] %d job(s) orphelin(s) marqué(s) "
163
+ "interrupted au boot.", n,
164
+ )
165
+ except Exception as exc: # noqa: BLE001 — défense en profondeur
166
+ _logger.error(
167
+ "[lifespan] mark_orphaned_jobs_interrupted ÉCHOUÉ "
168
+ "— jobs zombies possibles : %s", exc,
169
+ )
170
+ yield
171
+
172
  app = FastAPI(
173
  title="Picarones",
174
  description=(
175
  "Plateforme de benchmark OCR/HTR pour documents patrimoniaux. "
176
+ "API du nouveau monde (Sprint A14-S35+)."
177
  ),
178
  version=state.version,
179
  docs_url="/api/docs",
180
  redoc_url="/api/redoc",
181
+ lifespan=_lifespan,
182
  )
183
 
184
  # On stocke l'état dans app.state.picarones pour permettre aux
picarones/interfaces/web/routers/jobs.py CHANGED
@@ -1,19 +1,18 @@
1
- """Router jobs — Sprint A14-S37.
2
 
3
- Endpoints de listing/lecture/cancellation des jobs de benchmark
4
- persistés via ``JobStore`` (S37, ``picarones.adapters.storage``).
5
 
6
  Endpoints
7
  ---------
8
  - ``GET /api/jobs`` : liste des jobs (récents en tête).
9
  - ``GET /api/jobs/{job_id}`` : détail + progression.
 
10
  - ``DELETE /api/jobs/{job_id}`` : annulation explicite.
11
 
12
- L'endpoint **POST /api/jobs** (création + lancement asynchrone) est
13
- volontairement reporté à un sprint dédié de l'intégration runtime
14
- il nécessite un thread d'exécution branché sur ``RunOrchestrator``
15
- (au-delà du périmètre S37 qui livre la persistance + les endpoints
16
- de lecture).
17
 
18
  Anti-sur-ingénierie
19
  -------------------
@@ -28,9 +27,14 @@ from __future__ import annotations
28
 
29
  import logging
30
 
31
- from fastapi import APIRouter, HTTPException, Request, status
32
  from pydantic import BaseModel, Field
33
 
 
 
 
 
 
34
  logger = logging.getLogger(__name__)
35
 
36
 
@@ -82,6 +86,19 @@ class JobCancelResponse(BaseModel):
82
  status: str
83
 
84
 
 
 
 
 
 
 
 
 
 
 
 
 
 
85
  # ──────────────────────────────────────────────────────────────────────
86
  # Helpers
87
  # ──────────────────────────────────────────────────────────────────────
@@ -99,6 +116,19 @@ def _require_job_store(state) -> "object":
99
  return state.job_store
100
 
101
 
 
 
 
 
 
 
 
 
 
 
 
 
 
102
  def _to_summary(rec) -> JobSummary:
103
  return JobSummary(
104
  job_id=rec.job_id,
@@ -135,6 +165,88 @@ def _to_detail(rec) -> JobDetailResponse:
135
  # ──────────────────────────────────────────────────────────────────────
136
 
137
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
138
  @router.get("", response_model=JobListResponse)
139
  async def list_jobs(request: Request) -> JobListResponse:
140
  """Liste les jobs (récents en tête)."""
 
1
+ """Router jobs — Sprints A14-S37 + S48.
2
 
3
+ Endpoints de gestion des jobs de benchmark, adossés à
4
+ ``JobStore`` (S37) + ``JobRunner`` (S48).
5
 
6
  Endpoints
7
  ---------
8
  - ``GET /api/jobs`` : liste des jobs (récents en tête).
9
  - ``GET /api/jobs/{job_id}`` : détail + progression.
10
+ - ``POST /api/jobs`` : création + lancement asynchrone.
11
  - ``DELETE /api/jobs/{job_id}`` : annulation explicite.
12
 
13
+ S37 (initial) livrait les 3 premiers (lecture + cancellation).
14
+ S48 ajoute ``POST`` qui était identifié comme **manque critique**
15
+ dans l'audit du rewrite (l'audit #2).
 
 
16
 
17
  Anti-sur-ingénierie
18
  -------------------
 
27
 
28
  import logging
29
 
30
+ from fastapi import APIRouter, Body, HTTPException, Request, status
31
  from pydantic import BaseModel, Field
32
 
33
+ from picarones.app.schemas.run_spec import (
34
+ RunSpecLoadError,
35
+ load_run_spec_from_yaml,
36
+ )
37
+
38
  logger = logging.getLogger(__name__)
39
 
40
 
 
86
  status: str
87
 
88
 
89
+ class JobSubmitResponse(BaseModel):
90
+ """Réponse JSON pour ``POST /api/jobs`` (202 Accepted)."""
91
+
92
+ job_id: str
93
+ status: str = Field(
94
+ default="pending",
95
+ description=(
96
+ "Statut au moment de la soumission. Le client poll "
97
+ "``GET /api/jobs/{job_id}`` pour suivre la progression."
98
+ ),
99
+ )
100
+
101
+
102
  # ──────────────────────────────────────────────────────────────────────
103
  # Helpers
104
  # ──────────────────────────────────────────────────────────────────────
 
116
  return state.job_store
117
 
118
 
119
+ def _require_job_runner(state) -> "object":
120
+ if state.job_runner is None:
121
+ raise HTTPException(
122
+ status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
123
+ detail=(
124
+ "Job runner non configuré dans WebAppState — "
125
+ "l'exécution asynchrone des jobs n'est pas activée. "
126
+ "Voir picarones.app.services.JobRunner pour le câblage."
127
+ ),
128
+ )
129
+ return state.job_runner
130
+
131
+
132
  def _to_summary(rec) -> JobSummary:
133
  return JobSummary(
134
  job_id=rec.job_id,
 
165
  # ──────────────────────────────────────────────────────────────────────
166
 
167
 
168
+ @router.post(
169
+ "",
170
+ response_model=JobSubmitResponse,
171
+ status_code=status.HTTP_202_ACCEPTED,
172
+ )
173
+ async def submit_job(
174
+ request: Request,
175
+ run_spec_yaml: str = Body(
176
+ ...,
177
+ media_type="text/plain",
178
+ description=(
179
+ "Contenu YAML d'un ``RunSpec`` (cf. picarones.app.schemas."
180
+ "run_spec). Le corps de la requête est le YAML brut."
181
+ ),
182
+ ),
183
+ ) -> JobSubmitResponse:
184
+ """Crée un job + lance son exécution en arrière-plan (S48).
185
+
186
+ Le corps de la requête est le YAML brut d'un ``RunSpec`` (mêmes
187
+ champs que ce que la CLI ``picarones-rewrite run`` accepte).
188
+
189
+ Comportement :
190
+
191
+ 1. Le YAML est parsé et validé (``load_run_spec_from_yaml``).
192
+ Erreur de format → 400 avec message du loader.
193
+ 2. Un ``JobRecord`` est créé en statut ``pending`` avec un
194
+ ``job_id`` UUID4.
195
+ 3. Un thread daemon est lancé pour exécuter le ``RunOrchestrator``
196
+ avec le ``RunSpec``.
197
+ 4. Réponse immédiate ``202 Accepted`` avec ``job_id`` — le
198
+ client poll ``GET /api/jobs/{job_id}`` pour suivre.
199
+
200
+ Concurrence
201
+ -----------
202
+ Un thread par job ; pas de queue/backpressure. Pour 100+ jobs
203
+ simultanés, ajouter un ``ThreadPoolExecutor`` au niveau de
204
+ ``JobRunner`` (post-livraison).
205
+ """
206
+ state = request.app.state.picarones
207
+ runner = _require_job_runner(state)
208
+
209
+ if not run_spec_yaml or not run_spec_yaml.strip():
210
+ raise HTTPException(
211
+ status_code=status.HTTP_400_BAD_REQUEST,
212
+ detail="Corps de la requête vide — YAML RunSpec attendu.",
213
+ )
214
+
215
+ try:
216
+ run_spec = load_run_spec_from_yaml(run_spec_yaml)
217
+ except RunSpecLoadError as exc:
218
+ raise HTTPException(
219
+ status_code=status.HTTP_400_BAD_REQUEST,
220
+ detail=f"RunSpec invalide : {exc}",
221
+ ) from exc
222
+
223
+ # Output dir : sous-dossier dédié au job dans le workspace. Le
224
+ # JobRunner s'en sert pour construire un RunOrchestrator isolé.
225
+ import uuid
226
+ job_id_candidate = uuid.uuid4().hex
227
+ output_dir = (
228
+ state.workspace.root / "runs" / job_id_candidate
229
+ )
230
+
231
+ try:
232
+ job_id = runner.submit(
233
+ run_spec=run_spec,
234
+ output_dir=output_dir,
235
+ job_id=job_id_candidate,
236
+ payload={"corpus_name": run_spec.corpus_name or ""},
237
+ )
238
+ except Exception as exc: # noqa: BLE001
239
+ logger.error(
240
+ "[jobs] échec de submit pour run_spec : %s", exc, exc_info=True,
241
+ )
242
+ raise HTTPException(
243
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
244
+ detail=f"Échec de soumission du job : {type(exc).__name__}",
245
+ ) from exc
246
+
247
+ return JobSubmitResponse(job_id=job_id, status="pending")
248
+
249
+
250
  @router.get("", response_model=JobListResponse)
251
  async def list_jobs(request: Request) -> JobListResponse:
252
  """Liste les jobs (récents en tête)."""
tests/app/services/test_sprint_a14_s48_job_runner.py ADDED
@@ -0,0 +1,380 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Sprint A14-S48 — ``JobRunner`` + lifespan hook + ``POST /api/jobs``.
2
+
3
+ Fix audit #2 : avant ce sprint, ``JobStore`` (S37) était à moitié
4
+ branché — pas de ``POST /api/jobs``, pas de lifespan hook, pas
5
+ d'orchestrateur async.
6
+
7
+ Tests couvrent les 3 chantiers :
8
+
9
+ 1. ``JobRunner`` (service applicatif) :
10
+ - submit + thread démarré, job marqué ``running`` puis ``complete`` ;
11
+ - exception orchestrator → ``error`` avec message ;
12
+ - cancellation pré-démarrage → thread skippe l'exécution ;
13
+ - cancellation post-démarrage → résultat discardé.
14
+
15
+ 2. Lifespan hook : ``mark_orphaned_jobs_interrupted`` appelé au boot.
16
+
17
+ 3. ``POST /api/jobs`` :
18
+ - YAML valide → 202 + job_id ;
19
+ - YAML invalide → 400 ;
20
+ - corps vide → 400 ;
21
+ - sans job_runner configuré → 503.
22
+ """
23
+
24
+ from __future__ import annotations
25
+
26
+ import time
27
+ from pathlib import Path
28
+ from unittest.mock import MagicMock
29
+
30
+ import pytest
31
+ from fastapi.testclient import TestClient
32
+
33
+ from picarones.adapters.storage import JobStore
34
+ from picarones.app.services import JobRunner
35
+ from picarones.app.services import (
36
+ RegistryService,
37
+ WorkspaceManager,
38
+ )
39
+ from picarones.interfaces.web import WebAppState, create_app
40
+
41
+
42
+ # ──────────────────────────────────────────────────────────────────────
43
+ # Stub orchestrator + factory
44
+ # ──────────────────────────────────────────────────────────────────────
45
+
46
+
47
+ class _StubOrchestrator:
48
+ """Stub qui simule un orchestrator : succès, échec, ou délai."""
49
+
50
+ def __init__(
51
+ self,
52
+ *,
53
+ manifest_path: Path,
54
+ delay_seconds: float = 0.0,
55
+ raise_on_execute: Exception | None = None,
56
+ ) -> None:
57
+ self.manifest_path = manifest_path
58
+ self.delay_seconds = delay_seconds
59
+ self.raise_on_execute = raise_on_execute
60
+ self.execute_called = False
61
+
62
+ def execute(self, run_spec, *, report_renderer=None):
63
+ self.execute_called = True
64
+ if self.delay_seconds:
65
+ time.sleep(self.delay_seconds)
66
+ if self.raise_on_execute is not None:
67
+ raise self.raise_on_execute
68
+ result = MagicMock()
69
+ result.persisted_files = {"manifest": self.manifest_path}
70
+ return result
71
+
72
+
73
+ def _make_factory(stub: _StubOrchestrator):
74
+ """Retourne une factory `(output_dir) -> stub` pour JobRunner."""
75
+ def _factory(output_dir):
76
+ return stub
77
+ return _factory
78
+
79
+
80
+ # ──────────────────────────────────────────────────────────────────────
81
+ # JobRunner unitaires
82
+ # ──────────────────────────────────────────────────────────────────────
83
+
84
+
85
+ class TestJobRunnerConstructor:
86
+ def test_rejects_non_jobstore(self) -> None:
87
+ with pytest.raises(TypeError, match="JobStore"):
88
+ JobRunner(
89
+ job_store="nope", # type: ignore[arg-type]
90
+ orchestrator_factory=lambda d: None,
91
+ )
92
+
93
+ def test_rejects_non_callable_factory(self, tmp_path: Path) -> None:
94
+ store = JobStore(tmp_path / "jobs.db")
95
+ with pytest.raises(TypeError, match="orchestrator_factory"):
96
+ JobRunner(
97
+ job_store=store,
98
+ orchestrator_factory="nope", # type: ignore[arg-type]
99
+ )
100
+
101
+ def test_rejects_non_callable_renderer(self, tmp_path: Path) -> None:
102
+ store = JobStore(tmp_path / "jobs.db")
103
+ with pytest.raises(TypeError, match="report_renderer"):
104
+ JobRunner(
105
+ job_store=store,
106
+ orchestrator_factory=lambda d: None,
107
+ report_renderer="nope", # type: ignore[arg-type]
108
+ )
109
+
110
+
111
+ class TestJobRunnerHappyPath:
112
+ def test_submit_creates_job_and_marks_complete(self, tmp_path: Path) -> None:
113
+ store = JobStore(tmp_path / "jobs.db")
114
+ manifest = tmp_path / "manifest.json"
115
+ manifest.write_text("{}", encoding="utf-8")
116
+ stub = _StubOrchestrator(manifest_path=manifest)
117
+ runner = JobRunner(store, _make_factory(stub))
118
+
119
+ job_id = runner.submit(
120
+ run_spec=MagicMock(),
121
+ output_dir=tmp_path / "run_out",
122
+ )
123
+ assert runner.wait(job_id, timeout=5.0)
124
+ assert stub.execute_called
125
+
126
+ rec = store.get(job_id)
127
+ assert rec is not None
128
+ assert rec.status == "complete"
129
+ assert rec.output_path == str(manifest)
130
+
131
+ def test_submit_returns_unique_uuid_when_no_id(
132
+ self, tmp_path: Path,
133
+ ) -> None:
134
+ store = JobStore(tmp_path / "jobs.db")
135
+ manifest = tmp_path / "manifest.json"
136
+ manifest.write_text("{}", encoding="utf-8")
137
+ stub = _StubOrchestrator(manifest_path=manifest)
138
+ runner = JobRunner(store, _make_factory(stub))
139
+
140
+ job_id_1 = runner.submit(
141
+ run_spec=MagicMock(),
142
+ output_dir=tmp_path / "out1",
143
+ )
144
+ job_id_2 = runner.submit(
145
+ run_spec=MagicMock(),
146
+ output_dir=tmp_path / "out2",
147
+ )
148
+ assert job_id_1 != job_id_2
149
+ runner.wait(job_id_1, timeout=5.0)
150
+ runner.wait(job_id_2, timeout=5.0)
151
+
152
+ def test_submit_stores_explicit_job_id(self, tmp_path: Path) -> None:
153
+ store = JobStore(tmp_path / "jobs.db")
154
+ manifest = tmp_path / "m.json"
155
+ manifest.write_text("{}", encoding="utf-8")
156
+ stub = _StubOrchestrator(manifest_path=manifest)
157
+ runner = JobRunner(store, _make_factory(stub))
158
+
159
+ returned = runner.submit(
160
+ run_spec=MagicMock(),
161
+ output_dir=tmp_path / "out",
162
+ job_id="my_explicit_id",
163
+ )
164
+ assert returned == "my_explicit_id"
165
+ runner.wait("my_explicit_id", timeout=5.0)
166
+ assert store.get("my_explicit_id") is not None
167
+
168
+
169
+ class TestJobRunnerErrorPath:
170
+ def test_orchestrator_exception_marks_error(self, tmp_path: Path) -> None:
171
+ store = JobStore(tmp_path / "jobs.db")
172
+ stub = _StubOrchestrator(
173
+ manifest_path=tmp_path / "x",
174
+ raise_on_execute=RuntimeError("orchestrator boom"),
175
+ )
176
+ runner = JobRunner(store, _make_factory(stub))
177
+
178
+ job_id = runner.submit(
179
+ run_spec=MagicMock(),
180
+ output_dir=tmp_path / "out",
181
+ )
182
+ runner.wait(job_id, timeout=5.0)
183
+
184
+ rec = store.get(job_id)
185
+ assert rec is not None
186
+ assert rec.status == "error"
187
+ assert "RuntimeError" in rec.error
188
+ assert "orchestrator boom" in rec.error
189
+
190
+
191
+ class TestJobRunnerCancellation:
192
+ def test_cancel_during_execution_discards_result(
193
+ self, tmp_path: Path,
194
+ ) -> None:
195
+ """Cancel pendant que le worker tourne → le résultat est
196
+ discardé (statut reste cancelled)."""
197
+ store = JobStore(tmp_path / "jobs.db")
198
+ manifest = tmp_path / "m.json"
199
+ manifest.write_text("{}", encoding="utf-8")
200
+ # Délai suffisant pour cancel avant complétion.
201
+ stub = _StubOrchestrator(
202
+ manifest_path=manifest, delay_seconds=0.3,
203
+ )
204
+ runner = JobRunner(store, _make_factory(stub))
205
+
206
+ job_id = runner.submit(
207
+ run_spec=MagicMock(),
208
+ output_dir=tmp_path / "out",
209
+ )
210
+ # Attendre que mark_running ait été appelé (le thread a démarré).
211
+ for _ in range(50):
212
+ time.sleep(0.01)
213
+ rec = store.get(job_id)
214
+ if rec is not None and rec.status == "running":
215
+ break
216
+ # Cancel en pleine exécution.
217
+ store.mark_cancelled(job_id)
218
+ # Attendre la fin du thread (~0.3s).
219
+ runner.wait(job_id, timeout=5.0)
220
+ rec_final = store.get(job_id)
221
+ assert rec_final.status == "cancelled", (
222
+ f"Status final attendu cancelled, obtenu {rec_final.status}"
223
+ )
224
+
225
+
226
+ # ──────────────────────────────────────────────────────────────────────
227
+ # Lifespan hook (mark_orphaned_jobs_interrupted au boot)
228
+ # ──────────────────────────────────────────────────────────────────────
229
+
230
+
231
+ class TestLifespanHook:
232
+ def test_orphaned_jobs_marked_interrupted_on_app_start(
233
+ self, tmp_path: Path,
234
+ ) -> None:
235
+ """Pré-condition : un job ``running`` existe dans le store
236
+ (simule un crash du process précédent).
237
+ Action : démarrage de l'app FastAPI (lifespan hook).
238
+ Résultat : le job orphelin est marqué ``interrupted``."""
239
+ # Phase 1 : pré-pollution du store (simule l'état après crash).
240
+ db_path = tmp_path / "jobs.db"
241
+ store = JobStore(db_path)
242
+ store.create("zombie_pending")
243
+ store.create("zombie_running")
244
+ store.mark_running("zombie_running")
245
+ store.create("complete_one")
246
+ store.mark_complete("complete_one")
247
+ # Vérification pré-état.
248
+ assert store.get("zombie_pending").status == "pending"
249
+ assert store.get("zombie_running").status == "running"
250
+ assert store.get("complete_one").status == "complete"
251
+
252
+ # Phase 2 : démarrage de l'app — lifespan hook s'exécute.
253
+ workspace = WorkspaceManager(base_dir=tmp_path, session_id="s48")
254
+ registry = RegistryService.bootstrap_defaults()
255
+ state = WebAppState(
256
+ workspace=workspace,
257
+ registry=registry,
258
+ corpus=MagicMock(),
259
+ benchmark=MagicMock(),
260
+ orchestrator=MagicMock(),
261
+ job_store=store, # store pré-pollué
262
+ )
263
+ app = create_app(state)
264
+ # Le lifespan hook tourne au context manager du TestClient.
265
+ with TestClient(app) as client:
266
+ # Le hook a tourné au démarrage. On vérifie l'état du store.
267
+ assert store.get("zombie_pending").status == "interrupted"
268
+ assert store.get("zombie_running").status == "interrupted"
269
+ # Les jobs déjà terminaux ne sont pas touchés.
270
+ assert store.get("complete_one").status == "complete"
271
+ # Sanity check : l'app répond.
272
+ assert client.get("/health").status_code == 200
273
+
274
+
275
+ # ──────────────────────────────────────────────────────────────────────
276
+ # POST /api/jobs (intégration end-to-end via TestClient)
277
+ # ──────────────────────────────────────────────────────────────────────
278
+
279
+
280
+ def _make_state_with_runner(tmp_path: Path) -> WebAppState:
281
+ """Construit un WebAppState complet avec JobStore + JobRunner.
282
+
283
+ L'orchestrator est un stub qui complète immédiatement (pour que
284
+ les tests POST puissent vérifier le statut).
285
+ """
286
+ workspace = WorkspaceManager(base_dir=tmp_path, session_id="s48")
287
+ registry = RegistryService.bootstrap_defaults()
288
+ job_store = JobStore(tmp_path / "jobs.db")
289
+
290
+ manifest_path = tmp_path / "manifest.json"
291
+ manifest_path.write_text("{}", encoding="utf-8")
292
+
293
+ # Stub orchestrator factory.
294
+ def _factory(output_dir):
295
+ return _StubOrchestrator(manifest_path=manifest_path)
296
+
297
+ job_runner = JobRunner(
298
+ job_store=job_store,
299
+ orchestrator_factory=_factory,
300
+ )
301
+ return WebAppState(
302
+ workspace=workspace,
303
+ registry=registry,
304
+ corpus=MagicMock(),
305
+ benchmark=MagicMock(),
306
+ orchestrator=MagicMock(),
307
+ job_store=job_store,
308
+ job_runner=job_runner,
309
+ )
310
+
311
+
312
+ _VALID_RUNSPEC_YAML = """
313
+ corpus_dir: /tmp/c
314
+ output_dir: /tmp/out
315
+ pipelines:
316
+ - name: ocr_only
317
+ initial_inputs: [image]
318
+ steps:
319
+ - id: ocr
320
+ adapter_class: my_pkg.OCR
321
+ input_types: [image]
322
+ output_types: [raw_text]
323
+ views: [text_final]
324
+ """.strip()
325
+
326
+
327
+ class TestPostJobsEndpoint:
328
+ def test_valid_yaml_returns_202_with_job_id(self, tmp_path: Path) -> None:
329
+ state = _make_state_with_runner(tmp_path)
330
+ app = create_app(state)
331
+ with TestClient(app) as client:
332
+ response = client.post("/api/jobs", content=_VALID_RUNSPEC_YAML)
333
+ assert response.status_code == 202, response.text
334
+ body = response.json()
335
+ assert "job_id" in body
336
+ assert body["status"] == "pending"
337
+ # Le job_id retourné est dans le store.
338
+ assert state.job_store.get(body["job_id"]) is not None
339
+
340
+ def test_invalid_yaml_returns_400(self, tmp_path: Path) -> None:
341
+ state = _make_state_with_runner(tmp_path)
342
+ app = create_app(state)
343
+ with TestClient(app) as client:
344
+ response = client.post(
345
+ "/api/jobs",
346
+ content="not a valid runspec yaml: [",
347
+ )
348
+ assert response.status_code == 400
349
+ assert "RunSpec" in response.json()["detail"]
350
+
351
+ def test_empty_body_returns_400_or_422(self, tmp_path: Path) -> None:
352
+ """Body vide → 400 (notre check) ou 422 (pydantic validation
353
+ en amont du handler). Les deux sont acceptables pour
354
+ l'utilisateur."""
355
+ state = _make_state_with_runner(tmp_path)
356
+ app = create_app(state)
357
+ with TestClient(app) as client:
358
+ response = client.post("/api/jobs", content="")
359
+ # FastAPI/Starlette peut valider Body(...) en 422 avant
360
+ # d'atteindre notre handler ; sinon notre check répond 400.
361
+ assert response.status_code in (400, 422)
362
+
363
+ def test_no_job_runner_returns_503(self, tmp_path: Path) -> None:
364
+ """Sans WebAppState.job_runner, POST /api/jobs → 503."""
365
+ workspace = WorkspaceManager(base_dir=tmp_path, session_id="s48")
366
+ registry = RegistryService.bootstrap_defaults()
367
+ state = WebAppState(
368
+ workspace=workspace,
369
+ registry=registry,
370
+ corpus=MagicMock(),
371
+ benchmark=MagicMock(),
372
+ orchestrator=MagicMock(),
373
+ job_store=JobStore(tmp_path / "jobs.db"),
374
+ # job_runner=None par défaut
375
+ )
376
+ app = create_app(state)
377
+ with TestClient(app) as client:
378
+ response = client.post("/api/jobs", content=_VALID_RUNSPEC_YAML)
379
+ assert response.status_code == 503
380
+ assert "Job runner" in response.json()["detail"]