Spaces:
Sleeping
Sleeping
| """OpenAlex source: search + single-work lookup. | |
| OpenAlex returns abstracts as an inverted index (``{word: [positions]}``) | |
| rather than plain text, so every parser here passes through | |
| ``_reconstruct_abstract`` before populating the standard paper dict. | |
| The polite pool is opted into by setting ``OPENALEX_MAILTO`` in the | |
| environment. With a mailto the public limit is 10 req/s; without one it | |
| collapses to ~1 req/s. | |
| """ | |
| import datetime | |
| import logging | |
| import os | |
| import re | |
| from ._http import _http | |
| from .base import Capability, PaperSource, RateLimitHint, paper_dict | |
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Root of the OpenAlex REST API and the Works collection endpoint beneath it.
_OPENALEX_BASE_URL = "https://api.openalex.org"
_OPENALEX_WORKS_URL = f"{_OPENALEX_BASE_URL}/works"
# Cap per OpenAlex paging contract; ``fetch_openalex`` clamps ``per-page`` to it.
_OPENALEX_MAX_PER_PAGE = 200
# URL prefixes that the ``_strip_*`` helpers remove to leave bare identifiers.
_OPENALEX_ID_PREFIX = "https://openalex.org/"
_DOI_PREFIX = "https://doi.org/"
_PMID_PREFIX = "https://pubmed.ncbi.nlm.nih.gov/"
# Shape of a bare Work ID: "W" followed by digits, matched case-insensitively.
# Used with ``fullmatch`` to validate an ID before it is put in a request path.
_OPENALEX_ID_RE = re.compile(r"W\d+", re.IGNORECASE)
| def _polite_params() -> dict: | |
| """Return base query params, including ``mailto`` when configured.""" | |
| mailto = os.getenv("OPENALEX_MAILTO", "").strip() | |
| return {"mailto": mailto} if mailto else {} | |
def _strip_openalex_id(value: str | None) -> str | None:
    """Return the bare Work ID from an OpenAlex Work URI.

    A leading ``https://openalex.org/`` is removed when present; any other
    non-empty string comes back unchanged. Empty or ``None`` input yields
    ``None``.
    """
    return value.removeprefix(_OPENALEX_ID_PREFIX) if value else None
def _strip_doi(value: str | None) -> str | None:
    """Return a bare DOI from its ``https://doi.org/`` URL form.

    Strings without that prefix are returned as-is; empty or ``None`` input
    yields ``None``.
    """
    if value:
        return value.removeprefix(_DOI_PREFIX)
    return None
def _strip_pmid(value: str | None) -> str | None:
    """Return a bare PMID from its PubMed URL form.

    When the value starts with ``https://pubmed.ncbi.nlm.nih.gov/`` the prefix
    and any trailing slashes are removed. Other non-empty strings are returned
    untouched (trailing slashes included); empty or ``None`` input yields
    ``None``.
    """
    if not value:
        return None
    if not value.startswith(_PMID_PREFIX):
        return value
    return value.removeprefix(_PMID_PREFIX).rstrip("/")
