Skip to main content
Agents do work. Ensembles coordinate agents.

Ensemble Structure

# ensembles/company-intelligence.yaml
ensemble: company-intelligence
description: Gather and analyze company data

agents:
  # Step 1: Enrich company data
  - name: enricher
    agent: company-enricher
    inputs:
      company_name: ${input.company}
      include_news: true

  # Step 2: Scrape LinkedIn
  - name: scrape-linkedin
    agent: scraper
    inputs:
      url: https://linkedin.com/company/${input.company}

  # Step 3: Analyze combined data
  - name: analyze
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: |
        Analyze this company:

        Data: ${enricher.output.company_data}
        LinkedIn: ${scrape-linkedin.output}
        News: ${enricher.output.news}

        Provide: overview, strengths, challenges, outlook

output:
  company_data: ${enricher.output.company_data}
  linkedin: ${scrape-linkedin.output}
  analysis: ${analyze.output}

Key Concepts

Agents in Ensembles

Ensembles orchestrate agents:
agents:
  # Custom agent
  - name: enricher
    agent: company-enricher
    inputs:
      company_name: ${input.company}

  # Pre-built agent
  - name: scrape
    agent: scraper
    inputs:
      url: ${input.url}

  # Inline operation (no separate agent file)
  - name: transform
    operation: code
    config:
      script: scripts/process-enricher-output
    input:
      enricherOutput: ${enricher.output}
// scripts/process-enricher-output.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function processEnricherOutput(context: AgentExecutionContext) {
  const { enricherOutput } = context.input
  return { processed: enricherOutput }
}

Input/Output Mapping

Input: Data passed to ensemble
const result = await conductor.execute('company-intelligence', {
  company: 'Anthropic'
});
Access in agents:
agents:
  - name: enricher
    agent: company-enricher
    inputs:
      company_name: ${input.company}  # From execution input
Output: Shape the final result
output:
  company: ${enricher.output.company_data}
  analysis: ${analyze.output}
  sources: [${enricher.output.source}, 'linkedin.com']

Flow Control

Sequential Execution

Agents run in order by default:
agents:
  - name: step1
    agent: fetcher
    inputs:
      url: ${input.url}

  - name: step2
    operation: think
    config:
      prompt: Analyze: ${step1.output}

  - name: step3
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: INSERT INTO results (data) VALUES (?)
      params: [${step2.output}]

Parallel Execution

Agents without dependencies run in parallel:
agents:
  # These 3 run in parallel
  - name: fetch-a
    agent: fetcher
    inputs:
      url: https://api-a.com

  - name: fetch-b
    agent: fetcher
    inputs:
      url: https://api-b.com

  - name: fetch-c
    agent: fetcher
    inputs:
      url: https://api-c.com

  # This waits for all 3
  - name: merge
    operation: code
    config:
      script: scripts/merge-fetch-results
    input:
      fetchA: ${fetch-a.output}
      fetchB: ${fetch-b.output}
      fetchC: ${fetch-c.output}
// scripts/merge-fetch-results.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function mergeFetchResults(context: AgentExecutionContext) {
  const { fetchA, fetchB, fetchC } = context.input
  return {
    a: fetchA,
    b: fetchB,
    c: fetchC
  }
}

Conditional Execution

Skip agents based on conditions:
agents:
  - name: check-cache
    operation: storage
    config:
      type: kv
      action: get
      key: result-${input.query}

  # Only run if cache miss
  - name: generate
    condition: ${check-cache.output.value === null}
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: ${input.query}

  # Only run if generated
  - name: save-cache
    condition: ${generate.executed}
    operation: storage
    config:
      type: kv
      action: put
      key: result-${input.query}
      value: ${generate.output}

output:
  result: ${check-cache.output.value || generate.output}
  from_cache: ${!generate.executed}

State Management

Share state across agents:
ensemble: stateful-workflow

