from __future__ import annotations import json import logging import time import threading from typing import Optional, Tuple from models import InvoiceJSON, NormalisedInvoice, AgentTraceEntry from catalog import FMCGCatalog from tracer import make_trace_entry logger = logging.getLogger(__name__) AGENT_NAME = "Product_Matcher" AGENT_VERSION = "1.0.0" MODEL_REPO = "build-small-hackathon/minicpm5-1b-indian-fmcg-normalizer" _TIMEOUT_SECONDS = 60 _SYSTEM_PROMPT = ( "You are an Indian FMCG product name normalizer. " "Given a raw product name from a distributor invoice, return ONLY the canonical product_id " "from the list provided. If no match, return null. Return ONLY a JSON object: " '{"product_id": "" | null}' ) def _build_llm_prompt(product_raw: str, catalog: FMCGCatalog) -> str: """Build a compact prompt listing all known product IDs.""" ids = sorted(catalog.all_product_ids()) id_list = "\n".join(f"- {pid}" for pid in ids[:100]) # cap at 100 to stay within context return ( f'Raw invoice name: "{product_raw}"\n\n' f"Known product IDs (pick exactly one or return null):\n{id_list}\n\n" 'Return ONLY: {"product_id": "" | null}' ) class ProductMatcherAgent: def __init__(self, llm, catalog: FMCGCatalog) -> None: self._llm = llm self._catalog = catalog def normalize_single(self, product_raw: str) -> Tuple[Optional[str], Optional[str]]: """Return (product_normalized, product_id). Both None if unmatched.""" # Fast path: alias lookup pid = self._catalog.lookup_alias(product_raw) if pid: entry = self._catalog.get_by_id(pid) canonical = entry.canonical_name if entry else product_raw return canonical, pid # Slow path: LLM if self._llm is None: return None, None try: prompt = _build_llm_prompt(product_raw, self._catalog) response = self._llm.create_chat_completion( messages=[ {"role": "system", "content": _SYSTEM_PROMPT}, {"role": "user", "content": prompt}, ], max_tokens=64, temperature=0.0, ) text = response["choices"][0]["message"]["content"].strip() if not text: logger.warning("ProductMatcher LLM returned empty response for %r", product_raw) return None, None data = json.loads(text) returned_id = data.get("product_id") except Exception as e: logger.warning("ProductMatcher LLM call failed for %r: %s", product_raw, e) return None, None # Validate returned ID against catalog if returned_id and returned_id in self._catalog.all_product_ids(): entry = self._catalog.get_by_id(returned_id) canonical = entry.canonical_name if entry else product_raw return canonical, returned_id return None, None def normalize( self, invoice: InvoiceJSON, audit_run_id: str ) -> Tuple[NormalisedInvoice, AgentTraceEntry]: t_start = time.monotonic() unmatched: list[str] = [] done = threading.Event() exception: list[Exception] = [] def _run(): try: for item in invoice.items: if done.is_set(): # Timeout hit — mark remaining as unmatched if item.product_id is None: unmatched.append(item.product_raw) continue normalized, pid = self.normalize_single(item.product_raw) item.product_normalized = normalized item.product_id = pid if pid is None: unmatched.append(item.product_raw) except Exception as e: exception.append(e) thread = threading.Thread(target=_run, daemon=True) thread.start() thread.join(timeout=_TIMEOUT_SECONDS) if thread.is_alive(): done.set() logger.warning("ProductMatcher: timeout after %ds — some items unmatched", _TIMEOUT_SECONDS) # Collect items that were never processed for item in invoice.items: if item.product_id is None and item.product_raw not in unmatched: unmatched.append(item.product_raw) elif exception: raise exception[0] invoice.unmatched_products = unmatched t_end = time.monotonic() n_items = len(invoice.items) n_unmatched = len(unmatched) trace = make_trace_entry( agent_name=AGENT_NAME, agent_version=AGENT_VERSION, audit_run_id=audit_run_id, t_start=t_start, t_end=t_end, input_summary=f"{n_items} items to normalize", output_summary=f"{n_items} items; {n_unmatched} unmatched", ) return invoice, trace