寻找一个卡夫卡连接器将数据更新到弹性搜索

Looking for a kafka connector to upsert datas to elasticsearch

有没有可以处理这种请求的kafka连接器?

我以这种格式接收 kafka 主题中的数据(JSON 中的行数是随机的):

{
"1574922337":[{"price": 1, "product": 2], 
"1574922338":[{"price": 13, "product": 5}], 
"1574922339":[{"price": 0.2, "product": 1}]
}

我想从 kafka 连接器解析从主题接收到的这个 JSON 事件消息,以便使用 id=TIMESTAMP 一次创建 3 个文档,并且使用 UPSERT (如果 id 已经存在,我们只更新它,如果不存在,将添加它)。

请问您有解决方法或参考资料吗?

或者任何可以使用 MAVEN 轻松编译的开源连接器,都会对其进行一些更改以适应这种要求。

等待您的帮助。

好的,开始吧。正如我所建议的,通过以下方式利用 Logstash 应该可以很好地工作:

主要的 Logstash 配置如下:

input {
  kafka {
    bootstrap_servers => "..."
    topic_id => "message_topic"
    auto_offset_reset => "smallest"
    reset_beginning => true
  }
}
filter {
  mutate {
    remove_field => ["@timestamp", "@version", "host"]
  }
  ruby {
    path => "/path/to/split.rb"
  }
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}
output {
  elasticsearch {
    hosts => "https://..."
    http_compression => true
    index => "product_index"
    document_id => "%{id}"
    document_type => "type_name"
    action => "update"
    doc_as_upsert => true
  }
}

split.rb 中的 ruby 代码非常简单。所做的是遍历每个时间戳,并为时间戳指向的数组的每个元素创建一个新事件,时间戳作为 id 字段。

def register(params)
end

def filter(event)
  events = []
  event.to_hash.each do |timestamp,array|
    array.each do |sub| 
      subEvent = LogStash::Event.new(sub)
      subEvent.set('id', timestamp)
      events << subEvent
    end
  end
  return events
end

基本上,它为您上面给出的示例消息生成的内容如下:

{"id":"1574922337","product":2,"price":1}
{"id":"1574922339","product":1,"price":0.2}
{"id":"1574922338","product":5,"price":13}