Schdulers.elastic 没有在 Reactor 中创建新线程
Schdulers.elastic not creating new threads in Reactor
我正在尝试创建一个流,其中通量发出 10 个项目,每个项目并行,每个项目休眠 1 秒。由于每个项目都在单独的线程上发布,我预计整个过程需要 1 秒。但日志显示它需要 10 秒。
我尝试将 subscribeOn 更改为 publishOn,映射到 doOnNext。但是 none 似乎有效。
我是 Reactor 的新手,正在尝试了解我哪里出错了。非常感激任何的帮助。谢谢
public void whenIsANewThreadCreated() {
Flux.range(1,10)
.publishOn(Schedulers.elastic())
.map(count -> {
logger.info(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return count;
})
.blockLast();
}
2020-03-30 16:17:29.799 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 1
2020-03-30 16:17:30.802 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 2
2020-03-30 16:17:31.804 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 3
2020-03-30 16:17:32.805 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 4
2020-03-30 16:17:33.806 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 5
2020-03-30 16:17:34.808 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 6
2020-03-30 16:17:35.809 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 7
2020-03-30 16:17:36.811 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 8
2020-03-30 16:17:37.812 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 9
2020-03-30 16:17:38.814 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 10
您必须首先通过调用 parallel
方法创建一个并行通量,然后您必须使用 runOn
来实现并行。
Flux.range(1,10)
.parallel()
.runOn(Schedulers.elastic())
.map(count -> {
System.out.println(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return count;
}).subscribe();
- 使用
Schedulers.boundedElastic()
,因为不鼓励使用 Scheduler.elastic()
parallel
默认情况下会根据您的 CPU 内核创建线程。如果您想要更多线程,请使用 parallel(10)
- 我想这就是您想要看到的。
规范要求 onNext
事件按顺序调用。您的 map
有效地将输入 onNext
事件转换为阻塞 1 秒的 onNext
事件。根据规范,10 个传入 onNext
导致一系列 10 个传出 onNext
,每个块持续 1s => 10s 的阻塞。
如果您想在 10 个并行 rails 上分配阻塞工作负载,您绝对 100% 必须使用 parallel(10).runOn(Scheduler.elastic())
。 (runOn 的 Scheduler
也可以是 Schedulers.boundedElastic()
,或 Schedulers.newParallel(10)
)。
参考:https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking
您可以在后台启动这些进程并获得多线程。这不是并行性。在执行 CPU 密集型任务时应使用并行调度程序,在 I/O 或阻塞操作时使用弹性。
public void whenIsANewThreadCreated() {
Flux.range(1,10)
.subscribeOn(Schedulers.boundedElastic()) // if not, main calling thread will be used
.flatMap(count -> {
log.info(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
return Mono.fromCallable(() -> method5(count)).subscribeOn(Schedulers.boundedElastic());
})
.blockLast();
}
Mono<Integer> method5(int count) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Mono.just(count);
}
你会得到这样的东西
23:42:33.289 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 1
23:42:33.342 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 2
23:42:33.342 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 3
23:42:33.342 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 4
23:42:33.343 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 5
23:42:33.343 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 6
23:42:33.343 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 7
23:42:33.343 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 8
23:42:33.344 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 9
23:42:33.344 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 10
我正在尝试创建一个流,其中通量发出 10 个项目,每个项目并行,每个项目休眠 1 秒。由于每个项目都在单独的线程上发布,我预计整个过程需要 1 秒。但日志显示它需要 10 秒。
我尝试将 subscribeOn 更改为 publishOn,映射到 doOnNext。但是 none 似乎有效。
我是 Reactor 的新手,正在尝试了解我哪里出错了。非常感激任何的帮助。谢谢
public void whenIsANewThreadCreated() {
Flux.range(1,10)
.publishOn(Schedulers.elastic())
.map(count -> {
logger.info(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return count;
})
.blockLast();
}
2020-03-30 16:17:29.799 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 1
2020-03-30 16:17:30.802 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 2
2020-03-30 16:17:31.804 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 3
2020-03-30 16:17:32.805 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 4
2020-03-30 16:17:33.806 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 5
2020-03-30 16:17:34.808 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 6
2020-03-30 16:17:35.809 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 7
2020-03-30 16:17:36.811 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 8
2020-03-30 16:17:37.812 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 9
2020-03-30 16:17:38.814 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 10
您必须首先通过调用 parallel
方法创建一个并行通量,然后您必须使用 runOn
来实现并行。
Flux.range(1,10)
.parallel()
.runOn(Schedulers.elastic())
.map(count -> {
System.out.println(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return count;
}).subscribe();
- 使用
Schedulers.boundedElastic()
,因为不鼓励使用Scheduler.elastic()
parallel
默认情况下会根据您的 CPU 内核创建线程。如果您想要更多线程,请使用parallel(10)
- 我想这就是您想要看到的。
规范要求 onNext
事件按顺序调用。您的 map
有效地将输入 onNext
事件转换为阻塞 1 秒的 onNext
事件。根据规范,10 个传入 onNext
导致一系列 10 个传出 onNext
,每个块持续 1s => 10s 的阻塞。
如果您想在 10 个并行 rails 上分配阻塞工作负载,您绝对 100% 必须使用 parallel(10).runOn(Scheduler.elastic())
。 (runOn 的 Scheduler
也可以是 Schedulers.boundedElastic()
,或 Schedulers.newParallel(10)
)。
参考:https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking
您可以在后台启动这些进程并获得多线程。这不是并行性。在执行 CPU 密集型任务时应使用并行调度程序,在 I/O 或阻塞操作时使用弹性。
public void whenIsANewThreadCreated() {
Flux.range(1,10)
.subscribeOn(Schedulers.boundedElastic()) // if not, main calling thread will be used
.flatMap(count -> {
log.info(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
return Mono.fromCallable(() -> method5(count)).subscribeOn(Schedulers.boundedElastic());
})
.blockLast();
}
Mono<Integer> method5(int count) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Mono.just(count);
}
你会得到这样的东西
23:42:33.289 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 1
23:42:33.342 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 2
23:42:33.342 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 3
23:42:33.342 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 4
23:42:33.343 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 5
23:42:33.343 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 6
23:42:33.343 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 7
23:42:33.343 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 8
23:42:33.344 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 9
23:42:33.344 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 10