Skip to main content

Overview

Build reliable, maintainable, and performant Conductor workflows following these battle-tested patterns and best practices from production deployments.

Workflow Design

1. Keep Ensembles Focused

# ✅ Good - Single responsibility
name: process-payment
description: Process payment for order

# ❌ Bad - Too many responsibilities
name: process-order-payment-shipping-notifications
Why: Focused ensembles are easier to test, debug, and reuse.

2. Use Descriptive Names

# ✅ Good - Clear intent
- member: validate-customer-credit-card
  type: Function

# ❌ Bad - Unclear
- member: check-cc
  type: Function

3. Document Everything

name: customer-onboarding
description: |
  Complete customer onboarding workflow including:
  - Email verification
  - Profile creation
  - Welcome email
  - Trial activation

flow:
  - member: verify-email
    type: API
    # Why: Prevents fake accounts
    config:
      url: "${env.VERIFICATION_SERVICE}"

4. Fail Fast

flow:
  # Validate early
  - member: validate-input
    type: Validate
    scoring:
      thresholds:
        minimum: 0.9

  # Stop if validation fails
  - member: expensive-operation
    condition: ${validate-input.scoring.score >= 0.9}

Error Handling

1. Always Handle Errors

# ✅ Good - Explicit error handling
- member: risky-operation
  type: API
  continue_on_error: true

- member: handle-error
  condition: ${!risky-operation.success}
  type: Function
  input:
    error: ${risky-operation.error}
    context: ${input}

# ❌ Bad - No error handling
- member: risky-operation
  type: API

2. Use Retries Wisely

# ✅ Good - Retry transient failures
- member: fetch-external-api
  type: Fetch
  retry:
    maxAttempts: 3
    backoff: exponential
    retryOn: [500, 502, 503, 504]  # Only server errors

# ❌ Bad - Retry everything
- member: process-payment
  retry:
    maxAttempts: 10  # Too many for payment

3. Implement Circuit Breakers

state:
  schema:
    failureCount: number
    lastFailure: number

flow:
  - member: check-circuit-breaker
    type: Function
    state:
      use: [failureCount, lastFailure]
    input:
      threshold: 5
      resetTime: 60000

  - member: call-external-service
    condition: ${!check-circuit-breaker.output.circuitOpen}
    type: API
    continue_on_error: true

  - member: update-circuit-breaker
    type: Function
    state:
      use: [failureCount]
      set: [failureCount, lastFailure]
    input:
      success: ${call-external-service.success}

4. Provide Fallbacks

- member: call-primary-service
  type: API
  continue_on_error: true

- member: call-fallback-service
  condition: ${!call-primary-service.success}
  type: API

output:
  result: ${call-primary-service.success ? call-primary-service.output : call-fallback-service.output}

Performance

1. Parallelize Independent Operations

# ✅ Good - 100ms total
parallel:
  - member: fetch-user      # 100ms
  - member: fetch-orders    # 100ms
  - member: fetch-products  # 100ms

# ❌ Bad - 300ms total
flow:
  - member: fetch-user
  - member: fetch-orders
  - member: fetch-products

2. Cache Aggressively

- member: expensive-ai-call
  type: Think
  cache:
    ttl: 3600  # Cache for 1 hour
  config:
    routing: cloudflare-gateway  # AI Gateway cache
    temperature: 0.1  # Low temp = better cache hits

3. Choose Right Model for Task

# Simple classification - Use mini
- member: classify-sentiment
  config:
    model: gpt-4o-mini  # Fast, cheap

# Complex reasoning - Use flagship
- member: analyze-contract
  config:
    model: gpt-4o  # Accurate, expensive

# Ultra-fast - Use Workers AI
- member: quick-filter
  config:
    provider: workers-ai
    model: "@cf/meta/llama-3.1-8b-instruct"

4. Batch Database Operations

# ✅ Good - Single batch insert
- member: insert-all
  type: Data
  config:
    query: |
      INSERT INTO items (id, name, value)
      VALUES ${items.map(() => '(?, ?, ?)').join(',')}

