DocsConceptsAngular Signals

Angular Signals

Angular Signals are the reactive primitive that powers streamResource(). If you're coming from a Python AI/agent background and wondering how Angular handles real-time streaming data, this page is your guide. Every property on a StreamResourceRef is a Signal, which means your templates update automatically as tokens arrive — no manual subscriptions, no async pipes, no RxJS boilerplate.

For Python developers

Think of Signals like a Python property with built-in change notification. When the value changes, every consumer — templates, computed values, effects — re-evaluates automatically. If you've used Pydantic models with validators that react to field changes, Signals are the Angular equivalent but deeply integrated into the rendering engine.

What Are Angular Signals?

A Signal is a reactive value container introduced in Angular 16+. You create one, read it by calling it like a function, and Angular tracks which templates and computations depend on it.

import { signal, computed } from '@angular/core';
 
// Create a writable signal
const count = signal(0);
 
// Read the current value — call it like a function
console.log(count()); // 0
 
// Update the value
count.set(1);
count.update(prev => prev + 1);
 
// Derive new values with computed()
const doubled = computed(() => count() * 2);
console.log(doubled()); // 4

The key insight: Angular knows which Signals a template reads. When those Signals change, Angular re-renders only the affected parts of the DOM. No diffing the entire tree, no zone.js overhead.

How streamResource Uses Signals Internally

Under the hood, streamResource() receives Server-Sent Events (SSE) over HTTP and feeds them into RxJS BehaviorSubjects. It then converts those BehaviorSubjects into Angular Signals using toSignal(). This is the bridge between the async streaming world and Angular's synchronous reactivity model.

// Simplified view of what streamResource does internally:
 
// 1. SSE events arrive as an observable stream
const messages$ = new BehaviorSubject<BaseMessage[]>([]);
const status$ = new BehaviorSubject<ResourceStatus>('idle');
 
// 2. Each SSE chunk updates the BehaviorSubject
transport.onChunk(chunk => {
  messages$.next([...messages$.getValue(), chunk.message]);
});
 
// 3. BehaviorSubjects become Signals via toSignal()
const messages = toSignal(messages$, { initialValue: [] });
const status = toSignal(status$, { initialValue: 'idle' });
 
// 4. Your component reads pure Signals — no RxJS knowledge needed
Why this matters

The BehaviorSubject-to-Signal conversion means you get the best of both worlds: RxJS handles the async SSE transport (reconnection, backpressure, error recovery), while Signals handle the synchronous UI reactivity (change detection, template binding, computed derivations). You only interact with the Signal side.

The Streaming Lifecycle as Signals

Every streamResource() instance moves through a lifecycle: idle, loading, tokens arriving, then resolved (or error). The status() Signal reflects each transition in real time.

1
idle — Waiting for input

The resource has been created but no request has been submitted yet. All Signals hold their initial values.

const chat = streamResource<ChatState>({
  assistantId: 'chat_agent',
});
 
console.log(chat.status());     // 'idle'
console.log(chat.messages());   // []
console.log(chat.isLoading());  // false
2
loading — Request in flight

After calling submit(), the status transitions to 'loading'. The SSE connection is open and the agent is processing.

chat.submit({ messages: [{ role: 'user', content: 'Explain quantum computing' }] });
 
console.log(chat.status());     // 'loading'
console.log(chat.isLoading());  // true
console.log(chat.messages());   // [] (no tokens yet)
3
loading — Tokens streaming

As the agent generates tokens, the messages() Signal updates with each chunk. The status remains 'loading' throughout.

// After first few tokens arrive:
console.log(chat.status());     // 'loading' (still streaming)
console.log(chat.messages());   // [AIMessageChunk("Quantum computing uses...")]
 
// After more tokens:
console.log(chat.messages());   // [AIMessageChunk("Quantum computing uses qubits...")]
// The message content grows as tokens stream in
4
resolved — Stream complete

The agent has finished. All tokens have arrived. The status transitions to 'resolved'.

console.log(chat.status());     // 'resolved'
console.log(chat.isLoading());  // false
console.log(chat.messages());   // [AIMessage("Quantum computing uses qubits to...")]
5
error — Something went wrong

If the agent fails or the connection drops, the status transitions to 'error' and the error() Signal contains the failure details.

console.log(chat.status());     // 'error'
console.log(chat.error());      // HttpErrorResponse { status: 500, ... }
console.log(chat.isLoading());  // false

Composing Derived State with computed()

computed() lets you derive new Signals from streamResource Signals. These derived Signals update automatically whenever their dependencies change — during streaming, that means every time a new token arrives.

import { computed } from '@angular/core';
import { streamResource } from '@cacheplane/stream-resource';
 
const chat = streamResource<ChatState>({
  assistantId: 'chat_agent',
});
 
// Count all messages in the conversation
const messageCount = computed(() => chat.messages().length);
 
