合并 Observables 列表并等待所有完成

Combine a list of Observables and wait until all completed

TL;DR 如何将Task.whenAll(List<Task>)转换成RxJava?

我现有的代码使用 Bolts 构建异步任务列表,并等待所有这些任务完成后再执行其他步骤。本质上,它构建了一个 List<Task> 和 returns 一个 Task,当列表中的 所有 任务完成时,它被标记为已完成,根据example on the Bolts site.

我正在寻找用 RxJava 替换 Bolts 并且我假设这种方法构建异步任务列表(大小事先不知道)并将它们全部包装成一个单个 Observable 是可能的,但我不知道如何。

我已经尝试查看 mergezipconcat 等...但无法着手处理我想要的 List<Observable>正在建立,因为如果我正确理解文档,他们似乎都适合一次只处理两个 Observables

我正在努力学习 RxJava 并且对它仍然很陌生所以如果这是一个明显的问题或在某处的文档中有解释请原谅我;我试过搜索。任何帮助将不胜感激。

您可能看过与 2 个 Observables 一起工作的 zip 运算符。

还有静态方法Observable.zip。它有一种对你有用的形式:

zip(java.lang.Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction)

您可以查看 javadoc for more.

听起来您正在寻找 Zip operator

它有几种不同的使用方式,让我们来看一个例子。假设我们有一些不同类型的简单可观察对象:

Observable<Integer> obs1 = Observable.just(1);
Observable<String> obs2 = Observable.just("Blah");
Observable<Boolean> obs3 = Observable.just(true);

等待它们的最简单方法是这样的:

Observable.zip(obs1, obs2, obs3, (Integer i, String s, Boolean b) -> i + " " + s + " " + b)
.subscribe(str -> System.out.println(str));

请注意,在 zip 函数中,参数具有与压缩的可观察对象类型相对应的具体类型。

也可以直接压缩可观察对象列表:

List<Observable<?>> obsList = Arrays.asList(obs1, obs2, obs3);

