Layer 1: Surface
When a model has access to tools, the conversation becomes a loop: model calls a tool, your code executes it, the result goes back to the model, the model decides what to do next. This is the agentic loop.
User message
     │
     ▼
   Model ──── wants tool? ────► Execute tool(s)
     ▲                                │
     └───────── tool results ─────────┘
     │
Final answer (no more tools needed)
Three questions drive how you implement this loop:
- Sequential or parallel? Do the tool calls depend on each other, or can they run at the same time?
- How much state to carry? What does the model need to remember across multiple tool rounds?
- When to stop? How do you detect a finished conversation vs a runaway loop?
Layer 2: Guided
The basic agentic loop
def run_tool_loop(user_message: str, tools: list[dict], max_iterations: int = 8) -> str:
messages = [{"role": "user", "content": user_message}]
for iteration in range(max_iterations):
response = llm.chat(model="balanced", messages=messages, tools=tools)
        # Model is done: no more tool calls
if response.stop_reason == "end_turn":
return response.text
# Model wants to call tools
if response.stop_reason == "tool_use":
# Add the model's response (including tool_use blocks) to messages
messages.append({"role": "assistant", "content": response.content})
# Execute each requested tool
tool_results = []
for tool_call in response.tool_calls:
result = execute_tool(tool_call.name, tool_call.arguments)
tool_results.append({
"type": "tool_result",
"tool_use_id": tool_call.id,
"content": str(result),
})
# Add all tool results back into the conversation
messages.append({"role": "user", "content": tool_results})
    # Loop cap reached: fail gracefully
return "I wasn't able to complete this task within the allowed number of steps."
Key points:
- The model's response (including the `tool_use` blocks) must be added to `messages` before the results: the model needs to see its own call to interpret the result
- All tool results for a single round go into one `"user"` message
- The loop cap is a hard safety limit, not just a performance concern
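The ordering rule can be made concrete. Below is a minimal sketch of the message history after one tool round, using Anthropic-style field names to match the snippets above (exact keys vary by provider, and the weather tool is hypothetical):

```python
# Conversation shape after one tool round. Ordering matters: the assistant
# turn carrying the tool_use block comes BEFORE the user turn carrying the
# matching tool_result.
messages = [
    {"role": "user", "content": "What's the weather in Oslo?"},
    {
        "role": "assistant",
        "content": [
            {"type": "tool_use", "id": "call_1", "name": "get_weather",
             "input": {"city": "Oslo"}},
        ],
    },
    {
        "role": "user",
        "content": [
            {"type": "tool_result", "tool_use_id": "call_1",
             "content": "4°C, overcast"},
        ],
    },
]

# The tool_result must reference the id of the call it answers.
assert messages[2]["content"][0]["tool_use_id"] == messages[1]["content"][0]["id"]
```

If the `tool_use_id` does not match, most providers reject the request, which is why the assistant turn has to be appended before its results.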
Sequential vs parallel
Sequential: each call depends on the result of the previous one:
# Example: search → read → summarise
# Step 1: search for relevant documents
results = search_knowledge_base(query="authentication patterns")
# Step 2: read the top result (depends on step 1's output)
content = read_document(doc_id=results[0]["id"])
# Step 3: summarise (depends on step 2's output)
summary = summarise(text=content)
The model handles sequential calls naturally: it waits for each result before deciding the next call.
Parallel: calls are independent and can run simultaneously:
import asyncio
async def run_parallel_tools(tool_calls: list) -> list:
"""Execute all tool calls concurrently."""
tasks = [execute_tool_async(tc.name, tc.arguments) for tc in tool_calls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return [
{"tool_use_id": tc.id, "content": str(r) if not isinstance(r, Exception) else f"Error: {r}"}
for tc, r in zip(tool_calls, results)
]
The model can request multiple tool calls in a single response. Run them in parallel when the calls are independent: latency drops roughly in proportion to the number of calls you can batch into one round.
How to detect parallelism: if the model returns multiple tool calls in a single response, they are intended to run in parallel. Execute them concurrently. If they arrive one at a time across multiple rounds, they are sequential.
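That dispatch rule can be sketched as a small router. The `ToolCall` dataclass and the stub executor below are hypothetical stand-ins; substitute your provider's tool-call objects and your real async executor:

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ToolCall:
    # Minimal stand-in for a provider's tool-call object
    id: str
    name: str
    arguments: dict


async def execute_tool_async(name: str, arguments: dict) -> str:
    # Stub executor for illustration; a real one would dispatch to a registry
    await asyncio.sleep(0)
    return f"{name} ran with {arguments}"


async def dispatch_tool_calls(tool_calls: list) -> list:
    """One round of tool execution: concurrent when the model requested
    several calls in one response, direct when it requested just one."""
    if len(tool_calls) > 1:
        # Multiple calls in one response: intended to run in parallel
        results = await asyncio.gather(
            *(execute_tool_async(tc.name, tc.arguments) for tc in tool_calls),
            return_exceptions=True,
        )
        return [
            {"tool_use_id": tc.id,
             "content": str(r) if not isinstance(r, Exception) else f"Error: {r}"}
            for tc, r in zip(tool_calls, results)
        ]
    # Single call: execute directly, no gather overhead
    tc = tool_calls[0]
    return [{"tool_use_id": tc.id,
             "content": await execute_tool_async(tc.name, tc.arguments)}]
```

The router keeps the loop code simple: the loop hands over whatever the model requested, and the router decides whether concurrency applies.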
State accumulation
For multi-step tasks, the model accumulates state through the message history: it can read its previous tool results to inform the next call. But for long chains, you may want to maintain explicit state:
def run_research_task(question: str) -> str:
messages = [{"role": "user", "content": question}]
gathered_facts = [] # explicit state alongside message history
for _ in range(10):
response = llm.chat(model="balanced", messages=messages, tools=RESEARCH_TOOLS)
if response.stop_reason == "end_turn":
return response.text
messages.append({"role": "assistant", "content": response.content})
tool_results = []
for tc in response.tool_calls:
result = execute_tool(tc.name, tc.arguments)
gathered_facts.append({"tool": tc.name, "result": result})
tool_results.append({
"type": "tool_result",
"tool_use_id": tc.id,
"content": result,
})
messages.append({"role": "user", "content": tool_results})
return "Task incomplete after maximum steps."
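Explicit state is only useful if something reads it. One way to spend `gathered_facts` (a hypothetical helper, not part of the loop above) is to render it into a context block for a final synthesis prompt, or to resume an interrupted task:

```python
def facts_to_context(gathered_facts: list) -> str:
    """Render the explicit state into a compact text block that can be
    appended to a final 'synthesise your answer' prompt."""
    lines = [f"- [{fact['tool']}] {fact['result']}" for fact in gathered_facts]
    return "Facts gathered so far:\n" + "\n".join(lines)
```

Because this state lives outside the message history, it survives context-window trimming: you can compact old tool rounds out of `messages` while keeping the facts.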
Loop detection
Beyond the iteration cap, detect and break obvious loops:
from collections import Counter
def detect_repeated_calls(call_history: list[dict], window: int = 3) -> bool:
"""Return True if the same tool+args combo appears twice in the last `window` calls."""
if len(call_history) < window:
return False
recent = call_history[-window:]
signatures = [f"{c['name']}:{sorted(c['arguments'].items())}" for c in recent]
return any(count >= 2 for count in Counter(signatures).values())
When a repeated call is detected, either inject a corrective message ("You already called this tool with these arguments: the result was X. Please proceed to the next step.") or terminate the loop.
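The corrective-message path can be sketched as a helper that fabricates a `tool_result` from the cached result instead of re-running the tool (field names follow the snippets above):

```python
def corrective_result(tool_use_id: str, tool_name: str, previous_result: str) -> dict:
    """Answer a repeated call with its cached result plus a nudge to move on,
    rather than executing the tool again."""
    return {
        "type": "tool_result",
        "tool_use_id": tool_use_id,
        "content": (
            f"You already called '{tool_name}' with these arguments; "
            f"the result was: {previous_result}. Please proceed to the next step."
        ),
    }
```

Returning this as a normal tool result keeps the conversation well-formed: the model still sees a result for every call it made, just not one that cost another execution.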
Error handling
Tool failures should be returned as tool results, not raised as exceptions. The model can often self-correct when it knows what went wrong:
import json
import logging

logger = logging.getLogger(__name__)

def execute_tool(name: str, arguments: dict) -> str:
    try:
        handler = TOOL_REGISTRY.get(name)
        if handler is None:
            return f"Error: unknown tool '{name}'"
        result = handler(**arguments)
        return json.dumps(result) if isinstance(result, dict) else str(result)
    except ValueError as e:
        return f"Error: invalid arguments: {e}"
    except TimeoutError:
        return "Error: tool timed out; try again or use a different approach"
    except Exception as e:
        logger.error(f"Tool {name} failed: {e}", exc_info=True)
        return "Error: tool temporarily unavailable"
Return useful error messages, not stack traces. Stack traces add noise to the context window and expose implementation details.
Layer 3: Deep Dive
DAG-based execution
For complex pipelines where the dependency graph is known in advance, a DAG executor is more efficient than the agentic loop:
import concurrent.futures
from dataclasses import dataclass, field
from typing import Callable

@dataclass
class ToolNode:
    name: str
    tool_fn: Callable
    depends_on: list[str] = field(default_factory=list)

def execute_dag(nodes: list[ToolNode], inputs: dict) -> dict:
    results = dict(inputs)
    completed = set()
    # Topological execution: run every node whose dependencies are complete
    remaining = list(nodes)
    while remaining:
        ready = [n for n in remaining if all(d in completed for d in n.depends_on)]
        if not ready:
            # Nothing runnable but work remains: the graph has a cycle
            raise RuntimeError("Cycle detected or unresolvable dependencies")
        # Execute all ready nodes in parallel
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = {
                executor.submit(n.tool_fn, **{d: results[d] for d in n.depends_on}): n
                for n in ready
            }
            for future, node in futures.items():
                results[node.name] = future.result()
                completed.add(node.name)
        remaining = [n for n in remaining if n.name not in completed]
    return results
Use DAG execution when the task structure is known upfront (e.g. a data pipeline). Use the agentic loop when the model needs to decide dynamically what to do next.
Partial failure handling
In parallel execution, one tool failing should not necessarily fail the whole task:
async def run_parallel_with_fallback(tool_calls: list) -> list:
results = await asyncio.gather(
*[execute_tool_async(tc.name, tc.arguments) for tc in tool_calls],
return_exceptions=True,
)
tool_results = []
for tc, result in zip(tool_calls, results):
if isinstance(result, Exception):
content = f"Tool '{tc.name}' failed: {result}. Continue with available results."
else:
content = result
tool_results.append({"type": "tool_result", "tool_use_id": tc.id, "content": content})
return tool_results
Returning partial results lets the model either attempt a recovery path or produce a partial answer with a clear note about what failed.
Iteration cap by task complexity
Not all tasks need the same cap:
| Task type | Recommended cap | Rationale |
|---|---|---|
| Simple lookup (Q&A) | 3–5 | Should find the answer in 1–2 tool calls |
| Research / synthesis | 8–12 | Multiple sources, follow-ups expected |
| Code generation + test | 10–15 | Write → run → fix loop |
| Autonomous workflow | 15–25 | Long multi-step tasks with recovery |
Always expose the cap as a configurable parameter, not a hard constant. Different callers (interactive chat vs background job) have different tolerance for iteration depth.
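One way to express that as configuration rather than a hard constant (the task-type names and defaults below are illustrative, taken from the table above):

```python
from typing import Optional

# Default caps per task type, mirroring the table above
DEFAULT_CAPS = {
    "lookup": 5,
    "research": 12,
    "codegen": 15,
    "workflow": 25,
}

def iteration_cap(task_type: str, override: Optional[int] = None) -> int:
    """Resolve the loop cap: an explicit caller override wins,
    otherwise fall back to the per-task-type default."""
    if override is not None:
        return override
    return DEFAULT_CAPS.get(task_type, 8)  # 8: conservative general default
```

An interactive chat frontend might pass a low override for responsiveness, while a background job keeps the generous workflow default.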
Further reading
- Anthropic, Building Effective Agents: practical patterns for agentic loops, including when to use orchestrators vs subagents; the sections on tool use apply to any provider.
- LangGraph documentation: a graph-based agent execution framework; useful for seeing how state and conditional edges work in production agentic systems.