动态组合多个 Retrofit Observable

Combine multiple Retrofit Observable dynamically

我有一个这样的 Observable 列表:

 List<Observable<MyObj>> listObservables = new ArrayList<Observable<MyObj>>();

我想将所有 Observable 组合成一个,如果我知道使用 zip() 的 Observable 的数量,我可以处理它,例如我们有 3 个 Observable:

 Observable<MyObj1> obs1= MyRestClient.getSomeData1();
 Observable<MyObj2> obs2= MyRestClient.getSomeData2();
 Observable<MyObj3> obs3= MyRestClient.getSomeData3();

我有一个包装对象:

class MyWrapperObj {
    private MyObj1 onj1;
    private MyObj2 onj2;
    private MyObj3 onj3;

    public MyWrapperObj(MyObj1 onj1, MyObj2 onj2, MyObj3 onj3) {
        this.onj1 = onj1;
        this.onj2 = onj2;
        this.onj3 = onj3;
    }
}

所以我可以这样组合它们:

 Observable<MyWrapperObj> combinedObservable = Observable.zip(obs1, obs2, obs3, new Func3<MyObj1, MyObj2, MyObj3, MyWrapperObj>() {
        @Override
        public MyWrapperObj call(MyObj1 obj1, MyObj2 obj2, MyObj3 obj3) {
            return new MyWrapperObj(obj1, obj2, obj3);
        }
    });

    combinedObservable.observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .subscribe(new Subscriber<MyWrapperObj>() {
                @Override
                public void onCompleted() {                       
                }

                @Override
                public void onError(Throwable throwable) {
                }

                @Override
                public void onNext(MyWrapperObj wrapperObj) {
                }
            });

一切正常,所以我的问题是如何为 n 个 Observable 组织这种组合?

响应

正如@maciekjanusz 在我的回答中提到的那样:

  Observable<MyWrapperObj> combinedObservable = Observable.zip(listObservables, new FuncN<MyWrapperObj>() {
        @Override
        public MyWrapperObjcall(Object... args) {
            return null;
        }
    });

您可以等待所有可观察对象完成,方法是使用 .zip(observable1, ..., observableN, funcN).first() 运算符。有一个重载,接受 Observable> 参数(如在 FlatMap 中)。

第一个重载采用 Iterable> - 您可以传递任意大小的可观察对象列表,第二个参数 - FuncN - 接收值列表。

假设您有列表:

List<Observable<MyObj>> listObservables

您可以考虑使用 Observable.concatDelayError

如果它正在完成所有 Obbservable 的优势,即使它们中的任何一个以错误完成(在这种情况下导致错误)。

请记住,此序列中的每个 Observable 必须 return onNext 方法 Subscriber 的结果。结果也必须具有相同的类型。

示例:

Observable.concatDelayError(listObservables);

如果你想 zip n Observables,将它们放在一个列表中并应用 public static <R> Observable<R> zip(@NotNull java.lang.Iterable<? extends Observable<?>> ws, rx.functions.FuncN<? extends R> zipFunction) 工厂方法。

List<Observable<String>> observables = Arrays.asList(Observable.just("String 1"), Observable.just("String 2"));

Observable.zip(observables, args -> {
    // put your zipping code here
});

例如,如果您想为所有可观察对象的每个发射创建一个字符串列表:

Observable.zip(observables, Arrays::asList);

或者,如果在 android 上使用没有 retrolambda 的 RxJava:

Observable.zip(observables, args -> Arrays.asList(args));