Skip to main content

Overview

This example demonstrates a production-ready workflow that gathers company data from multiple sources, analyzes it with AI, validates quality, and generates a comprehensive report. It showcases parallel processing, AI integration, state management, caching, and quality scoring.

Complete Workflow

# Ensemble identity
name: company-intelligence-report
description: 'Gather and analyze comprehensive company intelligence'

# Shared state fields that members read (`use`) and write (`set`) in the flow.
# NOTE(review): `qualityScore` is declared here but no member in the flow
# below sets it — confirm whether it is still needed.
state:
  schema:
    websiteContent: object
    companyData: object
    analysis: object
    qualityScore: number

# Ensemble-wide scoring defaults; synthesize-analysis overrides the minimum.
scoring:
  enabled: true
  backoffStrategy: exponential
  maxRetries: 3
  defaultThresholds:
    minimum: 0.75

# Execution plan. `flow` is a single sequence: every stage is one list entry,
# either a `member` step or a `parallel` group holding its own member list.
# (Mixing a bare `parallel:` mapping key with `- member:` entries under the
# same parent is not well-formed YAML, and a repeated `parallel` key would
# silently drop the first group on most parsers.)
flow:
  # Step 1: Gather data from multiple sources in parallel
  - parallel:
      # Scrape official website
      - member: scrape-website
        input:
          url: "https://${input.domain}"
          output: markdown
        cache:
          ttl: 3600
        state:
          set: [websiteContent]

      # Fetch from external APIs
      - member: fetch-crunchbase
        type: API
        config:
          url: "https://api.crunchbase.com/v4/entities/organizations/${input.companyName}"
          method: GET
          headers:
            Authorization: "Bearer ${env.CRUNCHBASE_API_KEY}"
        cache:
          ttl: 86400

      # Search for news articles
      - member: fetch-news
        type: API
        config:
          url: "https://newsapi.org/v2/everything"
          method: GET
        input:
          query: "${input.companyName}"
          pageSize: 5
          apiKey: "${env.NEWS_API_KEY}"
        cache:
          ttl: 3600

  # Step 2: Combine gathered data
  # The handler reads `input.crunchbaseData` and `input.newsArticles`, so the
  # outputs of the two API members are passed in explicitly; website content
  # arrives through shared state.
  - member: combine-data
    type: Function
    input:
      crunchbaseData: "${fetch-crunchbase.output.data}"
      newsArticles: "${fetch-news.output.articles}"
    state:
      use: [websiteContent]
      set: [companyData]

  # Step 3: Analyze with AI (parallel perspectives)
  - parallel:
      - member: analyze-business-model
        type: Think
        config:
          provider: anthropic
          model: claude-3-5-sonnet-20241022
          routing: cloudflare-gateway
          temperature: 0.3
          maxTokens: 1000
          responseFormat:
            type: json_object
          systemPrompt: |
            Analyze the company's business model based on the provided data.
            Return JSON with: revenueModel, targetMarket, valueProposition, competitiveAdvantage
        input:
          companyData: "${state.companyData}"
          websiteContent: "${scrape-website.output.content}"
          crunchbaseData: "${fetch-crunchbase.output.data}"
        cache:
          ttl: 3600

      - member: analyze-market-position
        type: Think
        config:
          provider: openai
          model: gpt-4o
          routing: cloudflare-gateway
          temperature: 0.3
          maxTokens: 1000
          systemPrompt: |
            Analyze the company's market position and competitive landscape.
            Be concise and data-driven.
        input:
          companyData: "${state.companyData}"
          newsArticles: "${fetch-news.output.articles}"
        cache:
          ttl: 3600

      - member: analyze-technology
        type: Think
        config:
          provider: openai
          model: gpt-4o
          routing: cloudflare-gateway
          temperature: 0.3
          maxTokens: 800
          systemPrompt: |
            Analyze the company's technology stack and technical capabilities.
            Focus on innovation and technical advantages.
        input:
          websiteContent: "${scrape-website.output.content}"
        cache:
          ttl: 3600

  # Step 4: Synthesize all analyses
  - member: synthesize-analysis
    type: Think
    config:
      provider: anthropic
      model: claude-3-5-sonnet-20241022
      routing: cloudflare-gateway
      temperature: 0.5
      maxTokens: 2000
      systemPrompt: |
        Synthesize all the analysis perspectives into a comprehensive company intelligence report.
        Structure: Executive Summary, Business Model, Market Position, Technology, Opportunities, Risks, Recommendations
    input:
      businessModel: "${analyze-business-model.output}"
      marketPosition: "${analyze-market-position.output}"
      technology: "${analyze-technology.output}"
      companyData: "${state.companyData}"
    state:
      set: [analysis]
    # Member-level scoring: overrides the ensemble default minimum (0.75).
    scoring:
      evaluator: validate
      evaluatorConfig:
        type: judge
        model: gpt-4o
        systemPrompt: "Evaluate the quality and completeness of this company analysis report."
      criteria:
        completeness: "All required sections are present and detailed"
        accuracy: "Analysis is factually accurate based on provided data"
        insights: "Provides actionable insights and recommendations"
        clarity: "Report is clear, well-structured, and professional"
      thresholds:
        minimum: 0.8
      onFailure: retry
      retryLimit: 3
      requireImprovement: true

  # Step 5: Generate executive summary
  - member: generate-executive-summary
    type: Think
    config:
      provider: openai
      model: gpt-4o
      routing: cloudflare-gateway
      temperature: 0.4
      maxTokens: 300
      systemPrompt: |
        Create a concise executive summary (2-3 paragraphs) of the company analysis.
        Focus on key findings and recommendations.
    input:
      fullAnalysis: "${synthesize-analysis.output}"

  # Step 6: Format final report
  - member: format-report
    type: Function
    state:
      use: [companyData, analysis]

