Skip to main content
Define and version reusable SQL queries as components for consistent data operations across ensembles.

Overview

Query components enable you to:
  • Reuse SQL queries across multiple agents and ensembles
  • Version queries for reproducibility and rollback
  • Organize complex multi-step queries
  • Share queries across teams
  • A/B test different query strategies

Quick Start

1. Create a Query Component

Create a SQL file:
-- queries/get-customer-profile.sql
SELECT
  c.id,
  c.name,
  c.email,
  c.created_at,
  COUNT(o.id) as order_count,
  SUM(o.total) as lifetime_value
FROM customers c
LEFT JOIN orders o ON c.id = o.customer_id
WHERE c.email = ${email}
GROUP BY c.id, c.name, c.email, c.created_at
LIMIT 1

2. Add to Edgit

edgit components add get-customer-profile queries/get-customer-profile.sql query
edgit tag create get-customer-profile v1.0.0
edgit tag set get-customer-profile production v1.0.0
edgit push --tags --force

3. Reference in Your Ensemble

ensemble: customer-lookup

agents:
  - name: fetch-profile
    operation: data
    config:
      database: "postgres"
      # Reference the query component
      query: "query://get-customer-profile@v1.0.0"
      params:
        email: ${input.customer_email}

inputs:
  customer_email:
    type: string
    required: true

outputs:
  profile: ${fetch-profile.output}

URI Format and Versioning

All query components use the standardized URI format:
query://{path}[@{version}]
Format breakdown:
  • query:// - Protocol identifier for query components
  • {path} - Logical path to the query (e.g., get-customer-profile, analytics/daily-sales)
  • [@{version}] - Optional version identifier (defaults to @latest)
Version format:
  • @latest - Always uses the most recent version
  • @v1 - Uses latest patch of major version (v1.x.x)
  • @v1.0.0 - Specific semantic version (immutable)
  • @prod - Custom tag for production queries
  • @staging - Custom tag for staging queries

Example URIs

# Always latest version
query: "query://get-customer-profile"
query: "query://get-customer-profile@latest"

# Specific semantic version
query: "query://get-customer-profile@v1.0.0"
query: "query://get-customer-profile@v2.1.3"

# Custom tags
query: "query://get-customer-profile@prod"
query: "query://get-customer-profile@staging"

# Nested paths
query: "query://analytics/daily-sales@v1"
query: "query://reports/customer-summary@v1"

How to Reference in Ensembles

There are three ways to reference queries in your ensembles: Use the query:// URI format to reference versioned query components:
ensemble: user-analyzer

agents:
  - name: get-users
    operation: data
    config:
      database: "postgres"
      query: "query://active-users@v1.0.0"
      params:
        days: 30

outputs:
  users: ${get-users.output}

2. Template Expression Format

Use ${components.query_name@version} to embed query references:
ensemble: complex-report

agents:
  - name: generate-report
    operation: data
    config:
      database: "analytics"
      query: |
        -- Base query from component
        ${components.base-report-query@v1}

        -- Add custom filters
        WHERE created_at >= ${input.start_date}
          AND region = ${input.region}
      params:
        start_date: ${input.start_date}
        region: ${input.region}

inputs:
  start_date:
    type: string
  region:
    type: string

outputs:
  report: ${generate-report.output}

3. Inline Query

For simple operations or during development, use inline SQL directly:
ensemble: simple-lookup

agents:
  - name: get-user
    operation: data
    config:
      database: "postgres"
      query: |
        SELECT id, name, email, created_at
        FROM users
        WHERE id = ?
        LIMIT 1
      params:
        - ${input.user_id}

inputs:
  user_id:
    type: integer

outputs:
  user: ${get-user.output}

Using Query Components

With Dynamic Parameters

ensemble: sales-report

agents:
  - name: fetch-sales
    operation: data
    config:
      database: "analytics_db"
      query: "query://sales-by-month@v1"
      params:
        start_date: ${input.start_date}
        end_date: ${input.end_date}
        region: ${input.region}

