An MCP server that optimizes Apache Spark code using Claude AI, providing intelligent code optimization suggestions and performance analysis.
This project implements a Model Context Protocol (MCP) server and client for optimizing Apache Spark code. The system provides intelligent code optimization suggestions and performance analysis through a client-server architecture.
graph TB
subgraph Input
A[Input PySpark Code] --> |spark_code_input.py| B[run_client.py]
end
subgraph MCP Client
B --> |Async HTTP| C[SparkMCPClient]
C --> |Protocol Handler| D[Tools Interface]
end
subgraph MCP Server
E[run_server.py] --> F[SparkMCPServer]
F --> |Tool Registry| G[optimize_spark_code]
F --> |Tool Registry| H[analyze_performance]
F --> |Protocol Handler| I[Claude AI Integration]
end
subgraph Resources
I --> |Code Analysis| J[Claude AI Model]
J --> |Optimization| K[Optimized Code Generation]
K --> |Validation| L[PySpark Runtime]
end
subgraph Output
M[optimized_spark_code.py]
N[performance_analysis.md]
end
D --> |MCP Request| F
G --> |Generate| M
H --> |Generate| N
classDef client fill:#e1f5fe,stroke:#01579b
classDef server fill:#f3e5f5,stroke:#4a148c
classDef resource fill:#e8f5e9,stroke:#1b5e20
classDef output fill:#fff3e0,stroke:#e65100
class A,B,C,D client
class E,F,G,H,I server
class J,K,L resource
class M,N output
Input Layer
- spark_code_input.py: Source PySpark code for optimization
- run_client.py: Client startup and configuration
MCP Client Layer
MCP Server Layer
- run_server.py: Server initialization
Resource Layer
Output Layer
- optimized_spark_code.py: Optimized code
- performance_analysis.md: Detailed analysis
This workflow illustrates:
This project follows the Model Context Protocol architecture for standardized AI model interactions:
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ │ │ MCP Server │ │ Resources │
│ MCP Client │ │ (SparkMCPServer)│ │ │
│ (SparkMCPClient) │ │ │ │ ┌──────────────┐ │
│ │ │ ┌─────────┐ │ │ │ Claude AI │ │
│ ┌─────────┐ │ │ │ Tools │ │ <──> │ │ Model │ │
│ │ Tools │ │ │ │Registry │ │ │ └──────────────┘ │
│ │Interface│ │ <──> │ └─────────┘ │ │ │
│ └─────────┘ │ │ ┌─────────┐ │ │ ┌──────────────┐ │
│ │ │ │Protocol │ │ │ │ PySpark │ │
│ │ │ │Handler │ │ │ │ Runtime │ │
│ │ │ └─────────┘ │ │ └──────────────┘ │
└──────────────────┘ └──────────────────┘ └──────────────────┘
│ │ │
│ │ │
v v v
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Available │ │ Registered │ │ External │
│ Tools │ │ Tools │ │ Resources │
├──────────────┤ ├──────────────┤ ├──────────────┤
│optimize_code │ │optimize_code │ │ Claude API │
│analyze_perf │ │analyze_perf │ │ Spark Engine │
└──────────────┘ └──────────────┘ └──────────────┘
MCP Client
MCP Server
Resources
sequenceDiagram
participant U as User
participant C as MCP Client
participant S as MCP Server
participant AI as Claude AI
participant P as PySpark Runtime
U->>C: Submit Spark Code
C->>S: Send Optimization Request
S->>AI: Analyze Code
AI-->>S: Optimization Suggestions
S->>C: Return Optimized Code
C->>P: Run Original Code
C->>P: Run Optimized Code
P-->>C: Execution Results
C->>C: Generate Analysis
C-->>U: Final Report
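The sequence in the diagram can be sketched as a small async pipeline. This is only an illustration under stated assumptions: `request_optimization`, `time_run`, and `run_workflow` are hypothetical names, the optimization step is a stub that returns the code unchanged, and the caller supplies the function that actually executes the code.

```python
import asyncio
import time
from typing import Callable, Dict


async def request_optimization(code: str) -> str:
    """Hypothetical stand-in for the client-to-server optimization request."""
    await asyncio.sleep(0)  # placeholder for the network round trip
    return code  # a real server would return rewritten code here


def time_run(run_code: Callable[[str], None], code: str) -> float:
    """Run `code` through the supplied runner and return elapsed seconds."""
    start = time.perf_counter()
    run_code(code)
    return time.perf_counter() - start


async def run_workflow(code: str, run_code: Callable[[str], None]) -> Dict[str, object]:
    """Mirror the diagram: optimize, run both versions, collect timings."""
    optimized = await request_optimization(code)       # client -> server -> model
    original_seconds = time_run(run_code, code)         # run original code
    optimized_seconds = time_run(run_code, optimized)   # run optimized code
    return {
        "optimized_code": optimized,
        "original_seconds": original_seconds,
        "optimized_seconds": optimized_seconds,
    }
```

A caller would pass its own runner, for example `asyncio.run(run_workflow(source, runner))`, where `runner` submits the code to a Spark session.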
Code Submission
- v1/input/spark_code_input.py
Optimization Process
Code Generation
- v1/output/optimized_spark_code.py
Performance Analysis
Results Generation
- v1/output/performance_analysis.md
pip install -r requirements.txt
Add your Spark code to optimize in input/spark_code_input.py
Start the MCP server:
python v1/run_server.py
Run the client:
python v1/run_client.py
This will generate two files:
- output/optimized_spark_example.py: The optimized Spark code with detailed optimization comments
- output/performance_analysis.md: Comprehensive performance analysis
python v1/run_optimized.py
This will:
ai-mcp/
├── input/
│ └── spark_code_input.py # Original Spark code to optimize
├── output/
│ ├── optimized_spark_example.py # Generated optimized code
│ └── performance_analysis.md # Detailed performance comparison
├── spark_mcp/
│ ├── client.py # MCP client implementation
│ └── server.py # MCP server implementation
├── run_client.py # Client script to optimize code
├── run_server.py # Server startup script
└── run_optimized.py # Script to run and compare code versions
The Model Context Protocol (MCP) provides several key advantages for Spark code optimization:
| Aspect | Direct Claude AI Call | MCP Server |
|---|---|---|
| Integration | • Custom integration per team<br>• Manual response handling<br>• Duplicate implementations | • Pre-built client libraries<br>• Automated workflows<br>• Unified interfaces |
| Infrastructure | • No built-in validation<br>• No result persistence<br>• Manual tracking | • Automatic validation<br>• Result persistence<br>• Version control |
| Context | • Basic code suggestions<br>• No execution context<br>• Limited optimization scope | • Context-aware optimization<br>• Full execution history<br>• Comprehensive improvements |
| Validation | • Manual testing required<br>• No performance metrics<br>• Uncertain outcomes | • Automated testing<br>• Performance metrics<br>• Validated results |
| Workflow | • Ad-hoc process<br>• No standardization<br>• Manual intervention needed | • Structured process<br>• Standard protocols<br>• Automated pipeline |
| Approach | Code Example | Benefits |
|---|---|---|
| Traditional | `client = anthropic.Client(api_key)`<br>`response = client.messages.create(...)` | • Complex setup<br>• Custom error handling<br>• Tight coupling |
| MCP | `client = SparkMCPClient()`<br>`result = await client.optimize_spark_code(code)` | • Simple interface<br>• Built-in validation<br>• Loose coupling |
| Approach | Code Example | Benefits |
|---|---|---|
| Traditional | `class SparkOptimizer:`<br>`    def register_tool(self, name, func):`<br>`        self.tools[name] = func` | • Manual registration<br>• No validation<br>• Complex maintenance |
| MCP | `@register_tool("optimize_spark_code")`<br>`async def optimize_spark_code(code: str):` | • Auto-discovery<br>• Type checking<br>• Easy extension |
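The decorator-based registration shown in the MCP row can be sketched roughly as follows. This is a minimal illustration, not the project's implementation: the `TOOL_REGISTRY` dictionary, the body of `register_tool`, the `call_tool` helper, and the placeholder tool body are all assumptions made for the example.

```python
import asyncio
from typing import Awaitable, Callable, Dict

# Hypothetical module-level registry mapping tool names to async handlers.
TOOL_REGISTRY: Dict[str, Callable[..., Awaitable[str]]] = {}


def register_tool(name: str):
    """Return a decorator that records the decorated coroutine under `name`."""
    def decorator(func: Callable[..., Awaitable[str]]):
        if name in TOOL_REGISTRY:
            raise ValueError(f"tool already registered: {name}")
        TOOL_REGISTRY[name] = func
        return func
    return decorator


@register_tool("optimize_spark_code")
async def optimize_spark_code(code: str) -> str:
    # Placeholder body: a real tool would send the code to the model.
    return code.strip()


async def call_tool(name: str, **arguments) -> str:
    """Look up a registered tool by name and await it."""
    if name not in TOOL_REGISTRY:
        raise KeyError(f"unknown tool: {name}")
    return await TOOL_REGISTRY[name](**arguments)
```

With this shape, adding a tool means writing one decorated coroutine; the dispatcher in `call_tool` does not change.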
| Approach | Code Example | Benefits |
|---|---|---|
| Traditional | `def __init__(self):`<br>`    self.claude = init_claude()`<br>`    self.spark = init_spark()` | • Manual orchestration<br>• Manual cleanup<br>• Error-prone |
| MCP | `@requires_resources(["claude_ai", "spark"])`<br>`async def optimize_spark_code(code: str):` | • Auto-coordination<br>• Lifecycle management<br>• Error handling |
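One way a `requires_resources` decorator could work is to check a table of initialized resources before the tool body runs. The sketch below assumes that design; `AVAILABLE_RESOURCES`, its placeholder values, and the decorator body are hypothetical and are not taken from the project's code.

```python
import asyncio
import functools
from typing import Dict, List

# Hypothetical table of initialized resources. A real server would hold an
# API client and a SparkSession here instead of placeholder strings.
AVAILABLE_RESOURCES: Dict[str, object] = {"claude_ai": "client", "spark": "session"}


def requires_resources(names: List[str]):
    """Return a decorator that refuses to run the tool if a resource is missing."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            missing = [n for n in names if n not in AVAILABLE_RESOURCES]
            if missing:
                raise RuntimeError(f"missing resources: {', '.join(missing)}")
            return await func(*args, **kwargs)
        return wrapper
    return decorator


@requires_resources(["claude_ai", "spark"])
async def optimize_spark_code(code: str) -> str:
    # Placeholder body standing in for the real optimization call.
    return code
```

The check runs on every call, so a resource that is torn down later produces a clear error rather than a failure deep inside the tool.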
| Approach | Code Example | Benefits |
|---|---|---|
| Traditional | `{"type": "request", "payload": {"code": code}}` | • Custom format<br>• Manual validation<br>• Custom debugging |
| MCP | `{"method": "tools/call", "params": {"name": "optimize_code"}}` | • Standard format<br>• Auto-validation<br>• Easy debugging |
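MCP messages are JSON-RPC 2.0, so a complete `tools/call` request also carries a `jsonrpc` version, an `id`, and an `arguments` object alongside the tool name. A minimal sketch of building one follows; the helper name `build_tool_call` and the sample argument are illustrative, and the tool name is taken from this README's tool list.

```python
import json
from typing import Any, Dict


def build_tool_call(request_id: int, tool_name: str, arguments: Dict[str, Any]) -> str:
    """Serialize a JSON-RPC 2.0 request for the MCP `tools/call` method."""
    message = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": tool_name, "arguments": arguments},
    }
    return json.dumps(message)


# Example request asking the server to optimize a one-line snippet.
request = build_tool_call(1, "optimize_spark_code", {"code": "df.show()"})
```

Because every tool call shares this envelope, a server can validate and log requests in one place before dispatching on `params.name`.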
You can also use the client programmatically:
import asyncio

from spark_mcp.client import SparkMCPClient


async def main():
    # Connect to the MCP server
    client = SparkMCPClient()
    await client.connect()

    # Your Spark code to optimize
    spark_code = '''
    # Your PySpark code here
    '''

    # Get optimized code with performance analysis
    optimized_code = await client.optimize_spark_code(
        code=spark_code,
        optimization_level="advanced",
        save_to_file=True,  # Save to output/optimized_spark_example.py
    )

    # Analyze performance differences
    analysis = await client.analyze_performance(
        original_code=spark_code,
        optimized_code=optimized_code,
        save_to_file=True,  # Save to output/performance_analysis.md
    )

    # Run both versions and compare
    # You can use the run_optimized.py script or implement your own comparison

    await client.close()


asyncio.run(main())
The repository includes an example workflow:
Input code (input/spark_code_input.py):
# Create DataFrames and join
emp_df = spark.createDataFrame(employees, ["id", "name", "age", "dept", "salary"])
dept_df = spark.createDataFrame(departments, ["dept", "location", "budget"])

# Join and analyze
result = emp_df.join(dept_df, "dept") \
    .groupBy("dept", "location") \
    .agg({"salary": "avg", "age": "avg", "id": "count"}) \
    .orderBy("dept")
Optimized code (output/optimized_spark_example.py):
# Performance-optimized version with caching and improved configurations
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, count

# Note: 200 is also Spark's default for spark.sql.shuffle.partitions;
# tune this value to the size of your data.
spark = SparkSession.builder \
    .appName("EmployeeAnalysis") \
    .config("spark.sql.shuffle.partitions", 200) \
    .getOrCreate()

# Create and cache DataFrames
emp_df = spark.createDataFrame(employees, ["id", "name", "age", "dept", "salary"]).cache()
dept_df = spark.createDataFrame(departments, ["dept", "location", "budget"]).cache()

# Optimized join and analysis
result = emp_df.join(dept_df, "dept") \
    .groupBy("dept", "location") \
    .agg(
        avg("salary").alias("avg_salary"),
        avg("age").alias("avg_age"),
        count("id").alias("employee_count")
    ) \
    .orderBy("dept")
Performance analysis (output/performance_analysis.md):
## Execution Results Comparison
### Timing Comparison
- Original Code: 5.18 seconds
- Optimized Code: 0.65 seconds
- Performance Improvement: 87.4%
### Optimization Details
- Caching frequently used DataFrames
- Optimized shuffle partitions
- Improved column expressions
- Better memory management
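The improvement figure in the timing comparison reads as a relative reduction in run time. A minimal sketch of that calculation, assuming the report uses (original - optimized) / original; the function name is illustrative. With the rounded timings listed in the sample report it evaluates to about 87.45%, in line with the reported 87.4%, which was presumably derived from unrounded measurements.

```python
def improvement_percent(original_seconds: float, optimized_seconds: float) -> float:
    """Relative reduction in run time, as a percentage of the original."""
    if original_seconds <= 0:
        raise ValueError("original_seconds must be positive")
    return (original_seconds - optimized_seconds) / original_seconds * 100.0


# Timings taken from the sample report.
print(f"{improvement_percent(5.18, 0.65):.2f}%")
```

Single-run timings on a small local dataset are noisy, so averaging several runs of each version gives a more trustworthy comparison.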
ai-mcp/
├── spark_mcp/
│ ├── __init__.py
│ ├── client.py # MCP client implementation
│ └── server.py # MCP server implementation
├── examples/
│ ├── optimize_code.py # Example usage
│ └── optimized_spark_example.py # Generated optimized code
├── requirements.txt
└── run_server.py # Server startup script
optimize_spark_code
analyze_performance
- ANTHROPIC_API_KEY: Your Anthropic API key for Claude AI
The system implements various PySpark optimizations including:
Feel free to submit issues and enhancement requests!
MIT License