致命异常:RxCachedThreadScheduler-1 触发器处理时。为什么?

FATAL EXCEPTION: RxCachedThreadScheduler-1 when trigger dispose. Why?

我有以下 RxJava 2 代码(在 Kotlin 中),它有一个 Observable

disposable = Observable.create<String>({
    subscriber ->
            try {
                Thread.sleep(2000)
                subscriber.onNext("Test")
                subscriber.onComplete()
            } catch (exception: Exception) {
                subscriber.onError(exception)
            }
}).subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe({ result -> Log.d("Test", "Completed $result") },
             { error -> Log.e("Test", "Completed ${error.message}") })

趁着还是Thread.sleep(2000),我执行disposable?.dispose()调用,会报错

FATAL EXCEPTION: RxCachedThreadScheduler-1
Process: com.elyeproj.rxstate, PID: 10202
java.lang.InterruptedException 
    at java.lang.Thread.sleep(Native Method)
    at java.lang.Thread.sleep(Thread.java:371)
    at java.lang.Thread.sleep(Thread.java:313)
    at presenter.MainPresenter$loadData.subscribe(MainPresenter.kt:41)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)

我希望 dispose 可以帮助以静默方式取消操作,或者至多在订阅时使用 Log.e 捕获错误。但是,它只是按照上面的错误消息崩溃了。

Exception为什么会逃逸? dispose 不是应该在不崩溃的情况下悄悄取消整个操作吗?

这里有多种因素的组合:

    使用 subscribeOn 的流的
  1. dispose 也会处理所使用的线程。这也涉及在使用 Schedulers.io() 时调用 Thread.interrupt()。这会导致您的情况出现异常。
  2. InterruptedException 是由 Thread.sleep 抛出的 Exception,因此它会被您的代码捕获并像任何其他异常一样传递给 onError
  3. dispose 之后调用 onError 将错误重定向到全局错误处理程序,这是由于 RxJava2 的从不丢弃错误的策略。要解决这个问题,请在调用 onError 之前检查 subscriber.isDisposed() 或使用 RxJava 2.1.1 的新 subscriber.tryOnError.

    if (!subscriber.isDisposed()) {
      subscriber.onError(exception)
    }
    

如果您使用的是 rxjava2,请将此添加到您的初始化代码中

RxJavaPlugins.setErrorHandler(t -> {
        logger.log(Level.SEVERE,null, t);
    });

希望对您有所帮助