IO 上的 RxJava 竞争条件

RxJava race condition on IO

问题

当 API 调用之一由于缺少授权令牌而以失败告终时,我正在尝试注销我的用户。但是我无法设法将 Schedulers.io() 上的操作与 SharedPrefs.commit() 上的操作同步。所有呼叫都是同时进行的,因此所有呼叫都同时尝试注销。 我想要实现的是只有第一个触发注销,剩下的两个要注意,它发生了并且根本不触发任何东西。

建设

我有 3 个 API 调用使用令牌自动调整。 令牌保存在 SharedPrefs 内。 所有 API 调用都是在 Schedulers.io() 上进行的。 当调用令牌刷新失败时,我正在尝试使用注销方法从 SharedPrefs 中删除数据。

注销方法代码如下:

 Flowable.fromCallable { isUserLogged() }
            .filter { it }
            .flatMap { logoutUseCase.execute() }
            .doOnComplete { showLogin() }
            .subscribeOn(Schedulers.single())
            .blockingSubscribe()

此方法在 Authenticator 内部调用 class 连接到 OkHttpClient 使用 Retrofit

您可以使用 Observable.amb() 运算符来解决竞争。

如果每个 API 调用在没有身份验证令牌时发出错误,那么您可以这样组织它们:

Observable.amb( apiCall1Observable, apiCall2Observable, apiCall3Observable )
  .observeOn(Schedulers.io())
  .doOnError( error -> {
    logoutUseCase.execute()
   } )
  .subscribe( value -> {
   },
   error -> {
   });

amb() 运算符将select 第一个发出值或终止事件。这将触发只会执行一次的 logoutUseCase.execute()

关键是重构您个人 API 调用的状态,将关于何时何地注销的决定推向更高级别。

我设法以非常简单的方式解决了我的问题。 由于 Authenticator 仅在 -

时触发

authentication challenge from either a remote web server or a proxy server

发生,因此我可以同步负责刷新令牌的代码部分。这样,每当发生刷新令牌失败时,logout() 方法将被同步并且每个调用中的数据都会更新。

代码

override fun authenticate(route: Route, response: Response): Request? {
    synchronized(this, {
      ###refresh token code###
    })
}