
RAG Pipeline Playbook

A complete end-to-end RAG implementation covering document ingestion, vector search, and answer generation.

Architecture

Documents → Chunk → Embed → Store (Vectorize)

User Question → Embed → Search → Generate Answer
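
The ensembles below are declarative; conceptually, both paths map onto Workers AI embeddings plus a Vectorize index. A minimal TypeScript sketch of the two flows, assuming the @cf/baai/bge-base-en-v1.5 embedding model and the standard Workers AI and Vectorize bindings (types from @cloudflare/workers-types) — the model name and return shapes are assumptions, not part of the playbook:

// Sketch of both paths, assuming Workers AI + Vectorize bindings.
interface Env {
  AI: Ai;
  VECTORIZE: VectorizeIndex;
}

// Ingestion path: chunk -> embed -> store
async function ingestChunks(env: Env, chunks: { id: string; text: string }[]) {
  const { data } = await env.AI.run("@cf/baai/bge-base-en-v1.5", {
    text: chunks.map((c) => c.text),
  });
  await env.VECTORIZE.insert(
    chunks.map((c, i) => ({ id: c.id, values: data[i], metadata: { text: c.text } }))
  );
}

// Query path: embed question -> search -> (generate answer from matches)
async function searchChunks(env: Env, question: string, topK = 5) {
  const { data } = await env.AI.run("@cf/baai/bge-base-en-v1.5", { text: [question] });
  return env.VECTORIZE.query(data[0], { topK, returnMetadata: true });
}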

Complete Implementation

1. Document Ingestion Ensemble

ensemble: ingest-documents

inputs:
  documents:
    type: array
    required: true
  source:
    type: string
    required: true

agents:
  # 1. Chunk documents
  - name: chunk
    operation: code
    config:
      code: |
        const chunkSize = 1000;
        const overlap = 200;
        const chunks = [];

        for (const doc of ${input.documents}) {
          const text = doc.content;

          for (let i = 0; i < text.length; i += chunkSize - overlap) {
            chunks.push({
              text: text.substring(i, i + chunkSize),
              metadata: {
                document_id: doc.id,
                title: doc.title,
                source: ${input.source},
                chunk_index: chunks.length,
                created_at: new Date().toISOString()
              }
            });
          }
        }

        return { chunks };

  # 2. Embed chunks (parallel)
  - name: embed
    agent: rag
    config:
      action: embed
      text: ${chunk.output.chunks.map(c => c.text)}
      metadata: ${chunk.output.chunks.map(c => c.metadata)}
      namespace: documents

  # 3. Store metadata in D1
  - name: store-metadata
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO document_chunks (id, document_id, text, metadata, created_at)
        VALUES (?, ?, ?, ?, ?)
      params: ${chunk.output.chunks.map(c => [
        c.metadata.document_id + '-' + c.metadata.chunk_index,
        c.metadata.document_id,
        c.text,
        JSON.stringify(c.metadata),
        c.metadata.created_at
      ])}

output:
  chunks_created: ${chunk.output.chunks.length}
  embedding_status: ${embed.output}

2. Question Answering Ensemble

ensemble: rag-qa

inputs:
  question:
    type: string
    required: true
  topK:
    type: number
    default: 5
  filters:
    type: object

agents:
  # 1. Search relevant chunks
  - name: search
    agent: rag
    config:
      action: search
      query: ${input.question}
      topK: ${input.topK}
      namespace: documents
      filter: ${input.filters}
      includeMetadata: true

  # 2. Generate answer
  - name: answer
    operation: think
    config:
      provider: openai
      model: gpt-4o
      temperature: 0.3
      prompt: |
        You are a helpful assistant. Answer the question using ONLY the provided context.
        If the answer is not in the context, say "I don't have enough information."

        Context:
        ${search.output.results.map((r, i) => `[${i+1}] ${r.text}`).join('\n\n')}

        Question: ${input.question}

        Provide a clear, accurate answer with citations like [1], [2] etc.

  # 3. Format response with sources
  - name: format
    operation: code
    config:
      code: |
        return {
          answer: ${answer.output},
          sources: ${search.output.results}.map((r, i) => ({
            citation: i + 1,
            text: r.text.substring(0, 200) + '...',
            title: r.metadata.title,
            document_id: r.metadata.document_id,
            relevance: r.score
          })),
          metadata: {
            question: ${input.question},
            num_sources: ${search.output.results.length},
            timestamp: new Date().toISOString()
          }
        };

output:
  response: ${format.output}
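
Calling the ensemble follows the same shape as the Testing section below; a short usage sketch (the `conductor` instance is the same one used there, and the response fields mirror the format agent's output):

// Usage sketch: execute rag-qa and read the formatted response.
const result = await conductor.execute('rag-qa', {
  question: 'How do I configure retries?',
  topK: 5,
  filters: { source: 'docs' },
});

console.log(result.response.answer);
for (const s of result.response.sources) {
  console.log(`[${s.citation}] ${s.title} (score: ${s.relevance.toFixed(2)})`);
}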

3. Hybrid Search Enhancement

ensemble: rag-qa-hybrid

agents:
  # Vector search
  - name: vector-search
    agent: rag
    config:
      action: search
      query: ${input.question}
      topK: 10
      namespace: documents

  # Keyword search
  - name: keyword-search
    operation: storage
    config:
      type: d1
      query: |
        SELECT id, document_id, text, metadata
        FROM document_chunks
        WHERE text LIKE ? OR text LIKE ?
        LIMIT 10
      params:
        - '%${input.question}%'
        - '%${input.question.split(' ').join('%')}%'

  # Rerank with LLM
  - name: rerank
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Rank these document chunks by relevance to: "${input.question}"
        Return the top 5 as a JSON array of indices (0-based).

        Vector results:
        ${vector-search.output.results.map((r, i) => `${i}. ${r.text}`).join('\n')}

        Keyword results:
        ${keyword-search.output.map((r, i) => `${i+10}. ${r.text}`).join('\n')}

  # Select top results
  - name: select-top
    operation: code
    config:
      code: |
        const allResults = [
          ...${vector-search.output.results},
          ...${keyword-search.output}
        ];

        const topIndices = JSON.parse(${rerank.output});
        return {
          results: topIndices.map(i => allResults[i])
        };

  # Generate answer
  - name: answer
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: |
        Answer using these sources:
        ${select-top.output.results.map(r => r.text).join('\n\n')}

        Question: ${input.question}
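
The select-top step trusts the reranker to return valid JSON. A hedged sketch of a deterministic fallback when parsing fails, deduplicating the combined results and sorting by vector score (result shapes are assumptions based on the search outputs above):

// Sketch: deterministic fallback when the LLM reranker output can't be parsed.
interface Chunk {
  text: string;
  score?: number;
  metadata: { document_id: string; chunk_index: number };
}

function selectTop(vectorResults: Chunk[], keywordResults: Chunk[], rerankOutput: string, k = 5): Chunk[] {
  const all = [...vectorResults, ...keywordResults];
  try {
    const indices: number[] = JSON.parse(rerankOutput);
    return indices.slice(0, k).map((i) => all[i]).filter(Boolean);
  } catch {
    // Fallback: dedupe by chunk identity, then sort by score (keyword hits default to 0).
    const unique = [...new Map(
      all.map((r) => [`${r.metadata.document_id}-${r.metadata.chunk_index}`, r])
    ).values()];
    return unique.sort((a, b) => (b.score ?? 0) - (a.score ?? 0)).slice(0, k);
  }
}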

Advanced Patterns

Multi-Query RAG

agents:
  # Generate query variations
  - name: generate-queries
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Generate 3 different search queries for: ${input.question}
        Return as JSON array of strings.

  # Search with each query
  - name: search-1
    agent: rag
    config:
      action: search
      query: ${generate-queries.output[0]}
      topK: 5

  - name: search-2
    agent: rag
    config:
      action: search
      query: ${generate-queries.output[1]}
      topK: 5

  - name: search-3
    agent: rag
    config:
      action: search
      query: ${generate-queries.output[2]}
      topK: 5

  # Deduplicate and combine
  - name: combine
    operation: code
    config:
      code: |
        const all = [
          ...${search-1.output.results},
          ...${search-2.output.results},
          ...${search-3.output.results}
        ];

        // Deduplicate by document ID + chunk index
        const unique = [...new Map(
          all.map(r => [r.metadata.document_id + '-' + r.metadata.chunk_index, r])
        ).values()];

        // Sort by relevance score
        return {
          results: unique.sort((a, b) => b.score - a.score).slice(0, 5)
        };

Conversational RAG

ensemble: rag-conversational

state:
  schema:
    history: array

agents:
  # Rewrite question with context
  - name: contextualize
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Given this conversation history:
        ${state.history.map(h => `${h.role}: ${h.content}`).join('\n')}

        Rewrite this follow-up question to be standalone:
        "${input.question}"

  # Search
  - name: search
    agent: rag
    config:
      action: search
      query: ${contextualize.output}
      topK: 5

  # Generate answer
  - name: answer
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: |
        Conversation history:
        ${state.history.map(h => `${h.role}: ${h.content}`).join('\n')}

        Context:
        ${search.output.results.map(r => r.text).join('\n\n')}

        Question: ${input.question}

  # Update history
  - name: update-history
    operation: code
    state:
      use: [history]
      set:
        history: ${[
          ...state.history,
          { role: 'user', content: input.question },
          { role: 'assistant', content: answer.output }
        ]}
    config:
      code: return { updated: true };
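
The history array grows without bound as the conversation continues. A minimal sketch of trimming it to the most recent messages before writing it back, so the contextualize and answer prompts stay a bounded size (the message shape matches what update-history stores; the cap of 20 is an assumption):

// Sketch: cap conversation history at the most recent N messages.
interface Message { role: 'user' | 'assistant'; content: string }

function trimHistory(history: Message[], maxMessages = 20): Message[] {
  return history.length > maxMessages ? history.slice(-maxMessages) : history;
}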

Production Considerations

Error Handling

agents:
  - name: search
    agent: rag
    retry:
      maxAttempts: 3
      backoff: exponential

  - name: fallback-answer
    condition: ${search.failed}
    operation: think
    config:
      prompt: |
        You don't have access to the knowledge base.
        Politely explain you can't answer: ${input.question}
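
What `maxAttempts: 3` with exponential backoff amounts to, as a standalone sketch (the base delay is illustrative, not taken from the framework):

// Sketch of exponential backoff: retry up to maxAttempts times, doubling the
// delay after each failure (100ms, 200ms, 400ms, ...).
async function withRetry<T>(fn: () => Promise<T>, maxAttempts = 3, baseDelayMs = 100): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      if (attempt < maxAttempts - 1) {
        await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** attempt));
      }
    }
  }
  throw lastError;
}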

Caching

agents:
  - name: search
    agent: rag
    cache:
      ttl: 3600
      key: rag-search-${hash(input.question)}

  - name: answer
    operation: think
    cache:
      ttl: 3600
      key: rag-answer-${hash(input.question + search.output)}
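
A sketch of what `hash(input.question)` could look like in a Worker, assuming SHA-256 via Web Crypto; normalizing the question (trim + lowercase) is an assumption to improve hit rates:

// Sketch: derive a stable cache key from the question text using Web Crypto.
async function cacheKey(prefix: string, question: string): Promise<string> {
  const normalized = question.trim().toLowerCase();
  const digest = await crypto.subtle.digest('SHA-256', new TextEncoder().encode(normalized));
  const hex = [...new Uint8Array(digest)].map((b) => b.toString(16).padStart(2, '0')).join('');
  return `${prefix}-${hex}`;
}

// e.g. await cacheKey('rag-search', question)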

Monitoring

agents:
  - name: answer
    operation: think

  - name: log-interaction
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO rag_interactions (
          question, answer, num_sources, avg_relevance, timestamp
        ) VALUES (?, ?, ?, ?, ?)
      params:
        - ${input.question}
        - ${answer.output}
        - ${search.output.results.length}
        - ${search.output.results.reduce((sum, r) => sum + r.score, 0) / search.output.results.length}
        - ${Date.now()}
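
The avg_relevance expression divides by the result count; a small sketch that guards the empty case so the logging insert never divides by zero:

// Sketch: average relevance with a guard for empty result sets.
function avgRelevance(results: { score: number }[]): number {
  if (results.length === 0) return 0;
  return results.reduce((sum, r) => sum + r.score, 0) / results.length;
}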

Database Schema

-- Document chunks metadata
CREATE TABLE document_chunks (
  id TEXT PRIMARY KEY,
  document_id TEXT NOT NULL,
  text TEXT NOT NULL,
  metadata TEXT NOT NULL,
  created_at TEXT NOT NULL
);

-- D1 is SQLite, so indexes are created separately
CREATE INDEX idx_document_id ON document_chunks (document_id);
CREATE INDEX idx_created_at ON document_chunks (created_at);

-- Full-text search
CREATE VIRTUAL TABLE document_chunks_fts USING fts5(
  text,
  content=document_chunks
);

-- RAG interactions log
CREATE TABLE rag_interactions (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  question TEXT NOT NULL,
  answer TEXT NOT NULL,
  num_sources INTEGER NOT NULL,
  avg_relevance REAL NOT NULL,
  timestamp INTEGER NOT NULL
);

CREATE INDEX idx_timestamp ON rag_interactions (timestamp);

Deployment

# wrangler.toml
[ai]
binding = "AI"

[[vectorize]]
binding = "VECTORIZE"
index_name = "documents"

[[d1_databases]]
binding = "DB"
database_name = "rag-db"
database_id = "your-db-id"

Testing

describe('RAG Pipeline', () => {
  it('should ingest and retrieve documents', async () => {
    // Ingest
    const ingestResult = await conductor.execute('ingest-documents', {
      documents: [
        {
          id: 'doc-1',
          title: 'Test Document',
          content: 'This is a test document about RAG pipelines.'
        }
      ],
      source: 'test'
    });

    expect(ingestResult.chunks_created).toBeGreaterThan(0);

    // Query
    const qaResult = await conductor.execute('rag-qa', {
      question: 'What is this document about?'
    });

    expect(qaResult.response.answer).toContain('RAG');
    expect(qaResult.response.sources.length).toBeGreaterThan(0);
  });
});

Performance Tips

  1. Chunk size: 1000-1500 characters with 200-character overlap (see the chunk-count sketch after this list)
  2. Top K: Start with 5, increase if answers are incomplete
  3. Cache aggressively: 1 hour for common questions
  4. Use filters: Filter by date, category, etc. for faster search
  5. Monitor relevance: Log average relevance scores
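
Tip 1 in numbers: the ingestion loop advances by a stride of chunkSize - overlap, so a document of length L yields about ceil((L - overlap) / (chunkSize - overlap)) chunks. A quick sketch:

// Sketch: estimate how many chunks a document yields for a given size and overlap.
// A 10,000-character document with chunkSize=1000, overlap=200 gives 13 chunks.
function estimateChunkCount(textLength: number, chunkSize = 1000, overlap = 200): number {
  const stride = chunkSize - overlap; // 800 with the defaults above
  return Math.max(1, Math.ceil((textLength - overlap) / stride));
}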

Next Steps