Java 8 中如何实现惰性流?

How are lazy streams implemented in Java 8?

我正在阅读 Java 8,特别是 "Streams API"。我想知道流是如何惰性化的?

我相信流只是作为一个库添加的,并且没有对语言进行任何更改以支持惰性。另外,如果有人告诉我这是通过反思实现的,我会感到震惊。

没有反射或代理。反射和代理会带来性能成本,除非没有替代方案并且性能在 Java.

中排名第一,否则应该避免这种性能成本。

使懒惰成为可能的是做事的实用风格。基本上,stream 以源(例如:列表)、中间操作数(例如:过滤器、映射..)和终端操作(例如:计数、求和等)开始。 中间步骤延迟执行,因为您传递在管道中链接的函数 (lambda),以便在终端步骤执行。

ex: filter(Predicate<? super T>)
本例中的

filter 需要一个函数来告诉我们流中的对象是否满足某些条件。

许多来自 Java7 的功能已被用于提高效率。例如:调用动态来执行 lambda 而不是代理或匿名内部 类 和 ForkJoin 池来并行执行。

如果您对 Java 8 个内部机制感兴趣,那么您必须观看该领域专家 Brian Goetz 的演讲,它在 Youtube.

为什么你需要反省才能变得懒惰?例如,考虑这个 class:

class LazySeq<T> {

    private final List<T> list;
    private Predicate<? super T> predicate;

    public LazySeq(List<T> input) {
        this.list = new ArrayList<>(input);
    }

    //Here you just store the predicate, but you don't perform a filtering
    //You could also return a new LazySeq with a new state
    public LazySeq<T> filter(Predicate<? super T> predicate) {
        this.predicate = predicate;
        return this;
    }

    public void forEach(Consumer<? super T> consumer){
        if(predicate == null) {
            list.forEach(consumer);
        } else {
            for(T elem : list) {
                if(predicate.test(elem)) {
                    consumer.accept(elem);
                }
            }
        }
    }
}

当您在惰性序列上调用 filter 时,过滤不会立即发生,例如:

LazySeq<Integer> lazySeq = new LazySeq<>(Arrays.asList(1, 2, 3, 4));
lazySeq = lazySeq.filter(i -> i%2 == 0);

如果您在调用过滤器后查看序列的内容,您会发现它始终是 1, 2, 3, 4。但是,当调用终端操作时,例如 forEach,则将在使用消费者之前完成过滤。例如:

lazySeq.filter(i -> i%2 == 0).forEach(System.out::println);

将打印 2 和 4。

这个和Streams是一样的道理。从一个来源,您可以链接具有某些属性的操作。这些操作要么是中间的,return 是惰性流(例如 filtermap),要么是终端的(例如 forEach)。其中一些终端操作是短路的(例如 findFirst),因此您可能不会遍历所有管道(您可以想到 return 的 for 循环中的早期 return例如数组中值的索引)。

调用终端操作时,此操作链开始执行,以便最终获得预期结果。

惰性可以通过在应用中间操作时在管道上存储新状态来实现,而当您调用终端操作时,您可以逐一处理数据中的所有状态。

Stream API 并没有真正以这种方式实现(它有点复杂)但实际上原理就在这里。

streaming 不是数据的容器,而是逻辑的容器。你只需要传入一个接口实例来记住逻辑。

考虑以下代码:

class FilterIterable<T> implements Iterable<T>
{
    private Iterable<? extends T> iter;
    private Predicate<? super T> pred;

    public FilterIterable(Iterable<? extends T> iter, Predicate<? super T> pred) {
        this.iter = iter;
        this.pred = pred;
    }

    public Iterator<T> iterator() {
        return FilterIterator<T>();
    }

    class FilterIterator<T> implements Iterator<T>
    {
        private Iterator<? extends T> iterator = iter.iterator();
        private T next = null;

        FilterIterator() {
            getNext();
        }

        private void getNext() {
            next = null;
            while (iterator.hasNext()) {
                T temp = iterator.next();
                if (pred.test(temp)) {
                    next = temp;
                    break;
                }
            }
        }

        public boolean hasNext() { return next != null; }

        public T next() {
            T temp = next;
            getNext();
            return temp;
        }       
    }
}

逻辑包装在 pred 中,但仅在对象被迭代时调用。事实上,这个 class 不存储任何数据,它只保留一个可能包含数据的可迭代对象,甚至只是另一个逻辑持有者本身。

并且元素也会按需返回。这种范式使所谓的流 api 懒惰。