Skip to main content

Tool Calling Pattern

Tool calling is the most common pattern in production AI agents. Whether you’re calling a weather API, querying a database, or executing a payment, this pattern ensures your agent handles external systems reliably.

The Core Pattern

from statebase import StateBase

sb = StateBase(api_key="your-key")

def agent_with_tool_calling(session_id, user_message):
    # 1. Get current context
    context = sb.sessions.get_context(session_id=session_id)
    
    # 2. LLM decides which tool to call
    response = llm.generate(
        prompt=f"User: {user_message}\nContext: {context}",
        tools=[weather_tool, database_tool, calculator_tool]
    )
    
    # 3. Checkpoint BEFORE calling tool
    sb.sessions.update_state(
        session_id=session_id,
        state={
            **context["state"],
            "pending_tool": response.tool_name,
            "tool_args": response.tool_args
        },
        reasoning=f"About to call {response.tool_name}"
    )
    
    # 4. Execute tool call with error handling
    try:
        result = execute_tool(response.tool_name, response.tool_args)
        
        # 5. Checkpoint AFTER successful tool call
        sb.sessions.update_state(
            session_id=session_id,
            state={
                **context["state"],
                "last_tool_result": result,
                "last_tool_success": True
            },
            reasoning=f"Tool {response.tool_name} succeeded"
        )
        
        return result
        
    except ToolError as e:
        # 6. Roll back on failure
        sb.sessions.rollback(session_id=session_id, version=-1)
        
        # 7. Log the failure
        sb.sessions.add_turn(
            session_id=session_id,
            input={"type": "tool_call", "tool": response.tool_name},
            output={"type": "error", "message": str(e)},
            reasoning=f"Tool call failed: {e}"
        )
        
        return f"I encountered an error: {e}. Let me try a different approach."

Tool Definition

Define tools with clear schemas to help the LLM make correct calls:
weather_tool = {
    "name": "get_weather",
    "description": "Get current weather for a city",
    "parameters": {
        "type": "object",
        "properties": {
            "city": {
                "type": "string",
                "description": "City name (e.g., 'San Francisco')"
            },
            "units": {
                "type": "string",
                "enum": ["celsius", "fahrenheit"],
                "default": "fahrenheit"
            }
        },
        "required": ["city"]
    }
}

def get_weather(city: str, units: str = "fahrenheit") -> dict:
    """Actual implementation"""
    response = requests.get(
        f"https://api.weather.com/v1/current",
        params={"city": city, "units": units}
    )
    return response.json()

Error Handling Strategies

Strategy 1: Retry with Exponential Backoff

import time

def call_tool_with_retry(tool_name, args, max_retries=3):
    for attempt in range(max_retries):
        try:
            return execute_tool(tool_name, args)
        except TemporaryError as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # 1s, 2s, 4s
                time.sleep(wait_time)
                continue
            else:
                raise  # Give up after max retries

Strategy 2: Fallback Tools

def call_with_fallback(primary_tool, fallback_tool, args):
    try:
        return execute_tool(primary_tool, args)
    except APIError:
        # Try fallback
        return execute_tool(fallback_tool, args)

# Example: Primary weather API fails, use backup
result = call_with_fallback(
    primary_tool="openweather_api",
    fallback_tool="weatherapi_com",
    args={"city": "San Francisco"}
)

Strategy 3: Circuit Breaker

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open
    
    def call(self, func, *args, **kwargs):
        if self.state == "open":
            # Check if timeout has passed
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "half-open"
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = func(*args, **kwargs)
            # Success - reset
            self.failure_count = 0
            self.state = "closed"
            return result
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()
            
            if self.failure_count >= self.failure_threshold:
                self.state = "open"
            
            raise e

# Usage
weather_breaker = CircuitBreaker(failure_threshold=5, timeout=60)

try:
    result = weather_breaker.call(get_weather, city="SF")
except Exception as e:
    # Circuit is open, don't keep hammering the API
    return "Weather service is temporarily unavailable"

Caching Tool Results

Avoid redundant API calls by caching results in session state:
def get_weather_cached(session_id, city):
    # Check cache first
    state = sb.sessions.get(session_id).state
    cache_key = f"weather_{city}"
    
    if cache_key in state:
        cached = state[cache_key]
        # Check if cache is still fresh (< 1 hour old)
        if time.time() - cached["timestamp"] < 3600:
            return cached["data"]
    
    # Cache miss or stale - fetch fresh data
    weather = get_weather(city)
    
    # Store in cache
    sb.sessions.update_state(
        session_id=session_id,
        state={
            **state,
            cache_key: {
                "data": weather,
                "timestamp": time.time()
            }
        },
        reasoning=f"Cached weather for {city}"
    )
    
    return weather

Parallel Tool Calls

When tools don’t depend on each other, call them in parallel:
import asyncio

async def get_travel_info(session_id, destination, dates):
    # Call multiple APIs in parallel
    results = await asyncio.gather(
        get_flights_async(destination, dates),
        get_hotels_async(destination, dates),
        get_weather_async(destination),
        return_exceptions=True  # Don't fail if one fails
    )
    
    flights, hotels, weather = results
    
    # Handle partial failures
    if isinstance(flights, Exception):
        flights = None
    if isinstance(hotels, Exception):
        hotels = None
    if isinstance(weather, Exception):
        weather = None
    
    # Store results
    sb.sessions.update_state(
        session_id=session_id,
        state={
            "flights": flights,
            "hotels": hotels,
            "weather": weather,
            "search_completed": True
        },
        reasoning="Parallel tool calls completed"
    )
    
    return {
        "flights": flights or "Unavailable",
        "hotels": hotels or "Unavailable",
        "weather": weather or "Unavailable"
    }

