如何在 flatMap() 中等待 Observable 的 onComplete()
How to wait for Observable's onComplete() in flatMap()
我有一个 Observable
从数据库中读取数据。如果数据是 null
我需要从网络上获取它。因此,我首先 Observable
执行 flatMap
,检查数据库操作的结果,如果是 null
,我将启动另一个 Observable
从网络获取数据。
注意: Observable
有不同的Subscriber
是因为我根据数据的来源(这样的逻辑)有不同的后处理。
Observable.just(readDataFromDb()).flatMap(new Func1<SomeData, Observable<String>>() {
@Override public Observable<SomeData> call(SomeData s) {
if (s == null) {
getReadFromNetworkObservable().subscribe(new AnotherSubscriber()); // this one might not complete
return Observable.empty(); // I think I need to send this one only after readFromNetwork() completed
} else {
return Observable.just(s);
}
}
}).subscribe(new SomeSubscirber());
鉴于我发送 Observable.empty()
以排除 SomeSubscriber
的数据处理,我有预感我的第二个 Observable
不能总是完成,因为它可能只是垃圾收集。我想我在测试时看到了它。
至此,我想我只需要等到Observable
谁从网络读取完成再发送Observable.empty()
。那么我可以使执行同步吗?但我仍然觉得我做错了。
您可以使用 .toBlocking
快捷方式将任何 observable 设为阻塞(查看完整信息 https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators)
Data d = getReadFromNetworkObservable()
.toBlocking()
.first() // or single() or singleOrDefault()
// manipulate with data here
此处描述了将缓存与网络数据相结合:http://blog.danlew.net/2015/06/22/loading-data-from-multiple-sources-with-rxjava/
这里:RxJava and Cached Data
我有一个 Observable
从数据库中读取数据。如果数据是 null
我需要从网络上获取它。因此,我首先 Observable
执行 flatMap
,检查数据库操作的结果,如果是 null
,我将启动另一个 Observable
从网络获取数据。
注意: Observable
有不同的Subscriber
是因为我根据数据的来源(这样的逻辑)有不同的后处理。
Observable.just(readDataFromDb()).flatMap(new Func1<SomeData, Observable<String>>() {
@Override public Observable<SomeData> call(SomeData s) {
if (s == null) {
getReadFromNetworkObservable().subscribe(new AnotherSubscriber()); // this one might not complete
return Observable.empty(); // I think I need to send this one only after readFromNetwork() completed
} else {
return Observable.just(s);
}
}
}).subscribe(new SomeSubscirber());
鉴于我发送 Observable.empty()
以排除 SomeSubscriber
的数据处理,我有预感我的第二个 Observable
不能总是完成,因为它可能只是垃圾收集。我想我在测试时看到了它。
至此,我想我只需要等到Observable
谁从网络读取完成再发送Observable.empty()
。那么我可以使执行同步吗?但我仍然觉得我做错了。
您可以使用 .toBlocking
快捷方式将任何 observable 设为阻塞(查看完整信息 https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators)
Data d = getReadFromNetworkObservable()
.toBlocking()
.first() // or single() or singleOrDefault()
// manipulate with data here
此处描述了将缓存与网络数据相结合:http://blog.danlew.net/2015/06/22/loading-data-from-multiple-sources-with-rxjava/
这里:RxJava and Cached Data