Guide · Agentic Frameworks

MCP server LangGraph integration

LangGraph builds stateful, long-running agent workflows on top of LangChain by giving you explicit control over graph topology — nodes that process state, edges that route between nodes, and checkpointers that persist state across interruptions. MCP servers are the natural tool layer for LangGraph: they implement the external capabilities that graph nodes call, while LangGraph handles the orchestration, memory, and decision flow. The integration works through the same langchain-mcp-adapters client used for standard LangChain agents, but LangGraph adds important production considerations around connection lifecycle across checkpoints, parallel tool calls, and error recovery as a first-class graph structure.

TL;DR

Use MultiServerMCPClient.get_tools() to load MCP tools, then pass them to LangGraph's ToolNode or create_react_agent. Initialize the MCP client once and reuse it across graph invocations — MCP connections do not persist across process restarts, so reconnect when resuming from a checkpoint. Use StateGraph with conditional edges from the tool node to handle MCP errors as graph-level routing decisions rather than exceptions. Monitor MCP servers with AliveMCP: a dead server mid-graph wastes the token cost of every step that ran successfully before the failure.

LangGraph vs LangChain for agent workflows

LangChain's AgentExecutor and create_react_agent implement a fixed ReAct loop: think → act → observe → repeat until done. LangGraph lets you define arbitrary graph topologies: nodes for different processing steps, conditional edges that route based on state, parallel branches that execute simultaneously, interrupt points for human-in-the-loop review, and persistent state that survives process restarts.

The practical difference becomes visible when tool results affect which tool to call next, when some steps should run in parallel, or when you need to pause and wait for human approval. LangGraph expresses all of these as graph structure; LangChain's AgentExecutor delegates them to the LLM on every loop iteration.

CapabilityLangChain AgentExecutorLangGraph StateGraph
Tool callingFixed ReAct loopToolNode in graph topology
Parallel tool callsLimitedParallel branches in graph
State persistenceConversationMemory (in-memory)Checkpointer (SQLite, Redis, Postgres)
Error recoveryException + retryConditional edge to error node
Human-in-the-loopNot built-ininterrupt_before/after
Multi-agentAgent-as-tool patternSubgraph composition

Basic setup: MCP tools in a LangGraph agent

The fastest path is create_react_agent from langgraph.prebuilt, which builds a standard ReAct StateGraph with a ToolNode wired in:

import asyncio
import os
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_anthropic import ChatAnthropic
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import MemorySaver

async def main():
    async with MultiServerMCPClient({
        "search": {
            "transport": "streamable_http",
            "url": "https://search.internal/mcp",
            "headers": {"Authorization": f"Bearer {os.environ['SEARCH_TOKEN']}"},
        },
    }) as client:
        tools = await client.get_tools()
        model = ChatAnthropic(model="claude-sonnet-4-6")
        checkpointer = MemorySaver()  # Use SqliteSaver in production

        agent = create_react_agent(model, tools, checkpointer=checkpointer)

        config = {"configurable": {"thread_id": "user-session-123"}}
        result = await agent.ainvoke(
            {"messages": [("user", "Find recent papers on MCP protocol security")]},
            config=config,
        )
        print(result["messages"][-1].content)

asyncio.run(main())

The thread_id in the config is the checkpointer key — all messages and state for this conversation are stored under that key. Subsequent invocations with the same thread_id continue the conversation from where it left off.

Custom StateGraph with ToolNode

For more control over the graph topology, define your own StateGraph with an explicit ToolNode:

from typing import Annotated
from typing_extensions import TypedDict
from langchain_core.messages import BaseMessage
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode

class AgentState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    tool_error_count: int  # track consecutive tool failures

def should_continue(state: AgentState) -> str:
    last_message = state["messages"][-1]
    if not last_message.tool_calls:
        return "end"
    if state["tool_error_count"] >= 3:
        return "error_handler"  # route to error node after 3 failures
    return "tools"

