RxJava 主题缓存结果重试
RxJava Subject Cache Result With Retry
我有 Observable<FeaturedItemList> getFeatured()
每次打开页面时都会调用它。从同一页面上的两个不同组件调用此函数。由于它是从网络检索的,所以我将其缓存并使其可与 ReplaySubject
.
共享
public Observable<FeaturedItemList> getFeatured() {
if(mFeaturedReplaySubject == null) {
mFeaturedReplaySubject = ReplaySubject.create();
getFromNetwork().subscribe(mFeaturedReplaySubject);
}
return mFeaturedReplaySubject;
}
然后我意识到,当请求由于某些原因失败时,如果用户返回该页面,除非用户终止应用程序,否则它不会显示任何结果。所以我决定有一些重试逻辑。这是我的做法:
public Observable<FeaturedItemList> getFeatured() {
synchronized (this) {
if (mFeaturedReplaySubject == null) {
mFeaturedReplaySubject = ReplaySubject.create();
getFromNetwork().subscribe(mFeaturedReplaySubject);
return mFeaturedReplaySubject;
} else {
return mFeaturedReplaySubject.onErrorResumeNext(throwable -> {
mFeaturedReplaySubject = null;
return getFeatured();
});
}
}
}
虽然这行得通,但恐怕我在这里做的事情不太好,因为这种方法无法涵盖这种情况。
有没有更好的方法?
同样为了分享使用主题的可观察性,我在某处读到我可以使用 connect()
、publish()
和 share()
,但我不确定如何使用它。
密码
private Observable<FeaturedItemList> mFeatured =
// initialized on construction
getFromNetwork()
.retry(3) // number of times to retry
.cache();
public Observable<FeaturedItemList> getFeatured() {
return mFeatured;
}
说明
网络来源
您的 getFromNetwork()
函数应 return 定期观察,它应该在每次订阅时访问网络。 调用时不接入网络。例如:
Future<FeaturedItemList> makeAsyncNetworkRequest() {
... initiate network request here ...
}
Observable<FeaturedItemList> getFromNetwork() {
return Observable.fromCallable(this::makeAsyncNetworkRequest)
.flatMap(Observable::fromFuture);
}
重试
有一个 .retryXxx()
运算符家族,它们仅在出现错误时被激活。根据各种条件,他们要么重新订阅源代码,要么将错误传递给下线。 如果没有错误,这些运算符什么都不做。我在我的示例中使用了简单的 retry(count)
,它会立即重试指定次数。您可以使用 retryWhen()
添加延迟或任何复杂的逻辑(有关示例,请参见 here and here)。
缓存
cache()
操作员记录事件的顺序并将其重播给所有新订阅者。不好的是它不可刷新。它永远存储上游的结果,无论是成功还是错误,并且永远不会重试。
替代cache()
replay().refCount()
向所有现有订阅者重播事件,但一旦所有订阅者取消订阅(或完成)就忘记一切。它将在新订阅者到达时重新订阅 getFromNetwork()
(当然会重试错误)。
提到但不需要的运算符
share()
是 publish().refCount()
的 shorthand。它允许多个并发订阅者共享一个订阅,即对 subscribe()
进行一次调用,而不是为每个订阅者调用。 cache()
和 replay().refCount()
都包含相同的功能。
我有 Observable<FeaturedItemList> getFeatured()
每次打开页面时都会调用它。从同一页面上的两个不同组件调用此函数。由于它是从网络检索的,所以我将其缓存并使其可与 ReplaySubject
.
public Observable<FeaturedItemList> getFeatured() {
if(mFeaturedReplaySubject == null) {
mFeaturedReplaySubject = ReplaySubject.create();
getFromNetwork().subscribe(mFeaturedReplaySubject);
}
return mFeaturedReplaySubject;
}
然后我意识到,当请求由于某些原因失败时,如果用户返回该页面,除非用户终止应用程序,否则它不会显示任何结果。所以我决定有一些重试逻辑。这是我的做法:
public Observable<FeaturedItemList> getFeatured() {
synchronized (this) {
if (mFeaturedReplaySubject == null) {
mFeaturedReplaySubject = ReplaySubject.create();
getFromNetwork().subscribe(mFeaturedReplaySubject);
return mFeaturedReplaySubject;
} else {
return mFeaturedReplaySubject.onErrorResumeNext(throwable -> {
mFeaturedReplaySubject = null;
return getFeatured();
});
}
}
}
虽然这行得通,但恐怕我在这里做的事情不太好,因为这种方法无法涵盖这种情况。 有没有更好的方法?
同样为了分享使用主题的可观察性,我在某处读到我可以使用 connect()
、publish()
和 share()
,但我不确定如何使用它。
密码
private Observable<FeaturedItemList> mFeatured =
// initialized on construction
getFromNetwork()
.retry(3) // number of times to retry
.cache();
public Observable<FeaturedItemList> getFeatured() {
return mFeatured;
}
说明
网络来源
您的 getFromNetwork()
函数应 return 定期观察,它应该在每次订阅时访问网络。 调用时不接入网络。例如:
Future<FeaturedItemList> makeAsyncNetworkRequest() {
... initiate network request here ...
}
Observable<FeaturedItemList> getFromNetwork() {
return Observable.fromCallable(this::makeAsyncNetworkRequest)
.flatMap(Observable::fromFuture);
}
重试
有一个 .retryXxx()
运算符家族,它们仅在出现错误时被激活。根据各种条件,他们要么重新订阅源代码,要么将错误传递给下线。 如果没有错误,这些运算符什么都不做。我在我的示例中使用了简单的 retry(count)
,它会立即重试指定次数。您可以使用 retryWhen()
添加延迟或任何复杂的逻辑(有关示例,请参见 here and here)。
缓存
cache()
操作员记录事件的顺序并将其重播给所有新订阅者。不好的是它不可刷新。它永远存储上游的结果,无论是成功还是错误,并且永远不会重试。
替代cache()
replay().refCount()
向所有现有订阅者重播事件,但一旦所有订阅者取消订阅(或完成)就忘记一切。它将在新订阅者到达时重新订阅 getFromNetwork()
(当然会重试错误)。
提到但不需要的运算符
share()
是 publish().refCount()
的 shorthand。它允许多个并发订阅者共享一个订阅,即对 subscribe()
进行一次调用,而不是为每个订阅者调用。 cache()
和 replay().refCount()
都包含相同的功能。