> ## Documentation Index
> Fetch the complete documentation index at: https://docs.ensemble.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# queue Operation

> Process Cloudflare Queue messages with batch processing, retries, and dead letter queues

## Overview

The `queue` operation enables ensembles to send messages to and consume messages from Cloudflare Queues. Use queues for:

* **Asynchronous processing** - Decouple producers and consumers
* **Batch operations** - Process multiple messages efficiently
* **Reliable delivery** - Automatic retries and dead letter queues
* **Load leveling** - Handle traffic spikes gracefully

## Queue Modes

The queue operation supports three modes:

1. **send** - Send a single message
2. **send-batch** - Send multiple messages at once
3. **consume** - Process messages (used with queue triggers)

## Sending Messages

### Single Message

```yaml theme={null}
name: task-enqueuer

flow:
  - agent: enqueue-task

agents:
  - name: enqueue-task
    operation: queue
    config:
      mode: send
      queue: TASK_QUEUE
      message:
        body:
          taskId: ${input.taskId}
          action: "process"
          data: ${input.data}
        contentType: "json"

outputs:
  queued: ${enqueue-task.success}
```

### Batch Send

Send multiple messages in a single operation:

```yaml theme={null}
name: bulk-enqueue

flow:
  - agent: enqueue-batch

agents:
  - name: enqueue-batch
    operation: queue
    config:
      mode: send-batch
      queue: TASK_QUEUE
      messages:
        - body:
            taskId: "task-1"
            action: "process"
        - body:
            taskId: "task-2"
            action: "process"
        - body:
            taskId: "task-3"
            action: "process"

outputs:
  sent: ${enqueue-batch.output.count}
```

### Dynamic Batch from Input

```yaml theme={null}
flow:
  - agent: process-and-enqueue

agents:
  - name: process-and-enqueue
    operation: queue
    config:
      mode: send-batch
      queue: TASK_QUEUE
      messages: ${input.tasks}  # Array of message objects

inputs:
  tasks:
    type: array
    required: true
    description: Array of tasks to enqueue
```

Invoke with:

```bash theme={null}
curl -X POST /webhooks/bulk-enqueue \
  -d '{
    "tasks": [
      {"body": {"action": "process", "id": 1}},
      {"body": {"action": "process", "id": 2}},
      {"body": {"action": "process", "id": 3}}
    ]
  }'
```

## Consuming Messages

### Queue Trigger Consumer

Configure ensemble to process queue messages:

```yaml theme={null}
name: task-processor

trigger:
  - type: queue
    queue: TASK_QUEUE
    batch_size: 10
    max_retries: 3
    max_wait_time: 5

flow:
  - agent: process-messages

agents:
  - name: process-messages
    operation: code
    config:
      script: scripts/process-queue-messages
    input:
      messages: ${input.messages}

outputs:
  processed: ${process-messages.output.processed}
  succeeded: ${process-messages.output.succeeded}
  failed: ${process-messages.output.failed}
```

```typescript theme={null}
// scripts/process-queue-messages.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function processQueueMessages(context: AgentExecutionContext) {
  const { messages } = context.input
  const results = []

  for (const msg of messages) {
    try {
      // Process message
      const result = await processTask(msg.body)
      results.push({
        id: msg.id,
        success: true,
        result
      })
    } catch (error) {
      results.push({
        id: msg.id,
        success: false,
        error: error.message
      })
    }
  }

  return {
    processed: results.length,
    succeeded: results.filter(r => r.success).length,
    failed: results.filter(r => !r.success).length,
    results
  }
}
```

````yaml theme={null}
### Message Batch Structure

Queue consumers receive messages in this format:

```typescript
{
  messages: [
    {
      id: "msg-abc123",
      timestamp: 1705315200000,
      body: {
        // Your message data
        taskId: "task-1",
        action: "process"
      },
      attempts: 1
    },
    // ... more messages
  ],
  queue: "TASK_QUEUE",
  batch: {
    size: 10,
    receivedAt: 1705315200000
  }
}
````

## Message Options

### Content Type

Specify message format:

```yaml theme={null}
agents:
  - name: send-json
    operation: queue
    config:
      mode: send
      queue: TASK_QUEUE
      message:
        body: ${input.data}
        contentType: "json"  # json | text | bytes
```

### Delivery Delay

Delay message delivery:

```yaml theme={null}
agents:
  - name: send-delayed
    operation: queue
    config:
      mode: send
      queue: TASK_QUEUE
      message:
        body: ${input.task}
        delaySeconds: 60  # Deliver after 60 seconds
