Creating a template for DataFlow throws the error AttributeError: 'RuntimeValueProvider' object has no attribute 'tableId'

Creating a template for DataFlow throws the error AttributeError: 'RuntimeValueProvider' object has no attribute 'tableId'

我用 Apache Beam 创建了一个管道,它在 DataFlow 运行器上成功运行。我正在尝试创建一个模板,但是当为 apache_beam.io.gcp.bigquery.WriteToBigQuery 转换器使用 RuntimeValueProvider 时,会抛出以下错误:

AttributeError: 'RuntimeValueProvider' 对象没有属性 'tableId'.

代码(摘录)如下所示:

class ProcessOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--output_gcs',
            dest='output_gcs',
            default='gs://nlp-text-classification/results/Whosebug_template',
            type=str,
            required=False,
            help='Google Cloud Storage Path.')

        parser.add_value_provider_argument(
            '--output_bq_table',
            dest='output_bq_table',
            default='nlp-text-classification:Whosebug.template_test',
            type=str,
            required=False,
            help='BigQuery table.')

process_options = options.view_as(ProcessOptions)

with beam.Pipeline(options=options) as p:
    [...]
    "Write Posts to BigQuery" >> beam.io.WriteToBigQuery(table=process_options.output_bq_table,
                                                         schema=table_schema)
    [...]

这是一个错误还是我做错了什么?

目前 Dataflow 使用默认不支持模板的 runner-native 源。我们确实有一个支持模板的 Beam 源。要使用它,您必须使用以下标志启用实验。

--experiment=use_beam_bq_sink

而不是 'Write To BigQuery' >> beam.io.Write(beam.io.BigQuerySink(..)) 我使用了 beam.io.WriteToBigQuery(..) 并且还使用了 --experiment=use_beam_bq_sink 这对我有用,我不再收到以下错误 AttributeError: 'RuntimeValueProvider' object has no attribute 'datasetId