Ensembles

Ensembles orchestrate agents into workflows. They are the top level of composition, where you combine agents to accomplish complex tasks: agents do the work; ensembles coordinate it.

Ensemble Structure

# ensembles/company-intelligence.yaml
ensemble: company-intelligence
description: Gather and analyze company data

agents:
  # Step 1: Enrich company data
  - name: enricher
    agent: company-enricher
    inputs:
      company_name: ${input.company}
      include_news: true

  # Step 2: Scrape LinkedIn
  - name: scrape-linkedin
    agent: scraper
    inputs:
      url: https://linkedin.com/company/${input.company}

  # Step 3: Analyze combined data
  - name: analyze
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: |
        Analyze this company:

        Data: ${enricher.output.company_data}
        LinkedIn: ${scrape-linkedin.output}
        News: ${enricher.output.news}

        Provide: overview, strengths, challenges, outlook

output:
  company_data: ${enricher.output.company_data}
  linkedin: ${scrape-linkedin.output}
  analysis: ${analyze.output}
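
Running the ensemble from application code is a single call; a minimal sketch, assuming the returned object mirrors the output block above:
const result = await conductor.execute('company-intelligence', {
  company: 'Anthropic'
});

// Fields follow the ensemble's output block (assumed shape)
console.log(result.company_data);
console.log(result.linkedin);
console.log(result.analysis);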

Key Concepts

Agents in Ensembles

An ensemble can reference custom agents, pre-built agents, and inline operations:
agents:
  # Custom agent
  - name: enricher
    agent: company-enricher
    inputs:
      company_name: ${input.company}

  # Pre-built agent
  - name: scrape
    agent: scraper
    inputs:
      url: ${input.url}

  # Inline operation (no separate agent file)
  - name: transform
    operation: code
    config:
      code: |
        return { processed: ${enricher.output} };

Input/Output Mapping

Input: Data passed to the ensemble when it is executed
const result = await conductor.execute('company-intelligence', {
  company: 'Anthropic'
});
Access in agents:
agents:
  - name: enricher
    agent: company-enricher
    inputs:
      company_name: ${input.company}  # From execution input
Output: Shapes the final result returned to the caller
output:
  company: ${enricher.output.company_data}
  analysis: ${analyze.output}
  sources: [${enricher.output.source}, 'linkedin.com']

Flow Control

Sequential Execution

Agents run sequentially when each step depends on the previous step's output:
agents:
  - name: step1
    agent: fetcher
    inputs:
      url: ${input.url}

  - name: step2
    operation: think
    config:
      prompt: Analyze: ${step1.output}

  - name: step3
    operation: storage
    config:
      type: d1
      query: INSERT INTO results (data) VALUES (?)
      params: [${step2.output}]

Parallel Execution

Agents without dependencies run in parallel:
agents:
  # These 3 run in parallel
  - name: fetch-a
    agent: fetcher
    inputs:
      url: https://api-a.com

  - name: fetch-b
    agent: fetcher
    inputs:
      url: https://api-b.com

  - name: fetch-c
    agent: fetcher
    inputs:
      url: https://api-c.com

  # This waits for all 3
  - name: merge
    operation: code
    config:
      code: |
        return {
          a: ${fetch-a.output},
          b: ${fetch-b.output},
          c: ${fetch-c.output}
        };
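
A rough way to observe the fan-out is to time the run. A sketch, assuming the fragment above is saved as a hypothetical ensemble named parallel-fetch:
const start = Date.now();
const result = await conductor.execute('parallel-fetch', {});

// If fetch-a/b/c ran sequentially, elapsed time would be roughly the sum of
// their latencies; with concurrent scheduling it approaches the slowest one.
console.log(`merged in ${Date.now() - start}ms`, result);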

Conditional Execution

Skip agents based on conditions:
agents:
  - name: check-cache
    operation: storage
    config:
      type: kv
      action: get
      key: result-${input.query}

  # Only run if cache miss
  - name: generate
    condition: ${check-cache.output.value === null}
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: ${input.query}

  # Only run if generated
  - name: save-cache
    condition: ${generate.executed}
    operation: storage
    config:
      type: kv
      action: put
      key: result-${input.query}
      value: ${generate.output}

output:
  result: ${check-cache.output.value || generate.output}
  from_cache: ${!generate.executed}

State Management

Share state across agents:
ensemble: stateful-workflow

state:
  schema:
    processed_items: array
    total_count: number

agents:
  - name: process
    operation: code
    config:
      code: |
        const items = ${state.processed_items || []};
        return {
          items: [...items, ${input.item}],
          count: items.length + 1
        };
    state:
      use: [processed_items]
      set:
        processed_items: ${process.output.items}
        total_count: ${process.output.count}

output:
  count: ${state.total_count}
  items: ${state.processed_items}

Ensemble Caching

Cache entire ensemble results:
ensemble: expensive-analysis

cache:
  ttl: 3600  # 1 hour
  key: analysis-${input.document_id}

agents:
  # ... agents ...

output:
  result: ${analyze.output}
  cached: ${__cache_hit}  # True if from cache
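
From the caller's side the cache looks like this; a sketch, assuming the result exposes the cached flag mapped above:
// First call runs the agents and stores the result for one hour
const first = await conductor.execute('expensive-analysis', { document_id: 'doc-42' });
console.log(first.cached);  // expected: false

// Second call with the same document_id should be served from cache
const second = await conductor.execute('expensive-analysis', { document_id: 'doc-42' });
console.log(second.cached); // expected: true (within the 3600s TTL)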

Error Handling

Fallbacks

agents:
  - name: try-primary
    agent: fetcher
    inputs:
      url: https://primary-api.com
    retry:
      maxAttempts: 2

  - name: try-secondary
    condition: ${try-primary.failed}
    agent: fetcher
    inputs:
      url: https://backup-api.com

  - name: use-cache
    condition: ${try-primary.failed && try-secondary.failed}
    operation: storage
    config:
      type: kv
      action: get
      key: cached-data

output:
  data: ${try-primary.output || try-secondary.output || use-cache.output}
  source: ${try-primary.executed ? 'primary' : try-secondary.executed ? 'secondary' : 'cache'}
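
The source field makes the fallback path observable from the caller; a sketch, assuming the fragment above is saved as a hypothetical ensemble named resilient-fetch:
const result = await conductor.execute('resilient-fetch', {});

if (result.source !== 'primary') {
  // The primary API failed after its retries; worth alerting on
  console.warn(`served from fallback: ${result.source}`);
}
console.log(result.data);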

Validation

agents:
  - name: process-data
    agent: processor
    inputs:
      data: ${input.data}

  - name: validate
    agent: validator
    inputs:
      data: ${process-data.output}

  - name: store
    condition: ${validate.output.valid}
    operation: storage
    config:
      type: d1
      query: INSERT INTO results (data) VALUES (?)
      params: [${process-data.output}]

output:
  success: ${validate.output.valid}
  data: ${process-data.output}
  errors: ${validate.output.errors}

Real-World Patterns

RAG Pipeline

ensemble: rag-qa

agents:
  - name: embed-query
    operation: think
    config:
      provider: openai
      model: text-embedding-3-small
      input: ${input.question}

  - name: search
    operation: storage
    config:
      type: vectorize
      action: query
      vector: ${embed-query.output}
      topK: 5

  - name: fetch-docs
    operation: storage
    config:
      type: d1
      query: SELECT content FROM docs WHERE id IN (?)
      params: [${search.output.ids}]

  - name: generate-answer
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Context: ${fetch-docs.output.rows.map(r => r.content).join('\n')}
        Question: ${input.question}
        Answer using only the context.

output:
  answer: ${generate-answer.output}
  sources: ${search.output.ids}
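
Calling the pipeline is a single execute; a sketch, assuming the answer and source IDs come back as mapped in the output block:
const result = await conductor.execute('rag-qa', {
  question: 'What does the onboarding checklist cover?'
});

console.log(result.answer);   // grounded answer from gpt-4o-mini
console.log(result.sources);  // IDs returned by the Vectorize query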

Multi-Agent Analysis

ensemble: multi-agent-analysis

agents:
  # Extract entities
  - name: extract-entities
    agent: entity-extractor
    inputs:
      text: ${input.text}

  # Analyze sentiment
  - name: analyze-sentiment
    agent: sentiment-analyzer
    inputs:
      text: ${input.text}

  # Generate summary
  - name: summarize
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: Summarize: ${input.text}

  # Synthesize results
  - name: synthesize
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: |
        Entities: ${extract-entities.output}
        Sentiment: ${analyze-sentiment.output}
        Summary: ${summarize.output}

        Provide comprehensive analysis.

output:
  entities: ${extract-entities.output}
  sentiment: ${analyze-sentiment.output}
  summary: ${summarize.output}
  analysis: ${synthesize.output}

HITL Approval Flow

ensemble: content-approval

agents:
  - name: generate-content
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: Generate content for: ${input.topic}

  - name: request-approval
    agent: hitl
    inputs:
      prompt: Review this content
      context:
        content: ${generate-content.output}
        topic: ${input.topic}

  - name: publish
    condition: ${request-approval.output.approved}
    operation: storage
    config:
      type: d1
      query: INSERT INTO published (content) VALUES (?)
      params: [${generate-content.output}]

output:
  content: ${generate-content.output}
  approved: ${request-approval.output.approved}
  published: ${publish.executed}

A/B Testing in Ensembles

Test different variants:
ensemble: ab-test-analysis

agents:
  # Variant A: GPT-4o
  - name: analyze-a
    condition: ${input.user_id % 2 === 0}
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: ${input.text}

  # Variant B: Claude
  - name: analyze-b
    condition: ${input.user_id % 2 === 1}
    operation: think
    config:
      provider: anthropic
      model: claude-3-5-sonnet-20241022
      prompt: ${input.text}

  # Log result
  - name: log-variant
    operation: storage
    config:
      type: d1
      query: INSERT INTO ab_tests (user_id, variant) VALUES (?, ?)
      params:
        - ${input.user_id}
        - ${analyze-a.executed ? 'A' : 'B'}

output:
  analysis: ${analyze-a.output || analyze-b.output}
  variant: ${analyze-a.executed ? 'A' : 'B'}
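
Because assignment hinges on user_id parity, the split is deterministic; a sketch, assuming the output block above:
// Even user IDs hit variant A (gpt-4o), odd IDs hit variant B (Claude)
const even = await conductor.execute('ab-test-analysis', { user_id: 2, text: 'Q3 earnings call transcript' });
const odd  = await conductor.execute('ab-test-analysis', { user_id: 3, text: 'Q3 earnings call transcript' });

console.log(even.variant); // 'A'
console.log(odd.variant);  // 'B'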

Best Practices

  1. Start Simple - One agent, then add complexity
  2. Parallel by Default - Conductor parallelizes automatically
  3. Cache Strategically - Cache expensive operations
  4. Handle Failures - Always have fallbacks
  5. Use State Sparingly - Only when truly needed
  6. Test Thoroughly - Integration test ensembles end-to-end (see the sketch after this list)
  7. Monitor Performance - Track execution times
  8. Version Components - Use Edgit for prompts/configs
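
For item 6, an integration test can exercise a whole ensemble; a minimal sketch using Node's built-in test runner, assuming a configured conductor client like the one used throughout this page:
import { test } from 'node:test';
import assert from 'node:assert/strict';

// `conductor` is assumed to be a configured client; its construction is omitted here

test('company-intelligence produces an analysis', async () => {
  const result = await conductor.execute('company-intelligence', {
    company: 'Anthropic'
  });

  // Assert on the shape of the output block rather than on exact model text
  assert.ok(result.company_data);
  assert.ok(result.analysis);
});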

Next Steps