使用 RXJava 处理缓存

Cache handling with RXJava

我正在尝试使用 rxJava 实现此工作流,但我确定我是否误用或做错了什么。

这是我的完整代码片段。

public class LoginTask extends BaseBackground<LoginResult> {
  private static CachedLoginResult cachedLoginResult = new CachedLoginResult();
  private XMLRPCClient xmlrpcClient;
  private UserCredentialsHolder userCredentialsHolder;

  @Inject
  public LoginTask(XMLRPCClient client, UserCredentialsHolder userCredentialsHolder) {
    this.xmlrpcClient = client;
    this.userCredentialsHolder = userCredentialsHolder;
  }

  @Override
  public LoginResult performRequest() throws Exception {
    return UserApi.login(
        xmlrpcClient,
        userCredentialsHolder.getUserName(),
        userCredentialsHolder.getPlainPassword());


  }

  @Override
  public Observable<LoginResult> getObservable() {
    return cachedLoginResult.getObservable()
        .onErrorResumeNext(
            Observable.create(
                ((Observable.OnSubscribe<LoginResult>) subscriber -> {
                  try {
                    if (!subscriber.isUnsubscribed()) {
                      subscriber.onNext(performRequest()); // actually performRequest
                    }
                    subscriber.onCompleted();
                  } catch (Exception e) {
                    subscriber.onError(e);
                  }
                })
            )
                .doOnNext(cachedLoginResult::setLoginResult)
                .retry((attempts, t) -> attempts < 3)
                .doOnError(throwable -> cachedLoginResult.purgeCache())
        );
  }


  private static class CachedLoginResult {
    private LoginResult lr = null;
    private long when = 0;

    private CachedLoginResult() {
    }

    public boolean hasCache() {
      return lr != null && when + TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES) > System.currentTimeMillis();
    }

    public void setLoginResult(LoginResult lr) {
      if (lr != null) {
          this.lr = lr;
          this.when = System.currentTimeMillis();
      }
    }

    public void purgeCache() {
      this.lr = null;
      this.when = 0;
    }

    public Observable<LoginResult> getObservable() {
      return Observable.create(new Observable.OnSubscribe<LoginResult>() {
        @Override
        public void call(Subscriber<? super LoginResult> subscriber) {
          if (!subscriber.isUnsubscribed()) {
            if (hasCache()) {
              subscriber.onNext(lr);
              subscriber.onCompleted();
            } else {
              subscriber.onError(new RuntimeException("No cache"));
            }
          }
        }
      });
    }
  }
}

因为我找不到任何类似的例子,而且我在 1 天前才开始使用 rxjava "playing",所以我不确定我的实现。

感谢您的宝贵时间。

我觉得这段代码没问题,干得好:)

您在 LoginTask 中使用 Observable.create 是正确的,否则调用的结果可能会在内部缓存,然后 retry 将无济于事...

我认为这对于 CachedLoginResultObservable 来说是不必要的。在这里,您可以使用 Observable.justObservable.error 实用方法来简化代码,例如:

public Observable<LoginResult> getObservable() {
  if (hasCache()) {
      return Observable.just(lr);
  } else {
      return Observable.error(new RuntimeException("No cache"));
  }
}

注意:just 存储您告诉它在内部发出的值,因此重新订阅将始终产生该值。这就是我在上面暗示的,例如,你不应该做 Observable.just(performRequest()).retry(3),因为 performRequest 只会被调用一次。

如果我没理解错的话,你想执行一次登录并以反应方式缓存结果?如果是这样,这是我将如何执行此操作的示例:

import java.util.concurrent.ThreadLocalRandom;

import rx.*;
import rx.schedulers.Schedulers;
import rx.subjects.AsyncSubject;


public class CachingLogin {
    static class LoginResult {

    }
    /** Guarded by this. */
    AsyncSubject<LoginResult> cache;
    public Observable<LoginResult> login(String username, String password) {
        AsyncSubject<LoginResult> c;
        boolean doLogin = false;
        synchronized (this) {
            if (cache == null || cache.hasThrowable()) {
                cache = AsyncSubject.create();
                doLogin = true;
            }
            c = cache;
        }
        if (doLogin) {
            Observable.just(1).subscribeOn(Schedulers.io())
            .map(v -> loginAPI(username, password))
            .retry(3).subscribe(c);
        }
        return c;
    }
    public void purgeCache() {
        synchronized (this) {
            cache = null;
        }
    }
    static LoginResult loginAPI(String username, String password) {
        if (ThreadLocalRandom.current().nextDouble() < 0.3) {
            throw new RuntimeException("Failed");
        }
        return new LoginResult();
    }
}