mcp-kafka-client

jesrzrz/mcp-kafka-client

3.1

If you are the rightful owner of mcp-kafka-client and would like to certify it and/or have it hosted online, please leave a comment on the right or send an email to dayong@mcphub.com.

The Kafka MCP Server is a Model Context Protocol server designed for seamless interaction with Apache Kafka, including support for Avro schemas via the Schema Registry.

Tools
6
Resources
0
Prompts
0

Kafka MCP Server

An MCP (Model Context Protocol) server for interacting with Apache Kafka, including Schema Registry with support for Avro.

🚀 Features

  • Topic Listing: Get all available topics in your Kafka cluster
  • Schema Querying: Read Avro schemas from Schema Registry
  • Payload Generation: Generate valid test payloads based on Avro schemas
  • Message Production: Send messages to Kafka with automatic Avro serialization
  • Message Consumption: Read and analyze messages from topics with Avro deserialization
  • Schema Analysis: Analyze the structure of Avro schemas in detail

📋 Requirements

  • Python 3.10 or higher
  • Access to a Kafka cluster
  • Confluent Schema Registry

🔧 Installation

  1. Clone or create the project
cd kafka-mcp-server
  1. Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
  1. Install dependencies
pip install -r requirements.txt
  1. Configure environment variables

Copy the example file and edit with your credentials:

cp .env.example .env

Edit the .env file:

# Kafka Configuration
KAFKA_BOOTSTRAP_SERVERS=your-kafka-broker.example.com:9092

# Schema Registry Configuration
SCHEMA_REGISTRY_URL=https://your-schema-registry.example.com

# Kafka Security
KAFKA_SECURITY_PROTOCOL=SASL_SSL
KAFKA_SASL_MECHANISM=OAUTHBEARER

🎯 Usage

Run the MCP server

python -m kafka_mcp.server

Configure in Claude Desktop

Edit your Claude Desktop configuration file (claude_desktop_config.json):

{
  "mcpServers": {
    "kafka": {
      "command": "python",
      "args": ["-m", "kafka_mcp.server"],
      "cwd": "/full/path/to/kafka-mcp-server",
      "env": {
        "KAFKA_BOOTSTRAP_SERVERS": "your-kafka-broker.example.com:9092",
        "AZURE_TENANT_ID": "your-tenant-id",
        "AZURE_CLIENT_ID": "your-client-id",
        "AZURE_CLIENT_SECRET": "your-client-secret",
        "AZURE_SCOPE": "https://your-kafka-resource/.default",
        "SCHEMA_REGISTRY_URL": "https://your-schema-registry.example.com",
        "KAFKA_SECURITY_PROTOCOL": "SASL_SSL",
        "KAFKA_SASL_MECHANISM": "OAUTHBEARER"
      }
    }
  }
}

🛠️ Available Tools

1. list_topics

Lists all available topics in the Kafka cluster.

Parameters:

  • timeout (optional): Timeout in seconds (default: 10.0)

Usage example in Claude:

List all available Kafka topics

2. get_topic_schema

Gets the Avro schema of a topic from Schema Registry.

Parameters:

  • topic (required): Topic name
  • is_key (optional): If true, gets the key schema; if not, the value schema (default: false)

Usage example in Claude:

Get the schema for topic "users"

3. generate_payload

Generates valid test payloads based on the topic's Avro schema.

Parameters:

  • topic (required): Topic name
  • custom_values (optional): Custom values for specific fields
  • count (optional): Number of payloads to generate (default: 1)
  • is_key (optional): If true, generates for key schema (default: false)

Usage example in Claude:

Generate 5 test payloads for topic "orders"

With custom values:

Generate a payload for topic "users" with email "test@example.com" and name "John Doe"

4. produce_message

Sends a message to Kafka with automatic Avro serialization.

Parameters:

  • topic (required): Topic name
  • value (required): Message value as JSON object
  • key (optional): Message key as string or JSON object
    • If string: encoded as UTF-8 bytes
    • If JSON object: serialized with Avro schema (if available) or JSON
  • schema (optional): Avro schema in JSON format (fetched from registry if not provided)
  • key_schema (optional): Avro schema for the key

Usage examples in Claude:

With string key:

Send a message to topic "users" with key "user-123" and payload: {"id": "123", "name": "Alice", "email": "alice@example.com"}

With JSON object key:

Send a message to topic "users" with key {"userId": "123"} and payload: {"name": "Alice", "email": "alice@example.com"}