state:
  schema:
    processed_items: array
    total_count: number

agents:
  - name: process
    operation: code
    config:
      script: scripts/process-with-state
    input:
      processedItems: ${state.processed_items || []}
      item: ${input.item}
// scripts/process-with-state.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function processWithState(context: AgentExecutionContext) {
  const { processedItems, item } = context.input
  return {
    items: [...processedItems, item],
    count: processedItems.length + 1
  }
}
    state:
      use: [processed_items]
      set:
        processed_items: ${process.output.items}
        total_count: ${process.output.count}

output:
  count: ${state.total_count}
  items: ${state.processed_items}

Ensemble Caching

Cache entire ensemble results:
ensemble: expensive-analysis

cache:
  ttl: 3600  # 1 hour
  key: analysis-${input.document_id}

agents:
  # ... agents ...

output:
  result: ${analyze.output}
  cached: ${__cache_hit}  # True if from cache

Error Handling

Fallbacks

agents:
  - name: try-primary
    agent: fetcher
    inputs:
      url: https://primary-api.com
    retry:
      maxAttempts: 2

  - name: try-secondary
    condition: ${try-primary.failed}
    agent: fetcher
    inputs:
      url: https://backup-api.com

  - name: use-cache
    condition: ${try-primary.failed && try-secondary.failed}
    operation: storage
    config:
      type: kv
      action: get
      key: cached-data

output:
  data: ${try-primary.output || try-secondary.output || use-cache.output}
  source: ${try-primary.executed ? 'primary' : try-secondary.executed ? 'secondary' : 'cache'}

Validation

agents:
  - name: process-data
    agent: processor
    inputs:
      data: ${input.data}

  - name: validate
    agent: validator
    inputs:
      data: ${process-data.output}

  - name: store
    condition: ${validate.output.valid}
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: INSERT INTO results (data) VALUES (?)
      params: [${process-data.output}]

output:
  success: ${validate.output.valid}
  data: ${process-data.output}
  errors: ${validate.output.errors}

Conditional Output Blocks

Control HTTP response status codes, headers, and body formats based on execution results. Conditional outputs allow you to implement proper REST API semantics, handle errors gracefully, and support multiple response formats.

Basic Conditional Outputs

Use when conditions to select the appropriate response:
ensemble: user-lookup

trigger:
  - type: http
    path: /api/users/:id
    methods: [GET]
    public: true

agents:
  - name: fetch-user
    operation: data
    config:
      backend: d1
      binding: DB
      query: "SELECT * FROM users WHERE id = ?"
      params: [${input.params.id}]

output:
  # Success case - user found
  - when: ${fetch-user.output.length > 0}
    status: 200
    body:
      data: ${fetch-user.output[0]}
      success: true

  # Not found case
  - when: ${fetch-user.output.length === 0}
    status: 404
    body:
      error: 'not_found'
      message: 'User not found'

  # Default fallback (no when = always matches)
  - status: 500
    body:
      error: 'unknown'
      message: 'An unexpected error occurred'
Key concepts:
  • Conditions are evaluated in order
  • First matching when condition is used
  • Block without when acts as default fallback
  • Always provide a fallback for safety

Custom Status Codes

Support all HTTP status codes (2xx, 3xx, 4xx, 5xx):
output:
  # Success
  - when: ${create-user.output.created}
    status: 201
    body:
      id: ${create-user.output.id}
      message: 'User created successfully'

  # Validation error
  - when: ${validate.output.errors}
    status: 422
    body:
      error: 'validation_failed'
      details: ${validate.output.errors}

  # Conflict (duplicate)
  - when: ${create-user.output.duplicate}
    status: 409
    body:
      error: 'conflict'
      message: 'User already exists'

  # Rate limited
  - when: ${rate-limit.output.exceeded}
    status: 429
    body:
      error: 'rate_limit_exceeded'
      retry_after: ${rate-limit.output.retryAfter}

  # Server error
  - when: ${create-user.failed}
    status: 500
    body:
      error: 'internal_error'

