Overview
HITLState is a Durable Object that manages HITL (Human-in-the-Loop) resumption state with strong consistency and automatic expiration. It provides real-time notifications via WebSockets and uses Durable Object alarms for TTL management.
// Get a stub for the HITLState Durable Object that owns this token
const token = 'hitl_xyz789';
const id = env.HITL_STATE.idFromName(token);
const stub = env.HITL_STATE.get(id);
// Suspend execution
await stub.fetch('https://do/suspend', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    token,
    suspendedState: executionState,
    ttl: 3600000 // 1 hour
  })
});
// Approve
await stub.fetch('https://do/approve', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    actor: 'user@example.com',
    approvalData: { approved: true, comments: 'Looks good' }
  })
});
Configuration
Add to your wrangler.toml:
[[durable_objects.bindings]]
name = "HITL_STATE"
class_name = "HITLState"
script_name = "conductor"
[[migrations]]
tag = "v1"
new_classes = ["HITLState"]
Types
HITLStatus
type HITLStatus = 'pending' | 'approved' | 'rejected' | 'expired' | 'resumed'
StoredHITLState
interface StoredHITLState {
token: string;
status: HITLStatus;
suspendedState: SuspendedExecutionState;
suspendedAt: number;
expiresAt: number;
approvalData?: unknown;
rejectionReason?: string;
events: HITLEvent[];
}
HITLEvent
interface HITLEvent {
type: 'suspended' | 'approved' | 'rejected' | 'expired' | 'resumed';
timestamp: number;
actor?: string;
data?: unknown;
}
SuspendedExecutionState
interface SuspendedExecutionState {
executionId: string;
ensembleName: string;
currentMemberIndex: number;
state: Record<string, unknown>;
outputs: Record<string, unknown>;
input: unknown;
}
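The status values above imply a lifecycle. The following is a minimal sketch of one plausible transition table, assuming that only a pending request can be approved, rejected, or expired, and that only an approved request can be resumed. `ALLOWED_TRANSITIONS` and `canTransition` are hypothetical names used for illustration; the source does not state which transitions HITLState actually enforces.

```typescript
type HITLStatus = 'pending' | 'approved' | 'rejected' | 'expired' | 'resumed';

// Assumed lifecycle, not confirmed by the HITLState implementation:
// pending -> approved | rejected | expired, and approved -> resumed.
const ALLOWED_TRANSITIONS: Record<HITLStatus, readonly HITLStatus[]> = {
  pending: ['approved', 'rejected', 'expired'],
  approved: ['resumed'],
  rejected: [],
  expired: [],
  resumed: [],
};

// Returns true when moving from `from` to `to` is allowed by the table above.
function canTransition(from: HITLStatus, to: HITLStatus): boolean {
  return ALLOWED_TRANSITIONS[from].indexOf(to) !== -1;
}
```

A caller could use such a check before acting on a status it has just read, for example refusing to resume anything that is not currently approved.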
HTTP API
POST /suspend
Suspend execution and create HITL state:
Request body:
- token (string) - HITL token identifying this suspension
- suspendedState (SuspendedExecutionState, required) - Suspended execution state to restore later
- ttl (number) - Time-to-live in milliseconds
Example:
const token = `hitl_${crypto.randomUUID()}`;
await stub.fetch('https://do/suspend', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
token,
suspendedState: {
executionId: 'exec_123',
ensembleName: 'content-moderation',
currentMemberIndex: 3,
state: { userId: 'user_456' },
outputs: { content: '...' },
input: { postId: 'post_789' }
},
ttl: 3600000 // 1 hour
})
});
// Return token to user for approval UI
return Response.json({ token, expiresAt: Date.now() + 3600000 });
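Because the suspended state is stored and later used to resume execution, it can help to check the request body before sending or storing it. The sketch below is one way to do that; `SuspendRequest`, `isRecord`, and `validateSuspendRequest` are hypothetical helpers, and whether HITLState performs similar validation itself is not stated in the source.

```typescript
interface SuspendedExecutionState {
  executionId: string;
  ensembleName: string;
  currentMemberIndex: number;
  state: Record<string, unknown>;
  outputs: Record<string, unknown>;
  input: unknown;
}

// Shape of the /suspend body as used in the examples (assumed, not a published type).
interface SuspendRequest {
  token: string;
  suspendedState: SuspendedExecutionState;
  ttl: number;
}

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null && !Array.isArray(value);
}

// Returns a list of problems; an empty list means the body matches SuspendRequest.
function validateSuspendRequest(body: unknown): string[] {
  if (!isRecord(body)) return ['body must be a JSON object'];
  const errors: string[] = [];
  const token = body.token;
  const ttl = body.ttl;
  const suspended = body.suspendedState;
  if (typeof token !== 'string' || token.length === 0) {
    errors.push('token must be a non-empty string');
  }
  if (typeof ttl !== 'number' || !Number.isFinite(ttl) || ttl <= 0) {
    errors.push('ttl must be a positive number of milliseconds');
  }
  if (!isRecord(suspended)) {
    errors.push('suspendedState must be an object');
    return errors;
  }
  const index = suspended.currentMemberIndex;
  if (typeof suspended.executionId !== 'string') errors.push('executionId must be a string');
  if (typeof suspended.ensembleName !== 'string') errors.push('ensembleName must be a string');
  if (typeof index !== 'number' || !Number.isInteger(index) || index < 0) {
    errors.push('currentMemberIndex must be a non-negative integer');
  }
  if (!isRecord(suspended.state)) errors.push('state must be an object');
  if (!isRecord(suspended.outputs)) errors.push('outputs must be an object');
  return errors;
}
```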
GET /status
Get current HITL status:
Response:
{
token: string;
status: HITLStatus;
suspendedState: SuspendedExecutionState;
suspendedAt: number;
expiresAt: number;
approvalData?: unknown;
rejectionReason?: string;
events: HITLEvent[];
}
Example:
const response = await stub.fetch('https://do/status');
const state = await response.json();
console.log(`Status: ${state.status}`);
console.log(`Expires: ${new Date(state.expiresAt)}`);
if (state.status === 'pending') {
// Show approval UI
} else if (state.status === 'approved') {
// Resume execution
} else if (state.status === 'expired') {
// Show expired message
}
POST /approve
Approve and enable resumption:
Request body:
- actor - User/system who approved (email, ID, etc.)
- approvalData - Additional approval data (optional)
Example:
await stub.fetch('https://do/approve', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
actor: 'moderator@example.com',
approvalData: {
approved: true,
comments: 'Content looks appropriate',
reviewedAt: Date.now()
}
})
});
// After approval, retrieve suspended state and resume
const statusRes = await stub.fetch('https://do/status');
const state = await statusRes.json();
if (state.status === 'approved') {
await resumeExecution(state.suspendedState, env);
}
POST /reject
Reject and prevent resumption:
Request body:
- reason - Rejection reason (optional)
Example:
await stub.fetch('https://do/reject', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
actor: 'moderator@example.com',
reason: 'Content violates community guidelines'
})
});
POST /resume
Mark as resumed (called after successful resumption):
Example:
// After successfully resuming execution
await stub.fetch('https://do/resume', {
method: 'POST'
});
// Then delete the HITL state
await stub.fetch('https://do/', {
method: 'DELETE'
});
DELETE /
Delete HITL state:
Example:
await stub.fetch('https://do/', {
method: 'DELETE'
});
Alarm Handler
The HITLState Durable Object uses alarms for automatic expiration:
async alarm(): Promise<void> {
// Automatically called when TTL expires
// Marks state as 'expired'
// Broadcasts expiration event to WebSocket clients
// Closes all connections
}
The alarm is set when /suspend is called and automatically fires when the TTL expires.
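The state change an alarm performs can be sketched as a pure function. This is an illustration under assumptions, not the actual handler: `ExpirableState` and `expireIfDue` are hypothetical names, and the guard that only a pending request may expire is an assumed rule that keeps a late alarm from overwriting an approval or rejection. Storage access, the WebSocket broadcast, and closing connections are left out.

```typescript
type HITLStatus = 'pending' | 'approved' | 'rejected' | 'expired' | 'resumed';

interface HITLEvent {
  type: 'suspended' | 'approved' | 'rejected' | 'expired' | 'resumed';
  timestamp: number;
  actor?: string;
  data?: unknown;
}

// Only the fields the expiration step needs.
interface ExpirableState {
  status: HITLStatus;
  expiresAt: number;
  events: HITLEvent[];
}

// Returns a new state marked 'expired' when the TTL has elapsed and the
// request is still pending; otherwise returns the input unchanged.
function expireIfDue(state: ExpirableState, now: number): ExpirableState {
  if (state.status !== 'pending' || now < state.expiresAt) {
    return state;
  }
  return {
    ...state,
    status: 'expired',
    events: [...state.events, { type: 'expired', timestamp: now }],
  };
}
```

Keeping this step free of I/O makes the expiration rule easy to test without a Durable Object runtime.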
WebSocket API
Connect
Upgrade to WebSocket for live notifications:
Example:
const res = await stub.fetch('https://do/', {
  headers: { 'Upgrade': 'websocket' }
});
const ws = res.webSocket;
if (!ws) {
  throw new Error('Expected a WebSocket upgrade response');
}
// accept() must be called before this Worker can use the socket
ws.accept();
ws.addEventListener('message', (event) => {
  const update = JSON.parse(event.data);
  if (update.type === 'initial_state') {
    console.log('HITL status:', update.state.status);
    showApprovalUI(update.state);
  } else if (update.type === 'approved') {
    console.log('Approved by:', update.actor);
    hideApprovalUI();
    showSuccess();
  } else if (update.type === 'rejected') {
    console.log('Rejected:', update.reason);
    hideApprovalUI();
    showRejection(update.reason);
  } else if (update.type === 'expired') {
    console.log('HITL expired');
    hideApprovalUI();
    showExpired();
    ws.close();
  }
});
Events
initial_state
Sent immediately upon connection:
{
type: 'initial_state';
state: {
token: string;
status: HITLStatus;
suspendedAt: number;
expiresAt: number;
events: HITLEvent[];
};
}
suspended
Sent when execution is suspended:
{
type: 'suspended';
token: string;
expiresAt: number;
timestamp: number;
}
approved
Sent when HITL is approved:
{
type: 'approved';
token: string;
actor: string;
data: unknown;
timestamp: number;
}
rejected
Sent when HITL is rejected:
{
type: 'rejected';
token: string;
actor: string;
reason?: string;
timestamp: number;
}
expired
Sent when HITL expires:
{
type: 'expired';
token: string;
timestamp: number;
}
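The event shapes above can be combined into a discriminated union so that handlers are checked by the compiler. The sketch below assumes messages arrive as JSON text; `HITLMessage`, `parseHITLMessage`, and `describeMessage` are hypothetical names, and the parser only checks the `type` tag rather than every field.

```typescript
type HITLStatus = 'pending' | 'approved' | 'rejected' | 'expired' | 'resumed';

interface HITLEvent {
  type: 'suspended' | 'approved' | 'rejected' | 'expired' | 'resumed';
  timestamp: number;
  actor?: string;
  data?: unknown;
}

type HITLMessage =
  | { type: 'initial_state'; state: { token: string; status: HITLStatus; suspendedAt: number; expiresAt: number; events: HITLEvent[] } }
  | { type: 'suspended'; token: string; expiresAt: number; timestamp: number }
  | { type: 'approved'; token: string; actor: string; data: unknown; timestamp: number }
  | { type: 'rejected'; token: string; actor: string; reason?: string; timestamp: number }
  | { type: 'expired'; token: string; timestamp: number };

const KNOWN_MESSAGE_TYPES = ['initial_state', 'suspended', 'approved', 'rejected', 'expired'];

// Parses a raw WebSocket payload. Returns null for invalid JSON or an unknown
// `type`; the remaining fields are trusted, not validated.
function parseHITLMessage(raw: string): HITLMessage | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return null;
  }
  if (typeof parsed !== 'object' || parsed === null) return null;
  const type = (parsed as { type?: unknown }).type;
  if (typeof type !== 'string' || KNOWN_MESSAGE_TYPES.indexOf(type) === -1) return null;
  return parsed as HITLMessage;
}

// Produces a short human-readable summary; the switch is exhaustive over the union.
function describeMessage(message: HITLMessage): string {
  switch (message.type) {
    case 'initial_state':
      return `Current status: ${message.state.status}`;
    case 'suspended':
      return `Suspended until ${message.expiresAt}`;
    case 'approved':
      return `Approved by ${message.actor}`;
    case 'rejected':
      return message.reason
        ? `Rejected by ${message.actor}: ${message.reason}`
        : `Rejected by ${message.actor}`;
    case 'expired':
      return 'Review window expired';
  }
}
```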
Usage Patterns
Content Moderation
export default {
async fetch(request: Request, env: Env): Promise<Response> {
// Execute ensemble until HITL suspension
const result = await conductor.execute('content-moderation', {
postId: 'post_123',
content: 'User-generated content...'
});
// Check if suspended for human review
if (result.status === 'waiting_for_input' && result.hitl) {
const token = result.hitl.token;
// Create HITL state
const id = env.HITL_STATE.idFromName(token);
const stub = env.HITL_STATE.get(id);
await stub.fetch('https://do/suspend', {
method: 'POST',
body: JSON.stringify({
token,
suspendedState: result.hitl.suspendedState,
ttl: 3600000 // 1 hour
})
});
// Return approval URL
return Response.json({
status: 'pending_review',
reviewUrl: `https://admin.example.com/review/${token}`,
expiresAt: Date.now() + 3600000
});
}
return Response.json(result);
}
};
Approval Interface
// Approval UI handler
async function handleApproval(
request: Request,
env: Env
): Promise<Response> {
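// Cast the parsed body so the destructured fields are typed
const { token, action, actor, comments } = (await request.json()) as {
  token: string;
  action: string;
  actor: string;
  comments?: string;
};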
// Get HITL state
const id = env.HITL_STATE.idFromName(token);
const stub = env.HITL_STATE.get(id);
// Check if still pending
const statusRes = await stub.fetch('https://do/status');
const state = await statusRes.json();
if (state.status !== 'pending') {
return Response.json(
{ error: 'HITL request already processed or expired' },
{ status: 400 }
);
}
if (action === 'approve') {
// Approve
await stub.fetch('https://do/approve', {
method: 'POST',
body: JSON.stringify({
actor,
approvalData: { approved: true, comments }
})
});
// Resume execution
await resumeExecution(state.suspendedState, env);
// Mark as resumed
await stub.fetch('https://do/resume', { method: 'POST' });
// Clean up
await stub.fetch('https://do/', { method: 'DELETE' });
return Response.json({ status: 'approved' });
} else {
// Reject
await stub.fetch('https://do/reject', {
method: 'POST',
body: JSON.stringify({
actor,
reason: comments
})
});
return Response.json({ status: 'rejected' });
}
}
async function resumeExecution(
suspendedState: SuspendedExecutionState,
env: Env
) {
// Resume the conductor execution from suspended state
await conductor.resume(suspendedState);
}
Real-Time Approval UI
// Client-side approval monitoring
function createApprovalUI(token: string) {
const ws = new WebSocket(`wss://api.example.com/hitl/${token}/ws`);
ws.onopen = () => {
console.log('Connected to HITL state');
};
ws.onmessage = (event) => {
const update = JSON.parse(event.data);
if (update.type === 'initial_state') {
// Show approval form
showApprovalForm(update.state);
// Show countdown timer
startCountdown(update.state.expiresAt);
} else if (update.type === 'approved') {
// Another moderator approved
hideApprovalForm();
showMessage(`Approved by ${update.actor}`);
ws.close();
} else if (update.type === 'rejected') {
// Another moderator rejected
hideApprovalForm();
showMessage(`Rejected by ${update.actor}: ${update.reason}`);
ws.close();
} else if (update.type === 'expired') {
// Approval window expired
hideApprovalForm();
showMessage('Review window expired');
ws.close();
}
};
return ws;
}
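The example above calls `startCountdown` without defining it. One possible building block is a formatter that turns `expiresAt` into a display string; `formatRemaining` is a hypothetical helper, and the `m:ss` format is an arbitrary choice for illustration.

```typescript
// Formats the time left until `expiresAt` as minutes:seconds.
// Both arguments are epoch milliseconds; past deadlines clamp to "0:00".
function formatRemaining(expiresAt: number, now: number): string {
  const remainingMs = Math.max(0, expiresAt - now);
  // Round up so the display does not show 0:00 while time remains.
  const totalSeconds = Math.ceil(remainingMs / 1000);
  const minutes = Math.floor(totalSeconds / 60);
  const seconds = totalSeconds % 60;
  return `${minutes}:${seconds < 10 ? '0' : ''}${seconds}`;
}
```

A countdown could call this once per second with `Date.now()` and stop when the result reaches `0:00` or an `expired` event arrives.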
Best Practices
- Set appropriate TTLs - Balance urgency vs. reviewer availability
- Handle expiration - Always check status before operations
- Use WebSockets - For real-time approval UIs
- Validate actors - Authenticate approval requests
- Log events - Track approval history
- Clean up - Delete state after resumption
- Notify reviewers - Send alerts when approval needed
- Provide context - Include all necessary info for reviewers
- Handle race conditions - Multiple reviewers may act simultaneously
- Monitor metrics - Track approval times and rates
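For the race-condition point, checking `/status` and then calling `/approve` from a Worker is two separate requests, so two reviewers can both pass the check. A common approach is to make the decision step itself reject anything that is no longer pending. The sketch below shows that first-decision-wins rule as a pure function; `Decidable`, `Decision`, and `applyDecision` are hypothetical names, and the source does not confirm that HITLState enforces this rule.

```typescript
type HITLStatus = 'pending' | 'approved' | 'rejected' | 'expired' | 'resumed';

interface Decidable {
  status: HITLStatus;
  expiresAt: number;
  approvalData?: unknown;
  rejectionReason?: string;
}

type Decision =
  | { action: 'approve'; approvalData?: unknown }
  | { action: 'reject'; reason?: string };

type DecisionResult =
  | { ok: true; state: Decidable }
  | { ok: false; error: string };

// Applies a reviewer decision only if the request is still pending and within
// its TTL. A second decision on the same state is reported as a conflict.
function applyDecision(state: Decidable, decision: Decision, now: number): DecisionResult {
  if (state.status !== 'pending') {
    return { ok: false, error: `already ${state.status}` };
  }
  if (now >= state.expiresAt) {
    return { ok: false, error: 'expired' };
  }
  if (decision.action === 'approve') {
    return { ok: true, state: { ...state, status: 'approved', approvalData: decision.approvalData } };
  }
  return { ok: true, state: { ...state, status: 'rejected', rejectionReason: decision.reason } };
}
```

Running a check like this inside the Durable Object, next to the write, keeps the check and the update in one place instead of spreading them across two requests.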
Security Considerations
- Validate tokens - Ensure tokens are cryptographically secure
- Authenticate actors - Verify reviewer identity
- Authorize access - Check permissions before approval
- Sanitize data - Clean suspended state before storage
- Rate limit - Prevent abuse of approval endpoints
- Audit logs - Record all approval actions
- Encrypt sensitive data - Protect suspended state
- Implement RBAC - Role-based approval workflows
- Prevent tampering - Validate suspended state integrity
- Monitor suspicious activity - Alert on unusual patterns
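As a small illustration of the token point, the sketch below rejects tokens that do not match the `hitl_` plus UUID v4 format produced by the `/suspend` example. `TOKEN_PATTERN` and `isWellFormedToken` are hypothetical names. A format check is only a first filter before any Durable Object lookup and does not replace authenticating and authorizing the reviewer; short illustrative tokens such as `hitl_xyz789` would not pass it.

```typescript
// Matches "hitl_" followed by a lowercase UUID v4, the shape produced by
// `hitl_${crypto.randomUUID()}`.
const TOKEN_PATTERN =
  /^hitl_[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/;

// Returns true when the token has the expected shape. This says nothing
// about whether the token exists or whether the caller may use it.
function isWellFormedToken(token: string): boolean {
  return TOKEN_PATTERN.test(token);
}
```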