from __future__ import annotations
import datetime
import json
import logging
import threading
import time
from typing import List, Optional, TYPE_CHECKING
from models import AgentTraceEntry
if TYPE_CHECKING:
from storage import StorageManager
logger = logging.getLogger(__name__)

# Hugging Face dataset repository that receives the published runtime traces.
_HF_DATASET_REPO = "build-small-hackathon/kirana-detective-traces"

# Dataset card committed together with every trace upload (see
# AgentTracer._publish_to_hf_hub), so the `agent-traces` tag and the
# `format: agent-traces` field are always present on the dataset repo.
_DATASET_README = """\
---
license: mit
language:
- en
tags:
- agent-traces
- kirana-detective
- invoice-audit
- indian-fmcg
format: agent-traces
---
# Kirana Detective — Runtime Audit Traces
Per-audit agent execution traces from the Kirana Detective AI app.
Each file records one full audit run: all six agents, their inputs,
outputs, and timings.
Trace format: one JSON object per file under `traces/`.
- **Build traces** (Claude Code sessions): [build-small-hackathon/kirana-detective-build-traces](https://huggingface.co/datasets/build-small-hackathon/kirana-detective-build-traces)
- **Space**: [build-small-hackathon/kirana-detective](https://huggingface.co/spaces/build-small-hackathon/kirana-detective)
"""

# Retry policy for trace publishing.
# NOTE(review): neither constant is referenced by the code visible in this
# part of the file (Hub publishing is currently disabled) — confirm before
# removing.
MAX_RETRIES = 3
BACKOFF_BASE_SECONDS = 2  # sleeps 2s, 4s, 8s on successive failures
def _make_trace_entry(
    agent_name: str,
    agent_version: str,
    audit_run_id: str,
    t_start: float,  # time.monotonic() value
    t_end: float,
    input_summary: str,
    output_summary: str,
) -> AgentTraceEntry:
    """Build an ``AgentTraceEntry`` for one agent invocation.

    The duration is derived from the two clock readings. The end timestamp
    is the UTC wall-clock time at which this function is called (not the
    moment ``t_end`` was captured); the start timestamp is approximated by
    subtracting the duration from it.

    Args:
        agent_name: Name of the agent that ran.
        agent_version: Version string of that agent.
        audit_run_id: Identifier of the audit run the entry belongs to.
        t_start: Clock reading taken when the agent started, in seconds.
        t_end: Clock reading taken when the agent finished, in seconds.
        input_summary: Short description of the agent's input.
        output_summary: Short description of the agent's output.

    Returns:
        The populated trace entry, with ISO-8601 timestamps and the duration
        in whole milliseconds (truncated toward zero).
    """
    wall_end = datetime.datetime.now(datetime.timezone.utc)
    elapsed_ms = int((t_end - t_start) * 1000)
    # No wall-clock start was recorded, so back-date it from "now".
    wall_start = wall_end - datetime.timedelta(milliseconds=elapsed_ms)

    fields = {
        "agent_name": agent_name,
        "agent_version": agent_version,
        "audit_run_id": audit_run_id,
        "timestamp_start": wall_start.isoformat(),
        "timestamp_end": wall_end.isoformat(),
        "duration_ms": elapsed_ms,
        "input_summary": input_summary,
        "output_summary": output_summary,
    }
    return AgentTraceEntry(**fields)
