RxJava:如何在取消订阅时中断线程?

RxJava: How to interrupt thread on unsubscribe?

我使用 Observable.create() 创建一个可观察对象以在调度程序上执行一些工作(例如 Schedulers.io() 然后 return AndroidSchedulers.mainThread() 上的结果。

val subscription = observable<T> {
        try {
            // perform action synchronously
            it.onNext(action.invoke(context, args))
            it.onCompleted()
        } catch (t: Exception) {
            it.onError(t)
        }
    }.subscribeOn(scheduler)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    {
                        // handle result here
                        result.set(it)
                    },
                    {
                        // handle error here
                        errorHandler.handleTaskError(model, this, it)
                    },
                    {
                        // notify completed
                        model.completeTask(this)
                    }
            )

action.invoke()里面的操作是同步的,可能是阻塞IO操作。当用户决定取消它时,我取消订阅可观察的:subscription.unsubscribe()

但是,I/O 操作没有被中断。有没有rx-javaAPI中断运行?

据我所知,使用 RxJava 是不可能的。

您可以在可观察对象之外保留对 action 对象的引用,并在 unsubscribe 调用时中断它。

当您调用yourSubscription.unsubscribe();时,Rx会调用您的退订代码。

此取消订阅代码将是 Subscription class,当您创建 Observable.[=18= 时,您可以 addsubscriber ]

Observable<Object> obs = Observable.create(subscriber -> {

      subscriber.add(new Subscription() {
            @Override
            public void unsubscribe() {
                 // perform unsubscription
            }

            @Override
            public boolean isUnsubscribed() {
                return false;
            }
        });
      subscriber.onNext(/**...*/);
      subscriber.onCompleted();
}

所以在 unsubscribe 方法中,如果有办法的话,你可以中断你的工作。

请注意,当您取消订阅一个 Observable 或一个 Observable 完成时,将调用取消订阅方法。 (它会自行退订)

编辑 : 考虑vladimir mironov评论

RxJava 调度程序使用 Furure.cancel(true) 来中断 运行 操作,因此如果您有中断敏感操作(例如 Thread.sleep()),您应该会看到线程中断,尽管您对此无能为力,因为此时整个下游链已被取消订阅,并且没有更多事件可以到达您的订阅者。

如果您的操作对中断不敏感并且没有提供任何中断它的标准方法(例如在套接字上调用 close(),或在 [=21= 上调用 cancel() ] 声明),RxJava 对此无能为力。

如果您可以异步调用一些 close() 方法,请考虑使用 using(),它让您指定一个清理回调,可以在取消订阅时关闭您的资源。