Skip to main content

Overview

Schedule ensemble execution using Cloudflare Workers Cron Triggers. Scheduled ensembles are well suited to periodic tasks such as data syncing, report generation, cleanup jobs, and monitoring checks. Conductor works with the standard Workers cron syntax, giving you reliable, distributed scheduled execution at the edge.

Quick Example

# wrangler.toml
# Registers the cron expression that invokes the Worker's scheduled() handler.
[triggers]
crons = ["0 */6 * * *"]  # Every 6 hours, at minute 0 (schedules run in UTC)
// src/index.ts
import { Conductor } from '@ensemble-edge/conductor';

export default {
  /**
   * Cron entry point: runs the `daily-sync` ensemble every time the trigger
   * fires, forwarding the scheduled timestamp and the cron expression as input.
   */
  async scheduled(event: ScheduledEvent, env: Env): Promise<void> {
    const runner = new Conductor({ env });
    const { scheduledTime, cron } = event;

    await runner.executeEnsemble('daily-sync', { scheduledTime, cron });
  }
};

Cron Syntax

Basic Patterns

# Each `crons` line below is an alternative value for the [triggers] table.
# Every hour, at minute 0
crons = ["0 * * * *"]

# Every day at midnight UTC
crons = ["0 0 * * *"]

# Every Monday at 9am UTC
crons = ["0 9 * * MON"]

# Every 15 minutes
crons = ["*/15 * * * *"]

# First day of every month, at midnight UTC
crons = ["0 0 1 * *"]

Multiple Schedules

# One Worker can register several schedules: list every expression in `crons`.
[triggers]
crons = [
  "0 0 * * *",    # Daily at midnight UTC
  "0 12 * * *",   # Daily at noon UTC
  "0 */6 * * *"   # Every 6 hours (also fires at midnight and noon)
]

Cron Format

┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
│ │ │ │ ┌───────────── day of week (1 - 7 or SUN-SAT; 1 = Sunday)
│ │ │ │ │
│ │ │ │ │
* * * * *

Scheduled Workflows

Daily Report Generation

# Ensemble: fetches yesterday's analytics rows from D1, asks an AI model to
# summarize them, then POSTs the summary to the email service.
name: daily-analytics-report
description: Generate and email daily analytics

flow:
  # Fetch yesterday's data
  # (DATE('now', '-1 day') is evaluated by the database when the query runs)
  - member: fetch-analytics
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        SELECT *
        FROM analytics
        WHERE date = DATE('now', '-1 day')

  # Analyze with AI
  # The query results are serialized to JSON and embedded in the prompt
  - member: generate-insights
    type: Think
    config:
      provider: anthropic
      model: claude-3-5-sonnet-20241022
    input:
      prompt: |
        Analyze this data and provide key insights:
        ${JSON.stringify(fetch-analytics.output.results)}

  # Email report
  # The endpoint comes from env.EMAIL_SERVICE_URL; the message body is the
  # text produced by generate-insights
  - member: send-report
    type: API
    config:
      url: "${env.EMAIL_SERVICE_URL}"
      method: POST
    input:
      body:
        to: "team@example.com"
        subject: "Daily Analytics Report"
        body: ${generate-insights.output.text}

# `sent` is true only when the email service answered with status 200
output:
  sent: ${send-report.output.status === 200}
  date: ${new Date().toISOString()}

Data Synchronization

# Ensemble: pulls data from an external API, transforms it, and upserts it
# into D1.
name: sync-external-data
description: Sync data from external API every hour

flow:
  # Fetch latest data
  # Authenticates with a bearer token taken from env.API_KEY
  - member: fetch-external
    type: API
    config:
      url: "https://api.example.com/data"
      method: GET
      headers:
        Authorization: "Bearer ${env.API_KEY}"
    cache:
      ttl: 0  # No cache for fresh data

  # Transform data
  # Receives the API payload as `rawData`; the ensemble output below reads
  # `count` from this member's output
  - member: transform-data
    type: Function
    input:
      rawData: ${fetch-external.output.data}

  # Upsert to database
  # ON CONFLICT(id) turns the insert into an update when the id already exists.
  # NOTE(review): the values bound to the two `?` placeholders are not shown
  # in this example — confirm how the Data member supplies them.
  - member: save-to-db
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO external_data (id, data, synced_at)
        VALUES (?, ?, CURRENT_TIMESTAMP)
        ON CONFLICT(id) DO UPDATE SET
          data = excluded.data,
          synced_at = excluded.synced_at

