Skip to main content

Basic Pattern

# Basic pattern: two fetch agents, then a code agent that receives both
# response bodies.
ensemble: api-workflow

agents:
  # First call; references no other agent's output.
  - name: api-1
    operation: fetch
    inputs:
      url: https://api1.example.com/data

  # Second call; forwards api-1's response body as its own request body.
  - name: api-2
    operation: fetch
    inputs:
      url: https://api2.example.com/data
      # A request that carries a body cannot be a GET, so state the method
      # explicitly instead of relying on the default.
      method: POST
      body: ${api-1.output.body}

  # Transform and combine
  # NOTE(review): no handler or script is configured for this code agent;
  # presumably one is required for it to produce output - confirm.
  - name: combine
    operation: code
    inputs:
      data:
        source1: ${api-1.output.body}
        source2: ${api-2.output.body}

OAuth Flow

# Client-credentials flow: request a token, then call the API with it as a
# Bearer credential.
ensemble: oauth-api-call

agents:
  # Token request. Client id and secret are read from the environment.
  - name: get-token
    operation: fetch
    # NOTE(review): ttl is presumably in seconds; confirm it does not exceed
    # the lifetime of the token the auth server issues.
    cache:
      key: oauth-token
      ttl: 3600
    inputs:
      method: POST
      url: 'https://auth.example.com/oauth/token'
      body:
        grant_type: client_credentials
        client_id: '${env.CLIENT_ID}'
        client_secret: '${env.CLIENT_SECRET}'

  # API request authorised with the access_token field of get-token's response.
  - name: api-call
    operation: fetch
    inputs:
      headers:
        Authorization: 'Bearer ${get-token.output.body.access_token}'
      url: 'https://api.example.com/data'

Session-Based Authentication

Use cookies with KV storage for stateful session management:
# Session-gated endpoint: load the session named by the session_id cookie
# from KV, check it, then either reject or call the protected API.
ensemble: session-auth-api

trigger:
  - type: http
    path: /api/protected
    methods: [GET, POST]
    public: true  # Auth handled by session check

agents:
  # Validate session from cookie (skipped when no session_id cookie is sent)
  - name: validate-session
    condition: ${input.cookies.session_id}
    operation: storage
    config:
      type: kv
      action: get
      key: session-${input.cookies.session_id}

  # Check if session is valid and not expired
  - name: check-auth
    operation: code
    config:
      handler: |
        const session = context.input.session
        if (!session) {
          return { authenticated: false, error: 'No session' }
        }
        if (session.expires_at < Date.now()) {
          return { authenticated: false, error: 'Session expired' }
        }
        return { authenticated: true, userId: session.user_id }
    input:
      session: ${validate-session.output.value}

  # Reject if not authenticated
  - name: reject
    condition: ${!check-auth.output.authenticated}
    operation: code
    config:
      handler: |
        return {
          status: 401,
          error: context.input.error || 'Unauthorized'
        }
    input:
      error: ${check-auth.output.error}

  # Proceed with authenticated request
  - name: protected-action
    condition: ${check-auth.output.authenticated}
    operation: fetch
    inputs:
      url: https://api.internal.com/user/${check-auth.output.userId}

  # Refresh session on activity.
  # check-auth compares the stored expires_at with the current time, so
  # extending only the KV TTL would still expire the session at its original
  # deadline. Rewrite the record with a new expires_at as well.
  # NOTE(review): assumes the stored session holds exactly session_id,
  # user_id and expires_at - add any further fields the record carries.
  - name: refresh-session
    condition: ${check-auth.output.authenticated}
    operation: storage
    config:
      type: kv
      action: put
      key: session-${input.cookies.session_id}
      value:
        session_id: ${validate-session.output.value.session_id}
        user_id: ${validate-session.output.value.user_id}
        expires_at: ${Date.now() + 86400000}  # 24 hours, in milliseconds
      expirationTtl: 86400  # Extend 24 hours

# Only one of the two agents below runs for a given request, because their
# conditions are opposites.
output:
  data: ${protected-action.output.body}
  error: ${reject.output.error}

Login Endpoint

Create sessions on successful authentication:
# Login flow: verify the posted credentials, then create, store and hand
# out a session. Every step after `verify` is conditional on its success flag.
ensemble: login

trigger:
  - type: http
    public: true
    methods:
      - POST
    path: /api/login

