Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.ensemble.ai/llms.txt

Use this file to discover all available pages before exploring further.

Architecture

Documents → Chunk → Embed → Store (Vectorize)
                              ↓
User Question → Embed → Search → Generate Answer

Complete Implementation

1. Document Ingestion Ensemble

ensemble: ingest-documents

inputs:
  documents:
    type: array
    required: true
  source:
    type: string
    required: true

agents:
  # 1. Chunk documents
  - name: chunk
    operation: code
    config:
      script: scripts/chunk-documents
    input:
      documents: ${input.documents}
      source: ${input.source}
// scripts/chunk-documents.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Splits every input document into overlapping, fixed-size text chunks.
 *
 * Reads `documents` (items exposing `id`, `title` and `content`) and `source`
 * from `context.input`. Each document's content is cut into windows of up to
 * 1000 characters that start 800 characters apart, so neighbouring chunks
 * share 200 characters.
 *
 * @param context - Execution context whose `input` carries `documents` and `source`.
 * @returns `{ chunks }`, where each chunk has `text` plus `metadata`
 *   (`document_id`, `title`, `source`, `chunk_index`, `created_at`).
 *   `chunk_index` is the chunk's position in the returned array, counted
 *   across all documents. `created_at` is a single ISO timestamp taken once
 *   per call and shared by every chunk.
 */
export default function chunkDocuments(context: AgentExecutionContext) {
  const { documents, source } = context.input
  const chunkSize = 1000
  const overlap = 200
  const step = chunkSize - overlap
  // One timestamp for the whole run so all chunks of an ingestion agree.
  const createdAt = new Date().toISOString()
  const chunks: Array<{ text: string; metadata: Record<string, unknown> }> = []

  for (const doc of documents) {
    const text = doc.content
    // A document without textual content contributes no chunks (same as an
    // empty string) instead of crashing on `.length`.
    if (typeof text !== 'string') continue

    for (let i = 0; i < text.length; i += step) {
      chunks.push({
        text: text.substring(i, i + chunkSize),
        metadata: {
          document_id: doc.id,
          title: doc.title,
          source: source,
          chunk_index: chunks.length,
          created_at: createdAt
        }
      })

      // This window already reached the end of the text; another step would
      // only emit a tail that lies entirely inside the overlap just stored.
      if (i + chunkSize >= text.length) break
    }
  }

  return { chunks }
}

  # 2. Embed chunks (parallel)
  - name: embed
    operation: rag
    config:
      action: embed
      text: ${chunk.output.chunks.map(c => c.text)}
      metadata: ${chunk.output.chunks.map(c => c.metadata)}
      namespace: documents

  # 3. Store metadata in D1
  - name: store-metadata
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT INTO document_chunks (id, document_id, text, metadata, created_at)
        VALUES (?, ?, ?, ?, ?)
      params: ${chunk.output.chunks.map(c => [
        c.metadata.document_id + '-' + c.metadata.chunk_index,
        c.metadata.document_id,
        c.text,
        JSON.stringify(c.metadata),
        c.metadata.created_at
      ])}

output:
  chunks_created: ${chunk.output.chunks.length}
  embedding_status: ${embed.output}

2. Question Answering Ensemble

ensemble: rag-qa

inputs:
  question:
    type: string
    required: true
  topK:
    type: number
    default: 5
  filters:
    type: object

agents:
  # 1. Search relevant chunks
  - name: search
    operation: rag
    config:
      action: search
      query: ${input.question}
      topK: ${input.topK}
      namespace: documents
      filter: ${input.filters}
      includeMetadata: true

  # 2. Generate answer
  - name: answer
    operation: think
    config:
      provider: openai
      model: gpt-4o
      temperature: 0.3
      prompt: |
        You are a helpful assistant. Answer the question using ONLY the provided context.
        If the answer is not in the context, say "I don't have enough information."

        Context:
        ${search.output.results.map((r, i) => `[${i+1}] ${r.text}`).join('\n\n')}

        Question: ${input.question}

        Provide a clear, accurate answer with citations like [1], [2] etc.

  # 3. Format response with sources
  - name: format
    operation: code
    config:
      script: scripts/format-rag-response
    input:
      answer: ${answer.output}
      search_results: ${search.output.results}
      question: ${input.question}

output:
  response: ${format.output}
