arabaaoui/kafka-mcp-server
If you are the rightful owner of kafka-mcp-server and would like to certify it and/or have it hosted online, please leave a comment on the right or send an email to dayong@mcphub.com.
The Kafka MCP Server v2.0 is a modern Model Context Protocol server designed for Kafka, leveraging Spring Boot 3.2 and Spring AI MCP for efficient and scalable operations.
🛠️ Outils MCP Disponibles
Total : 22 outils répartis en 7 catégories
📁 Topics (4 outils)
| Outil | Description | Paramètres |
|---|---|---|
listTopics | Liste tous les topics | - |
describeTopic | Décrit un topic spécifique | topic (string) |
createTopic | Crée un nouveau topic | topic, partitions, replicationFactor |
deleteTopic | Supprime un topic | topic (string) |
💬 Messages (2 outils)
| Outil | Description | Paramètres |
|---|---|---|
consumeMessages | Consomme des messages d'un topic | topic, maxMessages?, timeout? |
produceMessage | Produit un message vers un topic | topic, key?, value |
👥 Consumer Groups (2 outils)
| Outil | Description | Paramètres |
|---|---|---|
listConsumerGroups | Liste tous les consumer groups | - |
describeConsumerGroup | Décrit un consumer group | group (string) |
📊 Cluster Monitoring (5 outils)
| Outil | Description | Paramètres |
|---|---|---|
getClusterInfo | Informations du cluster (brokers, controller) | - |
describeBroker | Configuration d'un broker | brokerId (number) |
getClusterMetrics | Métriques de santé du cluster | - |
analyzeTopicPartitions | Analyse des partitions avec offsets | topic (string) |
checkClusterHealth | Diagnostic complet avec recommandations | - |
🔍 Cluster Mode & Platform (1 outil) 🆕
| Outil | Description | Paramètres |
|---|---|---|
detectClusterMode | Détecte KRaft/Zookeeper + plateforme (Strimzi/Confluent/Apache/MSK) | - |
📈 Prometheus Metrics (2 outils) 🆕
| Outil | Description | Paramètres |
|---|---|---|
getPrometheusMetrics | Métriques Kafka depuis Prometheus | - |
getBrokerPrometheusMetrics | Métriques broker (throughput, CPU, RAM) | brokerId (number) |
📋 Schema Registry (8 outils) 🆕
| Outil | Description | Paramètres |
|---|---|---|
getSchemaRegistryInfo | Info du Schema Registry | - |
listSchemaSubjects | Liste tous les sujets | - |
getSchemaDetails | Détails d'un schéma | subject, version? |
listSchemaVersions | Toutes les versions d'un sujet | subject |
registerSchema | Enregistre un nouveau schéma | subject, schema, schemaType? |
checkSchemaCompatibility | Vérifie la compatibilité | subject, schema, version? |
deleteSchemaVersion | Supprime une version | subject, version |
deleteSchemaSubject | Supprime un sujet complet | subject |
⚠️ Erreur commune: "Unexpected end of JSON input"
Symptôme :
SyntaxError: Unexpected token '.', " . ____ "... is not valid JSON
Cause : Le banner Spring Boot ou des logs polluent stdout.
Solution : ✅ Déjà corrigé dans ce projet via :
spring.main.banner-mode: offdans `application# Kafka MCP Server v2.0 - Spring Boot Edition
Un serveur MCP (Model Context Protocol) moderne pour Kafka, développé avec Spring Boot 3.2 et Spring AI MCP.
🚀 Nouveautés v2.0
- ✅ Spring Boot 3.2 - Architecture moderne et performante
- ✅ Spring AI MCP - Annotations
@Toolpour une intégration native - ✅ Auto-configuration - Configuration simplifiée via
application.yml - ✅ Type-safe DTOs - Réponses structurées avec Lombok
- ✅ Logging avancé - Traçabilité complète des opérations
- ✅ Container optimisé - Multi-stage build pour un Docker léger
📋 Prérequis
- Java 17+ (pour développement local)
- Maven 3.9+ (pour build local)
- Docker (pour déploiement containerisé)
- Accès réseau au cluster Kafka/Strimzi
🏗️ Architecture du Projet
Structure des dossiers
kafka-mcp-server/
├── src/
│ ├── main/
│ │ ├── java/
│ │ │ └── com/
│ │ │ └── carrefour/
│ │ │ └── kafka/
│ │ │ └── mcp/
│ │ │ ├── KafkaMcpServerApplication.java # Point d'entrée Spring Boot
│ │ │ ├── config/
│ │ │ │ └── KafkaConfig.java # Configuration Kafka (Admin, Producer, Consumer)
│ │ │ ├── tools/
│ │ │ │ └── KafkaMcpTools.java # Services MCP avec @Tool annotations
│ │ │ └── dto/
│ │ │ └── *.java # DTOs pour les réponses (Lombok)
│ │ └── resources/
│ │ └── application.yml # Configuration Spring Boot
│ └── test/
│ └── java/
│ └── com/carrefour/kafka/mcp/
│ └── KafkaMcpToolsTest.java # Tests unitaires
├── Dockerfile # Multi-stage Docker build
├── pom.xml # Dépendances Maven
├── docker-compose.yml # Configuration Docker Compose
└── README.md
Architecture en couches
┌─────────────────────────────────────────┐
│ Claude Desktop / API │
│ (Client MCP via stdio) │
└────────────────┬────────────────────────┘
│ JSON-RPC over stdio
▼
┌─────────────────────────────────────────┐
│ Spring AI MCP Framework │
│ - Protocol handling (initialize, etc) │
│ - Tool discovery & invocation │
│ - JSON serialization │
└────────────────┬────────────────────────┘
│ @Tool methods
▼
┌─────────────────────────────────────────┐
│ KafkaMcpTools Service │
│ @Tool kafka_list_topics() │
│ @Tool kafka_describe_topic() │
│ @Tool kafka_create_topic() │
│ @Tool kafka_consume_messages() │
│ @Tool kafka_produce_message() │
│ etc... │
└────────────────┬────────────────────────┘
│ Uses
▼
┌─────────────────────────────────────────┐
│ Spring Kafka Components │
│ - KafkaAdmin (topic management) │
│ - KafkaTemplate (producing) │
│ - KafkaConsumer (consuming) │
│ - AdminClient (admin operations) │
└────────────────┬────────────────────────┘
│ Kafka Protocol
▼
┌─────────────────────────────────────────┐
│ Kafka/Strimzi Cluster │
│ bootstrap.dev.kafkahub...9431 │
└─────────────────────────────────────────┘
🔧 Composants Principaux
1. KafkaMcpServerApplication
Point d'entrée Spring Boot avec annotations :
@SpringBootApplication- Auto-configuration@EnableKafka- Active le support Kafka
2. KafkaConfig
Configuration centralisée Kafka :
- KafkaAdmin : Opérations administratives (create/delete topics, describe groups)
- ProducerFactory : Factory pour les producteurs de messages
- ConsumerFactory : Factory pour les consommateurs de messages
- KafkaTemplate : Template Spring pour simplifier la production
- Support SASL/SSL : Configuration sécurité via properties
3. KafkaMcpTools
Service principal avec annotations @Tool Spring AI :
@Tool(name = "kafka_list_topics",
description = "Liste tous les topics Kafka disponibles")
public TopicListResponse listTopics() { ... }
@Tool(name = "kafka_consume_messages",
description = "Consomme des messages d'un topic")
public MessageListResponse consumeMessages(
@ToolParameter(description = "Nom du topic", required = true) String topic,
@ToolParameter(description = "Max messages", required = false) Integer maxMessages
) { ... }
Caractéristiques :
- Injection automatique des dépendances via
@RequiredArgsConstructor - Logging détaillé avec SLF4J
- Gestion d'erreurs robuste avec try-catch
- Retour de DTOs structurés
4. DTOs (Data Transfer Objects)
Classes immutables avec Lombok :
@Data- Getters/setters automatiques@Builder- Pattern builder fluide@NoArgsConstructor/@AllArgsConstructor- Constructeurs
Exemples :
TopicListResponse- Liste de topicsMessageInfo- Détails d'un messageConsumerGroupDescriptionResponse- État d'un consumer group
🚀 Installation et Build
⚠️ IMPORTANT - Protocole MCP et stdout
Le protocole MCP utilise stdout exclusivement pour JSON-RPC. Tous les logs doivent aller sur stderr.
✅ Déjà configuré dans le projet :
- Banner Spring Boot désactivé (
banner-mode: off) - Logs redirigés vers stderr via
logback-spring.xml - Aucune sortie console sur stdout
Option 1: Build avec Docker (recommandé)
# Clone le repository
git clone <your-repo>
cd kafka-mcp-server
# Build l'image Docker
docker build -t kafka-mcp-server:2.0.0 .
# L'image finale fait ~150MB (optimisée avec Alpine)
Option 2: Test du build
# Exécuter le script de test (Linux/Mac)
chmod +x test-mcp-server.sh
./test-mcp-server.sh
# Le script vérifie:
# ✅ stdout contient uniquement du JSON
# ✅ 8 outils MCP sont exposés
# ✅ Connexion Kafka fonctionne
Option 3: Build avec Maven
# Build le projet
mvn clean package
# Le JAR sera dans target/kafka-mcp-server-2.0.0.jar
java -jar target/kafka-mcp-server-2.0.0.jar
Option 4: Développement local
# Avec Maven Wrapper
./mvnw spring-boot:run
# Ou avec Maven installé
mvn spring-boot:run
📦 Configuration
Variables d'environnement
| Variable | Description | Défaut | Obligatoire |
|---|---|---|---|
KAFKA_BOOTSTRAP_SERVERS | Adresses des brokers Kafka | localhost:9092 | ✅ |
KAFKA_SECURITY_PROTOCOL | Protocole de sécurité | PLAINTEXT | ❌ |
KAFKA_SASL_MECHANISM | Mécanisme SASL | - | ❌ |
KAFKA_SASL_JAAS_CONFIG | Configuration JAAS | - | ❌ |
application.yml
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
security-protocol: ${KAFKA_SECURITY_PROTOCOL:PLAINTEXT}
sasl-mechanism: ${KAFKA_SASL_MECHANISM:}
sasl-jaas-config: ${KAFKA_SASL_JAAS_CONFIG:}
spring:
ai:
mcp:
server:
enabled: true
transport: stdio
name: kafka-carrefour
🎯 Utilisation
1. Démarrage avec Docker
Configuration de base (PLAINTEXT)
docker run -i --rm \
-e KAFKA_BOOTSTRAP_SERVERS=bootstrap.dev.kafkahub.external.messaging.vpodg1np.carrefour.com:9431 \
kafka-mcp-server:2.0.0
Configuration avec SASL/SSL
docker run -i --rm \
-e KAFKA_BOOTSTRAP_SERVERS=broker.prod.kafka.com:9093 \
-e KAFKA_SECURITY_PROTOCOL=SASL_SSL \
-e KAFKA_SASL_MECHANISM=SCRAM-SHA-512 \
-e KAFKA_SASL_JAAS_CONFIG='org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="pass";' \
kafka-mcp-server:2.0.0
2. Configuration Claude Desktop
Éditez ~/.config/Claude/claude_desktop_config.json (Linux/Mac) ou %APPDATA%\Claude\claude_desktop_config.json (Windows) :
Configuration Linux/Mac
{
"mcpServers": {
"kafka-carrefour": {
"command": "docker",
"args": [
"run",
"-i",
"--rm",
"--network=host",
"-e",
"KAFKA_BOOTSTRAP_SERVERS=bootstrap.dev.kafkahub.external.messaging.vpodg1np.carrefour.com:9431",
"kafka-mcp-server:2.0.0"
]
}
}
}
Configuration Windows avec WSL
{
"mcpServers": {
"kafka-carrefour": {
"command": "wsl",
"args": [
"--",
"docker",
"run",
"-i",
"--rm",
"--network=host",
"-e",
"KAFKA_BOOTSTRAP_SERVERS=bootstrap.dev.kafkahub.external.messaging.vpodg1np.carrefour.com:9431",
"kafka-mcp-server:2.0.0"
]
}
}
}
3. Redémarrage de Claude Desktop
# Sur Linux
killall claude
claude &
# Sur Mac
killall Claude
open -a Claude
# Sur Windows - relancer l'application manuellement
4. Vérification de la connexion
Dans Claude, demandez :
Quels sont les outils Kafka disponibles ?
Vous devriez voir les 8 outils MCP listés.
🛠️ Outils MCP Disponibles
Topics
1. kafka_list_topics
Liste tous les topics du cluster.
Paramètres : Aucun
Exemple Claude :
Liste-moi tous les topics Kafka
Réponse :
{
"topics": ["000", "002", "010", "transactions"],
"count": 4
}
2. kafka_describe_topic
Décrit un topic avec partitions, replicas, et configuration.
Paramètres :
topic(string, requis) : Nom du topic
Exemple Claude :
Décris-moi le topic '000' en détail
Réponse :
{
"name": "000",
"partitions": [
{
"partition": 0,
"leader": 1,
"replicas": [1, 2, 3],
"isr": [1, 2, 3]
}
],
"partitionCount": 3,
"isInternal": false,
"configurations": {
"retention.ms": "604800000",
"segment.bytes": "1073741824"
}
}
3. kafka_create_topic
Crée un nouveau topic.
Paramètres :
topic(string, requis) : Nom du topicpartitions(number, optionnel) : Nombre de partitions (défaut: 1)replicationFactor(number, optionnel) : Facteur de réplication (défaut: 1)
Exemple Claude :
Crée un topic 'test-events' avec 3 partitions et un facteur de réplication de 2
Réponse :
{
"topic": "test-events",
"created": true,
"partitions": 3,
"replicationFactor": 2,
"message": "Topic created successfully"
}
4. kafka_delete_topic
Supprime un topic existant.
Paramètres :
topic(string, requis) : Nom du topic
Exemple Claude :
Supprime le topic 'old-logs'
Messages
5. kafka_consume_messages
Consomme des messages d'un topic.
Paramètres :
topic(string, requis) : Nom du topicmaxMessages(number, optionnel) : Nombre max de messages (défaut: 10)timeout(number, optionnel) : Timeout en ms (défaut: 5000)
Exemple Claude :
Consomme 20 messages du topic 'transactions'
Réponse :
{
"topic": "transactions",
"messages": [
{
"topic": "transactions",
"partition": 0,
"offset": 1234,
"key": "user-123",
"value": "{\"amount\": 99.99}",
"timestamp": 1698765432000
}
],
"count": 20
}
6. kafka_produce_message
Produit un message vers un topic.
Paramètres :
topic(string, requis) : Nom du topickey(string, optionnel) : Clé du messagevalue(string, requis) : Valeur du message
Exemple Claude :
Envoie un message de test avec la clé 'test-1' et la valeur '{"status":"ok"}' vers le topic 'events'
Réponse :
{
"topic": "events",
"partition": 2,
"offset": 5678,
"timestamp": 1698765432000,
"success": true,
"message": "Message produced successfully"
}
Consumer Groups
7. kafka_list_consumer_groups
Liste tous les consumer groups.
Paramètres : Aucun
Exemple Claude :
Quels sont les consumer groups actifs ?
Réponse :
{
"groups": ["app-consumer", "analytics-group"],
"count": 2
}
8. kafka_describe_consumer_group
Décrit un consumer group avec ses membres.
Paramètres :
group(string, requis) : Nom du consumer group
Exemple Claude :
Décris le consumer group 'app-consumer'
Réponse :
{
"groupId": "app-consumer",
"state": "Stable",
"members": [
{
"memberId": "consumer-1-abc123",
"clientId": "consumer-1",
"host": "/192.168.1.10",
"assignment": ["events-0", "events-1"]
}
],
"memberCount": 1,
"coordinator": 1
}
🧪 Tests et Debugging
Test manuel avec JSON-RPC
# Test d'initialisation
echo '{"jsonrpc":"2.0","method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{}},"id":1}' | \
docker run -i --rm \
-e KAFKA_BOOTSTRAP_SERVERS=localhost:9092 \
kafka-mcp-server:2.0.0
# Test de listing des tools
echo '{"jsonrpc":"2.0","method":"tools/list","params":{},"id":2}' | \
docker run -i --rm \
-e KAFKA_BOOTSTRAP_SERVERS=localhost:9092 \
kafka-mcp-server:2.0.0
# Test d'appel d'un tool
echo '{"jsonrpc":"2.0","method":"tools/call","params":{"name":"kafka_list_topics","arguments":{}},"id":3}' | \
docker run -i --rm \
-e KAFKA_BOOTSTRAP_SERVERS=localhost:9092 \
kafka-mcp-server:2.0.0
Logs détaillés
# Capture des logs dans un fichier
docker run -i --rm \
-e KAFKA_BOOTSTRAP_SERVERS=localhost:9092 \
kafka-mcp-server:2.0.0 2>&1 | tee mcp-server.log
# Les logs incluent :
# - Connexion au cluster Kafka
# - Découverte des tools MCP
# - Exécution des commandes
# - Erreurs détaillées
Tests unitaires
# Exécution des tests
mvn test
# Tests avec couverture
mvn test jacoco:report
Vérification de connectivité Kafka
# Test avec kafka-broker-api-versions
docker run --rm confluentinc/cp-kafka:latest \
kafka-broker-api-versions \
--bootstrap-server bootstrap.dev.kafkahub.external.messaging.vpodg1np.carrefour.com:9431
# Test avec kafka-topics
docker run --rm confluentinc/cp-kafka:latest \
kafka-topics --list \
--bootstrap-server bootstrap.dev.kafkahub.external.messaging.vpodg1np.carrefour.com:9431
🔐 Sécurité
Configuration SASL/PLAIN
docker run -i --rm \
-e KAFKA_BOOTSTRAP_SERVERS=broker:9093 \
-e KAFKA_SECURITY_PROTOCOL=SASL_SSL \
-e KAFKA_SASL_MECHANISM=PLAIN \
-e KAFKA_SASL_JAAS_CONFIG='org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="secret";' \
kafka-mcp-server:2.0.0
Configuration SASL/SCRAM
docker run -i --rm \
-e KAFKA_BOOTSTRAP_SERVERS=broker:9093 \
-e KAFKA_SECURITY_PROTOCOL=SASL_SSL \
-e KAFKA_SASL_MECHANISM=SCRAM-SHA-512 \
-e KAFKA_SASL_JAAS_CONFIG='org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="secret";' \
kafka-mcp-server:2.0.0
Avec certificats SSL (montage de volumes)
docker run -i --rm \
-v /path/to/certs:/certs:ro \
-e KAFKA_BOOTSTRAP_SERVERS=broker:9093 \
-e KAFKA_SECURITY_PROTOCOL=SSL \
-e KAFKA_SSL_TRUSTSTORE_LOCATION=/certs/truststore.jks \
-e KAFKA_SSL_TRUSTSTORE_PASSWORD=trustpass \
-e KAFKA_SSL_KEYSTORE_LOCATION=/certs/keystore.jks \
-e KAFKA_SSL_KEYSTORE_PASSWORD=keypass \
kafka-mcp-server:2.0.0
🐳 Docker Compose
Créez docker-compose.yml pour un déploiement simplifié :
version: '3.8'
services:
kafka-mcp-server:
build: .
image: kafka-mcp-server:2.0.0
stdin_open: true
tty: true
environment:
- KAFKA_BOOTSTRAP_SERVERS=bootstrap.dev.kafkahub.external.messaging.vpodg1np.carrefour.com:9431
- KAFKA_SECURITY_PROTOCOL=PLAINTEXT
networks:
- kafka-network
networks:
kafka-network:
driver: bridge
docker-compose up -d
docker-compose logs -f kafka-mcp-server
📊 Monitoring et Observabilité
Métriques Spring Boot Actuator
Ajoutez dans pom.xml :
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
Ajoutez dans application.yml :
management:
endpoints:
web:
exposure:
include: health,info,metrics
metrics:
export:
prometheus:
enabled: true
Logs structurés
Configuration Logback (logback-spring.xml) :
<configuration>
<appender name="JSON" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
</appender>
<root level="INFO">
<appender-ref ref="JSON"/>
</root>
</configuration>
🚀 Cas d'usage avancés
1. Monitoring de la santé du cluster
Claude, analyse l'état de santé du cluster Kafka :
1. Liste tous les topics
2. Pour chaque topic, vérifie s'il a des partitions offline
3. Liste les consumer groups et leur état
4. Identifie les groupes en erreur
2. Migration de données
Claude, copie les 1000 derniers messages du topic 'prod-events' vers 'backup-events'
3. Analyse de messages
Claude, consomme 100 messages du topic 'logs', analyse les erreurs et donne-moi un résumé
4. Création d'environnement de test
Claude, crée 5 topics de test (test-1 à test-5) avec 3 partitions chacun, puis envoie 10 messages de test dans chacun
🔄 Roadmap
- Support des Kafka Streams
- Gestion des Schema Registry (Avro/Protobuf)
- Monitoring avancé avec Prometheus
- Support des transactions Kafka
- Interface REST additionnelle
- Support Kubernetes/Helm charts
🤝 Contribution
# Fork le projet
git checkout -b feature/ma-fonctionnalite
mvn test
git commit -m "Ajout de ma fonctionnalité"
git push origin feature/ma-fonctionnalite
# Créer une Pull Request
📝 Licence
Ce projet est sous licence MIT.
🆘 Support
- Issues GitHub : Pour les bugs et feature requests
- Documentation : Ce README et les JavaDoc
- Logs : Activez le niveau DEBUG pour plus de détails