williajm/mcp_kafka
If you are the rightful owner of mcp_kafka and would like to certify it and/or have it hosted online, please leave a comment on the right or send an email to dayong@mcphub.com.
MCP Kafka is a Model Context Protocol server designed to provide AI assistants with secure and controlled access to Apache Kafka clusters.
MCP Kafka
A Model Context Protocol (MCP) server for Apache Kafka that provides AI assistants with safe, controlled access to Kafka clusters.
Features
- 12 Kafka tools for topic management, message operations, consumer groups, and cluster info
- 2-tier access control: READ (default) and READ/WRITE modes
- Universal Kafka support: SASL (PLAIN, SCRAM, GSSAPI) and mTLS authentication
- Safety controls: Protected internal topics, consumer group validation, message size limits
- Built with FastMCP: Modern MCP server implementation
Installation
# Using uv (recommended)
uv add mcp-kafka
# Using pip
pip install mcp-kafka
Quick Start
1. Configure Environment
# Basic connection
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
# SASL authentication
export KAFKA_SECURITY_PROTOCOL=SASL_SSL
export KAFKA_SASL_MECHANISM=SCRAM-SHA-256
export KAFKA_SASL_USERNAME=your-username
export KAFKA_SASL_PASSWORD=your-password
# Enable write operations (disabled by default)
export SAFETY_ALLOW_WRITE_OPERATIONS=true
2. Run the Server
# stdio transport (default, for MCP clients)
uv run mcp-kafka
# HTTP transport (for web integrations)
uv run mcp-kafka --transport http --host 127.0.0.1 --port 8000
# Using convenience scripts
./scripts/http-read.sh # Read-only HTTP server
./scripts/http-readwrite.sh # Read-write HTTP server
CLI Options
| Option | Default | Description |
|---|---|---|
--transport | stdio | Transport type: stdio or http |
--host | 127.0.0.1 | Host to bind (HTTP only) |
--port | 8000 | Port to bind (HTTP only) |
--health-check | - | Run health check and exit |
--version, -v | - | Show version and exit |
3. Connect to MCP Client
Add to your MCP client configuration (e.g., Claude Desktop):
{
"mcpServers": {
"kafka": {
"command": "uv",
"args": ["run", "mcp-kafka"],
"env": {
"KAFKA_BOOTSTRAP_SERVERS": "localhost:9092"
}
}
}
}
Available Tools
Topic Management
| Tool | Access | Description |
|---|---|---|
kafka_list_topics | READ | List all topics with partition counts |
kafka_describe_topic | READ | Get detailed topic info and configuration |
kafka_create_topic | READ/WRITE | Create a new topic with partitions, replication factor, and config |
Message Operations
| Tool | Access | Description |
|---|---|---|
kafka_consume_messages | READ | Peek at messages (no offset commit) |
kafka_produce_message | READ/WRITE | Produce a message with optional key, headers, and partition |
Consumer Group Management
| Tool | Access | Description |
|---|---|---|
kafka_list_consumer_groups | READ | List all consumer groups |
kafka_describe_consumer_group | READ | Get group details, members, and lag |
kafka_get_consumer_lag | READ | Get lag per partition |
kafka_reset_offsets | READ/WRITE | Reset consumer group offsets to earliest, latest, or specific offset |
Cluster Information
| Tool | Access | Description |
|---|---|---|
kafka_cluster_info | READ | Get cluster metadata |
kafka_list_brokers | READ | List all brokers |
kafka_get_watermarks | READ | Get topic partition watermarks |
Configuration
For detailed configuration options and examples, see .
Environment Variables
Kafka Connection
| Variable | Default | Description |
|---|---|---|
KAFKA_BOOTSTRAP_SERVERS | localhost:9092 | Kafka broker addresses |
KAFKA_SECURITY_PROTOCOL | PLAINTEXT | Security protocol (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL) |
KAFKA_CLIENT_ID | mcp-kafka | Client identifier |
KAFKA_TIMEOUT | 30 | Operation timeout in seconds |
SASL Authentication
| Variable | Default | Description |
|---|---|---|
KAFKA_SASL_MECHANISM | - | SASL mechanism (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI) |
KAFKA_SASL_USERNAME | - | SASL username |
KAFKA_SASL_PASSWORD | - | SASL password |
KAFKA_SASL_KERBEROS_SERVICE_NAME | kafka | Kerberos service name |
KAFKA_SASL_KERBEROS_KEYTAB | - | Path to Kerberos keytab |
KAFKA_SASL_KERBEROS_PRINCIPAL | - | Kerberos principal |
SSL/TLS
| Variable | Default | Description |
|---|---|---|
KAFKA_SSL_CA_LOCATION | - | CA certificate path |
KAFKA_SSL_CERTIFICATE_LOCATION | - | Client certificate path |
KAFKA_SSL_KEY_LOCATION | - | Client key path |
KAFKA_SSL_KEY_PASSWORD | - | Client key password |
Safety Controls
| Variable | Default | Description |
|---|---|---|
SAFETY_ALLOW_WRITE_OPERATIONS | false | Enable READ/WRITE tools |
SAFETY_MAX_MESSAGE_SIZE | 1048576 | Max message size in bytes (1MB) |
SAFETY_MAX_CONSUME_MESSAGES | 100 | Max messages per consume request |
SAFETY_TOPIC_BLOCKLIST | - | Comma-separated blocked topic patterns |
Security Controls
| Variable | Default | Description |
|---|---|---|
SECURITY_RATE_LIMIT_ENABLED | true | Enable rate limiting |
SECURITY_RATE_LIMIT_RPM | 60 | Max requests per minute |
SECURITY_AUDIT_LOG_ENABLED | true | Enable audit logging |
SECURITY_AUDIT_LOG_FILE | mcp_audit.log | Audit log file path |
SECURITY_OAUTH_ENABLED | false | Enable OAuth/OIDC authentication |
SECURITY_OAUTH_ISSUER | - | OAuth issuer URL (e.g., https://auth.example.com) |
SECURITY_OAUTH_AUDIENCE | - | Expected JWT audience claim |
SECURITY_OAUTH_JWKS_URL | - | JWKS endpoint URL (auto-derived from issuer if not set) |
Access Control
MCP Kafka uses a simple 2-tier access control system:
READ Access (Default)
- List topics, consumer groups, and brokers
- Describe topics and consumer groups
- Consume messages (read-only peek)
- Get cluster info and watermarks
READ/WRITE Access
Requires SAFETY_ALLOW_WRITE_OPERATIONS=true:
- Create topics
- Produce messages
- Reset consumer group offsets
Protected Resources
The following resources are always protected:
- Internal topics:
__consumer_offsets,__transaction_state - Internal consumer groups: Groups starting with
__ - Topics in blocklist: Configured via
SAFETY_TOPIC_BLOCKLIST
Development
Prerequisites
- Python 3.11+
- uv package manager
Setup
# Clone repository
git clone https://github.com/williajm/mcp_kafka.git
cd mcp_kafka
# Install dependencies
uv sync --all-extras
# Run tests
uv run pytest
# Run linting
uv run ruff check src/ tests/
uv run ruff format --check src/ tests/
# Run type checking
uv run mypy src/
Local Kafka for Testing
A Docker Compose environment is provided for local development:
# Start Kafka
docker compose -f docker/docker-compose.yml up -d
# Create test topics
docker compose -f docker/docker-compose.yml exec kafka \
kafka-topics --create --topic test-topic --partitions 3 --replication-factor 1 \
--bootstrap-server localhost:9092
# Stop Kafka
docker compose -f docker/docker-compose.yml down
License
MIT License - see LICENSE file for details.