# Migrating from PocketFlow
BrainyFlow originated as a fork of PocketFlow, aiming to refine its core concepts, enhance type safety (in both language ports), and improve the developer experience for building agentic systems. If you have an existing PocketFlow application, migrating to BrainyFlow involves several key changes. This guide focuses on migrating from typical PocketFlow patterns to the modern BrainyFlow (v2.0+).
## Key Conceptual Differences & Changes
- **`Async*` Classes Removed:** BrainyFlow's `Node` and `Flow` are inherently async-capable in both Python and TypeScript. All `AsyncNode`, `AsyncFlow`, etc., from PocketFlow are removed. Simply make your `prep`, `exec`, and `post` methods `async` and use `await` where appropriate.
- **`Memory` Object:** BrainyFlow's `Memory` object is more central and refined. PocketFlow's `Params` concept is absorbed into the `Memory` object's local store, typically populated via `forkingData`.
- **Triggering Actions:** In `post`, instead of `return "action_name"`, you must use `self.trigger("action_name", forking_data={...})` (Python) or `this.trigger("action_name", { ... })` (TypeScript).
- **Batch Processing (`*BatchNode` / `*BatchFlow` Removal):** PocketFlow's specialized batch classes are removed. BrainyFlow handles batching via a "fan-out" pattern: a standard `Node` calls `trigger` multiple times in its `post` method, each call typically including item-specific `forkingData`. This is then orchestrated by a `Flow` (for sequential batching) or a `ParallelFlow` (for concurrent batching).
- **`Flow.run()` Result:** Returns a structured `ExecutionTree` detailing the execution path, rather than a simple dictionary of results.
## Why Async?
The move to async brings several benefits:
- **Improved performance:** Asynchronous code can handle I/O-bound operations more efficiently.
- **Better concurrency:** Easier to implement parallel processing patterns.
- **Simplified codebase:** No need for separate sync and async implementations.
- **Modern Python:** Aligns with Python's direction for handling concurrent operations.
## Migration Steps

### Step 1: Update Imports and Dependencies
- Replace all `from pocketflow import ...` with `from brainyflow import ...` (Python) or `import { ... } from 'brainyflow'` (TypeScript).
- Update your `requirements.txt` or `package.json` to use `brainyflow`.
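In Python, the change typically looks like the sketch below; the exact set of exported names is an assumption, so check your installed version:

```python
# Before (PocketFlow)
# from pocketflow import Node, Flow, AsyncNode, AsyncFlow, BatchNode

# After (BrainyFlow) -- Node, Flow, and ParallelFlow are assumed exports
from brainyflow import Node, Flow, ParallelFlow
import asyncio  # needed to run async flows from your entry point
```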
### Step 2: Convert to Async and Update Method Signatures
- Add `async` before `def` for your `prep`, `exec`, `post`, and `exec_fallback` methods in Nodes and Flows.
- Remove any `_async` suffix from the method names.
- Add `await` before any calls to these methods, `run()` methods, `asyncio.sleep()`, or other async library functions.
Node Example (Before):
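The original example code is not reproduced here, so the following is a minimal sketch of a typical PocketFlow-style node; `SummarizeNode` and the `call_llm` helper are illustrative placeholders:

```python
from pocketflow import Node

class SummarizeNode(Node):
    def prep(self, shared):
        # Read input from the shared store
        return shared["text"]

    def exec(self, text):
        # call_llm is an illustrative synchronous helper
        return call_llm(f"Summarize: {text}")

    def post(self, shared, prep_res, exec_res):
        shared["summary"] = exec_res
        return "default"  # next action returned as a string
```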
Node Example (After):
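The same node migrated to BrainyFlow, as a sketch under the assumptions above (async methods, attribute-style `memory` access, and `trigger` instead of a returned action):

```python
from brainyflow import Node

class SummarizeNode(Node):
    async def prep(self, memory):
        # Read input from the memory object
        return memory.text

    async def exec(self, text):
        # call_llm is assumed to be async here
        return await call_llm(f"Summarize: {text}")

    async def post(self, memory, prep_res, exec_res):
        memory.summary = exec_res
        self.trigger("default")  # trigger replaces the returned action string
```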
Step 3: Use .trigger()
for next actions
.trigger()
for next actionsIn all Node
subclasses, within the post
method:
- Replace any `return "action_name"` statements with `self.trigger("action_name")` (Python) or `this.trigger("action_name")` (TypeScript).
- If you were passing data to the next node's local context (PocketFlow's `params`), pass this data as the second argument to `trigger` (the `forkingData` object). Example: `self.trigger("process_item", {"item": current_item})`.
- If `post` simply completed without returning an action (implying the default), you can either explicitly call `self.trigger("default")` or rely on the implicit default trigger if no `trigger` calls are made.
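A short sketch of passing item-specific data into the next node's local memory via `forkingData`; the node and field names are illustrative:

```python
from brainyflow import Node

class RouteOrderNode(Node):
    async def post(self, memory, prep_res, exec_res):
        # Write the shared result to global memory
        memory.order_status = exec_res["status"]
        # Trigger the successor, seeding its *local* memory via forking_data
        self.trigger("handle_order", forking_data={"order_id": exec_res["id"]})
```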
### Step 4: Update Batch Processing Implementation (`*BatchNode` / `*BatchFlow` Removal)

BrainyFlow removes all specialized `BatchNode` and `BatchFlow` classes. Batch functionality is now achieved using standard `Node`s and `Flow`s combined with the "fan-out/fan-in" trigger pattern (a sketch follows the pattern description below):
**Fan-Out (Map) Phase:**

- **The Prepare/Trigger Node** (replaces the `prep` part of a `BatchNode`):
  - Use the `prep` method to fetch the list of items to process, as usual.
  - Use the `post` method to iterate through these items. For each item, call `self.trigger(action, forking_data={"item": current_item, "index": i, ...})`. The `forkingData` dictionary passes item-specific data into the local memory of the triggered successor. (The `action` name can be anything you choose, as long as you connect the nodes accordingly in the flow; e.g. `process_one`, `default`.)
  - This node might also initialize an aggregate result structure in the global memory (e.g., `memory.batch_results = {}`).
- **The Item Processor Node** (replaces the `exec_one` part of a `BatchNode`):
  - Its `prep` method reads the specific item data (e.g., `memory.item`, `memory.index`) from its local memory (which was populated by `forkingData` from the trigger node).
  - The logic previously in the `exec_one` method of the `BatchNode` now lives in this node's `exec` method.
  - Its `post` method typically writes the individual item's result back to the global memory, often using an index or unique key (e.g., `memory.batch_results[prep_res.index] = exec_res.item_result`).

**Aggregation (Optional Fan-In Node):** If you need to aggregate results after all items are processed (consider the MapReduce pattern), you might have the Item Processor Node also trigger an "aggregation_pending" action, with a final node conditioned on all items being done (e.g., via a counter in global memory or by checking the length of results). Alternatively, the Prepare/Trigger Node itself can emit a separate trigger for an aggregation step after it has fanned out all items.
**Choose the Right Flow:**

- Wrap the `TriggerNode` and `ProcessorNode` in a standard `brainyflow.Flow` if you need items processed sequentially.
- Wrap them in a `ParallelFlow` if you need items processed concurrently.
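A condensed sketch of this pattern under the assumptions above; `do_work` is a placeholder, and the wiring operators plus the `ParallelFlow(start=...)` constructor are assumed to follow the PocketFlow-style API, so adjust them to your BrainyFlow version:

```python
from brainyflow import Node, ParallelFlow

class TriggerNode(Node):
    """Fan-out: emits one trigger per item (replaces the prep side of a BatchNode)."""

    async def prep(self, memory):
        return memory.items  # fetch the list of items, as usual

    async def post(self, memory, prep_res, exec_res):
        memory.batch_results = {}  # aggregate structure in global memory
        for i, item in enumerate(prep_res):
            # Each trigger seeds the successor's *local* memory via forking_data
            self.trigger("process_one", forking_data={"item": item, "index": i})

class ProcessorNode(Node):
    """Processes a single item (replaces exec_one of a BatchNode)."""

    async def prep(self, memory):
        return memory.item, memory.index  # item data placed here by forkingData

    async def exec(self, prep_res):
        item, _ = prep_res
        return do_work(item)  # do_work is an illustrative placeholder

    async def post(self, memory, prep_res, exec_res):
        _, index = prep_res
        memory.batch_results[index] = exec_res  # write the result back globally

# Wiring and construction shown in PocketFlow-style syntax (an assumption):
trigger, processor = TriggerNode(), ProcessorNode()
trigger - "process_one" >> processor
flow = ParallelFlow(start=trigger)  # use Flow(start=trigger) for sequential processing
```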
**Rename All Classes:**

- Replace `AsyncParallelBatchFlow` with `ParallelFlow`.
- Replace `AsyncParallelBatchNode`, `ParallelBatchNode`, `AsyncBatchNode`, and `BatchNode` with the standard `Node`.
- Replace `AsyncBatchFlow` and `BatchFlow` with `brainyflow.Flow`.
- Remember to make the `prep`, `exec`, and `post` methods `async` as per Step 2.
### Step 5: Python `NodeError` Protocol

If you were catching `NodeError` exceptions, note that in Python it is now a `typing.Protocol`. This means you typically catch the underlying error (e.g., `ValueError`) and then check whether it conforms to `NodeError` via `isinstance(error, NodeError)` if you need to access `error.retry_count`. For TypeScript, it remains an `Error` subtype.
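A brief sketch of that check, assuming `NodeError` is importable from `brainyflow` and runtime-checkable; the surrounding function is illustrative:

```python
from brainyflow import NodeError  # assumed export

async def run_with_error_handling(flow, memory):
    try:
        await flow.run(memory)
    except ValueError as error:  # catch the concrete exception your node raises
        if isinstance(error, NodeError):
            # The protocol exposes retry metadata from the failing node
            print(f"Node failed after {error.retry_count} retries")
        raise
```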
### Step 6: Run with `asyncio`

Ensure your main application entry point uses `asyncio.run()` (Python) or `async` functions / `Promise.all()` (TypeScript) to execute your flows.
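A minimal Python entry point as a sketch; the `Flow(start=...)` constructor and the way initial data is passed to `Memory` are assumptions, and `SummarizeNode` refers back to the Step 2 example:

```python
import asyncio
from brainyflow import Flow, Memory  # Memory export assumed; adjust to your version

async def main():
    flow = Flow(start=SummarizeNode())       # constructor signature assumed
    memory = Memory({"text": "Some input"})  # initial global store (illustrative)
    await flow.run(memory)                   # returns a structured ExecutionTree
    print(memory.summary)

if __name__ == "__main__":
    asyncio.run(main())
```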
## Summary of Key Migration Points
- Updating imports to `brainyflow` and adding `import asyncio`.
- Adding `async` to your Node/Flow method definitions (`prep`, `exec`, `post`, `exec_fallback`) and removing any `_async` suffix from the method names.
- Replacing any `return action` in `post()` with `self.trigger(action, forking_data={...})` (Python) or `this.trigger(action, { ... })` (TypeScript).
- Using `await` when calling `run()` methods and any other asynchronous operations within your methods.
- Refactoring `BatchNode`/`BatchFlow` usage to the fan-out pattern using standard `Node`s orchestrated by `Flow` or `ParallelFlow`.
- Running your main execution logic within an `async def main()` function called by `asyncio.run()`.
This transition enables you to leverage the performance and concurrency benefits of asynchronous programming in your workflows.