Interrupts
Interrupts let your LangGraph agent pause mid-execution and hand control to a human. The agent proposes an action, the graph freezes, your Angular UI shows an approval dialog, the user decides, and the agent resumes with the human's decision. streamResource() surfaces interrupts as Angular Signals, so building approval flows, confirmation dialogs, and multi-step review experiences requires no manual event wiring.
Use interrupts when an agent action is irreversible (sending an email, placing an order, deleting data), when the agent needs a human decision it cannot make on its own, or when compliance requires explicit approval before execution.
The Interrupt Lifecycle
Before diving into code, understand the five-stage lifecycle that every interrupt follows:
1. Propose. The agent reasons about the user's request and determines an action that requires human approval. It builds a structured payload describing what it wants to do.
2. Pause. The agent node calls interrupt({...}), which freezes the graph. The interrupt payload is persisted in the checkpoint and streamed to the client.
3. Render. streamResource() updates the interrupt() signal. Your Angular template detects the change through OnPush change detection and renders an approval dialog with the interrupt payload.
4. Decide. The user reviews the proposed action and clicks Approve or Reject. Your component calls agent.submit() with a resume payload containing the decision.
5. Resume. LangGraph resumes the graph from the interrupted checkpoint. The interrupt() call returns the human's decision, and the graph either executes or aborts the action.
Python: Raising an Interrupt
An interrupt is raised inside any graph node by calling interrupt({...}) from langgraph.types. The argument can be any JSON-serializable object. It becomes the payload your Angular component displays, and on resume the same interrupt() call returns the human's response.
from operator import add

from langchain_openai import ChatOpenAI
from langgraph.graph import END, START, StateGraph
from langgraph.types import interrupt
from typing_extensions import Annotated, TypedDict

llm = ChatOpenAI(model="gpt-5-mini")

class State(TypedDict):
    messages: Annotated[list, add]
    proposed_action: dict
    approval_result: dict

def plan_action(state: State) -> dict:
    """Agent analyzes the request and proposes an action."""
    response = llm.invoke([
        {"role": "system", "content": (
            "Analyze the user's request. If it requires sending "
            "an email, modifying data, or any irreversible action, "
            "return a JSON action plan with keys: action, target, "
            "description, risk_level."
        )},
        *state["messages"],
    ])
    action = parse_json(response.content)  # your JSON-extraction helper
    return {
        "proposed_action": action,
        "messages": [response],
    }

def request_approval(state: State) -> dict:
    """Pause the graph and ask the human for approval."""
    action = state["proposed_action"]
    # interrupt() freezes the graph here; when the client resumes,
    # it returns the resume payload (e.g. {"approved": True})
    decision = interrupt({
        "action": action["action"],
        "target": action["target"],
        "description": action["description"],
        "risk_level": action.get("risk_level", "medium"),
    })
    return {"approval_result": decision}

def execute_action(state: State) -> dict:
    """Run the approved action or explain the rejection."""
    result = state.get("approval_result", {})
    if result.get("approved"):
        # Execute the real action
        outcome = perform_action(state["proposed_action"])  # your side-effecting helper
        return {
            "messages": [{"role": "assistant", "content": (
                f"Done. {outcome}"
            )}]
        }
    reason = result.get("reason", "No reason given")
    return {
        "messages": [{"role": "assistant", "content": (
            f"Action cancelled. Reason: {reason}"
        )}]
    }

# Build the graph: plan → approve → execute
builder = StateGraph(State)
builder.add_node("plan", plan_action)
builder.add_node("approve", request_approval)
builder.add_node("execute", execute_action)
builder.add_edge(START, "plan")
builder.add_edge("plan", "approve")
builder.add_edge("approve", "execute")
builder.add_edge("execute", END)
graph = builder.compile()

Place the interrupt() call in its own dedicated node. This gives you a clean three-node pattern (plan, approve, execute) where the interrupt sits between reasoning and action. On resume, LangGraph re-executes the interrupted node from the top, so any work that precedes the interrupt() call inside that node runs twice; keep expensive or side-effecting work out of the node that interrupts.
Angular: Building an Approval Component
When the agent raises an interrupt, streamResource() populates the interrupt() signal with the interrupt payload. Your component reads this signal to render a dialog and calls submit() to resume.
import {
  Component,
  computed,
  signal,
  ChangeDetectionStrategy,
} from '@angular/core';
import { streamResource, BaseMessage } from '@cacheplane/stream-resource';

interface ApprovalPayload {
  action: string;
  target: string;
  description: string;
  risk_level: 'low' | 'medium' | 'high';
}

interface AgentState {
  messages: BaseMessage[];
  proposed_action: ApprovalPayload;
  approval_result: { approved: boolean; reason?: string };
}

@Component({
  selector: 'app-approval',
  templateUrl: './approval.component.html',
  changeDetection: ChangeDetectionStrategy.OnPush,
})
export class ApprovalComponent {
  agent = streamResource<AgentState>({
    assistantId: 'approval_agent',
  });

  messages = computed(() => this.agent.messages());
  pendingApproval = computed(() => this.agent.interrupt());
  isLoading = computed(() => this.agent.isLoading());
  rejectionReason = signal('');

  riskClass = computed(() => {
    const interrupt = this.pendingApproval();
    if (!interrupt) return '';
    const level = interrupt.value?.risk_level ?? 'medium';
    return `risk-${level}`;
  });

  send(input: string) {
    this.agent.submit({
      messages: [{ role: 'user', content: input }],
    });
  }

  approve() {
    this.agent.submit(null, {
      resume: { approved: true },
    });
  }

  reject() {
    this.agent.submit(null, {
      resume: {
        approved: false,
        reason: this.rejectionReason() || 'User rejected',
      },
    });
    this.rejectionReason.set('');
  }
}

Multi-Step Approval Pattern
Some workflows require multiple approvals in sequence. For example, an agent that plans a multi-step deployment might need approval at each stage. Each node in the graph can raise its own interrupt.
from operator import add

from langgraph.graph import END, START, StateGraph
from langgraph.types import interrupt
from typing_extensions import Annotated, TypedDict

class DeployState(TypedDict):
    messages: Annotated[list, add]
    plan: list[dict]
    current_step: int
    completed_steps: Annotated[list[str], add]

def create_plan(state: DeployState) -> dict:
    """Generate a multi-step deployment plan."""
    plan = [
        {"step": "backup", "description": "Back up current database"},
        {"step": "migrate", "description": "Run schema migrations"},
        {"step": "deploy", "description": "Deploy new application version"},
    ]
    return {"plan": plan, "current_step": 0}

def approve_step(state: DeployState) -> dict:
    """Interrupt for each step that needs approval."""
    step_index = state["current_step"]
    step = state["plan"][step_index]
    # Pauses here on every pass through the loop; inspect the return
    # value of interrupt() if you need to act on the decision
    interrupt({
        "step_number": step_index + 1,
        "total_steps": len(state["plan"]),
        "step": step["step"],
        "description": step["description"],
        "completed": state.get("completed_steps", []),
    })
    return {}

def execute_step(state: DeployState) -> dict:
    """Execute the approved step and advance."""
    step = state["plan"][state["current_step"]]
    # ... perform the actual deployment step ...
    return {
        "completed_steps": [step["step"]],  # appended via the add reducer
        "current_step": state["current_step"] + 1,
        "messages": [{"role": "assistant", "content": (
            f"Completed: {step['description']}"
        )}],
    }

def should_continue(state: DeployState) -> str:
    if state["current_step"] < len(state["plan"]):
        return "approve_step"
    return END

builder = StateGraph(DeployState)
builder.add_node("create_plan", create_plan)
builder.add_node("approve_step", approve_step)
builder.add_node("execute_step", execute_step)
builder.add_edge(START, "create_plan")
builder.add_edge("create_plan", "approve_step")
builder.add_edge("approve_step", "execute_step")
builder.add_conditional_edges("execute_step", should_continue)
graph = builder.compile()

Typed Interrupt Payloads with BagTemplate
By default, interrupt() returns an untyped object. The BagTemplate generic parameter on streamResource() lets you define the exact shape of your interrupt payloads, giving you full TypeScript safety throughout your component.
BagTemplate is a type parameter on the streamResource configuration that maps signal names to their types. When you specify an interrupt type through BagTemplate, the interrupt() signal returns a properly typed object instead of unknown. This means your template expressions, computed signals, and event handlers all benefit from compile-time checking.
import { streamResource, BagTemplate } from '@cacheplane/stream-resource';

// Define the exact shape of your interrupt payload
interface DeployApproval {
  step_number: number;
  total_steps: number;
  step: string;
  description: string;
  completed: string[];
}

// Pass the interrupt type via BagTemplate
// (DeployState mirrors the Python DeployState schema, defined elsewhere)
const agent = streamResource<
  DeployState,
  BagTemplate<{ interrupt: DeployApproval }>
>({
  assistantId: 'deploy_agent',
});

// Now interrupt() is typed — no casting needed
const step = agent.interrupt();
//    ^? { value: DeployApproval } | null

// TypeScript catches errors at compile time
const num = step?.value.step_number; // number — correct
const bad = step?.value.nonexistent; // Error — property doesn't exist

Define your interrupt payload interfaces alongside your Python state schema. This creates a contract between your agent and your UI. When the Python payload shape changes, the TypeScript interface should change too. Consider generating types from a shared schema to keep them in sync.
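One way to keep the two sides in sync, sketched here with Pydantic on the assumption that you are free to define the payload as a model, is to export a JSON Schema from the Python side and feed it to a TypeScript generator such as json-schema-to-typescript:

```python
import json

from pydantic import BaseModel

class DeployApproval(BaseModel):
    """Single source of truth for the interrupt payload shape."""
    step_number: int
    total_steps: int
    step: str
    description: str
    completed: list[str]

# Emit JSON Schema; a codegen step in your build can turn this
# into the TypeScript DeployApproval interface
schema = DeployApproval.model_json_schema()
print(json.dumps(schema["properties"], indent=2))
```

Run this as part of your build so a payload change on the Python side fails TypeScript compilation instead of surfacing as a runtime undefined in the dialog.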
Timeout Handling
Interrupts pause graph execution indefinitely by default — the agent waits until a human responds. In production, you often need to handle cases where no one responds within a reasonable time. There are two strategies for managing interrupt timeouts.
Server-side timeout with a background task: Schedule a background job that checks for stale interrupts and resumes them with a default decision.
from datetime import datetime, timezone

from langgraph_sdk import get_client

client = get_client(url="http://localhost:2024")  # your deployment URL

async def check_stale_interrupts():
    """Periodic task to auto-reject stale interrupts."""
    threads = await client.threads.search(
        status="interrupted",
        metadata={"interrupt_type": "approval"},
    )
    for thread in threads:
        updated = datetime.fromisoformat(thread["updated_at"])
        age = (datetime.now(timezone.utc) - updated).total_seconds()
        if age > 3600:  # 1 hour timeout
            await client.runs.create(
                thread["thread_id"],
                assistant_id="approval_agent",
                input=None,
                command={"resume": {
                    "approved": False,
                    "reason": "Auto-rejected: approval timeout",
                }},
            )

Client-side timeout in Angular: Use a timer in your component to auto-reject if the user does not act.
import { effect } from '@angular/core';
import { timer } from 'rxjs';

// Watch for interrupts and start a timeout (must run in an
// injection context, e.g. the component constructor)
effect((onCleanup) => {
  const interrupt = this.agent.interrupt();
  if (interrupt) {
    const sub = timer(5 * 60 * 1000).subscribe(() => {
      // Auto-reject after 5 minutes of inaction
      this.agent.submit(null, {
        resume: { approved: false, reason: 'Approval timeout' },
      });
    });
    // Clean up if the user responds before the timeout
    onCleanup(() => sub.unsubscribe());
  }
});

Avoid running both server-side and client-side timeouts simultaneously. If both fire, the second resume call will fail because the graph has already moved past the interrupt. Choose server-side timeouts for reliability (they work even if the browser closes) or client-side timeouts for immediacy.
Because interrupts are checkpointed, the user can close their browser, come back hours later, and still approve or reject the pending action. The graph state is frozen in the checkpoint store, not in browser memory.