kirana-detective / storage.py
naazimsnh02's picture
Finetuning completed for yolo26n-indian-fmcg-detection and minicpm5-1b-indian-fmcg-normalizer
7b5611f
Raw
History Blame
8.05 kB
from __future__ import annotations
import json
import logging
import sqlite3
from datetime import date, timedelta
from pathlib import Path
from typing import Dict, List, Optional
from models import InvoiceJSON, NormalisedInvoice
logger = logging.getLogger(__name__)

# Default database file: "data/kirana.db" next to this module.
_DB_PATH = Path(__file__).parent / "data" / "kirana.db"

# Schema script. Every statement uses IF NOT EXISTS, so running it against an
# already-initialised database is harmless.
_DDL = """
CREATE TABLE IF NOT EXISTS invoices (
id INTEGER PRIMARY KEY AUTOINCREMENT,
audit_run_id TEXT NOT NULL,
invoice_number TEXT,
supplier TEXT,
invoice_date TEXT,
invoice_json TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_invoices_audit ON invoices(audit_run_id);
CREATE INDEX IF NOT EXISTS idx_invoices_number ON invoices(invoice_number);
CREATE TABLE IF NOT EXISTS price_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
audit_run_id TEXT NOT NULL,
product_id TEXT NOT NULL,
unit_price REAL NOT NULL,
supplier TEXT,
audit_date TEXT NOT NULL DEFAULT (date('now'))
);
CREATE INDEX IF NOT EXISTS idx_price_product ON price_history(product_id, audit_date);
CREATE TABLE IF NOT EXISTS agent_traces (
id INTEGER PRIMARY KEY AUTOINCREMENT,
audit_run_id TEXT NOT NULL UNIQUE,
trace_json TEXT NOT NULL,
published INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
"""

# price_history rows older than this many days are purged by
# initialise_schema() and ignored by get_price_history().
_RETENTION_DAYS = 90


