RxJava - 多播可完成结果
RxJava - Multicast completable result
我有一个 Completable 可以按顺序发出两个网络请求(类似于握手)。这个操作可以在我的代码中的很多地方触发。我想要实现的是:
- 当 completable 在它已经开始工作时被多次调用时,它不应该重新启动,而是只创建一个 "handshake",然后 return 向所有订阅者发送 "onComplete" 事件
- 当第二次调用completable,但它已经完成了它的工作或之前遇到错误时,应该重新启动它(它不应该简单地缓存以前的结果)。
我尝试应用不同的解决方案,包括 this one,但出于某种原因,我仍然可以看到并行发出多个请求。
public Completable prepareMuticastCompletable() {
return networkService.makeHandshake() // Completable, makes two network requests
// transform it into ConnectableObservable
.toObservable()
.replay()
// stay connected as long as there are any subscribers
.refCount()
// transform it back to Completable
.ignoreElements()
.doOnSubscribe(disposable -> Log.d("Test", "subscribe"));
}
如果我尝试一次多次订阅此 Completable,我希望看到多个 "onComplete" 事件,但操作只执行一次(两个 HTTP 请求)。
D/Test: subscribe
D/Test: subscribe
D/OkHttp: --> GET https://.../startHandshake
D/OkHttp: --> GET https://.../startHandshake
D/OkHttp: <-- 200 OK https://.../startHandshake (503ms, 184-byte body)
D/OkHttp: <-- 200 OK https://.../startHandshake (508ms, 184-byte body)
D/OkHttp: --> POST https://.../finishHandshake (151-byte body)
D/OkHttp: --> POST https://.../finishHandshake (151-byte body)
D/OkHttp: <-- 200 OK https://.../finishHandshake (151ms, 196-byte body)
D/OkHttp: <-- 200 OK https://.../finishHandshake (150ms, 196-byte body)
D/OkHttp: done 2
D/OkHttp: done 1
我错过了什么?我应该使用其他 RxJava 方法吗?
编辑:
订阅(用于测试)如下所示:
private Completable prepareCompletable() {
return prepareMuticastCompletable()
.andThen(someOtherNotRelevantCompletable());
}
//...
prepareCompletable()
.doOnComplete(() -> Log.d("OkHttp", "done 1"))
.subscribe();
prepareCompletable()
.doOnComplete(() -> Log.d("OkHttp", "done 2"))
.subscribe();
您必须缓存冷序列并以某种方式重用它:
Completable c = prepareCompletable()
c.doOnComplete(() -> Log.d("OkHttp", "done 1"))
.subscribe();
c.doOnComplete(() -> Log.d("OkHttp", "done 2"))
.subscribe();
但请注意,如果第一个响应很快,您仍然会接到两次电话。
我有一个 Completable 可以按顺序发出两个网络请求(类似于握手)。这个操作可以在我的代码中的很多地方触发。我想要实现的是:
- 当 completable 在它已经开始工作时被多次调用时,它不应该重新启动,而是只创建一个 "handshake",然后 return 向所有订阅者发送 "onComplete" 事件
- 当第二次调用completable,但它已经完成了它的工作或之前遇到错误时,应该重新启动它(它不应该简单地缓存以前的结果)。
我尝试应用不同的解决方案,包括 this one,但出于某种原因,我仍然可以看到并行发出多个请求。
public Completable prepareMuticastCompletable() {
return networkService.makeHandshake() // Completable, makes two network requests
// transform it into ConnectableObservable
.toObservable()
.replay()
// stay connected as long as there are any subscribers
.refCount()
// transform it back to Completable
.ignoreElements()
.doOnSubscribe(disposable -> Log.d("Test", "subscribe"));
}
如果我尝试一次多次订阅此 Completable,我希望看到多个 "onComplete" 事件,但操作只执行一次(两个 HTTP 请求)。
D/Test: subscribe
D/Test: subscribe
D/OkHttp: --> GET https://.../startHandshake
D/OkHttp: --> GET https://.../startHandshake
D/OkHttp: <-- 200 OK https://.../startHandshake (503ms, 184-byte body)
D/OkHttp: <-- 200 OK https://.../startHandshake (508ms, 184-byte body)
D/OkHttp: --> POST https://.../finishHandshake (151-byte body)
D/OkHttp: --> POST https://.../finishHandshake (151-byte body)
D/OkHttp: <-- 200 OK https://.../finishHandshake (151ms, 196-byte body)
D/OkHttp: <-- 200 OK https://.../finishHandshake (150ms, 196-byte body)
D/OkHttp: done 2
D/OkHttp: done 1
我错过了什么?我应该使用其他 RxJava 方法吗?
编辑: 订阅(用于测试)如下所示:
private Completable prepareCompletable() {
return prepareMuticastCompletable()
.andThen(someOtherNotRelevantCompletable());
}
//...
prepareCompletable()
.doOnComplete(() -> Log.d("OkHttp", "done 1"))
.subscribe();
prepareCompletable()
.doOnComplete(() -> Log.d("OkHttp", "done 2"))
.subscribe();
您必须缓存冷序列并以某种方式重用它:
Completable c = prepareCompletable()
c.doOnComplete(() -> Log.d("OkHttp", "done 1"))
.subscribe();
c.doOnComplete(() -> Log.d("OkHttp", "done 2"))
.subscribe();
但请注意,如果第一个响应很快,您仍然会接到两次电话。