如何在 Apache Beam 中计算时间 window 内的元素,并在计数达到某个阈值时发出数据?
How to count elements inside a time window in Apache beam, and emit the data when the count reach some threshold?
我想在 window 时间内计算来自无限源(我使用 Pub/Sub 作为源)的每个键的元素,并在达到某个阈值时发出计数结果。例如,我想在固定时间 window 的 10 分钟内对元素进行计数,并在计数 > 5 时将结果发送给另一个 Pub/Sub。
transformation = (p
| beam.io.ReadFromPubSub(subscription=known_args.input_subscription))
| 'parse' >> beam.Map(json.loads)
| beam.WindowInto(window.FixedWindows(600))
| 'count' >> beam.combiners.Count.PerKey()
| 'filter' >> beam.Filter(lambda data: data['count'] > 5))
transformation | beam.io.WriteToPubSub(known_args.output_topic)
然而,写入Pub/Sub的结果似乎被延迟了,据我估计,结果在window时间到期后发出。我还需要立即发出结果吗?window option/code
您可以在 the documentation 中看到您可以在 window 上添加触发器。
转到第 9.3 节,您会看到 AfterCount
触发器的定义,它会在每次“计数完成”时发出数据。
我没在Python里写过这个(只在Java里写过),但是代码应该是这样的
transformation = (p
| beam.io.ReadFromPubSub(subscription=known_args.input_subscription))
| 'parse' >> beam.Map(json.loads)
| beam.WindowInto(window.FixedWindows(600),
trigger=AfterCount(5))
| 'count' >> beam.combiners.Count.PerKey()
| 'filter' >> beam.Filter(lambda data: data['count'] > 5))
transformation | beam.io.WriteToPubSub(known_args.output_topic)
为了确保处理所有元素(如果 window 末尾的元素少于 5 个),您需要构建一个复合触发器,如下所示
trigger=Repeatedly(
AfterAny(AfterCount(5), AfterProcessingTime(10 * 60))),
我想在 window 时间内计算来自无限源(我使用 Pub/Sub 作为源)的每个键的元素,并在达到某个阈值时发出计数结果。例如,我想在固定时间 window 的 10 分钟内对元素进行计数,并在计数 > 5 时将结果发送给另一个 Pub/Sub。
transformation = (p
| beam.io.ReadFromPubSub(subscription=known_args.input_subscription))
| 'parse' >> beam.Map(json.loads)
| beam.WindowInto(window.FixedWindows(600))
| 'count' >> beam.combiners.Count.PerKey()
| 'filter' >> beam.Filter(lambda data: data['count'] > 5))
transformation | beam.io.WriteToPubSub(known_args.output_topic)
然而,写入Pub/Sub的结果似乎被延迟了,据我估计,结果在window时间到期后发出。我还需要立即发出结果吗?window option/code
您可以在 the documentation 中看到您可以在 window 上添加触发器。
转到第 9.3 节,您会看到 AfterCount
触发器的定义,它会在每次“计数完成”时发出数据。
我没在Python里写过这个(只在Java里写过),但是代码应该是这样的
transformation = (p
| beam.io.ReadFromPubSub(subscription=known_args.input_subscription))
| 'parse' >> beam.Map(json.loads)
| beam.WindowInto(window.FixedWindows(600),
trigger=AfterCount(5))
| 'count' >> beam.combiners.Count.PerKey()
| 'filter' >> beam.Filter(lambda data: data['count'] > 5))
transformation | beam.io.WriteToPubSub(known_args.output_topic)
为了确保处理所有元素(如果 window 末尾的元素少于 5 个),您需要构建一个复合触发器,如下所示
trigger=Repeatedly(
AfterAny(AfterCount(5), AfterProcessingTime(10 * 60))),