Usage problem with add_value_provider_argument on a streaming pipeline (Apache Beam / Python)

We want to create a custom Dataflow template whose parameters are defined with add_value_provider_argument.

I can't run the following code without passing values for the options defined with add_value_provider_argument():
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input_topic',
            help='The Cloud Pub/Sub topic to read from.\n'
                 '"projects/<PROJECT_NAME>/topics/<TOPIC_NAME>".'
        )
        parser.add_value_provider_argument(
            '--window_size',
            type=float,
            default=1.0,
            help='Output file\'s window size in number of minutes.'
        )
        parser.add_value_provider_argument(
            '--output_path',
            help='GCS Path of the output file including filename prefix.'
        )

def run():
    pipeline_options = PipelineOptions(streaming=True, save_main_session=True)
    custom_options = pipeline_options.view_as(UserOptions)

    with beam.Pipeline(options=custom_options) as pipeline:
        print("this is a test", custom_options.input_topic)
        # GroupWindowsIntoBatches and WriteBatchesToGCS are custom transforms
        # defined elsewhere in the file (not shown).
        # Each .get() below is evaluated while the pipeline graph is being built,
        # which is where RuntimeValueProviderError is raised if a flag is omitted.
        (pipeline
         | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(topic=custom_options.input_topic.get())
         | 'Window into' >> GroupWindowsIntoBatches(custom_options.window_size.get())
         | 'Write to GCS' >> beam.ParDo(WriteBatchesToGCS(custom_options.output_path.get()))
        )


if __name__ == '__main__':
    run()

I run this file with:
python luckycart_check.py \
    --runner DataflowRunner \
    --project $PROJECT_NAME \
    --staging_location gs://$BUCKET_NAME/staging \
    --temp_location gs://$BUCKET_NAME/temp \
    --template_location gs://$BUCKET_NAME/templates/luckycartTEMPLATE

I get the following error:

 File "/home/jupyter/env/local/lib/python2.7/site-packages/apache_beam/options/value_provider.py", line 106, in get
    '%s.get() not called from a runtime context' % self)
apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: input_topic, type: str, default_value: None).get() not called from a runtime context

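If you don't specify --input_topic when creating the pipeline (here, when staging the template), the option is a RuntimeValueProvider, which means you can only get() its value while the Dataflow job is running. This is expected behavior.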

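Some transforms, such as WriteToBigQuery, accept ValueProvider arguments directly (without calling .get()). However, ReadFromPubSub does not currently accept ValueProvider arguments, because it is implemented as a native transform in Dataflow.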

For more information on creating templates with ValueProvider, see this documentation: https://cloud.google.com/dataflow/docs/guides/templates/creating-templates