为什么反应流订阅者在当前线程中执行?
Why does reactive stream subscriber is executed in current thread?
subscribe
文档状态:
Keep in mind that since the sequence can be asynchronous, this will
immediately return control to the calling thread. This can give the
impression the consumer is not invoked when executing in a main thread
or a unit test for instance.
那么为什么我看到每个元素都打印出来了?表示lambda在主线程中执行
import reactor.core.publisher.Flux;
import java.util.concurrent.TimeUnit;
public class ReactorMain {
public static void main(String[] args) {
Flux.just(1,2,3,4,5,6,7,8,9,10)
.limitRequest(5)
.skip(3)
.subscribe(value -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Value: " + value);
})
;
}
}
这是因为值的生成不包含任何异步元素。因此,当您订阅 Flux 时,它将使用当前线程做它能做的尽可能多的事情,直到一个异步元素让它等待并使用该线程做其他事情。
如果要生成您的元素,例如您以异步方式调用 Web 服务,您将以异步方式接收值。
您可以通过延迟序列来测试它:
public static void main(String[] args) {
Flux.just(1,2,3,4,5,6,7,8,9,10)
.limitRequest(5)
.skip(3)
.delayElements(Duration.ofSeconds(2))
.subscribe(value -> {
System.out.println("Value: " + value);
})
;
}
subscribe
文档状态:
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
那么为什么我看到每个元素都打印出来了?表示lambda在主线程中执行
import reactor.core.publisher.Flux;
import java.util.concurrent.TimeUnit;
public class ReactorMain {
public static void main(String[] args) {
Flux.just(1,2,3,4,5,6,7,8,9,10)
.limitRequest(5)
.skip(3)
.subscribe(value -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Value: " + value);
})
;
}
}
这是因为值的生成不包含任何异步元素。因此,当您订阅 Flux 时,它将使用当前线程做它能做的尽可能多的事情,直到一个异步元素让它等待并使用该线程做其他事情。
如果要生成您的元素,例如您以异步方式调用 Web 服务,您将以异步方式接收值。
您可以通过延迟序列来测试它:
public static void main(String[] args) {
Flux.just(1,2,3,4,5,6,7,8,9,10)
.limitRequest(5)
.skip(3)
.delayElements(Duration.ofSeconds(2))
.subscribe(value -> {
System.out.println("Value: " + value);
})
;
}