RxJava - 重复 API 调用直到返回所有项目
RxJava - Repeat API calls until all item returned
我有一个 API 调用 returns 按页排列的项目列表。我用retrofit实现,界面是:
Observable<QueryResult> queryData(@Body QueryParams params);
QueryParams 和 QueryResult 定义为:
class QueryParams {
int pageIndex, pageSize; // for pagination;
... // other query criteria
}
class QueryResult {
int pageIndex, pageSize;
int totalCount; // This is the total data size which is used to know if there are still data to retreat.
... // List of data returned by page;
}
然后我使用此代码获取 100 个数据项的第一页:
params.pageIndex = 1;
params.pageSize = 100;
queryData(params).subscribe(...);
API的设计目的是逐页获取数据列表,这样我就可以高效地响应UI表示。
不知何故,在某些情况下,我需要一次获取所有数据并处理一些任务,然后再表示给 UI。使用这样设计的界面,我必须多次调用 queryData() 直到获取所有数据或至少两次(第一个获取 totalCount 并将其传递给 pageSize 进行第二次调用)。
所以,我的问题是如何使用 RxJava 礼仪链接 API 调用来获取所有数据?
提前致谢。
更新来自@Abu
的解决方案
Observable<QueryResult> query(final QueryParams params) {
return queryData(params)
.concatMap(new Func1<QueryResult, Observable<QueryResult>>() {
@Override
public Observable<QueryResult> call(final QueryResult result) {
int retrievedCount = result.getPageSize() * (result.getPageIndex() - 1) + result.resultList.size();
if (retrievedCount >= result.getCount()) {
return Observable.just(result);
}
QueryParams nextParams = params.clone();
nextParams.setPageIndex(results.getPageIndex() + 1);
return query(nextParams).map(new Func1<QueryResult, QueryResult>() {
@Override
public QueryResult call(QueryResult nextResult) {
nextResult.resultList.addAll(result.resultList);
return nextResult;
}
});
}
}
一个可能是用 concatMap
和 concatWith
运算符递归地做。
这是示例代码。
private Observable<List<Integer>> getResponse(final int index) {
return getData(index)
.concatMap(new Function<List<Integer>, ObservableSource<? extends List<Integer>>>() {
@Override
public ObservableSource<? extends List<Integer>> apply(List<Integer> integers) throws Exception {
if (index == 10) {
return Observable.just(integers);
}else {
return Observable.just(integers)
.concatWith(getResponse(index + 1));
}
}
});
}
private Observable<List<Integer>> getData(int index){
List<Integer> dataList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
dataList.add(index*10 + i);
}
return Observable.just(dataList);
}
用法:
getResponse(1)
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
Log.i(TAG, "Data: " + Arrays.toString(integers.toArray()));
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, throwable.getMessage());
}
});
这将按顺序递归地为您提供所有数据。您将获得第一个索引 1 的数据,它们的索引 2 ,.......
如果有更好的解决方案我等着看。
编辑:
要获取完整的数据使用列表,可以这样更新您的代码:
private Observable<List<Integer>> getResponse(final int index) {
return getData(index)
.concatMap(new Function<List<Integer>, ObservableSource<? extends List<Integer>>>() {
@Override
public ObservableSource<? extends List<Integer>> apply(final List<Integer> integerList) throws Exception {
if (index < 9){
return getResponse(index+1)
.map(new Function<List<Integer>, List<Integer>>() {
@Override
public List<Integer> apply(List<Integer> integers) throws Exception {
integers.addAll(integerList);
return integers;
}
});
}else {
return Observable.just(integerList);
}
}
});
}
private Observable<List<Integer>> getData(int index){
Util.printThreadInfo(index);
final List<Integer> dataList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
dataList.add(index*10 + i);
}
return Observable.just(dataList);
}
用法:
Observable.defer(new Callable<ObservableSource<? extends List<Integer>>>() {
@Override
public ObservableSource<? extends List<Integer>> call() throws Exception {
return getResponse(1);
}
}).subscribeOn(Schedulers.io())
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
Collections.sort(integers);
Log.i(TAG, "Data: " + Arrays.toString(integers.toArray()));
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, throwable.getMessage());
}
});
这将立即为您提供完整的数据。
我认为您不应该以这种方式获取所有数据,因为如果您的页面大小为 100,那么您将创建 100 个网络调用。你api应该给你一个电话的所有数据。
我只是更新了我的答案以说明如何做到这一点。
我有一个 API 调用 returns 按页排列的项目列表。我用retrofit实现,界面是:
Observable<QueryResult> queryData(@Body QueryParams params);
QueryParams 和 QueryResult 定义为:
class QueryParams {
int pageIndex, pageSize; // for pagination;
... // other query criteria
}
class QueryResult {
int pageIndex, pageSize;
int totalCount; // This is the total data size which is used to know if there are still data to retreat.
... // List of data returned by page;
}
然后我使用此代码获取 100 个数据项的第一页:
params.pageIndex = 1;
params.pageSize = 100;
queryData(params).subscribe(...);
API的设计目的是逐页获取数据列表,这样我就可以高效地响应UI表示。
不知何故,在某些情况下,我需要一次获取所有数据并处理一些任务,然后再表示给 UI。使用这样设计的界面,我必须多次调用 queryData() 直到获取所有数据或至少两次(第一个获取 totalCount 并将其传递给 pageSize 进行第二次调用)。
所以,我的问题是如何使用 RxJava 礼仪链接 API 调用来获取所有数据?
提前致谢。
更新来自@Abu
的解决方案Observable<QueryResult> query(final QueryParams params) {
return queryData(params)
.concatMap(new Func1<QueryResult, Observable<QueryResult>>() {
@Override
public Observable<QueryResult> call(final QueryResult result) {
int retrievedCount = result.getPageSize() * (result.getPageIndex() - 1) + result.resultList.size();
if (retrievedCount >= result.getCount()) {
return Observable.just(result);
}
QueryParams nextParams = params.clone();
nextParams.setPageIndex(results.getPageIndex() + 1);
return query(nextParams).map(new Func1<QueryResult, QueryResult>() {
@Override
public QueryResult call(QueryResult nextResult) {
nextResult.resultList.addAll(result.resultList);
return nextResult;
}
});
}
}
一个可能是用 concatMap
和 concatWith
运算符递归地做。
这是示例代码。
private Observable<List<Integer>> getResponse(final int index) {
return getData(index)
.concatMap(new Function<List<Integer>, ObservableSource<? extends List<Integer>>>() {
@Override
public ObservableSource<? extends List<Integer>> apply(List<Integer> integers) throws Exception {
if (index == 10) {
return Observable.just(integers);
}else {
return Observable.just(integers)
.concatWith(getResponse(index + 1));
}
}
});
}
private Observable<List<Integer>> getData(int index){
List<Integer> dataList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
dataList.add(index*10 + i);
}
return Observable.just(dataList);
}
用法:
getResponse(1)
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
Log.i(TAG, "Data: " + Arrays.toString(integers.toArray()));
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, throwable.getMessage());
}
});
这将按顺序递归地为您提供所有数据。您将获得第一个索引 1 的数据,它们的索引 2 ,.......
如果有更好的解决方案我等着看。
编辑:
要获取完整的数据使用列表,可以这样更新您的代码:
private Observable<List<Integer>> getResponse(final int index) {
return getData(index)
.concatMap(new Function<List<Integer>, ObservableSource<? extends List<Integer>>>() {
@Override
public ObservableSource<? extends List<Integer>> apply(final List<Integer> integerList) throws Exception {
if (index < 9){
return getResponse(index+1)
.map(new Function<List<Integer>, List<Integer>>() {
@Override
public List<Integer> apply(List<Integer> integers) throws Exception {
integers.addAll(integerList);
return integers;
}
});
}else {
return Observable.just(integerList);
}
}
});
}
private Observable<List<Integer>> getData(int index){
Util.printThreadInfo(index);
final List<Integer> dataList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
dataList.add(index*10 + i);
}
return Observable.just(dataList);
}
用法:
Observable.defer(new Callable<ObservableSource<? extends List<Integer>>>() {
@Override
public ObservableSource<? extends List<Integer>> call() throws Exception {
return getResponse(1);
}
}).subscribeOn(Schedulers.io())
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
Collections.sort(integers);
Log.i(TAG, "Data: " + Arrays.toString(integers.toArray()));
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, throwable.getMessage());
}
});
这将立即为您提供完整的数据。
我认为您不应该以这种方式获取所有数据,因为如果您的页面大小为 100,那么您将创建 100 个网络调用。你api应该给你一个电话的所有数据。
我只是更新了我的答案以说明如何做到这一点。