Skip to main content

Overview

Webhooks enable event-driven workflows by triggering ensemble execution from external HTTP requests. Conductor can both receive webhooks (inbound) and send webhook notifications (outbound). This makes webhooks well suited to integrating with third-party services, responding to external events, and building event-driven architectures.

Inbound Webhooks

Basic Webhook Receiver

// src/index.ts
import { Conductor } from '@ensemble-edge/conductor';

/**
 * Worker entry point for the basic GitHub webhook receiver.
 */
export default {
  /**
   * Forwards requests whose path ends in `/webhook/github` to the
   * `handle-github-event` ensemble; every other request gets a 404.
   *
   * @param request - Incoming HTTP request; its JSON body is passed on as `payload`.
   * @param env - Worker bindings, handed to the Conductor instance.
   * @returns JSON `{ success: true, executionId }` for webhook requests,
   *          otherwise a plain-text 404 response.
   */
  async fetch(request: Request, env: Env): Promise<Response> {
    const conductor = new Conductor({ env });

    // Test the parsed pathname rather than the raw URL string, so a query
    // string (e.g. `/webhook/github?source=gh`) does not defeat the route check.
    const { pathname } = new URL(request.url);

    // Webhook endpoint
    if (pathname.endsWith('/webhook/github')) {
      const payload = await request.json();

      // Execute ensemble with webhook data
      const result = await conductor.executeEnsemble('handle-github-event', {
        event: request.headers.get('X-GitHub-Event'),
        payload
      });

      return Response.json({ success: true, executionId: result.id });
    }

    return new Response('Not Found', { status: 404 });
  }
};

With Signature Verification

/**
 * Verify the `X-Hub-Signature-256` header of a GitHub webhook delivery.
 *
 * The header must have the form `sha256=<64 lowercase hex digits>`. The hex
 * digest is decoded to bytes and checked with `crypto.subtle.verify`, which
 * recomputes the HMAC-SHA256 of the raw body and performs the comparison
 * inside the runtime. This replaces a `===` comparison of hex strings, which
 * stops at the first differing character and so leaks timing information
 * about how much of a forged signature was correct.
 *
 * The body is read from a clone, so the caller can still consume `request`.
 *
 * @param request - Incoming webhook request.
 * @param secret - Shared webhook secret configured on the GitHub side.
 * @returns `true` only when the header is present, well-formed and matches
 *          the HMAC of the body; `false` otherwise.
 */
async function verifyGitHubSignature(
  request: Request,
  secret: string
): Promise<boolean> {
  const signature = request.headers.get('X-Hub-Signature-256');
  if (!signature) return false;

  // Anything else could never have equalled the expected lowercase hex digest.
  if (!/^sha256=[0-9a-f]{64}$/.test(signature)) return false;

  const hex = signature.slice('sha256='.length);
  const provided = Uint8Array.from({ length: 32 }, (_, i) =>
    parseInt(hex.substring(i * 2, i * 2 + 2), 16)
  );

  const body = await request.clone().text();
  const encoder = new TextEncoder();
  const key = await crypto.subtle.importKey(
    'raw',
    encoder.encode(secret),
    { name: 'HMAC', hash: 'SHA-256' },
    false,
    ['verify']
  );

  return crypto.subtle.verify('HMAC', key, provided, encoder.encode(body));
}

/**
 * Worker entry point for the GitHub webhook receiver with signature checking.
 */
