在 RxJava 中取消一个 Observable

Cancelling an Observable in RxJava

我有一个正在执行下载的 Observable。但是,我想在单击按钮时取消该可观察对象。

这怎么可能?

谢谢。

您无法真正告诉一个可观察对象取消网络请求,因为 RxJava 不知道可观察对象内部的代码 运行。你可以做的是使用doOnUnsubscribe检查网络请求是否仍然是运行,如果是取消它,那么当你想取消时取消订阅即可。

这应该给您大概的想法。总结就是用Observable.using()订阅ObservableSubscriber这样当用户点击取消按钮的时候可以调用subscriber.unsubscribe()

这是大纲。

Subscriber<T> subscriber = ...;
Observable
    // create a stream from a socket and dispose of socket
    // appropriately
    .using(socketCreator(host, port, quietTimeoutMs),
           socketObservableFactory(charset), 
           socketDisposer(), true)
    // cannot ask host to slow down so buffer on
    // backpressure
   .onBackpressureBuffer()
   .subscriber(subscriber);

点击按钮调用:

subscriber.unsubscribe()

这将停止可观察流并调用 socketDisposer() 停止网络 activity。

您没有指定下载的性质(ftp、http 等),但答案并没有真正改变,因为所有这些传输都符合这种模式。

在我的例子中,我有一个在特定条件下发出项目的发布主题。在某些情况下我想取消它,所以我添加了一个密封的 class 来保存状态:

sealed class DelayedItem {
   object Ignore : DelayedItem()
   data class Handle(val text: String) : DelayedItem()
}

我的订阅者被退回了:

myPublishSubject
            .subscribeOn(Schedulers.io())
            .debounce(500, TimeUnit.MILLISECONDS)
            .subscribe {
                if (it is DelayedItem.Handle)
                    handleItem(it)
            }

希望对您有所帮助!