Skip to main content

Webhook Handler

Process incoming webhooks from external services:
# Webhook intake ensemble: checks the request signature, validates the payload
# against a schema, routes on the event type, enqueues a follow-up ensemble for
# each known event, and writes one row to webhook_logs.
ensemble: webhook-handler
description: Validate and process incoming webhooks

agents:
  # Validate webhook signature.
  # Passes the x-webhook-signature header, the request body and the
  # WEBHOOK_SECRET environment value to the signature script.
  - name: validate-signature
    operation: code
    config:
      script: scripts/validate-webhook-signature
    input:
      signature: ${input.headers['x-webhook-signature']}
      body: ${input.body}
      secret: ${env.WEBHOOK_SECRET}

  # Validate payload schema (only when the signature was valid).
  # NOTE(review): the schema reference below reads "[email protected]", which
  # looks like an obfuscated `name@version` reference — restore the real value
  # before using this example.
  - name: validate-schema
    condition: ${validate-signature.output.valid}
    operation: validate
    input:
      data: ${input.body}
      schema: ${[email protected]}

  # Process event based on type (only when the schema check passed).
  - name: route-event
    condition: ${validate-schema.output.valid}
    operation: code
    config:
      script: scripts/route-webhook-event
    input:
      eventType: ${input.body.type}

  # Handle different event types.
  # Each handler sends a message to TASK_QUEUE naming the ensemble to run and
  # passing the webhook's `data` field as that ensemble's input.
  - name: handle-order-created
    condition: ${route-event.output.eventType === 'order.created'}
    operation: storage
    config:
      type: queue
      action: send
      queue: TASK_QUEUE
      body:
        ensemble: process-order
        input: ${input.body.data}

  - name: handle-payment-success
    condition: ${route-event.output.eventType === 'payment.succeeded'}
    operation: storage
    config:
      type: queue
      action: send
      queue: TASK_QUEUE
      body:
        ensemble: fulfill-order
        input: ${input.body.data}

  - name: handle-user-signup
    condition: ${route-event.output.eventType === 'user.created'}
    operation: storage
    config:
      type: queue
      action: send
      queue: TASK_QUEUE
      body:
        ensemble: onboard-user
        input: ${input.body.data}

  # Log webhook receipt.
  # This agent has no `condition`, while its params read outputs of agents that
  # do have one.
  # NOTE(review): confirm how outputs of skipped agents resolve here.
  - name: log-webhook
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT INTO webhook_logs (
          event_type, valid, processed, timestamp
        ) VALUES (?, ?, ?, ?)
      params:
        - ${route-event.output.eventType}
        - ${validate-schema.output.valid}
        - ${route-event.output.shouldProcess}
        - ${Date.now()}

# `success` requires both the signature check and the schema check to pass.
output:
  success: ${validate-signature.output.valid && validate-schema.output.valid}
  eventType: ${route-event.output.eventType}
  processed: ${route-event.output.shouldProcess}
// scripts/validate-webhook-signature.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
import * as crypto from 'crypto'

/**
 * Verifies a webhook's HMAC-SHA256 signature.
 *
 * Reads `signature`, `body` and `secret` from `context.input`, computes the
 * hex HMAC-SHA256 of `JSON.stringify(body)` keyed with `secret`, and compares
 * it with `signature`.
 *
 * @returns `{ valid: true }` only when the supplied signature equals the
 *          expected hex digest; `{ valid: false }` otherwise, including when
 *          the signature is missing or not a string.
 */
