如何检查主题队列是否为空然后终止订阅者?

How to check if the topic-queue is empty and then terminate the subscriber?

在我的业务应用程序中,我必须定期批处理来自主题的所有消息,因为它比以先到先得的方式处理它们便宜。我目前计划这样做的方式是让订阅者每 T 小时运行一次 cronjob。我目前正在解决的问题是如何在处理完所有消息后终止订阅者。我想每 T 小时启动 cronjob,让订阅者使用主题队列中的所有消息并终止。据我了解,没有 pub-sub Java API 告诉我主题队列是否为空。我提出了以下两种解决方案:

  1. 创建异步拉取的订阅者。在它消耗所有消息时休眠 t minutes,然后使用 subscriber.stopAsync().awaitTerminated(); 终止它。在这种方法中,我有可能在终止订阅者之前不会使用所有消息。一个 google 例子 here

  2. 使用Pub/Sub Cloud monitoringfind the value of the metric subscription/num_undelivered_messages. Then pull that many messages using the synchronous pull example provided by Google here。然后终止订阅者。

有更好的方法吗?

谢谢!

一个月前我在 Go 中完成了同样的实现。我的假设如下:

  • 如果队列中有消息,应用会非常快速地使用它们(2 条消息之间的间隔小于 100 毫秒)。
  • 如果队列为空(我的应用程序已完成对所有消息的消费),新消息可以到达但速度低于 100 毫秒

因此,我实现了这个: * 每次我收到一条消息, * 我暂停100ms超时 * 我处理并确认消息 * 我将 100 毫秒超时重置为 0 * 如果触发 100 毫秒超时,我将终止我的请求订阅

在我的用例中,我安排每 10 分钟处理一次。所以,我在 9 分钟 30 设置了一个全局超时来完成处理,让新的应用程序实例继续处理

只是一件棘手的事情:对于第一条消息,将超时设置为 2 秒。实际上,由于连接建立,第一条消息消息需要更长的时间才能到达。因此在你初始化超时时设置一个标志 "is the first message or not".

如果对你的实现有帮助,我可以分享我的 Go 代码。

编辑

这是我关于消息处理的 Go 代码

func (pubSubService *pubSubService) Received() (msgArray []*pubsub.Message, err error) {
    ctx := context.Background()
    cctx, cancel := context.WithCancel(ctx)

    // Connect to PubSub
    client, err := pubsub.NewClient(cctx, pubSubService.projectId)
    if err != nil {
        log.Fatalf("Impossible to connect to pubsub client for project %s", pubSubService.projectId)
    }

    // Put all the message in a array. It will be processed at the end (stored to BQ, as is)
    msgArray = []*pubsub.Message{}

    // Channel to receive messages
    var receivedMessage = make(chan *pubsub.Message)

    // Handler to receive message (through the channel) or cancel the the context if the timeout is reached
    go func() {
        //Initial timeout because the first received is longer than this others.
        timeOut := time.Duration(3000)
        for {
            select {
            case msg := <-receivedMessage:
                //After the first receive, the timeout is changed
                timeOut = pubSubService.waitTimeOutInMillis // Environment variable = 200
                msgArray = append(msgArray, msg)
            case <-time.After(timeOut * time.Millisecond):
                log.Debug("Cancel by timeout")
                cancel()
                return
            }
        }
    }()

    // Global timeout
    go func(){
        timeOut = pubSubService.globalWaitTimeOutInMillis // Environment variable = 750
        time.Sleep(timeOut * time.Second):
        log.Debug("Cancel by global timeout")
        cancel()
        return
    }

    // Connect to the subscription and pull it util the channel is canceled
    sub := client.Subscription(pubSubService.subscriptionName)
    err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
        receivedMessage <- msg
        msg.Ack()
    })
}

可能值得考虑 Cloud Pub/Sub 是否是适合这种情况的技术。如果要进行批处理,最好将数据存储在 Google 云存储或数据库中。云 Pub/Sub 确实最适合连续发送 pulling/processing 消息。

您的两个建议是试图确定何时没有更多消息需要处理。没有真正干净的方法来做到这一点。您的第一个建议是可行的,但请记住,虽然大多数消息将以极快的速度传递,但可能会有一些异常值需要更长的时间才能发送给您的订阅者。如果处理 all 条未完成的消息至关重要,则此方法可能行不通。但是,如果下次启动订阅者时偶尔处理消息是可以的,那么您可以使用这种方法。最好按照 guillaum blaquiere 的建议设置自上次收到消息后的计时器,尽管我会使用 1 分钟左右的超时而不是 100 毫秒。

您的第二个建议是监控未送达消息的数量,然后发送拉取请求以检索那么多消息,这种方法并不可行。首先,拉取请求的 max_messages 属性 不 保证 将返回最多 max_messages 的所有可用消息。可以在拉取响应中取回零条消息,但仍有未送达的消息。因此,您必须保留收到的消息数并尝试匹配 num_undelivered_messages 指标。在这种情况下,您必须考虑重复交付以及 Stackdriver 监控指标可能滞后于实际值这一事实。如果该值太大,您可能正在尝试获取您不会收到的消息。如果该值太小,您可能无法收到所有消息。

在这两种方法中,跟踪自上次收到消息以来多长时间的方法是更好的方法,但有提到的注意事项。