为什么 flatMap 在 RxJava 中用 merge 实现?
Why flatMap is implemented with merge in RxJava?
为什么 RxJava 1.x flatMap() 操作符是用 merge 实现的?
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
if (getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
}
return merge(map(func));
}
从 flatMap() 调用我可以 return 只有一个符合 <? extends Observable<? extends R>>
的 Observable。然后 map(func) 调用将它包装到另一个 Observable 中,所以我们有这样的东西 Observable<? extends Observable<? extends R>>
。这让我觉得在 map(func) 之后调用 merge() 是不必要的。
据说 merge() 运算符执行以下操作:
Flattens an Observable that emits Observables into a single Observable
that emits the items emitted by those Observables, without any
transformation.
现在在平面地图中我们只能有一个发出一个 Observable 的 Observable。为什么合并?我在这里错过了什么?
谢谢。
Now inside the flat map we can only have an Observable that emits one Observable.
否 - 您有一个可观察项发出一个可观察项每个源可观察项。因此,如果您的源可观察对象有更多项,您将发出多个可观察对象。
这就是为什么你需要 merge()
。
查看签名可能会有帮助:
想象一下,对于 Observable<String>
,您想 flatMap
到各个字符。使用 flatMap 的方法是:
Observable.just("foo", "hello")
.flatMap(s -> Observable.from(s.split("")))
这个 Observable 的类型是什么?这是一个 Observable<String>
.
现在不再使用 flatMap
,而是使用具有相同功能的 map
。什么类型的?
Observable.just("foo", "hello")
.map(s -> Observable.from(s.split("")))
你会发现它实际上是 Observable<Observable<String>>
...
如果我们订阅这个 observable 并打印出发出的项目,我们会得到:
rx.Observable@5b275dab
rx.Observable@61832929
不是很有用。更糟糕的是,这些 Observable
还没有被订阅,所以它们不会发出任何数据:(
我们看到 flatMap
的目标是让函数为每个源项生成一个内部 Observable<T>
,然后订阅这些内部 observable 并在输出中将它们的发射压平 Observable<T>
。 merge
就是这样做的!
为了验证这一点,将上面的地图结果包装在 Observable.merge(...)
:
Observable<Observable<String>> mapped =
Observable.just("foo", "hello")
.map(s -> Observable.from(s.split("")));
Observable.merge(mapped)
.subscribe(System.out::println);
这输出:
f
o
o
h
e
l
l
o
为什么 RxJava 1.x flatMap() 操作符是用 merge 实现的?
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
if (getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
}
return merge(map(func));
}
从 flatMap() 调用我可以 return 只有一个符合 <? extends Observable<? extends R>>
的 Observable。然后 map(func) 调用将它包装到另一个 Observable 中,所以我们有这样的东西 Observable<? extends Observable<? extends R>>
。这让我觉得在 map(func) 之后调用 merge() 是不必要的。
据说 merge() 运算符执行以下操作:
Flattens an Observable that emits Observables into a single Observable that emits the items emitted by those Observables, without any transformation.
现在在平面地图中我们只能有一个发出一个 Observable 的 Observable。为什么合并?我在这里错过了什么?
谢谢。
Now inside the flat map we can only have an Observable that emits one Observable.
否 - 您有一个可观察项发出一个可观察项每个源可观察项。因此,如果您的源可观察对象有更多项,您将发出多个可观察对象。
这就是为什么你需要 merge()
。
查看签名可能会有帮助:
想象一下,对于 Observable<String>
,您想 flatMap
到各个字符。使用 flatMap 的方法是:
Observable.just("foo", "hello")
.flatMap(s -> Observable.from(s.split("")))
这个 Observable 的类型是什么?这是一个 Observable<String>
.
现在不再使用 flatMap
,而是使用具有相同功能的 map
。什么类型的?
Observable.just("foo", "hello")
.map(s -> Observable.from(s.split("")))
你会发现它实际上是 Observable<Observable<String>>
...
如果我们订阅这个 observable 并打印出发出的项目,我们会得到:
rx.Observable@5b275dab
rx.Observable@61832929
不是很有用。更糟糕的是,这些 Observable
还没有被订阅,所以它们不会发出任何数据:(
我们看到 flatMap
的目标是让函数为每个源项生成一个内部 Observable<T>
,然后订阅这些内部 observable 并在输出中将它们的发射压平 Observable<T>
。 merge
就是这样做的!
为了验证这一点,将上面的地图结果包装在 Observable.merge(...)
:
Observable<Observable<String>> mapped =
Observable.just("foo", "hello")
.map(s -> Observable.from(s.split("")));
Observable.merge(mapped)
.subscribe(System.out::println);
这输出:
f
o
o
h
e
l
l
o