Basic Pattern
Copy
ensemble: api-workflow
agents:
# Call APIs in sequence or parallel
- name: api-1
operation: fetch
inputs:
url: https://api1.example.com/data
- name: api-2
operation: fetch
inputs:
url: https://api2.example.com/data
body: ${api-1.output.body}
# Transform and combine
- name: combine
operation: code
inputs:
data:
source1: ${api-1.output.body}
source2: ${api-2.output.body}
OAuth Flow
Copy
ensemble: oauth-api-call
agents:
# Get access token
- name: get-token
operation: fetch
inputs:
url: https://auth.example.com/oauth/token
method: POST
body:
grant_type: client_credentials
client_id: ${env.CLIENT_ID}
client_secret: ${env.CLIENT_SECRET}
cache:
ttl: 3600
key: oauth-token
# Use token
- name: api-call
operation: fetch
inputs:
url: https://api.example.com/data
headers:
Authorization: Bearer ${get-token.output.body.access_token}
Session-Based Authentication
Use cookies with KV storage for stateful session management:
Copy
ensemble: session-auth-api
trigger:
- type: http
path: /api/protected
methods: [GET, POST]
public: true # Auth handled by session check
agents:
# Validate session from cookie
- name: validate-session
condition: ${input.cookies.session_id}
operation: storage
config:
type: kv
action: get
key: session-${input.cookies.session_id}
# Check if session is valid and not expired
- name: check-auth
operation: code
config:
handler: |
const session = context.input.session
if (!session) {
return { authenticated: false, error: 'No session' }
}
if (session.expires_at < Date.now()) {
return { authenticated: false, error: 'Session expired' }
}
return { authenticated: true, userId: session.user_id }
input:
session: ${validate-session.output.value}
# Reject if not authenticated
- name: reject
condition: ${!check-auth.output.authenticated}
operation: code
config:
handler: |
return {
status: 401,
error: context.input.error || 'Unauthorized'
}
input:
error: ${check-auth.output.error}
# Proceed with authenticated request
- name: protected-action
condition: ${check-auth.output.authenticated}
operation: fetch
inputs:
url: https://api.internal.com/user/${check-auth.output.userId}
# Refresh the session on activity: extend both the KV TTL and the stored expires_at.
# check-auth rejects on session.expires_at, so re-saving the old record unchanged
# would leave the original expiry in place even though the KV TTL was extended.
# NOTE(review): carry over any additional fields your session record stores.
- name: refresh-session
condition: ${check-auth.output.authenticated}
operation: storage
config:
type: kv
action: put
key: session-${input.cookies.session_id}
value:
session_id: ${input.cookies.session_id}
user_id: ${check-auth.output.userId}
# 24 hours from now, in milliseconds
expires_at: ${Date.now() + 86400000}
expirationTtl: 86400 # Extend 24 hours
output:
data: ${protected-action.output.body}
error: ${reject.output.error}
Login Endpoint
Create sessions on successful authentication:
Copy
ensemble: login
trigger:
- type: http
path: /api/login
methods: [POST]
public: true
agents:
# Verify credentials
- name: verify
operation: fetch
inputs:
url: https://auth.internal.com/verify
method: POST
body: ${input.body}
# Create session on success
- name: create-session
condition: ${verify.output.body.success}
operation: code
config:
handler: |
return {
session_id: crypto.randomUUID(),
user_id: context.input.userId,
expires_at: Date.now() + 86400000
}
input:
userId: ${verify.output.body.user_id}
# Store session
- name: save-session
condition: ${verify.output.body.success}
operation: storage
config:
type: kv
action: put
key: session-${create-session.output.session_id}
value: ${create-session.output}
expirationTtl: 86400
# Set session cookie
- name: set-cookie
condition: ${verify.output.body.success}
operation: cookies
config:
action: set
name: session_id
value: ${create-session.output.session_id}
httpOnly: true
secure: true
sameSite: strict
maxAge: 86400
purpose: essential
output:
success: ${verify.output.body.success}
user: ${verify.output.body.user}
Parallel API Aggregation
Copy
ensemble: aggregate-apis
agents:
# Call multiple APIs in parallel
- name: weather
operation: fetch
inputs:
url: https://api.weather.com/current?city=${input.city}
- name: news
operation: fetch
inputs:
url: https://api.news.com/local?city=${input.city}
- name: events
operation: fetch
inputs:
url: https://api.events.com/upcoming?city=${input.city}
# Combine results
- name: combine
operation: code
config:
script: scripts/combine-city-data
input:
city: ${input.city}
weather: ${weather.output.body}
news_articles: ${news.output.body.articles}
events: ${events.output.body.events}
Copy
// scripts/combine-city-data.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
/**
 * Merges the per-city feeds into a single response payload.
 *
 * Reads `city`, `weather`, `news_articles` and `events` from `context.input`.
 * `city` and `weather` are passed through untouched; the news list is capped
 * at 5 entries and the events list at 10.
 *
 * @param context - Execution context whose `input` carries the four fields above.
 * @returns `{ city, weather, news, events }`; `news` and `events` are empty
 *          arrays when the corresponding input is not an array.
 */
