surbi karki committed on
Commit
16f0d20
·
verified ·
1 Parent(s): d37e0aa

Update ingestion_service.py

Browse files
Files changed (1) hide show
  1. ingestion_service.py +142 -129
ingestion_service.py CHANGED
@@ -1,129 +1,142 @@
1
- import time
2
- import pandas as pd
3
- import joblib
4
- from Bio import Entrez
5
- from sqlalchemy import create_engine, text
6
- from urllib.parse import quote_plus
7
- from text_utils import TextProcessor
8
- from sklearn.feature_extraction.text import TfidfVectorizer
9
-
10
- # --- CONFIGURATION ---
11
- DB_USER = "postgres"
12
- DB_PASSWORD = quote_plus("subisu")
13
- DB_NAME = "ppd_project_db"
14
- DB_URI = f'postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@localhost:5432/{DB_NAME}'
15
-
16
- Entrez.email = "surbi.211740@ncit.edu.np"
17
- Entrez.tool = "PPD_Recommender_App"
18
-
19
- class IngestionService:
20
- def __init__(self):
21
- self.engine = create_engine(DB_URI)
22
-
23
- def fetch_from_pubmed(self, query, limit=100):
24
- print(f"🔍 Searching PubMed: '{query}'...")
25
- try:
26
- h1 = Entrez.esearch(db="pubmed", term=query, retmax=limit, sort="relevance")
27
- ids = Entrez.read(h1)["IdList"]
28
-
29
- if not ids: return []
30
-
31
- h2 = Entrez.efetch(db="pubmed", id=ids, retmode="xml")
32
- papers = Entrez.read(h2)
33
-
34
- results = []
35
- for paper in papers['PubmedArticle']:
36
- try:
37
- article = paper['MedlineCitation']['Article']
38
- title = article.get('ArticleTitle', '')
39
- abstract_data = article.get('Abstract', {}).get('AbstractText', [])
40
- abstract = " ".join([str(x) for x in abstract_data]) if isinstance(abstract_data, list) else str(abstract_data)
41
-
42
- if not abstract: continue
43
-
44
- results.append({
45
- "title": title,
46
- "content": abstract,
47
- "url": f"https://pubmed.ncbi.nlm.nih.gov/{paper['MedlineCitation']['PMID']}/"
48
- })
49
- except: continue
50
- return results
51
- except Exception as e:
52
- print(f"Pubmed Error: {e}")
53
- return []
54
-
55
- def store_articles(self, articles, category="General", risk="All"):
56
- """Modular requirement: Stores articles with deduplication."""
57
- added = 0
58
- with self.engine.connect() as conn:
59
- for art in articles:
60
- # Preprocessing
61
- clean_title = TextProcessor.clean_html(art['title'])
62
- clean_content = TextProcessor.clean_html(art['content'])
63
-
64
- query = text("""
65
- INSERT INTO articles
66
- (title, content_clean, content_raw, category, risk_level, status, format_type, external_url)
67
- VALUES (:t, :cc, :cr, :cat, :risk, 'Approved', 'pubmed', :url)
68
- ON CONFLICT (external_url) DO NOTHING
69
- """)
70
-
71
- try:
72
- res = conn.execute(query, {
73
- "t": clean_title,
74
- "cc": clean_content,
75
- "cr": f"<h3>Source: PubMed</h3><p>{art['content']}</p>",
76
- "cat": category,
77
- "risk": risk,
78
- "url": art['url']
79
- })
80
- conn.commit()
81
- if res.rowcount > 0: added += 1
82
- except Exception as e:
83
- print(f"DB Error: {e}")
84
- print(f"✅ Stored {added} new articles.")
85
- return added
86
-
87
- def build_tfidf_model(self, force=False):
88
- """Modular requirement: Builds the TF-IDF model with weighted fields."""
89
- print("🧠 Building Weighted TF-IDF Model...")
90
- # Use ORDER BY for deterministic indexing
91
- df = pd.read_sql("SELECT * FROM articles WHERE status = 'Approved' ORDER BY article_id", self.engine)
92
- df = df.reset_index(drop=True)
93
-
94
- if df.empty:
95
- print("⚠️ No articles to build model.")
96
- return
97
-
98
- # Multi-Field Weighting
99
- # Title (3x) + Content (1x) + Tags/Categories (1x)
100
- # We also apply normalization and phrase detection
101
-
102
- def prepare_features(row):
103
- title = TextProcessor.normalize(row['title'])
104
- content = TextProcessor.normalize(row['content_clean'])
105
- tags = TextProcessor.normalize(str(row['tags']) + " " + str(row['category']))
106
-
107
- # Phrase detection on title and content
108
- title = TextProcessor.detect_phrases(title)
109
- content = TextProcessor.detect_phrases(content)
110
-
111
- # Weighted concatenation
112
- return (title + " ") * 3 + content + " " + tags
113
-
114
- features = df.apply(prepare_features, axis=1)
115
-
116
- vectorizer = TfidfVectorizer(ngram_range=(1, 2)) # Support bigrams natively
117
- tfidf_matrix = vectorizer.fit_transform(features)
118
-
119
- joblib.dump(vectorizer, 'vectorizer.pkl')
120
- joblib.dump(tfidf_matrix, 'tfidf_matrix.pkl')
121
- print(f"💾 Model optimized and saved. Vocabulary size: {len(vectorizer.vocabulary_)}")
122
-
123
- if __name__ == "__main__":
124
- service = IngestionService()
125
- # 24-hour broad update
126
- arts = service.fetch_from_pubmed("postpartum depression OR maternal mental health", 100)
127
- if arts:
128
- service.store_articles(arts)
129
- service.build_tfidf_model()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import time
2
+ import pandas as pd
3
+ import joblib
4
+ from Bio import Entrez
5
+ from sqlalchemy import create_engine, text
6
+ from urllib.parse import quote_plus
7
+ from text_utils import TextProcessor
8
+ from sklearn.feature_extraction.text import TfidfVectorizer
9
+
10
+ # --- CONFIGURATION ---
11
+ import os
12
+ DATABASE_URL = os.getenv("DATABASE_URL")
13
+ if not DATABASE_URL:
14
+ DB_USER = os.getenv("DB_USER", "postgres")
15
+ DB_PASSWORD = quote_plus(os.getenv("DB_PASSWORD", "subisu"))
16
+ DB_HOST = os.getenv("DB_HOST", "localhost")
17
+ DB_PORT = os.getenv("DB_PORT", "5432")
18
+ DB_NAME = os.getenv("DB_NAME", "ppd_project_db")
19
+ DB_URI = f'postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}'
20
+ else:
21
+ # Ensure URL is compatible with SQLAlchemy if it starts with postgres://
22
+ if DATABASE_URL.startswith("postgres://"):
23
+ DATABASE_URL = DATABASE_URL.replace("postgres://", "postgresql+psycopg2://", 1)
24
+ elif "postgresql://" in DATABASE_URL and "+psycopg2" not in DATABASE_URL:
25
+ DATABASE_URL = DATABASE_URL.replace("postgresql://", "postgresql+psycopg2://", 1)
26
+ DB_URI = DATABASE_URL
27
+
28
+
29
+ Entrez.email = "surbi.211740@ncit.edu.np"
30
+ Entrez.tool = "PPD_Recommender_App"
31
+
32
+ class IngestionService:
33
+ def __init__(self):
34
+ self.engine = create_engine(DB_URI)
35
+
36
+ def fetch_from_pubmed(self, query, limit=100):
37
+ print(f"🔍 Searching PubMed: '{query}'...")
38
+ try:
39
+ h1 = Entrez.esearch(db="pubmed", term=query, retmax=limit, sort="relevance")
40
+ ids = Entrez.read(h1)["IdList"]
41
+
42
+ if not ids: return []
43
+
44
+ h2 = Entrez.efetch(db="pubmed", id=ids, retmode="xml")
45
+ papers = Entrez.read(h2)
46
+
47
+ results = []
48
+ for paper in papers['PubmedArticle']:
49
+ try:
50
+ article = paper['MedlineCitation']['Article']
51
+ title = article.get('ArticleTitle', '')
52
+ abstract_data = article.get('Abstract', {}).get('AbstractText', [])
53
+ abstract = " ".join([str(x) for x in abstract_data]) if isinstance(abstract_data, list) else str(abstract_data)
54
+
55
+ if not abstract: continue
56
+
57
+ results.append({
58
+ "title": title,
59
+ "content": abstract,
60
+ "url": f"https://pubmed.ncbi.nlm.nih.gov/{paper['MedlineCitation']['PMID']}/"
61
+ })
62
+ except: continue
63
+ return results
64
+ except Exception as e:
65
+ print(f"Pubmed Error: {e}")
66
+ return []
67
+
68
+ def store_articles(self, articles, category="General", risk="All"):
69
+ """Modular requirement: Stores articles with deduplication."""
70
+ added = 0
71
+ with self.engine.connect() as conn:
72
+ for art in articles:
73
+ # Preprocessing
74
+ clean_title = TextProcessor.clean_html(art['title'])
75
+ clean_content = TextProcessor.clean_html(art['content'])
76
+
77
+ query = text("""
78
+ INSERT INTO articles
79
+ (title, content_clean, content_raw, category, risk_level, status, format_type, external_url)
80
+ VALUES (:t, :cc, :cr, :cat, :risk, 'Approved', 'pubmed', :url)
81
+ ON CONFLICT (external_url) DO NOTHING
82
+ """)
83
+
84
+ try:
85
+ res = conn.execute(query, {
86
+ "t": clean_title,
87
+ "cc": clean_content,
88
+ "cr": f"<h3>Source: PubMed</h3><p>{art['content']}</p>",
89
+ "cat": category,
90
+ "risk": risk,
91
+ "url": art['url']
92
+ })
93
+ conn.commit()
94
+ if res.rowcount > 0: added += 1
95
+ except Exception as e:
96
+ print(f"DB Error: {e}")
97
+ print(f"✅ Stored {added} new articles.")
98
+ return added
99
+
100
+ def build_tfidf_model(self, force=False):
101
+ """Modular requirement: Builds the TF-IDF model with weighted fields."""
102
+ print("🧠 Building Weighted TF-IDF Model...")
103
+ # Use ORDER BY for deterministic indexing
104
+ df = pd.read_sql("SELECT * FROM articles WHERE status = 'Approved' ORDER BY article_id", self.engine)
105
+ df = df.reset_index(drop=True)
106
+
107
+ if df.empty:
108
+ print("⚠️ No articles to build model.")
109
+ return
110
+
111
+ # Multi-Field Weighting
112
+ # Title (3x) + Content (1x) + Tags/Categories (1x)
113
+ # We also apply normalization and phrase detection
114
+
115
+ def prepare_features(row):
116
+ title = TextProcessor.normalize(row['title'])
117
+ content = TextProcessor.normalize(row['content_clean'])
118
+ tags = TextProcessor.normalize(str(row['tags']) + " " + str(row['category']))
119
+
120
+ # Phrase detection on title and content
121
+ title = TextProcessor.detect_phrases(title)
122
+ content = TextProcessor.detect_phrases(content)
123
+
124
+ # Weighted concatenation
125
+ return (title + " ") * 3 + content + " " + tags
126
+
127
+ features = df.apply(prepare_features, axis=1)
128
+
129
+ vectorizer = TfidfVectorizer(ngram_range=(1, 2)) # Support bigrams natively
130
+ tfidf_matrix = vectorizer.fit_transform(features)
131
+
132
+ joblib.dump(vectorizer, 'vectorizer.pkl')
133
+ joblib.dump(tfidf_matrix, 'tfidf_matrix.pkl')
134
+ print(f"💾 Model optimized and saved. Vocabulary size: {len(vectorizer.vocabulary_)}")
135
+
136
+ if __name__ == "__main__":
137
+ service = IngestionService()
138
+ # 24-hour broad update
139
+ arts = service.fetch_from_pubmed("postpartum depression OR maternal mental health", 100)
140
+ if arts:
141
+ service.store_articles(arts)
142
+ service.build_tfidf_model()