HTTP Redirects

Implement redirects with custom status codes:
ensemble: url-shortener

trigger:
  - type: http
    path: /:shortcode
    methods: [GET]
    public: true

agents:
  - name: lookup-url
    operation: storage
    config:
      type: kv
      action: get
      key: url-${input.params.shortcode}

output:
  # Permanent redirect (301)
  - when: ${lookup-url.output.value && lookup-url.output.permanent}
    redirect:
      url: ${lookup-url.output.value}
      status: 301

  # Temporary redirect (302)
  - when: ${lookup-url.output.value}
    redirect:
      url: ${lookup-url.output.value}
      status: 302

  # Not found
  - when: ${!lookup-url.output.value}
    status: 404
    body:
      error: 'not_found'
      message: 'Short URL not found'
Redirect status codes:
  • 301 - Permanent redirect (browsers cache)
  • 302 - Temporary redirect (default)
  • 303 - See Other (POST → GET)
  • 307 - Temporary redirect (preserve method)
  • 308 - Permanent redirect (preserve method)

Custom Headers

Add custom headers to responses:
output:
  # JSON response with custom headers
  - when: ${format === 'json'}
    status: 200
    headers:
      Content-Type: application/json
      X-API-Version: '2.0'
      X-Rate-Limit-Remaining: ${rate-limit.output.remaining}
    body:
      data: ${process.output}

  # YAML response using format field
  - when: ${format === 'yaml'}
    status: 200
    headers:
      X-Generated-At: ${new Date().toISOString()}
    format:
      type: yaml
      extract: config
    body:
      config: ${yamlContent}

  # CSV download using format field
  - when: ${format === 'csv'}
    status: 200
    headers:
      Content-Disposition: 'attachment; filename="export.csv"'
    format:
      type: csv
      extract: data
    body:
      data: ${csvContent}

Response Formats

Use the format field to automatically serialize responses and set Content-Type headers:
TypeContent-TypeDescription
jsonapplication/jsonJSON serialization (default)
texttext/plainPlain text
htmltext/htmlHTML content
xmlapplication/xmlXML content
csvtext/csvCSV serialization from arrays
markdowntext/markdownMarkdown content
yamlapplication/x-yamlYAML serialization
icstext/calendariCalendar format
rssapplication/rss+xmlRSS feed
atomapplication/atom+xmlAtom feed
ensemble: data-export

agents:
  - name: fetch-data
    operation: data
    config:
      backend: d1
      binding: DB
      query: "SELECT * FROM exports WHERE id = ?"
      params: [${input.id}]

output:
  # CSV export - auto-serializes array to CSV
  - when: ${input.format === 'csv'}
    status: 200
    headers:
      Content-Disposition: 'attachment; filename="export.csv"'
    format:
      type: csv
      extract: records
    body:
      records: ${fetch-data.output}

  # YAML export - auto-serializes object to YAML
  - when: ${input.format === 'yaml'}
    status: 200
    format:
      type: yaml
      extract: data
    body:
      data: ${fetch-data.output}

  # JSON response (default)
  - status: 200
    body:
      data: ${fetch-data.output}
Format Options:
# Simple format (just type)
format: csv

# Format with extraction
format:
  type: csv
  extract: users  # Extract 'users' field from body for serialization
When to use format:
  • Non-JSON content (XML, CSV, YAML, plain text)
  • File downloads (CSV, iCalendar)
  • API documentation formats (RSS, Atom)
  • Automatic Content-Type handling
When to use body (default JSON):
  • JSON API responses
  • Object/array data structures
  • Standard REST APIs

Complex Example: RESTful CRUD

Complete example with multiple status codes and error handling:
ensemble: products-api

trigger:
  - type: http
    paths:
      - path: /api/products/:id
        methods: [GET, PUT, DELETE]
    auth:
      type: bearer
      secret: ${env.API_KEY}

