Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.ensemble.ai/llms.txt

Use this file to discover all available pages before exploring further.

Overview

The queue operation enables ensembles to send messages to and consume messages from Cloudflare Queues. Use queues for:
  • Asynchronous processing - Decouple producers and consumers
  • Batch operations - Process multiple messages efficiently
  • Reliable delivery - Automatic retries and dead letter queues
  • Load leveling - Handle traffic spikes gracefully

Queue Modes

The queue operation supports three modes:
  1. send - Send a single message
  2. send-batch - Send multiple messages at once
  3. consume - Process messages (used with queue triggers)

Sending Messages

Single Message

name: task-enqueuer

flow:
  - agent: enqueue-task

agents:
  - name: enqueue-task
    operation: queue
    config:
      mode: send
      queue: TASK_QUEUE
      message:
        body:
          taskId: ${input.taskId}
          action: "process"
          data: ${input.data}
        contentType: "json"

outputs:
  queued: ${enqueue-task.success}

Batch Send

Send multiple messages in a single operation:
name: bulk-enqueue

flow:
  - agent: enqueue-batch

agents:
  - name: enqueue-batch
    operation: queue
    config:
      mode: send-batch
      queue: TASK_QUEUE
      messages:
        - body:
            taskId: "task-1"
            action: "process"
        - body:
            taskId: "task-2"
            action: "process"
        - body:
            taskId: "task-3"
            action: "process"

outputs:
  sent: ${enqueue-batch.output.count}

Dynamic Batch from Input

flow:
  - agent: process-and-enqueue

agents:
  - name: process-and-enqueue
    operation: queue
    config:
      mode: send-batch
      queue: TASK_QUEUE
      messages: ${input.tasks}  # Array of message objects

inputs:
  tasks:
    type: array
    required: true
    description: Array of tasks to enqueue
Invoke with:
curl -X POST /webhooks/bulk-enqueue \
  -d '{
    "tasks": [
      {"body": {"action": "process", "id": 1}},
      {"body": {"action": "process", "id": 2}},
      {"body": {"action": "process", "id": 3}}
    ]
  }'

Consuming Messages

Queue Trigger Consumer

Configure ensemble to process queue messages:
name: task-processor

trigger:
  - type: queue
    queue: TASK_QUEUE
    batch_size: 10
    max_retries: 3
    max_wait_time: 5

flow:
  - agent: process-messages

agents:
  - name: process-messages
    operation: code
    config:
      script: scripts/process-queue-messages
    input:
      messages: ${input.messages}

outputs:
  processed: ${process-messages.output.processed}
  succeeded: ${process-messages.output.succeeded}
  failed: ${process-messages.output.failed}
// scripts/process-queue-messages.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function processQueueMessages(context: AgentExecutionContext) {
  const { messages } = context.input
  const results = []

  for (const msg of messages) {
    try {
      // Process message
      const result = await processTask(msg.body)
      results.push({
        id: msg.id,
        success: true,
        result
      })
    } catch (error) {
      results.push({
        id: msg.id,
        success: false,
        error: error.message
      })
    }
  }

  return {
    processed: results.length,
    succeeded: results.filter(r => r.success).length,
    failed: results.filter(r => !r.success).length,
    results
  }
}
### Message Batch Structure

Queue consumers receive messages in this format:

