Skip to main content
Think of state as shared memory for your workflow.

When to Use State

Use state when:
  • Multiple agents need access to the same data
  • You’re tracking workflow progress
  • You need to accumulate results across agents
  • Passing the same data through a long chain of agent inputs and outputs ("prop drilling") becomes cumbersome
Don’t use state for:
  • Simple data passing between adjacent agents (use outputs instead)
  • Data that only one agent needs
  • Temporary calculations

Basic State

Define Schema

# Ensemble that declares shared state for its agents
ensemble: stateful-workflow

# Schema: one entry per state key, mapped to its type
state:
  schema:
    user_data: object
    processed_count: number
    errors: array

agents:
  # ... agents that use state ...

Read State

agents:
  - name: process
    operation: code
    config:
      script: scripts/increment-count
    input:
      # Falls back to 0 when processed_count has no truthy value yet
      current_count: ${state.processed_count || 0}
    state:
      use: [processed_count]  # Declare what you read
// scripts/increment-count.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Adds one to the count supplied by the workflow.
 *
 * Reads `current_count` from `context.input` and returns the incremented
 * value under the `new_count` key.
 */
export default function incrementCount(context: AgentExecutionContext) {
  const previous = context.input.current_count
  const new_count = previous + 1
  return { new_count }
}

Write State

agents:
  - name: update
    operation: code
    config:
      script: scripts/update-user-state
    input:
      user: ${fetch-user.output}
      # Default to 0 so the script does not compute undefined + 1 (NaN)
      # when processed_count has not been written yet
      count: ${state.processed_count || 0}
    state:
      use: [processed_count]  # Declare what you read
      set:                    # Declare what you write
        user_data: ${update.output.user}
        processed_count: ${update.output.count}
// scripts/update-user-state.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Passes the user through unchanged and advances the processed counter.
 *
 * @param context - Execution context; `context.input.user` is returned as-is
 *   and `context.input.count` is the counter value before this run.
 * @returns `{ user, count }` where `count` is the previous count plus one.
 */
export default function updateUserState(context: AgentExecutionContext) {
  const { user, count } = context.input
  // State may not be initialized on the first run, so `count` can arrive as
  // undefined/null; start from 0 instead of producing NaN.
  const previousCount = count ?? 0
  return {
    user,
    count: previousCount + 1
  }
}

State Patterns

Pattern 1: Accumulator

Build up results across agents:
ensemble: document-processor

state:
  schema:
    processed_docs: array
    total_words: number

agents:
  - name: process-doc
    operation: code
    config:
      script: scripts/accumulate-document
    input:
      doc_id: ${input.doc_id}
      content: ${input.content}
      # Start from empty accumulators when nothing has been stored yet
      current_docs: ${state.processed_docs || []}
      current_words: ${state.total_words || 0}
    state:
      use: [processed_docs, total_words]
      set:
        processed_docs: ${process-doc.output.docs}
        total_words: ${process-doc.output.words}

output:
  # Apply the same defaults as the agent input: reading .length on an unset
  # processed_docs would fail instead of reporting 0.
  total_documents: ${(state.processed_docs || []).length}
  total_words: ${state.total_words || 0}
// scripts/accumulate-document.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Appends a per-document word count to the running totals.
 *
 * @param context - Execution context whose `input` carries `doc_id`,
 *   `content`, `current_docs` (entries accumulated so far) and
 *   `current_words` (running word total).
 * @returns `docs`, a new array with this document's `{ id, words }` entry
 *   appended, and `words`, the updated total. The incoming array is not
 *   mutated.
 */
export default function accumulateDocument(context: AgentExecutionContext) {
  const { doc_id, content, current_docs, current_words } = context.input

  // Split on whitespace runs after trimming so that empty content counts as
  // 0 words and repeated spaces, tabs or newlines do not inflate the count
  // (a plain split(' ') reports 1 for '' and 3 for 'a  b').
  const trimmed = String(content ?? '').trim()
  const wordCount = trimmed === '' ? 0 : trimmed.split(/\s+/).length

  const newDoc = {
    id: doc_id,
    words: wordCount
  }

  return {
    // Tolerate accumulators that have not been initialized yet.
    docs: [...(current_docs ?? []), newDoc],
    words: (current_words ?? 0) + wordCount
  }
}

