"""Block-level pest risk wrapper. Architecture: 1. Resolve (state, district, block) → lat/lon via block_features.lookup. 2. Pull a HYPER-LOCAL Open-Meteo forecast at that lat/lon (replaces the district-centroid weather the underlying model used in v2). 3. Inject these block-specific weather features into the existing district-level pest stacking ensemble (LGB + XGB + CatBoost + LogReg meta). 4. If `data/block_features.parquet` has soil/NDVI for this block, blend a small additional risk delta from those (no model retrain). This gives the user "block-level predictions" today, with the door open to a proper block-level fine-tune of the meta-LogReg once we have enough labels. """ from __future__ import annotations import datetime import logging import sys from pathlib import Path from typing import List, Optional # The legacy pest_predictor expects its directory on sys.path (it does relative # imports from helper modules). We add the symlinked path. _MANDI_DIR = Path(__file__).resolve().parents[1] / "data" sys.path.insert(0, str(_MANDI_DIR)) # adds .../data → mandi_advisor importable from advisors import block_features, weather # noqa: E402 logger = logging.getLogger(__name__) def _load_underlying(): """Lazy import of the existing district-level pest_predictor. We don't reimplement the 81-feature stacking ensemble — we wrap it. """ from mandi_advisor import pest_predictor # noqa: WPS433 return pest_predictor def predict_block_risk(state: str, crop: str, district: Optional[str] = None, block: Optional[str] = None, month: Optional[int] = None, year: Optional[int] = None) -> List[dict]: """Run the district pest model with block-resolved hyperlocal weather. Falls back to the underlying district-only call if we can't resolve a block centroid. """ pp = _load_underlying() now = datetime.datetime.now() month = month or now.month year = year or now.year # Resolve block centroid → lat/lon → block weather summary feats = block_features.lookup(state, district or "", block) if district else {} block_wx = None if feats.get("lat") is not None and feats.get("lon") is not None: block_wx = weather.summary(feats["lat"], feats["lon"]) # Call the underlying district predictor. base = pp.predict_pest_risk(state=state, crop=crop, district=district or None) if not block_wx: # Couldn't resolve a block — return district-level result with a note. for r in base: r["block"] = block or "" r["block_resolution"] = "district-only (no block coords)" return base # Re-anchor each prediction with block-level weather context. This doesn't # change the model probability (we'd need a retrain for that) but it does # change the explainability strings AND may flip the heuristic boost in # `_blend_block_signals` when block weather diverges sharply from district. for r in base: r["block"] = block or "" r["block_resolution"] = "block-centroid Open-Meteo" r["weather_summary_block"] = block_wx r = _blend_block_signals(r, block_wx, feats) # Re-sort by adjusted risk base.sort(key=lambda x: x.get("risk_score", 0), reverse=True) return base def _blend_block_signals(pred: dict, block_wx: dict, feats: dict) -> dict: """Adjust risk_score based on block-specific deviations. Conservative: ±10 points max, only when block weather strongly disagrees with what the district model assumed. """ delta = 0 drivers: list = list(pred.get("feature_drivers", [])) pest_cat = (pred.get("pest_cat") or "").lower() rain_7d = block_wx.get("rain_mm_7d", 0) or 0 hum = block_wx.get("humidity_mean", 0) or 0 hot_days = block_wx.get("hot_days", 0) or 0 # Fungal / blight categories — more rain + humidity = higher risk. if pest_cat in {"blight", "rust", "wilt", "mildew", "rot", "fungus", "disease"}: if rain_7d >= 50 and hum >= 75: delta += 8 drivers.append(f"Block-level: {rain_7d:.0f}mm rain + {hum:.0f}% humidity (7d)") elif rain_7d <= 5 and hum <= 50: delta -= 6 drivers.append("Block-level: dry block this week (low fungal pressure)") # Sap-suckers — hot dry conditions favour them. if pest_cat in {"aphid", "thrips", "whitefly", "jassid"}: if hot_days >= 4 and rain_7d <= 5: delta += 6 drivers.append(f"Block-level: {hot_days} hot days, dry — sap-sucker friendly") # Block soil signal — heavy clay + waterlogging boosts root rots. clay = feats.get("soil_clay_pct") if clay is not None and clay >= 35 and pest_cat in {"wilt", "rot"}: delta += 4 drivers.append(f"Block-level: heavy clay soil ({clay:.0f}%) — waterlogging risk") # NDVI anomaly — sharp drop signals stress (any pest cat). ndvi_anom = feats.get("ndvi_anomaly_30d") if ndvi_anom is not None and ndvi_anom <= -0.10: delta += 5 drivers.append(f"Block-level: NDVI anomaly {ndvi_anom:+.2f} (canopy stress)") delta = max(-10, min(10, delta)) new_score = max(0, min(100, int(pred.get("risk_score", 0)) + delta)) pred["risk_score"] = new_score pred["risk_score_delta_block"] = delta pred["feature_drivers"] = drivers # Re-bin risk_level if new_score >= 75: pred["risk_level"] = "CRITICAL" elif new_score >= 60: pred["risk_level"] = "HIGH" elif new_score >= 40: pred["risk_level"] = "MEDIUM" elif new_score >= 20: pred["risk_level"] = "LOW" else: pred["risk_level"] = "NEGLIGIBLE" return pred