我们正在使用 Spring Cloud Stream for Kafka,我们正在寻找与消费者 API 的 Exactly Once 语义
We are using Spring Cloud Stream for Kafka and we are looking for Exactly Once Semantics with consumer API
我们正在使用 Spring Cloud Stream for Kafka 并寻找 Exactly Once 语义。
我们有一个解决方案,可以正常工作
1)从生产者启用幂等和交易
2)使用MetaDataStore通过键(offsetId + partitionId + topicName)检查来自消费者端的重复消息
使用上述解决方案,我们没有任何消息丢失,也没有重复处理
但现在我们发现有一个 属性 (producer.sendOffsetsToTransaction
) Kafka API 可以帮助我们修复消费者端的重复处理,而无需任何元数据存储逻辑。现在我不确定如何使用 spring 云流和 属性 .sendOffsetsToTransaction
如果您将 KafkaTransactionManager
添加到应用程序上下文,它会由框架自动处理。
您必须在配置中添加交易 ID 前缀。
spring.kafka.producer.transaction-id-prefix
并且Boot会自动添加一个事务管理器。
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
Enables transactions in the binder. See transaction.id in the Kafka documentation and Transactions in the spring-kafka documentation. When transactions are enabled, individual producer properties are ignored and all producers use the spring.cloud.stream.kafka.binder.transaction.producer.* properties.
监听器容器在监听器正常退出时,在提交事务之前将偏移量发送给事务。
我们正在使用 Spring Cloud Stream for Kafka 并寻找 Exactly Once 语义。 我们有一个解决方案,可以正常工作 1)从生产者启用幂等和交易 2)使用MetaDataStore通过键(offsetId + partitionId + topicName)检查来自消费者端的重复消息 使用上述解决方案,我们没有任何消息丢失,也没有重复处理
但现在我们发现有一个 属性 (producer.sendOffsetsToTransaction
) Kafka API 可以帮助我们修复消费者端的重复处理,而无需任何元数据存储逻辑。现在我不确定如何使用 spring 云流和 属性 .sendOffsetsToTransaction
如果您将 KafkaTransactionManager
添加到应用程序上下文,它会由框架自动处理。
您必须在配置中添加交易 ID 前缀。
spring.kafka.producer.transaction-id-prefix
并且Boot会自动添加一个事务管理器。
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
Enables transactions in the binder. See transaction.id in the Kafka documentation and Transactions in the spring-kafka documentation. When transactions are enabled, individual producer properties are ignored and all producers use the spring.cloud.stream.kafka.binder.transaction.producer.* properties.
监听器容器在监听器正常退出时,在提交事务之前将偏移量发送给事务。