Skip to main content

Ensemble Design Principles

1. Single Responsibility

Each ensemble should accomplish one clear task:
# Good: Clear purpose
ensemble: process-invoice
description: Process customer invoice and send confirmation

# Bad: Too broad
ensemble: handle-everything
description: Does various things

2. Composability

Design agents to be reusable across ensembles:
# Good: Reusable agents
ensemble: customer-onboarding
agents:
  - name: validate-email
    agent: email-validator  # Reusable
  - name: create-account
    agent: account-creator  # Reusable

# Good: Another ensemble reusing the same agent
ensemble: newsletter-signup
agents:
  - name: validate-email
    agent: email-validator  # Same agent!

3. Explicit Dependencies

Make data flow obvious:
# Good: Clear dependencies
agents:
  - name: fetch
    agent: fetcher
  - name: process
    operation: code
    config:
      script: scripts/process-fetch-data  # Script that consumes the fetched data
    input:
      data: ${fetch.output}  # Explicit dependency: process reads fetch's output

# Bad: Hidden dependencies
agents:
  - name: step1
    agent: agent1
  - name: step2
    agent: agent2  # What does it depend on?

Common Patterns

Sequential Pipeline

Process data through multiple stages:
ensemble: data-pipeline

agents:
  # Stage 1: retrieve the raw payload from the caller-supplied URL.
  - name: fetch
    agent: fetcher
    inputs:
      url: ${input.url}

  # Stage 2: validate the fetched payload.
  - name: validate
    agent: validator
    inputs:
      data: ${fetch.output}

  # Stage 3: transform the fetched payload.
  - name: transform
    agent: transformer
    inputs:
      data: ${fetch.output}

  # Stage 4: persist the transformed result.
  - name: store
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: INSERT INTO data (json) VALUES (?)
      # The expression must be quoted: inside a flow sequence ([...]) the
      # characters { and } are YAML flow indicators, so an unquoted ${...}
      # does not parse as a plain string.
      params: ["${transform.output}"]

output:
  success: ${store.executed}

Parallel Fan-Out/Fan-In

Process multiple items in parallel:
ensemble: multi-source-aggregator

agents:
  # Fan-out: Parallel processing
  - name: fetch-api-1
    agent: fetcher
    inputs:
      url: https://api1.com

  - name: fetch-api-2
    agent: fetcher
    inputs:
      url: https://api2.com

  - name: fetch-api-3
    agent: fetcher
    inputs:
      url: https://api3.com

  # Fan-in: Aggregate results
  - name: aggregate
    operation: code
    config:
      script: scripts/aggregate-api-results
    input:
      api1: ${fetch-api-1.output}
      api2: ${fetch-api-2.output}
      api3: ${fetch-api-3.output}

output:
  all_data: ${aggregate.output.results}
// scripts/aggregate-api-results.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Fan-in step: gathers the three upstream API payloads into one ordered list.
 *
 * @param context - Execution context; `context.input` is read for the
 *   `api1`, `api2` and `api3` entries.
 * @returns An object whose `results` array holds the three entries in
 *   `api1`, `api2`, `api3` order (an entry that is absent stays `undefined`).
 */
export default async function aggregateResults(context: AgentExecutionContext) {
  const upstream = context.input
  const results = [upstream.api1, upstream.api2, upstream.api3]
  return { results }
}

Conditional Branching

Route based on conditions:
ensemble: smart-routing

agents:
  - name: classify
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Classify this request: "${input.text}"
        Return: urgent, normal, or low

  - name: urgent-handler
    condition: ${classify.output === 'urgent'}
    agent: urgent-processor

  - name: normal-handler
    condition: ${classify.output === 'normal'}
    agent: normal-processor

  - name: low-handler
    condition: ${classify.output === 'low'}
    agent: low-processor

output:
  result: ${urgent-handler.output || normal-handler.output || low-handler.output}
  priority: ${classify.output}

Error Recovery

Handle failures gracefully:
ensemble: resilient-workflow

agents:
  # First choice: the primary source, retried before giving up.
  - name: try-primary
    agent: fetcher
    inputs:
      url: ${input.primary_url}
    retry:
      maxAttempts: 2

  # Second choice: only runs when the primary fetch failed.
  - name: try-secondary
    condition: ${try-primary.failed}
    agent: fetcher
    inputs:
      url: ${input.secondary_url}

  # Last resort: read a cached copy when both fetches failed.
  - name: use-cache
    condition: ${try-primary.failed && try-secondary.failed}
    operation: storage
    config:
      type: kv
      action: get
      key: cached-data

