BrooksIan/CDF-Kafka-MCP-Server
The CDF Kafka MCP Server is a Model Context Protocol server designed to integrate AI models with Apache Kafka, featuring Apache Knox authentication for secure enterprise environments.
CDF Kafka MCP Server
A Model Context Protocol (MCP) server for Apache Kafka with Apache Knox authentication support, inspired by the SSB-MCP-Server implementation.
Overview
The CDF Kafka MCP Server provides a comprehensive bridge between AI models and Apache Kafka clusters through the Model Context Protocol. It enables AI applications to interact with Kafka topics, produce and consume messages, and manage consumer groups through a secure, enterprise-ready interface with Apache Knox authentication.
📸 Visual Examples: This README includes screenshots showing how to create topics, add data, and list topics using the SMM (Streams Messaging Manager) web interface. See the Visual Examples section for step-by-step visual guides.
Features
🔐 Enterprise Security
- Apache Knox Gateway authentication support with Admin API integration
- CDP Cloud authentication with token and basic auth support
- Multiple authentication methods (token-based, username/password, CDP tokens)
- TLS/SSL configuration for secure communications
- Configurable SSL certificate verification
- Service discovery through Knox Gateway
🚀 Enhanced Kafka Operations
- Multi-approach Topic Creation: Knox Gateway, CDP Cloud, Connect API, Admin Client
- Multi-approach Message Production: Direct, Knox, CDP, Connect API
- Advanced Topic Management: Create, list, describe, delete, configure with fallback methods
- Message Operations: Produce, consume with metadata and multiple transport methods
- Batch Message Production: Efficient bulk message handling
- Offset Management: Advanced partition and offset management
- Consumer Group Management: Full consumer group lifecycle management
📊 Monitoring & Health Checks
- Comprehensive Health Monitoring: Real-time health status of all services
- Performance Metrics: Request counts, response times, success rates
- Service Discovery: Automatic detection of available services
- Health History: Track health status over time
- Individual Health Checks: Granular health monitoring per service
- Metrics Collection: Detailed performance and usage metrics
🔧 Knox Gateway Integration
- Topology Management: Create and manage Knox topologies
- Service Configuration: Configure Kafka services through Knox
- Token Management: JWT token handling and validation
- Service Discovery: Automatic service endpoint discovery
- Health Monitoring: Knox-specific health checks
- Admin API: Full Knox Admin API integration
☁️ CDP Cloud Support
- CDP Proxy API: Integration with CDP proxy endpoints
- CDP Token Authentication: Support for CDP-specific tokens
- Service Health: CDP Cloud service health monitoring
- API Discovery: Automatic CDP API endpoint discovery
- Fallback Support: Graceful fallback between CDP and other methods
🛠️ Developer Experience
- Full Model Context Protocol compliance with 40+ tools
- Rich Metadata: Comprehensive error reporting and status information
- Flexible Configuration: YAML and environment variable configuration
- Comprehensive Logging: Structured logging for debugging and monitoring
- Multiple Transport Methods: Support for various Kafka access methods
- Graceful Degradation: Automatic fallback between different approaches
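The "fallback between different approaches" idea can be pictured with a small sketch. This is only an illustration under stated assumptions, not the server's actual code: `run_with_fallback`, `AllMethodsFailed`, `via_knox`, and `via_admin_client` are hypothetical names invented for this example.

```python
from typing import Callable, Sequence


class AllMethodsFailed(Exception):
    """Raised when every method in the chain has failed."""


def run_with_fallback(
    methods: Sequence[tuple[str, Callable[[], object]]],
) -> tuple[str, object]:
    """Try each (name, callable) pair in order and return the first success."""
    errors: list[str] = []
    for name, attempt in methods:
        try:
            return name, attempt()
        except Exception as exc:  # one failed method should not abort the chain
            errors.append(f"{name}: {exc}")
    raise AllMethodsFailed("; ".join(errors))


# Hypothetical stand-ins for real transport methods.
def via_knox() -> str:
    raise ConnectionError("gateway unreachable")


def via_admin_client() -> str:
    return "topic created"


used, result = run_with_fallback([("knox", via_knox), ("admin", via_admin_client)])
print(used, result)  # prints: admin topic created
```

Collecting the per-method errors means that when every approach fails, the caller still sees why each one failed.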
Limitations
Known Issues
- Admin Client: The kafka-python Admin Client may fail in some environments (NodeNotReadyError)
- Producer Timeouts: Message production may time out in certain Cloudera environments
- MockSourceConnector: Cloudera MockSourceConnector may not produce messages reliably
- Knox Gateway: Requires proper configuration for full functionality
Workarounds
When the issues above get in the way, run the Kafka CLI inside the Docker container instead:
# List topics
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
# Create topics
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic
# Produce messages
docker exec -it kafka /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic
# Consume messages
docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning
Quick Start
Get Up and Running
- Start the Docker Environment
git clone https://github.com/ibrooks/cdf-kafka-mcp-server.git
cd cdf-kafka-mcp-server
docker-compose up -d
- Verify Services
# Check Kafka
docker exec kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
# Check SMM UI
curl -f http://localhost:9991/ || echo "SMM UI not ready yet"
- Add Test Data
echo '{"message": "Hello from Quick Start!", "timestamp": "2024-10-23T20:00:00Z"}' | \
docker exec -i kafka /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic cursortest
- View in SMM UI
- Open http://localhost:9991/
- Login with admin/admin123
- Navigate to the cursortest topic
- Test MCP Server (Optional)
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
uv run python -m cdf_kafka_mcp_server
Installation
Prerequisites
- Python 3.10 or higher
- Apache Kafka cluster
- Apache Knox Gateway (optional, for enterprise authentication)
Install from Source
git clone https://github.com/ibrooks/cdf-kafka-mcp-server.git
cd cdf-kafka-mcp-server
uv pip install -e .
Cloudera Agent Studio
For use with Cloudera Agent Studio:
{
"mcpServers": {
"cdf-kafka-mcp-server": {
"command": "uvx",
"args": ["--from", "git+https://github.com/ibrooks/cdf-kafka-mcp-server@main", "run-server"],
"env": {
"KAFKA_BOOTSTRAP_SERVERS": "kafka-broker1:9092,kafka-broker2:9092",
"KNOX_TOKEN": "<your_knox_bearer_token>",
"KNOX_GATEWAY": "https://knox-gateway.yourshere.cloudera.site:8443"
}
}
}
}
Configuration
Cloud Deployment Configurations
The project includes pre-configured templates for major cloud Kafka services:
AWS MSK (Managed Streaming for Kafka)
# Use AWS MSK configuration
cp config/kafka_config_aws_msk.yaml config/kafka_config.yaml
# Set AWS credentials
export AWS_ACCESS_KEY_ID="your-access-key"
export AWS_SECRET_ACCESS_KEY="your-secret-key"
export AWS_REGION="us-east-1"
Confluent Cloud
# Use Confluent Cloud configuration
cp config/kafka_config_confluent_cloud.yaml config/kafka_config.yaml
# Set Confluent credentials
export CONFLUENT_API_KEY="your-api-key"
export CONFLUENT_API_SECRET="your-api-secret"
Azure Event Hubs for Kafka
# Use Azure Event Hubs configuration
cp config/kafka_config_azure_eventhub.yaml config/kafka_config.yaml
# Set Azure connection string
export AZURE_EVENTHUB_CONNECTION_STRING="your-connection-string"
Generic Cloud Configuration
# Use generic cloud configuration
cp config/kafka_config_cloud.yaml config/kafka_config.yaml
# Set your cloud provider credentials
export KAFKA_SASL_USERNAME="your-username"
export KAFKA_SASL_PASSWORD="your-password"
Quick Setup Script
For easy cloud deployment setup, use the provided script:
# Make the script executable
chmod +x setup_cloud.sh
# Setup for your cloud provider
./setup_cloud.sh aws-msk
./setup_cloud.sh confluent-cloud
./setup_cloud.sh azure-eventhub
./setup_cloud.sh generic
Environment Variables Template
For cloud deployments, use the environment variables template:
# Copy and customize the template
cp config/env_cloud_template.txt .env
# Edit .env with your actual values
Cloud Deployment Considerations
When deploying to cloud environments, consider these important factors:
Security
- Use IAM Roles: For AWS MSK, prefer IAM roles over access keys
- Rotate Credentials: Regularly rotate API keys and tokens
- Network Security: Use VPCs and security groups appropriately
- Encryption: Ensure TLS/SSL is enabled for all connections
Performance
- Connection Pooling: Cloud services may have connection limits
- Retry Logic: Implement exponential backoff for retries
- Monitoring: Use cloud provider monitoring tools
- Scaling: Consider auto-scaling based on load
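As a rough illustration of the retry advice above, here is a minimal exponential backoff helper. It is a generic sketch, not something this project ships: `retry_with_backoff` and its parameters are hypothetical, and the jitter amount is an arbitrary choice.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    operation: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation, doubling the wait after each failure (plus jitter)."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Up to 10% random jitter keeps many clients from retrying in lockstep.
            sleep(delay + random.uniform(0, delay / 10))
    raise ValueError("max_attempts must be at least 1")
```

The `sleep` parameter is injectable so the waiting can be replaced in tests; in normal use the default `time.sleep` applies.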
Cost Optimization
- Resource Sizing: Right-size your Kafka clusters
- Data Retention: Set appropriate retention policies
- Compression: Enable compression to reduce bandwidth costs
- Monitoring: Monitor usage to avoid unexpected charges
Reliability
- Multi-AZ: Deploy across multiple availability zones
- Backup: Implement proper backup strategies
- Disaster Recovery: Plan for disaster recovery scenarios
- Health Checks: Implement comprehensive health monitoring
Configuration File
Create a configuration file at config/kafka_config.yaml:
kafka:
bootstrap_servers: "localhost:9092"
client_id: "cdf-kafka-mcp-server"
security_protocol: "PLAINTEXT"
timeout: 30
knox:
gateway: "https://knox-gateway.example.com:8443"
token: "your-knox-token-here"
verify_ssl: true
service: "kafka"
Environment Variables
You can also configure the server using environment variables:
export KAFKA_BOOTSTRAP_SERVERS="localhost:9092"
export KNOX_GATEWAY="https://knox-gateway.example.com:8443"
export KNOX_TOKEN="your-knox-token-here"
export KNOX_VERIFY_SSL="true"
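To make the shape of these variables concrete, the sketch below reads them into a small settings object. It assumes only the four variable names shown above; the `Settings` class, the `load_settings` function, and the rule for parsing `KNOX_VERIFY_SSL` are hypothetical and may differ from what the server does internally.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    bootstrap_servers: str
    knox_gateway: Optional[str]
    knox_token: Optional[str]
    knox_verify_ssl: bool


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from the environment variables named in this README."""
    servers = env.get("KAFKA_BOOTSTRAP_SERVERS")
    if not servers:
        raise ValueError("KAFKA_BOOTSTRAP_SERVERS is required")
    # Assumption: only an explicit false-like value turns verification off.
    raw = env.get("KNOX_VERIFY_SSL", "true").strip().lower()
    verify = raw not in {"false", "0", "no"}
    return Settings(servers, env.get("KNOX_GATEWAY"), env.get("KNOX_TOKEN"), verify)
```

Passing a plain dictionary instead of `os.environ` makes it easy to try different combinations without touching the real environment.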
Claude Desktop Integration
Add to your Claude Desktop configuration (claude_desktop_config.json):
{
"mcpServers": {
"cdf-kafka-mcp-server": {
"command": "cdf-kafka-mcp-server",
"args": ["--config", "./config/kafka_config.yaml"],
"env": {
"KNOX_GATEWAY": "https://knox-gateway.example.com:8443",
"KNOX_TOKEN": "your-knox-token-here"
}
}
}
}
Usage
Basic Setup
Direct Kafka Connection:
export KAFKA_BOOTSTRAP_SERVERS="localhost:9092"
uv run python -m cdf_kafka_mcp_server
Knox Gateway (Production):
export KNOX_GATEWAY="https://your-knox-gateway:8444"
export KNOX_TOKEN="your-bearer-token-here"
export KNOX_SERVICE="kafka"
uv run python -m cdf_kafka_mcp_server
Configuration File:
uv run python -m cdf_kafka_mcp_server --config config/kafka_config.yaml
Authentication Methods
Knox Bearer Token (Recommended):
export KNOX_GATEWAY="https://knox-gateway.company.com:8444"
export KNOX_TOKEN="your-bearer-token-here"
export KNOX_SERVICE="kafka"
uv run python -m cdf_kafka_mcp_server
Direct Kafka Connection:
export KAFKA_BOOTSTRAP_SERVERS="kafka1:9092,kafka2:9092"
export KAFKA_SECURITY_PROTOCOL="SASL_SSL"
export KAFKA_SASL_MECHANISM="PLAIN"
export KAFKA_SASL_USERNAME="your-username"
export KAFKA_SASL_PASSWORD="your-password"
uv run python -m cdf_kafka_mcp_server
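For readers who want to see how the SASL variables above might reach a Kafka client, this sketch maps them onto the keyword arguments that kafka-python's `KafkaProducer` and `KafkaConsumer` accept. Whether this server performs the mapping this way is an assumption; `kafka_client_kwargs` is a hypothetical helper.

```python
from typing import Mapping


def kafka_client_kwargs(env: Mapping[str, str]) -> dict:
    """Translate the README's environment variables into kafka-python kwargs."""
    kwargs = {
        "bootstrap_servers": [
            server.strip() for server in env["KAFKA_BOOTSTRAP_SERVERS"].split(",")
        ],
        "security_protocol": env.get("KAFKA_SECURITY_PROTOCOL", "PLAINTEXT"),
    }
    # SASL credentials only apply to the SASL_PLAINTEXT and SASL_SSL protocols.
    if kwargs["security_protocol"].startswith("SASL"):
        kwargs["sasl_mechanism"] = env.get("KAFKA_SASL_MECHANISM", "PLAIN")
        kwargs["sasl_plain_username"] = env["KAFKA_SASL_USERNAME"]
        kwargs["sasl_plain_password"] = env["KAFKA_SASL_PASSWORD"]
    return kwargs


# With kafka-python installed, the result could be used like this:
#   from kafka import KafkaProducer
#   producer = KafkaProducer(**kafka_client_kwargs(os.environ))
```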
Configuration
Environment Variables:
- KAFKA_BOOTSTRAP_SERVERS - Kafka broker addresses (required)
- KNOX_GATEWAY - Knox Gateway URL (for Knox auth)
- KNOX_TOKEN - Bearer token (for Knox auth)
- MCP_LOG_LEVEL - Log level (default: INFO)
YAML Configuration:
kafka:
bootstrap_servers: "localhost:9092"
security_protocol: "PLAINTEXT"
knox:
gateway: "https://knox-gateway.company.com:8444"
token: "your-bearer-token-here"
service: "kafka"
Available MCP Tools
The server provides 40+ MCP tools for comprehensive Kafka operations:
🗂️ Topic Management (7 tools):
- list_topics - List all Kafka topics
- create_topic - Enhanced: Create topics using multiple approaches (Knox, CDP, Connect, Admin)
- describe_topic - Get detailed topic information
- delete_topic - Delete a Kafka topic
- topic_exists - Check if a topic exists
- get_topic_partitions - Get topic partition count
- update_topic_config - Update topic configuration
📨 Message Operations (6 tools):
- produce_message - Enhanced: Produce messages using multiple approaches (Direct, Knox, CDP, Connect)
- consume_messages - Consume messages from a topic
- get_topic_offsets - Get topic partition offsets
- get_consumer_groups - List consumer groups
- get_consumer_group_details - Get consumer group details
- reset_consumer_group_offsets - Reset consumer group offsets
🔌 Kafka Connect Management (15 tools):
- list_connectors - List all connectors
- create_connector - Create a new connector
- get_connector_status - Get connector status
- delete_connector - Delete a connector
- pause_connector - Pause a connector
- resume_connector - Resume a connector
- list_connector_plugins - List available plugins
- validate_connector_config - Validate connector config
🔧 Knox Gateway Integration (8 tools):
- test_knox_connection - Test Knox Gateway connection
- get_knox_metadata - Get Knox Gateway metadata
- get_knox_gateway_info - New: Get Knox Gateway information and status
- list_knox_topologies - New: List all Knox topologies
- get_knox_topology - New: Get specific Knox topology configuration
- create_knox_topology - New: Create a new Knox topology for Kafka services
- get_knox_service_health - New: Get health status of Knox services
- get_knox_service_urls - New: Get service URLs through Knox Gateway
☁️ CDP Cloud Integration (4 tools):
- test_cdp_connection - New: Test connection to CDP Cloud
- get_cdp_apis - New: Get information about available CDP APIs
- get_cdp_service_health - New: Get health status of CDP services
- validate_cdp_token - New: Validate a CDP token
📊 Monitoring & Health Checks (5 tools):
- get_health_status - New: Get comprehensive health status of all services
- get_health_summary - New: Get a summary of health status
- get_health_history - New: Get health check history
- get_service_metrics - New: Get service performance metrics
- run_health_check - New: Run a specific health check
🔗 System Information (3 tools):
- get_broker_info - Get Kafka broker information
- get_cluster_metadata - Get cluster metadata
- test_connection - Test Kafka connection
Usage Examples
Enhanced Topic Operations:
{"tool": "list_topics", "arguments": {}}
{"tool": "create_topic", "arguments": {"name": "user-events", "partitions": 3, "method": "auto"}}
{"tool": "describe_topic", "arguments": {"name": "user-events"}}
Enhanced Message Operations:
{"tool": "produce_message", "arguments": {"topic": "user-events", "value": "Hello Kafka!", "method": "auto"}}
{"tool": "consume_messages", "arguments": {"topic": "user-events", "max_count": 10}}
Knox Gateway Operations:
{"tool": "get_knox_gateway_info", "arguments": {}}
{"tool": "create_knox_topology", "arguments": {"topology_name": "kafka-topology", "kafka_brokers": ["broker1:9092"]}}
{"tool": "get_knox_service_health", "arguments": {"topology": "default"}}
CDP Cloud Operations:
{"tool": "test_cdp_connection", "arguments": {}}
{"tool": "get_cdp_apis", "arguments": {}}
{"tool": "validate_cdp_token", "arguments": {"token": "your-cdp-token"}}
Monitoring & Health Checks:
{"tool": "get_health_status", "arguments": {}}
{"tool": "get_health_summary", "arguments": {}}
{"tool": "run_health_check", "arguments": {"check_name": "kafka"}}
{"tool": "get_service_metrics", "arguments": {}}
Kafka Connect:
{"tool": "list_connectors", "arguments": {}}
{"tool": "create_connector", "arguments": {"name": "my-connector", "config": {...}}}
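The examples above use a shorthand with "tool" and "arguments" keys. On the wire, MCP clients send tool invocations as JSON-RPC 2.0 requests with the method `tools/call`. The sketch below shows one way to wrap the shorthand; `to_jsonrpc` is a hypothetical helper, and most users never build these requests by hand because an MCP client does it for them.

```python
import itertools
import json

# Simple incrementing request IDs; any unique value per request would do.
_ids = itertools.count(1)


def to_jsonrpc(call: dict) -> dict:
    """Wrap a {"tool", "arguments"} shorthand in an MCP tools/call request."""
    return {
        "jsonrpc": "2.0",
        "id": next(_ids),
        "method": "tools/call",
        "params": {"name": call["tool"], "arguments": call.get("arguments", {})},
    }


request = to_jsonrpc(
    {"tool": "create_topic", "arguments": {"name": "user-events", "partitions": 3, "method": "auto"}}
)
print(json.dumps(request, indent=2))
```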
Claude Desktop Integration
{
"mcpServers": {
"cdf-kafka-mcp-server": {
"command": "uv",
"args": ["run", "python", "-m", "cdf_kafka_mcp_server"],
"env": {
"KNOX_GATEWAY": "https://your-knox-gateway:8444",
"KNOX_TOKEN": "your-bearer-token-here"
}
}
}
}
Visual Examples
Creating a Topic via SMM UI
The following screenshot shows how to create a new Kafka topic using the Streams Messaging Manager (SMM) web interface:

This example demonstrates the topic creation process in the SMM web interface, showing the topic name, partitions, replication factor, and configuration options.
Adding Data to a Topic
The following screenshot shows how to add data to a Kafka topic using the SMM web interface:

This example demonstrates the message production process in the SMM web interface, showing how to send messages to a Kafka topic with key-value pairs and headers.
Listing Topics via SMM UI
The following screenshot shows how to view and manage Kafka topics using the Streams Messaging Manager (SMM) web interface:

This example demonstrates the topic listing and management interface in SMM, showing all available topics with their configurations, partition counts, and status information.
Using MCP Tools vs. SMM UI
- SMM UI: Manual, interactive operations through web interface
- MCP Tools: Automated, programmatic operations through AI models
- Kafka CLI: Command-line operations for scripting and automation
Troubleshooting
Common Issues
MCP Server Won't Start:
# Check environment variables
echo $KAFKA_BOOTSTRAP_SERVERS
echo $KNOX_GATEWAY
echo $KNOX_TOKEN
# Test with debug logging
export MCP_LOG_LEVEL="DEBUG"
uv run python -m cdf_kafka_mcp_server
Knox Authentication Issues:
# Verify Knox Gateway is accessible
curl -k https://your-knox-gateway:8444/gateway/admin/v1/version
# Test with different SSL settings
export KNOX_VERIFY_SSL="false" # For testing
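The same check can be prepared from Python. The sketch below builds a request for the version endpoint used in the curl command above, attaches the bearer token, and optionally turns off certificate verification the way `KNOX_VERIFY_SSL="false"` suggests. It assumes the gateway accepts a bearer token on this endpoint; `build_knox_request` is a hypothetical helper, not part of this package.

```python
import ssl
import urllib.request


def build_knox_request(gateway: str, token: str, verify_ssl: bool = True):
    """Return a (request, ssl_context) pair for the Knox version endpoint."""
    url = gateway.rstrip("/") + "/gateway/admin/v1/version"
    request = urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})
    context = ssl.create_default_context()
    if not verify_ssl:
        # For testing only: hostname checking must be disabled before CERT_NONE is set.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
    return request, context


# To actually send it:
#   request, context = build_knox_request(gateway, token, verify_ssl=False)
#   with urllib.request.urlopen(request, context=context, timeout=10) as response:
#       print(response.status, response.read().decode())
```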
MCP Tools Not Working:
# Test connection first
# Use test_connection MCP tool
# Verify Kafka Connect is running
curl http://localhost:28083/connectors
Debug Mode
Enable debug logging:
export MCP_LOG_LEVEL="DEBUG"
uv run python -m cdf_kafka_mcp_server
Health Check Commands
# Check all services
docker-compose ps
# Check Kafka health
docker exec kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
# Check SMM health
curl -f http://localhost:9991/
# Check Kafka Connect health
curl -f http://localhost:28083/connectors
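When curl or Docker is not at hand, a plain TCP probe gives a quick first answer about which services are listening. This is a minimal sketch: the ports come from the commands in this README, the host is assumed to be localhost, and an open port only shows that something is listening, not that the service is healthy.

```python
import socket

# Ports taken from the commands in this README; the host is an assumption.
SERVICES = {"kafka": 9092, "smm": 9991, "kafka-connect": 28083}


def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_services(host: str = "localhost") -> dict[str, bool]:
    """Probe every known service port on the given host."""
    return {name: port_open(host, port) for name, port in SERVICES.items()}


if __name__ == "__main__":
    for name, ok in check_services().items():
        print(f"{name}: {'reachable' if ok else 'unreachable'}")
```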
Emergency Workarounds
If the MCP server is not working:
- Use Kafka CLI for all operations
- Use SMM UI for visualization
- Use Kafka Connect REST API directly
- Check service logs for specific errors
Security Considerations
- All sensitive data (passwords, tokens, secrets) is automatically redacted in responses
- Knox tokens are cached and refreshed automatically
- TLS is enforced for Knox connections
- Configuration files should be secured with appropriate permissions
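To give a sense of what redacting sensitive data in responses can look like, here is a small recursive sketch. It is not the server's actual implementation: the `redact` function, the marker list, and the placeholder string are all hypothetical.

```python
from typing import Any

# Assumption: a key is sensitive if its name contains one of these substrings.
SENSITIVE_MARKERS = ("password", "token", "secret")
PLACEHOLDER = "***REDACTED***"


def redact(value: Any) -> Any:
    """Return a copy of value with sensitive dictionary entries masked."""
    if isinstance(value, dict):
        return {
            key: PLACEHOLDER
            if any(marker in str(key).lower() for marker in SENSITIVE_MARKERS)
            else redact(item)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [redact(item) for item in value]
    return value


response = {"knox": {"gateway": "https://knox-gateway.example.com:8443", "token": "abc"}}
print(redact(response))
```

Matching on key names is simple but misses secrets embedded in free-form strings, so it complements rather than replaces careful handling of credentials.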
Cloud Testing
Quick Setup
Choose Cloud Provider:
# AWS MSK
./Testing/run_cloud_tests.sh aws-msk
# Confluent Cloud
./Testing/run_cloud_tests.sh confluent-cloud
# Azure Event Hubs
./Testing/run_cloud_tests.sh azure-eventhub
# CDP Cloud (Cloudera Data Platform)
./Testing/run_cloud_tests.sh cdp-cloud
# CDP Cloud MCP Tools Test (Comprehensive)
./Testing/run_cdp_cloud_tests.sh
# Generic SASL_SSL
./Testing/run_cloud_tests.sh generic
Set Environment Variables:
# AWS MSK Example (MSK serves SASL/SCRAM on port 9096; 9092 is the plaintext listener)
export KAFKA_BOOTSTRAP_SERVERS="your-msk-cluster.kafka.us-east-1.amazonaws.com:9096"
export KAFKA_SECURITY_PROTOCOL="SASL_SSL"
export KAFKA_SASL_MECHANISM="SCRAM-SHA-512"
export KAFKA_SASL_USERNAME="your-iam-username"
export KAFKA_SASL_PASSWORD="your-iam-password"
# Optional Knox Gateway
export KNOX_GATEWAY="https://your-knox-gateway:8444"
export KNOX_TOKEN="your-bearer-token-here"
Run Tests:
# Run comprehensive cloud tests
./Testing/run_cloud_tests.sh aws-msk --debug
# Or run individual test script
uv run python3 Testing/test_cloud_connection.py
Cloud Configuration Files
- config/kafka_config_aws_msk.yaml - AWS MSK configuration
- config/kafka_config_confluent_cloud.yaml - Confluent Cloud configuration
- config/kafka_config_azure_eventhub.yaml - Azure Event Hubs configuration
- config/kafka_config_cdp_cloud.yaml - CDP Cloud (Cloudera Data Platform) configuration
- config/kafka_config_cloud.yaml - Generic cloud configuration
Documentation
- Comprehensive cloud testing guide
- CDP Cloud specific testing guide
- Cloud Deployment Configurations - Configuration examples
Development
Project Structure
├── src/
│ └── cdf_kafka_mcp_server/ # Main package
│ ├── __init__.py # Package initialization
│ ├── main.py # CLI entry point
│ ├── config.py # Configuration management
│ ├── knox_client.py # Knox authentication
│ ├── kafka_client.py # Kafka client implementation
│ └── mcp_server.py # MCP server implementation
├── config/ # Configuration files
├── examples/ # Usage examples
├── Testing/ # Test files
├── pyproject.toml # Project configuration
└── README.md # Documentation
Building
# Install in development mode
pip install -e .
# Build package
python -m build
# Run tests
python -m pytest
# Format code
black src/ Testing/
isort src/ Testing/
# Lint code
flake8 src/ Testing/
mypy src/
Contributing
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests if applicable
- Submit a pull request
License
Apache License 2.0
Acknowledgments
This project is inspired by the SSB-MCP-Server implementation, which provides excellent patterns for MCP server development and enterprise integration.