A durable DAG-based task planner exposed as an MCP (Model Context Protocol) server. It lets AI orchestrators (Claude, ADK-based agents, etc.) break a goal into a dependency graph of tasks, execute them in parallel where possible, track state durably, and handle human-in-the-loop approval — all through a clean set of 22 MCP tools.
The fastest way to run the server is with uvx — no virtual environment or pip install needed:
uvx dag-planner-mcp
Pass arguments (e.g. HTTP transport) the same way:
uvx dag-planner-mcp --transport streamable-http --host 0.0.0.0 --port 8000
With an environment variable:
DATABASE_URL="sqlite:///dag_planner.db" uvx dag-planner-mcp
Open claude_desktop_config.json and add:
{
  "mcpServers": {
    "dag-planner-mcp": {
      "command": "uvx",
      "args": ["dag-planner-mcp"],
      "env": {
        "DATABASE_URL": "sqlite:////home/user/data/dag_planner.db"
      }
    }
  }
}
No installation step is required — uvx fetches and caches the package automatically on first run.
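If you would rather script the change than edit the file by hand, the entry above can be merged into an existing configuration. This is a minimal sketch, assuming the config is plain JSON with a top-level mcpServers object. The helper name add_dag_planner is hypothetical, and reading or writing the actual file is left to the caller.

```python
import json


def add_dag_planner(config: dict, database_url: str) -> dict:
    """Return a copy of a Claude Desktop config with the dag-planner-mcp entry added.

    Hypothetical helper: it only mirrors the JSON snippet shown above.
    """
    merged = json.loads(json.dumps(config))  # cheap deep copy of JSON-compatible data
    servers = merged.setdefault("mcpServers", {})
    servers["dag-planner-mcp"] = {
        "command": "uvx",
        "args": ["dag-planner-mcp"],
        "env": {"DATABASE_URL": database_url},
    }
    return merged


# Existing entries are preserved; the input dict is not modified.
existing = {"mcpServers": {"other-server": {"command": "other"}}}
updated = add_dag_planner(existing, "sqlite:////home/user/data/dag_planner.db")
print(json.dumps(updated, indent=2))
```

Working on a copy keeps the original config untouched until you decide to write the result back.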
AI agents (Claude, Copilot, etc.) can pick up ready-made instructions for using this MCP server by installing the bundled skill:
npx skills add Shubhamnegi/dag-planner-mcp --skill use-mcp-tool
Or install directly from the skill path:
npx skills add https://github.com/Shubhamnegi/dag-planner-mcp/tree/main/skills/use-mcp-tool
| File | Purpose |
|---|---|
| skills/use-mcp-tool/SKILL.md | Core instructions — when/how to use the tool |
| skills/use-mcp-tool/references/setup.md | Full installation and client integration guide |
| skills/use-mcp-tool/references/examples.md | Runnable code examples (parallel tasks, HITL gates, checkpoints) |
| skills/use-mcp-tool/references/troubleshooting.md | Common failure cases and fixes |
| skills/use-mcp-tool/scripts/smoke_test.py | Quick sanity check — run after install |
| skills/use-mcp-tool/scripts/example_client.py | Complete orchestrator example (stdio + HTTP) |
Database connection: DATABASE_URL environment variable (defaults to sqlite:///dag_planner.db)

| Dependency | Version |
|---|---|
| Python | ≥ 3.11 |
| mcp[cli] | ≥ 1.6.0 |
| sqlalchemy | ≥ 2.0 |
| pydantic | ≥ 2.0 |
| jsonschema | ≥ 4.0 |
| aiosqlite | ≥ 0.19 |
Optional (PostgreSQL):
| Dependency | Version |
|---|---|
| asyncpg | ≥ 0.29 |
Optional (Dashboard):
| Dependency | Version |
|---|---|
| streamlit | ≥ 1.35 |
| graphviz | ≥ 0.20 |
| pandas | ≥ 2.0 |
git clone https://github.com/Shubhamnegi/dag-planner-mcp.git
cd dag-planner-mcp
python -m venv .venv
source .venv/bin/activate # Linux / macOS
.venv\Scripts\activate # Windows
SQLite (dev — no extra dependencies):
pip install -e .
PostgreSQL (prod):
pip install -e ".[postgres]"
With development/test dependencies:
pip install -e ".[dev]"
With Streamlit dashboard:
pip install -e ".[dashboard]"
Storage is configured entirely through the DATABASE_URL environment variable. Tables are created automatically on first start.
# Default — creates dag_planner.db in the current directory
export DATABASE_URL="sqlite:///dag_planner.db"
# Absolute path
export DATABASE_URL="sqlite:////home/user/data/dag_planner.db"
# In-memory (testing only — data lost on exit)
export DATABASE_URL="sqlite:///:memory:"
No additional setup is required for SQLite.
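The three-slash and four-slash forms above are easy to mix up. The prefix is always sqlite:/// and an absolute path contributes its own leading slash. The sketch below builds the URL from a POSIX path; sqlite_url is a hypothetical helper, not part of the package.

```python
from pathlib import PurePosixPath


def sqlite_url(path: str) -> str:
    """Build a SQLAlchemy-style SQLite URL from a POSIX file path."""
    if path == ":memory:":
        return "sqlite:///:memory:"
    # The prefix is "sqlite:///"; an absolute path adds its own leading "/",
    # which is why absolute URLs end up with four slashes.
    return "sqlite:///" + str(PurePosixPath(path))


print(sqlite_url("dag_planner.db"))
print(sqlite_url("/home/user/data/dag_planner.db"))
```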
export DATABASE_URL="postgresql://user:password@localhost:5432/dag_planner"
Create the database first:
CREATE DATABASE dag_planner;
Then start the server — SQLAlchemy will create all tables automatically.
For connection pooling / SSL in production you can pass extra query parameters:
export DATABASE_URL="postgresql://user:password@host:5432/dag_planner?sslmode=require"
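Passwords containing characters such as @ or / must be percent-encoded before they go into a connection URL, otherwise the URL is parsed incorrectly. The sketch below assembles a URL of the shape shown above using only the standard library. The function name postgres_url and the sample password are illustrative assumptions.

```python
from urllib.parse import quote_plus, urlencode


def postgres_url(user, password, host, port, database, **params):
    """Assemble a postgresql:// URL with percent-encoded credentials."""
    credentials = f"{quote_plus(user)}:{quote_plus(password)}"
    url = f"postgresql://{credentials}@{host}:{port}/{database}"
    if params:
        # Extra query parameters, e.g. sslmode=require
        url += "?" + urlencode(params)
    return url


url = postgres_url("user", "p@ss/word", "localhost", 5432, "dag_planner", sslmode="require")
print(url)
```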
| Variable | Default | Description |
|---|---|---|
| DATABASE_URL | sqlite:///dag_planner.db | SQLAlchemy connection URL (SQLite or PostgreSQL) |
| MCP_HOST | 127.0.0.1 | Host to bind when using HTTP transport |
| MCP_PORT | 8000 | Port to bind when using HTTP transport |
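A launcher script or test harness may want to resolve the same variables with the same defaults. The sketch below shows one way to do that. It is not the server's own code, and resolve_settings is a hypothetical name.

```python
import os
from typing import Mapping, Optional

# Defaults copied from the table above.
DEFAULTS = {
    "DATABASE_URL": "sqlite:///dag_planner.db",
    "MCP_HOST": "127.0.0.1",
    "MCP_PORT": "8000",
}


def resolve_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    """Read the three variables from env, falling back to the documented defaults."""
    env = os.environ if env is None else env
    settings = {name: env.get(name, default) for name, default in DEFAULTS.items()}
    settings["MCP_PORT"] = int(settings["MCP_PORT"])  # ports are numeric
    return settings


print(resolve_settings({"MCP_PORT": "9000"}))
```

Passing the environment as an argument keeps the function easy to test without touching the real process environment.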
dag-planner-mcp
# or
python -m dag_planner_mcp.server
The server reads from stdin and writes to stdout — no port is opened.
dag-planner-mcp --transport streamable-http --host 0.0.0.0 --port 8000
The MCP endpoint will be available at:
http://localhost:8000/mcp
The recommended approach is to use uvx so no manual installation is needed (see Quick start with uvx above).
If you prefer to point at a locally installed binary:
Open the Claude Desktop configuration file:
- macOS: ~/Library/Application Support/Claude/claude_desktop_config.json
- Windows: %APPDATA%\Claude\claude_desktop_config.json

Add the server under mcpServers:
{
  "mcpServers": {
    "dag-planner-mcp": {
      "command": "/path/to/.venv/bin/dag-planner-mcp",
      "env": {
        "DATABASE_URL": "sqlite:////home/user/data/dag_planner.db"
      }
    }
  }
}
Replace /path/to/.venv/bin/dag-planner-mcp with the absolute path to the installed script (run which dag-planner-mcp after installation).
Start the server in HTTP mode:
DATABASE_URL="sqlite:///dag_planner.db" \
dag-planner-mcp --transport streamable-http --host 0.0.0.0 --port 8000
Then point your MCP client at:
http://localhost:8000/mcp
{
  "mcpServers": {
    "dag-planner-mcp": {
      "url": "http://localhost:8000/mcp"
    }
  }
}
import asyncio

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client


async def main():
    # Connect to a server already running in HTTP mode
    async with streamablehttp_client("http://localhost:8000/mcp") as (r, w, _):
        async with ClientSession(r, w) as session:
            await session.initialize()
            result = await session.call_tool("create_workflow_run", {"goal": "Analyze AWS costs"})
            print(result.content[0].text)


asyncio.run(main())
A read-only Streamlit dashboard ships in the dashboard/ directory. It reads
directly from the same database as the MCP server (via DATABASE_URL) and
never writes any data.
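For SQLite, one way a reader process can guarantee it never writes is to open the database file in read-only mode. The sketch below demonstrates that idea with the standard sqlite3 module. It is an illustration only: how the dashboard itself enforces read-only access is not stated here, and the demo_tasks table is a made-up stand-in for the real schema.

```python
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "dag_planner.db")

# Create a throwaway database with one table (stand-in for the server's schema).
writer = sqlite3.connect(db_path)
writer.execute("CREATE TABLE demo_tasks (task_key TEXT, status TEXT)")
writer.execute("INSERT INTO demo_tasks VALUES ('fetch_data', 'ready')")
writer.commit()
writer.close()

# Reopen the same file read-only via a SQLite URI.
reader = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
rows = reader.execute("SELECT task_key, status FROM demo_tasks").fetchall()

try:
    reader.execute("UPDATE demo_tasks SET status = 'done'")
    write_blocked = False
except sqlite3.OperationalError:
    # SQLite rejects writes on a connection opened with mode=ro
    write_blocked = True
finally:
    reader.close()

print(rows, write_blocked)
```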
| Page | Description |
|---|---|
| Overview | Summary metric cards, task status bar chart, recent runs |
| Workflows | Paginated & searchable list of all workflow runs |
| Run Detail | Per-run deep-dive: task table, interactive DAG graph, event log, human approvals |
| Task Detail | Full task state including all JSON payloads |
# Install with dashboard extras
pip install -e ".[dashboard]"
# Point at the same database your MCP server uses
export DATABASE_URL="sqlite:///dag_planner.db"
# — or for PostgreSQL —
export DATABASE_URL="postgresql://user:password@localhost:5432/dag_planner"
# Launch
streamlit run dashboard/app.py
The dashboard opens at http://localhost:8501 by default.
DAG visualization requires the graphviz system package in addition to the Python bindings. Install it with brew install graphviz (macOS) or apt-get install graphviz (Debian/Ubuntu). If the system package is absent, the page falls back to a plain adjacency table.
| Tool | Description |
|---|---|
| create_workflow_run | Create a new workflow run (returns run_id) |
| create_plan_graph | Define the task DAG for a run (validates for cycles) |
| replace_plan_branch | Cancel downstream tasks and graft a new plan branch |
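To make "downstream tasks" concrete: given a task list in the same shape used by create_plan_graph (task_key plus depends_on), the tasks downstream of a node are those that depend on it directly or transitively. The sketch below computes that set with a breadth-first walk. It illustrates the concept only; the server's exact rules for replace_plan_branch (for example, whether the root itself is cancelled) are not specified here, and downstream_of is a hypothetical helper.

```python
from collections import deque


def downstream_of(tasks, root_key):
    """Return every task_key that transitively depends on root_key."""
    # Invert depends_on into a "who depends on me" map.
    dependents = {}
    for task in tasks:
        for dep in task["depends_on"]:
            dependents.setdefault(dep, []).append(task["task_key"])

    seen, queue = set(), deque([root_key])
    while queue:
        for child in dependents.get(queue.popleft(), []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen


plan = [
    {"task_key": "fetch_data", "depends_on": []},
    {"task_key": "analyze", "depends_on": ["fetch_data"]},
    {"task_key": "report", "depends_on": ["analyze"]},
    {"task_key": "audit", "depends_on": []},
]
print(sorted(downstream_of(plan, "fetch_data")))
```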
| Tool | Description |
|---|---|
| get_ready_tasks | List tasks that are ready and unclaimed |
| claim_task_for_execution | Atomically claim a ready task with a time-bounded lease |
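A time-bounded lease means a claim expires on its own, so a crashed executor cannot hold a task forever. The in-memory sketch below illustrates the semantics only. The real server performs the claim atomically in the database, and the class name, the ttl default, and the explicit now argument are all assumptions made for the example.

```python
from dataclasses import dataclass


@dataclass
class Lease:
    executor_id: str
    expires_at: float


class LeaseTable:
    """Toy lease bookkeeping; not safe for concurrent use."""

    def __init__(self):
        self._leases = {}

    def claim(self, task_id, executor_id, now, ttl=60.0) -> bool:
        current = self._leases.get(task_id)
        if current is not None and current.expires_at > now:
            return False  # someone else still holds a live lease
        self._leases[task_id] = Lease(executor_id, now + ttl)
        return True


table = LeaseTable()
first = table.claim("t1", "orchestrator-1", now=0.0)
second = table.claim("t1", "orchestrator-2", now=30.0)   # lease still live
third = table.claim("t1", "orchestrator-2", now=61.0)    # lease expired at 60.0
print(first, second, third)
```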
| Tool | Description |
|---|---|
| mark_task_running | Transition a claimed task to running |
| mark_task_completed | Mark done; auto-promotes dependent tasks to ready |
| mark_task_failed | Mark failed with optional retry |
| mark_task_blocked_human | Block a task awaiting human decision |
| resume_task | Resume a human-blocked task after decision |
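The auto-promotion performed by mark_task_completed can be pictured as follows: once a task completes, any waiting task whose dependencies are now all complete becomes ready. This is a sketch of that rule under assumed status names ("pending", "ready", "completed"); the server's actual state names and storage are not shown in this document.

```python
def promote_ready(statuses, depends_on, completed_key):
    """Mark one task completed and promote dependents whose deps are all done.

    Returns (new_statuses, promoted_keys). Status names are assumptions.
    """
    statuses = dict(statuses)  # do not mutate the caller's mapping
    statuses[completed_key] = "completed"
    promoted = []
    for key, deps in depends_on.items():
        if statuses.get(key) == "pending" and all(
            statuses.get(dep) == "completed" for dep in deps
        ):
            statuses[key] = "ready"
            promoted.append(key)
    return statuses, promoted


deps = {"fetch_data": [], "analyze": ["fetch_data"], "report": ["analyze", "fetch_data"]}
before = {"fetch_data": "running", "analyze": "pending", "report": "pending"}
after, promoted = promote_ready(before, deps, "fetch_data")
print(promoted, after)
```

Here report stays pending because analyze has not completed yet.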
| Tool | Description |
|---|---|
| put_task_output | Store working or final output |
| put_task_checkpoint | Save an incremental checkpoint |
| get_task_payload_refs | Retrieve all payload data for a task |
| Tool | Description |
|---|---|
| get_task | Full state of a single task |
| list_tasks | List tasks for a run with filters |
| get_workflow_run | Workflow run state |
| get_blocked_tasks | Tasks blocked on human or dependencies |
| get_dag_edges | All DAG edges for a run |
| Tool | Description |
|---|---|
| validate_task_output | Validate output against the task's JSON Schema contract |
| validate_dag_acyclic | Check a task list for cycles before submitting |
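A cycle check like the one validate_dag_acyclic describes can also be run locally before a plan is submitted. The sketch below uses Kahn's algorithm on a task list in the task_key / depends_on shape. It is one common technique, not necessarily the one the server uses, and find_cycle_members is a hypothetical name.

```python
from collections import deque


def find_cycle_members(tasks):
    """Return task_keys stuck in (or behind) a cycle; an empty list means acyclic."""
    indegree = {t["task_key"]: len(t["depends_on"]) for t in tasks}
    dependents = {key: [] for key in indegree}
    for t in tasks:
        for dep in t["depends_on"]:
            if dep not in indegree:
                raise ValueError(f"unknown dependency: {dep}")
            dependents[dep].append(t["task_key"])

    # Repeatedly remove tasks with no unmet dependencies.
    queue = deque(key for key, degree in indegree.items() if degree == 0)
    while queue:
        for child in dependents[queue.popleft()]:
            indegree[child] -= 1
            if indegree[child] == 0:
                queue.append(child)

    # Anything never reduced to zero sits on a cycle or depends on one.
    return sorted(key for key, degree in indegree.items() if degree > 0)


ok_plan = [
    {"task_key": "fetch_data", "depends_on": []},
    {"task_key": "analyze", "depends_on": ["fetch_data"]},
]
bad_plan = [
    {"task_key": "a", "depends_on": ["b"]},
    {"task_key": "b", "depends_on": ["a"]},
    {"task_key": "c", "depends_on": ["b"]},
]
print(find_cycle_members(ok_plan), find_cycle_members(bad_plan))
```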
| Tool | Description |
|---|---|
| get_my_task | Narrow task view for a subagent |
| update_my_progress | Update working output and optional checkpoint |
| submit_my_output | Submit final output and complete the task |
| request_human_input | Block task and request a human decision |
import asyncio
import json

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def run():
    # stdio_client expects StdioServerParameters, not a bare argument list
    server = StdioServerParameters(command="dag-planner-mcp")
    async with stdio_client(server) as (r, w):
        async with ClientSession(r, w) as session:
            await session.initialize()

            # 1. Create a workflow
            res = await session.call_tool("create_workflow_run", {
                "goal": "Analyze AWS cost spike and send report"
            })
            run_id = json.loads(res.content[0].text)["data"]["run_id"]

            # 2. Define the task DAG
            await session.call_tool("create_plan_graph", {
                "run_id": run_id,
                "tasks": [
                    {
                        "task_key": "fetch_data",
                        "title": "Fetch cost data",
                        "description": "Pull last 3 weeks of AWS cost data",
                        "owner_agent": "data_agent",
                        "depends_on": [],
                        "output_contract": {"type": "object", "required": ["cost_data"]}
                    },
                    {
                        "task_key": "analyze",
                        "title": "Analyze spike",
                        "description": "Identify top services causing the spike",
                        "owner_agent": "analyst_agent",
                        "depends_on": ["fetch_data"],
                        "output_contract": {"type": "object", "required": ["summary"]}
                    }
                ]
            })

            # 3. Execution loop
            while True:
                res = await session.call_tool("get_ready_tasks", {"run_id": run_id})
                tasks = json.loads(res.content[0].text)["data"]["tasks"]
                if not tasks:
                    break  # All done (or blocked)
                for task in tasks:
                    task_id = task["task_id"]
                    await session.call_tool("claim_task_for_execution", {
                        "task_id": task_id, "executor_id": "orchestrator-1"
                    })
                    await session.call_tool("mark_task_running", {"task_id": task_id})
                    # ... dispatch to subagent, collect result ...
                    # Placeholder output: a real result must satisfy the task's
                    # output_contract (fetch_data, for example, requires "cost_data")
                    output = {"summary": "EC2 caused 40% increase"}
                    await session.call_tool("put_task_output", {
                        "task_id": task_id, "output": output, "is_final": True
                    })
                    await session.call_tool("validate_task_output", {"task_id": task_id})
                    await session.call_tool("mark_task_completed", {
                        "task_id": task_id, "final_output": output
                    })


asyncio.run(run())
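The loop above handles ready tasks one at a time. Because every task returned by get_ready_tasks has its dependencies satisfied, a batch can in principle be dispatched concurrently. The sketch below shows that pattern with asyncio.gather and a stand-in coroutine. run_task is hypothetical and makes no MCP calls; in a real orchestrator it would wrap the claim, run, output, and complete steps. Check how your client session behaves under concurrent calls before relying on this.

```python
import asyncio


async def run_task(task_id: str) -> dict:
    """Stand-in for claim -> run -> put output -> complete on one task."""
    await asyncio.sleep(0.01)  # simulates subagent work
    return {"task_id": task_id, "status": "completed"}


async def run_wave(task_ids):
    # gather runs the coroutines concurrently and preserves input order in its results
    return await asyncio.gather(*(run_task(task_id) for task_id in task_ids))


results = asyncio.run(run_wave(["t1", "t2", "t3"]))
print(results)
```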
pip install -e ".[dev]"
pytest tests/ -v
All tests use an in-memory SQLite database and require no external services.