Overview
A complete e-commerce workflow that handles order placement, payment processing, inventory updates, and customer notifications, with error handling and rollback capabilities.
Order Processing Flow
Copy
Order → Validate → Reserve Inventory → Process Payment → Fulfill → Notify
Complete Workflow
Copy
# Ensemble: process-order
# Validates the order, checks and reserves stock, charges the customer,
# records the order and its line items, then requests fulfillment, sends a
# confirmation email, and emits an analytics event. If the payment step fails
# after stock was reserved, the reservation is released again.
#
# Inputs read by this flow: items[] (productId, quantity, price), customerId,
# customerEmail, paymentMethodId, shippingAddress, priority (optional).
name: process-order
description: Complete order processing workflow

state:
  schema:
    order: object
    inventoryReserved: boolean
    paymentProcessed: boolean
    fulfillmentStatus: string

flow:
  # Validate Order
  - member: validate-order
    type: Function
    input:
      items: ${input.items}
      customerId: ${input.customerId}
      shippingAddress: ${input.shippingAddress}
    state:
      set: [order]

  # Check Inventory Availability
  # One query per ordered item; a row is returned only when enough stock exists.
  # NOTE(review): written as a list item so the document parses as YAML;
  # confirm the exact `parallel` syntax against the engine's schema.
  - parallel:
      - member: check-inventory
        foreach: ${input.items}
        type: Data
        config:
          storage: d1
          operation: query
          query: |
            SELECT quantity FROM inventory
            WHERE product_id = ? AND quantity >= ?
        input:
          params:
            - ${item.productId}
            - ${item.quantity}

  # Reserve Inventory
  # Runs only when every per-item availability query returned at least one row.
  # NOTE(review): the UPDATE itself does not re-check `quantity >= ?`, so two
  # concurrent orders can both pass the check above and oversell; consider a
  # guarded update together with per-item rollback.
  - member: reserve-inventory
    condition: ${check-inventory.output.every(r => r.results.length > 0)}
    foreach: ${input.items}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        UPDATE inventory
        SET quantity = quantity - ?,
            reserved = reserved + ?
        WHERE product_id = ?
    input:
      params:
        - ${item.quantity}
        - ${item.quantity}
        - ${item.productId}
    state:
      set: [inventoryReserved]

  # Process Payment
  # NOTE(review): Stripe expects `amount` as an integer in the smallest
  # currency unit (cents for USD); confirm calculate-total returns that.
  - member: process-payment
    condition: ${state.inventoryReserved}
    type: API
    config:
      url: "https://api.stripe.com/v1/payment_intents"
      method: POST
      headers:
        Authorization: "Bearer ${env.STRIPE_SECRET_KEY}"
        Content-Type: "application/x-www-form-urlencoded"
    input:
      body:
        amount: ${calculate-total(input.items)}
        currency: "usd"
        customer: ${input.customerId}
        payment_method: ${input.paymentMethodId}
        confirm: true
    state:
      set: [paymentProcessed]

  # Handle Payment Failure - Release Inventory
  # Exact inverse of reserve-inventory: moves the units from `reserved` back to
  # `quantity`.
  - member: release-inventory-on-payment-failure
    condition: ${!process-payment.success && state.inventoryReserved}
    foreach: ${input.items}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        UPDATE inventory
        SET quantity = quantity + ?,
            reserved = reserved - ?
        WHERE product_id = ?
    input:
      params:
        - ${item.quantity}
        - ${item.quantity}
        - ${item.productId}

  # Create Order Record
  # NOTE(review): if this insert fails after a successful charge, no step in
  # this flow refunds the payment or releases the reserved stock.
  - member: create-order
    condition: ${state.paymentProcessed}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO orders (
          customer_id, total, status, payment_intent_id, created_at
        ) VALUES (?, ?, 'processing', ?, CURRENT_TIMESTAMP)
        RETURNING id
    input:
      params:
        - ${input.customerId}
        - ${calculate-total(input.items)}
        - ${process-payment.output.data.id}

  # Create Order Items
  # One row per ordered item, linked to the id returned by create-order.
  - member: create-order-items
    condition: ${create-order.success}
    foreach: ${input.items}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO order_items (
          order_id, product_id, quantity, price
        ) VALUES (?, ?, ?, ?)
    input:
      params:
        - ${create-order.output.results[0].id}
        - ${item.productId}
        - ${item.quantity}
        - ${item.price}

  # Update Inventory - Remove from Reserved
  # The units were already subtracted from `quantity` at reservation time, so
  # only the `reserved` counter is decremented here.
  - member: finalize-inventory
    condition: ${create-order.success}
    foreach: ${input.items}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        UPDATE inventory
        SET reserved = reserved - ?
        WHERE product_id = ?
    input:
      params:
        - ${item.quantity}
        - ${item.productId}

  # Create Fulfillment Request
  - member: create-fulfillment
    condition: ${create-order.success}
    type: API
    config:
      url: "${env.FULFILLMENT_API}/orders"
      method: POST
      headers:
        Authorization: "Bearer ${env.FULFILLMENT_KEY}"
    input:
      body:
        orderId: ${create-order.output.results[0].id}
        items: ${input.items}
        shippingAddress: ${input.shippingAddress}
        priority: ${input.priority || 'standard'}
    state:
      set: [fulfillmentStatus]

  # Send Confirmation Email
  - member: send-confirmation
    condition: ${create-order.success}
    type: API
    config:
      url: "${env.EMAIL_SERVICE_URL}/send"
      method: POST
    input:
      body:
        to: ${input.customerEmail}
        template: "order_confirmation"
        data:
          orderNumber: ${create-order.output.results[0].id}
          items: ${input.items}
          total: ${calculate-total(input.items)}
          estimatedDelivery: ${calculate-delivery-date()}

  # Send to Analytics
  - member: track-order
    condition: ${create-order.success}
    type: API
    config:
      url: "${env.ANALYTICS_URL}/events"
      method: POST
    input:
      body:
        event: "order_placed"
        userId: ${input.customerId}
        properties:
          orderId: ${create-order.output.results[0].id}
          total: ${calculate-total(input.items)}
          items: ${input.items.length}

