Overview
Migrate to Conductor v1.0.0 from other orchestration frameworks, or upgrade from a preview version. This guide covers migration patterns, code transformations, and compatibility considerations.
From Conductor Preview to v1.0.0
Breaking Changes
1. Member Type Names
Copy
# ❌ Preview version
- member: ai-task
type: AI # Old name
# ✅ v1.0.0
- member: ai-task
type: Think # New name
2. Configuration Structure
Copy
# ❌ Preview version
- member: fetch
type: HTTP
url: "https://api.example.com"
method: GET
# ✅ v1.0.0
- member: fetch
type: Fetch
config:
url: "https://api.example.com"
method: GET
3. State Management
Copy
# ❌ Preview version
- member: process
useState: ["data"]
setState: ["result"]
# ✅ v1.0.0
- member: process
state:
use: [data]
set: [result]
4. Error Handling
Copy
# ❌ Preview version
- member: risky
ignoreErrors: true
# ✅ v1.0.0
- member: risky
continue_on_error: true
Migration Script
Copy
// migrate-ensemble.ts
import fs from 'fs';
import yaml from 'yaml';
/** One mapping from an ensemble's `members` list; which keys are present depends on the file's version. */
type EnsembleMember = Record<string, unknown>;

/** Minimal shape of the ensemble file this script reads and rewrites. */
interface OldEnsemble {
  name: string;
  // Entries are validated one by one during migration, so nothing is assumed about them here.
  members: unknown[];
}

/** Preview member type names mapped to their v1.0.0 replacements. */
const RENAMED_MEMBER_TYPES: ReadonlyMap<string, string> = new Map([
  ['AI', 'Think'],
  ['HTTP', 'Fetch'],
]);

/** Returns true when `value` is a non-null, non-array object (i.e. a YAML mapping). */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null && !Array.isArray(value);
}

/**
 * Moves legacy top-level keys of `member` into the object stored under `target`.
 *
 * Only keys that actually carry a value are copied, so the result never contains
 * placeholder entries for settings the member did not define. If `target` already
 * holds an object (a partially migrated file), its existing keys take precedence
 * and the legacy values only fill the gaps.
 *
 * @param member  Member to update in place.
 * @param target  Name of the nested v1.0.0 object (`config`, `state`).
 * @param renames Pairs of `[legacyKey, newKey]`.
 */
function nestLegacyKeys(
  member: EnsembleMember,
  target: string,
  renames: ReadonlyArray<readonly [string, string]>
): void {
  const moved: Record<string, unknown> = {};
  for (const [legacyKey, newKey] of renames) {
    if (!(legacyKey in member)) continue;
    const value = member[legacyKey];
    if (value !== undefined && value !== null) moved[newKey] = value;
    // The legacy key is removed even when it was empty; it has no meaning in v1.0.0.
    delete member[legacyKey];
  }
  if (Object.keys(moved).length === 0) return;

  const existing = member[target];
  member[target] = isRecord(existing) ? { ...moved, ...existing } : moved;
}

/**
 * Converts a single preview-format member to the v1.0.0 format.
 *
 * Works on a shallow copy, so the parsed input is left untouched.
 */
function migrateMember(member: EnsembleMember): EnsembleMember {
  const migrated: EnsembleMember = { ...member };

  // Type name changes
  if (typeof migrated.type === 'string') {
    migrated.type = RENAMED_MEMBER_TYPES.get(migrated.type) ?? migrated.type;
  }

  // Configuration structure: request settings move under `config`
  nestLegacyKeys(migrated, 'config', [
    ['url', 'url'],
    ['method', 'method'],
    ['headers', 'headers'],
  ]);

  // State management: `useState` / `setState` become `state.use` / `state.set`
  nestLegacyKeys(migrated, 'state', [
    ['useState', 'use'],
    ['setState', 'set'],
  ]);

  // Error handling
  if (migrated.ignoreErrors !== undefined) {
    migrated.continue_on_error = migrated.ignoreErrors;
    delete migrated.ignoreErrors;
  }

  return migrated;
}

