Webhook Handler
Process incoming webhooks from external services:
# Webhook handler: verify the signature, validate the payload, route by event
# type, enqueue follow-up work for known events, and record the receipt.
ensemble: webhook-handler
description: Validate and process incoming webhooks

agents:
  # Validate webhook signature
  - name: validate-signature
    operation: code
    config:
      script: scripts/validate-webhook-signature
    input:
      signature: "${input.headers['x-webhook-signature']}"
      body: ${input.body}
      secret: ${env.WEBHOOK_SECRET}

  # Validate payload schema (only when the signature is valid)
  - name: validate-schema
    condition: ${validate-signature.output.valid}
    operation: validate
    input:
      data: ${input.body}
      # NOTE(review): this reference was mangled by email obfuscation during
      # extraction; restore the original versioned schema reference.
      schema: '${[email protected]}'

  # Process event based on type (only when the payload is valid)
  - name: route-event
    condition: ${validate-schema.output.valid}
    operation: code
    config:
      script: scripts/route-webhook-event
    input:
      eventType: ${input.body.type}

  # Handle different event types: each handler enqueues a follow-up ensemble
  - name: handle-order-created
    condition: ${route-event.output.eventType === 'order.created'}
    operation: storage
    config:
      type: queue
      action: send
      queue: TASK_QUEUE
      body:
        ensemble: process-order
        input: ${input.body.data}

  - name: handle-payment-success
    condition: ${route-event.output.eventType === 'payment.succeeded'}
    operation: storage
    config:
      type: queue
      action: send
      queue: TASK_QUEUE
      body:
        ensemble: fulfill-order
        input: ${input.body.data}

  - name: handle-user-signup
    condition: ${route-event.output.eventType === 'user.created'}
    operation: storage
    config:
      type: queue
      action: send
      queue: TASK_QUEUE
      body:
        ensemble: onboard-user
        input: ${input.body.data}

  # Log webhook receipt. This agent has no condition, so it presumably runs
  # even when validation failed and the agents above were skipped; their
  # outputs are therefore read with optional chaining.
  - name: log-webhook
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT INTO webhook_logs (
        event_type, valid, processed, timestamp
        ) VALUES (?, ?, ?, ?)
      params:
        - ${route-event.output?.eventType}
        - ${validate-schema.output?.valid}
        - ${route-event.output?.shouldProcess}
        - ${Date.now()}

# Skipped agents have no output, hence the optional chaining below.
output:
  success: ${validate-signature.output.valid && validate-schema.output?.valid}
  eventType: ${route-event.output?.eventType}
  processed: ${route-event.output?.shouldProcess}
Copy
// scripts/validate-webhook-signature.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
import * as crypto from 'crypto'
/**
 * Verify an incoming webhook's HMAC-SHA256 signature.
 *
 * Reads `signature`, `body` and `secret` from `context.input`, recomputes the
 * hex digest over `JSON.stringify(body)`, and compares it to the supplied
 * signature in constant time.
 *
 * NOTE(review): the digest is computed over the re-serialized body, so it only
 * matches senders that sign exactly what `JSON.stringify` produces. Confirm
 * against the provider, or sign/verify the raw request body instead.
 *
 * @param context - Agent execution context; `context.input` carries the fields above.
 * @returns `{ valid: true }` only when the signature equals the expected digest.
 */
export default function validateWebhookSignature(context: AgentExecutionContext) {
  const { signature, body, secret } = context.input

  const payload = JSON.stringify(body)
  const expected = crypto
    .createHmac('sha256', secret)
    .update(payload)
    .digest('hex')

  // A missing or non-string signature can never match.
  if (typeof signature !== 'string') {
    return { valid: false }
  }

  // Compare in constant time so response timing does not reveal how many
  // leading characters matched. timingSafeEqual throws on unequal lengths,
  // so the length is checked first.
  const encoder = new TextEncoder()
  const received = encoder.encode(signature)
  const wanted = encoder.encode(expected)
  const valid =
    received.length === wanted.length && crypto.timingSafeEqual(received, wanted)

  return { valid }
}
Copy
// scripts/route-webhook-event.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
/**
 * Echo the webhook's event type for downstream agents.
 *
 * @param context - Agent execution context; reads `context.input.eventType`.
 * @returns The event type unchanged, plus `shouldProcess`, which is always `true`.
 */
