从多个来源收集数据的惯用方法是什么?

What is the idiomatic way to collect data from multiple sources?

想象一个数据服务器,数据在 40 个节点之间随机分片,您希望从中计算每 200 条记录的值。所以加载 200,计算,加载 200,计算,等等。您的服务器每秒能够处理 500 条记录,但您有足够的带宽从每个服务器读取 50 records/second(最大吞吐量为 2000 条记录)。

您可以按顺序执行此操作,这是最简单的选择:

var cache = mutableListOf()
for (serv in servers) {
    for(record in serv.loadData()) {
        cache += record
        if (cache.count() == 500) {
            process(cache)
            cache.popFront(500)
        }

    }
}

这不会浪费任何 space 内存,但只会加载 50 records/s,并且不会并行处理结果。所以另一种方法是先从所有服务器获取结果,然后迭代:

var queue = ConcurrentLinkedDeque()
coroutineScope {
    for (serv in servers) {
        launch(Dispatchers.IO) {
            for (record in serv.loadData()) {
                queue += record
            }
        }
    }
}

for (batch in queue.chunked(500)) {
    process(batch)
}

这将充分利用您的吞吐量,但会在并发队列中浪费 space,并且原样也不允许并行进行处理和加载。

所以这似乎是利用 Flow 的好机会。我们希望保留从多个源并行加载的能力,因此我们将 queue += record 替换为 emit(record),然后批处理并处理 collect{} 中的结果,但是 Flow.emit 不是t 多线程安全(并且上下文由于 launch 而改变,但是可以克服,即使它是不可取的)。

假设 serv.loadData() 以增量方式加载数据,这仍然可以通过在队列太满时暂停数据加载来实现。但是这样写感觉真的很笨拙。

那么 - 假设您不关心数据的加载顺序 - 在当前版本的 Kotlin 中完成此操作的惯用方法是什么?

这是 flatMapMerge 的一种方法,它会自动并行化您发出的内部流:

suspend fun main() {
    servers.asFlow()
            .flatMapMerge(servers.size) { server -> flow {
                for (record in server.loadData()) {
                    emit(record)
                }
            } }
            .chunked(500)
            .flowOn(Dispatchers.IO) // optional
            .collect { batch ->
                process(batch)
            }
}

fun <T> Flow<T>.chunked(size: Int) = flow {
    var chunk = mutableListOf<T>()
    collect {
        chunk.add(it)
        if (chunk.size == size) {
            emit(chunk)
            chunk = mutableListOf()
        }
    }
    chunk.takeIf { it.isNotEmpty() }?.also { emit(it) }
}

Flow 仍然没有 chunked 的标准实现,所以我提供了一个快速而简单的实现。