Testing & Observability
Build confidence with testing. Monitor production with observability. Debug issues fast.
Testing Strategy
Test Pyramid
Copy
      /\
     /  \          E2E Tests (few)
    /____\
   /      \        Integration Tests (some)
  /________\
 /          \      Unit Tests (many)
/____________\
Unit Testing Agents
Basic Agent Test
Copy
// agents/company-enricher/agent.test.ts
import { describe, it, expect, beforeEach } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';
/**
 * Unit tests for the `company-enricher` agent: the happy path, the
 * cache flag reported on a repeated call, and the fallback scrape that
 * runs when the primary scrape is mocked to fail.
 */
describe('company-enricher agent', () => {
  let conductor: TestConductor;

  // Runs the agent with only the company name supplied.
  const enrichAnthropic = () =>
    conductor.executeAgent('company-enricher', {
      company_name: 'Anthropic'
    });

  // Every test gets a newly created conductor with the project loaded.
  beforeEach(async () => {
    conductor = await TestConductor.create();
    await conductor.loadProject('./');
  });

  it('should enrich company data', async () => {
    const enrichment = await conductor.executeAgent('company-enricher', {
      company_name: 'Anthropic',
      include_news: false
    });

    // Only the field types are pinned, not their concrete values.
    const expectedShape = {
      name: expect.any(String),
      description: expect.any(String),
      industry: expect.any(String)
    };

    expect(enrichment).toBeSuccessful();
    expect(enrichment.output.company_data).toMatchObject(expectedShape);
  });

  it('should cache on second call', async () => {
    // The two runs are sequential on purpose: the second must observe the first.
    const firstRun = await enrichAnthropic();
    const secondRun = await enrichAnthropic();

    expect(firstRun.output.from_cache).toBe(false);
    expect(secondRun.output.from_cache).toBe(true);
  });

  it('should handle failures gracefully', async () => {
    // Force the primary scrape to fail before running the agent.
    conductor.mockOperation('scrape-primary', {
      failed: true,
      error: 'Timeout'
    });

    const enrichment = await enrichAnthropic();

    expect(enrichment).toBeSuccessful();
    expect(enrichment.operations['scrape-fallback'].executed).toBe(true);
  });
});
Mocking Operations
Copy
/**
 * Runs the `company-enricher` agent with its search, scrape and
 * extraction operations all replaced by canned outputs.
 */
describe('company-enricher with mocks', () => {
  it('should work with mocked operations', async () => {
    const conductor = await TestConductor.create();
    await conductor.loadProject('./');

    // Canned output per operation name, registered in declaration order.
    const cannedOutputs = {
      // Mock search
      search: {
        AbstractURL: 'https://anthropic.com'
      },
      // Mock scrape
      'scrape-primary': {
        body: '<html>Anthropic is an AI safety company</html>'
      },
      // Mock AI extraction
      extract: {
        name: 'Anthropic',
        description: 'AI safety company',
        industry: 'Artificial Intelligence'
      }
    };

    for (const [operationName, output] of Object.entries(cannedOutputs)) {
      conductor.mockOperation(operationName, { output });
    }

    const enrichment = await conductor.executeAgent('company-enricher', {
      company_name: 'Anthropic'
    });

    expect(enrichment).toBeSuccessful();
    expect(enrichment.output.company_data.name).toBe('Anthropic');
  });
});
Testing Error Paths
Copy
/**
 * Error-path tests for the `company-enricher` agent: a search that
 * succeeds on its third invocation, and a run where both search and
 * the primary scrape always fail.
 */
describe('company-enricher error handling', () => {
  it('should retry failed operations', async () => {
    const conductor = await TestConductor.create();
    await conductor.loadProject('./');

    // Counts every invocation of the mocked search operation.
    let attempts = 0;
    conductor.mockOperation('search', () => {
      attempts += 1;
      // The first two invocations fail; the third returns a result.
      return attempts < 3
        ? { failed: true, error: 'Timeout' }
        : { output: { AbstractURL: 'https://anthropic.com' } };
    });

    const enrichment = await conductor.executeAgent('company-enricher', {
      company_name: 'Anthropic'
    });

    expect(enrichment).toBeSuccessful();
    expect(attempts).toBe(3); // Retried twice
  });

  it('should use fallback on persistent failure', async () => {
    const conductor = await TestConductor.create();
    await conductor.loadProject('./');

    // Operations mocked to fail on every call, with the error each reports.
    const permanentFailures: Array<[string, string]> = [
      ['search', 'Service unavailable'],
      ['scrape-primary', 'Cannot connect']
    ];
    for (const [operationName, error] of permanentFailures) {
      conductor.mockOperation(operationName, { failed: true, error });
    }

    const enrichment = await conductor.executeAgent('company-enricher', {
      company_name: 'Anthropic'
    });

    expect(enrichment.operations['scrape-fallback'].executed).toBe(true);
  });
});
Integration Testing Ensembles
Basic Ensemble Test
Copy
// ensembles/process-invoice.test.ts
import { describe, it, expect } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';
/**
 * Integration tests for the `process-invoice` ensemble: one valid
 * invoice processed end-to-end and one invoice with a negative amount.
 */