Pattern 2: Configuration State

Share configuration across agents:
ensemble: configurable-pipeline

state:
  schema:
    config: object

agents:
  # Reads the pipeline-config key from KV and stores its value as state.config
  - name: load-config
    operation: storage
    config:
      type: kv
      action: get
      key: pipeline-config
    state:
      set:
        config: ${load-config.output.value}

  # HTTP call whose URL and bearer token come from state.config
  - name: step1
    operation: http
    config:
      url: ${state.config.api_url}
      headers:
        Authorization: Bearer ${state.config.api_key}
    state:
      use: [config]

  # AI call whose provider and model come from state.config
  - name: step2
    operation: think
    config:
      provider: ${state.config.ai_provider}
      model: ${state.config.ai_model}
      prompt: ${input.text}
    state:
      use: [config]

Pattern 3: Error Tracking

Track errors across the workflow:
ensemble: resilient-pipeline

state:
  schema:
    errors: array
    # Declared for completeness; no agent in this example reads or writes it
    retry_count: number

agents:
  - name: step1
    operation: http
    config:
      url: https://api.example.com
    retry:
      maxAttempts: 3

  # Runs only when step1 failed and appends one entry to state.errors
  - name: track-errors
    condition: ${step1.failed}
    operation: code
    config:
      script: scripts/track-error
    input:
      current_errors: ${state.errors || []}
      agent_name: step1
      error_message: ${step1.error}
    state:
      use: [errors]
      set:
        errors: ${track-errors.output.errors}

output:
  success: ${!step1.failed}
  # track-errors is skipped on success, so state.errors may never be set;
  # default to an empty list so the output shape is always an array.
  errors: ${state.errors || []}
// scripts/track-error.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Produces the error list extended with one new entry.
 *
 * The entry records the agent name, the error message and the time of the
 * call (`Date.now()`). The incoming `current_errors` array is copied rather
 * than mutated.
 */
export default function trackError(context: AgentExecutionContext) {
  const {
    current_errors: previous,
    agent_name: agent,
    error_message: error
  } = context.input

  const errors = [...previous]
  errors.push({ agent, error, timestamp: Date.now() })
  return { errors }
}

Pattern 4: Multi-Step Context

Build context across multiple AI calls:
ensemble: contextual-conversation

state:
  schema:
    conversation_history: array
    user_preferences: object

agents:
  # Loads the 10 most recent messages for this user (newest first)
  - name: load-history
    operation: storage
    config:
      type: d1
      query: SELECT * FROM conversations WHERE user_id = ? ORDER BY timestamp DESC LIMIT 10
      # Block sequence: an unquoted ${...} inside a [ ] flow sequence is
      # ambiguous YAML because "{" opens a flow mapping there.
      params:
        - ${input.user_id}
    state:
      set:
        conversation_history: ${load-history.output.rows}

  # The query returns rows newest-first, so the history is reversed to read
  # oldest-to-newest in the prompt. Both state values get defaults because
  # user_preferences is never set in this example and the history may be empty.
  - name: generate-response
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: |
        Conversation history:
        ${(state.conversation_history || []).slice().reverse().map(m => `${m.role}: ${m.content}`).join('\n')}

        User preferences: ${JSON.stringify(state.user_preferences || {})}

        New message: ${input.message}

        Respond naturally.
    state:
      use: [conversation_history, user_preferences]

  # Stores the assistant reply with the current timestamp
  - name: save-message
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO conversations (user_id, role, content, timestamp)
        VALUES (?, 'assistant', ?, ?)
      params:
        - ${input.user_id}
        - ${generate-response.output}
        - ${Date.now()}

output:
  response: ${generate-response.output}

State vs Outputs

Use Outputs For

Simple data passing between adjacent agents:
# Good: Direct output reference
agents:
  - name: fetch
    operation: http
    config:
      url: ${input.url}

  # Consumes the previous agent's output directly; no state block is needed
  - name: process
    operation: code
    config:
      script: scripts/process-fetch-output
    input:
      fetch_output: ${fetch.output}
