# Map Reduce
The MapReduce pattern is a powerful way to process large datasets by breaking a complex task down into two main phases: **Map** (processing individual items) and **Reduce** (aggregating results). BrainyFlow provides the perfect abstractions to implement this pattern efficiently, especially with its `ParallelFlow` for concurrent mapping.
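Before wiring this up with BrainyFlow, a minimal framework-free sketch may help fix the terminology; `summarize` here is a hypothetical stand-in for whatever per-item work the mapper performs:

```python
# Plain-Python illustration of the two phases (no BrainyFlow involved).
def summarize(doc: str) -> str:  # hypothetical per-item work
    return doc[:30] + "..."

documents = {"a.txt": "First document text...", "b.txt": "Second document text..."}

# Map phase: process each item independently (this is what can run concurrently).
mapped = {name: summarize(text) for name, text in documents.items()}

# Reduce phase: aggregate the per-item results into a single output.
combined = "\n".join(f"{name}: {summary}" for name, summary in mapped.items())
print(combined)
```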

## Core Components
- **Trigger Node**:
  - **Role**: Triggers the fan-out to Mapper Nodes from a list of items.
  - **Implementation**: A standard `Node` whose `post` method iterates through the input collection and calls `this.trigger()` (`self.trigger()` in Python) for each item, passing item-specific data as `forkingData`.
- **Mapper Node**:
  - **Role**: Takes a single item from the input collection and transforms it.
  - **Implementation**: A standard `Node` that receives an individual item (often via `forkingData` in its local memory) and performs a specific computation (e.g., summarizing a document, extracting key-value pairs).
  - **Output**: Stores the processed result for that item in the global `Memory` object, typically using a unique key (e.g., `memory.results[item_id] = processed_data`).
- **Reducer Node**:
  - **Role**: Collects and aggregates the results from all Mapper Nodes.
  - **Implementation**: A standard `Node` that reads all individual results from the global `Memory` and combines them into a final output. This node is usually triggered after all Mapper Nodes have completed.
*MapReduce Flow (diagram)*
## Pattern Implementation
We use a `ParallelFlow` for the mapping phase so that the individual items are processed concurrently.
```python
from brainyflow import Node, Flow, ParallelFlow

class Trigger(Node):
    async def prep(self, memory):
        assert hasattr(memory, "items"), "'items' must be set in memory"
        return memory.items

    async def post(self, memory, items, exec_res):
        # Prepare an output container that mirrors the shape of the input collection.
        memory.output = {} if isinstance(items, dict) else [None] * len(items)
        # Fan out: trigger one successor run per item, passing the item via forkingData.
        for index, item in (enumerate(items) if isinstance(items, (list, tuple)) else items.items()):
            self.trigger("default", {"index": index, "item": item})

class Reducer(Node):
    async def prep(self, memory):
        assert hasattr(memory, "index"), "index of processed item must be set in memory"
        assert hasattr(memory, "item"), "processed item must be set in memory"
        return memory.index, memory.item

    async def post(self, memory, prep_res, exec_res):
        # Store each processed item in the shared output container under its index.
        memory.output[prep_res[0]] = prep_res[1]

def mapreduce(iterate: Node | Flow):
    trigger = Trigger()
    reduce = Reducer()
    trigger >> iterate >> reduce
    return ParallelFlow(start=trigger)
```
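As a quick sanity check, here is one hypothetical way to exercise the helper with a trivial mapper node; `UpperCaseNode`, `demo`, and the write-back of results into the passed dict are illustrative assumptions consistent with the full example below, not guaranteed library behavior:

```python
import asyncio
from brainyflow import Node

class UpperCaseNode(Node):
    async def prep(self, memory):
        return memory.item  # the single item forked by the Trigger

    async def exec(self, item):
        return item.upper()

    async def post(self, memory, prep_res, exec_res):
        # Overwrite the local "item" so the Reducer stores the processed value.
        memory.local.item = exec_res

async def demo():
    flow = mapreduce(UpperCaseNode())
    memory = {"items": ["alpha", "beta"]}
    await flow.run(memory)
    print(memory.get("output"))  # expected: ['ALPHA', 'BETA']

asyncio.run(demo())
```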
## Example: Document Summarization
Let's create a flow to summarize multiple text files using the pattern we just created.
For simplicity, the tools/nodes below are deliberately oversimplified mocks. For a more in-depth implementation, check the Resume Qualification (Python) example in our cookbook - more TypeScript examples are coming soon (PRs welcome!).
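The snippets below assume a `call_llm` helper is available. To run the example end to end without a real LLM provider, a minimal mock such as this (a hypothetical placeholder, not part of BrainyFlow) will do:

```python
# Hypothetical stand-in for a real LLM call, just so the example runs end to end.
async def call_llm(prompt: str) -> str:
    return f"[mock LLM response to a {len(prompt)}-character prompt]"
```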
### 1. Define Mapper Node (Summarizer)
This node will summarize a single file.
```python
from brainyflow import Node
# Assume call_llm is defined elsewhere

class SummarizeFileNode(Node):
    async def prep(self, memory):
        # Item data comes from forkingData in local memory:
        # the Trigger forked {"index": filename, "item": content} for each entry in memory.items.
        return {"filename": memory.index, "content": memory.item}

    async def exec(self, prep_res: dict):
        prompt = f"Summarize the following text from {prep_res['filename']}:\n{prep_res['content']}"
        return await call_llm(prompt)

    async def post(self, memory, prep_res: dict, exec_res: str):
        # Store the individual summary in local memory, reusing the "item" key,
        # so the generic Reducer writes it into memory.output under this file's name.
        memory.local.item = exec_res
```
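Note that the summary is written to local memory under the same `item` key that the `Trigger` forked. The generic `Reducer` from the pattern above then reads `memory.index` and `memory.item` from that same local context and stores the summary in the global `memory.output` under the corresponding filename.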
### 2. Define Reducer Node (Aggregator)
This node will combine all individual summaries.
```python
from brainyflow import Node

class AggregateSummariesNode(Node):
    async def prep(self, memory):
        # Collect all individual summaries from the global output container
        # (a dict keyed by filename, filled by the Reducer).
        return list(memory.output.values())

    async def exec(self, summaries: list[str]):
        combined_text = "\n\n".join(summaries)
        prompt = f"Combine the following summaries into one cohesive summary:\n{combined_text}"
        return await call_llm(prompt)

    async def post(self, memory, prep_res: list[str], exec_res: str):
        memory.final_summary = exec_res  # Store the final aggregated summary in global memory
```
### 3. Assemble the Flow
```python
import asyncio
from brainyflow import Flow
# (mapreduce, SummarizeFileNode and AggregateSummariesNode definitions as above)

# Instantiate nodes
summarizer = SummarizeFileNode()
aggregator = AggregateSummariesNode()

# Wrap the summarizer in the mapreduce pattern, then aggregate the collected output
mapreduce_flow = mapreduce(summarizer)
mapreduce_flow >> aggregator
main_flow = Flow(start=mapreduce_flow)

# --- Execution ---
async def main():
    memory = {
        "items": {
            "file1.txt": "Alice was beginning to get very tired of sitting by her sister...",
            "file2.txt": "The quick brown fox jumps over the lazy dog.",
            "file3.txt": "Lorem ipsum dolor sit amet, consectetur adipiscing elit.",
        }
    }
    await main_flow.run(memory)  # Pass memory object

    print("\n--- MapReduce Complete ---")
    print("Individual Summaries:", memory.get("output"))
    print("\nFinal Summary:\n", memory.get("final_summary"))

if __name__ == "__main__":
    asyncio.run(main())
```
This example demonstrates how to implement a MapReduce pattern using BrainyFlow, leveraging `ParallelFlow` for concurrent processing of the map phase and the `Memory` object for collecting results before the reduce phase.