Multi-Agents (Advanced)
Multi-agent systems involve multiple autonomous agents (or sub-flows) that interact and collaborate to achieve a common goal. Caskada's modularity and flexible communication mechanisms make it well-suited for building such systems.
Most of the time, you don't need multi-agents. Start with a simple solution first.
Key Concepts
Agent Specialization: Each agent (or sub-flow) can be designed with a specific role or expertise (e.g., a "researcher" agent, a "coder" agent, a "critic" agent).
Communication Channels: Agents need ways to exchange information. This can be achieved through:
Shared Memory: Agents read from and write to a common Memory object.
Message Queues: Agents send and receive structured messages through a queue (e.g., asyncio.Queue in Python, a custom AsyncQueue in TypeScript).
Dedicated Nodes: A "broker" or "router" node can facilitate communication between agents (sketched after this list).
Orchestration: A higher-level flow or a "supervisor" agent can manage the interaction and execution order of individual agents.
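To make the "Dedicated Nodes" and "Orchestration" ideas concrete, here is a minimal sketch of a router node that inspects a request and hands it off to a specialist agent. The RouterNode class, the request/routed_request memory fields, and the "research"/"code" action names are illustrative choices, not part of Caskada's API; only the Node base class, the prep/exec/post lifecycle, and self.trigger() used in the example further down are assumed.

```python
from caskada import Node


class RouterNode(Node):
    """Reads the latest request from shared memory and routes it to a specialist agent."""

    async def prep(self, memory):
        return memory.request  # e.g. "research the history of transformers"

    async def exec(self, request: str):
        # Naive keyword routing; a real system might ask an LLM to classify the request.
        return "research" if "research" in request.lower() else "code"

    async def post(self, memory, prep_res: str, exec_res: str):
        memory.routed_request = prep_res
        # Each action name would be wired to a different specialist agent (or sub-flow)
        # when the surrounding flow is assembled.
        self.trigger(exec_res)
```

When the flow is assembled, the "research" action would lead to a researcher agent and "code" to a coder agent.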
Example: Simple Message-Passing Agents
Let's create a simple multi-agent system where agents communicate via a shared message queue.
These are deliberately over-simplified mock tools/nodes. For a more in-depth implementation, check the implementations in our cookbook for Multi-Agent Taboo Game (Python) or Agent with A2A Protocol (Python) - more TypeScript examples coming soon (PRs welcome!).
1. Define Agent Node
This node represents an individual agent that processes messages from a queue.
```python
import asyncio

from caskada import Node


class AgentNode(Node):
    async def prep(self, memory):
        # We'll store the queue in global memory for simplicity here,
        # though in real apps, dependency injection might be better.
        message_queue = memory.message_queue
        if message_queue.empty():
            print("AgentNode: Queue is empty, waiting...")
            # In a real app, you might want a timeout or a more sophisticated wait
            await asyncio.sleep(1)  # Small delay to prevent busy-waiting
            return None  # No message to process yet
        message = await message_queue.get()
        print(f"AgentNode: Received message: {message}")
        return message

    async def exec(self, message: str | None):
        if message is None:
            return None  # Nothing to process this cycle
        # Simulate processing the message
        processed_message = f"Processed: {message.upper()}"
        return processed_message

    async def post(self, memory, prep_res: str, exec_res: str):
        if prep_res:  # Only trigger the default path if a message was processed
            memory.last_processed_message = exec_res
            print(f"AgentNode: Stored processed message: {exec_res}")
            self.trigger("default")  # Continue processing
        else:
            # If no message was processed, re-trigger self to check the queue again
            self.trigger("check_queue")
```

2. Define Message Sender (External Process)
This function simulates an external system sending messages to the queue.
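A minimal sketch of such a sender, using only asyncio; the send_messages name and the sample messages are illustrative:

```python
import asyncio


async def send_messages(message_queue: asyncio.Queue):
    """Simulates an external system pushing messages onto the shared queue."""
    for msg in ["hello", "how are you?", "goodbye"]:
        await message_queue.put(msg)
        print(f"Sender: Sent message: {msg}")
        await asyncio.sleep(0.5)  # Space messages out so the agent can keep up
```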
3. Assemble the Flow and Run
We create a flow where the AgentNode loops back to itself to continuously check the message queue.
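A sketch of the assembly is shown below. It assumes the action/successor wiring operators used by similar flow frameworks (agent - "action" >> next_node), a Flow(start=...) constructor, and that flow.run() accepts a plain dict as the initial shared store (with attribute access inside nodes, as AgentNode uses above); check the Caskada API for the exact syntax. Because the agent keeps re-triggering itself, the demo is bounded with a timeout.

```python
import asyncio

from caskada import Flow  # assumed import; verify against the Caskada docs


async def main():
    memory = {"message_queue": asyncio.Queue()}  # shared store for the whole flow

    agent = AgentNode()
    # Loop the agent back to itself on both actions so it keeps polling the queue.
    agent - "default" >> agent
    agent - "check_queue" >> agent
    flow = Flow(start=agent)

    sender = asyncio.create_task(send_messages(memory["message_queue"]))
    try:
        # The agent never stops triggering itself, so bound the demo with a timeout.
        await asyncio.wait_for(flow.run(memory), timeout=5)
    except asyncio.TimeoutError:
        pass
    await sender
    print(f"Last processed message: {memory.get('last_processed_message')}")


if __name__ == "__main__":
    asyncio.run(main())
```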
This example demonstrates a basic multi-agent setup using a shared message queue and a looping flow. The pattern offers several key advantages:
Specialization: Each agent can focus on a specific task.
Independence: Agents can operate on different schedules or priorities.
Coordination: Agents can collaborate through shared memory and queues.
Flexibility: Easy to add new agents or modify existing ones.
Scalability: The system can grow to include many specialized agents.
More complex multi-agent systems can be built by introducing:
Supervisor Flows: A main flow that orchestrates multiple sub-flows (agents); see the sketch after this list.
Tool Calling: Agents can use tools to interact with external systems or other agents.
Shared Global State: Beyond message queues, agents can update a shared global memory for broader coordination.
Dynamic Agent Creation: Agents could dynamically create and manage other agents based on task requirements.
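For instance, a supervisor flow could look roughly like the sketch below, assuming that (as in similar flow frameworks) a Flow can itself be used as a step inside a parent flow and chained with >>; the node names and memory fields are hypothetical, so verify flow nesting against the Caskada docs.

```python
from caskada import Flow, Node  # nesting flows as steps is assumed here


class ResearcherNode(Node):
    async def exec(self, _):
        return "research notes..."  # stand-in for an LLM-backed research step

    async def post(self, memory, prep_res, exec_res):
        memory.research = exec_res
        self.trigger("default")


class CoderNode(Node):
    async def prep(self, memory):
        return memory.research  # builds on the researcher's output via shared memory

    async def exec(self, research: str):
        return f"code based on: {research}"

    async def post(self, memory, prep_res, exec_res):
        memory.code = exec_res
        self.trigger("default")


# Each specialist agent is wrapped in its own sub-flow...
research_flow = Flow(start=ResearcherNode())
coding_flow = Flow(start=CoderNode())

# ...and the supervisor flow chains the sub-flows, which share one memory store.
research_flow >> coding_flow
supervisor = Flow(start=research_flow)

# await supervisor.run({"goal": "summarize recent multi-agent papers"})
```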