class StorageManager:
    """Best-effort SQLite persistence for invoices, price history and traces.

    The manager never raises SQLite errors to its caller. If the database
    cannot be opened, or any statement fails, the error is logged, the
    ``available`` flag is set to ``False`` and every later call turns into a
    no-op (writes) or returns an empty result (reads).

    Attributes:
        db_path: Location of the database file.
        available: ``True`` while the connection is usable.
    """

    def __init__(self, db_path: Path = _DB_PATH) -> None:
        """Create the parent directory if needed and open the database.

        Args:
            db_path: Database file to open; defaults to ``_DB_PATH``.
        """
        self.db_path = db_path
        self.available = False
        self._conn: Optional[sqlite3.Connection] = None
        try:
            db_path.parent.mkdir(parents=True, exist_ok=True)
            conn = sqlite3.connect(str(db_path), check_same_thread=False)
        except (sqlite3.Error, OSError) as e:
            logger.warning(
                "StorageManager: cannot open DB at %s: %s — running in degraded mode",
                db_path,
                e,
            )
            return
        # Rows are read by column name (row["unit_price"], dict(row)).
        conn.row_factory = sqlite3.Row
        self._conn = conn
        self.available = True

    # -- Internal helpers ---------------------------------------------------

    @staticmethod
    def _retention_modifier() -> str:
        """Return the SQLite date modifier for the retention window.

        The cutoff is evaluated inside SQLite as ``date('now', modifier)`` so
        it uses the same clock as the ``audit_date`` column default
        (``date('now')``) instead of the Python process's local date.
        """
        return f"-{_RETENTION_DAYS} days"

    def _degrade(self, operation: str, error: Exception) -> None:
        """Log a failed operation and switch the manager to degraded mode."""
        logger.warning("StorageManager.%s failed: %s", operation, error)
        self.available = False

    def _write(self, operation: str, sql: str, rows: List[tuple]) -> None:
        """Run one DML statement once per parameter tuple in ``rows``.

        The statement runs inside ``with connection:``, which commits on
        success and rolls back if an exception is raised. Any
        ``sqlite3.Error`` is logged under ``operation`` and disables storage.
        """
        if not self.available:
            return
        try:
            with self._conn:
                self._conn.executemany(sql, rows)
        except sqlite3.Error as e:
            self._degrade(operation, e)

    def _query(self, operation: str, sql: str, params: tuple) -> List[sqlite3.Row]:
        """Run a SELECT and return all rows, or ``[]`` when storage is down.

        Any ``sqlite3.Error`` is logged under ``operation`` and disables
        storage.
        """
        if not self.available:
            return []
        try:
            return self._conn.execute(sql, params).fetchall()
        except sqlite3.Error as e:
            self._degrade(operation, e)
            return []

    # -- Schema ---------------------------------------------------------------

    def initialise_schema(self) -> None:
        """Apply ``_DDL`` and purge price history older than the retention window."""
        if not self.available:
            return
        try:
            self._conn.executescript(_DDL)
            # Retention purge: drop price_history rows past _RETENTION_DAYS.
            with self._conn:
                self._conn.execute(
                    "DELETE FROM price_history WHERE audit_date < date('now', ?)",
                    (self._retention_modifier(),),
                )
        except sqlite3.Error as e:
            # sqlite3.Error rather than OperationalError: a corrupt or
            # non-SQLite file surfaces here as sqlite3.DatabaseError because
            # sqlite3.connect() does not read the file.
            self._degrade("initialise_schema", e)

    # -- Write operations -----------------------------------------------------

    def save_invoice(self, audit_run_id: str, invoice: InvoiceJSON) -> None:
        """Insert or replace the invoice row for ``audit_run_id``.

        The invoice is stored as JSON built from ``invoice.__dict__``; nested
        objects are serialised through their own ``__dict__``. Serialisation
        errors are not caught here.
        """
        if not self.available:
            return
        payload = json.dumps(invoice.__dict__, default=lambda o: o.__dict__)
        self._write(
            "save_invoice",
            "INSERT OR REPLACE INTO invoices (audit_run_id, invoice_number, supplier, invoice_date, invoice_json) "
            "VALUES (?, ?, ?, ?, ?)",
            [(audit_run_id, invoice.invoice_number, invoice.supplier, invoice.date, payload)],
        )

    def save_price_history(self, audit_run_id: str, invoice: NormalisedInvoice) -> None:
        """Append one price_history row per usable item in ``invoice.items``.

        Items without a ``product_id`` or without a ``unit_price`` are
        skipped: both columns are declared NOT NULL, so inserting them would
        fail the whole batch.
        """
        if not self.available:
            return
        rows = [
            (audit_run_id, item.product_id, item.unit_price, invoice.supplier)
            for item in invoice.items
            if item.product_id is not None and item.unit_price is not None
        ]
        self._write(
            "save_price_history",
            "INSERT INTO price_history (audit_run_id, product_id, unit_price, supplier) VALUES (?,?,?,?)",
            rows,
        )

    def save_audit_run(self, audit_run_id: str, trace_json: str) -> None:
        """Insert or replace the agent trace for ``audit_run_id``.

        Because the statement is INSERT OR REPLACE and does not set
        ``published``, replacing an existing trace resets that column to its
        default of 0.
        """
        self._write(
            "save_audit_run",
            "INSERT OR REPLACE INTO agent_traces (audit_run_id, trace_json) VALUES (?,?)",
            [(audit_run_id, trace_json)],
        )

    def mark_trace_published(self, audit_run_id: str) -> None:
        """Set ``published = 1`` on the trace for ``audit_run_id``, if one exists."""
        self._write(
            "mark_trace_published",
            "UPDATE agent_traces SET published=1 WHERE audit_run_id=?",
            [(audit_run_id,)],
        )

    # -- Read operations ------------------------------------------------------

    def get_price_history(self, product_id: str) -> List[float]:
        """Return unit prices for ``product_id`` within the retention window.

        Prices are ordered newest first. ``audit_date`` only has day
        resolution, so ``id`` breaks ties between rows saved on the same day.
        Returns ``[]`` when storage is unavailable or the query fails.
        """
        rows = self._query(
            "get_price_history",
            "SELECT unit_price FROM price_history "
            "WHERE product_id=? AND audit_date >= date('now', ?) "
            "ORDER BY audit_date DESC, id DESC",
            (product_id, self._retention_modifier()),
        )
        return [float(r["unit_price"]) for r in rows]

    def invoice_number_exists(self, invoice_number: str) -> Optional[str]:
        """Return prior audit_run_id if this invoice_number was already processed, else None."""
        if not invoice_number:
            return None
        rows = self._query(
            "invoice_number_exists",
            "SELECT audit_run_id FROM invoices WHERE invoice_number=? LIMIT 1",
            (invoice_number,),
        )
        return rows[0]["audit_run_id"] if rows else None

    def get_recent_audits(self, limit: int = 20) -> List[Dict]:
        """Return up to ``limit`` invoices, newest first, as plain dicts.

        Each dict has the keys ``audit_run_id``, ``invoice_number``,
        ``supplier``, ``invoice_date``, ``created_at`` and ``published``.
        ``published`` is ``None`` when no trace exists for the run (LEFT JOIN).
        Returns ``[]`` when storage is unavailable or the query fails.
        """
        rows = self._query(
            "get_recent_audits",
            """
            SELECT i.audit_run_id, i.invoice_number, i.supplier, i.invoice_date,
                   i.created_at, t.published
            FROM invoices i
            LEFT JOIN agent_traces t ON i.audit_run_id = t.audit_run_id
            ORDER BY i.created_at DESC, i.id DESC
            LIMIT ?
            """,
            (limit,),
        )
        return [dict(r) for r in rows]

    # -- Lifecycle ------------------------------------------------------------

    def close(self) -> None:
        """Close the connection and mark the manager unavailable.

        Safe to call more than once. After closing, every other method
        behaves as in degraded mode instead of touching a closed connection.
        """
        conn, self._conn = self._conn, None
        self.available = False
        if conn is None:
            return
        try:
            conn.close()
        except Exception as e:
            # Teardown stays best-effort, but the failure is no longer silent.
            logger.warning("StorageManager.close failed: %s", e)