"""OpenAlex source: search + single-work lookup.

OpenAlex returns abstracts as an inverted index (``{word: [positions]}``)
rather than plain text, so every parser here passes through
``_reconstruct_abstract`` before populating the standard paper dict.

The polite pool is opted into by setting ``OPENALEX_MAILTO`` in the
environment. With a mailto the public limit is 10 req/s; without one it
collapses to ~1 req/s.
"""

import datetime
import logging
import os
import re

from ._http import _http
from .base import Capability, PaperSource, RateLimitHint, paper_dict

logger = logging.getLogger(__name__)

_OPENALEX_BASE_URL = "https://api.openalex.org"
_OPENALEX_WORKS_URL = f"{_OPENALEX_BASE_URL}/works"
# Cap per OpenAlex paging contract.
_OPENALEX_MAX_PER_PAGE = 200
_OPENALEX_ID_PREFIX = "https://openalex.org/"
_DOI_PREFIX = "https://doi.org/"
_PMID_PREFIX = "https://pubmed.ncbi.nlm.nih.gov/"
_OPENALEX_ID_RE = re.compile(r"W\d+", re.IGNORECASE)


def _polite_params() -> dict:
    """Return base query params, including ``mailto`` when configured.

    Reads ``OPENALEX_MAILTO`` on every call, so a change to the environment
    is picked up without re-importing the module.
    """
    mailto = os.getenv("OPENALEX_MAILTO", "").strip()
    return {"mailto": mailto} if mailto else {}


def _strip_openalex_id(value: str | None) -> str | None:
    """Drop the ``https://openalex.org/`` prefix from a Work URI.

    Returns ``None`` for empty input and the value unchanged when the prefix
    is absent.
    """
    if not value:
        return None
    return value.removeprefix(_OPENALEX_ID_PREFIX)


def _strip_doi(value: str | None) -> str | None:
    """Drop the ``https://doi.org/`` prefix from a DOI URL.

    Returns ``None`` for empty input and the value unchanged when the prefix
    is absent.
    """
    if not value:
        return None
    return value.removeprefix(_DOI_PREFIX)


def _strip_pmid(value: str | None) -> str | None:
    """Drop the PubMed URL prefix (and any trailing slash) from a PMID URL.

    Returns ``None`` for empty input. The trailing slash is only trimmed
    when the prefix was present; other values are returned unchanged.
    """
    if not value:
        return None
    if value.startswith(_PMID_PREFIX):
        return value[len(_PMID_PREFIX):].rstrip("/")
    return value


def _reconstruct_abstract(inverted_index: dict | None) -> str | None:
    """Reconstruct an abstract string from OpenAlex's inverted index format.

    The index is ``{word: [positions, ...]}``. Every valid ``(position,
    word)`` pair is collected, the pairs are ordered by position, and the
    words are joined with single spaces. Entries whose position list is not
    a list, and positions that are not non-negative ints, are skipped, so a
    corrupt index degrades to readable text rather than blowing up. Gaps in
    the position sequence simply collapse.

    Returns:
        The reconstructed text, or ``None`` when the input is empty, not a
        dict, or yields no usable tokens.
    """
    if not inverted_index or not isinstance(inverted_index, dict):
        return None
    positions: list[tuple[int, str]] = []
    for word, idxs in inverted_index.items():
        if not isinstance(idxs, list):
            continue
        for idx in idxs:
            if isinstance(idx, int) and idx >= 0:
                positions.append((idx, word))
    if not positions:
        return None
    # Sort on the position only: the sort is stable, so words sharing a
    # position keep their index order instead of being compared as strings.
    positions.sort(key=lambda p: p[0])
    text = " ".join(word for _, word in positions).strip()
    return text or None


def _parse_authors(authorships: list | None) -> str | None:
    """Join author display names from ``authorships`` into one string.

    Entries that are not dicts, or whose ``author`` is not a dict, are
    skipped (mirroring how ``_parse_concepts`` treats malformed entries).

    Returns:
        A ``", "``-separated string of names in the given order, or ``None``
        when no name could be extracted.
    """
    if not authorships:
        return None
    names: list[str] = []
    for authorship in authorships:
        if not isinstance(authorship, dict):
            continue
        author = authorship.get("author")
        if not isinstance(author, dict):
            continue
        name = (author.get("display_name") or "").strip()
        if name:
            names.append(name)
    return ", ".join(names) if names else None


def _parse_publication_date(work: dict) -> datetime.datetime | None:
    """Return the work's publication date as a naive ``datetime``.

    Prefers ``publication_date`` (``YYYY-MM-DD``). When that is missing or
    unparseable, falls back to January 1st of ``publication_year``.

    Returns:
        The parsed date, or ``None`` when neither field is usable.
    """
    date_str = work.get("publication_date")
    if date_str:
        try:
            return datetime.datetime.strptime(date_str, "%Y-%m-%d")
        except (TypeError, ValueError):
            # TypeError covers a non-string value; either way fall through
            # to the year-only fallback.
            pass
    year = work.get("publication_year")
    if isinstance(year, int):
        try:
            return datetime.datetime(year, 1, 1)
        except ValueError:
            # Year outside the range ``datetime`` supports.
            return None
    return None


