Skip to main content

Overview

This example is a complete e-commerce workflow that handles order placement, payment processing, inventory updates, and customer notifications, and that releases reserved inventory again when the payment fails.

Order Processing Flow

Order → Validate → Reserve Inventory → Process Payment → Fulfill → Notify

Complete Workflow

# process-order: end-to-end order pipeline.
#
# Steps are listed in execution order and gated by `condition`:
#   validate -> check stock -> reserve stock -> charge payment
#   -> (release stock if the charge failed) -> record order
#   -> finalize stock / fulfilment / confirmation email / analytics.
#
# NOTE(review): the only compensation step is the inventory release after a
# failed payment. If `create-order` fails after a successful charge, nothing
# here refunds the payment or releases the reservation - confirm whether that
# is handled elsewhere.
name: process-order
description: Complete order processing workflow

# Values shared across steps; steps publish to them through `state.set`.
state:
  schema:
    order: object
    inventoryReserved: boolean
    paymentProcessed: boolean
    fulfillmentStatus: string

flow:
  # Validate Order
  - member: validate-order
    type: Function
    input:
      items: ${input.items}
      customerId: ${input.customerId}
      shippingAddress: ${input.shippingAddress}
    state:
      set: [order]

  # Check Inventory Availability
  # The query returns a row only when the product has at least the requested
  # quantity, so an empty result for any item means "not enough stock".
  # `parallel` is written as a sequence entry of `flow`; a bare `parallel:`
  # key between `- member:` entries is not valid YAML.
  # NOTE(review): assumes `parallel` takes a nested list of steps - confirm
  # against the Conductor schema.
  - parallel:
      - member: check-inventory
        foreach: ${input.items}
        type: Data
        config:
          storage: d1
          operation: query
          query: |
            SELECT quantity FROM inventory
            WHERE product_id = ? AND quantity >= ?
        input:
          params:
            - ${item.productId}
            - ${item.quantity}

  # Reserve Inventory
  # Runs only when every per-item availability query returned a row. Moves the
  # ordered quantity from `quantity` to `reserved` for each item.
  - member: reserve-inventory
    condition: ${check-inventory.output.every(r => r.results.length > 0)}
    foreach: ${input.items}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        UPDATE inventory
        SET quantity = quantity - ?,
            reserved = reserved + ?
        WHERE product_id = ?
    input:
      params:
        - ${item.quantity}
        - ${item.quantity}
        - ${item.productId}
    state:
      set: [inventoryReserved]

  # Process Payment
  # Creates and immediately confirms (`confirm: true`) a Stripe PaymentIntent.
  - member: process-payment
    condition: ${state.inventoryReserved}
    type: API
    config:
      url: "https://api.stripe.com/v1/payment_intents"
      method: POST
      headers:
        Authorization: "Bearer ${env.STRIPE_SECRET_KEY}"
        Content-Type: "application/x-www-form-urlencoded"
    input:
      body:
        amount: ${calculate-total(input.items)}
        currency: "usd"
        customer: ${input.customerId}
        payment_method: ${input.paymentMethodId}
        confirm: true
    state:
      set: [paymentProcessed]

  # Handle Payment Failure - Release Inventory
  # Exact inverse of reserve-inventory: returns the reserved units to
  # `quantity` when stock was reserved but the payment did not succeed.
  - member: release-inventory-on-payment-failure
    condition: ${!process-payment.success && state.inventoryReserved}
    foreach: ${input.items}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        UPDATE inventory
        SET quantity = quantity + ?,
            reserved = reserved - ?
        WHERE product_id = ?
    input:
      params:
        - ${item.quantity}
        - ${item.quantity}
        - ${item.productId}

  # Create Order Record
  # Inserts the order with status 'processing' and returns the new row id,
  # which later steps read as `create-order.output.results[0].id`.
  - member: create-order
    condition: ${state.paymentProcessed}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO orders (
          customer_id, total, status, payment_intent_id, created_at
        ) VALUES (?, ?, 'processing', ?, CURRENT_TIMESTAMP)
        RETURNING id
    input:
      params:
        - ${input.customerId}
        - ${calculate-total(input.items)}
        - ${process-payment.output.data.id}

  # Create Order Items
  # One order_items row per ordered item, linked to the new order id.
  - member: create-order-items
    condition: ${create-order.success}
    foreach: ${input.items}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO order_items (
          order_id, product_id, quantity, price
        ) VALUES (?, ?, ?, ?)
    input:
      params:
        - ${create-order.output.results[0].id}
        - ${item.productId}
        - ${item.quantity}
        - ${item.price}

  # Update Inventory - Remove from Reserved
  # The units were already subtracted from `quantity` at reservation time, so
  # only the `reserved` counter is decremented here.
  - member: finalize-inventory
    condition: ${create-order.success}
    foreach: ${input.items}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        UPDATE inventory
        SET reserved = reserved - ?
        WHERE product_id = ?
    input:
      params:
        - ${item.quantity}
        - ${item.productId}

  # Create Fulfillment Request
  # `priority` falls back to 'standard' when the caller does not supply one.
  - member: create-fulfillment
    condition: ${create-order.success}
    type: API
    config:
      url: "${env.FULFILLMENT_API}/orders"
      method: POST
      headers:
        Authorization: "Bearer ${env.FULFILLMENT_KEY}"
    input:
      body:
        orderId: ${create-order.output.results[0].id}
        items: ${input.items}
        shippingAddress: ${input.shippingAddress}
        priority: ${input.priority || 'standard'}
    state:
      set: [fulfillmentStatus]

  # Send Confirmation Email
  - member: send-confirmation
    condition: ${create-order.success}
    type: API
    config:
      url: "${env.EMAIL_SERVICE_URL}/send"
      method: POST
    input:
      body:
        to: ${input.customerEmail}
        template: "order_confirmation"
        data:
          orderNumber: ${create-order.output.results[0].id}
          items: ${input.items}
          total: ${calculate-total(input.items)}
          estimatedDelivery: ${calculate-delivery-date()}

  # Send to Analytics
  # Reports the number of line items (`input.items.length`), not the items.
  - member: track-order
    condition: ${create-order.success}
    type: API
    config:
      url: "${env.ANALYTICS_URL}/events"
      method: POST
    input:
      body:
        event: "order_placed"
        userId: ${input.customerId}
        properties:
          orderId: ${create-order.output.results[0].id}
          total: ${calculate-total(input.items)}
          items: ${input.items.length}

