πŸ€– AI Explained
6 min read

Tool Execution Patterns

A single tool call is easy. Production tool use involves chains of calls, parallel execution, shared state, and the ever-present risk of runaway loops. This module covers the patterns that make multi-step tool execution reliable.

Layer 1: Surface

When a model has access to tools, the conversation becomes a loop: model calls a tool, your code executes it, the result goes back to the model, the model decides what to do next. This is the agentic loop.

User message
     β”‚
     β–Ό
  Model ─── wants tool? ──► Execute tool(s)
     β–²                            β”‚
     └─────── tool results β”€β”€β”€β”€β”€β”€β”€β”˜
     β”‚
   Final answer (no more tools needed)

Three questions drive how you implement this loop:

  1. Sequential or parallel?: Do the tool calls depend on each other, or can they run at the same time?
  2. How much state to carry?: What does the model need to remember across multiple tool rounds?
  3. When to stop?: How do you detect a finished conversation vs a runaway loop?

Layer 2: Guided

The basic agentic loop

def run_tool_loop(user_message: str, tools: list[dict], max_iterations: int = 8) -> str:
    messages = [{"role": "user", "content": user_message}]

    for iteration in range(max_iterations):
        response = llm.chat(model="balanced", messages=messages, tools=tools)

        # Model is done β€” no more tool calls
        if response.stop_reason == "end_turn":
            return response.text

        # Model wants to call tools
        if response.stop_reason == "tool_use":
            # Add the model's response (including tool_use blocks) to messages
            messages.append({"role": "assistant", "content": response.content})

            # Execute each requested tool
            tool_results = []
            for tool_call in response.tool_calls:
                result = execute_tool(tool_call.name, tool_call.arguments)
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": tool_call.id,
                    "content": str(result),
                })

            # Add all tool results back into the conversation
            messages.append({"role": "user", "content": tool_results})

    # Loop cap reached β€” fail gracefully
    return "I wasn't able to complete this task within the allowed number of steps."

Key points:

  • The model’s response (including the tool_use blocks) must be added to messages before the results: the model needs to see its own call to interpret the result
  • All tool results for a single round go into one "user" message
  • The loop cap is a hard safety limit, not just a performance concern

Sequential vs parallel

Sequential: each call depends on the result of the previous one:

# Example: search β†’ read β†’ summarise
# Step 1: search for relevant documents
results = search_knowledge_base(query="authentication patterns")

# Step 2: read the top result (depends on step 1's output)
content = read_document(doc_id=results[0]["id"])

# Step 3: summarise (depends on step 2's output)
summary = summarise(text=content)

The model handles sequential calls naturally: it waits for each result before deciding the next call.

Parallel: calls are independent and can run simultaneously:

import asyncio

async def run_parallel_tools(tool_calls: list) -> list:
    """Execute all tool calls concurrently."""
    tasks = [execute_tool_async(tc.name, tc.arguments) for tc in tool_calls]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return [
        {"tool_use_id": tc.id, "content": str(r) if not isinstance(r, Exception) else f"Error: {r}"}
        for tc, r in zip(tool_calls, results)
    ]

The model can request multiple tool calls in a single response. Run them in parallel when the calls are independent: this cuts latency proportionally.

How to detect parallelism: if the model returns multiple tool calls in a single response, they are intended to run in parallel. Execute them concurrently. If they arrive one at a time across multiple rounds, they are sequential.

State accumulation

For multi-step tasks, the model accumulates state through the message history: it can read its previous tool results to inform the next call. But for long chains, you may want to maintain explicit state:

def run_research_task(question: str) -> str:
    messages = [{"role": "user", "content": question}]
    gathered_facts = []  # explicit state alongside message history

    for _ in range(10):
        response = llm.chat(model="balanced", messages=messages, tools=RESEARCH_TOOLS)
        if response.stop_reason == "end_turn":
            return response.text

        messages.append({"role": "assistant", "content": response.content})
        tool_results = []

        for tc in response.tool_calls:
            result = execute_tool(tc.name, tc.arguments)
            gathered_facts.append({"tool": tc.name, "result": result})
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": tc.id,
                "content": result,
            })

        messages.append({"role": "user", "content": tool_results})

    return "Task incomplete after maximum steps."

Loop detection

Beyond the iteration cap, detect and break obvious loops:

from collections import Counter

def detect_repeated_calls(call_history: list[dict], window: int = 3) -> bool:
    """Return True if the same tool+args combo appears twice in the last `window` calls."""
    if len(call_history) < window:
        return False
    recent = call_history[-window:]
    signatures = [f"{c['name']}:{sorted(c['arguments'].items())}" for c in recent]
    return any(count >= 2 for count in Counter(signatures).values())

