Java 8 stream 懒惰在实践中没有用吗?

Is Java 8 stream laziness useless in practice?

我最近读了很多关于 Java 8 流的文章,还有几篇关于使用 Java 8 流进行延迟加载的文章,特别是:here and over here。我似乎无法摆脱延迟加载完全无用的感觉(或者充其量是提供零性能价值的次要语法便利)。

我们以这段代码为例:

int[] myInts = new int[]{1,2,3,5,8,13,21};

IntStream myIntStream = IntStream.of(myInts);

int[] myChangedArray = myIntStream
                        .peek(n -> System.out.println("About to square: " + n))
                        .map(n -> (int)Math.pow(n, 2))
                        .peek(n -> System.out.println("Done squaring, result: " + n))
                        .toArray();

这将登录控制台,因为调用了 terminal operation,在本例中为 toArray(),并且我们的流是惰性的,仅在调用终端操作时才执行。当然我也可以这样做:

  IntStream myChangedInts = myIntStream
    .peek(n -> System.out.println("About to square: " + n))
    .map(n -> (int)Math.pow(n, 2))
    .peek(n -> System.out.println("Done squaring, result: " + n));

并且不会打印任何内容,因为地图没有生成,因为我不需要数据。直到我这样称呼:

  int[] myChangedArray = myChangedInts.toArray();

瞧,我得到了映射数据和控制台日志。除了我认为它没有任何好处。我意识到我可以在调用 toArray() 之前很久 定义 过滤器代码,并且我可以绕过这个“没有真正过滤的流”,但是那又怎样?是这是唯一的好处?

这些文章似乎暗示与懒惰相关的性能提升,例如:

In the Java 8 Streams API, the intermediate operations are lazy and their internal processing model is optimized to make it being capable of processing the large amount of data with high performance.

Java 8 Streams API optimizes stream processing with the help of short circuiting operations. Short Circuit methods ends the stream processing as soon as their conditions are satisfied. In normal words short circuit operations, once the condition is satisfied just breaks all of the intermediate operations, lying before in the pipeline. Some of the intermediate as well as terminal operations have this behavior.

这听起来确实像是跳出循环,与懒惰完全无关。

最后,在第二篇文章中出现了这句令人费解的台词:

Lazy operations achieve efficiency. It is a way not to work on stale data. Lazy operations might be useful in the situations where input data is consumed gradually rather than having whole complete set of elements beforehand. For example consider the situations where an infinite stream has been created using Stream#generate(Supplier<T>) and the provided Supplier function is gradually receiving data from a remote server. In those kind of the situations server call will only be made at a terminal operation when it's needed.

不处理陈旧数据?什么?延迟加载如何防止某人处理过时数据?


TLDR:除了能够在以后 运行 filter/map/reduce/whatever 操作(提供零性能优势)之外,延迟加载还有什么好处吗?

如果是这样,真实世界的用例是什么?

你是对的,map().reduce()map().collect() 不会带来好处,但 findAny() findFirst()anyMatch(), allMatch(), 等等,基本上可以短路的任何操作。

您的终端操作 toArray() 可能支持您的论点,因为它需要流的所有元素。

有些终端操作不会。对于这些,如果不延迟执行流,那将是一种浪费。两个例子:

//example 1: print first element of 1000 after transformations
IntStream.range(0, 1000)
    .peek(System.out::println)
    .mapToObj(String::valueOf)
    .peek(System.out::println)
    .findFirst()
    .ifPresent(System.out::println);

//example 2: check if any value has an even key
boolean valid = records.
    .map(this::heavyConversion)
    .filter(this::checkWithWebService)
    .mapToInt(Record::getKey)
    .anyMatch(i -> i % 2 == 0)

第一个流将打印:

0
0
0