/**
 * Reads a preview-format ensemble file, converts every member to the v1.0.0
 * format, and writes the result to a new file.
 *
 * @param oldPath Path of the preview-format YAML file to read.
 * @param newPath Path the migrated YAML is written to (overwritten if it exists).
 * @throws Error when the file does not contain a top-level `members` list.
 *         Errors from reading, parsing, or writing the files propagate unchanged.
 */
function migrateEnsemble(oldPath: string, newPath: string): void {
  const content = fs.readFileSync(oldPath, 'utf-8');

  // yaml.parse returns untyped data (null for an empty file), so check the shape before using it.
  const ensemble = yaml.parse(content) as Partial<OldEnsemble> | null;
  const members = ensemble?.members;
  if (!Array.isArray(members)) {
    throw new Error(`${oldPath} is not an ensemble file: expected a top-level "members" list`);
  }

  const migrated = {
    ...ensemble,
    // Entries that are not mappings cannot carry legacy keys; pass them through unchanged.
    members: members.map(member => (isRecord(member) ? migrateMember(member) : member)),
  };

  // Write migrated ensemble
  fs.writeFileSync(newPath, yaml.stringify(migrated));
  console.log(`Migrated ${oldPath} → ${newPath}`);
}
// Usage: convert one preview-format ensemble and write the v1.0.0 version alongside it.
const legacyEnsemblePath = 'ensembles-old/my-workflow.yaml';
const migratedEnsemblePath = 'ensembles/my-workflow.yaml';

migrateEnsemble(legacyEnsemblePath, migratedEnsemblePath);
Update Dependencies
Copy
# Update package.json
npm install @ensemble-edge/conductor@1.0.0
# Update wrangler.toml
cat > wrangler.toml <<EOF
name = "my-conductor-app"
main = "src/index.ts"
compatibility_date = "2024-01-01"
[ai]
binding = "AI"
[[d1_databases]]
binding = "DB"
database_name = "production-db"
database_id = "your-database-id"
EOF
# Redeploy
npx wrangler deploy
From Temporal
Workflow → Ensemble
Copy
# ❌ Temporal (Python)
@workflow.defn
class OrderWorkflow:
@workflow.run
async def run(self, order_id: str) -> str:
# Validate order
await workflow.execute_activity(
validate_order,
order_id,
start_to_close_timeout=timedelta(seconds=30)
)
# Process payment
payment_result = await workflow.execute_activity(
process_payment,
order_id,
start_to_close_timeout=timedelta(seconds=60)
)
# Ship order
await workflow.execute_activity(
ship_order,
order_id,
start_to_close_timeout=timedelta(seconds=30)
)
return "completed"
Copy
# ✅ Conductor (YAML)
name: order-workflow
description: Process customer order
flow:
# Validate order
- member: validate-order
type: Function
input:
orderId: ${input.orderId}
# Process payment
- member: process-payment
type: API
config:
url: "${env.PAYMENT_API}/charge"
method: POST
timeout: 60000
input:
body:
orderId: ${input.orderId}
# Ship order
- member: ship-order
type: Function
input:
orderId: ${input.orderId}
paymentId: ${process-payment.output.id}
output:
status: "completed"
orderId: ${input.orderId}
paymentId: ${process-payment.output.id}
Activities → Members
Copy
# ❌ Temporal Activity
@activity.defn
async def send_email(to: str, subject: str, body: str) -> None:
async with aiohttp.ClientSession() as session:
await session.post(
"https://api.sendgrid.com/v3/mail/send",
json={
"to": to,
"subject": subject,
"body": body
}
)
Copy
# ✅ Conductor Member
- member: send-email
type: API
config:
url: "https://api.sendgrid.com/v3/mail/send"
method: POST
headers:
Authorization: "Bearer ${env.SENDGRID_API_KEY}"
input:
body:
to: ${input.to}
subject: ${input.subject}
body: ${input.body}
Signals → HITL
Copy
# ❌ Temporal Signal
@workflow.defn
class ApprovalWorkflow:
def __init__(self) -> None:
self._approved = False
@workflow.signal
async def approve(self) -> None:
self._approved = True
@workflow.run
async def run(self) -> str:
await workflow.wait_condition(lambda: self._approved)
return "approved"
Copy
# ✅ Conductor HITL
name: approval-workflow
description: Wait for human approval
flow:
  - member: request-approval
    type: HITL
    config:
      prompt: "Approve this request?"
      fields:
        - name: approved
          type: boolean
          label: "Approve?"
        - name: comments
          type: textarea
          label: "Comments"
  - member: process-approval
    condition: ${request-approval.output.approved}
    type: Function
