RAG
For certain LLM tasks like answering questions, providing relevant context is essential. One common architecture is a two-stage RAG pipeline:
Offline stage: Preprocess and index documents ("building the index").
Online stage: Given a question, generate answers by retrieving the most relevant context.
We create three Nodes:
class ChunkDocs(BatchNode):
def prep(self, shared):
# A list of file paths in shared["files"]. We process each file.
return shared["files"]
def exec(self, filepath):
# read file content. In real usage, do error handling.
with open(filepath, "r", encoding="utf-8") as f:
text = f.read()
# chunk by 100 chars each
chunks = []
size = 100
for i in range(0, len(text), size):
chunks.append(text[i : i + size])
return chunks
def post(self, shared, prep_res, exec_res_list):
# exec_res_list is a list of chunk-lists, one per file.
# flatten them all into a single list of chunks.
all_chunks = []
for chunk_list in exec_res_list:
all_chunks.extend(chunk_list)
shared["all_chunks"] = all_chunks
import * as fs from 'fs'

class ChunkDocs extends BatchNode {
prep(shared: any): string[] {
// A list of file paths in shared["files"]. We process each file.
return shared['files']
}
exec(filepath: string): string[] {
// read file content. In real usage, do error handling.
const text = fs.readFileSync(filepath, 'utf-8')
// chunk by 100 chars each
const chunks: string[] = []
const size = 100
for (let i = 0; i < text.length; i += size) {
chunks.push(text.slice(i, i + size))
}
return chunks
}
post(shared: any, prepRes: string[], execResList: string[][]): void {
// execResList is a list of chunk-lists, one per file.
// flatten them all into a single list of chunks.
const allChunks: string[] = []
for (const chunkList of execResList) {
allChunks.push(...chunkList)
}
shared['all_chunks'] = allChunks
}
}
class EmbedDocs(BatchNode):
def prep(self, shared):
return shared["all_chunks"]
def exec(self, chunk):
return get_embedding(chunk)
def post(self, shared, prep_res, exec_res_list):
# Store the list of embeddings.
shared["all_embeds"] = exec_res_list
print(f"Total embeddings: {len(exec_res_list)}")
class EmbedDocs extends BatchNode {
prep(shared: any): string[] {
return shared['all_chunks']
}
exec(chunk: string): number[] {
return getEmbedding(chunk)
}
post(shared: any, prepRes: string[], execResList: number[][]): void {
// Store the list of embeddings.
shared['all_embeds'] = execResList
console.log(`Total embeddings: ${execResList.length}`)
}
}
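Both EmbedDocs above and EmbedQuery below call a get_embedding utility that is assumed to live elsewhere in your project. A minimal sketch, assuming the OpenAI embeddings API (the client setup and model name are illustrative, not prescribed here):

# utils.py -- hypothetical helper, swap in any embedding provider
from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

def get_embedding(text: str) -> list[float]:
    # "text-embedding-3-small" is just an example model name.
    resp = client.embeddings.create(model="text-embedding-3-small", input=text)
    return resp.data[0].embedding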
class StoreIndex(Node):
def prep(self, shared):
# We'll read all embeds from shared.
return shared["all_embeds"]
def exec(self, all_embeds):
# Create a vector index (faiss or other DB in real usage).
index = create_index(all_embeds)
return index
def post(self, shared, prep_res, index):
shared["index"] = index
class StoreIndex extends Node {
prep(shared: any): number[][] {
// We'll read all embeds from shared.
return shared['all_embeds']
}
exec(allEmbeds: number[][]): any {
// Create a vector index (faiss or other DB in real usage).
const index = createIndex(allEmbeds)
return index
}
post(shared: any, prepRes: number[][], index: any): void {
shared['index'] = index
}
}
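create_index is another utility you supply. A minimal sketch using FAISS, which the comment above mentions as one option (any vector store works):

# utils.py -- hypothetical helper built on FAISS
import faiss
import numpy as np

def create_index(all_embeds):
    # all_embeds: list of embedding vectors (lists of floats), all the same length.
    embeds = np.array(all_embeds, dtype="float32")
    index = faiss.IndexFlatL2(embeds.shape[1])  # exact L2 search over the raw vectors
    index.add(embeds)
    return index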
# Wire them in sequence
chunk_node = ChunkDocs()
embed_node = EmbedDocs()
store_node = StoreIndex()
chunk_node >> embed_node >> store_node
OfflineFlow = Flow(start=chunk_node)
// Wire them in sequence
const chunkNode = new ChunkDocs()
const embedNode = new EmbedDocs()
const storeNode = new StoreIndex()
chunkNode.next(embedNode).next(storeNode)
const OfflineFlow = new Flow(chunkNode)
Usage example:
shared = {
"files": ["doc1.txt", "doc2.txt"], # any text files
}
OfflineFlow.run(shared)
const shared = {
files: ['doc1.txt', 'doc2.txt'], // any text files
}
OfflineFlow.run(shared)
We have 3 nodes:
EmbedQuery – embeds the user's question.
RetrieveDocs – retrieves the top chunk from the index.
GenerateAnswer – calls the LLM with the question + chunk to produce the final answer.
class EmbedQuery(Node):
def prep(self, shared):
return shared["question"]
def exec(self, question):
return get_embedding(question)
def post(self, shared, prep_res, q_emb):
shared["q_emb"] = q_emb
class EmbedQuery extends Node {
prep(shared: any): string {
return shared['question']
}
exec(question: string): number[] {
return getEmbedding(question)
}
post(shared: any, prepRes: string, qEmb: number[]): void {
shared['q_emb'] = qEmb
}
}
class RetrieveDocs(Node):
def prep(self, shared):
# We'll need the query embedding, plus the offline index/chunks
return shared["q_emb"], shared["index"], shared["all_chunks"]
def exec(self, inputs):
q_emb, index, chunks = inputs
I, D = search_index(index, q_emb, top_k=1)
best_id = I[0][0]
relevant_chunk = chunks[best_id]
return relevant_chunk
def post(self, shared, prep_res, relevant_chunk):
shared["retrieved_chunk"] = relevant_chunk
print("Retrieved chunk:", relevant_chunk[:60], "...")
class RetrieveDocs extends Node {
prep(shared: any): [number[], any, string[]] {
// We'll need the query embedding, plus the offline index/chunks
return [shared['q_emb'], shared['index'], shared['all_chunks']]
}
exec(inputs: [number[], any, string[]]): string {
const [qEmb, index, chunks] = inputs
const [I, D] = searchIndex(index, qEmb, 1)
const bestId = I[0][0]
const relevantChunk = chunks[bestId]
return relevantChunk
}
post(shared: any, prepRes: [number[], any, string[]], relevantChunk: string): void {
shared['retrieved_chunk'] = relevantChunk
console.log(`Retrieved chunk: ${relevantChunk.slice(0, 60)}...`)
}
}
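search_index is the lookup counterpart of create_index. A sketch that matches the I, D = search_index(...) call above, again assuming FAISS:

# utils.py -- hypothetical helper built on FAISS
import numpy as np

def search_index(index, q_emb, top_k=1):
    # FAISS returns (distances, ids); we flip the order to match the call sites above.
    D, I = index.search(np.array([q_emb], dtype="float32"), top_k)
    return I, D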
class GenerateAnswer(Node):
def prep(self, shared):
return shared["question"], shared["retrieved_chunk"]
def exec(self, inputs):
question, chunk = inputs
prompt = f"Question: {question}\nContext: {chunk}\nAnswer:"
return call_llm(prompt)
def post(self, shared, prep_res, answer):
shared["answer"] = answer
print("Answer:", answer)
class GenerateAnswer extends Node {
prep(shared: any): [string, string] {
return [shared['question'], shared['retrieved_chunk']]
}
exec(inputs: [string, string]): string {
const [question, chunk] = inputs
const prompt = `Question: ${question}\nContext: ${chunk}\nAnswer:`
return callLLM(prompt)
}
post(shared: any, prepRes: [string, string], answer: string): void {
shared['answer'] = answer
console.log(`Answer: ${answer}`)
}
}
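Finally, call_llm is whatever chat-completion wrapper you already use. A minimal sketch against the OpenAI chat API (the model name is an example only):

# utils.py -- hypothetical helper, swap in your own LLM provider
from openai import OpenAI

client = OpenAI()

def call_llm(prompt: str) -> str:
    resp = client.chat.completions.create(
        model="gpt-4o-mini",  # illustrative model name
        messages=[{"role": "user", "content": prompt}],
    )
    return resp.choices[0].message.content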
embed_qnode = EmbedQuery()
retrieve_node = RetrieveDocs()
generate_node = GenerateAnswer()
embed_qnode >> retrieve_node >> generate_node
OnlineFlow = Flow(start=embed_qnode)
const embedQNode = new EmbedQuery()
const retrieveNode = new RetrieveDocs()
const generateNode = new GenerateAnswer()
embedQNode.next(retrieveNode).next(generateNode)
const OnlineFlow = new Flow(embedQNode)
Usage example:
# Suppose we already ran OfflineFlow and have:
# shared["all_chunks"], shared["index"], etc.
shared["question"] = "Why do people like cats?"
OnlineFlow.run(shared)
# final answer in shared["answer"]
// Suppose we already ran OfflineFlow and have:
// shared["all_chunks"], shared["index"], etc.
shared['question'] = 'Why do people like cats?'
OnlineFlow.run(shared)
// final answer in shared["answer"]
To recap the offline indexing flow:
ChunkDocs – chunks the raw text files.
EmbedDocs – embeds each chunk.
StoreIndex – stores all embeddings into a vector index.