mcp-clickhouse

gemiit/mcp-clickhouse

3.2

If you are the rightful owner of mcp-clickhouse and would like to certify it and/or have it hosted online, please leave a comment on the right or send an email to henry@mcphub.com.

MCP-ClickHouse Server is a minimal yet powerful Model Context Protocol server that provides a structured interface to interact with a single-instance ClickHouse database.

Tools
  1. query

    Execute arbitrary SQL and stream results.

  2. insert

    Bulk-insert rows in structured format.

  3. create_database

    Create a new database.

  4. create_table

    Create a table with specified engine, columns, etc.

  5. server_info

    Return ClickHouse version, uptime, databases, etc.

MCP-ClickHouse Server

๐Ÿ“–

A minimal yet powerful Model Context Protocol (MCP) server that exposes single-instance ClickHouse as a set of structured tools and resources.
It lets LLM agents, chat-ops bots, or any MCP-compatible client query, write and explore ClickHouse using a simple JSON-RPC interface โ€“ no driver, DSN or SQL parsing required.


Feature Highlights

  • Arbitrary SQL queries โ€“ results in JSON / CSV / Pretty formats
  • High-throughput batch INSERT for structured data
  • Instant schema & data discovery via schema:// / data:// resources
  • ๐Ÿค– AI Assistant Prompts โ€“ 5 professional ClickHouse consulting assistants covering table analysis, query optimization, schema design, performance troubleshooting, and migration planning
  • Built-in Prometheus /metrics endpoint
  • Optional OpenTelemetry tracing (OTLP)
  • Fully async & production-ready (FastAPI + clickhouse-driver)
  • Container & K8s manifests provided โ€“ but equally happy to run via stdio for local AI assistants
  • Zero external dependencies beyond ClickHouse itself

Available Tools

Tool nameCategoryDescription
querySQLExecute arbitrary SQL and stream results
insertSQLBulk-insert rows ([{column: value, โ€ฆ}, โ€ฆ])
create_databaseSchemaCreate a new database
create_tableSchemaCreate a table (engine, columns, etc. as JSON)
server_infoAdminReturn ClickHouse version, uptime, databases, etc.

Resources resource://schema โ€“ browse DBs / tables / columns, incl. DDL resource://data โ€“ sample rows, counts, custom readonly SQL via query string

AI Assistant Prompts

Prompt nameFunctionDescription
analyze_tableTable AnalysisAnalyze ClickHouse table structure, data distribution and suggest optimizations
optimize_queryQuery OptimizationAnalyze and suggest optimizations for ClickHouse SQL queries
design_schemaSchema DesignDesign optimal ClickHouse database schema for specific use cases
troubleshoot_performancePerformance TroubleshootingDiagnose and resolve ClickHouse performance issues
plan_migrationMigration PlanningGenerate detailed migration plans from specified source systems to ClickHouse

Each prompt is available in both Chinese and English versions, providing professional ClickHouse consulting advice


Quick Start

1 ยท Docker

docker run --rm -p 8000:8000 \
  -e CH_HOST=host.docker.internal \
  mcp/clickhouse run -t sse

2 ยท Poetry (local)

poetry install
poetry run mcp-clickhouse run -t sse

Configuration

All options are simple environment variables (can live in .env).

This project directly uses FastMCP's native configuration system.
Uses FASTMCP_ prefixed environment variables to configure FastMCP server directly, without additional wrapper layers.
This design is simpler and more direct, reducing configuration complexity.

FastMCP server configuration example:

# FastMCP Server Basic Settings
FASTMCP_HOST=0.0.0.0
FASTMCP_PORT=3000
FASTMCP_DEBUG=false
FASTMCP_LOG_LEVEL=INFO  # DEBUG, INFO, WARNING, ERROR, CRITICAL

# FastMCP Path Configuration
FASTMCP_MOUNT_PATH=/mcp
FASTMCP_SSE_PATH=/sse
FASTMCP_MESSAGE_PATH=/messages/
FASTMCP_STREAMABLE_HTTP_PATH=/mcp

