为什么 flatMap 在 RxJava 中用 merge 实现?

Why flatMap is implemented with merge in RxJava?

为什么 RxJava 1.x flatMap() 操作符是用 merge 实现的?

public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
        if (getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
        }
        return merge(map(func));
    }

从 flatMap() 调用我可以 return 只有一个符合 <? extends Observable<? extends R>> 的 Observable。然后 map(func) 调用将它包装到另一个 Observable 中,所以我们有这样的东西 Observable<? extends Observable<? extends R>>。这让我觉得在 map(func) 之后调用 merge() 是不必要的。

据说 merge() 运算符执行以下操作:

Flattens an Observable that emits Observables into a single Observable that emits the items emitted by those Observables, without any transformation.

现在在平面地图中我们只能有一个发出一个 Observable 的 Observable。为什么合并?我在这里错过了什么?

谢谢。

Now inside the flat map we can only have an Observable that emits one Observable.

否 - 您有一个可观察项发出一个可观察项每个源可观察项。因此,如果您的源可观察对象有更多项,您将发出多个可观察对象。

这就是为什么你需要 merge()

查看签名可能会有帮助:

想象一下,对于 Observable<String>,您想 flatMap 到各个字符。使用 flatMap 的方法是:

Observable.just("foo", "hello")
          .flatMap(s -> Observable.from(s.split("")))

这个 Observable 的类型是什么?这是一个 Observable<String>.

现在不再使用 flatMap,而是使用具有相同功能的 map 。什么类型的?

Observable.just("foo", "hello")
          .map(s -> Observable.from(s.split("")))

你会发现它实际上是 Observable<Observable<String>>... 如果我们订阅这个 observable 并打印出发出的项目,我们会得到:

rx.Observable@5b275dab
rx.Observable@61832929

不是很有用。更糟糕的是,这些 Observable 还没有被订阅,所以它们不会发出任何数据:(

我们看到 flatMap 的目标是让函数为每个源项生成一个内部 Observable<T>,然后订阅这些内部 observable 并在输出中将它们的发射压平 Observable<T>merge 就是这样做的!

为了验证这一点,将上面的地图结果包装在 Observable.merge(...):

Observable<Observable<String>> mapped = 
    Observable.just("foo", "hello")
              .map(s -> Observable.from(s.split("")));

Observable.merge(mapped)
          .subscribe(System.out::println);

这输出:

f
o
o
h
e
l
l
o