RxJava 对可变长度的 Retrofit Observable 数组进行 zip 操作

RxJava zip operation on a varying length of Retrofit Observable array

我有不同长度的 Observable 数组。我想压缩请求(即发出一堆 API 请求并等待所有请求完成),但我不知道如何实现压缩功能。

Observable.zip(observables, new FuncN<List<ResponseBody>>() {
    @Override
    public List<ResponseBody> call(Object... args) {
        return Arrays.asList(args); <- compile error here
    }
});

这里的 obserables 是一个 List<Observable<ResponseBody>> 的数组,它的长度是先验未知的。

调用zip函数的参数无法更正为ResponseBody...。如何做到returnObservable<List<ResponseBody>>

FuncNRxJava1.x.x设计上的约束吗?

P.S。我正在使用 RxJava 1.1.6.

只需 merge 您的观察值并使用 toList:

收集结果
Observable.merge(observables).toList()

我确认 merge 运算符有效并找到导致订单的罪魁祸首:1st request, 1st response, 2nd request, 2nd response, 3rd request, 3rd response.

observables 列表中的可观察对象在添加期间未在 Scheduler.io() 上订阅。

之前:

observables.add(mViewModelDelegate.get().getApiService()
                    .rxGetCategoryBrandList(baseUrl, categoryId));

之后:

observables.add(mViewModelDelegate.get().getApiService()
                    .rxGetCategoryBrandList(baseUrl, categoryId)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());

用户合并并确保所有可观察的请求都有自己的线程。 你可以试试这个:

private void runMyTest() {
    List<Single<String>> singleObservableList = new ArrayList<>();
    singleObservableList.add(getSingleObservable(500, "AAA"));
    singleObservableList.add(getSingleObservable(300, "BBB"));
    singleObservableList.add(getSingleObservable(100, "CCC"));
    Single.merge(singleObservableList)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(System.out::println);
}

private Single<String> getSingleObservable(long waitMilliSeconds, String name) {
    return Single
            .create((SingleOnSubscribe<String>) e -> {
                    try {
                        Thread.sleep(waitMilliSeconds);
                    } catch (InterruptedException exception) {
                        exception.printStackTrace();
                    }
                    System.out.println("name = " +name+ ", waitMilliSeconds = " +waitMilliSeconds+ ", thread name = " +Thread.currentThread().getName()+ ", id =" +Thread.currentThread().getId());
                    if(!e.isDisposed()) e.onSuccess(name);
                })
            .subscribeOn(Schedulers.io());
}

输出:

System.out: name = CCC, waitMilliSeconds = 100, thread name = RxCachedThreadScheduler-4, id =463

System.out: CCC

System.out: name = BBB, waitMilliSeconds = 300, thread name = RxCachedThreadScheduler-3, id =462

System.out: BBB

System.out: name = AAA, waitMilliSeconds = 500, thread name = RxCachedThreadScheduler-2, id =461

System.out: AAA