Spring 数据 Elasticsearch 批量 Index/Delete - 数百万条记录

Spring Data Elasticsearch Bulk Index/Delete - Millions of Records

我正在使用 Spring Data Elasticsearch 4.2.5,我们有一项工作是对特定数据库进行 ETL(提取、转换和加载数据)table。我正在使用 Elasticsearch 为这些数据编制索引,而工作是 运行。数据将包含数百万条记录甚至更多。目前,我正在对每次迭代进行索引。我读到,在每次迭代中使用 elasticsearch 索引可能需要一些时间。我想使用类似 bulk-index 的东西,但为此我需要将 indexQuery 对象添加到列表中。将数百万条记录添加到列表并进行批量索引可能会带来内存问题。

我需要应用类似的删除过程。当根据一些公共ID删除记录时,我需要删除相关的弹性文件,这也是数以百万计的。

有什么方法可以indexing/deleting非常快地满足这个要求吗?如果我的理解不正确,非常感谢任何帮助并纠正我。

索引

for (Map.Entry<Integer, ObjectDetails> key : objectDetailsHashMap.entrySet()) {
    indexDocument(elasticsearchOperations, key, oPath);
    // other code to insert data in db table...
 }

private void indexDocument(ElasticsearchOperations elasticsearchOperations,
                              Map.Entry<Integer, ObjectDetails> key, String oPath) {
    String docId = "" + key.getValue().getCatalogId() + key.getValue().getObjectId();

    byte[] nameBytes = key.getValue().getName();
    byte[] physicalNameBytes = key.getValue().getPhysicalName();
    byte[] definitionBytes =  key.getValue().getDefinition();
    byte[] commentBytes = key.getValue().getComment();

    IndexQuery indexQuery = new IndexQueryBuilder()
            .withId(docId)
            .withObject(new MetadataSearch(
                    key.getValue().getObjectId(),
                    key.getValue().getCatalogId(),
                    key.getValue().getParentId(),
                    key.getValue().getTypeCode(),
                    key.getValue().getStartVersion(),
                    key.getValue().getEndVersion(),
                    nameBytes != null ? new String(nameBytes, StandardCharsets.UTF_8) : "-",
                    physicalNameBytes != null ? new String(physicalNameBytes, StandardCharsets.UTF_8) : "-",
                    definitionBytes != null ? new String(definitionBytes, StandardCharsets.UTF_8) : "-",
                    commentBytes != null ? new String(commentBytes, StandardCharsets.UTF_8) : "-",
                    oPath
            ))
            .build();

    elasticsearchOperations.index(indexQuery, IndexCoordinates.of("portal_idx"));
}

正在删除

private void deleteElasticDocuments(String catalogId) {
    String queryText = martServerContext.getQueryCacheInstance().getQuery(QUERY_PORTAL_GET_OBJECTS_IN_PORTAL_BY_MODEL);
    MapSqlParameterSource mapSqlParameterSource = new MapSqlParameterSource();
    mapSqlParameterSource.addValue("cId", Integer.parseInt(catalogId));
    namedParameterJdbcTemplate.query(queryText, mapSqlParameterSource, (resultSet -> {
        int objectId = resultSet.getInt(O_ID);
        String docId = catalogId + objectId;
        elasticsearchOperations.delete(docId, IndexCoordinates.of("portal_idx"));
    }));
}

要添加文档,您可以使用批量索引,例如通过收集文档以在 list/array 或其他任何地方建立索引,当达到预定义的大小时 - 例如 500 个条目 - 然后对这些文件进行批量插入.

对于删除,没有批量操作,但您可以收集要删除的 ID,再次以最大大小在列表或数组中删除,然后使用 ElasticsearchOperations.idsQuery(List<String>) 为这些 ID 创建查询并将其传递到delete(query) 方法。

编辑 29.09.2021:

idsQuery 是刚刚在 4.3 分支中添加的,它是这样简化的 (https://github.com/spring-projects/spring-data-elasticsearch/blob/main/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchRestTransportTemplate.java#L193-L200):

@Override
public Query idsQuery(List<String> ids) {

    Assert.notNull(ids, "ids must not be null");

    return new NativeSearchQueryBuilder().withQuery(QueryBuilders.idsQuery().addIds(ids.toArray(new String[] {})))
            .build();
}