"""
Eventual Consistency & DLQ — Orsync Scenarist v7.0 PRD Constraints
============================================================
Targets: Transactional Outbox, Idempotency (Redis sets), Dead-Letter Queue.
Uses fakeredis for deterministic, infrastructure-free testing.
"""
from __future__ import annotations

import json
import uuid
from unittest.mock import MagicMock, patch

import pytest

from backend.app.core.idempotency import IdempotencyStore
from backend.app.core.dlq import DeadLetterQueue
from backend.app.core.outbox import TransactionalOutbox


def _make_available_now(redis_client, outbox, event_id):
    """Reset ``available_at`` to 0 for *event_id* in the outbox hash.

    Reads the JSON entry stored under ``outbox.outbox_key`` / *event_id*,
    sets its ``available_at`` field to 0 and writes it back.  Does nothing
    when the hash has no entry for *event_id*.

    Returns:
        True if an entry was found and rewritten, False otherwise.
    """
    raw = redis_client.hget(outbox.outbox_key, event_id)
    if raw is None:
        return False
    entry = json.loads(raw)
    entry["available_at"] = 0
    redis_client.hset(outbox.outbox_key, event_id, json.dumps(entry))
    return True


# ═══════════════════════════════════════════════════════════════════
# 1. Idempotency — duplicate UUIDv4 must NOT produce double injection
# ═══════════════════════════════════════════════════════════════════


class TestIdempotencyGuard:
    """PRD §8: Two identical requests with the same UUIDv4 must be
    intercepted on the second attempt."""

    def test_first_event_is_processed(self, fake_redis, idempotency_store, dlq):
        """A freshly enqueued event is dispatched and marked as processed."""
        outbox = TransactionalOutbox(fake_redis, idempotency_store, dlq)
        event = outbox.enqueue("gold.ingest", {"doctor_id": "D001"})

        dispatched = outbox.dispatch_once()

        assert dispatched is True
        assert idempotency_store.is_processed(event["event_id"])

    def test_duplicate_event_is_skipped(self, fake_redis, idempotency_store, dlq):
        """Re-inserting an already processed event must not grow the stream."""
        outbox = TransactionalOutbox(fake_redis, idempotency_store, dlq)
        event = outbox.enqueue("gold.ingest", {"doctor_id": "D001"})
        outbox.dispatch_once()
        assert idempotency_store.is_processed(event["event_id"])

        # Put the very same event back into the outbox hash by hand, with
        # available_at forced to 0, to simulate a duplicate delivery.
        fake_redis.hset(
            outbox.outbox_key,
            event["event_id"],
            json.dumps({**event, "available_at": 0}),
        )

        stream_len_before = fake_redis.xlen(outbox.stream_key)
        outbox.dispatch_once()
        stream_len_after = fake_redis.xlen(outbox.stream_key)

        assert stream_len_after == stream_len_before, (
            "Duplicate event must NOT produce a second stream entry"
        )

    def test_two_different_uuids_both_processed(self, fake_redis, idempotency_store, dlq):
        """Two distinct events are both marked processed after two dispatches."""
        outbox = TransactionalOutbox(fake_redis, idempotency_store, dlq)
        e1 = outbox.enqueue("gold.ingest", {"doctor_id": "D001"})
        e2 = outbox.enqueue("gold.ingest", {"doctor_id": "D002"})

        outbox.dispatch_once()
        outbox.dispatch_once()

        assert idempotency_store.is_processed(e1["event_id"])
        assert idempotency_store.is_processed(e2["event_id"])

    def test_idempotency_store_set_semantics(self, fake_redis):
        """Marking a key twice is harmless; the key simply stays processed."""
        store = IdempotencyStore(fake_redis)
        key = str(uuid.uuid4())

        assert not store.is_processed(key)
        store.mark_processed(key)
        assert store.is_processed(key)
        # Second mark must not raise and must not flip the state.
        store.mark_processed(key)
        assert store.is_processed(key)


# ═══════════════════════════════════════════════════════════════════
# 2. DLQ Routing — forced failure routes to dead-letter queue
# ═══════════════════════════════════════════════════════════════════


