Claude commited on
Commit
31f753b
·
unverified ·
1 Parent(s): 0c91c9b

fix(web): JOBS registry thread-safe via helpers state.register_job / get_job_in_memory

Browse files

Audit a révélé une race condition asymétrique sur le registre
``state.JOBS`` :

- ``cleanup_old_jobs()`` itérait sous ``JOBS_LOCK``,
- Mais les 6 accès dans les routeurs (``benchmark.py``,
``synthesis.py``) faisaient ``state.JOBS[id] = job`` et
``state.JOBS.get(id)`` **sans verrou**.

Conséquence : sous charge concurrente, ``cleanup_old_jobs()``
itérant sur ``JOBS.items()`` pouvait lever
``RuntimeError: dictionary changed size during iteration`` quand
un autre thread insérait un nouveau job. Le GIL protège
l'atomicité d'une opération ``dict[k] = v``, pas la cohérence
d'une boucle.

Fix : trois helpers thread-safe ajoutés à ``state.py`` :

- ``register_job(job)`` — ``JOBS[job.job_id] = job`` sous lock
- ``get_job_in_memory(job_id) -> Optional[BenchmarkJob]`` — ``JOBS.get(...)`` sous lock
- ``unregister_job(job_id)`` — ``JOBS.pop(..., None)`` sous lock (idempotent)

Tous les routeurs utilisent désormais ces helpers ; ``state.JOBS``
n'est plus accédé directement depuis l'extérieur de ``state.py``
(sauf par les fixtures de tests qui font ``state.JOBS.clear()``,
opération atomique sans itération concurrente).

Discipline d'accès documentée dans la docstring de ``state.JOBS``.

Pytest : 3354 passed, 2 skipped, 0 failed. Ruff : All checks passed.

https://claude.ai/code/session_01Hsd7kL8yeCbXn1mA7GQK9L

picarones/web/routers/benchmark.py CHANGED
@@ -87,7 +87,7 @@ async def api_benchmark_start(req: BenchmarkRequest, request: Request) -> dict:
87
  job_id = str(uuid.uuid4())
88
  job = state.BenchmarkJob(job_id=job_id, _store=state.JOB_STORE)
89
  state.JOB_STORE.create_job(job_id)
90
- state.JOBS[job_id] = job
91
  state.cleanup_old_jobs()
92
 
93
  _start_job_thread(job, run_benchmark_thread, req)
@@ -136,7 +136,7 @@ async def api_benchmark_run(req: BenchmarkRunRequest, request: Request) -> dict:
136
  job_id = str(uuid.uuid4())
137
  job = state.BenchmarkJob(job_id=job_id, _store=state.JOB_STORE)
138
  state.JOB_STORE.create_job(job_id)
139
- state.JOBS[job_id] = job
140
 
141
  _start_job_thread(job, run_benchmark_thread_v2, req)
142
  return {"job_id": job_id, "status": "pending"}
@@ -149,7 +149,7 @@ async def api_benchmark_run(req: BenchmarkRunRequest, request: Request) -> dict:
149
  @router.get("/api/benchmark/{job_id}/status")
150
  async def api_benchmark_status(job_id: str) -> dict:
151
  """Statut courant d'un job (RAM si disponible, sinon DB)."""
152
- job = state.JOBS.get(job_id)
153
  if job is not None:
154
  return job.as_dict()
155
  # Sprint 26 — fallback DB : le job n'est pas (plus) en RAM dans ce
@@ -174,7 +174,7 @@ async def api_benchmark_status(job_id: str) -> dict:
174
  @router.post("/api/benchmark/{job_id}/cancel")
175
  async def api_benchmark_cancel(job_id: str) -> dict:
176
  """Annule un job en cours (no-op si déjà terminé)."""
177
- job = state.JOBS.get(job_id)
178
  if not job:
179
  raise HTTPException(status_code=404, detail=f"Job non trouvé : {job_id}")
180
  if job.status in ("complete", "error"):
@@ -216,7 +216,7 @@ async def api_benchmark_stream(job_id: str, request: Request) -> StreamingRespon
216
  except ValueError:
217
  last_seq = 0
218
 
219
- job = state.JOBS.get(job_id)
220
  db_job = state.JOB_STORE.get_job(job_id)
221
  if job is None and db_job is None:
222
  raise HTTPException(status_code=404, detail=f"Job non trouvé : {job_id}")
 
87
  job_id = str(uuid.uuid4())
88
  job = state.BenchmarkJob(job_id=job_id, _store=state.JOB_STORE)
89
  state.JOB_STORE.create_job(job_id)
90
+ state.register_job(job)
91
  state.cleanup_old_jobs()
92
 
93
  _start_job_thread(job, run_benchmark_thread, req)
 
136
  job_id = str(uuid.uuid4())
137
  job = state.BenchmarkJob(job_id=job_id, _store=state.JOB_STORE)
138
  state.JOB_STORE.create_job(job_id)
139
+ state.register_job(job)
140
 
141
  _start_job_thread(job, run_benchmark_thread_v2, req)
142
  return {"job_id": job_id, "status": "pending"}
 
149
  @router.get("/api/benchmark/{job_id}/status")
150
  async def api_benchmark_status(job_id: str) -> dict:
151
  """Statut courant d'un job (RAM si disponible, sinon DB)."""
