在处理时停止 rxJava 可观察链执行
Stop rxJava observable chain execution on disposing
在一个应用程序中调试 rxJava 网络调用时,我遇到了一种情况,如果我们 dispose
或 clear
通过订阅 observables
链返回的处理对象,那么只有第一个 observable
被 flatMap
.
处置,而不是其他链接 observables
看看下面的演示代码片段:
CompositeDisposable testCompositeDisposal = new CompositeDisposable();
private void testLoadData() {
Disposable disposable = Observable.create(sbr -> {
for (int i = 0; i < 5; i++) {
Thread.sleep(3000);
Log.w("Debug: ", "First: " + i);
sbr.onNext(true);
}
sbr.onComplete();
}).subscribeOn(Schedulers.io()).flatMap(value -> Observable.create(sbr -> {
for (int i = 0; i < 5; i++) {
Thread.sleep(3000);
Log.w("Debug: ", "Second: " + i);
sbr.onNext(true);
}
sbr.onComplete();
})).doOnNext(value -> {
Log.w("Debug: ", "doONNext");
}).doOnDispose(()-> {
Log.w("Debug: ", "doOnDispose: observable has been disposed");
}).subscribe();
testCompositeDisposal.add(disposable);
}
@Override
public void onStop() {
super.onStop();
testCompositeDisposal.clear();
}
输出:
W/Debug:: First: 0
W/Debug:: doOnDispose: observable has been disposed // I dispose Observable chain here.
W/Debug:: First: 1
W/Debug:: First: 2
W/Debug:: First: 3
W/Debug:: First: 4
正如您在上面的日志输出中看到的那样,当我处理给定的 rxJava 可观察链时,只有第一个可观察停止发射项目。
我想停止所有已链接的可观察对象。
解决这个问题的惯用方法是什么?
两件事:
flatMap
可以预先消耗上游的物品(android 最多 16 个);
- 第二个更适用于您的用例,在您调用
onNext
之前,您应该检查观察者是否已被处置(通过 .isDisposed()
)并在发生这种情况时中止。
此外,second flatMap
被终止(实际上它永远不会被调用)。 第一个 继续。
编辑
private void testLoadData() {
Disposable disposable = Observable.create(sbr -> {
for (int i = 0; i < 5; i++) {
if(sbr.isDisposed()) return; // this will cause subscription to terminate.
Thread.sleep(3000);
Log.w("Debug: ", "First: " + i);
sbr.onNext(true);
}
sbr.onComplete();
}).subscribeOn(Schedulers.io()).flatMap(value -> Observable.create(sbr -> {
for (int i = 0; i < 5; i++) {
Thread.sleep(3000);
Log.w("Debug: ", "Second: " + i);
sbr.onNext(true);
}
sbr.onComplete();
})).doOnNext(value -> {
Log.w("Debug: ", "doONNext");
}).doOnDispose(()-> {
Log.w("Debug: ", "doOnDispose: observable has been disposed");
}).subscribe();
testCompositeDisposal.add(disposable);
}
在一个应用程序中调试 rxJava 网络调用时,我遇到了一种情况,如果我们 dispose
或 clear
通过订阅 observables
链返回的处理对象,那么只有第一个 observable
被 flatMap
.
observables
看看下面的演示代码片段:
CompositeDisposable testCompositeDisposal = new CompositeDisposable();
private void testLoadData() {
Disposable disposable = Observable.create(sbr -> {
for (int i = 0; i < 5; i++) {
Thread.sleep(3000);
Log.w("Debug: ", "First: " + i);
sbr.onNext(true);
}
sbr.onComplete();
}).subscribeOn(Schedulers.io()).flatMap(value -> Observable.create(sbr -> {
for (int i = 0; i < 5; i++) {
Thread.sleep(3000);
Log.w("Debug: ", "Second: " + i);
sbr.onNext(true);
}
sbr.onComplete();
})).doOnNext(value -> {
Log.w("Debug: ", "doONNext");
}).doOnDispose(()-> {
Log.w("Debug: ", "doOnDispose: observable has been disposed");
}).subscribe();
testCompositeDisposal.add(disposable);
}
@Override
public void onStop() {
super.onStop();
testCompositeDisposal.clear();
}
输出:
W/Debug:: First: 0
W/Debug:: doOnDispose: observable has been disposed // I dispose Observable chain here.
W/Debug:: First: 1
W/Debug:: First: 2
W/Debug:: First: 3
W/Debug:: First: 4
正如您在上面的日志输出中看到的那样,当我处理给定的 rxJava 可观察链时,只有第一个可观察停止发射项目。
我想停止所有已链接的可观察对象。
解决这个问题的惯用方法是什么?
两件事:
flatMap
可以预先消耗上游的物品(android 最多 16 个);- 第二个更适用于您的用例,在您调用
onNext
之前,您应该检查观察者是否已被处置(通过.isDisposed()
)并在发生这种情况时中止。
此外,second flatMap
被终止(实际上它永远不会被调用)。 第一个 继续。
编辑
private void testLoadData() {
Disposable disposable = Observable.create(sbr -> {
for (int i = 0; i < 5; i++) {
if(sbr.isDisposed()) return; // this will cause subscription to terminate.
Thread.sleep(3000);
Log.w("Debug: ", "First: " + i);
sbr.onNext(true);
}
sbr.onComplete();
}).subscribeOn(Schedulers.io()).flatMap(value -> Observable.create(sbr -> {
for (int i = 0; i < 5; i++) {
Thread.sleep(3000);
Log.w("Debug: ", "Second: " + i);
sbr.onNext(true);
}
sbr.onComplete();
})).doOnNext(value -> {
Log.w("Debug: ", "doONNext");
}).doOnDispose(()-> {
Log.w("Debug: ", "doOnDispose: observable has been disposed");
}).subscribe();
testCompositeDisposal.add(disposable);
}