使用 Spring Kafka 处理有关死信主题的消息
Handling messages on a dead letter topic using Spring Kafka
我们目前通过使用 Spring Kafka(在 Spring 启动应用程序中)进行了死信主题 (DLT) 配置。我们在 SeekToCurrentErrorHandler
中使用 DeadLetterPublishingRecoverer
。我们将后一个分配给 ConcurrentKafkaListenerContainerFactory
.
在处理我们的第一条消息时;由于我们服务中的一个愚蠢错误,我们最终出现了一些 NullPointerException
异常,并且 20% 的消息最终出现在 DLT 上(这是预期的行为,对我们来说是完美的)。
我们修复了错误,但现在我们想再次处理那 20% 的消息。我们看到的可能性:
- 编写一个小应用程序,将消息从 DLT 复制到原始主题
- 在我们的应用程序中添加第二个
@KafkaEventListener
,它从 DLT 中读取
解决方案 2 是我的首选解决方案,因为将其移回原始主题也意味着其他消费者群体再次收到消息(通常应该没问题,因为我们所有的服务都是幂等的)。
我想知道是否有其他最佳实践可以解决这个问题。
如果没有,我也想知道如何动态地 activate/deactive DLT 的 @KafkaEventListener
(因为你不想让这个监听器一直运行)
感谢您的反馈!
约臣
我认为解决方案 2 很完美。
I was also wondering how I can dynamically activate/deactive the
@KafkaEventListener for the DLT (as you don't want to have this
listener all the time up)
您可以使用自 2.2 以来引入的@KafkaListener 属性 autoStartup。
@Autowired
private KafkaListenerEndpointRegistry registry;
@KafkaListener(id = "123", topics = "XXX.DLT", autoStartup = "true"){
//do your processing
}
//After you are done
registry.getListenerContainer("123").stop();
我们目前通过使用 Spring Kafka(在 Spring 启动应用程序中)进行了死信主题 (DLT) 配置。我们在 SeekToCurrentErrorHandler
中使用 DeadLetterPublishingRecoverer
。我们将后一个分配给 ConcurrentKafkaListenerContainerFactory
.
在处理我们的第一条消息时;由于我们服务中的一个愚蠢错误,我们最终出现了一些 NullPointerException
异常,并且 20% 的消息最终出现在 DLT 上(这是预期的行为,对我们来说是完美的)。
我们修复了错误,但现在我们想再次处理那 20% 的消息。我们看到的可能性:
- 编写一个小应用程序,将消息从 DLT 复制到原始主题
- 在我们的应用程序中添加第二个
@KafkaEventListener
,它从 DLT 中读取
解决方案 2 是我的首选解决方案,因为将其移回原始主题也意味着其他消费者群体再次收到消息(通常应该没问题,因为我们所有的服务都是幂等的)。
我想知道是否有其他最佳实践可以解决这个问题。
如果没有,我也想知道如何动态地 activate/deactive DLT 的 @KafkaEventListener
(因为你不想让这个监听器一直运行)
感谢您的反馈!
约臣
我认为解决方案 2 很完美。
I was also wondering how I can dynamically activate/deactive the @KafkaEventListener for the DLT (as you don't want to have this listener all the time up)
您可以使用自 2.2 以来引入的@KafkaListener 属性 autoStartup。
@Autowired
private KafkaListenerEndpointRegistry registry;
@KafkaListener(id = "123", topics = "XXX.DLT", autoStartup = "true"){
//do your processing
}
//After you are done
registry.getListenerContainer("123").stop();