> ## Documentation Index
> Fetch the complete documentation index at: https://docs.ensemble.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# RAG Pipeline

> Complete end-to-end RAG implementation. Document ingestion, vector search, and answer generation.

## Architecture

```
Documents → Chunk → Embed → Store (Vectorize)
                             ↓
User Question → Embed → Search → Generate Answer
```

## Complete Implementation

### 1. Document Ingestion Ensemble

```yaml theme={null}
# Ingestion ensemble, part 1: inputs and the chunking agent.
ensemble: ingest-documents

# Both inputs are marked required.
inputs:
  documents:
    type: array
    required: true
  source:
    type: string
    required: true

agents:
  # 1. Chunk documents
  # Runs the scripts/chunk-documents script with both ensemble inputs forwarded unchanged.
  - name: chunk
    operation: code
    config:
      script: scripts/chunk-documents
    input:
      documents: ${input.documents}
      source: ${input.source}
```

```typescript theme={null}
// scripts/chunk-documents.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Splits each document's `content` into overlapping, fixed-size text chunks.
 *
 * Windows are `chunkSize` characters long and start every
 * `chunkSize - overlap` characters, so neighbouring chunks share `overlap`
 * characters. Chunking of a document stops as soon as a window reaches the
 * end of the text, so no trailing chunk is emitted that is already fully
 * contained in the previous one.
 *
 * @param context - Execution context. `context.input.documents` is an array
 *   of `{ id, title, content }` objects; `context.input.source` is copied into
 *   every chunk's metadata.
 * @returns `{ chunks }`, where each chunk carries `text` and `metadata`
 *   (`document_id`, `title`, `source`, `chunk_index`, `created_at`).
 *   `chunk_index` is the chunk's position in the returned array, i.e. it keeps
 *   counting across documents within one call.
 */
export default function chunkDocuments(context: AgentExecutionContext) {
  const { documents, source } = context.input
  const chunkSize = 1000
  const overlap = 200
  const stride = chunkSize - overlap
  const chunks: Array<{ text: string; metadata: Record<string, unknown> }> = []

  for (const doc of documents) {
    // Documents without textual content produce no chunks instead of throwing.
    const text: string = typeof doc.content === 'string' ? doc.content : ''

    for (let start = 0; start < text.length; start += stride) {
      chunks.push({
        text: text.substring(start, start + chunkSize),
        metadata: {
          document_id: doc.id,
          title: doc.title,
          source: source,
          chunk_index: chunks.length,
          created_at: new Date().toISOString()
        }
      })

      // This window already covers the tail of the text; another step would
      // only repeat characters that are part of this chunk.
      if (start + chunkSize >= text.length) break
    }
  }

  return { chunks }
}
```

```yaml theme={null}

  # 2. Embed chunks (parallel)
  # Sends the chunk texts and their metadata (two arrays in the same chunk order)
  # to the rag operation's embed action under the "documents" namespace.
  - name: embed
    operation: rag
    config:
      action: embed
      text: ${chunk.output.chunks.map(c => c.text)}
      metadata: ${chunk.output.chunks.map(c => c.metadata)}
      namespace: documents

  # 3. Store metadata in D1
  # Builds one parameter row per chunk; the row id is "<document_id>-<chunk_index>"
  # and the metadata object is stored as a JSON string.
  - name: store-metadata
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT INTO document_chunks (id, document_id, text, metadata, created_at)
        VALUES (?, ?, ?, ?, ?)
      params: ${chunk.output.chunks.map(c => [
        c.metadata.document_id + '-' + c.metadata.chunk_index,
        c.metadata.document_id,
        c.text,
        JSON.stringify(c.metadata),
        c.metadata.created_at
      ])}

# Ensemble result: the number of chunks produced and the embed agent's raw output.
output:
  chunks_created: ${chunk.output.chunks.length}
  embedding_status: ${embed.output}
```

### 2. Question Answering Ensemble