output:
  data: ${try-primary.output || try-secondary.output || use-cache.output.value}
  # Quoted on purpose: the ternary contains " : " (colon + space), which an
  # unquoted YAML plain scalar would treat as the start of a nested mapping.
  source: "${try-primary.executed ? 'primary' : try-secondary.executed ? 'secondary' : 'cache'}"

Event-Driven

Trigger actions based on events:
ensemble: event-processor

agents:
  - name: parse-event
    operation: code
    config:
      script: scripts/parse-event
    input:
      event: ${input.event}

  - name: handle-user-created
    condition: ${parse-event.output.type === 'user.created'}
    agent: user-onboarding

  - name: handle-payment-received
    condition: ${parse-event.output.type === 'payment.received'}
    agent: payment-processor

  - name: handle-order-placed
    condition: ${parse-event.output.type === 'order.placed'}
    agent: order-fulfillment

output:
  handled: ${handle-user-created.executed || handle-payment-received.executed || handle-order-placed.executed}
  event_type: ${parse-event.output.type}
// scripts/parse-event.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Normalizes the incoming event into a parsed value.
 *
 * Events may arrive either as a JSON string or as an already-decoded object.
 * Passing an object to `JSON.parse` coerces it to "[object Object]" and
 * throws, so objects are returned unchanged instead.
 *
 * @param context - Execution context; `context.input.event` is the raw event.
 * @returns The parsed JSON value for string input, or the event itself when
 *   it is already an object.
 * @throws SyntaxError when a non-object event is not valid JSON.
 */
export default async function parseEvent(context: AgentExecutionContext) {
  const { event } = context.input
  // Already decoded: nothing to parse.
  if (event !== null && typeof event === 'object') {
    return event
  }
  return JSON.parse(event)
}

Anti-Patterns

1. God Ensemble

# Bad: Does too much
ensemble: everything
description: Handles all business logic
agents:
  - name: step1...
  - name: step2...
  # ... 50 more agents

# Good: Break into focused ensembles (each one defined in its own file)
ensemble: process-order
ensemble: fulfill-order
ensemble: notify-customer

2. Tight Coupling

# Bad: Agents tightly coupled
agents:
  - name: step1
    agent: custom-step1
  - name: step2
    agent: custom-step2-only-for-step1  # Only works with step1

# Good: Loosely coupled
agents:
  - name: fetch
    agent: fetcher  # Generic
  - name: process
    agent: processor  # Generic

3. Hidden State

# Bad: Implicit state management
agents:
  - name: step1
    agent: agent1  # Sets global state internally

# Good: Explicit state
state:
  schema:
    shared_data: object
agents:
  - name: step1
    operation: code
    config:
      script: scripts/produce-data
    state:
      set:
        shared_data: ${step1.output.data}

4. Deep Nesting

# Bad: Deeply nested conditionals
agents:
  - name: check1
    condition: ${input.a}
    operation: code
    # ...
  - name: check2
    condition: ${check1.executed && input.b}
    operation: code
    # ...
  - name: check3
    condition: ${check2.executed && input.c}
    operation: code
    # ...

# Good: Flatten with early returns using a validation script
agents:
  - name: validate
    operation: code
    config:
      script: scripts/validate-inputs
    input:
      a: ${input.a}
      b: ${input.b}
      c: ${input.c}
  - name: process
    condition: ${validate.output.valid}
    operation: code
    config:
      script: scripts/process-validated
// scripts/validate-inputs.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Checks that the `a`, `b` and `c` inputs are all truthy.
 *
 * @param context - Execution context; `context.input` supplies `a`, `b`, `c`.
 * @returns `{ valid: true }` when every input is truthy, otherwise
 *   `{ valid: false }`.
 */
export default async function validateInputs(context: AgentExecutionContext) {
  const required = [context.input.a, context.input.b, context.input.c]
  const valid = required.every((value) => Boolean(value))
  return { valid }
}

Performance Optimization

Minimize Sequential Dependencies

# Bad: Sequential (slow)
agents:
  - name: step1
    agent: fetcher
  - name: step2
    operation: code
    config:
      script: scripts/wrap-a
    input:
      data: ${step1.output}
  - name: step3
    operation: code
    config:
      script: scripts/wrap-b
    input:
      data: ${step2.output}

# Good: Parallel where possible
agents:
  - name: step1
    agent: fetcher
  - name: step2
    agent: fetcher
  - name: combine
    operation: code
    config:
      script: scripts/combine-outputs
    input:
      a: ${step1.output}
      b: ${step2.output}
