Overview
The Executor class is the core execution engine that processes ensemble workflows. It manages member execution, state transitions, control flow, error handling, and runtime coordination.
import { Executor } from '@ensemble-edge/conductor';

const executor = new Executor({
  env,
  ensemble: parsedEnsemble,
  input: { userId: 'user_123' },
  timeout: 30000
});

const result = await executor.execute();
Constructor
options
Executor configuration options
options.env
Cloudflare Workers environment bindings (D1, KV, AI, etc.)
options.ensemble
Parsed ensemble definition from Parser
options.input
Initial input data for the workflow
options.timeout
Maximum execution time in milliseconds
options.executionId
Custom execution ID (auto-generated if not provided)
options.parentExecutionId
Parent execution ID for nested workflows
options.context
Additional execution context
interface ExecutorOptions {
  env: Env;
  ensemble: ParsedEnsemble;
  input?: Record<string, any>;
  timeout?: number;
  executionId?: string;
  parentExecutionId?: string;
  context?: ExecutionContext;
}
Methods
execute()
Execute the ensemble workflow from start to finish.
async execute(): Promise<ExecutionResult>
Returns: Promise<ExecutionResult>
interface ExecutionResult {
  id: string;
  status: 'completed' | 'failed' | 'cancelled' | 'timeout';
  output?: Record<string, any>;
  error?: ExecutionError;
  duration: number;
  timestamp: number;
  metadata: ExecutionMetadata;
}
Example:
const executor = new Executor({
  env,
  ensemble: parsedEnsemble,
  input: { orderId: 'ORD-123' }
});

const result = await executor.execute();

if (result.status === 'completed') {
  console.log('Output:', result.output);
} else {
  console.error('Error:', result.error);
}
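Because `status` can take four values, the two-way `if`/`else` above reports `cancelled` and `timeout` the same way as `failed`. The sketch below handles each status explicitly with an exhaustive `switch`. It is only an illustration: `ExecutionResultLike` is a reduced local copy of the documented result shape (with `ExecutionError` narrowed to `code` and `message`), and `summarizeResult` is a hypothetical helper, not part of the library.

```typescript
// Reduced local copy of the result shape documented above (assumption:
// only the fields used here are reproduced).
type ExecutionStatus = 'completed' | 'failed' | 'cancelled' | 'timeout';

interface ExecutionResultLike {
  id: string;
  status: ExecutionStatus;
  output?: Record<string, unknown>;
  error?: { code: string; message: string };
  duration: number;
}

// Hypothetical helper: turn a result into a single log line.
function summarizeResult(result: ExecutionResultLike): string {
  switch (result.status) {
    case 'completed':
      return `[${result.id}] completed in ${result.duration}ms`;
    case 'failed':
      return `[${result.id}] failed: ${result.error?.message ?? 'unknown error'}`;
    case 'cancelled':
      return `[${result.id}] cancelled after ${result.duration}ms`;
    case 'timeout':
      return `[${result.id}] timed out after ${result.duration}ms`;
    default: {
      // Compile-time exhaustiveness check: this stops type-checking if a
      // status is added to the union without a matching case.
      const unreachable: never = result.status;
      return unreachable;
    }
  }
}
```

Including the execution ID in each line also makes the output easier to correlate across logs.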
executeMember()
Execute a specific member within the ensemble.
async executeMember(
  member: Member,
  input: Record<string, any>
): Promise<MemberResult>
member
Member to execute
input
Input data for the member
Returns: Promise<MemberResult>
interface MemberResult {
  member: string;
  status: 'success' | 'error' | 'skipped';
  output?: any;
  error?: Error;
  duration: number;
  cached?: boolean;
}
Example:
const member = ensemble.flow.find(m => m.member === 'validate-order');
if (!member) {
  throw new Error('Member "validate-order" not found in ensemble flow');
}
const result = await executor.executeMember(member, {
  orderId: 'ORD-123'
});
evaluateCondition()
Evaluate a conditional expression in the current execution context.
evaluateCondition(condition: string): boolean
condition
Condition expression (e.g., ${input.amount > 100})
Returns: boolean
Example:
const shouldExecute = executor.evaluateCondition(
  '${validate-order.output.valid === true}'
);

if (shouldExecute) {
  // Execute next member
}
resolveExpression()
Resolve an expression to its value in the current context.
resolveExpression(expression: string): any
expression
Expression to resolve (e.g., ${input.userId})
Returns: any - Resolved value
Example:
const userId = executor.resolveExpression('${input.userId}');
const total = executor.resolveExpression('${calculate-total.output.amount}');
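This reference does not describe how expressions are resolved internally. As a rough mental model only, the sketch below resolves a single `${dotted.path}` against a plain object whose keys are `input` and member names. It is not Conductor's implementation: `resolvePath` and `Scope` are hypothetical names, and the sketch handles property paths only, not operators such as `===` or `>`.

```typescript
// Illustrative only: resolves `${dotted.path}` against a plain object.
type Scope = Record<string, unknown>;

function resolvePath(expression: string, scope: Scope): unknown {
  // Accept exactly one `${...}` wrapper around a dotted path.
  const match = /^\$\{\s*([^}]+?)\s*\}$/.exec(expression);
  if (!match) {
    throw new Error(`Not a \${...} expression: ${expression}`);
  }
  const path = match[1] ?? '';
  let current: unknown = scope;
  for (const key of path.split('.')) {
    if (current === null || typeof current !== 'object') {
      return undefined; // the path runs past a non-object value
    }
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

// Member names may contain hyphens, so they are looked up as string keys.
const scope: Scope = {
  input: { userId: 'user_123' },
  'calculate-total': { output: { amount: 42 } },
};

const resolvedUserId = resolvePath('${input.userId}', scope);
const resolvedTotal = resolvePath('${calculate-total.output.amount}', scope);
```

Splitting on `.` rather than evaluating the string as JavaScript is what lets hyphenated member names such as `calculate-total` work as path segments.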
getState()
Get the current execution state.
getState(): ExecutionState
Returns: ExecutionState
interface ExecutionState {
  id: string;
  status: 'running' | 'waiting' | 'paused' | 'completed' | 'failed';
  input: Record<string, any>;
  context: Record<string, any>;
  memberOutputs: Map<string, any>;
  currentMember?: string;
  startTime: number;
  metadata: Record<string, any>;
}
Example:
const state = executor.getState();
console.log(`Current member: ${state.currentMember}`);
console.log(`Status: ${state.status}`);
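One thing to watch when logging or persisting the state: `memberOutputs` is a `Map`, and `JSON.stringify` serializes a `Map` as `{}`, so member outputs silently disappear. The sketch below converts the map to a plain object first. `ExecutionStateLike` is a reduced local copy of the documented interface and `snapshotState` is a hypothetical helper.

```typescript
// Reduced local copy of ExecutionState (assumption: fields as documented above).
interface ExecutionStateLike {
  id: string;
  status: 'running' | 'waiting' | 'paused' | 'completed' | 'failed';
  memberOutputs: Map<string, unknown>;
  currentMember?: string;
  startTime: number;
}

// Hypothetical helper: produce a JSON-safe snapshot of the state.
function snapshotState(state: ExecutionStateLike): string {
  return JSON.stringify({
    ...state,
    // Object.fromEntries turns the Map into a plain, serializable object.
    memberOutputs: Object.fromEntries(state.memberOutputs),
  });
}

const sampleState: ExecutionStateLike = {
  id: 'exec_1',
  status: 'running',
  memberOutputs: new Map<string, unknown>([['validate-order', { valid: true }]]),
  currentMember: 'calculate-total',
  startTime: 0,
};

const snapshotJson = snapshotState(sampleState);
```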
cancel()
Cancel the execution.
cancel(reason?: string): void
reason
Optional cancellation reason
Example:
// Cancel after timeout
setTimeout(() => {
  executor.cancel('Timeout exceeded');
}, 30000);
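The `setTimeout` call above stays scheduled even if the execution finishes first, so `cancel()` can still fire on an executor that is already done. One possible pattern, sketched here under assumptions, is to clear the timer once `execute()` settles. `Cancellable` is a local stand-in for the two Executor methods involved and `executeWithDeadline` is a hypothetical helper. For a plain time limit, the `timeout` constructor option is likely the simpler choice.

```typescript
// Minimal stand-in for the two Executor methods used here (assumption).
interface Cancellable<T> {
  execute(): Promise<T>;
  cancel(reason?: string): void;
}

// Hypothetical helper: cancel after `ms`, but clear the timer once
// execution settles so it never fires on a finished executor.
async function executeWithDeadline<T>(executor: Cancellable<T>, ms: number): Promise<T> {
  const timer = setTimeout(() => executor.cancel('Timeout exceeded'), ms);
  try {
    return await executor.execute();
  } finally {
    clearTimeout(timer);
  }
}
```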
pause()
Pause execution, for example while waiting on a human-in-the-loop (HITL) step or another asynchronous operation.
async pause(options: PauseOptions): Promise<string>
options
Pause configuration
options.member
Member that triggered the pause
Returns: Promise<string> - Resume token
Example:
const token = await executor.pause({
  reason: 'waiting_for_approval',
  member: 'approval-step',
  expiresAt: Date.now() + 24 * 60 * 60 * 1000 // 24 hours
});
console.log(`Resume with token: ${token}`);
resume()
Resume a paused execution.
async resume(token: string, input: Record<string, any>): Promise<ExecutionResult>
token
Resume token from pause()
input
Input data to resume with
Returns: Promise<ExecutionResult>
Example:
const result = await executor.resume('hitl_token_123', {
  approved: true,
  comments: 'Looks good'
});
Properties
id
readonly id: string
Unique execution identifier.
status
readonly status: ExecutionStatus
Current execution status: running, waiting, paused, completed, failed, cancelled
ensemble
readonly ensemble: ParsedEnsemble
The parsed ensemble being executed.
startTime
readonly startTime: number
Execution start timestamp.
duration
readonly duration: number
Elapsed execution time in milliseconds.
Events
The Executor emits events during execution. Subscribe to them with on():
executor.on('member.start', (data) => {
  console.log(`Starting member: ${data.member}`);
});

executor.on('member.complete', (data) => {
  console.log(`Completed member: ${data.member}`, data.output);
});

executor.on('member.error', (data) => {
  console.error(`Error in member: ${data.member}`, data.error);
});

executor.on('execution.complete', (result) => {
  console.log('Execution completed:', result);
});
Event Types
| Event | Data | Description |
| --- | --- | --- |
| member.start | { member, timestamp } | Member execution started |
| member.complete | { member, output, duration } | Member completed successfully |
| member.error | { member, error } | Member execution failed |
| member.skip | { member, reason } | Member skipped (condition) |
| execution.pause | { member, token } | Execution paused (HITL) |
| execution.resume | { token } | Execution resumed |
| execution.complete | ExecutionResult | Execution finished |
| execution.error | ExecutionError | Execution failed |
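The payloads in the table can be written down as a TypeScript event map so that handlers are checked against the event name. The sketch below is one way to do that. The field names come from the table, but the field types (`string`, `number`, `unknown`) are assumptions, and `TypedEmitter` is a hypothetical stand-in used only to show the typing, not the library's emitter.

```typescript
// Payload field names are taken from the event table; their types are assumed.
interface ExecutorEvents {
  'member.start': { member: string; timestamp: number };
  'member.complete': { member: string; output: unknown; duration: number };
  'member.error': { member: string; error: Error };
  'member.skip': { member: string; reason: string };
}

type Handler<T> = (data: T) => void;

// Hypothetical emitter: `on` and `emit` are tied together by the event map.
class TypedEmitter<Events> {
  private handlers = new Map<keyof Events, Handler<any>[]>();

  on<K extends keyof Events>(event: K, handler: Handler<Events[K]>): void {
    const list = this.handlers.get(event) ?? [];
    list.push(handler);
    this.handlers.set(event, list);
  }

  emit<K extends keyof Events>(event: K, data: Events[K]): void {
    for (const handler of this.handlers.get(event) ?? []) {
      handler(data);
    }
  }
}

const events = new TypedEmitter<ExecutorEvents>();
const skippedMembers: string[] = [];

// `data` is inferred as { member: string; reason: string } from the event name.
events.on('member.skip', (data) => {
  skippedMembers.push(`${data.member}: ${data.reason}`);
});
events.emit('member.skip', { member: 'send-email', reason: 'condition false' });
```

With a map like this, a typo in an event name or a read of a field the event does not carry becomes a compile-time error rather than an `undefined` at runtime.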
Error Handling
Catch ExecutionError to inspect the error code, the message, and the member that failed:
try {
  const result = await executor.execute();
} catch (error) {
  if (error instanceof ExecutionError) {
    console.error('Execution failed:', {
      code: error.code,
      message: error.message,
      member: error.member,
      stack: error.stack
    });
  } else {
    // Not an execution failure; let unexpected errors propagate
    throw error;
  }
}
Error Types
class ExecutionError extends Error {
  code: string;
  member?: string;
  details?: Record<string, any>;
}
Common error codes:
TIMEOUT - Execution exceeded timeout
MEMBER_ERROR - Member execution failed
VALIDATION_ERROR - Input validation failed
CONDITION_ERROR - Condition evaluation failed
EXPRESSION_ERROR - Expression resolution failed
CANCELLED - Execution cancelled
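The codes above can drive a retry decision. The sketch below shows one possible classification. The codes themselves come from the list, but which ones count as retryable is purely an assumption made for illustration, not documented library behavior, and `isRetryable` is a hypothetical helper.

```typescript
// The codes come from the list above; the retry policy is an assumption.
type ErrorCode =
  | 'TIMEOUT'
  | 'MEMBER_ERROR'
  | 'VALIDATION_ERROR'
  | 'CONDITION_ERROR'
  | 'EXPRESSION_ERROR'
  | 'CANCELLED';

// Assumed policy: timeouts and member failures may be transient, while
// validation, condition, and expression errors would fail again unchanged.
const RETRYABLE: ReadonlySet<ErrorCode> = new Set<ErrorCode>(['TIMEOUT', 'MEMBER_ERROR']);

function isRetryable(code: string): boolean {
  return RETRYABLE.has(code as ErrorCode);
}
```

Unknown codes fall through to `false`, so a code this list does not cover is never retried by accident.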
Advanced Usage
Nested Workflows
Execute ensembles within ensembles:
const parentExecutor = new Executor({
  env,
  ensemble: parentEnsemble,
  input: { userId: 'user_123' }
});

// Child executor inherits parent context
const childExecutor = new Executor({
  env,
  ensemble: childEnsemble,
  input: { orderId: 'ORD-123' },
  parentExecutionId: parentExecutor.id
});

const childResult = await childExecutor.execute();
Custom Context
Pass additional context to the execution:
const executor = new Executor({
  env,
  ensemble: parsedEnsemble,
  input: { userId: 'user_123' },
  context: {
    requestId: 'req_abc123',
    userAgent: 'Mozilla/5.0...',
    ip: '192.168.1.1',
    metadata: {
      source: 'api',
      version: '1.0.0'
    }
  }
});
Access context in members:
# In member input
input:
  userId: ${input.userId}
  requestId: ${context.requestId}
  userAgent: ${context.userAgent}
Progress Tracking
Track execution progress:
const executor = new Executor({ env, ensemble, input });

let completed = 0;
const total = ensemble.flow.length;

executor.on('member.complete', () => {
  completed++;
  const progress = (completed / total * 100).toFixed(1);
  console.log(`Progress: ${progress}%`);
});

await executor.execute();
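Counting only `member.complete` means progress may never reach 100% when a member is skipped by a condition, since skipped members emit `member.skip` instead. A possible refinement, sketched below, tracks finished members by name regardless of how they finished. `ProgressTracker` is a hypothetical helper, not part of the library.

```typescript
// Hypothetical tracker: a member counts as finished whether it completed
// or was skipped. A Set keeps repeated events from being counted twice.
class ProgressTracker {
  private readonly finished = new Set<string>();
  private readonly total: number;

  constructor(total: number) {
    this.total = total;
  }

  // Record a finished member and return the updated percentage.
  markFinished(member: string): number {
    this.finished.add(member);
    return this.percent();
  }

  // Percentage of finished members, rounded to one decimal place.
  percent(): number {
    if (this.total === 0) return 100;
    return Math.round((this.finished.size / this.total) * 1000) / 10;
  }
}

const tracker = new ProgressTracker(3);
tracker.markFinished('validate-order');
tracker.markFinished('send-email');
const percentDone = tracker.percent();
```

To wire it up, call `tracker.markFinished(data.member)` from both the `member.complete` and `member.skip` handlers.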
Streaming Results
Stream execution progress via SSE:
const encoder = new TextEncoder();

const stream = new ReadableStream({
  async start(controller) {
    const executor = new Executor({ env, ensemble, input });

    // Response bodies carry bytes, so encode each SSE frame before enqueueing
    const send = (type: string, data: unknown) => {
      controller.enqueue(encoder.encode(`data: ${JSON.stringify({ type, data })}\n\n`));
    };

    executor.on('member.complete', (data) => send('member.complete', data));

    executor.on('execution.complete', (result) => {
      send('complete', result);
      controller.close();
    });

    try {
      await executor.execute();
    } catch (error) {
      // Surface failures to the client instead of leaving the stream open
      controller.error(error);
    }
  }
});

return new Response(stream, {
  headers: {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive'
  }
});
Best Practices
- Set appropriate timeouts - Prevent long-running executions
- Handle errors gracefully - Catch and log all errors
- Use events for monitoring - Track execution progress
- Clean up resources - Cancel executions when needed
- Validate input - Check input before execution
- Log execution context - Include execution ID in logs
- Monitor performance - Track duration and identify bottlenecks
- Use custom context - Pass request metadata
- Test error scenarios - Verify error handling
- Document expectations - Clear input/output contracts
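As a small illustration of the "Validate input" item, the sketch below checks for required keys before an Executor is constructed. `missingKeys` is a hypothetical helper, and the list of required keys is supplied by the caller; nothing here is derived from the ensemble definition.

```typescript
// Hypothetical pre-flight check: returns the required keys that are
// absent (undefined or null) from the input object.
function missingKeys(input: Record<string, unknown>, required: string[]): string[] {
  return required.filter((key) => input[key] === undefined || input[key] === null);
}

const missing = missingKeys({ userId: 'user_123' }, ['userId', 'orderId']);

if (missing.length > 0) {
  // In a request handler this would typically become a 4xx response
  // rather than starting an execution that is bound to fail.
  console.error(`Missing required input: ${missing.join(', ')}`);
}
```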