
Overview

This example demonstrates a complete data processing pipeline that extracts data from multiple sources, transforms it, validates quality, and loads it into storage, all with parallel processing and error handling.

Pipeline Architecture

Extract (Parallel) → Transform → Validate → Load → Notify

Complete Example

name: data-processing-pipeline
description: ETL pipeline with parallel extraction and validation

state:
  schema:
    extractedData: array
    transformedData: array
    validationResults: object
    loadedCount: number

flow:
  # Extract: Fetch from multiple sources in parallel
  - parallel:
    - member: extract-api
      type: Fetch
      config:
        url: "https://api.example.com/data"
        headers:
          Authorization: "Bearer ${env.API_KEY}"
      cache:
        ttl: 300
      state:
        set: [extractedData]

    - member: extract-database
      type: Data
      config:
        storage: d1
        operation: query
        query: |
          SELECT * FROM source_table
          WHERE updated_at > ?
      input:
        params: [${input.lastSync}]
      state:
        set: [extractedData]

    - member: extract-s3
      type: Fetch
      config:
        url: "${env.S3_URL}/data.json"
      state:
        set: [extractedData]

  # Transform: Combine and reshape data
  - member: combine-sources
    type: Transform
    state:
      use: [extractedData]
    input:
      data:
        api: ${extract-api.output.data}
        db: ${extract-database.output.results}
        s3: ${extract-s3.output.data}
      expression: |
        {
          "records": $append(
            $append(api.records[], db[]),
            s3.items[]
          )
        }

  - member: transform-data
    type: Transform
    input:
      data: ${combine-sources.output}
      expression: |
        {
          "transformed": records[].{
            "id": id,
            "name": $trim(name),
            "email": $lowercase(email),
            "createdAt": $fromMillis(created_timestamp),
            "category": $uppercase(category),
            "metadata": {
              "source": source,
              "processedAt": $now()
            }
          }
        }
    state:
      set: [transformedData]

  # Validate: Check data quality
  - member: validate-data
    type: Function
    state:
      use: [transformedData]
      set: [validationResults]
    input:
      data: ${state.transformedData}
      rules:
        - field: "email"
          pattern: "^[^\\s@]+@[^\\s@]+\\.[^\\s@]+$"
        - field: "name"
          minLength: 1
        - field: "category"
          enum: ["A", "B", "C"]

  # Filter: Remove invalid records
  - member: filter-invalid
    type: Transform
    state:
      use: [transformedData, validationResults]
    input:
      data: ${state.transformedData}
      validation: ${state.validationResults}
      expression: |
        {
          "valid": transformed[validation.valid[id]],
          "invalid": transformed[validation.invalid[id]]
        }

  # Load: Insert valid records in batches
  - member: load-batch
    foreach: ${filter-invalid.output.valid}
    batch: 100
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO target_table (id, name, email, category, metadata)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(id) DO UPDATE SET
          name = excluded.name,
          email = excluded.email,
          category = excluded.category,
          metadata = excluded.metadata,
          updated_at = CURRENT_TIMESTAMP
    input:
      params:
        - ${item.id}
        - ${item.name}
        - ${item.email}
        - ${item.category}
        - ${JSON.stringify(item.metadata)}
    state:
      set: [loadedCount]

  # Log invalid records
  - member: log-invalid
    condition: ${filter-invalid.output.invalid.length > 0}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO validation_errors (record_id, errors, created_at)
        VALUES (?, ?, CURRENT_TIMESTAMP)
    input:
      params:
        - ${filter-invalid.output.invalid.map(r => r.id)}
        - ${JSON.stringify(state.validationResults.errors)}

  # Notify on completion
  - member: send-notification
    type: API
    config:
      url: "${env.SLACK_WEBHOOK_URL}"
      method: POST
    input:
      body:
        text: "Data pipeline completed"
        blocks:
          - type: "section"
            text:
              type: "mrkdwn"
              text: |
                *Pipeline Results*
                • Extracted: ${extract-api.output.data.length + extract-database.output.results.length + extract-s3.output.data.length} records
                • Valid: ${filter-invalid.output.valid.length} records
                • Invalid: ${filter-invalid.output.invalid.length} records
                • Loaded: ${state.loadedCount} records

output:
  summary:
    extracted: ${state.extractedData.length}
    transformed: ${state.transformedData.length}
    valid: ${filter-invalid.output.valid.length}
    invalid: ${filter-invalid.output.invalid.length}
    loaded: ${state.loadedCount}
  validationErrors: ${state.validationResults.errors}
  completedAt: ${Date.now()}

Key Patterns

1. Parallel Extraction

parallel:
  - member: extract-api
    type: Fetch
  - member: extract-database
    type: Data
  - member: extract-s3
    type: Fetch
Extract from multiple sources simultaneously so total extraction time is bounded by the slowest source rather than the sum of all sources.
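
Conceptually, the parallel block issues all source requests at once. A rough TypeScript sketch of concurrent extraction (not the framework's implementation; queryDatabase is a hypothetical helper standing in for the Data member, and the URLs and env vars mirror the example above):

// Hypothetical stand-in for the Data member's database query.
declare function queryDatabase(sql: string, params: unknown[]): Promise<unknown[]>;

// Fire all three extractions at once and keep whichever sources succeed,
// so one slow or failing source does not block the others in this sketch.
async function extractAll(lastSync: number): Promise<unknown[]> {
  const settled = await Promise.allSettled([
    fetch('https://api.example.com/data', {
      headers: { Authorization: `Bearer ${process.env.API_KEY}` },
    }).then((res) => res.json()),
    queryDatabase('SELECT * FROM source_table WHERE updated_at > ?', [lastSync]),
    fetch(`${process.env.S3_URL}/data.json`).then((res) => res.json()),
  ]);

  return settled
    .filter((s): s is PromiseFulfilledResult<unknown> => s.status === 'fulfilled')
    .map((s) => s.value);
}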

2. Data Transformation

- member: transform-data
  type: Transform
  input:
    expression: |
      {
        "transformed": records[].{
          "id": id,
          "name": $trim(name),
          "email": $lowercase(email)
        }
      }
Use JSONata expressions to reshape, clean, and normalize records declaratively.
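
To iterate on an expression outside the pipeline, it can be evaluated standalone with the jsonata npm package (shown with jsonata 2.x, where evaluate() returns a Promise; the sample record below is invented for illustration):

import jsonata from 'jsonata';

// Standalone check of a trimmed-down transform expression against one sample record.
const expression = jsonata(`{
  "transformed": records[].{
    "id": id,
    "name": $trim(name),
    "email": $lowercase(email)
  }
}`);

const sample = {
  records: [{ id: 1, name: '  Ada Lovelace ', email: 'ADA@Example.com' }],
};

const result = await expression.evaluate(sample);
// => { transformed: [{ id: 1, name: 'Ada Lovelace', email: 'ada@example.com' }] }
console.log(result);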

3. Quality Validation

- member: validate-data
  type: Function
  input:
    rules:
      - field: "email"
        pattern: "^[^\\s@]+@[^\\s@]+\\.[^\\s@]+$"
      - field: "name"
        minLength: 1
Validate data quality before loading.
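
The example doesn't show the Function member's body; as a rough sketch, the rule set above could be checked with plain TypeScript along these lines (the PipelineRecord shape and the error format are assumptions, not framework types):

interface PipelineRecord {
  id: number;
  name: string;
  email: string;
  category: string;
}

const EMAIL_PATTERN = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
const CATEGORIES = new Set(['A', 'B', 'C']);

// Split records into valid and invalid sets according to the rules above.
function validate(records: PipelineRecord[]) {
  const valid: PipelineRecord[] = [];
  const invalid: { record: PipelineRecord; errors: string[] }[] = [];

  for (const record of records) {
    const errors: string[] = [];
    if (!EMAIL_PATTERN.test(record.email)) errors.push('email: invalid format');
    if (record.name.length < 1) errors.push('name: must not be empty');
    if (!CATEGORIES.has(record.category)) errors.push('category: must be A, B, or C');

    if (errors.length === 0) valid.push(record);
    else invalid.push({ record, errors });
  }

  return { valid, invalid };
}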

4. Batch Loading

- member: load-batch
  foreach: ${validRecords}
  batch: 100
  type: Data
Insert records in batches to reduce the number of database round trips.
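
The foreach/batch combination boils down to splitting the valid records into groups of 100 and issuing one load per group. A generic sketch of that batching logic, with insertBatch as a hypothetical stand-in for the Data member's upsert:

// Hypothetical loader standing in for the Data member's upsert query.
declare function insertBatch(rows: unknown[]): Promise<void>;

// Split an array into fixed-size chunks.
function chunk<T>(items: T[], size: number): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Load valid records 100 at a time and return how many were written.
async function loadAll(records: unknown[]): Promise<number> {
  let loaded = 0;
  for (const batch of chunk(records, 100)) {
    await insertBatch(batch);
    loaded += batch.length;
  }
  return loaded;
}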

Testing

import { describe, it, expect } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';

describe('data-processing-pipeline', () => {
  it('should process data from multiple sources', async () => {
    const conductor = await TestConductor.create({
      mocks: {
        http: {
          responses: {
            'https://api.example.com/data': {
              data: { records: [{ id: 1, name: 'Test', email: 'test@example.com' }] }
            }
          }
        },
        db: {
          results: [
            { id: 2, name: 'User', email: 'user@example.com' }
          ]
        }
      }
    });

    const result = await conductor.executeEnsemble('data-processing-pipeline', {
      lastSync: Date.now() - 86400000
    });

    expect(result).toBeSuccessful();
    expect(result.output.summary.extracted).toBeGreaterThan(0);
    expect(result.output.summary.loaded).toBeGreaterThan(0);
  });

  it('should filter invalid records', async () => {
    const conductor = await TestConductor.create({
      mocks: {
        data: [
          { id: 1, name: 'Valid', email: 'valid@example.com' },
          { id: 2, name: '', email: 'invalid' }  // Invalid
        ]
      }
    });

    const result = await conductor.executeEnsemble('data-processing-pipeline', {
      lastSync: Date.now()
    });

    expect(result.output.summary.valid).toBe(1);
    expect(result.output.summary.invalid).toBe(1);
  });
});

Enhancements

Add Deduplication

- member: deduplicate
  type: Transform
  input:
    data: ${combine-sources.output}
    expression: |
      {
        "unique": $distinct(records[], function($r) { $r.id })
      }
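
JSONata's $distinct takes only an array, so the expression above dedupes by grouping on id and keeping the first record in each group. A quick way to sanity-check it is to evaluate it directly with the jsonata package (sample records are invented; jsonata 2.x, async evaluate):

import jsonata from 'jsonata';

// Two records share id 1, so only the first of them should survive.
const dedupe = jsonata(`{
  "unique": $each(records{ $string(id): $[0] }, function($v, $k) { $v })
}`);

const result = await dedupe.evaluate({
  records: [
    { id: 1, name: 'Ada' },
    { id: 1, name: 'Ada (duplicate)' },
    { id: 2, name: 'Grace' },
  ],
});
// => { unique: [{ id: 1, name: 'Ada' }, { id: 2, name: 'Grace' }] } (order may vary)
console.log(result);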

Add Data Enrichment

- member: enrich-data
  foreach: ${transform-data.output.transformed}
  type: Think
  config:
    provider: openai
    model: gpt-4o-mini
  input:
    prompt: |
      Enrich this customer record with industry classification:
      ${JSON.stringify(item)}

Add Monitoring

- member: log-metrics
  type: Data
  config:
    storage: d1
    operation: query
    query: |
      INSERT INTO pipeline_metrics (
        pipeline_name, records_processed, duration_ms, timestamp
      ) VALUES (?, ?, ?, CURRENT_TIMESTAMP)
  input:
    params:
      - "data-processing-pipeline"
      - ${state.loadedCount}
      - ${Date.now() - execution.startTime}

Best Practices

  1. Extract in parallel - Maximize throughput
  2. Validate early - Filter bad data before processing
  3. Batch operations - Efficient database inserts
  4. Use state management - Track progress across steps
  5. Handle errors gracefully - Log and continue
  6. Cache extracted data - Reduce API calls
  7. Monitor metrics - Track success rates
  8. Test thoroughly - Verify each stage