Elastic Search 摄取附件插件块
Elastic Search ingest attachment plugin blocks
我正在使用 NEST (C#) 和 ingest attachment plugin 将成千上万的文档提取到 Elastic 搜索实例中。不幸的是,过了一会儿一切都停滞不前——即没有更多的文件被摄取。日志显示:
[2019-02-20T17:35:07,528][INFO ][o.e.m.j.JvmGcMonitorService] [BwAAiDl] [gc][7412] overhead, spent [326ms] collecting in the last [1s]
不确定这是否告诉了任何人任何信息?顺便说一句,是否有更有效的方法来摄取许多文档(而不是使用数千个 REST 请求)?
我正在使用这种代码:
client.Index(new Document
{
Id = Guid.NewGuid(),
Path = somePath,
Content = Convert.ToBase64String(File.ReadAllBytes(somePath))
}, i => i.Pipeline("attachments"));
定义管道:
client.PutPipeline("attachments", p => p
.Description("Document attachment pipeline")
.Processors(pr => pr
.Attachment<Document>(a => a
.Field(f => f.Content)
.TargetField(f => f.Attachment)
)
.Remove<Document>(r => r
.Field(f => f.Content)
)
)
);
日志表明在Elasticsearch 服务器端执行垃圾收集花费了相当多的时间;这很可能是您看到的大量停止事件的原因。如果您在集群上启用了监控(理想情况下将此类数据导出到单独的集群),我会分析这些数据,看看它是否能阐明为什么会发生大型 GC。
are there more efficient ways to ingest many documents (rather than using thousands of REST requests)?
是的,您在单独的索引请求中为每个附件编制索引。根据每个附件的大小,base64 编码,您可能希望一次发送多个请求
// Your collection of documents
var documents = new[]
{
new Document
{
Id = Guid.NewGuid(),
Path = "path",
Content = "content"
},
new Document
{
Id = Guid.NewGuid(),
Path = "path",
Content = "content" // base64 encoded bytes
}
};
var client = new ElasticClient();
var bulkResponse = client.Bulk(b => b
.Pipeline("attachments")
.IndexMany(documents)
);
如果您正在从文件系统读取文档,您可能想要懒惰地枚举它们并发送批量请求。在这里,您也可以使用 BulkAll
辅助方法。
首先有一些懒惰枚举的文档集合
public static IEnumerable<Document> GetDocuments()
{
var count = 0;
while (count++ < 20)
{
yield return new Document
{
Id = Guid.NewGuid(),
Path = "path",
Content = "content" // base64 encoded bytes
};
}
}
然后配置BulkAll
调用
var client = new ElasticClient();
// set up the observable configuration
var bulkAllObservable = client.BulkAll(GetDocuments(), ba => ba
.Pipeline("attachments")
.Size(10)
);
var waitHandle = new ManualResetEvent(false);
Exception exception = null;
// set up what to do in response to next bulk call, exception and completion
var bulkAllObserver = new BulkAllObserver(
onNext: response =>
{
// perform some action e.g. incrementing counter
// to indicate how many have been indexed
},
onError: e =>
{
exception = e;
waitHandle.Set();
},
onCompleted: () =>
{
waitHandle.Set();
});
// start the observable process
bulkAllObservable.Subscribe(bulkAllObserver);
// wait for indexing to finish, either forever,
// or set a max timeout as here.
waitHandle.WaitOne(TimeSpan.FromHours(1));
if (exception != null)
throw exception;
大小决定了每个请求中要发送多少文档。对于集群的大小没有硬性规定,因为它可能取决于许多因素,包括摄取管道、文档映射、文档的字节大小、集群硬件等。您可以配置observable 重试未能被索引的文档,如果您看到 ,则说明您已达到集群可以并发处理的极限。
另一个建议是文档 ID。我看到您正在为文档的 ID 使用新的 Guid,这对我来说意味着您不关心每个文档的值是什么。如果是这种情况,我建议不要发送 Id 值,而是让 Elasticsearch 为每个文档生成一个 id。这很可能导致 improvement in performance (我相信 Elasticsearch 和 Lucene 中的实现在这个 post 之后发生了轻微的变化,但重点仍然存在) .
我正在使用 NEST (C#) 和 ingest attachment plugin 将成千上万的文档提取到 Elastic 搜索实例中。不幸的是,过了一会儿一切都停滞不前——即没有更多的文件被摄取。日志显示:
[2019-02-20T17:35:07,528][INFO ][o.e.m.j.JvmGcMonitorService] [BwAAiDl] [gc][7412] overhead, spent [326ms] collecting in the last [1s]
不确定这是否告诉了任何人任何信息?顺便说一句,是否有更有效的方法来摄取许多文档(而不是使用数千个 REST 请求)?
我正在使用这种代码:
client.Index(new Document
{
Id = Guid.NewGuid(),
Path = somePath,
Content = Convert.ToBase64String(File.ReadAllBytes(somePath))
}, i => i.Pipeline("attachments"));
定义管道:
client.PutPipeline("attachments", p => p
.Description("Document attachment pipeline")
.Processors(pr => pr
.Attachment<Document>(a => a
.Field(f => f.Content)
.TargetField(f => f.Attachment)
)
.Remove<Document>(r => r
.Field(f => f.Content)
)
)
);
日志表明在Elasticsearch 服务器端执行垃圾收集花费了相当多的时间;这很可能是您看到的大量停止事件的原因。如果您在集群上启用了监控(理想情况下将此类数据导出到单独的集群),我会分析这些数据,看看它是否能阐明为什么会发生大型 GC。
are there more efficient ways to ingest many documents (rather than using thousands of REST requests)?
是的,您在单独的索引请求中为每个附件编制索引。根据每个附件的大小,base64 编码,您可能希望一次发送多个请求
// Your collection of documents
var documents = new[]
{
new Document
{
Id = Guid.NewGuid(),
Path = "path",
Content = "content"
},
new Document
{
Id = Guid.NewGuid(),
Path = "path",
Content = "content" // base64 encoded bytes
}
};
var client = new ElasticClient();
var bulkResponse = client.Bulk(b => b
.Pipeline("attachments")
.IndexMany(documents)
);
如果您正在从文件系统读取文档,您可能想要懒惰地枚举它们并发送批量请求。在这里,您也可以使用 BulkAll
辅助方法。
首先有一些懒惰枚举的文档集合
public static IEnumerable<Document> GetDocuments()
{
var count = 0;
while (count++ < 20)
{
yield return new Document
{
Id = Guid.NewGuid(),
Path = "path",
Content = "content" // base64 encoded bytes
};
}
}
然后配置BulkAll
调用
var client = new ElasticClient();
// set up the observable configuration
var bulkAllObservable = client.BulkAll(GetDocuments(), ba => ba
.Pipeline("attachments")
.Size(10)
);
var waitHandle = new ManualResetEvent(false);
Exception exception = null;
// set up what to do in response to next bulk call, exception and completion
var bulkAllObserver = new BulkAllObserver(
onNext: response =>
{
// perform some action e.g. incrementing counter
// to indicate how many have been indexed
},
onError: e =>
{
exception = e;
waitHandle.Set();
},
onCompleted: () =>
{
waitHandle.Set();
});
// start the observable process
bulkAllObservable.Subscribe(bulkAllObserver);
// wait for indexing to finish, either forever,
// or set a max timeout as here.
waitHandle.WaitOne(TimeSpan.FromHours(1));
if (exception != null)
throw exception;
大小决定了每个请求中要发送多少文档。对于集群的大小没有硬性规定,因为它可能取决于许多因素,包括摄取管道、文档映射、文档的字节大小、集群硬件等。您可以配置observable 重试未能被索引的文档,如果您看到
另一个建议是文档 ID。我看到您正在为文档的 ID 使用新的 Guid,这对我来说意味着您不关心每个文档的值是什么。如果是这种情况,我建议不要发送 Id 值,而是让 Elasticsearch 为每个文档生成一个 id。这很可能导致 improvement in performance (我相信 Elasticsearch 和 Lucene 中的实现在这个 post 之后发生了轻微的变化,但重点仍然存在) .