Testing
Effective testing and debugging are essential for building reliable applications. This guide covers strategies for testing and debugging complex flows, and monitoring applications in production.
Testing Approaches
BrainyFlow supports multiple testing approaches to ensure your applications work correctly:
Unit Testing (Nodes)
Individual nodes can be tested in isolation to verify their behavior:
import unittest
from unittest.mock import AsyncMock, patch
from brainyflow import Node
class TestSummarizeNode(unittest.TestCase):
async def test_summarize_node(self):
# Create the node
summarize_node = SummarizeNode()
# Create a mock shared store
memory = {"text": "This is a long text that needs to be summarized."}
# Mock the LLM call
with patch('utils.call_llm', new_callable=AsyncMock) as mock_llm:
mock_llm.return_value = "Short summary."
# Run the node
await summarize_node.run(memory)
# Verify the node called the LLM with the right prompt
mock_llm.assert_called_once()
call_args = mock_llm.call_args[0][0]
self.assertIn("summarize", call_args.lower())
# Verify the result was stored correctly
self.assertEqual(memory.summary, "Short summary.") # Access memory object
if __name__ == "__main__":
# Use asyncio.run for async tests if needed, or run within an existing loop
# For simplicity, assuming standard unittest runner handles async test cases
unittest.main()
import { describe, expect, it, vi } from 'vitest'
import { SummarizeNode } from './SummarizeNode' // Your Node implementation
import { callLLM } from './utils/callLLM' // Your LLM utility
// Mock the LLM utility
vi.mock('./utils/callLLM', () => ({
callLLM: vi.fn().mockResolvedValue('Short summary.'),
}))
describe('SummarizeNode', () => {
it('should summarize text correctly', async () => {
// Create the node instance
const summarizeNode = new SummarizeNode()
// Create initial global memory state
const memory = { text: 'This is a long text that needs to be summarized.' }
// Run the node's lifecycle (prep → exec → post)
await summarizeNode.run(memory) // Pass memory object
// Verify the LLM call
expect(callLLM).toHaveBeenCalledTimes(1)
const callArgs = vi.mocked(callLLM).mock.calls[0][0] // Get the first argument of the first call
expect(callArgs.toLowerCase()).toContain('summarize') // Check if prompt contains 'summarize'
// Verify the result was stored correctly in the global memory object
expect(memory.summary).toBe('Short summary.') // Access memory object
})
})
Integration Testing (Flows)
Test complete flows to verify that nodes work together correctly:
import unittest
from unittest.mock import AsyncMock, patch
from brainyflow import Flow
class TestQuestionAnsweringFlow(unittest.TestCase):
async def test_qa_flow(self):
# Create the flow
qa_flow = create_qa_flow()
# Create a mock shared store
memory = {"question": "What is the capital of France?"}
# Mock all LLM calls
with patch('utils.call_llm', new_callable=AsyncMock) as mock_llm:
# Configure the mock to return different values for different prompts
def mock_llm_side_effect(prompt):
if "search" in prompt.lower():
return "Paris is the capital of France."
elif "answer" in prompt.lower():
return "The capital of France is Paris."
return "Unexpected prompt"
mock_llm.side_effect = mock_llm_side_effect
# Run the flow
await qa_flow.run(memory)
# Verify the final answer
self.assertEqual(memory.answer, "The capital of France is Paris.") # Access memory object
# Verify the LLM was called the expected number of times
self.assertEqual(mock_llm.call_count, 2)
if __name__ == '__main__':
# Use asyncio.run for async tests if needed
unittest.main()
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { createQaFlow } from './qaFlow' // Your function that creates the Flow
import { callLLM } from './utils/callLLM' // Your LLM utility
// Mock the LLM utility
vi.mock('./utils/callLLM', () => ({
callLLM: vi.fn(),
}))
describe('Question Answering Flow', () => {
beforeEach(() => {
// Clear any previous mock calls before each test
vi.clearAllMocks()
})
it('should generate an answer using the flow', async () => {
// Configure mock to return different values based on the prompt
vi.mocked(callLLM).mockImplementation((prompt: string) => {
// Simulate different stages of a potential QA flow (e.g., search vs. answer)
if (prompt.toLowerCase().includes('search')) {
return Promise.resolve('Paris is the capital of France.')
} else if (prompt.toLowerCase().includes('answer')) {
return Promise.resolve('The capital of France is Paris.')
}
return Promise.resolve('Unexpected prompt')
})
// Create the flow
const qaFlow = createQaFlow()
// Create initial memory state
const memory = { question: 'What is the capital of France?' }
// Run the flow
await qaFlow.run(memory) // Pass memory object
// Verify the final answer
expect(memory.answer).toBe('The capital of France is Paris.') // Access memory object
// Verify the LLM was called the expected number of times
expect(callLLM).toHaveBeenCalledTimes(2)
// Verify the calls were made with appropriate prompts
const calls = vi.mocked(callLLM).mock.calls
const retrieveCall = calls.some(
(call) => typeof call === 'string' && call.toLowerCase().includes('retrieve'),
)
const generateCall = calls.some(
(call) => typeof call === 'string' && call.toLowerCase().includes('generate'),
)
expect(retrieveCall).toBe(true)
expect(generateCall).toBe(true)
})
})
// Example testing a MapReduce flow (Trigger, Processor, Reducer)
describe('MapReduce Flow Test', () => {
// Mock the nodes used in the MapReduce example
class TriggerNode extends Node<Memory, any, ['process_item','reduce']> {
async post(memory: Memory, prepRes: any, execRes: any): Promise<void> {
const items = memory.items || []
memory.results = [] // Initialize results
items.forEach((item: any, index: number) => {
this.trigger('process_item', { item, index })
})
this.trigger('reduce')
}
}
const ProcessorNode = class extends Node {
async prep(memory: Memory): Promise<any> { return { item: memory.item, index: memory.index }; }
async exec(prepRes: { item: any, index: number }): Promise<string> { return `Processed ${prepRes.item}`; }
async post(memory: Memory, prepRes: { item: any, index: number }, execRes: string): Promise<void> {
if (!memory.results) memory.results = [];
// Store result at the correct index if possible, or just push
memory.results[prepRes.index] = execRes;
}
}
const ReducerNode = class extends Node {
async prep(memory: Memory): Promise<any[]> { return memory.results || []; }
async exec(results: any[]): Promise<string> { return `Combined: ${results.join(', ')}`; }
async post(memory: Memory, prepRes: any, execRes: string): Promise<void> { memory.final_result = execRes; }
}
it('should process items via map and reduce steps', async () => {
// Instantiate nodes
const trigger = new TriggerNode()
const processor = new ProcessorNode()
const reducer = new ReducerNode()
// Connect nodes
trigger.on('process_item', processor)
trigger.on('reduce', reducer) // This action is triggered after all 'process_item'
// Use ParallelFlow for the map phase
const mapReduceFlow = new ParallelFlow(trigger)
// Initial memory
const memory = { items: ['A', 'B', 'C'] }
// Run the flow
await mapReduceFlow.run(memory)
// Verify final result in memory
expect(memory.results).toEqual(['Processed A', 'Processed B', 'Processed C'])
expect(memory.final_result).toBe('Combined: Processed A, Processed B, Processed C')
})
})
</div>
</div>
## Testing Approaches
### Unit Testing Individual Nodes
1. **Isolate Dependencies**: Mock external services and LLM calls
2. **Test Each Lifecycle Method**: Verify `prep`, `exec`, and `post` individually
3. **Test Error Handling**: Ensure `exec_fallback` works as expected
4. **Verify Memory Updates**: Check if memory is modified correctly
5. **Test Triggers**: Ensure the right actions are triggered
### Integration Testing Flows
1. **Mock External Services**: Keep tests deterministic by mocking APIs
2. **Verify End-to-End Behavior**: Test the entire flow from start to finish
3. **Test Branching Logic**: Ensure different paths work correctly
4. **Check Final Memory State**: Verify that the memory contains expected results
5. **Test Error Handling**: Make sure flows handle errors gracefully
### Testing Strategies
#### Testing LLM-Based Nodes
For nodes that call LLMs, you can use these approaches:
1. **Canned Responses**: Prepare fixed responses for specific prompts.
2. **Prompt Verification**: Check if prompts contain expected information.
3. **Response Validation**: Test if the node correctly handles various LLM responses.
<div data-gb-custom-block data-tag="tabs">
<div data-gb-custom-block data-tag="tab" data-title='Python (unittest.mock)'>
```python
from unittest.mock import patch, AsyncMock
import asyncio
# Mock LLM with canned responses based on prompt content
async def mock_llm_logic(prompt: str) -> str:
if "summarize" in prompt.lower():
return "This is a summary."
elif "extract" in prompt.lower():
# Simulate returning a JSON-like string
return '{"key": "value"}'
else:
return "Default response"
# Example usage in a test
async def test_node_with_mocked_llm():
# Assume MyLlmNode calls utils.call_llm internally
# node = MyLlmNode()
# memory = Memory({"input": "some text to summarize"})
# Use patch to replace the actual call_llm
with patch('utils.call_llm', new=AsyncMock(side_effect=mock_llm_logic)) as mock_call:
# await node.run(memory) # Run the node that uses the LLM
pass # Replace pass with actual node execution
# Assertions can be made here on memory state or mock calls
# mock_call.assert_called_once()
# assert memory.summary == "This is a summary."
# asyncio.run(test_node_with_mocked_llm())
import { describe, expect, it, vi } from 'vitest'
import { callLLM } from './utils/callLLM' // Your LLM utility
// import { MyLlmNode } from './MyLlmNode'; // Your Node implementation
// import { Memory } from 'brainyflow'; // Assuming Memory is imported if needed
// Mock the LLM utility module
vi.mock('./utils/callLLM', () => ({
callLLM: vi.fn(), // Create a mock function
}))
describe('Testing LLM Nodes', () => {
it('should use canned responses based on prompt', async () => {
// Configure the mock implementation
vi.mocked(callLLM).mockImplementation(async (prompt: string): Promise<string> => {
if (prompt.toLowerCase().includes('summarize')) {
return 'This is a summary.'
} else if (prompt.toLowerCase().includes('extract')) {
return JSON.stringify({ key: 'value' }) // Return JSON string
} else {
return 'Default response'
}
})
// const node = new MyLlmNode();
// const memory = { input: 'some text to summarize' }; // Initial memory state
// await node.run(memory); // Run the node
// Add assertions here
// expect(callLLM).toHaveBeenCalled();
// expect(memory.summary).toBe('This is a summary.');
})
})
Testing Retry Logic
To test retry behavior:
Simulate Transient Failures: Make the mock function fail a few times before succeeding.
Check Retry Count: Verify that retries happened the expected number of times (e.g., by checking
node.cur_retry
inside the mock or tracking calls).Test Backoff: If using
wait
, mockasyncio.sleep
(Python) orsetTimeout
(TypeScript) to verify delays without actually waiting.
from unittest.mock import patch, AsyncMock
import asyncio
# from brainyflow import Node # Assuming Node is imported
# Mock function that fails twice, then succeeds
call_count_retry = 0
async def mock_fails_then_succeeds(*args, **kwargs):
global call_count_retry
call_count_retry += 1
print(f"Mock called (Attempt {call_count_retry})") # For debugging test
if call_count_retry <= 2:
raise ValueError("Temporary network failure")
return "Success on third try"
# Example Node (conceptual)
# class NodeWithRetry(Node):
# def __init__(self):
# super().__init__(max_retries=3, wait=0.1) # Retry up to 3 times (4 attempts total)
# async def exec(self, prep_res):
# # This method calls the function we will mock
# return await some_external_call(prep_res)
async def test_retry_logic():
global call_count_retry
call_count_retry = 0 # Reset counter for test
# node = NodeWithRetry()
# memory = Memory({})
# Patch the external call made within node.exec
# Also patch asyncio.sleep to avoid actual waiting
with patch('__main__.some_external_call', new=AsyncMock(side_effect=mock_fails_then_succeeds)), \
patch('asyncio.sleep', new=AsyncMock()) as mock_sleep:
# await node.run(memory) # Run the node
pass # Replace pass with actual node execution
# Assertions
# assert call_count_retry == 3 # Should be called 3 times (1 initial + 2 retries)
# assert memory.result == "Success on third try" # Check final result
# assert mock_sleep.call_count == 2 # Check if sleep was called between retries
# asyncio.run(test_retry_logic())
import { beforeEach, describe, expect, it, vi } from 'vitest'
// import { Node, Memory } from 'brainyflow'; // Assuming imports
// import { someExternalCall } from './utils/externalCall'; // The function called by exec
// Mock the external call module
vi.mock('./utils/externalCall', () => ({
someExternalCall: vi.fn(),
}))
// Mock setTimeout used for 'wait' (if applicable)
vi.useFakeTimers()
// Example Node (conceptual)
// class NodeWithRetry extends Node<any, any, [], any, string> {
// constructor() {
// super({ maxRetries: 3, wait: 100 }); // Retry up to 3 times, wait 100ms
// }
// async exec(prepRes: any): Promise<string> {
// // This method calls the function we will mock
// return await someExternalCall(prepRes);
// }
// }
describe('Retry Logic Testing', () => {
let callCountRetry = 0
beforeEach(() => {
callCountRetry = 0 // Reset counter
vi.clearAllMocks() // Clear mock history
vi.clearAllTimers() // Clear pending timers
})
it('should retry exec on failure and succeed eventually', async () => {
// Configure the mock to fail twice, then succeed
vi.mocked(someExternalCall).mockImplementation(async () => {
callCountRetry++
console.log(`Mock called (Attempt ${callCountRetry})`) // For debugging test
if (callCountRetry <= 2) {
throw new Error('Temporary network failure')
}
return 'Success on third try'
})
// const node = new NodeWithRetry();
// const memory = {}; // Initial memory
// await node.run(memory); // Run the node
// Advance timers to simulate waiting (if wait > 0)
// vi.advanceTimersByTime(100); // Advance by wait time
// await Promise.resolve(); // Allow promises to settle after timer advance
// vi.advanceTimersByTime(100); // Advance for second wait
// await Promise.resolve();
// Assertions
// expect(callCountRetry).toBe(3); // Called 3 times
// expect(memory.result).toBe('Success on third try'); // Check final result
// expect(vi.getTimerCount()).toBe(0); // Ensure all timers were cleared/run
})
})
Test Fixtures and Helpers
Creating helper functions can make tests more readable and maintainable.
# Example helpers (can be placed in a conftest.py for pytest or a base class for unittest)
# from brainyflow import Memory, Node # Assuming imports
def create_default_test_memory() -> dict:
"""Creates a standard dictionary for test memory."""
return {"input": "test data", "config": {"setting": "value"}}
async def run_node_with_memory(node: Node, initial_memory: dict | None = None) -> dict:
"""Runs a node with provided or default initial memory."""
memory_obj = initial_memory if initial_memory is not None else create_default_test_memory()
# Assuming node.run modifies the dictionary in place or returns it
await node.run(memory_obj)
return memory_obj
def assert_memory_contains(memory: dict, expected_data: dict):
"""Asserts that the memory dictionary contains the expected key-value pairs."""
for key, value in expected_data.items():
assert key in memory, f"Memory missing key: {key}"
assert memory[key] == value, f"Memory value mismatch for key '{key}': expected {value}, got {memory[key]}"
# Example usage in a test
# async def test_my_node_output():
# node = MyProcessingNode()
# final_memory = await run_node_with_memory(node)
# assert_memory_contains(final_memory, {"output": "processed data", "status": "completed"})
import { expect } from 'vitest'
// import { Node, Memory } from 'brainyflow'; // Assuming imports
// Define a type for your standard test memory if desired
interface TestMemory {
input?: string
config?: { setting: string }
output?: any
status?: string
[key: string]: any // Allow other properties
}
export function createDefaultTestMemory(): TestMemory {
/** Creates a standard object for test memory. */
return { input: 'test data', config: { setting: 'value' } }
}
export async function runNodeWithMemory(node: Node, initialMemory?: TestMemory): Promise<TestMemory> {
/** Runs a node with provided or default initial memory. */
const memory = initialMemory ?? createDefaultTestMemory()
// Assumes node.run modifies the object in place
await node.run(memory)
return memory
}
export function assertMemoryContains(memory: TestMemory, expectedData: Partial<TestMemory>): void {
/** Asserts that the memory object contains the expected key-value pairs. */
for (const key in expectedData) {
expect(memory).toHaveProperty(key)
expect(memory[key]).toEqual(expectedData[key])
}
}
// Example usage in a test
/*
import { MyProcessingNode } from './MyProcessingNode';
import { runNodeWithMemory, assertMemoryContains } from './testHelpers';
it('should produce correct output in memory', async () => {
const node = new MyProcessingNode();
const finalMemory = await runNodeWithMemory(node);
assertMemoryContains(finalMemory, { output: "processed data", status: "completed" });
});
*/
Common Testing Patterns
1. Input Validation Testing
Test that nodes properly handle invalid or unexpected inputs.
# Requires: pip install pytest pytest-asyncio
import pytest
# from brainyflow import Node, Memory # Assuming imports
# from my_nodes import MyNodeThatValidates # Your node
@pytest.mark.parametrize("invalid_input", [None, "", {}, [], {"wrong_key": 1}])
@pytest.mark.asyncio
async def test_node_handles_invalid_input(invalid_input):
"""Tests if the node handles various invalid inputs gracefully."""
node = MyNodeThatValidates() # Node that should validate memory.input_data
memory = {"input_data": invalid_input} # Pass invalid data
# Expect the node to run without unhandled exceptions
# and potentially set an error state or default output
await node.run(memory)
# Example assertions: Check for an error flag or a specific state
assert memory.get("error_message") is not None or memory.get("status") == "validation_failed"
# Or assert that a default value was set
# assert memory.get("output") == "default_value"
import { describe, expect, it } from 'vitest'
// import { MyNodeThatValidates } from './MyNodeThatValidates'; // Your node
// import { Memory } from 'brainyflow'; // Assuming imports
describe('Input Validation', () => {
const invalidInputs = [null, undefined, '', {}, [], { wrongKey: 1 }]
it.each(invalidInputs)('should handle invalid input: %s', async (invalidInput) => {
/** Tests if the node handles various invalid inputs gracefully. */
// const node = new MyNodeThatValidates(); // Node that should validate memory.input_data
const memory: Record<string, any> = { input_data: invalidInput } // Pass invalid data
// Expect the node to run without unhandled exceptions
// Use try/catch if specific errors are expected, otherwise just run
await node.run(memory)
// Example assertions: Check for an error flag or a specific state
expect(memory.error_message || memory.status).toBeDefined() // Check if either is set
expect(memory.status === 'validation_failed' || memory.error_message).toBeTruthy()
// Or assert that a default value was set
// expect(memory.output).toBe('default_value');
})
})
2. Flow Path Testing
Test that flows follow the expected paths based on node triggers.
import asyncio
# from brainyflow import Node, Flow, Memory # Assuming imports
async def test_flow_follows_correct_path():
"""Tests if the flow executes nodes in the expected sequence."""
visited_nodes_log = []
# Define simple tracking nodes
class SimpleTrackingNode(Node):
def __init__(self, name: str, trigger_action: str = "default"):
super().__init__()
self._node_name = name
self._trigger_action = trigger_action
async def exec(self, prep_res):
# No real work, just track visit
visited_nodes_log.append(self._node_name)
return f"Processed by {self._node_name}" # Return something for post
async def post(self, memory, prep_res, exec_res):
# Trigger the specified action
self.trigger(self._trigger_action)
# Create nodes for a simple path: A -> B -> C
node_a = SimpleTrackingNode("A", trigger_action="next_step")
node_b = SimpleTrackingNode("B", trigger_action="finish")
node_c = SimpleTrackingNode("C") # This node shouldn't be reached
# Connect nodes based on actions
node_a.on("next_step", node_b)
node_b.on("finish", node_c) # Connect C, but B will trigger 'finish'
# Create and run the flow
flow = Flow(start=node_a)
await flow.run({}) # Pass empty memory
# Verify the execution path
assert visited_nodes_log == ["A", "B"], f"Expected A->B, but got: {visited_nodes_log}"
# asyncio.run(test_flow_follows_correct_path())
import { describe, expect, it } from 'vitest'
// import { Node, Flow, Memory, BaseNode } from 'brainyflow'; // Assuming imports
describe('Flow Path Testing', () => {
it('should follow the correct path based on triggers', async () => {
/** Tests if the flow executes nodes in the expected sequence. */
const visitedNodesLog: string[] = []
// Define simple tracking nodes
class SimpleTrackingNode extends Node<any, any, ['next_step', 'finish']> {
private nodeName: string
private triggerAction: 'next_step' | 'finish' | 'default'
constructor(name: string, triggerAction: 'next_step' | 'finish' | 'default' = 'default') {
super()
this.nodeName = name
this.triggerAction = triggerAction
}
async exec(prepRes: any): Promise<string> {
// No real work, just track visit
visitedNodesLog.push(this.nodeName)
return `Processed by ${this.nodeName}` // Return something for post
}
async post(memory: Memory, prepRes: any, execRes: string): Promise<void> {
// Trigger the specified action
this.trigger(this.triggerAction)
}
}
// Create nodes for a path: A -> B -> C (where B triggers 'finish')
const nodeA = new SimpleTrackingNode('A', 'next_step')
const nodeB = new SimpleTrackingNode('B', 'finish')
const nodeC = new SimpleTrackingNode('C') // This node shouldn't be reached
// Connect nodes based on actions
nodeA.on('next_step', nodeB)
nodeB.on('finish', nodeC) // Connect C, but B will trigger 'finish'
// Create and run the flow
const flow = new Flow(nodeA)
await flow.run({}) // Pass empty memory
// Verify the execution path
expect(visitedNodesLog).toEqual(['A', 'B'])
})
})
Best Practices
Testing Best Practices
Test Each Node Individually: Verify that each node performs its specific task correctly
Test Flows as Integration Tests: Ensure nodes work together as expected
Mock External Dependencies: Use mocks for LLMs, APIs, and databases to ensure consistent testing
Test Error Handling: Explicitly test how your application handles failures
Automate Tests: Include BrainyFlow tests in your CI/CD pipeline
Debugging Best Practices
Start Simple: Begin with a minimal flow and add complexity incrementally
Visualize Your Flow: Generate flow diagrams to understand the structure
Isolate Issues: Test individual nodes to narrow down problems
Check Shared Store: Verify that data is correctly passed between nodes
Monitor Actions: Ensure nodes are returning the expected actions
Monitoring Best Practices
Monitor Node Performance: Track execution time for each node
Watch for Bottlenecks: Identify nodes that take longer than expected
Track Error Rates: Monitor how often nodes and flows fail
Set Up Alerts: Configure alerts for critical failures
Log Judiciously: Log important events without overwhelming storage
Implement Distributed Tracing: Use tracing for complex, distributed applications
By applying these testing techniques, you can ensure your BrainyFlow applications are reliable and maintainable.
Last updated