Skip to main content

Overview

The DataMember class handles database operations using Cloudflare’s data storage services: D1 (SQL), KV (key-value), and Durable Objects (stateful objects).
import { DataMember } from '@ensemble-edge/conductor';

const data = new DataMember({
  name: 'fetch-user',
  config: {
    type: 'd1',
    operation: 'query',
    query: 'SELECT * FROM users WHERE id = ?',
    params: ['${input.userId}']
  }
});

const result = await data.execute({ userId: 'user_123' });

Constructor

new DataMember(options: DataMemberOptions)
options
DataMemberOptions
required
Data member configuration (extends MemberOptions)
options.name
string
required
Member name
options.config
DataConfig
required
Data operation configuration
options.config.type
string
required
Storage type: d1, kv, durable-object
options.config.operation
string
required
Operation: query, execute, get, put, delete, list
options.config.binding
string
Environment binding name
interface DataConfig {
  type: 'd1' | 'kv' | 'durable-object';
  operation: string;
  binding?: string;
  [key: string]: any;
}

D1 Database Operations

Query

Execute SELECT queries:
- member: get-user
  type: Data
  config:
    type: d1
    operation: query
    binding: DB
    query: SELECT * FROM users WHERE email = ?
    params:
      - ${input.email}
const member = new DataMember({
  name: 'get-user',
  config: {
    type: 'd1',
    operation: 'query',
    query: 'SELECT * FROM users WHERE email = ?',
    params: ['${input.email}']
  }
});

const result = await member.execute({ email: 'user@example.com' });
// result.results: [{ id: 1, email: '...', name: '...' }]

Execute

Run INSERT, UPDATE, DELETE:
- member: create-user
  type: Data
  config:
    type: d1
    operation: execute
    query: |
      INSERT INTO users (email, name, created_at)
      VALUES (?, ?, ?)
    params:
      - ${input.email}
      - ${input.name}
      - ${Date.now()}
const result = await member.execute({
  email: 'new@example.com',
  name: 'New User'
});
// result.success: true
// result.meta.changes: 1
// result.meta.last_row_id: 123

Batch Operations

Execute multiple statements:
- member: batch-insert
  type: Data
  config:
    type: d1
    operation: batch
    statements:
      - query: INSERT INTO users (email, name) VALUES (?, ?)
        params: [${input.email}, ${input.name}]
      - query: INSERT INTO audit_log (action, user_email) VALUES (?, ?)
        params: ['user_created', ${input.email}]

Transactions

Execute statements atomically:
- member: transfer-funds
  type: Data
  config:
    type: d1
    operation: transaction
    statements:
      - query: UPDATE accounts SET balance = balance - ? WHERE id = ?
        params: [${input.amount}, ${input.fromAccount}]
      - query: UPDATE accounts SET balance = balance + ? WHERE id = ?
        params: [${input.amount}, ${input.toAccount}]
      - query: INSERT INTO transactions (from_id, to_id, amount) VALUES (?, ?, ?)
        params: [${input.fromAccount}, ${input.toAccount}, ${input.amount}]

Prepared Statements

Reuse compiled queries:
const member = new DataMember({
  name: 'find-users',
  config: {
    type: 'd1',
    operation: 'query',
    query: 'SELECT * FROM users WHERE name LIKE ? LIMIT ?',
    prepare: true  // Compile query once
  }
});

KV Operations

Get

Read key-value pairs:
- member: get-cache
  type: Data
  config:
    type: kv
    operation: get
    binding: CACHE
    key: ${input.cacheKey}
    type: json  # text, json, arrayBuffer, stream
const result = await member.execute({ cacheKey: 'user:123' });
// result.value: { name: 'Alice', ... }
// result.metadata: { customField: 'value' }

Put

Write key-value pairs:
- member: cache-result
  type: Data
  config:
    type: kv
    operation: put
    binding: CACHE
    key: user:${input.userId}
    value: ${input.userData}
    expirationTtl: 3600  # 1 hour
    metadata:
      cached_at: ${Date.now()}

Delete

Remove keys:
- member: clear-cache
  type: Data
  config:
    type: kv
    operation: delete
    binding: CACHE
    key: ${input.cacheKey}

List

List keys with prefix:
- member: list-users
  type: Data
  config:
    type: kv
    operation: list
    binding: CACHE
    prefix: "user:"
    limit: 100
const result = await member.execute({});
// result.keys: [{ name: 'user:1', metadata: {...} }, ...]
// result.cursor: 'abc123' (for pagination)

Get with Metadata

- member: get-with-meta
  type: Data
  config:
    type: kv
    operation: getWithMetadata
    binding: CACHE
    key: ${input.key}
    type: json

Durable Objects

Get Stub

Get a Durable Object instance:
- member: get-counter
  type: Data
  config:
    type: durable-object
    operation: getStub
    binding: COUNTER
    id: ${input.counterId}
    # or name: ${input.counterName} for named instances

Fetch

Call Durable Object:
- member: increment-counter
  type: Data
  config:
    type: durable-object
    operation: fetch
    binding: COUNTER
    id: ${input.counterId}
    request:
      method: POST
      path: /increment
      body:
        amount: ${input.amount}
// Durable Object implementation
export class Counter {
  state: DurableObjectState;
  value: number = 0;

