"""SQLite persistence for audit runs: invoices, price history and agent traces.

Every operation is best-effort. If the database cannot be opened, or a
statement fails, the manager logs a warning, sets ``available`` to ``False``
and all later calls become no-ops that return an empty result.
"""
from __future__ import annotations

import json
import logging
import sqlite3
from datetime import date, timedelta
from pathlib import Path
from typing import TYPE_CHECKING, Dict, List, Optional

if TYPE_CHECKING:
    # Used in annotations only; annotations are never evaluated at runtime
    # because of ``from __future__ import annotations``.
    from models import InvoiceJSON, NormalisedInvoice

logger = logging.getLogger(__name__)

_DB_PATH = Path(__file__).parent / "data" / "kirana.db"

_DDL = """
CREATE TABLE IF NOT EXISTS invoices (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    audit_run_id    TEXT NOT NULL,
    invoice_number  TEXT,
    supplier        TEXT,
    invoice_date    TEXT,
    invoice_json    TEXT NOT NULL,
    created_at      TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_invoices_audit ON invoices(audit_run_id);
CREATE INDEX IF NOT EXISTS idx_invoices_number ON invoices(invoice_number);

CREATE TABLE IF NOT EXISTS price_history (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    audit_run_id    TEXT NOT NULL,
    product_id      TEXT NOT NULL,
    unit_price      REAL NOT NULL,
    supplier        TEXT,
    audit_date      TEXT NOT NULL DEFAULT (date('now'))
);
CREATE INDEX IF NOT EXISTS idx_price_product ON price_history(product_id, audit_date);

CREATE TABLE IF NOT EXISTS agent_traces (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    audit_run_id    TEXT NOT NULL UNIQUE,
    trace_json      TEXT NOT NULL,
    published       INTEGER NOT NULL DEFAULT 0,
    created_at      TEXT NOT NULL DEFAULT (datetime('now'))
);
"""

# Rows in price_history older than this many days are purged and ignored.
_RETENTION_DAYS = 90


