loading…
Search for a command to run...
loading…
An MCP server that enables interaction with Kafka clusters to manage topics, monitor consumer groups, and stream messages. It provides a comprehensive suite of
An MCP server that enables interaction with Kafka clusters to manage topics, monitor consumer groups, and stream messages. It provides a comprehensive suite of tools for broker metadata inspection and local Kafka user management.
Kafka MCP Server
Overview
FastMCP-based MCP server that connects to Kafka and exposes MCP tools for common broker, topic, consumer, and retention operations.
Features
Requirements
Quick start (uv)
Default MCP SSE endpoint: http://localhost:8000/mcp
Makefile shortcuts
Configuration
Server settings come from FastMCP ServerSettings. The defaults are:
FastMCP supports environment overrides such as:
Smoke tests and clients can use:
Kafka connection payload
Most tools require a Kafka connection object with the following fields:
MCP tool catalog
Each tool name below is the MCP command. Parameter shapes match the schemas in app/schemas.py.
health - Returns {"status":"ok"}.list_topics - Lists topics with partition and replication info.connectioncreate_topic - Creates a topic with optional configs.connection, payload (name, num_partitions, replication_factor, configs)delete_topic - Deletes a topic by name.connection, nametopic_configs - Fetches topic configuration values.connection, nametopic_retention - Returns retention.ms for a topic.connection, nametail_messages - Reads the most recent messages for a topic.connection, name, payload (limit)live_messages - Collects a short live stream of messages.connection, name, payload (max_messages, duration_seconds, poll_interval_ms)list_consumer_groups - Lists consumer groups with state and member count.connectionconsumer_group_lag - Computes lag per partition for a group.connection, group_idcluster_info - Returns broker and controller metadata.connectionlist_kafka_users - Lists locally stored user entries.upsert_kafka_user - Creates or updates a local user entry.user (username, sasl_mechanism, note)delete_kafka_user - Deletes a local user entry.usernameMCP command examples (Python)
from mcp import ClientSession
from mcp.client.sse import sse_client
MCP_URL = "http://localhost:8000/mcp"
connection = {
"bootstrap_servers": "localhost:9092",
"security_protocol": "PLAINTEXT",
"sasl_mechanism": None,
"sasl_username": None,
"sasl_password": None,
"ssl_cafile": None,
"ssl_certfile": None,
"ssl_keyfile": None,
"oauth_token": None,
}
async with sse_client(MCP_URL) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
await session.call_tool("list_topics", {"connection": connection})
await session.call_tool(
"create_topic",
{
"connection": connection,
"payload": {
"name": "example-topic",
"num_partitions": 1,
"replication_factor": 1,
"configs": {},
},
},
)
Local storage
Kafka user entries are stored at app/data/users.json.
Testing
Smoke test all MCP tools end-to-end:
LocalAI (optional)
LocalAI is not required for Kafka MCP usage, but this repo includes helper targets for running models locally:
Добавь это в claude_desktop_config.json и перезапусти Claude Desktop.
{
"mcpServers": {
"kafka-mcp-server": {
"command": "npx",
"args": []
}
}
}PRs, issues, code search, CI status
Database, auth and storage
Reference / test server with prompts, resources, and tools.
Secure file operations with configurable access controls.