MCP server LlamaIndex integration
LlamaIndex is one of the most widely used Python frameworks for building RAG pipelines and agentic workflows. MCP fills the tool-calling half of that picture: it gives LlamaIndex agents a standardized way to reach live, external capabilities (databases, search engines, code runners, APIs) without hard-coding function wrappers for each one. The llama-index-tools-mcp package provides MCPToolSpec, which translates a connected MCP server's tool list into LlamaIndex BaseTool objects that any LlamaIndex agent can call directly. The critical decisions are connection lifecycle (one persistent client, not one per call), error propagation (return structured results, not bare exceptions), and monitoring of the remote MCP servers that your LlamaIndex pipeline depends on for correctness and uptime.
TL;DR
Install llama-index-tools-mcp and mcp. Create a ClientSession to the MCP server inside an async context manager, then pass it to MCPToolSpec(session=session) and call await spec.to_tool_list_async() to get LlamaIndex AsyncTool objects. Pass those to FunctionCallingAgent.from_tools(tools, llm=llm) or ReActAgent.from_tools(tools, llm=llm). Keep the ClientSession alive for the entire agent run — do not reconnect per call. Catch ToolMetadataException and MCP McpError at the agent boundary. Monitor your MCP endpoint with AliveMCP — a dead server mid-pipeline wastes all preceding LLM tokens.
LlamaIndex agent architecture
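LlamaIndex provides two primary agent patterns relevant to MCP integration, FunctionCallingAgent and ReActAgent; the table also lists AgentWorkflow and QueryEngineTool, which sit alongside them: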
| Agent type | How it calls tools | Best fit |
|---|---|---|
| FunctionCallingAgent | Uses the LLM's native tool-calling API (function_call / tool_use) | Fast, structured output; works with OpenAI, Anthropic, Gemini, Mistral |
| ReActAgent | ReAct loop: Thought → Action → Observation text | Any LLM; verbose reasoning trace; better for debugging |
| AgentWorkflow | State machine with explicit steps; tools bound per step | Complex multi-step pipelines where step order matters |
| QueryEngineTool | Wraps a QueryEngine as an agent tool | Hybrid: agent orchestrates both RAG retrieval and MCP tool calls |
MCP tools integrate at the BaseTool level — they slot into any of these agent types without framework changes.
Connecting to an MCP server with MCPToolSpec
The mcp Python SDK provides transport-level connection primitives. MCPToolSpec wraps a connected session and handles the translation from MCP tool schemas to LlamaIndex ToolMetadata and AsyncTool objects:
import asyncio
import os

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.client.sse import sse_client
from llama_index.tools.mcp import MCPToolSpec
from llama_index.core.agent import FunctionCallingAgent
from llama_index.llms.anthropic import Anthropic


# --- HTTP/SSE transport (production MCP servers) ---
async def build_agent_sse():
    async with sse_client(
        url="https://search.internal/sse",
        headers={"Authorization": f"Bearer {os.environ['SEARCH_TOKEN']}"},
    ) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            spec = MCPToolSpec(session=session)
            tools = await spec.to_tool_list_async()
            llm = Anthropic(model="claude-sonnet-4-6", api_key=os.environ["ANTHROPIC_API_KEY"])
            agent = FunctionCallingAgent.from_tools(
                tools=tools,
                llm=llm,
                verbose=True,
                system_prompt="You are a research assistant with access to a live search MCP server.",
            )
            # Run the query inside the session context so tool calls reach the live server.
            response = await agent.aquery("What are the latest papers on MCP server security?")
            return str(response)


# --- Stdio transport (local MCP servers, development) ---
async def build_agent_stdio():
    server_params = StdioServerParameters(
        command="node",
        args=["./my-mcp-server/build/index.js"],
        env={"DATA_DIR": "/data/corpus"},
    )
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            spec = MCPToolSpec(session=session)
            tools = await spec.to_tool_list_async()
            # ... same agent setup
The ClientSession is the lifecycle boundary. All tool calls that happen while the agent is running must occur within the same session context. to_tool_list_async() calls tools/list on the MCP server once at startup; the returned tool objects carry the discovered schemas and delegate actual invocation back to the live session.
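To make the schema translation concrete, the following is a minimal, stdlib-only sketch of what "reading a tools/list result" involves. It is not the package's implementation: `ToolInfo` and `tools_from_list_result` are hypothetical names introduced here, and the sample payload is invented for illustration. Only the payload shape (`name`, `description`, `inputSchema` as JSON Schema) follows the MCP tools/list format.

```python
from dataclasses import dataclass, field


@dataclass
class ToolInfo:
    """Hypothetical, simplified stand-in for a framework-side tool description."""
    name: str
    description: str
    required: list = field(default_factory=list)
    optional: list = field(default_factory=list)


def tools_from_list_result(result: dict) -> list:
    """Convert a tools/list result payload into ToolInfo records."""
    infos = []
    for tool in result.get("tools", []):
        schema = tool.get("inputSchema", {})
        properties = schema.get("properties", {})
        required = list(schema.get("required", []))
        # Any declared property that is not required is treated as optional.
        optional = [name for name in properties if name not in required]
        infos.append(ToolInfo(tool["name"], tool.get("description", ""), required, optional))
    return infos


# Invented sample payload in the tools/list result shape.
sample_result = {
    "tools": [
        {
            "name": "search_papers",
            "description": "Search an index of papers.",
            "inputSchema": {
                "type": "object",
                "properties": {"query": {"type": "string"}, "limit": {"type": "integer"}},
                "required": ["query"],
            },
        }
    ]
}

infos = tools_from_list_result(sample_result)
```

Because discovery happens once, a tool added to the server after startup will not appear in the agent's tool list until the list is fetched again.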
Persistent connection for multi-turn agents
LlamaIndex agents may make many tool calls before returning a final response — a ReActAgent might execute 10–20 tool calls over several reasoning steps. Creating a new ClientSession for each call adds 50–200 ms of connection overhead per call and multiplies load on the MCP server. The correct pattern is a single session for the entire agent run:
import logging
from contextlib import AsyncExitStack

from mcp import ClientSession
from mcp.client.sse import sse_client


class MCPSessionPool:
    """One persistent session per MCP server URL."""

    def __init__(self):
        self._sessions: dict[str, ClientSession] = {}
        # One exit stack per URL owns both the transport and the session.
        self._stacks: dict[str, AsyncExitStack] = {}

    async def acquire(self, url: str, token: str) -> ClientSession:
        if url not in self._sessions:
            stack = AsyncExitStack()
            try:
                read, write = await stack.enter_async_context(
                    sse_client(url=url, headers={"Authorization": f"Bearer {token}"})
                )
                session = await stack.enter_async_context(ClientSession(read, write))
                await session.initialize()
            except BaseException:
                # Do not leak a half-open connection if setup fails.
                await stack.aclose()
                raise
            self._sessions[url] = session
            self._stacks[url] = stack
        return self._sessions[url]

    async def close_all(self):
        # Call this from the same task that called acquire(): the transport's
        # task group must be exited by the task that entered it.
        for url, stack in self._stacks.items():
            try:
                await stack.aclose()
            except Exception as e:
                logging.warning("Error closing MCP session for %s: %s", url, e)
        self._sessions.clear()
        self._stacks.clear()


pool = MCPSessionPool()
Each JSON-RPC request carries its own id, so the session can match responses to requests, and multiple concurrent tool calls on the same session are fine. The pool gives you connection reuse across multiple sequential agent invocations within the same process lifetime.
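As an illustration of several calls sharing one session, here is a minimal, stdlib-only sketch. `FakeSession` is a hypothetical stand-in, not the MCP SDK's ClientSession: it only models the idea that each request gets an id and each response is routed back to the caller waiting on that id, even when replies arrive out of order.

```python
import asyncio
import itertools


class FakeSession:
    """Hypothetical stand-in for a session; NOT the MCP SDK's ClientSession."""

    def __init__(self):
        self._ids = itertools.count(1)
        self._pending = {}   # request id -> Future awaiting the response
        self._tasks = set()  # keep references so simulated responses are not collected

    async def call_tool(self, name: str, delay: float) -> str:
        request_id = next(self._ids)
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        task = asyncio.create_task(self._respond(request_id, name, delay))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return await future

    async def _respond(self, request_id: int, name: str, delay: float) -> None:
        # Simulated server: answers after `delay`, so replies can arrive out of order.
        await asyncio.sleep(delay)
        self._pending.pop(request_id).set_result(f"{name} ok (id={request_id})")


async def main() -> list:
    session = FakeSession()
    # Two calls in flight at once on the same session.
    return await asyncio.gather(
        session.call_tool("slow_search", 0.05),
        session.call_tool("fast_lookup", 0.01),
    )


results = asyncio.run(main())
```

The faster call finishes first, yet each caller receives its own result because responses are matched by id rather than by arrival order.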
Error handling in LlamaIndex MCP tools
MCP errors surface in LlamaIndex in two ways: tool execution errors (where the MCP server returns isError: true in the tool result) and transport errors (connection refused, timeout, invalid JSON-RPC response). Handle both at the agent boundary:
import httpx
from llama_index.core.tools import FunctionTool
from llama_index.tools.mcp import MCPToolSpec
from mcp import McpError


async def make_safe_mcp_tool(spec: MCPToolSpec, tool_name: str) -> FunctionTool:
    """
    Wrap a single MCP tool with error handling that prevents
    LlamaIndex agent loop crashes on MCP failures.
    """
    # Use the async listing: this helper runs inside an event loop.
    raw_tools = {t.metadata.name: t for t in await spec.to_tool_list_async()}
    raw_tool = raw_tools[tool_name]

    async def safe_call(**kwargs) -> str:
        try:
            result = await raw_tool.acall(**kwargs)
            # MCP isError: true is surfaced as an error-flagged result
            if hasattr(result, "is_error") and result.is_error:
                return f"Tool returned error: {result.content}. Try a different approach."
            return str(result)
        except McpError as e:
            return f"MCP protocol error: {e.error.message} (code {e.error.code}). The server may be temporarily unavailable."
        except httpx.TimeoutException:
            return "MCP server did not respond within the timeout. Try a simpler query or check server status at alivemcp.com."
        except httpx.ConnectError:
            return "Cannot connect to the MCP server. It may be down — check alivemcp.com for current status."
        except Exception as e:
            return f"Unexpected error calling {tool_name}: {type(e).__name__}: {e}"

    # Reuse the original metadata so the agent sees the same name, description, and schema.
    return FunctionTool.from_defaults(async_fn=safe_call, tool_metadata=raw_tool.metadata)
Returning error strings rather than raising exceptions is the safe default for LlamaIndex agents: FunctionCallingAgent and ReActAgent both inject tool results (including error strings) back into the LLM context as observations, allowing the LLM to reason about the failure and attempt an alternative path. An uncaught exception propagates out of the agent loop and discards all reasoning done up to that point.
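The same idea can be expressed as a reusable decorator that also enforces a per-call timeout. This is a stdlib-only sketch under stated assumptions: `errors_as_observations` and `flaky_lookup` are hypothetical names introduced here, and the wrapped function stands in for any async tool callable, not a specific LlamaIndex or MCP API.

```python
import asyncio
import functools


def errors_as_observations(timeout_s: float):
    """Decorator: turn timeouts and exceptions from an async tool into strings."""
    def decorate(tool_fn):
        @functools.wraps(tool_fn)
        async def wrapper(**kwargs) -> str:
            try:
                result = await asyncio.wait_for(tool_fn(**kwargs), timeout=timeout_s)
                return str(result)
            except asyncio.TimeoutError:
                return f"{tool_fn.__name__} timed out after {timeout_s} s."
            except Exception as e:
                return f"{tool_fn.__name__} failed: {type(e).__name__}: {e}"
        return wrapper
    return decorate


@errors_as_observations(timeout_s=0.05)
async def flaky_lookup(key: str) -> str:
    # Hypothetical tool used only to exercise the wrapper.
    if key == "slow":
        await asyncio.sleep(1)
    if key == "bad":
        raise ValueError("unknown key")
    return f"value for {key}"


ok = asyncio.run(flaky_lookup(key="ok"))
bad = asyncio.run(flaky_lookup(key="bad"))
slow = asyncio.run(flaky_lookup(key="slow"))
```

In all three cases the caller receives a string, so an agent loop built on top of the wrapper always has an observation to reason about.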
MCP resources as LlamaIndex documents
Beyond tools, MCP servers expose resources — structured data objects (files, records, blobs) that can be listed and read. LlamaIndex can ingest MCP resources directly as Document objects for indexing or retrieval:
from mcp import ClientSession
from mcp.types import TextResourceContents
from llama_index.core import Document, VectorStoreIndex
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter


async def index_mcp_resources(session: ClientSession, uri_prefix: str = "file://") -> VectorStoreIndex:
    """
    List all MCP resources under uri_prefix, read each one,
    and build a LlamaIndex VectorStoreIndex from the content.
    """
    resources_result = await session.list_resources()
    docs = []
    for resource in resources_result.resources:
        # resource.uri is a URL object, not a str, so convert before comparing.
        uri = str(resource.uri)
        if not uri.startswith(uri_prefix):
            continue
        try:
            read_result = await session.read_resource(resource.uri)
            for content in read_result.contents:
                # Skip binary (blob) contents; only text can be indexed directly.
                if isinstance(content, TextResourceContents):
                    docs.append(Document(
                        text=content.text,
                        metadata={
                            "source": uri,
                            "name": resource.name,
                            "mimeType": resource.mimeType or "text/plain",
                        },
                    ))
        except Exception as e:
            # One unreadable resource should not abort the whole ingestion run.
            print(f"Failed to read resource {uri}: {e}")
    pipeline = IngestionPipeline(transformations=[SentenceSplitter(chunk_size=512)])
    nodes = await pipeline.arun(documents=docs)
    return VectorStoreIndex(nodes)
Combining MCP resources with MCP tools in one LlamaIndex agent gives you a powerful hybrid: the vector index answers questions about static corpus data, while MCP tools answer queries requiring live execution (run a query, fetch a URL, call an API). Wrap the index as a QueryEngineTool and include it alongside your MCP AsyncTool objects in FunctionCallingAgent.from_tools().
ReActAgent with multiple MCP servers
For workflows that span multiple specialized MCP servers, combine tool lists from separate sessions. Each server provides a distinct capability; the ReActAgent reasons about which tool to invoke based on its description:
import os

from llama_index.core.agent import ReActAgent
from llama_index.llms.anthropic import Anthropic
from llama_index.tools.mcp import MCPToolSpec


async def build_multi_mcp_agent():
    tools = []
    server_configs = [
        ("search", "https://search.internal/sse", os.environ["SEARCH_TOKEN"]),
        ("database", "https://db.internal/sse", os.environ["DB_TOKEN"]),
        ("calendar", "https://cal.internal/sse", os.environ["CAL_TOKEN"]),
    ]
    # For each server: open (or reuse) its session, then gather its tools.
    # `pool` is the MCPSessionPool instance defined earlier.
    for name, url, token in server_configs:
        session = await pool.acquire(url, token)
        spec = MCPToolSpec(session=session)
        server_tools = await spec.to_tool_list_async()
        # Prefix tool names to avoid collisions across servers
        for tool in server_tools:
            tool.metadata.name = f"{name}__{tool.metadata.name}"
        tools.extend(server_tools)
    llm = Anthropic(model="claude-sonnet-4-6")
    agent = ReActAgent.from_tools(
        tools=tools,
        llm=llm,
        max_iterations=25,
        verbose=True,
    )
    return agent
Prefixing tool names with the server name prevents collisions when two servers expose tools with the same name (e.g., both a search server and a database server might expose list_items). The prefix also helps the LLM understand which server it is targeting, which can reduce tool selection errors.
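The prefixing scheme can be checked in isolation. The sketch below is stdlib-only and uses a hypothetical helper, `namespace_tools`, that builds a routing table from prefixed name back to (server, original name) and rejects the rare case where prefixing itself produces a duplicate (for example, when a server name already contains the separator).

```python
def namespace_tools(servers: dict, sep: str = "__") -> dict:
    """Map each prefixed tool name back to (server, original tool name)."""
    routes = {}
    for server, tool_names in servers.items():
        for tool_name in tool_names:
            prefixed = f"{server}{sep}{tool_name}"
            if prefixed in routes:
                raise ValueError(f"duplicate tool name after prefixing: {prefixed}")
            routes[prefixed] = (server, tool_name)
    return routes


# Both servers expose list_items; the prefix keeps them distinct.
routes = namespace_tools({
    "search": ["list_items", "query"],
    "database": ["list_items"],
})
```

A routing table like this is also handy for logging, since it recovers which server handled a given call from the prefixed name alone.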
Monitoring MCP servers in LlamaIndex pipelines
LlamaIndex pipelines — especially RAG pipelines augmented with MCP tools — often run unattended on a schedule: nightly report generation, continuous document ingestion, batch question answering over a corpus. A dead MCP server mid-run produces a partial result with no clear error signal: the agent loop exhausts its retries, the final answer omits data from the failed server, and the calling application sees a valid-looking but incomplete response.
Two defenses: add a preflight health check at the start of each pipeline run, and run AliveMCP continuous monitoring on each MCP server endpoint independently of your application code:
import asyncio
import os


async def preflight_check(session: ClientSession, server_name: str) -> None:
    """
    Verify the MCP server is reachable and initialized before
    spending any LLM tokens on the pipeline.
    """
    try:
        # tools/list is a lightweight RPC with no side effects
        result = await asyncio.wait_for(session.list_tools(), timeout=5.0)
    except asyncio.TimeoutError as e:
        raise RuntimeError(f"MCP server '{server_name}' did not respond within 5 s") from e
    except Exception as e:
        raise RuntimeError(f"MCP server '{server_name}' preflight failed: {e}") from e
    # Checked outside the try block so this error is not re-wrapped as "preflight failed".
    if not result.tools:
        raise RuntimeError(f"MCP server '{server_name}' returned empty tool list")


async def run_pipeline(topic: str):
    session = await pool.acquire(
        os.environ["SEARCH_URL"], os.environ["SEARCH_TOKEN"]
    )
    await preflight_check(session, "search")  # fail fast, before any LLM cost
    spec = MCPToolSpec(session=session)
    tools = await spec.to_tool_list_async()
    agent = FunctionCallingAgent.from_tools(tools=tools, llm=Anthropic())
    return await agent.aquery(f"Research and summarize: {topic}")
The preflight check catches servers that are down before you spend any LLM tokens. AliveMCP covers the continuous case: it probes your MCP endpoint every 60 seconds and alerts you within a minute of any failure, so you learn about an outage long before your next scheduled pipeline run discovers it.
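When a pipeline depends on several servers, the preflight can run against all of them concurrently and report every failure at once. The following is a stdlib-only sketch: `preflight_all` is a hypothetical helper, and the three probes in `demo` are stand-ins that simulate a healthy, a hanging, and an unreachable server.

```python
import asyncio


async def preflight_all(probes: dict, timeout_s: float = 5.0) -> dict:
    """Run every probe concurrently; return {server_name: reason} for failures only."""
    async def run_one(name, probe):
        try:
            await asyncio.wait_for(probe(), timeout=timeout_s)
            return name, None
        except asyncio.TimeoutError:
            return name, f"no response within {timeout_s} s"
        except Exception as e:
            return name, f"{type(e).__name__}: {e}"

    results = await asyncio.gather(*(run_one(n, p) for n, p in probes.items()))
    return {name: reason for name, reason in results if reason is not None}


async def demo() -> dict:
    # Hypothetical probes standing in for real health checks.
    async def healthy():
        return ["some_tool"]

    async def hanging():
        await asyncio.sleep(1)

    async def refusing():
        raise ConnectionError("refused")

    return await preflight_all(
        {"search": healthy, "calendar": hanging, "database": refusing},
        timeout_s=0.05,
    )


failures = asyncio.run(demo())
```

In a real pipeline each probe would presumably be a zero-argument callable such as a session's `list_tools` method; an empty result from `preflight_all` means it is safe to start spending LLM tokens.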
Frequently asked questions
What is the difference between MCPToolSpec and writing FunctionTool wrappers by hand?
MCPToolSpec introspects the connected MCP server at runtime — it calls tools/list, reads the JSON Schema for each tool's input parameters, and constructs ToolMetadata objects automatically. Writing FunctionTool wrappers by hand means you maintain a static mapping of tool names, descriptions, and schemas that drifts out of sync when the MCP server adds or changes tools. Use MCPToolSpec for any MCP server you do not control; use hand-written wrappers only when you need custom argument preprocessing or error handling beyond what the spec generates.
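The drift problem with hand-written wrappers can be detected mechanically. As a rough sketch, assuming you keep a static map of tool name to expected parameter names, the hypothetical helper `find_drift` below compares that map with a tools/list result payload. The sample data is invented for illustration.

```python
def find_drift(static_tools: dict, live_result: dict) -> dict:
    """Compare hand-maintained tool definitions with a live tools/list result."""
    live = {
        tool["name"]: set(tool.get("inputSchema", {}).get("properties", {}))
        for tool in live_result.get("tools", [])
    }
    shared = static_tools.keys() & live.keys()
    return {
        # Wrapped locally, but the server no longer offers it.
        "missing_on_server": sorted(static_tools.keys() - live.keys()),
        # Offered by the server, but no local wrapper exists.
        "unwrapped_on_server": sorted(live.keys() - static_tools.keys()),
        # Present on both sides with different parameter names.
        "changed_params": sorted(n for n in shared if set(static_tools[n]) != live[n]),
    }


static_tools = {"search": {"query"}, "old_tool": {"id"}}
live_result = {
    "tools": [
        {"name": "search", "inputSchema": {"properties": {"query": {}, "limit": {}}}},
        {"name": "new_tool", "inputSchema": {"properties": {}}},
    ]
}
drift = find_drift(static_tools, live_result)
```

Running a check like this at startup turns silent drift into an explicit report.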
Can I use MCPToolSpec with a local stdio MCP server during development?
Yes. Pass StdioServerParameters to stdio_client() and use the resulting session with MCPToolSpec exactly as you would an SSE session. With stdio transport, the SDK spawns the server as a subprocess, writes JSON-RPC requests to its stdin, and reads responses from its stdout; the client-side API is still the same async ClientSession. Use stdio for local development and switch to HTTP/SSE for production without changing the MCPToolSpec code.
How do I pass authentication context per user when using MCPToolSpec?
MCP authentication is at the transport level — the bearer token or API key goes in the HTTP headers when creating the SSE connection. For per-user auth, create a separate ClientSession per user (or per request) using that user's credentials. Do not share a session across users with different permission levels — the MCP server's authorization decisions are based on the session-level credentials, not per-call parameters. If the MCP server uses service-level auth (one credential for the whole application), sharing a session is correct.
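One way to keep per-user sessions separate is to cache them by (URL, user) rather than by URL alone. The sketch below is stdlib-only and hypothetical: `PerUserSessions` is a name introduced here, and `connect` stands in for whatever coroutine opens an authenticated session in your application.

```python
import asyncio


class PerUserSessions:
    """Cache sessions by (url, user_id) so credentials are never shared across users."""

    def __init__(self, connect):
        self._connect = connect  # hypothetical factory: async (url, token) -> session
        self._sessions = {}
        self._lock = asyncio.Lock()

    async def get(self, url: str, user_id: str, token: str):
        key = (url, user_id)
        # The lock prevents two concurrent requests from opening duplicate sessions.
        async with self._lock:
            if key not in self._sessions:
                self._sessions[key] = await self._connect(url, token)
            return self._sessions[key]


async def demo():
    calls = []

    async def fake_connect(url, token):
        # Stand-in for opening a real authenticated session.
        calls.append((url, token))
        return object()

    cache = PerUserSessions(fake_connect)
    alice_1 = await cache.get("https://search.internal/sse", "alice", "token-a")
    alice_2 = await cache.get("https://search.internal/sse", "alice", "token-a")
    bob = await cache.get("https://search.internal/sse", "bob", "token-b")
    return alice_1 is alice_2, alice_1 is bob, len(calls)


same_user_reused, shared_across_users, connections_opened = asyncio.run(demo())
```

Repeated requests from one user reuse that user's session, while a second user always gets a distinct one.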
Does LlamaIndex support MCP streaming responses?
MCP progress notifications (incremental streaming from long-running tools) are not yet surfaced through MCPToolSpec's to_tool_list_async(). Intermediate notifications are delivered on the transport but discarded at the SDK layer before the tool result reaches LlamaIndex. If you need streaming output from a long MCP tool call, implement a custom FunctionTool that uses the MCP SDK's session.call_tool() directly and streams partial results into a shared buffer accessible to the calling code.
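The "shared buffer" part of that approach can be sketched with an asyncio.Queue. This is a stdlib-only illustration of the buffering pattern, not of the MCP SDK: `long_tool` is a hypothetical long-running tool that publishes partial results, and `None` is used here as an end-of-stream sentinel.

```python
import asyncio


async def long_tool(buffer: asyncio.Queue) -> str:
    """Hypothetical long-running tool that publishes partial results as it goes."""
    for i in range(3):
        await asyncio.sleep(0.01)
        await buffer.put(f"chunk {i}")
    await buffer.put(None)  # sentinel: no more partial results
    return "final result"


async def run_with_progress():
    buffer = asyncio.Queue()
    task = asyncio.create_task(long_tool(buffer))
    seen = []
    # Consume partial results while the tool is still running.
    while (item := await buffer.get()) is not None:
        seen.append(item)
    return seen, await task


partials, final = asyncio.run(run_with_progress())
```

The caller sees each chunk as soon as it is produced, and the final tool result still arrives through the normal return path.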
How do I handle an MCP server that goes down while a LlamaIndex agent is mid-run?
The safest pattern is wrapping each tool's acall() in a try/except that catches httpx.ConnectError and McpError and returns an error string (see the make_safe_mcp_tool example above). This allows the LLM to acknowledge the failure in its reasoning trace and produce a partial answer rather than crashing the agent. Complement this with health check probes and AliveMCP monitoring so you receive an alert as soon as the server goes down, independently of your agent's error handling.