实现 retryWhen 逻辑

Implement retryWhen logic

我有一个应用程序需要会话 (cookie) 来处理网络调用。我正在使用 Retrofit+RxJava。但是,会话可能会过期(Retrofit 错误,状态为 401 Unauthorized),在这种情况下,我想重新验证(以获取新的 cookie)并重试之前的调用。我将如何使用 RxJava

我的例子:

getServerApi().getDialogs(offset, getCookies())
     .subscribeOn(Schedulers.newThread())
     .observeOn(AndroidSchedulers.mainThread())
     .retryWhen(observable -> {...}) // Need some logic
     .subscribe(dialogsEnvelope -> getView().setDialogs(dialogsEnvelope),
                throwable -> getView().setError(processFail(throwable)));

虽然 Interceptor 可能是针对此特定问题的更好解决方案,但该问题特别要求使用 retryWhen 的解决方案,因此这是一种解决方法:

retryWhen(new Func1<Observable<Throwable>, Observable<?>>(){

    @Override
    public void Observable<?> call(Observable<Throwable>> attempts) {
        return attempts.flatMap(new Func1<Throwable, Observable<?>>() {

            @Override
            public Observable<?> call(Throwable throwable) {
                 if (throwable instanceof RetrofitError) {
                     RetrofitError retrofitError = (RetrofitError) throwable;
                     if (retrofitError.getKind() == RetrofitError.Kind.HTTP && retrofitError.getResponse().getStatus() == 401) {
                         // this is the error we care about - to trigger a retry we need to emit anything other than onError or onCompleted
                         return Observable.just(new Object());
                     } else {
                         // some other kind of error: just pass it along and don't retry
                         return Observable.error(throwable);
                     }
                 } else {
                     // some other kind of error: just pass it along and don't retry
                     return Observable.error(throwable);
                 }
             }
        });
    }
})

但是,如果是简单的retry,您的getCookies将不会被再次调用。那只会重新订阅相同的 Observable,但 getCookiesObservable 创建之前被调用。所以我认为你必须将源 Observable 的创建包装在 defer.

利用OkHttp的强大Interceptor.

public class RecoverInterceptor implements Interceptor {
  String getAuth() {
    // check if we have auth, if not, authorize
    return "Bearer ...";
  }

  void clearAuth() {
    // clear everything
  }

  @Override public Response intercept(Chain chain) throws IOException {
    final Request request = chain.request();
    if (request.urlString().startsWith("MY ENDPOINT")) {
      final Request signed = request.newBuilder()
          .header("Authorization", getAuth())
          .build();
      final Response response = chain.proceed(signed);
      if (response.code() == 401) {
        clearAuth();
        return intercept(chain);
      } else {
        return response;
      }
    } else {
      return chain.proceed(request);
    }
  }
}

记得同步你的授权过程代码,这样两个并发请求就不会同时调用它。

在浏览互联网寻找正确答案时 - 我发现 this cool gist 描述了如何在 OkHttp Interceptor 的帮助下刷新 OAuth 令牌(类似于已接受的答案,但更完整)。

它与 RxJava 无关,但对我来说它更容易接受,因为我不必用 retryWith 逻辑包装每个 Observable - 一切都在较低级别完成(OkHttp 库).