Execution Modes
Sequential Execution
Agents with dependencies run sequentially:
agents:
- name: fetch
agent: fetcher
inputs:
url: ${input.url}
- name: process
operation: code
config:
script: scripts/process-fetch-output
input:
fetchOutput: ${fetch.output}
Copy
// scripts/process-fetch-output.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
/**
 * Wraps the upstream fetch step's output under a `processed` key.
 * Receives the workflow execution context; reads `input.fetchOutput`.
 */
export default function processFetchOutput(context: AgentExecutionContext) {
  return { processed: context.input.fetchOutput }
}
Copy
- name: store
operation: data
config:
backend: d1
binding: DB
operation: execute
sql: INSERT INTO data (json) VALUES (?)
params: [${process.output}]
Parallel Execution
Agents without dependencies run in parallel:
agents:
# These 3 run in parallel
- name: fetch-a
agent: fetcher
inputs:
url: https://api-a.com
- name: fetch-b
agent: fetcher
inputs:
url: https://api-b.com
- name: fetch-c
agent: fetcher
inputs:
url: https://api-c.com
# This waits for all 3
- name: merge
operation: code
config:
script: scripts/merge-fetch-results
input:
fetchA: ${fetch-a.output}
fetchB: ${fetch-b.output}
fetchC: ${fetch-c.output}
Copy
// scripts/merge-fetch-results.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
/**
 * Fan-in step: collects the three parallel fetch outputs into one object
 * keyed `a`/`b`/`c`. Reads `input.fetchA`, `input.fetchB`, `input.fetchC`.
 */
export default function mergeFetchResults(context: AgentExecutionContext) {
  const { fetchA: a, fetchB: b, fetchC: c } = context.input
  return { a, b, c }
}
Conditional Execution
Simple Conditions
Skip agents based on boolean expressions:
agents:
- name: generate
operation: think
condition: ${input.use_ai} # Only run if input.use_ai is true
config:
provider: openai
model: gpt-4o-mini
prompt: ${input.text}
Based on Previous Outputs
Copy
agents:
- name: validate
agent: validator
inputs:
data: ${input.data}
- name: process
condition: ${validate.output.valid} # Only run if valid
operation: code
config:
script: scripts/process-data
input:
data: ${input.data}
Copy
// scripts/process-data.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
/**
 * Echoes the validated payload back under a `processed` key.
 * Reads `input.data` from the execution context.
 */
export default function processData(context: AgentExecutionContext) {
  return { processed: context.input.data }
}
Copy
- name: log-error
condition: ${!validate.output.valid} # Only run if invalid
operation: data
config:
backend: d1
binding: DB
operation: execute
sql: INSERT INTO errors (data) VALUES (?)
params: [${validate.output.errors}]
Cache-or-Generate Pattern
Copy
agents:
- name: check-cache
operation: storage
config:
type: kv
action: get
key: result-${input.query}
# Only generate if cache miss
- name: generate
condition: ${check-cache.output.value === null}
operation: think
config:
provider: openai
model: gpt-4o-mini
prompt: ${input.query}
# Only save if generated
- name: save-cache
condition: ${generate.executed}
operation: storage
config:
type: kv
action: put
key: result-${input.query}
value: ${generate.output}
output:
result: ${check-cache.output.value || generate.output}
from_cache: ${!generate.executed}
Error Handling
Fallback Pattern
Copy
agents:
- name: try-primary
operation: http
config:
url: https://primary-api.com
retry:
maxAttempts: 2
backoff: exponential
- name: try-secondary
condition: ${try-primary.failed} # Only if primary failed
operation: http
config:
url: https://backup-api.com
- name: use-cache
condition: ${try-primary.failed && try-secondary.failed}
operation: storage
config:
type: kv
action: get
key: cached-data
output:
data: ${try-primary.output || try-secondary.output || use-cache.output}
source: ${(() => {
if (try-primary.executed && !try-primary.failed) return 'primary';
if (try-secondary.executed && !try-secondary.failed) return 'secondary';
return 'cache';
})()}
Retry Logic
Copy
agents:
- name: flaky-operation
operation: http
config:
url: https://api.example.com
retry:
maxAttempts: 3 # Try up to 3 times
backoff: exponential # Exponential backoff (1s, 2s, 4s)
initialDelay: 1000 # Start with 1 second
maxDelay: 10000 # Cap at 10 seconds
retryOn: [500, 502, 503] # Only retry these status codes
Complex Conditions
Multiple Conditions
Copy
agents:
- name: premium-processing
condition: ${input.user.premium && input.data.size < 10000000}
operation: think
config:
provider: openai
model: gpt-4o
prompt: ${input.data}
- name: standard-processing
condition: ${!input.user.premium || input.data.size >= 10000000}
operation: think
config:
provider: openai
model: gpt-4o-mini
prompt: ${input.data}
Dynamic Routing
Copy
agents:
- name: classify
operation: think
config:
provider: openai
model: gpt-4o-mini
prompt: |
Classify this request: "${input.text}"
Return only: urgent, normal, or low
# Route based on classification
- name: urgent-handler
condition: ${classify.output === 'urgent'}
agent: urgent-processor
inputs:
data: ${input.text}
- name: normal-handler
condition: ${classify.output === 'normal'}
agent: normal-processor
inputs:
data: ${input.text}
- name: low-handler
condition: ${classify.output === 'low'}
agent: low-processor
inputs:
data: ${input.text}
output:
result: ${urgent-handler.output || normal-handler.output || low-handler.output}
priority: ${classify.output}
Loops and Iteration
Array Processing
Copy
agents:
# Process each item in array
- name: process-items
operation: code
config:
script: scripts/process-items
input:
items: ${input.items}
Copy
// scripts/process-items.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
/** Returns a shallow copy of one item with `processed: true` set. */
function processItem(item: any) {
  return Object.assign({}, item, { processed: true })
}

