Skip to main content

Overview

Monitor Conductor workflows with metrics, logging, tracing, and alerting. Gain visibility into execution performance, error rates, and system health.

Cloudflare Analytics

Workers Analytics Dashboard

Access in Cloudflare Dashboard:
  • Request volume and rate
  • Response time (p50, p95, p99)
  • Error rate and types
  • CPU time usage
  • Subrequest count

Enable Analytics

Analytics are automatically enabled for all Workers deployments.

Custom Metrics

import { Conductor } from '@ensemble-edge/conductor';

export default {
  /**
   * Execute the 'my-workflow' ensemble and emit structured JSON metrics
   * (duration, success flag, error message) to the Workers log stream.
   *
   * Returns the ensemble result as JSON, or a 500 with the error message.
   */
  async fetch(request: Request, env: Env): Promise<Response> {
    const startTime = Date.now();
    const conductor = new Conductor({ env });

    try {
      const result = await conductor.executeEnsemble('my-workflow', await request.json());

      // Log success metrics as a single JSON line for easy aggregation
      const duration = Date.now() - startTime;
      console.log(JSON.stringify({
        ensemble: 'my-workflow',
        duration,
        success: true,
        timestamp: Date.now()
      }));

      return Response.json(result);
    } catch (error) {
      // Catch variables are `unknown` under strict TypeScript — narrow
      // before reading .message so non-Error throws don't crash the handler.
      const message = error instanceof Error ? error.message : String(error);

      console.error(JSON.stringify({
        ensemble: 'my-workflow',
        error: message,
        timestamp: Date.now()
      }));

      return Response.json({ error: message }, { status: 500 });
    }
  }
};

Logging

Structured Logging

- member: log-execution
  type: Function
  input:
    level: "info"
    message: "Processing order"
    context:
      orderId: ${input.orderId}
      customerId: ${input.customerId}
      items: ${input.items.length}
      timestamp: ${Date.now()}

Log Levels

// NOTE(review): `error` and `state` below are placeholders — assumed to be
// in scope at the call site; these lines only illustrate log-level usage.

// Info - routine progress events (visible at the default log level)
console.log('Processing started');

// Warn - recoverable or suspicious conditions worth investigating
console.warn('High latency detected');

// Error - failures; pass the Error object so details are captured
console.error('Payment failed', error);

// Debug - verbose diagnostic detail (typically filtered in production)
console.debug('State:', state);

Log Aggregation

- member: aggregate-logs
  type: Data
  config:
    storage: d1
    operation: query
    query: |
      INSERT INTO execution_log (
        ensemble_name, duration, status, timestamp
      ) VALUES (?, ?, ?, CURRENT_TIMESTAMP)
  input:
    params:
      - ${input.ensemble}
      - ${execution.duration}
      - ${execution.status}

Performance Monitoring

Track Execution Time

output:
  metrics:
    totalDuration: ${execution.duration}
    memberDurations:
      fetchData: ${fetch-data.duration}
      analyze: ${analyze.duration}
      save: ${save.duration}

Performance Thresholds

- member: check-performance
  type: Function
  input:
    duration: ${execution.duration}
    threshold: 2000  # 2 seconds

- member: alert-slow-execution
  condition: ${check-performance.output.slow}
  type: API
  config:
    url: "${env.ALERT_WEBHOOK}"
  input:
    body:
      alert: "Slow execution detected"
      duration: ${execution.duration}
      ensemble: ${input.ensemble}

Member-Level Metrics

flow:
  - member: expensive-operation
    type: Think

  - member: log-cost
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO cost_tracking (
          member, tokens, cost, timestamp
        ) VALUES (?, ?, ?, CURRENT_TIMESTAMP)
    input:
      params:
        - "expensive-operation"
        - ${expensive-operation.output.usage.total_tokens}
        - ${expensive-operation.output.usage.total_tokens * 0.00001}

Error Monitoring

Track Error Rates

- member: log-error
  type: Data
  config:
    storage: d1
    operation: query
    query: |
      INSERT INTO error_log (
        ensemble, member, error_type, message, timestamp
      ) VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
  input:
    params:
      - ${input.ensemble}
      - ${input.member}
      - ${error.type}
      - ${error.message}

