| from __future__ import annotations |
|
|
| import datetime |
| import json |
| import logging |
| import threading |
| import time |
| from typing import List, Optional, TYPE_CHECKING |
|
|
| from models import AgentTraceEntry |
|
|
| if TYPE_CHECKING: |
| from storage import StorageManager |
|
|
# Module-level logger used to report trace publishing progress and failures.
logger = logging.getLogger(__name__)


# Hugging Face dataset repository that traces are uploaded to (as repo_type="dataset").
_HF_DATASET_REPO = "build-small-hackathon/kirana-detective-traces"
# Number of attempts made to upload a single trace to the HF Hub.
MAX_RETRIES = 3
# Base of the exponential backoff (in seconds) applied between HF publish attempts.
BACKOFF_BASE_SECONDS = 2
|
|
|
|
def _make_trace_entry(
    agent_name: str,
    agent_version: str,
    audit_run_id: str,
    t_start: float,
    t_end: float,
    input_summary: str,
    output_summary: str,
) -> AgentTraceEntry:
    """Build an ``AgentTraceEntry`` describing one agent invocation.

    ``t_start`` and ``t_end`` are only used to compute the elapsed time, in
    seconds on the same clock (presumably ``time.perf_counter`` or
    ``time.monotonic`` — confirm at call sites). The duration is converted to
    whole milliseconds, truncating toward zero.

    The ISO-8601 timestamps are NOT derived from ``t_start``/``t_end``. The
    end timestamp is the current UTC wall-clock time at the moment of this
    call. The start timestamp is that instant minus ``duration_ms``. They are
    therefore only accurate when this is called right after ``t_end`` was
    taken.
    """
    elapsed_ms = int((t_end - t_start) * 1000)
    finished_at = datetime.datetime.now(datetime.timezone.utc)
    started_at = finished_at - datetime.timedelta(milliseconds=elapsed_ms)

    return AgentTraceEntry(
        agent_name=agent_name,
        agent_version=agent_version,
        audit_run_id=audit_run_id,
        timestamp_start=started_at.isoformat(),
        timestamp_end=finished_at.isoformat(),
        duration_ms=elapsed_ms,
        input_summary=input_summary,
        output_summary=output_summary,
    )