export default function validateWebhookSignature(context: AgentExecutionContext) {
  const { signature, body, secret } = context.input
  // NOTE(review): the digest is computed over a re-serialized body; this only
  // matches the sender's signature if the sender signed the same
  // serialization — confirm against the provider's signing rules.
  const payload = JSON.stringify(body)
  const expected = crypto
    .createHmac('sha256', secret)
    .update(payload)
    .digest('hex')

  // A missing header arrives as a non-string; fail closed.
  if (typeof signature !== 'string') {
    return { valid: false }
  }

  const encoder = new TextEncoder()
  const received = encoder.encode(signature)
  const wanted = encoder.encode(expected)

  // timingSafeEqual throws on unequal lengths, so check length first.
  if (received.length !== wanted.length) {
    return { valid: false }
  }

  // Constant-time comparison so response time does not reveal how many
  // leading characters of the signature were correct.
  return { valid: crypto.timingSafeEqual(received, wanted) }
}
// scripts/route-webhook-event.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Echoes the incoming event type and marks the event as processable.
 *
 * @returns `{ eventType, shouldProcess: true }` where `eventType` is taken
 *          unchanged from `context.input`.
 */
export default function routeWebhookEvent(context: AgentExecutionContext) {
  const incomingType = context.input.eventType
  const routed = { eventType: incomingType, shouldProcess: true }
  return routed
}

Scheduled Jobs

Execute ensembles on a schedule using cron triggers:

Daily Report Generation

# Daily report ensemble: queries order metrics from D1, asks a model to write a
# report from them, and e-mails the result. Runs from a cron trigger.
name: daily-report
description: Generate and send daily report

trigger:
  - type: cron
    # Fires at minute 0 of hour 8 every day.
    # NOTE(review): this example pairs the expression with a New York timezone
    # while describing it as "8 AM UTC" — confirm which zone the schedule is
    # evaluated in and adjust `timezone` accordingly.
    cron: "0 8 * * *"
    timezone: "America/New_York"
    enabled: true

# Agents run in this order.
flow:
  - agent: get-metrics
  - agent: generate-report
  - agent: send-email

agents:
  # Query metrics for orders created in the trailing 24 hours.
  # created_at is compared against a millisecond value (seconds * 1000).
  - name: get-metrics
    operation: data
    config:
      backend: d1
      binding: DB
      operation: query
      sql: |
        SELECT
          COUNT(*) as orders,
          SUM(total) as revenue,
          AVG(total) as avg_order_value
        FROM orders
        WHERE created_at >= strftime('%s', 'now', '-1 day') * 1000

  # Generate report with AI from the single aggregate row returned above.
  # In the prompt, `$${...}` is a literal dollar sign followed by an expression.
  - name: generate-report
    operation: think
    config:
      provider: anthropic
      model: claude-sonnet-4
      prompt: |
        Generate a professional daily business report based on these metrics:

        Orders: ${get-metrics.output[0].orders}
        Revenue: $${get-metrics.output[0].revenue}
        Average Order Value: $${get-metrics.output[0].avg_order_value}

        Include trends, insights, and recommendations.

  # Send to team.
  # Placeholder recipients — replace with your own addresses.
  - name: send-email
    operation: email
    config:
      to:
        - team@example.com
        - executives@example.com
      subject: "Daily Report - ${new Date().toDateString()}"
      html: |
        <h1>Daily Business Report</h1>
        ${generate-report.output}

outputs:
  sent: ${send-email.success}
  metrics: ${get-metrics.output[0]}

Multiple Schedules

Run different tasks on different schedules:
# Maintenance ensemble with two cron triggers. Each trigger supplies its own
# `input.task` value, which the single route-task agent receives.
name: maintenance-tasks

trigger:
  # Every 4 hours - check inventory
  - type: cron
    cron: "0 */4 * * *"
    timezone: "UTC"
    input:
      task: "inventory-check"

  # Weekly on Sunday - generate summary
  - type: cron
    cron: "0 0 * * 0"
    timezone: "America/New_York"
    input:
      task: "weekly-summary"

flow:
  - agent: route-task

agents:
  # Hands the trigger's task name to the routing script.
  - name: route-task
    operation: code
    config:
      script: scripts/route-maintenance-task
    input:
      task: ${input.task}

# `completed` carries the action name returned by the routing script.
outputs:
  completed: ${route-task.output.action}
