附加数组 json logstash elasticsearch

append array of json logstash elasticsearch

我如何使用来自 csv

的 logstash 在带有 json 对象的 elasticsearch 上附加一个数组

csv 示例

包含行的 csv

id,key1,key2
1,toto1,toto2
1,titi1,titi2
2,tata1,tata2

结果应该是 2 个文档

{
    "id": 1,
    [{
        "key1": "toto1",
        "key2": "toto2"
    }, {
        "key1": "titi1 ",
        "key2": "titi2"
    }]
}
,{
    "id": 2,
    [{
        "key1": "tata1",
        "key2": "tata2"
    }]
}

亲切

首先,创建您的 ES 映射,如有必要,将您的内部对象声明为嵌套对象。

{
 "mappings": {
    "key_container": {
      "properties": {
        "id": {
          "type": "keyword",
          "index": true
        },
        "keys": {
          "type": "nested",
          "properties": {
            "key1": {
              "type": "keyword",
              "index": true
            },
            "key2": {
              "type": "text",
              "index": true
            }
          }
        }
      }
    }
  }
 }

键 属性 将包含嵌套对象数组。

您可以使用 logstash 在两步中加载 csv:

  1. 索引(创建)仅包含 id 的基础对象 属性
  2. 使用包含嵌套对象数组的键属性更新基础对象

第一次logstash配置(仅相关部分):

filter {
    csv {
        columns => ["id","key1","key1"]
        separator => ","
        # Remove the keys because the will be loaded in the next hop with update
        remove_field => [ "key1", "key2"]
    }
    # Remove the row containing the column names
    if [id] == "id" {
        drop { }
    }
}
output {
    elasticsearch {
        action => "index"
        document_id => "%{id}"
        hosts => [ "localhost:9200" ]
        index => "key_container"
    }
}

第二步logstash配置(必须在elasticsearch中启用脚本):

filter {
    csv {
        columns => ["id","key1","key2"]
        separator => ","
    }
    # Convert the attributes into an object called 'key' that is passed to the script below (via the 'event' object)
    mutate{
        rename => {
            "key1" => "[key][key1]"
            "key2" => "[key][key2]"
        }
    }
}
output {
    elasticsearch {
        action => "update"
        document_id => "%{id}"
        doc_as_upsert => "true"
        hosts => [ "localhost:9200" ]
        index => "key_container"
        script_lang => "groovy"
        # key_container.keys is an array of key objects
        # arrays can be built only with scripts and defined as an array when we put the first element into it
        script => "if (ctx._source.containsKey('keys')) {ctx._source.keys += event.key} else {ctx._source.keys = [event.key]}"
    }
}

总而言之,您需要这两个跃点加载,因为数组创建需要仅在更新时可用的脚本。