Observable.zip(obsList, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

...或者将列表包装成 Observable<Observable<?>>:

Observable<Observable<?>> obsObs = Observable.from(obsList);

Observable.zip(obsObs, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

然而,在这两种情况下,zip 函数只能接受一个 Object[] 参数,因为列表中的可观察对象的类型及其数量是事先不知道的。这意味着 zip 函数必须检查参数的数量并相应地转换它们。

无论如何,上面所有的例子最终都会打印1 Blah true

编辑: 使用 Zip 时,确保压缩的 Observables 全部发出相同数量的项目。在上面的示例中,所有三个可观察对象都发出了一个项目。如果我们将它们改成这样:

Observable<Integer> obs1 = Observable.from(new Integer[]{1,2,3}); //Emits three items
Observable<String> obs2 = Observable.from(new String[]{"Blah","Hello"}); //Emits two items
Observable<Boolean> obs3 = Observable.from(new Boolean[]{true,true}); //Emits two items

然后 1, Blah, True2, Hello, True 将是唯一传递到 zip 函数的项目。项目 3 永远不会被压缩,因为其他可观察对象已经完成。

如果您有动态任务组合,您可以使用 flatMap。像这样:

public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) {
    return Observable.from(tasks)
            //execute in parallel
            .flatMap(task -> task.observeOn(Schedulers.computation()))
            //wait, until all task are executed
            //be aware, all your observable should emit onComplete event
            //otherwise you will wait forever
            .toList()
            //could implement more intelligent logic. eg. check that everything is successful
            .map(results -> true);
}

Another good example of parallel execution

注意:我不太了解您对错误处理的要求。例如,如果只有一项任务失败了怎么办。我认为你应该验证这种情况。

我正在使用 JavaRx Observables 和 RxKotlin 在 Kotlin 中编写一些计算代码。我想观察要完成的可观察对象列表,同时给我更新进度和最新结果。最后是returns最好的计算结果。一个额外的要求是 运行 Observables 并行使用我所有的 cpu 核心。我最终得到了这个解决方案:

@Volatile var results: MutableList<CalculationResult> = mutableListOf()

fun doALotOfCalculations(listOfCalculations: List<Calculation>): Observable<Pair<String, CalculationResult>> {

    return Observable.create { subscriber ->
        Observable.concatEager(listOfCalculations.map { calculation: Calculation ->
            doCalculation(calculation).subscribeOn(Schedulers.computation()) // function doCalculation returns an Observable with only one result
        }).subscribeBy(
            onNext = {
                results.add(it)
                subscriber.onNext(Pair("A calculation is ready", it))

            },
            onComplete = {
                subscriber.onNext(Pair("Finished: ${results.size}", findBestCalculation(results)) 
                subscriber.onComplete()
            },
            onError = {
                subscriber.onError(it)
            }
        )
    }
}

使用 Kotlin

Observable.zip(obs1, obs2, BiFunction { t1 : Boolean, t2:Boolean ->

})

设置函数参数的类型很重要,否则会出现编译错误

最后一个参数类型随着参数个数的变化而变化: BiFunction 为 2 功能 3 为 3 功能 4 为 4 ...

提出的建议中,实际上将可观察到的结果相互结合,这可能是也可能不是想要的,但问题中没有问到。在这个问题中,所需要的只是执行每个操作,一个一个地执行或并行执行(未指定,但链接的 Bolts 示例是关于并行执行的)。此外,zip() 将在任何可观察对象完成时立即完成,因此它违反了要求。

对于 Observables 的并行执行,flatMap() is fine, but merge() would be more straight-forward. Note that merge will exit on error of any of the Observables, if you rather postpone the exit until all observables have finished, you should be looking at mergeDelayError().

一对一的话,我觉得应该用Observable.concat() static method。它的 javadoc 状态如下:

concat(java.lang.Iterable> sequences) Flattens an Iterable of Observables into one Observable, one after the other, without interleaving them

如果您不想并行执行,这听起来像是您所追求的。

另外,如果您只对完成任务感兴趣,而不是 return 值,您可能应该查看 Completable instead of Observable.

TLDR:对于任务的逐一执行和完成时的 oncompletion 事件,我认为 Completable.concat() 最适合。对于并行执行,Completable.merge() 或 Completable.mergeDelayError() 听起来像是解决方案。前者会在任何可完成的错误上立即停止,后者会执行它们,即使其中一个有错误,然后才报告错误。

我遇到了类似的问题,我需要从 rest call 中获取搜索项,同时还要从 RecentSearchProvider.AUTHORITY 中整合已保存的建议,并将它们组合到一个统一的列表中。我试图使用@MyDogTom 解决方案,不幸的是 RxJava 中没有 Observable.from。经过一些研究,我找到了适合我的解决方案。

 fun getSearchedResultsSuggestions(context : Context, query : String) : Single<ArrayList<ArrayList<SearchItem>>>
{
    val fetchedItems = ArrayList<Observable<ArrayList<SearchItem>>>(0)
    fetchedItems.add(fetchSearchSuggestions(context,query).toObservable())
    fetchedItems.add(getSearchResults(query).toObservable())

    return Observable.fromArray(fetchedItems)
        .flatMapIterable { data->data }
        .flatMap {task -> task.observeOn(Schedulers.io())}
        .toList()
        .map { ArrayList(it) }
}

我从 observables 数组中创建了一个 observable,其中包含来自 Internet 的建议和结果列表,具体取决于查询。之后,您只需使用 flatMapIterable 和 运行 使用平面图检查这些任务,将结果放入数组中,稍后可以将其提取到回收视图中。

如果你使用Project Reactor,你可以使用Mono.when

Mono.when(publisher1, publisher2)
.map(i-> {
    System.out.println("everything is done!");
    return i;
}).block()