kafka-mcp-python

veenaypatil/kafka-mcp-python

3.3

If you are the rightful owner of kafka-mcp-python and would like to certify it and/or have it hosted online, please leave a comment on the right or send an email to henry@mcphub.com.

This MCP server connects to an in-house Kafka cluster to answer plain-text questions about Kafka topics and consumers.

Tools
1
Resources
0
Prompts
0

Kafka MCP Server

Purpose

This MCP (Model Context Protocol) server connects to in-house Kafka to answer plain-text questions about topics, consumers

Setup

Prerequisites

  • Python 3.10+ (Required for MCP package)
  • Access to Kafka cluster
  • Schema Registry (optional)
  • Kafka Connect (optional)

Installation

Option 1: Direct Installation (Python 3.10+)
pip install mcp kafka-python aiohttp
Option 2: Using conda/miniconda
conda create -n kafka-mcp python=3.10
conda activate kafka-mcp
pip install mcp kafka-python aiohttp
Option 3: Using pyenv with built-in venv
pyenv install 3.10.12 && pyenv local 3.10.12 && python -m venv kafka-mcp && source kafka-mcp/bin/activate && pip install --upgrade pip && pip install mcp kafka-python aiohttp

IntelliJ / PyCharm Interpreter Note

If the IDE still shows Python 3.9 after activating 3.10:

  • Open Settings > Project > Python Interpreter.
  • Click gear > Add > Existing and select: ./kafka-mcp/bin/python
  • Apply & reindex.
    Terminal activation does not automatically update the IDE interpreter.

Environment Variables

Set before starting MCP server (Claude/other client):

KAFKA_BOOTSTRAP_SERVERS=host1:9092[,host2:9092]
TOPIC_CONSUMERS_CACHE_TTL=600                     # seconds (optional)

Available MCP Tools (Kafka)

ToolPurposeKey Params
consume_messagesFetch messages with strategies latest / earliest / timestamptopic, max_messages, offset_strategy, timestamp
produce_messageSend a messagetopic, message
get_topic_partitionsGet partition count onlytopic
count_messages_last_hoursCount messages produced in last X hourstopic, hours
get_topic_sizeDisk usage via kafka-log-dirstopic
describe_consumer_groupMembers, offsets, lag per topic/partitiongroup_id
get_consumer_group_lagAggregated lag viewgroup_id
get_topic_consumersList consumer groups per topic (cached)topic, force_refresh
clear_topic_consumers_cacheReset cache
get_cache_statusCache diagnostics
describe_topicPartitions (leader/replicas/ISR), retention, replication factor, disk sizetopic
generate_topic_consumers_reportCSV of topics → consumersoutput_file_path (optional)

Consumer Groups Cache

First call to get_topic_consumers builds a global in‑process cache (may take ~1 minute on large clusters). Subsequent calls are fast until TTL expires (default 600s). Use force_refresh=true to rebuild early. clear_topic_consumers_cache resets manually.

Topic Consumers CSV Report

generate_topic_consumers_report writes a CSV to ~/Downloads by default (or a provided filename placed there). Columns:

  1. Topic Name
  2. Number of Consumers
  3. Consumer Groups List (semicolon separated)

Demo

output1

Retention & Disk Notes

describe_topic computes retention (retention.ms → human readable) and queries size via kafka-log-dirs (skipping initial header lines). Failures in size lookup return zero gracefully.

Troubleshooting

  • Topic not found: confirm spelling and cluster (KAFKA_BOOTSTRAP_SERVERS).
  • Slow first consumer lookup: expected (cache build). Check get_cache_status.
  • Empty consumers but known active: cache may be stale → force_refresh=true.
  • Large clusters: increase TOPIC_CONSUMERS_CACHE_TTL to reduce rebuild frequency.

MCP Tool Available

  • ask_kafka_question: Accepts plain text questions about Kafka topics and returns relevant information