Documentation Index Fetch the complete documentation index at: https://docs.ensemble.ai/llms.txt
Use this file to discover all available pages before exploring further.
Testing Strategy
Test Pyramid
      /\
     /  \          E2E Tests (few)
    /____\
   /      \        Integration Tests (some)
  /________\
 /          \      Unit Tests (many)
/____________\
Unit Tests : Test individual agents
Integration Tests : Test ensembles end-to-end
E2E Tests : Test full user journeys
Unit Testing Agents
Basic Agent Test
// agents/company-enricher/agent.test.ts
import { describe , it , expect , beforeEach } from 'vitest' ;
import { TestConductor } from '@ensemble-edge/conductor/testing' ;
// Unit tests for the company-enricher agent.
describe('company-enricher agent', () => {
  const AGENT = 'company-enricher';
  let conductor: TestConductor;

  // Build a fresh TestConductor and load the project before every test.
  beforeEach(async () => {
    conductor = await TestConductor.create();
    await conductor.loadProject('./');
  });

  it('should enrich company data', async () => {
    const input = { company_name: 'Anthropic', include_news: false };

    const result = await conductor.executeAgent(AGENT, input);

    expect(result).toBeSuccessful();
    // Only the shape is checked: each field must be some string.
    const expectedShape = {
      name: expect.any(String),
      description: expect.any(String),
      industry: expect.any(String),
    };
    expect(result.output.company_data).toMatchObject(expectedShape);
  });

  it('should cache on second call', async () => {
    // Same input twice, run sequentially: the first is expected to miss
    // the cache and the second to hit it.
    const first = await conductor.executeAgent(AGENT, { company_name: 'Anthropic' });
    const second = await conductor.executeAgent(AGENT, { company_name: 'Anthropic' });

    expect(first.output.from_cache).toBe(false);
    expect(second.output.from_cache).toBe(true);
  });

  it('should handle failures gracefully', async () => {
    // Make the primary scrape report a failure.
    conductor.mockOperation('scrape-primary', { failed: true, error: 'Timeout' });

    const result = await conductor.executeAgent(AGENT, { company_name: 'Anthropic' });

    // The agent should still succeed, having run the fallback scrape.
    expect(result).toBeSuccessful();
    expect(result.operations['scrape-fallback'].executed).toBe(true);
  });
});
Mocking Operations
// Runs the agent with search, scrape and AI extraction replaced by canned outputs.
describe('company-enricher with mocks', () => {
  it('should work with mocked operations', async () => {
    const conductor = await TestConductor.create();
    await conductor.loadProject('./');

    // Canned output per operation; registered in declaration order
    // (search, then scrape, then AI extraction).
    const cannedOutputs = {
      search: {
        AbstractURL: 'https://anthropic.com',
      },
      'scrape-primary': {
        body: '<html>Anthropic is an AI safety company</html>',
      },
      extract: {
        name: 'Anthropic',
        description: 'AI safety company',
        industry: 'Artificial Intelligence',
      },
    };
    for (const [operation, output] of Object.entries(cannedOutputs)) {
      conductor.mockOperation(operation, { output });
    }

    const result = await conductor.executeAgent('company-enricher', {
      company_name: 'Anthropic',
    });

    expect(result).toBeSuccessful();
    expect(result.output.company_data.name).toBe('Anthropic');
  });
});
Testing Error Paths
// Error-path tests: retry on transient failures and fallback on persistent ones.
describe('company-enricher error handling', () => {
  it('should retry failed operations', async () => {
    const conductor = await TestConductor.create();
    await conductor.loadProject('./');

    // The mocked search fails on its first two invocations and succeeds on the third.
    let searchCalls = 0;
    conductor.mockOperation('search', () => {
      searchCalls += 1;
      const succeeds = searchCalls >= 3;
      return succeeds
        ? { output: { AbstractURL: 'https://anthropic.com' } }
        : { failed: true, error: 'Timeout' };
    });

    const result = await conductor.executeAgent('company-enricher', {
      company_name: 'Anthropic',
    });

    expect(result).toBeSuccessful();
    expect(searchCalls).toBe(3); // One initial attempt plus two retries
  });

  it('should use fallback on persistent failure', async () => {
    const conductor = await TestConductor.create();
    await conductor.loadProject('./');

    // Both the search and the primary scrape always fail.
    const searchFailure = { failed: true, error: 'Service unavailable' };
    const scrapeFailure = { failed: true, error: 'Cannot connect' };
    conductor.mockOperation('search', searchFailure);
    conductor.mockOperation('scrape-primary', scrapeFailure);

    const result = await conductor.executeAgent('company-enricher', {
      company_name: 'Anthropic',
    });

    expect(result.operations['scrape-fallback'].executed).toBe(true);
  });
});
Integration Testing Ensembles
Basic Ensemble Test
// ensembles/process-invoice.test.ts
import { describe , it , expect } from 'vitest' ;
import { TestConductor } from '@ensemble-edge/conductor/testing' ;
// Integration tests that execute the whole process-invoice ensemble.
describe('process-invoice ensemble', () => {
  const ENSEMBLE = 'process-invoice';

  it('should process valid invoice end-to-end', async () => {
    // The Stripe key is forwarded from the environment of the test process.
    const env = { STRIPE_API_KEY: process.env.STRIPE_API_KEY };
    const conductor = await TestConductor.create({ env });
    await conductor.loadProject('./');

    const invoice = {
      id: 'INV-001',
      amount: 100,
      customer_email: 'test@example.com',
    };
    const result = await conductor.execute(ENSEMBLE, { invoice });

    expect(result).toBeSuccessful();
    const { output } = result;
    expect(output.processed).toBe(true);
    expect(output.payment_id).toBeDefined();
    expect(output.confirmation_sent).toBe(true);
  });

  it('should reject invalid invoice', async () => {
    const conductor = await TestConductor.create();
    await conductor.loadProject('./');

    const invoice = {
      id: 'INV-002',
      amount: -100, // Invalid
    };
    const result = await conductor.execute(ENSEMBLE, { invoice });

    // The execution itself is expected to succeed; the rejection is
    // reported through the ensemble's output.
    expect(result).toBeSuccessful();
    const { output } = result;
    expect(output.processed).toBe(false);
    expect(output.errors).toContain('Invalid amount');
  });
});
Observability Configuration
Conductor provides Cloudflare-first observability with structured logging, metrics via Analytics Engine, and optional OpenTelemetry export.
Configuration in conductor.config.ts
// conductor.config.ts
import { defineConfig } from '@ensemble-edge/conductor'
export default defineConfig ({
observability: {
// Logging configuration
logging: {
enabled: true ,
level: 'info' , // 'debug' | 'info' | 'warn' | 'error'
format: 'json' , // JSON for Workers Logs indexing
// Fields to automatically include in all logs
context: [ 'requestId' , 'executionId' , 'ensembleName' , 'agentName' ],
// Sensitive fields to redact (replaced with [REDACTED])
redact: [ 'password' , 'apiKey' , 'token' , 'authorization' , 'secret' ],
// Events to log automatically
events: [
'request' , // HTTP requests
'response' , // HTTP responses
'agent:start' , // Agent execution start
'agent:complete' , // Agent completion
'agent:error' , // Agent errors
],
},
// Analytics Engine metrics
metrics: {
enabled: true ,
binding: 'ANALYTICS' , // Must match wrangler.toml
track: [
'ensemble:execution' , // Ensemble duration/success
'agent:execution' , // Agent duration/success
'http:request' , // HTTP request stats
'error' , // Error counts
],
},
// Optional: External observability (Datadog, Honeycomb, etc.)
opentelemetry: {
enabled: false ,
endpoint: 'https://api.honeycomb.io' ,
headers: { 'x-honeycomb-team' : '${HONEYCOMB_API_KEY}' },
samplingRate: 1.0 ,
},
// Track AI token usage for cost analysis
trackTokenUsage: true ,
} ,
})
Setting Up Analytics Engine
Add to your wrangler.toml:
# Analytics Engine for metrics
[[ analytics_engine_datasets ]]
binding = "ANALYTICS"
dataset = "conductor-metrics"
Then create the dataset in Cloudflare Dashboard:
Go to Workers & Pages → Analytics Engine
Click Create dataset
Name it conductor-metrics
Telemetry vs Observability
Observability (logging, traces) is for debugging—understanding how your code executed.
Telemetry is for analytics—tracking business metrics for billing, dashboards, and trend analysis.
| Use Case | Tool |
| --- | --- |
| Debug failures, trace execution | logger, observability config |
| Track costs, revenue, conversion | telemetry operation |
For example, in a payment agent: use logger to debug failures, use telemetry to track daily revenue and success rates.
Logging in Agents
Every agent receives a pre-configured logger and metrics recorder in its execution context:
// scripts/my-agent.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
/**
 * Example agent: processes `input.items`, logging the start and the outcome
 * and recording a metric for either the processed count or the failure.
 *
 * @param context - Execution context supplying the input, logger and metrics recorder.
 * @returns Whatever `processItems` resolves to.
 * @throws Re-throws any error raised by `processItems` after logging it.
 */
