| from __future__ import annotations |
|
|
| import datetime |
| import json |
| import logging |
| import threading |
| import time |
| from typing import List, Optional, TYPE_CHECKING |
|
|
| from models import AgentTraceEntry |
|
|
| if TYPE_CHECKING: |
| from storage import StorageManager |
|
|
logger = logging.getLogger(__name__)

# Hugging Face Hub dataset repository that _publish_to_hf_hub commits traces to.
_HF_DATASET_REPO = "build-small-hackathon/kirana-detective-traces"

# Dataset card (YAML front matter + markdown body) committed as README.md
# together with every trace uploaded by _publish_to_hf_hub.
_DATASET_README = """\
---
license: mit
language:
- en
tags:
- agent-traces
- kirana-detective
- invoice-audit
- indian-fmcg
format: agent-traces
---

# Kirana Detective — Runtime Audit Traces

Per-audit agent execution traces from the Kirana Detective AI app.
Each file records one full audit run: all six agents, their inputs,
outputs, and timings.

Trace format: one JSON object per file under `traces/`.

- **Build traces** (Claude Code sessions): [build-small-hackathon/kirana-detective-build-traces](https://huggingface.co/datasets/build-small-hackathon/kirana-detective-build-traces)
- **Space**: [build-small-hackathon/kirana-detective](https://huggingface.co/spaces/build-small-hackathon/kirana-detective)
"""

# NOTE(review): neither constant is referenced by the code in this file; the
# publish path currently makes a single attempt. Presumably reserved for a
# retry-with-backoff implementation — confirm before relying on them.
MAX_RETRIES = 3
BACKOFF_BASE_SECONDS = 2
|
|
|
|
def _make_trace_entry(
    agent_name: str,
    agent_version: str,
    audit_run_id: str,
    t_start: float,
    t_end: float,
    input_summary: str,
    output_summary: str,
) -> AgentTraceEntry:
    """Build an ``AgentTraceEntry`` describing one agent invocation.

    ``duration_ms`` is ``t_end - t_start`` (in seconds) converted to
    milliseconds and truncated toward zero by ``int``. ``t_start``/``t_end``
    are only ever subtracted, never interpreted as epoch times, so any clock
    works as long as both readings come from the same one.

    The wall-clock timestamps are anchored at the current UTC time:
    ``timestamp_end`` is "now" and ``timestamp_start`` is "now" minus
    ``duration_ms``. Both are ISO-8601 strings exactly ``duration_ms`` apart.
    """
    elapsed_ms = int((t_end - t_start) * 1000)

    finished_at = datetime.datetime.now(datetime.timezone.utc)
    started_at = finished_at - datetime.timedelta(milliseconds=elapsed_ms)

    return AgentTraceEntry(
        agent_name=agent_name,
        agent_version=agent_version,
        audit_run_id=audit_run_id,
        timestamp_start=started_at.isoformat(),
        timestamp_end=finished_at.isoformat(),
        duration_ms=elapsed_ms,
        input_summary=input_summary,
        output_summary=output_summary,
    )
