Spring Cloud Stream with RabbitMQ binder,如何应用@Transactional?

Spring Cloud Stream with RabbitMQ binder, how to apply @Transactional?

我有一个Spring Cloud Stream application that receives events from RabbitMQ using the Rabbit Binder。我的申请可以总结为:

@Transactional
@StreamListener(MySink.SINK_NAME)
public void processEvents(Flux<Event> events) {
       // Transform events and store them in MongoDB using 
       // spring-boot-data-mongodb-reactive
       ...
}

问题是 @Transactional 似乎不能与 Spring Cloud Stream 一起使用(或者至少这是我的印象),因为如果写入 MongoDB 时出现异常该事件似乎已经 ack:ed 到 RabbitMQ,并且不会重试该操作。

考虑到我想实现与使用 @Transactionalspring-amqp:

函数基本相同的功能
  1. 使用 Spring 时是否必须手动确认发送到 RabbitMQ 的消息 Cloud Stream 和 Rabbit Binder?
  2. 如果可以,我该如何实现?

这里有几个问题。

  1. 确认消息不需要交易
  2. Reactor-based @StreamListener 方法只被调用一次,只是为了设置 Flux 所以 @Transactional 在那个方法上是没有意义的 - 消息然后通过通量流动所以任何与单个消息有关的事情都必须在通量的上下文中完成。
  3. Spring 事务绑定到线程 - Reactor 是 non-blocking;消息将在第一次切换时被确认。

是的,您需要使用手动确认;大概是 mongodb 存储操作的结果。您可能需要使用 Flux<Message<Event>> 才能访问频道和交付标签 headers.