如何在 RxJava 2 中重播订阅者?

How to replay a subscriber in RxJava 2?

我正在 rxJava2 中寻找一种方法,在执行操作后我可以再次调用订阅者。我具体说一下我的意图。

我正在使用 retrofit2 进行联网,但这并不重要。这个问题是关于 rxJava 的。我想在刷新令牌后重试 api 调用。我需要设置的是,每次我进行任何 api 调用时,令牌都可能过期,我有责任调用另一个 api 调用来刷新令牌。令牌后 刷新后,我需要再次 调用 具有过期令牌的相同 api 调用。所以对于用户来说,它会无缝地刷新令牌。

这是我用于所有网络呼叫的默认订阅:

public class DefaultSubscriber<T> implements Observer<T> {

    Disposable disposable;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        Timber.d("subscribing called now ...");
        disposable = d;
    }

    @Override
    public void onNext(@NonNull T t) {
        Timber.d("onNext called now ..."+t);
    }

    @Override
    public void onError(@NonNull Throwable e) {
        Timber.e(e);
        Timber.d("onError called now ...");
        if (e instanceof HttpException) {
            //todo analytics can go here
            // We had non-2XX http error
            HttpException exception = (HttpException) e;
            if (exception.code() == 401){
              //token expired here, handle it
              //so i'll call another api here (to refresh token)but then i need to replay this subscriber , how ?
            }

        }
        if (e instanceof IOException) {
            // A network or conversion error happened
        }

    }

    @Override
    public void onComplete() {
        Timber.d("onComplete called now ...");
    }

    public void unsubscribe() {
        Timber.d("Un-subscribed called now ..");
        if (disposable != null && !disposable.isDisposed()) {
            disposable.dispose();
        }
    }
}

没有办法重播订阅吗?

您可以使用 retryWhen() 运算符重新订阅。

networkApiObservable
  .retryWhen( errorObservable -> 
    errorObservable.flatMap( error ->
     {
       if ( error.equals(EXPIRED_API_TOKEN) ) {
         return getNetworkApiTokenObservable();
       }
       return Observable.error( error );
     }
  .subscribe();

我假设 getNetworkApiTokenObservable() returns 一个可观察对象,当新的 API 令牌可用于 networkApiObservable 源时发出。