不要使用 Spring Cloud Stream 在反应函数中将消息标记为已确认
Don't mark message as acknowledged in reactive Function with Spring Cloud Stream
我在 Horsham SR1 版本中使用 spring-cloud-stream
和 Java 13。我正在使用 Google Pub/Sub 作为基础消息系统。
我有一个反应式 Function
,看起来像这样:
@Bean
public Function<Flux<Message>, Mono<Void>> messageConsumer() {
return messageFlux ->
messageFlux
.flatMap(message -> {
// do something
return something;
})
.doOnError(throwable -> log.error("could not process message", throwable))
.then();
}
我怎样才能Spring不确认错误信息?在 flatMap
方法中抛出异常是否足够?
您必须了解每种方法都有利有弊,而对于反应式,我们无法查看流。它完全在你的控制之下。事实上,主要区别之一是上面的函数只被调用一次,如果它是命令式函数,它将在每条消息上被调用。
基本上,对于反应式用户,有效地将操作单元声明为整个流(无论这在您的应用程序上下文中可能意味着什么)。
命令式的操作单元是单个消息,因此我们可以做每条消息 acks、nacks 等事情。
我在 Horsham SR1 版本中使用 spring-cloud-stream
和 Java 13。我正在使用 Google Pub/Sub 作为基础消息系统。
我有一个反应式 Function
,看起来像这样:
@Bean
public Function<Flux<Message>, Mono<Void>> messageConsumer() {
return messageFlux ->
messageFlux
.flatMap(message -> {
// do something
return something;
})
.doOnError(throwable -> log.error("could not process message", throwable))
.then();
}
我怎样才能Spring不确认错误信息?在 flatMap
方法中抛出异常是否足够?
您必须了解每种方法都有利有弊,而对于反应式,我们无法查看流。它完全在你的控制之下。事实上,主要区别之一是上面的函数只被调用一次,如果它是命令式函数,它将在每条消息上被调用。
基本上,对于反应式用户,有效地将操作单元声明为整个流(无论这在您的应用程序上下文中可能意味着什么)。 命令式的操作单元是单个消息,因此我们可以做每条消息 acks、nacks 等事情。