我是否正确使用 flatMap 来合并多个 API 调用的结果?

Am I using flatMap correctly to merge results from multiple API calls?

我想进行多次 API 调用(使用三个不同的查询)并合并结果,然后将它们显示在 onNext() 中。它正在工作,但我担心 flatMap 对此并不理想。

@GET("www.examle.com/api/data/")
Observable<WebResultsResponse> getWebResults(@Query("param1") String query);

-----

private List<WebResult> resultsList;

private void requestWebResults(String query) {
    resultsList.clear();

    final Observable<List<WebResult>> observable = MainApplication.apiProvider.getApiProviderA.getWebResults("query1")
            .subscribeOn(Schedulers.io())
            .flatMap(new Function<WebResultsResponse, ObservableSource<List<WebResult>>>() {
                @Override
                public ObservableSource<List<WebResult>> apply(WebResultsResponse response) throws Exception {
                    if(response.getData() != null && response.getData().getResults() != null)
                        resultsList.addAll(response.getData().getResults());

                    return MainApplication.apiProvider.getApiProviderA.getWebResults("query2")
                            .flatMap(new Function<WebResultsResponse, ObservableSource<List<WebResult>>>() {
                                @Override
                                public ObservableSource<List<WebResult>> apply(WebResultsResponse response) throws Exception {
                                    if(response.getData() != null && response.getData().getResults() != null)
                                        resultsList.addAll(response.getData().getResults());

                                    return MainApplication.apiProvider.getApiProviderA.getWebResults("query3")
                                            .flatMap(new Function<WebResultsResponse, ObservableSource<List<WebResult>>>() {
                                                @Override
                                                public ObservableSource<List<WebResult>> apply(WebResultsResponse response) throws Exception {
                                                    if(response.getData() != null && response.getData().getResults() != null)
                                                        resultsList.addAll(response.getData().getResults());

                                                    return Observable.just(resultsList);
                                                }
                                            });
                                }
                            });
                }
            })
            .observeOn(AndroidSchedulers.mainThread());


    observer = new DisposableObserver<List<WebResult>>() {
        @Override
        public void onNext(List<WebResult> results) {
            // do something with results
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    };

    observable.subscribe(observer);
}

flatMap() 的用法是否正确?我可以以某种方式将 resultsList 传递到链中而不是将其声明为全局变量吗?

如果你不在乎哪个是第一个returns,你可以简单地合并它们

Observable<List<WebResult>> observable = MainApplication.apiProvider.getApiProviderA.getWebResults("query1")
                .mergeWith(MainApplication.apiProvider.getApiProviderA.getWebResults("query2"))
                .mergeWith(MainApplication.apiProvider.getApiProviderA.getWebResults("query3"))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());

在 onNext() 中,您将获得每个结果作为它自己的列表,如果您想在所有结果都完成后获得结果,您可以使用

Observable<List<WebResult>> observable = Observable.zip(MainApplication.apiProvider.getApiProviderA.getWebResults("query1"), MainApplication.apiProvider.getApiProviderA.getWebResults("query2"), MainApplication.apiProvider.getApiProviderA.getWebResults("query3"), (webResults, webResults2, webResults3) -> {
            List<WebResult> allResults = new ArrayList<>();
            allResults.addAll(webResults);
            allResults.addAll(webResults2);
            allResults.addAll(webResults3);
            return allResults;
        });

并且在 onNext() 中,您将获得一次发射,所有结果加在一起

的帮助下,我使用 zip 想出了这个解决方案:

List<Observable<WebResultsResponse>> results = new ArrayList<>();
results.add(MainApplication.apiProvider.getApiProviderA.getWebResults("query1"));
results.add(MainApplication.apiProvider.getApiProviderA.getWebResults("query2"));
results.add(MainApplication.apiProvider.getApiProviderA.getWebResults("query3"));

Observable<List<WebResult>> observable = Observable.zip(results, new Function<Object[], List<WebResult>>() {
    @Override
    public List<WebResult> apply(Object[] responses) throws Exception {
        List<WebResult> allResults = new ArrayList<>();
        for(int i=0; i<responses.length; i++) {
            WebResultsResponse response = (WebResultsResponse)responses[i];
            if(response != null && response.getData() != null && response.getData().getResults() != null)
                allResults.addAll(response.getData().getResults());
        }
        return allResults;
    }
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io());