使用大型并行 Java 8 流时如何防止堆 space 错误

How to prevent heap space error when using large parallel Java 8 stream

如何有效地并行计算 pi(仅作为示例)?

这有效(在我的机器上大约需要 15 秒):

Stream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).limit(999999999L).mapToDouble(d->4.0d/d).sum()

但以下所有并行变体 运行 都变成了 OutOfMemoryError

DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).parallel().limit(999999999L).map(d->4.0d/d).sum();
DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).limit(999999999L).parallel().map(d->4.0d/d).sum();
DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).limit(999999999L).map(d->4.0d/d).parallel().sum();

那么,我需要做什么才能并行处理这个(大)流? 我已经检查过自动装箱是否导致内存消耗,但事实并非如此。这也适用:

DoubleStream.iterate(1, d->-(d+Math.abs(2*d)/d)).boxed().limit(999999999L).mapToDouble(d->4/d).sum()

这是因为 parallel() 方法使用的默认 ForkJoinPool 实现不限制创建的线程数。解决方案是提供一个 ForkJoinPool 的自定义实现,该实现受限于它并行执行的线程数。这可以通过以下方式实现:

ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
forkJoinPool.submit(() -> DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).parallel().limit(999999999L).map(d->4.0d/d).sum());

问题是您正在使用难以并行化的结构。

首先,Stream.iterate(…) 创建一个数字序列,其中每个计算都依赖于先前的值,因此,它没有为并行计算提供空间。更糟糕的是,它创建了一个无限流,它将像大小未知的流一样被实现处理。为了拆分流,必须先将值收集到数组中,然后才能将它们移交给其他计算线程。

其次,提供 limit(…) 并不能改善这种情况,. Applying a limit removes the size information which the implementation just had gathered for the array fragments. The reason is that the stream is ordered, thus a thread processing an array fragment doesn’t know whether it can process all elements as that depends on the information how many previous elements other threads are processing. This is documented:

“… it can be quite expensive on ordered parallel pipelines, especially for large values of maxSize, since limit(n) is constrained to return not just any n elements, but the first n elements in the encounter order.”

这很遗憾,因为我们完全知道 iterate 返回的无限序列与 limit(…) 的组合实际上具有完全已知的大小。但是实现不知道。 API 并没有提供一种方法来创建两者的有效组合。但是我们可以自己做:

static DoubleStream iterate(double seed, DoubleUnaryOperator f, long limit) {
  return StreamSupport.doubleStream(new Spliterators.AbstractDoubleSpliterator(limit,
     Spliterator.ORDERED|Spliterator.SIZED|Spliterator.IMMUTABLE|Spliterator.NONNULL) {
       long remaining=limit;
       double value=seed;
       public boolean tryAdvance(DoubleConsumer action) {
           if(remaining==0) return false;
           double d=value;
           if(--remaining>0) value=f.applyAsDouble(d);
           action.accept(d);
           return true;
       }
   }, false);
}

一旦我们有了这样一个迭代限制方法,我们就可以像

一样使用它
iterate(1d, d -> -(d+2*(Math.abs(d)/d)), 999999999L).parallel().map(d->4.0d/d).sum()

由于源的顺序性质,这仍然没有从并行执行中获益太多,但它有效。在我的四核机器上,它获得了大约 20% 的收益。