Layer 1: Surface
When an LLM system fails in production, the first question is always: what did it receive, and what did it return? Without structured logging, answering this question means manual reconstruction from fragments. With structured logging, it means querying a trace store.
An LLM trace is a structured record of a single request's lifecycle: the prompt that went in, the model that handled it, the tokens consumed, the latency at each step, the tool calls made, and the final output. A trace is composed of spans: one for the overall request, one for each LLM call, one for each tool call, one for each retrieval operation.
Good tracing makes the difference between debugging in minutes and debugging in days. It also enables continuous quality measurement: if every response is logged with a correlation ID, you can later route a sample of responses to an LLM judge and get continuous quality metrics without blocking any real requests.
Why it matters
LLM systems have more moving parts than traditional APIs: model calls, retrieval operations, tool executions, prompt assembly, response parsing. Each step can fail in ways that produce a valid-looking output with wrong content. Without traces, these failures are invisible until a user complains.
Production Gotcha
Logging full prompts and responses is invaluable for debugging but creates a PII liability if user data appears in prompts. Establish a scrubbing policy before logging goes live: retrofitting PII scrubbing onto existing logs is expensive and error-prone, and a prompt that contains a user's name, email, or health data is a data breach waiting to be discovered.
Define your scrubbing policy in the design phase. Build a scrubbing step into the logging pipeline before the first line of trace data is written.
Layer 2: Guided
The span model
A trace is a tree of spans. The root span represents the entire request; child spans represent sub-operations.
```python
import time
import uuid
import json
from dataclasses import dataclass, field
from typing import Any

@dataclass
class Span:
    span_id: str
    trace_id: str                   # Links all spans in one request
    parent_span_id: str | None
    operation: str                  # "llm_call", "tool_call", "retrieval", "request"
    start_time: float               # Unix timestamp (seconds since epoch, float precision)
    end_time: float | None = None
    attributes: dict[str, Any] = field(default_factory=dict)
    error: str | None = None

    @property
    def duration_ms(self) -> float | None:
        if self.end_time is None:
            return None
        return (self.end_time - self.start_time) * 1000

    def finish(self, error: str | None = None) -> None:
        self.end_time = time.time()
        if error:
            self.error = error

    def to_dict(self) -> dict:
        return {
            "span_id": self.span_id,
            "trace_id": self.trace_id,
            "parent_span_id": self.parent_span_id,
            "operation": self.operation,
            "start_time": self.start_time,
            "end_time": self.end_time,
            "duration_ms": self.duration_ms,
            "attributes": self.attributes,
            "error": self.error,
        }

class Tracer:
    def __init__(self, trace_id: str | None = None):
        self.trace_id = trace_id or str(uuid.uuid4())
        self.spans: list[Span] = []

    def start_span(
        self,
        operation: str,
        parent: Span | None = None,
        attributes: dict | None = None,
    ) -> Span:
        span = Span(
            span_id=str(uuid.uuid4()),
            trace_id=self.trace_id,
            parent_span_id=parent.span_id if parent else None,
            operation=operation,
            start_time=time.time(),
            attributes=attributes or {},
        )
        self.spans.append(span)
        return span

    def emit(self, sink) -> None:
        """Write all completed spans to the sink (file, OTLP exporter, etc.)."""
        for span in self.spans:
            sink.write(json.dumps(span.to_dict()) + "\n")
```
What to log at minimum
At minimum, every production request should log the following fields:
```python
@dataclass
class RequestLog:
    # Identity
    request_id: str                       # Unique per request
    trace_id: str                         # May span multiple services
    session_id: str | None                # User session, if applicable
    # Model
    model_id: str                         # Exact model version string
    feature_id: str                       # Which product feature made this call
    # Tokens
    input_tokens: int
    output_tokens: int
    total_tokens: int
    # Performance
    latency_ms: float
    time_to_first_token_ms: float | None  # For streaming responses
    # Outcome
    finish_reason: str                    # "end_turn", "max_tokens", "error", etc.
    error: str | None
    # Derived cost (compute at log time)
    estimated_cost_usd: float

def build_request_log(
    request_id: str,
    model_id: str,
    feature_id: str,
    response,  # Model response object
    latency_ms: float,
) -> RequestLog:
    usage = response.usage
    cost = estimate_cost(model_id, usage.input_tokens, usage.output_tokens)
    return RequestLog(
        request_id=request_id,
        trace_id=request_id,  # Simple case; use distributed trace ID in multi-service systems
        session_id=None,
        model_id=model_id,
        feature_id=feature_id,
        input_tokens=usage.input_tokens,
        output_tokens=usage.output_tokens,
        total_tokens=usage.input_tokens + usage.output_tokens,
        latency_ms=latency_ms,
        time_to_first_token_ms=None,
        finish_reason=response.stop_reason,
        error=None,
        estimated_cost_usd=cost,
    )

COST_PER_MILLION_TOKENS = {
    # Example pricing; always verify with provider documentation
    "fast": {"input": 0.25, "output": 1.25},
    "balanced": {"input": 3.00, "output": 15.00},
    "frontier": {"input": 15.00, "output": 75.00},
}

def estimate_cost(model_id: str, input_tokens: int, output_tokens: int) -> float:
    prices = COST_PER_MILLION_TOKENS.get(model_id, {"input": 1.0, "output": 5.0})
    return (input_tokens * prices["input"] + output_tokens * prices["output"]) / 1_000_000
```
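As a worked example with the sample prices above: 512 input tokens on the "balanced" tier cost 512 × $3.00 / 1M = $0.001536, and 128 output tokens cost 128 × $15.00 / 1M = $0.00192, for a total of $0.003456. A runnable check (table trimmed to the one tier used):

```python
COST_PER_MILLION_TOKENS = {
    # Same example pricing as above; verify against provider documentation
    "balanced": {"input": 3.00, "output": 15.00},
}

def estimate_cost(model_id: str, input_tokens: int, output_tokens: int) -> float:
    prices = COST_PER_MILLION_TOKENS.get(model_id, {"input": 1.0, "output": 5.0})
    return (input_tokens * prices["input"] + output_tokens * prices["output"]) / 1_000_000

cost = estimate_cost("balanced", 512, 128)
# cost == 0.003456
```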
What to log for debugging
When debugging is needed, richer logs are invaluable, but they carry a PII obligation:
```python
import re
import hashlib

class PIIScrubber:
    PATTERNS = [
        (re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'), '[EMAIL]'),
        (re.compile(r'\b(\+1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b'), '[PHONE]'),
        (re.compile(r'\b\d{3}-\d{2}-\d{4}\b'), '[SSN]'),
        (re.compile(r'\b(?:\d[ -]?){13,16}\b'), '[CC_NUMBER]'),
    ]

    def scrub(self, text: str) -> str:
        for pattern, replacement in self.PATTERNS:
            text = pattern.sub(replacement, text)
        return text

_scrubber = PIIScrubber()

@dataclass
class DebugLog:
    request_id: str
    prompt_hash: str              # Hash of the full prompt, for dedup without storing it
    prompt_scrubbed: str          # Prompt after PII scrubbing
    response_scrubbed: str        # Response after PII scrubbing
    tool_calls: list[dict]        # Tool call arguments (scrubbed)
    retrieval_chunks: list[str]   # Retrieved context (scrubbed)

def build_debug_log(
    request_id: str,
    full_prompt: str,
    response_text: str,
    tool_calls: list[dict],
    retrieval_chunks: list[str],
) -> DebugLog:
    scrubbed_prompt = _scrubber.scrub(full_prompt)
    scrubbed_response = _scrubber.scrub(response_text)
    prompt_hash = hashlib.sha256(full_prompt.encode()).hexdigest()[:16]
    return DebugLog(
        request_id=request_id,
        prompt_hash=prompt_hash,
        prompt_scrubbed=scrubbed_prompt,
        response_scrubbed=scrubbed_response,
        tool_calls=[
            {k: _scrubber.scrub(str(v)) for k, v in call.items()}
            for call in tool_calls
        ],
        retrieval_chunks=[_scrubber.scrub(c) for c in retrieval_chunks],
    )
```
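A quick sanity check of the scrubbing patterns, using an email and an SSN regex like the ones above. Worth extending against your own test corpus: regex-based scrubbing is a baseline, not a guarantee, and misses many PII formats.

```python
import re

# Email and SSN patterns matching the PIIScrubber above
PATTERNS = [
    (re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'), '[EMAIL]'),
    (re.compile(r'\b\d{3}-\d{2}-\d{4}\b'), '[SSN]'),
]

def scrub(text: str) -> str:
    for pattern, replacement in PATTERNS:
        text = pattern.sub(replacement, text)
    return text

scrubbed = scrub("Reach jane.doe@example.com, SSN 123-45-6789")
# scrubbed == "Reach [EMAIL], SSN [SSN]"
```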
Structured logging format
All logs should be JSON lines: one JSON object per line, with consistent field names:
```python
import json
import sys
from datetime import datetime, timezone

def emit_log(level: str, event: str, data: dict) -> None:
    """Write a single structured log line to stdout."""
    record = {
        "ts": datetime.now(timezone.utc).isoformat(),
        "level": level,
        "event": event,
        **data,
    }
    # Write to stdout; your log aggregation system (Datadog, CloudWatch, etc.)
    # picks this up and indexes the JSON fields
    sys.stdout.write(json.dumps(record) + "\n")
    sys.stdout.flush()

# Usage
emit_log("info", "llm_request_complete", {
    "request_id": "req-abc123",
    "model_id": "balanced",
    "feature_id": "support_chat",
    "input_tokens": 512,
    "output_tokens": 128,
    "latency_ms": 847.3,
    "finish_reason": "end_turn",
    "estimated_cost_usd": 0.003456,  # 512 * $3.00/M + 128 * $15.00/M
})
```
Correlation IDs across services
In a multi-service architecture, a single user request may touch multiple services. Correlation IDs let you reconstruct the full trace:
```python
import uuid

def propagate_trace_context(
    outgoing_headers: dict,
    trace_id: str,
    parent_span_id: str,
) -> dict:
    """
    Add OpenTelemetry-compatible (W3C Trace Context) headers to an outgoing
    HTTP request. The receiving service reads these and creates child spans
    under the same trace.
    Note: the W3C format requires a 32-character lowercase hex trace_id and a
    16-character hex parent_span_id with no hyphens, so generate IDs with
    uuid.uuid4().hex rather than str(uuid.uuid4()).
    """
    outgoing_headers["traceparent"] = f"00-{trace_id}-{parent_span_id}-01"
    return outgoing_headers

def extract_trace_context(incoming_headers: dict) -> tuple[str, str | None]:
    """
    Extract trace context from an incoming HTTP request.
    Returns (trace_id, parent_span_id).
    """
    traceparent = incoming_headers.get("traceparent", "")
    parts = traceparent.split("-")
    if len(parts) == 4:
        return parts[1], parts[2]
    return uuid.uuid4().hex, None  # New trace if no context
```
Layer 3: Deep Dive
OpenTelemetry as the vendor-neutral standard
OpenTelemetry (OTel) is the CNCF-standard framework for distributed tracing, metrics, and logging. Its key concepts map directly to LLM tracing:
| OTel concept | LLM tracing equivalent |
|---|---|
| Trace | Full request lifecycle (user query → final response) |
| Span | Individual operation (one LLM call, one tool call) |
| Attribute | Span metadata (model_id, token counts, feature_id) |
| Exporter | Sink that forwards spans to a backend (Jaeger, Grafana Tempo, Langfuse) |
| Propagator | Mechanism for passing trace context across service boundaries |
The OTel semantic conventions for LLM systems (under the gen_ai namespace) define standard attribute names so that any OTel-compatible tool can index and query LLM traces without custom configuration.
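For example, the minimum-log fields above map onto gen_ai-style attributes roughly like this. The attribute names follow one recent version of the conventions, which are still marked experimental; verify exact names against the current spec before relying on them.

```python
# Sketch: span attributes using gen_ai-style names. Names come from an
# evolving spec and may differ in your OTel version; verify before use.
span_attributes = {
    "gen_ai.operation.name": "chat",
    "gen_ai.request.model": "balanced",
    "gen_ai.response.finish_reasons": ["end_turn"],
    "gen_ai.usage.input_tokens": 512,
    "gen_ai.usage.output_tokens": 128,
}
```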
The cost of over-logging and under-logging
Both extremes create problems:
| | Over-logging | Under-logging |
|---|---|---|
| Storage cost | High: full prompts can be kilobytes each; at scale, terabytes per month | Low |
| PII liability | High: prompts often contain user data | Low |
| Debugging capability | High | Low: "what did it receive?" can't be answered |
| Latency impact | Low-medium (async writes) | None |
The practical balance: log request metadata (tokens, cost, latency, model, feature) always; log full prompt/response only for a configurable sample (typically 1–10%) with PII scrubbing applied. Keep sampled debug logs for a shorter retention window (7–30 days) than metric logs (90+ days).
```python
import hashlib

def should_log_debug(request_id: str, sample_rate: float = 0.05) -> bool:
    """
    Deterministic sampling: the same request always gets the same decision.
    Uses the request_id to avoid needing to store sampling state.
    """
    # Hash the request_id to a value in [0, 1)
    digest = int(hashlib.sha256(request_id.encode()).hexdigest(), 16)
    normalized = (digest % 1_000_000) / 1_000_000.0
    return normalized < sample_rate
```
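A quick property check, with the sampler restated inline so the snippet runs standalone: the same request ID always maps to the same decision, and the rate bounds behave as expected.

```python
import hashlib

def should_log_debug(request_id: str, sample_rate: float = 0.05) -> bool:
    digest = int(hashlib.sha256(request_id.encode()).hexdigest(), 16)
    return (digest % 1_000_000) / 1_000_000.0 < sample_rate

# Deterministic: repeated calls agree, so retries and cross-service calls
# for the same request make one consistent sampling decision.
decisions = {should_log_debug("req-abc123") for _ in range(5)}
# len(decisions) == 1
```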
Trace storage and query patterns
Traces are typically stored in two places with different access patterns:
Time-series / metrics store (Prometheus, InfluxDB, CloudWatch Metrics): aggregated metrics derived from traces: p50/p95/p99 latency, error rates, cost per feature per hour. Fast to query; no raw data.
Trace store (Jaeger, Grafana Tempo, Langfuse): raw span trees, queryable by trace_id, model_id, feature_id, time range. Slower to query; full context for debugging.
The most useful query patterns for LLM systems:
```sql
# Find all traces where a specific model exceeded 5000ms latency
WHERE model_id = "frontier" AND latency_ms > 5000 ORDER BY latency_ms DESC LIMIT 20

# Find all traces where output_tokens exceeded the budget
WHERE output_tokens > 1000 AND feature_id = "report_generator"

# Find error traces for a specific feature in the last hour
WHERE error IS NOT NULL AND feature_id = "support_chat" AND ts > NOW() - 1h
```
Further reading
- OpenTelemetry Semantic Conventions for Generative AI; OpenTelemetry, 2024. The canonical attribute names for LLM tracing; use these to ensure compatibility with any OTel backend.
- Distributed Systems Observability; Sridharan, 2018. Foundational text on observability; the three pillars (logs, metrics, traces) apply directly to LLM systems.
- Langfuse: Open Source LLM Engineering Platform; Langfuse, 2024. An open-source tracing and eval platform with first-class LLM support; useful as a reference implementation of the trace model described in this module.