```

### Message Deduplication

Prevent duplicate messages:

```yaml theme={null}
agents:
  - name: send-unique
    operation: queue
    config:
      mode: send
      queue: TASK_QUEUE
      message:
        body: ${input.task}
        deduplicationId: ${input.taskId}  # Unique identifier
```

## Retry Logic

### Automatic Retries

Failed messages are automatically retried:

```yaml theme={null}
trigger:
  - type: queue
    queue: TASK_QUEUE
    max_retries: 3  # Retry up to 3 times
```

**Retry backoff:**

* Attempt 1: Immediate
* Attempt 2: 1 second delay
* Attempt 3: 5 seconds delay
* Attempt 4: 30 seconds delay

### Retry in Consumer

Handle retries explicitly:

```yaml theme={null}
agents:
  - name: process-with-retry
    operation: code
    config:
      script: scripts/process-with-retry
    input:
      messages: ${input.messages}
```

```typescript theme={null}
// scripts/process-with-retry.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function processWithRetry(context: AgentExecutionContext) {
  const { messages } = context.input
  const toRetry = []

  for (const msg of messages) {
    try {
      await processMessage(msg.body)
    } catch (error) {
      if (msg.attempts < 3) {
        // Retry this message
        toRetry.push(msg.id)
      } else {
        // Send to DLQ
        await sendToDLQ(msg, error)
      }
    }
  }

  return { retryIds: toRetry }
}
```

````yaml theme={null}

## Dead Letter Queue (DLQ)

### Configure DLQ

Messages that fail after max retries go to DLQ:

```yaml
trigger:
  - type: queue
    queue: TASK_QUEUE
    max_retries: 3
    dead_letter_queue: TASK_DLQ  # Optional DLQ name
````

### Process DLQ Messages

Create separate ensemble for DLQ:

```yaml theme={null}
name: dlq-processor

trigger:
  - type: queue
    queue: TASK_DLQ
    batch_size: 5

flow:
  - agent: log-failures
  - agent: alert-team

agents:
  - name: log-failures
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO failed_tasks (message_id, body, error, attempts, failed_at)
        VALUES (?, ?, ?, ?, ?)
      params:
        - ${input.messages[0].id}
        - ${input.messages[0].body}
        - ${input.messages[0].error}
        - ${input.messages[0].attempts}
        - ${Date.now()}

  - name: alert-team
    operation: email
    config:
      to: [devops@example.com]
      subject: "Queue Processing Failures"
      body: |
        ${input.messages.length} messages failed after max retries.
        See database for details.

outputs:
  logged: ${log-failures.success}
```

## Complete Examples

### Image Processing Pipeline

```yaml theme={null}
name: image-processor

trigger:
  # Webhook enqueues images
  - type: webhook
    path: /webhooks/upload
    methods: [POST]
    public: true

  # Queue processes images
  - type: queue
    queue: IMAGE_QUEUE
    batch_size: 5
    max_retries: 2

flow:
  # Check if this is webhook or queue invocation
  - agent: route-request
  - agent: enqueue-image
    condition: ${route-request.output.source === 'webhook'}
  - agent: process-images
    condition: ${route-request.output.source === 'queue'}

agents:
  - name: route-request
    operation: code
    config:
      script: scripts/route-request
    input:
      messages: ${input.messages}

  - name: enqueue-image
    operation: queue
    config:
      mode: send
      queue: IMAGE_QUEUE
      message:
        body:
          imageUrl: ${input.imageUrl}
          operations: ${input.operations}

  - name: process-images
    operation: code
    config:
      script: scripts/process-queue-images
    input:
      messages: ${input.messages}

outputs:
  # Webhook response
  enqueued: ${enqueue-image.success}
  # Queue response
  processed: ${process-images.output.results}
```

```typescript theme={null}
// scripts/route-request.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function routeRequest(context: AgentExecutionContext) {
  const { messages } = context.input

  // Detect invocation source
  if (messages) {
    return { source: 'queue' }
  } else {
    return { source: 'webhook' }
  }
}
```

```typescript theme={null}
// scripts/process-queue-images.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function processQueueImages(context: AgentExecutionContext) {
  const { messages } = context.input
  const results = []

  for (const msg of messages) {
    const { imageUrl, operations } = msg.body

    // Download image
    const image = await fetch(imageUrl)
    const buffer = await image.arrayBuffer()

    // Process image (resize, compress, etc.)
    const processed = await processImage(buffer, operations)

    // Upload to R2
    await env.IMAGES.put(`processed/${msg.id}.jpg`, processed)

    results.push({
      id: msg.id,
      url: `https://cdn.example.com/processed/${msg.id}.jpg`
    })
  }

  return { results }
}
```

````yaml theme={null}
### Background Task Queue

```yaml
name: task-worker

