Rx Java 并行映射函数

Rx Java map functions in parallel

可以说是 RxJava 和响应式编程的新手。

我试图将两个函数并行映射为单个 Observable 管道的一部分,但似乎无法以这种方式工作。这是我的代码。

Observable.fromCallable(thatReturnsNumberOne())
                .observeOn(newThread())
                .map(doubleIt())
                .observeOn(newThread())
                .map(doubleIt())
                .subscribe(testSubscriber);

我希望同时产生 2 个 doubleIt() 调用。但似乎总是第一个 doubleIt() 完成后,第二个才开始。即blocking/sequential.

我错过了什么?

我假设 thatReturnsNumberOne() 只有 returns 一个值。返回的值按顺序传递给每个运算符。通过使用 observeOn(newThread()),当值到达链中的那个点时,您只能更改为新线程。

如果你想并行计算,你必须使用多个 observables:

Observable.fromCallable(thatReturnsNumberOne())
    .flatMap(number -> Observable.fromCallable(doubleIt(number)).subscribeOn(newThread())
        .combineLatest(Observable.fromCallable(doubleIt(number)).subscribeOn(newThread()),
        doubles -> doubles[0] + doubles[1]))
    .subscribe(testSubscriber);