# Members guarded by a condition may not have run on the failure path, so every
# dereference of their output is optional-chained from `.output` onwards.
output:
  success: ${create-order.success}
  orderId: ${create-order.output?.results?.[0]?.id}
  paymentIntent: ${process-payment.output?.data?.id}
  fulfillmentId: ${create-fulfillment.output?.data?.id}
  # Quoted: an unquoted plain scalar may not contain ": " in YAML.
  error: "${!create-order.success ? 'Order processing failed' : null}"
Inventory Management
Copy
# Ensemble: check-and-alert-inventory
# Finds inventory rows at or below their reorder level, looks up the matching
# product rows, and posts a summary of the low-stock items to a Slack webhook.
name: check-and-alert-inventory
description: Monitor inventory and alert on low stock

flow:
  # Check low stock items
  - member: check-low-stock
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        SELECT product_id, product_name, quantity, reorder_level
        FROM inventory
        WHERE quantity <= reorder_level

  # Get product details
  # NOTE(review): nothing later in this flow reads enrich-products' output.
  - member: enrich-products
    condition: ${check-low-stock.output.results.length > 0}
    foreach: ${check-low-stock.output.results}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        SELECT * FROM products WHERE id = ?
    input:
      # Block sequence: "{" and "}" are not allowed in an unquoted scalar
      # inside a [ ... ] flow sequence.
      params:
        - ${item.product_id}

  # Send alerts
  - member: alert-low-stock
    condition: ${check-low-stock.output.results.length > 0}
    type: API
    config:
      url: "${env.SLACK_WEBHOOK}"
      method: POST
    input:
      body:
        text: "⚠️ Low Stock Alert"
        blocks:
          - type: "section"
            text:
              type: "mrkdwn"
              text: |
                *Low Stock Items*
                ${check-low-stock.output.results.map(item =>
                  `• ${item.product_name}: ${item.quantity} units`
                ).join('\n')}

