如何使用事务性 DatastoreIO
How to use transactional DatastoreIO
我正在使用我的流数据流管道中的 DatastoreIO,并且在使用相同的键写入实体时出现错误。
2016-12-10T22:51:04.385Z: Error: (af00222cfd901860): Exception: com.google.datastore.v1.client.DatastoreException: A non-transactional commit may not contain multiple mutations affecting the same entity., code=INVALID_ARGUMENT
如果我在密钥中使用随机数,一切正常,但我需要更新相同的密钥,那么是否有使用 DataStoreIO 执行此操作的事务性方法?
static class CreateEntityFn extends DoFn<KV<String, Tile>, Entity> {
private static final long serialVersionUID = 0;
private final String namespace;
private final String kind;
CreateEntityFn(String namespace, String kind) {
this.namespace = namespace;
this.kind = kind;
}
public Entity makeEntity(String key, Tile tile) {
Entity.Builder entityBuilder = Entity.newBuilder();
Key.Builder keyBuilder = makeKey(kind, key );
if (namespace != null) {
keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
}
entityBuilder.setKey(keyBuilder.build());
entityBuilder.getMutableProperties().put("tile", makeValue(tile.toString()).build());
return entityBuilder.build();
}
@Override
public void processElement(ProcessContext c) {
String key = c.element().getKey();
// this works key = key.concat(":" + UUID.randomUUID().toString());
c.output(makeEntity(key, c.element().getValue()));
}
}
...
...
inputData = pipeline
.apply(PubsubIO.Read.topic(pubsubTopic));
windowedDataStreaming = inputData
.apply(Window.<String>into(
SlidingWindows.of(Duration.standardMinutes(15))
.every(Duration.standardSeconds(31))));
...
...
...
//Create a Datastore entity
PCollection<Entity> siteTileEntities = tileSiteKeyed
.apply(ParDo.named("CreateSiteEntities").of(new CreateEntityFn(options.getNamespace(), options.getKind())));
// write site tiles to datastore
siteTileEntities
.apply(DatastoreIO.v1().write().withProjectId(options.getDataset()));
// Run the pipeline
pipeline.run();
此错误表明 Cloud Datastore 收到了一个 Commit
请求,该请求对同一键进行了两次更改(即它尝试两次插入同一实体或两次修改同一实体)。
您可以通过在每个 Commit
请求的每个键中仅包含一个变更来避免该错误。
您的代码片段没有解释 tileSiteKeyed
是如何创建的。大概是 PCollection<KV<String, Tile>
,但如果它可能有重复的 String
键,那就可以解释问题了。
通常一个PCollection<KV<K, V>>
可能包含多个具有相同密钥的KV对。如果您想确保每个 window 的唯一键,您可以使用 GroupByKey
来做到这一点。这会给你一个 PCollection<KV<K, Iterable<V>>>
,每个 window 都有唯一的键。然后扩充 CreateEntityFn
以采用 Iterable<Tile>
并创建一个包含您需要进行的更改的单一突变。
我正在使用我的流数据流管道中的 DatastoreIO,并且在使用相同的键写入实体时出现错误。
2016-12-10T22:51:04.385Z: Error: (af00222cfd901860): Exception: com.google.datastore.v1.client.DatastoreException: A non-transactional commit may not contain multiple mutations affecting the same entity., code=INVALID_ARGUMENT
如果我在密钥中使用随机数,一切正常,但我需要更新相同的密钥,那么是否有使用 DataStoreIO 执行此操作的事务性方法?
static class CreateEntityFn extends DoFn<KV<String, Tile>, Entity> {
private static final long serialVersionUID = 0;
private final String namespace;
private final String kind;
CreateEntityFn(String namespace, String kind) {
this.namespace = namespace;
this.kind = kind;
}
public Entity makeEntity(String key, Tile tile) {
Entity.Builder entityBuilder = Entity.newBuilder();
Key.Builder keyBuilder = makeKey(kind, key );
if (namespace != null) {
keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
}
entityBuilder.setKey(keyBuilder.build());
entityBuilder.getMutableProperties().put("tile", makeValue(tile.toString()).build());
return entityBuilder.build();
}
@Override
public void processElement(ProcessContext c) {
String key = c.element().getKey();
// this works key = key.concat(":" + UUID.randomUUID().toString());
c.output(makeEntity(key, c.element().getValue()));
}
}
...
...
inputData = pipeline
.apply(PubsubIO.Read.topic(pubsubTopic));
windowedDataStreaming = inputData
.apply(Window.<String>into(
SlidingWindows.of(Duration.standardMinutes(15))
.every(Duration.standardSeconds(31))));
...
...
...
//Create a Datastore entity
PCollection<Entity> siteTileEntities = tileSiteKeyed
.apply(ParDo.named("CreateSiteEntities").of(new CreateEntityFn(options.getNamespace(), options.getKind())));
// write site tiles to datastore
siteTileEntities
.apply(DatastoreIO.v1().write().withProjectId(options.getDataset()));
// Run the pipeline
pipeline.run();
此错误表明 Cloud Datastore 收到了一个 Commit
请求,该请求对同一键进行了两次更改(即它尝试两次插入同一实体或两次修改同一实体)。
您可以通过在每个 Commit
请求的每个键中仅包含一个变更来避免该错误。
您的代码片段没有解释 tileSiteKeyed
是如何创建的。大概是 PCollection<KV<String, Tile>
,但如果它可能有重复的 String
键,那就可以解释问题了。
通常一个PCollection<KV<K, V>>
可能包含多个具有相同密钥的KV对。如果您想确保每个 window 的唯一键,您可以使用 GroupByKey
来做到这一点。这会给你一个 PCollection<KV<K, Iterable<V>>>
,每个 window 都有唯一的键。然后扩充 CreateEntityFn
以采用 Iterable<Tile>
并创建一个包含您需要进行的更改的单一突变。