使用三个调用在 Elastic Search 中使用索引策略更新文档效率不高

Updating a document with index strategy in Elastic Search with three calls is not efficient

我有一个 AWS Elastic Search 服务器。 使用映射模板和索引策略。

{
  "index_patterns": "users*",
  "order": 6,
  "version": 6,
  "aliases": {
    "users": {}
  },
  "settings": {
    "number_of_shards": 5
  },
  "mappings": {
    "_doc": {
      "dynamic": "strict",
      "properties": {
        "id": { "type": "keyword" },
        "emailAddress": { "type": "keyword" }
      }
    }
  }
}

指数策略为{index_patterns}-{yyyy}-{MM}-{order}-{version}

public async Task<Result> HandleEventAsync(UserChanged @event, CancellationToken cancellationToken)
{
    // 1. Get User, I could get away with this call if Index was known and strategy not used
    var userMaybe =
        await _usersRepository.GetByIdAsync(@event.AggregateId.ToString(), cancellationToken);

    if (userMaybe.HasValue)
    {
        var user = userMaybe.Value.User;

        var partialUpdate = new
        {
            name = @event.Profile.Name,
            birthDate = @event.Profile.BirthDate?.ToString("yyyy-MM-dd"),
            gender = @event.Profile.Gender.ToString(),
            updatedDate = DateTime.UtcNow,
            updatedTimestampEpochInMilliseconds = EpochGenerator.EpochTimestampInMilliseconds(),
        };

        // 2. Remove fields with NULL values (if found any)
        // 3. Partial or Full update of the document, in this case partial
        var result = await _usersRepository.UpdateAsync(user.Id, partialUpdate, userMaybe.Value.Index, cancellationToken: cancellationToken);

        return result.IsSuccess ? Result.Ok() : Result.Fail($"Failed to update User {user.Id}");
    }

    return Result.Fail("User doesn't exist");
}

因此,在这种方法中,我使用了 SQS 消息,出于查找索引的原因,我从 Elastic Search 中检索了文档,因为我并不清楚它,使用以下方法删除任何 NULL 字段,因为更新中的序列化程序将包含 NULL 值,然后部分更新文档。

这是针对 1 次更新的 3 次 Elastic Search 操作,我知道可以删除 NULL 值 UpdateByQuery 调用并决定只容忍文档中的空值,但我们可能会遇到无法使用 [=24= 进行查询的问题] 用于这些字段(如果需要)。

private async Task<Result> RemoveNullFieldsFromDocumentAsync(
            object document,
            string documentId,
            string indexName = null, 
            string typeName = null,
            CancellationToken cancellationToken = default)
{
    var result = Result.Ok();
    var allNullProperties = GetNullPropertyValueNames(document);
    if (allNullProperties.AnyAndNotNull())
    {
        var script = allNullProperties.Select(p => $"ctx._source.remove('{p}')").Aggregate((p1, p2) => $"{p1}; {p2};");
        result = await UpdateByQueryIdAsync(
                                        documentId, 
                                        script,
                                        indexName,
                                        typeName,
                                        cancellationToken: cancellationToken);
    }

    return result;
}

private static IReadOnlyList<string> GetNullPropertyValueNames(object document)
{
    var allPublicProperties =  document.GetType().GetProperties().ToList();

    var allObjects = allPublicProperties.Where(pi => pi.PropertyType.IsClass).ToList();

    var allNames = new List<string>();

    foreach (var propertyInfo in allObjects)
    {
        if (propertyInfo.PropertyType == typeof(string))
        {
            var isNullOrEmpty = ((string) propertyInfo.GetValue(document)).IsNullOrEmpty();
            if (isNullOrEmpty)
            {
                allNames.Add(propertyInfo.Name.ToCamelCase());
            }
        }
        else if (propertyInfo.PropertyType.IsClass)
        {
            if (propertyInfo.GetValue(document).IsNull())
            {
                allNames.Add(propertyInfo.Name.ToCamelCase());
            }
            else
            {
                var namesWithobjectName = GetNullPropertyValueNames(propertyInfo.GetValue(document))
                    .Select(p => $"{propertyInfo.PropertyType.Name.ToCamelCase()}.{p.ToCamelCase()}");
                allNames.AddRange(namesWithobjectName);
            }
        }
    }

    return allNames;
}

public async Task<Result> UpdateByQueryIdAsync(
    string documentId,
    string script,
    string indexName = null, 
    string typeName = null, 
    bool waitForCompletion= false,
    CancellationToken cancellationToken = default)
{
    Guard.Argument(documentId, nameof(documentId)).NotNull().NotEmpty().NotWhiteSpace();
    Guard.Argument(script, nameof(script)).NotNull().NotEmpty().NotWhiteSpace();

    var response = await Client.UpdateByQueryAsync<T>(
        u => u.Query(q => q.Ids(i => i.Values(documentId)))
                .Conflicts(Conflicts.Proceed)
                .Script(s => s.Source(script))
                .Refresh()
                .WaitForCompletion(waitForCompletion)
                .Index(indexName ?? DocumentMappings.IndexStrategy)
                .Type(typeName ?? DocumentMappings.TypeName), 
        cancellationToken);

    var errorMessage = response.LogResponseIfError(_logger);

    return errorMessage.IsNullOrEmpty() ? Result.Ok() : Result.Fail(errorMessage);
}

我的问题是,如果我更改策略以对所有用户文档使用常量索引,这些文档的数量并不重要,目前不会真正增长到数十亿,我对 Elastic 的性能会有影响吗搜索、sharding/indexing 等?

是的。单个索引可以处理大量数据:您不需要将它们拆分得那么小。事实上,从性能的角度来看,具有小分片的小索引实际上更糟糕,因为它导致每个节点有很多分片,占用堆 space 开销。

如果您有大量数据定期传入,那么创建一个基于日期的索引是有意义的,所以也许 index_name-yyyyMMdd 模式会起作用。

最后,您始终可以使用通配符搜索所有索引。所以你可以通过查询 index_name-* 来搜索上面的内容。在您现有的模式中,您可以执行相同的操作:index_patterns-*index_patterns-yyyy-*,等等

关于分片大小的一些信息:https://www.elastic.co/blog/how-many-shards-should-i-have-in-my-elasticsearch-cluster