Skip to main content
Define and version reusable SQL queries as components for consistent data operations across ensembles.

Overview

Query components enable you to:
  • Reuse SQL queries across multiple agents and ensembles
  • Version queries for reproducibility and rollback
  • Organize complex multi-step queries
  • Share queries across teams
  • A/B test different query strategies

Quick Start

1. Create a Query Component

Create a SQL file:
-- queries/get-customer-profile.sql
SELECT
  c.id,
  c.name,
  c.email,
  c.created_at,
  COUNT(o.id) as order_count,
  SUM(o.total) as lifetime_value
FROM customers c
LEFT JOIN orders o ON c.id = o.customer_id
WHERE c.email = ${email}
GROUP BY c.id, c.name, c.email, c.created_at
LIMIT 1

2. Add to Edgit

edgit components add get-customer-profile queries/get-customer-profile.sql query
edgit tag create get-customer-profile v1.0.0
edgit tag set get-customer-profile production v1.0.0
edgit push --tags --force

3. Reference in Your Ensemble

ensemble: customer-lookup

agents:
  - name: fetch-profile
    operation: data
    config:
      database: "postgres"
      # Reference the query component
      query: "query://[email protected]"
      params:
        email: ${input.customer_email}

inputs:
  customer_email:
    type: string
    required: true

outputs:
  profile: ${fetch-profile.output}

URI Format and Versioning

All query components use the standardized URI format:
query://{path}[@{version}]
Format breakdown:
  • query:// - Protocol identifier for query components
  • {path} - Logical path to the query (e.g., get-customer-profile, analytics/daily-sales)
  • [@{version}] - Optional version identifier (defaults to @latest)
Version format:
  • @latest - Always uses the most recent version
  • @v1 - Uses latest patch of major version (v1.x.x)
  • @v1.0.0 - Specific semantic version (immutable)
  • @prod - Custom tag for production queries
  • @staging - Custom tag for staging queries

Example URIs

# Always latest version
query: "query://get-customer-profile"
query: "query://get-customer-profile@latest"

# Specific semantic version
query: "query://[email protected]"
query: "query://[email protected]"

# Custom tags
query: "query://get-customer-profile@prod"
query: "query://get-customer-profile@staging"

# Nested paths
query: "query://analytics/daily-sales@v1"
query: "query://reports/customer-summary@v1"

How to Reference in Ensembles

There are three ways to reference queries in your ensembles: Use the query:// URI format to reference versioned query components:
ensemble: user-analyzer

agents:
  - name: get-users
    operation: data
    config:
      database: "postgres"
      query: "query://[email protected]"
      params:
        days: 30

outputs:
  users: ${get-users.output}

2. Template Expression Format

Use ${components.query_name@version} to embed query references:
ensemble: complex-report

agents:
  - name: generate-report
    operation: data
    config:
      database: "analytics"
      query: |
        -- Base query from component
        ${components.base-report-query@v1}

        -- Add custom filters
        WHERE created_at >= ${input.start_date}
          AND region = ${input.region}
      params:
        start_date: ${input.start_date}
        region: ${input.region}

inputs:
  start_date:
    type: string
  region:
    type: string

outputs:
  report: ${generate-report.output}

3. Inline Query

For simple operations or during development, use inline SQL directly:
ensemble: simple-lookup

agents:
  - name: get-user
    operation: data
    config:
      database: "postgres"
      query: |
        SELECT id, name, email, created_at
        FROM users
        WHERE id = ?
        LIMIT 1
      params:
        - ${input.user_id}

inputs:
  user_id:
    type: integer

outputs:
  user: ${get-user.output}

Using Query Components

With Dynamic Parameters

ensemble: sales-report

agents:
  - name: fetch-sales
    operation: data
    config:
      database: "analytics_db"
      query: "query://sales-by-month@v1"
      params:
        start_date: ${input.start_date}
        end_date: ${input.end_date}
        region: ${input.region}

inputs:
  start_date:
    type: string
    format: date
  end_date:
    type: string
    format: date
  region:
    type: string

outputs:
  sales_data: ${fetch-sales.output}

Multi-Step Data Workflow

ensemble: customer-enrichment

flow:
  - agent: fetch-profile
  - agent: fetch-orders
  - agent: fetch-interactions

agents:
  - name: fetch-profile
    operation: data
    config:
      database: "production"
      query: "query://get-customer-profile@v1"
      params:
        customer_id: ${input.customer_id}

  - name: fetch-orders
    operation: data
    config:
      database: "production"
      query: "query://get-customer-orders@v1"
      params:
        customer_id: ${input.customer_id}

  - name: fetch-interactions
    operation: data
    config:
      database: "analytics"
      query: "query://get-customer-interactions@v1"
      params:
        customer_id: ${input.customer_id}

outputs:
  profile: ${fetch-profile.output}
  orders: ${fetch-orders.output}
  interactions: ${fetch-interactions.output}

Query Types and Examples

Customer/User Queries

-- queries/get-customer-profile.sql
SELECT
  id,
  name,
  email,
  phone,
  created_at,
  last_login,
  subscription_status
