Skip to main content

Overview

Conductor provides production-ready observability built specifically for Cloudflare Workers. Every ensemble execution emits structured logs, metrics, and traces that integrate seamlessly with Cloudflare’s native observability platform.
Design Philosophy: Observability is not an afterthought. Conductor uses structured JSON logging that Cloudflare Workers Logs automatically captures, indexes, and makes queryable. No external services required.

Key Features

Structured JSON Logging

Automatic field extraction and indexing in Cloudflare Dashboard

Analytics Engine

Unlimited-cardinality metrics with SQL querying

Child Loggers

Scoped context for requests, executions, members

Zero Configuration

Works out-of-the-box with Cloudflare Workers

OpenTelemetry Support

Optional integration with Datadog, Honeycomb, New Relic

Performance

0.1ms per log call, async writes, no blocking

Quick Start

Basic Logging

import { createLogger } from '@ensemble-edge/conductor/observability';

const logger = createLogger({
  serviceName: 'my-service',
  environment: 'production'
});

logger.info('Ensemble execution started', {
  ensembleName: 'company-intel',
  userId: 'alice'
});

logger.error('Member execution failed', new Error('Timeout'), {
  memberName: 'fetch-data',
  attemptNumber: 3
});

With Cloudflare Workers

import { ConductorRuntime } from '@ensemble-edge/conductor';
import { createLogger } from '@ensemble-edge/conductor/observability';

export default {
  async fetch(request: Request, env: Env, ctx: ExecutionContext) {
    // Create logger with Analytics Engine binding
    const logger = createLogger(
      {
        serviceName: 'conductor',
        environment: env.ENVIRONMENT,
        enableAnalytics: true
      },
      env.ANALYTICS  // Analytics Engine binding
    );

    const conductor = new ConductorRuntime({ env, ctx, logger });

    const result = await conductor.executeEnsemble('company-intel', {
      input: { domain: 'acme.com' }
    });

    return Response.json(result);
  }
};

Logger Configuration

LoggerConfig Interface

interface LoggerConfig {
  /**
   * Minimum log level to output
   * @default LogLevel.INFO (or DEBUG if DEBUG=true)
   */
  level?: LogLevel;

  /**
   * Service name for grouping logs
   * @default 'conductor'
   */
  serviceName?: string;

  /**
   * Environment (production, staging, development)
   * @default 'production'
   */
  environment?: string;

  /**
   * Enable debug mode
   * @default false (or true if DEBUG=true env var)
   */
  debug?: boolean;

  /**
   * Enable Analytics Engine metrics
   * @default true
   */
  enableAnalytics?: boolean;

  /**
   * Base context applied to all logs
   */
  baseContext?: LogContext;
}

Log Levels

enum LogLevel {
  DEBUG = 'debug',  // Development troubleshooting
  INFO = 'info',    // Normal operational events
  WARN = 'warn',    // Concerning but non-critical
  ERROR = 'error'   // Errors needing attention
}

Debug Mode

Enable debug logs via environment variable or config:
// Via environment variable
wrangler.toml:
[env.development]
DEBUG = "true"

// Via config
const logger = createLogger({
  debug: true,
  level: LogLevel.DEBUG
});

logger.debug('Detailed execution info', {
  memberName: 'analyze',
  inputSize: 1024,
  cacheStatus: 'miss'
});

Structured Logging

Log Context

Every log accepts a LogContext object for structured data:
interface LogContext {
  // Execution identifiers
  requestId?: string;
  executionId?: string;
  ensembleName?: string;
  memberName?: string;
  stepIndex?: number;
  attemptNumber?: number;

  // Performance metrics
  durationMs?: number;

  // User tracking
  userId?: string;
  sessionId?: string;

  // Custom fields
  [key: string]: unknown;
}

Example Usage

logger.info('Member execution started', {
  ensembleName: 'company-intel',
  memberName: 'analyze',
  stepIndex: 2,
  userId: 'alice',
  model: 'claude-3-5-sonnet-20241022'
});

logger.error('API request failed', error, {
  ensembleName: 'company-intel',
  memberName: 'fetch-data',
  attemptNumber: 3,
  endpoint: 'https://api.example.com',
  statusCode: 503
});

Log Entry Format

