Throttling
Concurrency Control Patterns
import asyncio
from caskada import Node, Memory # Assuming imports
class LimitedParallelNode(Node):
    """Node that processes a batch of items concurrently under a fixed cap.

    ``prep`` reads the items from memory, ``exec`` runs
    ``process_one_item`` for each of them with at most
    ``concurrency_limit`` calls in flight at once, and ``post`` stores the
    results back on memory and triggers the ``'default'`` action.

    Subclasses normally override ``process_one_item`` with the real work.
    """

    def __init__(self, concurrency_limit: int = 3, **kwargs):
        """Create the node.

        Args:
            concurrency_limit: Maximum number of items processed at the same
                time. Must be a positive integer.
            **kwargs: Forwarded unchanged to the parent ``Node`` constructor.

        Raises:
            ValueError: If ``concurrency_limit`` is zero or negative.
        """
        super().__init__(**kwargs)
        if concurrency_limit <= 0:
            raise ValueError("Concurrency limit must be positive")
        # Keep the configured limit separately: the semaphore's internal
        # counter is private and drops while permits are held, so it cannot
        # be used to report the limit.
        self._concurrency_limit = concurrency_limit
        self._semaphore = asyncio.Semaphore(concurrency_limit)
        print(f"Node initialized with concurrency limit: {concurrency_limit}")

    async def prep(self, memory):
        """Fetch the items to process from ``memory.items_to_process``.

        Returns:
            The stored items, or an empty list when the attribute is falsy
            (for example ``None`` or an empty list).
        """
        items = memory.items_to_process or []
        print(f"Prep: Found {len(items)} items to process.")
        return items

    async def exec(self, items: list):
        """Process every item concurrently, bounded by the semaphore.

        Args:
            items: The value returned by ``prep``.

        Returns:
            A list of results in the same order as ``items``; an empty list
            when there is nothing to process.

        Note:
            ``asyncio.gather`` is used with its default settings, so the
            first exception raised by ``process_one_item`` propagates to the
            caller.
        """
        if not items:
            print("Exec: No items to process.")
            return []

        async def limited_task_runner(item):
            # The semaphore admits at most `concurrency_limit` runners into
            # this section at a time; the rest wait here.
            async with self._semaphore:
                print(f" Starting processing item: {item}")
                result = await self.process_one_item(item)
                print(f" Finished processing item: {item} -> {result}")
                return result

        print(
            f"Exec: Starting processing of {len(items)} items "
            f"with limit {self._concurrency_limit}..."
        )
        tasks = [limited_task_runner(item) for item in items]
        results = await asyncio.gather(*tasks)
        print("Exec: All items processed.")
        return results

    async def process_one_item(self, item):
        """Process a single item; intended to be overridden by subclasses.

        This default is only an example: it waits 0.5 seconds to simulate
        asynchronous work and returns ``"Processed_<item>"``.
        """
        await asyncio.sleep(0.5)  # Simulate async work
        return f"Processed_{item}"

    async def post(self, memory, prep_res: list, exec_res: list):
        """Store the results on memory and trigger the next step.

        Args:
            memory: Object on which ``processed_results`` is set.
            prep_res: The value returned by ``prep`` (unused here).
            exec_res: The value returned by ``exec``.
        """
        print(f"Post: Storing {len(exec_res)} results.")
        memory.processed_results = exec_res
        self.trigger('default')  # Trigger next node


# Rate Limiting with Window Limits
Throttler Utility
Advanced Throttling Patterns
1. Token Bucket Rate Limiter
2. Sliding Window Rate Limiter
Best Practices
Integration with Caskada
Throttled LLM Node
Linking to Related Concepts
Last updated