Core Classes

Core runtime classes for programmatic ensemble execution.

Conductor

Main orchestration class for executing ensembles and agents programmatically.

Constructor

import { Conductor } from '@ensemble/conductor';

const conductor = new Conductor(options); // options: ConductorOptions

ConductorOptions

interface ConductorOptions {
  env: Env;                    // Cloudflare Workers environment bindings
  projectPath?: string;        // Path to Conductor project
  timeout?: number;            // Default timeout (ms, default: 30000)
  enableLogging?: boolean;     // Enable execution logging
  enableTracing?: boolean;     // Enable distributed tracing
}

Env

interface Env {
  // Cloudflare bindings
  AI?: AIBinding;
  DB?: D1Database;
  CACHE?: KVNamespace;
  STORAGE?: R2Bucket;
  VECTORIZE?: VectorizeIndex;
  
  // Secrets
  OPENAI_API_KEY?: string;
  ANTHROPIC_API_KEY?: string;
  GROQ_API_KEY?: string;
  
  // Environment variables
  [key: string]: any;
}
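Because every binding and secret in Env is optional, it can help to check for the ones an ensemble needs before constructing a Conductor. The following is a minimal sketch, not part of @ensemble/conductor: `missingEnvKeys` is a hypothetical helper, and the `Env` shape is a simplified local stand-in for the interface above.

```typescript
// Simplified stand-in for the Env interface above; bindings are typed as unknown here.
interface Env {
  OPENAI_API_KEY?: string;
  ANTHROPIC_API_KEY?: string;
  [key: string]: unknown;
}

// Hypothetical helper: returns the names in `required` that are absent or empty in `env`.
function missingEnvKeys(env: Env, required: string[]): string[] {
  return required.filter((key) => {
    const value = env[key];
    return value === undefined || value === null || value === '';
  });
}
```

A caller could fail fast when `missingEnvKeys(env, ['OPENAI_API_KEY', 'DB'])` returns a non-empty list, rather than discovering the gap partway through an execution.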

Methods

execute()

Execute an ensemble workflow.
async execute(
  ensemble: string,
  inputs: Record<string, any>,
  options?: ExecutionOptions
): Promise<ExecutionResult>
Parameters:
  • ensemble - Ensemble name
  • inputs - Input data
  • options - Execution options
Returns: Promise<ExecutionResult>
interface ExecutionResult {
  id: string;
  status: 'completed' | 'failed' | 'timeout';
  output?: Record<string, any>;
  error?: ExecutionError;
  duration: number;
  timestamp: number;
  trace?: ExecutionTrace;
}
Example:
const conductor = new Conductor({ env });

const result = await conductor.execute('user-onboarding', {
  email: 'alice@example.com',
  name: 'Alice'
});

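if (result.status === 'completed') {
  // output is optional on ExecutionResult, so guard the access
  console.log('User ID:', result.output?.userId);
} else {
  // status is 'failed' or 'timeout'; error is optional as well
  console.error('Failed:', result.error?.message);
}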
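Since both `output` and `error` are optional on ExecutionResult, callers often want one place that turns a result into either a value or an exception. A minimal sketch of that idea follows; `unwrapOutput` is a hypothetical helper (not a Conductor method), and the interface is a local copy trimmed to the fields it uses.

```typescript
// Local copy of the documented result shape, trimmed to the fields used here.
interface ExecutionResult {
  id: string;
  status: 'completed' | 'failed' | 'timeout';
  output?: Record<string, any>;
  error?: { code: string; message: string; retryable: boolean };
}

// Hypothetical helper: returns the output of a completed run, or throws with
// whatever detail the result carries.
function unwrapOutput(result: ExecutionResult): Record<string, any> {
  if (result.status === 'completed' && result.output !== undefined) {
    return result.output;
  }
  const detail = result.error?.message ?? 'no error details';
  throw new Error(`Execution ${result.id} ended with status "${result.status}": ${detail}`);
}
```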

executeAgent()

Execute a single agent.
async executeAgent(
  agent: string,
  inputs: Record<string, any>,
  options?: ExecutionOptions
): Promise<AgentResult>
Example:
const result = await conductor.executeAgent('company-enricher', {
  company_name: 'Anthropic'
});

getState()

Get current ensemble state.
async getState(ensembleId: string): Promise<State>
Example:
const state = await conductor.getState('exec-123...');
console.log('History:', state.history);

setState()

Update ensemble state.
async setState(
  ensembleId: string,
  state: Record<string, any>
): Promise<void>
Example:
await conductor.setState('exec-123...', {
  userPreferences: { theme: 'dark' }
});

listEnsembles()

List available ensembles.
async listEnsembles(): Promise<Ensemble[]>
Returns:
interface Ensemble {
  name: string;
  description?: string;
  inputs: InputSchema;
  outputs: OutputSchema;
  version?: string;
}

listAgents()

List available agents.
async listAgents(): Promise<Agent[]>

getComponent()

Get a versioned component.
async getComponent(
  name: string,
  version?: string
): Promise<Component>
Example:
const prompt = await conductor.getComponent('user-prompt', 'v1.2.0');

Error Handling

try {
  const result = await conductor.execute('my-ensemble', inputs);
  
  if (result.status !== 'completed') {
    // Handle execution failure; status is 'failed' or 'timeout'
    console.error('Execution did not complete:', result.status, result.error);
  }
} catch (error) {
  // Handle system errors (network, timeout, etc.)
  console.error('System error:', error);
}
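The pattern above separates results that report a non-completed status from exceptions thrown by the call itself. For the first kind, a `switch` over `status` with a `never` check lets the compiler flag any status value that goes unhandled. This is a sketch under the documented status union; `describeResult` and `ResultSummary` are hypothetical names.

```typescript
type ExecutionStatus = 'completed' | 'failed' | 'timeout';

// Trimmed local view of ExecutionResult with only the fields used below.
interface ResultSummary {
  status: ExecutionStatus;
  duration: number;
  error?: { code: string; message: string };
}

// Hypothetical helper: maps each documented status to a log-friendly message.
function describeResult(result: ResultSummary): string {
  switch (result.status) {
    case 'completed':
      return `completed (duration ${result.duration})`;
    case 'failed':
      return `failed: ${result.error?.message ?? 'no error details'}`;
    case 'timeout':
      return `timed out (duration ${result.duration})`;
    default: {
      // Compile-time exhaustiveness check: this assignment stops type-checking
      // if a new status is added to the union without a matching case.
      const unreachable: never = result.status;
      throw new Error(`Unhandled status: ${String(unreachable)}`);
    }
  }
}
```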

Executor

Low-level execution engine (typically not used directly).
import { Executor } from '@ensemble/conductor';

const executor = new Executor({
  env,
  ensemble: parsedEnsemble,
  input: { userId: '123' },
  timeout: 30000
});

const result = await executor.execute();

Parser

YAML parser for ensemble definitions.
import { Parser } from '@ensemble/conductor';

const parser = new Parser();
const parsed = await parser.parse(yamlContent);

StateManager

State management for ensemble execution.
import { StateManager } from '@ensemble/conductor';

const stateManager = new StateManager({
  schema: {
    counter: 'number',
    history: 'array'
  }
});

// Get state
const counter = stateManager.get('counter');

// Set state
stateManager.set('counter', counter + 1);

ExecutionOptions

interface ExecutionOptions {
  timeout?: number;            // Execution timeout (ms)
  executionId?: string;        // Custom execution ID
  parentExecutionId?: string;  // Parent execution (for nesting)
  enableTracing?: boolean;     // Enable tracing
  enableCaching?: boolean;     // Enable caching
  retryPolicy?: RetryPolicy;   // Custom retry policy
}
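When several calls share most of their settings, one approach is to keep a set of defaults and layer per-call values on top. The sketch below is application-side code, not a description of how Conductor merges options internally; `resolveOptions` is a hypothetical helper, and `retryPolicy` is omitted because its shape is not documented here.

```typescript
// Local copy of ExecutionOptions without retryPolicy.
interface ExecutionOptions {
  timeout?: number;
  executionId?: string;
  parentExecutionId?: string;
  enableTracing?: boolean;
  enableCaching?: boolean;
}

// Hypothetical helper: per-call values win; shared settings fall back to defaults.
// IDs are never inherited, since they identify a single execution.
function resolveOptions(
  defaults: ExecutionOptions,
  overrides: ExecutionOptions = {}
): ExecutionOptions {
  return {
    timeout: overrides.timeout ?? defaults.timeout,
    enableTracing: overrides.enableTracing ?? defaults.enableTracing,
    enableCaching: overrides.enableCaching ?? defaults.enableCaching,
    executionId: overrides.executionId,
    parentExecutionId: overrides.parentExecutionId,
  };
}
```

Using `??` rather than `||` keeps explicit `false` and `0` overrides from being replaced by the defaults.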

ExecutionError

interface ExecutionError {
  code: string;
  message: string;
  agent?: string;             // Agent that failed
  operation?: string;          // Operation that failed
  cause?: Error;              // Underlying error
  retryable: boolean;         // Can this error be retried?
  metadata?: Record<string, any>;
}
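The optional `agent`, `operation`, and `cause` fields are easy to lose when logging only `message`. A small formatter, sketched below, folds them into one line. `formatExecutionError` is a hypothetical helper, and the error code in the usage note is made up for illustration.

```typescript
// Local copy of the ExecutionError interface above.
interface ExecutionError {
  code: string;
  message: string;
  agent?: string;
  operation?: string;
  cause?: Error;
  retryable: boolean;
  metadata?: Record<string, any>;
}

// Hypothetical helper: builds a single log line from the fields that are present.
function formatExecutionError(error: ExecutionError): string {
  const where = [error.agent, error.operation]
    .filter((part): part is string => part !== undefined)
    .join('/');
  const location = where ? ` in ${where}` : '';
  const retry = error.retryable ? 'retryable' : 'not retryable';
  const cause = error.cause ? ` (caused by: ${error.cause.message})` : '';
  return `[${error.code}] ${error.message}${location}, ${retry}${cause}`;
}
```

For an error with code `AGENT_TIMEOUT`, message `Agent timed out`, agent `company-enricher`, and `retryable: true`, this yields `[AGENT_TIMEOUT] Agent timed out in company-enricher, retryable`.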

Best Practices

1. Reuse Conductor Instance
// Good: Single instance
const conductor = new Conductor({ env });
await conductor.execute('ensemble-1', input1);
await conductor.execute('ensemble-2', input2);

// Bad: New instance each time
await new Conductor({ env }).execute('ensemble-1', input1);
await new Conductor({ env }).execute('ensemble-2', input2);
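In a Workers handler the `env` object arrives with each request, so a module-level constant is not always available. One way to still reuse an instance is to cache it per `env` object. The sketch below is generic and self-contained; `createInstanceCache` is a hypothetical helper, and whether a Conductor is safe to share across requests is an assumption to verify for your deployment.

```typescript
// Hypothetical helper: wraps a factory so that each distinct env object
// gets exactly one instance. WeakMap keys do not prevent garbage collection.
function createInstanceCache<E extends object, T>(factory: (env: E) => T): (env: E) => T {
  const cache = new WeakMap<E, T>();
  return (env: E): T => {
    let instance = cache.get(env);
    if (instance === undefined) {
      instance = factory(env);
      cache.set(env, instance);
    }
    return instance;
  };
}

// Intended use (Conductor import omitted to keep this block self-contained):
//   const getConductor = createInstanceCache((env: Env) => new Conductor({ env }));
//   const conductor = getConductor(env);
```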
2. Handle Errors Properly
try {
  const result = await conductor.execute('ensemble', input);
  
  if (result.status === 'completed') {
    return result.output;
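  } else if (result.error?.retryable) {
    // Retry once and return the same shape as the success branch
    const retried = await conductor.execute('ensemble', input);
    if (retried.status === 'completed') return retried.output;
    throw new Error(retried.error?.message ?? `Execution ${retried.status}`);
  } else {
    // error is optional, so fall back to the status when it is absent
    throw new Error(result.error?.message ?? `Execution ${result.status}`);
  }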
} catch (error) {
  // System error handling
  throw error;
}
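A single immediate retry can be generalized to a bounded number of attempts with exponential backoff. The sketch below takes the call as a function, so it stays independent of Conductor. `executeWithRetry` and its parameters are hypothetical, and the `retryPolicy` option in ExecutionOptions may already cover this need (its shape is not documented here).

```typescript
// Minimal view of a result: only the fields the retry loop inspects.
interface RetryableResult {
  status: 'completed' | 'failed' | 'timeout';
  error?: { message: string; retryable: boolean };
}

// Hypothetical helper: calls `run` up to `maxAttempts` times, waiting
// baseDelayMs, 2x, 4x, ... between attempts, and stops early on success
// or on an error that is not marked retryable.
async function executeWithRetry<R extends RetryableResult>(
  run: () => Promise<R>,
  maxAttempts = 3,
  baseDelayMs = 200
): Promise<R> {
  let result = await run();
  for (let attempt = 1; attempt < maxAttempts; attempt++) {
    if (result.status === 'completed' || !result.error?.retryable) break;
    const delay = baseDelayMs * 2 ** (attempt - 1);
    await new Promise<void>((resolve) => setTimeout(resolve, delay));
    result = await run();
  }
  return result;
}
```

It would be called as `executeWithRetry(() => conductor.execute('ensemble', input))`. The last result is returned whether or not it succeeded, so the caller still checks `status`.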
3. Use Type Safety
interface UserOnboardingInput {
  email: string;
  name: string;
}

interface UserOnboardingOutput {
  userId: string;
  created: boolean;
}

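// Annotating the input checks its fields at compile time;
// an `as` cast would not flag a missing field.
const input: UserOnboardingInput = { email: 'alice@example.com', name: 'Alice' };

// Note: the execute() signature listed under Methods has no type parameter,
// so this call assumes a generic overload is available.
const result = await conductor.execute<UserOnboardingOutput>('user-onboarding', input);

// result.output is typed as UserOnboardingOutput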
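Compile-time types do not verify what an ensemble actually returns, and `ExecutionResult.output` is declared as `Record<string, any>`. A runtime type guard can close that gap. This is a minimal sketch; `isUserOnboardingOutput` is a hypothetical helper written for the interface above.

```typescript
interface UserOnboardingOutput {
  userId: string;
  created: boolean;
}

// Hypothetical type guard: checks the fields at runtime and narrows the type
// for the compiler when it returns true.
function isUserOnboardingOutput(value: unknown): value is UserOnboardingOutput {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return typeof candidate.userId === 'string' && typeof candidate.created === 'boolean';
}
```

After `if (isUserOnboardingOutput(result.output))`, accessing `result.output.userId` is both type-safe and checked against the actual data.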
4. Enable Tracing in Development
const conductor = new Conductor({
  env,
  enableLogging: true,
  enableTracing: true
});
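To avoid leaving these flags on in production, they can be derived from an environment variable. The sketch below assumes a hypothetical `ENVIRONMENT` variable set to `'development'` locally; neither that variable nor `diagnosticsFor` is defined by Conductor.

```typescript
interface DiagnosticsOptions {
  enableLogging: boolean;
  enableTracing: boolean;
}

// Hypothetical helper: turns both flags on only when the (assumed)
// ENVIRONMENT value is exactly 'development'.
function diagnosticsFor(environment: string | undefined): DiagnosticsOptions {
  const isDevelopment = environment === 'development';
  return { enableLogging: isDevelopment, enableTracing: isDevelopment };
}

// Intended use (Conductor import omitted to keep this block self-contained):
//   const conductor = new Conductor({ env, ...diagnosticsFor(env.ENVIRONMENT) });
```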
