Android RxJava2 应用程序在从创建的可观察对象调用 emitter.onError() 时崩溃

Android RxJava2 App crashing on calling emitter.onError() from created observable

我正在 create() 的帮助下手动创建 Observable。现在在里面,我检查了一些条件并基于此,我想通知订户有关错误。以下是我创建可观察对象的方式:

public Observable<User> loginUser(String email, String password) {
    return Observable.create(
        emitter -> {
            myAsynchronousWork.onCompleteListener(
               result -> {
                   if(!result.isSuccess()) {
                      // This causes the crash.
                      emitter.onError(new Throwable(result.getError()));
                   } else {
                      // Process result & create User object & return it. This works as expected.
                      emitter.onNext(user);
                      emitter.onComplete();
                   }
               }
            );

        }
    );

}

& 然后我订阅 loginUser() 喜欢:

loginUser("", "")
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(user -> {
                    Log.d("TAG", "logged in user => " + user.getUuid());
                    Log.d("TAG", "user name => " + user.getUserName());
                    Log.d("TAG", "user email => " + user.getEmailId());
                }, throwable -> {
                    Log.e("TAG", "error in login => " + throwable.getMessage());
                }, () -> {

                });

我希望调用 emitter.onError() 应该进入 loginUser()subscribe() 的 onError 内部,我在其中记录了异常,但应用程序因 [= 返回的异常而崩溃17=] 在 logcat 就像没有人来处理它!

我通过调试进行了检查,发现在线 emitter.onError() 时,emitter"null"。但是 onNext 和 onComplete 不会造成任何问题。 请让我知道我做错了什么?

创建可观察对象时,您必须自己处理所有可能的错误。所以您的代码应该如下所示:

public Observable<User> loginUser(String email, String password) {
    return Observable.create(
        emitter -> {
          try{
            myAsynchronousWork.onCompleteListener(
               result -> {
                 try{
                   if(!result.isSuccess()) {
                      // This causes the crash.
                      emitter.onError(new Throwable(result.getError()));
                   } else {
                      // Process result & create User object & return it. This works as expected.
                      emitter.onNext(user);
                      emitter.onComplete();
                   }
                 }catch(Exception e){
                    emitter.onError(e);
                 }
               }
            );
         }catch(Exception e){
            emitter.onError(e);
        }

        }
    );

}

PS:在创建 observable 和 disposable 时始终确保错误检查。

我的应用程序崩溃的原因是,我实际上遇到了这样的事情:

myAsynchronousWork.onCompleteListener(
               result -> {
                   if(!result.isSuccess()) {
                      // This causes the crash.
                      emitter.onError(new Throwable(result.getError()));
                   } else {
                      // Process result & create User object & return it. This works as expected.
                      emitter.onNext(user);
                      emitter.onComplete();
                   }
               },
               exception -> {
                   emitter.onError(exception); // This was the reason of problem!
               }
            );

我没有发布完整的部分,因为我不知道这是导致异常的原因。

这里发生的事情是,我的代码首先进入 exception-> 部分,该部分通知观察者有关错误并且观察者正在终止。现在我的 result-> 部分正在执行 & 在这里当我试图再次调用 emitter.onError() 时,它崩溃了,因为没有 emitter 来处理这个 & RxJava2 正在全局抛出它。

所以对我来说,我删除了 exception -> 部分,因为它会在 result -> 上出现,我可以在其中检查结果并且我还用 emitter.onError 包装了

if(!emitter.isDisposed()) emitter.onError();

因为在我的例子中,如果发射器被处置,忽略错误是可以的。

使用 emitter.tryOnError 对我有用,它会在 subscribe() 内调用错误,也不会 UndeliverableException 或应用程序停止 运行。