DocsConceptsState Management

State Management

How agent state flows from LangGraph's server-side state machine into Angular Signals — and why the separation between server state and UI state makes your app simpler, not more complex.

Mental model

LangGraph Platform owns the state. Angular owns the view. streamResource() is the read-only bridge between them. You never manually sync, serialize, or manage agent state in your Angular code.

State Lives on the Server

In a traditional Angular app, state lives in an NgRx store or a signals-based service. In a LangGraph app, the agent's state lives on the server — in LangGraph Platform's checkpoint store. Your Angular app is a stateless view layer that reads state through signals as the agent streams it back.

This inversion is intentional. Agent state can span multiple LLM calls, tool executions, and human-in-the-loop interrupts. It needs to survive browser refreshes, reconnections, and even server deployments. A server-side checkpoint store handles all of that automatically. Your Angular app just calls .submit() and reads signals.

1
User submits input

Your Angular component calls agent.submit({ messages: [userMsg] }). No state is stored in the component.

2
streamResource() sends the request

@cacheplane/stream-resource forwards the input to FetchStreamTransport, which opens an HTTP POST and SSE connection to LangGraph Platform.

3
LangGraph Platform executes the graph

The agent runs its nodes — calling the LLM, invoking tools, checking conditions — and streams SSE events back with incremental state updates.

4
FetchStreamTransport parses the stream

Incoming SSE chunks are parsed and pushed into BehaviorSubjects — one per signal type.

5
streamResource() converts to Signals

BehaviorSubjects are converted to Angular Signals via toSignal(). Every update triggers Angular's change detection automatically.

6
Templates re-render

Components using OnPush re-render only when signal values change. No manual detectChanges(), no zone triggers, no subscriptions to manage.

Python State Design

On the Python side, your agent's state is a TypedDict. The fields you define here are exactly what streamResource<T>() exposes in TypeScript. Getting the Python state design right is the most important architectural decision in your agent.

The TypedDict Pattern

Every LangGraph state is a TypedDict. Fields can be plain values or annotated with reducers that control how updates are merged.

from typing_extensions import TypedDict
 
class ChatState(TypedDict):
    messages: list        # Will be replaced on each update
    session_id: str       # Single value, replaced on update
    turn_count: int       # Single value, replaced on update

Reducers: How State Merges

When a node returns {"messages": [new_msg]}, LangGraph doesn't replace the messages list — it calls the reducer to merge the update. This is what Annotated[list, add] means: use Python's operator.add to concatenate lists.

from typing_extensions import TypedDict, Annotated
from operator import add
 
class ResearchState(TypedDict):
    # Each node can append to these — they accumulate across the run
    messages: Annotated[list, add]
    sources: Annotated[list[str], add]
    findings: Annotated[list[str], add]
 
    # These are replaced (last write wins)
    query: str
    model: str
    confidence: float
 
def researcher_node(state: ResearchState) -> dict:
    result = llm.invoke(state["messages"])
    new_sources = extract_sources(result.content)
 
    # Returns partial state — only fields being updated
    # LangGraph merges this into the existing state
    return {
        "messages": [result],           # Appended via reducer
        "sources": new_sources,          # Appended via reducer
        "confidence": 0.87,              # Replaced
    }
Partial returns are the norm

Nodes return only the fields they change. LangGraph merges partial updates into the full state object. This is why you can have 10 nodes each updating different fields without conflicts.

TypeScript Interface Mapping

The TypeScript interface you pass to streamResource<T>() is your contract with the Python state. Every Python state field maps to a TypeScript property. The types don't need to match exactly — they just need to be compatible with the JSON that LangGraph streams back.

from typing_extensions import TypedDict, Annotated
from operator import add
from langgraph.graph import MessagesState
 
class ProjectState(MessagesState):
    # From MessagesState: messages: Annotated[list[AnyMessage], add_messages]
    files: Annotated[list[str], add]
    analysis: dict[str, any] | None
    progress: int
    plan: Annotated[list[str], add]
    error: str | None