// scripts/combine-outputs.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Merges the two upstream results into a single object.
 *
 * @param context - Execution context; `context.input` supplies `a` and `b`.
 * @returns An object carrying the same `a` and `b` values.
 */
export default async function combineOutputs(context: AgentExecutionContext) {
  const upstream = context.input
  return { a: upstream.a, b: upstream.b }
}

Cache Expensive Operations

agents:
  - name: expensive-scrape
    agent: scraper
    inputs:
      url: ${input.url}
    cache:
      ttl: 86400  # Cache for 24 hours

  - name: expensive-ai
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: ${expensive-scrape.output}
    cache:
      ttl: 3600  # Cache for 1 hour

Early Termination

agents:
  # Quick validation first
  - name: quick-check
    operation: code
    config:
      script: scripts/quick-validate
    input:
      data: ${input.data}

  # Only do expensive work if valid
  - name: expensive-process
    condition: ${quick-check.output.valid}
    agent: expensive-agent
// scripts/quick-validate.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Cheap pre-check that the input carries a non-empty `data` value.
 *
 * The result is always a strict boolean. The bare expression
 * `data && data.length > 0` would otherwise leak `undefined`, `null`, `''`
 * or `0` into `valid` whenever `data` is falsy.
 *
 * @param context - Execution context; `context.input.data` is the value to check.
 * @returns `{ valid: true }` when `data` is present and its `length` is
 *   greater than zero, otherwise `{ valid: false }`.
 */
export default async function quickValidate(context: AgentExecutionContext) {
  const { data } = context.input
  // `!= null` rules out both null and undefined before `.length` is read.
  return { valid: data != null && data.length > 0 }
}

Testing Ensembles

// ensembles/process-invoice.test.ts
import { describe, it, expect } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';

describe('process-invoice ensemble', () => {
  // Every test starts from a fresh conductor with the project loaded, so
  // mocks registered in one test cannot leak into another.
  async function loadConductor() {
    const conductor = await TestConductor.create();
    await conductor.loadProject('./');
    return conductor;
  }

  // Happy path: a well-formed invoice is processed and confirmed.
  it('should process valid invoice', async () => {
    const conductor = await loadConductor();

    const result = await conductor.execute('process-invoice', {
      invoice: {
        id: 'INV-001',
        amount: 100,
        customer_email: 'customer@example.com'
      }
    });

    expect(result).toBeSuccessful();
    expect(result.output.processed).toBe(true);
    expect(result.output.confirmation_sent).toBe(true);
  });

  // Validation failure: the ensemble itself succeeds but reports errors.
  it('should reject invalid invoice', async () => {
    const conductor = await loadConductor();

    const result = await conductor.execute('process-invoice', {
      invoice: {
        id: 'INV-002',
        amount: -100  // Invalid
      }
    });

    expect(result).toBeSuccessful();
    expect(result.output.processed).toBe(false);
    expect(result.output.errors).toBeDefined();
  });

  // Fallback path: the primary agent is mocked to fail.
  it('should use fallback on primary failure', async () => {
    const conductor = await loadConductor();

    // Mock primary agent failure
    conductor.mockAgent('primary-processor', {
      failed: true,
      error: 'Service unavailable'
    });

    const result = await conductor.execute('process-invoice', {
      invoice: { id: 'INV-003', amount: 100 }
    });

    expect(result).toBeSuccessful();
    expect(result.agents['fallback-processor'].executed).toBe(true);
  });

  // Parallelism: the three fetch agents should start at nearly the same time.
  it('should execute operations in parallel', async () => {
    const conductor = await loadConductor();

    const result = await conductor.execute('parallel-ensemble', {
      urls: ['url1', 'url2', 'url3']
    });

    const startTimes = [
      result.agents['fetch-1'].startTime,
      result.agents['fetch-2'].startTime,
      result.agents['fetch-3'].startTime
    ];

    const timeSpread = Math.max(...startTimes) - Math.min(...startTimes);
    expect(timeSpread).toBeLessThan(100); // Started within 100ms
  });
});

Documentation

Document your ensembles:
ensemble: process-invoice
description: |
  Process customer invoice through validation, payment processing,
  and confirmation email.

  Requirements:
  - Valid invoice with amount > 0
  - Customer email in database
  - Payment gateway available

  Returns:
  - processed: true if successful
  - confirmation_sent: true if email sent
  - errors: array of error messages if failed

inputs:
  invoice:
    type: object
    required: true
    properties:
      id: string
      amount: number
      customer_email: string

agents:
  # ... agents ...

output:
  processed: ${process.output.success}
  confirmation_sent: ${send-email.executed}
  errors: ${validate.output.errors}

