Finetuning completed for yolo26n-indian-fmcg-detection and minicpm5-1b-indian-fmcg-normalizer
7b5611f | from __future__ import annotations | |
| import json | |
| import logging | |
| import sqlite3 | |
| from datetime import date, timedelta | |
| from pathlib import Path | |
| from typing import Dict, List, Optional | |
| from models import InvoiceJSON, NormalisedInvoice | |
# Module-level logger; StorageManager reports its failures through it.
logger = logging.getLogger(__name__)

# Default database file: data/kirana.db in the directory containing this module.
_DB_PATH = Path(__file__).parent / "data" / "kirana.db"

# Schema applied by StorageManager.initialise_schema via executescript().
# Every statement uses IF NOT EXISTS, so re-running it on an existing database
# does not alter existing tables or indexes.
#   invoices      - one row per audit run (unique index on audit_run_id); the
#                   whole invoice object is stored as JSON in invoice_json.
#   price_history - unit price per product per audit; audit_date defaults to
#                   SQLite's date('now'), i.e. the current UTC date.
#   agent_traces  - one trace per audit run (audit_run_id UNIQUE) with an
#                   integer `published` flag that defaults to 0.
_DDL = """
CREATE TABLE IF NOT EXISTS invoices (
id INTEGER PRIMARY KEY AUTOINCREMENT,
audit_run_id TEXT NOT NULL,
invoice_number TEXT,
supplier TEXT,
invoice_date TEXT,
invoice_json TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_invoices_audit ON invoices(audit_run_id);
CREATE INDEX IF NOT EXISTS idx_invoices_number ON invoices(invoice_number);
CREATE TABLE IF NOT EXISTS price_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
audit_run_id TEXT NOT NULL,
product_id TEXT NOT NULL,
unit_price REAL NOT NULL,
supplier TEXT,
audit_date TEXT NOT NULL DEFAULT (date('now'))
);
CREATE INDEX IF NOT EXISTS idx_price_product ON price_history(product_id, audit_date);
CREATE TABLE IF NOT EXISTS agent_traces (
id INTEGER PRIMARY KEY AUTOINCREMENT,
audit_run_id TEXT NOT NULL UNIQUE,
trace_json TEXT NOT NULL,
published INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
"""

