对使用新的 Kafka 幂等生产者防止重复感到困惑 API
Confused about preventing duplicates with new Kafka idempotent producer API
我的应用程序有 5 个以上的消费者在 kafka 主题上消费五个分区。(使用 kafka 版本 11)我的消费者每个都向另一个主题产生一条消息,然后将一些状态保存到数据库中,然后进行手动_立即确认并转到下一条消息。
我正在尝试解决他们向出站主题发送成功时的情况。然后我们有一个 failure/lose 消费者。当另一个消费者接管分区时,它将向出站主题发出另一条消息。这很糟糕:(
我发现 kafka 现在有幂等生产者,但从我读到的内容来看,它只保证生产者会话。
"When producer restarts, new PID gets assigned. So the idempotency is promised only for a single producer session" - (博客) - https://hevodata.com/blog/kafka-exactly-once
这对我来说似乎没什么用。在我的用例中,重点是当我在另一个消费者上重播消息时,它不会复制出站消息。
我有什么遗漏吗?
使用事务时,您不应使用任何基于消费者的机制(手动或其他方式)来提交偏移量。
相反,您使用生产者将偏移量发送到事务,因此偏移量提交是事务的一部分。
如果配置了 KafkaTransactionManager
或 ChainedKafkaTransactionManager
,Spring 侦听器容器将在侦听器正常退出时将偏移量发送到事务。
如果您不使用 Kafka 事务管理器,则需要使用 KafkaTemplate
(如果您使用本机 API,则为 Producer
)将偏移量发送到事务。
使用消费者提交偏移量不是事务的一部分,因此事情不会按预期进行。
使用事务管理器时,侦听器容器将 Producer
绑定到线程,以便任何下游 KafkaTemplate
操作参与消费者启动的事务。参见 the documentation。
我的应用程序有 5 个以上的消费者在 kafka 主题上消费五个分区。(使用 kafka 版本 11)我的消费者每个都向另一个主题产生一条消息,然后将一些状态保存到数据库中,然后进行手动_立即确认并转到下一条消息。
我正在尝试解决他们向出站主题发送成功时的情况。然后我们有一个 failure/lose 消费者。当另一个消费者接管分区时,它将向出站主题发出另一条消息。这很糟糕:(
我发现 kafka 现在有幂等生产者,但从我读到的内容来看,它只保证生产者会话。
"When producer restarts, new PID gets assigned. So the idempotency is promised only for a single producer session" - (博客) - https://hevodata.com/blog/kafka-exactly-once
这对我来说似乎没什么用。在我的用例中,重点是当我在另一个消费者上重播消息时,它不会复制出站消息。
我有什么遗漏吗?
使用事务时,您不应使用任何基于消费者的机制(手动或其他方式)来提交偏移量。
相反,您使用生产者将偏移量发送到事务,因此偏移量提交是事务的一部分。
如果配置了 KafkaTransactionManager
或 ChainedKafkaTransactionManager
,Spring 侦听器容器将在侦听器正常退出时将偏移量发送到事务。
如果您不使用 Kafka 事务管理器,则需要使用 KafkaTemplate
(如果您使用本机 API,则为 Producer
)将偏移量发送到事务。
使用消费者提交偏移量不是事务的一部分,因此事情不会按预期进行。
使用事务管理器时,侦听器容器将 Producer
绑定到线程,以便任何下游 KafkaTemplate
操作参与消费者启动的事务。参见 the documentation。