为什么 stream parallel() 不使用所有可用线程?
Why does stream parallel() not use all available threads?
我尝试使用 Java8(1.8.0_172) stream.parallel() 并行 运行 100 Sleep 任务在具有 100 多个可用线程的自定义 ForkJoinPool 中提交。每个任务将 睡眠 1 秒。我预计整个工作将在 ~1 秒后完成,因为 100 次睡眠可以并行完成。但是我观察到 运行7 秒的时间。
@Test
public void testParallelStream() throws Exception {
final int REQUESTS = 100;
ForkJoinPool forkJoinPool = null;
try {
// new ForkJoinPool(256): same results for all tried values of REQUESTS
forkJoinPool = new ForkJoinPool(REQUESTS);
forkJoinPool.submit(() -> {
IntStream stream = IntStream.range(0, REQUESTS);
final List<String> result = stream.parallel().mapToObj(i -> {
try {
System.out.println("request " + i);
Thread.sleep(1000);
return Integer.toString(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).collect(Collectors.toList());
// assertThat(result).hasSize(REQUESTS);
}).join();
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}
}
输出表明~16 个流元素在暂停 1 秒之前执行,然后是另一个~16 等等。所以看起来即使 forkjoinpool 是用 100 个线程创建的,也只有 ~16 个被使用。
一旦我使用超过 23 个线程,就会出现这种模式:
1-23 threads: ~1s
24-35 threads: ~2s
36-48 threads: ~3s
...
System.out.println(Runtime.getRuntime().availableProcessors());
// Output: 4
如您所写,您让流决定执行的并行性。
你的效果是 ArrayList.parallelStream
试图通过平均分割数据来超越你,而不考虑可用线程的数量。这对 CPU-绑定操作很有用,线程数超过 CPU 核心没有用,但不适用于需要等待 IO 的进程。
为什么不将所有项目按顺序强制送入 ForkJoinPool,从而强制使用所有可用线程?
IntStream stream = IntStream.range(0, REQUESTS);
List<ForkJoinTask<String>> results
= stream.mapToObj(i -> forkJoinPool.submit(() -> {
try {
System.out.println("request " + i);
Thread.sleep(1000);
return Integer.toString(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})).collect(Collectors.toList());
results.forEach(ForkJoinTask::join);
这在我的机器上用了不到两秒钟。
由于 Stream 实现对 Fork/Join 池的使用是一个实现细节,强制它使用不同 Fork/Join 池的技巧也没有记录,似乎是偶然工作的,即有一个 hardcoded constant 确定实际的并行度,具体取决于默认池的并行度。所以最初没有预见到使用不同的池。
但是,已经认识到使用具有不适当目标并行度的不同池是一个错误,即使没有记录此技巧,请参阅 JDK-8190974。
已在 Java 10 中修复并向后移植到 Java 8,更新 222。
所以一个简单的解决方案是更新 Java 版本。
您还可以更改默认池的并行度,例如
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "100");
在做任何事情之前 Fork/Join activity.
但这可能会对其他并行操作产生意想不到的影响。
我尝试使用 Java8(1.8.0_172) stream.parallel() 并行 运行 100 Sleep 任务在具有 100 多个可用线程的自定义 ForkJoinPool 中提交。每个任务将 睡眠 1 秒。我预计整个工作将在 ~1 秒后完成,因为 100 次睡眠可以并行完成。但是我观察到 运行7 秒的时间。
@Test
public void testParallelStream() throws Exception {
final int REQUESTS = 100;
ForkJoinPool forkJoinPool = null;
try {
// new ForkJoinPool(256): same results for all tried values of REQUESTS
forkJoinPool = new ForkJoinPool(REQUESTS);
forkJoinPool.submit(() -> {
IntStream stream = IntStream.range(0, REQUESTS);
final List<String> result = stream.parallel().mapToObj(i -> {
try {
System.out.println("request " + i);
Thread.sleep(1000);
return Integer.toString(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).collect(Collectors.toList());
// assertThat(result).hasSize(REQUESTS);
}).join();
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}
}
输出表明~16 个流元素在暂停 1 秒之前执行,然后是另一个~16 等等。所以看起来即使 forkjoinpool 是用 100 个线程创建的,也只有 ~16 个被使用。
一旦我使用超过 23 个线程,就会出现这种模式:
1-23 threads: ~1s
24-35 threads: ~2s
36-48 threads: ~3s
...
System.out.println(Runtime.getRuntime().availableProcessors());
// Output: 4
如您所写,您让流决定执行的并行性。
你的效果是 ArrayList.parallelStream
试图通过平均分割数据来超越你,而不考虑可用线程的数量。这对 CPU-绑定操作很有用,线程数超过 CPU 核心没有用,但不适用于需要等待 IO 的进程。
为什么不将所有项目按顺序强制送入 ForkJoinPool,从而强制使用所有可用线程?
IntStream stream = IntStream.range(0, REQUESTS);
List<ForkJoinTask<String>> results
= stream.mapToObj(i -> forkJoinPool.submit(() -> {
try {
System.out.println("request " + i);
Thread.sleep(1000);
return Integer.toString(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})).collect(Collectors.toList());
results.forEach(ForkJoinTask::join);
这在我的机器上用了不到两秒钟。
由于 Stream 实现对 Fork/Join 池的使用是一个实现细节,强制它使用不同 Fork/Join 池的技巧也没有记录,似乎是偶然工作的,即有一个 hardcoded constant 确定实际的并行度,具体取决于默认池的并行度。所以最初没有预见到使用不同的池。
但是,已经认识到使用具有不适当目标并行度的不同池是一个错误,即使没有记录此技巧,请参阅 JDK-8190974。
已在 Java 10 中修复并向后移植到 Java 8,更新 222。
所以一个简单的解决方案是更新 Java 版本。
您还可以更改默认池的并行度,例如
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "100");
在做任何事情之前 Fork/Join activity.
但这可能会对其他并行操作产生意想不到的影响。