Kafka MCP Server
An MCP (Model Context Protocol) server for interacting with Apache Kafka, including Schema Registry with support for Avro.
🚀 Features
- Topic Listing: Get all available topics in your Kafka cluster
- Schema Querying: Read Avro schemas from Schema Registry
- Payload Generation: Generate valid test payloads based on Avro schemas
- Message Production: Send messages to Kafka with automatic Avro serialization
- Message Consumption: Read and analyze messages from topics with Avro deserialization
- Schema Analysis: Analyze the structure of Avro schemas in detail
📋 Requirements
- Python 3.10 or higher
- Access to a Kafka cluster
- Confluent Schema Registry
🔧 Installation
- Clone or create the project:
  ```bash
  cd kafka-mcp-server
  ```
- Create a virtual environment:
  ```bash
  python -m venv venv
  source venv/bin/activate  # On Windows: venv\Scripts\activate
  ```
- Install dependencies:
  ```bash
  pip install -r requirements.txt
  ```
- Configure environment variables. Copy the example file and edit it with your credentials:
  ```bash
  cp .env.example .env
  ```
Edit the .env file:
```env
# Kafka Configuration
KAFKA_BOOTSTRAP_SERVERS=your-kafka-broker.example.com:9092

# Schema Registry Configuration
SCHEMA_REGISTRY_URL=https://your-schema-registry.example.com

# Kafka Security
KAFKA_SECURITY_PROTOCOL=SASL_SSL
KAFKA_SASL_MECHANISM=OAUTHBEARER
```
🎯 Usage
Run the MCP server:
```bash
python -m kafka_mcp.server
```
Configure in Claude Desktop
Edit your Claude Desktop configuration file (claude_desktop_config.json):
```json
{
  "mcpServers": {
    "kafka": {
      "command": "python",
      "args": ["-m", "kafka_mcp.server"],
      "cwd": "/full/path/to/kafka-mcp-server",
      "env": {
        "KAFKA_BOOTSTRAP_SERVERS": "your-kafka-broker.example.com:9092",
        "AZURE_TENANT_ID": "your-tenant-id",
        "AZURE_CLIENT_ID": "your-client-id",
        "AZURE_CLIENT_SECRET": "your-client-secret",
        "AZURE_SCOPE": "https://your-kafka-resource/.default",
        "SCHEMA_REGISTRY_URL": "https://your-schema-registry.example.com",
        "KAFKA_SECURITY_PROTOCOL": "SASL_SSL",
        "KAFKA_SASL_MECHANISM": "OAUTHBEARER"
      }
    }
  }
}
```
🛠️ Available Tools
1. list_topics
Lists all available topics in the Kafka cluster.
Parameters:
- `timeout` (optional): Timeout in seconds (default: 10.0)
Usage example in Claude:
List all available Kafka topics
2. get_topic_schema
Gets the Avro schema of a topic from Schema Registry.
Parameters:
- `topic` (required): Topic name
- `is_key` (optional): If true, gets the key schema; otherwise the value schema (default: false)
Usage example in Claude:
Get the schema for topic "users"
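Under Schema Registry's default TopicNameStrategy, the lookup this tool performs resolves the topic name to a registry subject. A minimal sketch of that mapping (`subject_for` is a hypothetical helper, not part of this project's API):

```python
def subject_for(topic: str, is_key: bool = False) -> str:
    """Build the Schema Registry subject name for a topic.

    Follows the default TopicNameStrategy: "<topic>-value" for value
    schemas and "<topic>-key" for key schemas.
    """
    return f"{topic}-{'key' if is_key else 'value'}"

# The schema itself would then be fetched from the registry, e.g. with
# confluent-kafka's SchemaRegistryClient.get_latest_version(subject).
print(subject_for("users"))        # users-value
print(subject_for("users", True))  # users-key
```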
3. generate_payload
Generates valid test payloads based on the topic's Avro schema.
Parameters:
- `topic` (required): Topic name
- `custom_values` (optional): Custom values for specific fields
- `count` (optional): Number of payloads to generate (default: 1)
- `is_key` (optional): If true, generates for the key schema (default: false)
Usage example in Claude:
Generate 5 test payloads for topic "orders"
With custom values:
Generate a payload for topic "users" with email "test@example.com" and name "John Doe"
4. produce_message
Sends a message to Kafka with automatic Avro serialization.
Parameters:
- `topic` (required): Topic name
- `value` (required): Message value as a JSON object
- `key` (optional): Message key as a string or JSON object
  - If string: encoded as UTF-8 bytes
  - If JSON object: serialized with the key's Avro schema (if available) or as JSON
- `schema` (optional): Avro schema in JSON format (fetched from the registry if not provided)
- `key_schema` (optional): Avro schema for the key
Usage examples in Claude:
With string key:
Send a message to topic "users" with key "user-123" and payload: {"id": "123", "name": "Alice", "email": "alice@example.com"}
With JSON object key:
Send a message to topic "users" with key {"userId": "123"} and payload: {"name": "Alice", "email": "alice@example.com"}
Without key:
Send a message to topic "users" with this payload: {"id": "123", "name": "Alice", "email": "alice@example.com"}
5. consume_messages
Reads and analyzes messages from a topic with Avro deserialization.
Parameters:
- `topic` (required): Topic name
- `num_messages` (optional): Number of messages to consume (default: 10)
- `from_beginning` (optional): If true, reads from the beginning of the topic (default: false)
- `group_id` (optional): Consumer group ID. If not provided, a unique group ID is generated automatically for each request to avoid conflicts between parallel consumers
Usage example in Claude:
Read the last 20 messages from topic "orders"
Note: Each consumption request uses a unique consumer group by default, allowing multiple parallel reads without conflicts. You can specify a custom group_id if you need to track consumer offsets across requests.
6. analyze_schema
Analyzes an Avro schema and provides detailed information about its structure.
Parameters:
- `topic` (required): Topic name
- `is_key` (optional): If true, analyzes the key schema (default: false)
Usage example in Claude:
Analyze the schema for topic "products" and show me all fields
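A rough sketch of what such an analysis can look like: walking an Avro schema dict and listing each field's path, type, and nullability. Illustrative only, and deliberately limited to records, nullable unions, and primitives:

```python
def analyze_avro(schema: dict, prefix: str = "") -> list[tuple[str, str, bool]]:
    """Walk an Avro record schema and return (path, type, nullable) per field.
    Handles nested records and unions like ["null", "string"]."""
    fields = []
    for field in schema.get("fields", []):
        ftype = field["type"]
        nullable = False
        if isinstance(ftype, list):  # union, e.g. ["null", "string"]
            nullable = "null" in ftype
            ftype = next(t for t in ftype if t != "null")
        path = f"{prefix}{field['name']}"
        if isinstance(ftype, dict) and ftype.get("type") == "record":
            fields += analyze_avro(ftype, prefix=path + ".")  # recurse into nested record
        else:
            fields.append((path, ftype if isinstance(ftype, str) else str(ftype), nullable))
    return fields


user_schema = {
    "type": "record", "name": "User",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "email", "type": ["null", "string"]},
        {"name": "address", "type": {"type": "record", "name": "Address",
            "fields": [{"name": "city", "type": "string"}]}},
    ],
}
for path, ftype, nullable in analyze_avro(user_schema):
    print(path, ftype, "nullable" if nullable else "required")
```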
Authentication flow
- The server obtains an access token using Azure credentials
- The token is included in each Kafka connection as SASL/OAUTHBEARER
- Tokens are automatically renewed when they expire
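The renew-on-expiry step can be sketched as a small cache around whatever credential fetches the token. Here a pluggable `fetch` callable stands in for the Azure client-credential flow; the class and names are illustrative, not this project's actual code:

```python
import time
from typing import Callable


class TokenCache:
    """Cache an OAuth access token and refresh it shortly before expiry,
    mirroring the SASL/OAUTHBEARER renewal described above."""

    def __init__(self, fetch: Callable[[], tuple[str, float]], margin: float = 60.0):
        self._fetch = fetch    # returns (token, expires_at_epoch_seconds)
        self._margin = margin  # refresh this many seconds before expiry
        self._token: str | None = None
        self._expires_at = 0.0

    def get(self) -> str:
        if self._token is None or time.time() >= self._expires_at - self._margin:
            self._token, self._expires_at = self._fetch()
        return self._token


# With confluent-kafka, cache.get() would typically be wired into the
# client's "oauth_cb" callback; here a stub fetcher stands in for Azure AD.
calls = []
def stub_fetch():
    calls.append(1)
    return f"token-{len(calls)}", time.time() + 3600

cache = TokenCache(stub_fetch)
assert cache.get() == "token-1"
assert cache.get() == "token-1"  # still valid: fetched only once
assert len(calls) == 1
```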
📊 Intelligent Payload Generation
The payload generator includes intelligent logic based on field names:
- `email`: Generates valid emails (e.g., `user123@example.com`)
- `name`: Generates common names
- `id`/`uuid`: Generates numeric IDs
- `url`/`uri`: Generates valid URLs
- `phone`: Generates phone numbers
- `date`/`timestamp`: Generates ISO dates/timestamps
- `status`: Generates common statuses (active, inactive, pending, completed)
- `country`: Generates country codes (US, UK, ES, etc.)
- `currency`: Generates currency codes (USD, EUR, GBP, etc.)
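A few of the heuristics above can be sketched like this (an illustrative approximation of the generator's logic, not its actual code):

```python
import random


def value_for_field(name: str) -> object:
    """Pick a plausible test value based on the field name, mirroring
    the name-based heuristics listed above."""
    n = name.lower()
    if "email" in n:
        return f"user{random.randint(1, 999)}@example.com"
    if "name" in n:
        return random.choice(["Alice", "Bob", "Carol", "Dave"])
    if "uuid" in n or n == "id" or n.endswith("_id"):
        return random.randint(1, 100000)
    if "url" in n or "uri" in n:
        return "https://example.com/resource"
    if "phone" in n:
        return f"+1-555-{random.randint(1000, 9999)}"
    if "status" in n:
        return random.choice(["active", "inactive", "pending", "completed"])
    if "country" in n:
        return random.choice(["US", "UK", "ES"])
    if "currency" in n:
        return random.choice(["USD", "EUR", "GBP"])
    return "placeholder"


print(value_for_field("email"))   # e.g. user42@example.com
print(value_for_field("status"))  # e.g. pending
```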
🧪 Complete Usage Examples
Typical testing workflow
```
# 1. List available topics
"List all topics"

# 2. View topic schema
"Show me the schema for topic 'user-events'"

# 3. Analyze schema structure
"Analyze the schema for topic 'user-events' and explain each field"

# 4. Generate test payloads
"Generate 3 test payloads for 'user-events'"

# 5. Send test message
"Send a test message to topic 'user-events' with userId='test-123'"

# 6. Verify it was sent
"Read the last 5 messages from 'user-events'"
```
Message debugging
```
# Read recent messages
"Read the last 50 messages from topic 'error-logs'"

# Analyze patterns
"Analyze messages from topic 'transactions' and look for patterns in the last 100 messages"
```
📁 Project Structure
```
kafka-mcp-server/
├── src/
│   └── kafka_mcp/
│       ├── __init__.py
│       ├── __main__.py          # Entry point
│       ├── server.py            # Main MCP server
│       ├── config.py            # Configuration
│       ├── auth.py              # Azure OAuth authentication
│       ├── kafka_client.py      # Kafka and Schema Registry client
│       └── payload_generator.py # Avro payload generator
├── tests/                       # Tests (to be implemented)
├── .env.example                 # Configuration example
├── .gitignore
├── pyproject.toml
├── requirements.txt
└── README.md
```
🐛 Troubleshooting
Error: "Failed to obtain OAuth token"
- Verify that your Azure credentials are correct
- Check that `AZURE_SCOPE` is correct
- Review that the application has permissions on the Kafka resource

Error: "No schema found for topic"
- Verify that the topic exists and has a registered schema
- Check the Schema Registry URL
- Ensure that the subject follows the format `<topic>-value` or `<topic>-key`

Error: "Connection timeout"
- Verify network connectivity to the Kafka cluster
- Check that `KAFKA_BOOTSTRAP_SERVERS` is correct
- Review the SSL/TLS configuration if applicable
🔜 Roadmap
- Unit and integration tests
- Support for JSON Schema and Protobuf
- Metrics and monitoring
- Standalone CLI for quick testing
- Support for message transformations
- Kafka Connect integration
📝 License
MIT License
🤝 Contributions
Contributions are welcome. Please:
- Fork the project
- Create a branch for your feature (`git checkout -b feature/AmazingFeature`)
- Commit your changes (`git commit -m 'Add some AmazingFeature'`)
- Push to the branch (`git push origin feature/AmazingFeature`)
- Open a Pull Request
👥 Author
Jesus Rodriguez
🙏 Acknowledgments
- Confluent Platform for the Kafka libraries
- Anthropic for the MCP protocol
- Apache Kafka community