Kotlin flatMapMerge 在多次组合相同流时不一致

Kotlin flatMapMerge inconsistent when combining same flow multiple times

我有一个应用 flatMapMerge 共享流 A 以创建流 B。然后我将流 A 与流 B 组合以获得最终结果 collectLatest。有时它会输出所有结果。但是,在其他时候,它会跳过一些输出。这是一个简化的例子:

val flowA = MutableSharedFlow<Int>(replay = Int.MAX_VALUE)
val flowB = flowA.flatMapMerge {
    // In the real app, this flow is created in a complex way
    flow {
        emit(it + 2)
        emit(it + 3)
        emit(it + 4)
        emit(it + 5)
        emit(it + 6)
        emit(it + 7)
        emit(it + 8)
        emit(it + 9)
    }
}
flowA.emit(1)
flowA.combine(flowB, ::Pair).collect { (a, b) ->
    // I need access to the value of flowA for calculations here
    Log.d(TAG, "result A: $a, B: $b")
}

第一 运行:

result A: 1, B: 3
result A: 1, B: 5
result A: 1, B: 7
result A: 1, B: 9
result A: 1, B: 10

第二 运行:

result A: 1, B: 3
result A: 1, B: 5
result A: 1, B: 7
result A: 1, B: 8
result A: 1, B: 10

预期输出:

result A: 1, B: 3
result A: 1, B: 4
result A: 1, B: 5
result A: 1, B: 6
result A: 1, B: 7
result A: 1, B: 8
result A: 1, B: 9
result A: 1, B: 10

我也试过调整回放,但是没有效果。我也试过 collectLatest 而不是 collect,但也没有变化。

我应该使用 flatMapMerge 以外的东西吗?我试过 flatMapLatest,但只输出第一个和最后一个结果。 MutableSharedFlow 应该是别的东西吗?

如何每次都输出所有结果?

这里的问题是 combine 函数

official documentation 起,合并 函数:

Returns a Flow whose values are generated with transform function by combining the most recently emitted values by each flow.

如果 combine() 的转换函数内部有一些繁重的计算,则 combine() 可能不会发出所有结果。

因此,为了获得所有结果,以下策略工作正常。

val flowA = MutableSharedFlow<Int>()
        .onStart { emit(1) }

    val flowB = flowA.flatMapMerge {
        flow {
            for (i in 2..9) {
                emit(Pair(it, i))
            }
        }
    }

    runBlocking {
        flowB.collect { (a, b) ->
            println("result A: $a, B: $b")

        }
    }

输出:

result A: 1, B: 3
result A: 1, B: 4
result A: 1, B: 5
result A: 1, B: 6
result A: 1, B: 7
result A: 1, B: 8
result A: 1, B: 9
result A: 1, B: 10