# Streaming
StreamResource provides token-by-token streaming from LangGraph agents via Server-Sent Events (SSE). Every update lands directly in Angular Signals — no subscriptions, no manual change detection.
Make sure you've completed the Installation guide first.
## How streaming works
Streaming starts on the agent side. LangGraph's astream() method controls what data is sent over the SSE connection. On the Angular side, streamResource() consumes those events and maps them to Signals.
```python
import asyncio

from langgraph.graph import END, START, MessagesState, StateGraph
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-5-mini", streaming=True)


def call_model(state: MessagesState) -> dict:
    response = llm.invoke(state["messages"])
    return {"messages": [response]}


builder = StateGraph(MessagesState)
builder.add_node("call_model", call_model)
builder.add_edge(START, "call_model")
builder.add_edge("call_model", END)
graph = builder.compile()


async def main() -> None:
    # Stream modes control what SSE chunks contain:

    # "values" — full state snapshot after each node
    async for chunk in graph.astream(
        {"messages": [("user", "Hello")]},
        stream_mode="values",
    ):
        print(chunk)

    # "messages" — individual message tokens as generated
    async for chunk in graph.astream(
        {"messages": [("user", "Hello")]},
        stream_mode="messages",
    ):
        print(chunk)

    # "events" — raw run events (on_chain_start, on_llm_stream, etc.)
    async for event in graph.astream_events(
        {"messages": [("user", "Hello")]},
        version="v2",
    ):
        print(event["event"], event.get("data"))


asyncio.run(main())
```

## Stream status
The `status()` signal reports the current lifecycle state of the SSE connection:

- **Idle**: No active stream. The resource is ready to accept a new message.
- **Streaming**: Tokens are arriving over the SSE connection. Signal values update in real time with each chunk.
- **Error**: The connection was interrupted or the agent returned an error. Inspect `error()` for the full details.
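The lifecycle above can be sketched as a small state machine. This is an illustrative model, not library code; the literal status values `'idle'` and `'streaming'` are assumptions (only `'error'` is confirmed by the error-handling section below):

```typescript
// Minimal sketch of the status lifecycle described above.
// Status names other than 'error' are assumed, not confirmed by the API.
type StreamStatus = 'idle' | 'streaming' | 'error';
type StreamEvent = 'submit' | 'chunk' | 'done' | 'fail';

function nextStatus(current: StreamStatus, event: StreamEvent): StreamStatus {
  switch (event) {
    case 'submit':
      return 'streaming'; // a new SSE connection opens
    case 'chunk':
      return current; // tokens arrive; lifecycle state is unchanged
    case 'done':
      return 'idle'; // run completed, ready for the next message
    case 'fail':
      return 'error'; // transport drop or agent error
  }
}
```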
## Stream modes
LangGraph supports three stream modes. Pass streamMode to control what each SSE chunk contains.
```typescript
// Receives the full agent state after every node execution.
// Best for message-based chat interfaces.
const chat = streamResource<{ messages: BaseMessage[] }>({
  assistantId: 'chat_agent',
  streamMode: 'values',
});

// chat.messages() always contains the complete message list
```

Use `values` for most chat UIs — it gives you a consistent, complete state snapshot. Switch to `messages` only when you need minimal per-token latency or are rendering a live typing cursor.
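To see why `values` is simpler to consume, compare the bookkeeping each mode pushes onto the client. A hypothetical sketch — the chunk shapes below are assumptions for illustration, not the library's wire format:

```typescript
// Hypothetical chunk shapes (assumed, not the actual SSE payloads).
type ValuesChunk = { messages: string[] }; // full snapshot per node
type TokenChunk = { messageId: string; token: string }; // one token at a time

// 'values': rendering is a plain overwrite of local state.
function applyValues(_state: string[], chunk: ValuesChunk): string[] {
  return chunk.messages;
}

// 'messages': the client must accumulate tokens per message itself.
function applyToken(
  state: Map<string, string>,
  chunk: TokenChunk,
): Map<string, string> {
  const next = new Map(state);
  next.set(chunk.messageId, (next.get(chunk.messageId) ?? '') + chunk.token);
  return next;
}
```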
## Error handling
If the SSE connection drops or the agent throws, `status()` transitions to `'error'` and `error()` is populated. Use these signals to render fallback UI and offer a retry.
```typescript
import { Component, computed, ChangeDetectionStrategy } from '@angular/core';
import { streamResource } from '@cacheplane/stream-resource';
import { BaseMessage } from '@langchain/core/messages';

@Component({
  selector: 'app-chat',
  templateUrl: './chat.component.html',
  changeDetection: ChangeDetectionStrategy.OnPush,
})
export class ChatComponent {
  readonly chat = streamResource<{ messages: BaseMessage[] }>({
    assistantId: 'chat_agent',
  });

  readonly hasError = computed(() => this.chat.status() === 'error');

  retry() {
    // Re-stream using the same thread so context is preserved
    this.chat.submit();
  }
}
```

`error()` surfaces both transport-level failures (lost connection, 5xx) and application-level errors returned by the agent graph. Check `error().cause` for the underlying HTTP status when you need to distinguish them.
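If you need to branch on the failure kind, a small guard can help. This is a hypothetical helper assuming the `error().cause` shape described above; the names are illustrative and not part of the streamResource API:

```typescript
// Assumed error shape: cause carries the underlying HTTP status for
// transport failures, per the note above. Agent-level errors typically
// arrive without one.
type StreamError = { message: string; cause?: { status?: number } };

// True when the failure carries a 5xx HTTP status (transport/server side).
function isServerError(err: StreamError): boolean {
  const status = err.cause?.status;
  return status !== undefined && status >= 500;
}
```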
## Throttle configuration
By default StreamResource emits a signal update for every incoming SSE chunk. On fast connections this can trigger hundreds of renders per second. Use the throttle option to coalesce updates.
```typescript
const chat = streamResource<{ messages: BaseMessage[] }>({
  assistantId: 'chat_agent',
  // Batch incoming chunks and flush at most once every 50 ms
  throttle: 50,
});
```

The value is in milliseconds. A throttle of 0 (the default) disables batching and passes every chunk through immediately. Good starting values:
| Use case | Recommended throttle |
|---|---|
| Token-by-token typing effect | 0 ms (disabled) |
| Standard chat bubble | 50 ms |
| Background summarisation | 150 ms |
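The coalescing behavior described above can be sketched as a self-contained function — chunks arriving inside the throttle window are merged into a single flush, and a throttle of 0 passes everything straight through. This mimics the documented semantics; it is not the library's implementation:

```typescript
// Coalesce incoming chunks: flush at most once per throttleMs window.
function makeThrottledSink<T>(
  flush: (batch: T[]) => void,
  throttleMs: number,
): (chunk: T) => void {
  let pending: T[] = [];
  let timer: ReturnType<typeof setTimeout> | null = null;

  return (chunk: T) => {
    if (throttleMs === 0) {
      flush([chunk]); // throttle 0: every chunk passes through immediately
      return;
    }
    pending.push(chunk);
    if (timer === null) {
      timer = setTimeout(() => {
        timer = null;
        const batch = pending;
        pending = [];
        flush(batch); // one signal update for the whole window
      }, throttleMs);
    }
  };
}
```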
Each call to `chat.submit()` opens a new SSE connection. Connections are closed automatically when the agent run completes or when the Angular component is destroyed — you do not need to manage the lifecycle manually.
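Internally, this cleanup contract can be modeled with an `AbortController` per connection: a new `submit()` supersedes any in-flight stream, and teardown aborts it. A minimal sketch under that assumption — the class and method names here are illustrative, not the library's internals:

```typescript
// Sketch of the assumed connection lifecycle: one AbortController per
// stream; opening a new one (or closing) aborts whatever is in flight.
class SseConnection {
  private controller: AbortController | null = null;

  open(): AbortSignal {
    this.close(); // a new submit() supersedes any in-flight stream
    this.controller = new AbortController();
    return this.controller.signal;
  }

  close(): void {
    this.controller?.abort();
    this.controller = null;
  }
}
```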
## What's next

- Resume conversations across page reloads using thread IDs and checkpointers.
- Pause agent execution mid-stream to collect human input before continuing.
- Unit-test components that use streamResource with the built-in test harness.
- Full option reference for streamResource(), including all configuration keys.