跳转到内容
123xiao | 无名键客

《Node.js 中基于 Worker Threads 与队列的 CPU 密集型任务处理实战》

字数: 0 阅读时长: 1 分钟

背景与问题

Node.js 很适合做 I/O 密集型服务,但一旦遇到 CPU 密集型任务,事情就容易变味。

比如这些场景:

  • 图片压缩、缩略图生成
  • 大批量 JSON/CSV 解析
  • 密码学计算、哈希、签名
  • 数据聚合、规则引擎、复杂排序
  • 音视频转码前后的预处理

很多同学的第一反应是:既然 Node.js 异步这么强,那我把任务丢进 Promise 不就行了?

可惜不行。
因为 Promise 解决的是“异步调度”,不是“把 CPU 计算搬走”。

如果你在主线程里跑一个超重的计算函数,即使外面包了 async/await,事件循环还是会被卡住。表现通常是:

  • 接口延迟突然飙升
  • 心跳、定时器不准
  • Web 服务吞吐下降
  • 监控里 CPU 占用高,但并发能力反而很差

这时候,比较实用的一种方案就是:

主线程负责接收请求和调度,Worker Threads 负责实际计算,中间用任务队列做削峰和限流。

这篇文章我会带你从 0 到 1 做一个可运行的 demo,重点不是“能跑”,而是理解这套模型为什么靠谱、哪里容易踩坑。


前置知识与环境准备

你需要知道什么

建议你至少熟悉:

  • Node.js 基础模块用法
  • async/await
  • HTTP 服务基础
  • JavaScript 中数组、对象、Map 的基本操作

环境

  • Node.js 18 及以上
  • 任意终端
  • 一个空目录

初始化项目:

mkdir node-worker-queue-demo
cd node-worker-queue-demo
npm init -y

我们这篇文章只用 Node 内置模块,不额外安装第三方依赖。


为什么不能只靠异步

先看一个“看起来异步,实际仍然阻塞”的例子。

function heavyCalculation(n) {
  let count = 0;
  for (let i = 0; i < n; i++) {
    count += Math.sqrt(i);
  }
  return count;
}

async function run() {
  console.log('start');
  const result = heavyCalculation(1e9);
  console.log('done', result);
}

run();
console.log('after run');

你会发现:

  • after run 虽然会打印
  • 但整个进程在计算期间依然是“卡”的
  • 如果这是 Web 服务,别的请求也会受影响

原因很简单:
JavaScript 执行计算的还是主线程。

所以,我们需要把这类任务丢给 Worker Threads。


核心原理

这套方案其实可以拆成 3 个角色:

  1. 主线程(Main Thread)

    • 接收用户请求
    • 把任务放入队列
    • 从队列中取任务并分发给 Worker
    • 返回结果或状态
  2. 任务队列(Queue)

    • 缓冲瞬时高峰
    • 控制并发数
    • 避免无限创建 Worker
    • 支持排队、超时、失败处理
  3. Worker Threads

    • 真正执行 CPU 密集型任务
    • 与主线程通过消息通信
    • 计算完成后把结果回传

整体流程图

flowchart LR
    A[HTTP 请求] --> B[主线程]
    B --> C[任务入队]
    C --> D[调度器]
    D --> E1[Worker 1]
    D --> E2[Worker 2]
    D --> E3[Worker 3]
    E1 --> F[结果回传]
    E2 --> F
    E3 --> F
    F --> B
    B --> G[响应客户端]

为什么要“队列 + 固定数量 Worker”

很多人第一次写 Worker,会这么干:

  • 来一个请求
  • new 一个 Worker
  • 计算完销毁

这能跑,但通常不优雅,问题包括:

  • Worker 创建本身有成本
  • 高并发下线程数失控
  • 内存占用不稳定
  • 调度行为不可控

更稳妥的思路是:

  • 预创建固定数量的 Worker
  • 任务进入队列
  • 空闲 Worker 再去领任务

这就是一个简化版线程池模型。

主线程与 Worker 的通信关系

sequenceDiagram
    participant Client as 客户端
    participant Main as 主线程
    participant Queue as 任务队列
    participant Worker as Worker线程

    Client->>Main: 提交计算请求
    Main->>Queue: enqueue(task)
    Main->>Main: 查找空闲Worker
    Main->>Worker: postMessage(task)
    Worker->>Worker: 执行CPU计算
    Worker->>Main: message(result)
    Main->>Client: 返回结果