def _parse_concepts(concepts: list | None) -> list[str]:
    """Pull human-readable concept names, dropping low-score noise.

    Concepts with a numeric ``score`` below 0.15 are discarded; concepts
    without a numeric score are kept. Names are de-duplicated
    case-insensitively with first-seen order preserved.

    Returns:
        At most 10 concept names.
    """
    if not concepts:
        return []
    names: list[str] = []
    for c in concepts:
        if not isinstance(c, dict):
            continue
        name = (c.get("display_name") or "").strip()
        if not name:
            continue
        # Keep highest-confidence concepts only; OpenAlex returns long tails.
        score = c.get("score")
        if isinstance(score, (int, float)) and score < 0.15:
            continue
        names.append(name)
    # Dedupe but preserve order.
    seen: set[str] = set()
    deduped: list[str] = []
    for n in names:
        key = n.lower()
        if key in seen:
            continue
        seen.add(key)
        deduped.append(n)
    return deduped[:10]


def _parse_concept_records(concepts: list | None) -> list[dict]:
    """Structured concept assignments: name, hierarchy level, and score.

    These feed the zero-cost OpenAlex topic backend, which needs the level
    to keep mid-hierarchy concepts and the score as a confidence.

    Unlike ``_parse_concepts``, nothing is filtered by score, de-duplicated,
    or truncated; only non-dict entries and entries without a name are
    skipped.

    Returns:
        A list of ``{"name", "level", "score"}`` dicts. ``level`` and
        ``score`` are passed through as-is and may be ``None``.
    """
    if not concepts:
        return []
    out: list[dict] = []
    for c in concepts:
        if not isinstance(c, dict):
            continue
        name = (c.get("display_name") or "").strip()
        if not name:
            continue
        out.append({"name": name, "level": c.get("level"), "score": c.get("score")})
    return out


def _parse_work(work: dict) -> dict | None:
    """Convert one OpenAlex Work object into the standard paper dict.

    Returns:
        The paper dict, with an extra ``"concepts"`` key holding the
        structured concept records, or ``None`` when ``work`` is empty, not
        a dict, or lacks an id or a title.
    """
    if not work or not isinstance(work, dict):
        return None

    work_id = _strip_openalex_id(work.get("id"))
    title = (work.get("title") or work.get("display_name") or "").strip()
    if not work_id or not title:
        return None

    abstract = _reconstruct_abstract(work.get("abstract_inverted_index"))
    authors = _parse_authors(work.get("authorships"))
    pub_date = _parse_publication_date(work)
    doi = _strip_doi(work.get("doi"))

    citation_count = work.get("cited_by_count")
    if not isinstance(citation_count, int):
        citation_count = None

    # ``primary_location`` may be null, so only read through it when it is
    # really a dict; otherwise fall back to the canonical OpenAlex page.
    primary_location = work.get("primary_location")
    url = (
        primary_location.get("landing_page_url")
        if isinstance(primary_location, dict)
        else None
    )
    if not url:
        url = f"{_OPENALEX_ID_PREFIX}{work_id}"

    concepts = _parse_concepts(work.get("concepts"))

    paper = paper_dict(
        title=title,
        abstract=abstract,
        authors=authors,
        publication_date=pub_date,
        source="openalex",
        source_id=work_id,
        url=url,
        doi=doi,
        topics=concepts,
        citation_count=citation_count,
    )
    # Structured assignments drive the OpenAlex topic backend; the flat names
    # in ``topics`` above are kept for backward compatibility.
    paper["concepts"] = _parse_concept_records(work.get("concepts"))
    return paper


def fetch_openalex(
    query: str = "",
    max_results: int = 20,
    filters: dict | None = None,
) -> list[dict]:
    """Search OpenAlex Works and return parsed paper dicts.

    Only a single page is requested, so at most ``max_results`` (clamped to
    the 1..200 range) papers come back. Request failures and malformed works
    are logged and skipped; this function does not raise for them.

    Args:
        query: Free-text search across title, abstract, and full text.
        max_results: Capped at 200 by OpenAlex. Values below 1 are raised
            to 1.
        filters: Optional dict. ``filter`` may be a raw OpenAlex filter
            string (e.g. ``"concepts.id:C41008148"``).
            ``from_publication_date`` and ``to_publication_date`` are
            forwarded as the corresponding ``filter`` clauses.

    Returns:
        The parsed papers, or an empty list on any request failure.
    """
    filters = filters or {}
    params: dict[str, str | int] = {
        **_polite_params(),
        "per-page": min(max(max_results, 1), _OPENALEX_MAX_PER_PAGE),
    }
    if query:
        params["search"] = query

    filter_clauses: list[str] = []
    raw_filter = filters.get("filter")
    if raw_filter:
        filter_clauses.append(raw_filter)
    if filters.get("from_publication_date"):
        filter_clauses.append(f"from_publication_date:{filters['from_publication_date']}")
    if filters.get("to_publication_date"):
        filter_clauses.append(f"to_publication_date:{filters['to_publication_date']}")
    if filter_clauses:
        # OpenAlex ANDs comma-separated filter clauses.
        params["filter"] = ",".join(filter_clauses)

    try:
        resp = _http.get(_OPENALEX_WORKS_URL, params=params, timeout=20)
        resp.raise_for_status()
        data = resp.json()
    except Exception as exc:
        # Best-effort source: a failed search degrades to "no results".
        logger.warning("OpenAlex search failed: %s", exc)
        return []

    # A non-dict JSON body (e.g. a bare list or null) has no results to read.
    results = (data.get("results") or []) if isinstance(data, dict) else []
    papers: list[dict] = []
    for work in results:
        try:
            parsed = _parse_work(work)
            if parsed:
                papers.append(parsed)
        except Exception as exc:
            # One bad record must not discard the rest of the page.
            logger.warning("Skipping malformed OpenAlex work: %s", exc)
    return papers


