背景与问题
Node.js 很适合做 I/O 密集型服务,但一旦遇到 CPU 密集型任务,事情就容易变味。
比如这些场景:
- 图片压缩、缩略图生成
- 大批量 JSON/CSV 解析
- 密码学计算、哈希、签名
- 数据聚合、规则引擎、复杂排序
- 音视频转码前后的预处理
很多同学的第一反应是:既然 Node.js 异步这么强,那我把任务丢进 Promise 不就行了?
可惜不行。
因为 Promise 解决的是“异步调度”,不是“把 CPU 计算搬走”。
如果你在主线程里跑一个超重的计算函数,即使外面包了 async/await,事件循环还是会被卡住。表现通常是:
- 接口延迟突然飙升
- 心跳、定时器不准
- Web 服务吞吐下降
- 监控里 CPU 占用高,但并发能力反而很差
这时候,比较实用的一种方案就是:
主线程负责接收请求和调度,Worker Threads 负责实际计算,中间用任务队列做削峰和限流。
这篇文章我会带你从 0 到 1 做一个可运行的 demo,重点不是“能跑”,而是理解这套模型为什么靠谱、哪里容易踩坑。
前置知识与环境准备
你需要知道什么
建议你至少熟悉:
- Node.js 基础模块用法
async/await- HTTP 服务基础
- JavaScript 中数组、对象、Map 的基本操作
环境
- Node.js 18 及以上
- 任意终端
- 一个空目录
初始化项目:
mkdir node-worker-queue-demo
cd node-worker-queue-demo
npm init -y
我们这篇文章只用 Node 内置模块,不额外安装第三方依赖。
为什么不能只靠异步
先看一个“看起来异步,实际仍然阻塞”的例子。
function heavyCalculation(n) {
let count = 0;
for (let i = 0; i < n; i++) {
count += Math.sqrt(i);
}
return count;
}
async function run() {
console.log('start');
const result = heavyCalculation(1e9);
console.log('done', result);
}
run();
console.log('after run');
你会发现:
after run虽然会打印- 但整个进程在计算期间依然是“卡”的
- 如果这是 Web 服务,别的请求也会受影响
原因很简单:
JavaScript 执行计算的还是主线程。
所以,我们需要把这类任务丢给 Worker Threads。
核心原理
这套方案其实可以拆成 3 个角色:
-
主线程(Main Thread)
- 接收用户请求
- 把任务放入队列
- 从队列中取任务并分发给 Worker
- 返回结果或状态
-
任务队列(Queue)
- 缓冲瞬时高峰
- 控制并发数
- 避免无限创建 Worker
- 支持排队、超时、失败处理
-
Worker Threads
- 真正执行 CPU 密集型任务
- 与主线程通过消息通信
- 计算完成后把结果回传
整体流程图
flowchart LR
A[HTTP 请求] --> B[主线程]
B --> C[任务入队]
C --> D[调度器]
D --> E1[Worker 1]
D --> E2[Worker 2]
D --> E3[Worker 3]
E1 --> F[结果回传]
E2 --> F
E3 --> F
F --> B
B --> G[响应客户端]
为什么要“队列 + 固定数量 Worker”
很多人第一次写 Worker,会这么干:
- 来一个请求
- new 一个 Worker
- 计算完销毁
这能跑,但通常不优雅,问题包括:
- Worker 创建本身有成本
- 高并发下线程数失控
- 内存占用不稳定
- 调度行为不可控
更稳妥的思路是:
- 预创建固定数量的 Worker
- 任务进入队列
- 空闲 Worker 再去领任务
这就是一个简化版线程池模型。
主线程与 Worker 的通信关系
sequenceDiagram
participant Client as 客户端
participant Main as 主线程
participant Queue as 任务队列
participant Worker as Worker线程
Client->>Main: 提交计算请求
Main->>Queue: enqueue(task)
Main->>Main: 查找空闲Worker
Main->>Worker: postMessage(task)
Worker->>Worker: 执行CPU计算
Worker->>Main: message(result)
Main->>Client: 返回结果
实战代码(可运行)
下面我们做一个完整示例:
- 提供一个 HTTP 服务
- 提交一个“计算质数数量”的 CPU 密集型任务
- 使用固定大小的 Worker 池
- 使用内存队列调度任务
项目结构如下:
node-worker-queue-demo/
├─ index.js
├─ pool.js
└─ worker.js
第一步:实现 Worker 逻辑
worker.js
const { parentPort } = require('worker_threads');
function countPrimes(max) {
let count = 0;
for (let i = 2; i <= max; i++) {
let isPrime = true;
const sqrt = Math.sqrt(i);
for (let j = 2; j <= sqrt; j++) {
if (i % j === 0) {
isPrime = false;
break;
}
}
if (isPrime) count++;
}
return count;
}
parentPort.on('message', (task) => {
const { taskId, number } = task;
try {
const start = Date.now();
const result = countPrimes(number);
const duration = Date.now() - start;
parentPort.postMessage({
taskId,
result,
duration,
});
} catch (error) {
parentPort.postMessage({
taskId,
error: error.message,
});
}
});
这里 Worker 的职责很单纯:
- 收到任务
- 做计算
- 返回结果
不要把太多业务状态塞进 Worker 里,后面维护会轻松很多。
第二步:实现一个简单的 Worker 池 + 队列
pool.js
const path = require('path');
const { Worker } = require('worker_threads');
class WorkerPool {
constructor(size) {
this.size = size;
this.workers = [];
this.queue = [];
this.callbacks = new Map();
this.taskId = 0;
}
init() {
for (let i = 0; i < this.size; i++) {
this.addWorker();
}
}
addWorker() {
const worker = new Worker(path.resolve(__dirname, 'worker.js'));
const wrapper = {
worker,
busy: false,
currentTaskId: null,
};
worker.on('message', (message) => {
const { taskId, result, duration, error } = message;
const callback = this.callbacks.get(taskId);
if (callback) {
this.callbacks.delete(taskId);
wrapper.busy = false;
wrapper.currentTaskId = null;
if (error) {
callback.reject(new Error(error));
} else {
callback.resolve({ result, duration });
}
}
this.dispatch();
});
worker.on('error', (err) => {
if (wrapper.currentTaskId !== null) {
const callback = this.callbacks.get(wrapper.currentTaskId);
if (callback) {
this.callbacks.delete(wrapper.currentTaskId);
callback.reject(err);
}
}
wrapper.busy = false;
wrapper.currentTaskId = null;
this.replaceWorker(wrapper);
this.dispatch();
});
worker.on('exit', (code) => {
if (code !== 0) {
if (wrapper.currentTaskId !== null) {
const callback = this.callbacks.get(wrapper.currentTaskId);
if (callback) {
this.callbacks.delete(wrapper.currentTaskId);
callback.reject(new Error(`Worker exited with code ${code}`));
}
}
wrapper.busy = false;
wrapper.currentTaskId = null;
this.replaceWorker(wrapper);
}
});
this.workers.push(wrapper);
}
replaceWorker(oldWrapper) {
const index = this.workers.indexOf(oldWrapper);
if (index !== -1) {
this.workers.splice(index, 1);
}
this.addWorker();
}
runTask(payload) {
return new Promise((resolve, reject) => {
const taskId = ++this.taskId;
this.queue.push({ taskId, payload });
this.callbacks.set(taskId, { resolve, reject });
this.dispatch();
});
}
dispatch() {
const idleWorker = this.workers.find((w) => !w.busy);
if (!idleWorker) return;
const task = this.queue.shift();
if (!task) return;
idleWorker.busy = true;
idleWorker.currentTaskId = task.taskId;
idleWorker.worker.postMessage({
taskId: task.taskId,
...task.payload,
});
if (this.queue.length > 0) {
setImmediate(() => this.dispatch());
}
}
getStats() {
const busyWorkers = this.workers.filter((w) => w.busy).length;
return {
poolSize: this.size,
busyWorkers,
idleWorkers: this.size - busyWorkers,
queueSize: this.queue.length,
pendingCallbacks: this.callbacks.size,
};
}
async destroy() {
await Promise.all(this.workers.map((w) => w.worker.terminate()));
}
}
module.exports = WorkerPool;
这个版本是“教学友好版”,特点是:
- 简洁
- 可运行
- 足够体现核心机制
它还不是生产级线程池,但已经能覆盖绝大多数入门到中级实践的思路。
第三步:提供 HTTP 接口
index.js
const http = require('http');
const os = require('os');
const WorkerPool = require('./pool');
const cpuCount = os.cpus().length;
const poolSize = Math.max(1, Math.min(cpuCount - 1, 4));
const pool = new WorkerPool(poolSize);
pool.init();
function sendJson(res, statusCode, data) {
res.writeHead(statusCode, {
'Content-Type': 'application/json; charset=utf-8',
});
res.end(JSON.stringify(data));
}
const server = http.createServer(async (req, res) => {
const url = new URL(req.url, 'http://localhost');
if (req.method === 'GET' && url.pathname === '/health') {
return sendJson(res, 200, {
ok: true,
stats: pool.getStats(),
});
}
if (req.method === 'GET' && url.pathname === '/prime') {
const number = Number(url.searchParams.get('number') || 200000);
if (!Number.isInteger(number) || number < 2 || number > 2000000) {
return sendJson(res, 400, {
error: 'number 必须是 2 到 2000000 之间的整数',
});
}
try {
const data = await pool.runTask({ number });
return sendJson(res, 200, {
input: number,
primeCount: data.result,
durationMs: data.duration,
pool: pool.getStats(),
});
} catch (error) {
return sendJson(res, 500, {
error: error.message,
});
}
}
sendJson(res, 404, { error: 'Not Found' });
});
server.listen(3000, () => {
console.log(`Server started on http://localhost:3000`);
console.log(`Worker pool size: ${poolSize}`);
});
async function shutdown() {
console.log('Shutting down...');
server.close(async () => {
await pool.destroy();
process.exit(0);
});
}
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);
第四步:运行与验证
启动服务:
node index.js
访问健康检查:
curl http://localhost:3000/health
示例返回:
{
"ok": true,
"stats": {
"poolSize": 4,
"busyWorkers": 0,
"idleWorkers": 4,
"queueSize": 0,
"pendingCallbacks": 0
}
}
提交计算任务:
curl "http://localhost:3000/prime?number=300000"
如果你多开几个终端并发请求,再看 /health,就能明显观察到:
- 有些 Worker 正在忙
- 新任务在队列里排队
- 主线程仍然能响应健康检查
这就是最关键的收益:
重计算不再堵死主线程。
逐步验证清单
如果你想确认自己真的理解了,不妨按这个顺序测一遍。
1. 单请求验证
先发一个小任务:
curl "http://localhost:3000/prime?number=100000"
确认能正常返回结果。
2. 并发验证
开多个终端,同时发请求:
curl "http://localhost:3000/prime?number=400000"
curl "http://localhost:3000/prime?number=450000"
curl "http://localhost:3000/prime?number=500000"
curl "http://localhost:3000/prime?number=550000"
然后看:
curl http://localhost:3000/health
确认 busyWorkers 和 queueSize 有变化。
3. 边界验证
试试非法参数:
curl "http://localhost:3000/prime?number=-1"
curl "http://localhost:3000/prime?number=abc"
curl "http://localhost:3000/prime?number=99999999"
确认服务能拒绝异常输入,而不是傻乎乎地开始算。
核心原理再深入一点
上面的代码已经能跑,但你最好再理解两件事。
1. Worker Threads 不是 Cluster
很多人会混淆这两个概念。
- Worker Threads:同一进程内的多线程,适合把 CPU 计算从主线程移走
- Cluster / 多进程:多个 Node 进程共同提供服务,适合提升服务实例并发和隔离性
简单说:
- 想解决“主线程被重计算卡住” → 优先看 Worker Threads
- 想利用多核做 Web 服务扩展 → 可考虑 Cluster、PM2、容器副本扩容
2. 队列的价值不只是“排队”
队列至少解决 4 个问题:
- 削峰:瞬时很多任务时先缓冲
- 限流:避免 Worker 数量无限增长
- 公平调度:让任务按规则执行
- 可观测:你能知道当前积压了多少任务
状态变化示意
stateDiagram-v2
[*] --> Waiting
Waiting --> Running: 分配到空闲Worker
Running --> Success: 计算完成
Running --> Failed: 抛错/Worker退出
Failed --> Waiting: 可选重试
Success --> [*]
常见坑与排查
这部分很重要。我自己第一次做这类方案时,问题基本都不在“API 不会用”,而在“行为和预期不一样”。
坑 1:任务还是把主线程卡住了
典型原因
- 重计算逻辑没放进 Worker,而是还在主线程里
- 提交任务前做了大量同步预处理
- 返回结果时做了大对象序列化
排查思路
检查计算路径是不是这样:
- 主线程只做参数校验
- 组装轻量任务对象
postMessage发给 Worker- Worker 内执行计算
如果你在 runTask() 之前还做了复杂计算,那瓶颈仍在主线程。
坑 2:Worker 开太多,机器反而更慢
原因
线程不是越多越好。
CPU 核数是有限的,线程过多会带来:
- 上下文切换开销
- 内存占用增加
- 调度抖动
- GC 压力变大
建议
一个保守起点是:
const poolSize = Math.max(1, Math.min(os.cpus().length - 1, 4));
也就是说:
- 先别把机器吃满
- 给主线程和系统留余量
- 再根据监控做压测调整
坑 3:大对象传输让性能变差
主线程和 Worker 之间通信,通常需要做结构化克隆。
如果你传的是超大的对象、数组、Buffer,开销会很明显。
现象
- Worker 计算本身不慢
- 但整体响应时间依然高
- CPU 使用异常
- 内存抖动明显
解决思路
- 只传必要字段
- 不要把整个请求对象传进去
- 能传 ID 就别传全量内容
- 对二进制数据,考虑
Transferable或SharedArrayBuffer(需要明确安全边界)
坑 4:队列无限增长,内存被吃爆
这是很容易被忽略的问题。
如果你的服务接收任务速度远高于 Worker 消费速度,而队列又没有上限,那么最终结果通常是:
- 队列越积越多
- 内存不断上涨
- 延迟越来越高
- 进程被 OOM 杀掉
止血方案
给队列设置最大长度。超过就拒绝请求,返回 429 或 503。
例如在 runTask 里加限制:
if (this.queue.length >= 1000) {
return Promise.reject(new Error('Queue is full'));
}
这是非常实用的“熔断式保护”。
坑 5:Worker 崩了以后任务丢失
如果 Worker 因异常退出,而你没有处理:
- 当前执行的任务会悬空
- Promise 可能永远不 resolve/reject
- 调用方看起来像“卡死”
正确做法
- 监听
error - 监听
exit - 对正在执行的任务做 reject
- 自动补一个新的 Worker
上面我们的池子代码已经做了最基本的处理。
安全/性能最佳实践
这部分更偏“上线前必看”。
1. 对输入做强校验
CPU 密集型任务尤其怕“恶意输入”。
例如:
- 超大数字
- 异常深度的数据结构
- 特殊构造导致算法退化
一定要限制:
- 参数类型
- 参数范围
- 请求频率
- 单任务最大计算量
像本文示例中的:
if (!Number.isInteger(number) || number < 2 || number > 2000000)
这不是形式主义,而是避免别人一脚把你服务踩死。
2. 给任务加超时控制
有些计算可能因为 bug、死循环或极端输入跑太久。
如果不做超时,Worker 可能长期被占住。
一个常见做法是给任务 Promise 包一层超时:
function withTimeout(promise, ms) {
return Promise.race([
promise,
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Task timeout')), ms)
),
]);
}
调用时:
const data = await withTimeout(pool.runTask({ number }), 5000);
更进一步,你还可以在超时后直接销毁异常 Worker 并重建。
3. 队列要有上限和降级策略
生产环境里,不要只想着“尽量接住所有请求”,要想“接不住时怎么优雅失败”。
建议至少有这些策略:
- 队列长度上限
- 超过上限直接拒绝
- 区分高优先级与低优先级任务
- 支持快速失败,而不是把延迟拖到几分钟
4. 做监控,不要盲调线程数
你需要观察的指标包括:
- 队列长度
- Worker 忙碌数
- 任务平均耗时
- 任务失败率
- 进程 CPU
- RSS / Heap 内存
- 事件循环延迟
如果没有这些指标,线程池大小基本只能靠猜。
5. 小心同步日志
一个很容易忽略的问题:
如果你在高频任务路径上疯狂 console.log,性能会被拖垮。
建议:
- 只记录关键事件
- 错误日志结构化输出
- 高频路径打采样日志
- 不要在每个任务开始/结束都打大量日志
6. 真正生产环境可考虑外部任务队列
本文用的是内存队列,优点是简单,适合:
- 单机服务
- 本地工具
- 中小规模任务处理
- 学习与原型验证
但它也有边界:
- 进程重启后任务丢失
- 无法跨实例共享
- 不适合长生命周期任务
如果你的场景变成:
- 多实例部署
- 任务不能丢
- 需要重试、延迟执行、优先级
- 需要后台异步处理
那就应该考虑:
- Redis + BullMQ / Bee-Queue
- RabbitMQ
- Kafka
- 数据库任务表 + 调度器
也就是说:
Worker Threads 解决“算在哪里”,队列系统解决“任务怎么可靠流转”。
这两个层次不要混为一谈。
一个更稳妥的改进方向
如果你准备把上面的 demo 往生产环境推进,我建议优先补这几项:
改进项 1:队列长度限制
runTask(payload) {
const MAX_QUEUE_SIZE = 100;
if (this.queue.length >= MAX_QUEUE_SIZE) {
return Promise.reject(new Error('Queue is full'));
}
return new Promise((resolve, reject) => {
const taskId = ++this.taskId;
this.queue.push({ taskId, payload });
this.callbacks.set(taskId, { resolve, reject });
this.dispatch();
});
}
改进项 2:任务超时
async function runTaskWithTimeout(pool, payload, timeoutMs = 5000) {
return Promise.race([
pool.runTask(payload),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Task timeout')), timeoutMs)
),
]);
}
改进项 3:优先级队列
当你的任务有轻重缓急时,可以拆成:
- high queue
- normal queue
- low queue
调度器优先取高优任务,而不是所有任务先进先出。
什么时候不该用这套方案
不是所有 CPU 问题都适合 Worker Threads。
以下场景要谨慎:
1. 任务特别重,单机根本算不动
比如单次计算就要吃满 CPU 几十秒甚至几分钟。
这种时候更适合:
- 独立计算服务
- 后台任务系统
- 分布式处理平台
2. 任务需要非常高的可靠性
如果任务不能丢、必须重试、要可审计,内存队列明显不够。
应该引入持久化消息系统。
3. 任务依赖原生库或外部命令更合适
有些工作例如视频转码,本质上更适合交给:
ffmpeg- Python/Rust/C++ 服务
- 独立进程
Node.js 负责调度和编排,未必要自己算。
总结
回到一开始的问题:
Node.js 怎么处理 CPU 密集型任务,且不把主线程拖死?
一个实用答案就是:
- 用 Worker Threads 承担计算
- 用 任务队列 做缓冲与调度
- 用 固定大小线程池 控制并发
- 用 校验、超时、限流、监控 保证可用性
你可以把这篇文章的示例记成一句话:
主线程做调度,Worker 做计算,队列做秩序。
如果你现在正在做中小规模服务,我建议这样落地:
- 先把最重的同步计算挪到 Worker
- 池大小从
CPU 核数 - 1或更保守值开始 - 立刻加上队列长度限制
- 给任务加超时
- 压测后再调优,不要凭感觉定参数
最后给一个边界判断:
- 单机、短任务、能接受进程内队列丢失:本文方案很合适
- 多实例、任务必须可靠、需要重试与持久化:请把外部消息队列也纳入设计
如果你先把这版 demo 跑起来,再用压测工具打一轮,很快就能直观理解 Worker Threads 的价值。这个过程比只看概念要有效得多。