Create nested fields from XPath and check for existing documents

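I have two problems:

  1. Parsing XML data and adding it to an array on the record in the index.

  2. Checking the index for an existing record and, if one exists, appending the new data to that record's array.

I have a jdbc input with an XML column: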

input {
  jdbc {
    ....
    statement => "SELECT event_xml....
  }
}

An xml filter then parses the data. How do I make the last three xpath values into an array? Do I need a mutate or ruby filter? I can't seem to figure it out.

filter {  
  xml {       
    source => "event_xml"              
    remove_namespaces => true 
    store_xml => false
    force_array => false
    xpath => [ "/CaseNumber/text()", "case_number" ]
    xpath => [ "/FormName/text()", "[conversations][form_name]" ]
    xpath => [ "/EventDate/text()", "[conversations][event_date]" ]
    xpath => [ "/CaseNote/text()", "[conversations][case_note]" ]
  }
}

In Elasticsearch the document should then look like this:

{
    "case_number" : "12345",
    "conversations" :
        [
            {
                "form_name" : "form1",
                "event_date" : "2019-01-09T00:00:00Z",
                "case_note" : "this is a case note"
            }
        ]                
}

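The second problem: if a document with the unique case_number "12345" already exists, then instead of creating a new record, the new XML values should be appended to the conversations array, so that it looks like this: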

{
    "case_number" : "12345",
    "conversations" : [
        {
            "form_name" : "form1",
            "event_date" : "2019-01-09T00:00:00Z",
            "case_note" : "this is a case note"
        },
        {
            "form_name" : "form2",
            "event_date" : "2019-05-09T00:00:00Z",
            "case_note" : "this is another case note"
        }
    ]                
}

My output section:

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "cases"
    manage_template => false
  }
}

Is this possible? Thanks.

This ruby filter created the array:

ruby {
    code => '
        event.set("conversations", [Hash[
          "publish_event_id", event.get("publish_event_id"),
          "form_name", event.get("form_name"),
          "event_date", event.get("event_date"),
          "case_note", event.get("case_note")
        ]])
      '
  }
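
To see what that filter produces without running Logstash, here is a minimal standalone Ruby sketch of the same wrapping step. It assumes the four fields were extracted as flat top-level fields first; a plain Hash stands in for the Logstash event, and the helper name wrap_conversation is hypothetical, not part of any plugin.

```ruby
# Keys that belong to a single conversation entry (taken from the filter above).
CONVERSATION_KEYS = %w[publish_event_id form_name event_date case_note].freeze

# Hypothetical helper: a plain Hash stands in for the Logstash event.
# It collects the flat fields into one Hash and wraps that Hash in an array,
# which is what event.set("conversations", [Hash[...]]) does in the filter.
def wrap_conversation(fields)
  conversation = CONVERSATION_KEYS.map { |key| [key, fields[key]] }.to_h
  fields.merge("conversations" => [conversation])
end

flat = {
  "case_number"      => "12345",
  "publish_event_id" => "1",
  "form_name"        => "form1",
  "event_date"       => "2019-01-09T00:00:00Z",
  "case_note"        => "this is a case note"
}

nested = wrap_conversation(flat)
puts nested["conversations"].inspect
```

As in the filter, the flat copies of the fields stay on the event next to the new array; if they are not wanted in the index they would have to be removed separately, for example with event.remove in the same ruby block.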

As for the output, it was solved with:
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "cases"  
    document_id => "%{case_number}"
    action => "update"
    doc_as_upsert => true
    script => "     
                boolean recordExists = false;                                                        
                for (int i = 0; i < ctx._source.conversations.length; i++) 
                {                  
                    if(ctx._source.conversations[i].publish_event_id == params.event.get('conversations')[0].publish_event_id)
                    {
                        recordExists = true;
                    }                  
                }     
                if(!recordExists){
                    ctx._source.conversations.add(params.event.get('conversations')[0]); 
                }
              "
    manage_template => false
  }
}
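
The effect of that output can be sketched in plain Ruby. This is only a simulation under stated assumptions: a Hash keyed by case_number stands in for the cases index, the helper name upsert_case is hypothetical, and the "insert the event as-is when no document exists" branch reflects my understanding of how doc_as_upsert combines with script in the elasticsearch output.

```ruby
# In-memory stand-in for the "cases" index, keyed by document_id (case_number).
# Hypothetical helper that mimics the update/upsert behaviour of the output above.
def upsert_case(index, event)
  id = event["case_number"]
  incoming = event["conversations"][0]
  existing = index[id]

  if existing.nil?
    # No document with this id yet: the event itself becomes the document.
    index[id] = event
    return index
  end

  # Same check as the script: skip the append when a conversation with this
  # publish_event_id is already stored.
  already_there = existing["conversations"].any? do |conversation|
    conversation["publish_event_id"] == incoming["publish_event_id"]
  end
  existing["conversations"] << incoming unless already_there
  index
end

index = {}
first = {
  "case_number"   => "12345",
  "conversations" => [{ "publish_event_id" => "1", "form_name" => "form1" }]
}
second = {
  "case_number"   => "12345",
  "conversations" => [{ "publish_event_id" => "2", "form_name" => "form2" }]
}

upsert_case(index, first)
upsert_case(index, second)
upsert_case(index, second) # replaying the same event does not add a duplicate
puts index["12345"]["conversations"].length
```

Keying on publish_event_id is what makes re-running the jdbc query safe: the same row can be processed again without growing the array.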