Overview

Data members handle storage operations across Cloudflare’s data primitives: KV (key-value), D1 (SQL), R2 (object storage), and Vectorize (vector database). They abstract away platform-specific APIs with a unified interface.

Storage Types

KV

Global key-value cache with eventual consistency

D1

Serverless SQLite database for relational data

R2

Object storage compatible with the S3 API

Vectorize

Vector database for embeddings and semantic search

Basic Configuration

name: get-user
type: Data
description: Get user from KV cache

config:
  storage: kv           # Storage type: kv, d1, r2, vectorize
  operation: get        # Operation: get, put, delete, list, query
  binding: CACHE        # Binding name from wrangler.toml

schema:
  input:
    type: object
    properties:
      key:
        type: string
    required: [key]

  output:
    type: object
    properties:
      value:
        type: unknown
      found:
        type: boolean
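
To make the `config` block concrete, the sketch below mirrors it as TypeScript types and checks a storage/operation pair against the combinations shown in the examples on this page. The names `StorageKind`, `OperationKind`, `DataConfig`, and `isDocumentedCombination` are hypothetical and are not part of Conductor's SDK; the pairing table is inferred from this page and may not be exhaustive.

```typescript
// Hypothetical types mirroring the YAML `config` block; not Conductor's actual SDK types.
type StorageKind = "kv" | "d1" | "r2" | "vectorize";
type OperationKind = "get" | "put" | "delete" | "list" | "query";

interface DataConfig {
  storage: StorageKind;
  operation: OperationKind;
  binding: string; // Binding name from wrangler.toml
}

// Pairings that appear in the examples on this page (an inference, not a full spec).
const documentedOperations: Record<StorageKind, readonly OperationKind[]> = {
  kv: ["get", "put", "delete", "list"],
  d1: ["query"],
  r2: ["get", "put", "delete", "list"],
  vectorize: ["put", "query"],
};

// Returns true when the storage/operation pair is one this page demonstrates.
function isDocumentedCombination(config: DataConfig): boolean {
  return documentedOperations[config.storage].includes(config.operation);
}
```

A check like this could run before deployment to catch a mistyped `operation` early.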

KV Storage

GET Operation

name: cache-get
type: Data

config:
  storage: kv
  operation: get
  binding: CACHE

schema:
  input:
    properties:
      key: string
  output:
    properties:
      value: unknown
      found: boolean
Usage:
flow:
  - member: cache-get
    input:
      key: "user:123"
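
For readers curious how the `{ value, found }` output could be derived, here is a minimal sketch. It assumes the member reads JSON from the KV binding, whose `get()` resolves to `null` for a missing key. `KVLike`, `kvGet`, and `memoryKV` are hypothetical names used for illustration; Conductor's internals may differ.

```typescript
// Minimal slice of the Workers KV API used here: get() resolves to null for a missing key.
interface KVLike {
  get(key: string, type: "json"): Promise<unknown>;
}

interface GetResult {
  value: unknown;
  found: boolean;
}

// Hypothetical helper: maps a raw KV read onto the { value, found } output shape.
async function kvGet(kv: KVLike, key: string): Promise<GetResult> {
  const value = await kv.get(key, "json");
  return { value, found: value !== null };
}

// In-memory stand-in for a KV namespace, for local experimentation only.
function memoryKV(entries: Map<string, unknown>): KVLike {
  return {
    async get(key) {
      return entries.has(key) ? entries.get(key) : null;
    },
  };
}
```

Because a stored JSON `null` is indistinguishable from a miss in this sketch, callers that need to cache "no result" would want a sentinel value instead.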

PUT Operation

name: cache-put
type: Data

config:
  storage: kv
  operation: put
  binding: CACHE
  ttl: 3600  # Optional: expires after 1 hour

schema:
  input:
    properties:
      key: string
      value: unknown
Usage:
flow:
  - member: cache-put
    input:
      key: "user:123"
      value: ${fetch-user.output}
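
The `ttl` setting plausibly maps to the `expirationTtl` option of the KV `put()` call, which Cloudflare requires to be at least 60 seconds. The sketch below shows that mapping under this assumption; `KVPutLike` and `kvPut` are hypothetical names, and serializing the value with `JSON.stringify` is a choice made here, not confirmed Conductor behavior.

```typescript
// Minimal slice of the Workers KV API used here.
interface KVPutLike {
  put(key: string, value: string, options?: { expirationTtl?: number }): Promise<void>;
}

// Hypothetical helper: serializes the value and applies an optional TTL in seconds.
async function kvPut(
  kv: KVPutLike,
  key: string,
  value: unknown,
  ttlSeconds?: number,
): Promise<void> {
  // Workers KV rejects expirationTtl values below 60 seconds, so fail early.
  if (ttlSeconds !== undefined && ttlSeconds < 60) {
    throw new RangeError("KV expirationTtl must be at least 60 seconds");
  }
  const options = ttlSeconds === undefined ? undefined : { expirationTtl: ttlSeconds };
  await kv.put(key, JSON.stringify(value), options);
}
```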

DELETE Operation

name: cache-delete
type: Data

config:
  storage: kv
  operation: delete
  binding: CACHE

LIST Operation

name: cache-list
type: Data

config:
  storage: kv
  operation: list
  binding: CACHE

schema:
  input:
    properties:
      prefix: string      # Optional: filter by prefix
      limit: number       # Optional: max results
      cursor: string      # Optional: pagination
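
The `cursor` input exists because a single list call returns at most one page of keys. As a sketch of the underlying pattern, the loop below follows cursors until the KV `list()` result reports `list_complete`. `KVListLike`, `listAllKeys`, and `memoryList` are hypothetical names; the in-memory stand-in uses a numeric offset as its cursor, which real KV cursors are not.

```typescript
// Slice of the KV list() contract: each page carries keys plus a cursor until list_complete is true.
interface ListPage {
  keys: { name: string }[];
  list_complete: boolean;
  cursor?: string;
}

interface KVListLike {
  list(options: { prefix?: string; limit?: number; cursor?: string }): Promise<ListPage>;
}

// Collects every key name under a prefix by following cursors page by page.
async function listAllKeys(kv: KVListLike, prefix: string, limit = 1000): Promise<string[]> {
  const names: string[] = [];
  let cursor: string | undefined;
  do {
    const page = await kv.list({ prefix, limit, cursor });
    names.push(...page.keys.map((k) => k.name));
    cursor = page.list_complete ? undefined : page.cursor;
  } while (cursor !== undefined);
  return names;
}

// In-memory stand-in that pages through a sorted key list; the cursor is just an offset.
function memoryList(allKeys: string[]): KVListLike {
  return {
    async list({ prefix = "", limit = 1000, cursor }) {
      const matching = allKeys.filter((k) => k.startsWith(prefix)).sort();
      const start = cursor ? Number(cursor) : 0;
      const end = start + limit;
      const complete = end >= matching.length;
      return {
        keys: matching.slice(start, end).map((name) => ({ name })),
        list_complete: complete,
        cursor: complete ? undefined : String(end),
      };
    },
  };
}
```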

D1 Database

Query Operation

name: get-user
type: Data

config:
  storage: d1
  operation: query
  binding: DB
  query: |
    SELECT id, name, email, created_at
    FROM users
    WHERE id = ?

schema:
  input:
    properties:
      userId: number

Parameterized Queries

name: search-users
type: Data

config:
  storage: d1
  operation: query
  binding: DB
  query: |
    SELECT * FROM users
    WHERE email LIKE ? AND status = ?
    ORDER BY created_at DESC
    LIMIT ?

schema:
  input:
    properties:
      emailPattern: string
      status: string
      limit: number
Usage:
flow:
  - member: search-users
    input:
      emailPattern: "%@example.com"
      status: "active"
      limit: 10
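
The examples suggest that input values are bound to the `?` placeholders positionally. The sketch below spells that out against the D1 prepared-statement API (`prepare`, `bind`, `all`). It is an assumption that Conductor binds in the order the input properties are declared; `runQuery` and `paramOrder` are hypothetical names used to make the ordering explicit.

```typescript
// Slice of the D1 prepared-statement API: prepare(sql).bind(...values).all().
interface D1StatementLike {
  bind(...values: unknown[]): D1StatementLike;
  all(): Promise<{ results: Record<string, unknown>[] }>;
}

interface D1Like {
  prepare(sql: string): D1StatementLike;
}

// Hypothetical helper: binds input values to `?` placeholders in the order given by paramOrder.
async function runQuery(
  db: D1Like,
  sql: string,
  input: Record<string, unknown>,
  paramOrder: string[],
): Promise<Record<string, unknown>[]> {
  // Naive count: a `?` inside a SQL string literal would also be counted.
  const placeholders = (sql.match(/\?/g) ?? []).length;
  if (placeholders !== paramOrder.length) {
    throw new Error(
      `Query has ${placeholders} placeholders but ${paramOrder.length} parameters`,
    );
  }
  const values = paramOrder.map((name) => input[name]);
  const { results } = await db.prepare(sql).bind(...values).all();
  return results;
}
```

Because values travel through `bind()` rather than string concatenation, a pattern such as `%@example.com` is treated as data and never as SQL.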

INSERT/UPDATE/DELETE

name: create-user
type: Data

config:
  storage: d1
  operation: query
  binding: DB
  query: |
    INSERT INTO users (name, email, status)
    VALUES (?, ?, ?)
    RETURNING id

schema:
  input:
    properties:
      name: string
      email: string
      status: string

R2 Object Storage

GET Object

name: get-file
type: Data

config:
  storage: r2
  operation: get
  binding: STORAGE

schema:
  input:
    properties:
      key: string
  output:
    properties:
      content: string
      contentType: string
      size: number
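
As a rough sketch of how the `content`, `contentType`, and `size` outputs could be filled in, the helper below reads an object through the R2 binding's `get()` method, which resolves to `null` when the key does not exist. `R2BucketLike`, `R2ObjectLike`, and `getFile` are hypothetical names, and the `application/octet-stream` fallback is a choice made for this example.

```typescript
// Slice of the R2 API: get() resolves to null when the object does not exist.
interface R2ObjectLike {
  size: number;
  httpMetadata?: { contentType?: string };
  text(): Promise<string>;
}

interface R2BucketLike {
  get(key: string): Promise<R2ObjectLike | null>;
}

interface FileResult {
  content: string;
  contentType: string;
  size: number;
}

// Hypothetical helper: reads an object as text and maps it onto the output fields above.
async function getFile(bucket: R2BucketLike, key: string): Promise<FileResult | null> {
  const object = await bucket.get(key);
  if (object === null) return null;
  return {
    content: await object.text(),
    // Fall back to a generic type when no content type was stored with the object.
    contentType: object.httpMetadata?.contentType ?? "application/octet-stream",
    size: object.size,
  };
}
```

Reading with `text()` suits text files; binary objects would need a different representation, such as base64, before being placed in a string field.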

PUT Object

name: upload-file
type: Data

config:
  storage: r2
  operation: put
  binding: STORAGE

schema:
  input:
    properties:
      key: string
      content: string
      contentType: string

DELETE Object

name: delete-file
type: Data

config:
  storage: r2
  operation: delete
  binding: STORAGE

LIST Objects

name: list-files
type: Data

config:
  storage: r2
  operation: list
  binding: STORAGE

schema:
  input:
    properties:
      prefix: string
      limit: number

Vectorize

Insert Vectors

name: index-documents
type: Data

config:
  storage: vectorize
  operation: put
  binding: VECTORIZE

schema:
  input:
    properties:
      vectors:
        type: array
        items:
          type: object
          properties:
            id: string
            values: array
            metadata: object

Search Vectors

name: search-documents
type: Data

config:
  storage: vectorize
  operation: query
  binding: VECTORIZE

schema:
  input:
    properties:
      query: array        # Query vector
      topK: number        # Number of results
      filter: object      # Optional metadata filter
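
To illustrate what `query` and `topK` mean, the sketch below ranks stored vectors by cosine similarity in memory and keeps the best `topK`. This is a brute-force illustration only: Vectorize uses its own index and whichever distance metric the index was created with. `StoredVector`, `cosineSimilarity`, and `searchTopK` are hypothetical names.

```typescript
interface StoredVector {
  id: string;
  values: number[];
  metadata?: Record<string, unknown>;
}

// Cosine similarity: dot product divided by the product of the vector lengths.
function cosineSimilarity(a: number[], b: number[]): number {
  if (a.length !== b.length) throw new Error("Vectors must have the same dimension");
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  // A zero vector has no direction, so treat its similarity as 0.
  if (normA === 0 || normB === 0) return 0;
  return dot / Math.sqrt(normA * normB);
}

// Scores every stored vector against the query and returns the topK best matches.
function searchTopK(vectors: StoredVector[], query: number[], topK: number) {
  return vectors
    .map((v) => ({ id: v.id, score: cosineSimilarity(v.values, query), metadata: v.metadata }))
    .sort((x, y) => y.score - x.score)
    .slice(0, topK);
}
```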

Custom Data Members

For complex operations, implement custom logic:
// members/advanced-query/index.ts
import { createDataMember } from '@ensemble-edge/conductor/sdk';

export default createDataMember({
  async handler({ input, env }) {
    // Complex multi-table query
    const results = await env.DB.prepare(`
      SELECT
        u.*,
        COUNT(o.id) as order_count,
        SUM(o.total) as total_spent
      FROM users u
      LEFT JOIN orders o ON u.id = o.user_id
      WHERE u.status = ?
      GROUP BY u.id
      HAVING order_count > ?
      ORDER BY total_spent DESC
      LIMIT ?
    `)
      .bind(input.status, input.minOrders, input.limit)
      .all();

    return {
      users: results.results,
      total: results.results.length
    };
  }
});

Bindings Setup

Configure bindings in wrangler.toml:
# KV binding
[[kv_namespaces]]
binding = "CACHE"
id = "your-kv-namespace-id"

# D1 binding
[[d1_databases]]
binding = "DB"
database_name = "your-database"
database_id = "your-database-id"

# R2 binding
[[r2_buckets]]
binding = "STORAGE"
bucket_name = "your-bucket"

# Vectorize binding
[[vectorize]]
binding = "VECTORIZE"
index_name = "your-index"

Common Patterns

Cache-Aside Pattern

flow:
  # Try cache first
  - member: get-from-cache
    type: Data
    config:
      storage: kv
      operation: get
      binding: CACHE
    input:
      key: "user:${input.userId}"

  # Fetch from DB if not cached
  - member: get-from-db
    condition: ${!get-from-cache.output.found}
    type: Data
    config:
      storage: d1
      operation: query
      binding: DB
      query: "SELECT * FROM users WHERE id = ?"
    input:
      userId: ${input.userId}

  # Store in cache for next time
  - member: store-in-cache
    condition: ${get-from-db.success}
    type: Data
    config:
      storage: kv
      operation: put
      binding: CACHE
    input:
      key: "user:${input.userId}"
      value: ${get-from-db.output}
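
The three steps above follow the standard cache-aside pattern. Expressed as a plain function, the control flow might look like the sketch below; `CacheStore` and `cacheAside` are hypothetical names, and the loader stands in for the D1 query.

```typescript
// Minimal cache contract: get() resolves to null on a miss.
interface CacheStore<T> {
  get(key: string): Promise<T | null>;
  put(key: string, value: T): Promise<void>;
}

// Cache-aside: read the cache, fall back to the loader on a miss, then populate the cache.
async function cacheAside<T>(
  cache: CacheStore<T>,
  key: string,
  load: () => Promise<T | null>,
): Promise<T | null> {
  const cached = await cache.get(key);
  if (cached !== null) return cached;

  const fresh = await load();
  // Only cache real results so a missing row is looked up again next time.
  if (fresh !== null) await cache.put(key, fresh);
  return fresh;
}
```

Skipping the cache write for missing rows keeps the cache from pinning a "not found" answer, at the cost of repeated database lookups for keys that never exist.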

Write-Through Cache

flow:
  # Write to database
  - member: save-to-db
    type: Data
    config:
      storage: d1
      operation: query
      binding: DB
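      query: "UPDATE users SET name = ? WHERE id = ?"
    input:
      # Values are bound to the ? placeholders in order
      name: ${input.userData.name}
      userId: ${input.userId}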

  # Update cache immediately
  - member: update-cache
    type: Data
    config:
      storage: kv
      operation: put
      binding: CACHE
    input:
      key: "user:${input.userId}"
      value: ${input.userData}

Pagination

flow:
  - member: list-users
    type: Data
    config:
      storage: d1
      operation: query
      binding: DB
      query: |
        SELECT * FROM users
        ORDER BY created_at DESC
        LIMIT ? OFFSET ?
    input:
      limit: ${input.limit || 10}
      offset: ${(input.page || 0) * (input.limit || 10)}  # page is zero-based
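
The offset arithmetic is easy to get wrong at the edges, so here is a small sketch of the same calculation with validation. It assumes zero-based page numbers, matching the `page * limit` expression above; `pageToLimitOffset` is a hypothetical helper name.

```typescript
// Converts a zero-based page number into LIMIT/OFFSET values.
function pageToLimitOffset(page: number, limit = 10): { limit: number; offset: number } {
  if (!Number.isInteger(page) || page < 0) {
    throw new RangeError("page must be a non-negative integer");
  }
  if (!Number.isInteger(limit) || limit < 1) {
    throw new RangeError("limit must be a positive integer");
  }
  return { limit, offset: page * limit };
}
```

For example, `pageToLimitOffset(2, 25)` yields `{ limit: 25, offset: 50 }`, which skips the first two pages of 25 rows.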

Error Handling

flow:
  - member: get-user
    type: Data
    retry:
      maxAttempts: 3
      backoff: exponential
    onError:
      continue: true  # Don't fail entire ensemble

  # Fallback if database fails
  - member: use-default
    condition: ${!get-user.success}
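
As an illustration of what `maxAttempts: 3` with `backoff: exponential` typically means, the sketch below retries an operation and doubles the wait between attempts. The 100 ms base delay and the `withRetry` helper are assumptions for this example; Conductor's actual delays and retry conditions are not documented on this page.

```typescript
// Retries an async operation, doubling the delay after each failed attempt.
async function withRetry<T>(
  operation: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 100,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      // Wait baseDelayMs, then 2x, then 4x, and so on; no wait after the final attempt.
      if (attempt < maxAttempts) await sleep(baseDelayMs * 2 ** (attempt - 1));
    }
  }
  throw lastError;
}
```

The `sleep` parameter is injectable so the delays can be observed in tests without actually waiting.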

Performance Tips

Use Batch Operations

// Batch multiple D1 queries into a single round trip
import { createDataMember } from '@ensemble-edge/conductor/sdk';

export default createDataMember({
  async handler({ input, env }) {
    const batch = [
      env.DB.prepare('UPDATE users SET active = ? WHERE id = ?').bind(true, 1),
      env.DB.prepare('UPDATE users SET active = ? WHERE id = ?').bind(true, 2),
      env.DB.prepare('UPDATE users SET active = ? WHERE id = ?').bind(true, 3)
    ];

    const results = await env.DB.batch(batch);
    return { updated: results.length };
  }
});

Index Frequently Queried Fields

-- Create index for faster queries
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_orders_user_id ON orders(user_id);

Use KV for Hot Data

# Cache frequently accessed data in KV
- member: cache-hot-data
  type: Data
  config:
    storage: kv
    operation: put
    binding: CACHE
    ttl: 3600  # 1 hour

Testing Data Members

import { describe, it, expect } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';

describe('get-user', () => {
  it('should retrieve user from D1', async () => {
    const conductor = await TestConductor.create({
      mocks: {
        database: {
          responses: {
            'get-user': {
              id: 1,
              name: 'Alice',
              email: 'alice@example.com'
            }
          }
        }
      }
    });

    const result = await conductor.executeMember('get-user', {
      userId: 1
    });

    expect(result).toBeSuccessful();
    expect(result.output.name).toBe('Alice');
  });
});

Best Practices

  1. Use appropriate storage - KV for cache, D1 for relational, R2 for files, Vectorize for vectors
  2. Set TTL on cached data - Prevent stale data with appropriate expiration
  3. Parameterize queries - Use ? placeholders to prevent SQL injection
  4. Index database fields - Speed up queries with proper indexes
  5. Batch operations - Combine multiple operations when possible
  6. Handle not found - Check the found boolean in the output before using value
  7. Use transactions - Group related D1 updates in a single batch() call, which D1 runs as one transaction
  8. Monitor storage limits - KV: 25 MiB per value; D1: 500 MB per database on the Free plan (higher on paid plans; check Cloudflare's current limits)