在工作线程中处理异步消息
Handle async messages in worker threads
我们正在使用 NodeJS 实验人员来完成一些 CPU 密集型任务。这些任务是通过 parentPort
消息传递的消息启动的。在线程运行期间,它们需要将数据持久化到数据库,这是一个由 promises 支持的异步操作。
我们看到的是 parentPort
消息在我们执行异步操作时不断发送到处理函数。
我们正在做的代码示例:
const { parentPort, Worker, isMainThread } = require('worker_threads');
if (isMainThread) {
const worker = new Worker(__filename);
const i = [1, 2, 3, 4, 5, 6, 7, 8, 9];
for (const x of i) {
worker.postMessage({ idx: x });
}
} else {
parentPort.on('message', async (value) => {
await testAsync(value);
});
}
async function testAsync(value) {
return new Promise((resolve) => {
console.log(`Starting wait for ${value.idx}`);
setTimeout(() => {
console.log(`Complete resolve for ${value.idx}`);
resolve();
if(value.idx == 9) {
setTimeout(() => process.exit(0), 2000);
}
}, 500);
});
}
在上面的示例中,我们看到 Starting wait for ...
在任何 Complete resolve ...
消息出现之前打印。使用 async-await
我们期望事件处理程序在处理新事件之前等待已解决的承诺。在真实的例子中,数据库连接可能会失败,从而抛出异常,所以我们要确保在接受新消息之前已经完全处理了当前消息。
我们是不是做错了什么?
如果不是,是否可以实现按顺序处理事件的预期目标?
看来你想将消息入队,并且一次只处理一件事。
parentPort.on('message', () => {}
是一个事件监听器,当事件被触发时,不会等到之前回调里面的异步操作完成。
所以,如果你触发'message'
一千次,testAsync
将被执行一千次,无需等待。
需要在worker中实现一个队列,并限制并发。 NPM中有多个promise队列包。
我将在这个例子中使用p-queue
。
const PQueue = require('p-queue');
const { parentPort, Worker, isMainThread } = require('worker_threads');
if (isMainThread) {
const worker = new Worker(__filename);
const i = [1, 2, 3, 4, 5, 6, 7, 8, 9];
for (const x of i) {
worker.postMessage({ idx: x });
}
} else {
const queue = new PQueue({ concurrency: 1 }); // set concurrency
parentPort.on('message', value => {
queue.add(() => testAsync(value));
});
}
async function testAsync(value) {
return new Promise((resolve) => {
console.log(`Starting wait for ${value.idx}`);
setTimeout(() => {
console.log(`Complete resolve for ${value.idx}`);
resolve();
if(value.idx == 9) {
setTimeout(() => process.exit(0), 2000);
}
}, 500);
});
}
现在输出将是:
starting wait for 1
complete resolve for 1
starting wait for 2
complete resolve for 2
starting wait for N
complete resolve for N
我们正在使用 NodeJS 实验人员来完成一些 CPU 密集型任务。这些任务是通过 parentPort
消息传递的消息启动的。在线程运行期间,它们需要将数据持久化到数据库,这是一个由 promises 支持的异步操作。
我们看到的是 parentPort
消息在我们执行异步操作时不断发送到处理函数。
我们正在做的代码示例:
const { parentPort, Worker, isMainThread } = require('worker_threads');
if (isMainThread) {
const worker = new Worker(__filename);
const i = [1, 2, 3, 4, 5, 6, 7, 8, 9];
for (const x of i) {
worker.postMessage({ idx: x });
}
} else {
parentPort.on('message', async (value) => {
await testAsync(value);
});
}
async function testAsync(value) {
return new Promise((resolve) => {
console.log(`Starting wait for ${value.idx}`);
setTimeout(() => {
console.log(`Complete resolve for ${value.idx}`);
resolve();
if(value.idx == 9) {
setTimeout(() => process.exit(0), 2000);
}
}, 500);
});
}
在上面的示例中,我们看到 Starting wait for ...
在任何 Complete resolve ...
消息出现之前打印。使用 async-await
我们期望事件处理程序在处理新事件之前等待已解决的承诺。在真实的例子中,数据库连接可能会失败,从而抛出异常,所以我们要确保在接受新消息之前已经完全处理了当前消息。
我们是不是做错了什么?
如果不是,是否可以实现按顺序处理事件的预期目标?
看来你想将消息入队,并且一次只处理一件事。
parentPort.on('message', () => {}
是一个事件监听器,当事件被触发时,不会等到之前回调里面的异步操作完成。
所以,如果你触发'message'
一千次,testAsync
将被执行一千次,无需等待。
需要在worker中实现一个队列,并限制并发。 NPM中有多个promise队列包。
我将在这个例子中使用p-queue
。
const PQueue = require('p-queue');
const { parentPort, Worker, isMainThread } = require('worker_threads');
if (isMainThread) {
const worker = new Worker(__filename);
const i = [1, 2, 3, 4, 5, 6, 7, 8, 9];
for (const x of i) {
worker.postMessage({ idx: x });
}
} else {
const queue = new PQueue({ concurrency: 1 }); // set concurrency
parentPort.on('message', value => {
queue.add(() => testAsync(value));
});
}
async function testAsync(value) {
return new Promise((resolve) => {
console.log(`Starting wait for ${value.idx}`);
setTimeout(() => {
console.log(`Complete resolve for ${value.idx}`);
resolve();
if(value.idx == 9) {
setTimeout(() => process.exit(0), 2000);
}
}, 500);
});
}
现在输出将是:
starting wait for 1
complete resolve for 1
starting wait for 2
complete resolve for 2
starting wait for N
complete resolve for N