class StorageManager:
    """Thin wrapper around one SQLite connection with a degraded mode.

    Attributes:
        db_path: Location of the database file.
        available: ``True`` while the connection is usable. Set to ``False``
            when opening fails, when any statement fails, or after
            :meth:`close`. Every public method checks it first.
    """

    def __init__(self, db_path: Path = _DB_PATH) -> None:
        """Open (creating parent directories if needed) the database at *db_path*.

        Never raises for filesystem or SQLite errors; on failure the instance
        is left with ``available == False``.
        """
        self.db_path = db_path
        self.available = False
        self._conn: Optional[sqlite3.Connection] = None
        try:
            db_path.parent.mkdir(parents=True, exist_ok=True)
            # check_same_thread=False disables sqlite3's same-thread check;
            # this class does no locking of its own.
            self._conn = sqlite3.connect(str(db_path), check_same_thread=False)
            self._conn.row_factory = sqlite3.Row
            self.available = True
        except (sqlite3.Error, OSError) as e:
            logger.warning(
                "StorageManager: cannot open DB at %s: %s — running in degraded mode",
                db_path,
                e,
            )

    def __enter__(self) -> "StorageManager":
        """Allow ``with StorageManager(...) as store:`` usage."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the connection when the ``with`` block ends."""
        self.close()

    # ── Internal helpers ──────────────────────────────────────────────────────

    def _degrade(self, operation: str, exc: Exception) -> None:
        """Log a failed *operation* and switch the manager to degraded mode."""
        logger.warning("StorageManager.%s failed: %s", operation, exc)
        self.available = False

    @staticmethod
    def _retention_cutoff() -> str:
        """Return the ISO date ``_RETENTION_DAYS`` days before today.

        NOTE: this uses the local date, while the ``audit_date`` column default
        ``date('now')`` is UTC in SQLite, so the boundary can be off by a day.
        """
        return (date.today() - timedelta(days=_RETENTION_DAYS)).isoformat()

    # ── Schema ────────────────────────────────────────────────────────────────

    def initialise_schema(self) -> None:
        """Create tables/indexes if missing and purge expired price history."""
        if not self.available:
            return
        try:
            self._conn.executescript(_DDL)
            self._conn.commit()
            # 90-day retention on price_history
            with self._conn:
                self._conn.execute(
                    "DELETE FROM price_history WHERE audit_date < ?",
                    (self._retention_cutoff(),),
                )
        except sqlite3.Error as e:
            # sqlite3.Error (not just OperationalError): a corrupt or non-SQLite
            # file surfaces here as sqlite3.DatabaseError.
            self._degrade("initialise_schema", e)

    # ── Write operations ──────────────────────────────────────────────────────

    def save_invoice(self, audit_run_id: str, invoice: InvoiceJSON) -> None:
        """Store *invoice* as JSON, replacing any row with the same *audit_run_id*.

        The JSON payload is built from ``invoice.__dict__``; nested objects are
        serialised through their own ``__dict__``.
        """
        if not self.available:
            return
        payload = json.dumps(invoice.__dict__, default=lambda o: o.__dict__)
        try:
            # ``with conn`` commits on success and rolls back on error.
            with self._conn:
                self._conn.execute(
                    "INSERT OR REPLACE INTO invoices "
                    "(audit_run_id, invoice_number, supplier, invoice_date, invoice_json) "
                    "VALUES (?, ?, ?, ?, ?)",
                    (
                        audit_run_id,
                        invoice.invoice_number,
                        invoice.supplier,
                        invoice.date,
                        payload,
                    ),
                )
        except sqlite3.Error as e:
            self._degrade("save_invoice", e)

    def save_price_history(self, audit_run_id: str, invoice: NormalisedInvoice) -> None:
        """Append one price row per invoice item that has a ``product_id``."""
        if not self.available:
            return
        rows = [
            (audit_run_id, item.product_id, item.unit_price, invoice.supplier)
            for item in invoice.items
            if item.product_id is not None
        ]
        if not rows:
            return
        try:
            # One transaction: either every row is stored or none is.
            with self._conn:
                self._conn.executemany(
                    "INSERT INTO price_history "
                    "(audit_run_id, product_id, unit_price, supplier) VALUES (?,?,?,?)",
                    rows,
                )
        except sqlite3.Error as e:
            self._degrade("save_price_history", e)

    def save_audit_run(self, audit_run_id: str, trace_json: str) -> None:
        """Store the trace for *audit_run_id*, replacing any existing row.

        Because the row is replaced, ``published`` falls back to its column
        default of 0.
        """
        if not self.available:
            return
        try:
            with self._conn:
                self._conn.execute(
                    "INSERT OR REPLACE INTO agent_traces (audit_run_id, trace_json) "
                    "VALUES (?,?)",
                    (audit_run_id, trace_json),
                )
        except sqlite3.Error as e:
            self._degrade("save_audit_run", e)

    def mark_trace_published(self, audit_run_id: str) -> None:
        """Set ``published = 1`` on the trace row for *audit_run_id*, if any."""
        if not self.available:
            return
        try:
            with self._conn:
                self._conn.execute(
                    "UPDATE agent_traces SET published=1 WHERE audit_run_id=?",
                    (audit_run_id,),
                )
        except sqlite3.Error as e:
            self._degrade("mark_trace_published", e)

    # ── Read operations ───────────────────────────────────────────────────────

    def get_price_history(self, product_id: str) -> List[float]:
        """Return unit prices for *product_id* within the retention window.

        Newest first. ``audit_date`` only has day resolution, so ``id`` is used
        as a tie-break to keep same-day rows in a stable newest-first order.
        Returns ``[]`` in degraded mode or on error.
        """
        if not self.available:
            return []
        try:
            rows = self._conn.execute(
                "SELECT unit_price FROM price_history "
                "WHERE product_id=? AND audit_date >= ? "
                "ORDER BY audit_date DESC, id DESC",
                (product_id, self._retention_cutoff()),
            ).fetchall()
        except sqlite3.Error as e:
            self._degrade("get_price_history", e)
            return []
        return [float(r["unit_price"]) for r in rows]

    def invoice_number_exists(self, invoice_number: str) -> Optional[str]:
        """Return prior audit_run_id if this invoice_number was already processed, else None."""
        if not self.available or not invoice_number:
            return None
        try:
            row = self._conn.execute(
                "SELECT audit_run_id FROM invoices WHERE invoice_number=? LIMIT 1",
                (invoice_number,),
            ).fetchone()
        except sqlite3.Error as e:
            self._degrade("invoice_number_exists", e)
            return None
        return row["audit_run_id"] if row else None

    def get_recent_audits(self, limit: int = 20) -> List[Dict]:
        """Return up to *limit* most recent invoices as dicts, newest first.

        Each dict has ``audit_run_id``, ``invoice_number``, ``supplier``,
        ``invoice_date``, ``created_at`` and ``published`` (``None`` when no
        trace row exists for the run). Returns ``[]`` in degraded mode or on
        error.
        """
        if not self.available:
            return []
        try:
            rows = self._conn.execute(
                """
                SELECT i.audit_run_id, i.invoice_number, i.supplier,
                       i.invoice_date, i.created_at, t.published
                FROM invoices i
                LEFT JOIN agent_traces t ON i.audit_run_id = t.audit_run_id
                ORDER BY i.created_at DESC, i.id DESC
                LIMIT ?
                """,
                (limit,),
            ).fetchall()
        except sqlite3.Error as e:
            self._degrade("get_recent_audits", e)
            return []
        return [dict(r) for r in rows]

    # ── Lifecycle ─────────────────────────────────────────────────────────────

    def close(self) -> None:
        """Close the connection; safe to call more than once.

        Afterwards ``available`` is ``False``, so later calls are no-ops
        instead of failing on a closed connection.
        """
        conn, self._conn = self._conn, None
        self.available = False
        if conn is None:
            return
        try:
            conn.close()
        except Exception as e:  # best-effort shutdown: report, never raise
            logger.warning("StorageManager.close failed: %s", e)