Overview
ExecutionState is a Durable Object that tracks async execution state with strong consistency. It provides real-time status queries, WebSocket streaming, and automatic state persistence for long-running workflow executions.
// Create an ExecutionState Durable Object instance.
// One ID is used both to name the Durable Object and in the /start payload,
// so the object and the execution it tracks cannot drift apart.
const executionId = 'exec_123';
const id = env.EXECUTION_STATE.idFromName(executionId);
const stub = env.EXECUTION_STATE.get(id);
// Start tracking
await stub.fetch('https://do/start', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    executionId,
    ensembleName: 'order-processing',
    totalSteps: 5
  })
});
// Query status
const response = await stub.fetch('https://do/status');
const state = await response.json();
Configuration
Add to your wrangler.toml:
# Binding that exposes the ExecutionState class to the Worker as env.EXECUTION_STATE
[[durable_objects.bindings]]
name = "EXECUTION_STATE"
class_name = "ExecutionState"
# NOTE(review): script_name is presumably only needed when the class is defined
# in a different Worker script than the one using the binding - confirm.
script_name = "conductor"
# Migration that registers ExecutionState as a new Durable Object class
[[migrations]]
tag = "v1"
new_classes = ["ExecutionState"]
Types
ExecutionStatus
/** Status values an execution can report. */
type ExecutionStatus =
  | 'pending'
  | 'running'
  | 'completed'
  | 'failed'
  | 'cancelled';
StoredExecutionState
/**
 * State record for a single tracked execution. This is the shape returned by
 * GET /status and carried in the WebSocket `initial_state` event.
 */
interface StoredExecutionState {
// Unique execution identifier
executionId: string;
// Name of the ensemble being executed
ensembleName: string;
// Current status of the execution
status: ExecutionStatus;
// NOTE(review): presumably a numeric timestamp (epoch ms?) - confirm units
startedAt: number;
// Absent until the execution has finished - TODO confirm for failed/cancelled
completedAt?: number;
// NOTE(review): presumably the `step` name last reported via POST /progress - confirm
currentStep?: string;
// Index of the current step (0-based)
stepIndex?: number;
// Total number of steps in the execution
totalSteps?: number;
// NOTE(review): presumably per-step outputs keyed by step name - confirm
outputs: Record<string, unknown>;
// ExecutionMetrics is declared elsewhere; its fields are not shown here
metrics: ExecutionMetrics;
// Final result of the execution, when one is available
result?: unknown;
// Error message, when one is available
error?: string;
// ExecutionEvent is declared elsewhere; its shape is not shown here
events: ExecutionEvent[];
}
ExecutionProgressEvent
/**
 * Event describing one step of progress. The WebSocket `progress` message
 * has this same shape.
 */
interface ExecutionProgressEvent {
// Discriminator: always 'progress' for this event
type: 'progress';
// Unique execution identifier
executionId: string;
// Name of the step being reported
step: string;
// Index of the current step (0-based)
stepIndex: number;
// Total number of steps in the execution
totalSteps: number;
// Output from the step (optional)
output?: unknown;
// NOTE(review): presumably epoch milliseconds - confirm units
timestamp: number;
}
ExecutionCompletionEvent
/**
 * Event describing the end of an execution. `type` tells whether the
 * execution completed, failed, or was cancelled.
 */
