Spaces:
Sleeping
Sleeping
| """Garde-fous sur la stratégie de migration de schéma du ``JobStore``. | |
| L'audit S58 a identifié que la table ``schema_version`` était une | |
| coquille vide : aucun dispatcher de migrations, aucun warning si | |
| ``existing < SCHEMA_VERSION``, aucun test E2E. Ces tests verrouillent | |
| le contrat : | |
| 1. Si ``SCHEMA_VERSION = N``, alors ``_MIGRATIONS`` doit contenir | |
| les clés ``0..N-1`` (toute base v0..N-1 doit pouvoir migrer | |
| ascendamment vers N). | |
| 2. Une base à une version intermédiaire est migrée jusqu'à | |
| ``SCHEMA_VERSION``. | |
| 3. Une migration manquante est une erreur dure (pas un warning). | |
| """ | |
| from __future__ import annotations | |
| import sqlite3 | |
| from pathlib import Path | |
| import pytest | |
| from picarones.adapters.storage.job_store import ( | |
| _MIGRATIONS, | |
| JobStore, | |
| JobStoreError, | |
| ) | |
| def test_migrations_dispatcher_covers_all_intermediate_versions() -> None: | |
| """``_MIGRATIONS`` couvre toutes les transitions ``v_n → v_{n+1}`` | |
| pour ``n`` de 1 à ``SCHEMA_VERSION - 1``. | |
| Si ``SCHEMA_VERSION = 1``, le dispatcher peut être vide (pas | |
| encore de migrations). Si ``SCHEMA_VERSION = 3``, le dispatcher | |
| doit avoir les clés 1 et 2. | |
| """ | |
| for from_v in range(1, JobStore.SCHEMA_VERSION): | |
| assert from_v in _MIGRATIONS, ( | |
| f"Migration manquante : v{from_v} → v{from_v + 1}. " | |
| f"SCHEMA_VERSION = {JobStore.SCHEMA_VERSION} mais " | |
| f"``_MIGRATIONS[{from_v}]`` est absent." | |
| ) | |
| def test_fresh_db_writes_current_schema_version(tmp_path: Path) -> None: | |
| """Une DB neuve persiste ``SCHEMA_VERSION`` en clair.""" | |
| JobStore(tmp_path / "fresh.sqlite") | |
| with sqlite3.connect(str(tmp_path / "fresh.sqlite")) as conn: | |
| cur = conn.execute("SELECT version FROM schema_version") | |
| version = cur.fetchone()[0] | |
| assert version == JobStore.SCHEMA_VERSION | |
| def test_db_at_current_version_opens_idempotently(tmp_path: Path) -> None: | |
| """Réouvrir une DB à la même version est un no-op (pas de | |
| double-INSERT, pas de migration spurieuse). | |
| """ | |
| db = tmp_path / "idem.sqlite" | |
| JobStore(db) | |
| JobStore(db) # ne doit pas lever | |
| with sqlite3.connect(str(db)) as conn: | |
| cur = conn.execute("SELECT COUNT(*) FROM schema_version") | |
| n = cur.fetchone()[0] | |
| assert n == 1, "schema_version ne doit avoir qu'une ligne." | |
| def test_db_at_future_version_rejected(tmp_path: Path) -> None: | |
| """Une DB écrite par un binaire futur est rejetée (downgrade | |
| non supporté).""" | |
| db = tmp_path / "future.sqlite" | |
| JobStore(db) | |
| with sqlite3.connect(str(db)) as conn: | |
| conn.execute( | |
| "UPDATE schema_version SET version = ?", | |
| (JobStore.SCHEMA_VERSION + 99,), | |
| ) | |
| conn.commit() | |
| with pytest.raises(JobStoreError, match="Downgrade non supporté"): | |
| JobStore(db) | |
| def test_missing_migration_is_hard_error(tmp_path: Path) -> None: | |
| """Si ``existing < SCHEMA_VERSION`` mais qu'aucune migration n'est | |
| enregistrée pour la version intermédiaire, ``JobStoreError``. | |
| Ce test simule SCHEMA_VERSION = 99 sans entrée dans _MIGRATIONS | |
| en patchant directement. Garantie : on ne laisse jamais une base | |
| dans un état schématiquement incohérent silencieusement. | |
| """ | |
| db = tmp_path / "stale.sqlite" | |
| JobStore(db) # crée v1 | |
| # Patch in-test : prétendons que le code attend v99. | |
| original = JobStore.SCHEMA_VERSION | |
| JobStore.SCHEMA_VERSION = 99 | |
| try: | |
| with pytest.raises(JobStoreError, match="migration manquante"): | |
| JobStore(db) | |
| finally: | |
| JobStore.SCHEMA_VERSION = original | |
| def test_migration_chain_applied(tmp_path: Path) -> None: | |
| """Si SCHEMA_VERSION saute de N versions, toutes les migrations | |
| intermédiaires sont appliquées dans l'ordre. | |
| Simule une migration v1 → v2 fictive enregistrée temporairement. | |
| """ | |
| db = tmp_path / "chain.sqlite" | |
| JobStore(db) # v1 | |
| applied: list[int] = [] | |
| def fake_v1_to_v2(conn: sqlite3.Connection) -> None: | |
| applied.append(1) | |
| original_version = JobStore.SCHEMA_VERSION | |
| JobStore.SCHEMA_VERSION = 2 | |
| _MIGRATIONS[1] = fake_v1_to_v2 | |
| try: | |
| JobStore(db) # déclenche v1 → v2 | |
| assert applied == [1] | |
| with sqlite3.connect(str(db)) as conn: | |
| cur = conn.execute("SELECT version FROM schema_version") | |
| assert cur.fetchone()[0] == 2 | |
| finally: | |
| JobStore.SCHEMA_VERSION = original_version | |
| _MIGRATIONS.pop(1, None) | |