# FastMCP Transport Settings
FASTMCP_STATELESS_HTTP=false
FASTMCP_JSON_RESPONSE=false

# FastMCP Resource Management
FASTMCP_WARN_ON_DUPLICATE_RESOURCES=true
FASTMCP_WARN_ON_DUPLICATE_TOOLS=true
FASTMCP_WARN_ON_DUPLICATE_PROMPTS=true
VariableDefaultDescription
MCP
FASTMCP_HOST0.0.0.0Bind address
FASTMCP_PORT3000Port
FASTMCP_TRANSPORTstreamable-httpstreamable-http, sse, stdio
ClickHouse
CH_HOSTlocalhostHost
CH_PORT9000Port
CH_USER / CH_PASSWORDdefault / emptyCredentials
CH_DATABASEdefaultDefault DB
CH_SECUREfalseEnable TLS
Observability
LOG_LEVELINFODEBUG, INFO, โ€ฆ
METRICS_ENABLEDtrueExpose /metrics
TRACING_ENABLEDfalseEnable OTLP export
OTLP_ENDPOINTCollector URL
Misc
TEMP_DIR/tmp/mcp-clickhouseTemp files
MAX_UPLOAD_SIZE104857600Upload limit (bytes)

Usage & Integration

STDIO (ideal for local AI assistants)

poetry run mcp-clickhouse run -t stdio

SSE or Streamable-HTTP server

poetry run mcp-clickhouse run -t sse
# or
poetry run mcp-clickhouse run -t streamable-http

Claude Desktop snippet

{
  "mcpServers": {
    "clickhouse": {
      "command": "mcp-clickhouse",
      "args": ["-t", "stdio"],
      "env": {
        "CH_HOST": "localhost",
        "CH_PORT": "9000"
      }
    }
  }
}

(Replace command with docker run --rm -i mcp/clickhouse run -t stdio if you prefer Docker.)

Claude Desktop (SSE)

If you prefer to run the server in SSE mode (HTTP long-poll, supports multiple concurrent clients):

{
  "mcpServers": {
    "clickhouse": {
      "type": "sse",
      "url": "http://localhost:8000/sse"
    }
  }
}

First start the server separately:

# Local (Poetry)
poetry run mcp-clickhouse run -t sse

# Or via Docker
docker run --rm -p 8000:8000 \
  -e CH_HOST=host.docker.internal \
  mcp/clickhouse run -t sse

AI Assistant Usage Examples

Usage Examples

This section provides comprehensive usage examples, from basic operations to AI assistant features, helping you quickly get started and fully utilize all MCP-ClickHouse capabilities.

Basic Operations

Quick Query - System Information and Basic Operations
from mcp.client.quick import call_tool

# Get ClickHouse server information
server_info = call_tool(
    "http://localhost:3000/mcp",
    "server_info",
    {}
)
print("=== Server Information ===")
print(f"Version: {server_info.get('version')}")
print(f"Uptime: {server_info.get('uptime')}")

# Execute basic queries
current_status = call_tool(
    "http://localhost:3000/mcp",
    "query",
    {
        "sql": "SELECT now() as current_time, version() as version, uptime() as uptime",
        "format": "JSONEachRow"
    }
)
print("\n=== Current Status ===")
for row in current_status:
    print(f"Time: {row[0]}")
    print(f"Version: {row[1]}")
    print(f"Uptime: {row[2]} seconds")

# List available databases
databases = call_tool(
    "http://localhost:3000/mcp",
    "query",
    {"sql": "SHOW DATABASES"}
)
print(f"\n=== Available Databases ({len(databases)} total) ===")
for db in databases:
    print(f"- {db[0]}")

Advanced Operations

Complete Data Workflow - Create, Insert, Query, Analyze
import asyncio
import random
import datetime as dt
from mcp.client.session import ClientSession
from mcp.client.streamable_http import streamablehttp_client

