NiFi MCP Server

Note: This is a fork of kevinbtalbert/NiFi-MCP-Server, modified to work with Snowflake Openflow using Programmatic Access Tokens (PAT) instead of Knox authentication.

Model Context Protocol server providing read and, optionally, write access to Apache NiFi via Snowflake Openflow, authenticated with Programmatic Access Tokens (PAT).

Works with both NiFi 1.x and 2.x - automatic version detection and adaptation.

Features

  • Automatic version detection - Detects NiFi 1.x vs 2.x and adapts behavior
  • Snowflake Openflow authentication - Uses Programmatic Access Tokens (PAT) for secure authentication
  • Flexible configuration - Configure via environment variables OR at runtime via chat
  • Read-only by default - Safe exploration of NiFi flows and configuration
  • Intelligent flow building - Pattern recognition and requirements gathering for complex flows
  • 30 read-only MCP tools for exploring NiFi (see the client sketch after this list):
    • configure_connection(base_url, pat_token) - Configure NiFi connection at runtime (alternative to env vars)
    • get_connection_info() - Check current connection status
    • get_nifi_version() - Version and build information
    • get_root_process_group() - Root process group details
    • list_processors(process_group_id) - List processors in a process group
    • list_connections(process_group_id) - List connections in a process group
    • get_bulletins(after_ms?) - Recent bulletins and alerts
    • list_parameter_contexts() - Parameter contexts
    • get_controller_services(process_group_id?) - Controller services
    • list_registries() - List all NiFi Registry clients
    • get_registry_buckets(registry_id) - Get all buckets in a registry
    • get_registry_flows(registry_id, bucket_id) - Get all flows in a bucket
    • get_registry_flow_versions(registry_id, bucket_id, flow_id) - Get all versions of a flow
    • get_processor_types() - Available processor types for flow building
    • search_flow(query) - Search for components in the flow
    • get_connection_details(connection_id) - Detailed connection information
    • get_processor_details(processor_id) - Detailed processor configuration
    • list_input_ports(process_group_id) - Input ports for a process group
    • list_output_ports(process_group_id) - Output ports for a process group
    • get_processor_state(processor_id) - Quick processor state check
    • check_connection_queue(connection_id) - Queue size (flowfiles + bytes)
    • get_flow_summary(process_group_id) - Flow statistics and health overview
    • analyze_flow_build_request(user_request) - Intelligent pattern recognition and requirements gathering
    • get_parameter_context_details(context_id) - Get parameter context with all parameters
    • get_flow_health_status(process_group_id) - Comprehensive flow health check (processors, services, connections, errors)
    • find_controller_services_by_type(process_group_id, service_type) - Search for existing controller services by type (prevents 409 conflicts)
    • check_configuration() - Validate current environment configuration
    • get_setup_instructions() - Interactive setup guidance for NiFi MCP Server
    • get_best_practices_guide() - Best practices for building NiFi flows
    • get_recommended_workflow(flow_type) - Step-by-step guidance for common flow patterns
  • 43 write operations (when NIFI_READONLY=false), including:
    • start_processor(processor_id, version) - Start a processor
    • stop_processor(processor_id, version) - Stop a processor
    • create_processor(...) - Create a new processor
    • update_processor_config(...) - Update processor configuration
    • delete_processor(processor_id, version) - Delete a processor
    • create_connection(...) - Connect components
    • delete_connection(connection_id, version) - Delete a connection
    • empty_connection_queue(connection_id) - Empty flowfiles from queue (⚠️ data loss)
    • create_controller_service(pg_id, service_type, name) - Create controller services (DBCPConnectionPool, RecordWriters, etc.)
    • update_controller_service_properties(service_id, version, properties) - Configure service properties
    • get_controller_service_details(service_id) - Get service configuration (read-only but listed here for context)
    • delete_controller_service(service_id, version) - Remove controller services
    • enable_controller_service(service_id, version) - Enable a controller service
    • disable_controller_service(service_id, version) - Disable a controller service
    • create_process_group(parent_id, name, x, y) - Create process groups (folders) for organizing flows
    • update_process_group_name(pg_id, version, name) - Rename process groups
    • delete_process_group(pg_id, version) - Remove empty process groups
    • create_input_port(pg_id, name, x, y) - Create input ports for inter-process-group communication
    • create_output_port(pg_id, name, x, y) - Create output ports for inter-process-group communication
    • update_input_port(port_id, version, name) - Rename input ports
    • update_output_port(port_id, version, name) - Rename output ports
    • delete_input_port(port_id, version) - Remove input ports
    • delete_output_port(port_id, version) - Remove output ports
    • create_parameter_context(name, description, parameters) - Create parameter contexts for environment-specific config
    • update_parameter_context(context_id, version, ...) - Update parameter contexts
    • delete_parameter_context(context_id, version) - Remove parameter contexts
    • start_input_port(port_id, version) - Start input port to enable data flow
    • stop_input_port(port_id, version) - Stop input port
    • start_output_port(port_id, version) - Start output port to enable data flow
    • stop_output_port(port_id, version) - Stop output port
    • apply_parameter_context_to_process_group(pg_id, pg_version, context_id) - Apply parameter context to enable #{param} usage
    • start_process_group(pg_id) - Start entire process group recursively (all processors, services, ports)
    • stop_process_group(pg_id) - Stop entire process group recursively (all processors, ports)
    • enable_controller_services(pg_id) - Enable all controller services in a group
    • disable_controller_services(pg_id) - Disable all controller services in a group
    • terminate_processor(processor_id, version) - Force-terminate stuck processor (last resort)
    • start_new_flow(flow_name, flow_description) - Smart flow builder that automatically creates process groups and enforces best practices
    • import_versioned_flow(parent_pg_id, registry_id, bucket_id, flow_id, flow_name, version?, x?, y?) - Import a versioned flow from a NiFi Registry
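
Outside of Claude Desktop or Snowflake Cortex, these tools can also be exercised directly with the MCP Python SDK. The sketch below is illustrative only and is not part of this repository: it assumes uv is on your PATH, launches the server over stdio the same way the configurations in the next section do, and then calls the read-only get_nifi_version tool.

import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Launch the server over stdio, mirroring the Claude Desktop / Cortex configuration below.
server = StdioServerParameters(
    command="uv",
    args=["run", "--project", "/FULL/PATH/TO/NiFi-MCP-Server", "-m", "nifi_mcp_server.server"],
    env={
        "MCP_TRANSPORT": "stdio",
        "OPENFLOW_BASE_URL": "https://of--your-account-your-env.region.snowflakecomputing.app/your-prefix",
        "OPENFLOW_PAT": "<your_programmatic_access_token>",
        "NIFI_READONLY": "true",
    },
)

async def main():
    async with stdio_client(server) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            print([tool.name for tool in tools.tools])          # all tools exposed by the server
            result = await session.call_tool("get_nifi_version", arguments={})
            print(result.content)                               # version and build information

asyncio.run(main())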

Quick Start

For Snowflake Openflow deployments

Your Openflow base URL will typically look like:

https://of--<account>-<suffix>.snowflakecomputing.app/<your-prefix>

For example:

https://of--your-account-your-env.region.snowflakecomputing.app/your-prefix

You'll need a Programmatic Access Token (PAT) generated in Snowflake. See the Snowflake PAT documentation for details.

Setup

Option 1: Using uv (Recommended)

uv is a fast Python package installer and resolver. Install uv first if you don't have it.

  1. Clone and install:

    git clone https://github.com/kevinbtalbert/nifi-mcp-server.git
    cd nifi-mcp-server
    uv sync
    
  2. Configure Claude Desktop - Edit ~/Library/Application Support/Claude/claude_desktop_config.json:

     {
       "mcpServers": {
         "nifi-mcp-server": {
           "command": "/opt/homebrew/bin/uv",
           "args": [
             "run",
             "--project",
             "/FULL/PATH/TO/NiFi-MCP-Server",
             "-m",
             "nifi_mcp_server.server"
           ],
           "env": {
             "MCP_TRANSPORT": "stdio",
             "OPENFLOW_BASE_URL": "https://of--your-account-your-env.region.snowflakecomputing.app/your-prefix",
             "OPENFLOW_PAT": "<your_programmatic_access_token>",
             "NIFI_READONLY": "false"
           }
         }
       }
     }
    
  3. Restart Claude Desktop and start asking questions about your NiFi flows!

Option 2: Using Snowflake Cortex

For Snowflake Cortex users, add the MCP server using the CLI:

cortex mcp add nifi_mcp_server /opt/homebrew/bin/uv \
  --args "run,--project=/FULL/PATH/TO/NiFi-MCP-Server,-m,nifi_mcp_server.server" \
  --env "OPENFLOW_BASE_URL=https://of--your-account.snowflakecomputing.app/your-prefix,OPENFLOW_PAT=<your_pat_token>,NIFI_READONLY=false,MCP_TRANSPORT=stdio"

Or configure at runtime via chat (no env vars needed):

cortex mcp add nifi_mcp_server /opt/homebrew/bin/uv \
  --args "run,--project=/FULL/PATH/TO/NiFi-MCP-Server,-m,nifi_mcp_server.server" \
  --env "MCP_TRANSPORT=stdio,NIFI_READONLY=false"

Then in chat: "Configure connection to [your-url] with PAT token [your-token]"

Note: Replace /opt/homebrew/bin/uv with your actual uv path (find with which uv).

Option 3: Direct Installation (with uvx)

For quick installation without cloning, use the uvx command:

{
  "mcpServers": {
    "nifi-mcp-server": {
      "command": "uvx",
      "args": [
        "--from",
        "git+https://github.com/kevinbtalbert/nifi-mcp-server@main",
        "run-server"
      ],
      "env": {
        "MCP_TRANSPORT": "stdio",
        "OPENFLOW_BASE_URL": "https://of--your-account-your-env.region.snowflakecomputing.app/your-prefix",
        "OPENFLOW_PAT": "<your_programmatic_access_token>",
        "NIFI_READONLY": "true"
      }
    }
  }
}

Configuration Options

Configuration can be provided via environment variables OR at runtime via chat:

Environment Variables (Optional)

| Variable | Required | Description |
| --- | --- | --- |
| OPENFLOW_BASE_URL | Optional* | Snowflake Openflow base URL (e.g., https://of--account.snowflakecomputing.app/prefix) |
| OPENFLOW_PAT | Optional* | Snowflake Programmatic Access Token for authentication |
| NIFI_READONLY | No | Read-only mode (default: true) |
| VERIFY_SSL | No | Verify SSL certificates (default: true) |
| CA_BUNDLE | No | Path to CA certificate bundle |

*Required if not configured via chat. If you don't set these environment variables, you can configure them at runtime using the configure_connection() tool.
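
For reference, the table above maps to plain environment lookups with the stated defaults. The snippet below only illustrates how the variables and their defaults fit together; it is not the server's actual implementation:

import os

base_url   = os.environ.get("OPENFLOW_BASE_URL")   # optional if configured via chat
pat_token  = os.environ.get("OPENFLOW_PAT")        # optional if configured via chat
read_only  = os.environ.get("NIFI_READONLY", "true").lower() == "true"   # read-only by default
verify_ssl = os.environ.get("VERIFY_SSL", "true").lower() == "true"      # verify certificates by default
ca_bundle  = os.environ.get("CA_BUNDLE")           # optional path to a CA certificate bundle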

Runtime Configuration via Chat

If you prefer not to set environment variables, you can configure the connection during your chat session:

Using the MCP tools:

  • configure_connection(base_url, pat_token) - Set or update your connection settings
  • get_connection_info() - Check current connection status

Example:

User: "Configure my connection to https://of--myaccount.snowflakecomputing.app/myprefix with PAT token sfp-abc123..."
Assistant: [Uses configure_connection tool to set up the connection]

User: "Now show me the NiFi version"
Assistant: [Uses get_nifi_version tool successfully]

This is useful when:

  • You want to switch between multiple NiFi environments
  • You prefer not to store credentials in config files
  • You're testing or demonstrating the MCP server
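
The same runtime configuration can be driven programmatically from an MCP client session (see the sketch in the Features section). The tool and parameter names below come from the tool list above; the URL and token are placeholders:

# Assumes `session` is an initialized mcp.ClientSession connected to this server.
await session.call_tool("configure_connection", arguments={
    "base_url": "https://of--myaccount.snowflakecomputing.app/myprefix",
    "pat_token": "sfp-abc123...",
})
info = await session.call_tool("get_connection_info", arguments={})
print(info.content)   # current connection status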

Generating a Programmatic Access Token (PAT)

To create a PAT in Snowflake, run the following SQL command:

ALTER USER <username> ADD PROGRAMMATIC ACCESS TOKEN <token_name>;

This will return a token starting with sfp-. Store this securely and use it as your OPENFLOW_PAT environment variable.

Example:

ALTER USER myuser ADD PROGRAMMATIC ACCESS TOKEN nifi_mcp_token;

For more details, see the Snowflake PAT documentation.

Using the API

The NiFi API is accessed at {OPENFLOW_BASE_URL}/nifi-api. For example:

curl -s -H "Authorization: Bearer $OPENFLOW_PAT" \
  "https://of--your-account-your-env.region.snowflakecomputing.app/your-prefix/nifi-api/flow/about"

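The same check can be scripted in Python. This is a hedged equivalent of the curl call above, assuming the requests package is installed and the two environment variables are set:

import os
import requests

base_url = os.environ["OPENFLOW_BASE_URL"].rstrip("/")
headers = {"Authorization": f"Bearer {os.environ['OPENFLOW_PAT']}"}

# /nifi-api/flow/about returns NiFi version and build information.
resp = requests.get(f"{base_url}/nifi-api/flow/about", headers=headers, timeout=30)
resp.raise_for_status()
print(resp.json())
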
Example Usage

Read-Only Operations (Default)

Once configured, you can ask Claude questions like:

  • "What version of NiFi am I running?"
  • "List all processors in the root process group"
  • "Show me recent bulletins"
  • "What parameter contexts are configured?"
  • "Tell me about the controller services"
  • "What processor types are available for building flows?"
  • "Search for processors containing 'kafka'"
  • "Show me the details of connection abc-123"

Write Operations (when NIFI_READONLY=false)

⚠️ WARNING: Write operations modify your NiFi flows. Use with caution!

To enable write operations, set NIFI_READONLY=false in your configuration. Then you can:

  • Build flows: "Create a LogAttribute processor named 'MyLogger' in the root process group"
  • Manage processors: "Start processor with ID abc-123", "Stop all processors in group xyz"
  • Connect components: "Create a connection from processor A to processor B for the 'success' relationship"
  • Configure: "Update the scheduling period of processor abc-123 to 30 seconds"
  • Control services: "Enable the DBCPConnectionPool controller service"

Examples:

"Create a GenerateFlowFile processor in process group abc-123"
"Connect processor source-123 to processor dest-456 for success relationship"
"Start processor xyz-789"
"Check the queue status for connection conn-456"
"Empty the queue for connection conn-456 before deletion"  (⚠️ deletes flowfiles permanently)
"Delete connection conn-456"

Important Notes:

  • Version Tracking: NiFi uses optimistic locking. Always fetch current versions before updates:
    processor = get_processor_details(processor_id)
    current_version = processor['revision']['version']
    stop_processor(processor_id, current_version)
    
  • Queue Management: Connections with flowfiles cannot be deleted. Use get_connection_details() to check queue status, then empty_connection_queue() if needed before deletion.
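
A sketch of that deletion sequence, in the same pseudocode style as the version-tracking example above (the queue-count field name is an assumption about the NiFi REST response, so verify it against your environment):

conn = get_connection_details(connection_id)
queued = conn['status']['aggregateSnapshot']['flowFilesQueued']   # assumed response layout

if queued > 0:
    empty_connection_queue(connection_id)    # ⚠️ permanently deletes queued flowfiles

# Re-fetch to pick up the current revision before deleting (optimistic locking).
conn = get_connection_details(connection_id)
delete_connection(connection_id, conn['revision']['version'])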

License

Apache License 2.0

Credits

This project is a fork of kevinbtalbert/NiFi-MCP-Server by Kevin Talbert.

Key differences from the original:

  • Authentication: Uses Snowflake Openflow PAT tokens instead of Knox authentication
  • Bulk Operations: Added efficient bulk APIs for starting/stopping process groups and enabling/disabling controller services
  • Runtime Configuration: Added ability to configure connection via chat instead of requiring environment variables
  • Package Manager: Migrated to uv for faster dependency management
  • Recursive Tools: Added recursive operations for listing processors, controller services, and validation errors across process group hierarchies