Alert on Errors

- member: check-error-rate
  type: Data
  config:
    storage: d1
    operation: query
    query: |
      SELECT COUNT(*) as error_count
      FROM error_log
      WHERE timestamp > datetime('now', '-5 minutes')

- member: alert-high-errors
  condition: ${check-error-rate.output.results[0].error_count > 10}
  type: API
  config:
    url: "${env.PAGERDUTY_URL}"
  input:
    body:
      event_action: "trigger"
      payload:
        summary: "High error rate detected"
        severity: "critical"

Error Categories

/** Broad error buckets for error-rate dashboards and alert routing. */
enum ErrorCategory {
  VALIDATION = 'validation',
  AI_PROVIDER = 'ai_provider',
  DATABASE = 'database',
  EXTERNAL_API = 'external_api',
  TIMEOUT = 'timeout',
  UNKNOWN = 'unknown'
}

/**
 * Map an Error to a coarse category by inspecting its message.
 *
 * Matching is case-insensitive so 'Validation failed' and
 * 'validation failed' land in the same bucket. Checks run in
 * priority order; anything unmatched falls through to UNKNOWN.
 */
function categorizeError(error: Error): ErrorCategory {
  const message = error.message.toLowerCase();

  if (message.includes('validation')) return ErrorCategory.VALIDATION;
  if (message.includes('openai') || message.includes('anthropic')) {
    return ErrorCategory.AI_PROVIDER;
  }
  if (message.includes('timeout') || message.includes('timed out')) {
    return ErrorCategory.TIMEOUT;
  }
  if (message.includes('database') || message.includes('d1') || message.includes('sql')) {
    return ErrorCategory.DATABASE;
  }
  return ErrorCategory.UNKNOWN;
}

Health Checks

Basic Health Check

export default {
  /**
   * Serve /health with component-level status for D1 and KV.
   * Responds 200 when every component is reachable, 503 otherwise.
   */
  async fetch(request: Request, env: Env): Promise<Response> {
    // Compare the parsed pathname, not the raw URL: endsWith('/health')
    // would miss requests carrying a query string (e.g. /health?verbose=1).
    if (new URL(request.url).pathname === '/health') {
      // Explicit type so the component fields added below are legal
      // under strict mode (the inferred literal type would lack them).
      const health: {
        status: 'healthy' | 'unhealthy';
        timestamp: number;
        version: string;
        database?: 'ok' | 'error';
        kv?: 'ok' | 'error';
      } = {
        status: 'healthy',
        timestamp: Date.now(),
        version: env.VERSION
      };

      try {
        // Cheapest possible D1 round-trip
        await env.DB.prepare('SELECT 1').run();
        health.database = 'ok';
      } catch {
        health.database = 'error';
        health.status = 'unhealthy';
      }

      try {
        // A read verifies the KV binding is live; a missing key still succeeds
        await env.CACHE.get('health-check');
        health.kv = 'ok';
      } catch {
        health.kv = 'error';
        health.status = 'unhealthy';
      }

      return Response.json(health, {
        status: health.status === 'healthy' ? 200 : 503
      });
    }

    // ... handle other requests; always return a Response so the
    // Promise<Response> contract holds on every path
    return new Response('Not Found', { status: 404 });
  }
};

Detailed Health Check

name: health-check
description: Comprehensive system health check

flow:
  parallel:
    - member: check-database
      type: Data
      config:
        storage: d1
        operation: query
        query: "SELECT 1"
      continue_on_error: true

    - member: check-kv
      type: Data
      config:
        storage: kv
        operation: get
        binding: CACHE
      input:
        key: "health-check"
      continue_on_error: true

    - member: check-ai-provider
      type: Think
      config:
        provider: openai
        model: gpt-4o-mini
      input:
        prompt: "test"
      continue_on_error: true