# Result returned to the caller.
# Optional chaining is applied from `.output` onward because these steps are
# skipped when an earlier condition fails, in which case they may have no
# output at all (assumes a skipped step leaves `output` undefined - confirm).
# `error` is quoted: an unquoted value containing " : " is a YAML parse error.
output:
  success: ${create-order.success}
  orderId: ${create-order.output?.results?.[0]?.id}
  paymentIntent: ${process-payment.output?.data?.id}
  fulfillmentId: ${create-fulfillment.output?.data?.id}
  error: "${!create-order.success ? 'Order processing failed' : null}"

Inventory Management

# check-and-alert-inventory: finds products at or below their reorder level
# and posts a summary to a Slack webhook.
name: check-and-alert-inventory
description: Monitor inventory and alert on low stock

flow:
  # Check low stock items
  # Selects every inventory row whose quantity has reached its reorder level.
  - member: check-low-stock
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        SELECT product_id, product_name, quantity, reorder_level
        FROM inventory
        WHERE quantity <= reorder_level

  # Get product details
  # Looks up the full product row for each low-stock item.
  # `params` uses a block sequence: `${...}` inside a flow sequence (`[...]`)
  # is not valid YAML because `{` and `}` are flow indicators there.
  # NOTE(review): nothing later in this workflow reads this step's output -
  # confirm whether the alert or the workflow output should include it.
  - member: enrich-products
    condition: ${check-low-stock.output.results.length > 0}
    foreach: ${check-low-stock.output.results}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        SELECT * FROM products WHERE id = ?
    input:
      params:
        - ${item.product_id}

  # Send alerts
  # Posts one Slack message listing each low-stock product and its quantity;
  # skipped entirely when no product is low.
  - member: alert-low-stock
    condition: ${check-low-stock.output.results.length > 0}
    type: API
    config:
      url: "${env.SLACK_WEBHOOK}"
      method: POST
    input:
      body:
        text: "⚠️ Low Stock Alert"
        blocks:
          - type: "section"
            text:
              type: "mrkdwn"
              text: |
                *Low Stock Items*
                ${check-low-stock.output.results.map(item =>
                  `• ${item.product_name}: ${item.quantity} units`
                ).join('\n')}