<Tabs>
  <Tab title="YAML">
    ```yaml theme={null}
    # Question-answering ensemble: search, then answer, then format.
    ensemble: rag-qa

    # Only `question` is required; `topK` defaults to 5 and `filters` has no default.
    inputs:
      question:
        type: string
        required: true
      topK:
        type: number
        default: 5
      filters:
        type: object

    agents:
      # 1. Search relevant chunks
      # includeMetadata is on because scripts/format-rag-response reads
      # metadata.title and metadata.document_id from every result.
      - name: search
        operation: rag
        config:
          action: search
          query: ${input.question}
          topK: ${input.topK}
          namespace: documents
          filter: ${input.filters}
          includeMetadata: true

      # 2. Generate answer
      # The prompt numbers the retrieved texts [1], [2], ... and asks the model
      # to cite them using those numbers.
      - name: answer
        operation: think
        config:
          provider: openai
          model: gpt-4o
          temperature: 0.3
          prompt: |
            You are a helpful assistant. Answer the question using ONLY the provided context.
            If the answer is not in the context, say "I don't have enough information."

            Context:
            ${search.output.results.map((r, i) => `[${i+1}] ${r.text}`).join('\n\n')}

            Question: ${input.question}

            Provide a clear, accurate answer with citations like [1], [2] etc.

      # 3. Format response with sources
      # Hands the answer, the raw search results and the question to the formatting script.
      - name: format
        operation: code
        config:
          script: scripts/format-rag-response
        input:
          answer: ${answer.output}
          search_results: ${search.output.results}
          question: ${input.question}

    # The ensemble returns the formatting script's output as `response`.
    output:
      response: ${format.output}
    ```
  </Tab>

  <Tab title="TypeScript">
    ```typescript theme={null}
    // Same package the agent scripts on this page import their types from.
    import { createEnsemble, step } from '@ensemble-edge/conductor'

    /** Input accepted by the `rag-qa` ensemble. */
    interface RAGInput {
      question: string
      /** Number of chunks to retrieve; the search step falls back to 5 when omitted. */
      topK?: number
      /** Optional filter object passed through to the search step. */
      filters?: Record<string, unknown>
    }

    // Builder-API equivalent of the YAML tab: search -> answer -> format.
    // Values written as '${...}' are plain strings here; they are expression
    // placeholders for the ensemble, not TypeScript interpolations.
    const ragQA = createEnsemble('rag-qa')
      .setInput<RAGInput>()

      // 1. Search relevant chunks
      .addStep(
        step('search')
          .operation('rag')
          .config({
            action: 'search',
            query: '${input.question}',
            topK: '${input.topK || 5}',
            namespace: 'documents',
            filter: '${input.filters}',
            includeMetadata: true
          })
      )

      // 2. Generate answer
      // The prompt is a template literal, so every `${` and backtick that must
      // reach the ensemble untouched is escaped with a backslash.
      .addStep(
        step('answer')
          .operation('think')
          .config({
            provider: 'openai',
            model: 'gpt-4o',
            temperature: 0.3,
            prompt: `
              You are a helpful assistant. Answer the question using ONLY the provided context.
              If the answer is not in the context, say "I don't have enough information."

              Context:
              \${search.output.results.map((r, i) => \`[\${i+1}] \${r.text}\`).join('\\n\\n')}

              Question: \${input.question}

              Provide a clear, accurate answer with citations like [1], [2] etc.
            `
          })
      )

      // 3. Format response with sources
      .addStep(
        step('format')
          .operation('code')
          .config({ script: 'scripts/format-rag-response' })
          .input({
            answer: '${answer.output}',
            search_results: '${search.output.results}',
            question: '${input.question}'
          })
      )

      .build()

    export default ragQA
    ```
  </Tab>
</Tabs>

```typescript theme={null}
// scripts/format-rag-response.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Shapes the model's answer and the retrieved chunks into the final response.
 *
 * @param context - Execution context. `context.input` carries `answer`,
 *   `search_results` (objects with `text`, `score` and optional `metadata`)
 *   and the original `question`.
 * @returns An object with the `answer`, one `sources` entry per search result
 *   (1-based `citation`, a text preview, `title`, `document_id`, `relevance`)
 *   and a `metadata` block holding the question, the number of sources and an
 *   ISO timestamp.
 */
