Multi-Agents (Advanced)
Multi-agent systems involve multiple autonomous agents (or sub-flows) that interact and collaborate to achieve a common goal. BrainyFlow's modularity and flexible communication mechanisms make it well-suited for building such systems.
Most of the time, you don't need multi-agent systems. Start with a simple solution first.
Key Concepts
Agent Specialization: Each agent (or sub-flow) can be designed with a specific role or expertise (e.g., a "researcher" agent, a "coder" agent, a "critic" agent).
Communication Channels: Agents need ways to exchange information. This can be achieved through:
  Shared Memory: Agents read from and write to a common Memory object.
  Message Queues: Agents send and receive structured messages through a queue (e.g., asyncio.Queue in Python, a custom AsyncQueue in TypeScript).
  Dedicated Nodes: A "broker" or "router" node can facilitate communication between agents.
Orchestration: A higher-level flow or a "supervisor" agent can manage the interaction and execution order of individual agents.
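The concepts above can be sketched without any framework. The following is a minimal illustration in plain asyncio, not BrainyFlow's API: the names Message, broker, agent, and run_demo are hypothetical, a plain dict stands in for the shared Memory object, and None is assumed as a stop sentinel. It shows a broker routing messages from one inbox to per-agent mailboxes while agents record results in shared state.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Message:
    recipient: str  # name of the agent that should handle this message
    body: str


async def broker(inbox: asyncio.Queue, mailboxes: dict[str, asyncio.Queue]) -> None:
    """Forward each message from the shared inbox to its recipient's mailbox."""
    while True:
        msg = await inbox.get()
        if msg is None:  # assumed sentinel: tell every agent to stop, then exit
            for box in mailboxes.values():
                await box.put(None)
            return
        await mailboxes[msg.recipient].put(msg)


async def agent(name: str, mailbox: asyncio.Queue, shared: dict) -> None:
    """Consume messages from a private mailbox and record results in shared state."""
    while (msg := await mailbox.get()) is not None:
        shared.setdefault(name, []).append(msg.body.upper())


async def run_demo() -> dict:
    inbox: asyncio.Queue = asyncio.Queue()
    mailboxes = {"researcher": asyncio.Queue(), "coder": asyncio.Queue()}
    shared: dict = {}  # stands in for a shared memory object

    for msg in (Message("researcher", "find papers"), Message("coder", "write parser"), None):
        inbox.put_nowait(msg)

    await asyncio.gather(
        broker(inbox, mailboxes),
        *(agent(name, box, shared) for name, box in mailboxes.items()),
    )
    return shared


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```

Giving each agent its own mailbox keeps agents from consuming messages meant for others; a single shared queue is simpler but only works when any agent can handle any message.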
Example: Simple Message-Passing Agents
Let's create a simple multi-agent system where agents communicate via a shared message queue.
For simplicity, these are deliberately simplified mock tools/nodes. For more in-depth implementations, see our cookbook: Multi-Agent Taboo Game (Python) or Agent with A2A Protocol (Python). More TypeScript examples are coming soon (PRs welcome!).
1. Define Agent Node
This node represents an individual agent that processes messages from a queue.
import asyncio
from typing import Optional

from brainyflow import Node

class AgentNode(Node):
    async def prep(self, memory):
        # We'll store the queue in global memory for simplicity here,
        # though in real apps, dependency injection might be better.
        message_queue = memory.message_queue
        if message_queue.empty():
            print("AgentNode: Queue is empty, waiting...")
            # In a real app, you might want a timeout or a more sophisticated wait
            await asyncio.sleep(1)  # Small delay to prevent busy-waiting
            return None  # No message to process yet
        message = await message_queue.get()
        print(f"AgentNode: Received message: {message}")
        return message

    async def exec(self, message: Optional[str]):
        if message is None:
            return None  # Queue was empty in prep; nothing to process
        # Simulate processing the message
        processed_message = f"Processed: {message.upper()}"
        return processed_message

    async def post(self, memory, prep_res: Optional[str], exec_res: Optional[str]):
        if prep_res:  # Only store a result if a message was processed
            memory.last_processed_message = exec_res
            print(f"AgentNode: Stored processed message: {exec_res}")
            self.trigger("default")  # Continue processing
        else:
            # If no message was processed, re-trigger self to check queue again
            self.trigger("check_queue")
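The prep step above polls with empty() followed by a fixed sleep, and its comment mentions that a timeout may be preferable. One way to sketch that, using only the standard library, is to wait on the queue itself with asyncio.wait_for. The helper name next_message and its default timeout are assumptions for illustration, not part of BrainyFlow.

```python
import asyncio
from typing import Optional


async def next_message(queue: asyncio.Queue, timeout: float = 1.0) -> Optional[str]:
    """Wait up to `timeout` seconds for a message; return None if none arrives."""
    try:
        # Suspends until a message is available or the timeout expires
        return await asyncio.wait_for(queue.get(), timeout=timeout)
    except asyncio.TimeoutError:
        return None  # No message arrived in time
```

Compared with polling, this returns as soon as a message arrives instead of sleeping for the full interval, while still yielding None so the caller can decide whether to check again.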
2. Define Message Sender (External Process)
This function simulates an external system sending messages to the queue.
import asyncio

async def send_system_messages(queue: asyncio.Queue):
    messages = ["Hello Agent 1", "Task A", "Task B", "Shutdown"]
    for msg in messages:
        await asyncio.sleep(0.5)  # Simulate delay
        await queue.put(msg)
        print(f"System: Sent message: {msg}")
        if msg == "Shutdown":
            break  # Stop sending once the shutdown message is queued
3. Assemble the Flow and Run
We create a flow where the AgentNode loops back to itself to continuously check the message queue.
import asyncio
from brainyflow import Flow, Memory

# Instantiate agent node
agent_node = AgentNode()

# Agent loops back to itself to keep checking the queue
agent_node >> agent_node  # After processing, check again
agent_node - "check_queue" >> agent_node  # If queue was empty, check again

flow = Flow(start=agent_node)

async def main():
    message_queue = asyncio.Queue()
    # Pass queue via initial memory object
    memory_obj = Memory(global_store={"message_queue": message_queue})

    print("Starting agent listener and message sender...")
    # Run both coroutines
    # Note: This will run indefinitely without a termination mechanism
    await asyncio.gather(
        flow.run(memory_obj),
        send_system_messages(message_queue)
    )

if __name__ == "__main__":
    asyncio.run(main())
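The note in the code above points out that nothing stops the loop. A common termination mechanism is a sentinel message. The sketch below shows the idea in plain asyncio rather than as a BrainyFlow node: agent_loop, sender, and run_until_shutdown are hypothetical names, and treating "Shutdown" as the sentinel is an assumption based on the last message the sender emits. How to wire this into the looping flow depends on BrainyFlow's action semantics and is not shown here.

```python
import asyncio

SHUTDOWN = "Shutdown"  # assumed sentinel, matching the sender's final message


async def agent_loop(queue: asyncio.Queue) -> list[str]:
    """Process messages until the shutdown sentinel arrives."""
    processed = []
    while True:
        message = await queue.get()
        if message == SHUTDOWN:
            break  # Stop consuming; the sentinel itself is not processed
        processed.append(f"Processed: {message.upper()}")
    return processed


async def sender(queue: asyncio.Queue) -> None:
    """Queue a few tasks, then the sentinel."""
    for msg in ["Hello Agent 1", "Task A", "Task B", SHUTDOWN]:
        await queue.put(msg)


async def run_until_shutdown() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    # gather returns results in argument order; the sender returns None
    processed, _ = await asyncio.gather(agent_loop(queue), sender(queue))
    return processed


if __name__ == "__main__":
    print(asyncio.run(run_until_shutdown()))
```

With several consumers on one queue, each consumer needs its own sentinel (or the sentinel must be re-queued), otherwise only the first agent to read it will stop.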
This example demonstrates a basic multi-agent setup using a shared message queue and a looping flow.
This pattern demonstrates several key advantages:
Specialization: Each agent can focus on a specific task
Independence: Agents can operate on different schedules or priorities
Coordination: Agents can collaborate through shared memory and queues
Flexibility: Easy to add new agents or modify existing ones
Scalability: The system can grow to include many specialized agents
More complex multi-agent systems can be built by introducing:
Supervisor Flows: A main flow that orchestrates multiple sub-flows (agents).
Tool Calling: Agents can use tools to interact with external systems or other agents.
Shared Global State: Beyond message queues, agents can update a shared global memory for broader coordination.
Dynamic Agent Creation: Agents could dynamically create and manage other agents based on task requirements.
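As a rough illustration of the supervisor idea, here is a framework-independent sketch in plain asyncio. It is not BrainyFlow's API: the agents are mock coroutines, and the names researcher, coder, critic, and supervisor are hypothetical (the roles echo the examples under Agent Specialization). The supervisor runs the agents in order, passes each one the previous output, and records every result in a shared dict.

```python
import asyncio
from typing import Awaitable, Callable

# An agent is modeled here as any coroutine function from text to text
Agent = Callable[[str], Awaitable[str]]


async def researcher(task: str) -> str:
    return f"notes on {task}"


async def coder(notes: str) -> str:
    return f"code based on {notes}"


async def critic(code: str) -> str:
    return f"review of {code}"


async def supervisor(task: str, pipeline: list[Agent]) -> dict[str, str]:
    """Run each agent in order, feeding it the previous agent's output."""
    shared: dict[str, str] = {"task": task}  # stands in for shared global state
    current = task
    for agent in pipeline:
        current = await agent(current)
        shared[agent.__name__] = current  # record each agent's contribution
    return shared


if __name__ == "__main__":
    print(asyncio.run(supervisor("sorting", [researcher, coder, critic])))
```

A fixed pipeline is the simplest form of orchestration; a supervisor could instead pick the next agent based on the shared state, which is where dynamic agent creation would come in.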