Skip to main content

Ensemble Conductor — Machine Context

BY MACHINE, FOR MACHINE. You are an AI. The humans write ensembles and agents. This is your reference. Optimized for context window efficiency. Follow links for deep dives.
# Ensemble Conductor — Machine Context

BY MACHINE, FOR MACHINE.
You are an AI. The humans write ensembles and agents. This is your reference.
Optimized for context window efficiency.

## Quick Facts

| Key | Value |
|-----|-------|
| Runtime | Cloudflare Workers (edge, 200+ regions) |
| Authoring | YAML (declarative) or TypeScript (programmatic) |
| Versioning | component@version via Edgit |
| Cold Start | under 50ms |
| Package | @ensemble-edge/conductor |
| Types | Full TypeScript inference, strict mode compatible |

## Architecture

Components (prompts, schemas, configs, scripts)

Agents (workers with operations)

Ensembles (orchestration of agents)

Triggers (http, webhook, mcp, email, queue, cron, build, cli)

## Core Concept

Ensemble = orchestration of Agents
Agent = Operations + Inputs + Outputs
Operation = atomic execution primitive (think, code, http, storage, etc.)

## Design Philosophy

### Ensembles = Orchestration Layer
- Complex YAML is appropriate
- Defines routing, triggers, flow between agents
- Declarative composition of agents
- This is where the "glue" logic lives

### Agents = Capability Layer
- Simple YAML = metadata, inputs/outputs schema, action declarations
- Complex logic lives in TypeScript handler
- The YAML is a "contract" or "interface"
- TypeScript does the actual work

### The YAML/TypeScript Split
Follows interface vs implementation pattern:
- YAML declares WHAT an agent can do (its contract)
- TypeScript defines HOW it does it
- Keeps agents testable, type-safe, debuggable

Rule: If writing conditional expressions in YAML, move to TypeScript.

### Leverage Components
Agents access shared components via ctx:
- ctx.schemas.get('name') / ctx.schemas.validate('name', data)
- ctx.prompts.get('name') / ctx.prompts.render('name', vars)
- ctx.configs.get('name')
- ctx.queries.getSql('name')
- ctx.scripts.get('name')
- ctx.templates.render('name', vars)
- ctx.config (project ConductorConfig)

### Discovery Registries
Agents can introspect available components:
- ctx.agentRegistry.list() / ctx.agentRegistry.get('name')
- ctx.ensembleRegistry.list() / ctx.ensembleRegistry.get('name')

### Eating Our Own Dog Food
Built-in capabilities ship as catalog templates (real agents/ensembles):
- catalog/agents/system/redirect/ → agents/system/redirect/
- catalog/agents/system/docs/ → agents/system/docs/
- catalog/ensembles/system/docs/ → ensembles/system/docs/
No magic. Framework must support all use cases.

## Operations (16 types)

| Operation | Purpose | Config Keys |
|-----------|---------|-------------|
| think | LLM reasoning | provider, model, prompt, schema, temperature, maxTokens |
| code | JS/TS execution | handler: ./file.ts OR script: scripts/path |
| storage | KV/R2 access | type: kv|r2, action: get|put|delete, key |
| data | D1/Hyperdrive/Vectorize | backend: d1|hyperdrive|vectorize, binding, query|operation |
| http | HTTP requests | url, method, headers, body |
| tools | MCP tools | tool, params |
| email | Send email | to, from, subject, body |
| sms | Send SMS | to, from, body |
| html | Render HTML | template, data |
| pdf | Generate PDF | html, filename, format |
| form | Generate forms | fields, csrf |
| queue | Queue messages | action: send|consume, queue, body |
| docs | API docs | OpenAPI generation |
| transform | Data transformation | mode: value|merge|input, pick, omit, rename, defaults |
| convert | Format conversion | from: html|markdown|docx, to: html|markdown, options |
| chart | Data visualization | type: bar|line|area|pie|scatter|sparkline, data, x, y, output: svg|url|vega|html |

## Flow Control (TypeScript)

