在转发时发出进度项导致单个可观察
Emitting progress items while forwarding result in a single observable
我正在通过 REST API 分两步启动操作:
- 开始操作并return一个任务id
- 使用给定的 ID 轮询任务并在操作 return 秒完成时完成序列。
轮询任务 ID 将 return 一个 202,表示操作仍在进行中,完成时为 200。任何其他代码都是错误的。
我需要向订阅者传达每个步骤的响应。
以前,我会让 do 运算符将步骤之间的响应推送到 ReplaySubject。
startReboot()
.do(onNext: { response in
operationStatus.next(response)
)
.flatMap({ response in
// If we could not get the task ID from the response we error
guard let taskID = getTaskIDFromJSON(response) else { return Observable.error(API.serverError) }
return Observable.just(taskID)
})
.flatMap({ taskID in
return pollTask(withID: taskID) // internally, it uses retryWhen to check the api again with a five second delay
})
.do(onNext: { response in
operationStatus.next(response)
})
.subscribe(onError: { _ in
showOperationFailedIcon()
}, onCompleted: {
showOperationCompleteIcon()
})
在其他地方,该主题的订阅者会执行以下操作:
operationStatus.subscribe(onNext: { response
showResponse(response)
})
所以基本上我是在展示操作的进度以及我们同时从每个步骤获得的响应。
当时我还不熟悉 Rx 来想出一个更干净的解决方案。但是现在我已经熟悉了它,在我看来应该有一个解决方案,我们不使用副作用并将其包含在一个最终的可观察对象中。不过,我还是找不到办法。
我在想这样的事情:
let opObs = startReboot()
let pollingObs = pollTask(/* where does the id come from? */)
Observable.concat(opObs, pollingObs)
.subscribe(onNext : { response in
showResponse(response)
}, onError: { _ in
showOperationFailedIcon()
}, onCompleted: {
showOperationCompleteIcon()
})
但这意味着一旦 opObs 完成,我需要将任务 ID 保存在 monad 之外的变量中,并包装 pollingObs 以在它启动时获取它——再次引入副作用。
是否有一个运算符或运算符组合可用于将每个步骤的响应发送给订阅者并将其传递给另一个可观察对象?
像这样的东西应该有用。请注意使用 share()
以避免触发两次 startReboot 序列。
let opObs = startReboot().share()
let pollingObs = opObs.flatMap {
guard let taskID = getTaskIDFromJSON(response) else { return Observable.error(API.serverError)
return pollTask(withID: taskID)
}
Observable.concat(opObs, pollingObs)
.subscribe(onNext : { response in
showResponse(response)
}, onError: { _ in
showOperationFailedIcon()
}, onCompleted: {
showOperationCompleteIcon()
})
我正在通过 REST API 分两步启动操作:
- 开始操作并return一个任务id
- 使用给定的 ID 轮询任务并在操作 return 秒完成时完成序列。
轮询任务 ID 将 return 一个 202,表示操作仍在进行中,完成时为 200。任何其他代码都是错误的。
我需要向订阅者传达每个步骤的响应。
以前,我会让 do 运算符将步骤之间的响应推送到 ReplaySubject。
startReboot()
.do(onNext: { response in
operationStatus.next(response)
)
.flatMap({ response in
// If we could not get the task ID from the response we error
guard let taskID = getTaskIDFromJSON(response) else { return Observable.error(API.serverError) }
return Observable.just(taskID)
})
.flatMap({ taskID in
return pollTask(withID: taskID) // internally, it uses retryWhen to check the api again with a five second delay
})
.do(onNext: { response in
operationStatus.next(response)
})
.subscribe(onError: { _ in
showOperationFailedIcon()
}, onCompleted: {
showOperationCompleteIcon()
})
在其他地方,该主题的订阅者会执行以下操作:
operationStatus.subscribe(onNext: { response
showResponse(response)
})
所以基本上我是在展示操作的进度以及我们同时从每个步骤获得的响应。
当时我还不熟悉 Rx 来想出一个更干净的解决方案。但是现在我已经熟悉了它,在我看来应该有一个解决方案,我们不使用副作用并将其包含在一个最终的可观察对象中。不过,我还是找不到办法。
我在想这样的事情:
let opObs = startReboot()
let pollingObs = pollTask(/* where does the id come from? */)
Observable.concat(opObs, pollingObs)
.subscribe(onNext : { response in
showResponse(response)
}, onError: { _ in
showOperationFailedIcon()
}, onCompleted: {
showOperationCompleteIcon()
})
但这意味着一旦 opObs 完成,我需要将任务 ID 保存在 monad 之外的变量中,并包装 pollingObs 以在它启动时获取它——再次引入副作用。
是否有一个运算符或运算符组合可用于将每个步骤的响应发送给订阅者并将其传递给另一个可观察对象?
像这样的东西应该有用。请注意使用 share()
以避免触发两次 startReboot 序列。
let opObs = startReboot().share()
let pollingObs = opObs.flatMap {
guard let taskID = getTaskIDFromJSON(response) else { return Observable.error(API.serverError)
return pollTask(withID: taskID)
}
Observable.concat(opObs, pollingObs)
.subscribe(onNext : { response in
showResponse(response)
}, onError: { _ in
showOperationFailedIcon()
}, onCompleted: {
showOperationCompleteIcon()
})