async def complete_data_workflow():
    """Demonstrate complete data operations workflow"""
    async with streamablehttp_client("http://localhost:3000/mcp") as (r, w, _):
        async with ClientSession(r, w) as session:
            await session.initialize()
            
            # 1. Create database
            print("=== 1. Creating Test Database ===")
            await session.call_tool("create_database", {
                "database": "demo_analytics"
            })
            
            # 2. Create table
            print("=== 2. Creating User Events Table ===")
            await session.call_tool("create_table", {
                "database": "demo_analytics",
                "table": "user_events",
                "engine": "MergeTree",
                "columns": [
                    {"name": "event_time", "type": "DateTime"},
                    {"name": "user_id", "type": "UInt32"},
                    {"name": "event_type", "type": "String"},
                    {"name": "page_url", "type": "String"},
                    {"name": "session_duration", "type": "UInt32"}
                ],
                "order_by": ["event_time", "user_id"],
                "partition_by": "toYYYYMM(event_time)"
            })
            
            # 3. Generate and insert test data
            print("=== 3. Inserting Test Data ===")
            events = []
            event_types = ['login', 'view', 'click', 'purchase', 'logout']
            pages = ['/home', '/product', '/cart', '/checkout', '/profile']
            
            for i in range(1000):
                events.append({
                    "event_time": (dt.datetime.now() - dt.timedelta(
                        minutes=random.randint(0, 1440)
                    )).strftime('%Y-%m-%d %H:%M:%S'),
                    "user_id": random.randint(1, 100),
                    "event_type": random.choice(event_types),
                    "page_url": random.choice(pages),
                    "session_duration": random.randint(10, 3600)
                })
            
            await session.call_tool("insert", {
                "database": "demo_analytics",
                "table": "user_events",
                "rows": events
            })
            print(f"Successfully inserted {len(events)} records")
            
            # 4. Data verification and basic analysis
            print("=== 4. Data Verification and Analysis ===")
            
            # Total record count
            total_count = await session.call_tool("query", {
                "sql": "SELECT count() FROM demo_analytics.user_events"
            })
            print(f"Total records: {total_count[0][0]:,}")
            
            # User activity analysis
            user_activity = await session.call_tool("query", {
                "sql": """
                SELECT
                    event_type,
                    count() as event_count,
                    uniq(user_id) as unique_users,
                    avg(session_duration) as avg_duration
                FROM demo_analytics.user_events
                GROUP BY event_type
                ORDER BY event_count DESC
                """
            })
            
            print("\nUser Activity Analysis:")
            print("Event Type\t\tEvents\t\tUnique Users\tAvg Duration")
            print("-" * 65)
            for row in user_activity:
                print(f"{row[0]:<12}\t{row[1]:<8}\t{row[2]:<12}\t{row[3]:.1f}s")
            
            return session, "demo_analytics", "user_events"

# Run basic workflow
session, database, table = asyncio.run(complete_data_workflow())
Resource Discovery and System Monitoring
async def advanced_features_demo():
    """Demonstrate advanced features: resource discovery, monitoring and optimization"""
    async with streamablehttp_client("http://localhost:3000/mcp") as (r, w, _):
        async with ClientSession(r, w) as session:
            await session.initialize()
            
            # 1. Use resource discovery features
            print("=== 1. Database Schema Discovery ===")
            
            # Get schema information for all databases
            schema_info = await session.get_resource("resource://schema")
            print("Available database schemas:")
            print(schema_info.get("content", ""))
            # Get table sample data (with time filter)
            sample = await session.get_resource(
                "resource://data/demo_analytics/user_events/sample"
                "?start_time=2024-07-01T00:00:00"
                "&end_time=2024-07-01T23:59:59"
                "&time_column=event_time"
                "&limit=5"
            )
            print("Table sample data (with time range):")
            print(sample)
            
            # 2. System performance monitoring
            print("\n=== 2. System Performance Monitoring ===")
            
            # Query current system load
            system_metrics = await session.call_tool("query", {
                "sql": """
                SELECT
                    'CPU Cores' as metric,
                    toString(value) as value
                FROM system.asynchronous_metrics
                WHERE metric = 'jemalloc.background_thread.num_threads'
                UNION ALL
                SELECT
                    'Memory Usage',
                    formatReadableSize(value)
                FROM system.asynchronous_metrics
                WHERE metric = 'MemoryTracking'
                """
            })
            
            print("System resource status:")
            for metric in system_metrics:
                print(f"- {metric[0]}: {metric[1]}")

