How to explicitly unsubscribe from an Observable after onCompleted

In the code below, how and where should I call unsubscribe so that the Observable is explicitly unsubscribed once onCompleted has run?

getObservable()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .subscribe(new Subscriber<Boolean>() {
        @Override
        public void onCompleted() {
            doSomething();
        }

        @Override
        public void onError(Throwable e) {
            thereIsError();
        }

        @Override
        public void onNext(Boolean status) {
            updateView();
        }
    });

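The short answer: there is no reason to unsubscribe explicitly, because the observable chain does it for you once the sequence terminates. You can verify this yourself by adding this operator to the chain: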

.doOnUnsubscribe( () -> System.out.println("Unsubscribed") )
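To picture what "done by the chain" means, here is a toy model in plain Java. It is not RxJava code: Listener, SafeListener and the log strings are hypothetical names invented for this sketch. It only assumes that the library wraps your subscriber in something that releases the subscription after the terminal callback returns.

```java
import java.util.ArrayList;
import java.util.List;

final class AutoUnsubscribeSketch {
    /** Hypothetical callback set, standing in for a subscriber. */
    interface Listener<T> {
        void onNext(T value);
        void onCompleted();
    }

    /** Hypothetical wrapper: forwards events and unsubscribes after completion. */
    static final class SafeListener<T> implements Listener<T> {
        private final Listener<T> actual;
        private final Runnable onUnsubscribe;
        private boolean unsubscribed;

        SafeListener(Listener<T> actual, Runnable onUnsubscribe) {
            this.actual = actual;
            this.onUnsubscribe = onUnsubscribe;
        }

        void unsubscribe() {
            if (!unsubscribed) {
                unsubscribed = true;
                onUnsubscribe.run();
            }
        }

        @Override
        public void onNext(T value) {
            // Events arriving after unsubscription are dropped.
            if (!unsubscribed) {
                actual.onNext(value);
            }
        }

        @Override
        public void onCompleted() {
            if (unsubscribed) {
                return;
            }
            try {
                actual.onCompleted();
            } finally {
                // Released automatically, even if the callback throws.
                unsubscribe();
            }
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        SafeListener<Boolean> s = new SafeListener<>(new Listener<Boolean>() {
            @Override
            public void onNext(Boolean v) { log.add("next:" + v); }

            @Override
            public void onCompleted() { log.add("completed"); }
        }, () -> log.add("unsubscribed"));

        s.onNext(true);
        s.onCompleted();
        s.onNext(false); // ignored: already unsubscribed
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [next:true, completed, unsubscribed]
    }
}
```

The user callback never calls unsubscribe itself; the wrapper does it, which is why an explicit call in onCompleted adds nothing.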

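You can keep the Disposable for the subscription and dispose of it in the onComplete callback. Note that a local variable assigned from subscribe cannot be referenced inside the observer passed to that same call, so in RxJava 2 the simplest way is subscribeWith with a DisposableObserver, which is its own Disposable:

Disposable d = getObservable()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .subscribeWith(new DisposableObserver<Boolean>() {
        @Override
        public void onComplete() {
            doSomething();
            dispose(); // the observer is its own Disposable
        }

        @Override
        public void onError(Throwable e) {
            thereIsError();
        }

        @Override
        public void onNext(Boolean status) {
            updateView();
        }
    });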
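
Alternatively, add the Disposable to a CompositeDisposable (here a field named disposables) so that it can be released together with the activity:

disposables.add(sampleObservable()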
            // Run on a background thread
            .subscribeOn(Schedulers.io())
            // Be notified on the main thread
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DisposableObserver<String>() {
                @Override
                public void onComplete() {
                    // Do some work for completion
                }

                @Override
                public void onError(Throwable e) {
                     // Do some work for error
                }

                @Override
                public void onNext(String value) {
                    // Do some work for next
                }
            }));

Clear and dispose when the activity is destroyed:

@Override
protected void onDestroy() {
    super.onDestroy();
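    // dispose() disposes every contained Disposable and the container itself,
    // so no events are delivered after the activity has been destroyed.
    // A separate clear() beforehand is redundant; use clear() alone to keep the container reusable.
    disposables.dispose();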
}
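
The difference between clear() and dispose() on the container can be sketched with a toy model in plain Java. Handle and Container are hypothetical stand-ins written for this illustration, not RxJava classes, and the sketch ignores thread safety. It assumes the behavior described in the CompositeDisposable documentation: clear() disposes the contents but leaves the container usable, while a disposed container immediately disposes anything added later.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CompositeSketch {
    /** Hypothetical minimal handle, standing in for a Disposable. */
    static final class Handle {
        private boolean disposed;

        void dispose() { disposed = true; }

        boolean isDisposed() { return disposed; }
    }

    /** Hypothetical container modelling the clear()/dispose() contract. */
    static final class Container {
        private final List<Handle> handles = new ArrayList<>();
        private boolean disposed;

        /** Rejects and disposes the handle if the container is already disposed. */
        boolean add(Handle h) {
            if (disposed) {
                h.dispose();
                return false;
            }
            handles.add(h);
            return true;
        }

        /** Disposes the contents; the container stays usable. */
        void clear() {
            for (Handle h : handles) {
                h.dispose();
            }
            handles.clear();
        }

        /** Disposes the contents and the container itself. */
        void dispose() {
            disposed = true;
            clear();
        }
    }

    static boolean[] run() {
        Container c = new Container();

        Handle first = new Handle();
        c.add(first);
        c.clear();

        Handle second = new Handle();
        boolean acceptedAfterClear = c.add(second);
        c.dispose();

        Handle third = new Handle();
        boolean acceptedAfterDispose = c.add(third);

        return new boolean[] {
            first.isDisposed(), acceptedAfterClear, second.isDisposed(),
            acceptedAfterDispose, third.isDisposed()
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [true, true, true, false, true]
    }
}
```

In an activity that is being destroyed for good, dispose() is usually the call you want; clear() fits cases such as onStop, where the same container will be filled again later.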

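If you want to unsubscribe right away, just use take(1). It passes the first item downstream, then completes the sequence and unsubscribes from the source, so onComplete is called right after that item.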

Disposable d = getObservable()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .take(1) // take the first item, then complete and stop emissions
    .subscribeWith(new DisposableObserver<Boolean>() {
        @Override
        public void onComplete() {
            // No dispose() needed here: take(1) has already ended the stream.
            doSomething();
        }

        @Override
        public void onError(Throwable e) {
            thereIsError();
        }

        @Override
        public void onNext(Boolean status) {
            updateView();
        }
    });
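
The effect of take(1) can be pictured with a small plain-Java sketch. It is a toy model, not RxJava's implementation: Listener and emitWithTake are hypothetical names, and the source is just a list that gets pushed to the listener. The point it illustrates is that once the limit is reached, the listener is completed and nothing further is pulled from the source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TakeSketch {
    /** Hypothetical callback set, standing in for an observer. */
    interface Listener<T> {
        void onNext(T value);
        void onComplete();
    }

    /**
     * Pushes at most `limit` items to the listener, then completes it.
     * Returns how many items were actually pulled from the source.
     */
    static <T> int emitWithTake(List<T> source, int limit, Listener<T> downstream) {
        if (limit <= 0) {
            downstream.onComplete();
            return 0;
        }
        int pulled = 0;
        for (T item : source) {
            pulled++;
            downstream.onNext(item);
            if (pulled == limit) {
                downstream.onComplete();
                return pulled; // stop here: the rest of the source is never touched
            }
        }
        downstream.onComplete(); // source ran out before the limit
        return pulled;
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        int pulled = emitWithTake(Arrays.asList(true, false, true), 1, new Listener<Boolean>() {
            @Override
            public void onNext(Boolean v) { log.add("next:" + v); }

            @Override
            public void onComplete() { log.add("complete"); }
        });
        log.add("pulled:" + pulled);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [next:true, complete, pulled:1]
    }
}
```

Only one of the three source items is delivered, and completion follows immediately, which is why no extra dispose call is needed in the completion callback.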