describe('process-invoice ensemble', () => {
  it('should process valid invoice end-to-end', async () => {
    // The Stripe key is forwarded from the test process environment.
    const conductor = await TestConductor.create({
      env: {
        STRIPE_API_KEY: process.env.STRIPE_API_KEY
      }
    });
    await conductor.loadProject('./');

    const validInvoice = {
      id: 'INV-001',
      amount: 100,
      customer_email: 'test@example.com'
    };
    const run = await conductor.execute('process-invoice', {
      invoice: validInvoice
    });

    expect(run).toBeSuccessful();
    expect(run.output.processed).toBe(true);
    expect(run.output.payment_id).toBeDefined();
    expect(run.output.confirmation_sent).toBe(true);
  });

  it('should reject invalid invoice', async () => {
    const conductor = await TestConductor.create();
    await conductor.loadProject('./');

    const invalidInvoice = {
      id: 'INV-002',
      amount: -100 // Invalid
    };
    const run = await conductor.execute('process-invoice', {
      invoice: invalidInvoice
    });

    // The run itself succeeds; the rejection is reported in the output.
    expect(run).toBeSuccessful();
    expect(run.output.processed).toBe(false);
    expect(run.output.errors).toContain('Invalid amount');
  });
});
Testing Parallel Execution
Copy
/**
 * Checks that the `parallel-processing` ensemble runs its three fetch
 * agents and that their recorded start times fall within 100ms.
 */
describe('parallel-processing ensemble', () => {
  it('should execute agents in parallel', async () => {
    const conductor = await TestConductor.create();
    await conductor.loadProject('./');

    const run = await conductor.execute('parallel-processing', {
      urls: ['url1', 'url2', 'url3']
    });

    expect(run).toBeSuccessful();

    const fetchAgents = ['fetch-1', 'fetch-2', 'fetch-3'] as const;

    // Check all agents executed
    for (const agentName of fetchAgents) {
      expect(run.agents[agentName].executed).toBe(true);
    }

    // Check they started at roughly the same time
    const startTimes = fetchAgents.map(agentName => run.agents[agentName].startTime);
    const earliest = Math.min(...startTimes);
    const latest = Math.max(...startTimes);
    expect(latest - earliest).toBeLessThan(100); // Within 100ms
  });
});
Testing State Management
Copy
/**
 * Checks the state reported by the `stateful-workflow` ensemble after
 * it processes a single item.
 */
describe('stateful-workflow ensemble', () => {
  it('should maintain state across operations', async () => {
    const conductor = await TestConductor.create();
    await conductor.loadProject('./');

    const submittedItem = 'item1';
    const run = await conductor.execute('stateful-workflow', {
      item: submittedItem
    });

    expect(run).toBeSuccessful();
    // The submitted item is recorded and counted exactly once.
    expect(run.state.processed_items).toContain('item1');
    expect(run.state.total_count).toBe(1);
  });
});
Production Monitoring
Cloudflare Analytics
View metrics in the Cloudflare dashboard:
- Workers & Pages → Your worker → Metrics
- View:
- Requests per second
- Error rate
- CPU time
- Duration (p50, p95, p99)
Custom Metrics
Log custom metrics:
Copy
# Ensemble that runs the `processor` agent and then records one metrics
# row (duration and success flag) for that run in a D1 table.
ensemble: monitored-workflow

agents:
  - name: process
    agent: processor
    inputs:
      data: ${input.data}

  # Log metrics
  - name: log-metrics
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO metrics
        (ensemble, agent, duration_ms, success, timestamp)
        VALUES (?, ?, ?, ?, ?)
      # One value per `?` placeholder, in column order.
      params:
        - monitored-workflow
        - process
        - ${process.duration}
        - ${!process.failed}
        # Epoch milliseconds; queries on `timestamp` must compare against
        # a millisecond value, not a datetime() string.
        - ${Date.now()}

