RxJava:从 Nullable 对象创建可观察对象

RxJava : Creating observable from a Nullable object

我是 RxJava 的新手。我有这样的代码。我正在从 repository.getStatus() 返回的字符串创建一个可观察对象。如果它是 null ,我必须继续使用 proceed 方法而不做任何事情。如果它不是 null ,我必须先调用 repository.init(),然后再调用 proceed。这是我所做的。

Flowable.just(repository.getStatus()) // getStatus return a string , which can be null
        .doOnError(throwable -> proceed())
        .flatMapCompletable(s -> repository.init())
        .observeOn(Schedulers.io())
        .subscribeOn(AndroidSchedulers.mainThread())
        .subscribe(new DisposableCompletableObserver() {
                       @Override
                       public void onComplete() {
                           proceed();
                       }

                       @Override
                       public void onError(@NonNull Throwable e) {
                            handleErrors(t));
                       }
                  });

private void proceed(){
    // 
}

万一 repository.getStatus() 为 null ,它不会调用 doOnError(throwable -> proceed()) 吗? 现在当 repository.getStatus() 为空时它正在崩溃。处理这种用例的最佳 Rx 方法是什么?

由于您使用的是 Rx2,因此您可以使用新的 Maybe 类型,它相当于 Optional 的流式传输。不幸的是,它没有 fromNullable 工厂方法(参见 https://github.com/ReactiveX/RxJava/issues/5019),但你可以使用这个:

Maybe.fromCallable(() -> repository.getStatus())

而不是管道 repository.getStatus()repository.init() ,你可以根据你的条件从这两个中的任何一个创建一个可观察的。

尝试这样的事情

    Flowable.just(repository.getStatus() == null ? Completable.complete() : repository.init())
            .flatMapCompletable(c -> c)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new DisposableCompletableObserver() {
                @Override
                public void onComplete() {
                    proceed();
                }

                @Override
                public void onError(@NonNull Throwable e) {
                     handleErrors(t));
                }
            });

你甚至可以让你的 repository.getStatus() 成为 MayBe 并做这样的事情

Maybe.create((MaybeOnSubscribe<String>) e -> {
        if (repository.getStatus() != null) e.onSuccess(repository.getStatus());
        e.onComplete();
    }).flatMapCompletable(s -> Completable.fromAction(() -> repository.init()))
            .subscribe(new DisposableCompletableObserver() {
                @Override
                public void onComplete() {
                    proceed();
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    handleErrors(t));
                }
            });