Event-Driven Workflow Playbook

Event-driven workflows run in response to triggers such as webhooks, database changes, cron schedules, and user actions. Because each trigger starts the workflow automatically, you get asynchronous processing, real-time updates, and scheduled tasks without manual intervention.

Webhook Handler

Process incoming webhooks from external services:
ensemble: webhook-handler
description: Validate and process incoming webhooks

agents:
  # Validate webhook signature
  - name: validate-signature
    operation: code
    config:
      code: |
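        const crypto = require('crypto');
        const signature = ${input.headers['x-webhook-signature']};
        // Note: re-serializing a parsed body may not match the exact bytes the sender signed
        const payload = JSON.stringify(${input.body});
        const expected = crypto
          .createHmac('sha256', ${env.WEBHOOK_SECRET})
          .update(payload)
          .digest('hex');
        // Compare in constant time; timingSafeEqual throws on unequal lengths, so check first
        const received = Buffer.from(String(signature ?? ''));
        const computed = Buffer.from(expected);
        const valid = received.length === computed.length &&
          crypto.timingSafeEqual(received, computed);
        return { valid };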

  # Validate payload schema
  - name: validate-schema
    condition: ${validate-signature.output.valid}
    agent: validator
    input:
      data: ${input.body}
      schema: ${component.webhook-schema@v1.0.0}

  # Process event based on type
  - name: route-event
    condition: ${validate-schema.output.valid}
    operation: code
    config:
      code: |
        const eventType = ${input.body.type};
        return { eventType, shouldProcess: true };

  # Handle different event types
  - name: handle-order-created
    condition: ${route-event.output.eventType === 'order.created'}
    operation: storage
    config:
      type: queue
      action: send
      queue: TASK_QUEUE
      body:
        ensemble: process-order
        input: ${input.body.data}

  - name: handle-payment-success
    condition: ${route-event.output.eventType === 'payment.succeeded'}
    operation: storage
    config:
      type: queue
      action: send
      queue: TASK_QUEUE
      body:
        ensemble: fulfill-order
        input: ${input.body.data}

  - name: handle-user-signup
    condition: ${route-event.output.eventType === 'user.created'}
    operation: storage
    config:
      type: queue
      action: send
      queue: TASK_QUEUE
      body:
        ensemble: onboard-user
        input: ${input.body.data}

  # Log webhook receipt
  - name: log-webhook
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO webhook_logs (
          event_type, valid, processed, timestamp
        ) VALUES (?, ?, ?, ?)
      params:
        - ${route-event.output.eventType}
        - ${validate-schema.output.valid}
        - ${route-event.output.shouldProcess}
        - ${Date.now()}

output:
  success: ${validate-signature.output.valid && validate-schema.output.valid}
  eventType: ${route-event.output.eventType}
  processed: ${route-event.output.shouldProcess}
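
The validate-signature step recomputes an HMAC over the payload and compares it with the header value. As a minimal sketch of the same check in plain TypeScript, assuming Node's `node:crypto` module is available and that the sender signs the raw request body with hex-encoded HMAC-SHA256 (the helper name `verifySignature` is illustrative and not part of Conductor):

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Hypothetical helper: returns true when `signatureHex` is the hex-encoded
// HMAC-SHA256 of `rawBody` under `secret`.
function verifySignature(rawBody: string, signatureHex: string, secret: string): boolean {
  const expected = createHmac("sha256", secret).update(rawBody).digest();
  const received = Buffer.from(signatureHex, "hex");
  // timingSafeEqual throws when lengths differ, so compare lengths first.
  return received.length === expected.length && timingSafeEqual(received, expected);
}

// Usage: sign a body, verify it, then verify a tampered copy.
const secret = "test-secret";
const body = '{"type":"order.created","data":{"id":1}}';
const signature = createHmac("sha256", secret).update(body).digest("hex");

const okResult = verifySignature(body, signature, secret);
const tamperedResult = verifySignature(body + " ", signature, secret);
```

Verifying the raw body string, rather than a re-serialized object, avoids false mismatches caused by key order or whitespace differences.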

Scheduled Jobs

Execute ensembles on a schedule using Cloudflare Cron Triggers:

Daily Report Generation

ensemble: daily-report
description: Generate and send daily report

agents:
  # Query metrics for the last 24 hours
  - name: get-metrics
    operation: storage
    config:
      type: d1
      query: |
        SELECT
          COUNT(*) as orders,
          SUM(total) as revenue,
          AVG(total) as avg_order_value
        FROM orders
        WHERE created_at >= strftime('%s', 'now', '-1 day') * 1000

  # Generate report with AI
  - name: generate-report
    operation: think
    config:
      provider: anthropic
      model: claude-3-5-sonnet-20241022
      prompt: |
        Generate a professional daily business report based on these metrics:

        Orders: ${get-metrics.output[0].orders}
        Revenue: $${get-metrics.output[0].revenue}
        Average Order Value: $${get-metrics.output[0].avg_order_value}

        Include trends, insights, and recommendations.

  # Send to team
  - name: send-email
    operation: email
    config:
      to: [team@example.com, exec@example.com]
      subject: "Daily Report - ${new Date().toDateString()}"
      html: |
        <h1>Daily Business Report</h1>
        ${generate-report.output}

output:
  sent: ${send-email.success}
  metrics: ${get-metrics.output[0]}
Map each cron expression to an ensemble in src/index.ts:
export default {
  async scheduled(event: ScheduledEvent, env: Env) {
    const conductor = new Conductor({ env });

    switch (event.cron) {
      case '0 8 * * *': // 8 AM daily
        await conductor.execute('daily-report', {});
        break;

      case '0 */4 * * *': // Every 4 hours
        await conductor.execute('check-inventory', {});
        break;

      case '0 0 * * 0': // Weekly on Sunday
        await conductor.execute('weekly-summary', {});
        break;
    }
  }
};
Register the same cron expressions in wrangler.toml (Cloudflare evaluates them in UTC):
[triggers]
crons = [
  "0 8 * * *",    # Daily at 8 AM
  "0 */4 * * *",  # Every 4 hours
  "0 0 * * 0"     # Weekly on Sunday
]
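
The cron expressions appear twice, once in wrangler.toml and once in the `switch` statement, so the two lists can drift apart. One option is a lookup table in the handler. This is a sketch only; `SCHEDULES` and `ensembleForCron` are hypothetical names:

```typescript
// Hypothetical mapping from cron expression to ensemble name, mirroring the
// three schedules registered in wrangler.toml.
const SCHEDULES: Record<string, string> = {
  "0 8 * * *": "daily-report",
  "0 */4 * * *": "check-inventory",
  "0 0 * * 0": "weekly-summary",
};

// Returns the ensemble for a cron expression, or undefined when the
// expression is not registered (for example after the two files drift apart).
function ensembleForCron(cron: string): string | undefined {
  return Object.prototype.hasOwnProperty.call(SCHEDULES, cron)
    ? SCHEDULES[cron]
    : undefined;
}
```

Inside `scheduled`, the handler would look up `ensembleForCron(event.cron)` and call `conductor.execute` only when a name comes back, logging the unmatched expression otherwise.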

Database Triggers

React to database changes by executing an ensemble from the code path that performs the write:

On User Signup

ensemble: on-user-signup
description: Triggered when a new user signs up

agents:
  # Send welcome email
  - name: send-welcome-email
    operation: email
    config:
      to: ${input.user.email}
      subject: "Welcome to Our Platform!"
      template: welcome-email
      data:
        name: ${input.user.name}
        verificationLink: https://app.example.com/verify/${input.user.id}

  # Create user profile
  - name: create-profile
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO profiles (
          user_id, onboarding_status, created_at
        ) VALUES (?, 'pending', ?)
      params:
        - ${input.user.id}
        - ${Date.now()}

  # Add to email marketing list
  - name: add-to-marketing
    operation: http
    config:
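      # Mailchimp hosts are data-center specific (for example us6); MAILCHIMP_DC is an assumed variable name
      url: https://${env.MAILCHIMP_DC}.api.mailchimp.com/3.0/lists/${env.MAILCHIMP_LIST_ID}/members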
      method: POST
      headers:
        Authorization: Bearer ${env.MAILCHIMP_API_KEY}
      body:
        email_address: ${input.user.email}
        status: subscribed
        merge_fields:
          FNAME: ${input.user.name}

  # Track signup event
  - name: track-signup
    operation: http
    config:
      url: https://api.segment.com/v1/track
      method: POST
      headers:
        # HTTP Basic auth expects base64("<writeKey>:"), so store the key pre-encoded
        Authorization: Basic ${env.SEGMENT_WRITE_KEY}
      body:
        userId: ${input.user.id}
        event: User Signed Up
        properties:
          source: ${input.source}
          timestamp: ${Date.now()}

output:
  emailSent: ${send-welcome-email.success}
  profileCreated: ${create-profile.success}
Trigger from signup handler:
export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    if (request.method === 'POST' && new URL(request.url).pathname === '/signup') {
      const { email, name } = (await request.json()) as { email: string; name: string };

      // Create user in database (createUser is an application-defined helper)
      const user = await createUser(env, { email, name });

      // Trigger on-user-signup ensemble
      const conductor = new Conductor({ env });
      await conductor.execute('on-user-signup', {
        user,
        source: 'web'
      });

      return Response.json({ success: true, userId: user.id });
    }

    // Every code path must return a Response
    return new Response('Not found', { status: 404 });
  }
};
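
The signup handler passes `email` and `name` straight from the request body to the database and the ensemble. A minimal sketch of validating that body first, assuming the payload has exactly these two string fields (`parseSignup` and `SignupPayload` are hypothetical names):

```typescript
interface SignupPayload {
  email: string;
  name: string;
}

// Narrows an untrusted JSON body to SignupPayload, or returns null.
// The email test is a deliberately loose shape check, not full validation.
function parseSignup(body: unknown): SignupPayload | null {
  if (typeof body !== "object" || body === null) return null;
  const { email, name } = body as Record<string, unknown>;
  if (typeof email !== "string" || typeof name !== "string") return null;
  if (!/^[^\s@]+@[^\s@]+$/.test(email) || name.trim() === "") return null;
  return { email, name: name.trim() };
}
```

A handler could return a 400 response when `parseSignup` yields null, so that malformed requests never reach `createUser` or the ensemble.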

E-Commerce Order Processing

Complete event-driven order workflow with error handling:
ensemble: process-order
description: Complete order processing with validation, payment, and fulfillment

state:
  schema:
    order: object
    inventoryReserved: boolean
    paymentProcessed: boolean
    fulfillmentCreated: boolean

agents:
  # Validate order
  - name: validate-order
    operation: code
    config:
      code: |
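        const { items, customerId, shippingAddress } = ${input};
        // Guard against missing fields and coerce the result to a strict boolean
        const isValid = Boolean(
          Array.isArray(items) && items.length > 0 &&
          customerId &&
          shippingAddress?.zip &&
          shippingAddress?.country
        );
        return { valid: isValid };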
    state:
      set: [order]

  # Check inventory availability in parallel
  - parallel:
      - name: check-inventory
        operation: storage
        loop:
          items: ${input.items}
        config:
          type: d1
          query: |
            SELECT quantity FROM inventory
            WHERE product_id = ? AND quantity >= ?
          params:
            - ${loop.item.productId}
            - ${loop.item.quantity}

  # Reserve inventory
  - name: reserve-inventory
    condition: ${validate-order.output.valid && check-inventory.outputs.every(r => r.length > 0)}
    operation: storage
    loop:
      items: ${input.items}
    config:
      type: d1
      query: |
        UPDATE inventory
        SET quantity = quantity - ?,
            reserved = reserved + ?
        WHERE product_id = ?
      params:
        - ${loop.item.quantity}
        - ${loop.item.quantity}
        - ${loop.item.productId}
    state:
      set: [inventoryReserved]

  # Process payment
  - name: process-payment
    condition: ${state.inventoryReserved}
    operation: http
    config:
      url: https://api.stripe.com/v1/payment_intents
      method: POST
      headers:
        Authorization: Bearer ${env.STRIPE_SECRET_KEY}
      body:
        amount: ${input.total} # Stripe expects the smallest currency unit (cents for usd)
        currency: usd
        customer: ${input.customerId}
        payment_method: ${input.paymentMethodId}
        confirm: true
    state:
      set: [paymentProcessed]

  # Handle payment failure - release inventory
  - name: release-inventory-on-failure
    condition: ${!process-payment.success && state.inventoryReserved}
    operation: storage
    loop:
      items: ${input.items}
    config:
      type: d1
      query: |
        UPDATE inventory
        SET quantity = quantity + ?,
            reserved = reserved - ?
        WHERE product_id = ?
      params:
        - ${loop.item.quantity}
        - ${loop.item.quantity}
        - ${loop.item.productId}

  # Create order record
  - name: create-order
    condition: ${state.paymentProcessed}
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO orders (
          customer_id, total, status, payment_intent_id, created_at
        ) VALUES (?, ?, 'processing', ?, ?)
        RETURNING id
      params:
        - ${input.customerId}
        - ${input.total}
        - ${process-payment.output.id}
        - ${Date.now()}

  # Create order items
  - name: create-order-items
    condition: ${create-order.success}
    operation: storage
    loop:
      items: ${input.items}
    config:
      type: d1
      query: |
        INSERT INTO order_items (
          order_id, product_id, quantity, price
        ) VALUES (?, ?, ?, ?)
      params:
        - ${create-order.output[0].id}
        - ${loop.item.productId}
        - ${loop.item.quantity}
        - ${loop.item.price}

  # Finalize inventory - remove from reserved
  - name: finalize-inventory
    condition: ${create-order.success}
    operation: storage
    loop:
      items: ${input.items}
    config:
      type: d1
      query: |
        UPDATE inventory
        SET reserved = reserved - ?
        WHERE product_id = ?
      params:
        - ${loop.item.quantity}
        - ${loop.item.productId}

  # Create fulfillment request
  - name: create-fulfillment
    condition: ${create-order.success}
    operation: http
    config:
      url: ${env.FULFILLMENT_API}/orders
      method: POST
      headers:
        Authorization: Bearer ${env.FULFILLMENT_KEY}
      body:
        orderId: ${create-order.output[0].id}
        items: ${input.items}
        shippingAddress: ${input.shippingAddress}
        priority: standard
    state:
      set: [fulfillmentCreated]

  # Send notifications in parallel
  - parallel:
      - name: send-confirmation-email
        condition: ${create-order.success}
        operation: email
        config:
          to: ${input.customerEmail}
          subject: Order Confirmation ${create-order.output[0].id}
          template: order-confirmation
          data:
            orderNumber: ${create-order.output[0].id}
            items: ${input.items}
            total: ${input.total}

      - name: send-sms
        condition: ${create-order.success}
        operation: sms
        config:
          to: ${input.customerPhone}
          message: Your order ${create-order.output[0].id} is confirmed!

      - name: track-analytics
        condition: ${create-order.success}
        operation: http
        config:
          url: ${env.ANALYTICS_URL}/events
          method: POST
          body:
            event: order_placed
            userId: ${input.customerId}
            properties:
              orderId: ${create-order.output[0].id}
              total: ${input.total}
              itemCount: ${input.items.length}

output:
  success: ${create-order.success}
  orderId: ${create-order.output[0]?.id}
  paymentIntentId: ${process-payment.output?.id}
  fulfillmentId: ${create-fulfillment.output?.id}
  error: ${!create-order.success ? 'Order processing failed' : null}
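
The ensemble pairs each state-changing step with a compensating step: inventory is reserved, and released again if the payment fails. The sketch below replays that flow against an in-memory inventory. It assumes each product appears at most once in `items`, and the `charge` callback stands in for the payment call; all names here are illustrative.

```typescript
interface StockRow { quantity: number; reserved: number; }
type Inventory = Record<string, StockRow>;
interface OrderItem { productId: string; quantity: number; }

// Moves stock from available to reserved. Returns false, changing nothing,
// when any item is short.
function reserve(inventory: Inventory, items: OrderItem[]): boolean {
  const enough = items.every(
    (item) => (inventory[item.productId]?.quantity ?? 0) >= item.quantity
  );
  if (!enough) return false;
  for (const item of items) {
    inventory[item.productId].quantity -= item.quantity;
    inventory[item.productId].reserved += item.quantity;
  }
  return true;
}

// Compensating action: undo a reservation after a failed payment.
function release(inventory: Inventory, items: OrderItem[]): void {
  for (const item of items) {
    inventory[item.productId].quantity += item.quantity;
    inventory[item.productId].reserved -= item.quantity;
  }
}

type OrderOutcome = "placed" | "out_of_stock" | "payment_failed";

// Reserve, charge, and release on failure.
function placeOrder(inventory: Inventory, items: OrderItem[], charge: () => boolean): OrderOutcome {
  if (!reserve(inventory, items)) return "out_of_stock";
  if (!charge()) {
    release(inventory, items);
    return "payment_failed";
  }
  // Finalize: the sold stock leaves the reserved pool for good.
  for (const item of items) {
    inventory[item.productId].reserved -= item.quantity;
  }
  return "placed";
}
```

After a failed payment the inventory returns to its starting values, which is the invariant the release-inventory-on-failure step is meant to preserve.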

Inventory Monitoring

Scheduled check for low stock with alerts:
ensemble: check-inventory
description: Monitor inventory and alert on low stock

agents:
  # Check low stock items
  - name: check-low-stock
    operation: storage
    config:
      type: d1
      query: |
        SELECT product_id, product_name, quantity, reorder_level
        FROM inventory
        WHERE quantity <= reorder_level

  # Only continue if there are low stock items
  - name: has-low-stock
    condition: ${check-low-stock.output.length > 0}
    operation: code
    config:
      code: |
        return { count: ${check-low-stock.output.length} };

  # Get product details
  - name: enrich-products
    condition: ${has-low-stock.executed}
    operation: storage
    loop:
      items: ${check-low-stock.output}
    config:
      type: d1
      query: |
        SELECT * FROM products WHERE id = ?
      params:
        - ${loop.item.product_id}

  # Send Slack alert
  - name: alert-slack
    condition: ${has-low-stock.executed}
    operation: http
    config:
      url: ${env.SLACK_WEBHOOK}
      method: POST
      body:
        text: ⚠️ Low Stock Alert
        blocks:
          - type: section
            text:
              type: mrkdwn
              text: |
                *Low Stock Items (${has-low-stock.output.count})*
                ${check-low-stock.output.map(item =>
                  `• ${item.product_name}: ${item.quantity} units (reorder at ${item.reorder_level})`
                ).join('\n')}

  # Send email to purchasing team
  - name: alert-email
    condition: ${has-low-stock.executed}
    operation: email
    config:
      to: [purchasing@example.com]
      subject: Low Stock Alert - ${has-low-stock.output.count} Items
      html: |
        <h2>Low Stock Alert</h2>
        <table>
          <tr><th>Product</th><th>Current</th><th>Reorder Level</th></tr>
          ${check-low-stock.output.map(item => `
            <tr>
              <td>${item.product_name}</td>
              <td>${item.quantity}</td>
              <td>${item.reorder_level}</td>
            </tr>
          `).join('')}
        </table>

output:
  lowStockItems: ${check-low-stock.output}
  alertsSent: ${alert-slack.success && alert-email.success}
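
The Slack alert builds its text with an inline `map(...).join('\n')` expression. The same formatting as a standalone function is easier to test. This is a sketch; `formatLowStock` is a hypothetical name and the row shape follows the columns selected by check-low-stock:

```typescript
interface LowStockRow {
  product_name: string;
  quantity: number;
  reorder_level: number;
}

// Builds the mrkdwn body of the alert: a bold header with the item count,
// followed by one bullet per product.
function formatLowStock(rows: LowStockRow[]): string {
  const lines = rows.map(
    (row) => `• ${row.product_name}: ${row.quantity} units (reorder at ${row.reorder_level})`
  );
  return [`*Low Stock Items (${rows.length})*`, ...lines].join("\n");
}
```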

Order Cancellation Workflow

Handle order cancellations with refunds and inventory restoration:
ensemble: cancel-order
description: Cancel order with refund and inventory restoration

agents:
  # Get order details
  - name: get-order
    operation: storage
    config:
      type: d1
      query: |
        SELECT * FROM orders
        WHERE id = ? AND status != 'cancelled'
      params:
        - ${input.orderId}

  # Check if cancellable
  - name: check-cancellable
    condition: ${get-order.output.length > 0}
    operation: code
    config:
      code: |
        const order = ${get-order.output[0]};
        const canCancel = ['pending', 'processing'].includes(order.status);
        return { canCancel, reason: canCancel ? null : 'Order already shipped' };

  # Refund payment
  - name: refund-payment
    condition: ${check-cancellable.output.canCancel}
    operation: http
    config:
      url: https://api.stripe.com/v1/refunds
      method: POST
      headers:
        Authorization: Bearer ${env.STRIPE_SECRET_KEY}
      body:
        payment_intent: ${get-order.output[0].payment_intent_id}

  # Get order items
  - name: get-order-items
    condition: ${refund-payment.success}
    operation: storage
    config:
      type: d1
      query: |
        SELECT product_id, quantity FROM order_items
        WHERE order_id = ?
      params:
        - ${input.orderId}

  # Restore inventory
  - name: restore-inventory
    condition: ${get-order-items.success}
    operation: storage
    loop:
      items: ${get-order-items.output}
    config:
      type: d1
      query: |
        UPDATE inventory
        SET quantity = quantity + ?
        WHERE product_id = ?
      params:
        - ${loop.item.quantity}
        - ${loop.item.product_id}

  # Update order status
  - name: update-order-status
    condition: ${refund-payment.success}
    operation: storage
    config:
      type: d1
      query: |
        UPDATE orders
        SET status = 'cancelled', updated_at = ?
        WHERE id = ?
      params:
        - ${Date.now()}
        - ${input.orderId}

  # Notify customer
  - name: send-cancellation-email
    condition: ${update-order-status.success}
    operation: email
    config:
      to: ${get-order.output[0].customer_email}
      subject: Order Cancellation Confirmed
      template: order-cancelled
      data:
        orderNumber: ${input.orderId}
        refundAmount: ${get-order.output[0].total}

output:
  success: ${update-order-status.success}
  refunded: ${refund-payment.success}
  inventoryRestored: ${restore-inventory.outputs?.every(r => r.success)}
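
When get-order returns no rows, check-cancellable is skipped and the caller receives no reason for the failure. The sketch below makes that case explicit alongside the status check. `decideCancellation` is a hypothetical helper, and the "not found" message is an assumption rather than something the ensemble emits:

```typescript
// Statuses from which the check-cancellable step allows cancellation.
const CANCELLABLE = ["pending", "processing"];

interface CancelDecision {
  canCancel: boolean;
  reason: string | null;
}

// `orderStatus` is undefined when the lookup returned no row, which happens
// for unknown IDs and for orders that are already cancelled.
function decideCancellation(orderStatus: string | undefined): CancelDecision {
  if (orderStatus === undefined) {
    return { canCancel: false, reason: "Order not found or already cancelled" };
  }
  if (CANCELLABLE.includes(orderStatus)) {
    return { canCancel: true, reason: null };
  }
  return { canCancel: false, reason: `Order cannot be cancelled in status '${orderStatus}'` };
}
```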

Real-Time User Activity Tracking

Track user actions and trigger personalized responses:
ensemble: track-user-activity
description: Track user actions and respond in real-time

agents:
  # Log activity
  - name: log-activity
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO user_activities (
          user_id, action, metadata, timestamp
        ) VALUES (?, ?, ?, ?)
      params:
        - ${input.userId}
        - ${input.action}
        - ${JSON.stringify(input.metadata)}
        - ${Date.now()}

  # Check for patterns
  - name: check-patterns
    operation: storage
    config:
      type: d1
      query: |
        SELECT COUNT(*) as count, action
        FROM user_activities
        WHERE user_id = ? AND timestamp > ?
        GROUP BY action
      params:
        - ${input.userId}
        - ${Date.now() - 24 * 60 * 60 * 1000} # Last 24 hours

  # Trigger based on patterns
  - name: abandoned-cart-reminder
    condition: ${input.action === 'cart_add' && check-patterns.output.some(p => p.action === 'cart_add' && p.count >= 3)}
    operation: storage
    config:
      type: queue
      action: send
      queue: TASK_QUEUE
      body:
        ensemble: send-cart-reminder
        input:
          userId: ${input.userId}
        delay: 3600000 # 1 hour delay

  - name: engagement-reward
    condition: ${input.action === 'product_view' && check-patterns.output.some(p => p.action === 'product_view' && p.count >= 10)}
    operation: storage
    config:
      type: queue
      action: send
      queue: TASK_QUEUE
      body:
        ensemble: send-engagement-reward
        input:
          userId: ${input.userId}

output:
  logged: ${log-activity.success}
  patterns: ${check-patterns.output}
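
The two pattern-based agents share one rule: when the current action has reached a threshold within the query window, enqueue a follow-up ensemble. A table-driven sketch of that rule, using the thresholds and ensemble names from the agents (`FOLLOW_UPS` and `followUpFor` are hypothetical names):

```typescript
interface ActionCount {
  action: string;
  count: number;
}

// Thresholds taken from the ensemble: 3 cart adds or 10 product views.
const FOLLOW_UPS: Record<string, { threshold: number; ensemble: string }> = {
  cart_add: { threshold: 3, ensemble: "send-cart-reminder" },
  product_view: { threshold: 10, ensemble: "send-engagement-reward" },
};

// Returns the ensemble to enqueue for the current action, or null when no
// rule exists or the threshold has not been reached.
function followUpFor(currentAction: string, counts: ActionCount[]): string | null {
  if (!Object.prototype.hasOwnProperty.call(FOLLOW_UPS, currentAction)) return null;
  const rule = FOLLOW_UPS[currentAction];
  const seen = counts.find((c) => c.action === currentAction)?.count ?? 0;
  return seen >= rule.threshold ? rule.ensemble : null;
}
```

Adding a new trigger then means adding one table entry instead of another agent with a duplicated condition.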

Queue-Based Processing

Process tasks asynchronously using Cloudflare Queues:
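// src/index.ts - Queue consumer
// Shape of the messages the ensembles put on TASK_QUEUE
interface QueueTask {
  ensemble: string;
  input: Record<string, unknown>;
  delay?: number; // optional delay in milliseconds
}

export default {
  async queue(batch: MessageBatch<QueueTask>, env: Env): Promise<void> {
    const conductor = new Conductor({ env });

    for (const message of batch.messages) {
      const { ensemble, input, delay } = message.body;

      // Honor a requested delay by redelivering once after delaySeconds.
      // Note that this redelivery counts toward the queue's retry limit.
      if (delay && message.attempts === 1) {
        message.retry({ delaySeconds: Math.ceil(delay / 1000) });
        continue;
      }

      try {
        // Execute the specified ensemble
        const result = await conductor.execute(ensemble, input);

        if (result.success) {
          message.ack();
        } else {
          message.retry();
        }
      } catch (error) {
        console.error(`Queue processing error:`, error);
        message.retry();
      }
    }
  }
};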
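
The consumer's ack/retry decisions can be exercised without Cloudflare or Conductor by passing in small stand-ins. The sketch below does that and also acknowledges malformed messages, since retrying a message that can never succeed only consumes the retry budget. `MessageLike`, `Execute`, and `processMessages` are hypothetical names, not real Queues or Conductor types:

```typescript
// Minimal structural stand-ins for a queue message and the ensemble executor.
interface MessageLike {
  body: unknown;
  ack(): void;
  retry(): void;
}
type Execute = (ensemble: string, input: unknown) => Promise<{ success: boolean }>;

interface Task {
  ensemble: string;
  input?: unknown;
}

function isTask(body: unknown): body is Task {
  return (
    typeof body === "object" &&
    body !== null &&
    typeof (body as Record<string, unknown>).ensemble === "string"
  );
}

async function processMessages(messages: MessageLike[], execute: Execute): Promise<void> {
  for (const message of messages) {
    const task = message.body;
    if (!isTask(task)) {
      // A malformed body will never succeed, so acknowledge it instead of retrying.
      console.error("Dropping malformed queue message");
      message.ack();
      continue;
    }
    try {
      const outcome = await execute(task.ensemble, task.input);
      if (outcome.success) {
        message.ack();
      } else {
        message.retry();
      }
    } catch (error) {
      console.error("Queue processing error:", error);
      message.retry();
    }
  }
}
```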

Best Practices

1. Use Queues for Async Work

# Don't block webhook responses
agents:
  - name: acknowledge-webhook
    operation: code
    config:
      code: return { received: true };

  # Queue heavy processing
  - name: queue-processing
    operation: storage
    config:
      type: queue
      action: send
      queue: TASK_QUEUE
      body:
        ensemble: process-webhook-data
        input: ${input.webhook}

2. Implement Idempotency

agents:
  # Check if already processed
  - name: check-processed
    operation: storage
    config:
      type: d1
      query: |
        SELECT COUNT(*) as count FROM processed_events
        WHERE event_id = ?
      params:
        - ${input.eventId}

  # Only process if not seen before
  - name: process-event
    condition: ${check-processed.output[0].count === 0}
    operation: code
    config:
      code: |
        // Process the event here, then report the outcome
        return { processed: true };

  # Mark as processed
  - name: mark-processed
    condition: ${process-event.success}
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO processed_events (event_id, timestamp)
        VALUES (?, ?)
      params:
        - ${input.eventId}
        - ${Date.now()}
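
The check, process, mark sequence can be shown in a few lines with an in-memory set standing in for the processed_events table. This is an illustrative sketch; `ProcessedEvents` is a hypothetical class:

```typescript
// In-memory stand-in for the processed_events table.
class ProcessedEvents {
  private seen = new Set<string>();

  // Runs `handler` only for event IDs not seen before. The ID is recorded
  // after the handler returns, so a handler that throws can be retried.
  processOnce(eventId: string, handler: () => void): boolean {
    if (this.seen.has(eventId)) return false;
    handler();
    this.seen.add(eventId);
    return true;
  }
}
```

With a real database, two concurrent deliveries can both pass the check before either inserts. A unique constraint on `event_id` is one way to make the second insert fail instead of processing the event twice.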

3. Add Retry Logic

agents:
  - name: external-api-call
    operation: http
    config:
      url: https://api.example.com/endpoint
    retry:
      max_attempts: 3
      backoff: exponential
      max_delay: 30000
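
This page does not state how the retries are spaced. As an illustration of what `backoff: exponential` with `max_delay: 30000` commonly means, the sketch below doubles the delay on each attempt and caps it. The 1000 ms base delay and the function name `backoffDelay` are assumptions:

```typescript
// Delay before retry number `attempt` (1-based): baseMs * 2^(attempt - 1),
// capped at maxDelayMs. The 1000 ms default base is an illustrative assumption.
function backoffDelay(attempt: number, maxDelayMs: number, baseMs: number = 1000): number {
  if (attempt < 1) throw new RangeError("attempt must be >= 1");
  return Math.min(baseMs * 2 ** (attempt - 1), maxDelayMs);
}
```

Under these assumptions, three attempts wait 1000, 2000, and 4000 ms, and the cap would first apply on the sixth attempt.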

4. Monitor Event Processing

# Log all events
agents:
  - name: log-event
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO event_logs (
          event_type, source, status, latency, timestamp
        ) VALUES (?, ?, ?, ?, ?)
      # Illustrative bindings for the five placeholders; adapt the input field names to your events
      params:
        - ${input.eventType}
        - ${input.source}
        - ${input.status}
        - ${input.latencyMs}
        - ${Date.now()}

Next Steps