agents:
  # Forward the request body to the verification endpoint.
  - name: verify
    operation: fetch
    inputs:
      method: POST
      url: 'https://auth.internal.com/verify'
      body: '${input.body}'

  # Build the session record: a random UUID as id, the verified user id, and
  # an expiry 86400000 ms (24 hours) from now.
  - name: create-session
    operation: code
    condition: '${verify.output.body.success}'
    input:
      userId: '${verify.output.body.user_id}'
    config:
      handler: |
        return {
          session_id: crypto.randomUUID(),
          user_id: context.input.userId,
          expires_at: Date.now() + 86400000
        }

  # Persist the record in KV under session-<id>.
  - name: save-session
    operation: storage
    condition: '${verify.output.body.success}'
    config:
      action: put
      type: kv
      key: 'session-${create-session.output.session_id}'
      value: '${create-session.output}'
      expirationTtl: 86400

  # Hand the session id back to the client as a cookie.
  - name: set-cookie
    operation: cookies
    condition: '${verify.output.body.success}'
    config:
      action: set
      name: session_id
      value: '${create-session.output.session_id}'
      maxAge: 86400
      sameSite: strict
      secure: true
      httpOnly: true
      purpose: essential

output:
  user: '${verify.output.body.user}'
  success: '${verify.output.body.success}'

Parallel API Aggregation

# Fan-out / fan-in: three fetches keyed on the same city, merged by one
# script.
ensemble: aggregate-apis

agents:
  # None of these three fetches references another agent's output.
  # NOTE(review): input.city is placed in the query string as-is; confirm
  # whether the runtime URL-encodes interpolated values.
  - name: weather
    operation: fetch
    inputs:
      url: 'https://api.weather.com/current?city=${input.city}'

  - name: news
    operation: fetch
    inputs:
      url: 'https://api.news.com/local?city=${input.city}'

  - name: events
    operation: fetch
    inputs:
      url: 'https://api.events.com/upcoming?city=${input.city}'

  # Merge step: passes the city plus the relevant part of each response to
  # the combine-city-data script.
  - name: combine
    operation: code
    input:
      events: '${events.output.body.events}'
      news_articles: '${news.output.body.articles}'
      weather: '${weather.output.body}'
      city: '${input.city}'
    config:
      script: scripts/combine-city-data
// scripts/combine-city-data.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Merge the per-city results into a single response object.
 *
 * Reads `city`, `weather`, `news_articles` and `events` from `context.input`.
 * Returns `{ city, weather, news, events }`, where `news` holds at most the
 * first 5 articles and `events` at most the first 10 events.
 */
export default function combineCityData(context: AgentExecutionContext) {
  const { city, weather, news_articles, events } = context.input

  // A list can be missing (or not a list) when its upstream call did not
  // return the expected payload; fall back to an empty list instead of
  // throwing on `.slice`.
  const topNews = Array.isArray(news_articles) ? news_articles.slice(0, 5) : []
  const upcomingEvents = Array.isArray(events) ? events.slice(0, 10) : []

  return {
    city,
    weather,
    news: topNews,
    events: upcomingEvents
  }
}

Webhook Handling

# Webhook intake: verify the signature first; process only when it checks out.
ensemble: process-webhook

agents:
  # Signature check. The script receives the request body, the value of the
  # x-signature header, and the shared secret from the environment.
  - name: validate
    operation: code
    input:
      secret: '${env.WEBHOOK_SECRET}'
      signature: "${input.headers['x-signature']}"
      body: '${input.body}'
    config:
      script: scripts/validate-webhook-signature

  # Runs only when the validate agent reported `valid`.
  - name: process
    operation: code
    condition: '${validate.output.valid}'
    config:
      script: scripts/process-webhook-data
// scripts/validate-webhook-signature.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
import * as crypto from 'crypto'

/**
 * Check a webhook's HMAC-SHA256 signature.
 *
 * Reads `body`, `signature` and `secret` from `context.input`, computes the
 * hex HMAC-SHA256 of `JSON.stringify(body)` keyed with `secret`, and returns
 * `{ valid }`, which is true only when `signature` equals that digest.
 *
 * NOTE(review): the digest is computed over the re-serialised body; if the
 * sender signs the raw request bytes, the two can differ - confirm.
 */