Tool Call Logging

Always log tool calls for debugging and analytics:
def execute_tool_with_logging(session_id, tool_name, args):
    start_time = time.time()
    
    try:
        result = execute_tool(tool_name, args)
        latency = (time.time() - start_time) * 1000  # ms
        
        # Log successful call
        sb.sessions.add_turn(
            session_id=session_id,
            input={"type": "tool_call", "tool": tool_name, "args": args},
            output={"type": "tool_result", "result": result},
            metadata={
                "latency_ms": latency,
                "success": True,
                "tool_name": tool_name
            },
            reasoning=f"Tool {tool_name} completed in {latency:.0f}ms"
        )
        
        return result
        
    except Exception as e:
        latency = (time.time() - start_time) * 1000
        
        # Log failed call
        sb.sessions.add_turn(
            session_id=session_id,
            input={"type": "tool_call", "tool": tool_name, "args": args},
            output={"type": "error", "error": str(e)},
            metadata={
                "latency_ms": latency,
                "success": False,
                "tool_name": tool_name,
                "error_type": type(e).__name__
            },
            reasoning=f"Tool {tool_name} failed: {e}"
        )
        
        raise

Best Practices

✅ Do This

  • Checkpoint before expensive tool calls (you can retry without re-calling)
  • Validate tool arguments before calling (prevent bad API requests)
  • Implement timeouts (don’t wait forever for slow APIs)
  • Cache results when appropriate (save money and time)
  • Log everything (you’ll need it for debugging)

❌ Avoid This

  • Don’t call tools without checkpointing (you’ll lose progress on failure)
  • Don’t ignore errors (handle them gracefully)
  • Don’t hammer failing APIs (use circuit breakers)
  • Don’t trust LLM-generated arguments blindly (validate first)

Complete Example

Here’s a production-ready tool calling implementation:
from statebase import StateBase
import requests
import time

sb = StateBase(api_key="your-key")

class ToolExecutor:
    def __init__(self, session_id):
        self.session_id = session_id
        self.circuit_breakers = {}
    
    def execute(self, tool_name, args, cache_ttl=None):
        # 1. Validate arguments
        if not self.validate_args(tool_name, args):
            raise ValueError(f"Invalid arguments for {tool_name}")
        
        # 2. Check cache
        if cache_ttl:
            cached = self.get_cached(tool_name, args)
            if cached:
                return cached
        
        # 3. Checkpoint before call
        sb.sessions.update_state(
            session_id=self.session_id,
            state={
                "pending_tool": tool_name,
                "tool_args": args
            },
            reasoning=f"About to call {tool_name}"
        )
        
        # 4. Execute with circuit breaker
        breaker = self.get_circuit_breaker(tool_name)
        
        try:
            result = breaker.call(self._execute_tool, tool_name, args)
            
            # 5. Cache result
            if cache_ttl:
                self.cache_result(tool_name, args, result, cache_ttl)
            
            # 6. Checkpoint after success
            sb.sessions.update_state(
                session_id=self.session_id,
                state={"last_tool_result": result},
                reasoning=f"Tool {tool_name} succeeded"
            )
            
            return result
            
        except Exception as e:
            # 7. Roll back on failure
            sb.sessions.rollback(session_id=self.session_id, version=-1)
            raise
    
    def _execute_tool(self, tool_name, args):
        # Actual tool implementations
        if tool_name == "get_weather":
            return self.get_weather(**args)
        elif tool_name == "search_database":
            return self.search_database(**args)
        else:
            raise ValueError(f"Unknown tool: {tool_name}")
    
    def get_weather(self, city):
        response = requests.get(
            "https://api.weather.com/v1/current",
            params={"city": city},
            timeout=5
        )
        response.raise_for_status()
        return response.json()
    
    def validate_args(self, tool_name, args):
        # Implement validation logic
        return True
    
    def get_circuit_breaker(self, tool_name):
        if tool_name not in self.circuit_breakers:
            self.circuit_breakers[tool_name] = CircuitBreaker()
        return self.circuit_breakers[tool_name]
    
    def get_cached(self, tool_name, args):
        state = sb.sessions.get(self.session_id).state
        cache_key = f"cache_{tool_name}_{hash(str(args))}"
        
        if cache_key in state:
            cached = state[cache_key]
            if time.time() - cached["timestamp"] < cached["ttl"]:
                return cached["data"]
        
        return None
    
    def cache_result(self, tool_name, args, result, ttl):
        state = sb.sessions.get(self.session_id).state
        cache_key = f"cache_{tool_name}_{hash(str(args))}"
        
        sb.sessions.update_state(
            session_id=self.session_id,
            state={
                **state,
                cache_key: {
                    "data": result,
                    "timestamp": time.time(),
                    "ttl": ttl
                }
            },
            reasoning=f"Cached {tool_name} result"
        )

# Usage
executor = ToolExecutor(session_id="sess_123")

weather = executor.execute(
    tool_name="get_weather",
    args={"city": "San Francisco"},
    cache_ttl=3600  # Cache for 1 hour
)

Next Steps


Key Takeaway: Tool calling is where agents interact with the real world. Checkpoint before calls, handle errors gracefully, and cache aggressively.