RxJava - 重复 API 调用直到返回所有项目

RxJava - Repeat API calls until all item returned

我有一个 API 调用 returns 按页排列的项目列表。我用retrofit实现,界面是:

Observable<QueryResult> queryData(@Body QueryParams params);

QueryParams 和 QueryResult 定义为:

class QueryParams {
    int pageIndex, pageSize; // for pagination;
    ... // other query criteria
}

class QueryResult {
    int pageIndex, pageSize;
    int totalCount; // This is the total data size which is used to know if there are still data to retreat.
    ... // List of data returned by page;
}

然后我使用此代码获取 100 个数据项的第一页:

params.pageIndex = 1;
params.pageSize = 100;
queryData(params).subscribe(...);

API的设计目的是逐页获取数据列表,这样我就可以高效地响应UI表示。

不知何故,在某些情况下,我需要一次获取所有数据并处理一些任务,然后再表示给 UI。使用这样设计的界面,我必须多次调用 queryData() 直到获取所有数据或至少两次(第一个获取 totalCount 并将其传递给 pageSize 进行第二次调用)。

所以,我的问题是如何使用 RxJava 礼仪链接 API 调用来获取所有数据?

提前致谢。

更新来自@Abu

的解决方案
Observable<QueryResult> query(final QueryParams params) {
    return queryData(params)
            .concatMap(new Func1<QueryResult, Observable<QueryResult>>() {
                @Override
                public Observable<QueryResult> call(final QueryResult result) {
                    int retrievedCount = result.getPageSize() * (result.getPageIndex() - 1) + result.resultList.size();
                    if (retrievedCount >= result.getCount()) {
                        return Observable.just(result);
                    }

                    QueryParams nextParams = params.clone();
                    nextParams.setPageIndex(results.getPageIndex() + 1);
                    return query(nextParams).map(new Func1<QueryResult, QueryResult>() {
                        @Override
                        public QueryResult call(QueryResult nextResult) {
                            nextResult.resultList.addAll(result.resultList);
                            return nextResult;
                        }
                    });
                }
}

一个可能是用 concatMapconcatWith 运算符递归地做。

这是示例代码。

    private Observable<List<Integer>> getResponse(final int index) {



    return getData(index)
            .concatMap(new Function<List<Integer>, ObservableSource<? extends List<Integer>>>() {
                @Override
                public ObservableSource<? extends List<Integer>> apply(List<Integer> integers) throws Exception {

                    if (index == 10) {
                        return Observable.just(integers);
                    }else {
                        return Observable.just(integers)
                                .concatWith(getResponse(index + 1));
                    }
                }
            });
   }


   private Observable<List<Integer>> getData(int index){

      List<Integer> dataList = new ArrayList<>();

      for (int i = 0; i < 10; i++) {
          dataList.add(index*10 + i);
      }

      return Observable.just(dataList);

   }

用法:

        getResponse(1)
            .subscribeOn(Schedulers.io())
            .subscribe(new Consumer<List<Integer>>() {
                @Override
                public void accept(List<Integer> integers) throws Exception {
                    Log.i(TAG, "Data: " + Arrays.toString(integers.toArray()));
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Log.e(TAG, throwable.getMessage());
                }
            });

这将按顺序递归地为您提供所有数据。您将获得第一个索引 1 的数据,它们的索引 2 ,.......

如果有更好的解决方案我等着看。

编辑:

要获取完整的数据使用列表,可以这样更新您的代码:

    private Observable<List<Integer>> getResponse(final int index) {

    return getData(index)
            .concatMap(new Function<List<Integer>, ObservableSource<? extends List<Integer>>>() {
                @Override
                public ObservableSource<? extends List<Integer>> apply(final List<Integer> integerList) throws Exception {

                    if (index < 9){
                        return getResponse(index+1)
                                .map(new Function<List<Integer>, List<Integer>>() {
                                    @Override
                                    public List<Integer> apply(List<Integer> integers) throws Exception {
                                        integers.addAll(integerList);
                                        return integers;
                                    }
                                });
                    }else {
                        return Observable.just(integerList);
                    }

                }
            });
}


private Observable<List<Integer>> getData(int index){

    Util.printThreadInfo(index);

    final List<Integer> dataList = new ArrayList<>();

    for (int i = 0; i < 10; i++) {
        dataList.add(index*10 + i);
    }

    return Observable.just(dataList);

}

用法:

        Observable.defer(new Callable<ObservableSource<? extends List<Integer>>>() {
        @Override
        public ObservableSource<? extends List<Integer>> call() throws Exception {
            return getResponse(1);
        }
    }).subscribeOn(Schedulers.io())
            .subscribe(new Consumer<List<Integer>>() {
                @Override
                public void accept(List<Integer> integers) throws Exception {
                    Collections.sort(integers);
                    Log.i(TAG, "Data: " + Arrays.toString(integers.toArray()));
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Log.e(TAG, throwable.getMessage());
                }
            });

这将立即为您提供完整的数据。

我认为您不应该以这种方式获取所有数据,因为如果您的页面大小为 100,那么您将创建 100 个网络调用。你api应该给你一个电话的所有数据。

我只是更新了我的答案以说明如何做到这一点。