RxJava2 将两个 Single 转换为 Completable

RxJava2 Convert two Single's into Completable

我有两个单曲,我想做的是将它们转换成一个 Completable

 final Single<Boolean> httpRequestOne = createHttpRequestOne();
 final Single<Boolean> httpRequestTwo = createHttpRequestTwo();

如果两个单选 return 为真,Completable 的结果应该是 onSuccess 否则会是 onError。

我也希望它们 运行 并行,所以我认为 concat 在这里无济于事

final Flowable<Boolean> flowable = httpRequestOne.concatWith(httpRequestTwo);

我会使用 zip 运算符,这样您就可以:

  • 合并两个操作的结果
  • 并行化工作(订阅不同的调度程序)

最终您可以将这两个结果转换为您想要的单个结果(在您的情况下,它只是一个逻辑与)。

private Completable createCompletable() {
        return Single.zip(createHttpRequestOne().subscribeOn(Schedulers.newThread()),
                createHttpRequestTwo().subscribeOn(Schedulers.newThread()), (b1, b2) -> b1 && b2)
                .flatMap(new Function<Boolean, SingleSource<Boolean>>() {
                    @Override
                    public SingleSource<Boolean> apply(@NonNull Boolean aBoolean) throws Exception {
                        if (aBoolean) {
                            return Single.just(true);
                        } else {
                            return Single.error(new Throwable("one of the Single returned FALSE"));
                        }
                    }
                }).toCompletable();
    }

然后订阅它:

createCompletable()
     .subscribe(() -> logd("onCompelte"),
                throwable -> logd("error: " + throwable.getMessage()));

如果 Single 都完成发射 TRUE,onComplete 将从你的可完成发射,否则你会得到一个错误,并且 onError 将在你的订阅中被触发。