"""Transactional outbox that relays events from a Redis hash to a transport."""

from __future__ import annotations

import asyncio
import json
import time
import uuid
from typing import Any

import aio_pika
from redis import Redis

from backend.app.core.dlq import DeadLetterQueue
from backend.app.core.idempotency import IdempotencyStore


class TransactionalOutbox:
    """Buffer events in a Redis hash and relay them to a message transport.

    Events are stored as JSON strings in the hash ``outbox_key``, keyed by
    their ``event_id``.  ``dispatch_once`` picks one due event, publishes it
    (to the Redis stream ``stream_key``, or to an AMQP queue of the same name
    when ``transport == "amqp"``) and removes it from the hash.  Failed
    publishes are retried with exponential backoff and handed to the
    dead-letter queue once ``MAX_RETRIES`` attempts have failed.

    Delivery is at-least-once: an event can be published more than once if a
    failure happens after the publish but before it is marked as processed.
    """

    # Number of failed attempts after which an event is dead-lettered.
    MAX_RETRIES = 3
    # Approximate cap on the Redis stream length passed to XADD.
    STREAM_MAXLEN = 10000

    def __init__(
        self,
        redis_client: Redis,
        idempotency_store: IdempotencyStore,
        dlq: DeadLetterQueue,
        outbox_key: str = "scenarist:outbox",
        stream_key: str = "scenarist:events",
        transport: str = "redis_streams",
        amqp_url: str = "amqp://guest:guest@localhost:5672/",
    ) -> None:
        """Store the collaborators and transport configuration.

        Args:
            redis_client: Redis client holding the outbox hash (and the
                stream, for the Redis transport).
            idempotency_store: Store consulted via ``is_processed`` /
                ``mark_processed`` to skip already-delivered events.
            dlq: Dead-letter queue receiving events that exhausted retries.
            outbox_key: Name of the Redis hash holding pending events.
            stream_key: Redis stream name, or AMQP queue name.
            transport: ``"amqp"`` selects AMQP; any other value publishes to
                the Redis stream.
            amqp_url: Connection URL used by the AMQP transport.
        """
        self.redis = redis_client
        self.idempotency = idempotency_store
        self.dlq = dlq
        self.outbox_key = outbox_key
        self.stream_key = stream_key
        self.transport = transport
        self.amqp_url = amqp_url

    def enqueue(self, event_type: str, payload: dict[str, Any]) -> dict[str, Any]:
        """Create an event, persist it in the outbox hash and return it.

        Args:
            event_type: Value stored under the event's ``"event_type"`` key.
            payload: Event body; must be JSON-serialisable because the whole
                event is stored with ``json.dumps``.

        Returns:
            The event dict that was written to the outbox.
        """
        # One timestamp for both fields so they cannot straddle a second
        # boundary and disagree.
        now = int(time.time())
        event = {
            "event_id": str(uuid.uuid4()),
            "event_type": event_type,
            "payload": payload,
            "created_at": now,
            "retries": 0,
            "available_at": now,
        }
        self.redis.hset(self.outbox_key, event["event_id"], json.dumps(event))
        return event

    async def _publish_amqp(self, event: dict[str, Any]) -> None:
        """Publish ``event`` as UTF-8 JSON to the durable queue ``stream_key``.

        A new connection is opened for every call and closed when the
        ``async with`` block exits.
        """
        connection = await aio_pika.connect_robust(self.amqp_url)
        async with connection:
            channel = await connection.channel()
            queue = await channel.declare_queue(self.stream_key, durable=True)
            await channel.default_exchange.publish(
                aio_pika.Message(body=json.dumps(event).encode("utf-8")),
                routing_key=queue.name,
            )

    def _publish(self, event: dict[str, Any]) -> None:
        """Publish ``event`` on the configured transport.

        NOTE: the AMQP path uses ``asyncio.run``, which raises
        ``RuntimeError`` when called from a thread that already has a running
        event loop, so this method must be called from synchronous code.
        """
        if self.transport == "amqp":
            asyncio.run(self._publish_amqp(event))
            return
        self.redis.xadd(
            self.stream_key,
            {"event": json.dumps(event)},
            maxlen=self.STREAM_MAXLEN,
            approximate=True,
        )

    def _next_due_event(self) -> dict[str, Any] | None:
        """Return the first stored event whose ``available_at`` has passed.

        Events are examined in the order ``HVALS`` returns them; an event
        without ``available_at`` is treated as immediately due.  Returns
        ``None`` when nothing is due.
        """
        now = int(time.time())
        for raw in self.redis.hvals(self.outbox_key):
            event = json.loads(raw)
            if int(event.get("available_at", 0)) <= now:
                return event
        return None

    def dispatch_once(self) -> bool:
        """Try to deliver a single due event.

        Returns:
            ``False`` when no event is due.  ``True`` when an event was
            handled, whether it was published, skipped as already processed,
            rescheduled after a failure, or moved to the dead-letter queue.
        """
        event = self._next_due_event()
        if event is None:
            return False

        event_id = event["event_id"]

        # Short-circuit if already successfully processed (idempotency guard).
        if self.idempotency.is_processed(event_id):
            # Safe to clean up - delivery already confirmed.
            self.redis.hdel(self.outbox_key, event_id)
            return True

        try:
            # Publish first and delete only after a confirmed publish.  If the
            # process dies after mark_processed but before hdel, the guard
            # above removes the leftover entry on the next call.  A failure
            # between publish and mark_processed leads to a re-publish
            # (at-least-once delivery).
            self._publish(event)
            self.idempotency.mark_processed(event_id)
            self.redis.hdel(self.outbox_key, event_id)
        except Exception as exc:  # pragma: no cover - redis/network failures
            # Attempt failed: count it, then either dead-letter or back off.
            event["retries"] = event.get("retries", 0) + 1
            if event["retries"] >= self.MAX_RETRIES:
                # Push to the DLQ before deleting so the event is never
                # dropped from both places if the push itself fails.
                self.dlq.push(event, reason=str(exc))
                self.redis.hdel(self.outbox_key, event_id)
            else:
                # Exponential backoff before the next dispatch attempt.
                event["available_at"] = int(time.time()) + (2 ** int(event["retries"]))
                self.redis.hset(self.outbox_key, event_id, json.dumps(event))
        return True