RxJava 主题缓存结果重试

RxJava Subject Cache Result With Retry

我有 Observable<FeaturedItemList> getFeatured() 每次打开页面时都会调用它。从同一页面上的两个不同组件调用此函数。由于它是从网络检索的,所以我将其缓存并使其可与 ReplaySubject.

共享
public Observable<FeaturedItemList> getFeatured() {
    if(mFeaturedReplaySubject == null) {
        mFeaturedReplaySubject = ReplaySubject.create();
        getFromNetwork().subscribe(mFeaturedReplaySubject);
    }

    return mFeaturedReplaySubject;
}

然后我意识到,当请求由于某些原因失败时,如果用户返回该页面,除非用户终止应用程序,否则它不会显示任何结果。所以我决定有一些重试逻辑。这是我的做法:

public Observable<FeaturedItemList> getFeatured() {
    synchronized (this) {
        if (mFeaturedReplaySubject == null) {
            mFeaturedReplaySubject = ReplaySubject.create();
            getFromNetwork().subscribe(mFeaturedReplaySubject);

            return mFeaturedReplaySubject;
        } else {
            return mFeaturedReplaySubject.onErrorResumeNext(throwable -> {
                mFeaturedReplaySubject = null;
                return getFeatured();
            });
        }
    }
}

虽然这行得通,但恐怕我在这里做的事情不太好,因为这种方法无法涵盖这种情况。 有没有更好的方法?

同样为了分享使用主题的可观察性,我在某处读到我可以使用 connect()publish()share(),但我不确定如何使用它。

密码

private Observable<FeaturedItemList> mFeatured =
    // initialized on construction
    getFromNetwork()
        .retry(3) // number of times to retry
        .cache();

public Observable<FeaturedItemList> getFeatured() {
    return mFeatured;
}

说明

网络来源

您的 getFromNetwork() 函数应 return 定期观察,它应该在每次订阅时访问网络。 调用时不接入网络。例如:

Future<FeaturedItemList> makeAsyncNetworkRequest() {
    ... initiate network request here ...
}

Observable<FeaturedItemList> getFromNetwork() {
    return Observable.fromCallable(this::makeAsyncNetworkRequest)
        .flatMap(Observable::fromFuture);
}

重试

有一个 .retryXxx() 运算符家族,它们仅在出现错误时被激活。根据各种条件,他们要么重新订阅源代码,要么将错误传递给下线。 如果没有错误,这些运算符什么都不做。我在我的示例中使用了简单的 retry(count),它会立即重试指定次数。您可以使用 retryWhen() 添加延迟或任何复杂的逻辑(有关示例,请参见 here and here)。

缓存

cache() 操作员记录事件的顺序并将其重播给所有新订阅者。不好的是它不可刷新。它永远存储上游的结果,无论是成功还是错误,并且永远不会重试。

替代cache()

replay().refCount() 向所有现有订阅者重播事件,但一旦所有订阅者取消订阅(或完成)就忘记一切。它将在新订阅者到达时重新订阅 getFromNetwork()(当然会重试错误)。

提到但不需要的运算符

share()publish().refCount() 的 shorthand。它允许多个并发订阅者共享一个订阅,即对 subscribe() 进行一次调用,而不是为每个订阅者调用。 cache()replay().refCount() 都包含相同的功能。