output:
  # Quoted because the expression contains ": " (colon + space), which an
  # unquoted YAML scalar may not contain.
  status: "${request-approval.output.approved ? 'approved' : 'rejected'}"
From AWS Step Functions
State Machine → Ensemble
Copy
// ❌ Step Functions (JSON)
{
"Comment": "Order processing workflow",
"StartAt": "ValidateOrder",
"States": {
"ValidateOrder": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:ValidateOrder",
"Next": "ProcessPayment"
},
"ProcessPayment": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:ProcessPayment",
"Next": "CheckPaymentStatus"
},
"CheckPaymentStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.paymentStatus",
"StringEquals": "SUCCESS",
"Next": "ShipOrder"
}
],
"Default": "PaymentFailed"
},
"ShipOrder": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:ShipOrder",
"End": true
},
"PaymentFailed": {
"Type": "Fail",
"Error": "PaymentFailed"
}
}
}
Copy
# ✅ Conductor (YAML)
name: order-processing
description: Process customer order
flow:
- member: validate-order
type: Function
- member: process-payment
type: API
config:
url: "${env.PAYMENT_API}/charge"
method: POST
- member: ship-order
condition: ${process-payment.output.status === 'SUCCESS'}
type: Function
- member: handle-payment-failure
condition: ${process-payment.output.status !== 'SUCCESS'}
type: Function
input:
error: "Payment failed"
output:
success: ${process-payment.output.status === 'SUCCESS'}
orderId: ${input.orderId}
Parallel States → Parallel Flow
Copy
// ❌ Step Functions
{
"Type": "Parallel",
"Branches": [
{
"StartAt": "Task1",
"States": { "Task1": { "Type": "Task", "Resource": "arn:..." } }
},
{
"StartAt": "Task2",
"States": { "Task2": { "Type": "Task", "Resource": "arn:..." } }
}
]
}
Copy
# ✅ Conductor
parallel:
- member: task1
type: Function
- member: task2
type: Function
From LangChain
Chain → Ensemble
Copy
# ❌ Langchain (Python)
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain.llms import OpenAI
llm = OpenAI(temperature=0.7)
# Multi-step chain
prompt1 = PromptTemplate(
input_variables=["topic"],
template="Generate 3 blog post ideas about {topic}"
)
chain1 = LLMChain(llm=llm, prompt=prompt1)
prompt2 = PromptTemplate(
input_variables=["ideas"],
template="Pick the best idea from: {ideas}"
)
chain2 = LLMChain(llm=llm, prompt=prompt2)
ideas = chain1.run(topic="AI")
best = chain2.run(ideas=ideas)
Copy
# ✅ Conductor (YAML)
name: blog-post-ideas
description: Generate and select blog post ideas
flow:
- member: generate-ideas
type: Think
config:
provider: openai
model: gpt-4o
temperature: 0.7
input:
prompt: "Generate 3 blog post ideas about ${input.topic}"
- member: select-best
type: Think
config:
provider: openai
model: gpt-4o
temperature: 0.7
input:
prompt: |
Pick the best idea from:
${generate-ideas.output.text}
output:
bestIdea: ${select-best.output.text}
RAG → RAG Member
Copy
# ❌ Langchain RAG
from langchain.vectorstores import Pinecone
from langchain.embeddings import OpenAIEmbeddings
from langchain.chains import RetrievalQA
embeddings = OpenAIEmbeddings()
vectorstore = Pinecone.from_existing_index("my-index", embeddings)
qa = RetrievalQA.from_chain_type(
llm=OpenAI(),
retriever=vectorstore.as_retriever(search_kwargs={"k": 3})
)
result = qa.run("What is Conductor?")
Copy
# ✅ Conductor RAG
name: rag-query
description: Query knowledge base
flow:
- member: search-knowledge
type: RAG
config:
vectorizeBinding: "VECTORIZE"
indexName: "my-index"
operation: query
input:
query: ${input.question}
topK: 3
- member: generate-answer
type: Think
config:
provider: openai
model: gpt-4o
input:
prompt: |
Context:
${search-knowledge.output.results.map(r => r.text).join('\n\n')}
Question: ${input.question}
Answer:
output:
answer: ${generate-answer.output.text}
From n8n
Workflow → Ensemble
Copy
// ❌ n8n (JSON)
{
"nodes": [
{
"name": "HTTP Request",
"type": "n8n-nodes-base.httpRequest",
"parameters": {
"url": "https://api.example.com/data",
"method": "GET"
}
},
{
"name": "AI Transform",
"type": "n8n-nodes-base.openAi",
"parameters": {
"operation": "chat",
"text": "Summarize: {{$json.body}}"
}
},
{
"name": "Save to Database",
"type": "n8n-nodes-base.postgres",
"parameters": {
"operation": "insert",
"table": "summaries"
}
}
]
}
Copy
# ✅ Conductor (YAML)
name: fetch-and-summarize
description: Fetch data, summarize, and save
flow:
  - member: fetch-data
    type: Fetch
    config:
      url: "https://api.example.com/data"
      method: GET
  - member: summarize
    type: Think
    config:
      provider: openai
      model: gpt-4o
    input:
      prompt: "Summarize: ${fetch-data.output.body}"
  - member: save-summary
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO summaries (content, created_at)
        VALUES (?, CURRENT_TIMESTAMP)
    input:
      # Quoted because "{" and "}" are flow indicators and cannot appear in an
      # unquoted scalar inside a [...] sequence.
      params: ["${summarize.output.text}"]