| Primitive | Signature | Purpose |
|-----------|-----------|---------|
| createEnsemble(name) | .addStep().build() | Ensemble container |
| step(name) | .agent()|.operation().input().config() | Unit of work |
| parallel(name) | .steps([...]) | Concurrent execution |
| branch(name) | .condition().then().else() | If/else |
| switchStep(name) | .value().case().default() | Multi-branch |
| foreach(name) | .items().as().step() | Iterate array |
| tryStep(name) | .try().catch() | Error handling |
| whileStep(name) | .condition().maxIterations().step() | Loop |
| mapReduce(name) | .items().map().reduce() | Map-reduce pattern |

## Lifecycle Hooks

| Hook | Trigger | Use Case |
|------|---------|----------|
| beforeExecute | Before agent runs | Logging, validation |
| afterExecute | After agent completes | Metrics, cleanup |
| onError | On agent failure | Error reporting, fallback |

## Built-in Agents (2 types)

Framework-level agents requiring platform integration. Configure only.

| Agent | Purpose | Key Inputs |
|-------|---------|------------|
| rag | Vector search + LLM (Cloudflare AI & Vectorize) | query, operation: index|search |
| hitl | Human approval flow (Durable Objects) | approvalData, action: suspend|resume |

## Starter Kit Agents

Template agents you can modify or delete. Located in `agents/system/`:

| Agent | Purpose |
|-------|---------|
| scrape | Web scraping with bot detection |
| fetch | HTTP requests with caching |
| validate | Data validation with multiple evaluators |
| redirect | URL redirection service |
| docs | API documentation generation |
| tools | MCP tool integration |
| autorag | Automated RAG indexing |

## Triggers (9 types)

| Trigger | Config | Use Case |
|---------|--------|----------|
| http | path, paths[], methods, auth, rateLimit, cors, public | Web apps, APIs with path params |
| webhook | path, methods, auth, async, public | External integrations |
| mcp | auth, public | AI tool exposure |
| email | addresses, reply_with_output | Email routing |
| queue | queue, batch_size, max_retries | Message processing |
| cron | cron, timezone, enabled | Scheduled execution |
| build | output, enabled, input, metadata | Static generation at build time |
| cli | command, description, options[] | Developer commands |
| startup | enabled, runOnce | Worker cold start initialization |

### Startup Trigger (NEW)
trigger:
  - type: startup
    enabled: true
    runOnce: true  # Only run once per worker instance
flow:
  - agent: cache-warmer
    input: { keys: ["config", "templates"] }

### Build Trigger
trigger:
  - type: build
    enabled: true
    output: ./dist/docs
flow:
  - agent: docs
    input: { action: generate-openapi }

### CLI Trigger (NEW)
trigger:
  - type: cli
    command: generate-docs
    description: Generate documentation
    options:
      - name: format
        type: string
        default: yaml
      - name: output
        type: string
        required: true
flow:
  - agent: docs
    input: { format: ${trigger.options.format} }

### Multi-Path HTTP Trigger
trigger:
  - type: http
    paths:
      - path: /api/v1/users
        methods: [GET, POST]
      - path: /api/v1/users/:id
        methods: [GET, PUT, DELETE]
    public: true

## Components (7 types)

| Type | Extension | Reference Syntax | ctx Method |
|------|-----------|------------------|------------|
| schemas | .json | schemas/[email protected] | ctx.schemas.validate() |
| prompts | .md | prompts/name@latest | ctx.prompts.render() |
| configs | .json, .yaml | configs/name@production | ctx.configs.get() |
| queries | .sql | queries/name@v2 | ctx.queries.getSql() |
| scripts | .ts, .js | scripts/name@v1 | ctx.scripts.get() |
| templates | .html | templates/name@v1 | ctx.templates.render() |
| docs | .md | docs/name@v1 | - |

## Expression Syntax

### Variable Access
\${input.field}                    # Ensemble/agent input
\${agent-name.output}              # Agent output
\${agent-name.output.nested.field} # Nested access
\${state.field}                    # State variable
\${env.VARIABLE}                   # Environment variable
\${[email protected]}          # Component reference
\${trigger.options.format}         # CLI trigger options

### Execution Status
\${agent.executed}                 # Boolean: ran
\${agent.failed}                   # Boolean: errored
\${agent.success}                  # Boolean: succeeded
\${agent.cached}                   # Boolean: from cache
\${agent.duration}                 # Number: ms