// scripts/process-fetch-output.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Returns the value received as `fetch_output` under the `processed` key.
 */
export default function processFetchOutput(context: AgentExecutionContext) {
  const processed = context.input.fetch_output
  return { processed }
}

Use State For

Data needed by multiple non-adjacent agents:
# Good: State for shared data
state:
  schema:
    api_token: string

agents:
  # Stores the token from the auth response in state
  - name: authenticate
    operation: http
    config:
      url: https://auth.example.com
    state:
      set:
        api_token: ${authenticate.output.token}

  # Both later steps read the same token from state
  - name: step1
    operation: http
    config:
      url: https://api.example.com/step1
      headers:
        Authorization: Bearer ${state.api_token}
    state:
      use: [api_token]

  - name: step2
    operation: http
    config:
      url: https://api.example.com/step2
      headers:
        Authorization: Bearer ${state.api_token}
    state:
      use: [api_token]

State Persistence

KV-Backed State

Persist state across executions:
ensemble: stateful-workflow

# NOTE(review): this example both configures a kv backend here and loads/saves
# the same key manually via storage agents; confirm whether both are required.
state:
  backend: kv
  key: workflow-${input.workflow_id}
  schema:
    progress: number
    data: object

agents:
  # Reads the stored value for this workflow and seeds state, defaulting to
  # progress 0 and an empty data object when fields are missing
  - name: load-state
    operation: storage
    config:
      type: kv
      action: get
      key: workflow-${input.workflow_id}
    state:
      set:
        progress: ${load-state.output.value?.progress || 0}
        data: ${load-state.output.value?.data || {}}

  # Passes the current progress/data to the script and writes its output back
  - name: process
    operation: code
    config:
      script: scripts/update-progress
    input:
      current_progress: ${state.progress}
      current_data: ${state.data}
    state:
      use: [progress, data]
      set:
        progress: ${process.output.progress}
        data: ${process.output.data}
// scripts/update-progress.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Advances progress by 10 and stamps the data object.
 *
 * Returns `progress` as `current_progress + 10` and `data` as a shallow copy
 * of `current_data` with `updated` set to the current `Date.now()` value.
 */
export default function updateProgress(context: AgentExecutionContext) {
  const { current_progress: before, current_data: snapshot } = context.input

  const progress = before + 10
  const data = { ...snapshot, updated: Date.now() }
  return { progress, data }
}
  # Writes the current progress and data back to the same KV key
  - name: save-state
    operation: storage
    config:
      type: kv
      action: put
      key: workflow-${input.workflow_id}
      value:
        progress: ${state.progress}
        data: ${state.data}
    state:
      use: [progress, data]

D1-Backed State

For complex queries and relationships:
ensemble: stateful-pipeline

# Declare the state key the agents below set and use
state:
  schema:
    workflow_data: object

agents:
  # Loads the workflow row by id and keeps the first result in state
  - name: load-workflow-state
    operation: storage
    config:
      type: d1
      query: SELECT * FROM workflow_state WHERE id = ?
      # Block sequence: an unquoted ${...} inside a [ ] flow sequence is
      # ambiguous YAML because "{" opens a flow mapping there.
      params:
        - ${input.workflow_id}
    state:
      set:
        workflow_data: ${load-workflow-state.output.rows[0]}

  - name: process
    operation: code
    config:
      script: scripts/update-workflow-status
    state:
      use: [workflow_data]
// scripts/update-workflow-status.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Reports the workflow as in progress.
 *
 * The `context` argument is accepted but not read; the result is always
 * `status: 'processing'` together with the current `Date.now()` value as
 * `updated_at`.
 */
export default function updateWorkflowStatus(context: AgentExecutionContext) {
  const updated_at = Date.now()
  return { status: 'processing', updated_at }
}
  # Persists the status and timestamp produced by the process agent
  - name: save-workflow-state
    operation: storage
    config:
      type: d1
      query: |
        UPDATE workflow_state
        SET status = ?, updated_at = ?
        WHERE id = ?
      # Values are listed in the same order as the ? placeholders above
      params:
        - ${process.output.status}
        - ${process.output.updated_at}
        - ${input.workflow_id}

