How to short-circuit a reduce() operation on a Stream?
This is essentially the same question as How to short-circuit reduce on Stream?. However, since that question focuses on a stream of boolean values and its answers cannot be generalized to other types and reduce operations, I would like to ask the more general question.
How can we reduce a stream so that it short-circuits as soon as it encounters an absorbing element for the reduction operation?
The typical mathematical case is 0 for multiplication. This stream:
int product = IntStream.of(2, 3, 4, 5, 0, 7, 8)
        .reduce(1, (a, b) -> a * b);
will consume the last two elements (7 and 8) regardless of the fact that once 0 has been encountered the product is already known.
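A quick way to observe this is to add a peek() to the pipeline above (a small check of my own, not part of the question):

int product = IntStream.of(2, 3, 4, 5, 0, 7, 8)
        .peek(i -> System.out.println("consumed: " + i))
        .reduce(1, (a, b) -> a * b);

Run sequentially, this prints all seven elements, including 7 and 8, even though the partial product is already 0 after the fifth element.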
Unfortunately the Stream API has only limited capabilities for creating your own short-circuit operations. A not-so-clean solution is to throw a RuntimeException and catch it. Here is an implementation for IntStream, but it can be generalized for other stream types as well:
public static int reduceWithCancelEx(IntStream stream, int identity,
        IntBinaryOperator combiner, IntPredicate cancelCondition) {
    // Local exception type used purely as a control-flow signal carrying the result.
    class CancelException extends RuntimeException {
        private final int val;
        CancelException(int val) {
            this.val = val;
        }
    }
    try {
        return stream.reduce(identity, (a, b) -> {
            int res = combiner.applyAsInt(a, b);
            // Abort the reduction as soon as the partial result satisfies the cancel condition.
            if (cancelCondition.test(res))
                throw new CancelException(res);
            return res;
        });
    } catch (CancelException e) {
        return e.val;
    }
}
Usage example:
int product = reduceWithCancelEx(
        IntStream.of(2, 3, 4, 5, 0, 7, 8).peek(System.out::println),
        1, (a, b) -> a * b, val -> val == 0);
System.out.println("Result: " + product);
Output:
2
3
4
5
0
Result: 0
Note that even though it works with parallel streams, there is no guarantee that the other parallel tasks will finish immediately when one of them throws an exception. Sub-tasks that have already started will likely run to completion, so you may end up processing more elements than expected.
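Since the same trick can be generalized to other stream types, here is a rough sketch of what a generic Stream<T> variant might look like (my own hypothetical code; the result is carried as Object so that the signalling exception itself stays non-generic):

public static <T> T reduceWithCancelEx(Stream<T> stream, T identity,
        BinaryOperator<T> accumulator, Predicate<T> cancelCondition) {
    // Plain (non-generic) exception used only as a control-flow signal.
    class CancelException extends RuntimeException {
        final Object val;
        CancelException(Object val) {
            this.val = val;
        }
    }
    try {
        return stream.reduce(identity, (a, b) -> {
            T res = accumulator.apply(a, b);
            if (cancelCondition.test(res))
                throw new CancelException(res);
            return res;
        });
    } catch (CancelException e) {
        @SuppressWarnings("unchecked")
        T result = (T) e.val;
        return result;
    }
}

The same caveat about parallel sub-tasks applies here as well.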
Update: here's an alternative solution which is much longer, but more parallel-friendly. It is based on a custom spliterator which returns at most one element, namely the result of accumulating all the underlying elements. When used in sequential mode it does all the work in a single tryAdvance call. When it is split, each part produces its own single partial result, and these partial results are reduced by the Stream engine using the combiner function. Here is the generic version, but a primitive specialization is possible as well:
final static class CancellableReduceSpliterator<T, A> implements Spliterator<A>,
        Consumer<T>, Cloneable {
    private Spliterator<T> source;
    private final BiFunction<A, ? super T, A> accumulator;
    private final Predicate<A> cancelPredicate;
    private final AtomicBoolean cancelled = new AtomicBoolean();
    private A acc;

    CancellableReduceSpliterator(Spliterator<T> source, A identity,
            BiFunction<A, ? super T, A> accumulator, Predicate<A> cancelPredicate) {
        this.source = source;
        this.acc = identity;
        this.accumulator = accumulator;
        this.cancelPredicate = cancelPredicate;
    }

    @Override
    public boolean tryAdvance(Consumer<? super A> action) {
        if (source == null || cancelled.get()) {
            source = null;
            return false;
        }
        while (!cancelled.get() && source.tryAdvance(this)) {
            if (cancelPredicate.test(acc)) {
                cancelled.set(true);
                break;
            }
        }
        source = null;
        action.accept(acc);
        return true;
    }

    @Override
    public void forEachRemaining(Consumer<? super A> action) {
        tryAdvance(action);
    }

    @Override
    public Spliterator<A> trySplit() {
        if (source == null || cancelled.get()) {
            source = null;
            return null;
        }
        Spliterator<T> prefix = source.trySplit();
        if (prefix == null)
            return null;
        try {
            @SuppressWarnings("unchecked")
            CancellableReduceSpliterator<T, A> result =
                (CancellableReduceSpliterator<T, A>) this.clone();
            result.source = prefix;
            return result;
        } catch (CloneNotSupportedException e) {
            throw new InternalError();
        }
    }

    @Override
    public long estimateSize() {
        // let's pretend we have the same number of elements
        // as the source, so the pipeline engine parallelizes it in the same way
        return source == null ? 0 : source.estimateSize();
    }

    @Override
    public int characteristics() {
        return source == null ? SIZED : source.characteristics() & ORDERED;
    }

    @Override
    public void accept(T t) {
        this.acc = accumulator.apply(this.acc, t);
    }
}
Methods analogous to Stream.reduce(identity, accumulator, combiner) and Stream.reduce(identity, combiner), but with a cancelPredicate:
public static <T, U> U reduceWithCancel(Stream<T> stream, U identity,
        BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner,
        Predicate<U> cancelPredicate) {
    return StreamSupport
            .stream(new CancellableReduceSpliterator<>(stream.spliterator(), identity,
                    accumulator, cancelPredicate), stream.isParallel())
            .reduce(combiner)
            .orElse(identity);
}

public static <T> T reduceWithCancel(Stream<T> stream, T identity,
        BinaryOperator<T> combiner, Predicate<T> cancelPredicate) {
    return reduceWithCancel(stream, identity, combiner, combiner, cancelPredicate);
}
Let's test both versions and count how many elements are actually processed, putting the 0 close to the end. The exception-based version:
AtomicInteger count = new AtomicInteger();
int product = reduceWithCancelEx(
        IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0)
                .parallel().peek(i -> count.incrementAndGet()), 1,
        (a, b) -> a * b, x -> x == 0);
System.out.println("product: " + product + "/count: " + count);
Thread.sleep(1000);
System.out.println("product: " + product + "/count: " + count);
Typical output:
product: 0/count: 281721
product: 0/count: 500001
So while the result is returned after only some of the elements have been processed, the tasks keep working in the background and the counter keeps increasing. Here's the spliterator version:
AtomicInteger count = new AtomicInteger();
int product = reduceWithCancel(
        IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0)
                .parallel().peek(i -> count.incrementAndGet()).boxed(),
        1, (a, b) -> a * b, x -> x == 0);
System.out.println("product: " + product + "/count: " + count);
Thread.sleep(1000);
System.out.println("product: " + product + "/count: " + count);
Typical output:
product: 0/count: 281353
product: 0/count: 281353
Here all the tasks have actually finished by the time the result is returned.
A general short-circuiting static reduce method can be implemented using the spliterator of a stream. It even turned out to be not very complicated! Using spliterators seems to be the way to go quite often when one wants to work with streams in a more flexible way.
public static <T> T reduceWithCancel(Stream<T> s, T acc, BinaryOperator<T> op,
        Predicate<? super T> cancelPred) {
    BoxConsumer<T> box = new BoxConsumer<T>();
    Spliterator<T> splitr = s.spliterator();
    // Pull elements one at a time and stop as soon as the cancel predicate holds.
    while (!cancelPred.test(acc) && splitr.tryAdvance(box)) {
        acc = op.apply(acc, box.value);
    }
    return acc;
}

public static class BoxConsumer<T> implements Consumer<T> {
    T value = null;
    public void accept(T t) {
        value = t;
    }
}
Usage:
int product = reduceWithCancel(
        Stream.of(1, 2, 0, 3, 4).peek(System.out::println),
        1, (acc, i) -> acc * i, i -> i == 0);
System.out.println("Result: " + product);
Output:
1
2
0
Result: 0
The method could be generalized to perform other kinds of terminal operations.
This is loosely based on this answer about a take-while operation.
I have no idea about its parallelization potential.
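For instance, the same element-pulling technique could back a cancellable forEach (a hypothetical sketch of my own, reusing the BoxConsumer above; not from the original answer):

public static <T> void forEachWithCancel(Stream<T> s, Consumer<? super T> action,
        Predicate<? super T> cancelPred) {
    BoxConsumer<T> box = new BoxConsumer<T>();
    Spliterator<T> splitr = s.spliterator();
    // Pull elements one at a time and stop as soon as the cancel predicate matches.
    while (splitr.tryAdvance(box)) {
        if (cancelPred.test(box.value))
            break;
        action.accept(box.value);
    }
}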
My own take on this is not to use reduce() per se, but to use an existing short-circuiting terminal operation. noneMatch() or allMatch() can be used for this purpose with a side-effecting predicate. Admittedly this isn't the cleanest solution either, but it does achieve the goal:
AtomicInteger product = new AtomicInteger(1);
IntStream.of(2, 3, 4, 5, 0, 7, 8)
        .peek(System.out::println)
        .noneMatch(i -> {
            if (i == 0) {
                product.set(0);
                return true;
            }
            int oldValue = product.get();
            while (oldValue != 0 && !product.compareAndSet(oldValue, i * oldValue)) {
                oldValue = product.get();
            }
            return oldValue == 0;
        });
System.out.println("Result: " + product.get());
System.out.println("Result: " + product.get());
It short-circuits and can be made parallel.
With takeWhile, introduced in Java 9, it can be done like this:
int[] last = {1};
int product = IntStream.of(2, 3, 4, 5, 0, 7, 8)
        .takeWhile(i -> last[0] != 0)
        .reduce(1, (a, b) -> (last[0] = a) * b);
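For reference, adding a peek() after the takeWhile (a check of my own, not part of the original answer) shows which elements are actually reduced. Since last[0] holds the previous partial result, the pipeline stops one element after the 0:

int[] last = {1};
int product = IntStream.of(2, 3, 4, 5, 0, 7, 8)
        .takeWhile(i -> last[0] != 0)
        .peek(System.out::println)
        .reduce(1, (a, b) -> (last[0] = a) * b);
System.out.println("Result: " + product);

Run sequentially, this should print 2, 3, 4, 5, 0 and 7, followed by Result: 0, so the trailing 8 is never consumed.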