/**
 * Maps every element of `input.items` through processItem and returns
 * the transformed list under a `results` key.
 */
export default function processItems(context: AgentExecutionContext) {
  const { items } = context.input
  const results = items.map((entry: any) => processItem(entry))
  return { results }
}
Batch Processing
Copy
agents:
- name: fetch-batch
operation: data
config:
backend: d1
binding: DB
operation: query
sql: SELECT * FROM queue WHERE processed = false LIMIT ?
params: [${input.batch_size || 100}]
- name: process-batch
condition: ${fetch-batch.output.rows.length > 0}
operation: code
config:
script: scripts/process-batch-rows
input:
rows: ${fetch-batch.output.rows}
Copy
// scripts/process-batch-rows.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
/** Returns a shallow copy of one queue row with `processed: true` set. */
function processRow(row: any) {
  const copy = Object.assign({}, row)
  copy.processed = true
  return copy
}

/**
 * Processes a batch of queue rows: for each row in `input.rows`, emits
 * `{ id, result }` where `result` is the processed copy of the row.
 */
export default function processBatchRows(context: AgentExecutionContext) {
  const results: Array<{ id: any; result: any }> = []
  for (const row of context.input.rows) {
    results.push({ id: row.id, result: processRow(row) })
  }
  return { results }
}
Copy
- name: mark-processed
condition: ${process-batch.executed}
operation: data
config:
backend: d1
binding: DB
operation: execute
sql: |
UPDATE queue SET processed = true
WHERE id IN (${process-batch.output.results.map(r => r.id).join(',')})
output:
processed_count: ${process-batch.output?.results.length || 0}
has_more: ${fetch-batch.output.rows.length === input.batch_size}
Performance Optimization
Minimize Dependencies
Copy
# Bad: Sequential (slow)
agents:
- name: step1
operation: http
config:
url: https://api-a.com
- name: step2
operation: http
config:
url: https://api-b.com
data: ${step1.output} # Creates dependency
# Good: Parallel (fast)
agents:
- name: step1
operation: http
config:
url: https://api-a.com
- name: step2
operation: http
config:
url: https://api-b.com
- name: merge
operation: code
config:
script: scripts/merge-step-outputs
input:
stepA: ${step1.output}
stepB: ${step2.output}
Copy
// scripts/merge-step-outputs.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
/**
 * Joins the two parallel HTTP step outputs into `{ a, b }`.
 * Reads `input.stepA` and `input.stepB`.
 */
