使用 RxJava 分叉任务并合并结果
Using RxJava to fork into tasks and combine results
我正在尝试执行以下操作:
A
|
|
V
Observable<B>
/\
/ \
/ \
V V
Observable<C> Observable<D>
\ /
\ /
V V
Observable<E>
- 给定输入 [A],异步调用 returns [B]。
- 每个需要[B]的两个任务需要运行并行return[C]和[D]。
- 两个结果合并成[E],然后显示在UI。
我是RxJava的新手,遇到过zip、merge等,但不太明白这类问题需要什么运算符。任何帮助将不胜感激。
PS。
1) 虽然 [C] 和 [D] 都是必需的,但 [E] 仍然可以只用其中之一创建。因此,如果其中一个(或两个)失败,此时最好有一个超时。
2) 是否可以在特定线程中使用它们 运行 - 一个在 computation() 中,另一个在 io() 中?
这是我目前的概念代码。我这样做是线性的:
A -> B -> C -> D -> E
return a2b(a)
.subscribeOn(Schedulers.io())
.flatMap(this::b2c)
.subscribeOn(Schedulers.computation())
.map(this::c2d)
.map(this::d2e)
.cast(E.class)
.startWith(e -> new E.loadingState());
理想情况下,我应该在某处使用以下函数:
Observable<E> cd2e(C c, D d) {
return Observable.just(new E());
}
谢谢。
publish()
运算符以允许多个订阅的方式绑定单个可观察对象。
return a2b(a)
.subscribeOn(Schedulers.io())
.publish( bObservable ->
Observable.zip( bObservable.map( b -> this::b2c ),
bObservable.map( b -> this::b2d ),
(c, d) -> combine( c, d ) )
.subscribe( ... );
运营商绑定观察者链,可以进行多次订阅;在这种情况下,订阅被压缩在一起,将 C
和 D
类型组合成组合的 E
类型。
然后您可以自由添加 observeOn()
运算符以在您想要的线程上完成计算。
我正在尝试执行以下操作:
A
|
|
V
Observable<B>
/\
/ \
/ \
V V
Observable<C> Observable<D>
\ /
\ /
V V
Observable<E>
- 给定输入 [A],异步调用 returns [B]。
- 每个需要[B]的两个任务需要运行并行return[C]和[D]。
- 两个结果合并成[E],然后显示在UI。
我是RxJava的新手,遇到过zip、merge等,但不太明白这类问题需要什么运算符。任何帮助将不胜感激。
PS。
1) 虽然 [C] 和 [D] 都是必需的,但 [E] 仍然可以只用其中之一创建。因此,如果其中一个(或两个)失败,此时最好有一个超时。
2) 是否可以在特定线程中使用它们 运行 - 一个在 computation() 中,另一个在 io() 中?
这是我目前的概念代码。我这样做是线性的:
A -> B -> C -> D -> E
return a2b(a)
.subscribeOn(Schedulers.io())
.flatMap(this::b2c)
.subscribeOn(Schedulers.computation())
.map(this::c2d)
.map(this::d2e)
.cast(E.class)
.startWith(e -> new E.loadingState());
理想情况下,我应该在某处使用以下函数:
Observable<E> cd2e(C c, D d) {
return Observable.just(new E());
}
谢谢。
publish()
运算符以允许多个订阅的方式绑定单个可观察对象。
return a2b(a)
.subscribeOn(Schedulers.io())
.publish( bObservable ->
Observable.zip( bObservable.map( b -> this::b2c ),
bObservable.map( b -> this::b2d ),
(c, d) -> combine( c, d ) )
.subscribe( ... );
运营商绑定观察者链,可以进行多次订阅;在这种情况下,订阅被压缩在一起,将 C
和 D
类型组合成组合的 E
类型。
然后您可以自由添加 observeOn()
运算符以在您想要的线程上完成计算。