# Shape of the ensemble result. Every value is an interpolation resolved from
# the workflow input, shared state, or a member's result.
output:
  # Identity of the analyzed company and when its data was combined
  company:
    name: "${input.companyName}"
    domain: "${input.domain}"
    dataGathered: "${state.companyData.timestamp}"

  # Final report plus the individual analysis perspectives
  report:
    executiveSummary: "${generate-executive-summary.output.text}"
    fullAnalysis: "${synthesize-analysis.output.text}"
    businessModel: "${analyze-business-model.output}"
    marketPosition: "${analyze-market-position.output}"
    technology: "${analyze-technology.output}"

  # Scoring outcome of the synthesize-analysis member
  quality:
    analysisScore: "${synthesize-analysis.scoring.score}"
    attempts: "${synthesize-analysis.scoring.attempts}"
    passed: "${synthesize-analysis.scoring.passed}"

  # Per-source indicators: cache flags for website/Crunchbase, result count for news
  sources:
    website: "${scrape-website.cached}"
    crunchbase: "${fetch-crunchbase.cached}"
    news: "${fetch-news.output.totalResults}"

  generatedAt: "${format-report.output.timestamp}"

Member Implementations

combine-data (Function Member)

// members/combine-data/index.ts
import { createFunctionMember } from '@ensemble-edge/conductor/sdk';

export default createFunctionMember({
  /**
   * Merge the gathered data into a single `companyData` object.
   *
   * Reads `websiteContent` from shared state and `crunchbaseData` /
   * `newsArticles` from the member input, writes the merged object to shared
   * state under `companyData`, and returns `{ success, sources }` where
   * `sources` is the number of sources that actually yielded data.
   *
   * Any of the three sources may be missing; missing data yields undefined
   * website fields, a `null` Crunchbase section, or an empty news list
   * instead of throwing.
   */
  async handler({ input, state, setState }) {
    const { websiteContent } = state;
    // `input` may be absent when the step is configured without inputs.
    const crunchbaseData = input?.crunchbaseData;
    const newsArticles = input?.newsArticles;

    const combined = {
      // Always an object so downstream `companyData.website.content` is safe.
      website: {
        url: websiteContent?.url,
        content: websiteContent?.content,
        metadata: websiteContent?.metadata
      },
      crunchbase: crunchbaseData ? {
        description: crunchbaseData.properties?.short_description,
        founded: crunchbaseData.properties?.founded_on,
        employees: crunchbaseData.properties?.num_employees_enum,
        funding: crunchbaseData.properties?.total_funding_usd
      } : null,
      news: newsArticles?.map(article => ({
        title: article.title,
        description: article.description,
        url: article.url,
        publishedAt: article.publishedAt
      })) || [],
      timestamp: Date.now()
    };

    setState({ companyData: combined });

    // Count only sources that returned data. Counting the object's keys would
    // always give 4, because it includes `timestamp` and a null `crunchbase`.
    const sources = [
      Boolean(combined.website.content),
      combined.crunchbase !== null,
      combined.news.length > 0
    ].filter(Boolean).length;

    return { success: true, sources };
  }
});

format-report (Function Member)

