如何使用 logstash 从搜索中删除 elasticsearch 中的所有文档

How to delete all documents in elasticsearch with logstash from a search

我正在使用 logstash 将数据传递给 elasticsearch,我想知道如何删除所有文档。

我这样做是为了删除那些带有id的,但是我现在需要的是删除所有匹配固定值的文档,例如Fixedfield = "Base1"不管获取的id是否jdbc 输入是否存在。

我的想法是删除 elasticsearch fixedField = "Base1" 存在的所有文档并插入我从 jdbc 输入中获得的新文档,这样我就避免留下不再存在的文档在我的来源中(jdbc 输入)。 一个更完整的例子

我的document_id组成:001、002、003等

我的固定字段由"Base1"三个组成document_id

有什么想法吗?

input {
  jdbc {
  jdbc_driver_library => ""
  jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
  jdbc_connection_string => "jdbc:sqlserver://xxxxx;databaseName=xxxx;"
  statement => "Select * from public.test"
  }
}

filter {
if [is_deleted] {
        mutate {    
            add_field => {
                "[@metadata][elasticsearch_action]" => "delete"
            }
        }
        mutate {
            remove_field => [ "is_deleted","@version","@timestamp" ]
        }
    } else {
        mutate {    
            add_field => {
                "[@metadata][elasticsearch_action]" => "index"
            }
        }
        mutate {
            remove_field => [ "is_deleted","@version","@timestamp" ]
        }
    } 
}

output {
  elasticsearch {
    hosts => "xxxxx"
    user => "xxxxx"
    password => "xxxxx"
    index => "xxxxx"
    document_type => "_doc"
    document_id => "%{id}"
  }
  stdout { codec => rubydebug }
}

我终于设法消除了,但是.....我现在遇到的问题显然是当输入开始时,它会计算它获得的记录数,当它继续输出时,它会在第一个中消除轮并在以下n-1轮显示错误信息:

[HTTP Output Failure] Encountered non-2xx HTTP code 409 {:response_code=>409, :url=>"http://localhost:9200/my_index/_delete_by_query",

另一个,我认为可能发生的是,_delete_by_query不是批量批量删除,而是查询/删除,这将导致查询返回 n 个结果,因此尝试删除 n 次.

关于如何迭代它一次或如何避免该错误的任何想法? 我澄清错误不仅显示一次,而且要删除的文档数显示n-1次

input {
  jdbc {
  jdbc_driver_library => ""
  jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
  jdbc_connection_string => "jdbc:sqlserver://xxxxx;databaseName=xxxx;"
  statement => "Select * from public.test"
  }
}



output {
   stdout { codec => json_lines }

  elasticsearch {
        hosts => "localhost:9200"
        index => "%{[@metadata][miEntidad]}"
        document_type => "%{[@metadata][miDocumento]}"
        document_id => "%{id}"
  }


 http {
      url => "http://localhost:9200/my_index/_delete_by_query"
      http_method => "post"
      format => "message"
      content_type => "application/json; charset=UTF-8"
      message => '{"query": { "term": { "properties.codigo.keyword": "TEX_FOR_SEARCH_AND_DELETE"  } }}'
  }
 }

最后变成这样:

output {  

    http {
                url => "http://localhost:9200/%{[@metadata][miEntidad]}/_delete_by_query?conflicts=proceed"
                http_method => "post"
                format => "message"
                content_type => "application/json; charset=UTF-8"
                message => '{"query": { "term": { "properties.code.keyword": "%{[properties][code]}"  } }}'

    }

    jdbc {
                connection_string => 'xxxxxxxx'       
                statement => ["UPDATE test SET estate = 'A' WHERE entidad = ? ","%{[@metadata][miEntidad]}"]   
    } 
}