实战代码(可运行)

下面我们做一个完整示例:

  • 提供一个 HTTP 服务
  • 提交一个“计算质数数量”的 CPU 密集型任务
  • 使用固定大小的 Worker 池
  • 使用内存队列调度任务

项目结构如下:

node-worker-queue-demo/
├─ index.js
├─ pool.js
└─ worker.js

第一步:实现 Worker 逻辑

worker.js

const { parentPort } = require('worker_threads');

function countPrimes(max) {
  let count = 0;

  for (let i = 2; i <= max; i++) {
    let isPrime = true;
    const sqrt = Math.sqrt(i);

    for (let j = 2; j <= sqrt; j++) {
      if (i % j === 0) {
        isPrime = false;
        break;
      }
    }

    if (isPrime) count++;
  }

  return count;
}

parentPort.on('message', (task) => {
  const { taskId, number } = task;

  try {
    const start = Date.now();
    const result = countPrimes(number);
    const duration = Date.now() - start;

    parentPort.postMessage({
      taskId,
      result,
      duration,
    });
  } catch (error) {
    parentPort.postMessage({
      taskId,
      error: error.message,
    });
  }
});

这里 Worker 的职责很单纯:

  • 收到任务
  • 做计算
  • 返回结果

不要把太多业务状态塞进 Worker 里,后面维护会轻松很多。


第二步:实现一个简单的 Worker 池 + 队列

pool.js

const path = require('path');
const { Worker } = require('worker_threads');

class WorkerPool {
  constructor(size) {
    this.size = size;
    this.workers = [];
    this.queue = [];
    this.callbacks = new Map();
    this.taskId = 0;
  }

  init() {
    for (let i = 0; i < this.size; i++) {
      this.addWorker();
    }
  }

  addWorker() {
    const worker = new Worker(path.resolve(__dirname, 'worker.js'));
    const wrapper = {
      worker,
      busy: false,
      currentTaskId: null,
    };

    worker.on('message', (message) => {
      const { taskId, result, duration, error } = message;
      const callback = this.callbacks.get(taskId);

      if (callback) {
        this.callbacks.delete(taskId);
        wrapper.busy = false;
        wrapper.currentTaskId = null;

        if (error) {
          callback.reject(new Error(error));
        } else {
          callback.resolve({ result, duration });
        }
      }

      this.dispatch();
    });

    worker.on('error', (err) => {
      if (wrapper.currentTaskId !== null) {
        const callback = this.callbacks.get(wrapper.currentTaskId);
        if (callback) {
          this.callbacks.delete(wrapper.currentTaskId);
          callback.reject(err);
        }
      }

      wrapper.busy = false;
      wrapper.currentTaskId = null;

      this.replaceWorker(wrapper);
      this.dispatch();
    });

    worker.on('exit', (code) => {
      if (code !== 0) {
        if (wrapper.currentTaskId !== null) {
          const callback = this.callbacks.get(wrapper.currentTaskId);
          if (callback) {
            this.callbacks.delete(wrapper.currentTaskId);
            callback.reject(new Error(`Worker exited with code ${code}`));
          }
        }

        wrapper.busy = false;
        wrapper.currentTaskId = null;
        this.replaceWorker(wrapper);
      }
    });

    this.workers.push(wrapper);
  }

  replaceWorker(oldWrapper) {
    const index = this.workers.indexOf(oldWrapper);
    if (index !== -1) {
      this.workers.splice(index, 1);
    }
    this.addWorker();
  }

  runTask(payload) {
    return new Promise((resolve, reject) => {
      const taskId = ++this.taskId;
      this.queue.push({ taskId, payload });
      this.callbacks.set(taskId, { resolve, reject });
      this.dispatch();
    });
  }

  dispatch() {
    const idleWorker = this.workers.find((w) => !w.busy);
    if (!idleWorker) return;

    const task = this.queue.shift();
    if (!task) return;

    idleWorker.busy = true;
    idleWorker.currentTaskId = task.taskId;

    idleWorker.worker.postMessage({
      taskId: task.taskId,
      ...task.payload,
    });

    if (this.queue.length > 0) {
      setImmediate(() => this.dispatch());
    }
  }

  getStats() {
    const busyWorkers = this.workers.filter((w) => w.busy).length;
    return {
      poolSize: this.size,
      busyWorkers,
      idleWorkers: this.size - busyWorkers,
      queueSize: this.queue.length,
      pendingCallbacks: this.callbacks.size,
    };
  }

