使用 Kafka Connect 更新现有文档上的 Elasticsearch 字段而不是创建新的

Use Kafka Connect to update Elasticsearch field on existing document instead of creating new

我有 Kafka 设置 运行 Elasticsearch 连接器,我正在根据特定主题的传入消息成功地将新文档索引到 ES 索引中。

但是,根据关于另一个主题的传入消息,我需要将数据附加到同一索引中特定文档的字段。

下面的伪模式:

{
   "_id": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
   "uuid": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
   "title": "A title",
   "body": "A body",
   "created_at": 164584548,
   "views": []
}

^ 根据上​​述主题中的数据,此文档正在 ES 中正常创建。

但是,我如何使用来自另一个主题的消息将项目添加到 views 字段。像这样:

article-view 主题架构:

{
   "article_id": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
   "user_id": 123456,
   "timestamp: 136389734
}

而不是简单地在 article-view 索引(我什至不想拥有)上创建一个新文档。它将此附加到文章文档的 views 字段,相应的 _id 等于消息中的 article_id

所以一条消息后的最终结果将是:

{
   "_id": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
   "uuid": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
   "title": "A title",
   "body": "A body",
   "created_at": 164584548,
   "views": [
       {
           "user_id": 123456,
           "timestamp: 136389734
       }
   ]
}

使用 ES API 可以使用脚本。像这样:

{
    "script": {
        "lang": "painless",
        "params": {
            "newItems": [{
                "timestamp": 136389734,
                "user_id": 123456
            }]
        },
        "source": "ctx._source.views.addAll(params.newItems)"
    }
}

我可以像上面那样动态批量生成脚本,然后使用ESPython库中的helpers.bulk函数,通过这种方式批量更新文档。

Kafka Connect / Elasticsearch 可以吗?我没有在 Confluent 的网站上找到任何文档来解释如何执行此操作。

这似乎是一个相当标准的要求,也是人们需要使用 Kafka / 像 ES 这样的接收器连接器做的一件显而易见的事情。

谢谢!

编辑:使用 write.method=upsert ()

可以进行部分更新

Elasticsearch 连接器不支持此功能。您可以就地更新文档,但需要发送完整的文档,而不是用于附加的增量,我认为这是您所追求的。