Tool Calling Pattern
Tool calling is the most common pattern in production AI agents. Whether you’re calling a weather API, querying a database, or executing a payment, this pattern ensures your agent handles external systems reliably.
The Core Pattern
from statebase import StateBase
# Module-level client; the functions below reach StateBase through this "sb" object.
sb = StateBase(api_key="your-key")
def agent_with_tool_calling(session_id, user_message):
    """Run one tool-calling turn for a session, checkpointing around the call.

    Loads the session context, lets the LLM choose a tool, records the pending
    call in session state, executes the tool and records the outcome. When the
    tool raises ``ToolError`` the session is rolled back, the failure is logged
    as a turn, and an error message is returned instead of raising.

    Args:
        session_id: Identifier of the session to read and update.
        user_message: The user's message, embedded in the LLM prompt.

    Returns:
        The tool's result on success, or an error message string when the
        tool raised ``ToolError``.
    """
    # Step 1: load the current context for this session.
    context = sb.sessions.get_context(session_id=session_id)

    # Step 2: the LLM picks the tool and its arguments.
    prompt_text = f"User: {user_message}\nContext: {context}"
    available_tools = [weather_tool, database_tool, calculator_tool]
    response = llm.generate(prompt=prompt_text, tools=available_tools)

    # Step 3: checkpoint BEFORE the tool runs, recording what is about to happen.
    pending_state = {
        **context["state"],
        "pending_tool": response.tool_name,
        "tool_args": response.tool_args,
    }
    sb.sessions.update_state(
        session_id=session_id,
        state=pending_state,
        reasoning=f"About to call {response.tool_name}",
    )

    # Step 4: run the tool; only ToolError is handled here.
    try:
        result = execute_tool(response.tool_name, response.tool_args)

        # Step 5: checkpoint AFTER success. The state is rebuilt from the
        # context loaded in step 1, so the pending_* keys are not carried over.
        success_state = {
            **context["state"],
            "last_tool_result": result,
            "last_tool_success": True,
        }
        sb.sessions.update_state(
            session_id=session_id,
            state=success_state,
            reasoning=f"Tool {response.tool_name} succeeded",
        )
        return result
    except ToolError as tool_error:
        # Step 6: roll the session back.
        sb.sessions.rollback(session_id=session_id, version=-1)

        # Step 7: record the failed call as a turn.
        failed_call = {"type": "tool_call", "tool": response.tool_name}
        failure_report = {"type": "error", "message": str(tool_error)}
        sb.sessions.add_turn(
            session_id=session_id,
            input=failed_call,
            output=failure_report,
            reasoning=f"Tool call failed: {tool_error}",
        )
        return f"I encountered an error: {tool_error}. Let me try a different approach."
Tool Definition
# Define tools with clear schemas to help the LLM make correct calls:

# Schema of the required "city" argument.
_city_param = {
    "type": "string",
    "description": "City name (e.g., 'San Francisco')",
}

# Schema of the optional "units" argument, limited to two values.
_units_param = {
    "type": "string",
    "enum": ["celsius", "fahrenheit"],
    "default": "fahrenheit",
}

# Tool description handed to the LLM: name, purpose and argument schema.
weather_tool = {
    "name": "get_weather",
    "description": "Get current weather for a city",
    "parameters": {
        "type": "object",
        "properties": {"city": _city_param, "units": _units_param},
        "required": ["city"],
    },
}
def get_weather(city: str, units: str = "fahrenheit", timeout: float = 5) -> dict:
    """Fetch the current weather for a city from the weather API.

    Args:
        city: City name, sent as the ``city`` query parameter.
        units: Unit system, sent as the ``units`` query parameter.
        timeout: Seconds to wait for the API before giving up, so a slow
            service cannot block the agent indefinitely.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.Timeout: If the API does not answer within ``timeout``.
    """
    response = requests.get(
        "https://api.weather.com/v1/current",
        params={"city": city, "units": units},
        timeout=timeout,
    )
    # Surface HTTP errors instead of returning an error payload as weather data.
    response.raise_for_status()
    return response.json()
