RxJava2 materialize() 在出错后不发出下一个项目

RxJava2 materialize() doesn't emit next item after an error

我一直在尝试将我的 onErrors 转换为通知以保持流发射项目。据我了解, materialize() 运算符就是这样做的。所以基本上:

materialize() / dematerialize() are available to turn terminal events into Notification

所以我根据这个问题做了一个测试()。我尝试了以下方法:

 @Test
public void materializeTest() {
    final Observable<String> stringObservable = Observable.fromArray("1", "2", "3")
            .flatMap(x -> {
                if (x.equals("2")) {
                    return Observable.error(new NullPointerException());
                }

                return Observable.just(x);
            })
            .materialize()
            .map(n -> n.getValue());

    final TestObserver<String> testObs = stringObservable.test();
    Java6Assertions.assertThat(testObs.values().size()).isEqualTo(2);

    testObs.assertValueAt(0, "1");
    testObs.assertValueAt(1, "3");
}

结果是在“2”给出错误后不再发射任何项目。我也试过扭曲我自己的通知对象 (MyNotification<T>) 并做类似的事情:

stringObs
  .map(string -> MyNotification.success(string)
  .onErrorReturn(error -> MyNotification.error())

但最终结果始终相同:在“2”之后不再发出任何项目。我 100% 做错了什么,但无法真正理解是什么。

对于 flatMap,如果其中一个内部 Observable 失败,序列将终止,并且不会从上游转换更多项目。 materialize() 之前就已经发生了。

因此,与其尝试实现合并流,不如单独实现内部源:

Observable.fromArray("1", "2", "3")
        .flatMap(x -> {
            if (x.equals("2")) {
                return Observable.<String>error(new NullPointerException())
                                 .materialize();
            }

            return Observable.just(x)
                             .materialize();
        })
        .filter(n -> n.isOnNext())
        .map(n -> n.getValue());