Skip to main content

What’s an Ensemble?

An ensemble is a workflow that:
  • Orchestrates multiple agents
  • Controls flow (sequential, parallel, conditional)
  • Manages state across agents
  • Maps outputs to final results
Think of agents as musicians and ensembles as the sheet music that coordinates them.

Explore the Template Ensemble

Your project includes a working ensemble! Let’s explore ensembles/hello-world.yaml:
name: hello-world
description: Simple greeting workflow

trigger:
  - type: cli
    command: hello

agents:
  - name: hello
    operation: code

flow:
  - agent: hello

output:
  greeting: ${hello.output.message}
This ensemble:
  1. Defines a CLI trigger (conductor run hello)
  2. Calls the hello agent (from agents/examples/hello/)
  3. Returns its output as greeting

Run It

The template includes a test in tests/basic.test.ts:
import { Executor, MemberLoader } from '@ensemble-edge/conductor';
import { stringify } from 'yaml';
import helloWorldYAML from '../ensembles/hello-world.yaml';
import greetConfig from '../agents/examples/hello/agent.yaml';
import greetFunction from '../agents/hello';

const executor = new Executor({ env, ctx });
const loader = new MemberLoader({ env, ctx });

// Register agent
const greetMember = loader.registerAgent(greetConfig, greetFunction);
executor.registerAgent(greetMember);

// Execute ensemble
const result = await executor.executeFromYAML(
  stringify(helloWorldYAML),
  { name: 'World' }
);

console.log(result.value.output.greeting);
// Output: "Hello, World! Welcome to Conductor."
Run the tests:
pnpm test

Template Syntax

Ensembles use ${} for variable interpolation:

Access Input

name: my-workflow

trigger:
  - type: cli
    command: process

agents:
  - name: processor
    operation: code
    input:
      user_name: ${input.name}  # Pass ensemble input to agent

flow:
  - agent: processor

Access Agent Outputs

agents:
  - name: step1
    operation: code

  - name: step2
    operation: code
    input:
      data: ${step1.output.result}  # Use previous agent's output

output:
  final_result: ${step2.output.processed}

Check Agent Status

agents:
  - name: risky-operation
    operation: http

  - name: fallback
    operation: code
    condition: ${risky-operation.failed}  # Only run if failed

output:
  success: ${risky-operation.success}
  result: ${risky-operation.output || fallback.output}

Default Values with Fallbacks

# Nullish coalescing (??) - only catches null/undefined
input:
  name: ${input.query.name ?? input.body.name ?? "Guest"}
  count: ${input.page ?? 1}  # Preserves 0 if page is 0

# Falsy coalescing (||) - catches "", 0, false, null, undefined
input:
  title: ${input.title || "Untitled"}
  limit: ${input.limit || 10}  # Uses 10 if limit is 0

Ternary Conditions

output:
  status: ${process.success ? "completed" : "failed"}
  tier: ${input.premium ? "premium" : "basic"}
  greeting: ${input.name ? input.name : "Anonymous"}

Array Access

input:
  first_item: ${response.items[0]}
  second_user: ${input.users[1].name}
  primary_tag: ${article.tags[0] ?? "untagged"}

Boolean Negation

condition: ${!input.disabled}      # Run if NOT disabled
condition: ${!previous-step.failed} # Run if previous step didn't fail

output:
  is_active: ${!user.suspended}

Important: Agent Signatures for Ensembles

Critical: For agents to work in ensembles, they MUST use the AgentExecutionContext signature!

Why This Matters

When you reference an agent in an ensemble, Conductor calls it with this structure:
{
  input: { /* your parameters */ },
  env: { /* Cloudflare bindings */ },
  ctx: { /* ExecutionContext */ }
}

Correct Agent Signature ✅

All agents used in ensembles must follow this pattern:
import type { AgentExecutionContext } from '@ensemble-edge/conductor';

export default function myAgent({ input, env, ctx }: AgentExecutionContext) {
  // Destructure parameters from input
  const { param1, param2 } = input as MyInput;

  // Your logic here
  return { result: 'success' };
}

