并行流与串行流

Parallel stream vs serial stream

在 Java 8 中,并行流是否可能给出与串行流不同的结果?据我了解,并行流与串行流相同,只是分为多个子流。这是速度的问题。对元素的所有操作都已完成,子流的结果在最后合并。最后,在我看来,并行流和串行流的操作结果应该是相同的。所以我的问题是,这段代码是否可能给我不同的结果?如果可能,为什么会发生?

int[] i = {1, 2, 5, 10, 9, 7, 25, 24, 26, 34, 21, 23, 23, 25, 27, 852, 654, 25, 58};
Double serial = Arrays.stream(i).filter(si -> {
    return si > 5;
}).mapToDouble(Double::new).map(NewClass::add).reduce(Math::atan2).getAsDouble();

Double parallel = Arrays.stream(i).filter(si -> {
    return si > 5;
}).parallel().mapToDouble(Double::new).map(NewClass::add).reduce(Math::atan2).getAsDouble();

System.out.println("serial: " + serial);
System.out.println("parallel: " + parallel);

public static double add(double i) {
    return i + 0.005;
}

结果是:

serial: 3.6971567726175894E-23

parallel: 0.779264049587662

如果元素以不同的顺序给出,您的 reduce 方法会产生不同的结果。

因此,如果您使用并行流,则无法保证原始顺序。

如果您使用不同的缩减方法(例如 (x,y) -> x+y),效果会很好。

当您对并行流使用 reduce 时,操作未按特定顺序完成。

因此,如果您希望并行流产生可预测的结果,无论事情以何种顺序完成,您的 reduce 操作都必须有相同的答案。

例如,使用加法进行归约是有意义的,因为加法是结合的。不管你做哪一个,在这两种情况下答案都是 6

(1 + 2) + 3
1 + (2 + 3)

atan2 不是关联的。

Math.atan2(Math.atan2(1, 2), 3) == 0.15333604941031637

Math.atan2(1, Math.atan2(2, 3)) == 1.0392451500584097

reduce() 的 java 文档说:

Performs a reduction on the elements of this stream, using an associative accumulation function, [...] The accumulator function must be an associative function.

单词 "associative" 链接到此 java 文档:

An operator or function op is associative if the following holds:

 (a op b) op c == a op (b op c)

The importance of this to parallel evaluation can be seen if we expand this to four terms:

 a op b op c op d == (a op b) op (c op d)

So we can evaluate (a op b) in parallel with (c op d), and then invoke op on the results.

Examples of associative operations include numeric addition, min, and max, and string concatenation.

正如@PaulBoddington 在评论中提到的,atan2 不是关联的,因此对于归约操作无效。


无关

您的直播顺序有点不对。你应该过滤after并行操作,lambda可以缩短,你不应该装箱double:

double parallel = Arrays.stream(i)
                        .parallel()           // <-- before filter
                        .filter(si -> si > 5) // <-- shorter
                        .asDoubleStream()     // <-- not boxing
                        .reduce(Math::atan2)
                        .getAsDouble();