Apache Beam - Python:如何使用 Accumulation 获取 PCollection 的前 10 个元素?

Apache Beam - Python : How to get the top 10 elements of a PCollection with Accumulation?

我想像这样提取前 10 个最高分:

Paul - 38
Michel - 27
Hugo - 27
Bob - 24
Kevin - 19
...
(10 elements)

我正在使用一个固定的 window 和一个数据驱动的触发器,该触发器在窗格收集 X 个元素后输出早期结果。 另外,我正在使用组合器来获得前 10 名最高分。

(inputs
         | 'Apply Window of time' >> beam.WindowInto(
                        beam.window.FixedWindows(size=5 * 60))
                        trigger=trigger.Repeatedly(trigger.AfterCount(2)),
                  accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
         | beam.ParDo(PairWithOne()) # ('key', 1)
         | beam.CombinePerKey(sum)
         | 'Top 10 scores' >> beam.CombineGlobally(
                        beam.combiners.TopCombineFn(n=10,
                                                    compare=lambda a, b: a[1] < b[
                                                        1])).without_defaults())

这里的问题是第一个输出似乎是正确的,但以下输出包含类似这样的重复键:

Paul - 38
Paul - 37
Michel - 27
Paul - 36
Michel - 26
Kevin - 20
...
(10 elements)

如您所见,我没有得到 10 个不同的 K/V 对,而是得到了重复的键。

当不使用 trigger/accumulation 策略时,这很有效..但是如果我想要 window 2 小时,我想获得频繁的更新...

正如评论中所讨论的,一种可能性是过渡到 Beam SDK 2.13.0 的 Discarding fired panes, which can be set via accumulation_mode=trigger.AccumulationMode.DISCARDING. If you still want to keep the ACCUMULATING mode you might want to modify TopCombineFn so that repeated panes from the same user overwrite the previous value and avoid duplicated keys. TopDistinctFn will take as a base the code here。在 add_input 方法中,我们将进行如下检查:

for current_top_element in enumerate(heap):
  if element[0] == current_top_element[1].value[0]:
    heap[current_top_element[0]] = heap[-1]
    heap.pop()
    heapq.heapify(heap)

基本上,我们会将我们正在评估的元素 (element[0]) 的键与堆中的每个元素进行比较。堆元素的类型是ComparableValue so we can use value to get back the tuple (and value[0] to get the key). If they match we'll want to pop it out from the heap (as we are accumulating the sum will be greater). Beam SDK uses the heapq library so I based my approach on this answer来移除i-th元素(我们使用enumerate来保留索引信息)。

我添加了一些日志记录以帮助检测重复项:

logging.info("Duplicate: " + element[0] + "," + str(element[1]) + ' --- ' + current_top_element[1].value[0] + ',' + str(current_top_element[1].value[1]))

代码位于 combiners 文件夹内的 top.py 文件中(带有 __init__.py),我将其导入:

from combiners.top import TopDistinctFn

然后,我可以在管道中使用 TopDistinctFn,如下所示:

(inputs
     | 'Add User as key' >> beam.Map(lambda x: (x, 1)) # ('key', 1)
     | 'Apply Window of time' >> beam.WindowInto(
                    beam.window.FixedWindows(size=10*60),
                    trigger=beam.trigger.Repeatedly(beam.trigger.AfterCount(2)),
                    accumulation_mode=beam.trigger.AccumulationMode.ACCUMULATING)
     | 'Sum Score' >> beam.CombinePerKey(sum)   
     | 'Top 10 scores' >> beam.CombineGlobally(
                    TopDistinctFn(n=10, compare=lambda a, b: a[1] < b[1])).without_defaults()
     | 'Print results' >> beam.ParDo(PrintTop10Fn()))

可以在 here. generate_messages.py is the Pub/Sub message generator, top.py contains the custom version of TopCombineFn renamed TopDistinctFn (might look overwhelming but I only added a few lines of code starting at line 425) and test_combine.py the main pipeline code. To run this you can put the files in the correct folder, install Beam SDK 2.13.0 if needed, modify project ID and Pub/Sub topic in generate_messages.py and test_combine-py. Then, start publishing messages with python generate_messages.py and, in a different shell, run the pipeline with the DirectRunner: python test_combine.py --streaming. With DataflowRunner you'll probably need to add the extra files 文件中找到完整代码 setup.py

举个例子,Bob领先9分,下一次更新时,他的分数达到11分。他将出现在下一次回顾中,只有更新的分数,没有重复的分数(在我们的日志记录中检测到)。带有 9 分的条目将不会出现,但顶部仍会根据需要有 10 个用户。 Marta 也是如此。我注意到旧分数仍然出现在堆中,即使不在前 10 名中,但我不确定垃圾收集如何与 heapq.

一起工作
INFO:root:>>> Current top 10: [('Bob', 9), ('Connor', 8), ('Eva', 7), ('Hugo', 7), ('Paul', 6), ('Kevin', 6), ('Laura', 6), ('Marta', 6), ('Diane', 4), ('Bacon', 4)]
...
INFO:root:Duplicate: Marta,8 --- Marta,6
INFO:root:Duplicate: Bob,11 --- Bob,9
INFO:root:>>> Current top 10: [('Bob', 11), ('Connor', 8), ('Marta', 8), ('Bacon', 7), ('Eva', 7), ('Hugo', 7), ('Paul', 6), ('Laura', 6), ('Diane', 6), ('Kevin', 6)]

让我知道这是否也适用于您的用例。