无法使用ChainedKafkaTransaction同步Kafka和MQ事务
Unable to synchronise Kafka and MQ transactions usingChainedKafkaTransaction
我们有一个 spring 引导应用程序,它使用来自 IBM MQ 的消息进行一些转换并将结果发布到 Kafka 主题。为此,我们使用 https://spring.io/projects/spring-kafka。我知道 Kafka 不支持 XA;然而,在文档中我发现了一些关于使用 ChainedKafkaTransactionManager
链接多个事务管理器并同步事务的输入。同一文档还提供了一个示例,说明如何在从 Kafka 读取消息并将它们存储在数据库中时同步 Kafka 和数据库。
我在 se 案例中遵循相同的示例,并将 JmsTransactionManager
与 KafkaTransactionManager
链接在 ChainedKafkaTransactionManager
的保护伞下。 bean 定义如下:
@Bean({"mqListenerContainerFactory"})
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(this.connectionFactory());
factory.setTransactionManager(this.jmsTransactionManager());
return factory;
}
@Bean
public JmsTransactionManager jmsTransactionManager() {
return new JmsTransactionManager(this.connectionFactory());
}
@Bean("chainedKafkaTransactionManager")
public ChainedKafkaTransactionManager<?, ?> chainedKafkaTransactionManager(
JmsTransactionManager jmsTransactionManager, KafkaTransactionManager kafkaTransactionManager) {
return new ChainedKafkaTransactionManager<>(kafkaTransactionManager, jmsTransactionManager);
}
@Transactional(transactionManager = "chainedKafkaTransactionManager", rollbackFor = Throwable.class)
@JmsListener(destination = "${myApp.sourceQueue}", containerFactory = "mqListenerContainerFactory")
public void receiveMessage(@Headers Map<String, Object> jmsHeaders, String message) {
// Processing the message here then publishing it to Kafka using KafkaTemplate
kafkaTemplate.send(sourceTopic,transformedMessage);
// Then throw an exception just to test the transaction behaviour
throw new RuntimeException("Not good Pal!");
}
当 运行 应用程序发生的事情是他的消息不断回滚到 MQ 队列中,但消息在 Kafka 主题中不断增长,这对我来说意味着 kafkaTemplate 交互不会被回滚。
如果我根据文档很好地理解,情况应该不是这样。 "If a transaction is active, any KafkaTemplate operations performed within the scope of the transaction use the transaction’s Producer."
在我们的 application.yaml 中,我们通过设置 spring.kafka.producer.transaction-id-prefix
配置 Kafka 生产者使用事务
问题是我在这里缺少什么以及我应该如何修复它。
提前感谢您的投入。
消费者默认可以看到未提交的记录;将 isolation.level
消费者 属性 设置为 read_committed
以避免从回滚事务中接收记录。
我们有一个 spring 引导应用程序,它使用来自 IBM MQ 的消息进行一些转换并将结果发布到 Kafka 主题。为此,我们使用 https://spring.io/projects/spring-kafka。我知道 Kafka 不支持 XA;然而,在文档中我发现了一些关于使用 ChainedKafkaTransactionManager
链接多个事务管理器并同步事务的输入。同一文档还提供了一个示例,说明如何在从 Kafka 读取消息并将它们存储在数据库中时同步 Kafka 和数据库。
我在 se 案例中遵循相同的示例,并将 JmsTransactionManager
与 KafkaTransactionManager
链接在 ChainedKafkaTransactionManager
的保护伞下。 bean 定义如下:
@Bean({"mqListenerContainerFactory"})
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(this.connectionFactory());
factory.setTransactionManager(this.jmsTransactionManager());
return factory;
}
@Bean
public JmsTransactionManager jmsTransactionManager() {
return new JmsTransactionManager(this.connectionFactory());
}
@Bean("chainedKafkaTransactionManager")
public ChainedKafkaTransactionManager<?, ?> chainedKafkaTransactionManager(
JmsTransactionManager jmsTransactionManager, KafkaTransactionManager kafkaTransactionManager) {
return new ChainedKafkaTransactionManager<>(kafkaTransactionManager, jmsTransactionManager);
}
@Transactional(transactionManager = "chainedKafkaTransactionManager", rollbackFor = Throwable.class)
@JmsListener(destination = "${myApp.sourceQueue}", containerFactory = "mqListenerContainerFactory")
public void receiveMessage(@Headers Map<String, Object> jmsHeaders, String message) {
// Processing the message here then publishing it to Kafka using KafkaTemplate
kafkaTemplate.send(sourceTopic,transformedMessage);
// Then throw an exception just to test the transaction behaviour
throw new RuntimeException("Not good Pal!");
}
当 运行 应用程序发生的事情是他的消息不断回滚到 MQ 队列中,但消息在 Kafka 主题中不断增长,这对我来说意味着 kafkaTemplate 交互不会被回滚。
如果我根据文档很好地理解,情况应该不是这样。 "If a transaction is active, any KafkaTemplate operations performed within the scope of the transaction use the transaction’s Producer."
在我们的 application.yaml 中,我们通过设置 spring.kafka.producer.transaction-id-prefix
问题是我在这里缺少什么以及我应该如何修复它。 提前感谢您的投入。
消费者默认可以看到未提交的记录;将 isolation.level
消费者 属性 设置为 read_committed
以避免从回滚事务中接收记录。