Documentation Index Fetch the complete documentation index at: https://docs.ensemble.ai/llms.txt
Use this file to discover all available pages before exploring further.
Overview
The queue operation enables ensembles to send messages to and consume messages from Cloudflare Queues. Use queues for:
Asynchronous processing - Decouple producers and consumers
Batch operations - Process multiple messages efficiently
Reliable delivery - Automatic retries and dead letter queues
Load leveling - Handle traffic spikes gracefully
Queue Modes
The queue operation supports three modes:
send - Send a single message
send-batch - Send multiple messages at once
consume - Process messages (used with queue triggers)
Sending Messages
Single Message
name : task-enqueuer
flow :
- agent : enqueue-task
agents :
- name : enqueue-task
operation : queue
config :
mode : send
queue : TASK_QUEUE
message :
body :
taskId : ${input.taskId}
action : "process"
data : ${input.data}
contentType : "json"
outputs :
queued : ${enqueue-task.success}
Batch Send
Send multiple messages in a single operation:
name : bulk-enqueue
flow :
- agent : enqueue-batch
agents :
- name : enqueue-batch
operation : queue
config :
mode : send-batch
queue : TASK_QUEUE
messages :
- body :
taskId : "task-1"
action : "process"
- body :
taskId : "task-2"
action : "process"
- body :
taskId : "task-3"
action : "process"
outputs :
sent : ${enqueue-batch.output.count}
flow :
- agent : process-and-enqueue
agents :
- name : process-and-enqueue
operation : queue
config :
mode : send-batch
queue : TASK_QUEUE
messages : ${input.tasks} # Array of message objects
inputs :
tasks :
type : array
required : true
description : Array of tasks to enqueue
Invoke with:
curl -X POST /webhooks/bulk-enqueue \
-d '{
"tasks": [
{"body": {"action": "process", "id": 1}},
{"body": {"action": "process", "id": 2}},
{"body": {"action": "process", "id": 3}}
]
}'
Consuming Messages
Queue Trigger Consumer
Configure ensemble to process queue messages:
name : task-processor
trigger :
- type : queue
queue : TASK_QUEUE
batch_size : 10
max_retries : 3
max_wait_time : 5
flow :
- agent : process-messages
agents :
- name : process-messages
operation : code
config :
script : scripts/process-queue-messages
input :
messages : ${input.messages}
outputs :
processed : ${process-messages.output.processed}
succeeded : ${process-messages.output.succeeded}
failed : ${process-messages.output.failed}
// scripts/process-queue-messages.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
export default async function processQueueMessages ( context : AgentExecutionContext ) {
const { messages } = context . input
const results = []
for ( const msg of messages ) {
try {
// Process message
const result = await processTask ( msg . body )
results . push ({
id: msg . id ,
success: true ,
result
})
} catch ( error ) {
results . push ({
id: msg . id ,
success: false ,
error: error . message
})
}
}
return {
processed: results . length ,
succeeded: results . filter ( r => r . success ). length ,
failed: results . filter ( r => ! r . success ). length ,
results
}
}
### Message Batch Structure
Queue consumers receive messages in this format :
``` typescript
{
messages : [
{
id : "msg-abc123" ,
timestamp : 1705315200000 ,
body : {
// Your message data
taskId : "task-1" ,
action : "process"
},
attempts : 1
},
// ... more messages
],
queue : "TASK_QUEUE" ,
batch : {
size : 10 ,
receivedAt : 1705315200000
}
}
Message Options
Content Type
Specify message format:
agents :
- name : send-json
operation : queue
config :
mode : send
queue : TASK_QUEUE
message :
body : ${input.data}
contentType : "json" # json | text | bytes
Delivery Delay
Delay message delivery:
agents :
- name : send-delayed
operation : queue
config :
mode : send
queue : TASK_QUEUE
message :
body : ${input.task}
delaySeconds : 60 # Deliver after 60 seconds
Message Deduplication
Prevent duplicate messages:
agents :
- name : send-unique
operation : queue
config :
mode : send
queue : TASK_QUEUE
message :
body : ${input.task}
deduplicationId : ${input.taskId} # Unique identifier
Retry Logic
Automatic Retries
Failed messages are automatically retried:
trigger :
- type : queue
queue : TASK_QUEUE
max_retries : 3 # Retry up to 3 times
Retry backoff:
Attempt 1: Immediate
Attempt 2: 1 second delay
Attempt 3: 5 seconds delay
Attempt 4: 30 seconds delay
Retry in Consumer
Handle retries explicitly:
agents :
- name : process-with-retry
operation : code
config :
script : scripts/process-with-retry
input :
messages : ${input.messages}
// scripts/process-with-retry.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
export default async function processWithRetry ( context : AgentExecutionContext ) {
const { messages } = context . input
const toRetry = []
for ( const msg of messages ) {
try {
await processMessage ( msg . body )
} catch ( error ) {
if ( msg . attempts < 3 ) {
// Retry this message
toRetry . push ( msg . id )
} else {
// Send to DLQ
await sendToDLQ ( msg , error )
}
}
}
return { retryIds: toRetry }
}
## Dead Letter Queue (DLQ)
### Configure DLQ
Messages that fail after max retries go to DLQ :
``` yaml
trigger :
- type : queue
queue : TASK_QUEUE
max_retries : 3
dead_letter_queue : TASK_DLQ # Optional DLQ name
Process DLQ Messages
Create separate ensemble for DLQ:
name : dlq-processor
trigger :
- type : queue
queue : TASK_DLQ
batch_size : 5
flow :
- agent : log-failures
- agent : alert-team
agents :
- name : log-failures
operation : storage
config :
type : d1
query : |
INSERT INTO failed_tasks (message_id, body, error, attempts, failed_at)
VALUES (?, ?, ?, ?, ?)
params :
- ${input.messages[0].id}
- ${input.messages[0].body}
- ${input.messages[0].error}
- ${input.messages[0].attempts}
- ${Date.now()}
- name : alert-team
operation : email
config :
to : [ devops@example.com ]
subject : "Queue Processing Failures"
body : |
${input.messages.length} messages failed after max retries.
See database for details.
outputs :
logged : ${log-failures.success}
Complete Examples
Image Processing Pipeline
name : image-processor
trigger :
# Webhook enqueues images
- type : webhook
path : /webhooks/upload
methods : [ POST ]
public : true
# Queue processes images
- type : queue
queue : IMAGE_QUEUE
batch_size : 5
max_retries : 2
flow :
# Check if this is webhook or queue invocation
- agent : route-request
- agent : enqueue-image
condition : ${route-request.output.source === 'webhook'}
- agent : process-images
condition : ${route-request.output.source === 'queue'}
agents :
- name : route-request
operation : code
config :
script : scripts/route-request
input :
messages : ${input.messages}
- name : enqueue-image
operation : queue
config :
mode : send
queue : IMAGE_QUEUE
message :
body :
imageUrl : ${input.imageUrl}
operations : ${input.operations}
- name : process-images
operation : code
config :
script : scripts/process-queue-images
input :
messages : ${input.messages}
outputs :
# Webhook response
enqueued : ${enqueue-image.success}
# Queue response
processed : ${process-images.output.results}
// scripts/route-request.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
export default function routeRequest ( context : AgentExecutionContext ) {
const { messages } = context . input
// Detect invocation source
if ( messages ) {
return { source: 'queue' }
} else {
return { source: 'webhook' }
}
}
// scripts/process-queue-images.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
export default async function processQueueImages ( context : AgentExecutionContext ) {
const { messages } = context . input
const results = []
for ( const msg of messages ) {
const { imageUrl , operations } = msg . body
// Download image
const image = await fetch ( imageUrl )
const buffer = await image . arrayBuffer ()
// Process image (resize, compress, etc.)
const processed = await processImage ( buffer , operations )
// Upload to R2
await env . IMAGES . put ( `processed/ ${ msg . id } .jpg` , processed )
results . push ({
id: msg . id ,
url: `https://cdn.example.com/processed/ ${ msg . id } .jpg`
})
}
return { results }
}
### Background Task Queue
``` yaml
name : task-worker
trigger :
- type : queue
queue : BACKGROUND_TASKS
batch_size : 20
max_retries : 5
max_wait_time : 10
flow :
- agent : classify-tasks
- agent : process-email-tasks
condition : ${classify-tasks.output.emailTasks.length > 0}
- agent : process-api-tasks
condition : ${classify-tasks.output.apiTasks.length > 0}
- agent : process-data-tasks
condition : ${classify-tasks.output.dataTasks.length > 0}
agents :
- name : classify-tasks
operation : code
config :
script : scripts/classify-queue-tasks
input :
messages : ${input.messages}
- name : process-email-tasks
operation : email
config :
to : ${classify-tasks.output.emailTasks[0].body.recipients}
subject : ${classify-tasks.output.emailTasks[0].body.subject}
body : ${classify-tasks.output.emailTasks[0].body.content}
- name : process-api-tasks
operation : http
config :
url : ${classify-tasks.output.apiTasks[0].body.endpoint}
method : POST
body : ${classify-tasks.output.apiTasks[0].body.payload}
- name : process-data-tasks
operation : storage
config :
type : d1
query : ${classify-tasks.output.dataTasks[0].body.query}
params : ${classify-tasks.output.dataTasks[0].body.params}
outputs :
processed : ${input.messages.length}
// scripts/classify-queue-tasks.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
export default function classifyQueueTasks ( context : AgentExecutionContext ) {
const { messages } = context . input
const emailTasks = []
const apiTasks = []
const dataTasks = []
for ( const msg of messages ) {
switch ( msg . body . type ) {
case 'email' :
emailTasks . push ( msg )
break
case 'api' :
apiTasks . push ( msg )
break
case 'data' :
dataTasks . push ( msg )
break
}
}
return { emailTasks , apiTasks , dataTasks }
}
## Wrangler Configuration
Define queues in `wrangler.toml` :
``` toml
[[ queues.producers ]]
queue = "TASK_QUEUE"
binding = "TASK_QUEUE"
[[ queues.consumers ]]
queue = "TASK_QUEUE"
max_batch_size = 10
max_batch_timeout = 5
max_retries = 3
dead_letter_queue = "TASK_DLQ"
[[ queues.producers ]]
queue = "TASK_DLQ"
binding = "TASK_DLQ"
[[ queues.consumers ]]
queue = "TASK_DLQ"
max_batch_size = 5
Batch Size
Small batches (1-5) : Low latency, real-time processing
Medium batches (10-20) : Balanced throughput
Large batches (50-100) : Maximum throughput, higher latency
trigger :
- type : queue
queue : TASK_QUEUE
batch_size : 10 # Tune based on your needs
Max Wait Time
Control batch fill time:
trigger :
- type : queue
queue : TASK_QUEUE
batch_size : 100
max_wait_time : 5 # Don't wait more than 5 seconds
Parallel Processing
Process messages concurrently within a batch:
agents :
- name : parallel-process
operation : code
config :
script : scripts/process-messages-parallel
input :
messages : ${input.messages}
// scripts/process-messages-parallel.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
export default async function processMessagesParallel ( context : AgentExecutionContext ) {
const { messages } = context . input
// Process all messages concurrently
const results = await Promise . all (
messages . map ( async ( msg : any ) => {
return await processMessage ( msg . body )
})
)
return { results }
}
async function processMessage ( body : any ) {
// Process message implementation
return { processed: true , body }
}
Message Size
Keep messages small for best performance:
Recommended : < 128 KB per message
Maximum : 256 KB per message
For large data, store in R2/KV and pass reference:
agents :
- name : enqueue-large-data
operation : code
config :
script : scripts/enqueue-large-data-reference
input :
large_data : ${input.largeData}
metadata : ${input.metadata}
// scripts/enqueue-large-data-reference.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
export default async function enqueueLargeDataReference ( context : AgentExecutionContext ) {
const { large_data , metadata } = context . input
// Store large data in R2
const dataId = generateId ()
await context . env . DATA . put ( dataId , large_data )
// Queue just the reference
await context . env . TASK_QUEUE . send ({
dataId ,
metadata
})
return { dataId }
}
function generateId () {
return crypto . randomUUID ()
}
Error Handling
Handle Individual Failures
Process messages individually to isolate failures:
agents :
- name : safe-process
operation : code
config :
script : scripts/process-messages-safely
input :
messages : ${input.messages}
// scripts/process-messages-safely.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
export default async function processMessagesSafely ( context : AgentExecutionContext ) {
const { messages } = context . input
const succeeded = []
const failed = []
for ( const msg of messages ) {
try {
const result = await processMessage ( msg . body )
succeeded . push ({ id: msg . id , result })
} catch ( error : any ) {
failed . push ({
id: msg . id ,
error: error . message ,
willRetry: msg . attempts < 3
})
// Log failure
console . error ( `Message ${ msg . id } failed:` , error )
}
}
return { succeeded , failed }
}
async function processMessage ( body : any ) {
// Process message implementation
return { processed: true }
}
Circuit Breaker
Stop processing if too many failures:
agents :
- name : circuit-breaker
operation : code
config :
script : scripts/process-with-circuit-breaker
input :
messages : ${input.messages}
// scripts/process-with-circuit-breaker.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
export default async function processWithCircuitBreaker ( context : AgentExecutionContext ) {
const { messages } = context . input
const results = []
let failures = 0
const maxFailures = 3
for ( const msg of messages ) {
if ( failures >= maxFailures ) {
console . warn ( 'Circuit breaker triggered' )
break
}
try {
const result = await processMessage ( msg . body )
results . push ({ id: msg . id , result })
} catch ( error : any ) {
failures ++
results . push ({ id: msg . id , error: error . message })
}
}
return {
results ,
circuitBroken: failures >= maxFailures
}
}
async function processMessage ( body : any ) {
// Process message implementation
return { processed: true }
}
Monitoring
Queue Metrics
Track queue performance:
agents :
- name : log-metrics
operation : storage
config :
type : d1
query : |
INSERT INTO queue_metrics (
batch_size,
processed,
succeeded,
failed,
duration,
timestamp
) VALUES (?, ?, ?, ?, ?, ?)
params :
- ${input.messages.length}
- ${process-messages.output.processed}
- ${process-messages.output.succeeded}
- ${process-messages.output.failed}
- ${Date.now() - input.batch.receivedAt}
- ${Date.now()}
Alerting
Alert on queue issues:
agents :
- name : check-failure-rate
operation : code
config :
script : scripts/check-queue-failure-rate
input :
failed_count : ${process-messages.output.failed}
total_messages : ${input.messages.length}
queue_name : ${input.queue}
// scripts/check-queue-failure-rate.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
export default async function checkQueueFailureRate ( context : AgentExecutionContext ) {
const { failed_count , total_messages , queue_name } = context . input
const failureRate = failed_count / total_messages
if ( failureRate > 0.5 ) {
// High failure rate, send alert
await fetch ( context . env . ALERT_WEBHOOK , {
method: 'POST' ,
body: JSON . stringify ({
alert: 'High queue failure rate' ,
rate: failureRate ,
queue: queue_name
})
})
}
return { failureRate }
}
Best Practices
Idempotency - Design consumers to handle duplicate messages
Small batches for latency - Large batches for throughput
Retry with backoff - Use exponential backoff for retries
Monitor DLQ - Set up alerts for DLQ growth
Message references - Store large data externally, queue references
Graceful degradation - Handle individual message failures
Circuit breakers - Stop processing during systemic failures
Next Steps
Triggers Configure queue triggers
Event-Driven Event-driven patterns
Storage Store queue processing results
Webhooks Combine webhooks with queues