MCP server Google ADK integration
Google ADK (Agent Development Kit) is Google's open-source Python framework for building production agent pipelines that deploy to Vertex AI Agent Engine, Cloud Run, or any Python ASGI server. ADK represents tools as Python callables that the framework wraps in a FunctionTool and presents to the Gemini model's native function-calling API. MCP servers integrate as a tool implementation layer: you write an async Python function for each MCP tool, ADK wraps it as a FunctionTool, and the agent runtime invokes it when the model selects it. ADK's multi-agent primitives — SequentialAgent, ParallelAgent, and LlmAgent sub-agents — let you assign different MCP servers to specialized sub-agents and orchestrate them from a root agent. The key operational considerations are the MCP session lifecycle relative to ADK's session model, async bridging from synchronous agent contexts, and monitoring MCP servers that your production agent depends on for reliability.
TL;DR
Write an async Python function that calls your MCP server via the MCP Python SDK's session.call_tool(). Wrap it with FunctionTool(fn), or pass the function directly and let ADK wrap it, then hand it to LlmAgent(tools=[my_tool]). Maintain one MCP ClientSession per server across all agent calls; do not reconnect per invocation. Catch McpError and httpx.ConnectError in the wrapper and return error dicts, which ADK passes back to the model as the function response. Monitor MCP endpoints with AliveMCP: ADK agents running on Vertex AI produce opaque errors when a dependent MCP server is down.
Google ADK architecture
ADK's core abstractions relevant to MCP integration:
| ADK concept | Description | MCP relevance |
|---|---|---|
| LlmAgent | Gemini-backed agent that reasons and calls tools | Receives MCP tools via its tools list; invokes them when the model decides |
| FunctionTool | Wraps a Python callable (sync or async) as an ADK tool | The bridge between a Python MCP wrapper function and the ADK tool system |
| SequentialAgent | Runs a list of sub-agents in order, passing state between them | Chain an MCP data-fetching agent before an analysis agent |
| ParallelAgent | Runs sub-agents concurrently, merges results | Fan out to multiple MCP servers simultaneously, then merge |
| Session | Conversation context: history, state, artifacts | Per-user MCP credentials can be kept in session.state across turns; live connections are not serializable and belong in process memory, keyed by session ID |
Wrapping MCP tools as FunctionTools
ADK's FunctionTool accepts any async Python function. The function's docstring and type annotations drive the schema that is sent to the Gemini model. Write clear docstrings — Gemini uses them to decide when to call the tool:
```python
import asyncio
import json
import os

from mcp import ClientSession
from mcp.client.sse import sse_client
from google.adk.tools import FunctionTool

# Module-level persistent session: one per MCP server
_search_session: ClientSession | None = None
_search_cm = None
# Guards first use so concurrent tool calls do not open two connections
_search_lock = asyncio.Lock()


async def get_search_session() -> ClientSession:
    """Return the shared MCP session, connecting on first use."""
    global _search_session, _search_cm
    async with _search_lock:
        if _search_session is None:
            _search_cm = sse_client(
                url=os.environ["SEARCH_MCP_URL"],
                headers={"Authorization": f"Bearer {os.environ['SEARCH_MCP_TOKEN']}"},
            )
            read, write = await _search_cm.__aenter__()
            session = ClientSession(read, write)
            await session.__aenter__()
            await session.initialize()
            # Publish the session only after initialize() succeeds
            _search_session = session
    return _search_session


async def search_web(query: str, max_results: int = 5) -> dict:
    """Search the web for recent information on a topic.

    Use this tool when the user asks about current events, recent releases,
    or information that may have changed since the model's knowledge cutoff.

    Args:
        query: The search query. Be specific; narrow queries return better results.
        max_results: Number of search results to return (1-20, default 5).

    Returns:
        A dict with keys "results" (list of {title, url, snippet}) and "total_found".
    """
    try:
        session = await get_search_session()
        result = await session.call_tool(
            "search_web",
            arguments={"query": query, "max_results": max_results},
        )
        if result.isError:
            return {"error": result.content[0].text, "results": []}
        return json.loads(result.content[0].text)
    except Exception as e:
        # Never raise into the agent loop; report the failure as data instead
        return {
            "error": f"MCP search unavailable: {type(e).__name__}: {e}",
            "results": [],
        }


async def get_page_content(url: str) -> dict:
    """Retrieve and extract the main text content from a webpage URL.

    Use after search_web to read the full content of a specific page.

    Args:
        url: The full URL of the page to retrieve.

    Returns:
        A dict with keys "content" (extracted text) and "title".
    """
    try:
        session = await get_search_session()
        result = await session.call_tool("get_page_content", arguments={"url": url})
        if result.isError:
            return {"error": result.content[0].text, "content": ""}
        return {"content": result.content[0].text, "title": ""}
    except Exception as e:
        return {"error": f"MCP fetch unavailable: {e}", "content": ""}


# Wrap as ADK FunctionTools
search_tool = FunctionTool(search_web)
page_tool = FunctionTool(get_page_content)
```
Returning dicts (rather than strings) allows ADK to pass structured data into the model's tool-use context. Gemini can reason about dict fields directly — results[0].title is more useful context than a flat string. Always include an "error" key in the return dict for failure cases so the model can detect and handle tool failures without parsing error strings heuristically.
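The dict-with-error-key convention can be centralized in one helper so every wrapper returns the same shape. The following is a minimal sketch, not part of ADK or the MCP SDK: `result_to_dict` is a hypothetical name, and the `SimpleNamespace` objects only stand in for the SDK's tool result (anything with `isError` and a `content` list of parts carrying `.text`).

```python
import json
from types import SimpleNamespace


def result_to_dict(result, empty_key: str = "results") -> dict:
    """Turn an MCP-style tool result into a dict with a predictable shape."""
    # Join all text parts; non-text parts (images, resources) are skipped here.
    texts = [p.text for p in (result.content or []) if getattr(p, "text", None)]
    body = "\n".join(texts)
    if result.isError:
        return {"error": body or "tool reported an error", empty_key: []}
    try:
        parsed = json.loads(body)
    except json.JSONDecodeError:
        # The server sent plain text rather than JSON; pass it through as-is.
        return {"text": body}
    # Wrap bare lists or scalars so the model always receives a dict.
    return parsed if isinstance(parsed, dict) else {empty_key: parsed}


# Stand-ins for real tool results, used only to exercise the helper.
ok = SimpleNamespace(
    isError=False,
    content=[SimpleNamespace(text='{"results": [], "total_found": 0}')],
)
failed = SimpleNamespace(isError=True, content=[SimpleNamespace(text="rate limited")])

print(result_to_dict(ok))
print(result_to_dict(failed))
```

A wrapper such as search_web could then return `result_to_dict(result)` instead of indexing `result.content[0]` directly, which also avoids an IndexError when a server returns no content parts.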
Building an LlmAgent with MCP tools
Pass the wrapped tools to an LlmAgent. ADK automatically uses Gemini's native function-calling API to present the tool schemas and dispatch calls:
```python
from google.adk.agents import LlmAgent
from google.adk.models import Gemini
from google.adk.models.lite_llm import LiteLlm

research_agent = LlmAgent(
    name="ResearchAgent",
    model=LiteLlm(model="gemini/gemini-2.0-flash"),  # or use Vertex AI model
    instruction="""You are a research agent with access to live web search tools.
- Always search for current information before answering questions about recent events.
- When search returns results, retrieve the full content of the most relevant page.
- Provide citations for all factual claims.
- If search is unavailable (error key in result), acknowledge it and answer from training data.""",
    tools=[search_tool, page_tool],
    output_key="research_result",  # store final response in session state
)

# For use with Vertex AI Gemini directly:
agent_with_vertex = LlmAgent(
    name="ResearchAgent",
    model=Gemini(model="gemini-2.0-flash"),
    tools=[search_tool, page_tool],
    instruction="...",
)
```
Multi-agent patterns with MCP servers
ADK's SequentialAgent and ParallelAgent compose multiple LlmAgents. Assign specialized MCP servers to each sub-agent and orchestrate with a root agent:
```python
from google.adk.agents import SequentialAgent, ParallelAgent, LlmAgent
from google.adk.models import Gemini

# Sub-agents, each with their own MCP toolset
search_agent = LlmAgent(
    name="WebSearcher",
    model=Gemini(model="gemini-2.0-flash"),
    tools=[search_tool, page_tool],
    instruction="Search the web and retrieve relevant pages for the given research query.",
    output_key="search_results",
)

# db_query_tool and db_schema_tool are assumed to be FunctionTools built the
# same way as search_tool, wrapping a separate database MCP server.
database_agent = LlmAgent(
    name="DataAnalyst",
    model=Gemini(model="gemini-2.0-flash"),
    tools=[db_query_tool, db_schema_tool],  # MCP tools from a database MCP server
    instruction="Query the analytics database to extract statistical data for the research topic.",
    output_key="data_results",
)

# Run search and database agents in parallel, then synthesize
parallel_gather = ParallelAgent(
    name="ParallelGather",
    sub_agents=[search_agent, database_agent],
)

synthesis_agent = LlmAgent(
    name="Synthesizer",
    model=Gemini(model="gemini-2.0-pro"),  # stronger model for synthesis
    instruction="""Synthesize the web research and database findings from prior steps
(available in session state as 'search_results' and 'data_results') into a
structured report with sections: Summary, Key Findings, Statistics, and Sources.""",
    tools=[],  # synthesis step uses no tools; it works from session state
    output_key="final_report",
)

research_pipeline = SequentialAgent(
    name="ResearchPipeline",
    sub_agents=[parallel_gather, synthesis_agent],
)
```
The ParallelAgent runs search_agent and database_agent concurrently, so both MCP servers are called at the same time and the data-gathering phase takes about as long as the slower sub-agent rather than the sum of the two (roughly half the wall-clock time when the two are similar). Each sub-agent's output is stored in session state under its output_key, making it available to the synthesis agent in the next sequential step.
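The timing effect can be illustrated without ADK at all. This sketch uses plain asyncio with two stand-in coroutines (the `fake_agent` function and its sleep durations are hypothetical placeholders for real sub-agents) that each write to a shared dict under their own key, mimicking output_key.

```python
import asyncio
import time


async def fake_agent(output_key: str, seconds: float, state: dict) -> None:
    """Stand-in for a sub-agent: wait, then store output under its own key."""
    await asyncio.sleep(seconds)  # simulates MCP calls plus model latency
    state[output_key] = f"{output_key} ready"


async def gather_phase() -> tuple[dict, float]:
    state: dict = {}
    started = time.perf_counter()
    # Both stand-ins run concurrently, as sub-agents do under a ParallelAgent.
    await asyncio.gather(
        fake_agent("search_results", 0.3, state),
        fake_agent("data_results", 0.3, state),
    )
    return state, time.perf_counter() - started


state, elapsed = asyncio.run(gather_phase())
print(sorted(state), f"{elapsed:.2f}s")
```

Run sequentially, the two stand-ins would need about 0.6 seconds; run concurrently, the elapsed time stays close to 0.3 seconds. Distinct keys matter because concurrent writers to the same key would overwrite each other.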
Deploying to Vertex AI Agent Engine
ADK agents deploy to Vertex AI Agent Engine with agent_engines.create(). MCP connections are established at runtime inside the deployed agent; credentials come from environment variables or Secret Manager:
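```python
import asyncio
import os

import vertexai
from google.adk.runners import InMemoryRunner
from google.genai import types
from vertexai import agent_engines
from vertexai.preview import reasoning_engines

vertexai.init(
    project=os.environ["GCP_PROJECT"],
    location="us-central1",
    # Assumed variable name; deployment needs a Cloud Storage staging bucket
    staging_bucket=os.environ["GCP_STAGING_BUCKET"],
)


# A custom agent app must define a query() method for Agent Engine.
# The prebuilt AdkApp used at the bottom is the alternative to writing this class.
class ResearchApp:
    def __init__(self):
        self.agent = research_pipeline  # the SequentialAgent defined above

    async def query(self, *, user_input: str) -> str:
        runner = InMemoryRunner(agent=self.agent, app_name="research")
        # The session must exist before run_async is called
        await runner.session_service.create_session(
            app_name="research", user_id="user", session_id="session"
        )
        # run_async expects a Content object, not a bare string
        message = types.Content(role="user", parts=[types.Part(text=user_input)])
        result = ""
        async for event in runner.run_async(
            user_id="user", session_id="session", new_message=message
        ):
            if event.is_final_response() and event.content and event.content.parts:
                result = event.content.parts[0].text
        return result

    def stream_query(self, *, user_input: str):
        # Blocking entry point: returns the complete response, not incremental chunks.
        # asyncio.run() starts a new event loop per call, so MCP sessions opened
        # on an earlier loop cannot be reused from here.
        return asyncio.run(self.query(user_input=user_input))


# Deploy to Vertex AI Agent Engine
app = reasoning_engines.AdkApp(agent=research_pipeline, enable_tracing=True)
remote_agent = agent_engines.create(
    agent_engine=app,
    display_name="MCP Research Pipeline",
    requirements=["google-cloud-aiplatform[adk,agent_engines]", "google-adk", "mcp", "httpx"],
    extra_packages=["./my_mcp_wrappers/"],
)
```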
Vertex AI Agent Engine manages scaling and infrastructure. Your MCP wrapper functions run inside the deployed container — ensure that the MCP server URLs are accessible from Vertex AI's network egress (public MCP servers or VPC-connected private endpoints). Set MCP credentials as Vertex AI environment variable secrets, not hardcoded values, to avoid leaking them in deployment artifacts.
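Because credentials arrive through the environment, a missing variable is best caught at startup rather than on the first tool call. The sketch below assumes the `<PREFIX>_MCP_URL` / `<PREFIX>_MCP_TOKEN` naming used in this guide's examples; `load_mcp_config` is a hypothetical helper, not an ADK or Vertex AI API.

```python
import os


def load_mcp_config(prefix: str, environ=os.environ) -> dict:
    """Read <PREFIX>_MCP_URL and <PREFIX>_MCP_TOKEN, failing fast if unset."""
    names = {"url": f"{prefix}_MCP_URL", "token": f"{prefix}_MCP_TOKEN"}
    # Treat empty strings as missing: an empty token is never a valid credential.
    missing = [env for env in names.values() if not environ.get(env)]
    if missing:
        raise RuntimeError(f"Missing MCP settings: {', '.join(missing)}")
    return {key: environ[env] for key, env in names.items()}


# Example with an explicit mapping instead of the real process environment.
example_env = {
    "SEARCH_MCP_URL": "https://mcp.example.com/sse",
    "SEARCH_MCP_TOKEN": "placeholder-token",
}
config = load_mcp_config("SEARCH", example_env)
print(config["url"])
```

The error message lists variable names only, never values, so it is safe to surface in deployment logs.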
Monitoring MCP servers in ADK pipelines
ADK agents deployed to Vertex AI are opaque from an infrastructure perspective: errors in MCP tool calls appear in Cloud Logging as FunctionTool error entries, but the root cause (MCP server down vs. bad arguments vs. server logic error) requires log diving to determine. AliveMCP monitors your MCP endpoints independently, providing infrastructure-level visibility separate from ADK application logs:
```python
import os

import httpx
from google.adk.runners import InMemoryRunner
from google.genai import types


async def verify_mcp_health(url: str, token: str, server_name: str) -> None:
    """Verify MCP server connectivity before starting an ADK agent session."""
    try:
        async with httpx.AsyncClient(timeout=5.0) as http:
            # Use MCP initialize as a lightweight health probe. This assumes the
            # server accepts JSON-RPC POSTs at the base URL (Streamable HTTP).
            resp = await http.post(
                url.replace("/sse", ""),  # HTTP endpoint
                headers={
                    "Authorization": f"Bearer {token}",
                    "Accept": "application/json, text/event-stream",
                },
                json={
                    "jsonrpc": "2.0",
                    "method": "initialize",
                    "params": {
                        "protocolVersion": "2025-03-26",
                        "capabilities": {},
                        "clientInfo": {"name": "health-check", "version": "1"},
                    },
                    "id": 0,
                },
            )
        if resp.status_code != 200:
            raise RuntimeError(f"MCP server {server_name} returned HTTP {resp.status_code}")
    except (httpx.ConnectError, httpx.TimeoutException) as e:
        raise RuntimeError(
            f"Cannot reach MCP server '{server_name}' at {url}: {e}. "
            "Check status at https://alivemcp.com"
        ) from e


# Preflight for long-running ADK pipeline jobs
async def run_research_pipeline_with_checks(query: str) -> str:
    await verify_mcp_health(
        os.environ["SEARCH_MCP_URL"],
        os.environ["SEARCH_MCP_TOKEN"],
        "search",
    )
    runner = InMemoryRunner(agent=research_pipeline, app_name="research")
    # The session must exist before run_async is called
    await runner.session_service.create_session(
        app_name="research", user_id="u", session_id="s"
    )
    message = types.Content(role="user", parts=[types.Part(text=query)])
    result = ""
    async for event in runner.run_async(user_id="u", session_id="s", new_message=message):
        if event.is_final_response() and event.content and event.content.parts:
            result = event.content.parts[0].text
    return result
```
The preflight catches servers that are completely unreachable before any Gemini API tokens are spent. For ongoing monitoring, AliveMCP probes every 60 seconds from outside your VPC, covering the case where the MCP server goes down after your ADK pipeline starts — giving you a minute-resolution alert that correlates with your ADK error logs.
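To shorten the log diving described earlier, a wrapper can tag each failure with a coarse category before returning its error dict. This is a rough sketch under two assumptions: that protocol errors expose a JSON-RPC code at `exc.error.code` (as the MCP Python SDK's McpError is understood to do), and that network failures are passed in via `unreachable_types`. `classify_failure` and `FakeMcpError` are hypothetical names for illustration.

```python
import asyncio
from types import SimpleNamespace

INVALID_PARAMS = -32602  # standard JSON-RPC 2.0 "Invalid params" code


def classify_failure(
    exc: BaseException,
    unreachable_types: tuple = (ConnectionError, TimeoutError, asyncio.TimeoutError),
) -> str:
    """Map an exception to 'bad_arguments', 'server_error', 'unreachable', or 'unknown'."""
    code = getattr(getattr(exc, "error", None), "code", None)
    if code == INVALID_PARAMS:
        return "bad_arguments"
    if code is not None:
        return "server_error"  # the server answered, but with a protocol error
    if isinstance(exc, unreachable_types):
        return "unreachable"
    return "unknown"


class FakeMcpError(Exception):
    """Stand-in for a protocol error carrying a JSON-RPC error object."""

    def __init__(self, code: int, message: str):
        super().__init__(message)
        self.error = SimpleNamespace(code=code, message=message)


print(classify_failure(FakeMcpError(-32602, "max_results must be <= 20")))
print(classify_failure(ConnectionRefusedError("connection refused")))
```

When the transport is httpx, passing `unreachable_types=(httpx.TransportError,)` would cover connection and timeout failures. Including the category in the returned error dict makes the Cloud Logging entries directly comparable with external uptime alerts.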
Frequently asked questions
Does Google ADK have built-in MCP support, or do I need to write wrappers?
As of ADK Python v1.x, there is a built-in MCP integration via MCPToolset (in the google.adk.tools.mcp_tool module). This is similar to LlamaIndex's McpToolSpec: it auto-discovers tools from a live MCP server and exposes them as ADK tool objects. Check the ADK release notes for the exact API, since it may have evolved. The manual wrapper pattern (async function + FunctionTool) described in this guide works independently of that API and gives you full control over error handling and argument mapping.
Can I use MCP with Gemma or other models in ADK, not just Gemini?
ADK's LiteLlm model adapter supports any model available in LiteLLM, including locally-hosted models via Ollama. MCP tool calling requires native function-calling support from the underlying model — Gemma models without fine-tuning for function calling will not reliably invoke tools. Use Gemini (1.5 Pro, 2.0 Flash, or 2.0 Pro) for production MCP tool workflows; use smaller models for non-tool tasks in multi-agent pipelines where only some agents need MCP access.
How do I pass per-user credentials to MCP tools in a multi-tenant ADK deployment?
Do not use module-level shared sessions for multi-tenant deployments. Instead, store the user's MCP credentials in the ADK session.state at the start of each conversation and create a per-session ClientSession from those credentials in each tool call. Close the per-session MCP connection when the ADK session ends. The overhead of per-session MCP connections is acceptable if users have distinct permission levels on the MCP server; for service-level auth (all users share one service account), a module-level shared session is correct.
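One way to organize per-session connections is a small in-process registry keyed by ADK session ID. The sketch below is an assumption-laden illustration: `SessionConnections`, `FakeClient`, and `fake_connect` are hypothetical, and the `connect` callable stands in for whatever code opens an MCP ClientSession from a user's token.

```python
import asyncio
from typing import Any, Awaitable, Callable


class SessionConnections:
    """Cache one MCP connection per ADK session ID, in process memory."""

    def __init__(self, connect: Callable[[str], Awaitable[Any]]):
        self._connect = connect  # hypothetical factory: token -> connected client
        self._conns: dict = {}
        self._lock = asyncio.Lock()

    async def get(self, session_id: str, token: str) -> Any:
        # The lock prevents two concurrent tool calls from both connecting.
        async with self._lock:
            if session_id not in self._conns:
                self._conns[session_id] = await self._connect(token)
            return self._conns[session_id]

    async def close(self, session_id: str) -> None:
        async with self._lock:
            conn = self._conns.pop(session_id, None)
        if conn is not None:
            await conn.aclose()


class FakeClient:
    """Stand-in for a connected MCP client."""

    def __init__(self, token: str):
        self.token = token
        self.closed = False

    async def aclose(self) -> None:
        self.closed = True


async def fake_connect(token: str) -> FakeClient:
    return FakeClient(token)


async def demo():
    registry = SessionConnections(fake_connect)
    a1 = await registry.get("session-a", "token-alice")
    a2 = await registry.get("session-a", "token-alice")  # reused, not reconnected
    b = await registry.get("session-b", "token-bob")
    await registry.close("session-a")  # call this when the ADK session ends
    return a1, a2, b


a1, a2, b = asyncio.run(demo())
print(a1 is a2, a1.closed, b.closed)
```

Only the token lives in session.state; the registry holds the live connection, which keeps session state serializable.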
What happens when an MCP tool call times out in an ADK ParallelAgent?
ADK's ParallelAgent runs sub-agents concurrently using asyncio. If a tool call in one sub-agent times out (raises asyncio.TimeoutError or httpx.TimeoutException), catch it in the wrapper function and return an error dict. The other sub-agents continue running unaffected. If you let the timeout exception propagate, it cancels the entire parallel execution — all sub-agents are cancelled, not just the one that timed out. Always handle timeouts explicitly in MCP wrapper functions used in parallel contexts.
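The explicit timeout handling can be factored into a reusable helper. This is a minimal sketch using only asyncio; `call_with_timeout`, `slow_tool`, and `fast_tool` are hypothetical names, with the two tool functions standing in for MCP wrapper coroutines.

```python
import asyncio


async def call_with_timeout(coro_fn, *args, timeout_s: float = 10.0, **kwargs) -> dict:
    """Await coro_fn(*args, **kwargs), converting a timeout into an error dict."""
    try:
        return await asyncio.wait_for(coro_fn(*args, **kwargs), timeout=timeout_s)
    except asyncio.TimeoutError:
        # Returning instead of raising keeps sibling tasks running.
        return {"error": f"timed out after {timeout_s}s", "results": []}


async def slow_tool(query: str) -> dict:
    await asyncio.sleep(1.0)  # simulates an unresponsive MCP server
    return {"results": [query]}


async def fast_tool(query: str) -> dict:
    await asyncio.sleep(0.01)
    return {"results": [query]}


async def demo():
    # Two concurrent calls: one exceeds its budget, the other completes normally.
    return await asyncio.gather(
        call_with_timeout(slow_tool, "a", timeout_s=0.05),
        call_with_timeout(fast_tool, "b", timeout_s=0.5),
    )


slow_result, fast_result = asyncio.run(demo())
print(slow_result)
print(fast_result)
```

The slow call yields an error dict while the fast call still returns its results, which is the behavior wanted from sub-agents running side by side.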
Can ADK agents call stdio MCP servers deployed on Vertex AI?
Yes, but with a caveat: stdio MCP servers are launched as subprocesses of the ADK agent process. When deployed to Vertex AI Agent Engine, the subprocess must be bundled in the deployment package (using extra_packages in the deployment spec). Ensure the subprocess binary (e.g., a Node.js MCP server) is included and executable in the Vertex AI container environment. For production deployments, HTTP/SSE transport with a separately deployed MCP server is simpler to manage than bundled stdio servers.
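A bundled stdio server fails at first use if its binary is absent from the container, so it can help to resolve the command at startup. This sketch uses only the standard library; `resolve_stdio_command` is a hypothetical helper, and the current Python interpreter stands in for the real server binary.

```python
import shutil
import sys


def resolve_stdio_command(command: str, args: list) -> list:
    """Return [absolute_path, *args] for a stdio server, or fail with a clear error."""
    path = shutil.which(command)  # checks PATH and the executable bit
    if path is None:
        raise FileNotFoundError(
            f"stdio MCP server binary '{command}' is not on PATH or not executable"
        )
    return [path, *args]


# The running interpreter is used here only because it is guaranteed to exist.
argv = resolve_stdio_command(sys.executable, ["--version"])
print(argv)
```

Calling this during agent initialization surfaces a packaging mistake in the deployment logs immediately, rather than as a tool error mid-conversation.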
Further reading
- MCP server LangChain integration — using MCP tools in LangChain agents
- MCP server smolagents integration — HuggingFace smolagents with MCP tools
- MCP server Google Gemini integration — Gemini API with MCP tools
- MCP server authentication — API keys, bearer tokens, and OAuth 2.0
- MCP server health check — designing a robust /health endpoint
- AliveMCP — continuous protocol monitoring for MCP servers