export default function formatRagResponse(context: AgentExecutionContext) {
  const { answer, search_results, question } = context.input
  const previewLength = 200
  // Treat a missing result list as "no sources" rather than throwing.
  const results: any[] = Array.isArray(search_results) ? search_results : []

  return {
    answer: answer,
    sources: results.map((r: any, i: number) => {
      const text: string = typeof r.text === 'string' ? r.text : ''
      return {
        citation: i + 1,
        // Only mark the preview as cut off when something was actually removed.
        text: text.length > previewLength ? text.substring(0, previewLength) + '...' : text,
        // Metadata may be absent on a result; leave the fields undefined then.
        title: r.metadata?.title,
        document_id: r.metadata?.document_id,
        relevance: r.score
      }
    }),
    metadata: {
      question: question,
      num_sources: results.length,
      timestamp: new Date().toISOString()
    }
  }
}
```

### 3. Hybrid Search Enhancement

````yaml theme={null}
# Hybrid retrieval: vector search plus SQL keyword search, merged by an LLM reranker.
ensemble: rag-qa-hybrid

agents:
  # Vector search
  - name: vector-search
    operation: rag
    config:
      action: search
      query: ${input.question}
      topK: 10
      namespace: documents

  # Keyword search
  # Two LIKE patterns: the whole question as a substring, and the question's
  # words in order with anything in between.
  - name: keyword-search
    operation: data
    config:
      backend: d1
      binding: DB
      operation: query
      sql: |
        SELECT id, document_id, text, metadata
        FROM document_chunks
        WHERE text LIKE ? OR text LIKE ?
        LIMIT 10
      params:
        # Double-quoted so the single quotes inside the expression remain valid YAML.
        - "%${input.question}%"
        - "%${input.question.split(' ').join('%')}%"

  # Rerank with LLM
  # Keyword results are numbered after the vector results, so every index the
  # model returns addresses the concatenated [vector..., keyword...] list that
  # scripts/select-top-results builds, even when fewer than 10 vector hits exist.
  - name: rerank
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Rank these document chunks by relevance to: "${input.question}"
        Return the top 5 as a JSON array of indices (0-based).

        Vector results:
        ${vector-search.output.results.map((r, i) => `${i}. ${r.text}`).join('\n')}

        Keyword results:
        ${keyword-search.output.map((r, i) => `${i + vector-search.output.results.length}. ${r.text}`).join('\n')}

  # Select top results
  # Resolves the reranker's indices back to the actual result objects.
  - name: select-top
    operation: code
    config:
      script: scripts/select-top-results
    input:
      vector_results: ${vector-search.output.results}
      keyword_results: ${keyword-search.output}
      top_indices: ${rerank.output}

````

````typescript theme={null}
// scripts/select-top-results.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Picks the reranked results out of the combined vector + keyword result list.
 *
 * The combined list is `[...vector_results, ...keyword_results]`, so the
 * indices supplied in `top_indices` must refer to that ordering.
 *
 * @param context - Execution context. `context.input.top_indices` is either an
 *   array of indices or a string containing a JSON array (surrounding text such
 *   as a Markdown code fence is tolerated).
 * @returns `{ results }` with the selected entries in the order given by
 *   `top_indices`; indices that are not integers or fall outside the combined
 *   list are skipped.
 * @throws SyntaxError when a string `top_indices` contains no parseable JSON.
 * @throws Error when the parsed value is not an array.
 */
export default function selectTopResults(context: AgentExecutionContext) {
  const { vector_results, keyword_results, top_indices } = context.input

  const allResults = [
    ...(vector_results ?? []),
    ...(keyword_results ?? [])
  ]

  let parsed: unknown = top_indices
  if (typeof top_indices === 'string') {
    // Model output often wraps the array in prose or a code fence; parse only
    // the bracketed part when there is one.
    const bracketed = top_indices.match(/\[[\s\S]*\]/)
    parsed = JSON.parse(bracketed ? bracketed[0] : top_indices)
  }

  if (!Array.isArray(parsed)) {
    throw new Error('top_indices must be a JSON array of result indices')
  }

  return {
    results: parsed
      // Drop anything that would otherwise resolve to `undefined`.
      .filter((i: unknown) => typeof i === 'number' && Number.isInteger(i) && i >= 0 && i < allResults.length)
      .map((i: number) => allResults[i])
  }
}
````

```yaml theme={null}
  # Generate answer
  # The prompt contains the texts of the results chosen by select-top,
  # separated by blank lines, followed by the user's question.
  - name: answer
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: |
        Answer using these sources:
        ${select-top.output.results.map(r => r.text).join('\n\n')}

        Question: ${input.question}