export default function combineCityData(context: AgentExecutionContext) {
  const { city, weather, news_articles, events } = context.input

  // An upstream response without the expected array would otherwise make
  // .slice() throw and take the whole aggregation down with it.
  const topNews = Array.isArray(news_articles) ? news_articles.slice(0, 5) : []
  const upcomingEvents = Array.isArray(events) ? events.slice(0, 10) : []

  return {
    city,
    weather,
    news: topNews,
    events: upcomingEvents
  }
}
Webhook Handling
Copy
ensemble: process-webhook
agents:
# Validate webhook signature
- name: validate
operation: code
config:
script: scripts/validate-webhook-signature
input:
body: ${input.body}
signature: ${input.headers['x-signature']}
secret: ${env.WEBHOOK_SECRET}
# Process if valid
- name: process
condition: ${validate.output.valid}
operation: code
config:
script: scripts/process-webhook-data
Copy
// scripts/validate-webhook-signature.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
import * as crypto from 'crypto'
/**
 * Verifies a webhook's HMAC-SHA256 signature.
 *
 * Reads `body`, `signature` and `secret` from `context.input`, computes the
 * hex HMAC-SHA256 of `JSON.stringify(body)` keyed with `secret`, and compares
 * it with `signature`.
 *
 * NOTE(review): the digest is computed over the re-serialized body; if the
 * sender signs the raw request bytes, key order or whitespace differences
 * could cause mismatches — confirm against the sender's signing scheme.
 *
 * @param context - Execution context whose `input` carries the three fields above.
 * @returns `{ valid: true }` only when the signatures match; `{ valid: false }`
 *          otherwise, including when the signature, secret, or body is missing.
 */
export default function validateWebhookSignature(context: AgentExecutionContext) {
  const { body, signature, secret } = context.input

  // Fail closed: a missing header or an unset secret must never validate.
  if (typeof signature !== 'string' || typeof secret !== 'string' || secret.length === 0) {
    return { valid: false }
  }

  // JSON.stringify yields undefined for an absent body, which cannot be hashed.
  const payload = JSON.stringify(body)
  if (typeof payload !== 'string') {
    return { valid: false }
  }

  const computedSignature = crypto
    .createHmac('sha256', secret)
    .update(payload)
    .digest('hex')

  const encoder = new TextEncoder()
  const expected = encoder.encode(computedSignature)
  const received = encoder.encode(signature)

  // timingSafeEqual throws on a length mismatch, so check lengths first; the
  // comparison itself is constant-time to avoid leaking how many characters matched.
  return {
    valid: expected.length === received.length && crypto.timingSafeEqual(expected, received)
  }
}
Copy
// scripts/process-webhook-data.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
/**
 * Placeholder webhook processor.
 *
 * Currently ignores `context` and always reports success; real processing
 * logic is meant to replace the body.
 *
 * @returns `{ processed: true }`
 */
