RxJava 合并 Observables 但只从第一个获取结果

RxJava merge Observables but take result only from first

我想实现这样的目标: 我有生成可观察对象的请求,而且我有一种可以作为行为主题的内存缓存。在我的代码中,我想以某种方式合并这两个 observable,如果 subject 有一些数据,我想获取这些数据并且不开始请求 observable。可以实现这样的事情吗?

目前我正在做这样的事情:

val resp = run { run(Task.GetMessages()) }

return InboxModel(news = Observable.merge(IoImplementation.cachedUserMessages(), resp).distinctUntilChanged())

但我觉得这样不好

听起来你想尝试在可观察缓存中找到工作,只有当它为空时才尝试做真正的工作。 observable.switchIfEmpty(Observable) 运算符应该做你需要的。如果您的缓存 observable 为空,那么它将订阅真正的工作 observable。

public void handleRequests(Observable<Request> requests) {
    requests.flatMap(Request r -> {
            String key = r.getSomeCacheKey();
            return getFromCache(key)
                .switchIfEmpty(doRealWork(r)
                    .doOnNext(saveToCache(key))
                );
        })
        .subscribe((String response) -> System.out::println);
}

// hashmap based cache may not be what you want but i'm 
// including it for completeness of example
Map<String, Observable<String>> cache = //...

Observable<String> getFromCache(String key) {
    return cache.get(key);
}

Action1<String> saveToCache(String key) {
    return (String response) -> cache.put(key, response);
}

Observable<String> doRealWork(Request someRequest) {
    // imagine real work here
    return Observable.just("some response");
}