Cledar/flink-mcp
The Flink MCP Server connects to Apache Flink SQL Gateway, enabling interaction with Flink clusters through the Model Context Protocol (MCP).
flink-mcp — Flink MCP Server
This project provides an MCP server that connects to Apache Flink SQL Gateway.
Prerequisites
- A running Apache Flink cluster and SQL Gateway
  - Start cluster: ./bin/start-cluster.sh
  - Start gateway: ./bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=localhost
  - Verify: curl http://localhost:8083/v3/info
- Configure environment:
  - Set SQL_GATEWAY_API_BASE_URL (default http://localhost:8083). You can use a .env file at repo root.
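The verification step can also be scripted. The following is a minimal sketch, not code from this project: it resolves the base URL from SQL_GATEWAY_API_BASE_URL with the documented default and requests /v3/info using only the standard library. The function names are hypothetical, and the sketch does not read a .env file.

```python
import json
import os
import urllib.request

DEFAULT_BASE_URL = "http://localhost:8083"  # default stated in this README


def gateway_info_url(env=None):
    """Build the /v3/info URL from SQL_GATEWAY_API_BASE_URL, falling back to the default."""
    env = os.environ if env is None else env
    base = env.get("SQL_GATEWAY_API_BASE_URL") or DEFAULT_BASE_URL
    return base.rstrip("/") + "/v3/info"


def fetch_gateway_info(timeout=5.0):
    """GET /v3/info and return the decoded JSON body (needs a running gateway)."""
    with urllib.request.urlopen(gateway_info_url(), timeout=timeout) as resp:
        return json.load(resp)
```

Calling fetch_gateway_info() raises urllib.error.URLError when the gateway is unreachable, which is a quick signal that the cluster or gateway has not started.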
Run
Install and run via the console script:
pip install -e .
flink-mcp
MCP clients should launch the server over stdio with command: flink-mcp.
Ensure SQL_GATEWAY_API_BASE_URL is set in your environment or .env.
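Many MCP clients register stdio servers through a JSON configuration entry. The sketch below generates one such entry. The mcpServers key, the server name flink, and the env field follow a convention used by several clients and are assumptions here, so check your client's documentation for the exact shape it expects.

```python
import json


def mcp_client_entry(base_url="http://localhost:8083"):
    """Return a config entry telling an MCP client to spawn flink-mcp over stdio."""
    return {
        "mcpServers": {  # assumed top-level key; varies by client
            "flink": {  # hypothetical server name
                "command": "flink-mcp",
                "env": {"SQL_GATEWAY_API_BASE_URL": base_url},
            }
        }
    }


if __name__ == "__main__":
    # Print the entry so it can be pasted into a client's config file.
    print(json.dumps(mcp_client_entry(), indent=2))
```

Passing the base URL through env keeps the setting next to the launch command, which helps when the client does not start the server from the repo root where a .env file would be found.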
Tools (v0.2.5)
- flink_info (resource): returns cluster info from /v3/info.
- open_new_session(properties?: dict) -> { sessionHandle, ... }.
- get_config(sessionHandle: str): returns session configuration.
- configure_session(sessionHandle: str, statement: str): apply session-scoped DDL/config (CREATE/USE/SET/RESET/LOAD/UNLOAD/ADD JAR).
- run_query_collect_and_stop(sessionHandle: str, query: str, max_rows: int=5, max_seconds: float=15.0): execute, fetch up to N rows within T seconds, then STOP the job if a jobID is present; closes the operation.
- run_query_stream_start(sessionHandle: str, query: str): execute a streaming query and return { jobID, operationHandle }; the job is left running.
- fetch_result_page(sessionHandle: str, operationHandle: str, token: int): fetch a single page; returns { page, nextToken, isEnd }.
- cancel_job(sessionHandle: str, jobId: str): issue STOP JOB '<jobId>', wait until DESCRIBE JOB status is not RUNNING; returns { jobID, status, jobGone, jobStatus }.
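Because fetch_result_page returns one page at a time, a client has to follow nextToken itself. Here is a minimal sketch of that loop. The fetch_page callable is hypothetical and stands in for a fetch_result_page tool call with the session and operation handles already bound. Starting at token 0 and treating a missing nextToken as the end are assumptions.

```python
def collect_pages(fetch_page, max_pages=100):
    """Follow nextToken until isEnd is true, gathering every page.

    fetch_page(token) must return a dict with the keys page, nextToken
    and isEnd, mirroring the documented fetch_result_page result.
    """
    pages = []
    token = 0  # assumption: the first result token is 0
    for _ in range(max_pages):
        result = fetch_page(token)
        pages.append(result["page"])
        # Stop on the end marker, or when there is no further token to follow.
        if result["isEnd"] or result["nextToken"] is None:
            break
        token = result["nextToken"]
    return pages
```

The max_pages cap matters for streaming queries, where isEnd may never become true while the job keeps running.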
Notes
- Tools are stateless; clients manage and pass session/operation handles explicitly.
- run_query_stream_start returns both jobID and operationHandle; use fetch_result_page to stream results.
- cancel_job issues STOP and waits using DESCRIBE JOB; close_operation is invoked internally where appropriate.
- Endpoints target SQL Gateway v3-style paths.
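The stop-then-wait behavior described for cancel_job amounts to a polling loop. The sketch below illustrates the pattern and is not the server's actual implementation. The get_status callable is hypothetical and stands in for reading the job status from a DESCRIBE JOB result; the timeout and interval defaults are assumptions.

```python
import time


def wait_until_not_running(get_status, timeout=30.0, interval=0.5):
    """Poll get_status() until it reports something other than RUNNING.

    Returns the last observed status, or raises TimeoutError if the job
    is still RUNNING once the timeout has elapsed.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status != "RUNNING":
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError("job still RUNNING after %.1f s" % timeout)
        time.sleep(interval)
```

Using time.monotonic() for the deadline keeps the wait unaffected by wall-clock adjustments during the poll.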