如何为可观察对象的条件链接实现这种通用模式
How to implement this common pattern for conditional chaining of observables
public Observable<List<A>> buildObservable(final String bId) {
final Observable<List<B>> bObs = bRepository.getBs(bId);
return Observable.create(new ObservableOnSubscribe<List<A>>() {
@Override
public void subscribe(final ObservableEmitter<List<A>> subscriber) throws Exception {
bObs.subscribe(new DefaultObserver<List<B>>() {
@Override
public void onNext(final List<B> l_B) {
if (l_B.isEmpty()) {
subscriber.onNext(new LinkedList<A>());
subscriber.onComplete();
} else {
final List<A> l_A = new LinkedList<A>();
for (B b : l_B) {
Observable<A> aObs = aRepository.getbyB(b);
aObs.subscribe(new DefaultObserver<A>() {
@Override
public void onNext(A a) {
l_A.add(a);
if (l_A.size() == l_B.size()) {
subscriber.onNext(l_A);
subscriber.onComplete();
}
}
});
}
}
}
});
}
});
}
我发现自己不得不一遍又一遍地实施上述模式。
我有一个带有订阅者的可观察 bObs,它触发一组以 bObs 的值为条件的可观察量。在它们都 return 之后,由函数编辑的主要观察者 return 发出一个值。
我觉得应该有更简单的方法来做到这一点,因为这种模式可能很常见。是否有某种复合运算符可以帮助我获取 aObs 和 bObs 并构建顶级可观察对象?
根据经验,尽可能避免使用 .create()
自己创建可观察对象,因为几乎总有更好更短的错误证明标准方法,例如.just()
.empty()
.from()
等等
其次,你是完全正确的,在 Rx 中使用命令式流控制如 if
/for
是很奇怪的,因为 Rx 是建立在 CPS 之上的(Continuation-Passing Style) 本身就是 流量控制的概念。
所以在你的情况下它会是这样的:
bObs.flatMap((b) -> { b.aRepository.getbyB(b).defaultIfEmpty(new ArrayList()); } ).toList()
采用上面 Maxim 的建议,我找到了一个令我满意的解决方案:
return bRepository
.getBs(bId)
.defaultIfEmpty(new ArrayList<B>())
.flatMap(Observable::from)
.flatMap((b)->{aRepository.getbyB(b);})
.toList()
.toObservable();
public Observable<List<A>> buildObservable(final String bId) {
final Observable<List<B>> bObs = bRepository.getBs(bId);
return Observable.create(new ObservableOnSubscribe<List<A>>() {
@Override
public void subscribe(final ObservableEmitter<List<A>> subscriber) throws Exception {
bObs.subscribe(new DefaultObserver<List<B>>() {
@Override
public void onNext(final List<B> l_B) {
if (l_B.isEmpty()) {
subscriber.onNext(new LinkedList<A>());
subscriber.onComplete();
} else {
final List<A> l_A = new LinkedList<A>();
for (B b : l_B) {
Observable<A> aObs = aRepository.getbyB(b);
aObs.subscribe(new DefaultObserver<A>() {
@Override
public void onNext(A a) {
l_A.add(a);
if (l_A.size() == l_B.size()) {
subscriber.onNext(l_A);
subscriber.onComplete();
}
}
});
}
}
}
});
}
});
}
我发现自己不得不一遍又一遍地实施上述模式。 我有一个带有订阅者的可观察 bObs,它触发一组以 bObs 的值为条件的可观察量。在它们都 return 之后,由函数编辑的主要观察者 return 发出一个值。
我觉得应该有更简单的方法来做到这一点,因为这种模式可能很常见。是否有某种复合运算符可以帮助我获取 aObs 和 bObs 并构建顶级可观察对象?
根据经验,尽可能避免使用 .create()
自己创建可观察对象,因为几乎总有更好更短的错误证明标准方法,例如.just()
.empty()
.from()
等等
其次,你是完全正确的,在 Rx 中使用命令式流控制如 if
/for
是很奇怪的,因为 Rx 是建立在 CPS 之上的(Continuation-Passing Style) 本身就是 流量控制的概念。
所以在你的情况下它会是这样的:
bObs.flatMap((b) -> { b.aRepository.getbyB(b).defaultIfEmpty(new ArrayList()); } ).toList()
采用上面 Maxim 的建议,我找到了一个令我满意的解决方案:
return bRepository
.getBs(bId)
.defaultIfEmpty(new ArrayList<B>())
.flatMap(Observable::from)
.flatMap((b)->{aRepository.getbyB(b);})
.toList()
.toObservable();