// scripts/format-rag-response.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Packages a generated answer together with its supporting sources.
 *
 * Reads `answer`, `search_results` and `question` from `context.input`.
 * Each search result becomes a source entry with a 1-based `citation`
 * number, a preview of its text, its `title` and `document_id` from the
 * result metadata, and its `score` exposed as `relevance`.
 *
 * @param context - Execution context whose `input` carries `answer`,
 *   `search_results` and `question`.
 * @returns `{ answer, sources, metadata }`, where `metadata` holds the
 *   question, the number of sources and an ISO timestamp.
 */
export default function formatRagResponse(context: AgentExecutionContext) {
  const { answer, search_results, question } = context.input
  const previewLength = 200

  return {
    answer: answer,
    sources: search_results.map((r: any, i: number) => ({
      citation: i + 1,
      // Only mark the preview with an ellipsis when text was actually cut off.
      text: r.text.length > previewLength
        ? r.text.substring(0, previewLength) + '...'
        : r.text,
      title: r.metadata.title,
      document_id: r.metadata.document_id,
      relevance: r.score
    })),
    metadata: {
      question: question,
      num_sources: search_results.length,
      timestamp: new Date().toISOString()
    }
  }
}

3. Hybrid Search Enhancement

ensemble: rag-qa-hybrid

agents:
  # Vector search
  - name: vector-search
    operation: rag
    config:
      action: search
      query: ${input.question}
      topK: 10
      namespace: documents

  # Keyword search
  - name: keyword-search
    operation: data
    config:
      backend: d1
      binding: DB
      operation: query
      sql: |
        SELECT id, document_id, text, metadata
        FROM document_chunks
        WHERE text LIKE ? OR text LIKE ?
        LIMIT 10
      params:
        # 1st pattern: the question as one contiguous phrase.
        - "%${input.question}%"
        # 2nd pattern: the question's words in order, with anything in between.
        # Double-quoted because the expression itself contains single quotes,
        # which would end a single-quoted YAML scalar early.
        - "%${input.question.split(' ').join('%')}%"

  # Rerank with LLM
  - name: rerank
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      # Both lists share one index space: vector results are numbered from 0 and
      # keyword results continue right after the last vector result. This matches
      # how scripts/select-top-results concatenates the two lists, even when the
      # vector search returns fewer than 10 hits.
      prompt: |
        Rank these document chunks by relevance to: "${input.question}"
        Return the top 5 as a JSON array of indices (0-based).
        Respond with only the JSON array and no other text.

        Vector results:
        ${vector-search.output.results.map((r, i) => `${i}. ${r.text}`).join('\n')}

        Keyword results:
        ${keyword-search.output.map((r, i) => `${i + vector-search.output.results.length}. ${r.text}`).join('\n')}

  # Select top results
  - name: select-top
    operation: code
    config:
      script: scripts/select-top-results
    input:
      vector_results: ${vector-search.output.results}
      keyword_results: ${keyword-search.output}
      top_indices: ${rerank.output}

// scripts/select-top-results.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Picks the reranked results out of the combined vector + keyword hits.
 *
 * Reads `vector_results`, `keyword_results` and `top_indices` from
 * `context.input`. Indices refer to the concatenated list (vector results
 * first, keyword results after them). Entries that are not integers, fall
 * outside that list, or repeat an earlier index are skipped, so the output
 * never contains `undefined` or duplicated results.
 *
 * @param context - Execution context whose `input` carries the two result
 *   lists and the reranker's index list (a JSON string or an array).
 * @returns `{ results }` in the order given by `top_indices`.
 * @throws SyntaxError when `top_indices` contains no parsable JSON.
 * @throws TypeError when the parsed value is not an array.
 */
export default function selectTopResults(context: AgentExecutionContext) {
  const { vector_results, keyword_results, top_indices } = context.input

  const allResults = [
    ...(vector_results || []),
    ...(keyword_results || [])
  ]

  const seen = new Set<number>()
  const results = []
  for (const index of parseIndexList(top_indices)) {
    if (typeof index !== 'number' || !Number.isInteger(index)) continue
    if (index < 0 || index >= allResults.length) continue
    if (seen.has(index)) continue
    seen.add(index)
    results.push(allResults[index])
  }

  return { results }
}

/**
 * Extracts an array from model output that may wrap the JSON in prose or a
 * Markdown code fence. Falls back to parsing the raw value so that output
 * without any usable array still fails loudly.
 */
