如何在 RxJava 中制作冷单 'hot'
How to make a cold Single 'hot' in RxJava
我有一些代码可以发出网络请求,每个代码 returns 一个 Single
。
public Single<Response> getSomeData(String endpoint)
{
return Single.fromCallable(makeNewRequest(endpoint));
}
只要调用者实际 subscribes()
到返回的 Single
,这就有效。但是在某些特定情况下(对于 POST 请求),调用者对结果不感兴趣,并且从不调用 subscribe()
.
由于fromCallable()
将执行推迟到订阅时间,这意味着请求实际上不会被执行。
问题:将此(冷)Single
转换为热的正确(惯用)方法是什么,以便立即执行网络请求,而不管调用者是否调用 subscribe()
?
我会说你需要 publish
operator combined with connect
. Before that, you'll need to convert the Single
into an Observable
with toObservable()
此外,请注意,如果调用者对实际结果不感兴趣,Completable
套件可能更好。您可以使用 toCompletable()
将 Single
转换为 Completable
您可以使用 SingleSubject
缓存任何相关方的响应,但请注意,您必须以某种方式向此 API 调用添加计划和可能的取消:
public Single<Response> getSomeData(String endpoint,
Scheduler scheduler, Consumer<? super Disposable> dispose) {
Callable<Response> requestor = makeNewRequest(endpoint);
SingleSubject<Response> result = SingleSubject.create();
Disposable d = scheduler.scheduleDirect(() -> {
try {
result.onSuccess(requestor.call());
} catch (Exception ex) {
result.onError(ex);
}
});
if (dispose != null) {
dispose.accept(d);
}
return result;
}
请注意,尽管此设置现在非常热门,但您必须再次调用 getSomeData()
才能向同一端点发出新请求并获得新结果。
我终于通过使用 Single.cache()
和订阅虚拟观察者解决了这个问题。这样,无论 API 客户端是否实际订阅了返回的可观察对象,都会执行请求。代码如下所示:
public Single<Response> getSomeData(String endpoint)
{
Single<Response> res = Single.fromCallable(makeNewRequest(endpoint))
.subscribeOn(Schedulers.io()) // if needed
.cache();
res.subscribe(dummyObserver);
return res;
}
private static final SingleObserver<Response> dummyObserver = new SingleObserver<Response>() {
public void onSubscribe(Disposable d) { }
public void onSuccess(Response s) { }
public void onError(Throwable e) { }
};
更新:
这是另一种不需要虚拟观察者对象的方法:
public Single<Response> getSomeData(String endpoint)
{
Single<Response> res = Single.fromCallable(makeNewRequest(endpoint))
.subscribeOn(Schedulers.io()); // if needed
SingleSubject subject = SingleSubject.create();
res.subscribe(subject);
return subject;
}
我有一些代码可以发出网络请求,每个代码 returns 一个 Single
。
public Single<Response> getSomeData(String endpoint)
{
return Single.fromCallable(makeNewRequest(endpoint));
}
只要调用者实际 subscribes()
到返回的 Single
,这就有效。但是在某些特定情况下(对于 POST 请求),调用者对结果不感兴趣,并且从不调用 subscribe()
.
由于fromCallable()
将执行推迟到订阅时间,这意味着请求实际上不会被执行。
问题:将此(冷)Single
转换为热的正确(惯用)方法是什么,以便立即执行网络请求,而不管调用者是否调用 subscribe()
?
我会说你需要 publish
operator combined with connect
. Before that, you'll need to convert the Single
into an Observable
with toObservable()
此外,请注意,如果调用者对实际结果不感兴趣,Completable
套件可能更好。您可以使用 toCompletable()
Single
转换为 Completable
您可以使用 SingleSubject
缓存任何相关方的响应,但请注意,您必须以某种方式向此 API 调用添加计划和可能的取消:
public Single<Response> getSomeData(String endpoint,
Scheduler scheduler, Consumer<? super Disposable> dispose) {
Callable<Response> requestor = makeNewRequest(endpoint);
SingleSubject<Response> result = SingleSubject.create();
Disposable d = scheduler.scheduleDirect(() -> {
try {
result.onSuccess(requestor.call());
} catch (Exception ex) {
result.onError(ex);
}
});
if (dispose != null) {
dispose.accept(d);
}
return result;
}
请注意,尽管此设置现在非常热门,但您必须再次调用 getSomeData()
才能向同一端点发出新请求并获得新结果。
我终于通过使用 Single.cache()
和订阅虚拟观察者解决了这个问题。这样,无论 API 客户端是否实际订阅了返回的可观察对象,都会执行请求。代码如下所示:
public Single<Response> getSomeData(String endpoint)
{
Single<Response> res = Single.fromCallable(makeNewRequest(endpoint))
.subscribeOn(Schedulers.io()) // if needed
.cache();
res.subscribe(dummyObserver);
return res;
}
private static final SingleObserver<Response> dummyObserver = new SingleObserver<Response>() {
public void onSubscribe(Disposable d) { }
public void onSuccess(Response s) { }
public void onError(Throwable e) { }
};
更新:
这是另一种不需要虚拟观察者对象的方法:
public Single<Response> getSomeData(String endpoint)
{
Single<Response> res = Single.fromCallable(makeNewRequest(endpoint))
.subscribeOn(Schedulers.io()); // if needed
SingleSubject subject = SingleSubject.create();
res.subscribe(subject);
return subject;
}