Documentation Index
Fetch the complete documentation index at: https://docs.ensemble.ai/llms.txt
Use this file to discover all available pages before exploring further.
Architecture
Documents Chunk Embed Store (Vectorize)
User Question Embed Search Generate Answer
Complete Implementation
1. Document Ingestion Ensemble
ensemble: ingest-documents
inputs:
documents:
type: array
required: true
source:
type: string
required: true
agents:
# 1. Chunk documents
- name: chunk
operation: code
config:
script: scripts/chunk-documents
input:
documents: ${input.documents}
source: ${input.source}
// scripts/chunk-documents.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
export default function chunkDocuments(context: AgentExecutionContext) {
const { documents, source } = context.input
const chunkSize = 1000
const overlap = 200
const chunks = []
for (const doc of documents) {
const text = doc.content
for (let i = 0; i < text.length; i += chunkSize - overlap) {
chunks.push({
text: text.substring(i, i + chunkSize),
metadata: {
document_id: doc.id,
title: doc.title,
source: source,
chunk_index: chunks.length,
created_at: new Date().toISOString()
}
})
}
}
return { chunks }
}
# 2. Embed chunks (parallel)
- name: embed
operation: rag
config:
action: embed
text: ${chunk.output.chunks.map(c => c.text)}
metadata: ${chunk.output.chunks.map(c => c.metadata)}
namespace: documents
# 3. Store metadata in D1
- name: store-metadata
operation: data
config:
backend: d1
binding: DB
operation: execute
sql: |
INSERT INTO document_chunks (id, document_id, text, metadata, created_at)
VALUES (?, ?, ?, ?, ?)
params: ${chunk.output.chunks.map(c => [
c.metadata.document_id + '-' + c.metadata.chunk_index,
c.metadata.document_id,
c.text,
JSON.stringify(c.metadata),
c.metadata.created_at
])}
output:
chunks_created: ${chunk.output.chunks.length}
embedding_status: ${embed.output}
2. Question Answering Ensemble
- YAML
- TypeScript
ensemble: rag-qa
inputs:
question:
type: string
required: true
topK:
type: number
default: 5
filters:
type: object
agents:
# 1. Search relevant chunks
- name: search
operation: rag
config:
action: search
query: ${input.question}
topK: ${input.topK}
namespace: documents
filter: ${input.filters}
includeMetadata: true
# 2. Generate answer
- name: answer
operation: think
config:
provider: openai
model: gpt-4o
temperature: 0.3
prompt: |
You are a helpful assistant. Answer the question using ONLY the provided context.
If the answer is not in the context, say "I don't have enough information."
Context:
${search.output.results.map((r, i) => `[${i+1}] ${r.text}`).join('\n\n')}
Question: ${input.question}
Provide a clear, accurate answer with citations like [1], [2] etc.
# 3. Format response with sources
- name: format
operation: code
config:
script: scripts/format-rag-response
input:
answer: ${answer.output}
search_results: ${search.output.results}
question: ${input.question}
output:
response: ${format.output}
import { createEnsemble, step } from '@anthropic/conductor'
interface RAGInput {
question: string
topK?: number
filters?: Record<string, unknown>
}
const ragQA = createEnsemble('rag-qa')
.setInput<RAGInput>()
// 1. Search relevant chunks
.addStep(
step('search')
.operation('rag')
.config({
action: 'search',
query: '${input.question}',
topK: '${input.topK || 5}',
namespace: 'documents',
filter: '${input.filters}',
includeMetadata: true
})
)
// 2. Generate answer
.addStep(
step('answer')
.operation('think')
.config({
provider: 'openai',
model: 'gpt-4o',
temperature: 0.3,
prompt: `
You are a helpful assistant. Answer the question using ONLY the provided context.
If the answer is not in the context, say "I don't have enough information."
Context:
\${search.output.results.map((r, i) => \`[\${i+1}] \${r.text}\`).join('\\n\\n')}
Question: \${input.question}
Provide a clear, accurate answer with citations like [1], [2] etc.
`
})
)
// 3. Format response with sources
.addStep(
step('format')
.operation('code')
.config({ script: 'scripts/format-rag-response' })
.input({
answer: '${answer.output}',
search_results: '${search.output.results}',
question: '${input.question}'
})
)
.build()
export default ragQA
// scripts/format-rag-response.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
export default function formatRagResponse(context: AgentExecutionContext) {
const { answer, search_results, question } = context.input
return {
answer: answer,
sources: search_results.map((r: any, i: number) => ({
citation: i + 1,
text: r.text.substring(0, 200) + '...',
title: r.metadata.title,
document_id: r.metadata.document_id,
relevance: r.score
})),
metadata: {
question: question,
num_sources: search_results.length,
timestamp: new Date().toISOString()
}
}
}
3. Hybrid Search Enhancement
ensemble: rag-qa-hybrid
agents:
# Vector search
- name: vector-search
operation: rag
config:
action: search
query: ${input.question}
topK: 10
namespace: documents
# Keyword search
- name: keyword-search
operation: data
config:
backend: d1
binding: DB
operation: query
sql: |
SELECT id, document_id, text, metadata
FROM document_chunks
WHERE text LIKE ? OR text LIKE ?
LIMIT 10
params:
- '%${input.question}%'
- '%${input.question.split(' ').join('%')}%'
# Rerank with LLM
- name: rerank
operation: think
config:
provider: openai
model: gpt-4o-mini
prompt: |
Rank these document chunks by relevance to: "${input.question}"
Return the top 5 as a JSON array of indices (0-based).
Vector results:
${vector-search.output.results.map((r, i) => `${i}. ${r.text}`).join('\n')}
Keyword results:
${keyword-search.output.map((r, i) => `${i+10}. ${r.text}`).join('\n')}
# Select top results
- name: select-top
operation: code
config:
script: scripts/select-top-results
input:
vector_results: ${vector-search.output.results}
keyword_results: ${keyword-search.output}
top_indices: ${rerank.output}
```typescript
// scripts/select-top-results.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
export default function selectTopResults(context: AgentExecutionContext) {
const { vector_results, keyword_results, top_indices } = context.input
const allResults = [
...vector_results,
...keyword_results
]
const topIndices = JSON.parse(top_indices)
return {
results: topIndices.map((i: number) => allResults[i])
}
}
# Generate answer
- name: answer
operation: think
config:
provider: openai
model: gpt-4o
prompt: |
Answer using these sources:
${select-top.output.results.map(r => r.text).join('\n\n')}
Question: ${input.question}
Advanced Patterns
Multi-Query RAG
agents:
# Generate query variations
- name: generate-queries
operation: think
config:
provider: openai
model: gpt-4o-mini
prompt: |
Generate 3 different search queries for: ${input.question}
Return as JSON array of strings.
# Search with each query
- name: search-1
operation: rag
config:
action: search
query: ${generate-queries.output[0]}
topK: 5
- name: search-2
operation: rag
config:
action: search
query: ${generate-queries.output[1]}
topK: 5
- name: search-3
operation: rag
config:
action: search
query: ${generate-queries.output[2]}
topK: 5
# Deduplicate and combine
- name: combine
operation: code
config:
script: scripts/combine-search-results
input:
results_1: ${search-1.output.results}
results_2: ${search-2.output.results}
results_3: ${search-3.output.results}
```typescript
// scripts/combine-search-results.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
export default function combineSearchResults(context: AgentExecutionContext) {
const { results_1, results_2, results_3 } = context.input
const all = [
...results_1,
...results_2,
...results_3
]
// Deduplicate by document ID + chunk index
const unique = [...new Map(
all.map((r: any) => [r.metadata.document_id + '-' + r.metadata.chunk_index, r])
).values()]
// Sort by relevance score
return {
results: unique.sort((a: any, b: any) => b.score - a.score).slice(0, 5)
}
}
Conversational RAG
ensemble: rag-conversational
state:
schema:
history: array
agents:
# Rewrite question with context
- name: contextualize
operation: think
config:
provider: openai
model: gpt-4o-mini
prompt: |
Given this conversation history:
${state.history.map(h => `${h.role}: ${h.content}`).join('\n')}
Rewrite this follow-up question to be standalone:
"${input.question}"
# Search
- name: search
operation: rag
config:
action: search
query: ${contextualize.output}
topK: 5
# Generate answer
- name: answer
operation: think
config:
provider: openai
model: gpt-4o
prompt: |
Conversation history:
${state.history.map(h => `${h.role}: ${h.content}`).join('\n')}
Context:
${search.output.results.map(r => r.text).join('\n\n')}
Question: ${input.question}
# Update history
- name: update-history
operation: code
state:
use: [history]
set:
history: ${[
...state.history,
{ role: 'user', content: input.question },
{ role: 'assistant', content: answer.output }
]}
config:
script: scripts/return-updated-status
// scripts/return-updated-status.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
export default function returnUpdatedStatus(context: AgentExecutionContext) {
return { updated: true }
}
Production Considerations
Error Handling
agents:
- name: search
operation: rag
retry:
maxAttempts: 3
backoff: exponential
- name: fallback-answer
condition: ${search.failed}
operation: think
config:
prompt: |
You don't have access to the knowledge base.
Politely explain you can't answer: ${input.question}
Caching
agents:
- name: search
operation: rag
cache:
ttl: 3600
key: rag-search-${hash(input.question)}
- name: answer
operation: think
cache:
ttl: 3600
key: rag-answer-${hash(input.question + search.output)}
Monitoring
agents:
- name: answer
operation: think
- name: log-interaction
operation: data
config:
backend: d1
binding: DB
operation: execute
sql: |
INSERT INTO rag_interactions (
question, answer, num_sources, avg_relevance, timestamp
) VALUES (?, ?, ?, ?, ?)
params:
- ${input.question}
- ${answer.output}
- ${search.output.results.length}
- ${search.output.results.reduce((sum, r) => sum + r.score, 0) / search.output.results.length}
- ${Date.now()}
Database Schema
-- Document chunks metadata
CREATE TABLE document_chunks (
id TEXT PRIMARY KEY,
document_id TEXT NOT NULL,
text TEXT NOT NULL,
metadata TEXT NOT NULL,
created_at TEXT NOT NULL,
INDEX idx_document_id (document_id),
INDEX idx_created_at (created_at)
);
-- Full-text search
CREATE VIRTUAL TABLE document_chunks_fts USING fts5(
text,
content=document_chunks
);
-- RAG interactions log
CREATE TABLE rag_interactions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
question TEXT NOT NULL,
answer TEXT NOT NULL,
num_sources INTEGER NOT NULL,
avg_relevance REAL NOT NULL,
timestamp INTEGER NOT NULL,
INDEX idx_timestamp (timestamp)
);
Deployment
# wrangler.toml
[ai]
binding = "AI"
[[vectorize]]
binding = "VECTORIZE"
index_name = "documents"
[[d1_databases]]
binding = "DB"
database_name = "rag-db"
database_id = "your-db-id"
Testing
describe('RAG Pipeline', () => {
it('should ingest and retrieve documents', async () => {
// Ingest
const ingestResult = await conductor.execute('ingest-documents', {
documents: [
{
id: 'doc-1',
title: 'Test Document',
content: 'This is a test document about RAG pipelines.'
}
],
source: 'test'
});
expect(ingestResult.chunks_created).toBeGreaterThan(0);
// Query
const qaResult = await conductor.execute('rag-qa', {
question: 'What is this document about?'
});
expect(qaResult.response.answer).toContain('RAG');
expect(qaResult.response.sources.length).toBeGreaterThan(0);
});
});
Performance Tips
- Chunk size: 1000-1500 characters with 200-character overlap
- Top K: Start with 5, increase if answers are incomplete
- Cache aggressively: 1 hour for common questions
- Use filters: Filter by date, category, etc. for faster search
- Monitor relevance: Log average relevance scores
Next Steps
RAG Operation
RAG operation reference
Vectorize
Vector storage
Multi-Agent Analysis
Multi-agent patterns
Document Intelligence
Document processing