Once you define the interface, every field is accessible via agent.value():

// Full typed state object
const state = agent.value();         // Signal<ProjectState>
 
// Computed values from nested fields
const score = computed(() => agent.value().analysis?.score ?? 0);
const fileCount = computed(() => agent.value().files.length);
const isDone = computed(() => agent.value().progress === 100);
 
// Direct messages access (shortcut for agent.value().messages)
const messages = agent.messages();   // Signal<BaseMessage[]>

State Updates During Streaming

The agent doesn't wait until it's finished to send state updates. It streams partial state updates as each node completes. Your Angular signals update incrementally throughout the run.

How Partial Updates Arrive

LangGraph streams in values mode by default — each SSE event contains the full state snapshot after a node completes. In messages mode, you get individual message tokens as they're generated.

const agent = streamResource<ProjectState>({
  assistantId: 'project_agent',
  // Default: values mode — full state after each node
  // streamMode: 'messages' — token-by-token for text fields
});

Signals Update Mid-Stream

Because every state update is a new signal value, your templates reflect the agent's progress in real time — without polling, without timers, without manual state management.

@Component({
  changeDetection: ChangeDetectionStrategy.OnPush,
  template: `
    <!-- Updates as each file is processed -->
    <p>Files processed: {{ agent.value().files.length }}</p>
 
    <!-- Progress bar updates as agent emits progress updates -->
    <progress [value]="agent.value().progress" max="100" />
 
    <!-- Plan items appear as the agent builds the plan -->
    @for (step of agent.value().plan; track step) {
      <li>{{ step }}</li>
    }
 
    <!-- Messages stream token by token -->
    @for (msg of agent.messages(); track $index) {
      <app-message [message]="msg" />
    }
  `
})
export class ProjectComponent {
  readonly agent = streamResource<ProjectState>({
    assistantId: 'project_agent',
  });
}

Immutability and OnPush

Every signal update produces a new object reference. Angular's OnPush change detection compares references — when a signal emits a new value, the component re-renders. You never need to clone objects or call markForCheck() manually.

// Safe: computed() re-evaluates when agent.value() changes
const hasErrors = computed(() =>
  (agent.value().analysis?.issues ?? []).length > 0
);
 
// Safe: @for tracks by identity, not index, for stable DOM
// track $index is fine for messages since they always append
@for (msg of agent.messages(); track $index) {
  <app-message [message]="msg" />
}
 
// Safe: null-coalescing handles state fields not yet populated
const score = computed(() => agent.value().analysis?.score ?? 0);
No manual subscriptions

streamResource() uses toSignal() internally with requireSync: false. Signals always have a value — even before the first stream update. You never need to handle undefined explicitly for the signal itself, though individual state fields may be null until the agent populates them.

Thread State vs Application State

There are two kinds of state in a LangGraph Angular app, and keeping them separate makes your code much easier to reason about.

Thread state is owned by LangGraph Platform. You read it through streamResource() signals. You never write to it directly — you only send new input via .submit().

Application state is owned by your Angular component or service. It's UI-only: sidebar visibility, active tab, selected message, form input values. It has nothing to do with the agent.

@Component({
  changeDetection: ChangeDetectionStrategy.OnPush,
})
export class ChatComponent {
  // --- Thread state (from agent, read-only) ---
  readonly agent = streamResource<ChatState>({
    assistantId: 'chat_agent',
  });
 
  // Convenience computed values from thread state
  readonly messages = this.agent.messages;           // Signal<BaseMessage[]>
  readonly isLoading = this.agent.isLoading;         // Signal<boolean>
  readonly interrupted = this.agent.interrupt;       // Signal<Interrupt | null>
 
  // --- Application state (your Angular signals) ---
  readonly sidebarOpen = signal(true);
  readonly activeTab = signal<'chat' | 'history' | 'settings'>('chat');
  readonly inputText = signal('');
  readonly selectedMessageId = signal<string | null>(null);
 