export default {
  /**
   * Verifies the GitHub signature on requests whose path ends in
   * `/webhook/github`, then runs the `handle-github-event` ensemble.
   *
   * @param request - Incoming HTTP request.
   * @param env - Worker bindings; `GITHUB_WEBHOOK_SECRET` is read from here.
   * @returns 401 when the signature check fails, JSON `{ success: true }`
   *          after the ensemble has run, or 404 for any other path.
   */
  async fetch(request: Request, env: Env): Promise<Response> {
    const conductor = new Conductor({ env });

    // Test the parsed pathname rather than the raw URL string, so a query
    // string on the delivery URL does not defeat the route check.
    const { pathname } = new URL(request.url);

    if (pathname.endsWith('/webhook/github')) {
      // Verify signature
      const isValid = await verifyGitHubSignature(request, env.GITHUB_WEBHOOK_SECRET);

      if (!isValid) {
        return new Response('Invalid signature', { status: 401 });
      }

      // Parse only after the signature check has passed.
      const payload = await request.json();

      await conductor.executeEnsemble('handle-github-event', {
        event: request.headers.get('X-GitHub-Event'),
        payload
      });

      return Response.json({ success: true });
    }

    return new Response('Not Found', { status: 404 });
  }
};

GitHub Webhooks

Repository Events

# Ensemble run by the GitHub webhook receiver. It reads `input.event`
# (the event name) and `input.payload` (the webhook body).
name: handle-github-event
description: Process GitHub webhook events

flow:
  # Route by event type
  # Each member is guarded by a condition on input.event; the three
  # conditions test for different event names.
  - member: handle-push
    condition: ${input.event === 'push'}
    input:
      commits: ${input.payload.commits}
      ref: ${input.payload.ref}

  # Pull request events: forwards the action and the pull_request object.
  - member: handle-pull-request
    condition: ${input.event === 'pull_request'}
    input:
      action: ${input.payload.action}
      pr: ${input.payload.pull_request}

  # Issue events: forwards the action and the issue object.
  - member: handle-issue
    condition: ${input.event === 'issues'}
    input:
      action: ${input.payload.action}
      issue: ${input.payload.issue}

# Output is the same for every event type: a fixed flag plus the event name.
output:
  processed: true
  eventType: ${input.event}

CI/CD Automation

# Push-triggered pipeline: check branch -> run tests -> deploy -> notify Slack.
# Each step after the first is gated on the result of the step before it.
name: ci-cd-pipeline
description: Automated deployment on push

flow:
  # Runs unconditionally; later steps read check-branch.output.branch.
  - member: check-branch
    type: Function

  # Only deploy from main branch
  - member: run-tests
    condition: ${check-branch.output.branch === 'main'}
    type: Function

  # Deploys only when the test step reported `passed`.
  # The version is taken from input.payload.after
  # (presumably the pushed commit SHA - confirm against the payload schema).
  - member: deploy
    condition: ${run-tests.output.passed}
    type: Function
    input:
      environment: production
      version: ${input.payload.after}

  # Notify Slack
  # Posts to the URL in env.SLACK_WEBHOOK_URL, and only after a successful deploy.
  - member: notify-deployment
    condition: ${deploy.success}
    type: API
    config:
      url: "${env.SLACK_WEBHOOK_URL}"
      method: POST
    input:
      body:
        text: "Deployed ${input.payload.after} to production"

# Reports whether the deploy step succeeded and which version was requested.
output:
  deployed: ${deploy.success}
  version: ${input.payload.after}

Stripe Webhooks

Payment Events

/**
 * Verify Stripe signature.
 *
 * Delegates to the Stripe SDK's `webhooks.constructEvent` and reports whether
 * it accepted the payload. Any error thrown by `constructEvent` is treated as
 * a failed verification.
 *
 * NOTE(review): the SDK is loaded with `require` and the API key comes from
 * `process.env`; both assume a Node-compatible runtime - confirm this holds
 * where the surrounding Worker code runs.
 *
 * @param payload - Raw request body, exactly as received.
 * @param signature - Value of the `stripe-signature` header.
 * @param secret - Webhook signing secret for the endpoint.
 * @returns `true` when `constructEvent` returns normally, `false` when it throws.
 */
function verifyStripeSignature(
  payload: string,
  signature: string,
  secret: string
): boolean {
  const stripeClient = require('stripe')(process.env.STRIPE_SECRET_KEY);

  let verified = true;
  try {
    stripeClient.webhooks.constructEvent(payload, signature, secret);
  } catch {
    // The SDK signals a bad signature by throwing.
    verified = false;
  }
  return verified;
}

