Kotlin flatMapMerge 在多次组合相同流时不一致
Kotlin flatMapMerge inconsistent when combining same flow multiple times
我有一个应用 flatMapMerge
共享流 A 以创建流 B。然后我将流 A 与流 B 组合以获得最终结果 collectLatest
。有时它会输出所有结果。但是,在其他时候,它会跳过一些输出。这是一个简化的例子:
val flowA = MutableSharedFlow<Int>(replay = Int.MAX_VALUE)
val flowB = flowA.flatMapMerge {
// In the real app, this flow is created in a complex way
flow {
emit(it + 2)
emit(it + 3)
emit(it + 4)
emit(it + 5)
emit(it + 6)
emit(it + 7)
emit(it + 8)
emit(it + 9)
}
}
flowA.emit(1)
flowA.combine(flowB, ::Pair).collect { (a, b) ->
// I need access to the value of flowA for calculations here
Log.d(TAG, "result A: $a, B: $b")
}
第一 运行:
result A: 1, B: 3
result A: 1, B: 5
result A: 1, B: 7
result A: 1, B: 9
result A: 1, B: 10
第二 运行:
result A: 1, B: 3
result A: 1, B: 5
result A: 1, B: 7
result A: 1, B: 8
result A: 1, B: 10
预期输出:
result A: 1, B: 3
result A: 1, B: 4
result A: 1, B: 5
result A: 1, B: 6
result A: 1, B: 7
result A: 1, B: 8
result A: 1, B: 9
result A: 1, B: 10
我也试过调整回放,但是没有效果。我也试过 collectLatest
而不是 collect
,但也没有变化。
我应该使用 flatMapMerge
以外的东西吗?我试过 flatMapLatest
,但只输出第一个和最后一个结果。 MutableSharedFlow
应该是别的东西吗?
如何每次都输出所有结果?
这里的问题是 combine 函数
自 official documentation 起,合并 函数:
Returns a Flow whose values are generated with transform function by
combining the most recently emitted values by each flow.
如果 combine()
的转换函数内部有一些繁重的计算,则 combine() 可能不会发出所有结果。
因此,为了获得所有结果,以下策略工作正常。
val flowA = MutableSharedFlow<Int>()
.onStart { emit(1) }
val flowB = flowA.flatMapMerge {
flow {
for (i in 2..9) {
emit(Pair(it, i))
}
}
}
runBlocking {
flowB.collect { (a, b) ->
println("result A: $a, B: $b")
}
}
输出:
result A: 1, B: 3
result A: 1, B: 4
result A: 1, B: 5
result A: 1, B: 6
result A: 1, B: 7
result A: 1, B: 8
result A: 1, B: 9
result A: 1, B: 10
我有一个应用 flatMapMerge
共享流 A 以创建流 B。然后我将流 A 与流 B 组合以获得最终结果 collectLatest
。有时它会输出所有结果。但是,在其他时候,它会跳过一些输出。这是一个简化的例子:
val flowA = MutableSharedFlow<Int>(replay = Int.MAX_VALUE)
val flowB = flowA.flatMapMerge {
// In the real app, this flow is created in a complex way
flow {
emit(it + 2)
emit(it + 3)
emit(it + 4)
emit(it + 5)
emit(it + 6)
emit(it + 7)
emit(it + 8)
emit(it + 9)
}
}
flowA.emit(1)
flowA.combine(flowB, ::Pair).collect { (a, b) ->
// I need access to the value of flowA for calculations here
Log.d(TAG, "result A: $a, B: $b")
}
第一 运行:
result A: 1, B: 3
result A: 1, B: 5
result A: 1, B: 7
result A: 1, B: 9
result A: 1, B: 10
第二 运行:
result A: 1, B: 3
result A: 1, B: 5
result A: 1, B: 7
result A: 1, B: 8
result A: 1, B: 10
预期输出:
result A: 1, B: 3
result A: 1, B: 4
result A: 1, B: 5
result A: 1, B: 6
result A: 1, B: 7
result A: 1, B: 8
result A: 1, B: 9
result A: 1, B: 10
我也试过调整回放,但是没有效果。我也试过 collectLatest
而不是 collect
,但也没有变化。
我应该使用 flatMapMerge
以外的东西吗?我试过 flatMapLatest
,但只输出第一个和最后一个结果。 MutableSharedFlow
应该是别的东西吗?
如何每次都输出所有结果?
这里的问题是 combine 函数
自 official documentation 起,合并 函数:
Returns a Flow whose values are generated with transform function by combining the most recently emitted values by each flow.
如果 combine()
的转换函数内部有一些繁重的计算,则 combine() 可能不会发出所有结果。
因此,为了获得所有结果,以下策略工作正常。
val flowA = MutableSharedFlow<Int>()
.onStart { emit(1) }
val flowB = flowA.flatMapMerge {
flow {
for (i in 2..9) {
emit(Pair(it, i))
}
}
}
runBlocking {
flowB.collect { (a, b) ->
println("result A: $a, B: $b")
}
}
输出:
result A: 1, B: 3
result A: 1, B: 4
result A: 1, B: 5
result A: 1, B: 6
result A: 1, B: 7
result A: 1, B: 8
result A: 1, B: 9
result A: 1, B: 10