  // --- Actions ---
  send() {
    const text = this.inputText();
    if (!text.trim()) return;
    this.agent.submit({ messages: [{ role: 'user', content: text }] });
    this.inputText.set('');                          // UI state — clear the input
  }
 
  approve() {
    this.agent.submit(null, { resume: { approved: true } });
  }
}
Don't mirror thread state into Angular signals

A common mistake is copying agent.messages() into a local signal to "control" it. This creates stale state bugs and defeats the purpose of the reactive signal model. Read thread state directly from agent.* signals and derive what you need with computed().

The Checkpoint Model

LangGraph Platform persists state at every node boundary using a checkpoint store. Each checkpoint is an immutable snapshot of the full state at a point in time.

Thread: "user_123_session"
│
├── Checkpoint 1  ← After call_model: { messages: [HumanMessage, AIMessage] }
├── Checkpoint 2  ← After tool_node: { messages: [..., ToolMessage] }
├── Checkpoint 3  ← After call_model: { messages: [..., AIMessage("Here's what I found...")] }
└── (current)

What This Means for Your Angular App

Resumable threads — If the user refreshes the page or closes the browser, the thread is still there. Pass the same threadId and streamResource() will restore the full conversation history automatically.

Time travel — You can fork a thread at any checkpoint and replay it with different input. This powers the time-travel debugging guides.

Interrupt persistence — When the agent raises an Interrupt, the checkpoint captures everything. The agent can be resumed hours or days later.

const agent = streamResource<ChatState>({
  assistantId: 'chat_agent',
 
  // Same threadId = restored conversation history
  threadId: signal(this.route.snapshot.params['threadId']),
 
  // New threadId auto-created for new conversations
  onThreadId: (id) => this.router.navigate(['/chat', id]),
});
 
// Read checkpoint history for time-travel UI
const history = agent.history();    // Signal<ThreadState[]>
const branch = agent.branch();      // Signal<string> — active branch ID

For full checkpoint and time-travel patterns, see the Persistence guide and Time Travel guide.

Custom State Fields

messages is just one field. Real agents carry rich state: structured plans, tool results, progress indicators, metadata, and more. Every custom field you define in Python is available in your TypeScript interface.

from typing_extensions import TypedDict, Annotated
from operator import add
from langgraph.graph import MessagesState
from langchain_openai import ChatOpenAI
 
llm = ChatOpenAI(model="gpt-5-mini")
 
class ResearchState(MessagesState):
    # Accumulating lists — each node can append
    plan: Annotated[list[str], add]
    sources: Annotated[list[str], add]
    findings: Annotated[list[str], add]
 
    # Scalar progress
    progress: int          # 0–100
 
    # Structured results
    report: dict | None    # Final report when complete
 
    # Agent metadata
    query: str
    model_used: str
 
def planner_node(state: ResearchState) -> dict:
    steps = llm.invoke([
        {"role": "system", "content": "Break this query into research steps."},
        *state["messages"]
    ])
    plan_items = steps.content.split("\n")
    return {
        "plan": plan_items,       # Appended via reducer
        "progress": 10,
        "model_used": "gpt-5-mini",
    }
 
def researcher_node(state: ResearchState) -> dict:
    # Runs once per plan step in a loop
    for step in state["plan"]:
        result = search(step)
        yield {
            "findings": [result],  # Each iteration appends
            "progress": state["progress"] + (80 // len(state["plan"])),
        }

Derived State with computed()

You rarely need to consume agent.value() raw in your template. Use computed() to derive clean, focused values:

readonly agent = streamResource<ResearchState>({
  assistantId: 'research_agent',
});
 
// Derived signals — recalculate only when their dependencies change
readonly progress = computed(() => this.agent.value().progress);
readonly isPlanning = computed(() => this.agent.value().plan.length === 0 && this.agent.isLoading());
readonly sourceCount = computed(() => this.agent.value().sources.length);
readonly hasReport = computed(() => this.agent.value().report !== null);
readonly reportTitle = computed(() => this.agent.value().report?.title ?? '');

What's Next