// scripts/route-maintenance-task.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Maps a maintenance task name from `context.input.task` to an action name.
 *
 * @returns `{ action: 'check_inventory' }` for `'inventory-check'`,
 *          `{ action: 'generate_summary' }` for `'weekly-summary'`,
 *          and `{ action: 'unknown' }` for anything else.
 */
export default function routeMaintenanceTask(context: AgentExecutionContext) {
  const requested = context.input.task

  switch (requested) {
    case 'inventory-check':
      // Check inventory levels
      return { action: 'check_inventory' }
    case 'weekly-summary':
      // Generate weekly summary
      return { action: 'generate_summary' }
    default:
      return { action: 'unknown' }
  }
}
Configure in wrangler.toml:
# Cron schedules registered for the Worker. The three expressions are the same
# strings used by the ensemble `trigger` entries shown earlier in this document.
[triggers]
crons = [
  "0 8 * * *",    # Daily at 8 AM
  "0 */4 * * *",  # Every 4 hours
  "0 0 * * 0"     # Weekly on Sunday
]
Note: Conductor automatically discovers cron triggers from ensemble configurations, but the `crons` list in wrangler.toml must still contain every cron expression used by your ensemble triggers.

Database Triggers

React to database changes:

On User Signup

# Signup follow-up ensemble: sends a welcome e-mail, creates a profile row,
# subscribes the user to a mailing list and records an analytics event.
# Expects `input.user` (id, email, name) and `input.source`.
ensemble: on-user-signup
description: Triggered when a new user signs up

agents:
  # Send welcome email
  - name: send-welcome-email
    operation: email
    config:
      to: ${input.user.email}
      subject: "Welcome to Our Platform!"
      template: welcome-email
      data:
        name: ${input.user.name}
        verificationLink: https://app.example.com/verify/${input.user.id}

  # Create user profile with onboarding_status fixed to 'pending'
  - name: create-profile
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT INTO profiles (
          user_id, onboarding_status, created_at
        ) VALUES (?, 'pending', ?)
      params:
        - ${input.user.id}
        - ${Date.now()}

  # Add to email marketing list
  - name: add-to-marketing
    operation: http
    config:
      url: https://api.mailchimp.com/3.0/lists/${env.MAILCHIMP_LIST_ID}/members
      method: POST
      headers:
        Authorization: Bearer ${env.MAILCHIMP_API_KEY}
      body:
        email_address: ${input.user.email}
        status: subscribed
        merge_fields:
          FNAME: ${input.user.name}

  # Track signup event
  # NOTE(review): HTTP Basic credentials are normally base64-encoded; the raw
  # write key is interpolated here — confirm what the endpoint expects.
  - name: track-signup
    operation: http
    config:
      url: https://api.segment.com/v1/track
      method: POST
      headers:
        Authorization: Basic ${env.SEGMENT_WRITE_KEY}
      body:
        userId: ${input.user.id}
        event: User Signed Up
        properties:
          source: ${input.source}
          timestamp: ${Date.now()}

# Only the e-mail and profile steps are reported; the two HTTP calls are not.
output:
  emailSent: ${send-welcome-email.success}
  profileCreated: ${create-profile.success}
Trigger from signup handler:
export default {
  /**
   * Handles `POST /signup`: creates the user, runs the `on-user-signup`
   * ensemble with `{ user, source: 'web' }`, and responds with the new user id.
   * Every other method/path combination receives a 404 response, so the
   * handler always resolves to a `Response`.
   */
  async fetch(request: Request, env: Env): Promise<Response> {
    const { pathname } = new URL(request.url);

    // Anything that is not the signup route is rejected explicitly.
    if (request.method !== 'POST' || pathname !== '/signup') {
      return new Response('Not Found', { status: 404 });
    }

    const { email, name } = (await request.json()) as { email: string; name: string };

    // Create user in database
    const user = await createUser(env, { email, name });

    // Trigger on-user-signup ensemble
    const conductor = new Conductor({ env });
    await conductor.execute('on-user-signup', {
      user,
      source: 'web'
    });

    return Response.json({ success: true, userId: user.id });
  }
};

