# Streaming
Stream responses from AI providers in real time using ActiveAgent's streaming callbacks. This guide covers handling streaming responses with callbacks that execute at different points in the streaming lifecycle.
## Overview
ActiveAgent provides three streaming callbacks:
- `on_stream_open` - Invoked when the stream begins
- `on_stream` - Invoked for every chunk received during streaming
- `on_stream_close` - Invoked when the stream completes
Callbacks automatically receive a `StreamChunk` object if they accept a parameter, providing access to the current message state and incremental delta content.
## Basic Streaming
Enable streaming by passing `stream: true` to your agent or prompt:
```ruby
class ChatAgent < ActiveAgent::Base
  generate_with :openai, model: "gpt-4", stream: true

  on_stream :handle_chunk

  def chat(message)
    prompt(message)
  end

  private

  def handle_chunk(chunk)
    print chunk.delta if chunk.delta
  end
end

# Usage
ChatAgent.chat("Hello!").generate_now
```
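Since streaming can be set on the agent or the prompt (per the note above), it can also be enabled for a single call; a minimal sketch, assuming `prompt` accepts the same `stream:` option as `generate_with`:

```ruby
class ChatAgent < ActiveAgent::Base
  # No stream: true at the agent level
  generate_with :openai, model: "gpt-4"

  def chat(message)
    # Assumed per-prompt option mirroring the agent-level setting
    prompt(message, stream: true)
  end
end
```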
## StreamChunk Object

Each callback receives a `StreamChunk` with two attributes:
- `message` - The current message object from the provider (accumulated state)
- `delta` - The incremental content for this specific chunk (may be `nil`)
The `delta` contains only the new content received in the current chunk, while `message` contains the accumulated message state. Not all chunks contain a delta; some may only contain metadata updates.
```ruby
def log_chunk(chunk)
  # Delta contains only new content for this chunk
  Rails.logger.debug("New content: #{chunk.delta}")

  # Message contains accumulated provider response
  Rails.logger.debug("Full message so far: #{chunk.message.inspect}")
end
```

## Lifecycle Callbacks
Use callbacks to handle different points in the streaming lifecycle:
### on_stream_open
Invoked when streaming begins. Use this to initialize state:
```ruby
on_stream_open :start_timer

def start_timer(chunk)
  @start_time = Time.current
end
```
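Instance state set here is visible to the other callbacks, so the timer above can be paired with an `on_stream_close` handler that logs the total duration; a sketch building on `@start_time` (the `log_duration` name is illustrative):

```ruby
on_stream_close :log_duration

def log_duration(chunk)
  # @start_time was set in the on_stream_open callback above
  Rails.logger.info("Stream finished in #{Time.current - @start_time} seconds")
end
```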
### on_stream

Invoked for every chunk. Keep processing lightweight:
```ruby
on_stream :broadcast_chunk

def broadcast_chunk(chunk)
  return unless chunk.delta
  ActionCable.server.broadcast("chat", content: chunk.delta)
end
```

### on_stream_close
Invoked when streaming completes. Use for cleanup and final processing:
```ruby
on_stream_close :save_response

def save_response(chunk)
  Message.create!(content: chunk.message)
end
```

## Callback Options
### Optional Parameters

Callbacks can accept a `chunk` parameter or omit it:
```ruby
# With chunk parameter - receives StreamChunk
on_stream :process_chunk

def process_chunk(chunk)
  print chunk.delta if chunk.delta
end

# Without chunk parameter
on_stream :increment_counter

def increment_counter
  @counter ||= 0
  @counter += 1
end
```

### Multiple Callbacks
Register multiple callbacks that execute in order:
```ruby
on_stream :log_chunk, :broadcast_chunk, :save_to_buffer
```
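The handlers run in registration order for each chunk. A sketch of what the three methods might look like (their bodies are illustrative, not a prescribed implementation):

```ruby
def log_chunk(chunk)
  Rails.logger.debug("Chunk: #{chunk.delta.inspect}")
end

def broadcast_chunk(chunk)
  ActionCable.server.broadcast("chat", content: chunk.delta) if chunk.delta
end

def save_to_buffer(chunk)
  (@buffer ||= []) << chunk.delta if chunk.delta
end
```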
### Conditional Execution

Use `:if` and `:unless` to conditionally execute callbacks:
```ruby
on_stream :debug_chunk, if: :debug_mode?
on_stream_close :save_response, unless: :test_environment?

def debug_mode?
  Rails.env.development?
end

def test_environment?
  Rails.env.test?
end
```

### Block Syntax
Define callbacks inline with blocks:
```ruby
on_stream do |chunk|
  print chunk.delta if chunk.delta
end

on_stream_close do
  Rails.logger.info("Stream completed")
end
```

## Provider Support
Streaming is supported by these providers:
- OpenAI - All chat completion models (GPT-4, GPT-3.5 Turbo, etc.)
- Anthropic - Claude models (Claude 3 and Claude 4 families)
- OpenRouter - Most models with streaming capability
- Ollama - Local models with streaming support
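For example, pointing a streaming agent at a local Ollama model typically only changes the `generate_with` line; a sketch, assuming a local Ollama server with the model already pulled (the model name is illustrative):

```ruby
class LocalChatAgent < ActiveAgent::Base
  generate_with :ollama, model: "llama3", stream: true

  on_stream do |chunk|
    print chunk.delta if chunk.delta
  end
end
```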
See the providers documentation for provider-specific configuration.
## Best Practices
### Guard Against Nil Deltas

Always check for `nil` delta values:
```ruby
def process_chunk(chunk)
  return unless chunk.delta
  print chunk.delta
end
```

### Initialize State in on_stream_open
Set up buffers and counters before streaming:
```ruby
on_stream_open { @buffer = [] }
on_stream { |chunk| @buffer << chunk.delta if chunk.delta }
on_stream_close { process(@buffer.join) }
```

### Handle Errors Gracefully
Prevent callback errors from interrupting the stream:
```ruby
on_stream :safe_broadcast

def safe_broadcast(chunk)
  return unless chunk.delta
  ActionCable.server.broadcast("channel", content: chunk.delta)
rescue => e
  Rails.logger.error("Broadcast failed: #{e.message}")
end
```

### Keep on_stream Callbacks Light

Heavy processing should happen in `on_stream_close`:
```ruby
# Good
on_stream { |chunk| @buffer << chunk.delta if chunk.delta }
on_stream_close { expensive_processing(@buffer.join) }

# Avoid - runs for every chunk!
on_stream { |chunk| expensive_processing(chunk.delta) }
```

## Limitations
- Background jobs: Streaming doesn't work with `prompt_later`, since callbacks require an active agent instance
- Tool execution: Providers may pause streaming to execute tools, then resume
- Provider differences: Streaming behavior varies by provider; some send metadata chunks, others only send content
- Structured output: Not all providers support streaming with structured output schemas
## Next Steps
- Error Handling - Handle failures in streaming and generation
- Callbacks - Non-streaming lifecycle events
- Instrumentation - Monitor and measure streaming performance
- Providers - Provider-specific streaming documentation