RxJava 合并 Observables 但只从第一个获取结果
RxJava merge Observables but take result only from first
我想实现这样的目标:
我有生成可观察对象的请求,而且我有一种可以作为行为主题的内存缓存。在我的代码中,我想以某种方式合并这两个 observable,如果 subject 有一些数据,我想获取这些数据并且不开始请求 observable。可以实现这样的事情吗?
目前我正在做这样的事情:
val resp = run { run(Task.GetMessages()) }
return InboxModel(news = Observable.merge(IoImplementation.cachedUserMessages(), resp).distinctUntilChanged())
但我觉得这样不好
听起来你想尝试在可观察缓存中找到工作,只有当它为空时才尝试做真正的工作。 observable.switchIfEmpty(Observable)
运算符应该做你需要的。如果您的缓存 observable 为空,那么它将订阅真正的工作 observable。
public void handleRequests(Observable<Request> requests) {
requests.flatMap(Request r -> {
String key = r.getSomeCacheKey();
return getFromCache(key)
.switchIfEmpty(doRealWork(r)
.doOnNext(saveToCache(key))
);
})
.subscribe((String response) -> System.out::println);
}
// hashmap based cache may not be what you want but i'm
// including it for completeness of example
Map<String, Observable<String>> cache = //...
Observable<String> getFromCache(String key) {
return cache.get(key);
}
Action1<String> saveToCache(String key) {
return (String response) -> cache.put(key, response);
}
Observable<String> doRealWork(Request someRequest) {
// imagine real work here
return Observable.just("some response");
}
我想实现这样的目标: 我有生成可观察对象的请求,而且我有一种可以作为行为主题的内存缓存。在我的代码中,我想以某种方式合并这两个 observable,如果 subject 有一些数据,我想获取这些数据并且不开始请求 observable。可以实现这样的事情吗?
目前我正在做这样的事情:
val resp = run { run(Task.GetMessages()) }
return InboxModel(news = Observable.merge(IoImplementation.cachedUserMessages(), resp).distinctUntilChanged())
但我觉得这样不好
听起来你想尝试在可观察缓存中找到工作,只有当它为空时才尝试做真正的工作。 observable.switchIfEmpty(Observable)
运算符应该做你需要的。如果您的缓存 observable 为空,那么它将订阅真正的工作 observable。
public void handleRequests(Observable<Request> requests) {
requests.flatMap(Request r -> {
String key = r.getSomeCacheKey();
return getFromCache(key)
.switchIfEmpty(doRealWork(r)
.doOnNext(saveToCache(key))
);
})
.subscribe((String response) -> System.out::println);
}
// hashmap based cache may not be what you want but i'm
// including it for completeness of example
Map<String, Observable<String>> cache = //...
Observable<String> getFromCache(String key) {
return cache.get(key);
}
Action1<String> saveToCache(String key) {
return (String response) -> cache.put(key, response);
}
Observable<String> doRealWork(Request someRequest) {
// imagine real work here
return Observable.just("some response");
}