Using args in a ParDo class name
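Is it possible to pass arguments in `ParDo(classname(args))`? I tried it, but the error says the `process` function takes 3 arguments and only 2 were given. I also tried an `__init__` function, with no luck. Please help, and if possible show me how to rewrite this code.
Note: `splitcols` and `filtercols` work fine because no arguments are passed to them.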
```python
import apache_beam as beam

class splitcols(beam.DoFn):
    def process(self, elements):
        return [elements.split(',')]

class filtercols(beam.DoFn):
    def process(self, elements):
        if elements[1] == 'Drs.':
            return [elements]

class addvals(beam.DoFn):
    def process(self, elements, a):  # here I tried to accept the arg
        return [(elements[a], 1)]

p1 = beam.Pipeline()
attendance_count = (
    p1
    | beam.io.ReadFromText('100Records.csv')
    | beam.ParDo(splitcols())
    | beam.ParDo(filtercols())
    | beam.ParDo(addvals(2))  # here I tried to give args
    # | beam.CombinePerKey(sum)
    # | beam.Map(lambda employee: str(employee))
    | beam.io.WriteToText('data/pardooutput')
)
p1.run()
```
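To illustrate why the error appears, here is a minimal plain-Python sketch that does not use Beam. `FakeDoFn` and `fake_runner_call` are hypothetical stand-ins, not Beam APIs; the assumption is that, like a Beam runner, they call `process()` with only the element plus any extra arguments given to `ParDo`, so an argument given to the constructor, as in `addvals(2)`, never reaches `process()` and `a` is missing.

```python
class FakeDoFn:
    """Hypothetical stand-in for beam.DoFn (not the real class)."""

    def __init__(self, *unused_args):
        # Assumed to mirror beam.DoFn: constructor args are accepted
        # but not stored anywhere, so process() never sees them.
        pass


class addvals(FakeDoFn):
    def process(self, elements, a):
        return [(elements[a], 1)]


def fake_runner_call(dofn, element, *extra_args):
    """Hypothetical runner: calls process() with the element plus any
    extra arguments given to ParDo, never the constructor's args."""
    return dofn.process(element, *extra_args)


row = ['1', 'Drs.', 'Alice']

try:
    fake_runner_call(addvals(2), row)  # like beam.ParDo(addvals(2))
except TypeError as err:
    print('TypeError:', err)  # the argument 'a' is missing

print(fake_runner_call(addvals(), row, 2))  # like beam.ParDo(addvals(), 2); prints [('Alice', 1)]
```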
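You can pass the arguments to `__init__`, as suggested in the comments on the question, and store them on `self` for `process()` to use, or pass them to `ParDo` as side inputs, which Beam forwards to `process()` after the element: `beam.ParDo(addvals(), 2)`.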
See a similar example here: https://beam.apache.org/documentation/programming-guide/#side-inputs