export default function routeWebhookEvent(context: AgentExecutionContext) {
  const routed = {
    eventType: context.input.eventType,
    shouldProcess: true,
  }
  return routed
}
Scheduled Jobs
Execute ensembles on a schedule using cron triggers:

Daily Report Generation
# Daily report: query order metrics, have a model write the report, email it.
name: daily-report
description: Generate and send daily report

trigger:
  - type: cron
    # 08:00 every day. NOTE(review): the original comment said "8 AM UTC" while
    # `timezone` below is America/New_York; confirm which one the scheduler uses.
    cron: "0 8 * * *"
    timezone: "America/New_York"
    enabled: true

flow:
  - agent: get-metrics
  - agent: generate-report
  - agent: send-email

agents:
  # Query metrics for the trailing 24 hours (the filter is "now - 1 day",
  # not the previous calendar day).
  - name: get-metrics
    operation: data
    config:
      backend: d1
      binding: DB
      operation: query
      # COALESCE keeps revenue and average at 0 instead of NULL when no orders
      # match, so the prompt below never renders "$null".
      sql: |
        SELECT
        COUNT(*) as orders,
        COALESCE(SUM(total), 0) as revenue,
        COALESCE(AVG(total), 0) as avg_order_value
        FROM orders
        WHERE created_at >= strftime('%s', 'now', '-1 day') * 1000

  # Generate report with AI
  - name: generate-report
    operation: think
    config:
      provider: anthropic
      model: claude-sonnet-4
      prompt: |
        Generate a professional daily business report based on these metrics:
        Orders: ${get-metrics.output[0].orders}
        Revenue: $${get-metrics.output[0].revenue}
        Average Order Value: $${get-metrics.output[0].avg_order_value}
        Include trends, insights, and recommendations.

  # Send to team
  - name: send-email
    operation: email
    config:
      # NOTE(review): the recipient addresses were replaced by an
      # email-obfuscation placeholder during extraction; restore the real ones.
      # They are quoted so YAML does not read them as nested flow sequences.
      to:
        - '[email protected]'
        - '[email protected]'
      subject: "Daily Report - ${new Date().toDateString()}"
      html: |
        <h1>Daily Business Report</h1>
        ${generate-report.output}

outputs:
  sent: ${send-email.success}
  metrics: ${get-metrics.output[0]}
Multiple Schedules
Run different tasks on different schedules:
# Maintenance ensemble: each cron trigger passes its own `task` value as input,
# and a single code agent turns that value into an action.
name: maintenance-tasks

trigger:
  # Inventory check, every 4 hours
  - type: cron
    cron: '0 */4 * * *'
    timezone: 'UTC'
    input:
      task: 'inventory-check'

  # Weekly summary, Sundays at midnight
  - type: cron
    cron: '0 0 * * 0'
    timezone: 'America/New_York'
    input:
      task: 'weekly-summary'

flow:
  - agent: route-task

agents:
  # Maps the trigger's task name to an action (see scripts/route-maintenance-task)
  - name: route-task
    operation: code
    config:
      script: scripts/route-maintenance-task
    input:
      task: ${input.task}

outputs:
  completed: ${route-task.output.action}
Copy
// scripts/route-maintenance-task.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
/**
 * Translate a scheduled task name into the action to perform.
 *
 * @param context - Agent execution context; reads `context.input.task`.
 * @returns `{ action }`: `'check_inventory'` for `'inventory-check'`,
 *   `'generate_summary'` for `'weekly-summary'`, otherwise `'unknown'`.
 */
