barissozudogru's picture
bundle research_papers_mcp source, drop git+install
57272d3 verified
Raw
History Blame Contribute Delete
11.2 kB
"""OpenAlex source: search + single-work lookup.
OpenAlex returns abstracts as an inverted index (``{word: [positions]}``)
rather than plain text, so every parser here passes through
``_reconstruct_abstract`` before populating the standard paper dict.
The polite pool is opted into by setting ``OPENALEX_MAILTO`` in the
environment. With a mailto the public limit is 10 req/s; without one it
collapses to ~1 req/s.
"""
import datetime
import logging
import os
import re
from ._http import _http
from .base import Capability, PaperSource, RateLimitHint, paper_dict
# Module-level logger; the fetchers below report request failures through it.
logger = logging.getLogger(__name__)
# Root of the OpenAlex REST API and the Works endpoint derived from it. The
# Works endpoint serves both the search call and the single-work lookups.
_OPENALEX_BASE_URL = "https://api.openalex.org"
_OPENALEX_WORKS_URL = f"{_OPENALEX_BASE_URL}/works"
# Cap per OpenAlex paging contract; used as the upper bound for ``per-page``.
_OPENALEX_MAX_PER_PAGE = 200
# URL prefixes that the ``_strip_*`` helpers remove to leave bare identifiers.
_OPENALEX_ID_PREFIX = "https://openalex.org/"
_DOI_PREFIX = "https://doi.org/"
_PMID_PREFIX = "https://pubmed.ncbi.nlm.nih.gov/"
# Bare Work ID: the letter W followed by one or more digits, matched
# case-insensitively. Used with ``fullmatch`` to validate an ID before it is
# placed in a request URL.
_OPENALEX_ID_RE = re.compile(r"W\d+", re.IGNORECASE)
def _polite_params() -> dict:
    """Build the query params shared by every OpenAlex request.

    ``OPENALEX_MAILTO`` is read from the environment on each call. A
    non-blank value is returned, whitespace-trimmed, under the ``mailto``
    key; otherwise the result is an empty dict.
    """
    address = os.getenv("OPENALEX_MAILTO", "").strip()
    if not address:
        return {}
    return {"mailto": address}
