如何忽略 Kafka Connect Elasticsearch 中的错误结果

How to ignore error result in Kafka Connect Elasticsearch

我正在尝试 运行 kafka connect for elastic search 。 但是由于一些错误,我在kafka主题中输入了错误的记录。

现在我解决了这个问题并插入了正确的值,但是弹性搜索仍然在主题中的先前记录上抛出错误

这里是错误

Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error

Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'lambdaDemo0': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"lambdaDemo0-9749-0e710000fd04"; line: 1, column: 13]

有什么方法可以忽略主题中的旧记录并告诉 kafka connect 选择最新记录? 我正在尝试删除主题,我将主题标记为删除,但主题中仍有记录。

我尝试了以下两个属性,但似乎确实有效

drop.invalid.message=true
behavior.on.malformed.documents=ignore

请指教如何清理题目中的错误记录

您可以告诉 Kafka Connect 跳过坏记录

errors.tolerance = all

您可以选择将这些消息路由到另一个主题(称为死信队列)进行检查,方法是添加

errors.tolerance = all
errors.deadletterqueue.topic.name = my-dlq-topic

这些设置对 Kafka Connect 有效,任何连接器在 serialisation/deserialisation 处理阶段失败。有关详细信息,请参阅 this article