Overview

HITLState is a Durable Object that manages HITL (Human-in-the-Loop) resumption state with strong consistency and automatic expiration. It provides real-time notifications via WebSockets and uses Durable Object alarms for TTL management.
// Create a HITLState Durable Object instance (one instance per token)
const token = 'hitl_xyz789';
const id = env.HITL_STATE.idFromName(token);
const stub = env.HITL_STATE.get(id);

// Suspend execution
await stub.fetch('https://do/suspend', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    token,
    suspendedState: executionState, // a SuspendedExecutionState (see Types)
    ttl: 3600000 // 1 hour
  })
});

// Approve
await stub.fetch('https://do/approve', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    actor: 'user@example.com',
    approvalData: { approved: true, comments: 'Looks good' }
  })
});

Configuration

Add to your wrangler.toml:
[[durable_objects.bindings]]
name = "HITL_STATE"
class_name = "HITLState"
script_name = "conductor" # Worker that defines the class; omit if it is this Worker

[[migrations]]
tag = "v1"
new_classes = ["HITLState"]

Types

HITLStatus

type HITLStatus = 'pending' | 'approved' | 'rejected' | 'expired' | 'resumed'
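
The status values suggest a small state machine. The source does not spell out which moves are legal, so the table below is an assumption (a request leaves 'pending' exactly once, and only an approved request can be resumed), and `canTransition` is a hypothetical helper rather than part of HITLState:

```typescript
type HITLStatus = 'pending' | 'approved' | 'rejected' | 'expired' | 'resumed';

// Assumed transition table: not confirmed by the HITLState source.
const ALLOWED_TRANSITIONS: Record<HITLStatus, readonly HITLStatus[]> = {
  pending: ['approved', 'rejected', 'expired'],
  approved: ['resumed'],
  rejected: [],
  expired: [],
  resumed: [],
};

// Hypothetical helper: true when moving from `from` to `to` is allowed
// under the assumed table above.
function canTransition(from: HITLStatus, to: HITLStatus): boolean {
  return ALLOWED_TRANSITIONS[from].includes(to);
}
```

A guard like this makes "first decision wins" explicit: once a request is approved, rejected, or expired, a late approval is refused.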

StoredHITLState

interface StoredHITLState {
  token: string;
  status: HITLStatus;
  suspendedState: SuspendedExecutionState;
  suspendedAt: number;
  expiresAt: number;
  approvalData?: unknown;
  rejectionReason?: string;
  events: HITLEvent[];
}

HITLEvent

interface HITLEvent {
  type: 'suspended' | 'approved' | 'rejected' | 'expired' | 'resumed';
  timestamp: number;
  actor?: string;
  data?: unknown;
}

SuspendedExecutionState

interface SuspendedExecutionState {
  executionId: string;
  ensembleName: string;
  currentMemberIndex: number;
  state: Record<string, unknown>;
  outputs: Record<string, unknown>;
  input: unknown;
}
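
To show how these types fit together, here is a minimal sketch of building the record that /suspend would store. `createInitialState` is a hypothetical helper, and the choices that `expiresAt` equals `suspendedAt + ttl` and that the first event is 'suspended' are assumptions inferred from the field names:

```typescript
type HITLStatus = 'pending' | 'approved' | 'rejected' | 'expired' | 'resumed';

interface HITLEvent {
  type: 'suspended' | 'approved' | 'rejected' | 'expired' | 'resumed';
  timestamp: number;
  actor?: string;
  data?: unknown;
}

interface SuspendedExecutionState {
  executionId: string;
  ensembleName: string;
  currentMemberIndex: number;
  state: Record<string, unknown>;
  outputs: Record<string, unknown>;
  input: unknown;
}

interface StoredHITLState {
  token: string;
  status: HITLStatus;
  suspendedState: SuspendedExecutionState;
  suspendedAt: number;
  expiresAt: number;
  approvalData?: unknown;
  rejectionReason?: string;
  events: HITLEvent[];
}

// Hypothetical helper: builds a fresh 'pending' record with one 'suspended' event.
function createInitialState(
  token: string,
  suspendedState: SuspendedExecutionState,
  ttlMs: number,
  now: number = Date.now(),
): StoredHITLState {
  return {
    token,
    status: 'pending',
    suspendedState,
    suspendedAt: now,
    expiresAt: now + ttlMs, // assumed: TTL counts from the moment of suspension
    events: [{ type: 'suspended', timestamp: now }],
  };
}
```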

HTTP API

POST /suspend

Suspend execution and create HITL state.

Parameters:
  token (string, required) - Unique HITL token
  suspendedState (SuspendedExecutionState, required) - Suspended execution state to restore later
  ttl (number, required) - Time-to-live in milliseconds

Example:
const token = `hitl_${crypto.randomUUID()}`;

await stub.fetch('https://do/suspend', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    token,
    suspendedState: {
      executionId: 'exec_123',
      ensembleName: 'content-moderation',
      currentMemberIndex: 3,
      state: { userId: 'user_456' },
      outputs: { content: '...' },
      input: { postId: 'post_789' }
    },
    ttl: 3600000 // 1 hour
  })
});

// Return token to user for approval UI
return Response.json({ token, expiresAt: Date.now() + 3600000 });
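
Since all three fields are required, it can help to validate the body before calling /suspend. The sketch below is one way to do that; `parseSuspendRequest` and `SuspendRequest` are hypothetical names, and it checks only that `suspendedState` is an object, not its full shape:

```typescript
// Hypothetical shape of a validated /suspend body.
interface SuspendRequest {
  token: string;
  suspendedState: Record<string, unknown>;
  ttl: number;
}

// Hypothetical helper: throws a TypeError when a required field is missing or malformed.
function parseSuspendRequest(body: unknown): SuspendRequest {
  if (typeof body !== 'object' || body === null) {
    throw new TypeError('body must be a JSON object');
  }
  const { token, suspendedState, ttl } = body as Record<string, unknown>;
  if (typeof token !== 'string' || token.length === 0) {
    throw new TypeError('token must be a non-empty string');
  }
  if (typeof suspendedState !== 'object' || suspendedState === null) {
    throw new TypeError('suspendedState must be an object');
  }
  if (typeof ttl !== 'number' || !Number.isFinite(ttl) || ttl <= 0) {
    throw new TypeError('ttl must be a positive number of milliseconds');
  }
  return { token, suspendedState: suspendedState as Record<string, unknown>, ttl };
}
```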

GET /status

Get current HITL status.

Response:
{
  token: string;
  status: HITLStatus;
  suspendedState: SuspendedExecutionState;
  suspendedAt: number;
  expiresAt: number;
  approvalData?: unknown;
  rejectionReason?: string;
  events: HITLEvent[];
}
Example:
const response = await stub.fetch('https://do/status');
const state = await response.json();

console.log(`Status: ${state.status}`);
console.log(`Expires: ${new Date(state.expiresAt)}`);

if (state.status === 'pending') {
  // Show approval UI
} else if (state.status === 'approved') {
  // Resume execution
} else if (state.status === 'expired') {
  // Show expired message
}
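
The branching above can be folded into an exhaustive switch so that no status is silently ignored. This is a sketch: `NextStep` and `nextStep` are hypothetical names, and the extra comparison against `expiresAt` assumes the alarm may fire slightly after the deadline, leaving a record briefly 'pending' past its expiry:

```typescript
type HITLStatus = 'pending' | 'approved' | 'rejected' | 'expired' | 'resumed';

// Hypothetical labels for what the caller should do next.
type NextStep =
  | 'show_approval_ui'
  | 'resume_execution'
  | 'show_rejected'
  | 'show_expired'
  | 'nothing_to_do';

function nextStep(status: HITLStatus, expiresAt: number, now: number = Date.now()): NextStep {
  switch (status) {
    case 'pending':
      // Assumption: treat a pending record past its deadline as expired.
      return now >= expiresAt ? 'show_expired' : 'show_approval_ui';
    case 'approved':
      return 'resume_execution';
    case 'rejected':
      return 'show_rejected';
    case 'expired':
      return 'show_expired';
    case 'resumed':
      return 'nothing_to_do';
  }
}
```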

POST /approve

Approve and enable resumption.

Parameters:
  actor (string, required) - User or system that approved (email, ID, etc.)
  approvalData (unknown, optional) - Additional approval data

Example:
await stub.fetch('https://do/approve', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    actor: 'moderator@example.com',
    approvalData: {
      approved: true,
      comments: 'Content looks appropriate',
      reviewedAt: Date.now()
    }
  })
});

// After approval, retrieve suspended state and resume
const statusRes = await stub.fetch('https://do/status');
const state = await statusRes.json();

if (state.status === 'approved') {
  await resumeExecution(state.suspendedState, state.approvalData);
}

POST /reject

Reject and prevent resumption.

Parameters:
  actor (string, required) - User or system that rejected
  reason (string, optional) - Rejection reason

Example:
await stub.fetch('https://do/reject', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    actor: 'moderator@example.com',
    reason: 'Content violates community guidelines'
  })
});
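
The examples in this section discard the response from stub.fetch. A minimal sketch of two small helpers that remove the repeated request boilerplate and surface failures is shown below. `jsonPost` and `ensureOk` are hypothetical names, and the sketch assumes the Durable Object signals failure with a non-2xx status, which the source does not confirm:

```typescript
// Hypothetical helper: builds the init object for a JSON POST.
function jsonPost(body: unknown): { method: 'POST'; headers: Record<string, string>; body: string } {
  return {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(body),
  };
}

// Hypothetical helper: throws when the response status is not 2xx.
function ensureOk(response: { ok: boolean; status: number }, action: string): void {
  if (!response.ok) {
    throw new Error(`HITL ${action} failed with HTTP ${response.status}`);
  }
}

// Usage with the stub from the examples above:
//   const res = await stub.fetch('https://do/reject', jsonPost({ actor, reason }));
//   ensureOk(res, 'reject');
```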

POST /resume

Mark as resumed (called after successful resumption).

Example:
// After successfully resuming execution
await stub.fetch('https://do/resume', {
  method: 'POST'
});

// Then delete the HITL state
await stub.fetch('https://do/', {
  method: 'DELETE'
});

DELETE /

Delete HITL state.

Example:
await stub.fetch('https://do/', {
  method: 'DELETE'
});

Alarm Handler

The HITLState Durable Object uses alarms for automatic expiration:
async alarm(): Promise<void> {
  // Automatically called when TTL expires
  // Marks state as 'expired'
  // Broadcasts expiration event to WebSocket clients
  // Closes all connections
}
The alarm is set when /suspend is called and automatically fires when the TTL expires.
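
The state change the alarm performs can be sketched as a pure function. This is not the actual handler: `expireIfDue` and `ExpirableState` are hypothetical names, and the rule that only a 'pending' record past its deadline is touched is an assumption. In a real Durable Object the alarm would presumably be scheduled with the storage API's setAlarm(expiresAt), and the result written back to storage before broadcasting.

```typescript
type HITLStatus = 'pending' | 'approved' | 'rejected' | 'expired' | 'resumed';

interface HITLEvent {
  type: 'suspended' | 'approved' | 'rejected' | 'expired' | 'resumed';
  timestamp: number;
  actor?: string;
  data?: unknown;
}

// Only the fields the expiration logic needs.
interface ExpirableState {
  status: HITLStatus;
  expiresAt: number;
  events: HITLEvent[];
}

// Hypothetical helper: returns a new 'expired' record when the deadline has
// passed and no decision was made; otherwise returns the input unchanged.
function expireIfDue(state: ExpirableState, now: number): ExpirableState {
  if (state.status !== 'pending' || now < state.expiresAt) {
    return state;
  }
  return {
    ...state,
    status: 'expired',
    events: [...state.events, { type: 'expired', timestamp: now }],
  };
}
```

Leaving already decided records untouched means an alarm that fires after an approval cannot overwrite the decision.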

WebSocket API

Connect

Upgrade to WebSocket for live notifications.

Example:
const res = await stub.fetch('https://do/', {
  headers: { 'Upgrade': 'websocket' }
});

// res.webSocket is null when the upgrade was not accepted
const ws = res.webSocket;
if (!ws) {
  throw new Error('HITLState did not accept the WebSocket upgrade');
}

// Accept the connection so the runtime starts delivering messages
ws.accept();

ws.addEventListener('message', (event) => {
  const update = JSON.parse(event.data);

  if (update.type === 'initial_state') {
    console.log('HITL status:', update.state.status);
    showApprovalUI(update.state);
  } else if (update.type === 'approved') {
    console.log('Approved by:', update.actor);
    hideApprovalUI();
    showSuccess();
  } else if (update.type === 'rejected') {
    console.log('Rejected:', update.reason);
    hideApprovalUI();
    showRejection(update.reason);
  } else if (update.type === 'expired') {
    console.log('HITL expired');
    hideApprovalUI();
    showExpired();
    ws.close();
  }
});

Events

initial_state

Sent immediately upon connection:
{
  type: 'initial_state';
  state: {
    token: string;
    status: HITLStatus;
    suspendedAt: number;
    expiresAt: number;
    events: HITLEvent[];
  };
}

suspended

Sent when execution is suspended:
{
  type: 'suspended';
  token: string;
  expiresAt: number;
  timestamp: number;
}

approved

Sent when HITL is approved:
{
  type: 'approved';
  token: string;
  actor: string;
  data: unknown;
  timestamp: number;
}

rejected

Sent when HITL is rejected:
{
  type: 'rejected';
  token: string;
  actor: string;
  reason?: string;
  timestamp: number;
}

expired

Sent when HITL expires:
{
  type: 'expired';
  token: string;
  timestamp: number;
}
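
The five message shapes above can be combined into one discriminated union so that handlers narrow on `type`. The sketch below adds a defensive parser. `HITLSocketMessage` and `parseSocketMessage` are hypothetical names, and the parser checks only the `type` tag, not every field:

```typescript
type HITLStatus = 'pending' | 'approved' | 'rejected' | 'expired' | 'resumed';

interface HITLEvent {
  type: 'suspended' | 'approved' | 'rejected' | 'expired' | 'resumed';
  timestamp: number;
  actor?: string;
  data?: unknown;
}

// Union of the message shapes documented in this section.
type HITLSocketMessage =
  | {
      type: 'initial_state';
      state: { token: string; status: HITLStatus; suspendedAt: number; expiresAt: number; events: HITLEvent[] };
    }
  | { type: 'suspended'; token: string; expiresAt: number; timestamp: number }
  | { type: 'approved'; token: string; actor: string; data: unknown; timestamp: number }
  | { type: 'rejected'; token: string; actor: string; reason?: string; timestamp: number }
  | { type: 'expired'; token: string; timestamp: number };

const KNOWN_TYPES: readonly string[] = ['initial_state', 'suspended', 'approved', 'rejected', 'expired'];

// Hypothetical helper: returns null for invalid JSON or an unknown `type`.
// Only the tag is validated; the remaining fields are trusted as documented.
function parseSocketMessage(raw: string): HITLSocketMessage | null {
  let value: unknown;
  try {
    value = JSON.parse(raw);
  } catch {
    return null;
  }
  if (typeof value !== 'object' || value === null) {
    return null;
  }
  const type = (value as { type?: unknown }).type;
  if (typeof type !== 'string' || !KNOWN_TYPES.includes(type)) {
    return null;
  }
  return value as HITLSocketMessage;
}
```

With this in place, a handler can write `if (msg.type === 'rejected')` and have the compiler know that `msg.reason` exists.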

Usage Patterns

Content Moderation

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    // Execute ensemble until HITL suspension
    const result = await conductor.execute('content-moderation', {
      postId: 'post_123',
      content: 'User-generated content...'
    });
    
    // Check if suspended for human review
    if (result.status === 'waiting_for_input' && result.hitl) {
      const token = result.hitl.token;
      
      // Create HITL state
      const id = env.HITL_STATE.idFromName(token);
      const stub = env.HITL_STATE.get(id);
      
      await stub.fetch('https://do/suspend', {
        method: 'POST',
        body: JSON.stringify({
          token,
          suspendedState: result.hitl.suspendedState,
          ttl: 3600000 // 1 hour
        })
      });
      
      // Return approval URL
      return Response.json({
        status: 'pending_review',
        reviewUrl: `https://admin.example.com/review/${token}`,
        expiresAt: Date.now() + 3600000
      });
    }
    
    return Response.json(result);
  }
};

Approval Interface

// Approval UI handler
async function handleApproval(
  request: Request,
  env: Env
): Promise<Response> {
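  // Cast the parsed body so the destructured fields are typed
  const { token, action, actor, comments } = (await request.json()) as {
    token: string;
    action: 'approve' | 'reject';
    actor: string;
    comments?: string;
  };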
  
  // Get HITL state
  const id = env.HITL_STATE.idFromName(token);
  const stub = env.HITL_STATE.get(id);
  
  // Check if still pending
  const statusRes = await stub.fetch('https://do/status');
  const state = await statusRes.json();
  
  if (state.status !== 'pending') {
    return Response.json(
      { error: 'HITL request already processed or expired' },
      { status: 400 }
    );
  }
  
  if (action === 'approve') {
    // Approve
    await stub.fetch('https://do/approve', {
      method: 'POST',
      body: JSON.stringify({
        actor,
        approvalData: { approved: true, comments }
      })
    });
    
    // Resume execution
    await resumeExecution(state.suspendedState, env);
    
    // Mark as resumed
    await stub.fetch('https://do/resume', { method: 'POST' });
    
    // Clean up
    await stub.fetch('https://do/', { method: 'DELETE' });
    
    return Response.json({ status: 'approved' });
  } else {
    // Reject
    await stub.fetch('https://do/reject', {
      method: 'POST',
      body: JSON.stringify({
        actor,
        reason: comments
      })
    });
    
    return Response.json({ status: 'rejected' });
  }
}

async function resumeExecution(
  suspendedState: SuspendedExecutionState,
  env: Env
) {
  // Resume the conductor execution from suspended state
  await conductor.resume(suspendedState);
}

Real-Time Approval UI

// Client-side approval monitoring
function createApprovalUI(token: string) {
  const ws = new WebSocket(`wss://api.example.com/hitl/${token}/ws`);
  
  ws.onopen = () => {
    console.log('Connected to HITL state');
  };
  
  ws.onmessage = (event) => {
    const update = JSON.parse(event.data);
    
    if (update.type === 'initial_state') {
      // Show approval form
      showApprovalForm(update.state);
      
      // Show countdown timer
      startCountdown(update.state.expiresAt);
    } else if (update.type === 'approved') {
      // Another moderator approved
      hideApprovalForm();
      showMessage(`Approved by ${update.actor}`);
      ws.close();
    } else if (update.type === 'rejected') {
      // Another moderator rejected
      hideApprovalForm();
      showMessage(`Rejected by ${update.actor}: ${update.reason}`);
      ws.close();
    } else if (update.type === 'expired') {
      // Approval window expired
      hideApprovalForm();
      showMessage('Review window expired');
      ws.close();
    }
  };
  
  return ws;
}
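
The example above calls startCountdown without defining it. One piece such a timer needs is turning `expiresAt` into display text; a minimal sketch follows. `formatRemaining` is a hypothetical helper and the m:ss format is an arbitrary choice:

```typescript
// Hypothetical helper: formats the time left until `expiresAt` as "m:ss".
// Partial seconds round up, and the result never goes below "0:00".
function formatRemaining(expiresAt: number, now: number = Date.now()): string {
  const remainingMs = Math.max(0, expiresAt - now);
  const totalSeconds = Math.ceil(remainingMs / 1000);
  const minutes = Math.floor(totalSeconds / 60);
  const seconds = totalSeconds % 60;
  return `${minutes}:${String(seconds).padStart(2, '0')}`;
}
```

A countdown could call this once per second from setInterval and stop when the 'expired' message arrives over the WebSocket.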

Best Practices

  1. Set appropriate TTLs - Balance urgency vs. reviewer availability
  2. Handle expiration - Always check status before operations
  3. Use WebSockets - For real-time approval UIs
  4. Validate actors - Authenticate approval requests
  5. Log events - Track approval history
  6. Clean up - Delete state after resumption
  7. Notify reviewers - Send alerts when approval needed
  8. Provide context - Include all necessary info for reviewers
  9. Handle race conditions - Multiple reviewers may act simultaneously
  10. Monitor metrics - Track approval times and rates

Security Considerations

  1. Validate tokens - Ensure tokens are cryptographically secure
  2. Authenticate actors - Verify reviewer identity
  3. Authorize access - Check permissions before approval
  4. Sanitize data - Clean suspended state before storage
  5. Rate limit - Prevent abuse of approval endpoints
  6. Audit logs - Record all approval actions
  7. Encrypt sensitive data - Protect suspended state
  8. Implement RBAC - Role-based approval workflows
  9. Prevent tampering - Validate suspended state integrity
  10. Monitor suspicious activity - Alert on unusual patterns
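
As a small illustration of the first item, a format check can reject obviously malformed tokens before any Durable Object lookup. The sketch assumes tokens follow the `hitl_` plus UUID form used in the /suspend example; `isWellFormedToken` is a hypothetical helper. It checks shape only and is no substitute for authenticating the reviewer:

```typescript
// Assumed token format: "hitl_" followed by a UUID, as produced by
// `hitl_${crypto.randomUUID()}` in the /suspend example.
const TOKEN_PATTERN = /^hitl_[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

// Hypothetical helper: true only for strings matching the assumed format.
function isWellFormedToken(token: unknown): token is string {
  return typeof token === 'string' && TOKEN_PATTERN.test(token);
}
```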