Spaces:
Sleeping
Sleeping
| """Persistance SQLite des jobs de benchmark (Sprint 26). | |
| Avant le Sprint 26, l'état des benchmarks vivait uniquement en mémoire dans | |
| ``picarones.web.app._JOBS``. Trois conséquences : | |
| 1. Un redémarrage du worker uvicorn (OOM, déploiement, ``kill -HUP``) | |
| perdait l'état de tous les benchmarks en cours et un client SSE qui se | |
| reconnectait recevait un ``404`` cohérent. | |
| 2. Le ``asyncio.Queue(maxsize=200)`` des SSE perdait silencieusement des | |
| événements (``put_nowait`` swallow ``QueueFull``). | |
| 3. Aucune trace pour debug si un benchmark se figeait — pas d'historique | |
| au-delà de ce que ``BenchmarkJob.events`` portait en RAM. | |
| Le Sprint 26 adresse les trois en persistant les jobs et leurs événements | |
| dans une base SQLite locale (cohérent avec ``picarones.core.history``, | |
| qui utilise déjà SQLite). La base joue trois rôles : | |
| - **Source de vérité** pour le statut/progression d'un job — ``BenchmarkJob`` | |
| reste comme cache RAM mais n'est plus le ground truth. | |
| - **Backlog d'événements** pour les SSE — un client qui reprend la | |
| connexion envoie ``Last-Event-ID`` et reçoit tous les événements de | |
| ``seq > last_seq`` puis bascule en streaming live. | |
| - **Détection des jobs orphelins** au boot — tout job ``running`` à | |
| l'initialisation de l'app est marqué ``interrupted`` (le processus | |
| précédent est mort sans le finir). | |
| Conventions | |
| ----------- | |
| - Une seule base par instance (``./jobs.db`` par défaut, configurable via | |
| l'env var ``PICARONES_JOBS_DB``). | |
| - Mode WAL activé pour permettre les lectures concurrentes pendant qu'un | |
| thread écrit la progression. | |
| - Chaque appel ouvre une nouvelle connexion, refermée en fin d'appel : | |
| il n'y a pas de pool de connexions ; ouvrir/fermer est sub-milliseconde. | |
| - Les ``data`` d'événement et le ``payload`` du job sont stockés en JSON | |
| texte (``ensure_ascii=False`` pour la lisibilité dans ``sqlite3 .dump``). | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| import os | |
| import sqlite3 | |
| import threading | |
| import time | |
| import uuid | |
| from contextlib import contextmanager | |
| from pathlib import Path | |
| from typing import Any, Iterator, Optional | |
logger = logging.getLogger(__name__)

# Terminal statuses: ``JobStore.set_status`` stamps ``finished_at`` when a job
# enters one of these.
_TERMINAL_STATUSES: frozenset[str] = frozenset(
    ("complete", "error", "cancelled", "interrupted")
)

# Live statuses: jobs still in one of these at boot are treated as orphans.
_LIVE_STATUSES: frozenset[str] = frozenset(("pending", "running"))
# DDL executed by ``JobStore._init_schema``. Every statement uses
# ``IF NOT EXISTS`` so the script can be replayed on an existing database.
#
# NOTE(review): ``job_events_seq_idx`` covers the same columns as the
# ``job_events`` primary key, for which SQLite already maintains an index;
# it looks redundant -- confirm before dropping it from deployed databases.
_SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS jobs (
    job_id TEXT PRIMARY KEY,
    status TEXT NOT NULL DEFAULT 'pending',
    progress REAL NOT NULL DEFAULT 0.0,
    current_engine TEXT NOT NULL DEFAULT '',
    total_docs INTEGER NOT NULL DEFAULT 0,
    processed_docs INTEGER NOT NULL DEFAULT 0,
    output_path TEXT NOT NULL DEFAULT '',
    error TEXT NOT NULL DEFAULT '',
    payload_json TEXT NOT NULL DEFAULT '{}',
    created_at REAL NOT NULL,
    updated_at REAL NOT NULL,
    finished_at REAL
);
CREATE TABLE IF NOT EXISTS job_events (
    job_id TEXT NOT NULL,
    seq INTEGER NOT NULL,
    kind TEXT NOT NULL,
    data_json TEXT NOT NULL,
    ts REAL NOT NULL,
    PRIMARY KEY (job_id, seq)
);
CREATE INDEX IF NOT EXISTS job_events_seq_idx ON job_events(job_id, seq);
CREATE INDEX IF NOT EXISTS jobs_status_idx ON jobs(status);
CREATE INDEX IF NOT EXISTS jobs_created_idx ON jobs(created_at);
"""
def _default_db_path() -> Path:
    """Return the default database path.

    The ``PICARONES_JOBS_DB`` environment variable wins when it is set to a
    non-empty value; otherwise the path is ``./jobs.db``.
    """
    # An unset and an empty variable both fall through to the default.
    return Path(os.environ.get("PICARONES_JOBS_DB") or "./jobs.db")