def _strip_openalex_id(value: str | None) -> str | None:
"""Drop the ``https://openalex.org/`` prefix from a Work URI."""
if not value:
return None
if value.startswith(_OPENALEX_ID_PREFIX):
return value[len(_OPENALEX_ID_PREFIX):]
return value
def _strip_doi(value: str | None) -> str | None:
if not value:
return None
if value.startswith(_DOI_PREFIX):
return value[len(_DOI_PREFIX):]
return value
def _strip_pmid(value: str | None) -> str | None:
if not value:
return None
if value.startswith(_PMID_PREFIX):
return value[len(_PMID_PREFIX):].rstrip("/")
return value
def _reconstruct_abstract(inverted_index: dict | None) -> str | None:
"""Reconstruct an abstract string from OpenAlex's inverted index format.
The index is ``{word: [positions, ...]}``. We rebuild a sparse list
sized to the largest position and then join tokens with spaces. Any
missing slots stay empty and are collapsed so a corrupt index degrades
to readable text rather than blowing up.
"""
if not inverted_index or not isinstance(inverted_index, dict):
return None
positions: list[tuple[int, str]] = []
for word, idxs in inverted_index.items():
if not isinstance(idxs, list):
continue
for idx in idxs:
if isinstance(idx, int) and idx >= 0:
positions.append((idx, word))
if not positions:
return None
positions.sort(key=lambda p: p[0])
tokens = [word for _, word in positions]
text = " ".join(tokens).strip()
return text or None
def _parse_authors(authorships: list | None) -> str | None:
if not authorships:
return None
names: list[str] = []
for authorship in authorships:
author = (authorship or {}).get("author") or {}
name = (author.get("display_name") or "").strip()
if name:
names.append(name)
return ", ".join(names) if names else None
def _parse_publication_date(work: dict) -> datetime.datetime | None:
date_str = work.get("publication_date")
if date_str:
try:
return datetime.datetime.strptime(date_str, "%Y-%m-%d")
except ValueError:
pass
year = work.get("publication_year")
if isinstance(year, int):
try:
return datetime.datetime(year, 1, 1)
except ValueError:
return None
return None
def _parse_concepts(concepts: list | None) -> list[str]:
"""Pull human-readable concept names; ignore noise below level 0."""
if not concepts:
return []
names: list[str] = []
for c in concepts:
if not isinstance(c, dict):
continue
name = (c.get("display_name") or "").strip()
if not name:
continue
# Keep highest-confidence concepts only; OpenAlex returns long tails.
score = c.get("score")
if isinstance(score, (int, float)) and score < 0.15:
continue
names.append(name)
# Dedupe but preserve order.
seen: set[str] = set()
deduped: list[str] = []
for n in names:
key = n.lower()
if key in seen:
continue
seen.add(key)
deduped.append(n)
return deduped[:10]
def _parse_concept_records(concepts: list | None) -> list[dict]:
"""Structured concept assignments: name, hierarchy level, and score.
These feed the zero-cost OpenAlex topic backend, which needs the level to
keep mid-hierarchy concepts and the score as a confidence.
"""
if not concepts:
return []
out: list[dict] = []
for c in concepts:
if not isinstance(c, dict):
continue
name = (c.get("display_name") or "").strip()
if not name:
continue
out.append({"name": name, "level": c.get("level"), "score": c.get("score")})
return out
def _parse_work(work: dict) -> dict | None:
"""Convert one OpenAlex Work object into the standard paper dict."""
if not work:
return None
work_id = _strip_openalex_id(work.get("id"))
title = (work.get("title") or work.get("display_name") or "").strip()
if not work_id or not title:
return None
abstract = _reconstruct_abstract(work.get("abstract_inverted_index"))
authors = _parse_authors(work.get("authorships"))
pub_date = _parse_publication_date(work)
doi = _strip_doi(work.get("doi"))
citation_count = work.get("cited_by_count")
if not isinstance(citation_count, int):
citation_count = None
url = (
work.get("primary_location", {}).get("landing_page_url")
if isinstance(work.get("primary_location"), dict)
else None
)
if not url:
url = f"{_OPENALEX_ID_PREFIX}{work_id}"
concepts = _parse_concepts(work.get("concepts"))
paper = paper_dict(
title=title,
abstract=abstract,
authors=authors,
publication_date=pub_date,
source="openalex",
source_id=work_id,
url=url,
doi=doi,
topics=concepts,
citation_count=citation_count,
)
# Structured assignments drive the OpenAlex topic backend; the flat names
# in ``topics`` above are kept for backward compatibility.
paper["concepts"] = _parse_concept_records(work.get("concepts"))
return paper
def fetch_openalex(
    query: str = "",
    max_results: int = 20,
    filters: dict | None = None,
) -> list[dict]:
    """Search OpenAlex Works and return the hits as standard paper dicts.

    Args:
        query: Free-text search across title, abstract, and full text.
            Omitted from the request when empty.
        max_results: Requested page size, clamped to the range 1..200.
        filters: Optional dict. ``filter`` may be a raw OpenAlex filter
            string (e.g. ``"concepts.id:C41008148"``).
            ``from_publication_date`` and ``to_publication_date`` are
            forwarded as the corresponding ``filter`` clauses. All clauses
            are comma-joined into a single ``filter`` parameter.

    Returns:
        Parsed papers in response order. A failed request yields an empty
        list (logged as a warning); a work that fails to parse is logged
        and skipped.
    """
    filters = filters or {}

    params: dict[str, str | int] = dict(_polite_params())
    params["per-page"] = min(max(max_results, 1), _OPENALEX_MAX_PER_PAGE)
    if query:
        params["search"] = query

    clauses: list[str] = []
    if raw := filters.get("filter"):
        clauses.append(raw)
    for bound in ("from_publication_date", "to_publication_date"):
        if value := filters.get(bound):
            clauses.append(f"{bound}:{value}")
    if clauses:
        params["filter"] = ",".join(clauses)

    try:
        resp = _http.get(_OPENALEX_WORKS_URL, params=params, timeout=20)
        resp.raise_for_status()
        data = resp.json()
    except Exception as exc:
        logger.warning("OpenAlex search failed: %s", exc)
        return []

    papers: list[dict] = []
    for item in data.get("results") or []:
        try:
            paper = _parse_work(item)
        except Exception as exc:
            logger.warning("Skipping malformed OpenAlex work: %s", exc)
            continue
        if paper:
            papers.append(paper)
    return papers
