Spaces:
Sleeping
Sleeping
File size: 11,216 Bytes
"""OpenAlex source: search + single-work lookup.
OpenAlex returns abstracts as an inverted index (``{word: [positions]}``)
rather than plain text, so every parser here passes through
``_reconstruct_abstract`` before populating the standard paper dict.
The polite pool is opted into by setting ``OPENALEX_MAILTO`` in the
environment. With a mailto the public limit is 10 req/s; without one it
collapses to ~1 req/s.
"""
import datetime
import logging
import os
import re
from ._http import _http
from .base import Capability, PaperSource, RateLimitHint, paper_dict
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Root of the OpenAlex REST API and the Works endpoint derived from it.
_OPENALEX_BASE_URL = "https://api.openalex.org"
_OPENALEX_WORKS_URL = f"{_OPENALEX_BASE_URL}/works"
# Cap per OpenAlex paging contract.
_OPENALEX_MAX_PER_PAGE = 200
# URI prefixes removed by the ``_strip_*`` helpers to obtain bare identifiers.
_OPENALEX_ID_PREFIX = "https://openalex.org/"
_DOI_PREFIX = "https://doi.org/"
_PMID_PREFIX = "https://pubmed.ncbi.nlm.nih.gov/"
# A bare Work ID: the letter "W" followed by digits, matched case-insensitively.
_OPENALEX_ID_RE = re.compile(r"W\d+", re.IGNORECASE)
def _polite_params() -> dict:
    """Build the base query parameters for an OpenAlex request.

    Reads ``OPENALEX_MAILTO`` from the environment. A non-blank value is
    returned as ``{"mailto": <value>}`` with surrounding whitespace removed;
    an unset or blank variable yields an empty dict.
    """
    contact = os.getenv("OPENALEX_MAILTO", "").strip()
    if not contact:
        return {}
    return {"mailto": contact}
def _strip_openalex_id(value: str | None) -> str | None:
    """Return a Work URI without its leading ``https://openalex.org/``.

    Falsy input (``None`` or an empty string) yields ``None``. A value that
    does not start with the prefix is returned unchanged.
    """
    if not value:
        return None
    return value.removeprefix(_OPENALEX_ID_PREFIX)
def _strip_doi(value: str | None) -> str | None:
    """Return a DOI without its leading ``https://doi.org/`` resolver prefix.

    Falsy input (``None`` or an empty string) yields ``None``. A value that
    does not start with the prefix is returned unchanged.
    """
    if not value:
        return None
    return value.removeprefix(_DOI_PREFIX)
def _strip_pmid(value: str | None) -> str | None:
    """Return a PubMed ID without its ``https://pubmed.ncbi.nlm.nih.gov/`` prefix.

    Falsy input yields ``None``. When the prefix is present it is removed and
    any trailing slashes are dropped; a value without the prefix is returned
    exactly as given (trailing slashes included).
    """
    if not value:
        return None
    if not value.startswith(_PMID_PREFIX):
        return value
    return value.removeprefix(_PMID_PREFIX).rstrip("/")
