Elasticsearch/Logstash 使用计划时重复输出
Elasticsearch/Logstash duplicating output when use schedule
我正在使用 Elasticsearch 和 Logstash。
我想在数据库更改时更新索引。所以我决定使用 LS schedule。但是每 1 分钟输出附加数据库 table 条记录。
示例:合同 table 有 2 行。
前1分钟总产量:2,后1分钟总产量为:4;
我该如何解决?
这是我的配置文件。命令是 bin/logstash -f contract.conf
input {
jdbc {
jdbc_connection_string => "jdbc:postgresql://localhost:5432/resource"
jdbc_user => "postgres"
jdbc_validate_connection => true
jdbc_driver_library => "/var/www/html/iltodgeree/logstash/postgres.jar"
jdbc_driver_class => "org.postgresql.Driver"
statement => "SELECT * FROM contracts;"
schedule => "* * * * *"
codec => "json"
}
}
output {
elasticsearch {
index => "resource_contracts"
document_type => "metadata"
hosts => "localhost:9200"
}
}
您需要通过指定 document_id
设置和使用合同 table 中的 ID 字段来修改输出。这样,您将永远不会重复。
output {
elasticsearch {
index => "resource_contracts"
document_type => "metadata"
document_id => "%{ID_FIELD}"
hosts => "localhost:9200"
}
}
此外,如果您的 contracts
table 中有更新时间戳,您可以修改输入中的 SQL 语句,如下所示,以便仅复制最近更改的记录:
statement => "SELECT * FROM contracts WHERE timestamp > :sql_last_value;"
我正在使用 Elasticsearch 和 Logstash。 我想在数据库更改时更新索引。所以我决定使用 LS schedule。但是每 1 分钟输出附加数据库 table 条记录。 示例:合同 table 有 2 行。 前1分钟总产量:2,后1分钟总产量为:4; 我该如何解决?
这是我的配置文件。命令是 bin/logstash -f contract.conf
input {
jdbc {
jdbc_connection_string => "jdbc:postgresql://localhost:5432/resource"
jdbc_user => "postgres"
jdbc_validate_connection => true
jdbc_driver_library => "/var/www/html/iltodgeree/logstash/postgres.jar"
jdbc_driver_class => "org.postgresql.Driver"
statement => "SELECT * FROM contracts;"
schedule => "* * * * *"
codec => "json"
}
}
output {
elasticsearch {
index => "resource_contracts"
document_type => "metadata"
hosts => "localhost:9200"
}
}
您需要通过指定 document_id
设置和使用合同 table 中的 ID 字段来修改输出。这样,您将永远不会重复。
output {
elasticsearch {
index => "resource_contracts"
document_type => "metadata"
document_id => "%{ID_FIELD}"
hosts => "localhost:9200"
}
}
此外,如果您的 contracts
table 中有更新时间戳,您可以修改输入中的 SQL 语句,如下所示,以便仅复制最近更改的记录:
statement => "SELECT * FROM contracts WHERE timestamp > :sql_last_value;"