RxJava:仅当第一个 observable 抛出错误并从第一个开始重复时才执行第二个 observable

RxJava: Execute second observables only if first one throws an error and repeat from the first

我正在使用 retorift 来点击 getAricle api 并获取与用户相关的文章列表。如果传递的令牌已过期,getArticle api 将抛出错误,如果是这样,我必须调用 refreshToken api 来获取新令牌,然后我必须再次调用 getArticle api

 ApiController.createRx().getArticle(token)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({ response -> toast(response.body().url) }, { e ->
                println(e.printStackTrace())
                if(e is HttpException && e.code() in  arrayOf(401,403)){                      
                   //Here I want to call refresh tolken api
                   toast("Auth error")
                }
                else
                   toast(R.string.something_went_wrong)
            })

编辑

尽管给出的答案显示了一些方向,但这些并不是我问题的直接答案。这就是解决问题的方法,但我觉得可以将其重构为更好的代码

ApiController.createRx().getArticle(Preference.getToken())
            .flatMap { value ->
                if (value.code() in arrayOf(403, 401)) {
                    ApiController.refreshToken()
                    ApiController.createRx().getArticle(Preference.getToken())
                } else Observable.just(value)
            }
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({ response -> println("Success") }, { e ->
                e.printStackTrace()
                toast(R.string.something_went_wrong)
            })



fun refreshToken() {
        val token:String?=ApiController.createRx().refreshToken(Preferences.getRefreshToken()).blockingFirst()?.body()?.token
        if (token != null) Preferences.setAuthToken(token)
    }

编辑

我将我的代码重构为更简洁的版本

Observable.defer { ApiController.createRx().getArticle(Preferences.getToken()) }
            .flatMap {
                if (it.code() in arrayOf(401, 403)) {
                    ApiController.refreshToken()
                    Observable.error(Throwable())
                } else Observable.just(it)
            }
            .retry(1)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({println("Success") }, {
              it.printStackTrace()
              toast(R.string.something_went_wrong)
            })



 fun refreshToken() {
        var token: String? = null
        try {
            token = createRx().refreshToken(Preferences.getRefreshToken()).blockingFirst().body()!!.token
        } catch (e: Exception) {
            throw e
        }
        println("saving token")
        if (token != null) Preferences.setAuthToken(token)
    }

编辑

请检查我的答案以获得最终的重构代码

我已经实现了这件事。这是该代码的稍微修改后的版本:

private Observable<Object> refreshTokenIfNotAuthorized(Observable<? extends Throwable> errors) {
    final AtomicBoolean alreadyRetried = new AtomicBoolean(false);

    return errors.flatMap(error -> {

        boolean isAuthorizationError = /* some logic analyzing each error*/ ;

        if (isAuthorizationError && !alreadyRetried.get()) {
            try {
                alreadyRetried.set(true);
                String newToken = federatedTokenRefresher.refreshToken()
                                                         .toBlocking()
                                                         .first();

                setLogin(newToken);
                return Observable.just(null);

            } catch (Exception e) {
                return Observable.error(error);
            }

        }
        return Observable.error(error);
    });
}

你可以像这样使用这个方法:

doSomethingRequiringAuth().retryWhen(this::refreshTokenIfNotAuthorized);

您会收到什么样的错误?。您似乎可以使用 onErrorResumeNext 运算符。

这个运算符一旦接收到一个 throwable,就允许你return一个 Observable 而不是 onError

中的 throwable
@Test
    public void observableOnErrorResumeException() {
        Integer[] numbers = {0, 1, 2, 3, 4, 5};

        Observable.from(numbers)
                .doOnNext(number -> {
                    if (number > 3) {
                        try {
                            throw new IllegalArgumentException();
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }

                })
                .onErrorResumeNext(t -> Observable.just(666))
                .subscribe(System.out::println);

    }

您可以在此处查看更多示例https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/errors/ObservableExceptions.java

我会使用 groupBy 运算符给你另一个选择

/**
 * In this example we create a response code group.
 */
@Test
public void testGroupByCode() {
    Observable.from(Arrays.asList(401,403, 200))
            .groupBy(code -> code)
            .subscribe(groupByCode -> {
                switch (groupByCode.getKey()) {
                    case 401: {
                        System.out.println("refresh token");
                        processResponse(groupByCode);
                        break;
                    }
                    case 403: {
                        System.out.println("refresh token");
                        processResponse(groupByCode);
                        break;
                    }
                    default: {
                        System.out.println("Do the toast");
                        processResponse(groupByCode);
                    }
                }
            });
}

private void processResponse(GroupedObservable<Integer, Integer> groupByCode) {
    groupByCode.asObservable().subscribe(value -> System.out.println("Response code:" + value));
}

我在阅读了更多有关 RxJava 的信息后解决了我的问题,这就是我实现它的方式。 首先 retrofit 会向 onErroronNext\onSuccess 抛出 4xx 错误取决于我们如何定义它。 例如:

@GET("content") fun getArticle(@Header("Authorization") token: String):Single<Article>

这会将所有 4xx 错误抛给 onError 而不是 Single<Article> 如果您将其定义为 Single<Response<Article>> 那么来自服务器的所有响应包括 4xx 将转到 onNext\onSuccess

Single.defer { ApiController.createRx().getArticle(Preferences.getAuthToken())}
                .doOnError {
                    if (it is HttpException && it.code() == 401)
                        ApiController.refreshToken()
                }
                .retry { attempts, error -> attempts < 3 && error is HttpException && error.code() == 401 }
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({println("Success") }, {
                  it.printStackTrace()
                  toast(R.string.something_went_wrong)
                })

我使用 defer 作为我实际 Observable 的包装器,因为我想在令牌刷新后重试时重新创建可观察到的文章提取,因为我希望再次调用 Preferences.getAuthToken()因为我的刷新令牌代码优先存储新获取的令牌。

retry returns 如果 HttpException 为 401 且未尝试重试超过 2 次则为真