class AgentTracer:
    """Buffer per-agent trace entries for an audit run and persist them.

    Entries are accumulated in memory with :meth:`collect`, drained with
    :meth:`finalise`, and handed to :meth:`publish_async`, which stores them
    through the supplied storage manager on a background thread.
    """

    def __init__(self, hf_token: Optional[str] = None) -> None:
        """Create a tracer with an empty buffer.

        Args:
            hf_token: Hugging Face access token. Only read by
                :meth:`_publish_to_hf_hub`; may be ``None`` while Hub
                publishing is not used.
        """
        self._hf_token = hf_token
        self._buffer: List[AgentTraceEntry] = []

    # ── Collection ────────────────────────────────────────────────────────

    def collect(self, entry: AgentTraceEntry) -> None:
        """Append one trace entry to the in-memory buffer."""
        self._buffer.append(entry)

    def finalise(self, audit_run_id: str) -> List[AgentTraceEntry]:
        """Return every buffered entry and empty the buffer.

        Args:
            audit_run_id: Accepted for interface compatibility but not used:
                all buffered entries are returned regardless of which run
                they belong to.

        Returns:
            A new list holding the entries in the order they were collected.
        """
        entries = list(self._buffer)
        self._buffer.clear()
        return entries

    # ── Publishing ────────────────────────────────────────────────────────

    def publish_async(
        self,
        audit_run_id: str,
        entries: List[AgentTraceEntry],
        storage: "StorageManager",
    ) -> None:
        """Persist ``entries`` on a daemon thread and return immediately.

        Args:
            audit_run_id: Identifier of the audit run being stored.
            entries: Trace entries to persist.
            storage: Storage manager whose ``save_audit_run`` receives the
                serialised trace.
        """
        t = threading.Thread(
            target=self._publish_with_retry,
            args=(audit_run_id, entries, storage),
            daemon=True,
        )
        t.start()

    def _publish_with_retry(
        self,
        audit_run_id: str,
        entries: List[AgentTraceEntry],
        storage: "StorageManager",
    ) -> None:
        """Serialise ``entries`` to JSON and save them via ``storage``.

        Despite the name, no retry is performed here: Hub publishing is
        disabled, so the only step is the single local save. Any exception
        raised by ``storage.save_audit_run`` propagates to the caller.
        """
        # default=str stringifies any attribute value json cannot encode.
        trace_json = json.dumps(
            [e.__dict__ for e in entries], ensure_ascii=False, default=str
        )
        # Persist to local SQLite first (fast, synchronous)
        storage.save_audit_run(audit_run_id, trace_json)
        # HF Hub publishing disabled — runtime trace dataset was not completed
        logger.info("Trace %s stored locally (HF Hub publishing disabled)", audit_run_id)

    def _publish_to_hf_hub(self, audit_run_id: str, entries: List[AgentTraceEntry]) -> None:
        """Commit one audit trace (plus the dataset card) to the HF dataset.

        Not called by any code visible in this class; kept for when Hub
        publishing is re-enabled.

        Args:
            audit_run_id: Identifier of the audit run; used as the file name
                under ``traces/`` and (first 8 characters) in the commit
                message.
            entries: Trace entries to include under the ``agents`` key.

        Raises:
            ValueError: If no Hugging Face token was supplied.
        """
        # Validate the token before importing the optional dependency so a
        # missing token is reported as such even when huggingface_hub is not
        # installed.
        if not self._hf_token:
            raise ValueError("HF_TOKEN not set — cannot publish trace")

        from huggingface_hub import HfApi, CommitOperationAdd

        api = HfApi(token=self._hf_token)
        # Ensure the dataset repo exists (exist_ok makes repeat calls harmless).
        api.create_repo(repo_id=_HF_DATASET_REPO, repo_type="dataset", exist_ok=True, private=False)

        now = datetime.datetime.now(datetime.timezone.utc).isoformat()
        row = {
            "audit_run_id": audit_run_id,
            "timestamp": now,
            "agents": [e.__dict__ for e in entries],
        }
        # default=str matches the local-store serialisation in
        # _publish_with_retry, so an entry that can be stored locally can
        # also be published.
        trace_content = (
            json.dumps(row, ensure_ascii=False, default=str) + "\n"
        ).encode("utf-8")

        operations = [
            CommitOperationAdd(
                path_in_repo=f"traces/{audit_run_id}.jsonl",
                path_or_fileobj=trace_content,
            ),
            # Push dataset README on every call so the dataset tags stay set.
            CommitOperationAdd(
                path_in_repo="README.md",
                path_or_fileobj=_DATASET_README.encode("utf-8"),
            ),
        ]
        api.create_commit(
            repo_id=_HF_DATASET_REPO,
            repo_type="dataset",
            operations=operations,
            commit_message=f"Add audit trace {audit_run_id[:8]}",
        )
# ── Module-level helper (used by agents) ─────────────────────────────────────
def make_trace_entry(
    agent_name: str,
    agent_version: str,
    audit_run_id: str,
    t_start: float,
    t_end: float,
    input_summary: str,
    output_summary: str,
) -> AgentTraceEntry:
    """Public entry point for building a trace entry.

    Forwards every argument unchanged to ``_make_trace_entry`` and returns
    its result.

    Args:
        agent_name: Name of the agent that ran.
        agent_version: Version string of that agent.
        audit_run_id: Identifier of the audit run the entry belongs to.
        t_start: Clock reading taken when the agent started, in seconds.
        t_end: Clock reading taken when the agent finished, in seconds.
        input_summary: Short description of the agent's input.
        output_summary: Short description of the agent's output.

    Returns:
        The trace entry produced by ``_make_trace_entry``.
    """
    return _make_trace_entry(
        agent_name=agent_name,
        agent_version=agent_version,
        audit_run_id=audit_run_id,
        t_start=t_start,
        t_end=t_end,
        input_summary=input_summary,
        output_summary=output_summary,
    )
|