使用 Node.js 使用 RabbitMQ 消息时,我可以等待进程完成吗?
Can I wait for a process to complete when consuming RabbitMQ messages with Node.js?
我对 Node.js 和 ES6 还很陌生,这让我有点困惑。我正在尝试离开进程 运行ning,使用来自 RabbitMQ 队列的消息。在抓取下一条消息之前,它需要能够处理消息(大约需要 30-60 秒)。目前,我拥有的代码会获取所有可能的消息,然后尝试分叉进程。当队列中有 3-5 条消息时,这很好,但是对于 20、50 或 100 条消息,这会导致服务器 运行 内存不足。
我试过使 .consume()
回调函数异步并将 await
添加到消息处理函数。我尝试在 processMessage
周围的 .consume()
回调中包装一个 await new Promise
。我尝试将 await
添加到调用 channel.consume
的行。没有任何改变行为。
#!/usr/bin/env node
const amqp = require('amqplib');
const consumeFromQueue = async (queue, isNoAck = false, durable = false, prefetch = null) => {
const conn_str = "amqp://" + process.env.RABBITMQ_USERNAME + ":" + process.env.RABBITMQ_PASSWORD + "@" + process.env.RABBITMQ_HOST + "/development?heartbeat=60"
const cluster = await amqp.connect(conn_str);
const channel = await cluster.createChannel();
await channel.assertQueue(queue, { durable: durable, autoDelete: true });
if (prefetch) {
channel.prefetch(prefetch);
}
console.log(` [x] Waiting for messages in ${queue}. To exit press CTRL+C`)
try {
channel.consume(queue, message => {
if (message !== null) {
console.log(' [x] Received', message.content.toString());
processMessage(message.content.toString());
channel.ack(message);
return null;
} else {
console.log(error, 'Queue is empty!')
channel.reject(message);
}
}, {noAck: isNoAck});
} catch (error) {
console.log(error, 'Failed to consume messages from Queue!')
cluster.close();
}
}
exports.consumeFromQueue = consumeFromQueue;
作为旁注,如果我创建一个字符串数组并循环遍历字符串,当我将 await 添加到 processMessage
行时,它会等待执行进程(30-60 秒),然后再处理下一个字符串。
(async () => {
for (let i=0; i<urls.length; i++) {
await processMessage(urls[i]);
}
})();
所以我基本上需要这样的功能,但需要在 RabbitMQ 中监听队列。
如果您想限制消费者在任何给定时间处理的消息数量,请使用 channel.prefetch():
The count given is the maximum number of messages sent over the
channel that can be awaiting acknowledgement; once there are count
messages outstanding, the server will not send more messages on this
channel until one or more have been acknowledged.
也就是说,如果您只想在继续处理下一条消息之前一次处理一条消息,请设置 channel.prefetch(1)
继续下一步,设置 channel.prefetch(1)
channel.prefetch(1)
我对 Node.js 和 ES6 还很陌生,这让我有点困惑。我正在尝试离开进程 运行ning,使用来自 RabbitMQ 队列的消息。在抓取下一条消息之前,它需要能够处理消息(大约需要 30-60 秒)。目前,我拥有的代码会获取所有可能的消息,然后尝试分叉进程。当队列中有 3-5 条消息时,这很好,但是对于 20、50 或 100 条消息,这会导致服务器 运行 内存不足。
我试过使 .consume()
回调函数异步并将 await
添加到消息处理函数。我尝试在 processMessage
周围的 .consume()
回调中包装一个 await new Promise
。我尝试将 await
添加到调用 channel.consume
的行。没有任何改变行为。
#!/usr/bin/env node
const amqp = require('amqplib');
const consumeFromQueue = async (queue, isNoAck = false, durable = false, prefetch = null) => {
const conn_str = "amqp://" + process.env.RABBITMQ_USERNAME + ":" + process.env.RABBITMQ_PASSWORD + "@" + process.env.RABBITMQ_HOST + "/development?heartbeat=60"
const cluster = await amqp.connect(conn_str);
const channel = await cluster.createChannel();
await channel.assertQueue(queue, { durable: durable, autoDelete: true });
if (prefetch) {
channel.prefetch(prefetch);
}
console.log(` [x] Waiting for messages in ${queue}. To exit press CTRL+C`)
try {
channel.consume(queue, message => {
if (message !== null) {
console.log(' [x] Received', message.content.toString());
processMessage(message.content.toString());
channel.ack(message);
return null;
} else {
console.log(error, 'Queue is empty!')
channel.reject(message);
}
}, {noAck: isNoAck});
} catch (error) {
console.log(error, 'Failed to consume messages from Queue!')
cluster.close();
}
}
exports.consumeFromQueue = consumeFromQueue;
作为旁注,如果我创建一个字符串数组并循环遍历字符串,当我将 await 添加到 processMessage
行时,它会等待执行进程(30-60 秒),然后再处理下一个字符串。
(async () => {
for (let i=0; i<urls.length; i++) {
await processMessage(urls[i]);
}
})();
所以我基本上需要这样的功能,但需要在 RabbitMQ 中监听队列。
如果您想限制消费者在任何给定时间处理的消息数量,请使用 channel.prefetch():
The count given is the maximum number of messages sent over the channel that can be awaiting acknowledgement; once there are count messages outstanding, the server will not send more messages on this channel until one or more have been acknowledged.
也就是说,如果您只想在继续处理下一条消息之前一次处理一条消息,请设置 channel.prefetch(1)
继续下一步,设置 channel.prefetch(1)
channel.prefetch(1)