export default async function myAgent(context: AgentExecutionContext) {
  const { input, logger, metrics, executionId, requestId } = context

  // Structured start-of-work log entry
  logger.info('Processing started', {
    itemCount: input.items.length,
    category: input.category
  })

  const startedAt = Date.now()

  try {
    const processed = await processItems(input.items)

    // Report how many items were handled and how long it took
    const completion = {
      processedCount: processed.length,
      durationMs: Date.now() - startedAt
    }
    logger.info('Processing complete', completion)

    // Custom metric, tagged with the input category
    metrics.record('items.processed', processed.length, {
      category: input.category
    })

    return processed
  } catch (error) {
    // Log the caught error together with some context about the attempt
    const failureContext = {
      itemCount: input.items.length,
      attemptedAt: Date.now()
    }
    logger.error('Processing failed', error, failureContext)

    // Count the failure, then let the error propagate to the caller
    metrics.recordError('ProcessingError', input.category)
    throw error
  }
}
Log Levels
| Level | Use For | Example |
| --- | --- | --- |
| debug | Detailed debugging | Input/output values, cache decisions |
| info | Normal operations | Start/complete events, important milestones |
| warn | Potential issues | Rate limits, fallback usage, slow queries |
| error | Failures | Exceptions, validation errors, API failures |
YAML-Based Observability
Agent-Level Logging
# agents/sensitive-processor/agent.yaml
name : sensitive-processor
operation : code
config :
script : scripts/sensitive-processor
# Agent-level logging overrides
logging :
level : debug # More verbose for this agent
redact :
- password
- ssn
- creditCard
events :
onStart : true
onComplete : true
onError : true
# Agent-level metrics
metrics :
enabled : true
custom :
- name : records.processed
value : output.count
type : counter
Ensemble-Level Logging
# ensembles/user-workflow.yaml
name : user-workflow
# Ensemble-level logging configuration
logging :
level : info
trace :
enabled : true
includeInputs : true
includeOutputs : false # Don't log large outputs
redactInputs :
- password
- creditCard
# Ensemble-level metrics
metrics :
enabled : true
custom :
- name : workflow.completed
condition : success
type : counter
- name : workflow.duration
value : _executionTime
type : histogram
flow :
- name : validate
agent : validator
input :
data : ${input.userData}
- name : process
agent : processor
input :
validated : ${validate.output}
output :
success : ${process.output.success}
data : ${process.output.result}
Querying Metrics
Analytics Engine SQL API
Query your metrics using the Cloudflare dashboard or API:
-- NOTE(review): wrangler.toml in this guide declares the dataset as
-- "conductor-metrics", while these queries read from conductor_metrics;
-- confirm the table name matches your dataset. Likewise, the config example
-- tracks 'ensemble:execution' / 'agent:execution' (colon), while the filters
-- below use a dot; verify against the index values actually written.

