背景与问题
在 Node.js 服务里,很多业务一开始都喜欢“同步做完再返回”:
- 用户下单后立刻发短信
- 创建订单后同步扣库存、写日志、发通知
- 导出报表时直接在接口里跑大查询
- 第三方回调失败后靠接口调用方重试
一旦业务量上来,问题会很快暴露:
- 请求链路过长:接口响应慢,甚至超时。
- 外部依赖不稳定:短信、邮件、支付、Webhook 这类服务偶发失败很常见。
- 任务有时效差异:有的要立刻执行,有的要延迟执行,有的失败后要稍后重试。
- 服务重启后任务丢失:如果任务只在进程内存中,进程挂了,任务也没了。
- 重复执行与幂等问题:尤其在“失败重试 + 消费者崩溃恢复”的场景中,很容易出现同一个任务被执行多次。
这时候,引入一个可靠任务队列就很自然了。
在 Node.js 生态里,BullMQ 是一个很实用的选择:它基于 Redis,API 清晰,支持重试、延迟任务、并发处理、限速、失败队列等能力,足够支撑大部分中型系统。
这篇文章我会从“高可靠设计”这个角度来讲,不只是把任务塞进队列,而是重点讨论:
- 失败了怎么重试
- 延迟任务怎么做
- Worker 崩溃后怎么恢复
- 如何避免重复执行带来的业务事故
- 实战里怎么排查“任务卡住/重复消费/Redis 压力大”等问题
方案定位与取舍分析
先说结论:BullMQ + Redis 很适合中小到中大型的异步任务场景,但它不是所有场景的终极答案。
为什么选 BullMQ
它比较适合这些任务:
- 邮件、短信、推送
- 订单超时取消
- 异步生成报表
- 图片/音视频转码的任务调度
- Webhook 投递与失败重试
- 业务异步解耦
它的优势:
- 接入成本低:Node.js 里直接用。
- Redis 持久化与恢复机制成熟:比纯内存队列可靠。
- 内置能力全:延迟、重试、优先级、并发、限速、事件监听都有。
- 生态稳定:比自己手写一个 Redis List + 定时轮询靠谱得多。
不适合的场景
但如果你碰到这些情况,要谨慎:
- 超大规模流式事件处理:更偏向 Kafka 这类日志流系统。
- 严格顺序与强事务一致性要求非常高:需要额外设计。
- 超长耗时任务:例如单任务几小时,Worker 心跳与锁续期要特别关注。
- 海量延迟任务:虽然能做,但到非常大的量级时,需要做专门的容量评估。
与常见方案简单对比
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 进程内队列 | 简单 | 重启丢任务,不可横向扩展 | 本地脚本、临时工具 |
| Redis + BullMQ | 易用、功能全、恢复能力好 | 依赖 Redis,极端规模下要调优 | 大多数 Node.js 异步任务 |
| RabbitMQ | 路由灵活,消息模型成熟 | 使用和维护门槛更高 | 复杂消息路由 |
| Kafka | 吞吐高,适合事件流 | 不是“任务队列”的直接替代 | 大数据、日志流 |
核心原理
BullMQ 的关键不是“能把任务放进去”,而是它如何围绕 Redis 组织任务状态和消费流程。
一个任务的大致生命周期
stateDiagram-v2
[*] --> waiting
waiting --> active: worker 获取任务
active --> completed: 执行成功
active --> failed: 执行失败且重试耗尽
active --> delayed: 失败后按退避策略延迟重试
delayed --> waiting: 到达执行时间
failed --> waiting: 人工/程序重入队
completed --> [*]
几个核心状态:
waiting:等待被消费active:Worker 正在执行delayed:延迟执行completed:执行成功failed:执行失败
可靠性的几个核心点
1. 任务持久化
任务不是放在 Node.js 内存里,而是写到 Redis。
这意味着:
- 服务重启后,任务仍然存在
- 多个 Worker 可以共享同一个队列
- 可以跨机器扩容
2. 锁与“卡住任务”恢复
Worker 取到任务后,会持有一个锁,并持续续期。
如果 Worker 崩了、进程被杀、机器宕机,锁会失效。BullMQ 会把这类任务识别为 stalled job(卡住任务),再把它重新投回可处理状态。
这就是“失败恢复”的基础,但它也带来一个非常现实的问题:
同一个任务,理论上可能被执行不止一次。
所以业务层必须设计幂等性。
3. 重试与退避策略
并不是所有失败都应该立刻进入 failed。
网络抖动、第三方 502、瞬时限流,这些都属于“可恢复失败”。
BullMQ 允许你配置:
attempts:最大重试次数backoff:退避策略,如固定延迟、指数退避
这比在业务代码里 setTimeout 自己重试要安全得多,因为重试状态是持久化的。
4. 延迟任务
比如:
- 下单 30 分钟未支付自动取消
- 注册后 24 小时发送提醒邮件
- Webhook 首次失败后 5 分钟再试
BullMQ 的 delay 可以天然覆盖这类场景。
系统交互流程
下面这张图把“生产任务、处理任务、失败重试、恢复”串起来看会更直观。
sequenceDiagram
participant API as API 服务
participant Queue as BullMQ Queue
participant Redis as Redis
participant Worker as Worker
participant Third as 第三方服务
API->>Queue: add(job, opts)
Queue->>Redis: 写入 waiting/delayed
Worker->>Redis: 获取任务并加锁
Worker->>Third: 调用外部接口
alt 成功
Worker->>Redis: 标记 completed
else 临时失败
Worker->>Redis: 按 backoff 放入 delayed
else 永久失败/重试耗尽
Worker->>Redis: 标记 failed
end
alt Worker 崩溃
Redis-->>Worker: 锁过期
Worker->>Redis: stalled 检测后重新入队
end
架构拆分建议
实际项目里,我通常会把角色拆清楚:
- Producer:只负责投递任务
- Worker:只负责消费
- Scheduler / QueueEvents:负责补全延迟、失败、完成事件观测
- 业务幂等层:保证重复执行也不会产生脏数据
flowchart LR
A[HTTP/API 服务] --> B[Queue Producer]
B --> C[(Redis)]
C --> D[Worker 1]
C --> E[Worker 2]
C --> F[Worker N]
D --> G[业务数据库]
E --> G
F --> G
D --> H[第三方服务]
E --> H
F --> H
C --> I[QueueEvents/监控]
一个很重要的经验是:不要让 API 进程既是高频生产者,又是重 CPU Worker。
否则你会发现接口延迟和异步处理互相影响,压测时特别明显。
实战代码(可运行)
下面我们做一个简化但完整的例子:订单创建后,异步发送通知;失败自动重试;也支持延迟发送;Worker 崩溃后任务可恢复。
目录结构
bullmq-demo/
├─ package.json
├─ producer.js
├─ worker.js
├─ events.js
└─ idempotency-store.js
安装依赖
npm init -y
npm install bullmq ioredis
package.json
{
"name": "bullmq-demo",
"version": "1.0.0",
"type": "commonjs",
"scripts": {
"producer": "node producer.js",
"worker": "node worker.js",
"events": "node events.js"
}
}
共享 Redis 连接与幂等存储
这里为了演示简化,幂等记录也存在 Redis。生产环境你也可以放数据库,并加唯一索引。
idempotency-store.js
const IORedis = require('ioredis');
const connection = new IORedis({
host: '127.0.0.1',
port: 6379,
maxRetriesPerRequest: null
});
async function markProcessedOnce(key, ttlSeconds = 24 * 3600) {
// SET key value NX EX ttl
const result = await connection.set(key, '1', 'EX', ttlSeconds, 'NX');
return result === 'OK';
}
async function closeConnection() {
await connection.quit();
}
module.exports = {
connection,
markProcessedOnce,
closeConnection
};
生产者:投递任务
producer.js
const { Queue } = require('bullmq');
const { connection } = require('./idempotency-store');
const queueName = 'notification-queue';
const queue = new Queue(queueName, {
connection,
defaultJobOptions: {
removeOnComplete: 1000,
removeOnFail: 5000,
attempts: 5,
backoff: {
type: 'exponential',
delay: 3000
}
}
});
async function main() {
// 立即执行任务
await queue.add(
'send-order-notification',
{
orderId: 'ORD-1001',
userId: 'U-88',
channel: 'sms',
content: '您的订单已创建成功'
},
{
jobId: 'order-notify-ORD-1001'
}
);
// 延迟任务:30 秒后执行
await queue.add(
'send-order-notification',
{
orderId: 'ORD-1002',
userId: 'U-99',
channel: 'email',
content: '这是一个延迟发送的通知'
},
{
jobId: 'order-notify-ORD-1002',
delay: 30 * 1000
}
);
console.log('任务已投递');
await queue.close();
process.exit(0);
}
main().catch(async (err) => {
console.error('producer error:', err);
await queue.close();
process.exit(1);
});
这里有两个关键点
-
jobId很重要
它可以避免同一个业务任务被重复添加。比如订单ORD-1001的通知任务,如果 API 因为网络抖动重复提交,BullMQ 会基于相同jobId去重。 -
attempts + backoff
它不是写在业务函数里,而是写在队列层。这样任务重试状态不会因为进程重启而丢失。
Worker:消费与失败恢复
worker.js
const { Worker } = require('bullmq');
const { connection, markProcessedOnce } = require('./idempotency-store');
const queueName = 'notification-queue';
async function fakeSendNotification(data) {
// 模拟第三方接口有概率失败
const random = Math.random();
// 模拟耗时
await new Promise((resolve) => setTimeout(resolve, 1500));
if (random < 0.5) {
const err = new Error('第三方通知服务暂时不可用');
err.code = 'THIRD_PARTY_TEMP_ERROR';
throw err;
}
console.log(`[通知成功] orderId=${data.orderId}, channel=${data.channel}`);
return {
ok: true,
providerMessageId: `msg_${Date.now()}`
};
}
const worker = new Worker(
queueName,
async (job) => {
console.log(`开始处理 jobId=${job.id}, name=${job.name}`);
// 幂等键:确保同一业务任务重复执行时不会重复发送
const idempotentKey = `notify:processed:${job.data.orderId}:${job.data.channel}`;
const firstTime = await markProcessedOnce(idempotentKey, 24 * 3600);
if (!firstTime) {
console.log(`检测到重复执行,直接跳过 orderId=${job.data.orderId}`);
return { skipped: true };
}
const result = await fakeSendNotification(job.data);
return result;
},
{
connection,
concurrency: 5,
lockDuration: 30000,
stalledInterval: 30000,
maxStalledCount: 1
}
);
worker.on('completed', (job, result) => {
console.log(`job completed: id=${job.id}`, result);
});
worker.on('failed', (job, err) => {
console.error(`job failed: id=${job && job.id}, err=${err.message}`);
});
worker.on('error', (err) => {
console.error('worker error:', err);
});
async function shutdown() {
console.log('收到退出信号,准备关闭 worker...');
await worker.close();
process.exit(0);
}
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);
事件监听:观察重试、完成、失败
events.js
const { QueueEvents } = require('bullmq');
const { connection } = require('./idempotency-store');
const queueName = 'notification-queue';
const queueEvents = new QueueEvents(queueName, { connection });
queueEvents.on('completed', ({ jobId, returnvalue }) => {
console.log('[event completed]', jobId, returnvalue);
});
queueEvents.on('failed', ({ jobId, failedReason }) => {
console.log('[event failed]', jobId, failedReason);
});
queueEvents.on('waiting', ({ jobId }) => {
console.log('[event waiting]', jobId);
});
queueEvents.on('delayed', ({ jobId, delay }) => {
console.log('[event delayed]', jobId, delay);
});
queueEvents.on('stalled', ({ jobId }) => {
console.log('[event stalled]', jobId);
});
queueEvents.on('error', (err) => {
console.error('queueEvents error:', err);
});
console.log('QueueEvents started');
运行方式
先启动 Redis,然后开三个终端:
终端 1:事件监听
npm run events
终端 2:启动 Worker
npm run worker
终端 3:投递任务
npm run producer
如果你多运行几次,会看到:
- 有的任务一次成功
- 有的任务会失败后进入重试
- 延迟任务会在指定时间后再执行
- 如果你在 Worker 执行过程中强制 kill 掉进程,部分任务会被恢复后重新执行
失败恢复设计:只靠 BullMQ 还不够
很多人第一次用队列时,会有一个误区:
“只要用了消息队列,就能保证任务只执行一次。”
这在工程上几乎不成立。
BullMQ 更接近 至少一次投递(at-least-once) 的语义,而不是“绝对仅一次”。
所以高可靠设计必须补上以下几层。
1. 业务幂等
比如“发送优惠券”“扣库存”“变更订单状态”这类操作,必须支持重复调用不出错。
常见手段:
- 数据库唯一索引
- 状态机校验(已完成则跳过)
- 幂等表记录请求号
- Redis
SET NX作为短期防重
例如发券:
CREATE TABLE coupon_send_log (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
biz_id VARCHAR(64) NOT NULL,
user_id BIGINT NOT NULL,
coupon_id BIGINT NOT NULL,
created_at DATETIME NOT NULL,
UNIQUE KEY uk_biz_id (biz_id)
);
消费时用 biz_id 去插入,如果唯一键冲突,说明发过了,直接视为成功。
2. 区分“临时失败”和“永久失败”
不是所有异常都值得重试。
建议把异常分类:
- 临时失败:网络超时、503、限流、连接中断
- 永久失败:参数错误、数据不存在、业务条件不满足
例如:
function isRetryableError(err) {
return [
'ECONNRESET',
'ETIMEDOUT',
'THIRD_PARTY_TEMP_ERROR'
].includes(err.code);
}
在 Worker 中:
if (!isRetryableError(err)) {
// 可以包装成一个明确错误,避免无意义重试
throw new Error(`NON_RETRYABLE: ${err.message}`);
}
如果你不做分类,最终会看到大量“明明不可能成功却反复重试”的任务,把 Redis 和 Worker 都拖慢。
3. 死信/人工补偿机制
BullMQ 没有像某些 MQ 那样显式的“死信交换器”概念,但你完全可以在 failed 后做自己的补偿策略:
- 定时扫描失败任务,人工重放
- 失败写入数据库工单表
- 告警推送到企业微信/Slack
- 将失败任务转移到
xxx-dead-letter队列
这一步很关键,因为“高可靠”并不等于“系统永远不出错”,而是出错后能被发现、能被恢复。
容量估算与并发取舍
中级工程师容易忽略的一点是:队列系统的瓶颈,很多时候不在 BullMQ,而在下游。
一个简单估算方法
假设:
- 平均每秒入队 200 个任务
- 每个任务处理耗时 500ms
- 单个 Worker 并发设为 20
理论上,单个 Worker 的吞吐大约是:
20 / 0.5 = 40 task/s
那至少需要:
200 / 40 = 5 个 Worker
再考虑重试、波峰和第三方限流,实际通常要再留 30%~50% 裕量。
并发不是越大越好
我见过不少项目把 concurrency 开到几百,结果:
- Node 事件循环被压住
- 第三方接口被打爆
- Redis 命令数飙升
- 失败任务更快堆积
建议按任务类型调整:
- IO 型任务:并发可以高一些
- CPU 型任务:并发要保守,必要时拆到独立进程甚至其他语言服务
延迟任务很多时要关注什么
如果系统里有大量延迟任务,比如几十万、几百万级:
- Redis 内存要预估
- 队列数量不要无限膨胀
- 删除策略要做好,避免 completed/failed 积压
- 监控延迟漂移:计划 5 分钟执行,实际是否拖到 10 分钟
常见坑与排查
这一节我尽量讲得“接地气”一点,都是实战里很常见的问题。
坑 1:任务重复执行
现象
- 用户收到两条短信
- 同一订单被重复取消
- 一条 webhook 被投递多次
原因
- Worker 在处理过程中崩溃,任务被恢复
- 生产者重复投递
- 接口重试导致同一业务被多次入队
- 锁续期失败,被误判 stalled
排查路径
- 看是否设置了业务唯一
jobId - 看 Worker 是否有长时间阻塞事件循环
- 看是否实现了业务幂等
- 看 Redis 是否出现网络抖动或阻塞
处理建议
- 入队时设置
jobId - 消费时实现幂等
- 不要在 Worker 里写大 CPU 计算
- 长任务适当调大
lockDuration
坑 2:任务卡住不动
现象
- 队列里有 waiting,但 Worker 似乎不消费
- delayed 任务一直不到点执行
- active 任务长期不结束
常见原因
- Worker 根本没启动
- Redis 连接异常
- 队列名写错
- 任务执行函数卡死
- 并发设置过小,任务被前面的大任务堵住
排查命令思路
你可以先从日志和 Redis 状态入手。
在应用里打印:
console.log('queue name:', queueName);
console.log('redis status:', connection.status);
如果要看 Redis 当前压力:
redis-cli info memory
redis-cli info clients
redis-cli info stats
我的经验
如果 active 长时间不释放,先怀疑:
- 业务代码里有未结束的 Promise
- 调用第三方接口没有超时控制
- 有同步 CPU 重活阻塞了事件循环
比如 HTTP 请求一定要设超时,而不是无限等:
async function withTimeout(promise, ms) {
let timer;
const timeout = new Promise((_, reject) => {
timer = setTimeout(() => reject(new Error('timeout')), ms);
});
try {
return await Promise.race([promise, timeout]);
} finally {
clearTimeout(timer);
}
}
坑 3:失败任务越来越多
现象
- failed 数量持续上升
- 重试把队列越压越长
- Redis 内存持续增长
原因
- 重试次数设置过高
- 永久失败任务也在重试
- 下游服务不可用但没有熔断
removeOnFail没配置,失败记录一直堆积
处理建议
- 区分可重试与不可重试错误
- 给第三方依赖做熔断、限流
- 合理设置
attempts - 对失败任务做归档/清理策略
坑 4:Redis 内存涨得快
现象
- Redis 内存不断上涨
- 队列查询越来越慢
- 运维开始追着问是不是“你们任务系统又爆了”
常见原因
completed/failed没清理- 返回结果太大
- 任务 data 放了大对象
- 单队列积压严重
建议
removeOnComplete、removeOnFail要配置- 不要把大文件内容直接塞进 job data
- job data 只放必要字段,例如 ID、路径、引用
- 报表或大对象放对象存储/数据库,任务只传引用
安全/性能最佳实践
这一部分我尽量给能直接落地的建议。
1. 不要把敏感数据明文塞进任务体
错误示范:
await queue.add('send-email', {
email: 'user@example.com',
password: 'plain-text-password',
token: 'secret-token'
});
任务数据会进入 Redis。
如果 Redis 没有做好隔离、ACL、网络访问控制,这些信息会非常危险。
建议:
- 只传业务 ID
- 敏感字段到消费时再查
- Redis 开启访问控制,部署在内网
- 使用 TLS/认证(如果环境支持)
2. 队列数据最小化
一个好的 job data 通常长这样:
{
orderId: "ORD-1001",
userId: "U-88",
templateId: "tpl_sms_created"
}
而不是把整个订单 JSON 全塞进去。
好处:
- Redis 更省内存
- 任务更稳定
- 业务数据变更时不容易产生“旧快照问题”
3. 任务处理必须设超时边界
外部调用建议统一包装超时、重试、熔断。
否则某个第三方接口卡 2 分钟,你的 Worker 很快就会被拖死。
4. 按任务类型拆队列
不要把以下任务混在一个队列里:
- 秒级短信通知
- 分钟级报表导出
- 30 分钟后自动取消订单
- 视频转码
建议拆分:
notification-queuereport-queueorder-delay-queuemedia-process-queue
这样更容易做:
- 独立并发控制
- 独立监控
- 独立告警
- 独立扩容
5. 做监控,不要“等投诉才知道挂了”
至少监控这些指标:
- waiting 数量
- active 数量
- failed 增量
- delayed 数量
- 完成耗时分位数
- 重试次数
- stalled 次数
- Redis 内存和连接数
如果没有监控,高可靠其实只是口头说法。
6. 优雅停机
Worker 收到 SIGTERM 时不要直接退出。
应该先 worker.close(),让当前任务有机会收尾。
尤其在容器环境或 K8s 下,这一步非常重要。
一个更稳妥的生产级配置示例
下面给一个更接近生产环境的队列初始化方式。
const { Queue } = require('bullmq');
const IORedis = require('ioredis');
const connection = new IORedis({
host: process.env.REDIS_HOST || '127.0.0.1',
port: Number(process.env.REDIS_PORT || 6379),
password: process.env.REDIS_PASSWORD || undefined,
maxRetriesPerRequest: null,
enableReadyCheck: true,
lazyConnect: false
});
const notificationQueue = new Queue('notification-queue', {
connection,
defaultJobOptions: {
attempts: 4,
backoff: {
type: 'exponential',
delay: 5000
},
removeOnComplete: 1000,
removeOnFail: 3000
}
});
module.exports = {
connection,
notificationQueue
};
如果你的下游有明显限流要求,还可以加限速:
const { Worker } = require('bullmq');
const worker = new Worker(
'notification-queue',
async (job) => {
// process job
},
{
connection,
concurrency: 10,
limiter: {
max: 100,
duration: 1000
}
}
);
这表示每秒最多处理 100 个任务,对于短信、邮件、Webhook 这类场景非常实用。
边界条件与设计建议
最后把几个容易忽视的边界条件单独拎出来。
什么时候 BullMQ 足够用
如果你的需求是:
- 延迟任务
- 异步通知
- 失败重试
- 多 Worker 并发处理
- 服务重启后任务不丢
那 BullMQ 通常已经够用了。
什么时候要补更多基础设施
如果你的需求开始出现:
- 跨地域多活
- 极高吞吐事件流
- 严格顺序消费
- 海量任务审计追踪
- 多语言复杂消费者体系
那可能要考虑:
- Kafka 做事件流
- 独立调度平台
- 更严格的任务编排系统
- 数据库 outbox/inbox 模式
一条很实用的建议
如果你现在的系统还比较简单,不要一开始就堆满复杂概念。
建议先做到这 5 件事:
- 任务入队带
jobId - 消费逻辑实现幂等
- 配置
attempts + backoff - completed/failed 做自动清理
- 把失败事件接入告警
这 5 条做好,可靠性会比“只是把任务异步化”高很多。
总结
BullMQ + Redis 在 Node.js 里做高可靠任务队列,是一个非常务实的方案。
它真正有价值的地方,不只是“异步”,而是把下面这些能力系统化了:
- 任务持久化
- 失败自动重试
- 延迟执行
- Worker 崩溃后的恢复
- 并发与限速控制
- 事件观测与运维排查
但也要记住一个核心事实:
队列本身解决的是“调度与恢复”,不是“业务绝对只执行一次”。
所以真正的高可靠,一定是 BullMQ 的恢复能力 + 业务层幂等 + 失败补偿机制 三者一起完成的。
如果你准备在项目里落地,我建议按下面顺序推进:
- 先按业务域拆好队列
- 为每类任务定义唯一业务键
- 配置合理的重试与退避策略
- 给所有外部调用加超时
- 建立失败告警与人工补偿入口
- 再去做更细的性能调优和容量扩展
这样做,系统不一定“永远不失败”,但至少失败时你知道它在哪、为什么、怎么恢复。
这就是工程上真正有价值的“高可靠”。