
Subgraphs

Subgraphs let you compose complex agents from smaller, focused units. streamResource() tracks subagent execution through dedicated signals, giving you visibility into delegated work.

Subgraphs vs subagents

LangGraph calls them subgraphs (modular graph composition). Deep Agents calls them subagents (task delegation). streamResource() supports both patterns through the same API.

How subgraph composition works

Subgraph composition starts on the agent side. Each subgraph is a fully compiled StateGraph that can be added as a node in a parent graph.

from langgraph.graph import END, START, MessagesState, StateGraph
from langchain_openai import ChatOpenAI
 
llm = ChatOpenAI(model="gpt-5-mini")
 
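# --- Research subgraph ---
# NOTE: web_search is not defined in this snippet; supply your own
# function that takes a query string and returns the result text.
def search_web(state: MessagesState) -> dict:
    query = state["messages"][-1].content
    results = web_search(query)
    return {"messages": [{"role": "assistant", "content": results}]}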
 
def summarize_results(state: MessagesState) -> dict:
    response = llm.invoke(state["messages"])
    return {"messages": [response]}
 
research_builder = StateGraph(MessagesState)
research_builder.add_node("search", search_web)
research_builder.add_node("summarize", summarize_results)
research_builder.add_edge(START, "search")
research_builder.add_edge("search", "summarize")
research_builder.add_edge("summarize", END)
 
research_subgraph = research_builder.compile()
 
# --- Analysis subgraph ---
def analyze_data(state: MessagesState) -> dict:
    response = llm.invoke([
        {"role": "system", "content": "Analyze the data and provide insights."},
        *state["messages"],
    ])
    return {"messages": [response]}
 
analysis_builder = StateGraph(MessagesState)
analysis_builder.add_node("analyze", analyze_data)
analysis_builder.add_edge(START, "analyze")
analysis_builder.add_edge("analyze", END)
 
analysis_subgraph = analysis_builder.compile()
 
# --- Parent orchestrator ---
def route_task(state: MessagesState) -> str:
    last = state["messages"][-1].content.lower()
    if "research" in last or "search" in last:
        return "research"
    return "analyze"
 
builder = StateGraph(MessagesState)
builder.add_node("research", research_subgraph)
builder.add_node("analyze", analysis_subgraph)
builder.add_conditional_edges(START, route_task)
builder.add_edge("research", END)
builder.add_edge("analyze", END)
 
graph = builder.compile()

Tracking subagent execution

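The subagents() signal contains a Map of subagent streams, keyed by tool call ID, covering both active and completed ones. Use it to inspect the full set of delegated tasks and their current state; activeSubagents() narrows the view to the ones still running.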

const orchestrator = streamResource<OrchestratorState>({
  assistantId: 'orchestrator',
  subagentToolNames: ['research', 'analyze', 'summarize'],
});
 
// All subagent streams (active and completed)
const subagents = computed(() => orchestrator.subagents());
 
// Only active ones
const running = computed(() => orchestrator.activeSubagents());
const runningCount = computed(() => running().length);
 
// React to count changes
effect(() => {
  console.log(`${runningCount()} subagents currently running`);
});

Subagent stream details

Each SubagentStreamRef exposes its own reactive signals (status(), messages(), and error()), so you can surface granular progress in your UI.

// Access a specific subagent by its tool call ID
const researchAgent = computed(() =>
  orchestrator.subagents().get('research-tool-call-id')
);
 
// Track its progress
const researchStatus = computed(() => researchAgent()?.status());
const researchMessages = computed(() => researchAgent()?.messages() ?? []);
const researchError = computed(() => researchAgent()?.error());

Orchestrator pattern

The orchestrator pattern delegates specialised work to subagents and merges their results. Each subagent runs its own graph independently while the parent coordinates.

const pipeline = streamResource<PipelineState>({
  assistantId: 'pipeline-orchestrator',
  subagentToolNames: ['fetch-data', 'transform', 'validate', 'publish'],
  filterSubagentMessages: true,
});
 
// Derive a summary of all subagent states
const pipelineStatus = computed(() => {
  const agents = pipeline.subagents();
  const entries = [...agents.entries()];
 
  return {
    total: entries.length,
    pending: entries.filter(([, a]) => a.status() === 'pending').length,
    running: entries.filter(([, a]) => a.status() === 'loading').length,
    done: entries.filter(([, a]) => a.status() === 'complete').length,
    failed: entries.filter(([, a]) => a.status() === 'error').length,
  };
});

Subagent progress UI

Render live progress for each subagent using the signals above.

import { Component, computed, inject, ChangeDetectionStrategy } from '@angular/core';
 
@Component({
  selector: 'app-subagent-progress',
  templateUrl: './progress-panel.component.html',
  changeDetection: ChangeDetectionStrategy.OnPush,
})
export class SubagentProgressComponent {
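  // OrchestratorService is an app-level service (not shown here) that
  // exposes the streamResource() result as `resource`.
  orchestrator = inject(OrchestratorService).resource;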
 
  subagentEntries = computed(() =>
    [...this.orchestrator.subagents().entries()]
  );
}
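
The template referenced by templateUrl is not shown above. As one possible way to feed it, the sketch below flattens subagent entries into plain rows that a template can loop over. The names toProgressRows, ProgressRow, ProgressSource, and STATUS_LABELS are hypothetical, and the subagents are modelled with plain getter functions rather than real signals; only the four status values come from the orchestrator example.

```typescript
// Status values taken from the orchestrator example; everything else is hypothetical.
type ProgressStatus = 'pending' | 'loading' | 'complete' | 'error';

// Minimal stand-in for a subagent stream: plain getters instead of real signals.
interface ProgressSource {
  status: () => ProgressStatus;
  messages: () => unknown[];
}

interface ProgressRow {
  id: string;
  label: string;
  messageCount: number;
  finished: boolean;
}

// Human-readable label for each status value.
const STATUS_LABELS: Record<ProgressStatus, string> = {
  pending: 'Queued',
  loading: 'Running',
  complete: 'Done',
  error: 'Failed',
};

// Convert [toolCallId, subagent] entries into flat rows for display.
function toProgressRows(entries: Array<[string, ProgressSource]>): ProgressRow[] {
  return entries.map(([id, agent]) => {
    const current = agent.status();
    return {
      id,
      label: STATUS_LABELS[current],
      messageCount: agent.messages().length,
      finished: current === 'complete' || current === 'error',
    };
  });
}

// Example with two fake subagents.
const demoRows = toProgressRows([
  ['call-1', { status: () => 'loading', messages: () => ['searching'] }],
  ['call-2', { status: () => 'complete', messages: () => ['a', 'b'] }],
]);
console.log(demoRows);
```

Inside the component, a helper like this could be wrapped in computed(), for example computed(() => toProgressRows(this.subagentEntries())), so the rows update whenever the underlying signals change.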

Filtering subagent messages

By default, subagent messages appear in the parent's messages() signal. Set filterSubagentMessages: true to filter them out for a cleaner parent view.

const orchestrator = streamResource<OrchestratorState>({
  assistantId: 'orchestrator',
  filterSubagentMessages: true,  // Hide subagent messages from parent
  subagentToolNames: ['research', 'analyze'],
});
 
// Parent messages only (no subagent chatter)
const parentMessages = computed(() => orchestrator.messages());

Subagent tool names

Set subagentToolNames to the tool names that spawn subagents. streamResource() uses this to identify which tool calls create subagent streams.
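
To make the matching idea concrete, here is a minimal sketch of how tool calls could be partitioned by name. It is an illustration only, not streamResource()'s actual implementation: the ToolCall shape, the partitionToolCalls helper, and the get_weather tool name are all hypothetical.

```typescript
// Hypothetical shape of a tool call; the real message types may differ.
interface ToolCall {
  id: string;
  name: string;
}

// Split tool calls into those that would spawn a subagent stream and
// ordinary tool calls, based on a configured list of tool names.
function partitionToolCalls(
  toolCalls: ToolCall[],
  subagentToolNames: string[],
): { subagentCalls: ToolCall[]; regularCalls: ToolCall[] } {
  const configured = new Set(subagentToolNames);
  const subagentCalls: ToolCall[] = [];
  const regularCalls: ToolCall[] = [];
  for (const call of toolCalls) {
    if (configured.has(call.name)) {
      subagentCalls.push(call);
    } else {
      regularCalls.push(call);
    }
  }
  return { subagentCalls, regularCalls };
}

// Example: two of the three calls match the configured names.
const { subagentCalls, regularCalls } = partitionToolCalls(
  [
    { id: 'call-1', name: 'research' },
    { id: 'call-2', name: 'get_weather' },
    { id: 'call-3', name: 'analyze' },
  ],
  ['research', 'analyze'],
);
console.log(subagentCalls.map((c) => c.id));
console.log(regularCalls.map((c) => c.id));
```

In this sketch, a tool name missing from the list simply lands in regularCalls, which is why keeping subagentToolNames in sync with the agent's delegating tools matters.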

Error handling per subagent

Each subagent exposes its own error() signal so failures are isolated — one subagent failing does not stop the others.

// Read subagents() inside the effect so subagents spawned later are
// tracked too; a snapshot taken outside the effect would miss them.
effect(() => {
  for (const [id, agent] of orchestrator.subagents()) {
    const err = agent.error();
    if (err) {
      console.error(`Subagent ${id} failed:`, err.message);
      // Retry, surface to user, or fall back gracefully
    }
  }
});
 
// Collect all failed subagents reactively
const failedAgents = computed(() =>
  [...orchestrator.subagents().entries()].filter(
    ([, agent]) => agent.status() === 'error'
  )
);

Partial failures

Always check failedAgents() before presenting final results. A completed orchestrator can still have subagents that errored — success at the top level does not guarantee all delegates succeeded.
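
One way to apply this check is sketched below. It is a simplified illustration under stated assumptions: SubagentLike stands in for a subagent stream using a plain getter instead of a real signal, and classifyRun and RunOutcome are hypothetical names, not part of the library. The status values are the ones used in the orchestrator example.

```typescript
type SubagentStatus = 'pending' | 'loading' | 'complete' | 'error';

// Hypothetical stand-in for a subagent stream (plain getter, not a signal).
interface SubagentLike {
  status: () => SubagentStatus;
}

interface RunOutcome {
  kind: 'success' | 'partial' | 'incomplete';
  failedIds: string[];
}

// Classify a run: 'success' only when every subagent completed,
// 'partial' when all finished but some errored,
// 'incomplete' while any subagent is still pending or loading.
function classifyRun(subagents: Map<string, SubagentLike>): RunOutcome {
  const entries = Array.from(subagents.entries());
  const failedIds = entries
    .filter(([, agent]) => agent.status() === 'error')
    .map(([id]) => id);
  const unfinished = entries.some(([, agent]) => {
    const current = agent.status();
    return current === 'pending' || current === 'loading';
  });
  if (unfinished) {
    return { kind: 'incomplete', failedIds };
  }
  return { kind: failedIds.length > 0 ? 'partial' : 'success', failedIds };
}

// Example: one subagent completed, one errored.
const demoRun = new Map<string, SubagentLike>([
  ['call-1', { status: () => 'complete' }],
  ['call-2', { status: () => 'error' }],
]);
const outcome = classifyRun(demoRun);
console.log(outcome.kind, outcome.failedIds);
```

A UI could use the 'partial' outcome to show the results that did arrive alongside a notice listing the failed tool call IDs, rather than presenting the run as fully successful.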

When to use subagents vs a single agent

Choosing your architecture

Use subagents when tasks are independent and can run in parallel, when each task needs its own context window, or when you want isolated error boundaries. Use a single agent for sequential reasoning, tasks that share tightly coupled state, or when latency from spawning subagents outweighs the parallelism benefit.
