
Streaming

StreamResource provides token-by-token streaming from LangGraph agents via Server-Sent Events (SSE). Every update lands directly in Angular Signals — no subscriptions, no manual change detection.

Prerequisites

Make sure you've completed the Installation guide first.

How streaming works

Streaming starts on the agent side. LangGraph's astream() method controls what data is sent over the SSE connection. On the Angular side, streamResource() consumes those events and maps them to Signals.

import asyncio

from langchain_openai import ChatOpenAI
from langgraph.graph import END, START, MessagesState, StateGraph

# streaming=True lets the model emit tokens as they are generated
llm = ChatOpenAI(model="gpt-5-mini", streaming=True)


def call_model(state: MessagesState) -> dict:
    """Single graph node: send the conversation to the model and append its reply."""
    response = llm.invoke(state["messages"])
    return {"messages": [response]}


builder = StateGraph(MessagesState)
builder.add_node("call_model", call_model)
builder.add_edge(START, "call_model")
builder.add_edge("call_model", END)

graph = builder.compile()


async def main() -> None:
    inputs = {"messages": [("user", "Hello")]}

    # Stream modes control what SSE chunks contain:

    # "values": full state snapshot after each node
    async for chunk in graph.astream(inputs, stream_mode="values"):
        print(chunk)

    # "messages": individual message tokens as generated
    async for chunk in graph.astream(inputs, stream_mode="messages"):
        print(chunk)

    # "events": raw run events (on_chain_start, on_chat_model_stream, etc.),
    # exposed through astream_events() rather than a stream_mode value
    async for event in graph.astream_events(inputs, version="v2"):
        print(event["event"], event.get("data"))


# "async for" is only valid inside a coroutine, so run everything via asyncio
asyncio.run(main())
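
To make the client side of this more concrete, here is a minimal sketch of how raw SSE text could be split into named events before being mapped to Signals. It is not StreamResource's actual implementation; the `SseEvent` shape and the `parseSse` helper are hypothetical names introduced for illustration, and real LangGraph payloads carry more fields.

```typescript
// Hypothetical event shape, for illustration only.
interface SseEvent {
  event: string; // e.g. "values" or "messages"
  data: string;  // raw payload, usually JSON
}

// Parse a buffer of SSE text into complete events.
// Returns the parsed events plus any trailing partial frame to carry over
// to the next network chunk.
function parseSse(buffer: string): { events: SseEvent[]; rest: string } {
  const events: SseEvent[] = [];
  // Normalise line endings, then split on the blank line that ends each frame.
  const frames = buffer.replace(/\r\n/g, '\n').split('\n\n');
  const rest = frames.pop() ?? ''; // the last piece may be incomplete

  for (const frame of frames) {
    let event = 'message'; // SSE default event name
    const dataLines: string[] = [];
    for (const line of frame.split('\n')) {
      if (line.startsWith(':')) continue; // comment or keep-alive line
      const idx = line.indexOf(':');
      const field = idx === -1 ? line : line.slice(0, idx);
      let value = idx === -1 ? '' : line.slice(idx + 1);
      if (value.startsWith(' ')) value = value.slice(1); // strip one leading space
      if (field === 'event') event = value;
      if (field === 'data') dataLines.push(value);
    }
    if (dataLines.length > 0) {
      events.push({ event, data: dataLines.join('\n') });
    }
  }
  return { events, rest };
}
```

A caller would prepend `rest` to the next chunk read from the response body, so that frames split across network packets are still parsed whole.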

Stream status

The status() signal reports the current lifecycle state of the SSE connection:

1. idle: No active stream. The resource is ready to accept a new message.

2. loading: Tokens are arriving over the SSE connection. Signal values update in real-time with each chunk.

3. error: The connection was interrupted or the agent returned an error. Inspect error() for the full details.
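
The lifecycle above can be read as a small state machine. The sketch below is an assumption-laden illustration, not the library's internals: the `StreamEvent` names (`submit`, `complete`, `fail`) and the `nextStatus` function are hypothetical, and it assumes that a completed run returns to idle and that submitting after an error starts a fresh run.

```typescript
type StreamStatus = 'idle' | 'loading' | 'error';

// Hypothetical lifecycle events, named for illustration only.
type StreamEvent = 'submit' | 'complete' | 'fail';

function nextStatus(current: StreamStatus, event: StreamEvent): StreamStatus {
  if (event === 'submit') {
    // Assumed: a new run can start from idle or as a retry after an error.
    return 'loading';
  }
  if (event === 'complete') {
    // Assumed: a finished run returns the resource to idle.
    return current === 'loading' ? 'idle' : current;
  }
  // 'fail': only an active stream can move to the error state.
  return current === 'loading' ? 'error' : current;
}
```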

Stream modes

This guide covers three of LangGraph's stream modes: values, messages, and events. Pass streamMode to control what each SSE chunk contains.

// Receives the full agent state after every node execution.
// Best for message-based chat interfaces.
const chat = streamResource<{ messages: BaseMessage[] }>({
  assistantId: 'chat_agent',
  streamMode: 'values',
});
 
// chat.messages() always contains the complete message list
Choosing a mode

Use values for most chat UIs, since it gives you a consistent, complete state snapshot after every node. Switch to messages only when you need token-level latency or are rendering a live typing cursor.
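
The practical difference between the two modes is how the client folds each chunk into state. The following sketch illustrates that under simplifying assumptions: the `Msg` shape and both helper functions are hypothetical, and real message chunks carry more metadata than an id and content.

```typescript
// Hypothetical minimal message shape, for illustration only.
interface Msg {
  id: string;
  content: string;
}

// "values" mode: every chunk is a full snapshot, so the newest one simply wins.
function applyValues(_prev: Msg[], snapshot: Msg[]): Msg[] {
  return snapshot;
}

// "messages" mode: each chunk is a token fragment that must be merged into
// the message with the same id, or appended as the start of a new message.
function applyMessageChunk(prev: Msg[], chunk: Msg): Msg[] {
  const i = prev.findIndex((m) => m.id === chunk.id);
  if (i === -1) return [...prev, chunk];
  const merged: Msg = { id: prev[i].id, content: prev[i].content + chunk.content };
  return [...prev.slice(0, i), merged, ...prev.slice(i + 1)];
}
```

With values the client never has to reconcile fragments, which is why it is the simpler default; messages trades that simplicity for earlier visibility of each token.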

Error handling

If the SSE connection drops or the agent throws, status() transitions to 'error' and error() is populated. Use these signals to render fallback UI and retry.

import { Component, computed, ChangeDetectionStrategy } from '@angular/core';
import { streamResource } from '@cacheplane/stream-resource';
import { BaseMessage } from '@langchain/core/messages';
 
@Component({
  selector: 'app-chat',
  templateUrl: './chat.component.html',
  changeDetection: ChangeDetectionStrategy.OnPush,
})
export class ChatComponent {
  readonly chat = streamResource<{ messages: BaseMessage[] }>({
    assistantId: 'chat_agent',
  });
 
  readonly hasError = computed(() => this.chat.status() === 'error');
 
  retry() {
    // Re-stream using the same thread so context is preserved
    this.chat.submit();
  }
}
Network errors vs agent errors

error() surfaces both transport-level failures (lost connection, 5xx) and application-level errors returned by the agent graph. Check error().cause for the underlying HTTP status when you need to distinguish them.
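
As a rough sketch of that check, the helpers below read a status code off an error's cause. They assume the status is exposed as a numeric `cause.status`; that shape is an assumption made for illustration, as are the names `httpStatusOf` and `isServerFailure`, so verify the actual structure of error().cause before relying on it.

```typescript
// Assumption: the HTTP status is available as a numeric `cause.status`.
// The real shape of error().cause may differ.
function httpStatusOf(err: unknown): number | undefined {
  if (typeof err !== 'object' || err === null) return undefined;
  const cause = (err as { cause?: unknown }).cause;
  if (typeof cause !== 'object' || cause === null) return undefined;
  const status = (cause as { status?: unknown }).status;
  return typeof status === 'number' ? status : undefined;
}

// True when the failure carries a server-side HTTP status (5xx).
function isServerFailure(err: unknown): boolean {
  const status = httpStatusOf(err);
  return status !== undefined && status >= 500 && status < 600;
}
```

An error without a status is left unclassified here, since both a dropped connection and an agent error may arrive with no HTTP status attached.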

Throttle configuration

By default StreamResource emits a signal update for every incoming SSE chunk. On fast connections this can trigger hundreds of renders per second. Use the throttle option to coalesce updates.

const chat = streamResource<{ messages: BaseMessage[] }>({
  assistantId: 'chat_agent',
  // Batch incoming chunks and flush at most once every 50 ms
  throttle: 50,
});

The value is in milliseconds. A throttle of 0 (default) disables batching and passes every chunk through immediately. Good starting values:

| Use case | Recommended throttle |
|---|---|
| Token-by-token typing effect | 0 ms (disabled) |
| Standard chat bubble | 50 ms |
| Background summarisation | 150 ms |
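
To illustrate what coalescing means here, the sketch below keeps only the latest value and emits it at most once per interval. It is one plausible way to implement such an option, not a description of how StreamResource does it; `createThrottle` and its injectable `schedule` parameter are hypothetical names.

```typescript
type Schedule = (fn: () => void, ms: number) => void;

// Coalesce rapid updates: remember only the latest value and emit it at most
// once every `ms` milliseconds. With ms = 0 every value passes straight through.
function createThrottle<T>(
  emit: (value: T) => void,
  ms: number,
  schedule: Schedule = (fn, delay) => { setTimeout(fn, delay); },
): (value: T) => void {
  let pending: { value: T } | null = null;
  let timerActive = false;

  return (value: T) => {
    if (ms <= 0) {
      emit(value); // batching disabled
      return;
    }
    pending = { value }; // newer values overwrite older ones
    if (timerActive) return; // a flush is already scheduled
    timerActive = true;
    schedule(() => {
      timerActive = false;
      if (pending !== null) {
        const latest = pending.value;
        pending = null;
        emit(latest);
      }
    }, ms);
  };
}
```

Dropping intermediate values is safe only when each update supersedes the previous one, as a full message list does; a stream of independent deltas would need to be accumulated instead.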

SSE connection behavior

Each call to chat.submit() opens a new SSE connection. Connections are automatically closed when the agent run completes or when the Angular component is destroyed — you do not need to manage the lifecycle manually.
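
For readers curious how this kind of automatic cleanup can work, here is a minimal sketch built on the standard AbortController. It is an illustration under assumptions, not the library's code: the `ConnectionTracker` class and its method names are hypothetical.

```typescript
// Tracks one AbortController per run and aborts whatever is still open
// when the owner (for example a component) is destroyed.
class ConnectionTracker {
  private readonly active = new Set<AbortController>();

  // Call when a run starts; pass controller.signal to the request.
  open(): AbortController {
    const controller = new AbortController();
    this.active.add(controller);
    return controller;
  }

  // Call when the agent run completes normally.
  complete(controller: AbortController): void {
    this.active.delete(controller);
  }

  // Call from the owner's destroy hook to close anything still streaming.
  destroy(): void {
    this.active.forEach((c) => c.abort());
    this.active.clear();
  }

  get openCount(): number {
    return this.active.size;
  }
}
```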
