Logstash 是否支持 Elasticsearch 的 _update_by_query?

Does Logstash support Elasticsearch's _update_by_query?

Elasticsearch 输出插件是否支持elasticsearch 的_update_by_query? https://www.elastic.co/guide/en/logstash/6.5/plugins-outputs-elasticsearch.html https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html

elasticsearch 输出插件只能调用 _bulk 端点,即使用 Bulk API

如果你想通过查询调用更新API,你需要使用http输出插件并自己在事件中构建查询。如果你解释你想要实现的目标,我可以用更多细节更新我的答案。

注意:有一个 issue 请求此功能,但它在两年后仍然开放。

更新

因此,如果您的输入事件是 {"cname":"wang", "cage":11},并且您希望通过查询所有具有 "cname":"wang" 的文档来更新以设置 "cage":11,您的查询需要如下所示:

POST your-index/_update_by_query
{
  "script": {
    "source": "ctx._source.cage = params.cage",
    "lang": "painless",
    "params": {
      "cage": 11
    }
  },
  "query": {
    "term": {
      "cname": "wang"
    }
  }
}

因此您的 Logstash 配置应如下所示(您的输入可能会有所不同,但我使用 stdin 进行测试):

input {
  stdin {
    codec => "json"
  }
}
filter {
  mutate {
    add_field => {
      "[script][lang]" => "painless"
      "[script][source]" => "ctx._source.cage = params.cage"
      "[script][params][cage]" => "%{cage}"
      "[query][term][cname]" => "%{cname}"
    }
    remove_field => ["host", "@version", "@timestamp", "cname", "cage"]
  }
}
output {
  http {
    url => "http://localhost:9200/index/doc/_update_by_query"
    http_method => "post"
    format => "json"
  }
}

使用标准的 elasticsearch 插件可以获得相同的结果:

input {
    elasticsearch {
        hosts => "${ES_HOSTS}"
        user => "${ES_USER}"
        password => "${ES_PWD}"
        index => "<your index pattern>"
        size => 500
        scroll => "5m"
        docinfo => true
    }
}

filter {
    ...
}

output {
    elasticsearch {
        hosts => "${ES_HOSTS}"
        user => "${ES_USER}"
        password => "${ES_PWD}"
        action => "update"
        document_id => "%{[@metadata][_id]}"
        index => "%{[@metadata][_index]}"
    }
}