"""Core analytics: impact scoring, topics, trends, citations, similarity, authors, reviews.

All functions operate on SQLAlchemy Paper model instances retrieved from the
local SQLite cache. External API calls are made only for citation lookups and
paper recommendations via Semantic Scholar, plus one OpenAlex cross-reference
lookup when an OpenAlex-sourced paper has no DOI of its own.
"""

import logging
import math
import re
from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone
from typing import Any

from sqlalchemy import or_
from sqlalchemy.orm import Session

from .db import Paper

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------

# Escape character used for every LIKE / ILIKE pattern built in this module.
_LIKE_ESCAPE = "/"


def _utcnow_naive() -> datetime:
    """Return the current UTC time as a naive datetime.

    The module compares "now" directly against ``Paper.publication_date``
    values, so the tzinfo is stripped to keep both sides naive.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _to_naive_utc(value: datetime) -> datetime:
    """Convert an aware datetime to naive UTC; return naive values unchanged."""
    if value.tzinfo is None:
        return value
    return value.astimezone(timezone.utc).replace(tzinfo=None)


def _iso_date(value: datetime | None) -> str | None:
    """Return ``value.isoformat()``, or None when the value is missing."""
    return value.isoformat() if value else None


def _escape_like(text: str) -> str:
    """Escape LIKE wildcards in *text* using ``/`` as the escape character.

    The escape character itself is doubled first so that it is not
    re-escaped by the later replacements.
    """
    return (
        text.replace(_LIKE_ESCAPE, _LIKE_ESCAPE * 2)
        .replace("%", f"{_LIKE_ESCAPE}%")
        .replace("_", f"{_LIKE_ESCAPE}_")
    )


def _keyword_query(session: Session, text: str, columns: list[Any]):
    """Build a Paper query that requires every whitespace-separated term.

    Each term of *text* must appear (case-insensitive substring match) in at
    least one of *columns*; the per-term conditions are ANDed together.
    An empty *text* yields an unfiltered query.
    """
    q = session.query(Paper)
    for term in _escape_like(text).strip().split():
        pattern = f"%{term}%"
        q = q.filter(
            or_(*[col.ilike(pattern, escape=_LIKE_ESCAPE) for col in columns])
        )
    return q


def _year_start(year: int) -> datetime:
    """Return 1 January of *year*, clamped to the range ``datetime`` supports."""
    clamped = min(max(year, datetime.min.year), datetime.max.year)
    return datetime(clamped, 1, 1)


# ---------------------------------------------------------------------------
# Impact scoring
# ---------------------------------------------------------------------------

_HALF_LIFE_DAYS = 5 * 365  # 5-year half-life for academic papers
_RECENCY_FLOOR = 0.15  # Old landmark papers keep at least 15% weight


def compute_impact_score(
    citation_count: int | None,
    influential_citation_count: int | None,
    publication_date: datetime | None,
) -> float | None:
    """Compute composite impact score [0, 100].

    Formula: ``log1p(effective_citations) * recency_weight``, rescaled by
    ``100 / 7`` and capped at 100, where
    ``effective_citations = citation_count + 3 * influential_citation_count``.

    Recency decays with a 5-year half-life and a 15% floor so that
    highly-cited older papers still receive meaningful scores. Papers without
    a publication date get full recency weight.

    Args:
        citation_count: Total citations; ``None`` means "unknown".
        influential_citation_count: Influential citations; ``None`` counts as 0.
        publication_date: Publication timestamp. Aware values are converted
            to UTC before the age is computed.

    Returns:
        The score rounded to two decimals, or ``None`` when
        ``citation_count`` is ``None``.
    """
    if citation_count is None:
        return None

    recency_weight = 1.0
    if publication_date:
        # Future-dated papers are treated as age 0 rather than boosted.
        age_days = max(0, (_utcnow_naive() - _to_naive_utc(publication_date)).days)
        recency_weight = max(
            _RECENCY_FLOOR,
            math.exp(-age_days * math.log(2) / _HALF_LIFE_DAYS),
        )

    # Clamp at zero: log1p raises ValueError for inputs <= -1, which bad
    # upstream data (negative counts) would otherwise trigger.
    effective = max(0, citation_count + 3 * (influential_citation_count or 0))
    raw = math.log1p(effective) * recency_weight
    return round(min(100.0, raw / 7.0 * 100.0), 2)


# ---------------------------------------------------------------------------
# Topic detection
# ---------------------------------------------------------------------------
#
# Topic detection moved into the ``topics`` subpackage: OpenAlex concept
# assignments first, an optional sentence-transformer second, the keyword
# taxonomy last. Re-exported here so callers that import from ``core`` keep
# working unchanged.

from .topics import (  # noqa: E402
    detect_topic_entries,
    detect_topics,
    enrich_topic_entries,
    enrich_topics,
    to_names as topics_to_names,
)


# ---------------------------------------------------------------------------
# Trend detection
# ---------------------------------------------------------------------------

# OpenAlex makes broad seeding viable in a single search, so the
# corpus reaches statistically meaningful counts much sooner.
_LOW_CONFIDENCE_FLOOR = 5


def get_trends(session: Session, window_days: int = 30) -> dict[str, Any]:
    """Detect emerging/declining topics by comparing recent vs baseline window.

    The signal window is the last ``window_days`` days; the baseline window
    is the ``window_days`` days immediately before it. Each topic's trend
    score is the percentage change from baseline count to signal count (the
    denominator is floored at 1).

    Note: trends are computed over the locally cached corpus only. The cache
    reflects your search history, not the full publication landscape. Run
    broad searches via search_papers to build a representative sample.

    Returns:
        A dict with the window bounds, corpus size, per-topic rows sorted by
        descending trend score, and ``emerging`` / ``declining`` topic lists.
        When either window has fewer than ``_LOW_CONFIDENCE_FLOOR`` topic
        occurrences, both lists are empty and a ``note`` explains why.
    """
    now = _utcnow_naive()
    signal_start = now - timedelta(days=window_days)
    baseline_start = signal_start - timedelta(days=window_days)

    papers = (
        session.query(Paper)
        .filter(Paper.publication_date >= baseline_start)
        .all()
    )

    def count_topics(start: datetime, end: datetime) -> Counter:
        """Count lower-cased topic occurrences for papers in ``[start, end)``."""
        counts: Counter = Counter()
        for p in papers:
            if p.publication_date and start <= p.publication_date < end:
                for t in p.topics:
                    counts[t.lower()] += 1
        return counts

    signal = count_topics(signal_start, now)
    baseline = count_topics(baseline_start, signal_start)

    rows = []
    for topic in set(signal) | set(baseline):
        sig = signal.get(topic, 0)
        base = baseline.get(topic, 0)
        score = ((sig - base) / max(base, 1)) * 100.0
        if base == 0 and sig >= 1:
            status = "new"
        elif sig >= 1 and score >= 50:
            status = "emerging"
        elif base >= 1 and score <= -30:
            status = "declining"
        else:
            status = "stable"
        rows.append({
            "topic": topic,
            "signal_count": sig,
            "baseline_count": base,
            "trend_score": round(score, 2),
            "status": status,
        })
    # Highest score first; the topic name breaks ties so the output order is
    # deterministic (set iteration order varies between runs).
    rows.sort(key=lambda r: (-r["trend_score"], r["topic"]))

    total_signal = sum(signal.values())
    total_baseline = sum(baseline.values())
    low_confidence = (
        total_signal < _LOW_CONFIDENCE_FLOOR
        or total_baseline < _LOW_CONFIDENCE_FLOOR
    )

    result = {
        "window_days": window_days,
        "signal_start": signal_start.isoformat(),
        "signal_end": now.isoformat(),
        "corpus_size": len(papers),
        "signal_topic_occurrences": total_signal,
        "baseline_topic_occurrences": total_baseline,
        "low_confidence": low_confidence,
        "topics": rows,
    }

    if low_confidence:
        result["emerging"] = []
        result["declining"] = []
        result["note"] = (
            f"Insufficient data for reliable trend detection. "
            f"Found {total_signal} signal and {total_baseline} baseline topic occurrences "
            f"(need at least {_LOW_CONFIDENCE_FLOOR} each). Run search_papers to grow the corpus."
        )
    else:
        result["emerging"] = [r["topic"] for r in rows if r["status"] in ("emerging", "new")]
        result["declining"] = [r["topic"] for r in rows if r["status"] == "declining"]

    return result


# ---------------------------------------------------------------------------
# Real citation network via Semantic Scholar API
# ---------------------------------------------------------------------------

_DAMPING = 0.85
_PR_ITERATIONS = 50
_PR_TOL = 1e-6


def _resolve_s2_identifier(paper: Paper) -> str | None:
    """Resolve a cached paper to a Semantic Scholar API identifier.

    Priority: S2 paper ID > DOI on the local row > arXiv ID > OpenAlex
    cross-references (DOI / PMID / MAG). The OpenAlex hop is only taken when
    the local row carries no DOI of its own; we ask OpenAlex for the Work's
    external identifiers and map the first available one onto Semantic
    Scholar's prefix scheme.

    Returns:
        The identifier string (bare S2 ID or ``DOI:`` / ``ARXIV:`` /
        ``PMID:`` / ``MAG:`` prefixed), or ``None`` if nothing resolves.
    """
    if paper.source == "semantic_scholar" and paper.source_id:
        return paper.source_id
    if paper.doi:
        return f"DOI:{paper.doi}"
    if paper.source == "arxiv" and paper.source_id:
        return f"ARXIV:{paper.source_id}"
    if paper.source == "openalex" and paper.source_id:
        # Imported lazily, as elsewhere in this module.
        from . import sources

        # Tolerate a lookup that yields nothing instead of crashing on .get().
        cross = sources.fetch_openalex_cross_refs(paper.source_id) or {}
        if doi := cross.get("doi"):
            return f"DOI:{doi}"
        if pmid := cross.get("pmid"):
            return f"PMID:{pmid}"
        if mag := cross.get("mag"):
            return f"MAG:{mag}"
    return None


def _pagerank(node_ids, out_edges, in_edges):
    """Compute PageRank over a directed graph by power iteration.

    Args:
        node_ids: Sized iterable of node keys.
        out_edges: Mapping of node -> collection of nodes it links to.
        in_edges: Mapping of node -> collection of nodes linking to it.

    Returns:
        Mapping of node -> rank. Ranks start uniform at ``1 / n``; the rank
        held by dangling nodes (no out-edges) is spread evenly over all
        nodes. Iteration stops after ``_PR_ITERATIONS`` rounds or once the
        summed absolute change drops below ``_PR_TOL``. An empty graph
        yields ``{}``.
    """
    n = len(node_ids)
    if n == 0:
        return {}
    rank = {nid: 1.0 / n for nid in node_ids}
    for _ in range(_PR_ITERATIONS):
        dangling = sum(rank[nid] for nid in node_ids if not out_edges.get(nid))
        new_rank = {}
        for nid in node_ids:
            link_sum = sum(
                rank[src] / max(len(out_edges.get(src, [])), 1)
                for src in in_edges.get(nid, [])
            )
            new_rank[nid] = (1 - _DAMPING) / n + _DAMPING * (link_sum + dangling / n)
        delta = sum(abs(new_rank[nid] - rank[nid]) for nid in node_ids)
        rank = new_rank
        if delta < _PR_TOL:
            break
    return rank


def get_citations(session: Session, paper_id: int) -> dict[str, Any]:
    """Get real citation data for a paper via Semantic Scholar API.

    Fetches up to 100 citing papers and 100 references for the paper, and
    derives a citations-per-day velocity from the cached citation count and
    publication date.

    Returns:
        A dict with the formatted citations and references (each list capped
        at 50 entries in the output), their fetched totals, the number of
        influential citations and ``citations_per_day``. If the paper is not
        cached or no Semantic Scholar identifier can be resolved, the dict
        carries an ``error`` key and empty lists instead.
    """
    from . import sources

    paper = session.query(Paper).filter(Paper.id == paper_id).first()
    if not paper:
        return {"paper_id": paper_id, "error": "not found", "citations": [], "references": []}

    s2_id = _resolve_s2_identifier(paper)
    if not s2_id:
        return {
            "paper_id": paper_id,
            "title": paper.title,
            "error": "Cannot resolve Semantic Scholar identifier for this paper.",
            "citations": [],
            "references": [],
        }

    # Fetch real citation data from S2
    raw_citations = sources.fetch_s2_citations(s2_id, limit=100)
    raw_references = sources.fetch_s2_references(s2_id, limit=100)

    def _format_paper(p: dict) -> dict:
        """Project a fetched paper dict onto the fields exposed to callers."""
        return {
            "title": p.get("title"),
            "authors": p.get("authors"),
            "publication_date": _iso_date(p.get("publication_date")),
            "citation_count": p.get("citation_count"),
            "source_id": p.get("source_id"),
            "url": p.get("url"),
            "is_influential": p.get("_is_influential", False),
            "citation_intents": p.get("_citation_intents", []),
        }

    citations = [_format_paper(p) for p in raw_citations]
    references = [_format_paper(p) for p in raw_references]

    # Citation velocity: cached citation count per day since publication
    # (age floored at one day to avoid division by zero).
    velocity = None
    if paper.citation_count is not None and paper.publication_date:
        age = max((_utcnow_naive() - paper.publication_date).days, 1)
        velocity = round(paper.citation_count / age, 4)

    influential_count = sum(1 for c in citations if c.get("is_influential"))

    return {
        "paper_id": paper_id,
        "title": paper.title,
        "total_citations": len(citations),
        "influential_citations": influential_count,
        "citations": citations[:50],  # Cap output size
        "total_references": len(references),
        "references": references[:50],
        "citations_per_day": velocity,
        "citation_count": paper.citation_count,
    }


# ---------------------------------------------------------------------------
# TF-IDF similarity (local fallback)
# ---------------------------------------------------------------------------

_STOPWORDS = {
    "a", "an", "the", "and", "or", "but", "in", "on", "at", "to", "for",
    "of", "with", "by", "from", "as", "is", "was", "are", "were", "be",
    "been", "have", "has", "had", "do", "does", "did", "will", "would",
    "could", "should", "may", "might", "can", "this", "that", "these",
    "those", "we", "our", "their", "its", "it", "which", "who", "not", "no",
    "also", "however", "such", "show", "shows", "paper", "propose",
    "proposed", "present", "results", "method", "approach", "based", "using",
    "used", "use", "new", "novel", "two", "three", "all", "you", "need",
    "one", "first", "more", "some", "than", "them", "then", "they", "what",
    "when", "where", "how", "into", "about", "over", "most", "each", "both",
    "after", "between", "through", "during", "before", "able", "many",
    "much", "very", "only", "other", "while", "being", "there", "here",
    "well", "same", "different", "several", "various", "among", "within",
    "without", "against", "further", "compared", "achieve", "achieved",
    "demonstrate", "demonstrated", "perform", "performed", "existing",
    "recent", "recently", "previous", "previously", "significantly",
    "effectively", "respectively", "state", "art", "outperform",
    "outperforms", "benchmark", "benchmarks",
}

# Checked in order; the first suffix that leaves a stem of >= 3 characters wins.
_STEM_SUFFIXES = (
    "ations", "ation", "ments", "ment", "ness", "ings", "ing", "ises",
    "izes", "ous", "ive", "ers", "ies", "ed", "ly", "es", "er", "al", "s",
)

_NON_ALNUM_RE = re.compile(r"[^a-zA-Z0-9\s]")


def _stem(word: str) -> str:
    """Minimal suffix-stripping stemmer to normalize plurals and common endings.

    Words of four characters or fewer are returned unchanged, and a suffix is
    only stripped when at least three characters remain.
    """
    if len(word) <= 4:
        return word
    for suffix in _STEM_SUFFIXES:
        if word.endswith(suffix) and len(word) - len(suffix) >= 3:
            return word[: -len(suffix)]
    return word


def _tokenize(text: str) -> list[str]:
    """Lower-case *text*, drop punctuation, short tokens and stopwords, then stem.

    Stopwords are matched against the unstemmed token.
    """
    cleaned = _NON_ALNUM_RE.sub(" ", text.lower())
    return [
        _stem(tok)
        for tok in cleaned.split()
        if len(tok) >= 3 and tok not in _STOPWORDS
    ]


def _build_tfidf(papers: list[Paper]) -> dict[int, dict[str, float]]:
    """Build a sparse TF-IDF vector per paper from its title and abstract.

    Term frequency is the count normalised by document length; inverse
    document frequency is ``log(n_docs / df) + 1``.

    Returns:
        Mapping of paper id -> {term: weight}.
    """
    tf: dict[int, dict[str, int]] = {}
    df: Counter = Counter()
    for p in papers:
        freq = Counter(_tokenize(f"{p.title or ''} {p.abstract or ''}"))
        tf[p.id] = dict(freq)
        df.update(freq.keys())

    n_docs = max(len(papers), 1)
    tfidf: dict[int, dict[str, float]] = {}
    for pid, freq_map in tf.items():
        doc_len = max(sum(freq_map.values()), 1)
        tfidf[pid] = {
            term: (count / doc_len) * (math.log(n_docs / max(df[term], 1)) + 1.0)
            for term, count in freq_map.items()
        }
    return tfidf


def _cosine_sim(a: dict[str, float], b: dict[str, float]) -> float:
    """Return the cosine similarity of two sparse vectors, capped at 1.0.

    Empty or zero-norm vectors yield 0.0.
    """
    if not a or not b:
        return 0.0
    dot = sum(weight * b.get(term, 0.0) for term, weight in a.items())
    na = math.sqrt(sum(v * v for v in a.values()))
    nb = math.sqrt(sum(v * v for v in b.values()))
    d = na * nb
    # The cap guards against floating-point overshoot on near-identical vectors.
    return min(dot / d, 1.0) if d > 0 else 0.0


def _topic_overlap(topics_a: list[str], topics_b: list[str]) -> float:
    """Jaccard similarity over topic sets (case-insensitive)."""
    if not topics_a or not topics_b:
        return 0.0
    set_a = {t.lower() for t in topics_a}
    set_b = {t.lower() for t in topics_b}
    union = set_a | set_b
    return len(set_a & set_b) / len(union) if union else 0.0


def find_similar(session: Session, paper_id: int, top_n: int = 10) -> dict[str, Any]:
    """Find similar papers using Semantic Scholar recommendations with TF-IDF fallback.

    Tries the S2 Recommendations API first (SPECTER2 embeddings). Falls back
    to local TF-IDF cosine similarity if no usable S2 identifier is available
    or the recommendations call returns nothing.

    Returns:
        ``{"paper_id", "method", "similar"}``; when the paper is not cached,
        only ``paper_id`` and an empty ``similar`` list.
    """
    from . import sources

    target_paper = session.query(Paper).filter(Paper.id == paper_id).first()
    if not target_paper:
        return {"paper_id": paper_id, "similar": []}

    # Try S2 Recommendations API first
    raw_id = _resolve_s2_identifier(target_paper)
    if raw_id and raw_id.startswith(("DOI:", "ARXIV:")):
        # Recommendations API needs the S2 paper ID, not prefixed IDs.
        # Only a paper sourced from S2 carries one locally.
        # NOTE(review): PMID:/MAG: identifiers resolved through OpenAlex are
        # passed through unchanged here — confirm the API accepts them.
        raw_id = (
            target_paper.source_id
            if target_paper.source == "semantic_scholar"
            else None
        )

    if raw_id:
        recs = sources.fetch_s2_recommendations(raw_id, limit=top_n)
        if recs:
            similar = [
                {
                    "title": p.get("title"),
                    "authors": p.get("authors"),
                    "publication_date": _iso_date(p.get("publication_date")),
                    "similarity_source": "semantic_scholar_specter2",
                    "citation_count": p.get("citation_count"),
                    "url": p.get("url"),
                    "source_id": p.get("source_id"),
                }
                for p in recs[:top_n]
            ]
            return {
                "paper_id": paper_id,
                "method": "semantic_scholar_recommendations",
                "similar": similar,
            }

    # Fallback to local TF-IDF similarity
    return _find_similar_tfidf(session, target_paper, top_n)


def _find_similar_tfidf(
    session: Session,
    target_paper: Paper,
    top_n: int = 10,
) -> dict[str, Any]:
    """Local TF-IDF cosine similarity fallback.

    Candidates are up to 300 papers sharing one of the target's first three
    topics plus up to 200 other papers, both ordered by impact score (or the
    top 500 by impact when the target has no topics). The final score blends
    60% text cosine similarity with 40% topic Jaccard overlap.
    """
    paper_id = target_paper.id

    # Pre-filter: prefer papers sharing topics with the target, then fill with top-impact.
    # Topic names are LIKE-escaped so "%" or "_" in a name match literally.
    topic_patterns = [f'%"{_escape_like(t)}"%' for t in target_paper.topics[:3]]
    if topic_patterns:
        related = (
            session.query(Paper)
            .filter(or_(*[
                Paper.topics_json.like(pat, escape=_LIKE_ESCAPE)
                for pat in topic_patterns
            ]))
            .order_by(Paper.impact_score.desc().nullslast())
            .limit(300)
            .all()
        )
        related_ids = {p.id for p in related}
        filler = (
            session.query(Paper)
            .filter(~Paper.id.in_(related_ids))
            .order_by(Paper.impact_score.desc().nullslast())
            .limit(200)
            .all()
        )
        papers = related + filler
    else:
        papers = (
            session.query(Paper)
            .order_by(Paper.impact_score.desc().nullslast())
            .limit(500)
            .all()
        )

    by_id = {p.id: p for p in papers}
    # The target must be part of the corpus so it gets a vector of its own.
    if paper_id not in by_id:
        by_id[paper_id] = target_paper
        papers.append(target_paper)

    tfidf = _build_tfidf(papers)
    target = tfidf.get(paper_id, {})

    scores = []
    for pid, vec in tfidf.items():
        if pid == paper_id:
            continue
        text_sim = _cosine_sim(target, vec)
        topic_sim = _topic_overlap(target_paper.topics, by_id[pid].topics)
        # Blend: 60% text similarity, 40% topic overlap
        sim = 0.6 * text_sim + 0.4 * topic_sim
        if sim > 0:
            scores.append((pid, sim))

    scores.sort(key=lambda x: x[1], reverse=True)

    similar = []
    for pid, sim in scores[:top_n]:
        p = by_id[pid]
        similar.append({
            "id": p.id,
            "title": p.title,
            "authors": p.authors,
            "publication_date": _iso_date(p.publication_date),
            "similarity": round(sim, 4),
            "topics": p.topics,
            "similarity_source": "tfidf_local",
        })

    return {"paper_id": paper_id, "method": "tfidf_cosine_with_topic_overlap", "similar": similar}


# ---------------------------------------------------------------------------
# Author analytics
# ---------------------------------------------------------------------------

_SPLIT_RE = re.compile(r"[;,]+")


def _split_authors(authors_str: str | None) -> list[str]:
    """Split a ``;``/``,``-separated author string into trimmed, non-empty names."""
    if not authors_str:
        return []
    return [a.strip() for a in _SPLIT_RE.split(authors_str) if a.strip()]


def _name_matches(candidate: str, target: str) -> bool:
    """Word-boundary aware name matching to avoid false positives on short names.

    Both names are lower-cased and split on whitespace. The names match when
    any target part of at least two characters equals a candidate part, or,
    for target parts of at least three characters, when one part is a prefix
    of the other.

    NOTE(review): a single shared part (e.g. the same first name) is enough
    for a match; callers narrow the candidate set beforehand.
    """
    c_parts = candidate.lower().split()
    t_parts = target.lower().split()
    for tp in t_parts:
        if len(tp) < 2:
            continue
        for cp in c_parts:
            if tp == cp or (len(tp) >= 3 and (cp.startswith(tp) or tp.startswith(cp))):
                return True
    return False


def get_author_profile(session: Session, author_name: str) -> dict[str, Any]:
    """Get author analytics: papers, frequency, topics, collaborators.

    Returns:
        A dict with the matched paper count, up to 20 most recent papers,
        publications per ``YYYY-MM``, the 10 most frequent topics and the
        10 most frequent co-authors.
    """
    # Pre-filter by author name in SQL, then refine with word-boundary matching
    escaped_name = _escape_like(author_name)
    candidates = (
        session.query(Paper)
        .filter(Paper.authors.ilike(f"%{escaped_name}%", escape=_LIKE_ESCAPE))
        .order_by(Paper.publication_date.desc().nullslast())
        .all()
    )
    author_papers = [
        p for p in candidates
        if any(_name_matches(a, author_name) for a in _split_authors(p.authors))
    ]
    # Undated papers sort last via the epoch fallback.
    author_papers.sort(
        key=lambda p: p.publication_date or datetime(1970, 1, 1),
        reverse=True,
    )

    # Publication frequency
    freq = Counter(
        p.publication_date.strftime("%Y-%m")
        for p in author_papers
        if p.publication_date
    )

    # Top topics
    topic_counts = Counter(t.lower() for p in author_papers for t in p.topics)

    # Collaborators: every listed author that does not match the queried name
    collabs = Counter(
        co
        for p in author_papers
        for co in _split_authors(p.authors)
        if not _name_matches(co, author_name)
    )

    return {
        "author_name": author_name,
        "total_papers": len(author_papers),
        "papers": [p.to_dict(compact=True) for p in author_papers[:20]],
        "publication_frequency": dict(sorted(freq.items())),
        "top_topics": dict(topic_counts.most_common(10)),
        "top_collaborators": dict(collabs.most_common(10)),
    }


# ---------------------------------------------------------------------------
# Literature review generation
# ---------------------------------------------------------------------------

def generate_review(session: Session, topic: str) -> dict[str, Any]:
    """Generate a structured literature review from cached papers.

    Selects up to 50 papers whose title, abstract or topics contain every
    word of *topic*, groups them by their first topic, and renders a plain
    text summary.

    Returns:
        A dict with the grouped papers, consensus/debate topic lists and the
        rendered ``review_text``; an ``error`` dict when *topic* is blank.
    """
    if not topic or not topic.strip():
        return {"topic": topic, "error": "topic must not be empty", "total_papers": 0}

    papers = (
        _keyword_query(session, topic, [Paper.title, Paper.abstract, Paper.topics_json])
        .order_by(Paper.impact_score.desc().nullslast(), Paper.publication_date.desc().nullslast())
        .limit(50)
        .all()
    )

    # Group by primary topic
    subtopics: dict[str, list[Paper]] = defaultdict(list)
    for p in papers:
        primary = p.topics[0] if p.topics else "general"
        subtopics[primary].append(p)

    # Consensus / debate detection: a topic on at least a third of the papers
    # (minimum two) is "consensus"; other topics seen at least twice are "debate".
    topic_counts = Counter(t for p in papers for t in p.topics)
    consensus_floor = max(len(papers) // 3, 2)
    consensus = [t for t, c in topic_counts.items() if c >= consensus_floor]
    debate_topics = [
        t for t, c in topic_counts.items()
        if c >= 2 and t not in consensus
    ]

    # Build review text
    heading = f"Literature Review: {topic}"
    lines = [
        heading,
        "=" * len(heading),
        "",
        f"This review covers {len(papers)} papers related to '{topic}', "
        f"spanning {len(subtopics)} subtopics.",
        "",
    ]

    if subtopics:
        lines.append("Subtopics")
        lines.append("-" * 9)
        largest_first = sorted(subtopics.items(), key=lambda kv: len(kv[1]), reverse=True)
        for st, ps in largest_first[:8]:
            # Titles may be missing on cached rows; never join a None.
            titles = "; ".join(p.title or "untitled" for p in ps[:3])
            lines.append(f"  {st.title()} ({len(ps)} papers): {titles}")
        lines.append("")

    key_papers = sorted(papers, key=lambda p: p.citation_count or 0, reverse=True)[:5]
    if key_papers:
        lines.append("Key Papers")
        lines.append("-" * 10)
        for p in key_papers:
            # Only a missing count is "unknown"; a real 0 is reported as 0.
            cites = "unknown" if p.citation_count is None else p.citation_count
            lines.append(f"  - {p.title or 'untitled'} (citations: {cites})")
        lines.append("")

    if consensus:
        lines.append("Consensus Areas: " + ", ".join(consensus[:5]))
    if debate_topics:
        lines.append("Active Debate Areas: " + ", ".join(debate_topics[:5]))

    subtopics_out = {
        st: [p.to_dict(compact=True) for p in ps]
        for st, ps in subtopics.items()
    }

    return {
        "topic": topic,
        "total_papers": len(papers),
        "subtopics": subtopics_out,
        "consensus_topics": consensus,
        "debate_topics": debate_topics,
        "review_text": "\n".join(lines),
    }


# ---------------------------------------------------------------------------
# BibTeX export
# ---------------------------------------------------------------------------

# Fixed English abbreviations: strftime("%b") depends on the process locale.
_BIBTEX_MONTHS = (
    "jan", "feb", "mar", "apr", "may", "jun",
    "jul", "aug", "sep", "oct", "nov", "dec",
)


def _sanitize_bibtex_key(title: str, source_id: str) -> str:
    """Generate a BibTeX citation key from title and source ID.

    The key is ``<first title word, lower-cased>_<source id>`` with all
    non-alphanumeric characters removed and the id part cut to 20 characters.
    """
    # Take first word from title
    words = re.sub(r"[^a-zA-Z0-9\s]", "", title).split()
    first_word = words[0].lower() if words else "unknown"
    # Clean source_id for use as key
    clean_id = re.sub(r"[^a-zA-Z0-9]", "", source_id)[:20]
    return f"{first_word}_{clean_id}"


def _escape_bibtex(value: str) -> str:
    """Escape the LaTeX specials ``&``, ``%``, ``_`` and ``#`` in a field value."""
    for old, new in (("&", r"\&"), ("%", r"\%"), ("_", r"\_"), ("#", r"\#")):
        value = value.replace(old, new)
    return value