output:
  lowStockItems: ${check-low-stock.output.results}
Order Cancellation
Copy
# Ensemble: cancel-order
# Loads a not-yet-cancelled order, asks check-cancellable whether it may be
# cancelled, refunds the payment intent, returns the ordered quantities to
# stock, marks the order cancelled, and emails the customer.
#
# Inputs read by this flow: orderId, customerEmail.
name: cancel-order
description: Cancel order and refund

flow:
  # Get order details
  - member: get-order
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        SELECT * FROM orders WHERE id = ? AND status != 'cancelled'
    input:
      # Block sequence: "{" and "}" are not allowed in an unquoted scalar
      # inside a [ ... ] flow sequence.
      params:
        - ${input.orderId}

  # Check if cancellable
  # Skipped when the order does not exist or is already cancelled.
  - member: check-cancellable
    condition: ${get-order.output.results.length > 0}
    type: Function
    input:
      order: ${get-order.output.results[0]}
      status: ${get-order.output.results[0].status}

  # Refund payment
  - member: refund-payment
    condition: ${check-cancellable.output.canCancel}
    type: API
    config:
      url: "https://api.stripe.com/v1/refunds"
      method: POST
      headers:
        Authorization: "Bearer ${env.STRIPE_SECRET_KEY}"
    input:
      body:
        payment_intent: ${get-order.output.results[0].payment_intent_id}

  # Restore inventory
  # Step 1: fetch the line items of the refunded order.
  - member: get-order-items
    condition: ${refund-payment.success}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        SELECT product_id, quantity FROM order_items
        WHERE order_id = ?
    input:
      params:
        - ${input.orderId}

  # Step 2: add each line item's quantity back to available stock.
  - member: restore-inventory
    condition: ${get-order-items.success}
    foreach: ${get-order-items.output.results}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        UPDATE inventory
        SET quantity = quantity + ?
        WHERE product_id = ?
    input:
      params:
        - ${item.quantity}
        - ${item.product_id}

  # Update order status
  - member: update-order-status
    condition: ${refund-payment.success}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        UPDATE orders
        SET status = 'cancelled', cancelled_at = CURRENT_TIMESTAMP
        WHERE id = ?
    input:
      params:
        - ${input.orderId}

  # Notify customer
  - member: send-cancellation-email
    condition: ${update-order-status.success}
    type: API
    config:
      url: "${env.EMAIL_SERVICE_URL}/send"
      method: POST
    input:
      body:
        to: ${input.customerEmail}
        template: "order_cancelled"
        data:
          orderNumber: ${input.orderId}
          refundAmount: ${get-order.output.results[0].total}

output:
  cancelled: ${update-order-status.success}
  refunded: ${refund-payment.success}
Product Recommendations
Copy
# Ensemble: product-recommendations
# Loads the products from the customer's most recent order items, queries a
# vector index using their names, and asks an LLM to turn both result sets
# into five recommendations.
#
# Inputs read by this flow: customerId.
name: product-recommendations
description: AI-powered product recommendations

