Subgraphs
Subgraphs let you compose complex agents from smaller, focused units. streamResource() tracks subagent execution through dedicated signals, giving you visibility into delegated work.
LangGraph calls them subgraphs (modular graph composition). Deep Agents calls them subagents (task delegation). streamResource() supports both patterns through the same API.
How subgraph composition works
Subgraph composition starts on the agent side. Each subgraph is a fully compiled StateGraph that can be added as a node in a parent graph.
from langgraph.graph import END, START, MessagesState, StateGraph
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-5-mini")
# --- Research subgraph ---
def search_web(state: MessagesState) -> dict:
query = state["messages"][-1].content
results = web_search(query)
return {"messages": [{"role": "assistant", "content": results}]}
def summarize_results(state: MessagesState) -> dict:
response = llm.invoke(state["messages"])
return {"messages": [response]}
research_builder = StateGraph(MessagesState)
research_builder.add_node("search", search_web)
research_builder.add_node("summarize", summarize_results)
research_builder.add_edge(START, "search")
research_builder.add_edge("search", "summarize")
research_builder.add_edge("summarize", END)
research_subgraph = research_builder.compile()
# --- Analysis subgraph ---
def analyze_data(state: MessagesState) -> dict:
response = llm.invoke([
{"role": "system", "content": "Analyze the data and provide insights."},
*state["messages"],
])
return {"messages": [response]}
analysis_builder = StateGraph(MessagesState)
analysis_builder.add_node("analyze", analyze_data)
analysis_builder.add_edge(START, "analyze")
analysis_builder.add_edge("analyze", END)
analysis_subgraph = analysis_builder.compile()
# --- Parent orchestrator ---
def route_task(state: MessagesState) -> str:
last = state["messages"][-1].content.lower()
if "research" in last or "search" in last:
return "research"
return "analyze"
builder = StateGraph(MessagesState)
builder.add_node("research", research_subgraph)
builder.add_node("analyze", analysis_subgraph)
builder.add_conditional_edges(START, route_task)
builder.add_edge("research", END)
builder.add_edge("analyze", END)
graph = builder.compile()Tracking subagent execution
The subagents() signal contains a Map of active subagent streams. Use it to inspect the full set of delegated tasks and their current state.
const orchestrator = streamResource<OrchestratorState>({
assistantId: 'orchestrator',
subagentToolNames: ['research', 'analyze', 'summarize'],
});
// All subagent streams (active and completed)
const subagents = computed(() => orchestrator.subagents());
// Only active ones
const running = computed(() => orchestrator.activeSubagents());
const runningCount = computed(() => running().length);
// React to count changes
effect(() => {
console.log(`${runningCount()} subagents currently running`);
});Subagent stream details
Each SubagentStreamRef exposes its own reactive signals — status, messages, and errors — so you can surface granular progress in your UI.
// Access a specific subagent by its tool call ID
const researchAgent = computed(() =>
orchestrator.subagents().get('research-tool-call-id')
);
// Track its progress
const researchStatus = computed(() => researchAgent()?.status());
const researchMessages = computed(() => researchAgent()?.messages() ?? []);
const researchError = computed(() => researchAgent()?.error());Orchestrator pattern
The orchestrator pattern delegates specialised work to subagents and merges their results. Each subagent runs its own graph independently while the parent coordinates.
const pipeline = streamResource<PipelineState>({
assistantId: 'pipeline-orchestrator',
subagentToolNames: ['fetch-data', 'transform', 'validate', 'publish'],
filterSubagentMessages: true,
});
// Derive a summary of all subagent states
const pipelineStatus = computed(() => {
const agents = pipeline.subagents();
const entries = [...agents.entries()];
return {
total: entries.length,
pending: entries.filter(([, a]) => a.status() === 'pending').length,
running: entries.filter(([, a]) => a.status() === 'loading').length,
done: entries.filter(([, a]) => a.status() === 'complete').length,
failed: entries.filter(([, a]) => a.status() === 'error').length,
};
});Subagent progress UI
Render live progress for each subagent using the signals above.
import { Component, computed, inject, ChangeDetectionStrategy } from '@angular/core';
@Component({
selector: 'app-subagent-progress',
templateUrl: './progress-panel.component.html',
changeDetection: ChangeDetectionStrategy.OnPush,
})
export class SubagentProgressComponent {
orchestrator = inject(OrchestratorService).resource;
subagentEntries = computed(() =>
[...this.orchestrator.subagents().entries()]
);
}Filtering subagent messages
By default, subagent messages appear in the parent's messages() signal. Filter them out for a cleaner parent view.
const orchestrator = streamResource<OrchestratorState>({
assistantId: 'orchestrator',
filterSubagentMessages: true, // Hide subagent messages from parent
subagentToolNames: ['research', 'analyze'],
});
// Parent messages only (no subagent chatter)
const parentMessages = computed(() => orchestrator.messages());Set subagentToolNames to the tool names that spawn subagents. streamResource() uses this to identify which tool calls create subagent streams.
Error handling per subagent
Each subagent exposes its own error() signal so failures are isolated — one subagent failing does not stop the others.
const agents = orchestrator.subagents();
for (const [id, agent] of agents) {
effect(() => {
const err = agent.error();
if (err) {
console.error(`Subagent ${id} failed:`, err.message);
// Retry, surface to user, or fall back gracefully
}
});
}
// Collect all failed subagents reactively
const failedAgents = computed(() =>
[...orchestrator.subagents().entries()].filter(
([, agent]) => agent.status() === 'error'
)
);Always check failedAgents() before presenting final results. A completed orchestrator can still have subagents that errored — success at the top level does not guarantee all delegates succeeded.
When to use subagents vs a single agent
Use subagents when tasks are independent and can run in parallel, when each task needs its own context window, or when you want isolated error boundaries. Use a single agent for sequential reasoning, tasks that share tightly coupled state, or when latency from spawning subagents outweighs the parallelism benefit.
What's Next
Understand how streamResource() surfaces tokens, status, and errors in real time.
Inspect earlier states and replay alternate execution paths with checkpoint history.
Write unit and integration tests for orchestrator graphs and subagent interactions.
Full reference for streamResource() options, signals, and subagent configuration.