File size: 13,716 Bytes
890e849
 
 
0137610
890e849
 
 
 
 
 
 
 
 
 
e407ec0
890e849
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
"""Persistance SQLite des jobs de benchmark (Sprint 26).

Avant le Sprint 26, l'état des benchmarks vivait uniquement en mémoire dans
``picarones.interfaces.web.app._JOBS``. Trois conséquences :

1. Un redémarrage du worker uvicorn (OOM, déploiement, ``kill -HUP``)
   perdait l'état de tous les benchmarks en cours et un client SSE qui se
   reconnectait recevait un ``404`` cohérent.
2. Le ``asyncio.Queue(maxsize=200)`` des SSE perdait silencieusement des
   événements (``put_nowait`` swallow ``QueueFull``).
3. Aucune trace pour debug si un benchmark se figeait — pas d'historique
   au-delà de ce que ``BenchmarkJob.events`` portait en RAM.

Le Sprint 26 adresse les trois en persistant les jobs et leurs événements
dans une base SQLite locale (cohérent avec ``picarones.evaluation.metrics.history``,
qui utilise déjà SQLite). La base joue trois rôles :

- **Source de vérité** pour le statut/progression d'un job — ``BenchmarkJob``
  reste comme cache RAM mais n'est plus le ground truth.
- **Backlog d'événements** pour les SSE — un client qui reprend la
  connexion envoie ``Last-Event-ID`` et reçoit tous les événements de
  ``seq > last_seq`` puis bascule en streaming live.
- **Détection des jobs orphelins** au boot — tout job ``running`` à
  l'initialisation de l'app est marqué ``interrupted`` (le processus
  précédent est mort sans le finir).

Conventions
-----------
- Une seule base par instance (``./jobs.db`` par défaut, configurable via
  l'env var ``PICARONES_JOBS_DB``).
- Mode WAL activé pour permettre les lectures concurrentes pendant qu'un
  thread écrit la progression.
- Chaque appel ouvre une nouvelle connexion : SQLite gère lui-même le
  pool ; ouvrir/fermer est sub-milliseconde.
- Les ``data`` d'événement et le ``payload`` du job sont stockés en JSON
  texte (``ensure_ascii=False`` pour la lisibilité dans ``sqlite3 .dump``).
"""

from __future__ import annotations

import json
import logging
import os
import sqlite3
import threading
import time
import uuid
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Iterator, Optional

logger = logging.getLogger(__name__)


# Statuts terminaux : un job dans cet état ne peut plus changer.
_TERMINAL_STATUSES: frozenset[str] = frozenset({
    "complete", "error", "cancelled", "interrupted"
})

# Statuts vivants : un job dans cet état est marqué orphelin au boot.
_LIVE_STATUSES: frozenset[str] = frozenset({"pending", "running"})


_SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS jobs (
    job_id          TEXT PRIMARY KEY,
    status          TEXT NOT NULL DEFAULT 'pending',
    progress        REAL NOT NULL DEFAULT 0.0,
    current_engine  TEXT NOT NULL DEFAULT '',
    total_docs      INTEGER NOT NULL DEFAULT 0,
    processed_docs  INTEGER NOT NULL DEFAULT 0,
    output_path     TEXT NOT NULL DEFAULT '',
    error           TEXT NOT NULL DEFAULT '',
    payload_json    TEXT NOT NULL DEFAULT '{}',
    created_at      REAL NOT NULL,
    updated_at      REAL NOT NULL,
    finished_at     REAL
);

CREATE TABLE IF NOT EXISTS job_events (
    job_id   TEXT NOT NULL,
    seq      INTEGER NOT NULL,
    kind     TEXT NOT NULL,
    data_json TEXT NOT NULL,
    ts       REAL NOT NULL,
    PRIMARY KEY (job_id, seq)
);

