如何使用事务性 DatastoreIO

How to use transactional DatastoreIO

我正在使用我的流数据流管道中的 DatastoreIO,并且在使用相同的键写入实体时出现错误。

2016-12-10T22:51:04.385Z: Error:   (af00222cfd901860): Exception: com.google.datastore.v1.client.DatastoreException: A non-transactional commit may not contain multiple mutations affecting the same entity., code=INVALID_ARGUMENT

如果我在密钥中使用随机数,一切正常,但我需要更新相同的密钥,那么是否有使用 DataStoreIO 执行此操作的事务性方法?

static class CreateEntityFn extends DoFn<KV<String, Tile>, Entity> {
  private static final long serialVersionUID = 0;

  private final String namespace;
  private final String kind;

  CreateEntityFn(String namespace, String kind) {
    this.namespace = namespace;
    this.kind = kind;
  }

  public Entity makeEntity(String key, Tile tile) {
    Entity.Builder entityBuilder = Entity.newBuilder();
    Key.Builder keyBuilder = makeKey(kind, key );
    if (namespace != null) {
      keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
    }
    entityBuilder.setKey(keyBuilder.build());
    entityBuilder.getMutableProperties().put("tile", makeValue(tile.toString()).build()); 
    return entityBuilder.build();   
  }

  @Override
  public void processElement(ProcessContext c) { 
    String key = c.element().getKey();
    // this works    key = key.concat(":" + UUID.randomUUID().toString());
    c.output(makeEntity(key, c.element().getValue()));
  }
}

...

...
 inputData = pipeline
                .apply(PubsubIO.Read.topic(pubsubTopic));
 windowedDataStreaming = inputData
                .apply(Window.<String>into(
                      SlidingWindows.of(Duration.standardMinutes(15))
                                    .every(Duration.standardSeconds(31))));


                             ...
                             ...
                             ...
 //Create a Datastore entity 
 PCollection<Entity> siteTileEntities = tileSiteKeyed
      .apply(ParDo.named("CreateSiteEntities").of(new CreateEntityFn(options.getNamespace(), options.getKind())));       

 // write site tiles to datastore
 siteTileEntities
      .apply(DatastoreIO.v1().write().withProjectId(options.getDataset()));

 // Run the pipeline
 pipeline.run(); 

此错误表明 Cloud Datastore 收到了一个 Commit 请求,该请求对同一键进行了两次更改(即它尝试两次插入同一实体或两次修改同一实体)。

您可以通过在每个 Commit 请求的每个键中仅包含一个变更来避免该错误。

您的代码片段没有解释 tileSiteKeyed 是如何创建的。大概是 PCollection<KV<String, Tile>,但如果它可能有重复的 String 键,那就可以解释问题了。

通常一个PCollection<KV<K, V>>可能包含多个具有相同密钥的KV对。如果您想确保每个 window 的唯一键,您可以使用 GroupByKey 来做到这一点。这会给你一个 PCollection<KV<K, Iterable<V>>>,每个 window 都有唯一的键。然后扩充 CreateEntityFn 以采用 Iterable<Tile> 并创建一个包含您需要进行的更改的单一突变。