What Happens If You Don’t?

Wrong signature (direct parameters):
// ❌ DOESN'T WORK IN ENSEMBLES
export default function myAgent({ param1, param2 }: MyInput) {
  return { result: param1 + param2 };
}
Result: Agent receives undefined values because ensemble passes { input: {...}, env, ctx } but agent expects direct parameters!

Quick Fix for Existing Agents

If your agent isn’t working in an ensemble:
  1. Import AgentExecutionContext:
    import type { AgentExecutionContext } from '@ensemble-edge/conductor';
    
  2. Change signature to accept { input, env, ctx }:
    export default function myAgent({ input, env, ctx }: AgentExecutionContext) {
    
  3. Destructure your parameters from input:
    const { param1, param2 } = input as MyInput;
    
That’s it! Your agent now works everywhere: ensembles, direct calls, and tests.
See Agent Signatures in “Your First Agent” for detailed explanation.

Create Your First Ensemble

Let’s build a simple two-step workflow.

Step 1: Create the Ensemble

Create ensembles/greeting-workflow.yaml:
name: greeting-workflow
description: Generate and log a greeting

trigger:
  - type: cli
    command: greet-and-log

agents:
  # Generate greeting
  - name: greeter
    operation: code
    config:
      handler: hello
    input:
      name: ${input.name}
      style: ${input.style}

  # Log the greeting
  - name: logger
    operation: code
    config:
      handler: |
        export default function({ input }) {
          console.log("Generated greeting:", input.greeting);
          return { logged: true, timestamp: Date.now() };
        }
    input:
      greeting: ${greeter.output.message}

flow:
  - agent: greeter
  - agent: logger

output:
  greeting: ${greeter.output.message}
  logged_at: ${logger.output.timestamp}

Step 2: Test It

Create tests/greeting-workflow.test.ts:
import { describe, it, expect } from 'vitest';
import { Executor, MemberLoader } from '@ensemble-edge/conductor';
import { stringify } from 'yaml';
import greetingWorkflow from '../ensembles/greeting-workflow.yaml';
import helloConfig from '../agents/examples/hello/agent.yaml';
import helloFunction from '../agents/hello';

describe('Greeting Workflow', () => {
  it('should execute both steps', async () => {
    const env = {} as Env;
    const ctx = {
      waitUntil: (promise: Promise<any>) => promise,
      passThroughOnException: () => {}
    } as ExecutionContext;

    const executor = new Executor({ env, ctx });
    const loader = new MemberLoader({ env, ctx });

    const helloMember = loader.registerAgent(helloConfig, helloFunction);
    executor.registerAgent(helloMember);

    const result = await executor.executeFromYAML(
      stringify(greetingWorkflow),
      { name: 'Alice', style: 'formal' }
    );

    expect(result.success).toBe(true);
    expect(result.value.output.greeting).toContain('Alice');
    expect(result.value.output.logged_at).toBeDefined();
  });
});
Run: pnpm test

Auto-Discovery (v1.12+)

Zero-Config Ensemble Loading: Ensembles in the ensembles/ directory are automatically discovered at build time.

How It Works

Just create a YAML file in ensembles/ - no imports or registration needed! Step 1: Create ensembles/my-workflow.yaml
name: my-workflow
description: My automated workflow

trigger:
  - type: cli
    command: my-workflow

agents:
  - name: step1
    operation: code
  - name: step2
    operation: think

flow:
  - agent: step1
  - agent: step2

output:
  result: ${step2.output}
Step 2: Rebuild
pnpm run build
Step 3: Execute via API
curl -X POST http://localhost:8787/api/v1/execute/ensemble/my-workflow \
  -H "Content-Type: application/json" \
  -d '{
    "input": { "data": "test" }
  }'
That’s it! No imports, no registration, just create the YAML file.

Using Auto-Discovery API

