"""PaperSource protocol and the federated registry. Concrete sources implement `PaperSource` and register themselves into `REGISTRY` (explicit registration in `sources/__init__.py`, no metaclass magic). The federated dispatcher iterates `REGISTRY`; `_VALID_SOURCES` in the MCP server is derived from `REGISTRY.keys()`. The contract is deliberately narrow. It covers what arXiv, PubMed, Semantic Scholar, and OpenAlex actually expose today. Pagination, streaming, and other speculative concerns are intentionally absent until a real source forces them. """ from __future__ import annotations import datetime from abc import ABC, abstractmethod from dataclasses import dataclass from typing import Literal Capability = Literal["citations", "references", "recs", "full_text"] @dataclass(frozen=True) class RateLimitHint: """Conservative per-source rate-limit guidance for the dispatcher. Sources advertise the floor of their public rate limits so callers can pace requests. Concrete enforcement still lives in the source itself (Retry on 429, per-batch sleeps). """ min_interval_seconds: float daily_quota: int | None = None notes: str | None = None def paper_dict( title: str, source: str, source_id: str, abstract: str | None = None, authors: str | None = None, publication_date: datetime.datetime | None = None, url: str | None = None, doi: str | None = None, topics: list[str] | None = None, citation_count: int | None = None, influential_citation_count: int | None = None, ) -> dict: """Construct a paper dict in the schema every source must return.""" return { "title": title, "abstract": abstract, "authors": authors, "publication_date": publication_date, "source": source, "source_id": source_id, "url": url, "doi": doi, "topics": topics or [], "citation_count": citation_count, "influential_citation_count": influential_citation_count, } # Backwards-compatible alias for the original private helper name. 
_paper_dict = paper_dict


class PaperSource(ABC):
    """Abstract base class for a remote paper source.

    Subclasses set `name` and implement `search`, `get_by_id`, `supports`,
    and `rate_limit_hint`. Instances are registered into `REGISTRY` from
    `sources/__init__.py`.
    """

    # Registry key for the source. The empty default is rejected by
    # `register_source`, so concrete subclasses must override it.
    name: str = ""

    @abstractmethod
    def search(
        self,
        query: str,
        max_results: int = 20,
        filters: dict | None = None,
    ) -> list[dict]:
        """Search the source and return papers in the standard dict schema."""

    @abstractmethod
    def get_by_id(self, identifier: str) -> dict | None:
        """Fetch one paper by its source-native identifier.

        Returns `None` if the identifier does not resolve. The identifier
        format is source-specific (arXiv ID, PMID, S2 paper ID / DOI / etc.).
        """

    @abstractmethod
    def supports(self, capability: Capability) -> bool:
        """Whether this source can supply a capability beyond plain search."""

    @abstractmethod
    def rate_limit_hint(self) -> RateLimitHint:
        """Conservative rate-limit guidance for the dispatcher."""


# Process-wide mapping of source name -> registered instance. Mutated only
# through `register_source` / `unregister_source` in this module.
REGISTRY: dict[str, PaperSource] = {}


def register_source(source: PaperSource) -> PaperSource:
    """Register a PaperSource instance under its `name`.

    Replaces any prior registration under the same name so test stubs can
    swap in temporarily.

    Returns:
        The same `source` object that was passed in.

    Raises:
        ValueError: if `source.name` is empty (or otherwise falsy).
    """
    name = source.name
    if not name:
        raise ValueError("PaperSource must declare a non-empty name")
    # Plain assignment: last registration wins, by design.
    REGISTRY[name] = source
    return source


def unregister_source(name: str) -> PaperSource | None:
    """Remove a source from the registry.

    Returns the removed instance, or `None` when nothing was registered
    under `name` (no error is raised for unknown names).
    """
    return REGISTRY.pop(name, None)