# recordsSynced comes from the transform step, not from the database write
output:
  recordsSynced: ${transform-data.output.count}
  lastSync: ${new Date().toISOString()}

Cleanup Job

# Ensemble: housekeeping job that purges stale rows from D1 in three steps.
name: cleanup-old-data
description: Delete data older than 30 days

flow:
  # Delete old executions
  # Removes rows whose created_at is earlier than 30 days before today
  - member: delete-executions
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        DELETE FROM executions
        WHERE created_at < DATE('now', '-30 days')

  # Delete old cache entries
  # Removes rows whose expires_at is already in the past
  - member: delete-cache
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        DELETE FROM cache
        WHERE expires_at < CURRENT_TIMESTAMP

  # Vacuum database
  # NOTE(review): confirm the target database accepts VACUUM before relying
  # on this step
  - member: vacuum
    type: Data
    config:
      storage: d1
      operation: query
      query: "VACUUM"

# Presumably the number of rows each DELETE affected — verify `changes`
output:
  executionsDeleted: ${delete-executions.output.changes}
  cacheEntriesDeleted: ${delete-cache.output.changes}

Health Check Monitor

# Ensemble: probes the API, the D1 database and the KV cache side by side and
# raises a PagerDuty event when any probe fails.
name: health-check-monitor
description: Check service health every 5 minutes

flow:
  # Check API endpoints
  # `parallel` is a list entry of `flow` so that `flow` stays a single YAML
  # sequence; a bare `parallel:` key followed by `- member:` items does not parse.
  # NOTE(review): confirm `- parallel:` matches Conductor's flow schema.
  - parallel:
      - member: check-api
        type: API
        config:
          url: "${env.API_URL}/health"
          timeout: 5000

      - member: check-database
        type: Data
        config:
          storage: d1
          operation: query
          query: "SELECT 1"

      - member: check-cache
        type: Data
        config:
          storage: kv
          operation: get
          binding: CACHE
        input:
          key: "health-check"

  # Alert if any failed
  # Runs only when at least one of the three checks did not succeed
  - member: send-alert
    condition: ${!check-api.success || !check-database.success || !check-cache.success}
    type: API
    config:
      url: "${env.PAGERDUTY_URL}"
      method: POST
    input:
      body:
        event_action: "trigger"
        payload:
          summary: "Service health check failed"
          severity: "critical"

# `healthy` is true only when all three checks succeeded
output:
  healthy: ${check-api.success && check-database.success && check-cache.success}
  timestamp: ${new Date().toISOString()}

Routing by Schedule

Multiple Schedules, Different Workflows

export default {
  /**
   * Cron entry point that chooses the ensemble to run from the cron
   * expression that fired.
   *
   * `event.cron` is compared as an exact string, so every `case` label must be
   * spelled the same way as the corresponding entry in `crons`. An expression
   * without a matching case is logged instead of being ignored silently.
   */
  async scheduled(event: ScheduledEvent, env: Env): Promise<void> {
    const conductor = new Conductor({ env });

    // Route to different ensemble based on schedule
    switch (event.cron) {
      case '0 0 * * *':  // Daily at midnight
        await conductor.executeEnsemble('daily-cleanup', {
          scheduledTime: event.scheduledTime
        });
        break;

      case '0 */6 * * *':  // Every 6 hours
        await conductor.executeEnsemble('sync-data', {
          scheduledTime: event.scheduledTime
        });
        break;

      case '0 0 * * MON':  // Weekly on Monday
        await conductor.executeEnsemble('weekly-report', {
          scheduledTime: event.scheduledTime
        });
        break;

      default:
        // A schedule added to wrangler.toml without a case here would
        // otherwise run nothing and leave no trace
        console.warn(`No ensemble mapped to cron expression "${event.cron}"`);
        break;
    }
  }
};

Dynamic Scheduling

export default {
  /**
   * Cron entry point that picks a monitoring ensemble from the UTC hour of the
   * scheduled time: hours 8 through 18 (inclusive) run `frequent-monitor`,
   * every other hour runs `standard-monitor`.
   */
  async scheduled(event: ScheduledEvent, env: Env): Promise<void> {
    const conductor = new Conductor({ env });

    const utcHour = new Date(event.scheduledTime).getUTCHours();
    const inBusinessHours = utcHour >= 8 && utcHour <= 18;

    // Business hours get the frequent checks, off hours the standard ones
    const target = inBusinessHours
      ? { ensemble: 'frequent-monitor', interval: 'business-hours' }
      : { ensemble: 'standard-monitor', interval: 'off-hours' };

    await conductor.executeEnsemble(target.ensemble, {
      interval: target.interval
    });
  }
};