# Retention window in days: price_history rows older than this are purged by
# initialise_schema and excluded from get_price_history.
_RETENTION_DAYS = 90
class StorageManager:
    """SQLite-backed store for invoices, per-product price history and agent traces.

    The manager degrades instead of raising on database trouble. If the file
    cannot be opened, or an operation hits ``sqlite3.OperationalError``, the
    failure is logged and ``available`` is set to ``False``. From then on every
    write is a no-op and every read returns an empty result (``[]`` / ``None``).

    Each write runs inside ``with self._conn:``, so it is a single transaction.
    The transaction is committed on success and rolled back on any error, so a
    failed write never leaves partial rows for a later ``commit()`` to persist.
    Non-operational sqlite errors (e.g. ``IntegrityError``) still propagate to
    the caller after the rollback.

    NOTE(review): the connection is opened with ``check_same_thread=False``, but
    nothing serialises access. If one instance is shared between threads, their
    transactions can interleave. Confirm single-threaded use or add a lock.
    """

    def __init__(self, db_path: Path = _DB_PATH) -> None:
        """Open (creating if necessary) the SQLite database at *db_path*.

        Args:
            db_path: Database file. Its parent directory is created if missing.

        On failure the error is logged and the instance stays in degraded mode
        (``available is False``). Tables exist only after
        :meth:`initialise_schema` has run.
        """
        self.db_path = db_path
        self.available = False
        self._conn: Optional[sqlite3.Connection] = None
        try:
            db_path.parent.mkdir(parents=True, exist_ok=True)
            self._conn = sqlite3.connect(str(db_path), check_same_thread=False)
            self._conn.row_factory = sqlite3.Row  # rows addressable by column name
            self.available = True
        except (sqlite3.Error, OSError) as e:  # OSError already covers PermissionError
            logger.warning(
                "StorageManager: cannot open DB at %s: %s — running in degraded mode",
                db_path,
                e,
            )

    # -- Internal helpers ------------------------------------------------------

    def _degrade(self, operation: str, exc: sqlite3.Error) -> None:
        """Log that *operation* failed and switch the manager into degraded mode."""
        logger.warning("StorageManager.%s failed: %s", operation, exc)
        self.available = False

    @staticmethod
    def _json_default(obj: object) -> object:
        """Fallback for ``json.dumps``: serialise an object through its ``__dict__``.

        Objects without a ``__dict__`` (``datetime.date``, ``Decimal``, slotted
        classes, ...) are converted with ``str()``. Without this fallback they
        would raise ``AttributeError`` in the middle of a save.
        """
        attrs = getattr(obj, "__dict__", None)
        return attrs if attrs is not None else str(obj)

    @staticmethod
    def _retention_modifier() -> str:
        """Return the SQLite ``date()`` modifier for the retention window, e.g. ``'-90 days'``."""
        return f"-{_RETENTION_DAYS} days"

    # -- Schema ----------------------------------------------------------------

    def initialise_schema(self) -> None:
        """Create tables and indexes if missing, then purge expired price history.

        ``price_history`` rows whose ``audit_date`` is older than
        ``_RETENTION_DAYS`` days are deleted. SQLite computes the cutoff with
        ``date('now', ...)``, so it uses the same UTC clock as the column's
        ``DEFAULT (date('now'))``.
        """
        if not self.available:
            return
        try:
            self._conn.executescript(_DDL)
            with self._conn:
                self._conn.execute(
                    "DELETE FROM price_history WHERE audit_date < date('now', ?)",
                    (self._retention_modifier(),),
                )
        except sqlite3.DatabaseError as e:
            # DatabaseError also covers a non-SQLite file at db_path
            # ("file is not a database"), which would otherwise escape.
            self._degrade("initialise_schema", e)

    # -- Write operations ------------------------------------------------------

    def save_invoice(self, audit_run_id: str, invoice: InvoiceJSON) -> None:
        """Persist *invoice* for *audit_run_id*, replacing any earlier row for that run.

        The whole invoice object is stored as JSON. Its number, supplier and
        date are also stored in dedicated columns.
        """
        if not self.available:
            return
        payload = json.dumps(invoice.__dict__, default=self._json_default)
        try:
            with self._conn:  # commit on success, roll back on any error
                self._conn.execute(
                    "INSERT OR REPLACE INTO invoices "
                    "(audit_run_id, invoice_number, supplier, invoice_date, invoice_json) "
                    "VALUES (?, ?, ?, ?, ?)",
                    (audit_run_id, invoice.invoice_number, invoice.supplier, invoice.date, payload),
                )
        except sqlite3.OperationalError as e:
            self._degrade("save_invoice", e)

    def save_price_history(self, audit_run_id: str, invoice: NormalisedInvoice) -> None:
        """Record the unit price of every line item in *invoice* that has a ``product_id``.

        All rows are inserted in one transaction. A failure part-way through
        therefore leaves no partial rows behind.
        """
        if not self.available:
            return
        rows = [
            (audit_run_id, item.product_id, item.unit_price, invoice.supplier)
            for item in invoice.items
            if item.product_id is not None
        ]
        try:
            with self._conn:
                self._conn.executemany(
                    "INSERT INTO price_history (audit_run_id, product_id, unit_price, supplier) "
                    "VALUES (?, ?, ?, ?)",
                    rows,
                )
        except sqlite3.OperationalError as e:
            self._degrade("save_price_history", e)

    def save_audit_run(self, audit_run_id: str, trace_json: str) -> None:
        """Store the serialised agent trace for *audit_run_id*.

        NOTE(review): ``INSERT OR REPLACE`` deletes any existing row for this run
        before inserting. A re-save therefore resets ``published`` to 0 and
        refreshes ``created_at``. Confirm re-publishing is intended; an
        ``ON CONFLICT(audit_run_id) DO UPDATE`` upsert would keep the flag.
        """
        if not self.available:
            return
        try:
            with self._conn:
                self._conn.execute(
                    "INSERT OR REPLACE INTO agent_traces (audit_run_id, trace_json) VALUES (?, ?)",
                    (audit_run_id, trace_json),
                )
        except sqlite3.OperationalError as e:
            self._degrade("save_audit_run", e)

    def mark_trace_published(self, audit_run_id: str) -> None:
        """Set ``published = 1`` on the trace for *audit_run_id* (no-op if none exists)."""
        if not self.available:
            return
        try:
            with self._conn:
                self._conn.execute(
                    "UPDATE agent_traces SET published = 1 WHERE audit_run_id = ?",
                    (audit_run_id,),
                )
        except sqlite3.OperationalError as e:
            self._degrade("mark_trace_published", e)

    # -- Read operations -------------------------------------------------------

    def get_price_history(self, product_id: str) -> List[float]:
        """Return unit prices recorded for *product_id* within the retention window.

        Prices are ordered newest ``audit_date`` first. Rows from the same day
        are ordered by insertion, latest first, so the result is deterministic.
        Returns ``[]`` in degraded mode or on failure.
        """
        if not self.available:
            return []
        try:
            rows = self._conn.execute(
                "SELECT unit_price FROM price_history "
                "WHERE product_id = ? AND audit_date >= date('now', ?) "
                "ORDER BY audit_date DESC, id DESC",
                (product_id, self._retention_modifier()),
            ).fetchall()
        except sqlite3.OperationalError as e:
            self._degrade("get_price_history", e)
            return []
        return [float(row["unit_price"]) for row in rows]

    def invoice_number_exists(self, invoice_number: str) -> Optional[str]:
        """Return the audit_run_id of a stored invoice with *invoice_number*, else None.

        An empty or ``None`` *invoice_number* returns ``None`` without querying.
        The lookup matches any stored row. That includes the current run's row
        if ``save_invoice`` has already been called for it.
        """
        if not self.available or not invoice_number:
            return None
        try:
            row = self._conn.execute(
                "SELECT audit_run_id FROM invoices WHERE invoice_number = ? LIMIT 1",
                (invoice_number,),
            ).fetchone()
        except sqlite3.OperationalError as e:
            self._degrade("invoice_number_exists", e)
            return None
        return None if row is None else row["audit_run_id"]

    def get_recent_audits(self, limit: int = 20) -> List[Dict]:
        """Return up to *limit* most recently saved invoices with their trace status.

        Each dict has ``audit_run_id``, ``invoice_number``, ``supplier``,
        ``invoice_date``, ``created_at`` and ``published``. ``published`` is
        ``None`` when no trace was saved for that run. Returns ``[]`` in
        degraded mode or on failure.
        """
        if not self.available:
            return []
        query = """
            SELECT i.audit_run_id, i.invoice_number, i.supplier, i.invoice_date,
                   i.created_at, t.published
            FROM invoices AS i
            LEFT JOIN agent_traces AS t ON t.audit_run_id = i.audit_run_id
            ORDER BY i.created_at DESC, i.id DESC
            LIMIT ?
        """
        try:
            rows = self._conn.execute(query, (limit,)).fetchall()
        except sqlite3.OperationalError as e:
            self._degrade("get_recent_audits", e)
            return []
        return [dict(row) for row in rows]

    # -- Lifecycle -------------------------------------------------------------

    def close(self) -> None:
        """Close the connection and switch the manager into degraded mode.

        Safe to call more than once. Afterwards every other method is a no-op,
        or returns an empty result, instead of raising
        ``sqlite3.ProgrammingError`` on the closed connection.
        """
        conn, self._conn = self._conn, None
        self.available = False
        if conn is None:
            return
        try:
            conn.close()
        except sqlite3.Error as e:
            logger.warning("StorageManager.close failed: %s", e)