### Conditions
condition: \${input.value > 10}
condition: \${agent.failed}
condition: \${!agent.executed}
condition: \${input.type === 'premium'}
condition: \${input.age >= 18 && input.verified}

## YAML Agent Schema (with handler)

name: my-agent
operation: code
handler: ./my-handler.ts          # TypeScript implementation
description: What this agent does

schema:
  input:
    field: type
  output:
    field: type

## YAML Ensemble Schema

name: string                       # Required (filename-derived)
description: string                # Optional
trigger:                           # Optional
  - type: http|webhook|mcp|email|queue|cron|build|cli
state:                             # Optional
  schema:
    field: type
flow:                              # Required
  - name: string
    agent: string                  # OR operation
    operation: string              # OR agent
    input: { key: value }
    config: { key: value }
    condition: string
    cache: { ttl: number, key: string }
    retry: { maxAttempts: number, backoff: exponential|linear }
    timeout: number
    state: { use: [fields], set: { field: value } }
output:                            # Optional - supports conditional blocks
  key: value

## Common Patterns

### Linear Pipeline
agents:
  - name: fetch
    operation: http
    config: { url: "\${input.url}" }
  - name: process
    operation: code
    config: { script: scripts/process }
    input: { data: \${fetch.output} }

### Cache-or-Generate
agents:
  - name: check-cache
    operation: storage
    config: { type: kv, action: get, key: "result-\${input.query}" }
  - name: generate
    condition: \${check-cache.output.value === null}
    operation: think
    config: { provider: openai, model: gpt-4o, prompt: "\${input.query}" }

### Fallback Chain
agents:
  - name: try-primary
    operation: http
    config: { url: "https://primary-api.com" }
    retry: { maxAttempts: 2 }
  - name: try-backup
    condition: \${try-primary.failed}
    operation: http
    config: { url: "https://backup-api.com" }

## File Structure

project/
├── ensembles/              # YAML or TS ensembles
│   ├── examples/           # Example ensembles
│   ├── system/             # Built-in functionality (docs, etc.)
│   ├── debug/              # Debug/dev ensembles
│   └── user/               # User-created ensembles
├── agents/                 # Custom agents
│   ├── examples/           # Example agents
│   ├── system/             # Built-in agents (redirect, docs)
│   │   ├── redirect/       # URL redirect service
│   │   │   ├── agent.yaml
│   │   │   └── redirect.ts
│   │   └── docs/           # Documentation agent
│   │       ├── agent.yaml
│   │       └── docs.ts
│   ├── debug/              # Debug agents
│   └── user/               # User-created agents
├── scripts/                # TS/JS for code operations
├── prompts/                # Prompt templates
├── schemas/                # JSON schemas
├── configs/                # Reusable configs
├── queries/                # SQL templates
├── wrangler.toml           # Cloudflare config
└── .dev.vars               # Local secrets (gitignored)

## CLI Quick Reference

```bash
ensemble conductor init [name]    # Create project (use --yes for CI)
ensemble conductor validate       # Validate YAML/TS

# Project commands (pnpm/npm)
pnpm run dev                      # Local dev server
pnpm run build                    # Build project
pnpm test                         # Run tests
npx wrangler deploy               # Deploy to CF Workers
```

## Common Mistakes

| Don't | Do |
|-------|-----|
| Inline JS in YAML | Use handler: ./file.ts or script: scripts/path |
| Hardcode secrets | Use \${env.SECRET} |
| Skip error handling | Use tryStep() or condition: \${x.failed} |
| Create mega-ensembles | Compose via agents |
| Forget cache | Add cache: { ttl: 3600 } to expensive ops |
| HTTP trigger without auth | Add public: true or auth config |

## Provider Models (think operation)

| Provider | Models |
|----------|--------|
| openai | gpt-4o, gpt-4o-mini, text-embedding-3-small |
| anthropic | claude-3-5-sonnet-20241022, claude-sonnet-4 |
| workers-ai | @cf/meta/llama-3.1-8b-instruct, @cf/meta/llama-3-8b-instruct |
| groq | llama3-70b-8192, mixtral-8x7b-32768 |

## Think Agent Output Mapping

For inline think agents, use schema.output to map AI response to named fields:

