Java 8 流处理不流畅

Java 8 stream processing not fluent

我在 Java 8 个流中遇到问题,其中数据是突然批量处理的,而不是在请求时处理。我有一个相当复杂的流,必须并行化,因为我使用 concat 合并两个流。

我的问题源于这样一个事实,即数据似乎在几分钟内——有时甚至几小时内——被大量解析。我希望这个处理在 Stream 读取传入数据时立即发生,以分散工作量。批量处理似乎在几乎所有方面都违反直觉。

所以,问题是为什么会发生这种批量回收以及我该如何避免它。

我的输入是一个未知大小的 Spliterator,我使用 forEach 作为终端操作。

并行流的基本原则是 encounter 顺序不必与 processing 顺序匹配。如有必要,这可以在组装正确排序的结果时同时处理子列表或子树的项目。这明确地允许批量处理,甚至强制要求 ordered 流的并行处理。

此行为由 SpliteratortrySplit 实现的特定实现决定。 specification 表示:

If this Spliterator is ORDERED, the returned Spliterator must cover a strict prefix of the elements

API Note:

An ideal trySplit method efficiently (without traversal) divides its elements exactly in half, allowing balanced parallel computation.


为什么这个策略在规范中固定而不是,例如even/odd 分裂?

好吧,考虑一个简单的用例。列表将被过滤并收集到新列表中,因此必须保留遇到顺序。使用前缀规则,它很容易实现。拆分出一个前缀,同时过滤两个块,然后将前缀过滤的结果添加到新列表中,然后添加过滤后的后缀。

用奇数策略,那是不可能的。你可以同时过滤这两个部分,但之后,你不知道如何正确地连接结果,除非你在整个操作过程中跟踪每个项目的位置。

即便如此,加入这些装备物品也会比每个区块执行 addAll 复杂得多。


您可能已经注意到,这仅适用于您可能必须保留的遭遇顺序。如果您的拆分器未报告 ORDERED 特征,则不需要 return 前缀。尽管如此,您可能从 AbstractSpliterator 继承的默认实现旨在与有序拆分器兼容。因此,如果你想要不同的策略,你必须自己实现拆分操作。

或者您使用不同的方式实现无序流,例如

Stream.generate(()->{
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
    return Thread.currentThread().getName();
}).parallel().forEach(System.out::println);

可能更接近您的预期。