Ensembles
Ensembles orchestrate agents into workflows. They are the top level: the place where you compose agents to accomplish complex tasks. Agents do the work; ensembles coordinate them.
Ensemble Structure
```yaml
# ensembles/company-intelligence.yaml
ensemble: company-intelligence
description: Gather and analyze company data

agents:
  # Step 1: Enrich company data
  - name: enricher
    agent: company-enricher
    inputs:
      company_name: ${input.company}
      include_news: true

  # Step 2: Scrape LinkedIn
  - name: scrape-linkedin
    agent: scraper
    inputs:
      url: https://linkedin.com/company/${input.company}

  # Step 3: Analyze combined data
  - name: analyze
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: |
        Analyze this company:

        Data: ${enricher.output.company_data}
        LinkedIn: ${scrape-linkedin.output}
        News: ${enricher.output.news}

        Provide: overview, strengths, challenges, outlook

output:
  company_data: ${enricher.output.company_data}
  linkedin: ${scrape-linkedin.output}
  analysis: ${analyze.output}
```
Key Concepts
Agents in Ensembles
Ensembles can reference custom agents, pre-built agents, or inline operations:
```yaml
agents:
  # Custom agent
  - name: enricher
    agent: company-enricher
    inputs:
      company_name: ${input.company}

  # Pre-built agent
  - name: scrape
    agent: scraper
    inputs:
      url: ${input.url}

  # Inline operation (no separate agent file)
  - name: transform
    operation: code
    config:
      code: |
        return { processed: ${enricher.output} };
```
Input/Output Mapping
Input: data passed when the ensemble is executed:
```typescript
const result = await conductor.execute('company-intelligence', {
  company: 'Anthropic'
});
```
Agents reference this execution input through ${input.*} expressions:
```yaml
agents:
  - name: enricher
    agent: company-enricher
    inputs:
      company_name: ${input.company}  # From execution input
```
Output: data returned from the ensemble, assembled from agent outputs:
```yaml
output:
  company: ${enricher.output.company_data}
  analysis: ${analyze.output}
  sources: [${enricher.output.source}, 'linkedin.com']
```
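Putting the two together, the caller passes the ensemble's input and reads the mapped output fields from the result. A minimal sketch, assuming the promise returned by execute resolves to the output mapping declared above (check your Conductor client for the exact result shape):
```typescript
// `conductor` is the same client used in the snippet above; how it is
// constructed depends on your setup.
const result = await conductor.execute('company-intelligence', {
  company: 'Anthropic'          // becomes ${input.company}
});

// Fields declared under `output:` in the ensemble YAML.
console.log(result.company);    // ${enricher.output.company_data}
console.log(result.analysis);   // ${analyze.output}
console.log(result.sources);    // [${enricher.output.source}, 'linkedin.com']
```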
Flow Control
Sequential Execution
Agents run sequentially when each step references the previous step's output:
```yaml
agents:
  - name: step1
    agent: fetcher
    inputs:
      url: ${input.url}

  - name: step2
    operation: think
    config:
      prompt: "Analyze: ${step1.output}"

  - name: step3
    operation: storage
    config:
      type: d1
      query: INSERT INTO results (data) VALUES (?)
      params: [${step2.output}]
```
Parallel Execution
Agents with no data dependencies on each other run in parallel:
```yaml
agents:
  # These three run in parallel
  - name: fetch-a
    agent: fetcher
    inputs:
      url: https://api-a.com

  - name: fetch-b
    agent: fetcher
    inputs:
      url: https://api-b.com

  - name: fetch-c
    agent: fetcher
    inputs:
      url: https://api-c.com

  # This waits for all three because it references their outputs
  - name: merge
    operation: code
    config:
      code: |
        return {
          a: ${fetch-a.output},
          b: ${fetch-b.output},
          c: ${fetch-c.output}
        };
```
Conditional Execution
Skip agents based on conditions:
```yaml
agents:
  - name: check-cache
    operation: storage
    config:
      type: kv
      action: get
      key: result-${input.query}

  # Only run on a cache miss
  - name: generate
    condition: ${check-cache.output.value === null}
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: ${input.query}

  # Only run if a new result was generated
  - name: save-cache
    condition: ${generate.executed}
    operation: storage
    config:
      type: kv
      action: put
      key: result-${input.query}
      value: ${generate.output}

output:
  result: ${check-cache.output.value || generate.output}
  from_cache: ${!generate.executed}
```
State Management
Share state across agents:
```yaml
ensemble: stateful-workflow

state:
  schema:
    processed_items: array
    total_count: number

agents:
  - name: process
    operation: code
    config:
      code: |
        const items = ${state.processed_items || []};
        return {
          items: [...items, ${input.item}],
          count: items.length + 1
        };
    state:
      use: [processed_items]
      set:
        processed_items: ${process.output.items}
        total_count: ${process.output.count}

output:
  count: ${state.total_count}
  items: ${state.processed_items}
```
Ensemble Caching
Cache entire ensemble results:
```yaml
ensemble: expensive-analysis

cache:
  ttl: 3600  # 1 hour
  key: analysis-${input.document_id}

agents:
  # ... agents ...

output:
  result: ${analyze.output}
  cached: ${__cache_hit}  # True if from cache
```
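As a usage sketch (assuming the cache key above, that the cache persists between executions, and that execute resolves to the output mapping; the document_id value is just an example):
```typescript
// First call runs all agents and populates the ensemble cache.
const first = await conductor.execute('expensive-analysis', { document_id: 'doc-123' });
console.log(first.cached);   // expected: false

// Second call with the same input within the 1-hour TTL.
const second = await conductor.execute('expensive-analysis', { document_id: 'doc-123' });
console.log(second.cached);  // expected: true, served from the cache
```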
Error Handling
Fallbacks
```yaml
agents:
  - name: try-primary
    agent: fetcher
    inputs:
      url: https://primary-api.com
    retry:
      maxAttempts: 2

  - name: try-secondary
    condition: ${try-primary.failed}
    agent: fetcher
    inputs:
      url: https://backup-api.com

  - name: use-cache
    condition: ${try-primary.failed && try-secondary.failed}
    operation: storage
    config:
      type: kv
      action: get
      key: cached-data

output:
  data: ${try-primary.output || try-secondary.output || use-cache.output}
  source: ${try-primary.executed ? 'primary' : try-secondary.executed ? 'secondary' : 'cache'}
```
Validation
```yaml
agents:
  - name: process-data
    agent: processor
    inputs:
      data: ${input.data}

  - name: validate
    agent: validator
    inputs:
      data: ${process-data.output}

  - name: store
    condition: ${validate.output.valid}
    operation: storage
    config:
      type: d1
      query: INSERT INTO results (data) VALUES (?)
      params: [${process-data.output}]

output:
  success: ${validate.output.valid}
  data: ${process-data.output}
  errors: ${validate.output.errors}
```
Real-World Patterns
RAG Pipeline
```yaml
ensemble: rag-qa

agents:
  - name: embed-query
    operation: think
    config:
      provider: openai
      model: text-embedding-3-small
      input: ${input.question}

  - name: search
    operation: storage
    config:
      type: vectorize
      action: query
      vector: ${embed-query.output}
      topK: 5

  - name: fetch-docs
    operation: storage
    config:
      type: d1
      query: SELECT content FROM docs WHERE id IN (?)
      params: [${search.output.ids}]

  - name: generate-answer
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Context: ${fetch-docs.output.rows.map(r => r.content).join('\n')}

        Question: ${input.question}

        Answer using only the context.

output:
  answer: ${generate-answer.output}
  sources: ${search.output.ids}
```
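For reference, invoking this pipeline might look like the following sketch (the question is only an example; the result shape follows the output mapping above):
```typescript
const result = await conductor.execute('rag-qa', {
  question: 'What does the onboarding guide say about API keys?'  // example question
});

console.log(result.answer);   // grounded answer from generate-answer
console.log(result.sources);  // matching document ids from the vector search
```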
Multi-Agent Analysis
```yaml
ensemble: multi-agent-analysis

agents:
  # Extract entities
  - name: extract-entities
    agent: entity-extractor
    inputs:
      text: ${input.text}

  # Analyze sentiment
  - name: analyze-sentiment
    agent: sentiment-analyzer
    inputs:
      text: ${input.text}

  # Generate summary
  - name: summarize
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: "Summarize: ${input.text}"

  # Synthesize results
  - name: synthesize
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: |
        Entities: ${extract-entities.output}
        Sentiment: ${analyze-sentiment.output}
        Summary: ${summarize.output}

        Provide a comprehensive analysis.

output:
  entities: ${extract-entities.output}
  sentiment: ${analyze-sentiment.output}
  summary: ${summarize.output}
  analysis: ${synthesize.output}
```
HITL Approval Flow
```yaml
ensemble: content-approval

agents:
  - name: generate-content
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: "Generate content for: ${input.topic}"

  - name: request-approval
    agent: hitl
    inputs:
      prompt: Review this content
      context:
        content: ${generate-content.output}
        topic: ${input.topic}

  - name: publish
    condition: ${request-approval.output.approved}
    operation: storage
    config:
      type: d1
      query: INSERT INTO published (content) VALUES (?)
      params: [${generate-content.output}]

output:
  content: ${generate-content.output}
  approved: ${request-approval.output.approved}
  published: ${publish.executed}
```
A/B Testing in Ensembles
Test different variants:
```yaml
ensemble: ab-test-analysis

agents:
  # Variant A: GPT-4o
  - name: analyze-a
    condition: ${input.user_id % 2 === 0}
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: ${input.text}

  # Variant B: Claude
  - name: analyze-b
    condition: ${input.user_id % 2 === 1}
    operation: think
    config:
      provider: anthropic
      model: claude-3-5-sonnet-20241022
      prompt: ${input.text}

  # Log which variant ran
  - name: log-variant
    operation: storage
    config:
      type: d1
      query: INSERT INTO ab_tests (user_id, variant) VALUES (?, ?)
      params:
        - ${input.user_id}
        - ${analyze-a.executed ? 'A' : 'B'}

output:
  analysis: ${analyze-a.output || analyze-b.output}
  variant: ${analyze-a.executed ? 'A' : 'B'}
```
Best Practices
- Start Simple - One agent, then add complexity
- Parallel by Default - Conductor parallelizes automatically
- Cache Strategically - Cache expensive operations
- Handle Failures - Always have fallbacks
- Use State Sparingly - Only when truly needed
- Test Thoroughly - Integration test ensembles end to end (see the sketch below)
- Monitor Performance - Track execution times
- Version Components - Use Edgit for prompts/configs
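For the testing point above, a minimal integration-test sketch. Assumptions: a conductor client like the one used earlier on this page, Node's built-in test runner, and that execute resolves to the ensemble's output mapping.
```typescript
import { test } from 'node:test';
import assert from 'node:assert/strict';
import { conductor } from './conductor'; // hypothetical module path; use your own client setup

test('company-intelligence produces an analysis', async () => {
  const result = await conductor.execute('company-intelligence', {
    company: 'Anthropic',
  });

  // Assert against the fields declared under `output:` in the ensemble YAML.
  assert.ok(result.company_data, 'expected enriched company data');
  assert.ok(result.analysis, 'expected an analysis');
});
```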

