Documentation Index
Fetch the complete documentation index at: https://docs.ensemble.ai/llms.txt
Use this file to discover all available pages before exploring further.
Think of state as shared memory for your workflow.
When to Use State
Use state when:
- Multiple agents need access to the same data
- You’re tracking workflow progress
- You need to accumulate results across agents
- Prop drilling becomes cumbersome
Don’t use state for:
- Simple data passing between adjacent agents (use outputs instead)
- Data that only one agent needs
- Temporary calculations
Basic State
Define Schema
ensemble: stateful-workflow
state:
schema:
user_data: object
processed_count: number
errors: array
agents:
# ... agents that use state ...
Read State
agents:
- name: process
operation: code
config:
script: scripts/increment-count
input:
current_count: ${state.processed_count || 0}
state:
use: [processed_count] # Declare what you read
// scripts/increment-count.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
export default function incrementCount(context: AgentExecutionContext) {
const { current_count } = context.input
return { new_count: current_count + 1 }
}
Write State
agents:
- name: update
operation: code
config:
script: scripts/update-user-state
input:
user: ${fetch-user.output}
count: ${state.processed_count}
state:
use: [processed_count]
set:
user_data: ${update.output.user}
processed_count: ${update.output.count}
// scripts/update-user-state.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
export default function updateUserState(context: AgentExecutionContext) {
const { user, count } = context.input
return {
user: user,
count: count + 1
}
}
State Patterns
Pattern 1: Accumulator
Build up results across agents:
ensemble: document-processor
state:
schema:
processed_docs: array
total_words: number
agents:
- name: process-doc
operation: code
config:
script: scripts/accumulate-document
input:
doc_id: ${input.doc_id}
content: ${input.content}
current_docs: ${state.processed_docs || []}
current_words: ${state.total_words || 0}
state:
use: [processed_docs, total_words]
set:
processed_docs: ${process-doc.output.docs}
total_words: ${process-doc.output.words}
output:
total_documents: ${state.processed_docs.length}
total_words: ${state.total_words}
// scripts/accumulate-document.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
export default function accumulateDocument(context: AgentExecutionContext) {
const { doc_id, content, current_docs, current_words } = context.input
const newDoc = {
id: doc_id,
words: content.split(' ').length
}
return {
docs: [...current_docs, newDoc],
words: current_words + newDoc.words
}
}
Pattern 2: Configuration State
Share configuration across agents:
ensemble: configurable-pipeline
state:
schema:
config: object
agents:
- name: load-config
operation: storage
config:
type: kv
action: get
key: pipeline-config
state:
set:
config: ${load-config.output.value}
- name: step1
operation: http
config:
url: ${state.config.api_url}
headers:
Authorization: Bearer ${state.config.api_key}
state:
use: [config]
- name: step2
operation: think
config:
provider: ${state.config.ai_provider}
model: ${state.config.ai_model}
prompt: ${input.text}
state:
use: [config]
Pattern 3: Error Tracking
Track errors across the workflow:
ensemble: resilient-pipeline
state:
schema:
errors: array
retry_count: number
agents:
- name: step1
operation: http
config:
url: https://api.example.com
retry:
maxAttempts: 3
- name: track-errors
condition: ${step1.failed}
operation: code
config:
script: scripts/track-error
input:
current_errors: ${state.errors || []}
agent_name: step1
error_message: ${step1.error}
state:
use: [errors]
set:
errors: ${track-errors.output.errors}
output:
success: ${!step1.failed}
errors: ${state.errors}
// scripts/track-error.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
export default function trackError(context: AgentExecutionContext) {
const { current_errors, agent_name, error_message } = context.input
return {
errors: [...current_errors, {
agent: agent_name,
error: error_message,
timestamp: Date.now()
}]
}
}
Pattern 4: Multi-Step Context
Build context across multiple AI calls:
ensemble: contextual-conversation
state:
schema:
conversation_history: array
user_preferences: object
agents:
- name: load-history
operation: storage
config:
type: d1
query: SELECT * FROM conversations WHERE user_id = ? ORDER BY timestamp DESC LIMIT 10
params: [${input.user_id}]
state:
set:
conversation_history: ${load-history.output.rows}
- name: generate-response
operation: think
config:
provider: openai
model: gpt-4o
prompt: |
Conversation history:
${state.conversation_history.map(m => `${m.role}: ${m.content}`).join('\n')}
User preferences: ${JSON.stringify(state.user_preferences)}
New message: ${input.message}
Respond naturally.
state:
use: [conversation_history, user_preferences]
- name: save-message
operation: storage
config:
type: d1
query: |
INSERT INTO conversations (user_id, role, content, timestamp)
VALUES (?, 'assistant', ?, ?)
params:
- ${input.user_id}
- ${generate-response.output}
- ${Date.now()}
output:
response: ${generate-response.output}
State vs Outputs
Use Outputs For
Simple data passing between adjacent agents:
# Good: Direct output reference
agents:
- name: fetch
operation: http
config:
url: ${input.url}
- name: process
operation: code
config:
script: scripts/process-fetch-output
input:
fetch_output: ${fetch.output}
// scripts/process-fetch-output.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
export default function processFetchOutput(context: AgentExecutionContext) {
const { fetch_output } = context.input
return { processed: fetch_output }
}
Use State For
Data needed by multiple non-adjacent agents:
# Good: State for shared data
state:
schema:
api_token: string
agents:
- name: authenticate
operation: http
config:
url: https://auth.example.com
state:
set:
api_token: ${authenticate.output.token}
- name: step1
operation: http
config:
url: https://api.example.com/step1
headers:
Authorization: Bearer ${state.api_token}
state:
use: [api_token]
- name: step2
operation: http
config:
url: https://api.example.com/step2
headers:
Authorization: Bearer ${state.api_token}
state:
use: [api_token]
State Persistence
KV-Backed State
Persist state across executions:
ensemble: stateful-workflow
state:
backend: kv
key: workflow-${input.workflow_id}
schema:
progress: number
data: object
agents:
- name: load-state
operation: storage
config:
type: kv
action: get
key: workflow-${input.workflow_id}
state:
set:
progress: ${load-state.output.value?.progress || 0}
data: ${load-state.output.value?.data || {}}
- name: process
operation: code
config:
script: scripts/update-progress
input:
current_progress: ${state.progress}
current_data: ${state.data}
state:
use: [progress, data]
set:
progress: ${process.output.progress}
data: ${process.output.data}
// scripts/update-progress.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
export default function updateProgress(context: AgentExecutionContext) {
const { current_progress, current_data } = context.input
return {
progress: current_progress + 10,
data: { ...current_data, updated: Date.now() }
}
}
- name: save-state
operation: storage
config:
type: kv
action: put
key: workflow-${input.workflow_id}
value:
progress: ${state.progress}
data: ${state.data}
state:
use: [progress, data]
D1-Backed State
For complex queries and relationships:
ensemble: stateful-pipeline
agents:
- name: load-workflow-state
operation: storage
config:
type: d1
query: SELECT * FROM workflow_state WHERE id = ?
params: [${input.workflow_id}]
state:
set:
workflow_data: ${load-workflow-state.output.rows[0]}
- name: process
operation: code
config:
script: scripts/update-workflow-status
state:
use: [workflow_data]
// scripts/update-workflow-status.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
export default function updateWorkflowStatus(context: AgentExecutionContext) {
return {
status: 'processing',
updated_at: Date.now()
}
}
- name: save-workflow-state
operation: storage
config:
type: d1
query: |
UPDATE workflow_state
SET status = ?, updated_at = ?
WHERE id = ?
params:
- ${process.output.status}
- ${process.output.updated_at}
- ${input.workflow_id}
State Scoping
Ensemble-Level State
Shared across all agents in the ensemble:
ensemble: my-ensemble
state:
schema:
shared_data: object
agents:
- name: agent1
state:
set:
shared_data: ${agent1.output}
- name: agent2
state:
use: [shared_data]
Agent-Level State
Each agent has its own state (for custom agents):
# agents/stateful-agent/agent.yaml
agent: stateful-agent
state:
schema:
internal_counter: number
operations:
- name: increment
operation: code
config:
script: scripts/increment-internal-counter
input:
counter: ${state.internal_counter || 0}
state:
use: [internal_counter]
set:
internal_counter: ${increment.output.count}
// scripts/increment-internal-counter.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
export default function incrementInternalCounter(context: AgentExecutionContext) {
const { counter } = context.input
return { count: counter + 1 }
}
Best Practices
- Minimize State Usage - Use outputs for simple data passing
- Clear Schema - Define types explicitly
- Declare Dependencies - Always use
use: and set:
- Persist When Needed - Use KV/D1 for long-running workflows
- Avoid Circular Dependencies - Don’t create state loops
- Initialize State - Provide defaults (
|| 0, || [])
- Test State Logic - Unit test state transformations
- Monitor State Size - Keep state compact
State Access Cost
Reading state is fast (in-memory):
# Fast: Direct state access
agents:
- name: process
operation: code
config:
script: scripts/increment-counter-value
input:
counter: ${state.counter}
state:
use: [counter]
// scripts/increment-counter-value.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
export default function incrementCounterValue(context: AgentExecutionContext) {
const { counter } = context.input
return { value: counter + 1 }
}
State Write Cost
Writing to persistent backends has latency:
# Slower: KV write
agents:
- name: save
operation: storage
config:
type: kv
action: put
key: state-${input.id}
value: ${state}
state:
use: [state]
Tip: Batch state writes at the end of the workflow rather than after each agent.
Debugging State
Log State Changes
agents:
- name: process
operation: code
config:
script: scripts/process-with-logging
input:
data: ${input.data}
current_state: ${state}
state:
use: [data]
set:
data: ${process.output}
// scripts/process-with-logging.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
export default function processWithLogging(context: AgentExecutionContext) {
const { data, current_state } = context.input
console.log('State before:', JSON.stringify(current_state))
const result = processData(data)
console.log('State after:', JSON.stringify(result))
return result
}
function processData(data: any) {
// Process data implementation
return { processed: true, data }
}
Output State
Include state in ensemble output for debugging:
output:
result: ${final-agent.output}
debug_state: ${state} # Include entire state
Next Steps
Flow Control
Control execution flow
A/B Testing
Test multiple variants
Operations
Learn about operations
Playbooks
Real-world examples