```

## Advanced Patterns

### Multi-Query RAG

````yaml theme={null}
# Multi-query retrieval: one LLM call proposes three queries, each is searched
# separately, and a script merges the three result lists.
agents:
  # Generate query variations
  - name: generate-queries
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Generate 3 different search queries for: ${input.question}
        Return as JSON array of strings.

  # Search with each query
  # NOTE(review): `output[0]`..`output[2]` assume generate-queries.output is an
  # already-parsed array. If the think operation returns the raw JSON text,
  # indexing would yield single characters instead of queries - confirm.
  - name: search-1
    operation: rag
    config:
      action: search
      query: ${generate-queries.output[0]}
      topK: 5

  - name: search-2
    operation: rag
    config:
      action: search
      query: ${generate-queries.output[1]}
      topK: 5

  - name: search-3
    operation: rag
    config:
      action: search
      query: ${generate-queries.output[2]}
      topK: 5

  # Deduplicate and combine
  # Passes the three result lists to scripts/combine-search-results.
  - name: combine
    operation: code
    config:
      script: scripts/combine-search-results
    input:
      results_1: ${search-1.output.results}
      results_2: ${search-2.output.results}
      results_3: ${search-3.output.results}

````

````typescript theme={null}
// scripts/combine-search-results.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Merges the result lists of the three searches into one ranked list.
 *
 * Results are deduplicated by `document_id` + `chunk_index`. When the same
 * chunk was returned by several searches, the occurrence with the highest
 * `score` is kept so its ranking is not understated.
 *
 * @param context - Execution context. `context.input.results_1..3` are the
 *   result arrays of the individual searches; a missing list is treated as
 *   empty.
 * @returns `{ results }` with at most five unique results, highest score first.
 */
export default function combineSearchResults(context: AgentExecutionContext) {
  const { results_1, results_2, results_3 } = context.input
  const maxResults = 5

  const all: any[] = [
    ...(results_1 ?? []),
    ...(results_2 ?? []),
    ...(results_3 ?? [])
  ]

  // Deduplicate by document ID + chunk index, keeping the best-scoring hit
  const bestByChunk = new Map<string, any>()
  for (const r of all) {
    const key = r.metadata.document_id + '-' + r.metadata.chunk_index
    const seen = bestByChunk.get(key)
    if (seen === undefined || r.score > seen.score) {
      bestByChunk.set(key, r)
    }
  }

  // Sort by relevance score
  return {
    results: [...bestByChunk.values()]
      .sort((a: any, b: any) => b.score - a.score)
      .slice(0, maxResults)
  }
}
````

### Conversational RAG

```yaml theme={null}
# Conversational RAG: rewrites the follow-up question using the stored history,
# searches with the rewritten question, answers, then appends the turn to history.
ensemble: rag-conversational

state:
  schema:
    history: array

agents:
  # Rewrite question with context
  - name: contextualize
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Given this conversation history:
        ${state.history.map(h => `${h.role}: ${h.content}`).join('\n')}

        Rewrite this follow-up question to be standalone:
        "${input.question}"

  # Search
  # Uses the rewritten (standalone) question as the query.
  - name: search
    operation: rag
    config:
      action: search
      query: ${contextualize.output}
      topK: 5

  # Generate answer
  # The prompt carries the history, the retrieved texts and the original question.
  - name: answer
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: |
        Conversation history:
        ${state.history.map(h => `${h.role}: ${h.content}`).join('\n')}

        Context:
        ${search.output.results.map(r => r.text).join('\n\n')}

        Question: ${input.question}

  # Update history
  # Appends the user question and the assistant answer to the stored history.
  # The expression is double-quoted because it contains "key: value" pairs,
  # which are not allowed inside an unquoted YAML scalar.
  - name: update-history
    operation: code
    state:
      use: [history]
      set:
        history: "${[...state.history, { role: 'user', content: input.question }, { role: 'assistant', content: answer.output }]}"
    config:
      script: scripts/return-updated-status
```

```typescript theme={null}
// scripts/return-updated-status.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Script behind the `update-history` agent.
 *
 * It does not read anything from `context`; the history itself is written by
 * the agent's `state.set` block. The script only reports `{ updated: true }`.
 */
export default function returnUpdatedStatus(context: AgentExecutionContext) {
  const status = { updated: true }
  return status
}
```

## Production Considerations

### Error Handling

```yaml theme={null}
agents:
  # Search is configured with up to 3 attempts and exponential backoff.
  - name: search
    operation: rag
    retry:
      maxAttempts: 3
      backoff: exponential

  # Runs only when the condition on `search.failed` holds; the model is asked
  # to decline politely instead of answering without the knowledge base.
  - name: fallback-answer
    condition: ${search.failed}
    operation: think
    config:
      prompt: |
        You don't have access to the knowledge base.
        Politely explain you can't answer: ${input.question}
```

### Caching

```yaml theme={null}
agents:
  # Search results are cached under a key derived from the question alone.
  - name: search
    operation: rag
    cache:
      ttl: 3600
      key: rag-search-${hash(input.question)}

  # Answers are cached under a key derived from the question and the results
  # they were generated from.
  - name: answer
    operation: think
    cache:
      ttl: 3600
      # Serialize the results explicitly: concatenating the output object to a
      # string would contribute only "[object Object]" to the key.
      key: rag-answer-${hash(input.question + JSON.stringify(search.output.results))}
