将分页功能转换为流程
Converting paging function to a Flow
我有大量的sqlite数据库,表示为Source[File, NotUsed]
。对于每个数据库,我想对结果进行分页。内存限制意味着我不能急切地这样做。假设结果类型是 Foo
,那么我试图弄清楚如何创建一个 Flow[File, Foo, NotUsed]
,它在内部使用对资源的惰性递归调用。
我看到 Source.unfold
方法允许我这样做,但它只能创建一个 Source
,这意味着我无法为它提供 File
的必要输入.我看不到如何将 Source
转换为 Flow
(通过 fromSinkAndSource 除外,但这不会通过管道传递值)。我不确定这条调查路径是否会产生任何结果。
有人建议我应该使用 GraphDSL
和 Merge
,但我无法理解 Merge
应该有多少个输入端口以及我如何使用实际上会把它们连在一起。
我认为您正在寻找 flatMapConcat
运算符:
Signature
def flatMapConcat[T, M](f: Out ⇒ Graph[SourceShape[T], M]): Repr[T]
Description
Transform each input element into a Source
whose elements are then flattened into the output stream through concatenation. This means each source is fully consumed before consumption of the next source starts.
emits when the current consumed substream has an element available
backpressures when downstream backpressures
completes when upstream completes and all consumed substreams complete
我有大量的sqlite数据库,表示为Source[File, NotUsed]
。对于每个数据库,我想对结果进行分页。内存限制意味着我不能急切地这样做。假设结果类型是 Foo
,那么我试图弄清楚如何创建一个 Flow[File, Foo, NotUsed]
,它在内部使用对资源的惰性递归调用。
我看到 Source.unfold
方法允许我这样做,但它只能创建一个 Source
,这意味着我无法为它提供 File
的必要输入.我看不到如何将 Source
转换为 Flow
(通过 fromSinkAndSource 除外,但这不会通过管道传递值)。我不确定这条调查路径是否会产生任何结果。
有人建议我应该使用 GraphDSL
和 Merge
,但我无法理解 Merge
应该有多少个输入端口以及我如何使用实际上会把它们连在一起。
我认为您正在寻找 flatMapConcat
运算符:
Signature
def flatMapConcat[T, M](f: Out ⇒ Graph[SourceShape[T], M]): Repr[T]
Description
Transform each input element into a
Source
whose elements are then flattened into the output stream through concatenation. This means each source is fully consumed before consumption of the next source starts.emits when the current consumed substream has an element available
backpressures when downstream backpressures
completes when upstream completes and all consumed substreams complete