export default function processWebhookData(context: AgentExecutionContext) {
  // Webhook-specific handling goes here
  const outcome = { processed: true }
  return outcome
}
Copy
# Forward to internal API
- name: forward
condition: ${process.executed}
operation: fetch
inputs:
url: ${env.INTERNAL_API_URL}
method: POST
body: ${input.body}
Rate Limiting
Copy
# Per-user rate limiter: reads a counter record from KV, forwards the request
# while fewer than 100 calls are recorded, and otherwise builds a rejection payload.
ensemble: rate-limited-api
agents:
# Check rate limit: load the counter record stored under this user's key
- name: check-limit
operation: storage
config:
type: kv
action: get
key: rate-limit-${input.user_id}
# Allow if under limit: runs when no record exists yet or count is below 100
# NOTE(review): the session example reads KV results via `.output.value`, while
# this example reads fields directly off `.output` — confirm which shape applies.
- name: allow
condition: ${!check-limit.output || check-limit.output.count < 100}
operation: fetch
inputs:
url: https://api.example.com/data
# Update counter: after a forwarded request, write back count + 1
# (starting from 0 when no record exists).
# NOTE(review): check-limit and update-limit are separate get/put steps, so
# concurrent requests can read the same count — confirm this is acceptable.
# NOTE(review): reset_at is recomputed as now + 1 hour on every write, so it
# tracks the latest allowed request rather than the first — confirm intent.
- name: update-limit
condition: ${allow.executed}
operation: storage
config:
type: kv
action: put
key: rate-limit-${input.user_id}
value:
count: ${(check-limit.output?.count || 0) + 1}
reset_at: ${Date.now() + 3600000}
# presumably the entry lifetime in seconds; the session examples use
# `expirationTtl` instead — TODO confirm which key the storage operation expects
ttl: 3600
# Reject if over limit: runs once a record with count >= 100 exists and
# passes the stored reset_at to the rejection script
- name: reject
condition: ${check-limit.output && check-limit.output.count >= 100}
operation: code
config:
script: scripts/reject-rate-limited
input:
reset_at: ${check-limit.output.reset_at}
Copy
// scripts/reject-rate-limited.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
/**
 * Builds the payload returned to a caller who has exceeded the rate limit.
 *
 * @param context - Execution context; `context.input.reset_at` is echoed back.
 * @returns `{ error: 'Rate limit exceeded', retry_after: <reset_at> }`
 */
export default function rejectRateLimited(context: AgentExecutionContext) {
  const retryAfter = context.input.reset_at
  return { error: 'Rate limit exceeded', retry_after: retryAfter }
}
Circuit Breaker
Copy
# Circuit breaker: skips the upstream call while a KV record marks the
# service's circuit as open, and records an open circuit when the call fails.
ensemble: circuit-breaker
agents:
# Check circuit state: load the record stored under this service's key
- name: check-circuit
operation: storage
config:
type: kv
action: get
key: circuit-${input.service}
# Call if circuit closed: runs when no record exists or its state is not 'open'
- name: call-api
condition: ${!check-circuit.output || check-circuit.output.state !== 'open'}
operation: fetch
inputs:
url: ${input.url}
retry:
maxAttempts: 3
# Open circuit on failure: store an 'open' record with the current timestamp
- name: open-circuit
condition: ${call-api.failed}
operation: storage
config:
type: kv
action: put
key: circuit-${input.service}
value:
state: open
opened_at: ${Date.now()}
# presumably the record lifetime in seconds, after which calls would be
# attempted again — TODO confirm against the storage operation's options
ttl: 60
# Use fallback if circuit open.
# NOTE(review): this condition reads the state fetched by check-circuit at the
# start of the run, so a run in which call-api fails opens the circuit but does
# not itself produce the fallback payload — confirm that is intended.
- name: fallback
condition: ${check-circuit.output?.state === 'open'}
operation: code
config:
script: scripts/circuit-breaker-fallback
Copy
// scripts/circuit-breaker-fallback.ts
import type { AgentExecutionContext } from '@ensemble-edge/conductor'
/**
 * Static fallback payload used while the circuit is open.
 *
 * Does not read `context`.
 *
 * @returns `{ error: 'Service unavailable', fallback: true }`
 */
export default function circuitBreakerFallback(context: AgentExecutionContext) {
  const unavailable = { error: 'Service unavailable', fallback: true }
  return unavailable
}

