Guide · Database & Event Architecture
Data Pipeline Integration for MCP Servers — CDC, Kafka, and keeping tool data fresh
An MCP tool that searches product inventory or answers "what deployments are running right now?" is only as good as the data behind it. Query-on-demand (hitting the source database on every tool call) is always current but adds per-call latency and connection pressure to the source system. Materialized views updated by a change data capture (CDC) pipeline are faster and decouple the MCP server from the source database's availability — but introduce a new failure mode: the pipeline can lag or stop, serving stale data that produces wrong answers without errors. This guide covers three data freshness strategies, CDC with PostgreSQL logical replication, Kafka consumer integration with KafkaJS, Debezium patterns, and how to monitor pipeline freshness so AliveMCP alerts when your tool data goes stale — not just when the server goes down.
TL;DR
For near-real-time tool data: use CDC (PostgreSQL logical replication or Debezium → Kafka) to maintain a local materialized view that tool handlers query directly. Track last_updated_at in your local view and expose a data_freshness check in /health: if the pipeline has been silent longer than your SLA allows, return 503 degraded. Point AliveMCP at that health URL — you get alerts when data goes stale, not just when the server goes dark.
Three data freshness strategies
Before choosing CDC, understand all three options and their tradeoffs:
| Strategy | Freshness | Latency per tool call | Source DB load | Pipeline complexity |
|---|---|---|---|---|
| Query-on-demand | Always current | Database query time (10–200ms) | High (every tool call = one query) | None — direct connection |
| Scheduled refresh (cron) | Lag = refresh interval (minutes) | <5ms (read from local cache) | Low (batch query at interval) | Low — cron + local store |
| CDC / streaming pipeline | Sub-second to seconds | <5ms (read from local store) | Low (WAL-based, no polling) | High — pipeline infrastructure |
Use query-on-demand when: the source database can absorb the load, tool call latency is acceptable, and data must always be current (financial balances, reservation status). Use scheduled refresh when: moderate staleness is acceptable (minutes), source DB load is a concern, and you want minimal operational overhead. Use CDC when: you need near-real-time freshness (<10 seconds), tool calls must be fast, or the source database cannot absorb high query concurrency from the MCP server.
PostgreSQL CDC with logical replication
PostgreSQL logical replication streams committed changes from the WAL (write-ahead log) to a subscriber. The MCP server subscribes to changes in the tables it cares about, maintaining a local SQLite or PostgreSQL materialized view.
Setting up logical replication
-- On the source PostgreSQL server (requires wal_level = logical in postgresql.conf)
-- Create a publication for the tables the MCP server needs
CREATE PUBLICATION mcp_server_feed
FOR TABLE products, inventory_levels, deployments;
-- The MCP server subscribes and creates a logical replication slot
// MCP server: subscribe to logical replication changes
// Using pg-logical-replication npm package
import { LogicalReplicationService, PgoutputPlugin } from 'pg-logical-replication';
const service = new LogicalReplicationService({
host: process.env.PGHOST_SOURCE,
database: process.env.PGDATABASE_SOURCE,
user: process.env.PGREPLICATION_USER,
password: process.env.PGREPLICATION_PASSWORD,
});
const plugin = new PgoutputPlugin({
protoVersion: 1,
publicationNames: ['mcp_server_feed'],
});
// Local materialized store (SQLite or local PostgreSQL)
const localDb = new Database('./local_mirror.db');
service.on('data', async (lsn, log) => {
if (log.tag === 'insert' || log.tag === 'update') {
// Upsert into local mirror
await upsertRecord(localDb, log.relation.name, log.new);
await recordFreshness(log.relation.name);
}
if (log.tag === 'delete') {
await deleteRecord(localDb, log.relation.name, log.key);
}
});
// Track freshness per table
const tableFreshness: Map = new Map();
async function recordFreshness(tableName: string) {
tableFreshness.set(tableName, new Date());
await localDb.run(
'INSERT OR REPLACE INTO _pipeline_health (table_name, last_updated_at) VALUES (?, ?)',
[tableName, new Date().toISOString()]
);
}
// Start replication from last confirmed LSN (persisted across restarts)
const lastLsn = await getLastConfirmedLsn(localDb);
await service.subscribe(plugin, 'mcp_server_slot', lastLsn);
Initial snapshot before streaming
A logical replication slot only streams changes that occur after the slot was created. Before starting to stream, take a full snapshot of the source tables to populate the local mirror:
// Startup: snapshot + stream pattern
async function initializeMirror() {
const slotExists = await checkSlotExists();
if (!slotExists) {
// Take consistent snapshot while creating slot
const snapshotName = await createReplicationSlotWithSnapshot();
await bulkLoadFromSnapshot(snapshotName);
console.log('Initial snapshot loaded');
} else {
console.log('Resuming from existing slot');
}
// Start streaming from last confirmed LSN
await startLogicalReplication();
}
Kafka consumer integration with KafkaJS
For organizations where source data already flows through Kafka (via Debezium CDC, application-level event publishing, or ETL pipelines), the MCP server consumes from the relevant Kafka topics and maintains its local view.
import { Kafka, logLevel } from 'kafkajs';
const kafka = new Kafka({
clientId: `mcp-server-${process.env.HOSTNAME}`,
brokers: process.env.KAFKA_BROKERS!.split(','),
logLevel: logLevel.WARN,
});
const consumer = kafka.consumer({
groupId: 'mcp-server-inventory', // one group per MCP server service
sessionTimeout: 30_000,
heartbeatInterval: 3_000,
});
await consumer.connect();
await consumer.subscribe({
topics: ['inventory.products', 'inventory.stock_levels'],
fromBeginning: false, // start from latest — snapshot handles historical state
});
let lastMessageAt: Date | null = null;
const consumerLag: Map = new Map();
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const event = JSON.parse(message.value!.toString());
lastMessageAt = new Date();
if (topic === 'inventory.products') {
await upsertProduct(localDb, event);
} else if (topic === 'inventory.stock_levels') {
await upsertStockLevel(localDb, event);
}
// KafkaJS auto-commits offset after eachMessage resolves
// Messages are processed at-least-once — handlers must be idempotent
},
});
Consumer lag monitoring
Consumer lag (how many messages are behind the latest offset) is the primary freshness metric for Kafka-based pipelines. High lag means the MCP server's local view is stale:
// Fetch consumer lag from Kafka admin API
const admin = kafka.admin();
await admin.connect();
async function fetchConsumerLag(): Promise
Debezium CDC → Kafka → MCP server
Debezium is an open-source CDC platform that captures row-level changes from database transaction logs and publishes them to Kafka topics. For PostgreSQL, Debezium uses logical replication (the same mechanism as direct CDC) but adds a managed connector, schema registry integration, and structured change events.
The MCP server is a Kafka consumer — it does not interact with Debezium directly:
| Component | Role | What it manages |
|---|---|---|
| PostgreSQL | Source of truth | Rows + WAL |
| Debezium connector | CDC agent | WAL reading, slot management, schema tracking |
| Kafka | Change event log | Durable, replayable stream of row changes |
| MCP server | Consumer | Local materialized view, tool handler reads |
Debezium change events have a structured envelope:
// Debezium change event structure (PostgreSQL connector)
interface DebeziumEvent {
payload: {
op: 'c' | 'u' | 'd' | 'r'; // create, update, delete, read (snapshot)
before: Record | null;
after: Record | null;
source: {
table: string;
db: string;
ts_ms: number; // source DB commit timestamp — use for freshness tracking
};
ts_ms: number; // Debezium processing timestamp
};
}
// Process Debezium event in MCP server consumer
async function processDebeziumEvent(event: DebeziumEvent) {
const { op, after, before, source } = event.payload;
// Track source DB timestamp for accurate freshness measurement
const sourceTs = new Date(source.ts_ms);
const pipelineLag = Date.now() - source.ts_ms; // end-to-end CDC lag in ms
if (op === 'c' || op === 'u' || op === 'r') {
await upsertRecord(localDb, source.table, after!);
} else if (op === 'd') {
await deleteRecord(localDb, source.table, before!);
}
// Record freshness using source DB timestamp (not processing timestamp)
tableFreshness.set(source.table, sourceTs);
}
Data freshness monitoring with AliveMCP
A protocol probe confirms the MCP server accepts connections. It does not confirm the local materialized view is current. Data freshness must be monitored separately, via the health endpoint.
Freshness check in /health
// Data freshness check in /health endpoint
app.get('/health', async (req, res) => {
const checks: Record = {};
let status = 'ok';
// Check freshness per table
const FRESHNESS_THRESHOLDS: Record = {
products: 300, // 5 minutes acceptable staleness
inventory_levels: 60, // 1 minute — agents act on this data
deployments: 30, // 30 seconds — near-real-time required
};
for (const [tableName, thresholdSeconds] of Object.entries(FRESHNESS_THRESHOLDS)) {
const lastUpdated = tableFreshness.get(tableName);
if (!lastUpdated) {
checks[`${tableName}_freshness`] = 'never_updated';
status = 'degraded';
continue;
}
const ageSec = (Date.now() - lastUpdated.getTime()) / 1000;
checks[`${tableName}_freshness`] = {
last_updated_at: lastUpdated.toISOString(),
age_seconds: Math.round(ageSec),
status: ageSec < thresholdSeconds ? 'fresh' : 'stale',
};
if (ageSec > thresholdSeconds) {
status = 'degraded';
}
}
// Also check consumer lag if using Kafka
let totalLag = 0;
for (const lag of consumerLag.values()) totalLag += lag;
checks.consumer_lag_total = totalLag;
if (totalLag > 5000) {
checks.consumer_lag_status = 'high';
status = 'degraded';
}
res.status(status === 'ok' ? 200 : 503).json({ status, checks });
});
Point AliveMCP's custom health URL at this endpoint. AliveMCP probes every 60 seconds. When the CDC pipeline stalls (Debezium connector stopped, Kafka consumer group rebalancing stuck, PostgreSQL replication slot lagging), the freshness check exceeds the threshold and AliveMCP alerts. This catches data-quality failures that produce wrong answers without producing errors — the most dangerous class of failure for AI agent workflows.
Handling pipeline failures and backfills
Pipelines fail. Debezium connectors restart. Kafka brokers rebalance. The MCP server must handle these failures gracefully without serving data from an unboundedly-stale view.
Circuit breaker for stale data
// Tool handler checks data freshness before serving results
const MAX_STALENESS_SECONDS = 120;
async function checkFreshness(tableName: string): Promise {
const lastUpdated = tableFreshness.get(tableName);
if (!lastUpdated) {
throw new Error(`data_not_yet_populated: ${tableName}`);
}
const ageSec = (Date.now() - lastUpdated.getTime()) / 1000;
if (ageSec > MAX_STALENESS_SECONDS) {
throw new Error(`data_stale: ${tableName} last updated ${Math.round(ageSec)}s ago`);
}
}
server.tool('search_products', { query: z.string() }, async (args) => {
try {
await checkFreshness('products');
} catch (err) {
// Return error response instead of stale data
return {
content: [{
type: 'text',
text: JSON.stringify({ error: err.message, suggestion: 'try again in 30s' }),
}],
isError: true,
};
}
const results = await localDb.all(
'SELECT * FROM products WHERE name LIKE ? LIMIT 20',
[`%${args.query}%`]
);
return { content: [{ type: 'text', text: JSON.stringify(results) }] };
});
Returning isError: true on data staleness is better than returning stale data silently. The AI agent receives a clear signal that the tool is temporarily unavailable and can retry later or use a fallback. Silently returning stale data causes the agent to make decisions based on incorrect information without knowing it.
Automatic backfill on pipeline gap detection
// Trigger a full table refresh when pipeline gap exceeds threshold
const BACKFILL_THRESHOLD_SECONDS = 300; // 5 minutes without update → backfill
setInterval(async () => {
for (const [tableName, lastUpdated] of tableFreshness) {
const ageSec = (Date.now() - lastUpdated.getTime()) / 1000;
if (ageSec > BACKFILL_THRESHOLD_SECONDS) {
console.log(JSON.stringify({
level: 'warn',
event: 'pipeline_gap_detected',
table: tableName,
age_seconds: ageSec,
action: 'triggering_backfill',
}));
// Fall back to query-on-demand until pipeline recovers
await backfillFromSource(tableName);
}
}
}, 60_000);
Frequently asked questions
When should I use CDC vs query-on-demand for MCP tool data?
Use query-on-demand when: the data changes infrequently (less than once per minute per row), the source database can handle the query load from all MCP tool calls, and per-call latency of 10–200ms is acceptable. Use CDC when: many tool calls per second query the same data (read amplification), the source database cannot absorb direct MCP server queries without performance impact, tool call latency must be <10ms, or you need to aggregate data from multiple source tables into a denormalized view that would require expensive JOINs on every tool call. CDC adds operational complexity — it's a tradeoff that pays off at scale.
What happens to the replication slot if the MCP server is down for a long time?
PostgreSQL replication slots prevent WAL from being cleaned up until the slot consumer has confirmed receipt. If the MCP server is down for hours, the WAL accumulates on the source PostgreSQL server, consuming disk space. PostgreSQL will not reap old WAL files while an active replication slot is behind. Set max_slot_wal_keep_size in PostgreSQL 13+ to put a disk limit on per-slot WAL retention — if the slot falls too far behind, it is dropped and the MCP server must re-snapshot. Monitor disk usage on the source server and alert if WAL is growing. For the Debezium approach, this concern shifts to the Debezium connector (managed by Kafka Connect), which handles slot management and monitoring as part of its operational profile.
How do I handle schema changes (column additions/removals) in CDC?
Schema changes are the most disruptive event in a CDC pipeline. For PostgreSQL logical replication without Debezium: column additions to the source table propagate as NULL for existing rows; your upsert handler must tolerate extra fields. Column removals from the source table cause the replication stream to omit the field — your handler must not fail on missing fields. For Debezium: it integrates with Confluent Schema Registry to track schema evolution; consumers automatically receive the correct schema version per message. The general rule: make your change event handlers schema-tolerant (use optional fields, default missing fields to null) and test schema migrations against a CDC sandbox before deploying to production.
Can I use CDC to feed multiple MCP servers from the same source?
Yes, but manage replication slots carefully. Each MCP server that uses direct PostgreSQL logical replication needs its own replication slot (one slot per consumer). Multiple slots each hold WAL independently, multiplying disk usage pressure if any slot falls behind. The Kafka approach scales better: Debezium uses one slot from PostgreSQL, publishes to Kafka topics, and N MCP servers each create a Kafka consumer group without adding additional load to PostgreSQL. Kafka's consumer group model also allows horizontal scaling of a single MCP service — multiple instances share partitions rather than each needing their own WAL slot.
What is the minimum end-to-end CDC latency I can expect?
For direct PostgreSQL logical replication (pg-logical-replication or similar): 50–500ms end-to-end from commit on primary to update visible in the MCP server's local view — dominated by the WAL delivery latency. For Debezium → Kafka → MCP server: 200ms–2s — the extra hop through Kafka Connect and the Kafka broker adds latency. For most MCP use cases (not high-frequency trading), sub-second CDC lag is more than sufficient. If you need latency below 50ms, PostgreSQL LISTEN/NOTIFY (fired synchronously on commit) gives near-instantaneous notification with a small payload, and the MCP server queries the source directly for the full record.
Further reading
- Event-Driven Architecture for MCP Servers — pub/sub, webhooks, and real-time state
- Read Replica Routing for MCP Servers — write splitting and lag detection
- SQLite for MCP Servers — local storage patterns and WAL configuration
- MCP Server Health Checks — including data freshness in readiness probes
- Graceful Degradation for MCP Servers — partial responses when dependencies fail