|
|
|
|
class AgentTracer:
    """Buffer agent trace entries and publish them for an audit run.

    Typical flow:

    1. Call :meth:`collect` once per agent invocation.
    2. Call :meth:`finalise` to drain the buffer.
    3. Pass the drained entries to :meth:`publish_async`.

    :meth:`publish_async` saves the trace through the supplied storage and
    uploads it to the ``_HF_DATASET_REPO`` dataset on a background daemon
    thread.

    NOTE(review): ``_buffer`` is not guarded by a lock. ``collect`` and
    ``finalise`` are presumably called from a single thread — confirm.
    """

    def __init__(self, hf_token: Optional[str] = None) -> None:
        """Create a tracer.

        Args:
            hf_token: Hugging Face token used for uploads. When falsy, traces
                are stored locally only and no upload is attempted.
        """
        self._hf_token = hf_token
        self._buffer: List[AgentTraceEntry] = []

    def collect(self, entry: AgentTraceEntry) -> None:
        """Append ``entry`` to the in-memory buffer."""
        self._buffer.append(entry)

    def finalise(self, audit_run_id: str) -> List[AgentTraceEntry]:
        """Drain the buffer and return its entries in insertion order.

        Args:
            audit_run_id: Currently unused. Every buffered entry is returned,
                regardless of the run it belongs to.

        Returns:
            The drained entries; the tracer starts again with an empty buffer.
        """
        # Swap rather than copy-then-clear: an append racing with this call
        # lands in the returned list instead of being silently cleared away.
        entries, self._buffer = self._buffer, []
        return entries

    def publish_async(
        self,
        audit_run_id: str,
        entries: List[AgentTraceEntry],
        storage: "StorageManager",
    ) -> threading.Thread:
        """Persist and publish ``entries`` on a background daemon thread.

        Returns:
            The started thread. Daemon threads are killed at interpreter exit,
            so callers that need the upload to finish can ``join()`` it.
            Callers may also ignore the return value.
        """
        worker = threading.Thread(
            target=self._publish_with_retry,
            args=(audit_run_id, entries, storage),
            name=f"trace-publish-{audit_run_id}",
            daemon=True,
        )
        worker.start()
        return worker

    def _publish_with_retry(
        self,
        audit_run_id: str,
        entries: List[AgentTraceEntry],
        storage: "StorageManager",
    ) -> None:
        """Save the trace locally, then upload it to the HF Hub with retries.

        Runs on the background thread started by :meth:`publish_async`, so
        every failure is logged rather than raised.
        """
        # Local persistence comes first so the trace is kept even when every
        # upload attempt fails.
        try:
            trace_json = json.dumps(
                [e.__dict__ for e in entries], ensure_ascii=False, default=str
            )
            storage.save_audit_run(audit_run_id, trace_json)
        except Exception:
            logger.exception("Trace %s: failed to store locally; not publishing", audit_run_id)
            return

        if not self._hf_token:
            # A missing token is a configuration error; retrying cannot fix it.
            logger.warning("Trace %s: HF_TOKEN not set; stored locally only", audit_run_id)
            return

        for attempt in range(1, MAX_RETRIES + 1):
            try:
                self._publish_to_hf_hub(audit_run_id, entries)
            except Exception as exc:  # hub/network/auth errors vary; log and retry
                if attempt == MAX_RETRIES:
                    # Last attempt: no point sleeping before giving up.
                    logger.warning(
                        "Trace publish attempt %d/%d failed: %s",
                        attempt, MAX_RETRIES, exc,
                    )
                else:
                    wait = BACKOFF_BASE_SECONDS ** attempt
                    logger.warning(
                        "Trace publish attempt %d/%d failed: %s - retrying in %ds",
                        attempt, MAX_RETRIES, exc, wait,
                    )
                    time.sleep(wait)
            else:
                break
        else:
            # Loop finished without a successful upload.
            logger.error(
                "Trace %s: all %d publish attempts failed; stored locally only",
                audit_run_id, MAX_RETRIES,
            )
            return

        # Kept outside the retry loop: a storage failure here must not
        # trigger re-uploading a trace that is already on the Hub.
        try:
            storage.mark_trace_published(audit_run_id)
        except Exception:
            logger.exception(
                "Trace %s: published to HF Hub but could not be marked as published",
                audit_run_id,
            )
            return
        logger.info("Trace %s published to HF Hub", audit_run_id)

    def _publish_to_hf_hub(self, audit_run_id: str, entries: List[AgentTraceEntry]) -> None:
        """Upload this run's trace to ``traces/<audit_run_id>.json`` in the dataset.

        The uploaded file contains a single newline-terminated JSON object
        with three keys:

        - ``audit_run_id``
        - ``trace_json``: the entries serialised as a JSON string
        - ``timestamp``: the UTC upload time

        Raises:
            ValueError: If no HF token was configured.
            ImportError: If ``huggingface_hub`` is not installed.
            Exception: Whatever ``HfApi.upload_file`` raises on failure.
        """
        if not self._hf_token:
            raise ValueError("HF_TOKEN not set - cannot publish trace")

        # Deferred import: huggingface_hub is only needed when publishing.
        from huggingface_hub import HfApi

        row = {
            "audit_run_id": audit_run_id,
            "trace_json": json.dumps([e.__dict__ for e in entries], default=str),
            "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        }
        content = (json.dumps(row, ensure_ascii=False) + "\n").encode("utf-8")

        api = HfApi(token=self._hf_token)
        api.upload_file(
            path_or_fileobj=content,
            path_in_repo=f"traces/{audit_run_id}.json",
            repo_id=_HF_DATASET_REPO,
            repo_type="dataset",
        )
|
|
|
|
| |
|
|
def make_trace_entry(
    agent_name: str,
    agent_version: str,
    audit_run_id: str,
    t_start: float,
    t_end: float,
    input_summary: str,
    output_summary: str,
) -> AgentTraceEntry:
    """Public factory for ``AgentTraceEntry``.

    This is a thin wrapper around the module's private ``_make_trace_entry``;
    every argument is forwarded unchanged and its result is returned as-is.
    """
    return _make_trace_entry(
        agent_name=agent_name,
        agent_version=agent_version,
        audit_run_id=audit_run_id,
        t_start=t_start,
        t_end=t_end,
        input_summary=input_summary,
        output_summary=output_summary,
    )
|
|