MCP Kafka

| Category | Status |
| --- | --- |
| Build & CI | CI, CodeQL, Pre-commit, Dependency Review, License Compliance |
| SonarCloud | Quality Gate Status, Coverage, Maintainability Rating, Reliability Rating, Security Rating |
| Security | Bandit, Dependabot |
| Technology | Python 3.11-3.14, Kafka, License: MIT, Code style: ruff, type-checked: mypy, MCP |

A Model Context Protocol (MCP) server for Apache Kafka that provides AI assistants with safe, controlled access to Kafka clusters.

Features

  • 12 Kafka tools for topic management, message operations, consumer groups, and cluster info
  • 2-tier access control: READ (default) and READ/WRITE modes
  • Universal Kafka support: SASL (PLAIN, SCRAM, GSSAPI) and mTLS authentication
  • Safety controls: Protected internal topics, consumer group validation, message size limits
  • Built with FastMCP: Modern MCP server implementation

Installation

# Using uv (recommended)
uv add mcp-kafka

# Using pip
pip install mcp-kafka

Quick Start

1. Configure Environment

# Basic connection
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092

# SASL authentication
export KAFKA_SECURITY_PROTOCOL=SASL_SSL
export KAFKA_SASL_MECHANISM=SCRAM-SHA-256
export KAFKA_SASL_USERNAME=your-username
export KAFKA_SASL_PASSWORD=your-password

# Enable write operations (disabled by default)
export SAFETY_ALLOW_WRITE_OPERATIONS=true
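
For reference, here is a minimal sketch of how these variables could be read into a client configuration. This is illustrative only: the key names follow librdkafka conventions, and the server's actual settings loader may differ.

```python
import os


def load_kafka_config() -> dict:
    """Collect Kafka connection settings from the environment.

    Illustrative sketch only; the real server's settings object may differ.
    """
    config = {
        "bootstrap.servers": os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        "security.protocol": os.environ.get("KAFKA_SECURITY_PROTOCOL", "PLAINTEXT"),
        "client.id": os.environ.get("KAFKA_CLIENT_ID", "mcp-kafka"),
    }
    mechanism = os.environ.get("KAFKA_SASL_MECHANISM")
    if mechanism:  # only add SASL keys when a mechanism is configured
        config["sasl.mechanism"] = mechanism
        config["sasl.username"] = os.environ.get("KAFKA_SASL_USERNAME", "")
        config["sasl.password"] = os.environ.get("KAFKA_SASL_PASSWORD", "")
    return config
```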

2. Run the Server

# stdio transport (default, for MCP clients)
uv run mcp-kafka

# HTTP transport (for web integrations)
uv run mcp-kafka --transport http --host 127.0.0.1 --port 8000

# Using convenience scripts
./scripts/http-read.sh      # Read-only HTTP server
./scripts/http-readwrite.sh # Read-write HTTP server

CLI Options

| Option | Default | Description |
| --- | --- | --- |
| --transport | stdio | Transport type: stdio or http |
| --host | 127.0.0.1 | Host to bind (HTTP only) |
| --port | 8000 | Port to bind (HTTP only) |
| --health-check | - | Run health check and exit |
| --version, -v | - | Show version and exit |

3. Connect to MCP Client

Add to your MCP client configuration (e.g., Claude Desktop):

{
  "mcpServers": {
    "kafka": {
      "command": "uv",
      "args": ["run", "mcp-kafka"],
      "env": {
        "KAFKA_BOOTSTRAP_SERVERS": "localhost:9092"
      }
    }
  }
}

Available Tools

Topic Management

| Tool | Access | Description |
| --- | --- | --- |
| kafka_list_topics | READ | List all topics with partition counts |
| kafka_describe_topic | READ | Get detailed topic info and configuration |
| kafka_create_topic | READ/WRITE | Create a new topic with partitions, replication factor, and config |

Message Operations

| Tool | Access | Description |
| --- | --- | --- |
| kafka_consume_messages | READ | Peek at messages (no offset commit) |
| kafka_produce_message | READ/WRITE | Produce a message with optional key, headers, and partition |

Consumer Group Management

| Tool | Access | Description |
| --- | --- | --- |
| kafka_list_consumer_groups | READ | List all consumer groups |
| kafka_describe_consumer_group | READ | Get group details, members, and lag |
| kafka_get_consumer_lag | READ | Get lag per partition |
| kafka_reset_offsets | READ/WRITE | Reset consumer group offsets to earliest, latest, or a specific offset |

Cluster Information

| Tool | Access | Description |
| --- | --- | --- |
| kafka_cluster_info | READ | Get cluster metadata |
| kafka_list_brokers | READ | List all brokers |
| kafka_get_watermarks | READ | Get topic partition watermarks |

Configuration

For detailed configuration options and examples, see the environment variable reference below.

Environment Variables

Kafka Connection

| Variable | Default | Description |
| --- | --- | --- |
| KAFKA_BOOTSTRAP_SERVERS | localhost:9092 | Kafka broker addresses |
| KAFKA_SECURITY_PROTOCOL | PLAINTEXT | Security protocol (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL) |
| KAFKA_CLIENT_ID | mcp-kafka | Client identifier |
| KAFKA_TIMEOUT | 30 | Operation timeout in seconds |

SASL Authentication

| Variable | Default | Description |
| --- | --- | --- |
| KAFKA_SASL_MECHANISM | - | SASL mechanism (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI) |
| KAFKA_SASL_USERNAME | - | SASL username |
| KAFKA_SASL_PASSWORD | - | SASL password |
| KAFKA_SASL_KERBEROS_SERVICE_NAME | kafka | Kerberos service name |
| KAFKA_SASL_KERBEROS_KEYTAB | - | Path to Kerberos keytab |
| KAFKA_SASL_KERBEROS_PRINCIPAL | - | Kerberos principal |

SSL/TLS

| Variable | Default | Description |
| --- | --- | --- |
| KAFKA_SSL_CA_LOCATION | - | CA certificate path |
| KAFKA_SSL_CERTIFICATE_LOCATION | - | Client certificate path |
| KAFKA_SSL_KEY_LOCATION | - | Client key path |
| KAFKA_SSL_KEY_PASSWORD | - | Client key password |

Safety Controls

| Variable | Default | Description |
| --- | --- | --- |
| SAFETY_ALLOW_WRITE_OPERATIONS | false | Enable READ/WRITE tools |
| SAFETY_MAX_MESSAGE_SIZE | 1048576 | Max message size in bytes (1 MB) |
| SAFETY_MAX_CONSUME_MESSAGES | 100 | Max messages per consume request |
| SAFETY_TOPIC_BLOCKLIST | - | Comma-separated blocked topic patterns |
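
SAFETY_TOPIC_BLOCKLIST takes comma-separated patterns. As an illustration of how such a list could be evaluated (assuming glob-style patterns; the server's actual matching rules may differ):

```python
import fnmatch


def is_blocklisted(topic: str, blocklist: str) -> bool:
    """Check a topic name against comma-separated glob patterns."""
    patterns = [p.strip() for p in blocklist.split(",") if p.strip()]
    return any(fnmatch.fnmatchcase(topic, pattern) for pattern in patterns)
```
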
Security Controls

| Variable | Default | Description |
| --- | --- | --- |
| SECURITY_RATE_LIMIT_ENABLED | true | Enable rate limiting |
| SECURITY_RATE_LIMIT_RPM | 60 | Max requests per minute |
| SECURITY_AUDIT_LOG_ENABLED | true | Enable audit logging |
| SECURITY_AUDIT_LOG_FILE | mcp_audit.log | Audit log file path |
| SECURITY_OAUTH_ENABLED | false | Enable OAuth/OIDC authentication |
| SECURITY_OAUTH_ISSUER | - | OAuth issuer URL (e.g., https://auth.example.com) |
| SECURITY_OAUTH_AUDIENCE | - | Expected JWT audience claim |
| SECURITY_OAUTH_JWKS_URL | - | JWKS endpoint URL (auto-derived from issuer if not set) |

Access Control

MCP Kafka uses a simple 2-tier access control system:

READ Access (Default)

  • List topics, consumer groups, and brokers
  • Describe topics and consumer groups
  • Consume messages (read-only peek)
  • Get cluster info and watermarks

READ/WRITE Access

Requires SAFETY_ALLOW_WRITE_OPERATIONS=true:

  • Create topics
  • Produce messages
  • Reset consumer group offsets

Protected Resources

The following resources are always protected:

  • Internal topics: __consumer_offsets, __transaction_state
  • Internal consumer groups: Groups starting with __
  • Topics in blocklist: Configured via SAFETY_TOPIC_BLOCKLIST
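
In code form, the always-on protections above amount to checks like these (hypothetical helper names; the real module structure may differ):

```python
# Kafka's internal topics, refused regardless of access mode.
PROTECTED_TOPICS = {"__consumer_offsets", "__transaction_state"}


def is_protected_topic(topic: str) -> bool:
    return topic in PROTECTED_TOPICS


def is_protected_group(group_id: str) -> bool:
    """Consumer groups with the internal `__` prefix are refused."""
    return group_id.startswith("__")
```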

Development

Prerequisites

  • Python 3.11+
  • uv package manager

Setup

# Clone repository
git clone https://github.com/williajm/mcp_kafka.git
cd mcp_kafka

# Install dependencies
uv sync --all-extras

# Run tests
uv run pytest

# Run linting
uv run ruff check src/ tests/
uv run ruff format --check src/ tests/

# Run type checking
uv run mypy src/

Local Kafka for Testing

A Docker Compose environment is provided for local development:

# Start Kafka
docker compose -f docker/docker-compose.yml up -d

# Create test topics
docker compose -f docker/docker-compose.yml exec kafka \
  kafka-topics --create --topic test-topic --partitions 3 --replication-factor 1 \
  --bootstrap-server localhost:9092

# Stop Kafka
docker compose -f docker/docker-compose.yml down

License

MIT License - see LICENSE file for details.