Testing Strategy

Test Pyramid

        /\
       /  \      E2E Tests (few)
      /____\
     /      \    Integration Tests (some)
    /________\
   /          \  Unit Tests (many)
  /____________\
Unit Tests: Test individual agents
Integration Tests: Test ensembles end-to-end
E2E Tests: Test full user journeys
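
One way to keep that shape enforceable is to separate the tiers in the test runner. A minimal Vitest sketch; the directory layout (agents/**, ensembles/**, e2e/**) and the 80% line-coverage threshold are assumptions, not Conductor defaults:

// vitest.config.ts — a sketch; directory layout and coverage threshold are assumptions
import { defineConfig } from 'vitest/config'

export default defineConfig({
  test: {
    include: [
      'agents/**/*.test.ts',     // unit tests (many)
      'ensembles/**/*.test.ts',  // integration tests (some)
      'e2e/**/*.test.ts'         // end-to-end tests (few)
    ],
    coverage: {
      provider: 'v8',
      thresholds: { lines: 80 }  // matches the 80%+ coverage goal in Best Practices
    }
  }
})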

Unit Testing Agents

Basic Agent Test

// agents/company-enricher/agent.test.ts
import { describe, it, expect, beforeEach } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';

describe('company-enricher agent', () => {
  let conductor: TestConductor;

  beforeEach(async () => {
    conductor = await TestConductor.create();
    await conductor.loadProject('./');
  });

  it('should enrich company data', async () => {
    const result = await conductor.executeAgent('company-enricher', {
      company_name: 'Anthropic',
      include_news: false
    });

    expect(result).toBeSuccessful();
    expect(result.output.company_data).toMatchObject({
      name: expect.any(String),
      description: expect.any(String),
      industry: expect.any(String)
    });
  });

  it('should cache on second call', async () => {
    const result1 = await conductor.executeAgent('company-enricher', {
      company_name: 'Anthropic'
    });

    const result2 = await conductor.executeAgent('company-enricher', {
      company_name: 'Anthropic'
    });

    expect(result1.output.from_cache).toBe(false);
    expect(result2.output.from_cache).toBe(true);
  });

  it('should handle failures gracefully', async () => {
    conductor.mockOperation('scrape-primary', {
      failed: true,
      error: 'Timeout'
    });

    const result = await conductor.executeAgent('company-enricher', {
      company_name: 'Anthropic'
    });

    expect(result).toBeSuccessful();
    expect(result.operations['scrape-fallback'].executed).toBe(true);
  });
});

Mocking Operations

describe('company-enricher with mocks', () => {
  it('should work with mocked operations', async () => {
    const conductor = await TestConductor.create();
    await conductor.loadProject('./');

    // Mock search
    conductor.mockOperation('search', {
      output: {
        AbstractURL: 'https://anthropic.com'
      }
    });

    // Mock scrape
    conductor.mockOperation('scrape-primary', {
      output: {
        body: '<html>Anthropic is an AI safety company</html>'
      }
    });

    // Mock AI extraction
    conductor.mockOperation('extract', {
      output: {
        name: 'Anthropic',
        description: 'AI safety company',
        industry: 'Artificial Intelligence'
      }
    });

    const result = await conductor.executeAgent('company-enricher', {
      company_name: 'Anthropic'
    });

    expect(result).toBeSuccessful();
    expect(result.output.company_data.name).toBe('Anthropic');
  });
});

Testing Error Paths

describe('company-enricher error handling', () => {
  it('should retry failed operations', async () => {
    const conductor = await TestConductor.create();
    await conductor.loadProject('./');

    let callCount = 0;
    conductor.mockOperation('search', () => {
      callCount++;
      if (callCount < 3) {
        return { failed: true, error: 'Timeout' };
      }
      return {
        output: { AbstractURL: 'https://anthropic.com' }
      };
    });

    const result = await conductor.executeAgent('company-enricher', {
      company_name: 'Anthropic'
    });

    expect(result).toBeSuccessful();
    expect(callCount).toBe(3);  // Retried twice
  });

  it('should use fallback on persistent failure', async () => {
    const conductor = await TestConductor.create();
    await conductor.loadProject('./');

    conductor.mockOperation('search', {
      failed: true,
      error: 'Service unavailable'
    });

    conductor.mockOperation('scrape-primary', {
      failed: true,
      error: 'Cannot connect'
    });

    const result = await conductor.executeAgent('company-enricher', {
      company_name: 'Anthropic'
    });

    expect(result.operations['scrape-fallback'].executed).toBe(true);
  });
});

Integration Testing Ensembles

Basic Ensemble Test

// ensembles/process-invoice.test.ts
import { describe, it, expect } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';

describe('process-invoice ensemble', () => {
  it('should process valid invoice end-to-end', async () => {
    const conductor = await TestConductor.create({
      env: {
        STRIPE_API_KEY: process.env.STRIPE_API_KEY
      }
    });
    await conductor.loadProject('./');

    const result = await conductor.execute('process-invoice', {
      invoice: {
        id: 'INV-001',
        amount: 100,
        customer_email: '[email protected]'
      }
    });

    expect(result).toBeSuccessful();
    expect(result.output.processed).toBe(true);
    expect(result.output.payment_id).toBeDefined();
    expect(result.output.confirmation_sent).toBe(true);
  });

  it('should reject invalid invoice', async () => {
    const conductor = await TestConductor.create();
    await conductor.loadProject('./');

    const result = await conductor.execute('process-invoice', {
      invoice: {
        id: 'INV-002',
        amount: -100  // Invalid
      }
    });

    expect(result).toBeSuccessful();
    expect(result.output.processed).toBe(false);
    expect(result.output.errors).toContain('Invalid amount');
  });
});

Observability Configuration

Conductor provides Cloudflare-first observability with structured logging, metrics via Analytics Engine, and optional OpenTelemetry export.

Configuration in conductor.config.ts

// conductor.config.ts
import { defineConfig } from '@ensemble-edge/conductor'

export default defineConfig({
  observability: {
    // Logging configuration
    logging: {
      enabled: true,
      level: 'info',  // 'debug' | 'info' | 'warn' | 'error'
      format: 'json', // JSON for Workers Logs indexing

      // Fields to automatically include in all logs
      context: ['requestId', 'executionId', 'ensembleName', 'agentName'],

      // Sensitive fields to redact (replaced with [REDACTED])
      redact: ['password', 'apiKey', 'token', 'authorization', 'secret'],

      // Events to log automatically
      events: [
        'request',        // HTTP requests
        'response',       // HTTP responses
        'agent:start',    // Agent execution start
        'agent:complete', // Agent completion
        'agent:error',    // Agent errors
      ],
    },

    // Analytics Engine metrics
    metrics: {
      enabled: true,
      binding: 'ANALYTICS', // Must match wrangler.toml
      track: [
        'ensemble:execution',  // Ensemble duration/success
        'agent:execution',     // Agent duration/success
        'http:request',        // HTTP request stats
        'error',               // Error counts
      ],
    },

    // Optional: External observability (Datadog, Honeycomb, etc.)
    opentelemetry: {
      enabled: false,
      endpoint: 'https://api.honeycomb.io',
      headers: { 'x-honeycomb-team': '${HONEYCOMB_API_KEY}' },
      samplingRate: 1.0,
    },

    // Track AI token usage for cost analysis
    trackTokenUsage: true,
  },
})

Setting Up Analytics Engine

Add to your wrangler.toml:
# Analytics Engine for metrics
[[analytics_engine_datasets]]
binding = "ANALYTICS"
dataset = "conductor-metrics"
Then create the dataset in Cloudflare Dashboard:
  1. Go to Workers & Pages → Analytics Engine
  2. Click Create dataset
  3. Name it conductor-metrics
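
Conductor's metrics recorder writes to this binding for you, but the binding can also be used directly from Worker code. A sketch; the index/blob/double layout is an assumption chosen to line up with the example queries later on this page, not Conductor's internal schema:

// Writing a data point directly to the Analytics Engine binding (sketch).
// The index/blob/double layout is an assumption that mirrors the example SQL
// queries below; Conductor's built-in metrics use their own schema.
interface Env {
  ANALYTICS: AnalyticsEngineDataset
}

export function recordAgentExecution(env: Env, agentName: string, durationMs: number, success: boolean): void {
  env.ANALYTICS.writeDataPoint({
    indexes: ['agent.execution'],            // queried as index1
    blobs: [agentName],                      // queried as blob1
    doubles: [durationMs, success ? 1 : 0]   // queried as double1 / double2
  })
}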

Telemetry vs Observability

Observability (logging, traces) is for debugging—understanding how your code executed. Telemetry is for analytics—tracking business metrics for billing, dashboards, and trend analysis.
Use Case | Tool
Debug failures, trace execution | logger, observability config
Track costs, revenue, conversion | telemetry operation
For example, in a payment agent: use logger to debug failures, use telemetry to track daily revenue and success rates.
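
A hedged sketch of that payment example follows. The logger API matches "Logging in Agents" below; chargeCustomer and trackRevenue are hypothetical placeholders standing in for a payment provider call and a telemetry operation.

// scripts/charge-payment.ts — illustrative sketch only.
// chargeCustomer() and trackRevenue() are hypothetical placeholders: the first
// stands in for a payment provider call, the second for a telemetry operation.
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

declare function chargeCustomer(invoice: { id: string; amount: number }): Promise<{ id: string; amount: number; currency: string }>
declare function trackRevenue(event: { amount: number; currency: string }): Promise<void>

export default async function chargePayment(context: AgentExecutionContext) {
  const { input, logger } = context

  // Observability: structured log for debugging this execution
  logger.info('Charging invoice', { invoiceId: input.invoice.id, amount: input.invoice.amount })

  try {
    const charge = await chargeCustomer(input.invoice)

    // Telemetry: business metric for billing dashboards and trend analysis
    await trackRevenue({ amount: charge.amount, currency: charge.currency })

    return { payment_id: charge.id }
  } catch (error) {
    // Observability: the failure detail you need when tracing what went wrong
    logger.error('Charge failed', error, { invoiceId: input.invoice.id })
    throw error
  }
}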

Logging in Agents

Every agent receives a pre-configured logger and metrics recorder in its execution context:
// scripts/my-agent.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function myAgent(context: AgentExecutionContext) {
  const { input, logger, metrics, executionId, requestId } = context

  // Structured logging - automatically includes execution context
  logger.info('Processing started', {
    itemCount: input.items.length,
    category: input.category
  })

  const startTime = Date.now()

  try {
    const result = await processItems(input.items)

    // Log completion
    logger.info('Processing complete', {
      processedCount: result.length,
      durationMs: Date.now() - startTime
    })

    // Record custom metric
    metrics.record('items.processed', result.length, {
      category: input.category
    })

    return result
  } catch (error) {
    // Error logging with stack trace
    logger.error('Processing failed', error, {
      itemCount: input.items.length,
      attemptedAt: Date.now()
    })

    // Record error metric
    metrics.recordError('ProcessingError', input.category)

    throw error
  }
}

Log Levels

Level | Use For | Example
debug | Detailed debugging | Input/output values, cache decisions
info | Normal operations | Start/complete events, important milestones
warn | Potential issues | Rate limits, fallback usage, slow queries
error | Failures | Exceptions, validation errors, API failures
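
The same levels are available on the context logger. A minimal sketch; debug, info, and error follow the signatures shown elsewhere on this page, and the warn call is assumed to take the same (message, context) shape:

// scripts/fetch-with-fallback.ts — a sketch showing each log level in use.
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function fetchWithFallback(context: AgentExecutionContext) {
  const { input, logger } = context

  logger.debug('Input received', { keys: Object.keys(input) })  // detailed debugging
  logger.info('Fetch started', { url: input.url })              // normal operation

  let response = await fetch(input.url)
  if (!response.ok) {
    logger.warn('Primary URL failed, using fallback', {         // potential issue
      status: response.status,
      fallback: input.fallback_url
    })
    response = await fetch(input.fallback_url)
  }

  if (!response.ok) {
    const error = new Error(`Fetch failed with status ${response.status}`)
    logger.error('Fetch failed on both URLs', error, { url: input.url })  // failure
    throw error
  }

  return response.json()
}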

YAML-Based Observability

Agent-Level Logging

# agents/sensitive-processor/agent.yaml
name: sensitive-processor
operation: code
config:
  script: scripts/sensitive-processor

# Agent-level logging overrides
logging:
  level: debug  # More verbose for this agent
  redact:
    - password
    - ssn
    - creditCard
  events:
    onStart: true
    onComplete: true
    onError: true

# Agent-level metrics
metrics:
  enabled: true
  custom:
    - name: records.processed
      value: output.count
      type: counter

Ensemble-Level Logging

# ensembles/user-workflow.yaml
name: user-workflow

# Ensemble-level logging configuration
logging:
  level: info
  trace:
    enabled: true
    includeInputs: true
    includeOutputs: false  # Don't log large outputs
    redactInputs:
      - password
      - creditCard

# Ensemble-level metrics
metrics:
  enabled: true
  custom:
    - name: workflow.completed
      condition: success
      type: counter
    - name: workflow.duration
      value: _executionTime
      type: histogram

flow:
  - name: validate
    agent: validator
    input:
      data: ${input.userData}

  - name: process
    agent: processor
    input:
      validated: ${validate.output}

output:
  success: ${process.output.success}
  data: ${process.output.result}

Querying Metrics

Analytics Engine SQL API

Query your metrics using the Cloudflare dashboard or API:
-- Average ensemble execution time (last 24 hours)
SELECT
  blob1 as ensemble_name,
  AVG(double1) as avg_duration_ms,
  MIN(double1) as min_duration_ms,
  MAX(double1) as max_duration_ms,
  COUNT(*) as executions
FROM conductor_metrics
WHERE index1 = 'ensemble.execution'
  AND timestamp > NOW() - INTERVAL '24' HOUR
GROUP BY blob1
ORDER BY avg_duration_ms DESC

-- Agent error rate
SELECT
  blob1 as agent_name,
  SUM(CASE WHEN double2 = 0 THEN 1 ELSE 0 END) as failures,
  COUNT(*) as total,
  ROUND(SUM(CASE WHEN double2 = 0 THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as error_rate_pct
FROM conductor_metrics
WHERE index1 = 'agent.execution'
  AND timestamp > NOW() - INTERVAL '7' DAY
GROUP BY blob1
HAVING COUNT(*) > 10
ORDER BY error_rate_pct DESC

-- Cache hit rate
SELECT
  blob1 as cache_status,
  COUNT(*) as count,
  ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as percentage
FROM conductor_metrics
WHERE index1 = 'cache.performance'
  AND timestamp > NOW() - INTERVAL '1' DAY
GROUP BY blob1
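
The same dataset can also be queried over HTTP with the Analytics Engine SQL API (POST the SQL as the request body to the accounts/{account_id}/analytics_engine/sql endpoint). A sketch; CF_ACCOUNT_ID and CF_API_TOKEN are assumed environment variables, and the token needs Analytics read access:

// scripts/query-metrics.ts — sketch of querying the Analytics Engine SQL API.
// CF_ACCOUNT_ID and CF_API_TOKEN are assumed environment variables.
async function queryMetrics(sql: string) {
  const url = `https://api.cloudflare.com/client/v4/accounts/${process.env.CF_ACCOUNT_ID}/analytics_engine/sql`
  const response = await fetch(url, {
    method: 'POST',
    headers: { Authorization: `Bearer ${process.env.CF_API_TOKEN}` },
    body: sql
  })
  if (!response.ok) throw new Error(`Query failed: ${response.status} ${await response.text()}`)
  return response.json()
}

queryMetrics(`
  SELECT blob1 AS ensemble_name, AVG(double1) AS avg_duration_ms, COUNT(*) AS executions
  FROM conductor_metrics
  WHERE index1 = 'ensemble.execution'
  GROUP BY blob1
`).then(console.log)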

Production Monitoring

Cloudflare Workers Dashboard

View real-time metrics:
  1. Workers & Pages → Your worker → Metrics
  2. Monitor:
    • Requests per second
    • Error rate
    • CPU time
    • Duration (p50, p95, p99)

Viewing Logs

# Stream all logs
wrangler tail

# Filter by status
wrangler tail --status=error

# Filter by specific execution
wrangler tail --search="exec_abc123"

Log Format

Conductor outputs structured JSON logs for Cloudflare Workers Logs:
{
  "timestamp": "2024-01-15T10:30:00.000Z",
  "level": "info",
  "message": "Agent execution completed",
  "service": "conductor",
  "requestId": "req_abc123",
  "executionId": "exec_xyz789",
  "ensembleName": "user-workflow",
  "agentName": "processor",
  "stepIndex": 2,
  "durationMs": 150,
  "cached": false
}

OpenTelemetry Integration

Export to external platforms like Datadog or Honeycomb:
// conductor.config.ts
export default defineConfig({
  observability: {
    opentelemetry: {
      enabled: true,
      endpoint: 'https://http-intake.logs.datadoghq.com/api/v2/logs',
      headers: {
        'DD-API-KEY': '${DATADOG_API_KEY}',
        'DD-SOURCE': 'conductor'
      },
      samplingRate: 0.1, // Sample 10% of requests
    },
  },
})

Debugging

Enable Debug Mode

# ensembles/debuggable-workflow.yaml
name: debuggable-workflow

logging:
  level: debug  # Enable verbose logging
  trace:
    enabled: true
    includeInputs: true
    includeOutputs: true

flow:
  # ... agents ...

Local Development

# Run with debug logs
wrangler dev --log-level=debug

# Test ensemble locally
curl -X POST http://localhost:8787/api/v1/execute/ensemble/my-ensemble \
  -H "Content-Type: application/json" \
  -d '{"input": { "test": "data" }}'

Trace Execution with Logger

// scripts/traceable-agent.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function traceableAgent(context: AgentExecutionContext) {
  const { logger, executionId, input } = context

  // Use built-in logger instead of console.log
  logger.debug('Agent started', {
    executionId,
    inputKeys: Object.keys(input)
  })

  const result = await processData(input.data)

  logger.debug('Agent completed', {
    executionId,
    outputKeys: Object.keys(result)
  })

  return result
}

Performance Testing

Load Testing

// scripts/load-test.ts
async function loadTest() {
  // Send 100 concurrent requests
  const promises = Array.from({ length: 100 }, async (_, i) => {
    const start = Date.now();

    const result = await fetch('https://your-worker.workers.dev/api/v1/execute/ensemble/my-ensemble', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ input: { id: i } })
    });

    const duration = Date.now() - start;
    return { success: result.ok, duration };
  });

  const results = await Promise.all(promises);

  // Analyze results
  const successes = results.filter(r => r.success).length;
  const avgDuration = results.reduce((sum, r) => sum + r.duration, 0) / results.length;
  const sorted = results.sort((a, b) => a.duration - b.duration);
  const p95Duration = sorted[Math.floor(results.length * 0.95)].duration;

  console.log({
    total: results.length,
    successes,
    success_rate: (successes / results.length * 100).toFixed(2) + '%',
    avg_duration_ms: avgDuration.toFixed(0),
    p95_duration_ms: p95Duration
  });
}

loadTest();

Best Practices

  1. Test Coverage - Aim for 80%+ coverage
  2. Mock External Calls - Don’t hit real APIs in tests
  3. Test Error Paths - Test failures and fallbacks
  4. Use Structured Logging - Always log as JSON with context
  5. Redact Sensitive Data - Configure redact patterns
  6. Monitor Continuously - Track metrics in production
  7. Set Up Alerts - Alert on error rate spikes
  8. Sample in Production - Use samplingRate for high-volume traffic
  9. Debug Locally First - Use wrangler dev before deploying
  10. Review Logs Regularly - Check for patterns and anomalies

Next Steps