output:
  result: ${process.output}
  duration_ms: ${process.duration}
Copy
-- NOTE: `timestamp` is written as ${Date.now()} (epoch milliseconds), so the
-- cutoff below is computed in epoch milliseconds as well. Comparing the
-- column to datetime('now', ...) would compare an INTEGER to TEXT, which
-- never matches in SQLite because INTEGER values sort before TEXT values.

-- Average duration by agent (last 24 hours)
SELECT
  agent,
  AVG(duration_ms) AS avg_duration_ms,
  MIN(duration_ms) AS min_duration_ms,
  MAX(duration_ms) AS max_duration_ms,
  COUNT(*) AS executions
FROM metrics
WHERE timestamp > CAST(strftime('%s', 'now', '-24 hours') AS INTEGER) * 1000
GROUP BY agent
ORDER BY avg_duration_ms DESC;

-- Success rate by ensemble (last 24 hours)
SELECT
  ensemble,
  COUNT(*) AS total,
  SUM(CASE WHEN success THEN 1 ELSE 0 END) AS successes,
  ROUND(AVG(CASE WHEN success THEN 1.0 ELSE 0.0 END) * 100, 2) AS success_rate_pct
FROM metrics
WHERE timestamp > CAST(strftime('%s', 'now', '-24 hours') AS INTEGER) * 1000
GROUP BY ensemble;
Logging
Copy
// src/index.ts
import { Conductor } from '@ensemble-edge/conductor';
/**
 * Worker entry point: executes the `my-ensemble` ensemble for every
 * request and logs the request, the outcome, and any failure.
 */
export default {
  /**
   * Handles one request.
   *
   * @param request - Incoming request; only its URL and method are logged.
   * @param env - Worker bindings, passed through to the Conductor.
   * @param ctx - Execution context, passed through to the Conductor.
   * @returns The ensemble output as JSON, or `{ error }` with status 500
   *          when execution throws.
   */
  async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
    const conductor = new Conductor({ env, ctx });

    // Log request
    console.log('Request:', {
      url: request.url,
      method: request.method,
      timestamp: Date.now()
    });

    try {
      const result = await conductor.execute('my-ensemble', {});

      // Log success
      console.log('Success:', {
        ensemble: 'my-ensemble',
        duration_ms: result.duration,
        agents_executed: Object.values(result.agents).filter(agent => agent.executed).length
      });

      return Response.json(result.output);
    } catch (error: unknown) {
      // A thrown value is not guaranteed to be an Error, so narrow before
      // reading `message` / `stack`; anything else is stringified.
      const message = error instanceof Error ? error.message : String(error);
      const stack = error instanceof Error ? error.stack : undefined;

      // Log error
      console.error('Error:', {
        ensemble: 'my-ensemble',
        error: message,
        stack
      });

      return Response.json({ error: message }, { status: 500 });
    }
  }
};
Copy
# Tail the Worker's logs
wrangler tail
# Tail only entries whose status is "error"
wrangler tail --status=error
Error Tracking
Store errors for analysis:
Copy
# Ensemble that runs the `processor` agent and, only when that agent
# fails, stores the error together with the input in a D1 table.
ensemble: error-tracked-workflow

agents:
  - name: process
    agent: processor
    inputs:
      data: ${input.data}

  # Log errors
  - name: log-error
    # Runs only when the `process` agent reported a failure.
    condition: ${process.failed}
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO errors
        (ensemble, agent, error_message, input_data, timestamp)
        VALUES (?, ?, ?, ?, ?)
      # One value per `?` placeholder, in column order.
      params:
        - error-tracked-workflow
        - process
        - ${process.error}
        - ${JSON.stringify(input.data)}
        # Epoch milliseconds; queries on `timestamp` must compare against
        # a millisecond value, not a datetime() string.
        - ${Date.now()}

output:
  result: ${process.output}
  error: ${process.error}
Copy
-- Error frequency: the ten most common (agent, error_message) pairs from
-- the last 7 days.
-- NOTE: `timestamp` is written as ${Date.now()} (epoch milliseconds), so the
-- cutoff is computed in epoch milliseconds. Comparing the column to
-- datetime('now', ...) would compare an INTEGER to TEXT, which never
-- matches in SQLite because INTEGER values sort before TEXT values.
SELECT
  agent,
  error_message,
  COUNT(*) AS occurrences
