浏览所有文档并批量更新其中一些
Browse all documents and bulk update some of them
我正在使用 Elastic 的 Jest client 来浏览文档索引以更新一个字段。我的工作流程是 运行 一个带分页的空查询,看看我是否可以计算额外的字段。如果可以的话,我会一次性更新相关文档。
伪代码
private void process() {
int from = 0
int size = this.properties.batchSize
boolean moreResults = true
while (moreResults) {
moreResults = handleBatch(from, this.properties.batchSize)
from += size
}
}
private boolean handleBatch(int from, int size) {
log.info("Processing records $from to " + (from + size))
def result = search(from, size)
if (result.isSucceeded()) {
// Check each element and perform an upgrade
}
// return true if the query returned at least one item
}
private SearchResult search(int from, int size) {
String query =
'{ "from": ' + from + ', ' +
'"size": ' + size + '}'
Search search = new Search.Builder(query)
.addIndex("my-index")
.addType('my-document')
.build();
jestClient.execute(search)
}
我没有任何错误,但是当我 运行 多次批处理时,看起来正在寻找 "new" 个要升级的文档,而文档总数没有改变。我怀疑一个更新的文件被处理了几次,我可以通过检查处理过的 ID 来确认。
我怎样才能 运行 查询以便处理原始文档并且任何更新都不会干扰它?
而不是 运行 正常搜索(即使用 from
+size
),您需要 运行 一个 scroll
search query。主要区别在于滚动将冻结文档的给定快照(在查询时)并查询它们。第一次滚动查询后发生的任何更改都不会被考虑。
使用 Jest,您需要将代码修改为更像这样:
// 1. Initiate the scroll request
Search search = new Search.Builder(searchSourceBuilder.toString())
.addIndex("my-index")
.addType("my-document")
.addSort(new Sort("_doc"))
.setParameter(Parameters.SIZE, size)
.setParameter(Parameters.SCROLL, "5m")
.build();
JestResult result = jestClient.execute(search);
// 2. Get the scroll_id to use in subsequent request
String scrollId = result.getJsonObject().get("_scroll_id").getAsString();
// 3. Issue scroll search requests until you have retrieved all results
boolean moreResults = true;
while (moreResults) {
SearchScroll scroll = new SearchScroll.Builder(scrollId, "5m")
.setParameter(Parameters.SIZE, size).build();
result = client.execute(scroll);
def hits = result.getJsonObject().getAsJsonObject("hits").getAsJsonArray("hits");
moreResults = hits.size() > 0;
}
你需要用上面的代码修改你的process
和handleBatch
方法。它应该很简单,如果没有请告诉我。
我正在使用 Elastic 的 Jest client 来浏览文档索引以更新一个字段。我的工作流程是 运行 一个带分页的空查询,看看我是否可以计算额外的字段。如果可以的话,我会一次性更新相关文档。
伪代码
private void process() {
int from = 0
int size = this.properties.batchSize
boolean moreResults = true
while (moreResults) {
moreResults = handleBatch(from, this.properties.batchSize)
from += size
}
}
private boolean handleBatch(int from, int size) {
log.info("Processing records $from to " + (from + size))
def result = search(from, size)
if (result.isSucceeded()) {
// Check each element and perform an upgrade
}
// return true if the query returned at least one item
}
private SearchResult search(int from, int size) {
String query =
'{ "from": ' + from + ', ' +
'"size": ' + size + '}'
Search search = new Search.Builder(query)
.addIndex("my-index")
.addType('my-document')
.build();
jestClient.execute(search)
}
我没有任何错误,但是当我 运行 多次批处理时,看起来正在寻找 "new" 个要升级的文档,而文档总数没有改变。我怀疑一个更新的文件被处理了几次,我可以通过检查处理过的 ID 来确认。
我怎样才能 运行 查询以便处理原始文档并且任何更新都不会干扰它?
而不是 运行 正常搜索(即使用 from
+size
),您需要 运行 一个 scroll
search query。主要区别在于滚动将冻结文档的给定快照(在查询时)并查询它们。第一次滚动查询后发生的任何更改都不会被考虑。
使用 Jest,您需要将代码修改为更像这样:
// 1. Initiate the scroll request
Search search = new Search.Builder(searchSourceBuilder.toString())
.addIndex("my-index")
.addType("my-document")
.addSort(new Sort("_doc"))
.setParameter(Parameters.SIZE, size)
.setParameter(Parameters.SCROLL, "5m")
.build();
JestResult result = jestClient.execute(search);
// 2. Get the scroll_id to use in subsequent request
String scrollId = result.getJsonObject().get("_scroll_id").getAsString();
// 3. Issue scroll search requests until you have retrieved all results
boolean moreResults = true;
while (moreResults) {
SearchScroll scroll = new SearchScroll.Builder(scrollId, "5m")
.setParameter(Parameters.SIZE, size).build();
result = client.execute(scroll);
def hits = result.getJsonObject().getAsJsonObject("hits").getAsJsonArray("hits");
moreResults = hits.size() > 0;
}
你需要用上面的代码修改你的process
和handleBatch
方法。它应该很简单,如果没有请告诉我。