kirana-detective / tracer.py
naazimsnh02's picture
Final Submission
e0446f7
Raw
History Blame
5.77 kB
from __future__ import annotations
import datetime
import json
import logging
import threading
import time
from typing import List, Optional, TYPE_CHECKING
from models import AgentTraceEntry
if TYPE_CHECKING:
from storage import StorageManager
logger = logging.getLogger(__name__)
_HF_DATASET_REPO = "build-small-hackathon/kirana-detective-traces"
# Dataset card pushed once on first publish (ensures format:agent-traces tag is set)
_DATASET_README = """\
---
license: mit
language:
- en
tags:
- agent-traces
- kirana-detective
- invoice-audit
- indian-fmcg
format: agent-traces
---
# Kirana Detective β€” Runtime Audit Traces
Per-audit agent execution traces from the Kirana Detective AI app.
Each file records one full audit run: all six agents, their inputs,
outputs, and timings.
Trace format: one JSON object per file under `traces/`.
- **Build traces** (Claude Code sessions): [build-small-hackathon/kirana-detective-build-traces](https://huggingface.co/datasets/build-small-hackathon/kirana-detective-build-traces)
- **Space**: [build-small-hackathon/kirana-detective](https://huggingface.co/spaces/build-small-hackathon/kirana-detective)
"""
MAX_RETRIES = 3
BACKOFF_BASE_SECONDS = 2 # sleeps 2s, 4s, 8s on successive failures
def _make_trace_entry(
agent_name: str,
agent_version: str,
audit_run_id: str,
t_start: float, # time.monotonic() value
t_end: float,
input_summary: str,
output_summary: str,
) -> AgentTraceEntry:
now_utc = datetime.datetime.now(datetime.timezone.utc)
duration_ms = int((t_end - t_start) * 1000)
# Approximate start timestamp from end - duration
ts_end = now_utc.isoformat()
ts_start = (now_utc - datetime.timedelta(milliseconds=duration_ms)).isoformat()
return AgentTraceEntry(
agent_name=agent_name,
agent_version=agent_version,
audit_run_id=audit_run_id,
timestamp_start=ts_start,
timestamp_end=ts_end,
duration_ms=duration_ms,
input_summary=input_summary,
output_summary=output_summary,
)
class AgentTracer:
def __init__(self, hf_token: Optional[str] = None) -> None:
self._hf_token = hf_token
self._buffer: List[AgentTraceEntry] = []
# ── Collection ────────────────────────────────────────────────────────────
def collect(self, entry: AgentTraceEntry) -> None:
self._buffer.append(entry)
def finalise(self, audit_run_id: str) -> List[AgentTraceEntry]:
entries = list(self._buffer)
self._buffer.clear()
return entries
# ── Publishing ────────────────────────────────────────────────────────────
def publish_async(
self,
audit_run_id: str,
entries: List[AgentTraceEntry],
storage: "StorageManager",
) -> None:
t = threading.Thread(
target=self._publish_with_retry,
args=(audit_run_id, entries, storage),
daemon=True,
)
t.start()
def _publish_with_retry(
self,
audit_run_id: str,
entries: List[AgentTraceEntry],
storage: "StorageManager",
) -> None:
trace_json = json.dumps(
[e.__dict__ for e in entries], ensure_ascii=False, default=str
)
# Persist to local SQLite first (fast, synchronous)
storage.save_audit_run(audit_run_id, trace_json)
# HF Hub publishing disabled β€” runtime trace dataset was not completed
logger.info("Trace %s stored locally (HF Hub publishing disabled)", audit_run_id)
def _publish_to_hf_hub(self, audit_run_id: str, entries: List[AgentTraceEntry]) -> None:
from huggingface_hub import HfApi, CommitOperationAdd
if not self._hf_token:
raise ValueError("HF_TOKEN not set β€” cannot publish trace")
api = HfApi(token=self._hf_token)
# Ensure the dataset repo exists with correct tags on first publish
api.create_repo(repo_id=_HF_DATASET_REPO, repo_type="dataset", exist_ok=True, private=False)
now = datetime.datetime.now(datetime.timezone.utc).isoformat()
row = {
"audit_run_id": audit_run_id,
"timestamp": now,
"agents": [e.__dict__ for e in entries],
}
trace_content = (json.dumps(row, ensure_ascii=False) + "\n").encode("utf-8")
operations = [
CommitOperationAdd(
path_in_repo=f"traces/{audit_run_id}.jsonl",
path_or_fileobj=trace_content,
),
]
# Push dataset README on every call (idempotent β€” ensures tag is always set)
operations.append(
CommitOperationAdd(
path_in_repo="README.md",
path_or_fileobj=_DATASET_README.encode("utf-8"),
)
)
api.create_commit(
repo_id=_HF_DATASET_REPO,
repo_type="dataset",
operations=operations,
commit_message=f"Add audit trace {audit_run_id[:8]}",
)
# ── Module-level helper (used by agents) ─────────────────────────────────────
def make_trace_entry(
agent_name: str,
agent_version: str,
audit_run_id: str,
t_start: float,
t_end: float,
input_summary: str,
output_summary: str,
) -> AgentTraceEntry:
return _make_trace_entry(
agent_name, agent_version, audit_run_id, t_start, t_end, input_summary, output_summary
)