如何解决 amqplib Channel#consume 奇怪的签名?
How to work around amqplib's Channel#consume odd signature?
我正在编写一个使用 amqplib 的 Channel#consume 方法的 worker。我希望这个工作人员等待作业并在它们出现在队列中时立即处理它们。
我写了自己的模块来抽象出 ampqlib,这里是获取连接、设置队列和消费消息的相关函数:
const getConnection = function(host) {
return amqp.connect(host);
};
const createChannel = function(conn) {
connection = conn;
return conn.createConfirmChannel();
};
const assertQueue = function(channel, queue) {
return channel.assertQueue(queue);
};
const consume = Promise.method(function(channel, queue, processor) {
processor = processor || function(msg) { if (msg) Promise.resolve(msg); };
return channel.consume(queue, processor)
});
const setupQueue = Promise.method(function setupQueue(queue) {
const amqp_host = 'amqp://' + ((host || process.env.AMQP_HOST) || 'localhost');
return getConnection(amqp_host)
.then(conn => createChannel(conn)) // -> returns a `Channel` object
.tap(channel => assertQueue(channel, queue));
});
consumeJob: Promise.method(function consumeJob(queue) {
return setupQueue(queue)
.then(channel => consume(channel, queue))
});
我的问题是 Channel#consume 的奇怪签名。来自 http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume:
#consume(queue, function(msg) {...}, [options, [function(err, ok) {...}]])
回调不是魔法发生的地方,消息的处理实际上应该在第二个参数中进行,这会破坏承诺的流程。
这就是我计划使用它的方式:
return queueManager.consumeJob(queue)
.then(msg => {
// do some processing
});
但是没用。如果队列中没有消息,则承诺被拒绝,然后如果队列中有消息被丢弃,则什么也不会发生。如果有消息,则只处理一条消息,然后 worker 停止,因为它从 Channel#consume 调用中退出了 "processor" 函数。
我该怎么办?我想保留 queueManager 抽象,这样我的代码更容易推理,但我不知道该怎么做...有任何指示吗?
正如@idbehold 所说,Promises 只能解决一次。如果您想在消息传入时对其进行处理,那么除了使用该函数之外别无他法。 Channel#get 只会检查队列一次然后return;它不适用于需要工人的场景。
只是一个选项。您可以将您的应用程序呈现为一些消息(或事件)的流。这个 http://highlandjs.org/#examples
有一个库
您的代码应该如下所示(这不是一个完整的示例,但我希望它能说明这个想法):
let messageStream = _((push, next) => {
consume(queue, (msg) => {
push(null, msg)
})
)
// now you can operate with your stream in functional style
message.map((msg) => msg + 'some value').each((msg) => // do something with msg)
这种方法为您提供了大量用于同步和转换的原语
http://highlandjs.org/#examples
我正在编写一个使用 amqplib 的 Channel#consume 方法的 worker。我希望这个工作人员等待作业并在它们出现在队列中时立即处理它们。
我写了自己的模块来抽象出 ampqlib,这里是获取连接、设置队列和消费消息的相关函数:
const getConnection = function(host) {
return amqp.connect(host);
};
const createChannel = function(conn) {
connection = conn;
return conn.createConfirmChannel();
};
const assertQueue = function(channel, queue) {
return channel.assertQueue(queue);
};
const consume = Promise.method(function(channel, queue, processor) {
processor = processor || function(msg) { if (msg) Promise.resolve(msg); };
return channel.consume(queue, processor)
});
const setupQueue = Promise.method(function setupQueue(queue) {
const amqp_host = 'amqp://' + ((host || process.env.AMQP_HOST) || 'localhost');
return getConnection(amqp_host)
.then(conn => createChannel(conn)) // -> returns a `Channel` object
.tap(channel => assertQueue(channel, queue));
});
consumeJob: Promise.method(function consumeJob(queue) {
return setupQueue(queue)
.then(channel => consume(channel, queue))
});
我的问题是 Channel#consume 的奇怪签名。来自 http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume:
#consume(queue, function(msg) {...}, [options, [function(err, ok) {...}]])
回调不是魔法发生的地方,消息的处理实际上应该在第二个参数中进行,这会破坏承诺的流程。
这就是我计划使用它的方式:
return queueManager.consumeJob(queue)
.then(msg => {
// do some processing
});
但是没用。如果队列中没有消息,则承诺被拒绝,然后如果队列中有消息被丢弃,则什么也不会发生。如果有消息,则只处理一条消息,然后 worker 停止,因为它从 Channel#consume 调用中退出了 "processor" 函数。
我该怎么办?我想保留 queueManager 抽象,这样我的代码更容易推理,但我不知道该怎么做...有任何指示吗?
正如@idbehold 所说,Promises 只能解决一次。如果您想在消息传入时对其进行处理,那么除了使用该函数之外别无他法。 Channel#get 只会检查队列一次然后return;它不适用于需要工人的场景。
只是一个选项。您可以将您的应用程序呈现为一些消息(或事件)的流。这个 http://highlandjs.org/#examples
有一个库您的代码应该如下所示(这不是一个完整的示例,但我希望它能说明这个想法):
let messageStream = _((push, next) => {
consume(queue, (msg) => {
push(null, msg)
})
)
// now you can operate with your stream in functional style
message.map((msg) => msg + 'some value').each((msg) => // do something with msg)
这种方法为您提供了大量用于同步和转换的原语
http://highlandjs.org/#examples