Overview

The StateManager class manages execution state, enabling pausing, resuming, and recovering workflows. It integrates with Cloudflare Durable Objects for persistent state storage and distributed execution coordination.
import { StateManager } from '@ensemble-edge/conductor';

const stateManager = new StateManager({
  env,
  executionId: 'exec_abc123'
});

await stateManager.save({
  id: 'exec_abc123',
  status: 'running',
  context: { userId: 'user_123' },
  startTime: Date.now()
});

Constructor

new StateManager(options: StateManagerOptions)
options
StateManagerOptions
required
State manager configuration
options.env
Env
required
Cloudflare Workers environment bindings
options.executionId
string
required
Unique execution identifier
options.namespace
string
default:"execution-state"
Durable Object namespace
options.ttl
number
default:"86400000"
State TTL in milliseconds (24 hours)
interface StateManagerOptions {
  env: Env;
  executionId: string;
  namespace?: string;
  ttl?: number;
}
Example:
const stateManager = new StateManager({
  env,
  executionId: 'exec_abc123',
  ttl: 7 * 24 * 60 * 60 * 1000 // 7 days
});

Methods

save()

Save execution state.
async save(state: ExecutionState): Promise<void>
state
ExecutionState
required
State to save
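state.id
string
required
Execution identifier
state.status
string
required
Execution status: 'running', 'waiting', 'paused', 'completed', or 'failed'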
state.input
object
Initial input data
state.context
object
Execution context
state.memberOutputs
Map<string, any>
Map of member name to member output
state.currentMember
string
Currently executing member
state.startTime
number
required
Start timestamp (the examples pass Date.now(), i.e. epoch milliseconds)
state.metadata
object
Additional metadata
interface ExecutionState {
  id: string;
  status: 'running' | 'waiting' | 'paused' | 'completed' | 'failed';
  input?: Record<string, any>;
  context?: Record<string, any>;
  memberOutputs?: Map<string, any>;
  currentMember?: string;
  startTime: number;
  metadata?: Record<string, any>;
}
Example:
await stateManager.save({
  id: 'exec_abc123',
  status: 'running',
  input: { orderId: 'ORD-123' },
  context: { userId: 'user_123' },
  memberOutputs: new Map([
    ['validate-order', { valid: true }],
    ['calculate-total', { amount: 99.99 }]
  ]),
  currentMember: 'process-payment',
  startTime: Date.now()
});

load()

Load execution state.
async load(): Promise<ExecutionState | null>
Returns: Promise<ExecutionState | null> - State or null if not found
Example:
const state = await stateManager.load();

if (state) {
  console.log(`Status: ${state.status}`);
  console.log(`Current member: ${state.currentMember}`);
} else {
  console.log('State not found');
}

update()

Update specific state fields.
async update(updates: Partial<ExecutionState>): Promise<void>
updates
Partial<ExecutionState>
required
Fields to update
Example:
// Update status and current member
await stateManager.update({
  status: 'waiting',
  currentMember: 'approval-step'
});

// Add member output
await stateManager.update({
  memberOutputs: new Map([
    ['process-payment', { paymentId: 'ch_123' }]
  ])
});
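
This page does not say whether update() merges memberOutputs into the stored map or replaces the map outright. When that matters, a caller can merge explicitly before writing. The following is a minimal sketch under that assumption; `SketchState` and `mergeState` are hypothetical names, not part of the library.

```typescript
// Hypothetical, trimmed-down state shape for illustration only.
interface SketchState {
  status: string;
  currentMember?: string;
  memberOutputs?: Map<string, unknown>;
}

// Merge a partial update into the current state. Scalar fields are
// overwritten; memberOutputs entries are combined, with the update winning
// on key collisions. Neither input is mutated.
function mergeState(current: SketchState, updates: Partial<SketchState>): SketchState {
  const mergedOutputs = new Map<string, unknown>(current.memberOutputs);
  updates.memberOutputs?.forEach((output, member) => {
    mergedOutputs.set(member, output);
  });
  return { ...current, ...updates, memberOutputs: mergedOutputs };
}
```

Passing the merged result to save() keeps earlier member outputs regardless of how update() treats the map.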

delete()

Delete execution state.
async delete(): Promise<void>
Example:
// Clean up after completion
await stateManager.delete();

pause()

Pause execution and save state.
async pause(options: PauseOptions): Promise<string>
options
PauseOptions
required
Pause configuration
options.member
string
required
Member that triggered pause
options.reason
string
required
Pause reason
options.data
object
Additional pause data
options.expiresAt
number
Expiration timestamp
Returns: Promise<string> - Resume token
Example:
const token = await stateManager.pause({
  member: 'approval-step',
  reason: 'waiting_for_approval',
  data: {
    approvalUrl: 'https://app.example.com/approve/123',
    requestedBy: 'user_123'
  },
  expiresAt: Date.now() + 24 * 60 * 60 * 1000 // 24 hours
});

console.log(`Resume with: ${token}`);

resume()

Resume paused execution.
async resume(token: string, input: Record<string, any>): Promise<void>
token
string
required
Resume token from pause()
input
object
required
Input data to resume with
Example:
await stateManager.resume('hitl_token_123', {
  approved: true,
  comments: 'Looks good'
});
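
To make the token lifecycle concrete, here is a minimal in-memory sketch of issuing a resume token and validating it on resume. The storage, the single-use rule, and the names `PauseRegistry` and `PendingPause` are assumptions for illustration; only the `hitl_` prefix and the INVALID_TOKEN code come from this page.

```typescript
// Hypothetical in-memory model of pause/resume tokens. The real storage
// and token format used by StateManager are not documented here.
interface PendingPause {
  member: string;
  reason: string;
  expiresAt?: number; // epoch milliseconds; undefined means no expiry
}

class PauseRegistry {
  private pending = new Map<string, PendingPause>();
  private counter = 0;

  // Record a pause and hand back a token. The "hitl_" prefix mirrors the
  // tokens shown in this page's examples.
  pause(entry: PendingPause): string {
    this.counter += 1;
    const suffix = Math.random().toString(36).slice(2, 10);
    const token = `hitl_${this.counter}_${suffix}`;
    this.pending.set(token, entry);
    return token;
  }

  // Consume a token. Unknown or expired tokens throw, mirroring the
  // INVALID_TOKEN error code listed on this page.
  resume(token: string, now: number = Date.now()): PendingPause {
    const entry = this.pending.get(token);
    if (!entry) throw new Error('INVALID_TOKEN: unknown token');
    this.pending.delete(token); // tokens are single-use in this sketch
    if (entry.expiresAt !== undefined && now > entry.expiresAt) {
      throw new Error('INVALID_TOKEN: token expired');
    }
    return entry;
  }
}
```

Taking `now` as a parameter keeps expiry checks deterministic in tests.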

checkpoint()

Create a state checkpoint.
async checkpoint(name: string): Promise<void>
name
string
required
Checkpoint name
Example:
// Save checkpoint before critical operation
await stateManager.checkpoint('before-payment');

try {
  await processPayment();
} catch (error) {
  // Restore from checkpoint
  await stateManager.restore('before-payment');
}

restore()

Restore from checkpoint.
async restore(name: string): Promise<void>
name
string
required
Checkpoint name to restore

listCheckpoints()

List available checkpoints.
async listCheckpoints(): Promise<Checkpoint[]>
Returns: Promise<Checkpoint[]>
interface Checkpoint {
  name: string;
  timestamp: number;
  state: ExecutionState;
}
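
The checkpoint methods can be pictured as named snapshots of the state. The sketch below is an in-memory model under stated assumptions: state is limited to JSON-compatible data, and `CheckpointStore`, `JsonState`, and `StoredCheckpoint` are hypothetical names. It does not describe how the library stores checkpoints.

```typescript
// Hypothetical in-memory checkpoint store. State is limited to
// JSON-compatible data so a JSON round trip can serve as a deep copy.
type JsonState = Record<string, unknown>;

interface StoredCheckpoint {
  name: string;
  timestamp: number;
  state: JsonState;
}

class CheckpointStore {
  private checkpoints = new Map<string, StoredCheckpoint>();

  // Snapshot the state under a name. A later checkpoint with the same
  // name replaces the earlier one.
  checkpoint(name: string, state: JsonState, now: number = Date.now()): void {
    const copy = JSON.parse(JSON.stringify(state)) as JsonState;
    this.checkpoints.set(name, { name, timestamp: now, state: copy });
  }

  // Return a fresh copy of the snapshot, or throw if the name is unknown.
  restore(name: string): JsonState {
    const found = this.checkpoints.get(name);
    if (!found) throw new Error(`CHECKPOINT_NOT_FOUND: ${name}`);
    return JSON.parse(JSON.stringify(found.state)) as JsonState;
  }

  // List checkpoints, oldest first.
  list(): StoredCheckpoint[] {
    return Array.from(this.checkpoints.values()).sort((a, b) => a.timestamp - b.timestamp);
  }
}
```

Copying on both checkpoint and restore means later mutations of the live state cannot alter a stored snapshot.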

lock()

Acquire distributed lock for execution.
async lock(options?: LockOptions): Promise<boolean>
options
LockOptions
Lock configuration
options.timeout
number
default:"30000"
Lock timeout in milliseconds
options.retry
boolean
default:"true"
Retry on lock contention
Returns: Promise<boolean> - True if lock acquired
Example:
const locked = await stateManager.lock({ timeout: 60000 });

if (locked) {
  try {
    // Perform exclusive operation
    await criticalOperation();
  } finally {
    await stateManager.unlock();
  }
}
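
When lock() returns false, one option is to retry with exponential backoff before giving up. The helper below is a sketch, not library code: `withLock`, `backoffDelay`, and the `Lockable` interface are hypothetical, and it assumes that passing `retry: false` makes lock() return promptly instead of retrying internally.

```typescript
// Minimal surface this sketch needs. The lock() and unlock() signatures
// documented on this page fit it.
interface Lockable {
  lock(options?: { timeout?: number; retry?: boolean }): Promise<boolean>;
  unlock(): Promise<void>;
}

// Delay before retry number `attempt` (0-based): base * 2^attempt, capped.
function backoffDelay(attempt: number, baseMs = 100, maxMs = 5000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Try to take the lock up to maxAttempts times, sleeping between tries,
// then run `work` and always release the lock afterwards.
async function withLock<T>(
  target: Lockable,
  work: () => Promise<T>,
  maxAttempts = 5,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms))
): Promise<T> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    // Assumption: retry: false disables any built-in retrying.
    if (await target.lock({ retry: false })) {
      try {
        return await work();
      } finally {
        await target.unlock();
      }
    }
    if (attempt < maxAttempts - 1) await sleep(backoffDelay(attempt));
  }
  throw new Error('LOCK_TIMEOUT: could not acquire lock');
}
```

The `sleep` parameter exists so tests can substitute an instant version and inspect the delays.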

unlock()

Release distributed lock.
async unlock(): Promise<void>

isLocked()

Check if execution is locked.
async isLocked(): Promise<boolean>
Returns: Promise<boolean> - True if locked

Properties

executionId

readonly executionId: string
Execution identifier for this state manager.

namespace

readonly namespace: string
Durable Object namespace being used.

ttl

readonly ttl: number
Time-to-live for state in milliseconds.

Durable Objects Integration

The StateManager uses Cloudflare Durable Objects for persistent storage:

Configuration

# wrangler.toml
[[durable_objects.bindings]]
name = "EXECUTION_STATE"
class_name = "ExecutionStateDO"
script_name = "conductor"

[[durable_objects.bindings]]
name = "HITL_STATE"
class_name = "HITLStateDO"
script_name = "conductor"

Durable Object Implementation

// src/durable-objects/execution-state.ts
export class ExecutionStateDO {
  state: DurableObjectState;

  constructor(state: DurableObjectState) {
    this.state = state;
  }

  async fetch(request: Request): Promise<Response> {
    // Return the stored state, or null when nothing has been saved yet
    if (request.method === 'GET') {
      const state = await this.state.storage.get('execution-state');
      return Response.json(state ?? null);
    }

    if (request.method === 'PUT') {
      const state = await request.json();
      await this.state.storage.put('execution-state', state);
      return Response.json({ success: true });
    }

    if (request.method === 'DELETE') {
      await this.state.storage.deleteAll();
      return Response.json({ success: true });
    }

    return new Response('Method not allowed', { status: 405 });
  }
}
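
For orientation, this is roughly how a caller reaches a Durable Object like the one above: derive an ID from the execution ID with `idFromName`, get a stub, and send it a request. The sketch declares small structural types so it runs without Workers typings. `ExecutionStateClient` and the placeholder URL are hypothetical, and nothing here is confirmed to match StateManager's internals.

```typescript
// Minimal structural types so the sketch runs without Workers typings.
// In a Worker these roles are played by DurableObjectNamespace and its stub.
interface StubLike {
  fetch(
    url: string,
    init?: { method?: string; body?: string }
  ): Promise<{ json(): Promise<unknown> }>;
}

interface NamespaceLike<Id> {
  idFromName(name: string): Id;
  get(id: Id): StubLike;
}

// Hypothetical client. The URL is a placeholder: the request goes to the
// object selected by the stub, not to a public host.
class ExecutionStateClient<Id> {
  private stub: StubLike;

  constructor(namespace: NamespaceLike<Id>, executionId: string) {
    // idFromName maps the same name to the same object every time.
    this.stub = namespace.get(namespace.idFromName(executionId));
  }

  // GET returns the stored state, or null if nothing was saved.
  async load(): Promise<unknown> {
    const response = await this.stub.fetch('https://execution-state/', { method: 'GET' });
    return response.json();
  }

  // PUT stores the JSON-encoded state.
  async save(state: unknown): Promise<void> {
    await this.stub.fetch('https://execution-state/', {
      method: 'PUT',
      body: JSON.stringify(state)
    });
  }
}
```

In a Worker, the namespace would be a binding such as `env.EXECUTION_STATE` from the configuration above.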

State Serialization

The StateManager handles complex object serialization:
// Maps are serialized as objects
const state = {
  id: 'exec_abc123',
  status: 'running' as const,
  startTime: Date.now(),
  memberOutputs: new Map([
    ['member-1', { result: 'success' }],
    ['member-2', { count: 42 }]
  ])
};

await stateManager.save(state);

// Restored with correct types (load() can return null, hence the ?.)
const loaded = await stateManager.load();
console.log(loaded?.memberOutputs instanceof Map); // true
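
Plain `JSON.stringify` turns a Map into `{}`, so a Map round trip needs explicit handling. One common technique is a replacer/reviver pair, sketched below. This uses a tagged array of entries, which may differ from the encoding the library uses internally; `MAP_TAG`, `serialize`, and `deserialize` are hypothetical names.

```typescript
// Tag used to mark serialized Maps. The name is arbitrary for this sketch.
const MAP_TAG = '__map__';

// JSON.stringify replacer: turn each Map into a tagged array of entries.
function replacer(_key: string, value: unknown): unknown {
  if (value instanceof Map) {
    return { [MAP_TAG]: Array.from(value.entries()) };
  }
  return value;
}

// JSON.parse reviver: rebuild a Map from the tagged form.
function reviver(_key: string, value: unknown): unknown {
  if (typeof value === 'object' && value !== null && MAP_TAG in value) {
    const entries = (value as Record<string, unknown>)[MAP_TAG];
    if (Array.isArray(entries)) {
      return new Map(entries as [unknown, unknown][]);
    }
  }
  return value;
}

function serialize(state: unknown): string {
  return JSON.stringify(state, replacer);
}

function deserialize<T>(json: string): T {
  return JSON.parse(json, reviver) as T;
}
```

Because the reviver runs bottom-up, Maps nested inside Map values are rebuilt as well.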

Error Handling

try {
  await stateManager.save(state);
} catch (error) {
  if (error instanceof StateError) {
    console.error('State error:', {
      code: error.code,
      message: error.message,
      executionId: error.executionId
    });
  }
}

State Error Codes

  • STATE_NOT_FOUND - No state exists for execution
  • LOCK_TIMEOUT - Failed to acquire lock
  • CHECKPOINT_NOT_FOUND - Checkpoint doesn’t exist
  • SERIALIZATION_ERROR - Failed to serialize state
  • STORAGE_ERROR - Durable Object storage error
  • INVALID_TOKEN - Resume token invalid/expired
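
The codes above lend themselves to a typed union, which lets callers branch on them safely. The sketch below uses a stand-in class, `SketchStateError`, so that it is self-contained. The choice of which codes count as retryable is an assumption for illustration, not documented library behavior.

```typescript
// Codes copied from the list above.
type StateErrorCode =
  | 'STATE_NOT_FOUND'
  | 'LOCK_TIMEOUT'
  | 'CHECKPOINT_NOT_FOUND'
  | 'SERIALIZATION_ERROR'
  | 'STORAGE_ERROR'
  | 'INVALID_TOKEN';

// Stand-in for the library's StateError so the sketch is self-contained.
class SketchStateError extends Error {
  readonly code: StateErrorCode;
  readonly executionId: string;

  constructor(code: StateErrorCode, message: string, executionId: string) {
    super(message);
    this.name = 'SketchStateError';
    this.code = code;
    this.executionId = executionId;
    // Keeps instanceof working when compiled to older targets.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

// Which codes are worth retrying is a judgment call made for this sketch:
// contention and storage hiccups may clear up, the others will not.
function isRetryable(error: unknown): boolean {
  if (!(error instanceof SketchStateError)) return false;
  return error.code === 'LOCK_TIMEOUT' || error.code === 'STORAGE_ERROR';
}
```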

Advanced Usage

Optimistic Updates

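// Load current state
const state = await stateManager.load();
if (!state) {
  throw new Error('State not found');
}

// Modify state (memberOutputs is optional, so create the map if it is missing)
state.memberOutputs ??= new Map();
state.memberOutputs.set('new-member', { result: 'success' });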

// Save with optimistic locking
try {
  await stateManager.save(state);
} catch (error) {
  if (error.code === 'CONFLICT') {
    // State was modified by another process
    // Reload and retry
    const fresh = await stateManager.load();
    // ... merge changes
  }
}
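
The reload-and-retry step is left open above. A common way to implement optimistic concurrency is a version number checked at write time, sketched here with an in-memory store. `VersionedStore` and `updateWithRetry` are hypothetical names, and how the library detects a conflict is not documented on this page.

```typescript
interface Versioned<T> {
  version: number;
  value: T;
}

// Hypothetical store that rejects writes based on a stale read.
class VersionedStore<T> {
  private current: Versioned<T>;

  constructor(initial: T) {
    this.current = { version: 0, value: initial };
  }

  read(): Versioned<T> {
    return { ...this.current };
  }

  // Write only if nobody else has written since `expectedVersion` was read.
  write(expectedVersion: number, value: T): boolean {
    if (expectedVersion !== this.current.version) return false; // conflict
    this.current = { version: expectedVersion + 1, value };
    return true;
  }
}

// Read, apply `change`, and write back. On conflict, re-read and retry so
// the change is applied to the latest value.
function updateWithRetry<T>(
  store: VersionedStore<T>,
  change: (value: T) => T,
  maxAttempts = 3
): T {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const snapshot = store.read();
    const next = change(snapshot.value);
    if (store.write(snapshot.version, next)) return next;
  }
  throw new Error('CONFLICT: gave up after repeated concurrent writes');
}
```

Expressing the modification as a function of the current value is what makes the retry safe: it is recomputed against fresh data each time.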

Transactional Updates

await stateManager.transaction(async (state) => {
  // Atomic state modifications
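  // memberOutputs and metadata are optional, so initialize them before writing
  state.memberOutputs ??= new Map();
  state.memberOutputs.set('payment', { id: 'ch_123' });
  state.currentMember = 'send-confirmation';
  state.metadata = { ...state.metadata, paymentProcessed: true };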

  // All changes committed or rolled back together
});
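
One way to get all-or-nothing behavior is to mutate a draft copy and swap it in only if the callback finishes without throwing. The sketch below shows that idea in memory. `TxStore` and `TxState` are hypothetical, state is limited to JSON-compatible data, and this is not a description of how transaction() is implemented.

```typescript
// Hypothetical state shape limited to JSON-compatible data, so a JSON
// round trip can act as a deep copy.
interface TxState {
  currentMember?: string;
  metadata: Record<string, unknown>;
}

class TxStore {
  private state: TxState;

  constructor(initial: TxState) {
    this.state = initial;
  }

  get(): TxState {
    return this.state;
  }

  // Run `mutate` against a draft copy. The draft replaces the stored state
  // only if `mutate` returns normally; a throw leaves the store untouched.
  transaction(mutate: (draft: TxState) => void): void {
    const draft = JSON.parse(JSON.stringify(this.state)) as TxState;
    mutate(draft);
    this.state = draft;
  }
}
```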

State Migration

import { migrateState } from '@ensemble-edge/conductor';

const oldState = await stateManager.load();

const newState = await migrateState(oldState, {
  from: '1.0.0',
  to: '2.0.0',
  transformers: [
    // Custom transformation logic
    (state) => ({
      ...state,
      version: '2.0.0',
      newField: 'default-value'
    })
  ]
});

await stateManager.save(newState);
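
When more than two schema versions exist, migrations are often chained one step at a time. The following sketch shows that pattern with hypothetical names (`runMigrations`, `MigrationStep`, `VersionedState`). It illustrates the general idea and makes no claim about how `migrateState` works internally.

```typescript
type VersionedState = { version: string } & Record<string, unknown>;

interface MigrationStep {
  from: string;
  to: string;
  transform: (state: VersionedState) => VersionedState;
}

// Apply steps one after another until the state reaches `target`.
// Throws if no step starts at the current version or if steps form a cycle.
function runMigrations(
  state: VersionedState,
  target: string,
  steps: MigrationStep[]
): VersionedState {
  let current = state;
  const seen = new Set<string>();
  while (current.version !== target) {
    if (seen.has(current.version)) {
      throw new Error(`Migration cycle at version ${current.version}`);
    }
    seen.add(current.version);
    const fromVersion = current.version;
    const step = steps.find((candidate) => candidate.from === fromVersion);
    if (!step) {
      throw new Error(`No migration from ${fromVersion} toward ${target}`);
    }
    // The step's `to` version is stamped on, whatever transform returned.
    current = { ...step.transform(current), version: step.to };
  }
  return current;
}
```

Keeping each step small and single-purpose makes it possible to upgrade state saved under any older version.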

Batch Operations

const manager1 = new StateManager({ env, executionId: 'exec_1' });
const manager2 = new StateManager({ env, executionId: 'exec_2' });
const manager3 = new StateManager({ env, executionId: 'exec_3' });

// Save multiple states in parallel
await Promise.all([
  manager1.save(state1),
  manager2.save(state2),
  manager3.save(state3)
]);
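
`Promise.all` rejects as soon as one save fails, which leaves it unclear which of the other saves went through. If partial failure needs handling, `Promise.allSettled` reports every outcome. A minimal sketch, with `saveAll` and `Saver` as hypothetical names:

```typescript
// Minimal surface needed here. A StateManager's save() fits it.
interface Saver<S> {
  save(state: S): Promise<void>;
}

interface SaveJob<S> {
  id: string;
  manager: Saver<S>;
  state: S;
}

// Run all saves in parallel and return the IDs of the ones that failed.
async function saveAll<S>(jobs: SaveJob<S>[]): Promise<string[]> {
  const results = await Promise.allSettled(
    jobs.map((job) => job.manager.save(job.state))
  );
  const failed: string[] = [];
  results.forEach((result, index) => {
    if (result.status === 'rejected') failed.push(jobs[index].id);
  });
  return failed;
}
```

The returned IDs can then be retried or logged individually.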

Performance

Caching

Enable in-memory caching for frequently accessed state:
const stateManager = new StateManager({
  env,
  executionId: 'exec_abc123',
  cache: {
    enabled: true,
    ttl: 60000 // 1 minute
  }
});

// First load - from Durable Object
const state1 = await stateManager.load();

// Second load - from cache
const state2 = await stateManager.load(); // Fast!
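
The caching behavior can be pictured as a single cached value with an expiry time. The sketch below is a hypothetical `TtlCache` with an injectable clock. It is synchronous for brevity, and it does not reflect the library's actual cache implementation.

```typescript
// Hypothetical single-entry cache with a time-to-live. The clock is
// injectable so expiry can be tested without waiting.
class TtlCache<T> {
  private entry: { value: T; storedAt: number } | undefined;
  private readonly ttlMs: number;
  private readonly now: () => number;

  constructor(ttlMs: number, now: () => number = () => Date.now()) {
    this.ttlMs = ttlMs;
    this.now = now;
  }

  // Return the cached value while it is fresh; otherwise call `loader`,
  // remember its result, and return that.
  getOrLoad(loader: () => T): T {
    const time = this.now();
    if (this.entry !== undefined && time - this.entry.storedAt < this.ttlMs) {
      return this.entry.value;
    }
    const value = loader();
    this.entry = { value, storedAt: time };
    return value;
  }

  // Drop the cached value, for example after a save().
  invalidate(): void {
    this.entry = undefined;
  }
}
```

With an asynchronous loader, `T` can be a Promise, in which case the cache holds the in-flight promise and concurrent callers share one load.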

Compression

Large states are automatically compressed:
// Automatically compressed if > 10KB
await stateManager.save({
  ...state,
  largeData: veryLargeObject
});
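
To check ahead of time whether a state would cross the threshold mentioned above, its serialized size can be measured. This sketch reads 10KB as 10 * 1024 bytes of UTF-8 JSON, which is an assumption, as are the names `serializedSize` and `exceedsCompressionThreshold`. It measures only and does not compress.

```typescript
// Threshold taken from the comment above, read here as 10 * 1024 bytes.
const COMPRESSION_THRESHOLD_BYTES = 10 * 1024;

// Size of the JSON encoding in UTF-8 bytes. Maps and other non-JSON values
// would need a custom serializer first.
function serializedSize(value: unknown): number {
  return new TextEncoder().encode(JSON.stringify(value)).length;
}

function exceedsCompressionThreshold(value: unknown): boolean {
  return serializedSize(value) > COMPRESSION_THRESHOLD_BYTES;
}
```

Measuring bytes rather than string length matters for non-ASCII content, where one character can occupy several bytes.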

Best Practices

  1. Set appropriate TTL - Clean up old state
  2. Use checkpoints - Before critical operations
  3. Handle lock timeouts - Retry with backoff
  4. Minimize state size - Store only essential data
  5. Test serialization - Verify complex objects
  6. Clean up completed state - Delete after execution
  7. Use transactions - For atomic updates
  8. Monitor DO usage - Track storage and requests
  9. Handle migration - Version state schemas
  10. Log state changes - Debug state issues

Testing

import { StateManager } from '@ensemble-edge/conductor';
import { describe, it, expect, beforeEach } from 'vitest';

describe('StateManager', () => {
  let stateManager: StateManager;

  beforeEach(() => {
    stateManager = new StateManager({
      env: getMockEnv(), // getMockEnv: a test helper you provide that returns mock bindings
      executionId: 'test-exec-123'
    });
  });

  it('saves and loads state', async () => {
    const state = {
      id: 'test-exec-123',
      status: 'running' as const,
      startTime: Date.now()
    };

    await stateManager.save(state);
    const loaded = await stateManager.load();

    expect(loaded).toEqual(state);
  });

  it('handles pause and resume', async () => {
    const token = await stateManager.pause({
      member: 'approval',
      reason: 'waiting'
    });

    expect(token).toMatch(/^hitl_/);

    await stateManager.resume(token, { approved: true });

    const state = await stateManager.load();
    expect(state?.status).toBe('running');
  });
});