# ❌ Bad - Individual inserts
- member: insert-item
  foreach: ${items}
  type: Data
  config:
    query: "INSERT INTO items VALUES (?, ?, ?)"

Security

1. Never Commit Secrets

# .gitignore
.dev.vars
.env
*.key
credentials.json
secrets/
# ✅ Good - Use environment variables
config:
  headers:
    Authorization: "Bearer ${env.API_KEY}"

# ❌ Bad - Hardcoded secret
config:
  headers:
    Authorization: "Bearer sk-1234567890abcdef"

2. Validate All Input

flow:
  - member: validate-input
    type: Validate
    scoring:
      evaluator: validate
      evaluatorConfig:
        type: rule
        rules:
          - field: email
            rule: "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"
          - field: amount
            rule: "amount > 0 && amount < 10000"
      thresholds:
        minimum: 1.0

3. Sanitize Output

- member: sanitize-user-input
  type: Transform
  input:
    data: ${input}
    expression: |
      {
        "email": $lowercase($trim(email)),
        "name": $replace(name, /<[^>]*>/g, ""),  /* Remove HTML */
        "comment": $substring(comment, 0, 500)    /* Limit length */
      }

4. Rate Limit API Access

- member: check-rate-limit
  type: Data
  config:
    storage: kv
    operation: get
  input:
    key: "ratelimit:${input.userId}"

- member: enforce-rate-limit
  condition: ${check-rate-limit.output.value >= 100}
  type: Function
  input:
    error: "Rate limit exceeded"

State Management

1. Declare State Schema

state:
  schema:
    userId: string
    orderData: object
    paymentProcessed: boolean
    timestamp: number

2. Minimize State Size

# ✅ Good - Only essential data
state:
  set:
    userId: ${user.id}
    orderId: ${order.id}

# ❌ Bad - Entire objects
state:
  set:
    user: ${user}  # Might be huge
    order: ${order}

3. Use Selective State Access

# ✅ Good - Only use what you need
- member: process-payment
  state:
    use: [orderId, amount]  # Specific fields

# ❌ Bad - Access all state
- member: process-payment
  state:
    use: all  # Inefficient

Testing

1. Test Happy Path and Edge Cases

describe('order-processing', () => {
  it('should process valid order', async () => {
    // Happy path
  });

  it('should handle payment failure', async () => {
    // Error case
  });

  it('should handle timeout', async () => {
    // Edge case
  });

  it('should validate invalid input', async () => {
    // Validation
  });
});

2. Use Realistic Test Data

const conductor = await TestConductor.create({
  mocks: {
    db: {
      users: [
        {
          id: 1,
          email: 'test@example.com',
          name: 'Test User',
          created_at: new Date().toISOString()
        }
      ]
    }
  }
});

3. Test Integration Points

it('should call external API correctly', async () => {
  const conductor = await TestConductor.create();

  const result = await conductor.executeEnsemble('call-api', {
    endpoint: 'https://api.example.com/data'
  });

  expect(result).toBeSuccessful();
  expect(result.members['fetch-data'].config.url).toContain('api.example.com');
});

Monitoring

1. Log Structured Data

console.log(JSON.stringify({
  level: 'info',
  ensemble: 'order-processing',
  orderId: order.id,
  duration: executionTime,
  timestamp: Date.now()
}));

2. Track Key Metrics

output:
  metrics:
    duration: ${execution.duration}
    memberCount: ${execution.memberCount}
    errorCount: ${execution.errorCount}
    cost: ${execution.cost}

3. Set Up Alerts

- member: check-error-rate
  type: Data
  config:
    storage: d1
    operation: query
    query: |
      SELECT COUNT(*) as errors
      FROM execution_log
      WHERE status = 'error'
        AND timestamp > datetime('now', '-5 minutes')

- member: alert-high-errors
  condition: ${check-error-rate.output.results[0].errors > 10}
  type: API
  config:
    url: "${env.PAGERDUTY_URL}"

Code Organization

1. Use Consistent Structure

