如何以 RxJava2 的方式组织异步调用链

How to organise chain of async calls in RxJava2 way

我将下一个代码封装在 AsyncTask 中 class

        AsyncTask<Void, Void, Void> task = new AsyncTask<Void, Void, Void>() {
        @Override
        protected Void doInBackground(Void... params) {
            try {
                if (NetworkUtils.isNetworkConnected(mContext))
                    mIndicatorTable.sync().blockingGet();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            } finally {
                try {
                    final MobileServiceList<IndicatorModel> results = mIndicatorTable.table().read(null).blockingGet();
                    if (isViewAttached()) {
                        getMvpView().refreshIndicatorList(results);
                        getMvpView().hideLoading();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            return null;
        };

mIndicatorTable.sync()mIndicatorTable.table().read(null) 都是 returns Single<Boolean>mIndicatorTable.sync() - 将本地存储与远程存储同步,如果网络不可用,那么我们只在网络准备就绪时使用 mIndicatorTable.table().read(null) 从本地存储读取 - 我们执行同步,然后从本地存储读取(我们不关心如果同步取消或中断)。毕竟我们应该调用View来刷新RecycleView。如何用 RxJava2 实现?

#更新

工作版本

getDataManager().syncAll()
     .onErrorResumeNext(Single.just(false))
     .flatMap(list -> toSingle(mIndicatorTable.getTable().read(null)))
     .subscribeOn(Schedulers.newThread())
     .observeOn(AndroidSchedulers.mainThread())
     .subscribe(results -> {
        if (isViewAttached()) {
            getMvpView().refreshIndicatorList(results);
            getMvpView().hideLoading();
        }
     })

rx-java 只需调用 .flatMap() 即可轻松链接异步调用。

mIndicatorTable.sync()
        .flatMap(new Function<Boolean, SingleSource<MobileServiceList<IndicatorModel>>>() {
            @Override
            public SingleSource<MobileServiceList<IndicatorModel>> apply(Boolean aBoolean) throws Exception {
                return mIndicatorTable.table().read(null);
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(...)

但是您想在另一个线程中继续调用 sync(),而 RecycleView 只能从主线程访问。这就是 rx-android 派上用场的地方。 .subscribeOn(Schedulers.io()) 在另一个线程中实例化流,并且 .observeOn(AndroidSchedulers.mainThread()) 确保在主线程中执行此行下方的语句(此处为 .subscribe(...))。