# Migrating from PocketFlow
Caskada originated as a fork of PocketFlow, aiming to refine its core concepts, enhance type safety (in both language ports), and improve the developer experience for building agentic systems. If you have an existing PocketFlow application, migrating to Caskada involves several key changes. This guide focuses on migrating from typical PocketFlow patterns to the modern Caskada (v2.0+).
## Key Conceptual Differences & Changes
- **`Async*` Classes Removed**: Caskada's `Node` and `Flow` are inherently async-capable in both Python and TypeScript. All `AsyncNode`, `AsyncFlow`, etc., from PocketFlow are removed. Simply make your `prep`, `exec`, and `post` methods `async` and use `await` where appropriate.
- **`Memory` Object**: Caskada's `Memory` object is more central and refined. PocketFlow's `Params` concept is absorbed into the `Memory` object's local store, typically populated via `forkingData`.
- **Triggering Actions**: In `post`, instead of `return "action_name"`, you must use `self.trigger("action_name", forking_data={...})` (Python) or `this.trigger("action_name", { ... })` (TypeScript).
- **Batch Processing (`*BatchNode`/`*BatchFlow` Removal)**: PocketFlow's specialized batch classes are removed. Caskada handles batching via a "fan-out" pattern: a standard `Node` calls `trigger` multiple times in its `post` method, each call typically including item-specific `forkingData`. This is then orchestrated by a `Flow` (for sequential batching) or a `ParallelFlow` (for concurrent batching).
- **`Flow.run()` Result**: Returns a structured `ExecutionTree` detailing the execution path, rather than a simple dictionary of results.
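For instance, data that a PocketFlow node passed to its successors via `params` now travels through `forking_data` and is read back from the triggered node's local memory. A minimal sketch, using hypothetical node and field names:

```python
from caskada import Node

class Dispatcher(Node):
    async def post(self, memory, prep_res, exec_res):
        # PocketFlow carried this kind of per-branch data in params;
        # in Caskada it is passed as forking_data and lands in the
        # successor's local memory store.
        self.trigger("handle", forking_data={"user_id": exec_res})

class Handler(Node):
    async def prep(self, memory):
        # Reads the value the trigger above placed into local memory.
        return memory.user_id
```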
## Why Async?
The move to async brings several benefits:
- **Improved performance**: Asynchronous code can handle I/O-bound operations more efficiently.
- **Better concurrency**: Easier to implement parallel processing patterns.
- **Simplified codebase**: No need for separate sync and async implementations.
- **Modern Python**: Aligns with Python's direction for handling concurrent operations.
## Migration Steps
### Step 1: Update Imports and Dependencies
Replace all `from pocketflow import ...` with `from caskada import ...` (Python) or `import { ... } from 'caskada'` (TypeScript).
```python
# Before
from pocketflow import Node, Flow, BatchNode  # ... etc

# After
import asyncio
from caskada import Node, Flow  # ... etc
```

Update your `requirements.txt` or `package.json` to use `caskada`.
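As an illustration, the dependency swap might look like this (the version specifier is illustrative; pin whichever Caskada release you actually target):

```
# requirements.txt (before)
pocketflow

# requirements.txt (after)
caskada>=2.0
```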
### Step 2: Convert to Async and Update Method Signatures
- Add `async` before `def` for your `prep`, `exec`, `post`, and `exec_fallback` methods in Nodes and Flows.
- Remove any `_async` suffix from the method names.
- Add `await` before any calls to these methods, `run()` methods, `asyncio.sleep()`, or other async library functions.
**Node Example (Before):**

```python
class MyNode(Node):
    def prep(self, shared):
        # Preparation logic
        return some_data

    def exec(self, prep_res):
        # Execution logic
        return result

    def post(self, shared, prep_res, exec_res):
        # Post-processing logic
        return action

    def exec_fallback(self, prep_res, exc):
        # Handle exception
        return fallback_result
```

**Node Example (After):**
```python
class MyNode(Node):
    # Prefer using 'memory' parameter name for consistency
    async def prep(self, memory):
        # Preparation logic
        # If you call other async functions here, use await
        return some_data

    async def exec(self, prep_res):
        # Execution logic
        # If you call other async functions here, use await
        result = await some_async_task(prep_res)
        return result

    async def post(self, memory, prep_res, exec_res):
        # Post-processing logic
        # If you call other async functions here, use await
        memory.result = exec_res  # Write to memory (global store)
        self.trigger(action)  # Use trigger instead of returning action string

    async def exec_fallback(self, prep_res, exc):
        # Handle exception
        # If you call other async functions here, use await
        return fallback_result
```

### Step 3: Use `.trigger()` for Next Actions
In all `Node` subclasses, within the `post` method:

- Replace any `return "action_name"` statements with `self.trigger("action_name")` (Python) or `this.trigger("action_name")` (TypeScript).
- If you were passing data to the next node's local context (PocketFlow's `params`), pass this data as the second argument to `trigger` (the `forkingData` object). Example: `self.trigger("process_item", forking_data={"item": current_item})`.
- If `post` simply completed without returning an action (implying default), you can either explicitly call `self.trigger("default")` or rely on the implicit default trigger if no `trigger` calls are made.

A short sketch of this change follows.
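Here is a minimal sketch of a `post` method after migration (the `ReviewRouter` node and its action names are hypothetical):

```python
from caskada import Node

class ReviewRouter(Node):
    async def post(self, memory, prep_res, exec_res):
        if exec_res["needs_review"]:
            self.trigger("review")    # was: return "review"
        else:
            self.trigger("default")   # was: return "default" (or a bare return)
        # Making no trigger() call at all would fall back to the implicit default.
```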
### Step 4: Update Batch Processing Implementation (`*BatchNode` / `*BatchFlow` Removal)

Caskada removes all specialized `BatchNode` and `BatchFlow` classes. Batch functionality is now achieved using standard `Node`s and `Flow`s combined with the "fan-out/fan-in" trigger pattern:
**Fan-Out (Map) Phase:**

Every `BatchNode` needs to be replaced by a fan-out flow. You can either implement a MapReduce pattern for it, or split the node into two, a Trigger Node and a Processor Node (a sketch follows this list):

- **The Prepare/Trigger Node** (replaces the `prep` part of a `BatchNode`):
  - Use the `prep` method to fetch the list of items to process, as usual.
  - Use the `post` method to iterate through these items. For each item, call `self.trigger(action, forking_data={"item": current_item, "index": i, ...})`. The `forkingData` dictionary passes item-specific data into the local memory of the triggered successor. (The `action` name can be any of your choice, as long as you connect the nodes accordingly in the flow; e.g. `process_one`, `default`.)
  - This node might also initialize an aggregate result structure in the global memory (e.g., `memory.batch_results = {}`).
- **The Item Processor Node** (replaces the `exec_one` part of a `BatchNode`):
  - Its `prep` method reads the specific item data (e.g., `memory.item`, `memory.index`) from its local memory (which was populated by `forkingData` from the trigger node).
  - The logic previously in the `exec_one` method of the `BatchNode` should now live in this node's `exec` method.
  - Its `post` method typically writes the individual item's result back to the global memory, often using an index or unique key (e.g., `memory.batch_results[prep_res.index] = exec_res.item_result`).
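Here is a minimal sketch of the split described above; the class names, the `process_one` action, and the toy items are hypothetical, and results are keyed by index as suggested:

```python
from caskada import Node

class TriggerItemsNode(Node):
    async def prep(self, memory):
        return ["a", "b", "c"]  # fetch the list of items to process

    async def post(self, memory, prep_res, exec_res):
        memory.batch_results = {}  # aggregate structure in the global store
        for i, item in enumerate(prep_res):
            # One trigger per item; forking_data is placed in the
            # triggered successor's local memory.
            self.trigger("process_one", forking_data={"item": item, "index": i})

class ProcessItemNode(Node):
    async def prep(self, memory):
        # Item-specific data arrives via local memory.
        return {"item": memory.item, "index": memory.index}

    async def exec(self, prep_res):
        # The logic formerly in the BatchNode's per-item exec goes here.
        return prep_res["item"].upper()

    async def post(self, memory, prep_res, exec_res):
        # Write the individual result back to the global store.
        memory.batch_results[prep_res["index"]] = exec_res
```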
**Aggregation (Optional Fan-In Node):** If you need to aggregate results after all items are processed (a full MapReduce pattern is usually the better fit), you might have the Item Processor Node also trigger an "aggregation_pending" action, with another (final) node conditioned on all items being done (e.g., via a counter in global memory or by checking the length of results). Alternatively, the Prepare/Trigger Node itself might have a separate trigger for an aggregation step after it has fanned out all items. A minimal sketch of the length-check approach follows.
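A hedged sketch of that length-check gate, assuming the Trigger Node also stored `memory.total_items` up front and the Item Processor triggers `"aggregation_pending"` after writing each result:

```python
from caskada import Node

class AggregateResultsNode(Node):
    async def prep(self, memory):
        return dict(memory.batch_results), memory.total_items

    async def exec(self, prep_res):
        results, total = prep_res
        if len(results) < total:
            return None  # some items are still in flight; do nothing yet
        # All results are in: combine them (illustrative aggregation).
        return [results[i] for i in sorted(results)]

    async def post(self, memory, prep_res, exec_res):
        if exec_res is not None:
            memory.final_result = exec_res
            self.trigger("done")  # hypothetical follow-up action
```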
**Choose the Right Flow:**

- Wrap the Trigger Node and Processor Node in a standard `caskada.Flow` if you need items processed sequentially.
- Wrap them in a `ParallelFlow` if you need items processed concurrently (see the wiring sketch below).
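Wiring the sketch nodes together might then look like the following. Note that the successor-wiring operators and the `Flow(start=...)` constructor shown here are assumed to carry over from PocketFlow; check the Caskada API reference for the exact syntax:

```python
from caskada import ParallelFlow  # or Flow, for sequential processing

trigger = TriggerItemsNode()
processor = ProcessItemNode()

# Assumed PocketFlow-style wiring: route the "process_one" action to the processor.
trigger - "process_one" >> processor

batch_flow = ParallelFlow(start=trigger)  # swap in Flow(start=trigger) to process items sequentially
```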
**Rename All Classes:**

- Replace `AsyncParallelBatchFlow` with `ParallelFlow`.
- Replace `AsyncParallelBatchNode`, `ParallelBatchNode`, `AsyncBatchNode`, and `BatchNode` with the standard `Node`.
- Replace `AsyncBatchFlow` and `BatchFlow` with `caskada.Flow`.
- Remember to make the `prep`, `exec`, and `post` methods `async` as per Step 2.
### Step 5: Python `NodeError` Protocol

If you were catching `NodeError` exceptions, note that in Python it is now a `typing.Protocol`. This means you would typically catch the underlying error (e.g., `ValueError`) and then check whether it conforms to `NodeError` via `isinstance(error, NodeError)` if you need to access `error.retry_count`. For TypeScript, it remains an `Error` subtype.
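A hedged Python sketch of that check inside `exec_fallback` (the `NodeError` import path is assumed):

```python
from caskada import Node, NodeError  # NodeError import path assumed

class MyNode(Node):
    async def exec_fallback(self, prep_res, exc):
        # exc is the underlying error (e.g. ValueError); check whether it
        # conforms to the NodeError protocol before reading retry metadata.
        if isinstance(exc, NodeError):
            print(f"Giving up after {exc.retry_count} retries")
        return None
```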
### Step 6: Run with `asyncio`

Ensure your main application entry point uses `asyncio.run()` (Python) or `async` functions / `Promise.all()` (TypeScript) to execute your flows.
```python
import asyncio

async def main():
    # ... setup your Caskada nodes/flows ...
    memory = {}
    result = await my_flow.run(memory)  # Use await and pass the memory object
    print(result)
    print(memory)

if __name__ == "__main__":
    asyncio.run(main())
```

## Summary of Key Migration Points
- Updating imports to `caskada` and adding `import asyncio`.
- Adding `async` to your Node/Flow method definitions (`prep`, `exec`, `post`, `exec_fallback`) and removing any `_async` suffix from the method names.
- Replacing any `return action` in `post()` with `self.trigger(action, forking_data={...})` (Python) or `this.trigger(action, { ... })` (TypeScript).
- Using `await` when calling `run()` methods and any other asynchronous operations within your methods.
- Refactoring `BatchNode`/`BatchFlow` usage to the fan-out pattern using standard `Node`s orchestrated by a `Flow` or `ParallelFlow`.
- Running your main execution logic within an `async def main()` function called by `asyncio.run()`.
This transition enables you to leverage the performance and concurrency benefits of asynchronous programming in your workflows.