Data Processing ETL Playbook
Move data between systems. Transform, validate, and load at scale.

Basic ETL Pattern
ensemble: etl-pipeline
agents:
  # Extract
  - name: extract
    operation: http
    config:
      url: https://source-api.example.com/data
  # Transform
  - name: transform
    agent: transformer
    inputs:
      data: ${extract.output.body}
      template: ${component.transform-template@v1.0.0}
  # Validate
  - name: validate
    agent: validator
    inputs:
      data: ${transform.output}
      schema: ${component.target-schema@v1.0.0}
  # Load
  - name: load
    condition: ${validate.output.valid}
    operation: storage
    config:
      type: d1
      query: INSERT INTO target_table ...
      params: ${validate.output.data}
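The load step only runs when ${validate.output.valid} is true and pulls its rows from ${validate.output.data}, so this pattern assumes the validator returns a result shaped roughly like the sketch below. The field names come from the expressions used throughout this playbook (valid, data, errors); the values are illustrative only.

// Illustrative shape of the validator result consumed by the steps above.
const validationResult = {
  valid: false,        // gates the load step's condition
  data: [],            // records that passed validation, fed to load params
  errors: [            // consumed by error-logging steps in later examples
    { row: 2, message: 'email does not match pattern' }
  ]
};

// A load step guarded by `condition: ${validate.output.valid}` is skipped here.
console.log(validationResult.valid); // false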
Database to Database
ensemble: db-to-db-etl
agents:
  # Extract from source
  - name: extract
    operation: storage
    config:
      type: d1
      query: |
        SELECT *
        FROM source_table
        WHERE updated_at > ?
        ORDER BY id
        LIMIT 1000
      params: [${input.last_sync_time}]
  # Transform each record
  - name: transform
    agent: transformer
    inputs:
      data: ${extract.output}
      template:
        - id: ${item.source_id}
          name: ${uppercase(item.name)}
          email: ${lowercase(item.email)}
          created: ${formatDate(item.created_at)}
          metadata:
            source: legacy_db
            migrated_at: ${Date.now()}
      functions:
        uppercase: (str) => str.toUpperCase()
        lowercase: (str) => str.toLowerCase()
        formatDate: (date) => new Date(date).toISOString()
  # Validate transformed data
  - name: validate
    agent: validator
    inputs:
      data: ${transform.output}
      schema:
        id: string
        name: string
        email:
          type: string
          pattern: ^[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}$
        created: string
  # Handle validation errors
  - name: log-errors
    condition: ${!validate.output.valid}
    operation: storage
    config:
      type: d1
      query: INSERT INTO etl_errors (batch_id, errors, timestamp) VALUES (?, ?, ?)
      params:
        - ${input.batch_id}
        - ${JSON.stringify(validate.output.errors)}
        - ${Date.now()}
  # Load valid records
  - name: load
    condition: ${validate.output.valid}
    operation: storage
    config:
      type: d1
      query: |
        INSERT OR REPLACE INTO target_table (id, name, email, created, metadata)
        VALUES (?, ?, ?, ?, ?)
      params: ${validate.output.data.map(r => [
        r.id, r.name, r.email, r.created, JSON.stringify(r.metadata)
      ])}
  # Update sync timestamp
  - name: update-sync
    condition: ${load.success}
    operation: storage
    config:
      type: kv
      action: put
      key: last_sync_time
      value: ${Date.now()}
output:
  records_processed: ${extract.output.length}
  records_loaded: ${load.success ? validate.output.data.length : 0}
  errors: ${validate.output.valid ? [] : validate.output.errors}
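To see what the transform template produces per record, here is the same mapping written as plain JavaScript, using the helper functions from the functions block above. The sample row values are invented; the field names mirror the template.

// Helpers copied from the transform step's `functions` block.
const uppercase = (str) => str.toUpperCase();
const lowercase = (str) => str.toLowerCase();
const formatDate = (date) => new Date(date).toISOString();

// One source row, with sample values, shaped like the extract query's result.
const item = {
  source_id: 42,
  name: 'ada lovelace',
  email: 'Ada@Example.COM',
  created_at: '2024-01-15T09:30:00Z'
};

// Equivalent of the template applied to that row.
const transformed = {
  id: item.source_id,
  name: uppercase(item.name),           // 'ADA LOVELACE'
  email: lowercase(item.email),         // 'ada@example.com'
  created: formatDate(item.created_at), // '2024-01-15T09:30:00.000Z'
  metadata: { source: 'legacy_db', migrated_at: Date.now() }
};

// The load step then flattens each record into positional params:
const params = [
  transformed.id, transformed.name, transformed.email,
  transformed.created, JSON.stringify(transformed.metadata)
];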
API to Data Warehouse
ensemble: api-to-warehouse
agents:
  # Extract from multiple API endpoints (parallel)
  - name: fetch-users
    agent: fetcher
    inputs:
      url: https://api.example.com/users
      cacheTTL: 300
  - name: fetch-orders
    agent: fetcher
    inputs:
      url: https://api.example.com/orders
      cacheTTL: 300
  - name: fetch-products
    agent: fetcher
    inputs:
      url: https://api.example.com/products
      cacheTTL: 300
  # Transform users
  - name: transform-users
    agent: transformer
    inputs:
      data: ${fetch-users.output.body}
      template:
        - user_id: ${item.id}
          full_name: ${item.firstName} ${item.lastName}
          email: ${item.email}
          registration_date: ${item.createdAt}
          lifetime_value: ${item.orders.reduce((sum, o) => sum + o.total, 0)}
  # Transform orders
  - name: transform-orders
    agent: transformer
    inputs:
      data: ${fetch-orders.output.body}
      template:
        - order_id: ${item.id}
          user_id: ${item.userId}
          total: ${item.total}
          items_count: ${item.items.length}
          order_date: ${item.createdAt}
          status: ${item.status}
  # Transform products
  - name: transform-products
    agent: transformer
    inputs:
      data: ${fetch-products.output.body}
      template:
        - product_id: ${item.id}
          name: ${item.name}
          category: ${item.category}
          price: ${item.price}
          stock: ${item.stock}
  # Load to data warehouse (parallel)
  - name: load-users
    operation: http
    config:
      url: https://warehouse.example.com/api/load
      method: POST
      body:
        table: users
        data: ${transform-users.output}
  - name: load-orders
    operation: http
    config:
      url: https://warehouse.example.com/api/load
      method: POST
      body:
        table: orders
        data: ${transform-orders.output}
  - name: load-products
    operation: http
    config:
      url: https://warehouse.example.com/api/load
      method: POST
      body:
        table: products
        data: ${transform-products.output}
  # Log pipeline execution
  - name: log
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO etl_log (
          pipeline, users_loaded, orders_loaded, products_loaded, timestamp
        ) VALUES (?, ?, ?, ?, ?)
      params:
        - api-to-warehouse
        - ${transform-users.output.length}
        - ${transform-orders.output.length}
        - ${transform-products.output.length}
        - ${Date.now()}
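The lifetime_value field in the users template is an ordinary reduce over each user's orders; everything else is a straight field rename. A standalone sketch with sample data:

// Sample user shaped like the API response the users template assumes.
const user = {
  id: 'u_1',
  firstName: 'Grace',
  lastName: 'Hopper',
  orders: [{ total: 19.5 }, { total: 35.0 }, { total: 5.5 }]
};

// Same expression as the template: sum of order totals.
const lifetimeValue = user.orders.reduce((sum, o) => sum + o.total, 0);
console.log(lifetimeValue); // 60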
CSV Import with Validation
ensemble: csv-import
agents:
  # Fetch CSV from R2
  - name: fetch-csv
    operation: storage
    config:
      type: r2
      action: get
      key: imports/${input.file_name}
  # Parse CSV
  - name: parse
    operation: code
    config:
      code: |
        const csv = ${fetch-csv.output};
        const lines = csv.split('\n');
        const headers = lines[0].split(',');
        const records = lines.slice(1).map(line => {
          const values = line.split(',');
          return Object.fromEntries(
            headers.map((h, i) => [h, values[i]])
          );
        });
        return { records };
  # Validate each record
  - name: validate
    agent: validator
    inputs:
      data: ${parse.output.records}
      schema:
        name:
          type: string
          required: true
        email:
          type: string
          pattern: ^[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}$
          required: true
        age:
          type: number
          min: 0
          max: 120
  # Split valid and invalid
  - name: split-records
    operation: code
    config:
      code: |
        const validation = ${validate.output};
        if (validation.valid) {
          return {
            valid: validation.data,
            invalid: []
          };
        }
        // Split records by validation errors
        const valid = [];
        const invalid = [];
        ${parse.output.records}.forEach((record, idx) => {
          const error = validation.errors.find(e => e.row === idx);
          if (error) {
            invalid.push({ ...record, error: error.message });
          } else {
            valid.push(record);
          }
        });
        return { valid, invalid };
  # Load valid records
  - name: load-valid
    condition: ${split-records.output.valid.length > 0}
    operation: storage
    config:
      type: d1
      query: INSERT INTO users (name, email, age) VALUES (?, ?, ?)
      params: ${split-records.output.valid.map(r => [r.name, r.email, r.age])}
  # Store invalid records
  - name: store-invalid
    condition: ${split-records.output.invalid.length > 0}
    operation: storage
    config:
      type: r2
      action: put
      key: imports/invalid/${input.file_name}
      value: ${JSON.stringify(split-records.output.invalid)}
  # Notify completion
  - name: notify
    operation: email
    config:
      to: ${input.uploader_email}
      subject: "CSV Import Complete - ${input.file_name}"
      body: |
        Import Results:
        - Valid records: ${split-records.output.valid.length}
        - Invalid records: ${split-records.output.invalid.length}
        - Total processed: ${parse.output.records.length}
        ${split-records.output.invalid.length > 0 ?
          'Invalid records saved to: imports/invalid/' + input.file_name : ''}
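The parse step's split-on-comma approach is fine for trusted, machine-generated CSV, but it will mis-parse quoted fields, embedded commas, and \r\n line endings. If your input can contain those, swap in a dedicated CSV library or something slightly more defensive. A minimal sketch (not a full RFC 4180 parser):

// Minimal CSV parser: handles \r\n, blank lines, and simple double-quoted
// fields with "" escapes. Still not a substitute for a real CSV library.
function parseCsv(text) {
  const lines = text.split(/\r?\n/).filter((line) => line.trim() !== '');
  const parseLine = (line) => {
    const values = [];
    let current = '';
    let inQuotes = false;
    for (let i = 0; i < line.length; i++) {
      const ch = line[i];
      if (inQuotes) {
        if (ch === '"' && line[i + 1] === '"') { current += '"'; i++; }
        else if (ch === '"') { inQuotes = false; }
        else { current += ch; }
      } else if (ch === '"') {
        inQuotes = true;
      } else if (ch === ',') {
        values.push(current);
        current = '';
      } else {
        current += ch;
      }
    }
    values.push(current);
    return values;
  };
  const headers = parseLine(lines[0]).map((h) => h.trim());
  return lines.slice(1).map((line) => {
    const values = parseLine(line);
    return Object.fromEntries(headers.map((h, i) => [h, values[i]]));
  });
}

// A quoted field with an embedded comma survives parsing:
console.log(parseCsv('name,email\n"Doe, Jane",jane@example.com'));
// [ { name: 'Doe, Jane', email: 'jane@example.com' } ]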
Incremental Sync
ensemble: incremental-sync
state:
  schema:
    last_sync_id: number
    last_sync_time: string
agents:
  # Get last sync point
  - name: get-checkpoint
    operation: storage
    state:
      use: [last_sync_id, last_sync_time]
    config:
      type: kv
      action: get
      key: sync_checkpoint
  # Extract new/updated records
  - name: extract
    operation: storage
    config:
      type: d1
      query: |
        SELECT *
        FROM source_table
        WHERE id > ? OR updated_at > ?
        ORDER BY id
        LIMIT 10000
      params:
        - ${state.last_sync_id || 0}
        - ${state.last_sync_time || '1970-01-01'}
  # No new data? Skip rest
  - name: check-empty
    condition: ${extract.output.length === 0}
    operation: code
    config:
      code: return { message: 'No new data' };
  # Transform
  - name: transform
    condition: ${extract.output.length > 0}
    agent: transformer
    inputs:
      data: ${extract.output}
      template: ${component.transform-template@v1.0.0}
  # Load
  - name: load
    condition: ${transform.executed}
    operation: storage
    config:
      type: d1
      query: INSERT OR REPLACE INTO target_table ...
      params: ${transform.output}
  # Update checkpoint
  - name: update-checkpoint
    condition: ${load.success}
    operation: storage
    state:
      set:
        last_sync_id: ${extract.output[extract.output.length - 1].id}
        last_sync_time: ${new Date().toISOString()}
    config:
      type: kv
      action: put
      key: sync_checkpoint
      value:
        last_sync_id: ${extract.output[extract.output.length - 1].id}
        last_sync_time: ${new Date().toISOString()}
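On the first run there is no checkpoint yet, which is why the extract params fall back to 0 and '1970-01-01'. The same read-with-defaults and write-back logic, written as plain JavaScript and independent of the KV binding:

// Checkpoint read with the same defaults the extract step uses.
function resolveCheckpoint(stored) {
  return {
    last_sync_id: (stored && stored.last_sync_id) || 0,
    last_sync_time: (stored && stored.last_sync_time) || '1970-01-01'
  };
}

// After a successful load, advance the checkpoint to the last row seen;
// the extract query is ordered by id, so the last element holds the max id.
function nextCheckpoint(extractedRows) {
  return {
    last_sync_id: extractedRows[extractedRows.length - 1].id,
    last_sync_time: new Date().toISOString()
  };
}

// First run: nothing stored yet.
console.log(resolveCheckpoint(null));
// { last_sync_id: 0, last_sync_time: '1970-01-01' }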
Real-time Streaming ETL
ensemble: streaming-etl
agents:
  # Receive event
  - name: validate-event
    agent: validator
    inputs:
      data: ${input.event}
      schema: ${component.event-schema@v1.0.0}
  # Enrich with context
  - name: enrich
    operation: storage
    config:
      type: kv
      action: get
      key: user-${input.event.user_id}
  # Transform
  - name: transform
    agent: transformer
    inputs:
      data:
        event: ${validate-event.output.data}
        user: ${enrich.output}
      template:
        event_id: ${data.event.id}
        event_type: ${data.event.type}
        user_id: ${data.event.user_id}
        user_segment: ${data.user.segment}
        properties: ${data.event.properties}
        timestamp: ${Date.now()}
  # Write to multiple destinations (parallel)
  - name: write-analytics-db
    operation: storage
    config:
      type: d1
      query: INSERT INTO events ...
      params: ${transform.output}
  - name: write-data-lake
    operation: storage
    config:
      type: r2
      action: put
      key: events/${new Date().toISOString().split('T')[0]}/${transform.output.event_id}.json
      value: ${JSON.stringify(transform.output)}
  - name: send-to-warehouse
    operation: http
    config:
      url: https://warehouse.example.com/api/ingest
      body: ${transform.output}
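The write-data-lake step partitions objects by calendar day through its key expression. A small sketch of the key it builds (the layout is just a naming convention; the storage operation does not enforce it):

// Build the same date-partitioned object key as the write-data-lake step.
function eventKey(eventId, when = new Date()) {
  const day = when.toISOString().split('T')[0]; // e.g. '2024-06-01'
  return `events/${day}/${eventId}.json`;
}

console.log(eventKey('evt_123', new Date('2024-06-01T12:00:00Z')));
// 'events/2024-06-01/evt_123.json'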
Best Practices
1. Batch Processing
# Process in chunks
agents:
  - name: extract
    operation: storage
    config:
      query: SELECT * FROM source LIMIT 1000 OFFSET ${input.offset}
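The ${input.offset} parameter implies some caller advances the offset between runs. A sketch of that driver loop in plain JavaScript, where runPipeline and its { rowCount } result are placeholders for however you actually trigger the ensemble, not part of the framework:

// Hypothetical driver: page through the source 1000 rows at a time until a
// run returns fewer rows than the page size.
const PAGE_SIZE = 1000;

async function runAllBatches(runPipeline) {
  let offset = 0;
  while (true) {
    // runPipeline is a stand-in for your trigger mechanism (cron, queue, API).
    const { rowCount } = await runPipeline({ offset });
    if (rowCount < PAGE_SIZE) break; // last page reached
    offset += PAGE_SIZE;
  }
}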
2. Error Handling
# Always handle failures
agents:
  - name: transform
    agent: transformer
  - name: log-error
    condition: ${transform.failed}
    operation: storage
    config:
      type: d1
      query: INSERT INTO etl_errors ...
3. Idempotent Writes
# Use INSERT OR REPLACE
config:
  query: INSERT OR REPLACE INTO target ...
4. Progress Tracking
# Track progress
state:
  schema:
    last_processed_id: number
5. Metrics Logging
# Log metrics
agents:
  - name: log-metrics
    operation: storage
    config:
      query: |
        INSERT INTO etl_metrics (
          pipeline, records_processed, duration, timestamp
        ) VALUES (?, ?, ?, ?)
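The etl_metrics insert expects a duration, so something has to capture a start time. A small bookkeeping sketch in plain JavaScript; how the start time reaches the final step (for example, as a pipeline input) is up to you:

// Capture start/end timestamps around a run and build the positional params
// the etl_metrics insert above expects.
const startedAt = Date.now();

// ... run the pipeline and count processed records ...
const recordsProcessed = 1000; // example value

const metricsParams = [
  'db-to-db-etl',          // pipeline name
  recordsProcessed,
  Date.now() - startedAt,  // duration in milliseconds
  Date.now()               // timestamp
];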

