Retrofit 和 RxJava:做多个独立的请求并检测它们何时全部完成
Retrofit & RxJava : do multiple independant request and detect when they are all completed
在启动我的应用程序时,我需要加载多个独立数据并检查它们是否都已正确加载。
我所做的是:首先进行身份验证,然后发送所有加载数据的请求。
每个调用都是独立的,它只是用来更新本地数据库。
通话效果很好。
但是我如何得知所有请求都已完成?
public static void loadData() {
LoginService loginService =
RetrofitHelper.createService(LoginService.class);
user = new User(ApplicationCore.syncLogin, ApplicationCore.syncPassword);
loginService.login(user)
.doOnNext(accessToken -> storeCredentials(accessToken))
.doOnNext(a -> processData1())
.doOnNext(a -> processData2())
.doOnNext(a -> processData3())
.doOnNext(a -> processData4())
.subscribeOn(Schedulers.io())
.onErrorResumeNext(Observable.empty())
.subscribe(a -> Log.d("XXX","*********** END *********"));
}
private static DisposableSingleObserver processData1 () {
return RetrofitHelper.createService(Data1Service.class, true, authType, authToken).fetchAll(ApplicationCore.appVersionNum)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableSingleObserver<List<Data1>>() {
@Override
public void onSuccess(List<Data1> dataList) {
if (Data1DB.updateData(new ArrayList<>(dataList)) ) { // success
} else {
Log.d(TAG,"Error on processData1");
}
}
@Override
public void onError(Throwable e) {
Log.d(TAG,"Error on processData1",e);
}
});
}
在我的日志中,我看到“*********** END *********”在收到数据之前的网络调用后立即显示。
我希望能够观察到最后一个数据被处理的时间。
有几个选项可用,但请考虑这个(伪代码):
loginService.login(user)
.doOnNext(accessToken -> storeCredentials(accessToken))
.flatMapCompletable(a -> Completable.mergeArray(
fetchAll1(...)
.map(data->...Data1DB.updateData...)
.toCompletable(),
fetchAll2(...)
...
)
)
.subscribe(...)
现在是这样实现的。
如果每个 processDataX 函数中没有 "subscribeOn",则每个 "processData1()" 将按顺序执行,如果添加了 "subscribeOn",则将并行执行。
然后当所有完成后执行订阅。
public static void loadRefMatrix() {
LoginService loginService =
RetrofitHelper.createService(LoginService.class);
user = new User(ApplicationCore.syncLogin, ApplicationCore.syncPassword);
loginService.login(user)
.doOnNext(accessToken -> storeCredentials(accessToken))
.flatMapCompletable(a -> Completable.mergeArray(
processData1(),
processData2(),
processData3(),
processData4(),
processData5(),
processData6(),
processData7()
)
)
.subscribeOn(Schedulers.io())
.subscribe(new DisposableCompletableObserver() {
@Override
public void onComplete() {
Log.d(TAG, "*********** END *********");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "*********** ERROR *********");
}
})
)
;
}
private static Completable processData1 () {
return RetrofitHelper.createService(Data1Service.class, true, authType, authToken).fetchAll(ApplicationCore.appVersionNum)
.map(data -> {
if (Data1DB.updatData(new ArrayList<>(data)) ) { // success
} else {
Log.d(TAG,"Error on processAcuityMatrix do data");
}
return true;
})
.subscribeOn(Schedulers.io()) // <-- this to process in parallele
.toCompletable();
}
在启动我的应用程序时,我需要加载多个独立数据并检查它们是否都已正确加载。
我所做的是:首先进行身份验证,然后发送所有加载数据的请求。 每个调用都是独立的,它只是用来更新本地数据库。 通话效果很好。
但是我如何得知所有请求都已完成?
public static void loadData() {
LoginService loginService =
RetrofitHelper.createService(LoginService.class);
user = new User(ApplicationCore.syncLogin, ApplicationCore.syncPassword);
loginService.login(user)
.doOnNext(accessToken -> storeCredentials(accessToken))
.doOnNext(a -> processData1())
.doOnNext(a -> processData2())
.doOnNext(a -> processData3())
.doOnNext(a -> processData4())
.subscribeOn(Schedulers.io())
.onErrorResumeNext(Observable.empty())
.subscribe(a -> Log.d("XXX","*********** END *********"));
}
private static DisposableSingleObserver processData1 () {
return RetrofitHelper.createService(Data1Service.class, true, authType, authToken).fetchAll(ApplicationCore.appVersionNum)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableSingleObserver<List<Data1>>() {
@Override
public void onSuccess(List<Data1> dataList) {
if (Data1DB.updateData(new ArrayList<>(dataList)) ) { // success
} else {
Log.d(TAG,"Error on processData1");
}
}
@Override
public void onError(Throwable e) {
Log.d(TAG,"Error on processData1",e);
}
});
}
在我的日志中,我看到“*********** END *********”在收到数据之前的网络调用后立即显示。
我希望能够观察到最后一个数据被处理的时间。
有几个选项可用,但请考虑这个(伪代码):
loginService.login(user)
.doOnNext(accessToken -> storeCredentials(accessToken))
.flatMapCompletable(a -> Completable.mergeArray(
fetchAll1(...)
.map(data->...Data1DB.updateData...)
.toCompletable(),
fetchAll2(...)
...
)
)
.subscribe(...)
现在是这样实现的。 如果每个 processDataX 函数中没有 "subscribeOn",则每个 "processData1()" 将按顺序执行,如果添加了 "subscribeOn",则将并行执行。
然后当所有完成后执行订阅。
public static void loadRefMatrix() {
LoginService loginService =
RetrofitHelper.createService(LoginService.class);
user = new User(ApplicationCore.syncLogin, ApplicationCore.syncPassword);
loginService.login(user)
.doOnNext(accessToken -> storeCredentials(accessToken))
.flatMapCompletable(a -> Completable.mergeArray(
processData1(),
processData2(),
processData3(),
processData4(),
processData5(),
processData6(),
processData7()
)
)
.subscribeOn(Schedulers.io())
.subscribe(new DisposableCompletableObserver() {
@Override
public void onComplete() {
Log.d(TAG, "*********** END *********");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "*********** ERROR *********");
}
})
)
;
}
private static Completable processData1 () {
return RetrofitHelper.createService(Data1Service.class, true, authType, authToken).fetchAll(ApplicationCore.appVersionNum)
.map(data -> {
if (Data1DB.updatData(new ArrayList<>(data)) ) { // success
} else {
Log.d(TAG,"Error on processAcuityMatrix do data");
}
return true;
})
.subscribeOn(Schedulers.io()) // <-- this to process in parallele
.toCompletable();
}