| def _reconstruct_abstract(inverted_index: dict | None) -> str | None: | |
| """Reconstruct an abstract string from OpenAlex's inverted index format. | |
| The index is ``{word: [positions, ...]}``. We rebuild a sparse list | |
| sized to the largest position and then join tokens with spaces. Any | |
| missing slots stay empty and are collapsed so a corrupt index degrades | |
| to readable text rather than blowing up. | |
| """ | |
| if not inverted_index or not isinstance(inverted_index, dict): | |
| return None | |
| positions: list[tuple[int, str]] = [] | |
| for word, idxs in inverted_index.items(): | |
| if not isinstance(idxs, list): | |
| continue | |
| for idx in idxs: | |
| if isinstance(idx, int) and idx >= 0: | |
| positions.append((idx, word)) | |
| if not positions: | |
| return None | |
| positions.sort(key=lambda p: p[0]) | |
| tokens = [word for _, word in positions] | |
| text = " ".join(tokens).strip() | |
| return text or None | |
| def _parse_authors(authorships: list | None) -> str | None: | |
| if not authorships: | |
| return None | |
| names: list[str] = [] | |
| for authorship in authorships: | |
| author = (authorship or {}).get("author") or {} | |
| name = (author.get("display_name") or "").strip() | |
| if name: | |
| names.append(name) | |
| return ", ".join(names) if names else None | |
| def _parse_publication_date(work: dict) -> datetime.datetime | None: | |
| date_str = work.get("publication_date") | |
| if date_str: | |
| try: | |
| return datetime.datetime.strptime(date_str, "%Y-%m-%d") | |
| except ValueError: | |
| pass | |
| year = work.get("publication_year") | |
| if isinstance(year, int): | |
| try: | |
| return datetime.datetime(year, 1, 1) | |
| except ValueError: | |
| return None | |
| return None | |
| def _parse_concepts(concepts: list | None) -> list[str]: | |
| """Pull human-readable concept names; ignore noise below level 0.""" | |
| if not concepts: | |
| return [] | |
| names: list[str] = [] | |
| for c in concepts: | |
| if not isinstance(c, dict): | |
| continue | |
| name = (c.get("display_name") or "").strip() | |
| if not name: | |
| continue | |
| # Keep highest-confidence concepts only; OpenAlex returns long tails. | |
| score = c.get("score") | |
| if isinstance(score, (int, float)) and score < 0.15: | |
| continue | |
| names.append(name) | |
| # Dedupe but preserve order. | |
| seen: set[str] = set() | |
| deduped: list[str] = [] | |
| for n in names: | |
| key = n.lower() | |
| if key in seen: | |
| continue | |
| seen.add(key) | |
| deduped.append(n) | |
| return deduped[:10] | |
| def _parse_concept_records(concepts: list | None) -> list[dict]: | |
| """Structured concept assignments: name, hierarchy level, and score. | |
| These feed the zero-cost OpenAlex topic backend, which needs the level to | |
| keep mid-hierarchy concepts and the score as a confidence. | |
| """ | |
| if not concepts: | |
| return [] | |
| out: list[dict] = [] | |
| for c in concepts: | |
| if not isinstance(c, dict): | |
| continue | |
| name = (c.get("display_name") or "").strip() | |
| if not name: | |
| continue | |
| out.append({"name": name, "level": c.get("level"), "score": c.get("score")}) | |
| return out | |
def _parse_work(work: dict) -> dict | None:
    """Translate one OpenAlex Work object into the standard paper dict.

    Args:
        work: A Work as returned by the OpenAlex API.

    Returns:
        The dict produced by ``paper_dict`` with an extra ``concepts`` key
        holding the structured concept records, or ``None`` when ``work`` is
        empty or lacks an ID or a title.
    """
    if not work:
        return None

    source_id = _strip_openalex_id(work.get("id"))
    heading = (work.get("title") or work.get("display_name") or "").strip()
    if not (source_id and heading):
        return None

    abstract_text = _reconstruct_abstract(work.get("abstract_inverted_index"))
    author_names = _parse_authors(work.get("authorships"))
    published = _parse_publication_date(work)
    bare_doi = _strip_doi(work.get("doi"))

    cited_by = work.get("cited_by_count")
    citations = cited_by if isinstance(cited_by, int) else None

    # Prefer the publisher landing page; otherwise link to the OpenAlex record.
    location = work.get("primary_location")
    landing = location.get("landing_page_url") if isinstance(location, dict) else None
    link = landing or f"{_OPENALEX_ID_PREFIX}{source_id}"

    raw_concepts = work.get("concepts")
    topic_names = _parse_concepts(raw_concepts)

    paper = paper_dict(
        title=heading,
        abstract=abstract_text,
        authors=author_names,
        publication_date=published,
        source="openalex",
        source_id=source_id,
        url=link,
        doi=bare_doi,
        topics=topic_names,
        citation_count=citations,
    )
    # The flat names in ``topics`` are kept for backward compatibility; the
    # structured records below drive the OpenAlex topic backend.
    paper["concepts"] = _parse_concept_records(raw_concepts)
    return paper
def fetch_openalex(
    query: str = "",
    max_results: int = 20,
    filters: dict | None = None,
) -> list[dict]:
    """Search OpenAlex Works and return parsed paper dicts.

    Args:
        query: Free-text search string; omitted from the request when empty.
        max_results: Requested page size, clamped to the range 1..200.
        filters: Optional dict. ``filter`` may be a raw OpenAlex filter
            string (e.g. ``"concepts.id:C41008148"``). ``from_publication_date``
            and ``to_publication_date`` are forwarded as the corresponding
            ``filter`` clauses. All clauses are joined with commas.

    Returns:
        Parsed papers in response order. A failed request is logged and
        yields an empty list; works that fail to parse are logged and skipped.
    """
    options = filters or {}

    params: dict[str, str | int] = dict(_polite_params())
    params["per-page"] = min(max(max_results, 1), _OPENALEX_MAX_PER_PAGE)
    if query:
        params["search"] = query

    clauses: list[str] = []
    if raw_clause := options.get("filter"):
        clauses.append(raw_clause)
    for bound in ("from_publication_date", "to_publication_date"):
        if bound_value := options.get(bound):
            clauses.append(f"{bound}:{bound_value}")
    if clauses:
        params["filter"] = ",".join(clauses)

    try:
        response = _http.get(_OPENALEX_WORKS_URL, params=params, timeout=20)
        response.raise_for_status()
        payload = response.json()
    except Exception as exc:
        logger.warning("OpenAlex search failed: %s", exc)
        return []

    papers: list[dict] = []
    for work in payload.get("results") or []:
        try:
            parsed = _parse_work(work)
        except Exception as exc:
            # One malformed work must not discard the rest of the page.
            logger.warning("Skipping malformed OpenAlex work: %s", exc)
            continue
        if parsed:
            papers.append(parsed)
    return papers
