如果第二个立即失败,ReactiveX concat 不会从第一个可观察对象产生 onNext
ReactiveX concat doesn't produce onNext from first observable if second fails immediately
我连接两个可观察对象,首先显示缓存中的数据,然后开始从网络加载数据并显示更新后的数据。
Observable.concat(
getContentFromCache.subscribeOn(dbScheduler),
getContentFromNetwork.subscibeOn(networkScheduler)
).observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
如果没有网络连接,则在调用 OnSubscribe 后第二个 observable 会立即失败。
如果第二个 observable 立即失败,第一个 observable 的数据将丢失。永远不会在订阅者中调用 onNext 方法。
我认为,这可能是由于 OperatorConcat.ConcatSubscriber
中的以下代码所致
@Override
public void onNext(Observable<? extends T> t) {
queue.add(nl.next(t));
if (WIP_UPDATER.getAndIncrement(this) == 0) {
subscribeNext();
}
}
@Override
public void onError(Throwable e) {
child.onError(e);
unsubscribe();
}
看起来在收到错误后取消订阅,所有待处理的 onNext 都丢失了。
解决我的问题的最佳方法是什么?
更新
看起来我找到了解决方案,我没有为串联的可观察对象设置 observOn,而是为每个可观察对象设置了 observOn。
Observable.concat(
getContentFromCache.subscribeOn(dbScheduler).observeOn(AndroidSchedulers.mainThread()),
getContentFromNetwork.subscibeOn(networkScheduler).observeOn(AndroidSchedulers.mainThread())
)
.subscribe(subscriber);
RxJava 中的运算符通常设计用于短路 onError 通知。因为连接的可观察对象是异步源,所以您正在经历短路。如果你不想短路,那么你可以对物化的可观察对象进行连接,然后执行你想要的处理:
Observable.concat(
getContentFromCache.materialize().subscribeOn(dbScheduler),
getContentFromNetwork.materialize().subscribeOn(networkScheduler)
)
另一种方法是使用 onErrorResumeNext
:
Observable.concat(
getContentFromCache.subscribeOn(dbScheduler),
getContentFromNetwork.onErrorResumeNext(something)
.subscibeOn(networkScheduler)
)
observeOn
的默认行为是 onError
事件可以跳到队列前面,这里引用文档:
Note that onError
notifications will cut ahead of onNext
notifications
on the emission thread if Scheduler
is truly asynchronous.
这里有一个小测试来说明事情:
Scheduler newThreadScheduler = Schedulers.newThread();
Observable<Integer> stream = Observable.create(integerEmitter -> {
integerEmitter.onNext(1);
integerEmitter.onNext(2);
integerEmitter.onNext(3);
integerEmitter.onNext(4);
integerEmitter.onNext(5);
integerEmitter.onError(new RuntimeException());
}, Emitter.BackpressureMode.NONE);
TestSubscriber<Integer> subscriber = new TestSubscriber<>();
stream.subscribeOn(Schedulers.computation())
.observeOn(newThreadScheduler).subscribe(subscriber);
subscriber.awaitTerminalEvent();
subscriber.assertValues(1, 2, 3, 4, 5);
subscriber.assertError(RuntimeException.class);
通常消费者会期望以下顺序:1 > 2 > 3 > 4 > 5 > Error
。但是仅使用 observeOn
可能会导致错误,测试将失败。
此行为很久以前就在这里实施 https://github.com/ReactiveX/RxJava/issues/1680,请检查这样做的动机。为避免这种行为,可以使用带有 delayError
参数的重载 observeOn
:
indicates if the onError
notification may not cut ahead of onNext
notification on the other side of the scheduling boundary. If true
a sequence ending in onError
will be replayed in the same order as was received
from upstream
这是您通常所期望的,因此将 observeOn(newThreadScheduler)
更改为 observeOn(newThreadScheduler, true)
将修复测试。
然后是@Neil 的问题:为什么@Rostyslav 提出的解决方案有效?它正在运行,因为最终序列没有线程切换。
在建议的解决方案中,最终序列由同一线程上的两个序列制作而成:第一个序列是来自缓存的数据,第二个序列只是来自网络的错误。它们是在同一个线程上一起制作的,并且在没有线程切换之后 - 订阅者在 AndroidSchedulers.mainThread()
上观察。如果您尝试将 final Scheduler
更改为其他值,它将再次失败。
This is the small Example for concat both Observable
// First Observable //
private Observable<Book> getAutobiography()
{
String [] booknames= new String[]{"book1","book2"};
final ArrayList<Book> arrayList = new ArrayList<>();
for(String name:booknames){
Book book = new Book();
book.setBooktype("Autobiograph");
book.setBookname(name);
arrayList.add(book);
}
return Observable.create(new ObservableOnSubscribe<Book>() {
@Override
public void subscribe(ObservableEmitter<Book> emitter) throws Exception {
for(Book book:arrayList)
{
emitter.onNext(book);
}
if (!emitter.isDisposed()) {
emitter.onComplete();
}
}
}).subscribeOn(Schedulers.io());
}
// Second Observable
private Observable<Book> getProgrammingBooks()
{
String [] booknames= new String[]{"book3","book4"};
final ArrayList<Book> arrayList = new ArrayList<>();
for(String name:booknames){
Book book = new Book();
book.setBooktype("Programming");
book.setBookname(name);
arrayList.add(book);
}
return Observable.create(new ObservableOnSubscribe<Book>() {
@Override
public void subscribe(ObservableEmitter<Book> emitter) throws Exception {
for(Book book:arrayList)
{
emitter.onNext(book);
}
if (!emitter.isDisposed()) {
emitter.onComplete();
}
}
}).subscribeOn(Schedulers.io());
}
// Concat them
Observable.concat(getProgrammingBooks(),getAutobiography()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Book>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Book book) {
Log.d("bookname",""+book.getBookname());
Log.d("booktype",""+book.getBooktype());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
// it print both values //
我连接两个可观察对象,首先显示缓存中的数据,然后开始从网络加载数据并显示更新后的数据。
Observable.concat(
getContentFromCache.subscribeOn(dbScheduler),
getContentFromNetwork.subscibeOn(networkScheduler)
).observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
如果没有网络连接,则在调用 OnSubscribe 后第二个 observable 会立即失败。
如果第二个 observable 立即失败,第一个 observable 的数据将丢失。永远不会在订阅者中调用 onNext 方法。
我认为,这可能是由于 OperatorConcat.ConcatSubscriber
中的以下代码所致 @Override
public void onNext(Observable<? extends T> t) {
queue.add(nl.next(t));
if (WIP_UPDATER.getAndIncrement(this) == 0) {
subscribeNext();
}
}
@Override
public void onError(Throwable e) {
child.onError(e);
unsubscribe();
}
看起来在收到错误后取消订阅,所有待处理的 onNext 都丢失了。
解决我的问题的最佳方法是什么?
更新
看起来我找到了解决方案,我没有为串联的可观察对象设置 observOn,而是为每个可观察对象设置了 observOn。
Observable.concat(
getContentFromCache.subscribeOn(dbScheduler).observeOn(AndroidSchedulers.mainThread()),
getContentFromNetwork.subscibeOn(networkScheduler).observeOn(AndroidSchedulers.mainThread())
)
.subscribe(subscriber);
RxJava 中的运算符通常设计用于短路 onError 通知。因为连接的可观察对象是异步源,所以您正在经历短路。如果你不想短路,那么你可以对物化的可观察对象进行连接,然后执行你想要的处理:
Observable.concat(
getContentFromCache.materialize().subscribeOn(dbScheduler),
getContentFromNetwork.materialize().subscribeOn(networkScheduler)
)
另一种方法是使用 onErrorResumeNext
:
Observable.concat(
getContentFromCache.subscribeOn(dbScheduler),
getContentFromNetwork.onErrorResumeNext(something)
.subscibeOn(networkScheduler)
)
observeOn
的默认行为是 onError
事件可以跳到队列前面,这里引用文档:
Note that
onError
notifications will cut ahead ofonNext
notifications on the emission thread ifScheduler
is truly asynchronous.
这里有一个小测试来说明事情:
Scheduler newThreadScheduler = Schedulers.newThread();
Observable<Integer> stream = Observable.create(integerEmitter -> {
integerEmitter.onNext(1);
integerEmitter.onNext(2);
integerEmitter.onNext(3);
integerEmitter.onNext(4);
integerEmitter.onNext(5);
integerEmitter.onError(new RuntimeException());
}, Emitter.BackpressureMode.NONE);
TestSubscriber<Integer> subscriber = new TestSubscriber<>();
stream.subscribeOn(Schedulers.computation())
.observeOn(newThreadScheduler).subscribe(subscriber);
subscriber.awaitTerminalEvent();
subscriber.assertValues(1, 2, 3, 4, 5);
subscriber.assertError(RuntimeException.class);
通常消费者会期望以下顺序:1 > 2 > 3 > 4 > 5 > Error
。但是仅使用 observeOn
可能会导致错误,测试将失败。
此行为很久以前就在这里实施 https://github.com/ReactiveX/RxJava/issues/1680,请检查这样做的动机。为避免这种行为,可以使用带有 delayError
参数的重载 observeOn
:
indicates if the
onError
notification may not cut ahead ofonNext
notification on the other side of the scheduling boundary. Iftrue
a sequence ending inonError
will be replayed in the same order as was received from upstream
这是您通常所期望的,因此将 observeOn(newThreadScheduler)
更改为 observeOn(newThreadScheduler, true)
将修复测试。
然后是@Neil 的问题:为什么@Rostyslav 提出的解决方案有效?它正在运行,因为最终序列没有线程切换。
在建议的解决方案中,最终序列由同一线程上的两个序列制作而成:第一个序列是来自缓存的数据,第二个序列只是来自网络的错误。它们是在同一个线程上一起制作的,并且在没有线程切换之后 - 订阅者在 AndroidSchedulers.mainThread()
上观察。如果您尝试将 final Scheduler
更改为其他值,它将再次失败。
This is the small Example for concat both Observable
// First Observable //
private Observable<Book> getAutobiography()
{
String [] booknames= new String[]{"book1","book2"};
final ArrayList<Book> arrayList = new ArrayList<>();
for(String name:booknames){
Book book = new Book();
book.setBooktype("Autobiograph");
book.setBookname(name);
arrayList.add(book);
}
return Observable.create(new ObservableOnSubscribe<Book>() {
@Override
public void subscribe(ObservableEmitter<Book> emitter) throws Exception {
for(Book book:arrayList)
{
emitter.onNext(book);
}
if (!emitter.isDisposed()) {
emitter.onComplete();
}
}
}).subscribeOn(Schedulers.io());
}
// Second Observable
private Observable<Book> getProgrammingBooks()
{
String [] booknames= new String[]{"book3","book4"};
final ArrayList<Book> arrayList = new ArrayList<>();
for(String name:booknames){
Book book = new Book();
book.setBooktype("Programming");
book.setBookname(name);
arrayList.add(book);
}
return Observable.create(new ObservableOnSubscribe<Book>() {
@Override
public void subscribe(ObservableEmitter<Book> emitter) throws Exception {
for(Book book:arrayList)
{
emitter.onNext(book);
}
if (!emitter.isDisposed()) {
emitter.onComplete();
}
}
}).subscribeOn(Schedulers.io());
}
// Concat them
Observable.concat(getProgrammingBooks(),getAutobiography()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Book>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Book book) {
Log.d("bookname",""+book.getBookname());
Log.d("booktype",""+book.getBooktype());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
// it print both values //