Rx Java 并行映射函数
Rx Java map functions in parallel
可以说是 RxJava 和响应式编程的新手。
我试图将两个函数并行映射为单个 Observable 管道的一部分,但似乎无法以这种方式工作。这是我的代码。
Observable.fromCallable(thatReturnsNumberOne())
.observeOn(newThread())
.map(doubleIt())
.observeOn(newThread())
.map(doubleIt())
.subscribe(testSubscriber);
我希望同时产生 2 个 doubleIt() 调用。但似乎总是第一个 doubleIt() 完成后,第二个才开始。即blocking/sequential.
我错过了什么?
我假设 thatReturnsNumberOne()
只有 returns 一个值。返回的值按顺序传递给每个运算符。通过使用 observeOn(newThread())
,当值到达链中的那个点时,您只能更改为新线程。
如果你想并行计算,你必须使用多个 observables:
Observable.fromCallable(thatReturnsNumberOne())
.flatMap(number -> Observable.fromCallable(doubleIt(number)).subscribeOn(newThread())
.combineLatest(Observable.fromCallable(doubleIt(number)).subscribeOn(newThread()),
doubles -> doubles[0] + doubles[1]))
.subscribe(testSubscriber);
可以说是 RxJava 和响应式编程的新手。
我试图将两个函数并行映射为单个 Observable 管道的一部分,但似乎无法以这种方式工作。这是我的代码。
Observable.fromCallable(thatReturnsNumberOne())
.observeOn(newThread())
.map(doubleIt())
.observeOn(newThread())
.map(doubleIt())
.subscribe(testSubscriber);
我希望同时产生 2 个 doubleIt() 调用。但似乎总是第一个 doubleIt() 完成后,第二个才开始。即blocking/sequential.
我错过了什么?
我假设 thatReturnsNumberOne()
只有 returns 一个值。返回的值按顺序传递给每个运算符。通过使用 observeOn(newThread())
,当值到达链中的那个点时,您只能更改为新线程。
如果你想并行计算,你必须使用多个 observables:
Observable.fromCallable(thatReturnsNumberOne())
.flatMap(number -> Observable.fromCallable(doubleIt(number)).subscribeOn(newThread())
.combineLatest(Observable.fromCallable(doubleIt(number)).subscribeOn(newThread()),
doubles -> doubles[0] + doubles[1]))
.subscribe(testSubscriber);