from __future__ import annotations import datetime import json import logging import threading import time from typing import List, Optional, TYPE_CHECKING from models import AgentTraceEntry if TYPE_CHECKING: from storage import StorageManager logger = logging.getLogger(__name__) _HF_DATASET_REPO = "build-small-hackathon/kirana-detective-traces" # Dataset card pushed once on first publish (ensures format:agent-traces tag is set) _DATASET_README = """\ --- license: mit language: - en tags: - agent-traces - kirana-detective - invoice-audit - indian-fmcg format: agent-traces --- # Kirana Detective — Runtime Audit Traces Per-audit agent execution traces from the Kirana Detective AI app. Each file records one full audit run: all six agents, their inputs, outputs, and timings. Trace format: one JSON object per file under `traces/`. - **Build traces** (Claude Code sessions): [build-small-hackathon/kirana-detective-build-traces](https://huggingface.co/datasets/build-small-hackathon/kirana-detective-build-traces) - **Space**: [build-small-hackathon/kirana-detective](https://huggingface.co/spaces/build-small-hackathon/kirana-detective) """ MAX_RETRIES = 3 BACKOFF_BASE_SECONDS = 2 # sleeps 2s, 4s, 8s on successive failures def _make_trace_entry( agent_name: str, agent_version: str, audit_run_id: str, t_start: float, # time.monotonic() value t_end: float, input_summary: str, output_summary: str, ) -> AgentTraceEntry: now_utc = datetime.datetime.now(datetime.timezone.utc) duration_ms = int((t_end - t_start) * 1000) # Approximate start timestamp from end - duration ts_end = now_utc.isoformat() ts_start = (now_utc - datetime.timedelta(milliseconds=duration_ms)).isoformat() return AgentTraceEntry( agent_name=agent_name, agent_version=agent_version, audit_run_id=audit_run_id, timestamp_start=ts_start, timestamp_end=ts_end, duration_ms=duration_ms, input_summary=input_summary, output_summary=output_summary, ) class AgentTracer: def __init__(self, hf_token: Optional[str] = None) -> None: self._hf_token = hf_token self._buffer: List[AgentTraceEntry] = [] # ── Collection ──────────────────────────────────────────────────────────── def collect(self, entry: AgentTraceEntry) -> None: self._buffer.append(entry) def finalise(self, audit_run_id: str) -> List[AgentTraceEntry]: entries = list(self._buffer) self._buffer.clear() return entries # ── Publishing ──────────────────────────────────────────────────────────── def publish_async( self, audit_run_id: str, entries: List[AgentTraceEntry], storage: "StorageManager", ) -> None: t = threading.Thread( target=self._publish_with_retry, args=(audit_run_id, entries, storage), daemon=True, ) t.start() def _publish_with_retry( self, audit_run_id: str, entries: List[AgentTraceEntry], storage: "StorageManager", ) -> None: trace_json = json.dumps( [e.__dict__ for e in entries], ensure_ascii=False, default=str ) # Persist to local SQLite first (fast, synchronous) storage.save_audit_run(audit_run_id, trace_json) # HF Hub publishing disabled — runtime trace dataset was not completed logger.info("Trace %s stored locally (HF Hub publishing disabled)", audit_run_id) def _publish_to_hf_hub(self, audit_run_id: str, entries: List[AgentTraceEntry]) -> None: from huggingface_hub import HfApi, CommitOperationAdd if not self._hf_token: raise ValueError("HF_TOKEN not set — cannot publish trace") api = HfApi(token=self._hf_token) # Ensure the dataset repo exists with correct tags on first publish api.create_repo(repo_id=_HF_DATASET_REPO, repo_type="dataset", exist_ok=True, private=False) now = datetime.datetime.now(datetime.timezone.utc).isoformat() row = { "audit_run_id": audit_run_id, "timestamp": now, "agents": [e.__dict__ for e in entries], } trace_content = (json.dumps(row, ensure_ascii=False) + "\n").encode("utf-8") operations = [ CommitOperationAdd( path_in_repo=f"traces/{audit_run_id}.jsonl", path_or_fileobj=trace_content, ), ] # Push dataset README on every call (idempotent — ensures tag is always set) operations.append( CommitOperationAdd( path_in_repo="README.md", path_or_fileobj=_DATASET_README.encode("utf-8"), ) ) api.create_commit( repo_id=_HF_DATASET_REPO, repo_type="dataset", operations=operations, commit_message=f"Add audit trace {audit_run_id[:8]}", ) # ── Module-level helper (used by agents) ───────────────────────────────────── def make_trace_entry( agent_name: str, agent_version: str, audit_run_id: str, t_start: float, t_end: float, input_summary: str, output_summary: str, ) -> AgentTraceEntry: return _make_trace_entry( agent_name, agent_version, audit_run_id, t_start, t_end, input_summary, output_summary )