export default function routeMaintenanceTask(context: AgentExecutionContext) {
  const requested = context.input.task
  switch (requested) {
    case 'inventory-check':
      // Check inventory levels
      return { action: 'check_inventory' }
    case 'weekly-summary':
      // Generate weekly summary
      return { action: 'generate_summary' }
    default:
      return { action: 'unknown' }
  }
}
wrangler.toml:
# Cron schedules registered for the Worker; one entry per ensemble cron trigger.
[triggers]
crons = [
  # Daily at 8 AM
  "0 8 * * *",
  # Every 4 hours
  "0 */4 * * *",
  # Weekly on Sunday
  "0 0 * * 0"
]
The cron expressions in wrangler.toml must match the ones in your ensemble trigger configurations.
Database Triggers
React to database changes:

On User Signup
# Post-signup workflow: welcome email, profile row, marketing list, analytics.
ensemble: on-user-signup
description: Triggered when a new user signs up

agents:
  # Send welcome email
  - name: send-welcome-email
    operation: email
    config:
      to: ${input.user.email}
      subject: 'Welcome to Our Platform!'
      template: welcome-email
      data:
        name: ${input.user.name}
        verificationLink: https://app.example.com/verify/${input.user.id}

  # Create user profile (starts in the 'pending' onboarding state)
  - name: create-profile
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT INTO profiles (
        user_id, onboarding_status, created_at
        ) VALUES (?, 'pending', ?)
      params:
        - ${input.user.id}
        - ${Date.now()}

  # Add to email marketing list
  # NOTE(review): Mailchimp API hosts normally carry a data-center prefix
  # (<dc>.api.mailchimp.com); confirm this URL against your account.
  - name: add-to-marketing
    operation: http
    config:
      url: https://api.mailchimp.com/3.0/lists/${env.MAILCHIMP_LIST_ID}/members
      method: POST
      headers:
        Authorization: Bearer ${env.MAILCHIMP_API_KEY}
      body:
        email_address: ${input.user.email}
        status: subscribed
        merge_fields:
          FNAME: ${input.user.name}

  # Track signup event
  # NOTE(review): HTTP Basic auth expects a base64-encoded credential; verify
  # that SEGMENT_WRITE_KEY is stored in the encoded form this header needs.
  - name: track-signup
    operation: http
    config:
      url: https://api.segment.com/v1/track
      method: POST
      headers:
        Authorization: Basic ${env.SEGMENT_WRITE_KEY}
      body:
        userId: ${input.user.id}
        event: User Signed Up
        properties:
          source: ${input.source}
          timestamp: ${Date.now()}

output:
  emailSent: ${send-welcome-email.success}
  profileCreated: ${create-profile.success}
Copy
export default {
  /**
   * Worker entry point for the signup example.
   *
   * `POST /signup` creates the user via `createUser`, runs the
   * `on-user-signup` ensemble with `{ user, source: 'web' }`, and responds
   * with `{ success: true, userId }`. Any other method or path gets a 404,
   * and an unparseable JSON body gets a 400.
   *
   * @param request - Incoming HTTP request.
   * @param env - Worker bindings, passed to `createUser` and `Conductor`.
   * @returns Always a `Response`.
   */
  async fetch(request: Request, env: Env): Promise<Response> {
    const { pathname } = new URL(request.url);
    if (request.method !== 'POST' || pathname !== '/signup') {
      // Without this branch the handler resolved to undefined, which is not
      // a valid Response.
      return new Response('Not Found', { status: 404 });
    }

    let email, name;
    try {
      ({ email, name } = await request.json());
    } catch {
      // Malformed JSON (or a JSON null body) is the caller's error.
      return Response.json(
        { success: false, error: 'Invalid JSON body' },
        { status: 400 }
      );
    }

    // Create user in database
    const user = await createUser(env, { email, name });

    // Trigger on-user-signup ensemble
    const conductor = new Conductor({ env });
    await conductor.execute('on-user-signup', {
      user,
      source: 'web'
    });

    return Response.json({ success: true, userId: user.id });
  }
};
E-Commerce Order Processing
Complete event-driven order workflow with error handling:
# Order pipeline: validate -> check stock -> reserve -> charge -> record ->
# fulfil -> notify. Each step is gated on the one before it.
ensemble: process-order
description: Complete order processing with validation, payment, and fulfillment