agents:
  - name: greet
    operation: think
    config:
      provider: workers-ai
      model: "@cf/meta/llama-3.1-8b-instruct"
    schema:
      output:
        greeting: string    # AI response maps to this field
    prompt: "Generate greeting for \${input.name}"

output:
  message: \${greet.output.greeting}  # Access via schema field name
  model: \${greet.output._meta.model} # Metadata available via _meta

## Workers AI Local Development

For local dev with Workers AI, configure wrangler.toml:

[ai]
binding = "AI"
remote = true  # Required for local dev

Add account_id to wrangler.toml and CLOUDFLARE_API_TOKEN to .dev.vars.

Machine context ends. The humans thank you for building their ensembles.

Quick Facts

KeyValue
RuntimeCloudflare Workers (edge, 200+ regions)
AuthoringYAML (declarative) or TypeScript (programmatic)
Versioningcomponent@version via Edgit
Cold Start<50ms
Package@ensemble-edge/conductor
TypesFull TypeScript inference, strict mode compatible

Architecture

Components (prompts, schemas, configs, scripts)

Agents (workers with operations)

Ensembles (orchestration of agents)

Triggers (http, webhook, mcp, email, queue, cron, build, cli)

Core Concept

Ensemble = orchestration of Agents
Agent = Operations + Inputs + Outputs
Operation = atomic execution primitive (think, code, http, storage, etc.)

Design Philosophy

Ensembles = Orchestration Layer

Ensembles are the glue that composes agents into workflows. Complex YAML is appropriate here because ensembles define:
  • Routing: Which agents handle which triggers
  • Flow control: Branching, looping, parallel execution
  • Data flow: Mapping outputs from one agent to inputs of another
  • Error handling: Fallbacks, retries, circuit breakers
  • Triggers: HTTP, webhooks, cron, queues, email
# Ensembles: Complex orchestration is OK
ensemble: order-pipeline
trigger:
  - type: http
    path: /api/orders
agents:
  - name: validate
    agent: order-validator
    inputs: { order: ${input.body} }
  - name: process
    condition: ${validate.output.valid}
    agent: order-processor
  - name: notify
    condition: ${process.success}
    agent: notification-sender
output:
  orderId: ${process.output.id}
  status: ${process.output.status}

Agents = Capability Layer

Agents define what can be done. Keep agent YAML simple—it’s a contract/interface:
  • Metadata: Name, description, version
  • Input/Output schemas: What goes in, what comes out
  • Action declarations: Named operations the agent can perform
  • TypeScript handler: Where all the complex logic lives
# Agent YAML: Simple contract
agent: order-validator
description: Validates order data against business rules
version: 1.0.0

inputs:
  order:
    type: object
    required: true

actions:
  - name: validate
    operation: code
    handler: ./validator.ts

outputs:
  valid: boolean
  errors: array
// validator.ts: Complex logic lives here
import type { AgentContext } from '@ensemble-edge/conductor'
import { orderSchema } from '../schemas/order.js'

export default async function validate(ctx: AgentContext) {
  const { order } = ctx.input

  // All complex validation logic in TypeScript
  const errors: string[] = []

  if (!order.items?.length) {
    errors.push('Order must have at least one item')
  }

  if (order.total < 0) {
    errors.push('Order total cannot be negative')
  }

  // Use shared schema for structure validation
  const schemaResult = ctx.schemas.validate('order', order)
  if (!schemaResult.valid) {
    errors.push(...schemaResult.errors)
  }

  return {
    valid: errors.length === 0,
    errors
  }
}

The YAML/TypeScript Split

This follows the classic interface vs implementation pattern:
AspectYAML (Contract)TypeScript (Implementation)
PurposeDeclares capabilitiesImplements logic
ComplexitySimple, declarativeAs complex as needed
TestingSchema validationUnit testable
DebuggingEasy to readFull IDE support
ReuseComposition in ensemblesImport/export modules
Rule of thumb: If you’re writing conditional expressions or complex transformations in YAML, move it to TypeScript.

Leverage Components

Agents should use shared components rather than duplicating logic:
# Agent leveraging components
agent: docs-generator
inputs:
  page:
    type: string
    required: true

actions:
  - name: generate
    operation: code
    handler: ./generator.ts
    # Reference shared components
    uses:
      - schemas/[email protected]      # Validation schema
      - prompts/docs-writer@latest    # AI prompt (if useAI enabled)
      - configs/[email protected]   # Shared configuration
