Schedulers.boundedElastic 似乎使用相同的线程进行处理
Schedulers.boundedElastic appears to use same Thread for processing
我对 API 的理解是使用 Schedulers.boundedElastic() 或像 Schedulers.newBoundedElastic(3, 10, "MyThreadGroup") 这样的变体;或 Schedulers.fromExecutor(executor) 允许在多个线程中处理一个 IO 操作。
但是使用以下示例代码进行的模拟似乎表明单个 thread/same 线程正在执行 flatMap
中的工作
Flux.range(0, 100)
.flatMap(i -> {
try {
// IO operation
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
return Flux.just(i);
})
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
Thread.sleep(10000); // main thread
//This yields the following
Mapping for 0 is done by thread boundedElastic-1
Mapping for 1 is done by thread boundedElastic-1
Mapping for 2 is done by thread boundedElastic-1
Mapping for 3 is done by thread boundedElastic-1 ...
上面的输出向我表明 flatMap 中的相同线程是 运行。当在多个 IO 的订阅上调用 flatMap 时,有没有办法让多个线程处理项目?我期待看到 boundedElastic-1、boundedElastic-2 ...
.
1.非阻塞IO并发(首选)
如果您有机会使用非阻塞 IO(如 Spring WebClient),那么您无需担心线程或调度程序,您可以立即获得并发性:
Flux.range(0, 100)
.flatMap(i -> Mono.delay(Duration.ofMillis(500)) // e.g.: reactive webclient call
.doOnNext(x -> System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread()
.getName())))
.subscribe();
2。阻塞IO并发
如果可以选择,最好避免阻塞 IO。万一你无法避免它,你只需要对你的代码稍作修改并将 subscribeOn
应用于内部 Mono
:
Flux.range(0, 100)
.flatMap(i -> Mono.fromRunnable(() -> {
try {
// IO operation
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
}).subscribeOn(Schedulers.boundedElastic()))
.subscribe();
在多个线程上获取 flatMap 运行 的一种方法是创建一个 ParallelFlux。下面的示例代码可以解决问题。
Flux.range(0, 1000)
.parallel()
.runOn(Schedulers.boundedElastic())
.flatMap(i -> {
try {
// IO operation
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("second Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
return Flux.just(i);
})
.subscribe();
Thread.sleep(10000);
我对 API 的理解是使用 Schedulers.boundedElastic() 或像 Schedulers.newBoundedElastic(3, 10, "MyThreadGroup") 这样的变体;或 Schedulers.fromExecutor(executor) 允许在多个线程中处理一个 IO 操作。
但是使用以下示例代码进行的模拟似乎表明单个 thread/same 线程正在执行 flatMap
中的工作Flux.range(0, 100)
.flatMap(i -> {
try {
// IO operation
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
return Flux.just(i);
})
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
Thread.sleep(10000); // main thread
//This yields the following
Mapping for 0 is done by thread boundedElastic-1
Mapping for 1 is done by thread boundedElastic-1
Mapping for 2 is done by thread boundedElastic-1
Mapping for 3 is done by thread boundedElastic-1 ...
上面的输出向我表明 flatMap 中的相同线程是 运行。当在多个 IO 的订阅上调用 flatMap 时,有没有办法让多个线程处理项目?我期待看到 boundedElastic-1、boundedElastic-2 ... .
1.非阻塞IO并发(首选)
如果您有机会使用非阻塞 IO(如 Spring WebClient),那么您无需担心线程或调度程序,您可以立即获得并发性:
Flux.range(0, 100)
.flatMap(i -> Mono.delay(Duration.ofMillis(500)) // e.g.: reactive webclient call
.doOnNext(x -> System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread()
.getName())))
.subscribe();
2。阻塞IO并发
如果可以选择,最好避免阻塞 IO。万一你无法避免它,你只需要对你的代码稍作修改并将 subscribeOn
应用于内部 Mono
:
Flux.range(0, 100)
.flatMap(i -> Mono.fromRunnable(() -> {
try {
// IO operation
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
}).subscribeOn(Schedulers.boundedElastic()))
.subscribe();
在多个线程上获取 flatMap 运行 的一种方法是创建一个 ParallelFlux。下面的示例代码可以解决问题。
Flux.range(0, 1000)
.parallel()
.runOn(Schedulers.boundedElastic())
.flatMap(i -> {
try {
// IO operation
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("second Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
return Flux.just(i);
})
.subscribe();
Thread.sleep(10000);