Skip to main content

Testing & Observability

Build confidence with testing. Monitor production with observability. Debug issues fast.

Testing Strategy

Test Pyramid

        /\
       /  \      E2E Tests (few)
      /____\
     /      \    Integration Tests (some)
    /________\
   /          \  Unit Tests (many)
  /____________\
Unit Tests: Test individual agents. Integration Tests: Test ensembles end-to-end. E2E Tests: Test full user journeys.

Unit Testing Agents

Basic Agent Test

// agents/company-enricher/agent.test.ts
import { describe, it, expect, beforeEach } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';

// Unit tests for the company-enricher agent, run against an in-process
// TestConductor with the project loaded from the repository root.
describe('company-enricher agent', () => {
  let conductor: TestConductor;

  // Fresh conductor per test so cache state never leaks between cases.
  beforeEach(async () => {
    conductor = await TestConductor.create();
    await conductor.loadProject('./');
  });

  it('should enrich company data', async () => {
    const enrichment = await conductor.executeAgent('company-enricher', {
      company_name: 'Anthropic',
      include_news: false
    });

    expect(enrichment).toBeSuccessful();

    // Only the shape is asserted — live data varies between runs.
    const expectedShape = {
      name: expect.any(String),
      description: expect.any(String),
      industry: expect.any(String)
    };
    expect(enrichment.output.company_data).toMatchObject(expectedShape);
  });

  it('should cache on second call', async () => {
    const first = await conductor.executeAgent('company-enricher', {
      company_name: 'Anthropic'
    });
    const second = await conductor.executeAgent('company-enricher', {
      company_name: 'Anthropic'
    });

    // Only the first execution should miss the cache.
    expect(first.output.from_cache).toBe(false);
    expect(second.output.from_cache).toBe(true);
  });

  it('should handle failures gracefully', async () => {
    // Force the primary scrape to fail so the fallback path is exercised.
    conductor.mockOperation('scrape-primary', {
      failed: true,
      error: 'Timeout'
    });

    const outcome = await conductor.executeAgent('company-enricher', {
      company_name: 'Anthropic'
    });

    expect(outcome).toBeSuccessful();
    expect(outcome.operations['scrape-fallback'].executed).toBe(true);
  });
});

Mocking Operations

// Exercises the agent with every external operation stubbed out, so the
// test never touches the network or a real model.
describe('company-enricher with mocks', () => {
  it('should work with mocked operations', async () => {
    const conductor = await TestConductor.create();
    await conductor.loadProject('./');

    // Stub each pipeline stage: search -> scrape -> AI extraction.
    conductor.mockOperation('search', {
      output: { AbstractURL: 'https://anthropic.com' }
    });
    conductor.mockOperation('scrape-primary', {
      output: { body: '<html>Anthropic is an AI safety company</html>' }
    });
    conductor.mockOperation('extract', {
      output: {
        name: 'Anthropic',
        description: 'AI safety company',
        industry: 'Artificial Intelligence'
      }
    });

    const run = await conductor.executeAgent('company-enricher', {
      company_name: 'Anthropic'
    });

    expect(run).toBeSuccessful();
    expect(run.output.company_data.name).toBe('Anthropic');
  });
});

Testing Error Paths

// Verifies retry and fallback behavior when upstream operations fail.
describe('company-enricher error handling', () => {
  it('should retry failed operations', async () => {
    const conductor = await TestConductor.create();
    await conductor.loadProject('./');

    // Fail the first two search attempts, succeed on the third, and
    // count invocations to verify the retry policy.
    let attempts = 0;
    conductor.mockOperation('search', () => {
      attempts += 1;
      return attempts < 3
        ? { failed: true, error: 'Timeout' }
        : { output: { AbstractURL: 'https://anthropic.com' } };
    });

    const run = await conductor.executeAgent('company-enricher', {
      company_name: 'Anthropic'
    });

    expect(run).toBeSuccessful();
    expect(attempts).toBe(3);  // two retries after the initial failure
  });

  it('should use fallback on persistent failure', async () => {
    const conductor = await TestConductor.create();
    await conductor.loadProject('./');

    // Both search and the primary scrape fail permanently ...
    conductor.mockOperation('search', {
      failed: true,
      error: 'Service unavailable'
    });
    conductor.mockOperation('scrape-primary', {
      failed: true,
      error: 'Cannot connect'
    });

    const run = await conductor.executeAgent('company-enricher', {
      company_name: 'Anthropic'
    });

    // ... so the fallback scraper must have been invoked.
    expect(run.operations['scrape-fallback'].executed).toBe(true);
  });
});

