
Streaming and Async Tool Workflows

Streaming gives users tokens as they arrive instead of waiting for the full response. Async tools let long-running operations run in the background. Both change how you wire together models and tools, and both have sharp edges that aren't obvious until you're in production.

Layer 1: Surface

Streaming: the model sends tokens as it generates them instead of waiting for the full response. A user sees the first word in under a second instead of waiting 5 seconds for the whole answer.

Async tools: some tools start work that takes longer than a request-response cycle (run a CI build, process a video, generate a report). Instead of blocking, the tool returns a job ID immediately and the result is collected later.

These two capabilities are often combined: a model triggers an async tool, streams a status message to the user, polls for completion, then streams the final result.


Layer 2: Guided

Basic streaming

# Pseudocode: streaming a response to the user
def stream_response(user_message: str):
    with llm.stream(
        model="balanced",
        messages=[{"role": "user", "content": user_message}],
    ) as stream:
        for chunk in stream:
            if chunk.type == "text_delta":
                print(chunk.text, end="", flush=True)  # or yield to frontend

# In practice (Anthropic SDK):
import anthropic
client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": user_message}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

# In practice (OpenAI SDK):
# stream = client.chat.completions.create(model="gpt-4o", messages=[...], stream=True)
# for chunk in stream:
#     print(chunk.choices[0].delta.content or "", end="", flush=True)

Streaming with tool calls

Tool-use blocks arrive in the stream as events. Because the JSON arguments arrive in fragments, you must buffer the entire tool call before executing it:

import json

def stream_with_tools(user_message: str, tools: list[dict]):
    messages = [{"role": "user", "content": user_message}]

    while True:
        # Collect the full response, buffering everything
        accumulated_content = []
        current_tool_inputs: dict[str, dict] = {}  # tool_use_id -> {"name": ..., "input": partial JSON}

        with llm.stream(model="balanced", messages=messages, tools=tools) as stream:
            for event in stream:
                if event.type == "text_delta":
                    # Safe to stream to user immediately
                    yield ("text", event.text)

                elif event.type == "tool_use_start":
                    current_tool_inputs[event.id] = {"name": event.name, "input": ""}

                elif event.type == "tool_input_delta":
                    # Accumulate; do NOT execute yet
                    current_tool_inputs[event.id]["input"] += event.partial_json

                elif event.type == "message_stop":
                    accumulated_content = stream.get_final_message().content

        # Only execute tools after streaming is complete
        if not current_tool_inputs:
            return  # No tools; we're done

        messages.append({"role": "assistant", "content": accumulated_content})

        tool_results = []
        for tool_id, tool_data in current_tool_inputs.items():
            arguments = json.loads(tool_data["input"])
            result = execute_tool(tool_data["name"], arguments)
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": tool_id,
                "content": str(result),
            })

        messages.append({"role": "user", "content": tool_results})
        yield ("tool_done", tool_results)

The pattern: stream text events directly to the user, buffer tool events silently, execute after the stream closes, then start a new stream for the model's response to the results.

Async tools: fire-and-forget

# Tool that starts a job and returns immediately
def run_data_export(query: str, format: str = "csv") -> dict:
    """
    Starts an async export job. Returns a job_id to track progress.
    Use check_job_status to poll for completion.
    """
    response = api_client.post("/exports", json={"query": query, "format": format})
    job = response.json()
    return {
        "job_id": job["id"],
        "status": "queued",
        "message": f"Export started. Use check_job_status(job_id='{job['id']}') to track progress.",
    }

def check_job_status(job_id: str) -> dict:
    """
    Check the status of an async job. Returns status and result_url when complete.
    """
    response = api_client.get(f"/exports/{job_id}")
    data = response.json()
    if data["status"] == "completed":
        return {
            "status": "completed",
            "result_url": data["download_url"],
            "row_count": data.get("row_count"),
        }
    elif data["status"] == "failed":
        return {"status": "failed", "error": data.get("error_message")}
    else:
        return {"status": data["status"], "progress": data.get("progress_pct")}

The model handles polling naturally: it calls run_data_export, receives a job_id, and the agentic loop allows it to call check_job_status in subsequent turns. Keep the polling tool description clear about how long to wait between checks.
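A minimal sketch of that loop, collapsed into one function, with a stubbed in-memory job store standing in for the export service (all names and the three-poll completion are illustrative):

```python
import time

JOBS: dict[str, dict] = {}  # stub store standing in for the export service

def run_data_export(query: str) -> dict:
    job_id = f"job-{len(JOBS) + 1}"
    JOBS[job_id] = {"status": "running", "checks": 0}
    return {"job_id": job_id, "status": "queued"}

def check_job_status(job_id: str) -> dict:
    job = JOBS[job_id]
    job["checks"] += 1
    if job["checks"] >= 3:  # pretend the export finishes after three polls
        return {"status": "completed", "result_url": f"/downloads/{job_id}.csv"}
    return {"status": "running", "progress_pct": job["checks"] * 30}

def agentic_poll(query: str, poll_interval: float = 0.0) -> dict:
    """What the model does across turns: start the job, then poll until it settles."""
    job = run_data_export(query)
    while True:
        status = check_job_status(job["job_id"])
        if status["status"] in ("completed", "failed"):
            return status
        time.sleep(poll_interval)  # the "wait between checks" from the tool description

result = agentic_poll("SELECT * FROM users")
```

In a real agent the two calls happen in separate model turns; the loop here just makes the start-then-poll shape explicit.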

Polling with progress updates

For long-running jobs, stream status updates to the user while polling:

import asyncio

async def run_and_poll(job_id: str, poll_interval: float = 3.0):
    """Poll a job and yield status updates."""
    while True:
        status = await check_job_status_async(job_id)

        if status["status"] == "completed":
            yield ("complete", status)
            return
        elif status["status"] == "failed":
            yield ("error", status)
            return
        else:
            yield ("progress", status)
            await asyncio.sleep(poll_interval)

Cap polling duration: don't poll forever. If a job hasn't completed in a reasonable time (say, 5 minutes for a data export), return an error rather than blocking indefinitely.
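One way to enforce that cap is a deadline around the poll loop. A sketch, with an illustrative stub that completes on the second poll:

```python
import asyncio
import time

async def poll_with_deadline(check, job_id: str,
                             poll_interval: float = 3.0,
                             max_wait: float = 300.0) -> dict:
    """Poll check(job_id) until the job settles or the deadline passes."""
    deadline = time.monotonic() + max_wait
    while time.monotonic() < deadline:
        status = await check(job_id)
        if status["status"] in ("completed", "failed"):
            return status
        # Never sleep past the deadline; a non-positive delay returns immediately
        await asyncio.sleep(min(poll_interval, deadline - time.monotonic()))
    return {"status": "timeout",
            "error": f"Job {job_id} did not complete within {max_wait}s."}

# Illustrative stub: reports "completed" on the second check
calls = {"n": 0}
async def fake_check(job_id: str) -> dict:
    calls["n"] += 1
    return {"status": "completed" if calls["n"] >= 2 else "running"}

result = asyncio.run(poll_with_deadline(fake_check, "job-1",
                                        poll_interval=0.01, max_wait=1.0))
```

Returning a structured "timeout" status (rather than raising) lets the model explain the failure to the user and decide whether to retry.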

Cancellation

For async tools that support cancellation:

async def run_with_cancellation(job_id: str, timeout: float = 300.0) -> dict:
    """Run a job with a timeout; cancel it if the timeout is exceeded."""
    try:
        return await asyncio.wait_for(
            poll_until_complete(job_id),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        # Cancel the remote job
        await api_client.post(f"/jobs/{job_id}/cancel")
        return {
            "status": "cancelled",
            "reason": f"Job exceeded {timeout}s timeout and was cancelled.",
        }

Cancellation is a courtesy to the downstream service. Without it, timed-out jobs continue consuming resources on the remote side.

Server-sent events (SSE) for streaming to browsers

When building a web frontend for a tool-using agent:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import json

class ChatRequest(BaseModel):
    message: str

app = FastAPI()

@app.post("/chat")
async def chat_stream(request: ChatRequest):
    async def generate():
        async for event_type, data in stream_with_tools(request.message, TOOLS):
            # Format as SSE
            yield f"data: {json.dumps({'type': event_type, 'data': data})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

On the frontend, use fetch(): EventSource only supports GET and cannot send a request body. Read the response as a stream and parse SSE lines manually:

async function streamChat(message) {
  const response = await fetch("/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";  // accumulate across chunk boundaries

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });

    // Only process complete lines β€” keep the last incomplete fragment in the buffer
    const lines = buffer.split("\n");
    buffer = lines.pop();  // last element may be an incomplete line

    for (const line of lines) {
      if (!line.startsWith("data: ")) continue;
      const payload = line.slice(6).trim();
      if (payload === "[DONE]") return;    // plain sentinel β€” check before JSON.parse
      const { type, data } = JSON.parse(payload);
      if (type === "text") appendToChat(data);
    }
  }
}

Layer 3: Deep Dive

Streaming and tool use: event taxonomy

A streaming response with tool use produces events in this order:

message_start
  content_block_start (index=0, type="text")
    text_delta × N              ← stream to user
  content_block_stop
  content_block_start (index=1, type="tool_use", id="...", name="...")
    input_json_delta × M        ← buffer silently
  content_block_stop
message_delta (stop_reason="tool_use")
message_stop

The stop_reason in message_delta tells you whether to start the tool execution phase. A stop_reason of end_turn means no tool calls.
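In code, the phase decision after the stream closes can be as small as this (a sketch; the phase names are illustrative):

```python
from types import SimpleNamespace

def next_phase(message) -> str:
    """Map the final message's stop_reason to the next step of the loop."""
    if message.stop_reason == "tool_use":
        return "execute_tools"  # buffered tool calls are complete; run them, then re-stream
    if message.stop_reason == "end_turn":
        return "done"           # no tool calls; the turn is finished
    return "other"              # e.g. max_tokens or a stop sequence

phase = next_phase(SimpleNamespace(stop_reason="tool_use"))
```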

Timeout chain design

In a streaming tool workflow, several timeouts layer on top of each other:

User request timeout:   30s  ← HTTP request must complete within this
  └─ LLM stream timeout:  25s  ← streaming response must start within this
       └─ Tool call timeout:  10s  ← each tool must return within this
            └─ Async job timeout: 300s ← background job; user polls separately

Never nest async job waits inside the streaming request: they will blow the outer timeout. Async jobs should return a job ID immediately and be polled on a separate endpoint.
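A framework-agnostic sketch of that separation (in the FastAPI app above these would be two routes; the in-memory store is a stand-in):

```python
import asyncio

JOBS: dict[str, dict] = {}  # stand-in for the job service

async def chat_endpoint(message: str) -> dict:
    """Handles the streaming request: starts the job and returns its ID at once."""
    job_id = f"job-{len(JOBS) + 1}"
    JOBS[job_id] = {"status": "running"}
    # Deliberately no waiting here: awaiting completion would blow the 30s request timeout
    return {"job_id": job_id, "status": "queued"}

async def job_status_endpoint(job_id: str) -> dict:
    """Polled separately by the client: a cheap lookup, no waiting."""
    return JOBS.get(job_id, {"status": "unknown"})

async def demo():
    started = await chat_endpoint("export all users")
    return await job_status_endpoint(started["job_id"])

result = asyncio.run(demo())
```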

Partial result handling

Some tools can return partial results as they stream:

async def search_with_progress(query: str):
    """Generator tool that yields results as they are found."""
    async for chunk in streaming_search_api(query):
        yield {
            "type": "partial",
            "results": chunk["results"],
            "total_found": chunk["total"],
        }

Not all model APIs support streaming tool results: check your provider's documentation. When streaming tool results are not supported, aggregate to a final result before returning.
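A sketch of that aggregation, with a stub generator matching the partial-event shape above:

```python
import asyncio

async def fake_streaming_search(query: str):
    """Stub for a streaming search API: yields partial result batches."""
    for batch in ([{"title": "a"}], [{"title": "b"}, {"title": "c"}]):
        yield {"type": "partial", "results": batch, "total_found": 3}

async def collect_tool_result(query: str) -> dict:
    """Drain the partial stream into one final tool_result payload."""
    all_results: list[dict] = []
    total = 0
    async for chunk in fake_streaming_search(query):
        all_results.extend(chunk["results"])
        total = chunk["total_found"]
    return {"type": "final", "results": all_results, "total_found": total}

result = asyncio.run(collect_tool_result("llm streaming"))
```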



Streaming and Async Tool Workflows: Check your understanding

Q1

A streaming response begins delivering text to the user, then mid-stream a tool_use block starts arriving. What must you do before executing the tool?

Q2

A tool starts a background data export job and returns a job_id immediately. The export takes 2-5 minutes to complete. How should the model handle this in the agentic loop?

Q3

You are building a web interface that streams the agent's response to the browser. Which protocol is standard for server-to-browser one-way streaming?

Q4

An async job is started by the model but the user navigates away. The job continues running on the remote service, consuming its resources. What pattern prevents orphaned jobs?

Q5

A streaming response has been running for 25 seconds and the user's connection drops mid-stream. On reconnect, the user wants to resume where they left off. What is the limitation of SSE in this scenario?