  constructor(state: DurableObjectState) {
    this.state = state;
  }

  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);

    if (url.pathname === '/increment') {
      const { amount } = await request.json();
      this.value += amount;
      await this.state.storage.put('value', this.value);
      return Response.json({ value: this.value });
    }

    return new Response('Not found', { status: 404 });
  }
}

Advanced Patterns

Connection Pooling

Reuse database connections:
const member = new DataMember({
  name: 'query',
  config: {
    type: 'd1',
    operation: 'query',
    query: 'SELECT * FROM users',
    pooling: {
      enabled: true,
      maxConnections: 10,
      idleTimeout: 30000
    }
  }
});

Query Builder

Build queries dynamically:
- member: dynamic-query
  type: Data
  config:
    type: d1
    operation: query
    queryBuilder:
      table: users
      select: ['id', 'name', 'email']
      where:
        status: ${input.status}
        tier: ${input.tier}
      orderBy: created_at DESC
      limit: ${input.limit}

Pagination

Handle large result sets:
- member: paginated-query
  type: Data
  config:
    type: d1
    operation: query
    query: |
      SELECT * FROM users
      WHERE created_at > ?
      ORDER BY created_at
      LIMIT ?
    params:
      - ${input.cursor || 0}
      - ${input.pageSize || 50}

Caching Strategies

Write-Through

flow:
  - member: write-to-db
    type: Data
    config:
      type: d1
      operation: execute
      query: UPDATE users SET name = ? WHERE id = ?
      params: [${input.name}, ${input.userId}]

  - member: update-cache
    type: Data
    config:
      type: kv
      operation: put
      key: user:${input.userId}
      value: ${input.userData}

Cache-Aside

flow:
  - member: check-cache
    type: Data
    config:
      type: kv
      operation: get
      key: user:${input.userId}

  - member: query-db
    condition: ${!check-cache.output.value}
    type: Data
    config:
      type: d1
      operation: query
      query: SELECT * FROM users WHERE id = ?
      params: [${input.userId}]

  - member: populate-cache
    condition: ${!check-cache.output.value}
    type: Data
    config:
      type: kv
      operation: put
      key: user:${input.userId}
      value: ${query-db.output.results[0]}
      expirationTtl: 3600

Error Handling

Constraint Violations

try {
  await member.execute({ email: 'duplicate@example.com' });
} catch (error) {
  if (error.code === 'SQLITE_CONSTRAINT') {
    console.error('Duplicate email');
  }
}

Transaction Rollback

- member: safe-transaction
  type: Data
  config:
    type: d1
    operation: transaction
    statements:
      - query: INSERT INTO orders (user_id, total) VALUES (?, ?)
        params: [${input.userId}, ${input.total}]
      - query: UPDATE inventory SET quantity = quantity - ? WHERE product_id = ?
        params: [${input.quantity}, ${input.productId}]
  retry:
    maxAttempts: 3
    backoff: exponential

Performance Optimization

Indexing

CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_orders_user_id ON orders(user_id);
CREATE INDEX idx_orders_created_at ON orders(created_at DESC);

Query Optimization

# Bad - N+1 queries
- member: get-orders
  type: Data
  config:
    type: d1
    operation: query
    query: SELECT * FROM orders WHERE user_id = ?

# Good - JOIN
- member: get-orders-with-items
  type: Data
  config:
    type: d1
    operation: query
    query: |
      SELECT o.*, oi.product_id, oi.quantity
      FROM orders o
      LEFT JOIN order_items oi ON o.id = oi.order_id
      WHERE o.user_id = ?

Batch Reads

- member: batch-read
  type: Data
  config:
    type: d1
    operation: query
    query: SELECT * FROM users WHERE id IN (?, ?, ?, ?)
    params: ${input.userIds}

Testing

import { DataMember } from '@ensemble-edge/conductor';
import { describe, it, expect, beforeEach } from 'vitest';

describe('DataMember', () => {
  let member: DataMember;
  let mockDB: any;

  beforeEach(() => {
    mockDB = {
      prepare: (query: string) => ({
        bind: (...params: any[]) => ({
          all: async () => ({
            results: [{ id: 1, name: 'Test' }]
          })
        })
      })
    };

    member = new DataMember({
      name: 'test-query',
      config: {
        type: 'd1',
        operation: 'query',
        query: 'SELECT * FROM users'
      }
    });
  });

  it('executes D1 query', async () => {
    const result = await member.execute({});
    expect(result.results).toHaveLength(1);
    expect(result.results[0].name).toBe('Test');
  });

  it('handles KV operations', async () => {
    const kvMember = new DataMember({
      name: 'kv-get',
      config: {
        type: 'kv',
        operation: 'get',
        key: 'test-key'
      }
    });

    const result = await kvMember.execute({});
    expect(result.value).toBeDefined();
  });
});

Best Practices

  1. Use prepared statements - Better performance
  2. Index frequently queried columns - Speed up lookups
  3. Batch operations - Reduce round trips
  4. Use transactions - Ensure data consistency
  5. Set appropriate TTLs - For KV caching
  6. Handle constraint errors - Graceful failures
  7. Optimize queries - Use EXPLAIN
  8. Paginate large results - Prevent timeouts
  9. Cache intelligently - Balance freshness vs performance
  10. Monitor query performance - Identify slow queries