// generator.ts
export default async function generate(ctx: AgentContext) {
  // Access components directly
  const schema = ctx.schemas.get('docs-page')
  const prompt = ctx.prompts.get('docs-writer')
  const config = ctx.configs.get('docs-settings')

  // Access project configuration
  const docsConfig = ctx.config.docs

  if (docsConfig?.useAI && prompt) {
    // AI-powered generation
    return ctx.think({
      prompt: prompt.render({ page: ctx.input.page }),
      schema: schema
    })
  }

  // Static generation
  return generateStatic(ctx.input.page, config)
}

Config Access

Agents and ensembles have full access to the project configuration:
// Read config
const projectName = ctx.config.name
const docsSettings = ctx.config.docs
const observability = ctx.config.observability

// Config is read-only at runtime
// To modify settings, update conductor.config.ts and rebuild

Eating Our Own Dog Food

Built-in capabilities (redirects, docs, etc.) are implemented as catalog templates—real agents and ensembles that ship with Conductor:
catalog/
├── agents/
│   ├── redirect/           # URL redirect agent
│   │   ├── redirect.yaml   # Simple contract
│   │   └── redirect.ts     # All redirect logic
│   └── docs/               # Documentation agent
│       ├── docs.yaml       # Simple contract
│       └── docs.ts         # All docs logic
└── ensembles/
    ├── redirects/          # Redirect orchestration
    │   └── resolve.yaml    # HTTP trigger + routing
    └── docs/               # Docs orchestration
        ├── serve.yaml      # HTTP serving
        └── generate.yaml   # Build-time generation
When you run conductor init, these are copied to your project. You can:
  • Use them as-is
  • Customize them for your needs
  • Replace them entirely
  • Learn from them as examples
No magic. If the framework can’t express something as an agent/ensemble, we fix the framework—we don’t add special cases.

Primitives Reference

Operations (16 types)

OperationPurposeConfig Keys
thinkLLM reasoningprovider, model, prompt, schema, temperature, maxTokens
codeJS/TS executionscript (path to script file)
storageKV/R2 accesstype: kv|r2, action: get|put|delete, key
dataD1/Hyperdrive/Vectorizebackend: d1|hyperdrive|vectorize, binding, query|operation
httpHTTP requestsurl, method, headers, body
toolsMCP toolstool, params
emailSend emailto, from, subject, body
smsSend SMSto, from, body
htmlRender HTMLtemplate, data
pdfGenerate PDFhtml, filename, format
formGenerate formsfields, csrf
queueQueue messagesaction: send|consume, queue, body
docsAPI docsOpenAPI generation
transformData transformationmode: value|merge|input, pick, omit, rename, defaults
convertFormat conversionfrom: html|markdown|docx, to: html|markdown, options
chartData visualizationtype: bar|line|area|pie|scatter|sparkline, data, x, y, output: svg|url|vega|html
Operations Reference

Flow Control (TypeScript)

PrimitiveSignaturePurpose
createEnsemble(name).addStep().build()Ensemble container
step(name).agent()|.operation().input().config()Unit of work
parallel(name).steps([...])Concurrent execution
branch(name).condition().then().else()If/else
switchStep(name).value().case().default()Multi-branch
foreach(name).items().as().step()Iterate array
tryStep(name).try().catch()Error handling
whileStep(name).condition().maxIterations().step()Loop
mapReduce(name).items().map().reduce()Map-reduce pattern

Lifecycle Hooks

HookTriggerUse Case
beforeExecuteBefore agent runsLogging, validation
afterExecuteAfter agent completesMetrics, cleanup
onErrorOn agent failureError reporting, fallback
TypeScript API Reference

Built-in Agents (2 types)

Framework-level agents requiring platform integration. Configure only.
AgentPurposeKey Inputs
ragVector search + LLM (Cloudflare AI & Vectorize)query, operation: index|search
hitlHuman approval flow (Durable Objects)approvalData, action: suspend|resume

Starter Kit Agents

Template agents you can modify or delete. Located in agents/system/:
AgentPurpose
scrapeWeb scraping with bot detection
fetchHTTP requests with caching
validateData validation with multiple evaluators
redirectURL redirection service
docsAPI documentation generation
toolsMCP tool integration
autoragAutomated RAG indexing
Starter Kit

