动态组合多个 Retrofit Observable
Combine multiple Retrofit Observable dynamically
我有一个这样的 Observable 列表:
List<Observable<MyObj>> listObservables = new ArrayList<Observable<MyObj>>();
我想将所有 Observable 组合成一个,如果我知道使用 zip()
的 Observable 的数量,我可以处理它,例如我们有 3 个 Observable:
Observable<MyObj1> obs1= MyRestClient.getSomeData1();
Observable<MyObj2> obs2= MyRestClient.getSomeData2();
Observable<MyObj3> obs3= MyRestClient.getSomeData3();
我有一个包装对象:
class MyWrapperObj {
private MyObj1 onj1;
private MyObj2 onj2;
private MyObj3 onj3;
public MyWrapperObj(MyObj1 onj1, MyObj2 onj2, MyObj3 onj3) {
this.onj1 = onj1;
this.onj2 = onj2;
this.onj3 = onj3;
}
}
所以我可以这样组合它们:
Observable<MyWrapperObj> combinedObservable = Observable.zip(obs1, obs2, obs3, new Func3<MyObj1, MyObj2, MyObj3, MyWrapperObj>() {
@Override
public MyWrapperObj call(MyObj1 obj1, MyObj2 obj2, MyObj3 obj3) {
return new MyWrapperObj(obj1, obj2, obj3);
}
});
combinedObservable.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(new Subscriber<MyWrapperObj>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(MyWrapperObj wrapperObj) {
}
});
一切正常,所以我的问题是如何为 n 个 Observable 组织这种组合?
响应
正如@maciekjanusz 在我的回答中提到的那样:
Observable<MyWrapperObj> combinedObservable = Observable.zip(listObservables, new FuncN<MyWrapperObj>() {
@Override
public MyWrapperObjcall(Object... args) {
return null;
}
});
您可以等待所有可观察对象完成,方法是使用
.zip(observable1, ..., observableN, funcN).first() 运算符。有一个重载,接受 Observable> 参数(如在 FlatMap 中)。
第一个重载采用 Iterable> - 您可以传递任意大小的可观察对象列表,第二个参数 - FuncN - 接收值列表。
假设您有列表:
List<Observable<MyObj>> listObservables
您可以考虑使用 Observable.concatDelayError
如果它正在完成所有 Obbservable
的优势,即使它们中的任何一个以错误完成(在这种情况下导致错误)。
请记住,此序列中的每个 Observable
必须 return onNext
方法 Subscriber
的结果。结果也必须具有相同的类型。
示例:
Observable.concatDelayError(listObservables);
如果你想 zip
n Observables,将它们放在一个列表中并应用 public static <R> Observable<R> zip(@NotNull java.lang.Iterable<? extends Observable<?>> ws, rx.functions.FuncN<? extends R> zipFunction)
工厂方法。
List<Observable<String>> observables = Arrays.asList(Observable.just("String 1"), Observable.just("String 2"));
Observable.zip(observables, args -> {
// put your zipping code here
});
例如,如果您想为所有可观察对象的每个发射创建一个字符串列表:
Observable.zip(observables, Arrays::asList);
或者,如果在 android 上使用没有 retrolambda 的 RxJava:
Observable.zip(observables, args -> Arrays.asList(args));
我有一个这样的 Observable 列表:
List<Observable<MyObj>> listObservables = new ArrayList<Observable<MyObj>>();
我想将所有 Observable 组合成一个,如果我知道使用 zip()
的 Observable 的数量,我可以处理它,例如我们有 3 个 Observable:
Observable<MyObj1> obs1= MyRestClient.getSomeData1();
Observable<MyObj2> obs2= MyRestClient.getSomeData2();
Observable<MyObj3> obs3= MyRestClient.getSomeData3();
我有一个包装对象:
class MyWrapperObj {
private MyObj1 onj1;
private MyObj2 onj2;
private MyObj3 onj3;
public MyWrapperObj(MyObj1 onj1, MyObj2 onj2, MyObj3 onj3) {
this.onj1 = onj1;
this.onj2 = onj2;
this.onj3 = onj3;
}
}
所以我可以这样组合它们:
Observable<MyWrapperObj> combinedObservable = Observable.zip(obs1, obs2, obs3, new Func3<MyObj1, MyObj2, MyObj3, MyWrapperObj>() {
@Override
public MyWrapperObj call(MyObj1 obj1, MyObj2 obj2, MyObj3 obj3) {
return new MyWrapperObj(obj1, obj2, obj3);
}
});
combinedObservable.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(new Subscriber<MyWrapperObj>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(MyWrapperObj wrapperObj) {
}
});
一切正常,所以我的问题是如何为 n 个 Observable 组织这种组合?
响应
正如@maciekjanusz 在我的回答中提到的那样:
Observable<MyWrapperObj> combinedObservable = Observable.zip(listObservables, new FuncN<MyWrapperObj>() {
@Override
public MyWrapperObjcall(Object... args) {
return null;
}
});
您可以等待所有可观察对象完成,方法是使用 .zip(observable1, ..., observableN, funcN).first() 运算符。有一个重载,接受 Observable> 参数(如在 FlatMap 中)。
第一个重载采用 Iterable> - 您可以传递任意大小的可观察对象列表,第二个参数 - FuncN - 接收值列表。
假设您有列表:
List<Observable<MyObj>> listObservables
您可以考虑使用 Observable.concatDelayError
如果它正在完成所有 Obbservable
的优势,即使它们中的任何一个以错误完成(在这种情况下导致错误)。
请记住,此序列中的每个 Observable
必须 return onNext
方法 Subscriber
的结果。结果也必须具有相同的类型。
示例:
Observable.concatDelayError(listObservables);
如果你想 zip
n Observables,将它们放在一个列表中并应用 public static <R> Observable<R> zip(@NotNull java.lang.Iterable<? extends Observable<?>> ws, rx.functions.FuncN<? extends R> zipFunction)
工厂方法。
List<Observable<String>> observables = Arrays.asList(Observable.just("String 1"), Observable.just("String 2"));
Observable.zip(observables, args -> {
// put your zipping code here
});
例如,如果您想为所有可观察对象的每个发射创建一个字符串列表:
Observable.zip(observables, Arrays::asList);
或者,如果在 android 上使用没有 retrolambda 的 RxJava:
Observable.zip(observables, args -> Arrays.asList(args));