TypeScript Ensembles

For complex workflows, TypeScript ensembles offer type safety and IDE support:

Flow Control Primitives

// Import from the same package as the other examples on this page
// (@ensemble-edge/conductor).
import {
  createEnsemble,
  step,
  parallel,
  branch,
  foreach,
  tryStep,
  switchStep,
  whileStep,
  mapReduce
} from '@ensemble-edge/conductor'

/**
 * Example ensemble that chains one step of each flow-control primitive:
 * parallel, branch, foreach, tryStep, switchStep and mapReduce.
 *
 * String values such as '${input.urlA}' are written in single quotes, so they
 * are passed to the builder as literal expression text rather than being
 * interpolated by TypeScript.
 */
const advancedWorkflow = createEnsemble('advanced-workflow')
  // Parallel execution: both fetches are grouped under one parallel step.
  .addStep(
    parallel('fetch-all')
      .steps(
        step('api-a').agent('fetcher').input({ url: '${input.urlA}' }),
        step('api-b').agent('fetcher').input({ url: '${input.urlB}' })
      )
  )

  // Conditional branching: one of two processors, chosen by input.priority.
  .addStep(
    branch('route')
      .condition('${input.priority === "high"}')
      .then(step('fast').agent('fast-processor'))
      .else(step('normal').agent('normal-processor'))
  )

  // Iteration: each element of input.items is exposed to the inner step as
  // `item`, with concurrency set to 5.
  .addStep(
    foreach('process-items')
      .items('${input.items}')
      .as('item')
      .concurrency(5)
      .step(step('process').agent('processor').input({ item: '${item}' }))
  )

  // Error handling: a risky step paired with a fallback step.
  .addStep(
    tryStep('safe-op')
      .try(step('risky').agent('risky-agent'))
      .catch(step('fallback').agent('fallback-agent'))
  )

  // Multi-way branching on input.type, with a default handler.
  .addStep(
    switchStep('route-by-type')
      .value('${input.type}')
      .case('email', step('email').agent('email-handler'))
      .case('sms', step('sms').agent('sms-handler'))
      .default(step('default').agent('default-handler'))
  )

  // Map-reduce pattern.
  // NOTE(review): '${item}' and '${mapResults}' are presumably the names the
  // runtime binds for mapReduce; confirm against the API reference.
  .addStep(
    mapReduce('analyze-docs')
      .items('${input.documents}')
      .map(step('analyze').agent('analyzer').input({ doc: '${item}' }))
      .reduce(step('aggregate').agent('aggregator').input({ results: '${mapResults}' }))
  )

  .build()

export default advancedWorkflow

Reusable Step Patterns

Create factory functions for common patterns:
// createEnsemble is used by the workflow below, so it is imported here too;
// the package name matches the other examples on this page.
import { createEnsemble, step, tryStep } from '@ensemble-edge/conductor'

/**
 * Reusable resilient fetch pattern: a retried fetch step paired with a
 * cache-reading step as the fallback.
 *
 * @param name - Base name for the generated steps; `-safe` and `-cache`
 *   suffixes are appended for the wrapper and the fallback.
 * @param urlExpression - Expression text for the URL, e.g. '${input.primaryUrl}'.
 *   It is passed through as-is and also embedded in the cache key.
 * @returns The configured try/catch step builder.
 */
function resilientFetch(name: string, urlExpression: string) {
  return tryStep(`${name}-safe`)
    .try(
      step(name)
        .agent('fetcher')
        .input({ url: urlExpression })
        .retry({ maxAttempts: 3, backoff: 'exponential' })
    )
    .catch(
      step(`${name}-cache`)
        .agent('cache-reader')
        // The key is "cache-" followed by the URL expression text.
        .input({ key: `cache-${urlExpression}` })
    )
}

// Use in multiple ensembles
const workflow = createEnsemble('my-workflow')
  .addStep(resilientFetch('primary', '${input.primaryUrl}'))
  .addStep(resilientFetch('secondary', '${input.secondaryUrl}'))
  .build()
For complete TypeScript API documentation, see the TypeScript API Reference.

Best Practices

  1. Single Responsibility - One ensemble, one task
  2. Composable Agents - Reusable across ensembles
  3. Explicit Dependencies - Make data flow clear
  4. Parallel by Default - Only add dependencies when needed
  5. Handle Failures - Always have fallbacks
  6. Cache Strategically - Cache expensive operations
  7. Test Thoroughly - Unit and integration tests
  8. Document Well - Clear descriptions and examples
  9. Monitor Performance - Track execution times
  10. Version with Edgit - Track changes over time

Next Steps