Integration Testing Ensembles

Basic Ensemble Test

// ensembles/process-invoice.test.ts
import { describe, it, expect } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';

// End-to-end coverage of the invoice ensemble: one happy path using a
// real Stripe key from the environment, one validation-failure path.
describe('process-invoice ensemble', () => {
  it('should process valid invoice end-to-end', async () => {
    const conductor = await TestConductor.create({
      env: { STRIPE_API_KEY: process.env.STRIPE_API_KEY }
    });
    await conductor.loadProject('./');

    const invoice = {
      id: 'INV-001',
      amount: 100,
      customer_email: 'test@example.com'
    };
    const run = await conductor.execute('process-invoice', { invoice });

    expect(run).toBeSuccessful();
    expect(run.output.processed).toBe(true);
    expect(run.output.payment_id).toBeDefined();
    expect(run.output.confirmation_sent).toBe(true);
  });

  it('should reject invalid invoice', async () => {
    const conductor = await TestConductor.create();
    await conductor.loadProject('./');

    // Negative amount: the ensemble completes but flags the bad input.
    const run = await conductor.execute('process-invoice', {
      invoice: { id: 'INV-002', amount: -100 }
    });

    expect(run).toBeSuccessful();
    expect(run.output.processed).toBe(false);
    expect(run.output.errors).toContain('Invalid amount');
  });
});

Testing Parallel Execution

// Confirms fan-out agents actually run concurrently, not sequentially.
describe('parallel-processing ensemble', () => {
  it('should execute agents in parallel', async () => {
    const conductor = await TestConductor.create();
    await conductor.loadProject('./');

    const run = await conductor.execute('parallel-processing', {
      urls: ['url1', 'url2', 'url3']
    });

    expect(run).toBeSuccessful();

    const fetchers = ['fetch-1', 'fetch-2', 'fetch-3'];

    // Every fan-out agent must have executed ...
    for (const name of fetchers) {
      expect(run.agents[name].executed).toBe(true);
    }

    // ... and their start times should cluster tightly, which is the
    // observable evidence that they were launched concurrently.
    const startTimes = fetchers.map(name => run.agents[name].startTime);
    const spread = Math.max(...startTimes) - Math.min(...startTimes);
    expect(spread).toBeLessThan(100); // Within 100ms
  });
});

Testing State Management

// Verifies that state written during a run is visible on the result.
describe('stateful-workflow ensemble', () => {
  it('should maintain state across operations', async () => {
    const conductor = await TestConductor.create();
    await conductor.loadProject('./');

    const run = await conductor.execute('stateful-workflow', { item: 'item1' });

    expect(run).toBeSuccessful();
    expect(run.state.processed_items).toContain('item1');
    expect(run.state.total_count).toBe(1);
  });
});

Production Monitoring

Cloudflare Analytics

View metrics in Cloudflare dashboard:
  1. Workers & Pages → Your worker → Metrics
  2. View:
    • Requests per second
    • Error rate
    • CPU time
    • Duration (p50, p95, p99)

Custom Metrics

Log custom metrics:
# Runs the processor agent, then records its duration and success flag
# into a D1 `metrics` table for later SQL analysis.
ensemble: monitored-workflow

