如果没有收到响应,如何使 RabbitMQConsumer 超时

How to timeout RabbitMQConsumer if it didn`t receive response

我得到了使用来自 RabbitMQ 队列的消息的代码。如果它在一段时间内没有收到消息,它应该会失败。

msgs, err := ch.Consume(
        c.ResponseQueue, // queue
        "",              // consumer
        false,           // auto-ack
        false,           // exclusive
        false,           // no-local
        false,           // no-wait
        nil,             // args
    )
    failOnError(err, "Failed to register a consumer")

...

loop:
    for timeout := time.After(time.Second); ; {
        select {
        case <-timeout:
            log.Printf("Failed to receive response for action %+v\n Payload: %+v\nError: %+v\n", action, body, err)
            return errors.New("Failed to receive response for action")
        default:
            for d := range msgs {
                if corrID == d.CorrelationId {
                    err = json.Unmarshal([]byte(uncompress(d.Body)), &v)
                    if err != nil {
                        return err
                    }
                    ch.Ack(d.DeliveryTag, false)
                    break loop
                }
            }
        }
    }

我从 RabbitMQ 手册中获取了使用代码并尝试了一些实现超时的建议。我知道如何在 Java 中做到这一点,但不能在 Golang 中重复它。

提前致谢。

更新:

已将 select 更改为:

c1 := make(chan error, 1)
    go func() {
        for d := range msgs {
            if corrID == d.CorrelationId {
                err = json.Unmarshal([]byte(uncompress(d.Body)), &v)
                if err != nil {
                    c1 <- err
                }
                ch.Ack(d.DeliveryTag, false)
                c1 <- nil
            }
        }
    }()

    select {
    case <-time.After(defaultTimeout * time.Second):
        log.Printf("Failed to receive response for action %+v\n Payload: %+v\nError: %+v\n", action, body, err)
        return errors.New("Failed to receive response in time for action")
    case err := <-c1:
        failOnError(err, "Failed to process response")
    }
    return err

现在它按预期工作 - 如果它没有收到具有正确 corellationId 的消息,它将因超时而失败。谢谢大家的帮助。

您的循环有一个 select,有 2 种情况:超时和 default 分支。进入循环后不会触发超时,因此执行 default 分支。

default 分支在 msgs 通道上包含一个 for range,它一直从通道接收数据,直到它关闭(并且已从中接收到所有值)。通常这不应该发生,因此不会重新访问超时情况(仅当发生某些错误并且 msgs 已关闭时)。

而是在循环内部使用 select,有 2 种情况,一种是超时,另一种是只从 msgs 接收一个值。如果收到消息,则重新开始超时。对于可重启的定时器,使用 time.Timer.

timeout := time.Second
timer := time.NewTimer(timeout)
for {
    select {
    case <-timer.C:
        fmt.Println("timeout, returning")
        return
    case msg := <-msgs:
        fmt.Println("received message:", msg)
        // Reset timer: it must be stopped first
        // (and drain its channel if it reports false)
        if !timer.Stop() {
            <-timer.C
        }
        timer.Reset(timeout)
    }
}

查看此 Go Playground 示例以查看实际效果。

请注意,如果您不需要在收到消息后重置计时器,只需注释掉重置代码即可。此外,如果不需要重置,time.After() 更简单:

timeout := time.After(time.Second)
for {
    select {
    case <-timeout:
        fmt.Println("timeout, returning")
        return
    case msg := <-msgs:
        fmt.Println("received message:", msg, time.Now())
    }
}

Go Playground 上试试这个。

最后一点:如果您在超时发生之前从循环中跳出,则后台计时器不会立即释放(仅在超时发生时)。如果你经常需要这个操作,你可以使用 context.WithTimeout() to obtain a context.Context 和一个取消函数,你可以在返回之前立即调用它来释放定时器资源(最好是延迟的)。

这是它的样子:

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
for {
    select {
    case <-ctx.Done():
        fmt.Println("timeout, returning")
        return
    case msg := <-msgs:
        fmt.Println("received message:", msg, time.Now())
    }
}

Go Playground 上试试这个。

已将 select 更改为:

c1 := make(chan error, 1)
    go func() {
        for d := range msgs {
            if corrID == d.CorrelationId {
                err = json.Unmarshal([]byte(uncompress(d.Body)), &v)
                if err != nil {
                    c1 <- err
                }
                ch.Ack(d.DeliveryTag, false)
                c1 <- nil
            }
        }
    }()

    select {
    case <-time.After(defaultTimeout * time.Second):
        log.Printf("Failed to receive response for action %+v\n Payload: %+v\nError: %+v\n", action, body, err)
        return errors.New("Failed to receive response in time for action")
    case err := <-c1:
        failOnError(err, "Failed to process response")
    }
    return err