def export_bibtex(session: Session, paper_ids: list[int] | None = None,
                  query: str | None = None, limit: int = 50) -> dict[str, Any]:
    """Export papers as BibTeX entries.

    Specify paper_ids for specific papers, or query to search the cache.
    ``paper_ids`` takes precedence; ``limit`` applies to the query path only.

    Returns:
        ``{"count", "bibtex"}``, or an ``error`` dict when neither
        ``paper_ids`` nor ``query`` is given.
    """
    if paper_ids:
        papers = session.query(Paper).filter(Paper.id.in_(paper_ids)).all()
    elif query:
        papers = (
            _keyword_query(session, query, [Paper.title, Paper.abstract])
            .order_by(Paper.impact_score.desc().nullslast())
            .limit(limit)
            .all()
        )
    else:
        return {"error": "Provide paper_ids or query", "bibtex": "", "count": 0}

    # Every source is exported as @article.
    entry_type = "article"

    entries = []
    for p in papers:
        # Use the same fallback for the key and the title field so a row
        # without a title cannot crash the export.
        title = p.title or "untitled"
        key = _sanitize_bibtex_key(title, p.source_id or str(p.id))

        fields = [f"  title = {{{_escape_bibtex(title)}}}"]
        authors = _split_authors(p.authors)
        if authors:
            # BibTeX separates authors with " and "; names are kept as stored.
            fields.append(f"  author = {{{_escape_bibtex(' and '.join(authors))}}}")
        if p.publication_date:
            fields.append(f"  year = {{{p.publication_date.year}}}")
            fields.append(f"  month = {{{_BIBTEX_MONTHS[p.publication_date.month - 1]}}}")
        if p.doi:
            fields.append(f"  doi = {{{p.doi}}}")
        if p.url:
            fields.append(f"  url = {{{p.url}}}")
        if p.source == "arxiv" and p.source_id:
            fields.append(f"  eprint = {{{p.source_id}}}")
            fields.append("  archiveprefix = {arXiv}")
        if p.abstract:
            # Truncate very long abstracts for BibTeX
            fields.append(f"  abstract = {{{_escape_bibtex(p.abstract[:500])}}}")

        entries.append(f"@{entry_type}{{{key},\n" + ",\n".join(fields) + "\n}")

    return {
        "count": len(entries),
        "bibtex": "\n\n".join(entries),
    }


