我如何使用 DatastoreIO 和 Dataflow 批量删除数百万个实体
How can I do batch deletes millions on entities using DatastoreIO and Dataflow
我正在尝试使用 Dataflow 删除数百万个 Datastore 实体,但速度非常慢 (5 entities/s)。我希望你能向我解释我应该遵循的模式,以允许它以合理的速度扩展。只是增加更多工人并没有帮助。
Datastore 管理控制台能够删除特定类型的所有实体,但它经常失败,我需要一周或更长时间才能删除 4000 万个实体。 Dataflow 应该也能帮助我删除数百万只匹配特定查询参数的实体。
我猜想应该采用某种类型的批处理策略(例如,我创建了一个包含 1000 个删除的突变),但我不清楚我将如何去做。 DatastoreIO 一次只提供一个实体供我使用。不胜感激。
下面是我目前的慢解决方案。
Pipeline p = Pipeline.create(options);
DatastoreIO.Source source = DatastoreIO.source()
.withDataset(options.getDataset())
.withQuery(getInstrumentQuery(options))
.withNamespace(options.getNamespace());
p.apply("ReadLeafDataFromDatastore", Read.from(source))
.apply("DeleteRecords", ParDo.of(new DeleteInstrument(options.getDataset())));
p.run();
static class DeleteInstrument extends DoFn<Entity, Integer> {
String dataset;
DeleteInstrument(String dataset) {
this.dataset = dataset;
}
@Override
public void processElement(ProcessContext c) {
DatastoreV1.Mutation.Builder mutation = DatastoreV1.Mutation.newBuilder();
mutation.addDelete(c.element().getKey());
final DatastoreV1.CommitRequest.Builder request = DatastoreV1.CommitRequest.newBuilder();
request.setMutation(mutation);
request.setMode(DatastoreV1.CommitRequest.Mode.NON_TRANSACTIONAL);
try {
DatastoreOptions.Builder dbo = new DatastoreOptions.Builder();
dbo.dataset(dataset);
dbo.credential(getCredential());
Datastore db = DatastoreFactory.get().create(dbo.build());
db.commit(request.build());
c.output(1);
count++;
if(count%100 == 0) {
LOG.info(count+"");
}
} catch (Exception e) {
c.output(0);
e.printStackTrace();
}
}
}
使用当前版本的 DatastoreIO 无法直接删除实体。此版本的 DatastoreIO 将被弃用,取而代之的是下一个 Dataflow 版本中的新版本 (v1beta3)。我们认为提供删除实用程序(通过示例或 PTransform)是一个很好的用例,但仍在进行中。
现在您可以批量删除,而不是一次删除一个:
public static class DeleteEntityFn extends DoFn<Entity, Void> {
// Datastore max batch limit
private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
private Datastore db;
private List<Key> keyList = new ArrayList<>();
@Override
public void startBundle(Context c) throws Exception {
// Initialize Datastore Client
// db = ...
}
@Override
public void processElement(ProcessContext c) throws Exception {
keyList.add(c.element().getKey());
if (keyList.size() >= DATASTORE_BATCH_UPDATE_LIMIT) {
flush();
}
}
@Override
public void finishBundle(Context c) throws Exception {
if (keyList.size() > 0) {
flush();
}
}
private void flush() throws Exception {
// Make one delete request instead of one for each element.
CommitRequest request =
CommitRequest.newBuilder()
.setMode(CommitRequest.Mode.NON_TRANSACTIONAL)
.setMutation(Mutation.newBuilder().addAllDelete(keyList).build())
.build();
db.commit(request);
keyList.clear();
}
}
我正在尝试使用 Dataflow 删除数百万个 Datastore 实体,但速度非常慢 (5 entities/s)。我希望你能向我解释我应该遵循的模式,以允许它以合理的速度扩展。只是增加更多工人并没有帮助。
Datastore 管理控制台能够删除特定类型的所有实体,但它经常失败,我需要一周或更长时间才能删除 4000 万个实体。 Dataflow 应该也能帮助我删除数百万只匹配特定查询参数的实体。
我猜想应该采用某种类型的批处理策略(例如,我创建了一个包含 1000 个删除的突变),但我不清楚我将如何去做。 DatastoreIO 一次只提供一个实体供我使用。不胜感激。
下面是我目前的慢解决方案。
Pipeline p = Pipeline.create(options);
DatastoreIO.Source source = DatastoreIO.source()
.withDataset(options.getDataset())
.withQuery(getInstrumentQuery(options))
.withNamespace(options.getNamespace());
p.apply("ReadLeafDataFromDatastore", Read.from(source))
.apply("DeleteRecords", ParDo.of(new DeleteInstrument(options.getDataset())));
p.run();
static class DeleteInstrument extends DoFn<Entity, Integer> {
String dataset;
DeleteInstrument(String dataset) {
this.dataset = dataset;
}
@Override
public void processElement(ProcessContext c) {
DatastoreV1.Mutation.Builder mutation = DatastoreV1.Mutation.newBuilder();
mutation.addDelete(c.element().getKey());
final DatastoreV1.CommitRequest.Builder request = DatastoreV1.CommitRequest.newBuilder();
request.setMutation(mutation);
request.setMode(DatastoreV1.CommitRequest.Mode.NON_TRANSACTIONAL);
try {
DatastoreOptions.Builder dbo = new DatastoreOptions.Builder();
dbo.dataset(dataset);
dbo.credential(getCredential());
Datastore db = DatastoreFactory.get().create(dbo.build());
db.commit(request.build());
c.output(1);
count++;
if(count%100 == 0) {
LOG.info(count+"");
}
} catch (Exception e) {
c.output(0);
e.printStackTrace();
}
}
}
使用当前版本的 DatastoreIO 无法直接删除实体。此版本的 DatastoreIO 将被弃用,取而代之的是下一个 Dataflow 版本中的新版本 (v1beta3)。我们认为提供删除实用程序(通过示例或 PTransform)是一个很好的用例,但仍在进行中。
现在您可以批量删除,而不是一次删除一个:
public static class DeleteEntityFn extends DoFn<Entity, Void> {
// Datastore max batch limit
private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
private Datastore db;
private List<Key> keyList = new ArrayList<>();
@Override
public void startBundle(Context c) throws Exception {
// Initialize Datastore Client
// db = ...
}
@Override
public void processElement(ProcessContext c) throws Exception {
keyList.add(c.element().getKey());
if (keyList.size() >= DATASTORE_BATCH_UPDATE_LIMIT) {
flush();
}
}
@Override
public void finishBundle(Context c) throws Exception {
if (keyList.size() > 0) {
flush();
}
}
private void flush() throws Exception {
// Make one delete request instead of one for each element.
CommitRequest request =
CommitRequest.newBuilder()
.setMode(CommitRequest.Mode.NON_TRANSACTIONAL)
.setMutation(Mutation.newBuilder().addAllDelete(keyList).build())
.build();
db.commit(request);
keyList.clear();
}
}