append array of json logstash elasticsearch
How can I append an array of JSON objects to a document in Elasticsearch using Logstash, starting from a CSV?
CSV example
A CSV containing the rows:
id,key1,key2
1,toto1,toto2
1,titi1,titi2
2,tata1,tata2
The result should be 2 documents:
{
"id": 1,
"keys": [{
"key1": "toto1",
"key2": "toto2"
}, {
"key1": "titi1",
"key2": "titi2"
}]
}
,{
"id": 2,
"keys": [{
"key1": "tata1",
"key2": "tata2"
}]
}
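In other words, the rows are grouped by id. The desired grouping can be sketched in Python (illustration only; the actual loading is done with Logstash, as shown in the answer):

```python
import csv
import io
import json

# The sample CSV from the question, header row included.
CSV_DATA = """id,key1,key2
1,toto1,toto2
1,titi1,titi2
2,tata1,tata2
"""

def group_rows(raw):
    """Group CSV rows by id into {'id': ..., 'keys': [...]} documents."""
    docs = {}
    for row in csv.DictReader(io.StringIO(raw)):
        doc = docs.setdefault(row["id"], {"id": row["id"], "keys": []})
        doc["keys"].append({"key1": row["key1"], "key2": row["key2"]})
    return list(docs.values())

for doc in group_rows(CSV_DATA):
    print(json.dumps(doc))
```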
Kind regards
First, create your ES mapping, declaring the inner objects as nested objects if necessary.
{
"mappings": {
"key_container": {
"properties": {
"id": {
"type": "keyword",
"index": true
},
"keys": {
"type": "nested",
"properties": {
"key1": {
"type": "keyword",
"index": true
},
"key2": {
"type": "text",
"index": true
}
}
}
}
}
}
}
The keys property will hold the array of nested objects.
You can load the CSV with Logstash in two hops:
- index (create) the base document containing only the id property
- update the base document with the keys property containing the array of nested objects
First Logstash configuration (only the relevant parts):
filter {
csv {
columns => ["id","key1","key2"]
separator => ","
# Remove the keys because they will be loaded in the next hop with an update
remove_field => [ "key1", "key2"]
}
# Remove the row containing the column names
if [id] == "id" {
drop { }
}
}
output {
elasticsearch {
action => "index"
document_id => "%{id}"
hosts => [ "localhost:9200" ]
index => "key_container"
}
}
Second Logstash configuration (scripting must be enabled in Elasticsearch):
filter {
csv {
columns => ["id","key1","key2"]
separator => ","
}
# Convert the attributes into an object called 'key' that is passed to the script below (via the 'event' object)
mutate{
rename => {
"key1" => "[key][key1]"
"key2" => "[key][key2]"
}
}
}
output {
elasticsearch {
action => "update"
document_id => "%{id}"
doc_as_upsert => "true"
hosts => [ "localhost:9200" ]
index => "key_container"
script_lang => "groovy"
# key_container.keys is an array of key objects
# arrays can be built only with scripts and defined as an array when we put the first element into it
script => "if (ctx._source.containsKey('keys')) {ctx._source.keys += event.key} else {ctx._source.keys = [event.key]}"
}
}
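Note that Groovy scripting was removed in newer Elasticsearch versions (5.0 deprecated it in favor of Painless). On such versions, a rough Painless equivalent would look like the following (an untested sketch; with the Logstash elasticsearch output, the event is exposed to Painless scripts via params, here assumed under params.event):

script_lang => "painless"
script => "if (ctx._source.containsKey('keys')) { ctx._source.keys.add(params.event.key) } else { ctx._source.keys = [params.event.key] }"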
All in all, you need this two-hop loading because building the array requires a script, and scripts are available only on update.
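To verify the result, you can search the nested objects with a nested query (assuming the mapping above; index name as in the configs):

GET key_container/_search
{
  "query": {
    "nested": {
      "path": "keys",
      "query": { "term": { "keys.key1": "toto1" } }
    }
  }
}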