def fetch_openalex_by_id(identifier: str) -> dict | None:
    """Fetch a single OpenAlex Work by its W-ID, DOI, or PMID URN.

    Returns:
        The parsed paper dict, or ``None`` when the identifier is blank, the
        request fails, or the response cannot be parsed into a paper.
    """
    identifier = identifier.strip() if identifier else ""
    if not identifier:
        # A blank path segment would hit the list endpoint instead of a Work.
        return None
    # OpenAlex accepts W-IDs directly and DOI/PMID URNs as path segments.
    url = f"{_OPENALEX_WORKS_URL}/{identifier}"
    try:
        resp = _http.get(url, params=_polite_params(), timeout=15)
        resp.raise_for_status()
        data = resp.json()
    except Exception as exc:
        logger.warning("OpenAlex get_by_id failed for %s: %s", identifier, exc)
        return None
    try:
        return _parse_work(data)
    except Exception as exc:
        # Match the search path: a malformed record is logged, not raised.
        logger.warning(
            "OpenAlex get_by_id returned a malformed work for %s: %s",
            identifier,
            exc,
        )
        return None


def fetch_openalex_cross_refs(work_id: str) -> dict[str, str]:
    """Look up cross-source identifiers (DOI, PMID, MAG) for an OpenAlex Work.

    Returns a dict with whichever of ``doi``, ``pmid``, ``mag`` are
    populated. Prefixes are stripped so callers can compose Semantic Scholar
    prefixed IDs directly.

    ``work_id`` must be a bare W-ID (``W`` followed by digits, any case);
    anything else, a failed request, or an unexpected response shape yields
    an empty dict.
    """
    if not work_id or not _OPENALEX_ID_RE.fullmatch(work_id):
        return {}
    url = f"{_OPENALEX_WORKS_URL}/{work_id}"
    try:
        resp = _http.get(url, params=_polite_params(), timeout=10)
        resp.raise_for_status()
        data = resp.json()
    except Exception as exc:
        logger.warning("OpenAlex cross-ref lookup failed for %s: %s", work_id, exc)
        return {}

    ids = data.get("ids") if isinstance(data, dict) else None
    if not isinstance(ids, dict):
        return {}

    out: dict[str, str] = {}
    doi = ids.get("doi")
    if isinstance(doi, str) and (doi := _strip_doi(doi)):
        out["doi"] = doi
    pmid = ids.get("pmid")
    if isinstance(pmid, str) and (pmid := _strip_pmid(pmid)):
        out["pmid"] = pmid
    mag = ids.get("mag")
    if mag:
        # Stringified so every value in the result has the same type.
        out["mag"] = str(mag)
    return out


class OpenAlexSource(PaperSource):
    """OpenAlex Works API. Search + Work-ID/DOI/PMID lookup."""

    name = "openalex"

    def search(
        self,
        query: str,
        max_results: int = 20,
        filters: dict | None = None,
    ) -> list[dict]:
        """Search OpenAlex Works; thin wrapper over ``fetch_openalex``."""
        return fetch_openalex(
            query=query,
            max_results=max_results,
            filters=filters,
        )

    def get_by_id(self, identifier: str) -> dict | None:
        """Fetch one Work; thin wrapper over ``fetch_openalex_by_id``."""
        return fetch_openalex_by_id(identifier)

    def supports(self, capability: Capability) -> bool:
        """Report whether ``capability`` is supported; currently always False."""
        # OpenAlex exposes citation edges via referenced_works/cited_by_api_url
        # but we don't ship a dedicated fetcher for them yet.
        return False

    def rate_limit_hint(self) -> RateLimitHint:
        """Return the pacing hint for the pool the current environment selects.

        The polite-pool hint is returned when ``OPENALEX_MAILTO`` is set to a
        non-blank value, i.e. exactly when requests carry a ``mailto`` param.
        """
        if _polite_params():
            return RateLimitHint(
                min_interval_seconds=0.1,
                daily_quota=100_000,
                notes="OpenAlex polite pool: 10 req/s, 100k req/day with mailto",
            )
        return RateLimitHint(
            min_interval_seconds=1.0,
            daily_quota=100_000,
            notes="OpenAlex public pool: ~1 req/s without mailto; set OPENALEX_MAILTO",
        )