如果没有收到响应,如何使 RabbitMQConsumer 超时
How to timeout RabbitMQConsumer if it didn`t receive response
我得到了使用来自 RabbitMQ 队列的消息的代码。如果它在一段时间内没有收到消息,它应该会失败。
msgs, err := ch.Consume(
c.ResponseQueue, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
...
loop:
for timeout := time.After(time.Second); ; {
select {
case <-timeout:
log.Printf("Failed to receive response for action %+v\n Payload: %+v\nError: %+v\n", action, body, err)
return errors.New("Failed to receive response for action")
default:
for d := range msgs {
if corrID == d.CorrelationId {
err = json.Unmarshal([]byte(uncompress(d.Body)), &v)
if err != nil {
return err
}
ch.Ack(d.DeliveryTag, false)
break loop
}
}
}
}
我从 RabbitMQ 手册中获取了使用代码并尝试了一些实现超时的建议。我知道如何在 Java 中做到这一点,但不能在 Golang 中重复它。
提前致谢。
更新:
已将 select 更改为:
c1 := make(chan error, 1)
go func() {
for d := range msgs {
if corrID == d.CorrelationId {
err = json.Unmarshal([]byte(uncompress(d.Body)), &v)
if err != nil {
c1 <- err
}
ch.Ack(d.DeliveryTag, false)
c1 <- nil
}
}
}()
select {
case <-time.After(defaultTimeout * time.Second):
log.Printf("Failed to receive response for action %+v\n Payload: %+v\nError: %+v\n", action, body, err)
return errors.New("Failed to receive response in time for action")
case err := <-c1:
failOnError(err, "Failed to process response")
}
return err
现在它按预期工作 - 如果它没有收到具有正确 corellationId 的消息,它将因超时而失败。谢谢大家的帮助。
您的循环有一个 select
,有 2 种情况:超时和 default
分支。进入循环后不会触发超时,因此执行 default
分支。
default
分支在 msgs
通道上包含一个 for range
,它一直从通道接收数据,直到它关闭(并且已从中接收到所有值)。通常这不应该发生,因此不会重新访问超时情况(仅当发生某些错误并且 msgs
已关闭时)。
而是在循环内部使用 select
,有 2 种情况,一种是超时,另一种是只从 msgs
接收一个值。如果收到消息,则重新开始超时。对于可重启的定时器,使用 time.Timer
.
timeout := time.Second
timer := time.NewTimer(timeout)
for {
select {
case <-timer.C:
fmt.Println("timeout, returning")
return
case msg := <-msgs:
fmt.Println("received message:", msg)
// Reset timer: it must be stopped first
// (and drain its channel if it reports false)
if !timer.Stop() {
<-timer.C
}
timer.Reset(timeout)
}
}
查看此 Go Playground 示例以查看实际效果。
请注意,如果您不需要在收到消息后重置计时器,只需注释掉重置代码即可。此外,如果不需要重置,time.After()
更简单:
timeout := time.After(time.Second)
for {
select {
case <-timeout:
fmt.Println("timeout, returning")
return
case msg := <-msgs:
fmt.Println("received message:", msg, time.Now())
}
}
在 Go Playground 上试试这个。
最后一点:如果您在超时发生之前从循环中跳出,则后台计时器不会立即释放(仅在超时发生时)。如果你经常需要这个操作,你可以使用 context.WithTimeout()
to obtain a context.Context
和一个取消函数,你可以在返回之前立即调用它来释放定时器资源(最好是延迟的)。
这是它的样子:
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
fmt.Println("timeout, returning")
return
case msg := <-msgs:
fmt.Println("received message:", msg, time.Now())
}
}
在 Go Playground 上试试这个。
已将 select 更改为:
c1 := make(chan error, 1)
go func() {
for d := range msgs {
if corrID == d.CorrelationId {
err = json.Unmarshal([]byte(uncompress(d.Body)), &v)
if err != nil {
c1 <- err
}
ch.Ack(d.DeliveryTag, false)
c1 <- nil
}
}
}()
select {
case <-time.After(defaultTimeout * time.Second):
log.Printf("Failed to receive response for action %+v\n Payload: %+v\nError: %+v\n", action, body, err)
return errors.New("Failed to receive response in time for action")
case err := <-c1:
failOnError(err, "Failed to process response")
}
return err
我得到了使用来自 RabbitMQ 队列的消息的代码。如果它在一段时间内没有收到消息,它应该会失败。
msgs, err := ch.Consume(
c.ResponseQueue, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
...
loop:
for timeout := time.After(time.Second); ; {
select {
case <-timeout:
log.Printf("Failed to receive response for action %+v\n Payload: %+v\nError: %+v\n", action, body, err)
return errors.New("Failed to receive response for action")
default:
for d := range msgs {
if corrID == d.CorrelationId {
err = json.Unmarshal([]byte(uncompress(d.Body)), &v)
if err != nil {
return err
}
ch.Ack(d.DeliveryTag, false)
break loop
}
}
}
}
我从 RabbitMQ 手册中获取了使用代码并尝试了一些实现超时的建议。我知道如何在 Java 中做到这一点,但不能在 Golang 中重复它。
提前致谢。
更新:
已将 select 更改为:
c1 := make(chan error, 1)
go func() {
for d := range msgs {
if corrID == d.CorrelationId {
err = json.Unmarshal([]byte(uncompress(d.Body)), &v)
if err != nil {
c1 <- err
}
ch.Ack(d.DeliveryTag, false)
c1 <- nil
}
}
}()
select {
case <-time.After(defaultTimeout * time.Second):
log.Printf("Failed to receive response for action %+v\n Payload: %+v\nError: %+v\n", action, body, err)
return errors.New("Failed to receive response in time for action")
case err := <-c1:
failOnError(err, "Failed to process response")
}
return err
现在它按预期工作 - 如果它没有收到具有正确 corellationId 的消息,它将因超时而失败。谢谢大家的帮助。
您的循环有一个 select
,有 2 种情况:超时和 default
分支。进入循环后不会触发超时,因此执行 default
分支。
default
分支在 msgs
通道上包含一个 for range
,它一直从通道接收数据,直到它关闭(并且已从中接收到所有值)。通常这不应该发生,因此不会重新访问超时情况(仅当发生某些错误并且 msgs
已关闭时)。
而是在循环内部使用 select
,有 2 种情况,一种是超时,另一种是只从 msgs
接收一个值。如果收到消息,则重新开始超时。对于可重启的定时器,使用 time.Timer
.
timeout := time.Second
timer := time.NewTimer(timeout)
for {
select {
case <-timer.C:
fmt.Println("timeout, returning")
return
case msg := <-msgs:
fmt.Println("received message:", msg)
// Reset timer: it must be stopped first
// (and drain its channel if it reports false)
if !timer.Stop() {
<-timer.C
}
timer.Reset(timeout)
}
}
查看此 Go Playground 示例以查看实际效果。
请注意,如果您不需要在收到消息后重置计时器,只需注释掉重置代码即可。此外,如果不需要重置,time.After()
更简单:
timeout := time.After(time.Second)
for {
select {
case <-timeout:
fmt.Println("timeout, returning")
return
case msg := <-msgs:
fmt.Println("received message:", msg, time.Now())
}
}
在 Go Playground 上试试这个。
最后一点:如果您在超时发生之前从循环中跳出,则后台计时器不会立即释放(仅在超时发生时)。如果你经常需要这个操作,你可以使用 context.WithTimeout()
to obtain a context.Context
和一个取消函数,你可以在返回之前立即调用它来释放定时器资源(最好是延迟的)。
这是它的样子:
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
fmt.Println("timeout, returning")
return
case msg := <-msgs:
fmt.Println("received message:", msg, time.Now())
}
}
在 Go Playground 上试试这个。
已将 select 更改为:
c1 := make(chan error, 1)
go func() {
for d := range msgs {
if corrID == d.CorrelationId {
err = json.Unmarshal([]byte(uncompress(d.Body)), &v)
if err != nil {
c1 <- err
}
ch.Ack(d.DeliveryTag, false)
c1 <- nil
}
}
}()
select {
case <-time.After(defaultTimeout * time.Second):
log.Printf("Failed to receive response for action %+v\n Payload: %+v\nError: %+v\n", action, body, err)
return errors.New("Failed to receive response in time for action")
case err := <-c1:
failOnError(err, "Failed to process response")
}
return err