> ## Documentation Index
> Fetch the complete documentation index at: https://docs.ensemble.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Data Processing ETL

> Move data between systems. Transform, validate, and load at scale.

## Basic ETL Pattern

<Tabs>
  <Tab title="YAML">
    ```yaml theme={null}
    ensemble: etl-pipeline

    agents:
      # Extract
      - name: extract
        operation: http
        config:
          url: https://source-api.example.com/data

      # Transform
      - name: transform
        operation: code
        inputs:
          data: ${extract.output.body}
          template: ${component.transform-template@v1.0.0}

      # Validate
      - name: validate
        operation: validate
        inputs:
          data: ${transform.output}
          schema: ${component.target-schema@v1.0.0}

      # Load
      - name: load
        condition: ${validate.output.valid}
        operation: data
        config:
          backend: d1
          binding: DB
          operation: execute
          sql: INSERT INTO target_table ...
          params: ${validate.output.data}
    ```
  </Tab>

  <Tab title="TypeScript">
    ```typescript theme={null}
    import { createEnsemble, step, branch } from '@ensemble-edge/conductor'

    // Programmatic equivalent of the YAML tab: extract -> transform ->
    // validate -> load, where the load step only runs for valid data.
    const etlPipeline = createEnsemble('etl-pipeline')
      // Extract: pull the raw payload from the source API
      .addStep(
        step('extract')
          .operation('http')
          .config({ url: 'https://source-api.example.com/data' })
      )
      // Transform: apply the versioned transform template to the response body
      .addStep(
        step('transform')
          .operation('code')
          .input({
            data: '${extract.output.body}',
            template: '${component.transform-template@v1.0.0}'
          })
      )
      // Validate: check the transformed data against the versioned target schema
      .addStep(
        step('validate')
          .operation('validate')
          .input({
            data: '${transform.output}',
            schema: '${component.target-schema@v1.0.0}'
          })
      )
      // Load (conditional): the branch gates the insert on validate.output.valid
      .addStep(
        branch('load-if-valid')
          .condition('${validate.output.valid}')
          .then(
            step('load')
              .operation('data')
              .config({
                backend: 'd1',
                binding: 'DB',
                operation: 'execute',
                sql: 'INSERT INTO target_table ...',
                params: '${validate.output.data}'
              })
          )
      )
      .build()

    export default etlPipeline
    ```
  </Tab>
</Tabs>

## Database to Database

```yaml theme={null}
ensemble: db-to-db-etl

agents:
  # Extract from source
  - name: extract
    operation: data
    config:
      backend: d1
      binding: DB
      operation: query
      sql: |
        SELECT *
        FROM source_table
        WHERE updated_at > ?
        ORDER BY id
        LIMIT 1000
      # Block sequence on purpose: inside a flow sequence ([ ... ]) the braces
      # of ${...} are YAML flow indicators and the document fails to parse.
      params:
        - ${input.last_sync_time}

  # Transform each record
  - name: transform
    operation: code
    inputs:
      data: ${extract.output}
      template:
        - id: ${item.source_id}
          name: ${uppercase(item.name)}
          email: ${lowercase(item.email)}
          created: ${formatDate(item.created_at)}
          metadata:
            source: legacy_db
            migrated_at: ${Date.now()}
      functions:
        uppercase: (str) => str.toUpperCase()
        lowercase: (str) => str.toLowerCase()
        formatDate: (date) => new Date(date).toISOString()

  # Validate transformed data
  - name: validate
    operation: validate
    inputs:
      data: ${transform.output}
      schema:
        id: string
        name: string
        email:
          type: string
          pattern: ^[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}$
        created: string

  # Handle validation errors
  - name: log-errors
    condition: ${!validate.output.valid}
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: INSERT INTO etl_errors (batch_id, errors, timestamp) VALUES (?, ?, ?)
      params:
        - ${input.batch_id}
        - ${JSON.stringify(validate.output.errors)}
        - ${Date.now()}

  # Load valid records
  - name: load
    # Only runs when the whole batch passed validation
    condition: ${validate.output.valid}
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT OR REPLACE INTO target_table (id, name, email, created, metadata)
        VALUES (?, ?, ?, ?, ?)
      # One parameter row per record, in column order. Kept on a single line:
      # a plain scalar cannot continue onto a line indented at the key's level.
      params: ${validate.output.data.map(r => [r.id, r.name, r.email, r.created, JSON.stringify(r.metadata)])}

  # Update sync timestamp
  - name: update-sync
    condition: ${load.success}
    operation: storage
    config:
      type: kv
      action: put
      key: last_sync_time
      value: ${Date.now()}

# Summary returned by the ensemble.
# The ternary expressions are quoted because an unquoted scalar may not
# contain ": " (colon + space); YAML would read it as a nested mapping.
output:
  records_processed: ${extract.output.length}
  records_loaded: "${load.success ? validate.output.data.length : 0}"
  errors: "${validate.output.valid ? [] : validate.output.errors}"
```