state:
  schema:
    order: object
    inventoryReserved: boolean
    paymentProcessed: boolean
    fulfillmentCreated: boolean

agents:
  # Validate order
  - name: validate-order
    operation: code
    config:
      script: scripts/validate-order
    input:
      items: ${input.items}
      customerId: ${input.customerId}
      shippingAddress: ${input.shippingAddress}
    state:
      set: [order]

  # Check inventory availability in parallel.
  # Gated on the validation result so an invalid order stops here; previously
  # the validate-order output was never consulted.
  - parallel:
      - name: check-inventory
        condition: ${validate-order.output.valid}
        operation: data
        loop:
          items: ${input.items}
        config:
          backend: d1
          binding: DB
          operation: query
          sql: |
            SELECT quantity FROM inventory
            WHERE product_id = ? AND quantity >= ?
          params:
            - ${loop.item.productId}
            - ${loop.item.quantity}

  # Reserve inventory (only if every item had a matching stock row).
  # `?.` keeps the condition falsy instead of erroring when check-inventory
  # was skipped.
  # NOTE(review): the check above and this update are separate statements, so
  # concurrent orders can both pass the check; consider guarding the UPDATE.
  - name: reserve-inventory
    condition: ${check-inventory.outputs?.every(r => r.length > 0)}
    operation: data
    loop:
      items: ${input.items}
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        UPDATE inventory
        SET quantity = quantity - ?,
        reserved = reserved + ?
        WHERE product_id = ?
      params:
        - ${loop.item.quantity}
        - ${loop.item.quantity}
        - ${loop.item.productId}
    state:
      set: [inventoryReserved]

  # Process payment
  - name: process-payment
    condition: ${state.inventoryReserved}
    operation: http
    config:
      url: https://api.stripe.com/v1/payment_intents
      method: POST
      headers:
        Authorization: Bearer ${env.STRIPE_SECRET_KEY}
      body:
        amount: ${input.total}
        currency: usd
        customer: ${input.customerId}
        payment_method: ${input.paymentMethodId}
        confirm: true
    state:
      set: [paymentProcessed]

  # Handle payment failure - release inventory (reverses reserve-inventory)
  - name: release-inventory-on-failure
    condition: ${!process-payment.success && state.inventoryReserved}
    operation: data
    loop:
      items: ${input.items}
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        UPDATE inventory
        SET quantity = quantity + ?,
        reserved = reserved - ?
        WHERE product_id = ?
      params:
        - ${loop.item.quantity}
        - ${loop.item.quantity}
        - ${loop.item.productId}

  # Create order record. Uses `query` rather than `execute` because the
  # generated id is read back through RETURNING.
  # NOTE(review): nothing refunds the payment or releases the reservation if
  # this insert fails after a successful charge.
  - name: create-order
    condition: ${state.paymentProcessed}
    operation: data
    config:
      backend: d1
      binding: DB
      operation: query
      sql: |
        INSERT INTO orders (
        customer_id, total, status, payment_intent_id, created_at
        ) VALUES (?, ?, 'processing', ?, ?)
        RETURNING id
      params:
        - ${input.customerId}
        - ${input.total}
        - ${process-payment.output.id}
        - ${Date.now()}

  # Create order items
  - name: create-order-items
    condition: ${create-order.success}
    operation: data
    loop:
      items: ${input.items}
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT INTO order_items (
        order_id, product_id, quantity, price
        ) VALUES (?, ?, ?, ?)
      params:
        - ${create-order.output[0].id}
        - ${loop.item.productId}
        - ${loop.item.quantity}
        - ${loop.item.price}

  # Finalize inventory - remove from reserved
  - name: finalize-inventory
    condition: ${create-order.success}
    operation: data
    loop:
      items: ${input.items}
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        UPDATE inventory
        SET reserved = reserved - ?
        WHERE product_id = ?
      params:
        - ${loop.item.quantity}
        - ${loop.item.productId}

  # Create fulfillment request
  - name: create-fulfillment
    condition: ${create-order.success}
    operation: http
    config:
      url: ${env.FULFILLMENT_API}/orders
      method: POST
      headers:
        Authorization: Bearer ${env.FULFILLMENT_KEY}
      body:
        orderId: ${create-order.output[0].id}
        items: ${input.items}
        shippingAddress: ${input.shippingAddress}
        priority: standard
    state:
      set: [fulfillmentCreated]

  # Send notifications in parallel
  - parallel:
      - name: send-confirmation-email
        condition: ${create-order.success}
        operation: email
        config:
          to: ${input.customerEmail}
          subject: Order Confirmation ${create-order.output[0].id}
          template: order-confirmation
          data:
            orderNumber: ${create-order.output[0].id}
            items: ${input.items}
            total: ${input.total}

      - name: send-sms
        condition: ${create-order.success}
        operation: sms
        config:
          to: ${input.customerPhone}
          message: Your order ${create-order.output[0].id} is confirmed!

      - name: track-analytics
        condition: ${create-order.success}
        operation: http
        config:
          url: ${env.ANALYTICS_URL}/events
          method: POST
          body:
            event: order_placed
            userId: ${input.customerId}
            properties:
              orderId: ${create-order.output[0].id}
              total: ${input.total}
              itemCount: ${input.items.length}

