Event-Driven Workflow Playbook
Respond to events: webhooks, database changes, cron schedules, and user actions in real-time. Event-driven workflows react to triggers automatically, enabling asynchronous processing, real-time updates, and scheduled tasks without manual intervention.Webhook Handler
Process incoming webhooks from external services:Copy
ensemble: webhook-handler
description: Validate and process incoming webhooks
agents:
# Validate webhook signature
- name: validate-signature
operation: code
config:
code: |
const crypto = require('crypto');
const signature = ${input.headers['x-webhook-signature']};
const payload = JSON.stringify(${input.body});
const expected = crypto
.createHmac('sha256', ${env.WEBHOOK_SECRET})
.update(payload)
.digest('hex');
return { valid: signature === expected };
# Validate payload schema
- name: validate-schema
condition: ${validate-signature.output.valid}
agent: validator
input:
data: ${input.body}
schema: ${component.webhook-schema@v1.0.0}
# Process event based on type
- name: route-event
condition: ${validate-schema.output.valid}
operation: code
config:
code: |
const eventType = ${input.body.type};
return { eventType, shouldProcess: true };
# Handle different event types
- name: handle-order-created
condition: ${route-event.output.eventType === 'order.created'}
operation: storage
config:
type: queue
action: send
queue: TASK_QUEUE
body:
ensemble: process-order
input: ${input.body.data}
- name: handle-payment-success
condition: ${route-event.output.eventType === 'payment.succeeded'}
operation: storage
config:
type: queue
action: send
queue: TASK_QUEUE
body:
ensemble: fulfill-order
input: ${input.body.data}
- name: handle-user-signup
condition: ${route-event.output.eventType === 'user.created'}
operation: storage
config:
type: queue
action: send
queue: TASK_QUEUE
body:
ensemble: onboard-user
input: ${input.body.data}
# Log webhook receipt
- name: log-webhook
operation: storage
config:
type: d1
query: |
INSERT INTO webhook_logs (
event_type, valid, processed, timestamp
) VALUES (?, ?, ?, ?)
params:
- ${route-event.output.eventType}
- ${validate-schema.output.valid}
- ${route-event.output.shouldProcess}
- ${Date.now()}
output:
success: ${validate-signature.output.valid && validate-schema.output.valid}
eventType: ${route-event.output.eventType}
processed: ${route-event.output.shouldProcess}
Scheduled Jobs
Execute ensembles on a schedule using Cloudflare Cron Triggers:Daily Report Generation
Copy
ensemble: daily-report
description: Generate and send daily report
agents:
# Query yesterday's metrics
- name: get-metrics
operation: storage
config:
type: d1
query: |
SELECT
COUNT(*) as orders,
SUM(total) as revenue,
AVG(total) as avg_order_value
FROM orders
WHERE created_at >= strftime('%s', 'now', '-1 day') * 1000
# Generate report with AI
- name: generate-report
operation: think
config:
provider: anthropic
model: claude-3-5-sonnet-20241022
prompt: |
Generate a professional daily business report based on these metrics:
Orders: ${get-metrics.output[0].orders}
Revenue: $${get-metrics.output[0].revenue}
Average Order Value: $${get-metrics.output[0].avg_order_value}
Include trends, insights, and recommendations.
# Send to team
- name: send-email
operation: email
config:
to: [team@example.com, exec@example.com]
subject: "Daily Report - ${new Date().toDateString()}"
html: |
<h1>Daily Business Report</h1>
${generate-report.output}
output:
sent: ${send-email.success}
metrics: ${get-metrics.output[0]}
src/index.ts:
Copy
export default {
async scheduled(event: ScheduledEvent, env: Env) {
const conductor = new Conductor({ env });
switch (event.cron) {
case '0 8 * * *': // 8 AM daily
await conductor.execute('daily-report', {});
break;
case '0 */4 * * *': // Every 4 hours
await conductor.execute('check-inventory', {});
break;
case '0 0 * * 0': // Weekly on Sunday
await conductor.execute('weekly-summary', {});
break;
}
}
};
wrangler.toml:
Copy
[triggers]
crons = [
"0 8 * * *", # Daily at 8 AM
"0 */4 * * *", # Every 4 hours
"0 0 * * 0" # Weekly on Sunday
]
Database Triggers
React to database changes:On User Signup
Copy
ensemble: on-user-signup
description: Triggered when a new user signs up
agents:
# Send welcome email
- name: send-welcome-email
operation: email
config:
to: ${input.user.email}
subject: "Welcome to Our Platform!"
template: welcome-email
data:
name: ${input.user.name}
verificationLink: https://app.example.com/verify/${input.user.id}
# Create user profile
- name: create-profile
operation: storage
config:
type: d1
query: |
INSERT INTO profiles (
user_id, onboarding_status, created_at
) VALUES (?, 'pending', ?)
params:
- ${input.user.id}
- ${Date.now()}
# Add to email marketing list
- name: add-to-marketing
operation: http
config:
url: https://api.mailchimp.com/3.0/lists/${env.MAILCHIMP_LIST_ID}/members
method: POST
headers:
Authorization: Bearer ${env.MAILCHIMP_API_KEY}
body:
email_address: ${input.user.email}
status: subscribed
merge_fields:
FNAME: ${input.user.name}
# Track signup event
- name: track-signup
operation: http
config:
url: https://api.segment.com/v1/track
method: POST
headers:
Authorization: Basic ${env.SEGMENT_WRITE_KEY}
body:
userId: ${input.user.id}
event: User Signed Up
properties:
source: ${input.source}
timestamp: ${Date.now()}
output:
emailSent: ${send-welcome-email.success}
profileCreated: ${create-profile.success}
Copy
export default {
async fetch(request: Request, env: Env): Promise<Response> {
if (request.method === 'POST' && new URL(request.url).pathname === '/signup') {
const { email, name } = await request.json();
// Create user in database
const user = await createUser(env, { email, name });
// Trigger on-user-signup ensemble
const conductor = new Conductor({ env });
await conductor.execute('on-user-signup', {
user,
source: 'web'
});
return Response.json({ success: true, userId: user.id });
}
}
};
E-Commerce Order Processing
Complete event-driven order workflow with error handling:Copy
ensemble: process-order
description: Complete order processing with validation, payment, and fulfillment
state:
schema:
order: object
inventoryReserved: boolean
paymentProcessed: boolean
fulfillmentCreated: boolean
agents:
# Validate order
- name: validate-order
operation: code
config:
code: |
const { items, customerId, shippingAddress } = ${input};
const isValid = items.length > 0 &&
customerId &&
shippingAddress.zip &&
shippingAddress.country;
return { valid: isValid };
state:
set: [order]
# Check inventory availability in parallel
- parallel:
- name: check-inventory
operation: storage
loop:
items: ${input.items}
config:
type: d1
query: |
SELECT quantity FROM inventory
WHERE product_id = ? AND quantity >= ?
params:
- ${loop.item.productId}
- ${loop.item.quantity}
# Reserve inventory
- name: reserve-inventory
condition: ${check-inventory.outputs.every(r => r.length > 0)}
operation: storage
loop:
items: ${input.items}
config:
type: d1
query: |
UPDATE inventory
SET quantity = quantity - ?,
reserved = reserved + ?
WHERE product_id = ?
params:
- ${loop.item.quantity}
- ${loop.item.quantity}
- ${loop.item.productId}
state:
set: [inventoryReserved]
# Process payment
- name: process-payment
condition: ${state.inventoryReserved}
operation: http
config:
url: https://api.stripe.com/v1/payment_intents
method: POST
headers:
Authorization: Bearer ${env.STRIPE_SECRET_KEY}
body:
amount: ${input.total}
currency: usd
customer: ${input.customerId}
payment_method: ${input.paymentMethodId}
confirm: true
state:
set: [paymentProcessed]
# Handle payment failure - release inventory
- name: release-inventory-on-failure
condition: ${!process-payment.success && state.inventoryReserved}
operation: storage
loop:
items: ${input.items}
config:
type: d1
query: |
UPDATE inventory
SET quantity = quantity + ?,
reserved = reserved - ?
WHERE product_id = ?
params:
- ${loop.item.quantity}
- ${loop.item.quantity}
- ${loop.item.productId}
# Create order record
- name: create-order
condition: ${state.paymentProcessed}
operation: storage
config:
type: d1
query: |
INSERT INTO orders (
customer_id, total, status, payment_intent_id, created_at
) VALUES (?, ?, 'processing', ?, ?)
RETURNING id
params:
- ${input.customerId}
- ${input.total}
- ${process-payment.output.id}
- ${Date.now()}
# Create order items
- name: create-order-items
condition: ${create-order.success}
operation: storage
loop:
items: ${input.items}
config:
type: d1
query: |
INSERT INTO order_items (
order_id, product_id, quantity, price
) VALUES (?, ?, ?, ?)
params:
- ${create-order.output[0].id}
- ${loop.item.productId}
- ${loop.item.quantity}
- ${loop.item.price}
# Finalize inventory - remove from reserved
- name: finalize-inventory
condition: ${create-order.success}
operation: storage
loop:
items: ${input.items}
config:
type: d1
query: |
UPDATE inventory
SET reserved = reserved - ?
WHERE product_id = ?
params:
- ${loop.item.quantity}
- ${loop.item.productId}
# Create fulfillment request
- name: create-fulfillment
condition: ${create-order.success}
operation: http
config:
url: ${env.FULFILLMENT_API}/orders
method: POST
headers:
Authorization: Bearer ${env.FULFILLMENT_KEY}
body:
orderId: ${create-order.output[0].id}
items: ${input.items}
shippingAddress: ${input.shippingAddress}
priority: standard
state:
set: [fulfillmentCreated]
# Send notifications in parallel
- parallel:
- name: send-confirmation-email
condition: ${create-order.success}
operation: email
config:
to: ${input.customerEmail}
subject: Order Confirmation ${create-order.output[0].id}
template: order-confirmation
data:
orderNumber: ${create-order.output[0].id}
items: ${input.items}
total: ${input.total}
- name: send-sms
condition: ${create-order.success}
operation: sms
config:
to: ${input.customerPhone}
message: Your order ${create-order.output[0].id} is confirmed!
- name: track-analytics
condition: ${create-order.success}
operation: http
config:
url: ${env.ANALYTICS_URL}/events
method: POST
body:
event: order_placed
userId: ${input.customerId}
properties:
orderId: ${create-order.output[0].id}
total: ${input.total}
itemCount: ${input.items.length}
output:
success: ${create-order.success}
orderId: ${create-order.output[0]?.id}
paymentIntentId: ${process-payment.output?.id}
fulfillmentId: ${create-fulfillment.output?.id}
error: ${!create-order.success ? 'Order processing failed' : null}
Inventory Monitoring
Scheduled check for low stock with alerts:Copy
ensemble: check-inventory
description: Monitor inventory and alert on low stock
agents:
# Check low stock items
- name: check-low-stock
operation: storage
config:
type: d1
query: |
SELECT product_id, product_name, quantity, reorder_level
FROM inventory
WHERE quantity <= reorder_level
# Only continue if there are low stock items
- name: has-low-stock
condition: ${check-low-stock.output.length > 0}
operation: code
config:
code: |
return { count: ${check-low-stock.output.length} };
# Get product details
- name: enrich-products
condition: ${has-low-stock.executed}
operation: storage
loop:
items: ${check-low-stock.output}
config:
type: d1
query: |
SELECT * FROM products WHERE id = ?
params: [${loop.item.product_id}]
# Send Slack alert
- name: alert-slack
condition: ${has-low-stock.executed}
operation: http
config:
url: ${env.SLACK_WEBHOOK}
method: POST
body:
text: ⚠️ Low Stock Alert
blocks:
- type: section
text:
type: mrkdwn
text: |
*Low Stock Items (${has-low-stock.output.count})*
${check-low-stock.output.map(item =>
`• ${item.product_name}: ${item.quantity} units (reorder at ${item.reorder_level})`
).join('\n')}
# Send email to purchasing team
- name: alert-email
condition: ${has-low-stock.executed}
operation: email
config:
to: [purchasing@example.com]
subject: Low Stock Alert - ${has-low-stock.output.count} Items
html: |
<h2>Low Stock Alert</h2>
<table>
<tr><th>Product</th><th>Current</th><th>Reorder Level</th></tr>
${check-low-stock.output.map(item => `
<tr>
<td>${item.product_name}</td>
<td>${item.quantity}</td>
<td>${item.reorder_level}</td>
</tr>
`).join('')}
</table>
output:
lowStockItems: ${check-low-stock.output}
alertsSent: ${alert-slack.success && alert-email.success}
Order Cancellation Workflow
Handle order cancellations with refunds and inventory restoration:Copy
ensemble: cancel-order
description: Cancel order with refund and inventory restoration
agents:
# Get order details
- name: get-order
operation: storage
config:
type: d1
query: |
SELECT * FROM orders
WHERE id = ? AND status != 'cancelled'
params: [${input.orderId}]
# Check if cancellable
- name: check-cancellable
condition: ${get-order.output.length > 0}
operation: code
config:
code: |
const order = ${get-order.output[0]};
const canCancel = ['pending', 'processing'].includes(order.status);
return { canCancel, reason: canCancel ? null : 'Order already shipped' };
# Refund payment
- name: refund-payment
condition: ${check-cancellable.output.canCancel}
operation: http
config:
url: https://api.stripe.com/v1/refunds
method: POST
headers:
Authorization: Bearer ${env.STRIPE_SECRET_KEY}
body:
payment_intent: ${get-order.output[0].payment_intent_id}
# Get order items
- name: get-order-items
condition: ${refund-payment.success}
operation: storage
config:
type: d1
query: |
SELECT product_id, quantity FROM order_items
WHERE order_id = ?
params: [${input.orderId}]
# Restore inventory
- name: restore-inventory
condition: ${get-order-items.success}
operation: storage
loop:
items: ${get-order-items.output}
config:
type: d1
query: |
UPDATE inventory
SET quantity = quantity + ?
WHERE product_id = ?
params:
- ${loop.item.quantity}
- ${loop.item.product_id}
# Update order status
- name: update-order-status
condition: ${refund-payment.success}
operation: storage
config:
type: d1
query: |
UPDATE orders
SET status = 'cancelled', updated_at = ?
WHERE id = ?
params:
- ${Date.now()}
- ${input.orderId}
# Notify customer
- name: send-cancellation-email
condition: ${update-order-status.success}
operation: email
config:
to: ${get-order.output[0].customer_email}
subject: Order Cancellation Confirmed
template: order-cancelled
data:
orderNumber: ${input.orderId}
refundAmount: ${get-order.output[0].total}
output:
success: ${update-order-status.success}
refunded: ${refund-payment.success}
inventoryRestored: ${restore-inventory.outputs?.every(r => r.success)}
Real-Time User Activity Tracking
Track user actions and trigger personalized responses:Copy
ensemble: track-user-activity
description: Track user actions and respond in real-time
agents:
# Log activity
- name: log-activity
operation: storage
config:
type: d1
query: |
INSERT INTO user_activities (
user_id, action, metadata, timestamp
) VALUES (?, ?, ?, ?)
params:
- ${input.userId}
- ${input.action}
- ${JSON.stringify(input.metadata)}
- ${Date.now()}
# Check for patterns
- name: check-patterns
operation: storage
config:
type: d1
query: |
SELECT COUNT(*) as count, action
FROM user_activities
WHERE user_id = ? AND timestamp > ?
GROUP BY action
params:
- ${input.userId}
- ${Date.now() - 24 * 60 * 60 * 1000} # Last 24 hours
# Trigger based on patterns
- name: abandoned-cart-reminder
condition: ${input.action === 'cart_add' && check-patterns.output.some(p => p.action === 'cart_add' && p.count >= 3)}
operation: storage
config:
type: queue
action: send
queue: TASK_QUEUE
body:
ensemble: send-cart-reminder
input:
userId: ${input.userId}
delay: 3600000 # 1 hour delay
- name: engagement-reward
condition: ${input.action === 'product_view' && check-patterns.output.some(p => p.action === 'product_view' && p.count >= 10)}
operation: storage
config:
type: queue
action: send
queue: TASK_QUEUE
body:
ensemble: send-engagement-reward
input:
userId: ${input.userId}
output:
logged: ${log-activity.success}
patterns: ${check-patterns.output}
Queue-Based Processing
Process tasks asynchronously using Cloudflare Queues:Copy
// src/index.ts - Queue consumer
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
const conductor = new Conductor({ env });
for (const message of batch.messages) {
const { ensemble, input, delay } = message.body;
try {
// Execute the specified ensemble
const result = await conductor.execute(ensemble, input);
if (result.success) {
message.ack();
} else {
message.retry();
}
} catch (error) {
console.error(`Queue processing error:`, error);
message.retry();
}
}
}
};
Best Practices
1. Use Queues for Async Work
Copy
# Don't block webhook responses
agents:
- name: acknowledge-webhook
operation: code
config:
code: return { received: true };
# Queue heavy processing
- name: queue-processing
operation: storage
config:
type: queue
action: send
body:
ensemble: process-webhook-data
input: ${input.webhook}
2. Implement Idempotency
Copy
agents:
# Check if already processed
- name: check-processed
operation: storage
config:
type: d1
query: |
SELECT COUNT(*) as count FROM processed_events
WHERE event_id = ?
params: [${input.eventId}]
# Only process if not seen before
- name: process-event
condition: ${check-processed.output[0].count === 0}
operation: code
config:
code: # Process event
# Mark as processed
- name: mark-processed
condition: ${process-event.success}
operation: storage
config:
type: d1
query: |
INSERT INTO processed_events (event_id, timestamp)
VALUES (?, ?)
params:
- ${input.eventId}
- ${Date.now()}
3. Add Retry Logic
Copy
agents:
- name: external-api-call
operation: http
config:
url: https://api.example.com/endpoint
retry:
max_attempts: 3
backoff: exponential
max_delay: 30000
4. Monitor Event Processing
Copy
// Log all events
agents:
- name: log-event
operation: storage
config:
type: d1
query: |
INSERT INTO event_logs (
event_type, source, status, latency, timestamp
) VALUES (?, ?, ?, ?, ?)

