IO 上的 RxJava 竞争条件
RxJava race condition on IO
问题
当 API 调用之一由于缺少授权令牌而以失败告终时,我正在尝试注销我的用户。但是我无法设法将 Schedulers.io()
上的操作与 SharedPrefs.commit()
上的操作同步。所有呼叫都是同时进行的,因此所有呼叫都同时尝试注销。
我想要实现的是只有第一个触发注销,剩下的两个要注意,它发生了并且根本不触发任何东西。
建设
我有 3 个 API 调用使用令牌自动调整。
令牌保存在 SharedPrefs
内。
所有 API 调用都是在 Schedulers.io()
上进行的。
当调用令牌刷新失败时,我正在尝试使用注销方法从 SharedPrefs
中删除数据。
注销方法代码如下:
Flowable.fromCallable { isUserLogged() }
.filter { it }
.flatMap { logoutUseCase.execute() }
.doOnComplete { showLogin() }
.subscribeOn(Schedulers.single())
.blockingSubscribe()
此方法在 Authenticator
内部调用 class 连接到 OkHttpClient
使用 Retrofit
您可以使用 Observable.amb()
运算符来解决竞争。
如果每个 API 调用在没有身份验证令牌时发出错误,那么您可以这样组织它们:
Observable.amb( apiCall1Observable, apiCall2Observable, apiCall3Observable )
.observeOn(Schedulers.io())
.doOnError( error -> {
logoutUseCase.execute()
} )
.subscribe( value -> {
},
error -> {
});
amb()
运算符将select 第一个发出值或终止事件。这将触发只会执行一次的 logoutUseCase.execute()
。
关键是重构您个人 API 调用的状态,将关于何时何地注销的决定推向更高级别。
我设法以非常简单的方式解决了我的问题。
由于 Authenticator
仅在 -
时触发
authentication challenge from either a remote web server or a proxy server
发生,因此我可以同步负责刷新令牌的代码部分。这样,每当发生刷新令牌失败时,logout()
方法将被同步并且每个调用中的数据都会更新。
代码
override fun authenticate(route: Route, response: Response): Request? {
synchronized(this, {
###refresh token code###
})
}
问题
当 API 调用之一由于缺少授权令牌而以失败告终时,我正在尝试注销我的用户。但是我无法设法将 Schedulers.io()
上的操作与 SharedPrefs.commit()
上的操作同步。所有呼叫都是同时进行的,因此所有呼叫都同时尝试注销。
我想要实现的是只有第一个触发注销,剩下的两个要注意,它发生了并且根本不触发任何东西。
建设
我有 3 个 API 调用使用令牌自动调整。
令牌保存在 SharedPrefs
内。
所有 API 调用都是在 Schedulers.io()
上进行的。
当调用令牌刷新失败时,我正在尝试使用注销方法从 SharedPrefs
中删除数据。
注销方法代码如下:
Flowable.fromCallable { isUserLogged() }
.filter { it }
.flatMap { logoutUseCase.execute() }
.doOnComplete { showLogin() }
.subscribeOn(Schedulers.single())
.blockingSubscribe()
此方法在 Authenticator
内部调用 class 连接到 OkHttpClient
使用 Retrofit
您可以使用 Observable.amb()
运算符来解决竞争。
如果每个 API 调用在没有身份验证令牌时发出错误,那么您可以这样组织它们:
Observable.amb( apiCall1Observable, apiCall2Observable, apiCall3Observable )
.observeOn(Schedulers.io())
.doOnError( error -> {
logoutUseCase.execute()
} )
.subscribe( value -> {
},
error -> {
});
amb()
运算符将select 第一个发出值或终止事件。这将触发只会执行一次的 logoutUseCase.execute()
。
关键是重构您个人 API 调用的状态,将关于何时何地注销的决定推向更高级别。
我设法以非常简单的方式解决了我的问题。
由于 Authenticator
仅在 -
authentication challenge from either a remote web server or a proxy server
发生,因此我可以同步负责刷新令牌的代码部分。这样,每当发生刷新令牌失败时,logout()
方法将被同步并且每个调用中的数据都会更新。
代码
override fun authenticate(route: Route, response: Response): Request? {
synchronized(this, {
###refresh token code###
})
}