File size: 11,216 Bytes
57272d3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
"""OpenAlex source: search + single-work lookup.

OpenAlex returns abstracts as an inverted index (``{word: [positions]}``)
rather than plain text, so every parser here passes through
``_reconstruct_abstract`` before populating the standard paper dict.

The polite pool is opted into by setting ``OPENALEX_MAILTO`` in the
environment. With a mailto the public limit is 10 req/s; without one it
collapses to ~1 req/s.
"""

import datetime
import logging
import os
import re

from ._http import _http
from .base import Capability, PaperSource, RateLimitHint, paper_dict

# Module-level logger; request and parse failures below are reported as warnings.
logger = logging.getLogger(__name__)

# Root of the OpenAlex REST API and the Works collection endpoint used for
# both search and single-work lookups.
_OPENALEX_BASE_URL = "https://api.openalex.org"
_OPENALEX_WORKS_URL = f"{_OPENALEX_BASE_URL}/works"

# Cap per OpenAlex paging contract.
_OPENALEX_MAX_PER_PAGE = 200

# URL prefixes removed from identifiers by the ``_strip_*`` helpers so that
# only the bare ID remains.
_OPENALEX_ID_PREFIX = "https://openalex.org/"
_DOI_PREFIX = "https://doi.org/"
_PMID_PREFIX = "https://pubmed.ncbi.nlm.nih.gov/"
# Bare Work ID: the letter W followed by digits, matched case-insensitively.
# Applied with ``fullmatch`` to validate an ID before it is put into a URL.
_OPENALEX_ID_RE = re.compile(r"W\d+", re.IGNORECASE)


def _polite_params() -> dict:
    """Build the query params shared by every OpenAlex request.

    The result holds a single ``mailto`` entry when the ``OPENALEX_MAILTO``
    environment variable is set to a non-blank value (surrounding whitespace
    is trimmed); otherwise it is an empty dict. A new dict is returned on
    every call.
    """
    contact = os.getenv("OPENALEX_MAILTO", "").strip()
    if not contact:
        return {}
    return {"mailto": contact}


def _strip_openalex_id(value: str | None) -> str | None:
    """Reduce a Work URI to its bare ID.

    A leading ``https://openalex.org/`` is removed when present; any other
    non-empty value is returned unchanged. Empty or missing input gives
    ``None``.
    """
    return value.removeprefix(_OPENALEX_ID_PREFIX) if value else None


def _strip_doi(value: str | None) -> str | None:
    """Reduce a DOI URL to the bare DOI.

    A leading ``https://doi.org/`` is removed when present; any other
    non-empty value is returned unchanged. Empty or missing input gives
    ``None``.
    """
    return value.removeprefix(_DOI_PREFIX) if value else None


def _strip_pmid(value: str | None) -> str | None:
    """Reduce a PubMed URL to the bare PMID.

    When the value starts with the PubMed URL prefix, the prefix and any
    trailing slashes are removed. Values without the prefix are returned
    untouched (trailing slashes included). Empty or missing input gives
    ``None``.
    """
    if not value:
        return None
    if not value.startswith(_PMID_PREFIX):
        return value
    return value.removeprefix(_PMID_PREFIX).rstrip("/")


def _reconstruct_abstract(inverted_index: dict | None) -> str | None:
    """Reconstruct an abstract string from OpenAlex's inverted index format.

    The index is ``{word: [positions, ...]}``. We rebuild a sparse list
    sized to the largest position and then join tokens with spaces. Any
    missing slots stay empty and are collapsed so a corrupt index degrades
    to readable text rather than blowing up.
    """
    if not inverted_index or not isinstance(inverted_index, dict):
        return None
    positions: list[tuple[int, str]] = []
    for word, idxs in inverted_index.items():
        if not isinstance(idxs, list):
            continue
        for idx in idxs:
            if isinstance(idx, int) and idx >= 0:
                positions.append((idx, word))
    if not positions:
        return None
    positions.sort(key=lambda p: p[0])
    tokens = [word for _, word in positions]
    text = " ".join(tokens).strip()
    return text or None


def _parse_authors(authorships: list | None) -> str | None:
    if not authorships:
        return None
    names: list[str] = []
    for authorship in authorships:
        author = (authorship or {}).get("author") or {}
        name = (author.get("display_name") or "").strip()
        if name:
            names.append(name)
    return ", ".join(names) if names else None


def _parse_publication_date(work: dict) -> datetime.datetime | None:
    date_str = work.get("publication_date")
    if date_str:
        try:
            return datetime.datetime.strptime(date_str, "%Y-%m-%d")
        except ValueError:
            pass
    year = work.get("publication_year")
    if isinstance(year, int):
        try:
            return datetime.datetime(year, 1, 1)
        except ValueError:
            return None
    return None


