kafka-connect-elasticsearch:当使用 "write.method" 作为 upsert 时,是否可以在 kafka 主题上使用相同的 AVRO 对象来发送部分文档?
kafka-connect-elasticsearch: When using "write.method" as upsert, is it possible to use same AVRO object on kafka topic to send partial document?
我正在尝试对 Elasticsearch (ES) kafka 连接器使用 "write.method" upsert。在我的 kafka 流应用程序中,我正在编写我想要插入的文档,关于 ES 连接器配置为读取的 kafka 主题。我在这个主题上使用 avro 对象作为 kafka 值。我文档的 AVRO 定义如下所示:
{
"type": "record",
"name": "Document",
"fields": [
{
"name": "id",
"type": ["null", "string"],
},
{
"name": "name",
"type": ["null", "string"]
},
{
"name": "address",
"type": ["null", "string"]
}
]
}
文档有时仅包含 ID 和名称,有时仅包含地址。当我只发送地址时,id 和名称会被覆盖,反之亦然。我已将 behavior.on.null.values
设置为 ignore
,希望 ES 连接器会忽略 null id 和 name 值,但这并没有按预期工作。
尽管当我在我的 kafka 主题上使用两个不同的 AVRO 对象时,第一个仅包含 id 和名称,另一个仅包含地址,upsert 模式行为符合预期。但是对于同一个kafka主题允许多个AVRO对象定义,我需要将主题的兼容模式设置为NONE,这并不理想。
解决手头问题的正确方法是什么?
设置 behavior.on.null.values = ignore
只是告诉连接器,如果它收到一条消息,其中 整个消息 为空,则忽略该消息(其他选项将失败,或者删除 Elasticsearch 中匹配消息的键和空值的目标文档,即墓碑消息)。
连接器不支持您描述的部分更新行为。它可以insert/update/delete,但只能是整个文档
如果你想要部分更新插入行为,那么你需要自己实现,要么在自定义连接器中,要么通过在你的 Kafka Streams 应用程序中存储状态,以便能够在每次出现增量时发出完整的记录.
可以使用 write.method=upsert
进行部分更新
我正在尝试对 Elasticsearch (ES) kafka 连接器使用 "write.method" upsert。在我的 kafka 流应用程序中,我正在编写我想要插入的文档,关于 ES 连接器配置为读取的 kafka 主题。我在这个主题上使用 avro 对象作为 kafka 值。我文档的 AVRO 定义如下所示:
{
"type": "record",
"name": "Document",
"fields": [
{
"name": "id",
"type": ["null", "string"],
},
{
"name": "name",
"type": ["null", "string"]
},
{
"name": "address",
"type": ["null", "string"]
}
]
}
文档有时仅包含 ID 和名称,有时仅包含地址。当我只发送地址时,id 和名称会被覆盖,反之亦然。我已将 behavior.on.null.values
设置为 ignore
,希望 ES 连接器会忽略 null id 和 name 值,但这并没有按预期工作。
尽管当我在我的 kafka 主题上使用两个不同的 AVRO 对象时,第一个仅包含 id 和名称,另一个仅包含地址,upsert 模式行为符合预期。但是对于同一个kafka主题允许多个AVRO对象定义,我需要将主题的兼容模式设置为NONE,这并不理想。
解决手头问题的正确方法是什么?
设置 behavior.on.null.values = ignore
只是告诉连接器,如果它收到一条消息,其中 整个消息 为空,则忽略该消息(其他选项将失败,或者删除 Elasticsearch 中匹配消息的键和空值的目标文档,即墓碑消息)。
连接器不支持您描述的部分更新行为。它可以insert/update/delete,但只能是整个文档
如果你想要部分更新插入行为,那么你需要自己实现,要么在自定义连接器中,要么通过在你的 Kafka Streams 应用程序中存储状态,以便能够在每次出现增量时发出完整的记录.
可以使用 write.method=upsert