FetchStreamTransport

FetchStreamTransport is the production-ready transport that opens a real server-sent event connection using the browser's fetch API and reads a ReadableStream response body. It is the default transport you register with provideStreamResource in production builds.

When you interact with it directly

In most apps you will never import or inject FetchStreamTransport by name — you register it once in provideStreamResource and forget about it. The two cases where you reach for it explicitly are:

  1. Per-resource override: you want one resource to use a different transport than the global default while everything else stays on FetchStreamTransport.
  2. Outside the DI tree: you are constructing a resource in a context where global providers are not available and you need to supply the transport manually.

import { inject } from '@angular/core';
import { streamResource, FetchStreamTransport } from '@cacheplane/stream-resource';

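// Override the transport for a single resource.
// Angular's inject() must be called from an injection context,
// such as a field initializer or constructor.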
const events = streamResource<Event>({
  url: () => '/api/events',
  transport: inject(FetchStreamTransport),
});

How it works

FetchStreamTransport makes a fetch call to the given URL and expects the server to respond with Content-Type: text/event-stream. It then reads the ReadableStream body line-by-line, parses SSE data: fields, and emits each parsed JSON value into the resource signal.
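The parsing step described above can be illustrated with a minimal sketch. It is not the library's implementation: `SseJsonParser` is a hypothetical helper that buffers decoded text chunks, collects `data:` fields, and returns one parsed JSON value per completed event. It assumes `\n` or `\r\n` line endings and lets `JSON.parse` errors propagate to the caller.

```typescript
// Hypothetical helper for illustration; not part of @cacheplane/stream-resource.
class SseJsonParser {
  private buffer = '';            // text of the current, still incomplete line
  private dataLines: string[] = []; // data: values of the event being assembled

  // Feed one decoded text chunk; returns the JSON values of every event it completes.
  push(chunk: string): unknown[] {
    this.buffer += chunk;
    const lines = this.buffer.split('\n');
    // The last element is an unfinished line (or ''), so keep it for the next chunk.
    this.buffer = lines.pop() ?? '';

    const values: unknown[] = [];
    for (const raw of lines) {
      const line = raw.replace(/\r$/, ''); // tolerate \r\n line endings
      if (line === '') {
        // A blank line ends the event: join its data lines and parse them as JSON.
        if (this.dataLines.length > 0) {
          values.push(JSON.parse(this.dataLines.join('\n')));
          this.dataLines = [];
        }
        continue;
      }
      if (line.startsWith(':')) continue; // SSE comment, often a keep-alive

      const colon = line.indexOf(':');
      const field = colon === -1 ? line : line.slice(0, colon);
      let value = colon === -1 ? '' : line.slice(colon + 1);
      if (value.startsWith(' ')) value = value.slice(1); // strip one leading space
      if (field === 'data') this.dataLines.push(value);
      // Other fields (event, id, retry) are ignored in this sketch.
    }
    return values;
  }
}
```

In a real reader loop, each chunk from the response body would be decoded to text (for example with `TextDecoder` in streaming mode) before being passed to `push`, so that an event split across chunk boundaries is still parsed once it is complete.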

The transport handles:

  • Backpressure: reads chunks at the pace the browser delivers them
  • Cancellation: aborts the underlying fetch when interrupt() is called or the resource is destroyed
  • Error propagation: network errors and non-2xx responses surface through resource.error()

Transport interface

FetchStreamTransport implements the StreamTransport interface. You can create custom transports (e.g. WebSocket-backed) by implementing the same interface and providing them in place of this class.
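As a rough sketch of what a custom transport might look like, the example below replays canned events from memory, which can be handy in tests. The real StreamTransport interface is not reproduced on this page, so `SketchTransport` is a stand-in inferred from the `stream` signature in the class reference; its `AsyncIterable<unknown>` return type and the `ReplayTransport` class are assumptions, not the library's API.

```typescript
// ASSUMPTION: stand-in interface inferred from the documented stream() parameters.
// The return type is a guess; check the library's actual StreamTransport definition.
interface SketchTransport {
  stream(
    assistantId: string,
    threadId: string | null,
    payload: unknown,
    signal: AbortSignal,
  ): AsyncIterable<unknown>;
}

// Hypothetical in-memory transport that replays a fixed list of events.
class ReplayTransport implements SketchTransport {
  private readonly events: readonly unknown[];

  constructor(events: readonly unknown[]) {
    this.events = events;
  }

  async *stream(
    _assistantId: string,
    _threadId: string | null,
    _payload: unknown,
    signal: AbortSignal,
  ): AsyncGenerator<unknown> {
    for (const event of this.events) {
      // Stop replaying as soon as the caller aborts.
      if (signal.aborted) return;
      yield event;
    }
  }
}
```

A consumer would iterate the result with `for await` and call `abort()` on its `AbortController` to stop early, mirroring how cancellation is described for the fetch-based transport.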

FetchStreamTransport class

Production transport that connects to a LangGraph Platform API via HTTP and SSE. Creates threads automatically if no threadId is provided, and streams events using the LangGraph SDK client.

Parameters

Parameter     Type     Description
apiUrl        string   Base URL of the LangGraph Platform API
onThreadId?   object   Optional callback invoked when a new thread is created

Methods

joinStream(threadId: string, runId: string, lastEventId: string | undefined, signal: AbortSignal)

Join an already-started run without creating a new thread.

Parameter     Type
threadId      string
runId         string
lastEventId   string | undefined
signal        AbortSignal

stream(assistantId: string, threadId: string | null, payload: unknown, signal: AbortSignal)

Open a streaming connection, creating a thread if needed.

Parameter     Type
assistantId   string
threadId      string | null
payload       unknown
signal        AbortSignal

Examples

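import { FetchStreamTransport } from '@cacheplane/stream-resource';

// Point the transport at a local LangGraph Platform API and log newly created thread IDs.
const transport = new FetchStreamTransport(
  'http://localhost:2024',
  (id) => console.log('New thread:', id),
);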