barissozudogru's picture
bundle research_papers_mcp source, drop git+install
57272d3 verified
Raw
History Blame Contribute Delete
1.84 kB
"""Registry-driven federated search across all registered PaperSources."""
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from .base import REGISTRY
# Module-level logger; federated_search reports per-source failures through it.
logger = logging.getLogger(__name__)
def federated_search(
    query: str,
    sources: list[str] | None = None,
    max_results: int = 20,
    filters: dict | None = None,
) -> list[dict]:
    """Search registered sources concurrently and return deduplicated papers.

    Iterates `REGISTRY` so adding a new source means registering it once;
    no change is needed here.

    Args:
        query: Search string forwarded unchanged to every source.
        sources: Registry names to query. ``None`` or an empty list selects
            every registered source. Repeated names are queried once and
            names missing from `REGISTRY` are skipped with a warning.
        max_results: Forwarded to each source's ``search``; the merged list
            is not truncated here.
        filters: Forwarded to each source's ``search``.

    Returns:
        Paper dicts merged in the order the sources were requested (not the
        order in which they finished), with duplicates removed. A paper is a
        duplicate when its normalized DOI or its ``(source, source_id)`` pair
        was already seen; the first occurrence wins.

    A source whose ``search`` raises is logged and skipped, so one failing
    backend never fails the whole search.
    """
    # dict.fromkeys drops repeated names while preserving the caller's order,
    # so a source listed twice is not queried twice.
    requested = list(dict.fromkeys(sources)) if sources else list(REGISTRY.keys())

    unknown = [name for name in requested if name not in REGISTRY]
    if unknown:
        logger.warning(
            "Ignoring unknown sources: %s", ", ".join(str(name) for name in unknown)
        )

    targets = [(name, REGISTRY[name]) for name in requested if name in REGISTRY]
    if not targets:
        return []

    # Results are bucketed per source and merged afterwards so the output
    # order (and which duplicate survives) does not depend on thread timing.
    results_by_source: dict[str, list[dict]] = {}
    with ThreadPoolExecutor(max_workers=len(targets)) as executor:
        futures = {
            executor.submit(src.search, query, max_results, filters): name
            for name, src in targets
        }
        for future in as_completed(futures):
            source_name = futures[future]
            try:
                results_by_source[source_name] = list(future.result())
            except Exception as exc:
                # Best-effort fan-out: report the failure, keep other sources.
                logger.warning("Source %s failed: %s", source_name, exc)

    all_papers = [
        paper for name, _ in targets for paper in results_by_source.get(name, ())
    ]

    seen_dois: set[str] = set()
    seen_ids: set[tuple[str, str]] = set()
    unique: list[dict] = []
    for paper in all_papers:
        doi = _doi_key(paper.get("doi"))
        source = paper.get("source")
        source_id = paper.get("source_id")
        # A record without a full (source, source_id) identity cannot be
        # matched by id; it is kept unless its DOI marks it as a duplicate.
        sid = (
            (source, source_id)
            if source is not None and source_id is not None
            else None
        )

        if doi is not None and doi in seen_dois:
            continue
        if sid is not None and sid in seen_ids:
            continue

        if doi is not None:
            seen_dois.add(doi)
        if sid is not None:
            seen_ids.add(sid)
        unique.append(paper)
    return unique


# Resolver/scheme prefixes that some sources put in front of the bare DOI.
_DOI_PREFIXES = (
    "https://doi.org/",
    "http://doi.org/",
    "https://dx.doi.org/",
    "http://dx.doi.org/",
    "doi:",
)


def _doi_key(doi: object) -> str | None:
    """Return a comparison key for *doi*, or ``None`` when it is missing.

    DOIs are case-insensitive and are sometimes delivered as resolver URLs,
    so the key is lower-cased, trimmed, and stripped of a known prefix. The
    paper dict itself is never modified.
    """
    if not doi:
        return None
    key = str(doi).strip().lower()
    for prefix in _DOI_PREFIXES:
        if key.startswith(prefix):
            key = key[len(prefix):]
            break
    return key or None