  async destroy() {
    await Promise.all(this.workers.map((w) => w.worker.terminate()));
  }
}

module.exports = WorkerPool;

这个版本是“教学友好版”,特点是:

  • 简洁
  • 可运行
  • 足够体现核心机制

它还不是生产级线程池,但已经能覆盖绝大多数入门到中级实践的思路。


第三步:提供 HTTP 接口

index.js

const http = require('http');
const os = require('os');
const WorkerPool = require('./pool');

const cpuCount = os.cpus().length;
const poolSize = Math.max(1, Math.min(cpuCount - 1, 4));
const pool = new WorkerPool(poolSize);

pool.init();

function sendJson(res, statusCode, data) {
  res.writeHead(statusCode, {
    'Content-Type': 'application/json; charset=utf-8',
  });
  res.end(JSON.stringify(data));
}

const server = http.createServer(async (req, res) => {
  const url = new URL(req.url, 'http://localhost');

  if (req.method === 'GET' && url.pathname === '/health') {
    return sendJson(res, 200, {
      ok: true,
      stats: pool.getStats(),
    });
  }

  if (req.method === 'GET' && url.pathname === '/prime') {
    const number = Number(url.searchParams.get('number') || 200000);

    if (!Number.isInteger(number) || number < 2 || number > 2000000) {
      return sendJson(res, 400, {
        error: 'number 必须是 2 到 2000000 之间的整数',
      });
    }

    try {
      const data = await pool.runTask({ number });
      return sendJson(res, 200, {
        input: number,
        primeCount: data.result,
        durationMs: data.duration,
        pool: pool.getStats(),
      });
    } catch (error) {
      return sendJson(res, 500, {
        error: error.message,
      });
    }
  }

  sendJson(res, 404, { error: 'Not Found' });
});

server.listen(3000, () => {
  console.log(`Server started on http://localhost:3000`);
  console.log(`Worker pool size: ${poolSize}`);
});

async function shutdown() {
  console.log('Shutting down...');
  server.close(async () => {
    await pool.destroy();
    process.exit(0);
  });
}

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);

第四步:运行与验证

启动服务:

node index.js

访问健康检查:

curl http://localhost:3000/health

示例返回:

{
  "ok": true,
  "stats": {
    "poolSize": 4,
    "busyWorkers": 0,
    "idleWorkers": 4,
    "queueSize": 0,
    "pendingCallbacks": 0
  }
}

提交计算任务:

curl "http://localhost:3000/prime?number=300000"

如果你多开几个终端并发请求,再看 /health,就能明显观察到:

  • 有些 Worker 正在忙
  • 新任务在队列里排队
  • 主线程仍然能响应健康检查

这就是最关键的收益:
重计算不再堵死主线程。


逐步验证清单

如果你想确认自己真的理解了,不妨按这个顺序测一遍。

1. 单请求验证

先发一个小任务:

curl "http://localhost:3000/prime?number=100000"

确认能正常返回结果。

2. 并发验证

开多个终端,同时发请求:

curl "http://localhost:3000/prime?number=400000"
curl "http://localhost:3000/prime?number=450000"
curl "http://localhost:3000/prime?number=500000"
curl "http://localhost:3000/prime?number=550000"

然后看:

curl http://localhost:3000/health

确认 busyWorkersqueueSize 有变化。

3. 边界验证

试试非法参数:

curl "http://localhost:3000/prime?number=-1"
curl "http://localhost:3000/prime?number=abc"
curl "http://localhost:3000/prime?number=99999999"

确认服务能拒绝异常输入,而不是傻乎乎地开始算。


核心原理再深入一点

上面的代码已经能跑,但你最好再理解两件事。

1. Worker Threads 不是 Cluster

很多人会混淆这两个概念。

  • Worker Threads:同一进程内的多线程,适合把 CPU 计算从主线程移走
  • Cluster / 多进程:多个 Node 进程共同提供服务,适合提升服务实例并发和隔离性

简单说:

  • 想解决“主线程被重计算卡住” → 优先看 Worker Threads
  • 想利用多核做 Web 服务扩展 → 可考虑 Cluster、PM2、容器副本扩容

2. 队列的价值不只是“排队”

队列至少解决 4 个问题:

  • 削峰:瞬时很多任务时先缓冲
  • 限流:避免 Worker 数量无限增长
  • 公平调度:让任务按规则执行
  • 可观测:你能知道当前积压了多少任务