async def call_model(state: AgentState):
    response = await model.ainvoke(state["messages"])
    return {"messages": [response]}

async def handle_error(state: AgentState):
    return {"messages": [("assistant", "I encountered repeated tool failures. Please check the service status and try again.")]}

# Build graph
builder = StateGraph(AgentState)
builder.add_node("agent", call_model)
builder.add_node("tools", ToolNode(mcp_tools))
builder.add_node("error_handler", handle_error)

builder.set_entry_point("agent")
builder.add_conditional_edges("agent", should_continue, {
    "tools": "tools",
    "end": END,
    "error_handler": "error_handler",
})
builder.add_edge("tools", "agent")
builder.add_edge("error_handler", END)

graph = builder.compile(checkpointer=checkpointer)

The tool_error_count field in state accumulates across tool node executions. The routing function reads it to divert to the error handler after three consecutive failures — preventing an infinite retry loop on a broken MCP server.

Checkpoint persistence and MCP reconnection

LangGraph checkpointers persist graph state (messages, custom state fields) across process restarts. MCP connections are not persisted — they live only for the duration of the MultiServerMCPClient context manager. When resuming a long-running graph after a process restart (common in serverless and scheduled job environments), you must reconnect to the MCP server before the graph continues.

from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

async def resume_agent(thread_id: str, new_message: str):
    # Re-establish MCP connection on every resume — connections don't persist
    async with MultiServerMCPClient({
        "search": {"transport": "streamable_http", "url": "https://search.internal/mcp",
                   "headers": {"Authorization": f"Bearer {os.environ['SEARCH_TOKEN']}"}},
    }) as client:
        tools = await client.get_tools()
        model = ChatAnthropic(model="claude-sonnet-4-6")

        async with AsyncSqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
            agent = create_react_agent(model, tools, checkpointer=checkpointer)
            config = {"configurable": {"thread_id": thread_id}}
            # LangGraph loads existing state from checkpointer automatically
            result = await agent.ainvoke(
                {"messages": [("user", new_message)]},
                config=config,
            )
    return result["messages"][-1].content

The checkpoint stores conversation history and custom state; it does not need to store the MCP connection because connections are stateless at the protocol level — a new connection starts with initialize and picks up exactly where any prior session left off from the tool's perspective.

Parallel tool calls with multiple MCP servers

LangGraph's ToolNode executes tool calls in parallel when the LLM returns multiple tool calls in a single response. If those tool calls go to different MCP servers, the calls execute simultaneously, reducing total latency:

async with MultiServerMCPClient({
    "search": {"transport": "streamable_http", "url": "https://search.internal/mcp",
               "headers": {"Authorization": f"Bearer {os.environ['SEARCH_TOKEN']}"}},
    "database": {"transport": "streamable_http", "url": "https://db.internal/mcp",
                 "headers": {"Authorization": f"Bearer {os.environ['DB_TOKEN']}"}},
}) as client:
    tools = await client.get_tools()  # tools from both servers, all available to ToolNode

When the LLM decides to call search_papers (from the search server) and get_citations (from the database server) simultaneously, ToolNode sends both calls in parallel via asyncio.gather. The total latency is max(search_latency, db_latency) rather than their sum. This parallel execution is a key advantage of the LangGraph + multi-server MCP architecture — monitor each server independently with separate uptime probes so a failure in one is distinguishable from a failure in the other.

Human-in-the-loop interrupts and MCP elicitation

LangGraph's interrupt_before pauses graph execution before a specified node, waiting for human input before resuming. MCP's elicitation pauses execution within a single tool call to collect additional user input. These are complementary — LangGraph interrupts operate at the graph orchestration level; MCP elicitation operates at the individual tool level:

MechanismScopeWhen to use
LangGraph interrupt_beforeEntire graph executionApprove the agent's plan before it calls any tools
LangGraph interrupt_afterEntire graph executionReview tool results before the agent continues reasoning
MCP elicitationSingle tool handlerCollect required parameters during tool execution (e.g. confirmation for destructive actions)

