NEtraAi / backend /app /core /event_bus.py
093xpku
Clean project layout deployment
9bc686b
Raw
History Blame
1.23 kB
"""
event_bus.py — In-process broadcast bus for real-time scan events.
Any code path that creates an AttendanceLog (kiosk scan) calls
`publish_scan_event(payload_dict)`. The SSE endpoint in analytics.py
subscribes a new asyncio.Queue per connected client and streams the events.
"""
import asyncio
from typing import Dict, List, Optional
_subscribers: List[asyncio.Queue] = []

# Event loop that owns each subscriber queue, captured at subscribe() time.
# asyncio.Queue is not thread-safe, so a publisher running on another thread
# must hand the put over to this loop instead of touching the queue directly.
_subscriber_loops: Dict[asyncio.Queue, Optional[asyncio.AbstractEventLoop]] = {}


def subscribe() -> asyncio.Queue:
    """Register a new SSE client and return its dedicated queue.

    The queue is bounded to 50 pending events; a client whose queue is full
    when an event arrives is dropped by ``publish_scan_event``. When called
    while an event loop is running, that loop is remembered so events
    published from other threads can be delivered on it.

    Returns:
        The queue on which this client will receive scan-event dicts.
    """
    q: asyncio.Queue = asyncio.Queue(maxsize=50)
    try:
        loop: Optional[asyncio.AbstractEventLoop] = asyncio.get_running_loop()
    except RuntimeError:
        # No running loop in this thread: publishers fall back to a direct put.
        loop = None
    _subscriber_loops[q] = loop
    _subscribers.append(q)
    return q


def unsubscribe(q: asyncio.Queue) -> None:
    """Remove a client queue when the SSE connection closes.

    Idempotent: calling it twice, or with a queue that was never registered,
    is a no-op.

    Args:
        q: The queue previously returned by ``subscribe()``.
    """
    _subscriber_loops.pop(q, None)
    try:
        _subscribers.remove(q)
    except ValueError:
        # Already removed (e.g. dropped for being full) — nothing to do.
        pass


def _deliver(q: asyncio.Queue, payload: dict) -> None:
    """Put *payload* on *q*; drop the subscriber if its queue is full.

    Must run on the thread of the loop that consumes *q* (or before any
    consumer exists), because ``asyncio.Queue`` is not thread-safe.
    """
    try:
        q.put_nowait(payload)
    except asyncio.QueueFull:
        # Slow consumer: stop broadcasting to it rather than block or grow.
        unsubscribe(q)


def publish_scan_event(payload: dict) -> None:
    """Broadcast a scan-event dict to all connected SSE clients.

    May be called from sync code running on a worker thread (kiosk endpoint)
    as well as from coroutines. ``asyncio.Queue`` is NOT thread-safe, so for
    subscribers owned by a different event loop the put is scheduled on that
    loop with ``call_soon_threadsafe``; this also wakes the loop so waiting
    consumers receive the event promptly. Subscribers owned by the calling
    loop, or registered without a loop, receive the event synchronously.

    Subscribers whose queue is full, or whose loop has been closed, are
    unsubscribed.

    Args:
        payload: The event dict; the same object is handed to every client.
    """
    try:
        current: Optional[asyncio.AbstractEventLoop] = asyncio.get_running_loop()
    except RuntimeError:
        current = None

    # Iterate over a snapshot: clients may subscribe/unsubscribe concurrently,
    # and delivery itself may unsubscribe a full queue.
    for q in list(_subscribers):
        loop = _subscriber_loops.get(q)
        if loop is None or loop is current:
            _deliver(q, payload)
            continue
        try:
            loop.call_soon_threadsafe(_deliver, q, payload)
        except RuntimeError:
            # The owning loop is closed; nobody can ever read this queue.
            unsubscribe(q)