Testing

MockStreamTransport lets you test agent interactions deterministically without a running LangGraph server. Script exact event sequences, step through streaming lifecycles, and verify every signal transition in your Angular test specs.

No flaky tests

MockStreamTransport removes network dependencies, timing issues, and server state from your specs. Because every event is scripted, the same test produces the same signal values on every run, which keeps CI results reproducible.

Python: Testing the Agent

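Before testing the Angular side, make sure your agent logic is correct. A compiled LangGraph graph can be imported and invoked like any other Python object, so you can test it directly with pytest. The async tests below use the pytest.mark.asyncio marker, which comes from the pytest-asyncio plugin.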

import pytest
from langchain_core.messages import HumanMessage
from my_agent.agent import graph
 
@pytest.mark.asyncio
async def test_agent_responds():
    result = await graph.ainvoke(
        {"messages": [HumanMessage(content="Hello")]},
        config={"configurable": {"thread_id": "test_1"}},
    )
    assert len(result["messages"]) >= 2
    assert result["messages"][-1].type == "ai"
 
@pytest.mark.asyncio
async def test_agent_uses_tools():
    result = await graph.ainvoke(
        {"messages": [HumanMessage(content="Search for LangGraph docs")]},
        config={"configurable": {"thread_id": "test_2"}},
    )
    # Verify the agent called the search tool
    tool_messages = [m for m in result["messages"] if m.type == "tool"]
    assert len(tool_messages) > 0

Agent tests are fast

With MemorySaver and a mocked LLM, agent tests run in milliseconds. Use langchain_core.language_models.FakeListChatModel to remove the LLM dependency entirely.

MockStreamTransport: Basic Setup

On the Angular side, MockStreamTransport replaces the real HTTP transport. Create the transport, then call streamResource() inside TestBed.runInInjectionContext so it has access to Angular's dependency injection.

import { TestBed } from '@angular/core/testing';
import { MockStreamTransport, streamResource } from '@cacheplane/stream-resource';
import type { BaseMessage } from '@cacheplane/stream-resource';
 
describe('ChatComponent', () => {
  it('should display agent messages', () => {
    const transport = new MockStreamTransport();
 
    TestBed.runInInjectionContext(() => {
      const chat = streamResource<{ messages: BaseMessage[] }>({
        assistantId: 'test_agent',
        transport,
      });
 
      // Emit a values event — simulates the agent responding
      transport.emit([
        {
          type: 'values',
          messages: [{ role: 'assistant', content: 'Hello!' }],
        },
      ]);
 
      expect(chat.messages().length).toBe(1);
      expect(chat.messages()[0].content).toBe('Hello!');
    });
  });
});

Scripted Event Sequences

Pass event batches to the constructor for sequential playback. Each call to nextBatch() advances one step — giving you frame-by-frame control over what the component sees.

const transport = new MockStreamTransport([
  // Batch 1: Agent starts thinking
  [{ type: 'values', messages: [{ role: 'assistant', content: 'Analyzing...' }] }],
  // Batch 2: Agent finishes
  [{ type: 'values', messages: [{ role: 'assistant', content: 'Here is your answer.' }] }],
]);
 
TestBed.runInInjectionContext(() => {
  const chat = streamResource<{ messages: BaseMessage[] }>({
    assistantId: 'test_agent',
    transport,
  });
 
  chat.submit({ messages: [{ role: 'user', content: 'Explain signals' }] });
 
  // Step through each batch
  transport.nextBatch();
  expect(chat.messages()[0].content).toBe('Analyzing...');
 
  transport.nextBatch();
  expect(chat.messages()[0].content).toBe('Here is your answer.');
});
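The event literals in these scripts repeat the same shape, so a small builder can keep specs readable. The sketch below is not part of @cacheplane/stream-resource: assistantValues and scriptReplies are hypothetical helper names, and the event shape is copied from the literals used above.

```typescript
// Hypothetical test helpers; not part of @cacheplane/stream-resource.
// The event shape mirrors the literals used in the examples above.
interface ScriptedMessage {
  role: 'assistant' | 'user';
  content: string;
}

interface ValuesEvent {
  type: 'values';
  messages: ScriptedMessage[];
}

// Build a single `values` event holding one assistant message.
function assistantValues(content: string): ValuesEvent {
  return { type: 'values', messages: [{ role: 'assistant', content }] };
}

// Turn a list of replies into one batch per reply, in order.
function scriptReplies(...replies: string[]): ValuesEvent[][] {
  return replies.map((reply) => [assistantValues(reply)]);
}

// Same structure as the two-batch script shown above.
const batches = scriptReplies('Analyzing...', 'Here is your answer.');
```

Assuming the MockStreamTransport constructor accepts these plain objects, as it does in the example above, batches could be passed to it in place of the hand-written array.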

Testing the Streaming Lifecycle

The most common test pattern verifies the full submit-to-resolved lifecycle: submit triggers loading, values arrive, and the status settles to resolved.

import { TestBed } from '@angular/core/testing';
import { MockStreamTransport, streamResource } from '@cacheplane/stream-resource';
import type { BaseMessage } from '@cacheplane/stream-resource';
 
describe('streaming lifecycle', () => {
  it('should transition through loading → values → resolved', () => {
    const transport = new MockStreamTransport([
      [{ type: 'values', messages: [{ role: 'assistant', content: 'Thinking...' }] }],
      [{ type: 'values', messages: [{ role: 'assistant', content: 'Done!' }] }],
    ]);
 
    TestBed.runInInjectionContext(() => {
      const chat = streamResource<{ messages: BaseMessage[] }>({
        assistantId: 'test_agent',
        transport,
      });
 
      // Initial state
      expect(chat.status()).toBe('idle');
      expect(chat.messages()).toEqual([]);
 
      // Submit triggers loading
      chat.submit({ messages: [{ role: 'user', content: 'Hello' }] });
      expect(chat.status()).toBe('loading');
      expect(chat.isLoading()).toBe(true);
 
      // First batch — partial response
      transport.nextBatch();
      expect(chat.messages()[0].content).toBe('Thinking...');
      expect(chat.status()).toBe('loading');
 
      // Second batch — final response
      transport.nextBatch();
      expect(chat.messages()[0].content).toBe('Done!');
 
      // Stream completes
      transport.complete();
      expect(chat.status()).toBe('resolved');
      expect(chat.isLoading()).toBe(false);
    });
  });
});
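The transitions asserted in this spec can be read as a small state machine. The following is a sketch for reasoning about which status to expect at each step, not the library's implementation. The StreamStatus union lists only the values that appear in this guide, and nextStatus, trace, and the trigger names are hypothetical.

```typescript
// Illustrative model only; the real status logic lives inside streamResource().
// Status values are the ones asserted in this guide's specs.
type StreamStatus = 'idle' | 'loading' | 'resolved' | 'error';

// Hypothetical trigger names standing in for the calls used in the specs.
type Trigger = 'submit' | 'batch' | 'complete' | 'error';

function nextStatus(current: StreamStatus, trigger: Trigger): StreamStatus {
  switch (trigger) {
    case 'submit':
      return 'loading'; // submit() starts, or restarts, a run
    case 'batch':
      return current; // arriving values do not change the status
    case 'complete':
      return current === 'loading' ? 'resolved' : current;
    case 'error':
      return 'error';
  }
}

// Replay a sequence of triggers from 'idle' and record the status after each.
function trace(triggers: Trigger[]): StreamStatus[] {
  const seen: StreamStatus[] = [];
  let status: StreamStatus = 'idle';
  for (const trigger of triggers) {
    status = nextStatus(status, trigger);
    seen.push(status);
  }
  return seen;
}

// Mirrors the spec above: submit, two batches, then complete.
const lifecycle = trace(['submit', 'batch', 'batch', 'complete']);
```

Writing the expected sequence down this way makes it easier to spot a missing assertion, such as checking that the status is still loading between batches.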

Testing Interrupts

Script an interrupt event to test human-in-the-loop flows. Verify the interrupt signal surfaces the payload, then resume and confirm the agent continues.

import { TestBed } from '@angular/core/testing';
import { MockStreamTransport, streamResource } from '@cacheplane/stream-resource';
import type { BaseMessage } from '@cacheplane/stream-resource';
 
describe('interrupt handling', () => {
  it('should surface interrupt and resume on approval', () => {
    const transport = new MockStreamTransport();
 
    TestBed.runInInjectionContext(() => {
      const agent = streamResource<{ messages: BaseMessage[] }>({
        assistantId: 'approval_agent',
        transport,
      });
 
      // Agent hits an interrupt
      transport.emit([
        {
          type: 'interrupt',
          value: { action: 'delete_account', risk: 'high' },
        },
      ]);
 
      // Verify interrupt signal (toBeTruthy rejects both null and undefined)
      expect(agent.interrupt()).toBeTruthy();
      expect(agent.interrupt()?.value.action).toBe('delete_account');
      expect(agent.interrupt()?.value.risk).toBe('high');
 
      // User approves — resume the agent
      agent.submit(null, { resume: { approved: true } });
 
      // Agent continues after approval
      transport.emit([
        {
          type: 'values',
          messages: [{ role: 'assistant', content: 'Account deleted.' }],
        },
      ]);
 
      expect(agent.interrupt()).toBeNull();
      expect(agent.messages()[0].content).toBe('Account deleted.');
    });
  });
});
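Components often derive the resume payload from the interrupt value. One option is to keep that decision in a pure function, which can be unit tested without any transport and then exercised once through the flow above. The sketch below assumes the payload shape used in this example; ApprovalRequest, isApprovalRequest, resumePayloadFor, and the high-risk policy are all hypothetical.

```typescript
// Hypothetical payload type matching the interrupt value scripted above.
interface ApprovalRequest {
  action: string;
  risk: string;
}

// Narrow an unknown interrupt value to the expected payload shape.
function isApprovalRequest(value: unknown): value is ApprovalRequest {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate['action'] === 'string' &&
    typeof candidate['risk'] === 'string'
  );
}

// Build the resume payload; the shape mirrors `{ approved: true }` above.
// Assumed policy: high-risk actions need an explicit user confirmation.
function resumePayloadFor(
  value: unknown,
  userConfirmed: boolean,
): { approved: boolean } {
  if (!isApprovalRequest(value)) return { approved: false };
  return { approved: value.risk !== 'high' || userConfirmed };
}

const unconfirmed = resumePayloadFor(
  { action: 'delete_account', risk: 'high' },
  false,
);
```

The result of resumePayloadFor is what a component would pass as the resume option of submit().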

Testing Errors

Inject errors with emitError() to verify your component handles failures gracefully.

import { TestBed } from '@angular/core/testing';
import { MockStreamTransport, streamResource } from '@cacheplane/stream-resource';
import type { BaseMessage } from '@cacheplane/stream-resource';
 
describe('error handling', () => {
  it('should surface errors and set error status', () => {
    const transport = new MockStreamTransport();
 
    TestBed.runInInjectionContext(() => {
      const chat = streamResource<{ messages: BaseMessage[] }>({
        assistantId: 'test_agent',
        transport,
      });
 
      chat.submit({ messages: [{ role: 'user', content: 'Hello' }] });
 
      // Simulate a connection failure
      transport.emitError(new Error('Connection lost'));
 
      expect(chat.error()).toBeDefined();
      expect(chat.error()?.message).toBe('Connection lost');
      expect(chat.status()).toBe('error');
      expect(chat.isLoading()).toBe(false);
    });
  });
 
  it('should recover from errors on retry', () => {
    const transport = new MockStreamTransport();
 
    TestBed.runInInjectionContext(() => {
      const chat = streamResource<{ messages: BaseMessage[] }>({
        assistantId: 'test_agent',
        transport,
      });
 
      // First attempt fails
      chat.submit({ messages: [{ role: 'user', content: 'Hello' }] });
      transport.emitError(new Error('Timeout'));
      expect(chat.status()).toBe('error');
 
      // Retry succeeds
      chat.submit({ messages: [{ role: 'user', content: 'Hello' }] });
      transport.emit([
        {
          type: 'values',
          messages: [{ role: 'assistant', content: 'Sorry for the delay!' }],
        },
      ]);
 
      expect(chat.status()).not.toBe('error');
      expect(chat.messages()[0].content).toBe('Sorry for the delay!');
    });
  });
});

Testing Thread Switching

Verify that switching threads loads the correct conversation state and clears the previous thread's messages.

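import { signal } from '@angular/core';
import { TestBed } from '@angular/core/testing';
import { MockStreamTransport, streamResource } from '@cacheplane/stream-resource';
import type { BaseMessage } from '@cacheplane/stream-resource';

describe('thread switching', () => {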
  it('should load new thread state on switch', () => {
    const transport = new MockStreamTransport();
 
    TestBed.runInInjectionContext(() => {
      const threadId = signal<string | null>('thread_A');
 
      const chat = streamResource<{ messages: BaseMessage[] }>({
        assistantId: 'test_agent',
        threadId,
        transport,
      });
 
      // Thread A has messages
      transport.emit([
        {
          type: 'values',
          messages: [{ role: 'assistant', content: 'Thread A response' }],
        },
      ]);
      expect(chat.messages()[0].content).toBe('Thread A response');
 
      // Switch to thread B
      chat.switchThread('thread_B');
 
      // Thread B loads its own state
      transport.emit([
        {
          type: 'values',
          messages: [{ role: 'assistant', content: 'Thread B response' }],
        },
      ]);
      expect(chat.messages()[0].content).toBe('Thread B response');
    });
  });
 
  it('should create a new thread when switching to null', () => {
    const transport = new MockStreamTransport();
 
    TestBed.runInInjectionContext(() => {
      const chat = streamResource<{ messages: BaseMessage[] }>({
        assistantId: 'test_agent',
        transport,
      });
 
      // Start a conversation
      transport.emit([
        {
          type: 'values',
          messages: [{ role: 'assistant', content: 'Hello' }],
        },
      ]);
 
      // Switch to new thread
      chat.switchThread(null);
      expect(chat.messages()).toEqual([]);
    });
  });
});

Test Setup Workflow

1. Install dependencies

Make sure @cacheplane/stream-resource is available in your test environment. MockStreamTransport ships with the main package, so no extra install is needed.

2. Create the transport

Instantiate MockStreamTransport with optional pre-scripted batches for sequential playback, or leave it empty for imperative emit() calls.

3. Wrap in injection context

Call TestBed.runInInjectionContext(() => { ... }) so streamResource() can access Angular's injector for signal creation and cleanup.

4. Create the resource

Pass the transport to streamResource() via the transport option. All other options (assistantId, threadId, onThreadId) work identically to production code.

5. Script events

Use transport.emit() for ad-hoc events, transport.nextBatch() for pre-scripted sequences, or transport.emitError() for failure scenarios.

6. Assert signal values

Read signals like chat.messages(), chat.status(), chat.interrupt(), and chat.error() to verify your component reacts correctly.
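To see why assertions can follow each emit() or nextBatch() call directly, it may help to look at a conceptual stand-in for a scripted transport. This is not the MockStreamTransport source: ScriptedTransport, Listener, and connect are hypothetical names, and whether the real class delivers events synchronously in the same way is an assumption.

```typescript
// Conceptual stand-in, NOT the MockStreamTransport implementation.
interface Listener<E> {
  next: (events: E[]) => void;
  error: (err: Error) => void;
}

class ScriptedTransport<E> {
  private readonly script: E[][];
  private cursor = 0;
  private listener: Listener<E> | null = null;

  constructor(script: E[][] = []) {
    this.script = script;
  }

  // Register the consumer that receives events (hypothetical hook).
  connect(listener: Listener<E>): void {
    this.listener = listener;
  }

  // Deliver events immediately and synchronously.
  emit(events: E[]): void {
    this.listener?.next(events);
  }

  // Deliver the next scripted batch; returns false once the script is exhausted.
  nextBatch(): boolean {
    const batch = this.script[this.cursor];
    if (batch === undefined) return false;
    this.cursor += 1;
    this.emit(batch);
    return true;
  }

  // Deliver a failure to the consumer.
  emitError(err: Error): void {
    this.listener?.error(err);
  }
}

const seen: string[] = [];
const transport = new ScriptedTransport<string>([['a'], ['b', 'c']]);
transport.connect({
  next: (events) => { seen.push(...events); },
  error: (err) => { seen.push(`error: ${err.message}`); },
});

transport.nextBatch();
transport.nextBatch();
const exhausted = !transport.nextBatch(); // no third batch in the script
transport.emitError(new Error('Connection lost'));
```

Because nothing here waits on a timer or a socket, the consumer has already seen each batch by the time the call returns, which is the property that makes the test steps above deterministic.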

Integration Testing

For end-to-end confidence, run tests against a real LangGraph dev server. The LangGraph CLI starts a local server that your tests can hit directly.

# Start the dev server (it keeps running, so leave this terminal open)
langgraph dev --config langgraph.json

# In a second terminal, run the Angular tests against it (no MockStreamTransport needed)
ng test --watch=false

Integration tests are slow

Integration tests hit a real server and (potentially) a real LLM. Reserve them for CI pipelines or pre-release smoke tests. Use MockStreamTransport for the vast majority of your test suite — it runs in milliseconds with zero external dependencies.

What's Next