# `output?.[0]?.id` also covers the case where create-order was skipped and
# has no output at all (the previous `output[0]?.id` would fail there).
output:
  success: ${create-order.success}
  orderId: ${create-order.output?.[0]?.id}
  paymentIntentId: ${process-payment.output?.id}
  fulfillmentId: ${create-fulfillment.output?.id}
  error: "${!create-order.success ? 'Order processing failed' : null}"
Inventory Monitoring
Scheduled check for low stock with alerts:
# Low-stock monitor: find items at or below their reorder level and, if there
# are any, alert Slack and the purchasing team.
ensemble: check-inventory
description: Monitor inventory and alert on low stock

agents:
  # Check low stock items
  - name: check-low-stock
    operation: data
    config:
      backend: d1
      binding: DB
      operation: query
      sql: |
        SELECT product_id, product_name, quantity, reorder_level
        FROM inventory
        WHERE quantity <= reorder_level

  # Only continue if there are low stock items; later agents key off
  # `has-low-stock.executed`.
  - name: has-low-stock
    condition: ${check-low-stock.output.length > 0}
    operation: code
    config:
      script: scripts/return-item-count
    input:
      count: ${check-low-stock.output.length}

  # Get product details
  # NOTE(review): no later agent in this ensemble reads this agent's output.
  - name: enrich-products
    condition: ${has-low-stock.executed}
    operation: data
    loop:
      items: ${check-low-stock.output}
    config:
      backend: d1
      binding: DB
      operation: query
      sql: |
        SELECT * FROM products WHERE id = ?
      # Block sequence on purpose: `${...}` inside a flow sequence (`[ ... ]`)
      # is invalid because `{` and `}` are flow indicators there.
      params:
        - ${loop.item.product_id}

  # Send Slack alert
  - name: alert-slack
    condition: ${has-low-stock.executed}
    operation: http
    config:
      url: ${env.SLACK_WEBHOOK}
      method: POST
      body:
        text: Low Stock Alert
        blocks:
          - type: section
            text:
              type: mrkdwn
              text: |
                *Low Stock Items (${has-low-stock.output.count})*
                ${check-low-stock.output.map(item =>
                ` ${item.product_name}: ${item.quantity} units (reorder at ${item.reorder_level})`
                ).join('\n')}

  # Send email to purchasing team
  - name: alert-email
    condition: ${has-low-stock.executed}
    operation: email
    config:
      # NOTE(review): the recipient address was replaced by an
      # email-obfuscation placeholder during extraction; restore the real one.
      # It is quoted so YAML does not read it as a nested flow sequence.
      to:
        - '[email protected]'
      subject: Low Stock Alert - ${has-low-stock.output.count} Items
      html: |
        <h2>Low Stock Alert</h2>
        <table>
        <tr><th>Product</th><th>Current</th><th>Reorder Level</th></tr>
        ${check-low-stock.output.map(item => `
        <tr>
        <td>${item.product_name}</td>
        <td>${item.quantity}</td>
        <td>${item.reorder_level}</td>
        </tr>
        `).join('')}
        </table>