Without key:

Send a message to topic "users" with this payload: {"id": "123", "name": "Alice", "email": "alice@example.com"}

5. consume_messages

Reads and analyzes messages from a topic with Avro deserialization.

Parameters:

  • topic (required): Topic name
  • num_messages (optional): Number of messages to consume (default: 10)
  • from_beginning (optional): If true, reads from the beginning of the topic (default: false)
  • group_id (optional): Consumer group ID. If not provided, a unique group ID is automatically generated for each request to avoid conflicts between parallel consumers

Usage example in Claude:

Read the last 20 messages from topic "orders"

Note: Each consumption request uses a unique consumer group by default, allowing multiple parallel reads without conflicts. You can specify a custom group_id if you need to track consumer offsets across requests.

6. analyze_schema

Analyzes an Avro schema and provides detailed information about its structure.

Parameters:

  • topic (required): Topic name
  • is_key (optional): If true, analyzes key schema (default: false)

Usage example in Claude:

Analyze the schema for topic "products" and show me all fields

Authentication flow

  1. The server obtains an access token using Azure credentials
  2. The token is included in each Kafka connection as SASL/OAUTHBEARER
  3. Tokens are automatically renewed when they expire

📊 Intelligent Payload Generation

The payload generator includes intelligent logic based on field names:

  • email: Generates valid emails (e.g., user123@example.com)
  • name: Generates common names
  • id/uuid: Generates numeric IDs
  • url/uri: Generates valid URLs
  • phone: Generates phone numbers
  • date/timestamp: Generates ISO dates/timestamps
  • status: Generates common statuses (active, inactive, pending, completed)
  • country: Generates country codes (US, UK, ES, etc.)
  • currency: Generates currency codes (USD, EUR, GBP, etc.)

🧪 Complete Usage Examples

Typical testing workflow

# 1. List available topics
"List all topics"

# 2. View topic schema
"Show me the schema for topic 'user-events'"

# 3. Analyze schema structure
"Analyze the schema for topic 'user-events' and explain each field"

# 4. Generate test payloads
"Generate 3 test payloads for 'user-events'"

# 5. Send test message
"Send a test message to topic 'user-events' with userId='test-123'"

# 6. Verify it was sent
"Read the last 5 messages from 'user-events'"

Message debugging

# Read recent messages
"Read the last 50 messages from topic 'error-logs'"

# Analyze patterns
"Analyze messages from topic 'transactions' and look for patterns in the last 100 messages"

📁 Project Structure

kafka-mcp-server/
├── src/
│   └── kafka_mcp/
│       ├── __init__.py
│       ├── __main__.py          # Entry point
│       ├── server.py             # Main MCP server
│       ├── config.py             # Configuration
│       ├── auth.py               # Azure OAuth authentication
│       ├── kafka_client.py       # Kafka and Schema Registry client
│       └── payload_generator.py  # Avro payload generator
├── tests/                        # Tests (to be implemented)
├── .env.example                  # Configuration example
├── .gitignore
├── pyproject.toml
├── requirements.txt
└── README.md

🐛 Troubleshooting

Error: "Failed to obtain OAuth token"

  • Verify that your Azure credentials are correct
  • Check that the AZURE_SCOPE is correct
  • Review that the application has permissions on the Kafka resource

Error: "No schema found for topic"

  • Verify that the topic exists and has a registered schema
  • Check the Schema Registry URL
  • Ensure that the subject follows the format <topic>-value or <topic>-key

Error: "Connection timeout"

  • Verify network connectivity to the Kafka cluster
  • Check that the KAFKA_BOOTSTRAP_SERVERS are correct
  • Review SSL/TLS configuration if applicable

🔜 Roadmap

  • Unit and integration tests
  • Support for JSON Schema and Protobuf
  • Metrics and monitoring
  • Standalone CLI for quick testing
  • Support for message transformations
  • Kafka Connect integration

📝 License

MIT License

🤝 Contributions

Contributions are welcome. Please:

  1. Fork the project
  2. Create a branch for your feature (git checkout -b feature/AmazingFeature)
  3. Commit your changes (git commit -m 'Add some AmazingFeature')
  4. Push to the branch (git push origin feature/AmazingFeature)
  5. Open a Pull Request

👥 Author

Jesus Rodriguez

🙏 Acknowledgments

  • Confluent Platform for the Kafka libraries
  • Anthropic for the MCP protocol
  • Apache Kafka community