Does Logstash support Elasticsearch's _update_by_query?
Does the Elasticsearch output plugin support Elasticsearch's _update_by_query?
https://www.elastic.co/guide/en/logstash/6.5/plugins-outputs-elasticsearch.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html
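The elasticsearch output plugin can only call the _bulk endpoint, i.e. it uses the Bulk API.
If you want to call the Update By Query API, you need to use the http output plugin and build the query yourself inside the event. If you explain what you are trying to achieve, I can update my answer with more details.
Note: there is an issue requesting this feature, but it is still open after two years.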
UPDATE
So if your input event is {"cname":"wang", "cage":11} and you want to update, by query, all documents that have "cname":"wang" so that they get "cage":11, your query needs to look like this:
POST your-index/_update_by_query
{
  "script": {
    "source": "ctx._source.cage = params.cage",
    "lang": "painless",
    "params": {
      "cage": 11
    }
  },
  "query": {
    "term": {
      "cname": "wang"
    }
  }
}
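For illustration, the same request body can be built from an event in a few lines of Python. This is only a sketch: the helper names `build_update_by_query` and `send_update_by_query`, and the base URL you would pass in, are assumptions made for this example and are not part of Logstash or Elasticsearch.

```python
import json
import urllib.request


def build_update_by_query(event, match_field, update_field):
    """Build an _update_by_query body that sets update_field on every
    document whose match_field equals the event's value."""
    return {
        "script": {
            "lang": "painless",
            # The new value travels in params instead of being inlined in the script.
            "source": f"ctx._source.{update_field} = params.{update_field}",
            "params": {update_field: event[update_field]},
        },
        "query": {"term": {match_field: event[match_field]}},
    }


def send_update_by_query(base_url, index, body):
    """POST the body to <base_url>/<index>/_update_by_query (hypothetical helper)."""
    request = urllib.request.Request(
        f"{base_url}/{index}/_update_by_query",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


body = build_update_by_query({"cname": "wang", "cage": 11}, "cname", "cage")
print(json.dumps(body, indent=2))
```

Only `build_update_by_query` runs here; `send_update_by_query` needs a reachable cluster, for example `send_update_by_query("http://localhost:9200", "your-index", body)`.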
So your Logstash configuration should look like this (your input may differ, but I used stdin for testing):
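input {
  stdin {
    codec => "json"
  }
}
filter {
  mutate {
    # Build the _update_by_query request body inside the event itself.
    add_field => {
      "[script][lang]" => "painless"
      "[script][source]" => "ctx._source.cage = params.cage"
      # Note: %{...} interpolation produces a string, so cage is sent as "11".
      "[script][params][cage]" => "%{cage}"
      "[query][term][cname]" => "%{cname}"
    }
    # Drop everything that must not end up in the request body.
    remove_field => ["host", "@version", "@timestamp", "cname", "cage"]
  }
}
output {
  http {
    # Replace "index/doc" with your own index (and mapping type).
    url => "http://localhost:9200/index/doc/_update_by_query"
    http_method => "post"
    format => "json"
  }
}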
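To see what the http output ends up posting, the mutate filter above can be mimicked in plain Python. This is a rough sketch, not how Logstash is implemented: `set_field` and `to_request_body` are hypothetical helpers, and the sample event (including its `host` and `@version` values) is made up.

```python
import json
import re


def set_field(event, reference, value):
    """Set a value at a Logstash-style field reference such as "[a][b]"."""
    keys = re.findall(r"\[([^\]]+)\]", reference)
    target = event
    for key in keys[:-1]:
        target = target.setdefault(key, {})
    target[keys[-1]] = value


def to_request_body(event):
    """Mimic the mutate filter: add the nested fields, then drop the originals."""
    body = dict(event)
    set_field(body, "[script][lang]", "painless")
    set_field(body, "[script][source]", "ctx._source.cage = params.cage")
    # "%{cage}" is string interpolation, so the value becomes the string "11".
    set_field(body, "[script][params][cage]", str(event["cage"]))
    set_field(body, "[query][term][cname]", str(event["cname"]))
    for name in ("host", "@version", "@timestamp", "cname", "cage"):
        body.pop(name, None)
    return body


event = {"cname": "wang", "cage": 11, "host": "localhost", "@version": "1"}
request_body = to_request_body(event)
print(json.dumps(request_body, indent=2))
```

The printed body has the same shape as the query shown earlier, except that `cage` arrives as the string "11". If the target field must stay numeric, a second mutate block with a `convert` on `[script][params][cage]` is one way to handle it.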
The same result can be achieved with the standard elasticsearch input and output plugins:
input {
  elasticsearch {
    hosts => "${ES_HOSTS}"
    user => "${ES_USER}"
    password => "${ES_PWD}"
    index => "<your index pattern>"
    size => 500
    scroll => "5m"
    docinfo => true
  }
}
filter {
  ...
}
output {
  elasticsearch {
    hosts => "${ES_HOSTS}"
    user => "${ES_USER}"
    password => "${ES_PWD}"
    action => "update"
    document_id => "%{[@metadata][_id]}"
    index => "%{[@metadata][_index]}"
  }
}
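Conceptually, this pipeline reads the matching documents back and sends each one as a partial update through the Bulk API. The Python sketch below shows roughly what such a bulk payload looks like. The helper `bulk_update_lines` and the sample hits are assumptions for illustration, and on Elasticsearch 6.x the action line would also carry a `_type`.

```python
import json


def bulk_update_lines(hits, partial_doc):
    """Return a Bulk API (NDJSON) payload with one partial update per hit."""
    lines = []
    for hit in hits:
        # Action line: which document to update.
        lines.append(json.dumps({"update": {"_index": hit["_index"], "_id": hit["_id"]}}))
        # Source line: the fields to merge into the existing document.
        lines.append(json.dumps({"doc": partial_doc}))
    # The Bulk API requires the payload to end with a newline.
    return "\n".join(lines) + "\n"


# Made-up sample hits standing in for what the elasticsearch input reads.
hits = [{"_index": "people", "_id": "1"}, {"_index": "people", "_id": "2"}]
payload = bulk_update_lines(hits, {"cage": 11})
print(payload, end="")
```

Unlike _update_by_query, this approach moves every matching document through Logstash, so whatever the filter block changes is what gets written back.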