如何从 akka 流源中获取迭代器?

How to get an iterator from an akka streams Source?

我正在尝试创建一个流,我可以通过 Iterator 之类的东西使用它。 我正在实现一个公开类似迭代器接口的库,所以这将是我使用的最简单的东西。

到目前为止我设计的图形基本上是 Source<Iterator<DataRow>>。到目前为止我看到的一件事是将它展平为 Source<DataRow> 然后使用 http://doc.akka.io/japi/akka/current/akka/stream/javadsl/StreamConverters.html#asJavaStream-- followed by https://docs.oracle.com/javase/8/docs/api/java/util/stream/BaseStream.html#iterator--

但是考虑到可能会有很多行,我想知道避免展平步骤是否有意义(至少在 akka 流上下文中,我假设每个元素的开销很小当通过阶段时),或者如果有更直接的方式。

此外,我很好奇背压在创建的流中是如何工作的,尤其是子迭代器;它只缓冲一个元素吗?

扁平化步骤

Source<Iterator<DataRow>> 扁平化为 Source<DataRow> 确实会增加一些开销,因为您必须使用 flatMapConcat which does eventually create a new GraphStage

但是,如果您有 "many" 行,那么这个单独的阶段可能会派上用场,因为它将为展平步骤提供并发性。

背压

如果你查看 StreamConverters.asJavaStreamat the code,你会看到有一个 QueueSink 正在生成一个 Future 以从 akka 流中提取下一个元素,然后执行Await.result(nextElementFuture, Inf) 等待 Future 完成,以便下一个元素可以转发到 java 流。

回答你的问题:是的,子 Iterator 只缓冲一个元素,但 QueueSink 有一个 Future,它也可能有下一个 DataRow。因此 javaStream & Iterator 可能有 2 个元素被缓冲,不管你原来的 akka Source.

或者,您可以在幕后使用 prefixAndTail(1) 实现迭代器以实现 hasNextnext