Conductor outputs structured JSON that Cloudflare automatically indexes:
{
  "timestamp": "2025-01-15T10:30:45.123Z",
  "level": "info",
  "message": "Member execution completed",
  "context": {
    "requestId": "req_abc123",
    "executionId": "exec_xyz789",
    "ensembleName": "company-intel",
    "memberName": "analyze",
    "stepIndex": 2,
    "durationMs": 1234,
    "userId": "alice"
  }
}

Error Logging

Errors include full stack traces and ConductorError details:
try {
  await member.execute(context);
} catch (error) {
  logger.error('Member execution failed', error, {
    memberName: 'analyze',
    attemptNumber: 2
  });
}
Output:
{
  "timestamp": "2025-01-15T10:30:45.123Z",
  "level": "error",
  "message": "Member execution failed",
  "context": {
    "memberName": "analyze",
    "attemptNumber": 2
  },
  "error": {
    "name": "MemberExecutionError",
    "message": "LLM request timed out",
    "stack": "MemberExecutionError: LLM request timed out\n    at ...",
    "code": "MEMBER_EXECUTION_FAILED",
    "details": {
      "memberName": "analyze",
      "timeout": 30000
    }
  }
}

Child Loggers

Create scoped loggers with inherited context:

Request-Scoped Logger

export default {
  async fetch(request: Request, env: Env, ctx: ExecutionContext) {
    const baseLogger = createLogger({ serviceName: 'conductor' });

    // Create child logger with request context
    const requestLogger = baseLogger.child({
      requestId: crypto.randomUUID(),
      userId: request.headers.get('x-user-id')
    });

    // All logs inherit requestId and userId
    requestLogger.info('Request started');

    const conductor = new ConductorRuntime({
      env,
      ctx,
      logger: requestLogger
    });

    const result = await conductor.executeEnsemble('company-intel', {
      input: { domain: 'acme.com' }
    });

    requestLogger.info('Request completed', {
      durationMs: performance.now()
    });

    return Response.json(result);
  }
};

Execution-Scoped Logger

Conductor automatically creates child loggers for each execution:
// Inside Executor
const executionLogger = this.logger.child({
  executionId: crypto.randomUUID(),
  ensembleName: ensemble.name
});

executionLogger.info('Ensemble execution started', {
  inputKeys: Object.keys(input)
});

// Member execution inherits execution context
const memberLogger = executionLogger.child({
  memberName: member.name,
  stepIndex: index
});

memberLogger.info('Member execution started');

Nested Context

Child loggers can be nested infinitely:
const baseLogger = createLogger();

const requestLogger = baseLogger.child({
  requestId: 'req_123'
});

const executionLogger = requestLogger.child({
  executionId: 'exec_456',
  ensembleName: 'company-intel'
});

const memberLogger = executionLogger.child({
  memberName: 'analyze',
  stepIndex: 2
});

// Logs include all inherited context:
// { requestId, executionId, ensembleName, memberName, stepIndex }
memberLogger.info('Processing data');

Metrics with Analytics Engine

Recording Metrics

Analytics Engine provides unlimited-cardinality metrics with SQL querying:
const logger = createLogger(
  { enableAnalytics: true },
  env.ANALYTICS
);

// Record ensemble execution metric
logger.metric('ensemble.execution', {
  blobs: [
    ensemble.name,        // blob1: ensemble name
    'completed',          // blob2: status
    userId                // blob3: user ID
  ],
  doubles: [
    executionTime,        // double1: duration in ms
    memberCount,          // double2: number of members
    cacheHitRate          // double3: cache hit rate
  ],
  indexes: [
    'ensemble.execution'  // index1: metric name for fast filtering
  ]
});

// Record member execution metric
logger.metric('member.execution', {
  blobs: [memberName, 'success', model],
  doubles: [duration, tokenCount, cost],
  indexes: ['member.execution']
});

MetricDataPoint Interface

interface MetricDataPoint {
  /**
   * String values (up to 20)
   * Use for: names, statuses, IDs, models
   */
  blobs?: string[];

  /**
   * Numeric values (up to 20)
   * Use for: durations, counts, rates, costs
   */
  doubles?: number[];

  /**
   * Indexed strings (up to 20)
   * Use for: fast filtering in queries
   */
  indexes?: string[];
}

Querying Metrics