interface ExecutionCompletionEvent {
// Discriminator naming the terminal outcome
type: 'completed' | 'failed' | 'cancelled';
// Unique execution identifier
executionId: string;
// Present on the WebSocket `completed` message
result?: unknown;
// Present on the WebSocket `failed` message
error?: string;
// NOTE(review): presumably epoch milliseconds - confirm units
timestamp: number;
}
HTTP API
POST /start
Start execution tracking:
- executionId (string): Unique execution identifier
- ensembleName (string): Name of the ensemble being executed
- totalSteps (number): Total number of steps in the execution
Example:
await stub.fetch('https://do/start', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
executionId: 'exec_abc123',
ensembleName: 'user-onboarding',
totalSteps: 7
})
});
GET /status
Get current execution status:
Response:
{
executionId: string;
ensembleName: string;
status: ExecutionStatus;
startedAt: number;
completedAt?: number;
currentStep?: string;
stepIndex?: number;
totalSteps?: number;
outputs: Record<string, unknown>;
metrics: ExecutionMetrics;
result?: unknown;
error?: string;
events: ExecutionEvent[];
}
Example:
const response = await stub.fetch('https://do/status');
const state = await response.json();
console.log(`Status: ${state.status}`);
console.log(`Progress: ${state.stepIndex}/${state.totalSteps}`);
console.log(`Current: ${state.currentStep}`);
POST /progress
Update execution progress:
- step (string): Name of the current step
- stepIndex (number): Index of the current step (0-based)
- output (optional): Output from the step
Example:
await stub.fetch('https://do/progress', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
step: 'validate-input',
stepIndex: 0,
output: { valid: true }
})
});
POST /complete
Mark execution as completed:
Example:
await stub.fetch('https://do/complete', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
result: {
userId: 'user_123',
status: 'onboarded',
emailSent: true
}
})
});
POST /fail
Mark execution as failed:
Example:
await stub.fetch('https://do/fail', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
error: 'Payment processing failed: insufficient funds'
})
});
POST /cancel
Cancel execution:
Example:
await stub.fetch('https://do/cancel', {
method: 'POST'
});
WebSocket API
Connect
Upgrade to WebSocket for live updates:
Example:
// Ask the Durable Object to upgrade this request to a WebSocket
const upgradeResponse = await stub.fetch('https://do/', {
  headers: { 'Upgrade': 'websocket' }
});
const ws = upgradeResponse.webSocket;
// webSocket is null when the request was not upgraded; fail loudly
// instead of crashing on the first ws.* call below.
if (!ws) {
  throw new Error('ExecutionState did not return a WebSocket');
}
ws.addEventListener('message', (event) => {
  const update = JSON.parse(event.data);
  if (update.type === 'initial_state') {
    console.log('Initial state:', update.state);
  } else if (update.type === 'progress') {
    console.log(`Step ${update.stepIndex + 1}/${update.totalSteps}: ${update.step}`);
  } else if (update.type === 'completed') {
    console.log('Execution completed:', update.result);
    ws.close();
  } else if (update.type === 'failed') {
    // Terminal event: close the socket here too so it is not left open
    console.error('Execution failed:', update.error);
    ws.close();
  } else if (update.type === 'cancelled') {
    // 'cancelled' is a terminal type on ExecutionCompletionEvent as well
    console.log('Execution cancelled');
    ws.close();
  }
});
// Accept the socket on this side so it starts delivering messages
ws.accept();
Events
initial_state
Sent immediately upon connection:
{
type: 'initial_state';
state: StoredExecutionState;
}
progress
Sent when execution progresses:
{
type: 'progress';
executionId: string;
step: string;
stepIndex: number;
totalSteps: number;
output?: unknown;
timestamp: number;
}
completed
Sent when execution completes:
{
type: 'completed';
executionId: string;
result: unknown;
timestamp: number;
}
failed
Sent when execution fails:
{
type: 'failed';
executionId: string;
error: string;
timestamp: number;
}
Usage Patterns
Tracking Async Execution
/**
 * Worker entry point that starts an asynchronous workflow and returns
 * immediately with the execution ID and a status URL.
 */