def _reconstruct_abstract(inverted_index: dict | None) -> str | None:
"""Reconstruct an abstract string from OpenAlex's inverted index format.
The index is ``{word: [positions, ...]}``. We rebuild a sparse list
sized to the largest position and then join tokens with spaces. Any
missing slots stay empty and are collapsed so a corrupt index degrades
to readable text rather than blowing up.
"""
if not inverted_index or not isinstance(inverted_index, dict):
return None
positions: list[tuple[int, str]] = []
for word, idxs in inverted_index.items():
if not isinstance(idxs, list):
continue
for idx in idxs:
if isinstance(idx, int) and idx >= 0:
positions.append((idx, word))
if not positions:
return None
positions.sort(key=lambda p: p[0])
tokens = [word for _, word in positions]
text = " ".join(tokens).strip()
return text or None
def _parse_authors(authorships: list | None) -> str | None:
if not authorships:
return None
names: list[str] = []
for authorship in authorships:
author = (authorship or {}).get("author") or {}
name = (author.get("display_name") or "").strip()
if name:
names.append(name)
return ", ".join(names) if names else None
def _parse_publication_date(work: dict) -> datetime.datetime | None:
date_str = work.get("publication_date")
if date_str:
try:
return datetime.datetime.strptime(date_str, "%Y-%m-%d")
except ValueError:
pass
year = work.get("publication_year")
if isinstance(year, int):
try:
return datetime.datetime(year, 1, 1)
except ValueError:
return None
return None
def _parse_concepts(concepts: list | None) -> list[str]:
"""Pull human-readable concept names; ignore noise below level 0."""
if not concepts:
return []
names: list[str] = []
for c in concepts:
if not isinstance(c, dict):
continue
name = (c.get("display_name") or "").strip()
if not name:
continue
# Keep highest-confidence concepts only; OpenAlex returns long tails.
score = c.get("score")
if isinstance(score, (int, float)) and score < 0.15:
continue
names.append(name)
# Dedupe but preserve order.
seen: set[str] = set()
deduped: list[str] = []
for n in names:
key = n.lower()
if key in seen:
continue
seen.add(key)
deduped.append(n)
return deduped[:10]
def _parse_concept_records(concepts: list | None) -> list[dict]:
"""Structured concept assignments: name, hierarchy level, and score.
These feed the zero-cost OpenAlex topic backend, which needs the level to
keep mid-hierarchy concepts and the score as a confidence.
"""
if not concepts:
return []
out: list[dict] = []
for c in concepts:
if not isinstance(c, dict):
continue
name = (c.get("display_name") or "").strip()
if not name:
continue
out.append({"name": name, "level": c.get("level"), "score": c.get("score")})
return out
def _parse_work(work: dict) -> dict | None:
    """Translate a single OpenAlex Work object into the standard paper dict.

    Returns ``None`` when the work is empty or lacks either an ID or a
    title (``title`` is preferred, ``display_name`` is the fallback).
    Otherwise the result of ``paper_dict`` is returned with an extra
    ``concepts`` key holding the structured concept records.
    """
    if not work:
        return None

    openalex_id = _strip_openalex_id(work.get("id"))
    heading = (work.get("title") or work.get("display_name") or "").strip()
    if not (openalex_id and heading):
        return None

    summary = _reconstruct_abstract(work.get("abstract_inverted_index"))
    byline = _parse_authors(work.get("authorships"))
    published = _parse_publication_date(work)
    bare_doi = _strip_doi(work.get("doi"))

    cited_by = work.get("cited_by_count")
    citations = cited_by if isinstance(cited_by, int) else None

    # Prefer the publisher landing page; fall back to the OpenAlex page.
    location = work.get("primary_location")
    landing_page = (
        location.get("landing_page_url") if isinstance(location, dict) else None
    )
    link = landing_page or f"{_OPENALEX_ID_PREFIX}{openalex_id}"

    raw_concepts = work.get("concepts")
    record = paper_dict(
        title=heading,
        abstract=summary,
        authors=byline,
        publication_date=published,
        source="openalex",
        source_id=openalex_id,
        url=link,
        doi=bare_doi,
        topics=_parse_concepts(raw_concepts),
        citation_count=citations,
    )
    # ``topics`` keeps the flat names for backward compatibility; the
    # structured assignments are what the OpenAlex topic backend consumes.
    record["concepts"] = _parse_concept_records(raw_concepts)
    return record
def fetch_openalex(
    query: str = "",
    max_results: int = 20,
    filters: dict | None = None,
) -> list[dict]:
    """Run a Works search against OpenAlex and return parsed paper dicts.

    A single page is requested; no further pages are fetched. A failed
    request is logged and produces an empty list, and any individual work
    that fails to parse is logged and skipped.

    Args:
        query: Free-text search across title, abstract, and full text.
            Omitted from the request when empty.
        max_results: Requested page size, clamped to the range 1..200.
        filters: Optional dict. ``filter`` may be a raw OpenAlex filter
            string (e.g. ``"concepts.id:C41008148"``).
            ``from_publication_date`` and ``to_publication_date`` are
            appended as the corresponding ``filter`` clauses. All clauses
            are joined with commas.

    Returns:
        The successfully parsed works, in response order.
    """
    options = filters or {}

    request_params: dict[str, str | int] = dict(_polite_params())
    request_params["per-page"] = min(max(max_results, 1), _OPENALEX_MAX_PER_PAGE)
    if query:
        request_params["search"] = query

    clauses: list[str] = []
    if explicit := options.get("filter"):
        clauses.append(explicit)
    for bound in ("from_publication_date", "to_publication_date"):
        if options.get(bound):
            clauses.append(f"{bound}:{options[bound]}")
    if clauses:
        request_params["filter"] = ",".join(clauses)

    try:
        response = _http.get(_OPENALEX_WORKS_URL, params=request_params, timeout=20)
        response.raise_for_status()
        payload = response.json()
    except Exception as exc:
        logger.warning("OpenAlex search failed: %s", exc)
        return []

    found: list[dict] = []
    for raw_work in payload.get("results") or []:
        try:
            paper = _parse_work(raw_work)
        except Exception as exc:
            logger.warning("Skipping malformed OpenAlex work: %s", exc)
            continue
        if paper:
            found.append(paper)
    return found