output:
  lowStockItems: ${check-low-stock.output}
  alertsSent: ${alert-slack.success && alert-email.success}
Order Cancellation Workflow
Handle order cancellations with refunds and inventory restoration:
# Cancellation workflow: load the order, confirm it can still be cancelled,
# refund, put stock back, mark the order cancelled, notify the customer.
ensemble: cancel-order
description: Cancel order with refund and inventory restoration

agents:
  # Get order details (orders that are already cancelled return no rows)
  - name: get-order
    operation: data
    config:
      backend: d1
      binding: DB
      operation: query
      sql: |
        SELECT * FROM orders
        WHERE id = ? AND status != 'cancelled'
      # Block sequence on purpose: `${...}` inside a flow sequence (`[ ... ]`)
      # is invalid because `{` and `}` are flow indicators there.
      params:
        - ${input.orderId}

  # Check if cancellable
  - name: check-cancellable
    condition: ${get-order.output.length > 0}
    operation: code
    config:
      script: scripts/check-order-cancellable
    input:
      orderStatus: ${get-order.output[0].status}

  # Refund payment
  - name: refund-payment
    condition: ${check-cancellable.output.canCancel}
    operation: http
    config:
      url: https://api.stripe.com/v1/refunds
      method: POST
      headers:
        Authorization: Bearer ${env.STRIPE_SECRET_KEY}
      body:
        payment_intent: ${get-order.output[0].payment_intent_id}

  # Get order items
  - name: get-order-items
    condition: ${refund-payment.success}
    operation: data
    config:
      backend: d1
      binding: DB
      operation: query
      sql: |
        SELECT product_id, quantity FROM order_items
        WHERE order_id = ?
      params:
        - ${input.orderId}

  # Restore inventory (one UPDATE per order item)
  - name: restore-inventory
    condition: ${get-order-items.success}
    operation: data
    loop:
      items: ${get-order-items.output}
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        UPDATE inventory
        SET quantity = quantity + ?
        WHERE product_id = ?
      params:
        - ${loop.item.quantity}
        - ${loop.item.product_id}

  # Update order status
  - name: update-order-status
    condition: ${refund-payment.success}
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        UPDATE orders
        SET status = 'cancelled', updated_at = ?
        WHERE id = ?
      params:
        - ${Date.now()}
        - ${input.orderId}

  # Notify customer
  # NOTE(review): this reads `customer_email` from the orders row; confirm the
  # orders table actually has that column.
  - name: send-cancellation-email
    condition: ${update-order-status.success}
    operation: email
    config:
      to: ${get-order.output[0].customer_email}
      subject: Order Cancellation Confirmed
      template: order-cancelled
      data:
        orderNumber: ${input.orderId}
        refundAmount: ${get-order.output[0].total}

output:
  success: ${update-order-status.success}
  refunded: ${refund-payment.success}
  inventoryRestored: ${restore-inventory.outputs?.every(r => r.success)}
Copy
// scripts/validate-order.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
/**
 * Check that an order has the minimum data needed for processing.
 *
 * An order is valid when `items` is a non-empty array, `customerId` is truthy,
 * and `shippingAddress` has truthy `zip` and `country` fields. Missing
 * `items` or `shippingAddress` yields `{ valid: false }` rather than a thrown
 * TypeError.
 *
 * @param context - Agent execution context; reads `items`, `customerId` and
 *   `shippingAddress` from `context.input`.
 * @returns `{ valid }` where `valid` is always a strict boolean.
 */