# The raw low-stock rows (an empty list when nothing is low).
output:
  lowStockItems: ${check-low-stock.output.results}

Order Cancellation

# cancel-order: refunds a not-yet-cancelled order, puts its items back into
# inventory, marks the order cancelled and emails the customer.
#
# All `params` lists use block sequences: `${...}` inside a flow sequence
# (`[...]`) is not valid YAML because `{` and `}` are flow indicators there.
name: cancel-order
description: Cancel order and refund

flow:
  # Get order details
  # Returns no rows when the order does not exist or is already cancelled,
  # which stops every later step through their conditions.
  - member: get-order
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        SELECT * FROM orders WHERE id = ? AND status != 'cancelled'
    input:
      params:
        - ${input.orderId}

  # Check if cancellable
  # Later steps read `canCancel` from this function's output.
  - member: check-cancellable
    condition: ${get-order.output.results.length > 0}
    type: Function
    input:
      order: ${get-order.output.results[0]}
      status: ${get-order.output.results[0].status}

  # Refund payment
  # Refunds the PaymentIntent stored on the order row.
  - member: refund-payment
    condition: ${check-cancellable.output.canCancel}
    type: API
    config:
      url: "https://api.stripe.com/v1/refunds"
      method: POST
      headers:
        Authorization: "Bearer ${env.STRIPE_SECRET_KEY}"
    input:
      body:
        payment_intent: ${get-order.output.results[0].payment_intent_id}

  # Restore inventory
  # Step 1: load the order's line items (only after a successful refund).
  - member: get-order-items
    condition: ${refund-payment.success}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        SELECT product_id, quantity FROM order_items
        WHERE order_id = ?
    input:
      params:
        - ${input.orderId}

  # Step 2: add each line item's quantity back to available stock.
  - member: restore-inventory
    condition: ${get-order-items.success}
    foreach: ${get-order-items.output.results}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        UPDATE inventory
        SET quantity = quantity + ?
        WHERE product_id = ?
    input:
      params:
        - ${item.quantity}
        - ${item.product_id}

  # Update order status
  # Gated on the refund, not on the inventory restore.
  - member: update-order-status
    condition: ${refund-payment.success}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        UPDATE orders
        SET status = 'cancelled', cancelled_at = CURRENT_TIMESTAMP
        WHERE id = ?
    input:
      params:
        - ${input.orderId}

  # Notify customer
  # The refund amount shown is the order's stored total.
  - member: send-cancellation-email
    condition: ${update-order-status.success}
    type: API
    config:
      url: "${env.EMAIL_SERVICE_URL}/send"
      method: POST
    input:
      body:
        to: ${input.customerEmail}
        template: "order_cancelled"
        data:
          orderNumber: ${input.orderId}
          refundAmount: ${get-order.output.results[0].total}

output:
  cancelled: ${update-order-status.success}
  refunded: ${refund-payment.success}

Product Recommendations

# product-recommendations: combines a customer's recent purchases with a
# vector similarity search and asks an LLM for five recommendations.
name: product-recommendations
description: AI-powered product recommendations

