Overview
Schedule ensemble execution using Cloudflare Workers Cron Triggers. Perfect for periodic tasks like data syncing, report generation, cleanup jobs, and monitoring checks. Conductor integrates seamlessly with Workers cron syntax for reliable, distributed scheduled execution at the edge.

Quick Example
# wrangler.toml
[triggers]
crons = ["0 */6 * * *"] # Every 6 hours
// src/index.ts
import { Conductor } from '@ensemble-edge/conductor';

export default {
  async scheduled(event: ScheduledEvent, env: Env): Promise<void> {
    const conductor = new Conductor({ env });

    await conductor.executeEnsemble('daily-sync', {
      scheduledTime: event.scheduledTime,
      cron: event.cron
    });
  }
};
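During local development, running `wrangler dev --test-scheduled` exposes a `/__scheduled` route; requesting it (for example, curl "http://localhost:8787/__scheduled?cron=0+*/6+*+*+*") fires the handler without waiting for the real schedule.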
Cron Syntax
Basic Patterns
# Every hour
crons = ["0 * * * *"]

# Every day at midnight UTC
crons = ["0 0 * * *"]

# Every Monday at 9am UTC
crons = ["0 9 * * MON"]

# Every 15 minutes
crons = ["*/15 * * * *"]

# First day of every month
crons = ["0 0 1 * *"]
Multiple Schedules
[triggers]
crons = [
  "0 0 * * *",   # Daily at midnight
  "0 12 * * *",  # Daily at noon
  "0 */6 * * *"  # Every 6 hours
]
Cron Format
┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
│ │ │ │ ┌───────────── day of week (0 - 6 or SUN-SAT)
│ │ │ │ │
* * * * *
Scheduled Workflows
Daily Report Generation
name: daily-analytics-report
description: Generate and email daily analytics

flow:
  # Fetch yesterday's data
  - member: fetch-analytics
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        SELECT *
        FROM analytics
        WHERE date = DATE('now', '-1 day')

  # Analyze with AI
  - member: generate-insights
    type: Think
    config:
      provider: anthropic
      model: claude-3-5-sonnet-20241022
    input:
      prompt: |
        Analyze this data and provide key insights:
        ${JSON.stringify(fetch-analytics.output.results)}

  # Email report
  - member: send-report
    type: API
    config:
      url: "${env.EMAIL_SERVICE_URL}"
      method: POST
    input:
      body:
        to: "team@example.com"
        subject: "Daily Analytics Report"
        body: ${generate-insights.output.text}

output:
  sent: ${send-report.output.status === 200}
  date: ${new Date().toISOString()}
Data Synchronization
name: sync-external-data
description: Sync data from external API every hour

flow:
  # Fetch latest data
  - member: fetch-external
    type: API
    config:
      url: "https://api.example.com/data"
      method: GET
      headers:
        Authorization: "Bearer ${env.API_KEY}"
      cache:
        ttl: 0  # No cache for fresh data

  # Transform data
  - member: transform-data
    type: Function
    input:
      rawData: ${fetch-external.output.data}

  # Upsert to database
  - member: save-to-db
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO external_data (id, data, synced_at)
        VALUES (?, ?, CURRENT_TIMESTAMP)
        ON CONFLICT(id) DO UPDATE SET
          data = excluded.data,
          synced_at = excluded.synced_at

output:
  recordsSynced: ${transform-data.output.count}
  lastSync: ${new Date().toISOString()}
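The transform-data step above is declared but left abstract. A minimal sketch of what its Function member could look like, assuming Conductor passes the member's declared input object and exposes the return value as transform-data.output (the signature and field names here are assumptions, not documented Conductor API):

// Hypothetical handler for the transform-data Function member
interface RawRecord {
  id: string;
  payload: unknown;
}

export function transformData(input: { rawData: RawRecord[] }) {
  const rows = input.rawData.map((record) => ({
    id: record.id,
    data: JSON.stringify(record.payload)
  }));

  // count is what the ensemble's output.recordsSynced reads
  return { rows, count: rows.length };
}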
Cleanup Job
name: cleanup-old-data
description: Delete data older than 30 days

flow:
  # Delete old executions
  - member: delete-executions
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        DELETE FROM executions
        WHERE created_at < DATE('now', '-30 days')

  # Delete old cache entries
  - member: delete-cache
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        DELETE FROM cache
        WHERE expires_at < CURRENT_TIMESTAMP

  # Vacuum database
  - member: vacuum
    type: Data
    config:
      storage: d1
      operation: query
      query: "VACUUM"

output:
  executionsDeleted: ${delete-executions.output.changes}
  cacheEntriesDeleted: ${delete-cache.output.changes}
Health Check Monitor
name: health-check-monitor
description: Check service health every 5 minutes

flow:
  # Check services in parallel
  - parallel:
      - member: check-api
        type: API
        config:
          url: "${env.API_URL}/health"
          timeout: 5000

      - member: check-database
        type: Data
        config:
          storage: d1
          operation: query
          query: "SELECT 1"

      - member: check-cache
        type: Data
        config:
          storage: kv
          operation: get
          binding: CACHE
        input:
          key: "health-check"

  # Alert if any check failed
  - member: send-alert
    condition: ${!check-api.success || !check-database.success || !check-cache.success}
    type: API
    config:
      url: "${env.PAGERDUTY_URL}"
      method: POST
    input:
      body:
        event_action: "trigger"
        payload:
          summary: "Service health check failed"
          severity: "critical"

output:
  healthy: ${check-api.success && check-database.success && check-cache.success}
  timestamp: ${new Date().toISOString()}
Routing by Schedule
Multiple Schedules, Different Workflows
export default {
  async scheduled(event: ScheduledEvent, env: Env): Promise<void> {
    const conductor = new Conductor({ env });

    // Route to a different ensemble based on which schedule fired
    switch (event.cron) {
      case '0 0 * * *': // Daily at midnight
        await conductor.executeEnsemble('daily-cleanup', {
          scheduledTime: event.scheduledTime
        });
        break;

      case '0 */6 * * *': // Every 6 hours
        await conductor.executeEnsemble('sync-data', {
          scheduledTime: event.scheduledTime
        });
        break;

      case '0 0 * * MON': // Weekly on Monday
        await conductor.executeEnsemble('weekly-report', {
          scheduledTime: event.scheduledTime
        });
        break;
    }
  }
};
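If the switch grows, a lookup table keeps the routing declarative. This variant uses the same Conductor calls shown above:

import { Conductor } from '@ensemble-edge/conductor';

// Map each cron expression to the ensemble it runs
const CRON_ROUTES: Record<string, string> = {
  '0 0 * * *': 'daily-cleanup',
  '0 */6 * * *': 'sync-data',
  '0 0 * * MON': 'weekly-report'
};

export default {
  async scheduled(event: ScheduledEvent, env: Env): Promise<void> {
    const ensemble = CRON_ROUTES[event.cron];
    if (!ensemble) return; // Unrecognized schedule: nothing to run

    const conductor = new Conductor({ env });
    await conductor.executeEnsemble(ensemble, {
      scheduledTime: event.scheduledTime
    });
  }
};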
Dynamic Scheduling
export default {
  async scheduled(event: ScheduledEvent, env: Env): Promise<void> {
    const conductor = new Conductor({ env });
    const hour = new Date(event.scheduledTime).getUTCHours();

    // Note: this pattern assumes a single frequent trigger (e.g. every
    // 5 minutes); the handler decides which ensemble to run per invocation.

    // Business hours: frequent checks
    if (hour >= 8 && hour <= 18) {
      await conductor.executeEnsemble('frequent-monitor', {
        interval: 'business-hours'
      });
    }
    // Off hours: less frequent
    else {
      await conductor.executeEnsemble('standard-monitor', {
        interval: 'off-hours'
      });
    }
  }
};
Error Handling
Retry Failed Jobs
export default {
  async scheduled(event: ScheduledEvent, env: Env): Promise<void> {
    const conductor = new Conductor({ env });

    try {
      await conductor.executeEnsemble('scheduled-task', {
        scheduledTime: event.scheduledTime
      });
    } catch (error) {
      // Log error
      console.error('Scheduled task failed:', error);

      // Send alert
      await fetch(env.ALERT_WEBHOOK_URL, {
        method: 'POST',
        body: JSON.stringify({
          error: error.message,
          cron: event.cron,
          time: event.scheduledTime
        })
      });

      // Optionally retry with a dedicated recovery ensemble
      await conductor.executeEnsemble('scheduled-task-retry', {
        originalError: error.message,
        scheduledTime: event.scheduledTime
      });
    }
  }
};
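For transient failures, it can also help to retry in place with backoff before falling back to the alert path. A small illustrative helper (not part of the Conductor API); the delays count against the invocation's wall-clock time, so keep attempts low:

// Retry an async job with exponential backoff: 1s, 2s, 4s, ...
async function withRetries<T>(fn: () => Promise<T>, attempts = 3): Promise<T> {
  let lastError: unknown;
  for (let i = 0; i < attempts; i++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      await new Promise((resolve) => setTimeout(resolve, 2 ** i * 1000));
    }
  }
  throw lastError;
}

// Usage:
// await withRetries(() =>
//   conductor.executeEnsemble('scheduled-task', { scheduledTime: event.scheduledTime })
// );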
Idempotency
name: idempotent-sync
description: Sync data idempotently

flow:
  # Check if already processed this period
  - member: check-processed
    type: Data
    config:
      storage: kv
      operation: get
      binding: CACHE
    input:
      key: "sync:${input.date}"

  # Skip if already processed
  - member: sync-data
    condition: ${!check-processed.output.value}
    type: Function

  # Mark as processed
  - member: mark-processed
    condition: ${sync-data.success}
    type: Data
    config:
      storage: kv
      operation: put
      binding: CACHE
    input:
      key: "sync:${input.date}"
      value: "completed"
      expirationTtl: 86400  # 24 hours

output:
  processed: ${sync-data.success}
  skipped: ${!!check-processed.output.value}
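The `input.date` key has to come from somewhere. One option is to derive it from the scheduled event in the handler, so every run within the same UTC day resolves to the same KV key (a sketch, assuming the ensemble above is deployed as idempotent-sync):

import { Conductor } from '@ensemble-edge/conductor';

export default {
  async scheduled(event: ScheduledEvent, env: Env): Promise<void> {
    // YYYY-MM-DD in UTC; all runs on the same day share one idempotency key
    const date = new Date(event.scheduledTime).toISOString().slice(0, 10);

    const conductor = new Conductor({ env });
    await conductor.executeEnsemble('idempotent-sync', { date });
  }
};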
Advanced Patterns
Distributed Lock
// Best-effort lock to avoid concurrent runs. Note: KV get/put is not atomic
// and reads are eventually consistent, so two invocations can still race past
// the check; treat this as a duplicate-work reducer, not a strict mutex.
async function acquireLock(
  kv: KVNamespace,
  key: string,
  ttl: number = 300
): Promise<boolean> {
  const lockKey = `lock:${key}`;
  const existing = await kv.get(lockKey);

  if (existing) {
    return false; // Lock already held
  }

  await kv.put(lockKey, Date.now().toString(), { expirationTtl: ttl });
  return true;
}

export default {
  async scheduled(event: ScheduledEvent, env: Env): Promise<void> {
    const conductor = new Conductor({ env });

    // Try to acquire lock
    const hasLock = await acquireLock(env.CACHE, 'daily-job', 300);

    if (!hasLock) {
      console.log('Job already running, skipping');
      return;
    }

    try {
      await conductor.executeEnsemble('daily-job', {
        scheduledTime: event.scheduledTime
      });
    } finally {
      // Release lock
      await env.CACHE.delete('lock:daily-job');
    }
  }
};
Batch Processing
name: process-pending-items
description: Process items in batches

flow:
  # Get pending items
  - member: fetch-pending
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        SELECT id, data
        FROM pending_items
        WHERE processed = false
        LIMIT 100

  # Process each item
  - member: process-items
    foreach: ${fetch-pending.output.results}
    type: Function
    input:
      item: ${item}

  # Mark the batch as processed
  - member: mark-processed
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        UPDATE pending_items
        SET processed = true, processed_at = CURRENT_TIMESTAMP
        WHERE id IN (${fetch-pending.output.results.map(r => r.id).join(',')})

output:
  processed: ${process-items.output.length}
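The interpolated IN (...) list above is safe only while the ids are trusted values from your own table. If that assumption ever weakens, a parameterized variant against a raw D1 binding avoids SQL injection. A sketch; db here is a plain D1Database binding, not a Conductor member:

// Parameterized version of the mark-processed step
async function markProcessed(db: D1Database, ids: number[]): Promise<void> {
  if (ids.length === 0) return;

  // One bound placeholder per id instead of string interpolation
  const placeholders = ids.map(() => '?').join(', ');
  await db
    .prepare(
      `UPDATE pending_items
       SET processed = true, processed_at = CURRENT_TIMESTAMP
       WHERE id IN (${placeholders})`
    )
    .bind(...ids)
    .run();
}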
Rate-Limited Processing
name: rate-limited-api-sync
description: Sync data respecting API rate limits

flow:
  # Check rate limit
  - member: check-rate-limit
    type: Data
    config:
      storage: kv
      operation: get
      binding: CACHE
    input:
      key: "rate-limit:api-sync"

  # Wait if rate limited
  - member: wait-for-reset
    condition: ${check-rate-limit.output.value >= 100}
    type: Function

  # Fetch data
  - member: fetch-data
    condition: ${!wait-for-reset.success || check-rate-limit.output.value < 100}
    type: API
    config:
      url: "https://api.example.com/data"

  # Increment rate limit counter
  - member: increment-counter
    condition: ${fetch-data.success}
    type: Data
    config:
      storage: kv
      operation: put
      binding: CACHE
    input:
      key: "rate-limit:api-sync"
      value: ${(check-rate-limit.output.value || 0) + 1}
      expirationTtl: 3600  # Reset every hour

output:
  fetched: ${fetch-data.success}
  rateLimitRemaining: ${100 - (check-rate-limit.output.value || 0)}
Monitoring and Observability
Log Execution Metrics
export default {
  async scheduled(event: ScheduledEvent, env: Env): Promise<void> {
    const conductor = new Conductor({ env });
    const startTime = Date.now();

    try {
      const result = await conductor.executeEnsemble('scheduled-task', {
        scheduledTime: event.scheduledTime
      });
      const duration = Date.now() - startTime;

      // Log success metrics
      await env.METRICS.put(`execution:${result.id}`, JSON.stringify({
        status: 'success',
        duration,
        timestamp: event.scheduledTime
      }));
    } catch (error) {
      const duration = Date.now() - startTime;

      // Log error metrics
      await env.METRICS.put(`execution:error:${Date.now()}`, JSON.stringify({
        status: 'error',
        error: error.message,
        duration,
        timestamp: event.scheduledTime
      }));
    }
  }
};
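KV works for ad-hoc inspection, but a Workers Analytics Engine dataset makes the same metrics queryable over time. A sketch, assuming a dataset bound as METRICS_AE (the binding name is an assumption; writeDataPoint accepts string blobs, numeric doubles, and at most one index):

import { Conductor } from '@ensemble-edge/conductor';

// Assumes in wrangler.toml:
// [[analytics_engine_datasets]]
// binding = "METRICS_AE"
interface MetricsEnv {
  METRICS_AE: AnalyticsEngineDataset;
  // ...other bindings
}

export default {
  async scheduled(event: ScheduledEvent, env: MetricsEnv): Promise<void> {
    const conductor = new Conductor({ env });
    const startTime = Date.now();
    let status = 'success';

    try {
      await conductor.executeEnsemble('scheduled-task', {
        scheduledTime: event.scheduledTime
      });
    } catch (error) {
      status = 'error';
      throw error;
    } finally {
      env.METRICS_AE.writeDataPoint({
        indexes: ['scheduled-task'],       // sampling key (max one)
        blobs: [status, event.cron],       // string dimensions
        doubles: [Date.now() - startTime]  // duration in ms
      });
    }
  }
};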
Track Success Rate
name: track-job-metrics
description: Track success rate of scheduled jobs

flow:
  - member: run-job
    type: Function
    continue_on_error: true

  # Record success
  - member: increment-success
    condition: ${run-job.success}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO job_metrics (job_name, status, timestamp)
        VALUES ('scheduled-task', 'success', CURRENT_TIMESTAMP)

  # Record failure
  - member: increment-failure
    condition: ${!run-job.success}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO job_metrics (job_name, status, timestamp)
        VALUES ('scheduled-task', 'failure', CURRENT_TIMESTAMP)

output:
  success: ${run-job.success}
Testing Scheduled Workflows
import { describe, it, expect } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';

describe('scheduled workflows', () => {
  it('should execute daily sync', async () => {
    const conductor = await TestConductor.create();

    const result = await conductor.executeEnsemble('daily-sync', {
      scheduledTime: Date.now(),
      cron: '0 0 * * *'
    });

    expect(result).toBeSuccessful();
    expect(result.output.recordsSynced).toBeGreaterThan(0);
  });

  it('should handle cleanup job', async () => {
    const conductor = await TestConductor.create();

    const result = await conductor.executeEnsemble('cleanup-old-data', {
      scheduledTime: Date.now()
    });

    expect(result).toBeSuccessful();
    expect(result.output.executionsDeleted).toBeGreaterThanOrEqual(0);
  });
});
Integration Testing
import { describe, it, expect } from 'vitest';
import { unstable_dev } from 'wrangler';

describe('cron trigger', () => {
  it('should execute scheduled handler', async () => {
    // testScheduled exposes a /__scheduled route for firing the handler;
    // unstable_dev does not provide a direct scheduled() method.
    const worker = await unstable_dev('src/index.ts', {
      experimental: { disableExperimentalWarning: true },
      testScheduled: true
    });

    try {
      // Trigger the scheduled event for a specific cron expression
      const response = await worker.fetch('/__scheduled?cron=0+0+*+*+*');
      expect(response.status).toBe(200);

      // Verify execution
      const executions = await getExecutions(worker);
      expect(executions).toHaveLength(1);
    } finally {
      await worker.stop();
    }
  });
});
Best Practices
- Use UTC times - All cron schedules run in UTC
- Implement idempotency - Handle duplicate executions gracefully
- Set timeouts - Prevent long-running jobs from blocking (a minimal wrapper is sketched after this list)
- Use distributed locks - Prevent concurrent execution
- Monitor failures - Alert on job failures
- Log execution - Track when jobs run and their results
- Batch processing - Process items in chunks for large datasets
- Rate limit awareness - Respect external API limits
- Test thoroughly - Verify schedules work as expected
- Keep jobs small - Break large jobs into smaller tasks
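A minimal timeout wrapper for the "set timeouts" item above. This is an illustrative sketch, not a Conductor API; note that Promise.race rejects the await but does not cancel work already in flight:

// Reject if the job exceeds ms; the underlying job keeps running
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
  return Promise.race([
    promise,
    new Promise<T>((_, reject) =>
      setTimeout(() => reject(new Error(`Timed out after ${ms}ms`)), ms)
    )
  ]);
}

// Usage:
// await withTimeout(
//   conductor.executeEnsemble('daily-job', { scheduledTime: event.scheduledTime }),
//   30_000
// );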
Common Schedules
# Every minute (minimum interval)
crons = ["* * * * *"]

# Every 5 minutes
crons = ["*/5 * * * *"]

# Every hour
crons = ["0 * * * *"]

# Every 6 hours
crons = ["0 */6 * * *"]

# Daily at midnight UTC
crons = ["0 0 * * *"]

# Daily at 2am UTC
crons = ["0 2 * * *"]

# Weekdays at 9am UTC
crons = ["0 9 * * 1-5"]

# First of every month
crons = ["0 0 1 * *"]

# Every Sunday at midnight
crons = ["0 0 * * SUN"]
Limitations
- Minimum interval: 1 minute
- Maximum cron triggers: 3 per Worker
- Execution time: Subject to Worker CPU time limits
- Timing accuracy: May vary by a few seconds
- No catch-up: Missed executions are not replayed

