Skip to main content

What are Ensembles?

In Conductor’s musical metaphor, ensembles are the sheet music that coordinate your members (musicians) into a harmonious workflow. They’re YAML files that define what happens, when, and in what order.
Why “Ensembles” instead of “Workflows”?While ensembles ARE workflows, we use this term to:
  • Reinforce the musical metaphor (members = musicians, ensembles = sheet music, Conductor = orchestrator)
  • Distinguish from traditional workflow engines and DAG builders
  • Emphasize the composable, harmonic nature of coordinating multiple members
Throughout this documentation, you’ll see “ensemble” and “workflow” used interchangeably—they mean the same thing in Conductor.

Basic Structure

An ensemble defines a workflow with members, flow, and optional features:
# ensembles/company-intelligence.yaml
name: company-intelligence
description: Gather and analyze company data

# Optional: Shared state between members
state:
  schema:
    companyData: object
    analysis: object

# The workflow
flow:
  - member: fetch-company-data
    input:
      domain: ${input.domain}
    state:
      set: [companyData]

  - member: analyze-company
    state:
      use: [companyData]
      set: [analysis]

# Optional: Map outputs
output:
  company: ${fetch-company-data.output.company}
  analysis: ${analyze-company.output.analysis}

Ensemble Components

Name and Description

name: my-ensemble
description: A clear description of what this ensemble does
  • name: Unique identifier (lowercase, hyphens allowed)
  • description: Human-readable explanation

Flow

The heart of every ensemble - defines the sequence of member executions:
flow:
  - member: step-one
    input:
      data: ${input.data}

  - member: step-two
    input:
      result: ${step-one.output.result}
Flow features:
  • Sequential execution by default
  • Parallel execution supported
  • Conditional branching
  • Loops and iteration
  • Error handling

Input

Data passed to the ensemble:
# Execution
const result = await executor.executeEnsemble(ensemble, {
  domain: 'acme.com',
  options: { detailed: true }
});
Access in flow:
flow:
  - member: process-data
    input:
      domain: ${input.domain}
      detailed: ${input.options.detailed}

Output

Shape the final result:
output:
  summary: ${analyze.output.summary}
  score: ${evaluate.output.score}
  metadata:
    processedAt: ${analyze.output.timestamp}
    version: "1.0.0"
Without output mapping:
// Returns all member outputs
{
  "fetch-data": { success: true, data: {...} },
  "analyze": { success: true, data: {...} }
}
With output mapping:
// Returns only mapped fields
{
  summary: "...",
  score: 0.95,
  metadata: { processedAt: "...", version: "1.0.0" }
}

Shared State

Enable members to share data without prop drilling:
state:
  schema:
    companyData:
      type: object
      properties:
        name: string
        domain: string
    analysis: object

flow:
  - member: fetch-data
    state:
      set: [companyData]  # Writes to state

  - member: analyze
    state:
      use: [companyData]  # Reads from state
      set: [analysis]     # Writes analysis

  - member: generate-report
    state:
      use: [companyData, analysis]  # Reads both
Benefits:
  • No prop drilling through multiple members
  • Selective access (members only see declared keys)
  • Access tracking for optimization
  • Type-safe with schema validation
Learn more: State Management

Flow Patterns

Sequential Execution

Default behavior - one member after another:
flow:
  - member: step-1
  - member: step-2
  - member: step-3

Parallel Execution

Execute multiple members simultaneously:
flow:
  - parallel:
      - member: fetch-financials
      - member: fetch-news
      - member: fetch-competitors

  - member: combine-results
    input:
      financials: ${fetch-financials.output}
      news: ${fetch-news.output}
      competitors: ${fetch-competitors.output}

Conditional Branching

Execute based on conditions:
flow:
  - member: check-quality

  - branch:
      if: ${check-quality.output.score} > 0.8
      then:
        - member: approve-automatically
      else:
        - member: request-human-review

Loops

Iterate over data:
flow:
  - foreach:
      items: ${input.companies}
      concurrency: 3
      steps:
        - member: analyze-company
          input:
            company: ${item}

Error Handling

Catch and handle errors:
flow:
  - try:
      - member: risky-operation
    catch:
      - member: handle-error
        input:
          error: ${error}
    finally:
      - member: cleanup

Caching

Cache member results to reduce costs and latency:
flow:
  - member: expensive-ai-call
    cache:
      ttl: 3600      # 1 hour in seconds
      bypass: false  # Set to true to skip cache
Benefits:
  • Reduce AI provider costs
  • Faster execution
  • Consistent results
Learn more: Caching

Quality Scoring

Validate output quality with automatic retry:
scoring:
  enabled: true
  defaultThresholds:
    minimum: 0.7
    target: 0.8
  maxRetries: 3
  backoffStrategy: exponential

flow:
  - member: generate-content
    scoring:
      evaluator: validate
      thresholds:
        minimum: 0.8
      criteria:
        accuracy: "Content must be factually accurate"
        completeness: "All required sections present"
      onFailure: retry
Learn more: Quality Scoring

Webhooks

Trigger ensembles via HTTP:
webhooks:
  - path: "/process-payment"
    method: POST
    auth:
      type: signature
      secret: ${env.WEBHOOK_SECRET}
    async: true
    timeout: 30000
Webhook URL:
https://your-worker.dev/webhooks/process-payment
Learn more: Webhooks Guide