// members/format-report/index.ts
import { createFunctionMember } from '@ensemble-edge/conductor/sdk';

export default createFunctionMember({
  /**
   * Build the final report object from shared state.
   *
   * Reads `companyData` and `analysis` from state and returns
   * `{ metadata, content, timestamp }`. Fields missing from `companyData`
   * (for example no website metadata or no news list) produce
   * `undefined` / `false` / `0` rather than a thrown TypeError.
   */
  async handler({ state }) {
    const { companyData, analysis } = state;

    // One clock read, so `metadata.generatedAt` and `timestamp` always agree.
    const now = new Date();

    const report = {
      metadata: {
        generatedAt: now.toISOString(),
        company: companyData?.website?.metadata?.title,
        version: '1.0'
      },
      content: {
        dataQuality: {
          websiteScraped: !!companyData?.website?.content,
          crunchbaseData: !!companyData?.crunchbase,
          newsArticles: companyData?.news?.length ?? 0
        },
        analysis: analysis
      },
      timestamp: now.getTime()
    };

    return report;
  }
});

Execution

Using API

# Run the ensemble over HTTP: POST a JSON body naming the ensemble and its input.
curl --request POST \
  --header "Content-Type: application/json" \
  --data '{
    "ensemble": "company-intelligence-report",
    "input": {
      "companyName": "Acme Corp",
      "domain": "acme.com"
    }
  }' \
  https://your-worker.workers.dev/api/v1/execute

Using SDK

import { Executor } from '@ensemble-edge/conductor';

// `env` and `ctx` are expected to be in scope where this snippet runs.
const executor = new Executor({ env, ctx });

// Same input as the HTTP example: company name plus its domain.
const companyInput = {
  companyName: 'Acme Corp',
  domain: 'acme.com'
};

const result = await executor.executeEnsemble('company-intelligence-report', companyInput);

const { report, quality } = result.output;

console.log(report.executiveSummary);
console.log('Quality Score:', quality.analysisScore);
console.log('Attempts:', quality.attempts);

Key Features Demonstrated

1. Parallel Data Gathering

# Excerpt: the three data-gathering members grouped under one `parallel` key.
# The per-member durations below are illustrative figures.
parallel:
  - member: scrape-website      # 2 seconds
  - member: fetch-crunchbase    # 1 second
  - member: fetch-news          # 1 second
# Total: 2 seconds (not 4 seconds)

2. State Management

# Excerpt of the ensemble-level state schema (two of its fields).
state:
  schema:
    companyData: object    # Shared across members
    analysis: object       # Built up over time

3. Caching Strategy

# Each cache block belongs to its own member; shown under the member so the
# excerpt is valid YAML (two sibling `cache:` keys would be a duplicate key).

# Website content - 1 hour
- member: scrape-website
  cache:
    ttl: 3600

# Crunchbase data - 24 hours
- member: fetch-crunchbase
  cache:
    ttl: 86400

4. Quality Scoring

# Excerpt of the member-level scoring block on synthesize-analysis
# (evaluatorConfig and criteria omitted here).
scoring:
  evaluator: validate
  thresholds:
    minimum: 0.8
  onFailure: retry
  retryLimit: 3
  requireImprovement: true

5. AI Gateway Routing

# Every Think member in this ensemble sets this routing value in its config.
config:
  routing: cloudflare-gateway  # Persistent caching + analytics

Performance Metrics

Sequential execution: ~69 seconds
  • Scrape website: 2s
  • Fetch Crunchbase: 1s
  • Fetch news: 1s
  • Combine data: 0.1s
  • Analyze business (AI): 15s
  • Analyze market (AI): 15s
  • Analyze tech (AI): 10s
  • Synthesize (AI): 20s
  • Generate summary (AI): 5s
  • Format: 0.1s
Parallel execution: ~42 seconds
  • Gather data (parallel): 2s
  • Combine: 0.1s
  • Analyze (parallel): 15s
  • Synthesize: 20s
  • Summary: 5s
  • Format: 0.1s
With caching: ~5 seconds (data + AI responses cached)

Cost Optimization

Without Optimization

  • GPT-4o calls: 4 × $0.005/1K tokens × 1K tokens = $0.02
  • Claude calls: 2 × $0.003/1K tokens × 1.5K tokens = $0.009
  • Total per execution: ~$0.03

With Optimization

  • Cache hit rate: 80% (repeated companies)
  • Effective cost: $0.006 per execution
  • Savings: 80%

Testing

import { describe, it, expect } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';

