Exception when reading BigQuery from Dataflow template using ValueProvider

I'm trying to create a template that reads from BigQuery. Unfortunately, I get an exception when I try to build the template:

An exception occured while executing the Java class. Cannot call validate if table is dynamically set.

Reading the documentation, it seems that a special function has to be called when reading from BigQuery in a batch template:

Note: If you want to run a batch pipeline that reads from BigQuery, you must use .withTemplateCompatibility() on all BigQuery reads.

So, here is my code snippet:

PCollection<Discount> discountFromBigQuery = p.apply(
        "Parse Discounts from BigQuery",
        BigQueryIO.read((SerializableFunction<SchemaAndRecord, Discount>) record -> {
                    GenericRecord row = record.getRecord();
                    return new Discount(row);
                })
                .withTemplateCompatibility()
                .from(options.getBigQueryDiscountPath())
                .withCoder(SerializableCoder.of(Discount.class)));

Obviously, options.getBigQueryDiscountPath() returns a ValueProvider<String>.

So, how can I get rid of this error and templatize the BigQuery read?

Here are the Maven dependencies I'm using:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>2.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>2.8.0</version>
</dependency>
<dependency>
    <groupId>com.google.cloud.dataflow</groupId>
    <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
    <version>2.5.0</version>
</dependency>

I believe the error you're hitting is defined here. Note the explanation, which mentions:

Note that a table or query check can fail if the table or dataset are created by earlier stages of the pipeline or if a query depends on earlier stages of a pipeline.

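To work around this, try adding the withoutValidation method to your BigQueryIO.read call. With a ValueProvider, the table name is not yet known when the template is built, so the construction-time check cannot run.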
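To illustrate why the check fails and why skipping it helps, here is a minimal plain-Java sketch. It does not use Beam at all: ValueProvider, RuntimeValue and ReadConfig below are simplified, hypothetical stand-ins that I wrote for illustration, and the error text is copied from the question rather than taken from Beam's source. It only models the idea that a runtime-supplied value is not accessible while the template is being built.

```java
public class TemplateValidationSketch {

    /** Simplified stand-in (assumption) for a value that may only be known at runtime. */
    interface ValueProvider<T> {
        T get();
        boolean isAccessible();
    }

    /** A value that is supplied later, e.g. when a template is launched. */
    static final class RuntimeValue<T> implements ValueProvider<T> {
        private T value;
        private boolean supplied;

        void supply(T newValue) {
            value = newValue;
            supplied = true;
        }

        @Override
        public T get() {
            if (!supplied) {
                throw new IllegalStateException("Value is only available at runtime");
            }
            return value;
        }

        @Override
        public boolean isAccessible() {
            return supplied;
        }
    }

    /** Hypothetical read configuration with an optional construction-time check. */
    static final class ReadConfig {
        private final ValueProvider<String> table;
        private final boolean validate;

        private ReadConfig(ValueProvider<String> table, boolean validate) {
            this.table = table;
            this.validate = validate;
        }

        static ReadConfig from(ValueProvider<String> table) {
            return new ReadConfig(table, true); // validation is on by default
        }

        ReadConfig withoutValidation() {
            return new ReadConfig(table, false);
        }

        /** Models what happens while the template is being built. */
        String build() {
            if (validate) {
                if (!table.isAccessible()) {
                    // Message copied from the question; the table cannot be checked yet.
                    throw new IllegalStateException(
                            "Cannot call validate if table is dynamically set.");
                }
                return "validated";
            }
            return "validation skipped";
        }
    }

    /** Builds the config and reports either the result or the error message. */
    static String tryBuild(ReadConfig config) {
        try {
            return config.build();
        } catch (IllegalStateException e) {
            return "error: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        RuntimeValue<String> table = new RuntimeValue<>(); // not supplied at build time
        System.out.println(tryBuild(ReadConfig.from(table)));
        System.out.println(tryBuild(ReadConfig.from(table).withoutValidation()));
    }
}
```

In this model the first build fails with the same message as in the question, while the second one goes through because the check is skipped. The trade-off is that a wrong table name would then only surface when the job actually runs.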

By the way, withoutValidation() needs to be added at the end of the chain, as shown below.

    // queryString is of type ValueProvider<String>
    PCollection<TableRow> rowsFromBigQuery = pipeline.apply(
                BigQueryIO.readTableRows()
                        .fromQuery(queryString)
                        .usingStandardSql()
                        .withMethod(options.getReadMethod())
                        .withoutValidation());