agents:
  - name: process
    agent: processor
    inputs:
      data: ${input.data}

  # Log metrics
  - name: log-metrics
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO metrics
        (ensemble, agent, duration_ms, success, timestamp)
        VALUES (?, ?, ?, ?, ?)
      params:
        - monitored-workflow
        - process
        - ${process.duration}
        # success means "the process agent did not fail"
        - ${!process.failed}
        # NOTE: Date.now() stores epoch MILLISECONDS — queries must
        # compare against epoch-ms values, not datetime strings.
        - ${Date.now()}

output:
  result: ${process.output}
  duration_ms: ${process.duration}
Query metrics:
-- Average duration by agent
-- NOTE: `timestamp` is inserted as ${Date.now()} (epoch milliseconds).
-- Comparing an INTEGER against datetime('now', ...) (a TEXT value) never
-- matches in SQLite, so the cutoff must be an epoch-ms number too.
SELECT
  agent,
  AVG(duration_ms) as avg_duration_ms,
  MIN(duration_ms) as min_duration_ms,
  MAX(duration_ms) as max_duration_ms,
  COUNT(*) as executions
FROM metrics
WHERE timestamp > strftime('%s', 'now', '-24 hours') * 1000
GROUP BY agent
ORDER BY avg_duration_ms DESC;

-- Success rate by ensemble
SELECT
  ensemble,
  COUNT(*) as total,
  SUM(CASE WHEN success THEN 1 ELSE 0 END) as successes,
  ROUND(AVG(CASE WHEN success THEN 1.0 ELSE 0.0 END) * 100, 2) as success_rate_pct
FROM metrics
WHERE timestamp > strftime('%s', 'now', '-24 hours') * 1000
GROUP BY ensemble;

Logging

// src/index.ts
import { Conductor } from '@ensemble-edge/conductor';

// Worker entry point: logs the incoming request, executes an ensemble,
// and logs either a structured success or a structured error.
export default {
  async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
    const conductor = new Conductor({ env, ctx });

    // Log request
    console.log('Request:', {
      url: request.url,
      method: request.method,
      timestamp: Date.now()
    });

    try {
      const result = await conductor.execute('my-ensemble', {});

      // Log success, including how many agents actually ran.
      console.log('Success:', {
        ensemble: 'my-ensemble',
        duration_ms: result.duration,
        agents_executed: Object.keys(result.agents).filter(k => result.agents[k].executed).length
      });

      return Response.json(result.output);
    } catch (error: unknown) {
      // Anything can be thrown in JS — narrow before reading .message/.stack.
      // (Under strict TS, catch variables are `unknown`.)
      const message = error instanceof Error ? error.message : String(error);
      const stack = error instanceof Error ? error.stack : undefined;

      // Log error
      console.error('Error:', {
        ensemble: 'my-ensemble',
        error: message,
        stack
      });

      return Response.json({ error: message }, { status: 500 });
    }
  }
};
View logs:
wrangler tail
wrangler tail --status=error

Error Tracking

Store errors for analysis:
# Runs the processor agent and, only when it fails, records the error
# message plus the offending input into a D1 `errors` table.
ensemble: error-tracked-workflow

agents:
  - name: process
    agent: processor
    inputs:
      data: ${input.data}

  # Log errors
  - name: log-error
    # This step is skipped entirely on success.
    condition: ${process.failed}
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO errors
        (ensemble, agent, error_message, input_data, timestamp)
        VALUES (?, ?, ?, ?, ?)
      params:
        - error-tracked-workflow
        - process
        - ${process.error}
        # Serialize the input so the failure can be reproduced later.
        - ${JSON.stringify(input.data)}
        # NOTE: Date.now() stores epoch MILLISECONDS — queries must
        # compare against epoch-ms values, not datetime strings.
        - ${Date.now()}

output:
  result: ${process.output}
  error: ${process.error}
