Skip to main content

Overview

The GraphExecutor extends the base Executor to execute workflows as directed acyclic graphs (DAGs), enabling parallel execution of independent members and optimized workflow scheduling.
import { GraphExecutor } from '@ensemble-edge/conductor';

const executor = new GraphExecutor({
  env,
  ensemble: parsedEnsemble,
  input: { userId: 'user_123' },
  maxParallelism: 10
});

const result = await executor.execute();

Constructor

new GraphExecutor(options: GraphExecutorOptions)
options
GraphExecutorOptions
required
Graph executor configuration (extends ExecutorOptions)
options.env
Env
required
Cloudflare Workers environment bindings
options.ensemble
ParsedEnsemble
required
Parsed ensemble definition
options.input
object
Initial input data
options.maxParallelism
number
default:"5"
Maximum concurrent member executions
options.scheduler
string
default:"topological"
Scheduling algorithm: topological, priority, resource-aware
options.optimizeGraph
boolean
default:"true"
Enable graph optimization
interface GraphExecutorOptions extends ExecutorOptions {
  maxParallelism?: number;
  scheduler?: 'topological' | 'priority' | 'resource-aware';
  optimizeGraph?: boolean;
}
Example:
const executor = new GraphExecutor({
  env,
  ensemble: parsedEnsemble,
  input: { userId: 'user_123' },
  maxParallelism: 10,
  scheduler: 'priority',
  optimizeGraph: true
});

Methods

execute()

Execute the workflow as a DAG with parallelization.
async execute(): Promise<ExecutionResult>
Returns: Promise<ExecutionResult> The graph executor automatically:
  1. Builds dependency graph from ensemble
  2. Identifies independent members
  3. Schedules parallel execution
  4. Manages resource allocation
  5. Handles errors and retries
Example:
const result = await executor.execute();

console.log(`Executed ${result.metadata.totalMembers} members`);
console.log(`${result.metadata.parallelExecutions} ran in parallel`);
console.log(`Duration: ${result.duration}ms`);

buildGraph()

Build execution graph from ensemble.
buildGraph(): ExecutionGraph
Returns: ExecutionGraph
interface ExecutionGraph {
  nodes: Map<string, GraphNode>;
  edges: Map<string, string[]>;
  roots: string[];
  leaves: string[];
  layers: string[][];
}

interface GraphNode {
  member: string;
  dependencies: string[];
  dependents: string[];
  layer: number;
  priority: number;
  estimatedDuration?: number;
}
Example:
const graph = executor.buildGraph();

console.log('Root nodes:', graph.roots);
console.log('Execution layers:', graph.layers.length);

// Visualize dependencies
graph.nodes.forEach((node, member) => {
  console.log(`${member} depends on:`, node.dependencies);
});

getExecutionPlan()

Get optimized execution plan.
getExecutionPlan(): ExecutionPlan
Returns: ExecutionPlan
interface ExecutionPlan {
  stages: ExecutionStage[];
  estimatedDuration: number;
  parallelism: number;
  criticalPath: string[];
}

interface ExecutionStage {
  members: string[];
  parallel: boolean;
  estimatedDuration: number;
}
Example:
const plan = executor.getExecutionPlan();

console.log('Execution stages:');
plan.stages.forEach((stage, i) => {
  console.log(`Stage ${i + 1}:`, stage.members);
  console.log(`Parallel: ${stage.parallel}`);
});

console.log('Critical path:', plan.criticalPath);
console.log('Estimated duration:', plan.estimatedDuration);

getCriticalPath()

Find critical path through the graph.
getCriticalPath(): string[]
Returns: string[] - Member names on critical path The critical path is the longest sequence of dependent members that determines minimum execution time. Example:
const criticalPath = executor.getCriticalPath();

console.log('Critical path (slowest sequence):');
criticalPath.forEach(member => {
  console.log(`  → ${member}`);
});

visualize()

Generate graph visualization (DOT format).
visualize(options?: VisualizeOptions): string
options
VisualizeOptions
Visualization options
options.format
string
default:"dot"
Output format: dot, mermaid, ascii
options.showDurations
boolean
default:"false"
Show member durations
options.highlightCriticalPath
boolean
default:"true"
Highlight critical path
Returns: string - Graph visualization Example:
const dot = executor.visualize({
  format: 'dot',
  highlightCriticalPath: true
});