function parseIndexList(raw: unknown): unknown[] {
  if (Array.isArray(raw)) return raw

  if (typeof raw === 'string') {
    // Try every flat "[...]" span; the first one that is valid JSON wins.
    const candidates = raw.match(/\[[^\[\]]*\]/g) || []
    for (const candidate of candidates) {
      try {
        const parsed: unknown = JSON.parse(candidate)
        if (Array.isArray(parsed)) return parsed
      } catch {
        // Not valid JSON; try the next bracketed span.
      }
    }
  }

  const parsed: unknown = JSON.parse(String(raw))
  if (!Array.isArray(parsed)) {
    throw new TypeError('top_indices must be a JSON array of indices')
  }
  return parsed
}
  # Generate answer
  - name: answer
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: |
        Answer using these sources:
        ${select-top.output.results.map(r => r.text).join('\n\n')}

        Question: ${input.question}

Advanced Patterns

Multi-Query RAG

agents:
  # Generate query variations
  - name: generate-queries
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Generate 3 different search queries for: ${input.question}
        Return as JSON array of strings.

  # Search with each query
  - name: search-1
    operation: rag
    config:
      action: search
      query: ${generate-queries.output[0]}
      topK: 5

  - name: search-2
    operation: rag
    config:
      action: search
      query: ${generate-queries.output[1]}
      topK: 5

  - name: search-3
    operation: rag
    config:
      action: search
      query: ${generate-queries.output[2]}
      topK: 5

  # Deduplicate and combine
  - name: combine
    operation: code
    config:
      script: scripts/combine-search-results
    input:
      results_1: ${search-1.output.results}
      results_2: ${search-2.output.results}
      results_3: ${search-3.output.results}

// scripts/combine-search-results.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Merges the hits of the three query variations into one ranked list.
 *
 * Reads `results_1`, `results_2` and `results_3` from `context.input`; a
 * missing list is treated as empty. Hits are deduplicated by
 * `metadata.document_id` + `metadata.chunk_index`. When the same chunk was
 * found by several queries, the highest-scoring hit is kept so the final
 * ranking reflects the chunk's best match.
 *
 * @param context - Execution context whose `input` carries the three lists.
 * @returns `{ results }`: at most 5 unique hits, sorted by descending `score`.
 */
export default function combineSearchResults(context: AgentExecutionContext) {
  const { results_1, results_2, results_3 } = context.input
  const maxResults = 5

  const all = [
    ...(results_1 || []),
    ...(results_2 || []),
    ...(results_3 || [])
  ]

  // Deduplicate by document ID + chunk index, keeping the best score per chunk
  const bestByChunk = new Map<string, any>()
  for (const result of all) {
    const key = result.metadata.document_id + '-' + result.metadata.chunk_index
    const existing = bestByChunk.get(key)
    if (existing === undefined || result.score > existing.score) {
      bestByChunk.set(key, result)
    }
  }

  // Sort by relevance score
  return {
    results: [...bestByChunk.values()]
      .sort((a: any, b: any) => b.score - a.score)
      .slice(0, maxResults)
  }
}

Conversational RAG

ensemble: rag-conversational

state:
  schema:
    history: array

agents:
  # Rewrite question with context
  - name: contextualize
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Given this conversation history:
        ${state.history.map(h => `${h.role}: ${h.content}`).join('\n')}

        Rewrite this follow-up question to be standalone:
        "${input.question}"

  # Search
  - name: search
    operation: rag
    config:
      action: search
      query: ${contextualize.output}
      topK: 5

  # Generate answer
  - name: answer
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: |
        Conversation history:
        ${state.history.map(h => `${h.role}: ${h.content}`).join('\n')}

        Context:
        ${search.output.results.map(r => r.text).join('\n\n')}

        Question: ${input.question}

  # Update history
  - name: update-history
    operation: code
    state:
      use: [history]
      set:
        # Appends the current question and answer to the stored history.
        # Kept as one double-quoted scalar: an unquoted multi-line value cannot
        # contain "key: value" text such as `role: 'user'` without breaking
        # YAML parsing.
        history: "${[...state.history, { role: 'user', content: input.question }, { role: 'assistant', content: answer.output }]}"
    config:
      script: scripts/return-updated-status
// scripts/return-updated-status.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Reports that the update step ran.
 *
 * The execution context is accepted but not read; the result is always the
 * same acknowledgement object.
 *
 * @param context - Execution context (unused).
 * @returns `{ updated: true }`.
 */
export default function returnUpdatedStatus(context: AgentExecutionContext) {
  const status = { updated: true }
  return status
}

Production Considerations

Error Handling

agents:
  - name: search
    operation: rag
    retry:
      maxAttempts: 3
      backoff: exponential

  - name: fallback-answer
    condition: ${search.failed}
    operation: think
    config:
      prompt: |
        You don't have access to the knowledge base.
        Politely explain you can't answer: ${input.question}