output:
  summary: ${summarize.output.text}
Deployment Migration
From Docker to Cloudflare Workers
Copy
# ❌ Old Dockerfile
FROM node:18
WORKDIR /app
COPY package*.json ./
RUN npm install
COPY . .
CMD ["npm", "start"]
Copy
# ✅ Cloudflare Workers
# No Docker needed! Deploy directly to edge:
# Install Wrangler
npm install -g wrangler
# Deploy
npx wrangler deploy
# Update on every push
git push # GitHub Actions deploys automatically
Environment Variables
Copy
# ❌ Docker .env file
OPENAI_API_KEY=sk-...
DATABASE_URL=postgres://...
# ✅ Cloudflare Secrets
echo "sk-..." | npx wrangler secret put OPENAI_API_KEY
Testing Migration
Update Test Framework
Copy
// ❌ Old testing approach
import { createMockWorkflow } from 'old-framework';
test('workflow executes', async () => {
const workflow = createMockWorkflow('my-workflow');
const result = await workflow.execute({ input: 'test' });
expect(result.success).toBe(true);
});
Copy
// ✅ Conductor testing
import { describe, it, expect } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';
describe('my-workflow', () => {
it('should execute successfully', async () => {
const conductor = await TestConductor.create();
const result = await conductor.executeEnsemble('my-workflow', {
input: 'test'
});
expect(result).toBeSuccessful();
expect(result.output).toBeDefined();
});
});
Best Practices
- Migrate incrementally - One workflow at a time
- Test thoroughly - Use TestConductor for each migrated workflow
- Keep both systems running - Parallel deployment during migration
- Monitor closely - Watch for errors after migration
- Document changes - Track what was migrated and when
- Train team - Ensure everyone understands new patterns
- Use feature flags - Gradual rollout of migrated workflows
- Backup data - Before migrating state/data
Migration Checklist
- Review breaking changes documentation
- Update dependencies to v1.0.0
- Migrate ensemble definitions
- Update member type names
- Restructure configuration
- Update state management syntax
- Update error handling
- Migrate tests to TestConductor
- Update CI/CD pipelines
- Set up Cloudflare secrets
- Deploy to staging
- Test thoroughly
- Monitor staging
- Deploy to production
- Update documentation
- Train team

