Using args in ParDo classname

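Is it possible to pass arguments in ParDo(classname(args))? I tried, but it says the process function takes 3 arguments and 2 were given. I also tried an __init__ function, with no luck. Please help. If possible, let me know how to rewrite this code. Note: splitcols and filtercols work fine because they take no arguments.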

    import apache_beam as beam
    class splitcols(beam.DoFn):
      def process(self,elements):
        return [elements.split(',')]
    class filtercols(beam.DoFn):
      def process(self,elements):
        if elements[1]=='Drs.':
          return [elements]
    class addvals(beam.DoFn):
      def process(self,elements,a): #here I tried to accept the arg
        return [(elements[a],1)]
    p1 = beam.Pipeline()
    attendance_count = (
        p1
        |beam.io.ReadFromText('100Records.csv')
        |beam.ParDo(splitcols())
        |beam.ParDo(filtercols())
        |beam.ParDo(addvals(2))  #here I tried to give args
        #|beam.CombinePerKey(sum)
        #|beam.Map(lambda employee: str(employee))
        |beam.io.WriteToText('data/pardooutput')
    )
    p1.run()

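You can either pass the arguments to __init__, as suggested in the comments on the question, or pass them to ParDo as side inputs: beam.ParDo(addvals(), 2). Any extra arguments given to ParDo after the DoFn are forwarded to process(). In your code the 2 goes to the addvals constructor and never reaches process(), which is why process() is called with one argument too few.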

See a similar example here: https://beam.apache.org/documentation/programming-guide/#side-inputs