Incremental indexing using Logstash: handling the delete scenario
I'm using the Logstash configuration below for incremental indexing. Whenever a new row is inserted or updated, I can fetch those specific rows from the MSSQL server and index them as documents into Elasticsearch, but the challenge is the delete operation.

Logstash configuration file:
input {
  jdbc {
    jdbc_driver_library => ""
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://xxxxx;databaseName=xxxx;"
    jdbc_user => "xxxx"
    jdbc_password => "xxxx"
    jdbc_paging_enabled => true
    # Only fetch rows changed since the last run, tracked via :sql_last_value
    tracking_column => "modified_date"
    tracking_column_type => "timestamp"
    use_column_value => true
    clean_run => true
    schedule => "*/1 * * * *"
    statement => "SELECT * FROM [dbo].[xxxx] WHERE modified_date > :sql_last_value"
  }
}
filter {
  mutate {
    remove_field => ["@version", "@timestamp"]
  }
}
output {
  elasticsearch {
    hosts => "xxxxx"
    user => "xxxxx"
    password => "xxxxx"
    index => "xxxxx"
    document_type => "_doc"
    document_id => "%{id}"
  }
  stdout { codec => rubydebug }
}
How can I use Logstash, with this incremental indexing approach, to also remove documents that were deleted in the MSSQL server? I don't know how to handle the delete operation in particular.

Can anyone suggest how to achieve this?
Hi all, I was able to handle insert, update, and delete operations with the configuration below; it may help anyone trying to do the same. The idea is to soft-delete rows in SQL Server instead of removing them outright: a deleted row gets an is_deleted flag and an updated modified_date, so the JDBC input still picks it up, and the filter then routes flagged rows to the Elasticsearch delete action via a @metadata field.
input {
  jdbc {
    jdbc_driver_library => ""
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://xxxxx:1433;databaseName=xxxxx;"
    jdbc_user => "xxxxx"
    jdbc_password => "xxxxx"
    jdbc_paging_enabled => true
    tracking_column => "modified_date"
    tracking_column_type => "timestamp"
    use_column_value => true
    clean_run => true
    schedule => "*/1 * * * *"
    statement => "SELECT * FROM [dbo].[xxxx] WHERE modified_date > :sql_last_value"
  }
}
filter {
  # Route soft-deleted rows to the delete action, everything else to index
  if [is_deleted] {
    mutate {
      add_field => { "[@metadata][elasticsearch_action]" => "delete" }
    }
  } else {
    mutate {
      add_field => { "[@metadata][elasticsearch_action]" => "index" }
    }
  }
  # The flag and the Logstash bookkeeping fields should not end up in the document
  mutate {
    remove_field => ["is_deleted", "@version", "@timestamp"]
  }
}
output {
  elasticsearch {
    hosts => "xxxxx"
    user => "elastic"
    password => "xxxxx"
    index => "xxxxx"
    # Use the action decided in the filter block ("index" or "delete")
    action => "%{[@metadata][elasticsearch_action]}"
    document_type => "_doc"
    document_id => "%{id}"
  }
  stdout { codec => rubydebug }
}
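For this to work, the source table needs the is_deleted flag, and deletes have to be turned into updates that also touch modified_date; otherwise the modified_date > :sql_last_value query never sees them. A minimal SQL Server sketch of that contract, assuming a hypothetical [dbo].[documents] table with an id key:

-- One-time schema change: add the soft-delete flag (hypothetical names)
ALTER TABLE [dbo].[documents] ADD is_deleted BIT NOT NULL DEFAULT 0;

-- Application "delete": flag the row and bump modified_date so the
-- incremental query picks it up on the next scheduled run
UPDATE [dbo].[documents]
SET is_deleted = 1,
    modified_date = SYSUTCDATETIME()
WHERE id = @id; -- @id is the application-supplied key

-- Optional cleanup: physically purge rows once Logstash has had time
-- to propagate the delete to Elasticsearch
DELETE FROM [dbo].[documents]
WHERE is_deleted = 1
  AND modified_date < DATEADD(DAY, -7, SYSUTCDATETIME());

One more caveat: clean_run => true resets :sql_last_value on every pipeline start, so everything is re-read after a restart. For truly incremental behavior across restarts you would typically set clean_run => false and point last_run_metadata_path at a persistent file.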
Thanks to everyone, especially Claudio M.