Error Handling

Retry Failed Jobs

export default {
  /**
   * Cron entry point that runs `scheduled-task` and, if it throws, logs the
   * failure, posts an alert to `env.ALERT_WEBHOOK_URL`, and then runs the
   * `scheduled-task-retry` ensemble with the original error message.
   *
   * A failure of the retry ensemble itself is not caught and propagates to
   * the runtime.
   */
  async scheduled(event: ScheduledEvent, env: Env): Promise<void> {
    const conductor = new Conductor({ env });

    try {
      await conductor.executeEnsemble('scheduled-task', {
        scheduledTime: event.scheduledTime
      });
    } catch (error: unknown) {
      // A thrown value is not guaranteed to be an Error, so narrow before
      // reading `.message`
      const message = error instanceof Error ? error.message : String(error);

      // Log error
      console.error('Scheduled task failed:', error);

      // Send alert; a failing webhook must not prevent the retry below
      try {
        await fetch(env.ALERT_WEBHOOK_URL, {
          method: 'POST',
          body: JSON.stringify({
            error: message,
            cron: event.cron,
            time: event.scheduledTime
          })
        });
      } catch (alertError: unknown) {
        console.error('Failed to send alert:', alertError);
      }

      // Optionally retry
      await conductor.executeEnsemble('scheduled-task-retry', {
        originalError: message,
        scheduledTime: event.scheduledTime
      });
    }
  }
};

Idempotency

# Ensemble: runs sync-data at most once per `input.date` by recording a
# completion marker in KV.
# NOTE(review): `input.date` must be supplied by the caller; the handler
# examples on this page pass `scheduledTime` instead — confirm the input shape.
name: idempotent-sync
description: Sync data idempotently

flow:
  # Check if already processed this period
  # Looks up the marker key "sync:<date>" through the CACHE KV binding
  - member: check-processed
    type: Data
    config:
      storage: kv
      operation: get
      binding: CACHE
    input:
      key: "sync:${input.date}"

  # Skip if already processed
  # Runs only when no marker value was found
  - member: sync-data
    condition: ${!check-processed.output.value}
    type: Function

  # Mark as processed
  # Written only after sync-data succeeded, so a failed run leaves no marker
  # and the next run will try again
  - member: mark-processed
    condition: ${sync-data.success}
    type: Data
    config:
      storage: kv
      operation: put
      binding: CACHE
    input:
      key: "sync:${input.date}"
      value: "completed"
      expirationTtl: 86400  # 24 hours

# processed: sync-data succeeded in this run; skipped: a marker already existed
output:
  processed: ${sync-data.success}
  skipped: ${!!check-processed.output.value}

Advanced Patterns

Distributed Lock

/** Smallest `expirationTtl` (in seconds) that Workers KV accepts on a write. */
const MIN_KV_TTL_SECONDS = 60;

/**
 * Best-effort lock used to prevent concurrent execution of a scheduled job.
 *
 * Stores the current timestamp under `lock:<key>` with an expiry, so a lock
 * that is never released disappears on its own after `ttl` seconds.
 *
 * The existence check and the write are two separate KV operations, so this
 * is not an atomic test-and-set: two invocations that read at the same moment
 * can both acquire the lock. Pair it with idempotent jobs.
 *
 * @param kv  KV namespace that holds the lock entries.
 * @param key Logical lock name; stored as `lock:<key>`.
 * @param ttl Lock lifetime in seconds. Values below 60 are raised to 60,
 *            the minimum KV accepts, instead of letting the write fail.
 * @returns `true` when the lock was taken, `false` when it is already held.
 */
async function acquireLock(
  kv: KVNamespace,
  key: string,
  ttl: number = 300
): Promise<boolean> {
  const lockKey = `lock:${key}`;
  const existing = await kv.get(lockKey);

  // get() resolves to null for a missing key; any stored value means "held"
  if (existing != null) {
    return false;  // Lock already held
  }

  await kv.put(lockKey, Date.now().toString(), {
    expirationTtl: Math.max(MIN_KV_TTL_SECONDS, ttl)
  });
  return true;
}

