Skip to main content

Overview

Integrate Conductor with Cloudflare Queues for reliable asynchronous task processing. Perfect for background jobs, event processing, webhooks, and any operation that can be processed later.

Quick Example

// Producer: enqueue the work, then answer the request right away
export default {
  /**
   * Sends a `process-order` task to the TASK_QUEUE binding and replies
   * with `{ queued: true }` once the send has completed.
   */
  async fetch(request: Request, env: Env): Promise<Response> {
    const orderTask = {
      type: 'process-order',
      orderId: 123,
      timestamp: Date.now()
    };

    await env.TASK_QUEUE.send(orderTask);
    return Response.json({ queued: true });
  }
};

// Consumer: Process with Conductor
export default {
  /**
   * Runs the `process-task` ensemble once per queued message.
   * A message is acknowledged when the ensemble resolves and is marked
   * for retry when it throws.
   */
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    const conductor = new Conductor({ env });

    for (const message of batch.messages) {
      try {
        await conductor.executeEnsemble('process-task', message.body);
        message.ack();
      } catch (error) {
        // Log before retrying so the cause of the failure is not lost
        console.error('Task failed:', error);
        message.retry();
      }
    }
  }
};

Setup

1. Configure Queue

# wrangler.toml
# Producer side: binds the "task-queue" queue under the name TASK_QUEUE
# (the Worker examples reach it as env.TASK_QUEUE).
[[queues.producers]]
binding = "TASK_QUEUE"
queue = "task-queue"

# Consumer side: subscribes this Worker to "task-queue".
# max_batch_size caps how many messages are delivered per batch;
# max_batch_timeout is presumably the number of seconds to wait for a batch
# to fill before delivering it — confirm against the Wrangler configuration docs.
[[queues.consumers]]
queue = "task-queue"
max_batch_size = 10
max_batch_timeout = 30

2. Create Queue

# Create the queue that wrangler.toml refers to ("task-queue")
npx wrangler queues create task-queue

Producer Patterns

Basic Queue Producer

export default {
  /**
   * Reads the JSON request body, queues it as a `process-data` task and
   * responds with a "queued" status without waiting for the task to run.
   */
  async fetch(request: Request, env: Env): Promise<Response> {
    const data = await request.json();

    // Hand the payload to the queue
    const queuedTask = { task: 'process-data', data, queuedAt: Date.now() };
    await env.TASK_QUEUE.send(queuedTask);

    const acknowledgement = {
      status: 'queued',
      message: 'Task will be processed shortly'
    };
    return Response.json(acknowledgement);
  }
};

Batch Queue Production

// Queue multiple messages at once.
// sendBatch() takes message *requests*: each entry wraps its payload in `body`
// (and may also carry per-message options such as `delaySeconds`).
await env.TASK_QUEUE.sendBatch([
  { body: { task: 'email', userId: 1 } },
  { body: { task: 'email', userId: 2 } },
  { body: { task: 'email', userId: 3 } }
]);

Delayed Messages

// Hold the reminder back for one hour (60 * 60 = 3600 seconds) before delivery
const reminder = { task: 'reminder', userId: 123 };
await env.TASK_QUEUE.send(reminder, { delaySeconds: 60 * 60 });

Priority Messages

// Queues has no built-in priority option, and `contentType` only selects how the
// body is serialized ("json", "text", "bytes" or "v8") — it cannot carry a routing label.
// Put the priority in the message body so the consumer can route on it.
await env.TASK_QUEUE.send({
  task: 'urgent',
  priority: 'urgent',
  data: { /* task payload */ }
});

Consumer Patterns

Basic Consumer

export default {
  /**
   * Treats each message body as `{ task, data }` and runs the ensemble
   * named by `task` with `data` as its input.
   */
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    const conductor = new Conductor({ env });

    for (const message of batch.messages) {
      const { task: ensembleName, data: ensembleInput } = message.body;

      try {
        await conductor.executeEnsemble(ensembleName, ensembleInput);
        message.ack();  // finished - acknowledge the message
      } catch (failure) {
        console.error('Task failed:', failure);
        message.retry();  // mark the message for redelivery
      }
    }
  }
};

Batch Processing

export default {
  /**
   * Hands the whole batch to the `process-batch` ensemble in a single call.
   * The batch succeeds or fails as a unit: every message is acknowledged
   * when the ensemble resolves, and every message is retried when it throws.
   */
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    const conductor = new Conductor({ env });

    // Process all messages as a batch
    const tasks = batch.messages.map(m => m.body);

    try {
      await conductor.executeEnsemble('process-batch', {
        tasks,
        batchSize: tasks.length
      });

      // Ack all on success
      batch.ackAll();
    } catch (error) {
      // Record the cause, then retry all on failure
      console.error('Batch failed:', error);
      batch.retryAll();
    }
  }
};

Selective Retry