Scheduled Execution

Run ensembles on a schedule:
schedules:
  - cron: "0 9 * * *"        # Daily at 9 AM
    timezone: "America/New_York"
    enabled: true
    input:
      reportType: "daily"

  - cron: "0 */4 * * *"      # Every 4 hours
    enabled: true
    input:
      reportType: "hourly"
Learn more: Scheduling Guide

Interpolation

Reference data throughout your ensemble:
flow:
  - member: process
    input:
      # From input
      domain: ${input.domain}

      # From state
      cached: ${state.companyData}

      # From previous member
      result: ${fetch-data.output.result}

      # From environment
      apiKey: ${env.API_KEY}

      # Nested access
      companyName: ${fetch-data.output.company.name}
Supported sources:
  • ${input.field} - Input data
  • ${state.key} - Shared state
  • ${member-name.output.field} - Member outputs
  • ${env.VARIABLE} - Environment variables

Complete Example

A real-world ensemble that combines multiple features:
name: company-intelligence-report
description: Generate comprehensive company intelligence report

state:
  schema:
    companyData: object
    financials: object
    news: array
    analysis: object

scoring:
  enabled: true
  defaultThresholds:
    minimum: 0.7
    target: 0.85
  maxRetries: 2

webhooks:
  - path: "/generate-report"
    method: POST
    auth:
      type: bearer
      secret: ${env.API_KEY}
    async: true

schedules:
  - cron: "0 9 * * 1"  # Monday 9 AM
    timezone: "America/New_York"
    enabled: true
    input:
      companies: ["acme.com", "example.com"]

flow:
  # Fetch data in parallel
  - parallel:
      - member: scrape
        input:
          url: "https://${input.domain}"
          output: markdown
        state:
          set: [companyData]
        cache:
          ttl: 3600

      - member: fetch-financials
        input:
          domain: ${input.domain}
        state:
          set: [financials]
        cache:
          ttl: 7200

      - member: fetch-news
        input:
          query: ${input.domain}
        state:
          set: [news]

  # Analyze with AI
  - member: analyze-company
    state:
      use: [companyData, financials, news]
      set: [analysis]
    scoring:
      evaluator: validate
      thresholds:
        minimum: 0.8
      criteria:
        accuracy: "Analysis must be factually accurate"
        completeness: "All key metrics analyzed"
      onFailure: retry
      retryLimit: 3

  # Generate report
  - member: generate-report
    state:
      use: [companyData, financials, news, analysis]

  # Human approval if score is low
  - branch:
      if: ${analyze-company.scoring.score} < 0.85
      then:
        - member: hitl
          input:
            operation: suspend
            data: ${generate-report.output}
            ttl: 3600

  # Save to database
  - member: save-report
    input:
      report: ${generate-report.output}
      metadata:
        score: ${analyze-company.scoring.score}
        timestamp: ${analyze-company.output.timestamp}

output:
  report: ${generate-report.output.report}
  analysis: ${analyze-company.output.analysis}
  quality:
    score: ${analyze-company.scoring.score}
    approved: ${hitl.output.approved}
  metadata:
    generatedAt: ${generate-report.output.timestamp}
    company: ${companyData.name}

Best Practices

1. Use Descriptive Names

# ✅ Good
name: company-intelligence-report
member: analyze-financial-health

# ❌ Bad
name: report
member: analyze

2. Document Your Ensemble

name: company-analysis
description: |
  Comprehensive company analysis workflow that:
  - Scrapes company website
  - Fetches financial data
  - Analyzes with AI
  - Generates structured report

  Runs daily at 9 AM EST for monitored companies.

3. Use State for Shared Data

# ✅ Good - Use state
state:
  schema:
    companyData: object

flow:
  - member: fetch
    state:
      set: [companyData]
  - member: analyze
    state:
      use: [companyData]

# ❌ Bad - Prop drilling
flow:
  - member: fetch
  - member: analyze
    input:
      data: ${fetch.output.data}  # Repeated everywhere

4. Enable Caching

flow:
  - member: expensive-operation
    cache:
      ttl: 3600  # Cache for 1 hour

5. Handle Errors

flow:
  - try:
      - member: risky-operation
    catch:
      - member: log-error
      - member: notify-team

Ensemble Organization

Organize ensembles by domain or workflow:
project/
├── ensembles/
│   ├── company-intelligence/
│   │   ├── full-report.yaml
│   │   ├── quick-scan.yaml
│   │   └── deep-dive.yaml
│   ├── customer-support/
│   │   ├── ticket-routing.yaml
│   │   └── auto-response.yaml
│   └── data-pipeline/
│       ├── daily-etl.yaml
│       └── real-time-processing.yaml
└── members/
    └── ...

Testing Ensembles

Use TestConductor to test your ensembles:
import { TestConductor } from '@ensemble-edge/conductor/testing';

describe('company-intelligence ensemble', () => {
  it('should generate report for valid company', async () => {
    const conductor = await TestConductor.create({
      projectPath: '.'
    });

    const result = await conductor.executeEnsemble(
      'company-intelligence',
      { domain: 'acme.com' }
    );

    expect(result).toBeSuccessful();
    expect(result.output.report).toBeDefined();
    expect(result).toHaveExecutedMember('analyze-company');
  });
});
Learn more: Testing Guide