Cledar/flink-mcp
The Flink MCP Server connects to Apache Flink SQL Gateway, enabling interaction with Flink clusters through a model context protocol.
flink-mcp — Flink MCP Server
This project provides an MCP server that connects to Apache Flink SQL Gateway.
Prerequisites
- A running Apache Flink cluster and SQL Gateway
  - Start cluster: ./bin/start-cluster.sh
  - Start gateway: ./bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=localhost
  - Verify: curl http://localhost:8083/v3/info
- Configure environment:
  - Set SQL_GATEWAY_API_BASE_URL (default http://localhost:8083). You can use a .env file at repo root.
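As a rough illustration of the configuration step, the sketch below resolves the gateway base URL from SQL_GATEWAY_API_BASE_URL and falls back to the documented default. The helper names gateway_base_url and info_url are hypothetical and are not taken from the flink-mcp source. The project's own loading logic, including how it reads a .env file, may differ.

```python
import os

# Default value stated in the Prerequisites section.
DEFAULT_BASE_URL = "http://localhost:8083"


def gateway_base_url(env=None):
    """Return the gateway base URL, falling back to the documented default.

    Hypothetical helper: flink-mcp may resolve this setting differently.
    """
    env = os.environ if env is None else env
    value = (env.get("SQL_GATEWAY_API_BASE_URL") or "").strip()
    # Drop a trailing slash so that joining a path does not produce "//".
    return (value or DEFAULT_BASE_URL).rstrip("/")


def info_url(env=None):
    """URL of the endpoint used in the Verify step."""
    return gateway_base_url(env) + "/v3/info"


if __name__ == "__main__":
    print(info_url())
```

Passing a plain dict instead of os.environ makes the fallback easy to check without touching the real environment.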
Run
Install and run via the console script:
pip install -e .
flink-mcp
MCP clients should launch the server over stdio with the command flink-mcp.
Ensure SQL_GATEWAY_API_BASE_URL is set in your environment or .env.
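Many MCP clients register stdio servers through a JSON configuration entry. The sketch below builds one such entry in Python and prints it. The mcpServers, command, args, and env keys follow a convention used by several clients and are an assumption here, as is the server label "flink". Check your client's documentation for the exact shape it expects.

```python
import json


def client_config(base_url="http://localhost:8083"):
    """Build a stdio server entry for an MCP client configuration file.

    The key names are an assumed convention, not something defined by flink-mcp.
    """
    return {
        "mcpServers": {
            "flink": {  # arbitrary label chosen for this example
                "command": "flink-mcp",  # console script installed by pip
                "args": [],
                "env": {"SQL_GATEWAY_API_BASE_URL": base_url},
            }
        }
    }


if __name__ == "__main__":
    print(json.dumps(client_config(), indent=2))
```

Setting the variable in the env block is one way to satisfy the requirement above when the client does not inherit your shell environment.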
Tools (v0.2.5)
- flink_info (resource): returns cluster info from /v3/info.
- open_new_session(properties?: dict) -> { sessionHandle, ... }.
- get_config(sessionHandle: str): returns session configuration.
- configure_session(sessionHandle: str, statement: str): apply session-scoped DDL/config (CREATE/USE/SET/RESET/LOAD/UNLOAD/ADD JAR).
- run_query_collect_and_stop(sessionHandle: str, query: str, max_rows: int=5, max_seconds: float=15.0): execute, fetch up to N rows within T seconds, then STOP the job if a jobID is present; closes the operation.
- run_query_stream_start(sessionHandle: str, query: str): execute a streaming query and return { jobID, operationHandle }; the job is left running.
- fetch_result_page(sessionHandle: str, operationHandle: str, token: int): fetch a single page; returns { page, nextToken, isEnd }.
- cancel_job(sessionHandle: str, jobId: str): issue STOP JOB '<jobId>', wait until DESCRIBE JOB status is not RUNNING; returns { jobID, status, jobGone, jobStatus }.
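The token-based paging of fetch_result_page can be sketched as a small loop. This is a minimal illustration, not the project's client code: call_tool stands in for whatever function your MCP client uses to invoke a tool, fake_call_tool is an in-memory stand-in, starting at token 0 is an assumption, and each page is treated as an opaque value because its shape is not described here.

```python
def collect_pages(call_tool, session_handle, operation_handle, max_pages=10):
    """Follow nextToken until isEnd is true or max_pages pages were read."""
    pages = []
    token = 0  # assumption: the first page is requested with token 0
    for _ in range(max_pages):
        result = call_tool(
            "fetch_result_page",
            {
                "sessionHandle": session_handle,
                "operationHandle": operation_handle,
                "token": token,
            },
        )
        pages.append(result["page"])
        if result["isEnd"] or result.get("nextToken") is None:
            break
        token = result["nextToken"]
    return pages


def fake_call_tool(name, args):
    """In-memory stand-in that serves three pages, for demonstration only."""
    data = [["a"], ["b"], ["c"]]
    token = args["token"]
    is_end = token >= len(data) - 1
    return {
        "page": data[token],
        "nextToken": None if is_end else token + 1,
        "isEnd": is_end,
    }


if __name__ == "__main__":
    print(collect_pages(fake_call_tool, "session-1", "op-1"))
```

The max_pages cap matters for streaming queries, where isEnd may never become true while the job keeps running.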
Notes
- Tools are stateless; clients manage and pass session/operation handles explicitly.
- run_query_stream_start returns both jobID and operationHandle; use fetch_result_page to stream results.
- cancel_job issues STOP and waits using DESCRIBE JOB; close_operation is invoked internally where appropriate.
- Endpoints target SQL Gateway v3-style paths.
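Because the tools are stateless, the client carries every handle from one call to the next. The sketch below shows one plausible ordering: open a session, start a streaming query, read the first page, and cancel the job in a finally block so it is not left running. call_tool and make_fake_tools are hypothetical stand-ins for a real MCP client, and the values returned by the fake are placeholders rather than real gateway output.

```python
def stream_first_page(call_tool, query):
    """Open a session, start a streaming query, read one page, then cancel."""
    session_handle = call_tool("open_new_session", {})["sessionHandle"]
    started = call_tool(
        "run_query_stream_start",
        {"sessionHandle": session_handle, "query": query},
    )
    try:
        result = call_tool(
            "fetch_result_page",
            {
                "sessionHandle": session_handle,
                "operationHandle": started["operationHandle"],
                "token": 0,  # assumption: paging starts at token 0
            },
        )
        page = result["page"]
    finally:
        # The streaming job is left running by design, so stop it explicitly.
        cancel = call_tool(
            "cancel_job",
            {"sessionHandle": session_handle, "jobId": started["jobID"]},
        )
    return page, cancel


def make_fake_tools():
    """Return a fake call_tool plus the list of tool names it was called with."""
    calls = []

    def call_tool(name, args):
        calls.append(name)
        if name == "open_new_session":
            return {"sessionHandle": "session-1"}
        if name == "run_query_stream_start":
            return {"jobID": "job-1", "operationHandle": "op-1"}
        if name == "fetch_result_page":
            return {"page": [args["token"]], "nextToken": 1, "isEnd": False}
        if name == "cancel_job":
            return {"jobID": args["jobId"], "jobGone": True}  # placeholder values
        raise ValueError(f"unknown tool: {name}")

    return call_tool, calls


if __name__ == "__main__":
    tool, seen = make_fake_tools()
    print(stream_first_page(tool, "SELECT 1"), seen)
```

Putting cancel_job in the finally block means the job is stopped even when fetching the page raises an error.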