如何使用 RxJava 协调 Completable 执行列表?
How to coordinate a list of Completable executions with RxJava?
我有一个 BlockingTasks
的列表,我想按顺序 运行:
class BlockingTaskManager {
val tasks = mutableListOf<BlockingTask>()
fun run () : Completable {
/* What can I put here in order to run
all tasks and return a completable according with the
requirements described below? */
}
}
interface BlockingTask {
fun run () : Completable
fun getDescription(): String
fun blockPipe(): Boolean
}
要求
- 如果管道中的任何
Completable
以错误结束,整个执行应该停止。;
- 当
run()
返回的 Completable
完成时,无论是错误还是成功,都应发出包含时间戳和 getDescription()
返回的字符串的日志;
我考虑过使用 Completable.concat { tasks }
,但我不知道如何调用 currentIterationTask.run()
以及如果 run()
导致错误以及日志记录如何使管道失败currentInterationTask.getDescription()
.
另一个想法是使用 andThen()
,但我不确定它是否会按需要工作:
tasks.forEachIndexed {
idx, task ->
var isBroken = false
task.run()
.doOnComplete {
logTaskCompletedSuccessfully(task)
}
.doOnError {
error ->
logTaskFailed(task, error)
isBroken = task.blockPipe()
}
.andThen {
if (!isBroken && idx < tasks.size) {
tasks[idx+1]
}
}
}
您可以通过多种方式进行:
fun run(): Completable {
return Completable.concat(tasks.map { task ->
task.run()
.doOnComplete { logTaskCompletedSuccessfully(task) }
.doOnError { error ->
logTaskFailed(task, error)
}
})
}
顺序执行由 concat
运算符实现,来自某些 Completable
的错误将使用 onError()
停止流,并且您将获得日志记录功能或成功或错误。
至于blockPipe()
标志,如果我理解正确的话,它是用来标志任务失败的,你应该中断流,如果是这样的话,对我来说似乎没有必要,因为任何失败任务,抛出 Exception
而不是举起标志,异常将用 onError
中断流。
另一种选择是使用更具反应性的方法而不是迭代列表中的任务,用 Observable
迭代它。
从 Observable
开始,迭代任务列表,然后 flatMap
每个任务到 Completable
。
此处实现了顺序执行,因为您没有对每个 Completable
应用任何调度程序,因此执行顺序得以保持。
fun run(): Completable {
return Observable.fromIterable(tasks)
.flatMapCompletable { task ->
task.run()
.doOnComplete { logTaskCompletedSuccessfully(task) }
.doOnError { error ->
logTaskFailed(task, error)
}
}
.subscribeOn(Schedulers.newThread() // Schedulers.io())
}
我有一个 BlockingTasks
的列表,我想按顺序 运行:
class BlockingTaskManager {
val tasks = mutableListOf<BlockingTask>()
fun run () : Completable {
/* What can I put here in order to run
all tasks and return a completable according with the
requirements described below? */
}
}
interface BlockingTask {
fun run () : Completable
fun getDescription(): String
fun blockPipe(): Boolean
}
要求
- 如果管道中的任何
Completable
以错误结束,整个执行应该停止。; - 当
run()
返回的Completable
完成时,无论是错误还是成功,都应发出包含时间戳和getDescription()
返回的字符串的日志;
我考虑过使用 Completable.concat { tasks }
,但我不知道如何调用 currentIterationTask.run()
以及如果 run()
导致错误以及日志记录如何使管道失败currentInterationTask.getDescription()
.
另一个想法是使用 andThen()
,但我不确定它是否会按需要工作:
tasks.forEachIndexed {
idx, task ->
var isBroken = false
task.run()
.doOnComplete {
logTaskCompletedSuccessfully(task)
}
.doOnError {
error ->
logTaskFailed(task, error)
isBroken = task.blockPipe()
}
.andThen {
if (!isBroken && idx < tasks.size) {
tasks[idx+1]
}
}
}
您可以通过多种方式进行:
fun run(): Completable {
return Completable.concat(tasks.map { task ->
task.run()
.doOnComplete { logTaskCompletedSuccessfully(task) }
.doOnError { error ->
logTaskFailed(task, error)
}
})
}
顺序执行由 concat
运算符实现,来自某些 Completable
的错误将使用 onError()
停止流,并且您将获得日志记录功能或成功或错误。
至于blockPipe()
标志,如果我理解正确的话,它是用来标志任务失败的,你应该中断流,如果是这样的话,对我来说似乎没有必要,因为任何失败任务,抛出 Exception
而不是举起标志,异常将用 onError
中断流。
另一种选择是使用更具反应性的方法而不是迭代列表中的任务,用 Observable
迭代它。
从 Observable
开始,迭代任务列表,然后 flatMap
每个任务到 Completable
。
此处实现了顺序执行,因为您没有对每个 Completable
应用任何调度程序,因此执行顺序得以保持。
fun run(): Completable {
return Observable.fromIterable(tasks)
.flatMapCompletable { task ->
task.run()
.doOnComplete { logTaskCompletedSuccessfully(task) }
.doOnError { error ->
logTaskFailed(task, error)
}
}
.subscribeOn(Schedulers.newThread() // Schedulers.io())
}