Overview
The StateManager class manages execution state so that workflows can be paused, resumed, and recovered. It integrates with Cloudflare Durable Objects for persistent state storage and distributed execution coordination.
import { StateManager } from '@ensemble-edge/conductor';
const stateManager = new StateManager({
env,
executionId: 'exec_abc123'
});
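await stateManager.save({
id: 'exec_abc123', // required by ExecutionState
status: 'running',
context: { userId: 'user_123' },
startTime: Date.now() // required by ExecutionState
});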
Constructor
new StateManager(options: StateManagerOptions)
options
StateManagerOptions
required
State manager configuration
options.env
Env
required
Cloudflare Workers environment bindings
options.executionId
string
required
Unique execution identifier
options.namespace
string
default:"execution-state"
Durable Object namespace
options.ttl
number
default:86400000
State TTL in milliseconds (24 hours)
interface StateManagerOptions {
env: Env;
executionId: string;
namespace?: string;
ttl?: number;
}
Example:
const stateManager = new StateManager({
env,
executionId: 'exec_abc123',
ttl: 7 * 24 * 60 * 60 * 1000 // 7 days
});
Methods
save()
Save execution state.
async save(state: ExecutionState): Promise<void>
state
ExecutionState
required
State to save
state.currentMember
string
Currently executing member
interface ExecutionState {
id: string;
status: 'running' | 'waiting' | 'paused' | 'completed' | 'failed';
input?: Record<string, any>;
context?: Record<string, any>;
memberOutputs?: Map<string, any>;
currentMember?: string;
startTime: number;
metadata?: Record<string, any>;
}
Example:
await stateManager.save({
id: 'exec_abc123',
status: 'running',
input: { orderId: 'ORD-123' },
context: { userId: 'user_123' },
memberOutputs: new Map([
['validate-order', { valid: true }],
['calculate-total', { amount: 99.99 }]
]),
currentMember: 'process-payment',
startTime: Date.now()
});
load()
Load execution state.
async load(): Promise<ExecutionState | null>
Returns: Promise<ExecutionState | null> - State or null if not found
Example:
const state = await stateManager.load();
if (state) {
console.log(`Status: ${state.status}`);
console.log(`Current member: ${state.currentMember}`);
} else {
console.log('State not found');
}
update()
Update specific state fields.
async update(updates: Partial<ExecutionState>): Promise<void>
updates
Partial<ExecutionState>
required
Fields to update
Example:
// Update status and current member
await stateManager.update({
status: 'waiting',
currentMember: 'approval-step'
});
// Add member output
await stateManager.update({
memberOutputs: new Map([
['process-payment', { paymentId: 'ch_123' }]
])
});
delete()
Delete execution state.
async delete(): Promise<void>
Example:
// Clean up after completion
await stateManager.delete();
pause()
Pause execution and save state.
async pause(options: PauseOptions): Promise<string>
options
PauseOptions
required
Pause configuration
options.member
string
Member that triggered pause
Returns: Promise<string> - Resume token
Example:
const token = await stateManager.pause({
member: 'approval-step',
reason: 'waiting_for_approval',
data: {
approvalUrl: 'https://app.example.com/approve/123',
requestedBy: 'user_123'
},
expiresAt: Date.now() + 24 * 60 * 60 * 1000 // 24 hours
});
console.log(`Resume with: ${token}`);
resume()
Resume paused execution.
async resume(token: string, input: Record<string, any>): Promise<void>
token
string
required
Resume token from pause()
input
Record<string, any>
required
Input data to resume with
Example:
await stateManager.resume('hitl_token_123', {
approved: true,
comments: 'Looks good'
});
checkpoint()
Create a state checkpoint.
async checkpoint(name: string): Promise<void>
Example:
// Save checkpoint before critical operation
await stateManager.checkpoint('before-payment');
try {
await processPayment();
} catch (error) {
// Restore from checkpoint
await stateManager.restore('before-payment');
}
restore()
Restore from checkpoint.
async restore(name: string): Promise<void>
name
string
required
Checkpoint name to restore
listCheckpoints()
List available checkpoints.
async listCheckpoints(): Promise<Checkpoint[]>
Returns: Promise<Checkpoint[]>
interface Checkpoint {
name: string;
timestamp: number;
state: ExecutionState;
}
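Because each Checkpoint carries a timestamp, a caller can pick the newest one before calling restore(). The sketch below is one way to do that. It assumes timestamp is epoch milliseconds (like startTime), which the source does not state. latestCheckpoint and CheckpointInfo are hypothetical names and are not part of the library.

```typescript
// Trimmed copy of the Checkpoint shape documented above (state omitted).
interface CheckpointInfo {
  name: string;
  timestamp: number; // assumption: epoch milliseconds
}

// Hypothetical helper: return the newest checkpoint, or undefined if the list is empty.
function latestCheckpoint<T extends CheckpointInfo>(
  checkpoints: readonly T[]
): T | undefined {
  let newest: T | undefined = undefined;
  for (const checkpoint of checkpoints) {
    if (newest === undefined || checkpoint.timestamp > newest.timestamp) {
      newest = checkpoint;
    }
  }
  return newest;
}
```

The result of listCheckpoints() can be passed straight in. If a checkpoint comes back, its name is the argument for restore().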
lock()
Acquire distributed lock for execution.
async lock(options?: LockOptions): Promise<boolean>
options
LockOptions
Lock configuration
options.timeout
number
Lock timeout in milliseconds
Returns: Promise<boolean> - True if lock acquired
Example:
const locked = await stateManager.lock({ timeout: 60000 });
if (locked) {
try {
// Perform exclusive operation
await criticalOperation();
} finally {
await stateManager.unlock();
}
}
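When lock() does not succeed on the first attempt, retrying with exponential backoff avoids hammering the Durable Object. The sketch below assumes lock() resolves to false while another holder has the lock, whereas the library may instead throw LOCK_TIMEOUT. backoffDelays and lockWithBackoff are hypothetical helpers, and tryLock stands in for a call such as () => stateManager.lock({ timeout: 60000 }).

```typescript
// Hypothetical helper: delay in ms before each retry, doubling up to a cap.
function backoffDelays(retries: number, baseMs = 100, maxMs = 5000): number[] {
  return Array.from({ length: retries }, (_, i) =>
    Math.min(baseMs * Math.pow(2, i), maxMs)
  );
}

// Hypothetical helper: try once, then retry after each backoff delay.
// Resolves to true as soon as tryLock() does, or false when retries run out.
async function lockWithBackoff(
  tryLock: () => Promise<boolean>,
  retries = 5,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms))
): Promise<boolean> {
  if (await tryLock()) return true;
  for (const delay of backoffDelays(retries)) {
    await sleep(delay);
    if (await tryLock()) return true;
  }
  return false;
}
```

The sleep parameter is injectable so tests can skip the real delays.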
unlock()
Release distributed lock.
async unlock(): Promise<void>
isLocked()
Check if execution is locked.
async isLocked(): Promise<boolean>
Returns: Promise<boolean> - True if locked
Properties
executionId
readonly executionId: string
Execution identifier for this state manager.
namespace
readonly namespace: string
Durable Object namespace being used.
ttl
readonly ttl: number
Time-to-live for state in milliseconds.
Durable Objects Integration
The StateManager uses Cloudflare Durable Objects for persistent storage:
Configuration
# wrangler.toml
[[durable_objects.bindings]]
name = "EXECUTION_STATE"
class_name = "ExecutionStateDO"
script_name = "conductor"
[[durable_objects.bindings]]
name = "HITL_STATE"
class_name = "HITLStateDO"
script_name = "conductor"
Durable Object Implementation
// src/durable-objects/execution-state.ts
export class ExecutionStateDO {
state: DurableObjectState;
constructor(state: DurableObjectState) {
this.state = state;
}
async fetch(request: Request): Promise<Response> {
if (request.method === 'GET') {
const state = await this.state.storage.get('execution-state');
// storage.get() resolves to undefined for a missing key; return null instead
return Response.json(state ?? null);
}
if (request.method === 'PUT') {
const state = await request.json();
await this.state.storage.put('execution-state', state);
return Response.json({ success: true });
}
if (request.method === 'DELETE') {
await this.state.storage.deleteAll();
return Response.json({ success: true });
}
return new Response('Method not allowed', { status: 405 });
}
}
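A Worker reaches a Durable Object like the one above through its namespace binding: idFromName() derives a stable ID, get() returns a stub, and the stub's fetch() delivers the request. The sketch below reads stored state that way. The structural types are local stand-ins so the example compiles without @cloudflare/workers-types. fetchExecutionState and the placeholder URL are hypothetical, and the source does not say whether StateManager addresses objects by execution ID in this manner.

```typescript
// Minimal structural stand-ins for the Durable Object stub and namespace types.
interface StubLike {
  fetch(
    input: string,
    init?: { method?: string; body?: string }
  ): Promise<{ json(): Promise<unknown> }>;
}

interface NamespaceLike<Id> {
  idFromName(name: string): Id;
  get(id: Id): StubLike;
}

// Hypothetical helper: read the state stored for one execution.
async function fetchExecutionState<Id>(
  namespace: NamespaceLike<Id>,
  executionId: string
): Promise<unknown> {
  // Assumption: one Durable Object instance per execution, keyed by its ID.
  const stub = namespace.get(namespace.idFromName(executionId));
  // The URL only has to be well-formed; the object above routes on the HTTP method.
  const response = await stub.fetch('https://do.internal/state', { method: 'GET' });
  return response.json();
}
```

In a real Worker, the namespace argument would be the binding declared in wrangler.toml, for example env.EXECUTION_STATE.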
State Serialization
The StateManager handles complex object serialization:
// Maps are serialized as objects
const state = {
id: 'exec_abc123',
status: 'running' as const,
startTime: Date.now(),
memberOutputs: new Map([
['member-1', { result: 'success' }],
['member-2', { count: 42 }]
])
};
await stateManager.save(state);
// Restored with correct types (load() may return null, hence the optional chaining)
const loaded = await stateManager.load();
console.log(loaded?.memberOutputs instanceof Map); // true
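The source does not show how a Map survives the round trip through storage. One common technique, sketched here as an illustration and not as the library's actual implementation, is to tag each Map inside a JSON.stringify replacer and rebuild it in a JSON.parse reviver. serializeState, deserializeState, and the __type tag are hypothetical names.

```typescript
// Hypothetical helpers; not part of @ensemble-edge/conductor.
type TaggedMap = { __type: 'Map'; entries: [unknown, unknown][] };

function isTaggedMap(value: unknown): value is TaggedMap {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as { __type?: unknown; entries?: unknown };
  return candidate.__type === 'Map' && Array.isArray(candidate.entries);
}

// Replace every Map (at any depth) with a plain, JSON-safe tagged object.
function serializeState(state: unknown): string {
  return JSON.stringify(state, (_key, value) =>
    value instanceof Map
      ? { __type: 'Map', entries: Array.from(value.entries()) }
      : value
  );
}

// Rebuild Maps from the tagged objects produced by serializeState().
function deserializeState<T>(json: string): T {
  return JSON.parse(json, (_key, value) =>
    isTaggedMap(value) ? new Map(value.entries) : value
  ) as T;
}
```

A plain JSON.stringify call would turn a Map into {} and lose its entries, which is why the explicit tag is needed in this approach.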
Error Handling
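// Assumption: StateError is exported by the same package as StateManager
import { StateError } from '@ensemble-edge/conductor';
try {
await stateManager.save(state);
} catch (error) {
if (error instanceof StateError) {
console.error('State error:', {
code: error.code,
message: error.message,
executionId: error.executionId
});
}
}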
State Error Codes
STATE_NOT_FOUND - No state exists for execution
LOCK_TIMEOUT - Failed to acquire lock
CHECKPOINT_NOT_FOUND - Checkpoint doesn’t exist
SERIALIZATION_ERROR - Failed to serialize state
STORAGE_ERROR - Durable Object storage error
INVALID_TOKEN - Resume token invalid/expired
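Callers usually need to decide which of these codes justify a retry. The sketch below turns the list into a TypeScript union and adds a retry classifier. The names STATE_ERROR_CODES, isStateErrorCode, and isRetryable are hypothetical, and treating only LOCK_TIMEOUT and STORAGE_ERROR as retryable is an assumption, not documented library behavior.

```typescript
// Codes copied from the list above.
const STATE_ERROR_CODES = [
  'STATE_NOT_FOUND',
  'LOCK_TIMEOUT',
  'CHECKPOINT_NOT_FOUND',
  'SERIALIZATION_ERROR',
  'STORAGE_ERROR',
  'INVALID_TOKEN',
] as const;

type StateErrorCode = (typeof STATE_ERROR_CODES)[number];

// Narrow an arbitrary value (for example error.code) to a known code.
function isStateErrorCode(code: unknown): code is StateErrorCode {
  return (
    typeof code === 'string' &&
    (STATE_ERROR_CODES as readonly string[]).indexOf(code) !== -1
  );
}

// Assumption: contention and storage failures may be transient;
// the remaining codes describe conditions a retry will not fix.
function isRetryable(code: StateErrorCode): boolean {
  return code === 'LOCK_TIMEOUT' || code === 'STORAGE_ERROR';
}
```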
Advanced Usage
Optimistic Updates
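// Load current state
const state = await stateManager.load();
if (!state) {
throw new Error('No state found for this execution');
}
// Modify state (memberOutputs is optional, so create it if missing)
state.memberOutputs ??= new Map();
state.memberOutputs.set('new-member', { result: 'success' });
// Save with optimistic locking
try {
await stateManager.save(state);
} catch (error) {
if ((error as { code?: string }).code === 'CONFLICT') {
// State was modified by another process
// Reload and retry
const fresh = await stateManager.load();
// ... merge changes
} else {
// Not a conflict: surface the error instead of swallowing it
throw error;
}
}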
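The "merge changes" step above is left open in the source. One possible policy, sketched under the assumption that local writes should win over the freshly loaded values for the same member, is a key-wise merge of the two memberOutputs maps. mergeMemberOutputs is a hypothetical helper.

```typescript
// Hypothetical helper: combine two memberOutputs maps after a conflict.
// Entries from `local` overwrite entries in `fresh` that share a key.
function mergeMemberOutputs<V>(
  fresh: ReadonlyMap<string, V> | undefined,
  local: ReadonlyMap<string, V> | undefined
): Map<string, V> {
  const merged = new Map<string, V>(fresh ?? []);
  if (local !== undefined) {
    local.forEach((value, key) => merged.set(key, value));
  }
  return merged;
}
```

Other policies are equally valid: some workflows should prefer the stored value, or fail when both sides wrote the same member.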
Transactional Updates
await stateManager.transaction(async (state) => {
// Atomic state modifications
state.memberOutputs ??= new Map();
state.memberOutputs.set('payment', { id: 'ch_123' });
state.currentMember = 'send-confirmation';
// metadata is optional, so spread any existing values into a new object
state.metadata = { ...state.metadata, paymentProcessed: true };
// All changes committed or rolled back together
});
State Migration
import { migrateState } from '@ensemble-edge/conductor';
const oldState = await stateManager.load();
const newState = await migrateState(oldState, {
from: '1.0.0',
to: '2.0.0',
transformers: [
// Custom transformation logic
(state) => ({
...state,
version: '2.0.0',
newField: 'default-value'
})
]
});
await stateManager.save(newState);
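The source does not describe how migrateState applies its transformers. A plausible reading is that they run in order, each receiving the previous one's output. The sketch below illustrates that chaining with a hypothetical applyTransformers function and is not the library's implementation.

```typescript
type StateRecord = Record<string, unknown>;
type Transformer = (state: StateRecord) => StateRecord;

// Hypothetical stand-in for the transformer step: run each transformer in
// order, feeding the output of one into the next.
function applyTransformers(
  state: StateRecord,
  transformers: readonly Transformer[]
): StateRecord {
  return transformers.reduce<StateRecord>(
    (current, transform) => transform(current),
    state
  );
}
```

Transformers that return a new object, as the one in the example above does, leave the original state untouched, which makes a failed migration easy to abandon.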
Batch Operations
const manager1 = new StateManager({ env, executionId: 'exec_1' });
const manager2 = new StateManager({ env, executionId: 'exec_2' });
const manager3 = new StateManager({ env, executionId: 'exec_3' });
// Save multiple states in parallel
await Promise.all([
manager1.save(state1),
manager2.save(state2),
manager3.save(state3)
]);
Caching
Enable in-memory caching for frequently accessed state:
const stateManager = new StateManager({
env,
executionId: 'exec_abc123',
cache: {
enabled: true,
ttl: 60000 // 1 minute
}
});
// First load - from Durable Object
const state1 = await stateManager.load();
// Second load - from cache
const state2 = await stateManager.load(); // Fast!
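How the built-in cache works internally is not documented here. As an illustration of the general idea only, the sketch below shows a single-value cache with a time-to-live. TtlCache is a hypothetical class, and the injectable clock exists only to make expiry testable.

```typescript
// Hypothetical single-value cache with a time-to-live in milliseconds.
class TtlCache<T> {
  private readonly ttlMs: number;
  private readonly now: () => number;
  private entry: { value: T; expiresAt: number } | undefined = undefined;

  constructor(ttlMs: number, now: () => number = () => Date.now()) {
    this.ttlMs = ttlMs;
    this.now = now;
  }

  // Store a value and stamp it with its expiry time.
  set(value: T): void {
    this.entry = { value, expiresAt: this.now() + this.ttlMs };
  }

  // Return the cached value, or undefined if nothing is cached or it expired.
  get(): T | undefined {
    if (this.entry === undefined || this.now() >= this.entry.expiresAt) {
      this.entry = undefined;
      return undefined;
    }
    return this.entry.value;
  }
}
```

A cache like this only helps within a single isolate. Writes made through another StateManager instance are not visible until the entry expires.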
Compression
Large states are automatically compressed:
// Automatically compressed if > 10KB
await stateManager.save({
...state,
largeData: veryLargeObject
});
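To see whether a state object is likely to cross the compression threshold, its serialized size can be measured up front. The sketch below assumes the threshold applies to the UTF-8 byte length of the JSON form and that "10KB" means 10 × 1024 bytes. Neither detail is confirmed by the source. serializedSizeBytes and exceedsThreshold are hypothetical helpers.

```typescript
// Assumption: "10KB" in the text means 10 * 1024 bytes.
const COMPRESSION_THRESHOLD_BYTES = 10 * 1024;

// Hypothetical helper: size in bytes of a value once JSON-encoded as UTF-8.
// Note: plain JSON.stringify encodes a Map as {}, so convert Maps first
// if their contents should count toward the size.
function serializedSizeBytes(value: unknown): number {
  return new TextEncoder().encode(JSON.stringify(value)).length;
}

// Hypothetical helper: true when the encoded value is larger than the threshold.
function exceedsThreshold(value: unknown): boolean {
  return serializedSizeBytes(value) > COMPRESSION_THRESHOLD_BYTES;
}
```

The same measurement is useful for the "minimize state size" practice, for example as a warning logged before save().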
Best Practices
- Set appropriate TTL - Clean up old state
- Use checkpoints - Before critical operations
- Handle lock timeouts - Retry with backoff
- Minimize state size - Store only essential data
- Test serialization - Verify complex objects
- Clean up completed state - Delete after execution
- Use transactions - For atomic updates
- Monitor DO usage - Track storage and requests
- Handle migration - Version state schemas
- Log state changes - Debug state issues
Testing
import { StateManager } from '@ensemble-edge/conductor';
import { describe, it, expect, beforeEach } from 'vitest';
describe('StateManager', () => {
let stateManager: StateManager;
beforeEach(() => {
stateManager = new StateManager({
env: getMockEnv(), // getMockEnv is a test helper you supply; it is not shown here
executionId: 'test-exec-123'
});
});
it('saves and loads state', async () => {
const state = {
id: 'test-exec-123',
status: 'running' as const, // keep the literal type so it matches ExecutionState
startTime: Date.now()
};
await stateManager.save(state);
const loaded = await stateManager.load();
expect(loaded).toEqual(state);
});
it('handles pause and resume', async () => {
const token = await stateManager.pause({
member: 'approval',
reason: 'waiting'
});
expect(token).toMatch(/^hitl_/);
await stateManager.resume(token, { approved: true });
const state = await stateManager.load();
expect(state?.status).toBe('running'); // load() may return null
});
});