如何将输入传递给 beam.Flatten()?

How to pass input to beam.Flatten()?

我开始使用带有 python 的 Apache Beam,但每 30 分钟就会卡住一次。我正在尝试展平然后转换:

lines = messages | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
output = ( lines
           | 'process' >> beam.Map(process_xmls) # returns list
           | 'jsons' >> beam.Map(lambda x: [beam.Create(jsons.dump(model)) for model in x])
           | 'flatten' >> beam.Flatten()
           | beam.WindowInto(window.FixedWindows(1, 0)))

所以在 运行 这段代码之后我得到了这个错误:

ValueError: Input to Flatten must be an iterable. Got a value of type <class 'apache_beam.pvalue.PCollection'> instead.

我该怎么办?

beam.Flatten() 操作采用可迭代的 PCollections 和 returns 一个新的 PCollection,它包含输入 PCollections 中所有元素的并集。不可能有 PCollection 的 PCollection。

我认为您在这里寻找的是 beam.FlatMap 操作。这与 beam.Map 的不同之处在于它为每个输入发出多个元素。例如,如果您有一个包含元素 {'two', 'words'} 的 pcollection lines,那么

lines | beam.Map(list)

将是由两个列表组成的 PCollection

{['t', 'w', 'o'], ['w', 'o', 'r', 'd', 's']}

lines | beam.FlatMap(list)

会导致 PCollection 由几个字母组成

{'t', 'w', 'o', 'w', 'o', 'r', 'd', 's'}.

因此你的最终程序看起来像

lines = messages | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
output = ( lines
           | 'process' >> beam.FlatMap(process_xmls) # concatinates all lists returned by process_xmls into a single PCollection
           | 'jsons' >> beam.Map(jsons.dumps)  # apply json.dumps to each element
           | beam.WindowInto(window.FixedWindows(1, 0)))

(另请注意 json.dumps,返回字符串,可能是您想要的,而不是 json.dump,它采用第二个参数作为要写入的 file/stream。