kirana-detective / tracer.py
naazimsnh02's picture
All models training uploaded
9d75c8c
Raw
History Blame
4.94 kB
from __future__ import annotations
import datetime
import json
import logging
import threading
import time
from typing import List, Optional, TYPE_CHECKING
from models import AgentTraceEntry
if TYPE_CHECKING:
from storage import StorageManager
logger: logging.Logger = logging.getLogger(__name__)

# Hugging Face Hub dataset repository that published traces are uploaded to.
_HF_DATASET_REPO: str = "build-small-hackathon/kirana-detective-traces"

# Hub upload retry policy: at most MAX_RETRIES attempts. Before retrying after
# failed attempt k (1-based) the publisher waits BACKOFF_BASE_SECONDS ** k
# seconds, i.e. an exponential backoff of 2s, 4s, ...
MAX_RETRIES: int = 3
BACKOFF_BASE_SECONDS: int = 2
def _make_trace_entry(
    agent_name: str,
    agent_version: str,
    audit_run_id: str,
    t_start: float,  # time.monotonic() value
    t_end: float,
    input_summary: str,
    output_summary: str,
) -> AgentTraceEntry:
    """Build an ``AgentTraceEntry`` from two monotonic clock readings.

    Monotonic readings only yield an elapsed time, so wall-clock timestamps
    are reconstructed. The end timestamp is the current UTC time at the moment
    this function runs. The start timestamp is that instant minus the measured
    duration.

    Args:
        agent_name: Name of the agent that produced the trace.
        agent_version: Version string of that agent.
        audit_run_id: Identifier of the audit run the trace belongs to.
        t_start: ``time.monotonic()`` reading taken when the agent started.
        t_end: Matching reading taken when the agent finished.
        input_summary: Summary of the agent's input.
        output_summary: Summary of the agent's output.

    Returns:
        The populated ``AgentTraceEntry`` with ISO-8601 UTC timestamps and a
        whole-millisecond (truncated) duration.
    """
    elapsed_ms = int((t_end - t_start) * 1000)
    # Approximation: "finished" is when this entry is built, not when t_end was read.
    finished_at = datetime.datetime.now(datetime.timezone.utc)
    started_at = finished_at - datetime.timedelta(milliseconds=elapsed_ms)

    return AgentTraceEntry(
        agent_name=agent_name,
        agent_version=agent_version,
        audit_run_id=audit_run_id,
        timestamp_start=started_at.isoformat(),
        timestamp_end=finished_at.isoformat(),
        duration_ms=elapsed_ms,
        input_summary=input_summary,
        output_summary=output_summary,
    )
