Layer 1: Surface
Production agents fail in ways that have nothing to do with the model: the process crashes, the network drops, the task queue retries a completed job. Without durability design, these infrastructure events cause:
- Duplicate side effects: email sent twice, charge made twice
- Lost progress: 40-minute task restarts from scratch after a crash
- Stuck leases: a worker claims a task, dies, and no other worker can pick it up
- Inconsistent state: some steps completed, others not, with no record of which
The four durability primitives:
| Primitive | What it provides |
|---|---|
| Checkpoints | Durable snapshots of task progress; restart from last checkpoint, not from scratch |
| Idempotency keys | Deduplication of side effects when a step is retried |
| Leases | Exclusive task ownership with automatic expiry; prevents stuck tasks |
| Replay safety | Steps can be re-executed from a checkpoint without double-counting effects |
Layer 2: Guided
Checkpointing
A checkpoint is a serialised snapshot of agent state that survives process restarts:
```python
import json
import time
from dataclasses import dataclass, asdict

@dataclass
class AgentCheckpoint:
    task_id: str
    step_index: int              # last completed step
    completed_steps: list[str]   # IDs of completed steps
    step_results: dict           # results keyed by step ID
    messages: list[dict]         # conversation history at this point
    created_at: float

class CheckpointStore:
    def __init__(self, backend):  # backend: Redis, DynamoDB, Postgres
        self.backend = backend

    def save(self, checkpoint: AgentCheckpoint):
        """Atomically write checkpoint."""
        key = f"checkpoint:{checkpoint.task_id}"
        self.backend.set(key, json.dumps(asdict(checkpoint)))

    def load(self, task_id: str) -> AgentCheckpoint | None:
        raw = self.backend.get(f"checkpoint:{task_id}")
        if not raw:
            return None
        return AgentCheckpoint(**json.loads(raw))

    def delete(self, task_id: str):
        self.backend.delete(f"checkpoint:{task_id}")
```
```python
def run_with_checkpoints(task_id: str, goal: str, tools: list[dict]) -> str:
    store = CheckpointStore(backend)  # backend: module-level shared-storage client
    checkpoint = store.load(task_id)

    # Resume from checkpoint or start fresh
    if checkpoint:
        messages = checkpoint.messages
        completed = set(checkpoint.completed_steps)
        results = checkpoint.step_results
        start_step = checkpoint.step_index + 1
    else:
        messages = [{"role": "user", "content": goal}]
        completed = set()
        results = {}
        start_step = 0

    # The plan must be deterministic for a given goal (or persisted in the
    # checkpoint); otherwise step indices will not line up on resume.
    plan = generate_plan(goal)
    for i, step in enumerate(plan[start_step:], start=start_step):
        if step["id"] in completed:
            continue  # skip already-completed steps on replay
        result = execute_step(step, tools, context=results)
        results[step["id"]] = result
        completed.add(step["id"])
        # Checkpoint after every step
        store.save(AgentCheckpoint(
            task_id=task_id,
            step_index=i,
            completed_steps=list(completed),
            step_results=results,
            messages=messages,
            created_at=time.time(),
        ))

    store.delete(task_id)  # clean up on successful completion
    return synthesise(goal, results)
```
Idempotency keys for agent tasks
Every step that produces an external side effect needs an idempotency key that is stable across retries:
```python
import hashlib

def step_idempotency_key(task_id: str, step_id: str, attempt: int = 0) -> str:
    """
    Derive a stable key for a specific step execution.
    - task_id + step_id produces the same key for the same logical step
    - attempt is only incremented when the user explicitly resubmits (not on retry)
    """
    return hashlib.sha256(f"{task_id}:{step_id}:{attempt}".encode()).hexdigest()[:32]

class IdempotentEffectExecutor:
    def __init__(self, result_store):
        self.store = result_store

    def execute(self, task_id: str, step_id: str, fn, *args, **kwargs) -> str:
        key = step_idempotency_key(task_id, step_id)
        # Check if this step already completed
        cached = self.store.get(key)
        if cached:
            return cached  # return previous result without re-executing
        result = fn(*args, **kwargs)
        # Persist the result before returning: if we crash here, the next
        # attempt retries cleanly
        self.store.set(key, result, ttl=86400 * 7)  # 7-day TTL
        return result
```
The key must be derived deterministically from (task_id, step_id), never from wall-clock time or random values. This ensures that a retried step computes the same key and hits the cache.
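A small self-contained check of that determinism property (repeating the key function from above; the task and step names are illustrative):

```python
import hashlib

def step_idempotency_key(task_id: str, step_id: str, attempt: int = 0) -> str:
    return hashlib.sha256(f"{task_id}:{step_id}:{attempt}".encode()).hexdigest()[:32]

# Same logical step on retry: identical key, so the cached result is reused
k1 = step_idempotency_key("task-42", "send_email")
k2 = step_idempotency_key("task-42", "send_email")
assert k1 == k2

# Different step, or an explicit user resubmission: a fresh key, so the
# effect is allowed to run again
assert step_idempotency_key("task-42", "charge_card") != k1
assert step_idempotency_key("task-42", "send_email", attempt=1) != k1
```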
Lease / lock ownership
Prevent two workers from executing the same task concurrently:
```python
import uuid

class TaskLease:
    def __init__(self, store, lease_ttl: float = 300.0):  # 5-minute lease
        self.store = store
        self.lease_ttl = lease_ttl

    def acquire(self, task_id: str) -> str | None:
        """Atomically acquire a lease. Returns lease_id or None if already held."""
        lease_id = str(uuid.uuid4())
        key = f"lease:{task_id}"
        # SET NX EX: only set if the key does not exist, with expiry
        acquired = self.store.set(key, lease_id, nx=True, ex=int(self.lease_ttl))
        return lease_id if acquired else None

    # Lua scripts make renew and release atomic: no window between the ownership
    # check and the mutation where another worker can slip in.
    _RENEW_SCRIPT = """
    if redis.call("GET", KEYS[1]) == ARGV[1] then
        return redis.call("EXPIRE", KEYS[1], ARGV[2])
    else
        return 0
    end
    """

    _RELEASE_SCRIPT = """
    if redis.call("GET", KEYS[1]) == ARGV[1] then
        return redis.call("DEL", KEYS[1])
    else
        return 0
    end
    """

    def renew(self, task_id: str, lease_id: str) -> bool:
        """Atomically extend the lease if we still own it."""
        key = f"lease:{task_id}"
        result = self.store.eval(self._RENEW_SCRIPT, 1, key, lease_id, int(self.lease_ttl))
        return bool(result)

    def release(self, task_id: str, lease_id: str):
        """Atomically release only if we still own the lease."""
        key = f"lease:{task_id}"
        self.store.eval(self._RELEASE_SCRIPT, 1, key, lease_id)
```
```python
import asyncio

class TaskAlreadyRunning(Exception):
    pass

class LeaseLost(Exception):
    pass

async def run_with_lease(task_id: str, goal: str):
    lease = TaskLease(redis_client)  # redis_client: shared module-level connection
    lease_id = lease.acquire(task_id)
    if lease_id is None:
        raise TaskAlreadyRunning(f"Task {task_id} is being executed by another worker")

    # Flag lets us distinguish lease-loss cancellation from external cancellation
    # (e.g. operator shutdown). Without it, a broad `except CancelledError` would
    # misreport any external cancel as a lease failure.
    lease_lost = False
    current_task = asyncio.current_task()

    async def renew_periodically():
        nonlocal lease_lost
        while True:
            await asyncio.sleep(60)
            if not lease.renew(task_id, lease_id):
                # An exception raised inside a background task is swallowed by
                # asyncio unless the task is awaited. Cancel the main coroutine
                # directly instead.
                lease_lost = True
                current_task.cancel()
                return

    renew_task = asyncio.create_task(renew_periodically())
    try:
        # run_with_checkpoints is synchronous, so run it off the event loop.
        # Note: cancellation interrupts this await, not the worker thread itself.
        return await asyncio.to_thread(run_with_checkpoints, task_id, goal, TOOLS)
    except asyncio.CancelledError:
        if lease_lost:
            raise LeaseLost(f"Lost lease on task {task_id}")
        raise  # external cancellation: propagate as-is
    finally:
        renew_task.cancel()
        lease.release(task_id, lease_id)
```
Replay safety
A replayed step must not produce new side effects if it already succeeded:
```python
class ReplaySafeStep:
    """
    A step wrapper that is safe to re-execute from a checkpoint.
    On replay: returns the cached result without calling the tool again.
    On first run: executes the tool and caches the result.
    """
    def __init__(self, effect_store):
        self.effects = effect_store

    def execute(self, step_key: str, tool_fn, **kwargs) -> str:
        # Check if this effect was already committed
        prior = self.effects.get(step_key)
        if prior:
            return prior  # idempotent replay
        result = tool_fn(**kwargs)
        # Commit atomically: if this fails, the step retries and hits the same path
        self.effects.set(step_key, result)
        return result
```
At-least-once vs exactly-once:
Most task queues provide at-least-once delivery: a task may execute more than once. Exactly-once is achievable only at the application layer by combining idempotency keys with replay safety. Do not rely on the queue to prevent duplicates.
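To illustrate: even when the queue delivers the same step twice, routing the side effect through an idempotency cache makes it fire once. A minimal in-process sketch (the dict stands in for a shared result store; names are illustrative):

```python
effect_cache: dict[str, str] = {}  # stands in for a shared result store
emails_sent = 0

def send_email(to: str) -> str:
    global emails_sent
    emails_sent += 1  # the real-world side effect
    return f"sent-to-{to}"

def execute_once(key: str, fn, *args) -> str:
    if key in effect_cache:
        return effect_cache[key]  # duplicate delivery: replay the cached result
    result = fn(*args)
    effect_cache[key] = result
    return result

# The queue delivers the same logical step twice (at-least-once delivery)
r1 = execute_once("task-1:send_email", send_email, "user@example.com")
r2 = execute_once("task-1:send_email", send_email, "user@example.com")
assert r1 == r2
assert emails_sent == 1  # the side effect ran exactly once
```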
Layer 3: Deep Dive
Durability requirements by task type
| Task type | Checkpoint needed? | Idempotency needed? | Lease needed? |
|---|---|---|---|
| Short query (under 10s) | No | No | No |
| Multi-step research (no side effects) | Recommended | No | Recommended |
| Multi-step with external writes | Required | Required | Required |
| Long-running background job | Required | Required | Required |
| Multi-agent workflow | Required at each agent boundary | Required | Required per agent |
Cost attribution per task
```python
from dataclasses import dataclass

@dataclass
class TaskCostRecord:
    task_id: str
    llm_input_tokens: int = 0
    llm_output_tokens: int = 0
    tool_call_count: int = 0
    wall_time_seconds: float = 0.0

    def total_llm_cost(
        self,
        input_price_per_mtok: float = 3.0,
        output_price_per_mtok: float = 15.0,
    ) -> float:
        return (
            self.llm_input_tokens / 1_000_000 * input_price_per_mtok
            + self.llm_output_tokens / 1_000_000 * output_price_per_mtok
        )
```
Track cost per task, not per session: it reveals which task types are expensive and guides model right-sizing.
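As a worked example with the illustrative default prices ($3/MTok in, $15/MTok out), repeating a trimmed version of the record above:

```python
from dataclasses import dataclass

@dataclass
class TaskCostRecord:
    task_id: str
    llm_input_tokens: int = 0
    llm_output_tokens: int = 0

    def total_llm_cost(
        self,
        input_price_per_mtok: float = 3.0,
        output_price_per_mtok: float = 15.0,
    ) -> float:
        return (
            self.llm_input_tokens / 1_000_000 * input_price_per_mtok
            + self.llm_output_tokens / 1_000_000 * output_price_per_mtok
        )

record = TaskCostRecord("t1", llm_input_tokens=500_000, llm_output_tokens=100_000)
# 0.5 MTok * $3 + 0.1 MTok * $15 = $1.50 + $1.50
print(f"${record.total_llm_cost():.2f}")  # $3.00
```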
Distributed agent execution
When agents run across multiple workers:
- Each worker holds a lease and renews it while alive
- Checkpoints are written to shared storage (Redis, DynamoDB): not local disk
- Lease expiry allows dead-worker recovery without manual intervention
- Task queues (Celery, BullMQ, Temporal) handle the retry/backoff logic; agents handle the idempotency
Temporal and similar durable-execution frameworks handle much of this infrastructure automatically; they are worth evaluating before building a custom checkpoint-and-lease system.
Further reading
- Temporal documentation. Durable workflow execution covering checkpointing, retries, and lease management; the reference implementation of these patterns for production agent systems.
- Designing Data-Intensive Applications, Chapter 7, Kleppmann. The transactions and atomicity chapter; its material on idempotency and exactly-once semantics translates directly to agent step execution.