RAG Pipeline Playbook
A complete end-to-end RAG implementation: document ingestion, vector search, and answer generation.
Architecture
Documents → Chunk → Embed → Store (Vectorize)
                                 ↓
User Question → Embed → Search → Generate Answer
Complete Implementation
1. Document Ingestion Ensemble
ensemble: ingest-documents

inputs:
  documents:
    type: array
    required: true
  source:
    type: string
    required: true

agents:
  # 1. Chunk documents
  - name: chunk
    operation: code
    config:
      code: |
        const chunkSize = 1000;
        const overlap = 200;
        const chunks = [];
        for (const doc of ${input.documents}) {
          const text = doc.content;
          for (let i = 0; i < text.length; i += chunkSize - overlap) {
            chunks.push({
              text: text.substring(i, i + chunkSize),
              metadata: {
                document_id: doc.id,
                title: doc.title,
                source: ${input.source},
                chunk_index: chunks.length,
                created_at: new Date().toISOString()
              }
            });
          }
        }
        return { chunks };

  # 2. Embed chunks (parallel)
  - name: embed
    agent: rag
    config:
      action: embed
      text: ${chunk.output.chunks.map(c => c.text)}
      metadata: ${chunk.output.chunks.map(c => c.metadata)}
      namespace: documents

  # 3. Store metadata in D1
  - name: store-metadata
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO document_chunks (id, document_id, text, metadata, created_at)
        VALUES (?, ?, ?, ?, ?)
      params: ${chunk.output.chunks.map(c => [
        c.metadata.document_id + '-' + c.metadata.chunk_index,
        c.metadata.document_id,
        c.text,
        JSON.stringify(c.metadata),
        c.metadata.created_at
      ])}

output:
  chunks_created: ${chunk.output.chunks.length}
  embedding_status: ${embed.output}
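With the ensemble deployed, ingestion is a single call. A minimal sketch, mirroring the conductor.execute pattern used in the Testing section below (the document IDs, titles, and source value are illustrative):

// Illustrative invocation; `conductor` is assumed available as in the Testing section.
const result = await conductor.execute('ingest-documents', {
  documents: [
    { id: 'kb-001', title: 'Getting Started', content: '...' },
    { id: 'kb-002', title: 'API Reference', content: '...' }
  ],
  source: 'knowledge-base'
});

// chunks_created and embedding_status come from the ensemble's output block
console.log(`Created ${result.chunks_created} chunks`);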
2. Question Answering Ensemble
ensemble: rag-qa

inputs:
  question:
    type: string
    required: true
  topK:
    type: number
    default: 5
  filters:
    type: object

agents:
  # 1. Search relevant chunks
  - name: search
    agent: rag
    config:
      action: search
      query: ${input.question}
      topK: ${input.topK}
      namespace: documents
      filter: ${input.filters}
      includeMetadata: true

  # 2. Generate answer
  - name: answer
    operation: think
    config:
      provider: openai
      model: gpt-4o
      temperature: 0.3
      prompt: |
        You are a helpful assistant. Answer the question using ONLY the provided context.
        If the answer is not in the context, say "I don't have enough information."

        Context:
        ${search.output.results.map((r, i) => `[${i+1}] ${r.text}`).join('\n\n')}

        Question: ${input.question}

        Provide a clear, accurate answer with citations like [1], [2] etc.

  # 3. Format response with sources
  - name: format
    operation: code
    config:
      code: |
        return {
          answer: ${answer.output},
          sources: ${search.output.results}.map((r, i) => ({
            citation: i + 1,
            text: r.text.substring(0, 200) + '...',
            title: r.metadata.title,
            document_id: r.metadata.document_id,
            relevance: r.score
          })),
          metadata: {
            question: ${input.question},
            num_sources: ${search.output.results.length},
            timestamp: new Date().toISOString()
          }
        };

output:
  response: ${format.output}
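Asking a question is likewise one call. A sketch using the same execute pattern (the question text is illustrative); the response shape matches the format agent above:

const qa = await conductor.execute('rag-qa', {
  question: 'How do I rotate my API keys?',
  topK: 5
});

console.log(qa.response.answer);
for (const source of qa.response.sources) {
  console.log(`[${source.citation}] ${source.title} (relevance: ${source.relevance})`);
}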
3. Hybrid Search Enhancement
ensemble: rag-qa-hybrid

agents:
  # Vector search
  - name: vector-search
    agent: rag
    config:
      action: search
      query: ${input.question}
      topK: 10
      namespace: documents

  # Keyword search
  - name: keyword-search
    operation: storage
    config:
      type: d1
      query: |
        SELECT id, document_id, text, metadata
        FROM document_chunks
        WHERE text LIKE ? OR text LIKE ?
        LIMIT 10
      params:
        - '%${input.question}%'
        - '%${input.question.split(' ').join('%')}%'

  # Rerank with LLM
  - name: rerank
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Rank these document chunks by relevance to: "${input.question}"
        Return the top 5 as a JSON array of indices (0-based).

        Vector results:
        ${vector-search.output.results.map((r, i) => `${i}. ${r.text}`).join('\n')}

        Keyword results:
        ${keyword-search.output.map((r, i) => `${i+10}. ${r.text}`).join('\n')}

  # Select top results
  - name: select-top
    operation: code
    config:
      code: |
        const allResults = [
          ...${vector-search.output.results},
          ...${keyword-search.output}
        ];
        const topIndices = JSON.parse(${rerank.output});
        return {
          results: topIndices.map(i => allResults[i])
        };

  # Generate answer
  - name: answer
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: |
        Answer using these sources:
        ${select-top.output.results.map(r => r.text).join('\n\n')}

        Question: ${input.question}
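The reranker numbers vector results 0-9 and keyword results from 10 upward, so the indices it returns map directly into the concatenated allResults array in select-top. Models do occasionally wrap JSON in prose, so a more defensive parse can be worth it; a minimal sketch (the parseIndices helper is hypothetical, not part of the framework):

// Hypothetical helper: pull the first JSON array out of the model's reply and
// fall back to the original result order if parsing fails.
function parseIndices(raw, fallbackCount) {
  const match = String(raw).match(/\[[\s\S]*?\]/);
  try {
    const parsed = JSON.parse(match ? match[0] : raw);
    if (Array.isArray(parsed)) return parsed;
  } catch (_) {
    // fall through to the fallback below
  }
  return Array.from({ length: fallbackCount }, (_, i) => i);
}

// Inside select-top this would replace the bare JSON.parse call, e.g.
// const topIndices = parseIndices(${rerank.output}, 5);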
Advanced Patterns
Multi-Query RAG
agents:
  # Generate query variations
  - name: generate-queries
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Generate 3 different search queries for: ${input.question}
        Return as JSON array of strings.

  # Search with each query
  - name: search-1
    agent: rag
    config:
      action: search
      query: ${generate-queries.output[0]}
      topK: 5

  - name: search-2
    agent: rag
    config:
      action: search
      query: ${generate-queries.output[1]}
      topK: 5

  - name: search-3
    agent: rag
    config:
      action: search
      query: ${generate-queries.output[2]}
      topK: 5

  # Deduplicate and combine
  - name: combine
    operation: code
    config:
      code: |
        const all = [
          ...${search-1.output.results},
          ...${search-2.output.results},
          ...${search-3.output.results}
        ];
        // Deduplicate by document ID + chunk index
        const unique = [...new Map(
          all.map(r => [r.metadata.document_id + '-' + r.metadata.chunk_index, r])
        ).values()];
        // Sort by relevance score
        return {
          results: unique.sort((a, b) => b.score - a.score).slice(0, 5)
        };
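Because all three searches hit the same index, their similarity scores are directly comparable and a single sort-and-slice is enough; if the query variations were routed to different indexes or retrieval backends, a rank-based merge would be a safer choice than comparing raw scores.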
Conversational RAG
ensemble: rag-conversational

state:
  schema:
    history: array

agents:
  # Rewrite question with context
  - name: contextualize
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Given this conversation history:
        ${state.history.map(h => `${h.role}: ${h.content}`).join('\n')}

        Rewrite this follow-up question to be standalone:
        "${input.question}"

  # Search
  - name: search
    agent: rag
    config:
      action: search
      query: ${contextualize.output}
      topK: 5

  # Generate answer
  - name: answer
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: |
        Conversation history:
        ${state.history.map(h => `${h.role}: ${h.content}`).join('\n')}

        Context:
        ${search.output.results.map(r => r.text).join('\n\n')}

        Question: ${input.question}

  # Update history
  - name: update-history
    operation: code
    state:
      use: [history]
      set:
        history: ${[
          ...state.history,
          { role: 'user', content: input.question },
          { role: 'assistant', content: answer.output }
        ]}
    config:
      code: return { updated: true };
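The contextualize step is what makes this work: a follow-up like "What about pricing?" embeds poorly on its own, so rewriting it against the stored history gives the search step a self-contained query. Note that update-history appends both the user question and the generated answer, so the next turn sees the full exchange.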
Production Considerations
Error Handling
agents:
  - name: search
    agent: rag
    retry:
      maxAttempts: 3
      backoff: exponential

  - name: fallback-answer
    condition: ${search.failed}
    operation: think
    config:
      prompt: |
        You don't have access to the knowledge base.
        Politely explain you can't answer: ${input.question}
Caching
agents:
  - name: search
    agent: rag
    cache:
      ttl: 3600
      key: rag-search-${hash(input.question)}

  - name: answer
    operation: think
    cache:
      ttl: 3600
      key: rag-answer-${hash(input.question + search.output)}
Monitoring
agents:
  - name: answer
    operation: think

  - name: log-interaction
    operation: storage
    config:
      type: d1
      query: |
        INSERT INTO rag_interactions (
          question, answer, num_sources, avg_relevance, timestamp
        ) VALUES (?, ?, ?, ?, ?)
      params:
        - ${input.question}
        - ${answer.output}
        - ${search.output.results.length}
        - ${search.output.results.reduce((sum, r) => sum + r.score, 0) / search.output.results.length}
        - ${Date.now()}
Database Schema
-- Document chunks metadata
CREATE TABLE document_chunks (
  id TEXT PRIMARY KEY,
  document_id TEXT NOT NULL,
  text TEXT NOT NULL,
  metadata TEXT NOT NULL,
  created_at TEXT NOT NULL
);

-- SQLite/D1 does not support inline INDEX clauses; create indexes separately
CREATE INDEX idx_document_id ON document_chunks (document_id);
CREATE INDEX idx_created_at ON document_chunks (created_at);

-- Full-text search
CREATE VIRTUAL TABLE document_chunks_fts USING fts5(
  text,
  content='document_chunks'
);

-- RAG interactions log
CREATE TABLE rag_interactions (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  question TEXT NOT NULL,
  answer TEXT NOT NULL,
  num_sources INTEGER NOT NULL,
  avg_relevance REAL NOT NULL,
  timestamp INTEGER NOT NULL
);

CREATE INDEX idx_timestamp ON rag_interactions (timestamp);
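Assuming the statements above are saved to a local schema.sql, they can be applied to the D1 database named in the deployment config below with a command along these lines:

npx wrangler d1 execute rag-db --file=./schema.sql

Add --remote to run it against the deployed database rather than the local development copy.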
Deployment
# wrangler.toml
[ai]
binding = "AI"
[[vectorize]]
binding = "VECTORIZE"
index_name = "documents"
[[d1_databases]]
binding = "DB"
database_name = "rag-db"
database_id = "your-db-id"
Testing
describe('RAG Pipeline', () => {
  it('should ingest and retrieve documents', async () => {
    // Ingest
    const ingestResult = await conductor.execute('ingest-documents', {
      documents: [
        {
          id: 'doc-1',
          title: 'Test Document',
          content: 'This is a test document about RAG pipelines.'
        }
      ],
      source: 'test'
    });

    expect(ingestResult.chunks_created).toBeGreaterThan(0);

    // Query
    const qaResult = await conductor.execute('rag-qa', {
      question: 'What is this document about?'
    });

    expect(qaResult.response.answer).toContain('RAG');
    expect(qaResult.response.sources.length).toBeGreaterThan(0);
  });
});
Performance Tips
- Chunk size: 1000-1500 characters with 200-character overlap
- Top K: Start with 5, increase if answers are incomplete
- Cache aggressively: 1 hour for common questions
- Use filters: Filter by date, category, etc. for faster search (see the sketch after this list)
- Monitor relevance: Log average relevance scores
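For the filters tip, the rag-qa ensemble above already exposes a filters input that is forwarded to the search step's filter option. A minimal sketch of a filtered call, reusing the source field stored in chunk metadata at ingest time (the exact filter syntax supported depends on the rag agent and the underlying index):

const filtered = await conductor.execute('rag-qa', {
  question: 'What changed in the latest release?', // illustrative question
  topK: 5,
  filters: { source: 'knowledge-base' }            // matches metadata.source set during ingestion
});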

