Retrofit 和 RxJava:做多个独立的请求并检测它们何时全部完成

Retrofit & RxJava : do multiple independant request and detect when they are all completed

在启动我的应用程序时,我需要加载多个独立数据并检查它们是否都已正确加载。

我所做的是:首先进行身份验证,然后发送所有加载数据的请求。 每个调用都是独立的,它只是用来更新本地数据库。 通话效果很好。

但是我如何得知所有请求都已完成?

public static void loadData() {

        LoginService loginService =
                RetrofitHelper.createService(LoginService.class);

        user = new User(ApplicationCore.syncLogin, ApplicationCore.syncPassword);

        loginService.login(user)
                .doOnNext(accessToken -> storeCredentials(accessToken))
                .doOnNext(a -> processData1())
                .doOnNext(a -> processData2())
                .doOnNext(a -> processData3())
                .doOnNext(a -> processData4())
                .subscribeOn(Schedulers.io())
                .onErrorResumeNext(Observable.empty())
                .subscribe(a -> Log.d("XXX","*********** END *********"));
    }

private static DisposableSingleObserver processData1 () {


        return RetrofitHelper.createService(Data1Service.class, true, authType, authToken).fetchAll(ApplicationCore.appVersionNum)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(new DisposableSingleObserver<List<Data1>>() {
                    @Override
                    public void onSuccess(List<Data1> dataList) {
                        if (Data1DB.updateData(new ArrayList<>(dataList)) ) {  // success

                        } else {

                            Log.d(TAG,"Error on processData1");
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG,"Error on processData1",e);
                    }
                });
    }

在我的日志中,我看到“*********** END *********”在收到数据之前的网络调用后立即显示。

我希望能够观察到最后一个数据被处理的时间。

有几个选项可用,但请考虑这个(伪代码):

    loginService.login(user)
            .doOnNext(accessToken -> storeCredentials(accessToken))
            .flatMapCompletable(a -> Completable.mergeArray(
                     fetchAll1(...)
                         .map(data->...Data1DB.updateData...)
                         .toCompletable(),
                     fetchAll2(...)
                     ...
                 )
             )
             .subscribe(...)

现在是这样实现的。 如果每个 processDataX 函数中没有 "subscribeOn",则每个 "processData1()" 将按顺序执行,如果添加了 "subscribeOn",则将并行执行。

然后当所有完成后执行订阅。

          public static void loadRefMatrix() {

                LoginService loginService =
                        RetrofitHelper.createService(LoginService.class);

                user = new User(ApplicationCore.syncLogin, ApplicationCore.syncPassword);
        loginService.login(user)
                .doOnNext(accessToken -> storeCredentials(accessToken))
                .flatMapCompletable(a -> Completable.mergeArray(
                        processData1(),
                        processData2(),
                        processData3(),
                        processData4(),
                        processData5(),
                        processData6(),
                        processData7()
                        )
                )         
    .subscribeOn(Schedulers.io())
                .subscribe(new DisposableCompletableObserver() {
    @Override
    public void onComplete() {
        Log.d(TAG, "*********** END *********");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "*********** ERROR *********");
    }
})

 )
                ;
            }

         private static Completable processData1 () {

                return RetrofitHelper.createService(Data1Service.class, true, authType, authToken).fetchAll(ApplicationCore.appVersionNum)
                      .map(data -> {
                                if (Data1DB.updatData(new ArrayList<>(data)) ) {  // success

                                } else {
                                    Log.d(TAG,"Error on processAcuityMatrix do data");
                                }
                                return true;
                            })
     .subscribeOn(Schedulers.io())  // <-- this to process in parallele                   
    .toCompletable();
        }