🤖 AI Explained

Real API Integration

Wrapping a real API as a tool means handling all the things the happy path ignores: auth token expiry, rate limits, flaky networks, non-idempotent operations, and paginated results. This module covers the mechanics of building tool integrations that survive production.

Layer 1: Surface

A tool that calls a real API has to handle what real APIs do: require authentication, enforce rate limits, sometimes fail, return more results than fit in one response, and occasionally deliver results asynchronously via webhooks.

The five integration mechanics that catch teams by surprise:

Mechanic | What goes wrong without it
Auth | Credentials expire mid-session; secrets leak into logs
Retries | Transient errors become permanent failures
Rate limits | A burst of tool calls triggers 429s and the whole session fails
Idempotency | A retry creates a duplicate transaction or record
Pagination | The tool returns page 1 and the model never knows there are 47 more pages

Each of these is a one-time implementation that prevents an entire class of production incidents.


Layer 2: Guided

Authentication patterns

API key in header (most common for tool integrations):

import httpx
import os

class APIClient:
    def __init__(self):
        self.api_key = os.environ["SERVICE_API_KEY"]  # Never hardcode
        self.base_url = "https://api.example.com/v1"
        self.client = httpx.Client(
            headers={"Authorization": f"Bearer {self.api_key}"},
            timeout=30.0,
        )

    def get(self, path: str, **params) -> dict:
        response = self.client.get(f"{self.base_url}{path}", params=params)
        response.raise_for_status()
        return response.json()

OAuth 2.0 client credentials (service-to-service):

import time
import httpx

class OAuthClient:
    def __init__(self, client_id: str, client_secret: str, token_url: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self._token: str | None = None
        self._token_expiry: float = 0

    def _get_token(self) -> str:
        if self._token and time.time() < self._token_expiry - 60:
            return self._token  # Reuse if not within 60s of expiry
        response = httpx.post(self.token_url, data={
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
        })
        response.raise_for_status()
        data = response.json()
        self._token = data["access_token"]
        self._token_expiry = time.time() + data["expires_in"]
        return self._token

    def get(self, url: str) -> dict:
        headers = {"Authorization": f"Bearer {self._get_token()}"}
        response = httpx.get(url, headers=headers)
        response.raise_for_status()
        return response.json()

Never fetch a new token on every request: that adds an extra auth round-trip to every tool call and will quickly hit the auth server’s own rate limits. Cache the token and refresh it shortly before expiry.

Retry with exponential backoff and jitter

import time
import random
import httpx

RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}

def request_with_retry(
    client: httpx.Client,
    method: str,
    url: str,
    max_attempts: int = 4,
    base_delay: float = 1.0,
    **kwargs,
) -> httpx.Response:
    for attempt in range(max_attempts):
        try:
            response = client.request(method, url, **kwargs)

            if response.status_code == 429 and attempt < max_attempts - 1:
                # Retry-After may be seconds (numeric) or an HTTP-date string
                retry_after_header = response.headers.get("Retry-After")
                if retry_after_header:
                    try:
                        retry_after = float(retry_after_header)
                    except ValueError:
                        # HTTP-date format: "Wed, 21 Oct 2026 07:28:00 GMT"
                        from email.utils import parsedate_to_datetime
                        from datetime import timezone, datetime as dt
                        retry_dt = parsedate_to_datetime(retry_after_header)
                        retry_after = max(0.0, (retry_dt - dt.now(timezone.utc)).total_seconds())
                else:
                    retry_after = base_delay * (2 ** attempt)
                time.sleep(retry_after)
                continue

            if response.status_code in RETRYABLE_STATUS_CODES and attempt < max_attempts - 1:
                delay = base_delay * (2 ** attempt) + random.uniform(0, 1.0)  # jitter
                time.sleep(delay)
                continue

            return response

        except (httpx.ConnectError, httpx.TimeoutException):
            if attempt == max_attempts - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1.0)
            time.sleep(delay)

    raise RuntimeError(f"All {max_attempts} attempts failed for {method} {url}")

What to retry: 429 (rate limited), 5xx (server error), network errors. What not to retry: 400 (bad request), 401 (auth), 403 (forbidden), 404 (not found). These will never succeed without fixing the request.

Idempotency keys

import uuid

def create_order(
    customer_id: str,
    items: list[dict],
    idempotency_key: str | None = None,
) -> dict:
    """Create a customer order. Pass idempotency_key to safely retry a specific attempt."""
    # Generate a random key once per user-intended operation and reuse it across retries.
    # Do NOT derive the key from the payload — a customer placing the same order twice
    # is two distinct operations and both should be processed.
    key = idempotency_key or str(uuid.uuid4())

    response = request_with_retry(
        client,
        "POST",
        "/orders",
        json={"customer_id": customer_id, "items": items},
        headers={"Idempotency-Key": key},
    )
    return response.json()

Generate a random key (uuid4) once when the user initiates the operation and pass it through all retry attempts. The same key scopes deduplication to a single intended operation: the server returns the original result if it has already processed a request with this key. Never derive the key from the payload alone: two customers placing identical orders, or one customer legitimately reordering the same items, would be wrongly collapsed into one.

Not all APIs support idempotency keys. For those that don’t, either avoid retrying mutations or implement deduplication on your side by caching the operation result.
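A minimal client-side fallback can be sketched as a result cache keyed by a caller-generated operation ID. The class and method names here are illustrative, not from any particular library:

```python
import threading

class MutationDeduplicator:
    """Cache the result of each mutation by operation ID, so a retry
    replays the stored result instead of re-executing the call."""

    def __init__(self):
        self._results: dict[str, dict] = {}
        self._lock = threading.Lock()

    def run(self, operation_id: str, fn, *args, **kwargs) -> dict:
        with self._lock:
            if operation_id in self._results:
                # Already executed: return the cached result, skip the call
                return self._results[operation_id]
        # Note: two concurrent first calls with the same ID may both execute;
        # a production version would lock per key.
        result = fn(*args, **kwargs)
        with self._lock:
            self._results[operation_id] = result
        return result
```

An in-memory cache like this only deduplicates within one process; a multi-instance deployment would need a shared store such as Redis, with an expiry on cached results.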

Pagination

def list_all_orders(customer_id: str, page_size: int = 50) -> list[dict]:
    """Fetch all orders across all pages."""
    orders = []
    cursor = None

    while True:
        params = {"customer_id": customer_id, "limit": page_size}
        if cursor:
            params["cursor"] = cursor

        response = client.get("/orders", params=params)
        data = response.json()

        orders.extend(data["items"])

        # Cursor-based pagination: stop when next_cursor is absent or null
        cursor = data.get("next_cursor")
        if not cursor:
            break

    return orders

Be careful with pagination in tools: if a customer has 10,000 orders, fetching all of them into the model context is wasteful and expensive. Prefer tools that return a bounded result set with a “more available” signal:

def list_customer_orders(customer_id: str, limit: int = 20, cursor: str | None = None) -> dict:
    """
    List customer orders. Returns up to `limit` orders and a cursor for the next page.
    If `has_more` is true, call again with the returned `next_cursor` for more results.
    """
    response = client.get("/orders", params={
        "customer_id": customer_id, "limit": limit, "cursor": cursor
    })
    data = response.json()
    return {
        "orders": data["items"],
        "has_more": bool(data.get("next_cursor")),
        "next_cursor": data.get("next_cursor"),
    }

Return has_more and next_cursor so the model can decide whether to request the next page based on what it has found so far.

Receiving webhooks

Webhooks deliver events asynchronously: the API calls you, rather than you calling the API. Common pattern: register a webhook endpoint, verify the payload, enqueue for processing.

import asyncio
import hashlib
import hmac
import os
from fastapi import FastAPI, Request, HTTPException

app = FastAPI()
WEBHOOK_SECRET = os.environ["WEBHOOK_SECRET"]
event_queue: asyncio.Queue = asyncio.Queue()  # Consumed by a background worker

@app.post("/webhooks/orders")
async def receive_order_webhook(request: Request):
    body = await request.body()

    # Verify the payload signature — reject unauthenticated webhooks
    signature = request.headers.get("X-Signature-SHA256", "")
    expected = hmac.new(WEBHOOK_SECRET.encode(), body, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(f"sha256={expected}", signature):
        raise HTTPException(status_code=401, detail="Invalid signature")

    event = await request.json()

    # Enqueue for async processing — respond fast (200) so the API doesn't retry
    await event_queue.put(event)
    return {"status": "accepted"}

Return 200 immediately: a slow webhook response causes the API to retry, creating duplicate events. Do the work asynchronously.
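A background worker draining that queue can be sketched as follows, with event-ID deduplication to absorb provider retries. It assumes each event payload carries a unique "id" field; the actual field name varies by provider:

```python
import asyncio

async def process_events(
    event_queue: asyncio.Queue,
    handler,
    seen_ids: set[str],
) -> None:
    """Consume webhook events off the queue, skipping duplicate deliveries.

    `handler` is your business logic; `seen_ids` stands in for a persistent
    store (e.g. Redis) in a real deployment.
    """
    while True:
        event = await event_queue.get()
        try:
            if event["id"] not in seen_ids:
                seen_ids.add(event["id"])  # Record before handling: at-most-once
                handler(event)
        finally:
            event_queue.task_done()  # Lets queue.join() track completion
```

Recording the ID before handling gives at-most-once semantics; recording it after gives at-least-once, which is usually what you want if the handler itself is idempotent.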


Layer 3: Deep Dive

Backoff strategies compared

Strategy | Formula | When to use
Fixed | Always wait d seconds | Simple; okay for low-concurrency
Exponential | Wait d × 2^n seconds | Standard for most APIs
Exponential + jitter | Wait d × 2^n + rand(0, d) | Prevents thundering herd when many clients retry simultaneously
Decorrelated jitter | Wait rand(d, prev_wait × 3) | Best distribution for high-concurrency clients

For tool integrations where many parallel sessions may hit the same API, always add jitter.
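The decorrelated-jitter row from the table can be sketched as a delay generator (the function name is illustrative):

```python
import random

def decorrelated_jitter_delays(base: float, cap: float, attempts: int) -> list[float]:
    """Produce retry delays where each delay is drawn uniformly from
    [base, previous_delay * 3], capped at `cap` seconds."""
    delays = []
    prev = base
    for _ in range(attempts):
        prev = min(cap, random.uniform(base, prev * 3))
        delays.append(prev)
    return delays
```

Because each delay depends on the previous one rather than on the attempt number, concurrent clients drift apart quickly instead of retrying in lockstep.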

Circuit breaker implementation

import threading
import time
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"       # Normal operation
    OPEN = "open"           # Failing — reject calls immediately
    HALF_OPEN = "half_open" # Testing recovery

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60.0):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time: float = 0
        self._lock = threading.Lock()

    def call(self, fn, *args, **kwargs):
        with self._lock:
            if self.state == CircuitState.OPEN:
                if time.time() - self.last_failure_time > self.recovery_timeout:
                    self.state = CircuitState.HALF_OPEN
                else:
                    raise RuntimeError("Circuit open — service unavailable")

        try:
            result = fn(*args, **kwargs)
            with self._lock:
                self.failure_count = 0
                self.state = CircuitState.CLOSED
            return result
        except Exception:
            with self._lock:
                self.failure_count += 1
                self.last_failure_time = time.time()
                if self.failure_count >= self.failure_threshold:
                    self.state = CircuitState.OPEN
            raise

The circuit breaker prevents the agentic loop from hammering a broken downstream service: after failure_threshold consecutive failures, it rejects calls immediately until the service has had time to recover.

Webhook security patterns

Pattern | What it prevents
HMAC signature verification | Forged webhook payloads from attackers
Timestamp in signature | Replay attacks using old valid payloads
Idempotency on event ID | Duplicate processing from API retries
Allowlisted source IPs | Webhooks from unexpected origins

Always implement at minimum HMAC verification and event ID deduplication. Timestamp validation (reject events older than 5 minutes) prevents replay attacks.

API versioning compatibility

APIs evolve. Protect your tool integrations from breaking changes:

class VersionedAPIClient:
    API_VERSION = "2026-01"  # Pin to a specific API version

    def __init__(self):
        self.api_key = os.environ["SERVICE_API_KEY"]
        self.base_url = "https://api.example.com/v1"
        self.client = httpx.Client(timeout=30.0)

    def get(self, path: str, **params) -> dict:
        response = self.client.get(
            f"{self.base_url}{path}",
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "API-Version": self.API_VERSION,  # Explicit version header
            },
            params=params,
        )
        response.raise_for_status()
        return response.json()

Pin API versions explicitly. Subscribe to the provider’s deprecation notices and test upgrades in staging before updating API_VERSION in production.


Real API Integration: Check your understanding

Q1

Your tool calls an external API to create a payment record. The call times out at the network layer, so you retry it. The payment was actually processed on the first attempt. What production failure does this cause, and what prevents it?

Q2

An API returns HTTP 429 with a Retry-After: 30 header. What is the correct handling?

Q3

A tool lists customer orders and returns all results in one call. In production, a customer has 8,000 orders. What problem does this create, and what is the better design?

Q4

You need to cache OAuth 2.0 access tokens to avoid fetching a new one on every tool call. What is the correct expiry strategy?

Q5

Your application receives a webhook event and processes it synchronously in the request handler, which takes 8 seconds. The webhook provider retries any request not acknowledged within 5 seconds. What failure mode does this create?