如何确保所有观察者从多个并行订阅中看到相同的结果
How to ensure that all observers see the same result from multiple parallel subscription
我有两个可观察对象。
首先 - 从互联网或缓存中获取会话;
Observable<SessionKey> getSession = getSessionFromInternetOrCache();
第二 - 调用服务器 api,使用会话
Observable<MyResult> apiCall = getSession.flatMap(session -> {
return myApi.getResult(session);
})
问题是我有几个独立的组件 (gui)。它们是并行开始的。 apiCall
也并行启动。我得到了几个会话密钥。
我想要什么行为:getSessions
应该调用一次,其他可观察对象首先等待 getSessions
调用。在第一次 getSessions
调用后,所有 getSessions
都将是 return 缓存会话;
实现该行为的想法是什么?
UPD
行为应该是:从互联网检索会话应该阻止其他 getSessions 调用,直到会话不被缓存。
您可以在 getSessionFromInternetOrCache
中对返回的 Observable 调用 cache()
,这将确保会话仅被检索一次并重播给稍后试图观察它的任何人。我假设实际的会话检索仅在订阅 Observable 时发生。
编辑:这个例子说明了我的意思:
public class SingleSessions {
static Observable<String> getSession;
public static void main(String[] args) throws Exception {
getSession =
Observable.just("abc")
.doOnSubscribe(() -> System.out.println("This should happen once"))
.delay(500 + new Random().nextInt(2) * 700, TimeUnit.MILLISECONDS)
.timeout(1000, TimeUnit.MILLISECONDS, Observable.just("cde"))
.cache();
System.out.println("Asking for the session key");
getSession.subscribe(System.out::println);
getSession.subscribe(System.out::println);
getSession.subscribe(System.out::println);
getSession.subscribe(System.out::println);
System.out.println("Sleeping...");
Thread.sleep(2000);
System.out.println("...done");
getSession.subscribe(System.out::println);
getSession.subscribe(System.out::println);
}
}
我有两个可观察对象。
首先 - 从互联网或缓存中获取会话;
Observable<SessionKey> getSession = getSessionFromInternetOrCache();
第二 - 调用服务器 api,使用会话
Observable<MyResult> apiCall = getSession.flatMap(session -> {
return myApi.getResult(session);
})
问题是我有几个独立的组件 (gui)。它们是并行开始的。 apiCall
也并行启动。我得到了几个会话密钥。
我想要什么行为:getSessions
应该调用一次,其他可观察对象首先等待 getSessions
调用。在第一次 getSessions
调用后,所有 getSessions
都将是 return 缓存会话;
实现该行为的想法是什么?
UPD 行为应该是:从互联网检索会话应该阻止其他 getSessions 调用,直到会话不被缓存。
您可以在 getSessionFromInternetOrCache
中对返回的 Observable 调用 cache()
,这将确保会话仅被检索一次并重播给稍后试图观察它的任何人。我假设实际的会话检索仅在订阅 Observable 时发生。
编辑:这个例子说明了我的意思:
public class SingleSessions {
static Observable<String> getSession;
public static void main(String[] args) throws Exception {
getSession =
Observable.just("abc")
.doOnSubscribe(() -> System.out.println("This should happen once"))
.delay(500 + new Random().nextInt(2) * 700, TimeUnit.MILLISECONDS)
.timeout(1000, TimeUnit.MILLISECONDS, Observable.just("cde"))
.cache();
System.out.println("Asking for the session key");
getSession.subscribe(System.out::println);
getSession.subscribe(System.out::println);
getSession.subscribe(System.out::println);
getSession.subscribe(System.out::println);
System.out.println("Sleeping...");
Thread.sleep(2000);
System.out.println("...done");
getSession.subscribe(System.out::println);
getSession.subscribe(System.out::println);
}
}