madamak/apache-airflow-mcp-server
Airflow MCP Server
Human entrypoint for running and using the Apache Airflow MCP server. This server exposes safe, focused tools to inspect Airflow DAGs, runs, and logs (with optional write operations gated by client approval). Responses are structured JSON objects (dicts) and include a request_id for traceability.
Quickstart
1) Install the server (PyPI)
Install with uv so you are exercising the exact bits that ship to users and get reproducible virtualenvs:
uv tool install apache-airflow-mcp-server
No uv? Fall back to pip:
pip install apache-airflow-mcp-server
2) Configure instances (required)
Set AIRFLOW_MCP_INSTANCES_FILE to a YAML file listing available Airflow instances. Values may reference environment variables using ${VAR} syntax. Missing variables cause startup errors.
Example (examples/instances.yaml):
# Data team staging instance
data-stg:
  host: https://airflow.data-stg.example.com/
  api_version: v1
  verify_ssl: true
  auth:
    type: basic
    username: ${AIRFLOW_INSTANCE_DATA_STG_USERNAME}
    password: ${AIRFLOW_INSTANCE_DATA_STG_PASSWORD}

# ML team staging instance
ml-stg:
  host: https://airflow.ml-stg.example.com/
  api_version: v1
  verify_ssl: true
  auth:
    type: basic
    username: ${AIRFLOW_INSTANCE_ML_STG_USERNAME}
    password: ${AIRFLOW_INSTANCE_ML_STG_PASSWORD}

# Bearer token (experimental)
# ml-prod:
#   host: https://airflow.ml-prod.example.com/
#   api_version: v1
#   verify_ssl: true
#   auth:
#     type: bearer
#     token: ${AIRFLOW_INSTANCE_ML_PROD_TOKEN}
Bearer token auth is experimental; basic auth remains the primary, well-tested path.
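The `${VAR}` expansion with fail-fast behavior on missing variables can be sketched as follows. This is a minimal illustration of the documented semantics, not the server's actual loader:

```python
import os
import re

_VAR = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")

def expand_env(value: str) -> str:
    """Replace ${VAR} references with environment values, failing fast when unset."""
    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in os.environ:
            # Mirrors the documented behavior: a missing variable aborts startup.
            raise RuntimeError(f"instances file references unset variable: {name}")
        return os.environ[name]
    return _VAR.sub(substitute, value)
```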
Kubernetes deployment tip: Provide instances.yaml via a Secret and mount it at /config/instances.yaml (set AIRFLOW_MCP_INSTANCES_FILE=/config/instances.yaml).
Environment variables:
- AIRFLOW_MCP_INSTANCES_FILE (required): path to the registry YAML
- AIRFLOW_MCP_DEFAULT_INSTANCE (optional): default instance key
- AIRFLOW_MCP_HTTP_HOST (default: 127.0.0.1)
- AIRFLOW_MCP_HTTP_PORT (default: 8765)
- AIRFLOW_MCP_TIMEOUT_SECONDS (default: 30)
- AIRFLOW_MCP_LOG_FILE (optional)
- AIRFLOW_MCP_HTTP_BLOCK_GET_ON_MCP (default: true)
3) Run the server
- HTTP (recommended for tooling):
uv run airflow-mcp --transport http --host 127.0.0.1 --port 8765
- STDIO (CLI/terminal workflows):
uv run airflow-mcp --transport stdio
Health check (HTTP): GET /health → 200 OK.
Tip: A fastmcp.json is included for discovery/config by FastMCP tooling:
{
"$schema": "https://gofastmcp.com/schemas/fastmcp_config/v1.json",
"entrypoint": { "file": "src/airflow_mcp/server.py", "object": "mcp" },
"deployment": { "transport": "http", "host": "127.0.0.1", "port": 8765 }
}
4) Typical incident workflow
Start from an Airflow UI URL (often in a Datadog alert):
1) airflow_resolve_url(url) → resolve instance, dag_id, dag_run_id, and task_id.
2) airflow_list_dag_runs(instance|ui_url, dag_id) → confirm recent state.
3) airflow_get_task_instance(instance|ui_url, dag_id, dag_run_id, task_id, include_rendered?, max_rendered_bytes?) → inspect task metadata, attempts, and optional rendered fields.
4) airflow_get_task_instance_logs(instance|ui_url, dag_id, dag_run_id, task_id, try_number, filter_level?, context_lines?, tail_lines?, max_bytes?) → inspect the failure with optional filtering and truncation.
All tools accept either instance or ui_url. If both are given and disagree, the call fails with INSTANCE_MISMATCH. ui_url must be a fully qualified http(s) Airflow URL; use airflow_list_instances() to discover valid hosts when you only have an instance key.
Tool Reference (Structured JSON)
Discovery and URL utilities:
- airflow_list_instances() → list configured instance keys and the default
- airflow_describe_instance(instance) → host, api_version, verify_ssl, auth_type (redacted)
- airflow_resolve_url(url) → resolve instance and identifiers from an Airflow UI URL
Read-only tools:
- airflow_list_dags(instance|ui_url, limit?, offset?, state?/filters) → compact DAGs with UI links
- airflow_get_dag(instance|ui_url, dag_id) → DAG details + UI link
- airflow_list_dag_runs(instance|ui_url, dag_id, state?, limit?, offset?, order_by?, descending?) → runs + per-run UI links. Defaults to execution_date descending ("latest first"). Accepts explicit ordering by start_date, end_date, or execution_date
- airflow_get_dag_run(instance|ui_url, dag_id, dag_run_id) → run details + UI link
- airflow_list_task_instances(instance|ui_url, dag_id, dag_run_id, limit?, offset?, state?, task_ids?) → task attempts for a run, with per-attempt log URLs, optional server-side filtering by state or task id, and a filters echo describing applied filters (count reflects filtered results; total_entries mirrors the API response when available)
- airflow_get_task_instance(instance|ui_url, dag_id, dag_run_id, task_id, include_rendered?, max_rendered_bytes?) → concise task metadata (state, timings, attempts, config) with optional rendered template fields and direct UI links; sensors increment try_number on every reschedule, so treat it as an attempt index (derived retries_* fields are heuristic)
- airflow_get_task_instance_logs(instance|ui_url, dag_id, dag_run_id, task_id, try_number, filter_level?, context_lines?, tail_lines?, max_bytes?) → log text with optional filtering; the response includes truncated, auto_tailed, and stats
- airflow_dataset_events(instance|ui_url, dataset_uri, limit?) → dataset events (optional capability)
Write tools (require client approval; destructive):
- airflow_trigger_dag(instance|ui_url, dag_id, conf?, logical_date?, dag_run_id?, note?)
- airflow_clear_task_instances(instance|ui_url, dag_id, task_ids?, start_date?, end_date?, include_*?, dry_run?)
- airflow_clear_dag_run(instance|ui_url, dag_id, dag_run_id, include_*?, dry_run?, reset_dag_runs?)
- airflow_pause_dag(instance|ui_url, dag_id) / airflow_unpause_dag(instance|ui_url, dag_id)
Contract:
- Success: dict payloads include a request_id (traceable in logs). FastMCP serializes them automatically.
- Failure: tools raise an MCP ToolError whose payload remains a compact JSON string: { "code": "INVALID_INPUT", "message": "...", "request_id": "...", "context"?: {...} }
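On the client side, the failure contract can be handled with a small helper. This is a sketch; the field names come from the contract above, and the validation logic is an assumption about sensible client behavior:

```python
import json

def parse_tool_error(payload: str) -> dict:
    """Decode a ToolError payload and check the documented fields are present."""
    err = json.loads(payload)
    missing = {"code", "message", "request_id"} - err.keys()
    if missing:
        raise ValueError(f"malformed error payload, missing: {sorted(missing)}")
    return err  # "context" is optional and may be absent
```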
Log Filtering (airflow_get_task_instance_logs)
Efficient incident triage with server-side filtering and normalized payloads:
Typical incident workflow:
# Example: Find errors in recent execution with context
airflow_get_task_instance_logs(
dag_id="etl_pipeline",
dag_run_id="scheduled__2025-10-30",
task_id="transform_data",
try_number=2,
tail_lines=500, # Last 500 lines only
filter_level="error", # Show ERROR, CRITICAL, FATAL, Exception, Traceback
context_lines=5 # Include 5 lines before/after each error
)
Two-call pattern (required for try_number):
1) ti = airflow_get_task_instance(...) → read ti["attempts"]["try_number"].
2) airflow_get_task_instance_logs(..., try_number=ti["attempts"]["try_number"]).
Sensors and reschedules can increment try_number without consuming retries; making the attempt explicit prevents the server from guessing the "latest attempt" incorrectly. Treat try_number as an attempt index rather than the number of retries consumed: the derived retries_consumed / retries_remaining fields are heuristics based on configured retries and may not match Airflow's accounting for long-running sensors. Inspect the task metadata when you need authoritative counts.
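The heuristic behind the derived retries_* fields can be illustrated like this. The arithmetic is an assumption about how such fields are typically derived, not the server's exact code:

```python
def attempt_summary(try_number: int, configured_retries: int) -> dict:
    """Derive heuristic retry counts from an attempt index.

    For sensors in reschedule mode, try_number grows with every reschedule,
    so these derived values can overstate the retries actually consumed.
    """
    consumed = max(0, try_number - 1)
    return {
        "try_number": try_number,
        "retries_consumed": min(consumed, configured_retries),
        "retries_remaining": max(0, configured_retries - consumed),
    }
```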
Parameters:
- filter_level: "error" (strict) | "warning" (includes errors) | "info" (all levels)
- context_lines: symmetric context (N lines before + N after each match), clamped to [0, 1000]
- tail_lines: extract the last N lines before filtering, clamped to [0, 100K]
- max_bytes: hard cap (default: 100KB ≈ 25K tokens)
- Numeric inputs provided as floats or strings are coerced and clamped server-side, so clients can pass user-entered values safely.
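The clamping and context-expansion semantics can be sketched as below. The matched keywords come from the filter_level description earlier in this document; the exact matching logic here is an approximation:

```python
import re

# Keywords the "error" level is documented to match.
ERROR_PATTERN = re.compile(r"\b(ERROR|CRITICAL|FATAL|Exception|Traceback)\b")

def clamp(value, lo: int, hi: int, default: int) -> int:
    """Coerce user-entered numbers (ints, floats, strings) into a safe range."""
    try:
        number = int(float(value))
    except (TypeError, ValueError):
        return default
    return max(lo, min(hi, number))

def filter_error_lines(lines: list[str], context_lines=5) -> list[str]:
    """Keep lines matching the 'error' level plus symmetric context."""
    context = clamp(context_lines, 0, 1000, 5)
    keep: set[int] = set()
    for i, line in enumerate(lines):
        if ERROR_PATTERN.search(line):
            keep.update(range(max(0, i - context), min(len(lines), i + context + 1)))
    return [lines[i] for i in sorted(keep)]
```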
Response shape:
- log: a single normalized string. When the Airflow API returns host-segmented logs, headers of the form --- [worker-1] --- (or --- [unknown-host] ---) are inserted with blank lines between segments so LLMs can reason about execution locality.
- truncated: true if output exceeded max_bytes
- auto_tailed: true if a log over 100MB triggered an automatic tail to the last 10K lines
- match_count: number of lines matching filter_level (before context expansion)
- meta.filters: echo of the effective filters applied (shows clamped values)
- bytes_returned, original_lines, returned_lines: stats that align with the normalized text seen by the client
Failed Task Discovery (airflow_list_task_instances)
airflow_list_task_instances now exposes server-side filters that make the dedicated
airflow_get_failed_task_instance helper unnecessary. Use the same primitive for every
failed-task workflow:
# All failed tasks for a known run (count reflects filtered results)
failed = airflow_list_task_instances(
dag_id="etl_pipeline",
dag_run_id="scheduled__2025-10-30",
state=["failed"]
)
# Failed tasks for a subset of task IDs (case: only sensors + downloaders)
subset = airflow_list_task_instances(
dag_id="etl_pipeline",
dag_run_id="backfill__2025-10-30",
state=["failed"],
task_ids=["check_source", "download_payload"]
)
# Recipe: latest failed run → failed tasks
latest_failed = airflow_list_dag_runs(
dag_id="etl_pipeline",
state=["failed"],
limit=1
)["dag_runs"][0]
failed_tasks = airflow_list_task_instances(
dag_id="etl_pipeline",
dag_run_id=latest_failed["dag_run_id"],
state=["failed"]
)
Why the change?
- A single tool stays composable: filter by state, task_ids, or both.
- The response includes filters (echo) and total_entries when the API shares it, so callers can detect whether additional pagination is needed while still getting a filtered count.
- Agents that previously relied on airflow_get_failed_task_instance should migrate to airflow_list_task_instances(state=["failed"]), optionally preceded by airflow_list_dag_runs to resolve the relevant run.
Task Instance Metadata (airflow_get_task_instance)
Pair metadata with logs for faster triage:
# Example: Inspect failed task metadata and rendered fields
task_meta = airflow_get_task_instance(
dag_id="etl_pipeline",
dag_run_id="scheduled__2025-10-30",
task_id="transform_data",
include_rendered=True,
max_rendered_bytes=100_000,
)
# Use try_number + ui_url.log to pull logs next
logs = airflow_get_task_instance_logs(
dag_id="etl_pipeline",
dag_run_id="scheduled__2025-10-30",
task_id=task_meta["task_instance"]["task_id"],
try_number=task_meta["attempts"]["try_number"],
filter_level="error",
)
Highlights:
- task_instance: state, host, operator, timings (with computed duration_ms)
- task_config: owner, retries, retry_delay (when available)
- attempts: current try, retries consumed/remaining
- rendered_fields (optional): rendered template values with a byte cap (max_rendered_bytes, default 100KB; a truncated payload returns { "_truncated": "Increase max_rendered_bytes" })
- ui_url: direct grid and log links for the task attempt
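The computed duration_ms is presumably simple timestamp arithmetic over the start/end dates the API returns; a minimal sketch under that assumption:

```python
from datetime import datetime

def duration_ms(start_date: str, end_date: str) -> int:
    """Millisecond duration between two ISO-8601 timestamps."""
    start = datetime.fromisoformat(start_date)
    end = datetime.fromisoformat(end_date)
    return int((end - start).total_seconds() * 1000)
```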
Client Notes
- MCP clients (e.g., IDEs) should call tools by name with JSON args and handle JSON responses.
- For URL-based calls, the server validates the hostname against the configured registry (SSRF guard).
- Write tools are annotated so MCP clients will prompt for confirmation before execution.
Add to MCP clients
Cursor – stdio (spawn on demand)
Add an entry to ~/.cursor/mcp.json. Cursor will spawn the server process and communicate over stdio.
{
"mcpServers": {
"airflow-stdio": {
"command": "uv",
"args": [
"--directory",
"/path/to/apache-airflow-mcp-server",
"run",
"airflow-mcp",
"--transport",
"stdio"
],
"env": {
"AIRFLOW_MCP_INSTANCES_FILE": "/path/to/instances.yaml",
"AIRFLOW_MCP_DEFAULT_INSTANCE": "production"
}
}
}
}
Notes:
- Replace /path/to/apache-airflow-mcp-server and /path/to/instances.yaml with your paths.
- No long-running server is required; Cursor starts the process per session.
Cursor – local HTTP
Run the server yourself, then point Cursor at the HTTP endpoint.
- Start the server:
uv run airflow-mcp --transport http --host 127.0.0.1 --port 8765
- In ~/.cursor/mcp.json:
{
"mcpServers": {
"airflow-local-http": {
"url": "http://127.0.0.1:8765/mcp"
}
}
}
Cursor – remote HTTP
Point Cursor at a deployed server. Optional headers can be added if fronted by an auth proxy.
{
"mcpServers": {
"airflow-remote-http": {
"url": "https://airflow-mcp.internal.example.com/mcp",
"headers": {
"X-API-Key": "your-token-if-proxied"
}
}
}
}
Other clients
- Many MCP clients accept either a stdio command or an HTTP URL; the examples above generalize.
- A fastmcp.json is included for FastMCP-aware tooling that can auto-discover entrypoints.
Development
# Install dependencies
uv sync
# Run tests
uv run pytest
# Run linter
uv run ruff check .
# Run the server (stdio)
uv run airflow-mcp --transport stdio
# Run the server (HTTP)
uv run airflow-mcp --transport http --host 127.0.0.1 --port 8765
Testing strategy:
- No real network calls; tests patch airflow_mcp.client_factory._import_airflow_client.
- Reset the registry cache in fixtures; assert request_id presence, URL precedence, and structured log fields.
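The patch-the-factory pattern can be sketched against a local stub. The real tests target airflow_mcp.client_factory (per the note above); the ClientFactory class and list_dags body here are stand-ins so the snippet is self-contained:

```python
from unittest import mock

class ClientFactory:
    """Stand-in for airflow_mcp.client_factory; real tests patch that module."""
    @staticmethod
    def _import_airflow_client():
        raise RuntimeError("tests must never construct a real client")

def list_dags(factory=ClientFactory) -> dict:
    # Hypothetical tool body: fetch DAGs and attach a request_id.
    client = factory._import_airflow_client()
    return {"dags": client.get_dags(), "request_id": "req-0001"}

def test_list_dags_offline():
    fake = mock.Mock()
    fake.get_dags.return_value = []
    with mock.patch.object(ClientFactory, "_import_airflow_client", return_value=fake):
        result = list_dags()
    assert result["request_id"]  # every payload carries a request_id
    assert result["dags"] == []
```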
Observability:
- Structured logs with tool_start, tool_success/tool_error, duration_ms, response_bytes, and context fields.
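The event names above suggest a wrapper of roughly this shape (a sketch of the logging contract, not the server's implementation):

```python
import json
import logging
import time
import uuid

logger = logging.getLogger("airflow_mcp")

def run_tool(name: str, fn, **kwargs) -> dict:
    """Wrap a tool call with tool_start / tool_success / tool_error events."""
    request_id = str(uuid.uuid4())
    started = time.monotonic()
    logger.info(json.dumps({"event": "tool_start", "tool": name, "request_id": request_id}))
    try:
        result = fn(**kwargs)
    except Exception as exc:
        logger.error(json.dumps({
            "event": "tool_error", "tool": name,
            "request_id": request_id, "error": str(exc),
        }))
        raise
    result["request_id"] = request_id
    logger.info(json.dumps({
        "event": "tool_success", "tool": name, "request_id": request_id,
        "duration_ms": int((time.monotonic() - started) * 1000),
        "response_bytes": len(json.dumps(result)),
    }))
    return result
```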
License
Apache 2.0. See the LICENSE file for details.