def fetch_openalex_by_id(identifier: str) -> dict | None:
    """Fetch a single OpenAlex Work by its W-ID, DOI, or PMID URN.

    Args:
        identifier: The value appended to the Works endpoint path, e.g. a
            bare ``W…`` ID or a DOI/PMID URN. Surrounding whitespace is
            ignored.

    Returns:
        The parsed paper dict, or ``None`` when the identifier is empty, the
        request fails (logged as a warning), the response body is not a JSON
        object, or the work cannot be parsed.
    """
    # Function-scope import so this block does not rely on a module-level import.
    from urllib.parse import quote

    if not identifier:
        return None
    identifier = str(identifier).strip()
    if not identifier:
        return None
    # Percent-encode characters such as "?" and "#" that would otherwise end
    # the URL path early and truncate the identifier. "/" and ":" stay literal
    # because DOI/PMID URNs and full DOI URLs rely on them; "%" stays literal
    # so identifiers that callers already percent-encoded are not encoded twice.
    path_segment = quote(identifier, safe="/:%")
    url = f"{_OPENALEX_WORKS_URL}/{path_segment}"
    try:
        resp = _http.get(url, params=_polite_params(), timeout=15)
        resp.raise_for_status()
        data = resp.json()
    except Exception as exc:
        logger.warning("OpenAlex get_by_id failed for %s: %s", identifier, exc)
        return None
    # Anything other than a JSON object cannot be a Work.
    if not isinstance(data, dict):
        return None
    return _parse_work(data)
def fetch_openalex_cross_refs(work_id: str) -> dict[str, str]:
    """Look up cross-source identifiers (DOI, PMID, MAG) for an OpenAlex Work.

    Args:
        work_id: A bare Work ID (``W`` followed by digits, any case). Anything
            else is rejected without making a request.

    Returns:
        A dict holding whichever of ``doi``, ``pmid``, ``mag`` are populated
        in the Work's ``ids`` object. URL prefixes are stripped from the DOI
        and PMID so callers can compose Semantic Scholar prefixed IDs
        directly; the MAG value is converted to ``str``. An invalid ID or a
        failed request (logged as a warning) yields an empty dict.
    """
    if not (work_id and _OPENALEX_ID_RE.fullmatch(work_id)):
        return {}

    try:
        response = _http.get(
            f"{_OPENALEX_WORKS_URL}/{work_id}",
            params=_polite_params(),
            timeout=10,
        )
        response.raise_for_status()
        payload = response.json()
    except Exception as exc:
        logger.warning("OpenAlex cross-ref lookup failed for %s: %s", work_id, exc)
        return {}

    ids = payload.get("ids") or {}
    candidates = (
        ("doi", _strip_doi(ids.get("doi"))),
        ("pmid", _strip_pmid(ids.get("pmid"))),
        ("mag", ids.get("mag")),
    )
    # str() is a no-op for the already-string DOI/PMID and normalises MAG.
    return {key: str(found) for key, found in candidates if found}
class OpenAlexSource(PaperSource):
    """Paper source backed by the OpenAlex Works API.

    Offers free-text search and single-work lookup by Work ID, DOI, or PMID.
    """

    name = "openalex"

    def search(
        self,
        query: str,
        max_results: int = 20,
        filters: dict | None = None,
    ) -> list[dict]:
        """Run a Works search; delegates to ``fetch_openalex``."""
        return fetch_openalex(query=query, max_results=max_results, filters=filters)

    def get_by_id(self, identifier: str) -> dict | None:
        """Fetch one Work; delegates to ``fetch_openalex_by_id``."""
        return fetch_openalex_by_id(identifier)

    def supports(self, capability: Capability) -> bool:
        """Report that no optional capability is supported."""
        # OpenAlex exposes citation edges via referenced_works/cited_by_api_url
        # but we don't ship a dedicated fetcher for them yet.
        return False

    def rate_limit_hint(self) -> RateLimitHint:
        """Return pacing guidance based on whether ``OPENALEX_MAILTO`` is set."""
        if os.getenv("OPENALEX_MAILTO", "").strip():
            interval = 0.1
            notes = "OpenAlex polite pool: 10 req/s, 100k req/day with mailto"
        else:
            interval = 1.0
            notes = "OpenAlex public pool: ~1 req/s without mailto; set OPENALEX_MAILTO"
        return RateLimitHint(
            min_interval_seconds=interval,
            daily_quota=100_000,
            notes=notes,
        )