```typescript
{
  messages: [
    {
      id: "msg-abc123",
      timestamp: 1705315200000,
      body: {
        // Your message data
        taskId: "task-1",
        action: "process"
      },
      attempts: 1
    },
    // ... more messages
  ],
  queue: "TASK_QUEUE",
  batch: {
    size: 10,
    receivedAt: 1705315200000
  }
}

Message Options

Content Type

Specify message format:
agents:
  - name: send-json
    operation: queue
    config:
      mode: send
      queue: TASK_QUEUE
      message:
        body: ${input.data}
        contentType: "json"  # json | text | bytes

Delivery Delay

Delay message delivery:
agents:
  - name: send-delayed
    operation: queue
    config:
      mode: send
      queue: TASK_QUEUE
      message:
        body: ${input.task}
        delaySeconds: 60  # Deliver after 60 seconds

Message Deduplication

Prevent duplicate messages:
agents:
  - name: send-unique
    operation: queue
    config:
      mode: send
      queue: TASK_QUEUE
      message:
        body: ${input.task}
        deduplicationId: ${input.taskId}  # Unique identifier

Retry Logic

Automatic Retries

Failed messages are automatically retried:
trigger:
  - type: queue
    queue: TASK_QUEUE
    max_retries: 3  # Retry up to 3 times
Retry backoff:
  • Attempt 1: Immediate
  • Attempt 2: 1 second delay
  • Attempt 3: 5 seconds delay
  • Attempt 4: 30 seconds delay

Retry in Consumer

Handle retries explicitly:
agents:
  - name: process-with-retry
    operation: code
    config:
      script: scripts/process-with-retry
    input:
      messages: ${input.messages}
// scripts/process-with-retry.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function processWithRetry(context: AgentExecutionContext) {
  const { messages } = context.input
  const toRetry = []

  for (const msg of messages) {
    try {
      await processMessage(msg.body)
    } catch (error) {
      if (msg.attempts < 3) {
        // Retry this message
        toRetry.push(msg.id)
      } else {
        // Send to DLQ
        await sendToDLQ(msg, error)
      }
    }
  }

  return { retryIds: toRetry }
}

## Dead Letter Queue (DLQ)

### Configure DLQ

Messages that fail after max retries go to DLQ:

```yaml
trigger:
  - type: queue
    queue: TASK_QUEUE
    max_retries: 3
    dead_letter_queue: TASK_DLQ  # Optional DLQ name

Process DLQ Messages

Create separate ensemble for DLQ:
name: dlq-processor

trigger:
  - type: queue
    queue: TASK_DLQ
    batch_size: 5

flow:
  - agent: log-failures
  - agent: alert-team

agents:
  - name: log-failures
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO failed_tasks (message_id, body, error, attempts, failed_at)
        VALUES (?, ?, ?, ?, ?)
      params:
        - ${input.messages[0].id}
        - ${input.messages[0].body}
        - ${input.messages[0].error}
        - ${input.messages[0].attempts}
        - ${Date.now()}

  - name: alert-team
    operation: email
    config:
      to: [devops@example.com]
      subject: "Queue Processing Failures"
      body: |
        ${input.messages.length} messages failed after max retries.
        See database for details.

outputs:
  logged: ${log-failures.success}

Complete Examples

Image Processing Pipeline

name: image-processor

trigger:
  # Webhook enqueues images
  - type: webhook
    path: /webhooks/upload
    methods: [POST]
    public: true

  # Queue processes images
  - type: queue
    queue: IMAGE_QUEUE
    batch_size: 5
    max_retries: 2

flow:
  # Check if this is webhook or queue invocation
  - agent: route-request
  - agent: enqueue-image
    condition: ${route-request.output.source === 'webhook'}
  - agent: process-images
    condition: ${route-request.output.source === 'queue'}

agents:
  - name: route-request
    operation: code
    config:
      script: scripts/route-request
    input:
      messages: ${input.messages}

  - name: enqueue-image
    operation: queue
    config:
      mode: send
      queue: IMAGE_QUEUE
      message:
        body:
          imageUrl: ${input.imageUrl}
          operations: ${input.operations}

  - name: process-images
    operation: code
    config:
      script: scripts/process-queue-images
    input:
      messages: ${input.messages}

outputs:
  # Webhook response
  enqueued: ${enqueue-image.success}
  # Queue response
  processed: ${process-images.output.results}
// scripts/route-request.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function routeRequest(context: AgentExecutionContext) {
  const { messages } = context.input

  // Detect invocation source
  if (messages) {
    return { source: 'queue' }
  } else {
    return { source: 'webhook' }
  }
}
// scripts/process-queue-images.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function processQueueImages(context: AgentExecutionContext) {
  const { messages } = context.input
  const results = []

  for (const msg of messages) {
    const { imageUrl, operations } = msg.body

    // Download image
    const image = await fetch(imageUrl)
    const buffer = await image.arrayBuffer()

    // Process image (resize, compress, etc.)
    const processed = await processImage(buffer, operations)

    // Upload to R2
    await env.IMAGES.put(`processed/${msg.id}.jpg`, processed)

    results.push({
      id: msg.id,
      url: `https://cdn.example.com/processed/${msg.id}.jpg`
    })
  }

  return { results }
}
### Background Task Queue