Error Handling Strategies
Strategy 1: Retry with Exponential Backoff
import time
def call_tool_with_retry(tool_name, args, max_retries=3):
    """Call a tool, retrying ``TemporaryError`` failures with exponential backoff.

    Args:
        tool_name: Name of the tool passed to ``execute_tool``.
        args: Arguments passed to ``execute_tool``.
        max_retries: Total number of attempts; must be at least 1.

    Returns:
        Whatever ``execute_tool`` returns on the first successful attempt.

    Raises:
        ValueError: If ``max_retries`` is less than 1 (the tool would never
            be called and the function would silently return ``None``).
        TemporaryError: Re-raised from the final attempt when every attempt
            failed. Any other exception propagates immediately.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    final_attempt = max_retries - 1
    for attempt in range(max_retries):
        try:
            return execute_tool(tool_name, args)
        except TemporaryError:
            if attempt == final_attempt:
                raise  # Give up after max retries
            # Wait 2 ** attempt seconds before the next try: 1s, then 2s, ...
            time.sleep(2 ** attempt)
Strategy 2: Fallback Tools
def call_with_fallback(primary_tool, fallback_tool, args):
    """Run the primary tool; on ``APIError`` run the fallback with the same args.

    Args:
        primary_tool: Name of the tool to try first.
        fallback_tool: Name of the tool to try when the primary raises ``APIError``.
        args: Arguments passed unchanged to whichever tool runs.

    Returns:
        The result of the primary tool, or of the fallback tool if the
        primary raised ``APIError``. Errors from the fallback propagate.
    """
    try:
        outcome = execute_tool(primary_tool, args)
    except APIError:
        # Primary failed: hand the same arguments to the fallback tool.
        outcome = execute_tool(fallback_tool, args)
    return outcome
# Example: if the primary weather API fails, the backup API is used instead.
result = call_with_fallback(
    "openweather_api",
    "weatherapi_com",
    {"city": "San Francisco"},
)
Strategy 3: Circuit Breaker
class CircuitBreakerOpenError(Exception):
    """Raised by ``CircuitBreaker.call`` when the circuit is open and the call is refused."""


class CircuitBreaker:
    """Refuse calls to a repeatedly failing function for a cool-down period.

    States:
        closed: calls pass through; failures are counted.
        open: calls are refused with ``CircuitBreakerOpenError`` until more
            than ``timeout`` seconds have passed since the last failure.
        half-open: entered from "open" once the cool-down has elapsed; the
            call is attempted, a success closes the circuit and a failure
            is counted like any other.

    Attributes:
        failure_count: Failures since the last success.
        failure_threshold: Failure count at which the circuit opens.
        timeout: Cool-down in seconds before an open circuit allows a call.
        last_failure_time: ``time.monotonic()`` reading of the most recent
            failure, or ``None`` if no call has failed yet.
        state: One of "closed", "open", "half-open".
    """

    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open

    def call(self, func, *args, **kwargs):
        """Call ``func(*args, **kwargs)`` through the breaker.

        Returns:
            The return value of ``func``.

        Raises:
            CircuitBreakerOpenError: If the circuit is open and the cool-down
                has not elapsed; ``func`` is not called in that case.
            Exception: Whatever ``func`` raises, re-raised unchanged after the
                failure has been counted.
        """
        if self.state == "open":
            # A monotonic clock keeps the cool-down immune to wall-clock jumps.
            if time.monotonic() - self.last_failure_time > self.timeout:
                self.state = "half-open"
            else:
                raise CircuitBreakerOpenError("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.monotonic()
            if self.failure_count >= self.failure_threshold:
                self.state = "open"
            raise

        # Success - reset
        self.failure_count = 0
        self.state = "closed"
        return result
# Usage
weather_breaker = CircuitBreaker(failure_threshold=5, timeout=60)
try:
    result = weather_breaker.call(get_weather, city="SF")
except Exception:
    # Either the call itself failed or the breaker is open and refused it.
    # Fall back to a message instead of hammering the API. (A bare `return`
    # is not valid at module level, so the fallback is assigned instead.)
    result = "Weather service is temporarily unavailable"
Caching Tool Results
# Avoid redundant API calls by caching results in session state:
def get_weather_cached(session_id, city):
    """Return weather for ``city``, reusing a session-state copy under an hour old.

    Args:
        session_id: Session whose state holds the cache.
        city: City to look up; also part of the cache key.

    Returns:
        The cached weather data when a fresh entry exists, otherwise the
        result of a new ``get_weather(city)`` call (which is then cached).
    """
    state = sb.sessions.get(session_id).state
    cache_key = f"weather_{city}"

    # Serve from the cache when an entry exists and is < 1 hour old.
    if cache_key in state:
        entry = state[cache_key]
        age_seconds = time.time() - entry["timestamp"]
        if age_seconds < 3600:
            return entry["data"]

    # Cache miss or stale entry: fetch fresh data.
    fresh_weather = get_weather(city)

    # Write the whole state back with the new cache entry added.
    updated_state = {
        **state,
        cache_key: {"data": fresh_weather, "timestamp": time.time()},
    }
    sb.sessions.update_state(
        session_id=session_id,
        state=updated_state,
        reasoning=f"Cached weather for {city}",
    )
    return fresh_weather
Parallel Tool Calls
# When tools don’t depend on each other, call them in parallel:
import asyncio


async def get_travel_info(session_id, destination, dates):
    """Look up flights, hotels and weather concurrently and store the results.

    Args:
        session_id: Session whose state receives the results.
        destination: Destination passed to all three lookups.
        dates: Travel dates passed to the flight and hotel lookups.

    Returns:
        A dict with keys "flights", "hotels" and "weather"; each value is the
        lookup's result, or the string "Unavailable" when that lookup raised
        or returned a falsy value.
    """
    # Run the three lookups in parallel; exceptions come back as values.
    gathered = await asyncio.gather(
        get_flights_async(destination, dates),
        get_hotels_async(destination, dates),
        get_weather_async(destination),
        return_exceptions=True,  # Don't fail if one fails
    )

    # Handle partial failures: a lookup that raised is replaced by None.
    flights, hotels, weather = (
        None if isinstance(item, Exception) else item for item in gathered
    )

    # Store results
    search_state = {
        "flights": flights,
        "hotels": hotels,
        "weather": weather,
        "search_completed": True,
    }
    sb.sessions.update_state(
        session_id=session_id,
        state=search_state,
        reasoning="Parallel tool calls completed",
    )

    return {
        "flights": flights or "Unavailable",
        "hotels": hotels or "Unavailable",
        "weather": weather or "Unavailable",
    }
Tool Call Logging
# Always log tool calls for debugging and analytics:
def execute_tool_with_logging(session_id, tool_name, args):
    """Execute a tool and record the call, its latency and its outcome as a turn.

    Args:
        session_id: Session that receives the logged turn.
        tool_name: Name of the tool passed to ``execute_tool``.
        args: Arguments passed to ``execute_tool``.

    Returns:
        The tool's result.

    Raises:
        Exception: Whatever ``execute_tool`` raises, re-raised unchanged after
            the failure has been logged.
    """
    # perf_counter is meant for measuring intervals and ignores clock changes.
    start_time = time.perf_counter()
    try:
        result = execute_tool(tool_name, args)
    except Exception as e:
        latency = (time.perf_counter() - start_time) * 1000
        # Log failed call
        sb.sessions.add_turn(
            session_id=session_id,
            input={"type": "tool_call", "tool": tool_name, "args": args},
            output={"type": "error", "error": str(e)},
            metadata={
                "latency_ms": latency,
                "success": False,
                "tool_name": tool_name,
                "error_type": type(e).__name__,
            },
            reasoning=f"Tool {tool_name} failed: {e}",
        )
        raise

    # The success log sits outside the try so that an error while logging is
    # not recorded as a failure of a tool call that actually succeeded.
    latency = (time.perf_counter() - start_time) * 1000  # ms
    sb.sessions.add_turn(
        session_id=session_id,
        input={"type": "tool_call", "tool": tool_name, "args": args},
        output={"type": "tool_result", "result": result},
        metadata={
            "latency_ms": latency,
            "success": True,
            "tool_name": tool_name,
        },
        reasoning=f"Tool {tool_name} completed in {latency:.0f}ms",
    )
    return result
Best Practices
✅ Do This
- Checkpoint before expensive tool calls (you can retry without re-calling)
- Validate tool arguments before calling (prevent bad API requests)
- Implement timeouts (don’t wait forever for slow APIs)
- Cache results when appropriate (save money and time)
- Log everything (you’ll need it for debugging)
❌ Avoid This
- Don’t call tools without checkpointing (you’ll lose progress on failure)
- Don’t ignore errors (handle them gracefully)
- Don’t hammer failing APIs (use circuit breakers)
- Don’t trust LLM-generated arguments blindly (validate first)
Complete Example
Here’s a production-ready tool calling implementation:
from statebase import StateBase
import requests
import time
# Client used by ToolExecutor below for all session reads and writes.
sb = StateBase(api_key="your-key")
class ToolExecutor:
    """Execute named tools for one session.

    Combines argument validation, optional result caching in session state,
    checkpoints before and after the call, a circuit breaker per tool, and a
    session rollback when anything in the call path raises.

    Attributes:
        session_id: Session every read and write is addressed to.
        circuit_breakers: Maps tool name to its ``CircuitBreaker``; filled
            lazily by ``get_circuit_breaker``.
    """

    def __init__(self, session_id):
        self.session_id = session_id
        self.circuit_breakers = {}

    def execute(self, tool_name, args, cache_ttl=None):
        """Run ``tool_name`` with ``args`` and return its result.

        Args:
            tool_name: Name of the tool to dispatch.
            args: Keyword arguments for the tool, as a dict.
            cache_ttl: Seconds a result may be served from the cache; a falsy
                value disables caching for this call.

        Returns:
            The cached result when a fresh entry exists, otherwise the
            result of executing the tool.

        Raises:
            ValueError: If ``validate_args`` rejects the arguments.
            Exception: Anything raised while executing, caching or
                checkpointing; the session is rolled back first.

        NOTE(review): the two checkpoints below pass only their own keys as
        ``state`` while ``cache_result`` passes a copy of the whole state.
        Confirm whether ``update_state`` merges or replaces; if it replaces,
        these checkpoints discard cached entries.
        """
        # 1. Validate arguments
        if not self.validate_args(tool_name, args):
            raise ValueError(f"Invalid arguments for {tool_name}")

        # 2. Check cache. Compare against None so that a legitimately falsy
        # cached result (0, "", [], {}) still counts as a hit.
        if cache_ttl:
            cached = self.get_cached(tool_name, args)
            if cached is not None:
                return cached

        # 3. Checkpoint before call
        sb.sessions.update_state(
            session_id=self.session_id,
            state={
                "pending_tool": tool_name,
                "tool_args": args,
            },
            reasoning=f"About to call {tool_name}",
        )

        # 4. Execute with circuit breaker
        breaker = self.get_circuit_breaker(tool_name)
        try:
            result = breaker.call(self._execute_tool, tool_name, args)

            # 5. Cache result
            if cache_ttl:
                self.cache_result(tool_name, args, result, cache_ttl)

            # 6. Checkpoint after success
            sb.sessions.update_state(
                session_id=self.session_id,
                state={"last_tool_result": result},
                reasoning=f"Tool {tool_name} succeeded",
            )
            return result
        except Exception:
            # 7. Roll back on failure
            sb.sessions.rollback(session_id=self.session_id, version=-1)
            raise

    def _execute_tool(self, tool_name, args):
        """Dispatch to the implementation of ``tool_name``.

        Raises:
            ValueError: If ``tool_name`` is not a known tool.
        """
        if tool_name == "get_weather":
            return self.get_weather(**args)
        elif tool_name == "search_database":
            return self.search_database(**args)
        else:
            raise ValueError(f"Unknown tool: {tool_name}")

    def get_weather(self, city):
        """Fetch current weather for ``city``; raises on HTTP error statuses."""
        response = requests.get(
            "https://api.weather.com/v1/current",
            params={"city": city},
            timeout=5,
        )
        response.raise_for_status()
        return response.json()

    def search_database(self, **kwargs):
        """Placeholder for the ``search_database`` tool that ``_execute_tool`` dispatches to.

        Raises:
            NotImplementedError: Always, until a real implementation is supplied.
        """
        raise NotImplementedError("search_database tool is not implemented")

    def validate_args(self, tool_name, args):
        """Return True when ``args`` are acceptable for ``tool_name``.

        Currently a stub that accepts everything.
        """
        # Implement validation logic
        return True

    def get_circuit_breaker(self, tool_name):
        """Return the breaker for ``tool_name``, creating it on first use."""
        if tool_name not in self.circuit_breakers:
            self.circuit_breakers[tool_name] = CircuitBreaker()
        return self.circuit_breakers[tool_name]

    @staticmethod
    def _cache_key(tool_name, args):
        """Build a cache key that is identical across processes and key orders.

        The built-in ``hash()`` of a string is randomised per interpreter
        process, so a key derived from it cannot be found again by another
        process reading the same session state. A SHA-256 digest of the
        canonical JSON form of ``args`` is stable instead.
        """
        import hashlib
        import json

        # sort_keys makes the key independent of dict insertion order;
        # default=str keeps non-JSON values from raising.
        canonical = json.dumps(args, sort_keys=True, default=str)
        digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
        return f"cache_{tool_name}_{digest}"

    def get_cached(self, tool_name, args):
        """Return the cached result for this call, or None when absent or expired."""
        state = sb.sessions.get(self.session_id).state
        cache_key = self._cache_key(tool_name, args)
        if cache_key in state:
            cached = state[cache_key]
            if time.time() - cached["timestamp"] < cached["ttl"]:
                return cached["data"]
        return None

    def cache_result(self, tool_name, args, result, ttl):
        """Store ``result`` in session state together with its timestamp and TTL."""
        state = sb.sessions.get(self.session_id).state
        cache_key = self._cache_key(tool_name, args)
        sb.sessions.update_state(
            session_id=self.session_id,
            state={
                **state,
                cache_key: {
                    "data": result,
                    "timestamp": time.time(),
                    "ttl": ttl,
                },
            },
            reasoning=f"Cached {tool_name} result",
        )
# Usage: fetch the weather once and let later calls reuse it for an hour.
executor = ToolExecutor("sess_123")
weather = executor.execute(
    "get_weather",
    {"city": "San Francisco"},
    cache_ttl=3600,  # Cache for 1 hour
)
Next Steps
- Multi-Tool Recovery Demo: See tool calling in action
- RAG Agents Pattern: Combine tools with retrieval
- Error Handling: Deep dive into error strategies
Key Takeaway: Tool calling is where agents interact with the real world. Checkpoint before calls, handle errors gracefully, and cache aggressively.