Async Observable inside Async Observable
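I have a small problem. I am using RxJava to retrieve data from the server and then check it against the data stored locally. The flow first checks a local file and stores its data, and then I start the observable. The problem is that I get SUCCESS before the data has finished downloading, and the "initRemoteData" result only arrives after that.
So what I need is for the observable to block when "initRemoteData" is called, and to continue only once that observable has delivered its result.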
private Observable<CheckStatus> getCheckObservable() {
    return Observable.defer(() -> {
        DataDTO dto;
        try {
            dto = Utils.decryptData(
                    localfile.getNonce(),
                    localfile.getEncryptedData(),
                    password);
        } catch (WrongPasswordException e) {
            return Observable.just(CheckStatus.WRONG_PASSWORD);
        }
        try {
            storeDataPrefs(dto);
        } catch (RuntimeException e) {
            return Observable.just(CheckStatus.OTHER_ERROR);
        }
        storeDatabase(dto);
        initRemoteData(); // <- Here I call another observable, but this observable keeps going without waiting for it to finish.
        return Observable.just(CheckStatus.SUCCESS);
    });
}
The observable call:
getCheckObservable()
        .subscribeOn(BackgroundSchedulers.getMultiThreadInstance())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(Subscribers.create(checkResult -> replaceFragment(
                CheckAccountCompleteFragment
                        .newInstance(checkResult),
                AddToBackStack.FALSE)));
Update: the initRemoteData code:
Observable.zip(getObservable1(), // remote data
        getObservable2(), // local data
        (observableResult1, observableResult2) -> {
            doSomethingWith(observableResult1, observableResult2); // compare the two data sets
            return null;
        })
        .subscribeOn(BackgroundSchedulers.getMultiThreadInstance())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnCompleted(() -> {
            doSomething2(); // update fields
        })
        .subscribe();
If I understood correctly, you need getCheckObservable() to wait for both results? If so, Observable.combineLatest() can help you.
Update:
OK, then why not simply return this "waiting" Observable from inside the "big" one, like this:
Observable.zip(getObservable1(), // remote data
        getObservable2(), // local data
        (observableResult1, observableResult2) -> {
            doSomethingWith(observableResult1, observableResult2); // compare the two data sets
            return CheckStatus.SUCCESS; // <-- CHANGES
        })
        .subscribeOn(BackgroundSchedulers.getMultiThreadInstance())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnCompleted(() -> {
            doSomething2(); // update fields
        });
and
private Observable<CheckStatus> getCheckObservable() {
    return Observable.defer(() -> {
        DataDTO dto;
        try {
            dto = Utils.decryptData(
                    localfile.getNonce(),
                    localfile.getEncryptedData(),
                    password);
        } catch (WrongPasswordException e) {
            return Observable.just(CheckStatus.WRONG_PASSWORD);
        }
        try {
            storeDataPrefs(dto);
        } catch (RuntimeException e) {
            return Observable.just(CheckStatus.OTHER_ERROR);
        }
        storeDatabase(dto);
        return initRemoteData(); // <-- CHANGES
    });
}
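To see why returning the inner Observable matters, here is a rough sketch of the same idea using only the Java standard library. It is an analogy, not RxJava: CompletableFuture stands in for Observable and thenCombine for zip, and every name in it (ChainedCheckSketch, remoteData, localData, fireAndForget, chained, run) is made up for illustration. The fire-and-forget variant mirrors the original question, where the nested work is started but its result is dropped. The chained variant mirrors the change above, where the combined result itself is what gets returned.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Stdlib analogy only: CompletableFuture plays the role of an RxJava Observable.
final class ChainedCheckSketch {

    enum CheckStatus { SUCCESS }

    // Hypothetical stand-in for getObservable1(): a slow remote fetch.
    static CompletableFuture<String> remoteData(List<String> log) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(200);
            log.add("remote data ready");
            return "remote";
        });
    }

    // Hypothetical stand-in for getObservable2(): local data, already available.
    static CompletableFuture<String> localData() {
        return CompletableFuture.completedFuture("local");
    }

    // Like the question: the nested work is started, its future is dropped,
    // and SUCCESS is reported without waiting for it.
    static CompletableFuture<CheckStatus> fireAndForget(List<String> log) {
        remoteData(log).thenCombine(localData(), (r, l) -> r + l); // result ignored
        return CompletableFuture.completedFuture(CheckStatus.SUCCESS);
    }

    // Like the answer: the combined future is returned, so SUCCESS is only
    // produced after both inputs have completed.
    static CompletableFuture<CheckStatus> chained(List<String> log) {
        return remoteData(log).thenCombine(localData(), (r, l) -> CheckStatus.SUCCESS);
    }

    // Runs one variant and returns a snapshot of the event log taken
    // right after the status has been delivered.
    static List<String> run(boolean chain) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<CheckStatus> result = chain ? chained(log) : fireAndForget(log);
        result.thenAccept(status -> log.add("status " + status)).join();
        return new ArrayList<>(log);
    }

    static void sleep(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("fire-and-forget: " + run(false));
        System.out.println("chained:         " + run(true));
    }
}
```

In the chained run the status is always logged after the remote data, because the status is derived from the combined future instead of being created next to it. The same reasoning applies to returning the zip result from defer rather than subscribing to it inside.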