
Overview

The queue operation enables ensembles to send messages to and consume messages from Cloudflare Queues. Use queues for:
  • Asynchronous processing - Decouple producers and consumers
  • Batch operations - Process multiple messages efficiently
  • Reliable delivery - Automatic retries and dead letter queues
  • Load leveling - Handle traffic spikes gracefully

Queue Modes

The queue operation supports three modes:
  1. send - Send a single message
  2. send-batch - Send multiple messages at once
  3. consume - Process messages (used with queue triggers)

Sending Messages

Single Message

name: task-enqueuer

flow:
  - agent: enqueue-task

agents:
  - name: enqueue-task
    operation: queue
    config:
      mode: send
      queue: TASK_QUEUE
      message:
        body:
          taskId: ${input.taskId}
          action: "process"
          data: ${input.data}
        contentType: "json"

outputs:
  queued: ${enqueue-task.success}

Batch Send

Send multiple messages in a single operation:
name: bulk-enqueue

flow:
  - agent: enqueue-batch

agents:
  - name: enqueue-batch
    operation: queue
    config:
      mode: send-batch
      queue: TASK_QUEUE
      messages:
        - body:
            taskId: "task-1"
            action: "process"
        - body:
            taskId: "task-2"
            action: "process"
        - body:
            taskId: "task-3"
            action: "process"

outputs:
  sent: ${enqueue-batch.output.count}

Dynamic Batch from Input

flow:
  - agent: process-and-enqueue

agents:
  - name: process-and-enqueue
    operation: queue
    config:
      mode: send-batch
      queue: TASK_QUEUE
      messages: ${input.tasks}  # Array of message objects

inputs:
  tasks:
    type: array
    required: true
    description: Array of tasks to enqueue
Invoke with:
curl -X POST /webhooks/bulk-enqueue \
  -H 'Content-Type: application/json' \
  -d '{
    "tasks": [
      {"body": {"action": "process", "id": 1}},
      {"body": {"action": "process", "id": 2}},
      {"body": {"action": "process", "id": 3}}
    ]
  }'

Consuming Messages

Queue Trigger Consumer

Configure an ensemble to process queue messages:
name: task-processor

trigger:
  - type: queue
    queue: TASK_QUEUE
    batch_size: 10
    max_retries: 3
    max_wait_time: 5

flow:
  - agent: process-messages

agents:
  - name: process-messages
    operation: code
    config:
      code: |
        const messages = ${input.messages};
        const results = [];

        for (const msg of messages) {
          try {
            // Process message
            const result = await processTask(msg.body);
            results.push({
              id: msg.id,
              success: true,
              result
            });
          } catch (error) {
            results.push({
              id: msg.id,
              success: false,
              error: error.message
            });
          }
        }

        return {
          processed: results.length,
          succeeded: results.filter(r => r.success).length,
          failed: results.filter(r => !r.success).length,
          results
        };

outputs:
  processed: ${process-messages.output.processed}
  succeeded: ${process-messages.output.succeeded}
  failed: ${process-messages.output.failed}

Message Batch Structure

Queue consumers receive messages in this format:
{
  messages: [
    {
      id: "msg-abc123",
      timestamp: 1705315200000,
      body: {
        // Your message data
        taskId: "task-1",
        action: "process"
      },
      attempts: 1
    },
    // ... more messages
  ],
  queue: "TASK_QUEUE",
  batch: {
    size: 10,
    receivedAt: 1705315200000
  }
}

Message Options

Content Type

Specify message format:
agents:
  - name: send-json
    operation: queue
    config:
      mode: send
      queue: TASK_QUEUE
      message:
        body: ${input.data}
        contentType: "json"  # json | text | bytes

Delivery Delay

Delay message delivery:
agents:
  - name: send-delayed
    operation: queue
    config:
      mode: send
      queue: TASK_QUEUE
      message:
        body: ${input.task}
        delaySeconds: 60  # Deliver after 60 seconds

Message Deduplication

Prevent duplicate messages:
agents:
  - name: send-unique
    operation: queue
    config:
      mode: send
      queue: TASK_QUEUE
      message:
        body: ${input.task}
        deduplicationId: ${input.taskId}  # Unique identifier

Retry Logic

Automatic Retries

Failed messages are automatically retried:
trigger:
  - type: queue
    queue: TASK_QUEUE
    max_retries: 3  # Retry up to 3 times
Retry backoff:
  • Attempt 1: Immediate
  • Attempt 2: 1 second delay
  • Attempt 3: 5 seconds delay
  • Attempt 4: 30 seconds delay
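
The schedule above can be expressed as a small lookup. This sketch is illustrative only; the actual delays are controlled by the platform, and `retryDelaySeconds` is a hypothetical helper, not part of the framework:

```javascript
// Illustrative only: maps a delivery attempt (1-based) to the delay,
// in seconds, from the backoff table above. Attempts beyond the table
// reuse the final delay.
function retryDelaySeconds(attempt) {
  const schedule = [0, 1, 5, 30]; // attempts 1-4
  return schedule[Math.min(attempt, schedule.length) - 1];
}
```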

Retry in Consumer

Handle retries explicitly:
agents:
  - name: process-with-retry
    operation: code
    config:
      code: |
        const messages = ${input.messages};
        const toRetry = [];

        for (const msg of messages) {
          try {
            await processMessage(msg.body);
          } catch (error) {
            if (msg.attempts < 3) {
              // Retry this message
              toRetry.push(msg.id);
            } else {
              // Send to DLQ
              await sendToDLQ(msg, error);
            }
          }
        }

        return { retryIds: toRetry };

Dead Letter Queue (DLQ)

Configure DLQ

Messages that fail after the maximum number of retries go to the DLQ:
trigger:
  - type: queue
    queue: TASK_QUEUE
    max_retries: 3
    dead_letter_queue: TASK_DLQ  # Optional DLQ name

Process DLQ Messages

Create a separate ensemble to handle DLQ messages:
name: dlq-processor

trigger:
  - type: queue
    queue: TASK_DLQ
    batch_size: 5

flow:
  - agent: log-failures
  - agent: alert-team

agents:
  - name: log-failures
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO failed_tasks (message_id, body, error, attempts, failed_at)
        VALUES (?, ?, ?, ?, ?)
      params:
        - ${input.messages[0].id}
        - ${JSON.stringify(input.messages[0].body)}  # serialize the object for a TEXT column
        - ${input.messages[0].error}
        - ${input.messages[0].attempts}
        - ${Date.now()}

  - name: alert-team
    operation: email
    config:
      to: [devops@example.com]
      subject: "Queue Processing Failures"
      body: |
        ${input.messages.length} messages failed after max retries.
        See database for details.

outputs:
  logged: ${log-failures.success}

Complete Examples

Image Processing Pipeline

name: image-processor

trigger:
  # Webhook enqueues images
  - type: webhook
    path: /upload
    methods: [POST]
    public: true

  # Queue processes images
  - type: queue
    queue: IMAGE_QUEUE
    batch_size: 5
    max_retries: 2

flow:
  # Check if this is webhook or queue invocation
  - agent: route-request
  - agent: enqueue-image
    condition: ${route-request.output.source === 'webhook'}
  - agent: process-images
    condition: ${route-request.output.source === 'queue'}

agents:
  - name: route-request
    operation: code
    config:
      code: |
        // Queue invocations deliver a messages array; webhook invocations do not
        const messages = ${input.messages};
        return { source: messages ? 'queue' : 'webhook' };

  - name: enqueue-image
    operation: queue
    config:
      mode: send
      queue: IMAGE_QUEUE
      message:
        body:
          imageUrl: ${input.imageUrl}
          operations: ${input.operations}

  - name: process-images
    operation: code
    config:
      code: |
        const messages = ${input.messages};
        const results = [];

        for (const msg of messages) {
          const { imageUrl, operations } = msg.body;

          // Download image
          const image = await fetch(imageUrl);
          const buffer = await image.arrayBuffer();

          // Process image (resize, compress, etc.)
          const processed = await processImage(buffer, operations);

          // Upload to R2
          await env.IMAGES.put(`processed/${msg.id}.jpg`, processed);

          results.push({
            id: msg.id,
            url: `https://cdn.example.com/processed/${msg.id}.jpg`
          });
        }

        return { results };

outputs:
  # Webhook response
  enqueued: ${enqueue-image.success}
  # Queue response
  processed: ${process-images.output.results}

Background Task Queue

name: task-worker

trigger:
  - type: queue
    queue: BACKGROUND_TASKS
    batch_size: 20
    max_retries: 5
    max_wait_time: 10

flow:
  - agent: classify-tasks
  - agent: process-email-tasks
    condition: ${classify-tasks.output.emailTasks.length > 0}
  - agent: process-api-tasks
    condition: ${classify-tasks.output.apiTasks.length > 0}
  - agent: process-data-tasks
    condition: ${classify-tasks.output.dataTasks.length > 0}

agents:
  - name: classify-tasks
    operation: code
    config:
      code: |
        const messages = ${input.messages};
        const emailTasks = [];
        const apiTasks = [];
        const dataTasks = [];

        for (const msg of messages) {
          switch (msg.body.type) {
            case 'email':
              emailTasks.push(msg);
              break;
            case 'api':
              apiTasks.push(msg);
              break;
            case 'data':
              dataTasks.push(msg);
              break;
          }
        }

        return { emailTasks, apiTasks, dataTasks };

  - name: process-email-tasks
    operation: email
    config:
      to: ${classify-tasks.output.emailTasks[0].body.recipients}
      subject: ${classify-tasks.output.emailTasks[0].body.subject}
      body: ${classify-tasks.output.emailTasks[0].body.content}

  - name: process-api-tasks
    operation: http
    config:
      url: ${classify-tasks.output.apiTasks[0].body.endpoint}
      method: POST
      body: ${classify-tasks.output.apiTasks[0].body.payload}

  - name: process-data-tasks
    operation: storage
    config:
      type: d1
      query: ${classify-tasks.output.dataTasks[0].body.query}
      params: ${classify-tasks.output.dataTasks[0].body.params}

outputs:
  processed: ${input.messages.length}

Wrangler Configuration

Define queues in wrangler.toml:
[[queues.producers]]
queue = "TASK_QUEUE"
binding = "TASK_QUEUE"

[[queues.consumers]]
queue = "TASK_QUEUE"
max_batch_size = 10
max_batch_timeout = 5
max_retries = 3
dead_letter_queue = "TASK_DLQ"

[[queues.producers]]
queue = "TASK_DLQ"
binding = "TASK_DLQ"

[[queues.consumers]]
queue = "TASK_DLQ"
max_batch_size = 5

Performance Tips

Batch Size

  • Small batches (1-5): Low latency, real-time processing
  • Medium batches (10-20): Balanced throughput
  • Large batches (50-100): Maximum throughput, higher latency
trigger:
  - type: queue
    queue: TASK_QUEUE
    batch_size: 10  # Tune based on your needs

Max Wait Time

Control how long the runtime waits for a batch to fill:
trigger:
  - type: queue
    queue: TASK_QUEUE
    batch_size: 100
    max_wait_time: 5  # Don't wait more than 5 seconds

Parallel Processing

Process messages concurrently within a batch:
agents:
  - name: parallel-process
    operation: code
    config:
      code: |
        const messages = ${input.messages};

        // Process all messages concurrently
        const results = await Promise.all(
          messages.map(async (msg) => {
            return await processMessage(msg.body);
          })
        );

        return { results };
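
Note that `Promise.all` rejects as soon as any single message fails, discarding the other results. When you want every outcome recorded, `Promise.allSettled` is the safer pattern. This is a sketch; `processMessage` stands in for your own handler:

```javascript
// Process a batch concurrently while keeping per-message outcomes.
// Unlike Promise.all, allSettled never rejects: each entry reports
// either a fulfilled value or the rejection reason.
async function processBatch(messages, processMessage) {
  const settled = await Promise.allSettled(
    messages.map((msg) => processMessage(msg.body))
  );
  return settled.map((outcome, i) => ({
    id: messages[i].id,
    success: outcome.status === 'fulfilled',
    result: outcome.status === 'fulfilled' ? outcome.value : undefined,
    error: outcome.status === 'rejected' ? String(outcome.reason) : undefined,
  }));
}
```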

Message Size

Keep messages small for best performance:
  • Recommended: < 128 KB per message
  • Maximum: 256 KB per message
For large payloads, store the data in R2 or KV and queue a reference instead:
agents:
  - name: enqueue-large-data
    operation: code
    config:
      code: |
        // Store large data in R2
        const dataId = generateId();
        await env.DATA.put(dataId, ${input.largeData});

        // Queue just the reference
        await env.TASK_QUEUE.send({
          dataId,
          metadata: ${input.metadata}
        });

        return { dataId };

Error Handling

Handle Individual Failures

Process messages individually to isolate failures:
agents:
  - name: safe-process
    operation: code
    config:
      code: |
        const messages = ${input.messages};
        const succeeded = [];
        const failed = [];

        for (const msg of messages) {
          try {
            const result = await processMessage(msg.body);
            succeeded.push({ id: msg.id, result });
          } catch (error) {
            failed.push({
              id: msg.id,
              error: error.message,
              willRetry: msg.attempts < 3
            });

            // Log failure
            console.error(`Message ${msg.id} failed:`, error);
          }
        }

        return { succeeded, failed };

Circuit Breaker

Stop processing if too many failures:
agents:
  - name: circuit-breaker
    operation: code
    config:
      code: |
        const messages = ${input.messages};
        const results = [];
        let failures = 0;
        const maxFailures = 3;

        for (const msg of messages) {
          if (failures >= maxFailures) {
            console.warn('Circuit breaker triggered');
            break;
          }

          try {
            const result = await processMessage(msg.body);
            results.push({ id: msg.id, result });
          } catch (error) {
            failures++;
            results.push({ id: msg.id, error: error.message });
          }
        }

        return {
          results,
          circuitBroken: failures >= maxFailures
        };

Monitoring

Queue Metrics

Track queue performance:
agents:
  - name: log-metrics
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO queue_metrics (
          batch_size,
          processed,
          succeeded,
          failed,
          duration,
          timestamp
        ) VALUES (?, ?, ?, ?, ?, ?)
      params:
        - ${input.messages.length}
        - ${process-messages.output.processed}
        - ${process-messages.output.succeeded}
        - ${process-messages.output.failed}
        - ${Date.now() - input.batch.receivedAt}
        - ${Date.now()}

Alerting

Alert on queue issues:
agents:
  - name: check-failure-rate
    operation: code
    config:
      code: |
        const failed = ${process-messages.output.failed};
        const total = ${input.messages.length};
        const failureRate = total > 0 ? failed / total : 0;

        if (failureRate > 0.5) {
          // High failure rate, send alert
          await fetch('${env.ALERT_WEBHOOK}', {
            method: 'POST',
            body: JSON.stringify({
              alert: 'High queue failure rate',
              rate: failureRate,
              queue: '${input.queue}'
            })
          });
        }

        return { failureRate };

Best Practices

  1. Idempotency - Design consumers to handle duplicate messages
  2. Batch sizing - Small batches for latency, large batches for throughput
  3. Retry with backoff - Use exponential backoff for retries
  4. Monitor DLQ - Set up alerts for DLQ growth
  5. Message references - Store large data externally, queue references
  6. Graceful degradation - Handle individual message failures
  7. Circuit breakers - Stop processing during systemic failures
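
Practice #1 can be sketched as a wrapper that records processed message ids and skips repeats. The `store` below is an in-memory Map to keep the example self-contained; in production you would back it with a durable store such as KV or D1, keyed on the message id or `deduplicationId`:

```javascript
// Idempotent processing sketch: skip a message whose id has already been
// handled, so redelivered duplicates are harmless.
async function handleOnce(msg, store, handler) {
  if (store.has(msg.id)) {
    return { id: msg.id, skipped: true };
  }
  const result = await handler(msg.body);
  store.set(msg.id, true); // mark only after successful processing
  return { id: msg.id, skipped: false, result };
}
```

Marking the id only after the handler succeeds means a crash mid-processing leaves the message eligible for retry, at the cost of possible reprocessing, which is why the handler itself should tolerate duplicates.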

Next Steps