Caching

agents:
  - name: search
    operation: rag
    cache:
      ttl: 3600
      key: rag-search-${hash(input.question)}

  - name: answer
    operation: think
    cache:
      ttl: 3600
      # Serialize the search output before hashing. Concatenating an object onto
      # a string coerces it to "[object Object]" under JavaScript semantics, so
      # the key would depend on the question alone and ignore the retrieved
      # context.
      # NOTE(review): assumes expressions follow JavaScript coercion rules — confirm.
      key: rag-answer-${hash(input.question + JSON.stringify(search.output))}

Monitoring

agents:
  - name: answer
    operation: think

  - name: log-interaction
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT INTO rag_interactions (
          question, answer, num_sources, avg_relevance, timestamp
        ) VALUES (?, ?, ?, ?, ?)
      params:
        - ${input.question}
        - ${answer.output}
        - ${search.output.results.length}
        # Average score of the retrieved chunks. The divisor is clamped to 1 so
        # an empty result set logs 0 instead of dividing by zero.
        # NOTE(review): assumes `Math` is available to expressions, as
        # `Date.now()` is — confirm.
        - ${search.output.results.reduce((sum, r) => sum + r.score, 0) / Math.max(search.output.results.length, 1)}
        - ${Date.now()}

Database Schema

-- Document chunks metadata
CREATE TABLE document_chunks (
  id TEXT PRIMARY KEY,
  document_id TEXT NOT NULL,
  text TEXT NOT NULL,
  metadata TEXT NOT NULL,
  created_at TEXT NOT NULL
);

-- SQLite (the engine behind D1) has no inline INDEX clause in CREATE TABLE;
-- secondary indexes are created with separate statements.
CREATE INDEX idx_document_id ON document_chunks (document_id);
CREATE INDEX idx_created_at ON document_chunks (created_at);

-- Full-text search
-- External-content FTS5 table: it indexes document_chunks.text but does not
-- update itself, so it must be kept in sync (e.g. with triggers) whenever
-- rows in document_chunks are inserted, updated or deleted.
CREATE VIRTUAL TABLE document_chunks_fts USING fts5(
  text,
  content=document_chunks
);

-- RAG interactions log
CREATE TABLE rag_interactions (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  question TEXT NOT NULL,
  answer TEXT NOT NULL,
  num_sources INTEGER NOT NULL,
  avg_relevance REAL NOT NULL,
  timestamp INTEGER NOT NULL
);

CREATE INDEX idx_timestamp ON rag_interactions (timestamp);

Deployment

# wrangler.toml
[ai]
binding = "AI"

[[vectorize]]
binding = "VECTORIZE"
index_name = "documents"

[[d1_databases]]
binding = "DB"
database_name = "rag-db"
database_id = "your-db-id"

Testing

// End-to-end check of the RAG pipeline: ingest one document, then ask a
// question about it.
// NOTE(review): `conductor` is not defined in this snippet — presumably a
// Conductor client created in shared test setup; confirm before running.
describe('RAG Pipeline', () => {
  it('should ingest and retrieve documents', async () => {
    // Ingest
    const ingestResult = await conductor.execute('ingest-documents', {
      documents: [
        {
          id: 'doc-1',
          title: 'Test Document',
          content: 'This is a test document about RAG pipelines.'
        }
      ],
      source: 'test'
    });

    // `chunks_created` is the ingest ensemble's output: the number of chunks produced.
    expect(ingestResult.chunks_created).toBeGreaterThan(0);

    // Query
    const qaResult = await conductor.execute('rag-qa', {
      question: 'What is this document about?'
    });

    // `response` is the formatted output of rag-qa (answer plus sources).
    // NOTE(review): the answer text comes from the model, so the substring
    // check below may be sensitive to wording — confirm it is stable enough.
    expect(qaResult.response.answer).toContain('RAG');
    expect(qaResult.response.sources.length).toBeGreaterThan(0);
  });
});

Performance Tips

  1. Chunk size: 1000-1500 characters with 200-character overlap
  2. Top K: Start with 5, increase if answers are incomplete
  3. Cache aggressively: 1 hour for common questions
  4. Use filters: Filter by date, category, etc. for faster search
  5. Monitor relevance: Log average relevance scores

Next Steps

RAG Operation

RAG operation reference

Vectorize

Vector storage

Multi-Agent Analysis

Multi-agent patterns

Document Intelligence

Document processing