将分页功能转换为流程

Converting paging function to a Flow

我有大量的sqlite数据库,表示为Source[File, NotUsed]。对于每个数据库,我想对结果进行分页。内存限制意味着我不能急切地这样做。假设结果类型是 Foo,那么我试图弄清楚如何创建一个 Flow[File, Foo, NotUsed],它在内部使用对资源的惰性递归调用。

我看到 Source.unfold 方法允许我这样做,但它只能创建一个 Source,这意味着我无法为它提供 File 的必要输入.我看不到如何将 Source 转换为 Flow(通过 fromSinkAndSource 除外,但这不会通过管道传递值)。我不确定这条调查路径是否会产生任何结果。

有人建议我应该使用 GraphDSLMerge,但我无法理解 Merge 应该有多少个输入端口以及我如何使用实际上会把它们连在一起。

我认为您正在寻找 flatMapConcat 运算符:

Signature

def flatMapConcat[T, M](f: Out ⇒ Graph[SourceShape[T], M]): Repr[T]

Description

Transform each input element into a Source whose elements are then flattened into the output stream through concatenation. This means each source is fully consumed before consumption of the next source starts.

emits when the current consumed substream has an element available

backpressures when downstream backpressures

completes when upstream completes and all consumed substreams complete