asyncio.run(advanced_features_demo())

AI Assistant Features

Based on the above basic operations, you can use AI assistants to get professional ClickHouse consulting advice:

Table Analysis - Deep Structure and Performance Analysis
import asyncio
import time
from mcp.client.session import ClientSession
from mcp.client.streamable_http import streamablehttp_client

async def analyze_table_performance():
    """Analyze table structure, data distribution and performance bottlenecks"""
    async with streamablehttp_client("http://localhost:3000/mcp") as (r, w, _):
        async with ClientSession(r, w) as session:
            await session.initialize()
            
            # Get professional table analysis recommendations
            result = await session.call_tool("prompts/get", {
                "name": "analyze_table",
                "arguments": {
                    "database": "ecommerce",
                    "table": "order_events",
                    "sample_size": "10000"  # Analyze 10k sample records
                }
            })
            
            print("=== Table Analysis Report ===")
            print(result[0]["content"]["text"])
            
            # Verify recommendations with actual queries
            stats = await session.call_tool("query", {
                "sql": "SELECT count(), formatReadableSize(sum(data_compressed_bytes)) FROM system.parts WHERE database='ecommerce' AND table='order_events'"
            })
            print(f"Current table stats: {stats}")

# Run analysis
asyncio.run(analyze_table_performance())

Query Optimization - Intelligent SQL Performance Tuning

async def optimize_slow_query():
    """Get query optimization suggestions and verify improvements"""
    async with streamablehttp_client("http://localhost:3000/mcp") as (r, w, _):
        async with ClientSession(r, w) as session:
            await session.initialize()
            
            slow_query = """
            SELECT
                user_id,
                COUNT(*) as event_count,
                AVG(session_duration) as avg_duration
            FROM user_events
            WHERE event_date >= '2024-01-01'
                AND event_type IN ('login', 'purchase', 'view')
            GROUP BY user_id
            HAVING COUNT(*) > 10
            ORDER BY event_count DESC
            LIMIT 100
            """
            
            # Get optimization recommendations
            optimization = await session.call_tool("prompts/get", {
                "name": "optimize_query",
                "arguments": {
                    "query": slow_query,
                    "context": "User behavior analytics report, executed hourly, currently takes over 30 seconds"
                }
            })
            
            print("=== Query Optimization Recommendations ===")
            print(optimization[0]["content"]["text"])
            
            # Test original query performance
            print("\n=== Original Query Performance ===")
            import time
            start_time = time.time()
            result = await session.call_tool("query", {"sql": slow_query})
            execution_time = time.time() - start_time
            print(f"Execution time: {execution_time:.2f}s, Result rows: {len(result)}")

asyncio.run(optimize_slow_query())

Schema Design - Optimal Architecture for Business Scenarios

async def design_analytics_schema():
    """Design database schema for specific business scenarios"""
    async with streamablehttp_client("http://localhost:3000/mcp") as (r, w, _):
        async with ClientSession(r, w) as session:
            await session.initialize()
            
            # Get schema design recommendations
            schema_design = await session.call_tool("prompts/get", {
                "name": "design_schema",
                "arguments": {
                    "use_case": "E-commerce platform real-time analytics system supporting user behavior tracking, order analysis, product recommendations, and live dashboards",
                    "data_volume": "1M daily active users, 50M events per day, ~200GB monthly growth",
                    "query_patterns": "Real-time user profiling, product sales trends, funnel analysis, live monitoring dashboard aggregations"
                }
            })
            
            print("=== Database Schema Design Plan ===")
            print(schema_design[0]["content"]["text"])
            
            # Create example tables based on recommendations
            print("\n=== Creating Example Table Structure ===")
            # Execute recommended CREATE TABLE statements here

