Skip to main content

Overview

State management in Conductor solves the “prop drilling” problem: instead of passing data through every member, you declare shared state that members can read and write. State is immutable, type-safe, and tracked for optimization.

Quick Example

name: multi-step-workflow
description: Workflow using shared state

state:
  schema:
    companyData: object
    analysis: object
    report: string

flow:
  # Step 1: Fetch data, write to state
  - member: fetch-company-data
    input:
      domain: ${input.domain}
    state:
      set: [companyData]

  # Step 2: Read from state, write analysis
  - member: analyze-company
    state:
      use: [companyData]
      set: [analysis]

  # Step 3: Read both, write report
  - member: generate-report
    state:
      use: [companyData, analysis]
      set: [report]

output:
  report: ${state.report}
See State Concept for a detailed explanation.

Declaring State Schema

Basic Schema

state:
  schema:
    userData: object
    processedCount: number
    timestamp: number

With Zod Validation

// For advanced validation, use Zod inside the member implementation
import { z } from 'zod';

const UserSchema = z.object({
  id: z.number(),
  name: z.string(),
  email: z.string().email()
});

export default async function({ state }) {
  const user = UserSchema.parse(state.userData);
  // ...
}

Reading State

In Ensemble

flow:
  - member: process-data
    state:
      use: [userData, settings]  # Declare what you need

In Member Implementation

export default async function({ input, state }) {
  // Read from state
  const userData = state.userData;
  const settings = state.settings;

  // Use state data
  const result = processUser(userData, settings);

  return { result };
}

TypeScript Types

interface HandlerContext {
  state?: {
    userData?: UserData;
    settings?: Settings;
  };
}

export default async function({ state }: HandlerContext) {
  if (!state?.userData) {
    throw new Error('userData not found in state');
  }

  return { processed: state.userData };
}

Writing State

In Ensemble

flow:
  - member: fetch-user
    state:
      set: [userData, timestamp]  # Declare what you'll set

In Member Implementation

export default async function({ input, setState }) {
  // Fetch user data
  const userData = await fetchUser(input.userId);

  // Write to state
  setState?.({
    userData,
    timestamp: Date.now()
  });

  return { success: true };
}

Conditional Updates

// Writes the cache entries to state only when the caller requests it via
// `input.updateCache`; the returned `updated` flag echoes that input value.
export default async function({ input, setState }) {
  if (input.updateCache) {
    // `setState` is called optionally, so a missing setter is a no-op.
    setState?.({
      cachedData: input.data,
      cacheTimestamp: Date.now()
    });
  }

  return { updated: input.updateCache };
}

State Patterns

Sequential Building

Build up state across multiple members:
flow:
  - member: step1
    state:
      set: [dataA]

  - member: step2
    state:
      use: [dataA]
      set: [dataB]

  - member: step3
    state:
      use: [dataA, dataB]
      set: [dataC]

Parallel Gathering

Collect data in parallel:
flow:
  # Each branch writes its own state key, so the branches do not overlap.
  # NOTE(review): `parallel` is written as a flow step so that `flow` is a
  # single valid YAML sequence; confirm the exact step shape against the
  # flow reference.
  - parallel:
      - member: fetch-user
        state:
          set: [userData]

      - member: fetch-orders
        state:
          set: [ordersData]

      - member: fetch-settings
        state:
          set: [settingsData]

  # Use all parallel results
  - member: combine-data
    state:
      use: [userData, ordersData, settingsData]

Incremental Refinement

Refine data across iterations:
flow:
  - member: initial-analysis
    state:
      set: [analysis]

  - member: add-details
    state:
      use: [analysis]
      set: [analysis]  # Update same key

  - member: finalize
    state:
      use: [analysis]
      set: [finalAnalysis]

Conditional State Updates

flow:
  - member: check-condition
    state:
      set: [shouldCache]

  - member: maybe-cache
    condition: ${state.shouldCache}
    state:
      set: [cachedData]

State vs Member Outputs

When to Use State

# ✅ Good - shared data needed by multiple members
state:
  schema:
    companyData: object  # Used by 3+ members

flow:
  - member: fetch-data
    state:
      set: [companyData]

  - member: analyze
    state:
      use: [companyData]

  - member: report
    state:
      use: [companyData]

When to Use Member Outputs

# ✅ Good - linear data flow
flow:
  - member: step1

  - member: step2
    input:
      data: ${step1.output.result}  # Direct reference

  - member: step3
    input:
      data: ${step2.output.result}

State Initialization

From Input

flow:
  - member: initialize-state
    state:
      set: [userData, config]

# Implementation
export default async function({ input, setState }) {
  setState?.({
    userData: input.user,
    config: input.config
  });
  return { initialized: true };
}

From External Source

flow:
  - member: load-config
    type: Data
    config:
      storage: kv
      operation: get
    state:
      set: [config]

With Defaults

