
Streaming

Stream responses from AI providers in real time using ActiveAgent's streaming callbacks. This guide covers handling streaming responses with callbacks that execute at different points in the streaming lifecycle.

Overview

ActiveAgent provides three streaming callbacks:

  • on_stream_open - Invoked when the stream begins
  • on_stream - Invoked for every chunk received during streaming
  • on_stream_close - Invoked when the stream completes

Callbacks automatically receive a StreamChunk object if they accept a parameter, providing access to the current message state and incremental delta content.
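To make the lifecycle concrete, here is a minimal plain-Ruby simulation of the callback ordering described above. The `StreamChunk` struct and dispatch loop are illustrative stand-ins, not ActiveAgent's internal implementation:

```ruby
# Illustrative only: a simplified stand-in for the streaming lifecycle.
StreamChunk = Struct.new(:message, :delta)

def run_stream(deltas, on_open:, on_chunk:, on_close:)
  message = +""                                        # accumulated message state
  on_open.call(StreamChunk.new(message.dup, nil))      # stream begins
  deltas.each do |delta|
    message << delta
    on_chunk.call(StreamChunk.new(message.dup, delta)) # one call per chunk
  end
  on_close.call(StreamChunk.new(message.dup, nil))     # stream completes
end

events = []
run_stream(
  ["Hel", "lo", "!"],
  on_open:  ->(chunk) { events << :open },
  on_chunk: ->(chunk) { events << chunk.delta },
  on_close: ->(chunk) { events << chunk.message }
)
events # => [:open, "Hel", "lo", "!", "Hello!"]
```

The open callback fires once before any content, the chunk callback once per delta, and the close callback once with the fully accumulated message.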

Basic Streaming

Enable streaming by passing stream: true to your agent or prompt:

```ruby
class ChatAgent < ActiveAgent::Base
  generate_with :openai, model: "gpt-4", stream: true

  on_stream :handle_chunk

  def chat(message)
    prompt(message)
  end

  private

  def handle_chunk(chunk)
    print chunk.delta if chunk.delta
  end
end
```
```ruby
# Usage
ChatAgent.chat("Hello!").generate_now
```

StreamChunk Object

Each callback receives a StreamChunk with two attributes:

  • message - The current message object from the provider (accumulated state)
  • delta - The incremental content for this specific chunk (may be nil)

The delta contains only the new content received in the current chunk, while message contains the accumulated message state. Not all chunks contain a delta—some may only contain metadata updates.

```ruby
def log_chunk(chunk)
  # Delta contains only new content for this chunk
  Rails.logger.debug("New content: #{chunk.delta}")

  # Message contains accumulated provider response
  Rails.logger.debug("Full message so far: #{chunk.message.inspect}")
end
```
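Because some chunks carry only metadata, code that accumulates content should skip nil deltas. A small sketch (using a stand-in struct, not the framework class) shows the pattern:

```ruby
# Sketch: chunks may carry a delta or only metadata (nil delta).
StreamChunk = Struct.new(:message, :delta)

chunks = [
  StreamChunk.new("Hi", "Hi"),
  StreamChunk.new("Hi", nil),        # metadata-only chunk: no new content
  StreamChunk.new("Hi there", " there")
]

# Skipping nil deltas while accumulating content:
text = chunks.filter_map(&:delta).join
text # => "Hi there"
```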

Lifecycle Callbacks

Use callbacks to handle different points in the streaming lifecycle:

on_stream_open

Invoked when streaming begins. Use this to initialize state:

```ruby
on_stream_open :start_timer

def start_timer(chunk)
  @start_time = Time.current
end
```

on_stream

Invoked for every chunk. Keep processing lightweight:

```ruby
on_stream :broadcast_chunk

def broadcast_chunk(chunk)
  return unless chunk.delta

  ActionCable.server.broadcast("chat", content: chunk.delta)
end
```

on_stream_close

Invoked when streaming completes. Use for cleanup and final processing:

```ruby
on_stream_close :save_response

def save_response(chunk)
  Message.create!(content: chunk.message)
end
```

Callback Options

Optional Parameters

Callbacks can accept a chunk parameter or omit it:

```ruby
# With chunk parameter - receives StreamChunk
on_stream :process_chunk

def process_chunk(chunk)
  print chunk.delta if chunk.delta
end

# Without chunk parameter
on_stream :increment_counter

def increment_counter
  @counter ||= 0
  @counter += 1
end
```
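One way to picture this dispatch is a check on the callback's arity: pass the chunk only when the method accepts an argument. The dispatcher below is an illustrative sketch of that behavior, not ActiveAgent's actual implementation:

```ruby
# Sketch: dispatch the chunk only to callbacks that accept a parameter.
class Agent
  attr_reader :counter, :seen

  def initialize
    @counter = 0
    @seen = []
  end

  def process_chunk(chunk)
    @seen << chunk
  end

  def increment_counter
    @counter += 1
  end

  # Pass the chunk only when the callback's arity allows it
  def dispatch(name, chunk)
    m = method(name)
    m.arity.zero? ? m.call : m.call(chunk)
  end
end

agent = Agent.new
agent.dispatch(:process_chunk, "delta-1")
agent.dispatch(:increment_counter, "delta-1")
agent.seen    # => ["delta-1"]
agent.counter # => 1
```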

Multiple Callbacks

Register multiple callbacks that execute in order:

```ruby
on_stream :log_chunk, :broadcast_chunk, :save_to_buffer
```
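A plain-Ruby sketch of a callback registry (illustrative, not framework code) shows why registration order matters: each chunk passes through every callback in the order they were registered:

```ruby
# Sketch: registered callbacks run in registration order for each chunk.
callbacks = []
register = ->(name, &blk) { callbacks << [name, blk] }

order = []
register.call(:log_chunk)       { |c| order << :log_chunk }
register.call(:broadcast_chunk) { |c| order << :broadcast_chunk }
register.call(:save_to_buffer)  { |c| order << :save_to_buffer }

# One chunk arrives; every callback fires in order:
callbacks.each { |_name, blk| blk.call("delta") }
order # => [:log_chunk, :broadcast_chunk, :save_to_buffer]
```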

Conditional Execution

Use :if and :unless to conditionally execute callbacks:

```ruby
on_stream       :debug_chunk,   if:     :debug_mode?
on_stream_close :save_response, unless: :test_environment?

def debug_mode?
  Rails.env.development?
end

def test_environment?
  Rails.env.test?
end
```

Block Syntax

Define callbacks inline with blocks:

```ruby
on_stream do |chunk|
  print chunk.delta if chunk.delta
end

on_stream_close do
  Rails.logger.info("Stream completed")
end
```

Provider Support

Streaming is supported by these providers:

  • OpenAI - All chat completion models (GPT-4, GPT-3.5 Turbo, etc.)
  • Anthropic - Claude models (Claude 3 and Claude 4 families)
  • OpenRouter - Most models with streaming capability
  • Ollama - Local models with streaming support

See the providers documentation for provider-specific configuration.
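Enabling streaming follows the same `generate_with` pattern regardless of provider. The sketch below is hedged: the model names are illustrative placeholders, and provider-specific options may differ:

```ruby
# Illustrative only: model names are placeholders, not recommendations.
class AnthropicChatAgent < ActiveAgent::Base
  generate_with :anthropic, model: "claude-sonnet", stream: true
end

class OllamaChatAgent < ActiveAgent::Base
  generate_with :ollama, model: "llama3", stream: true
end
```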

Best Practices

Guard Against Nil Deltas

Always check for nil delta values:

```ruby
def process_chunk(chunk)
  return unless chunk.delta
  print chunk.delta
end
```

Initialize State in on_stream_open

Set up buffers and counters before streaming:

```ruby
on_stream_open { @buffer = [] }
on_stream { |chunk| @buffer << chunk.delta if chunk.delta }
on_stream_close { process(@buffer.join) }
```
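The same open/append/close buffer pattern can be exercised outside the framework so the whole flow is visible. This is plain Ruby with lambdas standing in for the callbacks, not ActiveAgent objects:

```ruby
# Plain-Ruby stand-in for the buffer pattern above.
buffer = nil
open_cb  = -> { buffer = [] }                     # on_stream_open: init state
chunk_cb = ->(delta) { buffer << delta if delta } # on_stream: append, skip nils
close_cb = -> { buffer.join }                     # on_stream_close: finalize

open_cb.call
["Str", "eam", nil, "ing"].each { |d| chunk_cb.call(d) }
close_cb.call # => "Streaming"
```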

Handle Errors Gracefully

Prevent callback errors from interrupting the stream:

```ruby
def safe_broadcast(chunk)
  return unless chunk.delta
  ActionCable.server.broadcast("channel", content: chunk.delta)
rescue => e
  Rails.logger.error("Broadcast failed: #{e.message}")
end
```

Keep on_stream Callbacks Light

Heavy processing should happen in on_stream_close:

```ruby
# Good
on_stream { |chunk| @buffer << chunk.delta if chunk.delta }
on_stream_close { expensive_processing(@buffer.join) }

# Avoid - runs for every chunk!
on_stream { |chunk| expensive_processing(chunk.delta) }
```

Limitations

  • Background jobs: Streaming doesn't work with prompt_later since callbacks require an active agent instance
  • Tool execution: Providers may pause streaming to execute tools, then resume
  • Provider differences: Streaming behavior varies by provider—some send metadata chunks, others only send content
  • Structured output: Not all providers support streaming with structured output schemas

Next Steps