ai-spark-mcp-server
# Spark MCP (Model Context Protocol) Optimizer
This project implements a Model Context Protocol (MCP) server and client for optimizing Apache Spark code. The system provides intelligent code optimization suggestions and performance analysis through a client-server architecture.
## How It Works

### Code Optimization Workflow
```mermaid
graph TB
    subgraph Input
        A[Input PySpark Code] --> |spark_code_input.py| B[run_client.py]
    end
    subgraph MCP Client
        B --> |Async HTTP| C[SparkMCPClient]
        C --> |Protocol Handler| D[Tools Interface]
    end
    subgraph MCP Server
        E[run_server.py] --> F[SparkMCPServer]
        F --> |Tool Registry| G[optimize_spark_code]
        F --> |Tool Registry| H[analyze_performance]
        F --> |Protocol Handler| I[Claude AI Integration]
    end
    subgraph Resources
        I --> |Code Analysis| J[Claude AI Model]
        J --> |Optimization| K[Optimized Code Generation]
        K --> |Validation| L[PySpark Runtime]
    end
    subgraph Output
        M[optimized_spark_code.py]
        N[performance_analysis.md]
    end
    D --> |MCP Request| F
    G --> |Generate| M
    H --> |Generate| N

    classDef client fill:#e1f5fe,stroke:#01579b
    classDef server fill:#f3e5f5,stroke:#4a148c
    classDef resource fill:#e8f5e9,stroke:#1b5e20
    classDef output fill:#fff3e0,stroke:#e65100
    class A,B,C,D client
    class E,F,G,H,I server
    class J,K,L resource
    class M,N output
```
### Component Details

- **Input Layer**
  - `spark_code_input.py`: Source PySpark code for optimization
  - `run_client.py`: Client startup and configuration
- **MCP Client Layer**
  - Tools Interface: Protocol-compliant tool invocation
- **MCP Server Layer**
  - `run_server.py`: Server initialization
  - Tool Registry: Optimization and analysis tools
  - Protocol Handler: MCP request/response management
- **Resource Layer**
  - Claude AI: Code analysis and optimization
  - PySpark Runtime: Code execution and validation
- **Output Layer**
  - `optimized_spark_code.py`: Optimized code
  - `performance_analysis.md`: Detailed analysis
This workflow illustrates:
- Input PySpark code submission
- MCP protocol handling and routing
- Claude AI analysis and optimization
- Code transformation and validation
- Performance analysis and reporting
## Architecture
This project follows the Model Context Protocol architecture for standardized AI model interactions:
```
┌──────────────────┐      ┌──────────────────┐      ┌──────────────────┐
│                  │      │   MCP Server     │      │    Resources     │
│    MCP Client    │      │ (SparkMCPServer) │      │                  │
│ (SparkMCPClient) │      │                  │      │ ┌──────────────┐ │
│                  │      │  ┌─────────┐     │      │ │  Claude AI   │ │
│  ┌─────────┐     │      │  │  Tools  │     │ <──> │ │    Model     │ │
│  │  Tools  │     │      │  │Registry │     │      │ └──────────────┘ │
│  │Interface│     │ <──> │  └─────────┘     │      │                  │
│  └─────────┘     │      │  ┌─────────┐     │      │ ┌──────────────┐ │
│                  │      │  │Protocol │     │      │ │   PySpark    │ │
│                  │      │  │ Handler │     │      │ │   Runtime    │ │
│                  │      │  └─────────┘     │      │ └──────────────┘ │
└──────────────────┘      └──────────────────┘      └──────────────────┘
        │                        │                          │
        v                        v                          v
┌──────────────┐          ┌──────────────┐          ┌──────────────┐
│  Available   │          │  Registered  │          │   External   │
│    Tools     │          │    Tools     │          │  Resources   │
├──────────────┤          ├──────────────┤          ├──────────────┤
│optimize_code │          │optimize_code │          │ Claude API   │
│analyze_perf  │          │analyze_perf  │          │ Spark Engine │
└──────────────┘          └──────────────┘          └──────────────┘
```
### Components

- **MCP Client**
  - Provides tool interface for code optimization
  - Handles async communication with server
  - Manages file I/O for code generation
- **MCP Server**
  - Implements MCP protocol handlers
  - Manages tool registry and execution
  - Coordinates between client and resources
- **Resources**
  - Claude AI: Provides code optimization intelligence
  - PySpark Runtime: Executes and validates optimizations
### Protocol Flow

1. Client sends an optimization request via the MCP protocol
2. Server validates the request and invokes the appropriate tool
3. The tool uses Claude AI for optimization
4. Optimized code is returned via the MCP response
5. Client saves and validates the optimized code
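The flow above rides on JSON-RPC 2.0, which MCP uses as its wire format. The sketch below shows the shape of a `tools/call` exchange; the tool name matches this project's registry, but the argument values and the result payload are illustrative, not the exact messages the server emits.

```python
import json

# Step 1: the client's optimization request (JSON-RPC 2.0, method "tools/call")
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {
        "name": "optimize_spark_code",
        "arguments": {"code": "df.groupBy('dept').count()"},  # illustrative code
    },
}

# Step 4: the server's response, carrying the optimized code as text content
response = {
    "jsonrpc": "2.0",
    "id": 1,  # must echo the request id so the client can match them up
    "result": {"content": [{"type": "text", "text": "# optimized code ..."}]},
}

print(json.dumps(request, indent=2))
```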
## End-to-End Functionality

```mermaid
sequenceDiagram
    participant U as User
    participant C as MCP Client
    participant S as MCP Server
    participant AI as Claude AI
    participant P as PySpark Runtime
    U->>C: Submit Spark Code
    C->>S: Send Optimization Request
    S->>AI: Analyze Code
    AI-->>S: Optimization Suggestions
    S->>C: Return Optimized Code
    C->>P: Run Original Code
    C->>P: Run Optimized Code
    P-->>C: Execution Results
    C->>C: Generate Analysis
    C-->>U: Final Report
```
1. **Code Submission**
   - User places PySpark code in `v1/input/spark_code_input.py`
   - Code is read by the MCP client
2. **Optimization Process**
   - MCP client connects to the server via the standardized protocol
   - Server forwards code to Claude AI for analysis
   - AI suggests optimizations based on best practices
   - Server validates and processes the suggestions
3. **Code Generation**
   - Optimized code is saved to `v1/output/optimized_spark_code.py`
   - Includes detailed comments explaining the optimizations
   - Maintains the original code structure while improving performance
4. **Performance Analysis**
   - Both versions are executed in the PySpark runtime
   - Execution times are compared
   - Results are validated for correctness
   - Metrics are collected and analyzed
5. **Results Generation**
   - Comprehensive analysis in `v1/output/performance_analysis.md`
   - Side-by-side execution comparison
   - Performance improvement statistics
   - Optimization explanations and rationale
## Usage

### Requirements

- Python 3.8+
- PySpark 3.2.0+
- Anthropic API Key (for Claude AI)

### Installation

```bash
pip install -r requirements.txt
```
### Quick Start

1. Add the Spark code you want to optimize to `input/spark_code_input.py`.

2. Start the MCP server:

   ```bash
   python v1/run_server.py
   ```

3. Run the client to optimize your code:

   ```bash
   python v1/run_client.py
   ```

   This generates two files:
   - `output/optimized_spark_example.py`: The optimized Spark code with detailed optimization comments
   - `output/performance_analysis.md`: Comprehensive performance analysis

4. Run and compare the two code versions:

   ```bash
   python v1/run_optimized.py
   ```

   This will:
   - Execute both the original and optimized code
   - Compare execution times and results
   - Update the performance analysis with execution metrics
   - Show detailed performance improvement statistics
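The compare step in `run_optimized.py` boils down to timing both versions, checking that they agree, and reporting the improvement. A minimal sketch of that logic in plain Python, with hypothetical stand-in functions for the two code versions:

```python
import time

def time_run(fn, *args):
    """Run fn once and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - start

# Hypothetical stand-ins for the original and optimized Spark jobs
def original(n):
    return sum(i * i for i in range(n))

def optimized(n):
    return (n - 1) * n * (2 * n - 1) // 6  # closed form for sum of squares 0..n-1

res_a, t_a = time_run(original, 100_000)
res_b, t_b = time_run(optimized, 100_000)

# Validate correctness before comparing speed: an "optimization" that
# changes the result is a bug, not an improvement
assert res_a == res_b
improvement = (t_a - t_b) / t_a * 100
print(f"original: {t_a:.4f}s, optimized: {t_b:.4f}s, improvement: {improvement:.1f}%")
```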
## Project Structure

```
ai-mcp/
├── input/
│   └── spark_code_input.py          # Original Spark code to optimize
├── output/
│   ├── optimized_spark_example.py   # Generated optimized code
│   └── performance_analysis.md      # Detailed performance comparison
├── spark_mcp/
│   ├── client.py                    # MCP client implementation
│   └── server.py                    # MCP server implementation
├── run_client.py                    # Client script to optimize code
├── run_server.py                    # Server startup script
└── run_optimized.py                 # Script to run and compare code versions
```
## Why MCP?
The Model Context Protocol (MCP) provides several key advantages for Spark code optimization:
### Direct Claude AI Call vs MCP Server
| Aspect | Direct Claude AI Call | MCP Server |
|---|---|---|
| Integration | • Custom integration per team<br>• Manual response handling<br>• Duplicate implementations | • Pre-built client libraries<br>• Automated workflows<br>• Unified interfaces |
| Infrastructure | • No built-in validation<br>• No result persistence<br>• Manual tracking | • Automatic validation<br>• Result persistence<br>• Version control |
| Context | • Basic code suggestions<br>• No execution context<br>• Limited optimization scope | • Context-aware optimization<br>• Full execution history<br>• Comprehensive improvements |
| Validation | • Manual testing required<br>• No performance metrics<br>• Uncertain outcomes | • Automated testing<br>• Performance metrics<br>• Validated results |
| Workflow | • Ad-hoc process<br>• No standardization<br>• Manual intervention needed | • Structured process<br>• Standard protocols<br>• Automated pipeline |
Key Differences:

#### 1. AI Integration

| Approach | Code Example | Benefits |
|---|---|---|
| Traditional | `client = anthropic.Client(api_key)`<br>`response = client.messages.create(...)` | • Complex setup<br>• Custom error handling<br>• Tight coupling |
| MCP | `client = SparkMCPClient()`<br>`result = await client.optimize_spark_code(code)` | • Simple interface<br>• Built-in validation<br>• Loose coupling |

#### 2. Tool Management

| Approach | Code Example | Benefits |
|---|---|---|
| Traditional | `class SparkOptimizer:`<br>`    def register_tool(self, name, func):`<br>`        self.tools[name] = func` | • Manual registration<br>• No validation<br>• Complex maintenance |
| MCP | `@register_tool("optimize_spark_code")`<br>`async def optimize_spark_code(code: str):` | • Auto-discovery<br>• Type checking<br>• Easy extension |

#### 3. Resource Management

| Approach | Code Example | Benefits |
|---|---|---|
| Traditional | `def __init__(self):`<br>`    self.claude = init_claude()`<br>`    self.spark = init_spark()` | • Manual orchestration<br>• Manual cleanup<br>• Error-prone |
| MCP | `@requires_resources(["claude_ai", "spark"])`<br>`async def optimize_spark_code(code: str):` | • Auto-coordination<br>• Lifecycle management<br>• Error handling |

#### 4. Communication Protocol

| Approach | Code Example | Benefits |
|---|---|---|
| Traditional | `{"type": "request",`<br>` "payload": {"code": code}}` | • Custom format<br>• Manual validation<br>• Custom debugging |
| MCP | `{"method": "tools/call",`<br>` "params": {"name": "optimize_code"}}` | • Standard format<br>• Auto-validation<br>• Easy debugging |
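To make the tool-management contrast concrete, here is a minimal sketch of decorator-based registration. The `register_tool` name mirrors the decorator shown in the table; the registry dict and the dispatch helper are illustrative, not the project's actual internals.

```python
import asyncio
import inspect

TOOL_REGISTRY = {}

def register_tool(name):
    """Register an async tool under a name, capturing its signature."""
    def decorator(func):
        # Auto-discovery: store the coroutine plus its signature so calls
        # can be validated before dispatch
        TOOL_REGISTRY[name] = {"func": func, "signature": inspect.signature(func)}
        return func
    return decorator

@register_tool("optimize_spark_code")
async def optimize_spark_code(code: str) -> str:
    return code  # placeholder: a real tool would call the AI model here

async def call_tool(name, **kwargs):
    entry = TOOL_REGISTRY[name]
    entry["signature"].bind(**kwargs)  # arity check: raises on bad arguments
    return await entry["func"](**kwargs)

print(asyncio.run(call_tool("optimize_spark_code", code="df.count()")))
```

Compared with a hand-rolled `self.tools[name] = func`, the decorator keeps registration next to the tool definition and gives one central place to add validation.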
## Features
- Intelligent Code Optimization: Leverages Claude AI to analyze and optimize PySpark code
- Performance Analysis: Provides detailed analysis of performance differences between original and optimized code
- MCP Architecture: Implements the Model Context Protocol for standardized AI model interactions
- Easy Integration: Simple client interface for code optimization requests
- Code Generation: Automatically saves optimized code to separate files
## Advanced Usage

You can also use the client programmatically:

```python
import asyncio

from spark_mcp.client import SparkMCPClient

async def main():
    # Connect to the MCP server
    client = SparkMCPClient()
    await client.connect()

    # Your Spark code to optimize
    spark_code = '''
    # Your PySpark code here
    '''

    # Get optimized code with performance analysis
    optimized_code = await client.optimize_spark_code(
        code=spark_code,
        optimization_level="advanced",
        save_to_file=True,  # Save to output/optimized_spark_example.py
    )

    # Analyze performance differences
    analysis = await client.analyze_performance(
        original_code=spark_code,
        optimized_code=optimized_code,
        save_to_file=True,  # Save to output/performance_analysis.md
    )

    # To run both versions and compare, use the run_optimized.py script
    # or implement your own comparison
    await client.close()

asyncio.run(main())
```
## Example Input and Output

The repository includes an example workflow:

1. Input code (`input/spark_code_input.py`):

   ```python
   # Create DataFrames and join
   emp_df = spark.createDataFrame(employees, ["id", "name", "age", "dept", "salary"])
   dept_df = spark.createDataFrame(departments, ["dept", "location", "budget"])

   # Join and analyze
   result = emp_df.join(dept_df, "dept") \
       .groupBy("dept", "location") \
       .agg({"salary": "avg", "age": "avg", "id": "count"}) \
       .orderBy("dept")
   ```

2. Optimized code (`output/optimized_spark_example.py`):

   ```python
   # Performance-optimized version with caching and improved configurations
   spark = SparkSession.builder \
       .appName("EmployeeAnalysis") \
       .config("spark.sql.shuffle.partitions", 200) \
       .getOrCreate()

   # Create and cache DataFrames
   emp_df = spark.createDataFrame(employees, ["id", "name", "age", "dept", "salary"]).cache()
   dept_df = spark.createDataFrame(departments, ["dept", "location", "budget"]).cache()

   # Optimized join and analysis
   result = emp_df.join(dept_df, "dept") \
       .groupBy("dept", "location") \
       .agg(
           avg("salary").alias("avg_salary"),
           avg("age").alias("avg_age"),
           count("id").alias("employee_count")
       ) \
       .orderBy("dept")
   ```

3. Performance analysis (`output/performance_analysis.md`):

   ```markdown
   ## Execution Results Comparison

   ### Timing Comparison
   - Original Code: 5.18 seconds
   - Optimized Code: 0.65 seconds
   - Performance Improvement: 87.4%

   ### Optimization Details
   - Caching frequently used DataFrames
   - Optimized shuffle partitions
   - Improved column expressions
   - Better memory management
   ```
## Project Structure

```
ai-mcp/
├── spark_mcp/
│   ├── __init__.py
│   ├── client.py                    # MCP client implementation
│   └── server.py                    # MCP server implementation
├── examples/
│   ├── optimize_code.py             # Example usage
│   └── optimized_spark_example.py   # Generated optimized code
├── requirements.txt
└── run_server.py                    # Server startup script
```
## Available Tools

- **optimize_spark_code**
  - Optimizes PySpark code for better performance
  - Supports basic and advanced optimization levels
  - Automatically saves optimized code to `examples/optimized_spark_example.py`
- **analyze_performance**
  - Analyzes performance differences between original and optimized code
  - Provides insights on:
    - Performance improvements
    - Resource utilization
    - Scalability considerations
    - Potential trade-offs
## Environment Variables

- `ANTHROPIC_API_KEY`: Your Anthropic API key for Claude AI
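A missing key is easiest to catch at startup rather than on the first AI call. This is a hypothetical fail-fast check, not part of the project (the server presumably reads the variable through the Anthropic SDK):

```python
import os

def require_api_key(var="ANTHROPIC_API_KEY"):
    """Return the API key from the environment, or fail with a clear message."""
    key = os.environ.get(var)
    if not key:
        raise RuntimeError(f"{var} is not set; export it before starting the server")
    return key
```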
## Example Optimizations
The system implements various PySpark optimizations including:
- Broadcast joins for small-large table joins
- Efficient window function usage
- Strategic data caching
- Query plan optimizations
- Performance-oriented operation ordering
## Contributing
Feel free to submit issues and enhancement requests!
## License
MIT License