状态变化示意

stateDiagram-v2
    [*] --> Waiting
    Waiting --> Running: 分配到空闲Worker
    Running --> Success: 计算完成
    Running --> Failed: 抛错/Worker退出
    Failed --> Waiting: 可选重试
    Success --> [*]

常见坑与排查

这部分很重要。我自己第一次做这类方案时,问题基本都不在“API 不会用”,而在“行为和预期不一样”。

坑 1:任务还是把主线程卡住了

典型原因

  • 重计算逻辑没放进 Worker,而是还在主线程里
  • 提交任务前做了大量同步预处理
  • 返回结果时做了大对象序列化

排查思路

检查计算路径是不是这样:

  1. 主线程只做参数校验
  2. 组装轻量任务对象
  3. postMessage 发给 Worker
  4. Worker 内执行计算

如果你在 runTask() 之前还做了复杂计算,那瓶颈仍在主线程。


坑 2:Worker 开太多,机器反而更慢

原因

线程不是越多越好。
CPU 核数是有限的,线程过多会带来:

  • 上下文切换开销
  • 内存占用增加
  • 调度抖动
  • GC 压力变大

建议

一个保守起点是:

const poolSize = Math.max(1, Math.min(os.cpus().length - 1, 4));

也就是说:

  • 先别把机器吃满
  • 给主线程和系统留余量
  • 再根据监控做压测调整

坑 3:大对象传输让性能变差

主线程和 Worker 之间通信,通常需要做结构化克隆。
如果你传的是超大的对象、数组、Buffer,开销会很明显。

现象

  • Worker 计算本身不慢
  • 但整体响应时间依然高
  • CPU 使用异常
  • 内存抖动明显

解决思路

  • 只传必要字段
  • 不要把整个请求对象传进去
  • 能传 ID 就别传全量内容
  • 对二进制数据,考虑 TransferableSharedArrayBuffer(需要明确安全边界)

坑 4:队列无限增长,内存被吃爆

这是很容易被忽略的问题。

如果你的服务接收任务速度远高于 Worker 消费速度,而队列又没有上限,那么最终结果通常是:

  • 队列越积越多
  • 内存不断上涨
  • 延迟越来越高
  • 进程被 OOM 杀掉

止血方案

给队列设置最大长度。超过就拒绝请求,返回 429 或 503。

例如在 runTask 里加限制:

if (this.queue.length >= 1000) {
  return Promise.reject(new Error('Queue is full'));
}

这是非常实用的“熔断式保护”。


坑 5:Worker 崩了以后任务丢失

如果 Worker 因异常退出,而你没有处理:

  • 当前执行的任务会悬空
  • Promise 可能永远不 resolve/reject
  • 调用方看起来像“卡死”

正确做法

  • 监听 error
  • 监听 exit
  • 对正在执行的任务做 reject
  • 自动补一个新的 Worker

上面我们的池子代码已经做了最基本的处理。


安全/性能最佳实践

这部分更偏“上线前必看”。

1. 对输入做强校验

CPU 密集型任务尤其怕“恶意输入”。

例如:

  • 超大数字
  • 异常深度的数据结构
  • 特殊构造导致算法退化

一定要限制:

  • 参数类型
  • 参数范围
  • 请求频率
  • 单任务最大计算量

像本文示例中的:

if (!Number.isInteger(number) || number < 2 || number > 2000000)

这不是形式主义,而是避免别人一脚把你服务踩死。


2. 给任务加超时控制

有些计算可能因为 bug、死循环或极端输入跑太久。
如果不做超时,Worker 可能长期被占住。

一个常见做法是给任务 Promise 包一层超时:

function withTimeout(promise, ms) {
  return Promise.race([
    promise,
    new Promise((_, reject) =>
      setTimeout(() => reject(new Error('Task timeout')), ms)
    ),
  ]);
}

调用时:

const data = await withTimeout(pool.runTask({ number }), 5000);

更进一步,你还可以在超时后直接销毁异常 Worker 并重建。


3. 队列要有上限和降级策略

生产环境里,不要只想着“尽量接住所有请求”,要想“接不住时怎么优雅失败”。

建议至少有这些策略:

  • 队列长度上限
  • 超过上限直接拒绝
  • 区分高优先级与低优先级任务
  • 支持快速失败,而不是把延迟拖到几分钟

4. 做监控,不要盲调线程数