/**
 * Worker entry point for the Stripe webhook receiver.
 */
export default {
  /**
   * Verifies and dispatches Stripe webhook deliveries.
   *
   * @param request - Incoming HTTP request.
   * @param env - Worker bindings; `STRIPE_WEBHOOK_SECRET` is read from here.
   * @returns 401 when the signature check fails, JSON `{ received: true }`
   *          after the ensemble has run, or 404 for any other path.
   */
  async fetch(request: Request, env: Env): Promise<Response> {
    const conductor = new Conductor({ env });

    // Test the parsed pathname rather than the raw URL string, so a query
    // string on the delivery URL does not defeat the route check.
    const { pathname } = new URL(request.url);

    if (pathname.endsWith('/webhook/stripe')) {
      // Keep the body as raw text: the signature is checked against this
      // exact string, and it is parsed as JSON only after the check passes.
      const payload = await request.text();
      const signature = request.headers.get('stripe-signature') ?? '';

      if (!verifyStripeSignature(payload, signature, env.STRIPE_WEBHOOK_SECRET)) {
        return new Response('Invalid signature', { status: 401 });
      }

      const event = JSON.parse(payload);

      await conductor.executeEnsemble('handle-stripe-event', {
        type: event.type,
        data: event.data.object
      });

      return Response.json({ received: true });
    }

    // Every path must produce a Response; without this the handler would
    // resolve to `undefined` for any non-Stripe request.
    return new Response('Not Found', { status: 404 });
  }
};

Subscription Workflow

# Ensemble run by the Stripe webhook receiver. It reads `input.type`
# (the Stripe event type) and `input.data` (the event's data object).
name: handle-stripe-event
description: Process Stripe webhook events

flow:
  # New subscription
  # Inserts a row with status 'active'; customer and plan id are bound
  # as positional parameters rather than interpolated into the SQL.
  - member: activate-subscription
    condition: ${input.type === 'customer.subscription.created'}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO subscriptions (customer_id, plan, status)
        VALUES (?, ?, 'active')
    input:
      params:
        - ${input.data.customer}
        - ${input.data.plan.id}

  # Subscription cancelled
  # Marks every subscription row of the customer as cancelled and
  # stamps the cancellation time.
  - member: cancel-subscription
    condition: ${input.type === 'customer.subscription.deleted'}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        UPDATE subscriptions
        SET status = 'cancelled', cancelled_at = CURRENT_TIMESTAMP
        WHERE customer_id = ?
    input:
      params:
        - ${input.data.customer}

  # Payment failed
  # Posts a "payment_failed" template request to the service at
  # env.EMAIL_SERVICE_URL, addressed to the invoice's customer_email.
  - member: notify-payment-failure
    condition: ${input.type === 'invoice.payment_failed'}
    type: API
    config:
      url: "${env.EMAIL_SERVICE_URL}"
      method: POST
    input:
      body:
        to: ${input.data.customer_email}
        subject: "Payment Failed"
        template: "payment_failed"

# Output is the same for every event type: a fixed flag plus the event type.
output:
  processed: true
  eventType: ${input.type}

Slack Webhooks

Slash Command Handler

/**
 * Worker entry point for the Slack slash-command receiver.
 *
 * NOTE(review): this handler does not authenticate the request (no signature
 * or token check is performed before the ensemble runs) - confirm that this
 * is handled elsewhere.
 */
