从并行流中收集结果
Collect results from parallel stream
我有一段代码是这样的:
List<Egg> eggs = hens.parallelStream().map(hen -> {
ArrayList<Egg> eggs = new ArrayList<>();
while (hen.hasEgg()) {
eggs.add(hen.getEgg());
}
return eggs;
}).flatMap(Collection::stream).collect(Collectors.toList());
但是这样我就必须为每只母鸡创建一个 ArrayList,并且直到一只母鸡被 100% 处理后才会收集鸡蛋。我想要这样的东西:
List<Egg> eggs = hens.parallelStream().map(hen -> {
while (hen.hasEgg()) {
yield return hen.getEgg();
}
}).collect(Collectors.toList());
但是Java没有产量return。有实现的方法吗?
假设存在一个getEggs()
方法,你可以使用下面的方法来收集所有的鸡蛋。
List<Egg> eggs = hens.parallelStream()
.filter(Hen::hasEggs)
.map(Hen::getEggs)
.collect(ArrayList::new, ArrayList::addAll, ArrayList::addAll);
代码假定 getEggs()
returns 一个 Collection
。当 Hen
没有 Eggs
.
时,您可以消除 filter(Hen::hasEggs)
if getEggs()
returns 一个空的 Collection
您的 Hen
class 不适合 Stream API。如果你不能改变它并且它没有其他有用的方法(比如Collection<Egg> getAllEggs()
或Iterator<Egg> eggIterator()
),你可以像这样创建一个鸡蛋流:
public static Stream<Egg> eggs(Hen hen) {
Iterator<Egg> it = new Iterator<Egg>() {
@Override
public boolean hasNext() {
return hen.hasEgg();
}
@Override
public Egg next() {
return hen.getEgg();
}
};
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, 0), false);
}
现在您可以通过以下方式使用它:
List<Egg> eggs = hens.parallelStream()
.flatMap(hen -> eggs(hen))
.collect(Collectors.toList());
当然,如果您可以更改 Hen
class.
,则可能会实现更好的 Stream
使用 hasEgg()
和 getEgg()
的迭代逻辑是 stateful 因为这些方法的结果取决于之前的调用。因此,除非您设法完全更改接口,否则无法并行处理单个 Hen
。
也就是说,您担心 ArrayList
是不必要的。当流实现并行执行 collect
操作时,无论如何它都必须缓冲每个线程的值,然后合并这些缓冲区。甚至可能是操作根本无法从并行执行中获益。
你可以做的是用 Stream.Builder
替换 ArrayList
因为它针对只添加直到构造 Stream
:
的用例进行了优化
List<Egg> eggs = hens.parallelStream().flatMap(hen -> {
Stream.Builder<Egg> eggStream = Stream.builder();
while(hen.hasEgg()) {
eggStream.add(hen.getEgg());
}
return eggStream.build();
}).collect(Collectors.toList());
我有一段代码是这样的:
List<Egg> eggs = hens.parallelStream().map(hen -> {
ArrayList<Egg> eggs = new ArrayList<>();
while (hen.hasEgg()) {
eggs.add(hen.getEgg());
}
return eggs;
}).flatMap(Collection::stream).collect(Collectors.toList());
但是这样我就必须为每只母鸡创建一个 ArrayList,并且直到一只母鸡被 100% 处理后才会收集鸡蛋。我想要这样的东西:
List<Egg> eggs = hens.parallelStream().map(hen -> {
while (hen.hasEgg()) {
yield return hen.getEgg();
}
}).collect(Collectors.toList());
但是Java没有产量return。有实现的方法吗?
假设存在一个getEggs()
方法,你可以使用下面的方法来收集所有的鸡蛋。
List<Egg> eggs = hens.parallelStream()
.filter(Hen::hasEggs)
.map(Hen::getEggs)
.collect(ArrayList::new, ArrayList::addAll, ArrayList::addAll);
代码假定 getEggs()
returns 一个 Collection
。当 Hen
没有 Eggs
.
filter(Hen::hasEggs)
if getEggs()
returns 一个空的 Collection
您的 Hen
class 不适合 Stream API。如果你不能改变它并且它没有其他有用的方法(比如Collection<Egg> getAllEggs()
或Iterator<Egg> eggIterator()
),你可以像这样创建一个鸡蛋流:
public static Stream<Egg> eggs(Hen hen) {
Iterator<Egg> it = new Iterator<Egg>() {
@Override
public boolean hasNext() {
return hen.hasEgg();
}
@Override
public Egg next() {
return hen.getEgg();
}
};
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, 0), false);
}
现在您可以通过以下方式使用它:
List<Egg> eggs = hens.parallelStream()
.flatMap(hen -> eggs(hen))
.collect(Collectors.toList());
当然,如果您可以更改 Hen
class.
Stream
使用 hasEgg()
和 getEgg()
的迭代逻辑是 stateful 因为这些方法的结果取决于之前的调用。因此,除非您设法完全更改接口,否则无法并行处理单个 Hen
。
也就是说,您担心 ArrayList
是不必要的。当流实现并行执行 collect
操作时,无论如何它都必须缓冲每个线程的值,然后合并这些缓冲区。甚至可能是操作根本无法从并行执行中获益。
你可以做的是用 Stream.Builder
替换 ArrayList
因为它针对只添加直到构造 Stream
:
List<Egg> eggs = hens.parallelStream().flatMap(hen -> {
Stream.Builder<Egg> eggStream = Stream.builder();
while(hen.hasEgg()) {
eggStream.add(hen.getEgg());
}
return eggStream.build();
}).collect(Collectors.toList());