从多个来源收集数据的惯用方法是什么?
What is the idiomatic way to collect data from multiple sources?
想象一个数据服务器,数据在 40 个节点之间随机分片,您希望从中计算每 200 条记录的值。所以加载 200,计算,加载 200,计算,等等。您的服务器每秒能够处理 500 条记录,但您有足够的带宽从每个服务器读取 50 records/second(最大吞吐量为 2000 条记录)。
您可以按顺序执行此操作,这是最简单的选择:
var cache = mutableListOf()
for (serv in servers) {
for(record in serv.loadData()) {
cache += record
if (cache.count() == 500) {
process(cache)
cache.popFront(500)
}
}
}
这不会浪费任何 space 内存,但只会加载 50 records/s,并且不会并行处理结果。所以另一种方法是先从所有服务器获取结果,然后迭代:
var queue = ConcurrentLinkedDeque()
coroutineScope {
for (serv in servers) {
launch(Dispatchers.IO) {
for (record in serv.loadData()) {
queue += record
}
}
}
}
for (batch in queue.chunked(500)) {
process(batch)
}
这将充分利用您的吞吐量,但会在并发队列中浪费 space,并且原样也不允许并行进行处理和加载。
所以这似乎是利用 Flow
的好机会。我们希望保留从多个源并行加载的能力,因此我们将 queue += record
替换为 emit(record)
,然后批处理并处理 collect{}
中的结果,但是 Flow.emit
不是t 多线程安全(并且上下文由于 launch
而改变,但是可以克服,即使它是不可取的)。
假设 serv.loadData()
以增量方式加载数据,这仍然可以通过在队列太满时暂停数据加载来实现。但是这样写感觉真的很笨拙。
那么 - 假设您不关心数据的加载顺序 - 在当前版本的 Kotlin 中完成此操作的惯用方法是什么?
这是 flatMapMerge
的一种方法,它会自动并行化您发出的内部流:
suspend fun main() {
servers.asFlow()
.flatMapMerge(servers.size) { server -> flow {
for (record in server.loadData()) {
emit(record)
}
} }
.chunked(500)
.flowOn(Dispatchers.IO) // optional
.collect { batch ->
process(batch)
}
}
fun <T> Flow<T>.chunked(size: Int) = flow {
var chunk = mutableListOf<T>()
collect {
chunk.add(it)
if (chunk.size == size) {
emit(chunk)
chunk = mutableListOf()
}
}
chunk.takeIf { it.isNotEmpty() }?.also { emit(it) }
}
Flow 仍然没有 chunked
的标准实现,所以我提供了一个快速而简单的实现。
想象一个数据服务器,数据在 40 个节点之间随机分片,您希望从中计算每 200 条记录的值。所以加载 200,计算,加载 200,计算,等等。您的服务器每秒能够处理 500 条记录,但您有足够的带宽从每个服务器读取 50 records/second(最大吞吐量为 2000 条记录)。
您可以按顺序执行此操作,这是最简单的选择:
var cache = mutableListOf()
for (serv in servers) {
for(record in serv.loadData()) {
cache += record
if (cache.count() == 500) {
process(cache)
cache.popFront(500)
}
}
}
这不会浪费任何 space 内存,但只会加载 50 records/s,并且不会并行处理结果。所以另一种方法是先从所有服务器获取结果,然后迭代:
var queue = ConcurrentLinkedDeque()
coroutineScope {
for (serv in servers) {
launch(Dispatchers.IO) {
for (record in serv.loadData()) {
queue += record
}
}
}
}
for (batch in queue.chunked(500)) {
process(batch)
}
这将充分利用您的吞吐量,但会在并发队列中浪费 space,并且原样也不允许并行进行处理和加载。
所以这似乎是利用 Flow
的好机会。我们希望保留从多个源并行加载的能力,因此我们将 queue += record
替换为 emit(record)
,然后批处理并处理 collect{}
中的结果,但是 Flow.emit
不是t 多线程安全(并且上下文由于 launch
而改变,但是可以克服,即使它是不可取的)。
假设 serv.loadData()
以增量方式加载数据,这仍然可以通过在队列太满时暂停数据加载来实现。但是这样写感觉真的很笨拙。
那么 - 假设您不关心数据的加载顺序 - 在当前版本的 Kotlin 中完成此操作的惯用方法是什么?
这是 flatMapMerge
的一种方法,它会自动并行化您发出的内部流:
suspend fun main() {
servers.asFlow()
.flatMapMerge(servers.size) { server -> flow {
for (record in server.loadData()) {
emit(record)
}
} }
.chunked(500)
.flowOn(Dispatchers.IO) // optional
.collect { batch ->
process(batch)
}
}
fun <T> Flow<T>.chunked(size: Int) = flow {
var chunk = mutableListOf<T>()
collect {
chunk.add(it)
if (chunk.size == size) {
emit(chunk)
chunk = mutableListOf()
}
}
chunk.takeIf { it.isNotEmpty() }?.also { emit(it) }
}
Flow 仍然没有 chunked
的标准实现,所以我提供了一个快速而简单的实现。