Use SQL to query Analytics Engine data:
-- Average execution time by ensemble
SELECT
  blob1 as ensemble_name,
  AVG(double1) as avg_duration_ms,
  COUNT(*) as execution_count
FROM ANALYTICS_DATASET
WHERE index1 = 'ensemble.execution'
  AND timestamp > NOW() - INTERVAL '7' DAY
GROUP BY blob1
ORDER BY avg_duration_ms DESC;
-- P95 latency by member
SELECT
  blob1 as member_name,
  APPROX_QUANTILES(double1, 100)[OFFSET(95)] as p95_latency
FROM ANALYTICS_DATASET
WHERE index1 = 'member.execution'
  AND timestamp > NOW() - INTERVAL '24' HOUR
GROUP BY blob1;
-- Error rate by ensemble
SELECT
  blob1 as ensemble_name,
  blob2 as status,
  COUNT(*) as count
FROM ANALYTICS_DATASET
WHERE index1 = 'ensemble.execution'
  AND timestamp > NOW() - INTERVAL '1' HOUR
GROUP BY blob1, blob2;

Real-World Metrics Example

// Track ensemble performance
logger.metric('ensemble.performance', {
  blobs: [
    ensemble.name,
    result.success ? 'success' : 'error',
    env.ENVIRONMENT
  ],
  doubles: [
    result.metrics.totalDuration,
    result.metrics.cacheHits,
    result.metrics.members.length
  ],
  indexes: ['ensemble.performance', ensemble.name]
});

// Track member quality
if (scoringResult) {
  logger.metric('member.quality', {
    blobs: [memberName, ensemble.name],
    doubles: [
      scoringResult.score,
      scoringResult.criteria?.accuracy ?? 0,
      scoringResult.criteria?.completeness ?? 0
    ],
    indexes: ['member.quality']
  });
}

// Track costs
logger.metric('llm.cost', {
  blobs: [model, provider, ensemble.name],
  doubles: [tokenCount, cost, duration],
  indexes: ['llm.cost', provider]
});

Viewing Logs

Cloudflare Dashboard

  1. Navigate to Workers & Pages > Your Worker > Logs
  2. Use Live mode for real-time tailing
  3. Use Search mode for historical queries
Filter by context fields:
ensembleName:"company-intel" AND memberName:"analyze"
level:error AND timestamp > "2025-01-15T10:00:00Z"
userId:"alice" AND durationMs > 1000

Local Development (Wrangler)

Tail logs during local development:
# Real-time log streaming
npx wrangler tail

# Filter by status
npx wrangler tail --status error

# Filter by IP
npx wrangler tail --ip 1.2.3.4

# Format JSON output
npx wrangler tail --format json

Log Retention

Cloudflare Workers Logs:
  • Free Plan: 1 day retention
  • Paid Plan: 7-90 days retention
  • Limit: 5 billion logs per day
  • Max Size: 256KB per log entry
For longer retention, export to:
  • Cloudflare Logpush → S3/R2
  • Analytics Engine (permanent storage)
  • External SIEM (via OpenTelemetry)

OpenTelemetry Integration

For teams using external observability platforms (Datadog, Honeycomb, New Relic, Langfuse), Conductor provides optional OpenTelemetry support.
Performance Impact: OpenTelemetry adds 50-100ms latency per request due to external HTTP calls. Only use when integration with external platforms is required.

Setup

import { createOpenTelemetryLogger } from '@ensemble-edge/conductor/observability';

const logger = createOpenTelemetryLogger({
  exporterUrl: 'https://api.honeycomb.io',
  serviceName: 'conductor',
  samplingRate: 0.1,  // Sample 10% of requests
  headers: {
    'x-honeycomb-team': env.HONEYCOMB_API_KEY
  },
  enableConsoleLogging: true  // Also log to Workers Logs
});

const conductor = new ConductorRuntime({ env, ctx, logger });

Supported Platforms

Honeycomb

Observability for production systems

Datadog

Full-stack monitoring and APM

New Relic

Application performance monitoring

Langfuse

LLM observability and tracing

Configuration

interface OpenTelemetryConfig {
  /**
   * OTLP exporter endpoint
   * @example 'https://api.honeycomb.io'
   */
  exporterUrl: string;

  /**
   * Service name for traces
   */
  serviceName: string;

