Reactor - 异步/非阻塞
Reactor - Async / non-blocking
我认为下面的通量链将通过事件循环(如 JS)放置/执行。因此 运行 下面的代码将首先打印阻塞 for
循环,然后执行通量链。
但是整个通量总是在移动到 for
循环之前首先执行。 [我确实有一些 sleep
阻塞的语句。但是有2个doOnNext
阶段]
AtomicInteger atomicInteger = new AtomicInteger(0);
// reactor
Flux.generate(synchronousSink -> {
if (atomicInteger.incrementAndGet() < 3) {
synchronousSink.next(atomicInteger.get());
} else
synchronousSink.complete();
})
.doOnNext(i -> {
System.out.println(
"A - Received " + i + " by " + Thread.currentThread().getName()
);
sleep(Duration.ofSeconds(1));
}).doOnNext(i -> {
System.out.println(
"B - Received " + i + " : by " + Thread.currentThread().getName()
);
sleep(Duration.ofSeconds(1));
}).subscribe();
for (int i = 0; i < 5; i++) {
System.out.println("For " + i + " by " + Thread.currentThread().getName());
sleep(Duration.ofMillis(500));
}
它打印
A - Received 1 by main
B - Received 1 by main
A - Received 2 by main
B - Received 2 by main
For 0 by main
For 1 by main
For 2 by main
For 3 by main
For 4 by main
有人可以解释这种行为并回答这些问题吗?
- 当我们使用 reactor 时,只有使用一些调度程序才能实现 async/non-blocking 行为吗?
- 如果我不使用任何调度程序并让代码使用当前线程执行,即使对于 IO 密集型应用程序,我们是否可以期望使用 WebFlux 而不是 Spring MVC 有更好的性能差异?
- 线程阻塞不是 Reactor 的正确用法。要使其以非阻塞方式工作,您应该使用
publishOn
/subscribeOn
那么输出应该是:
For 0 by main
A - Received 1 by boundedElastic-3
For 1 by main
For 2 by main
B - Received 1 : by boundedElastic-3
For 3 by main
For 4 by main
A - Received 2 by boundedElastic-3
有关 publishOn
与 subscribeOn
的更多信息,请参阅:
- 当然 - Reactor 支持 HTTP(包括 Websockets)、TCP 和 UDP 的非阻塞。更重要的是 Reactor 作为默认工作在 Netty 服务器上,它改变了处理请求的方式。
例如在 Tomcat 中,请求-响应由同一个线程处理——而且这个线程正在等待响应,所以它被阻塞了。在 Netty 中,一个线程可以处理发送请求,另一个线程可以处理接收响应 - 线程不会隐式等待响应。
我认为下面的通量链将通过事件循环(如 JS)放置/执行。因此 运行 下面的代码将首先打印阻塞 for
循环,然后执行通量链。
但是整个通量总是在移动到 for
循环之前首先执行。 [我确实有一些 sleep
阻塞的语句。但是有2个doOnNext
阶段]
AtomicInteger atomicInteger = new AtomicInteger(0);
// reactor
Flux.generate(synchronousSink -> {
if (atomicInteger.incrementAndGet() < 3) {
synchronousSink.next(atomicInteger.get());
} else
synchronousSink.complete();
})
.doOnNext(i -> {
System.out.println(
"A - Received " + i + " by " + Thread.currentThread().getName()
);
sleep(Duration.ofSeconds(1));
}).doOnNext(i -> {
System.out.println(
"B - Received " + i + " : by " + Thread.currentThread().getName()
);
sleep(Duration.ofSeconds(1));
}).subscribe();
for (int i = 0; i < 5; i++) {
System.out.println("For " + i + " by " + Thread.currentThread().getName());
sleep(Duration.ofMillis(500));
}
它打印
A - Received 1 by main
B - Received 1 by main
A - Received 2 by main
B - Received 2 by main
For 0 by main
For 1 by main
For 2 by main
For 3 by main
For 4 by main
有人可以解释这种行为并回答这些问题吗?
- 当我们使用 reactor 时,只有使用一些调度程序才能实现 async/non-blocking 行为吗?
- 如果我不使用任何调度程序并让代码使用当前线程执行,即使对于 IO 密集型应用程序,我们是否可以期望使用 WebFlux 而不是 Spring MVC 有更好的性能差异?
- 线程阻塞不是 Reactor 的正确用法。要使其以非阻塞方式工作,您应该使用
publishOn
/subscribeOn
那么输出应该是:
For 0 by main A - Received 1 by boundedElastic-3 For 1 by main For 2 by main B - Received 1 : by boundedElastic-3 For 3 by main For 4 by main A - Received 2 by boundedElastic-3
有关 publishOn
与 subscribeOn
的更多信息,请参阅:
- 当然 - Reactor 支持 HTTP(包括 Websockets)、TCP 和 UDP 的非阻塞。更重要的是 Reactor 作为默认工作在 Netty 服务器上,它改变了处理请求的方式。 例如在 Tomcat 中,请求-响应由同一个线程处理——而且这个线程正在等待响应,所以它被阻塞了。在 Netty 中,一个线程可以处理发送请求,另一个线程可以处理接收响应 - 线程不会隐式等待响应。