# ---------------------------------------------------------------------------
# Search (local cache)
# ---------------------------------------------------------------------------

def search_cached(
    session: Session,
    query: str,
    source: str | None = None,
    year_from: int | None = None,
    year_to: int | None = None,
    min_citations: int | None = None,
    limit: int = 20,
) -> list[dict]:
    """Search the local paper cache by title/abstract/topic keyword matching.

    Every whitespace-separated word of *query* must match. Optional filters
    restrict the source, the inclusive publication year range and the minimum
    citation count. Results are ordered by impact score, then by publication
    date, both descending with NULLs last.

    Returns:
        Up to *limit* papers as compact dicts.
    """
    q = _keyword_query(session, query, [Paper.title, Paper.abstract, Paper.topics_json])

    if source:
        q = q.filter(Paper.source == source)
    if year_from is not None:
        q = q.filter(Paper.publication_date >= _year_start(year_from))
    # An upper bound at or beyond datetime's last year excludes nothing, and
    # building "year_to + 1" would overflow, so the filter is skipped.
    if year_to is not None and year_to < datetime.max.year:
        q = q.filter(Paper.publication_date < _year_start(year_to + 1))
    if min_citations is not None:
        q = q.filter(Paper.citation_count >= min_citations)

    q = q.order_by(Paper.impact_score.desc().nullslast(), Paper.publication_date.desc().nullslast())
    return [p.to_dict(compact=True) for p in q.limit(limit).all()]