Java8 并行流需要时间来求和值

Java8 Parallel Stream taking time to sum values

我正在练习 java8 并行流部分并编写一个程序,将作为参数传递的数字从 0 加到该数字。

例如,如果我传递了 10,它将对从 1 到 10 的数字求和并 return 输出。

下面是程序

public class ParellelStreamExample {



    public static void main(String[] args) {
        System.out.println("Long Range value - "+ Long.MIN_VALUE + " to "+ Long.MAX_VALUE);
        long startTime = System.nanoTime();
        long sum = sequentailSum(100000000);
        System.out.println(
                "Time in sequential execution " + (System.nanoTime() - startTime) / 1000000 + " msec with sum = " + sum);
        long startTime1 = System.nanoTime();
        long sum1 = parellelSum(100000000);
        System.out.println("Time in parallel execution " + (System.nanoTime() - startTime1) / 1000000
                + " msec with sum = " + sum1);

    }

    private static Long parellelSum(long n) {
        return Stream.iterate(1l, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);
    }

    private static Long sequentailSum(long n) {
        return Stream.iterate(1l, i -> i + 1).limit(n).reduce(0L, Long::sum);
    }
}

我收到的输出是

Long Range value - -9223372036854775808 to 9223372036854775807
Time in sequential execution 1741 msec with sum = 5000000050000000

Exception in thread "main" java.lang.OutOfMemoryError
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)
    at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
    at java.util.stream.SliceOps.opEvaluateParallelLazy(SliceOps.java:155)
    at java.util.stream.AbstractPipeline.sourceSpliterator(AbstractPipeline.java:431)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:474)
    at com.abhishek.javainaction.stream.parellel.ParellelStreamExample.parellelSum(ParellelStreamExample.java:21)
    at com.abhishek.javainaction.stream.parellel.ParellelStreamExample.main(ParellelStreamExample.java:14)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.lang.Long.valueOf(Long.java:840)
    at com.abhishek.javainaction.stream.parellel.ParellelStreamExample.lambda[=12=](ParellelStreamExample.java:21)
    at com.abhishek.javainaction.stream.parellel.ParellelStreamExample$$Lambda/250421012.apply(Unknown Source)
    at java.util.stream.Stream.next(Stream.java:1033)
    at java.util.Spliterators$IteratorSpliterator.trySplit(Spliterators.java:1784)
    at java.util.stream.AbstractShortCircuitTask.compute(AbstractShortCircuitTask.java:114)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

为什么这个程序没有 运行 在并行部分和 gc 开销发生, 相反,它应该 运行 在并行部分更快,因为它使用 fork/join 框架并通过内部线程进行处理。

哪里出了问题?

这里有几处出错了。

  1. 您正在尝试使用 System.nanoTime() 而不是 JMH 之类的东西来对代码进行基准测试。
  2. 您正在尝试在 Long 上并行化一个简单的计算 (sum),而不是使用 LongStream。如果 JVM 不能摆脱装箱,指针追踪的开销很容易压倒并行的好处。
  3. 您正在尝试并行化由 iterate 生成的固有顺序流。 Stream 框架将尝试通过缓冲流并将其分派给多个线程来完成您的要求,这会增加很多开销。
  4. 您正在对有序并行流使用 limit。这需要流框架进行大量额外的同步,以确保恰好 n 个第一个元素用于产生结果。您会看到,如果将 .unordered() 放入并行流中,执行时间将显着减少,但结果将是不确定的,因为您将得到 some n 个元素而不是必须 first n 个元素。

正确的做法是使用 JMH 并将 iterate(...).limit(...) 替换为 LongStream.rangeClosed(1, n)

我明确表示不讨论基准缺陷 (;))。这里的主要问题似乎是对使用特定 Stream 函数及其行为的理解。

试试这样的:

LongStream.rangeClosed(1, n).parallel().reduce(0L, Long::sum)

但公平地说,顺序的也应该进行调整:

LongStream.rangeClosed(1, n).reduce(0L, Long::sum)

现在我得到了这个运行时行为:

Long Range value - -9223372036854775808 to 9223372036854775807
Time in sequential execution 90 msec with sum = 5000000050000000
Time in parallel execution 25 msec with sum = 5000000050000000

我想,这就是您所期望的。

与其他所有方法一样 API,您必须了解具体方法的作用,尤其是如果您想并行进行。但正如您所见,即使是顺序处理也充分利用了这种不同的方法。

查看 https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html#StreamOps 以了解方法类型。

例如的使用限制:

Similarly, operations that are intrinsically tied to encounter order, such as limit(), may require buffering to ensure proper ordering, undermining the benefit of parallelism.