-- Average ensemble execution time (last 24 hours)
-- Reads the ensemble name from blob1 and the duration in ms from double1.
SELECT
  blob1 AS ensemble_name,
  AVG(double1) AS avg_duration_ms,
  MIN(double1) AS min_duration_ms,
  MAX(double1) AS max_duration_ms,
  COUNT(*) AS executions
FROM conductor_metrics
WHERE index1 = 'ensemble.execution'
  AND timestamp > NOW() - INTERVAL '24' HOUR
GROUP BY blob1
ORDER BY avg_duration_ms DESC

-- Agent error rate (last 7 days)
-- Rows with double2 = 0 are counted as failures; agents with 10 or fewer
-- executions are excluded.
SELECT
  blob1 AS agent_name,
  SUM(CASE WHEN double2 = 0 THEN 1 ELSE 0 END) AS failures,
  COUNT(*) AS total,
  ROUND(SUM(CASE WHEN double2 = 0 THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) AS error_rate_pct
FROM conductor_metrics
WHERE index1 = 'agent.execution'
  AND timestamp > NOW() - INTERVAL '7' DAY
GROUP BY blob1
HAVING COUNT(*) > 10
ORDER BY error_rate_pct DESC

-- Cache hit rate (last day)
-- Share of rows per cache status, as a percentage of all matching rows.
SELECT
  blob1 AS cache_status,
  COUNT(*) AS count,
  ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) AS percentage
FROM conductor_metrics
WHERE index1 = 'cache.performance'
  AND timestamp > NOW() - INTERVAL '1' DAY
GROUP BY blob1
Production Monitoring
Cloudflare Workers Dashboard
View real-time metrics:
Workers & Pages → Your worker → Metrics
Monitor:
Requests per second
Error rate
CPU time
Duration (p50, p95, p99)
Viewing Logs
# Stream all logs
wrangler tail

# Filter by status
wrangler tail --status=error