export default {
  /**
   * @param request - Incoming HTTP request (not inspected here).
   * @param env - Worker bindings; must expose the EXECUTION_STATE namespace.
   * @param ctx - Execution context; used to keep the workflow running after
   *   the response has been returned.
   * @returns JSON response with `executionId`, `status` and `statusUrl`.
   */
  async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
    const executionId = crypto.randomUUID();
    // Get Durable Object stub
    const id = env.EXECUTION_STATE.idFromName(executionId);
    const stub = env.EXECUTION_STATE.get(id);
    // Start tracking
    await stub.fetch('https://do/start', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        executionId,
        ensembleName: 'async-workflow',
        totalSteps: 10
      })
    });
    // Start async execution (don't await). waitUntil belongs to the
    // ExecutionContext passed as the third handler argument, not to env.
    ctx.waitUntil(
      executeWorkflow(executionId, stub, env)
    );
    // Return immediately with execution ID
    return Response.json({
      executionId,
      status: 'running',
      statusUrl: `/executions/${executionId}`
    });
  }
};
/**
 * Runs the ten-step example workflow, reporting progress to the
 * ExecutionState Durable Object before each step and marking the execution
 * completed or failed at the end.
 *
 * @param executionId - ID of the execution (not read by this body; kept so
 *   the signature matches its callers).
 * @param stub - Durable Object stub that receives progress, completion and
 *   failure updates.
 * @param env - Worker bindings (not read by this body).
 */
async function executeWorkflow(
  executionId: string,
  stub: DurableObjectStub,
  env: Env
) {
  try {
    for (let i = 0; i < 10; i++) {
      // Update progress
      await stub.fetch('https://do/progress', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          step: `step-${i + 1}`,
          stepIndex: i
        })
      });
      // Do work
      await performStep(i);
    }
    // Mark complete
    await stub.fetch('https://do/complete', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        result: { success: true }
      })
    });
  } catch (error: unknown) {
    // Anything can be thrown, so narrow before reading `.message`;
    // non-Error values are stringified so the failure reason is not lost.
    const message = error instanceof Error ? error.message : String(error);
    // Mark failed
    await stub.fetch('https://do/fail', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        error: message
      })
    });
  }
}
Querying Status
/**
 * Looks up the Durable Object for an execution and returns its current state.
 *
 * @param executionId - ID the Durable Object was named with.
 * @param env - Worker bindings; must expose the EXECUTION_STATE namespace.
 * @returns The parsed body of the GET /status response.
 * @throws Error when the status request does not return an OK response.
 */
async function getExecutionStatus(
  executionId: string,
  env: Env
): Promise<StoredExecutionState> {
  const objectId = env.EXECUTION_STATE.idFromName(executionId);
  const statusResponse = await env.EXECUTION_STATE.get(objectId).fetch('https://do/status');
  if (statusResponse.ok) {
    return await statusResponse.json();
  }
  throw new Error('Execution not found');
}
Live Progress Streaming
/**
 * Client-side progress monitoring: subscribes to an execution's event stream
 * and forwards updates to the UI helpers.
 *
 * @param executionId - ID of the execution to follow.
 * @returns The open EventSource, so the caller can close it early.
 */
function monitorExecution(executionId: string): EventSource {
  // Note: This would require a server-sent events endpoint
  // that proxies WebSocket updates to SSE
  const stream = new EventSource(`/executions/${executionId}/stream`);

  stream.addEventListener('progress', (message) => {
    const { stepIndex, totalSteps } = JSON.parse(message.data);
    updateProgressBar(stepIndex, totalSteps);
  });

  // Terminal events: report the outcome, then stop listening.
  stream.addEventListener('completed', (message) => {
    const { result } = JSON.parse(message.data);
    showSuccess(result);
    stream.close();
  });
  stream.addEventListener('failed', (message) => {
    const { error } = JSON.parse(message.data);
    showError(error);
    stream.close();
  });

  return stream;
}
Best Practices
- Use unique execution IDs - Generate with crypto.randomUUID()
- Track all executions - Even synchronous ones for consistency
- Update progress - Give users real-time feedback
- Handle failures - Always call /fail on errors
- Clean up - Delete old execution states after TTL
- Monitor metrics - Track execution times and success rates
- Use WebSockets - For real-time progress in UIs
- Validate status - Check state before operations
- Implement TTL - Auto-expire old execution states
- Log events - Keep execution history for debugging