export default {
  /**
   * Runs the `handle-slack-command` ensemble for POST requests whose path
   * ends in `/slack/command`.
   *
   * @param request - Incoming HTTP request; `command`, `text` and `user_id`
   *                  are read from its form-encoded body.
   * @param env - Worker bindings, handed to the Conductor instance.
   * @returns An `in_channel` JSON reply carrying the ensemble's
   *          `output.message`, or 404 for any other request.
   */
  async fetch(request: Request, env: Env): Promise<Response> {
    const conductor = new Conductor({ env });

    // Test the parsed pathname rather than the raw URL string, so a query
    // string on the request URL does not defeat the route check.
    const { pathname } = new URL(request.url);

    if (request.method === 'POST' && pathname.endsWith('/slack/command')) {
      const formData = await request.formData();
      const command = formData.get('command');
      const text = formData.get('text');
      const userId = formData.get('user_id');

      // Execute ensemble for slash command
      const result = await conductor.executeEnsemble('handle-slack-command', {
        command,
        text,
        userId
      });

      return Response.json({
        response_type: 'in_channel',
        text: result.output.message
      });
    }

    return new Response('Not Found', { status: 404 });
  }
};

Slash Command Ensemble

# Ensemble run by the Slack slash-command receiver. It reads `input.command`
# (the slash command) and `input.text` (the text typed after it).
name: handle-slack-command
description: Process Slack slash commands

flow:
  # /deploy command
  # The text after the command is passed on as the target environment.
  - member: trigger-deployment
    condition: ${input.command === '/deploy'}
    type: Function
    input:
      environment: ${input.text}

  # /status command
  - member: check-status
    condition: ${input.command === '/status'}
    type: Function

  # /analyze command
  # The text after the command is used as the model prompt.
  - member: run-analysis
    condition: ${input.command === '/analyze'}
    type: Think
    config:
      provider: anthropic
      model: claude-3-5-sonnet-20241022
    input:
      prompt: ${input.text}

# One line per command; each expression yields an empty string unless its
# command matches, so only the matching command contributes text.
output:
  message: |
    ${input.command === '/deploy' ? 'Deployment started' : ''}
    ${input.command === '/status' ? check-status.output.summary : ''}
    ${input.command === '/analyze' ? run-analysis.output.text : ''}

Outbound Webhooks

Sending Webhook Notifications

# Outbound notification: POSTs a "workflow.completed" event to the URL
# configured in env.WEBHOOK_URL.
name: notify-webhook
description: Send webhook notification

flow:
  # The shared secret is sent as a Bearer token in the Authorization header.
  - member: send-notification
    type: API
    config:
      url: "${env.WEBHOOK_URL}"
      method: POST
      headers:
        Authorization: "Bearer ${env.WEBHOOK_SECRET}"
        Content-Type: "application/json"
    input:
      body:
        event: "workflow.completed"
        timestamp: ${Date.now()}
        # The whole ensemble input is forwarded as the event data.
        data: ${input}

# NOTE(review): only an exact 200 counts as notified; other 2xx statuses
# (e.g. 202, 204) yield false - confirm this is intended.
output:
  notified: ${send-notification.output.status === 200}

Webhook with Retry

# Outbound webhook with retry settings.
# NOTE(review): two retry settings are present with different counts
# (`config.retries: 5` and `retry.maxAttempts: 3`) - confirm which one
# takes effect and remove the other.
- member: reliable-webhook
  type: API
  config:
    url: "${env.WEBHOOK_URL}"
    method: POST
    retries: 5
  retry:
    maxAttempts: 3
    backoff: exponential
  input:
    body:
      # Event name and data are taken from the ensemble input.
      event: ${input.event}
      data: ${input.data}

Webhook Security

HMAC Signature Verification

/**
 * Verify an HMAC-SHA256 webhook signature.
 *
 * `signature` must be the bare digest as 64 lowercase hex digits. It is
 * decoded to bytes and checked with `crypto.subtle.verify`, which recomputes
 * the HMAC of `payload` and performs the comparison inside the runtime. This
 * replaces a `===` comparison of hex strings, which stops at the first
 * differing character and so leaks timing information about how much of a
 * forged signature was correct.
 *
 * @param payload - Raw request body, exactly as received.
 * @param signature - Hex-encoded HMAC-SHA256 digest supplied by the sender.
 * @param secret - Shared secret used as the HMAC key.
 * @returns `true` only when `signature` is well-formed and matches the HMAC
 *          of `payload`; `false` otherwise.
 */
