Basic ETL Pattern

ensemble: etl-pipeline

agents:
  # Extract
  - name: extract
    operation: http
    config:
      url: https://source-api.example.com/data

  # Transform
  - name: transform
    operation: code
    inputs:
      data: ${extract.output.body}
      template: ${[email protected]}

  # Validate
  - name: validate
    operation: validate
    inputs:
      data: ${transform.output}
      schema: ${[email protected]}

  # Load
  - name: load
    condition: ${validate.output.valid}
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: INSERT INTO target_table ...
      params: ${validate.output.data}

Database to Database

ensemble: db-to-db-etl

agents:
  # Extract from source
  - name: extract
    operation: data
    config:
      backend: d1
      binding: DB
      operation: query
      sql: |
        SELECT *
        FROM source_table
        WHERE updated_at > ?
        ORDER BY id
        LIMIT 1000
      params:
        - ${input.last_sync_time}

  # Transform each record
  - name: transform
    operation: code
    inputs:
      data: ${extract.output}
      template:
        - id: ${item.source_id}
          name: ${uppercase(item.name)}
          email: ${lowercase(item.email)}
          created: ${formatDate(item.created_at)}
          metadata:
            source: legacy_db
            migrated_at: ${Date.now()}
      functions:
        uppercase: (str) => str.toUpperCase()
        lowercase: (str) => str.toLowerCase()
        formatDate: (date) => new Date(date).toISOString()

  # Validate transformed data
  - name: validate
    operation: validate
    inputs:
      data: ${transform.output}
      schema:
        id: string
        name: string
        email:
          type: string
          pattern: ^[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}$
        created: string

  # Handle validation errors
  - name: log-errors
    condition: ${!validate.output.valid}
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: INSERT INTO etl_errors (batch_id, errors, timestamp) VALUES (?, ?, ?)
      params:
        - ${input.batch_id}
        - ${JSON.stringify(validate.output.errors)}
        - ${Date.now()}

  # Load valid records
  - name: load
    condition: ${validate.output.valid}
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT OR REPLACE INTO target_table (id, name, email, created, metadata)
        VALUES (?, ?, ?, ?, ?)
      params: ${validate.output.data.map(r => [r.id, r.name, r.email, r.created, JSON.stringify(r.metadata)])}

  # Update sync timestamp
  - name: update-sync
    condition: ${load.success}
    operation: storage
    config:
      type: kv
      action: put
      key: last_sync_time
      value: ${Date.now()}

output:
  records_processed: ${extract.output.length}
  records_loaded: ${load.success ? validate.output.data.length : 0}
  errors: ${validate.output.valid ? [] : validate.output.errors}
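
The inline uppercase/lowercase/formatDate helpers above can also live in a standalone script, using the same config.script pattern as the CSV import example further down this page. A rough sketch of that equivalent (the file name is an assumption; nothing above references it):

// scripts/transform-legacy-records.ts (illustrative)
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function transformLegacyRecords(context: AgentExecutionContext) {
  const { data } = context.input

  // Same mapping as the inline template: uppercase names, lowercase emails,
  // ISO-format the created timestamp, and tag each row with migration metadata
  return data.map((item: any) => ({
    id: item.source_id,
    name: String(item.name).toUpperCase(),
    email: String(item.email).toLowerCase(),
    created: new Date(item.created_at).toISOString(),
    metadata: {
      source: 'legacy_db',
      migrated_at: Date.now()
    }
  }))
}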

API to Data Warehouse

ensemble: api-to-warehouse

agents:
  # Extract from multiple API endpoints (parallel)
  - name: fetch-users
    operation: fetch
    inputs:
      url: https://api.example.com/users
      cacheTTL: 300

  - name: fetch-orders
    operation: fetch
    inputs:
      url: https://api.example.com/orders
      cacheTTL: 300

  - name: fetch-products
    operation: fetch
    inputs:
      url: https://api.example.com/products
      cacheTTL: 300

  # Transform users
  - name: transform-users
    operation: code
    inputs:
      data: ${fetch-users.output.body}
      template:
        - user_id: ${item.id}
          full_name: ${item.firstName} ${item.lastName}
          email: ${item.email}
          registration_date: ${item.createdAt}
          lifetime_value: ${item.orders.reduce((sum, o) => sum + o.total, 0)}

  # Transform orders
  - name: transform-orders
    operation: code
    inputs:
      data: ${fetch-orders.output.body}
      template:
        - order_id: ${item.id}
          user_id: ${item.userId}
          total: ${item.total}
          items_count: ${item.items.length}
          order_date: ${item.createdAt}
          status: ${item.status}

  # Transform products
  - name: transform-products
    operation: code
    inputs:
      data: ${fetch-products.output.body}
      template:
        - product_id: ${item.id}
          name: ${item.name}
          category: ${item.category}
          price: ${item.price}
          stock: ${item.stock}

  # Load to data warehouse (parallel)
  - name: load-users
    operation: http
    config:
      url: https://warehouse.example.com/api/load
      method: POST
      body:
        table: users
        data: ${transform-users.output}

  - name: load-orders
    operation: http
    config:
      url: https://warehouse.example.com/api/load
      method: POST
      body:
        table: orders
        data: ${transform-orders.output}

  - name: load-products
    operation: http
    config:
      url: https://warehouse.example.com/api/load
      method: POST
      body:
        table: products
        data: ${transform-products.output}

  # Log pipeline execution
  - name: log
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT INTO etl_log (
          pipeline, users_loaded, orders_loaded, products_loaded, timestamp
        ) VALUES (?, ?, ?, ?, ?)
      params:
        - api-to-warehouse
        - ${transform-users.output.length}
        - ${transform-orders.output.length}
        - ${transform-products.output.length}
        - ${Date.now()}

CSV Import with Validation

ensemble: csv-import

agents:
  # Fetch CSV from R2
  - name: fetch-csv
    operation: storage
    config:
      type: r2
      action: get
      key: imports/${input.file_name}

  # Parse CSV
  - name: parse
    operation: code
    config:
      script: scripts/parse-csv
    inputs:
      csv: ${fetch-csv.output}

  # Validate each record
  - name: validate
    operation: validate
    inputs:
      data: ${parse.output.records}
      schema:
        name:
          type: string
          required: true
        email:
          type: string
          pattern: ^[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}$
          required: true
        age:
          type: number
          min: 0
          max: 120

  # Split valid and invalid
  - name: split-records
    operation: code
    config:
      script: scripts/split-validated-records
    inputs:
      validation: ${validate.output}
      records: ${parse.output.records}

  # Load valid records
  - name: load-valid
    condition: ${split-records.output.valid.length > 0}
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: INSERT INTO users (name, email, age) VALUES (?, ?, ?)
      params: ${split-records.output.valid.map(r => [r.name, r.email, r.age])}

  # Store invalid records
  - name: store-invalid
    condition: ${split-records.output.invalid.length > 0}
    operation: storage
    config:
      type: r2
      action: put
      key: imports/invalid/${input.file_name}
      value: ${JSON.stringify(split-records.output.invalid)}

  # Notify completion
  - name: notify
    operation: email
    config:
      to: ${input.uploader_email}
      subject: "CSV Import Complete - ${input.file_name}"
      body: |
        Import Results:
        - Valid records: ${split-records.output.valid.length}
        - Invalid records: ${split-records.output.invalid.length}
        - Total processed: ${parse.output.records.length}

        ${split-records.output.invalid.length > 0 ?
          'Invalid records saved to: imports/invalid/' + input.file_name : ''}

// scripts/parse-csv.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

// Minimal CSV parser: assumes a header row and simple comma-separated values
// (no quoted fields containing commas). Use a real CSV library for anything richer.
export default function parseCsv(context: AgentExecutionContext) {
  const { csv } = context.input

  // Drop blank lines (including the trailing newline) and trim stray \r from CRLF files
  const lines = csv.split('\n').map(line => line.trim()).filter(line => line.length > 0)
  const headers = lines[0].split(',').map(h => h.trim())

  // One record per data row, keyed by the header names
  const records = lines.slice(1).map(line => {
    const values = line.split(',').map(v => v.trim())
    return Object.fromEntries(
      headers.map((h, i) => [h, values[i]])
    )
  })

  return { records }
}

// scripts/split-validated-records.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function splitValidatedRecords(context: AgentExecutionContext) {
  const { validation, records } = context.input

  // Everything passed: nothing to split
  if (validation.valid) {
    return {
      valid: validation.data,
      invalid: []
    }
  }

  // Split row by row, assuming each validation error carries the failing
  // row index and a message
  const valid = []
  const invalid = []

  records.forEach((record, idx) => {
    const error = validation.errors.find(e => e.row === idx)
    if (error) {
      invalid.push({ ...record, error: error.message })
    } else {
      valid.push(record)
    }
  })

  return { valid, invalid }
}

// scripts/return-no-new-data.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

// Used by the incremental-sync ensemble below when the extract step finds no new rows
export default function returnNoNewData(_context: AgentExecutionContext) {
  return { message: 'No new data' }
}

Incremental Sync

ensemble: incremental-sync

state:
  schema:
    last_sync_id: number
    last_sync_time: string

agents:
  # Get last sync point
  - name: get-checkpoint
    operation: storage
    state:
      use: [last_sync_id, last_sync_time]
    config:
      type: kv
      action: get
      key: sync_checkpoint

  # Extract new/updated records
  - name: extract
    operation: data
    config:
      backend: d1
      binding: DB
      operation: query
      sql: |
        SELECT *
        FROM source_table
        WHERE id > ? OR updated_at > ?
        ORDER BY id
        LIMIT 10000
      params:
        - ${state.last_sync_id || 0}
        - ${state.last_sync_time || '1970-01-01'}

  # No new data? Skip rest
  - name: check-empty
    condition: ${extract.output.length === 0}
    operation: code
    config:
      script: scripts/return-no-new-data

  # Transform
  - name: transform
    condition: ${extract.output.length > 0}
    operation: code
    inputs:
      data: ${extract.output}
      template: ${[email protected]}

  # Load
  - name: load
    condition: ${transform.executed}
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: INSERT OR REPLACE INTO target_table ...
      params: ${transform.output}

  # Update checkpoint
  - name: update-checkpoint
    condition: ${load.success}
    operation: storage
    state:
      set:
        last_sync_id: ${extract.output[extract.output.length - 1].id}
        last_sync_time: ${new Date().toISOString()}
    config:
      type: kv
      action: put
      key: sync_checkpoint
      value:
        last_sync_id: ${extract.output[extract.output.length - 1].id}
        last_sync_time: ${new Date().toISOString()}

Real-time Streaming ETL

ensemble: streaming-etl

agents:
  # Receive event
  - name: validate-event
    operation: validate
    inputs:
      data: ${input.event}
      schema: ${[email protected]}

  # Enrich with context
  - name: enrich
    operation: storage
    config:
      type: kv
      action: get
      key: user-${input.event.user_id}

  # Transform
  - name: transform
    operation: code
    inputs:
      data:
        event: ${validate-event.output.data}
        user: ${enrich.output}
      template:
        event_id: ${data.event.id}
        event_type: ${data.event.type}
        user_id: ${data.event.user_id}
        user_segment: ${data.user.segment}
        properties: ${data.event.properties}
        timestamp: ${Date.now()}

  # Write to multiple destinations (parallel)
  - name: write-analytics-db
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: INSERT INTO events ...
      params: ${transform.output}

  - name: write-data-lake
    operation: storage
    config:
      type: r2
      action: put
      key: events/${new Date().toISOString().split('T')[0]}/${transform.output.event_id}.json
      value: ${JSON.stringify(transform.output)}

  - name: send-to-warehouse
    operation: http
    config:
      url: https://warehouse.example.com/api/ingest
      method: POST
      body: ${transform.output}
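
The transform template above assumes the validated event carries at least an id, type, user_id, and properties, and that the KV user record has a segment field. A TypeScript sketch of those assumed shapes (the interface names are illustrative, not part of the ensemble):

// Illustrative shapes only; these types are not defined anywhere in the ensembles above
interface IncomingEvent {
  id: string
  type: string
  user_id: string
  properties: Record<string, unknown>
}

interface EnrichedEvent {
  event_id: string
  event_type: string
  user_id: string
  user_segment: string // from the KV record fetched by the enrich step
  properties: Record<string, unknown>
  timestamp: number // Date.now() at transform time
}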

Best Practices

1. Batch Processing

# Process in chunks
agents:
  - name: extract
    operation: data
    config:
      backend: d1
      binding: DB
      operation: query
      sql: SELECT * FROM source LIMIT 1000 OFFSET ?
      params:
        - ${input.offset}
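
When a single extract returns more rows than you want to bind in one statement, a small script (following the same config.script pattern as the CSV import scripts above) can build chunked multi-row inserts. A sketch; the file name, table, and columns are assumptions:

// scripts/build-batch-insert.ts (illustrative helper)
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

// Builds one multi-row INSERT per chunk so each execute call binds a
// bounded number of parameters
export default function buildBatchInsert(context: AgentExecutionContext) {
  const { records, chunkSize = 100 } = context.input
  const statements: { sql: string; params: unknown[] }[] = []

  for (let i = 0; i < records.length; i += chunkSize) {
    const chunk = records.slice(i, i + chunkSize)
    const placeholders = chunk.map(() => '(?, ?, ?)').join(', ')
    statements.push({
      sql: `INSERT OR REPLACE INTO target_table (id, name, email) VALUES ${placeholders}`,
      params: chunk.flatMap((r: any) => [r.id, r.name, r.email])
    })
  }

  return { statements }
}

How the resulting statements are fanned out to execute steps depends on your pipeline shape; the point is to keep each database call bounded.
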
2. Error Handling

# Always handle failures
agents:
  - name: transform
    operation: code

  - name: log-error
    condition: ${transform.failed}
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: INSERT INTO etl_errors ...
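
A variant on the same idea: rather than letting one bad row fail the whole transform, a script can collect per-row problems and return them alongside the good records. A sketch (the file and field names are assumptions):

// scripts/transform-collecting-errors.ts (illustrative)
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

// Collect per-row problems instead of aborting the whole batch
export default function transformCollectingErrors(context: AgentExecutionContext) {
  const { data } = context.input
  const records: Record<string, unknown>[] = []
  const errors: { row: number; message: string }[] = []

  data.forEach((item: any, row: number) => {
    if (!item.id || !item.email) {
      errors.push({ row, message: 'missing id or email' })
      return
    }
    records.push({ id: item.id, email: String(item.email).toLowerCase() })
  })

  return { records, errors }
}

A downstream step could then condition on ${transform.output.errors.length > 0} and write the errors to etl_errors, much like the log-errors step in the database-to-database example above.
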
3. Idempotency

# Use INSERT OR REPLACE so re-runs don't create duplicates
config:
  sql: INSERT OR REPLACE INTO target ...

4. Checkpointing

# Track progress
state:
  schema:
    last_processed_id: number

5. Monitoring

# Log metrics
agents:
  - name: log-metrics
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT INTO etl_metrics (
          pipeline, records_processed, duration, timestamp
        ) VALUES (?, ?, ?, ?)

Next Steps