from __future__ import annotations

import json
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    # Redis is referenced only in annotations, which stay unevaluated thanks
    # to the __future__ import above, so it is not needed at runtime.
    from redis import Redis


class DeadLetterQueue:
    """Dead-letter queue stored as a Redis list of JSON payloads.

    Each entry is a JSON object of the form
    ``{"event": <event dict>, "reason": <str>}``. New entries are appended
    to the tail of the list and replayed from the head.
    """

    # Hash that replayed events are written back into, keyed by event id.
    OUTBOX_KEY = "scenarist:outbox"

    def __init__(self, redis_client: Redis, queue_name: str = "scenarist:dlq") -> None:
        """Bind the queue to a Redis client and the list key it uses."""
        self.redis = redis_client
        self.queue_name = queue_name

    def push(self, event: dict[str, Any], reason: str) -> None:
        """Append ``event`` and the ``reason`` it failed to the queue tail."""
        payload = {"event": event, "reason": reason}
        self.redis.rpush(self.queue_name, json.dumps(payload))

    def list_items(self, limit: int = 100) -> list[dict[str, Any]]:
        """Return up to ``limit`` decoded entries from the head of the queue.

        Entries are read without being removed. A ``limit`` of zero or less
        yields an empty list.
        """
        # The stop index passed to lrange is inclusive, so a non-positive
        # limit cannot be expressed as a range and is handled up front.
        if limit <= 0:
            return []
        raw_items = self.redis.lrange(self.queue_name, 0, limit - 1)
        return [json.loads(item) for item in raw_items]

    def replay_one(self) -> dict[str, Any] | None:
        """Pop the head entry and write its event back into the outbox hash.

        The event is written only when it is a dict with a truthy
        ``event_id``; its ``retries`` and ``available_at`` fields are reset
        to 0 first. Entries that do not meet that condition are removed from
        the queue and returned without touching the outbox.

        Returns:
            The decoded entry (with the reset fields applied to its event),
            or ``None`` when the queue is empty.

        Raises:
            Exception: whatever the outbox write raises; in that case the
                entry is pushed back onto the head of the queue first.
        """
        raw = self.redis.lpop(self.queue_name)
        if raw is None:
            return None

        replay = json.loads(raw)
        event = replay.get("event", {})
        if not (isinstance(event, dict) and event.get("event_id")):
            return replay

        # Reset retry bookkeeping before replaying into the outbox hash.
        event["retries"] = 0
        event["available_at"] = 0
        try:
            self.redis.hset(self.OUTBOX_KEY, event["event_id"], json.dumps(event))
        except Exception:
            # The entry has already been popped; restore it so a failed
            # outbox write does not silently lose the event.
            self.redis.lpush(self.queue_name, raw)
            raise
        return replay