使用发布者确认 RabbitMQ,在什么情况下发布者会收到关于 success/failure 的通知?

Using publisher confirms with RabbitMQ, in which cases publisher will be notified about success/failure?

引用《深入 RabbitMQ》一书:

A Basic.Ack request is sent to a publisher when a message that it has published has been directly consumed by consumer applications on all queues it was routed to or that the message was enqueued and persisted if requested.

Has been directly consumed混淆,是不是意味着消费者发送ack给broker publisher会被告知消费者处理消息成功?或者这意味着当消费者刚从队列中收到消息时,发布者会收到通知?

or that the message was enqueued and persisted if requested。这就像连词或出版商会在其中任何一种发生时得到通知吗? (在这种情况下,发布者会收到两次通知)

使用 node.jsamqplib 想检查实际发生了什么:

// consumer.js
amqp.connect(...)
.then(connection => connection.createChannel())
.then(() => { assert exchange here })
.then(() => { assert queue here })
.then(() => { bind queue and exchange here })
.then(() => {
  channel.consume(QUEUE, (message) => {
    console.log('Raw RabbitMQ message received', message)

    // Simulate some job to do
    setTimeout(() => {
      channel.ack(message, false)
    }, 5000})

  }, { noAck: false })
})

// publisher.js
amqp.connect(...)
.then(connection => connection.createConfirmChannel())
.then(() => { assert exchange here })
.then(() => {
  channel.publish(exchange, routingKey, new Buffer(...),{}, (err, ok) => {
    if (err) {
      console.log('Error from handling confirmation on publisher side', err)
    } else {
      console.log('From handling confirmation on publisher side', ok)
    }
  })
})

运行 示例,我可以看到以下日志:

From handling confirmation on publisher side undefined
Raw RabbitMQ message received
Time to ack the message

据我所知,至少通过这个日志,只有当消息入队时才会通知发布者?(所以让消费者 ack 发送消息将不以任何方式影响出版商)

进一步引用:

If a message cannot be routed, the broker will send a Basic.Nack RPC request indicating the failure. It is then up to the publisher to decide what to do with the message.

更改上面的示例,我只将消息的路由键更改为不应路由到任何地方的内容(没有匹配路由键的绑定),从日志中我可以看到 只有 关注。

From handling confirmation on publisher side undefined

现在我更困惑了,这里究竟通知了哪个发布者?我会理解它会收到一个错误,例如 Can't route anywhere,这将与上面的引用保持一致。但是正如您所看到的,err 没有定义并且作为附带问题,即使 amqplib 在他们的官方文档中使用 (err, ok),在任何一个案例中我都没有看到那些定义。所以这里的输出与上面的例子一样,上面的例子和不可路由的消息有什么不同。

那么我在做什么,发布者将在什么时候收到有关消息发生的确切情况的通知?任何一个可以使用 PublisherConfirms 的具体例子?从上面的日志中,我会得出结论,在您希望 100% 确定消息已入队的情况下使用它是很好的。

默认情况下,发布者对消费者一无所知。

PublisherConfirms用于检查消息是否到达代理,但不检查消息是否已入队。

您可以使用 mandatory 标志来确保邮件已被路由 看到这个 https://www.rabbitmq.com/reliability.html

To ensure messages are routed to a single known queue, the producer can just declare a destination queue and publish directly to it. If messages may be routed in more complex ways but the producer still needs to know if they reached at least one queue, it can set the mandatory flag on a basic.publish, ensuring that a basic.return (containing a reply code and some textual explanation) will be sent back to the client if no queues were appropriately bound.

经过反复搜索,我找到了这个 http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/

基本规则如下:

  1. 在 basic.return
  2. 之后立即确认 un-routable 强制消息或即时消息
  3. 临时消息在入队时即被确认
  4. 持久消息在持久化到磁盘或在每个队列上被消费时被确认。

If more than one of these conditions are met, only the first causes a confirm to be sent. Every published message will be confirmed sooner or later and no message will be confirmed more than once.

我不完全确定关于 ack/nack 问题的通知,但是查看 BunnyBus 节点库以获得更简单的 api 和 RabbitMQ 管理:)

https://github.com/xogroup/bunnybus

const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus({
    user: 'your-user',
    vhost: 'your-vhost', // cloudamqp defaults vhost to the username
    password: 'your-password',
    server: 'your.server.com'
});

const handler = {
    'test.event': (message, ack) => {

        // Do your work here.

        // acknowledge the message off of the bus.
        return ack();
    }
};

// Create exchange and queue if they do not already exist and then auto connect.
return bunnyBus.subscribe('test', handler)
    .then(() => {

        return bunnyBus.publish({event: 'test.event', body: 'here\'s the thing.'});
    })
    .catch(console.log);