def _parse_concepts(concepts: list | None) -> list[str]:
    """Pull human-readable concept names; ignore noise below level 0."""
    if not concepts:
        return []
    names: list[str] = []
    for c in concepts:
        if not isinstance(c, dict):
            continue
        name = (c.get("display_name") or "").strip()
        if not name:
            continue
        # Keep highest-confidence concepts only; OpenAlex returns long tails.
        score = c.get("score")
        if isinstance(score, (int, float)) and score < 0.15:
            continue
        names.append(name)
    # Dedupe but preserve order.
    seen: set[str] = set()
    deduped: list[str] = []
    for n in names:
        key = n.lower()
        if key in seen:
            continue
        seen.add(key)
        deduped.append(n)
    return deduped[:10]


def _parse_concept_records(concepts: list | None) -> list[dict]:
    """Structured concept assignments: name, hierarchy level, and score.

    These feed the zero-cost OpenAlex topic backend, which needs the level to
    keep mid-hierarchy concepts and the score as a confidence.
    """
    if not concepts:
        return []
    out: list[dict] = []
    for c in concepts:
        if not isinstance(c, dict):
            continue
        name = (c.get("display_name") or "").strip()
        if not name:
            continue
        out.append({"name": name, "level": c.get("level"), "score": c.get("score")})
    return out


def _parse_work(work: dict) -> dict | None:
    """Translate a single OpenAlex Work payload into the standard paper dict.

    Returns ``None`` when the payload is empty or lacks either an ID or a
    title (``title`` is preferred, ``display_name`` is the fallback). The URL
    is the primary location's landing page when one is given, otherwise the
    canonical ``https://openalex.org/<id>`` address. A non-integer
    ``cited_by_count`` is stored as ``None``.
    """
    if not work:
        return None

    openalex_id = _strip_openalex_id(work.get("id"))
    heading = (work.get("title") or work.get("display_name") or "").strip()
    if not (openalex_id and heading):
        return None

    abstract_text = _reconstruct_abstract(work.get("abstract_inverted_index"))
    author_line = _parse_authors(work.get("authorships"))
    published = _parse_publication_date(work)
    doi_value = _strip_doi(work.get("doi"))

    cited_by = work.get("cited_by_count")
    citations = cited_by if isinstance(cited_by, int) else None

    location = work.get("primary_location")
    landing = location.get("landing_page_url") if isinstance(location, dict) else None
    page_url = landing or f"{_OPENALEX_ID_PREFIX}{openalex_id}"

    raw_concepts = work.get("concepts")
    topic_names = _parse_concepts(raw_concepts)

    record = paper_dict(
        title=heading,
        abstract=abstract_text,
        authors=author_line,
        publication_date=published,
        source="openalex",
        source_id=openalex_id,
        url=page_url,
        doi=doi_value,
        topics=topic_names,
        citation_count=citations,
    )
    # ``topics`` carries the flat names for backward compatibility; the
    # structured records below are what the OpenAlex topic backend reads.
    record["concepts"] = _parse_concept_records(raw_concepts)
    return record


def fetch_openalex(
    query: str = "",
    max_results: int = 20,
    filters: dict | None = None,
) -> list[dict]:
    """Run a Works search against OpenAlex and parse the hits.

    Args:
        query: Free-text search string; left out of the request when empty.
        max_results: Requested page size, clamped to the range 1..200. A
            single page is requested.
        filters: Optional dict. ``filter`` is passed through as a raw
            OpenAlex filter string (e.g. ``"concepts.id:C41008148"``).
            ``from_publication_date`` and ``to_publication_date`` become
            filter clauses of the same name. All clauses are comma-joined.

    Returns:
        The parsed paper dicts. A failed request is logged and yields an
        empty list; a work that cannot be parsed is logged and skipped.
    """
    options = filters or {}

    request_params: dict[str, str | int] = dict(_polite_params())
    request_params["per-page"] = min(max(max_results, 1), _OPENALEX_MAX_PER_PAGE)
    if query:
        request_params["search"] = query

    clauses: list[str] = []
    if passthrough := options.get("filter"):
        clauses.append(passthrough)
    for bound in ("from_publication_date", "to_publication_date"):
        if bound_value := options.get(bound):
            clauses.append(f"{bound}:{bound_value}")
    if clauses:
        request_params["filter"] = ",".join(clauses)

    try:
        response = _http.get(_OPENALEX_WORKS_URL, params=request_params, timeout=20)
        response.raise_for_status()
        payload = response.json()
    except Exception as exc:
        logger.warning("OpenAlex search failed: %s", exc)
        return []

    parsed_papers: list[dict] = []
    for raw_work in payload.get("results") or []:
        try:
            paper = _parse_work(raw_work)
        except Exception as exc:
            logger.warning("Skipping malformed OpenAlex work: %s", exc)
            continue
        if paper:
            parsed_papers.append(paper)
    return parsed_papers