export default {
  /**
   * Runs `process-task` per message and retries only the thrown errors that
   * `isTransient` accepts. Every other outcome is acknowledged, and failures
   * are passed to `logFailure` / `logFatalError`.
   */
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    const conductor = new Conductor({ env });

    for (const message of batch.messages) {
      try {
        const outcome = await conductor.executeEnsemble('process-task', message.body);
        const succeeded = Boolean(outcome.success);

        // Acknowledged either way: a reported (non-thrown) failure is
        // treated as permanent and must not be retried
        message.ack();
        if (!succeeded) {
          await logFailure(message.body, outcome.error);
        }
      } catch (error) {
        if (isTransient(error)) {
          // Transient failure - retry
          message.retry();
          continue;
        }

        // Not transient: drop the message and record the error
        message.ack();
        await logFatalError(message.body, error);
      }
    }
  }
};

Common Workflows

Email Queue

# Producer: ensemble that places an email request on the queue
name: queue-email
description: Queue email for sending

flow:
  # First member; no explicit input mapping is given for it here
  - member: validate-email
    type: Function

  # Second member: receives the target queue name and a message assembled
  # field-by-field from the ensemble input
  - member: queue-message
    type: Function
    input:
      queue: "EMAIL_QUEUE"
      message:
        to: ${input.to}
        subject: ${input.subject}
        body: ${input.body}
        templateId: ${input.templateId}

# Output: a constant queued flag plus the id taken from queue-message's output
output:
  queued: true
  messageId: ${queue-message.output.id}
# Consumer: ensemble that sends one queued email and records it
name: send-email
description: Process email from queue

flow:
  # Runs only when the queued message carries a templateId
  - member: render-template
    condition: ${input.templateId != null}
    type: Function

  # POSTs the email to the service configured in EMAIL_SERVICE_URL
  - member: send-email
    type: API
    config:
      url: "${env.EMAIL_SERVICE_URL}"
      method: POST
    input:
      body:
        to: ${input.to}
        subject: ${input.subject}
        # Quoted on purpose: the ternary contains " : ", which YAML would
        # otherwise read as a nested mapping and reject
        html: "${render-template.success ? render-template.output : input.body}"

  # Writes a log row only after a successful send
  - member: log-sent
    condition: ${send-email.success}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO email_log (to_address, subject, sent_at)
        VALUES (?, ?, CURRENT_TIMESTAMP)
    input:
      # One entry per "?" placeholder, in order
      params:
        - ${input.to}
        - ${input.subject}

output:
  sent: ${send-email.success}

Webhook Processing

// Producer: accept the webhook, check its signature, and defer the real work to the queue
export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const payload = await request.json();

    // Reject deliveries whose signature does not check out
    const providedSignature = request.headers.get('X-Webhook-Signature');
    const trusted = verifySignature(payload, providedSignature, env.WEBHOOK_SECRET);
    if (!trusted) {
      return new Response('Invalid signature', { status: 401 });
    }

    // Hand the event off to the queue
    const queuedEvent = {
      source: 'github',
      event: request.headers.get('X-GitHub-Event'),
      payload,
      receivedAt: Date.now()
    };
    await env.WEBHOOK_QUEUE.send(queuedEvent);

    // Reply without waiting for the event to be processed
    return Response.json({ received: true });
  }
};
# Consumer: ensemble that handles one queued webhook event
name: process-webhook
description: Handle webhook event

flow:
  # Route by event type: each handler runs only for its own event name
  - member: handle-push
    condition: ${input.event === 'push'}
    type: Function

  - member: handle-pr
    condition: ${input.event === 'pull_request'}
    type: Function

  - member: handle-issue
    condition: ${input.event === 'issues'}
    type: Function

  # Log processing
  - member: log-event
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO webhook_log (source, event, processed_at)
        VALUES (?, ?, CURRENT_TIMESTAMP)
    input:
      # One entry per "?" placeholder, in order; both fields are set by the
      # webhook producer when it queues the message
      params:
        - ${input.source}
        - ${input.event}

output:
  processed: true

Data Pipeline Queue

// Queue data ingestion tasks
export default {
  /**
   * Scheduled handler: queues one `ingest-data` task for every source
   * returned by `getSourcesToProcess`.
   */
  async scheduled(event: ScheduledEvent, env: Env): Promise<void> {
    // Get list of sources to process
    const sources = await getSourcesToProcess(env);

    // Queue each source
    for (const source of sources) {
      await env.PIPELINE_QUEUE.send({
        task: 'ingest-data',
        source: source.id,
        lastSync: source.lastSync
      });
    }
  },

  /**
   * Queue handler: runs the `data-ingestion` ensemble for each message,
   * acknowledging it on success and marking it for retry on failure.
   */
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    const conductor = new Conductor({ env });

    for (const message of batch.messages) {
      try {
        await conductor.executeEnsemble('data-ingestion', message.body);
        message.ack();
      } catch (error) {
        // Log before retrying so the cause of the failure is not lost
        console.error('Data ingestion failed:', error);
        message.retry();
      }
    }
  }
};