# Filter by specific execution (no space after "=", otherwise the flag gets
# an empty value and the search term becomes a positional argument)
wrangler tail --search="exec_abc123"
Conductor outputs structured JSON logs for Cloudflare Workers Logs:
{
"timestamp" : "2024-01-15T10:30:00.000Z" ,
"level" : "info" ,
"message" : "Agent execution completed" ,
"service" : "conductor" ,
"requestId" : "req_abc123" ,
"executionId" : "exec_xyz789" ,
"ensembleName" : "user-workflow" ,
"agentName" : "processor" ,
"stepIndex" : 2 ,
"durationMs" : 150 ,
"cached" : false
}
OpenTelemetry Integration
Export to external platforms like Datadog or Honeycomb:
// conductor.config.ts
export default defineConfig ({
observability: {
opentelemetry: {
enabled: true ,
endpoint: 'https://http-intake.logs.datadoghq.com/api/v2/logs' ,
headers: {
'DD-API-KEY' : '${DATADOG_API_KEY}' ,
'DD-SOURCE' : 'conductor'
},
samplingRate: 0.1 , // Sample 10% of requests
},
} ,
})
Debugging
Enable Debug Mode
ensemble : debuggable-workflow
logging :
level : debug # Enable verbose logging
trace :
enabled : true
includeInputs : true
includeOutputs : true
flow :
# ... agents ...
Local Development
# Run with debug logs
wrangler dev --log-level=debug
# Test ensemble locally
curl -X POST http://localhost:8787/api/v1/execute/ensemble/my-ensemble \
-H "Content-Type: application/json" \
-d '{"input": { "test": "data" }}'
Trace Execution with Logger
// scripts/traceable-agent.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
/**
 * Example agent that brackets its work with debug-level log entries.
 *
 * @param context - Execution context supplying the logger, execution id and input.
 * @returns Whatever `processData` resolves to for `input.data`.
 */
export default async function traceableAgent(context: AgentExecutionContext) {
  const { logger, executionId, input } = context

  // Use the built-in logger instead of console.log
  const inputKeys = Object.keys(input)
  logger.debug('Agent started', { executionId, inputKeys })

  const processed = await processData(input.data)

  // Log only the key names of the result, not its values
  const outputKeys = Object.keys(processed)
  logger.debug('Agent completed', { executionId, outputKeys })

  return processed
}
Load Testing
// scripts/load-test.ts
/**
 * Fires a burst of concurrent POST requests at the ensemble endpoint and
 * logs a summary: total, successes, success rate, average and p95 duration.
 *
 * A request counts as a success only when the response status is OK. A
 * request whose `fetch` rejects (DNS failure, connection reset, ...) is
 * recorded as a failure with the time elapsed until the rejection, so a
 * single network error no longer discards the whole run.
 */
async function loadTest(): Promise<void> {
  const requestCount = 100;
  const endpoint = 'https://your-worker.workers.dev/api/v1/execute/ensemble/my-ensemble';

  // Start every request up front so they all run concurrently.
  const pending = Array.from(
    { length: requestCount },
    async (_, i): Promise<{ success: boolean; duration: number }> => {
      const start = Date.now();
      try {
        const response = await fetch(endpoint, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ input: { id: i } }),
        });
        return { success: response.ok, duration: Date.now() - start };
      } catch {
        // Network-level failure: keep the sample, mark it failed.
        return { success: false, duration: Date.now() - start };
      }
    },
  );
  const results = await Promise.all(pending);

  // Analyze results
  const successes = results.filter((r) => r.success).length;
  const avgDuration = results.reduce((sum, r) => sum + r.duration, 0) / results.length;

  // Sort a copy: Array.prototype.sort reorders in place and would otherwise
  // scramble `results`.
  const byDuration = [...results].sort((a, b) => a.duration - b.duration);
  const p95Duration = byDuration[Math.floor(byDuration.length * 0.95)].duration;

  console.log({
    total: results.length,
    successes,
    success_rate: ((successes / results.length) * 100).toFixed(2) + '%',
    avg_duration_ms: avgDuration.toFixed(0),
    p95_duration_ms: p95Duration,
  });
}
loadTest ();
Best Practices
Test Coverage - Aim for 80%+ coverage
Mock External Calls - Don’t hit real APIs in tests
Test Error Paths - Test failures and fallbacks
Use Structured Logging - Always log as JSON with context
Redact Sensitive Data - Configure redact patterns
Monitor Continuously - Track metrics in production
Set Up Alerts - Alert on error rate spikes
Sample in Production - Use samplingRate for high-volume traffic
Debug Locally First - Use wrangler dev before deploying
Review Logs Regularly - Check for patterns and anomalies
Next Steps
Creating Agents Build testable agents
Writing Ensembles Design robust workflows
Playbooks Real-world examples
Operations Reference All operations docs