如何使用 RxJava 协调 Completable 执行列表?

How to coordinate a list of Completable executions with RxJava?

我有一个 BlockingTasks 的列表,我想按顺序 运行:

class BlockingTaskManager {
    val tasks = mutableListOf<BlockingTask>()

    fun run () : Completable {
        /* What can I put here in order to run
           all tasks and return a completable according with the
           requirements described below? */
    }
}

interface BlockingTask {

    fun run () : Completable

    fun getDescription(): String

    fun blockPipe(): Boolean
}

要求


我考虑过使用 Completable.concat { tasks },但我不知道如何调用 currentIterationTask.run() 以及如果 run() 导致错误以及日志记录如何使管道失败currentInterationTask.getDescription().

另一个想法是使用 andThen(),但我不确定它是否会按需要工作:

    tasks.forEachIndexed {
        idx, task ->
        var isBroken = false
        task.run()
                .doOnComplete {
                    logTaskCompletedSuccessfully(task)
                }
                .doOnError {
                    error ->
                    logTaskFailed(task, error)
                    isBroken = task.blockPipe()
                }
                .andThen {
                    if (!isBroken && idx < tasks.size) {
                        tasks[idx+1]
                    }
                }
    }

您可以通过多种方式进行:

 fun run(): Completable {
    return Completable.concat(tasks.map { task ->
        task.run()
                .doOnComplete { logTaskCompletedSuccessfully(task) }
                .doOnError { error ->
                    logTaskFailed(task, error)
                }
    })
}

顺序执行由 concat 运算符实现,来自某些 Completable 的错误将使用 onError() 停止流,并且您将获得日志记录功能或成功或错误。

至于blockPipe()标志,如果我理解正确的话,它是用来标志任务失败的,你应该中断流,如果是这样的话,对我来说似乎没有必要,因为任何失败任务,抛出 Exception 而不是举起标志,异常将用 onError 中断流。


另一种选择是使用更具反应性的方法而不是迭代列表中的任务,用 Observable 迭代它。
Observable 开始,迭代任务列表,然后 flatMap 每个任务到 Completable。 此处实现了顺序执行,因为您没有对每个 Completable 应用任何调度程序,因此执行顺序得以保持。

 fun run(): Completable {
    return Observable.fromIterable(tasks)
            .flatMapCompletable { task ->
                task.run()
                        .doOnComplete { logTaskCompletedSuccessfully(task) }
                        .doOnError { error ->
                            logTaskFailed(task, error)
                        }
            }
            .subscribeOn(Schedulers.newThread() // Schedulers.io())
}