Spring 路由器子流的集成事务
Spring Integration transaction for subflow of router
我有一个名为“creationChannel”的频道,它使用 MongoMessageStore 进行备份,如下所示:
@Bean
ChannelMessageStore messageStore() {
return new MongoDbChannelMessageStore(mongoDatabaseFactory);
}
@Bean
PollableChannel creationChannel(ChannelMessageStore messageStore) {
return MessageChannels.queue("creationChannel", messageStore, "create").get();
}
我想在我的流程中使用它,但我想确定,如果“createOrderHandler”工作正常,来自那里的消息将是只读的(这同样适用于“updateOrderHandler”,但不同的频道)。
...some code here...
.<HeadersElement, OperationType>route(
payload -> route(payload),
spec -> spec
.transactional(transactionHandleMessageAdvice)
.subFlowMapping(
OperationType.New,
sf -> sf
.channel("creationChannel")
.log(Level.DEBUG, "Creation of a new order", Message::getPayload)
.transform(Mapper::mapCreate)
.handle(createOrderHandler,
handlerSpec -> handlerSpec.advice(retryOperationsInterceptor))
)
.subFlowMapping(
OperationType.Update,
sf -> sf
.channel("updateChannel")
.log(Level.DEBUG, "Update for existing order", Message::getPayload)
.transform(Mapper::mapUpdate)
.handle(updateOrderHandler,
handlerSpec -> handlerSpec.advice(retryOperationsInterceptor))
)
)
...some code here...
我试过这样配置“transactionHandleMessageAdvice”:
@Bean
TransactionHandleMessageAdvice transactionHandleMessageAdvice(MongoTransactionManager transactionManager) {
return new TransactionHandleMessageAdvice(transactionManager, new Properties());
}
但是在处理程序因异常而失败后,消息仍在从数据库中删除。
也许我应该为子流配置 Poller 并以某种方式使用 MongoTransactionManager 配置它?
Maybe I should configure Poller for subflows and configure it with MongoTransactionManager somehow?
这是正确的假设。只要线程在流中移动(例如您的 PollableChannel creationChannel
),当前事务就会在消息放入存储区时提交。当前线程中没有更多的事情发生,因此,您从那个 .transactional(transactionHandleMessageAdvice)
.
开始的当前事务
要使阅读具有事务性,您确实必须在 .transform(Mapper::mapCreate)
端点上配置 Poller
。因此,在您再次切换到不同线程之前,来自该队列通道的每个轮询都将是事务性的。
没有办法(也不能)让整个异步流程都具有事务性,因为事务与 ThreadLocal
相关联,并且在调用堆栈返回事务发起者的那一刻,它是提交或回滚。使用异步逻辑,我们只是打算从生产者端“发送并忘记”,让消费者在数据准备好时处理数据。这不是事务的设计目的。
我有一个名为“creationChannel”的频道,它使用 MongoMessageStore 进行备份,如下所示:
@Bean
ChannelMessageStore messageStore() {
return new MongoDbChannelMessageStore(mongoDatabaseFactory);
}
@Bean
PollableChannel creationChannel(ChannelMessageStore messageStore) {
return MessageChannels.queue("creationChannel", messageStore, "create").get();
}
我想在我的流程中使用它,但我想确定,如果“createOrderHandler”工作正常,来自那里的消息将是只读的(这同样适用于“updateOrderHandler”,但不同的频道)。
...some code here...
.<HeadersElement, OperationType>route(
payload -> route(payload),
spec -> spec
.transactional(transactionHandleMessageAdvice)
.subFlowMapping(
OperationType.New,
sf -> sf
.channel("creationChannel")
.log(Level.DEBUG, "Creation of a new order", Message::getPayload)
.transform(Mapper::mapCreate)
.handle(createOrderHandler,
handlerSpec -> handlerSpec.advice(retryOperationsInterceptor))
)
.subFlowMapping(
OperationType.Update,
sf -> sf
.channel("updateChannel")
.log(Level.DEBUG, "Update for existing order", Message::getPayload)
.transform(Mapper::mapUpdate)
.handle(updateOrderHandler,
handlerSpec -> handlerSpec.advice(retryOperationsInterceptor))
)
)
...some code here...
我试过这样配置“transactionHandleMessageAdvice”:
@Bean
TransactionHandleMessageAdvice transactionHandleMessageAdvice(MongoTransactionManager transactionManager) {
return new TransactionHandleMessageAdvice(transactionManager, new Properties());
}
但是在处理程序因异常而失败后,消息仍在从数据库中删除。
也许我应该为子流配置 Poller 并以某种方式使用 MongoTransactionManager 配置它?
Maybe I should configure Poller for subflows and configure it with MongoTransactionManager somehow?
这是正确的假设。只要线程在流中移动(例如您的 PollableChannel creationChannel
),当前事务就会在消息放入存储区时提交。当前线程中没有更多的事情发生,因此,您从那个 .transactional(transactionHandleMessageAdvice)
.
要使阅读具有事务性,您确实必须在 .transform(Mapper::mapCreate)
端点上配置 Poller
。因此,在您再次切换到不同线程之前,来自该队列通道的每个轮询都将是事务性的。
没有办法(也不能)让整个异步流程都具有事务性,因为事务与 ThreadLocal
相关联,并且在调用堆栈返回事务发起者的那一刻,它是提交或回滚。使用异步逻辑,我们只是打算从生产者端“发送并忘记”,让消费者在数据准备好时处理数据。这不是事务的设计目的。