Layer 1: Surface
A tool that calls a real API has to handle what real APIs do: require authentication, enforce rate limits, sometimes fail, return more results than fit in one response, and occasionally deliver results asynchronously via webhooks.
The five integration mechanics that catch teams by surprise:
| Mechanic | What goes wrong without it |
|---|---|
| Auth | Credentials expire mid-session; secrets in logs |
| Retries | Transient errors become permanent failures |
| Rate limits | A burst of tool calls triggers 429s and the whole session fails |
| Idempotency | A retry creates a duplicate transaction or record |
| Pagination | The tool returns page 1 and the model never knows there are 47 more pages |
Each of these is a one-time implementation that prevents an entire class of production incidents.
Layer 2: Guided
Authentication patterns
API key in header (most common for tool integrations):
```python
import os

import httpx


class APIClient:
    def __init__(self):
        self.api_key = os.environ["SERVICE_API_KEY"]  # Never hardcode
        self.base_url = "https://api.example.com/v1"
        self.client = httpx.Client(
            headers={"Authorization": f"Bearer {self.api_key}"},
            timeout=30.0,
        )

    def get(self, path: str, **params) -> dict:
        response = self.client.get(f"{self.base_url}{path}", params=params)
        response.raise_for_status()
        return response.json()
```
OAuth 2.0 client credentials (service-to-service):
```python
import time

import httpx


class OAuthClient:
    def __init__(self, client_id: str, client_secret: str, token_url: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self._token: str | None = None
        self._token_expiry: float = 0

    def _get_token(self) -> str:
        if self._token and time.time() < self._token_expiry - 60:
            return self._token  # Reuse unless within 60s of expiry
        response = httpx.post(self.token_url, data={
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
        })
        response.raise_for_status()
        data = response.json()
        self._token = data["access_token"]
        self._token_expiry = time.time() + data["expires_in"]
        return self._token

    def get(self, url: str) -> dict:
        headers = {"Authorization": f"Bearer {self._get_token()}"}
        response = httpx.get(url, headers=headers)
        response.raise_for_status()
        return response.json()
```
Never fetch a new token on every request: that adds an extra auth round-trip to every tool call and will quickly hit the identity provider's rate limits. Cache the token and refresh it shortly before expiry.
Retry with exponential backoff and jitter
```python
import random
import time
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime

import httpx

RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}


def request_with_retry(
    client: httpx.Client,
    method: str,
    url: str,
    max_attempts: int = 4,
    base_delay: float = 1.0,
    **kwargs,
) -> httpx.Response:
    for attempt in range(max_attempts):
        try:
            response = client.request(method, url, **kwargs)
            if response.status_code == 429 and attempt < max_attempts - 1:
                # Retry-After may be seconds (numeric) or an HTTP-date string
                retry_after_header = response.headers.get("Retry-After")
                if retry_after_header:
                    try:
                        retry_after = float(retry_after_header)
                    except ValueError:
                        # HTTP-date format: "Wed, 21 Oct 2026 07:28:00 GMT"
                        retry_dt = parsedate_to_datetime(retry_after_header)
                        retry_after = max(
                            0.0,
                            (retry_dt - datetime.now(timezone.utc)).total_seconds(),
                        )
                else:
                    retry_after = base_delay * (2 ** attempt)
                time.sleep(retry_after)
                continue
            if response.status_code in RETRYABLE_STATUS_CODES and attempt < max_attempts - 1:
                delay = base_delay * (2 ** attempt) + random.uniform(0, 1.0)  # jitter
                time.sleep(delay)
                continue
            # Final attempt or non-retryable status: return the response as-is
            return response
        except (httpx.ConnectError, httpx.TimeoutException):
            if attempt == max_attempts - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1.0)
            time.sleep(delay)
    # Defensive: unreachable with the checks above, but keeps the contract explicit
    raise RuntimeError(f"All {max_attempts} attempts failed for {method} {url}")
```
What to retry: 429 (rate limited), 5xx (server error), network errors. What not to retry: 400 (bad request), 401 (auth), 403 (forbidden), 404 (not found). These will never succeed without fixing the request.
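That classification can be kept in one place as a small predicate, so the retry loop and any future callers agree on the policy. A sketch; the status-code set repeats the one from the retry example above so this snippet stands alone:

```python
# Status codes this section's policy treats as transient (retryable)
RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}


def is_retryable(status_code: int) -> bool:
    """Return True if a request that got this status is worth retrying."""
    if status_code in RETRYABLE_STATUS_CODES:
        return True
    # Any other 5xx is likely transient; a 4xx means the request itself is
    # wrong and will fail identically on retry.
    return 500 <= status_code < 600
```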
Idempotency keys
```python
import uuid


def create_order(
    customer_id: str,
    items: list[dict],
    idempotency_key: str | None = None,
) -> dict:
    """Create a customer order. Pass idempotency_key to safely retry a specific attempt."""
    # Generate a random key once per user-intended operation and reuse it across retries.
    # Do NOT derive the key from the payload: a customer placing the same order twice
    # is two distinct operations and both should be processed.
    key = idempotency_key or str(uuid.uuid4())
    response = request_with_retry(
        client,  # the shared httpx.Client from the retry example above
        "POST",
        "/orders",
        json={"customer_id": customer_id, "items": items},
        headers={"Idempotency-Key": key},
    )
    return response.json()
```
Generate a random key (uuid4) once when the user initiates the operation and pass it through all retry attempts. The same key scopes deduplication to a single intended operation: the server returns the original result if it has already processed a request with this key. Never derive the key from the payload alone: two customers placing identical orders, or one customer legitimately reordering the same items, would be wrongly collapsed into one.
Not all APIs support idempotency keys. For those that don’t, either avoid retrying mutations or implement deduplication on your side by caching the operation result.
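For APIs without idempotency key support, a client-side result cache can approximate the same guarantee. A minimal in-memory sketch (the `DedupCache` name is illustrative; production code would add TTLs and persistent storage, and two concurrent calls with the same key can still race past the initial check):

```python
import threading


class DedupCache:
    """Client-side deduplication for APIs without idempotency keys.

    Caches the result of each operation under a caller-supplied key, so
    retrying the same intended operation returns the cached result instead
    of re-issuing the mutation.
    """

    def __init__(self):
        self._results: dict[str, dict] = {}
        self._lock = threading.Lock()

    def run(self, operation_key: str, fn, *args, **kwargs) -> dict:
        with self._lock:
            if operation_key in self._results:
                return self._results[operation_key]
        result = fn(*args, **kwargs)
        with self._lock:
            self._results[operation_key] = result
        return result
```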
Pagination
```python
def list_all_orders(customer_id: str, page_size: int = 50) -> list[dict]:
    """Fetch all orders across all pages."""
    orders = []
    cursor = None
    while True:
        params = {"customer_id": customer_id, "limit": page_size}
        if cursor:
            params["cursor"] = cursor
        response = client.get("/orders", params=params)
        data = response.json()
        orders.extend(data["items"])
        # Cursor-based pagination: stop when next_cursor is absent or null
        cursor = data.get("next_cursor")
        if not cursor:
            break
    return orders
```
Be careful with pagination in tools: if a customer has 10,000 orders, fetching all of them into the model context is wasteful and expensive. Prefer tools that return a bounded result set with a “more available” signal:
```python
def list_customer_orders(customer_id: str, limit: int = 20, cursor: str | None = None) -> dict:
    """
    List customer orders. Returns up to `limit` orders and a cursor for the next page.
    If `has_more` is true, call again with the returned `next_cursor` for more results.
    """
    response = client.get("/orders", params={
        "customer_id": customer_id, "limit": limit, "cursor": cursor,
    })
    data = response.json()
    return {
        "orders": data["items"],
        "has_more": bool(data.get("next_cursor")),
        "next_cursor": data.get("next_cursor"),
    }
```
Return has_more and next_cursor so the model can decide whether to request the next page based on what it has found so far.
Receiving webhooks
Webhooks deliver events asynchronously: the API calls you, rather than you calling the API. Common pattern: register a webhook endpoint, verify the payload, enqueue for processing.
```python
import asyncio
import hashlib
import hmac
import os

from fastapi import FastAPI, HTTPException, Request

app = FastAPI()
WEBHOOK_SECRET = os.environ["WEBHOOK_SECRET"]
event_queue: asyncio.Queue = asyncio.Queue()  # consumed by a background worker


@app.post("/webhooks/orders")
async def receive_order_webhook(request: Request):
    body = await request.body()
    # Verify the payload signature: reject unauthenticated webhooks
    signature = request.headers.get("X-Signature-SHA256", "")
    expected = hmac.new(WEBHOOK_SECRET.encode(), body, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(f"sha256={expected}", signature):
        raise HTTPException(status_code=401, detail="Invalid signature")
    event = await request.json()
    # Enqueue for async processing: respond fast (200) so the API doesn't retry
    await event_queue.put(event)
    return {"status": "accepted"}
```
Return 200 immediately: a slow webhook response causes the API to retry, creating duplicate events. Do the work asynchronously.
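The asynchronous side of that pattern might look like the following worker, which drains the queue and deduplicates on the event's `id` field (an assumption about the payload shape, though most providers include a unique event identifier; the `None` sentinel is just for clean shutdown in this sketch, and production code would persist seen IDs with a TTL):

```python
import asyncio


async def process_events(event_queue: asyncio.Queue, handler, seen_ids: set) -> None:
    """Consume webhook events, skipping duplicate deliveries by event ID."""
    while True:
        event = await event_queue.get()
        if event is None:  # shutdown sentinel for this sketch
            break
        event_id = event.get("id")
        if event_id in seen_ids:
            continue  # duplicate delivery from a provider retry
        seen_ids.add(event_id)
        await handler(event)
```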
Layer 3: Deep Dive
Backoff strategies compared
| Strategy | Formula | When to use |
|---|---|---|
| Fixed | Always wait d seconds | Simple; okay for low-concurrency |
| Exponential | Wait d × 2^n seconds | Standard for most APIs |
| Exponential + jitter | Wait d × 2^n + rand(0, d) | Prevents thundering herd when many clients retry simultaneously |
| Decorrelated jitter | Wait rand(d, prev_wait × 3) | Best distribution for high-concurrency clients |
For tool integrations where many parallel sessions may hit the same API, always add jitter.
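Decorrelated jitter, the last row of the table, is the least obvious to implement. A sketch of the delay schedule it produces, following the formulation in the AWS analysis cited under Further reading (`base` seeds the first wait, `cap` bounds every wait):

```python
import random


def decorrelated_jitter_delays(base: float, cap: float, attempts: int) -> list[float]:
    """Generate sleep durations: each wait is rand(base, prev_wait * 3), capped."""
    delays = []
    prev = base
    for _ in range(attempts):
        prev = min(cap, random.uniform(base, prev * 3))
        delays.append(prev)
    return delays
```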
Circuit breaker implementation
```python
import threading
import time
from enum import Enum


class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing: reject calls immediately
    HALF_OPEN = "half_open"  # Testing recovery


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60.0):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time: float = 0
        self._lock = threading.Lock()

    def call(self, fn, *args, **kwargs):
        with self._lock:
            if self.state == CircuitState.OPEN:
                if time.time() - self.last_failure_time > self.recovery_timeout:
                    self.state = CircuitState.HALF_OPEN
                else:
                    raise RuntimeError("Circuit open: service unavailable")
        try:
            result = fn(*args, **kwargs)
            with self._lock:
                self.failure_count = 0
                self.state = CircuitState.CLOSED
            return result
        except Exception:
            with self._lock:
                self.failure_count += 1
                self.last_failure_time = time.time()
                if self.failure_count >= self.failure_threshold:
                    self.state = CircuitState.OPEN
            raise
```
The circuit breaker prevents the agentic loop from hammering a broken downstream service: after failure_threshold consecutive failures, it rejects calls immediately until the service has had time to recover.
Webhook security patterns
| Pattern | What it prevents |
|---|---|
| HMAC signature verification | Forged webhook payloads from attackers |
| Timestamp in signature | Replay attacks using old valid payloads |
| Idempotency on event ID | Duplicate processing from API retries |
| Allowlisted source IPs | Webhooks from unexpected origins |
Always implement at minimum HMAC verification and event ID deduplication. Timestamp validation (reject events older than 5 minutes) prevents replay attacks.
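Timestamp validation can be folded into the signature itself by signing `"{timestamp}.{body}"` rather than the body alone, in the style Stripe uses. A sketch under those assumptions (header names, payload format, and the 5-minute tolerance are illustrative; check your provider's exact scheme):

```python
import hashlib
import hmac
import time


def verify_signed_webhook(secret: bytes, body: bytes, timestamp: str,
                          signature: str, tolerance: float = 300.0) -> bool:
    """Verify an HMAC signature that covers a timestamp plus the raw body.

    Binding the signature to a timestamp means a replayed payload, even one
    with a valid signature, is rejected once it ages past `tolerance` seconds.
    """
    try:
        ts = float(timestamp)
    except ValueError:
        return False
    if abs(time.time() - ts) > tolerance:
        return False  # stale: possible replay
    signed_payload = timestamp.encode() + b"." + body
    expected = hmac.new(secret, signed_payload, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature)
```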
API versioning compatibility
APIs evolve. Protect your tool integrations from breaking changes:
```python
class VersionedAPIClient(APIClient):  # extends the APIClient from Layer 2
    API_VERSION = "2026-01"  # Pin to a specific API version

    def get(self, path: str, **params) -> dict:
        response = self.client.get(
            f"{self.base_url}{path}",
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "API-Version": self.API_VERSION,  # Explicit version header
            },
            params=params,
        )
        response.raise_for_status()
        return response.json()
```
Pin API versions explicitly. Subscribe to the provider’s deprecation notices and test upgrades in staging before updating API_VERSION in production.
Further reading
- AWS, "Exponential Backoff and Jitter": the definitive analysis of backoff strategies under concurrent load, with simulation data.
- Stripe, "Idempotent Requests": a clear example of how idempotency keys work in a production payment API.
- Webhook.site: a free tool for testing and debugging webhook payloads during development.