
storage Operation

Access Cloudflare storage: KV (key-value), D1 (SQL database), R2 (object storage), and Vectorize (vector database). The storage operation provides a unified interface to all Cloudflare data primitives with automatic binding resolution.

Configuration

config:
  type: string       # kv, d1, r2, vectorize
  action: string     # get, put, delete, list, query
  [type-specific options]

KV (Key-Value Store)

Globally replicated key-value store with eventual consistency: writes can take up to 60 seconds to propagate to all edge locations. Well suited for configuration, sessions, and caching.

GET Operation

operations:
  - name: get-user
    operation: storage
    config:
      type: kv
      action: get
      key: user-${input.id}
Output:
{
  value: any | null     // Parsed JSON value
  found: boolean        // true if key exists
  metadata: object      // Optional metadata
}

PUT Operation

operations:
  - name: cache-user
    operation: storage
    config:
      type: kv
      action: put
      key: user-${input.id}
      value: ${input.data}
      expirationTtl: 3600  # Expires in 1 hour
Options:
  • expirationTtl (number) - Seconds until expiration
  • expiration (number) - Unix timestamp for expiration
  • metadata (object) - Custom metadata (max 1KB)
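Under the hood, these options presumably map onto the Workers KV binding (`put(key, value, { expirationTtl, metadata })`). The TTL semantics can be sketched with an in-memory stand-in; `MockKV` below is illustrative only, not the real binding:

```typescript
// Minimal in-memory sketch of KV put/get with expirationTtl semantics.
// Illustrative only -- the real operation delegates to a Workers KV binding.
type Entry = { value: string; expiresAt?: number; metadata?: object };

class MockKV {
  private store = new Map<string, Entry>();

  put(key: string, value: string, opts?: { expirationTtl?: number; metadata?: object }) {
    const expiresAt = opts?.expirationTtl ? Date.now() + opts.expirationTtl * 1000 : undefined;
    this.store.set(key, { value, expiresAt, metadata: opts?.metadata });
  }

  // Mirrors the documented GET output shape: { value, found, metadata }
  get(key: string): { value: unknown; found: boolean; metadata?: object } {
    const entry = this.store.get(key);
    if (!entry || (entry.expiresAt !== undefined && Date.now() >= entry.expiresAt)) {
      return { value: null, found: false };
    }
    return { value: JSON.parse(entry.value), found: true, metadata: entry.metadata };
  }
}

const kv = new MockKV();
kv.put("user-42", JSON.stringify({ name: "Alice" }), { expirationTtl: 3600 });
```

An expired or absent key comes back as `{ value: null, found: false }`, which is why the cache-aside pattern later in this page branches on `found` rather than on the value itself.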

DELETE Operation

operations:
  - name: invalidate-cache
    operation: storage
    config:
      type: kv
      action: delete
      key: user-${input.id}

LIST Operation

operations:
  - name: list-users
    operation: storage
    config:
      type: kv
      action: list
      prefix: user-
      limit: 100
Options:
  • prefix (string) - Filter keys by prefix
  • limit (number) - Max results (default: 1000, max: 1000)
  • cursor (string) - Pagination cursor
Output:
{
  keys: Array<{
    name: string
    expiration?: number
    metadata?: object
  }>
  list_complete: boolean
  cursor?: string
}
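When `list_complete` is false, pass `cursor` back into the next list call. The drain loop can be sketched as follows; `listPage` is a hypothetical stand-in for the storage list operation, paging over a fixed key set:

```typescript
// Drain all keys across pages using the cursor. `listPage` stands in for
// the KV list operation; here it pages over an in-memory key set.
type ListResult = { keys: { name: string }[]; list_complete: boolean; cursor?: string };

const allKeys = Array.from({ length: 25 }, (_, i) => ({ name: `user-${i}` }));

function listPage(limit: number, cursor?: string): ListResult {
  const start = cursor ? Number(cursor) : 0;
  const keys = allKeys.slice(start, start + limit);
  const done = start + limit >= allKeys.length;
  return { keys, list_complete: done, cursor: done ? undefined : String(start + limit) };
}

function listAll(limit: number): string[] {
  const names: string[] = [];
  let cursor: string | undefined;
  do {
    const page = listPage(limit, cursor);
    names.push(...page.keys.map(k => k.name));
    cursor = page.list_complete ? undefined : page.cursor;
  } while (cursor !== undefined);
  return names;
}
```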

D1 (SQL Database)

Serverless SQLite database for relational data with full SQL support.

Query Operation

operations:
  - name: get-user
    operation: storage
    config:
      type: d1
      query: SELECT * FROM users WHERE id = ?
      params: [${input.user_id}]
Output:
{
  results: any[]        // Query results
  success: boolean
  meta: {
    duration: number    // Query duration (ms)
    rows_read: number
    rows_written: number
  }
}

Parameterized Queries

Always use parameterized queries to prevent SQL injection:
operations:
  - name: search-users
    operation: storage
    config:
      type: d1
      query: |
        SELECT * FROM users
        WHERE email LIKE ? AND status = ?
        ORDER BY created_at DESC
        LIMIT ?
      params:
        - "%@example.com"
        - active
        - 10
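The difference between the two styles is easy to demonstrate in plain strings: interpolation lets the input rewrite the query, while a bound parameter is always treated as a single literal value by the driver:

```typescript
// Why parameterization matters: a bound parameter is one opaque value,
// while string interpolation lets input rewrite the query itself.
const malicious = "1 OR 1=1";

// Bad: interpolation -- the WHERE clause now matches every row.
const interpolated = `SELECT * FROM users WHERE id = ${malicious}`;

// Good: the placeholder stays in the SQL; the whole string is bound as one value.
const parameterized = { query: "SELECT * FROM users WHERE id = ?", params: [malicious] };
```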

INSERT with RETURNING

operations:
  - name: create-user
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO users (name, email, status)
        VALUES (?, ?, ?)
        RETURNING id
      params:
        - ${input.name}
        - ${input.email}
        - active

UPDATE Operation

operations:
  - name: update-user
    operation: storage
    config:
      type: d1
      query: |
        UPDATE users
        SET name = ?, email = ?, updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
      params:
        - ${input.name}
        - ${input.email}
        - ${input.id}

DELETE Operation

operations:
  - name: delete-user
    operation: storage
    config:
      type: d1
      query: DELETE FROM users WHERE id = ?
      params: [${input.id}]

Complex Queries

operations:
  - name: user-analytics
    operation: storage
    config:
      type: d1
      query: |
        SELECT
          u.id,
          u.name,
          u.email,
          COUNT(o.id) as order_count,
          SUM(o.total) as total_spent,
          AVG(o.total) as avg_order_value
        FROM users u
        LEFT JOIN orders o ON u.id = o.user_id
        WHERE u.status = ?
        GROUP BY u.id
        HAVING order_count > ?
        ORDER BY total_spent DESC
        LIMIT ?
      params:
        - active
        - 5
        - 100

Transactions

For atomic operations across multiple queries:
operations:
  - name: transfer-funds
    operation: storage
    config:
      type: d1
      batch:
        - query: UPDATE accounts SET balance = balance - ? WHERE id = ?
          params: [${input.amount}, ${input.from_account}]
        - query: UPDATE accounts SET balance = balance + ? WHERE id = ?
          params: [${input.amount}, ${input.to_account}]
        - query: INSERT INTO transactions (from_account, to_account, amount) VALUES (?, ?, ?)
          params: [${input.from_account}, ${input.to_account}, ${input.amount}]
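The guarantee a batch provides is all-or-nothing: either every statement applies or none does. The rollback behavior can be sketched with an in-memory model (D1 runs the real batch inside a single transaction; the snapshot/restore here is only an illustration):

```typescript
// Sketch of batch atomicity: apply all statements or none. In-memory
// model only -- D1 executes the real batch in one transaction.
function transfer(balances: Map<string, number>, from: string, to: string, amount: number): boolean {
  const snapshot = new Map(balances);                      // "begin transaction"
  balances.set(from, (balances.get(from) ?? 0) - amount);
  balances.set(to, (balances.get(to) ?? 0) + amount);
  if ((balances.get(from) ?? 0) < 0) {
    for (const [k, v] of snapshot) balances.set(k, v);     // "rollback": nothing applied
    return false;
  }
  return true;                                             // "commit"
}

const accounts = new Map([["a", 100], ["b", 50]]);
```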

R2 (Object Storage)

S3-compatible object storage for files, images, backups, and large data.

GET Object

operations:
  - name: get-file
    operation: storage
    config:
      type: r2
      action: get
      key: documents/${input.filename}
Output:
{
  body: ReadableStream | string | ArrayBuffer
  httpMetadata: {
    contentType?: string
    contentLanguage?: string
    contentDisposition?: string
    contentEncoding?: string
    cacheControl?: string
    cacheExpiry?: Date
  }
  customMetadata: Record<string, string>
  size: number
  etag: string
  uploaded: Date
}

PUT Object

operations:
  - name: upload-file
    operation: storage
    config:
      type: r2
      action: put
      key: documents/${input.filename}
      value: ${input.content}
      httpMetadata:
        contentType: ${input.contentType}
      customMetadata:
        uploadedBy: ${input.userId}
        category: ${input.category}
Options:
  • httpMetadata (object) - Standard HTTP metadata
  • customMetadata (object) - Custom key-value metadata (max 2KB)

DELETE Object

operations:
  - name: delete-file
    operation: storage
    config:
      type: r2
      action: delete
      key: documents/${input.filename}

LIST Objects

operations:
  - name: list-files
    operation: storage
    config:
      type: r2
      action: list
      prefix: documents/
      limit: 1000
Options:
  • prefix (string) - Filter by prefix
  • limit (number) - Max results (default: 1000)
  • delimiter (string) - Directory delimiter
  • cursor (string) - Pagination cursor
  • include (string[]) - Include metadata: ['httpMetadata', 'customMetadata']
Output:
{
  objects: Array<{
    key: string
    size: number
    etag: string
    uploaded: Date
    httpMetadata?: object
    customMetadata?: object
  }>
  truncated: boolean
  cursor?: string
  delimitedPrefixes: string[]
}
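The `delimiter` option gives listing a directory-like feel: keys containing the delimiter beyond the prefix collapse into `delimitedPrefixes` instead of appearing as objects. A sketch of that grouping logic (illustrative, not the R2 implementation):

```typescript
// Sketch of R2 delimiter semantics: keys under a "subdirectory" collapse
// into delimitedPrefixes instead of being returned as objects.
function listWithDelimiter(allKeys: string[], prefix: string, delimiter: string) {
  const objects: string[] = [];
  const prefixes = new Set<string>();
  for (const key of allKeys) {
    if (!key.startsWith(prefix)) continue;
    const rest = key.slice(prefix.length);
    const idx = rest.indexOf(delimiter);
    if (idx === -1) objects.push(key);
    else prefixes.add(prefix + rest.slice(0, idx + 1));
  }
  return { objects, delimitedPrefixes: [...prefixes] };
}

const keys = ["documents/a.txt", "documents/b.txt", "documents/2024/c.txt", "images/d.png"];
const out = listWithDelimiter(keys, "documents/", "/");
```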

HEAD Object (Metadata Only)

operations:
  - name: check-file
    operation: storage
    config:
      type: r2
      action: head
      key: documents/${input.filename}
Returns object metadata without downloading the body.

Vectorize (Vector Database)

Vector database for semantic search, RAG, and similarity matching.

Insert Vectors

operations:
  - name: index-document
    operation: storage
    config:
      type: vectorize
      action: insert
      id: doc-${input.id}
      vector: ${embed.output}
      metadata:
        text: ${input.text}
        category: ${input.category}
        timestamp: ${Date.now()}
Batch Insert:
operations:
  - name: index-batch
    operation: storage
    config:
      type: vectorize
      action: insert
      vectors:
        - id: doc-1
          values: ${embed1.output}
          metadata: { text: "..." }
        - id: doc-2
          values: ${embed2.output}
          metadata: { text: "..." }

Query Vectors

operations:
  - name: search-similar
    operation: storage
    config:
      type: vectorize
      action: query
      vector: ${query-embed.output}
      topK: 10
      returnMetadata: true
      returnValues: false
Options:
  • topK (number) - Number of results (max: 100)
  • returnMetadata (boolean) - Include metadata (default: true)
  • returnValues (boolean) - Include vectors (default: false)
  • filter (object) - Metadata filter
Output:
{
  matches: Array<{
    id: string
    score: number       // Similarity score
    metadata?: object
    values?: number[]
  }>
}
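Conceptually, a query scores the input vector against each stored vector (for a cosine-metric index), sorts descending, and keeps the top `topK`. A self-contained sketch of that computation, assuming cosine similarity (the actual metric depends on how the index was created):

```typescript
// Sketch of what a vector query computes under a cosine metric:
// score every stored vector, sort descending, truncate to topK.
function cosine(a: number[], b: number[]): number {
  let dot = 0, na = 0, nb = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    na += a[i] * a[i];
    nb += b[i] * b[i];
  }
  return dot / (Math.sqrt(na) * Math.sqrt(nb));
}

function queryIndex(index: { id: string; values: number[] }[], vector: number[], topK: number) {
  return index
    .map(v => ({ id: v.id, score: cosine(v.values, vector) }))
    .sort((x, y) => y.score - x.score)
    .slice(0, topK);
}

const index = [
  { id: "doc-1", values: [1, 0] },
  { id: "doc-2", values: [0, 1] },
  { id: "doc-3", values: [1, 1] },
];
const matches = queryIndex(index, [1, 0.1], 2);
```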

Query with Metadata Filter

operations:
  - name: search-filtered
    operation: storage
    config:
      type: vectorize
      action: query
      vector: ${query-embed.output}
      topK: 5
      filter:
        category: "documentation"
        timestamp: { $gte: ${Date.now() - 86400000} }

Get Vectors by ID

operations:
  - name: get-vector
    operation: storage
    config:
      type: vectorize
      action: getByIds
      ids: [doc-1, doc-2, doc-3]

Delete Vectors

operations:
  - name: delete-vector
    operation: storage
    config:
      type: vectorize
      action: deleteByIds
      ids: [doc-${input.id}]

Common Patterns

Cache-Aside Pattern

Check cache first, then database:
operations:
  # Step 1: Try cache
  - name: get-from-cache
    operation: storage
    config:
      type: kv
      action: get
      key: user-${input.id}

  # Step 2: Fetch from DB if not cached
  - name: get-from-db
    condition: ${!get-from-cache.output.found}
    operation: storage
    config:
      type: d1
      query: SELECT * FROM users WHERE id = ?
      params: [${input.id}]

  # Step 3: Store in cache for next time
  - name: cache-result
    condition: ${get-from-db.output.results.length > 0}
    operation: storage
    config:
      type: kv
      action: put
      key: user-${input.id}
      value: ${get-from-db.output.results[0]}
      expirationTtl: 3600

outputs:
  user: ${get-from-cache.output.found ? get-from-cache.output.value : get-from-db.output.results[0]}

Write-Through Cache

Update database and cache simultaneously:
operations:
  # Step 1: Write to database
  - name: update-db
    operation: storage
    config:
      type: d1
      query: UPDATE users SET name = ?, email = ? WHERE id = ?
      params:
        - ${input.name}
        - ${input.email}
        - ${input.id}

  # Step 2: Update cache immediately
  - name: update-cache
    operation: storage
    config:
      type: kv
      action: put
      key: user-${input.id}
      value:
        id: ${input.id}
        name: ${input.name}
        email: ${input.email}
      expirationTtl: 3600

Pagination

operations:
  - name: list-users
    operation: storage
    config:
      type: d1
      query: |
        SELECT * FROM users
        ORDER BY created_at DESC
        LIMIT ? OFFSET ?
      params:
        - ${input.limit || 20}
        - ${(input.page - 1) * (input.limit || 20)}
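The page-to-range arithmetic is worth spelling out, since the same page-size default must be applied to both the LIMIT and the OFFSET. A small helper sketching it, using the default page size of 20:

```typescript
// Page -> LIMIT/OFFSET arithmetic; the same default size must feed both values.
function pageToRange(page: number, limit?: number): { limit: number; offset: number } {
  const size = limit ?? 20;
  return { limit: size, offset: (page - 1) * size };
}
```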

Semantic Search (RAG)

ensemble: semantic-search

inputs:
  query: string

operations:
  # Step 1: Embed query
  - name: embed-query
    operation: ml
    config:
      model: '@cf/baai/bge-base-en-v1.5'
      input: ${input.query}

  # Step 2: Search vectors
  - name: search-vectors
    operation: storage
    config:
      type: vectorize
      action: query
      vector: ${embed-query.output}
      topK: 10
      returnMetadata: true

  # Step 3: Rerank with AI
  - name: rerank
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      temperature: 0.2
      prompt: |
        Query: ${input.query}

        Search results:
        ${search-vectors.output.matches.map(m => m.metadata.text).join('\n\n')}

        Rerank these results by relevance to the query.
        Return top 5 as JSON array with original indices.

outputs:
  results: ${rerank.output}

File Upload Pipeline

operations:
  # Step 1: Upload to R2
  - name: upload-file
    operation: storage
    config:
      type: r2
      action: put
      key: uploads/${input.filename}
      value: ${input.file}
      httpMetadata:
        contentType: ${input.contentType}
      customMetadata:
        uploadedBy: ${input.userId}
        uploadedAt: ${Date.now()}

  # Step 2: Save metadata to D1
  - name: save-metadata
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO files (filename, user_id, size, content_type, uploaded_at)
        VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
        RETURNING id
      params:
        - ${input.filename}
        - ${input.userId}
        - ${input.file.size}
        - ${input.contentType}

  # Step 3: Cache file info
  - name: cache-file-info
    operation: storage
    config:
      type: kv
      action: put
      key: file-${save-metadata.output.results[0].id}
      value:
        id: ${save-metadata.output.results[0].id}
        filename: ${input.filename}
        url: https://r2.example.com/${input.filename}
      expirationTtl: 86400

outputs:
  fileId: ${save-metadata.output.results[0].id}
  url: https://r2.example.com/${input.filename}

Bindings Setup

Configure storage bindings in wrangler.toml:
# KV Namespace
[[kv_namespaces]]
binding = "CACHE"
id = "your-kv-namespace-id"
preview_id = "your-preview-namespace-id"

# D1 Database
[[d1_databases]]
binding = "DB"
database_name = "production-db"
database_id = "your-database-id"

# R2 Bucket
[[r2_buckets]]
binding = "STORAGE"
bucket_name = "production-files"
preview_bucket_name = "preview-files"

# Vectorize Index
[[vectorize]]
binding = "VECTORIZE"
index_name = "documents-index"

Performance Tips

1. Use Batch Operations

Combine multiple D1 queries:
operations:
  - name: batch-updates
    operation: storage
    config:
      type: d1
      batch:
        - query: UPDATE users SET active = ? WHERE id = ?
          params: [true, 1]
        - query: UPDATE users SET active = ? WHERE id = ?
          params: [true, 2]
        - query: UPDATE users SET active = ? WHERE id = ?
          params: [true, 3]

2. Index Database Fields

Create indexes for frequently queried columns:
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_users_status_created ON users(status, created_at);
CREATE INDEX idx_orders_user_id ON orders(user_id);

3. Use KV for Hot Data

Cache frequently accessed data:
operations:
  - name: cache-hot-data
    operation: storage
    config:
      type: kv
      action: put
      key: hot-${input.key}
      value: ${input.data}
      expirationTtl: 3600  # 1 hour

4. Paginate Large Results

Never fetch all records at once:
# Good: Paginated
operations:
  - name: list-paginated
    operation: storage
    config:
      type: d1
      query: SELECT * FROM users LIMIT ? OFFSET ?
      params: [100, ${input.offset}]

# Bad: Fetch all
operations:
  - name: list-all
    operation: storage
    config:
      type: d1
      query: SELECT * FROM users

5. Use Prepared Statements

D1 automatically prepares parameterized queries for better performance.

Error Handling

operations:
  - name: get-user
    operation: storage
    config:
      type: d1
      query: SELECT * FROM users WHERE id = ?
      params: [${input.id}]
    retry:
      maxAttempts: 3
      backoff: exponential

  # Fallback if database fails
  - name: use-default
    condition: ${!get-user.output.success}
    operation: code
    config:
      code: |
        return { user: { id: ${input.id}, name: 'Guest' } };
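The retry policy above can be sketched as a generic wrapper: up to `maxAttempts` tries with an exponentially growing delay between them. The `base * 2^(attempt-1)` schedule here is an assumption for illustration; the operation's actual backoff timing may differ:

```typescript
// Sketch of maxAttempts + exponential backoff. The delay schedule
// (base * 2^(attempt-1)) is illustrative, not the framework's exact timing.
async function withRetry<T>(fn: () => Promise<T>, maxAttempts: number, baseMs = 100): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      if (attempt < maxAttempts) {
        // Wait 1x, 2x, 4x, ... the base delay before the next attempt
        await new Promise(resolve => setTimeout(resolve, baseMs * 2 ** (attempt - 1)));
      }
    }
  }
  throw lastError;
}
```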

Testing

import { TestConductor } from '@ensemble/conductor/testing';

describe('storage operations', () => {
  it('should get user from D1', async () => {
    const conductor = await TestConductor.create({
      projectPath: './conductor',
      mocks: {
        database: {
          'get-user': {
            results: [{
              id: 1,
              name: 'Alice',
              email: 'alice@example.com'
            }],
            success: true
          }
        }
      }
    });

    const result = await conductor.executeAgent('get-user-agent', {
      userId: 1
    });

    expect(result.output.user.name).toBe('Alice');
  });
});

Best Practices

1. Choose the Right Storage
# KV: Cache, config, sessions
# D1: Relational data, transactions
# R2: Files, images, backups
# Vectorize: Embeddings, semantic search
2. Set Appropriate TTLs
# Good: Cache with TTL
operations:
  - name: cache
    operation: storage
    config:
      type: kv
      action: put
      expirationTtl: 3600
3. Always Parameterize Queries
# Good: Parameterized
query: SELECT * FROM users WHERE id = ?
params: [${input.id}]

# Bad: String interpolation (SQL injection risk)
query: SELECT * FROM users WHERE id = ${input.id}
4. Handle Not Found Cases
operations:
  - name: get
    operation: storage
    config:
      type: kv
      action: get
      key: ${input.key}

  - name: handle-not-found
    condition: ${!get.output.found}
    operation: code
    config:
      code: return { error: 'Not found' };
5. Monitor Storage Limits
  • KV: 25MB per value
  • D1: 500 MB per database (free), 10 GB (paid)
  • R2: Unlimited with paid plan
  • Vectorize: 200,000 vectors (free), unlimited (paid)
6. Use Transactions for Atomic Operations
# Atomic: Both succeed or both fail
operations:
  - name: transfer
    operation: storage
    config:
      type: d1
      batch:
        - query: UPDATE accounts SET balance = balance - ? WHERE id = ?
        - query: UPDATE accounts SET balance = balance + ? WHERE id = ?
7. Cache Embeddings
# Embeddings are expensive - cache them
operations:
  - name: embed
    operation: ml
    config:
      model: '@cf/baai/bge-base-en-v1.5'
      input: ${input.text}
    cache:
      ttl: 86400
      key: embed-${input.text}
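One caveat with `embed-${input.text}`: raw text in the cache key can exceed KV's 512-byte key limit. Hashing the text into a short, stable key avoids this; FNV-1a below is used purely as a simple illustrative hash, not something the framework prescribes:

```typescript
// Hash long text into a short, stable cache key (KV keys are capped at
// 512 bytes). FNV-1a (32-bit) is an arbitrary, illustrative choice.
function fnv1a(text: string): string {
  let hash = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash.toString(16);
}

const cacheKey = `embed-${fnv1a("some long document text ...")}`;
```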

Storage Limits

KV

  • Max value size: 25 MB
  • Max metadata size: 1024 bytes
  • Max keys per account: 1 billion
  • Writes to a single key: 1 per second (reads are effectively unlimited)

D1

  • Max database size: 500 MB (free), 10 GB (paid)
  • Max query time: 30 seconds
  • Max batch size: 1000 statements
  • Databases per account: 10 (free), 50,000 (paid)

R2

  • Max object size: 5 TB
  • Max multipart parts: 10,000
  • Operations per second: Unlimited
  • Storage: 10 GB/month free, then paid

Vectorize

  • Max dimensions: 1536
  • Max vectors: 200,000 (free), unlimited (paid)
  • Queries per second: Varies by plan
  • Max topK: 100
