from textutils import ParagraphDocumentProcessor, DocumentProcessor from sentence_transformers import SentenceTransformer from transformers import AutoTokenizer, AutoModel import ollama import faiss import os import csv import re import pandas as pd from datetime import datetime class RAGPipeline: def __init__(self, model_name: str = "flaollama", model_orig: str = "mistral", docprocessor = ParagraphDocumentProcessor(), sentence_transformer_name: str = 'paraphrase-multilingual-MiniLM-L12-v2', numero_frammenti = 10 ) : self.model_name = model_name self.model_orig = model_orig self.docprocessor = docprocessor self.sentence_transformer_name = sentence_transformer_name self.sentence_transformer_model = SentenceTransformer( self.sentence_transformer_name ) self.numero_frammenti = numero_frammenti self.documenti = [] self.files_pdf =[] self.indice =False self.timestamp = datetime.now().strftime("%Y%m%d%H%M%S") ##attrributi_frammenti contiene una lista di frammenti con attribui es: ## ## ## self.attributi_frammenti = [] #elenco di dizionari di tutti i frammenti ##LISTA DI FRAMMENTI INDICIZZATI (lista di testi) self. frammenti_indicizzati = [] #sonoi testi dei vari frammenti #print(f"NUMERODI FRAMMENTIIIII {self.numero_frammenti} param {numero_frammenti}") @staticmethod def dump_excel(dizionario, filename ): """Salva un dizionario in un file Excel accodando i dati se il file esiste.""" file_esiste = os.path.isfile(filename) # Converti il dizionario in un DataFrame con una sola riga df_nuova_riga = pd.DataFrame([dizionario]) if file_esiste: # Carica il file Excel esistente df_esistente = pd.read_excel(filename, engine='openpyxl') # Concatenazione dei DataFrame df_finale = pd.concat([df_esistente, df_nuova_riga], ignore_index=True) else: # Se il file non esiste, il DataFrame finale è la nuova riga df_finale = df_nuova_riga # Salva il DataFrame finale nel file Excel df_finale.to_excel(filename, index=False, engine='openpyxl') def crea_indice(self): # Converte i documenti in vettori docId = 0 fraId = 0 for documento in self.documenti: frammenti = self.docprocessor.scomponi_in_frammenti(documento, self.numero_frammenti ) for frammento in frammenti: dizionario_frammenti = { 'timestamp': self.timestamp, 'id': f"{docId}-{fraId}", "documento": docId, "frammento": fraId, "nomefile": os.path.basename(self.files_pdf[docId]), 'testo_frammento':frammento } self.attributi_frammenti.append( dizionario_frammenti ) self.frammenti_indicizzati.append(frammento) fraId = fraId +1 docId = docId +1 self.doc_embeddings = self.sentence_transformer_model.encode(self.frammenti_indicizzati) # Creazione dell'indice Faiss dimension = self.doc_embeddings.shape[1] self.indice = faiss.IndexFlatL2(dimension) # Indice L2 (distanza euclidea) self.indice.add(self.doc_embeddings) def aggiungi_file_pdf(self, filename: str) : text= self.docprocessor.estrai_da_pdf(filename) self.documenti.append(text) self.files_pdf.append(filename) class Retriever: def __init__(self, indice , sentence_transformer_model , query : str, documenti =[], frammenti_indicizzati = [], #tutti i frammenti ?? attributi_frammenti = [] ##elenco attrivuti frammenti ): self.indice = indice self.sentence_transformer = sentence_transformer_model self.query = query self.documenti = documenti self.frammenti_indicizzati = frammenti_indicizzati self.attributi_frammenti = attributi_frammenti self.passaggi_rilevanti = [] ## documenti rilevanti per la query (recuperati) self.attributi_rilevanti = [] ## atteributi dei frammenti rilevanti def esegui_query(self, top_k = 5): # Embedding della query query_embedding = self.sentence_transformer.encode([self.query]) # Recupero dei documenti più simili distances, indices = self.indice.search(query_embedding, top_k) # documenti rilevanti e passaggi rilevanti self.passaggi_rilevanti = [self.frammenti_indicizzati[j] for j in indices[0]] #frammenti rilevanti self.attributi_rilevanti = [self.attributi_frammenti[j] for j in indices[0]] #passaggi rilevanti class ChatBot: def __init__(self, model_name: str = "flaollama", model_orig: str = "mistral" , model_system=( "Sei un esperto di diritto amministrativo che deve eseguire il " "controllo di regolarità amministrativa su un atto amministrativo di un comune italiano. " "Ti verranno forniti un atto amministrativo (determinazione dirigenziale) ed eventuali allegati, questi sono forniti come frammenti rilevanti. " "Utilizza solamente i frammenti che ti verranno inviati." "Rispondi in Italiano usando al massimo 50 parole. " "Basati esclusivamente sul seguente testo: " ), dump_filename="dump.csv" ): self.model_name = model_name self.model_orig = model_orig self.model_system = model_system self.dump_filename = dump_filename ollama.create( model=model_name, from_=model_orig, system = model_system ) def dump_excel(self, dizionario, filename ): RAGPipeline.dump_excel(dizionario=dizionario, filename=filename) def dump_csv(self,dizionario): """Salva un dizionario in un file CSV con separatore '|' accodando i dati se il file esiste.""" file_esiste = os.path.isfile(self.dump_filename) with open(self.dump_filename, mode="a", newline="", encoding="utf-8") as file: writer = csv.writer(file, delimiter="|") # Scrive l'intestazione solo se il file viene creato ex novo if not file_esiste: writer.writerow(dizionario.keys()) # Scrive i valori come una nuova riga writer.writerow([str(val).replace("\n", "").replace("\r", "").replace("\t", "") for val in dizionario.values()]) def pulisci_risposta(self, response: str): retval=re.sub(r".*?", "", response, flags=re.DOTALL).strip() retval = retval.replace("\n", " ").replace("\t", " ").replace("|", " ") return retval def chat(self, domanda: str, istruzioni: str = None, frammenti =[]) -> str: prompt = f"ISTRUZIONI: {istruzioni}\n\nCONTESTO:\n" + "\n".join(frammenti) + f"\n\nDOMANDA: {domanda}" response = ollama.chat(model=self.model_orig, messages=[ {"role": "user", "content": prompt} ], options= {'num_ctx' : 4096 }) return response["message"]["content"] def generate(self, relevant_docs = [], attributi_frammenti_rilevanti = [], query="", istruzioni :str = None ##togliere ): i = 0 #print (f"DIMESIONE FILE {len(relevant_files)}") #print (f"DIMESIONE TESTI {len(relevant_docs)}") prompt="" for documento in relevant_docs: prompt += f"{relevant_docs[i]} " i = i+1 #"{context}\n\nDomanda: {query}" if istruzioni is not None: query = query + " Istruzioni: " + istruzioni prompt +=f"\n\nDomanda:{query} \n\n" #print(prompt) rersponse = ollama.generate( model=self.model_name, prompt=prompt , options= {'num_ctx' : 4096 } ) return rersponse['response']