|
|
|
|
class AgentTracer:
    """Buffer per-agent trace entries for an audit run and persist them.

    Entries are accumulated with :meth:`collect`, drained with
    :meth:`finalise`, and handed to :meth:`publish_async`, which stores them
    through the supplied ``StorageManager`` on a background daemon thread.
    Uploading to the Hugging Face Hub is implemented in
    :meth:`_publish_to_hf_hub`, but nothing in this file currently calls it.

    NOTE(review): the buffer is a plain list with no locking; this presumably
    assumes ``collect``/``finalise`` are called from a single thread — confirm.
    """

    def __init__(self, hf_token: Optional[str] = None) -> None:
        # Token used only by _publish_to_hf_hub; a falsy value makes it raise.
        self._hf_token = hf_token
        # Entries collected since the last finalise().
        self._buffer: List[AgentTraceEntry] = []

    def collect(self, entry: AgentTraceEntry) -> None:
        """Append *entry* to the in-memory buffer."""
        self._buffer.append(entry)

    def finalise(self, audit_run_id: str) -> List[AgentTraceEntry]:
        """Return every buffered entry (in collection order) and empty the buffer.

        ``audit_run_id`` is currently unused: all buffered entries are
        returned, whichever run they belong to.
        """
        entries = list(self._buffer)
        self._buffer.clear()
        return entries

    def publish_async(
        self,
        audit_run_id: str,
        entries: List[AgentTraceEntry],
        storage: "StorageManager",
    ) -> None:
        """Persist *entries* on a background daemon thread and return at once.

        Any exception raised while publishing is logged (with traceback)
        through this module's logger instead of escaping the thread. Because
        the thread is a daemon, an in-flight publish may be cut short if the
        interpreter exits.
        """
        worker = threading.Thread(
            target=self._publish_in_background,
            args=(audit_run_id, entries, storage),
            name=f"trace-publish-{audit_run_id}",
            daemon=True,
        )
        worker.start()

    def _publish_in_background(
        self,
        audit_run_id: str,
        entries: List[AgentTraceEntry],
        storage: "StorageManager",
    ) -> None:
        """Thread target: run :meth:`_publish_with_retry` and log any failure.

        An exception escaping a thread target is otherwise reported only by
        ``threading.excepthook`` on stderr, bypassing the application's logging.
        """
        try:
            self._publish_with_retry(audit_run_id, entries, storage)
        except Exception:
            logger.exception("Failed to publish trace %s", audit_run_id)

    def _publish_with_retry(
        self,
        audit_run_id: str,
        entries: List[AgentTraceEntry],
        storage: "StorageManager",
    ) -> None:
        """Serialise *entries* to JSON and save them via ``storage.save_audit_run``.

        Despite the name, no retry is performed and the Hugging Face Hub is not
        contacted: the trace is saved exactly once. Exceptions from
        serialisation or storage propagate to the caller.
        """
        # default=str stringifies field values json cannot encode natively.
        trace_json = json.dumps(
            [e.__dict__ for e in entries], ensure_ascii=False, default=str
        )
        storage.save_audit_run(audit_run_id, trace_json)
        logger.info("Trace %s stored locally (HF Hub publishing disabled)", audit_run_id)

    def _publish_to_hf_hub(self, audit_run_id: str, entries: List[AgentTraceEntry]) -> None:
        """Commit one audit trace, plus the dataset README, to the HF Hub dataset.

        In a single commit this writes ``traces/<audit_run_id>.jsonl`` (one
        JSON line holding the run id, a UTC timestamp and the serialised
        entries) and overwrites ``README.md`` with ``_DATASET_README``.

        Raises:
            ValueError: if no HF token was supplied to the tracer.
        """
        # Imported inside the method so this module can be imported without
        # huggingface_hub installed.
        from huggingface_hub import CommitOperationAdd, HfApi

        if not self._hf_token:
            raise ValueError("HF_TOKEN not set — cannot publish trace")

        api = HfApi(token=self._hf_token)
        # exist_ok=True: do not fail when the dataset repo already exists.
        api.create_repo(
            repo_id=_HF_DATASET_REPO, repo_type="dataset", exist_ok=True, private=False
        )

        row = {
            "audit_run_id": audit_run_id,
            "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
            "agents": [e.__dict__ for e in entries],
        }
        # default=str matches _publish_with_retry, so a field value json cannot
        # encode natively is stringified instead of raising TypeError here only.
        trace_content = (
            json.dumps(row, ensure_ascii=False, default=str) + "\n"
        ).encode("utf-8")

        operations = [
            CommitOperationAdd(
                path_in_repo=f"traces/{audit_run_id}.jsonl",
                path_or_fileobj=trace_content,
            ),
            CommitOperationAdd(
                path_in_repo="README.md",
                path_or_fileobj=_DATASET_README.encode("utf-8"),
            ),
        ]

        api.create_commit(
            repo_id=_HF_DATASET_REPO,
            repo_type="dataset",
            operations=operations,
            commit_message=f"Add audit trace {audit_run_id[:8]}",
        )
|
|
|
|
| |
|
|
def make_trace_entry(
    agent_name: str,
    agent_version: str,
    audit_run_id: str,
    t_start: float,
    t_end: float,
    input_summary: str,
    output_summary: str,
) -> AgentTraceEntry:
    """Public constructor for an ``AgentTraceEntry``.

    Forwards every argument unchanged to the private ``_make_trace_entry``
    helper and returns its result.
    """
    return _make_trace_entry(
        agent_name=agent_name,
        agent_version=agent_version,
        audit_run_id=audit_run_id,
        t_start=t_start,
        t_end=t_end,
        input_summary=input_summary,
        output_summary=output_summary,
    )
|
|