export default function validateOrder(context: AgentExecutionContext) {
  const { items, customerId, shippingAddress } = context.input

  const hasItems = Array.isArray(items) && items.length > 0
  const hasCustomer = Boolean(customerId)
  // Optional chaining tolerates a missing address object.
  const hasAddress = Boolean(shippingAddress?.zip && shippingAddress?.country)

  // Coerced to a boolean: the bare `&&` chain used before leaked the last
  // truthy operand (e.g. the country string) into `valid`.
  return { valid: hasItems && hasCustomer && hasAddress }
}
Copy
// scripts/return-item-count.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
/**
 * Pass the supplied item count straight through as this agent's output.
 *
 * @param context - Agent execution context; reads `context.input.count`.
 * @returns `{ count }` with the value unchanged.
 */
export default function returnItemCount(context: AgentExecutionContext) {
  const itemCount = context.input.count
  return { count: itemCount }
}
Copy
// scripts/check-order-cancellable.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
/**
 * Decide whether an order can still be cancelled from its status.
 *
 * @param context - Agent execution context; reads `context.input.orderStatus`.
 * @returns `{ canCancel: true, reason: null }` for `'pending'` or
 *   `'processing'`; for every other status, `canCancel` is `false` and
 *   `reason` is `'Order already shipped'`.
 */
export default function checkOrderCancellable(context: AgentExecutionContext) {
  const status = context.input.orderStatus
  if (status === 'pending' || status === 'processing') {
    return { canCancel: true, reason: null }
  }
  return { canCancel: false, reason: 'Order already shipped' }
}
Copy
// scripts/acknowledge-webhook.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
/**
 * Acknowledge a webhook immediately, without reading the request.
 *
 * @param context - Agent execution context (unused).
 * @returns Always `{ received: true }`.
 */
export default function acknowledgeWebhook(context: AgentExecutionContext) {
  const acknowledgement = { received: true }
  return acknowledgement
}
Real-Time User Activity Tracking
Track user actions and trigger personalized responses:
# Activity tracker: store each action, count the user's recent actions by
# type, and enqueue follow-up ensembles when a count crosses a threshold.
ensemble: track-user-activity
description: Track user actions and respond in real-time

agents:
  # Log activity (metadata is stored as a JSON string)
  - name: log-activity
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT INTO user_activities (
        user_id, action, metadata, timestamp
        ) VALUES (?, ?, ?, ?)
      params:
        - ${input.userId}
        - ${input.action}
        - ${JSON.stringify(input.metadata)}
        - ${Date.now()}

  # Check for patterns: per-action counts for this user
  - name: check-patterns
    operation: data
    config:
      backend: d1
      binding: DB
      operation: query
      sql: |
        SELECT COUNT(*) as count, action
        FROM user_activities
        WHERE user_id = ? AND timestamp > ?
        GROUP BY action
      params:
        - ${input.userId}
        # Last 24 hours
        - ${Date.now() - 24 * 60 * 60 * 1000}

  # Trigger based on patterns
  # Fires on a cart_add once the user has 3+ cart_add rows in the window.
  - name: abandoned-cart-reminder
    condition: "${input.action === 'cart_add' && check-patterns.output.some(p => p.action === 'cart_add' && p.count >= 3)}"
    operation: storage
    config:
      type: queue
      action: send
      queue: TASK_QUEUE
      body:
        ensemble: send-cart-reminder
        input:
          userId: ${input.userId}
        # 1 hour delay
        delay: 3600000

  # Fires on a product_view once the user has 10+ product_view rows in the window.
  - name: engagement-reward
    condition: "${input.action === 'product_view' && check-patterns.output.some(p => p.action === 'product_view' && p.count >= 10)}"
    operation: storage
    config:
      type: queue
      action: send
      queue: TASK_QUEUE
      body:
        ensemble: send-engagement-reward
        input:
          userId: ${input.userId}

output:
  logged: ${log-activity.success}
  patterns: ${check-patterns.output}
