"""Hugging Face Space wrapper for research-papers-mcp federated search. Live API calls to arXiv, PubMed, Semantic Scholar, and OpenAlex. No persistent state. """ from __future__ import annotations import logging from concurrent.futures import ThreadPoolExecutor, as_completed import gradio as gr import pandas as pd from research_papers_mcp.sources import REGISTRY logger = logging.getLogger(__name__) AVAILABLE_SOURCES = list(REGISTRY.keys()) EXAMPLE_QUERIES = [ "graph neural networks for protein folding", "diffusion models for medical imaging", "mechanistic interpretability of large language models", "private 5G for autonomous systems", ] def _format_authors(authors): if not authors: return "" short = ", ".join(authors[:3]) if len(authors) > 3: short += " et al." return short def _dedupe(papers): seen_dois = set() seen_source_ids = set() out = [] for p in papers: doi = (p.get("doi") or "").lower().strip() or None sid = (p.get("source"), p.get("source_id")) if doi and doi in seen_dois: continue if sid in seen_source_ids: continue if doi: seen_dois.add(doi) seen_source_ids.add(sid) out.append(p) return out def search( query: str, source_list: list, max_results: int, progress=gr.Progress(), ): """Federated academic paper search across arXiv, PubMed, Semantic Scholar, and OpenAlex. Returns deduplicated results from the selected sources. Deduplication is by DOI when available, then by (source, source_id). Args: query: Search query string. source_list: List of source names to query. Valid values: arxiv, pubmed, semantic_scholar, openalex. max_results: Maximum results to fetch per source (5-50). Returns: Tuple of (DataFrame of deduplicated papers, markdown diagnostics string). """ if not query or not query.strip(): return pd.DataFrame(), "Enter a search query above." if not source_list: return pd.DataFrame(), "Select at least one source." targets = [(name, REGISTRY[name]) for name in source_list if name in REGISTRY] if not targets: return pd.DataFrame(), "No registered sources available." progress(0.05, desc="submitting queries") per_source_counts = {name: 0 for name, _ in targets} per_source_status = {name: "..." for name, _ in targets} all_papers = [] with ThreadPoolExecutor(max_workers=max(len(targets), 1)) as executor: futures = { executor.submit(src.search, query, int(max_results), None): name for name, src in targets } done = 0 for future in as_completed(futures): source_name = futures[future] try: results = future.result() per_source_counts[source_name] = len(results) per_source_status[source_name] = "ok" all_papers.extend(results) except Exception as exc: per_source_status[source_name] = "failed" logger.warning("Source %s failed: %s", source_name, exc) done += 1 progress(0.1 + 0.7 * done / len(targets), desc=f"received {source_name}") progress(0.85, desc="deduplicating") deduped = _dedupe(all_papers) diag_parts = [] for name, _ in targets: count = per_source_counts[name] status = per_source_status[name] if status == "failed": diag_parts.append(f"**{name}** failed") else: diag_parts.append(f"**{name}** {count}") diag_line = " · ".join(diag_parts) total_raw = sum(per_source_counts.values()) dedup_drop = total_raw - len(deduped) diagnostics = ( f"{diag_line}\n\n" f"Raw: {total_raw} → deduplicated to **{len(deduped)}** " f"({dedup_drop} duplicates removed)." ) if not deduped: return pd.DataFrame(), diagnostics rows = [] for p in deduped: title = (p.get("title") or "").strip() authors = _format_authors(p.get("authors") or []) year = p.get("year") or "" source = p.get("source") or "" link = p.get("url") or p.get("doi_url") or "" rows.append([title, authors, year, source, link]) progress(1.0, desc="done") df = pd.DataFrame(rows, columns=["Title", "Authors", "Year", "Source", "Link"]) return df, diagnostics theme = gr.themes.Soft( primary_hue="indigo", secondary_hue="indigo", neutral_hue="slate", ) with gr.Blocks(title="Research Papers Federated Search", theme=theme) as demo: gr.Markdown("# Research Papers Federated Search") gr.Markdown( "Federated search across arXiv, PubMed, Semantic Scholar, and OpenAlex. " "Results are deduplicated by DOI when available, then by `(source, source_id)`." ) with gr.Row(): with gr.Column(scale=4): query = gr.Textbox( label="Query", placeholder="e.g., graph neural networks for protein folding", lines=1, ) with gr.Column(scale=1): max_results = gr.Slider( minimum=5, maximum=50, value=15, step=5, label="Max per source", ) sources_input = gr.CheckboxGroup( choices=AVAILABLE_SOURCES, value=AVAILABLE_SOURCES, label="Sources", ) submit = gr.Button("Search", variant="primary") gr.Examples( examples=[[q] for q in EXAMPLE_QUERIES], inputs=[query], label="Example queries", ) diagnostics_out = gr.Markdown() results_out = gr.DataFrame( headers=["Title", "Authors", "Year", "Source", "Link"], wrap=True, interactive=False, label="Results", ) submit.click( fn=search, inputs=[query, sources_input, max_results], outputs=[results_out, diagnostics_out], ) query.submit( fn=search, inputs=[query, sources_input, max_results], outputs=[results_out, diagnostics_out], ) gr.Markdown( "First query after idle takes 30-60s while the Space wakes up. " "Rate-limited sources may return partial results." ) if __name__ == "__main__": demo.launch(mcp_server=True)