Stream reduce() 要求到底包含什么?

What do the Stream reduce() requirements exactly entail?

在并行流上使用 reduce() 操作时,the OCP exam book 指出 reduce() 参数必须遵守某些原则。这些原则如下:

  1. The identity must be defined such that for all elements in the stream u, combiner.apply(identity, u) is equal to u.
  2. The accumulator operator op must be associative and stateless such that (a op b) op c is equal to a op (b op c).
  3. The combiner operator must also be associative and stateless and compatible with the identity, such that for all of u and t combiner.apply(u, accumulator.apply(identity, t)) is equal to accumulator.apply(u,t) .

书中给出了两个例子来说明这些原理,请看下面的代码:

关联示例:

System.out.println(
        Arrays.asList(1, 2, 3, 4, 5, 6)
                .parallelStream()
                .reduce(0, (a, b) -> (a - b)));

这本书是怎么说的:

It may output -21, 3, or some other value as the accumulator function violates the associativity property.

身份要求示例:

System.out.println(
        Arrays.asList("w", "o", "l", "f")
                .parallelStream()
                .reduce("X", String::concat));

这本书是怎么说的:

You can see other problems if we use an identity parameter that is not truly an identity value. It can output XwXoXlXf. As part of the parallel process, the identity is applied to multiple elements in the stream, resulting in very unexpected data.

我不明白那些例子。在累加器示例中,累加器从 0 - 1 开始,即 -1,然后是 -1 - 2,即 -3,然后是 -6,一直到 -21.我明白,因为生成的 arraylist 不同步,结果可能由于竞争条件等的可能性而无法预测,但为什么累加器不是关联的? (a+b) 不会造成不可预测的结果吗?我真的不明白示例中使用的累加器有什么问题以及为什么它不是关联的,但话又说回来我仍然不完全理解 "associative principle" 是什么意思。

我也不明白身份示例。我知道如果 4 个单独的线程同时开始累积身份,结果确实可能是 XwXoXlXf,但这与身份参数本身有什么关系?到底什么才是合适的身份?

我想知道是否有人可以在这些原则上更多地启发我。

谢谢

why isn't the accumulator associative?

它不是关联的,因为减法运算的顺序决定了最终结果。

如果您 运行 连载 Stream,您将得到以下预期结果:

0 - 1 - 2 - 3 - 4 - 5 - 6 = -21

另一方面,对于并行 Streams,工作被拆分到多个线程。比如reduce在6个线程上并行执行,然后合并中间结果,可以得到不同的结果:

0 - 1   0 - 2   0 - 3      0 - 4     0 - 5    0 - 6
  -1     -2      -3         -4        -5        -6

  -1 - (-2)         -3 - (-4)          -5 - (-6)
      1                 1                  1
           1   -   1
               0            -     1

                        -1

或者,为了简化一个长示例:

(1 - 2) - 3 = -4
1 - (2 - 3) =  2

因此减法不具有结合性。

另一方面,a+b 不会导致同样的问题,因为加法是结合运算符(即 (a+b)+c == a+(b+c))。

identity 示例的问题在于,当 reduce 在多个线程上并行执行时,"X" 会附加到每个中间结果的开头。

What exactly would be a proper identity to use then?

如果将标识值更改为 "" :

System.out.println(Arrays.asList("w","o","l","f"))
.parallelStream()
.reduce("", String::concat));

你会得到 "wolf" 而不是 "XwXoXlXf"。

我举两个例子。首先身份被破坏的地方:

int result = Stream.of(1, 2, 3, 4, 5, 6)
        .parallel()
        .reduce(10, (a, b) -> a + b);

System.out.println(result); // 81 on my run

基本上你已经违反了这条规则:The identity value must be an identity for the accumulator function.  This means that for all u, accumulator(identity, u) is equal to u.

或者为了更简单,让我们看看该规则是否适用于我们流中的一些随机数据:

 Integer identity = 10;
 BinaryOperator<Integer> combiner = (x, y) -> x + y;
 boolean identityRespected = combiner.apply(identity, 1) == 1;
 System.out.println(identityRespected); // prints false

还有第二个例子:

/**
 * count letters, adding a bit more all the time
 */
private static int howMany(List<String> tokens) {
    return tokens.stream()
            .parallel()
            .reduce(0, // identity
                    (i, s) -> { // accumulator
                        return s.length() + i;
                    }, (left, right) -> { // combiner
                        return left + right + left; // notice the extra left here
                    });
}

然后你调用它:

List<String> left = Arrays.asList("aa", "bbb", "cccc", "ddddd", "eeeeee");
List<String> right = Arrays.asList("aa", "bbb", "cccc", "ddddd", "eeeeee", "");

