Overview
This example demonstrates a complete data processing pipeline that extracts data from multiple sources, transforms it, validates data quality, and loads it into storage, all with parallel processing and error handling.

Pipeline Architecture
Extract (Parallel) → Transform → Validate → Load → Notify
Complete Example
name: data-processing-pipeline
description: ETL pipeline with parallel extraction and validation

state:
  schema:
    extractedData: array
    transformedData: array
    validationResults: object
    loadedCount: number

flow:
  # Extract: Fetch from multiple sources in parallel
  - parallel:
      - member: extract-api
        type: Fetch
        config:
          url: "https://api.example.com/data"
          headers:
            Authorization: "Bearer ${env.API_KEY}"
          cache:
            ttl: 300
        state:
          set: [extractedData]

      - member: extract-database
        type: Data
        config:
          storage: d1
          operation: query
          query: |
            SELECT * FROM source_table
            WHERE updated_at > ?
        input:
          params: [${input.lastSync}]
        state:
          set: [extractedData]

      - member: extract-s3
        type: Fetch
        config:
          url: "${env.S3_URL}/data.json"
        state:
          set: [extractedData]

  # Transform: Combine and reshape data
  - member: combine-sources
    type: Transform
    state:
      use: [extractedData]
    input:
      data:
        api: ${extract-api.output.data}
        db: ${extract-database.output.results}
        s3: ${extract-s3.output.data}
      expression: |
        {
          "records": $append(
            $append(api.records[], db[]),
            s3.items[]
          )
        }

  - member: transform-data
    type: Transform
    input:
      data: ${combine-sources.output}
      expression: |
        {
          "transformed": records[].{
            "id": id,
            "name": $trim(name),
            "email": $lowercase(email),
            "createdAt": $fromMillis(created_timestamp),
            "category": $uppercase(category),
            "metadata": {
              "source": source,
              "processedAt": $now()
            }
          }
        }
    state:
      set: [transformedData]

  # Validate: Check data quality
  - member: validate-data
    type: Function
    state:
      use: [transformedData]
      set: [validationResults]
    input:
      data: ${state.transformedData}
      rules:
        - field: "email"
          pattern: "^[^\\s@]+@[^\\s@]+\\.[^\\s@]+$"
        - field: "name"
          minLength: 1
        - field: "category"
          enum: ["A", "B", "C"]

  # Filter: Remove invalid records
  - member: filter-invalid
    type: Transform
    state:
      use: [transformedData, validationResults]
    input:
      data: ${state.transformedData}
      validation: ${state.validationResults}
      expression: |
        {
          "valid": transformed[validation.valid[id]],
          "invalid": transformed[validation.invalid[id]]
        }

  # Load: Insert valid records in batches
  - member: load-batch
    foreach: ${filter-invalid.output.valid}
    batch: 100
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO target_table (id, name, email, category, metadata)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(id) DO UPDATE SET
          name = excluded.name,
          email = excluded.email,
          category = excluded.category,
          metadata = excluded.metadata,
          updated_at = CURRENT_TIMESTAMP
    input:
      params:
        - ${item.id}
        - ${item.name}
        - ${item.email}
        - ${item.category}
        - ${JSON.stringify(item.metadata)}
    state:
      set: [loadedCount]

  # Log invalid records
  - member: log-invalid
    condition: ${filter-invalid.output.invalid.length > 0}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO validation_errors (record_id, errors, created_at)
        VALUES (?, ?, CURRENT_TIMESTAMP)
    input:
      params:
        - ${filter-invalid.output.invalid.map(r => r.id)}
        - ${JSON.stringify(state.validationResults.errors)}

  # Notify on completion
  - member: send-notification
    type: API
    config:
      url: "${env.SLACK_WEBHOOK_URL}"
      method: POST
    input:
      body:
        text: "Data pipeline completed"
        blocks:
          - type: "section"
            text:
              type: "mrkdwn"
              text: |
                *Pipeline Results*
                • Extracted: ${extract-api.output.data.length + extract-database.output.results.length + extract-s3.output.data.length} records
                • Valid: ${filter-invalid.output.valid.length} records
                • Invalid: ${filter-invalid.output.invalid.length} records
                • Loaded: ${state.loadedCount} records

output:
  summary:
    extracted: ${state.extractedData.length}
    transformed: ${state.transformedData.length}
    valid: ${filter-invalid.output.valid.length}
    invalid: ${filter-invalid.output.invalid.length}
    loaded: ${state.loadedCount}
    validationErrors: ${state.validationResults.errors}
    completedAt: ${Date.now()}
Key Patterns
1. Parallel Extraction
parallel:
  - member: extract-api
    type: Fetch
  - member: extract-database
    type: Data
  - member: extract-s3
    type: Fetch
2. Data Transformation
- member: transform-data
  type: Transform
  input:
    expression: |
      {
        "transformed": records[].{
          "id": id,
          "name": $trim(name),
          "email": $lowercase(email)
        }
      }
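The $-prefixed functions used here ($trim, $lowercase, $fromMillis, $now) match JSONata built-ins, so the Transform expressions appear to be JSONata. Assuming that, you can prototype an expression locally with the jsonata npm package before wiring it into the ensemble; this is a sketch, not Conductor API:

import jsonata from 'jsonata';

// Evaluate the transform expression above against a sample payload.
// Note: in jsonata 2.x, evaluate() returns a Promise.
const expression = jsonata(`{
  "transformed": records[].{
    "id": id,
    "name": $trim(name),
    "email": $lowercase(email)
  }
}`);

const sample = {
  records: [{ id: 1, name: '  Ada Lovelace ', email: 'ADA@Example.com' }]
};

const result = await expression.evaluate(sample);
// -> { transformed: [{ id: 1, name: 'Ada Lovelace', email: 'ada@example.com' }] }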
3. Quality Validation
- member: validate-data
  type: Function
  input:
    rules:
      - field: "email"
        pattern: "^[^\\s@]+@[^\\s@]+\\.[^\\s@]+$"
      - field: "name"
        minLength: 1
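How the Function member evaluates these rules is up to its implementation; the semantics assumed in this example (pattern as a regular expression, minLength as a string-length floor, enum as an allow-list) look roughly like this in plain, framework-agnostic TypeScript:

// Rough sketch of the assumed rule semantics; the names below are
// illustrative, not Conductor API.
type Rule =
  | { field: string; pattern: string }
  | { field: string; minLength: number }
  | { field: string; enum: string[] };

const rules: Rule[] = [
  { field: 'email', pattern: '^[^\\s@]+@[^\\s@]+\\.[^\\s@]+$' },
  { field: 'name', minLength: 1 },
  { field: 'category', enum: ['A', 'B', 'C'] }
];

function validateRecord(record: Record<string, unknown>, checks: Rule[]): string[] {
  const errors: string[] = [];
  for (const rule of checks) {
    const value = String(record[rule.field] ?? '');
    if ('pattern' in rule && !new RegExp(rule.pattern).test(value)) {
      errors.push(`${rule.field} does not match ${rule.pattern}`);
    }
    if ('minLength' in rule && value.length < rule.minLength) {
      errors.push(`${rule.field} is shorter than ${rule.minLength}`);
    }
    if ('enum' in rule && !rule.enum.includes(value)) {
      errors.push(`${rule.field} is not one of ${rule.enum.join(', ')}`);
    }
  }
  return errors;
}

// validateRecord({ name: '', email: 'invalid', category: 'A' }, rules)
// -> ['email does not match ...', 'name is shorter than 1']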
4. Batch Loading
- member: load-batch
  foreach: ${validRecords}
  batch: 100
  type: Data
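The batch option is read here as splitting the foreach input into fixed-size groups, so each group becomes a single Data operation rather than one insert per record. Conceptually (plain TypeScript, not Conductor API):

// Split records into groups of `size` so each group maps to one database call.
function chunk<T>(items: T[], size: number): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// chunk(validRecords, 100) -> [[r1..r100], [r101..r200], ...]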
Testing
import { describe, it, expect } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';

describe('data-processing-pipeline', () => {
  it('should process data from multiple sources', async () => {
    const conductor = await TestConductor.create({
      mocks: {
        http: {
          responses: {
            'https://api.example.com/data': {
              data: { records: [{ id: 1, name: 'Test', email: 'test@example.com' }] }
            }
          }
        },
        db: {
          results: [
            { id: 2, name: 'User', email: 'user@example.com' }
          ]
        }
      }
    });

    const result = await conductor.executeEnsemble('data-processing-pipeline', {
      lastSync: Date.now() - 86400000
    });

    expect(result).toBeSuccessful();
    expect(result.output.summary.extracted).toBeGreaterThan(0);
    expect(result.output.summary.loaded).toBeGreaterThan(0);
  });

  it('should filter invalid records', async () => {
    const conductor = await TestConductor.create({
      mocks: {
        data: [
          { id: 1, name: 'Valid', email: 'valid@example.com' },
          { id: 2, name: '', email: 'invalid' } // Invalid
        ]
      }
    });

    const result = await conductor.executeEnsemble('data-processing-pipeline', {
      lastSync: Date.now()
    });

    expect(result.output.summary.valid).toBe(1);
    expect(result.output.summary.invalid).toBe(1);
  });
});
Enhancements
Add Deduplication
- member: deduplicate
  type: Transform
  input:
    data: ${combine-sources.output}
    expression: |
      /* Group records by id and keep the first record in each group */
      {
        "unique": records{ $string(id): $ } ~> $each(function($v) { $v[0] })
      }
Add Data Enrichment
- member: enrich-data
  foreach: ${transform-data.output.transformed}
  type: Think
  config:
    provider: openai
    model: gpt-4o-mini
  input:
    prompt: |
      Enrich this customer record with industry classification:
      ${JSON.stringify(item)}
Add Monitoring
- member: log-metrics
  type: Data
  config:
    storage: d1
    operation: query
    query: |
      INSERT INTO pipeline_metrics (
        pipeline_name, records_processed, duration_ms, timestamp
      ) VALUES (?, ?, ?, CURRENT_TIMESTAMP)
  input:
    params:
      - "data-processing-pipeline"
      - ${state.loadedCount}
      - ${Date.now() - execution.startTime}
Best Practices
- Extract in parallel - Maximize throughput
- Validate early - Filter bad data before processing
- Batch operations - Efficient database inserts
- Use state management - Track progress across steps
- Handle errors gracefully - Log failures and continue (see the sketch after this list)
- Cache extracted data - Reduce API calls
- Monitor metrics - Track success rates
- Test thoroughly - Verify each stage
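Conductor-specific retry and error-handling options are not shown in this example. As a framework-agnostic illustration of the log-and-continue idea, a flaky extract or load call could be wrapped with a helper like this (plain TypeScript, hypothetical names):

// Hypothetical "log and continue" helper (not Conductor API): retry an
// operation a few times, log each failure, and return null instead of
// aborting the whole pipeline.
async function tryWithRetry<T>(
  label: string,
  operation: () => Promise<T>,
  attempts = 3
): Promise<T | null> {
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      console.warn(`${label} failed (attempt ${attempt}/${attempts})`, error);
    }
  }
  return null; // caller records the gap and moves on with the remaining data
}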

