Layer 1: Surface
Streaming: the model sends tokens as it generates them instead of waiting for the full response. A user sees the first word in under a second instead of waiting 5 seconds for the whole answer.
Async tools: some tools start work that takes longer than a request-response cycle (run a CI build, process a video, generate a report). Instead of blocking, the tool returns a job ID immediately and the result is collected later.
These two capabilities are often combined: a model triggers an async tool, streams a status message to the user, polls for completion, then streams the final result.
Layer 2: Guided
Basic streaming
```python
# Pseudocode: streaming a response to the user
def stream_response(user_message: str):
    with llm.stream(
        model="balanced",
        messages=[{"role": "user", "content": user_message}],
    ) as stream:
        for chunk in stream:
            if chunk.type == "text_delta":
                print(chunk.text, end="", flush=True)  # or yield to frontend
```
```python
# In practice: Anthropic SDK
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": user_message}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
```
```python
# In practice: OpenAI SDK
# stream = client.chat.completions.create(model="gpt-4o", messages=[...], stream=True)
# for chunk in stream:
#     print(chunk.choices[0].delta.content or "", end="", flush=True)
```
Streaming with tool calls
Tool use blocks arrive in a stream as events. You must buffer the entire tool call before executing:
```python
import json

def stream_with_tools(user_message: str, tools: list[dict]):
    messages = [{"role": "user", "content": user_message}]
    while True:
        # Collect the full response: buffer everything
        accumulated_content = []
        current_tool_inputs: dict[str, dict] = {}  # tool_use_id -> {"name": ..., "input": partial JSON}
        with llm.stream(model="balanced", messages=messages, tools=tools) as stream:
            for event in stream:
                if event.type == "text_delta":
                    # Safe to stream to user immediately
                    yield ("text", event.text)
                elif event.type == "tool_use_start":
                    current_tool_inputs[event.id] = {"name": event.name, "input": ""}
                elif event.type == "tool_input_delta":
                    # Accumulate; do NOT execute yet
                    current_tool_inputs[event.id]["input"] += event.partial_json
                elif event.type == "message_stop":
                    accumulated_content = stream.get_final_message().content

        # Only execute tools after streaming is complete
        if not current_tool_inputs:
            return  # No tools; we're done

        messages.append({"role": "assistant", "content": accumulated_content})
        tool_results = []
        for tool_id, tool_data in current_tool_inputs.items():
            arguments = json.loads(tool_data["input"])
            result = execute_tool(tool_data["name"], arguments)
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": tool_id,
                "content": str(result),
            })
        messages.append({"role": "user", "content": tool_results})
        yield ("tool_done", tool_results)
```
The pattern: stream text events directly to the user, buffer tool events silently, execute after the stream closes, then start a new stream for the model's response to the results.
Async tools: fire-and-forget
```python
# Tool that starts a job and returns immediately
def run_data_export(query: str, format: str = "csv") -> dict:
    """
    Starts an async export job. Returns a job_id to track progress.
    Use check_job_status to poll for completion.
    """
    response = api_client.post("/exports", json={"query": query, "format": format})
    job = response.json()
    return {
        "job_id": job["id"],
        "status": "queued",
        "message": f"Export started. Use check_job_status(job_id='{job['id']}') to track progress.",
    }
```
```python
def check_job_status(job_id: str) -> dict:
    """
    Check the status of an async job. Returns status and result_url when complete.
    """
    response = api_client.get(f"/exports/{job_id}")
    data = response.json()
    if data["status"] == "completed":
        return {
            "status": "completed",
            "result_url": data["download_url"],
            "row_count": data.get("row_count"),
        }
    elif data["status"] == "failed":
        return {"status": "failed", "error": data.get("error_message")}
    else:
        return {"status": data["status"], "progress": data.get("progress_pct")}
```
The model handles polling naturally: it calls run_data_export, receives a job_id, and the agentic loop allows it to call check_job_status in subsequent turns. Keep the polling tool description clear about how long to wait between checks.
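That guidance can live in the tool schema itself. A sketch of a hypothetical `check_job_status` definition (Anthropic-style `input_schema` shown; the wait-time wording and limits are illustrative):

```python
# Hypothetical tool definition: the description tells the model how to poll.
CHECK_JOB_STATUS_TOOL = {
    "name": "check_job_status",
    "description": (
        "Check the status of an async export job. "
        "Call this no more than once every 10 seconds; "
        "most exports finish within 2 minutes. "
        "Returns status and result_url when complete."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "job_id": {
                "type": "string",
                "description": "ID returned by run_data_export",
            },
        },
        "required": ["job_id"],
    },
}
```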
Polling with progress updates
For long-running jobs, stream status updates to the user while polling:
```python
import asyncio

async def run_and_poll(job_id: str, poll_interval: float = 3.0):
    """Poll a job and yield status updates."""
    while True:
        status = await check_job_status_async(job_id)
        if status["status"] == "completed":
            yield ("complete", status)
            return
        elif status["status"] == "failed":
            yield ("error", status)
            return
        else:
            yield ("progress", status)
            await asyncio.sleep(poll_interval)
```
Cap polling duration: don't poll forever. If a job hasn't completed in a reasonable time (say, 5 minutes for a data export), return an error rather than blocking indefinitely.
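A minimal sketch of such a capped poller, assuming a caller-supplied async `check` function shaped like `check_job_status` above:

```python
import asyncio
import time

async def poll_with_deadline(job_id: str, check, poll_interval: float = 3.0,
                             max_duration: float = 300.0) -> dict:
    """Poll check(job_id) until it finishes, giving up after max_duration seconds."""
    deadline = time.monotonic() + max_duration
    while time.monotonic() < deadline:
        status = await check(job_id)
        if status["status"] in ("completed", "failed"):
            return status
        await asyncio.sleep(poll_interval)
    # Deadline exceeded: surface a terminal error instead of blocking forever
    return {
        "status": "timeout",
        "error": f"Job {job_id} did not finish within {max_duration}s.",
    }
```

Returning a structured timeout result (rather than raising) lets the model explain the situation to the user and decide whether to retry.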
Cancellation
For async tools that support cancellation:
```python
async def run_with_cancellation(job_id: str, timeout: float = 300.0) -> dict:
    """Run a job with a timeout; cancel it if the timeout is exceeded."""
    try:
        return await asyncio.wait_for(
            poll_until_complete(job_id),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        # Cancel the remote job
        await api_client.post(f"/jobs/{job_id}/cancel")
        return {
            "status": "cancelled",
            "reason": f"Job exceeded {timeout}s timeout and was cancelled.",
        }
```
Cancellation is a courtesy to the downstream service. Without it, timed-out jobs continue consuming resources on the remote side.
Server-sent events (SSE) for streaming to browsers
When building a web frontend for a tool-using agent:
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import json

app = FastAPI()

class ChatRequest(BaseModel):
    message: str

@app.post("/chat")
async def chat_stream(request: ChatRequest):
    async def generate():
        async for event_type, data in stream_with_tools(request.message, TOOLS):
            # Format as SSE
            yield f"data: {json.dumps({'type': event_type, 'data': data})}\n\n"
        yield "data: [DONE]\n\n"
    return StreamingResponse(generate(), media_type="text/event-stream")
```
On the frontend, use fetch(): EventSource only supports GET and cannot send a request body. Read the response as a stream and parse SSE lines manually:
```javascript
async function streamChat(message) {
  const response = await fetch("/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message }),
  });
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";  // accumulate across chunk boundaries
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    // Only process complete lines; keep the last incomplete fragment in the buffer
    const lines = buffer.split("\n");
    buffer = lines.pop();  // last element may be an incomplete line
    for (const line of lines) {
      if (!line.startsWith("data: ")) continue;
      const payload = line.slice(6).trim();
      if (payload === "[DONE]") return;  // plain sentinel; check before JSON.parse
      const { type, data } = JSON.parse(payload);
      if (type === "text") appendToChat(data);
    }
  }
}
```
Layer 3: Deep Dive
Streaming and tool use: event taxonomy
A streaming response with tool use produces events in this order:
message_start
content_block_start (index=0, type="text")
text_delta × N → stream to user
content_block_stop
content_block_start (index=1, type="tool_use", id="...", name="...")
input_json_delta × M → buffer silently
content_block_stop
message_delta (stop_reason="tool_use")
message_stop
The stop_reason in message_delta tells you whether to start the tool execution phase. A stop_reason of end_turn means no tool calls.
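As a sketch, that check is the branch point of the agentic loop (the `stop_reason` values match the taxonomy above; the handling of `max_tokens` is an illustrative choice):

```python
def next_phase(stop_reason: str) -> str:
    """Map a final stop_reason to the next step of the agentic loop."""
    if stop_reason == "tool_use":
        return "execute_tools"  # run the buffered tool calls, then stream again
    if stop_reason == "end_turn":
        return "done"           # no tool calls; the turn is complete
    if stop_reason == "max_tokens":
        return "truncated"      # output was cut off; surface or continue
    return "unknown"
```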
Timeout chain design
In a streaming tool workflow, several timeouts layer on top of each other:
User request timeout: 30s — HTTP request must complete within this
├─ LLM stream timeout: 25s — streaming response must start within this
├─ Tool call timeout: 10s — each tool must return within this
└─ Async job timeout: 300s — background job; user polls separately
Never nest async job waits inside the streaming request: they will blow the outer timeout. Async jobs should return a job ID immediately and be polled on a separate endpoint.
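A sketch of the separated design, shown as plain handlers (the route decorators are omitted; `JOBS` is an illustrative in-memory stand-in for a real queue or database):

```python
JOBS: dict[str, dict] = {}  # illustrative in-memory store

async def start_export(query: str) -> dict:
    """POST /exports handler: enqueue the job and return immediately."""
    job_id = f"job-{len(JOBS) + 1}"
    JOBS[job_id] = {"status": "queued", "query": query}
    return {"job_id": job_id}  # no waiting inside the request

async def export_status(job_id: str) -> dict:
    """GET /exports/{job_id} handler: cheap lookup, polled on its own endpoint."""
    return JOBS.get(job_id, {"status": "not_found"})
```

The start handler's latency is constant regardless of job size; only the status endpoint is hit repeatedly, and each hit is cheap.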
Partial result handling
Some tools can return partial results as they stream:
```python
async def search_with_progress(query: str):
    """Generator tool that yields results as they are found."""
    async for chunk in streaming_search_api(query):
        yield {
            "type": "partial",
            "results": chunk["results"],
            "total_found": chunk["total"],
        }
```
Not all model APIs support streaming tool results: check your provider's documentation. When streaming tool results are not supported, aggregate to a final result before returning.
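When aggregation is needed, a thin wrapper can drain the generator into one final tool result (a sketch, assuming the partial-chunk shape yielded above):

```python
async def aggregate_partials(partials) -> dict:
    """Drain an async iterator of partial chunks into one final tool result."""
    all_results: list = []
    total = 0
    async for chunk in partials:
        all_results.extend(chunk["results"])
        total = chunk["total_found"]  # each chunk carries the running total
    return {"type": "final", "results": all_results, "total_found": total}
```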
Further reading
- Anthropic, Streaming Messages: full event taxonomy for streaming responses, including tool_use blocks.
- MDN, Server-sent events: SSE specification and browser API for streaming to web clients.
- OpenAI, Streaming: OpenAI's streaming format for comparison; uses `delta` objects rather than typed events.