"""Registry-driven federated search across all registered PaperSources."""

import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

from .base import REGISTRY

logger = logging.getLogger(__name__)

# Resolver/URI prefixes some sources attach to DOIs; stripped so that
# "https://doi.org/10.1/X" and "10.1/x" dedupe as the same work.
_DOI_PREFIXES = (
    "https://doi.org/",
    "http://doi.org/",
    "https://dx.doi.org/",
    "http://dx.doi.org/",
    "doi:",
)


def _normalize_doi(doi: object) -> str | None:
    """Return a canonical lowercase DOI string, or None if `doi` is unusable.

    DOIs are case-insensitive, so lowercasing lets the same DOI reported with
    different casing (or with a resolver prefix) by different sources collapse
    to one dedupe key. Non-string and empty values yield None.
    """
    if not isinstance(doi, str):
        return None
    norm = doi.strip().lower()
    for prefix in _DOI_PREFIXES:
        if norm.startswith(prefix):
            norm = norm[len(prefix):].strip()
            break
    return norm or None


def federated_search(
    query: str,
    sources: list[str] | None = None,
    max_results: int = 20,
    filters: dict | None = None,
) -> list[dict]:
    """Search registered sources concurrently and merge deduplicated results.

    Iterates `REGISTRY` so adding a new source means registering it once; no
    change is needed here. Each selected source's
    `search(query, max_results, filters)` runs in its own worker thread. A
    source that raises is logged and skipped rather than failing the whole
    search.

    Results are deduplicated by normalized DOI first (case-insensitive,
    resolver prefixes stripped), then by `(source, source_id)`. Only the DOI
    check can collapse the same work reported by *different* sources; the
    `(source, source_id)` key catches repeats within a single source. Output
    order is deterministic: sources in `sources` order (or registry order),
    and within a source the order that source returned. The first occurrence
    of a duplicate wins.

    Args:
        query: Search string forwarded unchanged to every source.
        sources: Registry names to query. `None` or an empty list means every
            registered source. Unknown names are logged and ignored; repeated
            names are queried once.
        max_results: Forwarded to each source's `search` call. It is not
            applied to the merged list, which may therefore be longer.
        filters: Optional filters forwarded unchanged to every source.

    Returns:
        The merged paper dicts. Keys read here: `"doi"`, `"source"`,
        `"source_id"`. A record missing `"source"` is keyed under the registry
        name it came from; one with no `"source_id"` is deduped by DOI only.
    """
    if sources:
        # dict.fromkeys drops repeated names while keeping the caller's order.
        requested = list(dict.fromkeys(sources))
    else:
        requested = list(REGISTRY.keys())

    unknown = [name for name in requested if name not in REGISTRY]
    if unknown:
        logger.warning("Ignoring unknown sources: %s", unknown)

    targets = [(name, REGISTRY[name]) for name in requested if name in REGISTRY]
    if not targets:
        return []

    results_by_name: dict[str, list[dict]] = {}
    with ThreadPoolExecutor(max_workers=len(targets)) as executor:
        futures = {
            executor.submit(src.search, query, max_results, filters): name
            for name, src in targets
        }
        # as_completed lets failures be logged as soon as they happen; the
        # results themselves are re-ordered below so output is deterministic.
        for future in as_completed(futures):
            source_name = futures[future]
            try:
                results_by_name[source_name] = list(future.result())
            except Exception as exc:  # one bad source must not sink the search
                logger.warning("Source %s failed: %s", source_name, exc)

    seen_dois: set[str] = set()
    seen_ids: set[tuple] = set()
    unique: list[dict] = []
    # Walk sources in requested/registry order (not completion order) so the
    # same inputs always yield the same rows and the same record wins a tie.
    for source_name, _ in targets:
        for paper in results_by_name.get(source_name, []):
            doi = _normalize_doi(paper.get("doi"))
            source_id = paper.get("source_id")
            sid = (
                (paper.get("source", source_name), source_id)
                if source_id not in (None, "")
                else None
            )
            if doi is not None and doi in seen_dois:
                continue
            if sid is not None and sid in seen_ids:
                continue
            if doi is not None:
                seen_dois.add(doi)
            if sid is not None:
                seen_ids.add(sid)
            unique.append(paper)
    return unique