```

### Monitoring

```yaml theme={null}
agents:
  - name: answer
    operation: think

  # Writes one row per question into rag_interactions.
  - name: log-interaction
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT INTO rag_interactions (
          question, answer, num_sources, avg_relevance, timestamp
        ) VALUES (?, ?, ?, ?, ?)
      params:
        - ${input.question}
        - ${answer.output}
        - ${search.output.results.length}
        # Mean score of the results; the divisor is clamped to 1 so an empty
        # result list logs 0 instead of NaN (avg_relevance is REAL NOT NULL).
        - ${search.output.results.reduce((sum, r) => sum + r.score, 0) / Math.max(search.output.results.length, 1)}
        - ${Date.now()}
```

## Database Schema

```sql theme={null}
-- Document chunks metadata
-- One row per chunk; `metadata` holds the chunk metadata serialized as JSON text.
CREATE TABLE document_chunks (
  id TEXT PRIMARY KEY,
  document_id TEXT NOT NULL,
  text TEXT NOT NULL,
  metadata TEXT NOT NULL,
  created_at TEXT NOT NULL
);

-- SQLite (and therefore D1) has no inline INDEX clause inside CREATE TABLE;
-- secondary indexes are created with separate statements.
CREATE INDEX idx_document_id ON document_chunks (document_id);
CREATE INDEX idx_created_at ON document_chunks (created_at);

-- Full-text search
-- External-content FTS5 table reading its text from document_chunks.
-- NOTE(review): external-content tables are not updated automatically;
-- confirm how the index is kept in sync with document_chunks.
CREATE VIRTUAL TABLE document_chunks_fts USING fts5(
  text,
  content=document_chunks
);

-- RAG interactions log
-- `timestamp` is an INTEGER; the logging example on this page stores Date.now().
CREATE TABLE rag_interactions (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  question TEXT NOT NULL,
  answer TEXT NOT NULL,
  num_sources INTEGER NOT NULL,
  avg_relevance REAL NOT NULL,
  timestamp INTEGER NOT NULL
);

CREATE INDEX idx_timestamp ON rag_interactions (timestamp);
```

## Deployment

```toml theme={null}
# wrangler.toml
# Bindings used by the examples on this page.

# AI binding, exposed as "AI".
[ai]
binding = "AI"

# Vectorize index named "documents", exposed as "VECTORIZE".
[[vectorize]]
binding = "VECTORIZE"
index_name = "documents"

# D1 database exposed as "DB" - the binding name the `data` agents reference.
# Replace database_id with the id of your own database.
[[d1_databases]]
binding = "DB"
database_name = "rag-db"
database_id = "your-db-id"
```

## Testing

```typescript theme={null}
describe('RAG Pipeline', () => {
  it('should ingest and retrieve documents', async () => {
    // Fixture: one short document to push through the ingestion ensemble.
    const documents = [
      {
        id: 'doc-1',
        title: 'Test Document',
        content: 'This is a test document about RAG pipelines.'
      }
    ];

    // Ingest: at least one chunk must have been created.
    const { chunks_created } = await conductor.execute('ingest-documents', {
      documents,
      source: 'test'
    });
    expect(chunks_created).toBeGreaterThan(0);

    // Query: the answer should mention the topic and cite at least one source.
    const { response } = await conductor.execute('rag-qa', {
      question: 'What is this document about?'
    });
    expect(response.answer).toContain('RAG');
    expect(response.sources.length).toBeGreaterThan(0);
  });
});
```

## Performance Tips

1. **Chunk size**: 1000-1500 characters with 200-character overlap
2. **Top K**: Start with 5, increase if answers are incomplete
3. **Cache aggressively**: 1 hour for common questions
4. **Use filters**: Filter by date, category, etc. for faster search
5. **Monitor relevance**: Log average relevance scores

## Next Steps

<CardGroup cols={2}>
  <Card title="RAG Operation" icon="database" href="/conductor/starter-kit/built-in/rag">
    RAG operation reference
  </Card>

  <Card title="Vectorize" icon="vector-square" href="/conductor/operations/storage">
    Vector storage
  </Card>

  <Card title="Multi-Agent Analysis" icon="users" href="/conductor/playbooks/multi-agent-analysis">
    Multi-agent patterns
  </Card>

  <Card title="Document Intelligence" icon="file-magnifying-glass" href="/conductor/playbooks/document-intelligence">
    Document processing
  </Card>
</CardGroup>
