""" 3-layer RAG + Claude Haiku 답변 생성. Layer 1 (집계): aggregates.json — analysis_summary 텍스트, 카테고리/월별 카운트 Layer 2 (카테고리): category_summary.json — primary_cat별 대표 게시글 Layer 3 (검색): ChromaDB 의미 검색 질문마다: 1. ChromaDB에서 관련 게시글 Top-k 검색 2. 집계·카테고리 요약은 시스템 프롬프트에 주입 (prompt cache 적용) 3. Haiku가 최종 답변 생성 (스트리밍) """ from __future__ import annotations import json from dataclasses import dataclass from pathlib import Path from typing import Iterator import chromadb from anthropic import Anthropic from chromadb.utils import embedding_functions ROOT = Path(__file__).resolve().parent CHROMA_DIR = ROOT / "chroma_db" EMBED_MODEL = "jhgan/ko-sroberta-multitask" COLLECTION = "voc" HAIKU = "claude-haiku-4-5-20251001" @dataclass class Hit: id: str title: str text: str month: str primary_cat: str posvote: int comment_count: int concern_score: int distance: float class Chatbot: def __init__(self) -> None: if not CHROMA_DIR.exists(): raise RuntimeError( f"ChromaDB 없음: {CHROMA_DIR}. `python build_index.py` 먼저 실행." ) self._client = chromadb.PersistentClient(path=str(CHROMA_DIR)) embed_fn = embedding_functions.SentenceTransformerEmbeddingFunction( model_name=EMBED_MODEL ) self._collection = self._client.get_collection( COLLECTION, embedding_function=embed_fn ) self._aggregates = json.loads( (ROOT / "aggregates.json").read_text(encoding="utf-8") ) self._category = json.loads( (ROOT / "category_summary.json").read_text(encoding="utf-8") ) self._anthropic = Anthropic() def retrieve(self, question: str, k: int = 8) -> list[Hit]: res = self._collection.query(query_texts=[question], n_results=k) out: list[Hit] = [] for i, doc in enumerate(res["documents"][0]): md = res["metadatas"][0][i] title, _, text = doc.partition("\n") out.append(Hit( id=res["ids"][0][i], title=title, text=text, month=str(md.get("month", "")), primary_cat=str(md.get("primary_cat", "")), posvote=int(md.get("posvote", 0) or 0), comment_count=int(md.get("comment_count", 0) or 0), concern_score=int(md.get("concern_score", 0) or 0), distance=float(res["distances"][0][i]), )) return out def _system_text(self) -> str: summary = self._aggregates.get("analysis_summary_txt", "")[:2500] cats = "\n".join( f"- {k}: {v['total']}건" for k, v in list(self._category.items())[:15] ) period = self._aggregates.get("period", {}) total = self._aggregates.get("total_posts_indexed", 0) return ( "너는 호서대학교 IR(Institutional Research) 챗봇이다. " "호서대 에브리타임 자유게시판 2년치 게시글 분석 데이터를 바탕으로 " "교수·교직원의 질문에 답한다.\n\n" f"[인덱스 범위]\n- 기간: {period.get('start','')} ~ {period.get('end','')}\n" f"- 총 게시글(인덱싱): {total:,}건\n\n" "[답변 원칙]\n" "- 아래 컨텍스트와 대화에서 주어진 게시글만 근거로 답한다. 근거 없는 주장 금지.\n" "- 에브리타임은 익명이라 학년·학과 메타데이터가 없다. 학년 관련 질문이면 " "(1) 시기 기반 추론(3월 수강신청·신입생 OT 시즌=1학년 비중↑), " "(2) '1학년/새내기/신입생' 자기식별 키워드 포함 게시글로만 근사 가능함을 답변에 반드시 명시한다.\n" "- 인용은 [id:407xxxxxx] 형식으로 본문에 삽입.\n" "- 답변은 3~6문장 간결체. 필요시 불릿 사용.\n" "- 학생 개인 식별·조롱은 금지.\n\n" f"[전체 분석 요약]\n{summary}\n\n" f"[카테고리 분포 (상위 15)]\n{cats}\n" ) def _user_text(self, question: str, hits: list[Hit]) -> str: posts = "" for h in hits: posts += ( f"\n---\n[id:{h.id}] {h.month} · {h.primary_cat} · " f"추천{h.posvote}·댓글{h.comment_count}\n" f"제목: {h.title}\n본문: {h.text[:400]}\n" ) return ( f"## 교수님 질문\n{question}\n\n" f"## 질문 관련 게시글 (의미 검색 Top-{len(hits)})\n{posts}\n\n" "위 컨텍스트와 시스템 프롬프트의 분석 요약을 바탕으로 답하라. " "근거는 [id:...] 형식으로 인용." ) def answer_stream( self, question: str, k: int = 8 ) -> tuple[list[Hit], Iterator[str]]: hits = self.retrieve(question, k) system_blocks = [ { "type": "text", "text": self._system_text(), "cache_control": {"type": "ephemeral"}, } ] user_content = self._user_text(question, hits) def gen() -> Iterator[str]: with self._anthropic.messages.stream( model=HAIKU, max_tokens=1024, system=system_blocks, messages=[{"role": "user", "content": user_content}], ) as stream: for text in stream.text_stream: yield text return hits, gen()