Overview

Members are the building blocks of Conductor workflows. This guide walks through creating Function, Think, Data, and API members from scratch.

Member Anatomy

Every member consists of two parts:

1. Configuration (member.yaml)

Defines metadata, type, and schema:
name: my-member
type: Function
description: What this member does

config:
  # Member-specific configuration

schema:
  input:
    type: object
    properties:
      # Input parameters
  output:
    type: object
    properties:
      # Output format

2. Implementation (index.ts)

The code that executes:
export default async function myMember({ input, state, setState, env }) {
  // Your logic here
  return { result: 'output' };
}
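
To make the handler argument concrete, here is a minimal typed sketch. The `MemberContext` interface and the `createFakeContext` helper are assumptions made for illustration; they are not taken from the Conductor SDK, whose real types may differ.

```typescript
// Hypothetical shape of the handler argument (not the SDK's actual types).
interface MemberContext<TInput> {
  input: TInput;                                      // the member's input payload
  state: Record<string, unknown>;                     // shared workflow state
  setState: (patch: Record<string, unknown>) => void; // merges a patch into state
  env: Record<string, unknown>;                       // bindings and secrets
}

// Example member: greets a user and counts greetings in shared state.
async function greet({ input, state, setState }: MemberContext<{ name: string }>) {
  const count = typeof state.greetCount === 'number' ? state.greetCount + 1 : 1;
  setState({ greetCount: count });
  return { greeting: `Hello, ${input.name}!`, count };
}

// Assumed test helper: builds a context whose setState merges shallowly.
function createFakeContext<TInput>(input: TInput): MemberContext<TInput> {
  const ctx: MemberContext<TInput> = {
    input,
    state: {},
    setState: (patch) => { Object.assign(ctx.state, patch); },
    env: {},
  };
  return ctx;
}
```

Calling `greet(createFakeContext({ name: 'Ada' }))` resolves to a greeting for Ada and leaves `greetCount` set to 1 in the fake state, which is handy for exercising a member without a running workflow.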

Using the CLI

Create Member with CLI

# Create Function member
conductor add member calculate-metrics Function

# Create Think member
conductor add member analyze-sentiment Think

# Create Data member
conductor add member fetch-user-data Data

# Create API member
conductor add member fetch-pricing API
This creates the directory structure and template files.

Function Members

Basic Function Member

# members/calculate-metrics/member.yaml
name: calculate-metrics
type: Function
description: Calculate business metrics

schema:
  input:
    type: object
    properties:
      revenue:
        type: number
      costs:
        type: number
    required: [revenue, costs]

  output:
    type: object
    properties:
      profit:
        type: number
      margin:
        type: number
// members/calculate-metrics/index.ts
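export default async function calculateMetrics({ input }) {
  const { revenue, costs } = input;

  // Margin is undefined when revenue is 0, so fail with a clear error
  if (revenue === 0) {
    throw new Error('Cannot calculate margin: division by zero (revenue is 0)');
  }

  const profit = revenue - costs;
  const margin = (profit / revenue) * 100;

  return {
    profit,
    // Round the margin to two decimal places
    margin: parseFloat(margin.toFixed(2))
  };
}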

Function Member with SDK Factory

// members/calculate-metrics/index.ts
import { createFunctionMember } from '@ensemble-edge/conductor/sdk';

export default createFunctionMember({
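  async handler({ input }) {
    const { revenue, costs } = input;

    // Guard against division by zero before computing the margin
    if (revenue === 0) {
      throw new Error('Cannot calculate margin: division by zero (revenue is 0)');
    }

    return {
      profit: revenue - costs,
      margin: ((revenue - costs) / revenue) * 100
    };
  }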
});

Function Member with State

import { createFunctionMember } from '@ensemble-edge/conductor/sdk';

export default createFunctionMember({
  async handler({ input, state, setState }) {
    // Read from shared state
    const previousResult = state.previousCalculation;

    // Calculate
    const result = performCalculation(input, previousResult);

    // Write to shared state
    setState({
      previousCalculation: result,
      timestamp: Date.now()
    });

    return { result };
  }
});

Function Member with Utilities

import { createFunctionMember } from '@ensemble-edge/conductor/sdk';
import { validateEmail, formatCurrency } from '../../lib/helpers';

export default createFunctionMember({
  async handler({ input }) {
    if (!validateEmail(input.email)) {
      throw new Error('Invalid email address');
    }

    return {
      email: input.email,
      formattedTotal: formatCurrency(input.total)
    };
  }
});
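
The guide does not show `lib/helpers`, so the following is only a sketch of what `validateEmail` and `formatCurrency` might contain. The email pattern is the same loose check used in the validation example later in this guide; the `USD` and `en-US` defaults are assumptions.

```typescript
// lib/helpers.ts (hypothetical contents)

// Loose structural check: no whitespace, one "@", and a dot in the domain.
function validateEmail(email: unknown): email is string {
  return typeof email === 'string' && /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email);
}

// Format a numeric amount as currency. USD and en-US are assumed defaults.
function formatCurrency(amount: number, currency = 'USD', locale = 'en-US'): string {
  return new Intl.NumberFormat(locale, { style: 'currency', currency }).format(amount);
}
```

In a real project these functions would be exported from the helpers module so that members can import them as shown above.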

Think Members

Think members use AI providers for reasoning tasks.

Basic Think Member

# members/analyze-sentiment/member.yaml
name: analyze-sentiment
type: Think
description: Analyze text sentiment with AI

config:
  provider: openai
  model: gpt-4o-mini
  routing: cloudflare-gateway
  temperature: 0.3
  maxTokens: 100
  systemPrompt: |
    Analyze the sentiment of the given text.
    Respond with a JSON object:
    {
      "sentiment": "positive" | "negative" | "neutral",
      "confidence": 0.0 to 1.0,
      "reasoning": "brief explanation"
    }

schema:
  input:
    type: object
    properties:
      text:
        type: string
    required: [text]

  output:
    type: object
    properties:
      sentiment:
        type: string
      confidence:
        type: number
      reasoning:
        type: string
// members/analyze-sentiment/index.ts
// Think members are handled by the Conductor runtime
export default null;
Think members don’t require implementation code: Conductor calls the AI provider automatically based on the configuration.
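
A system prompt can ask for JSON, but a model may still reply with something else. If a downstream Function member receives the model's reply as raw text, it could check the shape before using it. The sketch below assumes that situation; `parseSentiment` is a hypothetical helper, not part of Conductor.

```typescript
type Sentiment = 'positive' | 'negative' | 'neutral';

interface SentimentResult {
  sentiment: Sentiment;
  confidence: number;
  reasoning: string;
}

const ALLOWED_SENTIMENTS: readonly string[] = ['positive', 'negative', 'neutral'];

// Hypothetical helper: parse a raw model reply and check it against the shape
// requested in the system prompt. Throws a descriptive error on any mismatch.
function parseSentiment(raw: string): SentimentResult {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    throw new Error('Model reply is not valid JSON');
  }
  if (typeof parsed !== 'object' || parsed === null) {
    throw new Error('Model reply is not a JSON object');
  }

  const { sentiment, confidence, reasoning } = parsed as Record<string, unknown>;
  if (typeof sentiment !== 'string' || !ALLOWED_SENTIMENTS.includes(sentiment)) {
    throw new Error(`Unexpected sentiment: ${String(sentiment)}`);
  }
  if (typeof confidence !== 'number' || confidence < 0 || confidence > 1) {
    throw new Error('confidence must be a number between 0 and 1');
  }
  if (typeof reasoning !== 'string') {
    throw new Error('reasoning must be a string');
  }
  return { sentiment: sentiment as Sentiment, confidence, reasoning };
}
```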

Think Member with Structured Output

config:
  provider: openai
  model: gpt-4o
  routing: cloudflare-gateway
  temperature: 0.2
  responseFormat:
    type: json_schema
    json_schema:
      name: company_analysis
      strict: true
      schema:
        type: object
        properties:
          industry:
            type: string
          employees:
            type: number
          founded:
            type: number
          summary:
            type: string
        required: [industry, employees, founded, summary]
        additionalProperties: false

Think Member with Dynamic Prompts

config:
  provider: anthropic
  model: claude-3-5-sonnet-20241022
  routing: cloudflare-gateway
  temperature: 0.7
  # Prompt interpolation with input
  systemPrompt: |
    You are analyzing companies in the ${input.industry} industry.
    Focus on ${input.analysisType} factors.
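
The `${input.industry}` placeholders suggest path-based substitution. As an illustration of the idea only (this is not Conductor's implementation, and `interpolate` is a hypothetical name), a resolver could look like this:

```typescript
// Illustrative only: replaces ${path.to.value} placeholders with values looked
// up in a context object. Placeholders that cannot be resolved are left as-is.
function interpolate(template: string, context: Record<string, unknown>): string {
  return template.replace(/\$\{([\w.]+)\}/g, (match, path: string) => {
    let value: unknown = context;
    for (const key of path.split('.')) {
      if (typeof value !== 'object' || value === null || !(key in value)) {
        return match; // unknown path: keep the placeholder untouched
      }
      value = (value as Record<string, unknown>)[key];
    }
    return String(value);
  });
}

const prompt = interpolate(
  'You are analyzing companies in the ${input.industry} industry.',
  { input: { industry: 'fintech' } }
);
// prompt === 'You are analyzing companies in the fintech industry.'
```

Leaving unresolved placeholders visible, rather than substituting an empty string, makes a missing input easier to spot in the final prompt.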

Data Members

Data members interact with storage systems.

KV Storage Member

# members/cache-get/member.yaml
name: cache-get
type: Data
description: Get value from KV cache

config:
  storage: kv
  operation: get
  binding: CACHE  # wrangler.toml binding name

schema:
  input:
    type: object
    properties:
      key:
        type: string
    required: [key]

  output:
    type: object
    properties:
      value:
        type: unknown
      found:
        type: boolean

D1 Database Member

# members/get-user/member.yaml
name: get-user
type: Data
description: Get user from D1 database

config:
  storage: d1
  operation: query
  binding: DB
  query: |
    SELECT id, name, email, created_at
    FROM users
    WHERE id = ?

schema:
  input:
    type: object
    properties:
      userId:
        type: number
    required: [userId]

Custom Data Member

// members/advanced-query/index.ts
import { createDataMember } from '@ensemble-edge/conductor/sdk';

export default createDataMember({
  async handler({ input, env }) {
    // Complex query logic
    const results = await env.DB.prepare(`
      SELECT u.*, COUNT(o.id) as order_count
      FROM users u
      LEFT JOIN orders o ON u.id = o.user_id
      WHERE u.status = ?
      GROUP BY u.id
      ORDER BY order_count DESC
      LIMIT ?
    `)
      .bind(input.status, input.limit)
      .all();

    return {
      users: results.results,
      total: results.results.length
    };
  }
});

R2 Storage Member

import { createDataMember } from '@ensemble-edge/conductor/sdk';

export default createDataMember({
  async handler({ input, env }) {
    const { operation, key, value } = input;

    switch (operation) {
      case 'get': {
        // Braces scope the const declaration to this case
        const object = await env.STORAGE.get(key);
        return {
          value: object ? await object.text() : null,
          found: !!object
        };
      }

      case 'put':
        await env.STORAGE.put(key, value);
        return { success: true };

      case 'delete':
        await env.STORAGE.delete(key);
        return { success: true };

      default:
        throw new Error(`Unknown operation: ${operation}`);
    }
  }
});

API Members

API members make HTTP requests to external services.

Basic API Member

# members/fetch-pricing/member.yaml
name: fetch-pricing
type: API
description: Fetch pricing from external API

config:
  url: "https://api.example.com/pricing"
  method: GET
  headers:
    Authorization: "Bearer ${env.API_KEY}"
  timeout: 30000
  retries: 3

schema:
  input:
    type: object
    properties:
      plan:
        type: string
    required: [plan]

  output:
    type: object
    properties:
      price:
        type: number
      currency:
        type: string

API Member with Dynamic URL

config:
  url: "https://api.example.com/users/${input.userId}"
  method: GET
  headers:
    Authorization: "Bearer ${env.API_KEY}"
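
This guide does not say whether interpolated values are URL-encoded. If you build a dynamic URL yourself in a custom API member, encoding the path segment keeps input from changing the URL structure. A minimal sketch, where `buildUserUrl` is a hypothetical helper:

```typescript
// Hypothetical helper: build the user URL by hand and percent-encode the path
// segment so characters such as "/" or "?" in the input stay inside it.
function buildUserUrl(baseUrl: string, userId: string | number): string {
  const trimmedBase = baseUrl.replace(/\/+$/, ''); // drop trailing slashes
  return `${trimmedBase}/users/${encodeURIComponent(String(userId))}`;
}

const safeUrl = buildUserUrl('https://api.example.com', '../admin?x=1');
// safeUrl === 'https://api.example.com/users/..%2Fadmin%3Fx%3D1'
```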

Custom API Member

// members/advanced-api-call/index.ts
import { createAPIMember } from '@ensemble-edge/conductor/sdk';

export default createAPIMember({
  async handler({ input, env }) {
    const response = await fetch(input.url, {
      method: input.method || 'GET',
      headers: {
        'Authorization': `Bearer ${env.API_KEY}`,
        'Content-Type': 'application/json',
        ...input.headers
      },
      body: input.body ? JSON.stringify(input.body) : undefined
    });

    if (!response.ok) {
      throw new Error(`API error: ${response.statusText}`);
    }

    const data = await response.json();

    return {
      data,
      status: response.status,
      headers: Object.fromEntries(response.headers.entries())
    };
  }
});

API Member with Retry Logic

import { createAPIMember } from '@ensemble-edge/conductor/sdk';

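async function fetchWithRetry(url: string, options: RequestInit, maxRetries = 3): Promise<Response> {
  let lastError: unknown;

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    let response: Response | undefined;
    try {
      response = await fetch(url, options);
    } catch (error) {
      // Network failure: remember it and retry after the backoff below
      lastError = error;
    }

    if (response) {
      if (response.ok) return response;

      // Client errors (4xx) will not succeed on retry, so fail immediately
      if (response.status < 500) {
        throw new Error(`HTTP ${response.status}: ${response.statusText}`);
      }

      // Server errors (5xx) are retried
      lastError = new Error(`HTTP ${response.status}: ${response.statusText}`);
    }

    // Linear backoff before the next attempt
    if (attempt < maxRetries) {
      await new Promise(resolve => setTimeout(resolve, 1000 * attempt));
    }
  }

  // All attempts failed
  throw lastError;
}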

export default createAPIMember({
  async handler({ input, env }) {
    const response = await fetchWithRetry(input.url, {
      method: 'GET',
      headers: { 'Authorization': `Bearer ${env.API_KEY}` }
    });

    return { data: await response.json() };
  }
});
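
The retry helper above waits `1000 * attempt` milliseconds between attempts, which is a linear backoff. The sketch below writes that schedule as a function and, for comparison, shows a capped exponential schedule. The exponential variant is a common alternative, not something this guide's code uses, and both function names are hypothetical.

```typescript
// Linear backoff, matching the 1000 * attempt delay used above.
function linearDelay(attempt: number, baseMs = 1000): number {
  return baseMs * attempt;
}

// Capped exponential backoff: base, 2x base, 4x base, ... up to maxMs.
function exponentialDelay(attempt: number, baseMs = 1000, maxMs = 30_000): number {
  return Math.min(maxMs, baseMs * 2 ** (attempt - 1));
}

// Delays before attempts 2, 3, and 4:
// linear:      1000, 2000, 3000
// exponential: 1000, 2000, 4000
```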

Advanced Patterns

Member with Validation

import { createFunctionMember } from '@ensemble-edge/conductor/sdk';

function validateInput(input: any): void {
  if (!input.email || !/^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(input.email)) {
    throw new Error('Invalid email address');
  }

  if (input.age != null && (typeof input.age !== 'number' || input.age < 0 || input.age > 150)) {
    throw new Error('Invalid age');
  }
}

export default createFunctionMember({
  async handler({ input }) {
    validateInput(input);

    // Process valid input
    return { success: true };
  }
});

Member with Error Handling

import { createFunctionMember } from '@ensemble-edge/conductor/sdk';

export default createFunctionMember({
  async handler({ input }) {
    try {
      const result = await riskyOperation(input);
      return { result, success: true };
    } catch (error) {
      // Log error but don't throw - return structured error
      console.error('Operation failed:', error);

      return {
        success: false,
        error: error instanceof Error ? error.message : String(error),
        fallbackResult: getDefaultValue()
      };
    }
  }
});
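
The structured-error pattern above can be typed so that callers must check `success` before reading the result. This is a sketch under assumed names (`MemberResult`, `toErrorMessage`, `withFallback`); none of them come from the Conductor SDK.

```typescript
// Hypothetical result shape: checking `success` narrows the type.
type MemberResult<T> =
  | { success: true; result: T }
  | { success: false; error: string; fallbackResult: T };

// A caught value is not guaranteed to be an Error, so normalize it first.
function toErrorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

// Run an operation and convert any failure into a structured result.
async function withFallback<T>(
  operation: () => Promise<T>,
  fallback: T
): Promise<MemberResult<T>> {
  try {
    return { success: true, result: await operation() };
  } catch (error) {
    return { success: false, error: toErrorMessage(error), fallbackResult: fallback };
  }
}
```

A handler could then return `withFallback(() => riskyOperation(input), getDefaultValue())` and keep the try/catch out of its own body.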

Member with Telemetry

import { createFunctionMember } from '@ensemble-edge/conductor/sdk';

export default createFunctionMember({
  async handler({ input, env }) {
    const startTime = performance.now();

    try {
      const result = await processData(input);

      // Log success
      env.ANALYTICS?.writeDataPoint({
        blobs: ['process-data', 'success'],
        doubles: [performance.now() - startTime],
        indexes: ['process-data'] // Analytics Engine indexes are strings, not numbers
      });

      return result;
    } catch (error) {
      // Log failure
      env.ANALYTICS?.writeDataPoint({
        blobs: ['process-data', 'error'],
        doubles: [performance.now() - startTime],
        indexes: ['process-data'] // Analytics Engine indexes are strings, not numbers
      });

      throw error;
    }
  }
});

Member with Caching Logic

import { createFunctionMember } from '@ensemble-edge/conductor/sdk';

export default createFunctionMember({
  async handler({ input, env }) {
    const cacheKey = `result:${input.id}`;

    // Check cache first
    const cached = await env.CACHE.get(cacheKey);
    if (cached) {
      return JSON.parse(cached);
    }

    // Compute result
    const result = await expensiveComputation(input);

    // Cache for 1 hour
    await env.CACHE.put(cacheKey, JSON.stringify(result), {
      expirationTtl: 3600
    });

    return result;
  }
});
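
The check, compute, and store steps above can be pulled into a reusable helper. The sketch below assumes only the two KV methods the example already uses, `get` and `put` with `expirationTtl`. `KVLike`, `getOrCompute`, and `MemoryKV` are hypothetical names, and `MemoryKV` is an in-memory stand-in for local experiments.

```typescript
// Minimal subset of the KV namespace API used by the caching example.
interface KVLike {
  get(key: string): Promise<string | null>;
  put(key: string, value: string, options?: { expirationTtl?: number }): Promise<void>;
}

// Return the cached value if present; otherwise compute, store, and return it.
// Values are serialized as JSON.
async function getOrCompute<T>(
  cache: KVLike,
  key: string,
  compute: () => Promise<T>,
  ttlSeconds = 3600
): Promise<T> {
  const cached = await cache.get(key);
  if (cached !== null) return JSON.parse(cached) as T;

  const value = await compute();
  await cache.put(key, JSON.stringify(value), { expirationTtl: ttlSeconds });
  return value;
}

// In-memory stand-in for local experiments; it ignores the TTL.
class MemoryKV implements KVLike {
  private store = new Map<string, string>();
  async get(key: string) { return this.store.get(key) ?? null; }
  async put(key: string, value: string) { this.store.set(key, value); }
}
```

Inside a handler this would read `return getOrCompute(env.CACHE, cacheKey, () => expensiveComputation(input));`.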

Best Practices

1. Single Responsibility

// ✅ Good - focused on one task
export default createFunctionMember({
  async handler({ input }) {
    return { metric: calculateMetric(input) };
  }
});

// ❌ Bad - doing too much
export default createFunctionMember({
  async handler({ input, env }) {
    const data = await fetchData(env);
    const analysis = await analyzeData(data);
    const report = await generateReport(analysis);
    await saveToDatabase(env, report);
    return { report };
  }
});

2. Type Safety

// Define types for clarity
interface CalculateInput {
  revenue: number;
  costs: number;
}

interface CalculateOutput {
  profit: number;
  margin: number;
}

export default createFunctionMember({
  async handler({ input }: { input: CalculateInput }): Promise<CalculateOutput> {
    return {
      profit: input.revenue - input.costs,
      margin: ((input.revenue - input.costs) / input.revenue) * 100
    };
  }
});

3. Error Messages

// ✅ Good - descriptive errors
if (!input.email) {
  throw new Error('Email address is required');
}

if (!isValidEmail(input.email)) {
  throw new Error(`Invalid email format: ${input.email}`);
}

// ❌ Bad - vague errors
if (!input.email || !isValidEmail(input.email)) {
  throw new Error('Bad input');
}

4. Schema Validation

# ✅ Good - strict schema
schema:
  input:
    type: object
    properties:
      email:
        type: string
        format: email
      age:
        type: number
        minimum: 0
        maximum: 150
    required: [email]
    additionalProperties: false

# ❌ Bad - loose schema
schema:
  input:
    type: object

5. Documentation

name: calculate-roi
type: Function
description: Calculate return on investment

# Document each field
schema:
  input:
    type: object
    properties:
      initialInvestment:
        type: number
        description: Initial investment amount in USD
      currentValue:
        type: number
        description: Current value of investment in USD
      timeYears:
        type: number
        description: Investment period in years
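
To show how the documented fields map to code, here is a sketch of a matching calculation. The schema above documents only inputs, so the output field names (`roiPercent`, `annualizedPercent`) and the choice to include an annualized figure are assumptions.

```typescript
interface RoiInput {
  initialInvestment: number; // USD
  currentValue: number;      // USD
  timeYears: number;         // years
}

// Output field names are assumptions; the schema above only documents inputs.
interface RoiOutput {
  roiPercent: number;
  annualizedPercent: number;
}

function calculateRoi({ initialInvestment, currentValue, timeYears }: RoiInput): RoiOutput {
  if (initialInvestment <= 0) throw new Error('initialInvestment must be greater than 0');
  if (currentValue < 0) throw new Error('currentValue must not be negative');
  if (timeYears <= 0) throw new Error('timeYears must be greater than 0');

  // Simple ROI: total gain relative to the initial amount.
  const roiPercent = ((currentValue - initialInvestment) / initialInvestment) * 100;
  // Annualized return: the constant yearly growth rate that yields currentValue.
  const annualizedPercent = ((currentValue / initialInvestment) ** (1 / timeYears) - 1) * 100;

  return { roiPercent, annualizedPercent };
}

const roi = calculateRoi({ initialInvestment: 1000, currentValue: 1500, timeYears: 2 });
// roi.roiPercent === 50; roi.annualizedPercent is roughly 22.47
```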

Testing Members

import { describe, it, expect } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';

describe('calculate-metrics member', () => {
  it('should calculate profit and margin', async () => {
    const conductor = await TestConductor.create();

    const result = await conductor.executeMember('calculate-metrics', {
      revenue: 1000,
      costs: 600
    });

    expect(result).toBeSuccessful();
    expect(result.output.profit).toBe(400);
    expect(result.output.margin).toBe(40);
  });

  it('should handle zero revenue', async () => {
    const conductor = await TestConductor.create();

    const result = await conductor.executeMember('calculate-metrics', {
      revenue: 0,
      costs: 100
    });

    expect(result).toHaveError(/division by zero/);
  });
});