Spaces:
Sleeping
Sleeping
| """Registry-driven federated search across all registered PaperSources.""" | |
| import logging | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from .base import REGISTRY | |
| logger = logging.getLogger(__name__) | |
| def federated_search( | |
| query: str, | |
| sources: list[str] | None = None, | |
| max_results: int = 20, | |
| filters: dict | None = None, | |
| ) -> list[dict]: | |
| """Search across registered sources concurrently and deduplicate results. | |
| Iterates `REGISTRY` so adding a new source means registering it once; | |
| no change is needed here. Dedupes by DOI first, then by | |
| `(source, source_id)` so the same arXiv preprint surfaced through | |
| multiple sources collapses to one row. | |
| """ | |
| active_names = list(sources) if sources else list(REGISTRY.keys()) | |
| targets = [(name, REGISTRY[name]) for name in active_names if name in REGISTRY] | |
| if not targets: | |
| return [] | |
| all_papers: list[dict] = [] | |
| with ThreadPoolExecutor(max_workers=max(len(targets), 1)) as executor: | |
| futures = { | |
| executor.submit(src.search, query, max_results, filters): name | |
| for name, src in targets | |
| } | |
| for future in as_completed(futures): | |
| source_name = futures[future] | |
| try: | |
| all_papers.extend(future.result()) | |
| except Exception as exc: | |
| logger.warning("Source %s failed: %s", source_name, exc) | |
| seen_dois: set[str] = set() | |
| seen_ids: set[tuple[str, str]] = set() | |
| unique: list[dict] = [] | |
| for p in all_papers: | |
| doi = p.get("doi") | |
| sid = (p["source"], p["source_id"]) | |
| if doi and doi in seen_dois: | |
| continue | |
| if sid in seen_ids: | |
| continue | |
| if doi: | |
| seen_dois.add(doi) | |
| seen_ids.add(sid) | |
| unique.append(p) | |
| return unique | |