When a repeated call is detected, either inject a corrective message (β€œYou already called this tool with these arguments: the result was X. Please proceed to the next step.”) or terminate the loop.

Error handling

Tool failures should be returned as tool results, not raised as exceptions. The model can often self-correct when it knows what went wrong:

def execute_tool(name: str, arguments: dict) -> str:
    try:
        handler = TOOL_REGISTRY.get(name)
        if handler is None:
            return f"Error: unknown tool '{name}'"
        result = handler(**arguments)
        return json.dumps(result) if isinstance(result, dict) else str(result)
    except ValueError as e:
        return f"Error: invalid arguments β€” {e}"
    except TimeoutError:
        return "Error: tool timed out β€” try again or use a different approach"
    except Exception as e:
        logger.error(f"Tool {name} failed: {e}", exc_info=True)
        return "Error: tool temporarily unavailable"

Return useful error messages, not stack traces. Stack traces add noise to the context window and expose implementation details.


Layer 3: Deep Dive

DAG-based execution

For complex pipelines where the dependency graph is known in advance, a DAG executor is more efficient than the agentic loop:

from dataclasses import dataclass, field
from typing import Callable

@dataclass
class ToolNode:
    name: str
    tool_fn: Callable
    depends_on: list[str] = field(default_factory=list)

def execute_dag(nodes: list[ToolNode], inputs: dict) -> dict:
    results = dict(inputs)
    completed = set()

    # Simple topological execution β€” no cycle detection shown here
    remaining = list(nodes)
    while remaining:
        ready = [n for n in remaining if all(d in completed for d in n.depends_on)]
        if not ready:
            raise RuntimeError("Cycle detected or unresolvable dependencies")

        # Execute all ready nodes in parallel
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = {executor.submit(n.tool_fn, **{d: results[d] for d in n.depends_on}): n for n in ready}
            for future, node in futures.items():
                results[node.name] = future.result()
                completed.add(node.name)

        remaining = [n for n in remaining if n.name not in completed]

    return results

Use DAG execution when the task structure is known upfront (e.g. a data pipeline). Use the agentic loop when the model needs to decide dynamically what to do next.

Partial failure handling

In parallel execution, one tool failing should not necessarily fail the whole task:

async def run_parallel_with_fallback(tool_calls: list) -> list:
    results = await asyncio.gather(
        *[execute_tool_async(tc.name, tc.arguments) for tc in tool_calls],
        return_exceptions=True,
    )
    tool_results = []
    for tc, result in zip(tool_calls, results):
        if isinstance(result, Exception):
            content = f"Tool '{tc.name}' failed: {result}. Continue with available results."
        else:
            content = result
        tool_results.append({"type": "tool_result", "tool_use_id": tc.id, "content": content})
    return tool_results

Returning partial results lets the model either attempt a recovery path or produce a partial answer with a clear note about what failed.

Iteration cap by task complexity

Not all tasks need the same cap:

Task typeRecommended capRationale
Simple lookup (Q&A)3–5Should find the answer in 1–2 tool calls
Research / synthesis8–12Multiple sources, follow-ups expected
Code generation + test10–15Write β†’ run β†’ fix loop
Autonomous workflow15–25Long multi-step tasks with recovery

Always expose the cap as a configurable parameter, not a hard constant. Different callers (interactive chat vs background job) have different tolerance for iteration depth.

Further reading

  • Anthropic, Building Effective Agents [Anthropic], Practical patterns for agentic loops including when to use orchestrators vs subagents; the sections on tool use apply to any provider.
  • LangGraph documentation; Graph-based agent execution framework; useful for seeing how state and conditional edges work in production agentic systems.
✏ Suggest an edit on GitHub

Tool Execution Patterns: Check your understanding

Q1

A user asks your agent to 'check the weather in Paris, Berlin, and Tokyo'. The model returns three tool calls in a single response. How should you execute them?

Q2

Your agentic loop has no max_iterations guard. A user submits a question that the model cannot answer with the available tools. What is the most likely outcome?

Q3

When adding tool results back into the message history, you must include the model's previous response (containing the tool_use blocks) before adding the tool results. Why?

Q4

One of three parallel tool calls fails with a transient error. What is the best response pattern?

Q5

Your loop detection checks whether the exact same tool name and arguments appear twice in the last 3 calls. The model is calling get_search(query='pricing') twice with slightly different whitespace. Does your detection catch this?