```yaml
name: task-worker

trigger:
  - type: queue
    queue: BACKGROUND_TASKS
    batch_size: 20
    max_retries: 5
    max_wait_time: 10

flow:
  - agent: classify-tasks
  - agent: process-email-tasks
    condition: ${classify-tasks.output.emailTasks.length > 0}
  - agent: process-api-tasks
    condition: ${classify-tasks.output.apiTasks.length > 0}
  - agent: process-data-tasks
    condition: ${classify-tasks.output.dataTasks.length > 0}

agents:
  - name: classify-tasks
    operation: code
    config:
      script: scripts/classify-queue-tasks
    input:
      messages: ${input.messages}

  - name: process-email-tasks
    operation: email
    config:
      to: ${classify-tasks.output.emailTasks[0].body.recipients}
      subject: ${classify-tasks.output.emailTasks[0].body.subject}
      body: ${classify-tasks.output.emailTasks[0].body.content}

  - name: process-api-tasks
    operation: http
    config:
      url: ${classify-tasks.output.apiTasks[0].body.endpoint}
      method: POST
      body: ${classify-tasks.output.apiTasks[0].body.payload}

  - name: process-data-tasks
    operation: storage
    config:
      type: d1
      query: ${classify-tasks.output.dataTasks[0].body.query}
      params: ${classify-tasks.output.dataTasks[0].body.params}

outputs:
  processed: ${input.messages.length}
// scripts/classify-queue-tasks.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function classifyQueueTasks(context: AgentExecutionContext) {
  const { messages } = context.input
  const emailTasks = []
  const apiTasks = []
  const dataTasks = []

  for (const msg of messages) {
    switch (msg.body.type) {
      case 'email':
        emailTasks.push(msg)
        break
      case 'api':
        apiTasks.push(msg)
        break
      case 'data':
        dataTasks.push(msg)
        break
    }
  }

  return { emailTasks, apiTasks, dataTasks }
}
## Wrangler Configuration

Define queues in `wrangler.toml`:

```toml
[[queues.producers]]
queue = "TASK_QUEUE"
binding = "TASK_QUEUE"

[[queues.consumers]]
queue = "TASK_QUEUE"
max_batch_size = 10
max_batch_timeout = 5
max_retries = 3
dead_letter_queue = "TASK_DLQ"

[[queues.producers]]
queue = "TASK_DLQ"
binding = "TASK_DLQ"

[[queues.consumers]]
queue = "TASK_DLQ"
max_batch_size = 5

Performance Tips

Batch Size

  • Small batches (1-5): Low latency, real-time processing
  • Medium batches (10-20): Balanced throughput
  • Large batches (50-100): Maximum throughput, higher latency
trigger:
  - type: queue
    queue: TASK_QUEUE
    batch_size: 10  # Tune based on your needs

Max Wait Time

Control batch fill time:
trigger:
  - type: queue
    queue: TASK_QUEUE
    batch_size: 100
    max_wait_time: 5  # Don't wait more than 5 seconds

Parallel Processing

Process messages concurrently within a batch:
agents:
  - name: parallel-process
    operation: code
    config:
      script: scripts/process-messages-parallel
    input:
      messages: ${input.messages}
// scripts/process-messages-parallel.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function processMessagesParallel(context: AgentExecutionContext) {
  const { messages } = context.input

  // Process all messages concurrently
  const results = await Promise.all(
    messages.map(async (msg: any) => {
      return await processMessage(msg.body)
    })
  )

  return { results }
}

async function processMessage(body: any) {
  // Process message implementation
  return { processed: true, body }
}

Message Size

Keep messages small for best performance:
  • Recommended: < 128 KB per message
  • Maximum: 256 KB per message
For large data, store in R2/KV and pass reference:
agents:
  - name: enqueue-large-data
    operation: code
    config:
      script: scripts/enqueue-large-data-reference
    input:
      large_data: ${input.largeData}
      metadata: ${input.metadata}
