RxJava 中的嵌套 Single.flatMap 与 Single.zip 相同吗?
Nested Single.flatMap in RxJava vs Single.zip, are the same?
我正面临困惑,举个例子4单身:
val s1 : Single<String> = service1.execute().subscribeOn(io())
val s2 : Single<Int> = service2.execute().subscribeOn(io())
val s3 : Single<Int> = service3.execute().subscribeOn(io())
val s4 : Single<String> = service4.execute().subscribeOn(io())
val ....
val s10 : Single<Int> = service10.execute().subscribeOn(io())
数据class我的对象(
field1:字符串,
字段2:整数,
字段 3:整数,
field4:字符串...
....
field10:整数
)
我有一个 service10.execute(s1 : String s2 : Int s3 : Int s4 : String)
如果我这样做:
s1.flatMap { str ->
s2.flatMap { int1 ->
s3.flatMap { int2 ->
s4.flatMap { str2 ->
...
s10.flatmap { int10
service10.execute(myObj(str, int1, int2, str2..., int10))
}
}
}
}
}
等同于:
Single.zip(
listOf(
s1,
s2,
s3,
s4
...,
...,
s10
)
) { array ->
val str = array[0] as String
val int1 = array[1] as Int
val int2 = array[2] as Int
val str2 = array[3] as String
...
val str10 = array[9] as Int
}
1) flatMap是在那里并行执行还是顺序执行?
2) 如果嵌套的 flatMap 是顺序的,有没有办法让它们像 zip 一样平行?
不,嵌套的 flatMap
s 不会使 Single
s 运行 并行,正如以下测试所证明的那样:
// so we can be sure service1 and service2 are active
val bothSubscribed = CountDownLatch(2)
// so we can simulate a blocking, long running operation on both services
val subscribeThreadsStillRunning = CountDownLatch(1)
val service5 = { str: String, str2: String ->
Observable.just("service5: $str, $str2").singleOrError()
}
val scheduler = Schedulers.io()
val createSingle = { value: String ->
Observable
.create<String> { emitter ->
println("subscribe $value on ${Thread.currentThread().name}")
bothSubscribed.countDown()
subscribeThreadsStillRunning.await(10, SECONDS)
emitter.onNext(value)
}
.singleOrError()
.subscribeOn(scheduler)
}
val s1 = createSingle("outer")
val s4 = createSingle("inner")
s1.flatMap { outer ->
s4.flatMap { inner ->
service5(outer, inner)
}
}.subscribe()
assert(bothSubscribed.await(5, SECONDS))
subscribeThreadsStillRunning.countDown()
可以通过记住 lambda 中的代码在执行 lambda 之前 运行 来理解原因(这样说似乎很明显,但我花了一些时间才明白)。 s4.flatMap
是触发订阅 s4
的原因,但此代码在 outer
可用之前不会执行,即直到 s1
已经发出并因此完成。
Zip 似乎是完美的解决方案,我不确定您为什么要使用平面地图。我想不出办法去做。它还具有类型安全 API,因此您不必在示例中使用基于数组 API。
Singles
.zip(s1, s4) { outer, inner -> service5(outer, inner) }
.flatMap { it }
.subscribe()
请注意,我使用了 "io.reactivex.rxjava3:rxkotlin:3.0.0-RC1"
中的 Singles
,因为 lambda 与 Kotlin 配合得更好。
我正面临困惑,举个例子4单身:
val s1 : Single<String> = service1.execute().subscribeOn(io())
val s2 : Single<Int> = service2.execute().subscribeOn(io())
val s3 : Single<Int> = service3.execute().subscribeOn(io())
val s4 : Single<String> = service4.execute().subscribeOn(io())
val ....
val s10 : Single<Int> = service10.execute().subscribeOn(io())
数据class我的对象( field1:字符串, 字段2:整数, 字段 3:整数, field4:字符串... .... field10:整数 )
我有一个 service10.execute(s1 : String s2 : Int s3 : Int s4 : String)
如果我这样做:
s1.flatMap { str ->
s2.flatMap { int1 ->
s3.flatMap { int2 ->
s4.flatMap { str2 ->
...
s10.flatmap { int10
service10.execute(myObj(str, int1, int2, str2..., int10))
}
}
}
}
}
等同于:
Single.zip(
listOf(
s1,
s2,
s3,
s4
...,
...,
s10
)
) { array ->
val str = array[0] as String
val int1 = array[1] as Int
val int2 = array[2] as Int
val str2 = array[3] as String
...
val str10 = array[9] as Int
}
1) flatMap是在那里并行执行还是顺序执行? 2) 如果嵌套的 flatMap 是顺序的,有没有办法让它们像 zip 一样平行?
不,嵌套的 flatMap
s 不会使 Single
s 运行 并行,正如以下测试所证明的那样:
// so we can be sure service1 and service2 are active
val bothSubscribed = CountDownLatch(2)
// so we can simulate a blocking, long running operation on both services
val subscribeThreadsStillRunning = CountDownLatch(1)
val service5 = { str: String, str2: String ->
Observable.just("service5: $str, $str2").singleOrError()
}
val scheduler = Schedulers.io()
val createSingle = { value: String ->
Observable
.create<String> { emitter ->
println("subscribe $value on ${Thread.currentThread().name}")
bothSubscribed.countDown()
subscribeThreadsStillRunning.await(10, SECONDS)
emitter.onNext(value)
}
.singleOrError()
.subscribeOn(scheduler)
}
val s1 = createSingle("outer")
val s4 = createSingle("inner")
s1.flatMap { outer ->
s4.flatMap { inner ->
service5(outer, inner)
}
}.subscribe()
assert(bothSubscribed.await(5, SECONDS))
subscribeThreadsStillRunning.countDown()
可以通过记住 lambda 中的代码在执行 lambda 之前 运行 来理解原因(这样说似乎很明显,但我花了一些时间才明白)。 s4.flatMap
是触发订阅 s4
的原因,但此代码在 outer
可用之前不会执行,即直到 s1
已经发出并因此完成。
Zip 似乎是完美的解决方案,我不确定您为什么要使用平面地图。我想不出办法去做。它还具有类型安全 API,因此您不必在示例中使用基于数组 API。
Singles
.zip(s1, s4) { outer, inner -> service5(outer, inner) }
.flatMap { it }
.subscribe()
请注意,我使用了 "io.reactivex.rxjava3:rxkotlin:3.0.0-RC1"
中的 Singles
,因为 lambda 与 Kotlin 配合得更好。