Overview
Parallel processing executes multiple independent members simultaneously, dramatically reducing total execution time. Perfect for gathering data from multiple sources, processing batches, or running independent analyses concurrently.Basic Parallel Execution
Copy
name: parallel-fetch
description: Fetch data from multiple sources concurrently
flow:
# Execute all three members in parallel
parallel:
- member: fetch-user-data
input:
userId: ${input.userId}
- member: fetch-order-history
input:
userId: ${input.userId}
- member: fetch-preferences
input:
userId: ${input.userId}
# Combine results (runs after parallel completion)
- member: combine-data
input:
userData: ${fetch-user-data.output}
orders: ${fetch-order-history.output}
preferences: ${fetch-preferences.output}
output:
combined: ${combine-data.output}
- Sequential: ~3 seconds (1s + 1s + 1s)
- Parallel: ~1 second (max of all three)
- 3x faster!
API Calls in Parallel
Fetch from multiple APIs simultaneously:Copy
name: gather-company-data
description: Gather data from multiple sources
flow:
parallel:
# Official company website
- member: scrape-website
input:
url: ${input.domain}
output: markdown
# LinkedIn profile
- member: fetch-linkedin
input:
companyName: ${input.companyName}
# Crunchbase data
- member: fetch-crunchbase
input:
companyName: ${input.companyName}
# News articles
- member: fetch-news
input:
query: ${input.companyName}
limit: 5
# Analyze all gathered data
- member: analyze-company
type: Think
config:
provider: anthropic
model: claude-3-5-sonnet-20241022
input:
websiteContent: ${scrape-website.output.content}
linkedinData: ${fetch-linkedin.output}
crunchbaseData: ${fetch-crunchbase.output}
newsArticles: ${fetch-news.output.articles}
output:
analysis: ${analyze-company.output}
AI Analysis in Parallel
Run multiple AI analyses concurrently:Copy
name: multi-perspective-analysis
description: Analyze from multiple perspectives simultaneously
flow:
parallel:
# Financial analysis
- member: analyze-financials
type: Think
config:
provider: openai
model: gpt-4o
systemPrompt: "You are a financial analyst."
input:
data: ${input.companyData}
# Market analysis
- member: analyze-market
type: Think
config:
provider: anthropic
model: claude-3-5-sonnet-20241022
systemPrompt: "You are a market analyst."
input:
data: ${input.companyData}
# Technical analysis
- member: analyze-technical
type: Think
config:
provider: openai
model: gpt-4o
systemPrompt: "You are a technical analyst."
input:
data: ${input.companyData}
# Competitive analysis
- member: analyze-competition
type: Think
config:
provider: anthropic
model: claude-3-5-sonnet-20241022
systemPrompt: "You are a competitive analyst."
input:
data: ${input.companyData}
# Synthesize all perspectives
- member: synthesize-analysis
type: Think
config:
provider: anthropic
model: claude-3-5-sonnet-20241022
input:
financial: ${analyze-financials.output}
market: ${analyze-market.output}
technical: ${analyze-technical.output}
competitive: ${analyze-competition.output}
output:
comprehensive: ${synthesize-analysis.output}
Batch Processing
Process array items in parallel:Copy
name: process-batch
description: Process multiple items concurrently
flow:
# Process each item in parallel
- member: process-item
foreach: ${input.items}
parallel: true # Enable parallel processing
input:
item: ${item}
output:
results: ${process-item.outputs} # Array of all results
With Concurrency Limit
Copy
flow:
- member: process-item
foreach: ${input.items}
parallel: true
concurrency: 5 # Max 5 concurrent executions
Mixed Parallel and Sequential
Combine parallel and sequential execution:Copy
name: complex-workflow
description: Mix of parallel and sequential steps
flow:
# Step 1: Fetch user data (sequential)
- member: fetch-user
input:
userId: ${input.userId}
# Step 2: Fetch related data in parallel
parallel:
- member: fetch-orders
input:
userId: ${input.userId}
- member: fetch-reviews
input:
userId: ${input.userId}
- member: fetch-recommendations
input:
userId: ${input.userId}
# Step 3: Process in parallel
parallel:
- member: analyze-orders
input:
orders: ${fetch-orders.output}
- member: analyze-reviews
input:
reviews: ${fetch-reviews.output}
# Step 4: Generate final report (sequential)
- member: generate-report
input:
userData: ${fetch-user.output}
orderAnalysis: ${analyze-orders.output}
reviewAnalysis: ${analyze-reviews.output}
recommendations: ${fetch-recommendations.output}
output:
report: ${generate-report.output}
Error Handling in Parallel
Copy
flow:
parallel:
- member: task-1
continue_on_error: true # Don't fail entire parallel block
- member: task-2
retry:
maxAttempts: 3
- member: task-3
# Check which tasks succeeded
- member: handle-results
input:
task1Success: ${task-1.success}
task2Success: ${task-2.success}
task3Success: ${task-3.success}
Real-World Example: E-commerce Checkout
Copy
name: checkout-process
description: Process checkout with parallel validation and preparation
flow:
# Validate everything in parallel
parallel:
- member: validate-cart
input:
cart: ${input.cart}
- member: validate-payment
input:
paymentMethod: ${input.paymentMethod}
- member: validate-shipping
input:
address: ${input.shippingAddress}
- member: check-inventory
input:
items: ${input.cart.items}
# If all validations pass, process in parallel
- member: check-all-valid
input:
validations:
cart: ${validate-cart.output.valid}
payment: ${validate-payment.output.valid}
shipping: ${validate-shipping.output.valid}
inventory: ${check-inventory.output.available}
# Process payment and create shipment simultaneously
parallel:
- member: process-payment
condition: ${check-all-valid.output.allValid}
input:
amount: ${input.cart.total}
paymentMethod: ${input.paymentMethod}
- member: create-shipment
condition: ${check-all-valid.output.allValid}
input:
items: ${input.cart.items}
address: ${input.shippingAddress}
# Send notifications in parallel
parallel:
- member: send-confirmation-email
input:
email: ${input.customerEmail}
orderId: ${process-payment.output.orderId}
- member: send-sms
input:
phone: ${input.customerPhone}
orderId: ${process-payment.output.orderId}
- member: update-analytics
input:
orderId: ${process-payment.output.orderId}
amount: ${input.cart.total}
output:
orderId: ${process-payment.output.orderId}
trackingNumber: ${create-shipment.output.trackingNumber}
success: true
Performance Comparison
Sequential Execution
Copy
# Total time: 5 seconds
flow:
- member: task-1 # 1 second
- member: task-2 # 1 second
- member: task-3 # 1 second
- member: task-4 # 1 second
- member: task-5 # 1 second
Parallel Execution
Copy
# Total time: 1 second
flow:
parallel:
- member: task-1 # 1 second
- member: task-2 # 1 second
- member: task-3 # 1 second
- member: task-4 # 1 second
- member: task-5 # 1 second
Mixed Execution
Copy
# Total time: 3 seconds
flow:
- member: task-1 # 1 second
parallel: # 1 second (max of parallel)
- member: task-2
- member: task-3
- member: task-4
- member: task-5 # 1 second
Best Practices
1. Parallelize Independent Tasks
Copy
# ✅ Good - tasks don't depend on each other
parallel:
- member: fetch-user
- member: fetch-settings
- member: fetch-config
# ❌ Bad - task-2 depends on task-1
parallel:
- member: fetch-user
- member: analyze-user # Needs fetch-user output
2. Use State for Shared Data
Copy
flow:
# Fetch shared data first
- member: fetch-base-data
state:
set: [baseData]
# Use shared data in parallel
parallel:
- member: process-a
state:
use: [baseData]
- member: process-b
state:
use: [baseData]
3. Set Concurrency Limits
Copy
# ✅ Good - limit concurrent API calls
foreach: ${input.items}
parallel: true
concurrency: 10 # Max 10 at once
# ❌ Bad - might hit rate limits
foreach: ${input.items}
parallel: true # Could spawn 1000s
4. Handle Partial Failures
Copy
parallel:
- member: critical-task
# Fail if this fails
- member: optional-task
continue_on_error: true # Don't fail workflow
Testing Parallel Execution
Copy
import { describe, it, expect } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';
describe('parallel-fetch', () => {
it('should execute members in parallel', async () => {
const conductor = await TestConductor.create();
const startTime = performance.now();
const result = await conductor.executeEnsemble('parallel-fetch', {
userId: 123
});
const duration = performance.now() - startTime;
expect(result).toBeSuccessful();
expect(result).toHaveExecutedMember('fetch-user-data');
expect(result).toHaveExecutedMember('fetch-order-history');
expect(result).toHaveExecutedMember('fetch-preferences');
// Should be faster than sequential (< 2s vs 3s)
expect(duration).toBeLessThan(2000);
});
});