The recommended way to use ensembles is with createAutoDiscoveryAPI():
// src/index.ts
import { createAutoDiscoveryAPI } from '@ensemble-edge/conductor/api'
import { agents } from 'virtual:conductor-agents'
import { ensembles } from 'virtual:conductor-ensembles'

export default createAutoDiscoveryAPI({
  agents,  // Auto-discovered from agents/
  ensembles,  // Auto-discovered from ensembles/
  autoDiscover: true,
})
This provides:
  • POST /api/v1/execute/ensemble/{name} - Execute an ensemble by name
  • POST /api/v1/execute/agent/{name} - Execute an agent directly (if enabled)
  • GET /api/v1/ensembles - List all ensembles
  • GET /api/v1/agents - List all agents
  • Automatic webhook and cron trigger handling

Execute Request Format

Execute an ensemble via the API:
POST /api/v1/execute/ensemble/greeting-workflow
Authorization: Bearer YOUR_TOKEN
Content-Type: application/json

{
  "input": {
    "name": "Alice",
    "style": "formal"
  }
}
Note: All /api/v1/* routes require authentication by default. See Security & Authentication for details.
Execute an agent directly (if allowDirectAgentExecution is enabled):
POST /api/v1/execute/agent/hello
Authorization: Bearer YOUR_TOKEN
Content-Type: application/json

{
  "input": {
    "name": "Bob"
  }
}

Discovery Rules

Auto-Discovered:
  • ✅ All *.yaml files in ensembles/
  • ✅ Nested directories: ensembles/workflows/user-onboarding.yaml
  • ✅ Cron triggers from ensemble configs
Not Discovered:
  • ❌ README.md files
  • ❌ Files outside ensembles/ directory

Verification

List all discovered ensembles:
curl http://localhost:8787/api/v1/ensembles

# Returns:
{
  "ensembles": [
    { "name": "hello-world", "description": "Simple greeting" },
    { "name": "greeting-workflow", "description": "Generate and log greeting" }
  ]
}

Testing with Auto-Discovery

In tests, you can still use manual registration for fine-grained control:
import { Executor, MemberLoader } from '@ensemble-edge/conductor'
import { stringify } from 'yaml'
import myWorkflow from '../ensembles/my-workflow.yaml'

const executor = new Executor({ env, ctx })
const result = await executor.executeFromYAML(
  stringify(myWorkflow),
  { input: 'data' }
)
This is fine! Manual execution is supported alongside auto-discovery. See the Auto-Discovery guide for complete details.

Flow Control

Sequential Execution (Default)

Agents run one after another:
agents:
  - name: fetch-data
    operation: http
    config:
      url: https://api.example.com/data

  - name: process-data
    operation: code
    input:
      data: ${fetch-data.output.body}

  - name: store-data
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: INSERT INTO results (data) VALUES (?)
      params:
        - ${process-data.output.result}
⏱️ Total time: fetch + process + store

Parallel Execution

Agents run simultaneously:
agents:
  - name: check-spam
    operation: think
    config:
      provider: anthropic
      model: claude-3-5-sonnet-20241022
      prompt: "Is this spam? ${input.text}"

  - name: check-hate
    operation: think
    config:
      provider: anthropic
      model: claude-3-5-sonnet-20241022
      prompt: "Is this hate speech? ${input.text}"

  - name: check-explicit
    operation: think
    config:
      provider: anthropic
      model: claude-3-5-sonnet-20241022
      prompt: "Is this explicit? ${input.text}"

  # This agent waits for all three checks
  - name: aggregate
    operation: code
    config:
      script: scripts/aggregate-moderation-checks
    input:
      isSpam: ${check-spam.output.spam}
      isHate: ${check-hate.output.hate}
      isExplicit: ${check-explicit.output.explicit}
// scripts/aggregate-moderation-checks.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function aggregateModerationChecks(context: AgentExecutionContext) {
  const { isSpam, isHate, isExplicit } = context.input
  return {
    safe: !isSpam && !isHate && !isExplicit
  }
}
output:
  safe: ${aggregate.output.safe}
⏱️ Total time: max(spam, hate, explicit) + aggregate How Conductor determines parallelism:
  • Agents with NO dependencies on each other → Run in parallel
  • Agent depends on previous output → Wait for completion

Conditional Execution

Run agents only when conditions are met:
agents:
  # Always runs
  - name: quick-check
    operation: code
    config:
      script: scripts/quick-review-check
// scripts/quick-review-check.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function quickReviewCheck(context: AgentExecutionContext) {
  return { needs_review: Math.random() < 0.2 }
}
  # Only runs 20% of the time
  - name: expensive-ai-check
    operation: think
    condition: ${quick-check.output.needs_review}
    config:
      provider: anthropic
      model: claude-3-5-sonnet-20241022
      prompt: "Deep analysis of: ${input.text}"

  # Fallback if AI check didn't run
  - name: use-default
    operation: code
    condition: ${!expensive-ai-check.executed}
    config:
      script: scripts/use-default-value
// scripts/use-default-value.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function useDefaultValue(context: AgentExecutionContext) {
  return { result: "default-value" }
}
output:
  result: ${expensive-ai-check.output || use-default.output.result}
  ai_used: ${expensive-ai-check.executed}
Cost optimization: Skip expensive operations when possible!

Loops

Process arrays of items:
agents:
  - name: process-batch
    operation: code
    loop:
      items: ${input.users}
      as: user
    config:
      script: scripts/process-user
    input:
      user: ${user}
// scripts/process-user.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function processUser(context: AgentExecutionContext) {
  const { user } = context.input
  return {
    id: user.id,
    processed: true,
    timestamp: Date.now()
  }
}
output:
  processed: ${process-batch.output}  # Array of results

Retry Logic

Automatically retry failed operations:
agents:
  - name: unreliable-api
    operation: http
    config:
      url: https://api.sometimes-fails.com/data
    retry:
      max_attempts: 3
      backoff: exponential  # 1s, 2s, 4s
      on_status: [500, 502, 503, 504]

output:
  data: ${unreliable-api.output.body}
  attempts: ${unreliable-api.attempts}

Real-World Example: Content Moderation

Let’s build a complete content moderation pipeline:
name: content-moderation
description: Multi-stage content safety checking

trigger:
  - type: http
    path: /api/moderate
    methods: [POST]
    public: false  # Require auth

agents:
  # Quick regex-based filters (fast, free)
  - name: quick-filter
    operation: code
    config:
      script: scripts/quick-filter-text
    input:
      text: ${input.text}
// scripts/quick-filter-text.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function quickFilterText(context: AgentExecutionContext) {
  const { text } = context.input
  const lowerText = text.toLowerCase()
  const badWords = ["spam", "scam", "viagra"]
  const hasBadWords = badWords.some(w => lowerText.includes(w))

  return {
    blocked: hasBadWords,
    reason: hasBadWords ? "Contains blocked keywords" : null
  }
}
  # Only run expensive AI if quick filter passes
  - name: ai-check
    operation: think
    condition: ${!quick-filter.output.blocked}
    config:
      provider: anthropic
      model: claude-3-5-sonnet-20241022
      prompt: |
        Analyze this text for safety issues:
        "${input.text}"

        Check for:
        - Spam
        - Hate speech
        - Explicit content
        - Misinformation

        Return JSON: {"safe": boolean, "issues": string[], "confidence": number}

  # Log the decision
  - name: log-decision
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT INTO moderation_logs (text, decision, reason, timestamp)
        VALUES (?, ?, ?, ?)
      params:
        - ${input.text}
        - ${quick-filter.output.blocked || !ai-check.output.safe}
        - ${quick-filter.output.reason || ai-check.output.issues}
        - ${Date.now()}

flow:
  - agent: quick-filter
  - agent: ai-check
  - agent: log-decision

output:
  safe: ${!quick-filter.output.blocked && ai-check.output.safe}
  reason: ${quick-filter.output.reason || ai-check.output.issues}
  method: ${quick-filter.output.blocked ? 'filter' : 'ai'}
  confidence: ${ai-check.output.confidence || 1.0}
Cost optimization achieved:
  • Quick filter catches ~80% of bad content (free)
  • AI only runs on remaining 20% (costs money)
  • Result: 80% cost reduction!

Execute from Other Ensembles

You can call one ensemble from another using the HTTP operation:
name: moderation-with-notification
description: Moderate content and notify on violations

trigger:
  - type: http
    path: /api/moderate-notify
    methods: [POST]

agents:
  - name: moderate
    operation: http
    config:
      url: ${env.BASE_URL}/api/moderate
      method: POST
      body:
        text: ${input.text}

  - name: notify
    operation: http
    condition: ${!moderate.output.safe}
    config:
      url: https://slack.example.com/webhook
      method: POST
      body:
        text: "Content violation detected: ${moderate.output.reason}"

flow:
  - agent: moderate
  - agent: notify

output:
  moderated: true
  safe: ${moderate.output.safe}
  reason: ${moderate.output.reason}

Best Practices

1. Start Simple, Add Complexity

# Start with this
name: my-workflow
agents:
  - name: step1
    operation: code
output:
  result: ${step1.output}

# Then add more steps
# Then add conditions
# Then add parallelism
# Then add retry logic

2. Use Descriptive Names

  • validate-input, fetch-user-data, send-notification
  • step1, step2, step3

3. Document Your Ensembles

name: complex-workflow
description: |
  This workflow:
  1. Fetches user data from API
  2. Validates the data structure
  3. Enriches with additional info
  4. Stores in database
  5. Sends notification email

  Expected input:
    - user_id: string (UUID)

  Returns:
    - success: boolean
    - user: object

4. Handle Errors

agents:
  - name: risky-operation
    operation: http
    retry:
      max_attempts: 3

  - name: fallback-handler
    operation: code
    condition: ${risky-operation.failed}
    config:
      script: scripts/handle-failure
    input:
      error: ${risky-operation.error}
// scripts/handle-failure.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function handleFailure(context: AgentExecutionContext) {
  const { error } = context.input
  console.error("Operation failed:", error)
  return { fallback: true }
}
output:
  result: ${risky-operation.output || fallback-handler.output}
  used_fallback: ${fallback-handler.executed}

5. Optimize Costs

Sequential for dependencies:
agents:
  - name: filter
    operation: code  # Free
  - name: ai-analysis
    operation: think  # Only if filter passes
    condition: ${filter.output.needs_analysis}
Parallel for independence:
agents:
  - name: check-a
    operation: think  # Runs simultaneously
  - name: check-b
    operation: think  # Runs simultaneously
  - name: check-c
    operation: think  # Runs simultaneously

6. Test Everything

describe('My Ensemble', () => {
  it('should handle success case', async () => {
    // Test happy path
  });

  it('should handle failures gracefully', async () => {
    // Test error cases
  });

  it('should respect conditions', async () => {
    // Test conditional logic
  });

  it('should execute in parallel', async () => {
    // Test performance
  });
});

Troubleshooting

Problem: Agent 'my-agent' not foundFix: Ensure agent is registered or exists in agents/ directory:
pnpm run build  # Rebuild to discover agents
Problem: Cannot read property 'output' of undefinedFix: Check variable references:
# Wrong
output: ${agent-1.output}  # Agent name has hyphen

# Correct
output: ${agent1.output}  # Use agent-1 as name, reference as agent1
Problem: Agents run sequentially even though they shouldn’tReason: One agent depends on another’s outputFix: Remove dependencies:
# Sequential (b depends on a)
agents:
  - name: a
    operation: code
  - name: b
    input:
      data: ${a.output}  # Dependency!

# Parallel (independent)
agents:
  - name: a
    operation: code
  - name: b
    operation: code  # No dependency
Problem: Execution exceeds time limitFixes:
  1. Use caching for slow operations
  2. Run independent checks in parallel
  3. Skip expensive operations when possible
  4. Increase timeout (paid plan)
agents:
  - name: slow-api
    operation: http
    config:
      cache:
        ttl: 3600  # Cache for 1 hour

TypeScript Ensembles

Prefer TypeScript over YAML? You can create ensembles programmatically with full type safety, IDE autocomplete, and compile-time validation.

Basic TypeScript Ensemble

// ensembles/greeting-workflow.ts
import { createEnsemble, step } from '@anthropic/conductor'

const greetingWorkflow = createEnsemble('greeting-workflow')
  .setDescription('Generate personalized greetings')
  .addStep(
    step('greeter')
      .agent('greeter')
      .input({
        name: '${input.name}',
        style: '${input.style}'
      })
  )
  .addStep(
    step('format')
      .operation('code')
      .config({ script: 'scripts/format-greeting' })
      .input({ greeting: '${greeter.output}' })
  )
  .build()

export default greetingWorkflow

TypeScript vs YAML Comparison

import { createEnsemble, step, parallel } from '@anthropic/conductor'

const multiSourceFetch = createEnsemble('multi-source-fetch')
  .addStep(
    parallel('fetch-all')
      .steps(
        step('api-a').agent('fetcher').input({ url: '${input.urlA}' }),
        step('api-b').agent('fetcher').input({ url: '${input.urlB}' }),
        step('api-c').agent('fetcher').input({ url: '${input.urlC}' })
      )
  )
  .addStep(
    step('merge')
      .operation('code')
      .config({ script: 'scripts/merge-data' })
      .input({
        a: '${api-a.output}',
        b: '${api-b.output}',
        c: '${api-c.output}'
      })
  )
  .build()

export default multiSourceFetch

Benefits of TypeScript Ensembles

FeatureYAMLTypeScript
Type safetyRuntime onlyCompile-time
IDE autocompleteLimitedFull support
RefactoringManualAutomated
Conditional logicExpression stringsNative code
Reusable componentsCopy/pasteImport/export

When to Use Each

Use YAML when:
  • Quick prototyping
  • Simple linear workflows
  • Non-developers editing workflows
  • Maximum readability
Use TypeScript when:
  • Complex conditional logic
  • Reusable step patterns
  • Type safety is important
  • IDE support needed
  • Dynamic workflow generation

Validating TypeScript Ensembles

TypeScript ensembles are validated the same way as YAML:
# Validate a TypeScript ensemble
ensemble conductor validate ensembles/greeting-workflow.ts

# Validate all ensembles (YAML and TypeScript)
ensemble conductor validate ensembles/ -r
For complete TypeScript API documentation, see the TypeScript API Reference.

Next Steps

Performance Tips

Caching Layers

  1. KV Cache (operation level):
agents:
  - name: fetch
    operation: http
    config:
      cache:
        ttl: 3600
  1. AI Gateway (automatic for AI providers):
[ai]
binding = "AI_GATEWAY"
gateway_id = "your-gateway"
  1. Ensemble Results (application level):
// Cache entire ensemble result
const cacheKey = `ensemble:${name}:${hash(input)}`;
const cached = await env.CACHE.get(cacheKey);
if (cached) return JSON.parse(cached);

const result = await executor.execute(name, input);
await env.CACHE.put(cacheKey, JSON.stringify(result), { expirationTtl: 3600 });

Cost vs Speed

Fast + Expensive (parallel):
agents:
  - name: check1
    operation: think  # $0.01
  - name: check2
    operation: think  # $0.01
  - name: check3
    operation: think  # $0.01
# Total: $0.03, Time: max(check1, check2, check3)
Slow + Cheap (sequential with filtering):
agents:
  - name: filter
    operation: code  # $0
  - name: ai-check
    operation: think  # $0.01 only 20% of time
    condition: ${filter.output.needs_check}
# Average: $0.002, Time: filter + (ai-check * 20%)
Choose based on your priorities!