export default {
  /**
   * Cron entry point that runs the `daily-job` ensemble only when the
   * `daily-job` lock can be taken, and removes the lock entry once the run
   * finishes, whether it succeeded or threw.
   */
  async scheduled(event: ScheduledEvent, env: Env): Promise<void> {
    const conductor = new Conductor({ env });

    // Take the lock with a 300-second expiry as a safety net
    const acquired = await acquireLock(env.CACHE, 'daily-job', 300);

    if (acquired) {
      try {
        await conductor.executeEnsemble('daily-job', {
          scheduledTime: event.scheduledTime
        });
      } finally {
        // Drop the lock entry even when the ensemble throws
        await env.CACHE.delete('lock:daily-job');
      }
    } else {
      console.log('Job already running, skipping');
    }
  }
};

Batch Processing

# Ensemble: processes up to 100 pending rows per run and then flags them as
# processed.
name: process-pending-items
description: Process items in batches

flow:
  # Get pending items
  # LIMIT 100 caps the batch size for a single run
  - member: fetch-pending
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        SELECT id, data
        FROM pending_items
        WHERE processed = false
        LIMIT 100

  # Process each item
  # Runs once per fetched row; the current row is passed in as `item`
  - member: process-items
    foreach: ${fetch-pending.output.results}
    type: Function
    input:
      item: ${item}

  # Mark as processed
  # NOTE(review): this step has no condition, so every fetched id is marked
  # processed regardless of how process-items fared, and the id list is
  # interpolated directly into the SQL text — confirm both are intended.
  - member: mark-processed
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        UPDATE pending_items
        SET processed = true, processed_at = CURRENT_TIMESTAMP
        WHERE id IN (${fetch-pending.output.results.map(r => r.id).join(',')})

# Number of entries in the process-items output
output:
  processed: ${process-items.output.length}

Rate-Limited Processing

# Ensemble: calls an external API only while a request counter kept in KV is
# below 100.
name: rate-limited-api-sync
description: Sync data respecting API rate limits

flow:
  # Check rate limit
  # Reads the current counter through the CACHE KV binding
  - member: check-rate-limit
    type: Data
    config:
      storage: kv
      operation: get
      binding: CACHE
    input:
      key: "rate-limit:api-sync"

  # Wait if rate limited
  # Runs only once the counter has reached 100
  - member: wait-for-reset
    condition: ${check-rate-limit.output.value >= 100}
    type: Function

  # Fetch data
  # NOTE(review): when wait-for-reset runs and succeeds, both clauses of this
  # condition are false, so no fetch happens in that run — confirm intended.
  - member: fetch-data
    condition: ${!wait-for-reset.success || check-rate-limit.output.value < 100}
    type: API
    config:
      url: "https://api.example.com/data"

  # Increment rate limit counter
  # NOTE(review): if the KV read yields the counter as a string, `+ 1` would
  # concatenate rather than add — confirm the value is numeric here.
  - member: increment-counter
    condition: ${fetch-data.success}
    type: Data
    config:
      storage: kv
      operation: put
      binding: CACHE
    input:
      key: "rate-limit:api-sync"
      value: ${(check-rate-limit.output.value || 0) + 1}
      expirationTtl: 3600  # Reset every hour

# rateLimitRemaining is based on the counter read at the start of this run
output:
  fetched: ${fetch-data.success}
  rateLimitRemaining: ${100 - (check-rate-limit.output.value || 0)}

Monitoring and Observability

Log Execution Metrics

export default {
  /**
   * Cron entry point that runs `scheduled-task` and writes one metrics record
   * to the `METRICS` store: a success record keyed by the execution id, or an
   * error record keyed by the current time.
   *
   * Errors are recorded but not rethrown, so a failed run still completes the
   * invocation normally. Because the success write sits inside the `try`, a
   * failure of that write is also recorded as an error.
   */
  async scheduled(event: ScheduledEvent, env: Env): Promise<void> {
    const conductor = new Conductor({ env });

    const startTime = Date.now();

    try {
      const result = await conductor.executeEnsemble('scheduled-task', {
        scheduledTime: event.scheduledTime
      });

      const duration = Date.now() - startTime;

      // Log success metrics
      await env.METRICS.put(`execution:${result.id}`, JSON.stringify({
        status: 'success',
        duration,
        timestamp: event.scheduledTime
      }));

    } catch (error: unknown) {
      const duration = Date.now() - startTime;

      // A thrown value is not guaranteed to be an Error, so narrow before
      // reading `.message`
      const message = error instanceof Error ? error.message : String(error);

      // Log error metrics
      await env.METRICS.put(`execution:error:${Date.now()}`, JSON.stringify({
        status: 'error',
        error: message,
        duration,
        timestamp: event.scheduledTime
      }));
    }
  }
};

