Layer 1: Surface
LLM API costs are invisible until the invoice arrives. Traditional software infrastructure costs are predictable: a server costs roughly the same whether you send it one request or a thousand. LLM APIs charge per token: every character in, every character out, at rates that vary by model tier. A single engineering decision, such as using the frontier model instead of a fast one, or generating verbose outputs instead of concise ones, can multiply your monthly bill by 10x or 100x.
The problem is not that costs are high. The problem is that costs are unattributed. Most teams cannot answer: which product feature is driving the most spend? Which user segment has the highest cost per session? Which prompt change doubled output token consumption last week?
Cost attribution is the practice of tagging every LLM request with enough metadata to answer these questions, and of aggregating that data into actionable dashboards and alerts. It is not complicated to implement, but it must be designed before launch: retrofitting attribution onto an untagged system requires touching every call site.
Why it matters
Without cost attribution, you are flying blind. You will discover expensive features through the invoice, not through instrumentation. By the time you see the bill, the feature is already in production and changing it requires user-visible work.
Production Gotcha
Output tokens cost 3–5x more than input tokens at most providers, yet most teams instrument input tokens only and are surprised when long-output features dominate the bill. Track and alert on both independently. A feature that generates 2000-token reports looks cheap on input counts and expensive on the invoice; the mismatch is almost always output tokens.
Track input tokens and output tokens as separate metrics with separate alerts. The ratio between them is a leading indicator of cost anomalies.
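As a minimal sketch (all names here are illustrative, not from any particular provider SDK), an independent check on the output/input ratio can surface these drifts before the invoice does:

```python
def check_token_ratio(
    input_tokens: int,
    output_tokens: int,
    baseline_ratio: float,
    tolerance: float = 0.5,
) -> bool:
    """Return True when the output/input token ratio drifts more than
    `tolerance` (as a fraction of baseline) from its historical baseline.
    A sudden shift usually means a prompt or behavior change."""
    if input_tokens == 0:
        return False
    ratio = output_tokens / input_tokens
    return abs(ratio - baseline_ratio) > tolerance * baseline_ratio
```

A classification feature with a baseline ratio of 0.02 that suddenly starts emitting ten times the output tokens trips this check immediately, long before the aggregate spend curve looks unusual.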
Layer 2: Guided
Per-request cost tracking
Tag every LLM call with the metadata needed for attribution:
```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class CostRecord:
    request_id: str
    timestamp: str
    model_id: str
    feature_id: str    # e.g. "support_chat", "report_generator", "search_summary"
    user_segment: str  # e.g. "free", "pro", "enterprise"
    input_tokens: int
    output_tokens: int
    input_cost_usd: float
    output_cost_usd: float
    total_cost_usd: float


# Pricing table — verify with provider documentation; update when pricing changes
TOKEN_PRICES = {
    # model_id: (input_price_per_million, output_price_per_million)
    "fast": (0.25, 1.25),
    "balanced": (3.00, 15.00),
    "frontier": (15.00, 75.00),
}


def compute_cost(
    model_id: str,
    input_tokens: int,
    output_tokens: int,
) -> tuple[float, float, float]:
    """Returns (input_cost, output_cost, total_cost) in USD."""
    input_price, output_price = TOKEN_PRICES.get(model_id, (1.0, 5.0))
    input_cost = input_tokens * input_price / 1_000_000
    output_cost = output_tokens * output_price / 1_000_000
    return input_cost, output_cost, input_cost + output_cost


def record_request_cost(
    request_id: str,
    model_id: str,
    feature_id: str,
    user_segment: str,
    input_tokens: int,
    output_tokens: int,
) -> CostRecord:
    input_cost, output_cost, total_cost = compute_cost(model_id, input_tokens, output_tokens)
    record = CostRecord(
        request_id=request_id,
        timestamp=datetime.now(timezone.utc).isoformat(),
        model_id=model_id,
        feature_id=feature_id,
        user_segment=user_segment,
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        input_cost_usd=input_cost,
        output_cost_usd=output_cost,
        total_cost_usd=total_cost,
    )
    cost_sink.write(record)  # Write to your cost aggregation store
    return record
```
Cost per feature aggregation
Aggregate cost records to understand which features drive spend:
```python
from collections import defaultdict


@dataclass
class FeatureCostSummary:
    feature_id: str
    request_count: int
    total_input_tokens: int
    total_output_tokens: int
    total_cost_usd: float
    avg_cost_per_request_usd: float
    cost_fraction: float  # This feature's share of total spend


# Pass in records already filtered to the aggregation window (e.g. the last 24 hours)
def aggregate_feature_costs(records: list[CostRecord]) -> list[FeatureCostSummary]:
    feature_data: dict[str, dict] = defaultdict(lambda: {
        "request_count": 0,
        "total_input_tokens": 0,
        "total_output_tokens": 0,
        "total_cost_usd": 0.0,
    })
    for record in records:
        d = feature_data[record.feature_id]
        d["request_count"] += 1
        d["total_input_tokens"] += record.input_tokens
        d["total_output_tokens"] += record.output_tokens
        d["total_cost_usd"] += record.total_cost_usd
    total_spend = sum(d["total_cost_usd"] for d in feature_data.values())
    summaries = []
    for feature_id, d in feature_data.items():
        summaries.append(FeatureCostSummary(
            feature_id=feature_id,
            request_count=d["request_count"],
            total_input_tokens=d["total_input_tokens"],
            total_output_tokens=d["total_output_tokens"],
            total_cost_usd=d["total_cost_usd"],
            avg_cost_per_request_usd=(
                d["total_cost_usd"] / d["request_count"]
                if d["request_count"] > 0 else 0.0
            ),
            cost_fraction=d["total_cost_usd"] / total_spend if total_spend > 0 else 0.0,
        ))
    return sorted(summaries, key=lambda s: s.total_cost_usd, reverse=True)
```
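To illustrate what the aggregation reveals, here is the same per-feature roll-up computed over three toy records (plain dicts, to keep the sketch self-contained):

```python
from collections import defaultdict

records = [
    {"feature_id": "support_chat", "total_cost_usd": 0.02},
    {"feature_id": "support_chat", "total_cost_usd": 0.04},
    {"feature_id": "report_generator", "total_cost_usd": 0.54},
]

# Sum spend per feature, then express each as a fraction of total spend
totals: dict[str, float] = defaultdict(float)
for r in records:
    totals[r["feature_id"]] += r["total_cost_usd"]

grand_total = sum(totals.values())
fractions = {f: c / grand_total for f, c in totals.items()}
# report_generator is one request out of three but 90% of the spend
```

This is the shape of insight attribution exists to produce: request counts and cost shares rarely line up, and the gap tells you where to optimize.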
Where tokens hide
Before enforcing budgets, you need to know where your input tokens go. The major contributors:
```python
def audit_prompt_token_sources(
    system_prompt: str,
    retrieved_context: list[str],
    conversation_history: list[dict],
    user_message: str,
    tokenizer,  # Your tokenizer function
) -> dict:
    """
    Break down input token consumption by source.
    Use this to identify where to cut tokens without affecting quality.
    """
    def count(text: str) -> int:
        return len(tokenizer.encode(text))

    system_tokens = count(system_prompt)
    context_tokens = sum(count(chunk) for chunk in retrieved_context)
    history_tokens = sum(
        count(msg.get("content", ""))
        for msg in conversation_history
    )
    user_tokens = count(user_message)
    total = system_tokens + context_tokens + history_tokens + user_tokens
    return {
        "system_prompt": system_tokens,
        "retrieved_context": context_tokens,
        "conversation_history": history_tokens,
        "user_message": user_tokens,
        "total": total,
        "breakdown_pct": {
            "system_prompt": round(system_tokens / total * 100, 1) if total else 0,
            "retrieved_context": round(context_tokens / total * 100, 1) if total else 0,
            "conversation_history": round(history_tokens / total * 100, 1) if total else 0,
            "user_message": round(user_tokens / total * 100, 1) if total else 0,
        },
    }
```
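Running this kind of audit typically shows retrieved context dominating the breakdown. The sketch below uses a crude whitespace tokenizer as a stand-in (a real deployment would use the provider's tokenizer) and computes the per-source counts directly:

```python
class WhitespaceTokenizer:
    """Crude stand-in: one token per whitespace-separated word.
    Real budgets should be calibrated with the provider's tokenizer."""
    def encode(self, text: str) -> list[str]:
        return text.split()

tok = WhitespaceTokenizer()
system = "You are a support assistant. Answer concisely."
context = ["chunk one " * 50, "chunk two " * 50]  # two 100-word retrieved chunks
user = "Why was I charged twice?"

counts = {
    "system_prompt": len(tok.encode(system)),
    "retrieved_context": sum(len(tok.encode(c)) for c in context),
    "user_message": len(tok.encode(user)),
}
# Retrieved context dwarfs everything else -- the usual first place to cut
```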
Token budget enforcement
Hard limits reject requests that would exceed the budget. Soft limits warn but allow through:
```python
from enum import Enum


class BudgetAction(Enum):
    ALLOW = "allow"
    WARN = "warn"
    TRUNCATE = "truncate"
    REJECT = "reject"


@dataclass
class TokenBudget:
    max_input_tokens: int
    max_output_tokens: int
    soft_warn_fraction: float = 0.80  # Warn at 80% of limit
    action_on_exceed: BudgetAction = BudgetAction.TRUNCATE


def check_input_budget(
    input_tokens: int,
    budget: TokenBudget,
) -> tuple[BudgetAction, str]:
    """Returns (action, reason)."""
    if input_tokens > budget.max_input_tokens:
        return (
            budget.action_on_exceed,
            f"Input {input_tokens} tokens exceeds hard limit {budget.max_input_tokens}",
        )
    soft_limit = int(budget.max_input_tokens * budget.soft_warn_fraction)
    if input_tokens > soft_limit:
        return (
            BudgetAction.WARN,
            f"Input {input_tokens} tokens approaching limit {budget.max_input_tokens}",
        )
    return BudgetAction.ALLOW, ""


def truncate_history_to_budget(
    history: list[dict],
    budget_tokens: int,
    tokenizer,
) -> list[dict]:
    """
    Remove the oldest messages from history until the total fits within budget.
    The most recent user message always survives; the system prompt is handled
    separately and does not count against this budget.
    """
    while history:
        total = sum(len(tokenizer.encode(m.get("content", ""))) for m in history)
        if total <= budget_tokens:
            break
        if len(history) > 1:
            # Drop the oldest message first
            history = history[1:]
        else:
            # Only one message left and it's still too long — truncate content
            content = history[0].get("content", "")
            tokens = tokenizer.encode(content)
            history[0]["content"] = tokenizer.decode(tokens[:budget_tokens])
            break
    return history
```
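The oldest-first policy can be exercised end to end with a toy word-count tokenizer (illustrative only; a condensed version of the truncation loop is inlined here so the sketch runs standalone):

```python
class WordTokenizer:
    # Toy tokenizer: one token per word. Use the provider's tokenizer in practice.
    def encode(self, text): return text.split()
    def decode(self, tokens): return " ".join(tokens)

def truncate_history(history, budget_tokens, tok):
    # Drop oldest turns until the remaining history fits the budget
    while len(history) > 1 and sum(
        len(tok.encode(m["content"])) for m in history
    ) > budget_tokens:
        history = history[1:]
    return history

history = [
    {"role": "user", "content": "first question about billing details"},
    {"role": "assistant", "content": "a long detailed answer " * 10},
    {"role": "user", "content": "ok but why twice"},
]
trimmed = truncate_history(history, budget_tokens=10, tok=WordTokenizer())
# The two older turns are dropped; the latest user message survives
```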
Cost anomaly detection
Alert when cost per request deviates significantly from baseline:
```python
import math
from collections import deque


class CostAnomalyDetector:
    def __init__(self, window_size: int = 100, z_threshold: float = 2.0):
        self.window_size = window_size
        self.z_threshold = z_threshold
        self.recent_costs: deque[float] = deque(maxlen=window_size)

    def update(self, cost_usd: float) -> dict:
        self.recent_costs.append(cost_usd)
        if len(self.recent_costs) < 10:
            return {"anomaly": False, "reason": "insufficient_data"}
        values = list(self.recent_costs)
        mean = sum(values) / len(values)
        variance = sum((v - mean) ** 2 for v in values) / len(values)
        stddev = math.sqrt(variance)
        if stddev == 0:
            return {"anomaly": False, "mean": mean, "stddev": 0}
        z_score = abs(cost_usd - mean) / stddev
        anomaly = z_score > self.z_threshold
        return {
            "anomaly": anomaly,
            "cost_usd": cost_usd,
            "mean": round(mean, 6),
            "stddev": round(stddev, 6),
            "z_score": round(z_score, 2),
            "threshold": self.z_threshold,
        }


# One detector per feature_id
detectors: dict[str, CostAnomalyDetector] = {}


def check_cost_anomaly(feature_id: str, cost_usd: float) -> dict:
    if feature_id not in detectors:
        detectors[feature_id] = CostAnomalyDetector()
    result = detectors[feature_id].update(cost_usd)
    if result.get("anomaly"):
        emit_alert(f"Cost anomaly on {feature_id}: ${cost_usd:.4f} (z={result['z_score']})")
    return result


def emit_alert(message: str) -> None:
    # Route to PagerDuty, Slack, or your alerting system
    print(f"[ALERT] {message}")
```
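To see why a z-score threshold of 2.0 is comfortably conservative, here is the same rolling computation on illustrative numbers: a baseline alternating around $0.015 per request, then a spike to $0.10 (the kind a prompt change can cause):

```python
import math

costs = [0.01, 0.02] * 25  # 50 requests on a steady baseline around $0.015
mean = sum(costs) / len(costs)
stddev = math.sqrt(sum((c - mean) ** 2 for c in costs) / len(costs))

spike = 0.10  # a prompt change makes responses much longer
z = abs(spike - mean) / stddev
# z is 17 -- far past any reasonable threshold, so this fires immediately
```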
Layer 3: Deep Dive
The output token multiplier
Understanding the economics of LLM APIs requires accepting one counterintuitive fact: output tokens are expensive relative to input tokens. The ratio varies by provider and model tier, but output typically costs 3–5 times as much as input per token.
This matters because many teams instrument “tokens sent” (input) and ignore “tokens generated” (output). The economic consequence is that features producing verbose outputs (long reports, detailed explanations, commented code) can cost dramatically more than features producing terse outputs, even when the input prompts are similar in length.
| Feature type | Typical input tokens | Typical output tokens | Token ratio (output/input) | Cost driver |
|---|---|---|---|---|
| Classification | 500 | 10 | 0.02 | Input |
| Summarization | 2000 | 300 | 0.15 | Input |
| Report generation | 1000 | 3000 | 3.0 | Output |
| Code generation | 500 | 2000 | 4.0 | Output |
| Chatbot (verbose) | 800 | 1500 | 1.9 | Output |
Instrument both dimensions and set alerts on each independently.
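A worked example makes the multiplier concrete. Using the illustrative frontier-tier prices from Layer 2 ($15 per million input tokens, $75 per million output tokens), the report-generation row of the table costs:

```python
# Illustrative frontier-tier prices (USD per million tokens)
INPUT_PRICE, OUTPUT_PRICE = 15.00, 75.00

# Report generation: 1000 input tokens, 3000 output tokens
input_cost = 1000 * INPUT_PRICE / 1_000_000    # $0.015
output_cost = 3000 * OUTPUT_PRICE / 1_000_000  # $0.225
# Output is 15x the input cost: 3x the tokens at 5x the per-token price
```

A team watching only input tokens would see a modest 1000-token prompt and miss the 94% of the request cost hiding in the output.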
Model selection as a cost lever
The fastest and highest-quality model is rarely the right choice for every feature. Matching model capability to task complexity is a major cost lever:
```python
def select_model_for_task(
    task_type: str,
    requires_reasoning: bool,
    output_token_estimate: int,
    latency_sla_ms: int,
) -> str:
    """
    Route tasks to the appropriate model tier based on requirements.
    This prevents over-provisioning (using frontier for classification).
    """
    # Simple structured tasks: fast model is adequate and much cheaper
    if task_type in ("classification", "extraction", "routing") and not requires_reasoning:
        return "fast"
    # Moderate complexity, short output, relaxed latency SLA: balanced model
    if not requires_reasoning and output_token_estimate < 500 and latency_sla_ms > 2000:
        return "balanced"
    # Complex reasoning or long generation: frontier model
    if requires_reasoning or output_token_estimate > 1000:
        return "frontier"
    return "balanced"  # Default to mid-tier when uncertain
```
Cost attribution in practice
A mature cost attribution system has three layers:
- Request-level tagging: every call includes feature_id, user_segment, model_id, request_id
- Hourly aggregation: sum token counts and costs by feature and model; write to a time-series store
- Budget alerts: alert when a feature’s hourly cost exceeds 2 standard deviations from its rolling average
The minimum viable dashboard shows: top 5 features by spend, cost per request trend over the last 7 days for each feature, and input vs output token ratio per feature (sudden changes in this ratio indicate prompt or behavior changes).
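The budget-alert layer can be sketched as a rolling check over hourly feature totals (thresholds and window sizes here are illustrative assumptions, not recommendations):

```python
from collections import deque

def hourly_budget_alert(
    hourly_costs: deque[float],  # trailing hourly cost totals for one feature
    current_hour_cost: float,
    sigma: float = 2.0,
) -> bool:
    """Alert when the current hour exceeds mean + sigma * stddev of the window."""
    if len(hourly_costs) < 24:  # wait for at least a day of history
        return False
    mean = sum(hourly_costs) / len(hourly_costs)
    var = sum((c - mean) ** 2 for c in hourly_costs) / len(hourly_costs)
    return current_hour_cost > mean + sigma * var ** 0.5

history = deque([1.0, 1.2] * 12, maxlen=168)  # a day of hourly totals, week-long window
quiet = hourly_budget_alert(history, 1.1)    # within the band: no alert
runaway = hourly_budget_alert(history, 5.0)  # far above the band: alert fires
```

Wiring this to the same `feature_id` tags used at request time closes the loop: the tag you attach before launch is the dimension every later layer aggregates and alerts on.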
Further reading
- Anthropic API Pricing, Anthropic. Official token pricing; verify before implementing cost calculations.
- OpenAI Tokenizer, OpenAI. Interactive tokenizer for counting tokens; useful for calibrating budget estimates. The tokenization approach is similar across major providers.
- FinOps Framework, FinOps Foundation, 2023. The FinOps discipline applies directly to LLM cost management: make costs visible, attribute them accurately, and optimize collaboratively across engineering and finance.