trigger:
  - type: queue
    queue: BACKGROUND_TASKS
    batch_size: 20
    max_retries: 5
    max_wait_time: 10

flow:
  - agent: classify-tasks
  - agent: process-email-tasks
    condition: ${classify-tasks.output.emailTasks.length > 0}
  - agent: process-api-tasks
    condition: ${classify-tasks.output.apiTasks.length > 0}
  - agent: process-data-tasks
    condition: ${classify-tasks.output.dataTasks.length > 0}

agents:
  - name: classify-tasks
    operation: code
    config:
      script: scripts/classify-queue-tasks
    input:
      messages: ${input.messages}

  - name: process-email-tasks
    operation: email
    config:
      to: ${classify-tasks.output.emailTasks[0].body.recipients}
      subject: ${classify-tasks.output.emailTasks[0].body.subject}
      body: ${classify-tasks.output.emailTasks[0].body.content}

  - name: process-api-tasks
    operation: http
    config:
      url: ${classify-tasks.output.apiTasks[0].body.endpoint}
      method: POST
      body: ${classify-tasks.output.apiTasks[0].body.payload}

  - name: process-data-tasks
    operation: storage
    config:
      type: d1
      query: ${classify-tasks.output.dataTasks[0].body.query}
      params: ${classify-tasks.output.dataTasks[0].body.params}

outputs:
  processed: ${input.messages.length}
````

```typescript theme={null}
// scripts/classify-queue-tasks.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function classifyQueueTasks(context: AgentExecutionContext) {
  const { messages } = context.input
  const emailTasks = []
  const apiTasks = []
  const dataTasks = []

  for (const msg of messages) {
    switch (msg.body.type) {
      case 'email':
        emailTasks.push(msg)
        break
      case 'api':
        apiTasks.push(msg)
        break
      case 'data':
        dataTasks.push(msg)
        break
    }
  }

  return { emailTasks, apiTasks, dataTasks }
}
```

````yaml theme={null}
## Wrangler Configuration

Define queues in `wrangler.toml`:

```toml
[[queues.producers]]
queue = "TASK_QUEUE"
binding = "TASK_QUEUE"

[[queues.consumers]]
queue = "TASK_QUEUE"
max_batch_size = 10
max_batch_timeout = 5
max_retries = 3
dead_letter_queue = "TASK_DLQ"

[[queues.producers]]
queue = "TASK_DLQ"
binding = "TASK_DLQ"

[[queues.consumers]]
queue = "TASK_DLQ"
max_batch_size = 5
````

## Performance Tips

### Batch Size

* **Small batches (1-5)**: Low latency, real-time processing
* **Medium batches (10-20)**: Balanced throughput
* **Large batches (50-100)**: Maximum throughput, higher latency

```yaml theme={null}
trigger:
  - type: queue
    queue: TASK_QUEUE
    batch_size: 10  # Tune based on your needs
```

### Max Wait Time

Control batch fill time:

```yaml theme={null}
trigger:
  - type: queue
    queue: TASK_QUEUE
    batch_size: 100
    max_wait_time: 5  # Don't wait more than 5 seconds
```

### Parallel Processing

Process messages concurrently within a batch:

```yaml theme={null}
agents:
  - name: parallel-process
    operation: code
    config:
      script: scripts/process-messages-parallel
    input:
      messages: ${input.messages}
```

```typescript theme={null}
// scripts/process-messages-parallel.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function processMessagesParallel(context: AgentExecutionContext) {
  const { messages } = context.input

  // Process all messages concurrently
  const results = await Promise.all(
    messages.map(async (msg: any) => {
      return await processMessage(msg.body)
    })
  )

  return { results }
}

async function processMessage(body: any) {
  // Process message implementation
  return { processed: true, body }
}
```

```yaml theme={null}
```

### Message Size

Keep messages small for best performance:

* **Recommended**: \< 128 KB per message
* **Maximum**: 256 KB per message

For large data, store in R2/KV and pass reference:

```yaml theme={null}
agents:
  - name: enqueue-large-data
    operation: code
    config:
      script: scripts/enqueue-large-data-reference
    input:
      large_data: ${input.largeData}
      metadata: ${input.metadata}
```

```typescript theme={null}
// scripts/enqueue-large-data-reference.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function enqueueLargeDataReference(context: AgentExecutionContext) {
  const { large_data, metadata } = context.input

  // Store large data in R2
  const dataId = generateId()
  await context.env.DATA.put(dataId, large_data)

  // Queue just the reference
  await context.env.TASK_QUEUE.send({
    dataId,
    metadata
  })

  return { dataId }
}