agents:
  - name: validate-auth
    operation: code
    config:
      handler: |
        const token = context.input.headers.authorization
        return { valid: token === process.env.API_KEY }

  - name: fetch-product
    condition: ${validate-auth.output.valid}
    operation: data
    config:
      backend: d1
      binding: DB
      query: "SELECT * FROM products WHERE id = ?"
      params: [${input.params.id}]

  - name: update-product
    condition: ${input.method === 'PUT' && fetch-product.output.length > 0}
    operation: data
    config:
      backend: d1
      binding: DB
      query: "UPDATE products SET name = ?, price = ? WHERE id = ?"
      params: [${input.body.name}, ${input.body.price}, ${input.params.id}]

  - name: delete-product
    condition: ${input.method === 'DELETE' && fetch-product.output.length > 0}
    operation: data
    config:
      backend: d1
      binding: DB
      query: "DELETE FROM products WHERE id = ?"
      params: [${input.params.id}]

output:
  # Unauthorized
  - when: ${!validate-auth.output.valid}
    status: 401
    body:
      error: 'unauthorized'
      message: 'Invalid or missing authentication token'

  # GET success
  - when: ${input.method === 'GET' && fetch-product.output.length > 0}
    status: 200
    headers:
      Cache-Control: 'max-age=300'
    body:
      data: ${fetch-product.output[0]}

  # PUT success
  - when: ${input.method === 'PUT' && update-product.executed}
    status: 200
    body:
      message: 'Product updated successfully'
      data: ${fetch-product.output[0]}

  # DELETE success
  - when: ${input.method === 'DELETE' && delete-product.executed}
    status: 204
    body: null

  # Not found
  - when: ${fetch-product.output.length === 0}
    status: 404
    body:
      error: 'not_found'
      message: 'Product not found'

  # Server error
  - status: 500
    body:
      error: 'internal_error'
      message: 'An unexpected error occurred'

Best Practices

  1. Always provide a fallback - Include a default output block without when
  2. Order matters - Place more specific conditions first
  3. Use appropriate status codes - Follow HTTP semantics
  4. Include error details - Help clients debug issues
  5. Set proper headers - Content-Type, Cache-Control, etc.
  6. Handle authentication - Return 401/403 for auth failures
  7. Validate input - Return 422 for validation errors
  8. Use format for non-JSON - Use the format field for CSV, XML, YAML, etc.

Real-World Patterns

RAG Pipeline

ensemble: rag-qa

agents:
  - name: embed-query
    operation: think
    config:
      provider: openai
      model: text-embedding-3-small
      input: ${input.question}

  - name: search
    operation: data
    config:
      backend: vectorize
      binding: VECTORIZE
      operation: query
      vector: ${embed-query.output}
      topK: 5

  - name: fetch-docs
    operation: data
    config:
      backend: d1
      binding: DB
      operation: query
      sql: SELECT content FROM docs WHERE id IN (?)
      params: [${search.output.ids}]

  - name: generate-answer
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Context: ${fetch-docs.output.rows.map(r => r.content).join('\n')}
        Question: ${input.question}
        Answer using only the context.

output:
  answer: ${generate-answer.output}
  sources: ${search.output.ids}

Multi-Agent Analysis

ensemble: multi-agent-analysis

agents:
  # Extract entities
  - name: extract-entities
    agent: entity-extractor
    inputs:
      text: ${input.text}

  # Analyze sentiment
  - name: analyze-sentiment
    agent: sentiment-analyzer
    inputs:
      text: ${input.text}

  # Generate summary
  - name: summarize
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: Summarize: ${input.text}

  # Synthesize results
  - name: synthesize
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: |
        Entities: ${extract-entities.output}
        Sentiment: ${analyze-sentiment.output}
        Summary: ${summarize.output}

        Provide comprehensive analysis.