inputs:
  start_date:
    type: string
    format: date
  end_date:
    type: string
    format: date
  region:
    type: string

outputs:
  sales_data: ${fetch-sales.output}

Multi-Step Data Workflow

ensemble: customer-enrichment

flow:
  - agent: fetch-profile
  - agent: fetch-orders
  - agent: fetch-interactions

agents:
  - name: fetch-profile
    operation: data
    config:
      database: "production"
      query: "query://get-customer-profile@v1"
      params:
        customer_id: ${input.customer_id}

  - name: fetch-orders
    operation: data
    config:
      database: "production"
      query: "query://get-customer-orders@v1"
      params:
        customer_id: ${input.customer_id}

  - name: fetch-interactions
    operation: data
    config:
      database: "analytics"
      query: "query://get-customer-interactions@v1"
      params:
        customer_id: ${input.customer_id}

outputs:
  profile: ${fetch-profile.output}
  orders: ${fetch-orders.output}
  interactions: ${fetch-interactions.output}

Query Types and Examples

Customer/User Queries

-- queries/get-customer-profile.sql
SELECT
  id,
  name,
  email,
  phone,
  created_at,
  last_login,
  subscription_status
FROM customers
WHERE id = ${customer_id}

Aggregation Queries

-- queries/monthly-revenue.sql
SELECT
  DATE_TRUNC('month', created_at) as month,
  COUNT(*) as order_count,
  SUM(total) as revenue,
  AVG(total) as avg_order_value
FROM orders
WHERE created_at >= ${start_date}
  AND created_at < ${end_date}
GROUP BY DATE_TRUNC('month', created_at)
ORDER BY month DESC

Join Queries

-- queries/orders-with-customer.sql
SELECT
  o.id as order_id,
  o.created_at,
  o.total,
  c.name as customer_name,
  c.email as customer_email,
  p.name as product_name,
  p.price
FROM orders o
INNER JOIN customers c ON o.customer_id = c.id
LEFT JOIN products p ON o.product_id = p.id
WHERE o.created_at >= ${start_date}
ORDER BY o.created_at DESC
LIMIT ${limit}

Time Series Queries

-- queries/daily-metrics.sql
SELECT
  DATE(created_at) as date,
  COUNT(*) as total_events,
  COUNT(DISTINCT user_id) as unique_users,
  AVG(duration_ms) as avg_duration
FROM events
WHERE created_at >= ${start_date}
  AND event_type = ${event_type}
GROUP BY DATE(created_at)
ORDER BY date DESC

Search/Filter Queries

-- queries/search-products.sql
SELECT
  id,
  name,
  description,
  price,
  stock_quantity,
  category,
  rating
FROM products
WHERE (name ILIKE ${search_term}
   OR description ILIKE ${search_term})
  AND category = ${category}
  AND price BETWEEN ${min_price} AND ${max_price}
ORDER BY rating DESC
LIMIT ${limit}

Caching and Performance

Query components are automatically cached for 1 hour (3600 seconds) after first load.

Default Caching

agents:
  - name: fetch-data
    operation: data
    config:
      database: "postgres"
      query: "query://get-customer-profile@v1"
      # Cached for 1 hour automatically

Custom Cache TTL

agents:
  - name: fetch-data
    operation: data
    config:
      database: "postgres"
      query: "query://get-customer-profile@v1"
      cache:
        ttl: 3600  # 1 hour in seconds

Bypass Cache

agents:
  - name: fetch-fresh-data
    operation: data
    config:
      database: "postgres"
      query: "query://get-customer-profile@v1"
      cache:
        bypass: true  # Fresh query every time

Best Practices

1. Use Parameterized Queries

Always use parameters to prevent SQL injection:
-- Good: Uses parameterized query
SELECT * FROM customers WHERE id = ${customer_id}

