反应堆项目:下游缓慢

Project Reactor: downstream is slow

根据我的理解,下游的其余部分需要在线程池中的线程上处理(我将其设置为 1024)

这是我的代码。

Flux<String> ips =
        Flux.fromIterable(items).map(Item::getIp);
ips
        .publishOn(Schedulers.fromExecutor(Executors.newFixedThreadPool(1024)))
        .map(ip -> {
            try {
                Request request = new Request.Builder().url("https://" + ip + ":443").build();
                Response response = okHttpClient.newCall(request).execute();
                return response.code();
            } catch (Exception e) {
            }

            return -1;
        })
        .subscribe(System.out::println);

出于某种原因,与以下代码相比,此代码非常慢:

appRules
        .stream()
        .parallel()
        .map(Item::getIp)
        .forEach(ip -> {
            try {
                Request request = new Request.Builder().url("https://" + ip + ":443").build();
                Response response = okHttpClient.newCall(request).execute();
                System.out.println(response.code());
            } catch (Exception e) {
            }

            System.out.println(-1);
        });

为什么?当您受 IO 限制时,同时处理项目流的正确方法是什么? (而不是 CPU)

执行速度较慢的原因是 Reactor 管道执行默认为 single-threaded。因此,当您使用 Flux.publishOn 运算符时,您只是说您希望管道的这一部分在给定线程池的线程上执行,但它不会同时在单独的线程上执行每个项目。

实现并发的一种选择是使用 parallel Flux,它创建所谓的 rails,其中数据可以并行流动,但它主要用于 CPU-bound 操作。

更好的选择是将阻塞代码包装在 Mono 中并将其委托给专用线程池,类似于您所做的,只是这次每个任务将获得自己的线程:

private static void reactorProcess()
{
    ExecutorService executor = Executors.newFixedThreadPool(1024);

    Flux.range(1, 1024)
        .flatMap(a -> Mono.fromRunnable(() -> simulateHttpCall())
                          .subscribeOn(Schedulers.fromExecutor(executor)))
        .blockLast();

    executor.shutdown();
}

private static void simulateHttpCall()
{
    try
    {
        Thread.sleep(100);
        System.out.println(Thread.currentThread().getName() + ": " + ZonedDateTime.now());
    } catch (InterruptedException e)
    {
        e.printStackTrace();
    }
}

我还要指出,Java 并行流不是这种处理的可行替代方案。它默认使用 ForkJoinPool,这也适用于 CPU-bound 操作,并且只使用与您机器中的 CPU 个内核一样多的线程。

除此之外,如果您想充分利用反应式编程的全部功能,您应该考虑使用支持 non-blocking IO 的 HTTP 客户端,例如 Spring 中的 WebClient .通过使用 non-blocking HTTP 客户端,您无需再担心定义线程池的问题,因为不会阻塞任何线程,并且固定的少量线程将能够为数千个并发请求提供服务。