你需要观察的指标包括:

  • 队列长度
  • Worker 忙碌数
  • 任务平均耗时
  • 任务失败率
  • 进程 CPU
  • RSS / Heap 内存
  • 事件循环延迟

如果没有这些指标,线程池大小基本只能靠猜。


5. 小心同步日志

一个很容易忽略的问题:
如果你在高频任务路径上疯狂 console.log,性能会被拖垮。

建议:

  • 只记录关键事件
  • 错误日志结构化输出
  • 高频路径打采样日志
  • 不要在每个任务开始/结束都打大量日志

6. 真正生产环境可考虑外部任务队列

本文用的是内存队列,优点是简单,适合:

  • 单机服务
  • 本地工具
  • 中小规模任务处理
  • 学习与原型验证

但它也有边界:

  • 进程重启后任务丢失
  • 无法跨实例共享
  • 不适合长生命周期任务

如果你的场景变成:

  • 多实例部署
  • 任务不能丢
  • 需要重试、延迟执行、优先级
  • 需要后台异步处理

那就应该考虑:

  • Redis + BullMQ / Bee-Queue
  • RabbitMQ
  • Kafka
  • 数据库任务表 + 调度器

也就是说:

Worker Threads 解决“算在哪里”,队列系统解决“任务怎么可靠流转”。

这两个层次不要混为一谈。


一个更稳妥的改进方向

如果你准备把上面的 demo 往生产环境推进,我建议优先补这几项:

改进项 1:队列长度限制

runTask(payload) {
  const MAX_QUEUE_SIZE = 100;

  if (this.queue.length >= MAX_QUEUE_SIZE) {
    return Promise.reject(new Error('Queue is full'));
  }

  return new Promise((resolve, reject) => {
    const taskId = ++this.taskId;
    this.queue.push({ taskId, payload });
    this.callbacks.set(taskId, { resolve, reject });
    this.dispatch();
  });
}

改进项 2:任务超时

async function runTaskWithTimeout(pool, payload, timeoutMs = 5000) {
  return Promise.race([
    pool.runTask(payload),
    new Promise((_, reject) =>
      setTimeout(() => reject(new Error('Task timeout')), timeoutMs)
    ),
  ]);
}

改进项 3:优先级队列

当你的任务有轻重缓急时,可以拆成:

  • high queue
  • normal queue
  • low queue

调度器优先取高优任务,而不是所有任务先进先出。


什么时候不该用这套方案

不是所有 CPU 问题都适合 Worker Threads。

以下场景要谨慎:

1. 任务特别重,单机根本算不动

比如单次计算就要吃满 CPU 几十秒甚至几分钟。
这种时候更适合:

  • 独立计算服务
  • 后台任务系统
  • 分布式处理平台

2. 任务需要非常高的可靠性

如果任务不能丢、必须重试、要可审计,内存队列明显不够。
应该引入持久化消息系统。

3. 任务依赖原生库或外部命令更合适

有些工作例如视频转码,本质上更适合交给:

  • ffmpeg
  • Python/Rust/C++ 服务
  • 独立进程

Node.js 负责调度和编排,未必要自己算。


总结

回到一开始的问题:
Node.js 怎么处理 CPU 密集型任务,且不把主线程拖死?

一个实用答案就是:

  • Worker Threads 承担计算
  • 任务队列 做缓冲与调度
  • 固定大小线程池 控制并发
  • 校验、超时、限流、监控 保证可用性

你可以把这篇文章的示例记成一句话:

主线程做调度,Worker 做计算,队列做秩序。

如果你现在正在做中小规模服务,我建议这样落地:

  1. 先把最重的同步计算挪到 Worker
  2. 池大小从 CPU 核数 - 1 或更保守值开始
  3. 立刻加上队列长度限制
  4. 给任务加超时
  5. 压测后再调优,不要凭感觉定参数

最后给一个边界判断:

  • 单机、短任务、能接受进程内队列丢失:本文方案很合适
  • 多实例、任务必须可靠、需要重试与持久化:请把外部消息队列也纳入设计

如果你先把这版 demo 跑起来,再用压测工具打一轮,很快就能直观理解 Worker Threads 的价值。这个过程比只看概念要有效得多。


分享到:

上一篇
《Web逆向实战:从抓包定位到参数还原,系统破解前端加密接口的中级方法论》
下一篇
《Docker Compose 到 Kubernetes:中级团队的容器化应用迁移实战与避坑指南》