kafka-connect-elasticsearch:如何发送文件删除?
kafka-connect-elasticsearch: how to send deletes of documents?
我有一个如下所示的处理流:
mysql.database -> debezium-connector -> database topic -> faust.agent(stream processing to add a field) -> sink topic -> elasticsearch-sink-connector -> elasticsearch cluster
此处理流大部分都在工作,但我无法弄清楚如何处理来自数据库主题的已删除行事件。就像一行被删除一样,我希望它也从 elasticsearch 中删除。我可以在可以操纵事件的浮士德部分使用条件。有没有办法标记一个事件,以便当它被 elasticsearch-sink-connector 拾取时,它会删除给定的文档而不是添加它?我查看了文档,但看不到这方面的细节。接收器连接器是否仅用于将文档添加到索引?
查看 config for the connector,您似乎可以将 behavior.on.null.values
设置为 delete
。然后,您只需要确保针对应删除文档的键设置墓碑(空)。
Debezium 将 by default 生成用于删除的逻辑删除消息。
我有一个如下所示的处理流:
mysql.database -> debezium-connector -> database topic -> faust.agent(stream processing to add a field) -> sink topic -> elasticsearch-sink-connector -> elasticsearch cluster
此处理流大部分都在工作,但我无法弄清楚如何处理来自数据库主题的已删除行事件。就像一行被删除一样,我希望它也从 elasticsearch 中删除。我可以在可以操纵事件的浮士德部分使用条件。有没有办法标记一个事件,以便当它被 elasticsearch-sink-connector 拾取时,它会删除给定的文档而不是添加它?我查看了文档,但看不到这方面的细节。接收器连接器是否仅用于将文档添加到索引?
查看 config for the connector,您似乎可以将 behavior.on.null.values
设置为 delete
。然后,您只需要确保针对应删除文档的键设置墓碑(空)。
Debezium 将 by default 生成用于删除的逻辑删除消息。