E-Commerce Order Processing

Complete event-driven order workflow with error handling:
# Order processing ensemble: validates the order, checks and reserves
# inventory, charges the customer, writes the order and its items, creates a
# fulfillment request and sends notifications. Later steps are gated by
# `condition` expressions on earlier results or on shared state flags.
ensemble: process-order
description: Complete order processing with validation, payment, and fulfillment

# Shared state fields that agents below declare they set.
state:
  schema:
    order: object
    inventoryReserved: boolean
    paymentProcessed: boolean
    fulfillmentCreated: boolean

agents:
  # Validate order
  - name: validate-order
    operation: code
    config:
      script: scripts/validate-order
    input:
      items: ${input.items}
      customerId: ${input.customerId}
      shippingAddress: ${input.shippingAddress}
    state:
      set: [order]

  # Check inventory availability in parallel.
  # Runs one query per order item; a row comes back only when the product has
  # at least the requested quantity.
  - parallel:
      - name: check-inventory
        operation: data
        loop:
          items: ${input.items}
        config:
          backend: d1
          binding: DB
          operation: query
          sql: |
            SELECT quantity FROM inventory
            WHERE product_id = ? AND quantity >= ?
          params:
            - ${loop.item.productId}
            - ${loop.item.quantity}

  # Reserve inventory, only when every availability query returned a row.
  # Moves the requested quantity from `quantity` to `reserved` per item.
  - name: reserve-inventory
    condition: ${check-inventory.outputs.every(r => r.length > 0)}
    operation: data
    loop:
      items: ${input.items}
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        UPDATE inventory
        SET quantity = quantity - ?,
            reserved = reserved + ?
        WHERE product_id = ?
      params:
        - ${loop.item.quantity}
        - ${loop.item.quantity}
        - ${loop.item.productId}
    state:
      set: [inventoryReserved]

  # Process payment (only after inventory was reserved)
  - name: process-payment
    condition: ${state.inventoryReserved}
    operation: http
    config:
      url: https://api.stripe.com/v1/payment_intents
      method: POST
      headers:
        Authorization: Bearer ${env.STRIPE_SECRET_KEY}
      body:
        amount: ${input.total}
        currency: usd
        customer: ${input.customerId}
        payment_method: ${input.paymentMethodId}
        confirm: true
    state:
      set: [paymentProcessed]

  # Handle payment failure - release inventory.
  # Exact inverse of reserve-inventory: moves the quantity back from `reserved`.
  - name: release-inventory-on-failure
    condition: ${!process-payment.success && state.inventoryReserved}
    operation: data
    loop:
      items: ${input.items}
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        UPDATE inventory
        SET quantity = quantity + ?,
            reserved = reserved - ?
        WHERE product_id = ?
      params:
        - ${loop.item.quantity}
        - ${loop.item.quantity}
        - ${loop.item.productId}

  # Create order record.
  # Uses `query` rather than `execute` because the statement returns the new id.
  - name: create-order
    condition: ${state.paymentProcessed}
    operation: data
    config:
      backend: d1
      binding: DB
      operation: query
      sql: |
        INSERT INTO orders (
          customer_id, total, status, payment_intent_id, created_at
        ) VALUES (?, ?, 'processing', ?, ?)
        RETURNING id
      params:
        - ${input.customerId}
        - ${input.total}
        - ${process-payment.output.id}
        - ${Date.now()}

  # Create order items, one row per item, linked to the returned order id
  - name: create-order-items
    condition: ${create-order.success}
    operation: data
    loop:
      items: ${input.items}
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT INTO order_items (
          order_id, product_id, quantity, price
        ) VALUES (?, ?, ?, ?)
      params:
        - ${create-order.output[0].id}
        - ${loop.item.productId}
        - ${loop.item.quantity}
        - ${loop.item.price}

  # Finalize inventory - remove from reserved
  - name: finalize-inventory
    condition: ${create-order.success}
    operation: data
    loop:
      items: ${input.items}
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        UPDATE inventory
        SET reserved = reserved - ?
        WHERE product_id = ?
      params:
        - ${loop.item.quantity}
        - ${loop.item.productId}

  # Create fulfillment request
  - name: create-fulfillment
    condition: ${create-order.success}
    operation: http
    config:
      url: ${env.FULFILLMENT_API}/orders
      method: POST
      headers:
        Authorization: Bearer ${env.FULFILLMENT_KEY}
      body:
        orderId: ${create-order.output[0].id}
        items: ${input.items}
        shippingAddress: ${input.shippingAddress}
        priority: standard
    state:
      set: [fulfillmentCreated]

  # Send notifications in parallel; each one is gated on the order insert
  - parallel:
      - name: send-confirmation-email
        condition: ${create-order.success}
        operation: email
        config:
          to: ${input.customerEmail}
          subject: Order Confirmation ${create-order.output[0].id}
          template: order-confirmation
          data:
            orderNumber: ${create-order.output[0].id}
            items: ${input.items}
            total: ${input.total}

      - name: send-sms
        condition: ${create-order.success}
        operation: sms
        config:
          to: ${input.customerPhone}
          message: Your order ${create-order.output[0].id} is confirmed!

      - name: track-analytics
        condition: ${create-order.success}
        operation: http
        config:
          url: ${env.ANALYTICS_URL}/events
          method: POST
          body:
            event: order_placed
            userId: ${input.customerId}
            properties:
              orderId: ${create-order.output[0].id}
              total: ${input.total}
              itemCount: ${input.items.length}

