从并行流中收集结果

Collect results from parallel stream

我有一段代码是这样的:

List<Egg> eggs = hens.parallelStream().map(hen -> {
    ArrayList<Egg> eggs = new ArrayList<>();
    while (hen.hasEgg()) {
        eggs.add(hen.getEgg());
    }
    return eggs;
}).flatMap(Collection::stream).collect(Collectors.toList());

但是这样我就必须为每只母鸡创建一个 ArrayList,并且直到一只母鸡被 100% 处理后才会收集鸡蛋。我想要这样的东西:

List<Egg> eggs = hens.parallelStream().map(hen -> {
    while (hen.hasEgg()) {
        yield return hen.getEgg();
    }
}).collect(Collectors.toList());

但是Java没有产量return。有实现的方法吗?

假设存在一个getEggs()方法,你可以使用下面的方法来收集所有的鸡蛋。

List<Egg> eggs = hens.parallelStream()
    .filter(Hen::hasEggs)
    .map(Hen::getEggs)
    .collect(ArrayList::new, ArrayList::addAll, ArrayList::addAll);

代码假定 getEggs() returns 一个 Collection。当 Hen 没有 Eggs.

时,您可以消除 filter(Hen::hasEggs) if getEggs() returns 一个空的 Collection

您的 Hen class 不适合 Stream API。如果你不能改变它并且它没有其他有用的方法(比如Collection<Egg> getAllEggs()Iterator<Egg> eggIterator()),你可以像这样创建一个鸡蛋流:

public static Stream<Egg> eggs(Hen hen) {
    Iterator<Egg> it = new Iterator<Egg>() {
        @Override
        public boolean hasNext() {
            return hen.hasEgg();
        }

        @Override
        public Egg next() {
            return hen.getEgg();
        }
    };
    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, 0), false);
}

现在您可以通过以下方式使用它:

List<Egg> eggs = hens.parallelStream()
                     .flatMap(hen -> eggs(hen))
                     .collect(Collectors.toList());

当然,如果您可以更改 Hen class.

,则可能会实现更好的 Stream

使用 hasEgg()getEgg() 的迭代逻辑是 stateful 因为这些方法的结果取决于之前的调用。因此,除非您设法完全更改接口,否则无法并行处理单个 Hen

也就是说,您担心 ArrayList 是不必要的。当流实现并行执行 collect 操作时,无论如何它都必须缓冲每个线程的值,然后合并这些缓冲区。甚至可能是操作根本无法从并行执行中获益。

你可以做的是用 Stream.Builder 替换 ArrayList 因为它针对只添加直到构造 Stream:

的用例进行了优化
List<Egg> eggs = hens.parallelStream().flatMap(hen -> {
    Stream.Builder<Egg> eggStream = Stream.builder();
    while(hen.hasEgg()) {
        eggStream.add(hen.getEgg());
    }
    return eggStream.build();
}).collect(Collectors.toList());