Flow Control

Flow control determines when and how agents execute. Sequential, parallel, conditional: you control all of it declaratively, with no imperative code and no manual orchestration. Just declare what you want.

Execution Modes

Sequential Execution

Agents with dependencies run sequentially:
agents:
  - name: fetch
    agent: fetcher
    inputs:
      url: ${input.url}

  - name: process
    operation: code
    config:
      code: return { processed: ${fetch.output} };

  - name: store
    operation: storage
    config:
      type: d1
      query: INSERT INTO data (json) VALUES (?)
      params: [${process.output}]
Execution order: fetch → process → store (because each depends on the previous)

Parallel Execution

Agents without dependencies run in parallel:
agents:
  # These 3 run in parallel
  - name: fetch-a
    agent: fetcher
    inputs:
      url: https://api-a.com

  - name: fetch-b
    agent: fetcher
    inputs:
      url: https://api-b.com

  - name: fetch-c
    agent: fetcher
    inputs:
      url: https://api-c.com

  # This waits for all 3
  - name: merge
    operation: code
    config:
      code: |
        return {
          a: ${fetch-a.output},
          b: ${fetch-b.output},
          c: ${fetch-c.output}
        };
Performance: with three equal-latency fetches, total time is roughly one fetch instead of three (up to 3x faster than running them sequentially).
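In plain JavaScript terms, independent agents behave like `Promise.all`: the merge step waits for all branches, and total time is the slowest branch rather than the sum. The `delay` calls below are hypothetical stand-ins for the three fetcher agents:

```javascript
// Each delayed promise stands in for one fetcher agent.
const delay = (ms, value) =>
  new Promise((resolve) => setTimeout(() => resolve(value), ms));

async function runParallel() {
  const start = Date.now();
  // All three branches start immediately and run concurrently.
  const [a, b, c] = await Promise.all([
    delay(100, "a"), // fetch-a
    delay(100, "b"), // fetch-b
    delay(100, "c"), // fetch-c
  ]);
  // merge: runs only after every branch settles; elapsed ≈ 100ms, not 300ms
  return { a, b, c, elapsed: Date.now() - start };
}
```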

Conditional Execution

Simple Conditions

Skip agents based on boolean expressions:
agents:
  - name: generate
    operation: think
    condition: ${input.use_ai}  # Only run if input.use_ai is true
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: ${input.text}

Based on Previous Outputs

agents:
  - name: validate
    agent: validator
    inputs:
      data: ${input.data}

  - name: process
    condition: ${validate.output.valid}  # Only run if valid
    operation: code
    config:
      code: return { processed: ${input.data} };

  - name: log-error
    condition: ${!validate.output.valid}  # Only run if invalid
    operation: storage
    config:
      type: d1
      query: INSERT INTO errors (data) VALUES (?)
      params: [${validate.output.errors}]

Cache-or-Generate Pattern

agents:
  - name: check-cache
    operation: storage
    config:
      type: kv
      action: get
      key: result-${input.query}

  # Only generate if cache miss
  - name: generate
    condition: ${check-cache.output.value === null}
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: ${input.query}

  # Only save if generated
  - name: save-cache
    condition: ${generate.executed}
    operation: storage
    config:
      type: kv
      action: put
      key: result-${input.query}
      value: ${generate.output}

output:
  result: ${check-cache.output.value || generate.output}
  from_cache: ${!generate.executed}
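The same cache-or-generate flow can be sketched in plain JavaScript. Here the `Map` and `generate` function are hypothetical stand-ins for the KV store and the think operation:

```javascript
// check-cache → generate (miss only) → save-cache, as one function.
async function cachedResult(query, cache, generate) {
  const hit = cache.get(query);              // check-cache
  if (hit !== undefined) {
    return { result: hit, from_cache: true };
  }
  const value = await generate(query);       // generate: cache miss only
  cache.set(query, value);                   // save-cache
  return { result: value, from_cache: false };
}
```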

Error Handling

Fallback Pattern

agents:
  - name: try-primary
    operation: http
    config:
      url: https://primary-api.com
    retry:
      maxAttempts: 2
      backoff: exponential

  - name: try-secondary
    condition: ${try-primary.failed}  # Only if primary failed
    operation: http
    config:
      url: https://backup-api.com

  - name: use-cache
    condition: ${try-primary.failed && try-secondary.failed}
    operation: storage
    config:
      type: kv
      action: get
      key: cached-data

output:
  data: ${try-primary.output || try-secondary.output || use-cache.output}
  source: ${(() => {
    if (try-primary.executed && !try-primary.failed) return 'primary';
    if (try-secondary.executed && !try-secondary.failed) return 'secondary';
    return 'cache';
  })()}

Retry Logic

agents:
  - name: flaky-operation
    operation: http
    config:
      url: https://api.example.com
    retry:
      maxAttempts: 3            # Try up to 3 times
      backoff: exponential       # Double the delay on each retry (1s, 2s)
      initialDelay: 1000         # Start with 1 second
      maxDelay: 10000            # Cap at 10 seconds
      retryOn: [500, 502, 503]   # Only retry these status codes
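Assuming the usual exponential-backoff semantics for the fields above (a sketch, not the engine source), the delay before each retry doubles from `initialDelay` up to the `maxDelay` cap:

```javascript
// With maxAttempts attempts there are (maxAttempts - 1) retries;
// retry n waits initialDelay * 2^(n-1) ms, capped at maxDelay.
function backoffDelays({ maxAttempts, initialDelay, maxDelay }) {
  const delays = [];
  for (let attempt = 1; attempt < maxAttempts; attempt++) {
    delays.push(Math.min(initialDelay * 2 ** (attempt - 1), maxDelay));
  }
  return delays;
}

// The config above (maxAttempts: 3) yields two retries: 1000ms, then 2000ms.
```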

Complex Conditions

Multiple Conditions

agents:
  - name: premium-processing
    condition: ${input.user.premium && input.data.size < 10000000}
    operation: think
    config:
      provider: openai
      model: gpt-4o
      prompt: ${input.data}

  - name: standard-processing
    condition: ${!input.user.premium || input.data.size >= 10000000}
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: ${input.data}

Dynamic Routing

agents:
  - name: classify
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: |
        Classify this request: "${input.text}"
        Return only: urgent, normal, or low

  # Route based on classification
  - name: urgent-handler
    condition: ${classify.output === 'urgent'}
    agent: urgent-processor
    inputs:
      data: ${input.text}

  - name: normal-handler
    condition: ${classify.output === 'normal'}
    agent: normal-processor
    inputs:
      data: ${input.text}

  - name: low-handler
    condition: ${classify.output === 'low'}
    agent: low-processor
    inputs:
      data: ${input.text}

output:
  result: ${urgent-handler.output || normal-handler.output || low-handler.output}
  priority: ${classify.output}
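The classify-then-route pattern reduces to a dictionary lookup. In this sketch the handler functions are hypothetical; because the conditions are mutually exclusive, exactly one handler runs. The `trim()` reflects a practical detail: model output often carries trailing whitespace:

```javascript
// classify → pick exactly one handler by classification label.
async function routeRequest(text, classify, handlers) {
  const priority = (await classify(text)).trim(); // classify
  const handler = handlers[priority];
  if (!handler) throw new Error(`unexpected classification: ${priority}`);
  return { result: await handler(text), priority };
}
```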

Loops and Iteration

Array Processing

agents:
  # Process each item in array
  - name: process-items
    operation: code
    config:
      code: |
        const items = ${input.items};
        return {
          results: items.map(item => processItem(item))
        };

Batch Processing

agents:
  - name: fetch-batch
    operation: storage
    config:
      type: d1
      query: SELECT * FROM queue WHERE processed = false LIMIT ?
      params: [${input.batch_size || 100}]

  - name: process-batch
    condition: ${fetch-batch.output.rows.length > 0}
    operation: code
    config:
      code: |
        const rows = ${fetch-batch.output.rows};
        return {
          results: rows.map(row => ({
            id: row.id,
            result: processRow(row)
          }))
        };

  - name: mark-processed
    condition: ${process-batch.executed}
    operation: storage
    config:
      type: d1
      query: |
        UPDATE queue SET processed = true
        WHERE id IN (${process-batch.output.results.map(r => r.id).join(',')})

output:
  processed_count: ${process-batch.output?.results.length || 0}
  has_more: ${fetch-batch.output.rows.length === input.batch_size}
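There is no built-in loop construct, so draining the queue means invoking the workflow repeatedly until `has_more` is false. A caller-side driver might look like this (`runWorkflow` is a hypothetical client function, not a documented API):

```javascript
// Re-run the batch workflow until it reports no remaining rows.
async function drainQueue(runWorkflow, batchSize = 100) {
  let total = 0;
  let hasMore = true;
  while (hasMore) {
    const out = await runWorkflow({ batch_size: batchSize });
    total += out.processed_count;
    hasMore = out.has_more;
  }
  return total;
}
```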

Performance Optimization

Minimize Dependencies

# Bad: Sequential (slow)
agents:
  - name: step1
    operation: http
    config:
      url: https://api-a.com

  - name: step2
    operation: http
    config:
      url: https://api-b.com
      data: ${step1.output}  # Creates dependency

# Good: Parallel (fast)
agents:
  - name: step1
    operation: http
    config:
      url: https://api-a.com

  - name: step2
    operation: http
    config:
      url: https://api-b.com

  - name: merge
    operation: code
    config:
      code: |
        return {
          a: ${step1.output},
          b: ${step2.output}
        };

Early Termination

Skip expensive operations when possible:
agents:
  # Quick validation first
  - name: quick-check
    operation: code
    config:
      code: |
        const text = "${input.text}".toLowerCase();
        return { spam: text.includes('viagra') };

  # Only do expensive AI if not obvious spam
  - name: ai-moderation
    condition: ${!quick-check.output.spam}
    operation: think
    config:
      provider: openai
      model: gpt-4o-mini
      prompt: Moderate: ${input.text}

output:
  safe: ${quick-check.output.spam ? false : ai-moderation.output.safe}
  reason: ${quick-check.output.spam ? 'spam detected' : ai-moderation.output.reason}
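The gating logic above is just a short-circuit: the cheap check decides whether the expensive call happens at all. Here `moderate` is a hypothetical stand-in for the think operation:

```javascript
// quick-check gates ai-moderation; the AI call is skipped for obvious spam.
async function moderateText(text, moderate) {
  const spam = text.toLowerCase().includes("viagra"); // quick-check
  if (spam) return { safe: false, reason: "spam detected" };
  return await moderate(text);                        // ai-moderation
}
```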

Advanced Patterns

Fan-Out/Fan-In

Process multiple items in parallel, then aggregate:
agents:
  # Fan-out: Process each URL in parallel
  - name: scrape-url-1
    condition: ${input.urls[0]}
    agent: scraper
    inputs:
      url: ${input.urls[0]}

  - name: scrape-url-2
    condition: ${input.urls[1]}
    agent: scraper
    inputs:
      url: ${input.urls[1]}

  - name: scrape-url-3
    condition: ${input.urls[2]}
    agent: scraper
    inputs:
      url: ${input.urls[2]}

  # Fan-in: Aggregate results
  - name: aggregate
    operation: code
    config:
      code: |
        return {
          results: [
            ${scrape-url-1.output},
            ${scrape-url-2.output},
            ${scrape-url-3.output}
          ].filter(r => r !== undefined)
        };

output:
  all_results: ${aggregate.output.results}

Pipeline with Validation

agents:
  - name: fetch-data
    agent: fetcher
    inputs:
      url: ${input.url}

  - name: validate-schema
    agent: validator
    inputs:
      data: ${fetch-data.output}
      schema: data-schema

  - name: transform
    condition: ${validate-schema.output.valid}
    agent: transformer
    inputs:
      data: ${fetch-data.output}

  - name: validate-business-rules
    condition: ${transform.executed}
    agent: validator
    inputs:
      data: ${transform.output}
      schema: business-rules

  - name: store
    condition: ${validate-business-rules.output.valid}
    operation: storage
    config:
      type: d1
      query: INSERT INTO data (json) VALUES (?)
      params: [${transform.output}]

output:
  success: ${store.executed}
  errors: ${validate-schema.output.errors || validate-business-rules.output.errors}

Circuit Breaker

agents:
  - name: check-circuit
    operation: storage
    config:
      type: kv
      action: get
      key: circuit-breaker-${input.service}

  - name: call-service
    condition: ${check-circuit.output.value !== 'open'}
    operation: http
    config:
      url: ${input.service_url}
    retry:
      maxAttempts: 1

  - name: update-circuit
    operation: storage
    config:
      type: kv
      action: put
      key: circuit-breaker-${input.service}
      value: ${call-service.failed ? 'open' : 'closed'}
      expirationTtl: ${call-service.failed ? 60 : null}

  - name: fallback
    condition: ${call-service.failed || check-circuit.output.value === 'open'}
    operation: storage
    config:
      type: kv
      action: get
      key: cached-${input.service}

output:
  data: ${call-service.output || fallback.output}
  circuit_open: ${check-circuit.output.value === 'open'}
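The same breaker can be sketched as an in-memory state machine. The KV TTL is modeled with an `openUntil` timestamp, and the service and cache calls are injected (all names here are illustrative, not part of the engine):

```javascript
// check-circuit → call-service (if closed) → update-circuit → fallback.
async function callWithBreaker(state, callService, getCached, now = Date.now()) {
  const open = state.openUntil !== undefined && now < state.openUntil;
  if (open) {
    // Circuit open: skip the service entirely, serve the cached value.
    return { data: await getCached(), circuit_open: true };
  }
  try {
    const data = await callService();   // call-service
    state.openUntil = undefined;        // update-circuit: closed
    return { data, circuit_open: false };
  } catch {
    state.openUntil = now + 60_000;     // update-circuit: open for 60s
    return { data: await getCached(), circuit_open: false };
  }
}
```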

Best Practices

  1. Parallel by Default - Only add dependencies when necessary
  2. Condition Expensive Operations - Skip work when possible
  3. Handle Failures - Always have fallbacks
  4. Use Retry Logic - For transient failures
  5. Early Termination - Fast checks before slow operations
  6. Cache Strategically - Cache expensive results
  7. Test Flow Paths - Test all conditional branches
  8. Monitor Performance - Track execution times

Next Steps