使用 Kafka Connect 更新现有文档上的 Elasticsearch 字段而不是创建新的
Use Kafka Connect to update Elasticsearch field on existing document instead of creating new
我有 Kafka 设置 运行 Elasticsearch 连接器,我正在根据特定主题的传入消息成功地将新文档索引到 ES 索引中。
但是,根据关于另一个主题的传入消息,我需要将数据附加到同一索引中特定文档的字段。
下面的伪模式:
{
"_id": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
"uuid": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
"title": "A title",
"body": "A body",
"created_at": 164584548,
"views": []
}
^ 根据上述主题中的数据,此文档正在 ES 中正常创建。
但是,我如何使用来自另一个主题的消息将项目添加到 views
字段。像这样:
article-view
主题架构:
{
"article_id": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
"user_id": 123456,
"timestamp: 136389734
}
而不是简单地在 article-view
索引(我什至不想拥有)上创建一个新文档。它将此附加到文章文档的 views
字段,相应的 _id
等于消息中的 article_id
。
所以一条消息后的最终结果将是:
{
"_id": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
"uuid": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
"title": "A title",
"body": "A body",
"created_at": 164584548,
"views": [
{
"user_id": 123456,
"timestamp: 136389734
}
]
}
使用 ES API 可以使用脚本。像这样:
{
"script": {
"lang": "painless",
"params": {
"newItems": [{
"timestamp": 136389734,
"user_id": 123456
}]
},
"source": "ctx._source.views.addAll(params.newItems)"
}
}
我可以像上面那样动态批量生成脚本,然后使用ESPython库中的helpers.bulk
函数,通过这种方式批量更新文档。
Kafka Connect / Elasticsearch 可以吗?我没有在 Confluent 的网站上找到任何文档来解释如何执行此操作。
这似乎是一个相当标准的要求,也是人们需要使用 Kafka / 像 ES 这样的接收器连接器做的一件显而易见的事情。
谢谢!
编辑:使用 write.method=upsert ()
可以进行部分更新
Elasticsearch 连接器不支持此功能。您可以就地更新文档,但需要发送完整的文档,而不是用于附加的增量,我认为这是您所追求的。
我有 Kafka 设置 运行 Elasticsearch 连接器,我正在根据特定主题的传入消息成功地将新文档索引到 ES 索引中。
但是,根据关于另一个主题的传入消息,我需要将数据附加到同一索引中特定文档的字段。
下面的伪模式:
{
"_id": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
"uuid": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
"title": "A title",
"body": "A body",
"created_at": 164584548,
"views": []
}
^ 根据上述主题中的数据,此文档正在 ES 中正常创建。
但是,我如何使用来自另一个主题的消息将项目添加到 views
字段。像这样:
article-view
主题架构:
{
"article_id": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
"user_id": 123456,
"timestamp: 136389734
}
而不是简单地在 article-view
索引(我什至不想拥有)上创建一个新文档。它将此附加到文章文档的 views
字段,相应的 _id
等于消息中的 article_id
。
所以一条消息后的最终结果将是:
{
"_id": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
"uuid": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
"title": "A title",
"body": "A body",
"created_at": 164584548,
"views": [
{
"user_id": 123456,
"timestamp: 136389734
}
]
}
使用 ES API 可以使用脚本。像这样:
{
"script": {
"lang": "painless",
"params": {
"newItems": [{
"timestamp": 136389734,
"user_id": 123456
}]
},
"source": "ctx._source.views.addAll(params.newItems)"
}
}
我可以像上面那样动态批量生成脚本,然后使用ESPython库中的helpers.bulk
函数,通过这种方式批量更新文档。
Kafka Connect / Elasticsearch 可以吗?我没有在 Confluent 的网站上找到任何文档来解释如何执行此操作。
这似乎是一个相当标准的要求,也是人们需要使用 Kafka / 像 ES 这样的接收器连接器做的一件显而易见的事情。
谢谢!
编辑:使用 write.method=upsert (
Elasticsearch 连接器不支持此功能。您可以就地更新文档,但需要发送完整的文档,而不是用于附加的增量,我认为这是您所追求的。