Java Collector.combiner 总是接到供应商的电话

Java Collector.combiner getting called with supplier values always

问题:创建一个收集器实现,它将并行地乘以整数流和 return 长。

实施:

public class ParallelMultiplier implements Collector<Integer, Long, Long> {

    @Override
    public BiConsumer<Long, Integer> accumulator() {
        // TODO Auto-generated method stub
        return (operand1, operand2) -> {
            System.out.println("Accumulating Values (Accumulator, Element): (" + operand1 + ", " + operand2 + ")");
                                        long Lval = operand1.longValue(); 
                                        int Ival = operand2.intValue();
                                        Lval *= Ival;
                                        operand1 = Long.valueOf(Lval);
                                        System.out.println("Acc Updated : " + operand1);
                                        };
    }

    @Override
    public Set<java.util.stream.Collector.Characteristics> characteristics() {
        // TODO Auto-generated method stub
        return Collections.unmodifiableSet(EnumSet.of(UNORDERED));
    }

    @Override
    public BinaryOperator<Long> combiner() {        
        return (operand1, operand2) -> {
            System.out.println("Combining Values : (" + operand1 + ", " + operand2 + ")");
                                        long Lval1 = operand1.longValue(); 
                                        long Lval2 = operand2.longValue();
                                        Lval1 *= Lval2; 
                                        return Long.valueOf(Lval1);
                                        };
    }

    @Override
    public Function<Long, Long> finisher() {
        // TODO Auto-generated method stub
        return Function.identity();
    }

    @Override
    public Supplier<Long> supplier() {      
        return () -> new Long(1L);
    }

}

观察到的输出:

Accumulating Values (Accumulator, Element): (1, 4)
Acc Updated : 4
Accumulating Values (Accumulator, Element): (1, 3)
Acc Updated : 3
Combining Values : (1, 1)
Accumulating Values (Accumulator, Element): (1, 8)
Accumulating Values (Accumulator, Element): (1, 6)
Accumulating Values (Accumulator, Element): (1, 2)
Acc Updated : 2
Acc Updated : 8
Accumulating Values (Accumulator, Element): (1, 5)
Accumulating Values (Accumulator, Element): (1, 1)
Acc Updated : 5
Acc Updated : 6
Combining Values : (1, 1)
Accumulating Values (Accumulator, Element): (1, 7)
Acc Updated : 7
Combining Values : (1, 1)
Combining Values : (1, 1)
Acc Updated : 1
Combining Values : (1, 1)
Combining Values : (1, 1)
Combining Values : (1, 1)

调用:

List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        Collector<Integer, Long, Long> parallelMultiplier = new ParallelMultiplier();

        result = intList.parallelStream().collect(parallelMultiplier);

即乘法结果为 1,本应为 8 的阶乘。我也没有使用 'concurrent' 特性。 理想情况下,我应该得到子流的乘法结果,并输入 combiner() 操作,但这似乎并没有发生在这里。

抛开 boxing/unboxing 的低效实现不谈,有没有我可能犯错的线索??

正如 Flown 所说,Java 的原始包装器类型是不可变的,不能用作累加器。因为您正在并行计算乘法,所以我们希望使用可变 Long 的线程安全实现,它是 AtomicLong.

import java.util.*;
import java.util.concurrent.atomic.*;
import java.util.function.*;
import java.util.stream.*;

public class ParallelMultiplier implements Collector<Integer, AtomicLong, Long> {

    @Override
    public BiConsumer<AtomicLong, Integer> accumulator() {
        return (operand1, operand2) -> operand1.set(operand1.longValue() * operand2.longValue());
    }

    @Override
    public Set<java.util.stream.Collector.Characteristics> characteristics() {
        return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED));
    }

    @Override
    public BinaryOperator<AtomicLong> combiner() {        
        return (operand1, operand2) -> new AtomicLong(operand1.longValue() * operand2.longValue());
    }

    @Override
    public Function<AtomicLong, Long> finisher() {
        return l -> l.longValue();
    }

    @Override
    public Supplier<AtomicLong> supplier() {      
        return () -> new AtomicLong(1);
    }

}

根据您提供的内容进行测试,得出正确答案,8! = 40320

您的收藏家有点不对劲。这是一个简化版本(为什么你不起作用 - 见最后):

 static class ParallelMultiplier implements Collector<Integer, long[], Long> {

    @Override
    public BiConsumer<long[], Integer> accumulator() {

        return (left, right) -> left[0] *= right;
    }

    @Override
    public BinaryOperator<long[]> combiner() {
        return (left, right) -> {
            left[0] = left[0] * right[0];
            return left;
        };
    }

    @Override
    public Function<long[], Long> finisher() {
        return arr -> arr[0];
    }

    @Override
    public Supplier<long[]> supplier() {
        return () -> new long[] { 1L };
    }

    @Override
    public Set<java.util.stream.Collector.Characteristics> characteristics() {
        return Collections.unmodifiableSet(EnumSet.noneOf(Characteristics.class));
    }
}

你的问题可以这样举例:

static Long test(Long left, Long right) {
    left = left * right;
    return left;
}

long l = 12L;
long r = 13L;

test(l, r);
System.out.println(l); // still 12