FROM customers
WHERE id = ${customer_id}

Aggregation Queries

-- queries/monthly-revenue.sql
SELECT
  DATE_TRUNC('month', created_at) as month,
  COUNT(*) as order_count,
  SUM(total) as revenue,
  AVG(total) as avg_order_value
FROM orders
WHERE created_at >= ${start_date}
  AND created_at < ${end_date}
GROUP BY DATE_TRUNC('month', created_at)
ORDER BY month DESC

Join Queries

-- queries/orders-with-customer.sql
SELECT
  o.id as order_id,
  o.created_at,
  o.total,
  c.name as customer_name,
  c.email as customer_email,
  p.name as product_name,
  p.price
FROM orders o
INNER JOIN customers c ON o.customer_id = c.id
LEFT JOIN products p ON o.product_id = p.id
WHERE o.created_at >= ${start_date}
ORDER BY o.created_at DESC
LIMIT ${limit}

Time Series Queries

-- queries/daily-metrics.sql
SELECT
  DATE(created_at) as date,
  COUNT(*) as total_events,
  COUNT(DISTINCT user_id) as unique_users,
  AVG(duration_ms) as avg_duration
FROM events
WHERE created_at >= ${start_date}
  AND event_type = ${event_type}
GROUP BY DATE(created_at)
ORDER BY date DESC

Search/Filter Queries

-- queries/search-products.sql
SELECT
  id,
  name,
  description,
  price,
  stock_quantity,
  category,
  rating
FROM products
WHERE (name ILIKE ${search_term}
   OR description ILIKE ${search_term})
  AND category = ${category}
  AND price BETWEEN ${min_price} AND ${max_price}
ORDER BY rating DESC
LIMIT ${limit}

Caching and Performance

Query components are automatically cached for 1 hour (3600 seconds) after first load.

Default Caching

agents:
  - name: fetch-data
    operation: data
    config:
      database: "postgres"
      query: "query://get-customer-profile@v1"
      # Cached for 1 hour automatically

Custom Cache TTL

agents:
  - name: fetch-data
    operation: data
    config:
      database: "postgres"
      query: "query://get-customer-profile@v1"
      cache:
        ttl: 3600  # 1 hour in seconds

Bypass Cache

agents:
  - name: fetch-fresh-data
    operation: data
    config:
      database: "postgres"
      query: "query://get-customer-profile@v1"
      cache:
        bypass: true  # Fresh query every time

Best Practices

1. Use Parameterized Queries

Always use parameters to prevent SQL injection:
-- Good: Uses parameterized query
SELECT * FROM customers WHERE id = ${customer_id}

-- Bad: String interpolation risk
SELECT * FROM customers WHERE id = '${customer_id}'

2. Add Indexes for Performance

-- Add indexes for columns used in WHERE clauses
CREATE INDEX idx_customer_email ON customers(email);
CREATE INDEX idx_order_customer_id ON orders(customer_id);

3. Optimize with LIMIT

-- Always limit results to prevent huge datasets
SELECT * FROM events LIMIT ${limit}

-- Use OFFSET for pagination
SELECT * FROM orders
WHERE created_at >= ${start_date}
LIMIT ${limit}
OFFSET ${offset}

4. Use Clear Naming

-- Good: Descriptive column aliases
SELECT
  c.id as customer_id,
  c.name as customer_name,
  COUNT(o.id) as order_count

-- Bad: Unclear aliases
SELECT c.id, c.name, COUNT(o.id)

5. Comment Complex Queries

-- queries/customer-ltv.sql
-- Calculate customer lifetime value with RFM metrics
-- Used for segmentation and targeting

WITH customer_metrics AS (
  SELECT
    customer_id,
    COUNT(*) as purchase_count,
    MAX(created_at) as last_purchase_date,
    MIN(created_at) as first_purchase_date,
    SUM(total) as lifetime_value
  FROM orders
  WHERE status = 'completed'
  GROUP BY customer_id
)
SELECT
  cm.customer_id,
  cm.lifetime_value,
  DATEDIFF(day, cm.first_purchase_date, cm.last_purchase_date) as customer_age_days,
  cm.purchase_count,
  cm.lifetime_value / cm.purchase_count as avg_order_value
FROM customer_metrics cm

6. Version for Changes

Track query changes with semantic versioning:
# v1.0.0 - Initial query
edgit tag create get-customer-profile v1.0.0

# v1.1.0 - Add new column (backward compatible)
edgit tag create get-customer-profile v1.1.0

# v2.0.0 - Restructure output (breaking change)
edgit tag create get-customer-profile v2.0.0

7. Test Query Performance

# Use EXPLAIN to analyze query plans
EXPLAIN ANALYZE
SELECT * FROM customers WHERE email = '[email protected]';

Versioning Strategy

Development Workflow

# 1. Create initial version
edgit tag create get-customer-profile v1.0.0

# 2. Optimize in v1.1.0
edgit tag create get-customer-profile v1.1.0

# 3. Promote to production
edgit tag set get-customer-profile production v1.1.0