asyncio.run(design_analytics_schema())

Performance Troubleshooting - System Performance Issue Diagnosis

async def diagnose_performance_issues():
    """Diagnose and resolve ClickHouse performance issues"""
    async with streamablehttp_client("http://localhost:3000/mcp") as (r, w, _):
        async with ClientSession(r, w) as session:
            await session.initialize()
            
            # Collect system information first
            system_info = await session.call_tool("query", {
                "sql": """
                SELECT
                    'CPU Usage' as metric,
                    toString(round(avg(ProfileEvent_OSCPUVirtualTimeMicroseconds)/1000000, 2)) as value
                FROM system.metric_log
                WHERE event_time >= now() - INTERVAL 1 HOUR
                UNION ALL
                SELECT
                    'Memory Usage',
                    formatReadableSize(max(CurrentMetric_MemoryTracking))
                FROM system.metric_log
                WHERE event_time >= now() - INTERVAL 1 HOUR
                """
            })
            
            # Get performance diagnosis recommendations
            troubleshooting = await session.call_tool("prompts/get", {
                "name": "troubleshoot_performance",
                "arguments": {
                    "issue_description": "Large table aggregation queries increased from 5s to 60s response time, memory usage continuously rising, occasional query timeouts",
                    "slow_query": """
                    SELECT
                        toYYYYMM(order_date) as month,
                        category,
                        sum(amount) as total_sales,
                        count() as order_count
                    FROM orders
                    WHERE order_date >= '2023-01-01'
                    GROUP BY month, category
                    ORDER BY month DESC, total_sales DESC
                    """
                }
            })
            
            print("=== Current System Status ===")
            for row in system_info:
                print(f"{row[0]}: {row[1]}")
                
            print("\n=== Performance Issue Diagnosis Report ===")
            print(troubleshooting[0]["content"]["text"])

asyncio.run(diagnose_performance_issues())

Migration Planning - Complete Database Migration Workflow

async def plan_database_migration():
    """Create migration plan from other databases to ClickHouse"""
    async with streamablehttp_client("http://localhost:3000/mcp") as (r, w, _):
        async with ClientSession(r, w) as session:
            await session.initialize()
            
            # Get migration planning recommendations
            migration_plan = await session.call_tool("prompts/get", {
                "name": "plan_migration",
                "arguments": {
                    "source_system": "MySQL 8.0 cluster with order system (50GB), user system (20GB), logging system (500GB)",
                    "data_size": "Total 570GB historical data, 2GB daily growth, peak 5000 QPS",
                    "requirements": "Zero downtime migration, maintain strong data consistency, 10x query performance improvement, real-time analytics support"
                }
            })
            
            print("=== Database Migration Plan ===")
            print(migration_plan[0]["content"]["text"])
            
            # Verify source system connectivity and target system readiness
            print("\n=== Pre-migration Environment Check ===")
            ch_version = await session.call_tool("query", {
                "sql": "SELECT version()"
            })
            print(f"Target ClickHouse version: {ch_version[0][0]}")
            
            # Check available storage space
            storage_info = await session.call_tool("query", {
                "sql": """
                SELECT
                    formatReadableSize(free_space) as free_space,
                    formatReadableSize(total_space) as total_space
                FROM system.disks
                WHERE name = 'default'
                """
            })
            print(f"Available storage: {storage_info[0][0]} / {storage_info[0][1]}")
# Run complete demonstration
asyncio.run(complete_data_workflow())

Development Guide

# create venv & install deps
poetry install

# run lint & type-check
ruff format .
ruff check . --fix

# run tests
poetry run pytest -v

Releasing

docker build -t mcp/clickhouse .
docker run --rm -p 3000:3000 mcp/clickhouse run

Contributing

Bugs, features or docs improvements welcome!
Open an issue or submit a PR โ€“ make sure pytest, black, isort, mypy pass.


License

Apache 2.0 ยฉ 2025 San Francisco AI Factory