flow:
  # Get customer purchase history
  # At most 10 rows, newest orders first.
  - member: get-purchase-history
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        SELECT p.* FROM products p
        JOIN order_items oi ON p.id = oi.product_id
        JOIN orders o ON oi.order_id = o.id
        WHERE o.customer_id = ?
        ORDER BY o.created_at DESC
        LIMIT 10
    input:
      # Block sequence: "{" and "}" are not allowed in an unquoted scalar
      # inside a [ ... ] flow sequence.
      params:
        - ${input.customerId}

  # Get similar products via embeddings
  # The search text is the purchased product names joined with ", ".
  - member: find-similar
    type: RAG
    config:
      vectorizeBinding: "VECTORIZE"
      indexName: "products"
      operation: query
    input:
      query: ${get-purchase-history.output.results.map(p => p.name).join(', ')}
      topK: 10
      filters:
        inStock: true

  # Generate personalized recommendations
  - member: generate-recommendations
    type: Think
    config:
      provider: openai
      model: gpt-4o
    input:
      prompt: |
        Based on this customer's purchase history:
        ${JSON.stringify(get-purchase-history.output.results)}
        And these similar products:
        ${JSON.stringify(find-similar.output.results)}
        Generate 5 personalized product recommendations with reasons.

# NOTE(review): the prompt does not ask for a structured response; confirm the
# Think member actually exposes a `products` field on its output.
output:
  recommendations: ${generate-recommendations.output.products}
Testing
Copy
import { describe, it, expect } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';
// Integration-style tests for the `process-order` ensemble, run against a
// TestConductor with mocked database rows and mocked HTTP responses.
describe('process-order', () => {
  // Happy path: stock is available and Stripe accepts the payment.
  it('should process order successfully', async () => {
    const conductor = await TestConductor.create({
      mocks: {
        db: {
          inventory: [
            { product_id: '1', quantity: 100 }
          ]
        },
        http: {
          responses: {
            'https://api.stripe.com/v1/payment_intents': {
              status: 200,
              data: { id: 'pi_123', status: 'succeeded' }
            }
          }
        }
      }
    });

    const result = await conductor.executeEnsemble('process-order', {
      customerId: 'cus_123',
      customerEmail: 'customer@example.com',
      items: [
        { productId: '1', quantity: 2, price: 29.99 }
      ],
      paymentMethodId: 'pm_123',
      shippingAddress: {
        street: '123 Main St',
        city: 'San Francisco',
        state: 'CA',
        zip: '94102'
      }
    });

    expect(result).toBeSuccessful();
    expect(result.output.orderId).toBeDefined();
    expect(result.output.paymentIntent).toBe('pi_123');
  });

  // Failure path: Stripe rejects the payment with HTTP 402.
  it('should rollback on payment failure', async () => {
    const conductor = await TestConductor.create({
      mocks: {
        // Same stock fixture as the happy path. Without it the flow's
        // inventory check finds nothing, payment is never attempted, and the
        // assertion below would pass without exercising the rollback.
        db: {
          inventory: [
            { product_id: '1', quantity: 100 }
          ]
        },
        http: {
          responses: {
            'https://api.stripe.com/v1/payment_intents': {
              status: 402,
              error: 'Payment failed'
            }
          }
        }
      }
    });

    // Same complete input as the happy path so that, presumably, the only
    // thing that differs is the payment response (validate-order's rules are
    // not visible here - confirm).
    const result = await conductor.executeEnsemble('process-order', {
      customerId: 'cus_123',
      customerEmail: 'customer@example.com',
      items: [
        { productId: '1', quantity: 2, price: 29.99 }
      ],
      paymentMethodId: 'pm_123',
      shippingAddress: {
        street: '123 Main St',
        city: 'San Francisco',
        state: 'CA',
        zip: '94102'
      }
    });

    expect(result.output.success).toBe(false);
    // Verify inventory was released
    // TODO: assert on the mocked inventory row once the test harness exposes
    // database state; nothing checks the release step yet.
  });
});
Best Practices
- Atomic operations - Ensure consistency across steps
- Rollback on failure - Restore state when errors occur
- Idempotency - Handle duplicate requests safely
- Inventory locking - Prevent overselling
- Payment verification - Confirm payment before fulfillment
- Customer notifications - Keep customers informed
- Error logging - Track failures for debugging
- Monitoring - Alert on critical issues

