如何从 akka 流源中获取迭代器?
How to get an iterator from an akka streams Source?
我正在尝试创建一个流,我可以通过 Iterator
之类的东西使用它。
我正在实现一个公开类似迭代器接口的库,所以这将是我使用的最简单的东西。
到目前为止我设计的图形基本上是 Source<Iterator<DataRow>>
。到目前为止我看到的一件事是将它展平为 Source<DataRow>
然后使用 http://doc.akka.io/japi/akka/current/akka/stream/javadsl/StreamConverters.html#asJavaStream-- followed by https://docs.oracle.com/javase/8/docs/api/java/util/stream/BaseStream.html#iterator--
但是考虑到可能会有很多行,我想知道避免展平步骤是否有意义(至少在 akka 流上下文中,我假设每个元素的开销很小当通过阶段时),或者如果有更直接的方式。
此外,我很好奇背压在创建的流中是如何工作的,尤其是子迭代器;它只缓冲一个元素吗?
扁平化步骤
将 Source<Iterator<DataRow>>
扁平化为 Source<DataRow>
确实会增加一些开销,因为您必须使用 flatMapConcat
which does eventually create a new GraphStage
。
但是,如果您有 "many" 行,那么这个单独的阶段可能会派上用场,因为它将为展平步骤提供并发性。
背压
如果你查看 StreamConverters.asJavaStream
的 at the code,你会看到有一个 QueueSink
正在生成一个 Future 以从 akka 流中提取下一个元素,然后执行Await.result(nextElementFuture, Inf)
等待 Future 完成,以便下一个元素可以转发到 java 流。
回答你的问题:是的,子 Iterator 只缓冲一个元素,但 QueueSink 有一个 Future,它也可能有下一个 DataRow
。因此 javaStream & Iterator 可能有 2 个元素被缓冲,不管你原来的 akka Source
.
或者,您可以在幕后使用 prefixAndTail(1)
实现迭代器以实现 hasNext
和 next
。
我正在尝试创建一个流,我可以通过 Iterator
之类的东西使用它。
我正在实现一个公开类似迭代器接口的库,所以这将是我使用的最简单的东西。
到目前为止我设计的图形基本上是 Source<Iterator<DataRow>>
。到目前为止我看到的一件事是将它展平为 Source<DataRow>
然后使用 http://doc.akka.io/japi/akka/current/akka/stream/javadsl/StreamConverters.html#asJavaStream-- followed by https://docs.oracle.com/javase/8/docs/api/java/util/stream/BaseStream.html#iterator--
但是考虑到可能会有很多行,我想知道避免展平步骤是否有意义(至少在 akka 流上下文中,我假设每个元素的开销很小当通过阶段时),或者如果有更直接的方式。
此外,我很好奇背压在创建的流中是如何工作的,尤其是子迭代器;它只缓冲一个元素吗?
扁平化步骤
将 Source<Iterator<DataRow>>
扁平化为 Source<DataRow>
确实会增加一些开销,因为您必须使用 flatMapConcat
which does eventually create a new GraphStage
。
但是,如果您有 "many" 行,那么这个单独的阶段可能会派上用场,因为它将为展平步骤提供并发性。
背压
如果你查看 StreamConverters.asJavaStream
的 at the code,你会看到有一个 QueueSink
正在生成一个 Future 以从 akka 流中提取下一个元素,然后执行Await.result(nextElementFuture, Inf)
等待 Future 完成,以便下一个元素可以转发到 java 流。
回答你的问题:是的,子 Iterator 只缓冲一个元素,但 QueueSink 有一个 Future,它也可能有下一个 DataRow
。因此 javaStream & Iterator 可能有 2 个元素被缓冲,不管你原来的 akka Source
.
或者,您可以在幕后使用 prefixAndTail(1)
实现迭代器以实现 hasNext
和 next
。