Overview
Members are the building blocks of Conductor workflows. This guide walks through creating Function, Think, Data, and API members from scratch.
Member Anatomy
Every member consists of two parts:
1. Configuration (member.yaml)
Defines metadata, type, and schema:
name: my-member
type: Function
description: What this member does
config:
  # Member-specific configuration
schema:
  input:
    type: object
    properties:
      # Input parameters
  output:
    type: object
    properties:
      # Output format
2. Implementation (index.ts)
The code that executes:
export default async function myMember({ input, state, setState, env }) {
  // Your logic here
  return { result: 'output' };
}
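Typing the handler's argument makes the four fields explicit. The sketch below infers their shapes from how this guide uses them; `MemberContext`, `GreetInput`, `GreetState`, and `greetMember` are hypothetical names for illustration, not types exported by the Conductor SDK.

```typescript
// Hypothetical types: shapes inferred from this guide, not taken from the Conductor SDK.
interface MemberContext<TInput, TState> {
  input: TInput;                               // shaped by schema.input
  state: Readonly<Partial<TState>>;            // shared state (read side)
  setState: (patch: Partial<TState>) => void;  // shared state (write side)
  env: Record<string, unknown>;                // bindings and secrets
}

interface GreetInput {
  name: string;
}

interface GreetState {
  greetCount: number;
}

// A member that reads a counter from shared state, increments it, and writes it back.
async function greetMember(
  { input, state, setState }: MemberContext<GreetInput, GreetState>
): Promise<{ message: string; greetCount: number }> {
  const greetCount = (state.greetCount ?? 0) + 1;
  setState({ greetCount });
  return { message: `Hello, ${input.name}!`, greetCount };
}
```

Declaring `state` as partial forces the handler to cope with a first run where no earlier member has written to shared state.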
Using the CLI
Create Member with CLI
# Create Function member
conductor add member calculate-metrics Function
# Create Think member
conductor add member analyze-sentiment Think
# Create Data member
conductor add member fetch-user-data Data
# Create API member
conductor add member fetch-pricing API
Function Members
Basic Function Member
# members/calculate-metrics/member.yaml
name: calculate-metrics
type: Function
description: Calculate business metrics
schema:
  input:
    type: object
    properties:
      revenue:
        type: number
      costs:
        type: number
    required: [revenue, costs]
  output:
    type: object
    properties:
      profit:
        type: number
      margin:
        type: number
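// members/calculate-metrics/index.ts
export default async function calculateMetrics({ input }) {
  const { revenue, costs } = input;
  // Margin is undefined for zero revenue; fail loudly instead of returning NaN or Infinity
  if (revenue === 0) {
    throw new Error('Cannot calculate margin: division by zero (revenue is 0)');
  }
  const profit = revenue - costs;
  const margin = (profit / revenue) * 100;
  return {
    profit,
    // Round the percentage to two decimal places
    margin: parseFloat(margin.toFixed(2))
  };
}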
Function Member with SDK Factory
// members/calculate-metrics/index.ts
import { createFunctionMember } from '@ensemble-edge/conductor/sdk';
export default createFunctionMember({
  async handler({ input }) {
    const { revenue, costs } = input;
    return {
      profit: revenue - costs,
      margin: ((revenue - costs) / revenue) * 100
    };
  }
});
Function Member with State
import { createFunctionMember } from '@ensemble-edge/conductor/sdk';
export default createFunctionMember({
  async handler({ input, state, setState }) {
    // Read from shared state
    const previousResult = state.previousCalculation;
    // Calculate
    const result = performCalculation(input, previousResult);
    // Write to shared state
    setState({
      previousCalculation: result,
      timestamp: Date.now()
    });
    return { result };
  }
});
Function Member with Utilities
import { createFunctionMember } from '@ensemble-edge/conductor/sdk';
import { validateEmail, formatCurrency } from '../../lib/helpers';
export default createFunctionMember({
  async handler({ input }) {
    if (!validateEmail(input.email)) {
      throw new Error('Invalid email address');
    }
    return {
      email: input.email,
      formattedTotal: formatCurrency(input.total)
    };
  }
});
Think Members
Think members use AI providers for reasoning tasks.
Basic Think Member
# members/analyze-sentiment/member.yaml
name: analyze-sentiment
type: Think
description: Analyze text sentiment with AI
config:
  provider: openai
  model: gpt-4o-mini
  routing: cloudflare-gateway
  temperature: 0.3
  maxTokens: 100
  systemPrompt: |
    Analyze the sentiment of the given text.
    Respond with a JSON object:
    {
      "sentiment": "positive" | "negative" | "neutral",
      "confidence": 0.0 to 1.0,
      "reasoning": "brief explanation"
    }
schema:
  input:
    type: object
    properties:
      text:
        type: string
    required: [text]
  output:
    type: object
    properties:
      sentiment:
        type: string
      confidence:
        type: number
      reasoning:
        type: string
// members/analyze-sentiment/index.ts
// Think members are handled by the Conductor runtime
export default null;
Think members don't require implementation code: Conductor handles AI provider calls automatically based on the configuration.
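Because the system prompt in this example asks the model for a JSON object, a downstream Function member may want to check that shape before relying on it. The following is a minimal sketch that assumes the raw model text is available as a string; `parseSentiment` and `isSentiment` are hypothetical helpers, not part of Conductor.

```typescript
type Sentiment = 'positive' | 'negative' | 'neutral';

interface SentimentResult {
  sentiment: Sentiment;
  confidence: number;
  reasoning: string;
}

// Type guard for the three labels named in the system prompt.
function isSentiment(value: unknown): value is Sentiment {
  return value === 'positive' || value === 'negative' || value === 'neutral';
}

// Hypothetical helper: parse the model's text and verify it matches the requested shape.
function parseSentiment(raw: string): SentimentResult {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    throw new Error('Model response is not valid JSON');
  }
  if (typeof parsed !== 'object' || parsed === null) {
    throw new Error('Model response is not a JSON object');
  }
  const { sentiment, confidence, reasoning } = parsed as Record<string, unknown>;
  if (!isSentiment(sentiment)) {
    throw new Error(`Unexpected sentiment: ${String(sentiment)}`);
  }
  if (typeof confidence !== 'number' || confidence < 0 || confidence > 1) {
    throw new Error(`Confidence must be between 0 and 1, got: ${String(confidence)}`);
  }
  if (typeof reasoning !== 'string') {
    throw new Error('Reasoning must be a string');
  }
  return { sentiment, confidence, reasoning };
}
```

Throwing a specific message for each failed check makes a malformed model response easy to trace.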
Think Member with Structured Output
config:
  provider: openai
  model: gpt-4o
  routing: cloudflare-gateway
  temperature: 0.2
  responseFormat:
    type: json_schema
    json_schema:
      name: company_analysis
      strict: true
      schema:
        type: object
        properties:
          industry:
            type: string
          employees:
            type: number
          founded:
            type: number
          summary:
            type: string
        required: [industry, employees, founded, summary]
        additionalProperties: false
Think Member with Dynamic Prompts
config:
  provider: anthropic
  model: claude-3-5-sonnet-20241022
  routing: cloudflare-gateway
  temperature: 0.7
  # Prompt interpolation with input
  systemPrompt: |
    You are analyzing companies in the ${input.industry} industry.
    Focus on ${input.analysisType} factors.
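To make the interpolation concrete, here is a minimal sketch of how `${input.*}` placeholders could be resolved against a context object. The `interpolate` helper is hypothetical; Conductor's own resolver may behave differently, for example when a value is missing.

```typescript
// Hypothetical helper: resolve `${path.to.value}` placeholders against a context object.
function interpolate(template: string, context: Record<string, unknown>): string {
  return template.replace(/\$\{([^}]+)\}/g, (_match, path: string) => {
    let value: unknown = context;
    // Walk the dotted path one key at a time, failing on the first missing segment.
    for (const key of path.trim().split('.')) {
      if (typeof value !== 'object' || value === null || !(key in value)) {
        throw new Error(`Missing value for placeholder: ${path}`);
      }
      value = (value as Record<string, unknown>)[key];
    }
    return String(value);
  });
}

const prompt = interpolate(
  'You are analyzing companies in the ${input.industry} industry.\nFocus on ${input.analysisType} factors.',
  { input: { industry: 'fintech', analysisType: 'regulatory' } }
);
console.log(prompt);
// You are analyzing companies in the fintech industry.
// Focus on regulatory factors.
```

Failing on a missing value, rather than substituting an empty string, keeps a half-filled prompt from reaching the model unnoticed.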
Data Members
Data members interact with storage systems.
KV Storage Member
# members/cache-get/member.yaml
name: cache-get
type: Data
description: Get value from KV cache
config:
  storage: kv
  operation: get
  binding: CACHE # wrangler.toml binding name
schema:
  input:
    type: object
    properties:
      key:
        type: string
    required: [key]
  output:
    type: object
    properties:
      value:
        # No `type` keyword: "unknown" is not a standard JSON Schema type, and omitting `type` accepts any value
        description: Cached value
      found:
        type: boolean
D1 Database Member
# members/get-user/member.yaml
name: get-user
type: Data
description: Get user from D1 database
config:
  storage: d1
  operation: query
  binding: DB
  query: |
    SELECT id, name, email, created_at
    FROM users
    WHERE id = ?
schema:
  input:
    type: object
    properties:
      userId:
        type: number
    required: [userId]
Custom Data Member
// members/advanced-query/index.ts
import { createDataMember } from '@ensemble-edge/conductor/sdk';
export default createDataMember({
  async handler({ input, env }) {
    // Complex query logic
    const results = await env.DB.prepare(`
      SELECT u.*, COUNT(o.id) as order_count
      FROM users u
      LEFT JOIN orders o ON u.id = o.user_id
      WHERE u.status = ?
      GROUP BY u.id
      ORDER BY order_count DESC
      LIMIT ?
    `)
      .bind(input.status, input.limit)
      .all();
    return {
      users: results.results,
      total: results.results.length
    };
  }
});
R2 Storage Member
import { createDataMember } from '@ensemble-edge/conductor/sdk';
export default createDataMember({
  async handler({ input, env }) {
    const { operation, key, value } = input;
    switch (operation) {
      // Braces give this case its own scope for the `const` declaration
      case 'get': {
        const object = await env.STORAGE.get(key);
        return {
          value: object ? await object.text() : null,
          found: !!object
        };
      }
      case 'put':
        await env.STORAGE.put(key, value);
        return { success: true };
      case 'delete':
        await env.STORAGE.delete(key);
        return { success: true };
      default:
        throw new Error(`Unknown operation: ${operation}`);
    }
  }
});
API Members
API members make HTTP requests to external services.
Basic API Member
# members/fetch-pricing/member.yaml
name: fetch-pricing
type: API
description: Fetch pricing from external API
config:
  url: "https://api.example.com/pricing"
  method: GET
  headers:
    Authorization: "Bearer ${env.API_KEY}"
  timeout: 30000
  retries: 3
schema:
  input:
    type: object
    properties:
      plan:
        type: string
    required: [plan]
  output:
    type: object
    properties:
      price:
        type: number
      currency:
        type: string
API Member with Dynamic URL
config:
  url: "https://api.example.com/users/${input.userId}"
  method: GET
  headers:
    Authorization: "Bearer ${env.API_KEY}"
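When input values are substituted into a URL, they need to be encoded so that characters such as `/` or spaces cannot change the request path. The sketch below shows one way to do that in a custom member; `buildUrl` is a hypothetical helper, and whether Conductor encodes interpolated values itself is not stated in this guide.

```typescript
// Hypothetical helper: fill `${input.x}` placeholders in a URL template, encoding each value.
function buildUrl(template: string, input: Record<string, string | number>): string {
  return template.replace(/\$\{input\.(\w+)\}/g, (_match, key: string) => {
    const value = input[key];
    if (value === undefined) {
      throw new Error(`Missing input for URL placeholder: ${key}`);
    }
    // encodeURIComponent escapes reserved characters such as "/" and " "
    return encodeURIComponent(String(value));
  });
}

console.log(buildUrl('https://api.example.com/users/${input.userId}', { userId: 42 }));
// https://api.example.com/users/42
console.log(buildUrl('https://api.example.com/users/${input.userId}', { userId: 'a/b c' }));
// https://api.example.com/users/a%2Fb%20c
```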
Custom API Member
// members/advanced-api-call/index.ts
import { createAPIMember } from '@ensemble-edge/conductor/sdk';
export default createAPIMember({
  async handler({ input, env }) {
    const response = await fetch(input.url, {
      method: input.method || 'GET',
      headers: {
        'Authorization': `Bearer ${env.API_KEY}`,
        'Content-Type': 'application/json',
        ...input.headers
      },
      body: input.body ? JSON.stringify(input.body) : undefined
    });
    if (!response.ok) {
      throw new Error(`API error: ${response.statusText}`);
    }
    const data = await response.json();
    return {
      data,
      status: response.status,
      headers: Object.fromEntries(response.headers.entries())
    };
  }
});
API Member with Retry Logic
import { createAPIMember } from '@ensemble-edge/conductor/sdk';
// Retries network failures and 5xx responses with linear backoff; fails fast on other statuses
async function fetchWithRetry(url: string, options: RequestInit, maxRetries = 3): Promise<Response> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    let response: Response | undefined;
    try {
      response = await fetch(url, options);
    } catch (error) {
      // Network failure: remember it and retry
      lastError = error;
    }
    if (response) {
      if (response.ok) return response;
      const httpError = new Error(`HTTP ${response.status}: ${response.statusText}`);
      // Only 5xx responses are worth retrying; a 4xx will not succeed on a second attempt
      if (response.status < 500) throw httpError;
      lastError = httpError;
    }
    // Wait before the next attempt: 1s, then 2s, and so on
    if (attempt < maxRetries) {
      await new Promise(resolve => setTimeout(resolve, 1000 * attempt));
    }
  }
  // Every attempt failed: surface the most recent error
  throw lastError;
}
export default createAPIMember({
  async handler({ input, env }) {
    const response = await fetchWithRetry(input.url, {
      method: 'GET',
      headers: { 'Authorization': `Bearer ${env.API_KEY}` }
    });
    return { data: await response.json() };
  }
});
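The retry helper in this example waits `1000 * attempt` milliseconds between attempts, which is linear backoff. The sketch below lists the waits that rule produces and, for comparison, an exponential schedule. The exponential variant is a swapped-in alternative, not what the guide's code does, and both function names are hypothetical.

```typescript
// Waits between attempts under the guide's rule: baseMs * attempt, with no wait after the final attempt.
function linearDelays(maxRetries: number, baseMs = 1000): number[] {
  const delays: number[] = [];
  for (let attempt = 1; attempt < maxRetries; attempt++) {
    delays.push(baseMs * attempt);
  }
  return delays;
}

// Alternative (not used in the guide): double the wait after each failed attempt.
function exponentialDelays(maxRetries: number, baseMs = 1000): number[] {
  const delays: number[] = [];
  for (let attempt = 1; attempt < maxRetries; attempt++) {
    delays.push(baseMs * 2 ** (attempt - 1));
  }
  return delays;
}

console.log(linearDelays(3));      // [ 1000, 2000 ]
console.log(exponentialDelays(5)); // [ 1000, 2000, 4000, 8000 ]
```

With the default of three attempts the two schedules are identical; they only diverge from the fourth attempt onward.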
Advanced Patterns
Member with Validation
import { createFunctionMember } from '@ensemble-edge/conductor/sdk';
function validateInput(input: any): void {
  if (!input.email || !/^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(input.email)) {
    throw new Error('Invalid email address');
  }
  if (input.age && (input.age < 0 || input.age > 150)) {
    throw new Error('Invalid age');
  }
}
export default createFunctionMember({
  async handler({ input }) {
    validateInput(input);
    // Process valid input
    return { success: true };
  }
});
Member with Error Handling
import { createFunctionMember } from '@ensemble-edge/conductor/sdk';
export default createFunctionMember({
  async handler({ input }) {
    try {
      const result = await riskyOperation(input);
      return { result, success: true };
    } catch (error) {
      // Log the error and return a structured failure instead of throwing
      console.error('Operation failed:', error);
      return {
        success: false,
        // `error` is `unknown` under strict TypeScript, so narrow it before reading `.message`
        error: error instanceof Error ? error.message : String(error),
        fallbackResult: getDefaultValue()
      };
    }
  }
});
Member with Telemetry
import { createFunctionMember } from '@ensemble-edge/conductor/sdk';
export default createFunctionMember({
  async handler({ input, env }) {
    const startTime = performance.now();
    try {
      const result = await processData(input);
      // Log success
      env.ANALYTICS?.writeDataPoint({
        blobs: ['process-data', 'success'],
        doubles: [performance.now() - startTime],
        // Analytics Engine takes a single string index, not a numeric timestamp
        indexes: ['process-data']
      });
      return result;
    } catch (error) {
      // Log failure
      env.ANALYTICS?.writeDataPoint({
        blobs: ['process-data', 'error'],
        doubles: [performance.now() - startTime],
        indexes: ['process-data']
      });
      throw error;
    }
  }
});
Member with Caching Logic
import { createFunctionMember } from '@ensemble-edge/conductor/sdk';
export default createFunctionMember({
  async handler({ input, env }) {
    const cacheKey = `result:${input.id}`;
    // Check cache first
    const cached = await env.CACHE.get(cacheKey);
    if (cached) {
      return JSON.parse(cached);
    }
    // Compute result
    const result = await expensiveComputation(input);
    // Cache for 1 hour
    await env.CACHE.put(cacheKey, JSON.stringify(result), {
      expirationTtl: 3600
    });
    return result;
  }
});
Best Practices
1. Single Responsibility
// ✅ Good - focused on one task
export default createFunctionMember({
  async handler({ input }) {
    return { metric: calculateMetric(input) };
  }
});
// ❌ Bad - doing too much
export default createFunctionMember({
  async handler({ input, env }) {
    const data = await fetchData(env);
    const analysis = await analyzeData(data);
    const report = await generateReport(analysis);
    await saveToDatabase(env, report);
    return { report };
  }
});
2. Type Safety
// Define types for clarity
interface CalculateInput {
  revenue: number;
  costs: number;
}
interface CalculateOutput {
  profit: number;
  margin: number;
}
export default createFunctionMember({
  async handler({ input }: { input: CalculateInput }): Promise<CalculateOutput> {
    return {
      profit: input.revenue - input.costs,
      margin: ((input.revenue - input.costs) / input.revenue) * 100
    };
  }
});
3. Error Messages
// ✅ Good - descriptive errors
if (!input.email) {
  throw new Error('Email address is required');
}
if (!isValidEmail(input.email)) {
  throw new Error(`Invalid email format: ${input.email}`);
}
// ❌ Bad - vague errors
if (!input.email || !isValidEmail(input.email)) {
  throw new Error('Bad input');
}
4. Schema Validation
# ✅ Good - strict schema
schema:
  input:
    type: object
    properties:
      email:
        type: string
        format: email
      age:
        type: number
        minimum: 0
        maximum: 150
    required: [email]
    additionalProperties: false
# ❌ Bad - loose schema
schema:
  input:
    type: object
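To see what the strict schema rules out, it can help to write the same constraints as a plain runtime check. The function below is a hand-written mirror of the strict schema for illustration only: `validateStrict` is a hypothetical helper, and the regular expression is a rough stand-in for `format: email`, not a full implementation of it.

```typescript
interface ValidationResult {
  valid: boolean;
  errors: string[];
}

// Hypothetical helper mirroring the strict schema: email required, age 0 to 150, no extra keys.
function validateStrict(input: Record<string, unknown>): ValidationResult {
  const errors: string[] = [];
  const allowed = ['email', 'age'];
  // additionalProperties: false
  for (const key of Object.keys(input)) {
    if (allowed.indexOf(key) === -1) errors.push(`Unexpected property: ${key}`);
  }
  const { email, age } = input;
  // required: [email], type: string, format: email (approximated)
  if (typeof email !== 'string') {
    errors.push('email is required and must be a string');
  } else if (!/^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email)) {
    errors.push('email must be a valid email address');
  }
  // age is optional; when present it must be a number within [0, 150]
  if (age !== undefined && (typeof age !== 'number' || age < 0 || age > 150)) {
    errors.push('age must be a number between 0 and 150');
  }
  return { valid: errors.length === 0, errors };
}

console.log(validateStrict({ email: 'ada@example.com', age: 36 }));
// { valid: true, errors: [] }
```

The loose schema accepts every one of the inputs this check rejects, which is why the strict form is preferred.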
5. Documentation
name: calculate-roi
type: Function
description: Calculate return on investment
# Document each field
schema:
  input:
    type: object
    properties:
      initialInvestment:
        type: number
        description: Initial investment amount in USD
      currentValue:
        type: number
        description: Current value of investment in USD
      timeYears:
        type: number
        description: Investment period in years
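Documented inputs translate directly into a typed implementation. The sketch below shows what the logic behind `calculate-roi` might look like. The guide documents only the inputs, so the output fields (`roiPercent`, `annualizedPercent`) and the formulas are assumptions chosen for illustration.

```typescript
interface RoiInput {
  initialInvestment: number; // USD
  currentValue: number;      // USD
  timeYears: number;         // years
}

// Assumed output shape: not specified in the guide.
interface RoiOutput {
  roiPercent: number;
  annualizedPercent: number;
}

// Convert a fraction such as 0.21 into a percentage rounded to two decimals.
function toPercent(fraction: number): number {
  return Math.round(fraction * 10000) / 100;
}

function calculateRoi({ initialInvestment, currentValue, timeYears }: RoiInput): RoiOutput {
  if (initialInvestment <= 0) throw new Error('initialInvestment must be greater than 0');
  if (timeYears <= 0) throw new Error('timeYears must be greater than 0');
  // Simple ROI: total gain relative to the initial investment
  const roi = (currentValue - initialInvestment) / initialInvestment;
  // Annualized ROI: the constant yearly growth rate that yields the same total gain
  const annualized = Math.pow(currentValue / initialInvestment, 1 / timeYears) - 1;
  return { roiPercent: toPercent(roi), annualizedPercent: toPercent(annualized) };
}

console.log(calculateRoi({ initialInvestment: 10000, currentValue: 12100, timeYears: 2 }));
// { roiPercent: 21, annualizedPercent: 10 }
```

Stating the units in the schema descriptions is what makes the two guard clauses easy to justify: a zero or negative investment or period has no meaningful ROI.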
Testing Members
import { describe, it, expect } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';
describe('calculate-metrics member', () => {
  it('should calculate profit and margin', async () => {
    const conductor = await TestConductor.create();
    const result = await conductor.executeMember('calculate-metrics', {
      revenue: 1000,
      costs: 600
    });
    expect(result).toBeSuccessful();
    expect(result.output.profit).toBe(400);
    expect(result.output.margin).toBe(40);
  });
  it('should handle zero revenue', async () => {
    const conductor = await TestConductor.create();
    const result = await conductor.executeMember('calculate-metrics', {
      revenue: 0,
      costs: 100
    });
    expect(result).toHaveError(/division by zero/);
  });
});

