在 Confluent Elasticsearch Connector 中为空值时避免覆盖字段

Avoid overwriting fields when for null values in Confluent Elasticsearch Connector

我有一个扩充管道,它更新动态数量的字段,写入 Kafka,然后发送到 Elasticsearch。我们正在使用 Confluent Elasticsearch Connector.

例如,如果发送到 ES 连接器的第一条记录如下:

{id: 1, name: "Bob", age: null}

丰富的记录类似于:

{id: 1, name: null, age: 34}

我希望 Elasticsearch 中的结果记录为:

{id: 1, name: "Bob", age: 34}

丰富记录必须具有空值(即在我们上面的示例中 name: null)而不是根本不设置键的原因是它来自 Avro 数据和我们的模式列表几个字段是可选的。由于浓缩管道正在更新动态数量的字段,这似乎是最直接的解决方案(即,可能更新一条记录中的 name 字段,但更新另一条记录中的 age 字段)。由于可选的 Avro 字段默认为 null,这就是我们的空值的来源。

我尝试了 write.method=upsert 设置 ,但这似乎仍然会覆盖所有具有 null 作为丰富记录值的字段。 IE。根据上面的示例,ES 中的结果记录看起来像 {id: 1, name: null, age: 34}。上面链接的 post 似乎通过为单个记录类型设置多个 Avro 模式来解决这个问题,这对我们不起作用,因为它增加了太多的复杂性。

我注意到 ES 连接器也有 behavior.on.null.values 的设置,但我的理解是,这是针对整个记录 null 而不是单个字段的情况。

Confluent ES Sink Connector 中是否有类似 nullToUnset in the Datastax C* Connector 的设置?

如果没有,有什么好的实现方法吗?

相关的代码行在这里: https://github.com/confluentinc/kafka-connect-elasticsearch/blob/master/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java#L170

这基本上意味着源文档按原样发送到索引 - 没有修改。

您最好的选择可能是添加一个 SMT 来读取源文档并删除任何具有空值的字段。