使用大型并行 Java 8 流时如何防止堆 space 错误
How to prevent heap space error when using large parallel Java 8 stream
如何有效地并行计算 pi(仅作为示例)?
这有效(在我的机器上大约需要 15 秒):
Stream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).limit(999999999L).mapToDouble(d->4.0d/d).sum()
但以下所有并行变体 运行 都变成了 OutOfMemoryError
DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).parallel().limit(999999999L).map(d->4.0d/d).sum();
DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).limit(999999999L).parallel().map(d->4.0d/d).sum();
DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).limit(999999999L).map(d->4.0d/d).parallel().sum();
那么,我需要做什么才能并行处理这个(大)流?
我已经检查过自动装箱是否导致内存消耗,但事实并非如此。这也适用:
DoubleStream.iterate(1, d->-(d+Math.abs(2*d)/d)).boxed().limit(999999999L).mapToDouble(d->4/d).sum()
这是因为 parallel()
方法使用的默认 ForkJoinPool
实现不限制创建的线程数。解决方案是提供一个 ForkJoinPool
的自定义实现,该实现受限于它并行执行的线程数。这可以通过以下方式实现:
ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
forkJoinPool.submit(() -> DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).parallel().limit(999999999L).map(d->4.0d/d).sum());
问题是您正在使用难以并行化的结构。
首先,Stream.iterate(…)
创建一个数字序列,其中每个计算都依赖于先前的值,因此,它没有为并行计算提供空间。更糟糕的是,它创建了一个无限流,它将像大小未知的流一样被实现处理。为了拆分流,必须先将值收集到数组中,然后才能将它们移交给其他计算线程。
其次,提供 limit(…)
并不能改善这种情况,. Applying a limit removes the size information which the implementation just had gathered for the array fragments. The reason is that the stream is ordered, thus a thread processing an array fragment doesn’t know whether it can process all elements as that depends on the information how many previous elements other threads are processing. This is documented:
“… it can be quite expensive on ordered parallel pipelines, especially for large values of maxSize
, since limit(n)
is constrained to return not just any n elements, but the first n elements in the encounter order.”
这很遗憾,因为我们完全知道 iterate
返回的无限序列与 limit(…)
的组合实际上具有完全已知的大小。但是实现不知道。 API 并没有提供一种方法来创建两者的有效组合。但是我们可以自己做:
static DoubleStream iterate(double seed, DoubleUnaryOperator f, long limit) {
return StreamSupport.doubleStream(new Spliterators.AbstractDoubleSpliterator(limit,
Spliterator.ORDERED|Spliterator.SIZED|Spliterator.IMMUTABLE|Spliterator.NONNULL) {
long remaining=limit;
double value=seed;
public boolean tryAdvance(DoubleConsumer action) {
if(remaining==0) return false;
double d=value;
if(--remaining>0) value=f.applyAsDouble(d);
action.accept(d);
return true;
}
}, false);
}
一旦我们有了这样一个迭代限制方法,我们就可以像
一样使用它
iterate(1d, d -> -(d+2*(Math.abs(d)/d)), 999999999L).parallel().map(d->4.0d/d).sum()
由于源的顺序性质,这仍然没有从并行执行中获益太多,但它有效。在我的四核机器上,它获得了大约 20% 的收益。
如何有效地并行计算 pi(仅作为示例)?
这有效(在我的机器上大约需要 15 秒):
Stream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).limit(999999999L).mapToDouble(d->4.0d/d).sum()
但以下所有并行变体 运行 都变成了 OutOfMemoryError
DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).parallel().limit(999999999L).map(d->4.0d/d).sum();
DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).limit(999999999L).parallel().map(d->4.0d/d).sum();
DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).limit(999999999L).map(d->4.0d/d).parallel().sum();
那么,我需要做什么才能并行处理这个(大)流? 我已经检查过自动装箱是否导致内存消耗,但事实并非如此。这也适用:
DoubleStream.iterate(1, d->-(d+Math.abs(2*d)/d)).boxed().limit(999999999L).mapToDouble(d->4/d).sum()
这是因为 parallel()
方法使用的默认 ForkJoinPool
实现不限制创建的线程数。解决方案是提供一个 ForkJoinPool
的自定义实现,该实现受限于它并行执行的线程数。这可以通过以下方式实现:
ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
forkJoinPool.submit(() -> DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).parallel().limit(999999999L).map(d->4.0d/d).sum());
问题是您正在使用难以并行化的结构。
首先,Stream.iterate(…)
创建一个数字序列,其中每个计算都依赖于先前的值,因此,它没有为并行计算提供空间。更糟糕的是,它创建了一个无限流,它将像大小未知的流一样被实现处理。为了拆分流,必须先将值收集到数组中,然后才能将它们移交给其他计算线程。
其次,提供 limit(…)
并不能改善这种情况,
“… it can be quite expensive on ordered parallel pipelines, especially for large values of
maxSize
, sincelimit(n)
is constrained to return not just any n elements, but the first n elements in the encounter order.”
这很遗憾,因为我们完全知道 iterate
返回的无限序列与 limit(…)
的组合实际上具有完全已知的大小。但是实现不知道。 API 并没有提供一种方法来创建两者的有效组合。但是我们可以自己做:
static DoubleStream iterate(double seed, DoubleUnaryOperator f, long limit) {
return StreamSupport.doubleStream(new Spliterators.AbstractDoubleSpliterator(limit,
Spliterator.ORDERED|Spliterator.SIZED|Spliterator.IMMUTABLE|Spliterator.NONNULL) {
long remaining=limit;
double value=seed;
public boolean tryAdvance(DoubleConsumer action) {
if(remaining==0) return false;
double d=value;
if(--remaining>0) value=f.applyAsDouble(d);
action.accept(d);
return true;
}
}, false);
}
一旦我们有了这样一个迭代限制方法,我们就可以像
一样使用它iterate(1d, d -> -(d+2*(Math.abs(d)/d)), 999999999L).parallel().map(d->4.0d/d).sum()
由于源的顺序性质,这仍然没有从并行执行中获益太多,但它有效。在我的四核机器上,它获得了大约 20% 的收益。