Both mechanisms depend on the infrastructure being available when the human resumes. Configure MCP session TTL to outlast your expected human review window, and use AliveMCP to verify the server is still up when the workflow resumes after the interrupt.

Monitoring MCP servers in long-running LangGraph workflows

LangGraph workflows are often long-running: a research agent might run for 10–30 minutes, executing dozens of tool calls across multiple MCP servers. A server that goes down at step 15 of a 20-step workflow wastes the token cost of all prior steps. For multi-step workflows with non-trivial cost, verify MCP server health before starting the graph and set up continuous monitoring so you are alerted immediately when a server fails:

async def run_research_workflow(topic: str, thread_id: str):
    # Health check before starting an expensive workflow
    async with httpx.AsyncClient(timeout=5.0) as http:
        for server_url, token_env in [
            ("https://search.internal/mcp", "SEARCH_TOKEN"),
            ("https://db.internal/mcp", "DB_TOKEN"),
        ]:
            resp = await http.post(server_url, json={
                "jsonrpc": "2.0", "method": "initialize",
                "params": {"protocolVersion": "2025-03-26", "capabilities": {},
                           "clientInfo": {"name": "preflight", "version": "1"}}, "id": 1,
            }, headers={"Authorization": f"Bearer {os.environ[token_env]}"})
            if resp.status_code != 200:
                raise RuntimeError(f"MCP server {server_url} is unavailable — aborting workflow")

AliveMCP runs these probes for you continuously, outside your application code. When it detects a server failure, it alerts before your workflow starts — not after 20 expensive tool calls reveal the problem at step 21.

Frequently asked questions

Should I use create_react_agent or build a custom StateGraph for MCP tools?

Start with create_react_agent — it handles the common ReAct loop correctly and supports checkpointing. Move to a custom StateGraph when you need: (1) routing based on tool call results (e.g., "if the search tool returned no results, try the database tool"); (2) parallel branches that each call different MCP servers; (3) error recovery as explicit graph nodes rather than exceptions; or (4) human-in-the-loop interrupts at specific points in the workflow.

How does LangGraph handle MCP tool errors in ToolNode?

ToolNode catches exceptions raised by MCP tool calls (including ToolException from isError: true responses) and converts them to ToolMessage objects with the error content. These error messages are added to the graph state and the LLM sees them on the next agent node invocation. The LLM can then decide to retry with different arguments, use a different tool, or report the failure to the user. Use conditional edges to detect repeated failures and route to a dedicated error handler node.

Can I use LangGraph's streaming with MCP progress notifications?

Yes, partially. graph.astream_events() surfaces tool start/end events. MCP progress notifications arrive as SSE frames on the underlying connection during the tool execution window but are not emitted as distinct LangGraph events. To surface real-time progress in your UI, implement a progress callback that writes to a shared queue or WebSocket alongside the graph stream.

How do I share state between the LangGraph graph and MCP tool handlers?

MCP tool handlers should not read from or write to LangGraph state directly — MCP is a protocol layer that knows nothing about the orchestrating framework. Pass the information a tool handler needs as tool arguments. Pass results from tool handlers back as tool output text. LangGraph state is for the agent's internal bookkeeping; MCP is for the tool implementation. For identity and auth context that should not be in tool arguments, use MCP context propagation at the session level.

What checkpointer should I use in production with MCP tools?

Use AsyncSqliteSaver for single-instance deployments or PostgresSaver / RedisSaver for multi-instance production deployments where multiple workers might resume the same graph. The checkpointer stores graph state, not MCP connections — connections are always re-established on resume. For serverless environments (AWS Lambda, Cloud Run), use PostgresSaver since each invocation may use a different process instance.

Further reading

Know when your MCP server is down — before users do

AliveMCP probes your server's MCP endpoint every minute, detects protocol errors and transport failures, and pages you before users notice.

Start monitoring free