def fetch_openalex_by_id(identifier: str) -> dict | None:
    """Fetch a single OpenAlex Work by its W-ID, DOI, or PMID URN.

    The identifier is appended verbatim as the final path segment of the
    Works endpoint.

    Args:
        identifier: Work identifier to look up.

    Returns:
        The parsed paper dict, or ``None`` when the identifier is empty,
        the request fails, or the payload cannot be parsed. Failures are
        logged as warnings rather than raised, matching the search path.
    """
    if not identifier:
        return None
    # OpenAlex accepts W-IDs directly and DOI/PMID URNs as path segments.
    url = f"{_OPENALEX_WORKS_URL}/{identifier}"
    try:
        resp = _http.get(url, params=_polite_params(), timeout=15)
        resp.raise_for_status()
        data = resp.json()
    except Exception as exc:
        logger.warning("OpenAlex get_by_id failed for %s: %s", identifier, exc)
        return None
    # Parsing is guarded separately so a malformed payload is reported as
    # such and degrades to None, the same way the search path skips
    # malformed works instead of propagating the error.
    try:
        return _parse_work(data)
    except Exception as exc:
        logger.warning("Skipping malformed OpenAlex work %s: %s", identifier, exc)
        return None
def fetch_openalex_cross_refs(work_id: str) -> dict[str, str]:
    """Collect the external identifiers OpenAlex records for one Work.

    ``work_id`` must be a bare Work ID (``W`` followed by digits, any
    case); anything else returns an empty dict without issuing a request.
    A failed request is logged and also returns an empty dict.

    Returns:
        A dict containing whichever of ``doi``, ``pmid`` and ``mag`` are
        populated in the Work's ``ids`` object. The DOI and PMID have their
        URL prefixes stripped; ``mag`` is converted to ``str``.
    """
    if not (work_id and _OPENALEX_ID_RE.fullmatch(work_id)):
        return {}

    endpoint = f"{_OPENALEX_WORKS_URL}/{work_id}"
    try:
        response = _http.get(endpoint, params=_polite_params(), timeout=10)
        response.raise_for_status()
        payload = response.json()
    except Exception as exc:
        logger.warning("OpenAlex cross-ref lookup failed for %s: %s", work_id, exc)
        return {}

    external_ids = payload.get("ids") or {}
    found: dict[str, str] = {}

    bare_doi = _strip_doi(external_ids.get("doi"))
    if bare_doi:
        found["doi"] = bare_doi

    bare_pmid = _strip_pmid(external_ids.get("pmid"))
    if bare_pmid:
        found["pmid"] = bare_pmid

    if mag_id := external_ids.get("mag"):
        found["mag"] = str(mag_id)

    return found
class OpenAlexSource(PaperSource):
    """Paper source backed by the OpenAlex Works API.

    Provides free-text search and single-work lookup by delegating to the
    module-level ``fetch_openalex`` and ``fetch_openalex_by_id`` functions.
    """

    name = "openalex"

    def search(
        self,
        query: str,
        max_results: int = 20,
        filters: dict | None = None,
    ) -> list[dict]:
        """Search Works; arguments are forwarded unchanged to ``fetch_openalex``."""
        return fetch_openalex(query=query, max_results=max_results, filters=filters)

    def get_by_id(self, identifier: str) -> dict | None:
        """Look up one Work; delegates to ``fetch_openalex_by_id``."""
        return fetch_openalex_by_id(identifier)

    def supports(self, capability: Capability) -> bool:
        """Report that no optional capability is supported.

        OpenAlex exposes citation edges via referenced_works/cited_by_api_url,
        but no dedicated fetcher for them is shipped yet, so this returns
        ``False`` for every capability.
        """
        return False

    def rate_limit_hint(self) -> RateLimitHint:
        """Return the pacing hint, which depends on ``OPENALEX_MAILTO``.

        With a non-blank mailto configured the minimum interval is 0.1 s;
        otherwise it is 1.0 s. The daily quota is 100,000 in both cases.
        """
        in_polite_pool = bool(os.getenv("OPENALEX_MAILTO", "").strip())
        if in_polite_pool:
            interval = 0.1
            notes = "OpenAlex polite pool: 10 req/s, 100k req/day with mailto"
        else:
            interval = 1.0
            notes = "OpenAlex public pool: ~1 req/s without mailto; set OPENALEX_MAILTO"
        return RateLimitHint(
            min_interval_seconds=interval,
            daily_quota=100_000,
            notes=notes,
        )
|