Overview

The Executor class is the core execution engine that processes ensemble workflows. It manages member execution, state transitions, control flow, error handling, and runtime coordination.
import { Executor } from '@ensemble-edge/conductor';

const executor = new Executor({
  env,
  ensemble: parsedEnsemble,
  input: { userId: 'user_123' },
  timeout: 30000
});

const result = await executor.execute();

Constructor

  • options (ExecutorOptions, required) - Executor configuration options
  • options.env (Env, required) - Cloudflare Workers environment bindings (D1, KV, AI, etc.)
  • options.ensemble (ParsedEnsemble, required) - Parsed ensemble definition from Parser
  • options.input (object) - Initial input data for the workflow
  • options.timeout (number, default: 30000) - Maximum execution time in milliseconds
  • options.executionId (string) - Custom execution ID (auto-generated if not provided)
  • options.parentExecutionId (string) - Parent execution ID for nested workflows
  • options.context (ExecutionContext) - Additional execution context
interface ExecutorOptions {
  env: Env;
  ensemble: ParsedEnsemble;
  input?: Record<string, any>;
  timeout?: number;
  executionId?: string;
  parentExecutionId?: string;
  context?: ExecutionContext;
}

Methods

execute()

Execute the ensemble workflow from start to finish.
async execute(): Promise<ExecutionResult>
Returns: Promise<ExecutionResult>
interface ExecutionResult {
  id: string;
  status: 'completed' | 'failed' | 'cancelled' | 'timeout';
  output?: Record<string, any>;
  error?: ExecutionError;
  duration: number;
  timestamp: number;
  metadata: ExecutionMetadata;
}
Example:
const executor = new Executor({
  env,
  ensemble: parsedEnsemble,
  input: { orderId: 'ORD-123' }
});

const result = await executor.execute();

if (result.status === 'completed') {
  console.log('Output:', result.output);
} else {
  console.error('Error:', result.error);
}
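
The example above distinguishes only completed from everything else, but ExecutionResult declares four status values. As a minimal sketch, the helper below handles each one in a switch. It is self-contained: ResultLike is a trimmed local copy of the documented shape, and describeResult and its message formats are hypothetical, not part of the library.

```typescript
// Trimmed local copy of the documented result shape (assumption: only the
// fields used here; the real ExecutionResult carries more).
interface ResultLike {
  id: string;
  status: 'completed' | 'failed' | 'cancelled' | 'timeout';
  output?: Record<string, unknown>;
  error?: { code: string; message: string };
  duration: number;
}

// Hypothetical helper: turn a result into a one-line log message.
function describeResult(result: ResultLike): string {
  switch (result.status) {
    case 'completed':
      return `[${result.id}] completed in ${result.duration}ms`;
    case 'failed':
      return `[${result.id}] failed: ${result.error?.message ?? 'unknown error'}`;
    case 'cancelled':
      return `[${result.id}] cancelled after ${result.duration}ms`;
    case 'timeout':
      return `[${result.id}] timed out after ${result.duration}ms`;
    default: {
      // Compile-time exhaustiveness check: this stops type-checking if a
      // new status joins the union without a matching case.
      const unreachable: never = result.status;
      return unreachable;
    }
  }
}

console.log(describeResult({ id: 'exec_1', status: 'timeout', duration: 30000 }));
```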

executeMember()

Execute a specific member within the ensemble.
async executeMember(
  member: Member,
  input: Record<string, any>
): Promise<MemberResult>
  • member (Member, required) - Member to execute
  • input (object, required) - Input data for the member
Returns: Promise<MemberResult>
interface MemberResult {
  member: string;
  status: 'success' | 'error' | 'skipped';
  output?: any;
  error?: Error;
  duration: number;
  cached?: boolean;
}
Example:
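const member = ensemble.flow.find(m => m.member === 'validate-order');
if (!member) {
  // find() returns undefined when no flow entry matches
  throw new Error('Member "validate-order" not found in ensemble flow');
}

const result = await executor.executeMember(member, {
  orderId: 'ORD-123'
});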

evaluateCondition()

Evaluate a conditional expression in the current execution context.
evaluateCondition(condition: string): boolean
  • condition (string, required) - Condition expression (e.g., ${input.amount > 100})
Returns: boolean
Example:
const shouldExecute = executor.evaluateCondition(
  '${validate-order.output.valid === true}'
);

if (shouldExecute) {
  // Execute next member
}
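
To make the condition syntax concrete, here is a minimal sketch of an evaluator for the narrow form `${<path> <operator> <JSON literal>}`. It is not the library's implementation, and the real expression grammar may be richer. The names lookupPath and evaluateSimpleCondition, and the scope layout (member names as top-level keys), are assumptions for illustration.

```typescript
type ConditionScope = Record<string, unknown>;

// Walk a dotted path such as "validate-order.output.valid" through nested objects.
function lookupPath(scope: ConditionScope, path: string): unknown {
  let current: unknown = scope;
  for (const key of path.split('.')) {
    if (current === null || typeof current !== 'object') return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

// Hypothetical evaluator: supports one comparison against a JSON literal
// (true, false, null, numbers, double-quoted strings). JSON.parse throws
// a SyntaxError for any other right-hand side.
function evaluateSimpleCondition(condition: string, scope: ConditionScope): boolean {
  const match = /^\$\{\s*([\w.-]+)\s*(===|!==|>=|<=|>|<)\s*(.+?)\s*\}$/.exec(condition);
  if (!match) throw new Error(`Unsupported condition: ${condition}`);

  const left = lookupPath(scope, match[1] ?? '');
  const operator = match[2] ?? '';
  const right: unknown = JSON.parse(match[3] ?? 'null');

  if (operator === '===') return left === right;
  if (operator === '!==') return left !== right;
  // Ordering operators are only defined for numbers in this sketch.
  if (typeof left !== 'number' || typeof right !== 'number') return false;
  switch (operator) {
    case '>': return left > right;
    case '>=': return left >= right;
    case '<': return left < right;
    default: return left <= right;
  }
}

const conditionScope: ConditionScope = {
  input: { amount: 150 },
  'validate-order': { output: { valid: true } },
};
console.log(evaluateSimpleCondition('${input.amount > 100}', conditionScope));
```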

resolveExpression()

Resolve an expression to its value in the current context.
resolveExpression(expression: string): any
  • expression (string, required) - Expression to resolve (e.g., ${input.userId})
Returns: any - Resolved value
Example:
const userId = executor.resolveExpression('${input.userId}');
const total = executor.resolveExpression('${calculate-total.output.amount}');
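
As a rough mental model, resolving `${input.userId}` amounts to a dotted-path lookup in the execution context. The sketch below illustrates that idea and is not the library's implementation. The names getByPath and resolveTemplate are hypothetical, and the interpolation of expressions embedded in longer strings is an assumption that may not match the real resolver.

```typescript
type ExpressionContext = Record<string, unknown>;

// Follow a dotted path; returns undefined as soon as a segment is missing.
function getByPath(context: ExpressionContext, path: string): unknown {
  let current: unknown = context;
  for (const key of path.split('.')) {
    if (current === null || typeof current !== 'object') return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

// Hypothetical resolver:
// - a string that is exactly one ${path} returns the raw value, keeping its type
// - otherwise every ${path} inside the string is replaced by its string form
function resolveTemplate(expression: string, context: ExpressionContext): unknown {
  const whole = /^\$\{\s*([\w.-]+)\s*\}$/.exec(expression);
  if (whole) return getByPath(context, whole[1] ?? '');
  return expression.replace(
    /\$\{\s*([\w.-]+)\s*\}/g,
    (_match: string, path: string) => String(getByPath(context, path))
  );
}

const expressionContext: ExpressionContext = {
  input: { userId: 'user_123' },
  'calculate-total': { output: { amount: 42.5 } },
};
console.log(resolveTemplate('${calculate-total.output.amount}', expressionContext));
```

Returning the raw value for a whole-string expression keeps numbers and objects intact instead of coercing them to strings.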

getState()

Get the current execution state.
getState(): ExecutionState
Returns: ExecutionState
interface ExecutionState {
  id: string;
  status: 'running' | 'waiting' | 'paused' | 'completed' | 'failed';
  input: Record<string, any>;
  context: Record<string, any>;
  memberOutputs: Map<string, any>;
  currentMember?: string;
  startTime: number;
  metadata: Record<string, any>;
}
Example:
const state = executor.getState();
console.log(`Current member: ${state.currentMember}`);
console.log(`Status: ${state.status}`);
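
A state snapshot can also be condensed into a small progress record for logging. This is a minimal sketch under two assumptions: memberOutputs holds one entry per member that has produced output, and startTime is in epoch milliseconds. StateLike is a trimmed local copy of the documented interface, and summarizeState is a hypothetical helper.

```typescript
// Trimmed local copy of the documented ExecutionState (only the fields used here).
interface StateLike {
  status: 'running' | 'waiting' | 'paused' | 'completed' | 'failed';
  memberOutputs: Map<string, unknown>;
  currentMember?: string;
  startTime: number;
}

// Hypothetical helper: `now` is injectable so the result is deterministic in tests.
function summarizeState(state: StateLike, now: number = Date.now()) {
  return {
    status: state.status,
    currentMember: state.currentMember ?? null,
    finishedMembers: Array.from(state.memberOutputs.keys()),
    elapsedMs: now - state.startTime,
  };
}

const exampleState: StateLike = {
  status: 'running',
  memberOutputs: new Map<string, unknown>([['validate-order', { valid: true }]]),
  currentMember: 'charge-card',
  startTime: Date.now() - 250,
};
console.log(summarizeState(exampleState));
```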

cancel()

Cancel the execution.
cancel(reason?: string): void
  • reason (string) - Optional cancellation reason
Example:
// Cancel after timeout
setTimeout(() => {
  executor.cancel('Timeout exceeded');
}, 30000);

pause()

Pause execution (for HITL or async operations).
async pause(options: PauseOptions): Promise<string>
  • options (PauseOptions, required) - Pause configuration
  • options.reason (string, required) - Reason for pausing
  • options.member (string, required) - Member that triggered the pause
  • options.expiresAt (number) - Expiration timestamp
Returns: Promise<string> - Resume token
Example:
const token = await executor.pause({
  reason: 'waiting_for_approval',
  member: 'approval-step',
  expiresAt: Date.now() + 24 * 60 * 60 * 1000 // 24 hours
});

console.log(`Resume with token: ${token}`);
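
Because expiresAt is an absolute timestamp (the example computes it from Date.now()), callers that think in durations may prefer a small wrapper. The sketch below builds the options object from a time-to-live. PauseOptionsLike mirrors the documented fields, while pauseOptionsWithTtl is a hypothetical helper, not a library function.

```typescript
// Local copy of the documented PauseOptions fields.
interface PauseOptionsLike {
  reason: string;
  member: string;
  expiresAt?: number;
}

// Hypothetical helper: convert a TTL in milliseconds into an absolute
// expiresAt timestamp. `now` is injectable to keep the function testable.
function pauseOptionsWithTtl(
  member: string,
  reason: string,
  ttlMs: number,
  now: number = Date.now()
): PauseOptionsLike {
  if (!Number.isFinite(ttlMs) || ttlMs <= 0) {
    throw new RangeError(`ttlMs must be a positive number, got ${ttlMs}`);
  }
  return { reason, member, expiresAt: now + ttlMs };
}

const ONE_DAY_MS = 24 * 60 * 60 * 1000;
console.log(pauseOptionsWithTtl('approval-step', 'waiting_for_approval', ONE_DAY_MS));
```

The returned object can be passed directly to executor.pause().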

resume()

Resume a paused execution.
async resume(token: string, input: Record<string, any>): Promise<ExecutionResult>
  • token (string, required) - Resume token from pause()
  • input (object, required) - Input data to resume with
Returns: Promise<ExecutionResult>
Example:
const result = await executor.resume('hitl_token_123', {
  approved: true,
  comments: 'Looks good'
});

Properties

id

readonly id: string
Unique execution identifier.

status

readonly status: ExecutionStatus
Current execution status: running, waiting, paused, completed, failed, cancelled

ensemble

readonly ensemble: ParsedEnsemble
The parsed ensemble being executed.

startTime

readonly startTime: number
Execution start timestamp.

duration

readonly duration: number
Elapsed execution time in milliseconds.

Events

The Executor emits events during execution that can be subscribed to:
executor.on('member.start', (data) => {
  console.log(`Starting member: ${data.member}`);
});

executor.on('member.complete', (data) => {
  console.log(`Completed member: ${data.member}`, data.output);
});

executor.on('member.error', (data) => {
  console.error(`Error in member: ${data.member}`, data.error);
});

executor.on('execution.complete', (result) => {
  console.log('Execution completed:', result);
});

Event Types

| Event | Data | Description |
| --- | --- | --- |
| member.start | { member, timestamp } | Member execution started |
| member.complete | { member, output, duration } | Member completed successfully |
| member.error | { member, error } | Member execution failed |
| member.skip | { member, reason } | Member skipped (condition) |
| execution.pause | { member, token } | Execution paused (HITL) |
| execution.resume | { token } | Execution resumed |
| execution.complete | ExecutionResult | Execution finished |
| execution.error | ExecutionError | Execution failed |
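
The payload shapes in the table can be captured in an event map so that listeners are type-checked per event name. This is a sketch only: the field types (for example timestamp as number and reason as string) are assumptions, since the table lists field names without types, and TypedEmitter is a hypothetical stand-in used to demonstrate the typing, not the Executor's own emitter.

```typescript
// Payload shapes taken from the event table. Result and error payloads are
// loosened to `unknown` because their full types live in the library.
interface ExecutorEventMap {
  'member.start': { member: string; timestamp: number };
  'member.complete': { member: string; output: unknown; duration: number };
  'member.error': { member: string; error: unknown };
  'member.skip': { member: string; reason: string };
  'execution.pause': { member: string; token: string };
  'execution.resume': { token: string };
  'execution.complete': unknown;
  'execution.error': unknown;
}

type ListenerFor<K extends keyof ExecutorEventMap> = (data: ExecutorEventMap[K]) => void;

// Hypothetical minimal emitter with per-event payload typing.
class TypedEmitter {
  private listeners = new Map<keyof ExecutorEventMap, Array<(data: unknown) => void>>();

  on<K extends keyof ExecutorEventMap>(eventName: K, listener: ListenerFor<K>): void {
    const list = this.listeners.get(eventName) ?? [];
    list.push(listener as (data: unknown) => void);
    this.listeners.set(eventName, list);
  }

  emit<K extends keyof ExecutorEventMap>(eventName: K, data: ExecutorEventMap[K]): void {
    for (const listener of this.listeners.get(eventName) ?? []) listener(data);
  }
}

const demoEmitter = new TypedEmitter();
demoEmitter.on('member.start', (data) => {
  // `data` is inferred as { member: string; timestamp: number }
  console.log(`Starting member: ${data.member}`);
});
demoEmitter.emit('member.start', { member: 'validate-order', timestamp: Date.now() });
```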

Error Handling

Failures surface as ExecutionError instances. Wrap execute() in try/catch to inspect the error code, message, and the member that failed:
try {
  const result = await executor.execute();
} catch (error) {
  if (error instanceof ExecutionError) {
    console.error('Execution failed:', {
      code: error.code,
      message: error.message,
      member: error.member,
      stack: error.stack
    });
  }
}

Error Types

class ExecutionError extends Error {
  code: string;
  member?: string;
  details?: Record<string, any>;
}
Common error codes:
  • TIMEOUT - Execution exceeded timeout
  • MEMBER_ERROR - Member execution failed
  • VALIDATION_ERROR - Input validation failed
  • CONDITION_ERROR - Condition evaluation failed
  • EXPRESSION_ERROR - Expression resolution failed
  • CANCELLED - Execution cancelled
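
When an execution backs an HTTP endpoint, the error code can drive the response status. The mapping below is one possible choice and not something the library prescribes. hasErrorCode and httpStatusForError are hypothetical helpers, and the guard checks structurally for a string code field so the sketch stays self-contained.

```typescript
// Type guard: true for any thrown value carrying a string `code`,
// which matches the documented shape of ExecutionError.
function hasErrorCode(error: unknown): error is { code: string } {
  return (
    typeof error === 'object' &&
    error !== null &&
    typeof (error as { code?: unknown }).code === 'string'
  );
}

// Hypothetical mapping from documented error codes to HTTP statuses.
function httpStatusForError(error: unknown): number {
  if (!hasErrorCode(error)) return 500;
  switch (error.code) {
    case 'VALIDATION_ERROR':
      return 400; // caller sent bad input
    case 'TIMEOUT':
      return 504; // workflow ran past its deadline
    default:
      return 500; // MEMBER_ERROR, CONDITION_ERROR, EXPRESSION_ERROR, CANCELLED, unknown codes
  }
}

console.log(httpStatusForError({ code: 'TIMEOUT', message: 'Execution exceeded timeout' }));
```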

Advanced Usage

Nested Workflows

Execute ensembles within ensembles:
const parentExecutor = new Executor({
  env,
  ensemble: parentEnsemble,
  input: { userId: 'user_123' }
});

// Link the child execution to its parent via parentExecutionId
const childExecutor = new Executor({
  env,
  ensemble: childEnsemble,
  input: { orderId: 'ORD-123' },
  parentExecutionId: parentExecutor.id
});

const childResult = await childExecutor.execute();

Custom Context

Pass additional context to the execution:
const executor = new Executor({
  env,
  ensemble: parsedEnsemble,
  input: { userId: 'user_123' },
  context: {
    requestId: 'req_abc123',
    userAgent: 'Mozilla/5.0...',
    ip: '192.168.1.1',
    metadata: {
      source: 'api',
      version: '1.0.0'
    }
  }
});
Access context in members:
# In member input
input:
  userId: ${input.userId}
  requestId: ${context.requestId}
  userAgent: ${context.userAgent}

Progress Tracking

Track execution progress:
const executor = new Executor({ env, ensemble, input });

let completed = 0;
const total = ensemble.flow.length;

executor.on('member.complete', () => {
  completed++;
  const progress = (completed / total * 100).toFixed(1);
  console.log(`Progress: ${progress}%`);
});

await executor.execute();
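
Counting only member.complete means progress stalls below 100% whenever a member is skipped or fails. A variant that counts every settled member is sketched below. It assumes each flow entry emits exactly one of member.complete, member.skip, or member.error, which the source does not state. ProgressSource and trackProgress are hypothetical names, and the fake source exists only to make the sketch runnable on its own.

```typescript
type SettledEvent = 'member.complete' | 'member.error' | 'member.skip';

// Minimal surface needed from an executor-like object.
interface ProgressSource {
  on(eventName: SettledEvent, listener: () => void): void;
}

// Hypothetical helper: report percentage of members that have settled.
function trackProgress(
  source: ProgressSource,
  total: number,
  onProgress: (percent: number) => void
): void {
  let settled = 0;
  const bump = () => {
    settled += 1;
    // Guard against an empty flow and clamp in case a member settles twice.
    onProgress(total === 0 ? 100 : Math.min(100, (settled / total) * 100));
  };
  source.on('member.complete', bump);
  source.on('member.skip', bump);
  source.on('member.error', bump);
}

// Fake source standing in for an Executor instance.
const registered: Partial<Record<SettledEvent, () => void>> = {};
const fakeSource: ProgressSource = {
  on: (eventName, listener) => {
    registered[eventName] = listener;
  },
};

trackProgress(fakeSource, 4, (percent) => console.log(`Progress: ${percent.toFixed(1)}%`));
registered['member.complete']?.();
registered['member.skip']?.();
```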

Streaming Results

Stream execution progress via SSE:
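const encoder = new TextEncoder();

const stream = new ReadableStream({
  async start(controller) {
    const executor = new Executor({ env, ensemble, input });

    // Response bodies carry bytes, so encode each SSE frame before enqueueing
    const send = (payload: unknown) =>
      controller.enqueue(encoder.encode(`data: ${JSON.stringify(payload)}\n\n`));

    executor.on('member.complete', (data) => {
      send({ type: 'member.complete', data });
    });

    executor.on('execution.complete', (result) => {
      send({ type: 'complete', data: result });
      controller.close();
    });

    try {
      await executor.execute();
    } catch (error) {
      // Surface failures to the client instead of leaving the stream open
      controller.error(error);
    }
  }
});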

return new Response(stream, {
  headers: {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive'
  }
});
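
Each SSE frame is a `data:` line followed by a blank line, and byte-oriented consumers such as a Response body generally expect Uint8Array chunks rather than strings. The framing can be isolated in a small helper, sketched below. encodeSseEvent is a hypothetical name, and the { type, data } envelope follows the example above.

```typescript
const sseEncoder = new TextEncoder();

// Hypothetical helper: serialize one event as a UTF-8 encoded SSE frame.
// JSON.stringify never emits raw newlines, so the payload stays on one data line.
function encodeSseEvent(type: string, data: unknown): Uint8Array {
  return sseEncoder.encode(`data: ${JSON.stringify({ type, data })}\n\n`);
}

const exampleFrame = encodeSseEvent('member.complete', { member: 'validate-order' });
console.log(new TextDecoder().decode(exampleFrame));
```

Inside a stream, the result can be passed straight to controller.enqueue().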

Best Practices

  1. Set appropriate timeouts - Prevent long-running executions
  2. Handle errors gracefully - Catch and log all errors
  3. Use events for monitoring - Track execution progress
  4. Clean up resources - Cancel executions when needed
  5. Validate input - Check input before execution
  6. Log execution context - Include execution ID in logs
  7. Monitor performance - Track duration and identify bottlenecks
  8. Use custom context - Pass request metadata
  9. Test error scenarios - Verify error handling
  10. Document expectations - Clear input/output contracts
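
For practice 5, a pre-flight check can be as small as verifying that required keys are present before constructing the Executor. The sketch below is a minimal illustration. findMissingInput is a hypothetical helper, and the required key names are examples, not part of any ensemble definition.

```typescript
// Hypothetical helper: list required keys that are absent, undefined, or null.
function findMissingInput(
  input: Record<string, unknown>,
  requiredKeys: readonly string[]
): string[] {
  return requiredKeys.filter((key) => input[key] === undefined || input[key] === null);
}

const missingKeys = findMissingInput({ orderId: 'ORD-123' }, ['orderId', 'userId']);
if (missingKeys.length > 0) {
  // Fail fast with a clear message instead of starting the workflow.
  console.error(`Missing required input: ${missingKeys.join(', ')}`);
}
```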