Google DataFlow:从 BigQuery 读取,合并三个字符串字段,将 key/value 字段写入 Google Cloud Spanner

Google DataFlow: Read from BigQuery, combine three string fields, write key/value fields to Google Cloud Spanner

None 提供的 DataFlow 模板符合我需要做的事情,所以我正在尝试编写自己的模板。我设法 运行 示例代码,如字数统计示例,没有问题,所以我尝试将从 BigQuery 读取并写入 Spanner 的部分单独示例拼凑在一起,但源代码中有太多我不明白的东西无法适应自己的问题

我真的很迷茫,非常感谢任何帮助!

目标是使用 DataFlow 和 Apache Beam SDK 从具有 3 个字符串字段和 1 个整数字段的 BigQuery table 中读取,然后将 3 个字符串字段的内容连接成一个字符串并将新的string 在一个名为 "key" 的新字段中,然后我想将键字段和整数字段(未更改)写入已经存在的 Spanner table,理想情况下使用新键追加行并更新具有已存在键的行的整数字段。

我正在尝试在 Java 中执行此操作,因为 Python 没有 i/o 连接器。非常感谢任何关于使用 Python 执行此操作的建议。

现在,如果我能从 BigQuery 读取 table 并将我从 table 得到的任何内容写入 Spanner 中的 table,我会非常高兴,但我可以甚至让这成为现实。

问题:

老实说,我什至都不好意思展示我正在尝试的代码 运行。

public class SimpleTransfer {

    public static void main(String[] args) {
        // Create and set your PipelineOptions.
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

        // For Cloud execution, set the Cloud Platform project, staging location, and specify DataflowRunner.
        options.setProject("myproject");
        options.setStagingLocation("gs://mybucket");
        options.setRunner(DataflowRunner.class);

        // Create the Pipeline with the specified options.
        Pipeline p = Pipeline.create(options);

        String tableSpec = "database.mytable";

        // read whole table from bigquery
        rowsFromBigQuery =
            p.apply(
                BigQueryIO.readTableRows()
                    .from(tableSpec);

        // Hopefully some day add a transform

        // Somehow make a Mutation
        PCollection<Mutation> mutation = rowsFromBigQuery;

        // Only way I found to write to Spanner, not even sure if that works.
        SpannerWriteResult result = mutation.apply(
            SpannerIO.write().withInstanceId("myinstance").withDatabaseId("mydatabase").grouped());

        p.run().waitUntilFinish();

    }
}

处理这些奇怪的数据类型令人生畏,但一旦您习惯了 TableRowMutation 类型,您将能够编写强大的管道代码。

您需要做的第一件事是获取 TableRow 中的 PCollection,并将它们转换为方便您使用的中间格式。让我们使用 Beam 的 KV,它定义了一个键值对。在以下代码片段中,我们从 TableRow 中提取值,并连接您想要的字符串:

rowsFromBigQuery
            .apply(
                MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings()
                                                     TypeDescriptors.integers()))
                    .via(tableRow -> KV.of(
                               (String) tableRow.get("myKey1")
                               + (String) tableRow.get("myKey2")
                               + (String) tableRow.get("myKey3"),
                               (Integer) tableRow.get("myIntegerField"))))

最后,为了写入 Spanner,我们使用 Mutation 类型的对象,它定义了我们想要应用于 Spanner 中的行的突变类型。我们将使用另一个 MapElements 转换来完成它,它需要 N 个输入和 returns N 个输出。我们在此处定义 insert 或 update 突变:

myKvPairsPCollection
            .apply(
                MapElements.into(TypeDescriptor.of(Mutation.class))
                    .via(elm -> Mutation.newInsertOrUpdateBuilder("myTableName)
                                    .set("key").to(elm.getKey())
                                    .set("value").to(elm.getValue()));

然后你可以将输出传递给SpannerIO.write。整个管道看起来像这样:

        Pipeline p = Pipeline.create(options);

        String tableSpec = "database.mytable";

        // read whole table from bigquery
        PCollection<TableRow> rowsFromBigQuery =
            p.apply(
                BigQueryIO.readTableRows().from(tableSpec));

        // Take in a TableRow, and convert it into a key-value pair
        PCollection<Mutation> mutations = rowsFromBigQuery
            // First we make the TableRows into the appropriate key-value
            // pair of string key and integer.
            .apply(
                MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings()
                                                     TypeDescriptors.integers()))
                    .via(tableRow -> KV.of(
                               (String) tableRow.get("myKey1")
                               + (String) tableRow.get("myKey2")
                               + (String) tableRow.get("myKey3"),
                               (Integer) tableRow.get("myIntegerField"))))
            // Now we construct the mutations
            .apply(
                MapElements.into(TypeDescriptor.of(Mutation.class))
                    .via(elm -> Mutation.newInsertOrUpdateBuilder("myTableName)
                                    .set("key").to(elm.getKey())
                                    .set("value").to(elm.getValue()));

        // Now we pass the mutations to spanner
        SpannerWriteResult result = mutations.apply(
            SpannerIO.write()
                    .withInstanceId("myinstance")
                    .withDatabaseId("mydatabase").grouped());

        p.run().waitUntilFinish();

    }