"""Agent trace collection and publishing.

Agents build ``AgentTraceEntry`` records via :func:`make_trace_entry`, an
:class:`AgentTracer` buffers them, and :meth:`AgentTracer.publish_async`
saves them through the storage manager and uploads them to a Hugging Face
dataset repo on a background thread.
"""

from __future__ import annotations

import datetime
import json
import logging
import threading
import time
from typing import List, Optional, TYPE_CHECKING

from models import AgentTraceEntry

if TYPE_CHECKING:
    from storage import StorageManager

logger = logging.getLogger(__name__)

_HF_DATASET_REPO = "build-small-hackathon/kirana-detective-traces"

MAX_RETRIES = 3
# Backoff before retry N is BACKOFF_BASE_SECONDS ** N, i.e. 2s then 4s for
# three attempts; there is no sleep after the final attempt.
BACKOFF_BASE_SECONDS = 2


def _make_trace_entry(
    agent_name: str,
    agent_version: str,
    audit_run_id: str,
    t_start: float,  # time.monotonic() value
    t_end: float,
    input_summary: str,
    output_summary: str,
) -> AgentTraceEntry:
    """Build an ``AgentTraceEntry`` from a pair of monotonic clock readings.

    Monotonic readings cannot be converted to wall-clock time, so the end
    timestamp is taken as "now" (UTC). The start timestamp is derived by
    subtracting the measured duration. This is only accurate when called
    right after ``t_end`` was sampled.
    """
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    duration_ms = int((t_end - t_start) * 1000)
    # Approximate start timestamp from end - duration
    ts_end = now_utc.isoformat()
    ts_start = (now_utc - datetime.timedelta(milliseconds=duration_ms)).isoformat()
    return AgentTraceEntry(
        agent_name=agent_name,
        agent_version=agent_version,
        audit_run_id=audit_run_id,
        timestamp_start=ts_start,
        timestamp_end=ts_end,
        duration_ms=duration_ms,
        input_summary=input_summary,
        output_summary=output_summary,
    )


class AgentTracer:
    """Buffer agent trace entries and publish them for an audit run."""

    def __init__(self, hf_token: Optional[str] = None) -> None:
        # Token for Hugging Face Hub uploads. Without it, traces are only
        # saved locally.
        self._hf_token = hf_token
        self._buffer: List[AgentTraceEntry] = []

    # ── Collection ────────────────────────────────────────────────────────────

    def collect(self, entry: AgentTraceEntry) -> None:
        """Append *entry* to the in-memory buffer."""
        self._buffer.append(entry)

    def finalise(self, audit_run_id: str) -> List[AgentTraceEntry]:
        """Drain the buffer and return everything it held.

        NOTE(review): ``audit_run_id`` is currently unused. All buffered
        entries are returned and cleared, whichever run they belong to.
        """
        entries = list(self._buffer)
        self._buffer.clear()
        return entries

    # ── Publishing ────────────────────────────────────────────────────────────

    def publish_async(
        self,
        audit_run_id: str,
        entries: List[AgentTraceEntry],
        storage: "StorageManager",
    ) -> None:
        """Start a daemon thread that runs :meth:`_publish_with_retry`.

        Returns immediately. Being a daemon, the thread will not keep the
        process alive at interpreter exit.
        """
        t = threading.Thread(
            target=self._publish_with_retry,
            args=(audit_run_id, entries, storage),
            daemon=True,
        )
        t.start()

    def _publish_with_retry(
        self,
        audit_run_id: str,
        entries: List[AgentTraceEntry],
        storage: "StorageManager",
    ) -> None:
        """Save *entries* via *storage*, then upload them to the HF Hub.

        The local save happens first so the trace is kept even if every
        upload attempt fails. An upload is attempted up to ``MAX_RETRIES``
        times with exponential backoff between attempts. The run is marked
        published only after an upload succeeds. If no HF token is
        configured, uploading is skipped entirely.
        """
        trace_json = json.dumps(
            [e.__dict__ for e in entries], ensure_ascii=False, default=str
        )
        # Persist locally first (synchronous) so the trace survives upload failures.
        storage.save_audit_run(audit_run_id, trace_json)

        # Every attempt would fail identically without a token, so skip the
        # retries and their backoff sleeps.
        if not self._hf_token:
            logger.warning(
                "Trace %s: HF_TOKEN not set; stored locally only", audit_run_id
            )
            return

        for attempt in range(1, MAX_RETRIES + 1):
            try:
                self._publish_to_hf_hub(audit_run_id, entries)
                storage.mark_trace_published(audit_run_id)
            except Exception as e:  # background thread: log, never propagate
                if attempt == MAX_RETRIES:
                    # Last attempt: no retry follows, so don't sleep or say we will.
                    logger.warning(
                        "Trace publish attempt %d/%d failed: %s",
                        attempt, MAX_RETRIES, e,
                    )
                    break
                wait = BACKOFF_BASE_SECONDS ** attempt
                logger.warning(
                    "Trace publish attempt %d/%d failed: %s — retrying in %ds",
                    attempt, MAX_RETRIES, e, wait,
                )
                time.sleep(wait)
            else:
                logger.info("Trace %s published to HF Hub", audit_run_id)
                return

        logger.error(
            "Trace %s: all %d publish attempts failed; stored locally only",
            audit_run_id, MAX_RETRIES,
        )

    def _publish_to_hf_hub(self, audit_run_id: str, entries: List[AgentTraceEntry]) -> None:
        """Upload one JSON row for *audit_run_id* to the traces dataset repo.

        The file is written to ``traces/<audit_run_id>.json`` in
        ``_HF_DATASET_REPO``.

        Raises:
            ValueError: if no HF token is configured.
            Exception: any error from ``huggingface_hub`` during upload.
        """
        # Check the token before the lazy import so this failure does not
        # depend on huggingface_hub being installed.
        if not self._hf_token:
            raise ValueError("HF_TOKEN not set — cannot publish trace")

        from huggingface_hub import HfApi

        row = {
            "audit_run_id": audit_run_id,
            "trace_json": json.dumps([e.__dict__ for e in entries], default=str),
            "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        }
        # NOTE(review): every attribute of each entry is uploaded as-is. Nothing
        # here strips PII or raw image bytes; callers must ensure the
        # input/output summaries are safe to publish.
        content = (json.dumps(row, ensure_ascii=False) + "\n").encode("utf-8")

        api = HfApi(token=self._hf_token)
        api.upload_file(
            path_or_fileobj=content,
            path_in_repo=f"traces/{audit_run_id}.json",
            repo_id=_HF_DATASET_REPO,
            repo_type="dataset",
        )


# ── Module-level helper (used by agents) ─────────────────────────────────────

def make_trace_entry(
    agent_name: str,
    agent_version: str,
    audit_run_id: str,
    t_start: float,
    t_end: float,
    input_summary: str,
    output_summary: str,
) -> AgentTraceEntry:
    """Public entry point for agents; delegates to :func:`_make_trace_entry`.

    ``t_start``/``t_end`` are expected to be ``time.monotonic()`` readings.
    """
    return _make_trace_entry(
        agent_name, agent_version, audit_run_id, t_start, t_end, input_summary, output_summary
    )