ElasticSearch NEST:批量索引操作不使用指定的文档 ID
ElasticSearch NEST: Bulk-indexing operation does not make use of specified document IDs
我目前使用 ElasticSearch NEST 7.x 库。
在托管我的 ElasticSearch 主节点的虚拟机上,我正在运行一个通过 REST 接收 JSON 数据的 Web 服务器。这些 JSON 数据随后将保存到 ElasticSearch 中。
首先将接收到的JSON数据传入该方法进行解析:
/// <summary>
/// Parses a URL-encoded payload holding one JSON object per line into
/// (index name, document) pairs.
/// </summary>
/// <param name="rawJsonData">URL-encoded text; blank or whitespace-only lines are skipped.</param>
/// <param name="jsonLines">
/// On success, every parsed document paired with its lower-cased <c>IndexId</c>;
/// on failure, an empty sequence.
/// </param>
/// <returns>
/// <c>(true, null)</c> when every line was accepted; otherwise <c>false</c> together with a
/// message describing the first rejected line.
/// </returns>
private static (bool Success, string ErrorMessage) TryReadRawJsonData(
    string rawJsonData, out IEnumerable<(string Index, ExpandoObject JsonContent)> jsonLines)
{
    var results = new List<(string Index, ExpandoObject JsonContent)>();
    foreach (string rawDataLine in HttpContext.Current.Server.UrlDecode(rawJsonData).Split('\n').Where(line => !string.IsNullOrWhiteSpace(line)))
    {
        dynamic expandoObject = JsonConvert.DeserializeObject<ExpandoObject>(rawDataLine);
        if (!Dynamic.HasProperty(expandoObject, "IndexId"))
        {
            jsonLines = Enumerable.Empty<(string, ExpandoObject)>();
            return (Success: false, ErrorMessage: $"No field named 'IndexId' found in {rawDataLine}.");
        }

        // A null or non-string IndexId (e.g. a JSON number) would otherwise throw at run time
        // when a string method is invoked on it; report it as a parse failure instead.
        object rawIndexId = expandoObject.IndexId;
        if (!(rawIndexId is string indexName))
        {
            jsonLines = Enumerable.Empty<(string, ExpandoObject)>();
            return (Success: false, ErrorMessage: $"Field 'IndexId' is not a string in {rawDataLine}.");
        }

        // Lower-case with the invariant culture: the culture-sensitive ToLower() maps 'I' to a
        // dotless 'ı' under e.g. tr-TR, which would yield a different index name per server locale.
        string indexId = indexName.ToLowerInvariant();
        results.Add((indexId, JsonContent: expandoObject));
    }

    jsonLines = results;
    return (Success: true, ErrorMessage: null);
}
如果解析成功,返回值随后会传递到以下方法中进行批量索引:
/// <summary>
/// Groups the documents by target index and issues one bulk index request per index.
/// </summary>
/// <param name="contents">Documents paired with the name of the index they belong to.</param>
/// <returns>
/// A 400 response listing index, document id and error reason (one line per failed item) for the
/// first bulk request that reports errors; otherwise a 200 response.
/// </returns>
/// <remarks>
/// NOTE: IndexMany is called without a per-document id selector, so an "Id" key stored in the
/// ExpandoObject is not used as the document _id.
/// </remarks>
private static async Task<HttpResponseMessage> BulkIndexAsync(IEnumerable<(string Index, ExpandoObject JsonContent)> contents)
{
    foreach (var indexGroup in contents.GroupBy(entry => entry.Index))
    {
        IEnumerable<ExpandoObject> documents = indexGroup.Select(entry => entry.JsonContent);
        BulkResponse response = await ElasticClient.BulkAsync(
            descriptor => descriptor.Index(indexGroup.Key).IndexMany(documents));

        if (!response.Errors)
        {
            continue;
        }

        // Stop at the first failing group and report each failed item on its own line.
        IEnumerable<string> failureLines = response.ItemsWithErrors.Select(failed =>
            $"Index: {failed.Index}; Document Id: {failed.Id}; Error: {failed.Error.Reason}.");

        return new HttpResponseMessage(HttpStatusCode.BadRequest)
        {
            Content = new StringContent(failureLines.ConcatenateIntoString(separator: "\n"))
        };
    }

    return new HttpResponseMessage(HttpStatusCode.OK);
}
批量索引操作成功,但遗憾的是文档 ID 与我预期的不同。这是一个例子:
{
"_index": "dummyindex",
"_type": "_doc",
"_id": "U1W4Z20BcmiMRnw-blTi",
"_score": 1.0,
"_source": {
"IndexId": "dummyindex",
"Id": "0c2d48bd-6842-4f15-b7f2-57fa259b0642",
"UserId": "dummy_user_1",
"Country": "dummy_stan"
}
}
如您所见,Id 字段为 0c2d48bd-6842-4f15-b7f2-57fa259b0642,根据文档,它应被自动推断为文档 ID。但是,_id 字段被设置为 U1W4Z20BcmiMRnw-blTi,而不是 0c2d48bd-6842-4f15-b7f2-57fa259b0642。
我做错了什么?
答案取自此处:
ExpandoObject 上的 Id 并不是该类型的属性,而是支撑 ExpandoObject 的底层 IDictionary<string,object> 中的一个键。
您可以通过反射 ExpandoObject 的属性来验证这一点:
// Deserialize a sample document into an ExpandoObject, then print every instance property
// (public and non-public) that reflection reports for its runtime type.
const string sampleJson = @"{
""IndexId"": ""dummyindex"",
""Id"": ""0c2d48bd-6842-4f15-b7f2-57fa259b0642"",
""UserId"": ""dummy_user_1"",
""Country"": ""dummy_stan""
}
";
ExpandoObject document = JsonConvert.DeserializeObject<ExpandoObject>(sampleJson);
const BindingFlags instanceProperties = BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance;
foreach (PropertyInfo property in document.GetType().GetProperties(instanceProperties))
{
    // Console.WriteLine(object) prints the value's ToString() representation.
    Console.WriteLine(property);
}
打印
System.Dynamic.ExpandoClass Class
System.Collections.Generic.ICollection`1[System.String] System.Collections.Generic.IDictionary<System.String,System.Object>.Keys
System.Collections.Generic.ICollection`1[System.Object] System.Collections.Generic.IDictionary<System.String,System.Object>.Values
System.Object System.Collections.Generic.IDictionary<System.String,System.Object>.Item [System.String]
Int32 System.Collections.Generic.ICollection<System.Collections.Generic.KeyValuePair<System.String,System.Object>>.Count
Boolean System.Collections.Generic.ICollection<System.Collections.Generic.KeyValuePair<System.String,System.Object>>.IsReadOnly
要解决您的问题,您可以将第二个委托参数传递给 .IndexMany(),从而为每个文档指定 Id:
// Deserialize the sample document as a dynamic ExpandoObject.
const string sampleJson = @"{
""IndexId"": ""dummyindex"",
""Id"": ""0c2d48bd-6842-4f15-b7f2-57fa259b0642"",
""UserId"": ""dummy_user_1"",
""Country"": ""dummy_stan""
}
";
dynamic expandoObject = JsonConvert.DeserializeObject<ExpandoObject>(sampleJson);
var documents = new[] { expandoObject };

