为什么 stream parallel() 不使用所有可用线程?

Why does stream parallel() not use all available threads?

我尝试使用 Java8(1.8.0_172) stream.parallel() 并行 运行 100 Sleep 任务在具有 100 多个可用线程的自定义 ForkJoinPool 中提交。每个任务将 睡眠 1 秒。我预计整个工作将在 ~1 秒后完成,因为 100 次睡眠可以并行完成。但是我观察到 运行7 秒的时间。

    @Test
    public void testParallelStream() throws Exception {
        final int REQUESTS = 100;
        ForkJoinPool forkJoinPool = null;
        try {
            // new ForkJoinPool(256): same results for all tried values of REQUESTS
            forkJoinPool = new ForkJoinPool(REQUESTS);
            forkJoinPool.submit(() -> {

                IntStream stream = IntStream.range(0, REQUESTS);
                final List<String> result = stream.parallel().mapToObj(i -> {
                    try {
                        System.out.println("request " + i);
                        Thread.sleep(1000);
                        return Integer.toString(i);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }).collect(Collectors.toList());
                // assertThat(result).hasSize(REQUESTS);
            }).join();
        } finally {
            if (forkJoinPool != null) {
                forkJoinPool.shutdown();
            }
        }
    }

输出表明~16 个流元素在暂停 1 秒之前执行,然后是另一个~16 等等。所以看起来即使 forkjoinpool 是用 100 个线程创建的,也只有 ~16 个被使用。

一旦我使用超过 23 个线程,就会出现这种模式:

1-23 threads: ~1s
24-35 threads: ~2s
36-48 threads: ~3s
...
System.out.println(Runtime.getRuntime().availableProcessors());
// Output: 4

如您所写,您让流决定执行的并行性。

你的效果是 ArrayList.parallelStream 试图通过平均分割数据来超越你,而不考虑可用线程的数量。这对 CPU-绑定操作很有用,线程数超过 CPU 核心没有用,但不适用于需要等待 IO 的进程。

为什么不将所有项目按顺序强制送入 ForkJoinPool,从而强制使用所有可用线程?

        IntStream stream = IntStream.range(0, REQUESTS);
        List<ForkJoinTask<String>> results
                = stream.mapToObj(i -> forkJoinPool.submit(() -> {

            try {
                System.out.println("request " + i);
                Thread.sleep(1000);
                return Integer.toString(i);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        })).collect(Collectors.toList());
        results.forEach(ForkJoinTask::join);

这在我的机器上用了不到两秒钟。

由于 Stream 实现对 Fork/Join 池的使用是一个实现细节,强制它使用不同 Fork/Join 池的技巧也没有记录,似乎是偶然工作的,即有一个 hardcoded constant 确定实际的并行度,具体取决于默认池的并行度。所以最初没有预见到使用不同的池。

但是,已经认识到使用具有不适当目标并行度的不同池是一个错误,即使没有记录此技巧,请参阅 JDK-8190974

已在 Java 10 中修复并向后移植到 Java 8,更新 222。

所以一个简单的解决方案是更新 Java 版本。

您还可以更改默认池的并行度,例如

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "100");

在做任何事情之前 Fork/Join activity.

但这可能会对其他并行操作产生意想不到的影响。