PubSub 到 Spanner 流媒体管道
PubSub to Spanner Streaming Pipeline
我正在尝试将 JSON 类型的 PubSub 消息流式传输到 spanner 数据库,并且 insert_update 工作得很好。
Spanner table 具有复合主键,因此需要在从 PubSub 插入新数据之前删除现有数据(因此仅存在最新数据)。 Spanner 替换或 insert/update 突变在这种情况下不起作用。
我添加了管道
import org.apache.beam.* ;
public class PubSubToSpannerPipeline {
// JSON to TableData Object
public static class PubSubToTableDataFn extends DoFn<String, TableData> {
@ProcessElement
public void processElement(ProcessContext c) {
.
.
.
}
}
public interface PubSubToSpannerOptions extends PipelineOptions, StreamingOptions {
.
.
.
}
public static void main(String[] args) {
PubSubToSpannerOptions options = PipelineOptionsFactory
.fromArgs(args)
.withValidation()
.as(PubSubToSpannerOptions.class);
options.setStreaming(true);
SpannerConfig spannerConfig =
SpannerConfig.create()
.withProjectId(options.getProjectId())
.withInstanceId(options.getInstanceId())
.withDatabaseId(options.getDatabaseId());
Pipeline pipeLine = Pipeline.create(options);
PCollection<TableData> tableDataMsgs = pipeLine.apply(PubsubIO.readStrings()
.fromSubscription(options.getInputSubscription()))
.apply("ParsePubSubMessage", ParDo.of(new PubSubToTableDataFn ()));
// Window function
PCollection<TableData> tableDataJson = tableDataMsgs
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));
PCollection<MutationGroup> upsertMutationGroup = tableDataJson.apply("TableDataMutation",
MapElements.via(new SimpleFunction<TableData, MutationGroup>() {
public MutationGroup apply(TableData input) {
String object_id = input.objectId;
pipeLine.apply("ReadExistingData", SpannerIO.read()
.withSpannerConfig(spannerConfig)
.withQuery("SELECT object_id, mapped_object_id, mapped_object_name from TableName where object_id ='" + object_id + "'")
.apply("MutationForExistingTableData",
ParDo.of(new DoFn<Struct, Mutation>(){
@ProcessElement
public void processElement(ProcessContext c) {
Struct str = c.element();
c.output(Mutation.delete("TableName", KeySet.newBuilder()
.addKey(Key.newBuilder()
.append(str.getString("object_id"))
.append(str.getString("mapped_object_id"))
.append(str.getString("mapped_object_name")).build()).build()));
}
} ))
.apply("DeleteExistingTableData", SpannerIO.write().withSpannerConfig(spannerConfig));
Mutation dataMutation = Mutation.newReplaceBuilder("TableName",
.
.
.
);
List<Mutation> list = new ArrayList<Mutation>();
List<Map<String, String>> mappingList = input.listOfObjectRows;
for (Map<String, String> objectMap : mappingList ) {
list.add(Mutation.newReplaceBuilder("TableName",
.
.
.);
}
return MutationGroup.create(dataMutation, list);
}
} )));
upsertMutationGroup.apply("WriteDataToSpanner", SpannerIO.write()
.withSpannerConfig(spannerConfig)
.grouped());
// Run the pipeline.
pipeLine.run().waitUntilFinish();
}
}
class TableData implements Serializable {
String objectId;
List<Map<String, String>> listOfObjectRows;
}
预计现有映射数据必须从 table 中删除,然后才能插入或更新数据。
我不完全确定你在做什么,但看起来你想要:
- 使用与 pubsub 消息匹配的密钥(或部分密钥)读取一些现有数据
- 删除此数据
- 从 pubsub 消息中插入新数据
一个选项是创建一个 DoFn
来在读写事务中执行此 read/delete/insert(或 read/update)。这将保持数据库的一致性...
使用 SpannerIO.WriteFn 作为模型 - 您需要在 @Setup
和 @Teardown
事件处理程序中将 SpannerAccessor
设置为瞬态 create/delete
您的 DoFn
的 @ProcessElement
处理程序将创建一个 Read-write Transaction,您可以在其中读取键的行,更新或删除它们,然后插入新元素(秒)。
这种方法的缺点是每个 Spanner 事务只会处理一条 Pub/Sub 消息(除非你在前面的步骤中做了一些聪明的事情,比如将它们分组),这是一个复杂的读写交易。如果您的 messages/sec 速率相对较低,这很好,但如果不是,这种方法会给您的数据库带来更多负载。
第二种选择是使用键范围的盲目删除。这只有在 object_id 是复合键的第一部分(它似乎来自您的代码)时才有效。
您将创建一个包含删除突变的 MutationGroup
,该删除突变使用具有键范围的删除突变盲删除其键以 object_id 开头的任何现有行,然后插入突变到替换删除的行。
MutationGroup.create(
// Delete rows with key starting with object_id.
Mutation.delete("TableName", KeySet.newBuilder()
.addRange(
KeyRange.closedClosed(
Key.of(str.getString("object_id")),
Key.of(str.getString("object_id"))))
.build()),
// Insert replacement rows.
Mutation.newInsertBuilder("TableName")
.set("column").to("value"),
...
.build(),
Mutation.newInsertBuilder("TableName")
...);
这将像以前一样传递给 SpannerIO.write().grouped(),以便可以对它们进行批处理以提高效率。
我正在尝试将 JSON 类型的 PubSub 消息流式传输到 spanner 数据库,并且 insert_update 工作得很好。 Spanner table 具有复合主键,因此需要在从 PubSub 插入新数据之前删除现有数据(因此仅存在最新数据)。 Spanner 替换或 insert/update 突变在这种情况下不起作用。 我添加了管道
import org.apache.beam.* ;
public class PubSubToSpannerPipeline {
// JSON to TableData Object
public static class PubSubToTableDataFn extends DoFn<String, TableData> {
@ProcessElement
public void processElement(ProcessContext c) {
.
.
.
}
}
public interface PubSubToSpannerOptions extends PipelineOptions, StreamingOptions {
.
.
.
}
public static void main(String[] args) {
PubSubToSpannerOptions options = PipelineOptionsFactory
.fromArgs(args)
.withValidation()
.as(PubSubToSpannerOptions.class);
options.setStreaming(true);
SpannerConfig spannerConfig =
SpannerConfig.create()
.withProjectId(options.getProjectId())
.withInstanceId(options.getInstanceId())
.withDatabaseId(options.getDatabaseId());
Pipeline pipeLine = Pipeline.create(options);
PCollection<TableData> tableDataMsgs = pipeLine.apply(PubsubIO.readStrings()
.fromSubscription(options.getInputSubscription()))
.apply("ParsePubSubMessage", ParDo.of(new PubSubToTableDataFn ()));
// Window function
PCollection<TableData> tableDataJson = tableDataMsgs
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));
PCollection<MutationGroup> upsertMutationGroup = tableDataJson.apply("TableDataMutation",
MapElements.via(new SimpleFunction<TableData, MutationGroup>() {
public MutationGroup apply(TableData input) {
String object_id = input.objectId;
pipeLine.apply("ReadExistingData", SpannerIO.read()
.withSpannerConfig(spannerConfig)
.withQuery("SELECT object_id, mapped_object_id, mapped_object_name from TableName where object_id ='" + object_id + "'")
.apply("MutationForExistingTableData",
ParDo.of(new DoFn<Struct, Mutation>(){
@ProcessElement
public void processElement(ProcessContext c) {
Struct str = c.element();
c.output(Mutation.delete("TableName", KeySet.newBuilder()
.addKey(Key.newBuilder()
.append(str.getString("object_id"))
.append(str.getString("mapped_object_id"))
.append(str.getString("mapped_object_name")).build()).build()));
}
} ))
.apply("DeleteExistingTableData", SpannerIO.write().withSpannerConfig(spannerConfig));
Mutation dataMutation = Mutation.newReplaceBuilder("TableName",
.
.
.
);
List<Mutation> list = new ArrayList<Mutation>();
List<Map<String, String>> mappingList = input.listOfObjectRows;
for (Map<String, String> objectMap : mappingList ) {
list.add(Mutation.newReplaceBuilder("TableName",
.
.
.);
}
return MutationGroup.create(dataMutation, list);
}
} )));
upsertMutationGroup.apply("WriteDataToSpanner", SpannerIO.write()
.withSpannerConfig(spannerConfig)
.grouped());
// Run the pipeline.
pipeLine.run().waitUntilFinish();
}
}
class TableData implements Serializable {
String objectId;
List<Map<String, String>> listOfObjectRows;
}
预计现有映射数据必须从 table 中删除,然后才能插入或更新数据。
我不完全确定你在做什么,但看起来你想要:
- 使用与 pubsub 消息匹配的密钥(或部分密钥)读取一些现有数据
- 删除此数据
- 从 pubsub 消息中插入新数据
一个选项是创建一个 DoFn
来在读写事务中执行此 read/delete/insert(或 read/update)。这将保持数据库的一致性...
使用 SpannerIO.WriteFn 作为模型 - 您需要在 @Setup
和 @Teardown
事件处理程序中将 SpannerAccessor
设置为瞬态 create/delete
您的 DoFn
的 @ProcessElement
处理程序将创建一个 Read-write Transaction,您可以在其中读取键的行,更新或删除它们,然后插入新元素(秒)。
这种方法的缺点是每个 Spanner 事务只会处理一条 Pub/Sub 消息(除非你在前面的步骤中做了一些聪明的事情,比如将它们分组),这是一个复杂的读写交易。如果您的 messages/sec 速率相对较低,这很好,但如果不是,这种方法会给您的数据库带来更多负载。
第二种选择是使用键范围的盲目删除。这只有在 object_id 是复合键的第一部分(它似乎来自您的代码)时才有效。
您将创建一个包含删除突变的 MutationGroup
,该删除突变使用具有键范围的删除突变盲删除其键以 object_id 开头的任何现有行,然后插入突变到替换删除的行。
MutationGroup.create(
// Delete rows with key starting with object_id.
Mutation.delete("TableName", KeySet.newBuilder()
.addRange(
KeyRange.closedClosed(
Key.of(str.getString("object_id")),
Key.of(str.getString("object_id"))))
.build()),
// Insert replacement rows.
Mutation.newInsertBuilder("TableName")
.set("column").to("value"),
...
.build(),
Mutation.newInsertBuilder("TableName")
...);
这将像以前一样传递给 SpannerIO.write().grouped(),以便可以对它们进行批处理以提高效率。