  /**
   * Sampling rate (0.0 to 1.0)
   * @default 1.0 (sample everything)
   */
  samplingRate?: number;

  /**
   * Custom headers for authentication
   * @example { 'x-honeycomb-team': 'YOUR_API_KEY' }
   */
  headers?: Record<string, string>;

  /**
   * Enable console logging alongside OTLP export
   * @default true
   */
  enableConsoleLogging?: boolean;
}

When to Use OpenTelemetry

Use OpenTelemetry when:
  • You need integration with existing observability platforms
  • You want centralized dashboards across services
  • You require advanced APM features (flame graphs, distributed tracing)
  • Compliance requires external log storage
Use Cloudflare-native logging when:
  • You deploy exclusively on Cloudflare Workers
  • You want zero-latency observability
  • You need cost-effective high-volume logging
  • You prefer serverless-native tools

Project Configuration

You can configure observability settings project-wide using conductor.config.ts:
// conductor.config.ts
export default {
  observability: {
    /**
     * Enable structured logging
     */
    logging: true,

    /**
     * Log level (debug, info, warn, error)
     */
    logLevel: 'info',

    /**
     * Enable Analytics Engine metrics
     */
    metrics: true,

    /**
     * OpenTelemetry configuration
     */
    opentelemetry: {
      enabled: true,
      endpoint: 'https://api.honeycomb.io',
      headers: {
        'x-honeycomb-team': process.env.HONEYCOMB_API_KEY
      }
    },

    /**
     * Track token usage and costs for LLM calls
     */
    trackTokenUsage: true
  }
};
Configuration Options:
  • logging - Enable/disable structured logging (default: true)
  • logLevel - Minimum log level: debug, info, warn, or error (default: info)
  • metrics - Enable/disable Analytics Engine metrics (default: true)
  • opentelemetry.enabled - Enable OpenTelemetry integration (default: false)
  • opentelemetry.endpoint - OTLP exporter URL
  • opentelemetry.headers - Authentication headers for external platform
  • trackTokenUsage - Track LLM token usage and costs (default: true)
Platform-Specific Examples:
  • Datadog
  • Honeycomb
  • New Relic
  • Langfuse
opentelemetry: {
  enabled: true,
  endpoint: 'https://http-intake.logs.datadoghq.com',
  headers: {
    'DD-API-KEY': process.env.DATADOG_API_KEY
  }
}

Best Practices

1. Use Structured Context

  • ✅ Good
  • ❌ Bad
logger.info('Member execution completed', {
  memberName: 'analyze',
  durationMs: 1234,
  cacheHit: true,
  model: 'claude-3-5-sonnet-20241022'
});
Why: Structured context enables filtering, aggregation, and alerting in Cloudflare Dashboard.

2. Use Child Loggers for Scoped Context

  • ✅ Good
  • ❌ Bad
const requestLogger = baseLogger.child({
  requestId: 'req_123',
  userId: 'alice'
});

requestLogger.info('Processing request');
requestLogger.info('Fetching data');
requestLogger.info('Request complete');
// All logs inherit requestId and userId

3. Use Appropriate Log Levels

// DEBUG - Development troubleshooting
logger.debug('Cache lookup', { key: 'user:123', hit: false });

// INFO - Normal operational events
logger.info('Ensemble execution completed', { duration: 1234 });

// WARN - Concerning but non-critical
logger.warn('High cache miss rate', { rate: 0.85 });

// ERROR - Errors needing attention
logger.error('Member execution failed', error, { attemptNumber: 3 });

4. Include Rich Context in Errors

try {
  await member.execute(context);
} catch (error) {
  logger.error('Member execution failed', error, {
    memberName: member.name,
    memberType: member.type,
    attemptNumber: retryCount,
    inputSize: JSON.stringify(input).length,
    timeout: config.timeout,
    model: config.model,
    provider: config.provider
  });
  throw error;
}

5. Track Business Metrics

// Track revenue impact
logger.metric('business.conversion', {
  blobs: [ensemble.name, userId, tier],
  doubles: [revenue, executionCost, roi],
  indexes: ['business.conversion']
});

// Track user satisfaction
logger.metric('user.satisfaction', {
  blobs: [ensemble.name, userId],
  doubles: [qualityScore, responseTime, thumbsUp ? 1 : 0],
  indexes: ['user.satisfaction']
});

