使用 RXJava 处理缓存
Cache handling with RXJava
我正在尝试使用 rxJava 实现此工作流,但我确定我是否误用或做错了什么。
- 用户要求登录
- 如果 loginResult 在缓存中可用,则 "emit" 缓存的 LoginResult
- 否则实际执行对 web 服务的请求,如果一切成功则缓存结果
- 如果发生错误最多重试 3 次,如果发生第 4 次则清除缓存。
这是我的完整代码片段。
public class LoginTask extends BaseBackground<LoginResult> {
private static CachedLoginResult cachedLoginResult = new CachedLoginResult();
private XMLRPCClient xmlrpcClient;
private UserCredentialsHolder userCredentialsHolder;
@Inject
public LoginTask(XMLRPCClient client, UserCredentialsHolder userCredentialsHolder) {
this.xmlrpcClient = client;
this.userCredentialsHolder = userCredentialsHolder;
}
@Override
public LoginResult performRequest() throws Exception {
return UserApi.login(
xmlrpcClient,
userCredentialsHolder.getUserName(),
userCredentialsHolder.getPlainPassword());
}
@Override
public Observable<LoginResult> getObservable() {
return cachedLoginResult.getObservable()
.onErrorResumeNext(
Observable.create(
((Observable.OnSubscribe<LoginResult>) subscriber -> {
try {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(performRequest()); // actually performRequest
}
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
})
)
.doOnNext(cachedLoginResult::setLoginResult)
.retry((attempts, t) -> attempts < 3)
.doOnError(throwable -> cachedLoginResult.purgeCache())
);
}
private static class CachedLoginResult {
private LoginResult lr = null;
private long when = 0;
private CachedLoginResult() {
}
public boolean hasCache() {
return lr != null && when + TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES) > System.currentTimeMillis();
}
public void setLoginResult(LoginResult lr) {
if (lr != null) {
this.lr = lr;
this.when = System.currentTimeMillis();
}
}
public void purgeCache() {
this.lr = null;
this.when = 0;
}
public Observable<LoginResult> getObservable() {
return Observable.create(new Observable.OnSubscribe<LoginResult>() {
@Override
public void call(Subscriber<? super LoginResult> subscriber) {
if (!subscriber.isUnsubscribed()) {
if (hasCache()) {
subscriber.onNext(lr);
subscriber.onCompleted();
} else {
subscriber.onError(new RuntimeException("No cache"));
}
}
}
});
}
}
}
因为我找不到任何类似的例子,而且我在 1 天前才开始使用 rxjava "playing",所以我不确定我的实现。
感谢您的宝贵时间。
我觉得这段代码没问题,干得好:)
您在 LoginTask
中使用 Observable.create
是正确的,否则调用的结果可能会在内部缓存,然后 retry
将无济于事...
我认为这对于 CachedLoginResult
的 Observable
来说是不必要的。在这里,您可以使用 Observable.just
和 Observable.error
实用方法来简化代码,例如:
public Observable<LoginResult> getObservable() {
if (hasCache()) {
return Observable.just(lr);
} else {
return Observable.error(new RuntimeException("No cache"));
}
}
注意:just
存储您告诉它在内部发出的值,因此重新订阅将始终产生该值。这就是我在上面暗示的,例如,你不应该做 Observable.just(performRequest()).retry(3)
,因为 performRequest
只会被调用一次。
如果我没理解错的话,你想执行一次登录并以反应方式缓存结果?如果是这样,这是我将如何执行此操作的示例:
import java.util.concurrent.ThreadLocalRandom;
import rx.*;
import rx.schedulers.Schedulers;
import rx.subjects.AsyncSubject;
public class CachingLogin {
static class LoginResult {
}
/** Guarded by this. */
AsyncSubject<LoginResult> cache;
public Observable<LoginResult> login(String username, String password) {
AsyncSubject<LoginResult> c;
boolean doLogin = false;
synchronized (this) {
if (cache == null || cache.hasThrowable()) {
cache = AsyncSubject.create();
doLogin = true;
}
c = cache;
}
if (doLogin) {
Observable.just(1).subscribeOn(Schedulers.io())
.map(v -> loginAPI(username, password))
.retry(3).subscribe(c);
}
return c;
}
public void purgeCache() {
synchronized (this) {
cache = null;
}
}
static LoginResult loginAPI(String username, String password) {
if (ThreadLocalRandom.current().nextDouble() < 0.3) {
throw new RuntimeException("Failed");
}
return new LoginResult();
}
}
我正在尝试使用 rxJava 实现此工作流,但我确定我是否误用或做错了什么。
- 用户要求登录
- 如果 loginResult 在缓存中可用,则 "emit" 缓存的 LoginResult
- 否则实际执行对 web 服务的请求,如果一切成功则缓存结果
- 如果发生错误最多重试 3 次,如果发生第 4 次则清除缓存。
这是我的完整代码片段。
public class LoginTask extends BaseBackground<LoginResult> {
private static CachedLoginResult cachedLoginResult = new CachedLoginResult();
private XMLRPCClient xmlrpcClient;
private UserCredentialsHolder userCredentialsHolder;
@Inject
public LoginTask(XMLRPCClient client, UserCredentialsHolder userCredentialsHolder) {
this.xmlrpcClient = client;
this.userCredentialsHolder = userCredentialsHolder;
}
@Override
public LoginResult performRequest() throws Exception {
return UserApi.login(
xmlrpcClient,
userCredentialsHolder.getUserName(),
userCredentialsHolder.getPlainPassword());
}
@Override
public Observable<LoginResult> getObservable() {
return cachedLoginResult.getObservable()
.onErrorResumeNext(
Observable.create(
((Observable.OnSubscribe<LoginResult>) subscriber -> {
try {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(performRequest()); // actually performRequest
}
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
})
)
.doOnNext(cachedLoginResult::setLoginResult)
.retry((attempts, t) -> attempts < 3)
.doOnError(throwable -> cachedLoginResult.purgeCache())
);
}
private static class CachedLoginResult {
private LoginResult lr = null;
private long when = 0;
private CachedLoginResult() {
}
public boolean hasCache() {
return lr != null && when + TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES) > System.currentTimeMillis();
}
public void setLoginResult(LoginResult lr) {
if (lr != null) {
this.lr = lr;
this.when = System.currentTimeMillis();
}
}
public void purgeCache() {
this.lr = null;
this.when = 0;
}
public Observable<LoginResult> getObservable() {
return Observable.create(new Observable.OnSubscribe<LoginResult>() {
@Override
public void call(Subscriber<? super LoginResult> subscriber) {
if (!subscriber.isUnsubscribed()) {
if (hasCache()) {
subscriber.onNext(lr);
subscriber.onCompleted();
} else {
subscriber.onError(new RuntimeException("No cache"));
}
}
}
});
}
}
}
因为我找不到任何类似的例子,而且我在 1 天前才开始使用 rxjava "playing",所以我不确定我的实现。
感谢您的宝贵时间。
我觉得这段代码没问题,干得好:)
您在 LoginTask
中使用 Observable.create
是正确的,否则调用的结果可能会在内部缓存,然后 retry
将无济于事...
我认为这对于 CachedLoginResult
的 Observable
来说是不必要的。在这里,您可以使用 Observable.just
和 Observable.error
实用方法来简化代码,例如:
public Observable<LoginResult> getObservable() {
if (hasCache()) {
return Observable.just(lr);
} else {
return Observable.error(new RuntimeException("No cache"));
}
}
注意:just
存储您告诉它在内部发出的值,因此重新订阅将始终产生该值。这就是我在上面暗示的,例如,你不应该做 Observable.just(performRequest()).retry(3)
,因为 performRequest
只会被调用一次。
如果我没理解错的话,你想执行一次登录并以反应方式缓存结果?如果是这样,这是我将如何执行此操作的示例:
import java.util.concurrent.ThreadLocalRandom;
import rx.*;
import rx.schedulers.Schedulers;
import rx.subjects.AsyncSubject;
public class CachingLogin {
static class LoginResult {
}
/** Guarded by this. */
AsyncSubject<LoginResult> cache;
public Observable<LoginResult> login(String username, String password) {
AsyncSubject<LoginResult> c;
boolean doLogin = false;
synchronized (this) {
if (cache == null || cache.hasThrowable()) {
cache = AsyncSubject.create();
doLogin = true;
}
c = cache;
}
if (doLogin) {
Observable.just(1).subscribeOn(Schedulers.io())
.map(v -> loginAPI(username, password))
.retry(3).subscribe(c);
}
return c;
}
public void purgeCache() {
synchronized (this) {
cache = null;
}
}
static LoginResult loginAPI(String username, String password) {
if (ThreadLocalRandom.current().nextDouble() < 0.3) {
throw new RuntimeException("Failed");
}
return new LoginResult();
}
}