RxJava 对可变长度的 Retrofit Observable 数组进行 zip 操作
RxJava zip operation on a varying length of Retrofit Observable array
我有不同长度的 Observable 数组。我想压缩请求(即发出一堆 API 请求并等待所有请求完成),但我不知道如何实现压缩功能。
Observable.zip(observables, new FuncN<List<ResponseBody>>() {
@Override
public List<ResponseBody> call(Object... args) {
return Arrays.asList(args); <- compile error here
}
});
这里的 obserables
是一个 List<Observable<ResponseBody>>
的数组,它的长度是先验未知的。
调用zip函数的参数无法更正为ResponseBody...
。如何做到returnObservable<List<ResponseBody>>
?
是FuncN
RxJava1.x.x设计上的约束吗?
P.S。我正在使用 RxJava 1.1.6.
只需 merge 您的观察值并使用 toList
:
收集结果
Observable.merge(observables).toList()
我确认 merge
运算符有效并找到导致订单的罪魁祸首:1st request, 1st response, 2nd request, 2nd response, 3rd request, 3rd response
.
observables
列表中的可观察对象在添加期间未在 Scheduler.io()
上订阅。
之前:
observables.add(mViewModelDelegate.get().getApiService()
.rxGetCategoryBrandList(baseUrl, categoryId));
之后:
observables.add(mViewModelDelegate.get().getApiService()
.rxGetCategoryBrandList(baseUrl, categoryId)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
用户合并并确保所有可观察的请求都有自己的线程。
你可以试试这个:
private void runMyTest() {
List<Single<String>> singleObservableList = new ArrayList<>();
singleObservableList.add(getSingleObservable(500, "AAA"));
singleObservableList.add(getSingleObservable(300, "BBB"));
singleObservableList.add(getSingleObservable(100, "CCC"));
Single.merge(singleObservableList)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(System.out::println);
}
private Single<String> getSingleObservable(long waitMilliSeconds, String name) {
return Single
.create((SingleOnSubscribe<String>) e -> {
try {
Thread.sleep(waitMilliSeconds);
} catch (InterruptedException exception) {
exception.printStackTrace();
}
System.out.println("name = " +name+ ", waitMilliSeconds = " +waitMilliSeconds+ ", thread name = " +Thread.currentThread().getName()+ ", id =" +Thread.currentThread().getId());
if(!e.isDisposed()) e.onSuccess(name);
})
.subscribeOn(Schedulers.io());
}
输出:
System.out: name = CCC, waitMilliSeconds = 100, thread name =
RxCachedThreadScheduler-4, id =463
System.out: CCC
System.out: name = BBB, waitMilliSeconds = 300, thread name =
RxCachedThreadScheduler-3, id =462
System.out: BBB
System.out: name = AAA, waitMilliSeconds = 500, thread name =
RxCachedThreadScheduler-2, id =461
System.out: AAA
我有不同长度的 Observable 数组。我想压缩请求(即发出一堆 API 请求并等待所有请求完成),但我不知道如何实现压缩功能。
Observable.zip(observables, new FuncN<List<ResponseBody>>() {
@Override
public List<ResponseBody> call(Object... args) {
return Arrays.asList(args); <- compile error here
}
});
这里的 obserables
是一个 List<Observable<ResponseBody>>
的数组,它的长度是先验未知的。
调用zip函数的参数无法更正为ResponseBody...
。如何做到returnObservable<List<ResponseBody>>
?
是FuncN
RxJava1.x.x设计上的约束吗?
P.S。我正在使用 RxJava 1.1.6.
只需 merge 您的观察值并使用 toList
:
Observable.merge(observables).toList()
我确认 merge
运算符有效并找到导致订单的罪魁祸首:1st request, 1st response, 2nd request, 2nd response, 3rd request, 3rd response
.
observables
列表中的可观察对象在添加期间未在 Scheduler.io()
上订阅。
之前:
observables.add(mViewModelDelegate.get().getApiService()
.rxGetCategoryBrandList(baseUrl, categoryId));
之后:
observables.add(mViewModelDelegate.get().getApiService()
.rxGetCategoryBrandList(baseUrl, categoryId)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
用户合并并确保所有可观察的请求都有自己的线程。 你可以试试这个:
private void runMyTest() {
List<Single<String>> singleObservableList = new ArrayList<>();
singleObservableList.add(getSingleObservable(500, "AAA"));
singleObservableList.add(getSingleObservable(300, "BBB"));
singleObservableList.add(getSingleObservable(100, "CCC"));
Single.merge(singleObservableList)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(System.out::println);
}
private Single<String> getSingleObservable(long waitMilliSeconds, String name) {
return Single
.create((SingleOnSubscribe<String>) e -> {
try {
Thread.sleep(waitMilliSeconds);
} catch (InterruptedException exception) {
exception.printStackTrace();
}
System.out.println("name = " +name+ ", waitMilliSeconds = " +waitMilliSeconds+ ", thread name = " +Thread.currentThread().getName()+ ", id =" +Thread.currentThread().getId());
if(!e.isDisposed()) e.onSuccess(name);
})
.subscribeOn(Schedulers.io());
}
输出:
System.out: name = CCC, waitMilliSeconds = 100, thread name = RxCachedThreadScheduler-4, id =463
System.out: CCC
System.out: name = BBB, waitMilliSeconds = 300, thread name = RxCachedThreadScheduler-3, id =462
System.out: BBB
System.out: name = AAA, waitMilliSeconds = 500, thread name = RxCachedThreadScheduler-2, id =461
System.out: AAA