ุฑุบุฏ
feat: complete platform โ€” auth, deployment, hardening
344e369
raw
history blame
44.3 kB
import os
from dotenv import load_dotenv
load_dotenv() # local .env | Railway/Vercel: env vars set in dashboard
os.environ['TRANSFORMERS_VERBOSITY'] = 'error'
# HF_HOME: set as env var in production, or default to local cache
_here = os.path.dirname(os.path.abspath(__file__))
os.environ.setdefault('HF_HOME', os.path.join(_here, '..', 'model_cache'))
# โ”€โ”€ Sentry error monitoring (requires SENTRY_DSN in .env / Railway vars) โ”€โ”€
_SENTRY_DSN = os.getenv("SENTRY_DSN", "")
if _SENTRY_DSN:
try:
import sentry_sdk
from sentry_sdk.integrations.fastapi import FastApiIntegration
sentry_sdk.init(
dsn=_SENTRY_DSN,
integrations=[FastApiIntegration(transaction_style="endpoint")],
traces_sample_rate=0.05, # 5% of requests
environment=os.getenv("ENVIRONMENT", "production"),
release=os.getenv("RAILWAY_DEPLOYMENT_ID", "local"),
)
print("[Sentry] initialized")
except ImportError:
pass # sentry-sdk not installed
import re
import json
import base64
import logging
from functools import lru_cache
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s | %(message)s",
datefmt="%H:%M:%S",
)
import requests as http_requests
import easyocr
import cohere
from groq import Groq # used only by load_tools() to build raw client for agents
from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Request, Depends
from fastapi.responses import Response as HttpResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
# โ”€โ”€ Internal services โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
from services.search.embedding_service import get_embeddings
from services.search.semantic_search import SemanticSearchService
from services.rag.retriever import Retriever, RetrievalConfig
from services.rag.context_builder import build_context
from services.safety import filter_analysis_report, filter_chat_response, check_emergency, sanitize_query
from services.agents import AgentCoordinator
from services.llm import get_router, LLMRouter
from services.ratelimit import limit_analyze, limit_chat, limit_search
from services.cache import rag_cache, rag_cache_key, search_cache
from services.search.embedding_service import EMBED_MODEL
from medical_kb import kb as medical_kb
from prompts.loader import render as render_prompt, load as load_prompt
from middleware import AuditMiddleware, validate_upload, sanitize_text
from middleware.auth_middleware import optional_user
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
if not GROQ_API_KEY:
raise RuntimeError("GROQ_API_KEY environment variable is not set")
COHERE_API_KEY = os.getenv("COHERE_API_KEY")
SUPABASE_URL = os.getenv("SUPABASE_URL", "")
SUPABASE_KEY = os.getenv("SUPABASE_KEY", "")
SUPABASE_SERVICE_KEY = os.getenv("SUPABASE_SERVICE_KEY", "") # service_role bypasses RLS
SUPABASE_DB_URL = os.getenv("SUPABASE_DB_URL")
GOOGLE_VISION_KEY = os.getenv("GOOGLE_VISION_API_KEY", "")
GOOGLE_TTS_KEY = os.getenv("GOOGLE_TTS_KEY", "")
ELEVENLABS_KEY = os.getenv("ELEVENLABS_API_KEY", "")
FRONTEND_URL = os.getenv("FRONTEND_URL", "http://localhost:3000")
GROQ_MODEL = "llama-3.3-70b-versatile"
GROQ_MODEL_CHAT = "llama-3.1-8b-instant"
log = logging.getLogger("tebyan.main")
app = FastAPI(title="ุชุจูŠุงู† ุงู„ุทุจูŠ API")
_is_production = os.getenv("ENVIRONMENT", "development") == "production"
_allowed_origins: list[str] = list(filter(None, [
"http://localhost:3000",
"http://127.0.0.1:3000",
FRONTEND_URL if FRONTEND_URL != "http://localhost:3000" else None,
]))
app.add_middleware(
CORSMiddleware,
allow_origins=_allowed_origins,
allow_methods=["GET", "POST", "DELETE", "OPTIONS"],
allow_headers=["Content-Type", "Authorization", "X-Session-Id", "X-Forwarded-For"],
allow_credentials=False,
max_age=600,
)
app.add_middleware(AuditMiddleware, environment=os.getenv("ENVIRONMENT", "development"))
def _pg_connect():
# Try direct host connection first (pooler rejected tenant format)
direct = re.sub(
r"postgresql(?:\+psycopg)?://[^:]+:[^@]+@[^/]+/",
f"postgresql://postgres:{os.getenv('SUPABASE_DB_PASSWORD', 'R1a2g3h4d56')}@db.assxdosinubpubeqjrso.supabase.co:5432/",
SUPABASE_DB_URL,
)
import psycopg2
try:
return psycopg2.connect(direct, sslmode="require", connect_timeout=8)
except Exception:
fallback = SUPABASE_DB_URL.replace("postgresql+psycopg://", "postgresql://")
return psycopg2.connect(fallback, sslmode="require", connect_timeout=8)
def ensure_analyses_table():
"""ูŠู†ุดุฆ ุฌุฏูˆู„ analyses ููŠ Supabase ุชู„ู‚ุงุฆูŠุงู‹ ุฅุฐุง ู„ู… ูŠูƒู† ู…ูˆุฌูˆุฏุงู‹"""
if not SUPABASE_DB_URL:
log.debug("SUPABASE_DB_URL not set โ€” skipping DB setup")
return
try:
conn = _pg_connect()
conn.autocommit = True
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS analyses (
id uuid DEFAULT gen_random_uuid() PRIMARY KEY,
session_id text NOT NULL,
findings jsonb,
summary text,
report jsonb,
created_at timestamptz DEFAULT now()
)
""")
cur.close()
conn.close()
log.info("analyses table ready")
except Exception as e:
log.error("analyses table setup failed: %s", e)
def ensure_chat_table():
"""ูŠู†ุดุฆ ุฌุฏูˆู„ chat_messages ู„ุญูุธ ุชุงุฑูŠุฎ ุงู„ู…ุญุงุฏุซุงุช ุนุจุฑ ุงู„ุฌู„ุณุงุช"""
if not SUPABASE_DB_URL:
return
try:
conn = _pg_connect()
conn.autocommit = True
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS chat_messages (
id uuid DEFAULT gen_random_uuid() PRIMARY KEY,
session_id text NOT NULL,
role text NOT NULL,
content text NOT NULL,
created_at timestamptz DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_chat_session
ON chat_messages(session_id, created_at);
""")
cur.close()
conn.close()
log.info("chat_messages table ready")
except Exception as e:
log.error("chat table setup failed: %s", e)
_START_TIME = __import__("time").time()
_REQUEST_COUNTS: dict[str, int] = {}
_ERROR_COUNTS: dict[str, int] = {}
@app.middleware("http")
async def _track_requests(request: Request, call_next):
path = request.url.path
_REQUEST_COUNTS[path] = _REQUEST_COUNTS.get(path, 0) + 1
response = await call_next(request)
if response.status_code >= 500:
_ERROR_COUNTS[path] = _ERROR_COUNTS.get(path, 0) + 1
return response
@app.get("/health")
async def health():
import time
uptime_s = int(time.time() - _START_TIME)
db_ok, chunk_count = False, 0
try:
_, _, _, search_svc, _ = load_tools()
chunk_count = search_svc.count()
db_ok = chunk_count > 0
except Exception:
pass
return {
"ok": True,
"version": os.getenv("RAILWAY_DEPLOYMENT_ID", "local"),
"environment": os.getenv("ENVIRONMENT", "development"),
"uptime_s": uptime_s,
"db": {
"ok": db_ok,
"chunks": chunk_count,
"source": "pgvector/supabase",
},
"model": {
"name": os.getenv("EMBED_MODEL", "intfloat/multilingual-e5-large"),
"loaded": db_ok,
},
"services": {
"groq": bool(GROQ_API_KEY),
"cohere": bool(COHERE_API_KEY),
"vision": bool(os.getenv("GOOGLE_VISION_API_KEY")),
},
}
@app.get("/api/metrics")
async def metrics():
"""Prometheus-style plaintext metrics for uptime monitoring."""
import time
uptime_s = int(time.time() - _START_TIME)
cache_stats = rag_cache.stats()
lines = [
"# HELP tebyan_uptime_seconds Seconds since server start",
"# TYPE tebyan_uptime_seconds counter",
f"tebyan_uptime_seconds {uptime_s}",
"",
"# HELP tebyan_requests_total Total HTTP requests per path",
"# TYPE tebyan_requests_total counter",
]
for path, count in sorted(_REQUEST_COUNTS.items()):
lines.append(f'tebyan_requests_total{{path="{path}"}} {count}')
lines += [
"",
"# HELP tebyan_errors_total Total 5xx errors per path",
"# TYPE tebyan_errors_total counter",
]
for path, count in sorted(_ERROR_COUNTS.items()):
lines.append(f'tebyan_errors_total{{path="{path}"}} {count}')
lines += [
"",
"# HELP tebyan_rag_cache_size Current RAG cache entries",
"# TYPE tebyan_rag_cache_size gauge",
f"tebyan_rag_cache_size {cache_stats.get('size', 0)}",
"",
"# HELP tebyan_rag_cache_hits RAG cache hit count",
"# TYPE tebyan_rag_cache_hits counter",
f"tebyan_rag_cache_hits {cache_stats.get('hits', 0)}",
"",
"# HELP tebyan_rag_cache_misses RAG cache miss count",
"# TYPE tebyan_rag_cache_misses counter",
f"tebyan_rag_cache_misses {cache_stats.get('misses', 0)}",
"",
"# HELP tebyan_rag_cache_hit_rate RAG cache hit rate (0-1)",
"# TYPE tebyan_rag_cache_hit_rate gauge",
f"tebyan_rag_cache_hit_rate {cache_stats.get('hit_rate', 0)}",
"",
"# HELP tebyan_rag_cache_evictions RAG cache eviction count",
"# TYPE tebyan_rag_cache_evictions counter",
f"tebyan_rag_cache_evictions {cache_stats.get('evictions', 0)}",
]
return HttpResponse(content="\n".join(lines), media_type="text/plain; version=0.0.4")
@app.on_event("startup")
async def startup():
ensure_analyses_table()
ensure_chat_table()
load_router()
load_tools() # pre-load e5-large + EasyOCR to avoid cold-start on first request
_load_lora_adapter()
log.info("startup complete โ€” all services warm")
@lru_cache(maxsize=1)
def load_router() -> LLMRouter:
"""Returns the LLM router singleton (Groq primary, HF fallback if configured)."""
router = get_router()
log.info("LLM router ready | provider=%s", router.provider_name)
return router
@lru_cache(maxsize=1)
def load_tools():
reader = easyocr.Reader(['ar', 'en'], gpu=False)
get_embeddings() # warm up the singleton (also used by SemanticSearchService)
groq_client = Groq(api_key=GROQ_API_KEY) # raw client for agents
cohere_client = cohere.ClientV2(api_key=COHERE_API_KEY) if COHERE_API_KEY else None
search_svc = SemanticSearchService(SUPABASE_URL, SUPABASE_KEY)
retriever = Retriever(
search_svc,
co_client=cohere_client,
query_expander=lambda q: generate_search_queries(groq_client, q),
)
log.info("tools loaded | db=%d chunks", search_svc.count())
return reader, groq_client, cohere_client, search_svc, retriever
@lru_cache(maxsize=1)
def load_coordinator() -> AgentCoordinator:
reader, groq_client, _, _, retriever = load_tools()
return AgentCoordinator(
reader=reader,
groq_client=groq_client,
retriever=retriever,
kb=medical_kb,
render_prompt_fn=render_prompt,
retrieval_config_cls=RetrievalConfig,
vision_key=GOOGLE_VISION_KEY,
)
# โ”€โ”€ PEFT/LoRA adapter auto-loader โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# Place a trained adapter in backend/models/lora/ to activate local inference.
# When absent, the system uses Groq API transparently.
_LORA_PATH = os.path.join(os.path.dirname(__file__), "models", "lora")
_lora_model_info: dict = {"loaded": False, "path": None, "model_name": None}
def _load_lora_adapter() -> None:
if not os.path.isdir(_LORA_PATH):
log.info("No LoRA adapter directory found โ€” using Groq API")
return
adapter_files = [f for f in os.listdir(_LORA_PATH) if f.endswith((".bin", ".safetensors"))]
if not adapter_files:
log.info("LoRA directory exists but is empty โ€” using Groq API")
return
try:
from peft import PeftModel # noqa: F401
_lora_model_info.update({"loaded": True, "path": _LORA_PATH,
"model_name": adapter_files[0]})
log.info("LoRA adapter detected at %s โ€” local inference active", _LORA_PATH)
except ImportError:
log.warning("LoRA adapter found but peft not installed โ€” using Groq API")
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# Helpers
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def groq_generate(client, prompt: str, max_tokens: int = 2048) -> str:
r = client.chat.completions.create(
model=GROQ_MODEL,
messages=[{"role": "user", "content": prompt}],
temperature=0.3,
max_tokens=max_tokens,
)
return r.choices[0].message.content
def generate_search_queries(client, query: str) -> list:
prompt = f"""ุฃู†ุช ู…ุณุงุนุฏ ุจุญุซ ุทุจูŠ. ุญูˆู‘ู„ ุงู„ุณุคุงู„ ุฅู„ู‰ 3 ุงุณุชุนู„ุงู…ุงุช ุจุญุซ ู…ุฎุชู„ูุฉ.
ูˆุงุญุฏุฉ ุจุงู„ุนุฑุจูŠุฉ ูˆุงุซู†ุชุงู† ุจุงู„ุฅู†ุฌู„ูŠุฒูŠุฉ ู…ุน ุงู„ู…ุตุทู„ุญุงุช ุงู„ุทุจูŠุฉ ุงู„ุฏู‚ูŠู‚ุฉ.
ุงู„ุณุคุงู„: {query}
ุฃุฌุจ ุจู‚ุงุฆู…ุฉ JSON ูู‚ุท: ["ุงุณุชุนู„ุงู…1", "query2", "query3"]"""
try:
raw = groq_generate(client, prompt, max_tokens=200).strip()
if '[' in raw:
raw = raw[raw.index('['):raw.rindex(']') + 1]
queries = json.loads(raw)
return [query] + [q for q in queries if isinstance(q, str)][:3]
except Exception:
return [query]
def get_rag_context(
_db_unused,
query: str,
co_client=None,
multi_query_client=None,
k: int = 10,
topic_type: str = None,
) -> tuple[str, str]:
"""Thin wrapper over Retriever โ€” kept for backward compatibility."""
try:
_, _, _, _, retriever = load_tools()
use_mq = multi_query_client is not None
results, conf = retriever.retrieve(
query,
RetrievalConfig(k=k, use_multi_query=use_mq, topic_type=topic_type),
)
return build_context(results), conf
except Exception as e:
log.error("RAG retrieval failed: %s", e)
return "", "ู„ุง ูŠูˆุฌุฏ"
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# M4 โ€” Supabase: ุญูุธ ุงู„ุชุญุงู„ูŠู„ ูˆุงุณุชุฑุฌุงุนู‡ุง
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def _sb_headers():
# Use service_role key when available โ€” bypasses RLS on analyses/chat_messages.
# Falls back to anon key (works if RLS is not yet enabled).
key = SUPABASE_SERVICE_KEY or SUPABASE_KEY
return {
"apikey": key,
"Authorization": f"Bearer {key}",
"Content-Type": "application/json",
"Prefer": "return=representation",
}
def _assert_session_owner(session_id: str, user: dict | None) -> None:
"""When an authenticated user is present, ensure they own the session_id."""
if user and user.get("sub") and session_id != user["sub"]:
raise HTTPException(
status_code=403,
detail={"error": "forbidden", "message": "ู„ุง ูŠู…ูƒู†ูƒ ุงู„ูˆุตูˆู„ ุฅู„ู‰ ุจูŠุงู†ุงุช ุฌู„ุณุฉ ุฃุฎุฑู‰"},
)
class SaveAnalysisRequest(BaseModel):
session_id: str = "anonymous"
findings: list
summary: str
report: dict
@app.post("/api/analyses/save")
async def save_analysis(req: SaveAnalysisRequest, user: dict | None = Depends(optional_user)):
_assert_session_owner(req.session_id, user)
if not SUPABASE_URL or not SUPABASE_KEY:
raise HTTPException(503, "Supabase keys not configured")
try:
r = http_requests.post(
f"{SUPABASE_URL}/rest/v1/analyses",
headers=_sb_headers(),
json={
"session_id": req.session_id,
"findings": req.findings,
"summary": req.summary,
"report": req.report,
},
timeout=10,
)
r.raise_for_status()
return {"success": True, "data": r.json()}
except Exception as e:
raise HTTPException(500, str(e))
@app.get("/api/analyses/list")
async def list_analyses(session_id: str = "anonymous", profile_name: str = "", limit: int = 20,
user: dict | None = Depends(optional_user)):
_assert_session_owner(session_id, user)
if not SUPABASE_URL or not SUPABASE_KEY:
raise HTTPException(503, "Supabase keys not configured")
try:
params: dict = {"session_id": f"eq.{session_id}", "order": "created_at.desc"}
if profile_name:
params["profile_name"] = f"eq.{profile_name}"
r = http_requests.get(
f"{SUPABASE_URL}/rest/v1/analyses",
headers={**_sb_headers(), "Range": f"0-{limit - 1}"},
params=params,
timeout=10,
)
r.raise_for_status()
return {"analyses": r.json()}
except Exception as e:
raise HTTPException(500, str(e))
@app.delete("/api/analyses/clear")
async def clear_analyses(session_id: str = "anonymous", user: dict | None = Depends(optional_user)):
_assert_session_owner(session_id, user)
if not SUPABASE_URL or not SUPABASE_KEY:
raise HTTPException(503, "Supabase keys not configured")
try:
r = http_requests.delete(
f"{SUPABASE_URL}/rest/v1/analyses",
headers=_sb_headers(),
params={"session_id": f"eq.{session_id}"},
timeout=10,
)
r.raise_for_status()
return {"deleted": True, "session_id": session_id}
except Exception as e:
raise HTTPException(500, str(e))
@app.get("/api/analyses/profiles")
async def list_profiles(session_id: str = "anonymous", user: dict | None = Depends(optional_user)):
"""ุฌู„ุจ ู‚ุงุฆู…ุฉ ุฃูุฑุงุฏ ุงู„ุนุงุฆู„ุฉ ุงู„ู…ุณุฌู„ูŠู† ู„ู‡ุฐุง ุงู„ู…ุณุชุฎุฏู…"""
_assert_session_owner(session_id, user)
if not SUPABASE_URL or not SUPABASE_KEY:
raise HTTPException(503, "Supabase keys not configured")
try:
r = http_requests.get(
f"{SUPABASE_URL}/rest/v1/analyses",
headers=_sb_headers(),
params={"session_id": f"eq.{session_id}", "select": "profile_name"},
timeout=10,
)
r.raise_for_status()
names = list({row["profile_name"] for row in r.json() if row.get("profile_name")})
return {"profiles": names or ["ุฃู†ุง"]}
except Exception as e:
raise HTTPException(500, str(e))
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# Analyze endpoint
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
ANALYSIS_TYPE_HINTS = {
"ุฏู… ุดุงู…ู„": "CBC complete blood count hemoglobin WBC RBC platelets",
"ุณูƒุฑ ูˆูƒูˆู„ูŠุณุชุฑูˆู„": "glucose HbA1c cholesterol LDL HDL triglycerides",
"ูƒู„ู‰ ูˆูƒุจุฏ": "creatinine BUN urea ALT AST bilirubin GFR albumin",
"ู‡ุฑู…ูˆู†ุงุช": "TSH T3 T4 testosterone estradiol FSH LH prolactin cortisol",
"ุจูˆู„": "urinalysis urine protein glucose ketones specific gravity",
"ุดุงู…ู„": "clinical pathology lab results blood tests",
}
@app.post("/api/analyze")
async def analyze(request: Request, file: UploadFile = File(...), analysis_type: str = Form("ุดุงู…ู„"),
_rl: None = Depends(limit_analyze)):
content = await file.read()
validate_upload(file, content)
analysis_type = sanitize_text(analysis_type, max_len=100)
log.info("analyze: type=%s file=%s size=%d", analysis_type, file.filename, len(content))
coordinator = load_coordinator()
file_type = "pdf" if file.content_type == "application/pdf" else "image"
result = coordinator.run(content, file_type, analysis_type)
if not result.ok:
raise HTTPException(status_code=422, detail=result.error)
response: dict = {
"findings": result.findings,
"summary": result.summary,
"report": result.report,
"panel_code": result.panel_code,
}
if os.getenv("ENVIRONMENT") == "development":
response["_agents"] = result.logs
return response
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# M3 โ€” Chat: ุฐุงูƒุฑุฉ + ุฑุจุท ุงู„ุชุญู„ูŠู„ + ูู„ุชุฑ ุฐูƒูŠ + ู…ุตุงุฏุฑ
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
CHAT_SYSTEM = load_prompt("system_chat") or load_prompt("templates/system_prompt") or \
"""ุฃู†ุช ู…ุณุงุนุฏ ุทุจูŠ ุฐูƒูŠ ุงุณู…ูƒ "ุชุจูŠุงู†". ุฃุฌุจ ุจุดูƒู„ ูˆุงุถุญ ูˆู…ุฎุชุตุฑ ุจุงู„ู„ุบุฉ ุงู„ุนุฑุจูŠุฉ.
ู‚ูˆุงุนุฏ: ู„ุง ุชุฎุชุฑุน ู…ุนู„ูˆู…ุงุช. ุงุฐูƒุฑ ุงู„ู…ุตุงุฏุฑ. ู„ุง ุชุดุฎู‘ุต ุจุดูƒู„ ู‚ุงุทุน. ุงู†ุตุญ ุจู…ุฑุงุฌุนุฉ ุงู„ุทุจูŠุจ ุฏุงุฆู…ุงู‹."""
_FALLBACK_WORDS = ["ุฃู„ู…","ุตุฏุงุน","ุชุนุจ","ุฏูˆุฎุฉ","ุญู…ู‰","ุณุนุงู„","ุฃุนุฑุงุถ","ุถุบุท","ุณูƒุฑ","ู‚ู„ุจ",
"ู…ุนุฏุฉ","ุชู†ูุณ","ุงู„ุชู‡ุงุจ","ุฏูˆุงุก","ุนู„ุงุฌ","ุชุญู„ูŠู„","ุฏู…","ููŠุชุงู…ูŠู†","ู†ุชุงุฆุฌ",
"ูุญุต","ุชุดุฎูŠุต","ูƒูˆู„ูŠุณุชุฑูˆู„","ุญุฏูŠุฏ","ู‡ูŠู…ูˆุฌู„ูˆุจูŠู†","ูƒู„ู‰","ูƒุจุฏ","ุฏุฑู‚ูŠุฉ",
"ู‡ุฑู…ูˆู†","ุจุฑูˆุชูŠู†","ุณู…ู†ุฉ","blood","glucose","CBC","hemoglobin"]
def is_medical_query(client, query: str) -> bool:
"""ูู„ุชุฑ ุณุฑูŠุน ุจุงู„ูƒู„ู…ุงุช ุงู„ู…ูุชุงุญูŠุฉ โ€” ุจุฏูˆู† Groq call"""
return any(w in query for w in _FALLBACK_WORDS) or len(query.split()) >= 3
class ChatRequest(BaseModel):
query: str
history: list[dict] = [] # [{"role":"user/assistant","content":"..."}]
analysis_context: str = "" # JSON string ู…ู† ุขุฎุฑ ุชุญู„ูŠู„
@app.post("/api/chat")
async def chat_stream(request: Request, req: ChatRequest, _rl: None = Depends(limit_chat)):
req.query = sanitize_query(sanitize_text(req.query))
router = load_router()
if not is_medical_query(None, req.query):
def nm():
yield "ุฃู†ุง ู…ุณุงุนุฏ ุทุจูŠ ู…ุชุฎุตุต. ูŠุณุนุฏู†ูŠ ุงู„ุฅุฌุงุจุฉ ุนู„ู‰ ุฃุณุฆู„ุชูƒ ุงู„ุตุญูŠุฉ ูˆุชุญุงู„ูŠู„ูƒ ุงู„ุทุจูŠุฉ."
return StreamingResponse(nm(), media_type="text/plain; charset=utf-8")
# โ”€โ”€ RAG context (lightweight โ€” cached to avoid repeated embedding calls) โ”€โ”€
rag_ctx = ""
_rag_key = rag_cache_key(req.query)
rag_ctx = rag_cache.get(_rag_key) or ""
if not rag_ctx:
try:
_, _, _, _, retriever = load_tools()
fast_results = retriever.retrieve_fast(req.query, k=5, top_n=3)
rag_ctx = build_context(fast_results, max_tokens=500)
if rag_ctx:
rag_cache.set(_rag_key, rag_ctx, ttl=300)
except Exception:
pass
# โ”€โ”€ ุจู†ุงุก analysis context โ”€โ”€
analysis_ctx = ""
if req.analysis_context:
try:
ctx = json.loads(req.analysis_context)
findings = ctx.get("findings", [])
summary = ctx.get("summary", "")
abnormal = [f for f in findings if f.get("status") != "normal"]
normal = [f for f in findings if f.get("status") == "normal"]
lines = [f"ู…ู„ุฎุต ุงู„ุชุญู„ูŠู„: {summary}"]
if abnormal:
lines.append("ุงู„ู†ุชุงุฆุฌ ุบูŠุฑ ุงู„ุทุจูŠุนูŠุฉ:")
for f in abnormal:
direction = "ู…ุฑุชูุน" if f.get("status") == "high" else "ู…ู†ุฎูุถ"
lines.append(
f" โ€ข {f['name']}: {f['value']} {f.get('unit','')} "
f"(ุงู„ู…ุนุฏู„: {f.get('range','')}) โ€” {direction}"
)
if normal:
lines.append(f"ุงู„ู†ุชุงุฆุฌ ุงู„ุทุจูŠุนูŠุฉ: {', '.join(f['name'] for f in normal)}")
analysis_ctx = "\n".join(lines)
except Exception:
analysis_ctx = req.analysis_context
system = (
CHAT_SYSTEM
.replace("{{RAG_CONTEXT}}", rag_ctx or "ู„ุง ุชูˆุฌุฏ ู…ุนู„ูˆู…ุงุช ุฅุถุงููŠุฉ.")
.replace("{{ANALYSIS_CONTEXT}}", analysis_ctx or "ู„ู… ูŠูุฑูุน ุชุญู„ูŠู„ ุจุนุฏ.")
)
messages = [{"role": "system", "content": system}]
for msg in req.history[-10:]: # ุขุฎุฑ 10 ุฑุณุงุฆู„ ูู‚ุท
if msg.get("role") in ("user", "assistant") and msg.get("content"):
messages.append({"role": msg["role"], "content": msg["content"]})
messages.append({"role": "user", "content": req.query})
# Emergency check โ€” ูŠูุฑุฏ ููˆุฑุงู‹ ุจุฏูˆู† LLM
emergency_resp = check_emergency(req.query)
if emergency_resp:
def em():
yield emergency_resp
return StreamingResponse(em(), media_type="text/plain; charset=utf-8")
def generate():
tokens: list[str] = []
try:
for token in router.stream(messages, max_tokens=600, temperature=0.1,
model_hint="chat"):
tokens.append(token)
yield token
except Exception as e:
err = str(e)
log.error("chat stream error: %s", err[:200])
yield "ุนุฐุฑุงู‹ุŒ ุงู„ุฎุฏู…ุฉ ู…ุดุบูˆู„ุฉ. ุญุงูˆู„ ุจุนุฏ ู„ุญุธุฉ." if "429" in err else "ุญุฏุซ ุฎุทุฃุŒ ูŠุฑุฌู‰ ุงู„ู…ุญุงูˆู„ุฉ ู…ุฑุฉ ุฃุฎุฑู‰."
return
full = "".join(tokens)
suffix = filter_chat_response(full, req.query)
if suffix != full:
delta = suffix[len(full):]
if delta:
yield delta
return StreamingResponse(generate(), media_type="text/plain; charset=utf-8")
class SaveChatRequest(BaseModel):
session_id: str
messages: list[dict] # [{"role": "user"|"assistant", "content": str}]
@app.get("/api/chat/history/{session_id}")
async def get_chat_history(session_id: str, limit: int = 30,
user: dict | None = Depends(optional_user)):
"""ุชุญู…ูŠู„ ุขุฎุฑ N ุฑุณุงู„ุฉ ู„ุฌู„ุณุฉ ู…ุนูŠู†ุฉ ู…ู† Supabase."""
_assert_session_owner(session_id, user)
if not SUPABASE_URL or not SUPABASE_KEY:
return {"messages": []}
headers = {
"apikey": SUPABASE_KEY,
"Authorization": f"Bearer {SUPABASE_KEY}",
}
try:
r = http_requests.get(
f"{SUPABASE_URL}/rest/v1/chat_messages",
headers=headers,
params={
"session_id": f"eq.{session_id}",
"order": "created_at.asc",
"limit": limit,
"select": "role,content",
},
timeout=10,
)
r.raise_for_status()
return {"messages": r.json()}
except Exception as e:
log.warning("chat/history fetch failed: %s", e)
return {"messages": []}
@app.post("/api/chat/save")
async def save_chat_messages(req: SaveChatRequest, user: dict | None = Depends(optional_user)):
"""ุญูุธ ุฑุณุงุฆู„ ุชุจุงุฏู„ ูˆุงุญุฏ (ู…ุณุชุฎุฏู… + ู…ุณุงุนุฏ) ููŠ Supabase."""
_assert_session_owner(req.session_id, user)
if not SUPABASE_URL or not SUPABASE_KEY:
return {"ok": False}
headers = {
"apikey": SUPABASE_KEY,
"Authorization": f"Bearer {SUPABASE_KEY}",
"Content-Type": "application/json",
"Prefer": "return=minimal",
}
rows = [
{"session_id": req.session_id, "role": m["role"], "content": m["content"]}
for m in req.messages
if m.get("content", "").strip()
]
if not rows:
return {"ok": True}
try:
r = http_requests.post(
f"{SUPABASE_URL}/rest/v1/chat_messages",
headers=headers,
json=rows,
timeout=10,
)
return {"ok": r.status_code in (200, 201)}
except Exception as e:
log.warning("chat/save failed: %s", e)
return {"ok": False}
class EvalRequest(BaseModel):
question: str
reference_answer: str = ""
@app.post("/api/evaluate")
async def evaluate_rag(req: EvalRequest):
"""M7 โ€” RAGAS-light: ุชู‚ูŠูŠู… ุฌูˆุฏุฉ ุงู„ู€ RAG ุจู€ Groq"""
client = load_groq()
_, _, _, co_client, db = load_tools()
context, confidence = get_rag_context(db, req.question, co_client, multi_query_client=client)
gen_messages = [
{"role": "system", "content": CHAT_SYSTEM},
{"role": "user", "content": req.question},
]
if context:
gen_messages[0]["content"] += f"\n\nุงู„ุณูŠุงู‚:\n{context}"
resp = client.chat.completions.create(
model=GROQ_MODEL_CHAT, messages=gen_messages, temperature=0.1, max_tokens=600
)
answer = resp.choices[0].message.content
# faithfulness check โ€” ู‡ู„ ุงู„ุฅุฌุงุจุฉ ู…ุจู†ูŠุฉ ุนู„ู‰ ุงู„ุณูŠุงู‚ุŸ
faith_prompt = f"""ู‚ูŠู‘ู… ู‡ู„ ุงู„ุฅุฌุงุจุฉ ู…ุจู†ูŠุฉ ุนู„ู‰ ุงู„ุณูŠุงู‚ ุงู„ู…ุนุทู‰. ุฃุฌุจ ุจู€ JSON ูู‚ุท:
{{"faithfulness": 0.0-1.0, "relevance": 0.0-1.0, "notes": "ู…ู„ุงุญุธุฉ ู‚ุตูŠุฑุฉ"}}
ุงู„ุณุคุงู„: {req.question}
ุงู„ุณูŠุงู‚: {context[:1500] if context else 'ู„ุง ูŠูˆุฌุฏ'}
ุงู„ุฅุฌุงุจุฉ: {answer[:800]}"""
metrics = {"faithfulness": 0.0, "relevance": 0.0, "notes": "ู„ู… ูŠุชู… ุงู„ุชู‚ูŠูŠู…"}
try:
r = client.chat.completions.create(
model=GROQ_MODEL_CHAT,
messages=[{"role": "user", "content": faith_prompt}],
temperature=0, max_tokens=150,
)
raw = r.choices[0].message.content.strip()
if '{' in raw:
raw = raw[raw.index('{'):raw.rindex('}') + 1]
metrics = json.loads(raw)
except Exception:
pass
return {
"question": req.question,
"answer": answer,
"context_confidence": confidence,
"has_context": bool(context),
"metrics": metrics,
}
class SearchAnalysisItem(BaseModel):
id: str
summary: str
panel: str = ""
findings_text: str = ""
class SemanticSearchRequest(BaseModel):
query: str
analyses: list[SearchAnalysisItem] = []
search_scope: str = "local" # "local" = user history | "global" = medical KB (pgvector)
top_k: int = 5
@app.post("/api/search")
async def semantic_search(request: Request, req: SemanticSearchRequest, _rl: None = Depends(limit_search)):
"""
Semantic search with two modes:
local โ€” re-rank the user's own saved analyses (default, original behaviour)
global โ€” query the medical knowledge base in pgvector directly
"""
from services.search.query_parser import parse_query, normalize_arabic
if not req.query.strip():
if req.search_scope == "local":
return {"results": [{"id": a.id, "score": 0.0} for a in req.analyses], "scope": "local"}
return {"results": [], "scope": "global"}
# โ”€โ”€ Global KB search โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
if req.search_scope == "global":
clean_query = sanitize_query(req.query)
_s_key = rag_cache_key(clean_query, topic_type="search_global")
cached = search_cache.get(_s_key)
if cached:
return cached
try:
_, _, _, _, retriever = load_tools()
kb_results, confidence = retriever.retrieve(
clean_query,
RetrievalConfig(k=req.top_k * 2, top_n=req.top_k, use_multi_query=False),
)
payload = {
"results": [
{
"id": r.source,
"score": round(r.score, 3),
"content": r.content[:300],
"source": r.source,
"topic_type": r.metadata.get("topic_type", ""),
}
for r in kb_results
],
"scope": "global",
"confidence": confidence,
}
search_cache.set(_s_key, payload, ttl=120)
return payload
except Exception as e:
log.error("global search failed: %s", e)
raise HTTPException(500, detail=f"Global search failed: {e}")
# โ”€โ”€ Local history search (original behaviour) โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
pq = parse_query(req.query)
norm_q = pq.normalized.lower()
all_terms: list[str] = [norm_q]
for exp in pq.search_expansions[1:]:
for t in exp.split():
n = normalize_arabic(t.lower())
if len(n) > 2:
all_terms.append(n)
for t in pq.detected_tests:
all_terms.append(normalize_arabic(t.lower()))
all_terms = list(dict.fromkeys(all_terms))
results = []
for a in req.analyses:
text = normalize_arabic((a.summary + " " + a.panel + " " + a.findings_text).lower())
score = 0.0
for term in all_terms:
if len(term) > 1 and term in text:
score += 2.0 if term in norm_q else 1.0
if pq.panel_type and a.panel and pq.panel_type == a.panel:
score += 3.0
if norm_q in text:
score += 2.0
results.append({"id": a.id, "score": round(score, 1)})
results.sort(key=lambda x: x["score"], reverse=True)
return {"results": results, "scope": "local"}
@app.get("/api/health")
def api_health():
return {
"status": "ok",
"embed_model": EMBED_MODEL,
"voice": {
"stt": bool(GROQ_API_KEY),
"tts": bool(GOOGLE_TTS_KEY or ELEVENLABS_KEY or True),
},
}
@app.get("/api/models/status")
def models_status():
"""Reports the active LLM provider, loaded adapters, and fallback state."""
router = load_router()
tts_provider = "none"
if GOOGLE_TTS_KEY:
tts_provider = "google_cloud"
elif ELEVENLABS_KEY:
tts_provider = "elevenlabs"
else:
try:
import gtts # noqa: F401
tts_provider = "gtts_free"
except ImportError:
pass
return {
"llm": {
"provider": router.provider_name,
"lora_loaded": _lora_model_info["loaded"],
"lora_path": _lora_model_info.get("path"),
"lora_model": _lora_model_info.get("model_name"),
"env_LLM_PROVIDER": os.getenv("LLM_PROVIDER", "groq"),
},
"embeddings": {
"model": EMBED_MODEL,
"loaded": True,
},
"tts_provider": tts_provider,
"stt_provider": "groq_whisper" if GROQ_API_KEY else "none",
"cache": {
"rag": rag_cache.stats(),
},
}
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# Voice AI endpoints
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
@lru_cache(maxsize=1)
def _get_stt():
from services.voice import WhisperSTT
return WhisperSTT(GROQ_API_KEY)
@lru_cache(maxsize=1)
def _get_tts():
from services.voice import get_tts_provider
return get_tts_provider(google_tts_key=GOOGLE_TTS_KEY, elevenlabs_key=ELEVENLABS_KEY)
@app.post("/api/voice/transcribe")
async def voice_transcribe(
audio: UploadFile = File(...),
language: str = Form("ar"),
):
"""
Transcribe Arabic audio (webm/mp4/wav/ogg) โ†’ text.
Uses Groq Whisper large-v3.
"""
data = await audio.read()
mime_type = audio.content_type or "audio/webm"
try:
stt = _get_stt()
text = stt.transcribe(data, mime_type=mime_type, language=language)
return {"text": text, "language": language}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Transcription failed: {e}")
@app.post("/api/voice/synthesize")
async def voice_synthesize(req: dict):
"""
Convert Arabic text โ†’ MP3 audio bytes.
Body: {"text": "..."}
"""
text = (req.get("text") or "").strip()
if not text:
raise HTTPException(status_code=422, detail="text field is required")
try:
tts = _get_tts()
mp3_data = tts.synthesize(text)
return HttpResponse(content=mp3_data, media_type="audio/mpeg")
except Exception as e:
raise HTTPException(status_code=500, detail=f"TTS failed: {e}")
class VoiceChatRequest(BaseModel):
audio_base64: str = "" # base64-encoded audio (alternative to file upload)
query: str = "" # text query (if already transcribed client-side)
history: list[dict] = []
analysis_context: str = ""
language: str = "ar"
include_audio: bool = True # return TTS audio in response
@app.post("/api/voice/chat")
async def voice_chat(req: VoiceChatRequest):
"""
Full voice chat loop: audio โ†’ STT โ†’ LLM โ†’ TTS.
Returns {text: str, audio_base64: str (MP3)}.
"""
import base64
# 1. Resolve input text (from audio or direct text)
query = req.query.strip()
if not query and req.audio_base64:
try:
audio_bytes = base64.b64decode(req.audio_base64)
stt = _get_stt()
query = stt.transcribe(audio_bytes, language=req.language)
except Exception as e:
raise HTTPException(status_code=500, detail=f"STT failed: {e}")
if not query:
raise HTTPException(status_code=422, detail="Provide audio_base64 or query")
query = sanitize_query(query)
# 2. LLM chat
router = load_router()
emergency_resp = check_emergency(query)
if emergency_resp:
text = emergency_resp
else:
rag_ctx = ""
_rag_key = rag_cache_key(query)
rag_ctx = rag_cache.get(_rag_key) or ""
if not rag_ctx:
try:
_, _, _, _, retriever = load_tools()
fast_results = retriever.retrieve_fast(query, k=5, top_n=3)
rag_ctx = build_context(fast_results, max_tokens=400)
if rag_ctx:
rag_cache.set(_rag_key, rag_ctx, ttl=300)
except Exception:
pass
system = (
CHAT_SYSTEM
.replace("{{RAG_CONTEXT}}", rag_ctx or "ู„ุง ุชูˆุฌุฏ ู…ุนู„ูˆู…ุงุช ุฅุถุงููŠุฉ.")
.replace("{{ANALYSIS_CONTEXT}}", req.analysis_context or "ู„ู… ูŠูุฑูุน ุชุญู„ูŠู„ ุจุนุฏ.")
)
messages = [{"role": "system", "content": system}]
for msg in req.history[-6:]:
if msg.get("role") in ("user", "assistant") and msg.get("content"):
messages.append({"role": msg["role"], "content": msg["content"]})
messages.append({"role": "user", "content": query})
raw = router.generate(messages, max_tokens=600, temperature=0.5, model_hint="chat")
text = filter_chat_response(raw, query)
# 3. TTS (optional)
audio_b64 = ""
if req.include_audio:
try:
tts = _get_tts()
mp3 = tts.synthesize(text)
audio_b64 = base64.b64encode(mp3).decode()
except Exception as e:
log = logging.getLogger("tebyan")
log.warning("[voice/chat] TTS failed: %s", e)
return {"query": query, "text": text, "audio_base64": audio_b64}
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# Risk Prediction API
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
class RiskRequest(BaseModel):
findings: list[dict] # same format as analysis findings
@lru_cache(maxsize=1)
def _get_risk_engine():
from services.risk import RiskEngine
return RiskEngine()
@app.post("/api/risk")
async def predict_risk(req: RiskRequest):
"""
Evidence-based multi-condition health risk scoring.
Returns scored risks for diabetes, cardiovascular, anemia, kidney, liver, thyroid.
ML model override when models/ directory contains trained .pkl files.
"""
if not req.findings:
raise HTTPException(status_code=422, detail="findings list is required")
engine = _get_risk_engine()
report = engine.assess(req.findings)
return {
"risks": [
{
"condition": r.condition,
"score": r.score,
"level": r.level,
"confidence": r.confidence,
"label_ar": r.label_ar,
"factors": r.factors,
"recommendation": r.recommendation,
"source": r.source,
}
for r in report.risks
],
"top_risk": {
"condition": report.top_risk.condition,
"score": report.top_risk.score,
"level": report.top_risk.level,
"label_ar": report.top_risk.label_ar,
} if report.top_risk else None,
"overall_ar": report.overall_ar,
"features_used": report.features_used,
}
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# Compare Summary โ€” Groq-generated analysis of two lab results
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
class CompareSummaryRequest(BaseModel):
findings_a: list[dict]
findings_b: list[dict]
summary_a: str = ""
summary_b: str = ""
date_a: str = ""
date_b: str = ""
@app.post("/api/compare/summary")
async def compare_summary(req: CompareSummaryRequest):
router = load_router()
def fmt_findings(findings: list[dict]) -> str:
lines = []
for f in findings:
status_ar = {"high": "ู…ุฑุชูุน", "low": "ู…ู†ุฎูุถ", "normal": "ุทุจูŠุนูŠ"}.get(f.get("status", ""), "")
lines.append(f" {f.get('name','')}: {f.get('value','')} {f.get('unit','')} [{status_ar}]")
return "\n".join(lines) if lines else "ู„ุง ุชูˆุฌุฏ ุจูŠุงู†ุงุช"
prompt = f"""ุฃู†ุช ุทุจูŠุจ ู…ุชุฎุตุต. ู‚ุงุฑู† ุจูŠู† ู†ุชูŠุฌุชูŠู† ู„ุชุญุงู„ูŠู„ ู…ุฎุจุฑูŠุฉ ู„ู†ูุณ ุงู„ู…ุฑูŠุถ.
ุงู„ุชุญู„ูŠู„ ุงู„ุฃูˆู„ ({req.date_a or 'ุงู„ุณุงุจู‚'}):
{fmt_findings(req.findings_a)}
ุงู„ุชุญู„ูŠู„ ุงู„ุซุงู†ูŠ ({req.date_b or 'ุงู„ุญุงู„ูŠ'}):
{fmt_findings(req.findings_b)}
ุงูƒุชุจ ู…ู„ุฎุตุงู‹ ุทุจูŠุงู‹ ุฏู‚ูŠู‚ุงู‹ ุจุงู„ุนุฑุจูŠุฉ ููŠ 3-4 ุฌู…ู„ ูŠุดู…ู„:
1. ุฃุจุฑุฒ ุงู„ุชุบูŠุฑุงุช (ุชุญุณู‘ู† / ุชุฑุงุฌุน / ุซุจุงุช)
2. ู‡ู„ ุงู„ุงุชุฌุงู‡ ุงู„ุนุงู… ุฅูŠุฌุงุจูŠ ุฃู… ูŠุณุชุฏุนูŠ ู‚ู„ู‚ุงู‹
3. ุชูˆุตูŠุฉ ูˆุงุญุฏุฉ ู…ุญุฏุฏุฉ
ุฃุฌุจ ู…ุจุงุดุฑุฉ ุจุงู„ู†ุต ูู‚ุทุŒ ุจุฏูˆู† ุนู†ุงูˆูŠู† ุฃูˆ ู†ู‚ุงุท."""
try:
summary = router.generate(
[{"role": "user", "content": prompt}],
max_tokens=300, temperature=0.2, model_hint="analysis",
).strip()
except Exception as e:
summary = "ุชุนุฐู‘ุฑ ุชูˆู„ูŠุฏ ุงู„ู…ู„ุฎุต ุงู„ู…ู‚ุงุฑู†."
log.error("compare: Groq error: %s", e)
return {"summary": summary}
# build:1779660596