Overview

Migrate to Conductor v1.0.0 from other orchestration frameworks or upgrade from preview versions. This guide covers migration patterns, code transformations, and compatibility considerations.

From Conductor Preview to v1.0.0

Breaking Changes

1. Member Type Names

# ❌ Preview version
- member: ai-task
  type: AI  # Old name

# ✅ v1.0.0
- member: ai-task
  type: Think  # New name

2. Configuration Structure

# ❌ Preview version
- member: fetch
  type: HTTP
  url: "https://api.example.com"
  method: GET

# ✅ v1.0.0
- member: fetch
  type: Fetch
  config:
    url: "https://api.example.com"
    method: GET

3. State Management

# ❌ Preview version
- member: process
  useState: ["data"]
  setState: ["result"]

# ✅ v1.0.0
- member: process
  state:
    use: [data]
    set: [result]

4. Error Handling

# ❌ Preview version
- member: risky
  ignoreErrors: true

# ✅ v1.0.0
- member: risky
  continue_on_error: true

Migration Script

// migrate-ensemble.ts
import fs from 'fs';
import yaml from 'yaml';

interface OldEnsemble {
  name: string;
  members: any[];
}

function migrateEnsemble(oldPath: string, newPath: string): void {
  const content = fs.readFileSync(oldPath, 'utf-8');
  const ensemble = yaml.parse(content) as OldEnsemble;

  // Migrate member types
  ensemble.members = ensemble.members.map(member => {
    // Type name changes
    if (member.type === 'AI') member.type = 'Think';
    if (member.type === 'HTTP') member.type = 'Fetch';

    // Configuration structure
    if (member.url || member.method || member.headers) {
      member.config = {
        url: member.url,
        method: member.method,
        headers: member.headers
      };
      delete member.url;
      delete member.method;
      delete member.headers;
    }

    // State management
    if (member.useState || member.setState) {
      member.state = {
        use: member.useState,
        set: member.setState
      };
      delete member.useState;
      delete member.setState;
    }

    // Error handling
    if (member.ignoreErrors !== undefined) {
      member.continue_on_error = member.ignoreErrors;
      delete member.ignoreErrors;
    }

    return member;
  });

  // Write migrated ensemble
  fs.writeFileSync(newPath, yaml.stringify(ensemble));
  console.log(`Migrated ${oldPath} → ${newPath}`);
}

// Usage
migrateEnsemble(
  'ensembles-old/my-workflow.yaml',
  'ensembles/my-workflow.yaml'
);
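
To convert a whole directory in one pass, the same helper can be wrapped in a small batch script. This is a minimal sketch: it assumes the preview-era files live in ensembles-old/, that migrated files should land in ensembles/, and that migrateEnsemble is exported from the script above; adjust paths and wiring to your project.

// migrate-all.ts (illustrative batch wrapper)
import fs from 'fs';
import path from 'path';
import { migrateEnsemble } from './migrate-ensemble'; // assumes migrateEnsemble is exported

const sourceDir = 'ensembles-old'; // assumed location of preview-era ensembles
const targetDir = 'ensembles';     // assumed output directory for v1.0.0 files

fs.mkdirSync(targetDir, { recursive: true });

for (const file of fs.readdirSync(sourceDir)) {
  // Only process YAML ensemble definitions
  if (!file.endsWith('.yaml') && !file.endsWith('.yml')) continue;
  migrateEnsemble(path.join(sourceDir, file), path.join(targetDir, file));
}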

Update Dependencies

# Update package.json
npm install @ensemble-edge/conductor@1.0.0

# Update wrangler.toml
cat > wrangler.toml <<EOF
name = "my-conductor-app"
main = "src/index.ts"
compatibility_date = "2024-01-01"

[ai]
binding = "AI"

[[d1_databases]]
binding = "DB"
database_name = "production-db"
database_id = "your-database-id"
EOF

# Redeploy
npx wrangler deploy

From Temporal

Workflow → Ensemble

# ❌ Temporal (Python)
@workflow.defn
class OrderWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> str:
        # Validate order
        await workflow.execute_activity(
            validate_order,
            order_id,
            start_to_close_timeout=timedelta(seconds=30)
        )

        # Process payment
        payment_result = await workflow.execute_activity(
            process_payment,
            order_id,
            start_to_close_timeout=timedelta(seconds=60)
        )

        # Ship order
        await workflow.execute_activity(
            ship_order,
            order_id,
            start_to_close_timeout=timedelta(seconds=30)
        )

        return "completed"

# ✅ Conductor (YAML)
name: order-workflow
description: Process customer order

flow:
  # Validate order
  - member: validate-order
    type: Function
    input:
      orderId: ${input.orderId}

  # Process payment
  - member: process-payment
    type: API
    config:
      url: "${env.PAYMENT_API}/charge"
      method: POST
      timeout: 60000
    input:
      body:
        orderId: ${input.orderId}

  # Ship order
  - member: ship-order
    type: Function
    input:
      orderId: ${input.orderId}
      paymentId: ${process-payment.output.id}

output:
  status: "completed"
  orderId: ${input.orderId}
  paymentId: ${process-payment.output.id}

Activities → Members

# ❌ Temporal Activity
@activity.defn
async def send_email(to: str, subject: str, body: str) -> None:
    async with aiohttp.ClientSession() as session:
        await session.post(
            "https://api.sendgrid.com/v3/mail/send",
            json={
                "to": to,
                "subject": subject,
                "body": body
            }
        )

# ✅ Conductor Member
- member: send-email
  type: API
  config:
    url: "https://api.sendgrid.com/v3/mail/send"
    method: POST
    headers:
      Authorization: "Bearer ${env.SENDGRID_API_KEY}"
  input:
    body:
      to: ${input.to}
      subject: ${input.subject}
      body: ${input.body}

Signals → HITL

# ❌ Temporal Signal
@workflow.defn
class ApprovalWorkflow:
    def __init__(self) -> None:
        self._approved = False

    @workflow.signal
    async def approve(self) -> None:
        self._approved = True

    @workflow.run
    async def run(self) -> str:
        await workflow.wait_condition(lambda: self._approved)
        return "approved"

# ✅ Conductor HITL
name: approval-workflow
description: Wait for human approval

flow:
  - member: request-approval
    type: HITL
    config:
      prompt: "Approve this request?"
      fields:
        - name: approved
          type: boolean
          label: "Approve?"
        - name: comments
          type: textarea
          label: "Comments"

  - member: process-approval
    condition: ${request-approval.output.approved}
    type: Function

output:
  status: ${request-approval.output.approved ? 'approved' : 'rejected'}

From AWS Step Functions

State Machine → Ensemble

// ❌ Step Functions (JSON)
{
  "Comment": "Order processing workflow",
  "StartAt": "ValidateOrder",
  "States": {
    "ValidateOrder": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:ValidateOrder",
      "Next": "ProcessPayment"
    },
    "ProcessPayment": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:ProcessPayment",
      "Next": "CheckPaymentStatus"
    },
    "CheckPaymentStatus": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.paymentStatus",
          "StringEquals": "SUCCESS",
          "Next": "ShipOrder"
        }
      ],
      "Default": "PaymentFailed"
    },
    "ShipOrder": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:ShipOrder",
      "End": true
    },
    "PaymentFailed": {
      "Type": "Fail",
      "Error": "PaymentFailed"
    }
  }
}

# ✅ Conductor (YAML)
name: order-processing
description: Process customer order

flow:
  - member: validate-order
    type: Function

  - member: process-payment
    type: API
    config:
      url: "${env.PAYMENT_API}/charge"
      method: POST

  - member: ship-order
    condition: ${process-payment.output.status === 'SUCCESS'}
    type: Function

  - member: handle-payment-failure
    condition: ${process-payment.output.status !== 'SUCCESS'}
    type: Function
    input:
      error: "Payment failed"

output:
  success: ${process-payment.output.status === 'SUCCESS'}
  orderId: ${input.orderId}

Parallel States → Parallel Flow

// ❌ Step Functions
{
  "Type": "Parallel",
  "Branches": [
    {
      "StartAt": "Task1",
      "States": { "Task1": { "Type": "Task", "Resource": "arn:..." } }
    },
    {
      "StartAt": "Task2",
      "States": { "Task2": { "Type": "Task", "Resource": "arn:..." } }
    }
  ]
}

# ✅ Conductor
parallel:
  - member: task1
    type: Function
  - member: task2
    type: Function

From Langchain

Chain → Ensemble

# ❌ Langchain (Python)
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain.llms import OpenAI

llm = OpenAI(temperature=0.7)

# Multi-step chain
prompt1 = PromptTemplate(
    input_variables=["topic"],
    template="Generate 3 blog post ideas about {topic}"
)
chain1 = LLMChain(llm=llm, prompt=prompt1)

prompt2 = PromptTemplate(
    input_variables=["ideas"],
    template="Pick the best idea from: {ideas}"
)
chain2 = LLMChain(llm=llm, prompt=prompt2)

ideas = chain1.run(topic="AI")
best = chain2.run(ideas=ideas)

# ✅ Conductor (YAML)
name: blog-post-ideas
description: Generate and select blog post ideas

flow:
  - member: generate-ideas
    type: Think
    config:
      provider: openai
      model: gpt-4o
      temperature: 0.7
    input:
      prompt: "Generate 3 blog post ideas about ${input.topic}"

  - member: select-best
    type: Think
    config:
      provider: openai
      model: gpt-4o
      temperature: 0.7
    input:
      prompt: |
        Pick the best idea from:
        ${generate-ideas.output.text}

output:
  bestIdea: ${select-best.output.text}

RAG → RAG Member

# ❌ Langchain RAG
from langchain.vectorstores import Pinecone
from langchain.embeddings import OpenAIEmbeddings
from langchain.chains import RetrievalQA

embeddings = OpenAIEmbeddings()
vectorstore = Pinecone.from_existing_index("my-index", embeddings)

qa = RetrievalQA.from_chain_type(
    llm=OpenAI(),
    retriever=vectorstore.as_retriever(search_kwargs={"k": 3})
)

result = qa.run("What is Conductor?")

# ✅ Conductor RAG
name: rag-query
description: Query knowledge base

flow:
  - member: search-knowledge
    type: RAG
    config:
      vectorizeBinding: "VECTORIZE"
      indexName: "my-index"
      operation: query
    input:
      query: ${input.question}
      topK: 3

  - member: generate-answer
    type: Think
    config:
      provider: openai
      model: gpt-4o
    input:
      prompt: |
        Context:
        ${search-knowledge.output.results.map(r => r.text).join('\n\n')}

        Question: ${input.question}

        Answer:

output:
  answer: ${generate-answer.output.text}

From n8n

Workflow → Ensemble

// ❌ n8n (JSON)
{
  "nodes": [
    {
      "name": "HTTP Request",
      "type": "n8n-nodes-base.httpRequest",
      "parameters": {
        "url": "https://api.example.com/data",
        "method": "GET"
      }
    },
    {
      "name": "AI Transform",
      "type": "n8n-nodes-base.openAi",
      "parameters": {
        "operation": "chat",
        "text": "Summarize: {{$json.body}}"
      }
    },
    {
      "name": "Save to Database",
      "type": "n8n-nodes-base.postgres",
      "parameters": {
        "operation": "insert",
        "table": "summaries"
      }
    }
  ]
}

# ✅ Conductor (YAML)
name: fetch-and-summarize
description: Fetch data, summarize, and save

flow:
  - member: fetch-data
    type: Fetch
    config:
      url: "https://api.example.com/data"
      method: GET

  - member: summarize
    type: Think
    config:
      provider: openai
      model: gpt-4o
    input:
      prompt: "Summarize: ${fetch-data.output.body}"

  - member: save-summary
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO summaries (content, created_at)
        VALUES (?, CURRENT_TIMESTAMP)
    input:
      params: [${summarize.output.text}]

output:
  summary: ${summarize.output.text}

Deployment Migration

From Docker to Cloudflare Workers

# ❌ Old Dockerfile
FROM node:18
WORKDIR /app
COPY package*.json ./
RUN npm install
COPY . .
CMD ["npm", "start"]

# ✅ Cloudflare Workers
# No Docker needed! Deploy directly to edge:

# Install Wrangler
npm install -g wrangler

# Deploy
npx wrangler deploy

# Update on every push
git push  # GitHub Actions deploys automatically

Environment Variables

# ❌ Docker .env file
OPENAI_API_KEY=sk-...
DATABASE_URL=postgres://...

# ✅ Cloudflare Secrets
echo "sk-..." | npx wrangler secret put OPENAI_API_KEY

Testing Migration

Update Test Framework

// ❌ Old testing approach
import { createMockWorkflow } from 'old-framework';

test('workflow executes', async () => {
  const workflow = createMockWorkflow('my-workflow');
  const result = await workflow.execute({ input: 'test' });
  expect(result.success).toBe(true);
});

// ✅ Conductor testing
import { describe, it, expect } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';

describe('my-workflow', () => {
  it('should execute successfully', async () => {
    const conductor = await TestConductor.create();

    const result = await conductor.executeEnsemble('my-workflow', {
      input: 'test'
    });

    expect(result).toBeSuccessful();
    expect(result.output).toBeDefined();
  });
});
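
Functional tests aside, a cheap guard against half-migrated files is to assert that no preview-era keys or type names survive in the migrated YAML. A minimal sketch follows; the file path and the member list key are assumptions based on the examples in this guide.

// migration-guard.test.ts (illustrative check for leftover preview syntax)
import { describe, it, expect } from 'vitest';
import fs from 'fs';
import yaml from 'yaml';

describe('migrated ensembles', () => {
  it('should not contain preview-era keys or type names', () => {
    // Assumption: the migrated definition lives at ensembles/my-workflow.yaml
    const ensemble = yaml.parse(fs.readFileSync('ensembles/my-workflow.yaml', 'utf-8'));
    const members = ensemble.flow ?? ensemble.members ?? [];

    for (const member of members) {
      // Keys and type names that were renamed in v1.0.0 (see breaking changes above)
      expect(member.useState).toBeUndefined();
      expect(member.setState).toBeUndefined();
      expect(member.ignoreErrors).toBeUndefined();
      expect(['AI', 'HTTP']).not.toContain(member.type);
    }
  });
});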

Best Practices

  1. Migrate incrementally - One workflow at a time
  2. Test thoroughly - Use TestConductor for each migrated workflow
  3. Keep both systems running - Parallel deployment during migration
  4. Monitor closely - Watch for errors after migration
  5. Document changes - Track what was migrated and when
  6. Train team - Ensure everyone understands new patterns
  7. Use feature flags - Gradual rollout of migrated workflows (see the sketch after this list)
  8. Backup data - Before migrating state/data
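
For items 3 and 7, one lightweight pattern is to gate traffic at the edge with an environment flag and keep proxying the rest to the legacy system while the migrated ensembles bake. The sketch below is a generic Cloudflare Worker outline, not Conductor API: MIGRATION_ROLLOUT, LEGACY_ORIGIN, and handleWithConductor are placeholders to replace with your own wiring.

// rollout-gate.ts (illustrative; all names are placeholders)
interface Env {
  MIGRATION_ROLLOUT: string; // e.g. "10" sends 10% of traffic to the migrated workflow
  LEGACY_ORIGIN: string;     // base URL of the system still running in parallel
}

// Placeholder: replace with however you invoke your migrated Conductor ensemble
async function handleWithConductor(request: Request, env: Env): Promise<Response> {
  return new Response('handled by Conductor (placeholder)');
}

// Stable hash so a given caller is always routed to the same system
function hashCode(s: string): number {
  let h = 0;
  for (let i = 0; i < s.length; i++) h = (h * 31 + s.charCodeAt(i)) | 0;
  return Math.abs(h);
}

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const rolloutPercent = Number(env.MIGRATION_ROLLOUT ?? '0');
    const bucket = hashCode(request.headers.get('cf-connecting-ip') ?? '') % 100;

    if (bucket < rolloutPercent) {
      return handleWithConductor(request, env);
    }

    // Proxy everything else to the legacy orchestrator kept running during migration
    const url = new URL(request.url);
    const legacyUrl = new URL(url.pathname + url.search, env.LEGACY_ORIGIN);
    return fetch(new Request(legacyUrl.toString(), request));
  }
};

Bucketing on the caller IP keeps a given client on the same path across requests; swap in a user ID or session header if that fits your traffic better.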

Migration Checklist

  • Review breaking changes documentation
  • Update dependencies to v1.0.0
  • Migrate ensemble definitions
  • Update member type names
  • Restructure configuration
  • Update state management syntax
  • Update error handling
  • Migrate tests to TestConductor
  • Update CI/CD pipelines
  • Set up Cloudflare secrets
  • Deploy to staging
  • Test thoroughly
  • Monitor staging
  • Deploy to production
  • Update documentation
  • Train team

Getting Help