Overview

Parallel processing executes multiple independent members simultaneously, so total execution time approaches that of the slowest member rather than the sum of all members. It suits gathering data from multiple sources, processing batches, and running independent analyses concurrently.

Basic Parallel Execution

name: parallel-fetch
description: Fetch data from multiple sources concurrently

flow:
  # Execute all three members in parallel
  parallel:
    - member: fetch-user-data
      input:
        userId: ${input.userId}

    - member: fetch-order-history
      input:
        userId: ${input.userId}

    - member: fetch-preferences
      input:
        userId: ${input.userId}

  # Combine results (runs after parallel completion)
  - member: combine-data
    input:
      userData: ${fetch-user-data.output}
      orders: ${fetch-order-history.output}
      preferences: ${fetch-preferences.output}

output:
  combined: ${combine-data.output}

Performance (assuming each member takes about 1 second):
  • Sequential: ~3 seconds (1s + 1s + 1s)
  • Parallel: ~1 second (the slowest of the three)
  • Roughly 3x faster
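
The arithmetic above follows from how concurrent awaits behave in general. The following TypeScript sketch is only an illustration, not Conductor's implementation: `simulateMember`, `runSequential`, `runParallel`, and `timed` are hypothetical names, and a timer stands in for real member work. Sequential awaits add their delays, while `Promise.all` resolves once the slowest promise resolves.

```typescript
// Hypothetical stand-in for a member call that resolves after `ms` milliseconds.
function simulateMember(name: string, ms: number): Promise<string> {
  return new Promise<string>((resolve) => setTimeout(() => resolve(name), ms));
}

// Awaits each member in turn, so the delays add up.
async function runSequential(names: string[], ms: number): Promise<string[]> {
  const results: string[] = [];
  for (const name of names) {
    results.push(await simulateMember(name, ms));
  }
  return results;
}

// Starts every member before awaiting any, so the delays overlap.
function runParallel(names: string[], ms: number): Promise<string[]> {
  return Promise.all(names.map((name) => simulateMember(name, ms)));
}

// Measures how long an async operation takes, in milliseconds.
async function timed<T>(run: () => Promise<T>): Promise<{ value: T; elapsedMs: number }> {
  const start = Date.now();
  const value = await run();
  return { value, elapsedMs: Date.now() - start };
}
```

With three members at 1000 ms each, `timed(() => runSequential(names, 1000))` should report roughly 3000 ms and `timed(() => runParallel(names, 1000))` roughly 1000 ms. Both return results in input order.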

API Calls in Parallel

Fetch from multiple APIs simultaneously:
name: gather-company-data
description: Gather data from multiple sources

flow:
  parallel:
    # Official company website
    - member: scrape-website
      input:
        url: ${input.domain}
        output: markdown

    # LinkedIn profile
    - member: fetch-linkedin
      input:
        companyName: ${input.companyName}

    # Crunchbase data
    - member: fetch-crunchbase
      input:
        companyName: ${input.companyName}

    # News articles
    - member: fetch-news
      input:
        query: ${input.companyName}
        limit: 5

  # Analyze all gathered data
  - member: analyze-company
    type: Think
    config:
      provider: anthropic
      model: claude-3-5-sonnet-20241022
    input:
      websiteContent: ${scrape-website.output.content}
      linkedinData: ${fetch-linkedin.output}
      crunchbaseData: ${fetch-crunchbase.output}
      newsArticles: ${fetch-news.output.articles}

output:
  analysis: ${analyze-company.output}

AI Analysis in Parallel

Run multiple AI analyses concurrently:
name: multi-perspective-analysis
description: Analyze from multiple perspectives simultaneously

flow:
  parallel:
    # Financial analysis
    - member: analyze-financials
      type: Think
      config:
        provider: openai
        model: gpt-4o
        systemPrompt: "You are a financial analyst."
      input:
        data: ${input.companyData}

    # Market analysis
    - member: analyze-market
      type: Think
      config:
        provider: anthropic
        model: claude-3-5-sonnet-20241022
        systemPrompt: "You are a market analyst."
      input:
        data: ${input.companyData}

    # Technical analysis
    - member: analyze-technical
      type: Think
      config:
        provider: openai
        model: gpt-4o
        systemPrompt: "You are a technical analyst."
      input:
        data: ${input.companyData}

    # Competitive analysis
    - member: analyze-competition
      type: Think
      config:
        provider: anthropic
        model: claude-3-5-sonnet-20241022
        systemPrompt: "You are a competitive analyst."
      input:
        data: ${input.companyData}

  # Synthesize all perspectives
  - member: synthesize-analysis
    type: Think
    config:
      provider: anthropic
      model: claude-3-5-sonnet-20241022
    input:
      financial: ${analyze-financials.output}
      market: ${analyze-market.output}
      technical: ${analyze-technical.output}
      competitive: ${analyze-competition.output}

output:
  comprehensive: ${synthesize-analysis.output}

Batch Processing

Process array items in parallel:
name: process-batch
description: Process multiple items concurrently

flow:
  # Process each item in parallel
  - member: process-item
    foreach: ${input.items}
    parallel: true  # Enable parallel processing
    input:
      item: ${item}

output:
  results: ${process-item.outputs}  # Array of all results

With Concurrency Limit

flow:
  - member: process-item
    foreach: ${input.items}
    parallel: true
    concurrency: 5  # Max 5 concurrent executions
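
To make the effect of a concurrency limit concrete, here is a minimal TypeScript sketch of a bounded parallel map. It is an assumption about the general technique, not a description of how Conductor schedules `foreach` items; `mapWithConcurrency` is a hypothetical helper. A fixed number of runners each claim the next unprocessed item, so at most `limit` calls are in flight at any moment.

```typescript
// Runs `worker` over `items` with at most `limit` calls in flight at once.
// Results are returned in input order, regardless of completion order.
async function mapWithConcurrency<T, R>(
  items: readonly T[],
  limit: number,
  worker: (item: T, index: number) => Promise<R>,
): Promise<R[]> {
  if (!Number.isInteger(limit) || limit < 1) {
    throw new RangeError("limit must be a positive integer");
  }
  const results: R[] = new Array<R>(items.length);
  let next = 0; // index of the next unclaimed item

  // Each runner repeatedly claims the next index until none remain.
  async function runner(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await worker(items[index], index);
    }
  }

  const runnerCount = Math.min(limit, items.length);
  await Promise.all(Array.from({ length: runnerCount }, () => runner()));
  return results;
}
```

Calling `mapWithConcurrency(items, 5, processItem)` would correspond loosely to `concurrency: 5` above: a sixth item starts only after one of the first five finishes.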

Mixed Parallel and Sequential

Combine parallel and sequential execution:
name: complex-workflow
description: Mix of parallel and sequential steps

flow:
  # Step 1: Fetch user data (sequential)
  - member: fetch-user
    input:
      userId: ${input.userId}

  # Step 2: Fetch related data in parallel
  parallel:
    - member: fetch-orders
      input:
        userId: ${input.userId}

    - member: fetch-reviews
      input:
        userId: ${input.userId}

    - member: fetch-recommendations
      input:
        userId: ${input.userId}

  # Step 3: Process in parallel
  parallel:
    - member: analyze-orders
      input:
        orders: ${fetch-orders.output}

    - member: analyze-reviews
      input:
        reviews: ${fetch-reviews.output}

  # Step 4: Generate final report (sequential)
  - member: generate-report
    input:
      userData: ${fetch-user.output}
      orderAnalysis: ${analyze-orders.output}
      reviewAnalysis: ${analyze-reviews.output}
      recommendations: ${fetch-recommendations.output}

output:
  report: ${generate-report.output}

Error Handling in Parallel

flow:
  parallel:
    - member: task-1
      continue_on_error: true  # Don't fail entire parallel block

    - member: task-2
      retry:
        maxAttempts: 3

    - member: task-3

  # Check which tasks succeeded
  - member: handle-results
    input:
      task1Success: ${task-1.success}
      task2Success: ${task-2.success}
      task3Success: ${task-3.success}
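
The semantics above can be approximated in plain TypeScript. This is a sketch under stated assumptions, not Conductor's behavior: `TaskSpec`, `runWithRetry`, and `runParallelBlock` are hypothetical, the retry has no backoff, and the block waits for every task to settle before deciding whether to fail (the real engine may fail fast instead).

```typescript
type TaskResult<T> =
  | { name: string; success: true; output: T }
  | { name: string; success: false; error: string };

interface TaskSpec<T> {
  name: string;
  run: () => Promise<T>;
  continueOnError?: boolean; // loosely mirrors continue_on_error
  maxAttempts?: number; // loosely mirrors retry.maxAttempts
}

// Calls `run` until it succeeds or `maxAttempts` is exhausted.
async function runWithRetry<T>(run: () => Promise<T>, maxAttempts: number): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= Math.max(1, maxAttempts); attempt++) {
    try {
      return await run();
    } catch (error) {
      lastError = error;
    }
  }
  throw lastError;
}

// Runs all tasks concurrently and reports a success flag per task.
async function runParallelBlock<T>(tasks: TaskSpec<T>[]): Promise<TaskResult<T>[]> {
  const settled = await Promise.allSettled(
    tasks.map((task) => runWithRetry(task.run, task.maxAttempts ?? 1)),
  );
  const results = settled.map((outcome, i): TaskResult<T> =>
    outcome.status === "fulfilled"
      ? { name: tasks[i].name, success: true, output: outcome.value }
      : { name: tasks[i].name, success: false, error: String(outcome.reason) },
  );
  // A failure aborts the block only if the task did not opt into continue-on-error.
  const fatal = results.find((r, i) => !r.success && !tasks[i].continueOnError);
  if (fatal) throw new Error(`parallel block failed: ${fatal.name}`);
  return results;
}
```

A downstream step can then branch on each result's `success` flag, much as `handle-results` reads `${task-1.success}`.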

Real-World Example: E-commerce Checkout

name: checkout-process
description: Process checkout with parallel validation and preparation

flow:
  # Validate everything in parallel
  parallel:
    - member: validate-cart
      input:
        cart: ${input.cart}

    - member: validate-payment
      input:
        paymentMethod: ${input.paymentMethod}

    - member: validate-shipping
      input:
        address: ${input.shippingAddress}

    - member: check-inventory
      input:
        items: ${input.cart.items}

  # Aggregate the validation results into a single allValid flag
  - member: check-all-valid
    input:
      validations:
        cart: ${validate-cart.output.valid}
        payment: ${validate-payment.output.valid}
        shipping: ${validate-shipping.output.valid}
        inventory: ${check-inventory.output.available}

  # Process payment and create shipment simultaneously
  parallel:
    - member: process-payment
      condition: ${check-all-valid.output.allValid}
      input:
        amount: ${input.cart.total}
        paymentMethod: ${input.paymentMethod}

    - member: create-shipment
      condition: ${check-all-valid.output.allValid}
      input:
        items: ${input.cart.items}
        address: ${input.shippingAddress}

  # Send notifications in parallel
  parallel:
    - member: send-confirmation-email
      input:
        email: ${input.customerEmail}
        orderId: ${process-payment.output.orderId}

    - member: send-sms
      input:
        phone: ${input.customerPhone}
        orderId: ${process-payment.output.orderId}

    - member: update-analytics
      input:
        orderId: ${process-payment.output.orderId}
        amount: ${input.cart.total}

output:
  orderId: ${process-payment.output.orderId}
  trackingNumber: ${create-shipment.output.trackingNumber}
  success: ${check-all-valid.output.allValid}
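
The workflow does not show what `check-all-valid` computes. One plausible sketch, assuming its input is a map of boolean flags like the `validations` object above: it returns `allValid` plus the names of any failed checks. The `failed` field and the `checkAllValid` function name are illustrative assumptions, not part of the documented member.

```typescript
// Hypothetical input shape: one boolean per validation.
type Validations = Record<string, boolean>;

interface ValidationSummary {
  allValid: boolean;
  failed: string[]; // names of checks that did not pass
}

// Collapses the individual validation flags into a single gate value.
function checkAllValid(validations: Validations): ValidationSummary {
  const failed = Object.keys(validations).filter((key) => !validations[key]);
  return { allValid: failed.length === 0, failed };
}

const summary = checkAllValid({
  cart: true,
  payment: true,
  shipping: false,
  inventory: true,
});
console.log(summary); // { allValid: false, failed: [ 'shipping' ] }
```

Keeping the list of failed checks makes it easier to tell the customer which step blocked the order when the `condition` on `process-payment` and `create-shipment` evaluates to false.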

Performance Comparison

Sequential Execution

# Total time: ~5 seconds (1s + 1s + 1s + 1s + 1s)
flow:
  - member: task-1  # 1 second
  - member: task-2  # 1 second
  - member: task-3  # 1 second
  - member: task-4  # 1 second
  - member: task-5  # 1 second

Parallel Execution

# Total time: ~1 second (the slowest task)
flow:
  parallel:
    - member: task-1  # 1 second
    - member: task-2  # 1 second
    - member: task-3  # 1 second
    - member: task-4  # 1 second
    - member: task-5  # 1 second

Mixed Execution

# Total time: ~3 seconds (1s + 1s for the parallel block + 1s)
flow:
  - member: task-1  # 1 second

  parallel:          # 1 second (max of parallel)
    - member: task-2
    - member: task-3
    - member: task-4

  - member: task-5  # 1 second
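
The three totals above follow one rule: sequential steps add up, and a parallel block costs as much as its slowest member. A small TypeScript sketch of that estimate, ignoring scheduling overhead (the `Step` type and `estimateTotalSeconds` are illustrative names, not part of Conductor):

```typescript
// A step is either one member's duration in seconds or a parallel group of durations.
type Step = number | number[];

// Sequential steps add up; a parallel group costs as much as its slowest member.
function estimateTotalSeconds(flow: Step[]): number {
  return flow.reduce<number>((total, step) => {
    const cost = Array.isArray(step) ? Math.max(0, ...step) : step;
    return total + cost;
  }, 0);
}

const sequentialTotal = estimateTotalSeconds([1, 1, 1, 1, 1]); // 5
const parallelTotal = estimateTotalSeconds([[1, 1, 1, 1, 1]]); // 1
const mixedTotal = estimateTotalSeconds([1, [1, 1, 1], 1]); // 3
console.log({ sequentialTotal, parallelTotal, mixedTotal });
```

When member durations differ, the same function shows where the time goes: `estimateTotalSeconds([1, [0.2, 3, 0.5], 1])` is 5, dominated by the 3-second member inside the parallel block.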

Best Practices

1. Parallelize Independent Tasks

# ✅ Good - tasks don't depend on each other
parallel:
  - member: fetch-user
  - member: fetch-settings
  - member: fetch-config

# ❌ Bad - analyze-user depends on fetch-user
parallel:
  - member: fetch-user
  - member: analyze-user  # Needs fetch-user output
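
One way to decide which members can share a parallel block is to group them by dependency level. The sketch below assumes you can list, for each member, the members whose output it reads; `DependencyMap` and `planStages` are hypothetical helpers for planning a flow by hand, not Conductor features.

```typescript
// Maps each member name to the members whose output it reads (hypothetical shape).
type DependencyMap = Record<string, string[]>;

// Groups members into stages. Every member in a stage depends only on earlier
// stages, so members within one stage are safe to place in the same parallel block.
function planStages(deps: DependencyMap): string[][] {
  const remaining = new Set<string>(Object.keys(deps));
  const done = new Set<string>();
  const stages: string[][] = [];

  while (remaining.size > 0) {
    // A member is ready once all of its dependencies have completed.
    const ready = Array.from(remaining).filter((member) =>
      deps[member].every((dep) => done.has(dep)),
    );
    if (ready.length === 0) {
      throw new Error("circular or unknown dependency");
    }
    for (const member of ready) {
      remaining.delete(member);
      done.add(member);
    }
    stages.push(ready);
  }
  return stages;
}

const stages = planStages({
  "fetch-user": [],
  "fetch-settings": [],
  "analyze-user": ["fetch-user"],
});
console.log(stages); // [ [ 'fetch-user', 'fetch-settings' ], [ 'analyze-user' ] ]
```

Here `analyze-user` lands in a second stage, which matches the advice above: run it after the parallel block that contains `fetch-user`, not inside it.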

2. Use State for Shared Data

flow:
  # Fetch shared data first
  - member: fetch-base-data
    state:
      set: [baseData]

  # Use shared data in parallel
  parallel:
    - member: process-a
      state:
        use: [baseData]

    - member: process-b
      state:
        use: [baseData]

3. Set Concurrency Limits

# ✅ Good - limit concurrent API calls
foreach: ${input.items}
parallel: true
concurrency: 10  # Max 10 at once

# ❌ Bad - might hit rate limits
foreach: ${input.items}
parallel: true  # Could spawn 1000s

4. Handle Partial Failures

parallel:
  - member: critical-task
    # No continue_on_error: a failure here fails the workflow

  - member: optional-task
    continue_on_error: true  # Don't fail workflow

Testing Parallel Execution

import { describe, it, expect } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';

describe('parallel-fetch', () => {
  it('should execute members in parallel', async () => {
    const conductor = await TestConductor.create();

    const startTime = performance.now();

    const result = await conductor.executeEnsemble('parallel-fetch', {
      userId: 123
    });

    const duration = performance.now() - startTime;

    expect(result).toBeSuccessful();
    expect(result).toHaveExecutedMember('fetch-user-data');
    expect(result).toHaveExecutedMember('fetch-order-history');
    expect(result).toHaveExecutedMember('fetch-preferences');

    // Assuming each member takes ~1s: parallel should finish in < 2s, sequential would need ~3s
    expect(duration).toBeLessThan(2000);
  });
});