
Overview

The queue operation enables ensembles to send messages to and consume messages from Cloudflare Queues. Use queues for:
  • Asynchronous processing - Decouple producers and consumers
  • Batch operations - Process multiple messages efficiently
  • Reliable delivery - Automatic retries and dead letter queues
  • Load leveling - Handle traffic spikes gracefully

Queue Modes

The queue operation supports three modes:
  1. send - Send a single message
  2. send-batch - Send multiple messages at once
  3. consume - Process messages (used with queue triggers)

Sending Messages

Single Message

name: task-enqueuer

flow:
  - agent: enqueue-task

agents:
  - name: enqueue-task
    operation: queue
    config:
      mode: send
      queue: TASK_QUEUE
      message:
        body:
          taskId: ${input.taskId}
          action: "process"
          data: ${input.data}
        contentType: "json"

outputs:
  queued: ${enqueue-task.success}

Batch Send

Send multiple messages in a single operation:
name: bulk-enqueue

flow:
  - agent: enqueue-batch

agents:
  - name: enqueue-batch
    operation: queue
    config:
      mode: send-batch
      queue: TASK_QUEUE
      messages:
        - body:
            taskId: "task-1"
            action: "process"
        - body:
            taskId: "task-2"
            action: "process"
        - body:
            taskId: "task-3"
            action: "process"

outputs:
  sent: ${enqueue-batch.output.count}

Dynamic Batch from Input

flow:
  - agent: process-and-enqueue

agents:
  - name: process-and-enqueue
    operation: queue
    config:
      mode: send-batch
      queue: TASK_QUEUE
      messages: ${input.tasks}  # Array of message objects

inputs:
  tasks:
    type: array
    required: true
    description: Array of tasks to enqueue
Invoke with:
curl -X POST /webhooks/bulk-enqueue \
  -d '{
    "tasks": [
      {"body": {"action": "process", "id": 1}},
      {"body": {"action": "process", "id": 2}},
      {"body": {"action": "process", "id": 3}}
    ]
  }'

Consuming Messages

Queue Trigger Consumer

Configure an ensemble to process queue messages:
name: task-processor

trigger:
  - type: queue
    queue: TASK_QUEUE
    batch_size: 10
    max_retries: 3
    max_wait_time: 5

flow:
  - agent: process-messages

agents:
  - name: process-messages
    operation: code
    config:
      script: scripts/process-queue-messages
    input:
      messages: ${input.messages}

outputs:
  processed: ${process-messages.output.processed}
  succeeded: ${process-messages.output.succeeded}
  failed: ${process-messages.output.failed}
// scripts/process-queue-messages.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function processQueueMessages(context: AgentExecutionContext) {
  const { messages } = context.input
  const results = []

  for (const msg of messages) {
    try {
      // Process message
      const result = await processTask(msg.body)
      results.push({
        id: msg.id,
        success: true,
        result
      })
    } catch (error: any) {
      results.push({
        id: msg.id,
        success: false,
        error: error.message
      })
    }
  }

  return {
    processed: results.length,
    succeeded: results.filter(r => r.success).length,
    failed: results.filter(r => !r.success).length,
    results
  }
}

async function processTask(body: any) {
  // Process task implementation
  return { processed: true }
}
Message Batch Structure

Queue consumers receive messages in this format:
{
  messages: [
    {
      id: "msg-abc123",
      timestamp: 1705315200000,
      body: {
        // Your message data
        taskId: "task-1",
        action: "process"
      },
      attempts: 1
    },
    // ... more messages
  ],
  queue: "TASK_QUEUE",
  batch: {
    size: 10,
    receivedAt: 1705315200000
  }
}
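
For reference, the batch shape above can be written down as TypeScript types (a sketch inferred from the example; these are assumptions, not an official exported type):

```typescript
// Sketch of the consumer batch shape shown above.
interface QueueMessage {
  id: string
  timestamp: number
  body: Record<string, unknown>
  attempts: number
}

interface QueueMessageBatch {
  messages: QueueMessage[]
  queue: string
  batch: { size: number; receivedAt: number }
}

// An example batch conforming to the structure.
const example: QueueMessageBatch = {
  messages: [
    {
      id: 'msg-abc123',
      timestamp: 1705315200000,
      body: { taskId: 'task-1', action: 'process' },
      attempts: 1
    }
  ],
  queue: 'TASK_QUEUE',
  batch: { size: 1, receivedAt: 1705315200000 }
}
```

Typing the batch this way lets consumer scripts catch shape mistakes (e.g. reading `msg.payload` instead of `msg.body`) at compile time.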

Message Options

Content Type

Specify message format:
agents:
  - name: send-json
    operation: queue
    config:
      mode: send
      queue: TASK_QUEUE
      message:
        body: ${input.data}
        contentType: "json"  # json | text | bytes

Delivery Delay

Delay message delivery:
agents:
  - name: send-delayed
    operation: queue
    config:
      mode: send
      queue: TASK_QUEUE
      message:
        body: ${input.task}
        delaySeconds: 60  # Deliver after 60 seconds
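
When delivery should happen at a specific time rather than after a fixed offset, a `delaySeconds` value can be computed from a target timestamp before enqueueing (a small helper sketch; not part of the queue API):

```typescript
// Compute a delaySeconds value for delivery at a target epoch-millisecond time.
function delayUntil(targetMs: number, nowMs: number = Date.now()): number {
  const seconds = Math.ceil((targetMs - nowMs) / 1000)
  return Math.max(0, seconds) // past targets deliver immediately
}
```

For example, `delayUntil(now + 60_000, now)` yields `60`, matching the `delaySeconds: 60` config above.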

Message Deduplication

Prevent duplicate messages:
agents:
  - name: send-unique
    operation: queue
    config:
      mode: send
      queue: TASK_QUEUE
      message:
        body: ${input.task}
        deduplicationId: ${input.taskId}  # Unique identifier
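
If the input lacks a natural unique id, a deterministic `deduplicationId` can be derived from the message content so that re-sends of the same payload collapse to one message (a sketch using FNV-1a over the JSON form; a production system might prefer a stronger hash):

```typescript
// Derive a stable deduplication id from message content.
// FNV-1a over the serialized body: same payload, same id.
function dedupIdFor(body: Record<string, unknown>): string {
  const text = JSON.stringify(body)
  let hash = 0x811c9dc5
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i)
    hash = Math.imul(hash, 0x01000193) >>> 0
  }
  return hash.toString(16)
}
```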

Retry Logic

Automatic Retries

Failed messages are automatically retried:
trigger:
  - type: queue
    queue: TASK_QUEUE
    max_retries: 3  # Retry up to 3 times
Retry backoff:
  • Attempt 1: Immediate
  • Attempt 2: 1 second delay
  • Attempt 3: 5 seconds delay
  • Attempt 4: 30 seconds delay
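
The schedule above can be expressed as a simple lookup when you need to reason about total retry latency (a sketch mirroring the listed delays; the actual backoff is managed by the platform, not your code):

```typescript
// Delay (in seconds) before each delivery attempt, mirroring the schedule above.
const RETRY_DELAYS_SECONDS = [0, 1, 5, 30]

function retryDelaySeconds(attempt: number): number {
  // attempt is 1-based; attempts beyond the table reuse the last delay
  const index = Math.min(attempt - 1, RETRY_DELAYS_SECONDS.length - 1)
  return RETRY_DELAYS_SECONDS[index]
}
```

With `max_retries: 3`, a message that fails every attempt spends roughly 0 + 1 + 5 + 30 = 36 seconds in backoff before reaching the DLQ.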

Retry in Consumer

Handle retries explicitly:
agents:
  - name: process-with-retry
    operation: code
    config:
      script: scripts/process-with-retry
    input:
      messages: ${input.messages}
// scripts/process-with-retry.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function processWithRetry(context: AgentExecutionContext) {
  const { messages } = context.input
  const toRetry = []

  for (const msg of messages) {
    try {
      await processMessage(msg.body)
    } catch (error) {
      if (msg.attempts < 3) {
        // Retry this message
        toRetry.push(msg.id)
      } else {
        // Send to DLQ
        await sendToDLQ(msg, error)
      }
    }
  }

  return { retryIds: toRetry }
}

async function processMessage(body: any) {
  // Process message implementation
  return { processed: true }
}

async function sendToDLQ(msg: any, error: any) {
  // Forward the failed message to a DLQ producer binding
}

Dead Letter Queue (DLQ)

Configure DLQ

Messages that fail after max retries go to the DLQ:
trigger:
  - type: queue
    queue: TASK_QUEUE
    max_retries: 3
    dead_letter_queue: TASK_DLQ  # Optional DLQ name

Process DLQ Messages

Create a separate ensemble for the DLQ:
name: dlq-processor

trigger:
  - type: queue
    queue: TASK_DLQ
    batch_size: 5

flow:
  - agent: log-failures
  - agent: alert-team

agents:
  - name: log-failures
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO failed_tasks (message_id, body, error, attempts, failed_at)
        VALUES (?, ?, ?, ?, ?)
      params:
        - ${input.messages[0].id}
        - ${input.messages[0].body}
        - ${input.messages[0].error}
        - ${input.messages[0].attempts}
        - ${Date.now()}

  - name: alert-team
    operation: email
    config:
      to: [[email protected]]
      subject: "Queue Processing Failures"
      body: |
        ${input.messages.length} messages failed after max retries.
        See database for details.

outputs:
  logged: ${log-failures.success}

Complete Examples

Image Processing Pipeline

name: image-processor

trigger:
  # Webhook enqueues images
  - type: webhook
    path: /webhooks/upload
    methods: [POST]
    public: true

  # Queue processes images
  - type: queue
    queue: IMAGE_QUEUE
    batch_size: 5
    max_retries: 2

flow:
  # Check if this is webhook or queue invocation
  - agent: route-request
  - agent: enqueue-image
    condition: ${route-request.output.source === 'webhook'}
  - agent: process-images
    condition: ${route-request.output.source === 'queue'}

agents:
  - name: route-request
    operation: code
    config:
      script: scripts/route-request
    input:
      messages: ${input.messages}

  - name: enqueue-image
    operation: queue
    config:
      mode: send
      queue: IMAGE_QUEUE
      message:
        body:
          imageUrl: ${input.imageUrl}
          operations: ${input.operations}

  - name: process-images
    operation: code
    config:
      script: scripts/process-queue-images
    input:
      messages: ${input.messages}

outputs:
  # Webhook response
  enqueued: ${enqueue-image.success}
  # Queue response
  processed: ${process-images.output.results}
// scripts/route-request.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function routeRequest(context: AgentExecutionContext) {
  const { messages } = context.input

  // Detect invocation source
  if (messages) {
    return { source: 'queue' }
  } else {
    return { source: 'webhook' }
  }
}
// scripts/process-queue-images.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function processQueueImages(context: AgentExecutionContext) {
  const { messages } = context.input
  const results = []

  for (const msg of messages) {
    const { imageUrl, operations } = msg.body

    // Download image
    const image = await fetch(imageUrl)
    const buffer = await image.arrayBuffer()

    // Process image (resize, compress, etc.)
    const processed = await processImage(buffer, operations)

    // Upload to R2
    await context.env.IMAGES.put(`processed/${msg.id}.jpg`, processed)

    results.push({
      id: msg.id,
      url: `https://cdn.example.com/processed/${msg.id}.jpg`
    })
  }

  return { results }
}

async function processImage(buffer: ArrayBuffer, operations: any) {
  // Image transform implementation (resize, compress, etc.)
  return buffer
}
Background Task Queue

name: task-worker

trigger:
  - type: queue
    queue: BACKGROUND_TASKS
    batch_size: 20
    max_retries: 5
    max_wait_time: 10

flow:
  - agent: classify-tasks
  - agent: process-email-tasks
    condition: ${classify-tasks.output.emailTasks.length > 0}
  - agent: process-api-tasks
    condition: ${classify-tasks.output.apiTasks.length > 0}
  - agent: process-data-tasks
    condition: ${classify-tasks.output.dataTasks.length > 0}

agents:
  - name: classify-tasks
    operation: code
    config:
      script: scripts/classify-queue-tasks
    input:
      messages: ${input.messages}

  - name: process-email-tasks
    operation: email
    config:
      to: ${classify-tasks.output.emailTasks[0].body.recipients}
      subject: ${classify-tasks.output.emailTasks[0].body.subject}
      body: ${classify-tasks.output.emailTasks[0].body.content}

  - name: process-api-tasks
    operation: http
    config:
      url: ${classify-tasks.output.apiTasks[0].body.endpoint}
      method: POST
      body: ${classify-tasks.output.apiTasks[0].body.payload}

  - name: process-data-tasks
    operation: storage
    config:
      type: d1
      query: ${classify-tasks.output.dataTasks[0].body.query}
      params: ${classify-tasks.output.dataTasks[0].body.params}

outputs:
  processed: ${input.messages.length}
// scripts/classify-queue-tasks.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function classifyQueueTasks(context: AgentExecutionContext) {
  const { messages } = context.input
  const emailTasks = []
  const apiTasks = []
  const dataTasks = []

  for (const msg of messages) {
    switch (msg.body.type) {
      case 'email':
        emailTasks.push(msg)
        break
      case 'api':
        apiTasks.push(msg)
        break
      case 'data':
        dataTasks.push(msg)
        break
    }
  }

  return { emailTasks, apiTasks, dataTasks }
}
Wrangler Configuration

Define queues in wrangler.toml:
[[queues.producers]]
queue = "TASK_QUEUE"
binding = "TASK_QUEUE"

[[queues.consumers]]
queue = "TASK_QUEUE"
max_batch_size = 10
max_batch_timeout = 5
max_retries = 3
dead_letter_queue = "TASK_DLQ"

[[queues.producers]]
queue = "TASK_DLQ"
binding = "TASK_DLQ"

[[queues.consumers]]
queue = "TASK_DLQ"
max_batch_size = 5

Performance Tips

Batch Size

  • Small batches (1-5): Low latency, real-time processing
  • Medium batches (10-20): Balanced throughput
  • Large batches (50-100): Maximum throughput, higher latency
trigger:
  - type: queue
    queue: TASK_QUEUE
    batch_size: 10  # Tune based on your needs

Max Wait Time

Control batch fill time:
trigger:
  - type: queue
    queue: TASK_QUEUE
    batch_size: 100
    max_wait_time: 5  # Don't wait more than 5 seconds

Parallel Processing

Process messages concurrently within a batch:
agents:
  - name: parallel-process
    operation: code
    config:
      script: scripts/process-messages-parallel
    input:
      messages: ${input.messages}
// scripts/process-messages-parallel.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function processMessagesParallel(context: AgentExecutionContext) {
  const { messages } = context.input

  // Process all messages concurrently
  const results = await Promise.all(
    messages.map(async (msg: any) => {
      return await processMessage(msg.body)
    })
  )

  return { results }
}

async function processMessage(body: any) {
  // Process message implementation
  return { processed: true, body }
}
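
Note that Promise.all rejects the whole batch as soon as any message fails. To keep per-message outcomes while still running concurrently, Promise.allSettled is a drop-in variant (a sketch; processMessage is the same placeholder as above):

```typescript
// Variant of the above using Promise.allSettled so one failing
// message does not reject the entire batch.
async function processAllSettled(messages: { id: string; body: unknown }[]) {
  const settled = await Promise.allSettled(
    messages.map(msg => processMessage(msg.body))
  )
  return settled.map((outcome, i) =>
    outcome.status === 'fulfilled'
      ? { id: messages[i].id, success: true, result: outcome.value }
      : { id: messages[i].id, success: false, error: String(outcome.reason) }
  )
}

async function processMessage(body: unknown) {
  // Process message implementation
  return { processed: true, body }
}
```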

Message Size

Keep messages small for best performance:
  • Recommended: < 128 KB per message
  • Maximum: 256 KB per message
For large data, store in R2/KV and pass reference:
agents:
  - name: enqueue-large-data
    operation: code
    config:
      script: scripts/enqueue-large-data-reference
    input:
      large_data: ${input.largeData}
      metadata: ${input.metadata}
// scripts/enqueue-large-data-reference.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function enqueueLargeDataReference(context: AgentExecutionContext) {
  const { large_data, metadata } = context.input

  // Store large data in R2
  const dataId = generateId()
  await context.env.DATA.put(dataId, large_data)

  // Queue just the reference
  await context.env.TASK_QUEUE.send({
    dataId,
    metadata
  })

  return { dataId }
}

function generateId() {
  return crypto.randomUUID()
}
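
On the consumer side, the reference is resolved back into the payload. A sketch, assuming the same DATA (R2) binding used by the producer above:

```typescript
// Resolve a queued reference back to the stored payload.
// Assumes the same DATA (R2) binding used by the producer above.
async function resolveReference(env: any, dataId: string): Promise<ArrayBuffer> {
  const object = await env.DATA.get(dataId)
  if (!object) {
    throw new Error(`Referenced data ${dataId} not found`)
  }
  return object.arrayBuffer()
}
```

Deleting the stored object after successful processing (or setting a TTL on a KV-backed store) keeps the reference store from growing unbounded.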

Error Handling

Handle Individual Failures

Process messages individually to isolate failures:
agents:
  - name: safe-process
    operation: code
    config:
      script: scripts/process-messages-safely
    input:
      messages: ${input.messages}
// scripts/process-messages-safely.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function processMessagesSafely(context: AgentExecutionContext) {
  const { messages } = context.input
  const succeeded = []
  const failed = []

  for (const msg of messages) {
    try {
      const result = await processMessage(msg.body)
      succeeded.push({ id: msg.id, result })
    } catch (error: any) {
      failed.push({
        id: msg.id,
        error: error.message,
        willRetry: msg.attempts < 3
      })

      // Log failure
      console.error(`Message ${msg.id} failed:`, error)
    }
  }

  return { succeeded, failed }
}

async function processMessage(body: any) {
  // Process message implementation
  return { processed: true }
}

Circuit Breaker

Stop processing if too many failures:
agents:
  - name: circuit-breaker
    operation: code
    config:
      script: scripts/process-with-circuit-breaker
    input:
      messages: ${input.messages}
// scripts/process-with-circuit-breaker.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function processWithCircuitBreaker(context: AgentExecutionContext) {
  const { messages } = context.input
  const results = []
  let failures = 0
  const maxFailures = 3

  for (const msg of messages) {
    if (failures >= maxFailures) {
      console.warn('Circuit breaker triggered')
      break
    }

    try {
      const result = await processMessage(msg.body)
      results.push({ id: msg.id, result })
    } catch (error: any) {
      failures++
      results.push({ id: msg.id, error: error.message })
    }
  }

  return {
    results,
    circuitBroken: failures >= maxFailures
  }
}

async function processMessage(body: any) {
  // Process message implementation
  return { processed: true }
}

Monitoring

Queue Metrics

Track queue performance:
agents:
  - name: log-metrics
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO queue_metrics (
          batch_size,
          processed,
          succeeded,
          failed,
          duration,
          timestamp
        ) VALUES (?, ?, ?, ?, ?, ?)
      params:
        - ${input.messages.length}
        - ${process-messages.output.processed}
        - ${process-messages.output.succeeded}
        - ${process-messages.output.failed}
        - ${Date.now() - input.batch.receivedAt}
        - ${Date.now()}

Alerting

Alert on queue issues:
agents:
  - name: check-failure-rate
    operation: code
    config:
      script: scripts/check-queue-failure-rate
    input:
      failed_count: ${process-messages.output.failed}
      total_messages: ${input.messages.length}
      queue_name: ${input.queue}
// scripts/check-queue-failure-rate.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function checkQueueFailureRate(context: AgentExecutionContext) {
  const { failed_count, total_messages, queue_name } = context.input
  const failureRate = total_messages > 0 ? failed_count / total_messages : 0

  if (failureRate > 0.5) {
    // High failure rate, send alert
    await fetch(context.env.ALERT_WEBHOOK, {
      method: 'POST',
      body: JSON.stringify({
        alert: 'High queue failure rate',
        rate: failureRate,
        queue: queue_name
      })
    })
  }

  return { failureRate }
}

Best Practices

  1. Idempotency - Design consumers to handle duplicate messages
  2. Batch sizing - Small batches for latency, large batches for throughput
  3. Retry with backoff - Use exponential backoff for retries
  4. Monitor DLQ - Set up alerts for DLQ growth
  5. Message references - Store large data externally, queue references
  6. Graceful degradation - Handle individual message failures
  7. Circuit breakers - Stop processing during systemic failures
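
The idempotency practice (item 1) can be sketched as a guard keyed on the message id. The in-memory Set here is only illustrative; a durable store such as KV or D1 is needed for guarantees across invocations:

```typescript
// Idempotent consumer sketch: skip messages whose id was already handled.
// `seen` stands in for a durable store (KV, D1); an in-memory Set does
// not survive across invocations and is for illustration only.
const seen = new Set<string>()

function processOnce(msg: { id: string; body: unknown }): boolean {
  if (seen.has(msg.id)) {
    return false // duplicate delivery; already handled
  }
  seen.add(msg.id)
  // ... real processing of msg.body would happen here
  return true
}
```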

Next Steps