# Result of the ensemble. The ids use optional chaining on both the output and
# its first row so the expressions do not throw if an agent produced no output.
output:
  success: ${create-order.success}
  orderId: ${create-order.output?.[0]?.id}
  paymentIntentId: ${process-payment.output?.id}
  fulfillmentId: ${create-fulfillment.output?.id}
  # Quoted because the expression contains ": ", which YAML would otherwise
  # read as the start of a nested mapping.
  error: "${!create-order.success ? 'Order processing failed' : null}"

Inventory Monitoring

Scheduled check for low stock with alerts:
# Low-stock monitor: finds inventory rows at or below their reorder level and,
# when there are any, posts a Slack message and e-mails the purchasing team.
ensemble: check-inventory
description: Monitor inventory and alert on low stock

agents:
  # Check low stock items
  - name: check-low-stock
    operation: data
    config:
      backend: d1
      binding: DB
      operation: query
      sql: |
        SELECT product_id, product_name, quantity, reorder_level
        FROM inventory
        WHERE quantity <= reorder_level

  # Only continue if there are low stock items.
  # Later agents gate on whether this agent executed.
  - name: has-low-stock
    condition: ${check-low-stock.output.length > 0}
    operation: code
    config:
      script: scripts/return-item-count
    input:
      count: ${check-low-stock.output.length}

  # Get product details, one lookup per low-stock row
  - name: enrich-products
    condition: ${has-low-stock.executed}
    operation: data
    loop:
      items: ${check-low-stock.output}
    config:
      backend: d1
      binding: DB
      operation: query
      sql: |
        SELECT * FROM products WHERE id = ?
      # Block list: "${...}" cannot appear unquoted inside a [ ] flow sequence.
      params:
        - ${loop.item.product_id}

  # Send Slack alert
  - name: alert-slack
    condition: ${has-low-stock.executed}
    operation: http
    config:
      url: ${env.SLACK_WEBHOOK}
      method: POST
      body:
        text: Low Stock Alert
        blocks:
          - type: section
            text:
              type: mrkdwn
              text: |
                *Low Stock Items (${has-low-stock.output.count})*
                ${check-low-stock.output.map(item =>
                  ` ${item.product_name}: ${item.quantity} units (reorder at ${item.reorder_level})`
                ).join('\n')}

  # Send email to purchasing team.
  # Placeholder recipient — replace with your own address.
  - name: alert-email
    condition: ${has-low-stock.executed}
    operation: email
    config:
      to:
        - purchasing@example.com
      subject: Low Stock Alert - ${has-low-stock.output.count} Items
      html: |
        <h2>Low Stock Alert</h2>
        <table>
          <tr><th>Product</th><th>Current</th><th>Reorder Level</th></tr>
          ${check-low-stock.output.map(item => `
            <tr>
              <td>${item.product_name}</td>
              <td>${item.quantity}</td>
              <td>${item.reorder_level}</td>
            </tr>
          `).join('')}
        </table>

