RabbitMQ pub/sub 实现不工作
RabbitMQ pub/sub implementation not working
我已将 RabbitMQ pub/sub tutorial 转换为以下虚拟测试。不知何故,它没有按预期工作。
amqpURL 是一个有效的 AMQP 服务(即 RabbitMQ)URL。我已经用队列(queue)示例对它进行过测试,可以正常工作。
但改用 "exchange" 之后,不知为何就失败了。
我希望 TestDummy 记录“[x] Hello World”。不知何故它没有发生。只有发送端工作正常。
我做错了什么?
import (
"fmt"
"log"
"testing"
"github.com/streadway/amqp"
)
// TestDummy starts the consumer, publishes a single message and then waits
// for the consumer to signal that a delivery was logged.
func TestDummy(t *testing.T) {
	// Subscribe before publishing; the returned channel is signalled once per
	// delivery that the consumer goroutine logs.
	received := exchangeReceive()

	const payload = "Hello World"
	exchangeSend(payload)

	// Blocks until a delivery is signalled; there is no timeout.
	<-received
}
// exchangeSend declares the durable fanout exchange "logs" and publishes msg
// to it as a text/plain message. The broker address is read from amqpURL,
// which is defined outside this snippet. Any failure terminates the process.
func exchangeSend(msg string) {
	// abortOn logs "<what>: <err>" and exits when err is non-nil.
	abortOn := func(err error, what string) {
		if err == nil {
			return
		}
		log.Fatalf("%s: %s", what, err)
		// Not reached: log.Fatalf exits the process after logging.
		panic(fmt.Sprintf("%s: %s", what, err))
	}

	log.Printf("exchangeSend: connect %s", amqpURL)

	conn, err := amqp.Dial(amqpURL)
	abortOn(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	abortOn(err, "Failed to open a channel")
	defer ch.Close()

	const exchange = "logs"

	err = ch.ExchangeDeclare(
		exchange, // name
		"fanout", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	abortOn(err, "Failed to declare an exchange")

	payload := []byte(msg)
	message := amqp.Publishing{
		ContentType: "text/plain",
		Body:        payload,
	}
	err = ch.Publish(
		exchange, // exchange
		"",       // routing key
		false,    // mandatory
		false,    // immediate
		message,
	)
	abortOn(err, "Failed to publish a message")

	log.Printf(" [x] Sent %s", payload)
}
// exchangeReceive declares the durable fanout exchange "logs", binds a fresh
// exclusive queue to it and starts a goroutine that logs every delivery and
// then sends true on the returned channel. The broker address is read from
// amqpURL, which is defined outside this snippet. Any setup failure
// terminates the process.
func exchangeReceive() <-chan bool {
	// abortOn logs "<what>: <err>" and exits when err is non-nil.
	abortOn := func(err error, what string) {
		if err == nil {
			return
		}
		log.Fatalf("%s: %s", what, err)
		// Not reached: log.Fatalf exits the process after logging.
		panic(fmt.Sprintf("%s: %s", what, err))
	}

	log.Printf("exchangeReceive: connect %s", amqpURL)

	// NOTE: both deferred Close calls below run as soon as exchangeReceive
	// returns, i.e. while the consumer goroutine started further down is
	// still expected to be receiving on this connection.
	conn, err := amqp.Dial(amqpURL)
	abortOn(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	abortOn(err, "Failed to open a channel")
	defer ch.Close()

	const exchange = "logs"

	err = ch.ExchangeDeclare(
		exchange, // name
		"fanout", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	abortOn(err, "Failed to declare an exchange")

	queue, err := ch.QueueDeclare(
		"",    // name
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		nil,   // arguments
	)
	abortOn(err, "Failed to declare a queue")

	err = ch.QueueBind(
		queue.Name, // queue name
		"",         // routing key
		exchange,   // exchange
		false,      // no-wait
		nil,        // arguments
	)
	abortOn(err, "Failed to bind a queue")

	deliveries, err := ch.Consume(
		queue.Name, // queue
		"",         // consumer
		true,       // auto-ack
		false,      // exclusive
		false,      // no-local
		false,      // no-wait
		nil,        // args
	)
	abortOn(err, "Failed to register a consumer")

	// One true is sent per delivery; the channel is unbuffered, so the
	// goroutine waits for the caller to receive before handling the next one.
	signal := make(chan bool)
	go func() {
		for delivery := range deliveries {
			log.Printf(" [x] %s", delivery.Body)
			signal <- true
		}
	}()

	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
	return signal
}
这是我犯的一个低级错误。当 exchangeReceive 返回时,其中的 defer 语句会被触发,从而关闭连接和通道(channel)。这就是我改写后的版本无法工作的原因。
我以这种方式更改了我的代码并且它起作用了:
import (
"fmt"
"os"
"testing"
"time"
"github.com/streadway/amqp"
)
// TestDummy is an integration test for fanout delivery: two independent
// consumers subscribe to the "logs" exchange, one message is published, and
// each consumer must receive its own copy.
//
// The broker URL is read from the CLOUDAMQP_URL environment variable; the
// test is skipped when it is unset.
func TestDummy(t *testing.T) {
	amqpURL := os.Getenv("CLOUDAMQP_URL")
	if amqpURL == "" {
		t.Skip("CLOUDAMQP_URL is not set; skipping RabbitMQ integration test")
	}
	// NOTE(review): an AMQP URL may embed credentials, which this line writes
	// to the test log — confirm that is acceptable where the test runs.
	t.Logf(" [*] amqpURL: %s", amqpURL)

	results1 := exchangeReceive(t, "consumer 1", amqpURL)
	results2 := exchangeReceive(t, "consumer 2", amqpURL)

	// Short grace period before publishing, kept from the original test.
	time.Sleep(50 * time.Millisecond)
	exchangeSend(t, amqpURL, "Hello World")

	// Bound every wait so that a lost delivery fails the test instead of
	// blocking it until the global "go test" timeout fires.
	const deliveryTimeout = 5 * time.Second
	expect := func(consumer string, results <-chan string) {
		t.Helper()
		select {
		case have := <-results:
			if want := "Hello World"; want != have {
				t.Errorf("expected %#v, got %#v", want, have)
			}
		case <-time.After(deliveryTimeout):
			t.Errorf("%s: no message received within %s", consumer, deliveryTimeout)
		}
	}
	expect("consumer 1", results1)
	expect("consumer 2", results2)
}
func exchangeReceive(t *testing.T, name, amqpURL string) <-chan string {
out := make(chan string)
failOnError := func(err error, msg string) {
if err != nil {
t.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
conn, err := amqp.Dial(amqpURL)
failOnError(err, "Failed to connect to RabbitMQ")
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when usused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
"logs", // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
go func() {
for d := range msgs {
t.Logf(" [x] %s received: %s", name, d.Body)
out <- string(d.Body)
}
}()
t.Logf(" [*] %s ready to receive", name)
return out
}
// exchangeSend connects to the broker at amqpURL, declares the durable fanout
// exchange "logs" and publishes msg to it as a text/plain message. The
// connection and channel are closed when the function returns. Any failure
// aborts the test via t.Fatalf.
func exchangeSend(t *testing.T, amqpURL, msg string) {
	// abortOn fails the test with "<what>: <err>" when err is non-nil.
	abortOn := func(err error, what string) {
		if err == nil {
			return
		}
		t.Fatalf("%s: %s", what, err)
		// Not reached: t.Fatalf stops the calling goroutine.
		panic(fmt.Sprintf("%s: %s", what, err))
	}

	conn, err := amqp.Dial(amqpURL)
	abortOn(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	abortOn(err, "Failed to open a channel")
	defer ch.Close()

	const exchange = "logs"

	err = ch.ExchangeDeclare(
		exchange, // name
		"fanout", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	abortOn(err, "Failed to declare an exchange")

	payload := []byte(msg)
	message := amqp.Publishing{
		ContentType: "text/plain",
		Body:        payload,
	}
	err = ch.Publish(
		exchange, // exchange
		"",       // routing key
		false,    // mandatory
		false,    // immediate
		message,
	)
	abortOn(err, "Failed to publish a message")

	t.Logf(" [x] Sent %s", payload)
}
我已将 RabbitMQ pub/sub tutorial 转换为以下虚拟测试。不知何故,它没有按预期工作。
amqpURL 是一个有效的 AMQP 服务(即 RabbitMQ)URL。我已经用队列(queue)示例对它进行过测试,可以正常工作。
但改用 "exchange" 之后,不知为何就失败了。
我希望 TestDummy 记录“[x] Hello World”。不知何故它没有发生。只有发送端工作正常。
我做错了什么?
import (
"fmt"
"log"
"testing"
"github.com/streadway/amqp"
)
// TestDummy starts the consumer, publishes a single message and then waits
// for the consumer to signal that a delivery was logged.
func TestDummy(t *testing.T) {
	// Subscribe before publishing; the returned channel is signalled once per
	// delivery that the consumer goroutine logs.
	received := exchangeReceive()

	const payload = "Hello World"
	exchangeSend(payload)

	// Blocks until a delivery is signalled; there is no timeout.
	<-received
}
// exchangeSend declares the durable fanout exchange "logs" and publishes msg
// to it as a text/plain message. The broker address is read from amqpURL,
// which is defined outside this snippet. Any failure terminates the process.
func exchangeSend(msg string) {
	// abortOn logs "<what>: <err>" and exits when err is non-nil.
	abortOn := func(err error, what string) {
		if err == nil {
			return
		}
		log.Fatalf("%s: %s", what, err)
		// Not reached: log.Fatalf exits the process after logging.
		panic(fmt.Sprintf("%s: %s", what, err))
	}

	log.Printf("exchangeSend: connect %s", amqpURL)

	conn, err := amqp.Dial(amqpURL)
	abortOn(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	abortOn(err, "Failed to open a channel")
	defer ch.Close()

	const exchange = "logs"

	err = ch.ExchangeDeclare(
		exchange, // name
		"fanout", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	abortOn(err, "Failed to declare an exchange")

	payload := []byte(msg)
	message := amqp.Publishing{
		ContentType: "text/plain",
		Body:        payload,
	}
	err = ch.Publish(
		exchange, // exchange
		"",       // routing key
		false,    // mandatory
		false,    // immediate
		message,
	)
	abortOn(err, "Failed to publish a message")

	log.Printf(" [x] Sent %s", payload)
}
// exchangeReceive declares the durable fanout exchange "logs", binds a fresh
// exclusive queue to it and starts a goroutine that logs every delivery and
// then sends true on the returned channel. The broker address is read from
// amqpURL, which is defined outside this snippet. Any setup failure
// terminates the process.
func exchangeReceive() <-chan bool {
	// abortOn logs "<what>: <err>" and exits when err is non-nil.
	abortOn := func(err error, what string) {
		if err == nil {
			return
		}
		log.Fatalf("%s: %s", what, err)
		// Not reached: log.Fatalf exits the process after logging.
		panic(fmt.Sprintf("%s: %s", what, err))
	}

	log.Printf("exchangeReceive: connect %s", amqpURL)

	// NOTE: both deferred Close calls below run as soon as exchangeReceive
	// returns, i.e. while the consumer goroutine started further down is
	// still expected to be receiving on this connection.
	conn, err := amqp.Dial(amqpURL)
	abortOn(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	abortOn(err, "Failed to open a channel")
	defer ch.Close()

	const exchange = "logs"

	err = ch.ExchangeDeclare(
		exchange, // name
		"fanout", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	abortOn(err, "Failed to declare an exchange")

	queue, err := ch.QueueDeclare(
		"",    // name
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		nil,   // arguments
	)
	abortOn(err, "Failed to declare a queue")

	err = ch.QueueBind(
		queue.Name, // queue name
		"",         // routing key
		exchange,   // exchange
		false,      // no-wait
		nil,        // arguments
	)
	abortOn(err, "Failed to bind a queue")

	deliveries, err := ch.Consume(
		queue.Name, // queue
		"",         // consumer
		true,       // auto-ack
		false,      // exclusive
		false,      // no-local
		false,      // no-wait
		nil,        // args
	)
	abortOn(err, "Failed to register a consumer")

	// One true is sent per delivery; the channel is unbuffered, so the
	// goroutine waits for the caller to receive before handling the next one.
	signal := make(chan bool)
	go func() {
		for delivery := range deliveries {
			log.Printf(" [x] %s", delivery.Body)
			signal <- true
		}
	}()

	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
	return signal
}
这是我犯的一个低级错误。当 exchangeReceive 返回时,其中的 defer 语句会被触发,从而关闭连接和通道(channel)。这就是我改写后的版本无法工作的原因。
我以这种方式更改了我的代码并且它起作用了:
import (
"fmt"
"os"
"testing"
"time"
"github.com/streadway/amqp"
)
// TestDummy is an integration test for fanout delivery: two independent
// consumers subscribe to the "logs" exchange, one message is published, and
// each consumer must receive its own copy.
//
// The broker URL is read from the CLOUDAMQP_URL environment variable; the
// test is skipped when it is unset.
func TestDummy(t *testing.T) {
	amqpURL := os.Getenv("CLOUDAMQP_URL")
	if amqpURL == "" {
		t.Skip("CLOUDAMQP_URL is not set; skipping RabbitMQ integration test")
	}
	// NOTE(review): an AMQP URL may embed credentials, which this line writes
	// to the test log — confirm that is acceptable where the test runs.
	t.Logf(" [*] amqpURL: %s", amqpURL)

	results1 := exchangeReceive(t, "consumer 1", amqpURL)
	results2 := exchangeReceive(t, "consumer 2", amqpURL)

	// Short grace period before publishing, kept from the original test.
	time.Sleep(50 * time.Millisecond)
	exchangeSend(t, amqpURL, "Hello World")

	// Bound every wait so that a lost delivery fails the test instead of
	// blocking it until the global "go test" timeout fires.
	const deliveryTimeout = 5 * time.Second
	expect := func(consumer string, results <-chan string) {
		t.Helper()
		select {
		case have := <-results:
			if want := "Hello World"; want != have {
				t.Errorf("expected %#v, got %#v", want, have)
			}
		case <-time.After(deliveryTimeout):
			t.Errorf("%s: no message received within %s", consumer, deliveryTimeout)
		}
	}
	expect("consumer 1", results1)
	expect("consumer 2", results2)
}
func exchangeReceive(t *testing.T, name, amqpURL string) <-chan string {
out := make(chan string)
failOnError := func(err error, msg string) {
if err != nil {
t.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
conn, err := amqp.Dial(amqpURL)
failOnError(err, "Failed to connect to RabbitMQ")
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when usused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
"logs", // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
go func() {
for d := range msgs {
t.Logf(" [x] %s received: %s", name, d.Body)
out <- string(d.Body)
}
}()
t.Logf(" [*] %s ready to receive", name)
return out
}
// exchangeSend connects to the broker at amqpURL, declares the durable fanout
// exchange "logs" and publishes msg to it as a text/plain message. The
// connection and channel are closed when the function returns. Any failure
// aborts the test via t.Fatalf.
func exchangeSend(t *testing.T, amqpURL, msg string) {
	// abortOn fails the test with "<what>: <err>" when err is non-nil.
	abortOn := func(err error, what string) {
		if err == nil {
			return
		}
		t.Fatalf("%s: %s", what, err)
		// Not reached: t.Fatalf stops the calling goroutine.
		panic(fmt.Sprintf("%s: %s", what, err))
	}

	conn, err := amqp.Dial(amqpURL)
	abortOn(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	abortOn(err, "Failed to open a channel")
	defer ch.Close()

	const exchange = "logs"

	err = ch.ExchangeDeclare(
		exchange, // name
		"fanout", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	abortOn(err, "Failed to declare an exchange")

	payload := []byte(msg)
	message := amqp.Publishing{
		ContentType: "text/plain",
		Body:        payload,
	}
	err = ch.Publish(
		exchange, // exchange
		"",       // routing key
		false,    // mandatory
		false,    // immediate
		message,
	)
	abortOn(err, "Failed to publish a message")

	t.Logf(" [x] Sent %s", payload)
}