# Throttling

Effective rate limiting is crucial when working with external APIs and services. This guide covers patterns for implementing throttling in Caskada applications.

This is particularly important when:

- Calling external APIs with rate limits
- Managing expensive operations (like LLM calls)
- Preventing system overload from too many parallel requests

## Concurrency Control Patterns

These patterns limit the number of concurrent operations within a node.
```python
import asyncio

from caskada import Node  # Assuming Caskada's Node base class


class LimitedParallelNode(Node):
    def __init__(self, concurrency_limit: int = 3, **kwargs):  # Allow passing other Node args
        super().__init__(**kwargs)  # Call parent constructor
        if concurrency_limit <= 0:
            raise ValueError("Concurrency limit must be positive")
        self._concurrency_limit = concurrency_limit
        self._semaphore = asyncio.Semaphore(concurrency_limit)
        print(f"Node initialized with concurrency limit: {concurrency_limit}")

    async def prep(self, memory):
        # Prep fetches the items to process from memory
        items = memory.items_to_process or []
        print(f"Prep: Found {len(items)} items to process.")
        return items

    async def exec(self, items: list):  # exec receives the result from prep
        if not items:
            print("Exec: No items to process.")
            return []

        async def limited_task_runner(item):
            async with self._semaphore:
                print(f"  Starting processing item: {item}")
                result = await self.process_one_item(item)
                print(f"  Finished processing item: {item} -> {result}")
                return result

        print(f"Exec: Processing {len(items)} items with limit {self._concurrency_limit}...")
        tasks = [limited_task_runner(item) for item in items]
        results = await asyncio.gather(*tasks)
        print("Exec: All items processed.")
        return results

    async def process_one_item(self, item):
        """Placeholder: subclasses should override this with their real async work."""
        await asyncio.sleep(0.5)  # Simulate async work
        return f"Processed_{item}"

    async def post(self, memory, prep_res: list, exec_res: list):
        print(f"Post: Storing {len(exec_res)} results.")
        memory.processed_results = exec_res  # Store results
        self.trigger('default')  # Trigger next node
```

## Rate Limiting with Window Limits
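A minimal sketch of a window-based limiter (the `WindowRateLimiter` name and API are illustrative, not part of Caskada): it admits at most `max_calls` within each fixed window of `window_seconds`, and makes later callers wait until the window rolls over.

```python
import asyncio
import time


class WindowRateLimiter:
    """Allows at most `max_calls` within each fixed window of `window_seconds`."""

    def __init__(self, max_calls: int, window_seconds: float):
        self.max_calls = max_calls
        self.window_seconds = window_seconds
        self._window_start = 0.0
        self._calls_in_window = 0
        self._lock = asyncio.Lock()

    async def acquire(self):
        while True:
            async with self._lock:
                now = time.monotonic()
                if now - self._window_start >= self.window_seconds:
                    # New window: reset the counter
                    self._window_start = now
                    self._calls_in_window = 0
                if self._calls_in_window < self.max_calls:
                    self._calls_in_window += 1
                    return
                # Window exhausted: wait until it rolls over
                sleep_for = self._window_start + self.window_seconds - now
            await asyncio.sleep(max(sleep_for, 0.01))
```

Call `await limiter.acquire()` immediately before each throttled operation. Note that a fixed window permits bursts at window boundaries (up to `2 * max_calls` calls in quick succession straddling two windows); the sliding-window variant below avoids this.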
## Throttler Utility
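One way to package throttling as a reusable utility is a decorator that spaces out awaited calls. This is a sketch under that assumption (the `throttle` name is illustrative, not a Caskada API): wrapped coroutines are serialized so consecutive calls are at least `1 / calls_per_second` apart.

```python
import asyncio
import functools
import time


def throttle(calls_per_second: float):
    """Decorator: space out calls to the wrapped coroutine so they
    never exceed `calls_per_second` on average."""
    min_interval = 1.0 / calls_per_second
    last_call = 0.0
    lock = asyncio.Lock()

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            nonlocal last_call
            async with lock:
                # Wait out the remainder of the minimum interval, if any
                wait = min_interval - (time.monotonic() - last_call)
                if wait > 0:
                    await asyncio.sleep(wait)
                last_call = time.monotonic()
            return await func(*args, **kwargs)
        return wrapper
    return decorator
```

Each decorated function gets its own lock and timestamp, so two differently decorated functions are throttled independently.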
## Advanced Throttling Patterns
### 1. Token Bucket Rate Limiter
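A token bucket enforces an average `rate` while still allowing short bursts up to `capacity`. A minimal sketch (the class name and API are illustrative): tokens accumulate continuously at `rate` per second, and each call consumes one.

```python
import asyncio
import time


class TokenBucketRateLimiter:
    """Refills `rate` tokens per second up to `capacity`; each call consumes one."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self._tokens = float(capacity)
        self._last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self):
        while True:
            async with self._lock:
                now = time.monotonic()
                # Refill tokens based on elapsed time, capped at capacity
                self._tokens = min(self.capacity,
                                   self._tokens + (now - self._last_refill) * self.rate)
                self._last_refill = now
                if self._tokens >= 1:
                    self._tokens -= 1
                    return
                # Not enough tokens: estimate how long until one accumulates
                wait = (1 - self._tokens) / self.rate
            await asyncio.sleep(wait)
```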
### 2. Sliding Window Rate Limiter
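Unlike a fixed window, a sliding window counts calls in the rolling interval ending now, so there is no burst at window boundaries. A sketch under the same illustrative-API assumption, tracking recent call timestamps in a deque:

```python
import asyncio
import time
from collections import deque


class SlidingWindowRateLimiter:
    """Allows at most `max_calls` within any rolling `window_seconds` interval."""

    def __init__(self, max_calls: int, window_seconds: float):
        self.max_calls = max_calls
        self.window_seconds = window_seconds
        self._timestamps = deque()
        self._lock = asyncio.Lock()

    async def acquire(self):
        while True:
            async with self._lock:
                now = time.monotonic()
                # Drop timestamps that have aged out of the window
                while self._timestamps and now - self._timestamps[0] >= self.window_seconds:
                    self._timestamps.popleft()
                if len(self._timestamps) < self.max_calls:
                    self._timestamps.append(now)
                    return
                # Wait until the oldest call exits the window
                wait = self._timestamps[0] + self.window_seconds - now
            await asyncio.sleep(wait)
```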
## Best Practices

- **Monitor API Responses**: Watch for HTTP 429 (Too Many Requests) responses and adjust your rate limiting accordingly.
- **Implement Retry Logic**: When you hit a rate limit, retry with exponential backoff.
- **Distribute Load**: Where possible, spread requests across multiple API keys or endpoints.
- **Cache Responses**: Cache frequent identical requests to reduce API calls.
- **Batch Requests**: Combine multiple requests into a single API call when the API supports it.
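The retry practice above can be sketched as follows. `RateLimitError` is a stand-in for whatever exception your HTTP client raises on a 429 response, and the helper name is illustrative: the delay doubles on each attempt, with a little random jitter so many clients don't retry in lockstep.

```python
import asyncio
import random


class RateLimitError(Exception):
    """Stand-in for whatever your HTTP client raises on a 429 response."""


async def call_with_backoff(func, max_retries: int = 5, base_delay: float = 0.5):
    """Retry `func` on RateLimitError, doubling the delay each attempt."""
    for attempt in range(max_retries):
        try:
            return await func()
        except RateLimitError:
            if attempt == max_retries - 1:
                raise  # Out of retries: surface the error to the caller
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            await asyncio.sleep(delay)
```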
## Integration with Caskada
### Throttled LLM Node
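A sketch of combining the node lifecycle with a concurrency limit around LLM calls. The `call_llm` method is a placeholder for your actual client, and the guarded import lets the sketch run without Caskada installed; it is not a definitive implementation.

```python
import asyncio

try:
    from caskada import Node
except ImportError:  # allows running this sketch without Caskada installed
    class Node:
        def __init__(self, **kwargs): pass
        def trigger(self, action): pass


class ThrottledLLMNode(Node):
    """Allows at most `max_concurrent` LLM requests in flight at once."""

    def __init__(self, max_concurrent: int = 2, **kwargs):
        super().__init__(**kwargs)
        self._semaphore = asyncio.Semaphore(max_concurrent)

    async def prep(self, memory):
        return memory.prompts or []

    async def exec(self, prompts: list):
        async def run_one(prompt):
            async with self._semaphore:
                return await self.call_llm(prompt)
        # gather preserves input order in its results
        return await asyncio.gather(*(run_one(p) for p in prompts))

    async def call_llm(self, prompt: str) -> str:
        # Placeholder: replace with your actual LLM client call
        await asyncio.sleep(0.1)
        return f"response to: {prompt}"

    async def post(self, memory, prep_res, exec_res):
        memory.llm_responses = exec_res
        self.trigger('default')
```

For stricter control, the semaphore can be swapped for one of the window or token-bucket limiters described earlier without changing the node's structure.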
## Linking to Related Concepts

For batch processing patterns, see Flow.