The Problem

In traditional workflows, data must be passed explicitly from one step to the next:
# ❌ Prop drilling hell
flow:
  - member: fetch-data
  - member: process-data
    input:
      data: ${fetch-data.output.data}
  - member: analyze-data
    input:
      data: ${fetch-data.output.data}
      processed: ${process-data.output.result}
  - member: generate-report
    input:
      data: ${fetch-data.output.data}
      processed: ${process-data.output.result}
      analysis: ${analyze-data.output.analysis}
Every step has to re-declare each upstream output it needs, so the wiring grows with every member and quickly becomes hard to maintain.

The Solution

Shared state lets members read and write to a common state object:
# ✅ Clean with shared state
state:
  schema:
    rawData: object
    processed: object
    analysis: object

flow:
  - member: fetch-data
    state:
      set: [rawData]

  - member: process-data
    state:
      use: [rawData]
      set: [processed]

  - member: analyze-data
    state:
      use: [rawData, processed]
      set: [analysis]

  - member: generate-report
    state:
      use: [rawData, processed, analysis]

How It Works

1. Define State Schema

state:
  schema:
    companyData:
      type: object
      properties:
        name: string
        domain: string
        employees: number
    analysis:
      type: object
    financials:
      type: array

2. Declare State Access

Each member declares what it needs:
flow:
  - member: fetch-company
    state:
      set: [companyData]  # Can write to companyData

  - member: analyze-company
    state:
      use: [companyData]  # Can read companyData
      set: [analysis]     # Can write to analysis

3. Access State in Members

export default async function analyzeCompany({ input, state, setState }) {
  // Read from state
  const companyData = state.companyData;

  // Process data
  const analysis = await performAnalysis(companyData);

  // Write to state
  setState({ analysis });

  return { analysis };
}
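To try a member like this outside Conductor, a small harness can stand in for the runtime. The sketch below assumes only the calling convention visible in the signature above ({ input, state, setState }). `runMember` and the stubbed `performAnalysis` are hypothetical names for illustration, not part of Conductor's API, and the module export is omitted so the snippet runs on its own.

```javascript
// Hypothetical stand-in for the real analysis logic.
async function performAnalysis(companyData) {
  return { summary: `${companyData.name} has ${companyData.employees} employees` };
}

// Same shape as the member above, minus the module export.
async function analyzeCompany({ input, state, setState }) {
  const analysis = await performAnalysis(state.companyData);
  setState({ analysis });
  return { analysis };
}

// Hypothetical harness: hands the member a frozen snapshot, collects its
// updates, and returns the member output together with the next state.
async function runMember(member, { input = {}, state = {} } = {}) {
  const pending = {};
  const setState = (updates) => Object.assign(pending, updates);
  const snapshot = Object.freeze({ ...state });
  const output = await member({ input, state: snapshot, setState });
  return { output, nextState: { ...state, ...pending } };
}
```

Calling `runMember(analyzeCompany, { state: { companyData } })` returns both the member's output and the state as it would look after the update, which is convenient in unit tests.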

State Features

Immutability

StateManager returns new instances on every mutation:
// Internally, Conductor creates new instances
const manager1 = new StateManager(config, initialState);
const manager2 = manager1.applyPendingUpdates({ key: 'value' });
const manager3 = manager2.clearAccessLog();

// manager1, manager2, manager3 are all different instances
// No shared mutable state
Why immutability?
  • Predictable state transitions
  • No accidental mutations
  • Easy to debug
  • Time-travel debugging possible
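The pattern itself is small enough to sketch. The class below is a hypothetical illustration of "every mutation returns a new instance". It is not Conductor's StateManager, and the method name `applyUpdates` is an assumption.

```javascript
// Hypothetical illustration of the pattern; not Conductor's StateManager.
class ImmutableState {
  constructor(values = {}) {
    // Shallow freeze: top-level keys cannot be reassigned or added.
    this.values = Object.freeze({ ...values });
    Object.freeze(this);
  }

  // Returns a new instance; the receiver is left untouched.
  applyUpdates(updates) {
    return new ImmutableState({ ...this.values, ...updates });
  }
}

const s1 = new ImmutableState({ counter: 0 });
const s2 = s1.applyUpdates({ counter: 1 });

// Earlier snapshots stay valid, so a history of states can be kept and replayed.
const history = [s1, s2];
```

Because `s1` is never modified, keeping a list of past instances is enough to step backwards through a run.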

Selective Access

Members only see declared keys:
state:
  schema:
    sensitive: object
    public: object

flow:
  - member: restricted-member
    state:
      use: [public]  # Cannot see 'sensitive'
Benefits:
  • Principle of least privilege
  • Clear dependencies
  • Prevents accidental access
  • Optimization opportunities
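One way to picture selective access is as two filters around each member: one that narrows what it can read, and one that checks what it tries to write. The helpers below are a hypothetical sketch of that idea. `scopeState` and `filterWrites` are illustrative names, and how Conductor enforces the declarations internally is not shown here.

```javascript
// Hypothetical helper: builds the read-only view for the keys declared under `use`.
function scopeState(fullState, useKeys = []) {
  const view = {};
  for (const key of useKeys) {
    if (Object.prototype.hasOwnProperty.call(fullState, key)) {
      view[key] = fullState[key];
    }
  }
  return Object.freeze(view);
}

// Hypothetical helper: keeps writes to keys declared under `set`
// and reports everything else as rejected.
function filterWrites(updates, setKeys = []) {
  const allowed = {};
  const rejected = [];
  for (const [key, value] of Object.entries(updates)) {
    if (setKeys.includes(key)) allowed[key] = value;
    else rejected.push(key);
  }
  return { allowed, rejected };
}
```

With `use: [public]`, `scopeState(state, ['public'])` yields an object that has no `sensitive` property at all, so the member cannot read it even by accident.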

Access Tracking

Conductor tracks all state access:
const report = manager.getAccessReport();

console.log(report);
// {
//   unusedKeys: ['financials'],  // Declared but never accessed
//   accessPatterns: {
//     companyData: [
//       { member: 'fetch-company', operation: 'write', timestamp: '...' },
//       { member: 'analyze-company', operation: 'read', timestamp: '...' }
//     ]
//   }
// }
Uses:
  • Identify unused state keys
  • Optimize state schema
  • Debug data flow
  • Performance profiling
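A report of this shape can be derived from a flat log of reads and writes. The function below is a hypothetical sketch: the log entry fields (`key`, `member`, `operation`, `timestamp`) mirror the report above, but the log format itself is an assumption.

```javascript
// Hypothetical: derive a report shaped like the one above from a flat access log.
function buildAccessReport(schemaKeys, accessLog) {
  const accessPatterns = {};
  for (const { key, member, operation, timestamp } of accessLog) {
    if (!accessPatterns[key]) accessPatterns[key] = [];
    accessPatterns[key].push({ member, operation, timestamp });
  }
  // A key is "unused" when it is declared in the schema but never appears in the log.
  const unusedKeys = schemaKeys.filter((key) => !(key in accessPatterns));
  return { unusedKeys, accessPatterns };
}
```

Keys that show up in `unusedKeys` across many runs are candidates for removal from the schema.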

State Patterns

Sequential Data Building

Build up state progressively:
state:
  schema:
    company: object
    financials: object
    news: array
    competitors: array

flow:
  - member: fetch-company
    state:
      set: [company]

  - member: fetch-financials
    state:
      use: [company]
      set: [financials]

  - member: fetch-news
    state:
      use: [company]
      set: [news]

  - member: fetch-competitors
    state:
      use: [company]
      set: [competitors]

  - member: generate-report
    state:
      use: [company, financials, news, competitors]

Parallel Data Gathering

Gather data in parallel, combine later:
state:
  schema:
    financials: object
    news: array
    social: object

flow:
  # Parallel fetch - each writes to different state key
  - parallel:
      - member: fetch-financials
        state:
          set: [financials]

      - member: fetch-news
        state:
          set: [news]

      - member: fetch-social
        state:
          set: [social]

  # Combine all data
  - member: analyze-all
    state:
      use: [financials, news, social]
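The reason each parallel member writes to a different key is that concurrent writes to the same key have no obvious winner. The sketch below illustrates the idea with `Promise.all`. `gatherInParallel` is a hypothetical helper, and rejecting overlapping keys is a design choice of this sketch rather than documented Conductor behavior.

```javascript
// Hypothetical: run tasks concurrently; each task resolves to the state keys it sets.
async function gatherInParallel(tasks) {
  const results = await Promise.all(tasks.map((task) => task()));
  const merged = {};
  for (const updates of results) {
    for (const key of Object.keys(updates)) {
      if (key in merged) {
        throw new Error(`State key "${key}" was written by more than one parallel member`);
      }
      merged[key] = updates[key];
    }
  }
  return merged;
}
```

The merged object is what a later step such as analyze-all would read from.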

Incremental Refinement

Refine data through multiple passes:
state:
  schema:
    rawData: object
    cleaned: object
    enriched: object
    analyzed: object

flow:
  - member: fetch-raw-data
    state:
      set: [rawData]

  - member: clean-data
    state:
      use: [rawData]
      set: [cleaned]

  - member: enrich-data
    state:
      use: [cleaned]
      set: [enriched]

  - member: analyze-data
    state:
      use: [enriched]
      set: [analyzed]

Conditional State Updates

Update state based on conditions:
state:
  schema:
    analysis: object
    humanReview: object

flow:
  - member: analyze-content
    state:
      set: [analysis]

  - branch:
      if: ${analysis.score} < 0.8
      then:
        - member: request-human-review
          state:
            use: [analysis]
            set: [humanReview]

State vs Member Outputs

When to use state vs accessing member outputs directly:

Use State When:

  1. Multiple members need the same data
state:
  set: [companyData]

# Many members can access without prop drilling
- member: analyze
  state:
    use: [companyData]
- member: report
  state:
    use: [companyData]
- member: store
  state:
    use: [companyData]
  2. Data is shared across the workflow
# Core data used throughout
state:
  schema:
    user: object
    session: object
  3. You want access tracking
# Track which members use what data
state:
  schema:
    sensitiveData: object

Use Member Outputs When:

  1. Data flows linearly
# Simple pipeline - no need for state
- member: step-1
- member: step-2
  input:
    result: ${step-1.output}
  2. Data is only used once
# Only next member needs it
- member: fetch
- member: process
  input:
    data: ${fetch.output}
  3. Workflow is simple
# Just 2-3 steps, no complex dependencies
flow:
  - member: fetch
  - member: process
  - member: save

State Initialization

Default Values

state:
  initial:
    counter: 0
    results: []
    metadata:
      startTime: null

Dynamic Initialization

state:
  initial:
    userId: ${input.userId}
    sessionId: ${input.sessionId}
    startedAt: ${input.timestamp}
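Conceptually, dynamic initialization substitutes values from the ensemble input into the initial state before the first member runs. The resolver below is a hypothetical sketch that handles only whole-value `${input.path}` placeholders. Conductor's actual expression syntax may support more than this.

```javascript
// Hypothetical resolver for "${input.path}" placeholders in an initial-state object.
function resolveInitialState(initial, input) {
  const resolve = (value) => {
    if (typeof value === 'string') {
      const match = value.match(/^\$\{input\.([\w.]+)\}$/);
      if (!match) return value; // plain string, keep as-is
      // Walk the dotted path; missing segments resolve to undefined.
      return match[1]
        .split('.')
        .reduce((obj, key) => (obj == null ? undefined : obj[key]), input);
    }
    if (Array.isArray(value)) return value.map(resolve);
    if (value && typeof value === 'object') {
      return Object.fromEntries(Object.entries(value).map(([k, v]) => [k, resolve(v)]));
    }
    return value; // numbers, booleans, null
  };
  return resolve(initial);
}
```

Static defaults such as `counter: 0` pass through unchanged, so the same function covers both initialization styles.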

State Schema Validation

Define strict schemas with Zod validation:
state:
  schema:
    company:
      type: object
      properties:
        name:
          type: string
          minLength: 1
        domain:
          type: string
          pattern: "^[a-z0-9-]+\\.[a-z]{2,}$"
        employees:
          type: number
          minimum: 1
      required: [name, domain]

    analysis:
      type: object
      properties:
        score:
          type: number
          minimum: 0
          maximum: 1
        summary:
          type: string
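To make the constraints concrete, the sketch below checks a value against a small subset of the keywords used above (`type`, `minimum`, `maximum`, `minLength`, `pattern`, `enum`, `required`, `properties`). It is a hand-rolled, hypothetical validator for illustration only. It does not use Zod and is not how Conductor validates state.

```javascript
// Hypothetical validator for a subset of the schema keywords shown above.
// Returns a list of error messages; an empty list means the value is valid.
function validate(value, schema, path = 'state') {
  const type = typeof schema === 'string' ? schema : schema.type;
  const actual = Array.isArray(value) ? 'array' : value === null ? 'null' : typeof value;
  if (type && actual !== type) return [`${path}: expected ${type}, got ${actual}`];
  if (typeof schema === 'string') return []; // shorthand such as `name: string`

  const errors = [];
  if (schema.enum && !schema.enum.includes(value)) errors.push(`${path}: not an allowed value`);
  if (type === 'number') {
    if (schema.minimum !== undefined && value < schema.minimum) errors.push(`${path}: below minimum`);
    if (schema.maximum !== undefined && value > schema.maximum) errors.push(`${path}: above maximum`);
  }
  if (type === 'string') {
    if (schema.minLength !== undefined && value.length < schema.minLength) errors.push(`${path}: too short`);
    if (schema.pattern && !new RegExp(schema.pattern).test(value)) errors.push(`${path}: pattern mismatch`);
  }
  if (type === 'object') {
    for (const key of schema.required || []) {
      if (!(key in value)) errors.push(`${path}.${key}: required`);
    }
    for (const [key, sub] of Object.entries(schema.properties || {})) {
      if (key in value) errors.push(...validate(value[key], sub, `${path}.${key}`));
    }
  }
  return errors;
}
```

For the `company` schema above, a value with an empty `name`, an uppercase `domain`, and `employees: 0` produces one error per violated constraint.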

State Size Limits

State is stored in memory during execution:
Environment    Limit
Development    Unlimited
Production     128 KB recommended
Maximum        10 MB (practical limit)
Best practices:
  • Store references, not large payloads
  • Use KV/D1/R2 for large data
  • Keep state lean and structured
# ❌ Don't store large data in state
state:
  schema:
    hugeDocument: object  # Multiple MB

# ✅ Store reference instead
state:
  schema:
    documentId: string  # Reference to KV/D1/R2

flow:
  - member: fetch-document
    # Fetch and store in KV
  - member: process-document
    input:
      documentId: ${state.documentId}  # Fetch from KV when needed
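A quick way to see whether state is staying lean is to measure it. The sketch below assumes that "size" means the UTF-8 byte length of the JSON-serialized state, which is an assumption of this example. `stateSizeBytes` and `checkStateSize` are hypothetical helper names.

```javascript
// The production recommendation from the table above.
const RECOMMENDED_LIMIT_BYTES = 128 * 1024;

// Hypothetical helper: size of the state once serialized as UTF-8 JSON.
function stateSizeBytes(state) {
  return new TextEncoder().encode(JSON.stringify(state)).length;
}

// Hypothetical helper: reports the size and whether it fits the given limit.
function checkStateSize(state, limit = RECOMMENDED_LIMIT_BYTES) {
  const bytes = stateSizeBytes(state);
  return { bytes, withinLimit: bytes <= limit };
}
```

A state that holds only a reference such as `{ documentId: 'doc-1' }` serializes to a few dozen bytes, while embedding the document itself can exceed the recommendation on its own.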

State Persistence

State lives only during ensemble execution:
Ensemble Start → State Created → Members Execute → State Discarded
For persistence across executions:

Option 1: Durable Objects

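// Use ExecutionState Durable Object
const id = env.EXECUTION_STATE.idFromName(executionId);
const stub = env.EXECUTION_STATE.get(id);

// Durable Object stubs need an absolute URL; the hostname is a placeholder
// and only the path is meaningful to the object.
const stateUrl = 'https://execution-state/state';

// Store state
await stub.fetch(stateUrl, {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify(state)
});

// Retrieve state (named differently so it does not shadow `state` above)
const response = await stub.fetch(stateUrl);
const restoredState = await response.json();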
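The snippet above presumes a Durable Object that answers POST and GET on a /state path. A minimal sketch of what such an object could look like follows. The class body, the `/state` route handling, and the `'state'` storage key are assumptions of this example, not Conductor's actual ExecutionState implementation. In a real Worker the class would also be exported and bound in the Worker configuration.

```javascript
// Hypothetical Durable Object class behind the EXECUTION_STATE binding.
class ExecutionState {
  constructor(ctx) {
    // Durable Object storage, exposing async get(key) and put(key, value).
    this.storage = ctx.storage;
  }

  async fetch(request) {
    const { pathname } = new URL(request.url);
    if (pathname !== '/state') {
      return new Response('Not found', { status: 404 });
    }

    if (request.method === 'POST') {
      // Persist the posted JSON under a single key.
      await this.storage.put('state', await request.json());
      return new Response(null, { status: 204 });
    }

    // Any other method returns the stored state, or null if nothing was saved.
    const stored = await this.storage.get('state');
    return new Response(JSON.stringify(stored === undefined ? null : stored), {
      headers: { 'Content-Type': 'application/json' },
    });
  }
}
```

Because `idFromName(executionId)` always maps the same name to the same object, a later execution that knows the ID can read back what an earlier one stored.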

Option 2: KV Storage

flow:
  - member: save-state
    input:
      key: ${input.executionId}
      value: ${state}

  # Later execution
  - member: load-state
    input:
      key: ${input.executionId}

Option 3: D1 Database

flow:
  - member: queries
    input:
      operation: execute
      query: "INSERT INTO execution_state (id, state) VALUES (?, ?)"
      params: ["${input.executionId}", "${state}"]

State Debugging

Log State Changes

export default async function myMember({ state, setState }) {
  console.log('State before:', JSON.stringify(state, null, 2));

  setState({ newKey: 'newValue' });

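  // Note: `state` may still be the snapshot taken before setState was called,
  // since updates are applied by creating new StateManager instances.
  console.log('State after update:', JSON.stringify(state, null, 2));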
}
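When full dumps get noisy, logging only what changed between two snapshots is often easier to read. `diffState` below is a hypothetical helper, not part of Conductor. It compares top-level keys by their JSON serialization, so it is a sketch for debugging output rather than a general deep-equality check (for example, it is sensitive to property order inside nested objects).

```javascript
// Hypothetical helper: shallow diff of two state snapshots by top-level key.
function diffState(before, after) {
  const keys = new Set([...Object.keys(before), ...Object.keys(after)]);
  const changes = {};
  for (const key of keys) {
    // Compare serialized forms so nested changes are detected too.
    if (JSON.stringify(before[key]) !== JSON.stringify(after[key])) {
      changes[key] = { before: before[key], after: after[key] };
    }
  }
  return changes;
}
```

Logging `diffState(previousSnapshot, nextSnapshot)` after each member shows exactly which keys that member touched.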

Access Report

import { TestConductor } from '@ensemble-edge/conductor/testing';

const conductor = await TestConductor.create();
const result = await conductor.executeEnsemble('my-ensemble', input);

// Check state access
const report = result.stateReport;
console.log('Unused keys:', report.unusedKeys);
console.log('Access patterns:', report.accessPatterns);

State Snapshots

// In testing
const result = await conductor.executeEnsemble('my-ensemble', input);
expect(result).toHaveState('analysis', expect.objectContaining({
  score: expect.any(Number)
}));

Best Practices

1. Keep State Flat

# ✅ Good - Flat structure
state:
  schema:
    userData: object
    preferences: object
    session: object

# ❌ Bad - Deep nesting
state:
  schema:
    user:
      data:
        profile:
          preferences:
            theme: string

2. Use Descriptive Keys

# ✅ Good
state:
  schema:
    companyFinancials: object
    marketAnalysis: object
    competitorData: array

# ❌ Bad
state:
  schema:
    data1: object
    result: object
    stuff: array

3. Declare Minimal Access

# ✅ Good - Only what's needed
- member: generate-report
  state:
    use: [analysis]  # Just analysis

# ❌ Bad - Everything
- member: generate-report
  state:
    use: [companyData, financials, news, social, analysis]

4. Validate State Schema

state:
  schema:
    score:
      type: number
      minimum: 0
      maximum: 1
    status:
      type: string
      enum: [pending, completed, failed]

5. Initialize Important Keys

state:
  schema:
    counter: number
    results: array
  initial:
    counter: 0
    results: []

Example: E-Commerce Order Processing

name: process-order
description: Complete order processing workflow with state

state:
  schema:
    order: object
    payment: object
    inventory: object
    shipping: object
    notification: object

  initial:
    order: null
    payment: null
    inventory: null
    shipping: null
    notification: null

flow:
  # Validate order
  - member: validate-order
    input:
      orderId: ${input.orderId}
    state:
      set: [order]

  # Process payment
  - member: process-payment
    state:
      use: [order]
      set: [payment]

  # Reserve inventory
  - member: reserve-inventory
    state:
      use: [order]
      set: [inventory]

  # If payment failed, release inventory
  - branch:
      if: ${payment.status} != 'success'
      then:
        - member: release-inventory
          state:
            use: [inventory]
        - member: notify-payment-failed
          state:
            use: [order, payment]
            set: [notification]

  # If payment succeeded, create shipment
  - branch:
      if: ${payment.status} == 'success'
      then:
        - member: create-shipment
          state:
            use: [order, inventory]
            set: [shipping]

        - member: notify-order-confirmed
          state:
            use: [order, payment, shipping]
            set: [notification]

output:
  orderId: ${order.id}
  status: ${payment.status}
  trackingNumber: ${shipping.trackingNumber}
  notification: ${notification.sent}