Architecture

Documents → Chunk → Embed → Store (Vectorize)
                                   ↓
User Question → Embed → Search → Generate Answer

Complete Implementation

1. Document Ingestion Ensemble

```yaml
ensemble: ingest-documents

inputs:
  documents:
    type: array
    required: true
  source:
    type: string
    required: true

agents:
  # 1. Chunk documents
  - name: chunk
    operation: code
    config:
      script: scripts/chunk-documents
    input:
      documents: ${input.documents}
      source: ${input.source}
```

```typescript
// scripts/chunk-documents.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function chunkDocuments(context: AgentExecutionContext) {
  const { documents, source } = context.input
  const chunkSize = 1000
  const overlap = 200
  const chunks = []

  for (const doc of documents) {
    const text = doc.content
    let chunkIndex = 0

    // Slide a window of chunkSize characters, stepping by (chunkSize - overlap)
    // so consecutive chunks share 200 characters of context.
    for (let i = 0; i < text.length; i += chunkSize - overlap) {
      chunks.push({
        text: text.substring(i, i + chunkSize),
        metadata: {
          document_id: doc.id,
          title: doc.title,
          source: source,
          chunk_index: chunkIndex++, // index within this document, not across the batch
          created_at: new Date().toISOString()
        }
      })
    }
  }

  return { chunks }
}
```

```yaml
  # 2. Embed chunks (parallel)
  - name: embed
    operation: rag
    config:
      action: embed
      text: ${chunk.output.chunks.map(c => c.text)}
      metadata: ${chunk.output.chunks.map(c => c.metadata)}
      namespace: documents

  # 3. Store metadata in D1
  - name: store-metadata
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT INTO document_chunks (id, document_id, text, metadata, created_at)
        VALUES (?, ?, ?, ?, ?)
      params: ${chunk.output.chunks.map(c => [
        c.metadata.document_id + '-' + c.metadata.chunk_index,
        c.metadata.document_id,
        c.text,
        JSON.stringify(c.metadata),
        c.metadata.created_at
      ])}

output:
  chunks_created: ${chunk.output.chunks.length}
  embedding_status: ${embed.output}
```

2. Question Answering Ensemble

```yaml
ensemble: rag-qa

inputs:
  question:
    type: string
    required: true
  topK:
    type: number
    default: 5
  filters:
    type: object

agents:
  # 1. Search relevant chunks
  - name: search
    operation: rag
    config:
      action: search
      query: ${input.question}
      topK: ${input.topK}
      namespace: documents
      filter: ${input.filters}
      includeMetadata: true

  # 2. Generate answer
  - name: answer
    operation: think
    config:
      provider: openai
      model: gpt-4o
      temperature: 0.3
      prompt: |
        You are a helpful assistant. Answer the question using ONLY the provided context.
        If the answer is not in the context, say "I don't have enough information."

        Context:
        ${search.output.results.map((r, i) => `[${i+1}] ${r.text}`).join('\n\n')}

        Question: ${input.question}

        Provide a clear, accurate answer with citations like [1], [2] etc.

  # 3. Format response with sources
  - name: format
    operation: code
    config:
      script: scripts/format-rag-response
    input:
      answer: ${answer.output}
      search_results: ${search.output.results}
      question: ${input.question}

output:
  response: ${format.output}
```

```typescript
// scripts/format-rag-response.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function formatRagResponse(context: AgentExecutionContext) {
  const { answer, search_results, question } = context.input

  return {
    answer: answer,
    sources: search_results.map((r: any, i: number) => ({
      citation: i + 1,
      text: r.text.substring(0, 200) + '...',
      title: r.metadata.title,
      document_id: r.metadata.document_id,
      relevance: r.score
    })),
    metadata: {
      question: question,
      num_sources: search_results.length,
      timestamp: new Date().toISOString()
    }
  }
}
```
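
For reference, the response shape assembled by the script above can be written out as a TypeScript interface. This is a descriptive sketch derived from the code, not a type exported by the framework:

```typescript
// Shape of the rag-qa response built by format-rag-response.ts
interface RagResponse {
  answer: string
  sources: Array<{
    citation: number      // 1-based, matches [1], [2] citations in the answer
    text: string          // first 200 characters of the source chunk
    title: string
    document_id: string
    relevance: number     // similarity score returned by the search step
  }>
  metadata: {
    question: string
    num_sources: number
    timestamp: string     // ISO 8601
  }
}
```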

3. Hybrid Search Enhancement

```yaml
ensemble: rag-qa-hybrid

agents:
  # Vector search
  - name: vector-search
    operation: rag
    config:
      action: search
      query: ${input.question}
      topK: 10
      namespace: documents

  # Keyword search
  - name: keyword-search
    operation: data
    config:
      backend: d1
      binding: DB
      operation: query
      sql: |
        SELECT id, document_id, text, metadata
        FROM document_chunks
        WHERE text LIKE ? OR text LIKE ?
        LIMIT 10
      params:
        - '%${input.question}%'
        - '%${input.question.split(' ').join('%')}%'

  # Rerank with LLM
  - name: rerank
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Rank these document chunks by relevance to: "${input.question}"
        Return the top 5 as a JSON array of indices (0-based).

        Vector results:
        ${vector-search.output.results.map((r, i) => `${i}. ${r.text}`).join('\n')}

        Keyword results:
        ${keyword-search.output.map((r, i) => `${i+10}. ${r.text}`).join('\n')}

  # Select top results
  - name: select-top
    operation: code
    config:
      script: scripts/select-top-results
    input:
      vector_results: ${vector-search.output.results}
      keyword_results: ${keyword-search.output}
      top_indices: ${rerank.output}
```

```typescript
// scripts/select-top-results.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function selectTopResults(context: AgentExecutionContext) {
  const { vector_results, keyword_results, top_indices } = context.input

  // The rerank prompt numbers vector results 0-9 and keyword results 10-19,
  // so concatenating in this order keeps the returned indices valid.
  const allResults = [
    ...vector_results,
    ...keyword_results
  ]

  // rerank is prompted to return a JSON array of indices; parse it here
  const topIndices = JSON.parse(top_indices)
  return {
    results: topIndices.map((i: number) => allResults[i])
  }
}
```

```yaml
  # Generate answer
  - name: answer
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: |
        Answer using these sources:
        ${select-top.output.results.map(r => r.text).join('\n\n')}

        Question: ${input.question}
```

Advanced Patterns

Multi-Query RAG

```yaml
agents:
  # Generate query variations
  - name: generate-queries
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Generate 3 different search queries for: ${input.question}
        Return as JSON array of strings.

  # Search with each query (indexing assumes generate-queries returns a parsed JSON array)
  - name: search-1
    operation: rag
    config:
      action: search
      query: ${generate-queries.output[0]}
      topK: 5

  - name: search-2
    operation: rag
    config:
      action: search
      query: ${generate-queries.output[1]}
      topK: 5

  - name: search-3
    operation: rag
    config:
      action: search
      query: ${generate-queries.output[2]}
      topK: 5

  # Deduplicate and combine
  - name: combine
    operation: code
    config:
      script: scripts/combine-search-results
    input:
      results_1: ${search-1.output.results}
      results_2: ${search-2.output.results}
      results_3: ${search-3.output.results}
```

```typescript
// scripts/combine-search-results.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function combineSearchResults(context: AgentExecutionContext) {
  const { results_1, results_2, results_3 } = context.input

  const all = [
    ...results_1,
    ...results_2,
    ...results_3
  ]

  // Deduplicate by document ID + chunk index
  const unique = [...new Map(
    all.map((r: any) => [r.metadata.document_id + '-' + r.metadata.chunk_index, r])
  ).values()]

  // Sort by relevance score
  return {
    results: unique.sort((a: any, b: any) => b.score - a.score).slice(0, 5)
  }
}
```

Conversational RAG

```yaml
ensemble: rag-conversational

state:
  schema:
    history: array

agents:
  # Rewrite question with context
  - name: contextualize
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Given this conversation history:
        ${state.history.map(h => `${h.role}: ${h.content}`).join('\n')}

        Rewrite this follow-up question to be standalone:
        "${input.question}"

  # Search
  - name: search
    operation: rag
    config:
      action: search
      query: ${contextualize.output}
      topK: 5

  # Generate answer
  - name: answer
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: |
        Conversation history:
        ${state.history.map(h => `${h.role}: ${h.content}`).join('\n')}

        Context:
        ${search.output.results.map(r => r.text).join('\n\n')}

        Question: ${input.question}

  # Update history
  - name: update-history
    operation: code
    state:
      use: [history]
      set:
        history: ${[
          ...state.history,
          { role: 'user', content: input.question },
          { role: 'assistant', content: answer.output }
        ]}
    config:
      script: scripts/return-updated-status
```

```typescript
// scripts/return-updated-status.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

// The history update itself is declared in the agent's state.set block;
// this script only needs to confirm the step ran.
export default function returnUpdatedStatus(context: AgentExecutionContext) {
  return { updated: true }
}
```

Production Considerations

Error Handling

```yaml
agents:
  - name: search
    operation: rag
    retry:
      maxAttempts: 3
      backoff: exponential

  - name: fallback-answer
    condition: ${search.failed}
    operation: think
    config:
      prompt: |
        You don't have access to the knowledge base.
        Politely explain you can't answer: ${input.question}
```

Caching

```yaml
agents:
  - name: search
    operation: rag
    cache:
      ttl: 3600
      key: rag-search-${hash(input.question)}

  - name: answer
    operation: think
    cache:
      ttl: 3600
      key: rag-answer-${hash(input.question + search.output)}
```

Monitoring

```yaml
agents:
  - name: answer
    operation: think

  - name: log-interaction
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT INTO rag_interactions (
          question, answer, num_sources, avg_relevance, timestamp
        ) VALUES (?, ?, ?, ?, ?)
      params:
        - ${input.question}
        - ${answer.output}
        - ${search.output.results.length}
        - ${search.output.results.reduce((sum, r) => sum + r.score, 0) / search.output.results.length}
        - ${Date.now()}
```
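
Once interactions are being logged, the relevance averages called out under Performance Tips can be reviewed directly in D1. A sketch, assuming `timestamp` holds Unix milliseconds as in the params above (table defined under Database Schema below):

```sql
-- Questions per day and mean retrieval relevance (timestamp is Unix ms)
SELECT date(timestamp / 1000, 'unixepoch') AS day,
       COUNT(*) AS questions,
       AVG(avg_relevance) AS mean_relevance
FROM rag_interactions
GROUP BY day
ORDER BY day DESC;
```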

Database Schema

```sql
-- Document chunks metadata
CREATE TABLE document_chunks (
  id TEXT PRIMARY KEY,
  document_id TEXT NOT NULL,
  text TEXT NOT NULL,
  metadata TEXT NOT NULL,
  created_at TEXT NOT NULL
);

-- D1 is SQLite, so indexes are created separately
CREATE INDEX idx_document_id ON document_chunks (document_id);
CREATE INDEX idx_created_at ON document_chunks (created_at);

-- Full-text search
CREATE VIRTUAL TABLE document_chunks_fts USING fts5(
  text,
  content=document_chunks
);
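
-- The FTS table above uses external content, so it does not populate itself.
-- A sketch of the usual FTS5 sync triggers (assuming the default rowid mapping):
CREATE TRIGGER document_chunks_ai AFTER INSERT ON document_chunks BEGIN
  INSERT INTO document_chunks_fts (rowid, text) VALUES (new.rowid, new.text);
END;

CREATE TRIGGER document_chunks_ad AFTER DELETE ON document_chunks BEGIN
  INSERT INTO document_chunks_fts (document_chunks_fts, rowid, text)
  VALUES ('delete', old.rowid, old.text);
END;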

-- RAG interactions log
CREATE TABLE rag_interactions (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  question TEXT NOT NULL,
  answer TEXT NOT NULL,
  num_sources INTEGER NOT NULL,
  avg_relevance REAL NOT NULL,
  timestamp INTEGER NOT NULL
);

CREATE INDEX idx_timestamp ON rag_interactions (timestamp);
```

Deployment

```toml
# wrangler.toml
[ai]
binding = "AI"

[[vectorize]]
binding = "VECTORIZE"
index_name = "documents"

[[d1_databases]]
binding = "DB"
database_name = "rag-db"
database_id = "your-db-id"
```
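
The Vectorize index and D1 database referenced in wrangler.toml need to exist before the first deploy. A provisioning sketch, assuming current Wrangler syntax and a 768-dimension embedding model (adjust the dimensions to whatever model the `rag` operation embeds with):

```bash
# Create the Vectorize index; dimensions must match the embedding model
npx wrangler vectorize create documents --dimensions=768 --metric=cosine

# Create the D1 database and load the schema above
npx wrangler d1 create rag-db
npx wrangler d1 execute rag-db --file=./schema.sql
```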

Testing

```typescript
// Example test (e.g. Vitest); `conductor` is assumed to be an initialized Conductor client
describe('RAG Pipeline', () => {
  it('should ingest and retrieve documents', async () => {
    // Ingest
    const ingestResult = await conductor.execute('ingest-documents', {
      documents: [
        {
          id: 'doc-1',
          title: 'Test Document',
          content: 'This is a test document about RAG pipelines.'
        }
      ],
      source: 'test'
    });

    expect(ingestResult.chunks_created).toBeGreaterThan(0);

    // Query
    const qaResult = await conductor.execute('rag-qa', {
      question: 'What is this document about?'
    });

    expect(qaResult.response.answer).toContain('RAG');
    expect(qaResult.response.sources.length).toBeGreaterThan(0);
  });
});
```

Performance Tips

  1. Chunk size: 1000-1500 characters with 200-character overlap
  2. Top K: Start with 5, increase if answers are incomplete
  3. Cache aggressively: 1 hour for common questions
  4. Use filters: Filter by date, category, etc. for faster search
  5. Monitor relevance: Log average relevance scores
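
A sketch of tips 2-4 applied to the `rag-qa` search agent from earlier; the `category` filter is a hypothetical metadata field and depends on what you store at ingestion time:

```yaml
agents:
  - name: search
    operation: rag
    cache:
      ttl: 3600                                 # tip 3: cache for an hour
      key: rag-search-${hash(input.question)}
    config:
      action: search
      query: ${input.question}
      topK: 5                                   # tip 2: start at 5, raise if answers feel incomplete
      namespace: documents
      filter:
        category: ${input.category}             # tip 4: hypothetical metadata filter
      includeMetadata: true
```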

Next Steps