在 RXjava2 中调用 ObservableEmitter 的 onError 时发生 UndeliverableException

UndeliverableException while calling onError of ObservableEmitter in RXjava2

我有一个创建发射器的方法,如下所示,在改造回调中调用 onError 时出现问题(可能是正常行为)。尝试调用 onError 时出现 UndeliverableException。

我可以通过检查 subscriber.isDiposed() 来解决这个问题我想知道如何调用 onError 因为我需要通知我的 UI 级别。

  • Addition 1
--> RxJava2CallAdapterFactoryalready implemented   
private static Retrofit.Builder builderSwift = new Retrofit.Builder()
           .baseUrl(URL_SWIFT)
           .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
           .addConverterFactory(GsonConverterFactory.create())
           .addConverterFactory(new ToStringConverterFactory());

--> When i added below code to application class app won't crash
--> but i get java.lang.exception instead of my custom exception  

RxJavaPlugins.setErrorHandler(Functions<Throwable>emptyConsumer());

@Override
    public void onFileUploadError(Throwable e) {
        Log.d(TAG, "onFileUploadError: " + e.getMessage());
    }
public Observable<UploadResponseBean> upload(final UploadRequestBean uploadRequestBean, final File file) {
    return Observable.create(new ObservableOnSubscribe<UploadResponseBean>() {
        @Override
        public void subscribe(@NonNull final ObservableEmitter<UploadResponseBean> subscriber) throws Exception {

                 // ---> There are no problem with subscriber while calling onError        

                // ---> Retrofit2 service request 
                ftsService.upload(token, uploadRequestBean, body).enqueue(new Callback<UploadResponseBean>() {
                    @Override
                    public void onResponse(Call<UploadResponseBean> call, Response<UploadResponseBean> response) {

                        if (response.code() == 200){
                            // --->  calling onNext works properly
                            subscriber.onNext(new UploadResponseBean(response.body().getUrl()));
                        }
                        else{
                            // --->  calling onError throws UndeliverableException
                            subscriber.onError(new NetworkConnectionException(response.message()));                
                        }
                    }

                    @Override
                    public void onFailure(Call call, Throwable t) {
                        subscriber.onError(new NetworkConnectionException(t.getMessage()));
                    }
                });
        }
    });
}

问题就像你说你需要检查 Subscriber 是否已经被释放,那是因为 RxJava2 对于 Subscriber 已经被释放后抛出的错误更加严格。
RxJava2 将这种错误传递给 RxJavaPlugins.onError,默认情况下打印到堆栈跟踪并调用线程未捕获的异常处理程序。你可以阅读完整的解释 here.

现在这里发生了什么,你可能在查询完成和错误传递之前取消订阅(处理)这个 Observable,因此 - 你得到 UndeliverableException.

I wonder how can call onError coz i need to notify my UI level.

因为这是在您的 UI 退订后发生的,所以 UI 不应该在意。在正常流程中,此错误应该正确传送。

关于您的实施的一些一般要点:

  • 如果您之前取消订阅,onError 也会出现同样的问题。
  • 这里没有取消逻辑(这就是导致此问题的原因)所以即使 Subscriber 取消订阅,请求也会继续。
  • 即使您将实现此逻辑(使用 ObservableEmitter.setCancellable() / setDisposable()),如果您在请求完成之前取消订阅,您仍然会遇到此问题 - 这将导致取消和您的 onFailure 逻辑将调用 onError() 并且会发生同样的问题。
  • 当您通过 Retrofit 执行异步调用时,指定的订阅 Scheduler 不会使实际请求发生在 Scheduler 线程上,而只会发生在订阅上。您可以使用 Observable.fromCallable 和 Retrofit 阻塞调用 execute 来更好地控制实际发生的线程调用。

总结一下—— 在这种情况下,用 ObservableEmitter.isDiposed() 保护对 onError() 的调用是一个很好的做法。
但我认为最好的做法是使用 Retrofit RxJava 调用适配器,这样你就可以完成 Observable 进行 Retrofit 调用并且已经考虑了所有这些因素。

自版本 2.1.1 tryOnError 可用以来:

The emitter API (such as FlowableEmitter, SingleEmitter, etc.) now features a new method, tryOnError that tries to emit the Throwable if the sequence is not cancelled/disposed. Unlike the regular onError, if the downstream is no longer willing to accept events, the method returns false and doesn't signal an UndeliverableException.

https://github.com/ReactiveX/RxJava/blob/2.x/CHANGES.md

我发现这个问题是由于在 Fragment 中检索视图模型时使用了不正确的上下文引起的:

ViewModelProviders.of(requireActivity(), myViewModelFactory).get(MyViewModel.class);

因此,视图模型存在于 activity 而不是片段的上下文中。将其更改为以下代码可解决问题。

ViewModelProviders.of(this, myViewModelFactory).get(MyViewModel.class);