Reactive Spring Cloud Stream 从队列中读取所有数据,但 non-reactive 一条一条地读取消息
Reactive Spring Cloud Stream reads all data from queue, but non-reactive reads messages one by one
我注意到 Spring Cloud Stream 在反应性和非反应性行为方面的差异。当我使用非反应性方法时,消息会被一条一条地阅读。这是为流监听器截取的代码:
@StreamListener(Processor.INPUT)
public void receive2(String input) throws InterruptedException {
Thread.sleep(2000);
System.out.println(input.toUpperCase());
}
此代码一条一条地消费消息。我可以从 RabbitMQ 管理站点看到这个:
重启后应用程序继续使用重启前完成的消息。但是使用反应式流侦听器服务会立即从队列中获取所有消息。应用程序重新启动后,它不会继续处理消息,因为队列为空。
这里是反应流监听器的片段:
@StreamListener
public void receive1(@Input(Processor.INPUT) Flux<String> input) {
input
.delayElements(Duration.ofSeconds(2))
.map(String::toUpperCase)
.doOnEach(System.out::println)
.subscribe();
}
我想了解为什么会发生这种情况,是否可以在处理前面的元素之前停止读取整个队列。是我对 Reactor 运算符的错误理解还是预期的行为?是否可以以某种方式指定背压?
我们可能需要看看反应式是如何连接的,因为它看起来确实很奇怪。话虽如此,如果您使用的是最新的 spring-cloud-stream,我会建议对您的应用程序进行轻微更改并依赖 Spring Cloud Function 支持,这实际上为您的案例提供了实现反应式的替代支持消息处理程序 - https://spring.io/blog/2018/08/28/spring-cloud-stream-fishtown-m2-2-1-0-m2-release-announcement。
您应该对博客中的示例进行的唯一更改是:
@Bean
public Consumer<Flux<String>> foo() {
//
}
因为反应式是非阻塞的; delayElements()
将工作交给另一个线程;这将 returns 的侦听器线程释放到容器并确认消息;因此下一个交付(并延迟);因此,所有消息最终都会进入调度程序的队列。 Reactive 确实不适合这种用例,除非你使用手动 acks,如下所示。
@StreamListener
public void receive1(@Input(Processor.INPUT) Flux<Message<String>> input) {
input
.doOnEach(System.out::println)
.delayElements(Duration.ofSeconds(2))
.map(m -> {
return MessageBuilder.withPayload(m.getPayload().toUpperCase())
.copyHeaders(m.getHeaders())
.build();
})
.doOnEach(System.out::println)
.doOnNext(m -> {
try {
m.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class)
.basicAck(m.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class), false);
}
catch (IOException e) {
e.printStackTrace();
}
})
.subscribe();
}
和
spring.cloud.stream.bindings.input.group=foo
spring.cloud.stream.rabbit.bindings.input.consumer.acknowledge-mode=manual
我注意到 Spring Cloud Stream 在反应性和非反应性行为方面的差异。当我使用非反应性方法时,消息会被一条一条地阅读。这是为流监听器截取的代码:
@StreamListener(Processor.INPUT)
public void receive2(String input) throws InterruptedException {
Thread.sleep(2000);
System.out.println(input.toUpperCase());
}
此代码一条一条地消费消息。我可以从 RabbitMQ 管理站点看到这个:
重启后应用程序继续使用重启前完成的消息。但是使用反应式流侦听器服务会立即从队列中获取所有消息。应用程序重新启动后,它不会继续处理消息,因为队列为空。
这里是反应流监听器的片段:
@StreamListener
public void receive1(@Input(Processor.INPUT) Flux<String> input) {
input
.delayElements(Duration.ofSeconds(2))
.map(String::toUpperCase)
.doOnEach(System.out::println)
.subscribe();
}
我想了解为什么会发生这种情况,是否可以在处理前面的元素之前停止读取整个队列。是我对 Reactor 运算符的错误理解还是预期的行为?是否可以以某种方式指定背压?
我们可能需要看看反应式是如何连接的,因为它看起来确实很奇怪。话虽如此,如果您使用的是最新的 spring-cloud-stream,我会建议对您的应用程序进行轻微更改并依赖 Spring Cloud Function 支持,这实际上为您的案例提供了实现反应式的替代支持消息处理程序 - https://spring.io/blog/2018/08/28/spring-cloud-stream-fishtown-m2-2-1-0-m2-release-announcement。 您应该对博客中的示例进行的唯一更改是:
@Bean
public Consumer<Flux<String>> foo() {
//
}
因为反应式是非阻塞的; delayElements()
将工作交给另一个线程;这将 returns 的侦听器线程释放到容器并确认消息;因此下一个交付(并延迟);因此,所有消息最终都会进入调度程序的队列。 Reactive 确实不适合这种用例,除非你使用手动 acks,如下所示。
@StreamListener
public void receive1(@Input(Processor.INPUT) Flux<Message<String>> input) {
input
.doOnEach(System.out::println)
.delayElements(Duration.ofSeconds(2))
.map(m -> {
return MessageBuilder.withPayload(m.getPayload().toUpperCase())
.copyHeaders(m.getHeaders())
.build();
})
.doOnEach(System.out::println)
.doOnNext(m -> {
try {
m.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class)
.basicAck(m.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class), false);
}
catch (IOException e) {
e.printStackTrace();
}
})
.subscribe();
}
和
spring.cloud.stream.bindings.input.group=foo
spring.cloud.stream.rabbit.bindings.input.consumer.acknowledge-mode=manual