🤖 AI Explained

Production Operations

A tool-using system has more moving parts than a simple prompt-response loop, and more things that can go wrong. This module covers the observability, cost management, and resilience patterns that keep tool integrations reliable after launch.

Layer 1: Surface

A production tool-using system is a distributed system: your application, the LLM API, and N downstream tool dependencies all need to be up for a session to succeed. Each dependency is a potential failure point.

The four operational concerns that matter most:

| Concern | Symptom without it | Fix |
|---|---|---|
| Observability | Can't tell whether failures are in the model, tools, or application logic | Log every tool call with outcome, latency, and cost |
| Timeouts | One slow tool blocks every session | Per-tool timeout with structured error return |
| Circuit breaker | Broken downstream hammered by retries, making recovery slower | Stop calling a failing dependency until it recovers |
| Graceful degradation | A broken tool causes the whole session to fail | Return a useful fallback when a non-critical tool is unavailable |
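Of the four concerns, timeouts are the only one without a code example later in this module, so here is a minimal per-tool timeout sketch. `TOOL_REGISTRY`, `slow_tool`, and the 5-second default are assumptions for illustration; note that a thread pool cannot cancel work that is already running, it only stops the session from waiting on it.

```python
import concurrent.futures
import time

# Hypothetical tool registry for illustration
def slow_tool():
    time.sleep(2)
    return "done"

TOOL_REGISTRY = {"slow_tool": slow_tool}

_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=8)

def execute_with_timeout(tool_name: str, arguments: dict, timeout_s: float = 5.0) -> str:
    """Run one tool call with a per-tool timeout, returning a structured
    error string instead of letting a slow tool block the session."""
    future = _EXECUTOR.submit(TOOL_REGISTRY[tool_name], **arguments)
    try:
        return str(future.result(timeout=timeout_s))
    except concurrent.futures.TimeoutError:
        return f"Error: {tool_name} timed out after {timeout_s}s"
```

Returning the timeout as a structured error string (rather than raising) lets the model see the failure and decide how to proceed, the same pattern the later degradation example uses.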

Layer 2: Guided

Logging tool call lifecycle

Every tool call should emit a structured log entry covering the full lifecycle:

import time
import json
import logging

logger = logging.getLogger("tool_calls")

def execute_and_log(
    tool_name: str,
    arguments: dict,
    user_id: str,
    session_id: str,
) -> str:
    start = time.monotonic()
    success = False
    result_size = 0

    try:
        result = TOOL_REGISTRY[tool_name](**arguments)
        result_str = json.dumps(result) if isinstance(result, dict) else str(result)
        success = True
        result_size = len(result_str)
        return result_str

    except Exception as e:
        # Include the exception type so error classes can be grouped in logs
        result_str = f"Error: {type(e).__name__}: {e}"
        return result_str

    finally:
        duration_ms = (time.monotonic() - start) * 1000
        logger.info("tool_call", extra={
            "tool": tool_name,
            "session_id": session_id,
            "user_id": user_id,
            "success": success,
            "duration_ms": round(duration_ms, 1),
            "result_bytes": result_size,
            # Don't log argument values that contain PII: log argument keys only
            "argument_keys": list(arguments.keys()),
        })

Key fields to capture:

  • tool: which tool was called
  • success: did it return a non-error result
  • duration_ms: how long it took (p50/p95 per tool)
  • session_id: links all tool calls in one conversation
  • result_bytes: tracks whether tools are returning unexpectedly large responses
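With duration_ms in every log entry, per-tool p50/p95 fall out of a simple aggregation. A sketch using only the standard library; the sample list stands in for whatever your log pipeline returns, and entries should be grouped by tool first so a slow tool doesn't hide behind a fast one:

```python
import statistics

def latency_percentiles(durations_ms: list[float]) -> dict[str, float]:
    """Compute p50/p95 latency from a sample of tool-call durations."""
    # quantiles(n=100) returns the 1st through 99th percentile cut points
    cuts = statistics.quantiles(durations_ms, n=100)
    return {"p50": cuts[49], "p95": cuts[94]}
```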

Per-session cost tracking

from dataclasses import dataclass, field

@dataclass
class SessionCostTracker:
    llm_input_tokens: int = 0
    llm_output_tokens: int = 0
    tool_calls: int = 0
    tool_errors: int = 0

    def record_llm_usage(self, usage):
        self.llm_input_tokens += usage.input_tokens
        self.llm_output_tokens += usage.output_tokens

    def record_tool_call(self, success: bool):
        self.tool_calls += 1
        if not success:
            self.tool_errors += 1

    def estimated_cost(
        self,
        input_price_per_mtok: float = 3.0,   # adjust for your model
        output_price_per_mtok: float = 15.0,
    ) -> float:
        llm_cost = (
            self.llm_input_tokens / 1_000_000 * input_price_per_mtok +
            self.llm_output_tokens / 1_000_000 * output_price_per_mtok
        )
        return llm_cost  # tool costs added separately if applicable

def run_session(user_message: str) -> str:
    tracker = SessionCostTracker()
    messages = [{"role": "user", "content": user_message}]

    for _ in range(8):
        response = llm.chat(model="balanced", messages=messages, tools=TOOLS)
        tracker.record_llm_usage(response.usage)

        if response.stop_reason == "end_turn":
            log_session_cost(tracker)
            return response.text

        # ... tool execution ...

    return "Task incomplete."
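As a sanity check on the pricing math, a session that used 10,000 input and 2,000 output tokens at the example rates above works out as:

```python
# Mirrors estimated_cost() with the example rates ($3 / $15 per MTok)
input_tokens, output_tokens = 10_000, 2_000
cost = input_tokens / 1_000_000 * 3.0 + output_tokens / 1_000_000 * 15.0
# 0.03 + 0.03 = 0.06 dollars for this session
```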

Circuit breaker per tool

import time

class ToolCircuitBreaker:
    """Per-tool circuit breaker that opens after consecutive failures."""

    def __init__(self, failure_threshold: int = 3, reset_after: float = 60.0):
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self._circuits: dict[str, dict] = {}

    def _state(self, tool: str) -> dict:
        if tool not in self._circuits:
            self._circuits[tool] = {"failures": 0, "opened_at": None}
        return self._circuits[tool]

    def is_open(self, tool: str) -> bool:
        s = self._state(tool)
        if s["opened_at"] is None:
            return False
        if time.time() - s["opened_at"] > self.reset_after:
            s["opened_at"] = None  # Allow one probe request
            return False
        return True

    def record_success(self, tool: str):
        s = self._state(tool)
        s["failures"] = 0
        s["opened_at"] = None

    def record_failure(self, tool: str):
        s = self._state(tool)
        s["failures"] += 1
        if s["failures"] >= self.failure_threshold:
            s["opened_at"] = time.time()
            logger.warning(f"Circuit opened for tool: {tool}")

circuit_breaker = ToolCircuitBreaker()

def execute_with_circuit(tool_name: str, arguments: dict) -> str:
    if circuit_breaker.is_open(tool_name):
        return f"Error: {tool_name} is temporarily unavailable (circuit open)"
    try:
        result = TOOL_REGISTRY[tool_name](**arguments)
        circuit_breaker.record_success(tool_name)
        return str(result)
    except Exception:
        circuit_breaker.record_failure(tool_name)
        raise
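To see the open/probe cycle concretely, here is a condensed single-tool version of the breaker with an injectable clock, so the 60-second reset window can be simulated instead of waited out:

```python
import time

class MiniBreaker:
    """Condensed single-tool version of ToolCircuitBreaker, with an
    injectable clock so the reset window can be simulated in tests."""

    def __init__(self, threshold: int = 3, reset_after: float = 60.0, clock=time.time):
        self.threshold = threshold
        self.reset_after = reset_after
        self.clock = clock
        self.failures = 0
        self.opened_at = None

    def is_open(self) -> bool:
        if self.opened_at is None:
            return False
        if self.clock() - self.opened_at > self.reset_after:
            self.opened_at = None  # allow one probe request through
            return False
        return True

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = self.clock()

# Three failures open the circuit; advancing the clock past 60s allows a probe
now = [0.0]
breaker = MiniBreaker(clock=lambda: now[0])
for _ in range(3):
    breaker.record_failure()
assert breaker.is_open()
now[0] += 61.0
assert not breaker.is_open()  # probe allowed after the reset window
```

Because the failure count is not reset when the probe is allowed, a single failed probe reopens the circuit immediately; only a successful call clears both fields, as record_success does in the full class.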

Graceful degradation

Not all tools are equally critical. Design fallback behavior for non-critical tools:

TOOL_CRITICALITY = {
    "search_knowledge_base": "critical",     # Task fails without this
    "get_user_preferences":  "optional",     # Nice to have, task proceeds without
    "fetch_related_content": "optional",     # Enrichment only
}

def execute_with_degradation(tool_name: str, arguments: dict) -> str:
    try:
        return execute_with_circuit(tool_name, arguments)
    except Exception as e:
        if TOOL_CRITICALITY.get(tool_name) == "optional":
            logger.warning(f"Optional tool {tool_name} failed: {e}")
            return f"Note: {tool_name} is currently unavailable. Proceeding without it."
        raise  # Critical tool failure propagates up

When an optional tool returns a degradation message, the model understands it should proceed without that data: you've given it a path forward.

Alerting thresholds

# Metrics to alert on; adjust thresholds for your workload
ALERT_RULES = [
    # Tool health
    {"metric": "tool_error_rate",         "threshold": 0.10, "window": "5m",  "severity": "warning"},
    {"metric": "tool_error_rate",         "threshold": 0.25, "window": "5m",  "severity": "critical"},
    {"metric": "tool_p95_latency_ms",     "threshold": 5000, "window": "5m",  "severity": "warning"},

    # Session cost
    {"metric": "session_cost_p95_usd",    "threshold": 0.50, "window": "1h",  "severity": "warning"},
    {"metric": "daily_cost_usd",          "threshold": 500,  "window": "1d",  "severity": "critical"},

    # Model
    {"metric": "llm_error_rate",          "threshold": 0.05, "window": "5m",  "severity": "warning"},
    {"metric": "session_iterations_p95",  "threshold": 7,    "window": "1h",  "severity": "warning"},
]

High session_iterations_p95 is an early warning for loop-like behavior before users explicitly report it.
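The tool_error_rate metric can be approximated in-process with a fixed-size sliding window. A sketch; real deployments usually compute this in the metrics backend over a time window rather than a call count:

```python
from collections import deque

class ErrorRateWindow:
    """Error rate over the most recent `size` tool calls."""

    def __init__(self, size: int = 100):
        self.outcomes: deque[bool] = deque(maxlen=size)

    def record(self, success: bool) -> None:
        self.outcomes.append(success)

    def error_rate(self) -> float:
        if not self.outcomes:
            return 0.0
        return sum(1 for ok in self.outcomes if not ok) / len(self.outcomes)
```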


Layer 3: Deep Dive

Distributed tracing for tool chains

When a session calls 4 tools across 3 services, a single trace ID links them together:

import uuid

class TraceContext:
    def __init__(self):
        self.trace_id = str(uuid.uuid4())
        self.spans: list[dict] = []

    def span(self, name: str):
        return TraceSpan(self, name)

class TraceSpan:
    def __init__(self, ctx: TraceContext, name: str):
        self.ctx = ctx
        self.name = name
        self.span_id = str(uuid.uuid4())
        self.start = time.monotonic()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        duration_ms = (time.monotonic() - self.start) * 1000
        self.ctx.spans.append({
            "trace_id": self.ctx.trace_id,
            "span_id": self.span_id,
            "name": self.name,
            "duration_ms": duration_ms,
            "error": exc_type is not None,
        })

Pass X-Trace-ID headers to downstream API calls so the full tool chain is visible in your observability platform.

SLO definition for tool-using systems

| SLO | Target | Measurement |
|---|---|---|
| Session completion rate | 95% | Sessions reaching a final answer vs hitting max_iterations or error |
| Tool availability | 99.5% per tool | Tool calls succeeding / total tool calls |
| p95 session latency | under 15s | Time from first user message to final response |
| Cost per session p95 | under $0.10 | LLM tokens + tool costs |

Track per-tool availability separately: when an SLO is breached, you need to know which tool caused it.
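Checking a window of sessions against the completion-rate target is simple arithmetic; the counts below are illustrative:

```python
COMPLETION_TARGET = 0.95

def completion_rate(completed: int, total: int) -> float:
    """Fraction of sessions that reached a final answer."""
    return completed / total if total else 1.0

# Suppose 960 of 1,000 sessions reached a final answer this window
rate = completion_rate(960, 1_000)
slo_breached = rate < COMPLETION_TARGET  # 0.96 >= 0.95, so the SLO holds
```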

On-call runbook template

For each tool integration, document:

## Tool: search_knowledge_base

**Dependency:** Internal search API at search.internal:8080
**Criticality:** Critical (sessions fail without this tool)

### On-call checks
1. Check tool error rate dashboard: [link]
2. Check circuit breaker state: `GET /admin/circuits`
3. Check search service health: `curl search.internal:8080/health`

### Incident playbook
| Symptom | Likely cause | Action |
|---|---|---|
| Error rate > 25% | Search service down | Check #ops-alerts, escalate to search team |
| p95 latency > 5s | Index overloaded | Check query volume, scale read replicas |
| Circuit open | Repeated failures | Check search service logs, wait for auto-recovery (60s) |

### Rollback
To disable this tool without a deploy: `POST /admin/tools/search_knowledge_base/disable`

Runbooks reduce MTTR and prevent each incident from requiring tribal knowledge.


Production Operations: Check your understanding

Q1

Three of your agent's five tools are responding slowly. Sessions are timing out after 30 seconds. You add a 30-second session-level timeout but the problem persists. What is missing?

Q2

Your tool error rate alert fires at 11% (threshold: 10%). You check the logs and find all failures come from a single tool: get_inventory. What is the correct immediate response?

Q3

A session_iterations_p95 metric shows that 95% of sessions complete in 3 iterations or fewer, but the metric has been rising from 3 to 6 over the past week. No users have complained yet. What does this indicate?

Q4

Cost per session has spiked to $0.85 (budget: $0.10). The total session count is unchanged. What is the most likely operational cause?

Q5

An optional enrichment tool (fetch_related_content) starts failing. Sessions continue but users notice responses are less detailed. What is the correct graceful degradation behavior?