How to delete all documents in elasticsearch with logstash from a search
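I am using Logstash to send data to Elasticsearch, and I would like to know how to delete all documents.
I already do this to delete documents by id, but what I need now is to delete every document that matches a fixed value, for example fixedField = "Base1", regardless of whether its id is present in the rows returned by the jdbc input.
My idea is to delete all documents in Elasticsearch where fixedField = "Base1" and then insert the new documents I get from the jdbc input. That way I avoid leaving behind documents that no longer exist in my source (the jdbc input).
A more complete example:
My document_id values are: 001, 002, 003, etc.
My fixed field is "Base1" for all three document_ids.
Any ideas?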
input {
  jdbc {
    jdbc_driver_library => ""
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://xxxxx;databaseName=xxxx;"
    statement => "Select * from public.test"
  }
}
filter {
  # Choose the Elasticsearch action per row: delete flagged rows, index the rest.
  if [is_deleted] {
    mutate {
      add_field => {
        "[@metadata][elasticsearch_action]" => "delete"
      }
    }
    mutate {
      remove_field => [ "is_deleted","@version","@timestamp" ]
    }
  } else {
    mutate {
      add_field => {
        "[@metadata][elasticsearch_action]" => "index"
      }
    }
    mutate {
      remove_field => [ "is_deleted","@version","@timestamp" ]
    }
  }
}
output {
  elasticsearch {
    hosts => "xxxxx"
    user => "xxxxx"
    password => "xxxxx"
    index => "xxxxx"
    document_type => "_doc"
    document_id => "%{id}"
    # Apply the action chosen in the filter; without this line every event is indexed.
    action => "%{[@metadata][elasticsearch_action]}"
  }
  stdout { codec => rubydebug }
}
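The goal stated above (not leaving documents in the index that no longer exist in the source) can be pictured as a set difference between the ids already indexed and the ids the jdbc input returns. The following is only a rough Python sketch of that idea, outside Logstash; the function name `stale_ids` and the sample ids are hypothetical and not part of the pipeline.

```python
def stale_ids(indexed_ids, source_ids):
    """Return ids that are in the index but no longer in the source, sorted."""
    return sorted(set(indexed_ids) - set(source_ids))


# Hypothetical data: three documents indexed for "Base1", two rows left in the source.
indexed = ["001", "002", "003"]
source = ["001", "003"]
print(stale_ids(indexed, source))  # ['002']
```

Deleting everything with fixedField = "Base1" and reinserting the current rows reaches the same end state without having to compute this difference.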
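I finally managed to delete the documents, but the problem I have now is that, apparently, when the input starts it counts the number of records it gets, and when it moves on to the output it deletes in the first round and then shows this error message in the following n-1 rounds:
  [HTTP Output Failure] Encountered non-2xx HTTP code 409
  {:response_code=>409,
  :url=>"http://localhost:9200/my_index/_delete_by_query",
Another thing I think may be happening is that _delete_by_query is not a bulk delete but a query followed by a delete, so the query returns n results and it therefore tries to delete n times.
Any ideas on how to run it only once, or how to avoid this error?
To clarify: the error is not shown just once. It is shown n-1 times, where n is the number of documents to delete.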
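A likely reading of the symptom is that an output runs once per event, so n rows produce n identical _delete_by_query calls against the same documents, and the overlapping calls hit version conflicts (HTTP 409). One way to picture "run it once" is to send a single request per distinct value instead of one per row. This is a minimal Python sketch of that deduplication only, outside Logstash; the helper `distinct_values` and the sample rows are hypothetical.

```python
def distinct_values(rows, field):
    """Return the distinct values of `field` across rows, in first-seen order."""
    return list(dict.fromkeys(row[field] for row in rows))


# Hypothetical rows as the jdbc input might return them.
rows = [
    {"id": "001", "code": "Base1"},
    {"id": "002", "code": "Base1"},
    {"id": "003", "code": "Base1"},
]

# Three rows, but only one delete request would be needed.
for code in distinct_values(rows, "code"):
    print("one _delete_by_query for", code)
```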
input {
  jdbc {
    jdbc_driver_library => ""
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://xxxxx;databaseName=xxxx;"
    statement => "Select * from public.test"
  }
}
output {
  stdout { codec => json_lines }
  elasticsearch {
    hosts => "localhost:9200"
    index => "%{[@metadata][miEntidad]}"
    document_type => "%{[@metadata][miDocumento]}"
    document_id => "%{id}"
  }
  http {
    url => "http://localhost:9200/my_index/_delete_by_query"
    http_method => "post"
    format => "message"
    content_type => "application/json; charset=UTF-8"
    message => '{"query": { "term": { "properties.codigo.keyword": "TEX_FOR_SEARCH_AND_DELETE" } }}'
  }
}
In the end it became this:
output {
  http {
    url => "http://localhost:9200/%{[@metadata][miEntidad]}/_delete_by_query?conflicts=proceed"
    http_method => "post"
    format => "message"
    content_type => "application/json; charset=UTF-8"
    message => '{"query": { "term": { "properties.code.keyword": "%{[properties][code]}" } }}'
  }
  jdbc {
    connection_string => 'xxxxxxxx'
    statement => ["UPDATE test SET estate = 'A' WHERE entidad = ? ","%{[@metadata][miEntidad]}"]
  }
}
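For anyone who wants to check the request outside Logstash, here is a small Python sketch of what the http output above sends for one event: a POST to _delete_by_query with conflicts=proceed and a term query in the body. It only builds the URL and body and does not send anything. The function name `build_delete_by_query` and the example values "my_index" and "Base1" are assumptions for illustration.

```python
import json
from urllib.parse import quote


def build_delete_by_query(base_url, index, field, value):
    """Build the URL and JSON body for a term-based _delete_by_query call."""
    # conflicts=proceed asks Elasticsearch to skip version conflicts instead of
    # aborting the request with a 409.
    url = f"{base_url}/{quote(index, safe='')}/_delete_by_query?conflicts=proceed"
    body = json.dumps({"query": {"term": {field: value}}})
    return url, body


# Hypothetical values standing in for %{[@metadata][miEntidad]} and %{[properties][code]}.
url, body = build_delete_by_query(
    "http://localhost:9200", "my_index", "properties.code.keyword", "Base1"
)
print(url)
print(body)
```

Building the body with json.dumps also escapes quotes in the value, which the plain string interpolation in the message option does not do.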