logstash: jdbc_page_size 没有将我所有的数据转储到 elasticsearch
logstach: jdbc_page_size doesn't dump all my data to elastic search
我想将 tom_test2 postgresql table 导出到弹性搜索。 table 有 176805 行:
=> select count(*) from tom_test2;
count
--------
176805
(1 row)
以下 logstach conf 文件将我的数据正确导入弹性搜索:
input {
jdbc {
# Postgres jdbc connection string to our database, mydb
jdbc_connection_string => "xxx"
# The user we wish to execute our statement as
jdbc_user => "xxx"
jdbc_password => "xxx"
# The path to our downloaded jdbc driver
jdbc_driver_library => "xxx"
# The name of the driver class for Postgresql
jdbc_driver_class => "org.postgresql.Driver"
# our query
statement => "select * from tom_test2"
}
}
output {
elasticsearch {
hosts => ["xxx"]
index => "tom"
document_type => "tom_test"
}
}
在弹性搜索中:
GET tom/tom_test/_search
"hits": {
"total": 176805,
"max_score": 1,
}
我正在删除弹性搜索中的索引:
delete tom
我现在想使用 jdbc_page_size 执行相同的操作以防我的数据变大,我的 logstach conf 文件现在是:
input {
jdbc {
# Postgres jdbc connection string to our database, mydb
jdbc_connection_string => "xxx"
# The user we wish to execute our statement as
jdbc_user => "xxx"
jdbc_password => "xxx"
# The path to our downloaded jdbc driver
jdbc_driver_library => "xxx"
# The name of the driver class for Postgresql
jdbc_driver_class => "org.postgresql.Driver"
# our query
statement => "select * from tom_test2"
jdbc_page_size => 1000
jdbc_paging_enabled => true
}
}
output {
elasticsearch {
hosts => ["xxx"]
index => "tom"
document_type => "tom_test"
}
}
我现在数错了:
GET tom/tom_test/_search
"hits": {
"total": 106174,
"max_score": 1,
}
因为 176805-106174=70631 行丢失
您面临这种情况的原因 - 您有排序问题:您的查询不控制接收数据的顺序,并且通常 postgresql 不应该保证在无序的后续分页调用中您不获取相同的数据:这会产生一些数据根本不会被获取的情况,而一些数据将被多次获取 :( 即使在这些调用期间数据没有被修改,后台 vacuum worker 也可能会改变数据的顺序物理文件,从而重现描述的情况。
向您的声明添加顺序 SELECT * FROM tom_test2 ORDER BY id
并分页您的数据。但请注意:在这种情况下,您上传到 elasticsearch 将无法确保 table 的准确副本。其原因是,在后续分页请求的 logstash 处理过程中,引入了即将到来的页面中的数据更新,即您正在上传页面 1 到 10000,更新发生在页面 10001 和 20000 上的数据,否则稍后更新...所以你的数据一致性有问题。
或者如果您想获取所有数据并在 logstash 上大量使用内存...,那么您需要控制 jdbc_fetch_size
参数:即您正在执行相同的 SELECT * FROM tom_test2
。使用这种方法,您将创建一个查询结果集,但将 "pump" 分段,并且在 "pumping" 期间修改数据不会导致您:您将在查询开始时获取状态。
因为 jdbc_page_size
中的查询之间的排序无法保证,如 documentation of jdbc_paging_enabled
中的警告。
我建议对大型结果集使用 jdbc_fetch_size
instead of using jdbc_page_size
as the documentation also says that。
P.S:有时 ;) 在 http://discuss.elastic.co 上提问最好由弹性维护者
回答
我想将 tom_test2 postgresql table 导出到弹性搜索。 table 有 176805 行:
=> select count(*) from tom_test2;
count
--------
176805
(1 row)
以下 logstach conf 文件将我的数据正确导入弹性搜索:
input {
jdbc {
# Postgres jdbc connection string to our database, mydb
jdbc_connection_string => "xxx"
# The user we wish to execute our statement as
jdbc_user => "xxx"
jdbc_password => "xxx"
# The path to our downloaded jdbc driver
jdbc_driver_library => "xxx"
# The name of the driver class for Postgresql
jdbc_driver_class => "org.postgresql.Driver"
# our query
statement => "select * from tom_test2"
}
}
output {
elasticsearch {
hosts => ["xxx"]
index => "tom"
document_type => "tom_test"
}
}
在弹性搜索中:
GET tom/tom_test/_search
"hits": {
"total": 176805,
"max_score": 1,
}
我正在删除弹性搜索中的索引:
delete tom
我现在想使用 jdbc_page_size 执行相同的操作以防我的数据变大,我的 logstach conf 文件现在是:
input {
jdbc {
# Postgres jdbc connection string to our database, mydb
jdbc_connection_string => "xxx"
# The user we wish to execute our statement as
jdbc_user => "xxx"
jdbc_password => "xxx"
# The path to our downloaded jdbc driver
jdbc_driver_library => "xxx"
# The name of the driver class for Postgresql
jdbc_driver_class => "org.postgresql.Driver"
# our query
statement => "select * from tom_test2"
jdbc_page_size => 1000
jdbc_paging_enabled => true
}
}
output {
elasticsearch {
hosts => ["xxx"]
index => "tom"
document_type => "tom_test"
}
}
我现在数错了:
GET tom/tom_test/_search
"hits": {
"total": 106174,
"max_score": 1,
}
因为 176805-106174=70631 行丢失
您面临这种情况的原因 - 您有排序问题:您的查询不控制接收数据的顺序,并且通常 postgresql 不应该保证在无序的后续分页调用中您不获取相同的数据:这会产生一些数据根本不会被获取的情况,而一些数据将被多次获取 :( 即使在这些调用期间数据没有被修改,后台 vacuum worker 也可能会改变数据的顺序物理文件,从而重现描述的情况。
向您的声明添加顺序 SELECT * FROM tom_test2 ORDER BY id
并分页您的数据。但请注意:在这种情况下,您上传到 elasticsearch 将无法确保 table 的准确副本。其原因是,在后续分页请求的 logstash 处理过程中,引入了即将到来的页面中的数据更新,即您正在上传页面 1 到 10000,更新发生在页面 10001 和 20000 上的数据,否则稍后更新...所以你的数据一致性有问题。
或者如果您想获取所有数据并在 logstash 上大量使用内存...,那么您需要控制 jdbc_fetch_size
参数:即您正在执行相同的 SELECT * FROM tom_test2
。使用这种方法,您将创建一个查询结果集,但将 "pump" 分段,并且在 "pumping" 期间修改数据不会导致您:您将在查询开始时获取状态。
因为 jdbc_page_size
中的查询之间的排序无法保证,如 documentation of jdbc_paging_enabled
中的警告。
我建议对大型结果集使用 jdbc_fetch_size
instead of using jdbc_page_size
as the documentation also says that。
P.S:有时 ;) 在 http://discuss.elastic.co 上提问最好由弹性维护者
回答