使用三个调用在 Elastic Search 中使用索引策略更新文档效率不高
Updating a document with index strategy in Elastic Search with three calls is not efficient
我有一个 AWS Elastic Search 服务器。
使用映射模板和索引策略。
{
"index_patterns": "users*",
"order": 6,
"version": 6,
"aliases": {
"users": {}
},
"settings": {
"number_of_shards": 5
},
"mappings": {
"_doc": {
"dynamic": "strict",
"properties": {
"id": { "type": "keyword" },
"emailAddress": { "type": "keyword" }
}
}
}
}
指数策略为{index_patterns}-{yyyy}-{MM}-{order}-{version}
public async Task<Result> HandleEventAsync(UserChanged @event, CancellationToken cancellationToken)
{
// 1. Get User, I could get away with this call if Index was known and strategy not used
var userMaybe =
await _usersRepository.GetByIdAsync(@event.AggregateId.ToString(), cancellationToken);
if (userMaybe.HasValue)
{
var user = userMaybe.Value.User;
var partialUpdate = new
{
name = @event.Profile.Name,
birthDate = @event.Profile.BirthDate?.ToString("yyyy-MM-dd"),
gender = @event.Profile.Gender.ToString(),
updatedDate = DateTime.UtcNow,
updatedTimestampEpochInMilliseconds = EpochGenerator.EpochTimestampInMilliseconds(),
};
// 2. Remove fields with NULL values (if found any)
// 3. Partial or Full update of the document, in this case partial
var result = await _usersRepository.UpdateAsync(user.Id, partialUpdate, userMaybe.Value.Index, cancellationToken: cancellationToken);
return result.IsSuccess ? Result.Ok() : Result.Fail($"Failed to update User {user.Id}");
}
return Result.Fail("User doesn't exist");
}
因此,在这种方法中,我使用了 SQS 消息,出于查找索引的原因,我从 Elastic Search 中检索了文档,因为我并不清楚它,使用以下方法删除任何 NULL 字段,因为更新中的序列化程序将包含 NULL 值,然后部分更新文档。
这是针对 1 次更新的 3 次 Elastic Search 操作,我知道可以删除 NULL 值 UpdateByQuery 调用并决定只容忍文档中的空值,但我们可能会遇到无法使用 [=24= 进行查询的问题] 用于这些字段(如果需要)。
private async Task<Result> RemoveNullFieldsFromDocumentAsync(
object document,
string documentId,
string indexName = null,
string typeName = null,
CancellationToken cancellationToken = default)
{
var result = Result.Ok();
var allNullProperties = GetNullPropertyValueNames(document);
if (allNullProperties.AnyAndNotNull())
{
var script = allNullProperties.Select(p => $"ctx._source.remove('{p}')").Aggregate((p1, p2) => $"{p1}; {p2};");
result = await UpdateByQueryIdAsync(
documentId,
script,
indexName,
typeName,
cancellationToken: cancellationToken);
}
return result;
}
private static IReadOnlyList<string> GetNullPropertyValueNames(object document)
{
var allPublicProperties = document.GetType().GetProperties().ToList();
var allObjects = allPublicProperties.Where(pi => pi.PropertyType.IsClass).ToList();
var allNames = new List<string>();
foreach (var propertyInfo in allObjects)
{
if (propertyInfo.PropertyType == typeof(string))
{
var isNullOrEmpty = ((string) propertyInfo.GetValue(document)).IsNullOrEmpty();
if (isNullOrEmpty)
{
allNames.Add(propertyInfo.Name.ToCamelCase());
}
}
else if (propertyInfo.PropertyType.IsClass)
{
if (propertyInfo.GetValue(document).IsNull())
{
allNames.Add(propertyInfo.Name.ToCamelCase());
}
else
{
var namesWithobjectName = GetNullPropertyValueNames(propertyInfo.GetValue(document))
.Select(p => $"{propertyInfo.PropertyType.Name.ToCamelCase()}.{p.ToCamelCase()}");
allNames.AddRange(namesWithobjectName);
}
}
}
return allNames;
}
public async Task<Result> UpdateByQueryIdAsync(
string documentId,
string script,
string indexName = null,
string typeName = null,
bool waitForCompletion= false,
CancellationToken cancellationToken = default)
{
Guard.Argument(documentId, nameof(documentId)).NotNull().NotEmpty().NotWhiteSpace();
Guard.Argument(script, nameof(script)).NotNull().NotEmpty().NotWhiteSpace();
var response = await Client.UpdateByQueryAsync<T>(
u => u.Query(q => q.Ids(i => i.Values(documentId)))
.Conflicts(Conflicts.Proceed)
.Script(s => s.Source(script))
.Refresh()
.WaitForCompletion(waitForCompletion)
.Index(indexName ?? DocumentMappings.IndexStrategy)
.Type(typeName ?? DocumentMappings.TypeName),
cancellationToken);
var errorMessage = response.LogResponseIfError(_logger);
return errorMessage.IsNullOrEmpty() ? Result.Ok() : Result.Fail(errorMessage);
}
我的问题是,如果我更改策略以对所有用户文档使用常量索引,这些文档的数量并不重要,目前不会真正增长到数十亿,我对 Elastic 的性能会有影响吗搜索、sharding/indexing 等?
是的。单个索引可以处理大量数据:您不需要将它们拆分得那么小。事实上,从性能的角度来看,具有小分片的小索引实际上更糟糕,因为它导致每个节点有很多分片,占用堆 space 开销。
如果您有大量数据定期传入,那么创建一个基于日期的索引是有意义的,所以也许 index_name-yyyyMMdd
模式会起作用。
最后,您始终可以使用通配符搜索所有索引。所以你可以通过查询 index_name-*
来搜索上面的内容。在您现有的模式中,您可以执行相同的操作:index_patterns-*
或 index_patterns-yyyy-*
,等等
关于分片大小的一些信息:https://www.elastic.co/blog/how-many-shards-should-i-have-in-my-elasticsearch-cluster
我有一个 AWS Elastic Search 服务器。 使用映射模板和索引策略。
{
"index_patterns": "users*",
"order": 6,
"version": 6,
"aliases": {
"users": {}
},
"settings": {
"number_of_shards": 5
},
"mappings": {
"_doc": {
"dynamic": "strict",
"properties": {
"id": { "type": "keyword" },
"emailAddress": { "type": "keyword" }
}
}
}
}
指数策略为{index_patterns}-{yyyy}-{MM}-{order}-{version}
public async Task<Result> HandleEventAsync(UserChanged @event, CancellationToken cancellationToken)
{
// 1. Get User, I could get away with this call if Index was known and strategy not used
var userMaybe =
await _usersRepository.GetByIdAsync(@event.AggregateId.ToString(), cancellationToken);
if (userMaybe.HasValue)
{
var user = userMaybe.Value.User;
var partialUpdate = new
{
name = @event.Profile.Name,
birthDate = @event.Profile.BirthDate?.ToString("yyyy-MM-dd"),
gender = @event.Profile.Gender.ToString(),
updatedDate = DateTime.UtcNow,
updatedTimestampEpochInMilliseconds = EpochGenerator.EpochTimestampInMilliseconds(),
};
// 2. Remove fields with NULL values (if found any)
// 3. Partial or Full update of the document, in this case partial
var result = await _usersRepository.UpdateAsync(user.Id, partialUpdate, userMaybe.Value.Index, cancellationToken: cancellationToken);
return result.IsSuccess ? Result.Ok() : Result.Fail($"Failed to update User {user.Id}");
}
return Result.Fail("User doesn't exist");
}
因此,在这种方法中,我使用了 SQS 消息,出于查找索引的原因,我从 Elastic Search 中检索了文档,因为我并不清楚它,使用以下方法删除任何 NULL 字段,因为更新中的序列化程序将包含 NULL 值,然后部分更新文档。
这是针对 1 次更新的 3 次 Elastic Search 操作,我知道可以删除 NULL 值 UpdateByQuery 调用并决定只容忍文档中的空值,但我们可能会遇到无法使用 [=24= 进行查询的问题] 用于这些字段(如果需要)。
private async Task<Result> RemoveNullFieldsFromDocumentAsync(
object document,
string documentId,
string indexName = null,
string typeName = null,
CancellationToken cancellationToken = default)
{
var result = Result.Ok();
var allNullProperties = GetNullPropertyValueNames(document);
if (allNullProperties.AnyAndNotNull())
{
var script = allNullProperties.Select(p => $"ctx._source.remove('{p}')").Aggregate((p1, p2) => $"{p1}; {p2};");
result = await UpdateByQueryIdAsync(
documentId,
script,
indexName,
typeName,
cancellationToken: cancellationToken);
}
return result;
}
private static IReadOnlyList<string> GetNullPropertyValueNames(object document)
{
var allPublicProperties = document.GetType().GetProperties().ToList();
var allObjects = allPublicProperties.Where(pi => pi.PropertyType.IsClass).ToList();
var allNames = new List<string>();
foreach (var propertyInfo in allObjects)
{
if (propertyInfo.PropertyType == typeof(string))
{
var isNullOrEmpty = ((string) propertyInfo.GetValue(document)).IsNullOrEmpty();
if (isNullOrEmpty)
{
allNames.Add(propertyInfo.Name.ToCamelCase());
}
}
else if (propertyInfo.PropertyType.IsClass)
{
if (propertyInfo.GetValue(document).IsNull())
{
allNames.Add(propertyInfo.Name.ToCamelCase());
}
else
{
var namesWithobjectName = GetNullPropertyValueNames(propertyInfo.GetValue(document))
.Select(p => $"{propertyInfo.PropertyType.Name.ToCamelCase()}.{p.ToCamelCase()}");
allNames.AddRange(namesWithobjectName);
}
}
}
return allNames;
}
public async Task<Result> UpdateByQueryIdAsync(
string documentId,
string script,
string indexName = null,
string typeName = null,
bool waitForCompletion= false,
CancellationToken cancellationToken = default)
{
Guard.Argument(documentId, nameof(documentId)).NotNull().NotEmpty().NotWhiteSpace();
Guard.Argument(script, nameof(script)).NotNull().NotEmpty().NotWhiteSpace();
var response = await Client.UpdateByQueryAsync<T>(
u => u.Query(q => q.Ids(i => i.Values(documentId)))
.Conflicts(Conflicts.Proceed)
.Script(s => s.Source(script))
.Refresh()
.WaitForCompletion(waitForCompletion)
.Index(indexName ?? DocumentMappings.IndexStrategy)
.Type(typeName ?? DocumentMappings.TypeName),
cancellationToken);
var errorMessage = response.LogResponseIfError(_logger);
return errorMessage.IsNullOrEmpty() ? Result.Ok() : Result.Fail(errorMessage);
}
我的问题是,如果我更改策略以对所有用户文档使用常量索引,这些文档的数量并不重要,目前不会真正增长到数十亿,我对 Elastic 的性能会有影响吗搜索、sharding/indexing 等?
是的。单个索引可以处理大量数据:您不需要将它们拆分得那么小。事实上,从性能的角度来看,具有小分片的小索引实际上更糟糕,因为它导致每个节点有很多分片,占用堆 space 开销。
如果您有大量数据定期传入,那么创建一个基于日期的索引是有意义的,所以也许 index_name-yyyyMMdd
模式会起作用。
最后,您始终可以使用通配符搜索所有索引。所以你可以通过查询 index_name-*
来搜索上面的内容。在您现有的模式中,您可以执行相同的操作:index_patterns-*
或 index_patterns-yyyy-*
,等等
关于分片大小的一些信息:https://www.elastic.co/blog/how-many-shards-should-i-have-in-my-elasticsearch-cluster