function generateId() {
  return crypto.randomUUID()
}
```

```yaml theme={null}
```

## Error Handling

### Handle Individual Failures

Process messages individually to isolate failures:

```yaml theme={null}
agents:
  - name: safe-process
    operation: code
    config:
      script: scripts/process-messages-safely
    input:
      messages: ${input.messages}
```

```typescript theme={null}
// scripts/process-messages-safely.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function processMessagesSafely(context: AgentExecutionContext) {
  const { messages } = context.input
  const succeeded = []
  const failed = []

  for (const msg of messages) {
    try {
      const result = await processMessage(msg.body)
      succeeded.push({ id: msg.id, result })
    } catch (error: any) {
      failed.push({
        id: msg.id,
        error: error.message,
        willRetry: msg.attempts < 3
      })

      // Log failure
      console.error(`Message ${msg.id} failed:`, error)
    }
  }

  return { succeeded, failed }
}

async function processMessage(body: any) {
  // Process message implementation
  return { processed: true }
}
```

```yaml theme={null}
```

### Circuit Breaker

Stop processing if too many failures:

```yaml theme={null}
agents:
  - name: circuit-breaker
    operation: code
    config:
      script: scripts/process-with-circuit-breaker
    input:
      messages: ${input.messages}
```

```typescript theme={null}
// scripts/process-with-circuit-breaker.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function processWithCircuitBreaker(context: AgentExecutionContext) {
  const { messages } = context.input
  const results = []
  let failures = 0
  const maxFailures = 3

  for (const msg of messages) {
    if (failures >= maxFailures) {
      console.warn('Circuit breaker triggered')
      break
    }

    try {
      const result = await processMessage(msg.body)
      results.push({ id: msg.id, result })
    } catch (error: any) {
      failures++
      results.push({ id: msg.id, error: error.message })
    }
  }

  return {
    results,
    circuitBroken: failures >= maxFailures
  }
}

async function processMessage(body: any) {
  // Process message implementation
  return { processed: true }
}
```

```yaml theme={null}
```

## Monitoring

### Queue Metrics

Track queue performance:

```yaml theme={null}
agents:
  - name: log-metrics
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO queue_metrics (
          batch_size,
          processed,
          succeeded,
          failed,
          duration,
          timestamp
        ) VALUES (?, ?, ?, ?, ?, ?)
      params:
        - ${input.messages.length}
        - ${process-messages.output.processed}
        - ${process-messages.output.succeeded}
        - ${process-messages.output.failed}
        - ${Date.now() - input.batch.receivedAt}
        - ${Date.now()}
```

### Alerting

Alert on queue issues:

```yaml theme={null}
agents:
  - name: check-failure-rate
    operation: code
    config:
      script: scripts/check-queue-failure-rate
    input:
      failed_count: ${process-messages.output.failed}
      total_messages: ${input.messages.length}
      queue_name: ${input.queue}
```

```typescript theme={null}
// scripts/check-queue-failure-rate.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function checkQueueFailureRate(context: AgentExecutionContext) {
  const { failed_count, total_messages, queue_name } = context.input
  const failureRate = failed_count / total_messages

  if (failureRate > 0.5) {
    // High failure rate, send alert
    await fetch(context.env.ALERT_WEBHOOK, {
      method: 'POST',
      body: JSON.stringify({
        alert: 'High queue failure rate',
        rate: failureRate,
        queue: queue_name
      })
    })
  }

  return { failureRate }
}
```

```yaml theme={null}
```

## Best Practices

1. **Idempotency** - Design consumers to handle duplicate messages
2. **Small batches for latency** - Large batches for throughput
3. **Retry with backoff** - Use exponential backoff for retries
4. **Monitor DLQ** - Set up alerts for DLQ growth
5. **Message references** - Store large data externally, queue references
6. **Graceful degradation** - Handle individual message failures
7. **Circuit breakers** - Stop processing during systemic failures

## Next Steps

<CardGroup cols={2}>
  <Card title="Triggers" icon="bolt" href="/conductor/core-concepts/triggers">
    Configure queue triggers
  </Card>

  <Card title="Event-Driven" icon="shuffle" href="/conductor/playbooks/event-driven-workflow">
    Event-driven patterns
  </Card>

  <Card title="Storage" icon="database" href="/conductor/operations/storage">
    Store queue processing results
  </Card>

  <Card title="Webhooks" icon="webhook" href="/api/http/webhooks">
    Combine webhooks with queues
  </Card>
</CardGroup>