## API to Data Warehouse

```yaml theme={null}
ensemble: api-to-warehouse

agents:
  # Extract from multiple API endpoints (parallel)
  - name: fetch-users
    operation: fetch
    inputs:
      url: https://api.example.com/users
      cacheTTL: 300

  - name: fetch-orders
    operation: fetch
    inputs:
      url: https://api.example.com/orders
      cacheTTL: 300

  - name: fetch-products
    operation: fetch
    inputs:
      url: https://api.example.com/products
      cacheTTL: 300

  # Transform users
  - name: transform-users
    operation: code
    inputs:
      data: ${fetch-users.output.body}
      template:
        - user_id: ${item.id}
          full_name: ${item.firstName} ${item.lastName}
          email: ${item.email}
          registration_date: ${item.createdAt}
          lifetime_value: ${item.orders.reduce((sum, o) => sum + o.total, 0)}

  # Transform orders
  - name: transform-orders
    operation: code
    inputs:
      data: ${fetch-orders.output.body}
      template:
        - order_id: ${item.id}
          user_id: ${item.userId}
          total: ${item.total}
          items_count: ${item.items.length}
          order_date: ${item.createdAt}
          status: ${item.status}

  # Transform products
  - name: transform-products
    operation: code
    inputs:
      data: ${fetch-products.output.body}
      template:
        - product_id: ${item.id}
          name: ${item.name}
          category: ${item.category}
          price: ${item.price}
          stock: ${item.stock}

  # Load to data warehouse (parallel)
  - name: load-users
    operation: http
    config:
      url: https://warehouse.example.com/api/load
      method: POST
      body:
        table: users
        data: ${transform-users.output}

  - name: load-orders
    operation: http
    config:
      url: https://warehouse.example.com/api/load
      method: POST
      body:
        table: orders
        data: ${transform-orders.output}

  - name: load-products
    operation: http
    config:
      url: https://warehouse.example.com/api/load
      method: POST
      body:
        table: products
        data: ${transform-products.output}

  # Log pipeline execution
  - name: log
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT INTO etl_log (
          pipeline, users_loaded, orders_loaded, products_loaded, timestamp
        ) VALUES (?, ?, ?, ?, ?)
      params:
        - api-to-warehouse
        - ${transform-users.output.length}
        - ${transform-orders.output.length}
        - ${transform-products.output.length}
        - ${Date.now()}
```

## CSV Import with Validation

```yaml theme={null}
ensemble: csv-import

agents:
  # Fetch CSV from R2
  - name: fetch-csv
    operation: storage
    config:
      type: r2
      action: get
      key: imports/${input.file_name}

  # Parse CSV
  - name: parse
    operation: code
    config:
      script: scripts/parse-csv
    input:
      csv: ${fetch-csv.output}

  # Validate each record
  - name: validate
    operation: validate
    inputs:
      data: ${parse.output.records}
      schema:
        name:
          type: string
          required: true
        email:
          type: string
          pattern: ^[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}$
          required: true
        age:
          type: number
          min: 0
          max: 120

  # Split valid and invalid
  - name: split-records
    operation: code
    config:
      script: scripts/split-validated-records
    input:
      validation: ${validate.output}
      records: ${parse.output.records}

  # Load valid records
  - name: load-valid
    condition: ${split-records.output.valid.length > 0}
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: INSERT INTO users (name, email, age) VALUES (?, ?, ?)
      params: ${split-records.output.valid.map(r => [r.name, r.email, r.age])}

  # Store invalid records
  - name: store-invalid
    condition: ${split-records.output.invalid.length > 0}
    operation: storage
    config:
      type: r2
      action: put
      key: imports/invalid/${input.file_name}
      value: ${JSON.stringify(split-records.output.invalid)}

  # Notify completion
  - name: notify
    operation: email
    config:
      to: ${input.uploader_email}
      subject: "CSV Import Complete - ${input.file_name}"
      body: |
        Import Results:
        - Valid records: ${split-records.output.valid.length}
        - Invalid records: ${split-records.output.invalid.length}
        - Total processed: ${parse.output.records.length}

        ${split-records.output.invalid.length > 0 ?
          'Invalid records saved to: imports/invalid/' + input.file_name : ''}
```

