| """ |
| PDF Parser Service for RAG Chatbot |
| Extracts text from PDF and splits into chunks for indexing |
| """ |
|
|
| import pypdfium2 as pdfium |
| from typing import List, Dict, Optional |
| import re |
| from dataclasses import dataclass |
|
|
|
|
@dataclass
class PDFChunk:
    """
    A run of words taken from a single PDF page.

    Attributes:
        text: Text content of the chunk
        page_number: 1-based number of the page the chunk came from
        chunk_index: 0-based position of the chunk within its page
        metadata: Key/value data stored alongside the chunk
    """
    text: str
    page_number: int
    chunk_index: int
    metadata: Dict


class PDFParser:
    """
    Parse PDF files and prepare their text for RAG indexing.

    Text is extracted page by page with pypdfium2, whitespace-normalised and
    split into overlapping, word-based chunks.
    """

    def __init__(
        self,
        chunk_size: int = 500,
        chunk_overlap: int = 50,
        min_chunk_size: int = 50
    ):
        """
        Configure chunking (all sizes are measured in words)

        Args:
            chunk_size: Maximum number of words per chunk
            chunk_overlap: Number of words shared by consecutive chunks
            min_chunk_size: Pages with fewer words than this are emitted as
                a single chunk instead of being split

        Raises:
            ValueError: If the sizes cannot yield a terminating chunking loop
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        if not 0 <= chunk_overlap < chunk_size:
            raise ValueError(
                "chunk_overlap must satisfy 0 <= chunk_overlap < chunk_size"
            )
        if min_chunk_size < 0:
            raise ValueError("min_chunk_size must not be negative")

        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.min_chunk_size = min_chunk_size

    def extract_text_from_pdf(self, pdf_path: str) -> Dict[int, str]:
        """
        Extract text from PDF file

        Args:
            pdf_path: Path to PDF file

        Returns:
            Dictionary mapping 1-based page number to cleaned text content

        Raises:
            Exception: If the document cannot be opened or read; the
                original error is chained as ``__cause__``
        """
        from contextlib import closing

        pdf_text = {}

        try:
            # closing() guarantees the pdfium handles are released even when
            # extraction fails part-way through the document.
            with closing(pdfium.PdfDocument(pdf_path)) as pdf:
                for page_index in range(len(pdf)):
                    with closing(pdf[page_index]) as page, \
                            closing(page.get_textpage()) as textpage:
                        raw_text = textpage.get_text_range()

                    pdf_text[page_index + 1] = self._clean_text(raw_text)

            return pdf_text

        except Exception as e:
            raise Exception(f"Error reading PDF: {str(e)}") from e

    def _clean_text(self, text: str) -> str:
        """
        Clean extracted text

        Null bytes are removed first so that whitespace on either side of a
        removed byte is still collapsed into a single space.

        Args:
            text: Raw text as returned by the PDF library

        Returns:
            Text with null bytes removed, whitespace runs collapsed to one
            space and leading/trailing whitespace stripped
        """
        text = text.replace('\x00', '')
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    def chunk_text(self, text: str, page_number: int) -> List[PDFChunk]:
        """
        Split text into overlapping chunks

        Every word of ``text`` ends up in at least one chunk: the loop only
        stops once a chunk reaches the last word.

        Args:
            text: Text to chunk
            page_number: Page number this text came from

        Returns:
            List of PDFChunk objects (empty if the text contains no words)

        Raises:
            ValueError: If chunk_size/chunk_overlap were changed after
                construction so that chunks would no longer advance
        """
        words = text.split()
        total_words = len(words)

        if total_words == 0:
            return []

        # Short pages are kept whole rather than split.
        if total_words < self.min_chunk_size:
            return [PDFChunk(
                text=text,
                page_number=page_number,
                chunk_index=0,
                metadata={
                    'page': page_number,
                    'chunk': 0,
                    'start_word': 0,
                    'end_word': total_words
                }
            )]

        # The attributes are public, so re-check that each step moves forward.
        stride = self.chunk_size - self.chunk_overlap
        if self.chunk_size <= 0 or stride <= 0:
            raise ValueError(
                "chunk_overlap must satisfy 0 <= chunk_overlap < chunk_size"
            )

        chunks = []
        for chunk_index, start in enumerate(range(0, total_words, stride)):
            end = min(start + self.chunk_size, total_words)

            chunks.append(PDFChunk(
                text=' '.join(words[start:end]),
                page_number=page_number,
                chunk_index=chunk_index,
                metadata={
                    'page': page_number,
                    'chunk': chunk_index,
                    'start_word': start,
                    'end_word': end
                }
            ))

            # Stop as soon as the final word has been emitted; a further
            # window would contain nothing but overlap.
            if end >= total_words:
                break

        return chunks

    def parse_pdf(
        self,
        pdf_path: str,
        document_metadata: Optional[Dict] = None
    ) -> List[PDFChunk]:
        """
        Parse entire PDF into chunks

        Args:
            pdf_path: Path to PDF file
            document_metadata: Additional metadata merged into every chunk's
                metadata (its keys override the chunk's own on collision)

        Returns:
            List of all chunks from the PDF, in page order
        """
        pages_text = self.extract_text_from_pdf(pdf_path)

        all_chunks = []
        for page_num, text in pages_text.items():
            chunks = self.chunk_text(text, page_num)

            if document_metadata:
                for chunk in chunks:
                    chunk.metadata.update(document_metadata)

            all_chunks.extend(chunks)

        return all_chunks

    def parse_pdf_bytes(
        self,
        pdf_bytes: bytes,
        document_metadata: Optional[Dict] = None
    ) -> List[PDFChunk]:
        """
        Parse PDF from bytes (for uploaded files)

        The bytes are written to a temporary ``.pdf`` file which is removed
        again whether or not parsing succeeds.

        Args:
            pdf_bytes: PDF file as bytes
            document_metadata: Additional metadata

        Returns:
            List of chunks
        """
        import os
        import tempfile

        tmp = tempfile.NamedTemporaryFile(delete=False, suffix='.pdf')
        tmp_path = tmp.name

        try:
            # Writing happens inside the try so a failed write does not
            # leave the temporary file behind.
            with tmp:
                tmp.write(pdf_bytes)

            return self.parse_pdf(tmp_path, document_metadata)
        finally:
            try:
                os.unlink(tmp_path)
            except FileNotFoundError:
                pass

    def get_pdf_info(self, pdf_path: str) -> Dict:
        """
        Get basic info about PDF

        Args:
            pdf_path: Path to PDF file

        Returns:
            Dictionary with ``num_pages`` and ``file_path``

        Raises:
            Exception: If the document cannot be opened; the original error
                is chained as ``__cause__``
        """
        from contextlib import closing

        try:
            with closing(pdfium.PdfDocument(pdf_path)) as pdf:
                return {
                    'num_pages': len(pdf),
                    'file_path': pdf_path,
                }

        except Exception as e:
            raise Exception(f"Error reading PDF info: {str(e)}") from e
|
|
|
|
class PDFIndexer:
    """
    Index PDF chunks into RAG system

    Each chunk is embedded, written to the vector store, and the document as
    a whole is recorded in the documents collection.
    """

    def __init__(self, embedding_service, qdrant_service, documents_collection):
        """
        Args:
            embedding_service: Object whose ``encode_text(text)`` returns the
                embedding for a chunk
            qdrant_service: Object whose ``index_data(doc_id=, embedding=,
                metadata=)`` stores one chunk
            documents_collection: Object whose ``insert_one(doc)`` stores the
                per-document record
        """
        self.embedding_service = embedding_service
        self.qdrant_service = qdrant_service
        self.documents_collection = documents_collection
        self.parser = PDFParser()

    def _index_chunks(
        self,
        chunks,
        document_id: str,
        extra_fields: Optional[Dict] = None
    ) -> List[str]:
        """
        Embed and store every chunk, returning the chunk ids in order

        Args:
            chunks: Chunks as produced by the parser
            document_id: Unique ID of the owning document
            extra_fields: Additional payload fields placed after the core
                fields and before the chunk's own metadata

        Returns:
            List of ids of the form ``{document_id}_p{page}_c{chunk_index}``
        """
        chunk_ids = []

        for chunk in chunks:
            chunk_id = f"{document_id}_p{chunk.page_number}_c{chunk.chunk_index}"
            embedding = self.embedding_service.encode_text(chunk.text)

            # NOTE: chunk.metadata is spread last, so document-level keys that
            # collide with the core fields override them.
            payload = {
                'text': chunk.text,
                'document_id': document_id,
                'page': chunk.page_number,
                'chunk_index': chunk.chunk_index,
                'source': 'pdf',
                **(extra_fields or {}),
                **chunk.metadata
            }

            self.qdrant_service.index_data(
                doc_id=chunk_id,
                embedding=embedding,
                metadata=payload
            )
            chunk_ids.append(chunk_id)

        return chunk_ids

    def index_pdf(
        self,
        pdf_path: str,
        document_id: str,
        document_metadata: Optional[Dict] = None
    ) -> Dict:
        """
        Index entire PDF into RAG system

        Args:
            pdf_path: Path to PDF file
            document_id: Unique ID for this document
            document_metadata: Additional metadata (title, author, etc.)

        Returns:
            Indexing results: success flag, document id, number of chunks
            indexed and the first five chunk ids
        """
        chunks = self.parser.parse_pdf(pdf_path, document_metadata)
        chunk_ids = self._index_chunks(chunks, document_id)

        doc_info = {
            'document_id': document_id,
            'type': 'pdf',
            'file_path': pdf_path,
            'num_chunks': len(chunk_ids),
            'chunk_ids': chunk_ids,
            # Copy so the stored record does not alias the caller's dict.
            'metadata': dict(document_metadata or {}),
            'pdf_info': self.parser.get_pdf_info(pdf_path)
        }
        self.documents_collection.insert_one(doc_info)

        return {
            'success': True,
            'document_id': document_id,
            'chunks_indexed': len(chunk_ids),
            'chunk_ids': chunk_ids[:5]
        }

    def index_pdf_bytes(
        self,
        pdf_bytes: bytes,
        document_id: str,
        filename: str,
        document_metadata: Optional[Dict] = None
    ) -> Dict:
        """
        Index PDF from bytes (for uploaded files)

        Args:
            pdf_bytes: PDF file as bytes
            document_id: Unique ID for this document
            filename: Original filename
            document_metadata: Additional metadata (not modified)

        Returns:
            Indexing results: success flag, document id, filename, number of
            chunks indexed and the first five chunk ids
        """
        # Build a fresh dict: the caller's mapping must not gain a
        # 'filename' key as a side effect.
        doc_metadata = {**(document_metadata or {}), 'filename': filename}

        chunks = self.parser.parse_pdf_bytes(pdf_bytes, doc_metadata)
        chunk_ids = self._index_chunks(
            chunks, document_id, {'filename': filename}
        )

        doc_info = {
            'document_id': document_id,
            'type': 'pdf',
            'filename': filename,
            'num_chunks': len(chunk_ids),
            'chunk_ids': chunk_ids,
            # Document-level metadata, not the payload of the last chunk.
            'metadata': doc_metadata
        }
        self.documents_collection.insert_one(doc_info)

        return {
            'success': True,
            'document_id': document_id,
            'filename': filename,
            'chunks_indexed': len(chunk_ids),
            'chunk_ids': chunk_ids[:5]
        }
|
|