kafka-mcp-server

arabaaoui/kafka-mcp-server

3.2

If you are the rightful owner of kafka-mcp-server and would like to certify it and/or have it hosted online, please leave a comment on the right or send an email to dayong@mcphub.com.

The Kafka MCP Server v2.0 is a modern Model Context Protocol server designed for Kafka, leveraging Spring Boot 3.2 and Spring AI MCP for efficient and scalable operations.

Tools
24
Resources
0
Prompts
0

🛠️ Outils MCP Disponibles

Total : 22 outils répartis en 7 catégories

📁 Topics (4 outils)

OutilDescriptionParamètres
listTopicsListe tous les topics-
describeTopicDécrit un topic spécifiquetopic (string)
createTopicCrée un nouveau topictopic, partitions, replicationFactor
deleteTopicSupprime un topictopic (string)

💬 Messages (2 outils)

OutilDescriptionParamètres
consumeMessagesConsomme des messages d'un topictopic, maxMessages?, timeout?
produceMessageProduit un message vers un topictopic, key?, value

👥 Consumer Groups (2 outils)

OutilDescriptionParamètres
listConsumerGroupsListe tous les consumer groups-
describeConsumerGroupDécrit un consumer groupgroup (string)

📊 Cluster Monitoring (5 outils)

OutilDescriptionParamètres
getClusterInfoInformations du cluster (brokers, controller)-
describeBrokerConfiguration d'un brokerbrokerId (number)
getClusterMetricsMétriques de santé du cluster-
analyzeTopicPartitionsAnalyse des partitions avec offsetstopic (string)
checkClusterHealthDiagnostic complet avec recommandations-

🔍 Cluster Mode & Platform (1 outil) 🆕

OutilDescriptionParamètres
detectClusterModeDétecte KRaft/Zookeeper + plateforme (Strimzi/Confluent/Apache/MSK)-

📈 Prometheus Metrics (2 outils) 🆕

OutilDescriptionParamètres
getPrometheusMetricsMétriques Kafka depuis Prometheus-
getBrokerPrometheusMetricsMétriques broker (throughput, CPU, RAM)brokerId (number)

📋 Schema Registry (8 outils) 🆕

OutilDescriptionParamètres
getSchemaRegistryInfoInfo du Schema Registry-
listSchemaSubjectsListe tous les sujets-
getSchemaDetailsDétails d'un schémasubject, version?
listSchemaVersionsToutes les versions d'un sujetsubject
registerSchemaEnregistre un nouveau schémasubject, schema, schemaType?
checkSchemaCompatibilityVérifie la compatibilitésubject, schema, version?
deleteSchemaVersionSupprime une versionsubject, version
deleteSchemaSubjectSupprime un sujet completsubject

⚠️ Erreur commune: "Unexpected end of JSON input"

Symptôme :

SyntaxError: Unexpected token '.', "  .   ____  "... is not valid JSON

Cause : Le banner Spring Boot ou des logs polluent stdout.

