
What’s an Ensemble?

An ensemble is a workflow that:
  • Orchestrates multiple agents
  • Controls flow (sequential, parallel, conditional)
  • Manages state across agents
  • Maps outputs to final results
Think of agents as musicians and ensembles as the sheet music that coordinates them.

Explore the Template Ensemble

Your project includes a working ensemble! Let’s explore ensembles/hello-world.yaml:
name: hello-world
description: Simple greeting workflow

trigger:
  - type: cli
    command: hello

agents:
  - name: hello
    operation: code

flow:
  - agent: hello

output:
  greeting: ${hello.output.message}
This ensemble:
  1. Defines a CLI trigger (conductor run hello)
  2. Calls the hello agent (from agents/examples/hello/)
  3. Returns its output as greeting
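The three steps above can be sketched in plain TypeScript. This is a toy model of the idea, not Conductor's implementation: `AgentFn`, `runFlow`, and the inline `hello` agent are hypothetical stand-ins written only for this illustration.

```typescript
// Hypothetical stand-ins: these names are illustrative, not Conductor's API.
type AgentFn = (args: { input: Record<string, unknown> }) => Record<string, unknown>;
type FlowState = Record<string, { output: Record<string, unknown> }>;

// A toy "hello" agent that produces the same message as the template agent.
const hello: AgentFn = ({ input }) => ({
  message: `Hello, ${String(input.name ?? "World")}! Welcome to Conductor.`,
});

// Run each agent in flow order and record its output under its name.
function runFlow(
  agents: Record<string, AgentFn>,
  flow: string[],
  input: Record<string, unknown>
): FlowState {
  const state: FlowState = {};
  for (const name of flow) {
    const agent = agents[name];
    if (!agent) throw new Error(`Agent '${name}' not found`);
    state[name] = { output: agent({ input }) };
  }
  return state;
}

const flowState = runFlow({ hello }, ["hello"], { name: "World" });

// Equivalent of the `greeting: ${hello.output.message}` output mapping.
const flowResult = { greeting: flowState["hello"]?.output.message };
```

The point of the sketch is the separation of concerns: agents only compute, while the flow and the output mapping decide what runs and what is returned.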

Run It

The template includes a test in tests/basic.test.ts:
import { Executor, MemberLoader } from '@ensemble-edge/conductor';
import { stringify } from 'yaml';
import helloWorldYAML from '../ensembles/hello-world.yaml';
import greetConfig from '../agents/examples/hello/agent.yaml';
import greetFunction from '../agents/hello';

const executor = new Executor({ env, ctx });
const loader = new MemberLoader({ env, ctx });

// Register agent
const greetMember = loader.registerAgent(greetConfig, greetFunction);
executor.registerAgent(greetMember);

// Execute ensemble
const result = await executor.executeFromYAML(
  stringify(helloWorldYAML),
  { name: 'World' }
);

console.log(result.value.output.greeting);
// Output: "Hello, World! Welcome to Conductor."
Run the tests:
pnpm test

Template Syntax

Ensembles use ${} for variable interpolation:

Access Input

name: my-workflow

trigger:
  - type: cli
    command: process

agents:
  - name: processor
    operation: code
    input:
      user_name: ${input.name}  # Pass ensemble input to agent

flow:
  - agent: processor

Access Agent Outputs

agents:
  - name: step1
    operation: code

  - name: step2
    operation: code
    input:
      data: ${step1.output.result}  # Use previous agent's output

output:
  final_result: ${step2.output.processed}
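To make the reference syntax concrete, here is a minimal sketch of how a whole-value reference such as `${step1.output.result}` could be resolved against collected state. `resolvePath` and `interpolate` are hypothetical helpers, and the sketch handles plain dotted paths only, not operators such as `??` or `||`.

```typescript
// Look up a dotted path such as "step1.output.result" in a state object.
function resolvePath(state: unknown, path: string): unknown {
  let current: unknown = state;
  for (const key of path.split(".")) {
    if (current === null || typeof current !== "object") return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

// Replace a whole-string reference like "${step1.output.result}" with its value.
// Strings that are not a single reference are returned unchanged.
function interpolate(template: string, state: unknown): unknown {
  const match = /^\$\{([\w.-]+)\}$/.exec(template.trim());
  return match ? resolvePath(state, match[1] ?? "") : template;
}

// Hypothetical state after step1 has run.
const sampleState = {
  input: { name: "Ada" },
  step1: { output: { result: 42 } },
};

const resolvedResult = interpolate("${step1.output.result}", sampleState);
const resolvedName = interpolate("${input.name}", sampleState);
const unresolved = interpolate("${step2.output.processed}", sampleState);
```

A reference to an agent that has not produced output resolves to `undefined` here, which is one way the "Cannot read property 'output' of undefined" class of problems can be avoided in a resolver.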

Check Agent Status

agents:
  - name: risky-operation
    operation: http

  - name: fallback
    operation: code
    condition: ${risky-operation.failed}  # Only run if failed

output:
  success: ${risky-operation.success}
  result: ${risky-operation.output || fallback.output}

Default Values with Fallbacks

# Nullish coalescing (??) - only catches null/undefined
input:
  name: ${input.query.name ?? input.body.name ?? "Guest"}
  count: ${input.page ?? 1}  # Preserves 0 if page is 0

# Falsy coalescing (||) - catches "", 0, false, null, undefined
input:
  title: ${input.title || "Untitled"}
  limit: ${input.limit || 10}  # Uses 10 if limit is 0
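The comments above describe `??` and `||` with the same rules as the TypeScript operators of the same name. Assuming the template engine follows those rules, the difference can be checked directly in TypeScript. `applyDefaults` and its input shape are hypothetical, used only for this illustration.

```typescript
// Hypothetical input shape for this illustration only.
interface QueryInput {
  page?: number | null;
  limit?: number;
  title?: string;
  name?: string;
}

function applyDefaults(input: QueryInput) {
  return {
    page: input.page ?? 1,             // ?? replaces only null/undefined
    limit: input.limit || 10,          // || replaces any falsy value, including 0
    title: input.title || "Untitled",  // the empty string is falsy
    name: input.name ?? "Guest",       // missing property is undefined
  };
}

// page and limit are both 0, title is empty, name is missing.
const defaults = applyDefaults({ page: 0, limit: 0, title: "" });
// defaults.page  is 0          (0 is preserved by ??)
// defaults.limit is 10         (0 is replaced by ||)
// defaults.title is "Untitled"
// defaults.name  is "Guest"
```

Prefer `??` whenever 0, false, or an empty string is a legitimate value.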

Ternary Conditions

output:
  status: ${process.success ? "completed" : "failed"}
  tier: ${input.premium ? "premium" : "basic"}
  greeting: ${input.name ? input.name : "Anonymous"}

Array Access

input:
  first_item: ${response.items[0]}
  second_user: ${input.users[1].name}
  primary_tag: ${article.tags[0] ?? "untagged"}

Boolean Negation

condition: ${!input.disabled}      # Run if NOT disabled
condition: ${!previous-step.failed} # Run if previous step didn't fail

output:
  is_active: ${!user.suspended}

Important: Agent Signatures for Ensembles

Critical: For agents to work in ensembles, they MUST use the AgentExecutionContext signature!

Why This Matters

When you reference an agent in an ensemble, Conductor calls it with this structure:
{
  input: { /* your parameters */ },
  env: { /* Cloudflare bindings */ },
  ctx: { /* ExecutionContext */ }
}

Correct Agent Signature ✅

All agents used in ensembles must follow this pattern:
import type { AgentExecutionContext } from '@ensemble-edge/conductor';

export default function myAgent({ input, env, ctx }: AgentExecutionContext) {
  // Destructure parameters from input
  const { param1, param2 } = input as MyInput;

  // Your logic here
  return { result: 'success' };
}

What Happens If You Don’t?

Wrong signature (direct parameters):
// ❌ DOESN'T WORK IN ENSEMBLES
export default function myAgent({ param1, param2 }: MyInput) {
  return { result: param1 + param2 };
}
Result: The agent receives undefined values, because the ensemble passes { input: {...}, env, ctx } while the agent expects param1 and param2 at the top level.
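The failure can be reproduced without Conductor. In this sketch, `ContextLike` is a local stand-in for the call structure described above (the real `AgentExecutionContext` type comes from the package), and `MyInput` is a hypothetical input type.

```typescript
// Local stand-in for the { input, env, ctx } structure; not the real type.
interface ContextLike {
  input: unknown;
  env: Record<string, unknown>;
  ctx: Record<string, unknown>;
}

// Hypothetical input type for this illustration.
interface MyInput {
  param1: number;
  param2: number;
}

// Wrong: expects param1/param2 at the top level of its argument.
function directAgent({ param1, param2 }: MyInput) {
  return { result: param1 + param2 };
}

// Right: reads parameters from the `input` field.
function contextAgent({ input }: ContextLike) {
  const { param1, param2 } = input as MyInput;
  return { result: param1 + param2 };
}

const call: ContextLike = { input: { param1: 2, param2: 3 }, env: {}, ctx: {} };

// The cast mimics the runtime call, where no type check stops the mismatch.
const wrongResult = directAgent(call as unknown as MyInput); // undefined + undefined
const rightResult = contextAgent(call);
```

With the wrong signature, both parameters are undefined and the sum is NaN. With the context signature, the same call returns 5.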

Quick Fix for Existing Agents

If your agent isn’t working in an ensemble:
  1. Import AgentExecutionContext:
    import type { AgentExecutionContext } from '@ensemble-edge/conductor';
    
  2. Change signature to accept { input, env, ctx }:
    export default function myAgent({ input, env, ctx }: AgentExecutionContext) {
    
  3. Destructure your parameters from input:
    const { param1, param2 } = input as MyInput;
    
That’s it! Your agent now works everywhere: ensembles, direct calls, and tests.
See Agent Signatures in “Your First Agent” for detailed explanation.

Create Your First Ensemble

Let’s build a simple two-step workflow.

Step 1: Create the Ensemble

Create ensembles/greeting-workflow.yaml:
name: greeting-workflow
description: Generate and log a greeting

trigger:
  - type: cli
    command: greet-and-log

agents:
  # Generate greeting
  - name: greeter
    operation: code
    config:
      handler: hello
    input:
      name: ${input.name}
      style: ${input.style}

  # Log the greeting
  - name: logger
    operation: code
    config:
      handler: |
        export default function({ input }) {
          console.log("Generated greeting:", input.greeting);
          return { logged: true, timestamp: Date.now() };
        }
    input:
      greeting: ${greeter.output.message}

flow:
  - agent: greeter
  - agent: logger

output:
  greeting: ${greeter.output.message}
  logged_at: ${logger.output.timestamp}

Step 2: Test It

Create tests/greeting-workflow.test.ts:
import { describe, it, expect } from 'vitest';
import { Executor, MemberLoader } from '@ensemble-edge/conductor';
import { stringify } from 'yaml';
import greetingWorkflow from '../ensembles/greeting-workflow.yaml';
import helloConfig from '../agents/examples/hello/agent.yaml';
import helloFunction from '../agents/hello';

describe('Greeting Workflow', () => {
  it('should execute both steps', async () => {
    const env = {} as Env;
    const ctx = {
      waitUntil: (promise: Promise<any>) => promise,
      passThroughOnException: () => {}
    } as ExecutionContext;

    const executor = new Executor({ env, ctx });
    const loader = new MemberLoader({ env, ctx });

    const helloMember = loader.registerAgent(helloConfig, helloFunction);
    executor.registerAgent(helloMember);

    const result = await executor.executeFromYAML(
      stringify(greetingWorkflow),
      { name: 'Alice', style: 'formal' }
    );

    expect(result.success).toBe(true);
    expect(result.value.output.greeting).toContain('Alice');
    expect(result.value.output.logged_at).toBeDefined();
  });
});
Run: pnpm test

Auto-Discovery (v1.12+)

Zero-Config Ensemble Loading: Ensembles in the ensembles/ directory are automatically discovered at build time.

How It Works

Just create a YAML file in ensembles/. No imports or registration are needed.
Step 1: Create ensembles/my-workflow.yaml
name: my-workflow
description: My automated workflow

trigger:
  - type: cli
    command: my-workflow

agents:
  - name: step1
    operation: code
  - name: step2
    operation: think

flow:
  - agent: step1
  - agent: step2

output:
  result: ${step2.output}
Step 2: Rebuild
pnpm run build
Step 3: Execute via API
curl -X POST http://localhost:8787/api/v1/execute/ensemble/my-workflow \
  -H "Content-Type: application/json" \
  -d '{
    "input": { "data": "test" }
  }'
That’s it! No imports, no registration, just create the YAML file.

Using Auto-Discovery API

The recommended way to use ensembles is with createAutoDiscoveryAPI():
// src/index.ts
import { createAutoDiscoveryAPI } from '@ensemble-edge/conductor/api'
import { agents } from 'virtual:conductor-agents'
import { ensembles } from 'virtual:conductor-ensembles'

export default createAutoDiscoveryAPI({
  agents,  // Auto-discovered from agents/
  ensembles,  // Auto-discovered from ensembles/
  autoDiscover: true,
})
This provides:
  • POST /api/v1/execute/ensemble/{name} - Execute an ensemble by name
  • POST /api/v1/execute/agent/{name} - Execute an agent directly (if enabled)
  • GET /api/v1/ensembles - List all ensembles
  • GET /api/v1/agents - List all agents
  • Automatic webhook and cron trigger handling

Execute Request Format

Execute an ensemble via the API:
POST /api/v1/execute/ensemble/greeting-workflow
Authorization: Bearer YOUR_TOKEN
Content-Type: application/json

{
  "input": {
    "name": "Alice",
    "style": "formal"
  }
}
Note: All /api/v1/* routes require authentication by default. See Security & Authentication for details.
Execute an agent directly (if allowDirectAgentExecution is enabled):
POST /api/v1/execute/agent/hello
Authorization: Bearer YOUR_TOKEN
Content-Type: application/json

{
  "input": {
    "name": "Bob"
  }
}
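The two request formats above differ only in the path segment, so a client can build both from one helper. This is a minimal sketch: `buildExecuteRequest` is a hypothetical helper, and it returns plain data so it can be handed to `fetch` or to any other HTTP client.

```typescript
interface ExecuteRequest {
  url: string;
  init: { method: "POST"; headers: Record<string, string>; body: string };
}

// Build the URL, headers, and JSON body for an execute call.
function buildExecuteRequest(
  baseUrl: string,
  kind: "ensemble" | "agent",
  name: string,
  input: Record<string, unknown>,
  token: string
): ExecuteRequest {
  const root = baseUrl.replace(/\/+$/, ""); // drop trailing slashes
  return {
    url: `${root}/api/v1/execute/${kind}/${encodeURIComponent(name)}`,
    init: {
      method: "POST",
      headers: {
        Authorization: `Bearer ${token}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify({ input }), // parameters go under "input"
    },
  };
}

const executeRequest = buildExecuteRequest(
  "http://localhost:8787/",
  "ensemble",
  "greeting-workflow",
  { name: "Alice", style: "formal" },
  "YOUR_TOKEN"
);
// To send it: await fetch(executeRequest.url, executeRequest.init)
```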

Discovery Rules

Auto-Discovered:
  • ✅ All *.yaml files in ensembles/
  • ✅ Nested directories: ensembles/workflows/user-onboarding.yaml
  • ✅ Cron triggers from ensemble configs
Not Discovered:
  • ❌ README.md files
  • ❌ Files outside ensembles/ directory
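The file rules above can be expressed as a small predicate. This sketch models only the rules stated in the list. `isDiscoverable` is a hypothetical helper, and whether other extensions are also discovered is not covered here.

```typescript
// True for *.yaml files anywhere under ensembles/, including nested directories.
function isDiscoverable(path: string): boolean {
  const normalized = path.replace(/\\/g, "/").replace(/^\.\//, "");
  return /^ensembles\/.+\.yaml$/.test(normalized);
}

const discoveryChecks = {
  topLevel: isDiscoverable("ensembles/hello-world.yaml"),
  nested: isDiscoverable("ensembles/workflows/user-onboarding.yaml"),
  readme: isDiscoverable("ensembles/README.md"),
  outside: isDiscoverable("agents/examples/hello/agent.yaml"),
};
```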

Verification

List all discovered ensembles:
curl http://localhost:8787/api/v1/ensembles

# Returns:
{
  "ensembles": [
    { "name": "hello-world", "description": "Simple greeting workflow" },
    { "name": "greeting-workflow", "description": "Generate and log a greeting" }
  ]
}

Testing with Auto-Discovery

In tests, you can still use manual registration for fine-grained control:
import { Executor, MemberLoader } from '@ensemble-edge/conductor'
import { stringify } from 'yaml'
import myWorkflow from '../ensembles/my-workflow.yaml'

const executor = new Executor({ env, ctx })
const result = await executor.executeFromYAML(
  stringify(myWorkflow),
  { input: 'data' }
)
This is fine! Manual execution is supported alongside auto-discovery. See the Auto-Discovery guide for complete details.

Flow Control

Sequential Execution (Default)

Agents run one after another:
agents:
  - name: fetch-data
    operation: http
    config:
      url: https://api.example.com/data

  - name: process-data
    operation: code
    input:
      data: ${fetch-data.output.body}

  - name: store-data
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: INSERT INTO results (data) VALUES (?)
      params:
        - ${process-data.output.result}
⏱️ Total time: fetch + process + store

Parallel Execution

Agents run simultaneously:
agents:
  - name: check-spam
    operation: think
    config:
      provider: anthropic
      model: claude-3-5-sonnet-20241022
      prompt: "Is this spam? ${input.text}"

  - name: check-hate
    operation: think
    config:
      provider: anthropic
      model: claude-3-5-sonnet-20241022
      prompt: "Is this hate speech? ${input.text}"

  - name: check-explicit
    operation: think
    config:
      provider: anthropic
      model: claude-3-5-sonnet-20241022
      prompt: "Is this explicit? ${input.text}"

  # This agent waits for all three checks
  - name: aggregate
    operation: code
    config:
      script: scripts/aggregate-moderation-checks
    input:
      isSpam: ${check-spam.output.spam}
      isHate: ${check-hate.output.hate}
      isExplicit: ${check-explicit.output.explicit}
// scripts/aggregate-moderation-checks.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function aggregateModerationChecks(context: AgentExecutionContext) {
  const { isSpam, isHate, isExplicit } = context.input
  return {
    safe: !isSpam && !isHate && !isExplicit
  }
}
output:
  safe: ${aggregate.output.safe}
⏱️ Total time: max(spam, hate, explicit) + aggregate
How Conductor determines parallelism:
  • Agents with NO dependencies on each other → Run in parallel
  • Agent depends on previous output → Wait for completion
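The two rules above amount to grouping agents into waves by their dependencies. The sketch below shows one way to derive such waves from `${name.output...}` references in agent inputs. It illustrates the idea and is not Conductor's scheduler: `AgentDef`, `dependenciesOf`, and `planWaves` are hypothetical, and only the `input` field is scanned.

```typescript
interface AgentDef {
  name: string;
  input?: Record<string, string>;
}

// Collect known agent names referenced as <name>.output in an agent's inputs.
function dependenciesOf(agent: AgentDef, known: string[]): string[] {
  const deps: string[] = [];
  const input = agent.input ?? {};
  for (const key of Object.keys(input)) {
    const value = input[key] ?? "";
    const pattern = /([A-Za-z][\w-]*)\.output\b/g;
    let m: RegExpExecArray | null;
    while ((m = pattern.exec(value)) !== null) {
      const ref = m[1] ?? "";
      if (known.indexOf(ref) !== -1 && deps.indexOf(ref) === -1) deps.push(ref);
    }
  }
  return deps;
}

// Group agents into waves: each wave depends only on agents in earlier waves,
// so all agents within one wave can run at the same time.
function planWaves(agents: AgentDef[]): string[][] {
  const names = agents.map((a) => a.name);
  const done: string[] = [];
  const waves: string[][] = [];
  let remaining = agents.slice();
  while (remaining.length > 0) {
    const ready = remaining.filter((a) =>
      dependenciesOf(a, names).every((d) => done.indexOf(d) !== -1)
    );
    if (ready.length === 0) throw new Error("Circular dependency between agents");
    waves.push(ready.map((a) => a.name));
    for (const a of ready) done.push(a.name);
    remaining = remaining.filter((a) => ready.indexOf(a) === -1);
  }
  return waves;
}

const moderationWaves = planWaves([
  { name: "check-spam" },
  { name: "check-hate" },
  { name: "check-explicit" },
  {
    name: "aggregate",
    input: {
      isSpam: "${check-spam.output.spam}",
      isHate: "${check-hate.output.hate}",
      isExplicit: "${check-explicit.output.explicit}",
    },
  },
]);
```

For the moderation example, the three checks land in the first wave and `aggregate` in the second, which matches the total time of max(spam, hate, explicit) + aggregate.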

Conditional Execution

Run agents only when conditions are met:
agents:
  # Always runs
  - name: quick-check
    operation: code
    config:
      script: scripts/quick-review-check
// scripts/quick-review-check.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function quickReviewCheck(context: AgentExecutionContext) {
  return { needs_review: Math.random() < 0.2 }
}
  # Runs only when quick-check sets needs_review (about 20% of the time)
  - name: expensive-ai-check
    operation: think
    condition: ${quick-check.output.needs_review}
    config:
      provider: anthropic
      model: claude-3-5-sonnet-20241022
      prompt: "Deep analysis of: ${input.text}"

  # Fallback if AI check didn't run
  - name: use-default
    operation: code
    condition: ${!expensive-ai-check.executed}
    config:
      script: scripts/use-default-value
// scripts/use-default-value.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function useDefaultValue(context: AgentExecutionContext) {
  return { result: "default-value" }
}
output:
  result: ${expensive-ai-check.output || use-default.output.result}
  ai_used: ${expensive-ai-check.executed}
Cost optimization: Skip expensive operations when possible!

Loops

Process arrays of items:
agents:
  - name: process-batch
    operation: code
    loop:
      items: ${input.users}
      as: user
    config:
      script: scripts/process-user
    input:
      user: ${user}
// scripts/process-user.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function processUser(context: AgentExecutionContext) {
  const { user } = context.input
  return {
    id: user.id,
    processed: true,
    timestamp: Date.now()
  }
}
output:
  processed: ${process-batch.output}  # Array of results
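In plain TypeScript, the loop above behaves like a map over the input array. This sketch assumes results are collected in input order. `runLoop` is a hypothetical helper, and the timestamp is left out so the output is deterministic.

```typescript
// Hypothetical shapes for this illustration only.
interface User {
  id: string;
}

interface LoopResult {
  id: string;
  processed: boolean;
}

// Mirrors scripts/process-user.ts, without the timestamp field.
function processUser({ input }: { input: { user: User } }): LoopResult {
  return { id: input.user.id, processed: true };
}

// Call the agent once per item, binding each item to `user` (the `as` name),
// and collect the results into an array.
function runLoop(users: User[]): LoopResult[] {
  return users.map((user) => processUser({ input: { user } }));
}

const loopOutput = runLoop([{ id: "u1" }, { id: "u2" }]);
```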

Retry Logic

Automatically retry failed operations:
agents:
  - name: unreliable-api
    operation: http
    config:
      url: https://api.sometimes-fails.com/data
    retry:
      max_attempts: 3
      backoff: exponential  # 1s, 2s, 4s
      on_status: [500, 502, 503, 504]

output:
  data: ${unreliable-api.output.body}
  attempts: ${unreliable-api.attempts}
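The retry settings can be modeled as a delay schedule plus a retry decision. This sketch assumes that `max_attempts` counts total attempts (so three attempts involve at most two waits) and that the base delay is 1 second. `backoffDelayMs`, `shouldRetry`, and `simulateRetries` are hypothetical names.

```typescript
// Delay before retry number `retry` (1-based): 1000, 2000, 4000, ...
function backoffDelayMs(retry: number, baseMs = 1000): number {
  return baseMs * Math.pow(2, retry - 1);
}

// Retry only if attempts remain and the status is in the on_status list.
function shouldRetry(
  status: number,
  attempt: number,
  maxAttempts: number,
  onStatus: number[]
): boolean {
  return attempt < maxAttempts && onStatus.indexOf(status) !== -1;
}

// Walk through a list of simulated response statuses without real waiting.
// Returns the attempts made, the final status, and the delays that would apply.
function simulateRetries(statuses: number[], maxAttempts: number, onStatus: number[]) {
  const delays: number[] = [];
  let attempt = 0;
  let status = 0;
  while (attempt < statuses.length) {
    status = statuses[attempt] ?? 0;
    attempt += 1;
    if (!shouldRetry(status, attempt, maxAttempts, onStatus)) break;
    delays.push(backoffDelayMs(attempt));
  }
  return { attempts: attempt, status, delays };
}

const retryable = [500, 502, 503, 504];
const recovered = simulateRetries([503, 502, 200], 3, retryable);
const exhausted = simulateRetries([500, 500, 500, 500], 3, retryable);
const notRetried = simulateRetries([404, 200], 3, retryable);
```

In this model, `recovered` succeeds on the third attempt after waiting 1000 ms and 2000 ms, `exhausted` stops after three attempts, and `notRetried` stops immediately because 404 is not in the status list.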

Real-World Example: Content Moderation

Let’s build a complete content moderation pipeline:
name: content-moderation
description: Multi-stage content safety checking

trigger:
  - type: http
    path: /api/moderate
    methods: [POST]
    public: false  # Require auth

agents:
  # Quick regex-based filters (fast, free)
  - name: quick-filter
    operation: code
    config:
      script: scripts/quick-filter-text
    input:
      text: ${input.text}
// scripts/quick-filter-text.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function quickFilterText(context: AgentExecutionContext) {
  const { text } = context.input
  const lowerText = text.toLowerCase()
  const badWords = ["spam", "scam", "viagra"]
  const hasBadWords = badWords.some(w => lowerText.includes(w))

  return {
    blocked: hasBadWords,
    reason: hasBadWords ? "Contains blocked keywords" : null
  }
}
  # Only run expensive AI if quick filter passes
  - name: ai-check
    operation: think
    condition: ${!quick-filter.output.blocked}
    config:
      provider: anthropic
      model: claude-3-5-sonnet-20241022
      prompt: |
        Analyze this text for safety issues:
        "${input.text}"

        Check for:
        - Spam
        - Hate speech
        - Explicit content
        - Misinformation

        Return JSON: {"safe": boolean, "issues": string[], "confidence": number}

  # Log the decision
  - name: log-decision
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT INTO moderation_logs (text, decision, reason, timestamp)
        VALUES (?, ?, ?, ?)
      params:
        - ${input.text}
        - ${quick-filter.output.blocked || !ai-check.output.safe}
        - ${quick-filter.output.reason || ai-check.output.issues}
        - ${Date.now()}

flow:
  - agent: quick-filter
  - agent: ai-check
  - agent: log-decision

output:
  safe: ${!quick-filter.output.blocked && ai-check.output.safe}
  reason: ${quick-filter.output.reason || ai-check.output.issues}
  method: ${quick-filter.output.blocked ? 'filter' : 'ai'}
  confidence: ${ai-check.output.confidence || 1.0}
Cost optimization (illustrative numbers):
  • The quick filter is free and runs on every request
  • If it blocks about 80% of submissions, the AI check runs on only the remaining 20%
  • Result: roughly 80% lower AI cost

Execute from Other Ensembles

You can call one ensemble from another using the HTTP operation:
name: moderation-with-notification
description: Moderate content and notify on violations

trigger:
  - type: http
    path: /api/moderate-notify
    methods: [POST]

agents:
  - name: moderate
    operation: http
    config:
      url: ${env.BASE_URL}/api/moderate
      method: POST
      body:
        text: ${input.text}

  - name: notify
    operation: http
    condition: ${!moderate.output.safe}
    config:
      url: https://slack.example.com/webhook
      method: POST
      body:
        text: "Content violation detected: ${moderate.output.reason}"

flow:
  - agent: moderate
  - agent: notify

output:
  moderated: true
  safe: ${moderate.output.safe}
  reason: ${moderate.output.reason}

Best Practices

1. Start Simple, Add Complexity

# Start with this
name: my-workflow
agents:
  - name: step1
    operation: code
output:
  result: ${step1.output}

# Then add more steps
# Then add conditions
# Then add parallelism
# Then add retry logic

2. Use Descriptive Names

  • ✅ validate-input, fetch-user-data, send-notification
  • ❌ step1, step2, step3

3. Document Your Ensembles

name: complex-workflow
description: |
  This workflow:
  1. Fetches user data from API
  2. Validates the data structure
  3. Enriches with additional info
  4. Stores in database
  5. Sends notification email

  Expected input:
    - user_id: string (UUID)

  Returns:
    - success: boolean
    - user: object

4. Handle Errors

agents:
  - name: risky-operation
    operation: http
    retry:
      max_attempts: 3

  - name: fallback-handler
    operation: code
    condition: ${risky-operation.failed}
    config:
      script: scripts/handle-failure
    input:
      error: ${risky-operation.error}
// scripts/handle-failure.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function handleFailure(context: AgentExecutionContext) {
  const { error } = context.input
  console.error("Operation failed:", error)
  return { fallback: true }
}
output:
  result: ${risky-operation.output || fallback-handler.output}
  used_fallback: ${fallback-handler.executed}

5. Optimize Costs

Sequential for dependencies:
agents:
  - name: filter
    operation: code  # Free
  - name: ai-analysis
    operation: think  # Only if filter passes
    condition: ${filter.output.needs_analysis}
Parallel for independence:
agents:
  - name: check-a
    operation: think  # Runs simultaneously
  - name: check-b
    operation: think  # Runs simultaneously
  - name: check-c
    operation: think  # Runs simultaneously

6. Test Everything

describe('My Ensemble', () => {
  it('should handle success case', async () => {
    // Test happy path
  });

  it('should handle failures gracefully', async () => {
    // Test error cases
  });

  it('should respect conditions', async () => {
    // Test conditional logic
  });

  it('should execute in parallel', async () => {
    // Test performance
  });
});

Troubleshooting

Problem: Agent 'my-agent' not found
Fix: Ensure the agent is registered or exists in the agents/ directory:
pnpm run build  # Rebuild to discover agents
Problem: Cannot read property 'output' of undefined
Fix: Check variable references:
# Wrong
output: ${agent-1.output}  # Agent name has hyphen

# Correct
output: ${agent1.output}  # Use agent-1 as name, reference as agent1
Problem: Agents run sequentially even though they shouldn’t
Reason: One agent depends on another’s output
Fix: Remove dependencies:
# Sequential (b depends on a)
agents:
  - name: a
    operation: code
  - name: b
    operation: code
    input:
      data: ${a.output}  # Dependency!

# Parallel (independent)
agents:
  - name: a
    operation: code
  - name: b
    operation: code  # No dependency
Problem: Execution exceeds time limit
Fixes:
  1. Use caching for slow operations
  2. Run independent checks in parallel
  3. Skip expensive operations when possible
  4. Increase timeout (paid plan)
agents:
  - name: slow-api
    operation: http
    config:
      cache:
        ttl: 3600  # Cache for 1 hour
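To show what a one-hour `ttl` means in practice, here is a minimal in-memory TTL cache. It illustrates the expiry rule only and is not how Conductor stores cached responses. `TtlCache` is a hypothetical class, and the clock is injectable so expiry can be checked without waiting.

```typescript
// In-memory cache whose entries expire ttlSeconds after they are written.
class TtlCache<V> {
  private entries: Record<string, { value: V; expiresAt: number }> = {};
  private ttlSeconds: number;
  private now: () => number;

  constructor(ttlSeconds: number, now: () => number = () => Date.now()) {
    this.ttlSeconds = ttlSeconds;
    this.now = now;
  }

  get(key: string): V | undefined {
    const entry = this.entries[key];
    if (!entry) return undefined;
    if (this.now() >= entry.expiresAt) {
      delete this.entries[key]; // expired: drop it and report a miss
      return undefined;
    }
    return entry.value;
  }

  set(key: string, value: V): void {
    this.entries[key] = { value, expiresAt: this.now() + this.ttlSeconds * 1000 };
  }
}

// Fake clock in milliseconds, advanced manually.
let fakeNow = 0;
const apiCache = new TtlCache<string>(3600, () => fakeNow);

apiCache.set("slow-api", "response body");
const freshHit = apiCache.get("slow-api");   // within the hour: hit
fakeNow = 3600 * 1000;
const expiredHit = apiCache.get("slow-api"); // one hour later: miss
```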

TypeScript Ensembles

Prefer TypeScript over YAML? You can create ensembles programmatically with full type safety, IDE autocomplete, and compile-time validation.

Basic TypeScript Ensemble

// ensembles/greeting-workflow.ts
import { createEnsemble, step } from '@ensemble-edge/conductor'

const greetingWorkflow = createEnsemble('greeting-workflow')
  .setDescription('Generate personalized greetings')
  .addStep(
    step('greeter')
      .agent('greeter')
      .input({
        name: '${input.name}',
        style: '${input.style}'
      })
  )
  .addStep(
    step('format')
      .operation('code')
      .config({ script: 'scripts/format-greeting' })
      .input({ greeting: '${greeter.output}' })
  )
  .build()

export default greetingWorkflow

TypeScript vs YAML Comparison

import { createEnsemble, step, parallel } from '@ensemble-edge/conductor'

const multiSourceFetch = createEnsemble('multi-source-fetch')
  .addStep(
    parallel('fetch-all')
      .steps(
        step('api-a').agent('fetcher').input({ url: '${input.urlA}' }),
        step('api-b').agent('fetcher').input({ url: '${input.urlB}' }),
        step('api-c').agent('fetcher').input({ url: '${input.urlC}' })
      )
  )
  .addStep(
    step('merge')
      .operation('code')
      .config({ script: 'scripts/merge-data' })
      .input({
        a: '${api-a.output}',
        b: '${api-b.output}',
        c: '${api-c.output}'
      })
  )
  .build()

export default multiSourceFetch

Benefits of TypeScript Ensembles

Feature             | YAML               | TypeScript
Type safety         | Runtime only       | Compile-time
IDE autocomplete    | Limited            | Full support
Refactoring         | Manual             | Automated
Conditional logic   | Expression strings | Native code
Reusable components | Copy/paste         | Import/export

When to Use Each

Use YAML when:
  • Quick prototyping
  • Simple linear workflows
  • Non-developers editing workflows
  • Maximum readability
Use TypeScript when:
  • Complex conditional logic
  • Reusable step patterns
  • Type safety is important
  • IDE support needed
  • Dynamic workflow generation

Validating TypeScript Ensembles

TypeScript ensembles are validated the same way as YAML:
# Validate a TypeScript ensemble
ensemble conductor validate ensembles/greeting-workflow.ts

# Validate all ensembles (YAML and TypeScript)
ensemble conductor validate ensembles/ -r
For complete TypeScript API documentation, see the TypeScript API Reference.

Next Steps

  • TypeScript API: Complete TypeScript reference
  • Flow Control: Master advanced flow patterns
  • State Management: Share data across agents
  • Playbooks: Real-world patterns

Performance Tips

Caching Layers

  1. KV Cache (operation level):
agents:
  - name: fetch
    operation: http
    config:
      cache:
        ttl: 3600
  2. AI Gateway (automatic for AI providers):
[ai]
binding = "AI_GATEWAY"
gateway_id = "your-gateway"
  3. Ensemble Results (application level):
// Cache entire ensemble result
const cacheKey = `ensemble:${name}:${hash(input)}`;
const cached = await env.CACHE.get(cacheKey);
if (cached) return JSON.parse(cached);

const result = await executor.execute(name, input);
await env.CACHE.put(cacheKey, JSON.stringify(result), { expirationTtl: 3600 });
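The snippet above calls `hash(input)` without defining it. One possible definition is sketched below. It assumes a short non-cryptographic hash is acceptable for cache keys and uses 32-bit FNV-1a over a key-sorted JSON string. `stableStringify`, `hash`, and `ensembleCacheKey` are hypothetical helpers, not part of Conductor.

```typescript
// Serialize with sorted object keys so equal inputs give equal strings.
function stableStringify(value: unknown): string {
  if (value === null || typeof value !== "object") {
    return JSON.stringify(value) ?? "null";
  }
  if (Array.isArray(value)) {
    return "[" + value.map((item) => stableStringify(item)).join(",") + "]";
  }
  const obj = value as Record<string, unknown>;
  const keys = Object.keys(obj).sort();
  return "{" + keys.map((k) => JSON.stringify(k) + ":" + stableStringify(obj[k])).join(",") + "}";
}

// 32-bit FNV-1a, returned as 8 hex characters. Not suitable for security use.
function hash(input: unknown): string {
  const text = stableStringify(input);
  let h = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    h ^= text.charCodeAt(i);
    // Multiply by the FNV prime 16777619 using shifts, modulo 2^32.
    h = (h + (h << 1) + (h << 4) + (h << 7) + (h << 8) + (h << 24)) >>> 0;
  }
  return ("0000000" + h.toString(16)).slice(-8);
}

function ensembleCacheKey(name: string, input: unknown): string {
  return `ensemble:${name}:${hash(input)}`;
}

const keyA = ensembleCacheKey("greeting-workflow", { name: "Alice", style: "formal" });
const keyB = ensembleCacheKey("greeting-workflow", { style: "formal", name: "Alice" });
// keyA and keyB are equal because property order does not affect the key.
```

Sorting keys before hashing matters because `JSON.stringify` alone preserves property order, so two equal inputs built in a different order would otherwise miss the cache.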

Cost vs Speed

Fast + Expensive (parallel):
agents:
  - name: check1
    operation: think  # $0.01
  - name: check2
    operation: think  # $0.01
  - name: check3
    operation: think  # $0.01
# Total: $0.03, Time: max(check1, check2, check3)
Slow + Cheap (sequential with filtering):
agents:
  - name: filter
    operation: code  # $0
  - name: ai-check
    operation: think  # $0.01 only 20% of time
    condition: ${filter.output.needs_check}
# Average: $0.002, Time: filter + (ai-check * 20%)
Choose based on your priorities!
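The totals in the two comments above follow from simple arithmetic, sketched here. The prices and the 20% rate come from the examples, while the timings and the names `parallelPlan` and `filteredPlan` are hypothetical.

```typescript
function sum(values: number[]): number {
  return values.reduce((total, v) => total + v, 0);
}

// Parallel: pay for every check, wait for the slowest one.
function parallelPlan(costs: number[], seconds: number[]) {
  return { cost: sum(costs), time: Math.max(...seconds) };
}

// Filtered: always run the filter, run the AI check only a fraction of the time.
// Both results are averages over many requests.
function filteredPlan(
  filter: { cost: number; seconds: number },
  ai: { cost: number; seconds: number },
  aiRate: number
) {
  return {
    cost: filter.cost + ai.cost * aiRate,
    time: filter.seconds + ai.seconds * aiRate,
  };
}

// Three $0.01 checks in parallel; the timings are made-up sample values.
const fastPlan = parallelPlan([0.01, 0.01, 0.01], [1.2, 0.8, 1.0]);

// Free filter, then a $0.01 AI check on 20% of requests.
const cheapPlan = filteredPlan({ cost: 0, seconds: 0.01 }, { cost: 0.01, seconds: 1.0 }, 0.2);
```

Under these sample numbers the parallel plan costs about $0.03 per request and the filtered plan about $0.002 on average.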