Staged Testing

ensemble: data-fetch-test

agents:
  - name: fetch-v1
    operation: data
    config:
      query: "query://[email protected]"

  - name: fetch-v2
    operation: data
    config:
      query: "query://[email protected]"

outputs:
  v1_result: ${fetch-v1.output}
  v2_result: ${fetch-v2.output}

Rollback Strategy

ensemble: customer-lookup-stable

agents:
  - name: fetch-profile
    operation: data
    config:
      database: "postgres"
      # Use stable version if latest has issues
      query: "query://[email protected]"

Using ctx API in Agents

When building custom agents with TypeScript handlers, you can access SQL queries through the ctx API:

ctx.queries.getSql(name)

Get a SQL query template by name:
// agents/data-fetcher/index.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function fetchData(ctx: AgentExecutionContext) {
  // Get SQL query template
  const sql = await ctx.queries.getSql('get-customer-profile')

  return {
    query: sql,
    length: sql.length
  }
}

Using Query with D1

// agents/customer-lookup/index.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

interface CustomerInput {
  customerId: string
}

export default async function lookupCustomer(ctx: AgentExecutionContext) {
  const { customerId } = ctx.input as CustomerInput

  // Get SQL query
  const sql = await ctx.queries.getSql('get-customer-profile')

  // Execute with D1
  const db = ctx.env.DB // D1 binding
  const result = await db.prepare(sql)
    .bind(customerId)
    .first()

  return {
    customer: result,
    found: !!result
  }
}

Parameterized Query Execution

// agents/order-fetcher/index.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

interface OrderInput {
  startDate: string
  endDate: string
  limit: number
}

export default async function fetchOrders(ctx: AgentExecutionContext) {
  const { startDate, endDate, limit } = ctx.input as OrderInput

  // Get SQL query template
  const sql = await ctx.queries.getSql('orders-by-date-range')

  // Execute with parameters
  const db = ctx.env.DB
  const result = await db.prepare(sql)
    .bind(startDate, endDate, limit)
    .all()

  return {
    orders: result.results || [],
    count: result.results?.length || 0
  }
}

Dynamic Query Selection

// agents/dynamic-reporter/index.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function generateReport(ctx: AgentExecutionContext) {
  const { reportType, params } = ctx.input as {
    reportType: 'daily' | 'weekly' | 'monthly'
    params: Record<string, any>
  }

  // Select query based on report type
  const queryName = `report-${reportType}`
  const sql = await ctx.queries.getSql(queryName)

  const db = ctx.env.DB
  const result = await db.prepare(sql)
    .bind(...Object.values(params))
    .all()

  return {
    reportType,
    data: result.results || [],
    count: result.results?.length || 0
  }
}

Batch Query Execution

// agents/batch-processor/index.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

interface BatchInput {
  customerIds: string[]
}

export default async function batchFetch(ctx: AgentExecutionContext) {
  const { customerIds } = ctx.input as BatchInput

  // Get query template
  const sql = await ctx.queries.getSql('get-customer-profile')

  const db = ctx.env.DB

  // Execute queries in parallel
  const results = await Promise.all(
    customerIds.map(id =>
      db.prepare(sql).bind(id).first()
    )
  )

  return {
    customers: results.filter(r => r !== null),
    total: customerIds.length,
    found: results.filter(r => r !== null).length
  }
}

Query with Result Processing

// agents/analytics-processor/index.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function processAnalytics(ctx: AgentExecutionContext) {
  const { startDate, endDate } = ctx.input as {
    startDate: string
    endDate: string
  }

  // Get and execute query
  const sql = await ctx.queries.getSql('daily-metrics')
  const db = ctx.env.DB

  const result = await db.prepare(sql)
    .bind(startDate, endDate)
    .all()

  // Process results
  const metrics = result.results || []
  const summary = {
    total_events: metrics.reduce((sum: number, m: any) => sum + m.total_events, 0),
    avg_duration: metrics.reduce((sum: number, m: any) => sum + m.avg_duration, 0) / metrics.length,
    unique_users: new Set(metrics.flatMap((m: any) => m.unique_users)).size
  }

  return {
    daily_metrics: metrics,
    summary,
    period: { startDate, endDate }
  }
}

Troubleshooting

Query Not Found

Error: Component not found: query://[email protected] Solution:
  1. Check query exists: edgit list queries
  2. Check version: edgit tag list get-customer-profile
  3. Verify deployment: edgit tag show [email protected]

SQL Syntax Error

Error: SQL parse error in query Solution:
  1. Test query directly in database client
  2. Check parameter names match query placeholders
  3. Verify SQL dialect matches database engine

Missing Parameters

Error: Parameter not found: ${customer_id} Solution:
  1. Ensure all ${param} placeholders have values
  2. Check parameter names match exactly (case-sensitive)
  3. Provide default values if needed

Performance Issues

Issue: Query takes too long Solutions:
  1. Add indexes on filtered columns
  2. Limit result set size
  3. Use EXPLAIN to analyze query plan
  4. Consider materializing common queries

Next Steps