在 Python Apache Beam 中使用值提供程序参数的方法
Ways of using value provider parameter in Python Apache Beam
现在我只能使用 ParDo 获取 class 中的 RunTime 值,还有其他方法可以像在我的函数中那样使用运行时参数吗?
这是我现在得到的代码:
class UserOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--firestore_document',default='')
def run(argv=None):
parser = argparse.ArgumentParser()
pipeline_options = PipelineOptions()
user_options = pipeline_options.view_as(UserOptions)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
rows = (p
| 'Create inputs' >> beam.Create([''])
| 'Call Firestore' >> beam.ParDo(
CallFirestore(user_options.firestore_document))
| 'Read DB2' >> beam.Map(ReadDB2))
我希望 user_options.firestore_document 无需执行 ParDo
即可用于其他功能
您可以使用价值提供者的唯一方式是在 ParDos 和 Combines 中。不可能在创建中传递值提供者,但您可以定义一个 DoFn,returns 您在构造函数中传递给它的值提供者:
class OutputValueProviderFn(beam.DoFn):
def __init__(self, vp):
self.vp = vp
def process(self, unused_elm):
yield self.vp.get()
在您的管道中,您将执行以下操作:
user_options = pipeline_options.view_as(UserOptions)
with beam.Pipeline(options=pipeline_options) as p:
my_value_provided_pcoll = (
p
| beam.Create([None])
| beam.ParDo(OutputValueProviderFn(user_options.firestore_document))
这样你就不会在 Create 中使用它,因为这是不可能的,但你仍然可以在 PCollection 中获得它。
现在我只能使用 ParDo 获取 class 中的 RunTime 值,还有其他方法可以像在我的函数中那样使用运行时参数吗?
这是我现在得到的代码:
class UserOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--firestore_document',default='')
def run(argv=None):
parser = argparse.ArgumentParser()
pipeline_options = PipelineOptions()
user_options = pipeline_options.view_as(UserOptions)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
rows = (p
| 'Create inputs' >> beam.Create([''])
| 'Call Firestore' >> beam.ParDo(
CallFirestore(user_options.firestore_document))
| 'Read DB2' >> beam.Map(ReadDB2))
我希望 user_options.firestore_document 无需执行 ParDo
即可用于其他功能您可以使用价值提供者的唯一方式是在 ParDos 和 Combines 中。不可能在创建中传递值提供者,但您可以定义一个 DoFn,returns 您在构造函数中传递给它的值提供者:
class OutputValueProviderFn(beam.DoFn):
def __init__(self, vp):
self.vp = vp
def process(self, unused_elm):
yield self.vp.get()
在您的管道中,您将执行以下操作:
user_options = pipeline_options.view_as(UserOptions)
with beam.Pipeline(options=pipeline_options) as p:
my_value_provided_pcoll = (
p
| beam.Create([None])
| beam.ParDo(OutputValueProviderFn(user_options.firestore_document))
这样你就不会在 Create 中使用它,因为这是不可能的,但你仍然可以在 PCollection 中获得它。