如何正确使用 Observable.zip() 组合多个 Retrofit 调用?

How to properly use Observable.zip() to combine multiple Retrofit call?

我是 RxJava 的新手。我的要求是在开始时进行 3 次改装调用,然后等到所有主题都执行完毕。这是我已经实现的,它工作得很好,但我想知道,这段代码是否可以比这更好,我是否正确地实现了调度程序。

public class CombinedGroupProductPage {
    private List<Product> groupProductList;
    private List<Product> relatedProductList;
    private List<Product> upsellProductList;
    //constructor and getter setters here
    .....
    .....
}

Here is my implementation

private void getAllData() {
    loadingProgress.setVisibility(View.VISIBLE);

    ApiInterface apiService = ApiClient.getRxClient().create(ApiInterface.class);
    Observable<List<Product>> call = apiService.getRelatedProduct1(gson.toJson(model.getGroupedProducts()))
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.io());
    Observable<List<Product>> call1 = apiService.getRelatedProduct1(gson.toJson(model.getRelatedIds()))
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.io());
    Observable<List<Product>> call2 = apiService.getRelatedProduct1(gson.toJson(model.getUpsellIds()))
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.io());


    Observable<CombinedGroupProductPage> combined = Observable.zip(call, call1, call2, new Function3<List<Product>, List<Product>, List<Product>, CombinedGroupProductPage>() {

        @Override
        public CombinedGroupProductPage apply(List<Product> list, List<Product> list2, List<Product> list3) throws Exception {
            return new CombinedGroupProductPage(list, list2, list3);
        }
    }).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread());
    combined.subscribe(new Observer<CombinedGroupProductPage>() {
        @Override
        public void onSubscribe(Disposable d) {
            // loadingProgress.setVisibility(View.VISIBLE);
        }

        @Override
        public void onNext(CombinedGroupProductPage combinedGroupProductPage) {
            Log.e("Tag", combinedGroupProductPage.toString());
            Log.e("Tag", combinedGroupProductPage.getGroupProductList().get(0).getName());
            loadingProgress.setVisibility(View.GONE);
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    });
}

请问这段代码能不能缩减?如有任何帮助,我们将不胜感激。

使用 Schedulers.io() 而不是 Schedulers.newThread()Schedulers.io() 使用线程池而 Schedulers.newThread() 不使用。创建线程代价高昂,应尽可能避免。

使用 .subscribeOn(Schedulers.io()) 进行不同的调用还可以让您删除现在无用的 .observeOn(Schedulers.io())