# `alertsSent` is true only when both the Slack and the e-mail alert succeeded.
output:
  lowStockItems: ${check-low-stock.output}
  alertsSent: ${alert-slack.success && alert-email.success}

Order Cancellation Workflow

Handle order cancellations with refunds and inventory restoration:
# Order cancellation ensemble: loads the order, checks that its status allows
# cancellation, refunds the payment, restores inventory, marks the order
# cancelled and notifies the customer. Expects `input.orderId`.
ensemble: cancel-order
description: Cancel order with refund and inventory restoration

agents:
  # Get order details (orders already cancelled are excluded)
  - name: get-order
    operation: data
    config:
      backend: d1
      binding: DB
      operation: query
      sql: |
        SELECT * FROM orders
        WHERE id = ? AND status != 'cancelled'
      # Block list: "${...}" cannot appear unquoted inside a [ ] flow sequence.
      params:
        - ${input.orderId}

  # Check if cancellable (only when an order row was found)
  - name: check-cancellable
    condition: ${get-order.output.length > 0}
    operation: code
    config:
      script: scripts/check-order-cancellable
    input:
      orderStatus: ${get-order.output[0].status}

  # Refund payment
  - name: refund-payment
    condition: ${check-cancellable.output.canCancel}
    operation: http
    config:
      url: https://api.stripe.com/v1/refunds
      method: POST
      headers:
        Authorization: Bearer ${env.STRIPE_SECRET_KEY}
      body:
        payment_intent: ${get-order.output[0].payment_intent_id}

  # Get order items
  - name: get-order-items
    condition: ${refund-payment.success}
    operation: data
    config:
      backend: d1
      binding: DB
      operation: query
      sql: |
        SELECT product_id, quantity FROM order_items
        WHERE order_id = ?
      params:
        - ${input.orderId}

  # Restore inventory, adding each item's quantity back to stock
  - name: restore-inventory
    condition: ${get-order-items.success}
    operation: data
    loop:
      items: ${get-order-items.output}
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        UPDATE inventory
        SET quantity = quantity + ?
        WHERE product_id = ?
      params:
        - ${loop.item.quantity}
        - ${loop.item.product_id}

  # Update order status
  - name: update-order-status
    condition: ${refund-payment.success}
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        UPDATE orders
        SET status = 'cancelled', updated_at = ?
        WHERE id = ?
      params:
        - ${Date.now()}
        - ${input.orderId}

  # Notify customer.
  # NOTE(review): reads `customer_email` from the orders row — confirm the
  # orders table actually has that column.
  - name: send-cancellation-email
    condition: ${update-order-status.success}
    operation: email
    config:
      to: ${get-order.output[0].customer_email}
      subject: Order Cancellation Confirmed
      template: order-cancelled
      data:
        orderNumber: ${input.orderId}
        refundAmount: ${get-order.output[0].total}

output:
  success: ${update-order-status.success}
  refunded: ${refund-payment.success}
  inventoryRestored: ${restore-inventory.outputs?.every(r => r.success)}
