Google DataFlow:从 BigQuery 读取,合并三个字符串字段,将 key/value 字段写入 Google Cloud Spanner
Google DataFlow: Read from BigQuery, combine three string fields, write key/value fields to Google Cloud Spanner
None 提供的 DataFlow 模板符合我需要做的事情,所以我正在尝试编写自己的模板。我设法 运行 示例代码,如字数统计示例,没有问题,所以我尝试将从 BigQuery 读取并写入 Spanner 的部分单独示例拼凑在一起,但源代码中有太多我不明白的东西无法适应自己的问题
我真的很迷茫,非常感谢任何帮助!
目标是使用 DataFlow 和 Apache Beam SDK 从具有 3 个字符串字段和 1 个整数字段的 BigQuery table 中读取,然后将 3 个字符串字段的内容连接成一个字符串并将新的string 在一个名为 "key" 的新字段中,然后我想将键字段和整数字段(未更改)写入已经存在的 Spanner table,理想情况下使用新键追加行并更新具有已存在键的行的整数字段。
我正在尝试在 Java 中执行此操作,因为 Python 没有 i/o 连接器。非常感谢任何关于使用 Python 执行此操作的建议。
现在,如果我能从 BigQuery 读取 table 并将我从 table 得到的任何内容写入 Spanner 中的 table,我会非常高兴,但我可以甚至让这成为现实。
问题:
- 我正在使用 Maven,但我不知道我需要在 pom 文件中放入哪些依赖项
- 我不知道 java 文件开头需要哪个包和导入
- 我不知道我应该使用 readTableRows() 还是 read(SerializableFunction) 从 BigQuery 读取数据
- 我不知道如何访问 PCollection 中的字符串字段以连接它们或如何创建仅包含键和整数字段的新 PCollection
- 我需要以某种方式将 PCollection 变成一个 Mutation 以写入 Spanner
- 我想使用 INSERT UPDATE 查询写入 Spanner table,这在 Spanner i/o 连接器中似乎不是一个选项。
老实说,我什至都不好意思展示我正在尝试的代码 运行。
public class SimpleTransfer {
public static void main(String[] args) {
// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
// For Cloud execution, set the Cloud Platform project, staging location, and specify DataflowRunner.
options.setProject("myproject");
options.setStagingLocation("gs://mybucket");
options.setRunner(DataflowRunner.class);
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
String tableSpec = "database.mytable";
// read whole table from bigquery
rowsFromBigQuery =
p.apply(
BigQueryIO.readTableRows()
.from(tableSpec);
// Hopefully some day add a transform
// Somehow make a Mutation
PCollection<Mutation> mutation = rowsFromBigQuery;
// Only way I found to write to Spanner, not even sure if that works.
SpannerWriteResult result = mutation.apply(
SpannerIO.write().withInstanceId("myinstance").withDatabaseId("mydatabase").grouped());
p.run().waitUntilFinish();
}
}
处理这些奇怪的数据类型令人生畏,但一旦您习惯了 TableRow
和 Mutation
类型,您将能够编写强大的管道代码。
您需要做的第一件事是获取 TableRow
中的 PCollection
,并将它们转换为方便您使用的中间格式。让我们使用 Beam 的 KV
,它定义了一个键值对。在以下代码片段中,我们从 TableRow
中提取值,并连接您想要的字符串:
rowsFromBigQuery
.apply(
MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings()
TypeDescriptors.integers()))
.via(tableRow -> KV.of(
(String) tableRow.get("myKey1")
+ (String) tableRow.get("myKey2")
+ (String) tableRow.get("myKey3"),
(Integer) tableRow.get("myIntegerField"))))
最后,为了写入 Spanner,我们使用 Mutation
类型的对象,它定义了我们想要应用于 Spanner 中的行的突变类型。我们将使用另一个 MapElements
转换来完成它,它需要 N 个输入和 returns N 个输出。我们在此处定义 insert 或 update 突变:
myKvPairsPCollection
.apply(
MapElements.into(TypeDescriptor.of(Mutation.class))
.via(elm -> Mutation.newInsertOrUpdateBuilder("myTableName)
.set("key").to(elm.getKey())
.set("value").to(elm.getValue()));
然后你可以将输出传递给SpannerIO.write
。整个管道看起来像这样:
Pipeline p = Pipeline.create(options);
String tableSpec = "database.mytable";
// read whole table from bigquery
PCollection<TableRow> rowsFromBigQuery =
p.apply(
BigQueryIO.readTableRows().from(tableSpec));
// Take in a TableRow, and convert it into a key-value pair
PCollection<Mutation> mutations = rowsFromBigQuery
// First we make the TableRows into the appropriate key-value
// pair of string key and integer.
.apply(
MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings()
TypeDescriptors.integers()))
.via(tableRow -> KV.of(
(String) tableRow.get("myKey1")
+ (String) tableRow.get("myKey2")
+ (String) tableRow.get("myKey3"),
(Integer) tableRow.get("myIntegerField"))))
// Now we construct the mutations
.apply(
MapElements.into(TypeDescriptor.of(Mutation.class))
.via(elm -> Mutation.newInsertOrUpdateBuilder("myTableName)
.set("key").to(elm.getKey())
.set("value").to(elm.getValue()));
// Now we pass the mutations to spanner
SpannerWriteResult result = mutations.apply(
SpannerIO.write()
.withInstanceId("myinstance")
.withDatabaseId("mydatabase").grouped());
p.run().waitUntilFinish();
}
None 提供的 DataFlow 模板符合我需要做的事情,所以我正在尝试编写自己的模板。我设法 运行 示例代码,如字数统计示例,没有问题,所以我尝试将从 BigQuery 读取并写入 Spanner 的部分单独示例拼凑在一起,但源代码中有太多我不明白的东西无法适应自己的问题
我真的很迷茫,非常感谢任何帮助!
目标是使用 DataFlow 和 Apache Beam SDK 从具有 3 个字符串字段和 1 个整数字段的 BigQuery table 中读取,然后将 3 个字符串字段的内容连接成一个字符串并将新的string 在一个名为 "key" 的新字段中,然后我想将键字段和整数字段(未更改)写入已经存在的 Spanner table,理想情况下使用新键追加行并更新具有已存在键的行的整数字段。
我正在尝试在 Java 中执行此操作,因为 Python 没有 i/o 连接器。非常感谢任何关于使用 Python 执行此操作的建议。
现在,如果我能从 BigQuery 读取 table 并将我从 table 得到的任何内容写入 Spanner 中的 table,我会非常高兴,但我可以甚至让这成为现实。
问题:
- 我正在使用 Maven,但我不知道我需要在 pom 文件中放入哪些依赖项
- 我不知道 java 文件开头需要哪个包和导入
- 我不知道我应该使用 readTableRows() 还是 read(SerializableFunction) 从 BigQuery 读取数据
- 我不知道如何访问 PCollection 中的字符串字段以连接它们或如何创建仅包含键和整数字段的新 PCollection
- 我需要以某种方式将 PCollection 变成一个 Mutation 以写入 Spanner
- 我想使用 INSERT UPDATE 查询写入 Spanner table,这在 Spanner i/o 连接器中似乎不是一个选项。
老实说,我什至都不好意思展示我正在尝试的代码 运行。
public class SimpleTransfer {
public static void main(String[] args) {
// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
// For Cloud execution, set the Cloud Platform project, staging location, and specify DataflowRunner.
options.setProject("myproject");
options.setStagingLocation("gs://mybucket");
options.setRunner(DataflowRunner.class);
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
String tableSpec = "database.mytable";
// read whole table from bigquery
rowsFromBigQuery =
p.apply(
BigQueryIO.readTableRows()
.from(tableSpec);
// Hopefully some day add a transform
// Somehow make a Mutation
PCollection<Mutation> mutation = rowsFromBigQuery;
// Only way I found to write to Spanner, not even sure if that works.
SpannerWriteResult result = mutation.apply(
SpannerIO.write().withInstanceId("myinstance").withDatabaseId("mydatabase").grouped());
p.run().waitUntilFinish();
}
}
处理这些奇怪的数据类型令人生畏,但一旦您习惯了 TableRow
和 Mutation
类型,您将能够编写强大的管道代码。
您需要做的第一件事是获取 TableRow
中的 PCollection
,并将它们转换为方便您使用的中间格式。让我们使用 Beam 的 KV
,它定义了一个键值对。在以下代码片段中,我们从 TableRow
中提取值,并连接您想要的字符串:
rowsFromBigQuery
.apply(
MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings()
TypeDescriptors.integers()))
.via(tableRow -> KV.of(
(String) tableRow.get("myKey1")
+ (String) tableRow.get("myKey2")
+ (String) tableRow.get("myKey3"),
(Integer) tableRow.get("myIntegerField"))))
最后,为了写入 Spanner,我们使用 Mutation
类型的对象,它定义了我们想要应用于 Spanner 中的行的突变类型。我们将使用另一个 MapElements
转换来完成它,它需要 N 个输入和 returns N 个输出。我们在此处定义 insert 或 update 突变:
myKvPairsPCollection
.apply(
MapElements.into(TypeDescriptor.of(Mutation.class))
.via(elm -> Mutation.newInsertOrUpdateBuilder("myTableName)
.set("key").to(elm.getKey())
.set("value").to(elm.getValue()));
然后你可以将输出传递给SpannerIO.write
。整个管道看起来像这样:
Pipeline p = Pipeline.create(options);
String tableSpec = "database.mytable";
// read whole table from bigquery
PCollection<TableRow> rowsFromBigQuery =
p.apply(
BigQueryIO.readTableRows().from(tableSpec));
// Take in a TableRow, and convert it into a key-value pair
PCollection<Mutation> mutations = rowsFromBigQuery
// First we make the TableRows into the appropriate key-value
// pair of string key and integer.
.apply(
MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings()
TypeDescriptors.integers()))
.via(tableRow -> KV.of(
(String) tableRow.get("myKey1")
+ (String) tableRow.get("myKey2")
+ (String) tableRow.get("myKey3"),
(Integer) tableRow.get("myIntegerField"))))
// Now we construct the mutations
.apply(
MapElements.into(TypeDescriptor.of(Mutation.class))
.via(elm -> Mutation.newInsertOrUpdateBuilder("myTableName)
.set("key").to(elm.getKey())
.set("value").to(elm.getValue()));
// Now we pass the mutations to spanner
SpannerWriteResult result = mutations.apply(
SpannerIO.write()
.withInstanceId("myinstance")
.withDatabaseId("mydatabase").grouped());
p.run().waitUntilFinish();
}