export default function mergeStepOutputs(context: AgentExecutionContext) {
  return {
    a: context.input.stepA,
    b: context.input.stepB
  }
}
Early Termination
Skip expensive operations when possible:
agents:
# Quick validation first
- name: quick-check
operation: code
config:
script: scripts/quick-spam-check
input:
text: ${input.text}
Copy
// scripts/quick-spam-check.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
/**
 * Cheap keyword-based spam gate run before the expensive AI moderation
 * step. Case-insensitive substring match on `input.text`.
 */
export default function quickSpamCheck(context: AgentExecutionContext) {
  const { text } = context.input
  const spam = text.toLowerCase().indexOf('viagra') !== -1
  return { spam }
}
Copy
# Only do expensive AI if not obvious spam
- name: ai-moderation
condition: ${!quick-check.output.spam}
operation: think
config:
provider: openai
model: gpt-4o-mini
prompt: Moderate: ${input.text}
output:
safe: ${quick-check.output.spam ? false : ai-moderation.output.safe}
reason: ${quick-check.output.spam ? 'spam detected' : ai-moderation.output.reason}
Advanced Patterns
Fan-Out/Fan-In
Process multiple items in parallel, then aggregate:
agents:
# Fan-out: Process each URL in parallel
- name: scrape-url-1
condition: ${input.urls[0]}
agent: scraper
inputs:
url: ${input.urls[0]}
- name: scrape-url-2
condition: ${input.urls[1]}
agent: scraper
inputs:
url: ${input.urls[1]}
- name: scrape-url-3
condition: ${input.urls[2]}
agent: scraper
inputs:
url: ${input.urls[2]}
# Fan-in: Aggregate results
- name: aggregate
operation: code
config:
script: scripts/aggregate-scrape-results
input:
scrape1: ${scrape-url-1.output}
scrape2: ${scrape-url-2.output}
scrape3: ${scrape-url-3.output}
Copy
// scripts/aggregate-scrape-results.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
/**
 * Fan-in for the scraper steps: gathers the three (possibly skipped)
 * scrape outputs and drops the ones that are `undefined`. Note that
 * `null` values are intentionally kept — only `undefined` is filtered.
 */
export default function aggregateScrapeResults(context: AgentExecutionContext) {
  const { scrape1, scrape2, scrape3 } = context.input
  const results: any[] = []
  for (const candidate of [scrape1, scrape2, scrape3]) {
    if (candidate !== undefined) {
      results.push(candidate)
    }
  }
  return { results }
}
Copy
output:
all_results: ${aggregate.output.results}
Pipeline with Validation
Copy
agents:
- name: fetch-data
agent: fetcher
inputs:
url: ${input.url}
- name: validate-schema
agent: validator
inputs:
data: ${fetch-data.output}
schema: data-schema
- name: transform
condition: ${validate-schema.output.valid}
operation: code
config:
script: scripts/transform-data
input:
data: ${fetch-data.output}
- name: validate-business-rules
condition: ${transform.executed}
agent: validator
inputs:
data: ${transform.output}
schema: business-rules
- name: store
condition: ${validate-business-rules.output.valid}
operation: data
config:
backend: d1
binding: DB
operation: execute
sql: INSERT INTO data (json) VALUES (?)
params: [${transform.output}]
output:
success: ${store.executed}
errors: ${validate-schema.output.errors || validate-business-rules.output.errors}
Circuit Breaker
Copy
agents:
- name: check-circuit
operation: storage
config:
type: kv
action: get
key: circuit-breaker-${input.service}
- name: call-service
condition: ${check-circuit.output.value !== 'open'}
operation: http
config:
url: ${input.service_url}
retry:
maxAttempts: 1
- name: update-circuit
operation: storage
config:
type: kv
action: put
key: circuit-breaker-${input.service}
value: ${call-service.failed ? 'open' : 'closed'}
expirationTtl: ${call-service.failed ? 60 : null}
- name: fallback
condition: ${call-service.failed || check-circuit.output.value === 'open'}
operation: storage
config:
type: kv
action: get
key: cached-${input.service}
output:
data: ${call-service.output || fallback.output}
circuit_open: ${check-circuit.output.value === 'open'}
Best Practices
- Parallel by Default - Only add dependencies when necessary
- Condition Expensive Operations - Skip work when possible
- Handle Failures - Always have fallbacks
- Use Retry Logic - For transient failures
- Early Termination - Fast checks before slow operations
- Cache Strategically - Cache expensive results
- Test Flow Paths - Test all conditional branches
- Monitor Performance - Track execution times

