
Overview

An automated content moderation workflow that combines AI classification, rule-based filtering, and human review for user-generated content.

Moderation Flow

Content → AI Classification → Rule Check → Human Review (if needed) → Decision
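
Each item receives a composite risk score, and that score routes it to one of three outcomes. A minimal sketch of the routing in plain TypeScript, using the same 0.3 / 0.7 thresholds as the full workflow below (illustrative only, not part of the ensemble definition):

type Decision = 'approved' | 'rejected' | 'needs_review';

// Same thresholds as the auto-approve / auto-reject conditions in the workflow.
function routeByScore(score: number): Decision {
  if (score < 0.3) return 'approved';     // low risk: auto-approve
  if (score > 0.7) return 'rejected';     // high risk: auto-reject
  return 'needs_review';                  // borderline: human review (HITL)
}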

Complete Workflow

name: moderate-content
description: Multi-layer content moderation

flow:
  # AI Classification
  - member: classify-content
    type: Think
    config:
      provider: openai
      model: gpt-4o
      temperature: 0.2
    input:
      prompt: |
        Classify this content for moderation:

        Content: ${input.content}
        Type: ${input.contentType}

        Classify on these dimensions (0-1 scale):
        - toxic: Hate speech, harassment, threats
        - sexual: Sexual or adult content
        - violence: Violent or graphic content
        - spam: Spam or promotional content
        - misleading: Misinformation or scams

        Return JSON: { toxic, sexual, violence, spam, misleading, reasoning }

  # Rule-Based Filtering
  - member: check-banned-words
    type: Function
    input:
      content: ${input.content}
      bannedWords: ${env.BANNED_WORDS.split(',')}

  - member: check-links
    type: Function
    input:
      content: ${input.content}
      allowedDomains: ${env.ALLOWED_DOMAINS.split(',')}

  # Calculate composite score
  - member: calculate-score
    type: Transform
    input:
      data:
        ai: ${classify-content.output}
        bannedWords: ${check-banned-words.output}
        links: ${check-links.output}
      expression: |
        {
          "score": (
            (ai.toxic * 2) +
            (ai.sexual * 1.5) +
            (ai.violence * 2) +
            (ai.spam * 1) +
            (ai.misleading * 1.5) +
            (bannedWords.count * 0.3) +
            (links.suspicious * 0.5)
          ) / 10,
          "reasons": $append(
            $append(ai.reasoning, bannedWords.matches),
            links.flagged
          )
        }

  # Auto-approve if low risk
  - member: auto-approve
    condition: ${calculate-score.output.score < 0.3}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        UPDATE content
        SET status = 'approved', moderated_at = CURRENT_TIMESTAMP
        WHERE id = ?
    input:
      params: [${input.contentId}]

  # Auto-reject if high risk
  - member: auto-reject
    condition: ${calculate-score.output.score > 0.7}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        UPDATE content
        SET status = 'rejected',
            rejection_reason = ?,
            moderated_at = CURRENT_TIMESTAMP
        WHERE id = ?
    input:
      params:
        - ${JSON.stringify(calculate-score.output.reasons)}
        - ${input.contentId}

  # Human review for medium risk
  - member: request-human-review
    condition: ${calculate-score.output.score >= 0.3 && calculate-score.output.score <= 0.7}
    type: HITL
    config:
      prompt: "Review this content for moderation"
      timeout: 3600000  # 1 hour
      context:
        content: ${input.content}
        contentType: ${input.contentType}
        author: ${input.author}
        aiScore: ${calculate-score.output.score}
        reasons: ${calculate-score.output.reasons}
      fields:
        - name: approved
          type: boolean
          label: "Approve content?"
        - name: category
          type: select
          label: "Violation category (if rejected)"
          options:
            - "hate_speech"
            - "sexual_content"
            - "violence"
            - "spam"
            - "misinformation"
            - "other"
        - name: notes
          type: textarea
          label: "Moderator notes"

  # Apply human decision
  - member: apply-human-decision
    condition: ${request-human-review.success}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        UPDATE content
        SET status = ?,
            rejection_reason = ?,
            moderator_notes = ?,
            moderated_at = CURRENT_TIMESTAMP,
            moderated_by = 'human'
        WHERE id = ?
    input:
      params:
        - ${request-human-review.output.approved ? 'approved' : 'rejected'}
        - ${request-human-review.output.category}
        - ${request-human-review.output.notes}
        - ${input.contentId}

  # Log moderation decision
  - member: log-decision
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO moderation_log (
          content_id, decision, score, method, timestamp
        ) VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
    input:
      params:
        - ${input.contentId}
        - ${auto-approve.success ? 'approved' : auto-reject.success ? 'rejected' : request-human-review.output.approved ? 'approved' : 'rejected'}
        - ${calculate-score.output.score}
        - ${auto-approve.success || auto-reject.success ? 'automatic' : 'human'}

  # Notify user if rejected
  - member: notify-rejection
    condition: ${auto-reject.success || (request-human-review.success && !request-human-review.output.approved)}
    type: API
    config:
      url: "${env.NOTIFICATION_SERVICE}"
      method: POST
    input:
      body:
        userId: ${input.userId}
        type: "content_rejected"
        reason: ${auto-reject.success ? calculate-score.output.reasons : request-human-review.output.notes}

output:
  decision: ${auto-approve.success ? 'approved' : auto-reject.success ? 'rejected' : request-human-review.output.approved ? 'approved' : 'rejected'}
  method: ${auto-approve.success || auto-reject.success ? 'automatic' : 'human'}
  score: ${calculate-score.output.score}
  reasons: ${calculate-score.output.reasons}
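
The check-banned-words and check-links members are Function members whose implementations are not shown above; what matters is that their outputs match the fields the calculate-score Transform reads (bannedWords.count, bannedWords.matches, links.suspicious, links.flagged). A minimal sketch of both, assuming plain functions that receive the declared input object (the exact Function member signature depends on your Conductor setup):

// Hypothetical implementations of the two Function members referenced above.
// Only the output shapes consumed by calculate-score are load-bearing here.

interface BannedWordsResult {
  count: number;       // read as bannedWords.count
  matches: string[];   // read as bannedWords.matches
}

export function checkBannedWords(input: { content: string; bannedWords: string[] }): BannedWordsResult {
  const lower = input.content.toLowerCase();
  const matches = input.bannedWords.filter(w => w && lower.includes(w.toLowerCase()));
  return { count: matches.length, matches };
}

interface LinkCheckResult {
  suspicious: number;   // read as links.suspicious
  flagged: string[];    // read as links.flagged
}

export function checkLinks(input: { content: string; allowedDomains: string[] }): LinkCheckResult {
  const urls = input.content.match(/https?:\/\/[^\s)]+/g) ?? [];
  const flagged = urls.filter(url => {
    try {
      const host = new URL(url).hostname;
      return !input.allowedDomains.some(d => host === d || host.endsWith(`.${d}`));
    } catch {
      return true; // unparsable URL: treat as suspicious
    }
  });
  return { suspicious: flagged.length, flagged };
}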

Image Moderation

name: moderate-image
description: Image content moderation

flow:
  # Vision AI Classification
  - member: analyze-image
    type: Think
    config:
      provider: openai
      model: gpt-4o
    input:
      prompt: |
        Analyze this image for moderation.
        Classify for: adult content, violence, gore, hate symbols

        Image URL: ${input.imageUrl}

  # Check against known hashes
  - member: check-hash
    type: Data
    config:
      storage: kv
      operation: get
      binding: BLOCKED_HASHES
    input:
      key: ${calculate-hash(input.imageUrl)}

  # Moderate based on results
  - member: make-decision
    type: Function
    input:
      aiAnalysis: ${analyze-image.output}
      hashMatch: ${check-hash.output.value != null}

output:
  approved: ${make-decision.output.approved}
  confidence: ${analyze-image.output.confidence}
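
The KV lookup above keys on calculate-hash(input.imageUrl), and make-decision is a Function member; neither is defined in this example. A hedged sketch of both (hypothetical names, and a purely illustrative 0.5 threshold; production systems often use perceptual hashes rather than exact digests so near-duplicates still match):

// Hypothetical helpers for the image moderation flow above.

// calculate-hash: fetch the image and compute a SHA-256 digest of its bytes.
export async function calculateHash(imageUrl: string): Promise<string> {
  const bytes = await fetch(imageUrl).then(r => r.arrayBuffer());
  const digest = await crypto.subtle.digest('SHA-256', bytes);
  return [...new Uint8Array(digest)].map(b => b.toString(16).padStart(2, '0')).join('');
}

// make-decision: reject on a known-bad hash, otherwise defer to the AI analysis.
export function makeDecision(input: {
  aiAnalysis: { adult?: number; violence?: number; gore?: number; hateSymbols?: number };
  hashMatch: boolean;
}): { approved: boolean } {
  if (input.hashMatch) return { approved: false };
  const { adult = 0, violence = 0, gore = 0, hateSymbols = 0 } = input.aiAnalysis;
  return { approved: Math.max(adult, violence, gore, hateSymbols) < 0.5 };
}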

Bulk Moderation

name: bulk-moderate
description: Moderate multiple items

flow:
  # Get pending content
  - member: get-pending
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        SELECT * FROM content
        WHERE status = 'pending'
        ORDER BY created_at DESC
        LIMIT 100

  # Moderate each item
  - member: moderate-items
    foreach: ${get-pending.output.results}
    type: Ensemble
    input:
      ensemble: "moderate-content"
      data:
        contentId: ${item.id}
        content: ${item.text}
        contentType: ${item.type}
        userId: ${item.user_id}
        author: ${item.author}

output:
  processed: ${moderate-items.output.length}
  approved: ${moderate-items.output.filter(r => r.decision === 'approved').length}
  rejected: ${moderate-items.output.filter(r => r.decision === 'rejected').length}
  humanReview: ${moderate-items.output.filter(r => r.method === 'human').length}

Appeal Process

name: handle-appeal
description: Process content appeal

flow:
  # Get original decision
  - member: get-original
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        SELECT * FROM content
        WHERE id = ? AND status = 'rejected'
    input:
      params: [${input.contentId}]

  # Get moderation history
  - member: get-history
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        SELECT * FROM moderation_log
        WHERE content_id = ?
        ORDER BY timestamp DESC
    input:
      params: [${input.contentId}]

  # Re-evaluate with AI
  - member: re-evaluate
    type: Think
    config:
      provider: anthropic
      model: claude-3-5-sonnet-20241022
    input:
      prompt: |
        Re-evaluate this content moderation decision.

        Original content: ${get-original.output.results[0].text}
        Original decision: rejected
        Reason: ${get-original.output.results[0].rejection_reason}
        User appeal: ${input.appealReason}

        History: ${JSON.stringify(get-history.output.results)}

        Should the appeal be granted?

  # Human final review
  - member: final-review
    type: HITL
    config:
      prompt: "Review content appeal"
      context:
        content: ${get-original.output.results[0].text}
        originalDecision: ${get-original.output.results[0].rejection_reason}
        appeal: ${input.appealReason}
        aiRecommendation: ${re-evaluate.output.recommendation}
      fields:
        - name: grantAppeal
          type: boolean
          label: "Grant appeal?"
        - name: finalReason
          type: textarea
          label: "Final decision reason"

  # Update status
  - member: update-status
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        UPDATE content
        SET status = ?,
            appeal_decision = ?,
            appeal_decided_at = CURRENT_TIMESTAMP
        WHERE id = ?
    input:
      params:
        - ${final-review.output.grantAppeal ? 'approved' : 'rejected'}
        - ${final-review.output.finalReason}
        - ${input.contentId}

output:
  appealGranted: ${final-review.output.grantAppeal}
  reason: ${final-review.output.finalReason}

Proactive Monitoring

name: monitor-content-patterns
description: Detect emerging harmful patterns

flow:
  # Get recent rejections
  - member: get-rejections
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        SELECT rejection_reason, COUNT(*) as count
        FROM content
        WHERE status = 'rejected'
          AND moderated_at > datetime('now', '-24 hours')
        GROUP BY rejection_reason
        ORDER BY count DESC

  # Analyze patterns
  - member: analyze-patterns
    type: Think
    config:
      provider: openai
      model: gpt-4o
    input:
      prompt: |
        Analyze these content moderation patterns:
        ${JSON.stringify(get-rejections.output.results)}

        Identify:
        1. Emerging threats
        2. Coordinated campaigns
        3. New evasion techniques

  # Alert if significant
  - member: alert-moderators
    condition: ${analyze-patterns.output.significantPatterns}
    type: API
    config:
      url: "${env.SLACK_WEBHOOK}"
      method: POST
    input:
      body:
        text: "⚠️ New moderation pattern detected"
        blocks:
          - type: "section"
            text:
              type: "mrkdwn"
              text: ${analyze-patterns.output.summary}

output:
  patterns: ${analyze-patterns.output.patterns}

Testing

import { describe, it, expect } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';

describe('moderate-content', () => {
  it('should auto-approve safe content', async () => {
    const conductor = await TestConductor.create({
      mocks: {
        ai: {
          responses: {
            'classify-content': {
              toxic: 0.1,
              sexual: 0.05,
              violence: 0.0,
              spam: 0.0,
              misleading: 0.0,
              reasoning: 'Safe content'
            }
          }
        }
      }
    });

    const result = await conductor.executeEnsemble('moderate-content', {
      contentId: '123',
      content: 'This is a nice comment!',
      contentType: 'comment',
      userId: 'user_123'
    });

    expect(result.output.decision).toBe('approved');
    expect(result.output.method).toBe('automatic');
  });

  it('should auto-reject harmful content', async () => {
    const conductor = await TestConductor.create({
      mocks: {
        ai: {
          responses: {
            'classify-content': {
              toxic: 0.9,
              sexual: 0.0,
              violence: 0.0,
              spam: 0.0,
              misleading: 0.0,
              reasoning: 'Contains hate speech'
            }
          }
        }
      }
    });

    const result = await conductor.executeEnsemble('moderate-content', {
      contentId: '456',
      content: 'Harmful content...',
      contentType: 'comment'
    });

    expect(result.output.decision).toBe('rejected');
    expect(result.output.method).toBe('automatic');
  });

  it('should request human review for borderline content', async () => {
    const conductor = await TestConductor.create({
      mocks: {
        ai: {
          responses: {
            'classify-content': {
              toxic: 0.5,
              sexual: 0.0,
              violence: 0.0,
              spam: 0.0,
              misleading: 0.0
            }
          }
        }
      }
    });

    const execution = await conductor.executeEnsemble('moderate-content', {
      contentId: '789',
      content: 'Borderline content...',
      contentType: 'comment'
    });

    expect(execution.status).toBe('waiting_for_input');

    // Simulate human approval
    await conductor.respondToHITL(execution.id, {
      approved: true,
      notes: 'Context makes it acceptable'
    });

    const result = await conductor.waitForCompletion(execution.id);

    expect(result.output.decision).toBe('approved');
    expect(result.output.method).toBe('human');
  });
});

Best Practices

  1. Multi-layer approach - AI + rules + human review
  2. Clear thresholds - Auto-approve/reject boundaries
  3. Context matters - Consider content type and user history
  4. Fast decisions - Auto-moderate when confidence is high
  5. Human oversight - Borderline cases need review
  6. Appeal process - Allow users to appeal decisions
  7. Monitor patterns - Detect coordinated abuse
  8. Log everything - Track decisions for improvement