Solution : ✅ Déjà corrigé dans ce projet via :

  • spring.main.banner-mode: off dans `application# Kafka MCP Server v2.0 - Spring Boot Edition

Un serveur MCP (Model Context Protocol) moderne pour Kafka, développé avec Spring Boot 3.2 et Spring AI MCP.

🚀 Nouveautés v2.0

  • Spring Boot 3.2 - Architecture moderne et performante
  • Spring AI MCP - Annotations @Tool pour une intégration native
  • Auto-configuration - Configuration simplifiée via application.yml
  • Type-safe DTOs - Réponses structurées avec Lombok
  • Logging avancé - Traçabilité complète des opérations
  • Container optimisé - Multi-stage build pour un Docker léger

📋 Prérequis

  • Java 17+ (pour développement local)
  • Maven 3.9+ (pour build local)
  • Docker (pour déploiement containerisé)
  • Accès réseau au cluster Kafka/Strimzi

🏗️ Architecture du Projet

Structure des dossiers

kafka-mcp-server/
├── src/
│   ├── main/
│   │   ├── java/
│   │   │   └── com/
│   │   │       └── carrefour/
│   │   │           └── kafka/
│   │   │               └── mcp/
│   │   │                   ├── KafkaMcpServerApplication.java  # Point d'entrée Spring Boot
│   │   │                   ├── config/
│   │   │                   │   └── KafkaConfig.java           # Configuration Kafka (Admin, Producer, Consumer)
│   │   │                   ├── tools/
│   │   │                   │   └── KafkaMcpTools.java         # Services MCP avec @Tool annotations
│   │   │                   └── dto/
│   │   │                       └── *.java                     # DTOs pour les réponses (Lombok)
│   │   └── resources/
│   │       └── application.yml                                # Configuration Spring Boot
│   └── test/
│       └── java/
│           └── com/carrefour/kafka/mcp/
│               └── KafkaMcpToolsTest.java                     # Tests unitaires
├── Dockerfile                                                 # Multi-stage Docker build
├── pom.xml                                                    # Dépendances Maven
├── docker-compose.yml                                         # Configuration Docker Compose
└── README.md

Architecture en couches

┌─────────────────────────────────────────┐
│         Claude Desktop / API            │
│      (Client MCP via stdio)             │
└────────────────┬────────────────────────┘
                 │ JSON-RPC over stdio
                 ▼
┌─────────────────────────────────────────┐
│       Spring AI MCP Framework           │
│  - Protocol handling (initialize, etc)  │
│  - Tool discovery & invocation          │
│  - JSON serialization                   │
└────────────────┬────────────────────────┘
                 │ @Tool methods
                 ▼
┌─────────────────────────────────────────┐
│         KafkaMcpTools Service           │
│  @Tool kafka_list_topics()              │
│  @Tool kafka_describe_topic()           │
│  @Tool kafka_create_topic()             │
│  @Tool kafka_consume_messages()         │
│  @Tool kafka_produce_message()          │
│  etc...                                 │
└────────────────┬────────────────────────┘
                 │ Uses
                 ▼
┌─────────────────────────────────────────┐
│     Spring Kafka Components             │
│  - KafkaAdmin (topic management)        │
│  - KafkaTemplate (producing)            │
│  - KafkaConsumer (consuming)            │
│  - AdminClient (admin operations)       │
└────────────────┬────────────────────────┘
                 │ Kafka Protocol
                 ▼
┌─────────────────────────────────────────┐
│      Kafka/Strimzi Cluster              │
│  bootstrap.dev.kafkahub...9431          │
└─────────────────────────────────────────┘

🔧 Composants Principaux

1. KafkaMcpServerApplication

Point d'entrée Spring Boot avec annotations :

  • @SpringBootApplication - Auto-configuration
  • @EnableKafka - Active le support Kafka

2. KafkaConfig

Configuration centralisée Kafka :

  • KafkaAdmin : Opérations administratives (create/delete topics, describe groups)
  • ProducerFactory : Factory pour les producteurs de messages
  • ConsumerFactory : Factory pour les consommateurs de messages
  • KafkaTemplate : Template Spring pour simplifier la production
  • Support SASL/SSL : Configuration sécurité via properties

3. KafkaMcpTools

Service principal avec annotations @Tool Spring AI :

@Tool(name = "kafka_list_topics",
        description = "Liste tous les topics Kafka disponibles")
public TopicListResponse listTopics() { ... }

@Tool(name = "kafka_consume_messages",
        description = "Consomme des messages d'un topic")
public MessageListResponse consumeMessages(
@ToolParameter(description = "Nom du topic", required = true) String topic,
@ToolParameter(description = "Max messages", required = false) Integer maxMessages
        ) { ... }

Caractéristiques :

  • Injection automatique des dépendances via @RequiredArgsConstructor
  • Logging détaillé avec SLF4J
  • Gestion d'erreurs robuste avec try-catch
  • Retour de DTOs structurés

4. DTOs (Data Transfer Objects)

Classes immutables avec Lombok :

  • @Data - Getters/setters automatiques
  • @Builder - Pattern builder fluide
  • @NoArgsConstructor / @AllArgsConstructor - Constructeurs

Exemples :

  • TopicListResponse - Liste de topics
  • MessageInfo - Détails d'un message
  • ConsumerGroupDescriptionResponse - État d'un consumer group

🚀 Installation et Build

⚠️ IMPORTANT - Protocole MCP et stdout

Le protocole MCP utilise stdout exclusivement pour JSON-RPC. Tous les logs doivent aller sur stderr.

Déjà configuré dans le projet :

  • Banner Spring Boot désactivé (banner-mode: off)
  • Logs redirigés vers stderr via logback-spring.xml
  • Aucune sortie console sur stdout

Option 1: Build avec Docker (recommandé)

# Clone le repository
git clone <your-repo>
cd kafka-mcp-server

# Build l'image Docker
docker build -t kafka-mcp-server:2.0.0 .

# L'image finale fait ~150MB (optimisée avec Alpine)

Option 2: Test du build

# Exécuter le script de test (Linux/Mac)
chmod +x test-mcp-server.sh
./test-mcp-server.sh

# Le script vérifie:
# ✅ stdout contient uniquement du JSON
# ✅ 8 outils MCP sont exposés
# ✅ Connexion Kafka fonctionne

Option 3: Build avec Maven

# Build le projet
mvn clean package

# Le JAR sera dans target/kafka-mcp-server-2.0.0.jar
java -jar target/kafka-mcp-server-2.0.0.jar

Option 4: Développement local

# Avec Maven Wrapper
./mvnw spring-boot:run

# Ou avec Maven installé
mvn spring-boot:run

📦 Configuration

Variables d'environnement

VariableDescriptionDéfautObligatoire
KAFKA_BOOTSTRAP_SERVERSAdresses des brokers Kafkalocalhost:9092
KAFKA_SECURITY_PROTOCOLProtocole de sécuritéPLAINTEXT
KAFKA_SASL_MECHANISMMécanisme SASL-
KAFKA_SASL_JAAS_CONFIGConfiguration JAAS-

application.yml

kafka:
  bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
  security-protocol: ${KAFKA_SECURITY_PROTOCOL:PLAINTEXT}
  sasl-mechanism: ${KAFKA_SASL_MECHANISM:}
  sasl-jaas-config: ${KAFKA_SASL_JAAS_CONFIG:}

spring:
  ai:
    mcp:
      server:
        enabled: true
        transport: stdio
        name: kafka-carrefour

🎯 Utilisation

1. Démarrage avec Docker

Configuration de base (PLAINTEXT)
docker run -i --rm \
  -e KAFKA_BOOTSTRAP_SERVERS=bootstrap.dev.kafkahub.external.messaging.vpodg1np.carrefour.com:9431 \
  kafka-mcp-server:2.0.0
Configuration avec SASL/SSL
docker run -i --rm \
  -e KAFKA_BOOTSTRAP_SERVERS=broker.prod.kafka.com:9093 \
  -e KAFKA_SECURITY_PROTOCOL=SASL_SSL \
  -e KAFKA_SASL_MECHANISM=SCRAM-SHA-512 \
  -e KAFKA_SASL_JAAS_CONFIG='org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="pass";' \
  kafka-mcp-server:2.0.0

2. Configuration Claude Desktop

Éditez ~/.config/Claude/claude_desktop_config.json (Linux/Mac) ou %APPDATA%\Claude\claude_desktop_config.json (Windows) :

Configuration Linux/Mac
{
  "mcpServers": {
    "kafka-carrefour": {
      "command": "docker",
      "args": [
        "run",
        "-i",
        "--rm",
        "--network=host",
        "-e",
        "KAFKA_BOOTSTRAP_SERVERS=bootstrap.dev.kafkahub.external.messaging.vpodg1np.carrefour.com:9431",
        "kafka-mcp-server:2.0.0"
      ]
    }
  }
}
Configuration Windows avec WSL
{
  "mcpServers": {
    "kafka-carrefour": {
      "command": "wsl",
      "args": [
        "--",
        "docker",
        "run",
        "-i",
        "--rm",
        "--network=host",
        "-e",
        "KAFKA_BOOTSTRAP_SERVERS=bootstrap.dev.kafkahub.external.messaging.vpodg1np.carrefour.com:9431",
        "kafka-mcp-server:2.0.0"
      ]
    }
  }
}

3. Redémarrage de Claude Desktop

# Sur Linux
killall claude
claude &

# Sur Mac
killall Claude
open -a Claude

# Sur Windows - relancer l'application manuellement

4. Vérification de la connexion

Dans Claude, demandez :

Quels sont les outils Kafka disponibles ?

Vous devriez voir les 8 outils MCP listés.

🛠️ Outils MCP Disponibles

Topics

1. kafka_list_topics

Liste tous les topics du cluster.

Paramètres : Aucun

Exemple Claude :

Liste-moi tous les topics Kafka

Réponse :

{
  "topics": ["000", "002", "010", "transactions"],
  "count": 4
}

2. kafka_describe_topic

Décrit un topic avec partitions, replicas, et configuration.

Paramètres :

  • topic (string, requis) : Nom du topic

Exemple Claude :

Décris-moi le topic '000' en détail

Réponse :

{
  "name": "000",
  "partitions": [
    {
      "partition": 0,
      "leader": 1,
      "replicas": [1, 2, 3],
      "isr": [1, 2, 3]
    }
  ],
  "partitionCount": 3,
  "isInternal": false,
  "configurations": {
    "retention.ms": "604800000",
    "segment.bytes": "1073741824"
  }
}

3. kafka_create_topic

Crée un nouveau topic.

Paramètres :

  • topic (string, requis) : Nom du topic
  • partitions (number, optionnel) : Nombre de partitions (défaut: 1)
  • replicationFactor (number, optionnel) : Facteur de réplication (défaut: 1)

Exemple Claude :

Crée un topic 'test-events' avec 3 partitions et un facteur de réplication de 2

Réponse :

{
  "topic": "test-events",
  "created": true,
  "partitions": 3,
  "replicationFactor": 2,
  "message": "Topic created successfully"
}

4. kafka_delete_topic

Supprime un topic existant.

Paramètres :

  • topic (string, requis) : Nom du topic

Exemple Claude :

Supprime le topic 'old-logs'

Messages

5. kafka_consume_messages

Consomme des messages d'un topic.

Paramètres :

  • topic (string, requis) : Nom du topic
  • maxMessages (number, optionnel) : Nombre max de messages (défaut: 10)
  • timeout (number, optionnel) : Timeout en ms (défaut: 5000)

Exemple Claude :

Consomme 20 messages du topic 'transactions'

Réponse :

{
  "topic": "transactions",
  "messages": [
    {
      "topic": "transactions",
      "partition": 0,
      "offset": 1234,
      "key": "user-123",
      "value": "{\"amount\": 99.99}",
      "timestamp": 1698765432000
    }
  ],
  "count": 20
}

6. kafka_produce_message

Produit un message vers un topic.

Paramètres :

  • topic (string, requis) : Nom du topic
  • key (string, optionnel) : Clé du message
  • value (string, requis) : Valeur du message

Exemple Claude :

Envoie un message de test avec la clé 'test-1' et la valeur '{"status":"ok"}' vers le topic 'events'

Réponse :

{
  "topic": "events",
  "partition": 2,
  "offset": 5678,
  "timestamp": 1698765432000,
  "success": true,
  "message": "Message produced successfully"
}

Consumer Groups

7. kafka_list_consumer_groups

Liste tous les consumer groups.

Paramètres : Aucun

Exemple Claude :

Quels sont les consumer groups actifs ?

Réponse :

{
  "groups": ["app-consumer", "analytics-group"],
  "count": 2
}

8. kafka_describe_consumer_group

Décrit un consumer group avec ses membres.

Paramètres :

  • group (string, requis) : Nom du consumer group

Exemple Claude :

Décris le consumer group 'app-consumer'

Réponse :

{
  "groupId": "app-consumer",
  "state": "Stable",
  "members": [
    {
      "memberId": "consumer-1-abc123",
      "clientId": "consumer-1",
      "host": "/192.168.1.10",
      "assignment": ["events-0", "events-1"]
    }
  ],
  "memberCount": 1,
  "coordinator": 1
}

🧪 Tests et Debugging

Test manuel avec JSON-RPC

# Test d'initialisation
echo '{"jsonrpc":"2.0","method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{}},"id":1}' | \
docker run -i --rm \
  -e KAFKA_BOOTSTRAP_SERVERS=localhost:9092 \
  kafka-mcp-server:2.0.0

# Test de listing des tools
echo '{"jsonrpc":"2.0","method":"tools/list","params":{},"id":2}' | \
docker run -i --rm \
  -e KAFKA_BOOTSTRAP_SERVERS=localhost:9092 \
  kafka-mcp-server:2.0.0

# Test d'appel d'un tool
echo '{"jsonrpc":"2.0","method":"tools/call","params":{"name":"kafka_list_topics","arguments":{}},"id":3}' | \
docker run -i --rm \
  -e KAFKA_BOOTSTRAP_SERVERS=localhost:9092 \
  kafka-mcp-server:2.0.0

Logs détaillés

# Capture des logs dans un fichier
docker run -i --rm \
  -e KAFKA_BOOTSTRAP_SERVERS=localhost:9092 \
  kafka-mcp-server:2.0.0 2>&1 | tee mcp-server.log

# Les logs incluent :
# - Connexion au cluster Kafka
# - Découverte des tools MCP
# - Exécution des commandes
# - Erreurs détaillées

Tests unitaires

# Exécution des tests
mvn test

# Tests avec couverture
mvn test jacoco:report

Vérification de connectivité Kafka

# Test avec kafka-broker-api-versions
docker run --rm confluentinc/cp-kafka:latest \
  kafka-broker-api-versions \
  --bootstrap-server bootstrap.dev.kafkahub.external.messaging.vpodg1np.carrefour.com:9431

# Test avec kafka-topics
docker run --rm confluentinc/cp-kafka:latest \
  kafka-topics --list \
  --bootstrap-server bootstrap.dev.kafkahub.external.messaging.vpodg1np.carrefour.com:9431

🔐 Sécurité

Configuration SASL/PLAIN

docker run -i --rm \
  -e KAFKA_BOOTSTRAP_SERVERS=broker:9093 \
  -e KAFKA_SECURITY_PROTOCOL=SASL_SSL \
  -e KAFKA_SASL_MECHANISM=PLAIN \
  -e KAFKA_SASL_JAAS_CONFIG='org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="secret";' \
  kafka-mcp-server:2.0.0

Configuration SASL/SCRAM

docker run -i --rm \
  -e KAFKA_BOOTSTRAP_SERVERS=broker:9093 \
  -e KAFKA_SECURITY_PROTOCOL=SASL_SSL \
  -e KAFKA_SASL_MECHANISM=SCRAM-SHA-512 \
  -e KAFKA_SASL_JAAS_CONFIG='org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="secret";' \
  kafka-mcp-server:2.0.0

Avec certificats SSL (montage de volumes)

docker run -i --rm \
  -v /path/to/certs:/certs:ro \
  -e KAFKA_BOOTSTRAP_SERVERS=broker:9093 \
  -e KAFKA_SECURITY_PROTOCOL=SSL \
  -e KAFKA_SSL_TRUSTSTORE_LOCATION=/certs/truststore.jks \
  -e KAFKA_SSL_TRUSTSTORE_PASSWORD=trustpass \
  -e KAFKA_SSL_KEYSTORE_LOCATION=/certs/keystore.jks \
  -e KAFKA_SSL_KEYSTORE_PASSWORD=keypass \
  kafka-mcp-server:2.0.0

🐳 Docker Compose

Créez docker-compose.yml pour un déploiement simplifié :

version: '3.8'

services:
  kafka-mcp-server:
    build: .
    image: kafka-mcp-server:2.0.0
    stdin_open: true
    tty: true
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=bootstrap.dev.kafkahub.external.messaging.vpodg1np.carrefour.com:9431
      - KAFKA_SECURITY_PROTOCOL=PLAINTEXT
    networks:
      - kafka-network

networks:
  kafka-network:
    driver: bridge
docker-compose up -d
docker-compose logs -f kafka-mcp-server

📊 Monitoring et Observabilité

Métriques Spring Boot Actuator

Ajoutez dans pom.xml :

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

Ajoutez dans application.yml :

management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics
  metrics:
    export:
      prometheus:
        enabled: true

Logs structurés

Configuration Logback (logback-spring.xml) :

<configuration>
    <appender name="JSON" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
    </appender>
    <root level="INFO">
        <appender-ref ref="JSON"/>
    </root>
</configuration>

🚀 Cas d'usage avancés

1. Monitoring de la santé du cluster

Claude, analyse l'état de santé du cluster Kafka :
1. Liste tous les topics
2. Pour chaque topic, vérifie s'il a des partitions offline
3. Liste les consumer groups et leur état
4. Identifie les groupes en erreur

2. Migration de données

Claude, copie les 1000 derniers messages du topic 'prod-events' vers 'backup-events'

3. Analyse de messages

Claude, consomme 100 messages du topic 'logs', analyse les erreurs et donne-moi un résumé

4. Création d'environnement de test

Claude, crée 5 topics de test (test-1 à test-5) avec 3 partitions chacun, puis envoie 10 messages de test dans chacun

🔄 Roadmap

  • Support des Kafka Streams
  • Gestion des Schema Registry (Avro/Protobuf)
  • Monitoring avancé avec Prometheus
  • Support des transactions Kafka
  • Interface REST additionnelle
  • Support Kubernetes/Helm charts

🤝 Contribution

# Fork le projet
git checkout -b feature/ma-fonctionnalite
mvn test
git commit -m "Ajout de ma fonctionnalité"
git push origin feature/ma-fonctionnalite
# Créer une Pull Request

📝 Licence

Ce projet est sous licence MIT.

🆘 Support

  • Issues GitHub : Pour les bugs et feature requests
  • Documentation : Ce README et les JavaDoc
  • Logs : Activez le niveau DEBUG pour plus de détails

🎓 Ressources