// Get the last message (useful for showing the latest response)
const lastMessage = computed(() => chat.messages().at(-1));
 
// Extract just the assistant's messages
const assistantMessages = computed(() =>
  chat.messages().filter(m => m._getType() === 'ai')
);
 
// Track which tools the agent is actively calling
const activeTools = computed(() =>
  chat.messages()
    .filter(m => m._getType() === 'ai')
    .flatMap(m => m.tool_calls ?? [])
    .filter(tc => !tc.result)
);
 
// Build a user-facing error message
const errorDisplay = computed(() => {
  const err = chat.error();
  if (!err) return null;
  if (err instanceof HttpErrorResponse) {
    return err.status === 429
      ? 'Rate limited. Please wait a moment.'
      : `Server error (${err.status})`;
  }
  return 'An unexpected error occurred.';
});
 
// Combine multiple signals into a single view model
const viewModel = computed(() => ({
  messages: chat.messages(),
  isStreaming: chat.isLoading(),
  canSend: chat.status() !== 'loading',
  messageCount: messageCount(),
  error: errorDisplay(),
}));
computed() is lazy and cached

A computed() only re-evaluates when one of its dependencies actually changes, and it caches the result. If chat.messages() emits the same reference, downstream computeds skip their work entirely. This matters for high-frequency streaming where tokens arrive rapidly.

Side Effects with effect()

Use effect() when a Signal change should trigger work that lives outside the template — logging, analytics, scrolling, persisting state. Effects run in the injection context and are automatically cleaned up when the component is destroyed.

import { effect } from '@angular/core';
 
// Log errors for observability
effect(() => {
  const err = chat.error();
  if (err) {
    console.error('[StreamResource] Agent error:', err);
    this.analytics.track('agent_error', { error: err });
  }
});
 
// Auto-scroll to bottom when new messages arrive
effect(() => {
  const msgs = chat.messages();
  if (msgs.length > 0) {
    // Schedule after Angular renders the new message
    setTimeout(() => {
      this.chatContainer.nativeElement.scrollTo({
        top: this.chatContainer.nativeElement.scrollHeight,
        behavior: 'smooth',
      });
    });
  }
});
 
// Track streaming duration for performance monitoring
effect(() => {
  const status = chat.status();
  if (status === 'loading') {
    this.streamStart = performance.now();
  }
  if (status === 'resolved' && this.streamStart) {
    const duration = performance.now() - this.streamStart;
    this.analytics.track('stream_duration_ms', { duration });
    this.streamStart = null;
  }
});
Avoid writing to Signals inside effect()

Writing to a Signal inside an effect() can create infinite loops. If you need to transform one Signal into another, use computed() instead. Reserve effect() for side effects that leave the reactive graph — DOM manipulation, logging, analytics, network calls.

Template Patterns

Angular's new control flow syntax (@if, @for, @switch) works naturally with Signals. Here's a complete chat template that handles every lifecycle state.

import { ChangeDetectionStrategy, Component, computed, effect, ElementRef, ViewChild } from '@angular/core';
import { streamResource } from '@cacheplane/stream-resource';
 
@Component({
  selector: 'app-chat',
  changeDetection: ChangeDetectionStrategy.OnPush,
  template: `
    <!-- Status bar -->
    @switch (chat.status()) {
      @case ('loading') {
        <div class="status-bar streaming">
          Agent is responding...
        </div>
      }
      @case ('error') {
        <div class="status-bar error">
          {{ errorDisplay() }}
          <button (click)="retry()">Retry</button>
        </div>
      }
    }
 
    <!-- Message list -->
    <div class="messages" #chatContainer>
      @for (message of chat.messages(); track $index) {
        @switch (message._getType()) {
          @case ('human') {
            <div class="message user">
              {{ message.content }}
            </div>
          }
          @case ('ai') {
            <div class="message assistant">
              {{ message.content }}
 
              <!-- Show tool calls if the assistant invoked any -->
              @for (tool of message.tool_calls ?? []; track tool.id) {
                <div class="tool-call">
                  Called: {{ tool.name }}
                </div>
              }
            </div>
          }
          @case ('tool') {
            <div class="message tool-result">
              {{ message.name }}: {{ message.content }}
            </div>
          }
        }
      } @empty {
        <div class="empty-state">
          Send a message to start the conversation.
        </div>
      }
    </div>
 
    <!-- Input area -->
    <form (submit)="send($event)">
      <input
        #input
        type="text"
        placeholder="Type a message..."
        [disabled]="chat.isLoading()"
      />
      <button type="submit" [disabled]="chat.isLoading()">
        @if (chat.isLoading()) { Streaming... } @else { Send }
      </button>
    </form>
  `,
})
export class ChatComponent {
  @ViewChild('chatContainer') chatContainer!: ElementRef<HTMLElement>;
 
  chat = streamResource<ChatState>({
    assistantId: 'chat_agent',
  });
 
