Your First Ensemble

Ensembles orchestrate agents into workflows. Think of them as the conductor coordinating the orchestra. An agent does one thing. An ensemble combines many agents to accomplish complex tasks.

What’s an Ensemble?

An ensemble is a workflow that:
  • Orchestrates multiple agents
  • Controls flow (sequential, parallel, conditional)
  • Manages state across agents
  • Maps outputs to final results
Ensembles are where the magic happens: simple agents become powerful workflows.

Simple Ensemble

Let’s build a content moderation ensemble that analyzes text for safety. Create ensembles/moderate-content.yaml:
ensemble: moderate-content
description: Multi-step content moderation

agents:
  # Step 1: Check for explicit content
  - name: check-explicit
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Analyze this text for explicit content: "${input.text}"
        Return JSON: {"explicit": true/false, "confidence": 0-1, "reason": ""}

  # Step 2: Check for hate speech
  - name: check-hate
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Analyze this text for hate speech: "${input.text}"
        Return JSON: {"hate_speech": true/false, "confidence": 0-1, "reason": ""}

  # Step 3: Check for spam
  - name: check-spam
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Analyze this text for spam: "${input.text}"
        Return JSON: {"spam": true/false, "confidence": 0-1, "reason": ""}

  # Step 4: Aggregate results
  - name: aggregate
    operation: code
    config:
      code: |
        const explicit = ${check-explicit.output.explicit};
        const hate = ${check-hate.output.hate_speech};
        const spam = ${check-spam.output.spam};

        return {
          safe: !explicit && !hate && !spam,
          flags: {
            explicit,
            hate_speech: hate,
            spam
          },
          reasons: [
            ${check-explicit.output.reason},
            ${check-hate.output.reason},
            ${check-spam.output.reason}
          ].filter(r => r)
        };

output:
  safe: ${aggregate.output.safe}
  flags: ${aggregate.output.flags}
  reasons: ${aggregate.output.reasons}
Execute it:
const result = await conductor.execute('moderate-content', {
  text: 'Hello world! This is a friendly message.'
});

console.log(result);
Result:
{
  "safe": true,
  "flags": {
    "explicit": false,
    "hate_speech": false,
    "spam": false
  },
  "reasons": []
}

Parallel Execution

The previous example runs sequentially (check-explicit → check-hate → check-spam). Let’s run the checks in parallel:
ensemble: moderate-content-parallel

agents:
  # All checks run in parallel
  - name: check-explicit
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Analyze for explicit content: "${input.text}"
        Return JSON: {"explicit": true/false}

  - name: check-hate
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Analyze for hate speech: "${input.text}"
        Return JSON: {"hate_speech": true/false}

  - name: check-spam
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Analyze for spam: "${input.text}"
        Return JSON: {"spam": true/false}

  # Aggregate waits for all checks to complete
  - name: aggregate
    operation: code
    config:
      code: |
        return {
          safe: !${check-explicit.output.explicit} &&
                !${check-hate.output.hate_speech} &&
                !${check-spam.output.spam}
        };

output:
  safe: ${aggregate.output.safe}
Performance: Sequential ~1200ms → Parallel ~400ms (3x faster!). Conductor automatically runs agents in parallel when there are no dependencies between them.
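You can verify the speedup yourself by timing both variants (a minimal sketch; absolute timings will vary with provider latency):
const text = 'Hello world! This is a friendly message.';

for (const name of ['moderate-content', 'moderate-content-parallel']) {
  const start = Date.now();
  await conductor.execute(name, { text });
  console.log(`${name}: ${Date.now() - start}ms`);
}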

Conditional Flow

Only execute agents when conditions are met:
ensemble: smart-moderation

agents:
  # Quick spam check first
  - name: quick-spam-check
    operation: code
    config:
      code: |
        const text = "${input.text}".toLowerCase();
        const spamWords = ['viagra', 'casino', 'lottery'];
        return {
          spam: spamWords.some(word => text.includes(word))
        };

  # Only do expensive AI checks if not obvious spam
  - name: ai-moderation
    condition: ${!quick-spam-check.output.spam}
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Moderate this content: "${input.text}"
        Return JSON: {"safe": true/false, "reason": ""}

output:
  safe: ${quick-spam-check.output.spam ? false : ai-moderation.output.safe}
  spam_detected: ${quick-spam-check.output.spam}
  ai_reason: ${ai-moderation.output.reason}
Cost optimization: Skip expensive AI call when obvious spam is detected.
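To confirm the skip, inspect the per-agent metadata on the result (a sketch assuming the result.agents shape shown in the testing section below):
const result = await conductor.execute('smart-moderation', {
  text: 'Win the lottery now!'  // trips the cheap keyword check
});

console.log(result.output.spam_detected);              // true
console.log(result.agents['ai-moderation'].executed);  // false: the AI call never ran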

Using Custom Agents

Ensembles can use your custom agents:
ensemble: company-intelligence

agents:
  # Use custom company-enricher agent
  - name: enrich
    agent: company-enricher
    inputs:
      company_name: ${input.company}
      include_news: true

  # Use pre-built scraper agent
  - name: scrape-linkedin
    agent: scraper
    inputs:
      url: https://linkedin.com/company/${input.company}

  # Analyze combined data
  - name: analyze
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: |
        Analyze this company:

        Web Data: ${enrich.output.company_data}
        LinkedIn: ${scrape-linkedin.output.structured_data}
        Recent News: ${enrich.output.news}

        Provide:
        1. Company overview (2-3 sentences)
        2. Key strengths
        3. Notable challenges
        4. Investment outlook

output:
  company_data: ${enrich.output.company_data}
  analysis: ${analyze.output}
  sources:
    - ${enrich.output.source_url}
    - https://linkedin.com/company/${input.company}
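Executing it looks like any other ensemble (the company slug here is hypothetical):
const result = await conductor.execute('company-intelligence', {
  company: 'acme-corp'  // hypothetical slug; also used in the LinkedIn URL above
});

console.log(result.output.analysis);
console.log(result.output.sources);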

State Management

Share state between agents:
ensemble: stateful-processing

state:
  schema:
    processed_items: array
    total_count: number
    last_timestamp: number

agents:
  # Process item
  - name: process
    operation: code
    config:
      code: |
        const items = ${state.processed_items || []};
        const newItem = {
          id: "${input.item_id}",
          timestamp: Date.now()
        };
        return {
          items: [...items, newItem],
          count: items.length + 1
        };
    state:
      use: [processed_items]
      set:
        processed_items: ${process.output.items}
        total_count: ${process.output.count}
        last_timestamp: ${process.output.items[process.output.items.length - 1].timestamp}

  # Store in database
  - name: store
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO processed (item_id, count, timestamp)
        VALUES (?, ?, ?)
      params:
        - ${input.item_id}
        - ${state.total_count}
        - ${state.last_timestamp}

output:
  processed: true
  total_count: ${state.total_count}
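The accumulating processed_items list only makes sense if state persists across executions, so repeated calls should increment the count (a hedged sketch under that assumption; item IDs are hypothetical):
// Assumes ensemble state is shared across executions
await conductor.execute('stateful-processing', { item_id: 'item-1' });
const result = await conductor.execute('stateful-processing', { item_id: 'item-2' });

console.log(result.output.total_count);  // 2 if state persisted from the first call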

Error Handling & Fallbacks

Handle failures gracefully:
ensemble: resilient-api

agents:
  # Try primary API
  - name: try-primary
    operation: http
    config:
      url: https://primary-api.com/data
      method: GET
      headers:
        Authorization: Bearer ${env.PRIMARY_API_KEY}
    retry:
      maxAttempts: 2
      backoff: exponential

  # Fallback to secondary if primary fails
  - name: try-secondary
    condition: ${try-primary.failed}
    operation: http
    config:
      url: https://secondary-api.com/data
      method: GET
      headers:
        Authorization: Bearer ${env.SECONDARY_API_KEY}

  # Last resort: cached data
  - name: use-cache
    condition: ${try-primary.failed && try-secondary.failed}
    operation: storage
    config:
      type: kv
      action: get
      key: api-data-cache

output:
  data: ${try-primary.output.body || try-secondary.output.body || use-cache.output.value}
  source: |
    ${(() => {
      if (try-primary.executed && !try-primary.failed) return 'primary';
      if (try-secondary.executed && !try-secondary.failed) return 'secondary';
      return 'cache';
    })()}
  cache_fallback: ${use-cache.executed}
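Callers can branch on where the data came from (a minimal sketch):
const result = await conductor.execute('resilient-api', {});

if (result.output.source === 'cache') {
  // Cached data may be stale; surface that to the caller
  console.warn('Serving cached API data');
}
console.log(result.output.data);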

Caching Strategies

Cache at the ensemble level:
ensemble: expensive-analysis

cache:
  ttl: 3600  # Cache entire ensemble result for 1 hour
  key: analysis-${input.document_id}

agents:
  - name: fetch-document
    operation: storage
    config:
      type: r2
      action: get
      key: documents/${input.document_id}

  - name: analyze
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: |
        Analyze: ${fetch-document.output.text}

output:
  analysis: ${analyze.output}
  cached: ${__cache_hit}  # True if result came from cache
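Running the ensemble twice with the same document_id should hit the cache on the second call (a sketch; the document ID is hypothetical):
const first = await conductor.execute('expensive-analysis', { document_id: 'doc-123' });
console.log(first.output.cached);   // false: fresh run

const second = await conductor.execute('expensive-analysis', { document_id: 'doc-123' });
console.log(second.output.cached);  // true: served from cache within the 1-hour TTL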
Or cache individual agents:
ensemble: multi-source-analysis

agents:
  # Cache expensive scraping
  - name: scrape
    agent: scraper
    inputs:
      url: ${input.url}
    cache:
      ttl: 86400  # 24 hours
      key: scrape-${input.url}

  # Don't cache AI analysis (want fresh results)
  - name: analyze
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Analyze: ${scrape.output.text}

output:
  analysis: ${analyze.output}

A/B Testing

Test different implementations:
ensemble: ab-test-analysis

agents:
  # Variant A: GPT-4
  - name: analyze-a
    condition: ${input.user_id % 2 === 0}
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: ${input.text}

  # Variant B: Claude
  - name: analyze-b
    condition: ${input.user_id % 2 === 1}
    operation: think
    config:
      provider: anthropic
      model: claude-3-5-sonnet-20241022
      prompt: ${input.text}

  # Log for analysis
  - name: log-variant
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO ab_test_results (user_id, variant, timestamp)
        VALUES (?, ?, ?)
      params:
        - ${input.user_id}
        - ${analyze-a.executed ? 'A' : 'B'}
        - ${Date.now()}

output:
  analysis: ${analyze-a.output || analyze-b.output}
  variant: ${analyze-a.executed ? 'A' : 'B'}
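Because the variant is keyed on user_id parity, even IDs route to GPT-4o and odd IDs to Claude (a sketch with hypothetical user IDs):
const a = await conductor.execute('ab-test-analysis', { user_id: 42, text: 'Summarize our Q3 results.' });
console.log(a.output.variant);  // 'A' (even user_id)

const b = await conductor.execute('ab-test-analysis', { user_id: 7, text: 'Summarize our Q3 results.' });
console.log(b.output.variant);  // 'B' (odd user_id)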

Real-World Example: RAG Pipeline

Complete RAG (Retrieval-Augmented Generation) workflow:
ensemble: rag-qa

agents:
  # Generate embedding for query
  - name: embed-query
    operation: think
    config:
      provider: openai
      model: text-embedding-3-small
      input: ${input.question}

  # Search vector database
  - name: search-vectors
    operation: storage
    config:
      type: vectorize
      action: query
      vector: ${embed-query.output.embedding}
      topK: 5

  # Fetch full documents
  - name: fetch-docs
    operation: storage
    config:
      type: d1
      query: |
        SELECT content FROM documents
        WHERE id IN (${search-vectors.output.ids.join(',')})

  # Generate answer
  - name: generate-answer
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Context documents:
        ${fetch-docs.output.rows.map(r => r.content).join('\n\n')}

        Question: ${input.question}

        Answer the question using only the context provided.

  # Store interaction
  - name: log-qa
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO qa_log (question, answer, sources, timestamp)
        VALUES (?, ?, ?, ?)
      params:
        - ${input.question}
        - ${generate-answer.output}
        - ${JSON.stringify(search-vectors.output.ids)}
        - ${Date.now()}

output:
  answer: ${generate-answer.output}
  sources: ${search-vectors.output.ids}
  confidence: ${search-vectors.output.scores[0]}
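End to end, a call looks like this (the question is illustrative):
const result = await conductor.execute('rag-qa', {
  question: 'What is our refund policy?'  // illustrative question
});

console.log(result.output.answer);
console.log(result.output.sources);     // matching document IDs
console.log(result.output.confidence);  // top vector-match score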

Testing Ensembles

// ensembles/moderate-content.test.ts
import { describe, it, expect } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';

describe('moderate-content ensemble', () => {
  it('should pass safe content', async () => {
    const conductor = await TestConductor.create();
    await conductor.loadProject('./');

    const result = await conductor.execute('moderate-content', {
      text: 'Hello world! This is a friendly message.'
    });

    expect(result).toBeSuccessful();
    expect(result.output.safe).toBe(true);
  });

  it('should flag explicit content', async () => {
    const conductor = await TestConductor.create();
    await conductor.loadProject('./');

    const result = await conductor.execute('moderate-content', {
      text: 'This contains explicit content...'
    });

    expect(result).toBeSuccessful();
    expect(result.output.safe).toBe(false);
    expect(result.output.flags.explicit).toBe(true);
  });

  it('should execute checks in parallel', async () => {
    const conductor = await TestConductor.create();
    await conductor.loadProject('./');

    const result = await conductor.execute('moderate-content-parallel', {
      text: 'Test message'
    });

    // Check parallel execution
    expect(result.agents['check-explicit'].executed).toBe(true);
    expect(result.agents['check-hate'].executed).toBe(true);
    expect(result.agents['check-spam'].executed).toBe(true);

    // All should start at roughly the same time
    const startTimes = [
      result.agents['check-explicit'].startTime,
      result.agents['check-hate'].startTime,
      result.agents['check-spam'].startTime
    ];

    const timeSpread = Math.max(...startTimes) - Math.min(...startTimes);
    expect(timeSpread).toBeLessThan(100); // Within 100ms
  });
});

Best Practices

  1. Start Simple - One agent, then add complexity
  2. Parallel by Default - Conductor parallelizes automatically
  3. Condition Expensive Operations - Skip work when possible
  4. Cache Aggressively - Cache slow operations (AI, HTTP, DB)
  5. Handle Failures - Always have fallbacks
  6. Log Everything - Store metrics for monitoring
  7. Test Thoroughly - Unit test each agent, integration test ensembles
  8. Version Components - Use Edgit to version prompts and configs

Next Steps