使用 Go 从 Google Pub Sub 读取所有可用消息
Reading all the available messages from Google Pub Sub using Go
我正在尝试从 google pub-sub 中的主题获取所有可用消息。
但是在 go 中,一旦 Pub-Sub 中没有更多消息剩余,我找不到可以取消接收回调的配置。
我认为的一种方法是使用 Google 云监控 Api 在这个答案 中描述的来获取来自 Pub-Sub 的消息总数,然后保留一个计算读取的消息数并在计数等于该数字时调用取消,但我不确定这是否是继续前进的正确方法。
var mu sync.Mutex
received := 0
sub := client.Subscription(subID)
cctx, cancel := context.WithCancel(ctx)
err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
mu.Lock()
defer mu.Unlock()
fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
msg.Ack()
received++
if received == TotalNumberOfMessages {
cancel()
}
})
if err != nil {
return fmt.Errorf("Receive: %v", err)
}
我也尝试过使用带超时的上下文,即获取直到未满足此上下文截止日期,然后取消。
ctx, cancel := context.WithTimeout(ctx, 100*time.Second)
defer cancel()
err = subscription.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
}
但话又说回来,我无法确定所有消息都已处理。
请提出一个解决方案,确保当 Pub-Sub 中没有剩余消息时 subscription.Receive 函数停止。
我已经在我以前的公司实现了(遗憾的是我不再有代码,它在我以前的公司 git...)。然而它奏效了。
原理如下
msg := make(chan *pubsub.Message, 1)
sub := client.Subscription(subID)
cctx, cancel := context.WithCancel(ctx)
go sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
msg <- m
})
for {
select {
case res := <-msg:
fmt.Fprintf(w, "Got message: %q\n", string(res.Data))
res.Ack()
case <-time.After(3 * time.Second):
fmt.Println("timeout")
cancel()
}
}
我正在尝试从 google pub-sub 中的主题获取所有可用消息。 但是在 go 中,一旦 Pub-Sub 中没有更多消息剩余,我找不到可以取消接收回调的配置。
我认为的一种方法是使用 Google 云监控 Api 在这个答案
var mu sync.Mutex
received := 0
sub := client.Subscription(subID)
cctx, cancel := context.WithCancel(ctx)
err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
mu.Lock()
defer mu.Unlock()
fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
msg.Ack()
received++
if received == TotalNumberOfMessages {
cancel()
}
})
if err != nil {
return fmt.Errorf("Receive: %v", err)
}
我也尝试过使用带超时的上下文,即获取直到未满足此上下文截止日期,然后取消。
ctx, cancel := context.WithTimeout(ctx, 100*time.Second)
defer cancel()
err = subscription.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
}
但话又说回来,我无法确定所有消息都已处理。
请提出一个解决方案,确保当 Pub-Sub 中没有剩余消息时 subscription.Receive 函数停止。
我已经在我以前的公司实现了(遗憾的是我不再有代码,它在我以前的公司 git...)。然而它奏效了。
原理如下
msg := make(chan *pubsub.Message, 1)
sub := client.Subscription(subID)
cctx, cancel := context.WithCancel(ctx)
go sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
msg <- m
})
for {
select {
case res := <-msg:
fmt.Fprintf(w, "Got message: %q\n", string(res.Data))
res.Ack()
case <-time.After(3 * time.Second):
fmt.Println("timeout")
cancel()
}
}