// scripts/validate-order.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Checks that an order has the minimum data needed for processing.
 *
 * Reads `items`, `customerId` and `shippingAddress` from `context.input`.
 * The order is valid when `items` is a non-empty array, `customerId` is
 * truthy, and the shipping address has truthy `zip` and `country` fields.
 *
 * @returns `{ valid }` where `valid` is always a boolean. Missing `items` or
 *          `shippingAddress` yield `{ valid: false }` instead of throwing.
 */
export default function validateOrder(context: AgentExecutionContext) {
  const { items, customerId, shippingAddress } = context.input

  const hasItems = Array.isArray(items) && items.length > 0
  const hasCustomer = Boolean(customerId)
  // Optional chaining keeps a missing address from raising a TypeError.
  const hasAddress = Boolean(shippingAddress?.zip && shippingAddress?.country)

  return { valid: hasItems && hasCustomer && hasAddress }
}
// scripts/return-item-count.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Passes the `count` value from `context.input` straight through.
 *
 * @returns `{ count }` with the input value unchanged.
 */
export default function returnItemCount(context: AgentExecutionContext) {
  const itemCount = context.input.count
  return { count: itemCount }
}
// scripts/check-order-cancellable.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Decides whether an order may be cancelled based on `context.input.orderStatus`.
 *
 * @returns `{ canCancel: true, reason: null }` for status `'pending'` or
 *          `'processing'`; otherwise `{ canCancel: false, reason: 'Order already shipped' }`.
 */
export default function checkOrderCancellable(context: AgentExecutionContext) {
  const status = context.input.orderStatus

  if (status === 'pending' || status === 'processing') {
    return { canCancel: true, reason: null }
  }

  return { canCancel: false, reason: 'Order already shipped' }
}
// scripts/acknowledge-webhook.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Produces a fixed acknowledgement; the execution context is not read.
 *
 * @returns `{ received: true }`.
 */
export default function acknowledgeWebhook(context: AgentExecutionContext) {
  const acknowledgement = { received: true }
  return acknowledgement
}

Real-Time User Activity Tracking

Track user actions and trigger personalized responses:
# Activity tracking ensemble: records one user action, counts that user's
# actions over the last 24 hours grouped by action, and enqueues a follow-up
# ensemble when a threshold is reached. Expects `input.userId`, `input.action`
# and `input.metadata`.
ensemble: track-user-activity
description: Track user actions and respond in real-time

agents:
  # Log activity (metadata is stored as a JSON string)
  - name: log-activity
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT INTO user_activities (
          user_id, action, metadata, timestamp
        ) VALUES (?, ?, ?, ?)
      params:
        - ${input.userId}
        - ${input.action}
        - ${JSON.stringify(input.metadata)}
        - ${Date.now()}

  # Check for patterns: per-action counts for this user
  - name: check-patterns
    operation: data
    config:
      backend: d1
      binding: DB
      operation: query
      sql: |
        SELECT COUNT(*) as count, action
        FROM user_activities
        WHERE user_id = ? AND timestamp > ?
        GROUP BY action
      params:
        - ${input.userId}
        - ${Date.now() - 24 * 60 * 60 * 1000} # Last 24 hours

  # Trigger based on patterns.
  # Fires when the current action is cart_add and the user has 3 or more
  # cart_add rows in the window.
  # NOTE(review): `delay` sits inside the message body; the queue consumer in
  # this document does not act on it — confirm how the delay is meant to apply.
  - name: abandoned-cart-reminder
    condition: ${input.action === 'cart_add' && check-patterns.output.some(p => p.action === 'cart_add' && p.count >= 3)}
    operation: storage
    config:
      type: queue
      action: send
      queue: TASK_QUEUE
      body:
        ensemble: send-cart-reminder
        input:
          userId: ${input.userId}
        delay: 3600000 # 1 hour delay

  # Fires when the current action is product_view and the user has 10 or more
  # product_view rows in the window.
  - name: engagement-reward
    condition: ${input.action === 'product_view' && check-patterns.output.some(p => p.action === 'product_view' && p.count >= 10)}
    operation: storage
    config:
      type: queue
      action: send
      queue: TASK_QUEUE
      body:
        ensemble: send-engagement-reward
        input:
          userId: ${input.userId}