```typescript theme={null}
// scripts/parse-csv.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Parses simple comma-separated text into an array of records keyed by the
 * header row.
 *
 * Reads `context.input.csv` (a string). The first non-blank line is the
 * header; every following non-blank line becomes one record. Headers and
 * values are trimmed. A row with fewer values than headers leaves the
 * missing fields `undefined`.
 *
 * Limitation: fields are split on every comma, so quoted fields that
 * contain commas or newlines are not supported.
 *
 * @returns `{ records }`; `records` is empty when the input has no lines.
 */
export default function parseCsv(context: AgentExecutionContext) {
  const { csv } = context.input

  // Accept LF and CRLF line endings, and drop blank lines so a trailing
  // newline does not yield an extra, empty record.
  const lines = csv.split(/\r?\n/).filter(line => line.trim() !== '')
  if (lines.length === 0) {
    return { records: [] }
  }

  const headers = lines[0].split(',').map(header => header.trim())

  const records = lines.slice(1).map(line => {
    const values = line.split(',').map(value => value.trim())
    return Object.fromEntries(
      headers.map((header, i) => [header, values[i]])
    )
  })

  return { records }
}
```

```typescript theme={null}
// scripts/split-validated-records.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Splits parsed records into valid and invalid groups using a validation
 * result.
 *
 * Reads `context.input.validation` and `context.input.records`.
 * - When `validation.valid` is truthy, returns `validation.data` as the
 *   valid set and an empty invalid set.
 * - Otherwise each record whose index matches an error's `row` goes to
 *   `invalid`, with that error's `message` attached as `error`; all other
 *   records go to `valid`.
 *
 * NOTE(review): assumes each entry of `validation.errors` carries a
 * zero-based `row` index and a `message` — confirm against the validate
 * operation's output shape.
 *
 * @returns `{ valid, invalid }`
 */
export default function splitValidatedRecords(context: AgentExecutionContext) {
  const { validation, records } = context.input

  if (validation.valid) {
    return {
      valid: validation.data,
      invalid: []
    }
  }

  // Index errors by row once instead of scanning the error list for every
  // record. Only the first error per row is kept, matching a linear search.
  // A missing error list is treated as "no row-level errors".
  const errorByRow = new Map()
  for (const err of validation.errors ?? []) {
    if (!errorByRow.has(err.row)) {
      errorByRow.set(err.row, err)
    }
  }

  const valid: unknown[] = []
  const invalid: unknown[] = []

  records.forEach((record, idx) => {
    const error = errorByRow.get(idx)
    if (error) {
      invalid.push({ ...record, error: error.message })
    } else {
      valid.push(record)
    }
  })

  return { valid, invalid }
}
```

```typescript theme={null}
// scripts/return-no-new-data.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Returns a fixed "nothing to do" result.
 *
 * Referenced as `scripts/return-no-new-data` by the `check-empty` agent of
 * the Incremental Sync ensemble, which runs it when the extract step
 * returned zero rows.
 *
 * @param context Execution context; accepted but not read.
 * @returns `{ message: 'No new data' }`
 */
export default function returnNoNewData(context: AgentExecutionContext) {
  const result = { message: 'No new data' }
  return result
}
```

## Incremental Sync

