Architecture
Ingestion: Documents → Chunk → Embed → Store (Vectorize)
Query: User Question → Embed → Search → Generate Answer
Complete Implementation
1. Document Ingestion Ensemble
ensemble: ingest-documents

inputs:
  documents:
    type: array
    required: true
  source:
    type: string
    required: true

agents:
  # 1. Chunk documents
  - name: chunk
    operation: code
    config:
      script: scripts/chunk-documents
    input:
      documents: ${input.documents}
      source: ${input.source}
// scripts/chunk-documents.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function chunkDocuments(context: AgentExecutionContext) {
  const { documents, source } = context.input
  const chunkSize = 1000
  const overlap = 200
  const chunks = []

  for (const doc of documents) {
    const text = doc.content
    // Slide a fixed window that advances chunkSize - overlap (800) characters
    // per step, so adjacent chunks share 200 characters of context
    for (let i = 0; i < text.length; i += chunkSize - overlap) {
      chunks.push({
        text: text.substring(i, i + chunkSize),
        metadata: {
          document_id: doc.id,
          title: doc.title,
          source: source,
          // Note: this index is global across the batch, which keeps
          // document_id + '-' + chunk_index unique as a chunk ID
          chunk_index: chunks.length,
          created_at: new Date().toISOString()
        }
      })
    }
  }

  return { chunks }
}
  # 2. Embed chunks (parallel)
  - name: embed
    operation: rag
    config:
      action: embed
      text: ${chunk.output.chunks.map(c => c.text)}
      metadata: ${chunk.output.chunks.map(c => c.metadata)}
      namespace: documents

  # 3. Store metadata in D1
  - name: store-metadata
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT INTO document_chunks (id, document_id, text, metadata, created_at)
        VALUES (?, ?, ?, ?, ?)
      params: ${chunk.output.chunks.map(c => [
        c.metadata.document_id + '-' + c.metadata.chunk_index,
        c.metadata.document_id,
        c.text,
        JSON.stringify(c.metadata),
        c.metadata.created_at
      ])}

output:
  chunks_created: ${chunk.output.chunks.length}
  embedding_status: ${embed.output}
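A minimal invocation sketch, assuming a `conductor` instance like the one used in the Testing section below:

// Sketch: running the ingestion ensemble; `conductor` is assumed to be
// set up the same way as in the Testing section.
const result = await conductor.execute('ingest-documents', {
  documents: [
    { id: 'doc-42', title: 'Release Notes', content: '...' }
  ],
  source: 'changelog'
})
console.log(`${result.chunks_created} chunks embedded and stored`)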
2. Question Answering Ensemble
YAML:
ensemble: rag-qa

inputs:
  question:
    type: string
    required: true
  topK:
    type: number
    default: 5
  filters:
    type: object

agents:
  # 1. Search relevant chunks
  - name: search
    operation: rag
    config:
      action: search
      query: ${input.question}
      topK: ${input.topK}
      namespace: documents
      filter: ${input.filters}
      includeMetadata: true

  # 2. Generate answer
  - name: answer
    operation: think
    config:
      provider: openai
      model: gpt-4o
      temperature: 0.3
      prompt: |
        You are a helpful assistant. Answer the question using ONLY the provided context.
        If the answer is not in the context, say "I don't have enough information."

        Context:
        ${search.output.results.map((r, i) => `[${i+1}] ${r.text}`).join('\n\n')}

        Question: ${input.question}

        Provide a clear, accurate answer with citations like [1], [2] etc.

  # 3. Format response with sources
  - name: format
    operation: code
    config:
      script: scripts/format-rag-response
    input:
      answer: ${answer.output}
      search_results: ${search.output.results}
      question: ${input.question}

output:
  response: ${format.output}
TypeScript:
import { createEnsemble, step } from '@ensemble-edge/conductor'

interface RAGInput {
  question: string
  topK?: number
  filters?: Record<string, unknown>
}

const ragQA = createEnsemble('rag-qa')
  .setInput<RAGInput>()

  // 1. Search relevant chunks
  .addStep(
    step('search')
      .operation('rag')
      .config({
        action: 'search',
        query: '${input.question}',
        topK: '${input.topK || 5}',
        namespace: 'documents',
        filter: '${input.filters}',
        includeMetadata: true
      })
  )

  // 2. Generate answer
  .addStep(
    step('answer')
      .operation('think')
      .config({
        provider: 'openai',
        model: 'gpt-4o',
        temperature: 0.3,
        prompt: `
          You are a helpful assistant. Answer the question using ONLY the provided context.
          If the answer is not in the context, say "I don't have enough information."

          Context:
          \${search.output.results.map((r, i) => \`[\${i+1}] \${r.text}\`).join('\\n\\n')}

          Question: \${input.question}

          Provide a clear, accurate answer with citations like [1], [2] etc.
        `
      })
  )

  // 3. Format response with sources
  .addStep(
    step('format')
      .operation('code')
      .config({ script: 'scripts/format-rag-response' })
      .input({
        answer: '${answer.output}',
        search_results: '${search.output.results}',
        question: '${input.question}'
      })
  )
  .build()

export default ragQA
// scripts/format-rag-response.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function formatRagResponse(context: AgentExecutionContext) {
  const { answer, search_results, question } = context.input

  return {
    answer: answer,
    sources: search_results.map((r: any, i: number) => ({
      citation: i + 1,
      text: r.text.substring(0, 200) + '...',
      title: r.metadata.title,
      document_id: r.metadata.document_id,
      relevance: r.score
    })),
    metadata: {
      question: question,
      num_sources: search_results.length,
      timestamp: new Date().toISOString()
    }
  }
}
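For consumers of the ensemble, the response shape can be written down as a type. This is a sketch derived from the return value above; the interface names are ours, not part of the Conductor API:

// Shape of format-rag-response's return value (names are illustrative)
interface RAGSource {
  citation: number
  text: string       // first 200 characters of the chunk
  title: string
  document_id: string
  relevance: number  // similarity score from the search step
}

interface RAGResponse {
  answer: string
  sources: RAGSource[]
  metadata: {
    question: string
    num_sources: number
    timestamp: string  // ISO 8601
  }
}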
3. Hybrid Search Enhancement
ensemble: rag-qa-hybrid

agents:
  # Vector search
  - name: vector-search
    operation: rag
    config:
      action: search
      query: ${input.question}
      topK: 10
      namespace: documents

  # Keyword search
  - name: keyword-search
    operation: data
    config:
      backend: d1
      binding: DB
      operation: query
      sql: |
        SELECT id, document_id, text, metadata
        FROM document_chunks
        WHERE text LIKE ? OR text LIKE ?
        LIMIT 10
      params:
        - '%${input.question}%'
        - '%${input.question.split(' ').join('%')}%'

  # Rerank with LLM
  - name: rerank
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Rank these document chunks by relevance to: "${input.question}"
        Return the top 5 as a JSON array of indices (0-based).

        Vector results:
        ${vector-search.output.results.map((r, i) => `${i}. ${r.text}`).join('\n')}

        Keyword results:
        ${keyword-search.output.map((r, i) => `${i+10}. ${r.text}`).join('\n')}

  # Select top results
  - name: select-top
    operation: code
    config:
      script: scripts/select-top-results
    input:
      vector_results: ${vector-search.output.results}
      keyword_results: ${keyword-search.output}
      top_indices: ${rerank.output}
// scripts/select-top-results.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function selectTopResults(context: AgentExecutionContext) {
  const { vector_results, keyword_results, top_indices } = context.input

  // Indices 0-9 refer to vector results and 10-19 to keyword results,
  // matching the numbering used in the rerank prompt
  const allResults = [
    ...vector_results,
    ...keyword_results
  ]
  const topIndices = JSON.parse(top_indices)

  return {
    results: topIndices.map((i: number) => allResults[i])
  }
}
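`JSON.parse` throws if the model wraps its answer in prose or a code fence. A more defensive parse is sketched below; `parseIndices` is a hypothetical helper, not part of the pipeline above:

// Sketch: extract the first JSON array from raw LLM output and
// keep only integer entries, falling back to an empty list
function parseIndices(raw: string): number[] {
  const match = raw.match(/\[[\s\S]*?\]/)
  if (!match) return []
  try {
    const parsed = JSON.parse(match[0])
    return Array.isArray(parsed) ? parsed.filter((n) => Number.isInteger(n)) : []
  } catch {
    return []
  }
}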
  # Generate answer
  - name: answer
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: |
        Answer using these sources:
        ${select-top.output.results.map(r => r.text).join('\n\n')}

        Question: ${input.question}
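LLM reranking costs one extra model call per question. A cheaper alternative is reciprocal rank fusion (RRF), which merges the two ranked lists with no inference at all. This is a sketch, not part of the pipeline above; it assumes both lists use the chunk shape shown elsewhere on this page (the D1 rows would first need their metadata JSON parsed), and k = 60 is the constant commonly used in the RRF literature:

// Sketch: reciprocal rank fusion as a drop-in replacement for the rerank step
interface Ranked {
  metadata: { document_id: string; chunk_index: number }
  text: string
}

export function rrfFuse(lists: Ranked[][], topN = 5, k = 60): Ranked[] {
  const scores = new Map<string, { item: Ranked; score: number }>()
  for (const list of lists) {
    list.forEach((item, rank) => {
      const key = `${item.metadata.document_id}-${item.metadata.chunk_index}`
      const entry = scores.get(key) ?? { item, score: 0 }
      entry.score += 1 / (k + rank + 1)  // reciprocal rank contribution
      scores.set(key, entry)
    })
  }
  return [...scores.values()]
    .sort((a, b) => b.score - a.score)
    .slice(0, topN)
    .map((e) => e.item)
}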
Advanced Patterns
Multi-Query RAG
agents:
  # Generate query variations
  - name: generate-queries
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Generate 3 different search queries for: ${input.question}
        Return as JSON array of strings.

  # Search with each query
  - name: search-1
    operation: rag
    config:
      action: search
      query: ${generate-queries.output[0]}
      topK: 5

  - name: search-2
    operation: rag
    config:
      action: search
      query: ${generate-queries.output[1]}
      topK: 5

  - name: search-3
    operation: rag
    config:
      action: search
      query: ${generate-queries.output[2]}
      topK: 5

  # Deduplicate and combine
  - name: combine
    operation: code
    config:
      script: scripts/combine-search-results
    input:
      results_1: ${search-1.output.results}
      results_2: ${search-2.output.results}
      results_3: ${search-3.output.results}
// scripts/combine-search-results.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default function combineSearchResults(context: AgentExecutionContext) {
  const { results_1, results_2, results_3 } = context.input
  const all = [
    ...results_1,
    ...results_2,
    ...results_3
  ]

  // Deduplicate by document ID + chunk index; the Map keeps the last
  // occurrence of each duplicate key
  const unique = [...new Map(
    all.map((r: any) => [r.metadata.document_id + '-' + r.metadata.chunk_index, r])
  ).values()]

  // Sort by relevance score and keep the top 5
  return {
    results: unique.sort((a: any, b: any) => b.score - a.score).slice(0, 5)
  }
}
Conversational RAG
ensemble: rag-conversational

state:
  schema:
    history: array

agents:
  # Rewrite question with context
  - name: contextualize
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Given this conversation history:
        ${state.history.map(h => `${h.role}: ${h.content}`).join('\n')}

        Rewrite this follow-up question to be standalone:
        "${input.question}"

  # Search
  - name: search
    operation: rag
    config:
      action: search
      query: ${contextualize.output}
      topK: 5

  # Generate answer
  - name: answer
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: |
        Conversation history:
        ${state.history.map(h => `${h.role}: ${h.content}`).join('\n')}

        Context:
        ${search.output.results.map(r => r.text).join('\n\n')}

        Question: ${input.question}

  # Update history
  - name: update-history
    operation: code
    state:
      use: [history]
      set:
        history: ${[
          ...state.history,
          { role: 'user', content: input.question },
          { role: 'assistant', content: answer.output }
        ]}
    config:
      script: scripts/return-updated-status
// scripts/return-updated-status.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

// The history update is declared in the agent's state.set block above,
// so the script only needs to acknowledge completion.
export default function returnUpdatedStatus(context: AgentExecutionContext) {
  return { updated: true }
}
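History grows with every turn and will eventually overflow the model's context window. One option is a sliding-window trim applied before the state is written back; a sketch, with an arbitrary 20-message cap:

// Sketch: keep only the most recent N messages of conversation history
interface Turn {
  role: 'user' | 'assistant'
  content: string
}

export function trimHistory(history: Turn[], maxTurns = 20): Turn[] {
  return history.length > maxTurns ? history.slice(-maxTurns) : history
}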
Production Considerations
Error Handling
agents:
  - name: search
    operation: rag
    retry:
      maxAttempts: 3
      backoff: exponential

  - name: fallback-answer
    condition: ${search.failed}
    operation: think
    config:
      prompt: |
        You don't have access to the knowledge base.
        Politely explain you can't answer: ${input.question}
Caching
agents:
  - name: search
    operation: rag
    cache:
      ttl: 3600
      key: rag-search-${hash(input.question)}

  - name: answer
    operation: think
    cache:
      ttl: 3600
      key: rag-answer-${hash(input.question + search.output)}
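The `hash(...)` call above belongs to the ensemble expression language. If you need a comparable digest inside your own scripts, Workers expose the Web Crypto API; a sketch:

// Sketch: SHA-256 cache key using the Web Crypto API available in Workers
async function cacheKey(prefix: string, value: string): Promise<string> {
  const digest = await crypto.subtle.digest('SHA-256', new TextEncoder().encode(value))
  const hex = [...new Uint8Array(digest)]
    .map((b) => b.toString(16).padStart(2, '0'))
    .join('')
  return `${prefix}-${hex}`
}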
Monitoring
agents:
  - name: answer
    operation: think

  - name: log-interaction
    operation: data
    config:
      backend: d1
      binding: DB
      operation: execute
      sql: |
        INSERT INTO rag_interactions (
          question, answer, num_sources, avg_relevance, timestamp
        ) VALUES (?, ?, ?, ?, ?)
      params:
        - ${input.question}
        - ${answer.output}
        - ${search.output.results.length}
        - ${search.output.results.reduce((sum, r) => sum + r.score, 0) / search.output.results.length}
        - ${Date.now()}
Database Schema
-- Document chunks metadata
CREATE TABLE document_chunks (
  id TEXT PRIMARY KEY,
  document_id TEXT NOT NULL,
  text TEXT NOT NULL,
  metadata TEXT NOT NULL,
  created_at TEXT NOT NULL
);

-- D1 (SQLite) does not support inline INDEX clauses; create indexes separately
CREATE INDEX idx_document_id ON document_chunks (document_id);
CREATE INDEX idx_created_at ON document_chunks (created_at);

-- Full-text search
CREATE VIRTUAL TABLE document_chunks_fts USING fts5(
  text,
  content=document_chunks
);

-- RAG interactions log
CREATE TABLE rag_interactions (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  question TEXT NOT NULL,
  answer TEXT NOT NULL,
  num_sources INTEGER NOT NULL,
  avg_relevance REAL NOT NULL,
  timestamp INTEGER NOT NULL
);

CREATE INDEX idx_timestamp ON rag_interactions (timestamp);
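Note that the hybrid-search example above queries document_chunks with LIKE rather than the FTS table. If you switch keyword search to document_chunks_fts, be aware that an external-content FTS5 table is not updated automatically; the standard trigger pattern from the SQLite documentation is sketched below:

-- Keep the external-content FTS index in sync with document_chunks
CREATE TRIGGER document_chunks_ai AFTER INSERT ON document_chunks BEGIN
  INSERT INTO document_chunks_fts (rowid, text) VALUES (new.rowid, new.text);
END;

CREATE TRIGGER document_chunks_ad AFTER DELETE ON document_chunks BEGIN
  INSERT INTO document_chunks_fts (document_chunks_fts, rowid, text)
  VALUES ('delete', old.rowid, old.text);
END;

CREATE TRIGGER document_chunks_au AFTER UPDATE ON document_chunks BEGIN
  INSERT INTO document_chunks_fts (document_chunks_fts, rowid, text)
  VALUES ('delete', old.rowid, old.text);
  INSERT INTO document_chunks_fts (rowid, text) VALUES (new.rowid, new.text);
END;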
Deployment
# wrangler.toml
[ai]
binding = "AI"

[[vectorize]]
binding = "VECTORIZE"
index_name = "documents"

[[d1_databases]]
binding = "DB"
database_name = "rag-db"
database_id = "your-db-id"
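With these bindings, the environment visible to your Worker looks roughly like this sketch (binding types come from @cloudflare/workers-types):

// Bindings corresponding to the wrangler.toml above
interface Env {
  AI: Ai                     // Workers AI, used for embeddings
  VECTORIZE: VectorizeIndex  // the "documents" index
  DB: D1Database             // the rag-db D1 database
}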
Testing
// rag-pipeline.test.ts — assumes Vitest; adjust imports for your test runner
import { describe, it, expect } from 'vitest'
// `conductor` is assumed to be a Conductor instance provided by your test setup
import { conductor } from './test-setup'

describe('RAG Pipeline', () => {
  it('should ingest and retrieve documents', async () => {
    // Ingest
    const ingestResult = await conductor.execute('ingest-documents', {
      documents: [
        {
          id: 'doc-1',
          title: 'Test Document',
          content: 'This is a test document about RAG pipelines.'
        }
      ],
      source: 'test'
    });
    expect(ingestResult.chunks_created).toBeGreaterThan(0);

    // Query
    const qaResult = await conductor.execute('rag-qa', {
      question: 'What is this document about?'
    });
    expect(qaResult.response.answer).toContain('RAG');
    expect(qaResult.response.sources.length).toBeGreaterThan(0);
  });
});
Performance Tips
- Chunk size: 1000-1500 characters with 200-character overlap
- Top K: Start with 5, increase if answers are incomplete
- Cache aggressively: 1 hour for common questions
- Use filters: Filter by date, category, etc. for faster search
- Monitor relevance: log average relevance scores (as in the monitoring ensemble above) so retrieval regressions surface early