output:
  logged: ${log-activity.success}
  patterns: ${check-patterns.output}

Queue-Based Processing

Process tasks asynchronously using Cloudflare Queues:
// src/index.ts - Queue consumer
/** Shape of the messages the ensembles in this document send to the queue. */
interface QueueTask {
  /** Name of the ensemble to execute. */
  ensemble: string;
  /** Input passed to that ensemble. */
  input: unknown;
  /** Present on some messages; this consumer does not act on it. */
  delay?: number;
}

export default {
  /**
   * Queue consumer: runs the ensemble named in each message, one message at a
   * time. A message is acknowledged when the run reports success and is
   * retried when the run reports failure or throws.
   */
  async queue(batch: MessageBatch<QueueTask>, env: Env): Promise<void> {
    const conductor = new Conductor({ env });

    for (const message of batch.messages) {
      const { ensemble, input } = message.body;

      try {
        // Execute the specified ensemble
        const result = await conductor.execute(ensemble, input);

        if (result.success) {
          message.ack();
        } else {
          message.retry();
        }
      } catch (error) {
        // A thrown error is logged and handled like a failed run.
        console.error(`Queue processing error:`, error);
        message.retry();
      }
    }
  }
};

Best Practices

1. Use Queues for Async Work

# Don't block webhook responses: acknowledge first, then hand the heavy work
# to a queue so it runs outside the request.
agents:
  - name: acknowledge-webhook
    operation: code
    config:
      script: scripts/acknowledge-webhook

  # Queue heavy processing
  - name: queue-processing
    operation: storage
    config:
      type: queue
      action: send
      # Target queue, named like the other queue sends in this document.
      queue: TASK_QUEUE
      body:
        ensemble: process-webhook-data
        input: ${input.webhook}

2. Implement Idempotency

# Idempotent event handling: look the event id up, process only unseen events,
# then record the id.
# NOTE(review): the lookup and the insert are separate statements — confirm
# whether concurrent deliveries of the same event need a uniqueness constraint.
agents:
  # Check if already processed
  - name: check-processed
    operation: data
    config:
      backend: d1
      binding: DB
      operation: query
      sql: |
        SELECT COUNT(*) as count FROM processed_events
        WHERE event_id = ?
      # Block list: "${...}" cannot appear unquoted inside a [ ] flow sequence.
      params:
        - ${input.eventId}

  # Only process if not seen before
  - name: process-event
    condition: ${check-processed.output[0].count === 0}
    operation: code
    config:
      # Placeholder: point this at the script that processes the event.
      script: scripts/process-event

  # Mark as processed
  - name: mark-processed
    condition: ${process-event.success}
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT INTO processed_events (event_id, timestamp)
        VALUES (?, ?)
      params:
        - ${input.eventId}
        - ${Date.now()}

3. Add Retry Logic

# Retry settings attached to a single HTTP agent.
agents:
  - name: external-api-call
    operation: http
    config:
      url: https://api.example.com/endpoint
    # Up to 3 attempts with exponential backoff.
    # NOTE(review): max_delay is presumably in milliseconds (30 s) — confirm.
    retry:
      max_attempts: 3
      backoff: exponential
      max_delay: 30000

4. Monitor Event Processing

# Log all events: one row per event with its type, source, status and latency.
agents:
  - name: log-event
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT INTO event_logs (
          event_type, source, status, latency, timestamp
        ) VALUES (?, ?, ?, ?, ?)
      # One value per placeholder, in column order. The input field names are
      # illustrative — map them to whatever your ensemble actually receives.
      params:
        - ${input.eventType}
        - ${input.source}
        - ${input.status}
        - ${input.latency}
        - ${Date.now()}

Next Steps