Triggers (9 types)

TriggerConfigUse Case
httppath, paths[], methods, auth, rateLimit, cors, publicWeb apps, APIs with path params
webhookpath, methods, auth, async, publicExternal integrations
mcpauth, publicAI tool exposure
emailaddresses, reply_with_outputEmail routing
queuequeue, batch_size, max_retriesMessage processing
croncron, timezone, enabledScheduled execution
buildoutput, enabled, input, metadataStatic generation at build time
clicommand, description, options[]Developer commands
startupenabled, runOnceWorker cold start initialization
Triggers Reference

Components (7 types)

TypeExtensionReference Syntax
schemas.jsonschemas/[email protected]
prompts.mdprompts/name@latest
configs.json, .yamlconfigs/name@production
queries.sqlqueries/name@v2
scripts.ts, .jsscripts/name@v1
templates.htmltemplates/name@v1
docs.mddocs/name@v1
Components Guide

Expression Syntax

Variable Access

${input.field}                    # Ensemble/agent input
${agent-name.output}              # Agent output
${agent-name.output.nested.field} # Nested access
${state.field}                    # State variable
${env.VARIABLE}                   # Environment variable
${[email protected]}          # Component reference
${trigger.options.format}         # CLI trigger options

Execution Status

${agent.executed}                 # Boolean: ran
${agent.failed}                   # Boolean: errored
${agent.success}                  # Boolean: succeeded
${agent.cached}                   # Boolean: from cache
${agent.duration}                 # Number: ms

Conditions

condition: ${input.value > 10}
condition: ${agent.failed}
condition: ${!agent.executed}
condition: ${input.type === 'premium'}
condition: ${input.age >= 18 && input.verified}

Built-in Functions

${Date.now()}
${JSON.stringify(obj)}
${JSON.parse(str)}
${array.length}
${array.map(x => x.id)}
${array.filter(x => x.active)}
${string.split(',')}
${string.toUpperCase()}

YAML Schemas

Ensemble Schema

ensemble: string                   # Required
description: string                # Optional

trigger:                           # Optional: invocation methods
  - type: http|webhook|mcp|email|queue|cron|build|cli
    # ... trigger-specific config

state:                             # Optional
  schema:
    field: type

agents:                            # Required
  - name: string
    agent: string                  # OR operation
    operation: string              # OR agent
    inputs: { key: value }
    config: { key: value }
    condition: string
    cache: { ttl: number, key: string }
    retry: { maxAttempts: number, backoff: exponential|linear }
    timeout: number
    state: { use: [fields], set: { field: value } }

output:                            # Optional
  key: value

Agent Schema

agent: string                      # Required
description: string                # Optional

inputs:                            # Optional
  field:
    type: string|number|boolean|array|object
    required: boolean
    default: any

operations:                        # Required
  - name: string
    operation: string
    config: { ... }
    condition: string
    cache: { ttl, key }
    retry: { maxAttempts, backoff }

outputs:                           # Optional
  field: expression

TypeScript API

Ensemble Builder

import { createEnsemble, step, parallel, branch, tryStep } from '@ensemble-edge/conductor'

const ensemble = createEnsemble('name')
  .setDescription('...')
  .setInput<InputType>()
  .setOutput<OutputType>()
  .addStep(step('name').agent('agent-name').input({ ... }))
  .addStep(step('name').operation('think').config({ ... }))
  .addStep(parallel('name').steps([...]))
  .addStep(branch('name').condition('...').then(...).else(...))
  .addStep(tryStep('name').try(...).catch(...))
  .build()

export default ensemble

Step Builder

step('name')
  .agent('agent-name')           // Reference agent
  .operation('think')            // OR use operation directly
  .input({ key: '${...}' })      // Inputs
  .config({ ... })               // Operation config
  .condition('${...}')           // Conditional execution
  .cache({ ttl: 3600, key: '...' })
  .retry({ maxAttempts: 3, backoff: 'exponential' })
  .timeout(30000)

Version Primitives (Edgit)

import { componentRef, versionedAgent, versionedEnsemble } from '@ensemble-edge/conductor'