CREATE INDEX IF NOT EXISTS job_events_seq_idx ON job_events(job_id, seq);
CREATE INDEX IF NOT EXISTS jobs_status_idx ON jobs(status);
CREATE INDEX IF NOT EXISTS jobs_created_idx ON jobs(created_at);
"""


def _default_db_path() -> Path:
    """Chemin par défaut, surchargeable via ``PICARONES_JOBS_DB``."""
    env = os.environ.get("PICARONES_JOBS_DB")
    if env:
        return Path(env)
    return Path("./jobs.db")


class JobStore:
    """Backend SQLite thread-safe pour la persistance des jobs."""

    def __init__(self, db_path: str | Path | None = None):
        self._path = Path(db_path) if db_path is not None else _default_db_path()
        self._init_lock = threading.Lock()
        self._init_schema()

    @property
    def path(self) -> Path:
        return self._path

    # ---- Connexion -------------------------------------------------------

    @contextmanager
    def _conn(self) -> Iterator[sqlite3.Connection]:
        # ``check_same_thread=False`` parce qu'on ouvre une connexion par
        # appel — la sécurité vient de ne pas partager la connexion entre
        # threads. ``isolation_level=None`` pour gérer nous-mêmes les
        # transactions via ``BEGIN``/``COMMIT``.
        c = sqlite3.connect(
            str(self._path),
            isolation_level=None,
            check_same_thread=False,
        )
        c.row_factory = sqlite3.Row
        try:
            c.execute("PRAGMA journal_mode = WAL")
            c.execute("PRAGMA synchronous = NORMAL")
            c.execute("PRAGMA foreign_keys = ON")
            yield c
        finally:
            c.close()

    def _init_schema(self) -> None:
        # Crée le parent si l'utilisateur a passé un chemin imbriqué.
        with self._init_lock:
            self._path.parent.mkdir(parents=True, exist_ok=True)
            with self._conn() as c:
                c.executescript(_SCHEMA_SQL)

    # ---- Opérations sur les jobs -----------------------------------------

    def create_job(
        self,
        job_id: Optional[str] = None,
        payload: Optional[dict] = None,
    ) -> str:
        """Crée un job ``pending`` ; retourne son ``job_id``."""
        jid = job_id or str(uuid.uuid4())
        now = time.time()
        with self._conn() as c:
            c.execute(
                """
                INSERT INTO jobs
                  (job_id, status, payload_json, created_at, updated_at)
                VALUES (?, 'pending', ?, ?, ?)
                """,
                (jid, json.dumps(payload or {}, ensure_ascii=False), now, now),
            )
        return jid

    def get_job(self, job_id: str) -> Optional[dict]:
        with self._conn() as c:
            row = c.execute(
                "SELECT * FROM jobs WHERE job_id = ?", (job_id,)
            ).fetchone()
        if row is None:
            return None
        d = dict(row)
        try:
            d["payload"] = json.loads(d.pop("payload_json", "{}"))
        except json.JSONDecodeError:
            d["payload"] = {}
        return d

    def list_jobs(self, limit: int = 100, status: Optional[str] = None) -> list[dict]:
        with self._conn() as c:
            if status:
                rows = c.execute(
                    "SELECT * FROM jobs WHERE status = ? ORDER BY created_at DESC LIMIT ?",
                    (status, limit),
                ).fetchall()
            else:
                rows = c.execute(
                    "SELECT * FROM jobs ORDER BY created_at DESC LIMIT ?", (limit,)
                ).fetchall()
        out = []
        for r in rows:
            d = dict(r)
            try:
                d["payload"] = json.loads(d.pop("payload_json", "{}"))
            except json.JSONDecodeError:
                d["payload"] = {}
            out.append(d)
        return out

    def update_progress(
        self,
        job_id: str,
        *,
        progress: Optional[float] = None,
        current_engine: Optional[str] = None,
        total_docs: Optional[int] = None,
        processed_docs: Optional[int] = None,
        output_path: Optional[str] = None,
    ) -> None:
        """Met à jour les champs de progression d'un job (les ``None`` sont ignorés)."""
        fields: list[str] = []
        values: list[Any] = []
        if progress is not None:
            fields.append("progress = ?")
            values.append(float(progress))
        if current_engine is not None:
            fields.append("current_engine = ?")
            values.append(current_engine)
        if total_docs is not None:
            fields.append("total_docs = ?")
            values.append(int(total_docs))
        if processed_docs is not None:
            fields.append("processed_docs = ?")
            values.append(int(processed_docs))
        if output_path is not None:
            fields.append("output_path = ?")
            values.append(output_path)
        if not fields:
            return
        fields.append("updated_at = ?")
        values.append(time.time())
        values.append(job_id)
        with self._conn() as c:
            c.execute(
                f"UPDATE jobs SET {', '.join(fields)} WHERE job_id = ?",
                values,
            )

    def set_status(
        self,
        job_id: str,
        status: str,
        *,
        error: Optional[str] = None,
    ) -> None:
        now = time.time()
        finished_at = now if status in _TERMINAL_STATUSES else None
        with self._conn() as c:
            c.execute(
                """
                UPDATE jobs
                SET status = ?, error = COALESCE(?, error),
                    updated_at = ?,
                    finished_at = COALESCE(?, finished_at)
                WHERE job_id = ?
                """,
                (status, error, now, finished_at, job_id),
            )

    def mark_orphaned_jobs_interrupted(self) -> int:
        """À appeler au boot : passe en ``interrupted`` les jobs en vie.

        Sous l'hypothèse qu'au boot d'un nouveau processus, *aucun* job
        ne peut être réellement vivant — le précédent worker a forcément
        été tué entre-temps. Retourne le nombre de jobs marqués.
        """
        now = time.time()
        with self._conn() as c:
            cur = c.execute(
                """
                UPDATE jobs
                SET status = 'interrupted',
                    updated_at = ?,
                    finished_at = ?,
                    error = CASE
                      WHEN error = '' THEN 'Job interrompu par redémarrage du serveur.'
                      ELSE error
                    END
                WHERE status IN ('pending', 'running')
                """,
                (now, now),
            )
            count = cur.rowcount
        if count > 0:
            logger.warning(
                "[jobs] %d job(s) orphelin(s) marqué(s) 'interrupted' au boot.", count
            )
        return count

    def cleanup_old(self, retention_days: int = 7) -> int:
        """Supprime les jobs terminés depuis plus de ``retention_days`` jours."""
        cutoff = time.time() - retention_days * 86400.0
        with self._conn() as c:
            cur = c.execute(
                """
                DELETE FROM jobs
                WHERE finished_at IS NOT NULL AND finished_at < ?
                """,
                (cutoff,),
            )
            removed = cur.rowcount
            # Cascade manuelle (pas de FK ON DELETE — schéma léger).
            c.execute(
                """
                DELETE FROM job_events
                WHERE job_id NOT IN (SELECT job_id FROM jobs)
                """
            )
        return removed

    # ---- Événements ------------------------------------------------------

    def append_event(self, job_id: str, kind: str, data: Any) -> int:
        """Ajoute un événement et retourne son numéro de séquence (>= 1)."""
        with self._conn() as c:
            row = c.execute(
                "SELECT COALESCE(MAX(seq), 0) FROM job_events WHERE job_id = ?",
                (job_id,),
            ).fetchone()
            next_seq = int(row[0]) + 1 if row else 1
            c.execute(
                """
                INSERT INTO job_events (job_id, seq, kind, data_json, ts)
                VALUES (?, ?, ?, ?, ?)
                """,
                (job_id, next_seq, kind, json.dumps(data, ensure_ascii=False), time.time()),
            )
        return next_seq

    def get_events_after(self, job_id: str, last_seq: int = 0) -> list[dict]:
        """Retourne les événements ``seq > last_seq``, triés croissant."""
        with self._conn() as c:
            rows = c.execute(
                """
                SELECT seq, kind, data_json, ts
                FROM job_events
                WHERE job_id = ? AND seq > ?
                ORDER BY seq ASC
                """,
                (job_id, int(last_seq)),
            ).fetchall()
        out: list[dict] = []
        for r in rows:
            try:
                data = json.loads(r["data_json"])
            except json.JSONDecodeError:
                data = {}
            out.append({
                "seq": int(r["seq"]),
                "kind": r["kind"],
                "data": data,
                "ts": float(r["ts"]),
            })
        return out

    def count_events(self, job_id: str) -> int:
        with self._conn() as c:
            row = c.execute(
                "SELECT COUNT(*) FROM job_events WHERE job_id = ?", (job_id,)
            ).fetchone()
        return int(row[0]) if row else 0


# ---------------------------------------------------------------------------
# Singleton paresseux — facilite l'import depuis web/app.py.
# ---------------------------------------------------------------------------

_default_store: Optional[JobStore] = None
_default_lock = threading.Lock()


def get_default_store() -> JobStore:
    """Retourne (ou crée) le ``JobStore`` par défaut.

    Le chemin est lu depuis ``PICARONES_JOBS_DB`` à la première création.
    """
    global _default_store
    with _default_lock:
        if _default_store is None:
            _default_store = JobStore()
        return _default_store


def reset_default_store() -> None:
    """Réinitialise le store par défaut (utilisé par les tests)."""
    global _default_store
    with _default_lock:
        _default_store = None