Queue-Based Processing
Process tasks asynchronously using Cloudflare Queues:
// src/index.ts - Queue consumer
export default {
  /**
   * Queue consumer: runs the ensemble named in each message body.
   *
   * Each message body is expected to carry `{ ensemble, input, delay? }`.
   * A message is acknowledged when the ensemble reports success, and retried
   * when it reports failure or throws. If `delay` (milliseconds) is present
   * and has not yet elapsed since the message was sent, the message is
   * re-queued with the remaining delay instead of being executed.
   *
   * @param batch - Batch of queue messages delivered to this Worker.
   * @param env - Worker bindings, passed to `Conductor`.
   */
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    const conductor = new Conductor({ env });
    // Cloudflare Queues caps a retry delay at 12 hours.
    const MAX_DELAY_SECONDS = 43200;

    for (const message of batch.messages) {
      const { ensemble, input, delay } = message.body;

      // Honor the optional delay that producers put in the body; previously
      // it was read here but never used, so delayed work ran immediately.
      // NOTE(review): assumes `message.timestamp` is the original send time
      // and that delayed retries fit within the queue's max_retries; confirm.
      if (typeof delay === 'number' && delay > 0) {
        const remainingMs = message.timestamp.getTime() + delay - Date.now();
        if (remainingMs > 0) {
          const delaySeconds = Math.min(
            Math.ceil(remainingMs / 1000),
            MAX_DELAY_SECONDS
          );
          message.retry({ delaySeconds });
          continue;
        }
      }

      try {
        // Execute the specified ensemble
        const result = await conductor.execute(ensemble, input);
        if (result.success) {
          message.ack();
        } else {
          message.retry();
        }
      } catch (error) {
        console.error(`Queue processing error:`, error);
        message.retry();
      }
    }
  }
};
Best Practices
1. Use Queues for Async Work
# Don't block webhook responses: acknowledge first, then hand the heavy work
# to a queue.
agents:
  - name: acknowledge-webhook
    operation: code
    config:
      script: scripts/acknowledge-webhook

  # Queue heavy processing
  - name: queue-processing
    operation: storage
    config:
      type: queue
      action: send
      # Target queue added for consistency: every other queue send in these
      # examples names TASK_QUEUE.
      queue: TASK_QUEUE
      body:
        ensemble: process-webhook-data
        input: ${input.webhook}
2. Implement Idempotency
# Idempotent processing: look the event id up, process only unseen events,
# then record the id.
agents:
  # Check if already processed
  - name: check-processed
    operation: data
    config:
      backend: d1
      binding: DB
      operation: query
      sql: |
        SELECT COUNT(*) as count FROM processed_events
        WHERE event_id = ?
      # Block sequence on purpose: `${...}` inside a flow sequence (`[ ... ]`)
      # is invalid because `{` and `}` are flow indicators there.
      params:
        - ${input.eventId}

  # Only process if not seen before
  - name: process-event
    condition: ${check-processed.output[0].count === 0}
    operation: code
    config:
      # Placeholder path: point this at your own event-processing script.
      # (The original `code:` key held only a comment, i.e. a null value.)
      script: scripts/process-event

  # Mark as processed
  # NOTE(review): the lookup and this insert are separate statements, so two
  # concurrent deliveries of the same event can both pass the check.
  - name: mark-processed
    condition: ${process-event.success}
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT INTO processed_events (event_id, timestamp)
        VALUES (?, ?)
      params:
        - ${input.eventId}
        - ${Date.now()}
3. Add Retry Logic
# Retry an unreliable external call: up to 3 attempts with exponential backoff.
agents:
  - name: external-api-call
    operation: http
    config:
      url: 'https://api.example.com/endpoint'
    retry:
      max_attempts: 3
      backoff: exponential
      # Upper bound on the backoff delay (presumably milliseconds; confirm)
      max_delay: 30000
4. Monitor Event Processing
# Log all events
agents:
  - name: log-event
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      # NOTE(review): the statement has five placeholders but this example
      # defines no `params`; supply one value per `?` before using it.
      sql: |
        INSERT INTO event_logs (
        event_type, source, status, latency, timestamp
        ) VALUES (?, ?, ?, ?, ?)

