将值与 Java8 流合并

Combine values with Java8 stream

如果我有一个包含整数的列表,有没有办法构造另一个列表,如果新列表的头部的差异低于阈值,则对整数求和?我想使用 Java 8 个流来解决这个问题。它应该类似于 Scan operator 的 RxJava.

Example: 5, 2, 2, 5, 13    
Threashold: 2    
Result: 5, 9, 13

Intermediate results:    
5
5, 2
5, 4 (2 and 2 summed)
5, 9 (4 and 5 summed)
5, 9, 13

顺序流解决方案可能如下所示:

List<Integer> result = Stream.of(5, 2, 2, 5, 13).collect(ArrayList::new, (list, n) -> {
    if(!list.isEmpty() && Math.abs(list.get(list.size()-1)-n) < 2)
        list.set(list.size()-1, list.get(list.size()-1)+n);
    else
        list.add(n);
}, (l1, l2) -> {throw new UnsupportedOperationException();});
System.out.println(result);

虽然它看起来并没有比好的旧解决方案好多少:

List<Integer> input = Arrays.asList(5, 2, 2, 5, 13);
List<Integer> list = new ArrayList<>();
for(Integer n : input) {
    if(!list.isEmpty() && Math.abs(list.get(list.size()-1)-n) < 2)
        list.set(list.size()-1, list.get(list.size()-1)+n);
    else
        list.add(n);
}
System.out.println(list);

看来你的问题不是关联的,所以不容易并行化。例如,如果你像这样将输入分成两组 (5, 2), (2, 5, 13),你不能说第二组的前两项是否应该合并,直到第一组被处理。因此我无法指定正确的组合器功能。

Java 8 种方式是定义自定义 IntSpliterator class:

static class IntThreasholdSpliterator extends Spliterators.AbstractIntSpliterator {
    private PrimitiveIterator.OfInt it;
    private int threashold;
    private int sum;

    public IntThreasholdSpliterator(int threashold, IntStream stream, long est) {
        super(est, ORDERED );
        this.it = stream.iterator();
        this.threashold = threashold;
    }

    @Override
    public boolean tryAdvance(IntConsumer action) {
        if(!it.hasNext()){
            return false;
        }
        int next = it.nextInt();
        if(next<threashold){
            sum += next;
        }else {
            action.accept(next + sum);
            sum = 0;
        }
        return true;
    }

}

public static void main( String[] args )
{
    IntThreasholdSpliterator s = new IntThreasholdSpliterator(3, IntStream.of(5, 2, 2, 5, 13), 5);
    List<Integer> rs= StreamSupport.intStream(s, false).mapToObj(Integer::valueOf).collect(toList());
    System.out.println(rs);
}

您也可以将其破解为

    List<Integer> list = Arrays.asList(5, 2, 2, 5, 13);
    int[] sum = {0};
    list = list.stream().filter(s -> {
        if(s<=2) sum[0]+=s;
        return s>2;
    }).map(s -> {
        int rs = s + sum[0];
        sum[0] = 0;
        return rs;
    }).collect(toList());
    System.out.println(list);

但我不确定这种 hack 是否适合生产代码。

因为, (+1) 组合函数不是关联的,所以reduce() 不会起作用,并且不可能为Collector 编写组合函数.相反,这个组合函数需要从左到右应用,前一个部分结果被输入到下一个操作中。这称为 fold-left 操作,不幸的是 Java 流没有这样的操作。

(他们应该吗?让我知道。)

在捕获和改变对象以保持部分状态时,可以使用 forEachOrdered 编写自己的左折叠操作。首先,让我们将组合函数提取到它自己的方法中:

// extracted from Tagir Valeev's answer
void combine(List<Integer> list, int n) {
    if (!list.isEmpty() && Math.abs(list.get(list.size()-1)-n) < 2)
        list.set(list.size()-1, list.get(list.size()-1)+n);
    else
        list.add(n);
}

然后,创建初始结果列表并从内部调用组合函数 forEachOrdered:

List<Integer> result = new ArrayList<>();
IntStream.of(5, 2, 2, 5, 13)
         .forEachOrdered(n -> combine(result, n));

这给出了想要的结果

[5, 9, 13]

原则上这可以在并行流上完成,但考虑到 forEachOrdered 的语义,性能可能会降低到顺序。另请注意,forEachOrdered 操作是一次执行一个操作,因此我们不必担心我们正在改变的数据的线程安全性。

我知道 Stream 的大师 "Tagir Valeev" 和 "Stuart Marks" 已经指出 reduce() 不会工作,因为组合函数不是关联的,我在这里冒着投反对票的风险.不管怎样:

如果我们强制流是顺序的呢?那么我们不能使用reduce吗?关联性属性不是只有在使用并行时才需要吗?

    Stream<Integer> s = Stream.of(5, 2, 2, 5, 13);
    LinkedList<Integer> result =  s.sequential().reduce(new LinkedList<Integer>(), 
                 (list, el) -> { 
                     if (list.isEmpty() || Math.abs(list.getLast() - el) >= 2) {
                         list.add(el);
                     } else {
                         list.set(list.size() - 1, list.getLast() + el);
                     }
                     return list; 
                 }, (list1, list2) -> {
                         //don't really needed, as we are sequential
                         list1.addAll(list2); return list1;
                      });