output:
  entities: ${extract-entities.output}
  sentiment: ${analyze-sentiment.output}
  summary: ${summarize.output}
  analysis: ${synthesize.output}

HITL Approval Flow

ensemble: content-approval

agents:
  - name: generate-content
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: Generate content for: ${input.topic}

  - name: request-approval
    agent: hitl
    inputs:
      prompt: Review this content
      context:
        content: ${generate-content.output}
        topic: ${input.topic}

  - name: publish
    condition: ${request-approval.output.approved}
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: INSERT INTO published (content) VALUES (?)
      params: [${generate-content.output}]

output:
  content: ${generate-content.output}
  approved: ${request-approval.output.approved}
  published: ${publish.executed}

A/B Testing in Ensembles

Test different variants:
ensemble: ab-test-analysis

agents:
  # Variant A: GPT-4
  - name: analyze-a
    condition: ${input.user_id % 2 === 0}
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: ${input.text}

  # Variant B: Claude
  - name: analyze-b
    condition: ${input.user_id % 2 === 1}
    operation: think
    config:
      provider: anthropic
      model: claude-3-5-sonnet-20241022
      prompt: ${input.text}

  # Log result
  - name: log-variant
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: INSERT INTO ab_tests (user_id, variant) VALUES (?, ?)
      params:
        - ${input.user_id}
        - ${analyze-a.executed ? 'A' : 'B'}

output:
  analysis: ${analyze-a.output || analyze-b.output}
  variant: ${analyze-a.executed ? 'A' : 'B'}

TypeScript Ensembles

For developers who prefer TypeScript, you can create ensembles programmatically with full type safety:

Basic Example

import { createEnsemble, step } from '@anthropic/conductor'

const companyIntelligence = createEnsemble('company-intelligence')
  .setDescription('Gather and analyze company data')
  .addStep(
    step('enricher')
      .agent('company-enricher')
      .input({
        company_name: '${input.company}',
        include_news: true
      })
  )
  .addStep(
    step('scrape-linkedin')
      .agent('scraper')
      .input({ url: 'https://linkedin.com/company/${input.company}' })
  )
  .addStep(
    step('analyze')
      .operation('think')
      .config({
        provider: 'openai',
        model: 'gpt-4o',
        prompt: `
          Analyze this company:
          Data: \${enricher.output.company_data}
          LinkedIn: \${scrape-linkedin.output}
          News: \${enricher.output.news}
        `
      })
  )
  .build()

export default companyIntelligence

Flow Control in TypeScript

TypeScript ensembles support advanced flow control primitives:
import { createEnsemble, step, parallel, branch, tryStep } from '@anthropic/conductor'

const resilientWorkflow = createEnsemble('resilient-workflow')
  // Parallel fetching
  .addStep(
    parallel('fetch-sources')
      .steps(
        step('api-a').agent('fetcher').input({ url: '${input.urlA}' }),
        step('api-b').agent('fetcher').input({ url: '${input.urlB}' })
      )
  )
  // Conditional routing
  .addStep(
    branch('check-priority')
      .condition('${input.priority === "high"}')
      .then(step('fast-process').agent('fast-processor'))
      .else(step('standard-process').agent('standard-processor'))
  )
  // Error handling
  .addStep(
    tryStep('safe-store')
      .try(step('store').agent('db-writer'))
      .catch(step('fallback').agent('cache-writer'))
  )
  .build()
For complete TypeScript API documentation, see the TypeScript API Reference.

Best Practices

  1. Start Simple - One agent, then add complexity
  2. Parallel by Default - Conductor parallelizes automatically
  3. Cache Strategically - Cache expensive operations
  4. Handle Failures - Always have fallbacks
  5. Use State Sparingly - Only when truly needed
  6. Test Thoroughly - Integration test ensembles
  7. Monitor Performance - Track execution times
  8. Version Components - Use Edgit for prompts/configs

Next Steps