Overview
Integrate Conductor with Cloudflare Queues for reliable asynchronous task processing. Queues suit background jobs, event processing, webhooks, and any operation that can be deferred and processed later.
Quick Example
// A single Worker can act as both producer and consumer.
export default {
  // Producer: queue a task for background processing
  async fetch(request: Request, env: Env): Promise<Response> {
    await env.TASK_QUEUE.send({
      type: 'process-order',
      orderId: 123,
      timestamp: Date.now()
    });
    return Response.json({ queued: true });
  },

  // Consumer: process queued tasks with Conductor
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    const conductor = new Conductor({ env });
    for (const message of batch.messages) {
      try {
        await conductor.executeEnsemble('process-task', message.body);
        message.ack();
      } catch (error) {
        message.retry();
      }
    }
  }
};
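The snippets on this page assume an Env along these lines (a sketch; the binding names simply match the examples that follow):
// Assumed bindings for the examples below (names are illustrative)
interface Env {
  TASK_QUEUE: Queue;      // producer binding from wrangler.toml
  WEBHOOK_QUEUE: Queue;
  PIPELINE_QUEUE: Queue;
  DLQ: Queue;
  DB: D1Database;
  WEBHOOK_SECRET: string;
  ALERT_WEBHOOK: string;
}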
Setup
1. Configure Queue
# wrangler.toml
[[queues.producers]]
binding = "TASK_QUEUE"
queue = "task-queue"

[[queues.consumers]]
queue = "task-queue"
max_batch_size = 10       # deliver up to 10 messages per batch
max_batch_timeout = 30    # or after waiting at most 30 seconds
2. Create Queue
npx wrangler queues create task-queue
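To confirm the queue was created, you can list the queues on your account:
npx wrangler queues list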
Producer Patterns
Basic Queue Producer
export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const data = await request.json();

    // Add to queue
    await env.TASK_QUEUE.send({
      task: 'process-data',
      data,
      queuedAt: Date.now()
    });

    return Response.json({
      status: 'queued',
      message: 'Task will be processed shortly'
    });
  }
};
Batch Queue Production
// Queue multiple messages at once; each entry wraps its payload in `body`
await env.TASK_QUEUE.sendBatch([
  { body: { task: 'email', userId: 1 } },
  { body: { task: 'email', userId: 2 } },
  { body: { task: 'email', userId: 3 } }
]);
Delayed Messages
// Delay delivery by 1 hour
await env.TASK_QUEUE.send(
  { task: 'reminder', userId: 123 },
  { delaySeconds: 3600 }
);
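Delays can also be set per message when batching. A sketch, reusing the TASK_QUEUE binding from above:
// Each batch entry can carry its own delaySeconds
await env.TASK_QUEUE.sendBatch([
  { body: { task: 'reminder', userId: 1 }, delaySeconds: 600 },
  { body: { task: 'reminder', userId: 2 }, delaySeconds: 7200 }
]);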
Priority Messages
Cloudflare Queues have no per-message priority, and contentType only controls how the payload is serialized ("text", "json", "bytes", or "v8"), not routing. A common pattern is to send urgent work to a dedicated queue with its own consumer (URGENT_QUEUE here is an illustrative binding):
// Route urgent work to its own queue
await env.URGENT_QUEUE.send(
  { task: 'urgent', data: {...} }
);
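A sketch of the matching wrangler.toml entries (queue name illustrative):
[[queues.producers]]
binding = "URGENT_QUEUE"
queue = "urgent-queue"

[[queues.consumers]]
queue = "urgent-queue"
max_batch_size = 1        # small batches keep latency low for urgent work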
Consumer Patterns
Basic Consumer
export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    const conductor = new Conductor({ env });

    for (const message of batch.messages) {
      const { task, data } = message.body;
      try {
        await conductor.executeEnsemble(task, data);
        message.ack(); // Success
      } catch (error) {
        console.error('Task failed:', error);
        message.retry(); // Redelivered for another attempt
      }
    }
  }
};
Batch Processing
export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    const conductor = new Conductor({ env });

    // Process all messages as a batch
    const tasks = batch.messages.map(m => m.body);
    try {
      await conductor.executeEnsemble('process-batch', {
        tasks,
        batchSize: tasks.length
      });
      // Ack all on success
      batch.messages.forEach(m => m.ack());
    } catch (error) {
      // Retry all on failure
      batch.messages.forEach(m => m.retry());
    }
  }
};
Selective Retry
export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    const conductor = new Conductor({ env });

    for (const message of batch.messages) {
      try {
        const result = await conductor.executeEnsemble('process-task', message.body);
        if (result.success) {
          message.ack();
        } else {
          // Permanent failure - ack so it is not retried, then record it
          message.ack();
          await logFailure(message.body, result.error);
        }
      } catch (error) {
        if (isTransient(error)) {
          // Transient failure - retry
          message.retry();
        } else {
          message.ack();
          await logFatalError(message.body, error);
        }
      }
    }
  }
};
// isTransient, logFailure, and logFatalError are app-specific helpers
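As a minimal sketch, an isTransient helper might classify errors by an HTTP-like status field (the shape of your errors will differ):
// Hypothetical helper: treat network failures and 429/5xx responses as retryable
function isTransient(error: unknown): boolean {
  if (error instanceof TypeError) return true; // fetch-level network failure
  const status = (error as { status?: number }).status;
  return status === 429 || (status !== undefined && status >= 500);
}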
Common Workflows
Email Queue
# Producer
name: queue-email
description: Queue email for sending
flow:
  - member: validate-email
    type: Function
  - member: queue-message
    type: Function
    input:
      queue: "EMAIL_QUEUE"
      message:
        to: ${input.to}
        subject: ${input.subject}
        body: ${input.body}
        templateId: ${input.templateId}
output:
  queued: true
  messageId: ${queue-message.output.id}
# Consumer
name: send-email
description: Process email from queue
flow:
  - member: render-template
    condition: ${input.templateId != null}
    type: Function
  - member: send-email
    type: API
    config:
      url: "${env.EMAIL_SERVICE_URL}"
      method: POST
    input:
      body:
        to: ${input.to}
        subject: ${input.subject}
        html: ${render-template.success ? render-template.output : input.body}
  - member: log-sent
    condition: ${send-email.success}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO email_log (to_address, subject, sent_at)
        VALUES (?, ?, CURRENT_TIMESTAMP)
    input:
      params:
        - ${input.to}
        - ${input.subject}
output:
  sent: ${send-email.success}
Webhook Processing
// Producer: Receive webhook, queue for processing
export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    // Verify the signature over the raw body, then parse
    const rawBody = await request.text();
    const signature = request.headers.get('X-Webhook-Signature');
    if (!(await verifySignature(rawBody, signature, env.WEBHOOK_SECRET))) {
      return new Response('Invalid signature', { status: 401 });
    }
    const payload = JSON.parse(rawBody);

    // Queue for processing
    await env.WEBHOOK_QUEUE.send({
      source: 'github',
      event: request.headers.get('X-GitHub-Event'),
      payload,
      receivedAt: Date.now()
    });

    // Respond immediately
    return Response.json({ received: true });
  }
};
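verifySignature above is app code. A minimal sketch using the Web Crypto API, assuming the sender signs the raw body with HMAC-SHA256 and sends a hex digest:
// Hypothetical helper: HMAC-SHA256 check over the raw request body
async function verifySignature(rawBody: string, signature: string | null, secret: string): Promise<boolean> {
  if (!signature) return false;
  const enc = new TextEncoder();
  const key = await crypto.subtle.importKey(
    'raw', enc.encode(secret), { name: 'HMAC', hash: 'SHA-256' }, false, ['sign']
  );
  const mac = new Uint8Array(await crypto.subtle.sign('HMAC', key, enc.encode(rawBody)));
  const expected = [...mac].map(b => b.toString(16).padStart(2, '0')).join('');
  return expected === signature; // use a constant-time comparison in production
}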
# Consumer: Process webhook
name: process-webhook
description: Handle webhook event
flow:
  # Route by event type
  - member: handle-push
    condition: ${input.event === 'push'}
    type: Function
  - member: handle-pr
    condition: ${input.event === 'pull_request'}
    type: Function
  - member: handle-issue
    condition: ${input.event === 'issues'}
    type: Function
  # Log processing
  - member: log-event
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO webhook_log (source, event, processed_at)
        VALUES (?, ?, CURRENT_TIMESTAMP)
    input:
      params:
        - ${input.source}
        - ${input.event}
output:
  processed: true
Data Pipeline Queue
// Queue data ingestion tasks
export default {
  async scheduled(event: ScheduledEvent, env: Env): Promise<void> {
    // Get list of sources to process
    const sources = await getSourcesToProcess(env);

    // Queue each source
    for (const source of sources) {
      await env.PIPELINE_QUEUE.send({
        task: 'ingest-data',
        source: source.id,
        lastSync: source.lastSync
      });
    }
  },

  async queue(batch: MessageBatch, env: Env): Promise<void> {
    const conductor = new Conductor({ env });
    for (const message of batch.messages) {
      try {
        await conductor.executeEnsemble('data-ingestion', message.body);
        message.ack();
      } catch (error) {
        message.retry();
      }
    }
  }
};
Error Handling
Retry Strategy
export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    for (const message of batch.messages) {
      try {
        await processMessage(message.body);
        message.ack();
      } catch (error) {
        if (message.attempts < 5) {
          // Back off exponentially between attempts
          message.retry({ delaySeconds: 2 ** message.attempts });
        } else {
          // Max retries reached - move to DLQ
          await env.DLQ.send({
            original: message.body,
            error: error.message,
            attempts: message.attempts
          });
          message.ack();
        }
      }
    }
  }
};
Dead Letter Queue
[[queues.producers]]
binding = "DLQ"
queue = "dead-letter-queue"
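Queues can also route messages that exhaust their retries to a dead letter queue automatically, making manual forwarding a fallback. In the consumer configuration:
[[queues.consumers]]
queue = "task-queue"
max_retries = 5
dead_letter_queue = "dead-letter-queue"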
// DLQ consumer - handle failed messages
export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    for (const message of batch.messages) {
      // Log failure
      await env.DB.prepare(
        'INSERT INTO failed_tasks (task, error, timestamp) VALUES (?, ?, ?)'
      ).bind(
        JSON.stringify(message.body.original),
        message.body.error,
        Date.now()
      ).run();

      // Alert
      await fetch(env.ALERT_WEBHOOK, {
        method: 'POST',
        body: JSON.stringify({
          text: `Task failed after ${message.body.attempts} attempts`,
          task: message.body.original
        })
      });

      message.ack();
    }
  }
};
Monitoring
Track Queue Metrics
export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    const startTime = Date.now();
    let processed = 0;
    let failed = 0;

    for (const message of batch.messages) {
      try {
        await processMessage(message.body);
        message.ack();
        processed++;
      } catch (error) {
        message.retry();
        failed++;
      }
    }

    // Log metrics
    await env.DB.prepare(
      `INSERT INTO queue_metrics
       (processed, failed, duration_ms, timestamp)
       VALUES (?, ?, ?, CURRENT_TIMESTAMP)`
    ).bind(
      processed,
      failed,
      Date.now() - startTime
    ).run();
  }
};
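A hypothetical D1 schema matching that insert:
CREATE TABLE IF NOT EXISTS queue_metrics (
  processed   INTEGER NOT NULL,
  failed      INTEGER NOT NULL,
  duration_ms INTEGER NOT NULL,
  timestamp   TEXT DEFAULT CURRENT_TIMESTAMP
);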
Cloudflare Dashboard
Monitor these in the Cloudflare dashboard:
- Queue depth
- Message throughput
- Processing time
- Error rate
- Retry rate
Testing
One approach: exercise the producer over HTTP with unstable_dev, and call the exported queue handler directly with a mocked batch (env below stands in for stubbed bindings, and the import path is whatever points at your Worker module):
import { describe, it, expect, vi } from 'vitest';
import { unstable_dev } from 'wrangler';
import worker from '../src/index';

describe('queue integration', () => {
  it('queues a message via the producer endpoint', async () => {
    const dev = await unstable_dev('src/index.ts');
    const res = await dev.fetch('/api/queue', {
      method: 'POST',
      body: JSON.stringify({ task: 'test' })
    });
    expect(res.status).toBe(200);
    await dev.stop();
  });

  it('processes a mocked batch in the consumer', async () => {
    const ack = vi.fn();
    const retry = vi.fn();
    const batch = {
      queue: 'task-queue',
      messages: [{ id: '1', timestamp: new Date(), attempts: 1, body: { task: 'test' }, ack, retry }]
    } as unknown as MessageBatch;
    const env = {} as Env; // stub whatever bindings the consumer touches

    await worker.queue(batch, env);
    expect(ack).toHaveBeenCalled();
    // ... check database or state
  });
});
Best Practices
- Respond quickly - Queue and return immediately
- Idempotent processing - Handle duplicate messages (see the sketch after this list)
- Selective retry - Don’t retry permanent failures
- Monitor metrics - Track queue depth and processing time
- Set batch sizes - Balance throughput and latency
- Use DLQ - Handle failed messages
- Log everything - Debug queue issues
- Test thoroughly - Verify queue and consumer logic
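A minimal idempotency sketch, assuming a KV namespace bound as DEDUP (hypothetical) and the processMessage helper from earlier:
// Skip work if this message id was already handled
async function processOnce(message: Message, env: Env & { DEDUP: KVNamespace }): Promise<void> {
  const key = `processed:${message.id}`;
  if (await env.DEDUP.get(key)) {
    message.ack(); // duplicate delivery - already done
    return;
  }
  await processMessage(message.body);
  await env.DEDUP.put(key, '1', { expirationTtl: 86400 }); // remember for 24h
  message.ack();
}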

