我如何使用 DatastoreIO 和 Dataflow 批量删除数百万个实体

How can I do batch deletes millions on entities using DatastoreIO and Dataflow

我正在尝试使用 Dataflow 删除数百万个 Datastore 实体,但速度非常慢 (5 entities/s)。我希望你能向我解释我应该遵循的模式,以允许它以合理的速度扩展。只是增加更多工人并没有帮助。

Datastore 管理控制台能够删除特定类型的所有实体,但它经常失败,我需要一周或更长时间才能删除 4000 万个实体。 Dataflow 应该也能帮助我删除数百万只匹配特定查询参数的实体。

我猜想应该采用某种类型的批处理策略(例如,我创建了一个包含 1000 个删除的突变),但我不清楚我将如何去做。 DatastoreIO 一次只提供一个实体供我使用。不胜感激。

下面是我目前的慢解决方案。

Pipeline p = Pipeline.create(options);
DatastoreIO.Source source = DatastoreIO.source()
    .withDataset(options.getDataset())
    .withQuery(getInstrumentQuery(options))
    .withNamespace(options.getNamespace());
p.apply("ReadLeafDataFromDatastore", Read.from(source))
 .apply("DeleteRecords", ParDo.of(new DeleteInstrument(options.getDataset())));
p.run();

static class DeleteInstrument extends DoFn<Entity, Integer> {
 String dataset;
  DeleteInstrument(String dataset) {
    this.dataset = dataset;
  }
  @Override
  public void processElement(ProcessContext c) {
    DatastoreV1.Mutation.Builder mutation = DatastoreV1.Mutation.newBuilder();
    mutation.addDelete(c.element().getKey());
    final DatastoreV1.CommitRequest.Builder request = DatastoreV1.CommitRequest.newBuilder();
    request.setMutation(mutation);
    request.setMode(DatastoreV1.CommitRequest.Mode.NON_TRANSACTIONAL);
    try {
      DatastoreOptions.Builder dbo = new DatastoreOptions.Builder();
      dbo.dataset(dataset);
      dbo.credential(getCredential());
      Datastore db = DatastoreFactory.get().create(dbo.build());
      db.commit(request.build());
      c.output(1);
      count++;
      if(count%100 == 0) {
        LOG.info(count+"");
      }
    } catch (Exception e) {
      c.output(0);
      e.printStackTrace();
    }
  }
}

使用当前版本的 DatastoreIO 无法直接删除实体。此版本的 DatastoreIO 将被弃用,取而代之的是下一个 Dataflow 版本中的新版本 (v1beta3)。我们认为提供删除实用程序(通过示例或 PTransform)是一个很好的用例,但仍在进行中。

现在您可以批量删除,而不是一次删除一个:

  public static class DeleteEntityFn extends DoFn<Entity, Void> {
    // Datastore max batch limit
    private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
    private Datastore db;
    private List<Key> keyList = new ArrayList<>();

    @Override
    public void startBundle(Context c) throws Exception {
      // Initialize Datastore Client 
      // db = ...  
    }

    @Override
    public void processElement(ProcessContext c) throws Exception {
      keyList.add(c.element().getKey());
      if (keyList.size() >= DATASTORE_BATCH_UPDATE_LIMIT) {
        flush();
      }
    }

    @Override
    public void finishBundle(Context c) throws Exception {
      if (keyList.size() > 0) {
        flush();
      }
    }

    private void flush() throws Exception {
      // Make one delete request instead of one for each element.
      CommitRequest request =
          CommitRequest.newBuilder()
              .setMode(CommitRequest.Mode.NON_TRANSACTIONAL)
              .setMutation(Mutation.newBuilder().addAllDelete(keyList).build())
              .build();
      db.commit(request);
      keyList.clear();
    }
  }