Data Processing ETL Playbook

Move data between systems. Transform, validate, and load at scale.

Basic ETL Pattern

ensemble: etl-pipeline

agents:
  # Extract
  - name: extract
    operation: http
    config:
      url: https://source-api.example.com/data

  # Transform
  - name: transform
    agent: transformer
    inputs:
      data: ${extract.output.body}
      template: ${component.transform-template@v1.0.0}

  # Validate
  - name: validate
    agent: validator
    inputs:
      data: ${transform.output}
      schema: ${component.target-schema@v1.0.0}

  # Load
  - name: load
    condition: ${validate.output.valid}
    operation: storage
    config:
      type: d1
      query: INSERT INTO target_table ...
      params: ${validate.output.data}
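
The same flow, sketched as plain TypeScript for readability. This is only an illustration of the data movement, not the ensemble runtime; every name in it is a hypothetical stand-in for the agents above.

// Minimal sketch of extract -> transform -> validate -> load in plain TypeScript.
// All names here are hypothetical stand-ins for the agents above.
type SourceRecord = { id: number; name: string; email: string };

async function extract(url: string): Promise<SourceRecord[]> {
  const res = await fetch(url);                       // extract: pull raw data from the source API
  return (await res.json()) as SourceRecord[];
}

function transform(records: SourceRecord[]): SourceRecord[] {
  // transform: normalize fields, the same idea as the transformer template
  return records.map(r => ({ ...r, email: r.email.toLowerCase() }));
}

function validate(rows: SourceRecord[]): { valid: boolean; errors: string[] } {
  // validate: a toy schema check standing in for the validator agent
  const errors = rows
    .filter(r => !/^\S+@\S+\.\S+$/.test(r.email))
    .map(r => `invalid email for id ${r.id}`);
  return { valid: errors.length === 0, errors };
}

async function run(): Promise<void> {
  const raw = await extract("https://source-api.example.com/data");
  const rows = transform(raw);
  const check = validate(rows);
  if (check.valid) {
    console.log(`would load ${rows.length} rows into target_table`); // load runs only when valid
  } else {
    console.error("validation failed", check.errors);
  }
}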

Database to Database

ensemble: db-to-db-etl

agents:
  # Extract from source
  - name: extract
    operation: storage
    config:
      type: d1
      query: |
        SELECT *
        FROM source_table
        WHERE updated_at > ?
        ORDER BY id
        LIMIT 1000
      params: [${input.last_sync_time}]

  # Transform each record
  - name: transform
    agent: transformer
    inputs:
      data: ${extract.output}
      template:
        - id: ${item.source_id}
          name: ${uppercase(item.name)}
          email: ${lowercase(item.email)}
          created: ${formatDate(item.created_at)}
          metadata:
            source: legacy_db
            migrated_at: ${Date.now()}
      functions:
        uppercase: (str) => str.toUpperCase()
        lowercase: (str) => str.toLowerCase()
        formatDate: (date) => new Date(date).toISOString()

  # Validate transformed data
  - name: validate
    agent: validator
    inputs:
      data: ${transform.output}
      schema:
        id: string
        name: string
        email:
          type: string
          pattern: ^[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}$
        created: string

  # Handle validation errors
  - name: log-errors
    condition: ${!validate.output.valid}
    operation: storage
    config:
      type: d1
      query: INSERT INTO etl_errors (batch_id, errors, timestamp) VALUES (?, ?, ?)
      params:
        - ${input.batch_id}
        - ${JSON.stringify(validate.output.errors)}
        - ${Date.now()}

  # Load valid records (a direct D1 batch sketch follows this ensemble)
  - name: load
    condition: ${validate.output.valid}
    operation: storage
    config:
      type: d1
      query: |
        INSERT OR REPLACE INTO target_table (id, name, email, created, metadata)
        VALUES (?, ?, ?, ?, ?)
      params: ${validate.output.data.map(r => [
        r.id, r.name, r.email, r.created, JSON.stringify(r.metadata)
      ])}

  # Update sync timestamp
  - name: update-sync
    condition: ${load.success}
    operation: storage
    config:
      type: kv
      action: put
      key: last_sync_time
      value: ${Date.now()}

output:
  records_processed: ${extract.output.length}
  records_loaded: ${load.success ? validate.output.data.length : 0}
  errors: ${validate.output.valid ? [] : validate.output.errors}
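
The load step above binds one parameter tuple per row. If the d1 storage type is Cloudflare D1, the equivalent direct call is a batched prepared statement; a rough sketch, where the env.DB binding name is an assumption and the ensemble's storage operation normally does this binding for you:

// Sketch: per-row batch insert against Cloudflare D1 directly
// (types from @cloudflare/workers-types; env.DB is an assumed binding name).
interface TargetRow {
  id: string;
  name: string;
  email: string;
  created: string;
  metadata: Record<string, unknown>;
}

export async function loadRows(env: { DB: D1Database }, rows: TargetRow[]): Promise<void> {
  const stmt = env.DB.prepare(
    "INSERT OR REPLACE INTO target_table (id, name, email, created, metadata) VALUES (?, ?, ?, ?, ?)"
  );
  // One bound statement per row, sent in a single batch call.
  await env.DB.batch(
    rows.map(r => stmt.bind(r.id, r.name, r.email, r.created, JSON.stringify(r.metadata)))
  );
}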

API to Data Warehouse

ensemble: api-to-warehouse

agents:
  # Extract from multiple API endpoints (parallel)
  - name: fetch-users
    agent: fetcher
    inputs:
      url: https://api.example.com/users
      cacheTTL: 300

  - name: fetch-orders
    agent: fetcher
    inputs:
      url: https://api.example.com/orders
      cacheTTL: 300

  - name: fetch-products
    agent: fetcher
    inputs:
      url: https://api.example.com/products
      cacheTTL: 300

  # Transform users
  - name: transform-users
    agent: transformer
    inputs:
      data: ${fetch-users.output.body}
      template:
        - user_id: ${item.id}
          full_name: ${item.firstName} ${item.lastName}
          email: ${item.email}
          registration_date: ${item.createdAt}
          lifetime_value: ${item.orders.reduce((sum, o) => sum + o.total, 0)}

  # Transform orders
  - name: transform-orders
    agent: transformer
    inputs:
      data: ${fetch-orders.output.body}
      template:
        - order_id: ${item.id}
          user_id: ${item.userId}
          total: ${item.total}
          items_count: ${item.items.length}
          order_date: ${item.createdAt}
          status: ${item.status}

  # Transform products
  - name: transform-products
    agent: transformer
    inputs:
      data: ${fetch-products.output.body}
      template:
        - product_id: ${item.id}
          name: ${item.name}
          category: ${item.category}
          price: ${item.price}
          stock: ${item.stock}

  # Load to data warehouse (parallel)
  - name: load-users
    operation: http
    config:
      url: https://warehouse.example.com/api/load
      method: POST
      body:
        table: users
        data: ${transform-users.output}

  - name: load-orders
    operation: http
    config:
      url: https://warehouse.example.com/api/load
      method: POST
      body:
        table: orders
        data: ${transform-orders.output}

  - name: load-products
    operation: http
    config:
      url: https://warehouse.example.com/api/load
      method: POST
      body:
        table: products
        data: ${transform-products.output}

  # Log pipeline execution
  - name: log
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO etl_log (
          pipeline, users_loaded, orders_loaded, products_loaded, timestamp
        ) VALUES (?, ?, ?, ?, ?)
      params:
        - api-to-warehouse
        - ${transform-users.output.length}
        - ${transform-orders.output.length}
        - ${transform-products.output.length}
        - ${Date.now()}
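
Each load step posts a table name plus an array of rows to the warehouse endpoint. As a plain fetch call it looks roughly like this; the URL and payload shape come from the config above, while the status check is an added assumption:

// Sketch: the warehouse load call as a plain fetch, with a basic status check.
async function loadToWarehouse(table: string, data: unknown[]): Promise<void> {
  const res = await fetch("https://warehouse.example.com/api/load", {
    method: "POST",
    headers: { "content-type": "application/json" },
    body: JSON.stringify({ table, data }),   // { table, data } mirrors the body in the config
  });
  if (!res.ok) throw new Error(`warehouse load failed for ${table}: ${res.status}`);
}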

CSV Import with Validation

ensemble: csv-import

agents:
  # Fetch CSV from R2
  - name: fetch-csv
    operation: storage
    config:
      type: r2
      action: get
      key: imports/${input.file_name}

  # Parse CSV (naive comma split; a quote-aware parser sketch follows this ensemble)
  - name: parse
    operation: code
    config:
      code: |
        const csv = ${fetch-csv.output};
        const lines = csv.split('\n').filter(line => line.trim().length > 0);
        const headers = lines[0].split(',').map(h => h.trim());

        const records = lines.slice(1).map(line => {
          const values = line.split(',');
          return Object.fromEntries(
            headers.map((h, i) => [h, (values[i] ?? '').trim()])
          );
        });

        return { records };

  # Validate each record
  - name: validate
    agent: validator
    inputs:
      data: ${parse.output.records}
      schema:
        name:
          type: string
          required: true
        email:
          type: string
          pattern: ^[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}$
          required: true
        age:
          type: number
          min: 0
          max: 120

  # Split valid and invalid
  - name: split-records
    operation: code
    config:
      code: |
        const validation = ${validate.output};

        if (validation.valid) {
          return {
            valid: validation.data,
            invalid: []
          };
        }

        // Split records by validation errors
        const valid = [];
        const invalid = [];

        ${parse.output.records}.forEach((record, idx) => {
          const error = validation.errors.find(e => e.row === idx);
          if (error) {
            invalid.push({ ...record, error: error.message });
          } else {
            valid.push(record);
          }
        });

        return { valid, invalid };

  # Load valid records
  - name: load-valid
    condition: ${split-records.output.valid.length > 0}
    operation: storage
    config:
      type: d1
      query: INSERT INTO users (name, email, age) VALUES (?, ?, ?)
      params: ${split-records.output.valid.map(r => [r.name, r.email, r.age])}

  # Store invalid records
  - name: store-invalid
    condition: ${split-records.output.invalid.length > 0}
    operation: storage
    config:
      type: r2
      action: put
      key: imports/invalid/${input.file_name}
      value: ${JSON.stringify(split-records.output.invalid)}

  # Notify completion
  - name: notify
    operation: email
    config:
      to: ${input.uploader_email}
      subject: "CSV Import Complete - ${input.file_name}"
      body: |
        Import Results:
        - Valid records: ${split-records.output.valid.length}
        - Invalid records: ${split-records.output.invalid.length}
        - Total processed: ${parse.output.records.length}

        ${split-records.output.invalid.length > 0 ?
          'Invalid records saved to: imports/invalid/' + input.file_name : ''}
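
The parse step splits each line on commas, which works for simple exports but breaks on quoted fields that contain commas. A minimal quote-aware parser you could swap into the code operation; it handles double-quoted fields and "" escapes only, not full RFC 4180 (no embedded newlines):

// Quote-aware CSV parsing sketch: handles "quoted, fields" and "" escapes.
function parseCsvLine(line: string): string[] {
  const fields: string[] = [];
  let current = "";
  let inQuotes = false;
  for (let i = 0; i < line.length; i++) {
    const ch = line[i];
    if (inQuotes) {
      if (ch === '"' && line[i + 1] === '"') { current += '"'; i++; } // escaped quote
      else if (ch === '"') inQuotes = false;
      else current += ch;
    } else if (ch === '"') {
      inQuotes = true;
    } else if (ch === ',') {
      fields.push(current);
      current = "";
    } else {
      current += ch;
    }
  }
  fields.push(current);
  return fields;
}

function parseCsv(csv: string): Record<string, string>[] {
  const lines = csv.split('\n').filter(l => l.trim().length > 0);
  const headers = parseCsvLine(lines[0]).map(h => h.trim());
  return lines.slice(1).map(line => {
    const values = parseCsvLine(line);
    return Object.fromEntries(headers.map((h, i) => [h, (values[i] ?? '').trim()]));
  });
}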

Incremental Sync

ensemble: incremental-sync

state:
  schema:
    last_sync_id: number
    last_sync_time: string

agents:
  # Get last sync point
  - name: get-checkpoint
    operation: storage
    state:
      use: [last_sync_id, last_sync_time]
    config:
      type: kv
      action: get
      key: sync_checkpoint

  # Extract new/updated records
  - name: extract
    operation: storage
    config:
      type: d1
      query: |
        SELECT *
        FROM source_table
        WHERE id > ? OR updated_at > ?
        ORDER BY id
        LIMIT 10000
      params:
        - ${state.last_sync_id || 0}
        - ${state.last_sync_time || '1970-01-01'}

  # No new data? Skip rest
  - name: check-empty
    condition: ${extract.output.length === 0}
    operation: code
    config:
      code: return { message: 'No new data' };

  # Transform
  - name: transform
    condition: ${extract.output.length > 0}
    agent: transformer
    inputs:
      data: ${extract.output}
      template: ${component.transform-template@v1.0.0}

  # Load
  - name: load
    condition: ${transform.executed}
    operation: storage
    config:
      type: d1
      query: INSERT OR REPLACE INTO target_table ...
      params: ${transform.output}

  # Update checkpoint (a direct KV sketch follows this ensemble)
  - name: update-checkpoint
    condition: ${load.success}
    operation: storage
    state:
      set:
        last_sync_id: ${extract.output[extract.output.length - 1].id}
        last_sync_time: ${new Date().toISOString()}
    config:
      type: kv
      action: put
      key: sync_checkpoint
      value:
        last_sync_id: ${extract.output[extract.output.length - 1].id}
        last_sync_time: ${new Date().toISOString()}
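
The checkpoint is just a small JSON document in KV. Outside the ensemble, the same read/update looks roughly like this; env.SYNC_KV is an assumed binding name, and the kv storage operation plus the state block normally handle it for you:

// Sketch: sync checkpoint read/update against Workers KV directly
// (KVNamespace from @cloudflare/workers-types; binding name assumed).
interface Checkpoint {
  last_sync_id: number;
  last_sync_time: string;
}

async function readCheckpoint(env: { SYNC_KV: KVNamespace }): Promise<Checkpoint> {
  const saved = await env.SYNC_KV.get<Checkpoint>("sync_checkpoint", "json");
  // Defaults mirror the ensemble's fallbacks: id 0 and the epoch timestamp.
  return saved ?? { last_sync_id: 0, last_sync_time: "1970-01-01" };
}

async function writeCheckpoint(env: { SYNC_KV: KVNamespace }, lastRowId: number): Promise<void> {
  const checkpoint: Checkpoint = {
    last_sync_id: lastRowId,
    last_sync_time: new Date().toISOString(),
  };
  await env.SYNC_KV.put("sync_checkpoint", JSON.stringify(checkpoint));
}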

Real-time Streaming ETL

ensemble: streaming-etl

agents:
  # Receive event
  - name: validate-event
    agent: validator
    inputs:
      data: ${input.event}
      schema: ${component.event-schema@v1.0.0}

  # Enrich with context
  - name: enrich
    operation: storage
    config:
      type: kv
      action: get
      key: user-${input.event.user_id}

  # Transform
  - name: transform
    agent: transformer
    inputs:
      data:
        event: ${validate-event.output.data}
        user: ${enrich.output}
      template:
        event_id: ${data.event.id}
        event_type: ${data.event.type}
        user_id: ${data.event.user_id}
        user_segment: ${data.user.segment}
        properties: ${data.event.properties}
        timestamp: ${Date.now()}

  # Write to multiple destinations (parallel)
  - name: write-analytics-db
    operation: storage
    config:
      type: d1
      query: INSERT INTO events ...
      params: ${transform.output}

  - name: write-data-lake
    operation: storage
    config:
      type: r2
      action: put
      key: events/${new Date().toISOString().split('T')[0]}/${transform.output.event_id}.json
      value: ${JSON.stringify(transform.output)}

  - name: send-to-warehouse
    operation: http
    config:
      url: https://warehouse.example.com/api/ingest
      body: ${transform.output}
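
The write-data-lake step partitions objects by day via the date prefix in the R2 key, so downstream jobs can scan a single day cheaply. The same key scheme written directly against R2, with the bucket binding name assumed:

// Sketch: day-partitioned data-lake write against an R2 bucket binding.
async function writeEventToLake(
  env: { EVENTS_BUCKET: R2Bucket },
  event: { event_id: string } & Record<string, unknown>
): Promise<void> {
  const day = new Date().toISOString().split('T')[0];      // e.g. "2024-05-17"
  const key = `events/${day}/${event.event_id}.json`;      // one prefix per day
  await env.EVENTS_BUCKET.put(key, JSON.stringify(event)); // R2 put accepts a string body
}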

Best Practices

1. Batch Processing

# Process in chunks (a combined batching + checkpointing sketch follows this list)
agents:
  - name: extract
    operation: storage
    config:
      query: SELECT * FROM source LIMIT 1000 OFFSET ${input.offset}

2. Error Handling

# Always handle failures
agents:
  - name: transform
    agent: transformer

  - name: log-error
    condition: ${transform.failed}
    operation: storage
    config:
      type: d1
      query: INSERT INTO etl_errors ...

3. Idempotency

# Use INSERT OR REPLACE
config:
  query: INSERT OR REPLACE INTO target ...

4. Checkpointing

# Track progress
state:
  schema:
    last_processed_id: number

5. Monitoring

# Log metrics
agents:
  - name: log-metrics
    operation: storage
    config:
      query: |
        INSERT INTO etl_metrics (
          pipeline, records_processed, duration, timestamp
        ) VALUES (?, ?, ?, ?)
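
Batching (1) and checkpointing (4) usually go together: process a chunk, record how far you got, repeat until the source is drained. A combined sketch, where fetchChunk, processChunk, and saveCheckpoint are hypothetical stand-ins for the extract, transform/validate/load, and checkpoint steps:

// Combined batching + checkpointing loop (all callbacks are stand-ins).
interface Row { id: number }

async function syncInBatches(
  fetchChunk: (afterId: number, limit: number) => Promise<Row[]>,   // e.g. WHERE id > ? ORDER BY id LIMIT ?
  processChunk: (rows: Row[]) => Promise<void>,                     // transform, validate, load
  saveCheckpoint: (lastId: number) => Promise<void>,                // e.g. KV put
  startAfterId: number,
  batchSize = 1000
): Promise<void> {
  let lastId = startAfterId;
  while (true) {
    const rows = await fetchChunk(lastId, batchSize);
    if (rows.length === 0) break;              // nothing left: stop
    await processChunk(rows);
    lastId = rows[rows.length - 1].id;         // advance only after a successful load,
    await saveCheckpoint(lastId);              // so a re-run picks up where it left off
  }
}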

Next Steps