```yaml theme={null}
ensemble: incremental-sync

state:
  schema:
    last_sync_id: number
    last_sync_time: string

agents:
  # Get last sync point
  - name: get-checkpoint
    operation: storage
    state:
      use: [last_sync_id, last_sync_time]
    config:
      type: kv
      action: get
      key: sync_checkpoint

  # Extract new/updated records
  - name: extract
    operation: data
    config:
      backend: d1
      binding: DB
      operation: query
      sql: |
        SELECT *
        FROM source_table
        WHERE id > ? OR updated_at > ?
        ORDER BY id
        LIMIT 10000
      params:
        - ${state.last_sync_id || 0}
        - ${state.last_sync_time || '1970-01-01'}

  # No new data? Skip rest
  - name: check-empty
    condition: ${extract.output.length === 0}
    operation: code
    config:
      script: scripts/return-no-new-data

  # Transform
  - name: transform
    condition: ${extract.output.length > 0}
    operation: code
    inputs:
      data: ${extract.output}
      template: ${component.transform-template@v1.0.0}

  # Load
  - name: load
    condition: ${transform.executed}
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: INSERT OR REPLACE INTO target_table ...
      params: ${transform.output}

  # Update checkpoint
  - name: update-checkpoint
    condition: ${load.success}
    operation: storage
    state:
      set:
        last_sync_id: ${extract.output[extract.output.length - 1].id}
        last_sync_time: ${new Date().toISOString()}
    config:
      type: kv
      action: put
      key: sync_checkpoint
      value:
        last_sync_id: ${extract.output[extract.output.length - 1].id}
        last_sync_time: ${new Date().toISOString()}
```

## Real-time Streaming ETL

```yaml theme={null}
ensemble: streaming-etl

agents:
  # Receive event
  - name: validate-event
    operation: validate
    inputs:
      data: ${input.event}
      schema: ${component.event-schema@v1.0.0}

  # Enrich with context
  - name: enrich
    operation: storage
    config:
      type: kv
      action: get
      key: user-${input.event.user_id}

  # Transform
  - name: transform
    operation: code
    inputs:
      data:
        event: ${validate-event.output.data}
        user: ${enrich.output}
      template:
        event_id: ${data.event.id}
        event_type: ${data.event.type}
        user_id: ${data.event.user_id}
        user_segment: ${data.user.segment}
        properties: ${data.event.properties}
        timestamp: ${Date.now()}

  # Write to multiple destinations (parallel)
  - name: write-analytics-db
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: INSERT INTO events ...
      params: ${transform.output}

  - name: write-data-lake
    operation: storage
    config:
      type: r2
      action: put
      key: events/${new Date().toISOString().split('T')[0]}/${transform.output.event_id}.json
      value: ${JSON.stringify(transform.output)}

  # Forward the transformed event to the warehouse ingest endpoint
  - name: send-to-warehouse
    operation: http
    config:
      url: https://warehouse.example.com/api/ingest
      # A request body is sent, so the method is stated explicitly
      method: POST
      body: ${transform.output}
```

## Best Practices

**1. Batch Processing**

```yaml theme={null}
# Process in chunks
agents:
  - name: extract
    operation: data
    config:
      backend: d1
      binding: DB
      operation: query
      # Bind the offset as a parameter rather than interpolating it into the
      # SQL text, so caller-supplied input can never alter the statement.
      sql: SELECT * FROM source LIMIT 1000 OFFSET ?
      params:
        - ${input.offset}
```

**2. Error Handling**

```yaml theme={null}
# Always handle failures
agents:
  - name: transform
    operation: code

  - name: log-error
    condition: ${transform.failed}
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: INSERT INTO etl_errors ...
```

**3. Idempotency**

```yaml theme={null}
# Use INSERT OR REPLACE so re-running the same batch overwrites rows
# instead of failing on, or duplicating, existing keys
config:
  backend: d1
  binding: DB
  operation: execute
  sql: INSERT OR REPLACE INTO target ...
```

**4. Checkpointing**

```yaml theme={null}
# Track progress
state:
  schema:
    last_processed_id: number
```

**5. Monitoring**

```yaml theme={null}
# Log metrics
agents:
  - name: log-metrics
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT INTO etl_metrics (
          pipeline, records_processed, duration, timestamp
        ) VALUES (?, ?, ?, ?)
      # params (omitted here): one value per ? placeholder, in column order
```

## Next Steps

<CardGroup cols={2}>
  <Card title="Code Operation" icon="wand-magic-sparkles" href="/conductor/operations/code">
    Data transformation
  </Card>

  <Card title="Validate Operation" icon="check-circle" href="/conductor/starter-kit/validate">
    Data validation
  </Card>

  <Card title="Storage Operation" icon="database" href="/conductor/operations/storage">
    KV and R2 storage
  </Card>

  <Card title="Event-Driven Workflow" icon="bolt" href="/conductor/playbooks/event-driven-workflow">
    Real-time processing
  </Card>
</CardGroup>
