反应堆项目:下游缓慢
Project Reactor: downstream is slow
根据我的理解,下游的其余部分需要在线程池中的线程上处理(我将其设置为 1024)
这是我的代码。
Flux<String> ips =
Flux.fromIterable(items).map(Item::getIp);
ips
.publishOn(Schedulers.fromExecutor(Executors.newFixedThreadPool(1024)))
.map(ip -> {
try {
Request request = new Request.Builder().url("https://" + ip + ":443").build();
Response response = okHttpClient.newCall(request).execute();
return response.code();
} catch (Exception e) {
}
return -1;
})
.subscribe(System.out::println);
出于某种原因,与以下代码相比,此代码非常慢:
appRules
.stream()
.parallel()
.map(Item::getIp)
.forEach(ip -> {
try {
Request request = new Request.Builder().url("https://" + ip + ":443").build();
Response response = okHttpClient.newCall(request).execute();
System.out.println(response.code());
} catch (Exception e) {
}
System.out.println(-1);
});
为什么?当您受 IO 限制时,同时处理项目流的正确方法是什么? (而不是 CPU)
执行速度较慢的原因是 Reactor 管道执行默认为 single-threaded。因此,当您使用 Flux.publishOn
运算符时,您只是说您希望管道的这一部分在给定线程池的线程上执行,但它不会同时在单独的线程上执行每个项目。
实现并发的一种选择是使用 parallel Flux,它创建所谓的 rails,其中数据可以并行流动,但它主要用于 CPU-bound 操作。
更好的选择是将阻塞代码包装在 Mono 中并将其委托给专用线程池,类似于您所做的,只是这次每个任务将获得自己的线程:
private static void reactorProcess()
{
ExecutorService executor = Executors.newFixedThreadPool(1024);
Flux.range(1, 1024)
.flatMap(a -> Mono.fromRunnable(() -> simulateHttpCall())
.subscribeOn(Schedulers.fromExecutor(executor)))
.blockLast();
executor.shutdown();
}
private static void simulateHttpCall()
{
try
{
Thread.sleep(100);
System.out.println(Thread.currentThread().getName() + ": " + ZonedDateTime.now());
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
我还要指出,Java 并行流不是这种处理的可行替代方案。它默认使用 ForkJoinPool,这也适用于 CPU-bound 操作,并且只使用与您机器中的 CPU 个内核一样多的线程。
除此之外,如果您想充分利用反应式编程的全部功能,您应该考虑使用支持 non-blocking IO 的 HTTP 客户端,例如 Spring 中的 WebClient .通过使用 non-blocking HTTP 客户端,您无需再担心定义线程池的问题,因为不会阻塞任何线程,并且固定的少量线程将能够为数千个并发请求提供服务。
根据我的理解,下游的其余部分需要在线程池中的线程上处理(我将其设置为 1024)
这是我的代码。
Flux<String> ips =
Flux.fromIterable(items).map(Item::getIp);
ips
.publishOn(Schedulers.fromExecutor(Executors.newFixedThreadPool(1024)))
.map(ip -> {
try {
Request request = new Request.Builder().url("https://" + ip + ":443").build();
Response response = okHttpClient.newCall(request).execute();
return response.code();
} catch (Exception e) {
}
return -1;
})
.subscribe(System.out::println);
出于某种原因,与以下代码相比,此代码非常慢:
appRules
.stream()
.parallel()
.map(Item::getIp)
.forEach(ip -> {
try {
Request request = new Request.Builder().url("https://" + ip + ":443").build();
Response response = okHttpClient.newCall(request).execute();
System.out.println(response.code());
} catch (Exception e) {
}
System.out.println(-1);
});
为什么?当您受 IO 限制时,同时处理项目流的正确方法是什么? (而不是 CPU)
执行速度较慢的原因是 Reactor 管道执行默认为 single-threaded。因此,当您使用 Flux.publishOn
运算符时,您只是说您希望管道的这一部分在给定线程池的线程上执行,但它不会同时在单独的线程上执行每个项目。
实现并发的一种选择是使用 parallel Flux,它创建所谓的 rails,其中数据可以并行流动,但它主要用于 CPU-bound 操作。
更好的选择是将阻塞代码包装在 Mono 中并将其委托给专用线程池,类似于您所做的,只是这次每个任务将获得自己的线程:
private static void reactorProcess()
{
ExecutorService executor = Executors.newFixedThreadPool(1024);
Flux.range(1, 1024)
.flatMap(a -> Mono.fromRunnable(() -> simulateHttpCall())
.subscribeOn(Schedulers.fromExecutor(executor)))
.blockLast();
executor.shutdown();
}
private static void simulateHttpCall()
{
try
{
Thread.sleep(100);
System.out.println(Thread.currentThread().getName() + ": " + ZonedDateTime.now());
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
我还要指出,Java 并行流不是这种处理的可行替代方案。它默认使用 ForkJoinPool,这也适用于 CPU-bound 操作,并且只使用与您机器中的 CPU 个内核一样多的线程。
除此之外,如果您想充分利用反应式编程的全部功能,您应该考虑使用支持 non-blocking IO 的 HTTP 客户端,例如 Spring 中的 WebClient .通过使用 non-blocking HTTP 客户端,您无需再担心定义线程池的问题,因为不会阻塞任何线程,并且固定的少量线程将能够为数千个并发请求提供服务。