为什么具有短路操作的并行 Java Stream 会评估 Stream 的所有元素,而顺序 Stream 不会?

Why does a parallel Java Stream with a short-curcuit operation evaluate all elements of the Stream while a sequential Stream does not?

考虑parallel()和sequential()这两个测试方法:

  @Test
  public void parallel() throws Exception
  {
    System.out.println( "parallel start." );
    IntStream.of( 0, 1 ).parallel().map( this::work ).findAny();
    System.out.println( "parallel done." );
  }

  @Test
  public void sequential() throws Exception
  {
    System.out.println( "sequential start." );
    IntStream.of( 0, 1 ).map( this::work ).findAny();
    System.out.println( "sequential done." );
  }

  private int work(int i)
  {
    System.out.println( "working... " + i );
    Threads.sleepSafe( i * 1000 );
    System.out.println( "worked. " + i );
    return i;
  }

Threads.sleepSafe() 是 Thread.sleep() 的简单包装器,它吞下异常并且在传递 0 时不执行任何操作。

当测试方法为运行时,结果是这样的:

sequential start.
working... 0
worked. 0
sequential done.

parallel start.
working... 1
working... 0
worked. 0
sleeping for 1000 ms ...
slept for 1000 ms.
worked. 1
parallel done.

sequential() 按我的预期运行,但 parallel() 不: 我希望 parallel() 中的 findAny() 到 return 尽快 work() returns 第一次(即值 0,因为它不睡觉) ,但它只在 work() 之后 returns 也完成了值 1.

为什么?

有没有办法让第一次 findAny() return 尽快 work() return 秒?

如果你想要一个并行流,那么是的,它会同时调用work方法几次。

请注意,如果您的并行流有 1,000 个元素并使用 5 个线程,那么 work 将被并行流最多调用 5 次,而不是 1,000 次。

如果您只想调用 work 一次,请使用顺序流。

并行模式下的

Stream API 基于 ForkJoinPool 范例,默认使用最大 X 个线程(其中 X 等于可用处理器的数量)。如果你要增加迭代次数,你可以检查这个规则。

通常,可以通过两种方式自定义并行流的默认线程池数量:

  • 提交并行流执行到自己的ForkJoinPool:yourFJP.submit(() -> stream.parallel().forEach(soSomething))
  • 使用系统属性更改公共池的大小:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20") 目标并行度为 20 个线程。

Is there a way to make findAny() return as soon as work() returns for the first time?

根据ForkJoin算法的思路,基本上答案是。它 "waits" 而所有线程都将完成它们的工作。但如前所述,您可以将工人的数量限制为单个工人。显然它没有场景,因为这种方法类似于顺序执行,但冗余操作会增加额外的开销。

并行流仍然支持短路,但如果所有线程都推迟其工作直到处理先前元素的线程确认操作尚未结束,则使用并行流没有任何优势。

因此,只要最终结果正确组装(即丢弃多余的元素),并行流处理超出必要数量的未指定数量的元素就是预期的行为。

只是你的例子,只包含两个元素,只是多处理了一个元素可以解释为“所有元素都被处理”。

当元素数量较少时,并行处理通常没有什么好处 and/or 实际操作是找到可预测地位于流的第一个元素中的东西。如果你做一些像

这样的事情,事情会变得更有趣
IntStream.range(0, 2000).parallel()
    .map(i -> { LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50)); return i;})
    .filter(i->i%397==396)
    .findAny();

请注意,在 return 最终结果之前,终端操作将等待所有工作线程完成,因此当找到结果时已经开始对元素求值时,该元素的处理将完成。这是设计使然。它确保当您的应用程序代码在流操作之后继续运行时,不会并发访问源集合或您的 lambda 表达式访问的其他数据。

the package documentation比较:

In almost all cases, terminal operations are eager, completing their traversal of the data source and processing of the pipeline before returning. Only the terminal operations iterator() and spliterator() are not; …

因此短路并行流不会处理所有元素,但当其他工作线程仍在处理过时元素时,可能仍需要更长的时间才能return 已评估的结果。

如果您想要早期 return,接受可能仍然 运行 的后台线程,Stream API 不适合您。考虑

private int work(int i) throws InterruptedException {
    System.out.println( "working... " + i );
    Thread.sleep(i * 1000);
    System.out.println( "worked. " + i );
    return i;
}
public void parallel() throws Exception {
    System.out.println( "parallel start." );
    List<Callable<Integer>> jobs = IntStream.range(0, 100)
      .collect(ArrayList::new, (l,i) -> l.add(() -> work(i)), List::addAll);
    ExecutorService pool = Executors.newFixedThreadPool(10);
    Integer result = pool.invokeAny(jobs);
    pool.shutdown();
    System.out.println( "parallel done, result="+result );
}

请注意,这不仅是在第一个作业完成后 return 秒,它还支持通过中断取消所有已经 运行 的作业。