def fetch_openalex_by_id(identifier: str) -> dict | None:
"""Fetch a single OpenAlex Work by its W-ID, DOI, or PMID URN."""
if not identifier:
return None
# OpenAlex accepts W-IDs directly and DOI/PMID URNs as path segments.
url = f"{_OPENALEX_WORKS_URL}/{identifier}"
try:
resp = _http.get(url, params=_polite_params(), timeout=15)
resp.raise_for_status()
data = resp.json()
except Exception as exc:
logger.warning("OpenAlex get_by_id failed for %s: %s", identifier, exc)
return None
return _parse_work(data)
def fetch_openalex_cross_refs(work_id: str) -> dict[str, str]:
    """Look up cross-source identifiers (DOI, PMID, MAG) for an OpenAlex Work.

    ``work_id`` must be a bare Work ID (``W`` followed by digits, any case);
    anything else returns an empty dict without issuing a request. The
    result contains whichever of ``doi``, ``pmid``, ``mag`` are populated in
    the Work's ``ids`` mapping. DOI and PMID URL prefixes are stripped so
    callers can compose Semantic Scholar prefixed IDs directly, and the MAG
    value is converted to ``str``. A failed request is logged as a warning
    and yields an empty dict.
    """
    if not (work_id and _OPENALEX_ID_RE.fullmatch(work_id)):
        return {}

    try:
        resp = _http.get(
            f"{_OPENALEX_WORKS_URL}/{work_id}",
            params=_polite_params(),
            timeout=10,
        )
        resp.raise_for_status()
        data = resp.json()
    except Exception as exc:
        logger.warning("OpenAlex cross-ref lookup failed for %s: %s", work_id, exc)
        return {}

    ids = data.get("ids") or {}
    mag = ids.get("mag")
    candidates = {
        "doi": _strip_doi(ids.get("doi")),
        "pmid": _strip_pmid(ids.get("pmid")),
        "mag": str(mag) if mag else None,
    }
    # Keep only the identifiers that are actually populated.
    return {key: found for key, found in candidates.items() if found}
class OpenAlexSource(PaperSource):
    """``PaperSource`` adapter for the OpenAlex Works API.

    Search and single-work lookup delegate to the module-level fetchers.
    """

    name = "openalex"

    def search(
        self,
        query: str,
        max_results: int = 20,
        filters: dict | None = None,
    ) -> list[dict]:
        """Run a Works search; see ``fetch_openalex`` for argument details."""
        return fetch_openalex(query=query, max_results=max_results, filters=filters)

    def get_by_id(self, identifier: str) -> dict | None:
        """Look up one Work; see ``fetch_openalex_by_id``."""
        return fetch_openalex_by_id(identifier)

    def supports(self, capability: Capability) -> bool:
        """Report that no optional capability is supported.

        OpenAlex exposes citation edges via
        ``referenced_works``/``cited_by_api_url``, but no dedicated fetcher
        for them is shipped yet, so this returns ``False`` for every input.
        """
        return False

    def rate_limit_hint(self) -> RateLimitHint:
        """Return pacing guidance based on whether ``OPENALEX_MAILTO`` is set.

        The daily quota is the same in both cases; only the minimum interval
        between requests and the explanatory note differ.
        """
        in_polite_pool = bool(os.getenv("OPENALEX_MAILTO", "").strip())
        if in_polite_pool:
            interval = 0.1
            notes = "OpenAlex polite pool: 10 req/s, 100k req/day with mailto"
        else:
            interval = 1.0
            notes = "OpenAlex public pool: ~1 req/s without mailto; set OPENALEX_MAILTO"
        return RateLimitHint(
            min_interval_seconds=interval,
            daily_quota=100_000,
            notes=notes,
        )