  errorDisplay = computed(() => {
    const err = this.chat.error();
    if (!err) return '';
    return err instanceof HttpErrorResponse
      ? `Error ${err.status}: ${err.statusText}`
      : 'Connection lost. Please retry.';
  });
 
  scrollEffect = effect(() => {
    const msgs = this.chat.messages();
    if (msgs.length) {
      setTimeout(() =>
        this.chatContainer?.nativeElement.scrollTo({
          top: this.chatContainer.nativeElement.scrollHeight,
          behavior: 'smooth',
        })
      );
    }
  });
 
  send(event: Event) {
    event.preventDefault();
    const input = (event.target as HTMLFormElement).querySelector('input')!;
    const content = input.value.trim();
    if (!content) return;
 
    this.chat.submit({
      messages: [{ role: 'user', content }],
    });
    input.value = '';
  }
 
  retry() {
    this.chat.submit({
      messages: [{ role: 'user', content: 'Please try again.' }],
    });
  }
}

OnPush Change Detection

Every component using streamResource() should use ChangeDetectionStrategy.OnPush. Here's why it works and why it's efficient.

With the default change detection strategy, Angular checks every component in the tree on every browser event — clicks, timers, HTTP responses. For a streaming agent emitting dozens of tokens per second, that means hundreds of unnecessary checks across your entire app.

With OnPush, Angular only checks a component when:

  1. An @Input() reference changes
  2. An event fires inside the component's template
  3. A Signal that the template reads changes

Since streamResource() exposes Signals, condition 3 handles everything. When a new token arrives and messages() updates, Angular marks only the components reading that Signal for check — not the entire tree.

@Component({
  // Always use OnPush with streamResource
  changeDetection: ChangeDetectionStrategy.OnPush,
  template: `
    <p>{{ chat.messages().length }} messages</p>
    @if (chat.isLoading()) {
      <app-typing-indicator />
    }
  `,
})
export class ChatComponent {
  chat = streamResource<ChatState>({ assistantId: 'chat_agent' });
}
No markForCheck() needed

With older Observable-based patterns, you had to call ChangeDetectorRef.markForCheck() or use the async pipe to trigger OnPush updates. Signals do this automatically. When a Signal's value changes, Angular's internal notification system marks the component dirty — zero manual intervention.

Python Agent to Angular Signals

The real power of streamResource() is how it pairs a Python LangGraph agent with Angular Signals. The agent defines the logic; Signals surface the results in real time.

from langgraph.graph import END, START, MessagesState, StateGraph
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
 
llm = ChatOpenAI(model="gpt-5-mini", streaming=True)
 
@tool
def search_knowledge_base(query: str) -> str:
    """Search internal documentation for relevant information."""
    results = vector_store.similarity_search(query, k=3)
    return "\n".join(r.page_content for r in results)
 
tools = [search_knowledge_base]
 
def call_model(state: MessagesState) -> dict:
    response = llm.bind_tools(tools).invoke(state["messages"])
    return {"messages": [response]}
 
def should_continue(state: MessagesState) -> str:
    last_msg = state["messages"][-1]
    if last_msg.tool_calls:
        return "tools"
    return END
 
# Build the graph
builder = StateGraph(MessagesState)
builder.add_node("model", call_model)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "model")
builder.add_conditional_edges("model", should_continue)
builder.add_edge("tools", "model")
 
graph = builder.compile()

When the Python agent calls search_knowledge_base, the tool call streams to Angular as a message. When the tool returns, the result streams as another message. The agent's final response streams token by token. Every one of these events updates the messages() Signal, and your template re-renders the new content automatically.

Performance: Signals vs Alternatives

High-frequency token streaming puts unique pressure on a frontend framework. Here's why Signals with OnPush outperform the alternatives.

| Approach | Token update cost | Memory overhead | Cleanup required | |---|---|---|---| | Signals + OnPush | Marks only reading components | None beyond Signal | Automatic | | Observable + async pipe | Creates/destroys subscriptions per @if block | Subscription objects | Pipe handles it | | Observable + manual subscribe | Full component check if you forget markForCheck() | Subscription tracking | Manual unsubscribe | | Default change detection | Checks entire component tree | None | None |

For a typical chat UI receiving 30-50 tokens per second:

  • Signals + OnPush: Only the message list component and its direct ancestors are checked. The sidebar, header, settings panel — all skipped.
  • Default strategy: Every component in the tree is checked 30-50 times per second, even components with no streaming data.
  • Observable + async pipe: Works correctly but creates and destroys subscriptions each time an @if or @for block re-evaluates, adding GC pressure during rapid streaming.
Signal equality checks

Signals use referential equality (===) by default. streamResource() creates new array references for messages() only when the array actually changes (a new token arrives). Between updates, reading messages() returns the same reference and skips downstream recomputation. For custom equality, pass an equal function when creating a computed().

What's Next