如何使用 RxJava2 将不同的异步源聚合成一个?
How to aggregate different async sources into a Single using RxJava2?
假设我有这个同步方法:
public FruitBowl getFruitBowl() {
Apple apple = getApple(); // IO intensive
Banana banana = getBanana(); // CPU intensive
return new FruitBowl(apple, banana);
}
我可以使用 Java 并发 API 将其转换为异步方法,结果会像这样:
public Future<FruitBowl> getFruitBowl() {
Future<Apple> appleFuture = getAppleAsync(); // IO intensive
Future<Banana> bananaFuture = getBananaAsync(); // CPU intensive
return createFruitBowlAsync(appleFuture, bananaFuture); // Awaits appleFuture and bananaFuture and then returns a new FruitBowl
}
在利用它的调度程序(io 和计算)和 return Single 的同时,惯用的 Rx 方法是什么?
您可以使用 zip
运算符。并为每个异步操作定义不同的线程。如果不这样做,这些方法将在同一个线程上一个接一个地执行。
我将创建这两种方法的 Observable
版本,以便分别 return Observable<Apple>
和 Observable<Banana>
并以这种方式使用它们:
Observalbe.zip(getAppleObservable().subscribeOn(Schedulers.newThread()),
getBananaObservable().subscribeOn(Schedulers.newThread()),
(apple, banana) -> new FruitBowl(apple, banana)))
.subscribe(/* do your work here with FruitBowl object */);
Here 有关如何使用 zip
运算符
并行化操作的更多详细信息
假设我有这个同步方法:
public FruitBowl getFruitBowl() {
Apple apple = getApple(); // IO intensive
Banana banana = getBanana(); // CPU intensive
return new FruitBowl(apple, banana);
}
我可以使用 Java 并发 API 将其转换为异步方法,结果会像这样:
public Future<FruitBowl> getFruitBowl() {
Future<Apple> appleFuture = getAppleAsync(); // IO intensive
Future<Banana> bananaFuture = getBananaAsync(); // CPU intensive
return createFruitBowlAsync(appleFuture, bananaFuture); // Awaits appleFuture and bananaFuture and then returns a new FruitBowl
}
在利用它的调度程序(io 和计算)和 return Single 的同时,惯用的 Rx 方法是什么?
您可以使用 zip
运算符。并为每个异步操作定义不同的线程。如果不这样做,这些方法将在同一个线程上一个接一个地执行。
我将创建这两种方法的 Observable
版本,以便分别 return Observable<Apple>
和 Observable<Banana>
并以这种方式使用它们:
Observalbe.zip(getAppleObservable().subscribeOn(Schedulers.newThread()),
getBananaObservable().subscribeOn(Schedulers.newThread()),
(apple, banana) -> new FruitBowl(apple, banana)))
.subscribe(/* do your work here with FruitBowl object */);
Here 有关如何使用 zip
运算符