如何在 RxJava 中制作冷单 'hot'

How to make a cold Single 'hot' in RxJava

我有一些代码可以发出网络请求,每个代码 returns 一个 Single

public Single<Response> getSomeData(String endpoint)    
{
    return Single.fromCallable(makeNewRequest(endpoint));
}

只要调用者实际 subscribes() 到返回的 Single,这就有效。但是在某些特定情况下(对于 POST 请求),调用者对结果不感兴趣,并且从不调用 subscribe().

由于fromCallable()将执行推迟到订阅时间,这意味着请求实际上不会被执行。

问题:将此(冷)Single 转换为热的正确(惯用)方法是什么,以便立即执行网络请求,而不管调用者是否调用 subscribe() ?

我会说你需要 publish operator combined with connect. Before that, you'll need to convert the Single into an Observable with toObservable()

此外,请注意,如果调用者对实际结果不感兴趣,Completable 套件可能更好。您可以使用 toCompletable()

Single 转换为 Completable

您可以使用 SingleSubject 缓存任何相关方的响应,但请注意,您必须以某种方式向此 API 调用添加计划和可能的取消:

public Single<Response> getSomeData(String endpoint, 
        Scheduler scheduler, Consumer<? super Disposable> dispose) {

    Callable<Response> requestor = makeNewRequest(endpoint);
    SingleSubject<Response> result = SingleSubject.create();
    Disposable d = scheduler.scheduleDirect(() -> {
         try {
             result.onSuccess(requestor.call());
         } catch (Exception ex) {
             result.onError(ex);
         }
    });
    if (dispose != null) {
        dispose.accept(d);
    }
    return result;
}

请注意,尽管此设置现在非常热门,但您必须再次调用 getSomeData() 才能向同一端点发出新请求并获得新结果。

我终于通过使用 Single.cache() 和订阅虚拟观察者解决了这个问题。这样,无论 API 客户端是否实际订阅了返回的可观察对象,都会执行请求。代码如下所示:

public Single<Response> getSomeData(String endpoint)    
{
    Single<Response> res = Single.fromCallable(makeNewRequest(endpoint))
        .subscribeOn(Schedulers.io()) // if needed
        .cache();
    res.subscribe(dummyObserver);
    return res;
}

private static final SingleObserver<Response> dummyObserver = new SingleObserver<Response>() {
    public void onSubscribe(Disposable d) { }
    public void onSuccess(Response s) { }
    public void onError(Throwable e) { }
};

更新:

这是另一种不需要虚拟观察者对象的方法:

public Single<Response> getSomeData(String endpoint)    
{
    Single<Response> res = Single.fromCallable(makeNewRequest(endpoint))
        .subscribeOn(Schedulers.io()); // if needed

    SingleSubject subject = SingleSubject.create(); 
    res.subscribe(subject);
    return subject;
}