-- Bad: String interpolation risk
SELECT * FROM customers WHERE id = '${customer_id}'

2. Add Indexes for Performance

-- Add indexes for columns used in WHERE clauses
CREATE INDEX idx_customer_email ON customers(email);
CREATE INDEX idx_order_customer_id ON orders(customer_id);

3. Optimize with LIMIT

-- Always limit results to prevent huge datasets
SELECT * FROM events LIMIT ${limit}

-- Use OFFSET for pagination
SELECT * FROM orders
WHERE created_at >= ${start_date}
LIMIT ${limit}
OFFSET ${offset}

4. Use Clear Naming

-- Good: Descriptive column aliases
SELECT
  c.id as customer_id,
  c.name as customer_name,
  COUNT(o.id) as order_count

-- Bad: Unclear aliases
SELECT c.id, c.name, COUNT(o.id)

5. Comment Complex Queries

-- queries/customer-ltv.sql
-- Calculate customer lifetime value with RFM metrics
-- Used for segmentation and targeting

WITH customer_metrics AS (
  SELECT
    customer_id,
    COUNT(*) as purchase_count,
    MAX(created_at) as last_purchase_date,
    MIN(created_at) as first_purchase_date,
    SUM(total) as lifetime_value
  FROM orders
  WHERE status = 'completed'
  GROUP BY customer_id
)
SELECT
  cm.customer_id,
  cm.lifetime_value,
  DATEDIFF(day, cm.first_purchase_date, cm.last_purchase_date) as customer_age_days,
  cm.purchase_count,
  cm.lifetime_value / cm.purchase_count as avg_order_value
FROM customer_metrics cm

6. Version for Changes

Track query changes with semantic versioning:
# v1.0.0 - Initial query
edgit tag create get-customer-profile v1.0.0

# v1.1.0 - Add new column (backward compatible)
edgit tag create get-customer-profile v1.1.0

# v2.0.0 - Restructure output (breaking change)
edgit tag create get-customer-profile v2.0.0

7. Test Query Performance

# Use EXPLAIN to analyze query plans
EXPLAIN ANALYZE
SELECT * FROM customers WHERE email = 'user@example.com';

Versioning Strategy

Development Workflow

# 1. Create initial version
edgit tag create get-customer-profile v1.0.0

# 2. Optimize in v1.1.0
edgit tag create get-customer-profile v1.1.0

# 3. Promote to production
edgit tag set get-customer-profile production v1.1.0

Staged Testing

ensemble: data-fetch-test

agents:
  - name: fetch-v1
    operation: data
    config:
      query: "query://get-customer-profile@v1.0.0"

  - name: fetch-v2
    operation: data
    config:
      query: "query://get-customer-profile@v1.1.0"

outputs:
  v1_result: ${fetch-v1.output}
  v2_result: ${fetch-v2.output}

Rollback Strategy

ensemble: customer-lookup-stable

agents:
  - name: fetch-profile
    operation: data
    config:
      database: "postgres"
      # Use stable version if latest has issues
      query: "query://get-customer-profile@v1.0.0"

Using ctx API in Agents

When building custom agents with TypeScript handlers, you can access SQL queries through the ctx API:

ctx.queries.getSql(name)

Get a SQL query template by name:
// agents/data-fetcher/index.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function fetchData(ctx: AgentExecutionContext) {
  // Get SQL query template
  const sql = await ctx.queries.getSql('get-customer-profile')

  return {
    query: sql,
    length: sql.length
  }
}

Using Query with D1

// agents/customer-lookup/index.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

interface CustomerInput {
  customerId: string
}

export default async function lookupCustomer(ctx: AgentExecutionContext) {
  const { customerId } = ctx.input as CustomerInput

  // Get SQL query
  const sql = await ctx.queries.getSql('get-customer-profile')

  // Execute with D1
  const db = ctx.env.DB // D1 binding
  const result = await db.prepare(sql)
    .bind(customerId)
    .first()

  return {
    customer: result,
    found: !!result
  }
}

