没有消费者,但是当我发布一些东西时,为什么会立即返回一个 ack? (golang/rabbitmq)
No consumers, yet when I publish something an ack is immediately returned why? (golang/rabbitmq)
以下是我使用的发布商代码。在消息从队列中取出之前它需要确认。它应该打印出它从消费者那里收到 Ack 或 nack(在代码的底部)。如果你只是 运行 下面的发布者代码本身(没有同时 运行 消费者代码),它应该只是挂起,等待 ack 或 nack 但它没有,它打印出来一个 ack,就好像消费者已经发送了它一样。所以如果我有任何代码错误,我会感到困惑。
基本代码我使用了rabbitmq官方教程中的代码:https://www.rabbitmq.com/tutorials/tutorial-one-go.html
对于 ack/nack 部分代码,我遵循了这个:https://agocs.org/blog/2014/08/19/rabbitmq-best-practices-in-go/
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
ch.Confirm(false)
ack, nack := ch.NotifyConfirm(make(chan uint64, 1), make(chan uint64, 1))
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
body := "hello"
err = ch.Publish(
"", // exchange
q.Name, // routing key
true, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
log.Printf(" [x] Sent %s", body)
failOnError(err, "Failed to publish a message")
select {
case tag := <-ack:
log.Println("Acked ", tag)
case tag := <-nack:
log.Println("Nack alert! ", tag)
}
}
您将发布者的确认 acks 和 nacks 与消费者的 side acks 和 nacks 混淆了。
文档指出:
For unroutable messages, the broker will issue a confirm once the
exchange verifies a message won't route to any queue (returns an empty
list of queues). If the message is also published as mandatory, the
basic.return is sent to the client before basic.ack. The same is true
for negative acknowledgements (basic.nack).
For routable messages, the basic.ack is sent when a message has been
accepted by all the queues. For persistent messages routed to durable
queues, this means persisting to disk. For mirrored queues, this means
that all mirrors have accepted the message.
所以你看到了正确的行为。 RabbitMQ 正在确认消息已到达队列。
以下是我使用的发布商代码。在消息从队列中取出之前它需要确认。它应该打印出它从消费者那里收到 Ack 或 nack(在代码的底部)。如果你只是 运行 下面的发布者代码本身(没有同时 运行 消费者代码),它应该只是挂起,等待 ack 或 nack 但它没有,它打印出来一个 ack,就好像消费者已经发送了它一样。所以如果我有任何代码错误,我会感到困惑。
基本代码我使用了rabbitmq官方教程中的代码:https://www.rabbitmq.com/tutorials/tutorial-one-go.html
对于 ack/nack 部分代码,我遵循了这个:https://agocs.org/blog/2014/08/19/rabbitmq-best-practices-in-go/
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
ch.Confirm(false)
ack, nack := ch.NotifyConfirm(make(chan uint64, 1), make(chan uint64, 1))
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
body := "hello"
err = ch.Publish(
"", // exchange
q.Name, // routing key
true, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
log.Printf(" [x] Sent %s", body)
failOnError(err, "Failed to publish a message")
select {
case tag := <-ack:
log.Println("Acked ", tag)
case tag := <-nack:
log.Println("Nack alert! ", tag)
}
}
您将发布者的确认 acks 和 nacks 与消费者的 side acks 和 nacks 混淆了。
文档指出:
For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack. The same is true for negative acknowledgements (basic.nack).
For routable messages, the basic.ack is sent when a message has been accepted by all the queues. For persistent messages routed to durable queues, this means persisting to disk. For mirrored queues, this means that all mirrors have accepted the message.
所以你看到了正确的行为。 RabbitMQ 正在确认消息已到达队列。