Dataflow into Beam Pipeline 中的附加参数

The Additional Paramates at Dataflow into Beam Pipeline

我正在研究数据流,我已经通过 Python SDK 构建了自定义管道。 我想将数据流 UI 中的参数添加到我的自定义管道中。 使用附加参数。 https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#staticvalue

引用

然后我将 add_argument 更改为 add_value_provider_argument,然后是 google docs

class CustomParams(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):        
        parser.add_value_provider_argument(
            "--input_topic",
            type = str,
        )
        parser.add_value_provider_argument(
            "--window_size",
            type = int,
            default = 5,
        )

def run():
    pipeline_options = PipelineOptions(pipeline_args, .....)
    custom_param = pipeline_options.view_as(CustomParams)
    .....
    pipeline | "Read PubSub Message" >> beam.io.ReadFromPubSub(custom_param.input_topic)

之后,我尝试为 GCP 制作模板。上传脚本看起来像

  python custom_pipeline.py \
    --runner DataflowRunner \
    --project YOUR_PROJECT_ID \
    --staging_location gs://YOUR_BUCKET_NAME/staging \
    --temp_location gs://YOUR_BUCKET_NAME/temp \
    --template_location gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME

但是我在创建上传到 GCS 的模板时遇到了错误,像这样

TypeError: expected string or bytes-like object

在第 beam.io.ReadFromPubSub()

看起来我从 add_value_provider_argument 得到的东西是 RuntimeValueProvider 对象。所以我很困惑我必须做些什么来解决这个问题?

我尝试解决这个问题

转换数据类型

beam.io.ReadFromPubSub(str(custom_param.input_topic))

但是出现了这个错误,

ValueError: PubSub topic must be in the form "projects/<project>/topics/<topic>" (got "RuntimeValueProvider(option: input_topic, type: str, default_value: '...')").

那么请问有人对此有疑难解答吗?我不知道该怎么做。

如@mk_sta

所述

It seems that ReadFromPubSub module doesn't accept ValueProvider. Have you checked this Stack thread?

并在 ReadFromPubSub does not currently accept ValueProvider arguments since it is implemented as a native transform in Dataflow.

中进行了解释

您可以查看 I/O methods that accept runtime parameters 了解不同 SDK 对 ValueProvider 的支持。

所以此时,如果你从PythonSDK切换到JavaSDK,PubSubIO的Read是支持ValueProvider的。