🤖 AI Explained

Production Agent Systems

An agent that works in a demo fails in production the first time it crashes mid-task, is retried and duplicates a side effect, or loses its state to a process restart. This module covers the durability semantics that separate toy agents from production systems.

Layer 1: Surface

Production agents fail in ways that have nothing to do with the model: the process crashes, the network drops, the task queue retries a completed job. Without durability design, these infrastructure events cause:

  • Duplicate side effects: email sent twice, charge made twice
  • Lost progress: 40-minute task restarts from scratch after a crash
  • Stuck leases: a worker claims a task, dies, and no other worker can pick it up
  • Inconsistent state: some steps completed, others not, with no record of which

The four durability primitives:

Primitive | What it provides
--------- | ----------------
Checkpoints | Durable snapshots of task progress; restart from the last checkpoint, not from scratch
Idempotency keys | Deduplication of side effects when a step is retried
Leases | Exclusive task ownership with automatic expiry; prevents stuck tasks
Replay safety | Steps can be re-executed from a checkpoint without double-counting effects

Layer 2: Guided

Checkpointing

A checkpoint is a serialised snapshot of agent state that survives process restarts:

import json
import time
from dataclasses import dataclass, asdict

@dataclass
class AgentCheckpoint:
    task_id: str
    step_index: int                 # last completed step
    completed_steps: list[str]      # IDs of completed steps
    step_results: dict              # results keyed by step ID
    messages: list[dict]            # conversation history at this point
    created_at: float

class CheckpointStore:
    def __init__(self, backend):    # backend: Redis, DynamoDB, Postgres
        self.backend = backend

    def save(self, checkpoint: AgentCheckpoint):
        """Atomically write checkpoint."""
        key = f"checkpoint:{checkpoint.task_id}"
        self.backend.set(key, json.dumps(asdict(checkpoint)))

    def load(self, task_id: str) -> AgentCheckpoint | None:
        raw = self.backend.get(f"checkpoint:{task_id}")
        if not raw:
            return None
        return AgentCheckpoint(**json.loads(raw))

    def delete(self, task_id: str):
        self.backend.delete(f"checkpoint:{task_id}")

async def run_with_checkpoints(task_id: str, goal: str, tools: list[dict]) -> str:
    # async so the lease runner below can await and cancel it; in a real system
    # execute_step would also be awaited, giving the loop cancellation points
    store = CheckpointStore(backend)    # backend: a shared checkpoint store (Redis, etc.)
    checkpoint = store.load(task_id)

    # Resume from checkpoint or start fresh
    if checkpoint:
        messages = checkpoint.messages
        completed = set(checkpoint.completed_steps)
        results = checkpoint.step_results
        start_step = checkpoint.step_index + 1
    else:
        messages = [{"role": "user", "content": goal}]
        completed = set()
        results = {}
        start_step = 0

    plan = generate_plan(goal)

    for i, step in enumerate(plan[start_step:], start=start_step):
        if step["id"] in completed:
            continue    # skip already-completed steps on replay

        result = execute_step(step, tools, context=results)
        results[step["id"]] = result
        completed.add(step["id"])

        # Checkpoint after every step
        store.save(AgentCheckpoint(
            task_id=task_id,
            step_index=i,
            completed_steps=list(completed),
            step_results=results,
            messages=messages,
            created_at=time.time(),
        ))

    store.delete(task_id)   # clean up on successful completion
    return synthesise(goal, results)

Idempotency keys for agent tasks

Every step that produces an external side effect needs an idempotency key that is stable across retries:

import hashlib

def step_idempotency_key(task_id: str, step_id: str, attempt: int = 0) -> str:
    """
    Derive a stable key for a specific step execution.
    - task_id + step_id produces the same key for the same logical step
    - attempt is only incremented when the user explicitly resubmits (not on retry)
    """
    return hashlib.sha256(f"{task_id}:{step_id}:{attempt}".encode()).hexdigest()[:32]

class IdempotentEffectExecutor:
    def __init__(self, result_store):
        self.store = result_store

    def execute(self, task_id: str, step_id: str, fn, *args, **kwargs) -> str:
        key = step_idempotency_key(task_id, step_id)

        # Check if this step already completed
        cached = self.store.get(key)
        if cached:
            return cached       # return previous result without re-executing

        result = fn(*args, **kwargs)

        # Persist the result before returning; if we crash here, the next attempt retries cleanly
        self.store.set(key, result, ttl=86400 * 7)  # 7-day TTL
        return result

The key must be derived deterministically from (task_id, step_id), never from wall-clock time or random values. This ensures that a retried step uses the same key and hits the cache.
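A quick sanity check of this property, reusing the `step_idempotency_key` definition above (the task and step IDs here are illustrative):

```python
import hashlib

def step_idempotency_key(task_id: str, step_id: str, attempt: int = 0) -> str:
    # Same definition as above: the key depends only on the logical step identity
    return hashlib.sha256(f"{task_id}:{step_id}:{attempt}".encode()).hexdigest()[:32]

# A retried step produces the identical key, so the effect cache is hit
k1 = step_idempotency_key("T-100", "send-email")
k2 = step_idempotency_key("T-100", "send-email")
assert k1 == k2

# A different step, or an explicit user resubmission, gets a fresh key
assert step_idempotency_key("T-100", "charge-card") != k1
assert step_idempotency_key("T-100", "send-email", attempt=1) != k1
```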

Lease / lock ownership

Prevent two workers from executing the same task concurrently:

import asyncio
import time
import uuid

class TaskLease:
    def __init__(self, store, lease_ttl: float = 300.0):  # 5-minute lease
        self.store = store
        self.lease_ttl = lease_ttl

    def acquire(self, task_id: str) -> str | None:
        """Atomically acquire a lease. Returns lease_id or None if already held."""
        lease_id = str(uuid.uuid4())
        key = f"lease:{task_id}"

        # SET NX EX: only set if the key does not exist, with expiry
        acquired = self.store.set(key, lease_id, nx=True, ex=int(self.lease_ttl))
        return lease_id if acquired else None

    # Lua scripts make renew and release atomic: there is no window between the
    # ownership check and the mutation where another worker can slip in.
    _RENEW_SCRIPT = """
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("EXPIRE", KEYS[1], ARGV[2])
        else
            return 0
        end
    """

    _RELEASE_SCRIPT = """
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        else
            return 0
        end
    """

    def renew(self, task_id: str, lease_id: str) -> bool:
        """Atomically extend the lease if we still own it."""
        key = f"lease:{task_id}"
        result = self.store.eval(self._RENEW_SCRIPT, 1, key, lease_id, int(self.lease_ttl))
        return bool(result)

    def release(self, task_id: str, lease_id: str):
        """Atomically release only if we still own the lease."""
        key = f"lease:{task_id}"
        self.store.eval(self._RELEASE_SCRIPT, 1, key, lease_id)

async def run_with_lease(task_id: str, goal: str):
    lease = TaskLease(redis_client)
    lease_id = lease.acquire(task_id)
    if lease_id is None:
        raise TaskAlreadyRunning(f"Task {task_id} is being executed by another worker")

    # Flag lets us distinguish lease-loss cancellation from external cancellation
    # (e.g. operator shutdown). Without it, a broad `except CancelledError` would
    # misreport any external cancel as a lease failure.
    lease_lost = False
    current_task = asyncio.current_task()

    async def renew_periodically():
        nonlocal lease_lost
        while True:
            await asyncio.sleep(60)
            if not lease.renew(task_id, lease_id):
                # An exception raised inside a background task is swallowed by
                # asyncio unless the task is awaited. Cancel the main coroutine
                # directly instead.
                lease_lost = True
                current_task.cancel()
                return

    renew_task = asyncio.create_task(renew_periodically())
    try:
        return await run_with_checkpoints(task_id, goal, TOOLS)
    except asyncio.CancelledError:
        if lease_lost:
            raise LeaseLost(f"Lost lease on task {task_id}")
        raise   # external cancellation โ€” propagate as-is
    finally:
        renew_task.cancel()
        lease.release(task_id, lease_id)

Replay safety

A replayed step must not produce new side effects if it already succeeded:

class ReplaySafeStep:
    """
    A step wrapper that is safe to re-execute from a checkpoint.
    On replay: returns the cached result without calling the tool again.
    On first run: executes the tool and caches the result.
    """

    def __init__(self, effect_store):
        self.effects = effect_store

    def execute(self, step_key: str, tool_fn, **kwargs) -> str:
        # Check if this effect was already committed
        prior = self.effects.get(step_key)
        if prior:
            return prior    # idempotent replay

        result = tool_fn(**kwargs)

        # Atomically commit; if this write fails, the step will retry and hit the same path
        self.effects.set(step_key, result)
        return result

At-least-once vs exactly-once:

Most task queues provide at-least-once delivery: a task may execute more than once. Exactly-once is achievable only at the application layer by combining idempotency keys with replay safety. Do not rely on the queue to prevent duplicates.
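The distinction can be shown in a few lines. Below is a minimal sketch using an in-memory dict as the effect store (in production this would be Redis or DynamoDB): a redelivered task invokes the handler twice, but the side effect fires once. The function and key names are illustrative.

```python
effect_store: dict[str, str] = {}   # stands in for a shared result store
send_count = 0                      # counts actual side-effect executions

def send_email(to: str) -> str:
    global send_count
    send_count += 1                 # the real side effect
    return f"sent:{to}"

def execute_once(key: str, fn, *args) -> str:
    # At-least-once delivery may invoke this twice for the same step;
    # the idempotency key dedupes the effect at the application layer
    if key in effect_store:
        return effect_store[key]
    result = fn(*args)
    effect_store[key] = result      # commit before acknowledging the task
    return result

# The queue redelivers the task: same key, so the email is sent only once
r1 = execute_once("T-100:send-email:0", send_email, "user@example.com")
r2 = execute_once("T-100:send-email:0", send_email, "user@example.com")
assert r1 == r2 and send_count == 1
```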


Layer 3: Deep Dive

Durability requirements by task type

Task type | Checkpoint needed? | Idempotency needed? | Lease needed?
--------- | ------------------ | ------------------- | -------------
Short query (under 10s) | No | No | No
Multi-step research (no side effects) | Recommended | No | Recommended
Multi-step with external writes | Required | Required | Required
Long-running background job | Required | Required | Required
Multi-agent workflow | Required at each agent boundary | Required | Required per agent
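Encoded as data, the table above might look like this (a sketch; the task-type identifiers are illustrative):

```python
# Durability requirements per task type, mirroring the table above
DURABILITY_REQUIREMENTS: dict[str, dict[str, str]] = {
    "short_query":          {"checkpoint": "no",          "idempotency": "no",       "lease": "no"},
    "research_no_effects":  {"checkpoint": "recommended", "idempotency": "no",       "lease": "recommended"},
    "multi_step_writes":    {"checkpoint": "required",    "idempotency": "required", "lease": "required"},
    "background_job":       {"checkpoint": "required",    "idempotency": "required", "lease": "required"},
    "multi_agent_workflow": {"checkpoint": "required",    "idempotency": "required", "lease": "required"},
}

def must_have(task_type: str, primitive: str) -> bool:
    """True only when the table marks the primitive as strictly required."""
    return DURABILITY_REQUIREMENTS[task_type][primitive] == "required"

assert must_have("multi_step_writes", "idempotency")
assert not must_have("short_query", "lease")
```

A scheduler could consult this lookup before dispatching a task, refusing to run a `multi_step_writes` task on a worker that has no lease store configured.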

Cost attribution per task

@dataclass
class TaskCostRecord:
    task_id: str
    llm_input_tokens: int = 0
    llm_output_tokens: int = 0
    tool_call_count: int = 0
    wall_time_seconds: float = 0.0

    def total_llm_cost(
        self,
        input_price_per_mtok: float = 3.0,
        output_price_per_mtok: float = 15.0,
    ) -> float:
        return (
            self.llm_input_tokens / 1_000_000 * input_price_per_mtok
            + self.llm_output_tokens / 1_000_000 * output_price_per_mtok
        )

Track cost per task, not per session: it reveals which task types are expensive and guides model right-sizing.
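As a worked example with the illustrative default prices above ($3 per million input tokens, $15 per million output tokens), a task that consumed 2M input and 0.5M output tokens costs $6 + $7.50 = $13.50:

```python
from dataclasses import dataclass

@dataclass
class TaskCostRecord:
    # Reduced version of the record above, enough for the cost calculation
    task_id: str
    llm_input_tokens: int = 0
    llm_output_tokens: int = 0

    def total_llm_cost(
        self,
        input_price_per_mtok: float = 3.0,
        output_price_per_mtok: float = 15.0,
    ) -> float:
        return (
            self.llm_input_tokens / 1_000_000 * input_price_per_mtok
            + self.llm_output_tokens / 1_000_000 * output_price_per_mtok
        )

record = TaskCostRecord("T-100", llm_input_tokens=2_000_000, llm_output_tokens=500_000)
assert record.total_llm_cost() == 13.5   # 2 * $3 + 0.5 * $15
```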

Distributed agent execution

When agents run across multiple workers:

  • Each worker holds a lease and renews it while alive
  • Checkpoints are written to shared storage (Redis, DynamoDB), never to local disk
  • Lease expiry allows dead-worker recovery without manual intervention
  • Task queues (Celery, BullMQ, Temporal) handle the retry/backoff logic; agents handle the idempotency

Durable execution frameworks such as Temporal handle much of this infrastructure automatically; they are worth evaluating before building a custom checkpoint-plus-lease system.

Further reading

  • Temporal documentation: durable workflow execution that handles checkpointing, retries, and lease management; the reference implementation of these patterns for production agent systems.
  • Designing Data-Intensive Applications, Chapter 7 (Kleppmann): the transactions and atomicity chapter; its idempotency and exactly-once material translates directly to agent step execution.

Production Agent Systems: Check your understanding

Q1

A worker completes step 3 of a 6-step agent task and then crashes before marking step 3 as done. The task queue retries the task from the beginning. Steps 1โ€“3 execute again, sending a duplicate email and charging the user twice. What two mechanisms working together would have prevented this?

Q2

A step's idempotency key is generated as `hashlib.sha256(json.dumps(step_args).encode()).hexdigest()`. The same agent task is submitted twice by two different users with identical arguments (e.g., both request a report for customer_id='C-001'). What is the problem with this key design?

Q3

A worker acquires a lease on task T-100 and begins execution. Halfway through, the worker process is killed by the OS. No other worker can pick up T-100, even after 10 minutes. What lease design property is missing?

Q4

A team member argues: 'We're using Celery with Redis broker: tasks are guaranteed to execute exactly once, so we don't need idempotency keys.' Is this correct?

Q5

A multi-step agent task has a checkpoint stored after step 4. A replay is triggered. Steps 1โ€“4 re-execute but their side effects (API calls, database writes) do not fire again. The final output is identical to the original run. What mechanism enables this?