// Resolves the workflow config: uses the value already in state when there
// is one, otherwise falls back to the literal defaults below.
export default async function({ state, setState }) {
  // `state` itself may be absent (it is optional in HandlerContext), so the
  // lookup is guarded instead of throwing before the defaults can apply.
  const config = state?.config || {
    timeout: 30000,
    retries: 3,
    verbose: false
  };

  // Write the resolved config back to state.
  setState?.({ config });

  return { config };
}

State Persistence

Save to KV

flow:
  # Use state throughout workflow
  - member: step1
    state:
      use: [userData]

  # Persist state to KV
  - member: save-state
    type: Data
    config:
      storage: kv
      operation: put
      binding: CACHE
    input:
      key: "workflow:${input.workflowId}:state"
      value: ${state}

Load from KV

flow:
  # Load previous state
  - member: load-state
    type: Data
    config:
      storage: kv
      operation: get
      binding: CACHE
    input:
      # Must match the key used when the state was saved
      key: "workflow:${input.workflowId}:state"
    state:
      set: [userData, progress]

  # Continue from where we left off
  - member: resume
    state:
      use: [userData, progress]

Save to D1

flow:
  - member: save-state
    type: Data
    config:
      storage: d1
      operation: query
      binding: DB
      # Upsert keyed on workflow_id. `excluded.state` reuses the value from
      # the INSERT, so the state is bound once instead of twice.
      # NOTE(review): this snippet does not show how the two `?` placeholders
      # (workflow_id, state) are bound; confirm against the Data member docs.
      query: |
        INSERT INTO workflow_state (workflow_id, state, updated_at)
        VALUES (?, ?, CURRENT_TIMESTAMP)
        ON CONFLICT(workflow_id)
        DO UPDATE SET state = excluded.state, updated_at = CURRENT_TIMESTAMP

State Size Limits

State is stored in memory during execution:
# ✅ Good - reasonable size (< 1MB)
state:
  schema:
    userData: object       # ~10 KB
    analysis: object       # ~50 KB
    config: object         # ~1 KB

# The two schemas are alternatives, not one config: the document separator
# keeps the second `state` key from silently overriding the first.
---
# ⚠️ Warning - large data (> 1MB)
state:
  schema:
    entireDatabase: object  # Store reference instead
    largeFile: string       # Store in R2 instead
Better approach:
state:
  schema:
    userIds: array         # Array of IDs, not full data
    fileKey: string        # R2 key, not file contents

State Debugging

Log State Changes

export default async function({ state, setState }) {
  console.log('State before:', state);

  setState?.({
    newValue: 'updated'
  });

  console.log('State after update');

  return { success: true };
}

Access Report

// Conductor tracks which members access which keys
const report = stateManager.getAccessReport();

console.log('Unused keys:', report.unusedKeys);
console.log('Access patterns:', report.accessPatterns);

State Snapshots

flow:
  - member: snapshot1
    state:
      set: [snapshot1]
    # Sets snapshot1 = entire current state

  - member: process

  - member: snapshot2
    state:
      set: [snapshot2]
    # Sets snapshot2 = entire current state

  - member: compare-snapshots
    state:
      use: [snapshot1, snapshot2]

Advanced Patterns

State Machine

state:
  schema:
    status: string  # 'pending', 'processing', 'completed', 'failed'
    data: object

flow:
  - member: start
    state:
      set: [status]  # Set to 'processing'

  - member: process
    state:
      use: [status, data]
      set: [status, data]  # Update to 'completed'

  - member: cleanup
    condition: ${state.status === 'completed'}

Accumulator Pattern

state:
  schema:
    results: array
    count: number

flow:
  - member: init
    state:
      set: [results, count]  # Initialize results (array) and count (number)

  - member: process-batch
    foreach: ${input.batches}
    state:
      use: [results, count]
      set: [results, count]  # Append to results and update count

output:
  results: ${state.results}
  total: ${state.count}

Validation Chain

state:
  schema:
    validations: object  # Track validation results

flow:
  - member: validate-email
    state:
      set: [validations]

  - member: validate-age
    state:
      use: [validations]
      set: [validations]

  - member: validate-address
    state:
      use: [validations]
      set: [validations]

  - member: check-all-valid
    state:
      use: [validations]

Testing with State

import { describe, it, expect } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';

describe('workflow with state', () => {
  it('should share state between members', async () => {
    const conductor = await TestConductor.create();

    const result = await conductor.executeEnsemble('multi-step', {
      userId: 123
    });

    expect(result).toBeSuccessful();
    expect(result).toHaveState('userData');
    expect(result).toHaveState('analysis');
    expect(result.stateHistory).toHaveLength(3);  // 3 state updates
  });
});

Best Practices

  1. Declare what you use - Use `use` and `set` for clarity and optimization
  2. Keep state minimal - Store references, not large data
  3. Use descriptive names - `userData`, not `data1`
  4. Initialize early - Set defaults in first member
  5. Avoid circular dependencies - Linear state flow is clearer
  6. Persist if needed - Save state to KV/D1 for resumption
  7. Type your state - Use TypeScript interfaces
  8. Log state changes - Debug with `console.log`