也就是说,中间操作将 运行 仅针对一个元素。这是一个重要的优化。如果它不是懒惰的,那么所有 peek() 调用都必须对所有元素进行 运行 ( 绝对没有必要 因为你只对一个元素感兴趣) .中间操作可能很昂贵(例如在第二个示例中)

短路端子操作(其中 toArray 不是)使此优化成为可能。

好问题。

假设您编写教科书般完美的代码,适当优化的 forstream 之间的性能差异并不明显(流在 class 加载方面往往略好,但在大多数情况下差异不明显。

考虑以下示例。

// Some lengthy computation
private static int doStuff(int i) {
    try { Thread.sleep(1000); } catch (InterruptedException e) { }
    return i;
}

public static OptionalInt findFirstGreaterThanStream(int value) {
    return IntStream
            .of(MY_INTS)
            .map(Main::doStuff)
            .filter(x -> x > value)
            .findFirst();
}

public static OptionalInt findFirstGreaterThanFor(int value) {
    for (int i = 0; i < MY_INTS.length; i++) {
        int mapped = Main.doStuff(MY_INTS[i]);
        if(mapped > value){
            return OptionalInt.of(mapped);
        }
    }
    return OptionalInt.empty();
}

鉴于上述方法,下一个测试应该显示它们大约在同一时间执行。

public static void main(String[] args) {
    long begin;
    long end;

    begin = System.currentTimeMillis();
    System.out.println(findFirstGreaterThanStream(5));
    end = System.currentTimeMillis();
    System.out.println(end-begin);

    begin = System.currentTimeMillis();
    System.out.println(findFirstGreaterThanFor(5));
    end = System.currentTimeMillis();
    System.out.println(end-begin);
}

OptionalInt[8]

5119

OptionalInt[8]

5001

反正我们大部分时间都花在了doStuff方法上。假设我们想要添加更多线程。

调整流方法很简单(考虑到您的操作满足并行流的前提条件)。

public static OptionalInt findFirstGreaterThanParallelStream(int value) {
    return IntStream
            .of(MY_INTS)
            .parallel()
            .map(Main::doStuff)
            .filter(x -> x > value)
            .findFirst();
}

在没有流的情况下实现相同的行为可能很棘手。

public static OptionalInt findFirstGreaterThanParallelFor(int value, Executor executor) {
    AtomicInteger counter = new AtomicInteger(0);

    CompletableFuture<OptionalInt> cf = CompletableFuture.supplyAsync(() -> {
        while(counter.get() != MY_INTS.length-1);
        return OptionalInt.empty();
    });

    for (int i = 0; i < MY_INTS.length; i++) {
        final int current = MY_INTS[i];
        executor.execute(() -> {
            int mapped = Main.doStuff(current);
            if(mapped > value){
                cf.complete(OptionalInt.of(mapped));
            } else {
                counter.incrementAndGet();
            }
        });
    }

    try {
        return cf.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
        return OptionalInt.empty();
    }
}

测试再次大约在同一时间执行。

public static void main(String[] args) {
    long begin;
    long end;

    begin = System.currentTimeMillis();
    System.out.println(findFirstGreaterThanParallelStream(5));
    end = System.currentTimeMillis();
    System.out.println(end-begin);

    ExecutorService executor = Executors.newFixedThreadPool(10);
    begin = System.currentTimeMillis();
    System.out.println(findFirstGreaterThanParallelFor(5678, executor));
    end = System.currentTimeMillis();
    System.out.println(end-begin);

    executor.shutdown();
    executor.awaitTermination(10, TimeUnit.SECONDS);
    executor.shutdownNow();
}

OptionalInt[8]

1004

OptionalInt[8]

1004

总而言之,尽管我们没有从流中获得很大的性能优势(考虑到您在 for 替代方案中编写了优秀的多线程代码),代码本身往往更易于维护。

一个(有点跑题)最后的说明:

与编程语言一样,更高级别的抽象(streams 相对于 fors)以牺牲性能为代价使东西更容易开发。我们并没有从汇编语言转向过程语言,再转向面向对象语言,因为后者提供了更好的性能。我们搬家是因为它让我们更有效率(以更低的成本开发同样的东西)。如果您能够从流中获得与使用 for 和正确编写的多线程代码相同的性能,我会说这已经是一个胜利。

我有一个来自我们代码库的真实示例,因为我将对其进行简化,所以不能完全确定您是否会喜欢它或完全掌握它...

我们有一项服务需要 List<CustomService>,我想调用它。现在为了调用它,我将进入一个数据库(比现实简单得多)并获得一个 List<DBObject>;为了从中获得 List<CustomService>,需要进行一些繁重的转换。

这里是我的选择,原地改造,通过榜单。简单,但可能不是那么理想。第二个选项,重构服务,接受 List<DBObject>Function<DBObject, CustomService>。这听起来微不足道,但它使 懒惰 (除其他外)。该服务有时可能只需要该列表中的几个元素,或者有时需要 属性 的 max,等等 - 因此我不需要对 所有元素进行大量转换,这就是 Stream API 基于拉动的懒惰是赢家的地方。

在 Streams 出现之前,我们曾经使用 guava。它 Lists.transform( list, function) 也很懒惰。

这不是流本身的基本特征,即使没有番石榴也可以完成,但那样就简单多了。 findFirst 提供的例子很好,最容易理解;这就是整个惰性点,元素仅在需要时才被拉取,它们不会以块的形式从中间操作传递到另一个操作,而是一次从一个阶段传递到另一个阶段。

Laziness 对 API 的用户非常有用,尤其是当 Stream 管道评估的最终结果可能非常大时!

最简单的例子是 Java API 本身的 Files.lines 方法。如果您不想将整个文件读入内存并且只需要前 N 行,那么只需写:

Stream<String> stream = Files.lines(path); // lazy operation

List<String> result = stream.limit(N).collect(Collectors.toList()); // read and collect

一个尚未提及的有趣用例是流操作的任意组合,来自代码库的不同部分,响应不同类型的业务或技术要求。

例如,假设您有一个应用程序,其中某些用户可以看到所有数据,而其他某些用户只能看到其中的一部分。检查用户权限的代码部分可以简单地对正在传递的任何流施加过滤器。

如果没有惰性流,代码的同一部分可能会过滤已经实现的完整集合,但这可能是昂贵的,没有真正的收益。

或者,代码的同一部分可能想要将其过滤器附加到数据源,但现在它必须知道数据是否来自数据库,因此它可以强加一个额外的 WHERE 子句,或其他一些来源。

对于惰性流,它是一个可以以任何方式实现的过滤器。对来自数据库的流施加的过滤器可以转化为上述 WHERE 子句,与过滤整个 table 读取产生的内存中集合相比,性能明显提高。

所以,更好的抽象、更好的性能、更好的代码可读性和可维护性,对我来说听起来像是一个胜利。 :)

非惰性实现将处理所有输入并将输出收集到每个操作的新集合中。很显然,无限大的source是不可能的,否则很耗内存,在reduce和short-circuiting操作的时候又是不必要的耗内存,所以有很大的好处。

检查下面的例子

Stream.of("0","0","1","2","3","4")
                .distinct()
                .peek(a->System.out.println("after distinct: "+a))
                .anyMatch("1"::equals);

如果它不表现得像惰性 ,您会认为所有元素都会首先通过 distinct 过滤。但是由于惰性执行,它的行为有所不同。它将流式传输计算结果所需的最少元素量。

上面的例子会打印

after distinct: 0
after distinct: 1

分析工作原理:

首先"0"一直走到终端操作却不满足。必须流式传输另一个元素。

第二个 "0" 通过 .distinct() 过滤,永远不会到达终端操作。

终端操作还没满足,流下一个元素

"1"经过终端操作,满足

不需要流式处理更多元素。