Perform set difference with Apache Beam

I have two lists, a and b, that share some elements. I want to find those common elements and their counts, so I wrote the following program.

import functools
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.runners.direct.direct_runner import DirectRunner
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions()

p = beam.Pipeline(InteractiveRunner(underlying_runner=DirectRunner()), options=options)

def form_pair(element, side_input):
  for i,e in enumerate(side_input):
    if e == element:
      return i,e


a = ['a','b', 'c', 'c', 'b']
b = ['a','a','b', 'c', 'b', 'b','d', 'e', 'f']

x0 = p | "0" >> beam.Create(a) | "1" >> beam.Distinct()
x1 = beam.pvalue.AsList(x0)


x3 = p | "2" >> beam.Create(b)
x4 = x3 | "3" >> beam.Map(functools.partial(form_pair, side_input=x1))
x5 = x4 | "4" >> beam.combiners.Count.PerKey()


r = p.run().wait_until_finish()

print(r.get(x5))

This gives me the following error:

TypeError: 'AsList' object is not iterable [while running '3']

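I was passing the side input to beam.Map incorrectly. The correct way is

x4 = x3 | "3" >> beam.Map(form_pair, x1)

and not

x4 = x3 | "3" >> beam.Map(functools.partial(form_pair, side_input=x1))

With functools.partial, the AsList wrapper is bound into the function before Beam ever sees it, so form_pair receives the wrapper object itself and the for loop fails. When the wrapper is passed as an extra argument to beam.Map, Beam recognizes it as a side input and hands form_pair the materialized list at run time.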