System.out.println(howMany(left));  // 38 on my run
System.out.println(howMany(right)); // 50 on my run

基本上你已经违反了这个规则:Additionally, the combiner function must be compatible with the accumulator function 或者在代码中:

// this must hold!
// combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)

Integer identity = 0;
String t = "aa";
Integer u = 3; // "bbb"
BiFunction<Integer, String, Integer> accumulator = (Integer i, String s) -> i + s.length();
BinaryOperator<Integer> combiner = (left, right) -> left + right + left;

int first = accumulator.apply(identity, t); // 2
int second = combiner.apply(u, first); // 3 + 2 + 3 = 8

Integer shouldBe8 = accumulator.apply(u, t);

System.out.println(shouldBe8 == second); // false

虽然问题已经被回答和接受,但我认为可以用更简单、更实用的方式回答。

如果您没有有效的 identity 和关联的 accumulator/combiner,reduce 操作的结果将取决于:

  1. Stream内容
  2. 处理Stream
  3. 的线程数

关联性

让我们尝试一个非关联的例子 accumulator/combiner(基本上,我们通过改变线程数来减少一个序列中 50 个数字的列表并并行):

System.out.println("sequential: reduce="+
    IntStream.rangeClosed(1, 50).boxed()
        .reduce(
            0, 
            (a,b)->a-b, 
            (a,b)->a-b));
for (int n=1; n<6; n++) {
    ForkJoinPool pool = new ForkJoinPool(n);
    final int finalN = n;
    try {
        pool.submit(()->{
            System.out.println(finalN+" threads : reduce="+
                IntStream.rangeClosed(1, 50).boxed()
                    .parallel()
                    .reduce(
                        0, 
                        (a,b)->a-b, 
                        (a,b)->a-b));
            }).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            pool.shutdown();
        }
    }

这将显示以下结果(Oracle JDK 10.0.1):

sequential: reduce=-1275
1 threads : reduce=325
2 threads : reduce=-175
3 threads : reduce=-25
4 threads : reduce=75
5 threads : reduce=-25

这表明结果取决于reduce计算中涉及的线程数。

备注:

  • 有趣的是,一个线程的顺序化简和并行化简不会导致相同的结果。我找不到很好的解释。
  • 根据我的实验,相同的 Stream 内容和相同的线程数在多次 运行 时总是导致相同的减少值。我想这是因为并行流使用确定性 Spliterator.
  • 我不会使用 Boyarsky&Selikoff OCP8 书籍示例,因为流太小 (1,2,3,4,5,6) 并且(在我的机器上)为 [=18 生成相同的减少值 3 =] 1、2、3、4 或 5 个线程。
  • 并行流的默认线程数是可用的 CPU 核心数。这就是为什么您可能不会在每台机器上得到相同的减少结果。

身份

对于 identity,正如 E运行 在 "XwXoXlXf" 示例中所写的那样,有 4 个线程,每个线程将使用 identity 作为一种 String前缀。但请注意:虽然 OCP 书中建议 ""0 是有效的 identity,但它取决于 accumulator/combiner 函数。例如:

  • 0 是累加器 (a,b)->a+b 的有效 identity(因为 a+0=a
  • 1 是累加器 (a,b)->a*b 的有效 identity(因为 a*1=a,但 0 无效,因为 a*0=0!)

顺序流的归约如下所示:对流的每对元素顺序应用归约函数,期望在每一步接收一个与流中其他元素类型相同的元素。在下一步再次应用相同的 函数 ,依此类推。

a   b   c   d   e
│   │   │   │   │
└─┬─┘   │   │   │
 a+b    │   │   │   a+b=sum1
  │     │   │   │
  └──┬──┘   │   │
  sum1+c    │   │   sum1+c=sum2
     │      │   │
     └──┬───┘   │
     sum2+d     │   sum2+d=sum3
        │       │
        └──┬────┘
        sum3+e      sum3+e=total;

减少并行流具有相同的期望值,但不能保证下一步应捕获哪对元素(或它们在前面步骤中的总和)。因此,结果可能会有所不同。

a   b   c   d   e
│   │   │   │   │
└─┬─┘   └─┬─┘   │
 a+b     c+d    │   a+b=sum1   c+d=sum2

or:
    │   │   │   │
    └─┬─┘   └─┬─┘
     b+c     d+e    b+c=sum1   d+e=sum2

  │       │     │
  └───┬───┘     │
  sum1+sum2     │   sum..+sum..=sum..

or:

│     │   │     │
└──┬──┘   └──┬──┘
 a+sum1    sum2+e   sum..+sum..=sum..

另请参阅:Generate all possible string combinations by replacing the hidden “#” number sign