export default function validateWebhookSignature(context: AgentExecutionContext) {
  const { body, signature, secret } = context.input

  // A missing or non-string signature can never match.
  if (typeof signature !== 'string') {
    return { valid: false }
  }

  const computedSignature = crypto
    .createHmac('sha256', secret)
    .update(JSON.stringify(body))
    .digest('hex')

  const encoder = new TextEncoder()
  const expected = encoder.encode(computedSignature)
  const provided = encoder.encode(signature)

  // Compare in constant time so response timing does not reveal how many
  // leading characters of a forged signature were correct. timingSafeEqual
  // throws on unequal lengths, so check the length first.
  return {
    valid: provided.length === expected.length && crypto.timingSafeEqual(provided, expected)
  }
}
// scripts/process-webhook-data.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Placeholder webhook processor: does no work and reports
 * `{ processed: true }`.
 */
export default function processWebhookData(context: AgentExecutionContext) {
  // Webhook-specific processing goes here.
  const result = { processed: true }
  return result
}
  # Relay the original request body to the internal API, gated on the
  # `process` agent having executed.
  - name: forward
    operation: fetch
    condition: '${process.executed}'
    inputs:
      method: POST
      url: '${env.INTERNAL_API_URL}'
      body: '${input.body}'

Rate Limiting

# Per-user rate limit backed by KV: at most 100 allowed calls are counted
# under one counter record before further calls are rejected.
# NOTE(review): the read and the write are separate steps, so concurrent
# requests can undercount. Each allowed call also restarts the 3600-second
# expiry and reset_at, so the counter only clears after an idle hour - confirm
# this is the intended window.
ensemble: rate-limited-api

agents:
  # Check rate limit
  - name: check-limit
    operation: storage
    config:
      type: kv
      action: get
      key: rate-limit-${input.user_id}

  # Allow if under limit.
  # The stored record is read from `.output.value`; no record yet means no
  # calls have been counted.
  - name: allow
    condition: ${!check-limit.output?.value || check-limit.output.value.count < 100}
    operation: fetch
    inputs:
      url: https://api.example.com/data

  # Update counter
  - name: update-limit
    condition: ${allow.executed}
    operation: storage
    config:
      type: kv
      action: put
      key: rate-limit-${input.user_id}
      value:
        count: ${(check-limit.output?.value?.count || 0) + 1}
        reset_at: ${Date.now() + 3600000}  # one hour from now, in milliseconds
      expirationTtl: 3600  # seconds; matches the one-hour reset_at above

  # Reject if over limit
  - name: reject
    condition: ${check-limit.output?.value && check-limit.output.value.count >= 100}
    operation: code
    config:
      script: scripts/reject-rate-limited
    input:
      reset_at: ${check-limit.output.value.reset_at}
// scripts/reject-rate-limited.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Build the rejection payload for a rate-limited caller.
 *
 * Returns a fixed error message and echoes `context.input.reset_at` back as
 * `retry_after`.
 */
export default function rejectRateLimited(context: AgentExecutionContext) {
  const retryAfter = context.input.reset_at
  return { error: 'Rate limit exceeded', retry_after: retryAfter }
}

Circuit Breaker

# Circuit breaker backed by KV: a failed call stores an "open" marker for
# the service, and while that marker exists the call is skipped and a
# fallback result is returned instead.
ensemble: circuit-breaker

agents:
  # Check circuit state
  - name: check-circuit
    operation: storage
    config:
      type: kv
      action: get
      key: circuit-${input.service}

  # Call if circuit closed.
  # The stored record is read from `.output.value`; no record means the
  # circuit has not been opened.
  - name: call-api
    condition: ${check-circuit.output?.value?.state !== 'open'}
    operation: fetch
    inputs:
      url: ${input.url}
    retry:
      maxAttempts: 3

  # Open circuit on failure
  - name: open-circuit
    condition: ${call-api.failed}
    operation: storage
    config:
      type: kv
      action: put
      key: circuit-${input.service}
      value:
        state: open
        opened_at: ${Date.now()}
      # The marker expires after 60 seconds, after which calls are attempted
      # again.
      expirationTtl: 60

  # Use fallback if circuit open, or if this request's own call just failed;
  # otherwise the request that trips the breaker would produce no result.
  - name: fallback
    condition: ${check-circuit.output?.value?.state === 'open' || call-api.failed}
    operation: code
    config:
      script: scripts/circuit-breaker-fallback
// scripts/circuit-breaker-fallback.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'

/**
 * Fixed fallback payload used in place of the real API response.
 * The `context` argument is not read.
 */
export default function circuitBreakerFallback(context: AgentExecutionContext) {
  const unavailable = { error: 'Service unavailable', fallback: true }
  return unavailable
}

Next Steps