如何使用 Avro 在 Spring Kafka 1.3 中移动过去不可反序列化的消息

How to move past undeserializable messages in Spring Kafka 1.3 with Avro

我无法升级到 Spring 5,所以我只能使用 spring-kafka 1.3 及其有限的错误处理。所以我无法从 spring-kafka 2.

访问 ConsumerAwareErrorHandler 或 SeekToCurrent 错误处理程序

我正在使用 @KafkaListener 注释方法来收听主题,我已将 io.confluent.kafka.serializers.KafkaAvroDeserializer 配置为我的值反序列化器。

问题是,如果我的主题中出现一条非 Avro 格式的消息,KafkaMessageListenerContainer 轮询循环就会卡住。反序列化器在消息上抛出异常,并且轮询循环从不寻找过去,因此下一次通过循环时,它会尝试反序列化相同的消息并继续循环,每秒将相同的错误转储数千次到我的日志中。

似乎没有办法获得 NeverRetryPolicy 或边缘方面的任何东西,但我可以 factory.getContainerProperties().setErrorHandler()。不幸的是,我不确定我能从那里做什么。

有没有什么东西可以自动连接到我的错误处理程序中,我可以用它来寻找错误前移 1 个偏移量?不确定那是什么,文档并没有太多谈论你可以用 ErrorHandler 做什么,我能找到的大多数例子都是针对 spring-kafka 2.X。就像,它不会反序列化,我无法对消息做任何事情,它永远不会工作,我想避免再次重试它,而且似乎大多数 Whosebug 问题都是关于做相反的事情。

我还看到一些人只是用他们自己的 class 包装来自 Avro 的 Deserializer 来处理异常和 returns null。那是更好的计划吗?

问题是反序列化在 Spring 获取数据之前就失败了 - 问题出在 Kafka 本身。

在 2.2 中,我们添加了 ErrorHandlingDeserializer2,它包装了真正的反序列化器并向侦听器容器发送信号,以便将错误发送到错误处理程序。

在旧版本中,您需要编写自己的反序列化器包装器 - 但是,容器中没有代码来处理这种情况,因此您的 catch 块需要 return 一个真正的对象向您的听众发出反序列化失败的信号。

假设您的听众收到 Invoice。你的 catch 块可以创建一个子类,比如 BadInvoice 然后你可以在你的监听器中检测到它并丢弃它。

I've also seen some folks just wrap the Deserializer from Avro with their own class that eats the exception and returns null. Is that a better plan?

如果您从未获得真正的空记录,您可以 return null,但您必须将 @Payload(required = false) 添加到方法参数中。