Dataflow into Beam Pipeline 中的附加参数
The Additional Paramates at Dataflow into Beam Pipeline
我正在研究数据流,我已经通过 Python SDK 构建了自定义管道。
我想将数据流 UI 中的参数添加到我的自定义管道中。
使用附加参数。 https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#staticvalue
引用
然后我将 add_argument
更改为 add_value_provider_argument
,然后是 google docs
class CustomParams(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
"--input_topic",
type = str,
)
parser.add_value_provider_argument(
"--window_size",
type = int,
default = 5,
)
def run():
pipeline_options = PipelineOptions(pipeline_args, .....)
custom_param = pipeline_options.view_as(CustomParams)
.....
pipeline | "Read PubSub Message" >> beam.io.ReadFromPubSub(custom_param.input_topic)
之后,我尝试为 GCP 制作模板。上传脚本看起来像
python custom_pipeline.py \
--runner DataflowRunner \
--project YOUR_PROJECT_ID \
--staging_location gs://YOUR_BUCKET_NAME/staging \
--temp_location gs://YOUR_BUCKET_NAME/temp \
--template_location gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME
但是我在创建上传到 GCS 的模板时遇到了错误,像这样
TypeError: expected string or bytes-like object
在第 beam.io.ReadFromPubSub()
行
看起来我从 add_value_provider_argument
得到的东西是 RuntimeValueProvider 对象。所以我很困惑我必须做些什么来解决这个问题?
我尝试解决这个问题
转换数据类型
beam.io.ReadFromPubSub(str(custom_param.input_topic))
但是出现了这个错误,
ValueError: PubSub topic must be in the form "projects/<project>/topics/<topic>" (got "RuntimeValueProvider(option: input_topic, type: str, default_value: '...')").
那么请问有人对此有疑难解答吗?我不知道该怎么做。
如@mk_sta
所述
It seems that ReadFromPubSub module doesn't accept ValueProvider. Have you checked this Stack thread?
并在 、ReadFromPubSub does not currently accept ValueProvider arguments since it is implemented as a native transform in Dataflow.
中进行了解释
您可以查看 I/O methods that accept runtime parameters 了解不同 SDK 对 ValueProvider
的支持。
所以此时,如果你从PythonSDK切换到JavaSDK,PubSubIO的Read是支持ValueProvider的。
我正在研究数据流,我已经通过 Python SDK 构建了自定义管道。 我想将数据流 UI 中的参数添加到我的自定义管道中。 使用附加参数。 https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#staticvalue
引用然后我将 add_argument
更改为 add_value_provider_argument
,然后是 google docs
class CustomParams(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
"--input_topic",
type = str,
)
parser.add_value_provider_argument(
"--window_size",
type = int,
default = 5,
)
def run():
pipeline_options = PipelineOptions(pipeline_args, .....)
custom_param = pipeline_options.view_as(CustomParams)
.....
pipeline | "Read PubSub Message" >> beam.io.ReadFromPubSub(custom_param.input_topic)
之后,我尝试为 GCP 制作模板。上传脚本看起来像
python custom_pipeline.py \
--runner DataflowRunner \
--project YOUR_PROJECT_ID \
--staging_location gs://YOUR_BUCKET_NAME/staging \
--temp_location gs://YOUR_BUCKET_NAME/temp \
--template_location gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME
但是我在创建上传到 GCS 的模板时遇到了错误,像这样
TypeError: expected string or bytes-like object
在第 beam.io.ReadFromPubSub()
看起来我从 add_value_provider_argument
得到的东西是 RuntimeValueProvider 对象。所以我很困惑我必须做些什么来解决这个问题?
我尝试解决这个问题
转换数据类型
beam.io.ReadFromPubSub(str(custom_param.input_topic))
但是出现了这个错误,
ValueError: PubSub topic must be in the form "projects/<project>/topics/<topic>" (got "RuntimeValueProvider(option: input_topic, type: str, default_value: '...')").
那么请问有人对此有疑难解答吗?我不知道该怎么做。
如@mk_sta
所述It seems that ReadFromPubSub module doesn't accept ValueProvider. Have you checked this Stack thread?
并在 ReadFromPubSub does not currently accept ValueProvider arguments since it is implemented as a native transform in Dataflow.
您可以查看 I/O methods that accept runtime parameters 了解不同 SDK 对 ValueProvider
的支持。
所以此时,如果你从PythonSDK切换到JavaSDK,PubSubIO的Read是支持ValueProvider的。