MockStreamTransport

MockStreamTransport is a test-friendly transport that replaces real network calls with an in-memory event emitter. Use it in unit and component tests to push values on demand and assert against your component's reactive state without a running server.

Complete test example

The pattern below covers the full lifecycle: configure the transport in TestBed, create the component, emit values, and assert signal state.

import { Component } from '@angular/core';
import { TestBed } from '@angular/core/testing';
import {
  provideStreamResource,
  MockStreamTransport,
  streamResource,
} from '@cacheplane/stream-resource';
 
@Component({ template: '' })
class RepoComponent {
  readonly repo = streamResource<{ name: string }>({
    url: () => '/api/repos/42',
  });
}
 
describe('RepoComponent', () => {
  let transport: MockStreamTransport;
 
  beforeEach(() => {
    TestBed.configureTestingModule({
      imports: [RepoComponent],
      providers: [provideStreamResource({ transport: MockStreamTransport })],
    });
    transport = TestBed.inject(MockStreamTransport);
  });
 
  it('reflects the streamed value', () => {
    const fixture = TestBed.createComponent(RepoComponent);
    fixture.detectChanges();
 
    // Push a value into the stream — synchronous, no fakeAsync needed
    transport.emit('/api/repos/42', { name: 'my-repo' });
    fixture.detectChanges();
 
    expect(fixture.componentInstance.repo.value()).toEqual({ name: 'my-repo' });
    expect(fixture.componentInstance.repo.status()).toBe('streaming');
  });
 
  it('surfaces errors through the error signal', () => {
    const fixture = TestBed.createComponent(RepoComponent);
    fixture.detectChanges();
 
    transport.error('/api/repos/42', new Error('not found'));
    fixture.detectChanges();
 
    expect(fixture.componentInstance.repo.status()).toBe('error');
    expect(fixture.componentInstance.repo.error()).toBeInstanceOf(Error);
  });
});

MockStreamTransport API

| Method | Description |
|--------|-------------|
| emit(url, value) | Pushes a single value into the stream at the given URL path. |
| error(url, err) | Triggers an error on the stream at the given URL path. |
| complete(url) | Closes the stream cleanly, as if the server sent the final event. |

Deterministic tests

Because MockStreamTransport is synchronous by default, you can emit values and assert state changes in the same test tick — no fakeAsync or tick required.
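
To illustrate why a synchronous transport allows same-tick assertions, here is a minimal, self-contained sketch. `SyncEmitterSketch` is a hypothetical stand-in written for this example, not the library's implementation; it only assumes that `emit` delivers to listeners before it returns.

```typescript
// Hypothetical stand-in, not the library's implementation: a synchronous,
// URL-keyed emitter that shows why no fakeAsync or tick is needed.
type Listener<T> = (value: T) => void;

class SyncEmitterSketch<T> {
  private readonly listeners = new Map<string, Set<Listener<T>>>();

  // Register a listener for one URL.
  subscribe(url: string, listener: Listener<T>): void {
    const set = this.listeners.get(url) ?? new Set<Listener<T>>();
    set.add(listener);
    this.listeners.set(url, set);
  }

  // Deliver the value to every listener before returning to the caller.
  emit(url: string, value: T): void {
    this.listeners.get(url)?.forEach((listener) => listener(value));
  }
}

const emitter = new SyncEmitterSketch<{ name: string }>();
const received: Array<{ name: string }> = [];
emitter.subscribe('/api/repos/42', (value) => {
  received.push(value);
});

emitter.emit('/api/repos/42', { name: 'my-repo' });
// `received` already holds the value here, in the same tick as the emit call.
```

Because delivery happens inside the `emit` call itself, an assertion placed on the next line observes the new state without any scheduler or timer control.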

MockStreamTransport class

Test transport for deterministic agent testing without a real LangGraph server. Script event batches upfront, then emit them manually or step through them in your test specs. Supports error injection and close control.

Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| script | StreamEvent[][] | Array of event batches. Each batch is emitted as a group. |

Methods

close()

Close the stream. Remaining queued events are drained before completion.

emit(events: StreamEvent[])

Manually emit events into the stream.

| Parameter | Type | Description |
|-----------|------|-------------|
| events | StreamEvent[] | |

emitError(err: Error)

Inject an error into the stream.

| Parameter | Type | Description |
|-----------|------|-------------|
| err | Error | |

isStreaming()

Returns true if a stream is currently active.

nextBatch()

Advance to the next scripted batch and return its events.

stream(_assistantId: string, _threadId: string | null, _payload: unknown, signal: AbortSignal)

Open a streaming connection to an agent and yield events.

| Parameter | Type | Description |
|-----------|------|-------------|
| _assistantId | string | |
| _threadId | string \| null | |
| _payload | unknown | |
| signal | AbortSignal | |
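
The interplay between `stream()`, `emit()`, and `close()` can be sketched with a queue-backed async generator. The class below, `DrainingStreamSketch`, is a hypothetical illustration and not the library's code. It assumes that events queued before `close()` are still yielded, and that aborting the signal ends the stream early.

```typescript
// Hypothetical sketch of a queue-backed stream; not the library's code.
class DrainingStreamSketch<T> {
  private readonly queue: T[] = [];
  private closed = false;
  private wake: (() => void) | null = null;

  // Queue events and wake a waiting consumer, if any.
  emit(events: T[]): void {
    this.queue.push(...events);
    this.notify();
  }

  // Mark the stream closed; already queued events are still delivered.
  close(): void {
    this.closed = true;
    this.notify();
  }

  private notify(): void {
    const wake = this.wake;
    this.wake = null;
    wake?.();
  }

  // Yield queued events until the queue is drained after close(), or the signal aborts.
  async *stream(signal: AbortSignal): AsyncGenerator<T> {
    const onAbort = () => this.notify();
    signal.addEventListener('abort', onAbort);
    try {
      while (!signal.aborted) {
        if (this.queue.length > 0) {
          yield this.queue.shift() as T;
        } else if (this.closed) {
          return; // queue drained and stream closed
        } else {
          // Wait for the next emit(), close(), or abort.
          await new Promise<void>((resolve) => {
            this.wake = resolve;
          });
        }
      }
    } finally {
      signal.removeEventListener('abort', onAbort);
    }
  }
}

// Usage: close() is called before consumption, yet both events are delivered.
async function collectDrained(): Promise<string[]> {
  const transport = new DrainingStreamSketch<string>();
  const controller = new AbortController();
  transport.emit(['a', 'b']);
  transport.close();
  const seen: string[] = [];
  for await (const event of transport.stream(controller.signal)) {
    seen.push(event);
  }
  return seen;
}
```

In this sketch the closed flag is only checked when the queue is empty, which is what makes closing a drain rather than a hard stop. The abort signal, by contrast, is checked before every yield.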

Examples

const transport = new MockStreamTransport([
  [{ type: 'values', data: { messages: [aiMsg('Hello')] } }],
  [{ type: 'values', data: { status: 'done' } }],
]);
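
Stepping through scripted batches, as `nextBatch()` is described above, might look like the following self-contained sketch. `ScriptedBatchesSketch` and `StreamEventSketch` are hypothetical names introduced for this example, and returning an empty array once the script is exhausted is an assumption, not documented behavior.

```typescript
// Hypothetical stand-in for illustration only; the real MockStreamTransport
// may behave differently. StreamEventSketch is an assumed event shape.
interface StreamEventSketch {
  type: string;
  data: unknown;
}

class ScriptedBatchesSketch {
  private readonly script: StreamEventSketch[][];
  private cursor = 0;

  constructor(script: StreamEventSketch[][]) {
    this.script = script;
  }

  // Return the next scripted batch and advance the cursor.
  // Assumption: an exhausted script yields an empty batch.
  nextBatch(): StreamEventSketch[] {
    const batch = this.script[this.cursor] ?? [];
    if (this.cursor < this.script.length) {
      this.cursor += 1;
    }
    return batch;
  }
}

const scripted = new ScriptedBatchesSketch([
  [{ type: 'values', data: { status: 'running' } }],
  [{ type: 'values', data: { status: 'done' } }],
]);

const firstBatch = scripted.nextBatch(); // the 'running' batch
const secondBatch = scripted.nextBatch(); // the 'done' batch
const exhaustedBatch = scripted.nextBatch(); // empty in this sketch
```

Keeping each batch as its own array lets a test assert on intermediate state between batches instead of only on the final result.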