// Component reference
componentRef('agent', 'path/name', '1.0.0')
componentRef('agent', 'path/name', '^1.0.0', { fallback: '0.9.0' })

// Versioned agent
versionedAgent('path/name', '1.0.0')
versionedAgent('path/name', '^1.0.0', { config: { ... } })

// Versioned ensemble
versionedEnsemble('pipelines/name', '^2.0.0', { input: { ... } })

Patterns (Copy-Paste Ready)

Linear Pipeline

agents:
  - name: fetch
    operation: http
    config: { url: "${input.url}" }
  - name: process
    operation: code
    config: { script: scripts/process }
    input: { data: ${fetch.output} }
  - name: store
    operation: data
    config: { backend: d1, binding: DB, query: "INSERT..." }

Parallel Fetch + Merge

agents:
  - name: fetch-a
    operation: http
    config: { url: "https://api-a.com" }
  - name: fetch-b
    operation: http
    config: { url: "https://api-b.com" }
  - name: merge
    operation: code
    config: { script: scripts/merge }
    input: { a: ${fetch-a.output}, b: ${fetch-b.output} }

Cache-or-Generate

agents:
  - name: check-cache
    operation: storage
    config: { type: kv, action: get, key: "result-${input.query}" }
  - name: generate
    condition: ${check-cache.output.value === null}
    operation: think
    config: { provider: openai, model: gpt-4o, prompt: "${input.query}" }
  - name: save-cache
    condition: ${generate.executed}
    operation: storage
    config: { type: kv, action: put, key: "result-${input.query}", value: ${generate.output} }
output:
  result: ${check-cache.output.value || generate.output}

Fallback Chain

agents:
  - name: try-primary
    operation: http
    config: { url: "https://primary-api.com" }
    retry: { maxAttempts: 2 }
  - name: try-backup
    condition: ${try-primary.failed}
    operation: http
    config: { url: "https://backup-api.com" }
  - name: use-cache
    condition: ${try-primary.failed && try-backup.failed}
    operation: storage
    config: { type: kv, action: get, key: cached-data }
output:
  data: ${try-primary.output || try-backup.output || use-cache.output}

Conditional Routing

agents:
  - name: classify
    operation: think
    config: { provider: openai, model: gpt-4o-mini, prompt: "Classify: ${input.text}. Return: urgent|normal|low" }
  - name: urgent-handler
    condition: ${classify.output === 'urgent'}
    agent: urgent-processor
  - name: normal-handler
    condition: ${classify.output === 'normal'}
    agent: normal-processor
output:
  result: ${urgent-handler.output || normal-handler.output}

RAG Pipeline

agents:
  - name: embed
    operation: think
    config: { provider: openai, model: text-embedding-3-small, input: "${input.question}" }
  - name: search
    operation: data
    config: { backend: vectorize, binding: VECTORIZE, operation: query, vector: ${embed.output}, topK: 5 }
  - name: generate
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: |
        Context: ${search.output.results}
        Question: ${input.question}
        Answer using only the context.

HITL Approval

agents:
  - name: generate
    operation: think
    config: { prompt: "Generate content for: ${input.topic}" }
  - name: review
    agent: hitl
    inputs: { prompt: "Review content", context: { content: ${generate.output} } }
  - name: publish
    condition: ${review.output.approved}
    operation: data
    config: { backend: d1, query: "INSERT INTO published..." }

HTTP Trigger with Auth

trigger:
  - type: http
    path: /api/data/:id
    methods: [GET, POST]
    auth:
      type: bearer
      secret: ${env.API_TOKEN}
    rateLimit:
      requests: 100
      window: 60
    cors:
      origin: "https://myapp.com"

Cron Schedule

trigger:
  - type: cron
    cron: "0 8 * * *"
    timezone: "America/New_York"
    input: { report_type: "daily" }

File Structure

project/
├── ensembles/          # YAML or TS ensembles
│   └── my-workflow.yaml
├── agents/             # Custom agents
│   └── my-agent/
│       └── agent.yaml
├── scripts/            # TS/JS for code operations
│   └── transform.ts
├── prompts/            # Prompt templates
│   └── analyzer.md
├── schemas/            # JSON schemas for structured output
│   └── response.json
├── configs/            # Reusable configs
│   └── settings.yaml
├── queries/            # SQL templates
│   └── analytics.sql
├── wrangler.toml       # Cloudflare config
└── .dev.vars           # Local secrets (gitignored)

