如何在 Apache Beam 中计算时间 window 内的元素,并在计数达到某个阈值时发出数据?

How to count elements inside a time window in Apache beam, and emit the data when the count reach some threshold?

我想在 window 时间内计算来自无限源(我使用 Pub/Sub 作为源)的每个键的元素,并在达到某个阈值时发出计数结果。例如,我想在固定时间 window 的 10 分钟内对元素进行计数,并在计数 > 5 时将结果发送给另一个 Pub/Sub。

    transformation = (p
                    | beam.io.ReadFromPubSub(subscription=known_args.input_subscription))
                    | 'parse' >> beam.Map(json.loads)
                    | beam.WindowInto(window.FixedWindows(600))
                    | 'count' >> beam.combiners.Count.PerKey()
                    | 'filter' >> beam.Filter(lambda data: data['count'] > 5))
    transformation | beam.io.WriteToPubSub(known_args.output_topic)

然而,写入Pub/Sub的结果似乎被延迟了,据我估计,结果在window时间到期后发出。我还需要立即发出结果吗?window option/code

您可以在 the documentation 中看到您可以在 window 上添加触发器。

转到第 9.3 节,您会看到 AfterCount 触发器的定义,它会在每次“计数完成”时发出数据。

我没在Python里写过这个(只在Java里写过),但是代码应该是这样的

    transformation = (p
                    | beam.io.ReadFromPubSub(subscription=known_args.input_subscription))
                    | 'parse' >> beam.Map(json.loads)
                    | beam.WindowInto(window.FixedWindows(600),
                        trigger=AfterCount(5))
                    | 'count' >> beam.combiners.Count.PerKey()
                    | 'filter' >> beam.Filter(lambda data: data['count'] > 5))
    transformation | beam.io.WriteToPubSub(known_args.output_topic)

为了确保处理所有元素(如果 window 末尾的元素少于 5 个),您需要构建一个复合触发器,如下所示

trigger=Repeatedly(
        AfterAny(AfterCount(5), AfterProcessingTime(10 * 60))),