寻找一个卡夫卡连接器将数据更新到弹性搜索
Looking for a kafka connector to upsert datas to elasticsearch
有没有可以处理这种请求的kafka连接器?
我以这种格式接收 kafka 主题中的数据(JSON 中的行数是随机的):
{
"1574922337":[{"price": 1, "product": 2],
"1574922338":[{"price": 13, "product": 5}],
"1574922339":[{"price": 0.2, "product": 1}]
}
我想从 kafka 连接器解析从主题接收到的这个 JSON 事件消息,以便使用 id=TIMESTAMP
一次创建 3 个文档,并且使用 UPSERT
(如果 id
已经存在,我们只更新它,如果不存在,将添加它)。
请问您有解决方法或参考资料吗?
或者任何可以使用 MAVEN 轻松编译的开源连接器,都会对其进行一些更改以适应这种要求。
等待您的帮助。
好的,开始吧。正如我所建议的,通过以下方式利用 Logstash 应该可以很好地工作:
- a
kafka
input 阅读消息
- a
ruby
filter 将消息分割成多个事件
elasticsearch
output 执行更新插入
主要的 Logstash 配置如下:
input {
kafka {
bootstrap_servers => "..."
topic_id => "message_topic"
auto_offset_reset => "smallest"
reset_beginning => true
}
}
filter {
mutate {
remove_field => ["@timestamp", "@version", "host"]
}
ruby {
path => "/path/to/split.rb"
}
mutate {
remove_field => ["@timestamp", "@version"]
}
}
output {
elasticsearch {
hosts => "https://..."
http_compression => true
index => "product_index"
document_id => "%{id}"
document_type => "type_name"
action => "update"
doc_as_upsert => true
}
}
split.rb
中的 ruby 代码非常简单。所做的是遍历每个时间戳,并为时间戳指向的数组的每个元素创建一个新事件,时间戳作为 id
字段。
def register(params)
end
def filter(event)
events = []
event.to_hash.each do |timestamp,array|
array.each do |sub|
subEvent = LogStash::Event.new(sub)
subEvent.set('id', timestamp)
events << subEvent
end
end
return events
end
基本上,它为您上面给出的示例消息生成的内容如下:
{"id":"1574922337","product":2,"price":1}
{"id":"1574922339","product":1,"price":0.2}
{"id":"1574922338","product":5,"price":13}
有没有可以处理这种请求的kafka连接器?
我以这种格式接收 kafka 主题中的数据(JSON 中的行数是随机的):
{
"1574922337":[{"price": 1, "product": 2],
"1574922338":[{"price": 13, "product": 5}],
"1574922339":[{"price": 0.2, "product": 1}]
}
我想从 kafka 连接器解析从主题接收到的这个 JSON 事件消息,以便使用 id=TIMESTAMP
一次创建 3 个文档,并且使用 UPSERT
(如果 id
已经存在,我们只更新它,如果不存在,将添加它)。
请问您有解决方法或参考资料吗?
或者任何可以使用 MAVEN 轻松编译的开源连接器,都会对其进行一些更改以适应这种要求。
等待您的帮助。
好的,开始吧。正如我所建议的,通过以下方式利用 Logstash 应该可以很好地工作:
- a
kafka
input 阅读消息 - a
ruby
filter 将消息分割成多个事件 elasticsearch
output 执行更新插入
主要的 Logstash 配置如下:
input {
kafka {
bootstrap_servers => "..."
topic_id => "message_topic"
auto_offset_reset => "smallest"
reset_beginning => true
}
}
filter {
mutate {
remove_field => ["@timestamp", "@version", "host"]
}
ruby {
path => "/path/to/split.rb"
}
mutate {
remove_field => ["@timestamp", "@version"]
}
}
output {
elasticsearch {
hosts => "https://..."
http_compression => true
index => "product_index"
document_id => "%{id}"
document_type => "type_name"
action => "update"
doc_as_upsert => true
}
}
split.rb
中的 ruby 代码非常简单。所做的是遍历每个时间戳,并为时间戳指向的数组的每个元素创建一个新事件,时间戳作为 id
字段。
def register(params)
end
def filter(event)
events = []
event.to_hash.each do |timestamp,array|
array.each do |sub|
subEvent = LogStash::Event.new(sub)
subEvent.set('id', timestamp)
events << subEvent
end
end
return events
end
基本上,它为您上面给出的示例消息生成的内容如下:
{"id":"1574922337","product":2,"price":1}
{"id":"1574922339","product":1,"price":0.2}
{"id":"1574922338","product":5,"price":13}