ParallelStream 上的 CompletableFuture 被批处理并且运行速度比顺序流慢?

CompletableFuture on ParallelStream gets batched and runs slower than sequential stream?

方法一

通常,非常快,效果很好。

public static int loops = 500;
private static ExecutorService customPool = Executors.newFixedThreadPool(loops);
.
.
Instant start = Instant.now();
LongSummaryStatistics stats = LongStream.range(0, loops).boxed()
        .map(number -> CompletableFuture.supplyAsync(() -> DummyProcess.slowNetworkCall(number), customPool))
        .collect(Collectors.toList()).stream() // collect first, else will be sequential
        .map(CompletableFuture::join)
        .mapToLong(Long::longValue)
        .summaryStatistics();

log.info("cf completed in :: {}, summaryStats :: {} ", Duration.between(start, Instant.now()).toMillis(), stats);
// ... cf completed in :: 1054, summaryStats :: LongSummaryStatistics{count=500, sum=504008, min=1000, average=1008.016000, max=1017} 

我明白,如果我不先收集流,那么由于懒惰的性质,流将 spring 一个接一个地更新 CompletableFutures,并同步运行。 所以,作为一个实验:

方法二

删除中间收集步骤,但也使流并行! :

Instant start = Instant.now();
LongSummaryStatistics stats = LongStream.range(0, loops).boxed()
        .parallel()
        .map(number -> CompletableFuture.supplyAsync(() -> DummyProcess.slowNetworkCall(number), customPool))
        .map(CompletableFuture::join) // direct join
        .mapToLong(Long::longValue).summaryStatistics();

log.info("cfps_directJoin completed in :: {}, summaryStats :: {} ", Duration.between(start, Instant.now()).toMillis(), stats);
// ... cfps_directJoin completed in :: 8098, summaryStats :: LongSummaryStatistics{count=500, sum=505002, min=1000, average=1010.004000, max=1015}

总结:

我观察到的一个模式:

  1. 并行流方法一次“批处理”60 个调用,因此有 500 个循环,500/60 ~ 8 个批次,每个需要 1 秒,因此总共 8 个
  2. 所以,当我将循环计数减少到 300 时,有 300/60 = 5 个批次,实际上需要 5 秒才能完成。

所以,问题是:

并行+直集方式为什么会出现这种调用批处理?


为了完成,这是我的虚拟网络调用方法:

    public static Long slowNetworkCall(Long i) {
        Instant start = Instant.now();
        log.info(" {} going to sleep..", i);
        try {
            TimeUnit.MILLISECONDS.sleep(1000); // 1 second
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info(" {} woke up..", i);
        return Duration.between(start, Instant.now()).toMillis();
    }

这是关于 ForJoinPool 当您阻塞其内部线程时如何处理事情以及它产生了多少新线程的产物。虽然,我可能可以找到发生这种情况的确切线路,但我不确定这是否值得。有两个原因:

  • 逻辑可以改变

  • ForkJoinPool 中的代码非常重要

看来对我们俩来说,ForkJoinPool.commonPool().getParallelism()会return11,所以我得到的结果和你一样。如果您记录 ForkJoinPool.commonPool().getPoolSize() 以了解您的代码使用了多少个活动线程,您会发现在一段时间后,它只是稳定在 64。所以可以同时处理的最大任务是64,这与你看到的结果(那些8 seconds)相当。

如果我 运行 你的代码 -Djava.util.concurrent.ForkJoinPool.common.parallelism=50,它现在在 2 seconds 中执行,并且池大小增加到 256。也就是说,有一个内部逻辑可以调整这些事情。