class TestDLQRouting:
    """PRD §9: Persistent failures (>=3 retries) route to dlq_stream."""

    def test_publish_failure_routes_to_dlq_after_retries(self, fake_redis, idempotency_store, dlq):
        """Three consecutive publish failures leave the event in the DLQ."""
        outbox = TransactionalOutbox(fake_redis, idempotency_store, dlq)
        event = outbox.enqueue("vector.inject", {"vector": [0.1, 0.2]})

        with patch.object(outbox, "_publish", side_effect=ConnectionError("db timeout")):
            for _ in range(3):
                outbox.dispatch_once()
                # Clear the retry delay so the next dispatch picks it up again.
                _make_available_now(fake_redis, outbox, event["event_id"])

        dlq_items = dlq.list_items()
        assert len(dlq_items) >= 1, "Failed event must land in DLQ after 3 retries"
        dlq_event = dlq_items[0]["event"]
        assert dlq_event["event_id"] == event["event_id"]
        assert "db timeout" in dlq_items[0]["reason"]

    def test_transient_failure_retries_before_dlq(self, fake_redis, idempotency_store, dlq):
        """A publish that fails twice and then succeeds is retried."""
        outbox = TransactionalOutbox(fake_redis, idempotency_store, dlq)
        event = outbox.enqueue("vector.inject", {"vector": [0.3]})

        call_count = {"n": 0}
        original_publish = outbox._publish  # captured before patching

        def _flaky_publish(evt):
            # Fail on the first two calls, delegate to the real publish after.
            call_count["n"] += 1
            if call_count["n"] < 3:
                raise ConnectionError("transient")
            original_publish(evt)

        with patch.object(outbox, "_publish", side_effect=_flaky_publish):
            for _ in range(5):
                # Clear the retry delay before each dispatch attempt.
                _make_available_now(fake_redis, outbox, event["event_id"])
                outbox.dispatch_once()

        # NOTE(review): this accepts either outcome (processed OR dead-lettered);
        # tighten it once the exact retry threshold of the outbox is confirmed.
        assert idempotency_store.is_processed(event["event_id"]) or len(dlq.list_items()) > 0

    def test_dlq_replay_returns_event_to_outbox(self, fake_redis):
        """Replaying a DLQ item writes it to the outbox hash with retries == 0."""
        dlq = DeadLetterQueue(fake_redis)
        event = {
            "event_id": str(uuid.uuid4()),
            "event_type": "vector.inject",
            "payload": {},
            "retries": 3,
        }
        dlq.push(event, reason="timeout")

        replayed = dlq.replay_one()

        assert replayed is not None
        assert replayed["event"]["event_id"] == event["event_id"]

        outbox_raw = fake_redis.hget("scenarist:outbox", event["event_id"])
        assert outbox_raw is not None
        restored = json.loads(outbox_raw)
        assert restored["retries"] == 0

    def test_dlq_empty_replay_returns_none(self, fake_redis):
        """Replaying from an empty DLQ returns None."""
        dlq = DeadLetterQueue(fake_redis)
        assert dlq.replay_one() is None

    def test_exponential_backoff_on_retry(self, fake_redis, idempotency_store, dlq):
        """One failed publish bumps ``retries`` to 1 and sets ``available_at``."""
        outbox = TransactionalOutbox(fake_redis, idempotency_store, dlq)
        event = outbox.enqueue("vector.inject", {"vector": [0.5]})

        with patch.object(outbox, "_publish", side_effect=ConnectionError("fail")):
            outbox.dispatch_once()

        raw = fake_redis.hget(outbox.outbox_key, event["event_id"])
        # Assert presence explicitly: guarding the checks below with an ``if``
        # would let the test pass without verifying anything.
        assert raw is not None, "Failed event must stay in the outbox for retry"
        evt = json.loads(raw)
        assert evt["retries"] == 1
        assert evt["available_at"] > 0


# ═══════════════════════════════════════════════════════════════════
# 3. Outbox transport integration
# ═══════════════════════════════════════════════════════════════════


class TestOutboxTransport:
    """Verify outbox publishes events to Redis Streams correctly."""

    def test_event_appears_in_redis_stream(self, fake_redis, idempotency_store, dlq):
        """The dispatched event is the last entry of the outbox stream."""
        outbox = TransactionalOutbox(fake_redis, idempotency_store, dlq)
        event = outbox.enqueue("gold.ingest", {"doctor_id": "D100"})
        outbox.dispatch_once()

        stream_entries = fake_redis.xrange(outbox.stream_key)
        assert len(stream_entries) >= 1

        # Each entry is (entry_id, fields); the event JSON is under "event".
        last_entry = stream_entries[-1]
        payload = json.loads(last_entry[1]["event"])
        assert payload["event_id"] == event["event_id"]
        assert payload["event_type"] == "gold.ingest"

    def test_enqueue_returns_valid_event_structure(self, fake_redis, idempotency_store, dlq):
        """``enqueue`` returns a dict with the expected keys and a v4 UUID id."""
        outbox = TransactionalOutbox(fake_redis, idempotency_store, dlq)
        event = outbox.enqueue("test.event", {"key": "value"})

        assert "event_id" in event
        assert "event_type" in event
        assert "payload" in event
        assert "created_at" in event
        assert event["retries"] == 0

        # Parse WITHOUT ``version=``: passing ``version=4`` to ``uuid.UUID``
        # overwrites the version bits instead of validating them, so it would
        # accept any well-formed UUID regardless of its real version.
        try:
            parsed = uuid.UUID(event["event_id"])
        except ValueError:
            pytest.fail("event_id is not a valid UUIDv4")
        assert parsed.version == 4, "event_id is not a valid UUIDv4"

    def test_no_event_returns_false(self, fake_redis, idempotency_store, dlq):
        """Dispatching with an empty outbox returns False."""
        outbox = TransactionalOutbox(fake_redis, idempotency_store, dlq)
        assert outbox.dispatch_once() is False