Picarones / tests /adapters /storage /test_job_store_migrations.py
Claude
refactor: audit S58 — 2 HIGH + 4 MED + 4 LOW bricolages corrigés
c21d686 unverified
Raw
History Blame
4.5 kB
"""Garde-fous sur la stratégie de migration de schéma du ``JobStore``.
L'audit S58 a identifié que la table ``schema_version`` était une
coquille vide : aucun dispatcher de migrations, aucun warning si
``existing < SCHEMA_VERSION``, aucun test E2E. Ces tests verrouillent
le contrat :
1. Si ``SCHEMA_VERSION = N``, alors ``_MIGRATIONS`` doit contenir
les clés ``0..N-1`` (toute base v0..N-1 doit pouvoir migrer
ascendamment vers N).
2. Une base à une version intermédiaire est migrée jusqu'à
``SCHEMA_VERSION``.
3. Une migration manquante est une erreur dure (pas un warning).
"""
from __future__ import annotations
import sqlite3
from pathlib import Path
import pytest
from picarones.adapters.storage.job_store import (
_MIGRATIONS,
JobStore,
JobStoreError,
)
def test_migrations_dispatcher_covers_all_intermediate_versions() -> None:
"""``_MIGRATIONS`` couvre toutes les transitions ``v_n → v_{n+1}``
pour ``n`` de 1 à ``SCHEMA_VERSION - 1``.
Si ``SCHEMA_VERSION = 1``, le dispatcher peut être vide (pas
encore de migrations). Si ``SCHEMA_VERSION = 3``, le dispatcher
doit avoir les clés 1 et 2.
"""
for from_v in range(1, JobStore.SCHEMA_VERSION):
assert from_v in _MIGRATIONS, (
f"Migration manquante : v{from_v} → v{from_v + 1}. "
f"SCHEMA_VERSION = {JobStore.SCHEMA_VERSION} mais "
f"``_MIGRATIONS[{from_v}]`` est absent."
)
def test_fresh_db_writes_current_schema_version(tmp_path: Path) -> None:
"""Une DB neuve persiste ``SCHEMA_VERSION`` en clair."""
JobStore(tmp_path / "fresh.sqlite")
with sqlite3.connect(str(tmp_path / "fresh.sqlite")) as conn:
cur = conn.execute("SELECT version FROM schema_version")
version = cur.fetchone()[0]
assert version == JobStore.SCHEMA_VERSION
def test_db_at_current_version_opens_idempotently(tmp_path: Path) -> None:
"""Réouvrir une DB à la même version est un no-op (pas de
double-INSERT, pas de migration spurieuse).
"""
db = tmp_path / "idem.sqlite"
JobStore(db)
JobStore(db) # ne doit pas lever
with sqlite3.connect(str(db)) as conn:
cur = conn.execute("SELECT COUNT(*) FROM schema_version")
n = cur.fetchone()[0]
assert n == 1, "schema_version ne doit avoir qu'une ligne."
def test_db_at_future_version_rejected(tmp_path: Path) -> None:
"""Une DB écrite par un binaire futur est rejetée (downgrade
non supporté)."""
db = tmp_path / "future.sqlite"
JobStore(db)
with sqlite3.connect(str(db)) as conn:
conn.execute(
"UPDATE schema_version SET version = ?",
(JobStore.SCHEMA_VERSION + 99,),
)
conn.commit()
with pytest.raises(JobStoreError, match="Downgrade non supporté"):
JobStore(db)
def test_missing_migration_is_hard_error(tmp_path: Path) -> None:
"""Si ``existing < SCHEMA_VERSION`` mais qu'aucune migration n'est
enregistrée pour la version intermédiaire, ``JobStoreError``.
Ce test simule SCHEMA_VERSION = 99 sans entrée dans _MIGRATIONS
en patchant directement. Garantie : on ne laisse jamais une base
dans un état schématiquement incohérent silencieusement.
"""
db = tmp_path / "stale.sqlite"
JobStore(db) # crée v1
# Patch in-test : prétendons que le code attend v99.
original = JobStore.SCHEMA_VERSION
JobStore.SCHEMA_VERSION = 99
try:
with pytest.raises(JobStoreError, match="migration manquante"):
JobStore(db)
finally:
JobStore.SCHEMA_VERSION = original
def test_migration_chain_applied(tmp_path: Path) -> None:
"""Si SCHEMA_VERSION saute de N versions, toutes les migrations
intermédiaires sont appliquées dans l'ordre.
Simule une migration v1 → v2 fictive enregistrée temporairement.
"""
db = tmp_path / "chain.sqlite"
JobStore(db) # v1
applied: list[int] = []
def fake_v1_to_v2(conn: sqlite3.Connection) -> None:
applied.append(1)
original_version = JobStore.SCHEMA_VERSION
JobStore.SCHEMA_VERSION = 2
_MIGRATIONS[1] = fake_v1_to_v2
try:
JobStore(db) # déclenche v1 → v2
assert applied == [1]
with sqlite3.connect(str(db)) as conn:
cur = conn.execute("SELECT version FROM schema_version")
assert cur.fetchone()[0] == 2
finally:
JobStore.SCHEMA_VERSION = original_version
_MIGRATIONS.pop(1, None)