High-performance job queue for Bun & AI agents. SQLite persistence, cron scheduling, priorities, retries, DLQ, webhooks, native MCP server. Zero external dependencies.
High-performance job queue for Bun. Built for AI agents and automation.
Zero external dependencies. MCP-native. TypeScript-first.
Documentation · Benchmarks · npm
bun add bunqueue
import { Bunqueue } from 'bunqueue/client';
const app = new Bunqueue('emails', {
embedded: true,
processor: async (job) => {
console.log(`Sending to ${job.data.to}`);
return { sent: true };
},
});
await app.add('send', { to: '[email protected]' });
That's it. Queue + Worker in one object. No Redis, no config, no setup.
Simple Mode gives you a Queue and a Worker in a single object. Add jobs, process them, add middleware, schedule crons — all from one place.
Use Bunqueue when producer and consumer are in the same process. For distributed systems, use Queue + Worker separately. For AI agent workflows, use the MCP Server instead — agents control queues via natural language without writing code.
new Bunqueue('emails', opts)
│
├── this.queue = new Queue('emails', ...)
├── this.worker = new Worker('emails', ...)
│
└── Subsystems (all optional):
├── RetryEngine ── jitter, fibonacci, exponential, custom
├── CircuitBreaker ── pauses worker after N failures
├── BatchAccumulator ── groups N jobs into one call
├── TriggerManager ── "on complete → create job B"
├── TtlChecker ── rejects expired jobs
├── PriorityAger ── boosts old jobs' priority
├── CancellationManager ── AbortController per job
└── DedupDebounceMerger ── deduplication & debounce
Processing pipeline per job:
Job → Circuit Breaker → TTL check → AbortController → Retry → Middleware → Processor
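As a rough sketch, the pipeline composes like this (illustrative only; every helper below is a stub standing in for the named subsystem, not bunqueue's actual source):
type Job = { id: string; name: string; data: unknown };
type Next = () => Promise<unknown>;
const circuitOpen = () => false;                 // CircuitBreaker stub
const ttlExpired = (_job: Job) => false;         // TtlChecker stub
const middleware: Array<(job: Job, next: Next) => Promise<unknown>> = [];
async function runJob(job: Job, processor: (job: Job) => Promise<unknown>) {
  if (circuitOpen()) throw new Error('circuit open: worker paused'); // Circuit Breaker
  if (ttlExpired(job)) throw new Error('ttl_expired');               // TTL check
  const controller = new AbortController();                          // one AbortController per job
  void controller.signal;                                            // exposed via getSignal() in the real pipeline
  // Middleware wraps the processor onion-style; retry would wrap this whole chain.
  const chain = middleware.reduceRight<Next>(
    (next, mw) => () => mw(job, next),
    () => processor(job),
  );
  return chain();
}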
Route jobs to different handlers by name:
const app = new Bunqueue('notifications', {
embedded: true,
routes: {
'send-email': async (job) => {
await sendEmail(job.data.to);
return { channel: 'email' };
},
'send-sms': async (job) => {
await sendSMS(job.data.to);
return { channel: 'sms' };
},
},
});
await app.add('send-email', { to: 'alice' });
await app.add('send-sms', { to: 'bob' });
Note: Use exactly one of `processor`, `routes`, or `batch`. Passing multiple, or none, throws an error.
Wraps every job execution. Execution order is onion-style: mw1 → mw2 → processor → mw2 → mw1. When no middleware is added, zero overhead.
// Timing middleware
app.use(async (job, next) => {
const start = Date.now();
const result = await next();
console.log(`${job.name}: ${Date.now() - start}ms`);
return result;
});
// Error recovery middleware
app.use(async (job, next) => {
try {
return await next();
} catch (err) {
return { recovered: true, error: err.message };
}
});
Accumulates N jobs and processes them together. Flushes when buffer reaches size or timeout expires. On close(), remaining jobs are flushed.
const app = new Bunqueue('db-inserts', {
embedded: true,
batch: {
size: 50,
timeout: 2000,
processor: async (jobs) => {
const rows = jobs.map(j => j.data.row);
await db.insertMany('table', rows);
return jobs.map(() => ({ inserted: true }));
},
},
});
5 strategies + retry predicate:
const app = new Bunqueue('api-calls', {
embedded: true,
processor: async (job) => {
const res = await fetch(job.data.url);
if (!res.ok) throw new Error(`HTTP ${res.status}`);
return { status: res.status };
},
retry: {
maxAttempts: 5,
delay: 1000,
strategy: 'jitter',
retryIf: (error) => error.message.includes('503'),
},
});
| Strategy | Formula | Use case |
|---|---|---|
| `fixed` | `delay` every time | Rate-limited APIs |
| `exponential` | `delay × 2^attempt` | General purpose |
| `jitter` | `delay × 2^attempt × random(0.5-1.0)` | Thundering herd prevention |
| `fibonacci` | `delay × fib(attempt)` | Gradual backoff |
| `custom` | `customBackoff(attempt, error) → ms` | Anything |
This is in-process retry: the job stays active while it retries. It differs from the core `attempts`/`backoff` options, which re-queue the job.
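For contrast, a sketch of the core re-queue style, configured per job (`attempts` appears in the Jobs API below; the `backoff` option shape here is an assumption):
// Core retry: on failure the job goes back to the queue and is picked up
// again later, instead of being retried in-process while still active.
await app.add('call-api', { url: 'https://api.example.com' }, {
  attempts: 5,
  backoff: { type: 'exponential', delay: 1000 }, // assumed shape for the core backoff option
});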
Cancel running jobs with AbortController:
const app = new Bunqueue('encoding', {
embedded: true,
processor: async (job) => {
const signal = app.getSignal(job.id);
for (const chunk of chunks) {
if (signal?.aborted) throw new Error('Cancelled');
await encode(chunk);
}
return { done: true };
},
});
const job = await app.add('video', { file: 'big.mp4' });
app.cancel(job.id); // cancel immediately
app.cancel(job.id, 5000); // cancel after 5s grace period
The signal works with fetch too: await fetch(url, { signal }).
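Since it is a standard AbortSignal, cancelling a job can abort an in-flight request. A minimal sketch:
const downloads = new Bunqueue('downloads', {
  embedded: true,
  processor: async (job) => {
    // Cancelling the job aborts the HTTP request mid-flight.
    const res = await fetch(job.data.url, { signal: downloads.getSignal(job.id) });
    return { status: res.status };
  },
});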
Pauses the worker after too many consecutive failures:
CLOSED ──── failures ≥ threshold ────→ OPEN (worker paused)
   ▲                                     │
   └─── success ─── HALF-OPEN ←─── timeout expires
const app = new Bunqueue('payments', {
embedded: true,
processor: async (job) => paymentGateway.charge(job.data),
circuitBreaker: {
threshold: 5,
resetTimeout: 30000,
onOpen: () => alert('Gateway down!'),
onClose: () => alert('Gateway recovered'),
},
});
app.getCircuitState(); // 'closed' | 'open' | 'half-open'
app.resetCircuit(); // force close + resume worker
Create follow-up jobs automatically when a job completes or fails:
const app = new Bunqueue('orders', {
embedded: true,
routes: {
'place-order': async (job) => ({ orderId: job.data.id, total: 99 }),
'send-receipt': async (job) => ({ sent: true }),
'fraud-alert': async (job) => ({ alerted: true }),
},
});
app.trigger({
on: 'place-order',
create: 'send-receipt',
data: (result, job) => ({ id: job.data.id }),
});
// Conditional trigger (only for large orders)
app.trigger({
on: 'place-order',
create: 'fraud-alert',
data: (result) => ({ amount: result.total }),
condition: (result) => result.total > 1000,
});
// Chain triggers
app
.trigger({ on: 'step-1', create: 'step-2', data: (r) => r })
.trigger({ on: 'step-2', create: 'step-3', data: (r) => r });
Expire unprocessed jobs. Checked when the worker picks up the job:
const app = new Bunqueue('otp', {
embedded: true,
processor: async (job) => verifyOTP(job.data.code),
ttl: {
defaultTtl: 300000,
perName: {
'verify-otp': 60000,
'daily-report': 0,
},
},
});
app.setDefaultTtl(120000);
app.setNameTtl('flash-sale', 30000);
Resolution order: `perName[job.name]` → `defaultTtl` → `0` (no TTL).
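A sketch of that resolution order (illustrative, not the library's source):
// perName wins, then defaultTtl, then 0 (no TTL).
function resolveTtl(
  jobName: string,
  opts: { defaultTtl?: number; perName?: Record<string, number> },
): number {
  return opts.perName?.[jobName] ?? opts.defaultTtl ?? 0;
}
resolveTtl('verify-otp', { defaultTtl: 300000, perName: { 'verify-otp': 60000 } }); // 60000
resolveTtl('daily-report', { defaultTtl: 300000 });                                 // 300000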
Automatically boosts priority of old waiting jobs to prevent starvation:
const app = new Bunqueue('tasks', {
embedded: true,
processor: async (job) => ({ done: true }),
priorityAging: {
interval: 60000,
minAge: 300000,
boost: 2,
maxPriority: 100,
maxScan: 200,
},
});
A job with priority 1 becomes 3 after 5 minutes, 5 after 10 minutes, and so on, capped at 100.
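One plausible reading of those numbers (illustrative, not the library's source): a waiting job gains `boost` priority for each full `minAge` window it has aged, capped at `maxPriority`:
// priority(age) = min(initial + floor(age / minAge) * boost, maxPriority)
const agedPriority = (initial: number, ageMs: number) =>
  Math.min(initial + Math.floor(ageMs / 300_000) * 2, 100);
agedPriority(1, 300_000); // 3 after 5 min
agedPriority(1, 600_000); // 5 after 10 min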
Prevent duplicate jobs. Jobs with the same name + data get the same dedup ID:
const app = new Bunqueue('webhooks', {
embedded: true,
processor: async (job) => processWebhook(job.data),
deduplication: {
ttl: 60000,
extend: false,
replace: false,
},
});
await app.add('hook', { event: 'user.created', userId: '123' });
await app.add('hook', { event: 'user.created', userId: '123' }); // deduplicated!
await app.add('hook', { event: 'user.updated', userId: '123' }); // different data → new job
Override per-job: await app.add('task', data, { deduplication: { id: 'my-id', ttl: 5000 } }).
Coalesce rapid same-name jobs. Only the last one in the TTL window gets processed:
const app = new Bunqueue('search', {
embedded: true,
processor: async (job) => executeSearch(job.data.query),
debounce: { ttl: 500 },
});
await app.add('search', { query: 'h' });
await app.add('search', { query: 'he' });
await app.add('search', { query: 'hello' }); // only this one processes
const app = new Bunqueue('api', {
embedded: true,
processor: async (job) => callExternalAPI(job.data),
rateLimit: { max: 100, duration: 1000 },
});
// Per-group rate limiting (e.g., per customer)
const app2 = new Bunqueue('api', {
embedded: true,
processor: async (job) => callAPI(job.data),
rateLimit: { max: 10, duration: 1000, groupKey: 'customerId' },
});
app.setGlobalRateLimit(50, 1000);
app.removeGlobalRateLimit();
const app = new Bunqueue('critical', {
embedded: true,
processor: async (job) => riskyOperation(job.data),
dlq: {
autoRetry: true,
autoRetryInterval: 3600000,
maxAutoRetries: 3,
maxAge: 604800000,
maxEntries: 10000,
},
});
app.getDlq(); // all entries
app.getDlqStats(); // { total, byReason, ... }
app.getDlq({ reason: 'timeout' }); // filter by reason
app.retryDlq(); // retry all
app.purgeDlq(); // clear all
Failure reasons: explicit_fail, max_attempts_exceeded, timeout, stalled, ttl_expired, worker_lost.
await app.cron('daily-report', '0 9 * * *', { type: 'report' });
await app.cron('eu-digest', '0 8 * * 1', { type: 'weekly' }, { timezone: 'Europe/Rome' });
await app.every('healthcheck', 30000, { type: 'ping' });
await app.listCrons();
await app.removeCron('healthcheck');
app.on('completed', (job, result) => { });
app.on('failed', (job, error) => { });
app.on('active', (job) => { });
app.on('progress', (job, progress) => { });
app.on('stalled', (jobId, reason) => { });
app.on('error', (error) => { });
app.on('ready', () => { });
app.on('drained', () => { });
app.on('closed', () => { });
await app.add('task', { key: 'value' });
await app.add('urgent', data, { priority: 10, delay: 5000, attempts: 5, durable: true });
await app.addBulk([
{ name: 'email', data: { to: 'alice' } },
{ name: 'email', data: { to: 'bob' }, opts: { priority: 10 } },
]);
app.pause(); // pause queue + worker
app.resume(); // resume both
await app.close(); // graceful shutdown
await app.close(true); // force shutdown
import { Bunqueue, shutdownManager } from 'bunqueue/client';
const app = new Bunqueue<{ payload: string }>('my-app', {
embedded: true,
routes: {
'process': async (job) => ({ id: job.data.payload, status: 'done' }),
'notify': async (job) => ({ sent: true }),
'alert': async (job) => ({ alerted: true }),
},
concurrency: 10,
retry: { maxAttempts: 3, delay: 1000, strategy: 'jitter' },
circuitBreaker: { threshold: 5, resetTimeout: 30000 },
ttl: { defaultTtl: 600000, perName: { 'verify-otp': 60000 } },
priorityAging: { interval: 60000, minAge: 300000, boost: 1 },
deduplication: { ttl: 5000 },
rateLimit: { max: 100, duration: 1000 },
dlq: { autoRetry: true, maxAge: 604800000 },
});
app.use(async (job, next) => {
const start = Date.now();
const result = await next();
console.log(`${job.name}: ${Date.now() - start}ms`);
return result;
});
app
.trigger({ on: 'process', create: 'notify', data: (r) => ({ payload: r.id }) })
.trigger({ on: 'process', event: 'failed', create: 'alert', data: (_, j) => j.data });
await app.cron('cleanup', '0 2 * * *', { payload: 'nightly' });
await app.add('process', { payload: 'ORD-001' });
process.on('SIGINT', async () => {
await app.close();
shutdownManager();
});
Orchestrate multi-step business processes with branching, saga compensation, and human-in-the-loop signals. Built on top of bunqueue — no new infrastructure.
import { Workflow, Engine } from 'bunqueue/workflow';
const orderFlow = new Workflow('order-pipeline')
.step('validate', async (ctx) => {
const { orderId, amount } = ctx.input as { orderId: string; amount: number };
if (amount <= 0) throw new Error('Invalid amount');
return { orderId };
})
.step('reserve-stock', async () => {
await inventory.reserve();
return { reserved: true };
}, {
compensate: async () => await inventory.release(), // Auto-rollback on failure
})
.step('charge', async () => {
return { txId: await payments.charge() };
}, {
compensate: async () => await payments.refund(),
})
.step('confirm', async (ctx) => {
const { txId } = ctx.steps['charge'] as { txId: string };
return { emailSent: true, txId };
});
const engine = new Engine({ embedded: true });
engine.register(orderFlow);
await engine.start('order-pipeline', { orderId: 'ORD-1', amount: 99.99 });
Features:
- Parallel steps with `.parallel()` (see the sketch after this list)
- `waitFor('event')` pauses execution; `engine.signal()` resumes it
- `waitFor('event', { timeout })` fails if the signal doesn't arrive in time
- Nested workflows with `.subWorkflow()`; child results are passed back
- `doUntil()` and `doWhile()` for conditional iteration with safety limits
- `.map()` fan-out (steps auto-named `step:0`, `step:1`, ...)
- Input validation via a `.parse()` schema
- `engine.subscribe(id, callback)` to monitor a specific execution's events
- Global events (`engine.on` / `engine.onAny`)
- `engine.cleanup()` / `engine.archive()` for execution history management
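For example, a `.parallel()` sketch. The exact signature is an assumption here: a map of named step handlers that run concurrently, with results landing in `ctx.steps` under their names:
const enrich = new Workflow('enrich-user')
  .step('load', async (ctx) => ({ userId: (ctx.input as { userId: string }).userId }))
  // Assumed shape: named handlers run concurrently, results keyed by name.
  .parallel({
    'fetch-profile': async () => ({ plan: 'pro' }),
    'fetch-orders': async () => ({ count: 3 }),
  })
  .step('merge', async (ctx) => ({
    ...(ctx.steps['fetch-profile'] as object),
    ...(ctx.steps['fetch-orders'] as object),
  }));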
vs Competitors:
| | bunqueue | Temporal | Inngest | Trigger.dev |
|---|---|---|---|---|
| Infrastructure | None (embedded) | PostgreSQL + 7 services | Cloud-only | Redis + PostgreSQL |
| Saga compensation | Built-in | Manual | Manual | Manual |
| Human-in-the-loop | `.waitFor()` | Signals API | `step.waitForEvent()` | Waitpoint tokens |
| Self-hosted | Zero-config | Complex | No | Complex |
| Pricing | Free (MIT) | Free / Cloud $$ | Per-execution | Free tier, then $50/mo+ |
// Branching
const flow = new Workflow('tiered')
.step('classify', async (ctx) => {
const { amount } = ctx.input as { amount: number };
return { tier: amount > 1000 ? 'vip' : 'basic' };
})
.branch((ctx) => (ctx.steps['classify'] as { tier: string }).tier)
.path('vip', (w) => w.step('vip-handler', async () => ({ discount: 20 })))
.path('basic', (w) => w.step('basic-handler', async () => ({ discount: 0 })))
.step('done', async () => ({ processed: true }));
// Human-in-the-loop
const approvalFlow = new Workflow('expense')
.step('submit', async (ctx) => {
const { amount } = ctx.input as { amount: number };
return { amount };
})
.waitFor('manager-approval')
.step('reimburse', async (ctx) => {
const decision = ctx.signals['manager-approval'] as { approved: boolean };
return { status: decision.approved ? 'paid' : 'rejected' };
});
// Signal a waiting workflow
await engine.signal(run.id, 'manager-approval', { approved: true });
bunqueue Dashboard
A visual interface for managing queues, jobs, workers and monitoring in real time. Currently in beta.
https://github.com/user-attachments/assets/e8a8d38e-b4a6-4dc8-8360-876c0f24d116
Want early access? Reach out at [email protected]
| Library | Requires | AI-native |
|---|---|---|
| BullMQ | Redis | No |
| Agenda | MongoDB | No |
| pg-boss | PostgreSQL | No |
| bunqueue | Nothing | Yes |
Core primitives: `Queue`, `Worker`, `QueueEvents`.
bunqueue is the first job queue with native MCP support. AI agents get a full-featured scheduler, task queue, and monitoring system — no glue code needed.
HTTP Handlers solve a fundamental problem: an AI agent can schedule jobs and manage queues, but it cannot run a persistent worker. When the agent registers an HTTP handler, bunqueue spawns an embedded Worker that continuously pulls jobs and calls your HTTP endpoint. Responses are saved as results. Failed calls retry automatically via DLQ.
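Concretely, the handler endpoint is just an HTTP service that accepts the job payload and returns a result. A minimal sketch, assuming the embedded worker POSTs the job as JSON and stores the JSON response as the job result:
// Hypothetical endpoint for an agent-registered HTTP handler.
Bun.serve({
  port: 3000,
  fetch: async (req) => {
    const job = await req.json() as { id: string; name: string; data: unknown }; // assumed payload shape
    // ...do the actual work for this job here...
    return Response.json({ processed: true, jobId: job.id }); // saved as the job result
  },
});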
What AI agents can do with bunqueue: schedule jobs, manage queues and crons, inspect results, and register HTTP handlers, all through MCP tools.
# One command to connect Claude Code
claude mcp add bunqueue -- bunx bunqueue-mcp
// Claude Desktop / Cursor / Windsurf — add to MCP config
{
"mcpServers": {
"bunqueue": {
"command": "bunx",
"args": ["bunqueue-mcp"]
}
}
}
Plugin ecosystem — bunqueue ships with auto-discovery (.mcp.json), a custom Claude Code agent for bunqueue tasks, and installable skills for setup, API reference, and real-world patterns. Drop bunqueue into any project and your AI tools discover it automatically.
Supports embedded (local SQLite) and TCP (remote server) modes. Full MCP documentation →
If you're already running Redis, BullMQ is great — battle-tested and feature-rich.
bunqueue is for when you don't want to run Redis. SQLite with WAL mode handles surprisingly high throughput for single-node deployments (tested up to 286K ops/sec). You get persistence, priorities, delays, retries, cron jobs, and DLQ — without the operational overhead of another service.
bun add bunqueue
Requires Bun runtime. Node.js is not supported.
bunqueue runs in two modes depending on your architecture:
| | Embedded | Server (TCP) |
|---|---|---|
| How it works | Queue runs inside your process | Standalone server, clients connect via TCP |
| Setup | `bun add bunqueue` | `docker run` or `bunqueue start` |
| Performance | 286K ops/sec | 149K ops/sec |
| Best for | Single-process apps, CLIs, serverless | Multiple workers, separate producer/consumer |
| Scaling | Same process only | Multiple clients across machines |
Everything runs in your process. No server, no network, no setup.
import { Queue, Worker } from 'bunqueue/client';
const queue = new Queue('emails', { embedded: true });
const worker = new Worker(
'emails',
async (job) => {
console.log('Processing:', job.data);
return { sent: true };
},
{ embedded: true }
);
await queue.add('welcome', { to: '[email protected]' });
Run bunqueue as a standalone server. Multiple workers and producers connect via TCP.
# Start with persistent data
docker run -d -p 6789:6789 -p 6790:6790 \
-v bunqueue-data:/app/data \
ghcr.io/egeominotti/bunqueue:latest
Connect from your app:
import { Queue, Worker } from 'bunqueue/client';
const queue = new Queue('tasks', { connection: { host: 'localhost', port: 6789 } });
const worker = new Worker(
'tasks',
async (job) => {
return { done: true };
},
{ connection: { host: 'localhost', port: 6789 } }
);
await queue.add('process', { data: 'hello' });
One object. Queue + Worker + Routes + Middleware + Cron. Zero boilerplate.
import { Bunqueue } from 'bunqueue/client';
const app = new Bunqueue('notifications', {
embedded: true,
// Route jobs by name
routes: {
'send-email': async (job) => {
console.log(`Email to ${job.data.to}`);
return { sent: true };
},
'send-sms': async (job) => {
console.log(`SMS to ${job.data.to}`);
return { sent: true };
},
},
concurrency: 10,
});
// Middleware — wraps every job (logging, timing, error recovery)
app.use(async (job, next) => {
const start = Date.now();
const result = await next();
console.log(`${job.name} took ${Date.now() - start}ms`);
return result;
});
// Cron — scheduled jobs
await app.cron('daily-report', '0 9 * * *', { type: 'summary' });
await app.every('healthcheck', 30000, { type: 'ping' });
// Events
app.on('completed', (job, result) => console.log(result));
app.on('failed', (job, err) => console.error(err));
// Add jobs
await app.add('send-email', { to: '[email protected]' });
await app.add('send-sms', { to: '+1234567890' });
// Graceful shutdown
await app.close();
Works with both embedded and TCP mode. Simple Mode docs →
SQLite handles surprisingly high throughput for single-node deployments:
| Mode | Peak Throughput | Use Case |
|---|---|---|
| Embedded | 286K ops/sec | Same process |
| TCP | 149K ops/sec | Distributed workers |
Run `bun run bench` to verify on your hardware. Full benchmark methodology →
# Start with Prometheus + Grafana
docker compose --profile monitoring up -d
MIT
Add this to `claude_desktop_config.json` and restart Claude Desktop.
{
"mcpServers": {
"bunqueue": {
"command": "npx",
"args": [
"-y",
"bunqueue"
]
}
}
}