State Scoping

Ensemble-Level State

Shared across all agents in the ensemble:
ensemble: my-ensemble

state:
  schema:
    shared_data: object

agents:
  # Writes its output into shared_data
  - name: agent1
    state:
      set:
        shared_data: ${agent1.output}

  # Reads the value agent1 stored
  - name: agent2
    state:
      use: [shared_data]

Agent-Level State

A custom agent can declare its own state in its agent definition:
# agents/stateful-agent/agent.yaml
agent: stateful-agent

# State schema declared in the agent definition itself
state:
  schema:
    internal_counter: number

operations:
  # Reads the counter (0 when unset), increments it in the script, and
  # writes the script's result back
  - name: increment
    operation: code
    config:
      script: scripts/increment-internal-counter
    input:
      counter: ${state.internal_counter || 0}
    state:
      use: [internal_counter]
      set:
        internal_counter: ${increment.output.count}
// scripts/increment-internal-counter.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Returns the input `counter` plus one under the `count` key.
 */
export default function incrementInternalCounter(context: AgentExecutionContext) {
  const count = context.input.counter + 1
  return { count }
}

Best Practices

  1. Minimize State Usage - Use outputs for simple data passing
  2. Clear Schema - Define types explicitly
  3. Declare Dependencies - Always declare state reads with use: and state writes with set:
  4. Persist When Needed - Use KV/D1 for long-running workflows
  5. Avoid Circular Dependencies - Don’t create state loops
  6. Initialize State - Provide defaults (|| 0, || [])
  7. Test State Logic - Unit test state transformations
  8. Monitor State Size - Keep state compact

Performance Considerations

State Access Cost

Reading state is fast (in-memory):
# Fast: Direct state access
agents:
  - name: process
    operation: code
    config:
      script: scripts/increment-counter-value
    input:
      # No fallback here: the script receives whatever state.counter holds
      counter: ${state.counter}
    state:
      use: [counter]
// scripts/increment-counter-value.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Returns the input counter plus one under the `value` key.
 *
 * @param context - Execution context; `context.input.counter` is the current
 *   counter value and may be undefined/null before state is first written.
 * @returns `{ value }` where `value` is the counter plus one.
 */
export default function incrementCounterValue(context: AgentExecutionContext) {
  const { counter } = context.input
  // Treat a missing counter as 0 so the first run yields 1 instead of NaN.
  return { value: (counter ?? 0) + 1 }
}

State Write Cost

Writing to persistent backends has latency:
# Slower: KV write
agents:
  - name: save
    operation: storage
    config:
      type: kv
      action: put
      key: state-${input.id}
      # Stores the entire state object as the value
      value: ${state}
    state:
      # NOTE(review): other examples list individual state keys under use;
      # confirm that `state` is accepted here to expose the whole object.
      use: [state]
Tip: Batch state writes at the end of the workflow rather than after each agent.

Debugging State

Log State Changes

agents:
  - name: process
    operation: code
    config:
      script: scripts/process-with-logging
    input:
      data: ${input.data}
      # Whole state object, passed in so the script can log it
      current_state: ${state}
    state:
      use: [data]
      set:
        # The script's full output replaces state.data
        data: ${process.output}
// scripts/process-with-logging.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Runs `processData` on the input data with logging around the call.
 *
 * Logs the `current_state` input as JSON before processing and the produced
 * result as JSON afterwards, then returns that result.
 */
export default function processWithLogging(context: AgentExecutionContext) {
  const { data: payload, current_state: snapshot } = context.input

  console.log('State before:', JSON.stringify(snapshot))

  const outcome = processData(payload)
  console.log('State after:', JSON.stringify(outcome))
  return outcome
}

/**
 * Placeholder for the real processing step: returns the input under `data`
 * together with `processed: true`.
 */
function processData(data: any) {
  const outcome = { processed: true, data }
  return outcome
}

Output State

Include state in ensemble output for debugging:
# Ensemble output that exposes state next to the final agent's result
output:
  result: ${final-agent.output}
  debug_state: ${state}  # Include entire state

Next Steps