Layer 1: Surface
A production tool-using system is a distributed system: your application, the LLM API, and N downstream tool dependencies all need to be up for a session to succeed. Each dependency is a potential failure point.
The four operational concerns that matter most:
| Concern | Symptom without it | Fix |
|---|---|---|
| Observability | Can't tell whether failures are in the model, tools, or application logic | Log every tool call with outcome, latency, and cost |
| Timeouts | One slow tool blocks every session | Per-tool timeout with structured error return |
| Circuit breaker | Broken downstream hammered by retries, making recovery slower | Stop calling a failing dependency until it recovers |
| Graceful degradation | A broken tool causes the whole session to fail | Return a useful fallback when a non-critical tool is unavailable |
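The timeout row above can be sketched with a shared worker pool. This is a sketch, not the article's implementation: `TOOL_REGISTRY`, the tool entries, and the 5-second default are illustrative stand-ins.

```python
import concurrent.futures
import time

# Hypothetical registry mapping tool names to callables.
TOOL_REGISTRY = {
    "fast_lookup": lambda: "ok",
    "slow_lookup": lambda: time.sleep(2) or "too late",
}

_pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)

def execute_with_timeout(tool_name: str, arguments: dict, timeout_s: float = 5.0) -> str:
    """Run one tool call with a hard per-call deadline.

    On timeout, return a structured error string the model can react to,
    instead of letting one slow tool block the whole session.
    """
    future = _pool.submit(TOOL_REGISTRY[tool_name], **arguments)
    try:
        return str(future.result(timeout=timeout_s))
    except concurrent.futures.TimeoutError:
        # Note: the worker thread keeps running; we just stop waiting for it.
        return f"Error: {tool_name} timed out after {timeout_s}s"
```

One caveat with thread-based timeouts: the abandoned call still occupies a worker thread until it finishes, so size the pool accordingly or use process-based isolation for tools that can hang indefinitely.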
Layer 2: Guided
Logging tool call lifecycle
Every tool call should emit a structured log entry covering the full lifecycle:
import time
import json
import logging
logger = logging.getLogger("tool_calls")
def execute_and_log(
    tool_name: str,
    arguments: dict,
    user_id: str,
    session_id: str,
) -> str:
    start = time.monotonic()
    success = False
    result_size = 0
    try:
        result = TOOL_REGISTRY[tool_name](**arguments)
        result_str = json.dumps(result) if isinstance(result, dict) else str(result)
        success = True
        result_size = len(result_str)
        return result_str
    except Exception as e:
        result_str = f"Error: {e}"
        return result_str
    finally:
        duration_ms = (time.monotonic() - start) * 1000
        logger.info("tool_call", extra={
            "tool": tool_name,
            "session_id": session_id,
            "user_id": user_id,
            "success": success,
            "duration_ms": round(duration_ms, 1),
            "result_bytes": result_size,
            # Don't log argument values (they may contain PII); log argument keys only
            "argument_keys": list(arguments.keys()),
        })
Key fields to capture:
- `tool`: which tool was called
- `success`: did it return a non-error result
- `duration_ms`: how long it took (track p50/p95 per tool)
- `session_id`: links all tool calls in one conversation
- `result_bytes`: tracks whether tools are returning unexpectedly large responses
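Given entries shaped like the log record above, per-tool p50/p95 can be computed offline with the standard library; a sketch, assuming the field names match the `extra` dict:

```python
import statistics

def latency_percentiles(entries: list[dict]) -> dict[str, dict[str, float]]:
    """Compute p50/p95 duration per tool from structured log entries.

    Each entry needs at least "tool" and "duration_ms" fields; a tool needs
    at least two samples for statistics.quantiles to work.
    """
    by_tool: dict[str, list[float]] = {}
    for e in entries:
        by_tool.setdefault(e["tool"], []).append(e["duration_ms"])
    out = {}
    for tool, durations in by_tool.items():
        # n=20 yields 19 cut points: index 9 is the p50, index 18 the p95
        qs = statistics.quantiles(durations, n=20)
        out[tool] = {"p50": qs[9], "p95": qs[18]}
    return out
```

In production you would push `duration_ms` into your metrics backend and let it compute percentiles over a rolling window; this sketch just shows the aggregation shape.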
Per-session cost tracking
from dataclasses import dataclass

@dataclass
class SessionCostTracker:
    llm_input_tokens: int = 0
    llm_output_tokens: int = 0
    tool_calls: int = 0
    tool_errors: int = 0

    def record_llm_usage(self, usage):
        self.llm_input_tokens += usage.input_tokens
        self.llm_output_tokens += usage.output_tokens

    def record_tool_call(self, success: bool):
        self.tool_calls += 1
        if not success:
            self.tool_errors += 1

    def estimated_cost(
        self,
        input_price_per_mtok: float = 3.0,   # adjust for your model
        output_price_per_mtok: float = 15.0,
    ) -> float:
        llm_cost = (
            self.llm_input_tokens / 1_000_000 * input_price_per_mtok +
            self.llm_output_tokens / 1_000_000 * output_price_per_mtok
        )
        return llm_cost  # tool costs added separately if applicable
def run_session(user_message: str) -> str:
    tracker = SessionCostTracker()
    messages = [{"role": "user", "content": user_message}]
    for _ in range(8):
        response = llm.chat(model="balanced", messages=messages, tools=TOOLS)
        tracker.record_llm_usage(response.usage)
        if response.stop_reason == "end_turn":
            log_session_cost(tracker)
            return response.text
        # ... tool execution ...
    log_session_cost(tracker)  # log cost even when the session hits max iterations
    return "Task incomplete."
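As a sanity check on the pricing arithmetic, with the assumed default prices ($3/Mtok in, $15/Mtok out), a session that used 200k input and 50k output tokens costs:

```python
def estimated_cost(input_tokens: int, output_tokens: int,
                   input_price_per_mtok: float = 3.0,
                   output_price_per_mtok: float = 15.0) -> float:
    # Same formula as SessionCostTracker.estimated_cost above
    return (input_tokens / 1_000_000 * input_price_per_mtok
            + output_tokens / 1_000_000 * output_price_per_mtok)

cost = estimated_cost(200_000, 50_000)
# 0.2 * $3 + 0.05 * $15 = $0.60 + $0.75 = $1.35
```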
Circuit breaker per tool
import time

class ToolCircuitBreaker:
    """Per-tool circuit breaker that opens after consecutive failures."""

    def __init__(self, failure_threshold: int = 3, reset_after: float = 60.0):
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self._circuits: dict[str, dict] = {}

    def _state(self, tool: str) -> dict:
        if tool not in self._circuits:
            self._circuits[tool] = {"failures": 0, "opened_at": None}
        return self._circuits[tool]

    def is_open(self, tool: str) -> bool:
        s = self._state(tool)
        if s["opened_at"] is None:
            return False
        if time.time() - s["opened_at"] > self.reset_after:
            s["opened_at"] = None  # Allow one probe request
            return False
        return True

    def record_success(self, tool: str):
        s = self._state(tool)
        s["failures"] = 0
        s["opened_at"] = None

    def record_failure(self, tool: str):
        s = self._state(tool)
        s["failures"] += 1
        if s["failures"] >= self.failure_threshold:
            s["opened_at"] = time.time()
            logger.warning(f"Circuit opened for tool: {tool}")
circuit_breaker = ToolCircuitBreaker()

def execute_with_circuit(tool_name: str, arguments: dict) -> str:
    if circuit_breaker.is_open(tool_name):
        return f"Error: {tool_name} is temporarily unavailable (circuit open)"
    try:
        result = TOOL_REGISTRY[tool_name](**arguments)
        circuit_breaker.record_success(tool_name)
        return str(result)
    except Exception:
        circuit_breaker.record_failure(tool_name)
        raise
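To see the open/probe/reopen cycle end to end, here is the breaker logic condensed into a standalone single-tool sketch with a shrunken reset window (class name and thresholds are illustrative, not part of the implementation above):

```python
import time

class MiniBreaker:
    """Condensed single-tool version of the ToolCircuitBreaker logic above."""

    def __init__(self, failure_threshold: int = 3, reset_after: float = 0.2):
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self.failures = 0
        self.opened_at: float | None = None

    def is_open(self) -> bool:
        if self.opened_at is None:
            return False
        if time.time() - self.opened_at > self.reset_after:
            self.opened_at = None  # reset window elapsed: let one probe through
            return False
        return True

    def record_success(self):
        self.failures = 0
        self.opened_at = None

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.time()
```

Because `record_failure` never resets the counter, a failed probe pushes it back past the threshold and reopens the circuit immediately; only `record_success` closes it fully.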
Graceful degradation
Not all tools are equally critical. Design fallback behavior for non-critical tools:
TOOL_CRITICALITY = {
    "search_knowledge_base": "critical",  # Task fails without this
    "get_user_preferences": "optional",   # Nice to have; task proceeds without
    "fetch_related_content": "optional",  # Enrichment only
}

def execute_with_degradation(tool_name: str, arguments: dict) -> str:
    try:
        return execute_with_circuit(tool_name, arguments)
    except Exception as e:
        if TOOL_CRITICALITY.get(tool_name) == "optional":
            logger.warning(f"Optional tool {tool_name} failed: {e}")
            return f"Note: {tool_name} is currently unavailable. Proceeding without it."
        raise  # Critical tool failure propagates up
When an optional tool returns a degradation message, the model understands it should proceed without that data: you've given it a path forward.
Alerting thresholds
# Metrics to alert on; adjust thresholds for your workload
ALERT_RULES = [
    # Tool health
    {"metric": "tool_error_rate", "threshold": 0.10, "window": "5m", "severity": "warning"},
    {"metric": "tool_error_rate", "threshold": 0.25, "window": "5m", "severity": "critical"},
    {"metric": "tool_p95_latency_ms", "threshold": 5000, "window": "5m", "severity": "warning"},
    # Session cost
    {"metric": "session_cost_p95_usd", "threshold": 0.50, "window": "1h", "severity": "warning"},
    {"metric": "daily_cost_usd", "threshold": 500, "window": "1d", "severity": "critical"},
    # Model
    {"metric": "llm_error_rate", "threshold": 0.05, "window": "5m", "severity": "warning"},
    {"metric": "session_iterations_p95", "threshold": 7, "window": "1h", "severity": "warning"},
]
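A minimal evaluator for rules of this shape; a sketch, assuming the metric snapshot dict is produced by your aggregation pipeline:

```python
def evaluate_alerts(rules: list[dict], metrics: dict[str, float]) -> list[dict]:
    """Return the rules whose metric currently exceeds its threshold.

    Rules for metrics absent from the snapshot are skipped; when both a
    warning and a critical rule fire for one metric, both are returned.
    """
    fired = []
    for rule in rules:
        value = metrics.get(rule["metric"])
        if value is not None and value > rule["threshold"]:
            fired.append({**rule, "value": value})
    return fired
```

Real alerting backends also handle windowing and deduplication; this only shows the threshold check itself.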
High session_iterations_p95 is an early warning for loop-like behavior before users explicitly report it.
Layer 3: Deep Dive
Distributed tracing for tool chains
When a session calls 4 tools across 3 services, a single trace ID links them together:
import time
import uuid

class TraceContext:
    def __init__(self):
        self.trace_id = str(uuid.uuid4())
        self.spans: list[dict] = []

    def span(self, name: str):
        return TraceSpan(self, name)

class TraceSpan:
    def __init__(self, ctx: TraceContext, name: str):
        self.ctx = ctx
        self.name = name
        self.span_id = str(uuid.uuid4())
        self.start = time.monotonic()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        duration_ms = (time.monotonic() - self.start) * 1000
        self.ctx.spans.append({
            "trace_id": self.ctx.trace_id,
            "span_id": self.span_id,
            "name": self.name,
            "duration_ms": duration_ms,
            "error": exc_type is not None,
        })
Pass X-Trace-ID headers to downstream API calls so the full tool chain is visible in your observability platform.
SLO definition for tool-using systems
| SLO | Target | Measurement |
|---|---|---|
| Session completion rate | 95% | Sessions reaching a final answer vs hitting max_iterations or error |
| Tool availability | 99.5% per tool | Tool calls succeeding / total tool calls |
| p95 session latency | under 15s | Time from first user message to final response |
| Cost per session p95 | under $0.10 | LLM tokens + tool costs |
Track per-tool availability separately: when an SLO is breached, you need to know which tool caused it.
On-call runbook template
For each tool integration, document:
## Tool: search_knowledge_base
**Dependency:** Internal search API at search.internal:8080
**Criticality:** Critical (sessions fail without this tool)
### On-call checks
1. Check tool error rate dashboard: [link]
2. Check circuit breaker state: `GET /admin/circuits`
3. Check search service health: `curl search.internal:8080/health`
### Incident playbook
| Symptom | Likely cause | Action |
|---|---|---|
| Error rate > 25% | Search service down | Check #ops-alerts, escalate to search team |
| p95 latency > 5s | Index overloaded | Check query volume, scale read replicas |
| Circuit open | Repeated failures | Check search service logs, wait for auto-recovery (60s) |
### Rollback
To disable this tool without a deploy: `POST /admin/tools/search_knowledge_base/disable`
Runbooks reduce MTTR and prevent each incident from requiring tribal knowledge.
Further reading
- Google SRE Book, Chapter 6, "Monitoring Distributed Systems": the four golden signals (latency, traffic, errors, saturation) applied here to tool-using systems; relevant well beyond Google-scale infrastructure.
- Martin Fowler, "CircuitBreaker": the definitive description of the circuit breaker pattern, with a state diagram.
- OpenTelemetry: the vendor-neutral distributed tracing standard; use it to propagate trace IDs across model API calls and tool invocations.