Error Handling

Retry Strategy

// Attempt count at which a failing message stops being retried and is parked in the DLQ.
// NOTE: the consumer's `max_retries` setting must be high enough for `attempts` to
// reach this value; otherwise Queues gives up on the message before this code does.
const MAX_ATTEMPTS = 5;
// First retry waits this long; each later retry doubles the wait, up to the cap.
const BASE_RETRY_DELAY_SECONDS = 10;
const MAX_RETRY_DELAY_SECONDS = 3600;

export default {
  /**
   * Processes each message with `processMessage`. Failures are retried with an
   * exponentially growing delay until MAX_ATTEMPTS is reached; after that the
   * message is copied to the DLQ binding and acknowledged.
   */
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    for (const message of batch.messages) {
      try {
        await processMessage(message.body);
        message.ack();
      } catch (error) {
        if (message.attempts < MAX_ATTEMPTS) {
          // retry() does not back off on its own: pass an explicit delay
          // that doubles with every attempt (attempts starts at 1)
          const delaySeconds = Math.min(
            BASE_RETRY_DELAY_SECONDS * 2 ** (message.attempts - 1),
            MAX_RETRY_DELAY_SECONDS
          );
          message.retry({ delaySeconds });
        } else {
          // Max retries reached - move to DLQ
          await env.DLQ.send({
            original: message.body,
            // The caught value is not guaranteed to be an Error
            error: error instanceof Error ? error.message : String(error),
            attempts: message.attempts
          });
          message.ack();
        }
      }
    }
  }
};

Dead Letter Queue

# Producer binding for the dead-letter queue; Worker code sends to it as env.DLQ
[[queues.producers]]
binding = "DLQ"
queue = "dead-letter-queue"
// DLQ consumer: record each dead-lettered task, raise an alert, then acknowledge it
export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    for (const message of batch.messages) {
      // Persist the failure
      const insertFailure = env.DB.prepare(
        'INSERT INTO failed_tasks (task, error, timestamp) VALUES (?, ?, ?)'
      );
      await insertFailure
        .bind(JSON.stringify(message.body.original), message.body.error, Date.now())
        .run();

      // Notify the alert webhook
      const alertPayload = JSON.stringify({
        text: `Task failed after ${message.body.attempts} attempts`,
        task: message.body.original
      });
      await fetch(env.ALERT_WEBHOOK, { method: 'POST', body: alertPayload });

      message.ack();
    }
  }
};

Monitoring

Track Queue Metrics

export default {
  /**
   * Processes the batch while counting successes and failures, then writes
   * one row with both counts and the elapsed time to `queue_metrics`.
   */
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    const startedAt = Date.now();
    const tally = { processed: 0, failed: 0 };

    for (const message of batch.messages) {
      try {
        await processMessage(message.body);
        message.ack();
        tally.processed += 1;
      } catch (error) {
        message.retry();
        tally.failed += 1;
      }
    }

    // Persist the metrics for this batch
    await env.DB.prepare(
      `INSERT INTO queue_metrics
       (processed, failed, duration_ms, timestamp)
       VALUES (?, ?, ?, CURRENT_TIMESTAMP)`
    ).bind(
      tally.processed,
      tally.failed,
      Date.now() - startedAt
    ).run();
  }
};

Cloudflare Dashboard

Monitor in the Cloudflare dashboard:
  • Queue depth
  • Message throughput
  • Processing time
  • Error rate
  • Retry rate

Testing

import { describe, it, expect } from 'vitest';
import { unstable_dev } from 'wrangler';

describe('queue integration', () => {
  it('should queue and process message', async () => {
    const worker = await unstable_dev('src/index.ts');

    try {
      // Queue message
      await worker.fetch('/api/queue', {
        method: 'POST',
        body: JSON.stringify({ task: 'test' })
      });

      // Trigger queue consumer
      // NOTE(review): confirm that the handle returned by unstable_dev exposes
      // queue() in the Wrangler version in use.
      await worker.queue({
        queue: 'TASK_QUEUE',
        messages: [{
          id: '1',
          timestamp: Date.now(),
          body: { task: 'test' }
        }]
      });

      // Verify processed
      // ... check database or state
    } finally {
      // Always shut the dev worker down, even when a step above throws,
      // so the test run does not leave a server process behind
      await worker.stop();
    }
  });
});

Best Practices

  1. Respond quickly - Queue and return immediately
  2. Idempotent processing - Handle duplicate messages
  3. Selective retry - Don’t retry permanent failures
  4. Monitor metrics - Track queue depth and processing time
  5. Set batch sizes - Balance throughput and latency
  6. Use DLQ - Handle failed messages
  7. Log everything - Debug queue issues
  8. Test thoroughly - Verify queue and consumer logic