async function verifyWebhookSignature(
  payload: string,
  signature: string,
  secret: string
): Promise<boolean> {
  // Anything else could never have equalled the expected lowercase hex digest.
  if (!/^[0-9a-f]{64}$/.test(signature)) return false;

  const provided = Uint8Array.from({ length: 32 }, (_, i) =>
    parseInt(signature.substring(i * 2, i * 2 + 2), 16)
  );

  const encoder = new TextEncoder();
  const key = await crypto.subtle.importKey(
    'raw',
    encoder.encode(secret),
    { name: 'HMAC', hash: 'SHA-256' },
    false,
    ['verify']
  );

  return crypto.subtle.verify('HMAC', key, provided, encoder.encode(payload));
}

IP Allowlist

/**
 * IPv4 CIDR ranges that are allowed to call the webhook endpoints.
 *
 * NOTE(review): `3.0.0.0/8` is a very broad range - confirm it against the
 * provider's published source addresses before relying on it.
 */
const ALLOWED_IPS = [
  '192.30.252.0/22',  // GitHub
  '140.82.112.0/20',  // GitHub
  '3.0.0.0/8'         // Stripe
];

/**
 * Parse a dotted-quad IPv4 address into its unsigned 32-bit value.
 *
 * @returns The numeric address, or `null` when `address` is not four
 *          decimal octets in the range 0-255.
 */
function parseIPv4(address: string): number | null {
  const parts = address.split('.');
  if (parts.length !== 4) return null;

  let value = 0;
  for (const part of parts) {
    if (!/^\d{1,3}$/.test(part)) return null;
    const octet = Number(part);
    if (octet > 255) return null;
    // Plain arithmetic instead of `<<` keeps the value unsigned.
    value = value * 256 + octet;
  }
  return value;
}

/**
 * Report whether an IPv4 address lies inside a CIDR range.
 *
 * A range without a `/prefix` part is treated as a single address (`/32`).
 * Malformed addresses or ranges never match.
 */
function ipv4InCidr(ip: string, cidr: string): boolean {
  const slash = cidr.indexOf('/');
  const baseText = slash === -1 ? cidr : cidr.slice(0, slash);
  const prefixText = slash === -1 ? '32' : cidr.slice(slash + 1);

  if (!/^\d{1,2}$/.test(prefixText)) return false;
  const prefix = Number(prefixText);
  if (prefix > 32) return false;

  const address = parseIPv4(ip);
  const base = parseIPv4(baseText);
  if (address === null || base === null) return false;

  // Two addresses share a /prefix network exactly when they agree once the
  // low (32 - prefix) host bits are divided away.
  const blockSize = 2 ** (32 - prefix);
  return Math.floor(address / blockSize) === Math.floor(base / blockSize);
}

/**
 * Check a client address against `ALLOWED_IPS`.
 *
 * Only IPv4 addresses can match; an empty string, an IPv6 address or any
 * other unparseable value is rejected.
 *
 * @param ip - Client address, e.g. the `CF-Connecting-IP` header value.
 * @returns `true` when `ip` falls inside at least one allowed range.
 */
function isIPAllowed(ip: string): boolean {
  // Check if IP is in allowlist
  return ALLOWED_IPS.some(range => ipv4InCidr(ip, range));
}

/**
 * Worker entry point that rejects callers whose address is not allowlisted.
 */
export default {
  /**
   * Answers 403 unless the `CF-Connecting-IP` header value passes
   * `isIPAllowed`; a missing header is checked as the empty string.
   */
  async fetch(request: Request, env: Env): Promise<Response> {
    const callerAddress = request.headers.get('CF-Connecting-IP') || '';
    const permitted = isIPAllowed(callerAddress);

    if (!permitted) {
      return new Response('Forbidden', { status: 403 });
    }

    // Process webhook
  }
};

Token Authentication

