Spring 路由器子流的集成事务

Spring Integration transaction for subflow of router

我有一个名为“creationChannel”的频道,它使用 MongoMessageStore 进行备份,如下所示:

  @Bean
  ChannelMessageStore messageStore() {
    return new MongoDbChannelMessageStore(mongoDatabaseFactory);
  }

  @Bean
  PollableChannel creationChannel(ChannelMessageStore messageStore) {
    return MessageChannels.queue("creationChannel", messageStore, "create").get();
  }

我想在我的流程中使用它,但我想确定,如果“createOrderHandler”工作正常,来自那里的消息将是只读的(这同样适用于“updateOrderHandler”,但不同的频道)。

...some code here...
        .<HeadersElement, OperationType>route(
            payload -> route(payload),
            spec -> spec
                .transactional(transactionHandleMessageAdvice)
                .subFlowMapping(
                    OperationType.New,
                    sf -> sf
                        .channel("creationChannel")
                        .log(Level.DEBUG, "Creation of a new order", Message::getPayload)
                        .transform(Mapper::mapCreate)
                        .handle(createOrderHandler,
                            handlerSpec -> handlerSpec.advice(retryOperationsInterceptor))
                )
                .subFlowMapping(
                    OperationType.Update,
                    sf -> sf
                        .channel("updateChannel")
                        .log(Level.DEBUG, "Update for existing order", Message::getPayload)
                        .transform(Mapper::mapUpdate)
                        .handle(updateOrderHandler,
                            handlerSpec -> handlerSpec.advice(retryOperationsInterceptor))
                )
        )
...some code here...

我试过这样配置“transactionHandleMessageAdvice”:

  @Bean
  TransactionHandleMessageAdvice transactionHandleMessageAdvice(MongoTransactionManager transactionManager) {
    return new TransactionHandleMessageAdvice(transactionManager, new Properties());
  }

但是在处理程序因异常而失败后,消息仍在从数据库中删除。

也许我应该为子流配置 Poller 并以某种方式使用 MongoTransactionManager 配置它?

Maybe I should configure Poller for subflows and configure it with MongoTransactionManager somehow?

这是正确的假设。只要线程在流中移动(例如您的 PollableChannel creationChannel),当前事务就会在消息放入存储区时提交。当前线程中没有更多的事情发生,因此,您从那个 .transactional(transactionHandleMessageAdvice).

开始的当前事务

要使阅读具有事务性,您确实必须在 .transform(Mapper::mapCreate) 端点上配置 Poller。因此,在您再次切换到不同线程之前,来自该队列通道的每个轮询都将是事务性的。

没有办法(也不能)让整个异步流程都具有事务性,因为事务与 ThreadLocal 相关联,并且在调用堆栈返回事务发起者的那一刻,它是提交或回滚。使用异步逻辑,我们只是打算从生产者端“发送并忘记”,让消费者在数据准备好时处理数据。这不是事务的设计目的。