AURA-chatbot / ragpipeline.py
Flavio Casadei Della Chiesa
versione aggiornata con ollama
ad14e53
Raw
History Blame
8.15 kB
from textutils import ParagraphDocumentProcessor, DocumentProcessor
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModel
import ollama
import faiss
import os
import csv
import re
import pandas as pd
from datetime import datetime
class RAGPipeline:
def __init__(self,
model_name: str = "flaollama",
model_orig: str = "mistral",
docprocessor = ParagraphDocumentProcessor(),
sentence_transformer_name: str = 'paraphrase-multilingual-MiniLM-L12-v2',
numero_frammenti = 10
) :
self.model_name = model_name
self.model_orig = model_orig
self.docprocessor = docprocessor
self.sentence_transformer_name = sentence_transformer_name
self.sentence_transformer_model = SentenceTransformer( self.sentence_transformer_name )
self.numero_frammenti = numero_frammenti
self.documenti = []
self.files_pdf =[]
self.indice =False
self.timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
##attrributi_frammenti contiene una lista di frammenti con attribui es:
##
##
##
self.attributi_frammenti = [] #elenco di dizionari di tutti i frammenti
##LISTA DI FRAMMENTI INDICIZZATI (lista di testi)
self. frammenti_indicizzati = [] #sonoi testi dei vari frammenti
#print(f"NUMERODI FRAMMENTIIIII {self.numero_frammenti} param {numero_frammenti}")
@staticmethod
def dump_excel(dizionario, filename ):
"""Salva un dizionario in un file Excel accodando i dati se il file esiste."""
file_esiste = os.path.isfile(filename)
# Converti il dizionario in un DataFrame con una sola riga
df_nuova_riga = pd.DataFrame([dizionario])
if file_esiste:
# Carica il file Excel esistente
df_esistente = pd.read_excel(filename, engine='openpyxl')
# Concatenazione dei DataFrame
df_finale = pd.concat([df_esistente, df_nuova_riga], ignore_index=True)
else:
# Se il file non esiste, il DataFrame finale è la nuova riga
df_finale = df_nuova_riga
# Salva il DataFrame finale nel file Excel
df_finale.to_excel(filename, index=False, engine='openpyxl')
def crea_indice(self):
# Converte i documenti in vettori
docId = 0
fraId = 0
for documento in self.documenti:
frammenti = self.docprocessor.scomponi_in_frammenti(documento, self.numero_frammenti )
for frammento in frammenti:
dizionario_frammenti = {
'timestamp': self.timestamp,
'id': f"{docId}-{fraId}",
"documento": docId,
"frammento": fraId,
"nomefile": os.path.basename(self.files_pdf[docId]),
'testo_frammento':frammento
}
self.attributi_frammenti.append( dizionario_frammenti )
self.frammenti_indicizzati.append(frammento)
fraId = fraId +1
docId = docId +1
self.doc_embeddings = self.sentence_transformer_model.encode(self.frammenti_indicizzati)
# Creazione dell'indice Faiss
dimension = self.doc_embeddings.shape[1]
self.indice = faiss.IndexFlatL2(dimension) # Indice L2 (distanza euclidea)
self.indice.add(self.doc_embeddings)
def aggiungi_file_pdf(self, filename: str) :
text= self.docprocessor.estrai_da_pdf(filename)
self.documenti.append(text)
self.files_pdf.append(filename)
class Retriever:
def __init__(self,
indice ,
sentence_transformer_model ,
query : str,
documenti =[],
frammenti_indicizzati = [], #tutti i frammenti ??
attributi_frammenti = [] ##elenco attrivuti frammenti
):
self.indice = indice
self.sentence_transformer = sentence_transformer_model
self.query = query
self.documenti = documenti
self.frammenti_indicizzati = frammenti_indicizzati
self.attributi_frammenti = attributi_frammenti
self.passaggi_rilevanti = [] ## documenti rilevanti per la query (recuperati)
self.attributi_rilevanti = [] ## atteributi dei frammenti rilevanti
def esegui_query(self, top_k = 5):
# Embedding della query
query_embedding = self.sentence_transformer.encode([self.query])
# Recupero dei documenti più simili
distances, indices = self.indice.search(query_embedding, top_k)
# documenti rilevanti e passaggi rilevanti
self.passaggi_rilevanti = [self.frammenti_indicizzati[j] for j in indices[0]] #frammenti rilevanti
self.attributi_rilevanti = [self.attributi_frammenti[j] for j in indices[0]] #passaggi rilevanti
class ChatBot:
def pulisci_risposta(self,
response: str):
retval=re.sub(r"<think>.*?</think>", "", response, flags=re.DOTALL).strip()
retval = retval.replace("\n", " ").replace("\t", " ").replace("|", " ")
return retval
def chat(self, domanda: str, istruzioni: str = None, frammenti =[]) -> str:
raise NotImplementedError("Questo metodo deve essere implementato nelle sottoclassi.")
def generate(self, relevant_docs = [], attributi_frammenti_rilevanti = [], query="", istruzioni :str = None ) -> str:
raise NotImplementedError("Questo metodo deve essere implementato nelle sottoclassi.")
class OllamaChatbot(ChatBot):
def __init__(self,
model_name: str = "flaollama",
model_orig: str = "mistral" ,
model_system=(
"""Sei un esperto di diritto amministrativo che deve eseguire il
controllo di regolarità amministrativa su un atto amministrativo di un comune italiano.
Ti verranno forniti un atto amministrativo (determinazione dirigenziale) ed eventuali allegati, questi sono forniti come frammenti rilevanti.
Utilizza solamente i frammenti che ti verranno inviati.
Rispondi in Italiano usando al massimo 50 parole.
Basati esclusivamente sul seguente testo: """
),
dump_filename="dump.csv"
):
self.model_name = model_name
self.model_orig = model_orig
self.model_system = model_system
self.dump_filename = dump_filename
ollama.create(
model=model_name,
from_=model_orig,
system = model_system
)
def chat(self, domanda: str, istruzioni: str = None, frammenti =[]) -> str:
prompt = f"ISTRUZIONI: {istruzioni}\n\nCONTESTO:\n" + "\n".join(frammenti) + f"\n\nDOMANDA: {domanda}"
response = ollama.chat(model=self.model_orig, messages=[
{"role": "user", "content": prompt}
], options= {'num_ctx' : 4096 })
return response["message"]["content"]
def generate(self, relevant_docs = [], attributi_frammenti_rilevanti = [], query="", istruzioni :str = None ):
i = 0
#print (f"DIMESIONE FILE {len(relevant_files)}")
#print (f"DIMESIONE TESTI {len(relevant_docs)}")
prompt=""
for documento in relevant_docs:
prompt += f"{relevant_docs[i]} "
i = i+1
#"{context}\n\nDomanda: {query}"
if istruzioni is not None:
query = query + " Istruzioni: " + istruzioni
prompt +=f"\n\nDomanda:{query} \n\n"
#print(prompt)
rersponse = ollama.generate(
model=self.model_name,
prompt=prompt
, options= {'num_ctx' : 4096 }
)
return rersponse['response']