/**
 * Compare two strings without stopping at the first difference.
 *
 * Every byte position up to the longer length is visited and the differences
 * are OR-ed together, so the amount of work does not depend on where (or
 * whether) the inputs differ.
 */
function constantTimeEquals(a: string, b: string): boolean {
  const encoder = new TextEncoder();
  const left = encoder.encode(a);
  const right = encoder.encode(b);

  // A length mismatch is folded into the result instead of returning early.
  let diff = left.length ^ right.length;
  const span = Math.max(left.length, right.length);
  for (let i = 0; i < span; i++) {
    diff |= (left[i] ?? 0) ^ (right[i] ?? 0);
  }
  return diff === 0;
}

/**
 * Worker entry point that requires a bearer token on every request.
 */
export default {
  /**
   * Answers 401 unless the `Authorization` header is exactly
   * `Bearer <env.WEBHOOK_TOKEN>`.
   *
   * The check fails closed: when `WEBHOOK_TOKEN` is not configured, no
   * request is accepted. Without that guard the expected header would be
   * the literal string `Bearer undefined`, which any caller could send.
   */
  async fetch(request: Request, env: Env): Promise<Response> {
    const authHeader = request.headers.get('Authorization');
    const expectedToken = env.WEBHOOK_TOKEN;

    if (
      !expectedToken ||
      authHeader === null ||
      !constantTimeEquals(authHeader, `Bearer ${expectedToken}`)
    ) {
      return new Response('Unauthorized', { status: 401 });
    }

    // Process webhook
  }
};

Advanced Patterns

Webhook Queue

// Queue webhooks for processing
/**
 * Producer: accepts a webhook and hands it to a queue instead of processing
 * it inline.
 *
 * NOTE(review): all request headers are copied into the queued message,
 * which would include any credentials sent with the request - confirm that
 * is acceptable.
 */
export default {
  /**
   * Parses the JSON body, enqueues it on `env.WEBHOOK_QUEUE` together with
   * the current time and a copy of the request headers, and replies with
   * `{ queued: true }`.
   */
  async fetch(request: Request, env: Env): Promise<Response> {
    const body = await request.json();

    // Add to queue
    const queuedMessage = {
      payload: body,
      timestamp: Date.now(),
      headers: Object.fromEntries(request.headers)
    };
    await env.WEBHOOK_QUEUE.send(queuedMessage);

    return Response.json({ queued: true });
  }
};

// Consumer
/**
 * Consumer: runs the `process-webhook` ensemble for each queued message.
 */
export default {
  /**
   * Handles the batch one message at a time, in order. A message is
   * acknowledged only after its ensemble run has completed, so an error
   * thrown by a run leaves that message and the rest of the batch
   * unacknowledged.
   */
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    const conductor = new Conductor({ env });
    const { messages } = batch;

    for (let index = 0; index < messages.length; index++) {
      const current = messages[index];
      await conductor.executeEnsemble('process-webhook', current.body);
      current.ack();
    }
  }
};

Webhook Replay

# Log-then-process: the webhook is written to webhook_log before it is
# processed, and flagged as processed only when processing succeeded.
name: webhook-with-replay
description: Store webhooks for replay

flow:
  # Store webhook
  # The payload is stored as a JSON string under the webhook's id.
  - member: store-webhook
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO webhook_log (id, payload, timestamp)
        VALUES (?, ?, CURRENT_TIMESTAMP)
    input:
      params:
        - ${input.webhookId}
        - ${JSON.stringify(input.payload)}

  # Process webhook
  # continue_on_error presumably lets the flow carry on after a failure here;
  # the next step is gated on process-webhook.success either way.
  - member: process-webhook
    continue_on_error: true

  # Mark as processed
  # Skipped when processing failed, so the row keeps its unprocessed state.
  - member: mark-processed
    condition: ${process-webhook.success}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        UPDATE webhook_log SET processed = true WHERE id = ?
    input:
      params:
        - ${input.webhookId}

Webhook Filtering

# Validation, then event-type filtering, then routing by priority.
name: filtered-webhook-handler
description: Filter webhooks by conditions

