Picarones / tests /web /test_sprint26_jobs_persistence.py
Claude
feat(sprint-F)!: web/ → interfaces/web/_legacy/ (Phase 9)
f53c0aa unverified
raw
history blame
14.6 kB
"""Tests Sprint 26 — persistance SQLite des jobs + reprise SSE.
Le Sprint 26 introduit :
1. ``picarones/core/jobs.py`` — ``JobStore`` SQLite avec API
``create_job``, ``update_progress``, ``set_status``,
``append_event``, ``get_events_after``, ``get_job``,
``mark_orphaned_jobs_interrupted``, ``cleanup_old``.
2. Intégration dans ``picarones/web/app.py`` :
- ``BenchmarkJob`` reçoit un ``_store`` et persiste ses événements
+ sa progression à chaque ``add_event``.
- ``set_status`` remplace les mutations directes ``job.status = ...``
pour assurer la cohérence DB.
- SSE endpoint accepte le header ``Last-Event-ID`` et rejoue depuis
la DB ; émet ``id: <seq>`` dans chaque message.
- ``GET /status`` fallback à la DB si le job n'est plus en RAM.
- Hook ``startup`` marque les jobs vivants en base comme
``interrupted``.
"""
from __future__ import annotations
import sqlite3
import time
import pytest
from fastapi.testclient import TestClient
from picarones.interfaces.web._legacy.jobs import JobStore, get_default_store, reset_default_store
# ---------------------------------------------------------------------------
# 1. JobStore CRUD basique
# ---------------------------------------------------------------------------
@pytest.fixture
def store(tmp_path) -> JobStore:
return JobStore(tmp_path / "jobs.db")
class TestJobStoreCRUD:
def test_create_returns_uuid_when_no_id(self, store):
jid = store.create_job()
assert isinstance(jid, str) and len(jid) == 36 # UUID4 canonique
def test_create_uses_provided_id(self, store):
jid = store.create_job(job_id="custom-id-42")
assert jid == "custom-id-42"
def test_get_returns_none_for_unknown(self, store):
assert store.get_job("ne-pas-exister") is None
def test_get_returns_pending_status_after_create(self, store):
jid = store.create_job()
job = store.get_job(jid)
assert job is not None
assert job["status"] == "pending"
assert job["progress"] == 0.0
assert job["payload"] == {}
def test_payload_round_trip(self, store):
payload = {"engines": ["tesseract"], "lang": "fra", "n": 12}
jid = store.create_job(payload=payload)
assert store.get_job(jid)["payload"] == payload
def test_update_progress_partial(self, store):
jid = store.create_job()
store.update_progress(jid, progress=0.5, current_engine="tesseract")
job = store.get_job(jid)
assert job["progress"] == 0.5
assert job["current_engine"] == "tesseract"
# Les champs non passés restent à zéro
assert job["total_docs"] == 0
def test_set_status_to_complete_sets_finished_at(self, store):
jid = store.create_job()
before = time.time()
store.set_status(jid, "complete")
job = store.get_job(jid)
assert job["status"] == "complete"
assert job["finished_at"] is not None
assert job["finished_at"] >= before
def test_set_status_running_does_not_set_finished(self, store):
jid = store.create_job()
store.set_status(jid, "running")
assert store.get_job(jid)["finished_at"] is None
def test_set_status_with_error_message(self, store):
jid = store.create_job()
store.set_status(jid, "error", error="OOM dans Tesseract")
job = store.get_job(jid)
assert job["status"] == "error"
assert "OOM" in job["error"]
def test_list_jobs_returns_descending(self, store):
jids = [store.create_job() for _ in range(3)]
# Petit délai pour différencier les ``created_at``
listed = [j["job_id"] for j in store.list_jobs()]
# Les plus récents en tête
assert listed[0] == jids[-1]
# ---------------------------------------------------------------------------
# 2. Événements et reprise via seq
# ---------------------------------------------------------------------------
class TestJobStoreEvents:
def test_append_event_returns_increasing_seq(self, store):
jid = store.create_job()
s1 = store.append_event(jid, "log", {"msg": "a"})
s2 = store.append_event(jid, "log", {"msg": "b"})
s3 = store.append_event(jid, "log", {"msg": "c"})
assert (s1, s2, s3) == (1, 2, 3)
def test_append_event_isolates_per_job(self, store):
a = store.create_job()
b = store.create_job()
store.append_event(a, "log", {})
store.append_event(a, "log", {})
store.append_event(b, "log", {})
# Le seq de chaque job redémarre à 1
evs_b = store.get_events_after(b, last_seq=0)
assert [e["seq"] for e in evs_b] == [1]
def test_get_events_after_skips_seen(self, store):
jid = store.create_job()
for i in range(5):
store.append_event(jid, "log", {"i": i})
new = store.get_events_after(jid, last_seq=2)
assert [e["seq"] for e in new] == [3, 4, 5]
assert new[0]["data"] == {"i": 2}
def test_get_events_after_zero_returns_all(self, store):
jid = store.create_job()
store.append_event(jid, "log", {})
store.append_event(jid, "log", {})
assert len(store.get_events_after(jid, last_seq=0)) == 2
def test_get_events_after_handles_unicode(self, store):
jid = store.create_job()
store.append_event(jid, "log", {"msg": "Médiéval & œ"})
ev = store.get_events_after(jid, last_seq=0)[0]
assert ev["data"]["msg"] == "Médiéval & œ"
def test_count_events(self, store):
jid = store.create_job()
for _ in range(7):
store.append_event(jid, "log", {})
assert store.count_events(jid) == 7
# ---------------------------------------------------------------------------
# 3. Détection des orphelins au boot
# ---------------------------------------------------------------------------
class TestMarkOrphanedJobs:
def test_running_job_becomes_interrupted(self, store):
jid = store.create_job()
store.set_status(jid, "running")
n = store.mark_orphaned_jobs_interrupted()
assert n == 1
job = store.get_job(jid)
assert job["status"] == "interrupted"
assert "interrompu" in job["error"].lower()
assert job["finished_at"] is not None
def test_completed_job_is_not_touched(self, store):
jid = store.create_job()
store.set_status(jid, "complete")
before = store.get_job(jid)["finished_at"]
store.mark_orphaned_jobs_interrupted()
after = store.get_job(jid)
assert after["status"] == "complete"
assert after["finished_at"] == before
def test_no_running_jobs_returns_zero(self, store):
store.create_job() # pending
n = store.mark_orphaned_jobs_interrupted()
# 'pending' est aussi considéré vivant — il devient interrupted lui aussi
assert n == 1
def test_existing_error_message_preserved(self, store):
jid = store.create_job()
store.set_status(jid, "running")
# Simule une erreur déjà enregistrée par une autre route
with sqlite3.connect(str(store.path), isolation_level=None) as c:
c.execute("UPDATE jobs SET error = ? WHERE job_id = ?", ("ma erreur", jid))
store.mark_orphaned_jobs_interrupted()
# L'erreur existante n'est pas écrasée par "interrompu par redémarrage"
assert store.get_job(jid)["error"] == "ma erreur"
# ---------------------------------------------------------------------------
# 4. Cleanup
# ---------------------------------------------------------------------------
class TestCleanup:
def test_old_finished_jobs_removed(self, store):
jid = store.create_job()
store.set_status(jid, "complete")
# Antedate la fin pour simuler un vieux job
with sqlite3.connect(str(store.path), isolation_level=None) as c:
c.execute(
"UPDATE jobs SET finished_at = ? WHERE job_id = ?",
(time.time() - 30 * 86400, jid),
)
removed = store.cleanup_old(retention_days=7)
assert removed == 1
assert store.get_job(jid) is None
def test_recent_jobs_kept(self, store):
jid = store.create_job()
store.set_status(jid, "complete")
removed = store.cleanup_old(retention_days=7)
assert removed == 0
assert store.get_job(jid) is not None
def test_running_jobs_never_cleaned(self, store):
jid = store.create_job()
# finished_at IS NULL
store.cleanup_old(retention_days=0)
assert store.get_job(jid) is not None
# ---------------------------------------------------------------------------
# 5. Singleton paresseux
# ---------------------------------------------------------------------------
class TestDefaultStore:
def test_default_store_is_singleton(self, monkeypatch, tmp_path):
monkeypatch.setenv("PICARONES_JOBS_DB", str(tmp_path / "x.db"))
reset_default_store()
a = get_default_store()
b = get_default_store()
assert a is b
reset_default_store()
def test_env_var_drives_path(self, monkeypatch, tmp_path):
target = tmp_path / "custom.db"
monkeypatch.setenv("PICARONES_JOBS_DB", str(target))
reset_default_store()
s = get_default_store()
assert s.path == target
s.create_job()
assert target.exists()
reset_default_store()
# ---------------------------------------------------------------------------
# 6. Intégration FastAPI : SSE Last-Event-ID + status fallback
# ---------------------------------------------------------------------------
@pytest.fixture
def client_with_isolated_store(monkeypatch, tmp_path):
"""Réinitialise le ``JOB_STORE`` global de l'app vers un fichier vierge."""
db = tmp_path / "jobs.db"
monkeypatch.setenv("PICARONES_JOBS_DB", str(db))
reset_default_store()
from picarones.interfaces.web._legacy import app as web_app
from picarones.interfaces.web._legacy import state as web_state
web_state.JOB_STORE = get_default_store()
# Vide aussi le cache RAM des jobs
web_state.JOBS.clear()
return TestClient(web_app.app), web_state
class TestStatusFallbackToDB:
def test_status_falls_back_to_db_after_ram_eviction(self, client_with_isolated_store):
client, web_state = client_with_isolated_store
# Crée un job directement en base (simule un job d'un précédent worker)
jid = web_state.JOB_STORE.create_job(job_id="ghost-1")
web_state.JOB_STORE.set_status(jid, "complete")
web_state.JOB_STORE.update_progress(jid, progress=1.0, total_docs=10, processed_docs=10)
r = client.get(f"/api/benchmark/{jid}/status")
assert r.status_code == 200, r.text
d = r.json()
assert d["job_id"] == jid
assert d["status"] == "complete"
assert d["progress"] == 1.0
assert d["total_docs"] == 10
def test_status_404_for_truly_unknown_job(self, client_with_isolated_store):
client, _ = client_with_isolated_store
r = client.get("/api/benchmark/never-existed/status")
assert r.status_code == 404
class TestSSEReplay:
def test_sse_replays_backlog_for_finished_job(self, client_with_isolated_store):
client, web_state = client_with_isolated_store
jid = web_state.JOB_STORE.create_job(job_id="replay-1")
web_state.JOB_STORE.append_event(jid, "log", {"msg": "hello"})
web_state.JOB_STORE.append_event(jid, "log", {"msg": "world"})
web_state.JOB_STORE.set_status(jid, "complete")
web_state.JOB_STORE.append_event(jid, "complete", {"output_html": "/tmp/x.html"})
with client.stream("GET", f"/api/benchmark/{jid}/stream") as r:
assert r.status_code == 200
text = "".join(chunk for chunk in r.iter_text())
assert "hello" in text
assert "world" in text
# Les seq doivent être présents (Last-Event-ID resumability)
assert "id: 1" in text
assert "id: 2" in text
# Marqueur de fin
assert "event: done" in text or "event: complete" in text
def test_sse_resumes_from_last_event_id(self, client_with_isolated_store):
client, web_state = client_with_isolated_store
jid = web_state.JOB_STORE.create_job(job_id="resume-1")
for i in range(5):
web_state.JOB_STORE.append_event(jid, "log", {"i": i})
web_state.JOB_STORE.set_status(jid, "complete")
# Reprise depuis seq=3 — on doit recevoir uniquement 4 et 5.
with client.stream(
"GET",
f"/api/benchmark/{jid}/stream",
headers={"Last-Event-ID": "3"},
) as r:
text = "".join(chunk for chunk in r.iter_text())
assert "id: 4" in text
assert "id: 5" in text
# Les anciens événements ne doivent PAS être réémis
assert "id: 1" not in text
assert "id: 2" not in text
def test_sse_invalid_last_event_id_falls_back_to_zero(self, client_with_isolated_store):
client, web_state = client_with_isolated_store
jid = web_state.JOB_STORE.create_job(job_id="bad-header")
web_state.JOB_STORE.append_event(jid, "log", {"i": 1})
web_state.JOB_STORE.set_status(jid, "complete")
with client.stream(
"GET",
f"/api/benchmark/{jid}/stream",
headers={"Last-Event-ID": "not-a-number"},
) as r:
text = "".join(chunk for chunk in r.iter_text())
# Doit envoyer le backlog complet sans crasher
assert "id: 1" in text
class TestStartupOrphansHook:
def test_startup_marks_running_jobs_interrupted(self, monkeypatch, tmp_path):
db = tmp_path / "jobs.db"
# Préparer la DB avec un job 'running' avant import de l'app
s = JobStore(db)
jid = s.create_job(job_id="orphan-1")
s.set_status(jid, "running")
monkeypatch.setenv("PICARONES_JOBS_DB", str(db))
reset_default_store()
# Forcer le startup hook via TestClient context manager
from picarones.interfaces.web._legacy import app as web_app
from picarones.interfaces.web._legacy import state as web_state
web_state.JOB_STORE = get_default_store()
with TestClient(web_app.app):
pass # __enter__ déclenche startup, __exit__ shutdown
job = web_state.JOB_STORE.get_job(jid)
assert job["status"] == "interrupted"