console.log(dot);
// digraph {
//   "member-1" -> "member-2"
//   "member-1" -> "member-3"
//   "member-2" -> "member-4" [color=red]
//   ...
// }

// Render with Graphviz
// dot -Tpng -o graph.png

analyzeDependencies()

Analyze member dependencies.
analyzeDependencies(member: string): DependencyAnalysis
member
string
required
Member name to analyze
Returns: DependencyAnalysis
interface DependencyAnalysis {
  member: string;
  directDependencies: string[];
  transitiveDependencies: string[];
  dependents: string[];
  layer: number;
  canRunInParallel: string[];
  blockedBy: string[];
}
Example:
const analysis = executor.analyzeDependencies('process-payment');

console.log('Direct dependencies:', analysis.directDependencies);
console.log('Can run in parallel with:', analysis.canRunInParallel);
console.log('Execution layer:', analysis.layer);

optimizeGraph()

Optimize execution graph.
optimizeGraph(): OptimizationResult
Returns: OptimizationResult
interface OptimizationResult {
  removedNodes: string[];
  mergedNodes: Array<[string, string]>;
  reorderedNodes: Array<[string, number]>;
  estimatedSpeedup: number;
}
Optimizations include:
  • Removing unreachable nodes
  • Merging small sequential members
  • Reordering for better parallelization
  • Resource-aware scheduling
Example:
const result = executor.optimizeGraph();

console.log(`Removed ${result.removedNodes.length} unreachable members`);
console.log(`Estimated speedup: ${result.estimatedSpeedup.toFixed(2)}x`);

Graph Scheduling

Topological Scheduler

Default scheduler that executes members in topological order:
const executor = new GraphExecutor({
  env,
  ensemble,
  input,
  scheduler: 'topological'
});
Guarantees:
  • Dependencies executed before dependents
  • Maximum parallelization
  • Deterministic ordering

Priority Scheduler

Prioritizes critical path and high-priority members:
const executor = new GraphExecutor({
  env,
  ensemble,
  input,
  scheduler: 'priority'
});
Priority factors:
  • Critical path membership
  • Number of dependents
  • Estimated duration
  • Explicit priority annotations

Resource-Aware Scheduler

Optimizes based on resource availability:
const executor = new GraphExecutor({
  env,
  ensemble,
  input,
  scheduler: 'resource-aware',
  maxParallelism: 10
});
Considers:
  • CPU limits
  • Memory constraints
  • API rate limits
  • Worker concurrency

Parallel Execution

Automatic Parallelization

The graph executor automatically identifies and executes independent members in parallel:
flow:
  # These run in parallel (no dependencies)
  - member: fetch-user-data
    type: API

  - member: fetch-product-data
    type: API

  - member: fetch-inventory-data
    type: API

  # This waits for all three to complete
  - member: combine-data
    type: Function
    input:
      userData: ${fetch-user-data.output}
      productData: ${fetch-product-data.output}
      inventoryData: ${fetch-inventory-data.output}

Controlling Parallelism

// Limit concurrent executions
const executor = new GraphExecutor({
  env,
  ensemble,
  input,
  maxParallelism: 3 // Max 3 members at once
});

Explicit Parallel Groups

flow:
  - member: validate-input
    type: Function

  # Explicit parallel execution
  - parallel:
      - member: process-a
        type: Function
      - member: process-b
        type: Function
      - member: process-c
        type: Function

  - member: merge-results
    type: Function

Performance Metrics

The graph executor tracks detailed performance metrics:
const result = await executor.execute();

console.log('Performance metrics:', {
  totalDuration: result.duration,
  serialDuration: result.metadata.serialDuration,
  speedup: result.metadata.speedup,
  parallelExecutions: result.metadata.parallelExecutions,
  maxConcurrency: result.metadata.maxConcurrency,
  avgConcurrency: result.metadata.avgConcurrency,
  cpuTime: result.metadata.cpuTime,
  waitTime: result.metadata.waitTime
});

Error Handling

Partial Failures

When a member fails, the graph executor:
  1. Marks member as failed
  2. Skips all dependent members
  3. Continues executing independent members
  4. Returns partial results
const result = await executor.execute();