// End-to-end tests for the company-intelligence-report ensemble, run through
// TestConductor with mocked AI (and, in the first test, HTTP) responses.
describe('company-intelligence-report', () => {
  // Happy path: every Think member has a canned response and the report
  // fields in the ensemble output are checked against those responses.
  it('should generate comprehensive report', async () => {
    const conductor = await TestConductor.create({
      mocks: {
        ai: {
          // Keyed by member name, matching the Think members in the flow.
          responses: {
            'analyze-business-model': {
              revenueModel: 'SaaS subscription',
              targetMarket: 'Enterprise',
              valueProposition: 'Efficiency gains',
              competitiveAdvantage: 'AI-powered automation'
            },
            'analyze-market-position': {
              text: 'Strong market position in enterprise segment...'
            },
            'analyze-technology': {
              text: 'Modern tech stack with cloud-native architecture...'
            },
            'synthesize-analysis': {
              text: 'Comprehensive analysis report...',
              score: 0.9
            },
            'generate-executive-summary': {
              text: 'Acme Corp is a leading SaaS provider...'
            }
          }
        },
        // NOTE(review): no mock is registered for https://newsapi.org/v2/everything,
        // which the fetch-news member calls — confirm how unmocked URLs are handled.
        // NOTE(review): the website mock uses `body` while the Crunchbase mock uses
        // `data` — confirm which key(s) the mock API expects.
        http: {
          responses: {
            'https://acme.com': {
              status: 200,
              body: '<html>...</html>'
            },
            'https://api.crunchbase.com/*': {
              status: 200,
              data: { properties: { short_description: 'SaaS company' } }
            }
          }
        }
      }
    });

    const result = await conductor.executeEnsemble('company-intelligence-report', {
      companyName: 'Acme Corp',
      domain: 'acme.com'
    });

    expect(result).toBeSuccessful();
    expect(result.output.report.executiveSummary).toContain('Acme Corp');
    // 0.8 is the minimum threshold configured on synthesize-analysis.
    expect(result.output.quality.analysisScore).toBeGreaterThan(0.8);
    expect(result.output.report.businessModel.revenueModel).toBe('SaaS subscription');
  });

  // Retry path: the first synthesize-analysis response carries a score under
  // the 0.8 minimum and the second one passes, so two attempts are expected.
  // NOTE(review): this test registers no http mocks — confirm the
  // data-gathering members do not reach the network here.
  it('should retry on low quality score', async () => {
    // Counts how many times the synthesize-analysis mock has been invoked.
    let attempts = 0;

    const conductor = await TestConductor.create({
      mocks: {
        ai: {
          handler: async (memberName) => {
            if (memberName === 'synthesize-analysis') {
              attempts++;
              // NOTE(review): presumably the mock `score` is what the scoring
              // evaluator reports for this attempt — confirm.
              return {
                text: 'Analysis...',
                score: attempts === 1 ? 0.6 : 0.9  // Low then high
              };
            }
            // Every other Think member gets a generic response.
            return { text: 'Default response' };
          }
        }
      }
    });

    const result = await conductor.executeEnsemble('company-intelligence-report', {
      companyName: 'Test Corp',
      domain: 'test.com'
    });

    expect(result).toBeSuccessful();
    expect(result.output.quality.attempts).toBe(2);
  });
});

Monitoring

// Track execution metrics
const result = await executor.executeEnsemble('company-intelligence-report', input);

console.log('Execution Time:', result.executionTime, 'ms');
console.log('Cache Hits:', {
  website: result.output.sources.website,
  crunchbase: result.output.sources.crunchbase
});
console.log('Quality Score:', result.output.quality.analysisScore);
console.log('AI Attempts:', result.output.quality.attempts);

Production Deployment

# wrangler.toml
# Worker name and entry point
name = "company-intelligence"
main = "src/index.ts"

# NOTE(review): confirm that `[ai.gateway]` is a table the deployed
# Wrangler/Conductor version actually reads; the gateway id below is a placeholder.
[ai.gateway]
id = "company-intel-gateway"
cache_ttl = 3600

# KV namespace exposed to the Worker as the `CACHE` binding; replace the id
# with a real namespace id.
[[kv_namespaces]]
binding = "CACHE"
id = "your-kv-id"
Environment variables:
# Store each API key the ensemble references as a Worker secret.
for secret_name in CRUNCHBASE_API_KEY NEWS_API_KEY OPENAI_API_KEY ANTHROPIC_API_KEY; do
  npx wrangler secret put "$secret_name"
done