CDF Kafka MCP Server

A Model Context Protocol (MCP) server for Apache Kafka with Apache Knox authentication support, inspired by the SSB-MCP-Server implementation.

Overview

The CDF Kafka MCP Server provides a comprehensive bridge between AI models and Apache Kafka clusters through the Model Context Protocol. It enables AI applications to interact with Kafka topics, produce and consume messages, and manage consumer groups through a secure, enterprise-ready interface with Apache Knox authentication.

📸 Visual Examples: This README includes screenshots showing how to create topics, add data, and list topics using the SMM (Streams Messaging Manager) web interface. See the Visual Examples section for step-by-step visual guides.

Features

🔐 Enterprise Security

  • Apache Knox Gateway authentication support with Admin API integration
  • CDP Cloud authentication with token and basic auth support
  • Multiple authentication methods (token-based, username/password, CDP tokens)
  • TLS/SSL configuration for secure communications
  • Configurable SSL certificate verification
  • Service discovery through Knox Gateway

🚀 Enhanced Kafka Operations

  • Multi-approach Topic Creation: Knox Gateway, CDP Cloud, Connect API, Admin Client
  • Multi-approach Message Production: Direct, Knox, CDP, Connect API
  • Advanced Topic Management: Create, list, describe, delete, configure with fallback methods
  • Message Operations: Produce, consume with metadata and multiple transport methods
  • Batch Message Production: Efficient bulk message handling
  • Offset Management: Advanced partition and offset management
  • Consumer Group Management: Full consumer group lifecycle management
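The multi-approach operations above all follow the same fallback pattern: try each transport in order and return the first success, recording failures along the way. A minimal sketch of that pattern in Python — the function and method names here are illustrative, not the server's actual API:

```python
from typing import Any, Callable

def create_topic_with_fallback(name: str,
                               methods: dict[str, Callable[[str], Any]]) -> dict:
    """Try each creation method in order; return the first success.

    `methods` maps a transport label (e.g. "knox", "cdp", "connect",
    "admin") to a callable that raises on failure.
    """
    errors = {}
    for label, method in methods.items():
        try:
            result = method(name)
            return {"status": "ok", "method": label, "result": result}
        except Exception as exc:  # record the failure, try the next transport
            errors[label] = str(exc)
    return {"status": "error", "errors": errors}

# Illustrative stubs: Knox is unreachable, the Admin client succeeds.
def via_knox(name):
    raise ConnectionError("Knox gateway unreachable")

def via_admin(name):
    return f"created {name}"

outcome = create_topic_with_fallback("user-events",
                                     {"knox": via_knox, "admin": via_admin})
print(outcome["method"])  # → admin
```

Because each failure is captured per transport, a caller can report exactly which approaches were attempted when every one of them fails.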

📊 Monitoring & Health Checks

  • Comprehensive Health Monitoring: Real-time health status of all services
  • Performance Metrics: Request counts, response times, success rates
  • Service Discovery: Automatic detection of available services
  • Health History: Track health status over time
  • Individual Health Checks: Granular health monitoring per service
  • Metrics Collection: Detailed performance and usage metrics

🔧 Knox Gateway Integration

  • Topology Management: Create and manage Knox topologies
  • Service Configuration: Configure Kafka services through Knox
  • Token Management: JWT token handling and validation
  • Service Discovery: Automatic service endpoint discovery
  • Health Monitoring: Knox-specific health checks
  • Admin API: Full Knox Admin API integration

☁️ CDP Cloud Support

  • CDP Proxy API: Integration with CDP proxy endpoints
  • CDP Token Authentication: Support for CDP-specific tokens
  • Service Health: CDP Cloud service health monitoring
  • API Discovery: Automatic CDP API endpoint discovery
  • Fallback Support: Graceful fallback between CDP and other methods

🛠️ Developer Experience

  • Full Model Context Protocol compliance with 40+ tools
  • Rich Metadata: Comprehensive error reporting and status information
  • Flexible Configuration: YAML and environment variable configuration
  • Comprehensive Logging: Structured logging for debugging and monitoring
  • Multiple Transport Methods: Support for various Kafka access methods
  • Graceful Degradation: Automatic fallback between different approaches

Limitations

Known Issues

  • Admin Client: kafka-python Admin Client may fail in some environments (NodeNotReadyError)
  • Producer Timeouts: Message production may timeout in certain Cloudera environments
  • MockSourceConnector: Cloudera MockSourceConnector may not produce messages reliably
  • Knox Gateway: Requires proper configuration for full functionality

Workarounds

For reliable operations, use these alternatives:

# List topics
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

# Create topics
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic

# Produce messages
docker exec -it kafka /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic

# Consume messages
docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning

Quick Start

Get Up and Running

  1. Start the Docker Environment
git clone https://github.com/ibrooks/cdf-kafka-mcp-server.git
cd cdf-kafka-mcp-server
docker-compose up -d
  2. Verify Services
# Check Kafka
docker exec kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

# Check SMM UI
curl -f http://localhost:9991/ || echo "SMM UI not ready yet"
  3. Add Test Data
echo '{"message": "Hello from Quick Start!", "timestamp": "2024-10-23T20:00:00Z"}' | \
docker exec -i kafka /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic cursortest
  4. View in SMM UI
Open http://localhost:9991/ in a browser to browse the cursortest topic and its messages.
  5. Test MCP Server (Optional)
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
uv run python -m cdf_kafka_mcp_server

Installation

Prerequisites

  • Python 3.10 or higher
  • Apache Kafka cluster
  • Apache Knox Gateway (optional, for enterprise authentication)

Install from Source

git clone https://github.com/ibrooks/cdf-kafka-mcp-server.git
cd cdf-kafka-mcp-server
uv pip install -e .

Cloudera Agent Studio

For use with Cloudera Agent Studio:

{
  "mcpServers": {
    "cdf-kafka-mcp-server": {
      "command": "uvx",
      "args": ["--from", "git+https://github.com/ibrooks/cdf-kafka-mcp-server@main", "run-server"],
      "env": {
        "KAFKA_BOOTSTRAP_SERVERS": "kafka-broker1:9092,kafka-broker2:9092",
        "KNOX_TOKEN": "<your_knox_bearer_token>",
        "KNOX_GATEWAY": "https://knox-gateway.yourshere.cloudera.site:8443"
      }
    }
  }
}

Configuration

Cloud Deployment Configurations

The project includes pre-configured templates for major cloud Kafka services:

AWS MSK (Managed Streaming for Kafka)

# Use AWS MSK configuration
cp config/kafka_config_aws_msk.yaml config/kafka_config.yaml

# Set AWS credentials
export AWS_ACCESS_KEY_ID="your-access-key"
export AWS_SECRET_ACCESS_KEY="your-secret-key"
export AWS_REGION="us-east-1"

Confluent Cloud

# Use Confluent Cloud configuration
cp config/kafka_config_confluent_cloud.yaml config/kafka_config.yaml

# Set Confluent credentials
export CONFLUENT_API_KEY="your-api-key"
export CONFLUENT_API_SECRET="your-api-secret"

Azure Event Hubs for Kafka

# Use Azure Event Hubs configuration
cp config/kafka_config_azure_eventhub.yaml config/kafka_config.yaml

# Set Azure connection string
export AZURE_EVENTHUB_CONNECTION_STRING="your-connection-string"

Generic Cloud Configuration

# Use generic cloud configuration
cp config/kafka_config_cloud.yaml config/kafka_config.yaml

# Set your cloud provider credentials
export KAFKA_SASL_USERNAME="your-username"
export KAFKA_SASL_PASSWORD="your-password"

Quick Setup Script

For easy cloud deployment setup, use the provided script:

# Make the script executable
chmod +x setup_cloud.sh

# Setup for your cloud provider
./setup_cloud.sh aws-msk
./setup_cloud.sh confluent-cloud
./setup_cloud.sh azure-eventhub
./setup_cloud.sh generic

Environment Variables Template

For cloud deployments, use the environment variables template:

# Copy and customize the template
cp config/env_cloud_template.txt .env
# Edit .env with your actual values

Cloud Deployment Considerations

When deploying to cloud environments, consider these important factors:

Security

  • Use IAM Roles: For AWS MSK, prefer IAM roles over access keys
  • Rotate Credentials: Regularly rotate API keys and tokens
  • Network Security: Use VPCs and security groups appropriately
  • Encryption: Ensure TLS/SSL is enabled for all connections

Performance

  • Connection Pooling: Cloud services may have connection limits
  • Retry Logic: Implement exponential backoff for retries
  • Monitoring: Use cloud provider monitoring tools
  • Scaling: Consider auto-scaling based on load

Cost Optimization

  • Resource Sizing: Right-size your Kafka clusters
  • Data Retention: Set appropriate retention policies
  • Compression: Enable compression to reduce bandwidth costs
  • Monitoring: Monitor usage to avoid unexpected charges

Reliability

  • Multi-AZ: Deploy across multiple availability zones
  • Backup: Implement proper backup strategies
  • Disaster Recovery: Plan for disaster recovery scenarios
  • Health Checks: Implement comprehensive health monitoring
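The "Retry Logic" recommendation above is usually implemented as capped exponential backoff with jitter. This is a generic sketch of the pattern, not code from this project:

```python
import random

def backoff_delays(retries: int, base: float = 0.5, cap: float = 30.0) -> list[float]:
    """Compute per-attempt sleep times: base * 2^attempt, capped, with full jitter.

    Full jitter (a uniform draw between 0 and the capped ceiling) spreads
    retries out so many clients don't hammer the broker in lockstep.
    """
    delays = []
    for attempt in range(retries):
        ceiling = min(cap, base * (2 ** attempt))
        delays.append(random.uniform(0, ceiling))
    return delays

# Example: 5 attempts; each delay is bounded by the capped exponential ceiling.
for attempt, delay in enumerate(backoff_delays(5)):
    print(f"attempt {attempt}: sleep up to {delay:.2f}s")
```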

Configuration File

Create a configuration file at config/kafka_config.yaml:

kafka:
  bootstrap_servers: "localhost:9092"
  client_id: "cdf-kafka-mcp-server"
  security_protocol: "PLAINTEXT"
  timeout: 30

knox:
  gateway: "https://knox-gateway.example.com:8443"
  token: "your-knox-token-here"
  verify_ssl: true
  service: "kafka"

Environment Variables

You can also configure the server using environment variables:

export KAFKA_BOOTSTRAP_SERVERS="localhost:9092"
export KNOX_GATEWAY="https://knox-gateway.example.com:8443"
export KNOX_TOKEN="your-knox-token-here"
export KNOX_VERIFY_SSL="true"
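Since both a YAML file and environment variables are supported, a typical resolution order is: environment variable, then config file, then built-in default. A hedged sketch of that precedence — the server's actual order may differ, and the parsed YAML is simulated here as a plain dict:

```python
import os

DEFAULTS = {"bootstrap_servers": "localhost:9092", "verify_ssl": "true"}

def resolve(key: str, env_var: str, file_config: dict) -> str:
    """Environment variable wins, then the config file, then the default."""
    return os.environ.get(env_var) or file_config.get(key) or DEFAULTS[key]

# As if parsed from config/kafka_config.yaml
file_config = {"bootstrap_servers": "kafka1:9092"}
os.environ["KAFKA_BOOTSTRAP_SERVERS"] = "broker.example.com:9092"

print(resolve("bootstrap_servers", "KAFKA_BOOTSTRAP_SERVERS", file_config))
# → broker.example.com:9092 (env var overrides the file)
print(resolve("verify_ssl", "KNOX_VERIFY_SSL", file_config))
# falls back to the default unless KNOX_VERIFY_SSL is set
```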

Claude Desktop Integration

Add to your Claude Desktop configuration (claude_desktop_config.json):

{
  "mcpServers": {
    "cdf-kafka-mcp-server": {
      "command": "cdf-kafka-mcp-server",
      "args": ["--config", "./config/kafka_config.yaml"],
      "env": {
        "KNOX_GATEWAY": "https://knox-gateway.example.com:8443",
        "KNOX_TOKEN": "your-knox-token-here"
      }
    }
  }
}

Usage

Basic Setup

Direct Kafka Connection:

export KAFKA_BOOTSTRAP_SERVERS="localhost:9092"
uv run python -m cdf_kafka_mcp_server

Knox Gateway (Production):

export KNOX_GATEWAY="https://your-knox-gateway:8444"
export KNOX_TOKEN="your-bearer-token-here"
export KNOX_SERVICE="kafka"
uv run python -m cdf_kafka_mcp_server

Configuration File:

uv run python -m cdf_kafka_mcp_server --config config/kafka_config.yaml

Authentication Methods

Knox Bearer Token (Recommended):

export KNOX_GATEWAY="https://knox-gateway.company.com:8444"
export KNOX_TOKEN="your-bearer-token-here"
export KNOX_SERVICE="kafka"
uv run python -m cdf_kafka_mcp_server

Direct Kafka Connection:

export KAFKA_BOOTSTRAP_SERVERS="kafka1:9092,kafka2:9092"
export KAFKA_SECURITY_PROTOCOL="SASL_SSL"
export KAFKA_SASL_MECHANISM="PLAIN"
export KAFKA_SASL_USERNAME="your-username"
export KAFKA_SASL_PASSWORD="your-password"
uv run python -m cdf_kafka_mcp_server

Configuration

Environment Variables:

  • KAFKA_BOOTSTRAP_SERVERS - Kafka broker addresses (required)
  • KNOX_GATEWAY - Knox Gateway URL (for Knox auth)
  • KNOX_TOKEN - Bearer token (for Knox auth)
  • MCP_LOG_LEVEL - Log level (default: INFO)

YAML Configuration:

kafka:
  bootstrap_servers: "localhost:9092"
  security_protocol: "PLAINTEXT"

knox:
  gateway: "https://knox-gateway.company.com:8444"
  token: "your-bearer-token-here"
  service: "kafka"

Available MCP Tools

The server provides 40+ MCP tools for comprehensive Kafka operations:

🗂️ Topic Management (7 tools):

  • list_topics - List all Kafka topics
  • create_topic - Enhanced: Create topics using multiple approaches (Knox, CDP, Connect, Admin)
  • describe_topic - Get detailed topic information
  • delete_topic - Delete a Kafka topic
  • topic_exists - Check if a topic exists
  • get_topic_partitions - Get topic partition count
  • update_topic_config - Update topic configuration

📨 Message Operations (6 tools):

  • produce_message - Enhanced: Produce messages using multiple approaches (Direct, Knox, CDP, Connect)
  • consume_messages - Consume messages from a topic
  • get_topic_offsets - Get topic partition offsets
  • get_consumer_groups - List consumer groups
  • get_consumer_group_details - Get consumer group details
  • reset_consumer_group_offsets - Reset consumer group offsets
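A common use of get_topic_offsets together with get_consumer_group_details is computing consumer lag: the log-end offset minus the committed offset, per partition. The arithmetic is simple enough to sketch self-contained (the offset values below are illustrative):

```python
def consumer_lag(end_offsets: dict[int, int],
                 committed: dict[int, int]) -> dict[int, int]:
    """Per-partition lag = log-end offset - committed offset.

    A partition with no committed offset counts its entire log as lag.
    """
    return {p: end_offsets[p] - committed.get(p, 0) for p in end_offsets}

end_offsets = {0: 120, 1: 98, 2: 87}   # e.g. from get_topic_offsets
committed   = {0: 120, 1: 90}          # e.g. from get_consumer_group_details

lag = consumer_lag(end_offsets, committed)
print(lag)    # → {0: 0, 1: 8, 2: 87}
total = sum(lag.values())
print(total)  # → 95
```

Partition 2 has never been committed, so all 87 of its messages count as lag — a useful signal when deciding whether reset_consumer_group_offsets is needed.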

🔌 Kafka Connect Management (8 tools):

  • list_connectors - List all connectors
  • create_connector - Create a new connector
  • get_connector_status - Get connector status
  • delete_connector - Delete a connector
  • pause_connector - Pause a connector
  • resume_connector - Resume a connector
  • list_connector_plugins - List available plugins
  • validate_connector_config - Validate connector config

🔧 Knox Gateway Integration (8 tools):

  • test_knox_connection - Test Knox Gateway connection
  • get_knox_metadata - Get Knox Gateway metadata
  • get_knox_gateway_info - New: Get Knox Gateway information and status
  • list_knox_topologies - New: List all Knox topologies
  • get_knox_topology - New: Get specific Knox topology configuration
  • create_knox_topology - New: Create a new Knox topology for Kafka services
  • get_knox_service_health - New: Get health status of Knox services
  • get_knox_service_urls - New: Get service URLs through Knox Gateway

☁️ CDP Cloud Integration (4 tools):

  • test_cdp_connection - New: Test connection to CDP Cloud
  • get_cdp_apis - New: Get information about available CDP APIs
  • get_cdp_service_health - New: Get health status of CDP services
  • validate_cdp_token - New: Validate a CDP token

📊 Monitoring & Health Checks (5 tools):

  • get_health_status - New: Get comprehensive health status of all services
  • get_health_summary - New: Get a summary of health status
  • get_health_history - New: Get health check history
  • get_service_metrics - New: Get service performance metrics
  • run_health_check - New: Run a specific health check
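The relationship between get_health_status and get_health_summary can be pictured as a simple aggregation: the overall status is healthy only if every individual check passes. A sketch under that assumption — the field names here are illustrative, not the server's actual response schema:

```python
def health_summary(checks: dict[str, bool]) -> dict:
    """Aggregate per-service checks into a single overall status."""
    healthy = [name for name, ok in checks.items() if ok]
    unhealthy = [name for name, ok in checks.items() if not ok]
    return {
        "overall": "healthy" if not unhealthy else "degraded",
        "healthy": healthy,
        "unhealthy": unhealthy,
        "checked": len(checks),
    }

summary = health_summary({"kafka": True, "knox": True, "connect": False})
print(summary["overall"])    # → degraded
print(summary["unhealthy"])  # → ['connect']
```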

🔗 System Information (3 tools):

  • get_broker_info - Get Kafka broker information
  • get_cluster_metadata - Get cluster metadata
  • test_connection - Test Kafka connection

Usage Examples

Enhanced Topic Operations:

{"tool": "list_topics", "arguments": {}}
{"tool": "create_topic", "arguments": {"name": "user-events", "partitions": 3, "method": "auto"}}
{"tool": "describe_topic", "arguments": {"name": "user-events"}}

Enhanced Message Operations:

{"tool": "produce_message", "arguments": {"topic": "user-events", "value": "Hello Kafka!", "method": "auto"}}
{"tool": "consume_messages", "arguments": {"topic": "user-events", "max_count": 10}}

Knox Gateway Operations:

{"tool": "get_knox_gateway_info", "arguments": {}}
{"tool": "create_knox_topology", "arguments": {"topology_name": "kafka-topology", "kafka_brokers": ["broker1:9092"]}}
{"tool": "get_knox_service_health", "arguments": {"topology": "default"}}

CDP Cloud Operations:

{"tool": "test_cdp_connection", "arguments": {}}
{"tool": "get_cdp_apis", "arguments": {}}
{"tool": "validate_cdp_token", "arguments": {"token": "your-cdp-token"}}

Monitoring & Health Checks:

{"tool": "get_health_status", "arguments": {}}
{"tool": "get_health_summary", "arguments": {}}
{"tool": "run_health_check", "arguments": {"check_name": "kafka"}}
{"tool": "get_service_metrics", "arguments": {}}

Kafka Connect:

{"tool": "list_connectors", "arguments": {}}
{"tool": "create_connector", "arguments": {"name": "my-connector", "config": {...}}}

Claude Desktop Integration

{
  "mcpServers": {
    "cdf-kafka-mcp-server": {
      "command": "uv",
      "args": ["run", "python", "-m", "cdf_kafka_mcp_server"],
      "env": {
        "KNOX_GATEWAY": "https://your-knox-gateway:8444",
        "KNOX_TOKEN": "your-bearer-token-here"
      }
    }
  }
}

Visual Examples

Creating a Topic via SMM UI

The following screenshot shows how to create a new Kafka topic using the Streams Messaging Manager (SMM) web interface:

Add Topic

This example demonstrates the topic creation process in the SMM web interface, showing the topic name, partitions, replication factor, and configuration options.

Adding Data to a Topic

The following screenshot shows how to add data to a Kafka topic using the SMM web interface:

Add Data to Topic

This example demonstrates the message production process in the SMM web interface, showing how to send messages to a Kafka topic with key-value pairs and headers.

Listing Topics via SMM UI

The following screenshot shows how to view and manage Kafka topics using the Streams Messaging Manager (SMM) web interface:

List Topics

This example demonstrates the topic listing and management interface in SMM, showing all available topics with their configurations, partition counts, and status information.

Using MCP Tools vs. SMM UI

  • SMM UI: Manual, interactive operations through web interface
  • MCP Tools: Automated, programmatic operations through AI models
  • Kafka CLI: Command-line operations for scripting and automation

Troubleshooting

Common Issues

MCP Server Won't Start:

# Check environment variables
echo $KAFKA_BOOTSTRAP_SERVERS
echo $KNOX_GATEWAY
echo $KNOX_TOKEN

# Test with debug logging
export MCP_LOG_LEVEL="DEBUG"
uv run python -m cdf_kafka_mcp_server

Knox Authentication Issues:

# Verify Knox Gateway is accessible
curl -k https://your-knox-gateway:8444/gateway/admin/v1/version

# Test with different SSL settings
export KNOX_VERIFY_SSL="false"  # For testing

MCP Tools Not Working:

# Test connection first
# Use test_connection MCP tool

# Verify Kafka Connect is running
curl http://localhost:28083/connectors

Debug Mode

Enable debug logging:

export MCP_LOG_LEVEL="DEBUG"
uv run python -m cdf_kafka_mcp_server

Health Check Commands

# Check all services
docker-compose ps

# Check Kafka health
docker exec kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

# Check SMM health
curl -f http://localhost:9991/

# Check Kafka Connect health
curl -f http://localhost:28083/connectors

Emergency Workarounds

If the MCP server is not working:

  1. Use Kafka CLI for all operations
  2. Use SMM UI for visualization
  3. Use Kafka Connect REST API directly
  4. Check service logs for specific errors

Security Considerations

  • All sensitive data (passwords, tokens, secrets) is automatically redacted in responses
  • Knox tokens are cached and refreshed automatically
  • TLS is enforced for Knox connections
  • Configuration files should be secured with appropriate permissions
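The redaction behavior described above can be sketched as a recursive scrub of sensitive keys before a response leaves the server. The key list and placeholder below are illustrative, not the server's actual implementation:

```python
SENSITIVE_KEYS = {"password", "token", "secret", "sasl_password"}

def redact(obj):
    """Recursively replace values of sensitive keys with a placeholder."""
    if isinstance(obj, dict):
        return {
            k: "***REDACTED***" if k.lower() in SENSITIVE_KEYS else redact(v)
            for k, v in obj.items()
        }
    if isinstance(obj, list):
        return [redact(item) for item in obj]
    return obj

config = {
    "knox": {"gateway": "https://knox:8443", "token": "abc123"},
    "kafka": {"sasl_password": "hunter2", "bootstrap_servers": "kafka1:9092"},
}
safe = redact(config)
print(safe["knox"]["token"])               # → ***REDACTED***
print(safe["kafka"]["bootstrap_servers"])  # → kafka1:9092
```

Note that redact builds a new structure rather than mutating its input, so the original config (with live credentials) stays usable internally.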

Cloud Testing

Quick Setup

Choose Cloud Provider:

# AWS MSK
./Testing/run_cloud_tests.sh aws-msk

# Confluent Cloud
./Testing/run_cloud_tests.sh confluent-cloud

# Azure Event Hubs
./Testing/run_cloud_tests.sh azure-eventhub

# CDP Cloud (Cloudera Data Platform)
./Testing/run_cloud_tests.sh cdp-cloud

# CDP Cloud MCP Tools Test (Comprehensive)
./Testing/run_cdp_cloud_tests.sh

# Generic SASL_SSL
./Testing/run_cloud_tests.sh generic

Set Environment Variables:

# AWS MSK Example
export KAFKA_BOOTSTRAP_SERVERS="your-msk-cluster.kafka.us-east-1.amazonaws.com:9092"
export KAFKA_SECURITY_PROTOCOL="SASL_SSL"
export KAFKA_SASL_MECHANISM="SCRAM-SHA-512"
export KAFKA_SASL_USERNAME="your-iam-username"
export KAFKA_SASL_PASSWORD="your-iam-password"

# Optional Knox Gateway
export KNOX_GATEWAY="https://your-knox-gateway:8444"
export KNOX_TOKEN="your-bearer-token-here"

Run Tests:

# Run comprehensive cloud tests
./Testing/run_cloud_tests.sh aws-msk --debug

# Or run individual test script
uv run python3 Testing/test_cloud_connection.py

Cloud Configuration Files

  • config/kafka_config_aws_msk.yaml - AWS MSK configuration
  • config/kafka_config_confluent_cloud.yaml - Confluent Cloud configuration
  • config/kafka_config_azure_eventhub.yaml - Azure Event Hubs configuration
  • config/kafka_config_cdp_cloud.yaml - CDP Cloud (Cloudera Data Platform) configuration
  • config/kafka_config_cloud.yaml - Generic cloud configuration

Development

Project Structure

├── src/
│   └── cdf_kafka_mcp_server/    # Main package
│       ├── __init__.py          # Package initialization
│       ├── main.py              # CLI entry point
│       ├── config.py            # Configuration management
│       ├── knox_client.py       # Knox authentication
│       ├── kafka_client.py      # Kafka client implementation
│       └── mcp_server.py        # MCP server implementation
├── config/                      # Configuration files
├── examples/                    # Usage examples
├── Testing/                     # Test files
├── pyproject.toml              # Project configuration
└── README.md                   # Documentation

Building

# Install in development mode
pip install -e .

# Build package
python -m build

# Run tests
python -m pytest

# Format code
black src/ Testing/
isort src/ Testing/

# Lint code
flake8 src/ Testing/
mypy src/

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests if applicable
  5. Submit a pull request

License

Apache License 2.0

Acknowledgments

This project is inspired by the SSB-MCP-Server implementation, which provides excellent patterns for MCP server development and enterprise integration.