使用 Logstash 批量创建和批量更新 ElasticSearch 索引文档

Bulk Creating & Bulk Updating ElasticSearch Index Document using Logstash

文档未更新,而是新建而不是更新文档。

COMPANY_ID 是一个唯一的列..

样本数据

COMPANY_NAME,LOGO_EXT,COMPANY_ID

ABC有限公司,JPG,ABC000001

XYZ 有限公司,PNG,ABC000002

AAA LLC,ABC000003

我可以创建索引和文档。

问题是当我更新索引时,文档正在创建而不是被更新。 例如。 之前 ABC有限公司,JPG,ABC000001

之后 ABCD有限公司,JPG,ABC000001

因此只有 COMPANY_NAME 应该更新。

1.使用以下代码成功创建索引:-

BAT 文件

cd C:\logstash-7.3.1\bin logstash -f C:\logstash.conf

C:\logstash.conf 文件

input {  
    jdbc {  
        jdbc_driver_library => "C:\sqljdbc_7.4\enu\mssql-jdbc-7.4.1.jre8.jar"  
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"  
        jdbc_connection_string => "jdbc:sqlserver://;user=;password=;"  
        jdbc_user => ""  
        jdbc_password => ""  
        statement => "SELECT COMPANY_NAME,LOGO_EXT,COMPANY_ID from dbo.CompanyMaster WITH(NOLOCK) ORDER BY COMPANY_NAME"  
    }  
}  
filter {}  
output {  
    stdout {  
        codec => json_lines  
    }  
    elasticsearch {  
        hosts => "http://localhost:9200"  
        index => "companylistindex"  
        document_id => "%{COMPANY_ID}"
        action => index
    }  
}  

2。更新代码

input {  
    jdbc {  
        jdbc_driver_library => "C:\sqljdbc_7.4\enu\mssql-jdbc-7.4.1.jre8.jar"  
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"  
        jdbc_connection_string => "jdbc:sqlserver://;user=;password=;"  
        jdbc_user => ""  
        jdbc_password => ""  
        statement => "SELECT COMPANY_NAME,LOGO_EXT,COMPANY_ID from dbo.CompanyMaster WITH(NOLOCK) WHERE ModifiedOn>'2019-11-01'"  
    }  
}  
filter {}  
output {  
    stdout {  
        codec => json_lines  
    }  
    elasticsearch {  
        hosts => "http://localhost:9200"  
        index => "companylistindex"  
        document_id => "%{COMPANY_ID}"
    }  
}

请帮我更新文档

注意:如果 COMPANY_NAME 或 LOGO_EXT 不同,则需要更新。 COMPANY_ID 是唯一列。

除了@Polynomial Proton 建议的评论之外,您不再需要 2 输出部分。像下面这样的 1 部分就可以了:

output {  
    stdout {  
        codec => json_lines  
    }  
    elasticsearch {  
        hosts => "http://localhost:9200"  
        index => "companylistindex"  
        document_id => "%{COMPANY_ID}"
        action => "update"
        doc_as_upsert => "true"
    }  
}

这将负责索引和更新。