Java Collector.combiner always gets called with the supplier's values
Problem: Create a Collector implementation that multiplies a stream of Integers in parallel and returns a Long.
Implementation:
public class ParallelMultiplier implements Collector<Integer, Long, Long> {
    @Override
    public BiConsumer<Long, Integer> accumulator() {
        // TODO Auto-generated method stub
        return (operand1, operand2) -> {
            System.out.println("Accumulating Values (Accumulator, Element): (" + operand1 + ", " + operand2 + ")");
            long Lval = operand1.longValue();
            int Ival = operand2.intValue();
            Lval *= Ival;
            operand1 = Long.valueOf(Lval);
            System.out.println("Acc Updated : " + operand1);
        };
    }

    @Override
    public Set<java.util.stream.Collector.Characteristics> characteristics() {
        // TODO Auto-generated method stub
        return Collections.unmodifiableSet(EnumSet.of(UNORDERED));
    }

    @Override
    public BinaryOperator<Long> combiner() {
        return (operand1, operand2) -> {
            System.out.println("Combining Values : (" + operand1 + ", " + operand2 + ")");
            long Lval1 = operand1.longValue();
            long Lval2 = operand2.longValue();
            Lval1 *= Lval2;
            return Long.valueOf(Lval1);
        };
    }

    @Override
    public Function<Long, Long> finisher() {
        // TODO Auto-generated method stub
        return Function.identity();
    }

    @Override
    public Supplier<Long> supplier() {
        return () -> new Long(1L);
    }
}
Observed output:
Accumulating Values (Accumulator, Element): (1, 4)
Acc Updated : 4
Accumulating Values (Accumulator, Element): (1, 3)
Acc Updated : 3
Combining Values : (1, 1)
Accumulating Values (Accumulator, Element): (1, 8)
Accumulating Values (Accumulator, Element): (1, 6)
Accumulating Values (Accumulator, Element): (1, 2)
Acc Updated : 2
Acc Updated : 8
Accumulating Values (Accumulator, Element): (1, 5)
Accumulating Values (Accumulator, Element): (1, 1)
Acc Updated : 5
Acc Updated : 6
Combining Values : (1, 1)
Accumulating Values (Accumulator, Element): (1, 7)
Acc Updated : 7
Combining Values : (1, 1)
Combining Values : (1, 1)
Acc Updated : 1
Combining Values : (1, 1)
Combining Values : (1, 1)
Combining Values : (1, 1)
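Invocation:
List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
Collector<Integer, Long, Long> parallelMultiplier = new ParallelMultiplier();
Long result = intList.parallelStream().collect(parallelMultiplier);
That is, the multiplication result is 1, where it should have been 8 factorial. I am not using the 'concurrent' characteristic either.
Ideally, the multiplication results of the substreams should be fed into the combiner() operation, but that does not seem to be happening here.
Setting aside the inefficient boxing/unboxing in the implementation, any clues as to where I might have made a mistake?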
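As Flown said, Java's primitive wrapper types are immutable and cannot be used as accumulators: reassigning the lambda parameter never changes the container that the stream holds. Since you are computing the product in parallel, a thread-safe mutable counterpart to Long, namely AtomicLong, is a convenient choice. (Strictly speaking, a collector without the CONCURRENT characteristic gets a separate container per chunk, so any mutable holder would work.)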
import java.util.*;
import java.util.concurrent.atomic.*;
import java.util.function.*;
import java.util.stream.*;

public class ParallelMultiplier implements Collector<Integer, AtomicLong, Long> {
    @Override
    public BiConsumer<AtomicLong, Integer> accumulator() {
        // Mutate the container in place instead of reassigning the parameter.
        return (operand1, operand2) -> operand1.set(operand1.longValue() * operand2.longValue());
    }

    @Override
    public Set<java.util.stream.Collector.Characteristics> characteristics() {
        return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED));
    }

    @Override
    public BinaryOperator<AtomicLong> combiner() {
        // Merge two partial products into a new container.
        return (operand1, operand2) -> new AtomicLong(operand1.longValue() * operand2.longValue());
    }

    @Override
    public Function<AtomicLong, Long> finisher() {
        return l -> l.longValue();
    }

    @Override
    public Supplier<AtomicLong> supplier() {
        // 1 is the identity for multiplication.
        return () -> new AtomicLong(1);
    }
}
Tested against the input you provided, this gives the correct answer, 8! = 40320.
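For a quick check of the AtomicLong approach without writing a full class, the same four functions can be passed to Collector.of. This is only a sketch under assumptions: the class name AtomicProductDemo and the helper product are made up for illustration, and it uses accumulateAndGet rather than set, which is a stylistic swap and not something the answer above requires.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collector;

public class AtomicProductDemo {
    // Hypothetical helper: multiplies all elements using a mutable AtomicLong container.
    static long product(List<Integer> values) {
        Collector<Integer, AtomicLong, Long> multiplier = Collector.of(
                () -> new AtomicLong(1L),                              // supplier: fresh container, identity 1
                (acc, x) -> acc.accumulateAndGet(x, (a, b) -> a * b),  // accumulator: mutate in place
                (left, right) -> {                                     // combiner: fold right into left
                    left.accumulateAndGet(right.get(), (a, b) -> a * b);
                    return left;
                },
                AtomicLong::get);                                      // finisher: unwrap the result
        return values.parallelStream().collect(multiplier);
    }

    public static void main(String[] args) {
        System.out.println(product(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8))); // 40320
    }
}
```

Because the accumulator changes the state of the container rather than rebinding a local variable, the partial products survive until the combiner sees them.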
Your collector is a bit off. Here is a simplified version (for why yours does not work, see the end):
static class ParallelMultiplier implements Collector<Integer, long[], Long> {
    @Override
    public BiConsumer<long[], Integer> accumulator() {
        // The one-element array is the mutable container; update it in place.
        return (left, right) -> left[0] *= right;
    }

    @Override
    public BinaryOperator<long[]> combiner() {
        // Fold the right partial product into the left one and reuse the left array.
        return (left, right) -> {
            left[0] = left[0] * right[0];
            return left;
        };
    }

    @Override
    public Function<long[], Long> finisher() {
        return arr -> arr[0];
    }

    @Override
    public Supplier<long[]> supplier() {
        // 1 is the identity for multiplication.
        return () -> new long[] { 1L };
    }

    @Override
    public Set<java.util.stream.Collector.Characteristics> characteristics() {
        return Collections.unmodifiableSet(EnumSet.noneOf(Characteristics.class));
    }
}
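As an aside to the collector above: when the values involved are immutable, as Long is, an immutable reduction via reduce may be a simpler fit than a mutable Collector. The sketch below shows that alternative technique, not the collector from this answer; the class name ReduceProductDemo and the helper product are illustrative assumptions.

```java
import java.util.Arrays;
import java.util.List;

public class ReduceProductDemo {
    // Hypothetical helper: computes the product with an immutable reduction.
    static long product(List<Integer> values) {
        return values.parallelStream()
                .mapToLong(Integer::longValue)   // widen to long so the product does not overflow int
                .reduce(1L, (a, b) -> a * b);    // 1 is the identity; multiplication is associative
    }

    public static void main(String[] args) {
        System.out.println(product(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8))); // 40320
    }
}
```

Here each step returns a new value instead of mutating a container, so there is nothing to reassign by mistake.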
Your problem can be illustrated like this:
static Long test(Long left, Long right) {
    left = left * right; // rebinds the local parameter only
    return left;
}

long l = 12L;
long r = 13L;
test(l, r);
System.out.println(l); // still 12
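The same point can be made as a small runnable contrast between reassigning a parameter and mutating the object it refers to. This is a sketch for illustration only; ReassignDemo, multiplyByReassign and multiplyInPlace are hypothetical names that do not appear in the question or answers.

```java
public class ReassignDemo {
    // Reassigning the parameter only rebinds the local variable;
    // the caller's Long is untouched, as in the question's accumulator.
    static void multiplyByReassign(Long acc, int x) {
        acc = acc * x;
    }

    // Writing into the array changes the object the caller also holds,
    // as in the long[] collector.
    static void multiplyInPlace(long[] acc, int x) {
        acc[0] *= x;
    }

    public static void main(String[] args) {
        Long boxed = 1L;
        multiplyByReassign(boxed, 4);
        System.out.println(boxed);     // 1

        long[] holder = { 1L };
        multiplyInPlace(holder, 4);
        System.out.println(holder[0]); // 4
    }
}
```

This matches the observed output in the question: every accumulator call starts from 1, and the combiner only ever sees (1, 1).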