import asyncio
import os # Assuming file operations
from caskada import Node, Flow, Memory, ParallelFlow
# Assume get_embedding, create_index, search_index are defined elsewhere
# async def get_embedding(text: str) -> list[float]: ...
# def create_index(embeddings: list[list[float]]) -> Any: ... # Returns index object
# def search_index(index: Any, query_embedding: list[float], top_k: int) -> tuple[list[list[int]], list[list[float]]]: ...
# async def call_llm(prompt: str) -> str: ...
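# For a fully self-contained demo, they could be stubbed roughly as below (an
# illustrative sketch: a hash-based fake embedding plus brute-force cosine search
# over a plain list, not a real embedding model or vector database):
import hashlib
import math

async def get_embedding(text: str) -> list[float]:
    # Deterministic fake embedding derived from a hash of the text
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    return [b / 255.0 for b in digest[:16]]

def create_index(embeddings: list[list[float]]):
    # The "index" is simply the list of vectors
    return embeddings

def search_index(index, query_embedding, top_k=1):
    # Brute-force cosine similarity; returns ([[ids]], [[distances]])
    def cosine(a, b):
        dot = sum(x * y for x, y in zip(a, b))
        norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b))
        return dot / norm if norm else 0.0
    scored = sorted(((i, cosine(vec, query_embedding)) for i, vec in enumerate(index)), key=lambda pair: -pair[1])
    top = scored[:top_k]
    return [[i for i, _ in top]], [[1.0 - score for _, score in top]]

async def call_llm(prompt: str) -> str:
    # Placeholder LLM call; swap in a real client
    return f"Mock answer based on: {prompt[:80]}..."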
# --- Stage 1: Offline Indexing Nodes ---
# 1a. Node to trigger chunking for each file
class TriggerChunkingNode(Node):
async def prep(self, memory):
return memory.files or []
async def exec(self, files: list):
# Optional: could return file count or validate paths
return len(files)
async def post(self, memory, files: list, file_count: int):
print(f"Triggering chunking for {file_count} files.")
memory.all_chunks = [] # Initialize chunk store
memory.chunk_metadata = [] # Store metadata like source file
for index, filepath in enumerate(files):
if os.path.exists(filepath): # Basic check
self.trigger('chunk_file', { "filepath": filepath, "file_index": index })
else:
print(f"Warning: File not found {filepath}")
# Trigger next major step after attempting all files
self.trigger('embed_chunks')
# 1b. Node to chunk a single file
class ChunkFileNode(Node):
async def prep(self, memory):
# Read filepath from local memory
return memory.filepath, memory.file_index
async def exec(self, prep_res):
filepath, file_index = prep_res
print(f"Chunking {filepath}")
try:
with open(filepath, "r", encoding="utf-8") as f:
text = f.read()
# Simple fixed-size chunking
chunks = []
size = 100
for i in range(0, len(text), size):
chunks.append(text[i : i + size])
return chunks, filepath # Pass filepath for metadata
except Exception as e:
print(f"Error chunking {filepath}: {e}")
return [], filepath # Return empty list on error
async def post(self, memory, prep_res, exec_res):
chunks, filepath = exec_res
file_index = prep_res[1]
# Append chunks and their source metadata to global lists
# Note: If using ParallelFlow, direct append might lead to race conditions.
        # Consider storing per-file results and combining them in the next step (see the SafeChunkFileNode sketch below).
start_index = len(memory.all_chunks)
memory.all_chunks.extend(chunks)
for i, chunk in enumerate(chunks):
memory.chunk_metadata.append({"source": filepath, "chunk_index_in_file": i, "global_chunk_index": start_index + i})
# This node doesn't trigger further processing for this specific file branch
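# Note: with ParallelFlow, every file branch appends to the shared memory.all_chunks
# list. A variant like the sketch below (illustrative only, not used by the flow
# wiring later) gives each file its own slot instead, which the next node can then
# flatten; it assumes TriggerChunkingNode pre-allocates
# memory.file_chunks = [None] * len(files).
class SafeChunkFileNode(ChunkFileNode):
    async def post(self, memory, prep_res, exec_res):
        chunks, filepath = exec_res
        file_index = prep_res[1]
        # One writer per slot, so concurrent branches never touch the same container
        memory.file_chunks[file_index] = {"source": filepath, "chunks": chunks}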
# 1c. Node to trigger embedding for each chunk
class TriggerEmbeddingNode(Node):
async def prep(self, memory):
# This node runs after all 'chunk_file' triggers are processed by the Flow
return memory.all_chunks or []
async def exec(self, chunks: list):
return len(chunks)
async def post(self, memory, chunks: list, chunk_count: int):
print(f"Triggering embedding for {chunk_count} chunks.")
memory.all_embeds = [None] * chunk_count # Pre-allocate list for parallel writes
for index, chunk in enumerate(chunks):
# Pass chunk and its global index via forkingData
self.trigger('embed_chunk', { "chunk": chunk, "global_index": index })
# Trigger storing index after all embedding triggers are fired
self.trigger('store_index')
# 1d. Node to embed a single chunk
class EmbedChunkNode(Node):
async def prep(self, memory):
# Read chunk and global index from local memory
return memory.chunk, memory.global_index
async def exec(self, prep_res):
chunk, index = prep_res
# print(f"Embedding chunk {index}") # Can be noisy
return await get_embedding(chunk), index # Pass index through
async def post(self, memory, prep_res, exec_res):
embedding, index = exec_res
# Store embedding at the correct index in the pre-allocated list
memory.all_embeds[index] = embedding
# This node doesn't trigger further processing for this chunk branch
import * as fs from 'fs'
import { Flow, Memory, Node, ParallelFlow } from 'caskada'
// Assume getEmbedding and createIndex/searchIndex are defined elsewhere
declare function getEmbedding(text: string): Promise<number[]>
declare function createIndex(embeddings: number[][]): any // Returns index object
declare function searchIndex(index: any, queryEmbedding: number[], topK: number): [number[][], number[][]] // Returns [[ids]], [[distances]]
declare function callLLM(prompt: string): Promise<string>
// --- Stage 1: Offline Indexing Nodes ---
// 1a. Node to trigger chunking for each file
class TriggerChunkingNode extends Node {
async prep(memory): Promise<string[]> {
return memory.files ?? [] // Expects memory.files = ['doc1.txt', ...]
}
async exec(files: string[]): Promise<number> {
return files.length
}
async post(memory, prepRes: string[], fileCount: number): Promise<void> {
console.log(`Triggering chunking for ${fileCount} files.`)
memory.all_chunks = [] // Initialize chunk store
    prepRes.forEach((filepath, index) => {
      this.trigger('chunk_file', { filepath, index }) // Pass filepath and index via forkingData (becomes local memory)
})
this.trigger('embed_chunks') // Trigger embedding after all files are processed
}
}
// 1b. Node to chunk a single file
class ChunkFileNode extends Node {
async prep(memory): Promise<{ filepath: string; index: number }> {
return { filepath: memory.filepath, index: memory.index } // Read from local memory
}
async exec(prepRes: { filepath: string; index: number }): Promise<string[]> {
console.log(`Chunking ${prepRes.filepath}`)
const text = fs.readFileSync(prepRes.filepath, 'utf-8')
const chunks: string[] = []
const size = 100 // Simple fixed-size chunking
for (let i = 0; i < text.length; i += size) {
chunks.push(text.slice(i, i + size))
}
return chunks
}
async post(memory, prepRes: { index: number }, chunks: string[]): Promise<void> {
// Add chunks to the global list (careful with concurrency if using ParallelFlow)
// A safer parallel approach might store chunks per file then combine later.
memory.all_chunks.push(...chunks)
}
}
// 1c. Node to trigger embedding for each chunk
class TriggerEmbeddingNode extends Node {
async prep(memory): Promise<string[]> {
return memory.all_chunks ?? []
}
async exec(chunks: string[]): Promise<number> {
return chunks.length
}
async post(memory, prepRes: string[], chunkCount: number): Promise<void> {
console.log(`Triggering embedding for ${chunkCount} chunks.`)
    memory.all_embeds = new Array(chunkCount).fill(null) // Pre-allocate so parallel branches can write by index
    prepRes.forEach((chunk, index) => {
      this.trigger('embed_chunk', { chunk, index }) // Pass chunk and its index via forkingData
})
this.trigger('store_index') // Trigger storing after all chunks processed
}
}
// 1d. Node to embed a single chunk
class EmbedChunkNode extends Node {
async prep(memory): Promise<{ chunk: string; index: number }> {
return { chunk: memory.chunk, index: memory.index } // Read from local memory
}
async exec(prepRes: { chunk: string; index: number }): Promise<number[]> {
console.log(`Embedding chunk ${prepRes.index}`)
return await getEmbedding(prepRes.chunk)
}
async post(memory, prepRes: { index: number }, embedding: number[]): Promise<void> {
    // Write into the slot pre-allocated by TriggerEmbeddingNode, so parallel branches never clash
    memory.all_embeds[prepRes.index] = embedding
}
}
# 1e. Node to store the final index
class StoreIndexNode(Node):
async def prep(self, memory):
# Read all embeddings from global memory
# Filter out potential None values if embedding failed for some chunks
embeddings = [emb for emb in (memory.all_embeds or []) if emb is not None]
if len(embeddings) != len(memory.all_embeds or []):
print(f"Warning: Some chunks failed to embed. Indexing {len(embeddings)} embeddings.")
return embeddings
async def exec(self, all_embeds: list):
if not all_embeds:
print("No embeddings to store.")
return None
print(f"Storing index for {len(all_embeds)} embeddings.")
# Create a vector index (implementation depends on library)
index = create_index(all_embeds)
return index
async def post(self, memory, prep_res, index):
# Store the created index in global memory
memory.index = index
if index:
print('Index created and stored.')
else:
print('Index creation skipped.')
# End of offline flow
// 1e. Node to store the final index
class StoreIndexNode extends Node {
async prep(memory): Promise<number[][]> {
// Read all embeddings from global memory
    // Drop any chunks whose embedding failed (slots left null/undefined)
    return (memory.all_embeds ?? []).filter((e: number[] | null) => e != null)
}
async exec(allEmbeds: number[][]): Promise<any> {
console.log(`Storing index for ${allEmbeds.length} embeddings.`)
// Create a vector index (implementation depends on library)
const index = createIndex(allEmbeds)
return index
}
async post(memory, prepRes: any, index: any): Promise<void> {
// Store the created index in global memory
memory.index = index
console.log('Index created and stored.')
}
}
// --- Offline Flow Definition ---
const triggerChunking = new TriggerChunkingNode()
const chunkFile = new ChunkFileNode()
const triggerEmbedding = new TriggerEmbeddingNode()
const embedChunk = new EmbedChunkNode()
const storeIndex = new StoreIndexNode()
// Define transitions
triggerChunking.on('chunk_file', chunkFile)
triggerChunking.on('embed_chunks', triggerEmbedding)
triggerEmbedding.on('embed_chunk', embedChunk)
triggerEmbedding.on('store_index', storeIndex)
// Use ParallelFlow for chunking and embedding if desired
const OfflineFlow = new ParallelFlow(triggerChunking)
// Or sequential: const OfflineFlow = new Flow(triggerChunking);
# --- Offline Flow Execution ---
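# Wiring sketch for the Python offline flow, assuming the caskada Python API mirrors
# the TypeScript .on() transitions shown above:
trigger_chunking = TriggerChunkingNode()
chunk_file = ChunkFileNode()
trigger_embedding = TriggerEmbeddingNode()
embed_chunk = EmbedChunkNode()
store_index = StoreIndexNode()
trigger_chunking.on('chunk_file', chunk_file)
trigger_chunking.on('embed_chunks', trigger_embedding)
trigger_embedding.on('embed_chunk', embed_chunk)
trigger_embedding.on('store_index', store_index)
OfflineFlow = ParallelFlow(trigger_chunking)  # Or Flow(trigger_chunking) for sequential runs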
async def run_offline():
# Create dummy files for example
    for path, text in [('doc1.txt', 'Alice was beginning to get very tired.'), ('doc2.txt', 'The quick brown fox jumps over the lazy dog.')]:
        if not os.path.exists(path):
            with open(path, 'w', encoding='utf-8') as f:
                f.write(text)
initial_memory = {
"files": ["doc1.txt", "doc2.txt"], # Example file paths
}
print('Starting offline indexing flow...')
await OfflineFlow.run(initial_memory)
print('Offline indexing complete.')
# Clean up dummy files
# os.remove('doc1.txt')
# os.remove('doc2.txt')
return initial_memory # Return memory containing index, chunks, embeds
# asyncio.run(run_offline()) # Example call
// --- Offline Flow Execution ---
async function runOffline() {
const initialMemory = {
files: ['doc1.txt', 'doc2.txt'], // Example file paths
}
console.log('Starting offline indexing flow...')
// Create dummy files for example
fs.writeFileSync('doc1.txt', 'Alice was beginning to get very tired.')
fs.writeFileSync('doc2.txt', 'The quick brown fox jumps over the lazy dog.')
await OfflineFlow.run(initialMemory)
console.log('Offline indexing complete.')
// Clean up dummy files
fs.unlinkSync('doc1.txt')
fs.unlinkSync('doc2.txt')
return initialMemory // Return memory containing index, chunks, embeds
}
// runOffline(); // Example call
# 2b. Retrieve Docs Node
class RetrieveDocsNode(Node):
async def prep(self, memory):
# Need query embedding, index, and original chunks
# Also retrieve metadata to know the source
return memory.q_emb, memory.index, memory.all_chunks, memory.chunk_metadata
async def exec(self, inputs):
q_emb, index, chunks, metadata = inputs
if not q_emb or not index or not chunks:
raise ValueError("Missing data for retrieval in memory")
print("Retrieving relevant chunk...")
# Assuming search_index returns [[ids]], [[distances]]
I, D = search_index(index, q_emb, top_k=1)
if not I or not I[0]:
return "Could not find relevant chunk.", None
best_global_id = I[0][0]
if best_global_id >= len(chunks):
return "Index out of bounds.", None
relevant_chunk = chunks[best_global_id]
relevant_metadata = metadata[best_global_id] if metadata and best_global_id < len(metadata) else {}
return relevant_chunk, relevant_metadata
async def post(self, memory, prep_res, exec_res):
relevant_chunk, relevant_metadata = exec_res
memory.retrieved_chunk = relevant_chunk # Write to memory
memory.retrieved_metadata = relevant_metadata # Write metadata too
print(f"Retrieved chunk: {relevant_chunk[:60]}... (Source: {relevant_metadata.get('source', 'N/A')})")
self.trigger('generate_answer')
# 2c. Generate Answer Node
class GenerateAnswerNode(Node):
async def prep(self, memory):
return memory.question, memory.retrieved_chunk # Read from memory
async def exec(self, inputs):
question, chunk = inputs
if not chunk or chunk == "Could not find relevant chunk.":
return "Sorry, I couldn't find relevant information to answer the question."
prompt = f"Using the following context, answer the question.\nContext: {chunk}\nQuestion: {question}\nAnswer:"
print("Generating final answer...")
return await call_llm(prompt)
async def post(self, memory, prep_res, answer):
memory.answer = answer # Write to memory
print("Answer:", answer)
# End of online flow
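# The source metadata stored by RetrieveDocsNode is not used above. A small variant
# like this sketch (illustrative only, not part of the flow wiring) could fold the
# source file into the prompt:
class GenerateAnswerWithSourceNode(GenerateAnswerNode):
    async def prep(self, memory):
        source = (memory.retrieved_metadata or {}).get('source', 'unknown')
        return memory.question, memory.retrieved_chunk, source

    async def exec(self, inputs):
        question, chunk, source = inputs
        if not chunk or chunk == "Could not find relevant chunk.":
            return "Sorry, I couldn't find relevant information to answer the question."
        prompt = f"Using the following context from {source}, answer the question.\nContext: {chunk}\nQuestion: {question}\nAnswer:"
        return await call_llm(prompt)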
// 2c. Generate Answer Node
class GenerateAnswerNode extends Node {
async prep(memory): Promise<{ question: string; chunk: string }> {
return { question: memory.question, chunk: memory.retrieved_chunk }
}
async exec(prepRes: { question: string; chunk: string }): Promise<string> {
const { question, chunk } = prepRes
const prompt = `Using the following context, answer the question.
Context: ${chunk}
Question: ${question}
Answer:`
console.log('Generating final answer...')
return await callLLM(prompt)
}
async post(memory, prepRes: any, answer: string): Promise<void> {
memory.answer = answer // Store final answer
console.log(`Answer: ${answer}`)
// End of flow
}
}
// --- Online Flow Definition ---
const embedQueryNode = new EmbedQueryNode()
const retrieveDocsNode = new RetrieveDocsNode()
const generateAnswerNode = new GenerateAnswerNode()
// Define transitions
embedQueryNode.on('retrieve_docs', retrieveDocsNode)
retrieveDocsNode.on('generate_answer', generateAnswerNode)
const OnlineFlow = new Flow(embedQueryNode)
# --- Online Flow Execution ---
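# Wiring sketch for the Python online flow, again assuming an .on() API that mirrors
# the TypeScript definition above (EmbedQueryNode is the query-embedding node from step 2a):
embed_query_node = EmbedQueryNode()
retrieve_docs_node = RetrieveDocsNode()
generate_answer_node = GenerateAnswerNode()
embed_query_node.on('retrieve_docs', retrieve_docs_node)
retrieve_docs_node.on('generate_answer', generate_answer_node)
OnlineFlow = Flow(embed_query_node)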
async def run_online(memory_from_offline: dict):
# Add the user's question to the memory from the offline stage
memory_from_offline["question"] = "Why do people like cats?"
print(f"\nStarting online RAG flow for question: \"{memory_from_offline['question']}\"")
await OnlineFlow.run(memory_from_offline) # Pass memory object
# final answer in memory_from_offline["answer"]
print("Final Answer:", memory_from_offline.get("answer", "N/A")) # Read from memory
return memory_from_offline
# Example usage combining both stages
async def main():
# Mock external functions if not defined
# global get_embedding, create_index, search_index, call_llm
# get_embedding = ...
# create_index = ...
# search_index = ...
# call_llm = ...
memory_after_offline = await run_offline()
if memory_after_offline.get("index"): # Only run online if index exists
await run_online(memory_after_offline)
else:
print("Skipping online flow due to missing index.")
if __name__ == "__main__":
# Note: Ensure dummy files exist or are created before running
# For simplicity, file creation moved to run_offline
asyncio.run(main())
// --- Online Flow Execution ---
async function runOnline(memoryFromOffline: any) {
// Add the user's question to the memory from the offline stage
memoryFromOffline.question = 'Why do people like cats?'
console.log(`\nStarting online RAG flow for question: "${memoryFromOffline.question}"`)
await OnlineFlow.run(memoryFromOffline)
console.log('Online RAG complete.')
return memoryFromOffline
}
// --- Combined Example ---
async function runFullRAG() {
// Mock external functions for example
globalThis.getEmbedding = async (text: string) => Array(5).fill(Math.random())
globalThis.createIndex = (embeds: number[][]) => ({
search: (q: number[], k: number) => [[Math.floor(Math.random() * embeds.length)]],
}) // Mock index
globalThis.searchIndex = (index: any, q: number[], k: number) => index.search(q, k)
globalThis.callLLM = async (prompt: string) => `Mock LLM answer for: ${prompt.split('\n')[1]}`
const memoryAfterOffline = await runOffline()
const finalMemory = await runOnline(memoryAfterOffline)
console.log('\n--- Full RAG Result ---')
console.log('Final Answer:', finalMemory.answer)
}
runFullRAG().catch(console.error)