Overview
An automated content moderation workflow that combines AI classification, rule-based filtering, and human review for user-generated content.

Moderation Flow
Content → AI Classification → Rule Check → Human Review (if needed) → Decision
Complete Workflow
name: moderate-content
description: Multi-layer content moderation

flow:
  # AI Classification
  - member: classify-content
    type: Think
    config:
      provider: openai
      model: gpt-4o
      temperature: 0.2
    input:
      prompt: |
        Classify this content for moderation:

        Content: ${input.content}
        Type: ${input.contentType}

        Classify on these dimensions (0-1 scale):
        - toxic: Hate speech, harassment, threats
        - sexual: Sexual or adult content
        - violence: Violent or graphic content
        - spam: Spam or promotional content
        - misleading: Misinformation or scams

        Return JSON: { toxic, sexual, violence, spam, misleading, reasoning }

  # Rule-Based Filtering
  - member: check-banned-words
    type: Function
    input:
      content: ${input.content}
      bannedWords: ${env.BANNED_WORDS.split(',')}

  - member: check-links
    type: Function
    input:
      content: ${input.content}
      allowedDomains: ${env.ALLOWED_DOMAINS.split(',')}

  # Calculate composite score
  - member: calculate-score
    type: Transform
    input:
      data:
        ai: ${classify-content.output}
        bannedWords: ${check-banned-words.output}
        links: ${check-links.output}
      expression: |
        {
          "score": (
            (ai.toxic * 2) +
            (ai.sexual * 1.5) +
            (ai.violence * 2) +
            (ai.spam * 1) +
            (ai.misleading * 1.5) +
            (bannedWords.count * 0.3) +
            (links.suspicious * 0.5)
          ) / 2,
          "reasons": $append(
            $append(ai.reasoning, bannedWords.matches),
            links.flagged
          )
        }
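
  # Dividing by the largest weight (2) keeps the score on a 0-1 scale for a
  # single dominant dimension: toxic or violent content alone can reach 1.0.
  # The thresholds below act on this scale: < 0.3 auto-approves, > 0.7
  # auto-rejects, and anything in between goes to human review.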

  # Auto-approve if low risk
  - member: auto-approve
    condition: ${calculate-score.output.score < 0.3}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        UPDATE content
        SET status = 'approved', moderated_at = CURRENT_TIMESTAMP
        WHERE id = ?
    input:
      params: [${input.contentId}]

  # Auto-reject if high risk
  - member: auto-reject
    condition: ${calculate-score.output.score > 0.7}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        UPDATE content
        SET status = 'rejected',
            rejection_reason = ?,
            moderated_at = CURRENT_TIMESTAMP
        WHERE id = ?
    input:
      params:
        - ${JSON.stringify(calculate-score.output.reasons)}
        - ${input.contentId}

  # Human review for medium risk
  - member: request-human-review
    condition: ${calculate-score.output.score >= 0.3 && calculate-score.output.score <= 0.7}
    type: HITL
    config:
      prompt: "Review this content for moderation"
      timeout: 3600000  # 1 hour
      context:
        content: ${input.content}
        contentType: ${input.contentType}
        author: ${input.author}
        aiScore: ${calculate-score.output.score}
        reasons: ${calculate-score.output.reasons}
      fields:
        - name: approved
          type: boolean
          label: "Approve content?"
        - name: category
          type: select
          label: "Violation category (if rejected)"
          options:
            - "hate_speech"
            - "sexual_content"
            - "violence"
            - "spam"
            - "misinformation"
            - "other"
        - name: notes
          type: textarea
          label: "Moderator notes"

  # Apply human decision
  - member: apply-human-decision
    condition: ${request-human-review.success}
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        UPDATE content
        SET status = ?,
            rejection_reason = ?,
            moderator_notes = ?,
            moderated_at = CURRENT_TIMESTAMP,
            moderated_by = 'human'
        WHERE id = ?
    input:
      params:
        - ${request-human-review.output.approved ? 'approved' : 'rejected'}
        - ${request-human-review.output.category}
        - ${request-human-review.output.notes}
        - ${input.contentId}

  # Log moderation decision
  - member: log-decision
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        INSERT INTO moderation_log (
          content_id, decision, score, method, timestamp
        ) VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
    input:
      params:
        - ${input.contentId}
        - ${auto-approve.success ? 'approved' : auto-reject.success ? 'rejected' : request-human-review.output.approved ? 'approved' : 'rejected'}
        - ${calculate-score.output.score}
        - ${auto-approve.success || auto-reject.success ? 'automatic' : 'human'}

  # Notify user if rejected
  - member: notify-rejection
    condition: ${auto-reject.success || (request-human-review.success && !request-human-review.output.approved)}
    type: API
    config:
      url: "${env.NOTIFICATION_SERVICE}"
      method: POST
    input:
      body:
        userId: ${input.userId}
        type: "content_rejected"
        reason: ${auto-reject.success ? calculate-score.output.reasons : request-human-review.output.notes}

output:
  decision: ${auto-approve.success ? 'approved' : auto-reject.success ? 'rejected' : request-human-review.output.approved ? 'approved' : 'rejected'}
  method: ${auto-approve.success || auto-reject.success ? 'automatic' : 'human'}
  score: ${calculate-score.output.score}
  reasons: ${calculate-score.output.reasons}
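
The `check-banned-words` and `check-links` members are plain Function members that are referenced above but not defined in this example. The sketch below shows the logic they are assumed to implement; the return shapes (`count`/`matches` and `suspicious`/`flagged`) follow what `calculate-score` reads, while the way a Function member is registered and receives its `input` is left out and should be adapted to your project.

// Sketch of the logic behind check-banned-words and check-links.
// Return shapes mirror what calculate-score consumes.

interface BannedWordsResult {
  count: number;      // read as bannedWords.count in calculate-score
  matches: string[];  // read as bannedWords.matches
}

interface LinkCheckResult {
  suspicious: number; // number of links outside the allowlist
  flagged: string[];  // the offending URLs
}

export function checkBannedWords(content: string, bannedWords: string[]): BannedWordsResult {
  const lower = content.toLowerCase();
  const matches = bannedWords
    .map((w) => w.trim().toLowerCase())
    .filter((w) => w.length > 0 && lower.includes(w));
  return { count: matches.length, matches };
}

export function checkLinks(content: string, allowedDomains: string[]): LinkCheckResult {
  const urls = content.match(/https?:\/\/[^\s)]+/g) ?? [];
  const allowed = allowedDomains.map((d) => d.trim().toLowerCase()).filter(Boolean);
  const flagged = urls.filter((url) => {
    try {
      const host = new URL(url).hostname.toLowerCase();
      // Treat subdomains of an allowed domain as allowed.
      return !allowed.some((d) => host === d || host.endsWith(`.${d}`));
    } catch {
      return true; // unparseable URL: treat as suspicious
    }
  });
  return { suspicious: flagged.length, flagged };
}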
Image Moderation
name: moderate-image
description: Image content moderation

flow:
  # Vision AI Classification
  - member: analyze-image
    type: Think
    config:
      provider: openai
      model: gpt-4o
    input:
      prompt: |
        Analyze this image for moderation.
        Classify for: adult content, violence, gore, hate symbols

        Image URL: ${input.imageUrl}

        Return JSON: { adult, violence, gore, hateSymbols, confidence }

  # Check against known hashes
  - member: check-hash
    type: Data
    config:
      storage: kv
      operation: get
      binding: BLOCKED_HASHES
    input:
      key: ${calculate-hash(input.imageUrl)}

  # Moderate based on results
  - member: make-decision
    type: Function
    input:
      aiAnalysis: ${analyze-image.output}
      hashMatch: ${check-hash.output.value != null}

output:
  approved: ${make-decision.output.approved}
  confidence: ${analyze-image.output.confidence}
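
The `check-hash` step assumes a `calculate-hash` helper that is not defined in this example. One possible implementation, sketched below, fetches the image and computes a SHA-256 digest with Web Crypto, which is what the keys in the `BLOCKED_HASHES` KV namespace would then contain. Note that an exact byte hash only matches identical re-uploads; catching resized or re-encoded copies would require a perceptual hash instead.

// Sketch of one possible calculate-hash helper: fetch the image and compute
// a SHA-256 digest of its bytes, hex-encoded for use as a KV key.
export async function calculateHash(imageUrl: string): Promise<string> {
  const response = await fetch(imageUrl);
  if (!response.ok) {
    throw new Error(`Failed to fetch image: ${response.status}`);
  }
  const bytes = await response.arrayBuffer();
  const digest = await crypto.subtle.digest('SHA-256', bytes);
  return [...new Uint8Array(digest)]
    .map((b) => b.toString(16).padStart(2, '0'))
    .join('');
}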
Bulk Moderation
name: bulk-moderate
description: Moderate multiple items

flow:
  # Get pending content
  - member: get-pending
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        SELECT * FROM content
        WHERE status = 'pending'
        ORDER BY created_at DESC
        LIMIT 100

  # Moderate each item
  - member: moderate-items
    foreach: ${get-pending.output.results}
    type: Ensemble
    input:
      ensemble: "moderate-content"
      data:
        contentId: ${item.id}
        content: ${item.text}
        contentType: ${item.type}
        userId: ${item.user_id}
        author: ${item.author}

output:
  processed: ${moderate-items.output.length}
  approved: ${moderate-items.output.filter(r => r.decision === 'approved').length}
  rejected: ${moderate-items.output.filter(r => r.decision === 'rejected').length}
  humanReview: ${moderate-items.output.filter(r => r.method === 'human').length}
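
The `output` block above derives its counts with three separate filters over the results. If you need the same summary inside application code, a single-pass reducer does the job; the result type below is an assumption based on the `moderate-content` output block.

// Per-item result shape, mirroring the output block of moderate-content.
interface ModerationResult {
  decision: 'approved' | 'rejected';
  method: 'automatic' | 'human';
  score: number;
  reasons: string[];
}

interface BulkSummary {
  processed: number;
  approved: number;
  rejected: number;
  humanReview: number;
}

// Single pass over the results instead of three separate filters.
export function summarize(results: ModerationResult[]): BulkSummary {
  return results.reduce<BulkSummary>(
    (acc, r) => ({
      processed: acc.processed + 1,
      approved: acc.approved + (r.decision === 'approved' ? 1 : 0),
      rejected: acc.rejected + (r.decision === 'rejected' ? 1 : 0),
      humanReview: acc.humanReview + (r.method === 'human' ? 1 : 0),
    }),
    { processed: 0, approved: 0, rejected: 0, humanReview: 0 }
  );
}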
Appeal Process
name: handle-appeal
description: Process content appeal

flow:
  # Get original decision
  - member: get-original
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        SELECT * FROM content
        WHERE id = ? AND status = 'rejected'
    input:
      params: [${input.contentId}]

  # Get moderation history
  - member: get-history
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        SELECT * FROM moderation_log
        WHERE content_id = ?
        ORDER BY timestamp DESC
    input:
      params: [${input.contentId}]

  # Re-evaluate with AI
  - member: re-evaluate
    type: Think
    config:
      provider: anthropic
      model: claude-3-5-sonnet-20241022
    input:
      prompt: |
        Re-evaluate this content moderation decision.

        Original content: ${get-original.output.results[0].text}
        Original decision: rejected
        Reason: ${get-original.output.results[0].rejection_reason}

        User appeal: ${input.appealReason}

        History: ${JSON.stringify(get-history.output.results)}

        Should the appeal be granted?
        Return JSON: { recommendation, reasoning }

  # Human final review
  - member: final-review
    type: HITL
    config:
      prompt: "Review content appeal"
      context:
        content: ${get-original.output.results[0].text}
        originalDecision: ${get-original.output.results[0].rejection_reason}
        appeal: ${input.appealReason}
        aiRecommendation: ${re-evaluate.output.recommendation}
      fields:
        - name: grantAppeal
          type: boolean
          label: "Grant appeal?"
        - name: finalReason
          type: textarea
          label: "Final decision reason"

  # Update status
  - member: update-status
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        UPDATE content
        SET status = ?,
            appeal_decision = ?,
            appeal_decided_at = CURRENT_TIMESTAMP
        WHERE id = ?
    input:
      params:
        - ${final-review.output.grantAppeal ? 'approved' : 'rejected'}
        - ${final-review.output.finalReason}
        - ${input.contentId}

output:
  appealGranted: ${final-review.output.grantAppeal}
  reason: ${final-review.output.finalReason}
Proactive Monitoring
name: monitor-content-patterns
description: Detect emerging harmful patterns

flow:
  # Get recent rejections
  - member: get-rejections
    type: Data
    config:
      storage: d1
      operation: query
      query: |
        SELECT rejection_reason, COUNT(*) as count
        FROM content
        WHERE status = 'rejected'
          AND moderated_at > datetime('now', '-24 hours')
        GROUP BY rejection_reason
        ORDER BY count DESC

  # Analyze patterns
  - member: analyze-patterns
    type: Think
    config:
      provider: openai
      model: gpt-4o
    input:
      prompt: |
        Analyze these content moderation patterns:
        ${JSON.stringify(get-rejections.output.results)}

        Identify:
        1. Emerging threats
        2. Coordinated campaigns
        3. New evasion techniques

        Return JSON: { significantPatterns, summary, patterns }

  # Alert if significant
  - member: alert-moderators
    condition: ${analyze-patterns.output.significantPatterns}
    type: API
    config:
      url: "${env.SLACK_WEBHOOK}"
      method: POST
    input:
      body:
        text: "⚠️ New moderation pattern detected"
        blocks:
          - type: "section"
            text:
              type: "mrkdwn"
              text: ${analyze-patterns.output.summary}

output:
  patterns: ${analyze-patterns.output.patterns}
Testing
import { describe, it, expect } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';

describe('moderate-content', () => {
  it('should auto-approve safe content', async () => {
    const conductor = await TestConductor.create({
      mocks: {
        ai: {
          responses: {
            'classify-content': {
              toxic: 0.1,
              sexual: 0.05,
              violence: 0.0,
              spam: 0.0,
              misleading: 0.0,
              reasoning: 'Safe content'
            }
          }
        }
      }
    });

    const result = await conductor.executeEnsemble('moderate-content', {
      contentId: '123',
      content: 'This is a nice comment!',
      contentType: 'comment',
      userId: 'user_123'
    });

    expect(result.output.decision).toBe('approved');
    expect(result.output.method).toBe('automatic');
  });

  it('should auto-reject harmful content', async () => {
    const conductor = await TestConductor.create({
      mocks: {
        ai: {
          responses: {
            'classify-content': {
              toxic: 0.9,
              sexual: 0.0,
              violence: 0.0,
              spam: 0.0,
              misleading: 0.0,
              reasoning: 'Contains hate speech'
            }
          }
        }
      }
    });

    const result = await conductor.executeEnsemble('moderate-content', {
      contentId: '456',
      content: 'Harmful content...',
      contentType: 'comment'
    });

    expect(result.output.decision).toBe('rejected');
    expect(result.output.method).toBe('automatic');
  });

  it('should request human review for borderline content', async () => {
    const conductor = await TestConductor.create({
      mocks: {
        ai: {
          responses: {
            'classify-content': {
              toxic: 0.5,
              sexual: 0.0,
              violence: 0.0,
              spam: 0.0,
              misleading: 0.0
            }
          }
        }
      }
    });

    const execution = await conductor.executeEnsemble('moderate-content', {
      contentId: '789',
      content: 'Borderline content...',
      contentType: 'comment'
    });

    expect(execution.status).toBe('waiting_for_input');

    // Simulate human approval
    await conductor.respondToHITL(execution.id, {
      approved: true,
      notes: 'Context makes it acceptable'
    });

    const result = await conductor.waitForCompletion(execution.id);

    expect(result.output.decision).toBe('approved');
    expect(result.output.method).toBe('human');
  });
});
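
The rejection side of the human-review path can be covered the same way. The sketch below reuses only the harness calls already shown above (`executeEnsemble`, `respondToHITL`, `waitForCompletion`); the fields passed to `respondToHITL` mirror the HITL `fields` definition in the ensemble.

import { describe, it, expect } from 'vitest';
import { TestConductor } from '@ensemble-edge/conductor/testing';

describe('moderate-content (human rejection)', () => {
  it('should record a human rejection with a violation category', async () => {
    const conductor = await TestConductor.create({
      mocks: {
        ai: {
          responses: {
            // Borderline classification so the flow pauses for human review
            'classify-content': {
              toxic: 0.5,
              sexual: 0.0,
              violence: 0.0,
              spam: 0.0,
              misleading: 0.0,
              reasoning: 'Possible harassment'
            }
          }
        }
      }
    });

    const execution = await conductor.executeEnsemble('moderate-content', {
      contentId: '790',
      content: 'Borderline content...',
      contentType: 'comment',
      userId: 'user_456'
    });

    expect(execution.status).toBe('waiting_for_input');

    // Simulate the moderator rejecting the content
    await conductor.respondToHITL(execution.id, {
      approved: false,
      category: 'hate_speech',
      notes: 'Targets a protected group'
    });

    const result = await conductor.waitForCompletion(execution.id);

    expect(result.output.decision).toBe('rejected');
    expect(result.output.method).toBe('human');
  });
});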
Best Practices
- Multi-layer approach - AI + rules + human review
- Clear thresholds - Auto-approve/reject boundaries
- Context matters - Consider content type and user history
- Fast decisions - Auto-moderate when confidence is high
- Human oversight - Borderline cases need review
- Appeal process - Allow users to appeal decisions
- Monitor patterns - Detect coordinated abuse
- Log everything - Track decisions for improvement