def fetch_openalex_by_id(identifier: str) -> dict | None:
    """Fetch a single OpenAlex Work by its W-ID, DOI, or PMID URN.

    Args:
        identifier: Value appended verbatim as the final path segment of the
            Works URL.

    Returns:
        The parsed paper dict, or ``None`` when the identifier is empty, the
        request fails, or the returned payload cannot be parsed. Failures are
        logged as warnings rather than raised.
    """
    if not identifier:
        return None
    # OpenAlex accepts W-IDs directly and DOI/PMID URNs as path segments.
    url = f"{_OPENALEX_WORKS_URL}/{identifier}"
    try:
        resp = _http.get(url, params=_polite_params(), timeout=15)
        resp.raise_for_status()
        data = resp.json()
    except Exception as exc:
        logger.warning("OpenAlex get_by_id failed for %s: %s", identifier, exc)
        return None
    # Mirror fetch_openalex: a malformed payload is logged and dropped instead
    # of escaping from a function whose failure contract is "return None".
    try:
        return _parse_work(data)
    except Exception as exc:
        logger.warning("Skipping malformed OpenAlex work %s: %s", identifier, exc)
        return None


def fetch_openalex_cross_refs(work_id: str) -> dict[str, str]:
    """Collect the DOI, PMID and MAG identifiers recorded for a Work.

    Only a bare Work ID (``W`` followed by digits, any case) is accepted;
    anything else returns an empty dict without a request being made. The
    result contains whichever of ``doi``, ``pmid`` and ``mag`` the Work's
    ``ids`` object provides. URL prefixes are stripped so callers can compose
    Semantic Scholar prefixed IDs directly. A failed request is logged and
    yields an empty dict.
    """
    if not (work_id and _OPENALEX_ID_RE.fullmatch(work_id)):
        return {}

    url = f"{_OPENALEX_WORKS_URL}/{work_id}"
    try:
        response = _http.get(url, params=_polite_params(), timeout=10)
        response.raise_for_status()
        payload = response.json()
    except Exception as exc:
        logger.warning("OpenAlex cross-ref lookup failed for %s: %s", work_id, exc)
        return {}

    id_map = payload.get("ids") or {}
    found: dict[str, str] = {}

    doi = _strip_doi(id_map.get("doi"))
    if doi:
        found["doi"] = doi

    pmid = _strip_pmid(id_map.get("pmid"))
    if pmid:
        found["pmid"] = pmid

    if mag := id_map.get("mag"):
        found["mag"] = str(mag)
    return found


class OpenAlexSource(PaperSource):
    """Paper source backed by the OpenAlex Works API.

    Offers free-text search and single-work lookup by Work ID, DOI or PMID;
    both delegate to the module-level fetch functions.
    """

    name = "openalex"

    def search(
        self,
        query: str,
        max_results: int = 20,
        filters: dict | None = None,
    ) -> list[dict]:
        """Search Works; see ``fetch_openalex`` for the meaning of ``filters``."""
        return fetch_openalex(query=query, max_results=max_results, filters=filters)

    def get_by_id(self, identifier: str) -> dict | None:
        """Look up one Work, returning ``None`` when it cannot be fetched."""
        return fetch_openalex_by_id(identifier)

    def supports(self, capability: Capability) -> bool:
        """Report optional capabilities; none are implemented for OpenAlex."""
        # Citation edges are available upstream (referenced_works /
        # cited_by_api_url), but no dedicated fetcher ships for them yet.
        return False

    def rate_limit_hint(self) -> RateLimitHint:
        """Describe the request pacing that applies to the current configuration.

        The faster polite-pool hint is returned when ``OPENALEX_MAILTO`` is
        set to a non-blank value, the public-pool hint otherwise.
        """
        in_polite_pool = bool(os.getenv("OPENALEX_MAILTO", "").strip())
        if not in_polite_pool:
            return RateLimitHint(
                min_interval_seconds=1.0,
                daily_quota=100_000,
                notes="OpenAlex public pool: ~1 req/s without mailto; set OPENALEX_MAILTO",
            )
        return RateLimitHint(
            min_interval_seconds=0.1,
            daily_quota=100_000,
            notes="OpenAlex polite pool: 10 req/s, 100k req/day with mailto",
        )