project/
├── ensembles/
│   ├── customer/
│   │   ├── onboarding.yaml
│   │   └── verification.yaml
│   ├── orders/
│   │   ├── processing.yaml
│   │   └── fulfillment.yaml
│   └── payments/
│       └── charge.yaml
├── functions/
│   ├── validation/
│   └── transforms/
└── src/
    └── index.ts

2. Reuse Common Patterns

# common/validate-email.yaml
name: validate-email
description: Reusable email validation

flow:
  - member: check-format
    type: Validate
    scoring:
      evaluator: validate
      evaluatorConfig:
        type: rule
        rules:
          - field: email
            rule: "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"

# Use in other ensembles
flow:
  - member: validate
    type: Ensemble
    input:
      ensemble: "validate-email"
      data: { email: ${input.email} }

3. Version Your Ensembles

# Use Edgit for version control
edgit tag conductor/orders/processing v1.2.0
edgit deploy production

Deployment

1. Use Environment-Specific Configuration

# wrangler.toml
[env.staging]
name = "conductor-staging"
vars = { ENVIRONMENT = "staging" }

[env.production]
name = "conductor-production"
vars = { ENVIRONMENT = "production" }

2. Implement CI/CD

# .github/workflows/deploy.yml
name: Deploy
on:
  push:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - run: npm ci
      - run: npm test

  deploy:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - run: npx wrangler deploy --env production

3. Blue-Green Deployments

# Deploy to staging
npx wrangler deploy --env staging

# Test staging
npm run test:e2e:staging

# Promote to production
npx wrangler deploy --env production

Documentation

1. Document Workflows

name: customer-onboarding
description: |
  Complete customer onboarding workflow.

  Steps:
  1. Verify email address
  2. Create user profile
  3. Send welcome email
  4. Activate trial subscription

  Dependencies:
  - Email verification service
  - User database (D1)
  - SendGrid API

  Rate limits:
  - 100 onboardings per minute

  SLA: 95th percentile < 2 seconds

2. Document Member Behavior

- member: process-payment
  type: API
  # Charges customer credit card via Stripe
  # Retries: 3 attempts with exponential backoff
  # Idempotent: Safe to retry
  # Timeout: 30 seconds
  config:
    url: "https://api.stripe.com/v1/charges"

3. Maintain Changelog

# Changelog

## [1.2.0] - 2024-01-15
### Added
- Email verification step
- Retry logic for payment processing

### Changed
- Increased timeout from 10s to 30s
- Updated error messages

### Fixed
- Race condition in inventory reservation

Cost Optimization

1. Monitor Costs

- member: log-cost
  type: Data
  config:
    storage: d1
    operation: query
    query: |
      INSERT INTO cost_tracking (ensemble, tokens, cost)
      VALUES (?, ?, ?)
  input:
    params:
      - ${execution.ensemble}
      - ${ai-call.output.usage.total_tokens}
      - ${ai-call.output.usage.total_tokens * 0.00001}

2. Use Cheaper Models

# Cascade: Try cheap model first
- member: quick-classification
  config:
    model: gpt-4o-mini
  scoring:
    thresholds:
      minimum: 0.8
    onFailure: continue

# Escalate to expensive model if needed
- member: detailed-analysis
  condition: ${quick-classification.scoring.score < 0.8}
  config:
    model: gpt-4o

3. Batch Requests

# Process multiple items in single AI call
- member: batch-classify
  type: Think
  input:
    prompt: |
      Classify sentiment for each review:
      ${reviews.map((r, i) => `${i+1}. ${r.text}`).join('\n')}

      Return JSON array: [{"index": 1, "sentiment": "positive"}, ...]

Summary Checklist

  • Ensembles have single responsibility
  • All members have descriptive names
  • Workflows are documented
  • Errors are handled explicitly
  • Retries are configured appropriately
  • Independent operations run in parallel
  • Expensive operations are cached
  • Secrets are in environment variables
  • Input is validated
  • Output is sanitized
  • State is minimized
  • Tests cover happy path and edge cases
  • Structured logging is enabled
  • Alerts are configured
  • CI/CD pipeline is set up
  • Costs are monitored