State Management
How agent state flows from LangGraph's server-side state machine into Angular Signals — and why the separation between server state and UI state makes your app simpler, not more complex.
LangGraph Platform owns the state. Angular owns the view. streamResource() is the read-only bridge between them. You never manually sync, serialize, or manage agent state in your Angular code.
State Lives on the Server
In a traditional Angular app, state lives in an NgRx store or a signals-based service. In a LangGraph app, the agent's state lives on the server — in LangGraph Platform's checkpoint store. Your Angular app is a stateless view layer that reads state through signals as the agent streams it back.
This inversion is intentional. Agent state can span multiple LLM calls, tool executions, and human-in-the-loop interrupts. It needs to survive browser refreshes, reconnections, and even server deployments. A server-side checkpoint store handles all of that automatically. Your Angular app just calls .submit() and reads signals.
Your Angular component calls agent.submit({ messages: [userMsg] }). No state is stored in the component.
@cacheplane/stream-resource forwards the input to FetchStreamTransport, which opens an HTTP POST and SSE connection to LangGraph Platform.
The agent runs its nodes — calling the LLM, invoking tools, checking conditions — and streams SSE events back with incremental state updates.
Incoming SSE chunks are parsed and pushed into BehaviorSubjects — one per signal type.
BehaviorSubjects are converted to Angular Signals via toSignal(). Every update triggers Angular's change detection automatically.
Components using OnPush re-render only when signal values change. No manual detectChanges(), no zone triggers, no subscriptions to manage.
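Under the hood this is ordinary Server-Sent Events plumbing. As a rough sketch (the real transport is internal to @cacheplane/stream-resource, and the exact event names and framing are its implementation details), each SSE frame carries an event name and a JSON payload:

```typescript
interface SseEvent {
  event: string;
  data: unknown;
}

// Minimal SSE frame parser, assuming each frame is blank-line separated
// and its data fits on a single "data:" line.
function parseSse(raw: string): SseEvent[] {
  return raw
    .split('\n\n')
    .filter((frame) => frame.trim().length > 0)
    .map((frame) => {
      let event = 'message'; // SSE default event name
      let data = '';
      for (const line of frame.split('\n')) {
        if (line.startsWith('event:')) event = line.slice(6).trim();
        else if (line.startsWith('data:')) data += line.slice(5).trim();
      }
      return { event, data: JSON.parse(data) };
    });
}
```

Each parsed event is then pushed into the matching subject, which is what ultimately drives the signals.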
Python State Design
On the Python side, your agent's state is a TypedDict. The fields you define here are exactly what streamResource<T>() exposes in TypeScript. Getting the Python state design right is the most important architectural decision in your agent.
The TypedDict Pattern
Every LangGraph state is a TypedDict. Fields can be plain values or annotated with reducers that control how updates are merged.
from typing_extensions import TypedDict

class ChatState(TypedDict):
    messages: list     # Will be replaced on each update
    session_id: str    # Single value, replaced on update
    turn_count: int    # Single value, replaced on update

Reducers: How State Merges
When a node returns {"messages": [new_msg]}, LangGraph doesn't replace the messages list — it calls the reducer to merge the update. This is what Annotated[list, add] means: use Python's operator.add to concatenate lists.
from typing_extensions import TypedDict, Annotated
from operator import add

class ResearchState(TypedDict):
    # Each node can append to these — they accumulate across the run
    messages: Annotated[list, add]
    sources: Annotated[list[str], add]
    findings: Annotated[list[str], add]
    # These are replaced (last write wins)
    query: str
    model: str
    confidence: float
def researcher_node(state: ResearchState) -> dict:
    result = llm.invoke(state["messages"])
    new_sources = extract_sources(result.content)
    # Returns partial state — only fields being updated.
    # LangGraph merges this into the existing state.
    return {
        "messages": [result],      # Appended via reducer
        "sources": new_sources,    # Appended via reducer
        "confidence": 0.87,        # Replaced
    }

Nodes return only the fields they change. LangGraph merges partial updates into the full state object. This is why you can have 10 nodes each updating different fields without conflicts.
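The merge semantics are easy to state precisely. This sketch reimplements them in TypeScript purely for illustration (mergeUpdate and concat are hypothetical names, not library APIs): fields with a reducer are combined, everything else is last-write-wins:

```typescript
type Reducer<T> = (current: T, update: T) => T;

// Equivalent of Python's operator.add on lists: concatenation
const concat = <T>(current: T[], update: T[]): T[] => [...current, ...update];

// Merge a partial update into the full state, applying reducers
// where they are registered and replacing fields everywhere else.
function mergeUpdate<S extends Record<string, unknown>>(
  state: S,
  update: Partial<S>,
  reducers: { [K in keyof S]?: Reducer<any> } = {},
): S {
  const next: Record<string, unknown> = { ...state };
  for (const key of Object.keys(update)) {
    const reduce = reducers[key as keyof S];
    const incoming = (update as Record<string, unknown>)[key];
    next[key] = reduce ? reduce(next[key], incoming) : incoming;
  }
  return next as S;
}
```

Running `mergeUpdate({ messages: ['hi'], confidence: 0.5 }, { messages: ['there'], confidence: 0.87 }, { messages: concat })` appends to `messages` but replaces `confidence`, mirroring the Python example above.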
TypeScript Interface Mapping
The TypeScript interface you pass to streamResource<T>() is your contract with the Python state. Every Python state field maps to a TypeScript property. The types don't need to match exactly — they just need to be compatible with the JSON that LangGraph streams back.
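For instance, a Python state like the ProjectState below could map to an interface along these lines. This is a sketch: the shape of analysis and the simplified BaseMessage type are assumptions for illustration; in practice BaseMessage comes from @langchain/core/messages.

```typescript
// Simplified stand-in for @langchain/core's BaseMessage (assumption)
type BaseMessage = { type: 'human' | 'ai' | 'tool'; content: string };

// Assumed TypeScript mirror of the Python state. Field names must match
// the keys in the JSON that LangGraph streams back.
interface ProjectState {
  messages: BaseMessage[];   // from MessagesState
  files: string[];           // accumulated via reducer
  analysis: { score?: number; issues?: string[] } | null;
  progress: number;
  plan: string[];            // accumulated via reducer
  error: string | null;
}
```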
from typing import Any
from typing_extensions import Annotated
from operator import add
from langgraph.graph import MessagesState

class ProjectState(MessagesState):
    # From MessagesState: messages: Annotated[list[AnyMessage], add_messages]
    files: Annotated[list[str], add]
    analysis: dict[str, Any] | None
    progress: int
    plan: Annotated[list[str], add]
    error: str | None

Once you define the interface, every field is accessible via agent.value():
// Full typed state snapshot
const state = agent.value(); // ProjectState

// Computed values from nested fields
const score = computed(() => agent.value().analysis?.score ?? 0);
const fileCount = computed(() => agent.value().files.length);
const isDone = computed(() => agent.value().progress === 100);

// Direct messages access (shortcut for agent.value().messages)
const messages = agent.messages(); // BaseMessage[]

State Updates During Streaming
The agent doesn't wait until it's finished to send state updates. It streams partial state updates as each node completes. Your Angular signals update incrementally throughout the run.
How Partial Updates Arrive
LangGraph streams in values mode by default — each SSE event contains the full state snapshot after a node completes. In messages mode, you get individual message tokens as they're generated.
const agent = streamResource<ProjectState>({
  assistantId: 'project_agent',
  // Default: values mode — full state after each node
  // streamMode: 'messages' — token-by-token for text fields
});

Signals Update Mid-Stream
Because every state update is a new signal value, your templates reflect the agent's progress in real time — without polling, without timers, without manual state management.
@Component({
  changeDetection: ChangeDetectionStrategy.OnPush,
  template: `
    <!-- Updates as each file is processed -->
    <p>Files processed: {{ agent.value().files.length }}</p>

    <!-- Progress bar updates as the agent emits progress updates -->
    <progress [value]="agent.value().progress" max="100" />

    <!-- Plan items appear as the agent builds the plan -->
    @for (step of agent.value().plan; track step) {
      <li>{{ step }}</li>
    }

    <!-- Messages stream token by token -->
    @for (msg of agent.messages(); track $index) {
      <app-message [message]="msg" />
    }
  `
})
export class ProjectComponent {
  readonly agent = streamResource<ProjectState>({
    assistantId: 'project_agent',
  });
}

Immutability and OnPush
Every signal update produces a new object reference. Angular's OnPush change detection compares references — when a signal emits a new value, the component re-renders. You never need to clone objects or call markForCheck() manually.
// Safe: computed() re-evaluates when agent.value() changes
const hasErrors = computed(() =>
  (agent.value().analysis?.issues ?? []).length > 0
);

// Safe: track $index works for messages because the list is append-only,
// so an item's index never changes once rendered
@for (msg of agent.messages(); track $index) {
  <app-message [message]="msg" />
}

// Safe: null-coalescing handles state fields not yet populated
const score = computed(() => agent.value().analysis?.score ?? 0);

streamResource() uses toSignal() internally with an initial value, so its signals are always defined, even before the first stream update. You never need to handle undefined for the signal itself, though individual state fields may be null until the agent populates them.
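The reference semantics are worth seeing concretely. In this sketch, nextState is a hypothetical helper, not part of the library: a shallow spread is enough for OnPush because the top-level reference changes on every update, while untouched nested fields keep their old references.

```typescript
// Produce a new state object without mutating the previous one.
// Shallow clone: unchanged nested fields keep their references,
// so reference-equality checks detect exactly what changed.
function nextState<S extends object>(prev: S, patch: Partial<S>): S {
  return { ...prev, ...patch };
}
```

A reference check like `next !== prev` is all OnPush needs to know that a re-render is due.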
Thread State vs Application State
There are two kinds of state in a LangGraph Angular app, and keeping them separate makes your code much easier to reason about.
Thread state is owned by LangGraph Platform. You read it through streamResource() signals. You never write to it directly — you only send new input via .submit().
Application state is owned by your Angular component or service. It's UI-only: sidebar visibility, active tab, selected message, form input values. It has nothing to do with the agent.
@Component({
  changeDetection: ChangeDetectionStrategy.OnPush,
})
export class ChatComponent {
  // --- Thread state (from the agent, read-only) ---
  readonly agent = streamResource<ChatState>({
    assistantId: 'chat_agent',
  });

  // Convenience aliases for thread-state signals
  readonly messages = this.agent.messages;     // Signal<BaseMessage[]>
  readonly isLoading = this.agent.isLoading;   // Signal<boolean>
  readonly interrupted = this.agent.interrupt; // Signal<Interrupt | null>

  // --- Application state (your Angular signals) ---
  readonly sidebarOpen = signal(true);
  readonly activeTab = signal<'chat' | 'history' | 'settings'>('chat');
  readonly inputText = signal('');
  readonly selectedMessageId = signal<string | null>(null);

  // --- Actions ---
  send() {
    const text = this.inputText();
    if (!text.trim()) return;
    this.agent.submit({ messages: [{ role: 'user', content: text }] });
    this.inputText.set(''); // UI state — clear the input
  }

  approve() {
    this.agent.submit(null, { resume: { approved: true } });
  }
}

A common mistake is copying agent.messages() into a local signal to "control" it. This creates stale-state bugs and defeats the purpose of the reactive signal model. Read thread state directly from agent.* signals and derive what you need with computed().
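A minimal shim, not Angular's implementation, makes the stale-copy problem concrete: a snapshot taken by calling the signal once never updates again, while a derived function re-reads the signal every time.

```typescript
// Tiny writable-signal stand-in (NOT Angular's signal API):
// calling it reads the current value; .set() replaces it.
function signalShim<T>(initial: T) {
  let value = initial;
  return Object.assign(() => value, {
    set(v: T) {
      value = v;
    },
  });
}

const messages = signalShim<string[]>([]);
const copied = messages();              // snapshot: will go stale
const count = () => messages().length;  // derived: always current

messages.set(['hello']);
```

After the update, `copied` still holds the old empty array while `count()` reflects the new value, which is exactly why derivation beats copying.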
The Checkpoint Model
LangGraph Platform persists state at every node boundary using a checkpoint store. Each checkpoint is an immutable snapshot of the full state at a point in time.
Thread: "user_123_session"
│
├── Checkpoint 1 ← After call_model: { messages: [HumanMessage, AIMessage] }
├── Checkpoint 2 ← After tool_node: { messages: [..., ToolMessage] }
├── Checkpoint 3 ← After call_model: { messages: [..., AIMessage("Here's what I found...")] }
└── (current)
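The checkpoint semantics can be sketched as an append-only log of immutable snapshots; ThreadLog, commit, and forkAt are illustrative names here, not LangGraph APIs.

```typescript
interface Checkpoint<S> {
  readonly id: number;
  readonly state: S;
}

// Append-only checkpoint log per thread. Committing never mutates;
// it returns a new log with one more immutable snapshot. Forking
// keeps the prefix up to a checkpoint so it can be replayed.
class ThreadLog<S> {
  constructor(private readonly checkpoints: ReadonlyArray<Checkpoint<S>> = []) {}

  commit(state: S): ThreadLog<S> {
    return new ThreadLog([
      ...this.checkpoints,
      { id: this.checkpoints.length + 1, state },
    ]);
  }

  current(): S | undefined {
    return this.checkpoints[this.checkpoints.length - 1]?.state;
  }

  forkAt(id: number): ThreadLog<S> {
    return new ThreadLog(this.checkpoints.filter((c) => c.id <= id));
  }
}
```

Resuming a thread is just reading the latest checkpoint; time travel is forking at an earlier one and committing new state from there.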
What This Means for Your Angular App
Resumable threads — If the user refreshes the page or closes the browser, the thread is still there. Pass the same threadId and streamResource() will restore the full conversation history automatically.
Time travel — You can fork a thread at any checkpoint and replay it with different input. This powers the time-travel debugging guides.
Interrupt persistence — When the agent raises an Interrupt, the checkpoint captures everything. The agent can be resumed hours or days later.
const agent = streamResource<ChatState>({
  assistantId: 'chat_agent',
  // Same threadId = restored conversation history
  threadId: signal(this.route.snapshot.params['threadId']),
  // New threadId auto-created for new conversations
  onThreadId: (id) => this.router.navigate(['/chat', id]),
});

// Read checkpoint history for time-travel UI
const history = agent.history(); // ThreadState[]
const branch = agent.branch();   // active branch ID

For full checkpoint and time-travel patterns, see the Persistence guide and the Time Travel guide.
Custom State Fields
messages is just one field. Real agents carry rich state: structured plans, tool results, progress indicators, metadata, and more. Every custom field you define in Python is available in your TypeScript interface.
from typing_extensions import Annotated
from operator import add
from langgraph.graph import MessagesState
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-5-mini")

class ResearchState(MessagesState):
    # Accumulating lists — each node can append
    plan: Annotated[list[str], add]
    sources: Annotated[list[str], add]
    findings: Annotated[list[str], add]
    # Scalar progress
    progress: int  # 0–100
    # Structured results
    report: dict | None  # Final report when complete
    # Agent metadata
    query: str
    model_used: str

def planner_node(state: ResearchState) -> dict:
    steps = llm.invoke([
        {"role": "system", "content": "Break this query into research steps."},
        *state["messages"],
    ])
    plan_items = steps.content.split("\n")
    return {
        "plan": plan_items,  # Appended via reducer
        "progress": 10,
        "model_used": "gpt-5-mini",
    }

def researcher_node(state: ResearchState) -> dict:
    # Research every plan step, accumulating findings locally,
    # then return a single partial update for LangGraph to merge
    findings = [search(step) for step in state["plan"]]
    return {
        "findings": findings,  # Appended via reducer
        "progress": 90,
    }

Derived State with computed()
You rarely need to consume agent.value() raw in your template. Use computed() to derive clean, focused values:
readonly agent = streamResource<ResearchState>({
  assistantId: 'research_agent',
});

// Derived signals — recalculate only when their dependencies change
readonly progress = computed(() => this.agent.value().progress);
readonly isPlanning = computed(() =>
  this.agent.value().plan.length === 0 && this.agent.isLoading()
);
readonly sourceCount = computed(() => this.agent.value().sources.length);
readonly hasReport = computed(() => this.agent.value().report !== null);
readonly reportTitle = computed(() => this.agent.value().report?.title ?? '');

What's Next
How streamResource() uses Angular Signals for zero-subscription reactive rendering.
Configure stream modes — values, messages, events — for different use cases.
Thread-based conversation persistence and checkpoint configuration.
Fork threads at any checkpoint and replay with different input.
Human-in-the-loop approval flows and how interrupt state surfaces in Angular.
Nodes, edges, and the graph execution model behind the state machine.