flow:
  # Get customer purchase history
  # Up to 10 product rows from the customer's orders, newest order first.
  # `params` uses a block sequence: `${...}` inside a flow sequence (`[...]`)
  # is not valid YAML because `{` and `}` are flow indicators there.
  - member: get-purchase-history
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        SELECT p.* FROM products p
        JOIN order_items oi ON p.id = oi.product_id
        JOIN orders o ON oi.order_id = o.id
        WHERE o.customer_id = ?
        ORDER BY o.created_at DESC
        LIMIT 10
    input:
      params:
        - ${input.customerId}

  # Get similar products via embeddings
  # The search text is the purchased product names joined with ", ";
  # results are limited to 10 and filtered to in-stock products.
  # NOTE(review): a customer with no purchases yields an empty query string -
  # confirm how the index handles that.
  - member: find-similar
    type: RAG
    config:
      vectorizeBinding: "VECTORIZE"
      indexName: "products"
      operation: query
    input:
      query: ${get-purchase-history.output.results.map(p => p.name).join(', ')}
      topK: 10
      filters:
        inStock: true

  # Generate personalized recommendations
  # Both result sets are embedded in the prompt as JSON.
  - member: generate-recommendations
    type: Think
    config:
      provider: openai
      model: gpt-4o
    input:
      prompt: |
        Based on this customer's purchase history:
        ${JSON.stringify(get-purchase-history.output.results)}

        And these similar products:
        ${JSON.stringify(find-similar.output.results)}

        Generate 5 personalized product recommendations with reasons.

# NOTE(review): reads a `products` field from the model output, but the prompt
# does not request a specific structure - confirm the Think step's output shape.
output:
  recommendations: ${generate-recommendations.output.products}

Testing

import { describe, it, expect } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';

/**
 * Integration tests for the `process-order` ensemble, run against
 * TestConductor with mocked database rows and HTTP responses.
 */
describe('process-order', () => {
  // Happy path: stock is available and Stripe accepts the payment.
  it('should process order successfully', async () => {
    const conductor = await TestConductor.create({
      mocks: {
        db: {
          inventory: [
            { product_id: '1', quantity: 100 }
          ]
        },
        http: {
          responses: {
            'https://api.stripe.com/v1/payment_intents': {
              status: 200,
              data: { id: 'pi_123', status: 'succeeded' }
            }
          }
        }
      }
    });

    const result = await conductor.executeEnsemble('process-order', {
      customerId: 'cus_123',
      customerEmail: 'customer@example.com',
      items: [
        { productId: '1', quantity: 2, price: 29.99 }
      ],
      paymentMethodId: 'pm_123',
      shippingAddress: {
        street: '123 Main St',
        city: 'San Francisco',
        state: 'CA',
        zip: '94102'
      }
    });

    expect(result).toBeSuccessful();
    expect(result.output.orderId).toBeDefined();
    expect(result.output.paymentIntent).toBe('pi_123');
  });

  // Failure path: stock is available but Stripe declines the payment.
  it('should rollback on payment failure', async () => {
    const conductor = await TestConductor.create({
      mocks: {
        // Stock must exist. Without it the workflow's reserve and payment
        // steps are skipped by their conditions, and the test would pass
        // without ever reaching the declined-payment path it is named for.
        db: {
          inventory: [
            { product_id: '1', quantity: 100 }
          ]
        },
        http: {
          responses: {
            'https://api.stripe.com/v1/payment_intents': {
              status: 402,
              error: 'Payment failed'
            }
          }
        }
      }
    });

    // Same complete order as the happy path, so the only difference between
    // the two tests is the payment response.
    const result = await conductor.executeEnsemble('process-order', {
      customerId: 'cus_123',
      customerEmail: 'customer@example.com',
      items: [
        { productId: '1', quantity: 2, price: 29.99 }
      ],
      paymentMethodId: 'pm_123',
      shippingAddress: {
        street: '123 Main St',
        city: 'San Francisco',
        state: 'CA',
        zip: '94102'
      }
    });

    expect(result.output.success).toBe(false);
    // The workflow's `error` output is this fixed message whenever the order
    // record was not created.
    expect(result.output.error).toBe('Order processing failed');
    // TODO: Verify inventory was released (quantity back to 100, nothing
    // reserved) once the mock database's inspection API is confirmed.
  });
});

Best Practices

  1. Atomic operations - Ensure consistency across steps
  2. Rollback on failure - Restore state when errors occur
  3. Idempotency - Handle duplicate requests safely
  4. Inventory locking - Prevent overselling
  5. Payment verification - Confirm payment before fulfillment
  6. Customer notifications - Keep customers informed
  7. Error logging - Track failures for debugging
  8. Monitoring - Alert on critical issues