如何检查主题队列是否为空然后终止订阅者?
How to check if the topic-queue is empty and then terminate the subscriber?
在我的业务应用程序中,我必须定期批处理来自主题的所有消息,因为它比以先到先得的方式处理它们便宜。我目前计划这样做的方式是让订阅者每 T
小时运行一次 cronjob
。我目前正在解决的问题是如何在处理完所有消息后终止订阅者。我想每 T
小时启动 cronjob
,让订阅者使用主题队列中的所有消息并终止。据我了解,没有 pub-sub
Java API 告诉我主题队列是否为空。我提出了以下两种解决方案:
创建异步拉取的订阅者。在它消耗所有消息时休眠 t minutes
,然后使用 subscriber.stopAsync().awaitTerminated();
终止它。在这种方法中,我有可能在终止订阅者之前不会使用所有消息。一个 google 例子 here
使用Pub/Sub Cloud monitoring
到find the value of the metric subscription/num_undelivered_messages
. Then pull that many messages using the synchronous pull example provided by Google here。然后终止订阅者。
有更好的方法吗?
谢谢!
一个月前我在 Go 中完成了同样的实现。我的假设如下:
- 如果队列中有消息,应用会非常快速地使用它们(2 条消息之间的间隔小于 100 毫秒)。
- 如果队列为空(我的应用程序已完成对所有消息的消费),新消息可以到达但速度低于 100 毫秒
因此,我实现了这个:
* 每次我收到一条消息,
* 我暂停100ms超时
* 我处理并确认消息
* 我将 100 毫秒超时重置为 0
* 如果触发 100 毫秒超时,我将终止我的请求订阅
在我的用例中,我安排每 10 分钟处理一次。所以,我在 9 分钟 30 设置了一个全局超时来完成处理,让新的应用程序实例继续处理
只是一件棘手的事情:对于第一条消息,将超时设置为 2 秒。实际上,由于连接建立,第一条消息消息需要更长的时间才能到达。因此在你初始化超时时设置一个标志 "is the first message or not".
如果对你的实现有帮助,我可以分享我的 Go 代码。
编辑
这是我关于消息处理的 Go 代码
func (pubSubService *pubSubService) Received() (msgArray []*pubsub.Message, err error) {
ctx := context.Background()
cctx, cancel := context.WithCancel(ctx)
// Connect to PubSub
client, err := pubsub.NewClient(cctx, pubSubService.projectId)
if err != nil {
log.Fatalf("Impossible to connect to pubsub client for project %s", pubSubService.projectId)
}
// Put all the message in a array. It will be processed at the end (stored to BQ, as is)
msgArray = []*pubsub.Message{}
// Channel to receive messages
var receivedMessage = make(chan *pubsub.Message)
// Handler to receive message (through the channel) or cancel the the context if the timeout is reached
go func() {
//Initial timeout because the first received is longer than this others.
timeOut := time.Duration(3000)
for {
select {
case msg := <-receivedMessage:
//After the first receive, the timeout is changed
timeOut = pubSubService.waitTimeOutInMillis // Environment variable = 200
msgArray = append(msgArray, msg)
case <-time.After(timeOut * time.Millisecond):
log.Debug("Cancel by timeout")
cancel()
return
}
}
}()
// Global timeout
go func(){
timeOut = pubSubService.globalWaitTimeOutInMillis // Environment variable = 750
time.Sleep(timeOut * time.Second):
log.Debug("Cancel by global timeout")
cancel()
return
}
// Connect to the subscription and pull it util the channel is canceled
sub := client.Subscription(pubSubService.subscriptionName)
err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
receivedMessage <- msg
msg.Ack()
})
}
可能值得考虑 Cloud Pub/Sub 是否是适合这种情况的技术。如果要进行批处理,最好将数据存储在 Google 云存储或数据库中。云 Pub/Sub 确实最适合连续发送 pulling/processing 消息。
您的两个建议是试图确定何时没有更多消息需要处理。没有真正干净的方法来做到这一点。您的第一个建议是可行的,但请记住,虽然大多数消息将以极快的速度传递,但可能会有一些异常值需要更长的时间才能发送给您的订阅者。如果处理 all 条未完成的消息至关重要,则此方法可能行不通。但是,如果下次启动订阅者时偶尔处理消息是可以的,那么您可以使用这种方法。最好按照 guillaum blaquiere 的建议设置自上次收到消息后的计时器,尽管我会使用 1 分钟左右的超时而不是 100 毫秒。
您的第二个建议是监控未送达消息的数量,然后发送拉取请求以检索那么多消息,这种方法并不可行。首先,拉取请求的 max_messages
属性 不 保证 将返回最多 max_messages
的所有可用消息。可以在拉取响应中取回零条消息,但仍有未送达的消息。因此,您必须保留收到的消息数并尝试匹配 num_undelivered_messages
指标。在这种情况下,您必须考虑重复交付以及 Stackdriver 监控指标可能滞后于实际值这一事实。如果该值太大,您可能正在尝试获取您不会收到的消息。如果该值太小,您可能无法收到所有消息。
在这两种方法中,跟踪自上次收到消息以来多长时间的方法是更好的方法,但有提到的注意事项。
在我的业务应用程序中,我必须定期批处理来自主题的所有消息,因为它比以先到先得的方式处理它们便宜。我目前计划这样做的方式是让订阅者每 T
小时运行一次 cronjob
。我目前正在解决的问题是如何在处理完所有消息后终止订阅者。我想每 T
小时启动 cronjob
,让订阅者使用主题队列中的所有消息并终止。据我了解,没有 pub-sub
Java API 告诉我主题队列是否为空。我提出了以下两种解决方案:
创建异步拉取的订阅者。在它消耗所有消息时休眠
t minutes
,然后使用subscriber.stopAsync().awaitTerminated();
终止它。在这种方法中,我有可能在终止订阅者之前不会使用所有消息。一个 google 例子 here使用
Pub/Sub Cloud monitoring
到find the value of the metricsubscription/num_undelivered_messages
. Then pull that many messages using the synchronous pull example provided by Google here。然后终止订阅者。
有更好的方法吗?
谢谢!
一个月前我在 Go 中完成了同样的实现。我的假设如下:
- 如果队列中有消息,应用会非常快速地使用它们(2 条消息之间的间隔小于 100 毫秒)。
- 如果队列为空(我的应用程序已完成对所有消息的消费),新消息可以到达但速度低于 100 毫秒
因此,我实现了这个: * 每次我收到一条消息, * 我暂停100ms超时 * 我处理并确认消息 * 我将 100 毫秒超时重置为 0 * 如果触发 100 毫秒超时,我将终止我的请求订阅
在我的用例中,我安排每 10 分钟处理一次。所以,我在 9 分钟 30 设置了一个全局超时来完成处理,让新的应用程序实例继续处理
只是一件棘手的事情:对于第一条消息,将超时设置为 2 秒。实际上,由于连接建立,第一条消息消息需要更长的时间才能到达。因此在你初始化超时时设置一个标志 "is the first message or not".
如果对你的实现有帮助,我可以分享我的 Go 代码。
编辑
这是我关于消息处理的 Go 代码
func (pubSubService *pubSubService) Received() (msgArray []*pubsub.Message, err error) {
ctx := context.Background()
cctx, cancel := context.WithCancel(ctx)
// Connect to PubSub
client, err := pubsub.NewClient(cctx, pubSubService.projectId)
if err != nil {
log.Fatalf("Impossible to connect to pubsub client for project %s", pubSubService.projectId)
}
// Put all the message in a array. It will be processed at the end (stored to BQ, as is)
msgArray = []*pubsub.Message{}
// Channel to receive messages
var receivedMessage = make(chan *pubsub.Message)
// Handler to receive message (through the channel) or cancel the the context if the timeout is reached
go func() {
//Initial timeout because the first received is longer than this others.
timeOut := time.Duration(3000)
for {
select {
case msg := <-receivedMessage:
//After the first receive, the timeout is changed
timeOut = pubSubService.waitTimeOutInMillis // Environment variable = 200
msgArray = append(msgArray, msg)
case <-time.After(timeOut * time.Millisecond):
log.Debug("Cancel by timeout")
cancel()
return
}
}
}()
// Global timeout
go func(){
timeOut = pubSubService.globalWaitTimeOutInMillis // Environment variable = 750
time.Sleep(timeOut * time.Second):
log.Debug("Cancel by global timeout")
cancel()
return
}
// Connect to the subscription and pull it util the channel is canceled
sub := client.Subscription(pubSubService.subscriptionName)
err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
receivedMessage <- msg
msg.Ack()
})
}
可能值得考虑 Cloud Pub/Sub 是否是适合这种情况的技术。如果要进行批处理,最好将数据存储在 Google 云存储或数据库中。云 Pub/Sub 确实最适合连续发送 pulling/processing 消息。
您的两个建议是试图确定何时没有更多消息需要处理。没有真正干净的方法来做到这一点。您的第一个建议是可行的,但请记住,虽然大多数消息将以极快的速度传递,但可能会有一些异常值需要更长的时间才能发送给您的订阅者。如果处理 all 条未完成的消息至关重要,则此方法可能行不通。但是,如果下次启动订阅者时偶尔处理消息是可以的,那么您可以使用这种方法。最好按照 guillaum blaquiere 的建议设置自上次收到消息后的计时器,尽管我会使用 1 分钟左右的超时而不是 100 毫秒。
您的第二个建议是监控未送达消息的数量,然后发送拉取请求以检索那么多消息,这种方法并不可行。首先,拉取请求的 max_messages
属性 不 保证 将返回最多 max_messages
的所有可用消息。可以在拉取响应中取回零条消息,但仍有未送达的消息。因此,您必须保留收到的消息数并尝试匹配 num_undelivered_messages
指标。在这种情况下,您必须考虑重复交付以及 Stackdriver 监控指标可能滞后于实际值这一事实。如果该值太大,您可能正在尝试获取您不会收到的消息。如果该值太小,您可能无法收到所有消息。
在这两种方法中,跟踪自上次收到消息以来多长时间的方法是更好的方法,但有提到的注意事项。