Layer 1: Surface
Agent failures are harder to detect than API failures. A 500 error is obvious. An agent that completes with a confident wrong answer because step 2 retrieved the wrong data and every subsequent step built on it: that is invisible without deliberate instrumentation.
The five failure modes:
| Failure mode | What happens | Why it is dangerous |
|---|---|---|
| Infinite loop | Agent calls tools repeatedly without reaching a conclusion | Unbounded cost; context window fills; session never returns |
| Hallucinated tool call | Model invents tool arguments that pass schema but are factually wrong | Wrong action taken with no error signal |
| Compounding errors | Wrong output from step N feeds step N+1 | Error amplifies across the task; hard to trace at the end |
| Context overflow | Accumulated history exceeds context window | Model loses access to early context; behaviour degrades silently |
| Stuck state | Agent is waiting for something that will never arrive | Session hangs; resource held indefinitely |
Layer 2: Guided
Infinite loop detection
```python
import hashlib
import json
from collections import Counter


class LoopDetector:
    def __init__(self, window: int = 5, threshold: int = 2):
        self.window = window
        self.threshold = threshold
        self._history: list[str] = []

    def record(self, tool_name: str, arguments: dict) -> bool:
        """Record a tool call and return True if a loop is detected."""
        # Normalise to catch near-duplicates (whitespace, key order).
        # json.dumps with sort_keys handles nested dicts, which a plain
        # sorted(arguments.items()) would not.
        canonical = json.dumps(arguments, sort_keys=True, default=str)
        sig = hashlib.md5(f"{tool_name}:{canonical}".encode()).hexdigest()
        self._history.append(sig)
        recent = self._history[-self.window:]
        counts = Counter(recent)
        return any(c >= self.threshold for c in counts.values())


def run_agent_with_loop_detection(goal: str, tools: list[dict]) -> str:
    messages = [{"role": "user", "content": goal}]
    detector = LoopDetector()
    for step in range(10):
        response = llm.chat(model="balanced", messages=messages, tools=tools)
        if response.stop_reason == "end_turn":
            return response.text
        messages.append({"role": "assistant", "content": response.content})
        for tc in response.tool_calls:
            if detector.record(tc.name, tc.arguments):
                # Inject a corrective message instead of continuing blindly
                messages.append({"role": "user", "content": [{
                    "type": "tool_result",
                    "tool_use_id": tc.id,
                    "content": (
                        "You have called this tool with the same arguments multiple times. "
                        "The results are not giving you what you need. "
                        "Try a different approach or conclude with what you have."
                    ),
                }]})
                continue
            result = execute_tool(tc.name, tc.arguments)
            messages.append({"role": "user", "content": [{
                "type": "tool_result",
                "tool_use_id": tc.id,
                "content": result,
            }]})
    return "Task incomplete: loop or step limit reached."
```
Hallucinated tool arguments
The model passes structurally valid arguments that are factually invented: an order ID that doesn't exist, a date in the wrong format, a search query built around a URL the model made up.
```python
import re


def execute_with_verification(tool_name: str, arguments: dict) -> str:
    # 1. Schema validation (catches structural errors)
    validate_schema(tool_name, arguments)
    # 2. Semantic validation (catches hallucinated values)
    validators = SEMANTIC_VALIDATORS.get(tool_name, {})
    for param, validator in validators.items():
        if param in arguments:
            valid, reason = validator(arguments[param])
            if not valid:
                return (
                    f"Error: argument '{param}' failed validation: {reason}. "
                    f"Verify the correct value and try again."
                )
    return TOOL_REGISTRY[tool_name](**arguments)


# Example semantic validators
SEMANTIC_VALIDATORS = {
    "get_customer_order": {
        "order_id": lambda v: (
            (True, "") if re.match(r"^ORD-\d{8}$", v)
            else (False, f"expected format ORD-XXXXXXXX, got {v!r}")
        )
    },
    "lookup_user": {
        "email": lambda v: (
            (True, "") if "@" in v and "." in v.split("@")[-1]
            else (False, f"{v!r} does not look like a valid email")
        )
    },
}
```
Return validation errors as tool results so the model can self-correct in the next step.
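To make that self-correction loop concrete, here is a standalone sketch of the order-ID check above. The function names are illustrative and not part of any registry; the point is that a failed check returns a message rather than raising, so the model sees it as a tool result on its next turn:

```python
import re


def validate_order_id(v: str) -> tuple[bool, str]:
    """Semantic check mirroring the get_customer_order validator."""
    if re.match(r"^ORD-\d{8}$", v):
        return True, ""
    return False, f"expected format ORD-XXXXXXXX, got {v!r}"


def run_validated(order_id: str) -> str:
    valid, reason = validate_order_id(order_id)
    if not valid:
        # Returned as a tool result, not raised: the model can retry
        # with a corrected value in the next step.
        return f"Error: argument 'order_id' failed validation: {reason}."
    return f"Order {order_id} found."
```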
Compounding error detection
Add checkpoint verification between high-stakes steps:
```python
class StepVerificationError(Exception):
    """Raised when an intermediate result fails checkpoint verification."""


def verify_intermediate_result(
    step_description: str,
    result: str,
    expected_properties: list[str],
) -> tuple[bool, str]:
    """Ask a fast model to verify that an intermediate result meets basic criteria."""
    criteria = "\n".join(f"- {p}" for p in expected_properties)
    check = llm.chat(
        model="fast",
        messages=[{
            "role": "user",
            "content": (
                f'Verify this result from: "{step_description}"\n\n'
                f"Result:\n{result[:1000]}\n\n"
                f"Does the result satisfy ALL of these:\n{criteria}\n\n"
                "Answer YES or NO, then briefly explain."
            ),
        }],
    )
    passed = check.text.strip().upper().startswith("YES")
    return passed, check.text


# Use in multi-step tasks before irreversible actions
def execute_plan_with_checkpoints(plan: list[dict], tools: list[dict]) -> str:
    results = {}
    for step in plan:
        results[step["id"]] = execute_step(step, tools, context=results)
        if step.get("checkpoint_before_next"):
            passed, reason = verify_intermediate_result(
                step["description"],
                results[step["id"]],
                step["checkpoint_criteria"],
            )
            if not passed:
                raise StepVerificationError(
                    f"Step {step['id']} failed verification: {reason}"
                )
    return synthesise(results)
```
Context overflow management
```python
def estimate_tokens(messages: list[dict]) -> int:
    """Rough estimate: 1 token ≈ 4 characters."""
    total_chars = sum(
        len(str(m.get("content", ""))) for m in messages
    )
    return total_chars // 4


def compress_context(messages: list[dict], target_tokens: int) -> list[dict]:
    """Summarise the middle of the conversation to stay within budget."""
    if estimate_tokens(messages) <= target_tokens:
        return messages
    # Always keep: system prompt (index 0) and recent N turns
    keep_recent = 6
    if len(messages) <= keep_recent + 1:
        return messages
    core = messages[1:-keep_recent]
    tail = messages[-keep_recent:]
    summary = llm.chat(
        model="fast",
        messages=[{
            "role": "user",
            "content": (
                "Summarise the following agent steps and findings concisely. "
                "Preserve key facts, decisions, and tool results:\n\n"
                + format_messages(core)
            ),
        }],
    ).text
    return [
        messages[0],  # system prompt
        {"role": "user", "content": f"[Compressed prior steps]: {summary}"},
        *tail,
    ]
```
Stuck state detection
```python
import hashlib
import time


class StuckStateDetector:
    def __init__(self, inactivity_threshold: float = 300.0):  # 5 minutes
        self.threshold = inactivity_threshold
        self.last_progress_at = time.monotonic()
        self._last_state_hash = ""

    def record_progress(self, state_snapshot: str):
        current_hash = hashlib.md5(state_snapshot.encode()).hexdigest()
        if current_hash != self._last_state_hash:
            self.last_progress_at = time.monotonic()
            self._last_state_hash = current_hash

    def is_stuck(self) -> bool:
        return (time.monotonic() - self.last_progress_at) > self.threshold
```
When stuck state is detected: cancel the task, release locks, return a partial result with a clear explanation of where the agent stopped.
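Wrapped in a watchdog loop, the detector drives that recovery path. A minimal sketch follows; the detector is restated so the snippet runs standalone, and `on_stuck` stands in for a hypothetical cancellation-and-cleanup hook:

```python
import hashlib
import time


class StuckStateDetector:
    def __init__(self, inactivity_threshold: float = 300.0):
        self.threshold = inactivity_threshold
        self.last_progress_at = time.monotonic()
        self._last_state_hash = ""

    def record_progress(self, state_snapshot: str):
        current_hash = hashlib.md5(state_snapshot.encode()).hexdigest()
        if current_hash != self._last_state_hash:
            self.last_progress_at = time.monotonic()
            self._last_state_hash = current_hash

    def is_stuck(self) -> bool:
        return (time.monotonic() - self.last_progress_at) > self.threshold


def run_with_watchdog(detector, poll_state, on_stuck, interval=1.0, max_polls=10):
    """Poll the agent's state; invoke the cancellation hook when it stops changing."""
    for _ in range(max_polls):
        detector.record_progress(poll_state())
        if detector.is_stuck():
            # Cancel the task, release locks, return a partial result
            return on_stuck()
        time.sleep(interval)
    return "completed"
```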
Layer 3: Deep Dive
Failure taxonomy and recovery strategies
| Failure | Detection | Recovery |
|---|---|---|
| Infinite loop | Repeated call signatures in sliding window | Inject corrective message; escalate if it persists |
| Hallucinated args | Schema + semantic validation | Return structured error; model self-corrects |
| Compounding error | Checkpoint verification between steps | Replan from last good checkpoint |
| Context overflow | Token count monitoring | Compress middle context; prune old tool results |
| Stuck state | Inactivity timeout on state hash | Cancel task; release resources; return partial |
| Wrong tool selected | Post-call validation of result relevance | Retry with corrective context; escalate |
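The last row, post-call validation of result relevance, has no snippet earlier in the module. A crude heuristic version might compare content words between the request and the result; this is a placeholder for the fast-model judgement used in the checkpoint verifier, with the stop-word list and threshold as illustrative assumptions:

```python
def result_seems_relevant(query: str, result: str, min_overlap: int = 1) -> bool:
    """Heuristic: does the tool result share any content words with the query?"""
    stop = {"the", "a", "an", "of", "for", "to", "in", "and", "or", "is"}
    strip_chars = ".,:;!?"
    query_words = {
        w.lower().strip(strip_chars) for w in query.split()
        if w.lower() not in stop
    }
    result_words = {w.lower().strip(strip_chars) for w in result.split()}
    return len(query_words & result_words) >= min_overlap
```

A failed check would feed back into the same corrective-message path as loop detection: retry with added context, then escalate.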
Cascading failure in multi-agent systems
In a multi-agent system, one agent's failure can cascade:
```
Orchestrator → Research agent (fails silently, returns partial data)
             → Writer agent (builds on partial data, produces confident wrong output)
             → Review agent (reviews plausible-looking output, approves)
             → User receives wrong answer with high confidence
```
Containment strategies:
- Validate at every handoff point (not just at final output)
- Use explicit confidence scores on intermediate results
- Route below-threshold confidence results to a human gate before proceeding
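The three strategies combine into a small gate at each handoff point. A sketch, where `HandoffResult`, the `0.7` threshold, and the `human_review` hook are all illustrative assumptions rather than a fixed API:

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class HandoffResult:
    content: str
    confidence: float  # 0.0-1.0, reported by the producing agent


def gate_handoff(
    result: HandoffResult,
    threshold: float = 0.7,
    human_review: Callable[[HandoffResult], bool] = lambda r: False,
) -> HandoffResult:
    """Validate a result at an agent-to-agent handoff point."""
    if result.confidence >= threshold:
        return result  # Confident enough to pass downstream
    # Below-threshold results go to a human gate before proceeding
    if human_review(result):
        return result
    raise ValueError(
        f"Handoff blocked: confidence {result.confidence:.2f} "
        f"below threshold {threshold:.2f} and not approved"
    )
```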
Idempotency in failure recovery
When an agent step fails and is retried, it must not duplicate side effects:
```python
class IdempotentStepExecutor:
    def __init__(self, result_cache):
        self.cache = result_cache

    def execute(self, step_id: str, fn, *args, **kwargs) -> str:
        cached = self.cache.get(step_id)
        if cached is not None:  # explicit None check: an empty result is still a result
            return cached  # Return previous result; do not re-execute
        result = fn(*args, **kwargs)
        self.cache.set(step_id, result)
        return result
```
Step IDs should be deterministic from the task context: the same logical step in a retried task must reuse the same ID to hit the cache.
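One simple way to derive such IDs (an assumption, not a prescribed scheme) is to hash the task ID together with the step's logical description, so a retried task reproduces the same keys:

```python
import hashlib


def deterministic_step_id(task_id: str, step_description: str) -> str:
    """Derive a stable step ID so a retried task hits the same cache keys."""
    raw = f"{task_id}:{step_description}".encode()
    return hashlib.sha256(raw).hexdigest()[:16]
```

Anything volatile (timestamps, attempt counters) must stay out of the hash input, or every retry would generate a fresh ID and bypass the cache.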
Further reading
- Failure Modes in LLM Agents; Empirical study of agent failures across benchmark tasks; useful taxonomy that matches the structure of this module.
- Risks from Learned Optimization; Foundational analysis of how optimising systems fail in unexpected ways; background reading for understanding why agentic failure is structurally different.