How to Invoke a Method When All of the Parallel HTTP Requests Are Completed?

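I need to fetch the categories, then fetch the channels of each category, and finally call a method once all categories and their channels have been retrieved from the server. I guess I need to use RxJava, but I could not find a similar implementation. (Preferably without lambda/retrolambda expressions.)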

@GET("/api/{categoryId}")
Call<Category> getCategory(@Path("categoryId") String categoryId);


private void getCategories() {
    for (Tab t : tabs) {
        Call<Category> getCategory = videoAPI.getCategory(t.getId());
        getCategory.enqueue(new Callback<Category>() {
            @Override
            public void onResponse(Call<Category> call, Response<Category> response) {
                Category cat = response.body();
                categories.add(cat);
                // I will call the getChannels(String categoryId) method here, 
                // however I think implementing RxJava would be much better.
            }

            @Override
            public void onFailure(Call<Category> call, Throwable t) {
                Log.i(TAG, "failure: " + t.getLocalizedMessage());
            }
        });
    }
}

You can use:

Observable
    .fromArray(/* your observables go here; make sure each item reaching flatMap is an Observable<T>, not an Observable<List<T>> */)
    .flatMap(/* subscribe each item on a scheduler so the requests run in parallel: subscribeOn(Schedulers.computation()) */)
    .subscribe(/* each request */, /* error */, /* all requests completed */)

Your request methods now need to return an Observable:

@GET("/api/{categoryId}")
Observable<Category> getCategory(@Path("categoryId") String categoryId);

Sample code in Java:

// Setup a list of observables
List<Observable<Category>> parallelRequests = new ArrayList<>();
for (Tab t : tabs) {
    parallelRequests.add(videoAPI.getCategory(t.getId()));
}
// Convert the list to an array, because fromArray takes varargs
Observable[] array = new Observable[parallelRequests.size()];
parallelRequests.toArray(array);

Observable
        .fromArray(array)
        .flatMap(observable -> observable.subscribeOn(Schedulers.computation()))
        .subscribe(o -> {
            // each request is fulfilled
        }, Throwable::printStackTrace, () -> finishFunctionHere());

Or, if you are using Kotlin:

Observable
// The asterisk is called "spread operator": It converts an array to vararg
.fromArray(*tabs.map { api.getCategory(it.getId()) }.toTypedArray())
.flatMap { it.subscribeOn(Schedulers.computation()) }
.subscribe({ category ->
    // onEach
}, Throwable::printStackTrace, {
    // All requests were fulfilled
})
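
To see the "start everything in parallel, then run one method when all are done" shape on its own, here is a minimal sketch that uses only the JDK. It swaps RxJava for CompletableFuture, so it is a comparison and not the approach above. It uses anonymous classes rather than lambdas, as the question prefers. The names ParallelFetchSketch, fetchCategory and fetchAll are hypothetical, and fetchCategory only pretends to be the blocking HTTP call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class ParallelFetchSketch {

    // Hypothetical stand-in for a blocking HTTP call such as videoAPI.getCategory(id).
    static String fetchCategory(String id) {
        return "category-" + id;
    }

    // Starts one task per id on a thread pool and returns the results
    // (in the same order as the ids) once every task has finished.
    static List<String> fetchAll(List<String> ids) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (final String id : ids) {
                pending.add(CompletableFuture.supplyAsync(new Supplier<String>() {
                    @Override
                    public String get() {
                        return fetchCategory(id);
                    }
                }, pool));
            }

            // allOf completes only after every future in the array has completed.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();

            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> future : pending) {
                results.add(future.join()); // already completed, returns immediately
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> categories = fetchAll(Arrays.asList("1", "2", "3"));
        // This line plays the role of the "all requests completed" callback.
        System.out.println("All requests finished: " + categories);
    }
}
```

Note that join() blocks the calling thread, so in an Android app this would have to run off the main thread. The RxJava version avoids that by delivering completion through the third subscribe argument.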