对使用新的 Kafka 幂等生产者防止重复感到困惑 API

Confused about preventing duplicates with new Kafka idempotent producer API

我的应用程序有 5 个以上的消费者在 kafka 主题上消费五个分区。(使用 kafka 版本 11)我的消费者每个都向另一个主题产生一条消息,然后将一些状态保存到数据库中,然后进行手动_立即确认并转到下一条消息。

我正在尝试解决他们向出站主题发送成功时的情况。然后我们有一个 failure/lose 消费者。当另一个消费者接管分区时,它将向出站主题发出另一条消息。这很糟糕:(

我发现 kafka 现在有幂等生产者,但从我读到的内容来看,它只保证生产者会话。

"When producer restarts, new PID gets assigned. So the idempotency is promised only for a single producer session" - (博客) - https://hevodata.com/blog/kafka-exactly-once

这对我来说似乎没什么用。在我的用例中,重点是当我在另一个消费者上重播消息时,它不会复制出站消息。

我有什么遗漏吗?

使用事务时,您不应使用任何基于消费者的机制(手动或其他方式)来提交偏移量。

相反,您使用生产者将偏移量发送到事务,因此偏移量提交是事务的一部分。

如果配置了 KafkaTransactionManagerChainedKafkaTransactionManager,Spring 侦听器容器将在侦听器正常退出时将偏移量发送到事务。

如果您不使用 Kafka 事务管理器,则需要使用 KafkaTemplate(如果您使用本机 API,则为 Producer)将偏移量发送到事务。

使用消费者提交偏移量不是事务的一部分,因此事情不会按预期进行。

使用事务管理器时,侦听器容器将 Producer 绑定到线程,以便任何下游 KafkaTemplate 操作参与消费者启动的事务。参见 the documentation