class JobStore:
    """SQLite backend that persists benchmark jobs and their events.

    Every operation opens its own short-lived connection, so one instance can
    be used from several threads without ever sharing a connection object.
    """

    def __init__(self, db_path: str | Path | None = None):
        """Bind the store to a database file and create the schema if needed.

        Args:
            db_path: Location of the SQLite file. When ``None`` the path is
                taken from ``_default_db_path()``.
        """
        self._path = Path(db_path) if db_path is not None else _default_db_path()
        self._init_lock = threading.Lock()
        self._init_schema()

    # NOTE(review): this accessor may originally have been a ``@property``
    # (decorator lines seem to be missing from this copy of the file); it is
    # kept as a plain method here -- confirm against the callers.
    def path(self) -> Path:
        """Return the filesystem path of the SQLite database."""
        return self._path

    # ---- Connection ------------------------------------------------------

    @contextmanager
    def _conn(self) -> Iterator[sqlite3.Connection]:
        """Yield a fresh connection that is closed when the ``with`` block ends.

        The connection runs in autocommit mode (``isolation_level=None``):
        each statement commits on its own unless the caller opens an explicit
        transaction (see ``_transaction``).
        """
        # ``check_same_thread=False`` is safe because the connection never
        # outlives the call that opened it and is therefore never shared.
        conn = sqlite3.connect(
            str(self._path),
            isolation_level=None,
            check_same_thread=False,
        )
        conn.row_factory = sqlite3.Row
        try:
            conn.execute("PRAGMA journal_mode = WAL")
            conn.execute("PRAGMA synchronous = NORMAL")
            conn.execute("PRAGMA foreign_keys = ON")
            yield conn
        finally:
            conn.close()

    @contextmanager
    def _transaction(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection wrapped in a ``BEGIN IMMEDIATE`` transaction.

        The write lock is taken up front so that a read-then-write sequence
        cannot interleave with another writer. The transaction is committed
        on normal exit and rolled back if the body raises.
        """
        with self._conn() as conn:
            conn.execute("BEGIN IMMEDIATE")
            try:
                yield conn
            except BaseException:
                # SQLite may already have rolled back on its own for some
                # errors; only roll back if a transaction is still open so
                # the original exception is not masked.
                if conn.in_transaction:
                    conn.execute("ROLLBACK")
                raise
            else:
                conn.execute("COMMIT")

    def _init_schema(self) -> None:
        """Create the parent directory and the tables/indexes if missing."""
        with self._init_lock:
            # The caller may have passed a nested path that does not exist yet.
            self._path.parent.mkdir(parents=True, exist_ok=True)
            with self._conn() as conn:
                conn.executescript(_SCHEMA_SQL)

    @staticmethod
    def _decode_job_row(row: sqlite3.Row) -> dict:
        """Turn a ``jobs`` row into a dict, replacing ``payload_json`` by ``payload``.

        An undecodable payload is replaced by an empty dict rather than
        raising, so one corrupted row cannot break a listing.
        """
        job = dict(row)
        try:
            job["payload"] = json.loads(job.pop("payload_json", "{}"))
        except json.JSONDecodeError:
            job["payload"] = {}
        return job

    # ---- Job operations --------------------------------------------------

    def create_job(
        self,
        job_id: Optional[str] = None,
        payload: Optional[dict] = None,
    ) -> str:
        """Insert a new ``pending`` job and return its ``job_id``.

        Args:
            job_id: Identifier to use; a random UUID4 string is generated
                when it is missing or empty.
            payload: JSON-serialisable data stored with the job (``{}`` when
                omitted).

        Raises:
            sqlite3.IntegrityError: if ``job_id`` already exists.
        """
        jid = job_id or str(uuid.uuid4())
        now = time.time()
        with self._conn() as conn:
            conn.execute(
                """
                INSERT INTO jobs
                (job_id, status, payload_json, created_at, updated_at)
                VALUES (?, 'pending', ?, ?, ?)
                """,
                (jid, json.dumps(payload or {}, ensure_ascii=False), now, now),
            )
        return jid

    def get_job(self, job_id: str) -> Optional[dict]:
        """Return the job as a dict (with a decoded ``payload``), or ``None``."""
        with self._conn() as conn:
            row = conn.execute(
                "SELECT * FROM jobs WHERE job_id = ?", (job_id,)
            ).fetchone()
        return None if row is None else self._decode_job_row(row)

    def list_jobs(self, limit: int = 100, status: Optional[str] = None) -> list[dict]:
        """Return up to ``limit`` jobs, most recently created first.

        Args:
            limit: Maximum number of rows returned.
            status: When given (and non-empty), only jobs with this status.
        """
        with self._conn() as conn:
            if status:
                rows = conn.execute(
                    "SELECT * FROM jobs WHERE status = ? ORDER BY created_at DESC LIMIT ?",
                    (status, limit),
                ).fetchall()
            else:
                rows = conn.execute(
                    "SELECT * FROM jobs ORDER BY created_at DESC LIMIT ?", (limit,)
                ).fetchall()
        return [self._decode_job_row(row) for row in rows]

    def update_progress(
        self,
        job_id: str,
        *,
        progress: Optional[float] = None,
        current_engine: Optional[str] = None,
        total_docs: Optional[int] = None,
        processed_docs: Optional[int] = None,
        output_path: Optional[str] = None,
    ) -> None:
        """Update the progress columns of a job; ``None`` arguments are ignored.

        ``updated_at`` is refreshed whenever at least one column is written.
        Nothing is executed when every argument is ``None``.
        """
        updates: dict[str, Any] = {}
        if progress is not None:
            updates["progress"] = float(progress)
        if current_engine is not None:
            updates["current_engine"] = current_engine
        if total_docs is not None:
            updates["total_docs"] = int(total_docs)
        if processed_docs is not None:
            updates["processed_docs"] = int(processed_docs)
        if output_path is not None:
            updates["output_path"] = output_path
        if not updates:
            return
        updates["updated_at"] = time.time()

        # Column names come from the fixed keys above, never from the caller,
        # so interpolating them into the statement is safe.
        assignments = ", ".join(f"{column} = ?" for column in updates)
        with self._conn() as conn:
            conn.execute(
                f"UPDATE jobs SET {assignments} WHERE job_id = ?",
                (*updates.values(), job_id),
            )

    def set_status(
        self,
        job_id: str,
        status: str,
        *,
        error: Optional[str] = None,
    ) -> None:
        """Set the status of a job.

        Args:
            job_id: Job to update.
            status: New status. When it is one of ``_TERMINAL_STATUSES``,
                ``finished_at`` is stamped with the current time; otherwise
                the stored ``finished_at`` is left as it is.
            error: New error message; ``None`` keeps the stored one.
        """
        # NOTE(review): nothing here prevents a job from leaving a terminal
        # status -- confirm whether callers rely on that.
        now = time.time()
        finished_at = now if status in _TERMINAL_STATUSES else None
        with self._conn() as conn:
            conn.execute(
                """
                UPDATE jobs
                SET status = ?, error = COALESCE(?, error),
                    updated_at = ?,
                    finished_at = COALESCE(?, finished_at)
                WHERE job_id = ?
                """,
                (status, error, now, finished_at, job_id),
            )

    def mark_orphaned_jobs_interrupted(self) -> int:
        """Flag every ``pending``/``running`` job as ``interrupted``.

        Intended to be called at boot, on the assumption that no job can
        really be alive when a new process starts. Jobs with an empty
        ``error`` receive a default message; an existing message is kept.

        Returns:
            The number of jobs that were updated.
        """
        now = time.time()
        with self._conn() as conn:
            cursor = conn.execute(
                """
                UPDATE jobs
                SET status = 'interrupted',
                    updated_at = ?,
                    finished_at = ?,
                    error = CASE
                        WHEN error = '' THEN 'Job interrompu par redémarrage du serveur.'
                        ELSE error
                    END
                WHERE status IN ('pending', 'running')
                """,
                (now, now),
            )
            count = cursor.rowcount
        if count > 0:
            logger.warning(
                "[jobs] %d job(s) orphelin(s) marqué(s) 'interrupted' au boot.", count
            )
        return count

    def cleanup_old(self, retention_days: int = 7) -> int:
        """Delete jobs finished more than ``retention_days`` days ago.

        Events are removed in the same transaction, so a crash between the
        two statements cannot leave events without their job.

        Returns:
            The number of jobs deleted.
        """
        cutoff = time.time() - retention_days * 86400.0
        with self._transaction() as conn:
            cursor = conn.execute(
                """
                DELETE FROM jobs
                WHERE finished_at IS NOT NULL AND finished_at < ?
                """,
                (cutoff,),
            )
            removed = cursor.rowcount
            # Manual cascade (the schema declares no foreign key). This also
            # removes events whose job_id has no row in ``jobs`` at all.
            conn.execute(
                """
                DELETE FROM job_events
                WHERE job_id NOT IN (SELECT job_id FROM jobs)
                """
            )
        return removed

    # ---- Events ----------------------------------------------------------

    def append_event(self, job_id: str, kind: str, data: Any) -> int:
        """Append an event to a job and return its sequence number (>= 1).

        The sequence number is read and the row inserted inside a single
        ``BEGIN IMMEDIATE`` transaction, so concurrent writers cannot be
        handed the same ``seq`` for one job.

        Args:
            job_id: Job the event belongs to.
            kind: Event type label.
            data: JSON-serialisable payload.
        """
        # Serialise first: a non-serialisable payload then fails before any
        # write lock is taken.
        data_json = json.dumps(data, ensure_ascii=False)
        with self._transaction() as conn:
            row = conn.execute(
                "SELECT COALESCE(MAX(seq), 0) FROM job_events WHERE job_id = ?",
                (job_id,),
            ).fetchone()
            next_seq = int(row[0]) + 1
            conn.execute(
                """
                INSERT INTO job_events (job_id, seq, kind, data_json, ts)
                VALUES (?, ?, ?, ?, ?)
                """,
                (job_id, next_seq, kind, data_json, time.time()),
            )
        return next_seq

    def get_events_after(self, job_id: str, last_seq: int = 0) -> list[dict]:
        """Return the events with ``seq > last_seq``, in ascending ``seq`` order.

        Each event is a dict with the keys ``seq``, ``kind``, ``data`` and
        ``ts``. Undecodable ``data`` is replaced by an empty dict.
        """
        with self._conn() as conn:
            rows = conn.execute(
                """
                SELECT seq, kind, data_json, ts
                FROM job_events
                WHERE job_id = ? AND seq > ?
                ORDER BY seq ASC
                """,
                (job_id, int(last_seq)),
            ).fetchall()

        events: list[dict] = []
        for row in rows:
            try:
                data = json.loads(row["data_json"])
            except json.JSONDecodeError:
                data = {}
            events.append({
                "seq": int(row["seq"]),
                "kind": row["kind"],
                "data": data,
                "ts": float(row["ts"]),
            })
        return events

    def count_events(self, job_id: str) -> int:
        """Return the number of events stored for ``job_id``."""
        with self._conn() as conn:
            row = conn.execute(
                "SELECT COUNT(*) FROM job_events WHERE job_id = ?", (job_id,)
            ).fetchone()
        return int(row[0]) if row else 0
# ---------------------------------------------------------------------------
# Lazy singleton -- makes the store easy to import from web/app.py.
# ---------------------------------------------------------------------------

# Guards creation and reset of the shared instance below.
_default_lock = threading.Lock()
# Shared ``JobStore``; stays ``None`` until ``get_default_store`` builds it.
_default_store: Optional[JobStore] = None
def get_default_store() -> JobStore:
    """Return the shared ``JobStore``, creating it on first use.

    The instance is built with ``JobStore()`` (no explicit path) while
    holding ``_default_lock``, so concurrent first calls create only one
    store.
    """
    global _default_store
    with _default_lock:
        store = _default_store
        if store is None:
            store = _default_store = JobStore()
    return store
def reset_default_store() -> None:
    """Forget the shared store so the next ``get_default_store`` rebuilds it.

    Used by the tests. The previous instance, if any, is simply dropped.
    """
    global _default_store
    _default_lock.acquire()
    try:
        _default_store = None
    finally:
        _default_lock.release()