This MCP server provides AI assistants with full control over Celery task queues by exposing all Celery Flower REST API endpoints as tools. It enables monitoring workers, managing tasks, inspecting queues, and controlling worker pools in real-time through natural language.
CI codecov PyPI Python 3.14+ MCP Ruff uv License: MIT
Give your AI assistant full control over Celery — monitor workers, manage tasks, inspect queues.
Features · Quick Start · Configuration · Tools · Development · Contributing
celery-flower-mcp is a Model Context Protocol server that exposes the full Celery Flower REST API as MCP tools. Point it at your Flower instance and your AI assistant (Claude, Cursor, Windsurf, etc.) can:
All 21 Flower API endpoints are covered.
- `.env` file support
- Built on `httpx` + FastMCP

Run it directly with `uvx`:

FLOWER_URL=http://localhost:5555 uvx celery-flower-mcp

Or run from source:
git clone https://github.com/Darius1223/celery-flower-mcp
cd celery-flower-mcp
uv sync
uv run python -m source.main
Add to ~/Library/Application Support/Claude/claude_desktop_config.json:
{
"mcpServers": {
"celery-flower": {
"command": "uvx",
"args": ["celery-flower-mcp"],
"env": {
"FLOWER_URL": "http://localhost:5555"
}
}
}
}
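If your Flower instance requires authentication, the same config can carry the auth variables from the configuration table below; the token value here is a placeholder:

```json
{
  "mcpServers": {
    "celery-flower": {
      "command": "uvx",
      "args": ["celery-flower-mcp"],
      "env": {
        "FLOWER_URL": "http://localhost:5555",
        "FLOWER_API_TOKEN": "<your-token>"
      }
    }
  }
}
```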
Configuration is read from environment variables or a .env file in the project root. Copy .env.example to get started:
cp .env.example .env
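A minimal `.env` might look like the following sketch; the values are illustrative, not defaults shipped with the project:

```
FLOWER_URL=http://localhost:5555
# Either basic auth:
FLOWER_USERNAME=admin
FLOWER_PASSWORD=change-me
# Or a bearer token (takes priority over basic auth if both are set):
FLOWER_API_TOKEN=
```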
| Variable | Default | Description |
|---|---|---|
| `FLOWER_URL` | `http://localhost:5555` | Base URL of your Flower instance |
| `FLOWER_USERNAME` | — | Basic auth username |
| `FLOWER_PASSWORD` | — | Basic auth password |
| `FLOWER_API_TOKEN` | — | Bearer token (takes priority over basic auth) |
| Tool | Description |
|---|---|
| `list_workers` | List all workers — optionally filter by name, refresh live stats, or get status only |
| `shutdown_worker` | Gracefully shut down a worker |
| `restart_worker_pool` | Restart a worker's process pool |
| `grow_worker_pool` | Add N processes to a worker's pool |
| `shrink_worker_pool` | Remove N processes from a worker's pool |
| `autoscale_worker_pool` | Configure autoscale min/max bounds |
| `add_queue_consumer` | Make a worker start consuming from a queue |
| `cancel_queue_consumer` | Make a worker stop consuming from a queue |
| Tool | Description |
|---|---|
| `list_tasks` | List tasks with filters: state, worker, name, date range, search, pagination |
| `list_task_types` | List all registered task types across workers |
| `get_task_info` | Get full details for a task by UUID |
| `get_task_result` | Retrieve a task's result (with optional timeout) |
| `apply_task` | Execute a task synchronously and wait for the result |
| `async_apply_task` | Dispatch a task asynchronously; returns the task UUID |
| `send_task` | Send a task by name — no registration required on the worker side |
| `abort_task` | Abort a running task |
| `revoke_task` | Revoke a task; optionally terminate with a signal |
| `set_task_timeout` | Set soft and/or hard time limits for a task on a worker |
| `set_task_rate_limit` | Set a rate limit for a task on a worker (e.g. `100/m`) |
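As a rough sketch of what `send_task` does under the hood, Flower exposes a `POST /api/task/send-task/<name>` endpoint. The helper below only builds the request (URL and JSON body) without sending it; the base URL is assumed to be the default from the configuration table, and the function name is our own:

```python
import json
from urllib.parse import quote

FLOWER_URL = "http://localhost:5555"  # assumed default from the configuration table


def build_send_task_request(name: str, args: list, kwargs: dict) -> tuple[str, bytes]:
    """Build the URL and JSON body for Flower's send-task endpoint (not sent here)."""
    url = f"{FLOWER_URL}/api/task/send-task/{quote(name)}"
    body = json.dumps({"args": args, "kwargs": kwargs}).encode()
    return url, body


url, body = build_send_task_request("tasks.add", [2, 3], {})
print(url)  # http://localhost:5555/api/task/send-task/tasks.add
```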
| Tool | Description |
|---|---|
| `get_queue_lengths` | Get the current depth of all configured queues |
| `healthcheck` | Check whether the Flower instance is reachable and healthy |
source/
├── main.py # FastMCP server entry point + dishka container wiring
├── settings.py # Pydantic Settings — typed config from env / .env
├── client.py # Async HTTP client wrapping Flower REST API
├── providers.py # dishka Provider — manages FlowerClient lifecycle
└── tools/
├── workers.py # 8 worker management tools
├── tasks.py # 11 task management tools
└── queues.py # 2 queue / health tools
dishka manages the FlowerClient lifecycle: created once at startup, closed cleanly on shutdown via an async generator provider.
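That lifecycle reads roughly like the sketch below. This is not dishka's actual API — just a stdlib illustration of the async-generator-provider pattern, with `FakeFlowerClient` as a hypothetical stand-in for the real client:

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


class FakeFlowerClient:
    """Hypothetical stand-in for the project's FlowerClient."""

    def __init__(self) -> None:
        self.closed = False

    async def aclose(self) -> None:
        self.closed = True


@asynccontextmanager
async def provide_client() -> AsyncIterator[FakeFlowerClient]:
    client = FakeFlowerClient()  # created once at startup
    try:
        yield client             # served for the app's whole lifetime
    finally:
        await client.aclose()    # closed cleanly on shutdown


async def main() -> FakeFlowerClient:
    async with provide_client() as client:
        assert client.closed is False  # open while the app runs
    return client


client = asyncio.run(main())
print(client.closed)  # True
```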
make fmt # auto-format with ruff
make lint # lint with ruff
make typecheck # type-check with mypy (strict)
make test # run 49 unit tests
make cov # unit tests + coverage report
make all # fmt + lint + typecheck
The test suite is split into two layers:
Unit tests (tests/) — fast, no external dependencies, use pytest-httpx to mock HTTP calls:
make test
# or
uv run pytest tests/ -m "not integration"
Integration tests (tests/integration/) — run against a real Flower instance backed by Redis and a live Celery worker, all managed by Docker Compose:
make integration
This command:
1. Builds and starts the stack (`docker-compose.test.yml`): Redis → Celery worker → Flower
2. Waits for the Flower `/healthcheck` endpoint to return OK at `http://localhost:5555`
3. Runs the integration tests

The stack is defined in `docker-compose.test.yml`. The worker and Flower images are built from `tests/integration/Dockerfile.worker` and `tests/integration/Dockerfile.flower`.
To start the stack manually for exploratory testing:
docker compose -f docker-compose.test.yml up -d --build
# run tests, explore, etc.
make integration-down # stop + remove volumes
Integration tests use pytest.mark.asyncio(loop_scope="session") so all tests share one event loop — this avoids RuntimeError: Event loop is closed when httpx transports are cleaned up across test boundaries on Python 3.14.
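In practice the marker is applied once per module, roughly like this config fragment (assumes pytest-asyncio; `flower_client` is a hypothetical fixture, not one the project necessarily defines):

```python
import pytest

# All async tests in this module share one session-scoped event loop.
pytestmark = pytest.mark.asyncio(loop_scope="session")


async def test_flower_is_healthy(flower_client):
    assert await flower_client.healthcheck()
```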
See CONTRIBUTING.md for details on adding new tools or submitting a PR.
See CHANGELOG.md.