Track Success Rate

# Ensemble: runs a job and records exactly one success or failure row in the
# job_metrics table.
name: track-job-metrics
description: Track success rate of scheduled jobs

flow:
  # Presumably continue_on_error lets the flow reach the metric steps below
  # even when run-job fails — verify against the Conductor docs
  - member: run-job
    type: Function
    continue_on_error: true

  # Increment success counter
  # Inserts a 'success' row only when run-job succeeded
  - member: increment-success
    condition: ${run-job.success}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO job_metrics (job_name, status, timestamp)
        VALUES ('scheduled-task', 'success', CURRENT_TIMESTAMP)

  # Increment failure counter
  # Inserts a 'failure' row only when run-job did not succeed
  - member: increment-failure
    condition: ${!run-job.success}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO job_metrics (job_name, status, timestamp)
        VALUES ('scheduled-task', 'failure', CURRENT_TIMESTAMP)

# Mirrors the outcome of run-job
output:
  success: ${run-job.success}

Testing Scheduled Workflows

import { describe, it, expect } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';

describe('scheduled workflows', () => {
  // Runs daily-sync with a midnight cron payload and expects records to be synced
  it('should execute daily sync', async () => {
    const conductor = await TestConductor.create();
    const payload = { scheduledTime: Date.now(), cron: '0 0 * * *' };

    const outcome = await conductor.executeEnsemble('daily-sync', payload);

    expect(outcome).toBeSuccessful();
    expect(outcome.output.recordsSynced).toBeGreaterThan(0);
  });

  // Runs cleanup-old-data and expects a non-negative deletion count
  it('should handle cleanup job', async () => {
    const conductor = await TestConductor.create();
    const payload = { scheduledTime: Date.now() };

    const outcome = await conductor.executeEnsemble('cleanup-old-data', payload);

    expect(outcome).toBeSuccessful();
    expect(outcome.output.executionsDeleted).toBeGreaterThanOrEqual(0);
  });
});

Integration Testing

import { unstable_dev } from 'wrangler';

describe('cron trigger', () => {
  // Starts the Worker locally, fires one scheduled event, and checks that
  // exactly one execution was recorded.
  it('should execute scheduled handler', async () => {
    const worker = await unstable_dev('src/index.ts');

    try {
      // Trigger scheduled event
      // NOTE(review): confirm the worker handle returned by this wrangler
      // version exposes `scheduled`
      await worker.scheduled({
        scheduledTime: Date.now(),
        cron: '0 0 * * *'
      });

      // Verify execution (getExecutions is a helper not shown in this snippet)
      const executions = await getExecutions(worker);
      expect(executions).toHaveLength(1);
    } finally {
      // Always shut the dev server down, even when an assertion fails, so the
      // test run can exit instead of hanging on the open server
      await worker.stop();
    }
  });
});

Best Practices

  1. Use UTC times - All cron schedules run in UTC
  2. Implement idempotency - Handle duplicate executions gracefully
  3. Set timeouts - Prevent long-running jobs from blocking
  4. Use distributed locks - Prevent concurrent execution
  5. Monitor failures - Alert on job failures
  6. Log execution - Track when jobs run and their results
  7. Batch processing - Process items in chunks for large datasets
  8. Rate limit awareness - Respect external API limits
  9. Test thoroughly - Verify schedules work as expected
  10. Keep jobs small - Break large jobs into smaller tasks

Common Schedules

# Each `crons` line below is an alternative value for the [triggers] table.
# Every minute (minimum interval)
crons = ["* * * * *"]

# Every 5 minutes
crons = ["*/5 * * * *"]

# Every hour
crons = ["0 * * * *"]

# Every 6 hours
crons = ["0 */6 * * *"]

# Daily at midnight UTC
crons = ["0 0 * * *"]

# Daily at 2am UTC
crons = ["0 2 * * *"]

# Weekdays at 9am UTC
# Day names are used because Cloudflare numbers weekdays from 1 = Sunday,
# so the numeric range 1-5 would mean Sunday through Thursday.
crons = ["0 9 * * MON-FRI"]

# First of every month, at midnight UTC
crons = ["0 0 1 * *"]

# Every Sunday at midnight UTC
crons = ["0 0 * * SUN"]

Limitations

  • Minimum interval: 1 minute
  • Maximum cron triggers: 3 per Worker
  • Execution time: Subject to Worker CPU time limits
  • Timing accuracy: May vary by a few seconds
  • No catch-up: Missed executions are not replayed