Script Template

// scripts/my-script.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function myScript(context: AgentExecutionContext) {
  const { input, env, bindings } = context

  // Your logic here

  return { result: "..." }
}

Cloudflare Bindings

BindingTypewrangler.toml
KVKey-value[[kv_namespaces]]
D1SQL database[[d1_databases]]
R2Object storage[[r2_buckets]]
VectorizeVector DB[[vectorize]]
QueuesMessage queue[[queues.producers]]
AIWorkers AI[ai]
AI GatewayAI caching[ai] gateway_id

Limits

LimitFree TierPaid
CPU time50ms30s
Execution time30s15min
Memory128MB128MB
Payload100MB100MB
Mitigations:
  • Split into smaller steps
  • Use streaming for large data
  • Leverage caching

CLI Quick Reference

# Project initialization
ensemble conductor init [name]    # Create project (use --yes for CI)

# Development (project commands)
pnpm run dev                      # Local dev server
pnpm run build                    # Build project
pnpm test                         # Run Vitest tests
npx wrangler deploy               # Deploy to CF Workers

# Validate
ensemble conductor validate [path]           # Validate YAML/TS files
ensemble conductor validate --fix            # Auto-fix issues
ensemble conductor validate -r               # Recursive directory scan
ensemble conductor validate --strict         # Strict mode
ensemble conductor validate --format json    # JSON output

# Execute
ensemble conductor ensemble:run <name> <input>
ensemble conductor agent:run <name> <input>

# Secrets
wrangler secret put <NAME>        # Production secrets

Testing

import { describe, it, expect } from 'vitest'
import { Executor, MemberLoader } from '@ensemble-edge/conductor'

describe('My Ensemble', () => {
  it('executes successfully', async () => {
    const executor = new Executor({ env, ctx })
    const result = await executor.executeFromYAML(yaml, { input: 'test' })
    expect(result.success).toBe(true)
  })
})
Run: pnpm test

Validation

# Validate all files in current directory
ensemble conductor validate

# Validate specific file
ensemble conductor validate ensembles/my-workflow.yaml
ensemble conductor validate ensembles/my-workflow.ts

# Recursive with auto-fix
ensemble conductor validate . -r --fix

# Strict mode with JSON output (for CI)
ensemble conductor validate --strict --format json
Validates:
  • Ensemble YAML/TS syntax and schema
  • Agent definitions and references
  • Component references (@version syntax)
  • Expression syntax (${...})
  • Operation configs

Common Mistakes

Don’tDo
Inline JS in YAMLUse script: scripts/path
Hardcode secretsUse ${env.SECRET}
Skip error handlingUse tryStep() or condition: ${x.failed}
Create mega-ensemblesCompose via agents
Forget cacheAdd cache: { ttl: 3600 } to expensive ops
Create deps for parallelKeep parallel steps independent

Provider Models

think operation

ProviderModels
openaigpt-4o, gpt-4o-mini, gpt-4-turbo, text-embedding-3-small
anthropicclaude-3-5-sonnet-20241022, claude-3-opus, claude-sonnet-4
workers-ai@cf/meta/llama-3.1-8b-instruct, @cf/meta/llama-3-8b-instruct
groqllama3-70b-8192, mixtral-8x7b-32768

Think Agent Schema Output Mapping

For inline think agents, use schema.output to map AI response to named fields:
agents:
  - name: greet
    operation: think
    config:
      provider: workers-ai
      model: "@cf/meta/llama-3.1-8b-instruct"
      temperature: 0.7
    schema:
      output:
        greeting: string    # AI response maps to this field
    prompt: "Generate a friendly greeting for ${input.name}"

output:
  message: ${greet.output.greeting}  # Access via schema field name
  model: ${greet.output._meta.model} # Metadata available via _meta

Workers AI Local Development

For local development with Workers AI, configure wrangler.toml:
account_id = "your-account-id"  # Required for remote binding

[ai]
binding = "AI"
remote = true  # Required for local dev
Add CLOUDFLARE_API_TOKEN to .dev.vars for authentication.

Core Concepts

Reference

Operations

Patterns


Machine context ends. The humans thank you for building their ensembles.