if (result.status === 'failed') {
  console.log('Failed member:', result.error.member);
  console.log('Completed members:', result.metadata.completedMembers);
  console.log('Skipped members:', result.metadata.skippedMembers);
}

Retry Strategies

Configure per-member retry with graph awareness:
flow:
  - member: flaky-api-call
    type: API
    retry:
      maxAttempts: 3
      backoff: exponential
      graphAware: true  # Don't block parallel execution during retries

Advanced Features

Dynamic Graph Modification

Modify the graph during execution:
executor.on('member.complete', (data) => {
  if (data.member === 'check-condition' && data.output.shouldAddStep) {
    // Dynamically add member to graph
    executor.addMember({
      member: 'dynamic-step',
      type: 'Function',
      dependencies: ['check-condition']
    });
  }
});

Subgraph Execution

Execute a portion of the graph:
// Execute only members in specific subgraph
const result = await executor.executeSubgraph({
  roots: ['member-1', 'member-2'],
  maxDepth: 3,
  stopAt: ['member-5']
});

Graph Comparison

Compare two ensemble graphs:
import { compareGraphs } from '@ensemble-edge/conductor';

const diff = compareGraphs(
  oldEnsemble,
  newEnsemble
);

console.log('Added members:', diff.added);
console.log('Removed members:', diff.removed);
console.log('Modified dependencies:', diff.modifiedDependencies);

Graph Metrics

Analyze graph properties:
const metrics = executor.getGraphMetrics();

console.log({
  nodes: metrics.nodeCount,
  edges: metrics.edgeCount,
  layers: metrics.layerCount,
  maxWidth: metrics.maxWidth, // Max parallel members
  avgDegree: metrics.avgDegree,
  cyclomaticComplexity: metrics.complexity
});

Testing

import { GraphExecutor } from '@ensemble-edge/conductor';
import { describe, it, expect } from 'vitest';

describe('GraphExecutor', () => {
  it('executes independent members in parallel', async () => {
    const ensemble = {
      name: 'parallel-test',
      flow: [
        { member: 'a', type: 'Function' },
        { member: 'b', type: 'Function' },
        { member: 'c', type: 'Function' }
      ]
    };

    const executor = new GraphExecutor({
      env: getMockEnv(),
      ensemble,
      input: {}
    });

    const result = await executor.execute();

    expect(result.status).toBe('completed');
    expect(result.metadata.parallelExecutions).toBeGreaterThan(0);
  });

  it('respects dependencies', async () => {
    const ensemble = {
      name: 'dependency-test',
      flow: [
        { member: 'a', type: 'Function' },
        {
          member: 'b',
          type: 'Function',
          input: { data: '${a.output}' }
        }
      ]
    };

    const executor = new GraphExecutor({
      env: getMockEnv(),
      ensemble,
      input: {}
    });

    const executionOrder: string[] = [];

    executor.on('member.start', (data) => {
      executionOrder.push(data.member);
    });

    await executor.execute();

    expect(executionOrder).toEqual(['a', 'b']);
  });

  it('finds critical path', () => {
    const executor = new GraphExecutor({
      env: getMockEnv(),
      ensemble,
      input: {}
    });

    const criticalPath = executor.getCriticalPath();

    expect(criticalPath).toContain('slowest-member');
  });
});

Best Practices

  1. Minimize dependencies - Enable more parallelization
  2. Use priority hints - For critical members
  3. Set realistic timeouts - Per member and total
  4. Monitor concurrency - Adjust maxParallelism
  5. Analyze critical path - Optimize slowest sequence
  6. Handle partial failures - Expect some members to fail
  7. Test with various graphs - Different topologies
  8. Profile execution - Identify bottlenecks
  9. Visualize graphs - Understand dependencies
  10. Document dependencies - Make implicit explicit

Visualization Examples

DOT Format

const dot = executor.visualize({ format: 'dot' });
// Render with: dot -Tpng -o graph.png

Mermaid

const mermaid = executor.visualize({ format: 'mermaid' });
// graph TD
//   A[validate] --> B[process]
//   A --> C[transform]
//   B --> D[output]
//   C --> D

ASCII

const ascii = executor.visualize({ format: 'ascii' });
//     validate
//      /   \
// process transform
//      \   /
//      output