// The second delegate sets each bulk operation's id explicitly from the document's "Id" key.
var bulkResponse = client.Bulk(bu => bu.IndexMany(documents, (b, d) => b.Id((Id)d.Id)));
必须将 d.Id 强制转换为 Id(也可以转换为 string,因为那才是它的实际类型;不过转换为 Id 时会使用从 string 到 Id 的隐式转换),因为 d 是动态类型,没有这一转换,运行时将无法分派调用。
我目前使用 ElasticSearch NEST 7.x 库。
在托管我的 ElasticSearch 主节点的虚拟机上,我正在运行一个通过 REST 接收 JSON 数据的 Web 服务器。这些 JSON 数据随后将保存到 ElasticSearch 中。
首先将接收到的JSON数据传入该方法进行解析:
/// <summary>
/// Parses a URL-encoded payload holding one JSON object per line into
/// (index name, document) pairs.
/// </summary>
/// <param name="rawJsonData">URL-encoded text; blank or whitespace-only lines are skipped.</param>
/// <param name="jsonLines">
/// On success, every parsed document paired with its lower-cased <c>IndexId</c>;
/// on failure, an empty sequence.
/// </param>
/// <returns>
/// <c>(true, null)</c> when every line was accepted; otherwise <c>false</c> together with a
/// message describing the first rejected line.
/// </returns>
private static (bool Success, string ErrorMessage) TryReadRawJsonData(
    string rawJsonData, out IEnumerable<(string Index, ExpandoObject JsonContent)> jsonLines)
{
    var results = new List<(string Index, ExpandoObject JsonContent)>();
    foreach (string rawDataLine in HttpContext.Current.Server.UrlDecode(rawJsonData).Split('\n').Where(line => !string.IsNullOrWhiteSpace(line)))
    {
        dynamic expandoObject = JsonConvert.DeserializeObject<ExpandoObject>(rawDataLine);
        if (!Dynamic.HasProperty(expandoObject, "IndexId"))
        {
            jsonLines = Enumerable.Empty<(string, ExpandoObject)>();
            return (Success: false, ErrorMessage: $"No field named 'IndexId' found in {rawDataLine}.");
        }

        // A null or non-string IndexId (e.g. a JSON number) would otherwise throw at run time
        // when a string method is invoked on it; report it as a parse failure instead.
        object rawIndexId = expandoObject.IndexId;
        if (!(rawIndexId is string indexName))
        {
            jsonLines = Enumerable.Empty<(string, ExpandoObject)>();
            return (Success: false, ErrorMessage: $"Field 'IndexId' is not a string in {rawDataLine}.");
        }

        // Lower-case with the invariant culture: the culture-sensitive ToLower() maps 'I' to a
        // dotless 'ı' under e.g. tr-TR, which would yield a different index name per server locale.
        string indexId = indexName.ToLowerInvariant();
        results.Add((indexId, JsonContent: expandoObject));
    }

    jsonLines = results;
    return (Success: true, ErrorMessage: null);
}
如果解析成功,返回值随后会传递到以下方法中进行批量索引:
/// <summary>
/// Groups the documents by target index and issues one bulk index request per index.
/// </summary>
/// <param name="contents">Documents paired with the name of the index they belong to.</param>
/// <returns>
/// A 400 response listing index, document id and error reason (one line per failed item) for the
/// first bulk request that reports errors; otherwise a 200 response.
/// </returns>
/// <remarks>
/// NOTE: IndexMany is called without a per-document id selector, so an "Id" key stored in the
/// ExpandoObject is not used as the document _id.
/// </remarks>
private static async Task<HttpResponseMessage> BulkIndexAsync(IEnumerable<(string Index, ExpandoObject JsonContent)> contents)
{
    foreach (var indexGroup in contents.GroupBy(entry => entry.Index))
    {
        IEnumerable<ExpandoObject> documents = indexGroup.Select(entry => entry.JsonContent);
        BulkResponse response = await ElasticClient.BulkAsync(
            descriptor => descriptor.Index(indexGroup.Key).IndexMany(documents));

        if (!response.Errors)
        {
            continue;
        }

        // Stop at the first failing group and report each failed item on its own line.
        IEnumerable<string> failureLines = response.ItemsWithErrors.Select(failed =>
            $"Index: {failed.Index}; Document Id: {failed.Id}; Error: {failed.Error.Reason}.");

        return new HttpResponseMessage(HttpStatusCode.BadRequest)
        {
            Content = new StringContent(failureLines.ConcatenateIntoString(separator: "\n"))
        };
    }

    return new HttpResponseMessage(HttpStatusCode.OK);
}
批量索引操作成功,但遗憾的是文档 ID 与我预期的不同。这是一个例子:
{
"_index": "dummyindex",
"_type": "_doc",
"_id": "U1W4Z20BcmiMRnw-blTi",
"_score": 1.0,
"_source": {
"IndexId": "dummyindex",
"Id": "0c2d48bd-6842-4f15-b7f2-57fa259b0642",
"UserId": "dummy_user_1",
"Country": "dummy_stan"
}
}
如您所见,Id 字段为 0c2d48bd-6842-4f15-b7f2-57fa259b0642,根据文档,它应被自动推断为文档 ID。但是,_id 字段被设置为 U1W4Z20BcmiMRnw-blTi,而不是 0c2d48bd-6842-4f15-b7f2-57fa259b0642。
我做错了什么?
答案取自此处:
ExpandoObject 上的 Id 并不是该类型的属性,而是支撑 ExpandoObject 的底层 IDictionary<string,object> 中的一个键。
您可以通过反射 ExpandoObject 的属性来验证这一点:
// Deserialize a sample document into an ExpandoObject, then print every instance property
// (public and non-public) that reflection reports for its runtime type.
const string sampleJson = @"{
""IndexId"": ""dummyindex"",
""Id"": ""0c2d48bd-6842-4f15-b7f2-57fa259b0642"",
""UserId"": ""dummy_user_1"",
""Country"": ""dummy_stan""
}
";
ExpandoObject document = JsonConvert.DeserializeObject<ExpandoObject>(sampleJson);
const BindingFlags instanceProperties = BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance;
foreach (PropertyInfo property in document.GetType().GetProperties(instanceProperties))
{
    // Console.WriteLine(object) prints the value's ToString() representation.
    Console.WriteLine(property);
}
打印
System.Dynamic.ExpandoClass Class
System.Collections.Generic.ICollection`1[System.String] System.Collections.Generic.IDictionary<System.String,System.Object>.Keys
System.Collections.Generic.ICollection`1[System.Object] System.Collections.Generic.IDictionary<System.String,System.Object>.Values
System.Object System.Collections.Generic.IDictionary<System.String,System.Object>.Item [System.String]
Int32 System.Collections.Generic.ICollection<System.Collections.Generic.KeyValuePair<System.String,System.Object>>.Count
Boolean System.Collections.Generic.ICollection<System.Collections.Generic.KeyValuePair<System.String,System.Object>>.IsReadOnly
要解决您的问题,您可以将第二个委托参数传递给 .IndexMany(),从而为每个文档指定 Id:
// Deserialize the sample document as a dynamic ExpandoObject.
const string sampleJson = @"{
""IndexId"": ""dummyindex"",
""Id"": ""0c2d48bd-6842-4f15-b7f2-57fa259b0642"",
""UserId"": ""dummy_user_1"",
""Country"": ""dummy_stan""
}
";
dynamic expandoObject = JsonConvert.DeserializeObject<ExpandoObject>(sampleJson);
var documents = new[] { expandoObject };

// The second delegate sets each bulk operation's id explicitly from the document's "Id" key.
var bulkResponse = client.Bulk(bu => bu.IndexMany(documents, (b, d) => b.Id((Id)d.Id)));
必须将 d.Id 强制转换为 Id(也可以转换为 string,因为那才是它的实际类型;不过转换为 Id 时会使用从 string 到 Id 的隐式转换),因为 d 是动态类型,没有这一转换,运行时将无法分派调用。