152
+ job = state.get_job_in_memory(job_id)
153
  if job is not None:
154
  return job.as_dict()
155
  # Sprint 26 — fallback DB : le job n'est pas (plus) en RAM dans ce
 
174
  @router.post("/api/benchmark/{job_id}/cancel")
175
  async def api_benchmark_cancel(job_id: str) -> dict:
176
  """Annule un job en cours (no-op si déjà terminé)."""
177
+ job = state.get_job_in_memory(job_id)
178
  if not job:
179
  raise HTTPException(status_code=404, detail=f"Job non trouvé : {job_id}")
180
  if job.status in ("complete", "error"):
 
216
  except ValueError:
217
  last_seq = 0
218
 
219
+ job = state.get_job_in_memory(job_id)
220
  db_job = state.JOB_STORE.get_job(job_id)
221
  if job is None and db_job is None:
222
  raise HTTPException(status_code=404, detail=f"Job non trouvé : {job_id}")
picarones/web/routers/synthesis.py CHANGED
@@ -37,7 +37,7 @@ async def api_benchmark_synthesis_preview(job_id: str, lang: str = "fr") -> dict
37
  lang = "fr"
38
 
39
  # Statut courant : RAM si dispo, sinon DB.
40
- ram_job = state.JOBS.get(job_id)
41
  db_job = state.JOB_STORE.get_job(job_id)
42
  if ram_job is None and db_job is None:
43
  raise HTTPException(status_code=404, detail=f"Job non trouvé : {job_id}")
 
37
  lang = "fr"
38
 
39
  # Statut courant : RAM si dispo, sinon DB.
40
+ ram_job = state.get_job_in_memory(job_id)
41
  db_job = state.JOB_STORE.get_job(job_id)
42
  if ram_job is None and db_job is None:
43
  raise HTTPException(status_code=404, detail=f"Job non trouvé : {job_id}")
picarones/web/state.py CHANGED
@@ -219,7 +219,15 @@ class BenchmarkJob:
219
  # ──────────────────────────────────────────────────────────────────────────
220
 
221
  JOBS: dict[str, BenchmarkJob] = {}
222
- """Registre en mémoire des jobs (par ``job_id``)."""
 
 
 
 
 
 
 
 
223
 
224
  JOBS_MAX = 100
225
  """Nombre maximum de jobs conservés en mémoire avant nettoyage."""
@@ -227,6 +235,29 @@ JOBS_MAX = 100
227
  JOBS_LOCK = threading.Lock()
228
 
229
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
230
  def cleanup_old_jobs() -> None:
231
  """Supprime les jobs terminés les plus anciens si on dépasse ``JOBS_MAX``."""
232
  with JOBS_LOCK:
@@ -256,5 +287,8 @@ __all__ = [
256
  "JOBS",
257
  "JOBS_MAX",
258
  "JOBS_LOCK",
 
 
 
259
  "cleanup_old_jobs",
260
  ]
 
219
  # ──────────────────────────────────────────────────────────────────────────
220
 
221
  JOBS: dict[str, BenchmarkJob] = {}
222
+ """Registre en mémoire des jobs (par ``job_id``).
223
+
224
+ **Discipline d'accès** : tous les ``read`` et ``write`` doivent passer
225
+ par les helpers ``register_job``, ``get_job_in_memory``,
226
+ ``unregister_job`` qui prennent ``JOBS_LOCK``. Lire ou muter ce dict
227
+ sans verrou expose à un ``RuntimeError: dictionary changed size
228
+ during iteration`` sous charge concurrente (le GIL protège l'atomicité
229
+ d'une opération mais pas la cohérence d'une boucle).
230
+ """
231
 
232
  JOBS_MAX = 100
233
  """Nombre maximum de jobs conservés en mémoire avant nettoyage."""
 
235
  JOBS_LOCK = threading.Lock()
236
 
237
 
238
+ def register_job(job: BenchmarkJob) -> None:
239
+ """Enregistre ``job`` dans le registre mémoire (thread-safe)."""
240
+ with JOBS_LOCK:
241
+ JOBS[job.job_id] = job
242
+
243
+
244
+ def get_job_in_memory(job_id: str) -> Optional[BenchmarkJob]:
245
+ """Récupère un ``BenchmarkJob`` du registre mémoire (thread-safe).
246
+
247
+ Retourne ``None`` si le job n'est pas (ou plus) en RAM. Les
248
+ consommateurs qui veulent un fallback DB doivent appeler
249
+ ``JOB_STORE.get_job(job_id)`` séparément.
250
+ """
251
+ with JOBS_LOCK:
252
+ return JOBS.get(job_id)
253
+
254
+
255
+ def unregister_job(job_id: str) -> None:
256
+ """Retire un job du registre mémoire (thread-safe ; idempotent)."""
257
+ with JOBS_LOCK:
258
+ JOBS.pop(job_id, None)
259
+
260
+
261
  def cleanup_old_jobs() -> None:
262
  """Supprime les jobs terminés les plus anciens si on dépasse ``JOBS_MAX``."""
263
  with JOBS_LOCK:
 
287
  "JOBS",
288
  "JOBS_MAX",
289
  "JOBS_LOCK",
290
+ "register_job",
291
+ "get_job_in_memory",
292
+ "unregister_job",
293
  "cleanup_old_jobs",
294
  ]