logstash: jdbc_page_size 没有将我所有的数据转储到 elasticsearch

logstach: jdbc_page_size doesn't dump all my data to elastic search

我想将 tom_test2 postgresql table 导出到弹性搜索。 table 有 176805 行:

=> select count(*) from tom_test2;
 count  
--------
 176805
(1 row)

以下 logstach conf 文件将我的数据正确导入弹性搜索:

input {
    jdbc {
        # Postgres jdbc connection string to our database, mydb
        jdbc_connection_string => "xxx"
        # The user we wish to execute our statement as
        jdbc_user => "xxx"
        jdbc_password => "xxx"
        # The path to our downloaded jdbc driver
        jdbc_driver_library => "xxx"
        # The name of the driver class for Postgresql
        jdbc_driver_class => "org.postgresql.Driver"
        # our query
        statement => "select * from tom_test2"
    }
}


output {
    elasticsearch {
        hosts => ["xxx"]
        index => "tom"
        document_type => "tom_test"
    }
}

在弹性搜索中:

GET tom/tom_test/_search

  "hits": {
    "total": 176805,
    "max_score": 1,
}

我正在删除弹性搜索中的索引:

delete tom

我现在想使用 jdbc_page_size 执行相同的操作以防我的数据变大,我的 logstach conf 文件现在是:

input {
    jdbc {
        # Postgres jdbc connection string to our database, mydb
        jdbc_connection_string => "xxx"
        # The user we wish to execute our statement as
        jdbc_user => "xxx"
        jdbc_password => "xxx"
        # The path to our downloaded jdbc driver
        jdbc_driver_library => "xxx"
        # The name of the driver class for Postgresql
        jdbc_driver_class => "org.postgresql.Driver"
        # our query
        statement => "select * from tom_test2"

        jdbc_page_size => 1000
        jdbc_paging_enabled => true
    }
}


output {
    elasticsearch {
        hosts => ["xxx"]
        index => "tom"
        document_type => "tom_test"
    }
}

我现在数错了:

GET tom/tom_test/_search

  "hits": {
    "total": 106174,
    "max_score": 1,
}

因为 176805-106174=70631 行丢失

您面临这种情况的原因 - 您有排序问题:您的查询不控制接收数据的顺序,并且通常 postgresql 不应该保证在无序的后续分页调用中您不获取相同的数据:这会产生一些数据根本不会被获取的情况,而一些数据将被多次获取 :( 即使在这些调用期间数据没有被修改,后台 vacuum worker 也可能会改变数据的顺序物理文件,从而重现描述的情况。

向您的声明添加顺序 SELECT * FROM tom_test2 ORDER BY id 并分页您的数据。但请注意:在这种情况下,您上传​​到 elasticsearch 将无法确保 table 的准确副本。其原因是,在后续分页请求的 logstash 处理过程中,引入了即将到来的页面中的数据更新,即您正在上传页面 1 到 10000,更新发生在页面 10001 和 20000 上的数据,否则稍后更新...所以你的数据一致性有问题。

或者如果您想获取所有数据并在 logstash 上大量使用内存...,那么您需要控制 jdbc_fetch_size 参数:即您正在执行相同的 SELECT * FROM tom_test2。使用这种方法,您将创建一个查询结果集,但将 "pump" 分段,并且在 "pumping" 期间修改数据不会导致您:您将在查询开始时获取状态。

因为 jdbc_page_size 中的查询之间的排序无法保证,如 documentation of jdbc_paging_enabled 中的警告。

我建议对大型结果集使用 jdbc_fetch_size instead of using jdbc_page_size as the documentation also says that

P.S:有时 ;) 在 http://discuss.elastic.co 上提问最好由弹性维护者

回答