RxJava 中的嵌套 Single.flatMap 与 Single.zip 相同吗?

Nested Single.flatMap in RxJava vs Single.zip, are the same?

我正面临困惑,举个例子4单身:

val s1 : Single<String> = service1.execute().subscribeOn(io())
val s2 : Single<Int> = service2.execute().subscribeOn(io())
val s3 : Single<Int> = service3.execute().subscribeOn(io())
val s4 : Single<String> = service4.execute().subscribeOn(io())
val ....
val s10 : Single<Int> = service10.execute().subscribeOn(io())

数据class我的对象( field1:字符串, 字段2:整数, 字段 3:整数, field4:字符串... .... field10:整数 )

我有一个 service10.execute(s1 : String s2 : Int s3 : Int s4 : String)

如果我这样做:

s1.flatMap { str -> 
    s2.flatMap { int1 ->
        s3.flatMap { int2 ->
            s4.flatMap { str2 ->
                ...
                s10.flatmap { int10
                  service10.execute(myObj(str, int1, int2, str2..., int10))
                }
            }
        }
    }
}

等同于:

Single.zip(
            listOf(
                s1,
                s2,
                s3,
                s4
              ...,
              ...,
              s10
            )
        ) { array ->
            val str = array[0] as String
            val int1 = array[1] as Int
            val int2 = array[2] as Int
            val str2 = array[3] as String
            ...
            val str10 = array[9] as Int
        }

1) flatMap是在那里并行执行还是顺序执行? 2) 如果嵌套的 flatMap 是顺序的,有没有办法让它们像 zip 一样平行?

不,嵌套的 flatMaps 不会使 Singles 运行 并行,正如以下测试所证明的那样:

    // so we can be sure service1 and service2 are active
    val bothSubscribed = CountDownLatch(2)
    // so we can simulate a blocking, long running operation on both services
    val subscribeThreadsStillRunning = CountDownLatch(1)

    val service5 = { str: String, str2: String ->
        Observable.just("service5: $str, $str2").singleOrError()
    }

    val scheduler = Schedulers.io()

    val createSingle = { value: String ->
        Observable
            .create<String> { emitter ->
                println("subscribe $value on ${Thread.currentThread().name}")
                bothSubscribed.countDown()
                subscribeThreadsStillRunning.await(10, SECONDS)
                emitter.onNext(value)
            }
            .singleOrError()
            .subscribeOn(scheduler)
    }

    val s1 = createSingle("outer")
    val s4 = createSingle("inner")

    s1.flatMap { outer ->
        s4.flatMap { inner ->
            service5(outer, inner)
        }
    }.subscribe()

    assert(bothSubscribed.await(5, SECONDS))
    subscribeThreadsStillRunning.countDown()

可以通过记住 lambda 中的代码在执行 lambda 之前 运行 来理解原因(这样说似乎很明显,但我花了一些时间才明白)。 s4.flatMap 是触发订阅 s4 的原因,但此代码在 outer 可用之前不会执行,即直到 s1 已经发出并因此完成。

Zip 似乎是完美的解决方案,我不确定您为什么要使用平面地图。我想不出办法去做。它还具有类型安全 API,因此您不必在示例中使用基于数组 API。

Singles
        .zip(s1, s4) { outer, inner -> service5(outer, inner) }
        .flatMap { it }
        .subscribe()

请注意,我使用了 "io.reactivex.rxjava3:rxkotlin:3.0.0-RC1" 中的 Singles,因为 lambda 与 Kotlin 配合得更好。