class AgentTracer:
    """Buffer per-agent trace entries and publish them for an audit run.

    Entries are accumulated in memory with :meth:`collect` and drained with
    :meth:`finalise`. :meth:`publish_async` then hands them to a background
    thread, which saves them through the storage manager and uploads them to
    the Hugging Face Hub dataset named by ``_HF_DATASET_REPO``.
    """

    def __init__(self, hf_token: Optional[str] = None) -> None:
        """Create a tracer.

        Args:
            hf_token: Hugging Face access token used for Hub uploads. When it
                is falsy, traces are only saved through the storage manager
                and the Hub upload is skipped.
        """
        self._hf_token = hf_token
        # NOTE(review): mutated by collect()/finalise() without a lock; confirm
        # that callers never collect entries from several threads at once.
        self._buffer: List["AgentTraceEntry"] = []

    # ── Collection ────────────────────────────────────────────────────────────

    def collect(self, entry: "AgentTraceEntry") -> None:
        """Append one trace entry to the in-memory buffer."""
        self._buffer.append(entry)

    def finalise(self, audit_run_id: str) -> List["AgentTraceEntry"]:
        """Drain the buffer and return every entry collected so far.

        Args:
            audit_run_id: Currently unused. All buffered entries are returned
                regardless of their own ``audit_run_id``.

        Returns:
            A new list holding the buffered entries in collection order. The
            buffer is empty afterwards.
        """
        entries = list(self._buffer)
        self._buffer.clear()
        return entries

    # ── Publishing ────────────────────────────────────────────────────────────

    def publish_async(
        self,
        audit_run_id: str,
        entries: List["AgentTraceEntry"],
        storage: "StorageManager",
    ) -> None:
        """Save and publish ``entries`` on a background daemon thread.

        Returns immediately. Because the worker is a daemon thread, it can be
        killed at interpreter shutdown before the local save or the Hub upload
        has finished.
        """
        worker = threading.Thread(
            target=self._publish_with_retry,
            args=(audit_run_id, entries, storage),
            name=f"trace-publish-{audit_run_id}",
            daemon=True,
        )
        worker.start()

    @staticmethod
    def _entries_to_json(entries: List["AgentTraceEntry"]) -> str:
        """Serialise entries to a JSON array of their attribute dicts.

        Non-ASCII text is kept as-is (``ensure_ascii=False``). Values that
        ``json`` cannot encode natively are converted with ``str``.
        """
        return json.dumps(
            [entry.__dict__ for entry in entries], ensure_ascii=False, default=str
        )

    def _publish_with_retry(
        self,
        audit_run_id: str,
        entries: List["AgentTraceEntry"],
        storage: "StorageManager",
    ) -> None:
        """Save the trace locally, then upload it to the Hub with backoff.

        Upload errors are retried up to ``MAX_RETRIES`` times. Before each retry
        the method sleeps ``BACKOFF_BASE_SECONDS ** attempt`` seconds; it does
        not sleep after the final attempt. A missing token is not retried,
        because retrying cannot make it succeed.
        """
        # Persist locally first (the storage manager; presumably SQLite), so the
        # trace survives even if every Hub upload fails.
        storage.save_audit_run(audit_run_id, self._entries_to_json(entries))

        if not self._hf_token:
            logger.warning(
                "Trace %s: HF_TOKEN not set — stored locally only", audit_run_id
            )
            return

        for attempt in range(1, MAX_RETRIES + 1):
            try:
                self._publish_to_hf_hub(audit_run_id, entries)
            except Exception as exc:  # any upload failure is treated as retryable
                if attempt == MAX_RETRIES:
                    logger.warning(
                        "Trace publish attempt %d/%d failed: %s",
                        attempt, MAX_RETRIES, exc,
                    )
                    break
                wait = BACKOFF_BASE_SECONDS ** attempt
                logger.warning(
                    "Trace publish attempt %d/%d failed: %s — retrying in %ds",
                    attempt, MAX_RETRIES, exc, wait,
                )
                time.sleep(wait)
                continue

            logger.info("Trace %s published to HF Hub", audit_run_id)
            # The upload already succeeded. A local bookkeeping failure must not
            # fall back into the retry loop and re-upload the same trace.
            try:
                storage.mark_trace_published(audit_run_id)
            except Exception:
                logger.exception(
                    "Trace %s: published to HF Hub but could not be marked as published",
                    audit_run_id,
                )
            return

        logger.error(
            "Trace %s: all %d publish attempts failed; stored locally only",
            audit_run_id, MAX_RETRIES,
        )

    def _publish_to_hf_hub(
        self, audit_run_id: str, entries: List["AgentTraceEntry"]
    ) -> None:
        """Upload one audit run's trace to the Hub as ``traces/<id>.json``.

        Raises:
            ValueError: If no HF token was configured. This is checked before
                ``huggingface_hub`` is imported.
            Exception: Whatever the ``huggingface_hub`` import or
                ``HfApi.upload_file`` raises.
        """
        if not self._hf_token:
            raise ValueError("HF_TOKEN not set — cannot publish trace")

        # Imported here rather than at module level, so huggingface_hub is only
        # required when an upload is actually attempted.
        from huggingface_hub import HfApi

        row = {
            "audit_run_id": audit_run_id,
            "trace_json": self._entries_to_json(entries),
            "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        }
        # NOTE(review): every attribute of every entry is uploaded. Nothing here
        # strips PII or image data; that relies on agents keeping their
        # input/output summaries free of it.
        content = (json.dumps(row, ensure_ascii=False) + "\n").encode("utf-8")

        HfApi(token=self._hf_token).upload_file(
            path_or_fileobj=content,
            path_in_repo=f"traces/{audit_run_id}.json",
            repo_id=_HF_DATASET_REPO,
            repo_type="dataset",
        )
# ── Module-level helper (used by agents) ─────────────────────────────────────
def make_trace_entry(
    agent_name: str,
    agent_version: str,
    audit_run_id: str,
    t_start: float,
    t_end: float,
    input_summary: str,
    output_summary: str,
) -> AgentTraceEntry:
    """Build an ``AgentTraceEntry`` for an agent run (public entry point).

    This is a thin wrapper that forwards every argument unchanged to
    ``_make_trace_entry``. ``t_start`` and ``t_end`` are expected to be
    ``time.monotonic()`` readings taken around the agent's work.

    Returns:
        The ``AgentTraceEntry`` built by ``_make_trace_entry``.
    """
    return _make_trace_entry(
        agent_name=agent_name,
        agent_version=agent_version,
        audit_run_id=audit_run_id,
        t_start=t_start,
        t_end=t_end,
        input_summary=input_summary,
        output_summary=output_summary,
    )