6. Use Global Logger Sparingly

// ✅ Good - Dependency injection
export default {
  async fetch(request: Request, env: Env, ctx: ExecutionContext) {
    const logger = createLogger({ serviceName: 'conductor' });
    const conductor = new ConductorRuntime({ env, ctx, logger });
    // ...
  }
};

// ⚠️ Use cautiously - Global instance
import { getGlobalLogger } from '@ensemble-edge/conductor/observability';

const logger = getGlobalLogger();
logger.info('Using global logger');
Why: Dependency injection enables better testing and context management.

Performance Characteristics

Logging Performance

  • Per Log Call: ~0.1ms overhead
  • Async Writes: Non-blocking, doesn’t delay responses
  • Batching: Automatic by Cloudflare Workers platform
  • Memory: Minimal (structured JSON only)

Cloudflare Limits

  • Max Logs: 5 billion per day (Free tier)
  • Max Log Size: 256KB per entry
  • Retention: 1-90 days (plan-dependent)
  • Real-Time Tail: 100 requests/second

Analytics Engine Limits

  • Writes: 25 writes per request
  • Data Points: Up to 60 values per write (20 blobs + 20 doubles + 20 indexes)
  • Storage: Unlimited (included in Workers Paid plan)
  • Query Performance: Sub-second for billions of rows

Integration with Conductor Runtime

Automatic Execution Logging

Conductor automatically logs all ensemble and member executions:
// Ensemble start
logger.info('Ensemble execution started', {
  executionId: 'exec_123',
  ensembleName: 'company-intel',
  inputKeys: ['domain', 'detailed']
});

// Member execution
logger.info('Member execution started', {
  executionId: 'exec_123',
  ensembleName: 'company-intel',
  memberName: 'analyze',
  stepIndex: 2,
  memberType: 'Think'
});

// Member completion
logger.info('Member execution completed', {
  executionId: 'exec_123',
  memberName: 'analyze',
  durationMs: 1234,
  cacheHit: false,
  tokenCount: 500
});

// Ensemble completion
logger.info('Ensemble execution completed', {
  executionId: 'exec_123',
  ensembleName: 'company-intel',
  totalDuration: 3456,
  memberCount: 5,
  cacheHits: 2,
  success: true
});

Execution Metrics

logger.metric('execution.summary', {
  blobs: [
    ensemble.name,
    result.success ? 'success' : 'failure',
    env.ENVIRONMENT
  ],
  doubles: [
    result.metrics.totalDuration,
    result.metrics.members.length,
    result.metrics.cacheHits / result.metrics.members.length  // Hit rate
  ],
  indexes: ['execution.summary', ensemble.name]
});

Troubleshooting

Logs Not Appearing

Check log level:
const logger = createLogger({
  level: LogLevel.DEBUG  // Ensure level matches your calls
});
Check Wrangler output:
npx wrangler tail --format pretty
Check Cloudflare Dashboard:
  • Navigate to Workers & Pages > Your Worker > Logs
  • Verify logs appear in Live mode

Analytics Not Recording

Check Analytics Engine binding:
# wrangler.toml
[[analytics_engine_datasets]]
binding = "ANALYTICS"
Verify binding in code:
const logger = createLogger(
  { enableAnalytics: true },
  env.ANALYTICS  // Must match binding name
);
Check write limits:
  • Max 25 writes per request
  • Max 60 values per write

High Cardinality Warnings

If you see cardinality warnings in Analytics Engine:
// ❌ Bad - Too many unique indexes
logger.metric('request', {
  indexes: [userId]  // Millions of unique values
});

// ✅ Good - Use blobs for high cardinality
logger.metric('request', {
  blobs: [userId],
  indexes: ['request']  // Low cardinality
});


Summary

Conductor’s observability system provides: Zero-config structured logging via Cloudflare Workers Logs ✅ Unlimited metrics via Analytics Engine with SQL querying ✅ Child loggers for scoped request/execution/member context ✅ 0.1ms overhead with async, non-blocking writes ✅ Optional OpenTelemetry for external platform integration ✅ Production-ready error tracking with full stack traces Best for:
  • Cloudflare Workers deployments
  • High-volume production workloads
  • Cost-effective observability
  • Edge-native performance
Next: Explore Error Handling to see how Conductor uses structured logging for debugging and recovery.