根据常见输入在 Apache Beam 中分支和合并 pcollection 列表

Branching and Merging pcollection list in Apache Beam from common input

我正在构建数据流管道,但在分支和合并输出时遇到了一些问题。我要搭建的管道如下:

  1. 读取一些输入数据input_data
  2. 一个。在 input_data 上提取一些指标 metric_1。 B. 在 input_data
  3. 上提取一些其他指标 metric_2
  4. 由于这些指标提取的计算量很大,我想从主 input_data 分支出来,然后合并输出以进行进一步计算。合并输出 output.

下面是封装了我的实际管道的一些示例代码

class ReadData(beam.DoFn):
    def process(self, element):
        # read from source
        return [{'input': np.random.rand(100,10)}]


class GetFirstMetric(beam.DoFn):
    def process(self, element):
        # some processing
        return [{'first': np.random.rand(100,4)}]


class GetSecondMetric(beam.DoFn):
    def process(self, element):
        # some processing
        return [{'second': np.random.rand(100,3)}]


def run():
    with beam.Pipeline() as p:
        input_data = (p | 'read sample data' >> beam.ParDo(ReadData()))

        metric_1 = (input_data | 'some metric on input data' >> beam.ParDo(GetFirstMetric()))
        metric_2 = (input_data | 'some aggregate metric' >> beam.ParDo(GetSecondMetric()))

        output = ((metric_1, metric_2) 
                  | beam.Flatten()
                  | beam.combiners.ToList()
                  | beam.Map(print)
        )

当我 运行 执行此操作时,出现 'PBegin' object has no attribute 'windowing' 错误。我在 Java 中看到了一些用于执行此类操作的示例和示例代码。但是我在 Python 中找不到合适的资源来做同样的事情。我的问题如下:

  1. 分支和合并 pcollection 的正确方法是什么(特别是如果分支来自公共输入)?

  2. 是否有更好的流水线设计来实现同样的目的?

提前致谢!

在此代码中,您的问题是您不是 'starting' 初始 PCollection。在 ReadData.process 中 - 变量 element 的值是多少?

嗯,运行ner 无法得出一个值,因为没有输入 pcollection。您需要创建您的第一个 PCollection。您将执行类似于以下代码的操作...

至于将它们放入列表中 - 辅助输入策略可能会奏效。您可以尝试以下操作吗:

def run():
    with beam.Pipeline() as p:
        starter_pcoll = p | beam.Create(['any'])
        input_data = (starter_pcoll | 'read sample data' >> beam.ParDo(ReadData()))

        metric_1 = (input_data | 'some metric on input data' >> beam.ParDo(GetFirstMetric()))
        metric_2 = (input_data | 'some aggregate metric' >> beam.ParDo(GetSecondMetric()))

        side_in = beam.pvalue.AsList((metric_1, metric_2) 
                                     | beam.Flatten())

        p | beam.Create(['any']) | beam.Map(lambda x, si: print(si),
                                            side_in)

这应该使您的管道 运行。很高兴进一步澄清您的具体问题。