Query errors:
-- Error frequency
-- NOTE: `timestamp` is inserted as ${Date.now()} (epoch milliseconds).
-- Comparing an INTEGER against datetime('now', ...) (a TEXT value) never
-- matches in SQLite, so the cutoff must be an epoch-ms number too.
SELECT
  agent,
  error_message,
  COUNT(*) as occurrences
FROM errors
WHERE timestamp > strftime('%s', 'now', '-7 days') * 1000
GROUP BY agent, error_message
ORDER BY occurrences DESC
LIMIT 10;

Debugging

Enable Debug Mode

# Minimal example: the top-level `debug` flag turns on verbose
# per-run output for every agent in the ensemble.
ensemble: debuggable-workflow

debug: true  # Enable debug output

agents:
  # ... agents ...
Output includes:
  • Execution timeline
  • Operation inputs/outputs
  • Cache hits/misses
  • Retry attempts
  • Error details

Local Development

# Run with debug logs
wrangler dev --log-level=debug

# Test ensemble locally
curl http://localhost:8787/api/v1/execute \
  -H "Content-Type: application/json" \
  -d '{
    "ensemble": "my-ensemble",
    "input": { "test": "data" }
  }'

Trace Execution

# Manual tracing: each code step logs its input and output so the full
# data flow shows up in `wrangler tail` / local dev logs.
ensemble: traceable-workflow

agents:
  - name: step1
    operation: code
    config:
      code: |
        console.log('Step1 input:', ${JSON.stringify(input)});
        const result = processData(${input.data});
        console.log('Step1 output:', JSON.stringify(result));
        return result;

  # step2 consumes step1's output via the ${step1.output} binding.
  - name: step2
    operation: code
    config:
      code: |
        console.log('Step2 input:', ${JSON.stringify(step1.output)});
        const result = transformData(${step1.output});
        console.log('Step2 output:', JSON.stringify(result));
        return result;

Performance Testing

Load Testing

// scripts/load-test.ts
import { Conductor } from '@ensemble-edge/conductor';

/**
 * Fires 100 concurrent requests at the deployed worker and reports the
 * success rate plus average and p95 latency.
 *
 * Fixes over the previous version: `results` was declared twice with
 * `const` (a SyntaxError), and the p95 computation sorted the results
 * array in place.
 */
async function loadTest(): Promise<void> {
  // Send 100 concurrent requests
  const promises = Array.from({ length: 100 }, async (_, i) => {
    const start = Date.now();

    const response = await fetch('https://your-worker.workers.dev', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        ensemble: 'my-ensemble',
        input: { id: i }
      })
    });

    return { success: response.ok, duration: Date.now() - start };
  });

  const results = await Promise.all(promises);

  // Analyze results
  const successes = results.filter(r => r.success).length;
  const avgDuration = results.reduce((sum, r) => sum + r.duration, 0) / results.length;

  // Sort a copy so `results` itself is left untouched.
  const byDuration = [...results].sort((a, b) => a.duration - b.duration);
  const p95Duration = byDuration[Math.floor(byDuration.length * 0.95)].duration;

  console.log({
    total: results.length,
    successes,
    success_rate: (successes / results.length * 100).toFixed(2) + '%',
    avg_duration_ms: avgDuration.toFixed(0),
    p95_duration_ms: p95Duration
  });
}

// Surface rejections instead of silently dropping them.
loadTest().catch(err => {
  console.error('Load test failed:', err);
});

Best Practices

  1. Test Coverage - Aim for 80%+ coverage
  2. Mock External Calls - Don’t hit real APIs in tests
  3. Test Error Paths - Test failures and fallbacks
  4. Monitor Continuously - Track metrics in production
  5. Log Strategically - Log important events, not everything
  6. Debug Locally First - Use wrangler dev before deploying
  7. Track Errors - Store errors for analysis
  8. Performance Test - Load test before major releases
  9. Alert on Anomalies - Set up alerts for errors/slowness
  10. Review Logs Regularly - Check for patterns

Next Steps