使用 Go 从 Google Pub Sub 读取所有可用消息

Reading all the available messages from Google Pub Sub using Go

我正在尝试从 google pub-sub 中的主题获取所有可用消息。 但是在 go 中,一旦 Pub-Sub 中没有更多消息剩余,我找不到可以取消接收回调的配置。

我认为的一种方法是使用 Google 云监控 Api 在这个答案 中描述的来获取来自 Pub-Sub 的消息总数,然后保留一个计算读取的消息数并在计数等于该数字时调用取消,但我不确定这是否是继续前进的正确方法。

    var mu sync.Mutex
    received := 0
    sub := client.Subscription(subID)
    cctx, cancel := context.WithCancel(ctx)
    err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
            mu.Lock()
            defer mu.Unlock()
            fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
            msg.Ack()
            received++
            if received == TotalNumberOfMessages {
                    cancel()
            }
    })
    if err != nil {
            return fmt.Errorf("Receive: %v", err)
    }

我也尝试过使用带超时的上下文,即获取直到未满足此上下文截止日期,然后取消。

ctx, cancel := context.WithTimeout(ctx, 100*time.Second)
defer cancel()
err = subscription.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
}

但话又说回来,我无法确定所有消息都已处理。

请提出一个解决方案,确保当 Pub-Sub 中没有剩余消息时 subscription.Receive 函数停止。

我已经在我以前的公司实现了(遗憾的是我不再有代码,它在我以前的公司 git...)。然而它奏效了。

原理如下

msg := make(chan *pubsub.Message, 1)
sub := client.Subscription(subID)
cctx, cancel := context.WithCancel(ctx)
go sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
    msg <- m
    })
for {
  select {
    case res := <-msg:
      fmt.Fprintf(w, "Got message: %q\n", string(res.Data))
      res.Ack()
  
    case <-time.After(3 * time.Second):
        fmt.Println("timeout")
        cancel()
    }
}