Usage problem: add_value_provider_argument on a streaming pipeline (Apache Beam / Python)
We want to create a custom Dataflow template using the add_value_provider_argument function. The following command cannot be launched without supplying values for the variables defined via add_value_provider_argument().
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input_topic',
            help='The Cloud Pub/Sub topic to read from.\n'
                 '"projects/<PROJECT_NAME>/topics/<TOPIC_NAME>".'
        )
        parser.add_value_provider_argument(
            '--window_size',
            type=float,
            default=1.0,
            help='Output file\'s window size in number of minutes.'
        )
        parser.add_value_provider_argument(
            '--output_path',
            help='GCS Path of the output file including filename prefix.'
        )

def run():
    pipeline_options = PipelineOptions(streaming=True, save_main_session=True)
    custom_options = pipeline_options.view_as(UserOptions)
    with beam.Pipeline(options=custom_options) as pipeline:
        print("ceci est un test", custom_options.input_topic)
        (pipeline
         | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(topic=custom_options.input_topic.get())
         | 'Window into' >> GroupWindowsIntoBatches(custom_options.window_size.get())
         | 'Write to GCS' >> beam.ParDo(WriteBatchesToGCS(custom_options.output_path.get()))
        )

if __name__ == '__main__':
    run()
I execute this file with:
python luckycart_check.py \
--runner DataflowRunner \
--project $PROJECT_NAME \
--staging_location gs://$BUCKET_NAME/staging \
--temp_location gs://$BUCKET_NAME/temp \
--template_location gs://$BUCKET_NAME/templates/luckycartTEMPLATE \
I get the following error:
File "/home/jupyter/env/local/lib/python2.7/site-packages/apache_beam/options/value_provider.py", line 106, in get
'%s.get() not called from a runtime context' % self)
apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: input_topic, type: str, default_value: None).get() not called from a runtime context
(env) jupyter@luckykart:~/clement/terraform/basics$
If you do not specify --input_topic when creating the pipeline, it will be a RuntimeValueProvider, which means you can only get() its value while the Dataflow job is running. This is expected behavior.
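The two-phase behavior can be illustrated with a minimal, self-contained sketch. This is not Beam's actual implementation; the class names only mirror those in apache_beam.options.value_provider for illustration:

```python
class RuntimeValueProviderError(Exception):
    pass

class RuntimeValueProvider:
    """Placeholder whose value only exists once the job is running."""
    runtime_options = None  # the runner populates this at job start

    def __init__(self, option_name):
        self.option_name = option_name

    def get(self):
        if RuntimeValueProvider.runtime_options is None:
            # Template-construction time: there is no value yet.
            raise RuntimeValueProviderError(
                '%s.get() not called from a runtime context' % self.option_name)
        return RuntimeValueProvider.runtime_options[self.option_name]

topic = RuntimeValueProvider('input_topic')

# At template-construction time (what the traceback above shows):
try:
    topic.get()
except RuntimeValueProviderError as e:
    print(e)  # input_topic.get() not called from a runtime context

# At runtime the runner injects the actual values, and get() succeeds:
RuntimeValueProvider.runtime_options = {
    'input_topic': 'projects/my-project/topics/my-topic'}
print(topic.get())
```

Calling .get() inside run() therefore fails, because run() executes at template-construction time, not at job runtime.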
Some transforms such as WriteToBigQuery accept ValueProvider arguments directly (without calling .get()). However, ReadFromPubSub does not currently accept ValueProvider arguments, because it is implemented as a native transform in Dataflow.
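For your own DoFns (such as WriteBatchesToGCS in the question), the usual template-friendly pattern is to pass the ValueProvider object itself into the DoFn and only call .get() inside process(), which executes at job runtime. A hedged sketch, using stand-in base classes so it runs without Beam installed:

```python
class DoFn:
    """Stand-in for beam.DoFn, just so this sketch runs without Beam."""
    pass

class StaticValueProvider:
    """Value known at construction time; get() always succeeds."""
    def __init__(self, value):
        self.value = value

    def get(self):
        return self.value

class WriteBatchesToGCS(DoFn):
    def __init__(self, output_path):
        # Store the ValueProvider itself; do NOT call .get() here,
        # because at template-construction time there is no value yet.
        self.output_path = output_path

    def process(self, batch):
        path = self.output_path.get()  # runs at job runtime, so this is safe
        yield (path, batch)

fn = WriteBatchesToGCS(StaticValueProvider('gs://my-bucket/output'))
print(next(fn.process(['message'])))  # ('gs://my-bucket/output', ['message'])
```

In the pipeline this would read beam.ParDo(WriteBatchesToGCS(custom_options.output_path)), with no .get() at construction time. Note this pattern does not help with ReadFromPubSub's topic argument, for the reason given above.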
For more information on creating templates with ValueProvider, see this documentation: https://cloud.google.com/dataflow/docs/guides/templates/creating-templates