RxJava 组合请求序列
RxJava Combine Sequence Of Requests
问题
我有两个 Api。 Api 1 给了我一个项目列表,Api 2 给了我从 Api 1 得到的每个项目的更详细信息。到目前为止我解决它的方式导致性能不佳.
问题
在 Retrofit 和 RxJava 的帮助下高效快速地解决了这个问题。
我的方法
目前我的解决方案如下所示:
第 1 步:Retrofit 从 Api 1.
执行 Single<ArrayList<Information>>
第 2 步:我遍历这些项目并向每个项目发出请求 Api 2.
第三步:Retrofit Returns依次执行Single<ExtendedInformation>
for
每一项
第 4 步:在所有调用表单 Api 2 完全执行后,我为所有项目创建一个新对象,结合信息和扩展信息。
我的代码
public void addExtendedInformations(final Information[] informations) {
final ArrayList<InformationDetail> informationDetailArrayList = new ArrayList<>();
final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener = new JSONRequestRatingHelper.RatingRequestListener() {
@Override
public void onDownloadFinished(Information baseInformation, ExtendedInformation extendedInformation) {
informationDetailArrayList.add(new InformationDetail(baseInformation, extendedInformation));
if (informationDetailArrayList.size() >= informations.length){
listener.onAllExtendedInformationLoadedAndCombined(informationDetailArrayList);
}
}
};
for (Information information : informations) {
getExtendedInformation(ratingRequestListener, information);
}
}
public void getRatingsByTitle(final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener, final Information information) {
Single<ExtendedInformation> repos = service.findForTitle(information.title);
disposable.add(repos.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribeWith(new DisposableSingleObserver<ExtendedInformation>() {
@Override
public void onSuccess(ExtendedInformation extendedInformation) {
ratingRequestListener.onDownloadFinished(information, extendedInformation);
}
@Override
public void onError(Throwable e) {
ExtendedInformation extendedInformation = new ExtendedInformation();
ratingRequestListener.onDownloadFinished(extendedInformation, information);
}
}));
}
public interface RatingRequestListener {
void onDownloadFinished(Information information, ExtendedInformation extendedInformation);
}
flatMap
运算符旨在满足这些类型的工作流程。
我将用一个简单的五步示例来概括概括。希望您可以轻松地在代码中重构相同的原则:
@Test fun flatMapExample() {
// (1) constructing a fake stream that emits a list of values
Observable.just(listOf(1, 2, 3, 4, 5))
// (2) convert our List emission into a stream of its constituent values
.flatMap { numbers -> Observable.fromIterable(numbers) }
// (3) subsequently convert each individual value emission into an Observable of some
// newly calculated type
.flatMap { number ->
when(number) {
1 -> Observable.just("A1")
2 -> Observable.just("B2")
3 -> Observable.just("C3")
4 -> Observable.just("D4")
5 -> Observable.just("E5")
else -> throw RuntimeException("Unexpected value for number [$number]")
}
}
// (4) collect all the final emissions into a list
.toList()
.subscribeBy(
onSuccess = {
// (5) handle all the combined results (in list form) here
println("## onNext($it)")
},
onError = { error ->
println("## onError(${error.message})")
}
)
}
(顺便说一句,如果排放的顺序很重要,请改用 concatMap
)。
希望对您有所帮助。
tl;dr 使用 concatMapEager
或 flatMap
并异步或在调度程序上执行子调用。
长话短说
我不是 android 开发人员,所以我的问题仅限于纯 RxJava(版本 1 和版本 2)。
如果我把图片弄对了,所需的流程是:
some query param
\--> Execute query on API_1 -> list of items
|-> Execute query for item 1 on API_2 -> extended info of item1
|-> Execute query for item 2 on API_2 -> extended info of item1
|-> Execute query for item 3 on API_2 -> extended info of item1
...
\-> Execute query for item n on API_2 -> extended info of item1
\----------------------------------------------------------------------/
|
\--> stream (or list) of extended item info for the query param
假设 Retrofit 为
生成了客户端
interface Api1 {
@GET("/api1") Observable<List<Item>> items(@Query("param") String param);
}
interface Api2 {
@GET("/api2/{item_id}") Observable<ItemExtended> extendedInfo(@Path("item_id") String item_id);
}
如果项目的顺序不重要,那么可以只使用 flatMap
:
api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList)))
.flatMap(item -> api2.extendedInfo(item.id()))
.subscribe(...)
但是仅当改造构建器配置为
要么使用异步适配器(调用将在 okhttp 内部执行器中排队)。我个人认为这不是一个好主意,因为你无法控制这个执行器。
.addCallAdapterFactory(RxJava2CallAdapterFactory.createAsync()
或者使用基于调度器的适配器(调用将在 RxJava 调度器上进行调度)。这是我的首选,因为您明确选择了使用哪个调度程序,它很可能是 IO 调度程序,但您可以自由尝试不同的调度程序。
.addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))
原因是 flatMap
将订阅 api2.extendedInfo(...)
创建的每个可观察对象并将它们合并到生成的可观察对象中。因此结果将按照收到的顺序显示。
如果改造客户端未设置为异步或在调度程序上设置为运行,则有可能设置一个:
api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList)))
.flatMap(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
.subscribe(...)
这个结构与前一个结构几乎相同,它指示 在本地 每个 api2.extendedInfo
应该 运行.[=34= 在哪个调度程序上执行]
可以调整flatMap
的maxConcurrency
参数来控制同时执行多少个请求。尽管我对此持谨慎态度,但您不希望 运行 同时进行所有查询。通常默认的 maxConcurrency
就足够了 (128
).
现在如果原始查询的顺序很重要。 concatMap
通常是按顺序但按顺序执行与 flatMap
相同的操作的运算符,如果代码需要等待所有子查询执行,则结果会很慢。虽然解决方案比 concatMapEager
更进一步,但这个解决方案将按顺序订阅 observable,并根据需要缓冲结果。
假设改装客户端是异步的或 运行 在特定的调度程序上:
api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList)))
.concatMapEager(item -> api2.extendedInfo(item.id()))
.subscribe(...)
或者如果必须在本地设置调度程序:
api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList)))
.concatMapEager(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
.subscribe(...)
也可以在此运算符中调整并发性。
此外,如果 Api 返回 Flowable
,则可以在 RxJava 2.1.7 中使用目前仍处于测试阶段的 .parallel
。但是结果不按顺序排列,我不知道如何(还?)在没有排序的情况下对它们进行排序。
api.items(queryParam) // Flowable<Item>
.parallel(10)
.runOn(Schedulers.io())
.map(item -> api2.extendedInfo(item.id()))
.sequential(); // Flowable<ItemExtended>
我用 RxJava2 解决了类似的问题。并行执行 Api 2 的请求稍微加快了工作速度。
private InformationRepository informationRepository;
//init....
public Single<List<FullInformation>> getFullInformation() {
return informationRepository.getInformationList()
.subscribeOn(Schedulers.io())//I usually write subscribeOn() in the repository, here - for clarity
.flatMapObservable(Observable::fromIterable)
.flatMapSingle(this::getFullInformation)
.collect(ArrayList::new, List::add);
}
private Single<FullInformation> getFullInformation(Information information) {
return informationRepository.getExtendedInformation(information)
.map(extendedInformation -> new FullInformation(information, extendedInformation))
.subscribeOn(Schedulers.io());//execute requests in parallel
}
InformationRepository - 只是界面。它的实现对我们来说并不有趣。
public interface InformationRepository {
Single<List<Information>> getInformationList();
Single<ExtendedInformation> getExtendedInformation(Information information);
}
FullInformation - 结果容器。
public class FullInformation {
private Information information;
private ExtendedInformation extendedInformation;
public FullInformation(Information information, ExtendedInformation extendedInformation) {
this.information = information;
this.extendedInformation = extendedInformation;
}
}
检查下方是否正常。
假设您需要进行多个网络调用——调用以获取 Github 用户信息和 Github 用户事件等。
并且您想在更新 UI 之前等待每个 return。 RxJava 可以在这里帮助你。
让我们首先定义我们的 Retrofit 对象来访问 Github 的 API,然后为两个网络请求调用设置两个可观察对象。
Retrofit repo = new Retrofit.Builder()
.baseUrl("https://api.github.com")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build();
Observable<JsonObject> userObservable = repo
.create(GitHubUser.class)
.getUser(loginName)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
Observable<JsonArray> eventsObservable = repo
.create(GitHubEvents.class)
.listEvents(loginName)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
它使用的界面如下:
public interface GitHubUser {
@GET("users/{user}")
Observable<JsonObject> getUser(@Path("user") String user);
}
public interface GitHubEvents {
@GET("users/{user}/events")
Observable<JsonArray> listEvents(@Path("user") String user);
}
在我们使用 RxJava 的 zip 方法合并我们的两个 Observable 并等待它们完成之后再创建一个新的 Observable。
Observable<UserAndEvents> combined = Observable.zip(userObservable, eventsObservable, new Func2<JsonObject, JsonArray, UserAndEvents>() {
@Override
public UserAndEvents call(JsonObject jsonObject, JsonArray jsonElements) {
return new UserAndEvents(jsonObject, jsonElements);
}
});
最后让我们在新组合的 Observable 上调用 subscribe 方法:
combined.subscribe(new Subscriber<UserAndEvents>() {
...
@Override
public void onNext(UserAndEvents o) {
// You can access the results of the
// two observabes via the POJO now
}
});
不再需要在线程等中等待网络调用完成。 RxJava 已经在 zip() 中为你完成了所有这些工作。
希望我的回答对你有帮助。
尝试使用 Observable.zip()
运算符。它将等到两个 Api 调用完成后再继续流。然后你可以通过调用 flatMap()
插入一些逻辑。
问题
我有两个 Api。 Api 1 给了我一个项目列表,Api 2 给了我从 Api 1 得到的每个项目的更详细信息。到目前为止我解决它的方式导致性能不佳.
问题
在 Retrofit 和 RxJava 的帮助下高效快速地解决了这个问题。
我的方法
目前我的解决方案如下所示:
第 1 步:Retrofit 从 Api 1.
执行Single<ArrayList<Information>>
第 2 步:我遍历这些项目并向每个项目发出请求 Api 2.
第三步:Retrofit Returns依次执行Single<ExtendedInformation>
for
每一项
第 4 步:在所有调用表单 Api 2 完全执行后,我为所有项目创建一个新对象,结合信息和扩展信息。
我的代码
public void addExtendedInformations(final Information[] informations) {
final ArrayList<InformationDetail> informationDetailArrayList = new ArrayList<>();
final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener = new JSONRequestRatingHelper.RatingRequestListener() {
@Override
public void onDownloadFinished(Information baseInformation, ExtendedInformation extendedInformation) {
informationDetailArrayList.add(new InformationDetail(baseInformation, extendedInformation));
if (informationDetailArrayList.size() >= informations.length){
listener.onAllExtendedInformationLoadedAndCombined(informationDetailArrayList);
}
}
};
for (Information information : informations) {
getExtendedInformation(ratingRequestListener, information);
}
}
public void getRatingsByTitle(final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener, final Information information) {
Single<ExtendedInformation> repos = service.findForTitle(information.title);
disposable.add(repos.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribeWith(new DisposableSingleObserver<ExtendedInformation>() {
@Override
public void onSuccess(ExtendedInformation extendedInformation) {
ratingRequestListener.onDownloadFinished(information, extendedInformation);
}
@Override
public void onError(Throwable e) {
ExtendedInformation extendedInformation = new ExtendedInformation();
ratingRequestListener.onDownloadFinished(extendedInformation, information);
}
}));
}
public interface RatingRequestListener {
void onDownloadFinished(Information information, ExtendedInformation extendedInformation);
}
flatMap
运算符旨在满足这些类型的工作流程。
我将用一个简单的五步示例来概括概括。希望您可以轻松地在代码中重构相同的原则:
@Test fun flatMapExample() {
// (1) constructing a fake stream that emits a list of values
Observable.just(listOf(1, 2, 3, 4, 5))
// (2) convert our List emission into a stream of its constituent values
.flatMap { numbers -> Observable.fromIterable(numbers) }
// (3) subsequently convert each individual value emission into an Observable of some
// newly calculated type
.flatMap { number ->
when(number) {
1 -> Observable.just("A1")
2 -> Observable.just("B2")
3 -> Observable.just("C3")
4 -> Observable.just("D4")
5 -> Observable.just("E5")
else -> throw RuntimeException("Unexpected value for number [$number]")
}
}
// (4) collect all the final emissions into a list
.toList()
.subscribeBy(
onSuccess = {
// (5) handle all the combined results (in list form) here
println("## onNext($it)")
},
onError = { error ->
println("## onError(${error.message})")
}
)
}
(顺便说一句,如果排放的顺序很重要,请改用 concatMap
)。
希望对您有所帮助。
tl;dr 使用 concatMapEager
或 flatMap
并异步或在调度程序上执行子调用。
长话短说
我不是 android 开发人员,所以我的问题仅限于纯 RxJava(版本 1 和版本 2)。
如果我把图片弄对了,所需的流程是:
some query param
\--> Execute query on API_1 -> list of items
|-> Execute query for item 1 on API_2 -> extended info of item1
|-> Execute query for item 2 on API_2 -> extended info of item1
|-> Execute query for item 3 on API_2 -> extended info of item1
...
\-> Execute query for item n on API_2 -> extended info of item1
\----------------------------------------------------------------------/
|
\--> stream (or list) of extended item info for the query param
假设 Retrofit 为
生成了客户端interface Api1 {
@GET("/api1") Observable<List<Item>> items(@Query("param") String param);
}
interface Api2 {
@GET("/api2/{item_id}") Observable<ItemExtended> extendedInfo(@Path("item_id") String item_id);
}
如果项目的顺序不重要,那么可以只使用 flatMap
:
api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList)))
.flatMap(item -> api2.extendedInfo(item.id()))
.subscribe(...)
但是仅当改造构建器配置为
要么使用异步适配器(调用将在 okhttp 内部执行器中排队)。我个人认为这不是一个好主意,因为你无法控制这个执行器。
.addCallAdapterFactory(RxJava2CallAdapterFactory.createAsync()
或者使用基于调度器的适配器(调用将在 RxJava 调度器上进行调度)。这是我的首选,因为您明确选择了使用哪个调度程序,它很可能是 IO 调度程序,但您可以自由尝试不同的调度程序。
.addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))
原因是 flatMap
将订阅 api2.extendedInfo(...)
创建的每个可观察对象并将它们合并到生成的可观察对象中。因此结果将按照收到的顺序显示。
如果改造客户端未设置为异步或在调度程序上设置为运行,则有可能设置一个:
api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList)))
.flatMap(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
.subscribe(...)
这个结构与前一个结构几乎相同,它指示 在本地 每个 api2.extendedInfo
应该 运行.[=34= 在哪个调度程序上执行]
可以调整flatMap
的maxConcurrency
参数来控制同时执行多少个请求。尽管我对此持谨慎态度,但您不希望 运行 同时进行所有查询。通常默认的 maxConcurrency
就足够了 (128
).
现在如果原始查询的顺序很重要。 concatMap
通常是按顺序但按顺序执行与 flatMap
相同的操作的运算符,如果代码需要等待所有子查询执行,则结果会很慢。虽然解决方案比 concatMapEager
更进一步,但这个解决方案将按顺序订阅 observable,并根据需要缓冲结果。
假设改装客户端是异步的或 运行 在特定的调度程序上:
api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList)))
.concatMapEager(item -> api2.extendedInfo(item.id()))
.subscribe(...)
或者如果必须在本地设置调度程序:
api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList)))
.concatMapEager(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
.subscribe(...)
也可以在此运算符中调整并发性。
此外,如果 Api 返回 Flowable
,则可以在 RxJava 2.1.7 中使用目前仍处于测试阶段的 .parallel
。但是结果不按顺序排列,我不知道如何(还?)在没有排序的情况下对它们进行排序。
api.items(queryParam) // Flowable<Item>
.parallel(10)
.runOn(Schedulers.io())
.map(item -> api2.extendedInfo(item.id()))
.sequential(); // Flowable<ItemExtended>
我用 RxJava2 解决了类似的问题。并行执行 Api 2 的请求稍微加快了工作速度。
private InformationRepository informationRepository;
//init....
public Single<List<FullInformation>> getFullInformation() {
return informationRepository.getInformationList()
.subscribeOn(Schedulers.io())//I usually write subscribeOn() in the repository, here - for clarity
.flatMapObservable(Observable::fromIterable)
.flatMapSingle(this::getFullInformation)
.collect(ArrayList::new, List::add);
}
private Single<FullInformation> getFullInformation(Information information) {
return informationRepository.getExtendedInformation(information)
.map(extendedInformation -> new FullInformation(information, extendedInformation))
.subscribeOn(Schedulers.io());//execute requests in parallel
}
InformationRepository - 只是界面。它的实现对我们来说并不有趣。
public interface InformationRepository {
Single<List<Information>> getInformationList();
Single<ExtendedInformation> getExtendedInformation(Information information);
}
FullInformation - 结果容器。
public class FullInformation {
private Information information;
private ExtendedInformation extendedInformation;
public FullInformation(Information information, ExtendedInformation extendedInformation) {
this.information = information;
this.extendedInformation = extendedInformation;
}
}
检查下方是否正常。
假设您需要进行多个网络调用——调用以获取 Github 用户信息和 Github 用户事件等。
并且您想在更新 UI 之前等待每个 return。 RxJava 可以在这里帮助你。 让我们首先定义我们的 Retrofit 对象来访问 Github 的 API,然后为两个网络请求调用设置两个可观察对象。
Retrofit repo = new Retrofit.Builder()
.baseUrl("https://api.github.com")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build();
Observable<JsonObject> userObservable = repo
.create(GitHubUser.class)
.getUser(loginName)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
Observable<JsonArray> eventsObservable = repo
.create(GitHubEvents.class)
.listEvents(loginName)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
它使用的界面如下:
public interface GitHubUser {
@GET("users/{user}")
Observable<JsonObject> getUser(@Path("user") String user);
}
public interface GitHubEvents {
@GET("users/{user}/events")
Observable<JsonArray> listEvents(@Path("user") String user);
}
在我们使用 RxJava 的 zip 方法合并我们的两个 Observable 并等待它们完成之后再创建一个新的 Observable。
Observable<UserAndEvents> combined = Observable.zip(userObservable, eventsObservable, new Func2<JsonObject, JsonArray, UserAndEvents>() {
@Override
public UserAndEvents call(JsonObject jsonObject, JsonArray jsonElements) {
return new UserAndEvents(jsonObject, jsonElements);
}
});
最后让我们在新组合的 Observable 上调用 subscribe 方法:
combined.subscribe(new Subscriber<UserAndEvents>() {
...
@Override
public void onNext(UserAndEvents o) {
// You can access the results of the
// two observabes via the POJO now
}
});
不再需要在线程等中等待网络调用完成。 RxJava 已经在 zip() 中为你完成了所有这些工作。 希望我的回答对你有帮助。
尝试使用 Observable.zip()
运算符。它将等到两个 Api 调用完成后再继续流。然后你可以通过调用 flatMap()
插入一些逻辑。