Multi-Agents (Advanced)

Multi-agent systems involve multiple autonomous agents (or sub-flows) that interact and collaborate to achieve a common goal. BrainyFlow's modularity and flexible communication mechanisms make it well-suited for building such systems.

Key Concepts

  1. Agent Specialization: Each agent (or sub-flow) can be designed with a specific role or expertise (e.g., a "researcher" agent, a "coder" agent, a "critic" agent).

  2. Communication Channels: Agents need ways to exchange information. This can be achieved through:

    • Shared Memory: Agents read from and write to a common Memory object.

    • Message Queues: Agents send and receive structured messages through a queue (e.g., asyncio.Queue in Python, custom AsyncQueue in TypeScript).

    • Dedicated Nodes: A "broker" or "router" node can facilitate communication between agents.

  3. Orchestration: A higher-level flow or a "supervisor" agent can manage the interaction and execution order of individual agents.

Example: Simple Message-Passing Agents

Let's create a simple multi-agent system where agents communicate via a shared message queue.

For simplicity, these will be overly-simplified mock tools/nodes. For a more in-depth implementation, check the implementations in our cookbook for Multi-Agent Taboo Game (Python) or Agent with A2A Protocol (Python) - more TypeScript examples coming soon (PRs welcome!).

1. Define Agent Node

This node represents an individual agent that processes messages from a queue.

import asyncio
from brainyflow import Node

class AgentNode(Node):
    async def prep(self, memory):
        # We'll store the queue in global memory for simplicity here,
        # though in real apps, dependency injection might be better.
        message_queue = memory.message_queue
        if message_queue.empty():
            print("AgentNode: Queue is empty, waiting...")
            # In a real app, you might want a timeout or a more sophisticated wait
            await asyncio.sleep(1) # Small delay to prevent busy-waiting
            return None # No message to process yet

        message = await message_queue.get()
        print(f"AgentNode: Received message: {message}")
        return message

    async def exec(self, message: str):
        # Simulate processing the message
        processed_message = f"Processed: {message.upper()}"
        return processed_message

    async def post(self, memory, prep_res: str, exec_res: str):
        if prep_res: # Only trigger if a message was processed
            memory.last_processed_message = exec_res
            print(f"AgentNode: Stored processed message: {exec_res}")
            self.trigger("default") # Continue processing
        else:
            # If no message was processed, re-trigger self to check queue again
            self.trigger("check_queue")

2. Define Message Sender (External Process)

This function simulates an external system sending messages to the queue.

import asyncio

async def send_system_messages(queue: asyncio.Queue):
    messages = ["Hello Agent 1", "Task A", "Task B", "Shutdown"]
    for i, msg in enumerate(messages):
        await asyncio.sleep(0.5) # Simulate delay
        await queue.put(msg)
        print(f"System: Sent message: {msg}")
        if msg == "Shutdown":
            break

3. Assemble the Flow and Run

We create a flow where the AgentNode loops back to itself to continuously check the message queue.

import asyncio
from brainyflow import Flow, Memory

# Instantiate agent node
agent_node = AgentNode()

# Agent loops back to itself to keep checking the queue
agent_node >> agent_node # After processing, check again
agent_node - "check_queue" >> agent_node # If queue was empty, check again

flow = Flow(start=agent_node)

async def main():
    message_queue = asyncio.Queue()
    # Pass queue via initial memory object
    memory_obj = Memory(global_store={"message_queue": message_queue})

    print("Starting agent listener and message sender...")
    # Run both coroutines
    # Note: This will run indefinitely without a termination mechanism
    await asyncio.gather(
        flow.run(memory_obj),
        send_system_messages(message_queue)
    )

if __name__ == "__main__":
    asyncio.run(main())

This example demonstrates a basic multi-agent setup using a shared message queue and a looping flow.

This pattern demonstrates several key advantages:

  1. Specialization: Each agent can focus on a specific task

  2. Independence: Agents can operate on different schedules or priorities

  3. Coordination: Agents can collaborate through shared memory and queues

  4. Flexibility: Easy to add new agents or modify existing ones

  5. Scalability: The system can grow to include many specialized agents

More complex multi-agent systems can be built by introducing:

  • Supervisor Flows: A main flow that orchestrates multiple sub-flows (agents).

  • Tool Calling: Agents can use tools to interact with external systems or other agents.

  • Shared Global State: Beyond message queues, agents can update a shared global memory for broader coordination.

  • Dynamic Agent Creation: Agents could dynamically create and manage other agents based on task requirements.

Last updated