output:
  status: ${check-database.success && check-kv.success && check-ai-provider.success ? 'healthy' : 'degraded'}
  components:
    database: ${check-database.success ? 'ok' : 'error'}
    kv: ${check-kv.success ? 'ok' : 'error'}
    ai: ${check-ai-provider.success ? 'ok' : 'error'}
  timestamp: ${Date.now()}

Alerting

Slack Alerts

- member: alert-slack
  type: API
  config:
    url: "${env.SLACK_WEBHOOK_URL}"
    method: POST
  input:
    body:
      text: ${input.message}
      blocks:
        - type: "section"
          text:
            type: "mrkdwn"
            text: |
              *Alert: ${input.title}*
              ${input.details}
        - type: "context"
          elements:
            - type: "mrkdwn"
              text: "Severity: ${input.severity} | Time: ${new Date().toISOString()}"

PagerDuty Integration

- member: page-oncall
  type: API
  config:
    url: "https://events.pagerduty.com/v2/enqueue"
    method: POST
    headers:
      Content-Type: "application/json"
  input:
    body:
      routing_key: ${env.PAGERDUTY_KEY}
      event_action: "trigger"
      payload:
        summary: ${input.summary}
        severity: ${input.severity}
        source: "conductor"
        custom_details: ${input.details}

Email Alerts

- member: alert-email
  type: API
  config:
    url: "${env.EMAIL_SERVICE_URL}"
    method: POST
  input:
    body:
      to: ${env.ALERT_EMAIL}
      subject: "[ALERT] ${input.title}"
      body: ${input.details}
      priority: "high"

Tracing

Trace Workflow Execution

/**
 * Execute an ensemble while emitting structured trace events
 * (execution_start / execution_complete / execution_error) that share a
 * single trace_id, so the log lines for one execution can be correlated
 * downstream.
 *
 * Rethrows any execution error unchanged after logging it, preserving
 * the original stack for the caller.
 */
async function executeWithTracing(
  conductor: Conductor,
  ensemble: string,
  input: any
) {
  const traceId = crypto.randomUUID();

  console.log(JSON.stringify({
    trace_id: traceId,
    event: 'execution_start',
    ensemble,
    timestamp: Date.now()
  }));

  try {
    const result = await conductor.executeEnsemble(ensemble, input);

    console.log(JSON.stringify({
      trace_id: traceId,
      event: 'execution_complete',
      duration: result.duration,
      timestamp: Date.now()
    }));

    return result;
  } catch (error) {
    // Catch variables are `unknown` under strict TypeScript — narrow
    // before reading .message so non-Error throws are still logged.
    const message = error instanceof Error ? error.message : String(error);

    console.error(JSON.stringify({
      trace_id: traceId,
      event: 'execution_error',
      error: message,
      timestamp: Date.now()
    }));

    throw error;
  }
}

Dashboards

Example Dashboard Queries

-- Error rate by ensemble
SELECT
  ensemble_name,
  COUNT(CASE WHEN status = 'error' THEN 1 END) * 100.0 / COUNT(*) as error_rate
FROM execution_log
WHERE timestamp > datetime('now', '-1 hour')
GROUP BY ensemble_name;

-- Average execution time
SELECT
  ensemble_name,
  AVG(duration) as avg_duration,
  MAX(duration) as max_duration,
  MIN(duration) as min_duration
FROM execution_log
WHERE timestamp > datetime('now', '-1 hour')
GROUP BY ensemble_name;

-- Cost tracking
SELECT
  DATE(timestamp) as date,
  SUM(cost) as total_cost
FROM cost_tracking
GROUP BY DATE(timestamp)
ORDER BY date DESC
LIMIT 30;

Best Practices

  1. Log structured data - JSON format for easy parsing
  2. Track key metrics - Duration, error rate, cost
  3. Set up alerts - Proactive error detection
  4. Monitor costs - Track AI usage and spending
  5. Health checks - Regular system health verification
  6. Trace requests - Unique IDs for debugging
  7. Aggregate logs - Central logging for analysis
  8. Dashboard visibility - Real-time metrics
  9. Alert fatigue - Tune thresholds appropriately
  10. Retain logs - Keep history for trend analysis