Mono.fromCallable 线程行为
Mono.fromCallable Threading Behaviour
我想了解 Reactor 中阻塞函数的行为,但其他一些事情让我完全无法学习。这是代码:
public static void main(String[] args) throws InterruptedException {
Flux.range(1, 100_000)
.doOnNext(a -> System.out.println(a + ", thread: " + Thread.currentThread().getName()))
.flatMap(a -> Mono.fromCallable(() -> blockingMethod(a)).subscribeOn(Schedulers.elastic()))
.subscribe();
System.out.println("Here");
Thread.sleep(Integer.MAX_VALUE);
}
private static int blockingMethod(int s) {
try {
Thread.sleep(100_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s;
}
以下是 AFAIK 所发生情况的摘要:
订阅发生在 main
线程上。
main
在 flatMap
中变得自由,以从上游带来下一个元素。因此,doOnNext
应该总是打印 main
.
处理完 100_000 个元素后,main
将变为空闲并打印 here
.
相反,情况是这样的:
前 256 个元素按预期打印在 main
(在 doOnNext
中)。
大约 1 秒后,下一个 256,然后下一个,依此类推。从第二批开始的元素打印在 elastic
个线程上。
这是我的问题:
为什么要分批处理 256 个元素? Schedulers.elastic()
应该按需创建线程,因此理想情况下应该始终有一个线程可用于从 main 获取请求(忽略 JVM 对我可以创建的线程数的限制)。
为什么第二个 'batch'(及以后)中的元素被打印在 elastic
个线程上?我希望它们在 main
上发布。事实上,当您删除阻塞调用时会发生这种情况
public static void main(String[] args) throws InterruptedException {
Flux.range(1, 100_000)
.doOnNext(a -> System.out.println(a + ", thread: " + Thread.currentThread().getName()))
.flatMap(a -> Mono.just(a).subscribeOn(Schedulers.elastic()))
.subscribe();
System.out.println("Here");
Thread.sleep(Integer.MAX_VALUE);
}
此处,所有元素在 doOnNext
中打印 main
并且 here
仅在流完成时打印(释放主线程)。
我是不是漏掉了什么?
好的。让我们通过添加新语句来修改您的示例:
public static void main(String[] args) throws InterruptedException {
Flux.range(1, 100_000)
.doOnNext(a -> System.out.println(a + ", thread: " + Thread.currentThread().getName())) // #4
.flatMap(a -> Mono.just(a)
.subscribeOn(Schedulers.elastic())) // #2
.subscribeOn(Schedulers.immediate()) // NEW, #1
.subscribe(); // #3
System.out.println("Here");
Thread.sleep(Integer.MAX_VALUE);
}
默认情况下,此订阅链将在当前线程中执行。医生说 Mono::subscribeOn(Scheduler)
:
Run subscribe, onSubscribe and request on a specified Scheduler's Scheduler.Worker. As such, placing this operator anywhere in the chain will also impact the execution context of onNext/onError/onComplete signals from the beginning of the chain up to the next occurrence of a publishOn.
和反应堆参考:
subscribeOn applies to the subscription process, when that backward chain is constructed. As a consequence, no matter where you place the subscribeOn in the chain, it always affects the context of the source emission.
有了这些知识,我们可以读出语句:
- 在第 3 行我们订阅了
Flux
;
- 在第 1 行中,我们成功地更改了数字发射的上下文;
- 与
flatMap
一致,我们没有任何可见的工作,因为它将在数字发布后工作;
- 同时
flatMap
请求 Integer.MAX_VALUE(默认情况下)从 Flux::range
中排出元素,并且 Flux::range
在 开始发射到 256 一次(因为flatMap
有背压);
- 第 4 行中的回调将使用当前线程,因为第 2 行中的
subscribeOn
尚未执行;
- 第 2 行中的新值
subscribeOn
将为每个元素调用并为每个值选择一个线程。从这一刻起,所有工作都将在 Schedulers::elastic
中完成。因为 #2 比 #1 更接近发布者,#2 将更改源发射的上下文,所有上游工作将使用此调度程序代替 #1;
- 退出
subscribe
。
我们可以提取两条规则:
- 如果
subscribeOn
在时间上比其他 subscribeOn
晚执行 - 所有上游都将使用它;
- 如果
subscribeOn
比其他时间执行得早 - 可以用 hidden 转换替换为其他 subscribeOn
.
您需要了解,整个反应器都建立在 reactive streams 规范之上。
因此,上面的每个运算符都是发布者和订阅者的组合。
场景 2
public static void main(String[] args) throws InterruptedException {
Flux.range(1, 100_000)
.doOnNext(a -> System.out.println(a + ", thread: " + Thread.currentThread().getName()))
.flatMap(a -> Mono.just(a).subscribeOn(Schedulers.elastic()))
.subscribe();
System.out.println("Here");
Thread.sleep(Integer.MAX_VALUE);
}
发布者在主线程 (Flux.Range) 上启动,订阅者 (doOnNext) 正在订阅(在主线程上调用 onNext),它被委托给一个比订阅者可以发布。
所以整个 Flux.Range 在主线程上被调用并被分配到弹性线程池来处理
场景 1
public static void main(String[] args) throws InterruptedException {
Flux.range(1, 100_000)
.doOnNext(a -> System.out.println(a + ", Publisher: " + Thread.currentThread().getName()))
.flatMap(a -> Mono.fromCallable(() -> blockingMethod(a)).subscribeOn(Schedulers.elastic())).doOnNext(a -> System.out.println(a + ", Subscriber: " + Thread.currentThread().getName()))
.subscribe();
System.out.println("Here");
Thread.sleep(Integer.MAX_VALUE);
}
private static int blockingMethod(int s) {
try {
Thread.sleep(100_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s;
}
Flux.Range 的前 256 次调用发生在主线程上。请记住,reactive streams 是一种背压驱动的编程,由于平面地图只能合并 256 个上游,平面地图操作员(它是订阅者)不会在 256 个事件后调用 onNext 调用,因为它们都是 blocked/waiting 完成。
现在主线程被释放了,因为它完成了参与反应管道的责任并走出了传送带。即基本上继续执行下一行代码。
这就是为什么您会在前 256 个元素之后看到“这里”的原因。
然后主线程继续按照代码无限期阻塞。
一旦其中一个阻塞线程完成作业,它们将触发订阅者在完成较早请求的线程(即弹性线程)上调用 onNext。
弹性线程现在会将调用委托给内部反应器管道中的另一个弹性线程。
这意味着对于我发出的同一个项目,打印“Publisher”的线程和打印“Subscriber”的线程是不一样的,即使它们是弹性线程。
欢迎验证以下代码
我想了解 Reactor 中阻塞函数的行为,但其他一些事情让我完全无法学习。这是代码:
public static void main(String[] args) throws InterruptedException {
Flux.range(1, 100_000)
.doOnNext(a -> System.out.println(a + ", thread: " + Thread.currentThread().getName()))
.flatMap(a -> Mono.fromCallable(() -> blockingMethod(a)).subscribeOn(Schedulers.elastic()))
.subscribe();
System.out.println("Here");
Thread.sleep(Integer.MAX_VALUE);
}
private static int blockingMethod(int s) {
try {
Thread.sleep(100_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s;
}
以下是 AFAIK 所发生情况的摘要:
订阅发生在
main
线程上。main
在flatMap
中变得自由,以从上游带来下一个元素。因此,doOnNext
应该总是打印main
.处理完 100_000 个元素后,
main
将变为空闲并打印here
.
相反,情况是这样的:
前 256 个元素按预期打印在
main
(在doOnNext
中)。大约 1 秒后,下一个 256,然后下一个,依此类推。从第二批开始的元素打印在
elastic
个线程上。
这是我的问题:
为什么要分批处理 256 个元素?
Schedulers.elastic()
应该按需创建线程,因此理想情况下应该始终有一个线程可用于从 main 获取请求(忽略 JVM 对我可以创建的线程数的限制)。为什么第二个 'batch'(及以后)中的元素被打印在
elastic
个线程上?我希望它们在main
上发布。事实上,当您删除阻塞调用时会发生这种情况public static void main(String[] args) throws InterruptedException { Flux.range(1, 100_000) .doOnNext(a -> System.out.println(a + ", thread: " + Thread.currentThread().getName())) .flatMap(a -> Mono.just(a).subscribeOn(Schedulers.elastic())) .subscribe(); System.out.println("Here"); Thread.sleep(Integer.MAX_VALUE); }
此处,所有元素在 doOnNext
中打印 main
并且 here
仅在流完成时打印(释放主线程)。
我是不是漏掉了什么?
好的。让我们通过添加新语句来修改您的示例:
public static void main(String[] args) throws InterruptedException {
Flux.range(1, 100_000)
.doOnNext(a -> System.out.println(a + ", thread: " + Thread.currentThread().getName())) // #4
.flatMap(a -> Mono.just(a)
.subscribeOn(Schedulers.elastic())) // #2
.subscribeOn(Schedulers.immediate()) // NEW, #1
.subscribe(); // #3
System.out.println("Here");
Thread.sleep(Integer.MAX_VALUE);
}
默认情况下,此订阅链将在当前线程中执行。医生说 Mono::subscribeOn(Scheduler)
:
Run subscribe, onSubscribe and request on a specified Scheduler's Scheduler.Worker. As such, placing this operator anywhere in the chain will also impact the execution context of onNext/onError/onComplete signals from the beginning of the chain up to the next occurrence of a publishOn.
和反应堆参考:
subscribeOn applies to the subscription process, when that backward chain is constructed. As a consequence, no matter where you place the subscribeOn in the chain, it always affects the context of the source emission.
有了这些知识,我们可以读出语句:
- 在第 3 行我们订阅了
Flux
; - 在第 1 行中,我们成功地更改了数字发射的上下文;
- 与
flatMap
一致,我们没有任何可见的工作,因为它将在数字发布后工作; - 同时
flatMap
请求 Integer.MAX_VALUE(默认情况下)从Flux::range
中排出元素,并且Flux::range
在 开始发射到 256 一次(因为flatMap
有背压); - 第 4 行中的回调将使用当前线程,因为第 2 行中的
subscribeOn
尚未执行; - 第 2 行中的新值
subscribeOn
将为每个元素调用并为每个值选择一个线程。从这一刻起,所有工作都将在Schedulers::elastic
中完成。因为 #2 比 #1 更接近发布者,#2 将更改源发射的上下文,所有上游工作将使用此调度程序代替 #1; - 退出
subscribe
。
我们可以提取两条规则:
- 如果
subscribeOn
在时间上比其他subscribeOn
晚执行 - 所有上游都将使用它; - 如果
subscribeOn
比其他时间执行得早 - 可以用 hidden 转换替换为其他subscribeOn
.
您需要了解,整个反应器都建立在 reactive streams 规范之上。 因此,上面的每个运算符都是发布者和订阅者的组合。
场景 2
public static void main(String[] args) throws InterruptedException {
Flux.range(1, 100_000)
.doOnNext(a -> System.out.println(a + ", thread: " + Thread.currentThread().getName()))
.flatMap(a -> Mono.just(a).subscribeOn(Schedulers.elastic()))
.subscribe();
System.out.println("Here");
Thread.sleep(Integer.MAX_VALUE);
}
发布者在主线程 (Flux.Range) 上启动,订阅者 (doOnNext) 正在订阅(在主线程上调用 onNext),它被委托给一个比订阅者可以发布。 所以整个 Flux.Range 在主线程上被调用并被分配到弹性线程池来处理
场景 1
public static void main(String[] args) throws InterruptedException {
Flux.range(1, 100_000)
.doOnNext(a -> System.out.println(a + ", Publisher: " + Thread.currentThread().getName()))
.flatMap(a -> Mono.fromCallable(() -> blockingMethod(a)).subscribeOn(Schedulers.elastic())).doOnNext(a -> System.out.println(a + ", Subscriber: " + Thread.currentThread().getName()))
.subscribe();
System.out.println("Here");
Thread.sleep(Integer.MAX_VALUE);
}
private static int blockingMethod(int s) {
try {
Thread.sleep(100_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s;
}
Flux.Range 的前 256 次调用发生在主线程上。请记住,reactive streams 是一种背压驱动的编程,由于平面地图只能合并 256 个上游,平面地图操作员(它是订阅者)不会在 256 个事件后调用 onNext 调用,因为它们都是 blocked/waiting 完成。
现在主线程被释放了,因为它完成了参与反应管道的责任并走出了传送带。即基本上继续执行下一行代码。 这就是为什么您会在前 256 个元素之后看到“这里”的原因。 然后主线程继续按照代码无限期阻塞。
一旦其中一个阻塞线程完成作业,它们将触发订阅者在完成较早请求的线程(即弹性线程)上调用 onNext。 弹性线程现在会将调用委托给内部反应器管道中的另一个弹性线程。
这意味着对于我发出的同一个项目,打印“Publisher”的线程和打印“Subscriber”的线程是不一样的,即使它们是弹性线程。 欢迎验证以下代码