FROM errors
WHERE timestamp > CAST(strftime('%s', 'now', '-7 days') AS INTEGER) * 1000
GROUP BY agent, error_message
ORDER BY occurrences DESC
LIMIT 10;
Debugging
Enable Debug Mode
Copy
# Minimal ensemble definition with debug output switched on.
ensemble: debuggable-workflow
debug: true # Enable debug output
agents:
# ... agents ... (placeholder: list the ensemble's agents here)
Debug output shows:
- Execution timeline
- Operation inputs/outputs
- Cache hits/misses
- Retry attempts
- Error details
Local Development
Copy
# Start the local dev server with debug-level logging
wrangler dev --log-level=debug

# Execute an ensemble against the local server by sending it a JSON body
curl http://localhost:8787/api/v1/execute \
--header "Content-Type: application/json" \
--data '{
"ensemble": "my-ensemble",
"input": { "test": "data" }
}'
Trace Execution
Copy
# Two-step ensemble whose inline code steps log their own input and
# output so each step can be followed in the logs.
ensemble: traceable-workflow

agents:
  - name: step1
    operation: code
    config:
      code: |
        console.log('Step1 input:', ${JSON.stringify(input)});
        const result = processData(${input.data});
        console.log('Step1 output:', JSON.stringify(result));
        return result;

  # step2 consumes the value returned by step1.
  - name: step2
    operation: code
    config:
      code: |
        console.log('Step2 input:', ${JSON.stringify(step1.output)});
        const result = transformData(${step1.output});
        console.log('Step2 output:', JSON.stringify(result));
        return result;
Performance Testing
Load Testing
Copy
// scripts/load-test.ts
import { Conductor } from '@ensemble-edge/conductor';
/**
 * Fires a burst of concurrent POST requests at a worker and logs a
 * summary: total, successes, success rate, average and p95 duration.
 *
 * A request that rejects (for example a network error) is counted as a
 * failure instead of aborting the whole run.
 *
 * @param url - Endpoint to hit. Defaults to the placeholder worker URL.
 * @param concurrency - Number of requests sent at once. Defaults to 100.
 * @returns Resolves once the summary has been logged.
 */
async function loadTest(
  url = 'https://your-worker.workers.dev',
  concurrency = 100
): Promise<void> {
  // Send all requests concurrently; each one times itself.
  const pending = Array.from({ length: concurrency }, async (_, i) => {
    const start = Date.now();
    try {
      const response = await fetch(url, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          ensemble: 'my-ensemble',
          input: { id: i }
        })
      });
      return { success: response.ok, duration: Date.now() - start };
    } catch {
      // Keep the sample so one failed request does not discard the run.
      return { success: false, duration: Date.now() - start };
    }
  });

  const results = await Promise.all(pending);

  // Nothing to analyze: avoid dividing by zero and indexing an empty array.
  if (results.length === 0) {
    console.log({
      total: 0,
      successes: 0,
      success_rate: '0.00%',
      avg_duration_ms: '0',
      p95_duration_ms: 0
    });
    return;
  }

  // Analyze results
  const successes = results.filter(r => r.success).length;
  const avgDuration = results.reduce((sum, r) => sum + r.duration, 0) / results.length;

  // Sort a copy of the durations so `results` keeps its request order.
  const durations = results.map(r => r.duration).sort((a, b) => a - b);
  const p95Index = Math.min(durations.length - 1, Math.floor(durations.length * 0.95));
  const p95Duration = durations[p95Index];

  console.log({
    total: results.length,
    successes,
    success_rate: (successes / results.length * 100).toFixed(2) + '%',
    avg_duration_ms: avgDuration.toFixed(0),
    p95_duration_ms: p95Duration
  });
}
// Run the load test. A rejection is reported and turned into a non-zero
// exit code instead of being left as an unhandled promise rejection.
loadTest().catch((error: unknown) => {
  console.error('Load test failed:', error);
  process.exitCode = 1;
});
Best Practices
- Test Coverage - Aim for 80%+ coverage
- Mock External Calls - Don’t hit real APIs in tests
- Test Error Paths - Test failures and fallbacks
- Monitor Continuously - Track metrics in production
- Log Strategically - Log important events, not everything
- Debug Locally First - Use `wrangler dev` before deploying
- Track Errors - Store errors for analysis
- Performance Test - Load test before major releases
- Alert on Anomalies - Set up alerts for errors/slowness
- Review Logs Regularly - Check for patterns