flow:
  # Validate webhook
  - member: validate-webhook
    type: Function

  # Filter by event type
  # Runs only for webhooks that validated and are not 'ping' events.
  - member: filter-events
    condition: ${validate-webhook.output.valid && input.eventType !== 'ping'}

  # Filter by priority
  # The two handlers below have complementary priority conditions, and both
  # additionally require filter-events.success.
  - member: high-priority-handler
    condition: ${filter-events.success && input.priority === 'high'}

  - member: normal-handler
    condition: ${filter-events.success && input.priority !== 'high'}

Testing Webhooks

Local Testing

# Use ngrok for local webhook testing
# Tunnels HTTP traffic to local port 8787
# (presumably the port the local dev server listens on - confirm).
ngrok http 8787

# Update webhook URL in service
# Example: the tunnel hostname followed by the webhook route.
# https://abc123.ngrok.io/webhook/github

Mock Webhook Payloads

import { describe, it, expect } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';

// Unit tests that feed hand-written webhook inputs straight into the
// ensembles, bypassing the HTTP layer.
describe('webhook handler', () => {
  // Minimal GitHub push input: one commit pushed to main.
  const githubPushInput = {
    event: 'push',
    payload: {
      ref: 'refs/heads/main',
      commits: [
        {
          id: 'abc123',
          message: 'feat: add new feature',
          author: { name: 'Alice' }
        }
      ]
    }
  };

  // Minimal Stripe "subscription created" input.
  const stripeSubscriptionInput = {
    type: 'customer.subscription.created',
    data: {
      customer: 'cus_123',
      plan: { id: 'plan_pro' },
      status: 'active'
    }
  };

  it('should process GitHub push event', async () => {
    const conductor = await TestConductor.create();

    const result = await conductor.executeEnsemble(
      'handle-github-event',
      githubPushInput
    );

    expect(result).toBeSuccessful();
    expect(result.output.processed).toBe(true);
  });

  it('should handle Stripe subscription event', async () => {
    const conductor = await TestConductor.create();

    const result = await conductor.executeEnsemble(
      'handle-stripe-event',
      stripeSubscriptionInput
    );

    expect(result).toBeSuccessful();
  });
});

Integration Testing

// End-to-end test: starts the Worker locally and sends it a GitHub-style
// webhook over HTTP.
// Assumes `describe`/`it`/`expect` come from vitest and `unstable_dev` from
// wrangler - TODO confirm the imports at the top of the test file.
describe('webhook integration', () => {
  it('should receive and process webhook', async () => {
    const worker = await unstable_dev('src/index.ts');

    try {
      // Send webhook
      const response = await worker.fetch('/webhook/github', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'X-GitHub-Event': 'push'
        },
        body: JSON.stringify({
          ref: 'refs/heads/main',
          // A concrete commit; the bare `[...]` placeholder is not valid syntax.
          commits: [
            {
              id: 'abc123',
              message: 'feat: add new feature',
              author: { name: 'Alice' }
            }
          ]
        })
      });

      expect(response.status).toBe(200);

      const result = (await response.json()) as { success: boolean };
      expect(result.success).toBe(true);
    } finally {
      // Always shut the dev worker down, even when an assertion fails;
      // otherwise it keeps running after the test.
      await worker.stop();
    }
  });
});

Best Practices

  1. Verify signatures - Always validate webhook authenticity
  2. Return quickly - Respond with 200 immediately, process async
  3. Use queues - Queue webhooks for reliable processing
  4. Implement idempotency - Handle duplicate webhooks gracefully
  5. Log everything - Store webhook payloads for debugging
  6. Set timeouts - Don’t wait indefinitely for processing
  7. Rate limit - Protect against webhook floods
  8. Monitor failures - Alert on webhook processing errors
  9. Version payloads - Handle schema changes gracefully
  10. Document webhooks - Clear documentation for consumers

Common Providers

GitHub

Stripe

Slack

SendGrid

Twilio