""" DuckDuckGo Live Search Adapter Provides real-time news search using DuckDuckGo's news search API. Integrates with the hybrid RAG pipeline to supplement database results with fresh, live content. Features: - Async execution with timeout (2s default) - Ethiopia-focused filtering - Error handling and graceful fallbacks - Result normalization for hybrid ranking """ import logging import asyncio from typing import List, Dict, Any, Optional from datetime import datetime import traceback logger = logging.getLogger(__name__) try: from ddgs import DDGS HAS_DDGS = True except ImportError: # Fallback to old package name for backward compatibility try: from duckduckgo_search import DDGS HAS_DDGS = True except ImportError: HAS_DDGS = False logger.warning("ddgs (duckduckgo-search) not installed. Live search disabled.") class DuckDuckGoAdapter: """ Adapter for DuckDuckGo news search. Provides real-time news results to complement database search. Designed to be fast (2s timeout) and resilient (graceful fallbacks). """ def __init__(self, timeout: float = 1.5, max_results: int = 15): """ Initialize DuckDuckGo adapter. Args: timeout: Maximum time to wait for results (seconds) max_results: Maximum number of results to return """ self.timeout = timeout self.max_results = max_results self.ddgs = DDGS() if HAS_DDGS else None self.retry_count = 1 # Retry once on failure # Ethiopia context detection - multi-tier approach # Tier 1: Direct Ethiopia mentions self.ethiopia_direct = { "ethiopia", "ethiopian", "ethiopians", "addis ababa" } # Tier 2: Ethiopian regions (strong Ethiopia context) self.ethiopia_regions = { "amhara", "tigray", "oromia", "somali region", "afar", "sidama", "snnpr", "benishangul", "gambela", "harari", "dire dawa" } # Tier 3: Ethiopian political entities (strong Ethiopia context) self.ethiopia_political = { "abiy ahmed", "endf", "tplf", "fano", "oneg", "olf", "prosperity party", "eprdf", "ethiopian government" } # Tier 4: Horn of Africa context (weak Ethiopia context - needs boost) self.horn_africa = { "horn of africa", "east africa", "nile dam", "gerd", "renaissance dam" } # Tier 5: Neighboring countries (NO Ethiopia context - don't add filter) self.neighboring_countries = { "somalia", "somali", "kenya", "kenyan", "sudan", "sudanese", "south sudan", "eritrea", "eritrean", "djibouti" } if not HAS_DDGS: logger.error( "DuckDuckGo search unavailable. " "Install with: pip install duckduckgo-search" ) def _analyze_ethiopia_context(self, query: str) -> Dict[str, Any]: """ Analyze query to determine Ethiopia context and optimal search strategy. Returns: { "has_ethiopia_context": bool, "context_strength": str, # "strong", "medium", "weak", "none" "should_add_filter": bool, "search_modifier": str, # What to add to query "reason": str } """ query_lower = query.lower() # Tier 1: Direct Ethiopia mention - STRONG context, no filter needed if any(term in query_lower for term in self.ethiopia_direct): return { "has_ethiopia_context": True, "context_strength": "strong", "should_add_filter": False, "search_modifier": "", "reason": "Direct Ethiopia mention detected" } # Tier 2: Ethiopian regions - STRONG context, no filter needed if any(region in query_lower for region in self.ethiopia_regions): return { "has_ethiopia_context": True, "context_strength": "strong", "should_add_filter": False, "search_modifier": "", "reason": f"Ethiopian region detected" } # Tier 3: Ethiopian political entities - STRONG context, no filter needed if any(entity in query_lower for entity in self.ethiopia_political): return { "has_ethiopia_context": True, "context_strength": "strong", "should_add_filter": False, "search_modifier": "", "reason": "Ethiopian political entity detected" } # Tier 4: Horn of Africa - MEDIUM context, add Ethiopia for specificity if any(term in query_lower for term in self.horn_africa): return { "has_ethiopia_context": True, "context_strength": "medium", "should_add_filter": True, "search_modifier": "Ethiopia", "reason": "Horn of Africa context - adding Ethiopia for specificity" } # Tier 5: Neighboring countries - NO Ethiopia context, don't add filter if any(country in query_lower for country in self.neighboring_countries): return { "has_ethiopia_context": False, "context_strength": "none", "should_add_filter": False, "search_modifier": "", "reason": "Neighboring country detected - respecting user intent" } # Default: No Ethiopia context - WEAK, add filter for Ethiopia focus return { "has_ethiopia_context": False, "context_strength": "weak", "should_add_filter": True, "search_modifier": "Ethiopia OR \"Horn of Africa\"", "reason": "No Ethiopia context - adding broad filter" } async def search( self, query: str, max_results: Optional[int] = None, region: str = "et-en", # Ethiopia English add_ethiopia_filter: bool = None # Auto-detect if None ) -> List[Dict[str, Any]]: """ Search DuckDuckGo news for the given query with smart Ethiopia filtering. Args: query: Search query max_results: Override default max_results region: DuckDuckGo region code (et-en = Ethiopia English) add_ethiopia_filter: Override auto-detection (None = auto-detect) Returns: List of normalized search results """ if not self.ddgs: logger.warning("DuckDuckGo unavailable - returning empty results") return [] max_results = max_results or self.max_results # Smart Ethiopia filtering with context analysis if add_ethiopia_filter is None: # Auto-detect using multi-tier analysis context = self._analyze_ethiopia_context(query) logger.info( f"[DDG] Context analysis: {context['context_strength']} " f"({context['reason']})" ) if context["should_add_filter"]: search_query = f"{query} {context['search_modifier']}" logger.info(f"[DDG] Enhanced query: '{search_query}'") else: search_query = query logger.info(f"[DDG] Using original query (sufficient context)") else: # Manual override search_query = f"{query} Ethiopia" if add_ethiopia_filter else query logger.info(f"[DDG] Manual filter override: {add_ethiopia_filter}") # Try search with retry for attempt in range(self.retry_count + 1): try: # Run sync DuckDuckGo search in thread pool with timeout loop = asyncio.get_event_loop() results = await asyncio.wait_for( loop.run_in_executor( None, self._search_sync, search_query, max_results, region ), timeout=self.timeout ) logger.info( f"[DDG] Search completed: '{query[:50]}' → {len(results)} results " f"(attempt {attempt + 1}/{self.retry_count + 1})" ) return results except asyncio.TimeoutError: if attempt < self.retry_count: logger.warning( f"[DDG] Timeout ({self.timeout}s) - retrying ({attempt + 1}/{self.retry_count})" ) await asyncio.sleep(0.5) # Brief delay before retry continue else: logger.warning( f"[DDG] Search timeout ({self.timeout}s) after {self.retry_count + 1} attempts" ) return [] except Exception as e: if attempt < self.retry_count: logger.warning( f"[DDG] Error: {e} - retrying ({attempt + 1}/{self.retry_count})" ) await asyncio.sleep(0.5) continue else: logger.error( f"[DDG] Search error after {self.retry_count + 1} attempts: {e}\n" f"{traceback.format_exc()}" ) return [] return [] def _search_sync( self, query: str, max_results: int, region: str ) -> List[Dict[str, Any]]: """ Synchronous DuckDuckGo search (runs in thread pool). Args: query: Search query max_results: Maximum results to return region: DuckDuckGo region code Returns: List of normalized results """ results = [] try: # DuckDuckGo news search (ddgs package uses query as first positional arg) raw_results = self.ddgs.news( query, # First positional argument region=region, max_results=max_results ) # Normalize results to common format for r in raw_results: normalized = self._normalize_result(r) if normalized: results.append(normalized) except Exception as e: # Handle specific DuckDuckGo errors gracefully error_msg = str(e) if "DecodeError" in error_msg or "Body collection error" in error_msg: logger.warning(f"DuckDuckGo decode error (likely rate limit or API issue): {e}") # Return empty results instead of raising - system will use database fallback return [] elif "No results found" in error_msg: logger.debug(f"DuckDuckGo: No results for query '{query[:50]}'") return [] else: logger.error(f"DuckDuckGo API error: {e}") # Return empty results for graceful degradation return [] return results def _normalize_result(self, raw_result: Dict[str, Any]) -> Optional[Dict[str, Any]]: """ Normalize DuckDuckGo result to common format. Args: raw_result: Raw result from DuckDuckGo API Returns: Normalized result dict or None if invalid """ try: # Extract fields (DuckDuckGo news format) title = raw_result.get("title", "").strip() url = raw_result.get("url", "").strip() snippet = raw_result.get("body", "").strip() source = raw_result.get("source", "").strip() date_str = raw_result.get("date") # Validate required fields if not title or not url: logger.debug(f"Skipping invalid result: missing title or URL") return None # Parse date published_at = self._parse_date(date_str) # Calculate freshness score (live results are freshest) freshness_score = self._calculate_freshness(published_at) image_url = raw_result.get("image") or raw_result.get("thumbnail") return { "title": title, "url": url, "content": snippet or title, # Use title if no snippet "snippet": snippet, "source": source or self._extract_domain(url), "published_at": published_at, "image_url": image_url, "source_type": "live", "is_live": True, "freshness_score": freshness_score, "language": "en", # DuckDuckGo returns English "metadata": { "title": title, "url": url, "source": source, "published_at": published_at, "image_url": image_url, "search_engine": "duckduckgo" } } except Exception as e: logger.warning(f"Failed to normalize result: {e}") return None def _parse_date(self, date_str: Optional[str]) -> str: """ Parse date string to ISO format. Args: date_str: Date string from DuckDuckGo Returns: ISO format date string or current time if parsing fails """ if not date_str: return datetime.utcnow().isoformat() try: # DuckDuckGo returns ISO-like format # Try parsing common formats from dateutil import parser parsed = parser.parse(date_str) return parsed.isoformat() except: # Fallback to current time return datetime.utcnow().isoformat() def _calculate_freshness(self, published_at: str) -> float: """ Calculate freshness score based on article age. Args: published_at: ISO format date string Returns: Freshness score (0.0 to 1.0) """ try: pub_date = datetime.fromisoformat(published_at.replace('Z', '+00:00')) age = datetime.utcnow() - pub_date.replace(tzinfo=None) age_minutes = age.total_seconds() / 60 # Live results are very fresh if age_minutes < 10: return 1.0 # < 10 min elif age_minutes < 60: return 0.95 # < 1 hour elif age_minutes < 360: return 0.9 # < 6 hours elif age_minutes < 1440: return 0.85 # < 24 hours else: return 0.8 # Older but still from live search except: return 1.0 # Default to fresh for live results def _extract_domain(self, url: str) -> str: """ Extract domain name from URL. Args: url: Full URL Returns: Domain name (e.g., "bbc.com") """ try: from urllib.parse import urlparse parsed = urlparse(url) domain = parsed.netloc # Remove www. prefix if domain.startswith("www."): domain = domain[4:] return domain except: return "unknown" def is_available(self) -> bool: """ Check if DuckDuckGo search is available. Returns: True if available, False otherwise """ return HAS_DDGS and self.ddgs is not None # Module-level singleton for easy import _default_adapter = None def get_duckduckgo_adapter(timeout: float = 1.5, max_results: int = 15) -> DuckDuckGoAdapter: """ Get or create the default DuckDuckGo adapter instance. Args: timeout: Search timeout in seconds max_results: Maximum results to return Returns: DuckDuckGoAdapter instance """ global _default_adapter if _default_adapter is None: _default_adapter = DuckDuckGoAdapter(timeout=timeout, max_results=max_results) return _default_adapter