轮询状态更新的长任务
Polling for long task for status update
我有一个需要执行以下操作的用例
调用一个 POST 端点来启动一个进程并给出一个状态
称为 "processing"。假设我们有 POST /accounts
开始创建帐户资源。
调用 GET 端点 ( GET /accounts/{id}) 这将提供
帐户资源的状态。假设只有两个
状态 --> "processing" 和 "completed"。我需要继续投票
GET 端点,直到资源状态更改为
"completed"
一旦 GET /accounts/{id} returns 状态完成,我需要
做 return 完成的资源。
我的主要问题是如何使用长轮询在 rxjava 中执行此操作。我在这里查看了一些链接
- https://github.com/ReactiveX/RxJava/issues/3482
- https://github.com/ReactiveX/RxJava/issues/448
在这些例子中我无法理解的是..如何在匹配特定谓词后取消订阅..ie。 GET /accounts/{id} 的状态已完成,获取并结束订阅。
非常感谢任何帮助。
您可以使用 retryWhen()
如下所示的内容。
考虑到我们有
private Single<Account> createAccount() // API call to create account
private Single<Account> getAccount(int id) // API call to get account info
获取账户信息时,如果状态为"processing",我们抛出错误,这将触发retryWhen()。下面的代码将每 5 秒重试一次,直到状态变为 "completed"。只有 getAccount() 流将再次重试。
createAccount()
.flatMap(
createdAcc -> getAccount(accountId)
.flatMap(
status -> status == "procesing" ?
Single.error(new Throwable()) :
Single.just(status)
)
.doOnError(throwable -> Log.e("", "retrying"))
.retryWhen(errors -> errors.flatMapSingle(error -> Single.timer(5, TimeUnit.SECONDS)))
)
.subscribe();
我有一个需要执行以下操作的用例
调用一个 POST 端点来启动一个进程并给出一个状态 称为 "processing"。假设我们有 POST /accounts 开始创建帐户资源。
调用 GET 端点 ( GET /accounts/{id}) 这将提供 帐户资源的状态。假设只有两个 状态 --> "processing" 和 "completed"。我需要继续投票 GET 端点,直到资源状态更改为 "completed"
一旦 GET /accounts/{id} returns 状态完成,我需要 做 return 完成的资源。
我的主要问题是如何使用长轮询在 rxjava 中执行此操作。我在这里查看了一些链接
- https://github.com/ReactiveX/RxJava/issues/3482
- https://github.com/ReactiveX/RxJava/issues/448 在这些例子中我无法理解的是..如何在匹配特定谓词后取消订阅..ie。 GET /accounts/{id} 的状态已完成,获取并结束订阅。
非常感谢任何帮助。
您可以使用 retryWhen()
如下所示的内容。
考虑到我们有
private Single<Account> createAccount() // API call to create account
private Single<Account> getAccount(int id) // API call to get account info
获取账户信息时,如果状态为"processing",我们抛出错误,这将触发retryWhen()。下面的代码将每 5 秒重试一次,直到状态变为 "completed"。只有 getAccount() 流将再次重试。
createAccount()
.flatMap(
createdAcc -> getAccount(accountId)
.flatMap(
status -> status == "procesing" ?
Single.error(new Throwable()) :
Single.just(status)
)
.doOnError(throwable -> Log.e("", "retrying"))
.retryWhen(errors -> errors.flatMapSingle(error -> Single.timer(5, TimeUnit.SECONDS)))
)
.subscribe();