数据流:将 Top 模块与 Python SDK 一起使用:单元素 PCollection

Dataflow: Using Top module with Python SDK: single-element PCollection

我正在查看 incubator-beam 存储库中的 word_counting.py 示例(从 Dataflow 文档链接),我想修改它以获得出现次数最多的 n。这是我的管道:

  counts = (lines
        | 'split' >> (beam.ParDo(WordExtractingDoFn())
                      .with_output_types(unicode))
        | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
        | 'group' >> beam.GroupByKey()
        | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
        | 'top' >> beam.combiners.Top.Of('top', 10, key=lambda (word, c): c) # 'top' is the only added line

  output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
  output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))

我使用 Top.Of() 方法添加了一行,但它似乎是 returns 一个 PCollection,其中一个数组作为单个元素(我是等待有序的 PCollection,但查看文档似乎 PCollections 是无序的集合。

当管道运行时,beam.Map 仅遍历一个元素(即整个数组)并且在 'format' 中,lambda 函数引发错误,因为它无法将整个数组映射到元组 (word,c)

我应该如何在不中断流水线的情况下处理这个单元素 PCollection?

如果你想把一个PCollection的iterables扩展成一个PCollection的这些iterables的元素,你可以使用FlatMap,它的参数是一个从元素到一个函数结果的可迭代:在你的例子中,元素本身就是可迭代的,所以我们使用恒等函数。

  counts = ...
        | 'top' >> beam.combiners.Top.Of('top', 10, key=lambda (word, c): c)
        | 'expand' >> beam.FlatMap(lambda word_counts: word_counts) # sic!

  output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
  ...