// scripts/enqueue-large-data-reference.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function enqueueLargeDataReference(context: AgentExecutionContext) {
  const { large_data, metadata } = context.input

  // Store large data in R2
  const dataId = generateId()
  await context.env.DATA.put(dataId, large_data)

  // Queue just the reference
  await context.env.TASK_QUEUE.send({
    dataId,
    metadata
  })

  return { dataId }
}

function generateId() {
  return crypto.randomUUID()
}

Error Handling

Handle Individual Failures

Process messages individually to isolate failures:
agents:
  - name: safe-process
    operation: code
    config:
      script: scripts/process-messages-safely
    input:
      messages: ${input.messages}
// scripts/process-messages-safely.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function processMessagesSafely(context: AgentExecutionContext) {
  const { messages } = context.input
  const succeeded = []
  const failed = []

  for (const msg of messages) {
    try {
      const result = await processMessage(msg.body)
      succeeded.push({ id: msg.id, result })
    } catch (error: any) {
      failed.push({
        id: msg.id,
        error: error.message,
        willRetry: msg.attempts < 3
      })

      // Log failure
      console.error(`Message ${msg.id} failed:`, error)
    }
  }

  return { succeeded, failed }
}

async function processMessage(body: any) {
  // Process message implementation
  return { processed: true }
}

Circuit Breaker

Stop processing if too many failures:
agents:
  - name: circuit-breaker
    operation: code
    config:
      script: scripts/process-with-circuit-breaker
    input:
      messages: ${input.messages}
// scripts/process-with-circuit-breaker.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function processWithCircuitBreaker(context: AgentExecutionContext) {
  const { messages } = context.input
  const results = []
  let failures = 0
  const maxFailures = 3

  for (const msg of messages) {
    if (failures >= maxFailures) {
      console.warn('Circuit breaker triggered')
      break
    }

    try {
      const result = await processMessage(msg.body)
      results.push({ id: msg.id, result })
    } catch (error: any) {
      failures++
      results.push({ id: msg.id, error: error.message })
    }
  }

  return {
    results,
    circuitBroken: failures >= maxFailures
  }
}

async function processMessage(body: any) {
  // Process message implementation
  return { processed: true }
}

Monitoring

Queue Metrics

Track queue performance:
agents:
  - name: log-metrics
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO queue_metrics (
          batch_size,
          processed,
          succeeded,
          failed,
          duration,
          timestamp
        ) VALUES (?, ?, ?, ?, ?, ?)
      params:
        - ${input.messages.length}
        - ${process-messages.output.processed}
        - ${process-messages.output.succeeded}
        - ${process-messages.output.failed}
        - ${Date.now() - input.batch.receivedAt}
        - ${Date.now()}

Alerting

Alert on queue issues:
agents:
  - name: check-failure-rate
    operation: code
    config:
      script: scripts/check-queue-failure-rate
    input:
      failed_count: ${process-messages.output.failed}
      total_messages: ${input.messages.length}
      queue_name: ${input.queue}
// scripts/check-queue-failure-rate.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function checkQueueFailureRate(context: AgentExecutionContext) {
  const { failed_count, total_messages, queue_name } = context.input
  const failureRate = failed_count / total_messages

  if (failureRate > 0.5) {
    // High failure rate, send alert
    await fetch(context.env.ALERT_WEBHOOK, {
      method: 'POST',
      body: JSON.stringify({
        alert: 'High queue failure rate',
        rate: failureRate,
        queue: queue_name
      })
    })
  }

  return { failureRate }
}

Best Practices

  1. Idempotency - Design consumers to handle duplicate messages
  2. Small batches for latency - Large batches for throughput
  3. Retry with backoff - Use exponential backoff for retries
  4. Monitor DLQ - Set up alerts for DLQ growth
  5. Message references - Store large data externally, queue references
  6. Graceful degradation - Handle individual message failures
  7. Circuit breakers - Stop processing during systemic failures

Next Steps

Triggers

Configure queue triggers

Event-Driven

Event-driven patterns

Storage

Store queue processing results

Webhooks

Combine webhooks with queues