Guide · MCP Event Streaming Integration
MCP Server Kafka — KafkaJS producer and consumer tools, topic management, health monitoring
Apache Kafka is central to event-driven architectures — MCP agents need to publish events, read from topics, inspect consumer group lag, and manage topic configuration. This guide covers exposing Kafka operations as MCP tools using KafkaJS: singleton producer and admin client setup, safe publish tools with message validation, consumer tools for synchronous topic reads, consumer group lag monitoring, and a /health/kafka endpoint that AliveMCP can poll to detect broker connectivity issues before agents experience timeouts.
TL;DR
Use a singleton Kafka instance with a connected Producer kept open — reconnecting on every tool call is expensive. Validate message keys and values with Zod before publishing. For consumer tools, use a transient consumer (subscribe → read N messages → disconnect) rather than a long-running consumer loop, which doesn't fit the synchronous MCP tool model. Validate topic names against an allow-list — never accept arbitrary topic names from callers. Wire a /health/kafka endpoint that calls admin.describeCluster() to verify broker reachability.
KafkaJS setup: singleton producer and admin client
KafkaJS uses a layered client model: Kafka (configuration and factory), Producer (publishing), Consumer (subscribing), and Admin (cluster management). For MCP servers, keep the Producer and Admin clients connected as singletons. Connecting on every tool call repeats the TCP/TLS handshake, SASL authentication, and metadata fetch, which can add hundreds of milliseconds per operation. Consumers are different: joining or leaving a consumer group triggers a rebalance, which is why the read tool later in this guide uses a throwaway group per call.
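The "connect once" rule can also be enforced lazily: memoize the connection promise so that tool calls arriving while startup is still connecting share one connect() call instead of racing it or hitting a disconnected producer. A minimal sketch, assuming a hypothetical `Connectable` interface that KafkaJS's Producer and Admin satisfy structurally (both expose `connect(): Promise<void>`):

```typescript
// Anything with an async connect(), e.g. a KafkaJS Producer or Admin client
interface Connectable {
  connect(): Promise<void>;
}

// Returns a function that connects the client at most once at a time.
// Concurrent callers share the same in-flight promise; a failed attempt is
// forgotten so that a later call can retry instead of caching the error.
function connectOnce(client: Connectable): () => Promise<void> {
  let pending: Promise<void> | null = null;
  return () => {
    if (!pending) {
      pending = client.connect().catch((err: unknown) => {
        pending = null;
        throw err;
      });
    }
    return pending;
  };
}

// Hypothetical usage inside a tool handler:
//   const ensureProducer = connectOnce(producer);
//   await ensureProducer();
//   await producer.send({ ... });
```

Clearing the cached promise on failure is a design choice: it lets a later tool call recover after a transient broker outage, at the cost of one new connection attempt per call while the broker stays down.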
import { Kafka, logLevel } from 'kafkajs';
const kafka = new Kafka({
  clientId: 'mcp-server',
  brokers: (process.env.KAFKA_BROKERS ?? 'localhost:9092').split(',').map((b) => b.trim()),
  // TLS for Confluent Cloud, MSK, or self-hosted clusters with TLS listeners
  ssl: process.env.KAFKA_SSL === 'true',
  // SASL/PLAIN or SCRAM (Confluent Cloud, MSK with SASL/SCRAM); MSK IAM needs a different mechanism
  sasl: process.env.KAFKA_SASL_MECHANISM
    ? {
        mechanism: process.env.KAFKA_SASL_MECHANISM as 'plain' | 'scram-sha-256' | 'scram-sha-512',
        username: process.env.KAFKA_SASL_USERNAME!,
        password: process.env.KAFKA_SASL_PASSWORD!
      }
    : undefined,
  // Info-level logs are very verbose; keep only errors in production
  logLevel: process.env.NODE_ENV === 'production' ? logLevel.ERROR : logLevel.WARN,
  // Retry configuration for transient broker errors
  retry: {
    initialRetryTime: 300,
    retries: 5
  }
});
// Singleton producer, connected once at startup
const producer = kafka.producer({
  // Broker discards duplicates from KafkaJS's own retries within this producer session (requires acks: -1)
  idempotent: true
});
// Singleton admin client for topic management and health checks
const admin = kafka.admin();
// Connect both at startup
async function initKafka(): Promise<void> {
  await Promise.all([producer.connect(), admin.connect()]);
  // stdio MCP servers reserve stdout for protocol messages, so log to stderr
  console.error('Kafka producer and admin connected');
}
initKafka().catch((err) => {
  console.error('Kafka connection failed:', err.message);
  process.exit(1);
});
// Graceful shutdown: a SIGTERM listener replaces Node's default exit, so exit explicitly
process.on('SIGTERM', async () => {
  await Promise.all([producer.disconnect(), admin.disconnect()]).catch(() => {});
  process.exit(0);
});
export { kafka, producer, admin };
Setting idempotent: true enables Kafka's idempotent producer mode: the broker tracks a producer ID and per-partition sequence numbers and discards duplicates, so when KafkaJS retries a send after a network error, the message is written only once. This requires acks: -1 (all in-sync replicas must acknowledge). The guarantee is scoped to retries inside one producer session. If an agent calls publish_message again after a timeout, that is a new send with new sequence numbers and the broker stores it a second time, so protecting against agent-level retries needs an application-level idempotency key that consumers check.
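Because a repeated tool call is a fresh send that the broker stores again, consumers can deduplicate on a key supplied by the caller. A minimal sketch, assuming a hypothetical `idempotency-key` header set by the publishing agent and an in-memory window of recently seen keys (a production consumer would need a durable store shared across instances):

```typescript
// Remembers the last `capacity` distinct keys it accepted (first-in, first-out).
// A Map preserves insertion order, so its first key is always the oldest one.
class RecentKeys {
  private readonly seen = new Map<string, true>();
  private readonly capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  // Returns true the first time a key is offered, false for repeats
  firstTime(key: string): boolean {
    if (this.seen.has(key)) return false;
    this.seen.set(key, true);
    if (this.seen.size > this.capacity) {
      const oldest = this.seen.keys().next().value as string;
      this.seen.delete(oldest);
    }
    return true;
  }
}

// Hypothetical decoded message shape: header values already converted to strings
interface DecodedMessage {
  headers: Record<string, string>;
  value: unknown;
}

// Drops messages whose idempotency key was already processed.
// Messages without the header pass through unchanged.
function dedupe(messages: DecodedMessage[], recent: RecentKeys): DecodedMessage[] {
  return messages.filter((m) => {
    const key = m.headers['idempotency-key'];
    return key === undefined || recent.firstTime(key);
  });
}
```

The window size bounds memory but also bounds protection: a retry that arrives after its key has been evicted is processed again.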
Topic allow-list and message publishing tools
Never accept an arbitrary topic name from the MCP caller. An agent with access to an unrestricted Kafka producer can publish to topics that other systems trust: audit logs, compacted topics that services treat as their source of state, or control topics that trigger jobs. A single forged or malformed record in one of those can corrupt downstream state. Always validate topic names against a hard-coded allow-list.
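The topic check is one half of a safe publish; a tool can also reject payloads the broker would refuse anyway, before spending a round trip. A minimal sketch, assuming a hypothetical allow-list and a hypothetical MAX_VALUE_BYTES of 1,000,000, chosen to stay under the broker's default message.max.bytes (1,048,588 bytes). That broker limit applies to whole record batches after compression, so a per-value check is only a rough guard; check your cluster's actual setting:

```typescript
// Hypothetical limits for illustration; align them with your allow-list and broker config
const ALLOWED_TOPICS: ReadonlySet<string> = new Set(['user.events', 'order.events']);
const MAX_VALUE_BYTES = 1_000_000;

type PublishCheck = { ok: true; payload: string } | { ok: false; reason: string };

// Validates the topic and serializes the value once, so the caller can publish `payload` directly
function checkPublishable(topic: string, value: Record<string, unknown>): PublishCheck {
  if (!ALLOWED_TOPICS.has(topic)) {
    return { ok: false, reason: `Topic '${topic}' is not allowed` };
  }
  const payload = JSON.stringify(value);
  // Kafka limits bytes, not characters, so measure the UTF-8 encoding
  const bytes = new TextEncoder().encode(payload).length;
  if (bytes > MAX_VALUE_BYTES) {
    return { ok: false, reason: `Value is ${bytes} bytes; limit is ${MAX_VALUE_BYTES}` };
  }
  return { ok: true, payload };
}
```

Returning a result object rather than throwing keeps the check pure; a tool handler would convert `{ ok: false }` into an McpError with ErrorCode.InvalidParams.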
import { z } from 'zod';
import { CompressionTypes } from 'kafkajs';
import { McpError, ErrorCode } from '@modelcontextprotocol/sdk/types.js';
import { producer } from './kafka-client.js';
// `server` is the McpServer instance created in your server entry point
// Hard-coded allow-list; never built from caller input.
// Audit, state, and internal topics are deliberately absent.
const ALLOWED_TOPICS = new Set([
  'user.events',
  'order.events',
  'notification.requests'
]);
function validateTopic(topic: string): void {
  if (!ALLOWED_TOPICS.has(topic)) {
    throw new McpError(
      ErrorCode.InvalidParams,
      `Topic '${topic}' is not in the allowed topic list. Allowed: ${[...ALLOWED_TOPICS].join(', ')}`
    );
  }
}
// Encode caller-supplied string headers as UTF-8 Buffers for KafkaJS
function toKafkaHeaders(headers?: Record<string, string>): Record<string, Buffer> | undefined {
  return headers
    ? Object.fromEntries(Object.entries(headers).map(([k, v]) => [k, Buffer.from(v)]))
    : undefined;
}
// ---- publish_message ----
server.tool(
  'publish_message',
  {
    topic: z.string().min(1),
    key: z.string().optional(),
    value: z.record(z.unknown()), // JSON object
    headers: z.record(z.string()).optional(),
    partition: z.number().int().min(0).optional()
  },
  async ({ topic, key, value, headers, partition }) => {
    validateTopic(topic);
    const result = await producer.send({
      topic,
      messages: [{
        key: key ?? null,
        value: JSON.stringify(value),
        headers: toKafkaHeaders(headers),
        partition
      }],
      acks: -1 // wait for all in-sync replicas (required for the idempotent producer)
    });
    const metadata = result[0];
    return {
      content: [{
        type: 'text',
        text: JSON.stringify({
          topic: metadata.topicName,
          partition: metadata.partition,
          offset: metadata.baseOffset,
          // '-1' unless the topic uses message.timestamp.type=LogAppendTime
          log_append_time: metadata.logAppendTime
        })
      }]
    };
  }
);
// ---- publish_batch ----
server.tool(
  'publish_batch',
  {
    topic: z.string().min(1),
    messages: z.array(z.object({
      key: z.string().optional(),
      value: z.record(z.unknown()),
      headers: z.record(z.string()).optional()
    })).min(1).max(100)
  },
  async ({ topic, messages }) => {
    validateTopic(topic);
    const result = await producer.send({
      topic,
      messages: messages.map(({ key, value, headers }) => ({
        key: key ?? null,
        value: JSON.stringify(value),
        headers: toKafkaHeaders(headers)
      })),
      compression: CompressionTypes.GZIP, // compress the batch for network efficiency
      acks: -1
    });
    return {
      content: [{
        type: 'text',
        text: JSON.stringify({
          messages_sent: messages.length,
          // send() returns one entry per partition written (with that partition's base offset), not one per message
          partitions: result.map(r => ({ partition: r.partition, offset: r.baseOffset }))
        })
      }]
    };
  }
);
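producer.send() returns one RecordMetadata entry per partition written rather than one per message, and its logAppendTime is '-1' unless the topic uses LogAppendTime timestamps. A small sketch of turning that array into a tidy tool response, using a locally declared `SendResult` type that mirrors only the KafkaJS RecordMetadata fields it reads:

```typescript
// Mirrors the subset of KafkaJS's RecordMetadata used here
interface SendResult {
  topicName: string;
  partition: number;
  baseOffset?: string;
  logAppendTime?: string;
}

interface PartitionSummary {
  partition: number;
  base_offset: string | null;
  // null when the broker reports '-1' (the topic uses CreateTime, the default)
  log_append_time: string | null;
}

function summarizeSend(results: SendResult[]): { topic: string | null; partitions: PartitionSummary[] } {
  return {
    topic: results[0]?.topicName ?? null,
    partitions: results
      .map((r) => ({
        partition: r.partition,
        base_offset: r.baseOffset ?? null,
        log_append_time:
          r.logAppendTime && r.logAppendTime !== '-1'
            ? new Date(Number(r.logAppendTime)).toISOString()
            : null
      }))
      // Stable ordering makes responses easier for agents to compare
      .sort((a, b) => a.partition - b.partition)
  };
}
```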
Consumer tools: synchronous topic reads
Long-running Kafka consumers (consumer groups that continuously poll) don't fit the MCP tool model: a tool call is a single request/response, so the handler has to finish and return a result. Instead, implement a "read N messages from offset" pattern: create a transient consumer, subscribe to the topic, seek to the requested offset, collect up to N messages or stop at a timeout, then disconnect.
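Stripped of KafkaJS specifics, the "read N messages or give up" step is a race between a counter and a timer. A minimal sketch with a hypothetical push-style `Subscribe` function standing in for consumer.run({ eachMessage }) plus consumer.disconnect():

```typescript
// Push-based source: calls onItem for each item and returns a function that stops delivery
type Subscribe<T> = (onItem: (item: T) => void) => () => void;

// Resolves with up to `max` items, or with whatever arrived when `timeoutMs` elapses
function collect<T>(subscribe: Subscribe<T>, max: number, timeoutMs: number): Promise<T[]> {
  return new Promise<T[]>((resolve) => {
    const items: T[] = [];
    let finished = false;
    let stop: () => void = () => {};
    const finish = (): void => {
      if (finished) return;
      finished = true;
      clearTimeout(timer);
      stop();
      resolve(items);
    };
    const timer = setTimeout(finish, timeoutMs);
    stop = subscribe((item) => {
      // Items can keep arriving after the limit (e.g. the rest of a fetched batch); ignore them
      if (finished) return;
      items.push(item);
      if (items.length >= max) finish();
    });
    // If the source hit `max` synchronously, finish() ran before `stop` was assigned
    if (finished) stop();
  });
}
```

The `finished` flag is what keeps the result at most `max` long: without it, late deliveries from an in-flight batch would keep appending after the promise resolved.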
import { kafka } from './kafka-client.js';
// Same module as the publishing tools: reuses z, validateTopic, and `server`
// Decode a message value as JSON, falling back to the raw string
function parseJsonOrText(raw: string): unknown {
  try { return JSON.parse(raw); } catch { return raw; }
}
// ---- read_messages ----
server.tool(
  'read_messages',
  {
    topic: z.string().min(1),
    partition: z.number().int().min(0).default(0),
    // 'latest', 'earliest', or a numeric offset string such as '1024'
    offset: z.union([z.enum(['latest', 'earliest']), z.string().regex(/^\d+$/)]).default('latest'),
    max_messages: z.number().int().min(1).max(50).default(10),
    timeout_ms: z.number().int().min(1000).max(30000).default(10000)
  },
  async ({ topic, partition, offset, max_messages, timeout_ms }) => {
    validateTopic(topic);
    // A unique group per call, so this consumer is assigned every partition of the topic
    const groupId = `mcp-reader-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
    const consumer = kafka.consumer({ groupId });
    const messages: Array<{
      key: string | null;
      value: unknown;
      offset: string;
      partition: number;
      timestamp: string;
      headers: Record<string, string>;
    }> = [];
    try {
      await consumer.connect();
      await consumer.subscribe({ topics: [topic], fromBeginning: offset === 'earliest' });
      // The timeout also covers joining the group; brokers delay a new group's first
      // rebalance by group.initial.rebalance.delay.ms (3 seconds by default)
      await new Promise<void>((resolve, reject) => {
        const timer = setTimeout(() => resolve(), timeout_ms);
        consumer.run({
          autoCommit: false, // throwaway group: never commit offsets
          eachMessage: async ({ message, partition: msgPartition }) => {
            // Keep only the requested partition; ignore the rest of an in-flight batch once full
            if (msgPartition !== partition || messages.length >= max_messages) return;
            const raw = message.value?.toString();
            messages.push({
              key: message.key?.toString() ?? null,
              value: raw === undefined ? null : parseJsonOrText(raw),
              offset: message.offset,
              partition: msgPartition,
              timestamp: new Date(Number(message.timestamp)).toISOString(),
              headers: Object.fromEntries(
                Object.entries(message.headers ?? {}).map(([k, v]) => [
                  k,
                  Buffer.isBuffer(v) ? v.toString() : String(v ?? '')
                ])
              )
            });
            if (messages.length >= max_messages) {
              clearTimeout(timer);
              resolve();
            }
          }
        }).catch((err) => {
          clearTimeout(timer);
          reject(err);
        });
        // seek() is only valid after run() has been called; KafkaJS does not require awaiting run() first
        if (offset !== 'latest' && offset !== 'earliest') {
          consumer.seek({ topic, partition, offset });
        }
      });
      return {
        content: [{
          type: 'text',
          text: JSON.stringify({ messages, count: messages.length }, null, 2)
        }]
      };
    } finally {
      await consumer.disconnect().catch(() => {}); // always disconnect, even on error
    }
  }
);
The transient consumer pattern creates a new consumer group for each read, so nothing is committed between tool calls: each read starts at earliest, latest, or the explicit numeric offset passed in. Note that 'latest' only returns messages produced after the consumer joins, so it often comes back empty within the timeout. For stateful consumption (tracking where the previous read stopped), take the highest offset in the tool response, add one, and pass that as the offset parameter on the next call; passing the last offset itself re-reads that message.
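The resume cursor is the highest offset returned plus one. Kafka offsets are 64-bit integers that KafkaJS represents as strings, so a minimal sketch uses BigInt to avoid precision loss past Number.MAX_SAFE_INTEGER. It assumes the response shape of read_messages, where each message carries an `offset` string:

```typescript
// Shape of one entry in the read_messages response (only the field used here)
interface ReadMessage {
  offset: string;
}

// Returns the offset to pass on the next read_messages call,
// or the previous cursor unchanged when the last read returned nothing.
function nextOffset(messages: ReadMessage[], previous: string): string {
  if (messages.length === 0) return previous;
  // Offsets increase within a partition; take the max so input order doesn't matter
  let max = BigInt(messages[0].offset);
  for (const m of messages) {
    const o = BigInt(m.offset);
    if (o > max) max = o;
  }
  return (max + BigInt(1)).toString();
}
```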
Consumer group lag monitoring tool
Consumer lag, the number of messages between a consumer group's committed offset and the end of each partition (the high watermark), is the primary health signal for Kafka-based pipelines. Expose it as an MCP tool so agents can detect when downstream consumers are falling behind.
import { admin } from './kafka-client.js';
server.tool(
  'get_consumer_lag',
  {
    group_id: z.string().min(1),
    topic: z.string().optional()
  },
  async ({ group_id, topic }) => {
    if (topic) validateTopic(topic);
    // Omitting `topics` returns every topic the group has committed offsets for
    const groupOffsets = await admin.fetchOffsets({
      groupId: group_id,
      ...(topic ? { topics: [topic] } : {})
    });
    const lagReport = await Promise.all(groupOffsets.map(async ({ topic: topicName, partitions }) => {
      // One entry per partition: { partition, offset, high, low }; `high` is the log end offset
      const watermarks = await admin.fetchTopicOffsets(topicName);
      const partitionLags = partitions.map(({ partition, offset }) => {
        const wm = watermarks.find(w => w.partition === partition);
        const latest = wm ? Number(wm.high) : 0;
        const low = wm ? Number(wm.low) : 0;
        const committed = Number(offset);
        // '-1' means the group never committed on this partition, so lag is unknown;
        // records below `low` were deleted by retention and can't be consumed
        const lag = committed >= 0 ? Math.max(0, latest - Math.max(committed, low)) : null;
        return { partition, committed: committed >= 0 ? committed : null, latest, lag };
      });
      const totalLag = partitionLags.reduce((sum, p) => sum + (p.lag ?? 0), 0);
      return { topic: topicName, total_lag: totalLag, partitions: partitionLags };
    }));
    return {
      content: [{
        type: 'text',
        text: JSON.stringify({ group_id, lag: lagReport }, null, 2)
      }]
    };
  }
);
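The per-partition arithmetic is easy to get subtly wrong (a never-committed '-1' offset otherwise shows up as a huge lag), so it can help to keep it in a pure, testable function. A sketch, assuming inputs shaped like KafkaJS's fetchOffsets partitions ({ partition, offset }) and fetchTopicOffsets entries ({ partition, high, low }):

```typescript
interface CommittedOffset { partition: number; offset: string }
interface Watermarks { partition: number; high: string; low: string }
interface PartitionLag {
  partition: number;
  committed: number | null;
  high: number;
  lag: number | null;
}

function computeLag(
  committed: CommittedOffset[],
  watermarks: Watermarks[]
): { total_lag: number; partitions: PartitionLag[] } {
  const byPartition = new Map(watermarks.map((w) => [w.partition, w] as const));
  const partitions = committed.map(({ partition, offset }): PartitionLag => {
    const wm = byPartition.get(partition);
    const c = Number(offset);
    if (!wm || c < 0) {
      // No commit yet ('-1') or unknown partition: report lag as unknown, not zero
      return { partition, committed: c < 0 ? null : c, high: wm ? Number(wm.high) : 0, lag: null };
    }
    const high = Number(wm.high);
    const low = Number(wm.low);
    // Only records still retained (at or above the low watermark) count as lag
    return { partition, committed: c, high, lag: Math.max(0, high - Math.max(c, low)) };
  });
  const total_lag = partitions.reduce((sum, p) => sum + (p.lag ?? 0), 0);
  return { total_lag, partitions };
}
```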
Health endpoint: /health/kafka
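import express from 'express';
import { admin } from './kafka-client.js';
const app = express();
// Without a cap, describeCluster() can keep retrying for many seconds before failing.
// 3000ms is an assumption: keep it below your uptime monitor's own request timeout.
const HEALTH_TIMEOUT_MS = 3000;
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => reject(new Error(`Timed out after ${ms}ms`)), ms);
    promise.then(
      (value) => { clearTimeout(timer); resolve(value); },
      (err) => { clearTimeout(timer); reject(err); }
    );
  });
}
app.get('/health/kafka', async (_req, res) => {
  const start = Date.now();
  try {
    const cluster = await withTimeout(admin.describeCluster(), HEALTH_TIMEOUT_MS);
    res.json({
      status: 'ok',
      latency_ms: Date.now() - start,
      cluster_id: cluster.clusterId,
      broker_count: cluster.brokers.length,
      controller_id: cluster.controller,
      brokers: cluster.brokers.map(b => ({ id: b.nodeId, host: b.host, port: b.port }))
    });
  } catch (err) {
    res.status(503).json({
      status: 'error',
      error: (err as Error).message,
      elapsed_ms: Date.now() - start
    });
  }
});
app.listen(3001);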
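admin.describeCluster() confirms that the admin client can reach a broker and complete a metadata request. Because the admin connection is long-lived and already authenticated, credential problems typically surface only when the client reconnects or re-authenticates. The response lists the brokers the cluster currently reports as live, so a shorter list than expected can indicate a broker failure even while the cluster is still reachable. Wire AliveMCP to monitor this endpoint and alert on 503 responses and on sustained latency increases (for example, above 500ms), which can be an early sign of broker overload.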
| Kafka failure mode | Process-level HTTP check | /health/kafka detects it? |
|---|---|---|
| All brokers unreachable | 200 (process alive) | Yes: describeCluster fails or times out → 503 |
| Single broker failed (partial cluster) | 200 | Partially: broker_count drops, but status stays ok |
| SASL credentials expired or revoked | 200 | Usually, once the client reconnects or re-authenticates → 503 |
| Partition leader election in progress | 200 | No: the cluster appears healthy during the election |
| Consumer group rebalancing | 200 | No: use the get_consumer_lag tool instead |
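Since a lost broker only shows up as a smaller broker_count, the endpoint can turn that into an explicit status. A sketch, assuming a hypothetical expected broker count (for example read from an EXPECTED_BROKERS environment variable) and a hypothetical 500ms latency threshold:

```typescript
type HealthStatus = 'ok' | 'degraded';

interface HealthInput {
  brokerCount: number;
  expectedBrokers: number;
  latencyMs: number;
  slowThresholdMs?: number;
}

// 'degraded' means the cluster answered but something looks wrong.
// Outright failures still come from the endpoint's catch branch as 503.
function classifyHealth({
  brokerCount,
  expectedBrokers,
  latencyMs,
  slowThresholdMs = 500
}: HealthInput): { status: HealthStatus; reasons: string[] } {
  const reasons: string[] = [];
  if (brokerCount < expectedBrokers) {
    reasons.push(`only ${brokerCount} of ${expectedBrokers} brokers reported`);
  }
  if (latencyMs > slowThresholdMs) {
    reasons.push(`describeCluster took ${latencyMs}ms`);
  }
  return { status: reasons.length === 0 ? 'ok' : 'degraded', reasons };
}
```

If your uptime monitor only looks at status codes, map 'degraded' to a non-2xx response; otherwise return 200 and let a body check on `status` raise the alert.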
Frequently asked questions
Should I use KafkaJS or the Confluent Kafka Node.js client?
KafkaJS is the most common choice for TypeScript MCP servers: it's pure JavaScript (no native binaries), runs anywhere Node.js runs, and ships its own TypeScript types. The librdkafka-based clients, node-rdkafka and Confluent's own @confluentinc/kafka-javascript, wrap the native librdkafka library. They offer higher throughput and lower latency for high-volume producers, but the native module has to be compiled (node-gyp) or installed as a prebuilt binary matching the target OS and architecture, which complicates Lambda deployments and slim container images. For typical MCP server workloads (low-to-medium volume, where developer ergonomics matter), KafkaJS is a reasonable default. Consider a librdkafka-based client when profiling shows that KafkaJS throughput is the bottleneck.
How do I connect to Confluent Cloud from an MCP server?
Confluent Cloud uses SASL/PLAIN over TLS. Set ssl: true and sasl: { mechanism: 'plain', username: process.env.CONFLUENT_API_KEY, password: process.env.CONFLUENT_API_SECRET }. The broker address is your Confluent Cloud bootstrap server (format: pkc-xxxx.region.provider.confluent.cloud:9092). Confluent Cloud enforces ACLs, so the API key needs WRITE on each topic the producer publishes to, READ on the topics consumers read, READ on the consumer groups they use (for the transient mcp-reader-* groups, a prefixed ACL is the practical option), and DESCRIBE on any group whose lag you report. Test connectivity with admin.describeCluster() before wiring tool handlers.
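These settings can be assembled from environment variables with a fail-fast check. A sketch that reuses the CONFLUENT_API_KEY / CONFLUENT_API_SECRET names from the answer plus a hypothetical CONFLUENT_BOOTSTRAP variable, returning a plain object with the brokers/ssl/sasl fields that the KafkaJS Kafka constructor accepts:

```typescript
// Subset of KafkaJS's KafkaConfig used for Confluent Cloud, declared locally for illustration
interface ConfluentKafkaConfig {
  clientId: string;
  brokers: string[];
  ssl: true;
  sasl: { mechanism: 'plain'; username: string; password: string };
}

function confluentCloudConfig(env: Record<string, string | undefined>): ConfluentKafkaConfig {
  const bootstrap = env.CONFLUENT_BOOTSTRAP; // e.g. pkc-xxxx.region.provider.confluent.cloud:9092
  const username = env.CONFLUENT_API_KEY;
  const password = env.CONFLUENT_API_SECRET;
  const missing = Object.entries({
    CONFLUENT_BOOTSTRAP: bootstrap,
    CONFLUENT_API_KEY: username,
    CONFLUENT_API_SECRET: password
  })
    .filter(([, v]) => !v)
    .map(([k]) => k);
  // Fail at startup with a clear message instead of a SASL error on the first tool call
  if (missing.length > 0) throw new Error(`Missing environment variables: ${missing.join(', ')}`);
  return {
    clientId: 'mcp-server',
    brokers: bootstrap!.split(',').map((b) => b.trim()).filter(Boolean),
    ssl: true,
    sasl: { mechanism: 'plain', username: username!, password: password! }
  };
}

// Hypothetical usage: const kafka = new Kafka(confluentCloudConfig(process.env));
```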
How do I handle message schema evolution in MCP tools?
The simplest approach for MCP tools is to treat Kafka message values as JSON and use Zod to validate the schema at the tool parameter boundary (before publishing) and at the consumer boundary (after reading). When the schema evolves, add optional fields with .optional() — don't remove or rename required fields. For teams that need formal schema management, integrate with Confluent Schema Registry: publish with an Avro/JSON Schema serializer and validate deserialized messages against a registered schema version. The MCP Server Zod Validation guide covers schema validation patterns in detail.
What's the right pattern for MCP tools that need to wait for a Kafka response?
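Kafka is fundamentally asynchronous: you publish a message and get a result back later via a different topic. For request/reply patterns in MCP tools, use a correlation ID: (1) generate a UUID, (2) start a transient consumer on the reply topic and wait until it has joined its group (for example, on the consumer's GROUP_JOIN event), (3) publish the request with the UUID in a header such as correlation-id (and, if useful for partitioning, as the key), (4) wait for a reply whose correlation-id header matches, which relies on the responding service copying the header onto its reply, (5) disconnect and return. Starting the consumer before publishing matters: a consumer that starts at latest will miss a reply that arrives before it joins. Set a timeout (5–10 seconds) and return an error if no reply arrives. This pattern works for orchestration tools (trigger a pipeline, wait for the result) but not for high-throughput operations where holding a tool call open is too expensive.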
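The essential ordering is listen first, then publish, then wait for the matching ID or a timeout. A minimal sketch of that logic, with a hypothetical `Transport` interface whose `onReply` and `publish` stand in for a transient KafkaJS consumer and producer.send(), and a hypothetical `correlation-id` header:

```typescript
interface ReplyMessage {
  headers: Record<string, string>;
  value: unknown;
}

interface Transport {
  // Starts listening on the reply topic; returns a function that stops listening
  onReply(handler: (msg: ReplyMessage) => void): () => void;
  publish(headers: Record<string, string>, value: unknown): Promise<void>;
}

function request(transport: Transport, correlationId: string, value: unknown, timeoutMs: number): Promise<unknown> {
  return new Promise<unknown>((resolve, reject) => {
    let settled = false;
    let stop: () => void = () => {};
    // Runs exactly once: clears the timer, stops listening, then resolves or rejects
    const settle = (finish: () => void): void => {
      if (settled) return;
      settled = true;
      clearTimeout(timer);
      stop();
      finish();
    };
    const timer = setTimeout(
      () => settle(() => reject(new Error(`No reply within ${timeoutMs}ms`))),
      timeoutMs
    );
    // 1. Listen before publishing, so a fast reply cannot arrive unobserved
    stop = transport.onReply((msg) => {
      // Replies to other requests share the topic; skip them
      if (msg.headers['correlation-id'] === correlationId) settle(() => resolve(msg.value));
    });
    if (settled) { stop(); return; }
    // 2. Publish the request carrying the same correlation ID
    transport
      .publish({ 'correlation-id': correlationId }, value)
      .catch((err: unknown) => settle(() => reject(err)));
  });
}
```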
Further reading
- MCP Server Redis — pub/sub, streams, and caching for MCP tools
- MCP Server Webhooks — receiving Kafka events via webhook bridge
- MCP Server Zod Validation — schema validation for tool parameters and messages
- MCP Server Error Handling — retry patterns for transient Kafka errors
- MCP Server Health Check — designing health endpoints for uptime monitors