Parameterized Query Execution

// agents/order-fetcher/index.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

interface OrderInput {
  startDate: string
  endDate: string
  limit: number
}

export default async function fetchOrders(ctx: AgentExecutionContext) {
  const { startDate, endDate, limit } = ctx.input as OrderInput

  // Get SQL query template
  const sql = await ctx.queries.getSql('orders-by-date-range')

  // Execute with parameters
  const db = ctx.env.DB
  const result = await db.prepare(sql)
    .bind(startDate, endDate, limit)
    .all()

  return {
    orders: result.results || [],
    count: result.results?.length || 0
  }
}

Dynamic Query Selection

// agents/dynamic-reporter/index.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function generateReport(ctx: AgentExecutionContext) {
  const { reportType, params } = ctx.input as {
    reportType: 'daily' | 'weekly' | 'monthly'
    params: Record<string, any>
  }

  // Select query based on report type
  const queryName = `report-${reportType}`
  const sql = await ctx.queries.getSql(queryName)

  const db = ctx.env.DB
  const result = await db.prepare(sql)
    .bind(...Object.values(params))
    .all()

  return {
    reportType,
    data: result.results || [],
    count: result.results?.length || 0
  }
}

Batch Query Execution

// agents/batch-processor/index.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

interface BatchInput {
  customerIds: string[]
}

export default async function batchFetch(ctx: AgentExecutionContext) {
  const { customerIds } = ctx.input as BatchInput

  // Get query template
  const sql = await ctx.queries.getSql('get-customer-profile')

  const db = ctx.env.DB

  // Execute queries in parallel
  const results = await Promise.all(
    customerIds.map(id =>
      db.prepare(sql).bind(id).first()
    )
  )

  return {
    customers: results.filter(r => r !== null),
    total: customerIds.length,
    found: results.filter(r => r !== null).length
  }
}

Query with Result Processing

// agents/analytics-processor/index.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

export default async function processAnalytics(ctx: AgentExecutionContext) {
  const { startDate, endDate } = ctx.input as {
    startDate: string
    endDate: string
  }

  // Get and execute query
  const sql = await ctx.queries.getSql('daily-metrics')
  const db = ctx.env.DB

  const result = await db.prepare(sql)
    .bind(startDate, endDate)
    .all()

  // Process results
  const metrics = result.results || []
  const summary = {
    total_events: metrics.reduce((sum: number, m: any) => sum + m.total_events, 0),
    avg_duration: metrics.reduce((sum: number, m: any) => sum + m.avg_duration, 0) / metrics.length,
    unique_users: new Set(metrics.flatMap((m: any) => m.unique_users)).size
  }

  return {
    daily_metrics: metrics,
    summary,
    period: { startDate, endDate }
  }
}

Troubleshooting

Query Not Found

Error: Component not found: query://get-customer-profile@v1.0.0 Solution:
  1. Check query exists: edgit list queries
  2. Check version: edgit tag list get-customer-profile
  3. Verify deployment: edgit tag show get-customer-profile@v1.0.0

SQL Syntax Error

Error: SQL parse error in query Solution:
  1. Test query directly in database client
  2. Check parameter names match query placeholders
  3. Verify SQL dialect matches database engine

Missing Parameters

Error: Parameter not found: ${customer_id} Solution:
  1. Ensure all ${param} placeholders have values
  2. Check parameter names match exactly (case-sensitive)
  3. Provide default values if needed

Performance Issues

Issue: Query takes too long Solutions:
  1. Add indexes on filtered columns
  2. Limit result set size
  3. Use EXPLAIN to analyze query plan
  4. Consider materializing common queries

Next Steps

Prompt Components

Reusable AI instructions

Config Components

JSON settings as components

Data Operation

Database query operations

Edgit Versioning

Version control for components