Overview

The Schedule member provides delayed execution backed by Durable Objects, which gives it reliable, persistent scheduling. Use it to queue tasks for later execution, add delays, and build time-based workflows such as reminders, delayed notifications, retry delays, and time-based orchestration.

Quick Example

name: delayed-notification
description: Send notification after delay

flow:
  - member: process-order
    type: Function

  - member: schedule-reminder
    type: Schedule
    config:
      delay: 3600000  # 1 hour
    input:
      action: "send-reminder"
      orderId: ${process-order.output.id}

  - member: send-reminder
    type: API
    config:
      url: "${env.EMAIL_SERVICE_URL}"
      method: POST
    input:
      body:
        to: ${input.email}
        subject: "Order Reminder"
        orderId: ${schedule-reminder.output.orderId}

output:
  scheduledAt: ${schedule-reminder.output.scheduledAt}

Configuration

Input Parameters

config:
  delay: number          # Delay in milliseconds (required unless executionTime is set)
  executionTime: number  # Alternative to delay: specific timestamp at which to execute

input:
  action: string        # Optional: Action identifier
  data: any            # Optional: Data to pass to scheduled execution

Output Format

output:
  scheduledAt: number    # Timestamp when the task was scheduled
  executeAt: number      # Timestamp when the task will execute
  scheduleId: string     # Schedule identifier
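
The relationship between the two config options and the output timestamps can be sketched in TypeScript. This is an illustration only: `ScheduleConfig`, `ScheduleTimes`, and `resolveExecuteAt` are hypothetical names, and both the precedence of `executionTime` over `delay` and the use of millisecond timestamps are assumptions rather than documented behavior.

```typescript
// Hypothetical types mirroring the config and output fields listed above.
interface ScheduleConfig {
  delay?: number;          // milliseconds from now
  executionTime?: number;  // absolute timestamp (assumed: ms since the Unix epoch)
}

interface ScheduleTimes {
  scheduledAt: number;
  executeAt: number;
}

// Assumption: executionTime takes precedence when both options are present.
function resolveExecuteAt(config: ScheduleConfig, now: number = Date.now()): ScheduleTimes {
  if (config.executionTime !== undefined) {
    return { scheduledAt: now, executeAt: config.executionTime };
  }
  if (config.delay !== undefined) {
    return { scheduledAt: now, executeAt: now + config.delay };
  }
  throw new Error("Either delay or executionTime must be set");
}
```

Under these assumptions, a `delay` of 60000 scheduled at time `t` gives an `executeAt` of `t + 60000`.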

Delay Patterns

Fixed Delay

- member: schedule-task
  type: Schedule
  config:
    delay: 60000  # 1 minute from now

Dynamic Delay

- member: schedule-task
  type: Schedule
  config:
    delay: ${input.delayMinutes * 60000}
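
Because a dynamic delay comes from user input, it can help to compute and validate it before it reaches the Schedule member, for example in a preceding Function member. The sketch below is one way to do that; the constant names and `minutesToDelayMs` are hypothetical and not part of Conductor.

```typescript
// Millisecond constants for readable delay values.
const SECOND_MS = 1000;
const MINUTE_MS = 60 * SECOND_MS;
const HOUR_MS = 60 * MINUTE_MS;
const DAY_MS = 24 * HOUR_MS;

// Hypothetical helper: convert a minute count to a millisecond delay,
// rejecting values that would produce a negative or non-numeric delay.
function minutesToDelayMs(minutes: number): number {
  if (!Number.isFinite(minutes) || minutes < 0) {
    throw new RangeError(`Invalid delay in minutes: ${minutes}`);
  }
  return Math.round(minutes * MINUTE_MS);
}
```

Here `minutesToDelayMs(1)` returns 60000, the same value used in the Fixed Delay example, and `3 * DAY_MS` evaluates to 259200000.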

Specific Time

- member: schedule-task
  type: Schedule
  config:
    executionTime: ${input.executeAt}  # Unix timestamp in milliseconds

Common Patterns

Reminder System

name: appointment-reminder
description: Send reminder before appointment

flow:
  - member: create-appointment
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO appointments (user_id, date_time)
        VALUES (?, ?)
    input:
      params:
        - ${input.userId}
        - ${input.appointmentTime}

  # Schedule reminder 24 hours before
  - member: schedule-reminder
    type: Schedule
    config:
      executionTime: ${input.appointmentTime - 86400000}
    input:
      action: "send-appointment-reminder"
      userId: ${input.userId}
      appointmentTime: ${input.appointmentTime}

  - member: send-reminder
    type: API
    config:
      url: "${env.NOTIFICATION_SERVICE}"
      method: POST
    input:
      body:
        userId: ${schedule-reminder.output.userId}
        message: "Appointment tomorrow at ${new Date(schedule-reminder.output.appointmentTime).toLocaleString()}"

output:
  appointmentId: ${create-appointment.output.id}
  reminderScheduled: true
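
The expression `${input.appointmentTime - 86400000}` yields a time in the past whenever the appointment is less than 24 hours away, and this page does not say how the Schedule member treats a past `executionTime`. A guard like the following sketch could run before scheduling; `reminderTime` and `REMINDER_LEAD_MS` are hypothetical names, and timestamps are assumed to be in milliseconds.

```typescript
// 24 hours in milliseconds, matching the 86400000 used in the workflow above.
const REMINDER_LEAD_MS = 24 * 60 * 60 * 1000;

// Hypothetical helper: return the reminder timestamp, or null when the
// reminder would already be in the past and should be skipped or sent directly.
function reminderTime(appointmentTime: number, now: number = Date.now()): number | null {
  const remindAt = appointmentTime - REMINDER_LEAD_MS;
  return remindAt > now ? remindAt : null;
}
```

A `null` result could feed a `condition` on the schedule step so that late bookings skip the reminder or send it immediately.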

Retry with Exponential Backoff

name: retry-with-delay
description: Retry failed operation with increasing delays

state:
  schema:
    retryCount: number
    lastError: string

flow:
  - member: attempt-operation
    type: Function
    continue_on_error: true

  - member: check-retry
    condition: ${!attempt-operation.success && state.retryCount < 5}
    type: Function
    state:
      use: [retryCount]
      set: [retryCount, lastError]

  # Exponential delay: 1s, 2s, 4s, 8s, 16s
  - member: schedule-retry
    condition: ${check-retry.success}
    type: Schedule
    config:
      delay: ${Math.pow(2, state.retryCount) * 1000}
    input:
      action: "retry-operation"
      attempt: ${state.retryCount + 1}

output:
  success: ${attempt-operation.success}
  retryScheduled: ${schedule-retry.success}
  nextRetry: ${schedule-retry.output.executeAt}
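
The delay expression in this workflow can be checked outside the workflow with a few lines of TypeScript. This is a sketch, assuming `retryCount` starts at 0 and is read before it is incremented; `backoffDelayMs` and `backoffSchedule` are hypothetical helper names.

```typescript
// Mirrors the workflow expression Math.pow(2, state.retryCount) * 1000.
function backoffDelayMs(retryCount: number, baseMs: number = 1000): number {
  return Math.pow(2, retryCount) * baseMs;
}

// Hypothetical helper: the delay for each allowed retry,
// for retryCount values 0 through maxRetries - 1.
function backoffSchedule(maxRetries: number, baseMs: number = 1000): number[] {
  return Array.from({ length: maxRetries }, (_, retryCount) => backoffDelayMs(retryCount, baseMs));
}
```

With the limit of five retries from the `check-retry` condition, `backoffSchedule(5)` gives 1000, 2000, 4000, 8000, and 16000 ms, so the retries add up to 31 seconds of waiting in total.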

Delayed Cleanup

name: temporary-access
description: Grant temporary access then revoke

flow:
  - member: grant-access
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO access_tokens (user_id, token, expires_at)
        VALUES (?, ?, ?)
    input:
      params:
        - ${input.userId}
        - ${input.token}
        - ${Date.now() + input.durationMs}

  - member: schedule-revoke
    type: Schedule
    config:
      delay: ${input.durationMs}
    input:
      action: "revoke-access"
      userId: ${input.userId}
      token: ${input.token}

  - member: revoke-access
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        DELETE FROM access_tokens
        WHERE token = ?
    input:
      params:
        - ${schedule-revoke.output.token}

output:
  token: ${grant-access.output.token}
  expiresAt: ${schedule-revoke.output.executeAt}

Workflow Timeout

name: workflow-with-timeout
description: Abort workflow if not completed in time

flow:
  # Schedule timeout
  - member: schedule-timeout
    type: Schedule
    config:
      delay: 300000  # 5 minutes
    input:
      action: "workflow-timeout"
      executionId: ${execution.id}

  # Long-running operation
  - member: long-operation
    type: Function

  # Cancel timeout if completed
  - member: cancel-timeout
    condition: ${long-operation.success}
    type: Function
    input:
      scheduleId: ${schedule-timeout.output.scheduleId}

output:
  completed: ${long-operation.success}
  timedOut: ${!long-operation.success}

Scheduled Follow-ups

name: sales-follow-up
description: Schedule multiple follow-ups

flow:
  - member: create-lead
    type: Data

  # Day 1: Welcome email
  - member: schedule-welcome
    type: Schedule
    config:
      delay: 1  # Effectively immediate (1 ms is the minimum delay)
    input:
      action: "send-welcome"
      leadId: ${create-lead.output.id}

  # Day 3: Follow-up
  - member: schedule-followup-1
    type: Schedule
    config:
      delay: 259200000  # 3 days
    input:
      action: "send-followup"
      leadId: ${create-lead.output.id}
      sequence: 1

  # Day 7: Follow-up
  - member: schedule-followup-2
    type: Schedule
    config:
      delay: 604800000  # 7 days
    input:
      action: "send-followup"
      leadId: ${create-lead.output.id}
      sequence: 2

  # Day 14: Final follow-up
  - member: schedule-followup-3
    type: Schedule
    config:
      delay: 1209600000  # 14 days
    input:
      action: "send-final-followup"
      leadId: ${create-lead.output.id}

output:
  leadId: ${create-lead.output.id}
  followupsScheduled: 3

Rate Limiting with Delays

name: rate-limited-processing
description: Process items with rate limiting

flow:
  - member: process-batch
    foreach: ${input.items}
    flow:
      - member: process-item
        type: Function

      # Delay between items
      - member: rate-limit-delay
        type: Schedule
        config:
          delay: 1000  # 1 second between items
        input:
          action: "continue"

output:
  processed: ${process-batch.output.length}

Canceling Scheduled Tasks

Cancel Before Execution

flow:
  - member: schedule-task
    type: Schedule
    config:
      delay: 60000
    input:
      action: "send-notification"

  - member: check-condition
    type: Function

  # Cancel if condition met
  - member: cancel-schedule
    condition: ${check-condition.output.shouldCancel}
    type: Function
    input:
      scheduleId: ${schedule-task.output.scheduleId}
      method: "cancel"

output:
  scheduled: ${schedule-task.success}
  cancelled: ${cancel-schedule.success}

Monitoring Scheduled Tasks

Track Scheduled Tasks

flow:
  - member: schedule-task
    type: Schedule

  - member: log-schedule
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO scheduled_tasks (schedule_id, action, execute_at)
        VALUES (?, ?, ?)
    input:
      params:
        - ${schedule-task.output.scheduleId}
        - ${schedule-task.output.action}
        - ${schedule-task.output.executeAt}

output:
  scheduleId: ${schedule-task.output.scheduleId}
  logged: ${log-schedule.success}

Error Handling

Handle Schedule Failures

flow:
  - member: schedule-task
    type: Schedule
    continue_on_error: true

  - member: handle-schedule-error
    condition: ${!schedule-task.success}
    type: Function
    input:
      error: "Failed to schedule task"
      originalData: ${input}

output:
  scheduled: ${schedule-task.success}
  error: ${handle-schedule-error.output}

Retry Failed Schedules

flow:
  - member: schedule-task
    type: Schedule
    retry:
      maxAttempts: 3
      backoff: exponential

output:
  scheduled: ${schedule-task.success}

Performance Considerations

Batch Scheduling

# Schedule multiple tasks efficiently
parallel:
  - member: schedule-1
    type: Schedule
  - member: schedule-2
    type: Schedule
  - member: schedule-3
    type: Schedule

Avoid Very Short Delays

# ✅ Good - reasonable delay
config:
  delay: 60000  # 1 minute

# ⚠️ Problematic - very short delay
config:
  delay: 100  # 100ms - use direct execution instead
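
One way to apply this guidance in code is to decide up front whether a task should be scheduled at all. The sketch below is illustrative: `planExecution`, the `Plan` type, and the 1000 ms cutoff are assumptions chosen for the example, not values enforced by Conductor.

```typescript
// Assumed cutoff: below this, run the task directly instead of scheduling it.
const MIN_SCHEDULE_DELAY_MS = 1000;

// Hypothetical result type: either run now or schedule with a delay.
type Plan =
  | { kind: "direct" }
  | { kind: "schedule"; delay: number };

function planExecution(delayMs: number): Plan {
  if (!Number.isFinite(delayMs) || delayMs < 0) {
    throw new RangeError(`Invalid delay: ${delayMs}`);
  }
  return delayMs < MIN_SCHEDULE_DELAY_MS
    ? { kind: "direct" }
    : { kind: "schedule", delay: delayMs };
}
```

With this cutoff, the 100 ms case above resolves to direct execution, while the 60000 ms case is scheduled.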

Testing

import { describe, it, expect } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';

describe('schedule member', () => {
  it('should schedule task', async () => {
    const conductor = await TestConductor.create();

    const result = await conductor.executeMember('schedule', {
      delay: 60000,
      action: 'send-reminder',
      data: { userId: 123 }
    });

    expect(result).toBeSuccessful();
    expect(result.output.scheduleId).toBeDefined();
    expect(result.output.executeAt).toBeGreaterThan(Date.now());
  });

  it('should execute scheduled task', async () => {
    const conductor = await TestConductor.create();

    const schedule = await conductor.executeMember('schedule', {
      delay: 1000,
      action: 'test-action'
    });

    // Fast-forward time
    await conductor.advanceTime(1000);

    // Verify task executed
    const execution = await conductor.getScheduledExecution(schedule.output.scheduleId);
    expect(execution.status).toBe('completed');
  });

  it('should cancel scheduled task', async () => {
    const conductor = await TestConductor.create();

    const schedule = await conductor.executeMember('schedule', {
      delay: 60000,
      action: 'test-action'
    });

    // Cancel before execution
    const cancelled = await conductor.cancelSchedule(schedule.output.scheduleId);
    expect(cancelled).toBe(true);

    // Verify not executed
    await conductor.advanceTime(60000);
    const execution = await conductor.getScheduledExecution(schedule.output.scheduleId);
    expect(execution.status).toBe('cancelled');
  });
});

Best Practices

  1. Use reasonable delays - Avoid very short delays (< 1 second)
  2. Store schedule IDs - Save IDs for cancellation
  3. Monitor scheduled tasks - Track execution status
  4. Handle failures - Retry or fallback for critical schedules
  5. Cancel when unnecessary - Don’t execute obsolete tasks
  6. Use specific timestamps - More reliable than relative delays
  7. Test thoroughly - Verify scheduling and execution
  8. Log schedule events - Track for debugging and auditing

Limitations

  • Minimum delay: 1 millisecond (use direct execution for immediate tasks)
  • Maximum delay: Limited by Durable Objects storage
  • Execution accuracy: ±1 second typical
  • Persistence: Durable Objects must be properly configured