loading…
Search for a command to run...
loading…
MCP server for Apache Kafka that allows LLM agents to inspect topics, consumer groups, and safely manage offsets (reset, rewind).
MCP server for Apache Kafka that allows LLM agents to inspect topics, consumer groups, and safely manage offsets (reset, rewind).
An MCP server implementation for Kafka, allowing LLMs to interact with and manage Kafka clusters.
describe_cluster, describe_brokers.list_topics, create create_topic, delete delete_topic, describe describe_topic, and increase partitions create_partitions.describe_configs and modify alter_configs dynamic configs for topics, brokers, and groups.list_consumer_groups, describe describe_consumer_group, and securely manage offsets with reset_consumer_group_offset and rewind_consumer_group_offset_by_timestamp. Advanced tools include state validation, dry runs, and execution audit logging.consume_messages (from beginning, latest, or specific offsets) and produce messages produce_message.uv package manager (recommended)uv sync
The server requires the KAFKA_BOOTSTRAP_SERVERS environment variable.
KAFKA_BOOTSTRAP_SERVERS: Comma-separated list of broker urls (e.g., localhost:9092).KAFKA_CLIENT_ID: (Optional) Client ID for connection (default: kafka-mcp).You can run the server directly using uv or python, or use Docker.
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
uv run kafka-mcp
Build the Docker image:
docker build -t kafka-mcp .
Run the container:
docker run -i --rm -e KAFKA_BOOTSTRAP_SERVERS=host.docker.internal:9092 kafka-mcp
(Note: Use host.docker.internal instead of localhost if your Kafka cluster is running on the host machine.)
Add the following to your Claude Desktop configuration file (claude_desktop_config.json):
{
"mcpServers": {
"kafka": {
"command": "<uv PATH>",
"args": [
"--directory",
"<kafka-mcp PATH>",
"run",
"kafka-mcp"
],
"env": {
"KAFKA_BOOTSTRAP_SERVERS": "localhost:9092"
}
}
}
}
To verify that the server can start and connect to your Kafka cluster (ensure your Kafka is running first):
# Set your bootstrap server
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
# Run a quick check
uv run python -c "from src.kafka_mcp import main; print('Imports successful')"
| Category | Tool Name | Description |
|---|---|---|
| Cluster | describe_cluster |
Get cluster metadata (controller, brokers). |
describe_brokers |
List all brokers. | |
| Topics | list_topics |
List all available topics. |
describe_topic |
Get detailed info (partitions, replicas) for a topic. | |
create_topic |
Create a new topic with partitions/replication factor. | |
delete_topic |
Delete a topic. | |
create_partitions |
Increase partitions for a topic. | |
| Configs | describe_configs |
View dynamic configs for topic/broker/group. |
alter_configs |
Update dynamic configs. | |
| Consumers | list_consumer_groups |
List all active consumer groups. |
describe_consumer_group |
Get members and state of a group. | |
get_consumer_group_offsets |
Get committed offset, high/low watermarks, and calculate total lag for a topic. | |
reset_consumer_group_offset |
Safely change consumer group offsets to earliest, latest, or a specific offset. | |
rewind_consumer_group_offset_by_timestamp |
Rewind/advance consumer group offsets securely based on a timestamp. | |
| Messages | consume_messages |
Consume messages from a topic (supports offsets, limits). |
produce_message |
Send a message to a topic. |
src/kafka_mcp/
├── configs/ # Configuration handling
├── connections/ # Kafka client factories (singleton)
├── tools/ # Tool implementations
│ ├── admin.py # Topic & Config management
│ ├── cluster.py # Cluster metadata
│ ├── consumer.py # Consumer group & message consumption
│ └── producer.py # Message production
└── main.py # Entry point & MCP tool registration
KAFKA_BOOTSTRAP_SERVERS is correct and reachable.Добавь это в claude_desktop_config.json и перезапусти Claude Desktop.
{
"mcpServers": {
"wklee610-kafka-mcp": {
"command": "npx",
"args": []
}
}
}